Skip to content

Commit 296ae41

Browse files
committed
Assignment 5 conclusion
0 parents  commit 296ae41

8 files changed

+225
-0
lines changed

.env.example

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
COIN_API_KEY=
2+
OAUTH_CLIENT_ID=
3+
OAUTH_CLIENT_SECRET=
4+
NOTION_AUTH_URL=""
5+
INTERNAL_ACCESS_TOKEN=

.gitignore

+13
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
#Secret keys
2+
.env
3+
Keys/
4+
5+
# virtual environments
6+
demo1/
7+
.cache
8+
.venv-source-faker
9+
.venv-source-google-drive
10+
.venv-source-notion
11+
12+
# Autogenerated Keys
13+
token.json

authenticate.py

Whitespace-only changes.

main.py

+113
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
import os
2+
import json
3+
import airbyte as ab
4+
from google.auth.transport.requests import Request
5+
from google.oauth2.credentials import Credentials
6+
from google_auth_oauthlib.flow import InstalledAppFlow
7+
from googleapiclient.discovery import build
8+
from googleapiclient.errors import HttpError
9+
from pydantic import BaseModel, Field
10+
from datetime import datetime
11+
import pandas as pd
12+
13+
14+
# If modifying these scopes, delete the file token.json.
15+
SCOPES = ["https://www.googleapis.com/auth/drive"]
16+
17+
18+
def get_credentials():
    """Authenticate the user with Google OAuth and return valid credentials.

    Loads cached credentials from ``token.json`` when present, refreshes them
    if they are expired, and otherwise runs the installed-app OAuth flow
    (which requires ``credentials.json`` in the working directory).

    Returns:
        google.oauth2.credentials.Credentials: valid user credentials.
    """
    creds = None
    # Reuse previously saved credentials if they exist.
    if os.path.exists("token.json"):
        creds = Credentials.from_authorized_user_file("token.json", SCOPES)

    # If credentials are missing or invalid, refresh or request new ones.
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file("credentials.json", SCOPES)
            creds = flow.run_local_server(port=0)
        # Persist the credentials whenever they changed — after a refresh as
        # well as after a brand-new OAuth flow — so token.json stays current.
        with open("token.json", "w") as token:
            token.write(creds.to_json())
    return creds
36+
37+
38+
def load_credentials_from_token():
    """Build an Airbyte OAuth credentials dict from the cached token.json.

    Returns:
        dict: ``auth_type`` plus the ``client_id``, ``client_secret`` and
        ``refresh_token`` values stored by the Google OAuth flow.

    Raises:
        FileNotFoundError: if ``token.json`` has not been created yet.
    """
    if not os.path.exists("token.json"):
        raise FileNotFoundError("token.json not found. Run authentication first.")

    with open("token.json", "r") as fh:
        token_data = json.load(fh)

    # Absent keys deliberately become None, matching dict.get semantics.
    credentials = {"auth_type": "Client"}
    for field in ("client_id", "client_secret", "refresh_token"):
        credentials[field] = token_data.get(field)
    return credentials
52+
53+
54+
def sync_google_drive(
    folder_url="https://drive.google.com/drive/folders/10DrawuhFx85xmr8v8vVB6PPOTVWPu7nI",
):
    """Sync a Google Drive folder through the Airbyte source-google-drive connector.

    Reads OAuth credentials from ``token.json`` (see
    ``load_credentials_from_token``), configures two streams — CSV files and
    unstructured documents — then reads the ``Unstructured_data`` stream and
    prints each cached dataset.

    Args:
        folder_url: URL of the Drive folder to sync. Defaults to the original
            hard-coded folder, so existing callers are unaffected.
    """
    try:
        credentials = load_credentials_from_token()

        source = ab.get_source(
            "source-google-drive",
            install_if_missing=True,
            config={
                "folder_url": folder_url,
                "credentials": credentials,
                "streams": [
                    {
                        "name": "Csv_data",
                        "format": {"filetype": "csv"},
                        "globs": ["**/*.csv"],
                    },
                    {
                        "name": "Unstructured_data",
                        "format": {"filetype": "unstructured"},
                    },
                ],
            },
        )
        # Validate the config and credentials before reading.
        source.check()

        # Only the unstructured-document stream is selected for this sync.
        source.select_streams("Unstructured_data")
        read_result = source.read()

        # Dump each cached stream, both as the raw dataset and as a DataFrame.
        for stream_name, cached_dataset in read_result.items():
            print(f"Stream Name: {stream_name}")
            print(cached_dataset)

            df = cached_dataset.to_pandas()
            print(df.to_string())

    except HttpError as error:
        print(f"An error occurred: {error}")
    except FileNotFoundError as e:
        print(e)
104+
105+
106+
def main():
    """Entry point: authenticate with Google, then run the Airbyte sync."""
    # Step 1: ensure token.json exists and is fresh.
    get_credentials()
    # Step 2: pull the Drive folder contents through Airbyte.
    sync_google_drive()


if __name__ == "__main__":
    main()

notion.py

+19
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
import airbyte as ab
import os
from notion_client import Client

# Resolve the token once through PyAirbyte's secret lookup (environment
# variables, .env files, ...) instead of indexing os.environ directly,
# which raises KeyError when the variable is unset. The same token is then
# shared by the Notion SDK client and the Airbyte source.
token = ab.get_secret("INTERNAL_ACCESS_TOKEN")

# Notion SDK client authenticated with the internal integration token.
notion = Client(auth=token)

# Airbyte Notion source configured for token (internal integration) auth.
source = ab.get_source(
    'source-notion',
    config = {
        "credentials":{
            "auth_type":"token",
            "token" : token
        }
    }
)

# Validate the configuration and credentials.
source.check()

requirement.txt

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
airbyte

test.py

+14
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
import airbyte as ab
from airbyte.caches import BigQueryCache
import pandas

# Connector settings: dataset size and a fixed seed for reproducibility.
faker_settings = {
    "count": 5000,  # Adjust this to get a larger or smaller dataset
    "seed": 10,
}

# Create the synthetic-data source and apply the configuration.
source: ab.Source = ab.get_source("source-faker")
source.set_config(config=faker_settings)

# Verify the config and creds by running `check`:
source.check()

test2.py

+60
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
import airbyte as ab
import pandas as pd
import matplotlib.pyplot as plt

# Configure the Coin API source connector (daily Coinbase spot index data).
source = ab.get_source(
    "source-coin-api",
    config={
        "api_key": ab.get_secret("COIN_API_KEY"),
        "environment": "production",
        "symbol_id": "COINBASE_SPOT_INDEX_USD",
        "period": "1DAY",
        "start_date": "2023-01-01T00:00:00",
    },
    streams="*",
)

# Make sure the configuration and credentials are valid before reading.
source.check()

# Sync the source into the default local cache.
cache = ab.get_default_cache()
result = source.read(cache=cache)

# Load the historical OHLCV stream into a DataFrame.
df = cache["ohlcv_historical_data"].to_pandas()

# Parse timestamps and coerce the price/volume columns to numeric values.
df["time_period_start"] = pd.to_datetime(df["time_period_start"])
value_columns = [
    "price_open",
    "price_high",
    "price_low",
    "price_close",
    "volume_traded",
    "trades_count",
]
df[value_columns] = df[value_columns].apply(pd.to_numeric, errors="coerce")

# Daily movement = close minus open for each period.
df["daily_movement"] = df["price_close"] - df["price_open"]

# Index by timestamp so matplotlib puts dates on the x-axis.
df.set_index("time_period_start", inplace=True)

# Draw the daily-movement line chart.
plt.figure(figsize=(12, 6))
plt.plot(df["daily_movement"], marker="o", linestyle="-")
plt.title("Daily Price Movement")
plt.xlabel("Date")
plt.ylabel("Price Movement")
plt.grid(True)
plt.xticks(rotation=45)  # Rotate date labels for readability.
plt.tight_layout()       # Keep labels from overlapping the figure edge.

# Persist the chart to disk.
plt.savefig("daily_price_movement.png")
print("Plot saved as 'daily_price_movement.png'")

0 commit comments

Comments
 (0)