Skip to content

Commit eeabc38

Browse files
dbgold17david.goldnatikgadzhiian-at-airbyte
authored
Add optional board filtering for Source Monday streams (#55713)
Co-authored-by: david.gold <[email protected]> Co-authored-by: Natik Gadzhi <[email protected]> Co-authored-by: Ian Alton <[email protected]>
1 parent 728c4b9 commit eeabc38

20 files changed

+492
-7
lines changed

airbyte-integrations/connectors/source-monday/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ data:
1010
connectorSubtype: api
1111
connectorType: source
1212
definitionId: 80a54ea2-9959-4040-aac1-eee42423ec9b
13-
dockerImageTag: 2.1.13
13+
dockerImageTag: 2.2.0
1414
releases:
1515
breakingChanges:
1616
2.0.0:

airbyte-integrations/connectors/source-monday/pyproject.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
33
build-backend = "poetry.core.masonry.api"
44

55
[tool.poetry]
6-
version = "2.1.13"
6+
version = "2.2.0"
77
name = "source-monday"
88
description = "Source implementation for Monday."
99
authors = [ "Airbyte <[email protected]>",]

airbyte-integrations/connectors/source-monday/source_monday/graphql_requester.py

+13
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,13 @@ def _build_query(self, object_name: str, field_schema: dict, **object_arguments)
7575
else:
7676
fields.append(field)
7777

78+
# when querying the boards stream (object_name == "boards"), filter by board_ids if they provided in the config
79+
if object_name == "boards" and "board_ids" in self.config:
80+
# if we are building a query for incremental syncs, board ids are already present under 'ids' key in object_arguments (as a result of fetching the activity_logs stream first)
81+
# These ids are already an intersection of the board_ids provided in the config and the ones that must be fetched for the incremental sync and need not be overridden
82+
if "ids" not in object_arguments:
83+
object_arguments["ids"] = self.config.get("board_ids")
84+
7885
arguments = self._get_object_arguments(**object_arguments)
7986
arguments = f"({arguments})" if arguments else ""
8087

@@ -113,6 +120,9 @@ def _build_items_query(self, object_name: str, field_schema: dict, sub_page: Opt
113120
query = self._build_query("next_items_page", field_schema, limit=nested_limit, cursor=f'"{sub_page}"')
114121
else:
115122
query = self._build_query("items_page", field_schema, limit=nested_limit)
123+
# since items are a subresource of boards, when querying items, filter by board_ids if provided in the config
124+
if "board_ids" in self.config and "ids" not in object_arguments:
125+
object_arguments["ids"] = self.config.get("board_ids")
116126
arguments = self._get_object_arguments(**object_arguments)
117127
query = f"boards({arguments}){{{query}}}"
118128

@@ -156,7 +166,10 @@ def _build_activity_query(self, object_name: str, field_schema: dict, sub_page:
156166
created_at = datetime.fromtimestamp(created_at).strftime("%Y-%m-%dT%H:%M:%SZ")
157167

158168
query = self._build_query(object_name, field_schema, limit=nested_limit, page=sub_page, fromt=created_at)
169+
if "board_ids" in self.config and "ids" not in object_arguments:
170+
object_arguments["ids"] = self.config.get("board_ids")
159171
arguments = self._get_object_arguments(**object_arguments)
172+
160173
return f"boards({arguments}){{{query}}}"
161174

162175
def get_request_headers(

airbyte-integrations/connectors/source-monday/source_monday/spec.json

+8
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,14 @@
7272
}
7373
}
7474
]
75+
},
76+
"board_ids": {
77+
"type": "array",
78+
"title": "Boards to sync",
79+
"description": "The IDs of the boards that the Items and Boards streams will extract records from. When left empty, streams will extract records from all boards that exist within the account.",
80+
"items": {
81+
"type": "integer"
82+
}
7583
}
7684
}
7785
},

airbyte-integrations/connectors/source-monday/unit_tests/integrations/config.py

+8-1
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
22

3-
from typing import Any, Dict
3+
from typing import Any, Dict, List
44

55

66
class ConfigBuilder:
77
def __init__(self) -> None:
88
self._credentials: Dict[str, str] = {}
9+
self._board_ids: List[int] = []
910

1011
def with_oauth_credentials(self, client_id: str, client_secret: str, access_token: str, subdomain: str) -> "ConfigBuilder":
1112
self._credentials["auth_type"] = "oauth2.0"
@@ -20,8 +21,14 @@ def with_api_token_credentials(self, api_token: str) -> "ConfigBuilder":
2021
self._credentials["auth_type"] = "api_token"
2122
return self
2223

24+
def with_board_ids(self, board_ids: List[int]) -> "ConfigBuilder":
25+
self._board_ids = board_ids
26+
return self
27+
2328
def build(self) -> Dict[str, Any]:
2429
config = {}
2530
if self._credentials:
2631
config["credentials"] = self._credentials
32+
if self._board_ids:
33+
config["board_ids"] = self._board_ids
2734
return config
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
11
from .teams_requests_builder import TeamsRequestBuilder
2+
from .items_request_builder import ItemsRequestBuilder
3+
from .boards_request_builder import BoardsRequestBuilder
24

3-
__all__ = ["TeamsRequestBuilder"]
5+
__all__ = ["TeamsRequestBuilder", "ItemsRequestBuilder", "BoardsRequestBuilder"]

airbyte-integrations/connectors/source-monday/unit_tests/integrations/monday_requests/base_requests_builder.py

+5-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
22

33
import abc
4-
from typing import Any, Dict, Optional
4+
from typing import Any, Dict, List, Optional
55

66
from airbyte_cdk.test.mock_http import HttpRequest
77

@@ -55,3 +55,7 @@ def request_body(self):
5555
def with_authenticator(self, authenticator: Authenticator) -> "MondayBaseRequestBuilder":
5656
self._authenticator: Authenticator = authenticator
5757
return self
58+
59+
def with_board_ids(self, board_ids: List[int]) -> "MondayBaseRequestBuilder":
60+
self._board_ids = board_ids
61+
return self
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
2+
3+
from typing import List
4+
5+
from .base_requests_builder import MondayBaseRequestBuilder
6+
from .request_authenticators.authenticator import Authenticator
7+
8+
9+
class BoardsRequestBuilder(MondayBaseRequestBuilder):
10+
@classmethod
11+
def boards_endpoint(cls, authenticator: Authenticator, board_ids: List[int] = None) -> "BoardsRequestBuilder":
12+
return cls().with_authenticator(authenticator).with_board_ids(board_ids)
13+
14+
@property
15+
def query_params(self):
16+
params = super().query_params or {}
17+
if self._board_ids:
18+
board_ids = ", ".join(list(map(str, self._board_ids)))
19+
board_ids_str = f",ids:[{board_ids}]"
20+
else:
21+
board_ids_str = ""
22+
23+
params["query"] = (
24+
"query{boards(limit:10%s){id,name,board_kind,type,columns{archived,description,id,settings_str,title,type,width},communication,description,groups{archived,color,deleted,id,position,title},owners{id},creator{id},permissions,state,subscribers{id},tags{id},top_group{id},updated_at,updates{id},views{id,name,settings_str,type,view_specific_data_str},workspace{id,name,kind,description}}}"
25+
% board_ids_str
26+
)
27+
return params
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
2+
3+
from typing import List
4+
5+
from .base_requests_builder import MondayBaseRequestBuilder
6+
from .request_authenticators.authenticator import Authenticator
7+
8+
9+
class ItemsRequestBuilder(MondayBaseRequestBuilder):
10+
@classmethod
11+
def items_endpoint(cls, authenticator: Authenticator, board_ids: List[int] = None) -> "ItemsRequestBuilder":
12+
return cls().with_authenticator(authenticator).with_board_ids(board_ids)
13+
14+
@property
15+
def query_params(self):
16+
params = super().query_params or {}
17+
if self._board_ids:
18+
board_ids = ", ".join(list(map(str, self._board_ids)))
19+
board_ids_str = f",ids:[{board_ids}]"
20+
else:
21+
board_ids_str = ""
22+
23+
params["query"] = (
24+
"query{boards(limit:1%s){items_page(limit:20){cursor,items{id,name,assets{created_at,file_extension,file_size,id,name,original_geometry,public_url,uploaded_by{id},url,url_thumbnail},board{id,name},column_values{id,text,type,value,... on MirrorValue{display_value},... on BoardRelationValue{display_value},... on DependencyValue{display_value}},created_at,creator_id,group{id},parent_item{id},state,subscribers{id},updated_at,updates{id}}}}}"
25+
% board_ids_str
26+
)
27+
return params
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
from .error_response_builder import ErrorResponseBuilder
22
from .teams_response_builder import TeamsResponseBuilder
3+
from .items_response_builder import ItemsResponseBuilder
4+
from .boards_response_builder import BoardsResponseBuilder
35

4-
__all__ = ["ErrorResponseBuilder", "TeamsResponseBuilder"]
6+
__all__ = ["ErrorResponseBuilder", "TeamsResponseBuilder", "ItemsResponseBuilder", "BoardsResponseBuilder"]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
2+
3+
from airbyte_cdk.test.mock_http.response_builder import HttpResponseBuilder, NestedPath, find_template
4+
5+
6+
class BoardsResponseBuilder(HttpResponseBuilder):
7+
@classmethod
8+
def boards_response(cls) -> "BoardsResponseBuilder":
9+
return cls(find_template("boards", __file__), NestedPath(["data", "boards"]), None)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
2+
3+
from airbyte_cdk.test.mock_http.response_builder import HttpResponseBuilder, NestedPath, find_template
4+
5+
6+
class ItemsResponseBuilder(HttpResponseBuilder):
7+
@classmethod
8+
def items_response(cls) -> "ItemsResponseBuilder":
9+
return cls(find_template("items", __file__), NestedPath(["data", "boards", 0, "items_page", "items"]), None)
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
11
from .teams_record_builder import TeamsRecordBuilder
2+
from .items_record_builder import ItemsRecordBuilder
3+
from .boards_record_builder import BoardsRecordBuilder
24

3-
__all__ = ["TeamsRecordBuilder"]
5+
__all__ = ["TeamsRecordBuilder", "ItemsRecordBuilder", "BoardsRecordBuilder"]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
2+
3+
from airbyte_cdk.test.mock_http.response_builder import FieldPath, NestedPath
4+
5+
from .record_builder import MondayRecordBuilder
6+
7+
8+
class BoardsRecordBuilder(MondayRecordBuilder):
9+
@classmethod
10+
def boards_record(cls) -> "BoardsRecordBuilder":
11+
record_template = cls.extract_record("boards", __file__, NestedPath(["data", "boards", 0]))
12+
return cls(record_template, FieldPath("id"), None)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
2+
3+
from airbyte_cdk.test.mock_http.response_builder import FieldPath, NestedPath
4+
5+
from .record_builder import MondayRecordBuilder
6+
7+
8+
class ItemsRecordBuilder(MondayRecordBuilder):
9+
@classmethod
10+
def items_record(cls) -> "ItemsRecordBuilder":
11+
record_template = cls.extract_record("items", __file__, NestedPath(["data", "boards", 0, "items_page", "items", 0]))
12+
return cls(record_template, FieldPath("id"), None)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
2+
from unittest import TestCase
3+
4+
from airbyte_cdk.test.mock_http import HttpMocker
5+
from airbyte_protocol.models import SyncMode
6+
7+
from .config import ConfigBuilder
8+
from .monday_requests import BoardsRequestBuilder
9+
from .monday_requests.request_authenticators import ApiTokenAuthenticator
10+
from .monday_responses import BoardsResponseBuilder
11+
from .monday_responses.records import BoardsRecordBuilder
12+
from .utils import read_stream
13+
14+
15+
class TestBoardsStreamFullRefresh(TestCase):
16+
def get_authenticator(self, config):
17+
return ApiTokenAuthenticator(api_token=config["credentials"]["api_token"])
18+
19+
@HttpMocker()
20+
def test_read_board_record(self, http_mocker):
21+
"""
22+
A full refresh sync of the boards stream without pagination or board filtering
23+
"""
24+
config = ConfigBuilder().with_api_token_credentials("api-token").build()
25+
api_token_authenticator = self.get_authenticator(config)
26+
27+
http_mocker.get(
28+
BoardsRequestBuilder.boards_endpoint(api_token_authenticator).build(),
29+
BoardsResponseBuilder.boards_response()
30+
.with_record(BoardsRecordBuilder.boards_record())
31+
.with_record(BoardsRecordBuilder.boards_record())
32+
.build(),
33+
)
34+
35+
output = read_stream("boards", SyncMode.full_refresh, config)
36+
assert len(output.records) == 2
37+
38+
@HttpMocker()
39+
def test_read_with_board_ids_filter(self, http_mocker):
40+
"""
41+
A full refresh sync of the boards stream with board filtering and without pagination
42+
"""
43+
board_ids = [123, 456]
44+
config = ConfigBuilder().with_api_token_credentials("api-token").with_board_ids(board_ids).build()
45+
api_token_authenticator = self.get_authenticator(config)
46+
47+
http_mocker.get(
48+
BoardsRequestBuilder.boards_endpoint(api_token_authenticator, board_ids).build(),
49+
BoardsResponseBuilder.boards_response().with_record(BoardsRecordBuilder.boards_record()).build(),
50+
)
51+
52+
output = read_stream("boards", SyncMode.full_refresh, config)
53+
assert len(output.records) == 1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
2+
from unittest import TestCase
3+
4+
from airbyte_cdk.test.mock_http import HttpMocker
5+
from airbyte_protocol.models import SyncMode
6+
7+
from .config import ConfigBuilder
8+
from .monday_requests import ItemsRequestBuilder
9+
from .monday_requests.request_authenticators import ApiTokenAuthenticator
10+
from .monday_responses import ItemsResponseBuilder
11+
from .monday_responses.records import ItemsRecordBuilder
12+
from .utils import read_stream
13+
14+
15+
class TestItemsStreamFullRefresh(TestCase):
16+
def get_authenticator(self, config):
17+
return ApiTokenAuthenticator(api_token=config["credentials"]["api_token"])
18+
19+
@HttpMocker()
20+
def test_read_item_record(self, http_mocker):
21+
"""
22+
A full refresh sync of the items stream without pagination or board filtering
23+
"""
24+
config = ConfigBuilder().with_api_token_credentials("api-token").build()
25+
api_token_authenticator = self.get_authenticator(config)
26+
27+
http_mocker.get(
28+
ItemsRequestBuilder.items_endpoint(api_token_authenticator).build(),
29+
ItemsResponseBuilder.items_response()
30+
.with_record(ItemsRecordBuilder.items_record())
31+
.with_record(ItemsRecordBuilder.items_record())
32+
.build(),
33+
)
34+
35+
output = read_stream("items", SyncMode.full_refresh, config)
36+
assert len(output.records) == 2
37+
38+
@HttpMocker()
39+
def test_read_with_board_ids_filter(self, http_mocker):
40+
"""
41+
A full refresh sync of the items stream with board filtering and without pagination
42+
"""
43+
board_ids = [123, 456]
44+
config = ConfigBuilder().with_api_token_credentials("api-token").with_board_ids(board_ids).build()
45+
api_token_authenticator = self.get_authenticator(config)
46+
47+
http_mocker.get(
48+
ItemsRequestBuilder.items_endpoint(api_token_authenticator, board_ids).build(),
49+
ItemsResponseBuilder.items_response().with_record(ItemsRecordBuilder.items_record()).build(),
50+
)
51+
52+
output = read_stream("items", SyncMode.full_refresh, config)
53+
assert len(output.records) == 1

0 commit comments

Comments
 (0)