Skip to content

Commit eb1b32b

Browse files
committed
Update fivetran_api_client.py
1 parent 6ab9d16 commit eb1b32b

File tree

1 file changed

+82
-23
lines changed

1 file changed

+82
-23
lines changed

metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_api_client.py

+82-23
Original file line numberDiff line numberDiff line change
@@ -62,18 +62,17 @@ def _make_request(self, method: str, endpoint: str, **kwargs: Any) -> Dict:
6262
kwargs["headers"] = headers
6363

6464
try:
65+
logger.debug(f"Making {method} request to {url}")
6566
response = self._session.request(method, url, **kwargs)
6667
response.raise_for_status()
6768
return response.json()
6869
except requests.exceptions.HTTPError as e:
6970
logger.error(f"HTTP error occurred: {e}")
70-
if e.response.status_code == 404:
71-
logger.error(f"Endpoint not found: {url}")
72-
if "sync_history" in endpoint:
73-
logger.info("Attempting fallback to /sync endpoint")
74-
# Try with the correct endpoint
75-
new_endpoint = endpoint.replace("sync_history", "sync")
76-
return self._make_request(method, new_endpoint, **kwargs)
71+
# If we get a 405 (Method Not Allowed) error, log additional information
72+
if e.response.status_code == 405:
73+
logger.error(f"Method {method} not allowed for {url}")
74+
allowed_methods = e.response.headers.get("Allow", "unknown")
75+
logger.error(f"Allowed methods: {allowed_methods}")
7776
raise
7877

7978
def list_connectors(self) -> List[Dict]:
@@ -179,16 +178,47 @@ def list_connector_sync_history(
179178
self, connector_id: str, days: int = 7
180179
) -> List[Dict]:
181180
"""Get the sync history for a connector."""
182-
# Calculate the start time for the lookback period
183-
since_time = int(time.time()) - (days * 24 * 60 * 60)
184-
params = {"limit": 100, "since": since_time}
185-
186-
# Updated to use the correct endpoint
187-
response = self._make_request(
188-
"GET", f"/connectors/{connector_id}/sync", params=params
189-
)
181+
try:
182+
# First, try the "synchronization_history" endpoint (most likely correct)
183+
since_time = int(time.time()) - (days * 24 * 60 * 60)
184+
params = {"limit": 100, "since": since_time}
185+
186+
response = self._make_request(
187+
"GET",
188+
f"/connectors/{connector_id}/synchronization_history",
189+
params=params,
190+
)
191+
return response.get("data", {}).get("items", [])
192+
except requests.exceptions.HTTPError as e:
193+
if e.response.status_code == 404:
194+
logger.warning(
195+
"synchronization_history endpoint not found, trying alternative endpoints"
196+
)
190197

191-
return response.get("data", {}).get("items", [])
198+
# Try alternative endpoints
199+
try:
200+
# Try "history" endpoint
201+
response = self._make_request(
202+
"GET", f"/connectors/{connector_id}/history", params=params
203+
)
204+
return response.get("data", {}).get("items", [])
205+
except requests.exceptions.HTTPError:
206+
# As a last resort, try getting runs directly
207+
logger.warning(
208+
"history endpoint not found, trying to get connector runs"
209+
)
210+
try:
211+
response = self._make_request(
212+
"GET", f"/connectors/{connector_id}/runs", params=params
213+
)
214+
return response.get("data", {}).get("items", [])
215+
except requests.exceptions.HTTPError as e:
216+
logger.error(f"Failed to get sync history: {e}")
217+
# Return empty list if all attempts fail
218+
return []
219+
else:
220+
logger.error(f"Failed to get sync history: {e}")
221+
return []
192222

193223
def _parse_timestamp(self, iso_timestamp: Optional[str]) -> Optional[int]:
194224
"""Parse ISO timestamp to Unix timestamp."""
@@ -223,24 +253,53 @@ def extract_connector_metadata(
223253
# Convert sync jobs to our Job model
224254
jobs = []
225255
for job in sync_history:
226-
started_at = self._parse_timestamp(job.get("started_at"))
227-
completed_at = self._parse_timestamp(job.get("completed_at"))
256+
# Try different possible field names for start and end times
257+
started_at = None
258+
completed_at = None
259+
260+
# Check for different possible field names
261+
for start_field in ["started_at", "start_time", "created_at"]:
262+
if start_field in job:
263+
started_at = self._parse_timestamp(job.get(start_field))
264+
if started_at:
265+
break
266+
267+
for end_field in ["completed_at", "end_time", "finished_at", "updated_at"]:
268+
if end_field in job:
269+
completed_at = self._parse_timestamp(job.get(end_field))
270+
if completed_at:
271+
break
228272

229273
# Only include completed jobs
230274
if started_at and completed_at:
231-
status = job.get("status", "").upper()
275+
# Try different possible field names for status
276+
status = None
277+
for status_field in ["status", "state", "result"]:
278+
if status_field in job:
279+
status = job.get(status_field, "").upper()
280+
if status:
281+
break
282+
283+
if not status:
284+
continue
285+
232286
# Map Fivetran API status to our constants
233287
# API returns: "COMPLETED", "FAILED", "CANCELLED", etc.
234-
if status == "COMPLETED":
288+
if status in ["COMPLETED", "SUCCESS", "SUCCEEDED"]:
235289
status = Constant.SUCCESSFUL
236-
elif status == "FAILED":
290+
elif status in ["FAILED", "FAILURE", "ERROR"]:
237291
status = Constant.FAILURE_WITH_TASK
238-
elif status == "CANCELLED":
292+
elif status in ["CANCELLED", "CANCELED", "ABORTED", "STOPPED"]:
239293
status = Constant.CANCELED
294+
else:
295+
# Skip unknown statuses
296+
continue
240297

241298
jobs.append(
242299
Job(
243-
job_id=job.get("id", ""),
300+
job_id=job.get(
301+
"id", str(hash(str(job)))
302+
), # Use hash as fallback ID
244303
start_time=started_at,
245304
end_time=completed_at,
246305
status=status,

0 commit comments

Comments
 (0)