Skip to content

Commit b103b46

Browse files
authored
feat(source-mixpanel): Add Export Lookback Window (#55673)
1 parent 641cf9b commit b103b46

File tree

7 files changed

+86
-6
lines changed

7 files changed

+86
-6
lines changed

airbyte-integrations/connectors/source-mixpanel/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ data:
1111
connectorSubtype: api
1212
connectorType: source
1313
definitionId: 12928b32-bf0a-4f1e-964f-07e12e37153a
14-
dockerImageTag: 3.4.21
14+
dockerImageTag: 3.5.0
1515
dockerRepository: airbyte/source-mixpanel
1616
documentationUrl: https://docs.airbyte.com/integrations/sources/mixpanel
1717
githubIssueLabel: source-mixpanel

airbyte-integrations/connectors/source-mixpanel/pyproject.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ requires = ["poetry-core>=1.0.0"]
33
build-backend = "poetry.core.masonry.api"
44

55
[tool.poetry]
6-
version = "3.4.21"
6+
version = "3.5.0"
77
name = "source-mixpanel"
88
description = "Source implementation for Mixpanel."
99
authors = ["Airbyte <[email protected]>"]

airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py

+5
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ def _validate_and_transform(self, config: MutableMapping[str, Any]):
8383
date_window_size,
8484
project_id,
8585
page_size,
86+
export_lookback_window,
8687
) = (
8788
config.get("project_timezone", "US/Pacific"),
8889
config.get("start_date"),
@@ -93,6 +94,7 @@ def _validate_and_transform(self, config: MutableMapping[str, Any]):
9394
config.get("date_window_size", 30),
9495
config.get("credentials", dict()).get("project_id"),
9596
config.get("page_size", 1000),
97+
config.get("export_lookback_window", 0),
9698
)
9799
try:
98100
project_timezone = pendulum.timezone(project_timezone)
@@ -109,6 +111,8 @@ def _validate_and_transform(self, config: MutableMapping[str, Any]):
109111
raise_config_error("Please provide a valid integer for the `Attribution window` parameter.")
110112
if not isinstance(date_window_size, int) or date_window_size < 1:
111113
raise_config_error("Please provide a valid integer for the `Date slicing window` parameter.")
114+
if not isinstance(export_lookback_window, int) or export_lookback_window < 0:
115+
raise_config_error("Please provide a valid integer for the `Export Lookback Window` parameter.")
112116

113117
auth = self.get_authenticator(config)
114118
if isinstance(auth, TokenAuthenticatorBase64) and project_id:
@@ -126,5 +130,6 @@ def _validate_and_transform(self, config: MutableMapping[str, Any]):
126130
config["date_window_size"] = date_window_size
127131
config["project_id"] = project_id
128132
config["page_size"] = page_size
133+
config["export_lookback_window"] = export_lookback_window
129134

130135
return config

airbyte-integrations/connectors/source-mixpanel/source_mixpanel/spec.json

+8
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,14 @@
127127
"type": "integer",
128128
"minimum": 1,
129129
"default": 1000
130+
},
131+
"export_lookback_window": {
132+
"order": 10,
133+
"title": "Export Lookback Window",
134+
"description": "The number of seconds to look back from the last synced timestamp during incremental syncs of the Export stream. This ensures no data is missed due to delays in event recording. Default is 0 seconds. Must be a non-negative integer.",
135+
"type": "integer",
136+
"minimum": 0,
137+
"default": 0
130138
}
131139
}
132140
}

airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/base.py

+8-3
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ def __init__(
5959
end_date: Optional[Date] = None,
6060
date_window_size: int = 30, # in days
6161
attribution_window: int = 0, # in days
62+
export_lookback_window: int = 0, # in seconds
6263
select_properties_by_default: bool = True,
6364
project_id: int = None,
6465
reqs_per_hour_limit: int = DEFAULT_REQS_PER_HOUR_LIMIT,
@@ -68,6 +69,7 @@ def __init__(
6869
self.end_date = end_date
6970
self.date_window_size = date_window_size
7071
self.attribution_window = attribution_window
72+
self.export_lookback_window = export_lookback_window
7173
self.additional_properties = select_properties_by_default
7274
self.region = region
7375
self.project_timezone = project_timezone
@@ -166,11 +168,14 @@ def stream_slices(
166168
# Remove time part from state because API accept 'from_date' param in date format only ('YYYY-MM-DD')
167169
# It also means that sync returns duplicated entries for the date from the state (date range is inclusive)
168170
cursor_value = stream_state[self.cursor_field]
169-
stream_state_date = pendulum.parse(stream_state[self.cursor_field]).date()
170-
start_date = max(start_date, stream_state_date)
171+
# This stream is only used for Export stream, so we use export_lookback_window here
172+
cursor_value = (pendulum.parse(cursor_value) - timedelta(seconds=self.export_lookback_window)).to_iso8601_string()
173+
stream_state_date = pendulum.parse(stream_state[self.cursor_field])
174+
start_date = max(start_date, stream_state_date.date())
171175

176+
final_lookback_window = max(self.export_lookback_window, self.attribution_window * 24 * 60 * 60)
172177
# move start_date back <attribution_window> days to sync data since that time as well
173-
start_date = start_date - timedelta(days=self.attribution_window)
178+
start_date = start_date - timedelta(seconds=final_lookback_window)
174179

175180
# end_date cannot be later than today
176181
end_date = min(self.end_date, pendulum.today(tz=self.project_timezone).date())

airbyte-integrations/connectors/source-mixpanel/unit_tests/test_streams.py

+60
Original file line numberDiff line numberDiff line change
@@ -737,3 +737,63 @@ def test_export_iter_dicts(config):
737737
assert list(stream.iter_dicts([record_string, record_string[:2], record_string[2:], record_string])) == [record, record, record]
738738
# drop record parts because they are not standing nearby
739739
assert list(stream.iter_dicts([record_string, record_string[:2], record_string, record_string[2:]])) == [record, record]
740+
741+
742+
def test_export_stream_lookback_window(requests_mock, export_response, config_raw, mocker):
743+
"""Test that export_lookback_window correctly adjusts the start date during incremental sync and verifies slice parameters"""
744+
config_raw["export_lookback_window"] = 7200 # 1 hour lookback
745+
config_raw["start_date"] = "2021-06-01T00:00:00Z"
746+
config_raw["end_date"] = "2021-07-10T00:00:00Z"
747+
748+
stream = init_stream("export", config=config_raw)
749+
750+
# Mock get_json_schema to avoid actual schema fetching
751+
mocker.patch.object(
752+
Export,
753+
"get_json_schema",
754+
return_value={
755+
"type": "object",
756+
"properties": {
757+
"event": {"type": "string"},
758+
"time": {"type": "string"},
759+
"distinct_id": {"type": "string"},
760+
"insert_id": {"type": "string"},
761+
},
762+
},
763+
)
764+
765+
# Mock response with two records at different times in JSONL format
766+
export_response_multiple = (
767+
b'{"event": "Viewed Page", "properties": {"time": 1623860880, "distinct_id": "user1", "$insert_id": "insert1"}}\n'
768+
b'{"event": "Clicked Button", "properties": {"time": 1623864480, "distinct_id": "user2", "$insert_id": "insert2"}}'
769+
)
770+
771+
requests_mock.register_uri(
772+
"GET",
773+
get_url_to_mock(stream),
774+
content=export_response_multiple, # Use content directly for bytes
775+
status_code=200,
776+
)
777+
778+
# State with a timestamp 1 hour ago from the latest record
779+
stream_state = {"time": "2021-06-16T16:28:00Z"}
780+
stream_slices = list(stream.stream_slices(sync_mode=SyncMode.incremental, stream_state=stream_state))
781+
assert len(stream_slices) > 0 # Ensure we have at least one slice
782+
stream_slice = stream_slices[0]
783+
784+
# Verify slice parameters
785+
expected_start = pendulum.parse("2021-06-16T14:28:00Z") # 16:28:00 - 2 hours due to lookback
786+
expected_end = pendulum.parse("2021-07-10T00:00:00Z") # From config end_date
787+
788+
# Note: start_date might differ due to date_window_size slicing, adjust if needed
789+
assert pendulum.parse(stream_slice["start_date"]) == pendulum.parse("2021-06-11T00:00:00Z") # Adjusted by attribution_window
790+
assert pendulum.parse(stream_slice["end_date"]) == expected_end
791+
assert pendulum.parse(stream_slice["time"]) == expected_start
792+
793+
# Read records and verify both are included due to lookback
794+
records = list(stream.read_records(sync_mode=SyncMode.incremental, stream_slice=stream_slice))
795+
assert len(records) == 2
796+
797+
# Verify updated state is set to the latest record time
798+
new_state = stream.get_updated_state(stream_state, records[-1])
799+
assert new_state["time"] == "2021-06-16T17:28:00Z"

docs/integrations/sources/mixpanel.md

+3-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ To set up the Mixpanel source connector, you'll need a Mixpanel [Service Account
2020
10. For **End Date**, enter the date in YYYY-MM-DD format.
2121
11. For **Region**, enter the [region](https://help.mixpanel.com/hc/en-us/articles/360039135652-Data-Residency-in-EU) for your Mixpanel project.
2222
12. For **Date slicing window**, enter the number of days to slice through data. If you encounter RAM usage issues due to a huge amount of data in each window, try using a lower value for this parameter.
23-
13. Click **Set up source**.
23+
13. For **Export Lookback Window**, enter the number of seconds to look back from the last synced timestamp during incremental syncs of the Export stream. This ensures no data is missed due to event recording delays. Default is 0 seconds.
24+
14. Click **Set up source**.
2425

2526
## Supported sync modes
2627

@@ -58,6 +59,7 @@ Syncing huge date windows may take longer due to Mixpanel's low API rate-limits
5859

5960
| Version | Date | Pull Request | Subject |
6061
|:--------|:-----------|:---------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
62+
| 3.5.0 | 2025-03-10 | [55673](https://github.com/airbytehq/airbyte/pull/55673) | Add Export Lookback Window |
6163
| 3.4.21 | 2025-03-06 | [55224](https://github.com/airbytehq/airbyte/pull/55224) | Migrate streams to latest endpoint |
6264
| 3.4.20 | 2025-03-01 | [54769](https://github.com/airbytehq/airbyte/pull/54769) | Update dependencies |
6365
| 3.4.19 | 2025-02-22 | [54319](https://github.com/airbytehq/airbyte/pull/54319) | Update dependencies |

0 commit comments

Comments
 (0)