diff --git a/airbyte-integrations/connectors/source-slack/.coveragerc b/airbyte-integrations/connectors/source-slack/.coveragerc
new file mode 100644
index 0000000000000..ce32301144712
--- /dev/null
+++ b/airbyte-integrations/connectors/source-slack/.coveragerc
@@ -0,0 +1,3 @@
+[run]
+omit =
+    source_slack/run.py
diff --git a/airbyte-integrations/connectors/source-slack/acceptance-test-config.yml b/airbyte-integrations/connectors/source-slack/acceptance-test-config.yml
index 2fd1ba9af04bd..a9ee3dea68d49 100644
--- a/airbyte-integrations/connectors/source-slack/acceptance-test-config.yml
+++ b/airbyte-integrations/connectors/source-slack/acceptance-test-config.yml
@@ -6,7 +6,9 @@ acceptance_tests:
       - spec_path: "source_slack/spec.json"
         backward_compatibility_tests_config:
           # edited `min`/`max` > `minimum`/`maximum` for `lookback_window` field
-          disable_for_version: "0.1.26"
+          # disable_for_version: "0.1.26"
+          # slight changes: removed doc URL, added a new null OAuth param
+          disable_for_version: "0.3.10"
   connection:
     tests:
       - config_path: "secrets/config.json"
diff --git a/airbyte-integrations/connectors/source-slack/integration_tests/abnormal_state.json b/airbyte-integrations/connectors/source-slack/integration_tests/abnormal_state.json
index d55652e4e69a1..104b5856e0748 100644
--- a/airbyte-integrations/connectors/source-slack/integration_tests/abnormal_state.json
+++ b/airbyte-integrations/connectors/source-slack/integration_tests/abnormal_state.json
@@ -9,8 +9,49 @@
   {
     "type": "STREAM",
     "stream": {
-      "stream_state": { "float_ts": 7270247822 },
-      "stream_descriptor": { "name": "channel_messages" }
+      "stream_descriptor": {
+        "name": "channel_messages"
+      },
+      "stream_state": {
+        "states": [
+          {
+            "partition": {
+              "channel_id": "C04LTCM2Y56",
+              "parent_slice": {}
+            },
+            "cursor": {
+              "float_ts": "2534945416"
+            }
+          },
+          {
+            "partition": {
+              "channel": "C04KX3KEZ54",
+              "parent_slice": {}
+            },
+            "cursor": {
+              "float_ts": "2534945416"
+            }
+          },
+          {
+            "partition": {
+              "channel": "C04L3M4PTJ6",
+              "parent_slice": {}
+            },
+            "cursor": {
+              "float_ts": "2534945416"
+            }
+          },
+          {
+            "partition": {
+              "channel": "C04LTCM2Y56",
+              "parent_slice": {}
+            },
+            "cursor": {
+              "float_ts": "2534945416"
+            }
+          }
+        ]
+      }
     }
   }
 ]
diff --git a/airbyte-integrations/connectors/source-slack/integration_tests/expected_records.jsonl b/airbyte-integrations/connectors/source-slack/integration_tests/expected_records.jsonl
index e5b6c79113767..6ed8e6208f04e 100644
--- a/airbyte-integrations/connectors/source-slack/integration_tests/expected_records.jsonl
+++ b/airbyte-integrations/connectors/source-slack/integration_tests/expected_records.jsonl
@@ -4,12 +4,14 @@
 {"stream": "channel_members", "data": {"member_id": "U04L65GPMKN", "channel_id": "C04KX3KEZ54"}, "emitted_at": 1707568736171}
 {"stream": "channel_members", "data": {"member_id": "U04LY6NARHU", "channel_id": "C04KX3KEZ54"}, "emitted_at": 1707568736172}
 {"stream": "channel_members", "data": {"member_id": "U04M23SBJGM", "channel_id": "C04KX3KEZ54"}, "emitted_at": 1707568736172}
-{"stream": "channel_messages", "data": {"client_msg_id": "3ae60d35-58b8-441c-923a-75de35a4ed8a", "type": "message", "text": "Test Thread 2", "user": "U04L65GPMKN", "ts": "1683104542.931169", "blocks": [{"type": "rich_text", "block_id": "WLB", "elements": [{"type": "rich_text_section", "elements": [{"type": "text", "text": "Test Thread 2"}]}]}], "team": "T04KX3KDDU6", "thread_ts": "1683104542.931169", "reply_count": 2, "reply_users_count": 1, "latest_reply": "1683104568.059569", "reply_users": ["U04L65GPMKN"], "is_locked": false, "subscribed": true, "last_read": "1683104568.059569", "channel_id": "C04KX3KEZ54", "float_ts": 1683104542.931169}, "emitted_at": 1707568738170}
-{"stream": "channel_messages", "data": {"client_msg_id": "e27672c0-451e-42a6-8eff-a14d2db8ac1e", "type": "message", "text": "Test Thread 1", "user": "U04L65GPMKN", "ts": "1683104499.808709", "blocks": [{"type": "rich_text", "block_id": "0j7", "elements": [{"type": "rich_text_section", "elements": [{"type": "text", "text": "Test Thread 1"}]}]}], "team": "T04KX3KDDU6", "thread_ts": "1683104499.808709", "reply_count": 2, "reply_users_count": 1, "latest_reply": "1683104528.084359", "reply_users": ["U04L65GPMKN"], "is_locked": false, "subscribed": true, "last_read": "1683104528.084359", "channel_id": "C04LTCM2Y56", "float_ts": 1683104499.808709}, "emitted_at": 1707569060525}
-{"stream": "channel_messages", "data": {"type": "message", "subtype": "reminder_add", "text": " set up a reminder \u201ctest reminder\u201d in this channel at 9AM tomorrow, Eastern European Summer Time.", "user": "U04L65GPMKN", "ts": "1695814864.744249", "channel_id": "C04LTCM2Y56", "float_ts": 1695814864.744249}, "emitted_at": 1707569208689}
-{"stream": "threads", "data": {"client_msg_id": "3ae60d35-58b8-441c-923a-75de35a4ed8a", "type": "message", "text": "Test Thread 2", "user": "U04L65GPMKN", "ts": "1683104542.931169", "blocks": [{"type": "rich_text", "block_id": "WLB", "elements": [{"type": "rich_text_section", "elements": [{"type": "text", "text": "Test Thread 2"}]}]}], "team": "T04KX3KDDU6", "thread_ts": "1683104542.931169", "reply_count": 2, "reply_users_count": 1, "latest_reply": "1683104568.059569", "reply_users": ["U04L65GPMKN"], "is_locked": false, "subscribed": true, "last_read": "1683104568.059569", "channel_id": "C04KX3KEZ54", "float_ts": 1683104542.931169}, "emitted_at": 1707569354932}
-{"stream": "threads", "data": {"client_msg_id": "3e96d351-270c-493f-a1a0-fdc3c4c0e11f", "type": "message", "text": "<@U04M23SBJGM> test test test", "user": "U04L65GPMKN", "ts": "1683104559.922849", "blocks": [{"type": "rich_text", "block_id": "tX6vr", "elements": [{"type": "rich_text_section", "elements": [{"type": "user", "user_id": "U04M23SBJGM"}, {"type": "text", "text": " test test test"}]}]}], "team": "T04KX3KDDU6", "thread_ts": "1683104542.931169", "parent_user_id": "U04L65GPMKN", "channel_id": "C04KX3KEZ54", "float_ts": 1683104559.922849}, "emitted_at": 1707569354933}
-{"stream": "threads", "data": {"client_msg_id": "08023e44-9d18-41ed-81dd-5f04ed699656", "type": "message", "text": "<@U04LY6NARHU> test test", "user": "U04L65GPMKN", "ts": "1683104568.059569", "blocks": [{"type": "rich_text", "block_id": "IyUF", "elements": [{"type": "rich_text_section", "elements": [{"type": "user", "user_id": "U04LY6NARHU"}, {"type": "text", "text": " test test"}]}]}], "team": "T04KX3KDDU6", "thread_ts": "1683104542.931169", "parent_user_id": "U04L65GPMKN", "channel_id": "C04KX3KEZ54", "float_ts": 1683104568.059569}, "emitted_at": 1707569354933}
-{"stream": "users", "data": {"id": "USLACKBOT", "team_id": "T04KX3KDDU6", "name": "slackbot", "deleted": false, "color": "757575", "real_name": "Slackbot", "tz": "America/Los_Angeles", "tz_label": "Pacific Standard Time", "tz_offset": -28800, "profile": {"title": "", "phone": "", "skype": "", "real_name": "Slackbot", "real_name_normalized": "Slackbot", "display_name": "Slackbot", "display_name_normalized": "Slackbot", "fields": {}, "status_text": "", "status_emoji": "", "status_emoji_display_info": [], "status_expiration": 0, "avatar_hash": "sv41d8cd98f0", "always_active": true, "first_name": "slackbot", "last_name": "", "image_24": "https://a.slack-edge.com/80588/img/slackbot_24.png", "image_32": "https://a.slack-edge.com/80588/img/slackbot_32.png", "image_48": "https://a.slack-edge.com/80588/img/slackbot_48.png", "image_72": "https://a.slack-edge.com/80588/img/slackbot_72.png", "image_192": "https://a.slack-edge.com/80588/marketing/img/avatars/slackbot/avatar-slackbot.png", "image_512": "https://a.slack-edge.com/80588/img/slackbot_512.png", "status_text_canonical": "", "team": "T04KX3KDDU6"}, "is_admin": false, "is_owner": false, "is_primary_owner": false, "is_restricted": false, "is_ultra_restricted": false, "is_bot": false, "is_app_user": false, "updated": 0, "is_email_confirmed": false, "who_can_share_contact_card": "EVERYONE"}, "emitted_at": 1707569357949}
-{"stream": "users", "data": {"id": "U04KUMXNYMV", "team_id": "T04KX3KDDU6", "name": "deactivateduser693438", "deleted": true, "profile": {"title": "", "phone": "", "skype": "", "real_name": "Deactivated User", "real_name_normalized": "Deactivated User", "display_name": "deactivateduser", "display_name_normalized": "deactivateduser", "fields": null, "status_text": "", "status_emoji": "", "status_emoji_display_info": [], "status_expiration": 0, "avatar_hash": "g849cc56ed76", "huddle_state": "default_unset", "first_name": "Deactivated", "last_name": "User", "image_24": "https://secure.gravatar.com/avatar/d5320ceddda202563fd9e6222c07c00a.jpg?s=24&d=https%3A%2F%2Fa.slack-edge.com%2Fdf10d%2Fimg%2Favatars%2Fava_0011-24.png", "image_32": "https://secure.gravatar.com/avatar/d5320ceddda202563fd9e6222c07c00a.jpg?s=32&d=https%3A%2F%2Fa.slack-edge.com%2Fdf10d%2Fimg%2Favatars%2Fava_0011-32.png", "image_48": "https://secure.gravatar.com/avatar/d5320ceddda202563fd9e6222c07c00a.jpg?s=48&d=https%3A%2F%2Fa.slack-edge.com%2Fdf10d%2Fimg%2Favatars%2Fava_0011-48.png", "image_72": "https://secure.gravatar.com/avatar/d5320ceddda202563fd9e6222c07c00a.jpg?s=72&d=https%3A%2F%2Fa.slack-edge.com%2Fdf10d%2Fimg%2Favatars%2Fava_0011-72.png", "image_192": "https://secure.gravatar.com/avatar/d5320ceddda202563fd9e6222c07c00a.jpg?s=192&d=https%3A%2F%2Fa.slack-edge.com%2Fdf10d%2Fimg%2Favatars%2Fava_0011-192.png", "image_512": "https://secure.gravatar.com/avatar/d5320ceddda202563fd9e6222c07c00a.jpg?s=512&d=https%3A%2F%2Fa.slack-edge.com%2Fdf10d%2Fimg%2Favatars%2Fava_0011-512.png", "status_text_canonical": "", "team": "T04KX3KDDU6"}, "is_bot": false, "is_app_user": false, "updated": 1675090804, "is_forgotten": true, "is_invited_user": true}, "emitted_at": 1707569357951}
-{"stream": "users", "data": {"id": "U04L2KY5CES", "team_id": "T04KX3KDDU6", "name": "deactivateduser686066", "deleted": true, "profile": {"title": "", "phone": "", "skype": "", "real_name": "Deactivated User", "real_name_normalized": "Deactivated User", "display_name": "deactivateduser", "display_name_normalized": "deactivateduser", "fields": null, "status_text": "", "status_emoji": "", "status_emoji_display_info": [], "status_expiration": 0, "avatar_hash": "g849cc56ed76", "huddle_state": "default_unset", "first_name": "Deactivated", "last_name": "User", "image_24": "https://secure.gravatar.com/avatar/cacb225265b3b19c4e72029a62cf1ef1.jpg?s=24&d=https%3A%2F%2Fa.slack-edge.com%2Fdf10d%2Fimg%2Favatars%2Fava_0009-24.png", "image_32": "https://secure.gravatar.com/avatar/cacb225265b3b19c4e72029a62cf1ef1.jpg?s=32&d=https%3A%2F%2Fa.slack-edge.com%2Fdf10d%2Fimg%2Favatars%2Fava_0009-32.png", "image_48": "https://secure.gravatar.com/avatar/cacb225265b3b19c4e72029a62cf1ef1.jpg?s=48&d=https%3A%2F%2Fa.slack-edge.com%2Fdf10d%2Fimg%2Favatars%2Fava_0009-48.png", "image_72": "https://secure.gravatar.com/avatar/cacb225265b3b19c4e72029a62cf1ef1.jpg?s=72&d=https%3A%2F%2Fa.slack-edge.com%2Fdf10d%2Fimg%2Favatars%2Fava_0009-72.png", "image_192": "https://secure.gravatar.com/avatar/cacb225265b3b19c4e72029a62cf1ef1.jpg?s=192&d=https%3A%2F%2Fa.slack-edge.com%2Fdf10d%2Fimg%2Favatars%2Fava_0009-192.png", "image_512": "https://secure.gravatar.com/avatar/cacb225265b3b19c4e72029a62cf1ef1.jpg?s=512&d=https%3A%2F%2Fa.slack-edge.com%2Fdf10d%2Fimg%2Favatars%2Fava_0009-512.png", "status_text_canonical": "", "team": "T04KX3KDDU6"}, "is_bot": false, "is_app_user": false, "updated": 1675090785, "is_forgotten": true, "is_invited_user": true}, "emitted_at": 1707569357951}
+{"stream": "channel_messages", "data": {"user": "U04L65GPMKN", "type": "message", "ts": "1683104542.931169", "client_msg_id": "3ae60d35-58b8-441c-923a-75de35a4ed8a", "text": "Test Thread 2", "team": "T04KX3KDDU6", "thread_ts": "1683104542.931169", "reply_count": 2, "reply_users_count": 1, "latest_reply": "1683104568.059569", "reply_users": ["U04L65GPMKN"], "is_locked": false, "subscribed": true, "last_read": "1683104568.059569", "blocks": [{"type": "rich_text", "block_id": "WLB", "elements": [{"type": "rich_text_section", "elements": [{"type": "text", "text": "Test Thread 2"}]}]}], "float_ts": 1683104542, "channel_id": "C04KX3KEZ54"}, "emitted_at": 1711041520506}
+{"stream": "channel_messages", "data": {"user": "U04L65GPMKN", "type": "message", "ts": "1683104499.808709", "client_msg_id": "e27672c0-451e-42a6-8eff-a14d2db8ac1e", "text": "Test Thread 1", "team": "T04KX3KDDU6", "thread_ts": "1683104499.808709", "reply_count": 2, "reply_users_count": 1, "latest_reply": "1683104528.084359", "reply_users": ["U04L65GPMKN"], "is_locked": false, "subscribed": true, "last_read": "1683104528.084359", "blocks": [{"type": "rich_text", "block_id": "0j7", "elements": [{"type": "rich_text_section", "elements": [{"type": "text", "text": "Test Thread 1"}]}]}], "float_ts": 1683104499, "channel_id": "C04LTCM2Y56"}, "emitted_at": 1711041522765}
+{"stream": "channel_messages", "data": {"user": "USLACKBOT", "type": "message", "ts": "1695880827.186049", "bot_id": "B01", "text": "Reminder: test reminder.", "team": "T04KX3KDDU6", "blocks": [{"type": "rich_text", "block_id": "BGzX", "elements": [{"type": "rich_text_section", "elements": [{"type": "text", "text": "Reminder: test reminder."}]}]}], "float_ts": 1695880827, "channel_id": "C04LTCM2Y56"}, "emitted_at": 1711041523073}
+{"stream": "channel_messages", "data": {"subtype": "reminder_add", "user": "U04L65GPMKN", "type": "message", "ts": "1695814864.744249", "text": " set up a reminder \u201ctest reminder\u201d in this channel at 9AM tomorrow, Eastern European Summer Time.", "float_ts": 1695814864, "channel_id": "C04LTCM2Y56"}, "emitted_at": 1711041523080}
+{"stream": "threads", "data": {"user": "U04L65GPMKN", "type": "message", "ts": "1683104542.931169", "client_msg_id": "3ae60d35-58b8-441c-923a-75de35a4ed8a", "text": "Test Thread 2", "team": "T04KX3KDDU6", "thread_ts": "1683104542.931169", "reply_count": 2, "reply_users_count": 1, "latest_reply": "1683104568.059569", "reply_users": ["U04L65GPMKN"], "is_locked": false, "subscribed": true, "last_read": "1683104568.059569", "blocks": [{"type": "rich_text", "block_id": "WLB", "elements": [{"type": "rich_text_section", "elements": [{"type": "text", "text": "Test Thread 2"}]}]}], "channel_id": "C04KX3KEZ54", "float_ts": 1683104542.931169}, "emitted_at": 1712056304168}
+{"stream": "threads", "data": {"user": "U04L65GPMKN", "type": "message", "ts": "1683104559.922849", "client_msg_id": "3e96d351-270c-493f-a1a0-fdc3c4c0e11f", "text": "<@U04M23SBJGM> test test test", "team": "T04KX3KDDU6", "thread_ts": "1683104542.931169", "parent_user_id": "U04L65GPMKN", "blocks": [{"type": "rich_text", "block_id": "tX6vr", "elements": [{"type": "rich_text_section", "elements": [{"type": "user", "user_id": "U04M23SBJGM"}, {"type": "text", "text": " test test test"}]}]}], "channel_id": "C04KX3KEZ54", "float_ts": 1683104559.922849}, "emitted_at": 1712056304169}
+{"stream": "threads", "data": {"user": "U04L65GPMKN", "type": "message", "ts": "1683104568.059569", "client_msg_id": "08023e44-9d18-41ed-81dd-5f04ed699656", "text": "<@U04LY6NARHU> test test", "team": "T04KX3KDDU6", "thread_ts": "1683104542.931169", "parent_user_id": "U04L65GPMKN", "blocks": [{"type": "rich_text", "block_id": "IyUF", "elements": [{"type": "rich_text_section", "elements": [{"type": "user", "user_id": "U04LY6NARHU"}, {"type": "text", "text": " test test"}]}]}], "channel_id": "C04KX3KEZ54", "float_ts": 1683104568.059569}, "emitted_at": 1712056304169}
+{"stream": "threads", "data": {"user": "USLACKBOT", "type": "message", "ts": "1695880827.186049", "bot_id": "B01", "text": "Reminder: test reminder.", "team": "T04KX3KDDU6", "blocks": [{"type": "rich_text", "block_id": "BGzX", "elements": [{"type": "rich_text_section", "elements": [{"type": "text", "text": "Reminder: test reminder."}]}]}], "channel_id": "C04LTCM2Y56", "float_ts": 1695880827.186049}, "emitted_at": 1712056304703}
+{"stream": "users", "data": {"id": "USLACKBOT", "team_id": "T04KX3KDDU6", "name": "slackbot", "deleted": false, "color": "757575", "real_name": "Slackbot", "tz": "America/Los_Angeles", "tz_label": "Pacific Daylight Time", "tz_offset": -25200, "profile": {"title": "", "phone": "", "skype": "", "real_name": "Slackbot", "real_name_normalized": "Slackbot", "display_name": "Slackbot", "display_name_normalized": "Slackbot", "fields": {}, "status_text": "", "status_emoji": "", "status_emoji_display_info": [], "status_expiration": 0, "avatar_hash": "sv41d8cd98f0", "always_active": true, "first_name": "slackbot", "last_name": "", "image_24": "https://a.slack-edge.com/80588/img/slackbot_24.png", "image_32": "https://a.slack-edge.com/80588/img/slackbot_32.png", "image_48": "https://a.slack-edge.com/80588/img/slackbot_48.png", "image_72": "https://a.slack-edge.com/80588/img/slackbot_72.png", "image_192": "https://a.slack-edge.com/80588/marketing/img/avatars/slackbot/avatar-slackbot.png", "image_512": "https://a.slack-edge.com/80588/img/slackbot_512.png", "status_text_canonical": "", "team": "T04KX3KDDU6"}, "is_admin": false, "is_owner": false, "is_primary_owner": false, "is_restricted": false, "is_ultra_restricted": false, "is_bot": false, "is_app_user": false, "updated": 0, "is_email_confirmed": false, "who_can_share_contact_card": "EVERYONE"}, "emitted_at": 1710501138877}
+{"stream": "users", "data": {"id": "U04KUMXNYMV", "team_id": "T04KX3KDDU6", "name": "deactivateduser693438", "deleted": true, "profile": {"title": "", "phone": "", "skype": "", "real_name": "Deactivated User", "real_name_normalized": "Deactivated User", "display_name": "deactivateduser", "display_name_normalized": "deactivateduser", "fields": null, "status_text": "", "status_emoji": "", "status_emoji_display_info": [], "status_expiration": 0, "avatar_hash": "g849cc56ed76", "huddle_state": "default_unset", "first_name": "Deactivated", "last_name": "User", "image_24": "https://secure.gravatar.com/avatar/d5320ceddda202563fd9e6222c07c00a.jpg?s=24&d=https%3A%2F%2Fa.slack-edge.com%2Fdf10d%2Fimg%2Favatars%2Fava_0011-24.png", "image_32": "https://secure.gravatar.com/avatar/d5320ceddda202563fd9e6222c07c00a.jpg?s=32&d=https%3A%2F%2Fa.slack-edge.com%2Fdf10d%2Fimg%2Favatars%2Fava_0011-32.png", "image_48": "https://secure.gravatar.com/avatar/d5320ceddda202563fd9e6222c07c00a.jpg?s=48&d=https%3A%2F%2Fa.slack-edge.com%2Fdf10d%2Fimg%2Favatars%2Fava_0011-48.png", "image_72": "https://secure.gravatar.com/avatar/d5320ceddda202563fd9e6222c07c00a.jpg?s=72&d=https%3A%2F%2Fa.slack-edge.com%2Fdf10d%2Fimg%2Favatars%2Fava_0011-72.png", "image_192": "https://secure.gravatar.com/avatar/d5320ceddda202563fd9e6222c07c00a.jpg?s=192&d=https%3A%2F%2Fa.slack-edge.com%2Fdf10d%2Fimg%2Favatars%2Fava_0011-192.png", "image_512": "https://secure.gravatar.com/avatar/d5320ceddda202563fd9e6222c07c00a.jpg?s=512&d=https%3A%2F%2Fa.slack-edge.com%2Fdf10d%2Fimg%2Favatars%2Fava_0011-512.png", "status_text_canonical": "", "team": "T04KX3KDDU6"}, "is_bot": false, "is_app_user": false, "updated": 1675090804, "is_forgotten": true, "is_invited_user": true}, "emitted_at": 1710501138879}
+{"stream": "users", "data": {"id": "U04L2KY5CES", "team_id": "T04KX3KDDU6", "name": "deactivateduser686066", "deleted": true, "profile": {"title": "", "phone": "", "skype": "", "real_name": "Deactivated User", "real_name_normalized": "Deactivated User", "display_name": "deactivateduser", "display_name_normalized": "deactivateduser", "fields": null, "status_text": "", "status_emoji": "", "status_emoji_display_info": [], "status_expiration": 0, "avatar_hash": "g849cc56ed76", "huddle_state": "default_unset", "first_name": "Deactivated", "last_name": "User", "image_24": "https://secure.gravatar.com/avatar/cacb225265b3b19c4e72029a62cf1ef1.jpg?s=24&d=https%3A%2F%2Fa.slack-edge.com%2Fdf10d%2Fimg%2Favatars%2Fava_0009-24.png", "image_32": "https://secure.gravatar.com/avatar/cacb225265b3b19c4e72029a62cf1ef1.jpg?s=32&d=https%3A%2F%2Fa.slack-edge.com%2Fdf10d%2Fimg%2Favatars%2Fava_0009-32.png", "image_48": "https://secure.gravatar.com/avatar/cacb225265b3b19c4e72029a62cf1ef1.jpg?s=48&d=https%3A%2F%2Fa.slack-edge.com%2Fdf10d%2Fimg%2Favatars%2Fava_0009-48.png", "image_72": "https://secure.gravatar.com/avatar/cacb225265b3b19c4e72029a62cf1ef1.jpg?s=72&d=https%3A%2F%2Fa.slack-edge.com%2Fdf10d%2Fimg%2Favatars%2Fava_0009-72.png", "image_192": "https://secure.gravatar.com/avatar/cacb225265b3b19c4e72029a62cf1ef1.jpg?s=192&d=https%3A%2F%2Fa.slack-edge.com%2Fdf10d%2Fimg%2Favatars%2Fava_0009-192.png", "image_512": "https://secure.gravatar.com/avatar/cacb225265b3b19c4e72029a62cf1ef1.jpg?s=512&d=https%3A%2F%2Fa.slack-edge.com%2Fdf10d%2Fimg%2Favatars%2Fava_0009-512.png", "status_text_canonical": "", "team": "T04KX3KDDU6"}, "is_bot": false, "is_app_user": false, "updated": 1675090785, "is_forgotten": true, "is_invited_user": true}, "emitted_at": 1710501138881}
diff --git a/airbyte-integrations/connectors/source-slack/metadata.yaml b/airbyte-integrations/connectors/source-slack/metadata.yaml
index a8159eb12552c..37373659df839 100644
--- a/airbyte-integrations/connectors/source-slack/metadata.yaml
+++ b/airbyte-integrations/connectors/source-slack/metadata.yaml
@@ -10,7 +10,7 @@ data:
   connectorSubtype: api
   connectorType: source
   definitionId: c2281cee-86f9-4a86-bb48-d23286b4c7bd
-  dockerImageTag: 0.4.1
+  dockerImageTag: 1.0.0
   dockerRepository: airbyte/source-slack
   documentationUrl: https://docs.airbyte.com/integrations/sources/slack
   githubIssueLabel: source-slack
@@ -27,6 +27,19 @@ data:
     oss:
       enabled: true
   releaseStage: generally_available
+  releases:
+    breakingChanges:
+      1.0.0:
+        message:
+          The source Slack connector is being migrated from the Python CDK to our declarative low-code CDK.
+          Due to changes in the handling of state format for incremental substreams, this migration constitutes a breaking change for the channel_messages stream.
+          Users will need to reset source configuration, refresh the source schema and reset the channel_messages stream after upgrading.
+          For more information, see our migration documentation for source Slack.
+        upgradeDeadline: "2024-04-29"
+        scopedImpact:
+          - scopeType: stream
+            impactedScopes:
+              - "channel_messages"
   suggestedStreams:
     streams:
       - users
@@ -37,5 +50,5 @@ data:
   supportLevel: certified
   tags:
     - language:python
-    - cdk:python
+    - cdk:low-code
 metadataSpecVersion: "1.0"
diff --git a/airbyte-integrations/connectors/source-slack/poetry.lock b/airbyte-integrations/connectors/source-slack/poetry.lock
index a6a77535d1183..6cf0fe1c2aab4 100644
--- a/airbyte-integrations/connectors/source-slack/poetry.lock
+++ b/airbyte-integrations/connectors/source-slack/poetry.lock
@@ -1,18 +1,18 @@
-# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand.
+# This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand.
 
 [[package]]
 name = "airbyte-cdk"
-version = "0.77.2"
+version = "0.81.4"
 description = "A framework for writing Airbyte Connectors."
 optional = false
 python-versions = "<4.0,>=3.9"
 files = [
-    {file = "airbyte_cdk-0.77.2-py3-none-any.whl", hash = "sha256:6dffbe0c4b3454a5cdd20525b4f1e9cfef2e80c005b6b30473fc5bf6f75af64e"},
-    {file = "airbyte_cdk-0.77.2.tar.gz", hash = "sha256:84aeb27862a18e135c7bc3a5dfc363037665d428e7495e8824673f853adcca70"},
+    {file = "airbyte_cdk-0.81.4-py3-none-any.whl", hash = "sha256:4ed193da4e8be4867e1d8983172d10afb3c3b10f3e10ec618431deec1f2af4cb"},
+    {file = "airbyte_cdk-0.81.4.tar.gz", hash = "sha256:5c63d8c792edf5f24d0ad804b34b3ebcc056ecede6cb4f87ebf9ac07aa987f24"},
 ]
 
 [package.dependencies]
-airbyte-protocol-models = "0.5.1"
+airbyte-protocol-models = "*"
 backoff = "*"
 cachetools = "*"
 Deprecated = ">=1.2,<1.3"
@@ -32,19 +32,19 @@ requests_cache = "*"
 wcmatch = "8.4"
 
 [package.extras]
-file-based = ["avro (>=1.11.2,<1.12.0)", "fastavro (>=1.8.0,<1.9.0)", "markdown", "pyarrow (>=15.0.0,<15.1.0)", "pytesseract (==0.3.10)", "unstructured.pytesseract (>=0.3.12)", "unstructured[docx,pptx] (==0.10.27)"]
+file-based = ["avro (>=1.11.2,<1.12.0)", "fastavro (>=1.8.0,<1.9.0)", "markdown", "pdf2image (==1.16.3)", "pdfminer.six (==20221105)", "pyarrow (>=15.0.0,<15.1.0)", "pytesseract (==0.3.10)", "unstructured.pytesseract (>=0.3.12)", "unstructured[docx,pptx] (==0.10.27)"]
 sphinx-docs = ["Sphinx (>=4.2,<4.3)", "sphinx-rtd-theme (>=1.0,<1.1)"]
 vector-db-based = ["cohere (==4.21)", "langchain (==0.0.271)", "openai[embeddings] (==0.27.9)", "tiktoken (==0.4.0)"]
 
 [[package]]
 name = "airbyte-protocol-models"
-version = "0.5.1"
+version = "0.9.0"
 description = "Declares the Airbyte Protocol."
 optional = false
 python-versions = ">=3.8"
 files = [
-    {file = "airbyte_protocol_models-0.5.1-py3-none-any.whl", hash = "sha256:dfe84e130e51ce2ae81a06d5aa36f6c5ce3152b9e36e6f0195fad6c3dab0927e"},
-    {file = "airbyte_protocol_models-0.5.1.tar.gz", hash = "sha256:7c8b16c7c1c7956b1996052e40585a3a93b1e44cb509c4e97c1ee4fe507ea086"},
+    {file = "airbyte_protocol_models-0.9.0-py3-none-any.whl", hash = "sha256:e972e140b5efd1edad5a338bcae8fdee9fc12545caf2c321e0f61b151c163a9b"},
+    {file = "airbyte_protocol_models-0.9.0.tar.gz", hash = "sha256:40b69c33df23fe82d7078e84beb123bd604480e4d73cb277a890fcc92aedc8d2"},
 ]
 
 [package.dependencies]
@@ -300,6 +300,20 @@ files = [
 [package.extras]
 test = ["pytest (>=6)"]
 
+[[package]]
+name = "freezegun"
+version = "1.4.0"
+description = "Let your Python tests travel through time"
+optional = false
+python-versions = ">=3.7"
+files = [
+    {file = "freezegun-1.4.0-py3-none-any.whl", hash = "sha256:55e0fc3c84ebf0a96a5aa23ff8b53d70246479e9a68863f1fcac5a3e52f19dd6"},
+    {file = "freezegun-1.4.0.tar.gz", hash = "sha256:10939b0ba0ff5adaecf3b06a5c2f73071d9678e507c5eaedb23c761d56ac774b"},
+]
+
+[package.dependencies]
+python-dateutil = ">=2.7"
+
 [[package]]
 name = "genson"
 version = "1.2.2"
@@ -312,13 +326,13 @@ files = [
 
 [[package]]
 name = "idna"
-version = "3.6"
+version = "3.7"
 description = "Internationalized Domain Names in Applications (IDNA)"
 optional = false
 python-versions = ">=3.5"
 files = [
-    {file = "idna-3.6-py3-none-any.whl", hash = "sha256:c05567e9c24a6b9faaa835c4821bad0590fbb9d5779e7caa6e1cc4978e7eb24f"},
-    {file = "idna-3.6.tar.gz", hash = "sha256:9ecdbbd083b06798ae1e86adcbfe8ab1479cf864e4ee30fe4e46a003d12491ca"},
+    {file = "idna-3.7-py3-none-any.whl", hash = "sha256:82fee1fc78add43492d3a1898bfa6d8a904cc97d8427f683ed8e798d07761aa0"},
+    {file = "idna-3.7.tar.gz", hash = "sha256:028ff3aadf0609c1fd278d8ea3089299412a7a8b9bd005dd08b9f8285bcb5cfc"},
 ]
 
 [[package]]
@@ -552,47 +566,47 @@ files = [
 
 [[package]]
 name = "pydantic"
-version = "1.10.14"
+version = "1.10.15"
 description = "Data validation and settings management using python type hints"
 optional = false
 python-versions = ">=3.7"
 files = [
-    {file = "pydantic-1.10.14-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:7f4fcec873f90537c382840f330b90f4715eebc2bc9925f04cb92de593eae054"},
-    {file = "pydantic-1.10.14-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:8e3a76f571970fcd3c43ad982daf936ae39b3e90b8a2e96c04113a369869dc87"},
-    {file = "pydantic-1.10.14-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:82d886bd3c3fbeaa963692ef6b643159ccb4b4cefaf7ff1617720cbead04fd1d"},
-    {file = "pydantic-1.10.14-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:798a3d05ee3b71967844a1164fd5bdb8c22c6d674f26274e78b9f29d81770c4e"},
-    {file = "pydantic-1.10.14-cp310-cp310-musllinux_1_1_i686.whl", hash = "sha256:23d47a4b57a38e8652bcab15a658fdb13c785b9ce217cc3a729504ab4e1d6bc9"},
-    {file = "pydantic-1.10.14-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:f9f674b5c3bebc2eba401de64f29948ae1e646ba2735f884d1594c5f675d6f2a"},
-    {file = "pydantic-1.10.14-cp310-cp310-win_amd64.whl", hash = "sha256:24a7679fab2e0eeedb5a8924fc4a694b3bcaac7d305aeeac72dd7d4e05ecbebf"},
-    {file = "pydantic-1.10.14-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:9d578ac4bf7fdf10ce14caba6f734c178379bd35c486c6deb6f49006e1ba78a7"},
-    {file = "pydantic-1.10.14-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:fa7790e94c60f809c95602a26d906eba01a0abee9cc24150e4ce2189352deb1b"},
-    {file = "pydantic-1.10.14-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:aad4e10efa5474ed1a611b6d7f0d130f4aafadceb73c11d9e72823e8f508e663"},
-    {file = "pydantic-1.10.14-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:1245f4f61f467cb3dfeced2b119afef3db386aec3d24a22a1de08c65038b255f"},
-    {file = "pydantic-1.10.14-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:21efacc678a11114c765eb52ec0db62edffa89e9a562a94cbf8fa10b5db5c046"},
-    {file = "pydantic-1.10.14-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:412ab4a3f6dbd2bf18aefa9f79c7cca23744846b31f1d6555c2ee2b05a2e14ca"},
-    {file = "pydantic-1.10.14-cp311-cp311-win_amd64.whl", hash = "sha256:e897c9f35281f7889873a3e6d6b69aa1447ceb024e8495a5f0d02ecd17742a7f"},
-    {file = "pydantic-1.10.14-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:d604be0f0b44d473e54fdcb12302495fe0467c56509a2f80483476f3ba92b33c"},
-    {file = "pydantic-1.10.14-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a42c7d17706911199798d4c464b352e640cab4351efe69c2267823d619a937e5"},
-    {file = "pydantic-1.10.14-cp37-cp37m-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:596f12a1085e38dbda5cbb874d0973303e34227b400b6414782bf205cc14940c"},
-    {file = "pydantic-1.10.14-cp37-cp37m-musllinux_1_1_i686.whl", hash = "sha256:bfb113860e9288d0886e3b9e49d9cf4a9d48b441f52ded7d96db7819028514cc"},
-    {file = "pydantic-1.10.14-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:bc3ed06ab13660b565eed80887fcfbc0070f0aa0691fbb351657041d3e874efe"},
-    {file = "pydantic-1.10.14-cp37-cp37m-win_amd64.whl", hash = "sha256:ad8c2bc677ae5f6dbd3cf92f2c7dc613507eafe8f71719727cbc0a7dec9a8c01"},
-    {file = "pydantic-1.10.14-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:c37c28449752bb1f47975d22ef2882d70513c546f8f37201e0fec3a97b816eee"},
-    {file = "pydantic-1.10.14-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:49a46a0994dd551ec051986806122767cf144b9702e31d47f6d493c336462597"},
-    {file = "pydantic-1.10.14-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:53e3819bd20a42470d6dd0fe7fc1c121c92247bca104ce608e609b59bc7a77ee"},
-    {file = "pydantic-1.10.14-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:0fbb503bbbbab0c588ed3cd21975a1d0d4163b87e360fec17a792f7d8c4ff29f"},
-    {file = "pydantic-1.10.14-cp38-cp38-musllinux_1_1_i686.whl", hash = "sha256:336709883c15c050b9c55a63d6c7ff09be883dbc17805d2b063395dd9d9d0022"},
-    {file = "pydantic-1.10.14-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:4ae57b4d8e3312d486e2498d42aed3ece7b51848336964e43abbf9671584e67f"},
-    {file = "pydantic-1.10.14-cp38-cp38-win_amd64.whl", hash = "sha256:dba49d52500c35cfec0b28aa8b3ea5c37c9df183ffc7210b10ff2a415c125c4a"},
-    {file = "pydantic-1.10.14-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:c66609e138c31cba607d8e2a7b6a5dc38979a06c900815495b2d90ce6ded35b4"},
-    {file = "pydantic-1.10.14-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:d986e115e0b39604b9eee3507987368ff8148222da213cd38c359f6f57b3b347"},
-    {file = "pydantic-1.10.14-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:646b2b12df4295b4c3148850c85bff29ef6d0d9621a8d091e98094871a62e5c7"},
-    {file = "pydantic-1.10.14-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:282613a5969c47c83a8710cc8bfd1e70c9223feb76566f74683af889faadc0ea"},
-    {file = "pydantic-1.10.14-cp39-cp39-musllinux_1_1_i686.whl", hash = "sha256:466669501d08ad8eb3c4fecd991c5e793c4e0bbd62299d05111d4f827cded64f"},
-    {file = "pydantic-1.10.14-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:13e86a19dca96373dcf3190fcb8797d40a6f12f154a244a8d1e8e03b8f280593"},
-    {file = "pydantic-1.10.14-cp39-cp39-win_amd64.whl", hash = "sha256:08b6ec0917c30861e3fe71a93be1648a2aa4f62f866142ba21670b24444d7fd8"},
-    {file = "pydantic-1.10.14-py3-none-any.whl", hash = "sha256:8ee853cd12ac2ddbf0ecbac1c289f95882b2d4482258048079d13be700aa114c"},
-    {file = "pydantic-1.10.14.tar.gz", hash = "sha256:46f17b832fe27de7850896f3afee50ea682220dd218f7e9c88d436788419dca6"},
+    {file = "pydantic-1.10.15-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:22ed12ee588b1df028a2aa5d66f07bf8f8b4c8579c2e96d5a9c1f96b77f3bb55"},
+    {file = "pydantic-1.10.15-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:75279d3cac98186b6ebc2597b06bcbc7244744f6b0b44a23e4ef01e5683cc0d2"},
+    {file = "pydantic-1.10.15-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:50f1666a9940d3d68683c9d96e39640f709d7a72ff8702987dab1761036206bb"},
+    {file = "pydantic-1.10.15-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:82790d4753ee5d00739d6cb5cf56bceb186d9d6ce134aca3ba7befb1eedbc2c8"},
+    {file = "pydantic-1.10.15-cp310-cp310-musllinux_1_1_i686.whl", hash = "sha256:d207d5b87f6cbefbdb1198154292faee8017d7495a54ae58db06762004500d00"},
+    {file = "pydantic-1.10.15-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:e49db944fad339b2ccb80128ffd3f8af076f9f287197a480bf1e4ca053a866f0"},
+    {file = "pydantic-1.10.15-cp310-cp310-win_amd64.whl", hash = "sha256:d3b5c4cbd0c9cb61bbbb19ce335e1f8ab87a811f6d589ed52b0254cf585d709c"},
+    {file = "pydantic-1.10.15-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:c3d5731a120752248844676bf92f25a12f6e45425e63ce22e0849297a093b5b0"},
+    {file = "pydantic-1.10.15-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:c365ad9c394f9eeffcb30a82f4246c0006417f03a7c0f8315d6211f25f7cb654"},
+    {file = "pydantic-1.10.15-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3287e1614393119c67bd4404f46e33ae3be3ed4cd10360b48d0a4459f420c6a3"},
+    {file = "pydantic-1.10.15-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:be51dd2c8596b25fe43c0a4a59c2bee4f18d88efb8031188f9e7ddc6b469cf44"},
+    {file = "pydantic-1.10.15-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:6a51a1dd4aa7b3f1317f65493a182d3cff708385327c1c82c81e4a9d6d65b2e4"},
+    {file = "pydantic-1.10.15-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:4e316e54b5775d1eb59187f9290aeb38acf620e10f7fd2f776d97bb788199e53"},
+    {file = "pydantic-1.10.15-cp311-cp311-win_amd64.whl", hash = "sha256:0d142fa1b8f2f0ae11ddd5e3e317dcac060b951d605fda26ca9b234b92214986"},
+    {file = "pydantic-1.10.15-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:7ea210336b891f5ea334f8fc9f8f862b87acd5d4a0cbc9e3e208e7aa1775dabf"},
+    {file = "pydantic-1.10.15-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3453685ccd7140715e05f2193d64030101eaad26076fad4e246c1cc97e1bb30d"},
+    {file = "pydantic-1.10.15-cp37-cp37m-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:9bea1f03b8d4e8e86702c918ccfd5d947ac268f0f0cc6ed71782e4b09353b26f"},
+    {file = "pydantic-1.10.15-cp37-cp37m-musllinux_1_1_i686.whl", hash = "sha256:005655cabc29081de8243126e036f2065bd7ea5b9dff95fde6d2c642d39755de"},
+    {file = "pydantic-1.10.15-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:af9850d98fc21e5bc24ea9e35dd80a29faf6462c608728a110c0a30b595e58b7"},
+    {file = "pydantic-1.10.15-cp37-cp37m-win_amd64.whl", hash = "sha256:d31ee5b14a82c9afe2bd26aaa405293d4237d0591527d9129ce36e58f19f95c1"},
+    {file = "pydantic-1.10.15-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:5e09c19df304b8123938dc3c53d3d3be6ec74b9d7d0d80f4f4b5432ae16c2022"},
+    {file = "pydantic-1.10.15-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:7ac9237cd62947db00a0d16acf2f3e00d1ae9d3bd602b9c415f93e7a9fc10528"},
+    {file = "pydantic-1.10.15-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:584f2d4c98ffec420e02305cf675857bae03c9d617fcfdc34946b1160213a948"},
+    {file = "pydantic-1.10.15-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:bbc6989fad0c030bd70a0b6f626f98a862224bc2b1e36bfc531ea2facc0a340c"},
+    {file = "pydantic-1.10.15-cp38-cp38-musllinux_1_1_i686.whl", hash = "sha256:d573082c6ef99336f2cb5b667b781d2f776d4af311574fb53d908517ba523c22"},
+    {file = "pydantic-1.10.15-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:6bd7030c9abc80134087d8b6e7aa957e43d35714daa116aced57269a445b8f7b"},
+    {file = "pydantic-1.10.15-cp38-cp38-win_amd64.whl", hash = "sha256:3350f527bb04138f8aff932dc828f154847fbdc7a1a44c240fbfff1b57f49a12"},
+    {file = "pydantic-1.10.15-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:51d405b42f1b86703555797270e4970a9f9bd7953f3990142e69d1037f9d9e51"},
+    {file = "pydantic-1.10.15-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:a980a77c52723b0dc56640ced396b73a024d4b74f02bcb2d21dbbac1debbe9d0"},
+    {file = "pydantic-1.10.15-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:67f1a1fb467d3f49e1708a3f632b11c69fccb4e748a325d5a491ddc7b5d22383"},
+    {file = "pydantic-1.10.15-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:676ed48f2c5bbad835f1a8ed8a6d44c1cd5a21121116d2ac40bd1cd3619746ed"},
+    {file = "pydantic-1.10.15-cp39-cp39-musllinux_1_1_i686.whl", hash = "sha256:92229f73400b80c13afcd050687f4d7e88de9234d74b27e6728aa689abcf58cc"},
+    {file = "pydantic-1.10.15-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:2746189100c646682eff0bce95efa7d2e203420d8e1c613dc0c6b4c1d9c1fde4"},
+    {file = "pydantic-1.10.15-cp39-cp39-win_amd64.whl", hash = "sha256:394f08750bd8eaad714718812e7fab615f873b3cdd0b9d84e76e51ef3b50b6b7"},
+    {file = "pydantic-1.10.15-py3-none-any.whl", hash = "sha256:28e552a060ba2740d0d2aabe35162652c1459a0b9069fe0db7f4ee0e18e74d58"},
+    {file = "pydantic-1.10.15.tar.gz", hash = "sha256:ca832e124eda231a60a041da4f013e3ff24949d94a01154b137fc2f2a43c3ffb"},
 ]
 
 [package.dependencies]
@@ -837,13 +851,13 @@ yaml = ["pyyaml (>=6.0.1)"]
 
 [[package]]
 name = "requests-mock"
-version = "1.12.0"
+version = "1.12.1"
 description = "Mock out responses from the requests package"
 optional = false
-python-versions = "*"
+python-versions = ">=3.5"
 files = [
-    {file = "requests-mock-1.12.0.tar.gz", hash = "sha256:4e34f2a2752f0b78397fb414526605d95fcdeab021ac1f26d18960e7eb41f6a8"},
-    {file = "requests_mock-1.12.0-py2.py3-none-any.whl", hash = "sha256:4f6fdf956de568e0bac99eee4ad96b391c602e614cc0ad33e7f5c72edd699e70"},
+    {file = "requests-mock-1.12.1.tar.gz", hash = "sha256:e9e12e333b525156e82a3c852f22016b9158220d2f47454de9cae8a77d371401"},
+    {file = "requests_mock-1.12.1-py2.py3-none-any.whl", hash = "sha256:b1e37054004cdd5e56c84454cc7df12b25f90f382159087f4b6915aaeef39563"},
 ]
 
 [package.dependencies]
@@ -854,18 +868,18 @@ fixture = ["fixtures"]
 
 [[package]]
 name = "setuptools"
-version = "69.2.0"
+version = "69.5.1"
 description = "Easily download, build, install, upgrade, and uninstall Python packages"
 optional = false
 python-versions = ">=3.8"
 files = [
-    {file = "setuptools-69.2.0-py3-none-any.whl", hash = "sha256:c21c49fb1042386df081cb5d86759792ab89efca84cf114889191cd09aacc80c"},
-    {file = "setuptools-69.2.0.tar.gz", hash = "sha256:0ff4183f8f42cd8fa3acea16c45205521a4ef28f73c6391d8a25e92893134f2e"},
+    {file = "setuptools-69.5.1-py3-none-any.whl", hash = "sha256:c636ac361bc47580504644275c9ad802c50415c7522212252c033bd15f301f32"},
+    {file = "setuptools-69.5.1.tar.gz", hash = "sha256:6c1fccdac05a97e598fb0ae3bbed5904ccb317337a51139dcd51453611bbb987"},
 ]
 
 [package.extras]
-docs = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "pygments-github-lexers (==0.0.5)", "rst.linker (>=1.9)", "sphinx (<7.2.5)", "sphinx (>=3.5)", "sphinx-favicon", "sphinx-inline-tabs", "sphinx-lint", "sphinx-notfound-page (>=1,<2)", "sphinx-reredirects", "sphinxcontrib-towncrier"]
-testing = ["build[virtualenv]", "filelock (>=3.4.0)", "importlib-metadata", "ini2toml[lite] (>=0.9)", "jaraco.develop (>=7.21)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "mypy (==1.9)", "packaging (>=23.2)", "pip (>=19.1)", "pytest (>=6)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=2.2)", "pytest-home (>=0.5)", "pytest-mypy (>=0.9.1)", "pytest-perf", "pytest-ruff (>=0.2.1)", "pytest-timeout", "pytest-xdist (>=3)", "tomli", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel"]
+docs = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "pygments-github-lexers (==0.0.5)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-favicon", "sphinx-inline-tabs", "sphinx-lint", "sphinx-notfound-page (>=1,<2)", "sphinx-reredirects", "sphinxcontrib-towncrier"]
+testing = ["build[virtualenv]", "filelock (>=3.4.0)", "importlib-metadata", "ini2toml[lite] (>=0.9)", "jaraco.develop (>=7.21)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "mypy (==1.9)", "packaging (>=23.2)", "pip (>=19.1)", "pytest (>=6,!=8.1.1)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=2.2)", "pytest-home (>=0.5)", "pytest-mypy", "pytest-perf", "pytest-ruff (>=0.2.1)", "pytest-timeout", "pytest-xdist (>=3)", "tomli", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel"]
 testing-integration = ["build[virtualenv] (>=1.0.3)", "filelock (>=3.4.0)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "packaging (>=23.2)", "pytest", "pytest-enabler", "pytest-xdist", "tomli", "virtualenv (>=13.0.0)", "wheel"]
 
 [[package]]
@@ -892,13 +906,13 @@ files = [
 
 [[package]]
 name = "typing-extensions"
-version = "4.10.0"
+version = "4.11.0"
 description = "Backported and Experimental Type Hints for Python 3.8+"
 optional = false
 python-versions = ">=3.8"
 files = [
-    {file = "typing_extensions-4.10.0-py3-none-any.whl", hash = "sha256:69b1a937c3a517342112fb4c6df7e72fc39a38e7891a5730ed4985b5214b5475"},
-    {file = "typing_extensions-4.10.0.tar.gz", hash = "sha256:b0abd7c89e8fb96f98db18d86106ff1d90ab692004eb746cf6eda2682f91b3cb"},
+    {file = "typing_extensions-4.11.0-py3-none-any.whl", hash = "sha256:c1f94d72897edaf4ce775bb7558d5b79d8126906a14ea5ed1635921406c0387a"},
+    {file = "typing_extensions-4.11.0.tar.gz", hash = "sha256:83f085bd5ca59c80295fc2a82ab5dac679cbe02b9f33f7d83af68e241bea51b0"},
 ]
 
 [[package]]
@@ -1028,4 +1042,4 @@ files = [
 [metadata]
 lock-version = "2.0"
 python-versions = "^3.9,<3.12"
-content-hash = "725b28d6832d760577625b1c889cb58847a9971894c7d9276708078685c56b38"
+content-hash = "59138844bec5f4f46b8a260d963d206e9881f8580ecdbeb4329d266ec0071a75"
diff --git a/airbyte-integrations/connectors/source-slack/pyproject.toml b/airbyte-integrations/connectors/source-slack/pyproject.toml
index a4ba01d34e609..aca63d06159f2 100644
--- a/airbyte-integrations/connectors/source-slack/pyproject.toml
+++ b/airbyte-integrations/connectors/source-slack/pyproject.toml
@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
 build-backend = "poetry.core.masonry.api"
 
 [tool.poetry]
-version = "0.4.1"
+version = "1.0.0"
 name = "source-slack"
 description = "Source implementation for Slack."
 authors = [ "Airbyte <contact@airbyte.io>",]
@@ -19,6 +19,7 @@ include = "source_slack"
 python = "^3.9,<3.12"
 pendulum = "==2.1.2"
 airbyte-cdk = "^0"
+freezegun = "^1.4.0"
 
 [tool.poetry.scripts]
 source-slack = "source_slack.run:run"
diff --git a/airbyte-integrations/connectors/source-slack/source_slack/components/__init__.py b/airbyte-integrations/connectors/source-slack/source_slack/components/__init__.py
new file mode 100644
index 0000000000000..e69de29bb2d1d
diff --git a/airbyte-integrations/connectors/source-slack/source_slack/components/channel_members_extractor.py b/airbyte-integrations/connectors/source-slack/source_slack/components/channel_members_extractor.py
new file mode 100644
index 0000000000000..9dbb401a07e9d
--- /dev/null
+++ b/airbyte-integrations/connectors/source-slack/source_slack/components/channel_members_extractor.py
@@ -0,0 +1,21 @@
+# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+
+from dataclasses import dataclass
+from typing import List
+
+import requests
+from airbyte_cdk.sources.declarative.extractors import DpathExtractor
+from airbyte_cdk.sources.declarative.types import Record
+
+
+@dataclass
+class ChannelMembersExtractor(DpathExtractor):
+    """
+    Transform response from a list of strings to a list of dicts:
+    from: ['aa', 'bb']
+    to: [{'member_id': 'aa'}, {'member_id': 'bb'}]
+    """
+
+    def extract_records(self, response: requests.Response) -> List[Record]:
+        records = super().extract_records(response)
+        return [{"member_id": record} for record in records]
diff --git a/airbyte-integrations/connectors/source-slack/source_slack/components/join_channels.py b/airbyte-integrations/connectors/source-slack/source_slack/components/join_channels.py
new file mode 100644
index 0000000000000..e7f33851784c2
--- /dev/null
+++ b/airbyte-integrations/connectors/source-slack/source_slack/components/join_channels.py
@@ -0,0 +1,123 @@
+# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+
+import logging
+from functools import partial
+from typing import Any, Iterable, List, Mapping, Optional
+
+import requests
+from airbyte_cdk.models import SyncMode
+from airbyte_cdk.sources.declarative.partition_routers import SinglePartitionRouter
+from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever
+from airbyte_cdk.sources.declarative.types import Record, StreamSlice
+from airbyte_cdk.sources.streams.core import StreamData
+from airbyte_cdk.sources.streams.http import HttpStream
+from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator
+
+LOGGER = logging.getLogger("airbyte_logger")
+
+
+class JoinChannelsStream(HttpStream):
+    """
+    This class is a special stream which joins channels because the Slack API only returns messages from channels this bot is in.
+    Its responses should only be logged for debugging reasons, not read as records.
+    """
+
+    url_base = "https://slack.com/api/"
+    http_method = "POST"
+    primary_key = "id"
+
+    def __init__(self, channel_filter: List[str] = None, **kwargs):
+        self.channel_filter = channel_filter or []
+        super().__init__(**kwargs)
+
+    def path(self, **kwargs) -> str:
+        return "conversations.join"
+
+    def parse_response(self, response: requests.Response, stream_slice: Mapping[str, Any] = None, **kwargs) -> Iterable:
+        """
+        Override to simply indicate that the specific channel was joined successfully.
+        This method should not return any data, but should return an empty iterable.
+        """
+        is_ok = response.json().get("ok", False)
+        if is_ok:
+            self.logger.info(f"Successfully joined channel: {stream_slice['channel_name']}")
+        else:
+            self.logger.info(f"Unable to join channel: {stream_slice['channel_name']}. Reason: {response.json()}")
+        return []
+
+    def request_body_json(self, stream_slice: Mapping = None, **kwargs) -> Optional[Mapping]:
+        if stream_slice:
+            return {"channel": stream_slice.get("channel")}
+
+    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
+        """
+        The pagination is not applicable to this Service Stream.
+        """
+        return None
+
+
+class ChannelsRetriever(SimpleRetriever):
+    def __post_init__(self, parameters: Mapping[str, Any]):
+        super().__post_init__(parameters)
+        self.stream_slicer = SinglePartitionRouter(parameters={})
+        self.record_selector.transformations = []
+
+    def should_join_to_channel(self, config: Mapping[str, Any], record: Record) -> bool:
+        """
+        The `is_member` property indicates whether the API Bot is already assigned / joined to the channel.
+        https://api.slack.com/types/conversation#booleans
+        """
+        return config["join_channels"] and not record.get("is_member")
+
+    def make_join_channel_slice(self, channel: Mapping[str, Any]) -> Mapping[str, Any]:
+        channel_id: str = channel.get("id")
+        channel_name: str = channel.get("name")
+        LOGGER.info(f"Joining Slack Channel: `{channel_name}`")
+        return {"channel": channel_id, "channel_name": channel_name}
+
+    def join_channels_stream(self, config) -> JoinChannelsStream:
+        token = config["credentials"].get("api_token") or config["credentials"].get("access_token")
+        authenticator = TokenAuthenticator(token)
+        channel_filter = config["channel_filter"]
+        return JoinChannelsStream(authenticator=authenticator, channel_filter=channel_filter)
+
+    def join_channel(self, config: Mapping[str, Any], record: Mapping[str, Any]):
+        list(
+            self.join_channels_stream(config).read_records(
+                sync_mode=SyncMode.full_refresh,
+                stream_slice=self.make_join_channel_slice(record),
+            )
+        )
+
+    def read_records(
+        self,
+        records_schema: Mapping[str, Any],
+        stream_slice: Optional[StreamSlice] = None,
+    ) -> Iterable[StreamData]:
+        _slice = stream_slice or StreamSlice(partition={}, cursor_slice={})  # None-check
+
+        self._paginator.reset()
+
+        most_recent_record_from_slice = None
+        record_generator = partial(
+            self._parse_records,
+            stream_state=self.state or {},
+            stream_slice=_slice,
+            records_schema=records_schema,
+        )
+
+        for stream_data in self._read_pages(record_generator, self.state, _slice):
+            # joining channel logic
+            if self.should_join_to_channel(self.config, stream_data):
+                self.join_channel(self.config, stream_data)
+
+            current_record = self._extract_record(stream_data, _slice)
+            if self.cursor and current_record:
+                self.cursor.observe(_slice, current_record)
+
+            most_recent_record_from_slice = self._get_most_recent_record(most_recent_record_from_slice, current_record, _slice)
+            yield stream_data
+
+        if self.cursor:
+            self.cursor.observe(_slice, most_recent_record_from_slice)
+        return
diff --git a/airbyte-integrations/connectors/source-slack/source_slack/config_migrations.py b/airbyte-integrations/connectors/source-slack/source_slack/config_migrations.py
new file mode 100644
index 0000000000000..cc6d9cd036070
--- /dev/null
+++ b/airbyte-integrations/connectors/source-slack/source_slack/config_migrations.py
@@ -0,0 +1,73 @@
+# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+
+import logging
+from typing import Any, List, Mapping
+
+from airbyte_cdk import AirbyteEntrypoint
+from airbyte_cdk.config_observation import create_connector_config_control_message
+from airbyte_cdk.sources.message import InMemoryMessageRepository, MessageRepository
+from source_slack import SourceSlack
+
+logger = logging.getLogger("airbyte_logger")
+
+
+class MigrateLegacyConfig:
+    message_repository: MessageRepository = InMemoryMessageRepository()
+
+    @classmethod
+    def _should_migrate(cls, config: Mapping[str, Any]) -> bool:
+        """
+        legacy config:
+        {
+            "start_date": "2021-07-22T20:00:00Z",
+            "end_date": "2021-07-23T20:00:00Z",
+            "lookback_window": 1,
+            "join_channels": True,
+            "channel_filter": ["airbyte-for-beginners", "good-reads"],
+            "api_token": "api-token"
+        }
+        The API token should be in the credentials object.
+        """
+        if config.get("api_token") and not config.get("credentials"):
+            return True
+        return False
+
+    @classmethod
+    def _move_token_to_credentials(cls, config: Mapping[str, Any]) -> Mapping[str, Any]:
+        api_token = config["api_token"]
+        config.update({"credentials": {"api_token": api_token, "option_title": "API Token Credentials"}})
+        config.pop("api_token")
+        return config
+
+    @classmethod
+    def _modify_and_save(cls, config_path: str, source: SourceSlack, config: Mapping[str, Any]) -> Mapping[str, Any]:
+        migrated_config = cls._move_token_to_credentials(config)
+        # save the config
+        source.write_config(migrated_config, config_path)
+        return migrated_config
+
+    @classmethod
+    def _emit_control_message(cls, migrated_config: Mapping[str, Any]) -> None:
+        # add the Airbyte Control Message to message repo
+        cls.message_repository.emit_message(create_connector_config_control_message(migrated_config))
+        # emit the Airbyte Control Message from message queue to stdout
+        for message in cls.message_repository._message_queue:
+            print(message.json(exclude_unset=True))
+
+    @classmethod
+    def migrate(cls, args: List[str], source: SourceSlack) -> None:
+        """
+        This method checks the input args and, if the config should be migrated,
+        transforms it as necessary and emits the CONTROL message.
+        """
+        # get config path
+        config_path = AirbyteEntrypoint(source).extract_config(args)
+        # proceed only if `--config` arg is provided
+        if config_path:
+            # read the existing config
+            config = source.read_config(config_path)
+            # migration check
+            if cls._should_migrate(config):
+                cls._emit_control_message(
+                    cls._modify_and_save(config_path, source, config),
+                )
diff --git a/airbyte-integrations/connectors/source-slack/source_slack/manifest.yaml b/airbyte-integrations/connectors/source-slack/source_slack/manifest.yaml
new file mode 100644
index 0000000000000..5a00f9a41ea1f
--- /dev/null
+++ b/airbyte-integrations/connectors/source-slack/source_slack/manifest.yaml
@@ -0,0 +1,260 @@
+version: 0.73.0
+type: DeclarativeSource
+
+definitions:
+  schema_loader:
+    type: JsonFileSchemaLoader
+    file_path: "./source_slack/schemas/{{ parameters['name'] }}.json"
+
+  default_paginator:
+    type: DefaultPaginator
+    page_token_option:
+      type: RequestOption
+      inject_into: request_parameter
+      field_name: cursor
+    page_size_option:
+      type: RequestOption
+      field_name: limit
+      inject_into: request_parameter
+    pagination_strategy:
+      type: CursorPagination
+      page_size: 1000
+      cursor_value: '{{ response.get("response_metadata", {}).get("next_cursor", {}) }}'
+      stop_condition: >-
+        {{ not response.get("response_metadata", {}).get("next_cursor", {})
+        }}
+
+  api_token_auth:
+    type: BearerAuthenticator
+    api_token: "{{ config['credentials']['api_token'] }}"
+  access_token_auth:
+    type: BearerAuthenticator
+    api_token: "{{ config['credentials']['access_token'] }}"
+
+  requester:
+    type: HttpRequester
+    url_base: https://slack.com/api/
+    path: "{{ parameters['path'] }}"
+    http_method: GET
+    request_parameters: {}
+    request_headers: {}
+    authenticator:
+      type: SelectiveAuthenticator
+      authenticator_selection_path: ["credentials", "option_title"]
+      authenticators:
+        Default OAuth2.0 authorization: "#/definitions/access_token_auth"
+        API Token Credentials: "#/definitions/api_token_auth"
+    request_body_json: {}
+    error_handler:
+      type: DefaultErrorHandler
+      response_filters:
+        - error_message_contains: "invalid_auth"
+          action: FAIL
+          error_message: Authentication has failed, please update your credentials.
+        - http_codes: [429]
+          action: RETRY
+          error_message: Failed to perform a request due to rate limits.
+        - http_codes: [500, 503]
+          action: RETRY
+          error_message: Failed to perform a request due to internal server error.
+
+  selector:
+    type: RecordSelector
+    extractor:
+      type: DpathExtractor
+      field_path:
+        - "{{ parameters['field_path'] }}"
+
+  retriever:
+    type: SimpleRetriever
+    requester:
+      $ref: "#/definitions/requester"
+    record_selector:
+      $ref: "#/definitions/selector"
+    paginator:
+      $ref: "#/definitions/default_paginator"
+    partition_router: []
+
+  stream_base:
+    primary_key: "id"
+    retriever:
+      $ref: "#/definitions/retriever"
+    schema_loader:
+      $ref: "#/definitions/schema_loader"
+
+  users_stream:
+    primary_key: "id"
+    retriever:
+      type: SimpleRetriever
+      requester:
+        $ref: "#/definitions/requester"
+        error_handler:
+          type: DefaultErrorHandler
+          response_filters:
+            - error_message_contains: "invalid_auth"
+              action: FAIL
+              error_message: Authentication has failed, please update your credentials.
+            - http_codes: [429]
+              action: RETRY
+              error_message: Failed to perform a request due to rate limits.
+            - http_codes: [403, 400]
+              action: FAIL
+              error_message: Got an exception while trying to set up the connection. Most probably, there are no users in the given Slack instance or your token is incorrect.
+            - http_codes: [500, 503]
+              action: RETRY
+              error_message: Failed to perform a request due to internal server error.
+      record_selector:
+        $ref: "#/definitions/selector"
+      paginator:
+        $ref: "#/definitions/default_paginator"
+      partition_router: []
+    schema_loader:
+      $ref: "#/definitions/schema_loader"
+    $parameters:
+      name: users
+      path: users.list
+      field_path: members
+
+  channels_stream:
+    primary_key: "id"
+    $parameters:
+      name: channels
+      path: conversations.list
+      field_path: channels
+    schema_loader:
+      $ref: "#/definitions/schema_loader"
+    retriever:
+      class_name: "source_slack.components.join_channels.ChannelsRetriever"
+      requester:
+        $ref: "#/definitions/requester"
+        request_parameters:
+          types: "public_channel"
+      record_selector:
+        $ref: "#/definitions/selector"
+        record_filter:
+          type: RecordFilter
+          condition: "{{ record.name in config.channel_filter or not config.channel_filter }}"
+        $parameters:
+          transformations: [[]]
+      paginator:
+        $ref: "#/definitions/default_paginator"
+        $parameters:
+          url_base: https://slack.com/api/
+      partition_router: []
+
+  channels_partition_router:
+    type: SubstreamPartitionRouter
+    parent_stream_configs:
+      - type: ParentStreamConfig
+        parent_key: id
+        request_option:
+          type: RequestOption
+          field_name: channel
+          inject_into: request_parameter
+        partition_field: channel_id
+        stream: "#/definitions/channels_stream"
+
+  channel_members_stream:
+    $ref: "#/definitions/stream_base"
+    $parameters:
+      name: channel_members
+      path: conversations.members
+      field_path: members
+    primary_key:
+      - member_id
+      - channel_id
+    retriever:
+      $ref: "#/definitions/retriever"
+      partition_router:
+        $ref: "#/definitions/channels_partition_router"
+      record_selector:
+        type: RecordSelector
+        extractor:
+          class_name: "source_slack.components.channel_members_extractor.ChannelMembersExtractor"
+          field_path: ["members"]
+    transformations:
+      - type: AddFields
+        fields:
+          - path:
+              - channel_id
+            value: "{{ stream_partition.get('channel_id') }}"
+
+  channel_messages_stream:
+    $ref: "#/definitions/stream_base"
+    $parameters:
+      name: channel_messages
+      path: conversations.history
+      field_path: messages
+    primary_key:
+      - channel_id
+      - ts
+    retriever:
+      $ref: "#/definitions/retriever"
+      requester:
+        $ref: "#/definitions/requester"
+        request_parameters:
+          inclusive: "True"
+      record_selector:
+        $ref: "#/definitions/selector"
+      paginator:
+        $ref: "#/definitions/default_paginator"
+      partition_router:
+        type: SubstreamPartitionRouter
+        parent_stream_configs:
+          - type: ParentStreamConfig
+            stream:
+              $ref: "#/definitions/channels_stream"
+              $parameters:
+                name: channels
+                path: conversations.list
+                field_path: channels
+            parent_key: id
+            partition_field: channel
+            request_option:
+              field_name: "channel"
+              inject_into: "request_parameter"
+    incremental_sync:
+      type: DatetimeBasedCursor
+      cursor_field: float_ts
+      cursor_datetime_formats:
+        - "%s"
+      step: P100D
+      cursor_granularity: P10D
+      lookback_window: "P{{ config.get('lookback_window', 0) }}D"
+      datetime_format: "%s"
+      start_datetime:
+        type: MinMaxDatetime
+        datetime: "{{ config['start_date'] }}"
+        datetime_format: "%Y-%m-%dT%H:%M:%SZ"
+      start_time_option:
+        inject_into: request_parameter
+        field_name: oldest
+        type: RequestOption
+      end_time_option:
+        inject_into: request_parameter
+        field_name: latest
+        type: RequestOption
+      end_datetime:
+        type: MinMaxDatetime
+        datetime: "{{ now_utc().strftime('%Y-%m-%dT%H:%M:%SZ') }}"
+        datetime_format: "%Y-%m-%dT%H:%M:%SZ"
+    transformations:
+      - type: AddFields
+        fields:
+          - path:
+              - float_ts
+            value: "{{ record.ts|float }}"
+          - path:
+              - channel_id
+            value: "{{ stream_partition.get('channel') }}"
+
+streams:
+  - "#/definitions/users_stream"
+  - "#/definitions/channels_stream"
+  - "#/definitions/channel_members_stream"
+  - "#/definitions/channel_messages_stream"
+
+check:
+  type: CheckStream
+  stream_names:
+    - users
diff --git a/airbyte-integrations/connectors/source-slack/source_slack/run.py b/airbyte-integrations/connectors/source-slack/source_slack/run.py
index 14caa9ab08e1e..fd5e385857b95 100644
--- a/airbyte-integrations/connectors/source-slack/source_slack/run.py
+++ b/airbyte-integrations/connectors/source-slack/source_slack/run.py
@@ -7,8 +7,10 @@
 
 from airbyte_cdk.entrypoint import launch
 from source_slack import SourceSlack
+from source_slack.config_migrations import MigrateLegacyConfig
 
 
 def run():
     source = SourceSlack()
+    MigrateLegacyConfig.migrate(sys.argv[1:], source)
     launch(source, sys.argv[1:])
diff --git a/airbyte-integrations/connectors/source-slack/source_slack/source.py b/airbyte-integrations/connectors/source-slack/source_slack/source.py
index e785114f865f4..3925e4bd44a67 100644
--- a/airbyte-integrations/connectors/source-slack/source_slack/source.py
+++ b/airbyte-integrations/connectors/source-slack/source_slack/source.py
@@ -2,354 +2,21 @@
 # Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 #
 
-
-from abc import ABC, abstractmethod
-from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple
+from typing import Any, List, Mapping
 
 import pendulum
-import requests
-from airbyte_cdk import AirbyteLogger
-from airbyte_cdk.models import SyncMode
-from airbyte_cdk.sources import AbstractSource
+from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource
 from airbyte_cdk.sources.streams import Stream
-from airbyte_cdk.sources.streams.http import HttpStream, HttpSubStream
+from airbyte_cdk.sources.streams.http import HttpStream
 from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator
-from pendulum import DateTime
-
-from .utils import chunk_date_range
-
-
-class SlackStream(HttpStream, ABC):
-    url_base = "https://slack.com/api/"
-    primary_key = "id"
-    page_size = 1000
-
-    @property
-    def max_retries(self) -> int:
-        # Slack's rate limiting can be unpredictable so we increase the max number of retries by a lot before failing
-        return 20
-
-    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
-        """Slack uses a cursor-based pagination strategy.
-        Extract the cursor from the response if it exists and return it in a format
-        that can be used to update request parameters"""
-
-        json_response = response.json()
-        next_cursor = json_response.get("response_metadata", {}).get("next_cursor")
-        if next_cursor:
-            return {"cursor": next_cursor}
-
-    def request_params(
-        self,
-        stream_state: Mapping[str, Any],
-        stream_slice: Mapping[str, Any] = None,
-        next_page_token: Mapping[str, Any] = None,
-    ) -> MutableMapping[str, Any]:
-        params = {"limit": self.page_size}
-        if next_page_token:
-            params.update(**next_page_token)
-        return params
-
-    def parse_response(
-        self,
-        response: requests.Response,
-        stream_state: Mapping[str, Any] = None,
-        stream_slice: Mapping[str, Any] = None,
-        next_page_token: Mapping[str, Any] = None,
-    ) -> Iterable[MutableMapping]:
-        json_response = response.json()
-        yield from json_response.get(self.data_field, [])
-
-    def backoff_time(self, response: requests.Response) -> Optional[float]:
-        """This method is called if we run into the rate limit.
-        Slack puts the retry time in the `Retry-After` response header so we
-        we return that value. If the response is anything other than a 429 (e.g: 5XX)
-        fall back on default retry behavior.
-        Rate Limits Docs: https://api.slack.com/docs/rate-limits#web"""
-
-        if "Retry-After" in response.headers:
-            return int(response.headers["Retry-After"])
-        else:
-            self.logger.info("Retry-after header not found. Using default backoff value")
-            return 5
-
-    @property
-    @abstractmethod
-    def data_field(self) -> str:
-        """The name of the field in the response which contains the data"""
-
-    def should_retry(self, response: requests.Response) -> bool:
-        return response.status_code == requests.codes.REQUEST_TIMEOUT or super().should_retry(response)
-
-
-class JoinChannelsStream(HttpStream):
-    """
-    This class is a special stream which joins channels because the Slack API only returns messages from channels this bot is in.
-    Its responses should only be logged for debugging reasons, not read as records.
-    """
-
-    url_base = "https://slack.com/api/"
-    http_method = "POST"
-    primary_key = "id"
-
-    def __init__(self, channel_filter: List[str] = None, **kwargs):
-        self.channel_filter = channel_filter or []
-        super().__init__(**kwargs)
-
-    def parse_response(self, response: requests.Response, stream_slice: Mapping[str, Any] = None, **kwargs) -> Iterable:
-        """
-        Override to simply indicate that the specific channel was joined successfully.
-        This method should not return any data, but should return an empty iterable.
-        """
-        self.logger.info(f"Successfully joined channel: {stream_slice['channel_name']}")
-        return []
-
-    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
-        """
-        The pagination is not applicable to this Service Stream.
-        """
-        return None
-
-    def path(self, **kwargs) -> str:
-        return "conversations.join"
-
-    def request_body_json(self, stream_slice: Mapping = None, **kwargs) -> Optional[Mapping]:
-        return {"channel": stream_slice["channel"]}
-
-
-class ChanneledStream(SlackStream, ABC):
-    """Slack stream with channel filter"""
-
-    def __init__(self, channel_filter: List[str] = [], join_channels: bool = False, **kwargs):
-        self.channel_filter = channel_filter
-        self.join_channels = join_channels
-        self.kwargs = kwargs
-        super().__init__(**kwargs)
-
-    @property
-    def join_channels_stream(self) -> JoinChannelsStream:
-        return JoinChannelsStream(authenticator=self.kwargs.get("authenticator"), channel_filter=self.channel_filter)
-
-    def should_join_to_channel(self, channel: Mapping[str, Any]) -> bool:
-        """
-        The `is_member` property indicates whether or not the API Bot is already assigned / joined to the channel.
-        https://api.slack.com/types/conversation#booleans
-        """
-        return self.join_channels and not channel.get("is_member")
-
-    def make_join_channel_slice(self, channel: Mapping[str, Any]) -> Mapping[str, Any]:
-        channel_id: str = channel.get("id")
-        channel_name: str = channel.get("name")
-        self.logger.info(f"Joining Slack Channel: `{channel_name}`")
-        return {"channel": channel_id, "channel_name": channel_name}
-
-
-class Channels(ChanneledStream):
-    data_field = "channels"
-
-    @property
-    def use_cache(self) -> bool:
-        return True
-
-    def path(self, **kwargs) -> str:
-        return "conversations.list"
-
-    def request_params(self, **kwargs) -> MutableMapping[str, Any]:
-        params = super().request_params(**kwargs)
-        params["types"] = "public_channel"
-        return params
-
-    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[MutableMapping]:
-        json_response = response.json()
-        channels = json_response.get(self.data_field, [])
-        if self.channel_filter:
-            channels = [channel for channel in channels if channel["name"] in self.channel_filter]
-        yield from channels
-
-    def read_records(self, sync_mode: SyncMode, **kwargs) -> Iterable[Mapping[str, Any]]:
-        """
-        Override the default `read_records` method to provide the `JoinChannelsStream` functionality,
-        and be able to read all the channels, not just the ones that already has the API Bot joined.
-        """
-        for channel in super().read_records(sync_mode=sync_mode):
-            # check the channel should be joined before reading
-            if self.should_join_to_channel(channel):
-                # join the channel before reading it
-                yield from self.join_channels_stream.read_records(
-                    sync_mode=sync_mode,
-                    stream_slice=self.make_join_channel_slice(channel),
-                )
-            # reading the channel data
-            self.logger.info(f"Reading the channel: `{channel.get('name')}`")
-            yield channel
-
-
-class ChannelMembers(ChanneledStream):
-    data_field = "members"
-    primary_key = ["member_id", "channel_id"]
-
-    def path(self, **kwargs) -> str:
-        return "conversations.members"
-
-    def request_params(self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, **kwargs) -> MutableMapping[str, Any]:
-        params = super().request_params(stream_state=stream_state, stream_slice=stream_slice, **kwargs)
-        params["channel"] = stream_slice["channel_id"]
-        return params
+from source_slack.streams import Threads
 
-    def parse_response(self, response: requests.Response, stream_slice: Mapping[str, Any] = None, **kwargs) -> Iterable[Mapping]:
-        for member_id in super().parse_response(response, **kwargs):
-            # Slack just returns raw IDs as a string, so we want to put them in a "join table" format
-            yield {"member_id": member_id, "channel_id": stream_slice["channel_id"]}
 
-    def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
-        channels_stream = Channels(authenticator=self._session.auth, channel_filter=self.channel_filter)
-        for channel_record in channels_stream.read_records(sync_mode=SyncMode.full_refresh):
-            yield {"channel_id": channel_record["id"]}
+class SourceSlack(YamlDeclarativeSource):
+    def __init__(self):
+        super().__init__(**{"path_to_yaml": "manifest.yaml"})
 
-
-class Users(SlackStream):
-    data_field = "members"
-
-    def path(self, **kwargs) -> str:
-        return "users.list"
-
-
-# Incremental Streams
-class IncrementalMessageStream(ChanneledStream, ABC):
-    data_field = "messages"
-    cursor_field = "float_ts"
-    primary_key = ["channel_id", "ts"]
-
-    def __init__(self, default_start_date: DateTime, end_date: Optional[DateTime] = None, **kwargs):
-        self._start_ts = default_start_date.timestamp()
-        self._end_ts = end_date and end_date.timestamp()
-        self.set_sub_primary_key()
-        super().__init__(**kwargs)
-
-    def set_sub_primary_key(self):
-        if isinstance(self.primary_key, list):
-            for index, value in enumerate(self.primary_key):
-                setattr(self, f"sub_primary_key_{index + 1}", value)
-        else:
-            self.logger.error("Failed during setting sub primary keys. Primary key should be list.")
-
-    def request_params(self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, **kwargs) -> MutableMapping[str, Any]:
-        params = super().request_params(stream_state=stream_state, stream_slice=stream_slice, **kwargs)
-        params.update(**stream_slice)
-        return params
-
-    def parse_response(self, response: requests.Response, stream_slice: Mapping[str, Any] = None, **kwargs) -> Iterable[Mapping]:
-        for record in super().parse_response(response, **kwargs):
-            record[self.sub_primary_key_1] = stream_slice.get("channel", "")
-            record[self.cursor_field] = float(record[self.sub_primary_key_2])
-            yield record
-
-    def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
-        current_stream_state = current_stream_state or {}
-        current_stream_state[self.cursor_field] = max(
-            latest_record[self.cursor_field], current_stream_state.get(self.cursor_field, self._start_ts)
-        )
-
-        return current_stream_state
-
-    def read_records(
-        self,
-        sync_mode: SyncMode,
-        cursor_field: List[str] = None,
-        stream_slice: Mapping[str, Any] = None,
-        stream_state: Mapping[str, Any] = None,
-    ) -> Iterable[Mapping[str, Any]]:
-        if not stream_slice:
-            # return an empty iterator
-            # this is done to emit at least one state message when no slices are generated
-            return iter([])
-        return super().read_records(sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state)
-
-
-class ChannelMessages(HttpSubStream, IncrementalMessageStream):
-    def path(self, **kwargs) -> str:
-        return "conversations.history"
-
-    @property
-    def use_cache(self) -> bool:
-        return True
-
-    def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
-        stream_state = stream_state or {}
-        start_date = pendulum.from_timestamp(stream_state.get(self.cursor_field, self._start_ts))
-        end_date = self._end_ts and pendulum.from_timestamp(self._end_ts)
-        slice_yielded = False
-        for parent_slice in super().stream_slices(sync_mode=SyncMode.full_refresh):
-            channel = parent_slice["parent"]
-            for period in chunk_date_range(start_date=start_date, end_date=end_date):
-                yield {"channel": channel["id"], "oldest": period.start.timestamp(), "latest": period.end.timestamp()}
-                slice_yielded = True
-        if not slice_yielded:
-            # yield an empty slice to checkpoint state later
-            yield {}
-
-
-class Threads(IncrementalMessageStream):
-    def __init__(self, lookback_window: Mapping[str, int], **kwargs):
-        self.messages_lookback_window = lookback_window
-        super().__init__(**kwargs)
-
-    def path(self, **kwargs) -> str:
-        return "conversations.replies"
-
-    def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
-        """
-        The logic for incrementally syncing threads is not very obvious, so buckle up.
-
-        To get all messages in a thread, one must specify the channel and timestamp of the parent (first) message of that thread,
-        basically its ID.
-
-        One complication is that threads can be updated at Any time in the future. Therefore, if we wanted to comprehensively sync data
-        i.e: get every single response in a thread, we'd have to read every message in the slack instance every time we ran a sync,
-        because otherwise there is no way to guarantee that a thread deep in the past didn't receive a new message.
-
-        A pragmatic workaround is to say we want threads to be at least N days fresh i.e: look back N days into the past,
-        get every message since, and read all of the thread responses. This is essentially the approach we're taking here via slicing:
-        create slices from N days into the past and read all messages in threads since then. We could optionally filter out records we have
-        already read, but that's omitted to keep the logic simple to reason about.
-
-        Good luck.
-        """
-
-        stream_state = stream_state or {}
-        channels_stream = Channels(authenticator=self._session.auth, channel_filter=self.channel_filter)
-
-        if self.cursor_field in stream_state:
-            # Since new messages can be posted to threads continuously after the parent message has been posted,
-            # we get messages from the latest date
-            # found in the state minus X days to pick up any new messages in threads.
-            # If there is state always use lookback
-            messages_start_date = pendulum.from_timestamp(stream_state[self.cursor_field]) - self.messages_lookback_window
-        else:
-            # If there is no state i.e: this is the first sync then there is no use for lookback, just get messages
-            # from the default start date
-            messages_start_date = pendulum.from_timestamp(self._start_ts)
-
-        messages_stream = ChannelMessages(
-            parent=channels_stream,
-            authenticator=self._session.auth,
-            default_start_date=messages_start_date,
-            end_date=self._end_ts and pendulum.from_timestamp(self._end_ts),
-        )
-
-        slice_yielded = False
-        for message_chunk in messages_stream.stream_slices(stream_state={self.cursor_field: messages_start_date.timestamp()}):
-            self.logger.info(f"Syncing replies {message_chunk}")
-            for message in messages_stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=message_chunk):
-                yield {"channel": message_chunk["channel"], self.sub_primary_key_2: message[self.sub_primary_key_2]}
-                slice_yielded = True
-        if not slice_yielded:
-            # yield an empty slice to checkpoint state later
-            yield {}
-
-
-class SourceSlack(AbstractSource):
-    def _get_authenticator(self, config: Mapping[str, Any]):
+    def _threads_authenticator(self, config: Mapping[str, Any]):
         # Added to maintain backward compatibility with previous versions
         if "api_token" in config:
             return TokenAuthenticator(config["api_token"])
@@ -363,48 +30,27 @@ def _get_authenticator(self, config: Mapping[str, Any]):
         else:
             raise Exception(f"No supported option_title: {credentials_title} specified. See spec.json for references")
 
-    def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]:
-        try:
-            authenticator = self._get_authenticator(config)
-            users_stream = Users(authenticator=authenticator)
-            next(users_stream.read_records(SyncMode.full_refresh))
-            return True, None
-        except Exception as e:
-            return (
-                False,
-                f"Got an exception while trying to set up the connection: {e}. "
-                f"Most probably, there are no users in the given Slack instance or your token is incorrect",
-            )
-
-    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
-        authenticator = self._get_authenticator(config)
+    def get_threads_stream(self, config: Mapping[str, Any]) -> HttpStream:
+        authenticator = self._threads_authenticator(config)
         default_start_date = pendulum.parse(config["start_date"])
         # this field is not exposed to spec, used only for testing purposes
         end_date = config.get("end_date")
         end_date = end_date and pendulum.parse(end_date)
         threads_lookback_window = pendulum.Duration(days=config["lookback_window"])
         channel_filter = config.get("channel_filter", [])
-        should_join_to_channels = config.get("join_channels")
+        threads = Threads(
+            authenticator=authenticator,
+            default_start_date=default_start_date,
+            end_date=end_date,
+            lookback_window=threads_lookback_window,
+            channel_filter=channel_filter,
+        )
+        return threads
+
+    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
+        declarative_streams = super().streams(config)
 
-        channels = Channels(authenticator=authenticator, join_channels=should_join_to_channels, channel_filter=channel_filter)
-        streams = [
-            channels,
-            ChannelMembers(authenticator=authenticator, channel_filter=channel_filter),
-            ChannelMessages(
-                parent=channels,
-                authenticator=authenticator,
-                default_start_date=default_start_date,
-                end_date=end_date,
-                channel_filter=channel_filter,
-            ),
-            Threads(
-                authenticator=authenticator,
-                default_start_date=default_start_date,
-                end_date=end_date,
-                lookback_window=threads_lookback_window,
-                channel_filter=channel_filter,
-            ),
-            Users(authenticator=authenticator),
-        ]
+        threads_stream = self.get_threads_stream(config)
+        declarative_streams.append(threads_stream)
 
-        return streams
+        return declarative_streams
diff --git a/airbyte-integrations/connectors/source-slack/source_slack/spec.json b/airbyte-integrations/connectors/source-slack/source_slack/spec.json
index 2ed0ba91abbdf..e59f508e41abe 100644
--- a/airbyte-integrations/connectors/source-slack/source_slack/spec.json
+++ b/airbyte-integrations/connectors/source-slack/source_slack/spec.json
@@ -1,5 +1,4 @@
 {
-  "documentationUrl": "https://docs.airbyte.com/integrations/sources/slack",
   "connectionSpecification": {
     "$schema": "http://json-schema.org/draft-07/schema#",
     "title": "Slack Spec",
@@ -107,6 +106,7 @@
     "predicate_key": ["credentials", "option_title"],
     "predicate_value": "Default OAuth2.0 authorization",
     "oauth_config_specification": {
+      "oauth_user_input_from_connector_config_specification": null,
       "complete_oauth_output_specification": {
         "type": "object",
         "additionalProperties": false,
diff --git a/airbyte-integrations/connectors/source-slack/source_slack/streams.py b/airbyte-integrations/connectors/source-slack/source_slack/streams.py
new file mode 100644
index 0000000000000..b565d9670e25b
--- /dev/null
+++ b/airbyte-integrations/connectors/source-slack/source_slack/streams.py
@@ -0,0 +1,280 @@
+#
+# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+#
+
+
+from abc import ABC, abstractmethod
+from typing import Any, Iterable, List, Mapping, MutableMapping, Optional
+
+import pendulum
+import requests
+from airbyte_cdk.models import SyncMode
+from airbyte_cdk.sources.streams.http import HttpStream, HttpSubStream
+from pendulum import DateTime
+
+from .components.join_channels import JoinChannelsStream
+from .utils import chunk_date_range
+
+
+class SlackStream(HttpStream, ABC):
+    url_base = "https://slack.com/api/"
+    primary_key = "id"
+    page_size = 1000
+
+    @property
+    def max_retries(self) -> int:
+        # Slack's rate limiting can be unpredictable so we increase the max number of retries by a lot before failing
+        return 20
+
+    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
+        """Slack uses a cursor-based pagination strategy.
+        Extract the cursor from the response if it exists and return it in a format
+        that can be used to update request parameters"""
+
+        json_response = response.json()
+        next_cursor = json_response.get("response_metadata", {}).get("next_cursor")
+        if next_cursor:
+            return {"cursor": next_cursor}
+
+    def request_params(
+        self,
+        stream_state: Mapping[str, Any],
+        stream_slice: Mapping[str, Any] = None,
+        next_page_token: Mapping[str, Any] = None,
+    ) -> MutableMapping[str, Any]:
+        params = {"limit": self.page_size}
+        if next_page_token:
+            params.update(**next_page_token)
+        return params
+
+    def parse_response(
+        self,
+        response: requests.Response,
+        stream_state: Mapping[str, Any] = None,
+        stream_slice: Mapping[str, Any] = None,
+        next_page_token: Mapping[str, Any] = None,
+    ) -> Iterable[MutableMapping]:
+        json_response = response.json()
+        yield from json_response.get(self.data_field, [])
+
+    def backoff_time(self, response: requests.Response) -> Optional[float]:
+        """This method is called if we run into the rate limit.
+        Slack puts the retry time in the `Retry-After` response header, so
+        we return that value. If the response is anything other than a 429 (e.g. 5XX),
+        fall back on default retry behavior.
+        Rate Limits Docs: https://api.slack.com/docs/rate-limits#web"""
+
+        if "Retry-After" in response.headers:
+            return int(response.headers["Retry-After"])
+        else:
+            self.logger.info("Retry-after header not found. Using default backoff value")
+            return 5
+
+    @property
+    @abstractmethod
+    def data_field(self) -> str:
+        """The name of the field in the response which contains the data"""
+
+    def should_retry(self, response: requests.Response) -> bool:
+        return response.status_code == requests.codes.REQUEST_TIMEOUT or super().should_retry(response)
+
+
+class ChanneledStream(SlackStream, ABC):
+    """Slack stream with channel filter"""
+
+    def __init__(self, channel_filter: List[str] = [], join_channels: bool = False, **kwargs):
+        self.channel_filter = channel_filter
+        self.join_channels = join_channels
+        self.kwargs = kwargs
+        super().__init__(**kwargs)
+
+    @property
+    def join_channels_stream(self) -> JoinChannelsStream:
+        return JoinChannelsStream(authenticator=self.kwargs.get("authenticator"), channel_filter=self.channel_filter)
+
+    def should_join_to_channel(self, channel: Mapping[str, Any]) -> bool:
+        """
+        The `is_member` property indicates whether or not the API Bot is already assigned / joined to the channel.
+        https://api.slack.com/types/conversation#booleans
+        """
+        return self.join_channels and not channel.get("is_member")
+
+    def make_join_channel_slice(self, channel: Mapping[str, Any]) -> Mapping[str, Any]:
+        channel_id: str = channel.get("id")
+        channel_name: str = channel.get("name")
+        self.logger.info(f"Joining Slack Channel: `{channel_name}`")
+        return {"channel": channel_id, "channel_name": channel_name}
+
+
+class Channels(ChanneledStream):
+    data_field = "channels"
+
+    @property
+    def use_cache(self) -> bool:
+        return True
+
+    def path(self, **kwargs) -> str:
+        return "conversations.list"
+
+    def request_params(self, **kwargs) -> MutableMapping[str, Any]:
+        params = super().request_params(**kwargs)
+        params["types"] = "public_channel"
+        return params
+
+    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[MutableMapping]:
+        json_response = response.json()
+        channels = json_response.get(self.data_field, [])
+        if self.channel_filter:
+            channels = [channel for channel in channels if channel["name"] in self.channel_filter]
+        yield from channels
+
+    def read_records(self, sync_mode: SyncMode, **kwargs) -> Iterable[Mapping[str, Any]]:
+        """
+        Override the default `read_records` method to provide the `JoinChannelsStream` functionality,
+        and be able to read all the channels, not just the ones the API Bot has already joined.
+        """
+        for channel in super().read_records(sync_mode=sync_mode):
+            # check the channel should be joined before reading
+            if self.should_join_to_channel(channel):
+                # join the channel before reading it
+                yield from self.join_channels_stream.read_records(
+                    sync_mode=sync_mode,
+                    stream_slice=self.make_join_channel_slice(channel),
+                )
+            # reading the channel data
+            self.logger.info(f"Reading the channel: `{channel.get('name')}`")
+            yield channel
+
+
+# Incremental Streams
+class IncrementalMessageStream(ChanneledStream, ABC):
+    data_field = "messages"
+    cursor_field = "float_ts"
+    primary_key = ["channel_id", "ts"]
+
+    def __init__(self, default_start_date: DateTime, end_date: Optional[DateTime] = None, **kwargs):
+        self._start_ts = default_start_date.timestamp()
+        self._end_ts = end_date and end_date.timestamp()
+        self.set_sub_primary_key()
+        super().__init__(**kwargs)
+
+    def set_sub_primary_key(self):
+        if isinstance(self.primary_key, list):
+            for index, value in enumerate(self.primary_key):
+                setattr(self, f"sub_primary_key_{index + 1}", value)
+        else:
+            self.logger.error("Failed during setting sub primary keys. Primary key should be list.")
+
+    def request_params(self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, **kwargs) -> MutableMapping[str, Any]:
+        params = super().request_params(stream_state=stream_state, stream_slice=stream_slice, **kwargs)
+        params.update(**stream_slice)
+        return params
+
+    def parse_response(self, response: requests.Response, stream_slice: Mapping[str, Any] = None, **kwargs) -> Iterable[Mapping]:
+        for record in super().parse_response(response, **kwargs):
+            record[self.sub_primary_key_1] = stream_slice.get("channel", "")
+            record[self.cursor_field] = float(record[self.sub_primary_key_2])
+            yield record
+
+    def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
+        current_stream_state = current_stream_state or {}
+        current_stream_state[self.cursor_field] = max(
+            latest_record[self.cursor_field], current_stream_state.get(self.cursor_field, self._start_ts)
+        )
+
+        return current_stream_state
+
+    def read_records(
+        self,
+        sync_mode: SyncMode,
+        cursor_field: List[str] = None,
+        stream_slice: Mapping[str, Any] = None,
+        stream_state: Mapping[str, Any] = None,
+    ) -> Iterable[Mapping[str, Any]]:
+        if not stream_slice:
+            # return an empty iterator
+            # this is done to emit at least one state message when no slices are generated
+            return iter([])
+        return super().read_records(sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state)
+
+
+class ChannelMessages(HttpSubStream, IncrementalMessageStream):
+    def path(self, **kwargs) -> str:
+        return "conversations.history"
+
+    @property
+    def use_cache(self) -> bool:
+        return True
+
+    def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
+        stream_state = stream_state or {}
+        start_date = pendulum.from_timestamp(stream_state.get(self.cursor_field, self._start_ts))
+        end_date = self._end_ts and pendulum.from_timestamp(self._end_ts)
+        slice_yielded = False
+        for parent_slice in super().stream_slices(sync_mode=SyncMode.full_refresh):
+            channel = parent_slice["parent"]
+            for period in chunk_date_range(start_date=start_date, end_date=end_date):
+                yield {"channel": channel["id"], "oldest": period.start.timestamp(), "latest": period.end.timestamp()}
+                slice_yielded = True
+        if not slice_yielded:
+            # yield an empty slice to checkpoint state later
+            yield {}
+
+
+class Threads(IncrementalMessageStream):
+    def __init__(self, lookback_window: Mapping[str, int], **kwargs):
+        self.messages_lookback_window = lookback_window
+        super().__init__(**kwargs)
+
+    def path(self, **kwargs) -> str:
+        return "conversations.replies"
+
+    def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
+        """
+        The logic for incrementally syncing threads is not very obvious, so buckle up.
+
+        To get all messages in a thread, one must specify the channel and timestamp of the parent (first) message of that thread,
+        basically its ID.
+
+        One complication is that threads can be updated at any time in the future. Therefore, if we wanted to comprehensively sync data
+        i.e: get every single response in a thread, we'd have to read every message in the slack instance every time we ran a sync,
+        because otherwise there is no way to guarantee that a thread deep in the past didn't receive a new message.
+
+        A pragmatic workaround is to say we want threads to be at least N days fresh i.e: look back N days into the past,
+        get every message since, and read all of the thread responses. This is essentially the approach we're taking here via slicing:
+        create slices from N days into the past and read all messages in threads since then. We could optionally filter out records we have
+        already read, but that's omitted to keep the logic simple to reason about.
+
+        Good luck.
+        """
+
+        stream_state = stream_state or {}
+        channels_stream = Channels(authenticator=self._session.auth, channel_filter=self.channel_filter)
+
+        if self.cursor_field in stream_state:
+            # Since new messages can be posted to threads continuously after the parent message has been posted,
+            # we get messages from the latest date
+            # found in the state minus X days to pick up any new messages in threads.
+            # If there is state, always use the lookback.
+            messages_start_date = pendulum.from_timestamp(stream_state[self.cursor_field]) - self.messages_lookback_window
+        else:
+            # If there is no state, i.e. this is the first sync, then there is no use for lookback; just get messages
+            # from the default start date
+            messages_start_date = pendulum.from_timestamp(self._start_ts)
+
+        messages_stream = ChannelMessages(
+            parent=channels_stream,
+            authenticator=self._session.auth,
+            default_start_date=messages_start_date,
+            end_date=self._end_ts and pendulum.from_timestamp(self._end_ts),
+        )
+
+        slice_yielded = False
+        for message_chunk in messages_stream.stream_slices(stream_state={self.cursor_field: messages_start_date.timestamp()}):
+            self.logger.info(f"Syncing replies {message_chunk}")
+            for message in messages_stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=message_chunk):
+                yield {"channel": message_chunk["channel"], self.sub_primary_key_2: message[self.sub_primary_key_2]}
+                slice_yielded = True
+        if not slice_yielded:
+            # yield an empty slice to checkpoint state later
+            yield {}
diff --git a/airbyte-integrations/connectors/source-slack/source_slack/utils.py b/airbyte-integrations/connectors/source-slack/source_slack/utils.py
index 7507dbab35657..febfb788442ac 100644
--- a/airbyte-integrations/connectors/source-slack/source_slack/utils.py
+++ b/airbyte-integrations/connectors/source-slack/source_slack/utils.py
@@ -9,7 +9,7 @@
 from pendulum import DateTime, Period
 
 
-def chunk_date_range(start_date: DateTime, interval=pendulum.duration(days=1), end_date: Optional[DateTime] = None) -> Iterable[Period]:
+def chunk_date_range(start_date: DateTime, interval=pendulum.duration(days=100), end_date: Optional[DateTime] = None) -> Iterable[Period]:
     """
     Yields a list of the beginning and ending timestamps of each day between the start date and now.
     The return value is a pendulum.period
diff --git a/airbyte-integrations/connectors/source-slack/unit_tests/configs/actual_config.json b/airbyte-integrations/connectors/source-slack/unit_tests/configs/actual_config.json
new file mode 100644
index 0000000000000..065ffde78394f
--- /dev/null
+++ b/airbyte-integrations/connectors/source-slack/unit_tests/configs/actual_config.json
@@ -0,0 +1,11 @@
+{
+  "start_date": "2021-07-22T20:00:00Z",
+  "end_date": "2021-07-23T20:00:00Z",
+  "lookback_window": 1,
+  "join_channels": true,
+  "channel_filter": ["airbyte-for-beginners", "good-reads"],
+  "credentials": {
+    "api_token": "api-token",
+    "option_title": "API Token Credentials"
+  }
+}
diff --git a/airbyte-integrations/connectors/source-slack/unit_tests/configs/legacy_config.json b/airbyte-integrations/connectors/source-slack/unit_tests/configs/legacy_config.json
new file mode 100644
index 0000000000000..99eda1d750b38
--- /dev/null
+++ b/airbyte-integrations/connectors/source-slack/unit_tests/configs/legacy_config.json
@@ -0,0 +1,8 @@
+{
+  "start_date": "2021-07-22T20:00:00Z",
+  "end_date": "2021-07-23T20:00:00Z",
+  "lookback_window": 1,
+  "join_channels": true,
+  "channel_filter": ["airbyte-for-beginners", "good-reads"],
+  "api_token": "api-token"
+}
diff --git a/airbyte-integrations/connectors/source-slack/unit_tests/conftest.py b/airbyte-integrations/connectors/source-slack/unit_tests/conftest.py
index 6d9254730d5f6..002a9ec96779d 100644
--- a/airbyte-integrations/connectors/source-slack/unit_tests/conftest.py
+++ b/airbyte-integrations/connectors/source-slack/unit_tests/conftest.py
@@ -18,20 +18,11 @@ def conversations_list(requests_mock):
         "https://slack.com/api/conversations.list?limit=1000&types=public_channel",
         json={
             "channels": [
-                {"name": "advice-data-architecture", "id": 1, "is_member": False},
-                {"name": "advice-data-orchestration", "id": 2, "is_member": True},
-                {"name": "airbyte-for-beginners", "id": 3, "is_member": False},
-                {"name": "good-reads", "id": 4, "is_member": True},
-            ]
+                {"id": "airbyte-for-beginners", "is_member": True},
+                {"id": "good-reads", "is_member": True}]
         },
     )
 
-
-@pytest.fixture(autouse=True)
-def join_channels(requests_mock):
-    return requests_mock.register_uri("POST", "https://slack.com/api/conversations.join")
-
-
 def base_config() -> MutableMapping:
     return copy.deepcopy(
         {
@@ -100,7 +91,18 @@ def invalid_config() -> MutableMapping:
     (
         (_token_config(), True),
         (_oauth_config(), True),
-        (_legacy_token_config(), True),
         (_invalid_config(), False),
     ),
 )
+
+
+@pytest.fixture
+def joined_channel():
+    return {"id": "C061EG9SL", "name": "general", "is_channel": True, "is_group": False, "is_im": False,
+            "created": 1449252889,
+            "creator": "U061F7AUR", "is_archived": False, "is_general": True, "unlinked": 0, "name_normalized": "general",
+            "is_shared": False,
+            "is_ext_shared": False, "is_org_shared": False, "pending_shared": [], "is_pending_ext_shared": False,
+            "is_member": True, "is_private": False, "is_mpim": False,
+            "topic": {"value": "Which widget do you worry about?", "creator": "", "last_set": 0},
+            "purpose": {"value": "For widget discussion", "creator": "", "last_set": 0}, "previous_names": []}
diff --git a/airbyte-integrations/connectors/source-slack/unit_tests/test_components.py b/airbyte-integrations/connectors/source-slack/unit_tests/test_components.py
new file mode 100644
index 0000000000000..e40e700fe97b8
--- /dev/null
+++ b/airbyte-integrations/connectors/source-slack/unit_tests/test_components.py
@@ -0,0 +1,99 @@
+# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+
+from unittest.mock import MagicMock
+
+import pendulum
+import pytest
+from airbyte_cdk.sources.declarative.extractors import DpathExtractor, RecordSelector
+from airbyte_cdk.sources.declarative.requesters import HttpRequester
+from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator
+from airbyte_protocol.models import SyncMode
+from source_slack import SourceSlack
+from source_slack.components.channel_members_extractor import ChannelMembersExtractor
+from source_slack.components.join_channels import ChannelsRetriever, JoinChannelsStream
+
+
+def get_stream_by_name(stream_name, config):
+    streams = SourceSlack().streams(config=config)
+    for stream in streams:
+        if stream.name == stream_name:
+            return stream
+    raise ValueError(f"Stream {stream_name} not found")
+
+
+def test_channel_members_extractor(token_config):
+    response_mock = MagicMock()
+    response_mock.json.return_value = {"members": [
+        "U023BECGF",
+        "U061F7AUR",
+        "W012A3CDE"
+    ]}
+    records = ChannelMembersExtractor(config=token_config, parameters={}, field_path=["members"]).extract_records(response=response_mock)
+    assert records == [{"member_id": "U023BECGF"},
+                       {"member_id": "U061F7AUR"},
+                       {"member_id": "W012A3CDE"}]
+
+
+def test_join_channels(token_config, requests_mock, joined_channel):
+    mocked_request = requests_mock.post(
+        url="https://slack.com/api/conversations.join",
+        json={"ok": True, "channel": joined_channel}
+    )
+    token = token_config["credentials"]["api_token"]
+    authenticator = TokenAuthenticator(token)
+    channel_filter = token_config["channel_filter"]
+    stream = JoinChannelsStream(authenticator=authenticator, channel_filter=channel_filter)
+    records = stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice={"channel": "C061EG9SL", "channel_name": "general"})
+    assert not list(records)
+    assert mocked_request.called
+
+
+def get_channels_retriever_instance(token_config):
+    return ChannelsRetriever(
+        config=token_config,
+        requester=HttpRequester(name="channels", path="conversations.list", url_base="https://slack.com/api/", config=token_config,
+                                parameters={}),
+        record_selector=RecordSelector(
+            extractor=DpathExtractor(field_path=["channels"], config=token_config, parameters={}),
+            config=token_config, parameters={},
+            schema_normalization=None),
+        parameters={}
+    )
+
+
+def test_join_channels_should_join_to_channel(token_config):
+    retriever = get_channels_retriever_instance(token_config)
+    assert retriever.should_join_to_channel(token_config, {"is_member": False}) is True
+    assert retriever.should_join_to_channel(token_config, {"is_member": True}) is False
+
+
+def test_join_channels_make_join_channel_slice(token_config):
+    retriever = get_channels_retriever_instance(token_config)
+    expected_slice = {"channel": "C061EG9SL", "channel_name": "general"}
+    assert retriever.make_join_channel_slice({"id": "C061EG9SL", "name": "general"}) == expected_slice
+
+
+@pytest.mark.parametrize(
+    "join_response, log_message",
+    (
+        ({"ok": True, "channel": {"is_member": True, "id": "channel 2", "name": "test channel"}}, "Successfully joined channel: test channel"),
+        ({"ok": False, "error": "missing_scope", "needed": "channels:write"},
+         "Unable to joined channel: test channel. Reason: {'ok': False, 'error': " "'missing_scope', 'needed': 'channels:write'}"),
+    ),
+    ids=["successful_join_to_channel", "failed_join_to_channel"]
+)
+def test_join_channel_read(requests_mock, token_config, joined_channel, caplog, join_response, log_message):
+    mocked_request = requests_mock.post(
+        url="https://slack.com/api/conversations.join",
+        json=join_response
+    )
+    requests_mock.get(
+        url="https://slack.com/api/conversations.list",
+        json={"channels": [{"is_member": True, "id": "channel 1"}, {"is_member": False, "id": "channel 2", "name": "test channel"}]}
+    )
+
+    retriever = get_channels_retriever_instance(token_config)
+    assert len(list(retriever.read_records(records_schema={}))) == 2
+    assert mocked_request.called
+    assert mocked_request.last_request._request.body == b'{"channel": "channel 2"}'
+    assert log_message in caplog.text
diff --git a/airbyte-integrations/connectors/source-slack/unit_tests/test_config_migrations.py b/airbyte-integrations/connectors/source-slack/unit_tests/test_config_migrations.py
new file mode 100644
index 0000000000000..761597a66fc22
--- /dev/null
+++ b/airbyte-integrations/connectors/source-slack/unit_tests/test_config_migrations.py
@@ -0,0 +1,47 @@
+# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+
+import json
+import os
+from typing import Any, Mapping
+
+from source_slack import SourceSlack
+from source_slack.config_migrations import MigrateLegacyConfig
+
+CMD = "check"
+TEST_CONFIG_LEGACY_PATH = f"{os.path.dirname(__file__)}/configs/legacy_config.json"
+TEST_CONFIG_ACTUAL_PATH = f"{os.path.dirname(__file__)}/configs/actual_config.json"
+
+SOURCE_INPUT_ARGS_LEGACY = [CMD, "--config", TEST_CONFIG_LEGACY_PATH]
+SOURCE_INPUT_ARGS_ACTUAL = [CMD, "--config", TEST_CONFIG_ACTUAL_PATH]
+
+
+def revert_config():
+    with open(TEST_CONFIG_LEGACY_PATH, "r") as test_config:
+        config = json.load(test_config)
+        config.pop("credentials")
+        config.update({"api_token": "api-token"})
+        with open(TEST_CONFIG_LEGACY_PATH, "w") as updated_config:
+            config = json.dumps(config)
+            updated_config.write(config)
+
+
+def load_config(config_path: str = TEST_CONFIG_LEGACY_PATH) -> Mapping[str, Any]:
+    with open(config_path, "r") as config:
+        return json.load(config)
+
+
+def test_config_migration():
+    migration = MigrateLegacyConfig()
+    migration.migrate(SOURCE_INPUT_ARGS_LEGACY, SourceSlack())
+    test_migrated_config = load_config()
+    assert test_migrated_config["credentials"]["api_token"] == "api-token"
+    assert test_migrated_config["credentials"]["option_title"] == "API Token Credentials"
+    revert_config()
+
+
+def test_config_not_migrated():
+    config_before_migration = load_config(TEST_CONFIG_ACTUAL_PATH)
+    migration = MigrateLegacyConfig()
+    migration.migrate(SOURCE_INPUT_ARGS_ACTUAL, SourceSlack())
+    test_migrated_config = load_config(TEST_CONFIG_ACTUAL_PATH)
+    assert config_before_migration == test_migrated_config
diff --git a/airbyte-integrations/connectors/source-slack/unit_tests/test_source.py b/airbyte-integrations/connectors/source-slack/unit_tests/test_source.py
index bef3bf26651f9..ae1a589227970 100644
--- a/airbyte-integrations/connectors/source-slack/unit_tests/test_source.py
+++ b/airbyte-integrations/connectors/source-slack/unit_tests/test_source.py
@@ -10,6 +10,13 @@
 from .conftest import parametrized_configs
 
 
+def get_stream_by_name(stream_name, config):
+    streams = SourceSlack().streams(config=config)
+    for stream in streams:
+        if stream.name == stream_name:
+            return stream
+    raise ValueError(f"Stream {stream_name} not found")
+
 @parametrized_configs
 def test_streams(conversations_list, config, is_valid):
     source = SourceSlack()
@@ -19,28 +26,25 @@ def test_streams(conversations_list, config, is_valid):
     else:
         with pytest.raises(Exception) as exc_info:
             _ = source.streams(config)
-        assert "No supported option_title: None specified. See spec.json for references" in repr(exc_info.value)
+        assert "The path from `authenticator_selection_path` is not found in the config." in repr(exc_info.value)
 
 
 @pytest.mark.parametrize(
     "status_code, response, is_connection_successful, error_msg",
     (
         (200, {"members": [{"id": 1, "name": "Abraham"}]}, True, None),
+        (200, {"ok": False, "error": "invalid_auth"}, False, "Authentication has failed, please update your credentials."),
         (
             400,
             "Bad request",
             False,
-            "Got an exception while trying to set up the connection: 400 Client Error: "
-            "None for url: https://slack.com/api/users.list?limit=1000. Most probably, there are no users in the given Slack instance or "
-            "your token is incorrect",
+            "Got an exception while trying to set up the connection. Most probably, there are no users in the given Slack instance or your token is incorrect.",
         ),
         (
             403,
             "Forbidden",
             False,
-            "Got an exception while trying to set up the connection: 403 Client Error: "
-            "None for url: https://slack.com/api/users.list?limit=1000. Most probably, there are no users in the given Slack instance or "
-            "your token is incorrect",
+            "Got an exception while trying to set up the connection. Most probably, there are no users in the given Slack instance or your token is incorrect.",
         ),
     ),
 )
@@ -49,4 +53,20 @@ def test_check_connection(token_config, requests_mock, status_code, response, is
     source = SourceSlack()
     success, error = source.check_connection(logger=logging.getLogger("airbyte"), config=token_config)
     assert success is is_connection_successful
-    assert error == error_msg
+    if not success:
+        assert error_msg in error
+
+
+def test_threads_auth(token_config, oauth_config):
+    source = SourceSlack()
+    auth = source._threads_authenticator(token_config)
+    assert auth.token == "Bearer api-token"
+    source = SourceSlack()
+    auth = source._threads_authenticator(oauth_config)
+    assert auth.token == "Bearer access-token"
+
+
+def test_get_threads_stream(token_config):
+    source = SourceSlack()
+    threads_stream = source.get_threads_stream(token_config)
+    assert threads_stream
diff --git a/airbyte-integrations/connectors/source-slack/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-slack/unit_tests/test_streams.py
index d0327093318f8..9a3cd092d90b4 100644
--- a/airbyte-integrations/connectors/source-slack/unit_tests/test_streams.py
+++ b/airbyte-integrations/connectors/source-slack/unit_tests/test_streams.py
@@ -7,12 +7,21 @@
 import pendulum
 import pytest
 from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator
-from source_slack.source import Channels, Threads, Users
+from source_slack import SourceSlack
+from source_slack.streams import Channels, JoinChannelsStream, Threads
 
 
 @pytest.fixture
-def authenticator(legacy_token_config):
-    return TokenAuthenticator(legacy_token_config["api_token"])
+def authenticator(token_config):
+    return TokenAuthenticator(token_config["credentials"]["api_token"])
+
+
+def get_stream_by_name(stream_name, config):
+    streams = SourceSlack().streams(config=config)
+    for stream in streams:
+        if stream.name == stream_name:
+            return stream
+    raise ValueError(f"Stream {stream_name} not found")
 
 
 @pytest.mark.parametrize(
@@ -25,10 +34,10 @@ def authenticator(legacy_token_config):
             {},
             [
                 # two messages per each channel
-                {"channel": 3, "ts": 1577866844},
-                {"channel": 3, "ts": 1577877406},
-                {"channel": 4, "ts": 1577866844},
-                {"channel": 4, "ts": 1577877406},
+                {'channel': 'airbyte-for-beginners', 'ts': 1577866844},
+                {'channel': 'airbyte-for-beginners', 'ts': 1577877406},
+                {'channel': 'good-reads', 'ts': 1577866844},
+                {'channel': 'good-reads', 'ts': 1577877406},
             ],
         ),
         ("2020-01-02T00:00:00Z", "2020-01-01T00:00:00Z", [], {}, [{}]),
@@ -36,35 +45,36 @@ def authenticator(legacy_token_config):
             "2020-01-01T00:00:00Z",
             "2020-01-02T00:00:00Z",
             [{"ts": 1577866844}, {"ts": 1577877406}],
-            {"float_ts": 1577915266},
+            {"float_ts": 2577866844},
             [
-                # two messages per each channel per datetime slice
-                {"channel": 3, "ts": 1577866844},
-                {"channel": 3, "ts": 1577877406},
-                {"channel": 3, "ts": 1577866844},
-                {"channel": 3, "ts": 1577877406},
-                {"channel": 4, "ts": 1577866844},
-                {"channel": 4, "ts": 1577877406},
-                {"channel": 4, "ts": 1577866844},
-                {"channel": 4, "ts": 1577877406},
+                # no slice when state greater than ts
+                {},
             ],
         ),
     ),
 )
 def test_threads_stream_slices(
-    requests_mock, authenticator, legacy_token_config, start_date, end_date, messages, stream_state, expected_result
+    requests_mock, authenticator, token_config, start_date, end_date, messages, stream_state, expected_result
 ):
+    token_config["channel_filter"] = []
+
+    requests_mock.register_uri(
+        "GET", "https://slack.com/api/conversations.history?limit=1000&channel=airbyte-for-beginners",
+        [{"json": {"messages": messages}}, {"json": {"messages": []}}]
+    )
     requests_mock.register_uri(
-        "GET", "https://slack.com/api/conversations.history", [{"json": {"messages": messages}}, {"json": {"messages": messages}}]
+        "GET", "https://slack.com/api/conversations.history?limit=1000&channel=good-reads",
+        [{"json": {"messages": messages}}, {"json": {"messages": []}}]
     )
+
     start_date = pendulum.parse(start_date)
     end_date = end_date and pendulum.parse(end_date)
+
     stream = Threads(
         authenticator=authenticator,
         default_start_date=start_date,
         end_date=end_date,
-        lookback_window=pendulum.Duration(days=legacy_token_config["lookback_window"]),
-        channel_filter=legacy_token_config["channel_filter"],
+        lookback_window=pendulum.Duration(days=token_config["lookback_window"])
     )
     slices = list(stream.stream_slices(stream_state=stream_state))
     assert slices == expected_result
@@ -79,19 +89,64 @@ def test_threads_stream_slices(
         ({"float_ts": 1577800844}, {"float_ts": 1577866844}, {"float_ts": 1577866844}),
     ),
 )
-def test_get_updated_state(authenticator, legacy_token_config, current_state, latest_record, expected_state):
+def test_get_updated_state(authenticator, token_config, current_state, latest_record, expected_state):
+
     stream = Threads(
         authenticator=authenticator,
-        default_start_date=pendulum.parse(legacy_token_config["start_date"]),
-        lookback_window=legacy_token_config["lookback_window"],
-        channel_filter=legacy_token_config["channel_filter"],
+        default_start_date=pendulum.parse(token_config["start_date"]),
+        lookback_window=token_config["lookback_window"]
     )
     assert stream.get_updated_state(current_stream_state=current_state, latest_record=latest_record) == expected_state
 
 
+def test_threads_request_params(authenticator, token_config):
+    stream = Threads(
+        authenticator=authenticator,
+        default_start_date=pendulum.parse(token_config["start_date"]),
+        lookback_window=token_config["lookback_window"]
+    )
+    threads_slice = {'channel': 'airbyte-for-beginners', 'ts': 1577866844}
+    expected = {'channel': 'airbyte-for-beginners', 'limit': 1000, 'ts': 1577866844}
+    assert stream.request_params(stream_slice=threads_slice, stream_state={}) == expected
+
+
+def test_threads_parse_response(mocker, authenticator, token_config):
+    stream = Threads(
+        authenticator=authenticator,
+        default_start_date=pendulum.parse(token_config["start_date"]),
+        lookback_window=token_config["lookback_window"]
+    )
+    resp = {
+        "messages": [
+            {
+                "type": "message",
+                "user": "U061F7AUR",
+                "text": "island",
+                "thread_ts": "1482960137.003543",
+                "reply_count": 3,
+                "subscribed": True,
+                "last_read": "1484678597.521003",
+                "unread_count": 0,
+                "ts": "1482960137.003543"
+            }
+        ]
+    }
+    resp_mock = mocker.Mock()
+    resp_mock.json.return_value = resp
+    threads_slice = {'channel': 'airbyte-for-beginners', 'ts': 1577866844}
+    actual_response = list(stream.parse_response(response=resp_mock,stream_slice=threads_slice))
+    assert len(actual_response) == 1
+    assert actual_response[0]["float_ts"] == 1482960137.003543
+    assert actual_response[0]["channel_id"] == "airbyte-for-beginners"
+
+
 @pytest.mark.parametrize("headers, expected_result", (({}, 5), ({"Retry-After": 15}, 15)))
-def test_backoff(authenticator, headers, expected_result):
-    stream = Users(authenticator=authenticator)
+def test_backoff(token_config, authenticator, headers, expected_result):
+    stream = Threads(
+        authenticator=authenticator,
+        default_start_date=pendulum.parse(token_config["start_date"]),
+        lookback_window=token_config["lookback_window"]
+    )
     assert stream.backoff_time(Mock(headers=headers)) == expected_result
 
 
@@ -100,11 +155,38 @@ def test_channels_stream_with_autojoin(authenticator) -> None:
     The test uses the `conversations_list` fixture(autouse=true) as API mocker.
     """
     expected = [
-        {'name': 'advice-data-architecture', 'id': 1, 'is_member': False},
-        {'name': 'advice-data-orchestration', 'id': 2, 'is_member': True},
-        {'name': 'airbyte-for-beginners', 'id': 3, 'is_member': False},
-        {'name': 'good-reads', 'id': 4, 'is_member': True},
+        {'id': 'airbyte-for-beginners', 'is_member': True},
+        {'id': 'good-reads', 'is_member': True}
     ]
     stream = Channels(channel_filter=[], join_channels=True, authenticator=authenticator)
     assert list(stream.read_records(None)) == expected
-    
\ No newline at end of file
+
+
+def test_next_page_token(authenticator, token_config):
+    stream = Threads(
+        authenticator=authenticator,
+        default_start_date=pendulum.parse(token_config["start_date"]),
+        lookback_window=token_config["lookback_window"]
+    )
+    mocked_response = Mock()
+    mocked_response.json.return_value = {"response_metadata": {"next_cursor": "next page"}}
+    assert stream.next_page_token(mocked_response) == {"cursor": "next page"}
+
+
+@pytest.mark.parametrize(
+    "status_code, expected",
+    (
+        (200, False),
+        (403, False),
+        (429, True),
+        (500, True),
+    ),
+)
+def test_should_retry(authenticator, token_config, status_code, expected):
+    stream = Threads(
+        authenticator=authenticator,
+        default_start_date=pendulum.parse(token_config["start_date"]),
+        lookback_window=token_config["lookback_window"]
+    )
+    mocked_response = Mock(status_code=status_code)
+    assert stream.should_retry(mocked_response) == expected
diff --git a/docs/integrations/sources/slack-migrations.md b/docs/integrations/sources/slack-migrations.md
new file mode 100644
index 0000000000000..31458bc54c1c9
--- /dev/null
+++ b/docs/integrations/sources/slack-migrations.md
@@ -0,0 +1,18 @@
+# Slack Migration Guide
+
+## Upgrading to 1.0.0
+
+We're continuously striving to enhance the quality and reliability of our connectors at Airbyte. 
+As part of our commitment to delivering exceptional service, we are transitioning source Slack from the
+Python Connector Development Kit (CDK) to our innovative low-code framework. 
+This is part of a strategic move to streamline many processes across connectors, bolstering maintainability and 
+freeing us to focus more of our efforts on improving the performance and features of our evolving platform and growing catalog.
+However, due to differences between the Python and low-code CDKs, this migration constitutes a breaking change. 
+
+We’ve evolved and standardized how state is managed for incremental streams that are nested within a parent stream. 
+This change impacts how individual states are tracked and stored for each partition, using a more structured approach
+to ensure the most granular and flexible state management.
+This change will affect the `Channel Messages` stream.
+
+## Migration Steps
+* A `reset` for `Channel Messages` stream is required after upgrading to this version.
diff --git a/docs/integrations/sources/slack.md b/docs/integrations/sources/slack.md
index 9df24431ec9c8..a8db6c5c6ed23 100644
--- a/docs/integrations/sources/slack.md
+++ b/docs/integrations/sources/slack.md
@@ -161,40 +161,41 @@ Slack has [rate limit restrictions](https://api.slack.com/docs/rate-limits).
 
 ## Changelog
 
-| Version | Date       | Pull Request                                             | Subject                                                                             |
-|:--------|:-----------|:---------------------------------------------------------|:------------------------------------------------------------------------------------|
+| Version | Date       | Pull Request                                             | Subject                                                                              |
+|:--------|:-----------|:---------------------------------------------------------|:-------------------------------------------------------------------------------------|
+| 1.0.0   | 2024-04-02 | [35477](https://github.com/airbytehq/airbyte/pull/35477) | Migration to low-code CDK                                                            |
 | 0.4.1   | 2024-03-27 | [36579](https://github.com/airbytehq/airbyte/pull/36579) | Upgrade airbyte-cdk version to emit record counts as floats                          |
-| 0.4.0   | 2024-03-19 | [36267](https://github.com/airbytehq/airbyte/pull/36267) | Pin airbyte-cdk version to `^0` |
-| 0.3.9   | 2024-02-12 | [35157](https://github.com/airbytehq/airbyte/pull/35157) | Manage dependencies with Poetry. |
+| 0.4.0   | 2024-03-19 | [36267](https://github.com/airbytehq/airbyte/pull/36267) | Pin airbyte-cdk version to `^0`                                                      |
+| 0.3.9   | 2024-02-12 | [35157](https://github.com/airbytehq/airbyte/pull/35157) | Manage dependencies with Poetry.                                                     |
 | 0.3.8   | 2024-02-09 | [35131](https://github.com/airbytehq/airbyte/pull/35131) | Fixed the issue when `schema discovery` fails with `502` due to the platform timeout |
-| 0.3.7   | 2024-01-10 | [1234](https://github.com/airbytehq/airbyte/pull/1234) | prepare for airbyte-lib                                                        |
-| 0.3.6   | 2023-11-21 | [32707](https://github.com/airbytehq/airbyte/pull/32707) | Threads: do not use client-side record filtering                                    |
-| 0.3.5   | 2023-10-19 | [31599](https://github.com/airbytehq/airbyte/pull/31599) | Base image migration: remove Dockerfile and use the python-connector-base image     |
-| 0.3.4   | 2023-10-06 | [31134](https://github.com/airbytehq/airbyte/pull/31134) | Update CDK and remove non iterable return from records                              |
-| 0.3.3   | 2023-09-28 | [30580](https://github.com/airbytehq/airbyte/pull/30580) | Add `bot_id` field to threads schema                                                |
-| 0.3.2   | 2023-09-20 | [30613](https://github.com/airbytehq/airbyte/pull/30613) | Set default value for channel_filters during discover                               |
-| 0.3.1   | 2023-09-19 | [30570](https://github.com/airbytehq/airbyte/pull/30570) | Use default availability strategy                                                   |
-| 0.3.0   | 2023-09-18 | [30521](https://github.com/airbytehq/airbyte/pull/30521) | Add unexpected fields to streams `channel_messages`, `channels`, `threads`, `users` |
-| 0.2.0   | 2023-05-24 | [26497](https://github.com/airbytehq/airbyte/pull/26497) | Fixed `lookback window` value limitations                                           |
-| 0.1.26  | 2023-05-17 | [26186](https://github.com/airbytehq/airbyte/pull/26186) | Limited the `lookback window` range for input configuration                         |
-| 0.1.25  | 2023-03-20 | [22889](https://github.com/airbytehq/airbyte/pull/22889) | Specified date formatting in specification                                          |
-| 0.1.24  | 2023-03-20 | [24126](https://github.com/airbytehq/airbyte/pull/24126) | Increase page size to 1000                                                          |
-| 0.1.23  | 2023-02-21 | [21907](https://github.com/airbytehq/airbyte/pull/21907) | Do not join channels that not gonna be synced                                       |
-| 0.1.22  | 2023-01-27 | [22022](https://github.com/airbytehq/airbyte/pull/22022) | Set `AvailabilityStrategy` for streams explicitly to `None`                         |
-| 0.1.21  | 2023-01-12 | [21321](https://github.com/airbytehq/airbyte/pull/21321) | Retry Timeout error                                                                 |
-| 0.1.20  | 2022-12-21 | [20767](https://github.com/airbytehq/airbyte/pull/20767) | Update schema                                                                       |
-| 0.1.19  | 2022-12-01 | [19970](https://github.com/airbytehq/airbyte/pull/19970) | Remove OAuth2.0 broken `refresh_token` support                                      |
-| 0.1.18  | 2022-09-28 | [17315](https://github.com/airbytehq/airbyte/pull/17315) | Always install latest version of Airbyte CDK                                        |
-| 0.1.17  | 2022-08-28 | [16085](https://github.com/airbytehq/airbyte/pull/16085) | Increase unit test coverage                                                         |
-| 0.1.16  | 2022-08-28 | [16050](https://github.com/airbytehq/airbyte/pull/16050) | Fix SATs                                                                            |
-| 0.1.15  | 2022-03-31 | [11613](https://github.com/airbytehq/airbyte/pull/11613) | Add 'channel_filter' config and improve performance                                 |
-| 0.1.14  | 2022-01-26 | [9575](https://github.com/airbytehq/airbyte/pull/9575)   | Correct schema                                                                      |
-| 0.1.13  | 2021-11-08 | [7499](https://github.com/airbytehq/airbyte/pull/7499)   | Remove base-python dependencies                                                     |
-| 0.1.12  | 2021-10-07 | [6570](https://github.com/airbytehq/airbyte/pull/6570)   | Implement OAuth support with OAuth authenticator                                    |
-| 0.1.11  | 2021-08-27 | [5830](https://github.com/airbytehq/airbyte/pull/5830)   | Fix sync operations hang forever issue                                              |
-| 0.1.10  | 2021-08-27 | [5697](https://github.com/airbytehq/airbyte/pull/5697)   | Fix max retries issue                                                               |
-| 0.1.9   | 2021-07-20 | [4860](https://github.com/airbytehq/airbyte/pull/4860)   | Fix reading threads issue                                                           |
-| 0.1.8   | 2021-07-14 | [4683](https://github.com/airbytehq/airbyte/pull/4683)   | Add float\_ts primary key                                                           |
-| 0.1.7   | 2021-06-25 | [3978](https://github.com/airbytehq/airbyte/pull/3978)   | Release Slack CDK Connector                                                         |
+| 0.3.7   | 2024-01-10 | [1234](https://github.com/airbytehq/airbyte/pull/1234)   | Prepare for airbyte-lib                                                              |
+| 0.3.6   | 2023-11-21 | [32707](https://github.com/airbytehq/airbyte/pull/32707) | Threads: do not use client-side record filtering                                     |
+| 0.3.5   | 2023-10-19 | [31599](https://github.com/airbytehq/airbyte/pull/31599) | Base image migration: remove Dockerfile and use the python-connector-base image      |
+| 0.3.4   | 2023-10-06 | [31134](https://github.com/airbytehq/airbyte/pull/31134) | Update CDK and remove non iterable return from records                               |
+| 0.3.3   | 2023-09-28 | [30580](https://github.com/airbytehq/airbyte/pull/30580) | Add `bot_id` field to threads schema                                                 |
+| 0.3.2   | 2023-09-20 | [30613](https://github.com/airbytehq/airbyte/pull/30613) | Set default value for channel_filters during discover                                |
+| 0.3.1   | 2023-09-19 | [30570](https://github.com/airbytehq/airbyte/pull/30570) | Use default availability strategy                                                    |
+| 0.3.0   | 2023-09-18 | [30521](https://github.com/airbytehq/airbyte/pull/30521) | Add unexpected fields to streams `channel_messages`, `channels`, `threads`, `users`  |
+| 0.2.0   | 2023-05-24 | [26497](https://github.com/airbytehq/airbyte/pull/26497) | Fixed `lookback window` value limitations                                            |
+| 0.1.26  | 2023-05-17 | [26186](https://github.com/airbytehq/airbyte/pull/26186) | Limited the `lookback window` range for input configuration                          |
+| 0.1.25  | 2023-03-20 | [22889](https://github.com/airbytehq/airbyte/pull/22889) | Specified date formatting in specification                                           |
+| 0.1.24  | 2023-03-20 | [24126](https://github.com/airbytehq/airbyte/pull/24126) | Increase page size to 1000                                                           |
+| 0.1.23  | 2023-02-21 | [21907](https://github.com/airbytehq/airbyte/pull/21907) | Do not join channels that are not going to be synced                                 |
+| 0.1.22  | 2023-01-27 | [22022](https://github.com/airbytehq/airbyte/pull/22022) | Set `AvailabilityStrategy` for streams explicitly to `None`                          |
+| 0.1.21  | 2023-01-12 | [21321](https://github.com/airbytehq/airbyte/pull/21321) | Retry Timeout error                                                                  |
+| 0.1.20  | 2022-12-21 | [20767](https://github.com/airbytehq/airbyte/pull/20767) | Update schema                                                                        |
+| 0.1.19  | 2022-12-01 | [19970](https://github.com/airbytehq/airbyte/pull/19970) | Remove OAuth2.0 broken `refresh_token` support                                       |
+| 0.1.18  | 2022-09-28 | [17315](https://github.com/airbytehq/airbyte/pull/17315) | Always install latest version of Airbyte CDK                                         |
+| 0.1.17  | 2022-08-28 | [16085](https://github.com/airbytehq/airbyte/pull/16085) | Increase unit test coverage                                                          |
+| 0.1.16  | 2022-08-28 | [16050](https://github.com/airbytehq/airbyte/pull/16050) | Fix SATs                                                                             |
+| 0.1.15  | 2022-03-31 | [11613](https://github.com/airbytehq/airbyte/pull/11613) | Add 'channel_filter' config and improve performance                                  |
+| 0.1.14  | 2022-01-26 | [9575](https://github.com/airbytehq/airbyte/pull/9575)   | Correct schema                                                                       |
+| 0.1.13  | 2021-11-08 | [7499](https://github.com/airbytehq/airbyte/pull/7499)   | Remove base-python dependencies                                                      |
+| 0.1.12  | 2021-10-07 | [6570](https://github.com/airbytehq/airbyte/pull/6570)   | Implement OAuth support with OAuth authenticator                                     |
+| 0.1.11  | 2021-08-27 | [5830](https://github.com/airbytehq/airbyte/pull/5830)   | Fix sync operations hang forever issue                                               |
+| 0.1.10  | 2021-08-27 | [5697](https://github.com/airbytehq/airbyte/pull/5697)   | Fix max retries issue                                                                |
+| 0.1.9   | 2021-07-20 | [4860](https://github.com/airbytehq/airbyte/pull/4860)   | Fix reading threads issue                                                            |
+| 0.1.8   | 2021-07-14 | [4683](https://github.com/airbytehq/airbyte/pull/4683)   | Add float\_ts primary key                                                            |
+| 0.1.7   | 2021-06-25 | [3978](https://github.com/airbytehq/airbyte/pull/3978)   | Release Slack CDK Connector                                                          |
 
 </HideInUI>