Messaging
One of the important parts of a workflow is the ability to interact with the outside world. In many cases this means sending data to and receiving data from other systems.
Automatiko makes it easy to consume and produce events that are relevant to the workflow definition. These are defined in the workflow definition as message events (in BPMN) and events (in serverless workflow).
In the above workflow definition there are three message events
- Water measurement - a start event that is triggered by an incoming event
- Humidity measurement - another start event that is triggered by an incoming event
- Publish hourly measurement - an end event that publishes an event
This illustrates that a workflow can both consume and produce events as messages.
There are several things that must be provided to make sure that events can be consumed or produced.
Connectors
Automatiko uses connectors to provide connectivity with other systems. Several connectors are available out of the box, depending on which use case you selected for the service (event streams, IoT, etc.).
Connector | Automatiko use case | Description |
---|---|---|
Apache Kafka | Event Streams | Preconfigured connector for EventStream use case to take advantage of Kafka records as the format of the events. |
MQTT | IoT | Preconfigured connector for IoT use case to allow simple integrations with Internet of Things appliances e.g. sensors |
Apache Camel | EventStreams and Batch | Preconfigured for Batch Processing use case as Apache Camel comes with comprehensive list of components that allow to connect to various 3rd party systems |
If more than one connector is defined in the service, each message definition needs to provide the connector custom attribute to indicate which connector should be used.
Configuration of the connector
In most cases there is no need to explicitly configure any of the connector properties, as Automatiko does that for you and prints instructions at build time. The following is an example of the instructions generated during a build.
Build time instructions
59) ****************** Automatiko Instructions *********************
2020-12-31 19:18:15,246 INFO [io.aut.eng.cod.GeneratorContext] (build-59) Following are set of information that can be useful down the line...
2020-12-31 19:18:15,246 INFO [io.aut.eng.cod.GeneratorContext] (build-59) Properties for MQTT based message event 'Water measurement'
2020-12-31 19:18:15,246 INFO [io.aut.eng.cod.GeneratorContext] (build-59) 'mp.messaging.incoming.water.topic' should be used to configure MQTT topic defaults to 'building/+/+/water'
2020-12-31 19:18:15,246 INFO [io.aut.eng.cod.GeneratorContext] (build-59) 'mp.messaging.incoming.water.host' should be used to configure MQTT host that defaults to localhost
2020-12-31 19:18:15,246 INFO [io.aut.eng.cod.GeneratorContext] (build-59) 'mp.messaging.incoming.water.port' should be used to configure MQTT port that defaults to 1883
2020-12-31 19:18:15,246 INFO [io.aut.eng.cod.GeneratorContext] (build-59) 'mp.messaging.incoming.water.client-id' should be used to configure MQTT client id that defaults to 'Waterleaks-consumer'
2020-12-31 19:18:15,246 INFO [io.aut.eng.cod.GeneratorContext] (build-59) Properties for MQTT based message event 'Humidity measurement'
2020-12-31 19:18:15,246 INFO [io.aut.eng.cod.GeneratorContext] (build-59) 'mp.messaging.incoming.humidity.topic' should be used to configure MQTT topic defaults to 'building/+/+/humidity'
2020-12-31 19:18:15,246 INFO [io.aut.eng.cod.GeneratorContext] (build-59) 'mp.messaging.incoming.humidity.host' should be used to configure MQTT host that defaults to localhost
2020-12-31 19:18:15,246 INFO [io.aut.eng.cod.GeneratorContext] (build-59) 'mp.messaging.incoming.humidity.port' should be used to configure MQTT port that defaults to 1883
2020-12-31 19:18:15,246 INFO [io.aut.eng.cod.GeneratorContext] (build-59) 'mp.messaging.incoming.humidity.client-id' should be used to configure MQTT client id that defaults to 'Waterleaks-consumer'
2020-12-31 19:18:15,246 INFO [io.aut.eng.cod.GeneratorContext] (build-59) Properties for MQTT based message event 'Report received'
2020-12-31 19:18:15,246 INFO [io.aut.eng.cod.GeneratorContext] (build-59) 'mp.messaging.incoming.buildingreports.topic' should be used to configure MQTT topic defaults to 'reports/+/+/hourly'
2020-12-31 19:18:15,246 INFO [io.aut.eng.cod.GeneratorContext] (build-59) 'mp.messaging.incoming.buildingreports.host' should be used to configure MQTT host that defaults to localhost
2020-12-31 19:18:15,246 INFO [io.aut.eng.cod.GeneratorContext] (build-59) 'mp.messaging.incoming.buildingreports.port' should be used to configure MQTT port that defaults to 1883
2020-12-31 19:18:15,246 INFO [io.aut.eng.cod.GeneratorContext] (build-59) 'mp.messaging.incoming.buildingreports.client-id' should be used to configure MQTT client id that defaults to 'Reports-consumer'
2020-12-31 19:18:15,246 INFO [io.aut.eng.cod.GeneratorContext] (build-59) Properties for MQTT based message event 'Publish hourly measurement'
2020-12-31 19:18:15,246 INFO [io.aut.eng.cod.GeneratorContext] (build-59) 'mp.messaging.outgoing.reports.topic' should be used to configure MQTT topic defaults to 'reports'
2020-12-31 19:18:15,246 INFO [io.aut.eng.cod.GeneratorContext] (build-59) 'mp.messaging.outgoing.reports.host' should be used to configure MQTT host that defaults to localhost
2020-12-31 19:18:15,246 INFO [io.aut.eng.cod.GeneratorContext] (build-59) 'mp.messaging.outgoing.reports.port' should be used to configure MQTT port that defaults to 1883
2020-12-31 19:18:15,246 INFO [io.aut.eng.cod.GeneratorContext] (build-59) 'mp.messaging.outgoing.reports.client-id' should be used to configure MQTT client id that defaults to 'Waterleaks-producer'
2020-12-31 19:18:15,246 INFO [io.aut.eng.cod.GeneratorContext] (build-59) ***************************************************************
If you need to change these defaults or add extra configuration properties, you can do so in three ways:
- directly in the application.properties file in src/main/resources
- via system properties (-Dprop=value)
- via environment variables
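The environment variable option needs a name that a shell accepts, which a dotted property name is not. A minimal sketch of the usual conversion follows, assuming the service resolves its configuration through MicroProfile Config (which the `mp.messaging.*` prefix suggests): every character that is not a letter, digit or underscore becomes an underscore, and the result is upper-cased. The class and method names are illustrative and not part of Automatiko.

```java
import java.util.Locale;

// Illustrative helper, not part of Automatiko: derives the environment
// variable name that MicroProfile Config would also look up for a property.
final class EnvVarNames {

    private EnvVarNames() {
    }

    // Replace every character that is not alphanumeric or '_' with '_',
    // then convert the result to upper case.
    static String toEnvVar(String property) {
        return property.replaceAll("[^A-Za-z0-9_]", "_").toUpperCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        // Property names taken from the build time instructions
        System.out.println(toEnvVar("mp.messaging.incoming.water.topic"));
        System.out.println(toEnvVar("mp.messaging.incoming.water.client-id"));
    }
}
```

With this mapping, the topic of the 'Water measurement' event could be overridden by exporting `MP_MESSAGING_INCOMING_WATER_TOPIC` before starting the service.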
Consuming events
Consuming events can be applied in a workflow in three scenarios:
- to start an instance of the workflow
- to continue an already started instance as part of the main path (so-called intermediate events)
- to trigger alternative paths in an already started workflow instance (so-called event sub workflows/process)
To be able to consume events in the workflow, a message definition must be present.
A properly defined message must consist of:
- name
- data type
The name of the message is by default used as the topic on the message broker.
In addition, there are optional attributes that instruct the runtime on the expected execution behaviour. These are given via custom attributes:
- topic - the topic in the message broker this message should be connected to; if not defined, the name of the message is used
- correlation - a static value used to correlate the incoming message with active workflow instances; see the Correlation section for details
- correlationExpression - an expression that is applied to the incoming event to extract the correlation value used to find matching workflow instances
- connector - must be given explicitly for every message when multiple connectors are used. It is not required if there is only one connector, e.g. MQTT or Apache Kafka.
In the above example topic and correlationExpression are defined to connect to an MQTT message broker and rely on an MQTT feature called wildcard topics. The workflow will receive water measurement events from any building and any room of that building. The expression topic(message, 1, 2) is a function available out of the box that extracts elements of the actual topic the event was received from. An event published to building/ABC/room1/water yields the correlation ABC-room1, which is used to either find existing instances or start a new instance and assign that value as its business key.
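To make the extraction concrete, here is a minimal sketch of what such a topic function could do. It is not Automatiko's implementation: the class and method names are hypothetical, it takes the topic string directly rather than the message, and it assumes zero-based positions joined with a dash, which is consistent with the building/ABC/room1/water example above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the built-in topic(message, 1, 2) function.
final class TopicElements {

    private TopicElements() {
    }

    // Splits the topic on '/' and joins the elements found at the given
    // zero-based positions with '-'. Positions outside the topic are skipped.
    static String extract(String topic, int... positions) {
        String[] parts = topic.split("/");
        List<String> selected = new ArrayList<>();
        for (int position : positions) {
            if (position >= 0 && position < parts.length) {
                selected.add(parts[position]);
            }
        }
        return String.join("-", selected);
    }

    public static void main(String[] args) {
        // Elements 1 and 2 are the building and the room
        System.out.println(extract("building/ABC/room1/water", 1, 2));
    }
}
```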
In addition to defining the message, the received event (whose data type was given in the message definition) needs to be mapped to workflow data objects.
An event can be mapped directly to a given data object (above) or via an expression (below), which allows more options such as adding to a list. In this example the received event is added to the measurements data object, which is a list.
Expression | Description |
---|---|
var.field.another | Dot notation for data objects to fill in a given attribute of the data object instead of the entire data object. Note that the data object needs to be initialised before its attributes can be filled in |
list[+] | Add item to a data object that is of a list type |
list[-] | Remove item from a data object that is of a list type |
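As a rough plain-Java analogy for the expressions in the table, the sketch below shows what each mapping amounts to when applied to a received event. It is not Automatiko code: the `Report` class, its `latestValue` attribute and the method names are hypothetical, and the null check only illustrates why a data object has to be initialised before dot notation can fill one of its attributes.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java analogy for the mapping expressions; not Automatiko code.
final class DataMappingSketch {

    // Hypothetical data object with a single attribute.
    static final class Report {
        String latestValue;
    }

    // Analogy for list[+]: append the received event to the list data object.
    static <T> void add(List<T> dataObject, T event) {
        dataObject.add(event);
    }

    // Analogy for list[-]: remove the received event from the list data object.
    static <T> void remove(List<T> dataObject, T event) {
        dataObject.remove(event);
    }

    // Analogy for dot notation (report.latestValue): only one attribute is filled.
    static void setLatestValue(Report report, String event) {
        if (report == null) {
            throw new IllegalStateException("data object must be initialised first");
        }
        report.latestValue = event;
    }

    public static void main(String[] args) {
        List<String> measurements = new ArrayList<>();
        add(measurements, "21.5");
        add(measurements, "22.0");
        remove(measurements, "21.5");
        System.out.println(measurements);

        Report report = new Report();
        setLatestValue(report, "22.0");
        System.out.println(report.latestValue);
    }
}
```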
Producing events
Producing events can be applied in a workflow in three scenarios:
- to end an instance of the workflow
- to push out data of the instance as part of the main path (so-called intermediate events)
- to end an alternative path in a workflow instance (so-called event sub workflows/process)
To be able to produce events in the workflow, a message definition must be present.
A properly defined message must consist of:
- name
- data type
The name of the message is by default used as the topic on the message broker.
In addition, there are optional attributes that instruct the runtime on the expected execution behaviour. These are given via custom attributes:
- topicExpression - the topic in the message broker this message should be connected to; if not defined, the name of the message is used
- connector - must be given explicitly for every message when multiple connectors are used. It is not required if there is only one connector, e.g. MQTT or Apache Kafka.
topicExpression provides a flexible way to define the location where the event should be published. Note that not all connectors support it; at the moment it is dedicated to MQTT.
As with consuming events, producing events also requires data mapping, which tells the runtime what the payload of the event should be.
Correlation
Correlation refers to finding the workflow instances that should be given the received event. Automatiko allows you to define correlation-related attributes on each message event within the workflow:
- correlation - a static value used to correlate the incoming message with active workflow instances
- correlationExpression - an expression that is applied to the incoming event to extract the correlation value used to find matching workflow instances
A correlation expression can be defined in Functions specific to the service to hide the complexity of the correlation.
If a correlation attribute (either correlation or correlationExpression) is found, it is used to look up workflow instances. The lookup mechanism takes into consideration two items of the workflow instance:
- business key
- workflow instance tags
If either of these two matches the correlation, that workflow instance is given the event via the message event defined in the workflow.
An additional correlation is used when Apache Kafka is the message broker: the key of the Kafka record.
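The lookup rule described above can be sketched as a simple filter: an instance matches when its business key equals the correlation value or when one of its tags does. This only illustrates the rule and is not Automatiko's implementation; the `Instance` class and `findMatching` method are hypothetical.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Illustrative sketch of the correlation lookup rule; not Automatiko code.
final class CorrelationLookupSketch {

    // Hypothetical, simplified view of an active workflow instance.
    static final class Instance {
        final String id;
        final String businessKey;
        final Set<String> tags;

        Instance(String id, String businessKey, Set<String> tags) {
            this.id = id;
            this.businessKey = businessKey;
            this.tags = tags;
        }
    }

    // Keeps the instances whose business key or any tag equals the correlation.
    static List<Instance> findMatching(List<Instance> active, String correlation) {
        return active.stream()
                .filter(i -> correlation.equals(i.businessKey) || i.tags.contains(correlation))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Instance> active = List.of(
                new Instance("1", "ABC-room1", Set.of()),
                new Instance("2", "DEF-room7", Set.of("ABC-room1")),
                new Instance("3", "XYZ-room2", Set.of("archived")));

        // Instances 1 (business key) and 2 (tag) receive the event
        for (Instance match : findMatching(active, "ABC-room1")) {
            System.out.println(match.id);
        }
    }
}
```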
Event payload converters
Messages defined in a workflow must have a data type configured, but sometimes the data type cannot be easily mapped to the received event. To accommodate this use case, service developers can implement converters:
- io.automatiko.engine.api.io.InputConverter<D>
- io.automatiko.engine.api.io.OutputConverter<V, T>
where
- D for InputConverter is the actual data type expected by the message
- V is the data type used in the workflow and T is the type to be sent out for OutputConverter
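A minimal sketch of a converter pair follows. So that it compiles on its own, it declares local stand-ins for the two interfaces; their single `convert` method and its parameter types are assumptions and should be checked against the actual `io.automatiko.engine.api.io` interfaces before use. The `Measurement` type and the plain-text payload format are also hypothetical.

```java
import java.nio.charset.StandardCharsets;

// Illustrative converters; the nested interfaces are local stand-ins, and the
// 'convert' method shape is an assumption, not the confirmed Automatiko API.
final class ConverterSketch {

    interface InputConverter<D> {
        D convert(Object input);
    }

    interface OutputConverter<V, T> {
        T convert(V value);
    }

    // Hypothetical data type used by the workflow.
    static final class Measurement {
        final double value;

        Measurement(double value) {
            this.value = value;
        }
    }

    // D = Measurement: turns a raw text payload into the message data type.
    static final class MeasurementInputConverter implements InputConverter<Measurement> {
        @Override
        public Measurement convert(Object input) {
            String text = input instanceof byte[]
                    ? new String((byte[]) input, StandardCharsets.UTF_8)
                    : String.valueOf(input);
            return new Measurement(Double.parseDouble(text.trim()));
        }
    }

    // V = Measurement, T = byte[]: turns workflow data into the payload to send.
    static final class MeasurementOutputConverter implements OutputConverter<Measurement, byte[]> {
        @Override
        public byte[] convert(Measurement value) {
            return Double.toString(value.value).getBytes(StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) {
        byte[] payload = "21.5".getBytes(StandardCharsets.UTF_8);
        Measurement measurement = new MeasurementInputConverter().convert(payload);
        byte[] outgoing = new MeasurementOutputConverter().convert(measurement);
        System.out.println(new String(outgoing, StandardCharsets.UTF_8));
    }
}
```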