
Commit f2163e8

Log entire stdout and stderr for terminated backend worker process (#3036)
* Log entire stdout and stderr for terminated backend worker process
* Handle stdout and stderr reader thread graceful termination
* Fix Java formatting
* Add integration test
* Remove redundant logic to terminate stdout and stderr reader threads
1 parent 1ec8932 commit f2163e8

File tree: 4 files changed (+195, -22 lines)


frontend/server/src/main/java/org/pytorch/serve/wlm/WorkerLifeCycle.java

+1 -21

@@ -306,22 +306,10 @@ private void attachRunner(
             argl.add(String.valueOf(1));
         }
 
-    public synchronized void terminateIOStreams() {
-        if (errReader != null) {
-            logger.warn("terminateIOStreams() threadName={}", errReader.getName());
-            errReader.terminate();
-        }
-        if (outReader != null) {
-            logger.warn("terminateIOStreams() threadName={}", outReader.getName());
-            outReader.terminate();
-        }
-    }
-
     public synchronized void exit() {
         if (process != null) {
             process.destroyForcibly();
             connector.clean();
-            terminateIOStreams();
         }
     }
 
@@ -373,19 +361,11 @@ public ReaderThread(String name, InputStream is, boolean error, WorkerLifeCycle
             this.metricCache = MetricCache.getInstance();
         }
 
-        public void terminate() {
-            isRunning.set(false);
-        }
-
         @Override
         public void run() {
             try (Scanner scanner = new Scanner(is, StandardCharsets.UTF_8.name())) {
-                while (isRunning.get() && scanner.hasNext()) {
+                while (scanner.hasNextLine()) {
                     String result = scanner.nextLine();
-                    if (result == null) {
-                        break;
-                    }
-
                     Matcher matcher = METRIC_PATTERN.matcher(result);
                     if (matcher.matches()) {
                         logger.info("result={}, pattern={}", result, matcher.group(2));
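
Note on the change above: the reader thread no longer stops on an external isRunning flag but reads until Scanner.hasNextLine() returns false, i.e. until the stream reaches EOF after the worker process has exited, so every buffered stdout/stderr line gets logged. Below is a minimal Python sketch of the same pipe-draining idea (illustrative only, not TorchServe code; the child command is an arbitrary stand-in for a crashing backend worker):

import subprocess
import sys

# Child process writes a couple of lines and then exits with an error,
# roughly like a crashing backend worker.
proc = subprocess.Popen(
    [sys.executable, "-c", "print('line1'); print('line2'); raise SystemExit(1)"],
    stdout=subprocess.PIPE,
    text=True,
)
proc.wait()  # the child has already terminated at this point

# Reading the pipe until EOF still yields everything the child wrote before
# dying; stopping on an external "running" flag instead could drop these lines.
for line in proc.stdout:
    print("child:", line.rstrip())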

frontend/server/src/main/java/org/pytorch/serve/wlm/WorkerThread.java

-1

@@ -481,7 +481,6 @@ public void shutdown() {
             }
         }
         backendChannel.clear();
-        lifeCycle.terminateIOStreams();
         Thread thread = currentThread.getAndSet(null);
         if (thread != null) {
             thread.interrupt();

requirements/developer.txt

+1

@@ -4,6 +4,7 @@ pytest==7.3.1
 pylint==3.0.3
 pytest-mock==3.14.0
 pytest-cov==4.1.0
+pytest-timeout==2.3.1
 grpcio==1.62.1
 protobuf==4.25.1
 grpcio-tools==1.60.0
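
The new pytest-timeout dependency backs the @pytest.mark.timeout(60) marker used by the integration test below. A minimal sketch of how the marker behaves (the test name here is illustrative, not part of the commit):

import time

import pytest


@pytest.mark.timeout(5)  # pytest-timeout aborts and fails the test after 5 seconds
def test_finishes_quickly():
    time.sleep(1)  # completes well within the limit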
New file (integration test)

+193

import shutil
from pathlib import Path
from unittest.mock import patch

import pytest
import test_utils
from model_archiver import ModelArchiverConfig

CURR_FILE_PATH = Path(__file__).parent
REPO_ROOT_DIR = CURR_FILE_PATH.parent.parent

MODEL_PY = """
import torch
import torch.nn as nn

class Foo(nn.Module):
    def __init__(self):
        super().__init__()

    def forward(self, x):
        return x
"""

HANDLER_PY = """
from typing import List, Dict, Any
from ts.context import Context


class FailingModel(object):
    def __init__(self) -> None:
        pass

    def initialize(self, context: Context) -> None:
        # Deliberate bug in handler with nested calls to test traceback logging
        self.call1()

    def handle(self, data: List[Dict[str, Any]], context: Context):
        return None

    def call1(self):
        self.call2()

    def call2(self):
        self.call3()

    def call3(self):
        self.call4()

    def call4(self):
        self.call5()

    def call5(self):
        assert False
"""

MODEL_CONFIG_YAML = """
maxRetryTimeoutInSec: 300
"""

CONFIG_PROPERTIES = """
default_response_timeout=120
"""


@pytest.fixture(scope="module")
def model_name():
    yield "test_model"


@pytest.fixture(scope="module")
def work_dir(tmp_path_factory, model_name):
    return Path(tmp_path_factory.mktemp(model_name))


@pytest.fixture(scope="module")
def torchserve(model_store, work_dir):
    test_utils.torchserve_cleanup()

    config_properties_file = work_dir / "config.properties"
    config_properties_file.write_text(CONFIG_PROPERTIES)

    pipe = test_utils.start_torchserve(
        model_store=model_store,
        no_config_snapshots=True,
        gen_mar=False,
        snapshot_file=config_properties_file.as_posix(),
    )

    yield pipe

    test_utils.torchserve_cleanup()


@pytest.fixture(scope="module", name="mar_file_path")
def create_mar_file(work_dir, model_archiver, model_name):
    mar_file_path = work_dir.joinpath(model_name + ".mar")

    model_py_file = work_dir / "model.py"
    model_py_file.write_text(MODEL_PY)

    model_config_yaml_file = work_dir / "model_config.yaml"
    model_config_yaml_file.write_text(MODEL_CONFIG_YAML)

    handler_py_file = work_dir / "handler.py"
    handler_py_file.write_text(HANDLER_PY)

    config = ModelArchiverConfig(
        model_name=model_name,
        version="1.0",
        serialized_file=None,
        model_file=model_py_file.as_posix(),
        handler=handler_py_file.as_posix(),
        extra_files=None,
        export_path=work_dir,
        requirements_file=None,
        runtime="python",
        force=False,
        archive_format="default",
        config_file=model_config_yaml_file.as_posix(),
    )

    with patch("archiver.ArgParser.export_model_args_parser", return_value=config):
        model_archiver.generate_model_archive()

    assert mar_file_path.exists()

    yield mar_file_path.as_posix()

    # Clean up files
    mar_file_path.unlink(missing_ok=True)


@pytest.fixture(scope="module", name="model_name")
def register_model(mar_file_path, model_store, torchserve):
    """
    Register the model in torchserve
    """
    shutil.copy(mar_file_path, model_store)

    file_name = Path(mar_file_path).name

    model_name = Path(file_name).stem

    params = (
        ("model_name", model_name),
        ("url", file_name),
        ("initial_workers", "1"),
        ("synchronous", "false"),
        ("batch_size", "1"),
    )

    test_utils.reg_resp = test_utils.register_model_with_params(params)

    yield model_name, torchserve

    test_utils.unregister_model(model_name)


@pytest.mark.timeout(60)
def test_handler_traceback_logging(model_name):
    """
    Full circle test with torchserve
    """

    model_name, pipe = model_name

    traceback = [
        "Traceback (most recent call last):",
        "line 12, in initialize",
        "self.call1()",
        "line 18, in call1",
        "self.call2()",
        "line 21, in call2",
        "self.call3()",
        "line 24, in call3",
        "self.call4()",
        "line 27, in call4",
        "self.call5()",
        "line 30, in call5",
        "assert False",
        "AssertionError",
    ]

    # Test traceback logging for first attempt and three retries to start worker
    for _ in range(4):
        logs = []
        while True:
            logs.append(pipe.get())
            if "AssertionError" in logs[-1]:
                break

        for line in traceback:
            assert any(line in log for log in logs)
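
To run only this test locally, something like the following should work, assuming a TorchServe development checkout with the developer requirements installed (a programmatic stand-in for the command line pytest -k test_handler_traceback_logging):

import pytest

# Select the new test by name; -q keeps the output short.
raise SystemExit(pytest.main(["-q", "-k", "test_handler_traceback_logging"]))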
