MCPcopy
hub / github.com/piccolo-orm/piccolo / run_sync

Function run_sync

piccolo/utils/sync.py:11–29  ·  view source on GitHub ↗

Run the coroutine synchronously - trying to accommodate as many edge cases as possible. 1. When called within a coroutine. 2. When called from ``python -m asyncio``, or iPython with %autoawait enabled, which means an event loop may already be running in the current

(
    coroutine: Coroutine[Any, Any, ReturnType],
)

Source from the content-addressed store, hash-verified

9
10
11def run_sync(
12 coroutine: Coroutine[Any, Any, ReturnType],
13) -> ReturnType:
14 """
15 Run the coroutine synchronously - trying to accommodate as many edge cases
16 as possible.
17 1. When called within a coroutine.
18 2. When called from ``python -m asyncio``, or iPython with %autoawait
19 enabled, which means an event loop may already be running in the
20 current thread.
21 """
22 try:
23 # We try this first, as in most situations this will work.
24 return asyncio.run(coroutine)
25 except RuntimeError:
26 # An event loop already exists.
27 with ThreadPoolExecutor(max_workers=1) as executor:
28 future: Future = executor.submit(asyncio.run, coroutine)
29 return future.result()

Callers 15

run_syncMethod · 0.90
run_syncMethod · 0.90
run_syncMethod · 0.90
create_db_tables_syncFunction · 0.90
drop_db_tables_syncFunction · 0.90
mainFunction · 0.90
update_password_syncMethod · 0.90
login_syncMethod · 0.90
create_user_syncMethod · 0.90
__init__Method · 0.90
run_syncMethod · 0.90
get_version_syncMethod · 0.90

Calls 1

runMethod · 0.45

Tested by 15

test_sync_simpleMethod · 0.72
testMethod · 0.72
test_sync_nestedMethod · 0.72
test_adminMethod · 0.72
test_activeMethod · 0.72