Skip to content

Commit d6758e3

Browse files
artem1205 authored and jatinyadav-cc committed
Source SalesForce: Add Stream Slice Step option to specification (airbytehq#35421)
Signed-off-by: Artem Inzhyyants <[email protected]>
1 parent ded3403 commit d6758e3

File tree

11 files changed

+140
-63
lines changed

11 files changed

+140
-63
lines changed

airbyte-integrations/connectors/source-salesforce/acceptance-test-config.yml

+2
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ acceptance_tests:
4848
future_state:
4949
future_state_path: "integration_tests/future_state.json"
5050
timeout_seconds: 7200
51+
# Skip the comprehensive incremental tests: the filter uses a greater-than-or-equal condition, so the last record of any stream state is emitted again on the next sync.
52+
skip_comprehensive_incremental_tests: true
5153
full_refresh:
5254
tests:
5355
- config_path: "secrets/config.json"

airbyte-integrations/connectors/source-salesforce/integration_tests/expected_records.jsonl

+3-3
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@
1010
{"stream": "ActiveProfileMetric", "data": {"Id": "5H04W00000U3Ph4SAF", "MetricsDate": "2023-10-22", "UserLicenseId": "1004W000001gXv2QAE", "ProfileId": "00e4W000002LjMoQAK", "SystemModstamp": "2023-10-22T05:59:12.000Z", "AssignedUserCount": 0, "ActiveUserCount": 0}, "emitted_at": 1698150320258}
1111
{"stream": "ActiveProfileMetric", "data": {"Id": "5H04W00000U3Ph5SAF", "MetricsDate": "2023-10-22", "UserLicenseId": "1004W000001gXv3QAE", "ProfileId": "00e4W000002LjMqQAK", "SystemModstamp": "2023-10-22T05:59:12.000Z", "AssignedUserCount": 0, "ActiveUserCount": 0}, "emitted_at": 1698150320258}
1212
{"stream": "ActiveProfileMetric", "data": {"Id": "5H04W00000U3Ph6SAF", "MetricsDate": "2023-10-22", "UserLicenseId": "1004W000001gXv4QAE", "ProfileId": "00e4W000002LjMrQAK", "SystemModstamp": "2023-10-22T05:59:12.000Z", "AssignedUserCount": 0, "ActiveUserCount": 0}, "emitted_at": 1698150320259}
13-
{"stream": "AppDefinition", "data": {"Id": "000000000000000AAA", "DurableId": "06m4W000001ldIZQAY", "Label": "Sales", "MasterLabel": "salesforce", "NamespacePrefix": "standard", "DeveloperName": "Sales", "LogoUrl": "/img/salesforce-noname-logo-v2.svg", "Description": "The world's most popular sales force automation (SFA) solution", "UiType": "Aloha", "NavType": "Standard", "UtilityBar": null, "HeaderColor": "#0070D2", "IsOverrideOrgTheme": false, "IsSmallFormFactorSupported": false, "IsMediumFormFactorSupported": false, "IsLargeFormFactorSupported": false, "IsNavPersonalizationDisabled": false, "IsNavAutoTempTabsDisabled": false, "IsNavTabPersistenceDisabled": false}, "emitted_at": 1697452785550}
14-
{"stream": "AppDefinition", "data": {"Id": "000000000000000AAA", "DurableId": "06m4W000001ldIdQAI", "Label": "Service", "MasterLabel": "supportforce", "NamespacePrefix": "standard", "DeveloperName": "Service", "LogoUrl": "/img/salesforce-noname-logo-v2.svg", "Description": "Manage customer service with accounts, contacts, cases, and more", "UiType": "Aloha", "NavType": "Standard", "UtilityBar": null, "HeaderColor": "#0070D2", "IsOverrideOrgTheme": false, "IsSmallFormFactorSupported": false, "IsMediumFormFactorSupported": false, "IsLargeFormFactorSupported": true, "IsNavPersonalizationDisabled": false, "IsNavAutoTempTabsDisabled": false, "IsNavTabPersistenceDisabled": false}, "emitted_at": 1697452785551}
15-
{"stream": "AppDefinition", "data": {"Id": "000000000000000AAA", "DurableId": "06m4W000001ldIeQAI", "Label": "Marketing", "MasterLabel": "Marketing", "NamespacePrefix": "standard", "DeveloperName": "Marketing", "LogoUrl": "/img/salesforce-noname-logo-v2.svg", "Description": "Best-in-class on-demand marketing automation", "UiType": "Aloha", "NavType": "Standard", "UtilityBar": null, "HeaderColor": "#0070D2", "IsOverrideOrgTheme": false, "IsSmallFormFactorSupported": false, "IsMediumFormFactorSupported": false, "IsLargeFormFactorSupported": true, "IsNavPersonalizationDisabled": false, "IsNavAutoTempTabsDisabled": false, "IsNavTabPersistenceDisabled": false}, "emitted_at": 1697452785552}
13+
{"stream":"AppDefinition","data":{"Id":"000000000000000AAA","DurableId":"06m4W000001ldIZQAY","Label":"Sales","MasterLabel":"salesforce","NamespacePrefix":"standard","DeveloperName":"Sales","LogoUrl":"/img/salesforce-noname-logo-v2.svg","Description":"The world's most popular sales force automation (SFA) solution","UiType":"Aloha","NavType":"Standard","UtilityBar":null,"HeaderColor":"#0070D2","IsOverrideOrgTheme":false,"IsSmallFormFactorSupported":false,"IsMediumFormFactorSupported":false,"IsLargeFormFactorSupported":false,"IsNavPersonalizationDisabled":false,"IsNavAutoTempTabsDisabled":false,"IsNavTabPersistenceDisabled":false},"emitted_at":1708425402368}
14+
{"stream":"AppDefinition","data":{"Id":"000000000000000AAA","DurableId":"06m4W000001ldIdQAI","Label":"Service","MasterLabel":"supportforce","NamespacePrefix":"standard","DeveloperName":"Service","LogoUrl":"/img/salesforce-noname-logo-v2.svg","Description":"Manage customer service with accounts, contacts, cases, and more","UiType":"Aloha","NavType":"Standard","UtilityBar":null,"HeaderColor":"#0070D2","IsOverrideOrgTheme":false,"IsSmallFormFactorSupported":false,"IsMediumFormFactorSupported":false,"IsLargeFormFactorSupported":true,"IsNavPersonalizationDisabled":false,"IsNavAutoTempTabsDisabled":false,"IsNavTabPersistenceDisabled":false},"emitted_at":1708425402369}
15+
{"stream":"AppDefinition","data":{"Id":"000000000000000AAA","DurableId":"06m4W000001ldIeQAI","Label":"Marketing CRM Classic","MasterLabel":"Marketing","NamespacePrefix":"standard","DeveloperName":"Marketing","LogoUrl":"/img/salesforce-noname-logo-v2.svg","Description":"Track sales and marketing efforts with CRM objects.","UiType":"Aloha","NavType":"Standard","UtilityBar":null,"HeaderColor":"#0070D2","IsOverrideOrgTheme":false,"IsSmallFormFactorSupported":false,"IsMediumFormFactorSupported":false,"IsLargeFormFactorSupported":true,"IsNavPersonalizationDisabled":false,"IsNavAutoTempTabsDisabled":false,"IsNavTabPersistenceDisabled":false},"emitted_at":1708425402369}
1616
{"stream": "Asset", "data": {"attributes": {"type": "Asset", "url": "/services/data/v57.0/sobjects/Asset/02i4W00000EkJspQAF"}, "Id": "02i4W00000EkJspQAF", "ContactId": null, "AccountId": "0014W00002DkoWNQAZ", "ParentId": null, "RootAssetId": "02i4W00000EkJspQAF", "Product2Id": null, "ProductCode": null, "IsCompetitorProduct": false, "CreatedDate": "2021-01-18T21:44:57.000+0000", "CreatedById": "0054W00000BZkk0QAD", "LastModifiedDate": "2021-01-18T21:44:57.000+0000", "LastModifiedById": "0054W00000BZkk0QAD", "SystemModstamp": "2021-01-18T21:44:57.000+0000", "IsDeleted": false, "Name": "Radish - Black, Winter, Organic", "SerialNumber": null, "InstallDate": null, "PurchaseDate": null, "UsageEndDate": null, "LifecycleStartDate": null, "LifecycleEndDate": null, "Status": null, "Price": null, "Quantity": null, "Description": null, "OwnerId": "0054W00000BZkk0QAD", "AssetProvidedById": null, "AssetServicedById": null, "IsInternal": false, "AssetLevel": 1, "StockKeepingUnit": null, "HasLifecycleManagement": false, "CurrentMrr": null, "CurrentLifecycleEndDate": null, "CurrentQuantity": null, "CurrentAmount": null, "TotalLifecycleAmount": null, "Street": null, "City": null, "State": null, "PostalCode": null, "Country": null, "Latitude": null, "Longitude": null, "GeocodeAccuracy": null, "Address": null, "LastViewedDate": null, "LastReferencedDate": null}, "emitted_at": 1697452787097}
1717
{"stream": "Asset", "data": {"attributes": {"type": "Asset", "url": "/services/data/v57.0/sobjects/Asset/02i4W00000EkJsqQAF"}, "Id": "02i4W00000EkJsqQAF", "ContactId": null, "AccountId": "0014W00002DkoW0QAJ", "ParentId": null, "RootAssetId": "02i4W00000EkJsqQAF", "Product2Id": null, "ProductCode": null, "IsCompetitorProduct": false, "CreatedDate": "2021-01-18T21:44:57.000+0000", "CreatedById": "0054W00000BZkk0QAD", "LastModifiedDate": "2021-01-18T21:44:57.000+0000", "LastModifiedById": "0054W00000BZkk0QAD", "SystemModstamp": "2021-01-18T21:44:57.000+0000", "IsDeleted": false, "Name": "Cheese - Valancey", "SerialNumber": null, "InstallDate": null, "PurchaseDate": null, "UsageEndDate": null, "LifecycleStartDate": null, "LifecycleEndDate": null, "Status": null, "Price": null, "Quantity": null, "Description": null, "OwnerId": "0054W00000BZkk0QAD", "AssetProvidedById": null, "AssetServicedById": null, "IsInternal": false, "AssetLevel": 1, "StockKeepingUnit": null, "HasLifecycleManagement": false, "CurrentMrr": null, "CurrentLifecycleEndDate": null, "CurrentQuantity": null, "CurrentAmount": null, "TotalLifecycleAmount": null, "Street": null, "City": null, "State": null, "PostalCode": null, "Country": null, "Latitude": null, "Longitude": null, "GeocodeAccuracy": null, "Address": null, "LastViewedDate": null, "LastReferencedDate": null}, "emitted_at": 1697452787099}
1818
{"stream": "Asset", "data": {"attributes": {"type": "Asset", "url": "/services/data/v57.0/sobjects/Asset/02i4W00000EkJsrQAF"}, "Id": "02i4W00000EkJsrQAF", "ContactId": null, "AccountId": "0014W00002DkoW5QAJ", "ParentId": null, "RootAssetId": "02i4W00000EkJsrQAF", "Product2Id": null, "ProductCode": null, "IsCompetitorProduct": false, "CreatedDate": "2021-01-18T21:44:57.000+0000", "CreatedById": "0054W00000BZkk0QAD", "LastModifiedDate": "2021-01-18T21:44:57.000+0000", "LastModifiedById": "0054W00000BZkk0QAD", "SystemModstamp": "2021-01-18T21:44:57.000+0000", "IsDeleted": false, "Name": "Truffle Cups Green", "SerialNumber": null, "InstallDate": null, "PurchaseDate": null, "UsageEndDate": null, "LifecycleStartDate": null, "LifecycleEndDate": null, "Status": null, "Price": null, "Quantity": null, "Description": null, "OwnerId": "0054W00000BZkk0QAD", "AssetProvidedById": null, "AssetServicedById": null, "IsInternal": false, "AssetLevel": 1, "StockKeepingUnit": null, "HasLifecycleManagement": false, "CurrentMrr": null, "CurrentLifecycleEndDate": null, "CurrentQuantity": null, "CurrentAmount": null, "TotalLifecycleAmount": null, "Street": null, "City": null, "State": null, "PostalCode": null, "Country": null, "Latitude": null, "Longitude": null, "GeocodeAccuracy": null, "Address": null, "LastViewedDate": null, "LastReferencedDate": null}, "emitted_at": 1697452787100}

airbyte-integrations/connectors/source-salesforce/metadata.yaml

+2-2
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,11 @@ data:
66
hosts:
77
- "*.salesforce.com"
88
connectorBuildOptions:
9-
baseImage: docker.io/airbyte/python-connector-base:1.1.0@sha256:bd98f6505c6764b1b5f99d3aedc23dfc9e9af631a62533f60eb32b1d3dbab20c
9+
baseImage: docker.io/airbyte/python-connector-base:1.2.0@sha256:c22a9d97464b69d6ef01898edf3f8612dc11614f05a84984451dde195f337db9
1010
connectorSubtype: api
1111
connectorType: source
1212
definitionId: b117307c-14b6-41aa-9422-947e34922962
13-
dockerImageTag: 2.3.1
13+
dockerImageTag: 2.3.2
1414
dockerRepository: airbyte/source-salesforce
1515
documentationUrl: https://docs.airbyte.com/integrations/sources/salesforce
1616
githubIssueLabel: source-salesforce

airbyte-integrations/connectors/source-salesforce/poetry.lock

+5-5
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

airbyte-integrations/connectors/source-salesforce/pyproject.toml

+2-2
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
33
build-backend = "poetry.core.masonry.api"
44

55
[tool.poetry]
6-
version = "2.3.1"
6+
version = "2.3.2"
77
name = "source-salesforce"
88
description = "Source implementation for Salesforce."
99
authors = [ "Airbyte <[email protected]>",]
@@ -18,7 +18,7 @@ include = "source_salesforce"
1818
[tool.poetry.dependencies]
1919
python = "^3.9,<3.12"
2020
pandas = "==2.2.0"
21-
airbyte-cdk = "==0.59.2"
21+
airbyte-cdk = "^0.63.2"
2222

2323
[tool.poetry.scripts]
2424
source-salesforce = "source_salesforce.run:run"

airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py

+21-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@
2222
from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator
2323
from airbyte_cdk.sources.utils.schema_helpers import InternalConfig
2424
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
25+
from airbyte_protocol.models import FailureType
2526
from dateutil.relativedelta import relativedelta
27+
from pendulum.parsing.exceptions import ParserError
2628
from requests import codes, exceptions # type: ignore[import]
2729

2830
from .api import PARENT_SALESFORCE_OBJECTS, UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS, UNSUPPORTED_FILTERING_STREAMS, Salesforce
@@ -49,7 +51,7 @@ class SourceSalesforce(ConcurrentSourceAdapter):
4951
DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
5052
START_DATE_OFFSET_IN_YEARS = 2
5153
MAX_WORKERS = 5
52-
54+
stop_sync_on_stream_failure = True
5355
message_repository = InMemoryMessageRepository(Level(AirbyteLogFormatter.level_mapping[logger.level]))
5456

5557
def __init__(self, catalog: Optional[ConfiguredAirbyteCatalog], config: Optional[Mapping[str, Any]], state: Optional[TState], **kwargs):
@@ -71,7 +73,24 @@ def _get_sf_object(config: Mapping[str, Any]) -> Salesforce:
7173
sf.login()
7274
return sf
7375

76+
@staticmethod
def _validate_stream_slice_step(stream_slice_step: Optional[str]) -> None:
    """Validate the optional ``stream_slice_step`` configuration value.

    A falsy value (``None`` or an empty string) is accepted without any
    check. Otherwise the value must be parsed by ``pendulum.parse`` into a
    ``pendulum.Duration`` of at least one second.

    :param stream_slice_step: raw value taken from the connector config.
    :raises AirbyteTracedException: with ``FailureType.config_error`` when
        the value cannot be parsed, does not parse to a duration, or is
        shorter than one second.
    """
    if not stream_slice_step:
        return

    internal_message = "Incorrect stream slice step"

    # Only the parse call can raise ParserError, so keep the try body minimal
    # and chain the original error for easier debugging.
    try:
        duration = pendulum.parse(stream_slice_step)
    except ParserError as e:
        raise AirbyteTracedException(
            failure_type=FailureType.config_error, internal_message=internal_message, message=e.args[0]
        ) from e

    if not isinstance(duration, pendulum.Duration):
        # pendulum.parse returned something other than a Duration instance.
        message = "Stream slice step Interval should be provided in ISO 8601 format."
    elif duration < pendulum.Duration(seconds=1):
        message = "Stream slice step Interval is too small. It should be no less than 1 second. Please set higher value and try again."
    else:
        return

    raise AirbyteTracedException(failure_type=FailureType.config_error, internal_message=internal_message, message=message)
91+
7492
def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, Optional[str]]:
93+
self._validate_stream_slice_step(config.get("stream_slice_step"))
7594
try:
7695
salesforce = self._get_sf_object(config)
7796
salesforce.describe()
@@ -147,6 +166,7 @@ def prepare_stream(cls, stream_name: str, json_schema, sobject_options, sf_objec
147166
if replication_key and stream_name not in UNSUPPORTED_FILTERING_STREAMS:
148167
stream_class = incremental
149168
stream_kwargs["replication_key"] = replication_key
169+
stream_kwargs["stream_slice_step"] = config.get("stream_slice_step", "P30D")
150170
else:
151171
stream_class = full_refresh
152172

airbyte-integrations/connectors/source-salesforce/source_salesforce/spec.yaml

+13-1
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,21 @@ connectionSpecification:
6060
description: Toggle to use Bulk API (this might cause empty fields for some streams)
6161
default: false
6262
order: 6
63+
stream_slice_step:
64+
title: Stream Slice Step for Incremental sync
65+
type: string
66+
description: The size of the time window (ISO8601 duration) to slice requests.
67+
default: P30D
68+
order: 7
69+
examples:
70+
- PT12H
71+
- P7D
72+
- P30D
73+
- P1M
74+
- P1Y
6375
streams_criteria:
6476
type: array
65-
order: 7
77+
order: 8
6678
items:
6779
type: object
6880
required:

airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py

+19-14
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import uuid
1212
from abc import ABC
1313
from contextlib import closing
14-
from datetime import datetime, timedelta
1514
from typing import Any, Callable, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Type, Union
1615

1716
import pandas as pd
@@ -112,7 +111,7 @@ def get_error_display_message(self, exception: BaseException) -> Optional[str]:
112111
return f"After {self.max_retries} retries the connector has failed with a network error. It looks like Salesforce API experienced temporary instability, please try again later."
113112
return super().get_error_display_message(exception)
114113

115-
def get_start_date_from_state(self, stream_state: Mapping[str, Any] = None) -> datetime:
114+
def get_start_date_from_state(self, stream_state: Mapping[str, Any] = None) -> pendulum.DateTime:
116115
if self.state_converter.is_state_message_compatible(stream_state):
117116
# stream_state is in the concurrent format
118117
if stream_state.get("slices", []):
@@ -689,28 +688,34 @@ def transform_empty_string_to_none(instance: Any, schema: Any):
689688

690689
class IncrementalRestSalesforceStream(RestSalesforceStream, ABC):
691690
state_checkpoint_interval = 500
692-
STREAM_SLICE_STEP = 30
693691
_slice = None
694692

695-
def __init__(self, replication_key: str, **kwargs):
693+
def __init__(self, replication_key: str, stream_slice_step: str = "P30D", **kwargs):
696694
super().__init__(**kwargs)
697695
self.replication_key = replication_key
696+
self._stream_slice_step = stream_slice_step
698697

699698
def stream_slices(
700699
self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
701700
) -> Iterable[Optional[Mapping[str, Any]]]:
702-
start, end = (None, None)
703701
now = pendulum.now(tz="UTC")
704702
assert LOOKBACK_SECONDS is not None and LOOKBACK_SECONDS >= 0
705-
initial_date = self.get_start_date_from_state(stream_state) - timedelta(seconds=LOOKBACK_SECONDS)
706-
707-
slice_number = 1
708-
while not end == now:
709-
start = initial_date.add(days=(slice_number - 1) * self.STREAM_SLICE_STEP)
710-
end = min(now, initial_date.add(days=slice_number * self.STREAM_SLICE_STEP))
711-
self._slice = {"start_date": start.isoformat(timespec="milliseconds"), "end_date": end.isoformat(timespec="milliseconds")}
712-
yield {"start_date": start.isoformat(timespec="milliseconds"), "end_date": end.isoformat(timespec="milliseconds")}
713-
slice_number = slice_number + 1
703+
704+
initial_date = self.get_start_date_from_state(stream_state) - pendulum.Duration(seconds=LOOKBACK_SECONDS)
705+
slice_start = initial_date
706+
while slice_start < now:
707+
slice_end = slice_start + self.stream_slice_step
708+
self._slice = {
709+
"start_date": slice_start.isoformat(timespec="milliseconds"),
710+
"end_date": min(slice_end, now).isoformat(timespec="milliseconds"),
711+
}
712+
yield self._slice
713+
714+
slice_start += self.stream_slice_step
715+
716+
@property
def stream_slice_step(self) -> pendulum.Duration:
    """Return the configured slice window parsed with ``pendulum.parse``.

    The raw string is stored in ``self._stream_slice_step``. When it is
    falsy (``None`` or an empty string) the value ``"P30D"`` is parsed
    instead, matching the default declared for this option, so that a
    blank config value does not reach ``pendulum.parse``.
    """
    # NOTE(review): the config validation skips falsy values, so they can
    # reach this property — confirm against the caller that builds the stream.
    return pendulum.parse(self._stream_slice_step or "P30D")
714719

715720
def request_params(
716721
self,

0 commit comments

Comments
 (0)