diff --git a/airbyte-integrations/connectors/source-mixpanel/metadata.yaml b/airbyte-integrations/connectors/source-mixpanel/metadata.yaml index 6539ae6ab0162..b3f33911d5791 100644 --- a/airbyte-integrations/connectors/source-mixpanel/metadata.yaml +++ b/airbyte-integrations/connectors/source-mixpanel/metadata.yaml @@ -11,7 +11,7 @@ data: connectorSubtype: api connectorType: source definitionId: 12928b32-bf0a-4f1e-964f-07e12e37153a - dockerImageTag: 3.4.21 + dockerImageTag: 3.5.0 dockerRepository: airbyte/source-mixpanel documentationUrl: https://docs.airbyte.com/integrations/sources/mixpanel githubIssueLabel: source-mixpanel diff --git a/airbyte-integrations/connectors/source-mixpanel/pyproject.toml b/airbyte-integrations/connectors/source-mixpanel/pyproject.toml index 3ae2727740510..0b5a6766c2176 100644 --- a/airbyte-integrations/connectors/source-mixpanel/pyproject.toml +++ b/airbyte-integrations/connectors/source-mixpanel/pyproject.toml @@ -3,7 +3,7 @@ requires = ["poetry-core>=1.0.0"] build-backend = "poetry.core.masonry.api" [tool.poetry] -version = "3.4.21" +version = "3.5.0" name = "source-mixpanel" description = "Source implementation for Mixpanel." authors = ["Airbyte <contact@airbyte.io>"] diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py index 1e5dccab2eeb7..d6aa41b78aeed 100644 --- a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py +++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py @@ -83,6 +83,7 @@ def _validate_and_transform(self, config: MutableMapping[str, Any]): date_window_size, project_id, page_size, + export_lookback_window, ) = ( config.get("project_timezone", "US/Pacific"), config.get("start_date"), @@ -93,6 +94,7 @@ def _validate_and_transform(self, config: MutableMapping[str, Any]): config.get("date_window_size", 30), config.get("credentials", dict()).get("project_id"), config.get("page_size", 1000), + config.get("export_lookback_window", 0), ) try: project_timezone = pendulum.timezone(project_timezone) @@ -109,6 +111,8 @@ def _validate_and_transform(self, config: MutableMapping[str, Any]): raise_config_error("Please provide a valid integer for the `Attribution window` parameter.") if not isinstance(date_window_size, int) or date_window_size < 1: raise_config_error("Please provide a valid integer for the `Date slicing window` parameter.") + if not isinstance(export_lookback_window, int) or export_lookback_window < 0: + raise_config_error("Please provide a valid integer for the `Export Lookback Window` parameter.") auth = self.get_authenticator(config) if isinstance(auth, TokenAuthenticatorBase64) and project_id: @@ -126,5 +130,6 @@ def _validate_and_transform(self, config: MutableMapping[str, Any]): config["date_window_size"] = date_window_size config["project_id"] = project_id config["page_size"] = page_size + config["export_lookback_window"] = export_lookback_window return config diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/spec.json b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/spec.json index 6ecd392f98778..7f7b88910791d 100644 --- a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/spec.json +++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/spec.json @@ -127,6 +127,14 @@ "type": "integer", "minimum": 1, "default": 1000 + }, + "export_lookback_window": { + "order": 10, + "title": "Export Lookback Window", + "description": "The number of seconds to look back from the last synced timestamp during incremental syncs of the Export stream. This ensures no data is missed due to delays in event recording. Default is 0 seconds. Must be a non-negative integer.", + "type": "integer", + "minimum": 0, + "default": 0 } } } diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/base.py b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/base.py index 52563919279d7..1a507881d3b60 100644 --- a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/base.py +++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/base.py @@ -59,6 +59,7 @@ def __init__( end_date: Optional[Date] = None, date_window_size: int = 30, # in days attribution_window: int = 0, # in days + export_lookback_window: int = 0, # in seconds select_properties_by_default: bool = True, project_id: int = None, reqs_per_hour_limit: int = DEFAULT_REQS_PER_HOUR_LIMIT, @@ -68,6 +69,7 @@ def __init__( self.end_date = end_date self.date_window_size = date_window_size self.attribution_window = attribution_window + self.export_lookback_window = export_lookback_window self.additional_properties = select_properties_by_default self.region = region self.project_timezone = project_timezone @@ -166,11 +168,14 @@ def stream_slices( # Remove time part from state because API accept 'from_date' param in date format only ('YYYY-MM-DD') # It also means that sync returns duplicated entries for the date from the state (date range is inclusive) cursor_value = stream_state[self.cursor_field] - stream_state_date = pendulum.parse(stream_state[self.cursor_field]).date() - start_date = max(start_date, stream_state_date) + # This stream is only used for Export stream, so we use export_lookback_window here + cursor_value = (pendulum.parse(cursor_value) - timedelta(seconds=self.export_lookback_window)).to_iso8601_string() + stream_state_date = pendulum.parse(stream_state[self.cursor_field]) + start_date = max(start_date, stream_state_date.date()) + final_lookback_window = max(self.export_lookback_window, self.attribution_window * 24 * 60 * 60) # move start_date back <attribution_window> days to sync data since that time as well - start_date = start_date - timedelta(days=self.attribution_window) + start_date = start_date - timedelta(seconds=final_lookback_window) # end_date cannot be later than today end_date = min(self.end_date, pendulum.today(tz=self.project_timezone).date()) diff --git a/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_streams.py index f8b372ec9e93b..04c5f71768f6d 100644 --- a/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_streams.py +++ b/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_streams.py @@ -737,3 +737,63 @@ def test_export_iter_dicts(config): assert list(stream.iter_dicts([record_string, record_string[:2], record_string[2:], record_string])) == [record, record, record] # drop record parts because they are not standing nearby assert list(stream.iter_dicts([record_string, record_string[:2], record_string, record_string[2:]])) == [record, record] + + +def test_export_stream_lookback_window(requests_mock, export_response, config_raw, mocker): + """Test that export_lookback_window correctly adjusts the start date during incremental sync and verifies slice parameters""" + config_raw["export_lookback_window"] = 7200 # 1 hour lookback + config_raw["start_date"] = "2021-06-01T00:00:00Z" + config_raw["end_date"] = "2021-07-10T00:00:00Z" + + stream = init_stream("export", config=config_raw) + + # Mock get_json_schema to avoid actual schema fetching + mocker.patch.object( + Export, + "get_json_schema", + return_value={ + "type": "object", + "properties": { + "event": {"type": "string"}, + "time": {"type": "string"}, + "distinct_id": {"type": "string"}, + "insert_id": {"type": "string"}, + }, + }, + ) + + # Mock response with two records at different times in JSONL format + export_response_multiple = ( + b'{"event": "Viewed Page", "properties": {"time": 1623860880, "distinct_id": "user1", "$insert_id": "insert1"}}\n' + b'{"event": "Clicked Button", "properties": {"time": 1623864480, "distinct_id": "user2", "$insert_id": "insert2"}}' + ) + + requests_mock.register_uri( + "GET", + get_url_to_mock(stream), + content=export_response_multiple, # Use content directly for bytes + status_code=200, + ) + + # State with a timestamp 1 hour ago from the latest record + stream_state = {"time": "2021-06-16T16:28:00Z"} + stream_slices = list(stream.stream_slices(sync_mode=SyncMode.incremental, stream_state=stream_state)) + assert len(stream_slices) > 0 # Ensure we have at least one slice + stream_slice = stream_slices[0] + + # Verify slice parameters + expected_start = pendulum.parse("2021-06-16T14:28:00Z") # 16:28:00 - 2 hours due to lookback + expected_end = pendulum.parse("2021-07-10T00:00:00Z") # From config end_date + + # Note: start_date might differ due to date_window_size slicing, adjust if needed + assert pendulum.parse(stream_slice["start_date"]) == pendulum.parse("2021-06-11T00:00:00Z") # Adjusted by attribution_window + assert pendulum.parse(stream_slice["end_date"]) == expected_end + assert pendulum.parse(stream_slice["time"]) == expected_start + + # Read records and verify both are included due to lookback + records = list(stream.read_records(sync_mode=SyncMode.incremental, stream_slice=stream_slice)) + assert len(records) == 2 + + # Verify updated state is set to the latest record time + new_state = stream.get_updated_state(stream_state, records[-1]) + assert new_state["time"] == "2021-06-16T17:28:00Z" diff --git a/docs/integrations/sources/mixpanel.md b/docs/integrations/sources/mixpanel.md index 71924a92b2f01..22c6fb91da878 100644 --- a/docs/integrations/sources/mixpanel.md +++ b/docs/integrations/sources/mixpanel.md @@ -20,7 +20,8 @@ To set up the Mixpanel source connector, you'll need a Mixpanel [Service Account 10. For **End Date**, enter the date in YYYY-MM-DD format. 11. For **Region**, enter the [region](https://help.mixpanel.com/hc/en-us/articles/360039135652-Data-Residency-in-EU) for your Mixpanel project. 12. For **Date slicing window**, enter the number of days to slice through data. If you encounter RAM usage issues due to a huge amount of data in each window, try using a lower value for this parameter. -13. Click **Set up source**. +13. For **Export Lookback Window**, enter the number of seconds to look back from the last synced timestamp during incremental syncs of the Export stream. This ensures no data is missed due to event recording delays. Default is 0 seconds. +14. Click **Set up source**. ## Supported sync modes @@ -58,6 +59,7 @@ Syncing huge date windows may take longer due to Mixpanel's low API rate-limits | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 3.5.0 | 2025-03-10 | [55673](https://github.com/airbytehq/airbyte/pull/55673) | Add Export Lookback Window | | 3.4.21 | 2025-03-06 | [55224](https://github.com/airbytehq/airbyte/pull/55224) | Migrate streams to latest endpoint | | 3.4.20 | 2025-03-01 | [54769](https://github.com/airbytehq/airbyte/pull/54769) | Update dependencies | | 3.4.19 | 2025-02-22 | [54319](https://github.com/airbytehq/airbyte/pull/54319) | Update dependencies |