Use BPM events in Process Mining: Camunda linked to Logpickr with ksqlDB
In today’s world, we deal with a lot of real-time streaming data and events that come from many different sources, and from which we can derive statistics, facts and information that are crucial for today’s businesses.
Processing and storing these real-time streams in a scalable and fault-tolerant manner is now possible using Kafka as an Event Streaming Platform and KsqlDB as an Event Streaming Database. These technologies have made Event Stream Processing Applications thrive.
I have explained the basics behind Apache Kafka and ZooKeeper in my previous article. In this article, I will be dealing with the concepts behind ksqlDB and how we can derive a huge benefit from it by linking it to BPM along with Process Mining. As a use case, I will be using Camunda along with Logpickr Process Explorer 360.
Then, I will include a demonstration part where I will be using Camunda to get information about the running processes. This real-time information will be transformed using ksqlDB and then sent to a basic Node.js API and a CSV file. This final output (i.e. the CSV file) will be used by Logpickr Process Explorer 360 in order to analyse, anticipate and optimize any type of incoming process.
At the end of this article, I will provide you with all of the references that I have found useful to let you move forward with your learning process.
An overview of ksqlDB
What is ksqlDB?
KsqlDB is an event streaming database built on top of Kafka for stream processing applications. An event is anything that happens at a point in time; it represents ksqlDB’s core unit of data. A stream is simply a sequence of events.
What are the core components of ksqlDB?
KsqlDB is made of four core components: collections, materialized views, queries and connectors.
Collections provide durable storage for sequences of events. ksqlDB offers two kinds of collections: streams and tables.
- Streams are immutable, append-only collections. They’re useful for representing a series of historical facts. Adding multiple events with the same key means that they are simply appended to the end of the stream.
- Tables are mutable collections. They let you represent the latest version of each value per key. They’re helpful for modeling change over time, and they’re often used to represent aggregations.
Because ksqlDB leverages Kafka for its storage layer, creating a new collection equates to defining a stream or a table over an Apache Kafka topic. You can declare a collection over an existing topic, or ksqlDB can create a new topic for the collection at declaration time.
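As a sketch of the two kinds of collections, assuming a hypothetical topic of task events (the topic and column names are illustrative, not from the article):

```sql
-- A stream over a (hypothetical) topic of task events: append-only facts.
CREATE STREAM task_events (
    execution_id VARCHAR KEY,
    activity_name VARCHAR
) WITH (KAFKA_TOPIC='task_events', VALUE_FORMAT='AVRO', PARTITIONS=1);

-- A table derived from the stream: keeps only the latest activity per key.
CREATE TABLE latest_activity AS
    SELECT execution_id,
           LATEST_BY_OFFSET(activity_name) AS last_activity
    FROM task_events
    GROUP BY execution_id;
```

Appending two events with the same execution_id leaves both in the stream, while the table retains only the most recent activity for that key.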
Materialized views are derived representations of streams or tables. They enable you to create new collections over existing streams or tables. Materialized views are perpetually kept up to date as new events arrive. This means that you can chain materialized views together to create many representations of the same data. Materialized views are especially useful for maintaining aggregated tables of data.
While materialized views allow you to maintain state, queries enable you to access these materializations from applications or microservices. Applications can query materialized views with pull queries and push queries.
- Pull queries enable you to fetch the current state of a materialized view.
- Push queries enable you to subscribe to materialized view updates and stream changes. When new events arrive, push queries emit refinements, so your event streaming applications can react to new information in real-time.
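To make the difference concrete, here is a hedged sketch against a hypothetical materialized view named process_counts (not defined in this article):

```sql
-- Pull query: fetch the current state for one key, then terminate.
SELECT * FROM process_counts WHERE process_name = 'Invoice Receipt';

-- Push query: subscribe and receive every refinement as new events arrive.
SELECT * FROM process_counts EMIT CHANGES;
```

The EMIT CHANGES clause is what turns a query into a long-running subscription instead of a one-shot lookup.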
Often the data you want to work with isn’t in Kafka. Generally, it is stored in diverse sources such as traditional databases, application APIs, CSV and JSON files, or other systems.
Kafka Connect has a great ecosystem of prebuilt connectors that can help you to continuously ingest the streams of data you want into Kafka (source connectors) as well as to continuously export the data out of Kafka (sink connectors).
ksqlDB allows you to directly control and execute connectors built to work with Kafka Connect which means you can build an end-to-end event streaming application in just a few SQL queries:
- Extract: one or more queries to extract the event streams from source systems using Kafka connectors
- Transform: queries to continually materialize results off these streams
- Query: queries to access particular results to serve an application
Since we’re dealing with Camunda’s data, I found it crucial to give you a brief introduction to what Camunda is, how to run it using Docker and how to access its process information.
Most importantly, I will also present the Logpickr Process Explorer 360 platform, which will play the consumer role in our Kafka cluster to analyze the behavior of each process, and to anticipate and optimize any type of incoming process.
A brief Presentation of Camunda BPM Engine
The Camunda BPM engine is an open source, Java-based framework that provides a business process management (BPM) system for any kind and size of organisation. It offers pre-designed BPM systems that can be modeled and executed for workflow and business process automation. It is centered around a runtime engine and uses an in-built modeling tool to execute the business process designs.
You can check this article for more details. In this article, we’re interested in getting the logs of the running processes stored in the Database in order to study and analyze the behavior of these processes using Logpickr Process Explorer 360 platform.
Logpickr Process Explorer 360 platform
Logpickr Process Explorer 360 is a platform that brings together Process Mining and Machine Learning technologies to help you move your business forward by analyzing, anticipating and optimizing any type of incoming process and, most importantly, predicting the future behavior of each analysed process, which will allow you to accelerate your digital transformation and your RPA automation projects.
Recognized by Gartner since 2019 and several times awarded, the solution identifies the flaws in your processes but also the points to be improved, anticipates failures and allows you to efficiently improve your processes as well as your organization’s profitability.
The great news is that, in its future version, Logpickr Process Explorer 360 will integrate ksqlDB, allowing it to be connected to more than 80 connectors!
KsqlDB in Action
In this demonstration, I will set up the following architecture:
Setting up KsqlDB
In this demonstration I will be using:
- Docker in order to run the Kafka cluster.
- Confluent Platform in order to have the necessary commands to install connectors etc.
- Confluent Platform Quick Start (Docker) project.
- Postman to perform API calls
Download Docker for Ubuntu
Download the Confluent Platform for Ubuntu
First, make sure to install Confluent Platform, an enterprise-grade event streaming platform built around Apache Kafka that allows you to build real-time data streaming applications.
Then add the path of the confluent folder to your PATH (for example in /etc/profile), run source /etc/profile, and type confluent to make sure that the command is recognized.
The Confluent Hub Client is integrated within the Confluent Platform. It allows you to install the different connectors needed to connect ksqlDB to your data sources.
Here are the connectors that you need to install for this demo :
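As a hedged sketch (the exact connector coordinates and versions are assumptions; the FileStreamSink connector ships with Kafka itself and needs no installation), installing the JDBC source and HTTP sink connectors with the Confluent Hub Client looks like:

```shell
# Install the JDBC source connector (to read Camunda's Postgres tables).
confluent-hub install confluentinc/kafka-connect-jdbc:latest

# Install the HTTP sink connector (to POST events to the Node.js API).
confluent-hub install confluentinc/kafka-connect-http:latest
```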
Once installed, the connectors will be available in the Confluent Hub components directory. Make sure to run confluent-hub as a root user to enable the auto-creation of the required directories.
Installation instructions are defined here.
Confluent Platform Quick Start (Docker)
Now, you need to download and start the Confluent Platform using Docker.
Next, you need to modify the docker-compose.yml file as follows:
The reported modifications are :
- In the connect container, I have added a volume to import the previously installed connectors.
- I’ve added the camunda and postgres containers to interact with Camunda’s Postgres database.
First, make sure to stop and remove your containers if you have any running containers related to Confluent:
Once the containers are up, you can access the following interfaces :
- Control Center: a web-based tool for managing and monitoring Apache Kafka
- Connectors: to see the defined connectors
- Camunda: access it using demo as username and password
- Camunda’s postgres database
Define connectors to get source data
Our source data is stored in Camunda’s Postgres database, which we have defined in the docker-compose.yml file.
By default, Camunda uses an H2 database. However, there is no JDBC driver to access it from ksqlDB, so in that case it is better to get the source data using API calls, which means defining an HTTP connector instead of a JDBC connector. The configuration of both cases (Postgres database and API) will be described later on.
The following instructions describe how to access Camunda’s Postgres database or the default database.
The path of camunda’s API is presented as follows :
To get information about the process logs that we’re interested in for this demonstration, you should run the following:
demo:demo is the default username and password for authentication.
You can run a GET request in Postman to get the logs as follows:
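The same request can be made from the command line; this is a sketch assuming the default Camunda REST base path (engine-rest) and the port mapping of 8080 from the docker-compose.yml:

```shell
# Fetch the historic activity instances (the process logs) with basic auth.
curl -u demo:demo http://localhost:8080/engine-rest/history/activity-instance
```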
In the demo we have two processes, Invoice Receipt and Review Invoice (available in two different versions), that start at a StartTime and end at an EndTime. Each process is defined by its ID (processDefinitionId) and is made of different process instances, each with its corresponding ID (executionId). Each process instance executes a set of activities described by their activity names.
We’re interested in getting the logs of Camunda’s processes in order to analyse their behavior using Logpickr Process Explorer 360. So, make sure to access Camunda’s Postgres database and check that the table we’re interested in has been loaded.
JDBC Connector Constraints :
- We need to make sure that the name of the table we want to import is written in lowercase.
- We need to make sure that the table has a column that is of type int and not null; it is required by the JDBC connector. In our case, the act_hi_taskinst table doesn’t have such a column, so we need to create it and set its value to 0 just to make things work correctly for this demo.
In order to import the act_hi_taskinst table content into a Kafka topic, you need to create an instance of the JDBC connector that we have already installed. This can be done as follows:
1. Connect to the ksqlDB server.
2. Create the JDBC source connector.
After creating this connector, you should have a topic named jdbc_act_hi_taskinst that contains the content of the act_hi_taskinst table. For more information about the JDBC connector parameters, check JDBC Source for Confluent Platform Parameters. For now, just notice that we’ve told the connector to connect to the Camunda Postgres database and import the content of act_hi_taskinst into the jdbc_act_hi_taskinst Kafka topic.
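The connector creation can be sketched directly in ksqlDB SQL; the connection URL, credentials and mode below are assumptions for this demo setup, not values confirmed by the article:

```sql
-- Sketch of the JDBC source connector, created from the ksqlDB CLI.
CREATE SOURCE CONNECTOR jdbc_camunda WITH (
    'connector.class'     = 'io.confluent.connect.jdbc.JdbcSourceConnector',
    'connection.url'      = 'jdbc:postgresql://postgres:5432/camunda',
    'connection.user'     = 'camunda',
    'connection.password' = 'camunda',
    'topic.prefix'        = 'jdbc_',
    'table.whitelist'     = 'act_hi_taskinst',
    'mode'                = 'bulk'
);
```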
You can print the content of the topic as follows:
print 'jdbc_act_hi_taskinst' from beginning;
Once you have the data imported to the kafka topic, you can create streams and tables to perform different manipulations on the imported data using SQL.
Let’s create a stream from the jdbc_act_hi_taskinst topic and set its format to AVRO:
CREATE STREAM PROCESS WITH (KAFKA_TOPIC='jdbc_act_hi_taskinst', VALUE_FORMAT='AVRO');
Then display its content:
SET 'auto.offset.reset' = 'earliest';
SELECT * FROM PROCESS EMIT CHANGES;
Check the schema of the stream.
You can see that your imported data, available in the jdbc_act_hi_taskinst topic, can now be queried just like a simple SQL table.
After running the describe process; command, you can notice that the time columns such as endTime are by default set to BIGINT because ksqlDB doesn’t support the TIMESTAMP type (check ksqlDB data types). That’s why we need to convert them to VARCHAR following the format yyyy-MM-dd HH:mm:ss.SSS. So, I have created the transition stream, which is a materialized view that runs a push query (the EMIT CHANGES option) over the PROCESS stream.
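The conversion itself can be done with ksqlDB’s TIMESTAMPTOSTRING function; this is a sketch, and the column names (taken from Camunda’s act_hi_taskinst schema) are assumptions:

```sql
-- Transition stream: convert BIGINT epoch-millis columns to VARCHAR.
CREATE STREAM TRANSITION AS
    SELECT PROC_DEF_ID_,
           EXECUTION_ID_,
           TIMESTAMPTOSTRING(START_TIME_, 'yyyy-MM-dd HH:mm:ss.SSS') AS STARTTIME,
           TIMESTAMPTOSTRING(END_TIME_, 'yyyy-MM-dd HH:mm:ss.SSS') AS ENDTIME
    FROM PROCESS
    EMIT CHANGES;
```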
Then, from the transition stream, we create the PROCESS_LOGPICKR stream that contains the needed columns in the correct format.
You can display the content of the created stream, its schema and its corresponding topic as follows:
print 'process_logpickr' from beginning;
SET 'auto.offset.reset' = 'earliest';
SELECT * FROM PROCESS_LOGPICKR EMIT CHANGES;
DESCRIBE PROCESS_LOGPICKR;
1. For the conversion part, I am sure that we don’t need to go through the transition stream, but for now it is the only solution I have found. Feel free to propose any suggestions.
2. The creation of streams/tables based on push/pull queries generates a new topic, defined by the kafka_topic parameter, along with a persistent query.
You can display tables, streams, topics and queries as follows:
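In the ksqlDB CLI, the corresponding commands are:

```sql
SHOW STREAMS;
SHOW TABLES;
SHOW TOPICS;
SHOW QUERIES;
```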
As described above, if you’re not using a Postgres database for Camunda, you can use API calls to get your data into the Kafka topic through the creation of the Kafka Connect HTTP source connector, as follows:
Define sink connectors to connect your end-user application with kafka topics
For this demo, let’s define a basic Node.js consumer API that we will connect to the created process_logpickr topic through the Kafka Connect HTTP Sink Connector.
First, create a directory listenerhttp-node, access it, and execute the command npm init, which will generate a package.json file.
Then, create an index.js file where we define the HTTP listener server that receives the POST requests for the events coming from the Kafka topic through the Kafka Connect HTTP Sink Connector.
Next, run the server with the command node index.js so that your API listens for incoming POST calls from the ksqlDB server.
After that, you need to define a sink connector that will connect the above defined API to the process_logpickr topic.
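A sketch of the HTTP sink connector in ksqlDB SQL; the API URL (and its port) and the bootstrap server address are assumptions for this demo setup:

```sql
-- Sketch: forward every record from process_logpickr to the Node.js API.
CREATE SINK CONNECTOR http_sink WITH (
    'connector.class' = 'io.confluent.connect.http.HttpSinkConnector',
    'topics'          = 'process_logpickr',
    'http.api.url'    = 'http://localhost:8081',
    'tasks.max'       = '1',
    'confluent.topic.bootstrap.servers' = 'broker:9092'
);
```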
Now, you can check the Node.js API server command line interface to see all of the POST methods that have been invoked. From now on, any new real-time incoming event will be automatically posted to the Node.js API.
2. FileStreamSink Connector
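A sketch of the FileStreamSink connector, which writes each record of the topic as a line to a file inside the connect container; the converter choice is an assumption:

```sql
-- Sketch: dump the process_logpickr topic to /process.csv in the container.
CREATE SINK CONNECTOR file_sink WITH (
    'connector.class' = 'org.apache.kafka.connect.file.FileStreamSinkConnector',
    'topics'          = 'process_logpickr',
    'file'            = '/process.csv',
    'value.converter' = 'org.apache.kafka.connect.storage.StringConverter'
);
```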
You can find the generated file in the connect container:
docker-compose exec connect bash
You can copy the file from the connect container to localhost using:
docker cp connect:/process.csv $HOME
The process.csv file will be passed to the Logpickr Process Explorer 360 platform for further analysis in order to get a better understanding of the process behavior.
Through this article, I have covered the basics behind ksqlDB and then demonstrated a simple use case with Camunda, getting the process logs transformed and prepared by ksqlDB for further analysis on the Logpickr Process Explorer 360 platform.
Logpickr Process Explorer 360:
- Introduction to ksqlDB
- Stream Data Pipeline
- Streaming ETL demo
- Confluent demo github
- Confluent Quick Start github
- Install the Spool Dir Connector Package
- JDBC Source for Confluent Platform Parameters
- CSV Source Connector for Confluent Platform Parameters
- JSON Source Connector for Confluent Platform Parameters
- Kafka Connect Http Sink Connector
- Kafka Connect HTTP
- What is ksqlDB?
- Introducing ksqlDB
- ksqlDB with Embedded Connect
- KSQL in Action: Enriching CSV Events with Data from RDBMS into AWS
- KsqlDB Syntax Reference
- Kafka Connect Deep Dive — Converters and Serialization Explained
- Building a Clickstream Dashboard Application with ksqlDB and Elasticsearch
- Streaming ETL pipeline
- Deep Dive into ksqlDB Deployment Options
- Exploring ksqlDB with Twitter Data
- Say Hello World to Event Streaming.
- Webify Event Streams Using the Kafka Connect HTTP Sink Connector
- Créer un datapipeline en 20 minutes à l’aide de Kafka Connect
- How to get started with Apache Kafka in 5 minutes
- Changing Data Serialization Format from Avro to CSV
- First steps with Kafka Connect HTTP Sink: installation, configuration, errors manager and monitoring
- Kafka Connect Tutorial
- Kafka Connect FilePulse — Un connecteur pour tous les ingérer!
- JDBC Sink
Camunda using Docker: