 # Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 #
 
-import datetime
-from dataclasses import InitVar, dataclass
-from typing import Any, ClassVar, Iterable, Mapping, MutableMapping, Optional, Union
+from typing import Any, Mapping
 
-from airbyte_cdk.models import SyncMode
-from airbyte_cdk.sources.declarative.incremental import Cursor
-from airbyte_cdk.sources.declarative.types import Record, StreamSlice, StreamState
-from airbyte_cdk.sources.streams.core import Stream
+from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
+from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
+from airbyte_cdk.sources.declarative.migrations.legacy_to_per_partition_state_migration import LegacyToPerPartitionStateMigration
+from airbyte_cdk.sources.declarative.types import Config
 
 
-@dataclass
-class GreenHouseSlicer(Cursor):
-    parameters: InitVar[Mapping[str, Any]]
-    cursor_field: str
-    request_cursor_field: str
+class GreenhouseStateMigration(LegacyToPerPartitionStateMigration):
+    declarative_stream: DeclarativeStream
+    config: Config
 
-    START_DATETIME: ClassVar[str] = "1970-01-01T00:00:00.000Z"
-    DATETIME_FORMAT: ClassVar[str] = "%Y-%m-%dT%H:%M:%S.%fZ"
+    def __init__(self, declarative_stream: DeclarativeStream, config: Config):
+        self._partition_router = declarative_stream.retriever.partition_router
+        self._cursor = declarative_stream.incremental_sync
+        self._config = config
+        self._parameters = declarative_stream.parameters
+        self._partition_key_field = InterpolatedString.create(
+            self._get_partition_field(self._partition_router), parameters=self._parameters
+        ).eval(self._config)
+        self._cursor_field = InterpolatedString.create(self._cursor.cursor_field, parameters=self._parameters).eval(self._config)
 
-    def __post_init__(self, parameters: Mapping[str, Any]):
-        self._state = {}
-
-    def stream_slices(self) -> Iterable[StreamSlice]:
-        yield StreamSlice(partition={}, cursor_slice={self.request_cursor_field: self._state.get(self.cursor_field, self.START_DATETIME)})
-
-    def _max_dt_str(self, *args: str) -> Optional[str]:
-        new_state_candidates = list(map(lambda x: datetime.datetime.strptime(x, self.DATETIME_FORMAT), filter(None, args)))
-        if not new_state_candidates:
-            return
-        max_dt = max(new_state_candidates)
-        # `.%f` gives us microseconds, we need milliseconds
-        (dt, micro) = max_dt.strftime(self.DATETIME_FORMAT).split(".")
-        return "%s.%03dZ" % (dt, int(micro[:-1:]) / 1000)
-
-    def set_initial_state(self, stream_state: StreamState) -> None:
-        cursor_value = stream_state.get(self.cursor_field)
-        if cursor_value:
-            self._state[self.cursor_field] = cursor_value
-
-    def close_slice(self, stream_slice: StreamSlice, most_recent_record: Optional[Record]) -> None:
-        stream_slice_value = stream_slice.get(self.cursor_field)
-        current_state = self._state.get(self.cursor_field)
-        record_cursor_value = most_recent_record and most_recent_record[self.cursor_field]
-        max_dt = self._max_dt_str(stream_slice_value, current_state, record_cursor_value)
-        if not max_dt:
-            return
-        self._state[self.cursor_field] = max_dt
-
-    def should_be_synced(self, record: Record) -> bool:
-        """
-        As of 2023-06-28, the expectation is that this method will only be used for semi-incremental and data feed and therefore the
-        implementation is irrelevant for greenhouse
+    def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
         """
-        return True
-
-    def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
+        LegacyToPerPartitionStateMigration migrates partition keys as strings, while the real type of `id` in Greenhouse is an integer,
+        which leads to a partition mismatch.
+        To prevent this, a type cast for the partition key was added.
         """
-        Evaluating which record is greater in terms of cursor. This is used to avoid having to capture all the records to close a slice
-        """
-        first_cursor_value = first.get(self.cursor_field, "")
-        second_cursor_value = second.get(self.cursor_field, "")
-        if first_cursor_value and second_cursor_value:
-            return first_cursor_value >= second_cursor_value
-        elif first_cursor_value:
-            return True
-        else:
-            return False
-
-    def _parse_to_datetime(self, datetime_str: str) -> datetime.datetime:
-        return datetime.datetime.strptime(datetime_str, self.DATETIME_FORMAT)
-
-    def get_stream_state(self) -> StreamState:
-        return self._state
-
-    def get_request_params(
-        self,
-        *,
-        stream_state: Optional[StreamState] = None,
-        stream_slice: Optional[StreamSlice] = None,
-        next_page_token: Optional[Mapping[str, Any]] = None,
-    ) -> MutableMapping[str, Any]:
-        return stream_slice or {}
-
-    def get_request_headers(self, *args, **kwargs) -> Mapping[str, Any]:
-        return {}
-
-    def get_request_body_data(self, *args, **kwargs) -> Optional[Union[Mapping, str]]:
-        return {}
-
-    def get_request_body_json(self, *args, **kwargs) -> Optional[Mapping]:
-        return {}
-
-
-@dataclass
-class GreenHouseSubstreamSlicer(GreenHouseSlicer):
-    parent_stream: Stream
-    stream_slice_field: str
-    parent_key: str
-
-    def stream_slices(self) -> Iterable[StreamSlice]:
-        for parent_stream_slice in self.parent_stream.stream_slices(
-            sync_mode=SyncMode.full_refresh, cursor_field=None, stream_state=self.get_stream_state()
-        ):
-            for parent_record in self.parent_stream.read_records(
-                sync_mode=SyncMode.full_refresh, cursor_field=None, stream_slice=parent_stream_slice, stream_state=None
-            ):
-                parent_primary_key = parent_record.get(self.parent_key)
-
-                partition = {self.stream_slice_field: parent_primary_key}
-                cursor_slice = {
-                    self.request_cursor_field: self._state.get(str(parent_primary_key), {}).get(self.cursor_field, self.START_DATETIME)
-                }
-
-                yield StreamSlice(partition=partition, cursor_slice=cursor_slice)
-
-    def set_initial_state(self, stream_state: StreamState) -> None:
-        if self.stream_slice_field in stream_state:
-            return
-        substream_ids = map(lambda x: str(x), set(stream_state.keys()) | set(self._state.keys()))
-        for id_ in substream_ids:
-            self._state[id_] = {
-                self.cursor_field: self._max_dt_str(
-                    stream_state.get(id_, {}).get(self.cursor_field), self._state.get(id_, {}).get(self.cursor_field)
-                )
-            }
-
-    def close_slice(self, stream_slice: StreamSlice, most_recent_record: Optional[Record]) -> None:
-        if most_recent_record:
-            substream_id = str(stream_slice[self.stream_slice_field])
-            current_state = self._state.get(substream_id, {}).get(self.cursor_field)
-            last_state = most_recent_record[self.cursor_field]
-            max_dt = self._max_dt_str(last_state, current_state)
-            self._state[substream_id] = {self.cursor_field: max_dt}
-        return
-
-    def should_be_synced(self, record: Record) -> bool:
-        """
-        As of 2023-06-28, the expectation is that this method will only be used for semi-incremental and data feed and therefore the
-        implementation is irrelevant for greenhouse
-        """
-        return True
-
-    def get_request_params(
-        self,
-        *,
-        stream_state: Optional[StreamState] = None,
-        stream_slice: Optional[StreamSlice] = None,
-        next_page_token: Optional[Mapping[str, Any]] = None,
-    ) -> MutableMapping[str, Any]:
-        # ignore other fields in a slice
-        return {self.request_cursor_field: stream_slice[self.request_cursor_field]}
+        states = [
+            {"partition": {self._partition_key_field: int(key), "parent_slice": {}}, "cursor": value} for key, value in stream_state.items()
+        ]
+        return {"states": states}
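
For illustration, below is a minimal standalone sketch of what migrate() produces. The partition key "job_id", the cursor field "updated_at", and the parent id value are hypothetical placeholders; in the class above, the real key and field are resolved at runtime from the stream's partition router and incremental sync config via InterpolatedString.

    from typing import Any, Mapping

    # Hypothetical partition key; the real one comes from the stream's
    # partition router (see _get_partition_field in __init__ above).
    PARTITION_KEY = "job_id"

    def migrate_legacy_state(stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
        # Legacy state maps each parent id (a string key) to its cursor dict.
        # Each entry becomes one per-partition state; the key is cast to int
        # so it matches the integer ids Greenhouse returns for parent records.
        states = [
            {"partition": {PARTITION_KEY: int(key), "parent_slice": {}}, "cursor": value}
            for key, value in stream_state.items()
        ]
        return {"states": states}

    legacy_state = {"4857622004": {"updated_at": "2023-01-02T03:04:05.000Z"}}
    print(migrate_legacy_state(legacy_state))
    # {'states': [{'partition': {'job_id': 4857622004, 'parent_slice': {}},
    #              'cursor': {'updated_at': '2023-01-02T03:04:05.000Z'}}]}

Without the int() cast, the migrated partition would carry the string "4857622004" where the partition router emits integer ids, producing exactly the partition mismatch the docstring describes.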