Files
exo/rust/exo_rs/tests/test_python.py
Andrei Cravtov 629c55d6ba Rename exo_pyo3_bindings to exo_rs (#2131)
## Motivation

I think this makes Evan's massive PR easier to merge later on.

## Changes

- Renamed exo_pyo3_bindings to exo_rs
- Upgraded versions of pyo3-based dependencies
- Renamed PyFromSwarm to just FromSwarm, and PyNetworkingHandle to just
NetworkingHandle
2026-05-31 19:23:41 +01:00

50 lines
1.2 KiB
Python

import asyncio
import pytest
from _pytest.capture import CaptureFixture
from exo_rs import (
Keypair,
NetworkingHandle,
NoPeersSubscribedToTopicError,
Pidfile,
FromSwarm,
)
@pytest.mark.asyncio
async def test_sleep_on_multiple_items() -> None:
    """Publish to a topic while a background task concurrently awaits ``recv``.

    Creates a ``NetworkingHandle``, spawns ``_await_recv`` on it as a
    background task, then performs four iterations of: sleep one second,
    publish a payload to ``"topic"``. A ``NoPeersSubscribedToTopicError``
    raised by the publish is caught and printed instead of failing the test.

    The background receiver loops forever, so it is cancelled and drained
    before the test returns; otherwise it would outlive the test and still
    be pending when the event loop is torn down.
    """
    print("PYTHON: starting handle")
    h = NetworkingHandle(Keypair.generate(), [], 0)
    # Keep a strong reference: the event loop only holds weak references to
    # tasks, so an unreferenced task may be garbage-collected mid-flight.
    rt = asyncio.create_task(_await_recv(h))
    try:
        # sleep for 4 ticks
        for _ in range(4):
            await asyncio.sleep(1)
            try:
                await h.gossipsub_publish("topic", b"somehting or other")
            except NoPeersSubscribedToTopicError as e:
                print("caught it", e)
    finally:
        # Stop the infinite receive loop and wait for it to unwind.
        # return_exceptions=True absorbs the resulting CancelledError (and any
        # error the receiver hit earlier), so the test outcome is unchanged
        # while the task's result is still retrieved.
        rt.cancel()
        await asyncio.gather(rt, return_exceptions=True)
def test_pidfile(capsys: CaptureFixture[str]):
    """Call ``scoped_lock_file`` with pytest's output capture switched off.

    Two marker lines are printed around the call so that anything emitted
    while the helper runs appears between them in the uncaptured output.
    """
    capture_off = capsys.disabled()
    with capture_off:
        print("\nbefore python")
        scoped_lock_file()
        print("after python")
async def _await_recv(h: NetworkingHandle):
while True:
event = await h.recv()
match event:
case FromSwarm.Connection() as c:
print(f"PYTHON: connection update: {c}")
case FromSwarm.Message() as m:
print(f"PYTHON: message: {m}")
def scoped_lock_file():
    """Construct a ``Pidfile`` for ``/tmp/lock.pid`` and return immediately.

    The second constructor argument is ``0o0600`` (presumably the file's
    permission bits; confirm against the ``exo_rs`` bindings).
    """
    # NOTE(review): the binding is never read. Presumably it exists only to
    # keep the Pidfile alive until this function returns, at which point the
    # object is released. Confirm the release semantics in exo_rs.
    pidfile_guard = Pidfile("/tmp/lock.pid", 0o0600)