
Commit bd416e2

erohmensing authored and jatinyadav-cc committed
restore kvdb to state from airbytehq#35424 (airbytehq#35454)
1 parent 14fecee commit bd416e2

11 files changed: +1524 -0 lines changed

@@ -0,0 +1,118 @@
# Kvdb Destination

This is the repository for the [Kvdb](https://kvdb.io) destination connector, written in Python. It is intended as an example of how to write a Python destination. KvDB is a very simple key-value store, which makes it well suited to illustrating how to write a Python destination connector.

## Local development

### Prerequisites
**To iterate on this connector, make sure to complete this prerequisites section.**

#### Minimum Python version required `= 3.7.0`

#### Build & Activate Virtual Environment and install dependencies
From this connector directory, create a virtual environment:
```
python -m venv .venv
```

This will generate a virtualenv for this module in `.venv/`. Make sure this venv is active in your
development environment of choice. To activate it from the terminal, run:
```
source .venv/bin/activate
pip install -r requirements.txt
```
If you are in an IDE, follow your IDE's instructions to activate the virtualenv.

Note that while we are installing dependencies from `requirements.txt`, you should only edit `setup.py` for your dependencies. `requirements.txt` is
used for editable installs (`pip install -e`) to pull in Python dependencies from the monorepo and will call `setup.py`.
If this is mumbo jumbo to you, don't worry about it, just put your deps in `setup.py` but install using `pip install -r requirements.txt` and everything
should work as you expect.

#### Building via Gradle
From the Airbyte repository root, run:
```
./gradlew :airbyte-integrations:connectors:destination-kvdb:build
```

#### Create credentials
**If you are a community contributor**, generate the necessary credentials from [Kvdb](https://kvdb.io/docs/api/), and then create a file `secrets/config.json` conforming to the `destination_kvdb/spec.json` file.
Note that the `secrets` directory is gitignored by default, so there is no danger of accidentally checking in sensitive information.
See `integration_tests/sample_config.json` for a sample config file.

**If you are an Airbyte core member**, copy the credentials in LastPass under the secret name `destination kvdb test creds`
and place them into `secrets/config.json`.

### Locally running the connector
```
python main.py spec
python main.py check --config secrets/config.json
# messages.jsonl is a file containing line-separated JSON representing AirbyteMessages
cat messages.jsonl | python main.py write --config secrets/config.json --catalog integration_tests/configured_catalog.json
```

### Locally running the connector docker image

#### Build
**Via [`airbyte-ci`](https://github.com/airbytehq/airbyte/blob/master/airbyte-ci/connectors/pipelines/README.md) (recommended):**
```bash
airbyte-ci connectors --name=destination-kvdb build
```

An image will be built with the tag `airbyte/destination-kvdb:dev`.

**Via `docker build`:**
```bash
docker build -t airbyte/destination-kvdb:dev .
```
#### Run
Then run any of the connector commands as follows:
```
docker run --rm airbyte/destination-kvdb:dev spec
docker run --rm -v $(pwd)/secrets:/secrets airbyte/destination-kvdb:dev check --config /secrets/config.json
# messages.jsonl is a file containing line-separated JSON representing AirbyteMessages
cat messages.jsonl | docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/destination-kvdb:dev write --config /secrets/config.json --catalog /integration_tests/configured_catalog.json
```
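
A `messages.jsonl` file for the `write` command could be produced with a short script along the following lines. This is only a sketch: the stream name `users`, the sample rows, and the state payload are made-up placeholders, and the message shapes are assumed to follow the Airbyte protocol's `RECORD` and `STATE` message types.

```python
import json
import sys
import time


def build_messages(stream: str, rows: list) -> list:
    """Return one RECORD message per row, followed by a single STATE message."""
    now_ms = int(time.time() * 1000)
    messages = [
        {"type": "RECORD", "record": {"stream": stream, "data": row, "emitted_at": now_ms}}
        for row in rows
    ]
    # A trailing STATE message gives the destination a point at which to checkpoint.
    messages.append({"type": "STATE", "state": {"data": {"rows_sent": len(rows)}}})
    return messages


def to_jsonl(messages: list) -> str:
    """Serialize messages as line-separated JSON, one message per line."""
    return "\n".join(json.dumps(m) for m in messages) + "\n"


if __name__ == "__main__":
    # "users" and these rows are placeholder data, not part of the connector.
    sample = build_messages("users", [{"id": 1, "name": "Ada"}, {"id": 2, "name": "Grace"}])
    sys.stdout.write(to_jsonl(sample))
```

Saved under a hypothetical name such as `make_messages.py`, the script can be redirected into the file the commands above expect: `python make_messages.py > messages.jsonl`.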
## Testing
Make sure to familiarize yourself with [pytest test discovery](https://docs.pytest.org/en/latest/goodpractices.html#test-discovery) to know how your test files and methods should be named.
First install test dependencies into your virtual environment:
```
pip install .[tests]
```
### Unit Tests
To run unit tests locally, from the connector directory run:
```
python -m pytest unit_tests
```

### Integration Tests
There are two types of integration tests: Acceptance Tests (Airbyte's test suite for all destination connectors) and custom integration tests (which are specific to this connector).
#### Custom Integration tests
Place custom tests inside the `integration_tests/` folder, then, from the connector root, run
```
python -m pytest integration_tests
```
#### Acceptance Tests
You can run our full test suite locally using [`airbyte-ci`](https://github.com/airbytehq/airbyte/blob/master/airbyte-ci/connectors/pipelines/README.md):
```bash
airbyte-ci connectors --name=destination-kvdb test
```


## Dependency Management
All of your dependencies should go in `setup.py`, NOT `requirements.txt`. The requirements file is only used to connect internal Airbyte dependencies in the monorepo for local development.
We split dependencies into two groups:
* dependencies required for your connector to work go in the `MAIN_REQUIREMENTS` list.
* dependencies required only for testing go in the `TEST_REQUIREMENTS` list.

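As a rough illustration of that split, the two lists might feed into `setup.py` roughly as shown here. The package names are placeholders chosen for the example, and `build_setup_kwargs` is a hypothetical helper rather than something defined in this connector; `install_requires` and `extras_require` are the standard setuptools arguments.

```python
# Placeholder dependency lists; the real connector may pin different packages.
MAIN_REQUIREMENTS = ["airbyte-cdk", "requests"]
TEST_REQUIREMENTS = ["pytest"]


def build_setup_kwargs() -> dict:
    """Assemble the keyword arguments that would be passed to setuptools.setup()."""
    return {
        "name": "destination_kvdb",
        "install_requires": MAIN_REQUIREMENTS,
        # The "tests" extra is what `pip install .[tests]` installs.
        "extras_require": {"tests": TEST_REQUIREMENTS},
    }


if __name__ == "__main__":
    print(build_setup_kwargs())
```

In an actual `setup.py` these arguments would be handed to `setuptools.setup(**build_setup_kwargs())`; the helper exists here only so the sketch can run without packaging anything.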

### Publishing a new version of the connector
You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what?
1. Make sure your changes are passing our test suite: `airbyte-ci connectors --name=destination-kvdb test`
2. Bump the connector version in `metadata.yaml`: increment the `dockerImageTag` value. Please follow [semantic versioning for connectors](https://docs.airbyte.com/contributing-to-airbyte/resources/pull-requests-handbook/#semantic-versioning-for-connectors).
3. Make sure the `metadata.yaml` content is up to date.
4. Make sure the connector documentation and its changelog are up to date (`docs/integrations/destinations/kvdb.md`).
5. Create a Pull Request: use [our PR naming conventions](https://docs.airbyte.com/contributing-to-airbyte/resources/pull-requests-handbook/#pull-request-title-convention).
6. Pat yourself on the back for being an awesome contributor.
7. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master.

@@ -0,0 +1,26 @@
# MIT License
#
# Copyright (c) 2020 Airbyte
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.


from .destination import DestinationKvdb

__all__ = ["DestinationKvdb"]
@@ -0,0 +1,78 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from typing import Any, Iterable, List, Mapping, Optional, Tuple, Union

import requests


class KvDbClient:
    base_url = "https://kvdb.io"
    PAGE_SIZE = 1000

    def __init__(self, bucket_id: str, secret_key: Optional[str] = None):
        self.secret_key = secret_key
        self.bucket_id = bucket_id

    def write(self, key: str, value: Mapping[str, Any]):
        return self.batch_write([(key, value)])

    def batch_write(self, keys_and_values: List[Tuple[str, Mapping[str, Any]]]):
        """
        https://kvdb.io/docs/api/#execute-transaction
        """
        request_body = {"txn": [{"set": key, "value": value} for key, value in keys_and_values]}
        return self._request("POST", json=request_body)

    def list_keys(self, list_values: bool = False, prefix: Optional[str] = None) -> Iterable[Union[str, List]]:
        """
        https://kvdb.io/docs/api/#list-keys
        """
        # TODO handle rate limiting
        pagination_complete = False
        offset = 0

        while not pagination_complete:
            response = self._request(
                "GET",
                params={
                    "limit": self.PAGE_SIZE,
                    "skip": offset,
                    "format": "json",
                    "prefix": prefix or "",
                    "values": "true" if list_values else "false",
                },
                endpoint="/",  # the "list" endpoint doesn't work without adding a trailing slash to the URL
            )

            response_json = response.json()
            yield from response_json

            # A page shorter than PAGE_SIZE means there is nothing left to fetch
            pagination_complete = len(response_json) < self.PAGE_SIZE
            offset += self.PAGE_SIZE

    def delete(self, key: Union[str, List[str]]):
        """
        https://kvdb.io/docs/api/#execute-transaction
        """
        key_list = key if isinstance(key, list) else [key]
        request_body = {"txn": [{"delete": k} for k in key_list]}
        return self._request("POST", json=request_body)

    def _get_base_url(self) -> str:
        return f"{self.base_url}/{self.bucket_id}"

    def _get_auth_headers(self) -> Mapping[str, Any]:
        return {"Authorization": f"Bearer {self.secret_key}"} if self.secret_key else {}

    def _request(
        self, http_method: str, endpoint: Optional[str] = None, params: Optional[Mapping[str, Any]] = None, json: Optional[Mapping[str, Any]] = None
    ) -> requests.Response:
        url = self._get_base_url() + (endpoint or "")
        headers = {"Accept": "application/json", **self._get_auth_headers()}

        response = requests.request(method=http_method, params=params, url=url, headers=headers, json=json)

        response.raise_for_status()
        return response
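
The limit/skip pagination used by `list_keys` can be tried out without any network access. The sketch below isolates the loop's stopping rule (stop once a page comes back shorter than the page size); `paginate` and `make_fake_fetcher` are hypothetical names, and the in-memory fetcher merely stands in for the HTTP call.

```python
from typing import Callable, Iterator, List


def paginate(fetch_page: Callable[[int, int], List[str]], page_size: int) -> Iterator[str]:
    """Yield items page by page until a page comes back shorter than page_size."""
    offset = 0
    while True:
        page = fetch_page(page_size, offset)
        yield from page
        if len(page) < page_size:
            break
        offset += page_size


def make_fake_fetcher(keys: List[str]) -> Callable[[int, int], List[str]]:
    """Hypothetical stand-in for the HTTP request: slices an in-memory list."""

    def fetch_page(limit: int, skip: int) -> List[str]:
        return keys[skip : skip + limit]

    return fetch_page


if __name__ == "__main__":
    keys = [f"users__ab__{i}" for i in range(5)]
    print(list(paginate(make_fake_fetcher(keys), page_size=2)))
```

One consequence of this stopping rule is that when the number of keys is an exact multiple of the page size, one extra request is made and returns an empty page before the loop ends.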
@@ -0,0 +1,72 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


import time
import traceback
import uuid
from typing import Any, Iterable, Mapping

from airbyte_cdk import AirbyteLogger
from airbyte_cdk.destinations import Destination
from airbyte_cdk.models import AirbyteConnectionStatus, AirbyteMessage, ConfiguredAirbyteCatalog, DestinationSyncMode, Status, Type
from destination_kvdb.client import KvDbClient
from destination_kvdb.writer import KvDbWriter


class DestinationKvdb(Destination):
    def write(
        self, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog, input_messages: Iterable[AirbyteMessage]
    ) -> Iterable[AirbyteMessage]:

        """
        Reads the input stream of messages, config, and catalog to write data to the destination.

        This method returns an iterable (typically a generator of AirbyteMessages via yield) containing state messages received
        in the input message stream. Outputting a state message means that every AirbyteRecordMessage which came before it has been
        successfully persisted to the destination. This ensures fault tolerance: if a sync fails before fully completing,
        the source is given the last state message output from this method as the starting point of the next sync.
        """
        writer = KvDbWriter(KvDbClient(**config))

        for configured_stream in configured_catalog.streams:
            if configured_stream.destination_sync_mode == DestinationSyncMode.overwrite:
                writer.delete_stream_entries(configured_stream.stream.name)

        for message in input_messages:
            if message.type == Type.STATE:
                # Emitting a state message indicates that all records which came before it have been written to the destination. So we flush
                # the queue to ensure writes happen, then output the state message to indicate it's safe to checkpoint state
                writer.flush()
                yield message
            elif message.type == Type.RECORD:
                record = message.record
                writer.queue_write_operation(
                    record.stream, record.data, time.time_ns() / 1_000_000
                )  # convert from nanoseconds to milliseconds
            else:
                # ignore other message types for now
                continue

        # Make sure to flush any records still in the queue
        writer.flush()

    def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
        """
        Tests if the input configuration can be used to successfully connect to the destination with the needed permissions
        e.g: if a provided API token or password can be used to connect and write to the destination.
        """
        try:
            # Verify write access by attempting to write and then delete a random key
            client = KvDbClient(**config)
            random_key = str(uuid.uuid4())
            client.write(random_key, {"value": "_airbyte_connection_check"})
            client.delete(random_key)
        except Exception as e:
            traceback.print_exc()
            return AirbyteConnectionStatus(
                status=Status.FAILED, message=f"An exception occurred: {e}. \nStacktrace: \n{traceback.format_exc()}"
            )
        else:
            return AirbyteConnectionStatus(status=Status.SUCCEEDED)
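
The flush-before-state rule in `write` is easier to see with the network and the CDK types taken out. The following sketch uses plain dictionaries instead of `AirbyteMessage` objects and a hypothetical `RecordingWriter` in place of `KvDbWriter`; it illustrates the checkpointing idea only and is not the connector's code.

```python
from typing import Dict, Iterable, Iterator, List, Tuple


class RecordingWriter:
    """Hypothetical in-memory writer that remembers what each flush persisted."""

    def __init__(self) -> None:
        self.buffer: List[Tuple[str, Dict]] = []
        self.persisted: List[Tuple[str, Dict]] = []

    def queue(self, stream: str, data: Dict) -> None:
        self.buffer.append((stream, data))

    def flush(self) -> None:
        self.persisted.extend(self.buffer)
        self.buffer.clear()


def write_loop(writer: RecordingWriter, messages: Iterable[Dict]) -> Iterator[Dict]:
    """Flush before echoing each STATE message, then flush once more at the end."""
    for message in messages:
        if message["type"] == "STATE":
            writer.flush()
            yield message
        elif message["type"] == "RECORD":
            writer.queue(message["stream"], message["data"])
    writer.flush()


if __name__ == "__main__":
    writer = RecordingWriter()
    incoming = [
        {"type": "RECORD", "stream": "users", "data": {"id": 1}},
        {"type": "STATE", "data": {"cursor": 1}},
        {"type": "RECORD", "stream": "users", "data": {"id": 2}},
    ]
    for state in write_loop(writer, incoming):
        # When a state is emitted, every earlier record has already been persisted.
        print(state, len(writer.persisted))
    print(len(writer.persisted))
```

Because `write_loop` is a generator, the final flush only runs once the caller has consumed it to the end, which is also true of the `write` method above.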
@@ -0,0 +1,26 @@
{
  "documentationUrl": "https://kvdb.io/docs/api/",
  "supported_destination_sync_modes": ["overwrite", "append"],
  "supportsIncremental": true,
  "connectionSpecification": {
    "$schema": "http://json-schema.org/draft-07/schema#",
    "title": "Destination KVdb",
    "type": "object",
    "required": ["bucket_id", "secret_key"],
    "additionalProperties": false,
    "properties": {
      "bucket_id": {
        "title": "Bucket ID",
        "type": "string",
        "description": "The ID of your KVdb bucket.",
        "order": 1
      },
      "secret_key": {
        "title": "Secret Key",
        "type": "string",
        "description": "Your bucket Secret Key.",
        "order": 2
      }
    }
  }
}
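
Before running `check`, a `secrets/config.json` can be sanity-checked against the constraints this spec expresses (two required string fields and no additional properties). The helper below is a hand-rolled sketch with a hypothetical name, `config_problems`; it is not a full JSON Schema validator and is not part of the connector.

```python
from typing import Any, List, Mapping

# Field names taken from the spec; both are required and both are strings.
REQUIRED_FIELDS = ("bucket_id", "secret_key")


def config_problems(config: Mapping[str, Any]) -> List[str]:
    """Return human-readable problems; an empty list means the config looks valid."""
    problems = []
    for field in REQUIRED_FIELDS:
        if field not in config:
            problems.append(f"missing required field: {field}")
        elif not isinstance(config[field], str):
            problems.append(f"{field} must be a string")
    # "additionalProperties": false means unknown keys are rejected.
    for key in config:
        if key not in REQUIRED_FIELDS:
            problems.append(f"unexpected field: {key}")
    return problems


if __name__ == "__main__":
    # Placeholder values, not real credentials.
    print(config_problems({"bucket_id": "my-bucket", "secret_key": "my-secret"}))
    print(config_problems({"bucket_id": "my-bucket", "region": "eu"}))
```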
@@ -0,0 +1,46 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from collections.abc import Mapping

from destination_kvdb.client import KvDbClient


class KvDbWriter:
    """
    Data is written to KvDB in the following format:
        key: stream_name__ab__<record_extraction_timestamp>
        value: a JSON object representing the record's data

    This is because unless a data source explicitly designates a primary key, we don't know what to key the record on.
    Since KvDB allows reading records with certain prefixes, we treat it more like a message queue, expecting the reader to
    read messages with a particular prefix e.g: name__ab__123, where 123 is the timestamp they last read data from.
    """

    flush_interval = 1000

    def __init__(self, client: KvDbClient):
        self.client = client
        # Keep the buffer per instance; a class-level list would be shared by every writer
        self.write_buffer = []

    def delete_stream_entries(self, stream_name: str):
        """Deletes all the records belonging to the input stream"""
        keys_to_delete = []
        for key in self.client.list_keys(prefix=f"{stream_name}__ab__"):
            keys_to_delete.append(key)
            if len(keys_to_delete) == self.flush_interval:
                self.client.delete(keys_to_delete)
                keys_to_delete.clear()
        if len(keys_to_delete) > 0:
            self.client.delete(keys_to_delete)

    def queue_write_operation(self, stream_name: str, record: Mapping, written_at: float):
        kv_pair = (f"{stream_name}__ab__{written_at}", record)
        self.write_buffer.append(kv_pair)
        if len(self.write_buffer) == self.flush_interval:
            self.flush()

    def flush(self):
        self.client.batch_write(self.write_buffer)
        self.write_buffer.clear()
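
The key format described in the `KvDbWriter` docstring can be illustrated with a plain dictionary standing in for the bucket. This is a sketch under that assumption; `make_key` and `keys_for_stream` are hypothetical helpers, not functions from the connector.

```python
from typing import Dict, List

SEPARATOR = "__ab__"


def make_key(stream_name: str, written_at: float) -> str:
    """Build a key in the stream_name__ab__<timestamp> format used by the writer."""
    return f"{stream_name}{SEPARATOR}{written_at}"


def keys_for_stream(store: Dict[str, dict], stream_name: str) -> List[str]:
    """Return the keys of one stream, selected by prefix and sorted lexicographically."""
    prefix = f"{stream_name}{SEPARATOR}"
    return sorted(k for k in store if k.startswith(prefix))


if __name__ == "__main__":
    # An in-memory dict stands in for the KVdb bucket.
    store = {
        make_key("users", 1): {"id": 1},
        make_key("users", 2): {"id": 2},
        make_key("orders", 1): {"id": 9},
    }
    print(keys_for_stream(store, "users"))
    print(keys_for_stream(store, "orders"))
```

Including the separator in the prefix keeps streams whose names share a beginning apart: a lookup for `user` does not match keys written for `users`.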
@@ -0,0 +1,11 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


import sys

from destination_kvdb import DestinationKvdb

if __name__ == "__main__":
    DestinationKvdb().run(sys.argv[1:])
