-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathsearch.py
235 lines (198 loc) · 10.5 KB
/
search.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
import streamlit as st
from src.utils import *
from src.cortex_functions import *
from src.query_result_builder import *
from src.notification import *
import asyncio
import threading
import json
from snowflake.core import Root
# Load the application settings once, at import time.
# NOTE: the path is relative, so it is resolved against the process working
# directory (the app must be launched from the project root).
# NOTE(review): `Path` is not imported explicitly in this file; it is
# presumably re-exported by one of the wildcard imports above - confirm.
config_path = Path("src/settings_config.json")
# JSON is UTF-8 by specification; pass the encoding explicitly instead of
# relying on the platform's locale default (e.g. cp1252 on Windows).
with open(config_path, "r", encoding="utf-8") as f:
    config = json.load(f)
# Filter operators offered in the "Use" screen.
_FILTER_OPERATORS = ["@eq", "@contains", "@gte", "@lte"]


def _split_csv(value):
    """Split a comma-separated string into trimmed, non-empty items.

    Returns an empty list for ``None`` or an empty string, so callers can feed
    the result straight into a Streamlit widget.
    """
    if not value:
        return []
    return [item.strip() for item in value.split(",") if item.strip()]


def _render_create_service(session, warehouse):
    """Render the "Create" form and start background creation of a search service."""
    service_name = st.text_input("Enter Service Name", key="service_name")

    # Row 1: database and schema selection
    db_col, schema_col = st.columns(2)
    with db_col:
        selected_db = st.selectbox("Database", list_databases(session))
    with schema_col:
        selected_schema = st.selectbox("Schema", list_schemas(session, selected_db))

    # Row 2: source table and the column that will be indexed for search
    table_col, column_col = st.columns(2)
    required_columns = []
    with table_col:
        tables = list_tables(session, selected_db, selected_schema)
        selected_table = st.selectbox("Table", tables or [])
    with column_col:
        if selected_table:
            required_columns = list_columns(session, selected_db, selected_schema, selected_table) or []
        selected_column = st.selectbox("Search Column", required_columns)
    # Every column except the search column may be exposed as a filter attribute.
    attributes_cols = [col for col in required_columns if col != selected_column]

    # Row 3: attribute columns and embedding model
    attr_col, model_col = st.columns(2)
    with attr_col:
        selected_attributes = st.multiselect("Attributes", attributes_cols)
    with model_col:
        embedding_model = st.selectbox("Model", config["default_settings"]["embeddings"]["CORTEX_SUPPORTED"])

    if not st.button("Create"):
        return

    # Validate before recording a notification, so an incomplete form does not
    # leave behind an "In-Progress" entry that can only end in failure.
    service_name = (service_name or "").strip()
    if not service_name or not selected_table or not selected_column:
        st.warning("Please provide a service name, a table and a search column before creating the service.")
        return

    details = f"Creating cortex search service in table {selected_table} with the selected columns."
    notification_id = add_notification_entry(session, "Create Cortex Search Service", "In-Progress", details)
    try:
        # The creation itself runs in a background thread; this call only starts it.
        trigger_async_search_process(
            session, selected_db, selected_schema, selected_table, selected_column,
            selected_attributes, service_name, embedding_model, warehouse, notification_id,
        )
        st.success("Cortex search service creation initiated. Check notifications for updates.")
    except Exception as e:
        # Could not even start the background work: mark the notification and log why.
        update_notification_entry(session, notification_id, "Failed")
        add_log_entry(session, "Create Cortex Search Service", str(e))
        st.error(f"Failed to initiate cortex search service creation: {e}")


def _render_use_service(session):
    """Render the "Use" form and run a query against an existing search service."""
    st.subheader("Choose Your Search Service")
    db_col, schema_col = st.columns(2)
    with db_col:
        selected_db = st.selectbox("Database", list_databases(session))
    with schema_col:
        selected_schema = st.selectbox("Schema", list_schemas(session, selected_db))

    service_col, display_col = st.columns(2)
    with service_col:
        cortex_services = list_cortex_services(session, selected_db, selected_schema)
        selected_service = st.selectbox("Service", cortex_services or [])
    if not selected_service:
        # Nothing to describe or query; stop before indexing into an empty result.
        st.info("No Cortex Search service is available in the selected schema.")
        return
    with display_col:
        data = fetch_cortex_service(session, selected_service, selected_db, selected_schema)
        if not data:
            st.warning("Could not load details for the selected Cortex Search service.")
            return
        row = data[0]
        cols = _split_csv(row.columns)
        attributes = _split_csv(row.attribute_columns)
        columns = st.multiselect("Display Columns", cols)

    st.subheader("Create Filter & Limits")
    filter_col, operator_col = st.columns(2)
    with filter_col:
        filter_column = st.selectbox("Filter Columns", attributes)
    with operator_col:
        filter_operator = st.selectbox("Filter Operator", _FILTER_OPERATORS)
    filter_value = st.text_input(f"Enter value for {filter_operator} on {filter_column}")

    if filter_column and filter_operator and filter_value:
        # Every supported operator uses the same {operator: {column: value}} shape.
        # NOTE(review): the value is always sent as a string; range operators
        # (@gte/@lte) on numeric columns may need a numeric value - confirm.
        search_filter = {filter_operator: {filter_column: filter_value}}
        st.write(f"Generated Filter: {search_filter}")
    else:
        search_filter = {}
    limit = st.slider("Limit Results", min_value=1, max_value=10, value=1)

    st.subheader("Choose Your Model")
    type_col, model_col = st.columns(2)
    with type_col:
        model_type = st.selectbox("Model Type", ["Base", "Fine Tuned", "Private Preview"])
    with model_col:
        if model_type == "Base":
            model_options = config["default_settings"]["model"]
        elif model_type == "Private Preview":
            model_options = config["default_settings"]["private_preview_models"]
        else:
            model_options = fetch_fine_tuned_models(session)
        # NOTE(review): the chosen model is not passed to the search call below;
        # the widget is kept so the screen layout is unchanged.
        selected_model = st.selectbox("Model", model_options)

    question = st.text_input("Enter Question", key="question")
    if not st.button("Search"):
        return

    # Validate cheap user input before touching the service.
    if not columns:
        show_toast_message("Please select columns to display.")
        return

    try:
        import pandas as pd

        service = (
            Root(session)
            .databases[selected_db]
            .schemas[selected_schema]
            .cortex_search_services[selected_service.lower()]
        )
        resp = service.search(
            query=question,
            columns=[col.lower() for col in columns],
            filter=search_filter,  # dynamically generated filter ({} means "no filter")
            limit=int(limit),
        )
        results = resp.results
        if isinstance(results, list) and results:
            st.dataframe(pd.DataFrame(results))  # display as a table
        else:
            st.write("No results found.")
    except Exception as e:
        add_log_entry(session, "Generate Search Response", str(e))
        st.error("An error occurred. Please check logs for details.")


def _render_display_service(session):
    """Render the "Display" form and show the data scanned from a search service."""
    db_col, schema_col, service_col = st.columns(3)
    with db_col:
        selected_db = st.selectbox("Database", list_databases(session))
    with schema_col:
        selected_schema = st.selectbox("Schema", list_schemas(session, selected_db))
    with service_col:
        services = list_cortex_services(session, selected_db, selected_schema)
        selected_service = st.selectbox("Service", services or [])
    if st.button("Display"):
        if not selected_service:
            st.warning("No Cortex Search service is available in the selected schema.")
            return
        data = cortex_search_data_scan(session, selected_service)
        st.write(data)


def display_search(session):
    """Render the Cortex Search page.

    The page offers three actions through a single select box:

    * ``Create``  - build a new Cortex Search service from a table column.
    * ``Use``     - query an existing service, with optional filter and limit.
    * ``Display`` - show the data returned by scanning an existing service.

    Args:
        session: Active Snowflake session, forwarded to every helper that
            lists objects or runs queries.
    """
    st.title("Cortex Search")
    st.subheader("Use Your Tables As Context To Search")

    create_or_use = st.selectbox("Select Action", ("Create", "Use", "Display"), key="create_or_use")
    warehouse = config["snowflake"]["warehouse"]

    if create_or_use == "Create":
        _render_create_service(session, warehouse)
    elif create_or_use == "Use":
        _render_use_service(session)
    elif create_or_use == "Display":
        _render_display_service(session)
def trigger_async_search_process(session, database, schema, table, column, attributes, service_name, embedding_model, warehouse, notification_id):
    """
    Start creation of a Cortex Search Service in a background thread.

    The call returns as soon as the thread has been started. The outcome is
    reported by updating the notification entry to "Success" or "Failed";
    failures are additionally written to the application log.

    Args:
        session: Active Snowflake session, passed through to the helper calls.
        database (str): The database containing the table.
        schema (str): The schema containing the table.
        table (str): The table to be used for the Cortex Search Service.
        column (str): The column to be used as the search column.
        attributes (list[str]): Columns to expose as attributes of the service.
        service_name (str): Name of the Cortex Search Service to create.
        embedding_model (str): The embedding model to use for the process.
        warehouse (str): The Snowflake warehouse to use for the process.
        notification_id (int): The ID of the notification entry to update.
    """
    async def async_rag_process():
        # No Streamlit (st.*) calls in here: this coroutine runs on a plain
        # worker thread without a Streamlit script-run context, so UI calls
        # would not reach the page, and an error raised by one after a
        # successful creation would flip the notification to "Failed".
        try:
            # Short delay before the work starts (kept from the original flow)
            await asyncio.sleep(1)
            # Create the Cortex Search Service
            create_cortex_search_service(session, database, schema, table, column, attributes, service_name, embedding_model, warehouse)
            # Update notification status to Success
            update_notification_entry(session, notification_id, "Success")
        except Exception as e:
            # Log the error and update notification status to Failed
            update_notification_entry(session, notification_id, "Failed")
            add_log_entry(session, "Create Cortex Search Service", str(e))
            # Re-raise so the traceback still surfaces through the thread's
            # exception hook.
            raise

    # NOTE(review): the same session object is used from this worker thread and
    # from the Streamlit script thread - confirm the session is safe to share.
    thread = threading.Thread(target=asyncio.run, args=(async_rag_process(),))
    thread.start()