@@ -13,8 +13,9 @@ use thiserror::Error;
13
13
use tokio:: { sync:: mpsc, time:: timeout} ;
14
14
use tonic:: transport:: Endpoint ;
15
15
use tracing:: debug;
16
+ use turbopath:: AbsoluteSystemPath ;
16
17
17
- use super :: { proto:: turbod_client:: TurbodClient , DaemonClient } ;
18
+ use super :: { proto:: turbod_client:: TurbodClient , DaemonClient , Paths } ;
18
19
use crate :: daemon:: DaemonError ;
19
20
20
21
#[ derive( Error , Debug ) ]
@@ -64,11 +65,23 @@ pub struct DaemonConnector {
64
65
/// Whether the connector is allowed to kill a running daemon (for example,
65
66
/// in the event of a version mismatch).
66
67
pub can_kill_server : bool ,
67
- pub pid_file : turbopath:: AbsoluteSystemPathBuf ,
68
- pub sock_file : turbopath:: AbsoluteSystemPathBuf ,
68
+ pub paths : Paths ,
69
69
}
70
70
71
71
impl DaemonConnector {
72
+ pub fn new (
73
+ can_start_server : bool ,
74
+ can_kill_server : bool ,
75
+ repo_root : & AbsoluteSystemPath ,
76
+ ) -> Self {
77
+ let paths = Paths :: from_repo_root ( repo_root) ;
78
+ Self {
79
+ can_start_server,
80
+ can_kill_server,
81
+ paths,
82
+ }
83
+ }
84
+
72
85
const CONNECT_RETRY_MAX : usize = 3 ;
73
86
const CONNECT_TIMEOUT : Duration = Duration :: from_secs ( 1 ) ;
74
87
const SHUTDOWN_TIMEOUT : Duration = Duration :: from_secs ( 1 ) ;
@@ -93,7 +106,7 @@ impl DaemonConnector {
93
106
let pid = self . get_or_start_daemon ( ) . await ?;
94
107
debug ! ( "got daemon with pid: {}" , pid) ;
95
108
96
- let conn = match self . get_connection ( self . sock_file . clone ( ) ) . await {
109
+ let conn = match self . get_connection ( self . paths . sock_file . clone ( ) ) . await {
97
110
Err ( DaemonConnectorError :: Watcher ( _) ) => continue ,
98
111
Err ( DaemonConnectorError :: Socket ( e) ) => {
99
112
// assume the server is not yet ready
@@ -130,7 +143,7 @@ impl DaemonConnector {
130
143
///
131
144
/// If a daemon is not running, it starts one.
132
145
async fn get_or_start_daemon ( & self ) -> Result < sysinfo:: Pid , DaemonConnectorError > {
133
- debug ! ( "looking for pid in lockfile: {:?}" , self . pid_file) ;
146
+ debug ! ( "looking for pid in lockfile: {:?}" , self . paths . pid_file) ;
134
147
135
148
let pidfile = self . pid_lock ( ) ;
136
149
@@ -225,7 +238,7 @@ impl DaemonConnector {
225
238
226
239
match timeout (
227
240
Self :: SHUTDOWN_TIMEOUT ,
228
- wait_for_file ( & self . pid_file , WaitAction :: Deleted ) ,
241
+ wait_for_file ( & self . paths . pid_file , WaitAction :: Deleted ) ,
229
242
)
230
243
. await ?
231
244
{
@@ -273,19 +286,19 @@ impl DaemonConnector {
273
286
// exists to protect against stale .sock files
274
287
timeout (
275
288
Self :: SOCKET_TIMEOUT ,
276
- wait_for_file ( & self . pid_file , WaitAction :: Exists ) ,
289
+ wait_for_file ( & self . paths . pid_file , WaitAction :: Exists ) ,
277
290
)
278
291
. await ??;
279
292
timeout (
280
293
Self :: SOCKET_TIMEOUT ,
281
- wait_for_file ( & self . sock_file , WaitAction :: Exists ) ,
294
+ wait_for_file ( & self . paths . sock_file , WaitAction :: Exists ) ,
282
295
)
283
296
. await ?
284
297
. map_err ( Into :: into)
285
298
}
286
299
287
300
fn pid_lock ( & self ) -> pidlock:: Pidlock {
288
- pidlock:: Pidlock :: new ( self . pid_file . clone ( ) . into ( ) )
301
+ pidlock:: Pidlock :: new ( self . paths . pid_file . clone ( ) . into ( ) )
289
302
}
290
303
}
291
304
@@ -396,7 +409,7 @@ enum WaitAction {
396
409
397
410
#[ cfg( test) ]
398
411
mod test {
399
- use std:: { assert_matches:: assert_matches, path :: Path } ;
412
+ use std:: assert_matches:: assert_matches;
400
413
401
414
use sysinfo:: Pid ;
402
415
use tokio:: {
@@ -414,28 +427,18 @@ mod test {
414
427
#[ cfg( target_os = "windows" ) ]
415
428
const NODE_EXE : & str = "node.exe" ;
416
429
417
- fn pid_path ( tmp_path : & Path ) -> AbsoluteSystemPathBuf {
418
- AbsoluteSystemPathBuf :: try_from ( tmp_path. join ( "turbod.pid" ) ) . unwrap ( )
419
- }
420
-
421
- fn sock_path ( tmp_path : & Path ) -> AbsoluteSystemPathBuf {
422
- AbsoluteSystemPathBuf :: try_from ( tmp_path. join ( "turbod.sock" ) ) . unwrap ( )
423
- }
424
-
425
430
#[ tokio:: test]
426
431
async fn handles_invalid_pid ( ) {
427
432
let tmp_dir = tempfile:: tempdir ( ) . unwrap ( ) ;
428
- let tmp_path = tmp_dir. path ( ) . to_owned ( ) ;
429
-
430
- let pid = pid_path ( & tmp_path) ;
431
- std:: fs:: write ( & pid, "not a pid" ) . unwrap ( ) ;
432
-
433
- let connector = DaemonConnector {
434
- pid_file : pid,
435
- sock_file : sock_path ( & tmp_path) ,
436
- can_kill_server : false ,
437
- can_start_server : false ,
438
- } ;
433
+ let repo_root = AbsoluteSystemPathBuf :: try_from ( tmp_dir. path ( ) ) . unwrap ( ) ;
434
+
435
+ let connector = DaemonConnector :: new ( false , false , & repo_root) ;
436
+ connector. paths . pid_file . ensure_dir ( ) . unwrap ( ) ;
437
+ connector
438
+ . paths
439
+ . pid_file
440
+ . create_with_contents ( "not a pid" )
441
+ . unwrap ( ) ;
439
442
440
443
assert_matches ! (
441
444
connector. get_or_start_daemon( ) . await ,
@@ -446,17 +449,8 @@ mod test {
446
449
#[ tokio:: test]
447
450
async fn handles_missing_server_connect ( ) {
448
451
let tmp_dir = tempfile:: tempdir ( ) . unwrap ( ) ;
449
- let tmp_path = tmp_dir. path ( ) . to_owned ( ) ;
450
-
451
- let pid = pid_path ( & tmp_path) ;
452
- let sock = sock_path ( & tmp_path) ;
453
-
454
- let connector = DaemonConnector {
455
- pid_file : pid,
456
- sock_file : sock,
457
- can_kill_server : false ,
458
- can_start_server : false ,
459
- } ;
452
+ let repo_root = AbsoluteSystemPathBuf :: try_from ( tmp_dir. path ( ) ) . unwrap ( ) ;
453
+ let connector = DaemonConnector :: new ( false , false , & repo_root) ;
460
454
461
455
assert_matches ! (
462
456
connector. connect( ) . await ,
@@ -467,17 +461,8 @@ mod test {
467
461
#[ tokio:: test]
468
462
async fn handles_kill_dead_server_missing_pid ( ) {
469
463
let tmp_dir = tempfile:: tempdir ( ) . unwrap ( ) ;
470
- let tmp_path = tmp_dir. path ( ) . to_owned ( ) ;
471
-
472
- let pid = pid_path ( & tmp_path) ;
473
- let sock = sock_path ( & tmp_path) ;
474
-
475
- let connector = DaemonConnector {
476
- pid_file : pid,
477
- sock_file : sock,
478
- can_kill_server : false ,
479
- can_start_server : false ,
480
- } ;
464
+ let repo_root = AbsoluteSystemPathBuf :: try_from ( tmp_dir. path ( ) ) . unwrap ( ) ;
465
+ let connector = DaemonConnector :: new ( false , false , & repo_root) ;
481
466
482
467
assert_matches ! (
483
468
connector. kill_dead_server( Pid :: from( usize :: MAX ) ) . await ,
@@ -488,35 +473,34 @@ mod test {
488
473
#[ tokio:: test]
489
474
async fn handles_kill_dead_server_missing_process ( ) {
490
475
let tmp_dir = tempfile:: tempdir ( ) . unwrap ( ) ;
491
- let tmp_path = tmp_dir. path ( ) . to_owned ( ) ;
492
-
493
- let pid = pid_path ( & tmp_path) ;
494
- std:: fs:: write ( & pid, i32:: MAX . to_string ( ) ) . unwrap ( ) ;
495
- let sock = sock_path ( & tmp_path) ;
496
- std:: fs:: write ( & sock, "" ) . unwrap ( ) ;
497
-
498
- let connector = DaemonConnector {
499
- pid_file : pid,
500
- sock_file : sock,
501
- can_kill_server : false ,
502
- can_start_server : false ,
503
- } ;
476
+ let repo_root = AbsoluteSystemPathBuf :: try_from ( tmp_dir. path ( ) ) . unwrap ( ) ;
477
+ let connector = DaemonConnector :: new ( false , false , & repo_root) ;
478
+
479
+ connector. paths . pid_file . ensure_dir ( ) . unwrap ( ) ;
480
+ connector
481
+ . paths
482
+ . pid_file
483
+ . create_with_contents ( i32:: MAX . to_string ( ) )
484
+ . unwrap ( ) ;
485
+ connector. paths . sock_file . ensure_dir ( ) . unwrap ( ) ;
486
+ connector. paths . sock_file . create_with_contents ( "" ) . unwrap ( ) ;
504
487
505
488
assert_matches ! (
506
489
connector. kill_dead_server( Pid :: from( usize :: MAX ) ) . await ,
507
490
Ok ( ( ) )
508
491
) ;
509
492
510
493
assert ! (
511
- !connector. pid_file. exists( ) ,
494
+ !connector. paths . pid_file. exists( ) ,
512
495
"pid file should be cleaned up when getting the owner of a stale pid"
513
496
) ;
514
497
}
515
498
516
499
#[ tokio:: test]
517
500
async fn handles_kill_dead_server_wrong_process ( ) {
518
501
let tmp_dir = tempfile:: tempdir ( ) . unwrap ( ) ;
519
- let tmp_path = tmp_dir. path ( ) . to_owned ( ) ;
502
+ let repo_root = AbsoluteSystemPathBuf :: try_from ( tmp_dir. path ( ) ) . unwrap ( ) ;
503
+ let connector = DaemonConnector :: new ( false , false , & repo_root) ;
520
504
521
505
let proc = tokio:: process:: Command :: new ( NODE_EXE )
522
506
. stdout ( Stdio :: null ( ) )
@@ -526,17 +510,14 @@ mod test {
526
510
. spawn ( )
527
511
. unwrap ( ) ;
528
512
529
- let pid = pid_path ( & tmp_path) ;
530
- std:: fs:: write ( & pid, proc. id ( ) . unwrap ( ) . to_string ( ) ) . unwrap ( ) ;
531
- let sock = sock_path ( & tmp_path) ;
532
- std:: fs:: write ( & sock, "" ) . unwrap ( ) ;
533
-
534
- let connector = DaemonConnector {
535
- pid_file : pid,
536
- sock_file : sock,
537
- can_kill_server : true ,
538
- can_start_server : false ,
539
- } ;
513
+ connector. paths . pid_file . ensure_dir ( ) . unwrap ( ) ;
514
+ connector
515
+ . paths
516
+ . pid_file
517
+ . create_with_contents ( proc. id ( ) . unwrap ( ) . to_string ( ) )
518
+ . unwrap ( ) ;
519
+ connector. paths . sock_file . ensure_dir ( ) . unwrap ( ) ;
520
+ connector. paths . sock_file . create_with_contents ( "" ) . unwrap ( ) ;
540
521
541
522
let kill_pid = Pid :: from ( usize:: MAX ) ;
542
523
let proc_id = Pid :: from ( proc. id ( ) . unwrap ( ) as usize ) ;
@@ -546,13 +527,17 @@ mod test {
546
527
Err ( DaemonConnectorError :: WrongPidProcess ( daemon, running) ) if daemon == kill_pid && running == proc_id
547
528
) ;
548
529
549
- assert ! ( connector. pid_file. exists( ) , "pid file should still exist" ) ;
530
+ assert ! (
531
+ connector. paths. pid_file. exists( ) ,
532
+ "pid file should still exist"
533
+ ) ;
550
534
}
551
535
552
536
#[ tokio:: test]
553
537
async fn handles_kill_dead_server ( ) {
554
538
let tmp_dir = tempfile:: tempdir ( ) . unwrap ( ) ;
555
- let tmp_path = tmp_dir. path ( ) . to_owned ( ) ;
539
+ let repo_root = AbsoluteSystemPathBuf :: try_from ( tmp_dir. path ( ) ) . unwrap ( ) ;
540
+ let connector = DaemonConnector :: new ( false , true , & repo_root) ;
556
541
557
542
let proc = tokio:: process:: Command :: new ( NODE_EXE )
558
543
. stdout ( Stdio :: null ( ) )
@@ -562,17 +547,14 @@ mod test {
562
547
. spawn ( )
563
548
. unwrap ( ) ;
564
549
565
- let pid = pid_path ( & tmp_path) ;
566
- std:: fs:: write ( & pid, proc. id ( ) . unwrap ( ) . to_string ( ) ) . unwrap ( ) ;
567
- let sock = sock_path ( & tmp_path) ;
568
- std:: fs:: write ( & sock, "" ) . unwrap ( ) ;
569
-
570
- let connector = DaemonConnector {
571
- pid_file : pid,
572
- sock_file : sock,
573
- can_kill_server : true ,
574
- can_start_server : false ,
575
- } ;
550
+ connector. paths . pid_file . ensure_dir ( ) . unwrap ( ) ;
551
+ connector
552
+ . paths
553
+ . pid_file
554
+ . create_with_contents ( proc. id ( ) . unwrap ( ) . to_string ( ) )
555
+ . unwrap ( ) ;
556
+ connector. paths . sock_file . ensure_dir ( ) . unwrap ( ) ;
557
+ connector. paths . sock_file . create_with_contents ( "" ) . unwrap ( ) ;
576
558
577
559
assert_matches ! (
578
560
connector
@@ -581,7 +563,10 @@ mod test {
581
563
Ok ( ( ) )
582
564
) ;
583
565
584
- assert ! ( connector. pid_file. exists( ) , "pid file should still exist" ) ;
566
+ assert ! (
567
+ connector. paths. pid_file. exists( ) ,
568
+ "pid file should still exist"
569
+ ) ;
585
570
}
586
571
587
572
struct DummyServer {
@@ -664,25 +649,9 @@ mod test {
664
649
} ) )
665
650
. serve_with_incoming ( stream) ;
666
651
667
- let ( pid_file, sock_file) = if cfg ! ( windows) {
668
- (
669
- AbsoluteSystemPathBuf :: new ( "C:\\ pid" ) . unwrap ( ) ,
670
- AbsoluteSystemPathBuf :: new ( "C:\\ sock" ) . unwrap ( ) ,
671
- )
672
- } else {
673
- (
674
- AbsoluteSystemPathBuf :: new ( "/pid" ) . unwrap ( ) ,
675
- AbsoluteSystemPathBuf :: new ( "/sock" ) . unwrap ( ) ,
676
- )
677
- } ;
678
-
679
- // set up the client
680
- let conn = DaemonConnector {
681
- pid_file,
682
- sock_file,
683
- can_kill_server : false ,
684
- can_start_server : false ,
685
- } ;
652
+ let tmp_dir = tempfile:: tempdir ( ) . unwrap ( ) ;
653
+ let repo_root = AbsoluteSystemPathBuf :: try_from ( tmp_dir. path ( ) ) . unwrap ( ) ;
654
+ let connector = DaemonConnector :: new ( false , false , & repo_root) ;
686
655
687
656
let mut client = Endpoint :: try_from ( "http://[::]:50051" )
688
657
. expect ( "this is a valid uri" )
@@ -716,7 +685,7 @@ mod test {
716
685
assert_matches ! ( hello_resp, DaemonError :: VersionMismatch ( _) ) ;
717
686
let client = DaemonClient :: new ( client) ;
718
687
719
- let shutdown_fut = conn . kill_live_server ( client, Pid :: from ( 1000 ) ) ;
688
+ let shutdown_fut = connector . kill_live_server ( client, Pid :: from ( 1000 ) ) ;
720
689
721
690
// drive the futures to completion
722
691
select ! {
0 commit comments