Skip to content

Commit 149b840

Browse files
treff7essleeperdeep
authored andcommitted
fix(ingest/datahub): Use server side cursor instead of local one (datahub-project#12129)
1 parent 76ac477 commit 149b840

File tree

1 file changed

+41
-21
lines changed

1 file changed

+41
-21
lines changed

metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_database_reader.py

+41-21
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,47 @@ def query(self) -> str:
147147
version
148148
"""
149149

150+
def execute_server_cursor(
151+
self, query: str, params: Dict[str, Any]
152+
) -> Iterable[Dict[str, Any]]:
153+
with self.engine.connect() as conn:
154+
if self.engine.dialect.name == "postgresql":
155+
with conn.begin(): # Transaction required for PostgreSQL server-side cursor
156+
conn = conn.execution_options(
157+
stream_results=True,
158+
yield_per=self.config.database_query_batch_size,
159+
)
160+
result = conn.execute(query, params)
161+
for row in result:
162+
yield dict(row)
163+
elif self.engine.dialect.name == "mysql": # MySQL
164+
import MySQLdb
165+
166+
with contextlib.closing(
167+
conn.connection.cursor(MySQLdb.cursors.SSCursor)
168+
) as cursor:
169+
logger.debug(f"Using Cursor type: {cursor.__class__.__name__}")
170+
cursor.execute(query, params)
171+
172+
columns = [desc[0] for desc in cursor.description]
173+
while True:
174+
rows = cursor.fetchmany(self.config.database_query_batch_size)
175+
if not rows:
176+
break # Use break instead of return in generator
177+
for row in rows:
178+
yield dict(zip(columns, row))
179+
else:
180+
raise ValueError(f"Unsupported dialect: {self.engine.dialect.name}")
181+
182+
def _get_rows(
183+
self, from_createdon: datetime, stop_time: datetime
184+
) -> Iterable[Dict[str, Any]]:
185+
params = {
186+
"exclude_aspects": list(self.config.exclude_aspects),
187+
"since_createdon": from_createdon.strftime(DATETIME_FORMAT),
188+
}
189+
yield from self.execute_server_cursor(self.query, params)
190+
150191
def get_aspects(
151192
self, from_createdon: datetime, stop_time: datetime
152193
) -> Iterable[Tuple[MetadataChangeProposalWrapper, datetime]]:
@@ -159,27 +200,6 @@ def get_aspects(
159200
if mcp:
160201
yield mcp, row["createdon"]
161202

162-
def _get_rows(
163-
self, from_createdon: datetime, stop_time: datetime
164-
) -> Iterable[Dict[str, Any]]:
165-
with self.engine.connect() as conn:
166-
with contextlib.closing(conn.connection.cursor()) as cursor:
167-
cursor.execute(
168-
self.query,
169-
{
170-
"exclude_aspects": list(self.config.exclude_aspects),
171-
"since_createdon": from_createdon.strftime(DATETIME_FORMAT),
172-
},
173-
)
174-
175-
columns = [desc[0] for desc in cursor.description]
176-
while True:
177-
rows = cursor.fetchmany(self.config.database_query_batch_size)
178-
if not rows:
179-
return
180-
for row in rows:
181-
yield dict(zip(columns, row))
182-
183203
def get_soft_deleted_rows(self) -> Iterable[Dict[str, Any]]:
184204
"""
185205
Fetches all soft-deleted entities from the database.

0 commit comments

Comments
 (0)