Skip to content

Commit 93c02d7

Browse files
🎉 New Destination deepset (#48875)
Co-authored-by: Natik Gadzhi <[email protected]>
1 parent dd0a913 commit 93c02d7

25 files changed

+3387
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
# Deepset Destination
2+
3+
This is the repository for the Deepset destination connector, written in Python.
4+
For information about how to use this connector within Airbyte, see [the documentation](https://docs.airbyte.com/integrations/destinations/deepset).
5+
6+
## Local development
7+
8+
### Prerequisites
9+
10+
* Python (`^3.10`)
11+
* Poetry (`^1.8`) - installation instructions [here](https://python-poetry.org/docs/#installation)
12+
13+
14+
15+
### Installing the connector
16+
17+
From this connector directory, run:
18+
```bash
19+
poetry install --with dev
20+
```
21+
22+
23+
#### Create credentials
24+
25+
**If you are a community contributor**, follow the instructions in the [documentation](https://docs.airbyte.com/integrations/destinations/deepset)
26+
to generate the necessary credentials. Then create a file `secrets/config.json` conforming to the `destination_deepset/spec.json` file.
27+
Note that the `secrets` directory is gitignored by default, so there is no danger of accidentally checking in sensitive information.
28+
See `integration_tests/sample_config.json` for a sample config file.
29+
30+
**If you are an Airbyte core member**, copy the credentials in Lastpass under the secret name `destination deepset test creds`
31+
and place them into `secrets/config.json`.
32+
33+
### Locally running the connector
34+
```
35+
poetry run destination-deepset spec
36+
poetry run destination-deepset check --config secrets/config.json
37+
poetry run destination-deepset write --config secrets/config.json --catalog sample_files/configured_catalog.json
38+
```
39+
40+
### Running tests
41+
42+
To run tests locally, from the connector directory run:
43+
44+
```
45+
poetry run pytest tests
46+
```
47+
48+
### Building the docker image
49+
50+
1. Install [`airbyte-ci`](https://github.com/airbytehq/airbyte/blob/master/airbyte-ci/connectors/pipelines/README.md)
51+
2. Run the following command to build the docker image:
52+
```bash
53+
airbyte-ci connectors --name=destination-deepset build
54+
```
55+
56+
An image will be available on your host with the tag `airbyte/destination-deepset:dev`.
57+
58+
### Running as a docker container
59+
60+
Then run any of the connector commands as follows:
61+
```
62+
docker run --rm airbyte/destination-deepset:dev spec
63+
docker run --rm -v $(pwd)/secrets:/secrets airbyte/destination-deepset:dev check --config /secrets/config.json
64+
docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/destination-deepset:dev write --config /secrets/config.json --catalog /integration_tests/configured_catalog.json
65+
```
66+
67+
### Running our CI test suite
68+
69+
You can run our full test suite locally using [`airbyte-ci`](https://github.com/airbytehq/airbyte/blob/master/airbyte-ci/connectors/pipelines/README.md):
70+
71+
```bash
72+
airbyte-ci connectors --name=destination-deepset test
73+
```
74+
75+
### Customizing acceptance Tests
76+
77+
Customize `acceptance-test-config.yml` file to configure acceptance tests. See [Connector Acceptance Tests](https://docs.airbyte.com/connector-development/testing-connectors/connector-acceptance-tests-reference) for more information.
78+
If your connector requires to create or destroy resources for use during acceptance tests create fixtures for it and place them inside integration_tests/acceptance.py.
79+
80+
### Dependency Management
81+
82+
All of your dependencies should be managed via Poetry.
83+
To add a new dependency, run:
84+
85+
```bash
86+
poetry add <package-name>
87+
```
88+
89+
Please commit the changes to `pyproject.toml` and `poetry.lock` files.
90+
91+
## Publishing a new version of the connector
92+
93+
You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what?
94+
1. Make sure your changes are passing our test suite: `airbyte-ci connectors --name=destination-deepset test`
95+
2. Bump the connector version (please follow [semantic versioning for connectors](https://docs.airbyte.com/contributing-to-airbyte/resources/pull-requests-handbook/#semantic-versioning-for-connectors)):
96+
- bump the `dockerImageTag` value in in `metadata.yaml`
97+
- bump the `version` value in `pyproject.toml`
98+
3. Make sure the `metadata.yaml` content is up to date.
99+
4. Make sure the connector documentation and its changelog is up to date (`docs/integrations/destinations/deepset.md`).
100+
5. Create a Pull Request: use [our PR naming conventions](https://docs.airbyte.com/contributing-to-airbyte/resources/pull-requests-handbook/#pull-request-title-convention).
101+
6. Pat yourself on the back for being an awesome contributor.
102+
7. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master.
103+
8. Once your PR is merged, the new version of the connector will be automatically published to Docker Hub and our connector registry.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
connector_image: airbyte/destination-deepset:dev
2+
acceptance_tests:
3+
spec:
4+
tests:
5+
- spec_path: "destination_deepset/spec.json"
6+
connection:
7+
tests:
8+
- config_path: "secrets/config.json"
9+
status: "succeed"
10+
- config_path: "sample_files/invalid_config.json"
11+
status: "failed"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
#
2+
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
#
4+
from __future__ import annotations
5+
6+
from .destination import DestinationDeepset
7+
8+
9+
__all__ = ["DestinationDeepset"]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
2+
3+
from __future__ import annotations
4+
5+
from uuid import UUID
6+
7+
import httpx
8+
from httpx import HTTPError, HTTPStatusError
9+
from tenacity import Retrying, retry_if_exception_type, stop_after_attempt, wait_random_exponential
10+
11+
from destination_deepset import util
12+
from destination_deepset.models import DeepsetCloudConfig, DeepsetCloudFile
13+
14+
15+
class APIError(RuntimeError):
16+
"""Raised when any error occurs while using the API."""
17+
18+
19+
class ConfigurationError(ValueError, APIError):
20+
"""Raised when the configuration is missing or incorrect."""
21+
22+
23+
class FileUploadError(APIError):
24+
"""Raised when the server is unable to successfully upload the file."""
25+
26+
def __str__(self) -> str:
27+
return "File upload failed."
28+
29+
30+
class DeepsetCloudApi:
31+
def __init__(self, config: DeepsetCloudConfig) -> None:
32+
self.config = config
33+
self._client: httpx.Client | None = None
34+
35+
# retry settings in seconds
36+
self.max = 60
37+
self.multiplier = 0.5
38+
39+
@property
40+
def client(self) -> httpx.Client:
41+
if not self.config:
42+
raise ConfigurationError
43+
44+
if self._client is None:
45+
self._client = httpx.Client(
46+
base_url=self.config.base_url.removesuffix("/"),
47+
headers={
48+
"Accept": "application/json",
49+
"Authorization": f"Bearer {self.config.api_key}",
50+
"X-Client-Source": "airbyte-destination-deepset",
51+
},
52+
follow_redirects=True,
53+
)
54+
55+
return self._client
56+
57+
def retry(self) -> Retrying:
58+
"""Retrial configurations
59+
60+
Returns:
61+
Retrying: The instance
62+
"""
63+
return Retrying(
64+
retry=retry_if_exception_type(HTTPError),
65+
stop=stop_after_attempt(self.config.retries),
66+
wait=wait_random_exponential(multiplier=self.multiplier, max=self.max),
67+
reraise=True,
68+
)
69+
70+
def health_check(self) -> None:
71+
"""Check the health of deepset cloud API
72+
73+
Raises:
74+
APIError: Raised when an error is encountered.
75+
"""
76+
try:
77+
for attempt in self.retry():
78+
with attempt:
79+
response = self.client.get("/api/v1/me")
80+
response.raise_for_status()
81+
82+
workspaces = util.get(response.json(), "organization.workspaces", [])
83+
access = next((True for workspace in workspaces if workspace["name"] == self.config.workspace), False)
84+
except Exception as ex:
85+
raise APIError from ex
86+
else:
87+
if access:
88+
return
89+
90+
error = "User does not have access to the selected workspace!"
91+
raise ConfigurationError(error)
92+
93+
def upload(self, file: DeepsetCloudFile, write_mode: str = "KEEP") -> UUID:
94+
"""Upload file to deepset Cloud.
95+
96+
Args:
97+
file (DeepsetCloudFile): The file to upload
98+
write_mode (str, Optional): The write mode. Defaults to `KEEP`.
99+
100+
Raises:
101+
APIError: Raised whenever the file upload fails
102+
103+
Returns:
104+
UUID: The unique identifier of the uploaded file
105+
"""
106+
107+
try:
108+
for attempt in self.retry():
109+
with attempt:
110+
response = self.client.post(
111+
f"/api/v1/workspaces/{self.config.workspace}/files",
112+
files={"file": (file.name, file.content)},
113+
data={"meta": file.meta_as_string},
114+
params={"write_mode": write_mode},
115+
)
116+
response.raise_for_status()
117+
118+
if file_id := response.json().get("file_id"):
119+
return UUID(file_id)
120+
121+
except HTTPStatusError as ex:
122+
status_code, response_text = ex.response.status_code, ex.response.text
123+
message = f"File upload failed: {status_code = }, {response_text = }."
124+
raise FileUploadError(message) from ex
125+
except Exception as ex:
126+
raise FileUploadError from ex
127+
128+
raise FileUploadError
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
#
2+
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
#
4+
from __future__ import annotations
5+
6+
import logging
7+
from collections.abc import Iterable, Mapping
8+
from typing import Any
9+
10+
from airbyte_cdk.destinations import Destination
11+
from airbyte_cdk.models import (
12+
AirbyteConnectionStatus,
13+
AirbyteMessage,
14+
ConfiguredAirbyteCatalog,
15+
DestinationSyncMode,
16+
Status,
17+
Type,
18+
)
19+
from destination_deepset import util
20+
from destination_deepset.models import DeepsetCloudFile
21+
from destination_deepset.writer import DeepsetCloudFileWriter
22+
23+
24+
logger = logging.getLogger("airbyte")
25+
26+
27+
class DestinationDeepset(Destination):
28+
def write(
29+
self,
30+
config: Mapping[str, Any],
31+
configured_catalog: ConfiguredAirbyteCatalog,
32+
input_messages: Iterable[AirbyteMessage],
33+
) -> Iterable[AirbyteMessage]:
34+
"""Reads the input stream of messages, config, and catalog to write data to the destination.
35+
36+
This method returns an iterable (typically a generator of AirbyteMessages via yield) containing state messages
37+
received in the input message stream. Outputting a state message means that every AirbyteRecordMessage which
38+
came before it has been successfully persisted to the destination. This is used to ensure fault tolerance in the
39+
case that a sync fails before fully completing, then the source is given the last state message output from this
40+
method as the starting point of the next sync.
41+
42+
Args:
43+
config (Mapping[str, Any]): dict of JSON configuration matching the configuration declared in spec.json
44+
configured_catalog (ConfiguredAirbyteCatalog): The Configured Catalog describing the schema of the data
45+
being received and how it should be persisted in the destination
46+
input_messages (Iterable[AirbyteMessage]): The stream of input messages received from the source
47+
48+
Returns:
49+
Iterable[AirbyteMessage]: Iterable of AirbyteStateMessages wrapped in AirbyteMessage structs
50+
"""
51+
writer = DeepsetCloudFileWriter.factory(config)
52+
53+
streams: dict[str, DestinationSyncMode] = {
54+
catalog_stream.stream.name: catalog_stream.destination_sync_mode for catalog_stream in configured_catalog.streams
55+
}
56+
57+
for message in input_messages:
58+
if message.type == Type.STATE:
59+
yield message
60+
elif message.type == Type.RECORD:
61+
if (destination_sync_mode := streams.get(message.record.stream)) is None:
62+
logger.debug(f"Stream {message.record.stream} was not present in configured streams, skipping")
63+
continue
64+
65+
try:
66+
file = DeepsetCloudFile.from_record(message.record)
67+
except ValueError as ex:
68+
yield util.get_trace_message("Failed to parse data into deepset cloud file instance.", exception=ex)
69+
else:
70+
yield writer.write(file, destination_sync_mode=destination_sync_mode)
71+
else:
72+
continue
73+
74+
def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
75+
"""Tests if the input configuration can be used to successfully connect to the destination with the needed
76+
permissions e.g: if a provided API token or password can be used to connect and write to the destination.
77+
78+
Args:
79+
logger (logging.Logger): Logging object to display debug/info/error to the logs (logs will not be accessible
80+
via airbyte UI if they are not passed to this logger)
81+
config (Mapping[str, Any]): Json object containing the configuration of this destination, content of this
82+
json is as specified in the properties of the spec.json file
83+
84+
Returns:
85+
AirbyteConnectionStatus: AirbyteConnectionStatus indicating a Success or Failure
86+
"""
87+
try:
88+
writer = DeepsetCloudFileWriter.factory(config)
89+
writer.client.health_check()
90+
except Exception as ex:
91+
message = f"Failed to connect to deepset cloud, reason: {ex!s}"
92+
return AirbyteConnectionStatus(status=Status.FAILED, message=message)
93+
94+
return AirbyteConnectionStatus(status=Status.SUCCEEDED)

0 commit comments

Comments
 (0)