Python: Fighting the Invisible Memory Thief

I noticed that when processing a lot of events, adhesive would slow down significantly. I had always assumed it was because of the way the processing created the execution tokens, with new callbacks tied to each execution. Now it's clear what was going on.

To understand the problem: for each execution token, the code looked similar to this:

def clone_event(self,
                old_event: ActiveEvent,
                task: ExecutableNode,
                parent_id: Optional[str] = None) -> ActiveEvent:

    event = old_event.clone(task, parent_id)
    self.register_event(event)

    def process_event(_event) -> ActiveEventState: ...
    def wait_task(_event) -> Optional[ActiveEventState]: ...
    def run_task(_event) -> Optional[ActiveEventState]: ...
    def log_running_done(_event): ...
    def error_task(_event) -> None: ...
    def route_task(_event) -> None: ...
    def done_check(_event) -> Optional[ActiveEventState]: ...
    def done_task(_event) -> None: ...

    event.state.after_enter(ActiveEventState.PROCESSING, process_event)
    event.state.after_enter(ActiveEventState.WAITING, wait_task)
    event.state.after_enter(ActiveEventState.RUNNING, run_task)
    event.state.after_leave(ActiveEventState.RUNNING, log_running_done)
    event.state.after_enter(ActiveEventState.ERROR, error_task)
    event.state.after_enter(ActiveEventState.ROUTING, route_task)
    event.state.after_enter(ActiveEventState.DONE_CHECK, done_check)
    event.state.after_enter(ActiveEventState.DONE, done_task)

    return event

In other words, every execution token got its own freshly created set of eight callbacks for its lifecycle, because each nested def builds a new function object every time clone_event runs.
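To make the cost concrete, here is a small sketch of what Python does with nested functions. The name make_callbacks is hypothetical and only stands in for clone_event: every call produces new function objects (the bytecode is shared, the function objects are not), and a nested function that refers to the enclosing scope also carries a cell keeping the captured value alive.

```python
def make_callbacks(event):
    """Hypothetical stand-in for clone_event: builds lifecycle callbacks per event."""

    def process_event(_event):
        # Refers to `event` from the enclosing scope, so Python stores it in a cell.
        return event

    def done_task(_event):
        return None  # captures nothing, but is still a new object on every call

    return process_event, done_task


first = make_callbacks({"token_id": 1})
second = make_callbacks({"token_id": 2})

# Same source code, but separate function objects for each call.
print(first[0] is second[0])  # False
# The compiled bytecode (code object) is shared between them.
print(first[0].__code__ is second[0].__code__)  # True
# The closure cell keeps the captured event alive as long as the callback lives.
print(first[0].__closure__[0].cell_contents)  # {'token_id': 1}
print(first[1].__closure__)  # None
```

With 1000 input events fanning out to five tokens each and eight callbacks per token, that is on the order of 40,000 function objects, plus the dictionaries that register them.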

The problem shows up when you have a process like this:

[Diagram: Multiple Tasks]

For each input event, you'll get five tokens potentially running in parallel. Let's see some numbers when we throw 1000 events at this process. We'll wrap the execution in memory profiling, using the display_top helper straight from the tracemalloc documentation:

import linecache
import os
import tracemalloc

def display_top(snapshot, key_type='lineno', limit=10):
    snapshot = snapshot.filter_traces((
        tracemalloc.Filter(False, "<frozen importlib._bootstrap>"),
        tracemalloc.Filter(False, "<unknown>"),
    ))
    top_stats = snapshot.statistics(key_type)

    print("Top %s lines" % limit)
    for index, stat in enumerate(top_stats[:limit], 1):
        frame = stat.traceback[0]
        # replace "/path/to/module/file.py" with "module/file.py"
        filename = os.sep.join(frame.filename.split(os.sep)[-2:])
        print("#%s: %s:%s: %.1f KiB"
              % (index, filename, frame.lineno, stat.size / 1024))
        line = linecache.getline(frame.filename, frame.lineno).strip()
        if line:
            print('    %s' % line)

    other = top_stats[limit:]
    if other:
        size = sum(stat.size for stat in other)
        print("%s other: %.1f KiB" % (len(other), size / 1024))
    total = sum(stat.size for stat in top_stats)
    print("Total allocated size: %.1f KiB" % (total / 1024))

tracemalloc.start()

# ... run your application ...

snapshot = tracemalloc.take_snapshot()
display_top(snapshot)
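The display_top helper reports everything allocated since tracemalloc.start(). To attribute memory to one specific step, you can diff two snapshots with Snapshot.compare_to. The sketch below assumes hypothetical names (allocated_by, make_callback, register_closures) and a toy workload that only mimics keeping one closure alive per token; it is not adhesive code.

```python
import tracemalloc

# Exclude tracemalloc's own bookkeeping (snapshots, trace lists) from the diff.
_IGNORE_TRACEMALLOC = (tracemalloc.Filter(False, tracemalloc.__file__),)


def allocated_by(fn, *args):
    """Call fn(*args) and return (net bytes still allocated, result)."""
    was_tracing = tracemalloc.is_tracing()
    if not was_tracing:
        tracemalloc.start()
    try:
        before = tracemalloc.take_snapshot().filter_traces(_IGNORE_TRACEMALLOC)
        result = fn(*args)  # keep the result alive for the second snapshot
        after = tracemalloc.take_snapshot().filter_traces(_IGNORE_TRACEMALLOC)
    finally:
        if not was_tracing:
            tracemalloc.stop()  # only stop tracing if we were the ones who started it
    diff = after.compare_to(before, "lineno")
    return sum(stat.size_diff for stat in diff), result


def make_callback(token_id):
    def callback(_event):
        return token_id  # closes over token_id
    return callback


def register_closures(count):
    # Hypothetical workload: one closure per "token", kept alive in a dict.
    return {token_id: make_callback(token_id) for token_id in range(count)}


size, registry = allocated_by(register_closures, 10_000)
print(f"{len(registry)} closures: {size / 1024:.1f} KiB still allocated")
```

Diffing snapshots around a single call isolates that call's surviving allocations from whatever the rest of the application is doing.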

The results:

Top 10 lines
#1: model/ActiveEventStateMachine.py:447: 33468.3 KiB
    self.registered = dict()
#2: python3.8/copy.py:229: 19604.3 KiB
    y[deepcopy(key, memo)] = deepcopy(value, memo)
#3: python3.8/uuid.py:780: 11974.8 KiB
    return UUID(bytes=os.urandom(16), version=4)
#4: python3.8/copy.py:226: 9187.5 KiB
    y = {}
#5: python3.8/copy.py:278: 7852.7 KiB
    y.__dict__.update(state)
#6: model/ActiveEventStateMachine.py:453: 3985.2 KiB
    event_listeners = self.registered[event_name.value] = dict()
#7: python3.8/copyreg.py:91: 2812.9 KiB
    return cls.__new__(cls, *args)
#8: model/ProcessExecutor.py:917: 2579.6 KiB
    def done_check(_event) -> Optional[ActiveEventState]:
#9: model/ProcessExecutor.py:747: 2578.3 KiB
    def wait_task(_event) -> Optional[ActiveEventState]:
#10: model/ProcessExecutor.py:879: 2531.9 KiB
    def route_task(_event) -> None:
536 other: 37647.2 KiB
Total allocated size: 134222.6 KiB

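The numbers aren't inspiring. As expected, the callbacks account for a lot of memory: the per-event listener dictionaries in ActiveEventStateMachine.py (#1 and #6) hold over 37,000 KiB, and the closures themselves (#8 to #10) take roughly 2,500 KiB each. Way too much memory. At least this confirms the suspected root cause.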

If we drop the callbacks and the per-event state machine, and instead process the events with a simple queue, we get:

Top 10 lines
#1: python3.8/copy.py:229: 19111.0 KiB
    y[deepcopy(key, memo)] = deepcopy(value, memo)
#2: python3.8/copy.py:226: 9003.5 KiB
    y = {}
#3: python3.8/copy.py:278: 6826.5 KiB
    y.__dict__.update(state)
#4: python3.8/copyreg.py:91: 2534.1 KiB
    return cls.__new__(cls, *args)
#5: model/ProcessEvents.py:65: 1148.1 KiB
    self.handlers[event.state][event.token_id] = (event, data)
#6: model/ProcessEvents.py:64: 1040.2 KiB
    self.bystate[event.state][event.token_id] = event
#7: model/ProcessEvents.py:124: 288.0 KiB
    self.events[key] = value
#8: model/ProcessEvents.py:126: 208.0 KiB
    self.handlers[value.state][key] = (value, None)
#9: model/ProcessEvents.py:125: 208.0 KiB
    self.bystate[value.state][key] = value
#10: model/ProcessExecutor.py:711: 144.0 KiB
    self.futures[future] = event.token_id
496 other: 352.5 KiB
Total allocated size: 40864.0 KiB

Wait, what? Memory usage dropped from 134MB to 40MB?!

What about the run time? From:

real    0m46,740s
user    0m42,990s
sys     0m0,881s

To:

real    0m19,666s
user    0m19,319s
sys     0m0,444s

Not only did memory drop, but the run time also fell by more than half (from 46.7s to 19.7s), just by easing the pressure on the garbage collector and on memory allocation. The actual processing algorithms that validate the precedence of tokens, routing, etc. were unchanged.
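One rough way to see that garbage collector pressure is to count collections with gc.get_stats(). Function objects are containers tracked by the cyclic collector, so creating many of them that stay alive triggers young-generation collections, while a list of plain integers does not. This is a toy comparison under that assumption, not a reproduction of adhesive's workload; the exact counts depend on the Python version and its GC thresholds.

```python
import gc


def gc_collections():
    """Total number of collections run so far, summed over all generations."""
    return sum(stats["collections"] for stats in gc.get_stats())


def make_callback(token_id):
    def callback(_event):
        return token_id  # a closure: a new, GC-tracked function object per call
    return callback


def with_closures(count):
    return [make_callback(i) for i in range(count)]


def without_closures(count):
    return list(range(count))  # ints are not tracked by the cyclic GC


def count_collections(workload, count):
    before = gc_collections()
    kept_alive = workload(count)  # results survive, like registered callbacks do
    return gc_collections() - before, kept_alive


closures_runs, _ = count_collections(with_closures, 100_000)
plain_runs, _ = count_collections(without_closures, 100_000)
print(f"collections with closures: {closures_runs}, without: {plain_runs}")
```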

Conclusions

  1. Use memory profiling. It takes only a few lines of code, and tracemalloc ships with Python.

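  2. Stay away from creating callbacks that capture the current context for every item you process. A nested def doesn't look like a new allocation, but every execution creates a new function object (plus cells for the captured variables), and at scale they hit hard.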
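A minimal sketch of the alternative, assuming a hypothetical State enum, Event class and handler functions (this is not adhesive's actual ProcessEvents code): the handlers are plain module-level functions registered once in a shared table, and the event they act on is passed in as an argument instead of being captured in a closure. A queue drives events from state to state.

```python
from collections import deque
from dataclasses import dataclass, field
from enum import Enum, auto
from typing import Callable, Deque, Dict, Optional


class State(Enum):
    # Hypothetical subset of the lifecycle states used in clone_event.
    PROCESSING = auto()
    RUNNING = auto()
    DONE = auto()


@dataclass
class Event:
    token_id: int
    state: State = State.PROCESSING
    log: list = field(default_factory=list)


def process_event(event: Event) -> Optional[State]:
    event.log.append("processing")
    return State.RUNNING


def run_task(event: Event) -> Optional[State]:
    event.log.append("running")
    return State.DONE


def done_task(event: Event) -> Optional[State]:
    event.log.append("done")
    return None  # terminal state: nothing left to schedule


# Built once and shared by every event: no per-token closures or listener dicts.
HANDLERS: Dict[State, Callable[[Event], Optional[State]]] = {
    State.PROCESSING: process_event,
    State.RUNNING: run_task,
    State.DONE: done_task,
}


def run(events):
    queue: Deque[Event] = deque(events)
    while queue:
        event = queue.popleft()
        next_state = HANDLERS[event.state](event)  # event is passed in, not captured
        if next_state is not None:
            event.state = next_state
            queue.append(event)


events = [Event(token_id=i) for i in range(3)]
run(events)
print(events[0].log)  # ['processing', 'running', 'done']
```

The per-event cost shrinks to the event object itself; the handler table is allocated once, no matter how many tokens flow through it.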