
Event Deduplication in BPMN

When ingesting a lot of events, it often makes sense to process only the most recent relevant event and discard the earlier ones. Since BPMN already works with execution tokens, which are themselves events, how can we drop duplicated events?

Let’s assume we have a task called "Execute Task" that works on a specific resource. For example, that could be "Configure VM", "Create DNS Entry", etc.: a task that operates on one resource type, while our BPMN process keeps receiving events for it.

An easy way to drop duplicate events is to give each event a state: new, process and done. Then we model the flow with a simple Gateway:

[Figure: BPMN Process]

Events start in the new state. If Deduplicate Events decides an event should be processed, it changes its state to process. After the task is done, the Set the event as processed task changes the state to done. Thus, in Deduplicate Events we always know whether we’re being called with a finished event, in which case we only need to check for pending events, or with a brand new event that was just queued.
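
To make the state transitions concrete, here is a minimal sketch of what such an event could look like as a plain dictionary. Only the state field is what the process below relies on; the other fields are hypothetical:

# A hypothetical event payload; only "state" is required by the
# deduplication logic, the other fields are illustrative.
event = {
    "type": "configure-vm",       # what kind of task to run (assumed field)
    "resource": "vm-frontend-1",  # which resource it targets (assumed field)
    "state": "new",               # lifecycle: "new" -> "process" -> "done"
}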

In order to keep track of what’s going on, we have two structures: the already_running events and the pending_events. The idea is that we still want parallel execution for different resources, so to differentiate the resources we create a function called get_event_id that reads the event and outputs an event identifier. This identifier is the key used in both structures:

from typing import Any, Dict, Set

already_running: Set[str] = set()
pending_events: Dict[str, Any] = dict()
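
The get_event_id function is up to us to define for the concrete process. A minimal sketch, assuming the hypothetical type and resource fields from the event payload above:

def get_event_id(event) -> str:
    # Build a stable key per resource: events for different resources
    # run in parallel, while events for the same resource are
    # deduplicated against each other.
    return f"{event['type']}:{event['resource']}"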

Each event that lands in the process starts in the new state. Deduplicate Events checks if there’s already a running task for that event ID. If not, we change the state of the event to process. This lets the token fall through the Gateway and start executing Execute Task.

If the task is already running, we store the data of the current event into pending_events and simply drop the current execution token. Since pending_events is a dictionary keyed by event ID, a newer event overwrites an older pending one, which is exactly the "process only the last relevant event" semantics we want.

When the running task finishes, Set the event as processed changes the state to done, and the token is sent back to the deduplication gateway.

Finally, back on the deduplication side, if new events appeared in the meantime, we need to start a new execution. For that we check pending_events, patch the stored data into the current token, and change the state of the token back to process.

Basically, the initial event keeps looping and changing state for as long as new events are being queued:

@adhesive.task('Deduplicate Events')
def deduplicate_events(context):
    global already_running
    global pending_events

    event = context.data.event
    event_id = get_event_id(event)

    context.data.event_id = event_id

    # A task is already running for this event id, so we only queue
    # the event's data. The token still passes through, but since its
    # state stays "new" instead of "process", the gateway will drop it.
    if context.data.event["state"] == "new" and event_id in already_running:
        pending_events[event_id] = event
        return context.data

    # If we're getting notified that a task finished, we're marking
    # the task as not running anymore for that event id type
    if context.data.event["state"] == "done":
        already_running.remove(event_id)

    # If we did a loop and came back with the done event, and nothing
    # else is waiting for this event id, we're finished
    if context.data.event["state"] == "done" and event_id not in pending_events:
        return context.data

    # A done event arrived and a newer event is pending for this id:
    # promote the queued event data into the current token
    if context.data.event["state"] == "done":
        context.data.event = pending_events[event_id]
        del pending_events[event_id]

    context.data.event["state"] = "process"
    already_running.add(event_id)

    return context.data

Then the Set the event as processed task becomes simply:

@adhesive.task('Set the event as processed')
def set_the_event_as_processed(context):
    context.data.event["state"] = "done"

Why is this nice? Because we now have a pattern that’s reusable every time:

  1. Implement the get_event_id,

  2. Recreate the BPMN structure,

  3. Implement the actual task.

We then get fully parallel task execution across different resources, with event deduplication for the same resource.
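
For step 3, the actual task only needs to care about the business logic, since the deduplication happens around it. A minimal sketch, where configure_resource is a hypothetical helper standing in for the real work:

@adhesive.task('Execute Task')
def execute_task(context):
    event = context.data.event

    # configure_resource is a hypothetical stand-in for the real
    # work, e.g. configuring a VM or creating a DNS entry.
    configure_resource(event)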

To run it, you just need adhesive:

pip install -U adhesive

Then either call adhesive in that folder,

adhesive

or execute it as a Python script:

python _adhesive.py
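
If you want a single-file skeleton to start from, something along these lines should work. The BPMN file name and the message event generator are assumptions for illustration; adhesive's message events are one way to feed external events into the process:

import adhesive

@adhesive.message('Receive Event')
def receive_event(context):
    # In a real setup this would read from a queue or an API;
    # here we just emit one sample event in the "new" state.
    yield {"type": "configure-vm", "resource": "vm-1", "state": "new"}

# Assumed file name for the BPMN diagram described above
adhesive.bpmn_build("event-deduplication.bpmn")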

Enjoy!