Use BPM events in Process Mining: Camunda linked to Logpickr with ksqlDB

Introduction

In today’s world we deal with a lot of real-time streaming data and events that come from many different sources, and from which we can derive statistics, facts and information that are crucial for today’s businesses.

Processing and storing these real-time streams in a scalable and fault-tolerant manner is now possible using Kafka as an Event Streaming Platform and KsqlDB as an Event Streaming Database. These technologies have made Event Stream Processing Applications thrive.

I explained the basics behind Apache Kafka and ZooKeeper in my previous article. In this article, I will be dealing with the concepts behind ksqlDB and how we can derive a huge benefit from it by linking it to BPM and Process Mining. As a use case, I will be using Camunda along with Logpickr Process Explorer 360.

Then, I will include a demonstration part where I use Camunda to get information about the running processes. This real-time information will be transformed using ksqlDB and then sent to a basic NodeJS API and a CSV file. This final output (i.e. the CSV file) will be used by Logpickr Process Explorer 360 in order to analyse, anticipate and optimize any type of incoming process.

At the end of this article, I will provide you with all of the references that I have found useful to let you move forward with your learning process.

An overview of ksqlDB

ksqlDB is an event streaming database built on top of Kafka for stream processing applications. An event is anything that happens at a point in time; it represents ksqlDB’s core unit of data. A stream is just a sequence of events.

KsqlDB

ksqlDB is made up of four components: collections, materialized views, queries and connectors.

Collections provide durable storage for sequences of events. ksqlDB offers two kinds of collections: streams and tables.

  • Streams are immutable, append-only collections. They’re useful for representing a series of historical facts. Adding multiple events with the same key means that they are simply appended to the end of the stream.
  • Tables are mutable collections. They let you represent the latest version of each value per key. They’re helpful for modeling change over time, and they’re often used to represent aggregations.

Because ksqlDB leverages Kafka for its storage layer, creating a new collection equates to defining a stream or a table over an Apache Kafka topic. You can declare a collection over an existing topic, or ksqlDB can create a new topic for the collection at declaration time.
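To make this concrete, here is a minimal sketch of both kinds of collections declared over a Kafka topic. The topic name, columns and formats below are illustrative only and not part of the demo:

-- A stream over a topic; ksqlDB creates the topic because PARTITIONS is provided
CREATE STREAM orders (order_id VARCHAR KEY, customer_id VARCHAR, amount DOUBLE)
  WITH (KAFKA_TOPIC='orders', VALUE_FORMAT='JSON', PARTITIONS=1);

-- A table over the same topic, keyed by order_id, keeping only the latest value per key
CREATE TABLE orders_latest (order_id VARCHAR PRIMARY KEY, customer_id VARCHAR, amount DOUBLE)
  WITH (KAFKA_TOPIC='orders', VALUE_FORMAT='JSON');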

Materialized views are derived representations of streams or tables. They enable you to create new collections over existing streams or tables. Materialized views are perpetually kept up to date as new events arrive. This means that you can chain materialized views together to create many representations of the same data. Materialized views are especially useful for maintaining aggregated tables of data.
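For instance, an aggregated table can be materialized from the illustrative orders stream above. Again, this is just a sketch with made-up names:

CREATE TABLE orders_per_customer AS
  SELECT customer_id,
         COUNT(*)    AS order_count,
         SUM(amount) AS total_amount
  FROM orders
  GROUP BY customer_id
  EMIT CHANGES;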

While materialized views allow you to maintain state, queries enable you to access these materializations from applications or microservices. Applications can query materialized views with pull queries and push queries.

  • Pull queries enable you to fetch the current state of a materialized view.
  • Push queries enable you to subscribe to materialized view updates and stream changes. When new events arrive, push queries emit refinements, so your event streaming applications can react to new information in real-time.
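With the illustrative orders_per_customer view above, the two query types would look like this:

-- Pull query: returns the current value for a key and terminates
SELECT * FROM orders_per_customer WHERE customer_id = 'C42';

-- Push query: stays open and emits a refinement whenever the aggregate changes
SELECT * FROM orders_per_customer EMIT CHANGES;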

Often the data you want to work with isn’t in Kafka. Generally it is stored in diverse sources such as traditional databases, application APIs, CSV and JSON files, or other systems.

Kafka Connect has a great ecosystem of prebuilt connectors that can help you to continuously ingest the streams of data you want into Kafka (source connectors) as well as to continuously export the data out of Kafka (sink connectors).

ksqlDB allows you to directly control and execute connectors built to work with Kafka Connect which means you can build an end-to-end event streaming application in just a few SQL queries:

  1. Extract: one or more queries to extract the event streams from source systems using Kafka connectors
  2. Transform: queries to continually materialize results off these streams
  3. Query: queries to access particular results to serve an application
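Put together, an end-to-end application really can be a handful of statements. The sketch below simply mirrors the three steps with hypothetical names and values; the real, demo-specific statements appear later in this article:

-- 1. Extract: continuously ingest a source database table into a Kafka topic
CREATE SOURCE CONNECTOR shop_source WITH (
  'connector.class'     = 'io.confluent.connect.jdbc.JdbcSourceConnector',
  'connection.url'      = 'jdbc:postgresql://db:5432/shop',
  'connection.user'     = 'shop',
  'connection.password' = 'shop',
  'topic.prefix'        = 'jdbc_',
  'table.whitelist'     = 'orders',
  'mode'                = 'bulk'
);

-- 2. Transform: continually materialize results off that stream
CREATE STREAM orders_raw WITH (KAFKA_TOPIC='jdbc_orders', VALUE_FORMAT='AVRO');
CREATE TABLE customer_order_counts AS
  SELECT customer_id, COUNT(*) AS order_count FROM orders_raw GROUP BY customer_id;

-- 3. Query: serve a particular result to an application
SELECT order_count FROM customer_order_counts WHERE customer_id = 'C42';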

Since we’re dealing with Camunda’s data, I find it crucial to give you a brief introduction to what Camunda is, how to run it using Docker and how to access its process information.

Most importantly, I will also present the Logpickr Process Explorer 360 platform, which will play the consumer role in our Kafka cluster to analyze the behavior of each process, anticipate and optimize any type of incoming process.

A brief Presentation of Camunda BPM Engine

The Camunda BPM engine is an open source, Java-based framework that provides a business process management (BPM) system for any kind and size of organisation. It offers pre-designed BPM systems that can be modeled and executed for workflow and business process automation. It is centered around a runtime engine and uses an in-built modeling tool to execute the business process designs.

Camunda Architecture

You can check this article for more details. Here, we’re interested in getting the logs of the running processes stored in the database in order to study and analyze their behavior using the Logpickr Process Explorer 360 platform.

Logpickr Process Explorer 360 platform

Logpickr Process Explorer 360 is a platform that brings together Process Mining and Machine Learning technologies to help you move your business forward by analyzing, anticipating and optimizing any type of incoming process and, most importantly, predicting the future behavior of each analysed process, which will allow you to accelerate your digital transformation and your RPA automation projects.

Recognized by Gartner since 2019 and several times awarded, the solution identifies the flaws in your processes but also the points to be improved, anticipates failures and allows you to efficiently improve your processes as well as your organization’s profitability.

The great news is that, in its future version, Logpickr Process Explorer 360 will integrate ksqlDB, allowing it to be connected to more than 80 connectors!

KsqlDB in Action

In this demonstration, I will set up the following architecture:

Setting up KsqlDB

In this demonstration I will be using:

  1. Docker in order to run the Kafka cluster.
  2. Confluent Platform in order to have the necessary commands to install connectors etc.
  3. Confluent Platform Quick Start (Docker) project.
  4. Postman to perform API calls

To install Docker, follow these guides:

  1. Install Docker Engine on Ubuntu
  2. Install Docker Compose
  3. Post-installation steps for Linux

First, make sure to install Confluent Platform. It is an enterprise-grade distribution of Apache Kafka and a stream processing platform that allows you to build real-time data streaming applications.

Then add the path of the Confluent folder to /etc/profile.

Then run source /etc/profile and type confluent to make sure that the command is recognized.

The Confluent Hub Client is integrated within the Confluent Platform. It allows you to install the different connectors needed to connect ksqlDB to your data sources.

Here are the connectors that you need to install for this demo: the JDBC connector (used as a source against Camunda’s Postgres database) and the HTTP connector (used as a sink towards the NodeJS API).

Once installed, the connectors will be available in the /usr/share/confluent-hub/components directory.

Note :

Make sure to run confluent-hub as the root user to enable the automatic creation of the /usr/share/confluent-hub/components directory.

Installation instructions are defined here.

Now, you need to download and start Confluent Platform using Docker.

Next, you need to modify the docker-compose.yml file as follows:

The modifications are:

  1. In the connect container, I have added a volume to import the connectors installed in /usr/share/confluent-hub/components.
  2. I have added the camunda and postgres containers to interact with Camunda’s Postgres database.

Running KsqlDB

First, make sure to stop and remove your containers if you have any running containers related to Confluent:

Once the containers are up, you can access the following interfaces :

  1. Control Center: a web-based tool for managing and monitoring Apache Kafka
  2. Connectors: to see the defined connectors
  3. Camunda: access it using demo as username and password
  4. Camunda’s postgres database

Our source data is stored in Camunda’s Postgres database that we have defined in docker-compose.yml.

Note :

By default, Camunda uses an H2 database; however, there is no JDBC driver available to access it from ksqlDB. In such a case, it is better to get the source data using API calls, which means defining an HTTP connector instead of a JDBC connector. The configuration of both cases (Postgres database and API) will be described later on.

The following instructions describe how to access Camunda’s Postgres database or the default database:

The path of Camunda’s REST API is the following:

http://demo:demo@localhost:8080/engine-rest/task

To get information about the process logs that we’re interested in for this demonstration, you should call the following endpoint:

http://demo:demo@localhost:8080/engine-rest/history/activity-instance/

Where demo:demo are the default username and password for authentication.

You can run a GET request in Postman to get the logs as follows:

Postman

In the demo we have 2 processes: Invoice Receipt and Review Invoice (available in 2 different versions), which start at startTime and end at endTime.

Each process is defined by its ID (processDefinitionId) and is made of different process instances, each with its corresponding ID (executionId). Each process instance executes a set of activities described by activityName.

We’re interested in getting the logs of Camunda’s processes in order to analyse their behavior using Logpickr Process Explorer 360. So, make sure to access Camunda’s Postgres database and check that the table we’re interested in has been loaded.

JDBC Connector Constraints:

  1. We need to make sure that the name of the table we want to import is written in lowercase.
  2. We need to make sure that the table has a column that is of type int and is not null; it is necessary for the JDBC connector. In our case, the act_hi_taskinst table doesn’t have such a column, so we need to create it and set its value to 0 just to make things work correctly for this demo (see the sketch below).
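As a sketch of that workaround on the Camunda Postgres database (the column name dummy_id_ is my own hypothetical choice, not from the original):

ALTER TABLE act_hi_taskinst
  ADD COLUMN dummy_id_ integer NOT NULL DEFAULT 0;  -- int, NOT NULL, constant 0: only there to satisfy the connector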

In order to import the act_hi_taskinst table content into a Kafka topic, you need to create a connector instance based on the JDBC connector that we have already installed. This can be done as follows:

  1. Connect to the ksqlDB server:

2. Create the jdbc_source connector
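The original connector definition is not reproduced here, so below is a hedged sketch of what it could look like. The connection URL, credentials, mode and incrementing column name are assumptions based on the docker-compose setup and the workaround described above:

CREATE SOURCE CONNECTOR jdbc_source WITH (
  'connector.class'          = 'io.confluent.connect.jdbc.JdbcSourceConnector',
  'connection.url'           = 'jdbc:postgresql://postgres:5432/camunda',  -- host and database assumed from docker-compose.yml
  'connection.user'          = 'camunda',                                  -- assumed credentials
  'connection.password'      = 'camunda',
  'topic.prefix'             = 'jdbc_',                                    -- produces the jdbc_act_hi_taskinst topic
  'table.whitelist'          = 'act_hi_taskinst',
  'mode'                     = 'incrementing',
  'incrementing.column.name' = 'dummy_id_'                                 -- the int NOT NULL column added earlier
);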

After creating this connector, you should have a topic named jdbc_act_hi_taskinst that contains the content of the act_hi_taskinst table. For more information about the JDBC connector parameters, check JDBC Source Connector for Confluent Platform. For now, just notice that we’ve told the connector to connect to the Camunda Postgres database and import the content of act_hi_taskinst into the jdbc_act_hi_taskinst Kafka topic.

You can print the content of the jdbc_act_hi_taskinst topic.

print 'jdbc_act_hi_taskinst' from beginning;

Once you have the data imported to the kafka topic, you can create streams and tables to perform different manipulations on the imported data using SQL.

Let’s create a stream from the jdbc_act_hi_taskinst topic and set its format to AVRO:

CREATE STREAM PROCESS WITH (KAFKA_TOPIC='jdbc_act_hi_taskinst',VALUE_FORMAT='AVRO');

Then display its content

SET 'auto.offset.reset' = 'earliest';
SELECT * FROM PROCESS EMIT CHANGES;

Check the schema of the stream.

DESCRIBE PROCESS;

You can see that your imported data, available in the jdbc_act_hi_taskinst topic, can now be queried just like a simple SQL table.

After running the DESCRIBE PROCESS; command, you can notice that the startTime and endTime columns are by default set to BIGINT because ksqlDB doesn’t support the TIMESTAMP type (check ksqlDB data types). That’s why we need to convert them to VARCHAR following the format yyyy-MM-dd HH:mm:ss.SSS. So, I have created the TRANSITION stream, which is a materialized view that runs a push query (the EMIT CHANGES option) over the PROCESS stream.

Then, from the TRANSITION stream, we create the PROCESS_LOGPICKR stream that contains the correct format of the needed columns.
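The exact statements are not reproduced here, so this is a hedged sketch of what the two streams could look like; the selected column names (PROC_DEF_ID_, EXECUTION_ID_, NAME_, START_TIME_, END_TIME_) are assumptions about the imported act_hi_taskinst schema:

CREATE STREAM TRANSITION AS
  SELECT
    PROC_DEF_ID_  AS PROCESS_ID,
    EXECUTION_ID_ AS EXECUTION_ID,
    NAME_         AS ACTIVITY_NAME,
    TIMESTAMPTOSTRING(START_TIME_, 'yyyy-MM-dd HH:mm:ss.SSS') AS START_TIME,
    TIMESTAMPTOSTRING(END_TIME_,   'yyyy-MM-dd HH:mm:ss.SSS') AS END_TIME
  FROM PROCESS
  EMIT CHANGES;

CREATE STREAM PROCESS_LOGPICKR
  WITH (KAFKA_TOPIC='process_logpickr', VALUE_FORMAT='AVRO') AS
  SELECT * FROM TRANSITION
  EMIT CHANGES;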

You can display the content of the created stream, its schema and its corresponding topic as follows:

print 'process_logpickr' from beginning;
SET 'auto.offset.reset' = 'earliest';
SELECT * FROM PROCESS_LOGPICKR EMIT CHANGES;
DESCRIBE PROCESS_LOGPICKR;

Note:

1. For the conversion part, I am sure that we don’t need to go through the TRANSITION stream, but for now it is the only solution I have found. Feel free to propose any suggestions.

2. The creation of streams/tables that are based on push/pull queries generates a new topic (defined by the KAFKA_TOPIC parameter) and a persistent query.

You can display tables, streams, topics and queries as follows:
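In the ksqlDB CLI this is done with the standard listing statements:

SHOW STREAMS;
SHOW TABLES;
SHOW TOPICS;
SHOW QUERIES;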

As described above, if you’re not using a Postgres database for Camunda, you can instead use API calls to get your data into the Kafka topic by creating a Kafka Connect HTTP source connector. On the output side, two sinks are used in this demo:

  1. Kafka Connect Http Sink Connector

For this demo, let’s define a basic NodeJS consumer API that we will connect to the created process_logpickr topic through the Kafka Connect HTTP Sink Connector.

First, create a directory listenerhttp-node, access it and execute the command npm init, which will generate a package.json file.

Then, create an index.js file where we define the HTTP listener server that receives the POST requests for the events coming from the Kafka topic through the Kafka Connect HTTP Sink Connector.

Next, run the server with the command node index.js so that your API listens for the incoming POST calls sent by the HTTP Sink Connector.

After that, you need to define a sink that will connect the above-defined API to the process_logpickr topic.
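The sink definition itself is not reproduced here; a hedged sketch using the Confluent HTTP Sink Connector could look like the following. The listener URL and broker address are assumptions, and depending on the connector version additional reporter.* and converter settings may be required:

CREATE SINK CONNECTOR http_sink WITH (
  'connector.class' = 'io.confluent.connect.http.HttpSinkConnector',
  'topics'          = 'process_logpickr',
  'http.api.url'    = 'http://<nodejs-host>:3000',    -- adjust host and port to wherever index.js is reachable from the connect container
  'confluent.topic.bootstrap.servers' = 'broker:29092'  -- broker address assumed from the quickstart docker-compose
);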

Now, you can check the NodeJS API server’s command line interface to see all of the POST requests that have been received. From now on, any new real-time incoming event will be automatically posted to the NodeJS API.

2. FileStreamSink Connector
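A hedged sketch of a FileStreamSink connector writing the topic to a file inside the connect container (the connector name is my own; /process.csv matches the path used below):

CREATE SINK CONNECTOR file_sink WITH (
  'connector.class' = 'org.apache.kafka.connect.file.FileStreamSinkConnector',
  'topics'          = 'process_logpickr',
  'file'            = '/process.csv'   -- written inside the connect container and copied out with docker cp below
);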

You can find the generated file in the connect container:

docker-compose exec connect bash

You can copy the file from the connect container to localhost using:

docker cp connect:/process.csv $HOME

Finally, this process.csv file will be passed to the Logpickr Process Explorer 360 Platform for further analysis in order to get a better understanding of the process behavior.

Logpickr Process Explorer 360

Conclusion

Through this article, I have covered the basics behind ksqlDB, then I have demonstrated a simple use case with Camunda to get the process logs transformed and prepared by ksqlDB for further analysis on the Logpickr Process Explorer 360 platform.

References

Logpickr Process Explorer 360:

Confluent Demos:

Confluent Connectors:

Confluent Articles:

Camunda using Docker:
