
feat(ingestion/superset): add timeout values to config to prevent hanging queries from blocking ingestion #2152

GitHub Actions / Unit Test Results (Dagster Plugin) failed Mar 16, 2025 in 0s

1 fail, 1 pass in 8s

2 tests  ±0   1 ✅ -1   0 💤 ±0   1 ❌ +1
1 suites ±0   8s ⏱️ -1s
1 files  ±0

Results for commit 1a0da83. ± Comparison against earlier commit 672ba6a.

Annotations

Check warning on line 0 in tests.unit.test_dagster

github-actions / Unit Test Results (Dagster Plugin)

test_emit_metadata (tests.unit.test_dagster) failed

artifacts/Test Results (dagster Plugin 3.11)/metadata-ingestion-modules/dagster-plugin/junit.quick.xml [took 0s]
Raw output
NotImplementedError: DeltaInfoOperator needs to define a normalize_value_for_hashing method to be compatible with ignore_order=True or iterable_compare_func.
mock_emit = <MagicMock name='DataHubGraph' spec='DataHubGraph' id='139835227558224'>
mock_uuid = <MagicMock name='uuid4' id='139835227031056'>

    @patch.object(uuid, "uuid4", side_effect=TEST_UUIDS)
    @patch("datahub_dagster_plugin.sensors.datahub_sensors.DataHubGraph", autospec=True)
    @freeze_time(FROZEN_TIME)
    def test_emit_metadata(mock_emit: Mock, mock_uuid: Mock) -> None:
        mock_emitter = Mock()
        mock_emit.return_value = mock_emitter
    
        @op(
            out={
                "result": Out(
                    metadata={
                        "datahub.outputs": [
                            "urn:li:dataset:(urn:li:dataPlatform:snowflake,tableB,PROD)"
                        ]
                    }
                )
            }
        )
        def extract():
            results = [1, 2, 3, 4]
            return results
    
        @op(
            ins={
                "data": In(
                    metadata={
                        "datahub.inputs": [
                            "urn:li:dataset:(urn:li:dataPlatform:snowflake,tableA,PROD)"
                        ]
                    }
                )
            }
        )
        def transform(data):
            results = []
            for each in data:
                results.append(str(each))
            return results
    
        @job
        def etl():
            transform(extract())
    
        instance = DagsterInstance.ephemeral()
        test_run_id = "12345678123456781234567812345678"
        result = etl.execute_in_process(instance=instance, run_id=test_run_id)
    
        # retrieve the DagsterRun
        dagster_run = result.dagster_run
    
        # retrieve a success event from the completed execution
        dagster_event = result.get_run_success_event()
    
        # create the context
        run_status_sensor_context = build_run_status_sensor_context(
            sensor_name="my_email_sensor",
            dagster_instance=instance,
            dagster_run=dagster_run,
            dagster_event=dagster_event,
        )
    
        with tempfile.TemporaryDirectory() as tmp_path:
            DatahubSensors()._emit_metadata(run_status_sensor_context)
            mcpws: List[Dict] = []
            for mock_call in mock_emitter.method_calls:
                if not mock_call.args:
                    continue
                mcpw = mock_call.args[0]
                if isinstance(mcpw, MetadataChangeProposalWrapper):
                    mcpws.append(mcpw.to_obj(simplified_structure=True))
    
            with open(f"{tmp_path}/test_emit_metadata_mcps.json", "w") as f:
                json_object = json.dumps(mcpws, indent=2)
                f.write(json_object)
    
>           assert_metadata_files_equal(
                output_path=pathlib.Path(f"{tmp_path}/test_emit_metadata_mcps.json"),
                golden_path=pathlib.Path(
                    "tests/unit/golden/golden_test_emit_metadata_mcps.json"
                ),
                ignore_paths=["root[*]['systemMetadata']['created']"],
            )

tests/unit/test_dagster.py:171: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../../metadata-ingestion/src/datahub/testing/compare_metadata_json.py:90: in assert_metadata_files_equal
    diff = diff_metadata_json(output, golden, ignore_paths, ignore_order=ignore_order)
../../metadata-ingestion/src/datahub/testing/compare_metadata_json.py:126: in diff_metadata_json
    return MCPDiff.create(
../../metadata-ingestion/src/datahub/testing/mcp_diff.py:174: in create
    diff = DeepDiff(
venv/lib/python3.11/site-packages/deepdiff/diff.py:348: in __init__
    self._diff(root, parents_ids=frozenset({id(t1)}), _original_type=_original_type)
venv/lib/python3.11/site-packages/deepdiff/diff.py:1720: in _diff
    self._diff_iterable(level, parents_ids, _original_type=_original_type, local_tree=local_tree)
venv/lib/python3.11/site-packages/deepdiff/diff.py:738: in _diff_iterable
    self._diff_iterable_with_deephash(level, parents_ids, _original_type=_original_type, local_tree=local_tree)
venv/lib/python3.11/site-packages/deepdiff/diff.py:1282: in _diff_iterable_with_deephash
    full_t1_hashtable = self._create_hashtable(level, 't1')
venv/lib/python3.11/site-packages/deepdiff/diff.py:1098: in _create_hashtable
    deep_hash = DeepHash(
venv/lib/python3.11/site-packages/deepdiff/deephash.py:221: in __init__
    self._hash(obj, parent=parent, parents_ids=frozenset({get_id(obj)}))
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <[TypeError("object of type 'object' has no len()") raised in repr()] DeepHash object at 0x7f2decc32b90>
obj = {"urn": "urn:li:dataFlow:(dagster,prod/etl,PROD)", "change_type": "UPSERT", "aspect_name": "dataFlowInfo", "aspect": "<aspect>"}
parent = 'root[0]', parents_ids = frozenset({'!>*id139835223738128'})

    def _hash(self, obj, parent, parents_ids=EMPTY_FROZENSET):
        """The main hash method"""
        counts = 1
        if self.custom_operators is not None:
            for operator in self.custom_operators:
                func = getattr(operator, 'normalize_value_for_hashing', None)
                if func is None:
>                   raise NotImplementedError(f"{operator.__class__.__name__} needs to define a normalize_value_for_hashing method to be compatible with ignore_order=True or iterable_compare_func.".format(operator))
E                   NotImplementedError: DeltaInfoOperator needs to define a normalize_value_for_hashing method to be compatible with ignore_order=True or iterable_compare_func.

venv/lib/python3.11/site-packages/deepdiff/deephash.py:513: NotImplementedError
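Root cause: the installed deepdiff's DeepHash refuses to hash values when a custom operator lacks a normalize_value_for_hashing method, a requirement that kicks in once ignore_order=True is passed (as MCPDiff.create does via DeepDiff). DataHub's DeltaInfoOperator predates that hook. Below is a minimal sketch of a compatible operator; the hook signature normalize_value_for_hashing(self, parent, obj) is an assumption based on newer deepdiff releases (only the getattr lookup is visible in the traceback), and the regex path is a hypothetical stand-in for whatever the real operator in datahub/testing/mcp_diff.py matches.

    from deepdiff.operator import BaseOperator

    class DeltaInfoOperator(BaseOperator):
        """Sketch only; the real operator lives in datahub/testing/mcp_diff.py."""

        def __init__(self) -> None:
            # Hypothetical path filter; the real operator targets specific aspects.
            super().__init__(regex_paths=[r".*\['aspect'\]"])

        def give_up_diffing(self, level, diff_instance) -> bool:
            # Fall through to deepdiff's normal diffing in this sketch.
            return False

        def normalize_value_for_hashing(self, parent, obj):
            # Newer deepdiff calls this hook on each value before hashing when
            # ignore_order=True; returning obj unchanged preserves the previous
            # hashing behavior for this operator.
            return obj

Alternatively, pinning deepdiff to a version that predates the normalize_value_for_hashing requirement in the plugin's test dependencies would sidestep the failure until the operators are updated.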

Check notice on line 0 in .github

github-actions / Unit Test Results (Dagster Plugin)

2 tests found

There are 2 tests, see "Raw output" for the full list of tests.
Raw output
tests.unit.test_dagster ‑ test_datahub_sensor
tests.unit.test_dagster ‑ test_emit_metadata