Water Leak detection
Water leak detection example service demonstrates simple anomalies detection such as water leaks based on collected sensor data.
Buildings are equipped with various sensors such as water meters, humidity and more. In this example each room in the building will report its water consumption on regular intervals - every 2 minutes. In addition to that each room will also report humidity level, again on time intervals - every 10 minutes.
Based on this data, service will perform calculation of
-
min value
-
max value
-
average value
Knowing these values anomalies can be detected by applying rules defined.
-
Humidity higher than 80% can be considered as anomaly or even potential water leak
-
water consumption within 5 consequent measurements that is higher than 100 is considered leak.
In any of the above cases technicians would be called to investigate the situation in the actual building.
Note that this example is currently not equipped with user facing UI |
Run it
The only thing that is needed is Apache Kafka running that will be accessible.
It needs to be given as part of run command via environment variable KAFKA_SERVERS
docker run -e MQTT_SERVER= -e MQTT_PORT= -p 8080:8080 automatiko/event-streams-sensors
To get quickly a running instance of Mosquitto to run the tests visit Docker Hub for details |
once this is done you can see the fully described service at http://localhost:8080/q/swagger-ui/#/
You can open your browser http://localhost:8080/management/processes/ui to visualize your running service |
An important detail to mention is that MQTT allows to subscribe using wild cards. This feature is used in this example as it allows flexible workflow definition.
Both start events (Water measurement
and Humidity measurement
) subscribe to
MQTT topics using wild cards
-
building/+/+/water
- this allows to receive any building’s and any room of that building water consumption data -
building/+/+/humidity
- this allows to receive any building’s and any room of that building humidity data
In reality this means that every incoming sensor data to topic that matches this MQTT wildcard will either start new instance of the workflow or push data into already existing one.
Example
Humidity sensor in room 555 in building ABC will push data to building/ABC/555/humidity
topic and humidity sensor in the same building but room 123 will push data to
building/ABC/555/humidity
topic.
This in turn will create two workflow (waterleasks
)
instances that will be uniquely identified (so called business key)
ABC-555
and ABC-123
respectively.
This business key (ABC-555 ) can be used at any time to get hold of
given workflow instance with service API. It is also configurable when it
come to the format and content of the business key - in this example it
retrieves information from the actual topic data was sent to.
|
Similar water consumption data are handled. If they match the same building and room the data will be "appended" to already existing workflow instance, otherwise new instance will be started.
There are multiple paths that can be taken during detection process
Each incoming sensor data will trigger either
-
Water measurement
in case it comes from water meter -
Humidity measurement
in case it comes from humidity sensor
Water measurements are collected into a bucket that allows to accumulate values and perform calculation on top of items.
Humidity data is constantly replacing previous value (if there was any), so it always represents last known value.
The happy path
The happy path for humidity means humidity level below 85%. The happy path for water measurement means that there won’t be max value greater than 100.
Water measurements are calculated every fifth value being received. |
Try it
Follow steps in the Details
section to see the happy path in action.
Here are the steps to try out with happy path
-
Publish room 100 of building Main humidity data
-
Topic
building/Main/100/humidity
-
Payload
-
{ "ts" : 1608570416309, "val" : 48.0 }
-
Publish room 100 of building Main water data
-
Topic
building/Main/100/water
-
Payload
-
{ "ts" : 1608570416309, "val" : 25.0 }
Repeat water consumption data more than 5 times to see the report being calculated. At any point in time you can use service API to see current state of the active instances.
Why the format of the payload is as it is? The main reason is that it follows MQTT SmartHome specification to provide some degree of standardisation. |
The leak detected path
The leak detected path means water measurement won’t be at any time greater than 100. This is based on report - calculated values of at least 5 data points - measurements.
As soon as there will be a value exceeding 100 for water consumption level it will trigger a task assigned to technicians. That means that there is a need for investigation on site to confirm or deny detected anomaly.
Water measurements are calculated every fifth value being received. |
Try it
Follow steps in the Details
section to see the leak detected path in action.
Here are the steps to try out with leak detected path
-
Publish room 100 of building Main water data
-
Topic
building/Main/100/water
-
Payload
-
{ "ts" : 1608570416309, "val" : 105.0 }
Repeat water consumption data more than 5 times to see the report being calculated and task assigned to technicians.
At any point in time you can use service API to see current state of the active instances. For instance to get currently assigned tasks for technicians for room 100 in building ABC issue following request
Why the format of the payload is as it is? The main reason is that it follows MQTT SmartHome specification to provide some degree of standardisation. |
The humidity level too high path
The humidity level too high path means that at any time when humidity level is grater than 85 it will assign another task for technicians. It is rather probable that there is some kind of anomaly when humidity is too high.
Try it
Follow steps in the Details
section to see the humidity level too high path in action.
Here are the steps to try out with humidity level too high path
-
Publish room 100 of building Main humidity data
-
Topic
building/Main/100/humidity
-
Payload
-
{ "ts" : 1608570416309, "val" : 88.0 }
This will create workflow instance and directly assign task for technicians.
At any point in time you can use service API to see current state of the active instances. For instance to get currently assigned tasks for technicians for room 100 in building ABC issue following request
Why the format of the payload is as it is? The main reason is that it follows MQTT SmartHome specification to provide some degree of standardisation. |
The publish report path
Sensor data are collected for given amount of time. In this example they are set to 1 hour. After one hour from first data received for given room and a building report will be published to another MQTT topic.
The reports topic will be calculated based on the building and room it collects
data for. For example, workflow instance identified as (ABC-123
which means
it collects data for room 123 in building ABC) will publish its report to
MQTT topic reports/ABC/123/hourly
.
In case you don’t want to wait for an hour to see the report being published
you can start the docker image with extra environment variable -e WATER_LEAKS_TIMER=PT5M
this will publish report after 5 minutes instead of 1 hour.
|
Try it
Follow steps in the Details
section to see the humidity level too high path in action.
Here are the steps to try out with publish report path
-
Publish room 100 of building Main humidity data
-
Topic
building/Main/100/humidity
-
Payload
-
{ "ts" : 1608570416309, "val" : 48.0 }
-
Publish room 100 of building Main water data
-
Topic
building/Main/100/water
-
Payload
-
{ "ts" : 1608570416309, "val" : 25.0 }
Repeat water consumption data more than 5 times to see the report being calculated. At any point in time you can use service API to see current state of the active instances.
This will create workflow instance and directly assign task for technicians.
At any point in time you can use service API to see current state of the active instances. For instance to get currently assigned tasks for technicians for room 100 in building ABC issue following request
Why the format of the payload is as it is? The main reason is that it follows MQTT SmartHome specification to provide some degree of standardisation. |
Another interesting aspect is that when report is published it will be consumed but another workflow that is responsible for collecting data for entire building based on published reports.
Source code
Complete source code of this example can be found in GitHub