-
-
Notifications
You must be signed in to change notification settings - Fork 92
Expose underlying connection errors to user of AsyncPgConnection #121
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for opening this PR 👍 It's more or less what I had in mind.
I've left a few comments about things that I would like to change. In addition we might need to make sure that we also perform that selection in SimpleConnection::batch_execute
.
src/pg/mod.rs
Outdated
.boxed() | ||
}); | ||
|
||
match self.error_receiver.try_recv() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is no need to have this block here if we handle it in with_prepared_statement
anyway.
src/pg/mod.rs
Outdated
.boxed() | ||
}); | ||
|
||
match self.error_receiver.try_recv() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is no need to have this block here if we handle it in with_prepared_statement anyway.
src/pg/mod.rs
Outdated
@@ -411,8 +440,12 @@ impl AsyncPgConnection { | |||
let res = callback(raw_connection, stmt.clone(), binds).await; | |||
let mut tm = tm.lock().await; | |||
update_transaction_manager_status(res, &mut tm) | |||
}; | |||
|
|||
match self.error_receiver.try_recv() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using try_recv()
only gets the errors that have occurred until this function is called. It does not get us the errors that might occur while the future f
is executed. That's the reason why I would prefer using select!
between the channel (future) and the actual future f
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah good point. I tried the snippet you suggested but tokio::select
only works in an async context. To deal with that I tried to move it into the above async closure but I had difficulty with that too. Let me try the latter again.
4438e23
to
f61f8ea
Compare
Hey hey, just got this working! At first I tried this: https://gist.github.com/banool/b62cda55a4cac09c0d57e6b283b40778. This doesn't work because At first I was relucant to use Arc + Mutex to solve this but I see you're doing this already in other cases. So that's what I did. Check out the new code! |
97520ec
to
5f436a1
Compare
5f436a1
to
7115c86
Compare
If this looks good I can make that |
@@ -102,6 +103,7 @@ pub struct AsyncPgConnection { | |||
stmt_cache: Arc<Mutex<StmtCache<diesel::pg::Pg, Statement>>>, | |||
transaction_state: Arc<Mutex<AnsiTransactionManager>>, | |||
metadata_cache: Arc<Mutex<PgMetadataCache>>, | |||
error_receiver: Arc<Mutex<tokio::sync::oneshot::Receiver<diesel::result::Error>>>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure if using a mutex here is a good idea, because that one would be locked while executing the query (compared to while starting to prepare as for the other mutexes). That would basically restrict the ability to do pipe lining at all. I need to think about a better solution, probably using a mpmc
channel or something like that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps the approach is this:
- Make a new async closure.
- In that closure, take the lock.
- Manually poll the future. If the future is pending we return future not ready for the closure.
I'm not 100% sure if this works but the overall idea is to make sure we only take the lock to check if the channel has an error in it and if it doesn't, let go of the lock and move on instead of blocking (so tokio::select can instead drive the other branch).
I notice that some other tokio channels maybe have an alternative form of what we need, this poll_recv function: https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.Receiver.html#method.poll_recv.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've finally found the time to have another look at this. #132 is what I've settled on.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Neato, looks good to me! I'll close this one out.
Summary
This PR addresses #118. In short, rather than just printing when there is an error with the tokio postgres connection, we expose it to the user in query path. If there is an error, they will get that rather than the inner result.
Test Plan
Set up DB:
Run tests:
I don't have any new tests yet. Adding a test for this behavior would be quite hard, we'd have to introduce some kind of induced failure + a way to have a deterministic order of steps. We can discuss here after we decide if the core change is good.