diff --git a/.github/workflows/conda.yml b/.github/workflows/conda.yml
index b89fe859606..c18261582ed 100644
--- a/.github/workflows/conda.yml
+++ b/.github/workflows/conda.yml
@@ -50,11 +50,9 @@ jobs:
 
           # conda search for the latest dask-core pre-release
           dask_core_arr=($(conda search --override-channels -c dask/label/dev dask-core | tail -n 1))
-          dask_expr_arr=($(conda search --override-channels -c dask/label/dev dask-expr | tail -n 1))
 
-          # extract dask-core & dask-expr pre-release versions
+          # extract dask-core pre-release version
           export DASK_CORE_VERSION=${dask_core_arr[1]}
-          export DASK_EXPR_VERSION=${dask_expr_arr[1]}
 
           # distributed pre-release build
           conda mambabuild continuous_integration/recipes/distributed \
diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml
index 5cacdb0eefc..bfa30c6a96c 100644
--- a/.github/workflows/tests.yaml
+++ b/.github/workflows/tests.yaml
@@ -73,12 +73,12 @@ jobs:
           - os: ubuntu-latest
             environment: mindeps
             label: pandas
-            extra_packages: [numpy=1.24, pandas=2.0, pyarrow=14.0.1, dask-expr]
+            extra_packages: [numpy=1.24, pandas=2.0, pyarrow=14.0.1]
             partition: "ci1"
           - os: ubuntu-latest
             environment: mindeps
             label: pandas
-            extra_packages: [numpy=1.24, pandas=2.0, pyarrow=14.0.1, dask-expr]
+            extra_packages: [numpy=1.24, pandas=2.0, pyarrow=14.0.1]
             partition: "not ci1"
 
           - os: ubuntu-latest
diff --git a/continuous_integration/environment-3.10.yaml b/continuous_integration/environment-3.10.yaml
index e5436c5cf34..5b8b21d02dd 100644
--- a/continuous_integration/environment-3.10.yaml
+++ b/continuous_integration/environment-3.10.yaml
@@ -57,7 +57,6 @@ dependencies:
   - httpx<0.28.0
   - pip:
       - git+https://github.com/dask/dask
-      - git+https://github.com/dask/dask-expr
       - git+https://github.com/dask/zict
       - git+https://github.com/dask/crick  # Only tested here
       # Revert after https://github.com/dask/distributed/issues/8614 is fixed
diff --git a/continuous_integration/environment-3.11.yaml b/continuous_integration/environment-3.11.yaml
index 2c08c679ad1..ae136e17387 100644
--- a/continuous_integration/environment-3.11.yaml
+++ b/continuous_integration/environment-3.11.yaml
@@ -51,7 +51,6 @@ dependencies:
   - httpx<0.28.0
   - pip:
       - git+https://github.com/dask/dask
-      - git+https://github.com/dask/dask-expr
       - git+https://github.com/dask/zict
       # Revert after https://github.com/dask/distributed/issues/8614 is fixed
       # - git+https://github.com/dask/s3fs
diff --git a/continuous_integration/environment-3.12.yaml b/continuous_integration/environment-3.12.yaml
index 44709c46391..bd37fbf4042 100644
--- a/continuous_integration/environment-3.12.yaml
+++ b/continuous_integration/environment-3.12.yaml
@@ -51,7 +51,6 @@ dependencies:
   - httpx<0.28.0
   - pip:
       - git+https://github.com/dask/dask
-      - git+https://github.com/dask/dask-expr
       - git+https://github.com/dask/zict
       # Revert after https://github.com/dask/distributed/issues/8614 is fixed
       # - git+https://github.com/dask/s3fs
diff --git a/continuous_integration/environment-3.13.yaml b/continuous_integration/environment-3.13.yaml
index f478a34bded..a60f216f996 100644
--- a/continuous_integration/environment-3.13.yaml
+++ b/continuous_integration/environment-3.13.yaml
@@ -50,7 +50,6 @@ dependencies:
   - httpx<0.28.0
   - pip:
       - git+https://github.com/dask/dask
-      - git+https://github.com/dask/dask-expr
       - git+https://github.com/dask/zict
       # Revert after https://github.com/dask/distributed/issues/8614 is fixed
       # - git+https://github.com/dask/s3fs
diff --git a/continuous_integration/recipes/dask/meta.yaml b/continuous_integration/recipes/dask/meta.yaml
index bd8f8be7dac..b48c799914d 100644
--- a/continuous_integration/recipes/dask/meta.yaml
+++ b/continuous_integration/recipes/dask/meta.yaml
@@ -2,7 +2,6 @@
 {% set new_patch = major_minor_patch[2] | int + 1 %}
 {% set version = (major_minor_patch[:2] + [new_patch]) | join('.') + environ.get('VERSION_SUFFIX', '') %}
 {% set dask_version = environ.get('DASK_CORE_VERSION', '0.0.0.dev') %}
-{% set dask_expr_version = environ.get('DASK_EXPR_VERSION', '0.0.0.dev') %}
 
 
 package:
@@ -21,12 +20,10 @@ requirements:
   host:
     - python >=3.10
     - dask-core {{ dask_version }}
-    - dask-expr {{ dask_expr_version }}
     - distributed {{ version }}
   run:
     - python >=3.10
     - {{ pin_compatible('dask-core', max_pin='x.x.x.x') }}
-    - {{ pin_compatible('dask-expr', max_pin='x.x.x.x') }}
     - {{ pin_compatible('distributed', exact=True) }}
     - cytoolz >=0.11.2
     - lz4 >=4.3.2
diff --git a/continuous_integration/scripts/test_report.py b/continuous_integration/scripts/test_report.py
index 9371278b294..1f339ed7ef3 100644
--- a/continuous_integration/scripts/test_report.py
+++ b/continuous_integration/scripts/test_report.py
@@ -142,7 +142,7 @@ def get_jobs(run, session, repo):
     # this to the JXML results.
 
     if repo.endswith("/dask"):
-        # example name: "test (windows-latest, 3.9)" or "test (ubuntu-latest, 3.12, dask-expr)"
+        # example name: "test (windows-latest, 3.9)" or "test (ubuntu-latest, 3.12)"
         df_jobs = df_jobs[~df_jobs.name.str.contains("Event File")]
 
         def format(row):
@@ -389,10 +389,6 @@ def download_and_parse_artifacts(
                 if a["name"].startswith("test-results") and repo.endswith("/dask"):
                     continue
 
-                # NOTE: Temporarily ignore reporting dask-expr related test cases
-                if "dask-expr" in a["name"]:
-                    continue
-
                 # Note: we assign a column with the workflow run timestamp rather
                 # than the artifact timestamp so that artifacts triggered under
                 # the same workflow run can be aligned according to the same trigger
diff --git a/distributed/shuffle/tests/test_merge.py b/distributed/shuffle/tests/test_merge.py
index 92d1152bf8e..6be130cec00 100644
--- a/distributed/shuffle/tests/test_merge.py
+++ b/distributed/shuffle/tests/test_merge.py
@@ -57,9 +57,10 @@ async def test_basic_merge(c, s, a, b, how):
     joined = a.merge(b, left_on="y", right_on="y", how=how)
 
     # Ensure we're using a hash join
-    from dask_expr._merge import HashJoinP2P
-
-    assert any(isinstance(expr, HashJoinP2P) for expr in joined.optimize()._expr.walk())
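+    # HashJoinP2P can no longer be imported from dask_expr; match on the expression name prefix instead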
+    assert any(
+        expr._name.startswith("hashjoinp2p") for expr in joined.optimize()._expr.walk()
+    )
 
     expected = pd.merge(A, B, how, "y")
     await list_eq(joined, expected)
diff --git a/distributed/shuffle/tests/test_shuffle.py b/distributed/shuffle/tests/test_shuffle.py
index 18dc1f18879..7b888feb7bc 100644
--- a/distributed/shuffle/tests/test_shuffle.py
+++ b/distributed/shuffle/tests/test_shuffle.py
@@ -1637,33 +1637,6 @@ async def test_multi(c, s, a, b):
     await assert_scheduler_cleanup(s)
 
 
-@pytest.mark.skipif(reason="worker restrictions are not supported in dask-expr")
-@gen_cluster(client=True)
-async def test_restrictions(c, s, a, b):
-    df = dask.datasets.timeseries(
-        start="2000-01-01",
-        end="2000-01-10",
-        dtypes={"x": float, "y": float},
-        freq="10 s",
-    ).persist(workers=a.address)
-    await df
-    assert a.data
-    assert not b.data
-
-    with dask.config.set({"dataframe.shuffle.method": "p2p"}):
-        x = df.shuffle("x")
-        y = df.shuffle("y")
-
-    x = x.persist(workers=b.address)
-    y = y.persist(workers=a.address)
-
-    await x
-    assert all(key in b.data for key in x.__dask_keys__())
-
-    await y
-    assert all(key in a.data for key in y.__dask_keys__())
-
-
 @gen_cluster(client=True)
 async def test_delete_some_results(c, s, a, b):
     df = dask.datasets.timeseries(
diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py
index ac77936d75e..9230dd4fff1 100644
--- a/distributed/tests/test_scheduler.py
+++ b/distributed/tests/test_scheduler.py
@@ -2829,9 +2829,8 @@ async def test_default_task_duration_splits(c, s, a, b):
     await wait(fut)
 
     split_prefix = [pre for pre in s.task_prefixes.keys() if "split" in pre]
-    # dask-expr enabled: ['split-taskshuffle', 'split-stage']
-    # dask-expr disabled: ['split-shuffle']
-    assert split_prefix
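+    # dask-expr is now always enabled, so the split prefixes are deterministic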
+    assert split_prefix == ["split-taskshuffle", "split-stage"]
     default_times = dask.config.get("distributed.scheduler.default-task-durations")
     for p in split_prefix:
         default_time = parse_timedelta(default_times[p])
diff --git a/distributed/tests/test_spans.py b/distributed/tests/test_spans.py
index 63db4ccf4ce..b207d460b74 100644
--- a/distributed/tests/test_spans.py
+++ b/distributed/tests/test_spans.py
@@ -803,7 +803,7 @@ def f():
 
 @gen_cluster(client=True)
 async def test_span_on_persist(c, s, a, b):
-    """As a workaround to lack of annotations support in dask-expr and loss of
+    """As a workaround to lack of annotations support in dask.dataframe and loss of
     annotations due to low level optimization in dask.array, you can use span() to wrap
     calls to persist() and compute()
     """