Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(source-mixpanel): Add Export Lookback Window #55673

Merged
merged 4 commits into from
Mar 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 12928b32-bf0a-4f1e-964f-07e12e37153a
dockerImageTag: 3.4.21
dockerImageTag: 3.5.0
dockerRepository: airbyte/source-mixpanel
documentationUrl: https://docs.airbyte.com/integrations/sources/mixpanel
githubIssueLabel: source-mixpanel
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "3.4.21"
version = "3.5.0"
name = "source-mixpanel"
description = "Source implementation for Mixpanel."
authors = ["Airbyte <[email protected]>"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ def _validate_and_transform(self, config: MutableMapping[str, Any]):
date_window_size,
project_id,
page_size,
export_lookback_window,
) = (
config.get("project_timezone", "US/Pacific"),
config.get("start_date"),
Expand All @@ -93,6 +94,7 @@ def _validate_and_transform(self, config: MutableMapping[str, Any]):
config.get("date_window_size", 30),
config.get("credentials", dict()).get("project_id"),
config.get("page_size", 1000),
config.get("export_lookback_window", 0),
)
try:
project_timezone = pendulum.timezone(project_timezone)
Expand All @@ -109,6 +111,8 @@ def _validate_and_transform(self, config: MutableMapping[str, Any]):
raise_config_error("Please provide a valid integer for the `Attribution window` parameter.")
if not isinstance(date_window_size, int) or date_window_size < 1:
raise_config_error("Please provide a valid integer for the `Date slicing window` parameter.")
if not isinstance(export_lookback_window, int) or export_lookback_window < 0:
raise_config_error("Please provide a valid integer for the `Export Lookback Window` parameter.")

auth = self.get_authenticator(config)
if isinstance(auth, TokenAuthenticatorBase64) and project_id:
Expand All @@ -126,5 +130,6 @@ def _validate_and_transform(self, config: MutableMapping[str, Any]):
config["date_window_size"] = date_window_size
config["project_id"] = project_id
config["page_size"] = page_size
config["export_lookback_window"] = export_lookback_window

return config
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,14 @@
"type": "integer",
"minimum": 1,
"default": 1000
},
"export_lookback_window": {
"order": 10,
"title": "Export Lookback Window",
"description": "The number of seconds to look back from the last synced timestamp during incremental syncs of the Export stream. This ensures no data is missed due to delays in event recording. Default is 0 seconds. Must be a non-negative integer.",
"type": "integer",
"minimum": 0,
"default": 0
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ def __init__(
end_date: Optional[Date] = None,
date_window_size: int = 30, # in days
attribution_window: int = 0, # in days
export_lookback_window: int = 0, # in seconds
select_properties_by_default: bool = True,
project_id: int = None,
reqs_per_hour_limit: int = DEFAULT_REQS_PER_HOUR_LIMIT,
Expand All @@ -68,6 +69,7 @@ def __init__(
self.end_date = end_date
self.date_window_size = date_window_size
self.attribution_window = attribution_window
self.export_lookback_window = export_lookback_window
self.additional_properties = select_properties_by_default
self.region = region
self.project_timezone = project_timezone
Expand Down Expand Up @@ -166,11 +168,14 @@ def stream_slices(
# Remove time part from state because API accept 'from_date' param in date format only ('YYYY-MM-DD')
# It also means that sync returns duplicated entries for the date from the state (date range is inclusive)
cursor_value = stream_state[self.cursor_field]
stream_state_date = pendulum.parse(stream_state[self.cursor_field]).date()
start_date = max(start_date, stream_state_date)
# This stream is only used for Export stream, so we use export_lookback_window here
cursor_value = (pendulum.parse(cursor_value) - timedelta(seconds=self.export_lookback_window)).to_iso8601_string()
stream_state_date = pendulum.parse(stream_state[self.cursor_field])
start_date = max(start_date, stream_state_date.date())

final_lookback_window = max(self.export_lookback_window, self.attribution_window * 24 * 60 * 60)
# move start_date back <attribution_window> days to sync data since that time as well
start_date = start_date - timedelta(days=self.attribution_window)
start_date = start_date - timedelta(seconds=final_lookback_window)

# end_date cannot be later than today
end_date = min(self.end_date, pendulum.today(tz=self.project_timezone).date())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -737,3 +737,63 @@ def test_export_iter_dicts(config):
assert list(stream.iter_dicts([record_string, record_string[:2], record_string[2:], record_string])) == [record, record, record]
# drop record parts because they are not standing nearby
assert list(stream.iter_dicts([record_string, record_string[:2], record_string, record_string[2:]])) == [record, record]


def test_export_stream_lookback_window(requests_mock, export_response, config_raw, mocker):
    """Test that export_lookback_window correctly adjusts the start date during incremental sync and verifies slice parameters"""
    config_raw["export_lookback_window"] = 7200  # 2-hour (7200 s) lookback
    config_raw["start_date"] = "2021-06-01T00:00:00Z"
    config_raw["end_date"] = "2021-07-10T00:00:00Z"

    stream = init_stream("export", config=config_raw)

    # Mock get_json_schema to avoid actual schema fetching
    mocker.patch.object(
        Export,
        "get_json_schema",
        return_value={
            "type": "object",
            "properties": {
                "event": {"type": "string"},
                "time": {"type": "string"},
                "distinct_id": {"type": "string"},
                "insert_id": {"type": "string"},
            },
        },
    )

    # Mock response with two records at different times in JSONL format.
    # 1623860880 = 2021-06-16T16:28:00Z, 1623864480 = 2021-06-16T17:28:00Z (epoch seconds).
    export_response_multiple = (
        b'{"event": "Viewed Page", "properties": {"time": 1623860880, "distinct_id": "user1", "$insert_id": "insert1"}}\n'
        b'{"event": "Clicked Button", "properties": {"time": 1623864480, "distinct_id": "user2", "$insert_id": "insert2"}}'
    )

    requests_mock.register_uri(
        "GET",
        get_url_to_mock(stream),
        content=export_response_multiple,  # Use content directly for bytes
        status_code=200,
    )

    # State with a timestamp 1 hour earlier than the latest mocked record (17:28:00Z),
    # so the lookback must rewind past it to re-capture both records.
    stream_state = {"time": "2021-06-16T16:28:00Z"}
    stream_slices = list(stream.stream_slices(sync_mode=SyncMode.incremental, stream_state=stream_state))
    assert len(stream_slices) > 0  # Ensure we have at least one slice
    stream_slice = stream_slices[0]

    # Verify slice parameters
    expected_start = pendulum.parse("2021-06-16T14:28:00Z")  # state 16:28:00 minus the 2-hour export lookback
    expected_end = pendulum.parse("2021-07-10T00:00:00Z")  # From config end_date

    # Note: start_date might differ due to date_window_size slicing, adjust if needed
    # NOTE(review): 2021-06-11 implies the attribution window (presumably 5 days from
    # config_raw) dominates the 2-hour lookback here — confirm against the fixture.
    assert pendulum.parse(stream_slice["start_date"]) == pendulum.parse("2021-06-11T00:00:00Z")  # Adjusted by attribution_window
    assert pendulum.parse(stream_slice["end_date"]) == expected_end
    assert pendulum.parse(stream_slice["time"]) == expected_start

    # Read records and verify both are included due to lookback
    records = list(stream.read_records(sync_mode=SyncMode.incremental, stream_slice=stream_slice))
    assert len(records) == 2

    # Verify updated state is set to the latest record time
    new_state = stream.get_updated_state(stream_state, records[-1])
    assert new_state["time"] == "2021-06-16T17:28:00Z"
4 changes: 3 additions & 1 deletion docs/integrations/sources/mixpanel.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ To set up the Mixpanel source connector, you'll need a Mixpanel [Service Account
10. For **End Date**, enter the date in YYYY-MM-DD format.
11. For **Region**, enter the [region](https://help.mixpanel.com/hc/en-us/articles/360039135652-Data-Residency-in-EU) for your Mixpanel project.
12. For **Date slicing window**, enter the number of days to slice through data. If you encounter RAM usage issues due to a huge amount of data in each window, try using a lower value for this parameter.
13. Click **Set up source**.
13. For **Export Lookback Window**, enter the number of seconds to look back from the last synced timestamp during incremental syncs of the Export stream. This ensures no data is missed due to event recording delays. Default is 0 seconds.
14. Click **Set up source**.

## Supported sync modes

Expand Down Expand Up @@ -58,6 +59,7 @@ Syncing huge date windows may take longer due to Mixpanel's low API rate-limits

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 3.5.0 | 2025-03-10 | [55673](https://github.com/airbytehq/airbyte/pull/55673) | Add Export Lookback Window |
| 3.4.21 | 2025-03-06 | [55224](https://github.com/airbytehq/airbyte/pull/55224) | Migrate streams to latest endpoint |
| 3.4.20 | 2025-03-01 | [54769](https://github.com/airbytehq/airbyte/pull/54769) | Update dependencies |
| 3.4.19 | 2025-02-22 | [54319](https://github.com/airbytehq/airbyte/pull/54319) | Update dependencies |
Expand Down
Loading