Explaining event loop in 100 lines of code

There are plenty of articles out there about the event loop. However, as a software engineer, I prefer to read code, not text. And there is no better way of learning a new concept than implementing it yourself. So, let's try to grasp the idea of the event loop by coding a new and shiny one.

NB: In this article, we will try to describe the idea of the event loop in general, not a specific implementation of the event loop in Node.js, Python, or some other language/library.

Justification

The need for the event loop usually arises from the requirement to deal with event-driven environments. The most obvious examples are asynchronous I/O, UIs, and simulations. Without loss of generality, in this article we will focus mostly on async I/O scenarios. And for the sake of simplicity, we will try to use only the most primitive language constructs. Hopefully, support for closures alone will be enough for our journey.

Imagine we program in a language that behaves similarly to Python, Ruby, or PHP (but not JavaScript; its execution model is a bit different). Consider the following snippet:

// sync code
data = sync_sock.recv(1024);
print('Here is your data', data);

// async code
function on_read(data) {
  print(data);
}
async_sock.recv(1024, on_read);
print('No data right after recv()');

No knowledge of the internals of the platform is required to start reading data from sockets following any of the approaches above. It's just enough to remember that some calls are blocking and the data will be available immediately after the return while other ones deliver the data only eventually, calling you back when it becomes available and letting you instantly jump to the next line.
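
The snippet above is written in an imaginary language, but the same contrast can be observed in real Python. Here is a minimal sketch, assuming a local socket pair stands in for a real network peer (the variable names are made up for the illustration). A non-blocking recv() returns control at once even when no data has arrived, while a blocking one waits for it:

```python
import socket

# A connected pair of local sockets stands in for a real network peer.
left, right = socket.socketpair()

# Non-blocking mode: recv() returns control at once instead of waiting.
right.setblocking(False)
try:
  right.recv(1024)
  got_data_immediately = True
except BlockingIOError:
  # Nothing has been sent yet, so there is nothing to read right now.
  got_data_immediately = False

# Once the peer has written something, a blocking recv() delivers it.
left.sendall(b'foo')
right.setblocking(True)
data = right.recv(1024)

left.close()
right.close()
print(got_data_immediately, data)
```

Note that the standard socket has no callback parameter. Somebody still has to come back later and retry the read, and that somebody is what the rest of the article is about.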

Now, let's make the async example a bit more complicated:

function on_sent() {
  function on_read(data) {
    print(data);
  }
  sock.recv(1024, on_read);
}

sock.send(bytes('foo'), on_sent);

What is the desired control flow of our script? I believe the author wanted it to be sock.send -> on_sent -> sock.recv -> on_read. But what is the actual control flow? It's always tricky to statically follow the control flow of callback-based async code, but let's try. On the topmost level, we have only the definition of the on_sent() function and the call to sock.send(). Basically, the execution will start from the last line of the script, triggering some asynchronous network data transfer. And what usually happens when a script passes through the last line of the code? Right, it just exits, even though some background tasks may still be active. What does it mean for us? Probably, we will never see on_sent running.

One way to solve the problem with abandoned callbacks is to introduce a supervising entity that tracks such callbacks and executes them when appropriate. Such an entity is usually called an event loop. In the snippet above, the event loop would be responsible for suspending the execution of the script after sock.send() and resuming it when the data has actually been sent. The entry point to resume the execution will be the on_sent() callback. Similarly, when the execution of the on_sent() function reaches sock.recv(), a new callback on_read() will be registered and the script will be suspended again.
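
To make the idea of a supervising entity concrete before building the real thing, here is a deliberately naive sketch. Everything in it is hypothetical: fake_send() and fake_recv() only pretend to do I/O by registering their callbacks in a plain deque, and run() is the supervisor that keeps the program alive while callbacks are pending:

```python
import collections

pending = collections.deque()   # callbacks waiting to be run
log = []

def call_later(callback, *args):
  """Register a callback instead of calling it right away."""
  pending.append((callback, args))

def fake_send(data, on_sent):
  # Pretend the transfer completes in the background at some later point.
  call_later(on_sent)

def fake_recv(n, on_read):
  call_later(on_read, b'bar'[:n])

def script():
  def on_sent():
    log.append('on_sent')
    fake_recv(1024, lambda data: log.append(('on_read', data)))
  fake_send(b'foo', on_sent)
  log.append('last line of the script')

def run(entry_point):
  entry_point()
  # The supervisor does not exit while there is still something to call.
  while pending:
    callback, args = pending.popleft()
    callback(*args)

run(script)
print(log)
```

The script passes its last line first, and only then the supervisor invokes on_sent and on_read, in that order.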

Depending on the language/platform we use, this could be done either transparently under the hood or explicitly. For example, Node.js doesn't terminate the script when the "last line" is passed. Node.js has a built-in event loop and I/O operations are async by default, hence all the callbacks will be registered and awaited without expressly stating so. On the other hand, Python has an explicit event loop in the standard library and provides some ways to switch to the async operation mode.
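
For comparison, this is roughly what the explicit flavor looks like with asyncio, the event loop from the Python standard library. It is only a small sketch (the events list and the on_timer name are made up for the illustration): the callback is registered first, and nothing runs until we hand control over to the loop ourselves:

```python
import asyncio

events = []

def on_timer(loop):
  events.append('on_timer')
  loop.stop()   # nothing else to wait for, let run_forever() return

loop = asyncio.new_event_loop()
loop.call_later(0.01, on_timer, loop)   # register a callback, do not run it yet
events.append('last line reached')
loop.run_forever()                      # explicitly hand control to the loop
loop.close()
print(events)
```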

But let's get back to the code...

Event loop in less than 100 lines of code

Your level of understanding of the event loop concept may vary at the moment. But there is nothing better for comprehension of an idea than implementing it yourself.

We will be implementing our event loop using plain old Python without any language-specific features. The only thing we need is support for closures.

Let's start from the use case:

import socket as _socket

class socket:
  pass  # tbd

def main():
  sock = socket(_socket.AF_INET, _socket.SOCK_STREAM)

  def on_timer():

    def on_conn(err):
      if err:
        raise err

      def on_sent(err):
        if err:
          sock.close()
          raise err

        def on_read(err, data=None):
          sock.close()
          if err:
            raise err
          print(data)

        sock.recv(1024, on_read)

      sock.sendall(b'foobar', on_sent)

    sock.connect(('127.0.0.1', 53210), on_conn)

  set_timer(1000, on_timer)

The desired control flow is as follows: main -> set_timer -> on_timer -> sock.connect -> on_conn -> sock.sendall -> on_sent -> sock.recv -> on_read.

Notice that we wrapped the scenario in the entry point function main(). Also, we have a tweaked version of the socket, mimicking the standard one but making all the operations asynchronous. See the appendix for the asynchronous socket implementation details. Another thing we've introduced is a timer. A timer is a way to sleep (i.e. postpone the execution) in asynchronous code.

The entry point is supposed to be run by the event loop:

event_loop = EventLoop()
Context.set_event_loop(event_loop)
event_loop.run(main)

Now, let's take a look at the set_timer() implementation. This function is a part of the runtime we provide; it resides on the same level as the event loop itself:

class set_timer(Context):
  def __init__(self, duration, callback):
    """ duration is in hrtime() ticks of 0.1 microseconds (10e6 ticks per second) """
    self.evloop.set_timer(duration, callback)

And the Context class is an execution context, providing a placeholder for the event loop reference:

class Context:
  _event_loop = None

  @classmethod
  def set_event_loop(cls, event_loop):
    cls._event_loop = event_loop

  @property
  def evloop(self):
    return self._event_loop

Basically, set_timer() is just a convenience method to call event_loop.set_timer() without knowing about the current event loop variable.
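
The trick relies on two plain Python facts: a class attribute set on Context is visible from instances of every subclass, and "calling" a class simply constructs an instance. Here is a small self-contained sketch of that mechanism, with a hypothetical FakeLoop standing in for the real event loop:

```python
class Context:
  _event_loop = None

  @classmethod
  def set_event_loop(cls, event_loop):
    cls._event_loop = event_loop

  @property
  def evloop(self):
    return self._event_loop


class FakeLoop:
  """Hypothetical stand-in that only records the timers it is given."""
  def __init__(self):
    self.timers = []

  def set_timer(self, duration, callback):
    self.timers.append((duration, callback))


class set_timer(Context):
  def __init__(self, duration, callback):
    self.evloop.set_timer(duration, callback)


loop = FakeLoop()
Context.set_event_loop(loop)

def on_timer():
  pass

set_timer(1000, on_timer)   # looks like a function call, builds an instance
print(loop.timers == [(1000, on_timer)])
```

One caveat of this design: set_event_loop() has to be called on Context itself. Calling it on a subclass would set the attribute on that subclass only.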

We do the same trick with the socket class, which is also a part of the runtime. In the constructor, we create a normal socket object, switch it to the non-blocking mode, and register the underlying file descriptor in the event loop. Every time the file descriptor has some new information (there is some data to be read, all the data has been written, or an error occurred), the event loop will invoke the corresponding callback (if there is one):

import socket as _socket

class socket(Context):
  def __init__(self, *args):
    self._sock = _socket.socket(*args)
    self._sock.setblocking(False)  # Important line
    self.evloop.register_fileobj(self._sock, self._on_event)
    # ...
    self._callbacks = {}

  def _on_event(self, *args):
    # run a callback from self._callbacks if exists
    ...

  def connect(self, addr, callback):
    # self._callbacks['on_conn'] = callback
    # self._sock.connect(addr)
    ...

  def recv(self, n, callback):
    # self._callbacks['on_read_ready'] = callback
    ...

  def sendall(self, data, callback):
    # self._callbacks['on_write_ready'] = callback
    ...

Getting back to the use case snippet. Once the control flow reaches the sock.connect() line a callback on_conn will be registered in the tweaked socket and the actual (but asynchronous) _sock.connect() will be triggered. But starting from this moment and until the connection procedure is finished there is nothing to execute in our script. The event loop needs to handle this suspension somehow.

And finally, the promised 100 (well, just 37) lines of code to implement The Event Loop:

import time

class EventLoop:
  def __init__(self):
    self._queue = Queue()
    self._time = None

  def run(self, entry_point, *args):
    self._execute(entry_point, *args)

    while not self._queue.is_empty():
      fn, mask = self._queue.pop(self._time)
      self._execute(fn, mask)

    self._queue.close()

  def _execute(self, callback, *args):
    self._time = hrtime()
    try:
      callback(*args)  # new callstack starts
    except Exception as err:
      print('Uncaught exception:', err)
    self._time = hrtime()

  def register_fileobj(self, fileobj, callback):
    self._queue.register_fileobj(fileobj, callback)

  def unregister_fileobj(self, fileobj):
    self._queue.unregister_fileobj(fileobj)

  def set_timer(self, duration, callback):
    self._time = hrtime()
    self._queue.register_timer(self._time + duration,
                               lambda _: callback())

def hrtime():
  """ returns time in ticks of 0.1 microseconds (10e6 == 1e7 ticks per second) """
  return int(time.time() * 10e6)

The most important part of the code above is the EventLoop.run() method. What a surprise, it's a loop! However, before starting the loop we have to execute the entry point (the main() function from our example) to prepopulate a mysterious queue of callbacks. Then, while the queue is not empty, we pull the next callback from the queue and execute it. And those callbacks are essentially the ones we've registered by the set_timer or sock.(connect|recv|sendall) calls. Basically, that's it for the event loop!

And now 54 lines more to implement The Queue:

import collections
import heapq
import selectors

class Queue:
  def __init__(self):
    self._selector = selectors.DefaultSelector()
    self._timers = []
    self._timer_no = 0
    self._ready = collections.deque()

  def is_empty(self):
    return not (self._ready or self._timers or self._selector.get_map())

  def pop(self, tick):
    if self._ready:
      return self._ready.popleft()

    timeout = None
    if self._timers:
      timeout = (self._timers[0][0] - tick) / 10e6

    events = self._selector.select(timeout)
    for key, mask in events:
      callback = key.data
      self._ready.append((callback, mask))

    tick = hrtime()  # select() may have blocked, so refresh the clock
    if not self._ready and self._timers:
      idle = (self._timers[0][0] - tick)
      if idle > 0:
        time.sleep(idle / 10e6)
        return self.pop(tick + idle)

    while self._timers and self._timers[0][0] <= tick:
      _, _, callback = heapq.heappop(self._timers)
      self._ready.append((callback, None))

    return self._ready.popleft()

  def register_timer(self, tick, callback):
    timer = (tick, self._timer_no, callback)
    heapq.heappush(self._timers, timer)
    self._timer_no += 1

  def register_fileobj(self, fileobj, callback):
    self._selector.register(fileobj,
      selectors.EVENT_READ | selectors.EVENT_WRITE,
      callback)

  def unregister_fileobj(self, fileobj):
    self._selector.unregister(fileobj)

  def close(self):
    self._selector.close()

The most important part of the Queue class is the Queue.pop() method. It returns the next callback that is ready to be executed. The Queue class is just a facade for several underlying sub-queues. For the sake of simplicity, we have only two kinds of such sub-queues: one to deal with async I/O and one to deal with timers.

Asynchronous I/O is implemented using the selectors module, which is basically a high-level wrapper around select-like functionality. The queue for async I/O callbacks boils down to a single selector.select() call, which returns a list of activated file descriptors with the attached callbacks and operation masks (EVENT_READ and/or EVENT_WRITE). By activated I mean that performing the I/O operation corresponding to the mask on the file descriptor will not block the execution of the main thread.
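
To see the selector in isolation, here is a small sketch using a local socket pair instead of a real connection (on_event and the other names are made up for the illustration). As in our Queue, the callback travels as the opaque data argument of register() and comes back as key.data:

```python
import selectors
import socket

selector = selectors.DefaultSelector()
left, right = socket.socketpair()
right.setblocking(False)

def on_event(mask):
  return 'readable' if mask & selectors.EVENT_READ else 'not readable'

# The third argument is opaque data; here it carries the callback.
selector.register(right, selectors.EVENT_READ, on_event)

# Nothing was sent yet: a zero timeout makes select() return at once.
before = selector.select(timeout=0)

left.sendall(b'ping')
after = selector.select(timeout=1)

results = [key.data(mask) for key, mask in after]
payload = right.recv(1024)   # will not block: select() reported readiness

selector.unregister(right)
selector.close()
left.close()
right.close()
print(before, results, payload)
```

The sketch registers for EVENT_READ only, because a connected socket is almost always writable and would be reported as activated right away.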

Timers are implemented via a priority queue. A timer S with a shorter duration may be set after a timer L with a longer duration, leading to a situation where S has to be invoked before L even though L was scheduled first. The standard heapq module keeps the underlying list ordered by the priority of its elements. For a timer, the priority is the earliest wall clock time at which it can be executed (with an auto-increment counter as a tie-breaker to avoid collisions). That is, if the timers queue is not empty, the next timer to invoke is at its head, self._timers[0].
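
The S and L scenario is easy to reproduce with heapq alone. In this sketch (module-level variables instead of a class, purely for brevity) the long timer is registered first, and two short timers share the same tick, so the counter has to break the tie. Without it, heapq would try to compare the callbacks themselves, and functions are not orderable:

```python
import heapq

timers = []
timer_no = 0
fired = []

def register_timer(tick, callback):
  global timer_no
  # The counter breaks ties, so callbacks themselves are never compared.
  heapq.heappush(timers, (tick, timer_no, callback))
  timer_no += 1

register_timer(300, lambda: fired.append('L'))    # long timer, set first
register_timer(100, lambda: fired.append('S'))    # short timer, set second
register_timer(100, lambda: fired.append('S2'))   # same tick as S

while timers:
  _, _, callback = heapq.heappop(timers)
  callback()

print(fired)
```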

Looking at the Queue.pop() method it's easy to notice that it can block the execution of the main thread. And it is great. This is exactly the way the event loop achieves the suspension of the execution of the program at the moments when there is nothing to run.

On each iteration, the event loop tries to synchronously pull out the next callback from the queue. If there is no callback to execute at the moment, pop() blocks the main thread. When the callback is ready, the event loop executes it. The execution of a callback always happens synchronously. Each callback execution starts a new call stack, which lasts until the deepest synchronous call in the call tree rooted in the original callback returns. This also explains why errors should be delivered as callback parameters rather than thrown. Throwing an exception affects only the current call stack, while the receiver's call stack may be in a different tree. And at any given moment, only a single call stack exists. That is, if an exception thrown by a function is not caught anywhere along the current call stack, it will pop up directly in the EventLoop._execute() method.
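
Here is a stripped-down sketch of that last point. The execute() function is a simplified stand-in for EventLoop._execute(), and the rest of the names are made up. The try/except around the place where the callback is registered never fires, because by the time the callback runs, that call stack is long gone:

```python
import collections

queue = collections.deque()
log = []

def execute(callback):
  # Each callback starts a fresh call stack rooted in the loop.
  try:
    callback()
  except Exception as err:
    log.append(f'loop caught: {err}')

def failing_callback():
  raise ValueError('boom')

def schedule():
  try:
    queue.append(failing_callback)   # only registers, does not call
  except ValueError:
    log.append('scheduler caught')   # never reached
  log.append('schedule() returned')

execute(schedule)
while queue:
  execute(queue.popleft())

print(log)
```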

(image from MDN)

In the visualization above, the queue feeds the loop. Callbacks are enqueued on the right and dequeued on the left, and the order of the callbacks in the queue corresponds to their readiness and priority. The leftmost callback originates the call stack, which goes from a single element (the callback itself) up to the deepest synchronous invocation (or up to max-stack-size with the corresponding error). Execution of the current callback registers new callbacks in the queue. And the loop repeats.

As promised, the event loop is simple! Obviously, we didn't cover a lot of use cases and the error handling is mostly omitted, but again, we made a working example of an event loop in 91 lines of code!

However, programs that are written with extensive usage of callbacks are hard to read and maintain. Even though we managed to unleash the power of single-threaded asynchronous programming by introducing the event loop as the core component of our programs, we can still improve the expressiveness and decrease the complexity of our code by eliminating the numerous callbacks here and there. Check out the next article in the series to learn more about this technique: "From Callback Hell to async/await Heaven".

Make code, not war!

Appendix: source code


Event loop & client code:

# python3
# file: event_loop.py
import collections
import errno
import heapq
import json
import random
import selectors
import socket as _socket
import sys
import time


class EventLoop:
  def __init__(self):
    self._queue = Queue()
    self._time = None

  def run(self, entry_point, *args):
    self._execute(entry_point, *args)

    while not self._queue.is_empty():
      fn, mask = self._queue.pop(self._time)
      self._execute(fn, mask)

    self._queue.close()

  def register_fileobj(self, fileobj, callback):
    self._queue.register_fileobj(fileobj, callback)

  def unregister_fileobj(self, fileobj):
    self._queue.unregister_fileobj(fileobj)

  def set_timer(self, duration, callback):
    self._time = hrtime()
    self._queue.register_timer(self._time + duration,
                   lambda _: callback())

  def _execute(self, callback, *args):
    self._time = hrtime()
    try:
      callback(*args)  # new callstack starts
    except Exception as err:
      print('Uncaught exception:', err)
    self._time = hrtime()


class Queue:
  def __init__(self):
    self._selector = selectors.DefaultSelector()
    self._timers = []
    self._timer_no = 0
    self._ready = collections.deque()

  def register_timer(self, tick, callback):
    timer = (tick, self._timer_no, callback)
    heapq.heappush(self._timers, timer)
    self._timer_no += 1

  def register_fileobj(self, fileobj, callback):
    self._selector.register(fileobj,
        selectors.EVENT_READ | selectors.EVENT_WRITE,
        callback)

  def unregister_fileobj(self, fileobj):
    self._selector.unregister(fileobj)

  def pop(self, tick):
    if self._ready:
      return self._ready.popleft()

    timeout = None
    if self._timers:
      timeout = (self._timers[0][0] - tick) / 10e6

    events = self._selector.select(timeout)
    for key, mask in events:
      callback = key.data
      self._ready.append((callback, mask))

    tick = hrtime()  # select() may have blocked, so refresh the clock
    if not self._ready and self._timers:
      idle = (self._timers[0][0] - tick)
      if idle > 0:
        time.sleep(idle / 10e6)
        return self.pop(tick + idle)

    while self._timers and self._timers[0][0] <= tick:
      _, _, callback = heapq.heappop(self._timers)
      self._ready.append((callback, None))

    return self._ready.popleft()

  def is_empty(self):
    return not (self._ready or self._timers or self._selector.get_map())

  def close(self):
    self._selector.close()


class Context:
  _event_loop = None

  @classmethod
  def set_event_loop(cls, event_loop):
    cls._event_loop = event_loop

  @property
  def evloop(self):
    return self._event_loop


class IOError(Exception):
  def __init__(self, message, errorno, errorcode):
    super().__init__(message)
    self.errorno = errorno
    self.errorcode = errorcode

  def __str__(self):
    return super().__str__() + f' (error {self.errorno} {self.errorcode})'


def hrtime():
  """ returns time in ticks of 0.1 microseconds (10e6 == 1e7 ticks per second) """
  return int(time.time() * 10e6)


class set_timer(Context):
  def __init__(self, duration, callback):
    """ duration is in hrtime() ticks of 0.1 microseconds (10e6 ticks per second) """
    self.evloop.set_timer(duration, callback)


class socket(Context):
  def __init__(self, *args):
    self._sock = _socket.socket(*args)
    self._sock.setblocking(False)
    self.evloop.register_fileobj(self._sock, self._on_event)
    # 0 - initial
    # 1 - connecting
    # 2 - connected
    # 3 - closed
    self._state = 0
    self._callbacks = {}

  def connect(self, addr, callback):
    assert self._state == 0
    self._state = 1
    self._callbacks['conn'] = callback
    err = self._sock.connect_ex(addr)
    assert errno.errorcode[err] == 'EINPROGRESS'

  def recv(self, n, callback):
    assert self._state == 2
    assert 'recv' not in self._callbacks

    def _on_read_ready(err):
      if err:
        return callback(err)
      data = self._sock.recv(n)
      callback(None, data)

    self._callbacks['recv'] = _on_read_ready

  def sendall(self, data, callback):
    assert self._state == 2
    assert 'sent' not in self._callbacks

    def _on_write_ready(err):
      nonlocal data
      if err:
        return callback(err)

      n = self._sock.send(data)
      if n < len(data):
        data = data[n:]
        self._callbacks['sent'] = _on_write_ready
      else:
        callback(None)

    self._callbacks['sent'] = _on_write_ready

  def close(self):
    self.evloop.unregister_fileobj(self._sock)
    self._callbacks.clear()
    self._state = 3
    self._sock.close()

  def _on_event(self, mask):
    if self._state == 1:
      assert mask == selectors.EVENT_WRITE
      cb = self._callbacks.pop('conn')
      err = self._get_sock_error()
      if err:
        self.close()
      else:
        self._state = 2
      cb(err)

    if mask & selectors.EVENT_READ:
      cb = self._callbacks.get('recv')
      if cb:
        del self._callbacks['recv']
        err = self._get_sock_error()
        cb(err)

    if mask & selectors.EVENT_WRITE:
      cb = self._callbacks.get('sent')
      if cb:
        del self._callbacks['sent']
        err = self._get_sock_error()
        cb(err)

  def _get_sock_error(self):
    err = self._sock.getsockopt(_socket.SOL_SOCKET,
                  _socket.SO_ERROR)
    if not err:
      return None
    return IOError('connection failed',
             err, errno.errorcode[err])

###############################################################################

class Client:
  def __init__(self, addr):
    self.addr = addr

  def get_user(self, user_id, callback):
    self._get(f'GET user {user_id}\n', callback)

  def get_balance(self, account_id, callback):
    self._get(f'GET account {account_id}\n', callback)

  def _get(self, req, callback):
    sock = socket(_socket.AF_INET, _socket.SOCK_STREAM)

    def _on_conn(err):
      if err:
        return callback(err)

      def _on_sent(err):
        if err:
          sock.close()
          return callback(err)

        def _on_resp(err, resp=None):
          sock.close()
          if err:
            return callback(err)
          callback(None, json.loads(resp))

        sock.recv(1024, _on_resp)

      sock.sendall(req.encode('utf8'), _on_sent)

    sock.connect(self.addr, _on_conn)


def get_user_balance(serv_addr, user_id, done):
  client = Client(serv_addr)

  def on_timer():

    def on_user(err, user=None):
      if err:
        return done(err)

      def on_account(err, acc=None):
        if err:
          return done(err)
        done(None, f'User {user["name"]} has {acc["balance"]} USD')

      if user_id % 5 == 0:
        raise Exception('Do not throw from callbacks')
      client.get_balance(user['account_id'], on_account)

    client.get_user(user_id, on_user)

  # randint() needs integer bounds (a float such as 10e6 is rejected by recent Pythons)
  set_timer(random.randint(0, 10_000_000), on_timer)


def main(serv_addr):
  def on_balance(err, balance=None):
    if err:
      print('ERROR', err)
    else:
      print(balance)

  for i in range(10):
    get_user_balance(serv_addr, i, on_balance)


if __name__ == '__main__':
  event_loop = EventLoop()
  Context.set_event_loop(event_loop)

  serv_addr = ('127.0.0.1', int(sys.argv[1]))
  event_loop.run(main, serv_addr)

Accessory server (not based on the event loop):

# python3
# file: server.py
import json
import random
import sys
from socketserver import BaseRequestHandler, TCPServer
from uuid import uuid4


class Handler(BaseRequestHandler):
  users = {}
  accounts = {}

  def handle(self):
    client = f'client {self.client_address}'
    req = self.request.recv(1024)
    if not req:
      print(f'{client} unexpectedly disconnected')
      return

    print(f'{client} < {req}')
    req = req.decode('utf8')
    if req[-1] != '\n':
      raise Exception('Max request length exceeded')

    method, entity_kind, entity_id = req[:-1].split(' ', 2)
    if (method != 'GET'
       or entity_kind not in ('user', 'account')
       or not entity_id.isdigit()):
      raise Exception('Bad request')

    if entity_kind == 'user':
      user = self.users.get(entity_id) or {'id': entity_id}
      self.users[entity_id] = user

      if 'name' not in user:
        user['name'] = str(uuid4()).split('-')[0]

      if 'account_id' not in user:
        account_id = str(len(self.accounts) + 1)
        account = {'id': account_id,
               'balance': random.randint(0, 100)}
        self.accounts[account_id] = account
        user['account_id'] = account_id
      self.send(user)
      return

    if entity_kind == 'account':
      account = self.accounts[entity_id]
      self.send(account)
      return

  def send(self, data):
    resp = json.dumps(data).encode('utf8')
    print(f'client {self.client_address} > {resp}')
    self.request.sendall(resp)


if __name__ == '__main__':
  port = int(sys.argv[1])
  with TCPServer(('127.0.0.1', port), Handler) as server:
    server.serve_forever()

Run it:

# server
> python server.py 53210

# client
> python event_loop.py 53210

... or check it out on GitHub and don't forget to give it a star 😉