|
9 | 9 | from arroyo.processing.strategies.abstract import ProcessingStrategy, ProcessingStrategyFactory
|
10 | 10 | from arroyo.processing.strategies.commit import CommitOffsets
|
11 | 11 | from arroyo.processing.strategies.produce import Produce
|
12 |
| -from arroyo.processing.strategies.run_task import RunTask |
13 | 12 | from arroyo.processing.strategies.unfold import Unfold
|
14 | 13 | from arroyo.types import Commit, FilteredPayload, Message, Partition, Value
|
15 | 14 | from sentry_kafka_schemas.codecs import Codec
|
@@ -67,17 +66,11 @@ def create_with_partitions(
|
67 | 66 | else:
|
68 | 67 | produce_step = commit_step
|
69 | 68 |
|
70 |
| - # XXX: Remove after https://github.com/getsentry/arroyo/pull/427: Unfold |
71 |
| - # does not pass through the commit and there is no way to access it from |
72 |
| - # the generator function. |
73 |
| - zip_commit = RunTask( |
74 |
| - function=lambda m: (m.payload, m.committable), |
75 |
| - next_step=Unfold(generator=_unfold_segment, next_step=produce_step), |
76 |
| - ) |
| 69 | + unfold_step = Unfold(generator=_unfold_segment, next_step=produce_step) |
77 | 70 |
|
78 | 71 | return run_task_with_multiprocessing(
|
79 | 72 | function=_process_message,
|
80 |
| - next_step=zip_commit, |
| 73 | + next_step=unfold_step, |
81 | 74 | max_batch_size=self.max_batch_size,
|
82 | 75 | max_batch_time=self.max_batch_time,
|
83 | 76 | pool=self.pool,
|
@@ -105,13 +98,11 @@ def _process_message(message: Message[KafkaPayload]) -> list[Span]:
|
105 | 98 | # raise InvalidMessage(message.value.partition, message.value.offset)
|
106 | 99 |
|
107 | 100 |
|
108 |
| -def _unfold_segment(message: tuple[list[Span], Mapping[Partition, int]]): |
109 |
| - spans, committable = message |
110 |
| - last = len(spans) - 1 |
111 |
| - for i, span in enumerate(spans): |
| 101 | +def _unfold_segment(spans: list[Span]): |
| 102 | + for span in spans: |
112 | 103 | if span is not None:
|
113 | 104 | yield Value(
|
114 |
| - payload=KafkaPayload(key=None, value=orjson.dumps(span), headers=[]), |
115 |
| - committable=committable if i == last else {}, |
| 105 | + KafkaPayload(key=None, value=orjson.dumps(span), headers=[]), |
| 106 | + {}, |
116 | 107 | timestamp=None,
|
117 | 108 | )
|
0 commit comments