Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Source Zendesk Support: ticket metrics streams to low-code #55795

Open
wants to merge 8 commits into
base: issue-11894/article_streams-to-low-code
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -856,6 +856,93 @@ definitions:
cursor_filter: "start_time"
primary_key: "id"

ticket_metrics_stream:
type: StateDelegatingStream
name: "ticket_metrics"
schema_loader:
type: JsonFileSchemaLoader
file_path: "./source_zendesk_support/schemas/ticket_metrics.json"
full_refresh_stream:
$ref: "#/definitions/base_incremental_stream"
schema_loader:
type: JsonFileSchemaLoader
file_path: "./source_zendesk_support/schemas/ticket_metrics.json"
name: "ticket_metrics"
primary_key: "id"
incremental_sync:
type: DatetimeBasedCursor
cursor_datetime_formats:
- "%s"
datetime_format: "%s"
cursor_field: "_ab_updated_at"
start_datetime:
datetime: "0" # not used as the API does not take filters in
retriever:
$ref: "#/definitions/retriever"
ignore_stream_slicer_parameters_on_paginated_requests: true
requester:
$ref: "#/definitions/retriever/requester"
path: "ticket_metrics"
paginator:
$ref: "#/definitions/links_next_paginator"
record_selector:
type: RecordSelector
extractor:
type: DpathExtractor
field_path: ["ticket_metrics"]
transformations:
- type: AddFields
fields:
- path:
- "_ab_updated_at"
value: "{{ format_datetime(record['updated_at'], '%s') }}"
incremental_stream:
$ref: "#/definitions/base_incremental_stream"
schema_loader:
type: JsonFileSchemaLoader
file_path: "./source_zendesk_support/schemas/ticket_metrics.json"
name: "ticket_metrics"
primary_key: "id"
incremental_sync:
type: DatetimeBasedCursor
cursor_datetime_formats:
- "%s"
datetime_format: "%s"
cursor_field: "_ab_updated_at"
start_datetime:
datetime: "0" # not used as we should start from the state value
retriever:
$ref: "#/definitions/retriever"
ignore_stream_slicer_parameters_on_paginated_requests: true
requester:
$ref: "#/definitions/retriever/requester"
path: "tickets/{{ stream_partition.ticket_id }}/metrics"
paginator:
type: NoPagination
record_selector:
type: RecordSelector
extractor:
type: DpathExtractor
field_path: ["ticket_metric"]
partition_router:
type: SubstreamPartitionRouter
parent_stream_configs:
- type: ParentStreamConfig
parent_key: "id"
partition_field: "ticket_id"
extra_fields:
- ["generated_timestamp"]
stream:
$ref: "#/definitions/tickets_stream"
incremental_dependency: true
transformations:
- type: AddFields
fields:
- path:
- "_ab_updated_at"
value: "{{ record['generated_timestamp'] if 'generated_timestamp' in record else stream_slice.extra_fields['generated_timestamp'] }}"
value_type: "integer"

topics_stream:
$ref: "#/definitions/semi_incremental_stream"
retriever:
Expand Down Expand Up @@ -951,6 +1038,7 @@ streams:
- $ref: "#/definitions/ticket_metric_events_stream"
- $ref: "#/definitions/ticket_skips_stream"
- $ref: "#/definitions/tickets_stream"
- $ref: "#/definitions/ticket_metrics_stream"
- $ref: "#/definitions/triggers_stream"
- $ref: "#/definitions/users_stream"
- $ref: "#/definitions/users_identities_stream"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
from datetime import datetime
from typing import Any, List, Mapping, Optional, Tuple

import pendulum

from airbyte_cdk.models import ConfiguredAirbyteCatalog, SyncMode
from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource
from airbyte_cdk.sources.source import TState
Expand All @@ -17,7 +15,6 @@
from source_zendesk_support.streams import DATETIME_FORMAT, ZendeskConfigException

from .streams import (
TicketMetrics,
UserSettingsStream,
)

Expand All @@ -39,21 +36,6 @@ class SourceZendeskSupport(YamlDeclarativeSource):
def __init__(self, catalog: Optional[ConfiguredAirbyteCatalog], config: Optional[Mapping[str, Any]], state: TState, **kwargs):
super().__init__(catalog=catalog, config=config, state=state, **{"path_to_yaml": "manifest.yaml"})

@classmethod
def get_default_start_date(cls) -> str:
"""
Gets the default start date for data retrieval.

The default date is set to the current date and time in UTC minus 2 years.

Returns:
str: The default start date in 'YYYY-MM-DDTHH:mm:ss[Z]' format.

Note:
Start Date is a required request parameter for Zendesk Support API streams.
"""
return pendulum.now(tz="UTC").subtract(years=2).format("YYYY-MM-DDTHH:mm:ss[Z]")

@classmethod
def get_authenticator(cls, config: Mapping[str, Any]) -> [TokenAuthenticator, BasicApiTokenAuthenticator]:
# new authentication flow
Expand Down Expand Up @@ -89,29 +71,6 @@ def check_connection(self, logger, config) -> Tuple[bool, any]:
)
return True, None

@classmethod
def convert_config2stream_args(cls, config: Mapping[str, Any]) -> Mapping[str, Any]:
"""Convert input configs to parameters of the future streams
This function is used by unit tests too
"""
return {
"subdomain": config["subdomain"],
"start_date": config.get("start_date", cls.get_default_start_date()),
"authenticator": cls.get_authenticator(config),
"ignore_pagination": config.get("ignore_pagination", False),
}

def get_nested_streams(self, config: Mapping[str, Any]) -> List[Stream]:
"""Returns relevant a list of available streams
:param config: A Mapping of the user input configuration as defined in the connector spec.
"""
args = self.convert_config2stream_args(config)

streams = [
TicketMetrics(**args),
]
return streams

def check_enterprise_streams(self, declarative_streams: List[Stream]) -> List[Stream]:
"""Returns relevant a list of available streams
:param config: A Mapping of the user input configuration as defined in the connector spec.
Expand All @@ -137,9 +96,4 @@ def check_enterprise_streams(self, declarative_streams: List[Stream]) -> List[St

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
declarative_streams = super().streams(config)

nested_streams = self.get_nested_streams(config)
declarative_streams.extend(nested_streams)

declarative_streams = self.check_enterprise_streams(declarative_streams)
return declarative_streams
return self.check_enterprise_streams(declarative_streams)
Loading
Loading