Python asyncio TaskGroup by example

Posted 1 week, 3 days ago

Python async is really helpful for anything waiting on I/O. Things like making API calls where most of the time is spent waiting for data to transfer to and from somewhere. For example, one project I was working on went from 30+ minutes to < 5 minutes by making the API calls asyncronous.

This is my quick reference for Python asyncio basics. It's mostly selective copy+paste from the Python 3.13.3 asyncio docs.

This post starts out with a definitions section and increases in complexity up to an example of fire-and-forget background tasks.

Important definitions

couroutine: Implemented with an async def statement. They can be entered, exited, and resumed at many different points.

await: The await expression suspends the execution of a coroutine on an awaitable (see couroutine above) object. It can only be used inside of a coroutine function.

Level 1: Running a coroutine

The asyncio.run function runs a coroutine function. async def functions can't just be called like regular def functions.

>>> import asyncio

>>> async def main():
...    print("hi")
...    await asyncio.sleep(1)
...    print("Stephen")

>>> # Simply calling a couroutine will do nothing
>>> main()
<coroutine object main at 0x00000210B0315B40>

>>> # Use the asyncio.run() function to run the main() function
>>> asyncio.run(main())
hi
Stephen

Level 2: Run couroutines in series

The following code will pring "hi" after waiting 1 second and "Stephen" after waiting 2 seconds:

import asyncio
import time

async def say_after(delay: int, what: str):
    await asyncio.sleep(delay)
    print(what)

async def main():
    print(f"started at {time.strftime('%X')}")

    await say_after(1, "hi")
    await say_after(2, "Stephen")

    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())

Expected output:

started at 15:45:20
hi
Stephen
finished at 15:45:23

Notice that the total running time was ~3 seconds because the second say_after did not run until the first finished.

Level 3: Concurrent coroutines Python 3.11+

This example shows the "magic" of asyncio. The asyncio.TaskGroup class was added in Python 3.11 and is a more modern alternative to asyncio.create_task().

import asyncio
import time

async def say_after(delay: int, what: str):
    await asyncio.sleep(delay)
    print(what)

async def main():
    print(f"started at {time.strftime('%X')}")

    async with asyncio.TaskGroup() as tg:
        task_1 = tg.create_task(say_after(1, "hello"))
        task_2 = tg.create_task(say_after(2, "Stephen"))

    # Implicit await when the context manager exits.

    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())

Expected output:

started at 16:15:07
hello
Stephen
finished at 16:15:09

Level 4: TaskGroup ExceptionGroups

Exceptions raised in a TaskGroup cancel all the other tasks. The exceptions are combined and raised as an ExceptionGroup

Here is an example:

import asyncio
import time

async def say_after(delay: int, what: str):
    # Exception if the delay is not an integer
    if not isinstance(delay, int):
        raise ValueError(f"'{delay}' is not a valid integer")
    await asyncio.sleep(delay)
    print(what)

async def main():
    print(f"started at {time.strftime('%X')}")

    try:
        async with asyncio.TaskGroup() as tg:
            task_1 = tg.create_task(say_after(1, "hello"))
            task_2 = tg.create_task(say_after("A", "there"))
            task_3 = tg.create_task(say_after(2, "Stephen"))

        # Implicit await when the context manager exits.

    except ExceptionGroup as eg:
        for i, exc in enumerate(eg.exceptions):
            print(f"{i}: {type(exc)}: {exc}")

    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())

Expected output:

started at 14:45:07
0: <class 'ValueError'>: 'A' is not a valid integer
finished at 14:45:07

Level 5: Fire-and-forget background tasks

There have been a few times where I needed a fire-and-forget type of function to run in the background without celery or some other background task manager to avoid storing some sensitive parameter on disk somewhere.

Note: There are multiple warnings in the 3.13.3 docs that you need to be careful to keep a "strong reference" to tasks to avoid them getting garbage collected and disappearing before they are done.

import asyncio
import time

async def say_after(delay: int, what: str):
    await asyncio.sleep(delay)
    print(what)

async def do_background_tasks():
    # This set will keep "strong references" to tasks so they
    #   are not prematurely deleted.
    background_tasks = set()

    # This loop will create multiple background tasks
    for i in range(5):
        # Define and schedule a coroutine as a task.
        #   'say_after' is a couroutine (defined with 'async def')
        #   It's called with a different parameter 'i' each time.
        #
        # asyncio.create_task() scheduls the coroutine to run on
        #    the event loop and returns a Task object.
        task = asyncio.create_task(say_after(i, f"hi for the {i} time"))

        # Add the task to the set. This creates a "strong reference" to the task.
        #   Without this, the task might be garbage collected before finishing.
        #   Adding it in the set ensures it stays alive while running.
        background_tasks.add(task)

        # Add a "done callback" to the task.
        #    task.add_done_callback() registers a function to be called automatically
        #    *after* the task completes (successuflly, with an error, or cancelled).
        #
        # The callback function provided is 'background_tasks.discard()'
        #   'set.discard(item)' removes 'item' from the set if it's present.
        #   It does *not* raise an error if the item isn't found (unlike
        #   set.remove())
        #
        #    When 'task' finishes, asyncio will automatically call
        #    background_tasks.discard(task). This removes the
        #    (now finished) task object from the set.
        task.add_done_callback(background_tasks.discard)


print(f"started at {time.strftime('%X')}")

# Use the asyncio.Runner() context manager to start
#    an event loop and keep it running for a bit with 
#    additional `runner.run()` tasks.
with asyncio.Runner() as runner:
    # fire-and-forget the background tasks
    runner.run(do_background_tasks())

    # Keep the event loop running and kick
    # off some other coroutine tasksin the event loop
    runner.run(say_after(0, "Are we done yet?"))
    runner.run(say_after(3, "Okay, how about now?"))

print(f"finished at {time.strftime('%X')}")

The expected output should look something like this:

started at 16:50:56
hi for the 0 time
Are we done yet?
hi for the 1 time
hi for the 2 time
hi for the 3 time
Okay, how about now?
finished at 16:50:59

Notice:

  • The script ran for ~3 seconds.
  • The 4th background task did not run because the event loop ((asyncio.Runner()) closed before it completed.

Appendix

asyncio.TaskGroup

The asyncio.TaskGroup class is a context manager for holding a group of tasks.

  • Tasks can be added to the group using create_task().
  • All tasks are automatically awaited when the context manager exists.

Example:

async def main():
    async with asyncio.TaskGroup() as tg:
        task_1 = tg.create_task(first_coro(...))
        task_2 = tg.create_task(another_coro(...))
    print(f"Both tasks completed: {task_1.result()}, {task_2.result()}")
  • The async with statement will wait for all tasks in the group to finish.
  • It is possible that the task group, tg, could be passed into one of the coroutines and create new tasks with tg.create_task().
  • Once the last task is finished and the async with block is exited, no new tasks can be added to the group.

Concurrent coroutines with asyncio.create_task()

Before Python 3.11, corouties could be ran concurrently with asyncio.create_task()

import asyncio
import time

async def say_after(delay: int, what: str):
    await asyncio.sleep(delay)
    print(what)

async def main():
    print(f"started at {time.strftime('%X')}")

    task_1 = asyncio.create_task(say_after(1, "hello"))
    task_2 = asyncio.create_task(say_after(2, "Stephen"))

    await task_1
    await task_2

    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())

Now the total running time for the tasks is ~2 seconds. The two tasks ran concurrently.