Python asyncio TaskGroup by example
Python async is really helpful for anything waiting on I/O, such as API calls where most of the time is spent waiting for data to transfer to and from somewhere. For example, one project I was working on went from 30+ minutes to < 5 minutes by making the API calls asynchronous.
This is my quick reference for Python asyncio basics. It's mostly selective copy+paste from the Python 3.13.3 asyncio docs.
This post starts out with a definitions section and increases in complexity up to an example of fire-and-forget background tasks.
Important definitions
coroutine: A function implemented with an async def statement. Coroutines can be entered, exited, and resumed at many different points.
await: The await expression suspends the execution of a coroutine on an awaitable object (see coroutine above). It can only be used inside of a coroutine function.
Level 1: Running a coroutine
The asyncio.run() function runs a coroutine. async def functions can't just be called like regular def functions: calling one only creates a coroutine object and does not run its body.
>>> import asyncio
>>> async def main():
...     print("hi")
...     await asyncio.sleep(1)
...     print("Stephen")
...
>>> # Simply calling a coroutine function will not run it
>>> main()
<coroutine object main at 0x00000210B0315B40>
>>> # Use the asyncio.run() function to run the main() function
>>> asyncio.run(main())
hi
Stephen
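As a small follow-up sketch, asyncio.run() also hands back whatever the coroutine returns. The add() coroutine below is a hypothetical example and not part of the original session; asyncio.sleep(0) stands in for real I/O.

```python
import asyncio


async def add(a: int, b: int) -> int:
    # Yield to the event loop once, standing in for real I/O.
    await asyncio.sleep(0)
    return a + b


# asyncio.run() starts an event loop, runs the coroutine to completion,
# and returns the coroutine's return value.
result = asyncio.run(add(2, 3))
print(result)  # 5
```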
Level 2: Run coroutines in series
The following code will print "hi" after waiting 1 second and "Stephen" after waiting 2 more seconds:
import asyncio
import time

async def say_after(delay: int, what: str):
    await asyncio.sleep(delay)
    print(what)

async def main():
    print(f"started at {time.strftime('%X')}")
    await say_after(1, "hi")
    await say_after(2, "Stephen")
    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())
Expected output:
started at 15:45:20
hi
Stephen
finished at 15:45:23
Notice that the total running time was ~3 seconds because the second say_after did not run until the first finished.
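To check the "in series" timing without reading wall-clock timestamps, here is a minimal sketch that measures elapsed time with time.perf_counter(). The delays are shortened to fractions of a second, and say_after returns its message instead of printing it; both are changes made for this illustration only.

```python
import asyncio
import time


async def say_after(delay: float, what: str) -> str:
    await asyncio.sleep(delay)
    return what


async def run_in_series() -> float:
    start = time.perf_counter()
    # Each await finishes before the next one starts,
    # so the delays add up (roughly 0.1 + 0.2 seconds).
    await say_after(0.1, "hi")
    await say_after(0.2, "Stephen")
    return time.perf_counter() - start


elapsed = asyncio.run(run_in_series())
print(f"series took {elapsed:.2f}s")
```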
Level 3: Concurrent coroutines (Python 3.11+)
This example shows the "magic" of asyncio. The asyncio.TaskGroup class was added in Python 3.11 and is a more modern alternative to asyncio.create_task().
import asyncio
import time

async def say_after(delay: int, what: str):
    await asyncio.sleep(delay)
    print(what)

async def main():
    print(f"started at {time.strftime('%X')}")
    async with asyncio.TaskGroup() as tg:
        task_1 = tg.create_task(say_after(1, "hello"))
        task_2 = tg.create_task(say_after(2, "Stephen"))
    # Implicit await when the context manager exits.
    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())
Expected output:
started at 16:15:07
hello
Stephen
finished at 16:15:09
Level 4: TaskGroup ExceptionGroups
If a task in a TaskGroup raises an exception, the remaining tasks in the group are cancelled. The exceptions are combined and raised as an ExceptionGroup.
Here is an example:
import asyncio
import time

async def say_after(delay: int, what: str):
    # Exception if the delay is not an integer
    if not isinstance(delay, int):
        raise ValueError(f"'{delay}' is not a valid integer")
    await asyncio.sleep(delay)
    print(what)

async def main():
    print(f"started at {time.strftime('%X')}")
    try:
        async with asyncio.TaskGroup() as tg:
            task_1 = tg.create_task(say_after(1, "hello"))
            task_2 = tg.create_task(say_after("A", "there"))
            task_3 = tg.create_task(say_after(2, "Stephen"))
        # Implicit await when the context manager exits.
    except ExceptionGroup as eg:
        for i, exc in enumerate(eg.exceptions):
            print(f"{i}: {type(exc)}: {exc}")
    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())
Expected output:
started at 14:45:07
0: <class 'ValueError'>: 'A' is not a valid integer
finished at 14:45:07
Level 5: Fire-and-forget background tasks
There have been a few times where I needed a fire-and-forget function to run in the background without Celery or another background task manager, so that a sensitive parameter never had to be stored on disk.
Note: There are multiple warnings in the 3.13.3 docs that you need to be careful to keep a "strong reference" to tasks to avoid them getting garbage collected and disappearing before they are done.
import asyncio
import time

async def say_after(delay: int, what: str):
    await asyncio.sleep(delay)
    print(what)

async def do_background_tasks():
    # This set will keep "strong references" to tasks so they
    # are not prematurely deleted.
    background_tasks = set()
    # This loop will create multiple background tasks
    for i in range(5):
        # Define and schedule a coroutine as a task.
        # 'say_after' is a coroutine function (defined with 'async def').
        # It's called with a different parameter 'i' each time.
        #
        # asyncio.create_task() schedules the coroutine to run on
        # the event loop and returns a Task object.
        task = asyncio.create_task(say_after(i, f"hi for the {i} time"))
        # Add the task to the set. This creates a "strong reference" to the task.
        # Without this, the task might be garbage collected before finishing.
        # Adding it to the set ensures it stays alive while running.
        background_tasks.add(task)
        # Add a "done callback" to the task.
        # task.add_done_callback() registers a function to be called automatically
        # *after* the task completes (successfully, with an error, or cancelled).
        #
        # The callback function provided is 'background_tasks.discard'.
        # 'set.discard(item)' removes 'item' from the set if it's present.
        # It does *not* raise an error if the item isn't found (unlike
        # set.remove()).
        #
        # When 'task' finishes, asyncio will automatically call
        # background_tasks.discard(task). This removes the
        # (now finished) task object from the set.
        task.add_done_callback(background_tasks.discard)

print(f"started at {time.strftime('%X')}")
# Use the asyncio.Runner() context manager to start
# an event loop and keep it running for a bit with
# additional `runner.run()` calls.
with asyncio.Runner() as runner:
    # fire-and-forget the background tasks
    runner.run(do_background_tasks())
    # Keep the event loop running and kick
    # off some other coroutines in the event loop
    runner.run(say_after(0, "Are we done yet?"))
    runner.run(say_after(3, "Okay, how about now?"))
print(f"finished at {time.strftime('%X')}")
The expected output should look something like this:
started at 16:50:56
hi for the 0 time
Are we done yet?
hi for the 1 time
hi for the 2 time
hi for the 3 time
Okay, how about now?
finished at 16:50:59
Notice:
- The script ran for ~3 seconds.
- The last background task (i = 4) never printed its message because the event loop (asyncio.Runner()) closed before its 4-second sleep completed.
Appendix
asyncio.TaskGroup
The asyncio.TaskGroup class is a context manager for holding a group of tasks.
- Tasks can be added to the group using create_task().
- All tasks are automatically awaited when the context manager exits.
Example:
async def main():
    async with asyncio.TaskGroup() as tg:
        task_1 = tg.create_task(first_coro(...))
        task_2 = tg.create_task(another_coro(...))
    print(f"Both tasks completed: {task_1.result()}, {task_2.result()}")
- The async with statement will wait for all tasks in the group to finish.
- The task group, tg, can be passed into one of the coroutines, which can then create new tasks with tg.create_task().
- Once the last task is finished and the async with block is exited, no new tasks can be added to the group.
Concurrent coroutines with asyncio.create_task()
Before Python 3.11, coroutines could be run concurrently with asyncio.create_task():
import asyncio
import time

async def say_after(delay: int, what: str):
    await asyncio.sleep(delay)
    print(what)

async def main():
    print(f"started at {time.strftime('%X')}")
    task_1 = asyncio.create_task(say_after(1, "hello"))
    task_2 = asyncio.create_task(say_after(2, "Stephen"))
    await task_1
    await task_2
    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())
Now the total running time for the tasks is ~2 seconds. The two tasks ran concurrently.
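Another option that works before Python 3.11 is asyncio.gather(), which is not covered above. A minimal sketch, with fetch_after as a hypothetical stand-in for an I/O call:

```python
import asyncio


async def fetch_after(delay: float, value: str) -> str:
    await asyncio.sleep(delay)
    return value


async def main() -> list[str]:
    # gather() wraps each coroutine in a task, runs them concurrently,
    # and returns the results in the order the awaitables were passed in.
    return await asyncio.gather(
        fetch_after(0.02, "hello"),
        fetch_after(0.01, "Stephen"),
    )


results = asyncio.run(main())
print(results)  # ['hello', 'Stephen']
```

The results keep argument order even though the second coroutine finishes first.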