✨ Source-LinkedIn-Ads: Performance improvements for Campaign Analytics Streams #55747

Closed
@@ -4,7 +4,7 @@


import datetime
from collections import defaultdict
from collections import OrderedDict, defaultdict
from copy import deepcopy
from dataclasses import dataclass, field
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union
@@ -20,6 +20,7 @@
from airbyte_cdk.sources.declarative.incremental import CursorFactory, DatetimeBasedCursor, PerPartitionCursor
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
from airbyte_cdk.sources.declarative.partition_routers import CartesianProductStreamSlicer
from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter
from airbyte_cdk.sources.declarative.partition_routers.single_partition_router import SinglePartitionRouter
from airbyte_cdk.sources.declarative.requesters import HttpRequester
from airbyte_cdk.sources.declarative.requesters.error_handlers import DefaultErrorHandler
@@ -123,6 +124,89 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
)


@dataclass
class AnalyticsPerPartitionCursor(PerPartitionCursor):
"""
A custom PerPartitionCursor for Ad Analytics streams. It passes each partition's saved state to
its per-partition cursor and also stores the partition's extra fields in the state.
"""

def __init__(self, cursor_factory: CursorFactory, partition_router: PartitionRouter):
"""
Constructs the partition cursor.
"""
super().__init__(cursor_factory, partition_router)
self._partition_meta: OrderedDict[str, dict] = OrderedDict()
self._initial_state: StreamState

def generate_slices_from_partition(self, partition: StreamSlice) -> Iterable[StreamSlice]:
"""
Generates slices for a partition. It stores the partition's extra fields in _partition_meta,
then passes those extra fields and the partition's initial state on to each stream slice.
"""
# Ensure the maximum number of partitions is not exceeded
self._ensure_partition_limit()

cursor = self._cursor_per_partition.get(self._to_partition_key(partition.partition))
if not cursor:
partition_state = self._state_to_migrate_from if self._state_to_migrate_from else self._NO_CURSOR_STATE
cursor = self._create_cursor(partition_state)
self._cursor_per_partition[self._to_partition_key(partition.partition)] = cursor

self._partition_meta[self._to_partition_key(partition.partition)] = partition.extra_fields
initial_state_of_partition = self._get_initial_state_for_partition(partition.partition)

cursor.partition = partition
cursor.parent_state = initial_state_of_partition
for cursor_slice in cursor.stream_slices():
yield StreamSlice(partition=partition, cursor_slice=cursor_slice, extra_fields=partition.extra_fields)

def get_stream_state(self) -> StreamState:
"""
Gets the stream state. While building the state, it adds each partition's extra fields to the
corresponding state entry.
"""
states = []
for partition_tuple, cursor in self._cursor_per_partition.items():
cursor_state = cursor.get_stream_state()
if cursor_state:
partition = self._to_dict(partition_tuple)
extras = {}
if partition_tuple in self._partition_meta:
extras = self._partition_meta[partition_tuple]

states.append({"partition": partition, "cursor": cursor_state, "extra": extras})
state: dict[str, Any] = {"states": states}

parent_state = self._partition_router.get_stream_state()
if parent_state:
state["parent_state"] = parent_state
return state

def set_initial_state(self, stream_state: StreamState) -> None:
"""
Sets the initial state of the stream and keeps a copy of it for later per-partition lookups.
"""
super().set_initial_state(stream_state)
self._initial_state = deepcopy(stream_state)

def _get_initial_state_for_partition(self, partition: Mapping[str, Any]):
"""
Gets the initial state for a given partition, if it exists.
"""
if "states" not in self._initial_state:
return None

states_for_partitions = self._initial_state["states"]
partition_key = self._to_partition_key(partition)
for state in states_for_partitions:
state_partition_key = self._to_partition_key(state["partition"])
if state_partition_key == partition_key:
return state
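For orientation, here is a minimal sketch of the per-partition state layout that get_stream_state emits and _get_initial_state_for_partition later looks up. The campaign id, all values, and the exact keys inside "cursor" and "parent_state" are illustrative only, not taken from a real sync:

# Illustrative only: shows the {"states": [...], "parent_state": {...}} layout produced above.
example_state = {
    "states": [
        {
            "partition": {"campaign_id": 123456789},
            "cursor": {"end_date": "2023-05-01"},
            "extra": {
                "status": "COMPLETED",
                "lastModified": "2023-04-30T10:00:00+0000",
                "runSchedule": {"start": 1672531200000, "end": 1682899200000},
            },
        }
    ],
    # present only when the partition router exposes parent state; its shape depends on the parent stream
    "parent_state": {"campaigns": {"lastModified": "2023-04-30T10:00:00+0000"}},
}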


@dataclass
class AnalyticsDatetimeBasedCursor(DatetimeBasedCursor):
"""
@@ -188,6 +272,70 @@ def _partition_daterange(
return dates


@dataclass
class CampaignAnalyticsDatetimeBasedCursor(AnalyticsDatetimeBasedCursor):
"""
A cursor for Ad Campaign Analytics streams that helps prevent unnecessary API calls.
It uses the campaign's last status and modification time to derive the trimming strategy.

For COMPLETED campaigns, it also considers the runSchedule field, which describes the campaign's
scheduling: once a campaign is completed, there is no need to fetch records after its scheduled
end date.
"""

def stream_slices(self) -> Iterable[StreamSlice]:
# derive the trimming strategy from the campaign's current status and last modification time
campaign_current_status = self.partition.extra_fields["status"]
campaign_current_modified_time = self.partition.extra_fields["lastModified"]

# if the status and last modification time of the campaign did not change
# and the campaign is PAUSED, REMOVED, or COMPLETED, we do not need to check for new data.
if (
self.parent_state
and "extra" in self.parent_state
and "status" in self.parent_state["extra"]
and "lastModified" in self.parent_state["extra"]
):
parent_extras = self.parent_state["extra"]
campaign_former_status = parent_extras["status"]
campaign_former_modified_time = parent_extras["lastModified"]

if (
campaign_former_status in ["PAUSED", "REMOVED", "COMPLETED"]
and campaign_former_status == campaign_current_status
and campaign_current_modified_time == campaign_former_modified_time
):
# meaning nothing changed since last sync, so we do not have to fetch new data for this campaign.
return []

# if the campaign is COMPLETED, do not fetch data after the schedule's "end" time.
if campaign_current_status == "COMPLETED":
if "end" in self.partition.extra_fields["runSchedule"]:
end_datetime = datetime.datetime.fromtimestamp(
int(self.partition.extra_fields["runSchedule"]["end"] / 1000), tz=self._timezone
)
else:  # some campaigns may have no scheduled end date; in that case, assume now is the end date.
now = datetime.datetime.now(tz=self._timezone)
end_datetime = now
elif campaign_current_status in ["PAUSED", "REMOVED"]:
# for PAUSED and REMOVED campaigns, do not fetch data after the last modification time.
last_modified_date = self.partition.extra_fields["lastModified"]
end_datetime = datetime.datetime.strptime(last_modified_date, "%Y-%m-%dT%H:%M:%S%z")
elif campaign_current_status == "DRAFT":
# for DRAFT campaigns, we do not have to check for data.
return []
else:
end_datetime = self.select_best_end_datetime()

start_datetime = self._calculate_earliest_possible_value(end_datetime)

if start_datetime < end_datetime:
return self._partition_daterange(start_datetime, end_datetime, self._step)

return []
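A rough, self-contained sketch of the end-date trimming rule above, with made-up inputs; the real cursor reads these values from the partition's extra fields and additionally handles the early-exit and DRAFT cases:

import datetime


def sketch_trimmed_end(status: str, extra: dict, tz=datetime.timezone.utc) -> datetime.datetime:
    """Simplified illustration of how the slice end date is trimmed per campaign status."""
    if status == "COMPLETED" and "end" in extra.get("runSchedule", {}):
        # runSchedule timestamps are epoch milliseconds
        return datetime.datetime.fromtimestamp(extra["runSchedule"]["end"] / 1000, tz=tz)
    if status in ("PAUSED", "REMOVED"):
        return datetime.datetime.strptime(extra["lastModified"], "%Y-%m-%dT%H:%M:%S%z")
    return datetime.datetime.now(tz=tz)


# e.g. a COMPLETED campaign whose schedule ended on 2023-05-01 stops slicing at that date:
print(sketch_trimmed_end("COMPLETED", {"runSchedule": {"end": 1682899200000}}))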


@dataclass
class LinkedInAdsRecordExtractor(RecordExtractor):
"""
@@ -240,7 +388,7 @@ def _initialize_cursor(self):
else self.partition_router
)

return PerPartitionCursor(
return AnalyticsPerPartitionCursor(
cursor_factory=CursorFactory(
lambda: deepcopy(self.stream_slicer),
),
@@ -583,9 +583,13 @@ definitions:
partition_field: campaign_id
stream:
$ref: "#/definitions/streams/campaigns"
extra_fields:
- ["status"]
- ["runSchedule"]
- ["lastModified"]
incremental_sync:
type: CustomIncrementalSync
class_name: source_linkedin_ads.components.AnalyticsDatetimeBasedCursor
class_name: source_linkedin_ads.components.CampaignAnalyticsDatetimeBasedCursor
cursor_field: end_date
cursor_datetime_formats:
- "%Y-%m-%d"
@@ -745,9 +749,13 @@ definitions:
partition_field: campaign_id
stream:
$ref: "#/definitions/streams/campaigns"
extra_fields:
- ["status"]
- ["runSchedule"]
- ["lastModified"]
incremental_sync:
type: CustomIncrementalSync
class_name: source_linkedin_ads.components.AnalyticsDatetimeBasedCursor
class_name: source_linkedin_ads.components.CampaignAnalyticsDatetimeBasedCursor
cursor_field: end_date
cursor_datetime_formats:
- "%Y-%m-%d"
@@ -826,9 +834,13 @@ definitions:
partition_field: campaign_id
stream:
$ref: "#/definitions/streams/campaigns"
extra_fields:
- ["status"]
- ["runSchedule"]
- ["lastModified"]
incremental_sync:
type: CustomIncrementalSync
class_name: source_linkedin_ads.components.AnalyticsDatetimeBasedCursor
class_name: source_linkedin_ads.components.CampaignAnalyticsDatetimeBasedCursor
cursor_field: end_date
cursor_datetime_formats:
- "%Y-%m-%d"
@@ -907,9 +919,13 @@ definitions:
partition_field: campaign_id
stream:
$ref: "#/definitions/streams/campaigns"
extra_fields:
- ["status"]
- ["runSchedule"]
- ["lastModified"]
incremental_sync:
type: CustomIncrementalSync
class_name: source_linkedin_ads.components.AnalyticsDatetimeBasedCursor
class_name: source_linkedin_ads.components.CampaignAnalyticsDatetimeBasedCursor
cursor_field: end_date
cursor_datetime_formats:
- "%Y-%m-%d"
@@ -988,9 +1004,13 @@ definitions:
partition_field: campaign_id
stream:
$ref: "#/definitions/streams/campaigns"
extra_fields:
- ["status"]
- ["runSchedule"]
- ["lastModified"]
incremental_sync:
type: CustomIncrementalSync
class_name: source_linkedin_ads.components.AnalyticsDatetimeBasedCursor
class_name: source_linkedin_ads.components.CampaignAnalyticsDatetimeBasedCursor
cursor_field: end_date
cursor_datetime_formats:
- "%Y-%m-%d"
@@ -1069,9 +1089,13 @@ definitions:
partition_field: campaign_id
stream:
$ref: "#/definitions/streams/campaigns"
extra_fields:
- ["status"]
- ["runSchedule"]
- ["lastModified"]
incremental_sync:
type: CustomIncrementalSync
class_name: source_linkedin_ads.components.AnalyticsDatetimeBasedCursor
class_name: source_linkedin_ads.components.CampaignAnalyticsDatetimeBasedCursor
cursor_field: end_date
cursor_datetime_formats:
- "%Y-%m-%d"
@@ -1150,9 +1174,13 @@ definitions:
partition_field: campaign_id
stream:
$ref: "#/definitions/streams/campaigns"
extra_fields:
- ["status"]
- ["runSchedule"]
- ["lastModified"]
incremental_sync:
type: CustomIncrementalSync
class_name: source_linkedin_ads.components.AnalyticsDatetimeBasedCursor
class_name: source_linkedin_ads.components.CampaignAnalyticsDatetimeBasedCursor
cursor_field: end_date
cursor_datetime_formats:
- "%Y-%m-%d"
@@ -1231,9 +1259,13 @@ definitions:
partition_field: campaign_id
stream:
$ref: "#/definitions/streams/campaigns"
extra_fields:
- ["status"]
- ["runSchedule"]
- ["lastModified"]
incremental_sync:
type: CustomIncrementalSync
class_name: source_linkedin_ads.components.AnalyticsDatetimeBasedCursor
class_name: source_linkedin_ads.components.CampaignAnalyticsDatetimeBasedCursor
cursor_field: end_date
cursor_datetime_formats:
- "%Y-%m-%d"
@@ -1312,9 +1344,13 @@ definitions:
partition_field: campaign_id
stream:
$ref: "#/definitions/streams/campaigns"
extra_fields:
- ["status"]
- ["runSchedule"]
- ["lastModified"]
incremental_sync:
type: CustomIncrementalSync
class_name: source_linkedin_ads.components.AnalyticsDatetimeBasedCursor
class_name: source_linkedin_ads.components.CampaignAnalyticsDatetimeBasedCursor
cursor_field: end_date
cursor_datetime_formats:
- "%Y-%m-%d"
@@ -1393,9 +1429,13 @@ definitions:
partition_field: campaign_id
stream:
$ref: "#/definitions/streams/campaigns"
extra_fields:
- ["status"]
- ["runSchedule"]
- ["lastModified"]
incremental_sync:
type: CustomIncrementalSync
class_name: source_linkedin_ads.components.AnalyticsDatetimeBasedCursor
class_name: source_linkedin_ads.components.CampaignAnalyticsDatetimeBasedCursor
cursor_field: end_date
cursor_datetime_formats:
- "%Y-%m-%d"
@@ -1475,9 +1515,13 @@ definitions:
partition_field: campaign_id
stream:
$ref: "#/definitions/streams/campaigns"
extra_fields:
- ["status"]
- ["runSchedule"]
- ["lastModified"]
incremental_sync:
type: CustomIncrementalSync
class_name: source_linkedin_ads.components.AnalyticsDatetimeBasedCursor
class_name: source_linkedin_ads.components.CampaignAnalyticsDatetimeBasedCursor
cursor_field: end_date
cursor_datetime_formats:
- "%Y-%m-%d"