
Commit a06a229

feat(ingest/datahub): use stream_results with mysql (#12278)
1 parent: 30a77c0

File tree: 4 files changed (+15, -19 lines)

metadata-ingestion/setup.py (+1, -1)

@@ -461,7 +461,7 @@
     "mssql-odbc": sql_common | mssql_common | {"pyodbc"},
     "mysql": mysql,
     # mariadb should have same dependency as mysql
-    "mariadb": sql_common | {"pymysql>=1.0.2"},
+    "mariadb": sql_common | mysql,
     "okta": {"okta~=1.7.0", "nest-asyncio"},
     "oracle": sql_common | {"oracledb"},
     "postgres": sql_common | postgres_common,

metadata-ingestion/src/datahub/ingestion/source/datahub/config.py (+10)

@@ -1,6 +1,7 @@
 import os
 from typing import Optional, Set
 
+import pydantic
 from pydantic import Field, root_validator
 
 from datahub.configuration.common import AllowDenyPattern
@@ -119,3 +120,12 @@ def check_ingesting_data(cls, values):
                 " Please specify at least one of `database_connection` or `kafka_connection`, ideally both."
             )
         return values
+
+    @pydantic.validator("database_connection")
+    def validate_mysql_scheme(
+        cls, v: SQLAlchemyConnectionConfig
+    ) -> SQLAlchemyConnectionConfig:
+        if "mysql" in v.scheme:
+            if v.scheme != "mysql+pymysql":
+                raise ValueError("For MySQL, the scheme must be mysql+pymysql.")
+        return v
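The new check is a pydantic v1-style field validator. A minimal, self-contained sketch of the same pattern (not DataHub code; ConnectionConfig and SourceConfig below are stand-ins that only mirror the shape of the real config classes) showing how it accepts mysql+pymysql and rejects any other MySQL scheme:

import pydantic


class ConnectionConfig(pydantic.BaseModel):
    scheme: str


class SourceConfig(pydantic.BaseModel):
    database_connection: ConnectionConfig

    @pydantic.validator("database_connection")
    def validate_mysql_scheme(cls, v: ConnectionConfig) -> ConnectionConfig:
        # Only the pymysql driver is allowed for MySQL-flavored schemes.
        if "mysql" in v.scheme and v.scheme != "mysql+pymysql":
            raise ValueError("For MySQL, the scheme must be mysql+pymysql.")
        return v


SourceConfig(database_connection=ConnectionConfig(scheme="mysql+pymysql"))  # passes
SourceConfig(database_connection=ConnectionConfig(scheme="mysql"))  # raises ValidationError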

metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_database_reader.py (+3, -17)

@@ -151,31 +151,17 @@ def execute_server_cursor(
         self, query: str, params: Dict[str, Any]
     ) -> Iterable[Dict[str, Any]]:
         with self.engine.connect() as conn:
-            if self.engine.dialect.name == "postgresql":
+            if self.engine.dialect.name in ["postgresql", "mysql", "mariadb"]:
                 with conn.begin():  # Transaction required for PostgreSQL server-side cursor
+                    # Note that stream_results=True is mainly supported by PostgreSQL and MySQL-based dialects.
+                    # https://docs.sqlalchemy.org/en/14/core/connections.html#sqlalchemy.engine.Connection.execution_options.params.stream_results
                     conn = conn.execution_options(
                         stream_results=True,
                         yield_per=self.config.database_query_batch_size,
                     )
                     result = conn.execute(query, params)
                     for row in result:
                         yield dict(row)
-            elif self.engine.dialect.name == "mysql":  # MySQL
-                import MySQLdb
-
-                with contextlib.closing(
-                    conn.connection.cursor(MySQLdb.cursors.SSCursor)
-                ) as cursor:
-                    logger.debug(f"Using Cursor type: {cursor.__class__.__name__}")
-                    cursor.execute(query, params)
-
-                    columns = [desc[0] for desc in cursor.description]
-                    while True:
-                        rows = cursor.fetchmany(self.config.database_query_batch_size)
-                        if not rows:
-                            break  # Use break instead of return in generator
-                        for row in rows:
-                            yield dict(zip(columns, row))
             else:
                 raise ValueError(f"Unsupported dialect: {self.engine.dialect.name}")
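For reference, a minimal sketch of the stream_results/yield_per pattern the reader now uses for all three dialects. It assumes SQLAlchemy 1.4 and a reachable MySQL instance; the DSN, table name, and batch size are illustrative and not taken from this commit:

from sqlalchemy import create_engine, text

engine = create_engine("mysql+pymysql://user:pass@localhost/datahub")  # illustrative DSN

with engine.connect() as conn:
    with conn.begin():  # transaction, as in execute_server_cursor above
        conn = conn.execution_options(
            stream_results=True,  # server-side cursor: rows are streamed, not buffered client-side
            yield_per=2000,       # fetch from the cursor in batches of 2000 rows
        )
        result = conn.execute(text("SELECT id, payload FROM some_large_table"))  # illustrative query
        for row in result:
            print(dict(row._mapping))  # rows arrive incrementally, without loading the full result set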

metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_source.py (+1, -1)

@@ -130,7 +130,7 @@ def _get_database_workunits(
             self._commit_progress(i)
 
     def _get_kafka_workunits(
-        self, from_offsets: Dict[int, int], soft_deleted_urns: List[str] = []
+        self, from_offsets: Dict[int, int], soft_deleted_urns: List[str]
    ) -> Iterable[MetadataWorkUnit]:
         if self.config.kafka_connection is None:
             return
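The signature change also drops a mutable default argument ([]). A minimal sketch, unrelated to DataHub itself, of the Python pitfall that pattern can cause:

from typing import List


def collect(item: str, items: List[str] = []) -> List[str]:
    # The default list is created once, at function definition time,
    # so every call without an explicit argument shares the same object.
    items.append(item)
    return items


print(collect("a"))  # ['a']
print(collect("b"))  # ['a', 'b'] -- state leaks across calls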
