Skip to content

Commit 50473b5

Browse files
authored
chore: decouple reply_builder from ConnectionContext (#4069)
Signed-off-by: Roman Gershman <roman@dragonflydb.io>
1 parent 9b7af7d commit 50473b5

19 files changed

+89
-109
lines changed

src/facade/conn_context.cc

Lines changed: 2 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -12,26 +12,7 @@
1212

1313
namespace facade {
1414

15-
ConnectionContext::ConnectionContext(::io::Sink* stream, Connection* owner) : owner_(owner) {
16-
if (owner) {
17-
protocol_ = owner->protocol();
18-
}
19-
20-
if (stream) {
21-
switch (protocol_) {
22-
case Protocol::NONE:
23-
LOG(DFATAL) << "Invalid protocol";
24-
break;
25-
case Protocol::REDIS: {
26-
rbuilder_.reset(new RedisReplyBuilder(stream));
27-
break;
28-
}
29-
case Protocol::MEMCACHE:
30-
rbuilder_.reset(new MCReplyBuilder(stream));
31-
break;
32-
}
33-
}
34-
15+
ConnectionContext::ConnectionContext(Connection* owner) : owner_(owner) {
3516
conn_closing = false;
3617
req_auth = false;
3718
replica_conn = false;
@@ -46,7 +27,7 @@ ConnectionContext::ConnectionContext(::io::Sink* stream, Connection* owner) : ow
4627
}
4728

4829
size_t ConnectionContext::UsedMemory() const {
49-
return dfly::HeapSize(rbuilder_) + dfly::HeapSize(authed_username) + dfly::HeapSize(acl_commands);
30+
return dfly::HeapSize(authed_username) + dfly::HeapSize(acl_commands);
5031
}
5132

5233
} // namespace facade

src/facade/conn_context.h

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ class Connection;
1919

2020
class ConnectionContext {
2121
public:
22-
ConnectionContext(::io::Sink* stream, Connection* owner);
22+
explicit ConnectionContext(Connection* owner);
2323

2424
virtual ~ConnectionContext() {
2525
}
@@ -32,14 +32,6 @@ class ConnectionContext {
3232
return owner_;
3333
}
3434

35-
Protocol protocol() const {
36-
return protocol_;
37-
}
38-
39-
SinkReplyBuilder* reply_builder_old() {
40-
return rbuilder_.get();
41-
}
42-
4335
virtual size_t UsedMemory() const;
4436

4537
// connection state / properties.
@@ -71,8 +63,6 @@ class ConnectionContext {
7163

7264
private:
7365
Connection* owner_;
74-
Protocol protocol_ = Protocol::REDIS;
75-
std::unique_ptr<SinkReplyBuilder> rbuilder_;
7666
};
7767

7868
} // namespace facade

src/facade/dragonfly_connection.cc

Lines changed: 29 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -487,14 +487,15 @@ void Connection::DispatchOperations::operator()(Connection::PipelineMessage& msg
487487
DVLOG(2) << "Dispatching pipeline: " << ToSV(msg.args.front());
488488

489489
self->service_->DispatchCommand(CmdArgList{msg.args.data(), msg.args.size()},
490-
self->reply_builder_, self->cc_.get());
490+
self->reply_builder_.get(), self->cc_.get());
491491

492492
self->last_interaction_ = time(nullptr);
493493
self->skip_next_squashing_ = false;
494494
}
495495

496496
void Connection::DispatchOperations::operator()(const Connection::MCPipelineMessage& msg) {
497-
self->service_->DispatchMC(msg.cmd, msg.value, static_cast<MCReplyBuilder*>(self->reply_builder_),
497+
self->service_->DispatchMC(msg.cmd, msg.value,
498+
static_cast<MCReplyBuilder*>(self->reply_builder_.get()),
498499
self->cc_.get());
499500
self->last_interaction_ = time(nullptr);
500501
}
@@ -538,21 +539,17 @@ void UpdateLibNameVerMap(const string& name, const string& ver, int delta) {
538539
Connection::Connection(Protocol protocol, util::HttpListenerBase* http_listener, SSL_CTX* ctx,
539540
ServiceInterface* service)
540541
: io_buf_(kMinReadSize),
542+
protocol_(protocol),
541543
http_listener_(http_listener),
542544
ssl_ctx_(ctx),
543545
service_(service),
544546
flags_(0) {
545547
static atomic_uint32_t next_id{1};
546548

547-
protocol_ = protocol;
548-
549549
constexpr size_t kReqSz = sizeof(Connection::PipelineMessage);
550550
static_assert(kReqSz <= 256 && kReqSz >= 200);
551551

552552
switch (protocol) {
553-
case Protocol::NONE:
554-
LOG(DFATAL) << "Invalid protocol";
555-
break;
556553
case Protocol::REDIS:
557554
redis_parser_.reset(new RedisParser(GetFlag(FLAGS_max_multi_bulk_len)));
558555
break;
@@ -727,8 +724,7 @@ void Connection::HandleRequests() {
727724
// because both Write and Recv internally check if the socket was shut
728725
// down and return with an error accordingly.
729726
if (http_res && socket_->IsOpen()) {
730-
cc_.reset(service_->CreateContext(socket_.get(), this));
731-
reply_builder_ = cc_->reply_builder_old();
727+
cc_.reset(service_->CreateContext(this));
732728

733729
if (*http_res) {
734730
VLOG(1) << "HTTP1.1 identified";
@@ -748,19 +744,28 @@ void Connection::HandleRequests() {
748744
// Release the ownership of the socket from http_conn so it would stay with
749745
// this connection.
750746
http_conn.ReleaseSocket();
751-
} else {
747+
} else { // non-http
752748
if (breaker_cb_) {
753749
socket_->RegisterOnErrorCb([this](int32_t mask) { this->OnBreakCb(mask); });
754750
}
755-
751+
switch (protocol_) {
752+
case Protocol::REDIS:
753+
reply_builder_.reset(new RedisReplyBuilder(socket_.get()));
754+
break;
755+
case Protocol::MEMCACHE:
756+
reply_builder_.reset(new MCReplyBuilder(socket_.get()));
757+
break;
758+
default:
759+
break;
760+
}
756761
ConnectionFlow();
757762

758763
socket_->CancelOnErrorCb(); // noop if nothing is registered.
764+
VLOG(1) << "Closed connection for peer "
765+
<< GetClientInfo(fb2::ProactorBase::me()->GetPoolIndex());
766+
reply_builder_.reset();
759767
}
760-
VLOG(1) << "Closed connection for peer "
761-
<< GetClientInfo(fb2::ProactorBase::me()->GetPoolIndex());
762768
cc_.reset();
763-
reply_builder_ = nullptr;
764769
}
765770
}
766771

@@ -932,6 +937,8 @@ io::Result<bool> Connection::CheckForHttpProto() {
932937
}
933938

934939
void Connection::ConnectionFlow() {
940+
DCHECK(reply_builder_);
941+
935942
++stats_->num_conns;
936943
++stats_->conn_received_cnt;
937944
stats_->read_buf_capacity += io_buf_.Capacity();
@@ -989,7 +996,7 @@ void Connection::ConnectionFlow() {
989996
VLOG(1) << "Error parser status " << parser_error_;
990997

991998
if (redis_parser_) {
992-
SendProtocolError(RedisParser::Result(parser_error_), reply_builder_);
999+
SendProtocolError(RedisParser::Result(parser_error_), reply_builder_.get());
9931000
} else {
9941001
DCHECK(memcache_parser_);
9951002
reply_builder_->SendProtocolError("bad command line format");
@@ -1092,7 +1099,7 @@ Connection::ParserStatus Connection::ParseRedis() {
10921099

10931100
auto dispatch_sync = [this, &parse_args, &cmd_vec] {
10941101
RespExpr::VecToArgList(parse_args, &cmd_vec);
1095-
service_->DispatchCommand(absl::MakeSpan(cmd_vec), reply_builder_, cc_.get());
1102+
service_->DispatchCommand(absl::MakeSpan(cmd_vec), reply_builder_.get(), cc_.get());
10961103
};
10971104
auto dispatch_async = [this, &parse_args, tlh = mi_heap_get_backing()]() -> MessageHandle {
10981105
return {FromArgs(std::move(parse_args), tlh)};
@@ -1137,14 +1144,14 @@ auto Connection::ParseMemcache() -> ParserStatus {
11371144
string_view value;
11381145

11391146
auto dispatch_sync = [this, &cmd, &value] {
1140-
service_->DispatchMC(cmd, value, static_cast<MCReplyBuilder*>(reply_builder_), cc_.get());
1147+
service_->DispatchMC(cmd, value, static_cast<MCReplyBuilder*>(reply_builder_.get()), cc_.get());
11411148
};
11421149

11431150
auto dispatch_async = [&cmd, &value]() -> MessageHandle {
11441151
return {make_unique<MCPipelineMessage>(std::move(cmd), value)};
11451152
};
11461153

1147-
MCReplyBuilder* builder = static_cast<MCReplyBuilder*>(reply_builder_);
1154+
MCReplyBuilder* builder = static_cast<MCReplyBuilder*>(reply_builder_.get());
11481155

11491156
do {
11501157
string_view str = ToSV(io_buf_.InputBuffer());
@@ -1377,7 +1384,7 @@ void Connection::SquashPipeline() {
13771384
cc_->async_dispatch = true;
13781385

13791386
size_t dispatched =
1380-
service_->DispatchManyCommands(absl::MakeSpan(squash_cmds), reply_builder_, cc_.get());
1387+
service_->DispatchManyCommands(absl::MakeSpan(squash_cmds), reply_builder_.get(), cc_.get());
13811388

13821389
if (pending_pipeline_cmd_cnt_ == squash_cmds.size()) { // Flush if no new commands appeared
13831390
reply_builder_->Flush();
@@ -1400,7 +1407,7 @@ void Connection::SquashPipeline() {
14001407
}
14011408

14021409
void Connection::ClearPipelinedMessages() {
1403-
DispatchOperations dispatch_op{reply_builder_, this};
1410+
DispatchOperations dispatch_op{reply_builder_.get(), this};
14041411

14051412
// Recycle messages even from disconnecting client to keep properly track of memory stats
14061413
// As well as to avoid pubsub backpressure leakege.
@@ -1448,7 +1455,7 @@ std::string Connection::DebugInfo() const {
14481455
void Connection::ExecutionFiber() {
14491456
ThisFiber::SetName("ExecutionFiber");
14501457

1451-
DispatchOperations dispatch_op{reply_builder_, this};
1458+
DispatchOperations dispatch_op{reply_builder_.get(), this};
14521459

14531460
size_t squashing_threshold = GetFlag(FLAGS_pipeline_squash);
14541461

@@ -1812,7 +1819,7 @@ Connection::MemoryUsage Connection::GetMemoryUsage() const {
18121819
size_t mem = sizeof(*this) + dfly::HeapSize(dispatch_q_) + dfly::HeapSize(name_) +
18131820
dfly::HeapSize(tmp_parse_args_) + dfly::HeapSize(tmp_cmd_vec_) +
18141821
dfly::HeapSize(memcache_parser_) + dfly::HeapSize(redis_parser_) +
1815-
dfly::HeapSize(cc_);
1822+
dfly::HeapSize(cc_) + dfly::HeapSize(reply_builder_);
18161823

18171824
// We add a hardcoded 9k value to accomodate for the part of the Fiber stack that is in use.
18181825
// The allocated stack is actually larger (~130k), but only a small fraction of that (9k

src/facade/dragonfly_connection.h

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -269,10 +269,6 @@ class Connection : public util::Connection {
269269

270270
bool IsMain() const;
271271

272-
Protocol protocol() const {
273-
return protocol_;
274-
}
275-
276272
void SetName(std::string name);
277273

278274
void SetLibName(std::string name);
@@ -404,9 +400,7 @@ class Connection : public util::Connection {
404400
Protocol protocol_;
405401
ConnectionStats* stats_ = nullptr;
406402

407-
// cc_->reply_builder may change during the lifetime of the connection, due to injections.
408-
// This is a pointer to the original, socket based reply builder that never changes.
409-
SinkReplyBuilder* reply_builder_ = nullptr;
403+
std::unique_ptr<SinkReplyBuilder> reply_builder_;
410404
util::HttpListenerBase* http_listener_;
411405
SSL_CTX* ssl_ctx_;
412406

src/facade/dragonfly_listener.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,10 @@ class Listener : public util::ListenerInterface {
5050
bool IsPrivilegedInterface() const;
5151
bool IsMainInterface() const;
5252

53+
Protocol protocol() const {
54+
return protocol_;
55+
}
56+
5357
private:
5458
util::Connection* NewConnection(ProactorBase* proactor) final;
5559
ProactorBase* PickConnectionProactor(util::FiberSocketBase* sock) final;

src/facade/facade.cc

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -165,10 +165,6 @@ ostream& operator<<(ostream& os, facade::CmdArgList ras) {
165165
return os;
166166
}
167167

168-
ostream& operator<<(ostream& os, facade::Protocol p) {
169-
return os << int(p);
170-
}
171-
172168
ostream& operator<<(ostream& os, const facade::RespExpr& e) {
173169
using facade::RespExpr;
174170
using facade::ToSV;
@@ -213,4 +209,17 @@ ostream& operator<<(ostream& os, facade::RespSpan ras) {
213209
return os;
214210
}
215211

212+
ostream& operator<<(ostream& os, facade::Protocol p) {
213+
switch (p) {
214+
case facade::Protocol::REDIS:
215+
os << "REDIS";
216+
break;
217+
case facade::Protocol::MEMCACHE:
218+
os << "MEMCACHE";
219+
break;
220+
}
221+
222+
return os;
223+
}
224+
216225
} // namespace std

src/facade/facade_types.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ constexpr size_t kSanitizerOverhead = 0u;
3333
#endif
3434
#endif
3535

36-
enum class Protocol : uint8_t { NONE = 0, MEMCACHE = 1, REDIS = 2 };
36+
enum class Protocol : uint8_t { MEMCACHE = 1, REDIS = 2 };
3737

3838
using MutableSlice = std::string_view;
3939
using CmdArgList = absl::Span<const std::string_view>;

src/facade/ok_main.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ class OkService : public ServiceInterface {
3737
builder->SendError("");
3838
}
3939

40-
ConnectionContext* CreateContext(util::FiberSocketBase* peer, Connection* owner) final {
41-
return new ConnectionContext{peer, owner};
40+
ConnectionContext* CreateContext(Connection* owner) final {
41+
return new ConnectionContext{owner};
4242
}
4343
};
4444

src/facade/reply_builder.h

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,9 +87,7 @@ class SinkReplyBuilder {
8787
}
8888

8989
public: // High level interface
90-
virtual Protocol GetProtocol() const {
91-
return Protocol::NONE;
92-
}
90+
virtual Protocol GetProtocol() const = 0;
9391

9492
virtual void SendLong(long val) = 0;
9593
virtual void SendSimpleString(std::string_view str) = 0;

src/facade/service_interface.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ class ServiceInterface {
3636
virtual void DispatchMC(const MemcacheParser::Command& cmd, std::string_view value,
3737
MCReplyBuilder* builder, ConnectionContext* cntx) = 0;
3838

39-
virtual ConnectionContext* CreateContext(util::FiberSocketBase* peer, Connection* owner) = 0;
39+
virtual ConnectionContext* CreateContext(Connection* owner) = 0;
4040

4141
virtual void ConfigureHttpHandlers(util::HttpListenerBase* base, bool is_privileged) {
4242
}

src/server/acl/acl_family.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#include "base/logging.h"
2828
#include "core/overloaded.h"
2929
#include "facade/dragonfly_connection.h"
30+
#include "facade/dragonfly_listener.h"
3031
#include "facade/facade_types.h"
3132
#include "io/file.h"
3233
#include "io/file_util.h"
@@ -102,14 +103,13 @@ void AclFamily::StreamUpdatesToAllProactorConnections(const std::string& user,
102103
auto update_cb = [&]([[maybe_unused]] size_t id, util::Connection* conn) {
103104
DCHECK(conn);
104105
auto connection = static_cast<facade::Connection*>(conn);
105-
if (connection->protocol() == facade::Protocol::REDIS && !connection->IsHttp() &&
106-
connection->cntx()) {
106+
if (!connection->IsHttp() && connection->cntx()) {
107107
connection->SendAclUpdateAsync(
108108
facade::Connection::AclUpdateMessage{user, update_commands, update_keys, update_pub_sub});
109109
}
110110
};
111111

112-
if (main_listener_) {
112+
if (main_listener_ && main_listener_->protocol() == facade::Protocol::REDIS) {
113113
main_listener_->TraverseConnections(update_cb);
114114
}
115115
}

src/server/acl/acl_family.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
#include <vector>
1111

1212
#include "absl/container/flat_hash_set.h"
13-
#include "facade/dragonfly_listener.h"
1413
#include "facade/facade_types.h"
1514
#include "helio/util/proactor_pool.h"
1615
#include "server/acl/acl_commands_def.h"
@@ -20,6 +19,7 @@
2019

2120
namespace facade {
2221
class SinkReplyBuilder;
22+
class Listener;
2323
} // namespace facade
2424

2525
namespace dfly {

src/server/conn_context.cc

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -100,9 +100,8 @@ const CommandId* StoredCmd::Cid() const {
100100
return cid_;
101101
}
102102

103-
ConnectionContext::ConnectionContext(::io::Sink* stream, facade::Connection* owner,
104-
acl::UserCredentials cred)
105-
: facade::ConnectionContext(stream, owner) {
103+
ConnectionContext::ConnectionContext(facade::Connection* owner, acl::UserCredentials cred)
104+
: facade::ConnectionContext(owner) {
106105
if (owner) {
107106
skip_acl_validation = owner->IsPrivileged();
108107
}
@@ -117,7 +116,7 @@ ConnectionContext::ConnectionContext(::io::Sink* stream, facade::Connection* own
117116
}
118117

119118
ConnectionContext::ConnectionContext(const ConnectionContext* owner, Transaction* tx)
120-
: facade::ConnectionContext(nullptr, nullptr), transaction{tx} {
119+
: facade::ConnectionContext(nullptr), transaction{tx} {
121120
if (owner) {
122121
acl_commands = owner->acl_commands;
123122
keys = owner->keys;

0 commit comments

Comments
 (0)