Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(source-Monday): Update CDK to v6 #55225

Open
wants to merge 30 commits into
base: master
Choose a base branch
from

Conversation

tolik0
Copy link
Contributor

@tolik0 tolik0 commented Mar 6, 2025

What

Update CDK to v6.

How

  • Migrated boards and items stream to StateDelegatingStream.
  • Update custom components to match abstract classes from the CDK

User Impact

The sync should become quicker as the source is now concurrent.

Can this PR be safely reverted and rolled back?

  • YES 💚
  • NO ❌

Sorry, something went wrong.

Copy link

vercel bot commented Mar 6, 2025

The latest updates on your projects. Learn more about Vercel for Git ↗︎

Name Status Preview Comments Updated (UTC)
airbyte-docs ✅ Ready (Inspect) Visit Preview 💬 Add feedback Mar 21, 2025 1:35pm

@natikgadzhi
Copy link
Contributor

@tolik0 is this ready to go? If so, @dbgold17 can you review this and see if we should merge this first and apply your changes on top, so you get to work with the new CDK?

Copy link
Contributor

@dbgold17 dbgold17 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @tolik0 , I left a bunch of comments, a lot of small things and then some which have to do with me not understanding the changes you're making. Would you be able to write a couple sentences on the PR description about the structural changes you needed to make in order to update the CDK to version 6? It would be super helpful for me and for others looking at this code in the future :)

Also, how have you tested these changes? Do you feel confident that they won't break anything for uses?

python = "^3.9,<3.12"
airbyte-cdk = "0.78.6" # Breaks with newer versions of the CDK
python = ">=3.10,<3.12"
airbyte_cdk = { path = "/home/anatolii/airbytehq/airbyte/airbyte-cdk/python/airbyte-python-cdk/", develop = true }
Copy link
Contributor

@dbgold17 dbgold17 Mar 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be replaced with a public package version right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added dev version

@@ -10,7 +10,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 80a54ea2-9959-4040-aac1-eee42423ec9b
dockerImageTag: 2.1.13
dockerImageTag: 2.2.0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there consensus about whether to bump minor or patch number in this version?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be the minor update

return extracted if isinstance(extracted, list) else [extracted]
return []
decoded_response = self.decoder.decode(response)
for response_body in decoded_response:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are you now looping over this response where we weren't before?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The underlying CDK class changed

response: requests.Response,
last_page_size: int,
last_record: Optional[Record],
last_page_token_value: Optional[Any]) -> Optional[Tuple[Optional[int], Optional[int]]]:
"""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update docstring to conform with new interface.

@@ -72,7 +78,11 @@ def __post_init__(self, parameters: Mapping[str, Any]):
self._page: Optional[int] = self.start_from_page
self._sub_page: Optional[int] = self.start_from_page

def next_page_token(self, response, last_records: List[Mapping[str, Any]]) -> Optional[Tuple[Optional[int], Optional[int]]]:
def next_page_token(self,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this change necessary? It doesn't seem like you're using any of these arguments.

If it's only in order to conform to some interface spec, can you note that in a comment?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the underlying class in the CDK changed, so we need to align with it

response_decoded = self.decoder.decode(response)
for response_body in response_decoded:
if not response_body["data"]["boards"]:
yield from []
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you actually want to return a generator object with nothing in it here or just continue?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

@@ -37,32 +37,30 @@ class MondayActivityExtractor(RecordExtractor):
decoder: Decoder = JsonDecoder(parameters={})

def extract_records(self, response: requests.Response) -> List[Record]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should the return type annotation be updated?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

@tolik0 tolik0 marked this pull request as draft March 10, 2025 09:55
@tolik0
Copy link
Contributor Author

tolik0 commented Mar 10, 2025

@dbgold17 Sorry, this PR is not in review yet. I still need to add StateDelegatingRetriever, which hasn’t been merged yet.
I'm currently busy handling P1 issues, but I’ll ping you once it’s ready.
Thank you!

@dbgold17
Copy link
Contributor

@dbgold17 Sorry, this PR is not in review yet. I still need to add StateDelegatingRetriever, which hasn’t been merged yet. I'm currently busy handling P1 issues, but I’ll ping you once it’s ready. Thank you!

Sounds good, sorry for the premature review!

@tolik0
Copy link
Contributor Author

tolik0 commented Mar 18, 2025

/format-fix

Format-fix job started... Check job output.

✅ Changes applied successfully. (1264ab2)

@tolik0 tolik0 marked this pull request as ready for review March 18, 2025 15:25
@tolik0
Copy link
Contributor Author

tolik0 commented Mar 19, 2025

/format-fix

Format-fix job started... Check job output.

✅ Changes applied successfully. (39c39bf)

@tolik0
Copy link
Contributor Author

tolik0 commented Mar 19, 2025

/format-fix

Format-fix job started... Check job output.

✅ Changes applied successfully. (727c1a1)

@tolik0
Copy link
Contributor Author

tolik0 commented Mar 20, 2025

/format-fix

Format-fix job started... Check job output.

✅ Changes applied successfully. (9d4234d)

@tolik0
Copy link
Contributor Author

tolik0 commented Mar 20, 2025

Regression test results:
Screenshot from 2025-03-20 20-25-09
Screenshot from 2025-03-20 20-24-52
Screenshot from 2025-03-20 20-24-37
Screenshot from 2025-03-20 12-32-24
Screenshot from 2025-03-19 18-52-19

… into tolik0/source-monday/add-grouping-partition-router
@tolik0 tolik0 requested a review from brianjlai March 20, 2025 18:36
Copy link
Contributor

@brianjlai brianjlai left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one suggestions on refactoring try_extract_records() to process records iteratively since the underlying call became a generator, but otherwise no other comments and looks good to me!

return [x for x in extracted if x]
return extracted if isinstance(extracted, list) else [extracted]
return []
decoded_response = self.decoder.decode(response)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because self.decoder.decode(response) has been changed to become a generator, can we refactor this method to also iterate over the generator and yield from extracted below instead of storing them in the result list?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

@@ -117,4 +117,4 @@ def extract_records(self, response: requests.Response) -> List[Record]:
if display_value and not text:
values["text"] = display_value

return result
yield from result
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In addition to the above comment, if we refactor self.try_extract_records() to be an iterable, we should also yield record at the end of each iteration of the for loop so we don't have to keep everything in memory with result

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

concurrency_level:
type: ConcurrencyLevel
default_concurrency: "{{ config.get('num_workers', 4) }}"
max_concurrency: 40
Copy link
Contributor

@brianjlai brianjlai Mar 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Monday actually has a pretty detailed section on rate limits: https://developer.monday.com/api-reference/docs/rate-limits

40 actually seems like an acceptable threshold since they call out only allowing for at most 40 concurrent requests at any given moment. But let's directly reference https://developer.monday.com/api-reference/docs/rate-limits#concurrency-limit this in the comment

@tolik0
Copy link
Contributor Author

tolik0 commented Mar 21, 2025

/format-fix

Format-fix job started... Check job output.

✅ Changes applied successfully. (6adeeb1)

Copy link
Contributor

@brianjlai brianjlai left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i had a couple small questions, but i'm fine with just answering or addressing and no need to block. :shipit:

else:
path = [p.eval(self.config) for p in field_path]
if "*" in path:
extracted = dpath.values(body, path)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

small nit/question, why the switch to use dpath.values() instead of dpath.util.values()? same for the below get() as well

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated according to the latest implementation of DpathExtractor. Here is the PR for the util change: #38847

result = self.try_extract_records(response, self.field_path_incremental)

for record in result:
if "updated_at" in record:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why don't we have to edit some of the fields on each record object anymore?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved this to transformations

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/connectors Connector related issues area/documentation Improvements or additions to documentation connectors/source/monday
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants