asyncio snippet: Get running event loop or create new loop
This asyncio snippet shows how to get the current event loop, or create and register a new one if the current thread does not have one.
import asyncio
def get_or_create_eventloop() -> asyncio.AbstractEventLoop:
    """Return the current thread's event loop, creating one if necessary.

    ``asyncio.get_event_loop()`` is tried first. If it reports that there is
    no current event loop, or if the loop it returns has already been closed,
    a fresh loop is created with ``asyncio.new_event_loop()``, installed via
    ``asyncio.set_event_loop()``, and returned.

    Returns:
        An open event loop that is registered as the current loop.

    Raises:
        RuntimeError: Re-raised unchanged when ``asyncio.get_event_loop()``
            fails for any reason other than a missing current event loop.
    """
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError as ex:
        # Match only the stable prefix of the message; the remainder embeds
        # the thread name and is not something to depend on.
        if "There is no current event loop" not in str(ex):
            raise
    else:
        # A closed loop cannot schedule anything, so handing it back would
        # only defer the failure to the caller's first use of it.
        if not loop.is_closed():
            return loop

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    # Return the loop we just installed instead of looking it up again.
    return loop
# Usage example: obtain (or create) the event loop and schedule a one-second
# sleep on it as a task.
# NOTE(review): create_task() only schedules the coroutine; the task makes
# progress once the returned loop is actually running (for example via
# run_forever() or run_until_complete()) — confirm the caller starts the loop.
get_or_create_eventloop().create_task(asyncio.sleep(1))