|
1 | 1 | import json
|
2 | 2 | import logging
|
3 |
| -from datetime import datetime, timezone |
| 3 | +from datetime import datetime |
4 | 4 | from functools import lru_cache
|
5 | 5 | from typing import Any, Dict, Iterable, List, Optional
|
6 | 6 |
|
@@ -107,6 +107,18 @@ class SupersetDataset(BaseModel):
|
107 | 107 | changed_on_utc: Optional[str] = None
|
108 | 108 | explore_url: Optional[str] = ""
|
109 | 109 |
|
| 110 | + @property |
| 111 | + def modified_dt(self) -> Optional[datetime]: |
| 112 | + if self.changed_on_utc: |
| 113 | + return dp.parse(self.changed_on_utc) |
| 114 | + return None |
| 115 | + |
| 116 | + @property |
| 117 | + def modified_ts(self) -> Optional[int]: |
| 118 | + if self.modified_dt: |
| 119 | + return int(self.modified_dt.timestamp() * 1000) |
| 120 | + return None |
| 121 | + |
110 | 122 |
|
111 | 123 | class SupersetConfig(
|
112 | 124 | StatefulIngestionConfigBase, EnvConfigMixin, PlatformInstanceConfigMixin
|
@@ -149,6 +161,7 @@ class SupersetConfig(
|
149 | 161 | )
|
150 | 162 |
|
    class Config:
        # Pydantic model config: accept unknown fields instead of rejecting them.
        # This is required to allow preset configs to get parsed
        # (Preset sends extra keys beyond the base Superset config schema).
        extra = "allow"
|
153 | 166 |
|
154 | 167 | @validator("connect_uri", "display_uri")
|
@@ -219,7 +232,6 @@ def __init__(self, ctx: PipelineContext, config: SupersetConfig):
|
219 | 232 | graph=self.ctx.graph,
|
220 | 233 | )
|
221 | 234 | self.session = self.login()
|
222 |
| - # Default to postgres field type mappings |
223 | 235 |
|
224 | 236 | def login(self) -> requests.Session:
|
225 | 237 | login_response = requests.post(
|
@@ -258,7 +270,6 @@ def create(cls, config_dict: dict, ctx: PipelineContext) -> Source:
|
258 | 270 | config = SupersetConfig.parse_obj(config_dict)
|
259 | 271 | return cls(ctx, config)
|
260 | 272 |
|
261 |
| - @lru_cache(maxsize=None) |
262 | 273 | def paginate_entity_api_results(self, entity_type, page_size=100):
|
263 | 274 | current_page = 0
|
264 | 275 | total_items = page_size
|
@@ -336,7 +347,7 @@ def get_datasource_urn_from_id(
|
336 | 347 |
|
337 | 348 | if database_id and table_name:
|
338 | 349 | return make_dataset_urn(
|
339 |
| - platform=self.platform, |
| 350 | + platform=platform, |
340 | 351 | name=".".join(
|
341 | 352 | name for name in [database_name, schema_name, table_name] if name
|
342 | 353 | ),
|
@@ -585,18 +596,15 @@ def construct_dataset_from_dataset_data(
|
585 | 596 | datasource_urn = self.get_datasource_urn_from_id(
|
586 | 597 | dataset_response, self.platform
|
587 | 598 | )
|
588 |
| - if dataset.changed_on_utc: |
589 |
| - modified_dt = dp.parse(dataset.changed_on_utc) |
590 |
| - else: |
591 |
| - modified_dt = datetime.now(timezone.utc) |
592 | 599 |
|
593 |
| - modified_ts = int(modified_dt.timestamp() * 1000) |
594 | 600 | dataset_url = f"{self.config.display_uri}{dataset.explore_url or ''}"
|
595 | 601 |
|
596 | 602 | dataset_info = DatasetPropertiesClass(
|
597 | 603 | name=dataset.table_name,
|
598 | 604 | description="",
|
599 |
| - lastModified=TimeStamp(time=modified_ts), |
| 605 | + lastModified=TimeStamp(time=dataset.modified_ts) |
| 606 | + if dataset.modified_ts |
| 607 | + else None, |
600 | 608 | externalUrl=dataset_url,
|
601 | 609 | )
|
602 | 610 | aspects_items: List[Any] = []
|
|
0 commit comments