Commit 13d3cb4

evantahler authored and jatinyadav-cc committed
destination-kvdb - publish for real (airbytehq#35379)
1 parent 4116758 commit 13d3cb4

12 files changed: +454 -0 lines changed

@@ -0,0 +1,5 @@
+*
+!Dockerfile
+!main.py
+!destination_kvdb
+!setup.py
@@ -0,0 +1,38 @@
+FROM python:3.9.11-alpine3.15 as base
+
+# build and load all requirements
+FROM base as builder
+WORKDIR /airbyte/integration_code
+
+# upgrade pip to the latest version
+RUN apk --no-cache upgrade \
+    && pip install --upgrade pip \
+    && apk --no-cache add tzdata build-base
+
+
+COPY setup.py ./
+# install necessary packages to a temporary folder
+RUN pip install --prefix=/install .
+
+# build a clean environment
+FROM base
+WORKDIR /airbyte/integration_code
+
+# copy all loaded and built libraries to a pure basic image
+COPY --from=builder /install /usr/local
+# add default timezone settings
+COPY --from=builder /usr/share/zoneinfo/Etc/UTC /etc/localtime
+RUN echo "Etc/UTC" > /etc/timezone
+
+# bash is installed for more convenient debugging.
+RUN apk --no-cache add bash
+
+# copy payload code only
+COPY main.py ./
+COPY destination_kvdb ./destination_kvdb
+
+ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
+ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
+
+LABEL io.airbyte.version=0.1.0
+LABEL io.airbyte.name=airbyte/destination-kvdb
@@ -0,0 +1,118 @@
+# Kvdb Destination
+
+This is the repository for the [Kvdb](https://kvdb.io) destination connector, written in Python. It is intended to be an example of how to write a Python destination. Kvdb is a very simple key-value store, which makes it well suited to illustrating how to write a Python destination connector.
+
+## Local development
+
+### Prerequisites
+**To iterate on this connector, make sure to complete this prerequisites section.**
+
+#### Minimum Python version required `= 3.7.0`
+
+#### Build & Activate Virtual Environment and install dependencies
+From this connector directory, create a virtual environment:
+```
+python -m venv .venv
+```
+
+This will generate a virtualenv for this module in `.venv/`. Make sure this venv is active in your
+development environment of choice. To activate it from the terminal, run:
+```
+source .venv/bin/activate
+pip install -r requirements.txt
+```
+If you are in an IDE, follow your IDE's instructions to activate the virtualenv.
+
+Note that while we are installing dependencies from `requirements.txt`, you should only edit `setup.py` for your dependencies. `requirements.txt` is
+used for editable installs (`pip install -e`) to pull in Python dependencies from the monorepo and will call `setup.py`.
+If this is mumbo jumbo to you, don't worry about it, just put your deps in `setup.py` but install using `pip install -r requirements.txt` and everything
+should work as you expect.
+
+#### Building via Gradle
+From the Airbyte repository root, run:
+```
+./gradlew :airbyte-integrations:connectors:destination-kvdb:build
+```
+
+#### Create credentials
+**If you are a community contributor**, generate the necessary credentials from [Kvdb](https://kvdb.io/docs/api/), and then create a file `secrets/config.json` conforming to the `destination_kvdb/spec.json` file.
+Note that the `secrets` directory is gitignored by default, so there is no danger of accidentally checking in sensitive information.
+See `integration_tests/sample_config.json` for a sample config file.
+
+**If you are an Airbyte core member**, copy the credentials in Lastpass under the secret name `destination kvdb test creds`
+and place them into `secrets/config.json`.
+
+### Locally running the connector
+```
+python main.py spec
+python main.py check --config secrets/config.json
+# messages.jsonl is a file containing line-separated JSON representing AirbyteMessages
+cat messages.jsonl | python main.py write --config secrets/config.json --catalog integration_tests/configured_catalog.json
+```
+
+### Locally running the connector docker image
+
+
+
+#### Build
+**Via [`airbyte-ci`](https://github.com/airbytehq/airbyte/blob/master/airbyte-ci/connectors/pipelines/README.md) (recommended):**
+```bash
+airbyte-ci connectors --name=destination-kvdb build
+```
+
+An image will be built with the tag `airbyte/destination-kvdb:dev`.
+
+**Via `docker build`:**
+```bash
+docker build -t airbyte/destination-kvdb:dev .
+```
+#### Run
+Then run any of the connector commands as follows:
+```
+docker run --rm airbyte/destination-kvdb:dev spec
+docker run --rm -v $(pwd)/secrets:/secrets airbyte/destination-kvdb:dev check --config /secrets/config.json
+# messages.jsonl is a file containing line-separated JSON representing AirbyteMessages
+cat messages.jsonl | docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/destination-kvdb:dev write --config /secrets/config.json --catalog /integration_tests/configured_catalog.json
+```
+## Testing
+Make sure to familiarize yourself with [pytest test discovery](https://docs.pytest.org/en/latest/goodpractices.html#test-discovery) to know how your test files and methods should be named.
+First install test dependencies into your virtual environment:
+```
+pip install '.[tests]'
+```
+### Unit Tests
+To run unit tests locally, from the connector directory run:
+```
+python -m pytest unit_tests
+```
+
+### Integration Tests
+There are two types of integration tests: Acceptance Tests (Airbyte's test suite for all destination connectors) and custom integration tests (which are specific to this connector).
+#### Custom Integration tests
+Place custom tests inside the `integration_tests/` folder, then, from the connector root, run:
+```
+python -m pytest integration_tests
+```
+#### Acceptance Tests
+You can run our full test suite locally using [`airbyte-ci`](https://github.com/airbytehq/airbyte/blob/master/airbyte-ci/connectors/pipelines/README.md):
+```bash
+airbyte-ci connectors --name=destination-kvdb test
+```
+
+
+## Dependency Management
+All of your dependencies should go in `setup.py`, NOT `requirements.txt`. The requirements file is only used to connect internal Airbyte dependencies in the monorepo for local development.
+We split dependencies into two groups:
+* dependencies required for your connector to work go in the `MAIN_REQUIREMENTS` list.
+* dependencies required for testing go in the `TEST_REQUIREMENTS` list.
+
+### Publishing a new version of the connector
+You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what?
+1. Make sure your changes are passing our test suite: `airbyte-ci connectors --name=destination-kvdb test`
+2. Bump the connector version in `metadata.yaml`: increment the `dockerImageTag` value. Please follow [semantic versioning for connectors](https://docs.airbyte.com/contributing-to-airbyte/resources/pull-requests-handbook/#semantic-versioning-for-connectors).
+3. Make sure the `metadata.yaml` content is up to date.
+4. Make sure the connector documentation and its changelog are up to date (`docs/integrations/destinations/kvdb.md`).
+5. Create a Pull Request: use [our PR naming conventions](https://docs.airbyte.com/contributing-to-airbyte/resources/pull-requests-handbook/#pull-request-title-convention).
+6. Pat yourself on the back for being an awesome contributor.
+7. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master.
+
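The README above pipes a `messages.jsonl` file into the `write` command but does not show how to produce one. The following is a minimal sketch of generating such a file, assuming the usual Airbyte message envelope (`type`, `record.stream`, `record.data`, `record.emitted_at`, `state.data`). The stream name `users`, the sample rows and the `rows_sent` state key are hypothetical, and neither helper function is part of this commit.

```python
import json
import time
from typing import Any, Dict, List


def build_messages(stream: str, rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Wrap each row in a RECORD message and append one trailing STATE message."""
    emitted_at = int(time.time() * 1000)  # milliseconds since the epoch
    messages: List[Dict[str, Any]] = [
        {"type": "RECORD", "record": {"stream": stream, "data": row, "emitted_at": emitted_at}}
        for row in rows
    ]
    # Hypothetical state payload; a real source decides what goes in here.
    messages.append({"type": "STATE", "state": {"data": {"rows_sent": len(rows)}}})
    return messages


def to_jsonl(messages: List[Dict[str, Any]]) -> str:
    """Serialize messages as line-separated JSON, one message per line."""
    return "".join(json.dumps(message) + "\n" for message in messages)


sample = build_messages("users", [{"id": 1, "name": "Ada"}, {"id": 2, "name": "Grace"}])
jsonl_text = to_jsonl(sample)
```

Saving `jsonl_text` to `messages.jsonl` should give a file suitable for the `cat messages.jsonl | ... write` commands, provided the stream name matches a stream in the configured catalog.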
@@ -0,0 +1,26 @@
+# MIT License
+#
+# Copyright (c) 2020 Airbyte
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in all
+# copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+
+
+from .destination import DestinationKvdb
+
+__all__ = ["DestinationKvdb"]
@@ -0,0 +1,78 @@
+#
+# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+#
+
+from typing import Any, Iterable, List, Mapping, Tuple, Union
+
+import requests
+
+
+class KvDbClient:
+    base_url = "https://kvdb.io"
+    PAGE_SIZE = 1000
+
+    def __init__(self, bucket_id: str, secret_key: str = None):
+        self.secret_key = secret_key
+        self.bucket_id = bucket_id
+
+    def write(self, key: str, value: Mapping[str, Any]):
+        return self.batch_write([(key, value)])
+
+    def batch_write(self, keys_and_values: List[Tuple[str, Mapping[str, Any]]]):
+        """
+        https://kvdb.io/docs/api/#execute-transaction
+        """
+        request_body = {"txn": [{"set": key, "value": value} for key, value in keys_and_values]}
+        return self._request("POST", json=request_body)
+
+    def list_keys(self, list_values: bool = False, prefix: str = None) -> Iterable[Union[str, List]]:
+        """
+        https://kvdb.io/docs/api/#list-keys
+        """
+        # TODO handle rate limiting
+        pagination_complete = False
+        offset = 0
+
+        while not pagination_complete:
+            response = self._request(
+                "GET",
+                params={
+                    "limit": self.PAGE_SIZE,
+                    "skip": offset,
+                    "format": "json",
+                    "prefix": prefix or "",
+                    "values": "true" if list_values else "false",
+                },
+                endpoint="/",  # the "list" endpoint doesn't work without adding a trailing slash to the URL
+            )
+
+            response_json = response.json()
+            yield from response_json
+
+            pagination_complete = len(response_json) < self.PAGE_SIZE
+            offset += self.PAGE_SIZE
+
+    def delete(self, key: Union[str, List[str]]):
+        """
+        https://kvdb.io/docs/api/#execute-transaction
+        """
+        key_list = key if isinstance(key, List) else [key]
+        request_body = {"txn": [{"delete": k} for k in key_list]}
+        return self._request("POST", json=request_body)
+
+    def _get_base_url(self) -> str:
+        return f"{self.base_url}/{self.bucket_id}"
+
+    def _get_auth_headers(self) -> Mapping[str, Any]:
+        return {"Authorization": f"Bearer {self.secret_key}"} if self.secret_key else {}
+
+    def _request(
+        self, http_method: str, endpoint: str = None, params: Mapping[str, Any] = None, json: Mapping[str, Any] = None
+    ) -> requests.Response:
+        url = self._get_base_url() + (endpoint or "")
+        headers = {"Accept": "application/json", **self._get_auth_headers()}
+
+        response = requests.request(method=http_method, params=params, url=url, headers=headers, json=json)
+
+        response.raise_for_status()
+        return response
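The `list_keys` method above pages through results with a `limit`/`skip` pair and stops at the first short page. As a rough illustration of that loop in isolation, here is a sketch that swaps the HTTP call for an in-memory stand-in. `paginate`, `make_fake_fetch` and the small page size are hypothetical names and values chosen for the example, not part of the connector.

```python
from typing import Callable, Iterator, List, Tuple


def paginate(fetch_page: Callable[[int, int], List[str]], page_size: int) -> Iterator[str]:
    """Yield items page by page, stopping after the first page shorter than page_size."""
    offset = 0
    while True:
        page = fetch_page(offset, page_size)
        yield from page
        if len(page) < page_size:
            return
        offset += page_size


def make_fake_fetch(keys: List[str]) -> Tuple[Callable[[int, int], List[str]], List[Tuple[int, int]]]:
    """Return a fetch function backed by a list, plus a log of the (skip, limit) calls it received."""
    calls: List[Tuple[int, int]] = []

    def fetch(skip: int, limit: int) -> List[str]:
        calls.append((skip, limit))
        return keys[skip : skip + limit]

    return fetch, calls


fetch, calls = make_fake_fetch(["a", "b", "c", "d", "e", "f", "g"])
all_keys = list(paginate(fetch, page_size=3))
```

One consequence of the short-page stopping rule is that when the number of keys is an exact multiple of the page size, the loop issues one extra request that returns an empty page before it terminates.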
@@ -0,0 +1,72 @@
+#
+# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+#
+
+
+import time
+import traceback
+import uuid
+from typing import Any, Iterable, Mapping
+
+from airbyte_cdk import AirbyteLogger
+from airbyte_cdk.destinations import Destination
+from airbyte_cdk.models import AirbyteConnectionStatus, AirbyteMessage, ConfiguredAirbyteCatalog, DestinationSyncMode, Status, Type
+from destination_kvdb.client import KvDbClient
+from destination_kvdb.writer import KvDbWriter
+
+
+class DestinationKvdb(Destination):
+    def write(
+        self, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog, input_messages: Iterable[AirbyteMessage]
+    ) -> Iterable[AirbyteMessage]:
+
+        """
+        Reads the input stream of messages, config, and catalog to write data to the destination.
+
+        This method returns an iterable (typically a generator of AirbyteMessages via yield) containing state messages received
+        in the input message stream. Outputting a state message means that every AirbyteRecordMessage which came before it has been
+        successfully persisted to the destination. This is used to ensure fault tolerance in the case that a sync fails before fully completing,
+        then the source is given the last state message output from this method as the starting point of the next sync.
+        """
+        writer = KvDbWriter(KvDbClient(**config))
+
+        for configured_stream in configured_catalog.streams:
+            if configured_stream.destination_sync_mode == DestinationSyncMode.overwrite:
+                writer.delete_stream_entries(configured_stream.stream.name)
+
+        for message in input_messages:
+            if message.type == Type.STATE:
+                # Emitting a state message indicates that all records which came before it have been written to the destination. So we flush
+                # the queue to ensure writes happen, then output the state message to indicate it's safe to checkpoint state
+                writer.flush()
+                yield message
+            elif message.type == Type.RECORD:
+                record = message.record
+                writer.queue_write_operation(
+                    record.stream, record.data, time.time_ns() / 1_000_000
+                )  # convert from nanoseconds to milliseconds
+            else:
+                # ignore other message types for now
+                continue
+
+        # Make sure to flush any records still in the queue
+        writer.flush()
+
+    def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
+        """
+        Tests if the input configuration can be used to successfully connect to the destination with the needed permissions
+            e.g: if a provided API token or password can be used to connect and write to the destination.
+        """
+        try:
+            # Verify write access by attempting to write and then delete to a random key
+            client = KvDbClient(**config)
+            random_key = str(uuid.uuid4())
+            client.write(random_key, {"value": "_airbyte_connection_check"})
+            client.delete(random_key)
+        except Exception as e:
+            traceback.print_exc()
+            return AirbyteConnectionStatus(
+                status=Status.FAILED, message=f"An exception occurred: {e}. \nStacktrace: \n{traceback.format_exc()}"
+            )
+        else:
+            return AirbyteConnectionStatus(status=Status.SUCCEEDED)
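The `write` method above flushes queued records before it re-emits a state message, so a checkpoint is only acknowledged once the preceding records are persisted. The sketch below illustrates that ordering with plain dictionaries and an in-memory writer. `InMemoryWriter` and `write_messages` are hypothetical stand-ins, since `KvDbWriter` is not shown in this diff, and the timestamp argument is left out for brevity.

```python
from typing import Any, Dict, Iterable, Iterator, List, Tuple


class InMemoryWriter:
    """Stand-in writer: queues records and moves them to `persisted` on flush."""

    def __init__(self) -> None:
        self.queue: List[Tuple[str, Dict[str, Any]]] = []
        self.persisted: List[Tuple[str, Dict[str, Any]]] = []

    def queue_write_operation(self, stream: str, data: Dict[str, Any]) -> None:
        self.queue.append((stream, data))

    def flush(self) -> None:
        self.persisted.extend(self.queue)
        self.queue.clear()


def write_messages(messages: Iterable[Dict[str, Any]], writer: InMemoryWriter) -> Iterator[Dict[str, Any]]:
    """Queue RECORD messages; flush before yielding each STATE message, and once more at the end."""
    for message in messages:
        if message["type"] == "STATE":
            writer.flush()
            yield message
        elif message["type"] == "RECORD":
            writer.queue_write_operation(message["record"]["stream"], message["record"]["data"])
    writer.flush()


demo_writer = InMemoryWriter()
demo_input = [
    {"type": "RECORD", "record": {"stream": "users", "data": {"id": 1}}},
    {"type": "RECORD", "record": {"stream": "users", "data": {"id": 2}}},
    {"type": "STATE", "state": {"data": {"cursor": 2}}},
    {"type": "RECORD", "record": {"stream": "users", "data": {"id": 3}}},
]
emitted_states = list(write_messages(demo_input, demo_writer))
```

Because `write_messages` is a generator, nothing is queued or flushed until the caller iterates over it. The same holds for the connector's `write`, whose overwrite deletions also run only once iteration starts.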
@@ -0,0 +1,26 @@
+{
+  "documentationUrl": "https://kvdb.io/docs/api/",
+  "supported_destination_sync_modes": ["overwrite", "append"],
+  "supportsIncremental": true,
+  "connectionSpecification": {
+    "$schema": "http://json-schema.org/draft-07/schema#",
+    "title": "Destination KVdb",
+    "type": "object",
+    "required": ["bucket_id", "secret_key"],
+    "additionalProperties": false,
+    "properties": {
+      "bucket_id": {
+        "title": "Bucket ID",
+        "type": "string",
+        "description": "The ID of your KVdb bucket.",
+        "order": 1
+      },
+      "secret_key": {
+        "title": "Secret Key",
+        "type": "string",
+        "description": "Your bucket Secret Key.",
+        "order": 2
+      }
+    }
+  }
+}
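The spec above requires `bucket_id` and `secret_key` as strings and forbids additional properties. As a rough illustration of what that means for a `secrets/config.json`, here is a standard-library sketch that checks only those three rules. It is not a full JSON Schema validator, and `config_errors` and `CONNECTION_SPEC` are hypothetical names introduced for the example.

```python
from typing import Any, Dict, List

# Trimmed copy of the relevant parts of connectionSpecification.
CONNECTION_SPEC: Dict[str, Any] = {
    "required": ["bucket_id", "secret_key"],
    "additionalProperties": False,
    "properties": {
        "bucket_id": {"type": "string"},
        "secret_key": {"type": "string"},
    },
}


def config_errors(config: Dict[str, Any], spec: Dict[str, Any] = CONNECTION_SPEC) -> List[str]:
    """Return human-readable problems; an empty list means the config passes these checks."""
    errors: List[str] = []
    properties = spec["properties"]
    for name in spec["required"]:
        if name not in config:
            errors.append(f"missing required property: {name}")
    for name, value in config.items():
        if name not in properties:
            if spec.get("additionalProperties") is False:
                errors.append(f"unexpected property: {name}")
        elif properties[name].get("type") == "string" and not isinstance(value, str):
            errors.append(f"property {name} must be a string")
    return errors


good = config_errors({"bucket_id": "my-bucket", "secret_key": "s3cr3t"})
bad = config_errors({"bucket_id": 123, "secret_key": "s3cr3t", "region": "eu"})
```

The strictness about extra keys matters in practice: the connector builds its client with `KvDbClient(**config)`, and `KvDbClient.__init__` accepts only `bucket_id` and `secret_key`, so an unexpected key would raise a `TypeError`.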
