This commit is contained in:
Pavel Kruglov 2021-02-06 03:54:27 +03:00
parent 3fc8b294e8
commit 0704d3cf27
16 changed files with 938 additions and 861 deletions

View File

@ -329,7 +329,7 @@ std::vector<ConnectionPoolWithFailover::Base::ShuffledPool> ConnectionPoolWithFa
return Base::getShuffledPools(max_ignored_errors, get_priority); return Base::getShuffledPools(max_ignored_errors, get_priority);
} }
TryGetConnection::TryGetConnection( ConnectionEstablisher::ConnectionEstablisher(
IConnectionPool * pool_, IConnectionPool * pool_,
const ConnectionTimeouts * timeouts_, const ConnectionTimeouts * timeouts_,
const Settings * settings_, const Settings * settings_,
@ -340,7 +340,7 @@ TryGetConnection::TryGetConnection(
{ {
} }
void TryGetConnection::reset() void ConnectionEstablisher::reset()
{ {
resetResult(); resetResult();
stage = Stage::CONNECT; stage = Stage::CONNECT;
@ -349,7 +349,7 @@ void TryGetConnection::reset()
fail_message.clear(); fail_message.clear();
} }
void TryGetConnection::resetResult() void ConnectionEstablisher::resetResult()
{ {
if (!result.entry.isNull()) if (!result.entry.isNull())
{ {
@ -358,7 +358,7 @@ void TryGetConnection::resetResult()
} }
} }
void TryGetConnection::processFail(bool add_description) void ConnectionEstablisher::processFail(bool add_description)
{ {
if (action_before_disconnect) if (action_before_disconnect)
action_before_disconnect(socket_fd); action_before_disconnect(socket_fd);
@ -371,7 +371,7 @@ void TryGetConnection::processFail(bool add_description)
stage = Stage::FAILED; stage = Stage::FAILED;
} }
void TryGetConnection::run() void ConnectionEstablisher::run()
{ {
try try
{ {

View File

@ -31,8 +31,8 @@ enum class PoolMode
GET_ALL GET_ALL
}; };
/// Class for establishing connection with replica without blocking. /// Class for establishing connection with replica without blocking using different stages.
class TryGetConnection class ConnectionEstablisher
{ {
public: public:
enum Stage enum Stage
@ -47,7 +47,7 @@ public:
using TryResult = PoolWithFailoverBase<IConnectionPool>::TryResult; using TryResult = PoolWithFailoverBase<IConnectionPool>::TryResult;
TryGetConnection(IConnectionPool * pool_, ConnectionEstablisher(IConnectionPool * pool_,
const ConnectionTimeouts * timeouts_, const ConnectionTimeouts * timeouts_,
const Settings * settings_, const Settings * settings_,
const QualifiedTableName * table_to_check = nullptr, const QualifiedTableName * table_to_check = nullptr,

View File

@ -1,491 +0,0 @@
#if defined(OS_LINUX)
#include <Client/GetHedgedConnections.h>
#include <Common/typeid_cast.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int LOGICAL_ERROR;
extern const int ALL_CONNECTION_TRIES_FAILED;
extern const int ALL_REPLICAS_ARE_STALE;
}
GetHedgedConnections::GetHedgedConnections(
const ConnectionPoolWithFailoverPtr & pool_,
const Settings * settings_,
const ConnectionTimeouts & timeouts_,
std::shared_ptr<QualifiedTableName> table_to_check_)
: pool(pool_), settings(settings_), timeouts(timeouts_), table_to_check(table_to_check_), log(&Poco::Logger::get("GetHedgedConnections"))
{
shuffled_pools = pool->getShuffledPools(settings);
for (size_t i = 0; i != shuffled_pools.size(); ++i)
try_get_connections.emplace_back(shuffled_pools[i].pool, &timeouts, settings, table_to_check.get(), log);
max_tries
= (settings ? size_t{settings->connections_with_failover_max_tries} : size_t{DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES});
fallback_to_stale_replicas = settings ? settings->fallback_to_stale_replicas_for_distributed_queries : false;
entries_count = 0;
usable_count = 0;
failed_pools_count = 0;
}
GetHedgedConnections::~GetHedgedConnections()
{
pool->updateSharedError(shuffled_pools);
}
std::vector<GetHedgedConnections::ReplicaStatePtr> GetHedgedConnections::getManyConnections(PoolMode pool_mode)
{
size_t min_entries = (settings && settings->skip_unavailable_shards) ? 0 : 1;
size_t max_entries;
if (pool_mode == PoolMode::GET_ALL)
{
min_entries = shuffled_pools.size();
max_entries = shuffled_pools.size();
}
else if (pool_mode == PoolMode::GET_ONE)
max_entries = 1;
else if (pool_mode == PoolMode::GET_MANY)
max_entries = settings ? size_t(settings->max_parallel_replicas) : 1;
else
throw DB::Exception("Unknown pool allocation mode", DB::ErrorCodes::LOGICAL_ERROR);
std::vector<ReplicaStatePtr> replicas;
replicas.reserve(max_entries);
for (size_t i = 0; i != max_entries; ++i)
{
auto replica = getNextConnection(false);
if (replica->isCannotChoose())
{
if (replicas.size() >= min_entries)
break;
/// Determine the reason of not enough replicas.
if (!fallback_to_stale_replicas && usable_count >= min_entries)
throw DB::Exception(
"Could not find enough connections to up-to-date replicas. Got: " + std::to_string(replicas.size())
+ ", needed: " + std::to_string(min_entries),
DB::ErrorCodes::ALL_REPLICAS_ARE_STALE);
throw DB::NetException(
"All connection tries failed. Log: \n\n" + fail_messages + "\n",
DB::ErrorCodes::ALL_CONNECTION_TRIES_FAILED);
}
replicas.push_back(replica);
}
return replicas;
}
GetHedgedConnections::ReplicaStatePtr GetHedgedConnections::getNextConnection(bool non_blocking)
{
ReplicaStatePtr replica = createNewReplica();
int index;
/// Check if it's the first time.
if (epoll.empty() && ready_indexes.empty())
{
index = 0;
last_used_index = 0;
}
else
index = getNextIndex();
bool is_first = true;
while (index != -1 || !epoll.empty())
{
/// Prevent blocking after receiving timeout when there is no new replica to connect
/// (processEpollEvents can return EMPTY replica after timeout processing to start new connection).
if (index == -1 && !is_first && non_blocking)
{
replica->state = State::NOT_READY;
return replica;
}
if (is_first)
is_first = false;
if (index != -1)
{
Action action = startTryGetConnection(index, replica);
if (action == Action::FINISH)
return replica;
if (action == Action::TRY_NEXT_REPLICA)
{
index = getNextIndex();
continue;
}
if (action == Action::PROCESS_EPOLL_EVENTS && non_blocking)
return replica;
}
replica = processEpollEvents(non_blocking);
if (replica->isReady() || (replica->isNotReady() && non_blocking))
return replica;
index = getNextIndex();
}
/// We reach this point only if there was no free up to date replica.
/// We will try to use usable replica.
/// Check if we are not allowed to use usable replicas or there is no even a free usable replica.
if (!fallback_to_stale_replicas || !canGetNewConnection())
{
replica->state = State::CANNOT_CHOOSE;
return replica;
}
setBestUsableReplica(replica);
return replica;
}
void GetHedgedConnections::stopChoosingReplicas()
{
for (auto & [fd, replica] : fd_to_replica)
{
removeTimeoutsFromReplica(replica, epoll, timeout_fd_to_replica);
epoll.remove(fd);
try_get_connections[replica->index].reset();
replica->reset();
}
fd_to_replica.clear();
}
int GetHedgedConnections::getNextIndex()
{
/// Check if there is no free replica.
if (entries_count + indexes_in_process.size() + failed_pools_count >= shuffled_pools.size())
return -1;
bool finish = false;
int next_index = last_used_index;
while (!finish)
{
next_index = (next_index + 1) % shuffled_pools.size();
/// Check if we can try this replica.
if (indexes_in_process.find(next_index) == indexes_in_process.end() && (max_tries == 0 || shuffled_pools[next_index].error_count < max_tries)
&& try_get_connections[next_index].stage != TryGetConnection::Stage::FINISHED)
finish = true;
/// If we made a complete round, there is no replica to connect.
else if (next_index == last_used_index)
return -1;
}
last_used_index = next_index;
return next_index;
}
GetHedgedConnections::Action GetHedgedConnections::startTryGetConnection(int index, ReplicaStatePtr & replica)
{
TryGetConnection & try_get_connection = try_get_connections[index];
replica->state = State::NOT_READY;
replica->index = index;
indexes_in_process.insert(index);
try_get_connection.reset();
try_get_connection.run();
if (try_get_connection.stage != TryGetConnection::Stage::FAILED)
{
replica->fd = try_get_connection.socket_fd;
replica->connection = &*try_get_connection.result.entry;
}
Action action = processTryGetConnectionStage(replica);
if (action == Action::PROCESS_EPOLL_EVENTS)
{
epoll.add(try_get_connection.socket_fd);
fd_to_replica[try_get_connection.socket_fd] = replica;
try_get_connection.setActionBeforeDisconnect(
[&](int fd)
{
epoll.remove(fd);
fd_to_replica.erase(fd);
});
addTimeouts(replica);
}
return action;
}
GetHedgedConnections::Action
GetHedgedConnections::processTryGetConnectionStage(ReplicaStatePtr & replica, bool remove_from_epoll)
{
TryGetConnection & try_get_connection = try_get_connections[replica->index];
if (try_get_connection.stage == TryGetConnection::Stage::FINISHED)
{
indexes_in_process.erase(replica->index);
++entries_count;
if (remove_from_epoll)
{
epoll.remove(try_get_connection.socket_fd);
fd_to_replica.erase(try_get_connection.socket_fd);
}
if (try_get_connection.result.is_usable)
{
++usable_count;
if (try_get_connection.result.is_up_to_date)
{
replica->state = State::READY;
ready_indexes.insert(replica->index);
return Action::FINISH;
}
}
/// This replica is not up to date, we will try to find up to date.
replica->reset();
return Action::TRY_NEXT_REPLICA;
}
else if (try_get_connection.stage == TryGetConnection::Stage::FAILED)
{
processFailedConnection(replica);
return Action::TRY_NEXT_REPLICA;
}
/// Get connection process is not finished.
return Action::PROCESS_EPOLL_EVENTS;
}
void GetHedgedConnections::processFailedConnection(ReplicaStatePtr & replica)
{
ShuffledPool & shuffled_pool = shuffled_pools[replica->index];
LOG_WARNING(
log, "Connection failed at try №{}, reason: {}", (shuffled_pool.error_count + 1), try_get_connections[replica->index].fail_message);
ProfileEvents::increment(ProfileEvents::DistributedConnectionFailTry);
shuffled_pool.error_count = std::min(pool->getMaxErrorCup(), shuffled_pool.error_count + 1);
if (shuffled_pool.error_count >= max_tries)
{
++failed_pools_count;
ProfileEvents::increment(ProfileEvents::DistributedConnectionFailAtAll);
}
std::string & fail_message = try_get_connections[replica->index].fail_message;
if (!fail_message.empty())
fail_messages += fail_message + "\n";
indexes_in_process.erase(replica->index);
replica->reset();
}
void GetHedgedConnections::addTimeouts(ReplicaStatePtr & replica)
{
addTimeoutToReplica(TimerTypes::RECEIVE_TIMEOUT, replica, epoll, timeout_fd_to_replica, timeouts);
auto stage = try_get_connections[replica->index].stage;
if (stage == TryGetConnection::Stage::RECEIVE_HELLO)
addTimeoutToReplica(TimerTypes::RECEIVE_HELLO_TIMEOUT, replica, epoll, timeout_fd_to_replica, timeouts);
else if (stage == TryGetConnection::Stage::RECEIVE_TABLES_STATUS)
addTimeoutToReplica(TimerTypes::RECEIVE_TABLES_STATUS_TIMEOUT, replica, epoll, timeout_fd_to_replica, timeouts);
}
GetHedgedConnections::ReplicaStatePtr GetHedgedConnections::processEpollEvents(bool non_blocking)
{
int event_fd;
ReplicaStatePtr replica = nullptr;
bool finish = false;
while (!finish)
{
event_fd = getReadyFileDescriptor();
if (fd_to_replica.find(event_fd) != fd_to_replica.end())
{
replica = fd_to_replica[event_fd];
finish = processReplicaEvent(replica, non_blocking);
}
else if (timeout_fd_to_replica.find(event_fd) != timeout_fd_to_replica.end())
{
replica = timeout_fd_to_replica[event_fd];
finish = processTimeoutEvent(replica, replica->active_timeouts[event_fd], non_blocking);
}
else
throw Exception("Unknown event from epoll", ErrorCodes::LOGICAL_ERROR);
}
return replica;
}
int GetHedgedConnections::getReadyFileDescriptor(AsyncCallback async_callback)
{
for (auto & [fd, replica] : fd_to_replica)
if (replica->connection->hasReadPendingData())
return replica->fd;
return epoll.getReady(std::move(async_callback)).data.fd;
}
bool GetHedgedConnections::processReplicaEvent(ReplicaStatePtr & replica, bool non_blocking)
{
removeTimeoutsFromReplica(replica, epoll, timeout_fd_to_replica);
try_get_connections[replica->index].run();
Action action = processTryGetConnectionStage(replica, true);
if (action == Action::PROCESS_EPOLL_EVENTS)
{
addTimeouts(replica);
return non_blocking;
}
return true;
}
bool GetHedgedConnections::processTimeoutEvent(ReplicaStatePtr & replica, TimerDescriptorPtr timeout_descriptor, bool non_blocking)
{
epoll.remove(timeout_descriptor->getDescriptor());
replica->active_timeouts.erase(timeout_descriptor->getDescriptor());
timeout_fd_to_replica[timeout_descriptor->getDescriptor()];
if (timeout_descriptor->getType() == TimerTypes::RECEIVE_TIMEOUT)
{
removeTimeoutsFromReplica(replica, epoll, timeout_fd_to_replica);
epoll.remove(replica->fd);
fd_to_replica.erase(replica->fd);
TryGetConnection & try_get_connection = try_get_connections[replica->index];
try_get_connection.fail_message = "Receive timeout expired (" + try_get_connection.result.entry->getDescription() + ")";
try_get_connection.resetResult();
try_get_connection.stage = TryGetConnection::Stage::FAILED;
processFailedConnection(replica);
return true;
}
else if ((timeout_descriptor->getType() == TimerTypes::RECEIVE_HELLO_TIMEOUT
|| timeout_descriptor->getType() == TimerTypes::RECEIVE_TABLES_STATUS_TIMEOUT)
&& entries_count + indexes_in_process.size() + failed_pools_count < shuffled_pools.size())
{
replica = createNewReplica();
return true;
}
return non_blocking;
}
void GetHedgedConnections::setBestUsableReplica(ReplicaStatePtr & replica)
{
std::vector<int> indexes(try_get_connections.size());
for (size_t i = 0; i != indexes.size(); ++i)
indexes[i] = i;
/// Remove unusable, failed replicas and replicas that are ready or in process.
indexes.erase(
std::remove_if(
indexes.begin(),
indexes.end(),
[&](int i)
{
return try_get_connections[i].result.entry.isNull() || !try_get_connections[i].result.is_usable ||
indexes_in_process.find(i) != indexes_in_process.end() || ready_indexes.find(i) != ready_indexes.end();
}),
indexes.end());
if (indexes.empty())
{
replica->state = State::CANNOT_CHOOSE;
return;
}
/// Sort replicas by staleness.
std::stable_sort(
indexes.begin(),
indexes.end(),
[&](size_t lhs, size_t rhs)
{
return try_get_connections[lhs].result.staleness < try_get_connections[rhs].result.staleness;
});
replica->index = indexes[0];
replica->connection = &*try_get_connections[indexes[0]].result.entry;
replica->state = State::READY;
replica->fd = replica->connection->getSocket()->impl()->sockfd();
ready_indexes.insert(replica->index);
}
void addTimeoutToReplica(
int type,
GetHedgedConnections::ReplicaStatePtr & replica,
Epoll & epoll,
std::unordered_map<int, GetHedgedConnections::ReplicaStatePtr> & timeout_fd_to_replica,
const ConnectionTimeouts & timeouts)
{
Poco::Timespan timeout;
switch (type)
{
case TimerTypes::RECEIVE_HELLO_TIMEOUT:
timeout = timeouts.receive_hello_timeout;
break;
case TimerTypes::RECEIVE_TABLES_STATUS_TIMEOUT:
timeout = timeouts.receive_tables_status_timeout;
break;
case TimerTypes::RECEIVE_DATA_TIMEOUT:
timeout = timeouts.receive_data_timeout;
break;
case TimerTypes::RECEIVE_TIMEOUT:
timeout = timeouts.receive_timeout;
break;
default:
throw Exception("Unknown timeout type", ErrorCodes::BAD_ARGUMENTS);
}
TimerDescriptorPtr timeout_descriptor = std::make_shared<TimerDescriptor>();
timeout_descriptor->setType(type);
timeout_descriptor->setRelative(timeout);
epoll.add(timeout_descriptor->getDescriptor());
timeout_fd_to_replica[timeout_descriptor->getDescriptor()] = replica;
replica->active_timeouts[timeout_descriptor->getDescriptor()] = std::move(timeout_descriptor);
}
void removeTimeoutsFromReplica(
GetHedgedConnections::ReplicaStatePtr & replica,
Epoll & epoll,
std::unordered_map<int, GetHedgedConnections::ReplicaStatePtr> & timeout_fd_to_replica)
{
for (auto & [fd, _] : replica->active_timeouts)
{
epoll.remove(fd);
timeout_fd_to_replica.erase(fd);
}
replica->active_timeouts.clear();
}
void removeTimeoutFromReplica(
int type,
GetHedgedConnections::ReplicaStatePtr & replica,
Epoll & epoll,
std::unordered_map<int, GetHedgedConnections::ReplicaStatePtr> & timeout_fd_to_replica)
{
auto it = std::find_if(
replica->active_timeouts.begin(),
replica->active_timeouts.end(),
[type](auto & value){ return value.second->getType() == type; }
);
if (it != replica->active_timeouts.end())
{
epoll.remove(it->first);
timeout_fd_to_replica.erase(it->first);
replica->active_timeouts.erase(it);
}
}
}
#endif

View File

@ -1,173 +0,0 @@
#pragma once
#if defined(OS_LINUX)
#include <Common/TimerDescriptor.h>
#include <Client/ConnectionPoolWithFailover.h>
#include <Core/Settings.h>
#include <Common/Epoll.h>
#include <unordered_map>
#include <memory>
namespace DB
{
using TimerDescriptorPtr = std::shared_ptr<TimerDescriptor>;
/// Class for establishing hedged connections with replicas.
/// It works with multiple replicas simultaneously without blocking by using epoll.
class GetHedgedConnections
{
public:
using ShuffledPool = ConnectionPoolWithFailover::Base::ShuffledPool;
enum State
{
EMPTY = 0,
READY = 1,
NOT_READY = 2,
CANNOT_CHOOSE = 3,
};
struct ReplicaState
{
Connection * connection = nullptr;
State state = State::EMPTY;
int index = -1;
int fd = -1;
size_t parallel_replica_offset = 0;
std::unordered_map<int, std::shared_ptr<TimerDescriptor>> active_timeouts;
void reset()
{
connection = nullptr;
state = State::EMPTY;
index = -1;
fd = -1;
parallel_replica_offset = 0;
active_timeouts.clear();
}
bool isReady() const { return state == State::READY; }
bool isNotReady() const { return state == State::NOT_READY; }
bool isEmpty() const { return state == State::EMPTY; }
bool isCannotChoose() const { return state == State::CANNOT_CHOOSE; }
};
using ReplicaStatePtr = std::shared_ptr<ReplicaState>;
GetHedgedConnections(const ConnectionPoolWithFailoverPtr & pool_,
const Settings * settings_,
const ConnectionTimeouts & timeouts_,
std::shared_ptr<QualifiedTableName> table_to_check_ = nullptr);
/// Create and return connections according to pool_mode.
std::vector<ReplicaStatePtr> getManyConnections(PoolMode pool_mode);
/// Try to establish connection to the new replica. If non_blocking is false, this function will block
/// until establishing connection to the new replica (returned replica state might be READY or CANNOT_CHOOSE).
/// If non_blocking is true, this function will try to establish connection to the new replica without blocking
/// (returned replica state might be READY, NOT_READY and CANNOT_CHOOSE).
ReplicaStatePtr getNextConnection(bool non_blocking);
/// Check if we can try to produce new READY replica.
bool canGetNewConnection() const { return ready_indexes.size() + failed_pools_count < shuffled_pools.size(); }
/// Stop working with all replicas that are not READY.
void stopChoosingReplicas();
bool hasEventsInProcess() const { return epoll.size() > 0; }
int getFileDescriptor() const { return epoll.getFileDescriptor(); }
const ConnectionTimeouts & getConnectionTimeouts() const { return timeouts; }
~GetHedgedConnections();
private:
enum Action
{
FINISH = 0,
PROCESS_EPOLL_EVENTS = 1,
TRY_NEXT_REPLICA = 2,
};
Action startTryGetConnection(int index, ReplicaStatePtr & replica);
Action processTryGetConnectionStage(ReplicaStatePtr & replica, bool remove_from_epoll = false);
/// Find an index of the next free replica to start connection.
/// Return -1 if there is no free replica.
int getNextIndex();
int getReadyFileDescriptor(AsyncCallback async_callback = {});
void addTimeouts(ReplicaStatePtr & replica);
void processFailedConnection(ReplicaStatePtr & replica);
void processReceiveTimeout(ReplicaStatePtr & replica);
bool processReplicaEvent(ReplicaStatePtr & replica, bool non_blocking);
bool processTimeoutEvent(ReplicaStatePtr & replica, TimerDescriptorPtr timeout_descriptor, bool non_blocking);
ReplicaStatePtr processEpollEvents(bool non_blocking = false);
void setBestUsableReplica(ReplicaStatePtr & replica);
ReplicaStatePtr createNewReplica() { return std::make_shared<ReplicaState>(); }
const ConnectionPoolWithFailoverPtr pool;
const Settings * settings;
const ConnectionTimeouts timeouts;
std::shared_ptr<QualifiedTableName> table_to_check;
std::vector<TryGetConnection> try_get_connections;
std::vector<ShuffledPool> shuffled_pools;
/// Map socket file descriptor to replica.
std::unordered_map<int, ReplicaStatePtr> fd_to_replica;
/// Map timeout file descriptor to replica.
std::unordered_map<int, ReplicaStatePtr> timeout_fd_to_replica;
/// Indexes of replicas, that are in process of connection.
std::unordered_set<int> indexes_in_process;
/// Indexes of ready replicas.
std::unordered_set<int> ready_indexes;
int last_used_index;
bool fallback_to_stale_replicas;
Epoll epoll;
Poco::Logger * log;
std::string fail_messages;
size_t entries_count;
size_t usable_count;
size_t failed_pools_count;
size_t max_tries;
};
/// Add timeout with particular type to replica and add it to epoll.
void addTimeoutToReplica(
int type,
GetHedgedConnections::ReplicaStatePtr & replica,
Epoll & epoll,
std::unordered_map<int, GetHedgedConnections::ReplicaStatePtr> & timeout_fd_to_replica,
const ConnectionTimeouts & timeouts);
/// Remove timeout with particular type from replica and epoll.
void removeTimeoutFromReplica(
int type,
GetHedgedConnections::ReplicaStatePtr & replica,
Epoll & epoll,
std::unordered_map<int, GetHedgedConnections::ReplicaStatePtr> & timeout_fd_to_replica);
/// Remove all timeouts from replica and epoll.
void removeTimeoutsFromReplica(
GetHedgedConnections::ReplicaStatePtr & replica,
Epoll & epoll,
std::unordered_map<int, GetHedgedConnections::ReplicaStatePtr> & timeout_fd_to_replica);
}
#endif

View File

@ -10,6 +10,7 @@ namespace ErrorCodes
extern const int MISMATCH_REPLICAS_DATA_SOURCES; extern const int MISMATCH_REPLICAS_DATA_SOURCES;
extern const int LOGICAL_ERROR; extern const int LOGICAL_ERROR;
extern const int SOCKET_TIMEOUT; extern const int SOCKET_TIMEOUT;
extern const int ALL_CONNECTION_TRIES_FAILED;
} }
HedgedConnections::HedgedConnections( HedgedConnections::HedgedConnections(
@ -19,29 +20,35 @@ HedgedConnections::HedgedConnections(
const ThrottlerPtr & throttler_, const ThrottlerPtr & throttler_,
PoolMode pool_mode, PoolMode pool_mode,
std::shared_ptr<QualifiedTableName> table_to_check_) std::shared_ptr<QualifiedTableName> table_to_check_)
: get_hedged_connections(pool_, &settings_, timeouts_, table_to_check_), settings(settings_), throttler(throttler_), log(&Poco::Logger::get("HedgedConnections")) : hedged_connections_factory(pool_, &settings_, timeouts_, table_to_check_)
, settings(settings_)
, throttler(throttler_)
, log(&Poco::Logger::get("HedgedConnections"))
{ {
std::vector<ReplicaStatePtr> replicas_states = get_hedged_connections.getManyConnections(pool_mode); std::vector<Connection *> connections = hedged_connections_factory.getManyConnections(pool_mode);
for (size_t i = 0; i != replicas_states.size(); ++i) ReplicaState replica;
for (size_t i = 0; i != connections.size(); ++i)
{ {
replicas_states[i]->parallel_replica_offset = i; replica.connection = connections[i];
replicas_states[i]->connection->setThrottler(throttler_); replica.connection->setThrottler(throttler_);
epoll.add(replicas_states[i]->fd); int socket_fd = replica.connection->getSocket()->impl()->sockfd();
fd_to_replica[replicas_states[i]->fd] = replicas_states[i]; epoll.add(socket_fd);
replicas.push_back({std::move(replicas_states[i])}); fd_to_replica_location[socket_fd] = ReplicaLocation{i, 0};
active_connections_count_by_offset[i] = 1; offset_states.push_back(OffsetState{{replica}, 1, false});
} }
pipeline_for_new_replicas.add([throttler_](ReplicaStatePtr & replica_){ replica_->connection->setThrottler(throttler_); }); active_connection_count = connections.size();
offsets_with_received_first_data_packet = 0;
pipeline_for_new_replicas.add([throttler_](ReplicaState & replica_) { replica_.connection->setThrottler(throttler_); });
} }
void HedgedConnections::Pipeline::add(std::function<void(ReplicaStatePtr & replica)> send_function) void HedgedConnections::Pipeline::add(std::function<void(ReplicaState & replica)> send_function)
{ {
pipeline.push_back(send_function); pipeline.push_back(send_function);
} }
void HedgedConnections::Pipeline::run(ReplicaStatePtr & replica) void HedgedConnections::Pipeline::run(ReplicaState & replica)
{ {
for (auto & send_func : pipeline) for (auto & send_func : pipeline)
send_func(replica); send_func(replica);
@ -54,11 +61,11 @@ void HedgedConnections::sendScalarsData(Scalars & data)
if (!sent_query) if (!sent_query)
throw Exception("Cannot send scalars data: query not yet sent.", ErrorCodes::LOGICAL_ERROR); throw Exception("Cannot send scalars data: query not yet sent.", ErrorCodes::LOGICAL_ERROR);
auto send_scalars_data = [&data](ReplicaStatePtr & replica) { replica->connection->sendScalarsData(data); }; auto send_scalars_data = [&data](ReplicaState & replica) { replica.connection->sendScalarsData(data); };
for (auto & replicas_with_same_offset : replicas) for (auto & offset_state : offset_states)
for (auto & replica : replicas_with_same_offset) for (auto & replica : offset_state.replicas)
if (replica->isReady()) if (replica.connection)
send_scalars_data(replica); send_scalars_data(replica);
pipeline_for_new_replicas.add(send_scalars_data); pipeline_for_new_replicas.add(send_scalars_data);
@ -74,11 +81,11 @@ void HedgedConnections::sendExternalTablesData(std::vector<ExternalTablesData> &
if (data.size() != size()) if (data.size() != size())
throw Exception("Mismatch between replicas and data sources", ErrorCodes::MISMATCH_REPLICAS_DATA_SOURCES); throw Exception("Mismatch between replicas and data sources", ErrorCodes::MISMATCH_REPLICAS_DATA_SOURCES);
auto send_external_tables_data = [&data](ReplicaStatePtr & replica) { replica->connection->sendExternalTablesData(data[0]); }; auto send_external_tables_data = [&data](ReplicaState & replica) { replica.connection->sendExternalTablesData(data[0]); };
for (auto & replicas_with_same_offset : replicas) for (auto & offset_state : offset_states)
for (auto & replica : replicas_with_same_offset) for (auto & replica : offset_state.replicas)
if (replica->isReady()) if (replica.connection)
send_external_tables_data(replica); send_external_tables_data(replica);
pipeline_for_new_replicas.add(send_external_tables_data); pipeline_for_new_replicas.add(send_external_tables_data);
@ -97,11 +104,11 @@ void HedgedConnections::sendQuery(
if (sent_query) if (sent_query)
throw Exception("Query already sent.", ErrorCodes::LOGICAL_ERROR); throw Exception("Query already sent.", ErrorCodes::LOGICAL_ERROR);
for (auto & replicas_with_same_offset : replicas) for (auto & offset_state : offset_states)
{ {
for (auto & replica : replicas_with_same_offset) for (auto & replica : offset_state.replicas)
{ {
if (replica->connection->getServerRevision(timeouts) < DBMS_MIN_REVISION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD) if (replica.connection->getServerRevision(timeouts) < DBMS_MIN_REVISION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD)
{ {
disable_two_level_aggregation = true; disable_two_level_aggregation = true;
break; break;
@ -111,30 +118,29 @@ void HedgedConnections::sendQuery(
break; break;
} }
auto send_query = [this, timeouts, query, query_id, stage, client_info, with_pending_data](ReplicaStatePtr & replica) auto send_query = [this, timeouts, query, query_id, stage, client_info, with_pending_data](ReplicaState & replica) {
{ Settings modified_settings = settings;
Settings modified_settings = this->settings;
if (this->disable_two_level_aggregation) if (disable_two_level_aggregation)
{ {
/// Disable two-level aggregation due to version incompatibility. /// Disable two-level aggregation due to version incompatibility.
modified_settings.group_by_two_level_threshold = 0; modified_settings.group_by_two_level_threshold = 0;
modified_settings.group_by_two_level_threshold_bytes = 0; modified_settings.group_by_two_level_threshold_bytes = 0;
} }
if (this->replicas.size() > 1) if (offset_states.size() > 1)
{ {
modified_settings.parallel_replicas_count = this->replicas.size(); modified_settings.parallel_replicas_count = offset_states.size();
modified_settings.parallel_replica_offset = replica->parallel_replica_offset; modified_settings.parallel_replica_offset = fd_to_replica_location[replica.connection->getSocket()->impl()->sockfd()].offset;
} }
replica->connection->sendQuery(timeouts, query, query_id, stage, &modified_settings, &client_info, with_pending_data); replica.connection->sendQuery(timeouts, query, query_id, stage, &modified_settings, &client_info, with_pending_data);
addTimeoutToReplica(TimerTypes::RECEIVE_TIMEOUT, replica, this->epoll, this->timeout_fd_to_replica, timeouts); addTimeoutToReplica(ConnectionTimeoutType::RECEIVE_TIMEOUT, replica);
addTimeoutToReplica(TimerTypes::RECEIVE_DATA_TIMEOUT, replica, this->epoll, this->timeout_fd_to_replica, timeouts); addTimeoutToReplica(ConnectionTimeoutType::RECEIVE_DATA_TIMEOUT, replica);
}; };
for (auto & replicas_with_same_offset : replicas) for (auto & offset_status : offset_states)
for (auto & replica : replicas_with_same_offset) for (auto & replica : offset_status.replicas)
send_query(replica); send_query(replica);
pipeline_for_new_replicas.add(send_query); pipeline_for_new_replicas.add(send_query);
@ -145,16 +151,20 @@ void HedgedConnections::disconnect()
{ {
std::lock_guard lock(cancel_mutex); std::lock_guard lock(cancel_mutex);
for (auto & replicas_with_same_offset : replicas) for (auto & offset_status : offset_states)
for (auto & replica : replicas_with_same_offset) for (auto & replica : offset_status.replicas)
if (replica->isReady()) if (replica.connection)
finishProcessReplica(replica, true); finishProcessReplica(replica, true);
if (get_hedged_connections.hasEventsInProcess()) if (hedged_connections_factory.hasEventsInProcess())
{ {
get_hedged_connections.stopChoosingReplicas();
if (next_replica_in_process) if (next_replica_in_process)
epoll.remove(get_hedged_connections.getFileDescriptor()); {
epoll.remove(hedged_connections_factory.getFileDescriptor());
next_replica_in_process = false;
}
hedged_connections_factory.stopChoosingReplicas();
} }
} }
@ -165,13 +175,13 @@ std::string HedgedConnections::dumpAddresses() const
std::string addresses; std::string addresses;
bool is_first = true; bool is_first = true;
for (const auto & replicas_with_same_offset : replicas) for (const auto & offset_state : offset_states)
{ {
for (const auto & replica : replicas_with_same_offset) for (const auto & replica : offset_state.replicas)
{ {
if (replica->isReady()) if (replica.connection)
{ {
addresses += (is_first ? "" : "; ") + replica->connection->getDescription(); addresses += (is_first ? "" : "; ") + replica.connection->getDescription();
is_first = false; is_first = false;
} }
} }
@ -187,15 +197,14 @@ void HedgedConnections::sendCancel()
if (!sent_query || cancelled) if (!sent_query || cancelled)
throw Exception("Cannot cancel. Either no query sent or already cancelled.", ErrorCodes::LOGICAL_ERROR); throw Exception("Cannot cancel. Either no query sent or already cancelled.", ErrorCodes::LOGICAL_ERROR);
for (auto & replicas_with_same_offset : replicas) for (auto & offset_status : offset_states)
for (auto & replica : replicas_with_same_offset) for (auto & replica : offset_status.replicas)
if (replica->isReady()) if (replica.connection)
replica->connection->sendCancel(); replica.connection->sendCancel();
cancelled = true; cancelled = true;
} }
Packet HedgedConnections::drain() Packet HedgedConnections::drain()
{ {
std::lock_guard lock(cancel_mutex); std::lock_guard lock(cancel_mutex);
@ -252,26 +261,24 @@ Packet HedgedConnections::receivePacketUnlocked(AsyncCallback async_callback)
Packet HedgedConnections::receivePacketImpl(AsyncCallback async_callback) Packet HedgedConnections::receivePacketImpl(AsyncCallback async_callback)
{ {
int event_fd; int event_fd;
ReplicaStatePtr replica = nullptr;
Packet packet; Packet packet;
bool finish = false; bool finish = false;
while (!finish) while (!finish)
{ {
event_fd = getReadyFileDescriptor(async_callback); event_fd = getReadyFileDescriptor(async_callback);
if (fd_to_replica.find(event_fd) != fd_to_replica.end()) if (fd_to_replica_location.contains(event_fd))
{ {
replica = fd_to_replica[event_fd]; packet = receivePacketFromReplica(fd_to_replica_location[event_fd], async_callback);
packet = receivePacketFromReplica(replica, async_callback);
finish = true; finish = true;
} }
else if (timeout_fd_to_replica.find(event_fd) != timeout_fd_to_replica.end()) else if (timeout_fd_to_replica_location.contains(event_fd))
{ {
replica = timeout_fd_to_replica[event_fd]; ReplicaLocation location = timeout_fd_to_replica_location[event_fd];
processTimeoutEvent(replica, replica->active_timeouts[event_fd]); processTimeoutEvent(location, offset_states[location.offset].replicas[location.index].active_timeouts[event_fd]);
} }
else if (event_fd == get_hedged_connections.getFileDescriptor()) else if (event_fd == hedged_connections_factory.getFileDescriptor())
tryGetNewReplica(); tryGetNewReplica(false);
else else
throw Exception("Unknown event from epoll", ErrorCodes::LOGICAL_ERROR); throw Exception("Unknown event from epoll", ErrorCodes::LOGICAL_ERROR);
} }
@ -281,30 +288,34 @@ Packet HedgedConnections::receivePacketImpl(AsyncCallback async_callback)
int HedgedConnections::getReadyFileDescriptor(AsyncCallback async_callback) int HedgedConnections::getReadyFileDescriptor(AsyncCallback async_callback)
{ {
for (auto & [fd, replica] : fd_to_replica) for (auto & [fd, location] : fd_to_replica_location)
if (replica->connection->hasReadPendingData()) {
return replica->fd; ReplicaState & replica = offset_states[location.offset].replicas[location.index];
if (replica.connection->hasReadPendingData())
return replica.connection->getSocket()->impl()->sockfd();
}
return epoll.getReady(std::move(async_callback)).data.fd; return epoll.getReady(true, std::move(async_callback)).data.fd;
} }
Packet HedgedConnections::receivePacketFromReplica(ReplicaStatePtr & replica, AsyncCallback async_callback) Packet HedgedConnections::receivePacketFromReplica(ReplicaLocation & replica_location, AsyncCallback async_callback)
{ {
Packet packet = replica->connection->receivePacket(std::move(async_callback)); ReplicaState & replica = offset_states[replica_location.offset].replicas[replica_location.index];
removeTimeoutFromReplica(ConnectionTimeoutType::RECEIVE_TIMEOUT, replica);
Packet packet = replica.connection->receivePacket(std::move(async_callback));
switch (packet.type) switch (packet.type)
{ {
case Protocol::Server::Data: case Protocol::Server::Data:
removeTimeoutsFromReplica(replica, epoll, timeout_fd_to_replica); if (!offset_states[replica_location.offset].first_packet_of_data_received)
processReceiveData(replica); processReceivedFirstDataPacket(replica_location);
addTimeoutToReplica(TimerTypes::RECEIVE_TIMEOUT, replica, epoll, timeout_fd_to_replica, get_hedged_connections.getConnectionTimeouts()); addTimeoutToReplica(ConnectionTimeoutType::RECEIVE_TIMEOUT, replica);
break; break;
case Protocol::Server::Progress: case Protocol::Server::Progress:
case Protocol::Server::ProfileInfo: case Protocol::Server::ProfileInfo:
case Protocol::Server::Totals: case Protocol::Server::Totals:
case Protocol::Server::Extremes: case Protocol::Server::Extremes:
case Protocol::Server::Log: case Protocol::Server::Log:
removeTimeoutFromReplica(TimerTypes::RECEIVE_TIMEOUT, replica, epoll, timeout_fd_to_replica); addTimeoutToReplica(ConnectionTimeoutType::RECEIVE_TIMEOUT, replica);
addTimeoutToReplica(TimerTypes::RECEIVE_TIMEOUT, replica, epoll, timeout_fd_to_replica, get_hedged_connections.getConnectionTimeouts());
break; break;
case Protocol::Server::EndOfStream: case Protocol::Server::EndOfStream:
@ -320,96 +331,155 @@ Packet HedgedConnections::receivePacketFromReplica(ReplicaStatePtr & replica, As
return packet; return packet;
} }
void HedgedConnections::processReceiveData(ReplicaStatePtr & replica) void HedgedConnections::processReceivedFirstDataPacket(ReplicaLocation & replica_location)
{ {
/// When we receive first packet of data from replica, we stop working with replicas, that are /// When we receive first packet of data from replica, we stop working with replicas, that are
/// responsible for the same offset. /// responsible for the same offset.
offsets_with_received_data.insert(replica->parallel_replica_offset); OffsetState & offset_state = offset_states[replica_location.offset];
removeTimeoutFromReplica(ConnectionTimeoutType::RECEIVE_DATA_TIMEOUT, offset_state.replicas[replica_location.index]);
++offsets_with_received_first_data_packet;
offset_state.first_packet_of_data_received = true;
for (auto & other_replica : replicas[replica->parallel_replica_offset]) for (size_t i = 0; i != offset_state.replicas.size(); ++i)
{ {
if (other_replica->isReady() && other_replica != replica) if (i != replica_location.index && offset_state.replicas[i].connection)
{ {
other_replica->connection->sendCancel(); offset_state.replicas[i].connection->sendCancel();
finishProcessReplica(other_replica, true); finishProcessReplica(offset_state.replicas[i], true);
} }
} }
/// If we received data from replicas with all offsets, we need to stop choosing new replicas. /// If we received data from replicas with all offsets, we need to stop choosing new replicas.
if (get_hedged_connections.hasEventsInProcess() && offsets_with_received_data.size() == replicas.size()) if (hedged_connections_factory.hasEventsInProcess() && offsets_with_received_first_data_packet == offset_states.size())
{ {
get_hedged_connections.stopChoosingReplicas();
if (next_replica_in_process) if (next_replica_in_process)
epoll.remove(get_hedged_connections.getFileDescriptor()); {
epoll.remove(hedged_connections_factory.getFileDescriptor());
next_replica_in_process = false;
}
hedged_connections_factory.stopChoosingReplicas();
} }
} }
void HedgedConnections::processTimeoutEvent(ReplicaStatePtr & replica, TimerDescriptorPtr timeout_descriptor) void HedgedConnections::processTimeoutEvent(ReplicaLocation & replica_location, ConnectionTimeoutDescriptorPtr timeout_descriptor)
{ {
epoll.remove(timeout_descriptor->getDescriptor()); ReplicaState & replica = offset_states[replica_location.offset].replicas[replica_location.index];
replica->active_timeouts.erase(timeout_descriptor->getDescriptor()); epoll.remove(timeout_descriptor->timer.getDescriptor());
timeout_fd_to_replica.erase(timeout_descriptor->getDescriptor()); replica.active_timeouts.erase(timeout_descriptor->timer.getDescriptor());
timeout_fd_to_replica_location.erase(timeout_descriptor->timer.getDescriptor());
if (timeout_descriptor->getType() == TimerTypes::RECEIVE_TIMEOUT) if (timeout_descriptor->type == ConnectionTimeoutType::RECEIVE_TIMEOUT)
{ {
size_t offset = replica->parallel_replica_offset;
finishProcessReplica(replica, true); finishProcessReplica(replica, true);
/// Check if there is no active connections with the same offset. /// Check if there is no active connections with the same offset and there is no new replica in process.
if (active_connections_count_by_offset[offset] == 0) if (offset_states[replica_location.offset].active_connection_count == 0 && !next_replica_in_process)
throw NetException("Receive timeout expired", ErrorCodes::SOCKET_TIMEOUT); throw NetException("Receive timeout expired", ErrorCodes::SOCKET_TIMEOUT);
} }
else if (timeout_descriptor->getType() == TimerTypes::RECEIVE_DATA_TIMEOUT) else if (timeout_descriptor->type == ConnectionTimeoutType::RECEIVE_DATA_TIMEOUT)
{ {
offsets_queue.push(replica->parallel_replica_offset); offsets_queue.push(replica_location.offset);
tryGetNewReplica(); tryGetNewReplica(true);
} }
} }
void HedgedConnections::tryGetNewReplica() void HedgedConnections::tryGetNewReplica(bool start_new_connection)
{ {
ReplicaStatePtr new_replica = get_hedged_connections.getNextConnection(/*non_blocking*/ true); Connection * connection = nullptr;
HedgedConnectionsFactory::State state = hedged_connections_factory.getNextConnection(start_new_connection, connection);
/// Skip replicas that doesn't support two-level aggregation if we didn't disable it in sendQuery. /// Skip replicas that doesn't support two-level aggregation if we didn't disable it in sendQuery.
while (new_replica->isReady() && !disable_two_level_aggregation while (state == HedgedConnectionsFactory::State::READY && !disable_two_level_aggregation
&& new_replica->connection->getServerRevision(get_hedged_connections.getConnectionTimeouts()) < DBMS_MIN_REVISION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD) && connection->getServerRevision(hedged_connections_factory.getConnectionTimeouts())
new_replica = get_hedged_connections.getNextConnection(/*non_blocking*/ true); < DBMS_MIN_REVISION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD)
state = hedged_connections_factory.getNextConnection(true, connection);
if (new_replica->isReady()) if (state == HedgedConnectionsFactory::State::READY)
{ {
new_replica->parallel_replica_offset = offsets_queue.front(); size_t offset = offsets_queue.front();
offsets_queue.pop(); offsets_queue.pop();
replicas[new_replica->parallel_replica_offset].push_back(new_replica); size_t index = offset_states[offset].replicas.size();
epoll.add(new_replica->fd);
fd_to_replica[new_replica->fd] = new_replica; ReplicaState replica;
++active_connections_count_by_offset[new_replica->parallel_replica_offset]; replica.connection = connection;
pipeline_for_new_replicas.run(new_replica); int socket_fd = replica.connection->getSocket()->impl()->sockfd();
epoll.add(socket_fd);
fd_to_replica_location[socket_fd] = ReplicaLocation{offset, index};
offset_states[offset].replicas.push_back(replica);
++offset_states[offset].active_connection_count;
++active_connection_count;
pipeline_for_new_replicas.run(replica);
} }
else if (new_replica->isNotReady() && !next_replica_in_process) else if (state == HedgedConnectionsFactory::State::NOT_READY && !next_replica_in_process)
{ {
epoll.add(get_hedged_connections.getFileDescriptor()); epoll.add(hedged_connections_factory.getFileDescriptor());
next_replica_in_process = true; next_replica_in_process = true;
} }
if (next_replica_in_process && (new_replica->isCannotChoose() || offsets_queue.empty())) /// Check if we cannot get new replica and there is no active replica with needed offsets.
else if (state == HedgedConnectionsFactory::State::CANNOT_CHOOSE)
{ {
epoll.remove(get_hedged_connections.getFileDescriptor()); while (!offsets_queue.empty())
{
if (offset_states[offsets_queue.front()].active_connection_count == 0)
throw Exception("Cannot find enough connections to replicas", ErrorCodes::ALL_CONNECTION_TRIES_FAILED);
offsets_queue.pop();
}
}
/// Check if we don't need to listen hedged_connections_factory file descriptor in epoll anymore.
if (next_replica_in_process && (state == HedgedConnectionsFactory::State::CANNOT_CHOOSE || offsets_queue.empty()))
{
epoll.remove(hedged_connections_factory.getFileDescriptor());
next_replica_in_process = false; next_replica_in_process = false;
} }
} }
void HedgedConnections::finishProcessReplica(ReplicaStatePtr & replica, bool disconnect) void HedgedConnections::finishProcessReplica(ReplicaState & replica, bool disconnect)
{ {
removeTimeoutsFromReplica(replica, epoll, timeout_fd_to_replica); removeTimeoutsFromReplica(replica);
epoll.remove(replica->fd); int socket_fd = replica.connection->getSocket()->impl()->sockfd();
fd_to_replica.erase(replica->fd); epoll.remove(socket_fd);
--active_connections_count_by_offset[replica->parallel_replica_offset]; --offset_states[fd_to_replica_location[socket_fd].offset].active_connection_count;
if (active_connections_count_by_offset[replica->parallel_replica_offset] == 0) fd_to_replica_location.erase(socket_fd);
active_connections_count_by_offset.erase(replica->parallel_replica_offset); --active_connection_count;
if (disconnect) if (disconnect)
replica->connection->disconnect(); replica.connection->disconnect();
replica->reset(); replica.connection = nullptr;
}
void HedgedConnections::addTimeoutToReplica(ConnectionTimeoutType type, ReplicaState & replica)
{
ConnectionTimeoutDescriptorPtr timeout_descriptor
= createConnectionTimeoutDescriptor(type, hedged_connections_factory.getConnectionTimeouts());
epoll.add(timeout_descriptor->timer.getDescriptor());
timeout_fd_to_replica_location[timeout_descriptor->timer.getDescriptor()]
= fd_to_replica_location[replica.connection->getSocket()->impl()->sockfd()];
replica.active_timeouts[timeout_descriptor->timer.getDescriptor()] = std::move(timeout_descriptor);
}
void HedgedConnections::removeTimeoutsFromReplica(ReplicaState & replica)
{
for (auto & [fd, _] : replica.active_timeouts)
{
epoll.remove(fd);
timeout_fd_to_replica_location.erase(fd);
}
replica.active_timeouts.clear();
}
void HedgedConnections::removeTimeoutFromReplica(ConnectionTimeoutType type, ReplicaState & replica)
{
auto it = std::find_if(
replica.active_timeouts.begin(), replica.active_timeouts.end(), [type](auto & value) { return value.second->type == type; });
if (it != replica.active_timeouts.end())
{
epoll.remove(it->first);
timeout_fd_to_replica_location.erase(it->first);
replica.active_timeouts.erase(it);
}
} }
} }

View File

@ -1,18 +1,41 @@
#pragma once #pragma once
#if defined(OS_LINUX) #if defined(OS_LINUX)
#include <Client/GetHedgedConnections.h>
#include <Client/IConnections.h>
#include <functional> #include <functional>
#include <queue> #include <queue>
#include <Client/HedgedConnectionsFactory.h>
#include <Client/IConnections.h>
namespace DB namespace DB
{ {
/** To receive data from multiple replicas (connections) from one shard asynchronously,
* The principe of Hedged Connections is used to reduce tail latency:
* (if we don't receive data from replica for a long time, we try to get new replica
* and send query to it, without cancelling working with previous replica). This class
* supports all functionality that MultipleConnections has.
*/
class HedgedConnections : public IConnections class HedgedConnections : public IConnections
{ {
public: public:
using ReplicaStatePtr = GetHedgedConnections::ReplicaStatePtr; struct ReplicaState
{
Connection * connection = nullptr;
std::unordered_map<int, ConnectionTimeoutDescriptorPtr> active_timeouts;
};
struct ReplicaLocation
{
size_t offset;
size_t index;
};
struct OffsetState
{
std::vector<ReplicaState> replicas;
size_t active_connection_count;
bool first_packet_of_data_received;
};
HedgedConnections(const ConnectionPoolWithFailoverPtr & pool_, HedgedConnections(const ConnectionPoolWithFailoverPtr & pool_,
const Settings & settings_, const Settings & settings_,
@ -45,57 +68,67 @@ public:
std::string dumpAddresses() const override; std::string dumpAddresses() const override;
size_t size() const override { return replicas.size(); } size_t size() const override { return offset_states.size(); }
bool hasActiveConnections() const override { return !active_connections_count_by_offset.empty(); } bool hasActiveConnections() const override { return active_connection_count > 0; }
private: private:
/// We will save actions with replicas in pipeline to perform them on the new replicas. /// We will save actions with replicas in pipeline to perform them on the new replicas.
class Pipeline class Pipeline
{ {
public: public:
void add(std::function<void(ReplicaStatePtr &)> send_function); void add(std::function<void(ReplicaState &)> send_function);
void run(ReplicaStatePtr & replica); void run(ReplicaState & replica);
private: private:
std::vector<std::function<void(ReplicaStatePtr &)>> pipeline; std::vector<std::function<void(ReplicaState &)>> pipeline;
}; };
Packet receivePacketFromReplica(ReplicaStatePtr & replica, AsyncCallback async_callback = {}); Packet receivePacketFromReplica(ReplicaLocation & replica_location, AsyncCallback async_callback = {});
Packet receivePacketImpl(AsyncCallback async_callback = {}); Packet receivePacketImpl(AsyncCallback async_callback = {});
void processReceiveData(ReplicaStatePtr & replica); void processReceivedFirstDataPacket(ReplicaLocation & replica_location);
void processTimeoutEvent(ReplicaStatePtr & replica, TimerDescriptorPtr timeout_descriptor); void processTimeoutEvent(ReplicaLocation & replica_location, ConnectionTimeoutDescriptorPtr timeout_descriptor);
void tryGetNewReplica(); void tryGetNewReplica(bool start_new_connection);
void finishProcessReplica(ReplicaStatePtr & replica, bool disconnect); void finishProcessReplica(ReplicaState & replica, bool disconnect);
int getReadyFileDescriptor(AsyncCallback async_callback = {}); int getReadyFileDescriptor(AsyncCallback async_callback = {});
GetHedgedConnections get_hedged_connections; void addTimeoutToReplica(ConnectionTimeoutType type, ReplicaState & replica);
/// All replicas in replicas[offset] are responsible for process query void removeTimeoutsFromReplica(ReplicaState & replica);
void removeTimeoutFromReplica(ConnectionTimeoutType type, ReplicaState & replica);
HedgedConnectionsFactory hedged_connections_factory;
/// All replicas in offset_states[offset] is responsible for process query
/// with setting parallel_replica_offset = offset. In common situations /// with setting parallel_replica_offset = offset. In common situations
/// replicas[offset].size() = 1 (like in MultiplexedConnections). /// replica_states[offset].replicas.size() = 1 (like in MultiplexedConnections).
std::vector<std::vector<ReplicaStatePtr>> replicas; std::vector<OffsetState> offset_states;
/// Map socket file descriptor to replica. /// Map socket file descriptor to replica location (it's offset and index in OffsetState.replicas).
std::unordered_map<int, ReplicaStatePtr> fd_to_replica; std::unordered_map<int, ReplicaLocation> fd_to_replica_location;
/// Map timeout file descriptor to replica. /// Map timeout file descriptor to replica location (it's offset and index in OffsetState.replicas).
std::unordered_map<int, ReplicaStatePtr> timeout_fd_to_replica; std::unordered_map<int, ReplicaLocation> timeout_fd_to_replica_location;
/// A queue of offsets for new replicas. When we get RECEIVE_DATA_TIMEOUT from /// A queue of offsets for new replicas. When we get RECEIVE_DATA_TIMEOUT from
/// the replica, we push it's offset to this queue and start trying to get /// the replica, we push it's offset to this queue and start trying to get
/// new replica. /// new replica.
std::queue<int> offsets_queue; std::queue<int> offsets_queue;
/// Map offset to amount of active connections, responsible to this offset. /// The current number of valid connections to the replicas of this shard.
std::unordered_map<size_t, size_t> active_connections_count_by_offset; size_t active_connection_count;
std::unordered_set<size_t> offsets_with_received_data; /// We count offsets which received first packet of data,
/// it's needed to cancel choosing new replicas when all offsets
/// received their first packet of data.
size_t offsets_with_received_first_data_packet;
Pipeline pipeline_for_new_replicas; Pipeline pipeline_for_new_replicas;
@ -103,8 +136,8 @@ private:
/// If we didn't disabled it, we need to skip this replica. /// If we didn't disabled it, we need to skip this replica.
bool disable_two_level_aggregation = false; bool disable_two_level_aggregation = false;
/// next_replica_in_process is true when get_hedged_connections.getFileDescriptor() /// This flag means we need to get connection with new replica, but no replica is ready.
/// is in epoll now and false otherwise. /// When it's true, hedged_connections_factory.getFileDescriptor() is in epoll.
bool next_replica_in_process = false; bool next_replica_in_process = false;
Epoll epoll; Epoll epoll;

View File

@ -0,0 +1,475 @@
#if defined(OS_LINUX)
#include <Client/HedgedConnectionsFactory.h>
#include <Common/typeid_cast.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int LOGICAL_ERROR;
extern const int ALL_CONNECTION_TRIES_FAILED;
extern const int ALL_REPLICAS_ARE_STALE;
}
HedgedConnectionsFactory::HedgedConnectionsFactory(
const ConnectionPoolWithFailoverPtr & pool_,
const Settings * settings_,
const ConnectionTimeouts & timeouts_,
std::shared_ptr<QualifiedTableName> table_to_check_)
: pool(pool_), settings(settings_), timeouts(timeouts_), table_to_check(table_to_check_), log(&Poco::Logger::get("HedgedConnectionsFactory"))
{
shuffled_pools = pool->getShuffledPools(settings);
for (size_t i = 0; i != shuffled_pools.size(); ++i)
connection_establishers.emplace_back(shuffled_pools[i].pool, &timeouts, settings, table_to_check.get(), log);
max_tries
= (settings ? size_t{settings->connections_with_failover_max_tries} : size_t{DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES});
fallback_to_stale_replicas = settings && settings->fallback_to_stale_replicas_for_distributed_queries;
entries_count = 0;
usable_count = 0;
failed_pools_count = 0;
}
HedgedConnectionsFactory::~HedgedConnectionsFactory()
{
pool->updateSharedError(shuffled_pools);
}
std::vector<Connection *> HedgedConnectionsFactory::getManyConnections(PoolMode pool_mode)
{
size_t min_entries = (settings && settings->skip_unavailable_shards) ? 0 : 1;
size_t max_entries;
if (pool_mode == PoolMode::GET_ALL)
{
min_entries = shuffled_pools.size();
max_entries = shuffled_pools.size();
}
else if (pool_mode == PoolMode::GET_ONE)
max_entries = 1;
else if (pool_mode == PoolMode::GET_MANY)
max_entries = settings ? size_t(settings->max_parallel_replicas) : 1;
else
throw DB::Exception("Unknown pool allocation mode", DB::ErrorCodes::LOGICAL_ERROR);
std::vector<Connection *> connections;
connections.reserve(max_entries);
/// Try to start establishing connections with max_entries replicas.
int index;
for (size_t i = 0; i != max_entries; ++i)
{
index = getNextIndex();
if (index == -1)
break;
ReplicaStatePtr replica = startEstablishingConnection(index);
if (replica->state == State::READY)
connections.push_back(replica->connection);
}
/// Process connections until we get enough READY connections
/// (work asynchronously with all connections we started).
Connection * connection = nullptr;
while (connections.size() < max_entries)
{
auto state = processConnections(true, connection);
if (state == State::READY)
connections.push_back(connection);
else if (state == State::CANNOT_CHOOSE)
{
if (connections.size() >= min_entries)
break;
/// Determine the reason of not enough replicas.
if (!fallback_to_stale_replicas && usable_count >= min_entries)
throw DB::Exception(
"Could not find enough connections to up-to-date replicas. Got: " + std::to_string(connections.size())
+ ", needed: " + std::to_string(min_entries),
DB::ErrorCodes::ALL_REPLICAS_ARE_STALE);
throw DB::NetException(
"All connection tries failed. Log: \n\n" + fail_messages + "\n",
DB::ErrorCodes::ALL_CONNECTION_TRIES_FAILED);
}
}
return connections;
}
HedgedConnectionsFactory::State HedgedConnectionsFactory::getNextConnection(bool start_new_connection, Connection *& connection_out)
{
if (start_new_connection)
{
/// Try to start establishing connection to the new replica.
int index = getNextIndex();
if (index != -1)
{
ReplicaStatePtr replica = startEstablishingConnection(index);
if (replica->state == State::READY)
{
connection_out = replica->connection;
return State::READY;
}
}
}
return processConnections(false, connection_out);
}
HedgedConnectionsFactory::State HedgedConnectionsFactory::processConnections(bool blocking, Connection *& connection_out)
{
ReplicaStatePtr replica = nullptr;
int index = -1;
while (index != -1 || !epoll.empty())
{
if (index != -1)
{
replica = startEstablishingConnection(index);
if (replica->state == State::READY)
{
connection_out = replica->connection;
return State::READY;
}
}
if (!processEpollEvents(replica, blocking))
return State::NOT_READY;
if (replica->state == State::READY)
{
connection_out = replica->connection;
return State::READY;
}
index = getNextIndex();
}
/// We reach this point only if there was no free up to date replica.
/// We will try to use usable replica.
/// Check if we are not allowed to use usable replicas or there is no even a free usable replica.
if (!fallback_to_stale_replicas || !canGetNewConnection())
return State::CANNOT_CHOOSE;
setBestUsableReplica(replica);
connection_out = replica->connection;
return replica->state;
}
void HedgedConnectionsFactory::stopChoosingReplicas()
{
for (auto & [fd, replica] : fd_to_replica)
{
removeTimeoutsFromReplica(replica);
epoll.remove(fd);
connection_establishers[replica->index].reset();
replica->reset();
}
fd_to_replica.clear();
}
int HedgedConnectionsFactory::getNextIndex()
{
/// Check if there is no free replica.
if (entries_count + indexes_in_process.size() + failed_pools_count >= shuffled_pools.size())
return -1;
/// Check if it's the first time.
if (last_used_index == -1)
{
last_used_index = 0;
return 0;
}
bool finish = false;
int next_index = last_used_index;
while (!finish)
{
next_index = (next_index + 1) % shuffled_pools.size();
/// Check if we can try this replica.
if (indexes_in_process.find(next_index) == indexes_in_process.end() && (max_tries == 0 || shuffled_pools[next_index].error_count < max_tries)
&& connection_establishers[next_index].stage != ConnectionEstablisher::Stage::FINISHED)
finish = true;
/// If we made a complete round, there is no replica to connect.
else if (next_index == last_used_index)
return -1;
}
last_used_index = next_index;
return next_index;
}
HedgedConnectionsFactory::ReplicaStatePtr HedgedConnectionsFactory::startEstablishingConnection(int index)
{
ReplicaStatePtr replica = createNewReplica();
do
{
ConnectionEstablisher & connection_establisher = connection_establishers[index];
replica->state = State::NOT_READY;
replica->index = index;
indexes_in_process.insert(index);
connection_establisher.reset();
connection_establisher.run();
if (connection_establisher.stage != ConnectionEstablisher::Stage::FAILED)
replica->connection = &*connection_establisher.result.entry;
processConnectionEstablisherStage(replica);
if (replica->state == State::NOT_READY)
{
epoll.add(connection_establisher.socket_fd);
fd_to_replica[connection_establisher.socket_fd] = replica;
connection_establisher.setActionBeforeDisconnect([&](int fd) {
epoll.remove(fd);
fd_to_replica.erase(fd);
});
addTimeouts(replica);
}
}
while (replica->state == State::EMPTY && (index = getNextIndex()) != -1);
return replica;
}
void HedgedConnectionsFactory::processConnectionEstablisherStage(ReplicaStatePtr & replica, bool remove_from_epoll)
{
ConnectionEstablisher & connection_establisher = connection_establishers[replica->index];
if (connection_establisher.stage == ConnectionEstablisher::Stage::FINISHED)
{
indexes_in_process.erase(replica->index);
++entries_count;
if (remove_from_epoll)
{
epoll.remove(connection_establisher.socket_fd);
fd_to_replica.erase(connection_establisher.socket_fd);
}
if (connection_establisher.result.is_usable)
{
++usable_count;
if (connection_establisher.result.is_up_to_date)
{
replica->state = State::READY;
ready_indexes.insert(replica->index);
return;
}
}
/// This replica is not up to date, we will try to find up to date.
replica->reset();
}
else if (connection_establisher.stage == ConnectionEstablisher::Stage::FAILED)
processFailedConnection(replica);
}
void HedgedConnectionsFactory::processFailedConnection(ReplicaStatePtr & replica)
{
ShuffledPool & shuffled_pool = shuffled_pools[replica->index];
LOG_WARNING(
log, "Connection failed at try №{}, reason: {}", (shuffled_pool.error_count + 1), connection_establishers[replica->index].fail_message);
ProfileEvents::increment(ProfileEvents::DistributedConnectionFailTry);
shuffled_pool.error_count = std::min(pool->getMaxErrorCup(), shuffled_pool.error_count + 1);
if (shuffled_pool.error_count >= max_tries)
{
++failed_pools_count;
ProfileEvents::increment(ProfileEvents::DistributedConnectionFailAtAll);
}
std::string & fail_message = connection_establishers[replica->index].fail_message;
if (!fail_message.empty())
fail_messages += fail_message + "\n";
indexes_in_process.erase(replica->index);
replica->reset();
}
void HedgedConnectionsFactory::addTimeouts(ReplicaStatePtr & replica)
{
addTimeoutToReplica(ConnectionTimeoutType::RECEIVE_TIMEOUT, replica);
auto stage = connection_establishers[replica->index].stage;
if (stage == ConnectionEstablisher::Stage::RECEIVE_HELLO)
addTimeoutToReplica(ConnectionTimeoutType::RECEIVE_HELLO_TIMEOUT, replica);
else if (stage == ConnectionEstablisher::Stage::RECEIVE_TABLES_STATUS)
addTimeoutToReplica(ConnectionTimeoutType::RECEIVE_TABLES_STATUS_TIMEOUT, replica);
}
void HedgedConnectionsFactory::addTimeoutToReplica(ConnectionTimeoutType type, ReplicaStatePtr & replica)
{
ConnectionTimeoutDescriptorPtr timeout_descriptor = createConnectionTimeoutDescriptor(type, timeouts);
epoll.add(timeout_descriptor->timer.getDescriptor());
timeout_fd_to_replica[timeout_descriptor->timer.getDescriptor()] = replica;
replica->active_timeouts[timeout_descriptor->timer.getDescriptor()] = std::move(timeout_descriptor);
}
void HedgedConnectionsFactory::removeTimeoutsFromReplica(ReplicaStatePtr & replica)
{
for (auto & [fd, _] : replica->active_timeouts)
{
epoll.remove(fd);
timeout_fd_to_replica.erase(fd);
}
replica->active_timeouts.clear();
}
bool HedgedConnectionsFactory::processEpollEvents(ReplicaStatePtr & replica, bool blocking)
{
int event_fd;
bool finish = false;
while (!finish)
{
event_fd = getReadyFileDescriptor(blocking);
/// Check if there is no events.
if (event_fd == -1)
return false;
if (fd_to_replica.find(event_fd) != fd_to_replica.end())
{
replica = fd_to_replica[event_fd];
processReplicaEvent(replica);
/// Check if replica is ready or we need to try next replica.
if (replica->state == State::READY || replica->state == State::EMPTY)
finish = true;
}
else if (timeout_fd_to_replica.find(event_fd) != timeout_fd_to_replica.end())
{
replica = timeout_fd_to_replica[event_fd];
processTimeoutEvent(replica, replica->active_timeouts[event_fd]);
/// Check if we need to try next replica.
if (replica->state == State::EMPTY)
finish = true;
}
else
throw Exception("Unknown event from epoll", ErrorCodes::LOGICAL_ERROR);
}
return true;
}
int HedgedConnectionsFactory::getReadyFileDescriptor(bool blocking)
{
for (auto & [fd, replica] : fd_to_replica)
if (replica->connection->hasReadPendingData())
return replica->connection->getSocket()->impl()->sockfd();
return epoll.getReady(/* blocking */blocking).data.fd;
}
void HedgedConnectionsFactory::processReplicaEvent(ReplicaStatePtr & replica)
{
removeTimeoutsFromReplica(replica);
connection_establishers[replica->index].run();
processConnectionEstablisherStage(replica, true);
if (replica->state == State::NOT_READY)
addTimeouts(replica);
}
void HedgedConnectionsFactory::processTimeoutEvent(ReplicaStatePtr & replica, ConnectionTimeoutDescriptorPtr timeout_descriptor)
{
epoll.remove(timeout_descriptor->timer.getDescriptor());
replica->active_timeouts.erase(timeout_descriptor->timer.getDescriptor());
timeout_fd_to_replica[timeout_descriptor->timer.getDescriptor()];
if (timeout_descriptor->type == ConnectionTimeoutType::RECEIVE_TIMEOUT)
{
removeTimeoutsFromReplica(replica);
int fd = replica->connection->getSocket()->impl()->sockfd();
epoll.remove(fd);
fd_to_replica.erase(fd);
ConnectionEstablisher & connection_establisher = connection_establishers[replica->index];
connection_establisher.fail_message = "Receive timeout expired (" + connection_establisher.result.entry->getDescription() + ")";
connection_establisher.resetResult();
connection_establisher.stage = ConnectionEstablisher::Stage::FAILED;
processFailedConnection(replica);
}
else if ((timeout_descriptor->type == ConnectionTimeoutType::RECEIVE_HELLO_TIMEOUT
|| timeout_descriptor->type == ConnectionTimeoutType::RECEIVE_TABLES_STATUS_TIMEOUT)
&& entries_count + indexes_in_process.size() + failed_pools_count < shuffled_pools.size())
replica = createNewReplica();
}
void HedgedConnectionsFactory::setBestUsableReplica(ReplicaStatePtr & replica)
{
std::vector<int> indexes(connection_establishers.size());
for (size_t i = 0; i != indexes.size(); ++i)
indexes[i] = i;
/// Remove unusable, failed replicas and replicas that are ready or in process.
indexes.erase(
std::remove_if(
indexes.begin(),
indexes.end(),
[&](int i)
{
return connection_establishers[i].result.entry.isNull() || !connection_establishers[i].result.is_usable ||
indexes_in_process.find(i) != indexes_in_process.end() || ready_indexes.find(i) != ready_indexes.end();
}),
indexes.end());
if (indexes.empty())
{
replica->state = State::CANNOT_CHOOSE;
return;
}
/// Sort replicas by staleness.
std::stable_sort(
indexes.begin(),
indexes.end(),
[&](size_t lhs, size_t rhs)
{
return connection_establishers[lhs].result.staleness < connection_establishers[rhs].result.staleness;
});
replica->index = indexes[0];
replica->connection = &*connection_establishers[indexes[0]].result.entry;
replica->state = State::READY;
ready_indexes.insert(replica->index);
}
ConnectionTimeoutDescriptorPtr createConnectionTimeoutDescriptor(ConnectionTimeoutType type, const ConnectionTimeouts & timeouts)
{
Poco::Timespan timeout;
switch (type)
{
case ConnectionTimeoutType::RECEIVE_HELLO_TIMEOUT:
timeout = timeouts.receive_hello_timeout;
break;
case ConnectionTimeoutType::RECEIVE_TABLES_STATUS_TIMEOUT:
timeout = timeouts.receive_tables_status_timeout;
break;
case ConnectionTimeoutType::RECEIVE_DATA_TIMEOUT:
timeout = timeouts.receive_data_timeout;
break;
case ConnectionTimeoutType::RECEIVE_TIMEOUT:
timeout = timeouts.receive_timeout;
break;
}
ConnectionTimeoutDescriptorPtr timeout_descriptor = std::make_shared<ConnectionTimeoutDescriptor>();
timeout_descriptor->type = type;
timeout_descriptor->timer.setRelative(timeout);
return timeout_descriptor;
}
}
#endif

View File

@ -0,0 +1,167 @@
#pragma once
#if defined(OS_LINUX)
#include <Common/TimerDescriptor.h>
#include <Client/ConnectionPoolWithFailover.h>
#include <Core/Settings.h>
#include <Common/Epoll.h>
#include <unordered_map>
#include <memory>
namespace DB
{
enum class ConnectionTimeoutType
{
RECEIVE_HELLO_TIMEOUT,
RECEIVE_TABLES_STATUS_TIMEOUT,
RECEIVE_DATA_TIMEOUT,
RECEIVE_TIMEOUT,
};
struct ConnectionTimeoutDescriptor
{
ConnectionTimeoutType type;
TimerDescriptor timer;
};
using ConnectionTimeoutDescriptorPtr = std::shared_ptr<ConnectionTimeoutDescriptor>;
using TimerDescriptorPtr = std::shared_ptr<TimerDescriptor>;
/** Class for establishing hedged connections with replicas.
* The process of establishing connection is divided on stages, on each stage if
* replica doesn't respond for a long time, we start establishing connection with
* the next replica, without cancelling working with previous one.
* It works with multiple replicas simultaneously without blocking by using epoll.
*/
class HedgedConnectionsFactory
{
public:
using ShuffledPool = ConnectionPoolWithFailover::Base::ShuffledPool;
enum class State
{
EMPTY = 0,
READY = 1,
NOT_READY = 2,
CANNOT_CHOOSE = 3,
};
struct ReplicaState
{
Connection * connection = nullptr;
size_t index = -1;
State state = State::EMPTY;
std::unordered_map<int, ConnectionTimeoutDescriptorPtr> active_timeouts;
void reset()
{
connection = nullptr;
index = -1;
state = State::EMPTY;
active_timeouts.clear();
}
};
using ReplicaStatePtr = std::shared_ptr<ReplicaState>;
HedgedConnectionsFactory(const ConnectionPoolWithFailoverPtr & pool_,
const Settings * settings_,
const ConnectionTimeouts & timeouts_,
std::shared_ptr<QualifiedTableName> table_to_check_ = nullptr);
/// Create and return active connections according to pool_mode.
std::vector<Connection *> getManyConnections(PoolMode pool_mode);
/// Try to get connection to the new replica without blocking. If start_new_connection is true, we start establishing connection
/// with the new replica and then call processConnections, otherwise just call processConnections.
State getNextConnection(bool start_new_connection, Connection *& connection_out);
/// Process all current events in epoll (connections, timeouts), if there is no events in epoll and blocking is false,
/// return NOT_READY. Returned state might be READY, NOT_READY and CANNOT_CHOOSE.
/// If state is READY, replica connection will be written in connection_out.
State processConnections(bool blocking, Connection *& connection_out);
/// Check if we can try to produce new READY replica.
bool canGetNewConnection() const { return ready_indexes.size() + failed_pools_count < shuffled_pools.size(); }
/// Stop working with all replicas that are not READY.
void stopChoosingReplicas();
bool hasEventsInProcess() const { return epoll.size() > 0; }
int getFileDescriptor() const { return epoll.getFileDescriptor(); }
const ConnectionTimeouts & getConnectionTimeouts() const { return timeouts; }
~HedgedConnectionsFactory();
private:
ReplicaStatePtr startEstablishingConnection(int index);
void processConnectionEstablisherStage(ReplicaStatePtr & replica, bool remove_from_epoll = false);
/// Find an index of the next free replica to start connection.
/// Return -1 if there is no free replica.
int getNextIndex();
int getReadyFileDescriptor(bool blocking);
void addTimeouts(ReplicaStatePtr & replica);
void addTimeoutToReplica(ConnectionTimeoutType type, ReplicaStatePtr & replica);
void removeTimeoutsFromReplica(ReplicaStatePtr & replica);
void processFailedConnection(ReplicaStatePtr & replica);
void processReceiveTimeout(ReplicaStatePtr & replica);
void processReplicaEvent(ReplicaStatePtr & replica);
void processTimeoutEvent(ReplicaStatePtr & replica, ConnectionTimeoutDescriptorPtr timeout_descriptor);
/// Return false if there is no ready events, return true if replica is ready
/// or we need to try next replica.
bool processEpollEvents(ReplicaStatePtr & replica, bool blocking);
void setBestUsableReplica(ReplicaStatePtr & replica);
ReplicaStatePtr createNewReplica() { return std::make_shared<ReplicaState>(); }
const ConnectionPoolWithFailoverPtr pool;
const Settings * settings;
const ConnectionTimeouts timeouts;
std::shared_ptr<QualifiedTableName> table_to_check;
std::vector<ConnectionEstablisher> connection_establishers;
std::vector<ShuffledPool> shuffled_pools;
std::vector<ReplicaState> replica_states;
/// Map socket file descriptor to replica.
std::unordered_map<int, ReplicaStatePtr> fd_to_replica;
/// Map timeout file descriptor to replica.
std::unordered_map<int, ReplicaStatePtr> timeout_fd_to_replica;
/// Indexes of replicas, that are in process of connection.
std::unordered_set<int> indexes_in_process;
/// Indexes of ready replicas.
std::unordered_set<int> ready_indexes;
int last_used_index = -1;
bool fallback_to_stale_replicas;
Epoll epoll;
Poco::Logger * log;
std::string fail_messages;
size_t entries_count;
size_t usable_count;
size_t failed_pools_count;
size_t max_tries;
};
/// Create ConnectionTimeoutDescriptor with particular type.
ConnectionTimeoutDescriptorPtr createConnectionTimeoutDescriptor(ConnectionTimeoutType type, const ConnectionTimeouts & timeouts);
}
#endif

View File

@ -12,8 +12,8 @@ PEERDIR(
SRCS( SRCS(
Connection.cpp Connection.cpp
ConnectionPoolWithFailover.cpp ConnectionPoolWithFailover.cpp
GetHedgedConnections.cpp
HedgedConnections.cpp HedgedConnections.cpp
HedgedConnectionsFactory.cpp
MultiplexedConnections.cpp MultiplexedConnections.cpp
TimeoutSetter.cpp TimeoutSetter.cpp

View File

@ -46,24 +46,24 @@ void Epoll::remove(int fd)
--events_count; --events_count;
} }
epoll_event Epoll::getReady(AsyncCallback async_callback) const epoll_event Epoll::getReady(bool blocking, AsyncCallback async_callback) const
{ {
std::vector<epoll_event> events = getManyReady(1, true, std::move(async_callback)); epoll_event event;
if (events.empty()) event.data.fd = -1;
throw Exception("Vector of ready events is empty", ErrorCodes::LOGICAL_ERROR); size_t ready_events_count = getManyReady(1, &event, blocking, std::move(async_callback));
if (ready_events_count > 1)
throw Exception("Returned amount of events cannot be more than 1.", ErrorCodes::LOGICAL_ERROR);
return events[0]; return event;
} }
std::vector<epoll_event> Epoll::getManyReady(int max_events, bool blocking, AsyncCallback async_callback) const size_t Epoll::getManyReady(int max_events, epoll_event * events_out, bool blocking, AsyncCallback async_callback) const
{ {
std::vector<epoll_event> events(max_events);
int ready_size = 0; int ready_size = 0;
int timeout = blocking && !async_callback ? -1 : 0; int timeout = blocking && !async_callback ? -1 : 0;
while (ready_size <= 0 && (ready_size != 0 || blocking)) do
{ {
ready_size = epoll_wait(epoll_fd, events.data(), max_events, timeout); ready_size = epoll_wait(epoll_fd, events_out, max_events, timeout);
if (ready_size == -1 && errno != EINTR) if (ready_size == -1 && errno != EINTR)
throwFromErrno("Error in epoll_wait", DB::ErrorCodes::EPOLL_ERROR); throwFromErrno("Error in epoll_wait", DB::ErrorCodes::EPOLL_ERROR);
@ -71,9 +71,9 @@ std::vector<epoll_event> Epoll::getManyReady(int max_events, bool blocking, Asyn
if (ready_size == 0 && blocking && async_callback) if (ready_size == 0 && blocking && async_callback)
async_callback(epoll_fd, 0, "epoll"); async_callback(epoll_fd, 0, "epoll");
} }
while (ready_size <= 0 && (ready_size != 0 || blocking));
events.resize(ready_size); return ready_size;
return events;
} }
Epoll::~Epoll() Epoll::~Epoll()

View File

@ -16,20 +16,22 @@ class Epoll : boost::noncopyable
public: public:
Epoll(); Epoll();
/// Add new file descriptor to epoll. /// Add new file descriptor to epoll. If ptr set to nullptr, epoll_event.data.fd = fd,
/// otherwise epoll_event.data.ptr = ptr.
void add(int fd, void * ptr = nullptr); void add(int fd, void * ptr = nullptr);
/// Remove file descriptor to epoll. /// Remove file descriptor to epoll.
void remove(int fd); void remove(int fd);
/// Get events from epoll. If blocking is false and there are no ready events, /// Get events from epoll. Events are written in events_out, this function returns an amount of ready events.
/// If blocking is false and there are no ready events,
/// return empty vector, otherwise wait for ready events. If blocking is true, /// return empty vector, otherwise wait for ready events. If blocking is true,
/// async_callback is given and there is no ready events, async_callback is called /// async_callback is given and there is no ready events, async_callback is called
/// with epoll file descriptor. /// with epoll file descriptor.
std::vector<epoll_event> getManyReady(int max_events, bool blocking, AsyncCallback async_callback = {}) const; size_t getManyReady(int max_events, epoll_event * events_out, bool blocking, AsyncCallback async_callback = {}) const;
/// Get only one ready event, this function is always blocking. /// Get only one ready event, if blocking is false and there is no ready events, epoll_event.data.fd will be set to -1.
epoll_event getReady(AsyncCallback async_callback = {}) const; epoll_event getReady(bool blocking = true, AsyncCallback async_callback = {}) const;
int getFileDescriptor() const { return epoll_fd; } int getFileDescriptor() const { return epoll_fd; }

View File

@ -5,21 +5,11 @@
namespace DB namespace DB
{ {
enum TimerTypes
{
DEFAULT,
RECEIVE_HELLO_TIMEOUT,
RECEIVE_TABLES_STATUS_TIMEOUT,
RECEIVE_DATA_TIMEOUT,
RECEIVE_TIMEOUT,
};
/// Wrapper over timerfd. /// Wrapper over timerfd.
class TimerDescriptor class TimerDescriptor
{ {
private: private:
int timer_fd; int timer_fd;
int type = TimerTypes::DEFAULT;
public: public:
explicit TimerDescriptor(int clockid = CLOCK_MONOTONIC, int flags = 0); explicit TimerDescriptor(int clockid = CLOCK_MONOTONIC, int flags = 0);
@ -31,12 +21,10 @@ public:
TimerDescriptor & operator=(TimerDescriptor &&) = default; TimerDescriptor & operator=(TimerDescriptor &&) = default;
int getDescriptor() const { return timer_fd; } int getDescriptor() const { return timer_fd; }
int getType() const { return type; }
void reset() const; void reset() const;
void drain() const; void drain() const;
void setRelative(const Poco::Timespan & timespan) const; void setRelative(const Poco::Timespan & timespan) const;
void setType(int type_) { type = type_; }
}; };
} }

View File

@ -121,19 +121,22 @@ bool RemoteQueryExecutorReadContext::checkTimeout() const
bool RemoteQueryExecutorReadContext::checkTimeoutImpl() const bool RemoteQueryExecutorReadContext::checkTimeoutImpl() const
{ {
/// Wait for epoll will not block if it was polled externally. /// Wait for epoll will not block if it was polled externally.
std::vector<epoll_event> events = epoll.getManyReady(epoll.size(), /* blocking = */ false); epoll_event events[3];
events[0].data.fd = events[1].data.fd = events[2].data.fd = -1;
epoll.getManyReady(3, events,/* blocking = */ false);
bool is_socket_ready = false; bool is_socket_ready = false;
bool is_pipe_alarmed = false; bool is_pipe_alarmed = false;
bool has_timer_alarm = false; bool has_timer_alarm = false;
for (const auto & event : events) for (int i = 0; i < 3; ++i)
{ {
if (event.data.fd == connection_fd) if (events[i].data.fd == connection_fd)
is_socket_ready = true; is_socket_ready = true;
if (event.data.fd == timer.getDescriptor()) if (events[i].data.fd == timer.getDescriptor())
has_timer_alarm = true; has_timer_alarm = true;
if (event.data.fd == pipe_fd[0]) if (events[i].data.fd == pipe_fd[0])
is_pipe_alarmed = true; is_pipe_alarmed = true;
} }
@ -198,7 +201,7 @@ void RemoteQueryExecutorReadContext::cancel()
RemoteQueryExecutorReadContext::~RemoteQueryExecutorReadContext() RemoteQueryExecutorReadContext::~RemoteQueryExecutorReadContext()
{ {
/// connection_fd is closed by Poco::Net::Socket /// connection_fd is closed by Poco::Net::Socket or Epoll
if (pipe_fd[0] != -1) if (pipe_fd[0] != -1)
close(pipe_fd[0]); close(pipe_fd[0]);
if (pipe_fd[1] != -1) if (pipe_fd[1] != -1)

View File

@ -33,9 +33,9 @@ struct ConnectionTimeouts
tcp_keep_alive_timeout(0), tcp_keep_alive_timeout(0),
http_keep_alive_timeout(0), http_keep_alive_timeout(0),
secure_connection_timeout(connection_timeout), secure_connection_timeout(connection_timeout),
receive_hello_timeout(0), receive_hello_timeout(receive_timeout_),
receive_tables_status_timeout(0), receive_tables_status_timeout(receive_timeout_),
receive_data_timeout(0) receive_data_timeout(receive_timeout_)
{ {
} }
@ -49,9 +49,9 @@ struct ConnectionTimeouts
tcp_keep_alive_timeout(tcp_keep_alive_timeout_), tcp_keep_alive_timeout(tcp_keep_alive_timeout_),
http_keep_alive_timeout(0), http_keep_alive_timeout(0),
secure_connection_timeout(connection_timeout), secure_connection_timeout(connection_timeout),
receive_hello_timeout(0), receive_hello_timeout(receive_timeout_),
receive_tables_status_timeout(0), receive_tables_status_timeout(receive_timeout_),
receive_data_timeout(0) receive_data_timeout(receive_timeout_)
{ {
} }
ConnectionTimeouts(const Poco::Timespan & connection_timeout_, ConnectionTimeouts(const Poco::Timespan & connection_timeout_,
@ -65,9 +65,9 @@ struct ConnectionTimeouts
tcp_keep_alive_timeout(tcp_keep_alive_timeout_), tcp_keep_alive_timeout(tcp_keep_alive_timeout_),
http_keep_alive_timeout(http_keep_alive_timeout_), http_keep_alive_timeout(http_keep_alive_timeout_),
secure_connection_timeout(connection_timeout), secure_connection_timeout(connection_timeout),
receive_hello_timeout(0), receive_hello_timeout(receive_timeout_),
receive_tables_status_timeout(0), receive_tables_status_timeout(receive_timeout_),
receive_data_timeout(0) receive_data_timeout(receive_timeout_)
{ {
} }

View File

@ -14,7 +14,6 @@ namespace ProfileEvents
namespace DB namespace DB
{ {
namespace ErrorCodes namespace ErrorCodes
{ {
extern const int NETWORK_ERROR; extern const int NETWORK_ERROR;
@ -42,7 +41,7 @@ bool ReadBufferFromPocoSocket::nextImpl()
/// Note that receive timeout is not checked here. External code should check it while polling. /// Note that receive timeout is not checked here. External code should check it while polling.
while (bytes_read < 0 && async_callback && errno == EAGAIN) while (bytes_read < 0 && async_callback && errno == EAGAIN)
{ {
async_callback(socket.impl()->sockfd(), socket.getReceiveTimeout(), "socket (" + socket.peerAddress().toString() + ")"); async_callback(socket.impl()->sockfd(), socket.getReceiveTimeout(), socket_description);
bytes_read = socket.impl()->receiveBytes(internal_buffer.begin(), internal_buffer.size(), flags); bytes_read = socket.impl()->receiveBytes(internal_buffer.begin(), internal_buffer.size(), flags);
} }
} }
@ -74,7 +73,10 @@ bool ReadBufferFromPocoSocket::nextImpl()
} }
ReadBufferFromPocoSocket::ReadBufferFromPocoSocket(Poco::Net::Socket & socket_, size_t buf_size) ReadBufferFromPocoSocket::ReadBufferFromPocoSocket(Poco::Net::Socket & socket_, size_t buf_size)
: BufferWithOwnMemory<ReadBuffer>(buf_size), socket(socket_), peer_address(socket.peerAddress()) : BufferWithOwnMemory<ReadBuffer>(buf_size)
, socket(socket_)
, peer_address(socket.peerAddress())
, socket_description("socket (" + peer_address.toString() + ")")
{ {
} }

View File

@ -34,6 +34,7 @@ public:
private: private:
AsyncCallback async_callback; AsyncCallback async_callback;
std::string socket_description;
}; };
} }