@@ -823,8 +823,11 @@ def _dec_needs_replica(self, ts: TaskState) -> None:
823
823
if self .needs_what [ts ] == 0 :
824
824
del self .needs_what [ts ]
825
825
nbytes = ts .get_nbytes ()
826
- self ._network_occ -= nbytes
827
- self .scheduler ._network_occ_global -= nbytes
826
+ # FIXME: ts.get_nbytes may change if non-deterministic tasks get recomputed, causing drift
827
+ self ._network_occ -= min (nbytes , self ._network_occ )
828
+ self .scheduler ._network_occ_global -= min (
829
+ nbytes , self .scheduler ._network_occ_global
830
+ )
828
831
829
832
def add_replica (self , ts : TaskState ) -> None :
830
833
"""The worker acquired a replica of task"""
@@ -835,8 +838,11 @@ def add_replica(self, ts: TaskState) -> None:
835
838
nbytes = ts .get_nbytes ()
836
839
if ts in self .needs_what :
837
840
del self .needs_what [ts ]
838
- self ._network_occ -= nbytes
839
- self .scheduler ._network_occ_global -= nbytes
841
+ # FIXME: ts.get_nbytes may change if non-deterministic tasks get recomputed, causing drift
842
+ self ._network_occ -= min (nbytes , self ._network_occ )
843
+ self .scheduler ._network_occ_global -= min (
844
+ nbytes , self .scheduler ._network_occ_global
845
+ )
840
846
ts .who_has .add (self )
841
847
self .nbytes += nbytes
842
848
self ._has_what [ts ] = None
@@ -1958,7 +1964,9 @@ def _calc_occupancy(
1958
1964
duration = self ._get_prefix_duration (self .task_prefixes [prefix_name ])
1959
1965
res += duration * count
1960
1966
occ = res + network_occ / self .bandwidth
1961
- assert occ >= 0 , (occ , res , network_occ , self .bandwidth )
1967
+ if self .validate :
1968
+ assert occ >= 0 , (occ , res , network_occ , self .bandwidth )
1969
+ occ = max (occ , 0 )
1962
1970
return occ
1963
1971
1964
1972
#####################
0 commit comments