
One of the important parts of workflow is ability to interact with outside world. In many cases this means sending and receiving data from other systems.

Automatiko makes it easy to consume and produce events that are relevant to the workflow definition. These are defined in the workflow definition as message events (in BPMN) and events (in serverless workflow).

water leaks workflow msg

In the above workflow definition there are three message events

  • Water measurement - it’s a start event that is triggered based on incoming event

  • Humidity measurement - it’s another start event that is triggered based on incoming event

  • Publish hourly measurement - is an end event that will publish an event

This illustrates that workflow can consume and produce messages as messages.

There are several things that must be provided to make sure that events can be consumed or produced.


Automatiko uses connectors to provide connectivity with other systems. There are several out of the box connectors available depending which use case for the service you selected (event streams, IoT, messaging, etc).

Table 1. Supported expressions
Connector Automatiko use case Description

Apache Kafka

Event Streams

Preconfigured connector for EventStream use case to take advantage of Kafka records as the format of the events.



Preconfigured connector for IoT use case to allow simple integrations with Internet of Things appliances e.g. sensors

Apache Camel

EventStreams and Batch

Preconfigured for Batch Processing use case as Apache Camel comes with comprehensive list of components that allow to connect to various 3rd party systems


Messaging and Batch

Allows to use JMS as traditional messaging middleware as part of workflows


Messaging and Batch

Similar to JMS but more powerful in terms of messaging capabilities


Messaging and Orchestration

HTTP based messaging that allows to use simple message posting and receiving

Apache Pulsar

Event Streams

Preconfigured connector for EventStream use case to take advantage of Pulsar messages as the format of the events.


Messaging and Batch

Similar to AMQP but with additional messaging capabilities


Messaging and async processing

Reactive messaging based async invocation

In case there is more than one connector defined in the service each message definition needs to provide connector custom attribute to indicate which connector should be used. Allowed values kafka, mqtt, camel, amqp, jms, http, pulsar, rabbitmq

Configuration of the connector

In most of the cases there is no need to explicitly configure any of the connector properties as Automatiko will do that for you and will print instructions at build time

Expand Build time instructions section to see an example of instructions generated during build.

Build time instructions
59) ****************** Automatiko Instructions *********************
2020-12-31 19:18:15,246 INFO  [io.aut.eng.cod.GeneratorContext] (build-59) Following are set of information that can be useful down the line...
2020-12-31 19:18:15,246 INFO  [io.aut.eng.cod.GeneratorContext] (build-59) Properties for MQTT based message event 'Water measurement'
2020-12-31 19:18:15,246 INFO  [io.aut.eng.cod.GeneratorContext] (build-59) 	'mp.messaging.incoming.water.topic' should be used to configure MQTT topic defaults to 'building/+/+/water'
2020-12-31 19:18:15,246 INFO  [io.aut.eng.cod.GeneratorContext] (build-59) 	'' should be used to configure MQTT host that defaults to localhost
2020-12-31 19:18:15,246 INFO  [io.aut.eng.cod.GeneratorContext] (build-59) 	'mp.messaging.incoming.water.port' should be used to configure MQTT port that defaults to 1883
2020-12-31 19:18:15,246 INFO  [io.aut.eng.cod.GeneratorContext] (build-59) 	'mp.messaging.incoming.water.client-id' should be used to configure MQTT client id that defaults to 'Waterleaks-consumer'
2020-12-31 19:18:15,246 INFO  [io.aut.eng.cod.GeneratorContext] (build-59) Properties for MQTT based message event 'Humidity measurement'
2020-12-31 19:18:15,246 INFO  [io.aut.eng.cod.GeneratorContext] (build-59) 	'mp.messaging.incoming.humidity.topic' should be used to configure MQTT topic defaults to 'building/+/+/humidity'
2020-12-31 19:18:15,246 INFO  [io.aut.eng.cod.GeneratorContext] (build-59) 	'' should be used to configure MQTT host that defaults to localhost
2020-12-31 19:18:15,246 INFO  [io.aut.eng.cod.GeneratorContext] (build-59) 	'mp.messaging.incoming.humidity.port' should be used to configure MQTT port that defaults to 1883
2020-12-31 19:18:15,246 INFO  [io.aut.eng.cod.GeneratorContext] (build-59) 	'mp.messaging.incoming.humidity.client-id' should be used to configure MQTT client id that defaults to 'Waterleaks-consumer'
2020-12-31 19:18:15,246 INFO  [io.aut.eng.cod.GeneratorContext] (build-59) Properties for MQTT based message event 'Report received'
2020-12-31 19:18:15,246 INFO  [io.aut.eng.cod.GeneratorContext] (build-59) 	'mp.messaging.incoming.buildingreports.topic' should be used to configure MQTT topic defaults to 'reports/+/+/hourly'
2020-12-31 19:18:15,246 INFO  [io.aut.eng.cod.GeneratorContext] (build-59) 	'' should be used to configure MQTT host that defaults to localhost
2020-12-31 19:18:15,246 INFO  [io.aut.eng.cod.GeneratorContext] (build-59) 	'mp.messaging.incoming.buildingreports.port' should be used to configure MQTT port that defaults to 1883
2020-12-31 19:18:15,246 INFO  [io.aut.eng.cod.GeneratorContext] (build-59) 	'mp.messaging.incoming.buildingreports.client-id' should be used to configure MQTT client id that defaults to 'Reports-consumer'
2020-12-31 19:18:15,246 INFO  [io.aut.eng.cod.GeneratorContext] (build-59) Properties for MQTT based message event 'Publish hourly measurement'
2020-12-31 19:18:15,246 INFO  [io.aut.eng.cod.GeneratorContext] (build-59) 	'mp.messaging.outgoing.reports.topic' should be used to configure MQTT topic defaults to 'reports'
2020-12-31 19:18:15,246 INFO  [io.aut.eng.cod.GeneratorContext] (build-59) 	'' should be used to configure MQTT host that defaults to localhost
2020-12-31 19:18:15,246 INFO  [io.aut.eng.cod.GeneratorContext] (build-59) 	'mp.messaging.outgoing.reports.port' should be used to configure MQTT port that defaults to 1883
2020-12-31 19:18:15,246 INFO  [io.aut.eng.cod.GeneratorContext] (build-59) 	'mp.messaging.outgoing.reports.client-id' should be used to configure MQTT client id that defaults to 'Waterleaks-producer'
2020-12-31 19:18:15,246 INFO  [io.aut.eng.cod.GeneratorContext] (build-59) ***************************************************************

In case you will have to change them or add extra configuration properties, you can do that in three ways

  • directly in file in src/main/resources

  • via system properties (-Dprop=value)

  • via environment variables

Consuming events

Consuming events can be applied in workflow in three scenarios

  • to start an instance of the workflow

  • to continue already started instance as part of the main path (so called intermediate events)

  • to trigger alternative paths in already started workflow instance (so called event sub workflows/process)

To be able to consume events in the workflow there must be a message definition present

A properly defined message must consists of

  • name

  • data type

Name of the message is by default used as topic on message broker
messaging definition message

In addition to that there are optional attributes that can be specified to instruct the runtime on expected execution behaviour. These are given via custom attributes

  • topic - instructs what is the expected topic in the message broker this message should be connected to - if not defined name of the message is used

  • correlation - a static value used to do correlation between incoming message and active workflow instances - see correlation section for details

  • correlationExpression - an expression that will be applied on the incoming event to extract correlation value to be used to find matching workflow instances

  • connector - in case there are multiple connectors used it must be given explicitly for every message. Note it is not required if there is only one connector - e.g. MQTT or Apache Kafka.

  • filterExpression - allows to pass a function/expression that will have access to message payload (already converted) and message itself to check if given message is accepted or not. By default all messages are accepted.

  • channel - specifies what channel should be used for this message - by default name of the node is used - only applies to direct connector

messaging definition general

In the above example topic and correlationExpression are defined to connect to MQTT message broker and reply on MQTT feature called wildcard topic. It will receive events from any building and room of that building for water measurements.

Expression topic(message, 1, 2) is a function available out of the box that allows to extract elements of the actual topic event was received from.

Event published to building/ABC/room1/water will extract correlation of ABC-room1 and use it to either find existing instances or start new instance and assign that value as business key.

In addition to defining the message the received event (which data type was given on the message definition) needs to be mapped to workflow data objects

messaging data mapping

Event can be mapped directly to given data object (above) or as expression (below) that allows more options like adding to a list. In this example received event is added to a list of measurements data object.

messaging data mapping expr
Table 2. Supported expressions
Expression Description


Dot notation for data objects to fill in given attribute of the data object instead of the entire data object. Note that data object needs to be initialised to be able to fill in its attributes


Add item to a data object that is of List type


Remove item from a data object that is of List type, it relies on identity of the data objects to be properly removed

Producing events

Producing events can be applied in workflow in three scenarios

  • to end an instance of the workflow

  • to push out data of the instance as part of the main path (so called intermediate events)

  • to end an alternative paths in workflow instance (so called event sub workflows/process)

To be able to produce events in the workflow there must be a message definition present

A properly defined message must consists of

  • name

  • data type

Name of the message is by default used as topic on message broker

In addition to that there are optional attributes that can be specified to instruct the runtime on expected execution behaviour. These are given via custom attributes

messaging publishing attr
  • topicExpression - instructs what is the expected topic in the message broker this message should be sent to - if not defined name of the message is used - applies to MQTT only

  • addressExpression - instructs what is the expected address in the message broker this message should be sent to - if not defined name of the message is used - applies to AMQP only

  • keyExpression - instructs what is the expected key for a Kafka/Pulsar message being sent - if not defined business key is used which can be null and by that no key is set - applies to Apache Kafka and Apache Pulsar only

  • routingKeyExpression - instructs what is the expected routing key in the message broker this message should be sent to - if not defined name of the message is used - applies to RabbitMQ only

  • connector - in case there are multiple connectors used it must be given explicitly for every message. Note it is not required if there is only one connector - e.g. MQTT or Apache Kafka.

  • channel - specifies what channel should be used for this message - by default name of the node is used - only applies to direct connector

topicExpression provides an flexible way to define location where the event should be published - it is dedicated to MQTT, similar addressExpression provides same flexibility for dynamically selecting address that message should be pushed to - this one is dedicated to AMQP

Similar to consuming events, producing events also requires data mapping. This is to instruct what should be the payload of the event

messaging publishing mapping


Correlation refers to finding workflow instances that should be given the received event. Automatiko allows to define correlation related attributes on each message event within the workflow.

  • correlation - a static value used to do correlation between incoming message and active workflow instances - see correlation section for details

  • correlationExpression - an expression that will be applied on the incoming event to extract correlation value to be used to find matching workflow instances

Correlation expression can be defined in Functions specific to the service to hide the complexity of the correlation.

In case correlation attribute (either correlation or correlationExpression) is found it will be used to look up workflow instances. Look up mechanism takes into consideration two items of the workflow instance

  • business key

  • workflow instance tags

In case any of these two matches the correlation that workflow instance will be given the event via the message event defined in the workflow.

There is additional correlation used in case of Apache Kafka used as message broker - that is the key of the kafka record.


Acknowledgement can be configured on message similar to how correlation is defined that is via custom attributes. Acknowledgement is defined via custom property named ack-mode and can have following values:

  • post - the acknowledgement of the incoming message is executed once the produced message is acknowledged.

  • pre - the acknowledgement of the incoming message is executed before the message is processed by the method.

  • manual - the acknowledgement is done by the user.

  • none - No acknowledgment is performed, neither manually or automatically.

Event payload converters

Messages defined in workflow must have data type configured but sometimes the data type cannot be easily mapped to the received event. To accommodate this use case service developers can implement converters.


  •<V, T>


  • D for InputConverter is the actual data type expected by message.

  • V is the data type used in workflow and then T is the type to be sent out for OutputConverter

Apache Camel connector

When using Apache Camel as the connector, in many cases there is a need to specify additional information to be available to Camel component. These are usually set as Message headers from Camel component point of view.

Automatiko allows to specify any number of headers via custom attributes of the message.

Custom attributes that starts with Camel will be considered headers.

For example when using Camel Dropbox component to upload files you can specify the target file name as custom attribute that will be set as header on Camel’s message.

messaging camel headers

Headers can be set as

  • static value e.g. "my value"

  • reference to a workflow variable e.g. myvariable

  • reference to a function e.g. buildFileName() or buildFileName(id) where id is workflow instance id

JMS connector

In many situations when using JMS as the messaging provider there is a need to set message properties. It is really handy when combined with message selectors to filter efficiently messages that should not be consumed.

Automatiko allows to specify any number of properties via custom attributes of the message.

Custom attributes that starts with JMS will be considered properties.

Properties can be set as

  • static value e.g. "my value"

  • reference to a workflow variable e.g. myvariable

  • reference to a function e.g. buildFileName() or buildFileName(id) where id is workflow instance id

HTTP connector

HTTP connector requires currently explicitly connector to be set as custom attributes of message.

Automatiko allows to specify any number of headers via custom attributes of the message.

Custom attributes that starts with HTTP will be considered headers.

Headers can be set as

  • static value e.g. "my value"

  • reference to a workflow variable e.g. myvariable

  • reference to a function e.g. buildFileName() or buildFileName(id) where id is workflow instance id

RabbitMQ connector

RabbitMQ is provided as dedicated connector that provides all capabilities of RabitMQ.

Using AMQP for RabbitMQ

RabbitMQ integration is provided by amqp connector but it requires special setup of the RabbitMQ broker and additional properties.

RabbitMQ broker must be configured with enabled amqp 1.0 plugin - rabbitmq_amqp1_0. Details about this plugin can be found here

In addition, incoming channels must have following property defined:


Outgoing channels must have following properties


A complete sample project can be found in Automatiko integration tests