Skip to content

Commit 1fc9766

Browse files
authored
Merge pull request #6 from A5rocks/into-the-future
Update monitor to pass basic tests
2 parents e5b20d0 + 499e4c3 commit 1fc9766

File tree

7 files changed

+143
-20
lines changed

7 files changed

+143
-20
lines changed

.github/workflows/ci.yml

+1-2
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,7 @@ jobs:
2323
allow-prereleases: true
2424

2525
- run: pip install uv nox
26-
- run: nox -s test
27-
# TODO: combine coverage
26+
- run: nox -s test test_oldest
2827

2928
- name: upload coverage
3029
if: always()

noxfile.py

+31
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,37 @@ def precommit(session):
1818
def test(session):
1919
session.install(".")
2020
session.install("-r", "test-requirements.txt")
21+
run_tests(session)
22+
23+
24+
@nox.session(venv_backend="uv")
25+
def test_oldest(session):
26+
pyver = session.run("python", "--version", silent=True)
27+
pyver = ".".join(pyver.split(" ")[1].split(".")[:2])
28+
# based on classifiers
29+
possible_trio = {
30+
"3.13": "trio>=0.26.0",
31+
"3.12": "trio>=0.23.0",
32+
"3.11": "trio>=0.21.0",
33+
"3.10": "trio>=0.20.0",
34+
"3.9": "trio",
35+
}
36+
37+
session.install(".", possible_trio[pyver], "--resolution=lowest-direct")
38+
session.install("pip")
39+
versions = session.run("pip", "freeze", silent=True)
40+
41+
# make sure we don't change the trio version
42+
session.install(
43+
"-r",
44+
"test-requirements.txt",
45+
*[line for line in versions.split() if line.startswith("trio==")],
46+
)
47+
48+
run_tests(session)
49+
50+
51+
def run_tests(session):
2152
session.run(
2253
"coverage",
2354
"run",

pyproject.toml

+6-2
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ name = "trio-monitor"
99
description = "A monitor utility for Trio"
1010
readme = "README.rst"
1111
license = "MIT OR Apache-2.0"
12-
dependencies = ["trio"]
13-
requires-python = ">=3.5"
12+
dependencies = ["trio>=0.19.0"]
13+
requires-python = ">=3.9"
1414
authors = [
1515
{ name = "Lura Skye", email = "[email protected]" }
1616
]
@@ -68,3 +68,7 @@ source = ["src", "**/.nox/test*/**/site-packages"]
6868
[tool.coverage.report]
6969
precision = 1
7070
exclude_lines = ["pragma: no cover", "abc.abstractmethod"]
71+
72+
[tool.uv]
73+
# make sure that nox reinstalls trio-monitor
74+
reinstall-package = ["trio-monitor"]

src/trio_monitor/__init__.py

+2
Original file line numberDiff line numberDiff line change
@@ -1 +1,3 @@
11
"""Top-level package for trio-monitor."""
2+
3+
from .monitor import Monitor as Monitor

src/trio_monitor/_tests/test_example.py

-2
This file was deleted.
+91
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
import trio
2+
import trio_monitor
3+
import pytest
4+
5+
6+
def run_with_monitor(async_fn, monitor):
7+
trio.run(
8+
async_fn,
9+
instruments=[monitor],
10+
clock=trio.testing.MockClock(autojump_threshold=0),
11+
)
12+
13+
14+
def test_it_does_not_crash():
15+
async def main():
16+
async with trio.open_nursery() as nursery:
17+
nursery.start_soon(trio.sleep, 1)
18+
19+
run_with_monitor(main, trio_monitor.Monitor())
20+
21+
22+
def test_it_can_start_a_server():
23+
async def main():
24+
async with trio.open_nursery() as nursery:
25+
listeners = await nursery.start(trio.serve_tcp, monitor.listen_on_stream, 0)
26+
27+
# make sure it can handle connections
28+
for listener in listeners:
29+
for _ in range(2):
30+
stream = await trio.testing.open_stream_to_socket_listener(listener)
31+
async with stream:
32+
buffer = b""
33+
with trio.fail_after(1):
34+
async for part in stream:
35+
buffer += part
36+
if buffer[-6:] == b"trio> ":
37+
break
38+
39+
await stream.send_all(b"exit\r\n")
40+
41+
nursery.cancel_scope.cancel()
42+
43+
monitor = trio_monitor.Monitor()
44+
run_with_monitor(main, monitor)
45+
46+
47+
@pytest.mark.xfail
48+
async def test_close_socket_before_start(nursery, autojump_clock):
49+
monitor = trio_monitor.Monitor()
50+
listeners = await nursery.start(trio.serve_tcp, monitor.listen_on_stream, 0)
51+
stream = await trio.testing.open_stream_to_socket_listener(listeners[0])
52+
await stream.aclose()
53+
await trio.sleep(1)
54+
55+
56+
def test_monitor_program():
57+
monitor = trio_monitor.Monitor()
58+
59+
async def main():
60+
async with trio.open_nursery() as nursery:
61+
tx, rx = trio.testing.memory_stream_one_way_pair()
62+
nursery.start_soon(monitor.do_monitor, tx)
63+
64+
await trio.testing.wait_all_tasks_blocked()
65+
monitor._is_monitoring = True
66+
67+
nursery.start_soon(trio.lowlevel.checkpoint)
68+
await trio.testing.wait_all_tasks_blocked()
69+
70+
monitor._is_monitoring = False
71+
buffer = b""
72+
while True:
73+
with trio.move_on_after(1) as cs:
74+
buffer += await rx.receive_some()
75+
76+
if cs.cancelled_caught:
77+
break
78+
79+
assert b"Task spawned: trio.lowlevel.checkpoint" in buffer
80+
assert b"Task scheduled: trio.lowlevel.checkpoint" in buffer
81+
assert b"Task stepping: trio.lowlevel.checkpoint" in buffer
82+
assert b"Task finished stepping: trio.lowlevel.checkpoint" in buffer
83+
assert b"Task exited: trio.lowlevel.checkpoint" in buffer
84+
85+
nursery.cancel_scope.cancel()
86+
87+
# copied from `main_loop` :/
88+
monitor._is_monitoring = False
89+
monitor._tx, monitor._rx = trio.open_memory_channel(100)
90+
91+
run_with_monitor(main, monitor)

src/trio_monitor/monitor.py

+12-14
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,13 @@
55
import string
66
import sys
77
import traceback
8+
import importlib.metadata
89
from types import FunctionType
910

10-
from async_generator._impl import ANextIter
11-
12-
from trio import Queue, WouldBlock, BrokenStreamError
11+
from trio import WouldBlock, open_memory_channel, BrokenResourceError
1312
from trio._highlevel_serve_listeners import _run_handler
14-
from ._version import __version__
1513
from trio.abc import Instrument
16-
from trio.hazmat import current_task, Task
14+
from trio.lowlevel import current_task, Task
1715

1816

1917
# inspiration: https://github.com/python-trio/trio/blob/master/notes-to-self/print-task-tree.py
@@ -26,9 +24,6 @@ def walk_coro_stack(coro):
2624
# A real coroutine
2725
yield coro.cr_frame, coro.cr_frame.f_lineno
2826
coro = coro.cr_await
29-
elif isinstance(coro, ANextIter):
30-
# black hole
31-
return
3227
else:
3328
# A generator decorated with @types.coroutine
3429
yield coro.gi_frame, coro.gi_frame.f_lineno
@@ -58,7 +53,7 @@ def __init__(self):
5853
self._is_monitoring = False
5954
# semi-arbitrary size, because otherwise we'll be dropping events
6055
# no clue how to make this better, alas.
61-
self._monitoring_queue = Queue(capacity=100)
56+
self._tx, self._rx = open_memory_channel(100)
6257

6358
@staticmethod
6459
def get_root_task() -> Task:
@@ -123,9 +118,11 @@ def _add_to_monitoring_queue(self, item):
123118
# if it's our own handler, skip it!
124119
if loc["handler"] == self.listen_on_stream:
125120
return
121+
if task.coro.cr_code == self.do_monitor.__code__:
122+
return
126123

127124
try:
128-
self._monitoring_queue.put_nowait(item)
125+
self._tx.send_nowait(item)
129126
except WouldBlock:
130127
return
131128

@@ -138,7 +135,7 @@ async def listen_on_stream(self, stream):
138135
async def main_loop(self, stream):
139136
"""Runs the main loop of the monitor."""
140137
# send the banner
141-
version = __version__
138+
version = importlib.metadata.version("trio")
142139
await stream.send_all(
143140
b"Connected to the Trio monitor, using "
144141
b"trio " + version.encode(encoding="ascii") + b"\n"
@@ -165,7 +162,7 @@ async def main_loop(self, stream):
165162
finally:
166163
self._is_monitoring = False
167164
# empty out the queue
168-
self._monitoring_queue = Queue(capacity=100)
165+
self._tx, self._rx = open_memory_channel(100)
169166

170167
try:
171168
fn = getattr(self, "command_{}".format(name))
@@ -195,7 +192,8 @@ async def main_loop(self, stream):
195192
async def do_monitor(self, stream):
196193
"""Livefeeds information about the running program."""
197194
prefix = "[FEED] "
198-
async for item in self._monitoring_queue:
195+
while True:
196+
item = await self._rx.receive()
199197
key = item[0]
200198

201199
if key == "task_spawned":
@@ -232,7 +230,7 @@ async def do_monitor(self, stream):
232230

233231
try:
234232
await stream.send_all(message.encode("ascii") + b"\n")
235-
except BrokenStreamError: # client disconnected on us
233+
except BrokenResourceError: # client disconnected on us
236234
return
237235

238236
# command definitions

0 commit comments

Comments
 (0)