// ClickHouse/src/Client/HedgedConnections.cpp
#if defined(OS_LINUX)
2021-01-19 19:21:06 +00:00
#include <Client/HedgedConnections.h>
#include <Interpreters/ClientInfo.h>
namespace DB
{
/// Error codes thrown by this translation unit; the definitions live elsewhere.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
    extern const int MISMATCH_REPLICAS_DATA_SOURCES;
    extern const int SOCKET_TIMEOUT;
    extern const int ALL_CONNECTION_TRIES_FAILED;
}
/// Obtains the initial connections from the factory and registers one replica per connection.
/// The i-th connection gets offset i; its socket is added to epoll and each offset starts
/// with exactly one active replica and no data received yet.
HedgedConnections::HedgedConnections(
    const ConnectionPoolWithFailoverPtr & pool_,
    const Settings & settings_,
    const ConnectionTimeouts & timeouts_,
    const ThrottlerPtr & throttler_,
    PoolMode pool_mode,
    std::shared_ptr<QualifiedTableName> table_to_check_)
    : hedged_connections_factory(pool_, &settings_, timeouts_, table_to_check_)
    , settings(settings_)
    , throttler(throttler_)
    , log(&Poco::Logger::get("HedgedConnections"))
{
    std::vector<Connection *> initial_connections = hedged_connections_factory.getManyConnections(pool_mode);

    size_t next_offset = 0;
    for (Connection * connection : initial_connections)
    {
        ReplicaStatePtr state = std::make_shared<ReplicaState>();
        state->connection = connection;
        state->offset = next_offset;
        ++next_offset;
        state->connection->setThrottler(throttler_);

        int socket_fd = state->connection->getSocket()->impl()->sockfd();
        epoll.add(socket_fd);
        fd_to_replica[socket_fd] = state;
        offset_states.push_back(OffsetState{{std::move(state)}, 1, false});
    }

    active_connection_count = initial_connections.size();
    offsets_with_received_first_data_packet = 0;

    /// Replicas added later must get the same throttler as the initial ones.
    pipeline_for_new_replicas.add([throttler_](ReplicaStatePtr & replica_) { replica_->connection->setThrottler(throttler_); });
}
2021-02-08 13:08:15 +00:00
void HedgedConnections::Pipeline::add(std::function<void(ReplicaStatePtr & replica)> send_function)
2021-01-19 19:21:06 +00:00
{
pipeline.push_back(send_function);
}
2021-02-08 13:08:15 +00:00
/// Applies every stored step to the given replica, in insertion order.
void HedgedConnections::Pipeline::run(ReplicaStatePtr & replica)
{
    for (auto step = pipeline.begin(); step != pipeline.end(); ++step)
        (*step)(replica);
}
/// Sends scalars data to every replica that still has a connection and records the same
/// action in the pipeline for new replicas.
/// Throws LOGICAL_ERROR if the query has not been sent yet.
void HedgedConnections::sendScalarsData(Scalars & data)
{
    std::lock_guard lock(cancel_mutex);

    if (!sent_query)
        throw Exception("Cannot send scalars data: query not yet sent.", ErrorCodes::LOGICAL_ERROR);

    /// NOTE: `data` is captured by reference and the lambda is stored in the pipeline.
    auto send_to_replica = [&data](ReplicaStatePtr & replica) { replica->connection->sendScalarsData(data); };

    for (auto & state : offset_states)
    {
        for (auto & replica : state.replicas)
        {
            if (!replica->connection)
                continue;
            send_to_replica(replica);
        }
    }

    pipeline_for_new_replicas.add(send_to_replica);
}
void HedgedConnections::sendExternalTablesData(std::vector<ExternalTablesData> & data)
{
std::lock_guard lock(cancel_mutex);
if (!sent_query)
throw Exception("Cannot send external tables data: query not yet sent.", ErrorCodes::LOGICAL_ERROR);
if (data.size() != size())
throw Exception("Mismatch between replicas and data sources", ErrorCodes::MISMATCH_REPLICAS_DATA_SOURCES);
2021-02-08 13:08:15 +00:00
auto send_external_tables_data = [&data](ReplicaStatePtr & replica) { replica->connection->sendExternalTablesData(data[0]); };
2021-01-19 19:21:06 +00:00
2021-02-06 00:54:27 +00:00
for (auto & offset_state : offset_states)
for (auto & replica : offset_state.replicas)
2021-02-08 13:08:15 +00:00
if (replica->connection)
send_external_tables_data(replica);
2021-01-19 19:21:06 +00:00
pipeline_for_new_replicas.add(send_external_tables_data);
2021-01-19 19:21:06 +00:00
}
void HedgedConnections::sendIgnoredPartUUIDs(const std::vector<UUID> & uuids)
{
std::lock_guard lock(cancel_mutex);
if (sent_query)
throw Exception("Cannot send uuids after query is sent.", ErrorCodes::LOGICAL_ERROR);
2021-02-08 13:08:15 +00:00
auto send_ignored_part_uuids = [&uuids](ReplicaStatePtr & replica) { replica->connection->sendIgnoredPartUUIDs(uuids); };
for (auto & offset_state : offset_states)
for (auto & replica : offset_state.replicas)
2021-02-08 13:08:15 +00:00
if (replica->connection)
send_ignored_part_uuids(replica);
pipeline_for_new_replicas.add(send_ignored_part_uuids);
}
2021-01-19 19:21:06 +00:00
void HedgedConnections::sendQuery(
const ConnectionTimeouts & timeouts,
const String & query,
const String & query_id,
UInt64 stage,
const ClientInfo & client_info,
bool with_pending_data)
{
std::lock_guard lock(cancel_mutex);
if (sent_query)
throw Exception("Query already sent.", ErrorCodes::LOGICAL_ERROR);
2021-02-06 00:54:27 +00:00
for (auto & offset_state : offset_states)
2021-01-19 19:21:06 +00:00
{
2021-02-06 00:54:27 +00:00
for (auto & replica : offset_state.replicas)
2021-01-19 19:21:06 +00:00
{
2021-02-08 13:08:15 +00:00
if (replica->connection->getServerRevision(timeouts) < DBMS_MIN_REVISION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD)
{
2021-02-02 12:14:31 +00:00
disable_two_level_aggregation = true;
break;
}
}
2021-02-02 12:14:31 +00:00
if (disable_two_level_aggregation)
break;
}
2021-02-08 13:08:15 +00:00
auto send_query = [this, timeouts, query, query_id, stage, client_info, with_pending_data](ReplicaStatePtr & replica) {
2021-02-06 00:54:27 +00:00
Settings modified_settings = settings;
2021-02-06 00:54:27 +00:00
if (disable_two_level_aggregation)
{
/// Disable two-level aggregation due to version incompatibility.
2021-01-19 19:21:06 +00:00
modified_settings.group_by_two_level_threshold = 0;
modified_settings.group_by_two_level_threshold_bytes = 0;
}
2021-02-06 00:54:27 +00:00
if (offset_states.size() > 1)
{
2021-02-06 00:54:27 +00:00
modified_settings.parallel_replicas_count = offset_states.size();
2021-02-08 13:08:15 +00:00
modified_settings.parallel_replica_offset = replica->offset;
}
2021-02-08 13:08:15 +00:00
replica->connection->sendQuery(timeouts, query, query_id, stage, &modified_settings, &client_info, with_pending_data);
2021-02-06 00:54:27 +00:00
addTimeoutToReplica(ConnectionTimeoutType::RECEIVE_TIMEOUT, replica);
addTimeoutToReplica(ConnectionTimeoutType::RECEIVE_DATA_TIMEOUT, replica);
2021-01-19 19:21:06 +00:00
};
2021-02-06 00:54:27 +00:00
for (auto & offset_status : offset_states)
for (auto & replica : offset_status.replicas)
send_query(replica);
2021-01-19 19:21:06 +00:00
pipeline_for_new_replicas.add(send_query);
2021-01-19 19:21:06 +00:00
sent_query = true;
}
void HedgedConnections::disconnect()
{
std::lock_guard lock(cancel_mutex);
2021-02-06 00:54:27 +00:00
for (auto & offset_status : offset_states)
for (auto & replica : offset_status.replicas)
2021-02-08 13:08:15 +00:00
if (replica->connection)
finishProcessReplica(replica, true);
2021-02-06 00:54:27 +00:00
if (hedged_connections_factory.hasEventsInProcess())
2021-01-19 19:21:06 +00:00
{
if (next_replica_in_process)
2021-02-06 00:54:27 +00:00
{
epoll.remove(hedged_connections_factory.getFileDescriptor());
next_replica_in_process = false;
}
hedged_connections_factory.stopChoosingReplicas();
2021-01-19 19:21:06 +00:00
}
}
std::string HedgedConnections::dumpAddresses() const
{
std::lock_guard lock(cancel_mutex);
2021-01-29 15:46:28 +00:00
std::string addresses;
bool is_first = true;
2021-01-19 19:21:06 +00:00
2021-02-06 00:54:27 +00:00
for (const auto & offset_state : offset_states)
{
2021-02-06 00:54:27 +00:00
for (const auto & replica : offset_state.replicas)
{
2021-02-08 13:08:15 +00:00
if (replica->connection)
{
2021-02-08 13:08:15 +00:00
addresses += (is_first ? "" : "; ") + replica->connection->getDescription();
is_first = false;
}
}
}
2021-01-19 19:21:06 +00:00
return addresses;
}
void HedgedConnections::sendCancel()
{
std::lock_guard lock(cancel_mutex);
if (!sent_query || cancelled)
throw Exception("Cannot cancel. Either no query sent or already cancelled.", ErrorCodes::LOGICAL_ERROR);
2021-02-06 00:54:27 +00:00
for (auto & offset_status : offset_states)
for (auto & replica : offset_status.replicas)
2021-02-08 13:08:15 +00:00
if (replica->connection)
replica->connection->sendCancel();
2021-01-19 19:21:06 +00:00
cancelled = true;
}
/// Reads packets until epoll is empty. Returns the last exception or unexpected packet
/// received, or an EndOfStream packet if there was none.
/// Throws LOGICAL_ERROR unless sendCancel() was called first.
Packet HedgedConnections::drain()
{
    std::lock_guard lock(cancel_mutex);

    if (!cancelled)
        throw Exception("Cannot drain connections: cancel first.", ErrorCodes::LOGICAL_ERROR);

    Packet res;
    res.type = Protocol::Server::EndOfStream;

    while (!epoll.empty())
    {
        Packet packet = receivePacketImpl();

        switch (packet.type)
        {
            /// Expected packets are simply discarded.
            case Protocol::Server::PartUUIDs:
            case Protocol::Server::Data:
            case Protocol::Server::Progress:
            case Protocol::Server::ProfileInfo:
            case Protocol::Server::Totals:
            case Protocol::Server::Extremes:
            case Protocol::Server::EndOfStream:
                continue;

            default:
                break;
        }

        /// If we receive an exception or an unknown packet, we save it.
        res = std::move(packet);
    }

    return res;
}
/// Locking wrapper over receivePacketUnlocked(); passes an empty async callback.
Packet HedgedConnections::receivePacket()
{
    std::lock_guard lock(cancel_mutex);
    return receivePacketUnlocked(AsyncCallback{});
}
/// Checks the preconditions and receives the next packet. Takes no lock itself.
/// Throws LOGICAL_ERROR if no query was sent, if there are no active connections,
/// or if epoll has nothing to wait for.
Packet HedgedConnections::receivePacketUnlocked(AsyncCallback async_callback)
{
    if (!sent_query)
        throw Exception("Cannot receive packets: no query sent.", ErrorCodes::LOGICAL_ERROR);

    if (!hasActiveConnections())
        throw Exception("No more packets are available.", ErrorCodes::LOGICAL_ERROR);

    if (epoll.empty())
        throw Exception("No pending events in epoll.", ErrorCodes::LOGICAL_ERROR);

    return receivePacketImpl(std::move(async_callback));
}
/// Waits for events until a packet arrives from some replica and returns that packet.
///
/// A ready descriptor is one of:
///  - a replica socket: the packet is read from it and returned;
///  - a timeout timer: the timeout is processed and waiting continues;
///  - the factory descriptor: we try to get the new replica and waiting continues.
/// Any other descriptor results in LOGICAL_ERROR.
///
/// The replica pointer is copied into a local before it is handed to a handler, because
/// the handlers (processTimeoutEvent, finishProcessReplica) erase entries from
/// fd_to_replica / timeout_fd_to_replica. Passing a reference to the map element itself
/// would leave the handler with a dangling reference after the erase.
Packet HedgedConnections::receivePacketImpl(AsyncCallback async_callback)
{
    while (true)
    {
        const int event_fd = getReadyFileDescriptor(async_callback);

        if (auto replica_it = fd_to_replica.find(event_fd); replica_it != fd_to_replica.end())
        {
            /// Own copy: the map entry may be erased while the packet is processed.
            ReplicaStatePtr replica = replica_it->second;
            return receivePacketFromReplica(replica, async_callback);
        }

        if (auto timeout_it = timeout_fd_to_replica.find(event_fd); timeout_it != timeout_fd_to_replica.end())
        {
            /// Own copy: processTimeoutEvent erases this entry of timeout_fd_to_replica.
            ReplicaStatePtr replica = timeout_it->second;
            processTimeoutEvent(replica, replica->active_timeouts[event_fd]);
        }
        else if (event_fd == hedged_connections_factory.getFileDescriptor())
            tryGetNewReplica(false);
        else
            throw Exception("Unknown event from epoll", ErrorCodes::LOGICAL_ERROR);
    }
}
/// Returns a descriptor that is ready for processing.
/// A replica that already has pending read data is returned without waiting on epoll;
/// otherwise waits on epoll for a single event and returns its descriptor.
int HedgedConnections::getReadyFileDescriptor(AsyncCallback async_callback)
{
    for (auto & entry : fd_to_replica)
    {
        auto & connection = entry.second->connection;
        if (connection->hasReadPendingData())
            return connection->getSocket()->impl()->sockfd();
    }

    epoll_event ready_event;
    ready_event.data.fd = -1;
    epoll.getManyReady(1, &ready_event, true, std::move(async_callback));
    return ready_event.data.fd;
}
2021-02-08 13:08:15 +00:00
/// Reads one packet from the replica and updates its state according to the packet type:
///  - Data and other intermediate packets re-arm the receive timeout (the first Data packet
///    for an offset is additionally handled by processReceivedFirstDataPacket);
///  - EndOfStream finishes the replica without disconnecting;
///  - Exception or any other packet finishes the replica and disconnects it.
Packet HedgedConnections::receivePacketFromReplica(ReplicaStatePtr & replica, AsyncCallback async_callback)
{
    removeTimeoutFromReplica(ConnectionTimeoutType::RECEIVE_TIMEOUT, replica);

    Packet packet = replica->connection->receivePacket(std::move(async_callback));

    switch (packet.type)
    {
        case Protocol::Server::Data:
            if (!offset_states[replica->offset].first_packet_of_data_received)
                processReceivedFirstDataPacket(replica);
            [[fallthrough]];

        case Protocol::Server::PartUUIDs:
        case Protocol::Server::Progress:
        case Protocol::Server::ProfileInfo:
        case Protocol::Server::Totals:
        case Protocol::Server::Extremes:
        case Protocol::Server::Log:
            addTimeoutToReplica(ConnectionTimeoutType::RECEIVE_TIMEOUT, replica);
            break;

        case Protocol::Server::EndOfStream:
            finishProcessReplica(replica, false);
            break;

        case Protocol::Server::Exception:
        default:
            finishProcessReplica(replica, true);
            break;
    }

    return packet;
}
2021-02-08 13:08:15 +00:00
/// Handles the first data packet received for the replica's offset.
/// When we receive first packet of data from replica, we stop working with replicas, that are
/// responsible for the same offset: they are cancelled and disconnected.
void HedgedConnections::processReceivedFirstDataPacket(ReplicaStatePtr & replica)
{
    OffsetState & state = offset_states[replica->offset];

    removeTimeoutFromReplica(ConnectionTimeoutType::RECEIVE_DATA_TIMEOUT, replica);
    ++offsets_with_received_first_data_packet;
    state.first_packet_of_data_received = true;

    for (auto & candidate : state.replicas)
    {
        const bool is_active_sibling = replica != candidate && candidate->connection;
        if (!is_active_sibling)
            continue;

        candidate->connection->sendCancel();
        finishProcessReplica(candidate, true);
    }

    /// If we received data from replicas with all offsets, we need to stop choosing new replicas.
    if (!(hedged_connections_factory.hasEventsInProcess() && offsets_with_received_first_data_packet == offset_states.size()))
        return;

    if (next_replica_in_process)
    {
        epoll.remove(hedged_connections_factory.getFileDescriptor());
        next_replica_in_process = false;
    }

    hedged_connections_factory.stopChoosingReplicas();
}
2021-02-08 13:08:15 +00:00
/// Handles an expired timeout of the given replica.
///
/// The timer is removed from epoll, from the replica's active timeouts and from
/// timeout_fd_to_replica. After that:
///  - RECEIVE_TIMEOUT: the replica is finished and disconnected; NetException(SOCKET_TIMEOUT)
///    is thrown if its offset has no active connections left and no new replica is in process;
///  - RECEIVE_DATA_TIMEOUT: the replica's offset is queued and we try to start a new replica.
///
/// `replica` may be a reference to the very element of timeout_fd_to_replica that is erased
/// here, so a local owning copy is taken first and used for everything that follows.
void HedgedConnections::processTimeoutEvent(ReplicaStatePtr & replica, ConnectionTimeoutDescriptorPtr timeout_descriptor)
{
    /// Keeps the replica state reachable even if `replica` dangles after the erase below.
    ReplicaStatePtr replica_holder = replica;

    const auto timeout_fd = timeout_descriptor->timer.getDescriptor();
    epoll.remove(timeout_fd);
    replica_holder->active_timeouts.erase(timeout_fd);
    timeout_fd_to_replica.erase(timeout_fd);

    if (timeout_descriptor->type == ConnectionTimeoutType::RECEIVE_TIMEOUT)
    {
        finishProcessReplica(replica_holder, true);

        /// Check if there is no active connections with the same offset and there is no new replica in process.
        if (offset_states[replica_holder->offset].active_connection_count == 0 && !next_replica_in_process)
            throw NetException("Receive timeout expired", ErrorCodes::SOCKET_TIMEOUT);
    }
    else if (timeout_descriptor->type == ConnectionTimeoutType::RECEIVE_DATA_TIMEOUT)
    {
        offsets_queue.push(replica_holder->offset);
        tryGetNewReplica(true);
    }
}
2021-02-06 00:54:27 +00:00
void HedgedConnections::tryGetNewReplica(bool start_new_connection)
2021-01-19 19:21:06 +00:00
{
2021-02-06 00:54:27 +00:00
Connection * connection = nullptr;
2021-02-08 11:06:45 +00:00
HedgedConnectionsFactory::State state = hedged_connections_factory.getNextConnection(start_new_connection, false, connection);
2021-01-19 19:21:06 +00:00
2021-02-02 12:14:31 +00:00
/// Skip replicas that doesn't support two-level aggregation if we didn't disable it in sendQuery.
2021-02-06 00:54:27 +00:00
while (state == HedgedConnectionsFactory::State::READY && !disable_two_level_aggregation
&& connection->getServerRevision(hedged_connections_factory.getConnectionTimeouts())
< DBMS_MIN_REVISION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD)
2021-02-08 11:06:45 +00:00
state = hedged_connections_factory.getNextConnection(true, false, connection);
2021-02-06 00:54:27 +00:00
if (state == HedgedConnectionsFactory::State::READY)
{
2021-02-06 00:54:27 +00:00
size_t offset = offsets_queue.front();
offsets_queue.pop();
2021-02-06 00:54:27 +00:00
2021-02-08 13:08:15 +00:00
ReplicaStatePtr replica = std::make_shared<ReplicaState>();
replica->connection = connection;
replica->offset = offset;
int socket_fd = replica->connection->getSocket()->impl()->sockfd();
2021-02-06 00:54:27 +00:00
epoll.add(socket_fd);
2021-02-08 13:08:15 +00:00
fd_to_replica[socket_fd] = replica;
2021-02-08 13:16:09 +00:00
offset_states[offset].replicas.push_back(replica);
2021-02-06 00:54:27 +00:00
++offset_states[offset].active_connection_count;
++active_connection_count;
pipeline_for_new_replicas.run(replica);
}
2021-02-06 00:54:27 +00:00
else if (state == HedgedConnectionsFactory::State::NOT_READY && !next_replica_in_process)
{
2021-02-06 00:54:27 +00:00
epoll.add(hedged_connections_factory.getFileDescriptor());
next_replica_in_process = true;
}
2021-01-19 19:21:06 +00:00
2021-02-06 00:54:27 +00:00
/// Check if we cannot get new replica and there is no active replica with needed offsets.
else if (state == HedgedConnectionsFactory::State::CANNOT_CHOOSE)
{
while (!offsets_queue.empty())
{
if (offset_states[offsets_queue.front()].active_connection_count == 0)
throw Exception("Cannot find enough connections to replicas", ErrorCodes::ALL_CONNECTION_TRIES_FAILED);
offsets_queue.pop();
}
}
/// Check if we don't need to listen hedged_connections_factory file descriptor in epoll anymore.
if (next_replica_in_process && (state == HedgedConnectionsFactory::State::CANNOT_CHOOSE || offsets_queue.empty()))
{
2021-02-06 00:54:27 +00:00
epoll.remove(hedged_connections_factory.getFileDescriptor());
next_replica_in_process = false;
}
2021-01-19 19:21:06 +00:00
}
2021-02-08 13:08:15 +00:00
/// Stops working with the replica: removes its timeouts, removes its socket from epoll and
/// from fd_to_replica, decrements the active connection counters, optionally disconnects
/// the connection and finally resets the replica's connection pointer.
///
/// `replica` may be a reference to the very element of fd_to_replica that is erased here,
/// so a local owning copy is taken first and used for everything that follows.
void HedgedConnections::finishProcessReplica(ReplicaStatePtr & replica, bool disconnect)
{
    /// Keeps the replica state reachable even if `replica` dangles after the erase below.
    ReplicaStatePtr replica_holder = replica;

    removeTimeoutsFromReplica(replica_holder);

    int socket_fd = replica_holder->connection->getSocket()->impl()->sockfd();
    epoll.remove(socket_fd);
    --offset_states[replica_holder->offset].active_connection_count;
    fd_to_replica.erase(socket_fd);
    --active_connection_count;

    if (disconnect)
        replica_holder->connection->disconnect();

    /// A null connection marks the replica as finished for all other methods.
    replica_holder->connection = nullptr;
}
2021-02-08 13:08:15 +00:00
/// Creates a timeout of the given type, adds its timer to epoll and registers it both in
/// timeout_fd_to_replica and in the replica's active timeouts.
void HedgedConnections::addTimeoutToReplica(ConnectionTimeoutType type, ReplicaStatePtr & replica)
{
    ConnectionTimeoutDescriptorPtr descriptor
        = createConnectionTimeoutDescriptor(type, hedged_connections_factory.getConnectionTimeouts());

    const auto timer_fd = descriptor->timer.getDescriptor();
    epoll.add(timer_fd);

    /// The replica is looked up by its socket descriptor in fd_to_replica.
    const auto socket_fd = replica->connection->getSocket()->impl()->sockfd();
    timeout_fd_to_replica[timer_fd] = fd_to_replica[socket_fd];

    replica->active_timeouts[timer_fd] = std::move(descriptor);
}
2021-02-08 13:08:15 +00:00
/// Removes every active timeout of the replica from epoll and from timeout_fd_to_replica,
/// then clears the replica's own list of active timeouts.
void HedgedConnections::removeTimeoutsFromReplica(ReplicaStatePtr & replica)
{
    auto & timeouts = replica->active_timeouts;

    for (auto it = timeouts.begin(); it != timeouts.end(); ++it)
    {
        epoll.remove(it->first);
        timeout_fd_to_replica.erase(it->first);
    }

    timeouts.clear();
}
2021-02-08 13:08:15 +00:00
/// Removes the first active timeout of the given type (if any) from epoll, from
/// timeout_fd_to_replica and from the replica's active timeouts.
void HedgedConnections::removeTimeoutFromReplica(ConnectionTimeoutType type, ReplicaStatePtr & replica)
{
    auto & timeouts = replica->active_timeouts;

    for (auto it = timeouts.begin(); it != timeouts.end(); ++it)
    {
        if (it->second->type != type)
            continue;

        epoll.remove(it->first);
        timeout_fd_to_replica.erase(it->first);
        timeouts.erase(it);
        return;
    }
}
}
2021-01-29 15:46:28 +00:00
#endif