1
Fork 0
mirror of https://github.com/RYGhub/royalnet.git synced 2024-11-27 13:34:28 +00:00
royalnet/test_executor.py
2018-01-15 18:25:12 +01:00

17 lines
No EOL
414 B
Python

import asyncio
import functools
import concurrent.futures
# Create and register the event loop explicitly. Relying on
# asyncio.get_event_loop() to create one implicitly when none is set is
# deprecated and raises RuntimeError on Python 3.14+.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# Single-worker pool: jobs submitted through it run one at a time on one
# background thread.
executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
async def call_me():
    """Print "ciao" through the module-level executor and wait for it to finish."""
    job = functools.partial(print, "ciao")
    await loop.run_in_executor(executor, job)
async def spam_calls():
    """Schedule a new ``call_me`` run every second, forever.

    Each run is handed to ``asyncio.ensure_future`` and is not awaited here,
    so this coroutine does not wait for a call to finish and never returns.
    """
    while True:
        pending_call = call_me()
        asyncio.ensure_future(pending_call)
        await asyncio.sleep(1)
# Only start the endless loop when executed as a script, so that importing
# this module (for example during test collection) does not block forever.
if __name__ == "__main__":
    loop.run_until_complete(spam_calls())