Python BPMN Event Deduplication With Adhesive

What happens when we might receive many events, but only need to process the last one? For example, in a Kubernetes operator, where events for a custom resource keep streaming in, we only want to process the custom resource’s last state. This approach is called event deduplication. Previously, we built a whole scaffolding around it in BPMN that waited on locks and manually tracked the events. That approach is prone to problems that we’ll analyze here, before looking at the bundled solution Adhesive now offers.
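
To make the idea concrete before looking at any BPMN, here is a minimal, self-contained Python sketch of event deduplication (hypothetical data, not Adhesive’s API): many events arrive per resource, but only the last state per resource is kept and processed.

# Conceptual sketch of event deduplication, independent of Adhesive.
incoming = [
    {"resource": "pod-a", "state": "Pending"},
    {"resource": "pod-a", "state": "Running"},
    {"resource": "pod-b", "state": "Running"},
    {"resource": "pod-a", "state": "Ready"},
]

latest = {}
for event in incoming:
    latest[event["resource"]] = event   # newer events overwrite older ones

for event in latest.values():
    print("processing", event)          # pod-a runs once, with state "Ready"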

First, let’s start with the problems. This picture shows the BPMN we used before:

Old Deduplication BPMN

This approach of manually keeping track of the live events has a host of problems:

  1. Error checks must be carefully designed. Otherwise, the execution token raises an error and the event loop deadlocks. Have an error on an edge condition? Too bad. Deadlock.

  2. The deduplication data gets managed inside the process. That means we can only use multi-threading, not multi-process execution. If the tasks run in a process pool, they can’t see the synchronization data to check what events are in flight (see the sketch after this list). And because we’re forced into multi-threading, we can’t simply terminate BPMN subprocesses in case of errors.

  3. Locks come with a cost. Since the task scheduling happens on a single thread, using locks shouldn’t be necessary.
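
To make problems 2 and 3 more tangible, here is a simplified sketch of what the manual bookkeeping amounts to (hypothetical names, not the actual scaffolding from the old process): the in-flight set lives in one process’s memory, so only threads can share it, and every check goes through a lock.

import threading

# Simplified sketch of manual event tracking (not the real scaffolding).
in_flight = set()                  # ids currently being processed
in_flight_lock = threading.Lock()  # needed because threads share the set

def try_start(event_id):
    with in_flight_lock:
        if event_id in in_flight:
            return False           # same id already running: caller must wait and retry
        in_flight.add(event_id)
        return True

def finish(event_id):
    with in_flight_lock:
        in_flight.discard(event_id)

Tasks running in a separate process pool would each get their own copy of in_flight, which is exactly why this design forces multi-threading.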

Now let’s see the solution.

For these reasons, the @task and @usertask definitions now accept a new deduplicate attribute, which is an expression. When a token enters the task, the deduplication id computed from that expression gets assigned to it. If other events with the same id exist anywhere in the process, the task will wait until they disappear.

That means no more looping, and no more scaffolding needed. The process becomes this:

Deduplication BPMN

With the added attribute that holds the deduplication id expression:

@adhesive.task("Execute Task for {event.event_id}",
               deduplicate="event.event_id")
def execute_task(context):
    # ...
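
For a fuller picture, here is a sketch of how events could stream into such a process. The message-event part (@adhesive.message, adhesive.build(), context.data.event) follows the shape of Adhesive’s documented examples, but treat those names and the payloads as assumptions; only the deduplicate attribute itself comes from this post.

import adhesive

# Sketch only: the message event, the build() call and context.data.event
# follow the shape of Adhesive's examples; names and payloads are assumptions.
@adhesive.message('Incoming Events')
def incoming_events(context):
    # Many events arrive for the same event_id. With deduplication, newer
    # tokens for an id wait while one is in flight, and only the latest
    # waiting one gets processed.
    for state in ("Pending", "Running", "Ready"):
        yield {"event_id": "resource-1", "state": state}

@adhesive.task("Execute Task for {event.event_id}",
               deduplicate="event.event_id")
def execute_task(context):
    print("processing", context.data.event)

adhesive.build()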

Performance-wise, things get insanely good. The old manual approach, which could only run with multi-threading, averaged 1m20s to execute 10,000 input events. With the new approach, the same 10,000 input events execute in 41s, and the timing is similar for both multi-threading and multi-process execution.

Of course, this has a lot to do with dropping the scaffolding around it and removing the locks. The gains won’t be as dramatic in a real process, but still, it’s fantastic to see.

The samples repository for Adhesive has also been updated accordingly: https://github.com/germaniumhq/adhesive-samples