Skip to content

Commit 5ca02ec

Browse files
Rachel Chen authored and committed
comments and ready
1 parent b903a77 commit 5ca02ec

File tree

11 files changed

+116
-42
lines changed

11 files changed

+116
-42
lines changed

snuba/datasets/entities/storage_selectors/eap_items.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ def select_storage(
2020

2121
tier = query_settings.get_sampling_tier()
2222

23-
if tier == Tier.TIER_1:
23+
if tier == Tier.TIER_1 or tier == Tier.TIER_NO_TIER:
2424
storage_key = StorageKey.EAP_ITEMS
2525
else:
2626
storage_key = getattr(StorageKey, f"EAP_ITEMS_DOWNSAMPLE_{tier.value}")

snuba/downsampled_storage_tiers.py

+1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33

44
class Tier(Enum):
5+
TIER_NO_TIER = -1
56
TIER_1 = 1
67
TIER_8 = 8
78
TIER_64 = 64

snuba/query/query_settings.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ def __init__(
8787
apply_default_subscriptable_mapping: bool = True,
8888
) -> None:
8989
super().__init__()
90-
self.__tier = Tier.TIER_1
90+
self.__tier = Tier.TIER_NO_TIER
9191
self.__turbo = turbo
9292
self.__consistent = consistent
9393
self.__debug = debug

snuba/web/db_query.py

+6
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,9 @@
6767
SerializableExceptionDict,
6868
)
6969
from snuba.web import QueryException, QueryResult, constants
70+
from snuba.web.rpc.v1.resolvers.R_eap_spans.common.sampling_in_storage_util import (
71+
add_sampling_tier_to_query_stats,
72+
)
7073

7174
metrics = MetricsWrapper(environment.metrics, "db_query")
7275

@@ -732,6 +735,9 @@ def db_query(
732735
metrics.increment("cache_miss", tags={"dataset": dataset_name})
733736
if stats.get("cache_hit_simple"):
734737
metrics.increment("cache_hit_simple", tags={"dataset": dataset_name})
738+
739+
add_sampling_tier_to_query_stats(result, query_settings)
740+
735741
if result:
736742
return result
737743
raise error or Exception(

snuba/web/rpc/common/debug_info.py

+24-14
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,29 @@
1515
from snuba.web import QueryResult
1616

1717

18+
def _construct_meta_if_downsampled(
19+
query_results: List[QueryResult],
20+
) -> DownsampledStorageMeta | None:
21+
highest_sampling_tier = Tier.TIER_NO_TIER
22+
23+
for query_result in query_results:
24+
sampling_tier = query_result.extra.get("stats", {}).get("sampling_tier")
25+
if sampling_tier:
26+
if sampling_tier.value > highest_sampling_tier.value:
27+
highest_sampling_tier = sampling_tier
28+
29+
return (
30+
DownsampledStorageMeta(
31+
tier=getattr(
32+
DownsampledStorageMeta.SelectedTier,
33+
"SELECTED_" + highest_sampling_tier.name,
34+
),
35+
)
36+
if highest_sampling_tier != Tier.TIER_NO_TIER
37+
else None
38+
)
39+
40+
1841
def extract_response_meta(
1942
request_id: str,
2043
debug: bool,
@@ -24,20 +47,7 @@ def extract_response_meta(
2447
) -> ResponseMeta:
2548
query_info: List[QueryInfo] = []
2649

27-
downsampled_storage_meta = None
28-
29-
if extract_sampling_tier:
30-
assert (
31-
len(query_results) == 1
32-
), "we can only extract 1 result at a time for EndpointTimeSeres and EndpointTraceItemTable"
33-
sampling_tier = (
34-
query_results[0].extra.get("stats", {}).get("sampling_tier", Tier.TIER_1)
35-
)
36-
downsampled_storage_meta = DownsampledStorageMeta(
37-
tier=getattr(
38-
DownsampledStorageMeta.SelectedTier, "SELECTED_" + sampling_tier.name
39-
),
40-
)
50+
downsampled_storage_meta = _construct_meta_if_downsampled(query_results)
4151

4252
if not debug:
4353
return (

snuba/web/rpc/v1/resolvers/R_eap_spans/common/sampling_in_storage_util.py

+4-3
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ def construct_query_settings(
2727
def add_sampling_tier_to_query_stats(
2828
query_result: QueryResult, query_settings: HTTPQuerySettings
2929
) -> None:
30-
stats = dict(query_result.extra["stats"])
31-
stats["sampling_tier"] = query_settings.get_sampling_tier()
32-
query_result.extra["stats"] = stats
30+
if query_settings.get_sampling_tier() != Tier.TIER_NO_TIER:
31+
stats = dict(query_result.extra["stats"])
32+
stats["sampling_tier"] = query_settings.get_sampling_tier()
33+
query_result.extra["stats"] = stats

snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_time_series.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@
5555
use_eap_items_table,
5656
)
5757
from snuba.web.rpc.v1.resolvers.R_eap_spans.common.sampling_in_storage_util import (
58-
add_sampling_tier_to_query_stats,
5958
construct_query_settings,
6059
)
6160

@@ -148,6 +147,7 @@ def _convert_result_timeseries(
148147
]
149148

150149
# this loop fill in our pre-computed dictionaries so that we can zerofill later
150+
print("datainresultconverstion", data)
151151
for row in data:
152152
group_by_map = {}
153153

@@ -399,7 +399,7 @@ def resolve(self, in_msg: TimeSeriesRequest) -> TimeSeriesResponse:
399399
request=snuba_request,
400400
timer=self._timer,
401401
)
402-
add_sampling_tier_to_query_stats(res, query_settings)
402+
# add_sampling_tier_to_query_stats(res, query_settings)
403403
response_meta = extract_response_meta(
404404
in_msg.meta.request_id,
405405
in_msg.meta.debug,

snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_trace_item_table.py

+1-2
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@
5656
use_eap_items_table,
5757
)
5858
from snuba.web.rpc.v1.resolvers.R_eap_spans.common.sampling_in_storage_util import (
59-
add_sampling_tier_to_query_stats,
6059
construct_query_settings,
6160
)
6261

@@ -370,7 +369,7 @@ def resolve(self, in_msg: TraceItemTableRequest) -> TraceItemTableResponse:
370369
request=snuba_request,
371370
timer=self._timer,
372371
)
373-
add_sampling_tier_to_query_stats(res, query_settings)
372+
# add_sampling_tier_to_query_stats(res, query_settings)
374373
column_values = convert_results(in_msg, res.result.get("data", []))
375374
response_meta = extract_response_meta(
376375
in_msg.meta.request_id,

tests/conftest.py

+8-3
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
)
1414
from snuba.core.initialize import initialize_snuba
1515
from snuba.datasets.factory import reset_dataset_factory
16-
from snuba.datasets.schemas.tables import WritableTableSchema
16+
from snuba.datasets.schemas.tables import TableSchema
1717
from snuba.datasets.storages.factory import get_all_storage_keys, get_storage
1818
from snuba.environment import setup_sentry
1919
from snuba.redis import all_redis_clients
@@ -189,15 +189,20 @@ def _clear_db() -> None:
189189
database = cluster.get_database()
190190

191191
schema = storage.get_schema()
192-
if isinstance(schema, WritableTableSchema):
192+
if isinstance(schema, TableSchema):
193193
table_name = schema.get_local_table_name()
194194

195195
nodes = [*cluster.get_local_nodes(), *cluster.get_distributed_nodes()]
196196
for node in nodes:
197197
connection = cluster.get_node_connection(
198198
ClickhouseClientSettings.MIGRATE, node
199199
)
200-
connection.execute(f"TRUNCATE TABLE IF EXISTS {database}.{table_name}")
200+
try:
201+
connection.execute(
202+
f"TRUNCATE TABLE IF EXISTS {database}.{table_name}"
203+
)
204+
except Exception:
205+
pass
201206

202207

203208
@pytest.fixture

tests/web/rpc/v1/test_endpoint_time_series/test_endpoint_time_series.py

+34-10
Original file line numberDiff line numberDiff line change
@@ -1293,6 +1293,17 @@ def test_preflight(self) -> None:
12931293
metrics=[DummyMetric("test_preflight_metric", get_value=lambda x: 1)],
12941294
)
12951295

1296+
aggregations = [
1297+
AttributeAggregation(
1298+
aggregate=Function.FUNCTION_SUM,
1299+
key=AttributeKey(
1300+
type=AttributeKey.TYPE_FLOAT, name="test_preflight_metric"
1301+
),
1302+
label="sum",
1303+
extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE,
1304+
),
1305+
]
1306+
12961307
preflight_message = TimeSeriesRequest(
12971308
meta=RequestMeta(
12981309
project_ids=[1, 2, 3],
@@ -1308,22 +1319,35 @@ def test_preflight(self) -> None:
13081319
mode=DownsampledStorageConfig.MODE_PREFLIGHT
13091320
),
13101321
),
1311-
aggregations=[
1312-
AttributeAggregation(
1313-
aggregate=Function.FUNCTION_SUM,
1314-
key=AttributeKey(
1315-
type=AttributeKey.TYPE_FLOAT, name="test_preflight_metric"
1316-
),
1317-
label="sum",
1318-
extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE,
1322+
aggregations=aggregations,
1323+
granularity_secs=granularity_secs,
1324+
)
1325+
1326+
message_to_non_downsampled_tier = TimeSeriesRequest(
1327+
meta=RequestMeta(
1328+
project_ids=[1, 2, 3],
1329+
organization_id=1,
1330+
cogs_category="something",
1331+
referrer="something",
1332+
start_timestamp=Timestamp(seconds=int(BASE_TIME.timestamp())),
1333+
end_timestamp=Timestamp(
1334+
seconds=int(BASE_TIME.timestamp() + query_duration)
13191335
),
1320-
],
1336+
trace_item_type=TraceItemType.TRACE_ITEM_TYPE_SPAN,
1337+
),
1338+
aggregations=aggregations,
13211339
granularity_secs=granularity_secs,
13221340
)
13231341

13241342
preflight_response = EndpointTimeSeries().execute(preflight_message)
1343+
non_downsampled_tier_response = EndpointTimeSeries().execute(
1344+
message_to_non_downsampled_tier
1345+
)
13251346

1326-
assert preflight_response.result_timeseries == []
1347+
assert (
1348+
len(preflight_response.result_timeseries)
1349+
< len(non_downsampled_tier_response.result_timeseries) / 100
1350+
)
13271351
assert (
13281352
preflight_response.meta.downsampled_storage_meta
13291353
== DownsampledStorageMeta(

tests/web/rpc/v1/test_endpoint_trace_item_table/test_endpoint_trace_item_table.py

+34-6
Original file line numberDiff line numberDiff line change
@@ -3114,13 +3114,17 @@ def test_preflight(self, setup_teardown: Any) -> None:
31143114
msg_timestamp,
31153115
tags={"preflighttag": "preflight"},
31163116
)
3117-
for i in range(30)
3117+
for _ in range(30)
31183118
]
31193119
write_raw_unprocessed_events(items_storage, messages) # type: ignore
31203120

31213121
ts = Timestamp(seconds=int(BASE_TIME.timestamp()))
31223122
hour_ago = int((BASE_TIME - timedelta(hours=1)).timestamp())
3123-
message = TraceItemTableRequest(
3123+
3124+
columns = [
3125+
Column(key=AttributeKey(type=AttributeKey.TYPE_STRING, name="preflighttag"))
3126+
]
3127+
preflight_message = TraceItemTableRequest(
31243128
meta=RequestMeta(
31253129
project_ids=[1, 2, 3],
31263130
organization_id=1,
@@ -3134,14 +3138,38 @@ def test_preflight(self, setup_teardown: Any) -> None:
31343138
mode=DownsampledStorageConfig.MODE_PREFLIGHT
31353139
),
31363140
),
3141+
columns=columns,
3142+
)
3143+
3144+
message_to_non_downsampled_tier = TraceItemTableRequest(
3145+
meta=RequestMeta(
3146+
project_ids=[1, 2, 3],
3147+
organization_id=1,
3148+
cogs_category="something",
3149+
referrer="something",
3150+
start_timestamp=Timestamp(seconds=hour_ago),
3151+
end_timestamp=ts,
3152+
request_id="be3123b3-2e5d-4eb9-bb48-f38eaa9e8480",
3153+
trace_item_type=TraceItemType.TRACE_ITEM_TYPE_SPAN,
3154+
),
31373155
columns=[
31383156
Column(
31393157
key=AttributeKey(type=AttributeKey.TYPE_STRING, name="preflighttag")
31403158
)
31413159
],
31423160
)
3143-
response = EndpointTraceItemTable().execute(message)
3144-
assert response.column_values == []
3145-
assert response.meta.downsampled_storage_meta == DownsampledStorageMeta(
3146-
tier=DownsampledStorageMeta.SelectedTier.SELECTED_TIER_512
3161+
3162+
preflight_response = EndpointTraceItemTable().execute(preflight_message)
3163+
non_downsampled_tier_response = EndpointTraceItemTable().execute(
3164+
message_to_non_downsampled_tier
3165+
)
3166+
assert (
3167+
len(preflight_response.column_values)
3168+
< len(non_downsampled_tier_response.column_values) / 100
3169+
)
3170+
assert (
3171+
preflight_response.meta.downsampled_storage_meta
3172+
== DownsampledStorageMeta(
3173+
tier=DownsampledStorageMeta.SelectedTier.SELECTED_TIER_512
3174+
)
31473175
)

0 commit comments

Comments (0)