Skip to content

Commit c3f33a7

Browse files
✨feat(source-google-drive): Introduce File Based Stream Permissions Reader (#55689)
1 parent 0fb16e3 commit c3f33a7

File tree

8 files changed

+249
-184
lines changed

8 files changed

+249
-184
lines changed

airbyte-integrations/connectors/source-google-drive/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ data:
77
connectorSubtype: file
88
connectorType: source
99
definitionId: 9f8dda77-1048-4368-815b-269bf54ee9b8
10-
dockerImageTag: 0.2.4
10+
dockerImageTag: 0.3.0
1111
dockerRepository: airbyte/source-google-drive
1212
githubIssueLabel: source-google-drive
1313
icon: google-drive.svg

airbyte-integrations/connectors/source-google-drive/poetry.lock

+21-21
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

airbyte-integrations/connectors/source-google-drive/pyproject.toml

+2-2
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
33
build-backend = "poetry.core.masonry.api"
44

55
[tool.poetry]
6-
version = "0.2.4"
6+
version = "0.3.0"
77
name = "source-google-drive"
88
description = "Source implementation for Google Drive."
99
authors = [ "Airbyte <contact@airbyte.io>",]
@@ -21,7 +21,7 @@ google-api-python-client = "==2.104.0"
2121
google-auth-httplib2 = "==0.1.1"
2222
google-auth-oauthlib = "==1.1.0"
2323
google-api-python-client-stubs = "==1.18.0"
24-
airbyte-cdk = {extras = ["file-based"], version = "^6.33.6"}
24+
airbyte-cdk = {extras = ["file-based"], version = "^6.38.5"}
2525

2626

2727
[tool.poetry.scripts]

airbyte-integrations/connectors/source-google-drive/source_google_drive/source.py

+2
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from airbyte_cdk.sources.file_based.file_based_source import FileBasedSource
1111
from airbyte_cdk.sources.file_based.stream.cursor.default_file_based_cursor import DefaultFileBasedCursor
1212
from source_google_drive.spec import SourceGoogleDriveSpec
13+
from source_google_drive.stream_permissions_reader import SourceGoogleDriveStreamPermissionsReader
1314
from source_google_drive.stream_reader import SourceGoogleDriveStreamReader
1415

1516

@@ -22,6 +23,7 @@ def __init__(self, catalog: Optional[ConfiguredAirbyteCatalog], config: Optional
2223
config=config,
2324
state=state,
2425
cursor_cls=DefaultFileBasedCursor,
26+
stream_permissions_reader=SourceGoogleDriveStreamPermissionsReader(),
2527
)
2628

2729
def spec(self, *args: Any, **kwargs: Any) -> ConnectorSpecification:
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,221 @@
1+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
2+
3+
import json
4+
import logging
5+
import uuid
6+
from datetime import datetime
7+
from typing import Any, Dict, Iterator, List, Tuple
8+
9+
import pytz
10+
from google.oauth2 import credentials, service_account
11+
from googleapiclient.discovery import build
12+
13+
from airbyte_cdk import AirbyteTracedException, FailureType
14+
from airbyte_cdk.sources.file_based.file_based_stream_permissions_reader import AbstractFileBasedStreamPermissionsReader
15+
from airbyte_cdk.sources.streams.core import package_name_from_class
16+
from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader
17+
from source_google_drive.exceptions import ErrorFetchingMetadata
18+
from source_google_drive.spec import RemoteIdentity, RemoteIdentityType, RemotePermissions, SourceGoogleDriveSpec
19+
20+
21+
# OAuth scopes requested when building the Admin SDK Directory client from a
# service account (read-only access to groups, group members and users).
DRIVE_SERVICE_SCOPES = [
    "https://www.googleapis.com/auth/admin.directory.group.readonly",
    "https://www.googleapis.com/auth/admin.directory.group.member.readonly",
    "https://www.googleapis.com/auth/admin.directory.user.readonly",
]

# Permission ids that mark a file as publicly accessible rather than naming a
# concrete identity; such permissions are never turned into identity records.
PUBLIC_PERMISSION_IDS = [
    "anyoneWithLink",
    "anyoneCanFind",
    "domainCanFind",
    "domainWithLink",
]
33+
34+
35+
def datetime_now() -> datetime:
    """Return the current time as a timezone-aware datetime in UTC."""
    return datetime.now(tz=pytz.UTC)
37+
38+
39+
class SourceGoogleDriveStreamPermissionsReader(AbstractFileBasedStreamPermissionsReader):
    """Reads Google Drive file permissions and Google Workspace identities.

    File permissions come from the Drive v3 API; users, groups and group
    members come from the Admin SDK Directory API. Both API clients are built
    lazily on first use from the credentials held in ``config``.
    """

    def __init__(self):
        super().__init__()
        # Clients are created on first access so that constructing the reader
        # never requires credentials.
        self._drive_service = None
        self._directory_service = None

    @property
    def config(self) -> SourceGoogleDriveSpec:
        """The source configuration used to authenticate against Google APIs."""
        # NOTE(review): `_config` is presumably initialised by the base class
        # `__init__` - confirm against the CDK.
        return self._config

    @config.setter
    def config(self, value: SourceGoogleDriveSpec):
        """Store the configuration; only ``SourceGoogleDriveSpec`` instances are accepted."""
        # NOTE(review): `assert` is stripped under `python -O`; kept as-is so the
        # raised exception type (AssertionError) stays unchanged for callers.
        assert isinstance(value, SourceGoogleDriveSpec)
        self._config = value

    def _build_google_service(self, service_name: str, version: str, scopes: List[str] | None = None):
        """Build an authenticated Google API client.

        Args:
            service_name: Google API name passed to ``build`` (e.g. ``"drive"``).
            version: API version passed to ``build`` (e.g. ``"v3"``).
            scopes: OAuth scopes applied to service-account credentials only;
                they are not passed when ``auth_type`` is ``"Client"``.

        Returns:
            The client object returned by ``googleapiclient.discovery.build``.

        Raises:
            ValueError: If the config has not been set.
            AirbyteTracedException: With ``FailureType.config_error`` when the
                credentials cannot be loaded or the client cannot be built.
        """
        if self.config is None:
            # We shouldn't hit this; config should always get set before attempting to
            # list or read files.
            raise ValueError(f"Source config is missing; cannot create the Google {service_name} client.")
        try:
            if self.config.credentials.auth_type == "Client":
                creds = credentials.Credentials.from_authorized_user_info(self.config.credentials.dict())
            else:
                creds = service_account.Credentials.from_service_account_info(
                    json.loads(self.config.credentials.service_account_info), scopes=scopes
                )
            google_service = build(service_name, version, credentials=creds)
        except Exception as e:
            raise AirbyteTracedException(
                internal_message=str(e),
                message=f"Could not authenticate with Google {service_name}. Please check your credentials.",
                failure_type=FailureType.config_error,
                exception=e,
            ) from e

        return google_service

    @property
    def google_drive_service(self):
        """Lazily built Drive v3 client, cached on the instance."""
        if self._drive_service is None:
            self._drive_service = self._build_google_service("drive", "v3")
        return self._drive_service

    @property
    def google_directory_service(self):
        """Lazily built Admin SDK Directory v1 client, cached on the instance."""
        if self._directory_service is None:
            self._directory_service = self._build_google_service("admin", "directory_v1", DRIVE_SERVICE_SCOPES)
        return self._directory_service

    def _get_looping_google_api_list_response(
        self, service: Any, key: str, args: dict[str, Any], logger: logging.Logger
    ) -> Iterator[dict[str, Any]]:
        """Yield every item of a paginated Google API ``list`` call.

        Args:
            service: API resource exposing ``list(...).execute()``.
            key: Key of the response dict that holds the page's items.
            args: Extra keyword arguments forwarded to every ``list`` call.
            logger: Logger used to report a failure before it is re-raised.

        Yields:
            The items of each page, in order. Iteration stops at the first
            page that has no items or carries no ``nextPageToken``.

        Raises:
            Exception: Any error raised by the API call is logged and re-raised.
        """
        page_token: str | None = None
        try:
            while True:
                response = service.list(pageToken=page_token, **args).execute()
                items: list[dict[str, Any]] | None = response.get(key)
                if not items:
                    return

                yield from items

                page_token = response.get("nextPageToken")
                if not page_token:
                    return
        except Exception as e:
            logger.error(f"There was an error listing {key} with {args}: {str(e)}")
            raise

    def _to_remote_file_identity(self, identity: dict[str, Any]) -> RemoteIdentity | None:
        """Convert a Drive permission entry into a ``RemoteIdentity``.

        Returns ``None`` for public permissions (ids listed in
        ``PUBLIC_PERMISSION_IDS``) and for entries flagged ``deleted``.
        """
        if identity.get("id") in PUBLIC_PERMISSION_IDS:
            return None
        if identity.get("deleted") is True:
            return None

        return RemoteIdentity(
            # Timezone-aware UTC timestamp, consistent with load_identity_groups.
            modified_at=datetime_now(),
            id=uuid.uuid4(),
            remote_id=identity.get("emailAddress"),
            # NOTE(review): Drive permission resources appear to expose the
            # display name as `displayName`, not `name` - confirm.
            name=identity.get("name"),
            email_address=identity.get("emailAddress"),
            type=identity.get("type"),
            description=None,
        )

    def get_file_permissions(self, file_id: str, file_name: str, logger: logging.Logger) -> Tuple[List[RemoteIdentity], bool]:
        """
        Retrieves the permissions of a file in Google Drive and checks for public access.

        All pages of the permissions listing are read, so files with more
        permissions than fit in a single API page are not truncated.

        Args:
            file_id (str): The file to get permissions for.
            file_name (str): The name of the file to get permissions for.
            logger (logging.Logger): Logger for debugging and information.

        Returns:
            Tuple(List[RemoteIdentity], bool): The identities that have access to the file, and
            whether any permission id is one of PUBLIC_PERMISSION_IDS.

        Raises:
            ErrorFetchingMetadata: If listing or converting the permissions fails.
        """
        try:
            permissions = self._get_looping_google_api_list_response(
                self.google_drive_service.permissions(),
                "permissions",
                args={
                    "fileId": file_id,
                    # nextPageToken must be part of the field mask, otherwise the
                    # response never reports that further pages exist.
                    "fields": "nextPageToken, permissions, permissions/role, permissions/type, permissions/id, permissions/emailAddress",
                    "supportsAllDrives": True,
                },
                logger=logger,
            )

            is_public = False
            remote_identities = []

            for permission in permissions:
                if permission.get("id") in PUBLIC_PERMISSION_IDS:
                    is_public = True
                identity = self._to_remote_file_identity(permission)
                if identity is not None:
                    remote_identities.append(identity)

            return remote_identities, is_public
        except Exception as e:
            raise ErrorFetchingMetadata(f"An error occurred while retrieving file permissions: {str(e)}") from e

    def get_file_acl_permissions(self, file: Any, logger: logging.Logger) -> Dict[str, Any]:
        """Return the ACL record for ``file`` as a dict with ``None`` fields dropped.

        The record holds the file id, its uri, the remote ids of the identities
        allowed to access it and whether it is publicly accessible.
        """
        remote_identities, is_public = self.get_file_permissions(file.id, file_name=file.uri, logger=logger)
        return RemotePermissions(
            id=file.id,
            file_path=file.uri,
            allowed_identity_remote_ids=[p.remote_id for p in remote_identities],
            publicly_accessible=is_public,
        ).dict(exclude_none=True)

    def load_identity_groups(self, logger: logging.Logger) -> Iterator[Dict[str, Any]]:
        """Yield one identity record (as a dict) per directory user and group.

        Users are yielded first, then groups; each group record carries the
        e-mail addresses of its members. Listing is restricted to the
        configured domain, or to the ``my_customer`` account when no domain is
        configured.
        """
        domain = self.config.delivery_method.domain
        if not domain:
            logger.info("No domain provided. Trying to fetch identities from the user workspace.")
            api_args = {"customer": "my_customer"}
        else:
            api_args = {"domain": domain}

        users_api = self.google_directory_service.users()
        groups_api = self.google_directory_service.groups()
        members_api = self.google_directory_service.members()

        for user in self._get_looping_google_api_list_response(users_api, "users", args=api_args, logger=logger):
            # `name` and `emails` are read defensively so that a sparse user
            # record does not abort the whole identity sync with a KeyError.
            name_info = user.get("name") or {}
            rfp = RemoteIdentity(
                id=uuid.uuid4(),
                remote_id=user["primaryEmail"],
                name=name_info.get("fullName"),
                email_address=user["primaryEmail"],
                member_email_addresses=[x["address"] for x in (user.get("emails") or []) if "address" in x],
                type=RemoteIdentityType.USER,
                modified_at=datetime_now(),
            )
            yield rfp.dict()

        for group in self._get_looping_google_api_list_response(groups_api, "groups", args=api_args, logger=logger):
            rfp = RemoteIdentity(
                id=uuid.uuid4(),
                remote_id=group["email"],
                name=group["name"],
                email_address=group["email"],
                type=RemoteIdentityType.GROUP,
                modified_at=datetime_now(),
            )

            for member in self._get_looping_google_api_list_response(members_api, "members", {"groupKey": group["id"]}, logger):
                member_email = member.get("email")
                if not member_email:
                    # Some member entries carry no e-mail address; there is
                    # nothing to record for them.
                    continue
                rfp.member_email_addresses = rfp.member_email_addresses or []
                rfp.member_email_addresses.append(member_email)

            yield rfp.dict()

    @property
    def file_permissions_schema(self) -> Dict[str, Any]:
        """JSON schema named ``file_permissions``, loaded from this package's resources."""
        return ResourceSchemaLoader(package_name_from_class(self.__class__)).get_schema("file_permissions")

    @property
    def identities_schema(self) -> Dict[str, Any]:
        """JSON schema named ``identities``, loaded from this package's resources."""
        return ResourceSchemaLoader(package_name_from_class(self.__class__)).get_schema("identities")

0 commit comments

Comments
 (0)