1
- use std:: { cell :: RefCell , collections:: BTreeMap , sync:: Arc } ;
1
+ use std:: { collections:: BTreeMap , sync:: Arc } ;
2
2
3
3
use async_graphql:: { Enum , SimpleObject } ;
4
4
use serde:: Serialize ;
5
5
use tokio:: sync:: Mutex ;
6
6
7
7
use crate :: {
8
8
tui:: event:: { CacheResult , TaskResult } ,
9
- wui:: event:: WebUIEvent ,
9
+ wui:: { event:: WebUIEvent , server :: SharedState } ,
10
10
} ;
11
11
12
12
/// Subscribes to the Web UI events and updates the state
@@ -22,23 +22,23 @@ impl Subscriber {
22
22
pub async fn watch (
23
23
self ,
24
24
// We use a tokio::sync::Mutex here because we want this future to be Send.
25
- #[ allow( clippy:: type_complexity) ] state : Arc < Mutex < RefCell < WebUIState > > > ,
25
+ #[ allow( clippy:: type_complexity) ] state : SharedState ,
26
26
) {
27
27
let mut rx = self . rx ;
28
28
while let Some ( event) = rx. recv ( ) . await {
29
29
Self :: add_message ( & state, event) . await ;
30
30
}
31
31
}
32
32
33
- async fn add_message ( state : & Arc < Mutex < RefCell < WebUIState > > > , event : WebUIEvent ) {
34
- let state = state. lock ( ) . await ;
33
+ async fn add_message ( state : & Arc < Mutex < WebUIState > > , event : WebUIEvent ) {
34
+ let mut state = state. lock ( ) . await ;
35
35
36
36
match event {
37
37
WebUIEvent :: StartTask {
38
38
task,
39
39
output_logs : _,
40
40
} => {
41
- state. borrow_mut ( ) . tasks . insert (
41
+ state. tasks . insert (
42
42
task,
43
43
TaskState {
44
44
output : Vec :: new ( ) ,
@@ -49,35 +49,27 @@ impl Subscriber {
49
49
) ;
50
50
}
51
51
WebUIEvent :: TaskOutput { task, output } => {
52
- state
53
- . borrow_mut ( )
54
- . tasks
55
- . get_mut ( & task)
56
- . unwrap ( )
57
- . output
58
- . extend ( output) ;
52
+ state. tasks . get_mut ( & task) . unwrap ( ) . output . extend ( output) ;
59
53
}
60
54
WebUIEvent :: EndTask { task, result } => {
61
- state. borrow_mut ( ) . tasks . get_mut ( & task) . unwrap ( ) . status = TaskStatus :: from ( result) ;
55
+ state. tasks . get_mut ( & task) . unwrap ( ) . status = TaskStatus :: from ( result) ;
62
56
}
63
57
WebUIEvent :: CacheStatus {
64
58
task,
65
59
result,
66
60
message,
67
61
} => {
68
- let mut state_ref = state. borrow_mut ( ) ;
69
-
70
62
if result == CacheResult :: Hit {
71
- state_ref . tasks . get_mut ( & task) . unwrap ( ) . status = TaskStatus :: Cached ;
63
+ state . tasks . get_mut ( & task) . unwrap ( ) . status = TaskStatus :: Cached ;
72
64
}
73
- state_ref . tasks . get_mut ( & task) . unwrap ( ) . cache_result = Some ( result) ;
74
- state_ref . tasks . get_mut ( & task) . unwrap ( ) . cache_message = Some ( message) ;
65
+ state . tasks . get_mut ( & task) . unwrap ( ) . cache_result = Some ( result) ;
66
+ state . tasks . get_mut ( & task) . unwrap ( ) . cache_message = Some ( message) ;
75
67
}
76
68
WebUIEvent :: Stop => {
77
69
// TODO: stop watching
78
70
}
79
71
WebUIEvent :: UpdateTasks { tasks } => {
80
- state. borrow_mut ( ) . tasks = tasks
72
+ state. tasks = tasks
81
73
. into_iter ( )
82
74
. map ( |task| {
83
75
(
@@ -93,7 +85,7 @@ impl Subscriber {
93
85
. collect ( ) ;
94
86
}
95
87
WebUIEvent :: RestartTasks { tasks } => {
96
- state. borrow_mut ( ) . tasks = tasks
88
+ state. tasks = tasks
97
89
. into_iter ( )
98
90
. map ( |task| {
99
91
(
@@ -164,7 +156,7 @@ mod test {
164
156
#[ tokio:: test]
165
157
async fn test_web_ui_state ( ) -> Result < ( ) , crate :: Error > {
166
158
let ( tx, rx) = tokio:: sync:: mpsc:: unbounded_channel ( ) ;
167
- let state = Arc :: new ( Mutex :: new ( RefCell :: new ( WebUIState :: default ( ) ) ) ) ;
159
+ let state = Arc :: new ( Mutex :: new ( WebUIState :: default ( ) ) ) ;
168
160
let subscriber = Subscriber :: new ( rx) ;
169
161
170
162
let sender = WebUISender :: new ( tx) ;
@@ -188,7 +180,7 @@ mod test {
188
180
// Run the subscriber blocking
189
181
subscriber. watch ( state. clone ( ) ) . await ;
190
182
191
- let state_handle = state. lock ( ) . await . borrow ( ) . clone ( ) ;
183
+ let state_handle = state. lock ( ) . await . clone ( ) ;
192
184
assert_eq ! ( state_handle. tasks( ) . len( ) , 3 ) ;
193
185
assert_eq ! (
194
186
state_handle. tasks( ) . get( "task2" ) . unwrap( ) . status,
@@ -248,7 +240,7 @@ mod test {
248
240
#[ tokio:: test]
249
241
async fn test_restart_tasks ( ) -> Result < ( ) , crate :: Error > {
250
242
let ( tx, rx) = tokio:: sync:: mpsc:: unbounded_channel ( ) ;
251
- let state = Arc :: new ( Mutex :: new ( RefCell :: new ( WebUIState :: default ( ) ) ) ) ;
243
+ let state = Arc :: new ( Mutex :: new ( WebUIState :: default ( ) ) ) ;
252
244
let subscriber = Subscriber :: new ( rx) ;
253
245
254
246
let sender = WebUISender :: new ( tx) ;
@@ -271,7 +263,7 @@ mod test {
271
263
// Run the subscriber blocking
272
264
subscriber. watch ( state. clone ( ) ) . await ;
273
265
274
- let state_handle = state. lock ( ) . await . borrow ( ) . clone ( ) ;
266
+ let state_handle = state. lock ( ) . await . clone ( ) ;
275
267
assert_eq ! ( state_handle. tasks( ) . len( ) , 1 ) ;
276
268
assert_eq ! (
277
269
state_handle. tasks( ) . get( "task" ) . unwrap( ) . status,
0 commit comments