From f37763d8b8aa3d40a7365d3fba3e8e72016f5a87 Mon Sep 17 00:00:00 2001
From: Anatolii Yatsuk
Date: Mon, 10 Mar 2025 13:13:24 +0200
Subject: [PATCH 1/4] Add Export Lookback Window

---
 .../connectors/source-mixpanel/metadata.yaml         |  2 +-
 .../connectors/source-mixpanel/pyproject.toml        |  2 +-
 .../source-mixpanel/source_mixpanel/source.py        |  5 +++++
 .../source-mixpanel/source_mixpanel/spec.json        |  8 ++++++++
 .../source-mixpanel/source_mixpanel/streams/base.py  | 11 ++++++++---
 docs/integrations/sources/mixpanel.md                |  4 +++-
 6 files changed, 26 insertions(+), 6 deletions(-)

diff --git a/airbyte-integrations/connectors/source-mixpanel/metadata.yaml b/airbyte-integrations/connectors/source-mixpanel/metadata.yaml
index 6539ae6ab0162..b3f33911d5791 100644
--- a/airbyte-integrations/connectors/source-mixpanel/metadata.yaml
+++ b/airbyte-integrations/connectors/source-mixpanel/metadata.yaml
@@ -11,7 +11,7 @@ data:
   connectorSubtype: api
   connectorType: source
   definitionId: 12928b32-bf0a-4f1e-964f-07e12e37153a
-  dockerImageTag: 3.4.21
+  dockerImageTag: 3.5.0
   dockerRepository: airbyte/source-mixpanel
   documentationUrl: https://docs.airbyte.com/integrations/sources/mixpanel
   githubIssueLabel: source-mixpanel
diff --git a/airbyte-integrations/connectors/source-mixpanel/pyproject.toml b/airbyte-integrations/connectors/source-mixpanel/pyproject.toml
index 3ae2727740510..0b5a6766c2176 100644
--- a/airbyte-integrations/connectors/source-mixpanel/pyproject.toml
+++ b/airbyte-integrations/connectors/source-mixpanel/pyproject.toml
@@ -3,7 +3,7 @@ requires = ["poetry-core>=1.0.0"]
 build-backend = "poetry.core.masonry.api"
 
 [tool.poetry]
-version = "3.4.21"
+version = "3.5.0"
 name = "source-mixpanel"
 description = "Source implementation for Mixpanel."
 authors = ["Airbyte "]
diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py
index 1e5dccab2eeb7..1ead08d0d2ae0 100644
--- a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py
+++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py
@@ -83,6 +83,7 @@ def _validate_and_transform(self, config: MutableMapping[str, Any]):
             date_window_size,
             project_id,
             page_size,
+            export_lookback_window,
         ) = (
             config.get("project_timezone", "US/Pacific"),
             config.get("start_date"),
@@ -93,6 +94,7 @@ def _validate_and_transform(self, config: MutableMapping[str, Any]):
             config.get("date_window_size", 30),
             config.get("credentials", dict()).get("project_id"),
             config.get("page_size", 1000),
+            config.get("export_lookback_window", 0)
         )
         try:
             project_timezone = pendulum.timezone(project_timezone)
@@ -109,6 +111,8 @@ def _validate_and_transform(self, config: MutableMapping[str, Any]):
             raise_config_error("Please provide a valid integer for the `Attribution window` parameter.")
         if not isinstance(date_window_size, int) or date_window_size < 1:
             raise_config_error("Please provide a valid integer for the `Date slicing window` parameter.")
+        if not isinstance(export_lookback_window, int) or export_lookback_window < 0:
+            raise_config_error("Please provide a valid integer for the `Export Lookback Window` parameter.")
 
         auth = self.get_authenticator(config)
         if isinstance(auth, TokenAuthenticatorBase64) and project_id:
@@ -126,5 +130,6 @@ def _validate_and_transform(self, config: MutableMapping[str, Any]):
         config["date_window_size"] = date_window_size
         config["project_id"] = project_id
         config["page_size"] = page_size
+        config["export_lookback_window"] = export_lookback_window
 
         return config
diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/spec.json b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/spec.json
index 6ecd392f98778..7f7b88910791d 100644
--- a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/spec.json
+++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/spec.json
@@ -127,6 +127,14 @@
         "type": "integer",
         "minimum": 1,
         "default": 1000
+      },
+      "export_lookback_window": {
+        "order": 10,
+        "title": "Export Lookback Window",
+        "description": "The number of seconds to look back from the last synced timestamp during incremental syncs of the Export stream. This ensures no data is missed due to delays in event recording. Default is 0 seconds. Must be a non-negative integer.",
+        "type": "integer",
+        "minimum": 0,
+        "default": 0
       }
     }
   }
diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/base.py b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/base.py
index 52563919279d7..1a507881d3b60 100644
--- a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/base.py
+++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/base.py
@@ -59,6 +59,7 @@ def __init__(
         end_date: Optional[Date] = None,
         date_window_size: int = 30,  # in days
         attribution_window: int = 0,  # in days
+        export_lookback_window: int = 0,  # in seconds
         select_properties_by_default: bool = True,
         project_id: int = None,
         reqs_per_hour_limit: int = DEFAULT_REQS_PER_HOUR_LIMIT,
@@ -68,6 +69,7 @@ def __init__(
         self.end_date = end_date
         self.date_window_size = date_window_size
         self.attribution_window = attribution_window
+        self.export_lookback_window = export_lookback_window
         self.additional_properties = select_properties_by_default
         self.region = region
         self.project_timezone = project_timezone
@@ -166,11 +168,14 @@ def stream_slices(
             # Remove time part from state because API accept 'from_date' param in date format only ('YYYY-MM-DD')
             # It also means that sync returns duplicated entries for the date from the state (date range is inclusive)
             cursor_value = stream_state[self.cursor_field]
-            stream_state_date = pendulum.parse(stream_state[self.cursor_field]).date()
-            start_date = max(start_date, stream_state_date)
+            # This stream is only used for Export stream, so we use export_lookback_window here
+            cursor_value = (pendulum.parse(cursor_value) - timedelta(seconds=self.export_lookback_window)).to_iso8601_string()
+            stream_state_date = pendulum.parse(stream_state[self.cursor_field])
+            start_date = max(start_date, stream_state_date.date())
 
+        final_lookback_window = max(self.export_lookback_window, self.attribution_window * 24 * 60 * 60)
         # move start_date back days to sync data since that time as well
-        start_date = start_date - timedelta(days=self.attribution_window)
+        start_date = start_date - timedelta(seconds=final_lookback_window)
 
         # end_date cannot be later than today
         end_date = min(self.end_date, pendulum.today(tz=self.project_timezone).date())
diff --git a/docs/integrations/sources/mixpanel.md b/docs/integrations/sources/mixpanel.md
index 71924a92b2f01..fbe3a9d331294 100644
--- a/docs/integrations/sources/mixpanel.md
+++ b/docs/integrations/sources/mixpanel.md
@@ -20,7 +20,8 @@ To set up the Mixpanel source connector, you'll need a Mixpanel [Service Account
 10. For **End Date**, enter the date in YYYY-MM-DD format.
 11. For **Region**, enter the [region](https://help.mixpanel.com/hc/en-us/articles/360039135652-Data-Residency-in-EU) for your Mixpanel project.
 12. For **Date slicing window**, enter the number of days to slice through data. If you encounter RAM usage issues due to a huge amount of data in each window, try using a lower value for this parameter.
-13. Click **Set up source**.
+13. For **Export Lookback Window**, enter the number of seconds to look back from the last synced timestamp during incremental syncs of the Export stream. This ensures no data is missed due to event recording delays. Default is 0 seconds.
+14. Click **Set up source**.
 
 ## Supported sync modes
 
@@ -58,6 +59,7 @@ Syncing huge date windows may take longer due to Mixpanel's low API rate-limits
 
 | Version | Date | Pull Request | Subject |
 |:--------|:-----------|:---------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| 3.5.0 | 2025-03-10 | [55224](https://github.com/airbytehq/airbyte/pull/55224) | Add Export Lookback Window |
 | 3.4.21 | 2025-03-06 | [55224](https://github.com/airbytehq/airbyte/pull/55224) | Migrate streams to latest endpoint |
 | 3.4.20 | 2025-03-01 | [54769](https://github.com/airbytehq/airbyte/pull/54769) | Update dependencies |
 | 3.4.19 | 2025-02-22 | [54319](https://github.com/airbytehq/airbyte/pull/54319) | Update dependencies |

From e274afcbf07c3f38c7e77ca6b78863bdb0e00004 Mon Sep 17 00:00:00 2001
From: Anatolii Yatsuk
Date: Mon, 10 Mar 2025 13:15:30 +0200
Subject: [PATCH 2/4] Update PR url

---
 docs/integrations/sources/mixpanel.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/integrations/sources/mixpanel.md b/docs/integrations/sources/mixpanel.md
index fbe3a9d331294..22c6fb91da878 100644
--- a/docs/integrations/sources/mixpanel.md
+++ b/docs/integrations/sources/mixpanel.md
@@ -59,7 +59,7 @@ Syncing huge date windows may take longer due to Mixpanel's low API rate-limits
 
 | Version | Date | Pull Request | Subject |
 |:--------|:-----------|:---------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| 3.5.0 | 2025-03-10 | [55224](https://github.com/airbytehq/airbyte/pull/55224) | Add Export Lookback Window |
+| 3.5.0 | 2025-03-10 | [55673](https://github.com/airbytehq/airbyte/pull/55673) | Add Export Lookback Window |
 | 3.4.21 | 2025-03-06 | [55224](https://github.com/airbytehq/airbyte/pull/55224) | Migrate streams to latest endpoint |
 | 3.4.20 | 2025-03-01 | [54769](https://github.com/airbytehq/airbyte/pull/54769) | Update dependencies |
 | 3.4.19 | 2025-02-22 | [54319](https://github.com/airbytehq/airbyte/pull/54319) | Update dependencies |

From c33e7fe3bc96768351c7359eda47353abb66c9fc Mon Sep 17 00:00:00 2001
From: Anatolii Yatsuk
Date: Mon, 10 Mar 2025 13:42:16 +0200
Subject: [PATCH 3/4] Add unit test for new parameter

---
 .../unit_tests/test_streams.py | 56 +++++++++++++++++++
 1 file changed, 56 insertions(+)

diff --git a/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_streams.py
index f8b372ec9e93b..ed7d6f0c7daee 100644
--- a/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_streams.py
+++ b/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_streams.py
@@ -737,3 +737,59 @@ def test_export_iter_dicts(config):
     assert list(stream.iter_dicts([record_string, record_string[:2], record_string[2:], record_string])) == [record, record, record]
     # drop record parts because they are not standing nearby
     assert list(stream.iter_dicts([record_string, record_string[:2], record_string, record_string[2:]])) == [record, record]
+
+
+def test_export_stream_lookback_window(requests_mock, export_response, config_raw, mocker):
+    """Test that export_lookback_window correctly adjusts the start date during incremental sync and verifies slice parameters"""
+    config_raw["export_lookback_window"] = 7200  # 2 hours lookback
+    config_raw["start_date"] = "2021-06-01T00:00:00Z"
+    config_raw["end_date"] = "2021-07-10T00:00:00Z"
+
+    stream = init_stream("export", config=config_raw)
+
+    # Mock get_json_schema to avoid actual schema fetching
+    mocker.patch.object(Export, "get_json_schema", return_value={
+        "type": "object",
+        "properties": {
+            "event": {"type": "string"},
+            "time": {"type": "string"},
+            "distinct_id": {"type": "string"},
+            "insert_id": {"type": "string"}
+        }
+    })
+
+    # Mock response with two records at different times in JSONL format
+    export_response_multiple = (
+        b'{"event": "Viewed Page", "properties": {"time": 1623860880, "distinct_id": "user1", "$insert_id": "insert1"}}\n'
+        b'{"event": "Clicked Button", "properties": {"time": 1623864480, "distinct_id": "user2", "$insert_id": "insert2"}}'
+    )
+
+    requests_mock.register_uri(
+        "GET",
+        get_url_to_mock(stream),
+        content=export_response_multiple,  # Use content directly for bytes
+        status_code=200
+    )
+
+    # State with a timestamp 1 hour ago from the latest record
+    stream_state = {"time": "2021-06-16T16:28:00Z"}
+    stream_slices = list(stream.stream_slices(sync_mode=SyncMode.incremental, stream_state=stream_state))
+    assert len(stream_slices) > 0  # Ensure we have at least one slice
+    stream_slice = stream_slices[0]
+
+    # Verify slice parameters
+    expected_start = pendulum.parse("2021-06-16T14:28:00Z")  # 16:28:00 - 2 hours due to lookback
+    expected_end = pendulum.parse("2021-07-10T00:00:00Z")  # From config end_date
+
+    # Note: start_date might differ due to date_window_size slicing, adjust if needed
+    assert pendulum.parse(stream_slice["start_date"]) == pendulum.parse("2021-06-11T00:00:00Z")  # Adjusted by attribution_window
+    assert pendulum.parse(stream_slice["end_date"]) == expected_end
+    assert pendulum.parse(stream_slice["time"]) == expected_start
+
+    # Read records and verify both are included due to lookback
+    records = list(stream.read_records(sync_mode=SyncMode.incremental, stream_slice=stream_slice))
+    assert len(records) == 2
+
+    # Verify updated state is set to the latest record time
+    new_state = stream.get_updated_state(stream_state, records[-1])
+    assert new_state["time"] == "2021-06-16T17:28:00Z"

From d14d0e765c18e4441effb9811ea1c223944cc635 Mon Sep 17 00:00:00 2001
From: Octavia Squidington III
Date: Mon, 10 Mar 2025 11:48:08 +0000
Subject: [PATCH 4/4] chore: auto-fix lint and format issues

---
 .../source-mixpanel/source_mixpanel/source.py |  2 +-
 .../unit_tests/test_streams.py                | 24 +++++++++++--------
 2 files changed, 15 insertions(+), 11 deletions(-)

diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py
index 1ead08d0d2ae0..d6aa41b78aeed 100644
--- a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py
+++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py
@@ -94,7 +94,7 @@ def _validate_and_transform(self, config: MutableMapping[str, Any]):
             config.get("date_window_size", 30),
             config.get("credentials", dict()).get("project_id"),
             config.get("page_size", 1000),
-            config.get("export_lookback_window", 0)
+            config.get("export_lookback_window", 0),
         )
         try:
             project_timezone = pendulum.timezone(project_timezone)
diff --git a/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_streams.py
index ed7d6f0c7daee..04c5f71768f6d 100644
--- a/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_streams.py
+++ b/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_streams.py
@@ -748,15 +748,19 @@ def test_export_stream_lookback_window(requests_mock, export_response, config_ra
     stream = init_stream("export", config=config_raw)
 
     # Mock get_json_schema to avoid actual schema fetching
-    mocker.patch.object(Export, "get_json_schema", return_value={
-        "type": "object",
-        "properties": {
-            "event": {"type": "string"},
-            "time": {"type": "string"},
-            "distinct_id": {"type": "string"},
-            "insert_id": {"type": "string"}
-        }
-    })
+    mocker.patch.object(
+        Export,
+        "get_json_schema",
+        return_value={
+            "type": "object",
+            "properties": {
+                "event": {"type": "string"},
+                "time": {"type": "string"},
+                "distinct_id": {"type": "string"},
+                "insert_id": {"type": "string"},
+            },
+        },
+    )
 
     # Mock response with two records at different times in JSONL format
     export_response_multiple = (
@@ -768,7 +772,7 @@ def test_export_stream_lookback_window(requests_mock, export_response, config_ra
         "GET",
         get_url_to_mock(stream),
         content=export_response_multiple,  # Use content directly for bytes
-        status_code=200
+        status_code=200,
     )
 
     # State with a timestamp 1 hour ago from the latest record