52
52
UpstreamLineageClass ,
53
53
_Aspect as AspectAbstract ,
54
54
)
55
- from datahub .metadata .urns import DataFlowUrn , DatasetUrn , TagUrn
55
+ from datahub .metadata .urns import (
56
+ ChartUrn ,
57
+ DashboardUrn ,
58
+ DataFlowUrn ,
59
+ DataJobUrn ,
60
+ DataPlatformUrn ,
61
+ DatasetUrn ,
62
+ TagUrn ,
63
+ )
56
64
from datahub .utilities .urn_encoder import UrnEncoder
57
65
58
66
logger = logging .getLogger (__name__ )
@@ -119,7 +127,7 @@ def parse_ts_millis(ts: Optional[float]) -> Optional[datetime]:
119
127
def make_data_platform_urn (platform : str ) -> str :
120
128
if platform .startswith ("urn:li:dataPlatform:" ):
121
129
return platform
122
- return f"urn:li:dataPlatform: { platform } "
130
+ return DataPlatformUrn . create_from_id ( platform ). urn ()
123
131
124
132
125
133
def make_dataset_urn (platform : str , name : str , env : str = DEFAULT_ENV ) -> str :
@@ -236,7 +244,7 @@ def make_user_urn(username: str) -> str:
236
244
Makes a user urn if the input is not a user or group urn already
237
245
"""
238
246
return (
239
- f"urn:li:corpuser:{ username } "
247
+ f"urn:li:corpuser:{ UrnEncoder . encode_string ( username ) } "
240
248
if not username .startswith (("urn:li:corpuser:" , "urn:li:corpGroup:" ))
241
249
else username
242
250
)
@@ -249,7 +257,7 @@ def make_group_urn(groupname: str) -> str:
249
257
if groupname and groupname .startswith (("urn:li:corpGroup:" , "urn:li:corpuser:" )):
250
258
return groupname
251
259
else :
252
- return f"urn:li:corpGroup:{ groupname } "
260
+ return f"urn:li:corpGroup:{ UrnEncoder . encode_string ( groupname ) } "
253
261
254
262
255
263
def make_tag_urn (tag : str ) -> str :
@@ -301,7 +309,12 @@ def make_data_flow_urn(
301
309
302
310
303
311
def make_data_job_urn_with_flow (flow_urn : str , job_id : str ) -> str :
304
- return f"urn:li:dataJob:({ flow_urn } ,{ job_id } )"
312
+ data_flow_urn = DataFlowUrn .from_string (flow_urn )
313
+ data_job_urn = DataJobUrn .create_from_ids (
314
+ data_flow_urn = data_flow_urn .urn (),
315
+ job_id = job_id ,
316
+ )
317
+ return data_job_urn .urn ()
305
318
306
319
307
320
def make_data_process_instance_urn (dataProcessInstanceId : str ) -> str :
@@ -324,10 +337,11 @@ def make_dashboard_urn(
324
337
platform : str , name : str , platform_instance : Optional [str ] = None
325
338
) -> str :
326
339
# FIXME: dashboards don't currently include data platform urn prefixes.
327
- if platform_instance :
328
- return f"urn:li:dashboard:({ platform } ,{ platform_instance } .{ name } )"
329
- else :
330
- return f"urn:li:dashboard:({ platform } ,{ name } )"
340
+ return DashboardUrn .create_from_ids (
341
+ platform = platform ,
342
+ name = name ,
343
+ platform_instance = platform_instance ,
344
+ ).urn ()
331
345
332
346
333
347
def dashboard_urn_to_key (dashboard_urn : str ) -> Optional [DashboardKeyClass ]:
@@ -342,10 +356,11 @@ def make_chart_urn(
342
356
platform : str , name : str , platform_instance : Optional [str ] = None
343
357
) -> str :
344
358
# FIXME: charts don't currently include data platform urn prefixes.
345
- if platform_instance :
346
- return f"urn:li:chart:({ platform } ,{ platform_instance } .{ name } )"
347
- else :
348
- return f"urn:li:chart:({ platform } ,{ name } )"
359
+ return ChartUrn .create_from_ids (
360
+ platform = platform ,
361
+ name = name ,
362
+ platform_instance = platform_instance ,
363
+ ).urn ()
349
364
350
365
351
366
def chart_urn_to_key (chart_urn : str ) -> Optional [ChartKeyClass ]:
0 commit comments