Skip to content

Commit 139f314

Browse files
authored
[Feature] Add support for endBlock in data sources (#4787)
* graph,chain,store/test-store : Allow new param `endBlock` in manifest * core,graph,store: ignore end_block reached datasources in match_and_decode, include them in processed datasources * tests : add runner tests for end-block * core: move TriggerFilter construction into SubgraphRunner.run_inner * core: filter out endBlock reached subgraphs when constructing TriggerFilter * chain,core: refactor endBlock implementation * refactor `SubgraphRunner.run_inner` to extract `build_filter` * core : handle reverts for endBlock * chain,graph: set min_spec_version requirements for endBlock * core: refaction `build_filter` * tests: runner test for endblock on reorg * core: restart block stream in the next block for endblock reached ds * graph: bump specVersion requirement for endBlock * core: refactor build_filter logic * core, tests, graph : make TriggerFilters testable * chain/startknet: endBlock support for starknet * chain,core,graph: refactor end_block implementation * core: refactor build_filter * Add comments for end-block runner tests
1 parent 02d611a commit 139f314

File tree

31 files changed

+529
-102
lines changed

31 files changed

+529
-102
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ lcov.info
2424
/tests/**/generated
2525
/tests/**/node_modules
2626
/tests/**/yarn-error.log
27+
/tests/**/pnpm-lock.yaml
2728

2829
# Built solidity contracts.
2930
/tests/**/bin

chain/arweave/src/adapter.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,11 @@ mod test {
239239
kind: "".into(),
240240
network: None,
241241
name: "".into(),
242-
source: Source { owner, start_block },
242+
source: Source {
243+
owner,
244+
start_block,
245+
end_block: None,
246+
},
243247
mapping: Mapping {
244248
api_version: Version::new(1, 2, 3),
245249
language: "".into(),

chain/arweave/src/data_source.rs

+7-1
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,10 @@ impl blockchain::DataSource<Chain> for DataSource {
6363
kinds
6464
}
6565

66+
fn end_block(&self) -> Option<BlockNumber> {
67+
self.source.end_block
68+
}
69+
6670
fn match_and_decode(
6771
&self,
6872
trigger: &<Chain as Blockchain>::TriggerData,
@@ -392,9 +396,11 @@ pub struct TransactionHandler {
392396
}
393397

394398
#[derive(Clone, Debug, Hash, Eq, PartialEq, Deserialize)]
399+
#[serde(rename_all = "camelCase")]
395400
pub(crate) struct Source {
396401
// A data source that does not have an owner can only have block handlers.
397402
pub(crate) owner: Option<String>,
398-
#[serde(rename = "startBlock", default)]
403+
#[serde(default)]
399404
pub(crate) start_block: BlockNumber,
405+
pub(crate) end_block: Option<BlockNumber>,
400406
}

chain/cosmos/src/data_source.rs

+15-3
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,10 @@ impl blockchain::DataSource<Chain> for DataSource {
8383
kinds
8484
}
8585

86+
fn end_block(&self) -> Option<BlockNumber> {
87+
self.source.end_block
88+
}
89+
8690
fn match_and_decode(
8791
&self,
8892
trigger: &<Chain as Blockchain>::TriggerData,
@@ -502,9 +506,11 @@ pub struct MappingMessageHandler {
502506
}
503507

504508
#[derive(Clone, Debug, Hash, Eq, PartialEq, Deserialize)]
509+
#[serde(rename_all = "camelCase")]
505510
pub struct Source {
506-
#[serde(rename = "startBlock", default)]
511+
#[serde(default)]
507512
pub start_block: BlockNumber,
513+
pub(crate) end_block: Option<BlockNumber>,
508514
}
509515

510516
#[derive(Clone, Copy, Debug, Hash, Eq, PartialEq, Deserialize)]
@@ -657,7 +663,10 @@ mod tests {
657663
kind: "cosmos".to_string(),
658664
network: None,
659665
name: "Test".to_string(),
660-
source: Source { start_block: 1 },
666+
source: Source {
667+
start_block: 1,
668+
end_block: None,
669+
},
661670
mapping: Mapping {
662671
api_version: semver::Version::new(0, 0, 0),
663672
language: "".to_string(),
@@ -679,7 +688,10 @@ mod tests {
679688
kind: "cosmos".to_string(),
680689
network: None,
681690
name: "Test".to_string(),
682-
source: Source { start_block: 1 },
691+
source: Source {
692+
start_block: 1,
693+
end_block: None,
694+
},
683695
mapping: Mapping {
684696
api_version: semver::Version::new(0, 0, 0),
685697
language: "".to_string(),

chain/ethereum/src/adapter.rs

+28-3
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,21 @@ impl TriggerFilter {
121121
pub(crate) fn requires_traces(&self) -> bool {
122122
!self.call.is_empty() || self.block.requires_traces()
123123
}
124+
125+
#[cfg(debug_assertions)]
126+
pub fn log(&self) -> &EthereumLogFilter {
127+
&self.log
128+
}
129+
130+
#[cfg(debug_assertions)]
131+
pub fn call(&self) -> &EthereumCallFilter {
132+
&self.call
133+
}
134+
135+
#[cfg(debug_assertions)]
136+
pub fn block(&self) -> &EthereumBlockFilter {
137+
&self.block
138+
}
124139
}
125140

126141
impl bc::TriggerFilter<Chain> for TriggerFilter {
@@ -185,7 +200,7 @@ impl bc::TriggerFilter<Chain> for TriggerFilter {
185200
}
186201

187202
#[derive(Clone, Debug, Default)]
188-
pub(crate) struct EthereumLogFilter {
203+
pub struct EthereumLogFilter {
189204
/// Log filters can be represented as a bipartite graph between contracts and events. An edge
190205
/// exists between a contract and an event if a data source for the contract has a trigger for
191206
/// the event.
@@ -382,10 +397,20 @@ impl EthereumLogFilter {
382397
}
383398
filters.into_iter()
384399
}
400+
401+
#[cfg(debug_assertions)]
402+
pub fn contract_addresses(&self) -> impl Iterator<Item = Address> + '_ {
403+
self.contracts_and_events_graph
404+
.nodes()
405+
.filter_map(|node| match node {
406+
LogFilterNode::Contract(address) => Some(address),
407+
LogFilterNode::Event(_) => None,
408+
})
409+
}
385410
}
386411

387412
#[derive(Clone, Debug, Default)]
388-
pub(crate) struct EthereumCallFilter {
413+
pub struct EthereumCallFilter {
389414
// Each call filter has a map of filters keyed by address, each containing a tuple with
390415
// start_block and the set of function signatures
391416
pub contract_addresses_function_signatures:
@@ -583,7 +608,7 @@ impl From<&EthereumBlockFilter> for EthereumCallFilter {
583608
}
584609

585610
#[derive(Clone, Debug, Default)]
586-
pub(crate) struct EthereumBlockFilter {
611+
pub struct EthereumBlockFilter {
587612
/// Used for polling block handlers, a hashset of (start_block, polling_interval)
588613
pub polling_intervals: HashSet<(BlockNumber, i32)>,
589614
pub contract_addresses: HashSet<(BlockNumber, Address)>,

chain/ethereum/src/data_source.rs

+9-1
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ pub struct DataSource {
5151
pub manifest_idx: u32,
5252
pub address: Option<Address>,
5353
pub start_block: BlockNumber,
54+
pub end_block: Option<BlockNumber>,
5455
pub mapping: Mapping,
5556
pub context: Arc<Option<DataSourceContext>>,
5657
pub creation_block: Option<BlockNumber>,
@@ -99,6 +100,7 @@ impl blockchain::DataSource<Chain> for DataSource {
99100
manifest_idx: template.manifest_idx,
100101
address: Some(address),
101102
start_block: creation_block,
103+
end_block: None,
102104
mapping: template.mapping,
103105
context: Arc::new(context),
104106
creation_block: Some(creation_block),
@@ -137,6 +139,10 @@ impl blockchain::DataSource<Chain> for DataSource {
137139
self.start_block
138140
}
139141

142+
fn end_block(&self) -> Option<BlockNumber> {
143+
self.end_block
144+
}
145+
140146
fn match_and_decode(
141147
&self,
142148
trigger: &<Chain as Blockchain>::TriggerData,
@@ -176,12 +182,12 @@ impl blockchain::DataSource<Chain> for DataSource {
176182
address,
177183
mapping,
178184
context,
179-
180185
// The creation block is ignored for detection duplicate data sources.
181186
// Contract ABI equality is implicit in `mapping.abis` equality.
182187
creation_block: _,
183188
contract_abi: _,
184189
start_block: _,
190+
end_block: _,
185191
} = self;
186192

187193
// mapping_request_sender, host_metrics, and (most of) host_exports are operational structs
@@ -247,6 +253,7 @@ impl blockchain::DataSource<Chain> for DataSource {
247253
manifest_idx,
248254
address,
249255
start_block: creation_block.unwrap_or(0),
256+
end_block: None,
250257
mapping: template.mapping.clone(),
251258
context: Arc::new(context),
252259
creation_block,
@@ -382,6 +389,7 @@ impl DataSource {
382389
manifest_idx,
383390
address: source.address,
384391
start_block: source.start_block,
392+
end_block: source.end_block,
385393
mapping,
386394
context: Arc::new(context),
387395
creation_block,

chain/near/src/chain.rs

+1
Original file line numberDiff line numberDiff line change
@@ -1010,6 +1010,7 @@ mod test {
10101010
source: crate::data_source::Source {
10111011
account,
10121012
start_block: 10,
1013+
end_block: None,
10131014
accounts: partial_accounts,
10141015
},
10151016
mapping: Mapping {

chain/near/src/data_source.rs

+7-1
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,10 @@ impl blockchain::DataSource<Chain> for DataSource {
9292
kinds
9393
}
9494

95+
fn end_block(&self) -> Option<BlockNumber> {
96+
self.source.end_block
97+
}
98+
9599
fn match_and_decode(
96100
&self,
97101
trigger: &<Chain as Blockchain>::TriggerData,
@@ -493,10 +497,12 @@ impl PartialAccounts {
493497
}
494498

495499
#[derive(Clone, Debug, Hash, Eq, PartialEq, Deserialize)]
500+
#[serde(rename_all = "camelCase")]
496501
pub(crate) struct Source {
497502
// A data source that does not have an account or accounts can only have block handlers.
498503
pub(crate) account: Option<String>,
499-
#[serde(rename = "startBlock", default)]
504+
#[serde(default)]
500505
pub(crate) start_block: BlockNumber,
506+
pub(crate) end_block: Option<BlockNumber>,
501507
pub(crate) accounts: Option<PartialAccounts>,
502508
}

chain/starknet/src/data_source.rs

+5
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ pub struct UnresolvedDataSource {
4848
#[serde(rename_all = "camelCase")]
4949
pub struct Source {
5050
pub start_block: BlockNumber,
51+
pub end_block: Option<BlockNumber>,
5152
#[serde(default, deserialize_with = "deserialize_address")]
5253
pub address: Option<FieldElement>,
5354
}
@@ -98,6 +99,10 @@ impl blockchain::DataSource<Chain> for DataSource {
9899
self.source.start_block
99100
}
100101

102+
fn end_block(&self) -> Option<BlockNumber> {
103+
self.source.end_block
104+
}
105+
101106
fn handler_kinds(&self) -> HashSet<&str> {
102107
let mut kinds = HashSet::new();
103108

chain/substreams/src/data_source.rs

+4
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,10 @@ impl blockchain::DataSource<Chain> for DataSource {
4747
self.initial_block.unwrap_or(0)
4848
}
4949

50+
fn end_block(&self) -> Option<BlockNumber> {
51+
None
52+
}
53+
5054
fn name(&self) -> &str {
5155
&self.name
5256
}

core/src/subgraph/context.rs

+2-4
Original file line numberDiff line numberDiff line change
@@ -66,24 +66,23 @@ where
6666
{
6767
instance: SubgraphInstance<C, T>,
6868
pub instances: SubgraphKeepAlive,
69-
pub filter: C::TriggerFilter,
7069
pub offchain_monitor: OffchainMonitor,
70+
pub filter: Option<C::TriggerFilter>,
7171
trigger_processor: Box<dyn TriggerProcessor<C, T>>,
7272
}
7373

7474
impl<C: Blockchain, T: RuntimeHostBuilder<C>> IndexingContext<C, T> {
7575
pub fn new(
7676
instance: SubgraphInstance<C, T>,
7777
instances: SubgraphKeepAlive,
78-
filter: C::TriggerFilter,
7978
offchain_monitor: OffchainMonitor,
8079
trigger_processor: Box<dyn TriggerProcessor<C, T>>,
8180
) -> Self {
8281
Self {
8382
instance,
8483
instances,
85-
filter,
8684
offchain_monitor,
85+
filter: None,
8786
trigger_processor,
8887
}
8988
}
@@ -182,7 +181,6 @@ impl<C: Blockchain, T: RuntimeHostBuilder<C>> IndexingContext<C, T> {
182181
self.instance.causality_region_next_value()
183182
}
184183

185-
#[cfg(debug_assertions)]
186184
pub fn instance(&self) -> &SubgraphInstance<C, T> {
187185
&self.instance
188186
}

core/src/subgraph/context/instance.rs

+8-3
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@ pub struct SubgraphInstance<C: Blockchain, T: RuntimeHostBuilder<C>> {
1515
subgraph_id: DeploymentHash,
1616
network: String,
1717
host_builder: T,
18-
templates: Arc<Vec<DataSourceTemplate<C>>>,
18+
pub templates: Arc<Vec<DataSourceTemplate<C>>>,
19+
/// The data sources declared in the subgraph manifest. This does not include dynamic data sources.
20+
pub data_sources: Arc<Vec<DataSource<C>>>,
1921
host_metrics: Arc<HostMetrics>,
2022

2123
/// The hosts represent the data sources in the subgraph. There is one host per data source.
@@ -33,9 +35,12 @@ where
3335
C: Blockchain,
3436
T: RuntimeHostBuilder<C>,
3537
{
38+
/// Create a new subgraph instance from the given manifest and data sources.
39+
/// `data_sources` must contain all data sources declared in the manifest + all dynamic data sources.
3640
pub fn from_manifest(
3741
logger: &Logger,
3842
manifest: SubgraphManifest<C>,
43+
data_sources: Vec<DataSource<C>>,
3944
host_builder: T,
4045
host_metrics: Arc<HostMetrics>,
4146
offchain_monitor: &mut OffchainMonitor,
@@ -49,6 +54,7 @@ where
4954
host_builder,
5055
subgraph_id,
5156
network,
57+
data_sources: Arc::new(manifest.data_sources),
5258
hosts: Hosts::new(),
5359
module_cache: HashMap::new(),
5460
templates,
@@ -59,7 +65,7 @@ where
5965
// Create a new runtime host for each data source in the subgraph manifest;
6066
// we use the same order here as in the subgraph manifest to make the
6167
// event processing behavior predictable
62-
for ds in manifest.data_sources {
68+
for ds in data_sources {
6369
// TODO: This is duplicating code from `IndexingContext::add_dynamic_data_source` and
6470
// `SubgraphInstance::add_dynamic_data_source`. Ideally this should be refactored into
6571
// `IndexingContext`.
@@ -215,7 +221,6 @@ where
215221
self.causality_region_seq.next_val()
216222
}
217223

218-
#[cfg(debug_assertions)]
219224
pub fn hosts(&self) -> &[Arc<T::Host>] {
220225
&self.hosts.hosts()
221226
}

core/src/subgraph/inputs.rs

+3
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ pub struct IndexingInputs<C: Blockchain> {
1515
pub deployment: DeploymentLocator,
1616
pub features: BTreeSet<SubgraphFeature>,
1717
pub start_blocks: Vec<BlockNumber>,
18+
pub end_blocks: BTreeSet<BlockNumber>,
1819
pub stop_block: Option<BlockNumber>,
1920
pub store: Arc<dyn WritableStore>,
2021
pub debug_fork: Option<Arc<dyn SubgraphFork>>,
@@ -37,6 +38,7 @@ impl<C: Blockchain> IndexingInputs<C> {
3738
deployment,
3839
features,
3940
start_blocks,
41+
end_blocks,
4042
stop_block,
4143
store: _,
4244
debug_fork,
@@ -53,6 +55,7 @@ impl<C: Blockchain> IndexingInputs<C> {
5355
deployment: deployment.clone(),
5456
features: features.clone(),
5557
start_blocks: start_blocks.clone(),
58+
end_blocks: end_blocks.clone(),
5659
stop_block: stop_block.clone(),
5760
store,
5861
debug_fork: debug_fork.clone(),

0 commit comments

Comments
 (0)