Christopher Sardegna's Blog

Thoughts on technology, design, data analysis, and data visualization.


Synchronous Code in Asynchronous Contexts

The Dependency Problem

In Python, asynchronous code can be as simple as writing async def get_result() and later await get_result(). However, if get_result calls third-party library code that cannot be awaited, the asynchronous-looking code ends up running synchronously.

Sample Function

Several prominent HTTP libraries in the Python ecosystem do not support asynchronous programming natively. Consider the following code:

import requests

async def get_response_code(url: str) -> int:
    return requests.get(url).status_code

Even when we gather these coroutines, the requests are executed one after another:

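import asyncio
from typing import Coroutine, List

async def run_concurrent() -> List[int]:
    # Build one coroutine per URL, then wait for all of them together.
    awaitables: List[Coroutine] = []
    for url in list_of_url_strings:
        awaitables.append(get_response_code(url))
    return await asyncio.gather(*awaitables)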

Why Does This Happen

Even though we are awaiting the result of get_response_code, nothing inside that function yields control back to the event loop: requests.get() blocks the thread until the response arrives, so each coroutine runs to completion before the next one starts. We can verify this with asyncio.run()'s debug parameter, which logs any task step that takes longer than 100 ms by default:

>>> asyncio.run(run_concurrent(), debug=True)
Executing <Task finished name='Task-58' coro=<get_response_code() done, defined at <stdin>:1> result=200 created at ~/.pyenv/versions/3.8.2/lib/python3.8/asyncio/tasks.py:806> took 0.699 seconds
Executing <Task finished name='Task-59' coro=<get_response_code() done, defined at <stdin>:1> result=200 created at ~/.pyenv/versions/3.8.2/lib/python3.8/asyncio/tasks.py:806> took 0.637 seconds
Executing <Task finished name='Task-60' coro=<get_response_code() done, defined at <stdin>:1> result=200 created at ~/.pyenv/versions/3.8.2/lib/python3.8/asyncio/tasks.py:806> took 0.687 seconds
[200, 200, 200]
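The same effect can be reproduced without a network connection. The following is a minimal sketch, assuming a hypothetical slow_status function that uses time.sleep as a stand-in for requests.get; the names slow_status and run_blocking are illustrative and not part of any library:

```python
import asyncio
import time
from typing import List, Tuple


def slow_status(delay: float) -> int:
    """Hypothetical stand-in for requests.get(url).status_code; blocks the thread."""
    time.sleep(delay)
    return 200


async def get_response_code(delay: float) -> int:
    # Nothing is awaited here, so the event loop cannot switch tasks while we sleep.
    return slow_status(delay)


async def run_blocking(count: int, delay: float) -> Tuple[List[int], float]:
    # Gather the coroutines and measure how long the whole batch takes.
    start = time.perf_counter()
    results = await asyncio.gather(*(get_response_code(delay) for _ in range(count)))
    return list(results), time.perf_counter() - start


if __name__ == "__main__":
    codes, elapsed = asyncio.run(run_blocking(3, 0.2))
    print(codes, f"{elapsed:.2f}s")
```

With three calls of 0.2 seconds each, the elapsed time should come out at roughly the sum of the delays rather than the longest single delay, which is the signature of serial execution.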

TypeError on await

If we try to solve this by awaiting the call to requests.get() like so:

async def get_response_code(url: str) -> int:
    result = await requests.get(url).status_code
    return result

Python will raise the following error:

>>> asyncio.run(get_response_code('https://google.com'))
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "~/.pyenv/versions/3.8.2/lib/python3.8/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "~/.pyenv/versions/3.8.2/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "<stdin>", line 2, in get_response_code
TypeError: object int can't be used in 'await' expression

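The await applies to the whole expression requests.get(url).status_code, which is a plain int. Awaiting the call itself would fail the same way, because requests.get() returns a Response object rather than a future or any other awaitable.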
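To see the failure in isolation, here is a small sketch that awaits a plain int. The names await_plain_int and try_await are hypothetical, and the literal 200 stands in for the status code a real request would return:

```python
import asyncio
import inspect


async def await_plain_int() -> None:
    status_code = 200  # stand-in for requests.get(url).status_code
    await status_code  # raises TypeError because an int is not awaitable


def try_await() -> str:
    # Run the coroutine and report the error message instead of crashing.
    try:
        asyncio.run(await_plain_int())
    except TypeError as error:
        return str(error)
    return "no error"


if __name__ == "__main__":
    print(try_await())
    # inspect.isawaitable() checks an object before we try to await it.
    print(inspect.isawaitable(200))
```

The exact wording of the TypeError varies between Python versions, so the sketch returns the message rather than comparing it to a fixed string.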

Wrapping Synchronous Code in an Executor

Given that knowledge, we can instead hand the synchronous request to a separate thread and await the future that represents its result. The event loop provides a method called run_in_executor which we can leverage to wrap this; passing None as its first argument selects the loop's default executor, which is a thread pool:

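async def get_response_code(url: str) -> int:
    # get_running_loop() returns the loop that is executing this coroutine.
    loop = asyncio.get_running_loop()
    # None selects the loop's default executor.
    result = await loop.run_in_executor(None, requests.get, url)
    return result.status_code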

Now the sample code runs concurrently, as verified by asyncio.run()'s debug parameter, which no longer reports any slow tasks:

>>> asyncio.run(run_concurrent(), debug=True)
[200, 200, 200]
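The speedup can also be measured directly. This is a minimal sketch under the assumption that a hypothetical slow_status function, built on time.sleep, behaves like a blocking HTTP call; run_threaded is likewise an illustrative name:

```python
import asyncio
import time
from typing import List, Tuple


def slow_status(delay: float) -> int:
    """Hypothetical stand-in for requests.get(url).status_code; blocks the thread."""
    time.sleep(delay)
    return 200


async def get_response_code(delay: float) -> int:
    loop = asyncio.get_running_loop()
    # The blocking call runs in the default thread pool; awaiting the returned
    # future lets the event loop start the other calls in the meantime.
    return await loop.run_in_executor(None, slow_status, delay)


async def run_threaded(count: int, delay: float) -> Tuple[List[int], float]:
    # Gather the coroutines and measure how long the whole batch takes.
    start = time.perf_counter()
    results = await asyncio.gather(*(get_response_code(delay) for _ in range(count)))
    return list(results), time.perf_counter() - start


if __name__ == "__main__":
    codes, elapsed = asyncio.run(run_threaded(3, 0.2))
    print(codes, f"{elapsed:.2f}s")
```

Because the three sleeps overlap in separate threads, the elapsed time should be close to a single delay instead of three. The number of calls that can overlap is bounded by the size of the thread pool.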

A More Complex Example

If we need to pass more complex data to the synchronous code, we can leverage functools.partial, which allows us to generate a callable for a function that has some or all of the parameters pre-filled.

The Problem

In Amazon’s boto3 docs, all of the parameters for a Lambda client’s invoke method are named:

boto3.client('lambda').invoke(
    FunctionName='lambda-function-name',
    InvocationType='RequestResponse',
    Payload={...},
    Qualifier='$LATEST'
)

However, this form does not work with run_in_executor, because run_in_executor forwards only positional arguments to the function it runs and rejects named ones. To get around this, we need to create a callable that already has these values filled in.
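The limitation is easy to demonstrate. In this sketch, describe is a hypothetical function invented for illustration; the point is only that run_in_executor itself raises a TypeError when handed a named argument:

```python
import asyncio


def describe(name: str, qualifier: str = "$LATEST") -> str:
    """Hypothetical synchronous function with one optional parameter."""
    return f"{name}:{qualifier}"


async def try_keyword_argument() -> str:
    loop = asyncio.get_running_loop()
    try:
        # run_in_executor accepts (executor, func, *args) and nothing else.
        await loop.run_in_executor(None, describe, "fn", qualifier="1")
    except TypeError as error:
        return f"TypeError: {error}"
    return "no error"


async def positional_only() -> str:
    loop = asyncio.get_running_loop()
    # Positional arguments are forwarded to describe in order.
    return await loop.run_in_executor(None, describe, "fn", "1")


if __name__ == "__main__":
    print(asyncio.run(try_keyword_argument()))
    print(asyncio.run(positional_only()))
```

Positional forwarding works for describe, but it cannot help with a function whose parameters must be passed by name.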

Building a Partial

A small partial can look like this:

>>> from functools import partial
>>> def multiply(a: int, b: int) -> int:
...     return a * b

>>> double = partial(multiply, 2)
>>> double(6)  # Same as multiply(2, 6)
12

Thus, we can build a partial for invoke like so:

invoke_lambda = functools.partial(
    boto3.client('lambda').invoke,
    FunctionName='lambda-function-name',
    InvocationType='RequestResponse',
    Payload=payload,
    Qualifier='$LATEST'
)

Now, invoke_lambda() will execute invoke with all of the provided named parameters, including payload as Payload, without needing any further arguments.
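Named parameters stored in a partial can be inspected and overridden. The sketch below assumes a hypothetical keyword-only invoke function standing in for the real client method, so it runs without AWS credentials:

```python
from functools import partial


def invoke(*, FunctionName: str, Payload: str, Qualifier: str = "$LATEST") -> dict:
    """Hypothetical keyword-only stand-in for a Lambda client's invoke method."""
    return {"FunctionName": FunctionName, "Payload": Payload, "Qualifier": Qualifier}


# Pre-fill the named parameters; the result needs no arguments when called.
prefilled = partial(invoke, FunctionName="lambda-function-name", Payload="{}")

if __name__ == "__main__":
    print(prefilled.keywords)  # the stored named parameters
    print(prefilled())  # uses the stored values
    print(prefilled(Payload='{"a": 1}'))  # a named argument at call time overrides
```

Since the partial can be called with no arguments at all, it fits anywhere a zero-argument callable is expected.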

Bringing it all Together

We can pass the partial directly as the function argument of run_in_executor to bring these two methods together:

import asyncio
import functools
import json

import boto3

async def get_lambda_response(payload: dict) -> dict:
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(
        None, functools.partial(
            boto3.client('lambda').invoke,
            FunctionName='lambda-function-name',
            InvocationType='RequestResponse',
            # invoke expects the payload as JSON bytes, not a dict.
            Payload=json.dumps(payload).encode(),
            Qualifier='$LATEST'
        )
    )
    ...
    return result

This allows us to call the invoke method of a Lambda client using its named parameters, which run_in_executor cannot forward on its own.
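The whole pattern can be exercised locally. This is a sketch only: fake_invoke is a hypothetical keyword-only function that sleeps briefly and echoes its payload, standing in for the real boto3 call, and get_fake_response and invoke_many are illustrative names:

```python
import asyncio
import functools
import json
import time
from typing import List


def fake_invoke(*, FunctionName: str, Payload: bytes, Qualifier: str = "$LATEST") -> dict:
    """Hypothetical stand-in for a Lambda client's invoke; blocks, then echoes."""
    time.sleep(0.1)
    return {"StatusCode": 200, "Function": FunctionName, "Echo": json.loads(Payload)}


async def get_fake_response(payload: dict) -> dict:
    loop = asyncio.get_running_loop()
    # The partial carries the named parameters; the executor calls it with no arguments.
    call = functools.partial(
        fake_invoke,
        FunctionName="lambda-function-name",
        Payload=json.dumps(payload).encode(),
    )
    return await loop.run_in_executor(None, call)


async def invoke_many(payloads: List[dict]) -> List[dict]:
    # gather preserves the order of its inputs in the results.
    return list(await asyncio.gather(*(get_fake_response(p) for p in payloads)))


if __name__ == "__main__":
    for response in asyncio.run(invoke_many([{"n": 1}, {"n": 2}])):
        print(response)
```

On Python 3.9 and later, asyncio.to_thread(func, *args, **kwargs) forwards named arguments to the function directly, which may remove the need for the partial in simple cases.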

Conclusion

Using a combination of run_in_executor and functools.partial, we can efficiently leverage popular libraries' synchronous code in asynchronous contexts.