From 0704d3cf27239ec0aa07ee88f256ccc40b891b7e Mon Sep 17 00:00:00 2001 From: Pavel Kruglov Date: Sat, 6 Feb 2021 03:54:27 +0300 Subject: [PATCH] Refactor --- src/Client/ConnectionPoolWithFailover.cpp | 10 +- src/Client/ConnectionPoolWithFailover.h | 6 +- src/Client/GetHedgedConnections.cpp | 491 ------------------ src/Client/GetHedgedConnections.h | 173 ------ src/Client/HedgedConnections.cpp | 300 +++++++---- src/Client/HedgedConnections.h | 85 ++- src/Client/HedgedConnectionsFactory.cpp | 475 +++++++++++++++++ src/Client/HedgedConnectionsFactory.h | 167 ++++++ src/Client/ya.make | 2 +- src/Common/Epoll.cpp | 24 +- src/Common/Epoll.h | 12 +- src/Common/TimerDescriptor.h | 12 - .../RemoteQueryExecutorReadContext.cpp | 15 +- src/IO/ConnectionTimeouts.h | 18 +- src/IO/ReadBufferFromPocoSocket.cpp | 8 +- src/IO/ReadBufferFromPocoSocket.h | 1 + 16 files changed, 938 insertions(+), 861 deletions(-) delete mode 100644 src/Client/GetHedgedConnections.cpp delete mode 100644 src/Client/GetHedgedConnections.h create mode 100644 src/Client/HedgedConnectionsFactory.cpp create mode 100644 src/Client/HedgedConnectionsFactory.h diff --git a/src/Client/ConnectionPoolWithFailover.cpp b/src/Client/ConnectionPoolWithFailover.cpp index 3e41c26fb65..15344b3b18b 100644 --- a/src/Client/ConnectionPoolWithFailover.cpp +++ b/src/Client/ConnectionPoolWithFailover.cpp @@ -329,7 +329,7 @@ std::vector ConnectionPoolWithFa return Base::getShuffledPools(max_ignored_errors, get_priority); } -TryGetConnection::TryGetConnection( +ConnectionEstablisher::ConnectionEstablisher( IConnectionPool * pool_, const ConnectionTimeouts * timeouts_, const Settings * settings_, @@ -340,7 +340,7 @@ TryGetConnection::TryGetConnection( { } -void TryGetConnection::reset() +void ConnectionEstablisher::reset() { resetResult(); stage = Stage::CONNECT; @@ -349,7 +349,7 @@ void TryGetConnection::reset() fail_message.clear(); } -void TryGetConnection::resetResult() +void ConnectionEstablisher::resetResult() { if (!result.entry.isNull()) { @@ -358,7 +358,7 @@ void TryGetConnection::resetResult() } } -void TryGetConnection::processFail(bool add_description) +void ConnectionEstablisher::processFail(bool add_description) { if (action_before_disconnect) action_before_disconnect(socket_fd); @@ -371,7 +371,7 @@ void TryGetConnection::processFail(bool add_description) stage = Stage::FAILED; } -void TryGetConnection::run() +void ConnectionEstablisher::run() { try { diff --git a/src/Client/ConnectionPoolWithFailover.h b/src/Client/ConnectionPoolWithFailover.h index a6c0b9e8070..44b06e871ec 100644 --- a/src/Client/ConnectionPoolWithFailover.h +++ b/src/Client/ConnectionPoolWithFailover.h @@ -31,8 +31,8 @@ enum class PoolMode GET_ALL }; -/// Class for establishing connection with replica without blocking. -class TryGetConnection +/// Class for establishing connection with replica without blocking using different stages. +class ConnectionEstablisher { public: enum Stage @@ -47,7 +47,7 @@ public: using TryResult = PoolWithFailoverBase::TryResult; - TryGetConnection(IConnectionPool * pool_, + ConnectionEstablisher(IConnectionPool * pool_, const ConnectionTimeouts * timeouts_, const Settings * settings_, const QualifiedTableName * table_to_check = nullptr, diff --git a/src/Client/GetHedgedConnections.cpp b/src/Client/GetHedgedConnections.cpp deleted file mode 100644 index 093b4bc930c..00000000000 --- a/src/Client/GetHedgedConnections.cpp +++ /dev/null @@ -1,491 +0,0 @@ -#if defined(OS_LINUX) - -#include -#include - - -namespace DB -{ -namespace ErrorCodes -{ - extern const int BAD_ARGUMENTS; - extern const int LOGICAL_ERROR; - extern const int ALL_CONNECTION_TRIES_FAILED; - extern const int ALL_REPLICAS_ARE_STALE; -} - -GetHedgedConnections::GetHedgedConnections( - const ConnectionPoolWithFailoverPtr & pool_, - const Settings * settings_, - const ConnectionTimeouts & timeouts_, - std::shared_ptr table_to_check_) - : pool(pool_), settings(settings_), timeouts(timeouts_), table_to_check(table_to_check_), log(&Poco::Logger::get("GetHedgedConnections")) -{ - shuffled_pools = pool->getShuffledPools(settings); - for (size_t i = 0; i != shuffled_pools.size(); ++i) - try_get_connections.emplace_back(shuffled_pools[i].pool, &timeouts, settings, table_to_check.get(), log); - - max_tries - = (settings ? size_t{settings->connections_with_failover_max_tries} : size_t{DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES}); - - fallback_to_stale_replicas = settings ? settings->fallback_to_stale_replicas_for_distributed_queries : false; - entries_count = 0; - usable_count = 0; - failed_pools_count = 0; -} - -GetHedgedConnections::~GetHedgedConnections() -{ - pool->updateSharedError(shuffled_pools); -} - -std::vector GetHedgedConnections::getManyConnections(PoolMode pool_mode) -{ - size_t min_entries = (settings && settings->skip_unavailable_shards) ? 0 : 1; - - size_t max_entries; - if (pool_mode == PoolMode::GET_ALL) - { - min_entries = shuffled_pools.size(); - max_entries = shuffled_pools.size(); - } - else if (pool_mode == PoolMode::GET_ONE) - max_entries = 1; - else if (pool_mode == PoolMode::GET_MANY) - max_entries = settings ? size_t(settings->max_parallel_replicas) : 1; - else - throw DB::Exception("Unknown pool allocation mode", DB::ErrorCodes::LOGICAL_ERROR); - - std::vector replicas; - replicas.reserve(max_entries); - for (size_t i = 0; i != max_entries; ++i) - { - auto replica = getNextConnection(false); - if (replica->isCannotChoose()) - { - if (replicas.size() >= min_entries) - break; - - /// Determine the reason of not enough replicas. - if (!fallback_to_stale_replicas && usable_count >= min_entries) - throw DB::Exception( - "Could not find enough connections to up-to-date replicas. Got: " + std::to_string(replicas.size()) - + ", needed: " + std::to_string(min_entries), - DB::ErrorCodes::ALL_REPLICAS_ARE_STALE); - - throw DB::NetException( - "All connection tries failed. Log: \n\n" + fail_messages + "\n", - DB::ErrorCodes::ALL_CONNECTION_TRIES_FAILED); - } - - replicas.push_back(replica); - } - - return replicas; -} - -GetHedgedConnections::ReplicaStatePtr GetHedgedConnections::getNextConnection(bool non_blocking) -{ - ReplicaStatePtr replica = createNewReplica(); - int index; - - /// Check if it's the first time. - if (epoll.empty() && ready_indexes.empty()) - { - index = 0; - last_used_index = 0; - } - else - index = getNextIndex(); - - bool is_first = true; - - while (index != -1 || !epoll.empty()) - { - /// Prevent blocking after receiving timeout when there is no new replica to connect - /// (processEpollEvents can return EMPTY replica after timeout processing to start new connection). - if (index == -1 && !is_first && non_blocking) - { - replica->state = State::NOT_READY; - return replica; - } - - if (is_first) - is_first = false; - - if (index != -1) - { - Action action = startTryGetConnection(index, replica); - - if (action == Action::FINISH) - return replica; - - if (action == Action::TRY_NEXT_REPLICA) - { - index = getNextIndex(); - continue; - } - - if (action == Action::PROCESS_EPOLL_EVENTS && non_blocking) - return replica; - } - - replica = processEpollEvents(non_blocking); - if (replica->isReady() || (replica->isNotReady() && non_blocking)) - return replica; - - index = getNextIndex(); - } - - /// We reach this point only if there was no free up to date replica. - /// We will try to use usable replica. - - /// Check if we are not allowed to use usable replicas or there is no even a free usable replica. - if (!fallback_to_stale_replicas || !canGetNewConnection()) - { - replica->state = State::CANNOT_CHOOSE; - return replica; - } - - setBestUsableReplica(replica); - return replica; -} - -void GetHedgedConnections::stopChoosingReplicas() -{ - for (auto & [fd, replica] : fd_to_replica) - { - removeTimeoutsFromReplica(replica, epoll, timeout_fd_to_replica); - epoll.remove(fd); - try_get_connections[replica->index].reset(); - replica->reset(); - } - - fd_to_replica.clear(); -} - -int GetHedgedConnections::getNextIndex() -{ - /// Check if there is no free replica. - if (entries_count + indexes_in_process.size() + failed_pools_count >= shuffled_pools.size()) - return -1; - - bool finish = false; - int next_index = last_used_index; - while (!finish) - { - next_index = (next_index + 1) % shuffled_pools.size(); - - /// Check if we can try this replica. - if (indexes_in_process.find(next_index) == indexes_in_process.end() && (max_tries == 0 || shuffled_pools[next_index].error_count < max_tries) - && try_get_connections[next_index].stage != TryGetConnection::Stage::FINISHED) - finish = true; - - /// If we made a complete round, there is no replica to connect. - else if (next_index == last_used_index) - return -1; - } - - last_used_index = next_index; - return next_index; -} - -GetHedgedConnections::Action GetHedgedConnections::startTryGetConnection(int index, ReplicaStatePtr & replica) -{ - TryGetConnection & try_get_connection = try_get_connections[index]; - - replica->state = State::NOT_READY; - replica->index = index; - indexes_in_process.insert(index); - - try_get_connection.reset(); - try_get_connection.run(); - - if (try_get_connection.stage != TryGetConnection::Stage::FAILED) - { - replica->fd = try_get_connection.socket_fd; - replica->connection = &*try_get_connection.result.entry; - } - - Action action = processTryGetConnectionStage(replica); - - if (action == Action::PROCESS_EPOLL_EVENTS) - { - epoll.add(try_get_connection.socket_fd); - fd_to_replica[try_get_connection.socket_fd] = replica; - try_get_connection.setActionBeforeDisconnect( - [&](int fd) - { - epoll.remove(fd); - fd_to_replica.erase(fd); - }); - addTimeouts(replica); - } - - return action; -} - -GetHedgedConnections::Action -GetHedgedConnections::processTryGetConnectionStage(ReplicaStatePtr & replica, bool remove_from_epoll) -{ - TryGetConnection & try_get_connection = try_get_connections[replica->index]; - - if (try_get_connection.stage == TryGetConnection::Stage::FINISHED) - { - indexes_in_process.erase(replica->index); - ++entries_count; - - if (remove_from_epoll) - { - epoll.remove(try_get_connection.socket_fd); - fd_to_replica.erase(try_get_connection.socket_fd); - } - - if (try_get_connection.result.is_usable) - { - ++usable_count; - if (try_get_connection.result.is_up_to_date) - { - replica->state = State::READY; - ready_indexes.insert(replica->index); - return Action::FINISH; - } - } - - /// This replica is not up to date, we will try to find up to date. - replica->reset(); - return Action::TRY_NEXT_REPLICA; - } - else if (try_get_connection.stage == TryGetConnection::Stage::FAILED) - { - processFailedConnection(replica); - return Action::TRY_NEXT_REPLICA; - } - - /// Get connection process is not finished. - return Action::PROCESS_EPOLL_EVENTS; -} - -void GetHedgedConnections::processFailedConnection(ReplicaStatePtr & replica) -{ - ShuffledPool & shuffled_pool = shuffled_pools[replica->index]; - LOG_WARNING( - log, "Connection failed at try №{}, reason: {}", (shuffled_pool.error_count + 1), try_get_connections[replica->index].fail_message); - ProfileEvents::increment(ProfileEvents::DistributedConnectionFailTry); - - shuffled_pool.error_count = std::min(pool->getMaxErrorCup(), shuffled_pool.error_count + 1); - - if (shuffled_pool.error_count >= max_tries) - { - ++failed_pools_count; - ProfileEvents::increment(ProfileEvents::DistributedConnectionFailAtAll); - } - - std::string & fail_message = try_get_connections[replica->index].fail_message; - if (!fail_message.empty()) - fail_messages += fail_message + "\n"; - - indexes_in_process.erase(replica->index); - replica->reset(); -} - -void GetHedgedConnections::addTimeouts(ReplicaStatePtr & replica) -{ - addTimeoutToReplica(TimerTypes::RECEIVE_TIMEOUT, replica, epoll, timeout_fd_to_replica, timeouts); - - auto stage = try_get_connections[replica->index].stage; - if (stage == TryGetConnection::Stage::RECEIVE_HELLO) - addTimeoutToReplica(TimerTypes::RECEIVE_HELLO_TIMEOUT, replica, epoll, timeout_fd_to_replica, timeouts); - else if (stage == TryGetConnection::Stage::RECEIVE_TABLES_STATUS) - addTimeoutToReplica(TimerTypes::RECEIVE_TABLES_STATUS_TIMEOUT, replica, epoll, timeout_fd_to_replica, timeouts); -} - -GetHedgedConnections::ReplicaStatePtr GetHedgedConnections::processEpollEvents(bool non_blocking) -{ - int event_fd; - ReplicaStatePtr replica = nullptr; - bool finish = false; - while (!finish) - { - event_fd = getReadyFileDescriptor(); - - if (fd_to_replica.find(event_fd) != fd_to_replica.end()) - { - replica = fd_to_replica[event_fd]; - finish = processReplicaEvent(replica, non_blocking); - } - else if (timeout_fd_to_replica.find(event_fd) != timeout_fd_to_replica.end()) - { - replica = timeout_fd_to_replica[event_fd]; - finish = processTimeoutEvent(replica, replica->active_timeouts[event_fd], non_blocking); - } - else - throw Exception("Unknown event from epoll", ErrorCodes::LOGICAL_ERROR); - } - - return replica; -} - -int GetHedgedConnections::getReadyFileDescriptor(AsyncCallback async_callback) -{ - for (auto & [fd, replica] : fd_to_replica) - if (replica->connection->hasReadPendingData()) - return replica->fd; - - return epoll.getReady(std::move(async_callback)).data.fd; -} - -bool GetHedgedConnections::processReplicaEvent(ReplicaStatePtr & replica, bool non_blocking) -{ - removeTimeoutsFromReplica(replica, epoll, timeout_fd_to_replica); - try_get_connections[replica->index].run(); - Action action = processTryGetConnectionStage(replica, true); - if (action == Action::PROCESS_EPOLL_EVENTS) - { - addTimeouts(replica); - return non_blocking; - } - - return true; -} - -bool GetHedgedConnections::processTimeoutEvent(ReplicaStatePtr & replica, TimerDescriptorPtr timeout_descriptor, bool non_blocking) -{ - epoll.remove(timeout_descriptor->getDescriptor()); - replica->active_timeouts.erase(timeout_descriptor->getDescriptor()); - timeout_fd_to_replica[timeout_descriptor->getDescriptor()]; - - if (timeout_descriptor->getType() == TimerTypes::RECEIVE_TIMEOUT) - { - removeTimeoutsFromReplica(replica, epoll, timeout_fd_to_replica); - epoll.remove(replica->fd); - fd_to_replica.erase(replica->fd); - - TryGetConnection & try_get_connection = try_get_connections[replica->index]; - try_get_connection.fail_message = "Receive timeout expired (" + try_get_connection.result.entry->getDescription() + ")"; - try_get_connection.resetResult(); - try_get_connection.stage = TryGetConnection::Stage::FAILED; - processFailedConnection(replica); - - return true; - } - else if ((timeout_descriptor->getType() == TimerTypes::RECEIVE_HELLO_TIMEOUT - || timeout_descriptor->getType() == TimerTypes::RECEIVE_TABLES_STATUS_TIMEOUT) - && entries_count + indexes_in_process.size() + failed_pools_count < shuffled_pools.size()) - { - replica = createNewReplica(); - return true; - } - - return non_blocking; -} - -void GetHedgedConnections::setBestUsableReplica(ReplicaStatePtr & replica) -{ - std::vector indexes(try_get_connections.size()); - for (size_t i = 0; i != indexes.size(); ++i) - indexes[i] = i; - - /// Remove unusable, failed replicas and replicas that are ready or in process. - indexes.erase( - std::remove_if( - indexes.begin(), - indexes.end(), - [&](int i) - { - return try_get_connections[i].result.entry.isNull() || !try_get_connections[i].result.is_usable || - indexes_in_process.find(i) != indexes_in_process.end() || ready_indexes.find(i) != ready_indexes.end(); - }), - indexes.end()); - - if (indexes.empty()) - { - replica->state = State::CANNOT_CHOOSE; - return; - } - - /// Sort replicas by staleness. - std::stable_sort( - indexes.begin(), - indexes.end(), - [&](size_t lhs, size_t rhs) - { - return try_get_connections[lhs].result.staleness < try_get_connections[rhs].result.staleness; - }); - - replica->index = indexes[0]; - replica->connection = &*try_get_connections[indexes[0]].result.entry; - replica->state = State::READY; - replica->fd = replica->connection->getSocket()->impl()->sockfd(); - ready_indexes.insert(replica->index); -} - -void addTimeoutToReplica( - int type, - GetHedgedConnections::ReplicaStatePtr & replica, - Epoll & epoll, - std::unordered_map & timeout_fd_to_replica, - const ConnectionTimeouts & timeouts) -{ - Poco::Timespan timeout; - switch (type) - { - case TimerTypes::RECEIVE_HELLO_TIMEOUT: - timeout = timeouts.receive_hello_timeout; - break; - case TimerTypes::RECEIVE_TABLES_STATUS_TIMEOUT: - timeout = timeouts.receive_tables_status_timeout; - break; - case TimerTypes::RECEIVE_DATA_TIMEOUT: - timeout = timeouts.receive_data_timeout; - break; - case TimerTypes::RECEIVE_TIMEOUT: - timeout = timeouts.receive_timeout; - break; - default: - throw Exception("Unknown timeout type", ErrorCodes::BAD_ARGUMENTS); - } - - TimerDescriptorPtr timeout_descriptor = std::make_shared(); - timeout_descriptor->setType(type); - timeout_descriptor->setRelative(timeout); - epoll.add(timeout_descriptor->getDescriptor()); - timeout_fd_to_replica[timeout_descriptor->getDescriptor()] = replica; - replica->active_timeouts[timeout_descriptor->getDescriptor()] = std::move(timeout_descriptor); -} - -void removeTimeoutsFromReplica( - GetHedgedConnections::ReplicaStatePtr & replica, - Epoll & epoll, - std::unordered_map & timeout_fd_to_replica) -{ - for (auto & [fd, _] : replica->active_timeouts) - { - epoll.remove(fd); - timeout_fd_to_replica.erase(fd); - } - replica->active_timeouts.clear(); -} - -void removeTimeoutFromReplica( - int type, - GetHedgedConnections::ReplicaStatePtr & replica, - Epoll & epoll, - std::unordered_map & timeout_fd_to_replica) -{ - auto it = std::find_if( - replica->active_timeouts.begin(), - replica->active_timeouts.end(), - [type](auto & value){ return value.second->getType() == type; } - ); - - if (it != replica->active_timeouts.end()) - { - epoll.remove(it->first); - timeout_fd_to_replica.erase(it->first); - replica->active_timeouts.erase(it); - } -} - -} -#endif diff --git a/src/Client/GetHedgedConnections.h b/src/Client/GetHedgedConnections.h deleted file mode 100644 index 8638367e184..00000000000 --- a/src/Client/GetHedgedConnections.h +++ /dev/null @@ -1,173 +0,0 @@ -#pragma once - -#if defined(OS_LINUX) - -#include -#include -#include -#include -#include -#include - -namespace DB -{ - -using TimerDescriptorPtr = std::shared_ptr; - -/// Class for establishing hedged connections with replicas. -/// It works with multiple replicas simultaneously without blocking by using epoll. -class GetHedgedConnections -{ -public: - using ShuffledPool = ConnectionPoolWithFailover::Base::ShuffledPool; - - enum State - { - EMPTY = 0, - READY = 1, - NOT_READY = 2, - CANNOT_CHOOSE = 3, - }; - - struct ReplicaState - { - Connection * connection = nullptr; - State state = State::EMPTY; - int index = -1; - int fd = -1; - size_t parallel_replica_offset = 0; - std::unordered_map> active_timeouts; - - void reset() - { - connection = nullptr; - state = State::EMPTY; - index = -1; - fd = -1; - parallel_replica_offset = 0; - active_timeouts.clear(); - } - - bool isReady() const { return state == State::READY; } - bool isNotReady() const { return state == State::NOT_READY; } - bool isEmpty() const { return state == State::EMPTY; } - bool isCannotChoose() const { return state == State::CANNOT_CHOOSE; } - }; - - using ReplicaStatePtr = std::shared_ptr; - - GetHedgedConnections(const ConnectionPoolWithFailoverPtr & pool_, - const Settings * settings_, - const ConnectionTimeouts & timeouts_, - std::shared_ptr table_to_check_ = nullptr); - - /// Create and return connections according to pool_mode. - std::vector getManyConnections(PoolMode pool_mode); - - /// Try to establish connection to the new replica. If non_blocking is false, this function will block - /// until establishing connection to the new replica (returned replica state might be READY or CANNOT_CHOOSE). - /// If non_blocking is true, this function will try to establish connection to the new replica without blocking - /// (returned replica state might be READY, NOT_READY and CANNOT_CHOOSE). - ReplicaStatePtr getNextConnection(bool non_blocking); - - /// Check if we can try to produce new READY replica. - bool canGetNewConnection() const { return ready_indexes.size() + failed_pools_count < shuffled_pools.size(); } - - /// Stop working with all replicas that are not READY. - void stopChoosingReplicas(); - - bool hasEventsInProcess() const { return epoll.size() > 0; } - - int getFileDescriptor() const { return epoll.getFileDescriptor(); } - - const ConnectionTimeouts & getConnectionTimeouts() const { return timeouts; } - - ~GetHedgedConnections(); - -private: - - enum Action - { - FINISH = 0, - PROCESS_EPOLL_EVENTS = 1, - TRY_NEXT_REPLICA = 2, - }; - - Action startTryGetConnection(int index, ReplicaStatePtr & replica); - - Action processTryGetConnectionStage(ReplicaStatePtr & replica, bool remove_from_epoll = false); - - /// Find an index of the next free replica to start connection. - /// Return -1 if there is no free replica. - int getNextIndex(); - - int getReadyFileDescriptor(AsyncCallback async_callback = {}); - - void addTimeouts(ReplicaStatePtr & replica); - - void processFailedConnection(ReplicaStatePtr & replica); - - void processReceiveTimeout(ReplicaStatePtr & replica); - - bool processReplicaEvent(ReplicaStatePtr & replica, bool non_blocking); - - bool processTimeoutEvent(ReplicaStatePtr & replica, TimerDescriptorPtr timeout_descriptor, bool non_blocking); - - ReplicaStatePtr processEpollEvents(bool non_blocking = false); - - void setBestUsableReplica(ReplicaStatePtr & replica); - - ReplicaStatePtr createNewReplica() { return std::make_shared(); } - - const ConnectionPoolWithFailoverPtr pool; - const Settings * settings; - const ConnectionTimeouts timeouts; - std::shared_ptr table_to_check; - - std::vector try_get_connections; - std::vector shuffled_pools; - - /// Map socket file descriptor to replica. - std::unordered_map fd_to_replica; - /// Map timeout file descriptor to replica. - std::unordered_map timeout_fd_to_replica; - - /// Indexes of replicas, that are in process of connection. - std::unordered_set indexes_in_process; - /// Indexes of ready replicas. - std::unordered_set ready_indexes; - - int last_used_index; - bool fallback_to_stale_replicas; - Epoll epoll; - Poco::Logger * log; - std::string fail_messages; - size_t entries_count; - size_t usable_count; - size_t failed_pools_count; - size_t max_tries; -}; - -/// Add timeout with particular type to replica and add it to epoll. -void addTimeoutToReplica( - int type, - GetHedgedConnections::ReplicaStatePtr & replica, - Epoll & epoll, - std::unordered_map & timeout_fd_to_replica, - const ConnectionTimeouts & timeouts); - -/// Remove timeout with particular type from replica and epoll. -void removeTimeoutFromReplica( - int type, - GetHedgedConnections::ReplicaStatePtr & replica, - Epoll & epoll, - std::unordered_map & timeout_fd_to_replica); - -/// Remove all timeouts from replica and epoll. -void removeTimeoutsFromReplica( - GetHedgedConnections::ReplicaStatePtr & replica, - Epoll & epoll, - std::unordered_map & timeout_fd_to_replica); - -} -#endif diff --git a/src/Client/HedgedConnections.cpp b/src/Client/HedgedConnections.cpp index f4810a7d79c..a6ffc3cbd1d 100644 --- a/src/Client/HedgedConnections.cpp +++ b/src/Client/HedgedConnections.cpp @@ -10,6 +10,7 @@ namespace ErrorCodes extern const int MISMATCH_REPLICAS_DATA_SOURCES; extern const int LOGICAL_ERROR; extern const int SOCKET_TIMEOUT; + extern const int ALL_CONNECTION_TRIES_FAILED; } HedgedConnections::HedgedConnections( @@ -19,29 +20,35 @@ HedgedConnections::HedgedConnections( const ThrottlerPtr & throttler_, PoolMode pool_mode, std::shared_ptr table_to_check_) - : get_hedged_connections(pool_, &settings_, timeouts_, table_to_check_), settings(settings_), throttler(throttler_), log(&Poco::Logger::get("HedgedConnections")) + : hedged_connections_factory(pool_, &settings_, timeouts_, table_to_check_) + , settings(settings_) + , throttler(throttler_) + , log(&Poco::Logger::get("HedgedConnections")) { - std::vector replicas_states = get_hedged_connections.getManyConnections(pool_mode); + std::vector connections = hedged_connections_factory.getManyConnections(pool_mode); - for (size_t i = 0; i != replicas_states.size(); ++i) + ReplicaState replica; + for (size_t i = 0; i != connections.size(); ++i) { - replicas_states[i]->parallel_replica_offset = i; - replicas_states[i]->connection->setThrottler(throttler_); - epoll.add(replicas_states[i]->fd); - fd_to_replica[replicas_states[i]->fd] = replicas_states[i]; - replicas.push_back({std::move(replicas_states[i])}); - active_connections_count_by_offset[i] = 1; + replica.connection = connections[i]; + replica.connection->setThrottler(throttler_); + int socket_fd = replica.connection->getSocket()->impl()->sockfd(); + epoll.add(socket_fd); + fd_to_replica_location[socket_fd] = ReplicaLocation{i, 0}; + offset_states.push_back(OffsetState{{replica}, 1, false}); } - pipeline_for_new_replicas.add([throttler_](ReplicaStatePtr & replica_){ replica_->connection->setThrottler(throttler_); }); + active_connection_count = connections.size(); + offsets_with_received_first_data_packet = 0; + pipeline_for_new_replicas.add([throttler_](ReplicaState & replica_) { replica_.connection->setThrottler(throttler_); }); } -void HedgedConnections::Pipeline::add(std::function send_function) +void HedgedConnections::Pipeline::add(std::function send_function) { pipeline.push_back(send_function); } -void HedgedConnections::Pipeline::run(ReplicaStatePtr & replica) +void HedgedConnections::Pipeline::run(ReplicaState & replica) { for (auto & send_func : pipeline) send_func(replica); @@ -54,11 +61,11 @@ void HedgedConnections::sendScalarsData(Scalars & data) if (!sent_query) throw Exception("Cannot send scalars data: query not yet sent.", ErrorCodes::LOGICAL_ERROR); - auto send_scalars_data = [&data](ReplicaStatePtr & replica) { replica->connection->sendScalarsData(data); }; + auto send_scalars_data = [&data](ReplicaState & replica) { replica.connection->sendScalarsData(data); }; - for (auto & replicas_with_same_offset : replicas) - for (auto & replica : replicas_with_same_offset) - if (replica->isReady()) + for (auto & offset_state : offset_states) + for (auto & replica : offset_state.replicas) + if (replica.connection) send_scalars_data(replica); pipeline_for_new_replicas.add(send_scalars_data); @@ -74,11 +81,11 @@ void HedgedConnections::sendExternalTablesData(std::vector & if (data.size() != size()) throw Exception("Mismatch between replicas and data sources", ErrorCodes::MISMATCH_REPLICAS_DATA_SOURCES); - auto send_external_tables_data = [&data](ReplicaStatePtr & replica) { replica->connection->sendExternalTablesData(data[0]); }; + auto send_external_tables_data = [&data](ReplicaState & replica) { replica.connection->sendExternalTablesData(data[0]); }; - for (auto & replicas_with_same_offset : replicas) - for (auto & replica : replicas_with_same_offset) - if (replica->isReady()) + for (auto & offset_state : offset_states) + for (auto & replica : offset_state.replicas) + if (replica.connection) send_external_tables_data(replica); pipeline_for_new_replicas.add(send_external_tables_data); @@ -97,11 +104,11 @@ void HedgedConnections::sendQuery( if (sent_query) throw Exception("Query already sent.", ErrorCodes::LOGICAL_ERROR); - for (auto & replicas_with_same_offset : replicas) + for (auto & offset_state : offset_states) { - for (auto & replica : replicas_with_same_offset) + for (auto & replica : offset_state.replicas) { - if (replica->connection->getServerRevision(timeouts) < DBMS_MIN_REVISION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD) + if (replica.connection->getServerRevision(timeouts) < DBMS_MIN_REVISION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD) { disable_two_level_aggregation = true; break; @@ -111,30 +118,29 @@ void HedgedConnections::sendQuery( break; } - auto send_query = [this, timeouts, query, query_id, stage, client_info, with_pending_data](ReplicaStatePtr & replica) - { - Settings modified_settings = this->settings; + auto send_query = [this, timeouts, query, query_id, stage, client_info, with_pending_data](ReplicaState & replica) { + Settings modified_settings = settings; - if (this->disable_two_level_aggregation) + if (disable_two_level_aggregation) { /// Disable two-level aggregation due to version incompatibility. modified_settings.group_by_two_level_threshold = 0; modified_settings.group_by_two_level_threshold_bytes = 0; } - if (this->replicas.size() > 1) + if (offset_states.size() > 1) { - modified_settings.parallel_replicas_count = this->replicas.size(); - modified_settings.parallel_replica_offset = replica->parallel_replica_offset; + modified_settings.parallel_replicas_count = offset_states.size(); + modified_settings.parallel_replica_offset = fd_to_replica_location[replica.connection->getSocket()->impl()->sockfd()].offset; } - replica->connection->sendQuery(timeouts, query, query_id, stage, &modified_settings, &client_info, with_pending_data); - addTimeoutToReplica(TimerTypes::RECEIVE_TIMEOUT, replica, this->epoll, this->timeout_fd_to_replica, timeouts); - addTimeoutToReplica(TimerTypes::RECEIVE_DATA_TIMEOUT, replica, this->epoll, this->timeout_fd_to_replica, timeouts); + replica.connection->sendQuery(timeouts, query, query_id, stage, &modified_settings, &client_info, with_pending_data); + addTimeoutToReplica(ConnectionTimeoutType::RECEIVE_TIMEOUT, replica); + addTimeoutToReplica(ConnectionTimeoutType::RECEIVE_DATA_TIMEOUT, replica); }; - for (auto & replicas_with_same_offset : replicas) - for (auto & replica : replicas_with_same_offset) + for (auto & offset_status : offset_states) + for (auto & replica : offset_status.replicas) send_query(replica); pipeline_for_new_replicas.add(send_query); @@ -145,16 +151,20 @@ void HedgedConnections::disconnect() { std::lock_guard lock(cancel_mutex); - for (auto & replicas_with_same_offset : replicas) - for (auto & replica : replicas_with_same_offset) - if (replica->isReady()) + for (auto & offset_status : offset_states) + for (auto & replica : offset_status.replicas) + if (replica.connection) finishProcessReplica(replica, true); - if (get_hedged_connections.hasEventsInProcess()) + if (hedged_connections_factory.hasEventsInProcess()) { - get_hedged_connections.stopChoosingReplicas(); if (next_replica_in_process) - epoll.remove(get_hedged_connections.getFileDescriptor()); + { + epoll.remove(hedged_connections_factory.getFileDescriptor()); + next_replica_in_process = false; + } + + hedged_connections_factory.stopChoosingReplicas(); } } @@ -165,13 +175,13 @@ std::string HedgedConnections::dumpAddresses() const std::string addresses; bool is_first = true; - for (const auto & replicas_with_same_offset : replicas) + for (const auto & offset_state : offset_states) { - for (const auto & replica : replicas_with_same_offset) + for (const auto & replica : offset_state.replicas) { - if (replica->isReady()) + if (replica.connection) { - addresses += (is_first ? "" : "; ") + replica->connection->getDescription(); + addresses += (is_first ? "" : "; ") + replica.connection->getDescription(); is_first = false; } } @@ -187,15 +197,14 @@ void HedgedConnections::sendCancel() if (!sent_query || cancelled) throw Exception("Cannot cancel. Either no query sent or already cancelled.", ErrorCodes::LOGICAL_ERROR); - for (auto & replicas_with_same_offset : replicas) - for (auto & replica : replicas_with_same_offset) - if (replica->isReady()) - replica->connection->sendCancel(); + for (auto & offset_status : offset_states) + for (auto & replica : offset_status.replicas) + if (replica.connection) + replica.connection->sendCancel(); cancelled = true; } - Packet HedgedConnections::drain() { std::lock_guard lock(cancel_mutex); @@ -252,26 +261,24 @@ Packet HedgedConnections::receivePacketUnlocked(AsyncCallback async_callback) Packet HedgedConnections::receivePacketImpl(AsyncCallback async_callback) { int event_fd; - ReplicaStatePtr replica = nullptr; Packet packet; bool finish = false; while (!finish) { event_fd = getReadyFileDescriptor(async_callback); - if (fd_to_replica.find(event_fd) != fd_to_replica.end()) + if (fd_to_replica_location.contains(event_fd)) { - replica = fd_to_replica[event_fd]; - packet = receivePacketFromReplica(replica, async_callback); + packet = receivePacketFromReplica(fd_to_replica_location[event_fd], async_callback); finish = true; } - else if (timeout_fd_to_replica.find(event_fd) != timeout_fd_to_replica.end()) + else if (timeout_fd_to_replica_location.contains(event_fd)) { - replica = timeout_fd_to_replica[event_fd]; - processTimeoutEvent(replica, replica->active_timeouts[event_fd]); + ReplicaLocation location = timeout_fd_to_replica_location[event_fd]; + processTimeoutEvent(location, offset_states[location.offset].replicas[location.index].active_timeouts[event_fd]); } - else if (event_fd == get_hedged_connections.getFileDescriptor()) - tryGetNewReplica(); + else if (event_fd == hedged_connections_factory.getFileDescriptor()) + tryGetNewReplica(false); else throw Exception("Unknown event from epoll", ErrorCodes::LOGICAL_ERROR); } @@ -281,30 +288,34 @@ Packet HedgedConnections::receivePacketImpl(AsyncCallback async_callback) int HedgedConnections::getReadyFileDescriptor(AsyncCallback async_callback) { - for (auto & [fd, replica] : fd_to_replica) - if (replica->connection->hasReadPendingData()) - return replica->fd; + for (auto & [fd, location] : fd_to_replica_location) + { + ReplicaState & replica = offset_states[location.offset].replicas[location.index]; + if (replica.connection->hasReadPendingData()) + return replica.connection->getSocket()->impl()->sockfd(); + } - return epoll.getReady(std::move(async_callback)).data.fd; + return epoll.getReady(true, std::move(async_callback)).data.fd; } -Packet HedgedConnections::receivePacketFromReplica(ReplicaStatePtr & replica, AsyncCallback async_callback) +Packet HedgedConnections::receivePacketFromReplica(ReplicaLocation & replica_location, AsyncCallback async_callback) { - Packet packet = replica->connection->receivePacket(std::move(async_callback)); + ReplicaState & replica = offset_states[replica_location.offset].replicas[replica_location.index]; + removeTimeoutFromReplica(ConnectionTimeoutType::RECEIVE_TIMEOUT, replica); + Packet packet = replica.connection->receivePacket(std::move(async_callback)); switch (packet.type) { case Protocol::Server::Data: - removeTimeoutsFromReplica(replica, epoll, timeout_fd_to_replica); - processReceiveData(replica); - addTimeoutToReplica(TimerTypes::RECEIVE_TIMEOUT, replica, epoll, timeout_fd_to_replica, get_hedged_connections.getConnectionTimeouts()); + if (!offset_states[replica_location.offset].first_packet_of_data_received) + processReceivedFirstDataPacket(replica_location); + addTimeoutToReplica(ConnectionTimeoutType::RECEIVE_TIMEOUT, replica); break; case Protocol::Server::Progress: case Protocol::Server::ProfileInfo: case Protocol::Server::Totals: case Protocol::Server::Extremes: case Protocol::Server::Log: - removeTimeoutFromReplica(TimerTypes::RECEIVE_TIMEOUT, replica, epoll, timeout_fd_to_replica); - addTimeoutToReplica(TimerTypes::RECEIVE_TIMEOUT, replica, epoll, timeout_fd_to_replica, get_hedged_connections.getConnectionTimeouts()); + addTimeoutToReplica(ConnectionTimeoutType::RECEIVE_TIMEOUT, replica); break; case Protocol::Server::EndOfStream: @@ -320,96 +331,155 @@ Packet HedgedConnections::receivePacketFromReplica(ReplicaStatePtr & replica, As return packet; } -void HedgedConnections::processReceiveData(ReplicaStatePtr & replica) +void HedgedConnections::processReceivedFirstDataPacket(ReplicaLocation & replica_location) { /// When we receive first packet of data from replica, we stop working with replicas, that are /// responsible for the same offset. - offsets_with_received_data.insert(replica->parallel_replica_offset); + OffsetState & offset_state = offset_states[replica_location.offset]; + removeTimeoutFromReplica(ConnectionTimeoutType::RECEIVE_DATA_TIMEOUT, offset_state.replicas[replica_location.index]); + ++offsets_with_received_first_data_packet; + offset_state.first_packet_of_data_received = true; - for (auto & other_replica : replicas[replica->parallel_replica_offset]) + for (size_t i = 0; i != offset_state.replicas.size(); ++i) { - if (other_replica->isReady() && other_replica != replica) + if (i != replica_location.index && offset_state.replicas[i].connection) { - other_replica->connection->sendCancel(); - finishProcessReplica(other_replica, true); + offset_state.replicas[i].connection->sendCancel(); + finishProcessReplica(offset_state.replicas[i], true); } } /// If we received data from replicas with all offsets, we need to stop choosing new replicas. - if (get_hedged_connections.hasEventsInProcess() && offsets_with_received_data.size() == replicas.size()) + if (hedged_connections_factory.hasEventsInProcess() && offsets_with_received_first_data_packet == offset_states.size()) { - get_hedged_connections.stopChoosingReplicas(); if (next_replica_in_process) - epoll.remove(get_hedged_connections.getFileDescriptor()); + { + epoll.remove(hedged_connections_factory.getFileDescriptor()); + next_replica_in_process = false; + } + hedged_connections_factory.stopChoosingReplicas(); } } -void HedgedConnections::processTimeoutEvent(ReplicaStatePtr & replica, TimerDescriptorPtr timeout_descriptor) +void HedgedConnections::processTimeoutEvent(ReplicaLocation & replica_location, ConnectionTimeoutDescriptorPtr timeout_descriptor) { - epoll.remove(timeout_descriptor->getDescriptor()); - replica->active_timeouts.erase(timeout_descriptor->getDescriptor()); - timeout_fd_to_replica.erase(timeout_descriptor->getDescriptor()); + ReplicaState & replica = offset_states[replica_location.offset].replicas[replica_location.index]; + epoll.remove(timeout_descriptor->timer.getDescriptor()); + replica.active_timeouts.erase(timeout_descriptor->timer.getDescriptor()); + timeout_fd_to_replica_location.erase(timeout_descriptor->timer.getDescriptor()); - if (timeout_descriptor->getType() == TimerTypes::RECEIVE_TIMEOUT) + if (timeout_descriptor->type == ConnectionTimeoutType::RECEIVE_TIMEOUT) { - size_t offset = replica->parallel_replica_offset; finishProcessReplica(replica, true); - /// Check if there is no active connections with the same offset. - if (active_connections_count_by_offset[offset] == 0) + /// Check if there is no active connections with the same offset and there is no new replica in process. + if (offset_states[replica_location.offset].active_connection_count == 0 && !next_replica_in_process) throw NetException("Receive timeout expired", ErrorCodes::SOCKET_TIMEOUT); } - else if (timeout_descriptor->getType() == TimerTypes::RECEIVE_DATA_TIMEOUT) + else if (timeout_descriptor->type == ConnectionTimeoutType::RECEIVE_DATA_TIMEOUT) { - offsets_queue.push(replica->parallel_replica_offset); - tryGetNewReplica(); + offsets_queue.push(replica_location.offset); + tryGetNewReplica(true); } } -void HedgedConnections::tryGetNewReplica() +void HedgedConnections::tryGetNewReplica(bool start_new_connection) { - ReplicaStatePtr new_replica = get_hedged_connections.getNextConnection(/*non_blocking*/ true); + Connection * connection = nullptr; + HedgedConnectionsFactory::State state = hedged_connections_factory.getNextConnection(start_new_connection, connection); /// Skip replicas that doesn't support two-level aggregation if we didn't disable it in sendQuery. - while (new_replica->isReady() && !disable_two_level_aggregation - && new_replica->connection->getServerRevision(get_hedged_connections.getConnectionTimeouts()) < DBMS_MIN_REVISION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD) - new_replica = get_hedged_connections.getNextConnection(/*non_blocking*/ true); + while (state == HedgedConnectionsFactory::State::READY && !disable_two_level_aggregation + && connection->getServerRevision(hedged_connections_factory.getConnectionTimeouts()) + < DBMS_MIN_REVISION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD) + state = hedged_connections_factory.getNextConnection(true, connection); - if (new_replica->isReady()) + if (state == HedgedConnectionsFactory::State::READY) { - new_replica->parallel_replica_offset = offsets_queue.front(); + size_t offset = offsets_queue.front(); offsets_queue.pop(); - replicas[new_replica->parallel_replica_offset].push_back(new_replica); - epoll.add(new_replica->fd); - fd_to_replica[new_replica->fd] = new_replica; - ++active_connections_count_by_offset[new_replica->parallel_replica_offset]; - pipeline_for_new_replicas.run(new_replica); + size_t index = offset_states[offset].replicas.size(); + + ReplicaState replica; + replica.connection = connection; + int socket_fd = replica.connection->getSocket()->impl()->sockfd(); + epoll.add(socket_fd); + fd_to_replica_location[socket_fd] = ReplicaLocation{offset, index}; + offset_states[offset].replicas.push_back(replica); + ++offset_states[offset].active_connection_count; + ++active_connection_count; + pipeline_for_new_replicas.run(replica); } - else if (new_replica->isNotReady() && !next_replica_in_process) + else if (state == HedgedConnectionsFactory::State::NOT_READY && !next_replica_in_process) { - epoll.add(get_hedged_connections.getFileDescriptor()); + epoll.add(hedged_connections_factory.getFileDescriptor()); next_replica_in_process = true; } - if (next_replica_in_process && (new_replica->isCannotChoose() || offsets_queue.empty())) + /// Check if we cannot get new replica and there is no active replica with needed offsets. + else if (state == HedgedConnectionsFactory::State::CANNOT_CHOOSE) { - epoll.remove(get_hedged_connections.getFileDescriptor()); + while (!offsets_queue.empty()) + { + if (offset_states[offsets_queue.front()].active_connection_count == 0) + throw Exception("Cannot find enough connections to replicas", ErrorCodes::ALL_CONNECTION_TRIES_FAILED); + offsets_queue.pop(); + } + } + + /// Check if we don't need to listen hedged_connections_factory file descriptor in epoll anymore. + if (next_replica_in_process && (state == HedgedConnectionsFactory::State::CANNOT_CHOOSE || offsets_queue.empty())) + { + epoll.remove(hedged_connections_factory.getFileDescriptor()); next_replica_in_process = false; } } -void HedgedConnections::finishProcessReplica(ReplicaStatePtr & replica, bool disconnect) +void HedgedConnections::finishProcessReplica(ReplicaState & replica, bool disconnect) { - removeTimeoutsFromReplica(replica, epoll, timeout_fd_to_replica); - epoll.remove(replica->fd); - fd_to_replica.erase(replica->fd); - --active_connections_count_by_offset[replica->parallel_replica_offset]; - if (active_connections_count_by_offset[replica->parallel_replica_offset] == 0) - active_connections_count_by_offset.erase(replica->parallel_replica_offset); + removeTimeoutsFromReplica(replica); + int socket_fd = replica.connection->getSocket()->impl()->sockfd(); + epoll.remove(socket_fd); + --offset_states[fd_to_replica_location[socket_fd].offset].active_connection_count; + fd_to_replica_location.erase(socket_fd); + --active_connection_count; if (disconnect) - replica->connection->disconnect(); - replica->reset(); + replica.connection->disconnect(); + replica.connection = nullptr; +} + +void HedgedConnections::addTimeoutToReplica(ConnectionTimeoutType type, ReplicaState & replica) +{ + ConnectionTimeoutDescriptorPtr timeout_descriptor + = createConnectionTimeoutDescriptor(type, hedged_connections_factory.getConnectionTimeouts()); + epoll.add(timeout_descriptor->timer.getDescriptor()); + timeout_fd_to_replica_location[timeout_descriptor->timer.getDescriptor()] + = fd_to_replica_location[replica.connection->getSocket()->impl()->sockfd()]; + replica.active_timeouts[timeout_descriptor->timer.getDescriptor()] = std::move(timeout_descriptor); +} + +void HedgedConnections::removeTimeoutsFromReplica(ReplicaState & replica) +{ + for (auto & [fd, _] : replica.active_timeouts) + { + epoll.remove(fd); + timeout_fd_to_replica_location.erase(fd); + } + replica.active_timeouts.clear(); +} + +void HedgedConnections::removeTimeoutFromReplica(ConnectionTimeoutType type, ReplicaState & replica) +{ + auto it = std::find_if( + replica.active_timeouts.begin(), replica.active_timeouts.end(), [type](auto & value) { return value.second->type == type; }); + + if (it != replica.active_timeouts.end()) + { + epoll.remove(it->first); + timeout_fd_to_replica_location.erase(it->first); + replica.active_timeouts.erase(it); + } } } diff --git a/src/Client/HedgedConnections.h b/src/Client/HedgedConnections.h index 8081fa6739d..6931db9ede6 100644 --- a/src/Client/HedgedConnections.h +++ b/src/Client/HedgedConnections.h @@ -1,18 +1,41 @@ #pragma once #if defined(OS_LINUX) -#include -#include #include #include +#include +#include namespace DB { +/** To receive data from multiple replicas (connections) from one shard asynchronously, + * The principe of Hedged Connections is used to reduce tail latency: + * (if we don't receive data from replica for a long time, we try to get new replica + * and send query to it, without cancelling working with previous replica). This class + * supports all functionality that MultipleConnections has. + */ class HedgedConnections : public IConnections { public: - using ReplicaStatePtr = GetHedgedConnections::ReplicaStatePtr; + struct ReplicaState + { + Connection * connection = nullptr; + std::unordered_map active_timeouts; + }; + + struct ReplicaLocation + { + size_t offset; + size_t index; + }; + + struct OffsetState + { + std::vector replicas; + size_t active_connection_count; + bool first_packet_of_data_received; + }; HedgedConnections(const ConnectionPoolWithFailoverPtr & pool_, const Settings & settings_, @@ -45,57 +68,67 @@ public: std::string dumpAddresses() const override; - size_t size() const override { return replicas.size(); } + size_t size() const override { return offset_states.size(); } - bool hasActiveConnections() const override { return !active_connections_count_by_offset.empty(); } + bool hasActiveConnections() const override { return active_connection_count > 0; } private: /// We will save actions with replicas in pipeline to perform them on the new replicas. class Pipeline { public: - void add(std::function send_function); + void add(std::function send_function); - void run(ReplicaStatePtr & replica); + void run(ReplicaState & replica); private: - std::vector> pipeline; + std::vector> pipeline; }; - Packet receivePacketFromReplica(ReplicaStatePtr & replica, AsyncCallback async_callback = {}); + Packet receivePacketFromReplica(ReplicaLocation & replica_location, AsyncCallback async_callback = {}); Packet receivePacketImpl(AsyncCallback async_callback = {}); - void processReceiveData(ReplicaStatePtr & replica); + void processReceivedFirstDataPacket(ReplicaLocation & replica_location); - void processTimeoutEvent(ReplicaStatePtr & replica, TimerDescriptorPtr timeout_descriptor); + void processTimeoutEvent(ReplicaLocation & replica_location, ConnectionTimeoutDescriptorPtr timeout_descriptor); - void tryGetNewReplica(); + void tryGetNewReplica(bool start_new_connection); - void finishProcessReplica(ReplicaStatePtr & replica, bool disconnect); + void finishProcessReplica(ReplicaState & replica, bool disconnect); int getReadyFileDescriptor(AsyncCallback async_callback = {}); - GetHedgedConnections get_hedged_connections; + void addTimeoutToReplica(ConnectionTimeoutType type, ReplicaState & replica); - /// All replicas in replicas[offset] are responsible for process query + void removeTimeoutsFromReplica(ReplicaState & replica); + + void removeTimeoutFromReplica(ConnectionTimeoutType type, ReplicaState & replica); + + + HedgedConnectionsFactory hedged_connections_factory; + + /// All replicas in offset_states[offset] is responsible for process query /// with setting parallel_replica_offset = offset. In common situations - /// replicas[offset].size() = 1 (like in MultiplexedConnections). - std::vector> replicas; + /// replica_states[offset].replicas.size() = 1 (like in MultiplexedConnections). + std::vector offset_states; - /// Map socket file descriptor to replica. - std::unordered_map fd_to_replica; - /// Map timeout file descriptor to replica. - std::unordered_map timeout_fd_to_replica; + /// Map socket file descriptor to replica location (it's offset and index in OffsetState.replicas). + std::unordered_map fd_to_replica_location; + /// Map timeout file descriptor to replica location (it's offset and index in OffsetState.replicas). + std::unordered_map timeout_fd_to_replica_location; /// A queue of offsets for new replicas. When we get RECEIVE_DATA_TIMEOUT from /// the replica, we push it's offset to this queue and start trying to get /// new replica. std::queue offsets_queue; - /// Map offset to amount of active connections, responsible to this offset. - std::unordered_map active_connections_count_by_offset; + /// The current number of valid connections to the replicas of this shard. + size_t active_connection_count; - std::unordered_set offsets_with_received_data; + /// We count offsets which received first packet of data, + /// it's needed to cancel choosing new replicas when all offsets + /// received their first packet of data. + size_t offsets_with_received_first_data_packet; Pipeline pipeline_for_new_replicas; @@ -103,8 +136,8 @@ private: /// If we didn't disabled it, we need to skip this replica. bool disable_two_level_aggregation = false; - /// next_replica_in_process is true when get_hedged_connections.getFileDescriptor() - /// is in epoll now and false otherwise. + /// This flag means we need to get connection with new replica, but no replica is ready. + /// When it's true, hedged_connections_factory.getFileDescriptor() is in epoll. bool next_replica_in_process = false; Epoll epoll; diff --git a/src/Client/HedgedConnectionsFactory.cpp b/src/Client/HedgedConnectionsFactory.cpp new file mode 100644 index 00000000000..22666642b4e --- /dev/null +++ b/src/Client/HedgedConnectionsFactory.cpp @@ -0,0 +1,475 @@ +#if defined(OS_LINUX) + +#include +#include + + +namespace DB +{ +namespace ErrorCodes +{ + extern const int BAD_ARGUMENTS; + extern const int LOGICAL_ERROR; + extern const int ALL_CONNECTION_TRIES_FAILED; + extern const int ALL_REPLICAS_ARE_STALE; +} + +HedgedConnectionsFactory::HedgedConnectionsFactory( + const ConnectionPoolWithFailoverPtr & pool_, + const Settings * settings_, + const ConnectionTimeouts & timeouts_, + std::shared_ptr table_to_check_) + : pool(pool_), settings(settings_), timeouts(timeouts_), table_to_check(table_to_check_), log(&Poco::Logger::get("HedgedConnectionsFactory")) +{ + shuffled_pools = pool->getShuffledPools(settings); + for (size_t i = 0; i != shuffled_pools.size(); ++i) + connection_establishers.emplace_back(shuffled_pools[i].pool, &timeouts, settings, table_to_check.get(), log); + + max_tries + = (settings ? size_t{settings->connections_with_failover_max_tries} : size_t{DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES}); + + fallback_to_stale_replicas = settings && settings->fallback_to_stale_replicas_for_distributed_queries; + entries_count = 0; + usable_count = 0; + failed_pools_count = 0; +} + +HedgedConnectionsFactory::~HedgedConnectionsFactory() +{ + pool->updateSharedError(shuffled_pools); +} + +std::vector HedgedConnectionsFactory::getManyConnections(PoolMode pool_mode) +{ + size_t min_entries = (settings && settings->skip_unavailable_shards) ? 0 : 1; + + size_t max_entries; + if (pool_mode == PoolMode::GET_ALL) + { + min_entries = shuffled_pools.size(); + max_entries = shuffled_pools.size(); + } + else if (pool_mode == PoolMode::GET_ONE) + max_entries = 1; + else if (pool_mode == PoolMode::GET_MANY) + max_entries = settings ? size_t(settings->max_parallel_replicas) : 1; + else + throw DB::Exception("Unknown pool allocation mode", DB::ErrorCodes::LOGICAL_ERROR); + + std::vector connections; + connections.reserve(max_entries); + + /// Try to start establishing connections with max_entries replicas. + int index; + for (size_t i = 0; i != max_entries; ++i) + { + index = getNextIndex(); + if (index == -1) + break; + + ReplicaStatePtr replica = startEstablishingConnection(index); + if (replica->state == State::READY) + connections.push_back(replica->connection); + } + + /// Process connections until we get enough READY connections + /// (work asynchronously with all connections we started). + Connection * connection = nullptr; + while (connections.size() < max_entries) + { + auto state = processConnections(true, connection); + if (state == State::READY) + connections.push_back(connection); + else if (state == State::CANNOT_CHOOSE) + { + if (connections.size() >= min_entries) + break; + + /// Determine the reason of not enough replicas. + if (!fallback_to_stale_replicas && usable_count >= min_entries) + throw DB::Exception( + "Could not find enough connections to up-to-date replicas. Got: " + std::to_string(connections.size()) + + ", needed: " + std::to_string(min_entries), + DB::ErrorCodes::ALL_REPLICAS_ARE_STALE); + + throw DB::NetException( + "All connection tries failed. Log: \n\n" + fail_messages + "\n", + DB::ErrorCodes::ALL_CONNECTION_TRIES_FAILED); + } + } + + return connections; +} + +HedgedConnectionsFactory::State HedgedConnectionsFactory::getNextConnection(bool start_new_connection, Connection *& connection_out) +{ + if (start_new_connection) + { + /// Try to start establishing connection to the new replica. + int index = getNextIndex(); + if (index != -1) + { + ReplicaStatePtr replica = startEstablishingConnection(index); + if (replica->state == State::READY) + { + connection_out = replica->connection; + return State::READY; + } + } + } + + return processConnections(false, connection_out); +} + +HedgedConnectionsFactory::State HedgedConnectionsFactory::processConnections(bool blocking, Connection *& connection_out) +{ + ReplicaStatePtr replica = nullptr; + int index = -1; + + while (index != -1 || !epoll.empty()) + { + if (index != -1) + { + replica = startEstablishingConnection(index); + if (replica->state == State::READY) + { + connection_out = replica->connection; + return State::READY; + } + } + + if (!processEpollEvents(replica, blocking)) + return State::NOT_READY; + + if (replica->state == State::READY) + { + connection_out = replica->connection; + return State::READY; + } + + index = getNextIndex(); + } + + /// We reach this point only if there was no free up to date replica. + /// We will try to use usable replica. + + /// Check if we are not allowed to use usable replicas or there is no even a free usable replica. + if (!fallback_to_stale_replicas || !canGetNewConnection()) + return State::CANNOT_CHOOSE; + + setBestUsableReplica(replica); + connection_out = replica->connection; + return replica->state; +} + +void HedgedConnectionsFactory::stopChoosingReplicas() +{ + for (auto & [fd, replica] : fd_to_replica) + { + removeTimeoutsFromReplica(replica); + epoll.remove(fd); + connection_establishers[replica->index].reset(); + replica->reset(); + } + + fd_to_replica.clear(); +} + +int HedgedConnectionsFactory::getNextIndex() +{ + /// Check if there is no free replica. + if (entries_count + indexes_in_process.size() + failed_pools_count >= shuffled_pools.size()) + return -1; + + /// Check if it's the first time. + if (last_used_index == -1) + { + last_used_index = 0; + return 0; + } + + bool finish = false; + int next_index = last_used_index; + while (!finish) + { + next_index = (next_index + 1) % shuffled_pools.size(); + + /// Check if we can try this replica. + if (indexes_in_process.find(next_index) == indexes_in_process.end() && (max_tries == 0 || shuffled_pools[next_index].error_count < max_tries) + && connection_establishers[next_index].stage != ConnectionEstablisher::Stage::FINISHED) + finish = true; + + /// If we made a complete round, there is no replica to connect. + else if (next_index == last_used_index) + return -1; + } + + last_used_index = next_index; + return next_index; +} + +HedgedConnectionsFactory::ReplicaStatePtr HedgedConnectionsFactory::startEstablishingConnection(int index) +{ + ReplicaStatePtr replica = createNewReplica(); + + do + { + ConnectionEstablisher & connection_establisher = connection_establishers[index]; + + replica->state = State::NOT_READY; + replica->index = index; + indexes_in_process.insert(index); + + connection_establisher.reset(); + connection_establisher.run(); + + if (connection_establisher.stage != ConnectionEstablisher::Stage::FAILED) + replica->connection = &*connection_establisher.result.entry; + + processConnectionEstablisherStage(replica); + + if (replica->state == State::NOT_READY) + { + epoll.add(connection_establisher.socket_fd); + fd_to_replica[connection_establisher.socket_fd] = replica; + connection_establisher.setActionBeforeDisconnect([&](int fd) { + epoll.remove(fd); + fd_to_replica.erase(fd); + }); + addTimeouts(replica); + } + } + while (replica->state == State::EMPTY && (index = getNextIndex()) != -1); + + return replica; +} + +void HedgedConnectionsFactory::processConnectionEstablisherStage(ReplicaStatePtr & replica, bool remove_from_epoll) +{ + ConnectionEstablisher & connection_establisher = connection_establishers[replica->index]; + + if (connection_establisher.stage == ConnectionEstablisher::Stage::FINISHED) + { + indexes_in_process.erase(replica->index); + ++entries_count; + + if (remove_from_epoll) + { + epoll.remove(connection_establisher.socket_fd); + fd_to_replica.erase(connection_establisher.socket_fd); + } + + if (connection_establisher.result.is_usable) + { + ++usable_count; + if (connection_establisher.result.is_up_to_date) + { + replica->state = State::READY; + ready_indexes.insert(replica->index); + return; + } + } + + /// This replica is not up to date, we will try to find up to date. + replica->reset(); + } + else if (connection_establisher.stage == ConnectionEstablisher::Stage::FAILED) + processFailedConnection(replica); +} + +void HedgedConnectionsFactory::processFailedConnection(ReplicaStatePtr & replica) +{ + ShuffledPool & shuffled_pool = shuffled_pools[replica->index]; + LOG_WARNING( + log, "Connection failed at try №{}, reason: {}", (shuffled_pool.error_count + 1), connection_establishers[replica->index].fail_message); + ProfileEvents::increment(ProfileEvents::DistributedConnectionFailTry); + + shuffled_pool.error_count = std::min(pool->getMaxErrorCup(), shuffled_pool.error_count + 1); + + if (shuffled_pool.error_count >= max_tries) + { + ++failed_pools_count; + ProfileEvents::increment(ProfileEvents::DistributedConnectionFailAtAll); + } + + std::string & fail_message = connection_establishers[replica->index].fail_message; + if (!fail_message.empty()) + fail_messages += fail_message + "\n"; + + indexes_in_process.erase(replica->index); + replica->reset(); +} + +void HedgedConnectionsFactory::addTimeouts(ReplicaStatePtr & replica) +{ + addTimeoutToReplica(ConnectionTimeoutType::RECEIVE_TIMEOUT, replica); + + auto stage = connection_establishers[replica->index].stage; + if (stage == ConnectionEstablisher::Stage::RECEIVE_HELLO) + addTimeoutToReplica(ConnectionTimeoutType::RECEIVE_HELLO_TIMEOUT, replica); + else if (stage == ConnectionEstablisher::Stage::RECEIVE_TABLES_STATUS) + addTimeoutToReplica(ConnectionTimeoutType::RECEIVE_TABLES_STATUS_TIMEOUT, replica); +} + +void HedgedConnectionsFactory::addTimeoutToReplica(ConnectionTimeoutType type, ReplicaStatePtr & replica) +{ + ConnectionTimeoutDescriptorPtr timeout_descriptor = createConnectionTimeoutDescriptor(type, timeouts); + epoll.add(timeout_descriptor->timer.getDescriptor()); + timeout_fd_to_replica[timeout_descriptor->timer.getDescriptor()] = replica; + replica->active_timeouts[timeout_descriptor->timer.getDescriptor()] = std::move(timeout_descriptor); +} + +void HedgedConnectionsFactory::removeTimeoutsFromReplica(ReplicaStatePtr & replica) +{ + for (auto & [fd, _] : replica->active_timeouts) + { + epoll.remove(fd); + timeout_fd_to_replica.erase(fd); + } + replica->active_timeouts.clear(); +} + +bool HedgedConnectionsFactory::processEpollEvents(ReplicaStatePtr & replica, bool blocking) +{ + int event_fd; + bool finish = false; + while (!finish) + { + event_fd = getReadyFileDescriptor(blocking); + + /// Check if there is no events. + if (event_fd == -1) + return false; + + if (fd_to_replica.find(event_fd) != fd_to_replica.end()) + { + replica = fd_to_replica[event_fd]; + processReplicaEvent(replica); + /// Check if replica is ready or we need to try next replica. + if (replica->state == State::READY || replica->state == State::EMPTY) + finish = true; + } + else if (timeout_fd_to_replica.find(event_fd) != timeout_fd_to_replica.end()) + { + replica = timeout_fd_to_replica[event_fd]; + processTimeoutEvent(replica, replica->active_timeouts[event_fd]); + /// Check if we need to try next replica. + if (replica->state == State::EMPTY) + finish = true; + } + else + throw Exception("Unknown event from epoll", ErrorCodes::LOGICAL_ERROR); + } + + return true; +} + +int HedgedConnectionsFactory::getReadyFileDescriptor(bool blocking) +{ + for (auto & [fd, replica] : fd_to_replica) + if (replica->connection->hasReadPendingData()) + return replica->connection->getSocket()->impl()->sockfd(); + + return epoll.getReady(/* blocking */blocking).data.fd; +} + +void HedgedConnectionsFactory::processReplicaEvent(ReplicaStatePtr & replica) +{ + removeTimeoutsFromReplica(replica); + connection_establishers[replica->index].run(); + processConnectionEstablisherStage(replica, true); + if (replica->state == State::NOT_READY) + addTimeouts(replica); +} + +void HedgedConnectionsFactory::processTimeoutEvent(ReplicaStatePtr & replica, ConnectionTimeoutDescriptorPtr timeout_descriptor) +{ + epoll.remove(timeout_descriptor->timer.getDescriptor()); + replica->active_timeouts.erase(timeout_descriptor->timer.getDescriptor()); + timeout_fd_to_replica[timeout_descriptor->timer.getDescriptor()]; + + if (timeout_descriptor->type == ConnectionTimeoutType::RECEIVE_TIMEOUT) + { + removeTimeoutsFromReplica(replica); + int fd = replica->connection->getSocket()->impl()->sockfd(); + epoll.remove(fd); + fd_to_replica.erase(fd); + + ConnectionEstablisher & connection_establisher = connection_establishers[replica->index]; + connection_establisher.fail_message = "Receive timeout expired (" + connection_establisher.result.entry->getDescription() + ")"; + connection_establisher.resetResult(); + connection_establisher.stage = ConnectionEstablisher::Stage::FAILED; + processFailedConnection(replica); + } + else if ((timeout_descriptor->type == ConnectionTimeoutType::RECEIVE_HELLO_TIMEOUT + || timeout_descriptor->type == ConnectionTimeoutType::RECEIVE_TABLES_STATUS_TIMEOUT) + && entries_count + indexes_in_process.size() + failed_pools_count < shuffled_pools.size()) + replica = createNewReplica(); +} + +void HedgedConnectionsFactory::setBestUsableReplica(ReplicaStatePtr & replica) +{ + std::vector indexes(connection_establishers.size()); + for (size_t i = 0; i != indexes.size(); ++i) + indexes[i] = i; + + /// Remove unusable, failed replicas and replicas that are ready or in process. + indexes.erase( + std::remove_if( + indexes.begin(), + indexes.end(), + [&](int i) + { + return connection_establishers[i].result.entry.isNull() || !connection_establishers[i].result.is_usable || + indexes_in_process.find(i) != indexes_in_process.end() || ready_indexes.find(i) != ready_indexes.end(); + }), + indexes.end()); + + if (indexes.empty()) + { + replica->state = State::CANNOT_CHOOSE; + return; + } + + /// Sort replicas by staleness. + std::stable_sort( + indexes.begin(), + indexes.end(), + [&](size_t lhs, size_t rhs) + { + return connection_establishers[lhs].result.staleness < connection_establishers[rhs].result.staleness; + }); + + replica->index = indexes[0]; + replica->connection = &*connection_establishers[indexes[0]].result.entry; + replica->state = State::READY; + ready_indexes.insert(replica->index); +} + +ConnectionTimeoutDescriptorPtr createConnectionTimeoutDescriptor(ConnectionTimeoutType type, const ConnectionTimeouts & timeouts) +{ + Poco::Timespan timeout; + switch (type) + { + case ConnectionTimeoutType::RECEIVE_HELLO_TIMEOUT: + timeout = timeouts.receive_hello_timeout; + break; + case ConnectionTimeoutType::RECEIVE_TABLES_STATUS_TIMEOUT: + timeout = timeouts.receive_tables_status_timeout; + break; + case ConnectionTimeoutType::RECEIVE_DATA_TIMEOUT: + timeout = timeouts.receive_data_timeout; + break; + case ConnectionTimeoutType::RECEIVE_TIMEOUT: + timeout = timeouts.receive_timeout; + break; + } + + ConnectionTimeoutDescriptorPtr timeout_descriptor = std::make_shared(); + timeout_descriptor->type = type; + timeout_descriptor->timer.setRelative(timeout); + return timeout_descriptor; +} + +} +#endif diff --git a/src/Client/HedgedConnectionsFactory.h b/src/Client/HedgedConnectionsFactory.h new file mode 100644 index 00000000000..d1dc262d39c --- /dev/null +++ b/src/Client/HedgedConnectionsFactory.h @@ -0,0 +1,167 @@ +#pragma once + +#if defined(OS_LINUX) + +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +enum class ConnectionTimeoutType +{ + RECEIVE_HELLO_TIMEOUT, + RECEIVE_TABLES_STATUS_TIMEOUT, + RECEIVE_DATA_TIMEOUT, + RECEIVE_TIMEOUT, +}; + +struct ConnectionTimeoutDescriptor +{ + ConnectionTimeoutType type; + TimerDescriptor timer; +}; + +using ConnectionTimeoutDescriptorPtr = std::shared_ptr; +using TimerDescriptorPtr = std::shared_ptr; + +/** Class for establishing hedged connections with replicas. + * The process of establishing connection is divided on stages, on each stage if + * replica doesn't respond for a long time, we start establishing connection with + * the next replica, without cancelling working with previous one. + * It works with multiple replicas simultaneously without blocking by using epoll. + */ +class HedgedConnectionsFactory +{ +public: + using ShuffledPool = ConnectionPoolWithFailover::Base::ShuffledPool; + + enum class State + { + EMPTY = 0, + READY = 1, + NOT_READY = 2, + CANNOT_CHOOSE = 3, + }; + + struct ReplicaState + { + Connection * connection = nullptr; + size_t index = -1; + State state = State::EMPTY; + std::unordered_map active_timeouts; + + void reset() + { + connection = nullptr; + index = -1; + state = State::EMPTY; + active_timeouts.clear(); + } + }; + + using ReplicaStatePtr = std::shared_ptr; + + HedgedConnectionsFactory(const ConnectionPoolWithFailoverPtr & pool_, + const Settings * settings_, + const ConnectionTimeouts & timeouts_, + std::shared_ptr table_to_check_ = nullptr); + + /// Create and return active connections according to pool_mode. + std::vector getManyConnections(PoolMode pool_mode); + + /// Try to get connection to the new replica without blocking. If start_new_connection is true, we start establishing connection + /// with the new replica and then call processConnections, otherwise just call processConnections. + State getNextConnection(bool start_new_connection, Connection *& connection_out); + + /// Process all current events in epoll (connections, timeouts), if there is no events in epoll and blocking is false, + /// return NOT_READY. Returned state might be READY, NOT_READY and CANNOT_CHOOSE. + /// If state is READY, replica connection will be written in connection_out. + State processConnections(bool blocking, Connection *& connection_out); + + /// Check if we can try to produce new READY replica. + bool canGetNewConnection() const { return ready_indexes.size() + failed_pools_count < shuffled_pools.size(); } + + /// Stop working with all replicas that are not READY. + void stopChoosingReplicas(); + + bool hasEventsInProcess() const { return epoll.size() > 0; } + + int getFileDescriptor() const { return epoll.getFileDescriptor(); } + + const ConnectionTimeouts & getConnectionTimeouts() const { return timeouts; } + + ~HedgedConnectionsFactory(); + +private: + ReplicaStatePtr startEstablishingConnection(int index); + + void processConnectionEstablisherStage(ReplicaStatePtr & replica, bool remove_from_epoll = false); + + /// Find an index of the next free replica to start connection. + /// Return -1 if there is no free replica. + int getNextIndex(); + + int getReadyFileDescriptor(bool blocking); + + void addTimeouts(ReplicaStatePtr & replica); + + void addTimeoutToReplica(ConnectionTimeoutType type, ReplicaStatePtr & replica); + + void removeTimeoutsFromReplica(ReplicaStatePtr & replica); + + void processFailedConnection(ReplicaStatePtr & replica); + + void processReceiveTimeout(ReplicaStatePtr & replica); + + void processReplicaEvent(ReplicaStatePtr & replica); + + void processTimeoutEvent(ReplicaStatePtr & replica, ConnectionTimeoutDescriptorPtr timeout_descriptor); + + /// Return false if there is no ready events, return true if replica is ready + /// or we need to try next replica. + bool processEpollEvents(ReplicaStatePtr & replica, bool blocking); + + void setBestUsableReplica(ReplicaStatePtr & replica); + + ReplicaStatePtr createNewReplica() { return std::make_shared(); } + + const ConnectionPoolWithFailoverPtr pool; + const Settings * settings; + const ConnectionTimeouts timeouts; + std::shared_ptr table_to_check; + + std::vector connection_establishers; + std::vector shuffled_pools; + std::vector replica_states; + + /// Map socket file descriptor to replica. + std::unordered_map fd_to_replica; + /// Map timeout file descriptor to replica. + std::unordered_map timeout_fd_to_replica; + + /// Indexes of replicas, that are in process of connection. + std::unordered_set indexes_in_process; + /// Indexes of ready replicas. + std::unordered_set ready_indexes; + + int last_used_index = -1; + bool fallback_to_stale_replicas; + Epoll epoll; + Poco::Logger * log; + std::string fail_messages; + size_t entries_count; + size_t usable_count; + size_t failed_pools_count; + size_t max_tries; +}; + +/// Create ConnectionTimeoutDescriptor with particular type. +ConnectionTimeoutDescriptorPtr createConnectionTimeoutDescriptor(ConnectionTimeoutType type, const ConnectionTimeouts & timeouts); + +} +#endif diff --git a/src/Client/ya.make b/src/Client/ya.make index 603e8290350..7a664f328f7 100644 --- a/src/Client/ya.make +++ b/src/Client/ya.make @@ -12,8 +12,8 @@ PEERDIR( SRCS( Connection.cpp ConnectionPoolWithFailover.cpp - GetHedgedConnections.cpp HedgedConnections.cpp + HedgedConnectionsFactory.cpp MultiplexedConnections.cpp TimeoutSetter.cpp diff --git a/src/Common/Epoll.cpp b/src/Common/Epoll.cpp index cb34f81cf36..bfd323b4f55 100644 --- a/src/Common/Epoll.cpp +++ b/src/Common/Epoll.cpp @@ -46,24 +46,24 @@ void Epoll::remove(int fd) --events_count; } -epoll_event Epoll::getReady(AsyncCallback async_callback) const +epoll_event Epoll::getReady(bool blocking, AsyncCallback async_callback) const { - std::vector events = getManyReady(1, true, std::move(async_callback)); - if (events.empty()) - throw Exception("Vector of ready events is empty", ErrorCodes::LOGICAL_ERROR); + epoll_event event; + event.data.fd = -1; + size_t ready_events_count = getManyReady(1, &event, blocking, std::move(async_callback)); + if (ready_events_count > 1) + throw Exception("Returned amount of events cannot be more than 1.", ErrorCodes::LOGICAL_ERROR); - return events[0]; + return event; } -std::vector Epoll::getManyReady(int max_events, bool blocking, AsyncCallback async_callback) const +size_t Epoll::getManyReady(int max_events, epoll_event * events_out, bool blocking, AsyncCallback async_callback) const { - std::vector events(max_events); - int ready_size = 0; int timeout = blocking && !async_callback ? -1 : 0; - while (ready_size <= 0 && (ready_size != 0 || blocking)) + do { - ready_size = epoll_wait(epoll_fd, events.data(), max_events, timeout); + ready_size = epoll_wait(epoll_fd, events_out, max_events, timeout); if (ready_size == -1 && errno != EINTR) throwFromErrno("Error in epoll_wait", DB::ErrorCodes::EPOLL_ERROR); @@ -71,9 +71,9 @@ std::vector Epoll::getManyReady(int max_events, bool blocking, Asyn if (ready_size == 0 && blocking && async_callback) async_callback(epoll_fd, 0, "epoll"); } + while (ready_size <= 0 && (ready_size != 0 || blocking)); - events.resize(ready_size); - return events; + return ready_size; } Epoll::~Epoll() diff --git a/src/Common/Epoll.h b/src/Common/Epoll.h index 1dc65d15d08..92638715aeb 100644 --- a/src/Common/Epoll.h +++ b/src/Common/Epoll.h @@ -16,20 +16,22 @@ class Epoll : boost::noncopyable public: Epoll(); - /// Add new file descriptor to epoll. + /// Add new file descriptor to epoll. If ptr set to nullptr, epoll_event.data.fd = fd, + /// otherwise epoll_event.data.ptr = ptr. void add(int fd, void * ptr = nullptr); /// Remove file descriptor to epoll. void remove(int fd); - /// Get events from epoll. If blocking is false and there are no ready events, + /// Get events from epoll. Events are written in events_out, this function returns an amount of ready events. + /// If blocking is false and there are no ready events, /// return empty vector, otherwise wait for ready events. If blocking is true, /// async_callback is given and there is no ready events, async_callback is called /// with epoll file descriptor. - std::vector getManyReady(int max_events, bool blocking, AsyncCallback async_callback = {}) const; + size_t getManyReady(int max_events, epoll_event * events_out, bool blocking, AsyncCallback async_callback = {}) const; - /// Get only one ready event, this function is always blocking. - epoll_event getReady(AsyncCallback async_callback = {}) const; + /// Get only one ready event, if blocking is false and there is no ready events, epoll_event.data.fd will be set to -1. + epoll_event getReady(bool blocking = true, AsyncCallback async_callback = {}) const; int getFileDescriptor() const { return epoll_fd; } diff --git a/src/Common/TimerDescriptor.h b/src/Common/TimerDescriptor.h index debf7cdc899..6f7003f6980 100644 --- a/src/Common/TimerDescriptor.h +++ b/src/Common/TimerDescriptor.h @@ -5,21 +5,11 @@ namespace DB { -enum TimerTypes -{ - DEFAULT, - RECEIVE_HELLO_TIMEOUT, - RECEIVE_TABLES_STATUS_TIMEOUT, - RECEIVE_DATA_TIMEOUT, - RECEIVE_TIMEOUT, -}; - /// Wrapper over timerfd. class TimerDescriptor { private: int timer_fd; - int type = TimerTypes::DEFAULT; public: explicit TimerDescriptor(int clockid = CLOCK_MONOTONIC, int flags = 0); @@ -31,12 +21,10 @@ public: TimerDescriptor & operator=(TimerDescriptor &&) = default; int getDescriptor() const { return timer_fd; } - int getType() const { return type; } void reset() const; void drain() const; void setRelative(const Poco::Timespan & timespan) const; - void setType(int type_) { type = type_; } }; } diff --git a/src/DataStreams/RemoteQueryExecutorReadContext.cpp b/src/DataStreams/RemoteQueryExecutorReadContext.cpp index c77b2d48f05..e02ac1fc1b3 100644 --- a/src/DataStreams/RemoteQueryExecutorReadContext.cpp +++ b/src/DataStreams/RemoteQueryExecutorReadContext.cpp @@ -121,19 +121,22 @@ bool RemoteQueryExecutorReadContext::checkTimeout() const bool RemoteQueryExecutorReadContext::checkTimeoutImpl() const { /// Wait for epoll will not block if it was polled externally. - std::vector events = epoll.getManyReady(epoll.size(), /* blocking = */ false); + epoll_event events[3]; + events[0].data.fd = events[1].data.fd = events[2].data.fd = -1; + + epoll.getManyReady(3, events,/* blocking = */ false); bool is_socket_ready = false; bool is_pipe_alarmed = false; bool has_timer_alarm = false; - for (const auto & event : events) + for (int i = 0; i < 3; ++i) { - if (event.data.fd == connection_fd) + if (events[i].data.fd == connection_fd) is_socket_ready = true; - if (event.data.fd == timer.getDescriptor()) + if (events[i].data.fd == timer.getDescriptor()) has_timer_alarm = true; - if (event.data.fd == pipe_fd[0]) + if (events[i].data.fd == pipe_fd[0]) is_pipe_alarmed = true; } @@ -198,7 +201,7 @@ void RemoteQueryExecutorReadContext::cancel() RemoteQueryExecutorReadContext::~RemoteQueryExecutorReadContext() { - /// connection_fd is closed by Poco::Net::Socket + /// connection_fd is closed by Poco::Net::Socket or Epoll if (pipe_fd[0] != -1) close(pipe_fd[0]); if (pipe_fd[1] != -1) diff --git a/src/IO/ConnectionTimeouts.h b/src/IO/ConnectionTimeouts.h index 01f31d6efa8..a92f75bf980 100644 --- a/src/IO/ConnectionTimeouts.h +++ b/src/IO/ConnectionTimeouts.h @@ -33,9 +33,9 @@ struct ConnectionTimeouts tcp_keep_alive_timeout(0), http_keep_alive_timeout(0), secure_connection_timeout(connection_timeout), - receive_hello_timeout(0), - receive_tables_status_timeout(0), - receive_data_timeout(0) + receive_hello_timeout(receive_timeout_), + receive_tables_status_timeout(receive_timeout_), + receive_data_timeout(receive_timeout_) { } @@ -49,9 +49,9 @@ struct ConnectionTimeouts tcp_keep_alive_timeout(tcp_keep_alive_timeout_), http_keep_alive_timeout(0), secure_connection_timeout(connection_timeout), - receive_hello_timeout(0), - receive_tables_status_timeout(0), - receive_data_timeout(0) + receive_hello_timeout(receive_timeout_), + receive_tables_status_timeout(receive_timeout_), + receive_data_timeout(receive_timeout_) { } ConnectionTimeouts(const Poco::Timespan & connection_timeout_, @@ -65,9 +65,9 @@ struct ConnectionTimeouts tcp_keep_alive_timeout(tcp_keep_alive_timeout_), http_keep_alive_timeout(http_keep_alive_timeout_), secure_connection_timeout(connection_timeout), - receive_hello_timeout(0), - receive_tables_status_timeout(0), - receive_data_timeout(0) + receive_hello_timeout(receive_timeout_), + receive_tables_status_timeout(receive_timeout_), + receive_data_timeout(receive_timeout_) { } diff --git a/src/IO/ReadBufferFromPocoSocket.cpp b/src/IO/ReadBufferFromPocoSocket.cpp index e08b9e7c8fb..1f9c732e644 100644 --- a/src/IO/ReadBufferFromPocoSocket.cpp +++ b/src/IO/ReadBufferFromPocoSocket.cpp @@ -14,7 +14,6 @@ namespace ProfileEvents namespace DB { - namespace ErrorCodes { extern const int NETWORK_ERROR; @@ -42,7 +41,7 @@ bool ReadBufferFromPocoSocket::nextImpl() /// Note that receive timeout is not checked here. External code should check it while polling. while (bytes_read < 0 && async_callback && errno == EAGAIN) { - async_callback(socket.impl()->sockfd(), socket.getReceiveTimeout(), "socket (" + socket.peerAddress().toString() + ")"); + async_callback(socket.impl()->sockfd(), socket.getReceiveTimeout(), socket_description); bytes_read = socket.impl()->receiveBytes(internal_buffer.begin(), internal_buffer.size(), flags); } } @@ -74,7 +73,10 @@ bool ReadBufferFromPocoSocket::nextImpl() } ReadBufferFromPocoSocket::ReadBufferFromPocoSocket(Poco::Net::Socket & socket_, size_t buf_size) - : BufferWithOwnMemory(buf_size), socket(socket_), peer_address(socket.peerAddress()) + : BufferWithOwnMemory(buf_size) + , socket(socket_) + , peer_address(socket.peerAddress()) + , socket_description("socket (" + peer_address.toString() + ")") { } diff --git a/src/IO/ReadBufferFromPocoSocket.h b/src/IO/ReadBufferFromPocoSocket.h index 7fd1b646846..73e83dfb5f9 100644 --- a/src/IO/ReadBufferFromPocoSocket.h +++ b/src/IO/ReadBufferFromPocoSocket.h @@ -34,6 +34,7 @@ public: private: AsyncCallback async_callback; + std::string socket_description; }; }