diff --git a/src/sync_connection_wrapper/mod.rs b/src/sync_connection_wrapper/mod.rs index 9f28e5b..75a2122 100644 --- a/src/sync_connection_wrapper/mod.rs +++ b/src/sync_connection_wrapper/mod.rs @@ -6,33 +6,30 @@ //! //! * using a sync Connection implementation in async context //! * using the same code base for async crates needing multiple backends - -use crate::{AsyncConnection, SimpleAsyncConnection, TransactionManager}; -use diesel::backend::{Backend, DieselReserveSpecialization}; -use diesel::connection::{CacheSize, Instrumentation}; -use diesel::connection::{ - Connection, LoadConnection, TransactionManagerStatus, WithMetadataLookup, -}; -use diesel::query_builder::{ - AsQuery, CollectedQuery, MoveableBindCollector, QueryBuilder, QueryFragment, QueryId, -}; -use diesel::row::IntoOwnedRow; -use diesel::{ConnectionResult, QueryResult}; use futures_util::future::BoxFuture; -use futures_util::stream::BoxStream; -use futures_util::{FutureExt, StreamExt, TryFutureExt}; -use std::marker::PhantomData; -use std::sync::{Arc, Mutex}; -use tokio::task::JoinError; +use std::error::Error; #[cfg(feature = "sqlite")] mod sqlite; -fn from_tokio_join_error(join_error: JoinError) -> diesel::result::Error { - diesel::result::Error::DatabaseError( - diesel::result::DatabaseErrorKind::UnableToSendCommand, - Box::new(join_error.to_string()), - ) +/// This is a helper trait that allows to customize the +/// spawning blocking tasks as part of the +/// [`SyncConnectionWrapper`] type. By default a +/// tokio runtime and its spawn_blocking function is used. +pub trait SpawnBlocking { + /// This function should allow to execute a + /// given blocking task without blocking the caller + /// to get the result + fn spawn_blocking<'a, R>( + &mut self, + task: impl FnOnce() -> R + Send + 'static, + ) -> BoxFuture<'a, Result>> + where + R: Send + 'static; + + /// This function should be used to construct + /// a new runtime instance + fn get_runtime() -> Self; } /// A wrapper of a [`diesel::connection::Connection`] usable in async context. @@ -73,309 +70,427 @@ fn from_tokio_join_error(join_error: JoinError) -> diesel::result::Error { /// # some_async_fn().await; /// # } /// ``` -pub struct SyncConnectionWrapper { - inner: Arc>, -} +#[cfg(feature = "tokio")] +pub type SyncConnectionWrapper = + self::implementation::SyncConnectionWrapper; + +/// A wrapper of a [`diesel::connection::Connection`] usable in async context. +/// +/// It implements AsyncConnection if [`diesel::connection::Connection`] fullfils requirements: +/// * it's a [`diesel::connection::LoadConnection`] +/// * its [`diesel::connection::Connection::Backend`] has a [`diesel::query_builder::BindCollector`] implementing [`diesel::query_builder::MoveableBindCollector`] +/// * its [`diesel::connection::LoadConnection::Row`] implements [`diesel::row::IntoOwnedRow`] +/// +/// Internally this wrapper type will use `spawn_blocking` on given type implementing [`SpawnBlocking`] trait +/// to execute the request on the inner connection. +#[cfg(not(feature = "tokio"))] +pub use self::implementation::SyncConnectionWrapper; -impl SimpleAsyncConnection for SyncConnectionWrapper -where - C: diesel::connection::Connection + 'static, -{ - async fn batch_execute(&mut self, query: &str) -> QueryResult<()> { - let query = query.to_string(); - self.spawn_blocking(move |inner| inner.batch_execute(query.as_str())) - .await +pub use self::implementation::SyncTransactionManagerWrapper; + +mod implementation { + use crate::{AsyncConnection, SimpleAsyncConnection, TransactionManager}; + use diesel::backend::{Backend, DieselReserveSpecialization}; + use diesel::connection::{CacheSize, Instrumentation}; + use diesel::connection::{ + Connection, LoadConnection, TransactionManagerStatus, WithMetadataLookup, + }; + use diesel::query_builder::{ + AsQuery, CollectedQuery, MoveableBindCollector, QueryBuilder, QueryFragment, QueryId, + }; + use diesel::row::IntoOwnedRow; + use diesel::{ConnectionResult, QueryResult}; + use futures_util::stream::BoxStream; + use futures_util::{FutureExt, StreamExt, TryFutureExt}; + use std::marker::PhantomData; + use std::sync::{Arc, Mutex}; + + use super::*; + + fn from_spawn_blocking_error( + error: Box, + ) -> diesel::result::Error { + diesel::result::Error::DatabaseError( + diesel::result::DatabaseErrorKind::UnableToSendCommand, + Box::new(error.to_string()), + ) } -} -impl AsyncConnection for SyncConnectionWrapper -where - // Backend bounds - ::Backend: std::default::Default + DieselReserveSpecialization, - ::QueryBuilder: std::default::Default, - // Connection bounds - C: Connection + LoadConnection + WithMetadataLookup + 'static, - ::TransactionManager: Send, - // BindCollector bounds - MD: Send + 'static, - for<'a> ::BindCollector<'a>: - MoveableBindCollector + std::default::Default, - // Row bounds - O: 'static + Send + for<'conn> diesel::row::Row<'conn, C::Backend>, - for<'conn, 'query> ::Row<'conn, 'query>: - IntoOwnedRow<'conn, ::Backend, OwnedRow = O>, -{ - type LoadFuture<'conn, 'query> = BoxFuture<'query, QueryResult>>; - type ExecuteFuture<'conn, 'query> = BoxFuture<'query, QueryResult>; - type Stream<'conn, 'query> = BoxStream<'static, QueryResult>>; - type Row<'conn, 'query> = O; - type Backend = ::Backend; - type TransactionManager = SyncTransactionManagerWrapper<::TransactionManager>; - - async fn establish(database_url: &str) -> ConnectionResult { - let database_url = database_url.to_string(); - tokio::task::spawn_blocking(move || C::establish(&database_url)) - .await - .unwrap_or_else(|e| Err(diesel::ConnectionError::BadConnection(e.to_string()))) - .map(|c| SyncConnectionWrapper::new(c)) + pub struct SyncConnectionWrapper { + inner: Arc>, + runtime: S, } - fn load<'conn, 'query, T>(&'conn mut self, source: T) -> Self::LoadFuture<'conn, 'query> + impl SimpleAsyncConnection for SyncConnectionWrapper where - T: AsQuery + 'query, - T::Query: QueryFragment + QueryId + 'query, + C: diesel::connection::Connection + 'static, + S: SpawnBlocking + Send, { - self.execute_with_prepared_query(source.as_query(), |conn, query| { - use diesel::row::IntoOwnedRow; - let mut cache = <<::Row<'_, '_> as IntoOwnedRow< - ::Backend, - >>::Cache as Default>::default(); - let cursor = conn.load(&query)?; - - let size_hint = cursor.size_hint(); - let mut out = Vec::with_capacity(size_hint.1.unwrap_or(size_hint.0)); - // we use an explicit loop here to easily propagate possible errors - // as early as possible - for row in cursor { - out.push(Ok(IntoOwnedRow::into_owned(row?, &mut cache))); + async fn batch_execute(&mut self, query: &str) -> QueryResult<()> { + let query = query.to_string(); + self.spawn_blocking(move |inner| inner.batch_execute(query.as_str())) + .await + } + } + + impl AsyncConnection for SyncConnectionWrapper + where + // Backend bounds + ::Backend: std::default::Default + DieselReserveSpecialization, + ::QueryBuilder: std::default::Default, + // Connection bounds + C: Connection + LoadConnection + WithMetadataLookup + 'static, + ::TransactionManager: Send, + // BindCollector bounds + MD: Send + 'static, + for<'a> ::BindCollector<'a>: + MoveableBindCollector + std::default::Default, + // Row bounds + O: 'static + Send + for<'conn> diesel::row::Row<'conn, C::Backend>, + for<'conn, 'query> ::Row<'conn, 'query>: + IntoOwnedRow<'conn, ::Backend, OwnedRow = O>, + // SpawnBlocking bounds + S: SpawnBlocking + Send, + { + type LoadFuture<'conn, 'query> = + BoxFuture<'query, QueryResult>>; + type ExecuteFuture<'conn, 'query> = BoxFuture<'query, QueryResult>; + type Stream<'conn, 'query> = BoxStream<'static, QueryResult>>; + type Row<'conn, 'query> = O; + type Backend = ::Backend; + type TransactionManager = + SyncTransactionManagerWrapper<::TransactionManager>; + + async fn establish(database_url: &str) -> ConnectionResult { + let database_url = database_url.to_string(); + let mut runtime = S::get_runtime(); + + runtime + .spawn_blocking(move || C::establish(&database_url)) + .await + .unwrap_or_else(|e| Err(diesel::ConnectionError::BadConnection(e.to_string()))) + .map(move |c| SyncConnectionWrapper::with_runtime(c, runtime)) + } + + fn load<'conn, 'query, T>(&'conn mut self, source: T) -> Self::LoadFuture<'conn, 'query> + where + T: AsQuery + 'query, + T::Query: QueryFragment + QueryId + 'query, + { + self.execute_with_prepared_query(source.as_query(), |conn, query| { + use diesel::row::IntoOwnedRow; + let mut cache = <<::Row<'_, '_> as IntoOwnedRow< + ::Backend, + >>::Cache as Default>::default(); + let cursor = conn.load(&query)?; + + let size_hint = cursor.size_hint(); + let mut out = Vec::with_capacity(size_hint.1.unwrap_or(size_hint.0)); + // we use an explicit loop here to easily propagate possible errors + // as early as possible + for row in cursor { + out.push(Ok(IntoOwnedRow::into_owned(row?, &mut cache))); + } + + Ok(out) + }) + .map_ok(|rows| futures_util::stream::iter(rows).boxed()) + .boxed() + } + + fn execute_returning_count<'query, T>( + &mut self, + source: T, + ) -> Self::ExecuteFuture<'_, 'query> + where + T: QueryFragment + QueryId, + { + self.execute_with_prepared_query(source, |conn, query| { + conn.execute_returning_count(&query) + }) + } + + fn transaction_state( + &mut self, + ) -> &mut >::TransactionStateData + { + self.exclusive_connection().transaction_state() + } + + fn instrumentation(&mut self) -> &mut dyn Instrumentation { + // there should be no other pending future when this is called + // that means there is only one instance of this arc and + // we can simply access the inner data + if let Some(inner) = Arc::get_mut(&mut self.inner) { + inner + .get_mut() + .unwrap_or_else(|p| p.into_inner()) + .instrumentation() + } else { + panic!("Cannot access shared instrumentation") } + } + + fn set_instrumentation(&mut self, instrumentation: impl Instrumentation) { + // there should be no other pending future when this is called + // that means there is only one instance of this arc and + // we can simply access the inner data + if let Some(inner) = Arc::get_mut(&mut self.inner) { + inner + .get_mut() + .unwrap_or_else(|p| p.into_inner()) + .set_instrumentation(instrumentation) + } else { + panic!("Cannot access shared instrumentation") + } + } - Ok(out) - }) - .map_ok(|rows| futures_util::stream::iter(rows).boxed()) - .boxed() + fn set_prepared_statement_cache_size(&mut self, size: CacheSize) { + // there should be no other pending future when this is called + // that means there is only one instance of this arc and + // we can simply access the inner data + if let Some(inner) = Arc::get_mut(&mut self.inner) { + inner + .get_mut() + .unwrap_or_else(|p| p.into_inner()) + .set_prepared_statement_cache_size(size) + } else { + panic!("Cannot access shared cache") + } + } } - fn execute_returning_count<'query, T>(&mut self, source: T) -> Self::ExecuteFuture<'_, 'query> + /// A wrapper of a diesel transaction manager usable in async context. + pub struct SyncTransactionManagerWrapper(PhantomData); + + impl TransactionManager> for SyncTransactionManagerWrapper where - T: QueryFragment + QueryId, + SyncConnectionWrapper: AsyncConnection, + C: Connection + 'static, + S: SpawnBlocking, + T: diesel::connection::TransactionManager + Send, { - self.execute_with_prepared_query(source, |conn, query| conn.execute_returning_count(&query)) - } + type TransactionStateData = T::TransactionStateData; - fn transaction_state( - &mut self, - ) -> &mut >::TransactionStateData { - self.exclusive_connection().transaction_state() - } + async fn begin_transaction(conn: &mut SyncConnectionWrapper) -> QueryResult<()> { + conn.spawn_blocking(move |inner| T::begin_transaction(inner)) + .await + } - fn instrumentation(&mut self) -> &mut dyn Instrumentation { - // there should be no other pending future when this is called - // that means there is only one instance of this arc and - // we can simply access the inner data - if let Some(inner) = Arc::get_mut(&mut self.inner) { - inner - .get_mut() - .unwrap_or_else(|p| p.into_inner()) - .instrumentation() - } else { - panic!("Cannot access shared instrumentation") + async fn commit_transaction(conn: &mut SyncConnectionWrapper) -> QueryResult<()> { + conn.spawn_blocking(move |inner| T::commit_transaction(inner)) + .await } - } - fn set_instrumentation(&mut self, instrumentation: impl Instrumentation) { - // there should be no other pending future when this is called - // that means there is only one instance of this arc and - // we can simply access the inner data - if let Some(inner) = Arc::get_mut(&mut self.inner) { - inner - .get_mut() - .unwrap_or_else(|p| p.into_inner()) - .set_instrumentation(instrumentation) - } else { - panic!("Cannot access shared instrumentation") + async fn rollback_transaction(conn: &mut SyncConnectionWrapper) -> QueryResult<()> { + conn.spawn_blocking(move |inner| T::rollback_transaction(inner)) + .await } - } - fn set_prepared_statement_cache_size(&mut self, size: CacheSize) { - // there should be no other pending future when this is called - // that means there is only one instance of this arc and - // we can simply access the inner data - if let Some(inner) = Arc::get_mut(&mut self.inner) { - inner - .get_mut() - .unwrap_or_else(|p| p.into_inner()) - .set_prepared_statement_cache_size(size) - } else { - panic!("Cannot access shared cache") + fn transaction_manager_status_mut( + conn: &mut SyncConnectionWrapper, + ) -> &mut TransactionManagerStatus { + T::transaction_manager_status_mut(conn.exclusive_connection()) } } -} -/// A wrapper of a diesel transaction manager usable in async context. -pub struct SyncTransactionManagerWrapper(PhantomData); + impl SyncConnectionWrapper { + /// Builds a wrapper with this underlying sync connection + pub fn new(connection: C) -> Self + where + C: Connection, + S: SpawnBlocking, + { + SyncConnectionWrapper { + inner: Arc::new(Mutex::new(connection)), + runtime: S::get_runtime(), + } + } + + /// Builds a wrapper with this underlying sync connection + /// and runtime for spawning blocking tasks + pub fn with_runtime(connection: C, runtime: S) -> Self + where + C: Connection, + S: SpawnBlocking, + { + SyncConnectionWrapper { + inner: Arc::new(Mutex::new(connection)), + runtime, + } + } -impl TransactionManager> for SyncTransactionManagerWrapper -where - SyncConnectionWrapper: AsyncConnection, - C: Connection + 'static, - T: diesel::connection::TransactionManager + Send, -{ - type TransactionStateData = T::TransactionStateData; + /// Run a operation directly with the inner connection + /// + /// This function is usful to register custom functions + /// and collection for Sqlite for example + /// + /// # Example + /// + /// ```rust + /// # include!("../doctest_setup.rs"); + /// # #[tokio::main] + /// # async fn main() { + /// # run_test().await.unwrap(); + /// # } + /// # + /// # async fn run_test() -> QueryResult<()> { + /// # let mut conn = establish_connection().await; + /// conn.spawn_blocking(|conn| { + /// // sqlite.rs sqlite NOCASE only works for ASCII characters, + /// // this collation allows handling UTF-8 (barring locale differences) + /// conn.register_collation("RUSTNOCASE", |rhs, lhs| { + /// rhs.to_lowercase().cmp(&lhs.to_lowercase()) + /// }) + /// }).await + /// + /// # } + /// ``` + pub fn spawn_blocking<'a, R>( + &mut self, + task: impl FnOnce(&mut C) -> QueryResult + Send + 'static, + ) -> BoxFuture<'a, QueryResult> + where + C: Connection + 'static, + R: Send + 'static, + S: SpawnBlocking, + { + let inner = self.inner.clone(); + self.runtime + .spawn_blocking(move || { + let mut inner = inner.lock().unwrap_or_else(|poison| { + // try to be resilient by providing the guard + inner.clear_poison(); + poison.into_inner() + }); + task(&mut inner) + }) + .unwrap_or_else(|err| QueryResult::Err(from_spawn_blocking_error(err))) + .boxed() + } - async fn begin_transaction(conn: &mut SyncConnectionWrapper) -> QueryResult<()> { - conn.spawn_blocking(move |inner| T::begin_transaction(inner)) - .await - } + fn execute_with_prepared_query<'a, MD, Q, R>( + &mut self, + query: Q, + callback: impl FnOnce(&mut C, &CollectedQuery) -> QueryResult + Send + 'static, + ) -> BoxFuture<'a, QueryResult> + where + // Backend bounds + ::Backend: std::default::Default + DieselReserveSpecialization, + ::QueryBuilder: std::default::Default, + // Connection bounds + C: Connection + LoadConnection + WithMetadataLookup + 'static, + ::TransactionManager: Send, + // BindCollector bounds + MD: Send + 'static, + for<'b> ::BindCollector<'b>: + MoveableBindCollector + std::default::Default, + // Arguments/Return bounds + Q: QueryFragment + QueryId, + R: Send + 'static, + // SpawnBlocking bounds + S: SpawnBlocking, + { + let backend = C::Backend::default(); - async fn commit_transaction(conn: &mut SyncConnectionWrapper) -> QueryResult<()> { - conn.spawn_blocking(move |inner| T::commit_transaction(inner)) - .await - } + let (collect_bind_result, collector_data) = { + let exclusive = self.inner.clone(); + let mut inner = exclusive.lock().unwrap_or_else(|poison| { + // try to be resilient by providing the guard + exclusive.clear_poison(); + poison.into_inner() + }); + let mut bind_collector = + <::BindCollector<'_> as Default>::default(); + let metadata_lookup = inner.metadata_lookup(); + let result = query.collect_binds(&mut bind_collector, metadata_lookup, &backend); + let collector_data = bind_collector.moveable(); - async fn rollback_transaction(conn: &mut SyncConnectionWrapper) -> QueryResult<()> { - conn.spawn_blocking(move |inner| T::rollback_transaction(inner)) - .await - } + (result, collector_data) + }; - fn transaction_manager_status_mut( - conn: &mut SyncConnectionWrapper, - ) -> &mut TransactionManagerStatus { - T::transaction_manager_status_mut(conn.exclusive_connection()) - } -} + let mut query_builder = <::QueryBuilder as Default>::default(); + let sql = query + .to_sql(&mut query_builder, &backend) + .map(|_| query_builder.finish()); + let is_safe_to_cache_prepared = query.is_safe_to_cache_prepared(&backend); -impl SyncConnectionWrapper { - /// Builds a wrapper with this underlying sync connection - pub fn new(connection: C) -> Self - where - C: Connection, - { - SyncConnectionWrapper { - inner: Arc::new(Mutex::new(connection)), + self.spawn_blocking(|inner| { + collect_bind_result?; + let query = CollectedQuery::new(sql?, is_safe_to_cache_prepared?, collector_data); + callback(inner, &query) + }) + } + + /// Gets an exclusive access to the underlying diesel Connection + /// + /// It panics in case of shared access. + /// This is typically used only used during transaction. + pub(self) fn exclusive_connection(&mut self) -> &mut C + where + C: Connection, + { + // there should be no other pending future when this is called + // that means there is only one instance of this Arc and + // we can simply access the inner data + if let Some(conn_mutex) = Arc::get_mut(&mut self.inner) { + conn_mutex + .get_mut() + .expect("Mutex is poisoned, a thread must have panicked holding it.") + } else { + panic!("Cannot access shared transaction state") + } } } - /// Run a operation directly with the inner connection - /// - /// This function is usful to register custom functions - /// and collection for Sqlite for example - /// - /// # Example - /// - /// ```rust - /// # include!("../doctest_setup.rs"); - /// # #[tokio::main] - /// # async fn main() { - /// # run_test().await.unwrap(); - /// # } - /// # - /// # async fn run_test() -> QueryResult<()> { - /// # let mut conn = establish_connection().await; - /// conn.spawn_blocking(|conn| { - /// // sqlite.rs sqlite NOCASE only works for ASCII characters, - /// // this collation allows handling UTF-8 (barring locale differences) - /// conn.register_collation("RUSTNOCASE", |rhs, lhs| { - /// rhs.to_lowercase().cmp(&lhs.to_lowercase()) - /// }) - /// }).await - /// - /// # } - /// ``` - pub fn spawn_blocking<'a, R>( - &mut self, - task: impl FnOnce(&mut C) -> QueryResult + Send + 'static, - ) -> BoxFuture<'a, QueryResult> + #[cfg(any( + feature = "deadpool", + feature = "bb8", + feature = "mobc", + feature = "r2d2" + ))] + impl crate::pooled_connection::PoolableConnection for SyncConnectionWrapper where - C: Connection + 'static, - R: Send + 'static, + Self: AsyncConnection, { - let inner = self.inner.clone(); - tokio::task::spawn_blocking(move || { - let mut inner = inner.lock().unwrap_or_else(|poison| { - // try to be resilient by providing the guard - inner.clear_poison(); - poison.into_inner() - }); - task(&mut inner) - }) - .unwrap_or_else(|err| QueryResult::Err(from_tokio_join_error(err))) - .boxed() + fn is_broken(&mut self) -> bool { + Self::TransactionManager::is_broken_transaction_manager(self) + } } - fn execute_with_prepared_query<'a, MD, Q, R>( - &mut self, - query: Q, - callback: impl FnOnce(&mut C, &CollectedQuery) -> QueryResult + Send + 'static, - ) -> BoxFuture<'a, QueryResult> - where - // Backend bounds - ::Backend: std::default::Default + DieselReserveSpecialization, - ::QueryBuilder: std::default::Default, - // Connection bounds - C: Connection + LoadConnection + WithMetadataLookup + 'static, - ::TransactionManager: Send, - // BindCollector bounds - MD: Send + 'static, - for<'b> ::BindCollector<'b>: - MoveableBindCollector + std::default::Default, - // Arguments/Return bounds - Q: QueryFragment + QueryId, - R: Send + 'static, - { - let backend = C::Backend::default(); - - let (collect_bind_result, collector_data) = { - let exclusive = self.inner.clone(); - let mut inner = exclusive.lock().unwrap_or_else(|poison| { - // try to be resilient by providing the guard - exclusive.clear_poison(); - poison.into_inner() - }); - let mut bind_collector = - <::BindCollector<'_> as Default>::default(); - let metadata_lookup = inner.metadata_lookup(); - let result = query.collect_binds(&mut bind_collector, metadata_lookup, &backend); - let collector_data = bind_collector.moveable(); - - (result, collector_data) - }; - - let mut query_builder = <::QueryBuilder as Default>::default(); - let sql = query - .to_sql(&mut query_builder, &backend) - .map(|_| query_builder.finish()); - let is_safe_to_cache_prepared = query.is_safe_to_cache_prepared(&backend); - - self.spawn_blocking(|inner| { - collect_bind_result?; - let query = CollectedQuery::new(sql?, is_safe_to_cache_prepared?, collector_data); - callback(inner, &query) - }) + #[cfg(feature = "tokio")] + pub enum Tokio { + Handle(tokio::runtime::Handle), + Runtime(tokio::runtime::Runtime), } - /// Gets an exclusive access to the underlying diesel Connection - /// - /// It panics in case of shared access. - /// This is typically used only used during transaction. - pub(self) fn exclusive_connection(&mut self) -> &mut C - where - C: Connection, - { - // there should be no other pending future when this is called - // that means there is only one instance of this Arc and - // we can simply access the inner data - if let Some(conn_mutex) = Arc::get_mut(&mut self.inner) { - conn_mutex - .get_mut() - .expect("Mutex is poisoned, a thread must have panicked holding it.") - } else { - panic!("Cannot access shared transaction state") + #[cfg(feature = "tokio")] + impl SpawnBlocking for Tokio { + fn spawn_blocking<'a, R>( + &mut self, + task: impl FnOnce() -> R + Send + 'static, + ) -> BoxFuture<'a, Result>> + where + R: Send + 'static, + { + let fut = match self { + Tokio::Handle(handle) => handle.spawn_blocking(task), + Tokio::Runtime(runtime) => runtime.spawn_blocking(task), + }; + + fut.map_err(|err| Box::from(err)).boxed() } - } -} -#[cfg(any( - feature = "deadpool", - feature = "bb8", - feature = "mobc", - feature = "r2d2" -))] -impl crate::pooled_connection::PoolableConnection for SyncConnectionWrapper -where - Self: AsyncConnection, -{ - fn is_broken(&mut self) -> bool { - Self::TransactionManager::is_broken_transaction_manager(self) + fn get_runtime() -> Self { + if let Ok(handle) = tokio::runtime::Handle::try_current() { + Tokio::Handle(handle) + } else { + let runtime = tokio::runtime::Builder::new_current_thread() + .build() + .unwrap(); + + Tokio::Runtime(runtime) + } + } } }