Messaging

One of the important parts of workflow is ability to interact with outside world. In many cases this means sending and receiving data from other systems.

Automatiko makes it easy to consume and produce events that are relevant to the workflow definition. These are defined in the workflow definition as message events (in BPMN) and events (in serverless workflow).

water leaks workflow msg

In the above workflow definition there are three message events

  • Water measurement - it’s a start event that is triggered based on incoming event

  • Humidity measurement - it’s another start event that is triggered based on incoming event

  • Publish hourly measurement - is an end event that will publish an event

This illustrates that workflow can consume and produce messages as messages.

There are several things that must be provided to make sure that events can be consumed or produced.

Connectors

Automatiko uses connectors to provide connectivity with other systems. There are several out of the box connectors available depending which use case for the service you selected (event streams, IoT, messaging, etc).

Table 1. Supported expressions
Connector Automatiko use case Description

Apache Kafka

Event Streams

Preconfigured connector for EventStream use case to take advantage of Kafka records as the format of the events.

MQTT

IoT

Preconfigured connector for IoT use case to allow simple integrations with Internet of Things appliances e.g. sensors

Apache Camel

EventStreams and Batch

Preconfigured for Batch Processing use case as Apache Camel comes with comprehensive list of components that allow to connect to various 3rd party systems

JMS

Messaging and Batch

Allows to use JMS as traditional messaging middleware as part of workflows

AMQP

Messaging and Batch

Similar to JMS but more powerful in terms of messaging capabilities

In case there is more than one connector defined in the service each message definition needs to provide connector custom attribute to indicate which connector should be used. Allowed values kafka, mqtt, camel, amqp, jms

Configuration of the connector

In most of the cases there is no need to explicitly configure any of the connector properties as Automatiko will do that for you and will print instructions at build time

Expand Build time instructions section to see an example of instructions generated during build.

Build time instructions
59) ****************** Automatiko Instructions *********************
602
2020-12-31 19:18:15,246 INFO  [io.aut.eng.cod.GeneratorContext] (build-59) Following are set of information that can be useful down the line...
603
2020-12-31 19:18:15,246 INFO  [io.aut.eng.cod.GeneratorContext] (build-59) Properties for MQTT based message event 'Water measurement'
604
2020-12-31 19:18:15,246 INFO  [io.aut.eng.cod.GeneratorContext] (build-59) 	'mp.messaging.incoming.water.topic' should be used to configure MQTT topic defaults to 'building/+/+/water'
605
2020-12-31 19:18:15,246 INFO  [io.aut.eng.cod.GeneratorContext] (build-59) 	'mp.messaging.incoming.water.host' should be used to configure MQTT host that defaults to localhost
606
2020-12-31 19:18:15,246 INFO  [io.aut.eng.cod.GeneratorContext] (build-59) 	'mp.messaging.incoming.water.port' should be used to configure MQTT port that defaults to 1883
607
2020-12-31 19:18:15,246 INFO  [io.aut.eng.cod.GeneratorContext] (build-59) 	'mp.messaging.incoming.water.client-id' should be used to configure MQTT client id that defaults to 'Waterleaks-consumer'
608
2020-12-31 19:18:15,246 INFO  [io.aut.eng.cod.GeneratorContext] (build-59) Properties for MQTT based message event 'Humidity measurement'
609
2020-12-31 19:18:15,246 INFO  [io.aut.eng.cod.GeneratorContext] (build-59) 	'mp.messaging.incoming.humidity.topic' should be used to configure MQTT topic defaults to 'building/+/+/humidity'
610
2020-12-31 19:18:15,246 INFO  [io.aut.eng.cod.GeneratorContext] (build-59) 	'mp.messaging.incoming.humidity.host' should be used to configure MQTT host that defaults to localhost
611
2020-12-31 19:18:15,246 INFO  [io.aut.eng.cod.GeneratorContext] (build-59) 	'mp.messaging.incoming.humidity.port' should be used to configure MQTT port that defaults to 1883
612
2020-12-31 19:18:15,246 INFO  [io.aut.eng.cod.GeneratorContext] (build-59) 	'mp.messaging.incoming.humidity.client-id' should be used to configure MQTT client id that defaults to 'Waterleaks-consumer'
613
2020-12-31 19:18:15,246 INFO  [io.aut.eng.cod.GeneratorContext] (build-59) Properties for MQTT based message event 'Report received'
614
2020-12-31 19:18:15,246 INFO  [io.aut.eng.cod.GeneratorContext] (build-59) 	'mp.messaging.incoming.buildingreports.topic' should be used to configure MQTT topic defaults to 'reports/+/+/hourly'
615
2020-12-31 19:18:15,246 INFO  [io.aut.eng.cod.GeneratorContext] (build-59) 	'mp.messaging.incoming.buildingreports.host' should be used to configure MQTT host that defaults to localhost
616
2020-12-31 19:18:15,246 INFO  [io.aut.eng.cod.GeneratorContext] (build-59) 	'mp.messaging.incoming.buildingreports.port' should be used to configure MQTT port that defaults to 1883
617
2020-12-31 19:18:15,246 INFO  [io.aut.eng.cod.GeneratorContext] (build-59) 	'mp.messaging.incoming.buildingreports.client-id' should be used to configure MQTT client id that defaults to 'Reports-consumer'
618
2020-12-31 19:18:15,246 INFO  [io.aut.eng.cod.GeneratorContext] (build-59) Properties for MQTT based message event 'Publish hourly measurement'
619
2020-12-31 19:18:15,246 INFO  [io.aut.eng.cod.GeneratorContext] (build-59) 	'mp.messaging.outgoing.reports.topic' should be used to configure MQTT topic defaults to 'reports'
620
2020-12-31 19:18:15,246 INFO  [io.aut.eng.cod.GeneratorContext] (build-59) 	'mp.messaging.outgoing.reports.host' should be used to configure MQTT host that defaults to localhost
621
2020-12-31 19:18:15,246 INFO  [io.aut.eng.cod.GeneratorContext] (build-59) 	'mp.messaging.outgoing.reports.port' should be used to configure MQTT port that defaults to 1883
622
2020-12-31 19:18:15,246 INFO  [io.aut.eng.cod.GeneratorContext] (build-59) 	'mp.messaging.outgoing.reports.client-id' should be used to configure MQTT client id that defaults to 'Waterleaks-producer'
623
2020-12-31 19:18:15,246 INFO  [io.aut.eng.cod.GeneratorContext] (build-59) ***************************************************************

In case you will have to change them or add extra configuration properties, you can do that in three ways

  • directly in application.properties file in src/main/resources

  • via system properties (-Dprop=value)

  • via environment variables

Consuming events

Consuming events can be applied in workflow in three scenarios

  • to start an instance of the workflow

  • to continue already started instance as part of the main path (so called intermediate events)

  • to trigger alternative paths in already started workflow instance (so called event sub workflows/process)

To be able to consume events in the workflow there must be a message definition present

A properly defined message must consists of

  • name

  • data type

Name of the message is by default used as topic on message broker
messaging definition message

In addition to that there are optional attributes that can be specified to instruct the runtime on expected execution behaviour. These are given via custom attributes

  • topic - instructs what is the expected topic in the message broker this message should be connected to - if not defined name of the message is used

  • correlation - a static value used to do correlation between incoming message and active workflow instances - see correlation section for details

  • correlationExpression - an expression that will be applied on the incoming event to extract correlation value to be used to find matching workflow instances

  • connector - in case there are multiple connectors used it must be given explicitly for every message. Note it is not required if there is only one connector - e.g. MQTT or Apache Kafka.

messaging definition general

In the above example topic and correlationExpression are defined to connect to MQTT message broker and reply on MQTT feature called wildcard topic. It will receive events from any building and room of that building for water measurements.

Expression topic(message, 1, 2) is a function available out of the box that allows to extract elements of the actual topic event was received from.

Event published to building/ABC/room1/water will extract correlation of ABC-room1 and use it to either find existing instances or start new instance and assign that value as business key.

In addition to defining the message the received event (which data type was given on the message definition) needs to be mapped to workflow data objects

messaging data mapping

Event can be mapped directly to given data object (above) or as expression (below) that allows more options like adding to a list. In this example received event is added to a list of measurements data object.

messaging data mapping expr
Table 2. Supported expressions
Expression Description

var.field.another

Dot notation for data objects to fill in given attribute of the data object instead of the entire data object. Note that data object needs to be initialised to be able to fill in its attributes

list[+]

Add item to a data object that is of List type

list[-]

Remove item from a data object that is of List type, it relies on identity of the data objects to be properly removed

Producing events

Producing events can be applied in workflow in three scenarios

  • to end an instance of the workflow

  • to push out data of the instance as part of the main path (so called intermediate events)

  • to end an alternative paths in workflow instance (so called event sub workflows/process)

To be able to produce events in the workflow there must be a message definition present

A properly defined message must consists of

  • name

  • data type

Name of the message is by default used as topic on message broker

In addition to that there are optional attributes that can be specified to instruct the runtime on expected execution behaviour. These are given via custom attributes

messaging publishing attr
  • topicExpression - instructs what is the expected topic in the message broker this message should be sent to - if not defined name of the message is used - applies to MQTT only

  • addressExpression - instructs what is the expected address in the message broker this message should be sent to - if not defined name of the message is used - applies to AMQP only

  • connector - in case there are multiple connectors used it must be given explicitly for every message. Note it is not required if there is only one connector - e.g. MQTT or Apache Kafka.

topicExpression provides an flexible way to define location where the event should be published - it is dedicated to MQTT, similar addressExpression provides same flexibility for dynamically selecting address that message should be pushed to - this one is dedicated to AMQP

Similar to consuming events, producing events also requires data mapping. This is to instruct what should be the payload of the event

messaging publishing mapping

Correlation

Correlation refers to finding workflow instances that should be given the received event. Automatiko allows to define correlation related attributes on each message event within the workflow.

  • correlation - a static value used to do correlation between incoming message and active workflow instances - see correlation section for details

  • correlationExpression - an expression that will be applied on the incoming event to extract correlation value to be used to find matching workflow instances

Correlation expression can be defined in Functions specific to the service to hide the complexity of the correlation.

In case correlation attribute (either correlation or correlationExpression) is found it will be used to look up workflow instances. Look up mechanism takes into consideration two items of the workflow instance

  • business key

  • workflow instance tags

In case any of these two matches the correlation that workflow instance will be given the event via the message event defined in the workflow.

There is additional correlation used in case of Apache Kafka used as message broker - that is the key of the kafka record.

Event payload converters

Messages defined in workflow must have data type configured but sometimes the data type cannot be easily mapped to the received event. To accommodate this use case service developers can implement converters.

  • io.automatiko.engine.api.io.InputConverter<D>

  • io.automatiko.engine.api.io.OutputConverter<V, T>

where

  • D for InputConverter is the actual data type expected by message.

  • V is the data type used in workflow and then T is the type to be sent out for OutputConverter

Apache Camel connector

When using Apache Camel as the connector, in many cases there is a need to specify additional information to be available to Camel component. These are usually set as Message headers from Camel component point of view.

Automatiko allows to specify any number of headers via custom attributes of the message.

Custom attributes that starts with Camel will be considered headers.

For example when using Camel Dropbox component to upload files you can specify the target file name as custom attribute that will be set as header on Camel’s message.

messaging camel headers

Headers can be set as

  • static value e.g. "my value"

  • reference to a workflow variable e.g. myvariable

  • reference to a function e.g. buildFileName() or buildFileName(id) where id is workflow instance id

JMS connector

In many situations when using JMS as the messaging provider there is a need to set message properties. It is really handy when combined with message selectors to filter efficiently messages that should not be consumed.

Automatiko allows to specify any number of properties via custom attributes of the message.

Custom attributes that starts with JMS will be considered properties.

Properties can be set as

  • static value e.g. "my value"

  • reference to a workflow variable e.g. myvariable

  • reference to a function e.g. buildFileName() or buildFileName(id) where id is workflow instance id