@@ -101,19 +101,19 @@ def fetch_group_to_event_data(
101
101
102
102
103
103
def get_dcg_group_workflow_detector_data (
104
- workflow_event_dcg_data : dict [str , str ]
105
- ) -> tuple [DataConditionGroupGroups , dict [DataConditionHandler .Type , dict [int , int ]]]:
104
+ workflow_event_dcg_data : dict [str , str ],
105
+ ) -> tuple [DataConditionGroupGroups , dict [DataConditionHandler .Group , dict [int , int ]]]:
106
106
"""
107
107
Parse the data in the buffer hash, which is in the form of {workflow/detector_id}:{group_id}:{dcg_id, ..., dcg_id}:{dcg_type}
108
108
"""
109
109
110
110
dcg_to_groups : DataConditionGroupGroups = defaultdict (set )
111
- trigger_type_to_dcg_model : dict [DataConditionHandler .Type , dict [int , int ]] = defaultdict (dict )
111
+ trigger_group_to_dcg_model : dict [DataConditionHandler .Group , dict [int , int ]] = defaultdict (dict )
112
112
113
113
for workflow_group_dcg , _ in workflow_event_dcg_data .items ():
114
114
data = workflow_group_dcg .split (":" )
115
115
try :
116
- dcg_type = DataConditionHandler .Type (data [3 ])
116
+ dcg_group = DataConditionHandler .Group (data [3 ])
117
117
except ValueError :
118
118
continue
119
119
@@ -123,9 +123,9 @@ def get_dcg_group_workflow_detector_data(
123
123
for dcg_id in dcg_ids :
124
124
dcg_to_groups [dcg_id ].add (group_id )
125
125
126
- trigger_type_to_dcg_model [ dcg_type ][dcg_id ] = int (data [0 ])
126
+ trigger_group_to_dcg_model [ dcg_group ][dcg_id ] = int (data [0 ])
127
127
128
- return dcg_to_groups , trigger_type_to_dcg_model
128
+ return dcg_to_groups , trigger_group_to_dcg_model
129
129
130
130
131
131
def fetch_workflows_envs (
@@ -226,7 +226,7 @@ def get_condition_query_groups(
226
226
227
227
228
228
def get_condition_group_results (
229
- queries_to_groups : dict [UniqueConditionQuery , set [int ]]
229
+ queries_to_groups : dict [UniqueConditionQuery , set [int ]],
230
230
) -> dict [UniqueConditionQuery , dict [int , int ]]:
231
231
condition_group_results = {}
232
232
current_time = timezone .now ()
@@ -386,7 +386,7 @@ def get_group_to_groupevent(
386
386
387
387
def fire_actions_for_groups (
388
388
groups_to_fire : dict [int , set [DataConditionGroup ]],
389
- trigger_type_to_dcg_model : dict [DataConditionHandler .Type , dict [int , int ]],
389
+ trigger_group_to_dcg_model : dict [DataConditionHandler .Group , dict [int , int ]],
390
390
group_to_groupevent : dict [Group , GroupEvent ],
391
391
) -> None :
392
392
for group , group_event in group_to_groupevent .items ():
@@ -396,9 +396,9 @@ def fire_actions_for_groups(
396
396
workflow_triggers : set [DataConditionGroup ] = set ()
397
397
action_filters : set [DataConditionGroup ] = set ()
398
398
for dcg in groups_to_fire [group .id ]:
399
- if dcg .id in trigger_type_to_dcg_model [DataConditionHandler .Type .WORKFLOW_TRIGGER ]:
399
+ if dcg .id in trigger_group_to_dcg_model [DataConditionHandler .Group .WORKFLOW_TRIGGER ]:
400
400
workflow_triggers .add (dcg )
401
- elif dcg .id in trigger_type_to_dcg_model [DataConditionHandler .Type .ACTION_FILTER ]:
401
+ elif dcg .id in trigger_group_to_dcg_model [DataConditionHandler .Group .ACTION_FILTER ]:
402
402
action_filters .add (dcg )
403
403
404
404
# process action filters
@@ -464,11 +464,11 @@ def process_delayed_workflows(
464
464
workflow_event_dcg_data = fetch_group_to_event_data (project_id , Workflow , batch_key )
465
465
466
466
# Get mappings from DataConditionGroups to other info
467
- dcg_to_groups , trigger_type_to_dcg_model = get_dcg_group_workflow_detector_data (
467
+ dcg_to_groups , trigger_group_to_dcg_model = get_dcg_group_workflow_detector_data (
468
468
workflow_event_dcg_data
469
469
)
470
- dcg_to_workflow = trigger_type_to_dcg_model [DataConditionHandler .Type .WORKFLOW_TRIGGER ].copy ()
471
- dcg_to_workflow .update (trigger_type_to_dcg_model [DataConditionHandler .Type .ACTION_FILTER ])
470
+ dcg_to_workflow = trigger_group_to_dcg_model [DataConditionHandler .Group .WORKFLOW_TRIGGER ].copy ()
471
+ dcg_to_workflow .update (trigger_group_to_dcg_model [DataConditionHandler .Group .ACTION_FILTER ])
472
472
473
473
_ , workflows_to_envs = fetch_workflows_envs (list (dcg_to_workflow .values ()))
474
474
data_condition_groups = fetch_data_condition_groups (list (dcg_to_groups .keys ()))
@@ -495,7 +495,7 @@ def process_delayed_workflows(
495
495
dcg_group_to_event_data , list (groups_to_dcgs .keys ()), event_ids , occurrence_ids , project_id
496
496
)
497
497
498
- fire_actions_for_groups (groups_to_dcgs , trigger_type_to_dcg_model , group_to_groupevent )
498
+ fire_actions_for_groups (groups_to_dcgs , trigger_group_to_dcg_model , group_to_groupevent )
499
499
500
500
cleanup_redis_buffer (project_id , workflow_event_dcg_data , batch_key )
501
501
0 commit comments