1
1
use std:: collections:: { BTreeMap , HashMap } ;
2
2
use std:: sync:: Arc ;
3
3
use std:: time:: Duration ;
4
- use std :: { collections :: HashSet , convert :: TryFrom } ;
5
- use std:: { env, thread } ;
4
+
5
+ use std:: { env} ;
6
6
7
7
use crate :: config:: { Config , ProviderDetails } ;
8
8
use crate :: manager:: deployment:: Deployment ;
@@ -11,34 +11,33 @@ use crate::store_builder::StoreBuilder;
11
11
use ethereum:: { EthereumNetworks , ProviderEthRpcMetrics } ;
12
12
use futures:: future:: join_all;
13
13
use futures:: TryFutureExt ;
14
- use graph:: anyhow:: { bail , format_err, Error } ;
15
- use graph:: blockchain:: { Block as BlockchainBlock , BlockchainKind , BlockchainMap , ChainIdentifier } ;
14
+ use graph:: anyhow:: { format_err, Error } ;
15
+ use graph:: blockchain:: { BlockchainKind , BlockchainMap , ChainIdentifier } ;
16
16
use graph:: cheap_clone:: CheapClone ;
17
- use graph:: components:: store:: { BlockStore as _, ChainStore as _, DeploymentId , DeploymentLocator } ;
18
17
use graph:: firehose:: endpoints:: { FirehoseEndpoint , FirehoseNetworkEndpoints , FirehoseNetworks } ;
19
18
use graph:: ipfs_client:: IpfsClient ;
20
- use graph :: log :: logger ;
19
+
21
20
use graph:: prelude:: {
22
- anyhow, tokio, BlockNumber , BlockPtr , DeploymentHash , LoggerFactory , NodeId ,
21
+ anyhow, tokio, BlockNumber , DeploymentHash , LoggerFactory , NodeId ,
23
22
SubgraphAssignmentProvider , SubgraphName , SubgraphRegistrar , SubgraphStore ,
24
23
SubgraphVersionSwitchingMode ,
25
24
} ;
26
- use graph:: prelude:: { prost , MetricsRegistry as MetricsRegistryTrait } ;
25
+ use graph:: prelude:: { MetricsRegistry as MetricsRegistryTrait } ;
27
26
use graph:: slog:: { debug, error, info, o, Logger } ;
28
27
use graph:: util:: security:: SafeDisplay ;
29
- use graph_chain_ethereum:: { self as ethereum, network_indexer , EthereumAdapterTrait , Transport } ;
28
+ use graph_chain_ethereum:: { self as ethereum, EthereumAdapterTrait , Transport } ;
30
29
use graph_core:: {
31
30
LinkResolver , MetricsRegistry , SubgraphAssignmentProvider as IpfsSubgraphAssignmentProvider ,
32
31
SubgraphInstanceManager , SubgraphRegistrar as IpfsSubgraphRegistrar ,
33
32
} ;
34
- use graph_store_postgres :: BlockStore ;
33
+ use graph :: components :: store :: BlockStore ;
35
34
use graph_store_postgres:: { connection_pool:: ConnectionPool , Store } ;
36
35
use lazy_static:: lazy_static;
37
36
use std:: str:: FromStr ;
38
37
39
38
pub async fn run (
40
39
primary : ConnectionPool ,
41
- store : Arc < Store > ,
40
+ _store : Arc < Store > ,
42
41
logger : Logger ,
43
42
network_name : String ,
44
43
config : Config ,
@@ -85,15 +84,7 @@ pub async fn run(
85
84
StoreBuilder :: new ( & logger, & node_id, & config, metrics_registry. clone ( ) ) . await ;
86
85
let chain_head_update_listener = store_builder. chain_head_update_listener ( ) ;
87
86
88
- let ( eth_networks, ethereum_idents) = connect_ethereum_networks ( & logger, eth_networks) . await ;
89
- // let (near_networks, near_idents) = connect_firehose_networks::<NearFirehoseHeaderOnlyBlock>(
90
- // &logger,
91
- // firehose_networks_by_kind
92
- // .remove(&BlockchainKind::Near)
93
- // .unwrap_or_else(|| FirehoseNetworks::new()),
94
- // )
95
- // .await;
96
-
87
+ let ( _eth_networks, ethereum_idents) = connect_ethereum_networks ( & logger, eth_networks) . await ;
97
88
let network_identifiers = ethereum_idents. into_iter ( ) . collect ( ) ;
98
89
let network_store = store_builder. network_store ( network_identifiers) ;
99
90
@@ -491,87 +482,3 @@ async fn connect_ethereum_networks(
491
482
let idents: Vec < _ > = idents. into_iter ( ) . collect ( ) ;
492
483
( eth_networks, idents)
493
484
}
494
-
495
- /// Try to connect to all the providers in `firehose_networks` and get their net
496
- /// version and genesis block. Return the same `eth_networks` and the
497
- /// retrieved net identifiers grouped by network name. Remove all providers
498
- /// for which trying to connect resulted in an error from the returned
499
- /// `EthereumNetworks`, since it's likely pointless to try and connect to
500
- /// them. If the connection attempt to a provider times out after
501
- /// `NET_VERSION_WAIT_TIME`, keep the provider, but don't report a
502
- /// version for it.
503
- async fn connect_firehose_networks < M > (
504
- logger : & Logger ,
505
- mut firehose_networks : FirehoseNetworks ,
506
- ) -> ( FirehoseNetworks , Vec < ( String , Vec < ChainIdentifier > ) > )
507
- where
508
- M : prost:: Message + BlockchainBlock + Default + ' static ,
509
- {
510
- // This has one entry for each provider, and therefore multiple entries
511
- // for each network
512
- let statuses = join_all (
513
- firehose_networks
514
- . flatten ( )
515
- . into_iter ( )
516
- . map ( |( network_name, endpoint) | ( network_name, endpoint, logger. clone ( ) ) )
517
- . map ( |( network, endpoint, logger) | async move {
518
- let logger = logger. new ( o ! ( "provider" => endpoint. provider. to_string( ) ) ) ;
519
- info ! (
520
- logger, "Connecting to Firehose to get network identifier" ;
521
- "url" => & endpoint. uri,
522
- ) ;
523
- match tokio:: time:: timeout (
524
- NET_VERSION_WAIT_TIME ,
525
- endpoint. genesis_block_ptr :: < M > ( & logger) ,
526
- )
527
- . await
528
- . map_err ( Error :: from)
529
- {
530
- // An `Err` means a timeout, an `Ok(Err)` means some other error (maybe a typo
531
- // on the URL)
532
- Ok ( Err ( e) ) | Err ( e) => {
533
- error ! ( logger, "Connection to provider failed. Not using this provider" ;
534
- "error" => e. to_string( ) ) ;
535
- ProviderNetworkStatus :: Broken {
536
- network,
537
- provider : endpoint. provider . to_string ( ) ,
538
- }
539
- }
540
- Ok ( Ok ( ptr) ) => {
541
- info ! (
542
- logger,
543
- "Connected to Firehose" ;
544
- "uri" => & endpoint. uri,
545
- "genesis_block" => format_args!( "{}" , & ptr) ,
546
- ) ;
547
-
548
- let ident = ChainIdentifier {
549
- net_version : "0" . to_string ( ) ,
550
- genesis_block_hash : ptr. hash ,
551
- } ;
552
-
553
- ProviderNetworkStatus :: Version { network, ident }
554
- }
555
- }
556
- } ) ,
557
- )
558
- . await ;
559
-
560
- // Group identifiers by network name
561
- let idents: HashMap < String , Vec < ChainIdentifier > > =
562
- statuses
563
- . into_iter ( )
564
- . fold ( HashMap :: new ( ) , |mut networks, status| {
565
- match status {
566
- ProviderNetworkStatus :: Broken { network, provider } => {
567
- firehose_networks. remove ( & network, & provider)
568
- }
569
- ProviderNetworkStatus :: Version { network, ident } => {
570
- networks. entry ( network. to_string ( ) ) . or_default ( ) . push ( ident)
571
- }
572
- }
573
- networks
574
- } ) ;
575
- let idents: Vec < _ > = idents. into_iter ( ) . collect ( ) ;
576
- ( firehose_networks, idents)
577
- }
0 commit comments