From 645fa2491762fa3615327efe23a5fa51ff29e1d9 Mon Sep 17 00:00:00 2001
From: evantahler <evan@airbyte.io>
Date: Fri, 16 Feb 2024 15:29:54 -0800
Subject: [PATCH] destination-kvdb - publish for real

---
 .../connectors/destination-kvdb/.dockerignore |   5 +
 .../connectors/destination-kvdb/Dockerfile    |  38 ++++++
 .../connectors/destination-kvdb/README.md     | 118 ++++++++++++++++++
 .../destination_kvdb/__init__.py              |  26 ++++
 .../destination_kvdb/client.py                |  78 ++++++++++++
 .../destination_kvdb/destination.py           |  72 +++++++++++
 .../destination_kvdb/spec.json                |  26 ++++
 .../destination_kvdb/writer.py                |  46 +++++++
 .../connectors/destination-kvdb/main.py       |  11 ++
 .../destination-kvdb/requirements.txt         |   1 +
 .../connectors/destination-kvdb/setup.py      |  26 ++++
 .../destination-kvdb/unit_tests/unit_test.py  |   7 ++
 12 files changed, 454 insertions(+)
 create mode 100644 airbyte-integrations/connectors/destination-kvdb/.dockerignore
 create mode 100644 airbyte-integrations/connectors/destination-kvdb/Dockerfile
 create mode 100644 airbyte-integrations/connectors/destination-kvdb/README.md
 create mode 100644 airbyte-integrations/connectors/destination-kvdb/destination_kvdb/__init__.py
 create mode 100644 airbyte-integrations/connectors/destination-kvdb/destination_kvdb/client.py
 create mode 100644 airbyte-integrations/connectors/destination-kvdb/destination_kvdb/destination.py
 create mode 100644 airbyte-integrations/connectors/destination-kvdb/destination_kvdb/spec.json
 create mode 100644 airbyte-integrations/connectors/destination-kvdb/destination_kvdb/writer.py
 create mode 100644 airbyte-integrations/connectors/destination-kvdb/main.py
 create mode 100644 airbyte-integrations/connectors/destination-kvdb/requirements.txt
 create mode 100644 airbyte-integrations/connectors/destination-kvdb/setup.py
 create mode 100644 airbyte-integrations/connectors/destination-kvdb/unit_tests/unit_test.py

diff --git a/airbyte-integrations/connectors/destination-kvdb/.dockerignore b/airbyte-integrations/connectors/destination-kvdb/.dockerignore
new file mode 100644
index 0000000000000..1b4b5767b5549
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-kvdb/.dockerignore
@@ -0,0 +1,5 @@
+*
+!Dockerfile
+!main.py
+!destination_kvdb
+!setup.py
diff --git a/airbyte-integrations/connectors/destination-kvdb/Dockerfile b/airbyte-integrations/connectors/destination-kvdb/Dockerfile
new file mode 100644
index 0000000000000..b0493da1cfc1a
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-kvdb/Dockerfile
@@ -0,0 +1,38 @@
+FROM python:3.9.11-alpine3.15 AS base
+
+# build and load all requirements
+FROM base AS builder
+WORKDIR /airbyte/integration_code
+
+# upgrade system packages and pip, then add tzdata and the build toolchain
+RUN apk --no-cache upgrade \
+    && pip install --upgrade pip \
+    && apk --no-cache add tzdata build-base
+
+
+COPY setup.py ./
+# install necessary packages to a temporary folder
+RUN pip install --prefix=/install .
+
+# build a clean environment
+FROM base
+WORKDIR /airbyte/integration_code
+
+# copy all loaded and built libraries to a pure basic image
+COPY --from=builder /install /usr/local
+# add default timezone settings
+COPY --from=builder /usr/share/zoneinfo/Etc/UTC /etc/localtime
+RUN echo "Etc/UTC" > /etc/timezone
+
+# bash is installed for more convenient debugging.
+RUN apk --no-cache add bash
+
+# copy payload code only
+COPY main.py ./
+COPY destination_kvdb ./destination_kvdb
+
+ENV AIRBYTE_ENTRYPOINT="python /airbyte/integration_code/main.py"
+ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
+
+LABEL io.airbyte.version=0.1.0
+LABEL io.airbyte.name=airbyte/destination-kvdb
diff --git a/airbyte-integrations/connectors/destination-kvdb/README.md b/airbyte-integrations/connectors/destination-kvdb/README.md
new file mode 100644
index 0000000000000..b834894111b6b
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-kvdb/README.md
@@ -0,0 +1,118 @@
+# Kvdb Destination
+
+This is the repository for the [Kvdb](https://kvdb.io) destination connector, written in Python. It is intended to be an example for how to write a Python destination. KvDB is a very simple key value store, which makes it great for the purposes of illustrating how to write a Python destination connector.
+
+## Local development
+
+### Prerequisites
+**To iterate on this connector, make sure to complete this prerequisites section.**
+
+#### Minimum Python version required `= 3.7.0`
+
+#### Build & Activate Virtual Environment and install dependencies
+From this connector directory, create a virtual environment:
+```
+python -m venv .venv
+```
+
+This will generate a virtualenv for this module in `.venv/`. Make sure this venv is active in your
+development environment of choice. To activate it from the terminal, run:
+```
+source .venv/bin/activate
+pip install -r requirements.txt
+```
+If you are in an IDE, follow your IDE's instructions to activate the virtualenv.
+
+Note that while we are installing dependencies from `requirements.txt`, you should only edit `setup.py` for your dependencies. `requirements.txt` is
+used for editable installs (`pip install -e`) to pull in Python dependencies from the monorepo and will call `setup.py`.
+If this is mumbo jumbo to you, don't worry about it, just put your deps in `setup.py` but install using `pip install -r requirements.txt` and everything
+should work as you expect.
+
+#### Building via Gradle
+From the Airbyte repository root, run:
+```
+./gradlew :airbyte-integrations:connectors:destination-kvdb:build
+```
+
+#### Create credentials
+**If you are a community contributor**, generate the necessary credentials from [Kvdb](https://kvdb.io/docs/api/), and then create a file `secrets/config.json` conforming to the `destination_kvdb/spec.json` file.
+Note that the `secrets` directory is gitignored by default, so there is no danger of accidentally checking in sensitive information.
+See `integration_tests/sample_config.json` for a sample config file.
+
+**If you are an Airbyte core member**, copy the credentials in Lastpass under the secret name `destination kvdb test creds`
+and place them into `secrets/config.json`.
+
+### Locally running the connector
+```
+python main.py spec
+python main.py check --config secrets/config.json
+# messages.jsonl is a file containing line-separated JSON representing AirbyteMessages
+cat messages.jsonl | python main.py write --config secrets/config.json --catalog integration_tests/configured_catalog.json
+```
+
+### Locally running the connector docker image
+
+
+
+#### Build
+**Via [`airbyte-ci`](https://github.com/airbytehq/airbyte/blob/master/airbyte-ci/connectors/pipelines/README.md) (recommended):**
+```bash
+airbyte-ci connectors --name=destination-kvdb build
+```
+
+An image will be built with the tag `airbyte/destination-kvdb:dev`.
+
+**Via `docker build`:**
+```bash
+docker build -t airbyte/destination-kvdb:dev .
+```
+#### Run
+Then run any of the connector commands as follows:
+```
+docker run --rm airbyte/destination-kvdb:dev spec
+docker run --rm -v $(pwd)/secrets:/secrets airbyte/destination-kvdb:dev check --config /secrets/config.json
+# messages.jsonl is a file containing line-separated JSON representing AirbyteMessages
+cat messages.jsonl | docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/destination-kvdb:dev write --config /secrets/config.json --catalog /integration_tests/configured_catalog.json
+```
+## Testing
+   Make sure to familiarize yourself with [pytest test discovery](https://docs.pytest.org/en/latest/goodpractices.html#test-discovery) to know how your test files and methods should be named.
+First install test dependencies into your virtual environment:
+```
+pip install .[tests]
+```
+### Unit Tests
+To run unit tests locally, from the connector directory run:
+```
+python -m pytest unit_tests
+```
+
+### Integration Tests
+There are two types of integration tests: Acceptance Tests (Airbyte's test suite for all destination connectors) and custom integration tests (which are specific to this connector).
+#### Custom Integration tests
+Place custom tests inside `integration_tests/` folder, then, from the connector root, run
+```
+python -m pytest integration_tests
+```
+#### Acceptance Tests
+You can run our full test suite locally using [`airbyte-ci`](https://github.com/airbytehq/airbyte/blob/master/airbyte-ci/connectors/pipelines/README.md):
+```bash
+airbyte-ci connectors --name=destination-kvdb test
+```
+
+
+## Dependency Management
+All of your dependencies should go in `setup.py`, NOT `requirements.txt`. The requirements file is only used to connect internal Airbyte dependencies in the monorepo for local development.
+We split dependencies between two groups, dependencies that are:
+* required for your connector to work need to go to `MAIN_REQUIREMENTS` list.
+* required for the testing need to go to `TEST_REQUIREMENTS` list
+
+### Publishing a new version of the connector
+You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what?
+1. Make sure your changes are passing our test suite: `airbyte-ci connectors --name=destination-kvdb test`
+2. Bump the connector version in `metadata.yaml`: increment the `dockerImageTag` value. Please follow [semantic versioning for connectors](https://docs.airbyte.com/contributing-to-airbyte/resources/pull-requests-handbook/#semantic-versioning-for-connectors).
+3. Make sure the `metadata.yaml` content is up to date.
+4. Make sure the connector documentation and its changelog are up to date (`docs/integrations/destinations/kvdb.md`).
+5. Create a Pull Request: use [our PR naming conventions](https://docs.airbyte.com/contributing-to-airbyte/resources/pull-requests-handbook/#pull-request-title-convention).
+6. Pat yourself on the back for being an awesome contributor.
+7. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master.
+
diff --git a/airbyte-integrations/connectors/destination-kvdb/destination_kvdb/__init__.py b/airbyte-integrations/connectors/destination-kvdb/destination_kvdb/__init__.py
new file mode 100644
index 0000000000000..5f3b041035bf9
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-kvdb/destination_kvdb/__init__.py
@@ -0,0 +1,26 @@
+# MIT License
+#
+# Copyright (c) 2020 Airbyte
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in all
+# copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+
+
+from .destination import DestinationKvdb
+
+__all__ = ["DestinationKvdb"]
diff --git a/airbyte-integrations/connectors/destination-kvdb/destination_kvdb/client.py b/airbyte-integrations/connectors/destination-kvdb/destination_kvdb/client.py
new file mode 100644
index 0000000000000..74d9f41176f57
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-kvdb/destination_kvdb/client.py
@@ -0,0 +1,104 @@
+#
+# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+#
+
+from typing import Any, Iterable, List, Mapping, Optional, Tuple, Union
+
+import requests
+
+
+class KvDbClient:
+    """
+    Minimal HTTP client for a single KVdb bucket.
+
+    All requests go to <base_url>/<bucket_id> and raise requests.HTTPError when KVdb answers with a 4xx or 5xx status.
+    """
+
+    base_url = "https://kvdb.io"
+    # Number of entries requested per page by list_keys
+    PAGE_SIZE = 1000
+    # Seconds to wait for the connection and for the server to send data. requests waits indefinitely when no
+    # timeout is given, which would let a single stalled call hang the whole sync.
+    REQUEST_TIMEOUT = 60
+
+    def __init__(self, bucket_id: str, secret_key: Optional[str] = None):
+        self.secret_key = secret_key
+        self.bucket_id = bucket_id
+
+    def write(self, key: str, value: Mapping[str, Any]):
+        """Sets a single key by delegating to batch_write"""
+        return self.batch_write([(key, value)])
+
+    def batch_write(self, keys_and_values: List[Tuple[str, Mapping[str, Any]]]):
+        """
+        Sets every (key, value) pair with one transaction request.
+
+        https://kvdb.io/docs/api/#execute-transaction
+        """
+        request_body = {"txn": [{"set": key, "value": value} for key, value in keys_and_values]}
+        return self._request("POST", json=request_body)
+
+    def list_keys(self, list_values: bool = False, prefix: Optional[str] = None) -> Iterable[Union[str, List]]:
+        """
+        Lazily yields the bucket entries matching prefix (an empty prefix is sent when none is given), one page at a time.
+
+        https://kvdb.io/docs/api/#list-keys
+        """
+        # TODO handle rate limiting
+        pagination_complete = False
+        offset = 0
+
+        while not pagination_complete:
+            response = self._request(
+                "GET",
+                params={
+                    "limit": self.PAGE_SIZE,
+                    "skip": offset,
+                    "format": "json",
+                    "prefix": prefix or "",
+                    "values": "true" if list_values else "false",
+                },
+                endpoint="/",  # the "list" endpoint doesn't work without adding a trailing slash to the URL
+            )
+
+            response_json = response.json()
+            yield from response_json
+
+            # A page shorter than PAGE_SIZE is the last one
+            pagination_complete = len(response_json) < self.PAGE_SIZE
+            offset += self.PAGE_SIZE
+
+    def delete(self, key: Union[str, List[str]]):
+        """
+        Deletes a single key, or every key in a list, with one transaction request.
+
+        https://kvdb.io/docs/api/#execute-transaction
+        """
+        key_list = key if isinstance(key, list) else [key]
+        request_body = {"txn": [{"delete": k} for k in key_list]}
+        return self._request("POST", json=request_body)
+
+    def _get_base_url(self) -> str:
+        return f"{self.base_url}/{self.bucket_id}"
+
+    def _get_auth_headers(self) -> Mapping[str, Any]:
+        # The bearer token is only sent when a secret key was configured
+        return {"Authorization": f"Bearer {self.secret_key}"} if self.secret_key else {}
+
+    def _request(
+        self,
+        http_method: str,
+        endpoint: Optional[str] = None,
+        params: Optional[Mapping[str, Any]] = None,
+        json: Optional[Mapping[str, Any]] = None,
+    ) -> requests.Response:
+        """Sends one request to the bucket URL (plus endpoint) and raises requests.HTTPError on a 4xx or 5xx response"""
+        url = self._get_base_url() + (endpoint or "")
+        headers = {"Accept": "application/json", **self._get_auth_headers()}
+
+        response = requests.request(
+            method=http_method, params=params, url=url, headers=headers, json=json, timeout=self.REQUEST_TIMEOUT
+        )
+
+        response.raise_for_status()
+        return response
diff --git a/airbyte-integrations/connectors/destination-kvdb/destination_kvdb/destination.py b/airbyte-integrations/connectors/destination-kvdb/destination_kvdb/destination.py
new file mode 100644
index 0000000000000..33ab8565fae4b
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-kvdb/destination_kvdb/destination.py
@@ -0,0 +1,72 @@
+#
+# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+#
+
+
+import time
+import traceback
+import uuid
+from typing import Any, Iterable, Mapping
+
+from airbyte_cdk import AirbyteLogger
+from airbyte_cdk.destinations import Destination
+from airbyte_cdk.models import AirbyteConnectionStatus, AirbyteMessage, ConfiguredAirbyteCatalog, DestinationSyncMode, Status, Type
+from destination_kvdb.client import KvDbClient
+from destination_kvdb.writer import KvDbWriter
+
+
+class DestinationKvdb(Destination):
+    def write(
+        self, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog, input_messages: Iterable[AirbyteMessage]
+    ) -> Iterable[AirbyteMessage]:
+
+        """
+        Writes the incoming records to KvDB and yields back the state messages received in the input message stream.
+
+        Entries of streams configured with the overwrite sync mode are deleted first. Records are then buffered by the writer;
+        on a state message the buffer is flushed before the message is yielded, so outputting a state message means that every
+        AirbyteRecordMessage which came before it has been persisted. This gives fault tolerance: if a sync fails part-way, the
+        source is given the last state message output from this method as the starting point of the next sync.
+        """
+        writer = KvDbWriter(KvDbClient(**config))
+
+        for configured_stream in configured_catalog.streams:
+            if configured_stream.destination_sync_mode == DestinationSyncMode.overwrite:
+                writer.delete_stream_entries(configured_stream.stream.name)
+
+        for message in input_messages:
+            if message.type == Type.STATE:
+                # Emitting a state message indicates that all records which came before it have been written to the destination.
+                # So flush the queue first to make sure those writes happened, and only then output the state message
+                writer.flush()
+                yield message
+            elif message.type == Type.RECORD:
+                record = message.record
+                writer.queue_write_operation(
+                    record.stream, record.data, time.time_ns() / 1_000_000
+                )  # time_ns() / 1_000_000 converts nanoseconds to (fractional) milliseconds; the writer uses it as the key suffix
+            else:
+                # message types other than STATE and RECORD are ignored for now
+                continue
+
+        # Flush any records still in the queue once the input is exhausted
+        writer.flush()
+
+    def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
+        """
+        Tests whether the input configuration can write to the bucket by setting a random key and then deleting it.
+        Any exception is reported as a FAILED status whose message includes the stack trace; otherwise SUCCEEDED is returned.
+        """
+        try:
+            # Verify write access by writing a random key and then deleting it again
+            client = KvDbClient(**config)
+            random_key = str(uuid.uuid4())
+            client.write(random_key, {"value": "_airbyte_connection_check"})
+            client.delete(random_key)
+        except Exception as e:
+            traceback.print_exc()
+            return AirbyteConnectionStatus(
+                status=Status.FAILED, message=f"An exception occurred: {e}. \nStacktrace: \n{traceback.format_exc()}"
+            )
+        else:
+            return AirbyteConnectionStatus(status=Status.SUCCEEDED)
diff --git a/airbyte-integrations/connectors/destination-kvdb/destination_kvdb/spec.json b/airbyte-integrations/connectors/destination-kvdb/destination_kvdb/spec.json
new file mode 100644
index 0000000000000..0ced52c17a227
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-kvdb/destination_kvdb/spec.json
@@ -0,0 +1,27 @@
+{
+  "documentationUrl": "https://kvdb.io/docs/api/",
+  "supported_destination_sync_modes": ["overwrite", "append"],
+  "supportsIncremental": true,
+  "connectionSpecification": {
+    "$schema": "http://json-schema.org/draft-07/schema#",
+    "title": "Destination KVdb",
+    "type": "object",
+    "required": ["bucket_id", "secret_key"],
+    "additionalProperties": false,
+    "properties": {
+      "bucket_id": {
+        "title": "Bucket ID",
+        "type": "string",
+        "description": "The ID of your KVdb bucket.",
+        "order": 1
+      },
+      "secret_key": {
+        "title": "Secret Key",
+        "type": "string",
+        "description": "Your bucket Secret Key.",
+        "airbyte_secret": true,
+        "order": 2
+      }
+    }
+  }
+}
diff --git a/airbyte-integrations/connectors/destination-kvdb/destination_kvdb/writer.py b/airbyte-integrations/connectors/destination-kvdb/destination_kvdb/writer.py
new file mode 100644
index 0000000000000..33acbf8a22fb3
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-kvdb/destination_kvdb/writer.py
@@ -0,0 +1,53 @@
+#
+# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+#
+
+from collections.abc import Mapping
+
+from destination_kvdb.client import KvDbClient
+
+
+class KvDbWriter:
+    """
+    Buffers record writes and sends them to KvDB in batches.
+
+    Data is written to KvDB in the following format:
+        key: stream_name__ab__<record_extraction_timestamp>
+        value: a JSON object representing the record's data
+
+    This is because unless a data source explicitly designates a primary key, we don't know what to key the record on.
+    Since KvDB allows reading records with certain prefixes, we treat it more like a message queue, expecting the reader to
+    read messages with a particular prefix e.g: name__ab__123, where 123 is the timestamp they last read data from.
+    """
+
+    # Maximum number of operations (writes or deletes) sent to KvDB in a single request.
+    flush_interval = 1000
+
+    def __init__(self, client: KvDbClient):
+        self.client = client
+        # The buffer is per instance: a class-level list would be shared by every writer.
+        self.write_buffer = []
+
+    def delete_stream_entries(self, stream_name: str):
+        """Deletes all the records belonging to the input stream"""
+        # Collect every key before deleting anything: list_keys pages through the bucket by offset, so removing
+        # keys while it is still paginating would shift the remaining keys and cause some of them to be skipped.
+        keys_to_delete = list(self.client.list_keys(prefix=f"{stream_name}__ab__"))
+        for start in range(0, len(keys_to_delete), self.flush_interval):
+            self.client.delete(keys_to_delete[start : start + self.flush_interval])
+
+    def queue_write_operation(self, stream_name: str, record: Mapping, written_at: int):
+        """Buffers one record under the key <stream_name>__ab__<written_at>, flushing once the buffer is full"""
+        kv_pair = (f"{stream_name}__ab__{written_at}", record)
+        self.write_buffer.append(kv_pair)
+        # >= rather than == so that a buffer which somehow overshoots the limit still gets flushed
+        if len(self.write_buffer) >= self.flush_interval:
+            self.flush()
+
+    def flush(self):
+        """Writes all buffered records to KvDB in one request and empties the buffer"""
+        if not self.write_buffer:
+            # Nothing is pending, so don't send an empty transaction
+            return
+        self.client.batch_write(self.write_buffer)
+        self.write_buffer.clear()
diff --git a/airbyte-integrations/connectors/destination-kvdb/main.py b/airbyte-integrations/connectors/destination-kvdb/main.py
new file mode 100644
index 0000000000000..178789589e5af
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-kvdb/main.py
@@ -0,0 +1,11 @@
+#
+# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+#
+
+
+import sys
+
+from destination_kvdb import DestinationKvdb
+
+if __name__ == "__main__":
+    DestinationKvdb().run(sys.argv[1:])
diff --git a/airbyte-integrations/connectors/destination-kvdb/requirements.txt b/airbyte-integrations/connectors/destination-kvdb/requirements.txt
new file mode 100644
index 0000000000000..d6e1198b1ab1f
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-kvdb/requirements.txt
@@ -0,0 +1 @@
+-e .
diff --git a/airbyte-integrations/connectors/destination-kvdb/setup.py b/airbyte-integrations/connectors/destination-kvdb/setup.py
new file mode 100644
index 0000000000000..dab5520718aba
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-kvdb/setup.py
@@ -0,0 +1,26 @@
+#
+# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+#
+
+
+from setuptools import find_packages, setup
+
+MAIN_REQUIREMENTS = [
+    "airbyte-cdk",
+    "requests",
+]
+
+TEST_REQUIREMENTS = ["pytest~=6.1"]
+
+setup(
+    name="destination_kvdb",
+    description="Destination implementation for Kvdb.",
+    author="Airbyte",
+    author_email="contact@airbyte.io",
+    packages=find_packages(),
+    install_requires=MAIN_REQUIREMENTS,
+    package_data={"": ["*.json"]},
+    extras_require={
+        "tests": TEST_REQUIREMENTS,
+    },
+)
diff --git a/airbyte-integrations/connectors/destination-kvdb/unit_tests/unit_test.py b/airbyte-integrations/connectors/destination-kvdb/unit_tests/unit_test.py
new file mode 100644
index 0000000000000..219ae0142c724
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-kvdb/unit_tests/unit_test.py
@@ -0,0 +1,7 @@
+#
+# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+#
+
+
+def test_example_method():
+    assert True