From d6cb2b64c306726e2872e0254afe59c533cb932a Mon Sep 17 00:00:00 2001 From: Matthieu Vachon Date: Wed, 8 Dec 2021 16:56:18 -0500 Subject: [PATCH 1/6] Added grapman test-run command to launch a Subgraph then stop it later on - Copied a bunch of stuff that can be shared with `main.rs` - Can refactor even more `main.rs` to be shared with stuff made for `test-run` - Implement some form of stop mechanism (block_count, stop block, time based, etc) --- core/src/subgraph/instance_manager.rs | 19 +- core/src/subgraph/provider.rs | 4 +- core/src/subgraph/registrar.rs | 2 +- .../components/subgraph/instance_manager.rs | 2 + graph/src/components/subgraph/provider.rs | 1 + node/src/bin/manager.rs | 47 ++ node/src/manager/commands/mod.rs | 1 + node/src/manager/commands/test_run.rs | 577 ++++++++++++++++++ node/src/manager/deployment.rs | 2 +- 9 files changed, 649 insertions(+), 6 deletions(-) create mode 100644 node/src/manager/commands/test_run.rs diff --git a/core/src/subgraph/instance_manager.rs b/core/src/subgraph/instance_manager.rs index 717c1408ae1..fe790aab074 100644 --- a/core/src/subgraph/instance_manager.rs +++ b/core/src/subgraph/instance_manager.rs @@ -60,6 +60,7 @@ struct IndexingInputs { deployment: DeploymentLocator, features: BTreeSet, start_blocks: Vec, + stop_block: Option, store: Arc, triggers_adapter: Arc, chain: Arc, @@ -192,6 +193,7 @@ where self: Arc, loc: DeploymentLocator, manifest: serde_yaml::Mapping, + stop_block: Option, ) { let logger = self.logger_factory.subgraph_logger(&loc); let err_logger = logger.clone(); @@ -201,13 +203,13 @@ where match BlockchainKind::from_manifest(&manifest)? { BlockchainKind::Ethereum => { instance_manager - .start_subgraph_inner::(logger, loc, manifest) + .start_subgraph_inner::(logger, loc, manifest, stop_block) .await } BlockchainKind::Near => { instance_manager - .start_subgraph_inner::(logger, loc, manifest) + .start_subgraph_inner::(logger, loc, manifest, stop_block) .await } } @@ -274,6 +276,7 @@ where logger: Logger, deployment: DeploymentLocator, manifest: serde_yaml::Mapping, + stop_block: Option, ) -> Result<(), Error> { let subgraph_store = self.subgraph_store.cheap_clone(); let registry = self.metrics_registry.cheap_clone(); @@ -412,6 +415,7 @@ where deployment: deployment.clone(), features, start_blocks, + stop_block, store, triggers_adapter, chain, @@ -620,6 +624,17 @@ where let block_ptr = block.ptr(); + + match ctx.inputs.stop_block.clone() { + Some(stop_block) => { + if block_ptr.number > stop_block { + info!(&logger, "stop block reached for subgraph"); + return Ok(()); + } + } + _ => {} + } + if block.trigger_count() > 0 { subgraph_metrics .block_trigger_count diff --git a/core/src/subgraph/provider.rs b/core/src/subgraph/provider.rs index b7a5de290e4..7e2bb71e9c7 100644 --- a/core/src/subgraph/provider.rs +++ b/core/src/subgraph/provider.rs @@ -40,7 +40,7 @@ where L: LinkResolver, I: SubgraphInstanceManager, { - async fn start(&self, loc: DeploymentLocator) -> Result<(), SubgraphAssignmentProviderError> { + async fn start(&self, loc: DeploymentLocator, stop_block: Option) -> Result<(), SubgraphAssignmentProviderError> { let logger = self.logger_factory.subgraph_logger(&loc); // If subgraph ID already in set @@ -63,7 +63,7 @@ where self.instance_manager .cheap_clone() - .start_subgraph(loc, raw) + .start_subgraph(loc, raw, stop_block) .await; Ok(()) diff --git a/core/src/subgraph/registrar.rs b/core/src/subgraph/registrar.rs index 41c786dd121..8ff2763fcc5 100644 --- a/core/src/subgraph/registrar.rs +++ b/core/src/subgraph/registrar.rs @@ -409,7 +409,7 @@ async fn start_subgraph( trace!(logger, "Start subgraph"); let start_time = Instant::now(); - let result = provider.start(deployment.clone()).await; + let result = provider.start(deployment.clone(), None).await; debug!( logger, diff --git a/graph/src/components/subgraph/instance_manager.rs b/graph/src/components/subgraph/instance_manager.rs index 0892716678d..05b200dbdf2 100644 --- a/graph/src/components/subgraph/instance_manager.rs +++ b/graph/src/components/subgraph/instance_manager.rs @@ -1,4 +1,5 @@ use std::sync::Arc; +use crate::prelude::BlockNumber; use crate::components::store::DeploymentLocator; @@ -13,6 +14,7 @@ pub trait SubgraphInstanceManager: Send + Sync + 'static { self: Arc, deployment: DeploymentLocator, manifest: serde_yaml::Mapping, + stop_block: Option, ); fn stop_subgraph(&self, deployment: DeploymentLocator); } diff --git a/graph/src/components/subgraph/provider.rs b/graph/src/components/subgraph/provider.rs index fbdde52495f..5edc22391c8 100644 --- a/graph/src/components/subgraph/provider.rs +++ b/graph/src/components/subgraph/provider.rs @@ -8,6 +8,7 @@ pub trait SubgraphAssignmentProvider: Send + Sync + 'static { async fn start( &self, deployment: DeploymentLocator, + stop_block: Option, ) -> Result<(), SubgraphAssignmentProviderError>; async fn stop( &self, diff --git a/node/src/bin/manager.rs b/node/src/bin/manager.rs index 7fbc9d01e7e..8ef82f7c60e 100644 --- a/node/src/bin/manager.rs +++ b/node/src/bin/manager.rs @@ -144,6 +144,17 @@ pub enum Command { /// The deployments to rewind names: Vec, }, + /// Runs a suite of tests around a specific subgraph + TestRun { + /// Network name (must fit one of the chain) + network_name: String, + + /// Subgraph in the form `` or `:` + subgraph: String, + + /// Number of block to process + stop_block: i32, + }, /// Check and interrogate the configuration /// /// Print information about a configuration file without @@ -374,6 +385,18 @@ impl Context { } } + fn metrics_registry(&self) -> Arc { + self.registry.clone() + } + + fn config(&self) -> Cfg { + self.config.clone() + } + + fn node_id(&self) -> NodeId { + self.node_id.clone() + } + fn primary_pool(self) -> ConnectionPool { let primary = self.config.primary_store(); let pool = StoreBuilder::main_pool( @@ -582,6 +605,30 @@ async fn main() { sleep, ) } + TestRun { + network_name, + subgraph, + stop_block, + } => { + let logger = ctx.logger.clone(); + let config = ctx.config(); + let registry = ctx.metrics_registry().clone(); + let node_id = ctx.node_id().clone(); + let (store, primary) = ctx.store_and_primary(); + + commands::test_run::run( + primary, + store, + logger, + network_name, + config, + registry, + node_id, + subgraph, + stop_block, + ) + .await + } Listen(cmd) => { use ListenCommand::*; match cmd { diff --git a/node/src/manager/commands/mod.rs b/node/src/manager/commands/mod.rs index 1f4b18232b2..3f5ff87bf10 100644 --- a/node/src/manager/commands/mod.rs +++ b/node/src/manager/commands/mod.rs @@ -9,5 +9,6 @@ pub mod query; pub mod remove; pub mod rewind; pub mod stats; +pub mod test_run; pub mod txn_speed; pub mod unused_deployments; diff --git a/node/src/manager/commands/test_run.rs b/node/src/manager/commands/test_run.rs new file mode 100644 index 00000000000..e7b95c48c2c --- /dev/null +++ b/node/src/manager/commands/test_run.rs @@ -0,0 +1,577 @@ +use std::collections::{BTreeMap, HashMap}; +use std::sync::Arc; +use std::time::Duration; +use std::{collections::HashSet, convert::TryFrom}; +use std::{env, thread}; + +use crate::config::{Config, ProviderDetails}; +use crate::manager::deployment::Deployment; +use crate::manager::PanicSubscriptionManager; +use crate::store_builder::StoreBuilder; +use ethereum::{EthereumNetworks, ProviderEthRpcMetrics}; +use futures::future::join_all; +use futures::TryFutureExt; +use graph::anyhow::{bail, format_err, Error}; +use graph::blockchain::{Block as BlockchainBlock, BlockchainKind, BlockchainMap, ChainIdentifier}; +use graph::cheap_clone::CheapClone; +use graph::components::store::{BlockStore as _, ChainStore as _, DeploymentId, DeploymentLocator}; +use graph::firehose::endpoints::{FirehoseEndpoint, FirehoseNetworkEndpoints, FirehoseNetworks}; +use graph::ipfs_client::IpfsClient; +use graph::log::logger; +use graph::prelude::{ + anyhow, tokio, BlockNumber, BlockPtr, DeploymentHash, LoggerFactory, NodeId, + SubgraphAssignmentProvider, SubgraphName, SubgraphRegistrar, SubgraphStore, + SubgraphVersionSwitchingMode, +}; +use graph::prelude::{prost, MetricsRegistry as MetricsRegistryTrait}; +use graph::slog::{debug, error, info, o, Logger}; +use graph::util::security::SafeDisplay; +use graph_chain_ethereum::{self as ethereum, network_indexer, EthereumAdapterTrait, Transport}; +use graph_core::{ + LinkResolver, MetricsRegistry, SubgraphAssignmentProvider as IpfsSubgraphAssignmentProvider, + SubgraphInstanceManager, SubgraphRegistrar as IpfsSubgraphRegistrar, +}; +use graph_store_postgres::BlockStore; +use graph_store_postgres::{connection_pool::ConnectionPool, Store}; +use lazy_static::lazy_static; +use std::str::FromStr; + +pub async fn run( + primary: ConnectionPool, + store: Arc, + logger: Logger, + network_name: String, + config: Config, + metrics_registry: Arc, + node_id: NodeId, + subgraph: String, + stop_block: BlockNumber, +) -> Result<(), anyhow::Error> { + println!( + "Test run starting subgraph => {}, stop_block = {}", + subgraph, stop_block + ); + + let logger_factory = LoggerFactory::new(logger.clone(), None); + + // FIXME: Hard-coded IPFS config, take it from config file instead? + let ipfs_clients: Vec<_> = + create_ipfs_clients(&logger, &vec!["http://127.0.0.1:5001".to_string()]); + + // Convert the clients into a link resolver. Since we want to get past + // possible temporary DNS failures, make the resolver retry + let link_resolver = Arc::new(LinkResolver::from(ipfs_clients)); + + let eth_networks = create_ethereum_networks(logger.clone(), metrics_registry.clone(), &config) + .await + .expect("Failed to parse Ethereum networks"); + let firehose_networks_by_kind = + create_firehose_networks(logger.clone(), metrics_registry.clone(), &config) + .await + .expect("Failed to parse Firehose endpoints"); + let firehose_networks = firehose_networks_by_kind.get(&BlockchainKind::Ethereum); + let firehose_endpoints = firehose_networks.and_then(|v| v.networks.get(&network_name)); + + let eth_adapters = match eth_networks.networks.get(&network_name) { + Some(adapters) => adapters.clone(), + None => { + return Err(format_err!( + "No ethereum adapters found, but required in this state of graphman test-run command" + )) + } + }; + + let store_builder = + StoreBuilder::new(&logger, &node_id, &config, metrics_registry.clone()).await; + let chain_head_update_listener = store_builder.chain_head_update_listener(); + + let (eth_networks, ethereum_idents) = connect_ethereum_networks(&logger, eth_networks).await; + // let (near_networks, near_idents) = connect_firehose_networks::( + // &logger, + // firehose_networks_by_kind + // .remove(&BlockchainKind::Near) + // .unwrap_or_else(|| FirehoseNetworks::new()), + // ) + // .await; + + let network_identifiers = ethereum_idents.into_iter().collect(); + let network_store = store_builder.network_store(network_identifiers); + + let subgraph_store = network_store.subgraph_store(); + let chain_store = network_store + .block_store() + .chain_store(network_name.as_ref()) + .expect(format!("No chain store for {}", &network_name).as_ref()); + + let chain = ethereum::Chain::new( + logger_factory.clone(), + network_name.clone(), + node_id.clone(), + metrics_registry.clone(), + chain_store.cheap_clone(), + chain_store, + subgraph_store.clone(), + firehose_endpoints.map_or_else(|| FirehoseNetworkEndpoints::new(), |v| v.clone()), + eth_adapters, + chain_head_update_listener, + *ANCESTOR_COUNT, + *REORG_THRESHOLD, + // We assume the tested chain is always ingestible for now + true, + ); + + let mut blockchain_map = BlockchainMap::new(); + blockchain_map.insert(network_name.clone(), Arc::new(chain)); + + let blockchain_map = Arc::new(blockchain_map); + let subgraph_instance_manager = SubgraphInstanceManager::new( + &logger_factory, + subgraph_store.clone(), + blockchain_map.clone(), + metrics_registry.clone(), + link_resolver.cheap_clone(), + ); + + // Create IPFS-based subgraph provider + let subgraph_provider = Arc::new(IpfsSubgraphAssignmentProvider::new( + &logger_factory, + link_resolver.cheap_clone(), + subgraph_instance_manager, + )); + + let panicking_subscription_manager = Arc::new(PanicSubscriptionManager {}); + + let subgraph_registrar = Arc::new(IpfsSubgraphRegistrar::new( + &logger_factory, + link_resolver.cheap_clone(), + subgraph_provider.clone(), + subgraph_store.clone(), + panicking_subscription_manager, + blockchain_map, + node_id.clone(), + SubgraphVersionSwitchingMode::Instant, + )); + + let (name, hash) = if subgraph.contains(':') { + let mut split = subgraph.split(':'); + (split.next().unwrap(), split.next().unwrap().to_owned()) + } else { + ("cli", subgraph) + }; + + let subgraph_name = SubgraphName::new(name) + .expect("Subgraph name must contain only a-z, A-Z, 0-9, '-' and '_'"); + let subgraph_hash = DeploymentHash::new(hash).expect("Subgraph hash must be a valid IPFS hash"); + + info!(&logger, "Creating subgraph {}", name); + let create_result = + SubgraphRegistrar::create_subgraph(subgraph_registrar.as_ref(), subgraph_name.clone()) + .await?; + + info!( + &logger, + "Looking up subgraph deployment {} (Deployment hash => {}, id => {})", + name, + subgraph_hash, + create_result.id, + ); + + SubgraphRegistrar::create_subgraph_version( + subgraph_registrar.as_ref(), + subgraph_name.clone(), + subgraph_hash.clone(), + node_id.clone(), + ) + .await?; + + // let deployment_locator = DeploymentLocator::new(DeploymentId(deployment_id), subgraph_hash); + + let deployments = Deployment::lookup(&primary, name.to_string())?; + let deployment = deployments + .first() + .expect("At least one deployment should exist"); + + SubgraphAssignmentProvider::start(subgraph_provider.as_ref(), deployment.locator(), Some(stop_block)).await?; + + loop { + tokio::time::sleep(Duration::from_millis(1000)).await; + + let block_ptr = subgraph_store + .least_block_ptr(&subgraph_hash) + .unwrap() + .unwrap(); + + if block_ptr.number >= stop_block { + info!(&logger, "subgraph now at block {}, reached stop block {}", block_ptr.number, stop_block); + break; + } + + } + info!(&logger, "Removing subgraph {}", name); + subgraph_store.clone().remove_subgraph(subgraph_name)?; + + Ok(()) +} + +// Stuff copied directly moslty from `main.rs` +// +// FIXME: Share that with `main.rs` stuff + +// The status of a provider that we learned from connecting to it +#[derive(PartialEq)] +enum ProviderNetworkStatus { + Broken { + network: String, + provider: String, + }, + Version { + network: String, + ident: ChainIdentifier, + }, +} + +/// How long we will hold up node startup to get the net version and genesis +/// hash from the client. If we can't get it within that time, we'll try and +/// continue regardless. +const NET_VERSION_WAIT_TIME: Duration = Duration::from_secs(30); + +lazy_static! { + // Default to an Ethereum reorg threshold to 50 blocks + static ref REORG_THRESHOLD: BlockNumber = env::var("ETHEREUM_REORG_THRESHOLD") + .ok() + .map(|s| BlockNumber::from_str(&s) + .unwrap_or_else(|_| panic!("failed to parse env var ETHEREUM_REORG_THRESHOLD"))) + .unwrap_or(50); + + // Default to an ancestor count of 50 blocks + static ref ANCESTOR_COUNT: BlockNumber = env::var("ETHEREUM_ANCESTOR_COUNT") + .ok() + .map(|s| BlockNumber::from_str(&s) + .unwrap_or_else(|_| panic!("failed to parse env var ETHEREUM_ANCESTOR_COUNT"))) + .unwrap_or(50); +} + +fn create_ipfs_clients(logger: &Logger, ipfs_addresses: &Vec) -> Vec { + // Parse the IPFS URL from the `--ipfs` command line argument + let ipfs_addresses: Vec<_> = ipfs_addresses + .iter() + .map(|uri| { + if uri.starts_with("http://") || uri.starts_with("https://") { + String::from(uri) + } else { + format!("http://{}", uri) + } + }) + .collect(); + + ipfs_addresses + .into_iter() + .map(|ipfs_address| { + info!( + logger, + "Trying IPFS node at: {}", + SafeDisplay(&ipfs_address) + ); + + let ipfs_client = match IpfsClient::new(&ipfs_address) { + Ok(ipfs_client) => ipfs_client, + Err(e) => { + error!( + logger, + "Failed to create IPFS client for `{}`: {}", + SafeDisplay(&ipfs_address), + e + ); + panic!("Could not connect to IPFS"); + } + }; + + // Test the IPFS client by getting the version from the IPFS daemon + let ipfs_test = ipfs_client.cheap_clone(); + let ipfs_ok_logger = logger.clone(); + let ipfs_err_logger = logger.clone(); + let ipfs_address_for_ok = ipfs_address.clone(); + let ipfs_address_for_err = ipfs_address.clone(); + graph::spawn(async move { + ipfs_test + .test() + .map_err(move |e| { + error!( + ipfs_err_logger, + "Is there an IPFS node running at \"{}\"?", + SafeDisplay(ipfs_address_for_err), + ); + panic!("Failed to connect to IPFS: {}", e); + }) + .map_ok(move |_| { + info!( + ipfs_ok_logger, + "Successfully connected to IPFS node at: {}", + SafeDisplay(ipfs_address_for_ok) + ); + }) + .await + }); + + ipfs_client + }) + .collect() +} + +/// Parses an Ethereum connection string and returns the network name and Ethereum adapter. +async fn create_ethereum_networks( + logger: Logger, + registry: Arc, + config: &Config, +) -> Result { + let eth_rpc_metrics = Arc::new(ProviderEthRpcMetrics::new(registry)); + let mut parsed_networks = EthereumNetworks::new(); + for (name, chain) in &config.chains.chains { + if chain.protocol != BlockchainKind::Ethereum { + continue; + } + + for provider in &chain.providers { + if let ProviderDetails::Web3(web3) = &provider.details { + let capabilities = web3.node_capabilities(); + + let logger = logger.new(o!("provider" => provider.label.clone())); + info!( + logger, + "Creating transport"; + "url" => &web3.url, + "capabilities" => capabilities + ); + + use crate::config::Transport::*; + + let transport = match web3.transport { + Rpc => Transport::new_rpc(&web3.url, web3.headers.clone()), + Ipc => Transport::new_ipc(&web3.url).await, + Ws => Transport::new_ws(&web3.url).await, + }; + + let supports_eip_1898 = !web3.features.contains("no_eip1898"); + + parsed_networks.insert( + name.to_string(), + capabilities, + Arc::new( + graph_chain_ethereum::EthereumAdapter::new( + logger, + provider.label.clone(), + &web3.url, + transport, + eth_rpc_metrics.clone(), + supports_eip_1898, + ) + .await, + ), + ); + } + } + } + parsed_networks.sort(); + Ok(parsed_networks) +} + +async fn create_firehose_networks( + logger: Logger, + _registry: Arc, + config: &Config, +) -> Result, anyhow::Error> { + debug!( + logger, + "Creating firehose networks [{} chains, ingestor {}]", + config.chains.chains.len(), + config.chains.ingestor, + ); + + let mut networks_by_kind = BTreeMap::new(); + + for (name, chain) in &config.chains.chains { + for provider in &chain.providers { + if let ProviderDetails::Firehose(ref firehose) = provider.details { + let logger = logger.new(o!("provider" => provider.label.clone())); + info!( + logger, + "Creating firehose endpoint"; + "url" => &firehose.url, + ); + + let endpoint = FirehoseEndpoint::new( + logger, + &provider.label, + &firehose.url, + firehose.token.clone(), + ) + .await?; + + let parsed_networks = networks_by_kind + .entry(chain.protocol) + .or_insert_with(|| FirehoseNetworks::new()); + parsed_networks.insert(name.to_string(), Arc::new(endpoint)); + } + } + } + + Ok(networks_by_kind) +} + +/// Try to connect to all the providers in `eth_networks` and get their net +/// version and genesis block. Return the same `eth_networks` and the +/// retrieved net identifiers grouped by network name. Remove all providers +/// for which trying to connect resulted in an error from the returned +/// `EthereumNetworks`, since it's likely pointless to try and connect to +/// them. If the connection attempt to a provider times out after +/// `NET_VERSION_WAIT_TIME`, keep the provider, but don't report a +/// version for it. +async fn connect_ethereum_networks( + logger: &Logger, + mut eth_networks: EthereumNetworks, +) -> (EthereumNetworks, Vec<(String, Vec)>) { + // This has one entry for each provider, and therefore multiple entries + // for each network + let statuses = join_all( + eth_networks + .flatten() + .into_iter() + .map(|(network_name, capabilities, eth_adapter)| { + (network_name, capabilities, eth_adapter, logger.clone()) + }) + .map(|(network, capabilities, eth_adapter, logger)| async move { + let logger = logger.new(o!("provider" => eth_adapter.provider().to_string())); + info!( + logger, "Connecting to Ethereum to get network identifier"; + "capabilities" => &capabilities + ); + match tokio::time::timeout(NET_VERSION_WAIT_TIME, eth_adapter.net_identifiers()) + .await + .map_err(Error::from) + { + // An `Err` means a timeout, an `Ok(Err)` means some other error (maybe a typo + // on the URL) + Ok(Err(e)) | Err(e) => { + error!(logger, "Connection to provider failed. Not using this provider"; + "error" => e.to_string()); + ProviderNetworkStatus::Broken { + network, + provider: eth_adapter.provider().to_string(), + } + } + Ok(Ok(ident)) => { + info!( + logger, + "Connected to Ethereum"; + "network_version" => &ident.net_version, + "capabilities" => &capabilities + ); + ProviderNetworkStatus::Version { network, ident } + } + } + }), + ) + .await; + + // Group identifiers by network name + let idents: HashMap> = + statuses + .into_iter() + .fold(HashMap::new(), |mut networks, status| { + match status { + ProviderNetworkStatus::Broken { network, provider } => { + eth_networks.remove(&network, &provider) + } + ProviderNetworkStatus::Version { network, ident } => { + networks.entry(network.to_string()).or_default().push(ident) + } + } + networks + }); + let idents: Vec<_> = idents.into_iter().collect(); + (eth_networks, idents) +} + +/// Try to connect to all the providers in `firehose_networks` and get their net +/// version and genesis block. Return the same `eth_networks` and the +/// retrieved net identifiers grouped by network name. Remove all providers +/// for which trying to connect resulted in an error from the returned +/// `EthereumNetworks`, since it's likely pointless to try and connect to +/// them. If the connection attempt to a provider times out after +/// `NET_VERSION_WAIT_TIME`, keep the provider, but don't report a +/// version for it. +async fn connect_firehose_networks( + logger: &Logger, + mut firehose_networks: FirehoseNetworks, +) -> (FirehoseNetworks, Vec<(String, Vec)>) +where + M: prost::Message + BlockchainBlock + Default + 'static, +{ + // This has one entry for each provider, and therefore multiple entries + // for each network + let statuses = join_all( + firehose_networks + .flatten() + .into_iter() + .map(|(network_name, endpoint)| (network_name, endpoint, logger.clone())) + .map(|(network, endpoint, logger)| async move { + let logger = logger.new(o!("provider" => endpoint.provider.to_string())); + info!( + logger, "Connecting to Firehose to get network identifier"; + "url" => &endpoint.uri, + ); + match tokio::time::timeout( + NET_VERSION_WAIT_TIME, + endpoint.genesis_block_ptr::(&logger), + ) + .await + .map_err(Error::from) + { + // An `Err` means a timeout, an `Ok(Err)` means some other error (maybe a typo + // on the URL) + Ok(Err(e)) | Err(e) => { + error!(logger, "Connection to provider failed. Not using this provider"; + "error" => e.to_string()); + ProviderNetworkStatus::Broken { + network, + provider: endpoint.provider.to_string(), + } + } + Ok(Ok(ptr)) => { + info!( + logger, + "Connected to Firehose"; + "uri" => &endpoint.uri, + "genesis_block" => format_args!("{}", &ptr), + ); + + let ident = ChainIdentifier { + net_version: "0".to_string(), + genesis_block_hash: ptr.hash, + }; + + ProviderNetworkStatus::Version { network, ident } + } + } + }), + ) + .await; + + // Group identifiers by network name + let idents: HashMap> = + statuses + .into_iter() + .fold(HashMap::new(), |mut networks, status| { + match status { + ProviderNetworkStatus::Broken { network, provider } => { + firehose_networks.remove(&network, &provider) + } + ProviderNetworkStatus::Version { network, ident } => { + networks.entry(network.to_string()).or_default().push(ident) + } + } + networks + }); + let idents: Vec<_> = idents.into_iter().collect(); + (firehose_networks, idents) +} diff --git a/node/src/manager/deployment.rs b/node/src/manager/deployment.rs index f40ab938f15..a9652433328 100644 --- a/node/src/manager/deployment.rs +++ b/node/src/manager/deployment.rs @@ -16,7 +16,7 @@ use graph_store_postgres::{command_support::catalog as store_catalog, Shard, Sub use crate::manager::deployment; use crate::manager::display::List; -#[derive(Queryable, PartialEq, Eq, Hash)] +#[derive(Queryable, PartialEq, Eq, Hash, Debug)] pub struct Deployment { pub name: String, pub status: String, From 63b1e75c7885925bf641fa7a1e90ba079da9f086 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Duchesneau?= Date: Mon, 10 Jan 2022 08:32:51 -0500 Subject: [PATCH 2/6] lint with 'cargo fmt --all' --- core/src/subgraph/instance_manager.rs | 9 ++++++--- core/src/subgraph/provider.rs | 6 +++++- graph/src/components/subgraph/instance_manager.rs | 2 +- node/src/manager/commands/test_run.rs | 13 ++++++++++--- 4 files changed, 22 insertions(+), 8 deletions(-) diff --git a/core/src/subgraph/instance_manager.rs b/core/src/subgraph/instance_manager.rs index fe790aab074..e4a08e39c25 100644 --- a/core/src/subgraph/instance_manager.rs +++ b/core/src/subgraph/instance_manager.rs @@ -203,13 +203,17 @@ where match BlockchainKind::from_manifest(&manifest)? { BlockchainKind::Ethereum => { instance_manager - .start_subgraph_inner::(logger, loc, manifest, stop_block) + .start_subgraph_inner::( + logger, loc, manifest, stop_block, + ) .await } BlockchainKind::Near => { instance_manager - .start_subgraph_inner::(logger, loc, manifest, stop_block) + .start_subgraph_inner::( + logger, loc, manifest, stop_block, + ) .await } } @@ -624,7 +628,6 @@ where let block_ptr = block.ptr(); - match ctx.inputs.stop_block.clone() { Some(stop_block) => { if block_ptr.number > stop_block { diff --git a/core/src/subgraph/provider.rs b/core/src/subgraph/provider.rs index 7e2bb71e9c7..ad2d5cadda1 100644 --- a/core/src/subgraph/provider.rs +++ b/core/src/subgraph/provider.rs @@ -40,7 +40,11 @@ where L: LinkResolver, I: SubgraphInstanceManager, { - async fn start(&self, loc: DeploymentLocator, stop_block: Option) -> Result<(), SubgraphAssignmentProviderError> { + async fn start( + &self, + loc: DeploymentLocator, + stop_block: Option, + ) -> Result<(), SubgraphAssignmentProviderError> { let logger = self.logger_factory.subgraph_logger(&loc); // If subgraph ID already in set diff --git a/graph/src/components/subgraph/instance_manager.rs b/graph/src/components/subgraph/instance_manager.rs index 05b200dbdf2..3b1777e3df8 100644 --- a/graph/src/components/subgraph/instance_manager.rs +++ b/graph/src/components/subgraph/instance_manager.rs @@ -1,5 +1,5 @@ -use std::sync::Arc; use crate::prelude::BlockNumber; +use std::sync::Arc; use crate::components::store::DeploymentLocator; diff --git a/node/src/manager/commands/test_run.rs b/node/src/manager/commands/test_run.rs index e7b95c48c2c..2d5449d7286 100644 --- a/node/src/manager/commands/test_run.rs +++ b/node/src/manager/commands/test_run.rs @@ -191,7 +191,12 @@ pub async fn run( .first() .expect("At least one deployment should exist"); - SubgraphAssignmentProvider::start(subgraph_provider.as_ref(), deployment.locator(), Some(stop_block)).await?; + SubgraphAssignmentProvider::start( + subgraph_provider.as_ref(), + deployment.locator(), + Some(stop_block), + ) + .await?; loop { tokio::time::sleep(Duration::from_millis(1000)).await; @@ -202,10 +207,12 @@ pub async fn run( .unwrap(); if block_ptr.number >= stop_block { - info!(&logger, "subgraph now at block {}, reached stop block {}", block_ptr.number, stop_block); + info!( + &logger, + "subgraph now at block {}, reached stop block {}", block_ptr.number, stop_block + ); break; } - } info!(&logger, "Removing subgraph {}", name); subgraph_store.clone().remove_subgraph(subgraph_name)?; From cd0f00cbb4fa9f156102991aa35d8eabbfbcf0a5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Duchesneau?= Date: Mon, 10 Jan 2022 09:05:27 -0500 Subject: [PATCH 3/6] fix code breaks from rebase with master --- node/src/manager/commands/test_run.rs | 108 +++----------------------- 1 file changed, 11 insertions(+), 97 deletions(-) diff --git a/node/src/manager/commands/test_run.rs b/node/src/manager/commands/test_run.rs index 2d5449d7286..886eb3c4a17 100644 --- a/node/src/manager/commands/test_run.rs +++ b/node/src/manager/commands/test_run.rs @@ -1,8 +1,7 @@ use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; use std::time::Duration; -use std::{collections::HashSet, convert::TryFrom}; -use std::{env, thread}; +use std::env; use crate::config::{Config, ProviderDetails}; use crate::manager::deployment::Deployment; @@ -11,34 +10,32 @@ use crate::store_builder::StoreBuilder; use ethereum::{EthereumNetworks, ProviderEthRpcMetrics}; use futures::future::join_all; use futures::TryFutureExt; -use graph::anyhow::{bail, format_err, Error}; -use graph::blockchain::{Block as BlockchainBlock, BlockchainKind, BlockchainMap, ChainIdentifier}; +use graph::anyhow::{format_err, Error}; +use graph::blockchain::{BlockchainKind, BlockchainMap, ChainIdentifier}; use graph::cheap_clone::CheapClone; -use graph::components::store::{BlockStore as _, ChainStore as _, DeploymentId, DeploymentLocator}; -use graph::firehose::endpoints::{FirehoseEndpoint, FirehoseNetworkEndpoints, FirehoseNetworks}; +use graph::components::store::BlockStore as _; +use graph::firehose::{FirehoseEndpoint, FirehoseEndpoints, FirehoseNetworks}; use graph::ipfs_client::IpfsClient; -use graph::log::logger; use graph::prelude::{ - anyhow, tokio, BlockNumber, BlockPtr, DeploymentHash, LoggerFactory, NodeId, + anyhow, tokio, BlockNumber, DeploymentHash, LoggerFactory, NodeId, SubgraphAssignmentProvider, SubgraphName, SubgraphRegistrar, SubgraphStore, SubgraphVersionSwitchingMode, }; -use graph::prelude::{prost, MetricsRegistry as MetricsRegistryTrait}; +use graph::prelude::MetricsRegistry as MetricsRegistryTrait; use graph::slog::{debug, error, info, o, Logger}; use graph::util::security::SafeDisplay; -use graph_chain_ethereum::{self as ethereum, network_indexer, EthereumAdapterTrait, Transport}; +use graph_chain_ethereum::{self as ethereum, EthereumAdapterTrait, Transport}; use graph_core::{ LinkResolver, MetricsRegistry, SubgraphAssignmentProvider as IpfsSubgraphAssignmentProvider, SubgraphInstanceManager, SubgraphRegistrar as IpfsSubgraphRegistrar, }; -use graph_store_postgres::BlockStore; use graph_store_postgres::{connection_pool::ConnectionPool, Store}; use lazy_static::lazy_static; use std::str::FromStr; pub async fn run( primary: ConnectionPool, - store: Arc, + _store: Arc, logger: Logger, network_name: String, config: Config, @@ -85,7 +82,7 @@ pub async fn run( StoreBuilder::new(&logger, &node_id, &config, metrics_registry.clone()).await; let chain_head_update_listener = store_builder.chain_head_update_listener(); - let (eth_networks, ethereum_idents) = connect_ethereum_networks(&logger, eth_networks).await; + let (_, ethereum_idents) = connect_ethereum_networks(&logger, eth_networks).await; // let (near_networks, near_idents) = connect_firehose_networks::( // &logger, // firehose_networks_by_kind @@ -111,7 +108,7 @@ pub async fn run( chain_store.cheap_clone(), chain_store, subgraph_store.clone(), - firehose_endpoints.map_or_else(|| FirehoseNetworkEndpoints::new(), |v| v.clone()), + firehose_endpoints.map_or_else(|| FirehoseEndpoints::new(), |v| v.clone()), eth_adapters, chain_head_update_listener, *ANCESTOR_COUNT, @@ -499,86 +496,3 @@ async fn connect_ethereum_networks( (eth_networks, idents) } -/// Try to connect to all the providers in `firehose_networks` and get their net -/// version and genesis block. Return the same `eth_networks` and the -/// retrieved net identifiers grouped by network name. Remove all providers -/// for which trying to connect resulted in an error from the returned -/// `EthereumNetworks`, since it's likely pointless to try and connect to -/// them. If the connection attempt to a provider times out after -/// `NET_VERSION_WAIT_TIME`, keep the provider, but don't report a -/// version for it. -async fn connect_firehose_networks( - logger: &Logger, - mut firehose_networks: FirehoseNetworks, -) -> (FirehoseNetworks, Vec<(String, Vec)>) -where - M: prost::Message + BlockchainBlock + Default + 'static, -{ - // This has one entry for each provider, and therefore multiple entries - // for each network - let statuses = join_all( - firehose_networks - .flatten() - .into_iter() - .map(|(network_name, endpoint)| (network_name, endpoint, logger.clone())) - .map(|(network, endpoint, logger)| async move { - let logger = logger.new(o!("provider" => endpoint.provider.to_string())); - info!( - logger, "Connecting to Firehose to get network identifier"; - "url" => &endpoint.uri, - ); - match tokio::time::timeout( - NET_VERSION_WAIT_TIME, - endpoint.genesis_block_ptr::(&logger), - ) - .await - .map_err(Error::from) - { - // An `Err` means a timeout, an `Ok(Err)` means some other error (maybe a typo - // on the URL) - Ok(Err(e)) | Err(e) => { - error!(logger, "Connection to provider failed. Not using this provider"; - "error" => e.to_string()); - ProviderNetworkStatus::Broken { - network, - provider: endpoint.provider.to_string(), - } - } - Ok(Ok(ptr)) => { - info!( - logger, - "Connected to Firehose"; - "uri" => &endpoint.uri, - "genesis_block" => format_args!("{}", &ptr), - ); - - let ident = ChainIdentifier { - net_version: "0".to_string(), - genesis_block_hash: ptr.hash, - }; - - ProviderNetworkStatus::Version { network, ident } - } - } - }), - ) - .await; - - // Group identifiers by network name - let idents: HashMap> = - statuses - .into_iter() - .fold(HashMap::new(), |mut networks, status| { - match status { - ProviderNetworkStatus::Broken { network, provider } => { - firehose_networks.remove(&network, &provider) - } - ProviderNetworkStatus::Version { network, ident } => { - networks.entry(network.to_string()).or_default().push(ident) - } - } - networks - }); - let idents: Vec<_> = idents.into_iter().collect(); - (firehose_networks, idents) -} From 78d1dcc6279bad8e035b0854e4c612669d66de65 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Duchesneau?= Date: Mon, 10 Jan 2022 09:07:10 -0500 Subject: [PATCH 4/6] more linting --- node/src/manager/commands/test_run.rs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/node/src/manager/commands/test_run.rs b/node/src/manager/commands/test_run.rs index 886eb3c4a17..fd51323513f 100644 --- a/node/src/manager/commands/test_run.rs +++ b/node/src/manager/commands/test_run.rs @@ -1,7 +1,7 @@ use std::collections::{BTreeMap, HashMap}; +use std::env; use std::sync::Arc; use std::time::Duration; -use std::env; use crate::config::{Config, ProviderDetails}; use crate::manager::deployment::Deployment; @@ -16,12 +16,11 @@ use graph::cheap_clone::CheapClone; use graph::components::store::BlockStore as _; use graph::firehose::{FirehoseEndpoint, FirehoseEndpoints, FirehoseNetworks}; use graph::ipfs_client::IpfsClient; +use graph::prelude::MetricsRegistry as MetricsRegistryTrait; use graph::prelude::{ - anyhow, tokio, BlockNumber, DeploymentHash, LoggerFactory, NodeId, - SubgraphAssignmentProvider, SubgraphName, SubgraphRegistrar, SubgraphStore, - SubgraphVersionSwitchingMode, + anyhow, tokio, BlockNumber, DeploymentHash, LoggerFactory, NodeId, SubgraphAssignmentProvider, + SubgraphName, SubgraphRegistrar, SubgraphStore, SubgraphVersionSwitchingMode, }; -use graph::prelude::MetricsRegistry as MetricsRegistryTrait; use graph::slog::{debug, error, info, o, Logger}; use graph::util::security::SafeDisplay; use graph_chain_ethereum::{self as ethereum, EthereumAdapterTrait, Transport}; @@ -495,4 +494,3 @@ async fn connect_ethereum_networks( let idents: Vec<_> = idents.into_iter().collect(); (eth_networks, idents) } - From 3c247c1e66e73de66f3c8a98111d7196ff395437 Mon Sep 17 00:00:00 2001 From: Matthieu Vachon Date: Mon, 10 Jan 2022 16:24:34 -0500 Subject: [PATCH 5/6] Fixed migrations not being applied --- node/src/bin/manager.rs | 9 ++++++--- node/src/manager/commands/test_run.rs | 14 ++++---------- 2 files changed, 10 insertions(+), 13 deletions(-) diff --git a/node/src/bin/manager.rs b/node/src/bin/manager.rs index 8ef82f7c60e..770e0752b45 100644 --- a/node/src/bin/manager.rs +++ b/node/src/bin/manager.rs @@ -434,6 +434,10 @@ impl Context { pools } + async fn store_builder(self) -> StoreBuilder { + StoreBuilder::new(&self.logger, &self.node_id, &self.config, self.registry).await + } + fn store_and_pools(self) -> (Arc, HashMap) { let (subgraph_store, pools) = StoreBuilder::make_subgraph_store_and_pools( &self.logger, @@ -614,12 +618,11 @@ async fn main() { let config = ctx.config(); let registry = ctx.metrics_registry().clone(); let node_id = ctx.node_id().clone(); - let (store, primary) = ctx.store_and_primary(); + let store_builder = ctx.store_builder().await; commands::test_run::run( - primary, - store, logger, + store_builder, network_name, config, registry, diff --git a/node/src/manager/commands/test_run.rs b/node/src/manager/commands/test_run.rs index fd51323513f..bf63ce83ec2 100644 --- a/node/src/manager/commands/test_run.rs +++ b/node/src/manager/commands/test_run.rs @@ -28,14 +28,12 @@ use graph_core::{ LinkResolver, MetricsRegistry, SubgraphAssignmentProvider as IpfsSubgraphAssignmentProvider, SubgraphInstanceManager, SubgraphRegistrar as IpfsSubgraphRegistrar, }; -use graph_store_postgres::{connection_pool::ConnectionPool, Store}; use lazy_static::lazy_static; use std::str::FromStr; pub async fn run( - primary: ConnectionPool, - _store: Arc, logger: Logger, + store_builder: StoreBuilder, network_name: String, config: Config, metrics_registry: Arc, @@ -77,10 +75,6 @@ pub async fn run( } }; - let store_builder = - StoreBuilder::new(&logger, &node_id, &config, metrics_registry.clone()).await; - let chain_head_update_listener = store_builder.chain_head_update_listener(); - let (_, ethereum_idents) = connect_ethereum_networks(&logger, eth_networks).await; // let (near_networks, near_idents) = connect_firehose_networks::( // &logger, @@ -90,6 +84,8 @@ pub async fn run( // ) // .await; + let chain_head_update_listener = store_builder.chain_head_update_listener(); + let primary_pool = store_builder.primary_pool(); let network_identifiers = ethereum_idents.into_iter().collect(); let network_store = store_builder.network_store(network_identifiers); @@ -180,9 +176,7 @@ pub async fn run( ) .await?; - // let deployment_locator = DeploymentLocator::new(DeploymentId(deployment_id), subgraph_hash); - - let deployments = Deployment::lookup(&primary, name.to_string())?; + let deployments = Deployment::lookup(&primary_pool, name.to_string())?; let deployment = deployments .first() .expect("At least one deployment should exist"); From 855c4b9aee73f1c018a89cf329755e125695feba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Duchesneau?= Date: Wed, 12 Jan 2022 09:53:51 -0500 Subject: [PATCH 6/6] rename graphman 'test-run' to 'run' --- node/src/bin/manager.rs | 10 +++++----- node/src/manager/commands/mod.rs | 2 +- node/src/manager/commands/{test_run.rs => run.rs} | 6 +++--- 3 files changed, 9 insertions(+), 9 deletions(-) rename node/src/manager/commands/{test_run.rs => run.rs} (99%) diff --git a/node/src/bin/manager.rs b/node/src/bin/manager.rs index 770e0752b45..a2743a4bf71 100644 --- a/node/src/bin/manager.rs +++ b/node/src/bin/manager.rs @@ -144,15 +144,15 @@ pub enum Command { /// The deployments to rewind names: Vec, }, - /// Runs a suite of tests around a specific subgraph - TestRun { + /// Deploy and run an arbitrary subgraph, up to a certain block (for dev and testing purposes) -- WARNING: WILL RUN MIGRATIONS ON THE DB, DO NOT USE IN PRODUCTION + Run { /// Network name (must fit one of the chain) network_name: String, /// Subgraph in the form `` or `:` subgraph: String, - /// Number of block to process + /// Highest block number to process before stopping (inclusive) stop_block: i32, }, /// Check and interrogate the configuration @@ -609,7 +609,7 @@ async fn main() { sleep, ) } - TestRun { + Run { network_name, subgraph, stop_block, @@ -620,7 +620,7 @@ async fn main() { let node_id = ctx.node_id().clone(); let store_builder = ctx.store_builder().await; - commands::test_run::run( + commands::run::run( logger, store_builder, network_name, diff --git a/node/src/manager/commands/mod.rs b/node/src/manager/commands/mod.rs index 3f5ff87bf10..27b0eff0e87 100644 --- a/node/src/manager/commands/mod.rs +++ b/node/src/manager/commands/mod.rs @@ -8,7 +8,7 @@ pub mod listen; pub mod query; pub mod remove; pub mod rewind; +pub mod run; pub mod stats; -pub mod test_run; pub mod txn_speed; pub mod unused_deployments; diff --git a/node/src/manager/commands/test_run.rs b/node/src/manager/commands/run.rs similarity index 99% rename from node/src/manager/commands/test_run.rs rename to node/src/manager/commands/run.rs index bf63ce83ec2..9c5c522da94 100644 --- a/node/src/manager/commands/test_run.rs +++ b/node/src/manager/commands/run.rs @@ -42,7 +42,7 @@ pub async fn run( stop_block: BlockNumber, ) -> Result<(), anyhow::Error> { println!( - "Test run starting subgraph => {}, stop_block = {}", + "Run command: starting subgraph => {}, stop_block = {}", subgraph, stop_block ); @@ -70,8 +70,8 @@ pub async fn run( Some(adapters) => adapters.clone(), None => { return Err(format_err!( - "No ethereum adapters found, but required in this state of graphman test-run command" - )) + "No ethereum adapters found, but required in this state of graphman run command" + )) } };