Skip to content

Feature/graphman test run #3079

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jan 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions core/src/subgraph/instance_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ struct IndexingInputs<C: Blockchain> {
deployment: DeploymentLocator,
features: BTreeSet<SubgraphFeature>,
start_blocks: Vec<BlockNumber>,
stop_block: Option<BlockNumber>,
store: Arc<dyn WritableStore>,
triggers_adapter: Arc<C::TriggersAdapter>,
chain: Arc<C>,
Expand Down Expand Up @@ -192,6 +193,7 @@ where
self: Arc<Self>,
loc: DeploymentLocator,
manifest: serde_yaml::Mapping,
stop_block: Option<BlockNumber>,
) {
let logger = self.logger_factory.subgraph_logger(&loc);
let err_logger = logger.clone();
Expand All @@ -201,13 +203,17 @@ where
match BlockchainKind::from_manifest(&manifest)? {
BlockchainKind::Ethereum => {
instance_manager
.start_subgraph_inner::<graph_chain_ethereum::Chain>(logger, loc, manifest)
.start_subgraph_inner::<graph_chain_ethereum::Chain>(
logger, loc, manifest, stop_block,
)
.await
}

BlockchainKind::Near => {
instance_manager
.start_subgraph_inner::<graph_chain_near::Chain>(logger, loc, manifest)
.start_subgraph_inner::<graph_chain_near::Chain>(
logger, loc, manifest, stop_block,
)
.await
}
}
Expand Down Expand Up @@ -274,6 +280,7 @@ where
logger: Logger,
deployment: DeploymentLocator,
manifest: serde_yaml::Mapping,
stop_block: Option<BlockNumber>,
) -> Result<(), Error> {
let subgraph_store = self.subgraph_store.cheap_clone();
let registry = self.metrics_registry.cheap_clone();
Expand Down Expand Up @@ -412,6 +419,7 @@ where
deployment: deployment.clone(),
features,
start_blocks,
stop_block,
store,
triggers_adapter,
chain,
Expand Down Expand Up @@ -620,6 +628,16 @@ where

let block_ptr = block.ptr();

match ctx.inputs.stop_block.clone() {
Some(stop_block) => {
if block_ptr.number > stop_block {
info!(&logger, "stop block reached for subgraph");
return Ok(());
}
}
_ => {}
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems a little weird that we stop after getting the block but before processing it - that way, the last block we'll actually process (and have entities for) will be stop_block - 1. It's probably fine, but will need to be documented in graphman run

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The instance manager stops after seeing (and before processing) the block HIGHER THAN the stop block:
if block_ptr.number > stop_block

So the behavior is to process up to stop_block INCLUSIVE, as you would expect (well, it Works on My Machine ™️)

if block.trigger_count() > 0 {
subgraph_metrics
.block_trigger_count
Expand Down
8 changes: 6 additions & 2 deletions core/src/subgraph/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,11 @@ where
L: LinkResolver,
I: SubgraphInstanceManager,
{
async fn start(&self, loc: DeploymentLocator) -> Result<(), SubgraphAssignmentProviderError> {
async fn start(
&self,
loc: DeploymentLocator,
stop_block: Option<BlockNumber>,
) -> Result<(), SubgraphAssignmentProviderError> {
let logger = self.logger_factory.subgraph_logger(&loc);

// If subgraph ID already in set
Expand All @@ -63,7 +67,7 @@ where

self.instance_manager
.cheap_clone()
.start_subgraph(loc, raw)
.start_subgraph(loc, raw, stop_block)
.await;

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion core/src/subgraph/registrar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ async fn start_subgraph(
trace!(logger, "Start subgraph");

let start_time = Instant::now();
let result = provider.start(deployment.clone()).await;
let result = provider.start(deployment.clone(), None).await;

debug!(
logger,
Expand Down
2 changes: 2 additions & 0 deletions graph/src/components/subgraph/instance_manager.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::prelude::BlockNumber;
use std::sync::Arc;

use crate::components::store::DeploymentLocator;
Expand All @@ -13,6 +14,7 @@ pub trait SubgraphInstanceManager: Send + Sync + 'static {
self: Arc<Self>,
deployment: DeploymentLocator,
manifest: serde_yaml::Mapping,
stop_block: Option<BlockNumber>,
);
fn stop_subgraph(&self, deployment: DeploymentLocator);
}
1 change: 1 addition & 0 deletions graph/src/components/subgraph/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub trait SubgraphAssignmentProvider: Send + Sync + 'static {
async fn start(
&self,
deployment: DeploymentLocator,
stop_block: Option<BlockNumber>,
) -> Result<(), SubgraphAssignmentProviderError>;
async fn stop(
&self,
Expand Down
50 changes: 50 additions & 0 deletions node/src/bin/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,17 @@ pub enum Command {
/// The deployments to rewind
names: Vec<String>,
},
/// Deploy and run an arbitrary subgraph, up to a certain block (for dev and testing purposes) -- WARNING: WILL RUN MIGRATIONS ON THE DB, DO NOT USE IN PRODUCTION
Run {
/// Network name (must fit one of the chain)
network_name: String,

/// Subgraph in the form `<IPFS Hash>` or `<name>:<IPFS Hash>`
subgraph: String,

/// Highest block number to process before stopping (inclusive)
stop_block: i32,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to say that the latest entities we'll have in the database will be for stop_block - 1.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not needed if my previous claim is right 😃

},
/// Check and interrogate the configuration
///
/// Print information about a configuration file without
Expand Down Expand Up @@ -374,6 +385,18 @@ impl Context {
}
}

fn metrics_registry(&self) -> Arc<MetricsRegistry> {
self.registry.clone()
}

fn config(&self) -> Cfg {
self.config.clone()
}

fn node_id(&self) -> NodeId {
self.node_id.clone()
}

fn primary_pool(self) -> ConnectionPool {
let primary = self.config.primary_store();
let pool = StoreBuilder::main_pool(
Expand Down Expand Up @@ -411,6 +434,10 @@ impl Context {
pools
}

async fn store_builder(self) -> StoreBuilder {
StoreBuilder::new(&self.logger, &self.node_id, &self.config, self.registry).await
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Heh .. the whole point of this Context struct was so that we wouldn't have to pass the StoreBuilder around because that obscures what exactly a command might be doing with the store. It would be better to add a utility method to the Context that returns the NetworkStore, the primary pool, and the chain head listener and pass those into commands::run

Also, StoreBuilder::new will run migrations - avoiding that is a big reason why there are all these other methods on StoreBuilder so that using the wrong graphman version won't screw up the database schema.

I would change this method to call Self::store_and_primary and then copy the few lines that create a ChainHeadUpdateListener from StoreBuilder::new to here

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We changed it exactly to be able to run ... migrations :D I notice the differences indeed. Migrations was required first for CI since it always starts fresh but it's also quite good when using it for development purposes as I often starts from a fresh database.

The best will be to discuss that over a video chat to see what you be the best course of action here. Two things come to mind:

  • Run only on pristine database
  • Run only when specific flag is provided

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lutter Need to chat more about this, indeed, we are using this command mostly on a completely empty database (in dev/test flow), so we need all the migrations bells and whistles...


fn store_and_pools(self) -> (Arc<Store>, HashMap<Shard, ConnectionPool>) {
let (subgraph_store, pools) = StoreBuilder::make_subgraph_store_and_pools(
&self.logger,
Expand Down Expand Up @@ -582,6 +609,29 @@ async fn main() {
sleep,
)
}
Run {
network_name,
subgraph,
stop_block,
} => {
let logger = ctx.logger.clone();
let config = ctx.config();
let registry = ctx.metrics_registry().clone();
let node_id = ctx.node_id().clone();
let store_builder = ctx.store_builder().await;

commands::run::run(
logger,
store_builder,
network_name,
config,
registry,
node_id,
subgraph,
stop_block,
)
.await
}
Listen(cmd) => {
use ListenCommand::*;
match cmd {
Expand Down
1 change: 1 addition & 0 deletions node/src/manager/commands/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub mod listen;
pub mod query;
pub mod remove;
pub mod rewind;
pub mod run;
pub mod stats;
pub mod txn_speed;
pub mod unused_deployments;
Loading