2021-02-06 00:54:27 +00:00
|
|
|
#pragma once
|
|
|
|
|
|
|
|
#if defined(OS_LINUX)
|
|
|
|
|
|
|
|
#include <Common/TimerDescriptor.h>
|
2021-02-17 17:34:52 +00:00
|
|
|
#include <Common/Epoll.h>
|
|
|
|
#include <Common/FiberStack.h>
|
|
|
|
#include <Common/Fiber.h>
|
|
|
|
#include <Client/ConnectionEstablisher.h>
|
2021-02-06 00:54:27 +00:00
|
|
|
#include <Client/ConnectionPoolWithFailover.h>
|
|
|
|
#include <Core/Settings.h>
|
|
|
|
#include <unordered_map>
|
|
|
|
#include <memory>
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
|
|
|
/** Class for establishing hedged connections with replicas.
|
|
|
|
* The process of establishing connection is divided on stages, on each stage if
|
|
|
|
* replica doesn't respond for a long time, we start establishing connection with
|
|
|
|
* the next replica, without cancelling working with previous one.
|
|
|
|
* It works with multiple replicas simultaneously without blocking by using epoll.
|
|
|
|
*/
|
|
|
|
class HedgedConnectionsFactory
|
|
|
|
{
|
|
|
|
public:
|
|
|
|
using ShuffledPool = ConnectionPoolWithFailover::Base::ShuffledPool;
|
2021-02-21 14:03:24 +00:00
|
|
|
using TryResult = PoolWithFailoverBase<IConnectionPool>::TryResult;
|
2021-02-06 00:54:27 +00:00
|
|
|
|
2021-02-17 17:34:52 +00:00
|
|
|
enum class State
|
|
|
|
{
|
|
|
|
READY,
|
|
|
|
NOT_READY,
|
|
|
|
CANNOT_CHOOSE,
|
|
|
|
};
|
|
|
|
|
2021-02-15 13:21:36 +00:00
|
|
|
struct ReplicaStatus
|
|
|
|
{
|
2021-02-21 14:03:24 +00:00
|
|
|
explicit ReplicaStatus(ConnectionEstablisherAsync connection_stablisher_) : connection_establisher(std::move(connection_stablisher_))
|
2021-02-15 13:21:36 +00:00
|
|
|
{
|
|
|
|
}
|
|
|
|
|
2021-02-21 14:03:24 +00:00
|
|
|
ConnectionEstablisherAsync connection_establisher;
|
2021-02-15 13:21:36 +00:00
|
|
|
TimerDescriptor change_replica_timeout;
|
|
|
|
bool is_ready = false;
|
2021-02-06 00:54:27 +00:00
|
|
|
};
|
|
|
|
|
|
|
|
HedgedConnectionsFactory(const ConnectionPoolWithFailoverPtr & pool_,
|
|
|
|
const Settings * settings_,
|
|
|
|
const ConnectionTimeouts & timeouts_,
|
|
|
|
std::shared_ptr<QualifiedTableName> table_to_check_ = nullptr);
|
|
|
|
|
|
|
|
/// Create and return active connections according to pool_mode.
|
|
|
|
std::vector<Connection *> getManyConnections(PoolMode pool_mode);
|
|
|
|
|
2021-02-08 11:06:45 +00:00
|
|
|
/// Try to get connection to the new replica. If start_new_connection is true, we start establishing connection
|
2021-02-08 13:08:15 +00:00
|
|
|
/// with the new replica. Process all current events in epoll (connections, timeouts),
|
2021-02-08 11:06:45 +00:00
|
|
|
/// if there is no events in epoll and blocking is false, return NOT_READY.
|
|
|
|
/// Returned state might be READY, NOT_READY and CANNOT_CHOOSE.
|
2021-02-06 00:54:27 +00:00
|
|
|
/// If state is READY, replica connection will be written in connection_out.
|
2021-02-21 14:03:24 +00:00
|
|
|
State waitForReadyConnections(bool blocking, Connection *& connection_out);
|
2021-02-06 00:54:27 +00:00
|
|
|
|
2021-02-21 14:03:24 +00:00
|
|
|
State startNewConnection(Connection *& connection_out);
|
2021-02-06 00:54:27 +00:00
|
|
|
|
|
|
|
/// Stop working with all replicas that are not READY.
|
|
|
|
void stopChoosingReplicas();
|
|
|
|
|
2021-02-17 17:34:52 +00:00
|
|
|
bool hasEventsInProcess() const { return !epoll.empty(); }
|
2021-02-06 00:54:27 +00:00
|
|
|
|
|
|
|
int getFileDescriptor() const { return epoll.getFileDescriptor(); }
|
|
|
|
|
|
|
|
const ConnectionTimeouts & getConnectionTimeouts() const { return timeouts; }
|
|
|
|
|
2021-02-21 14:03:24 +00:00
|
|
|
int numberOfProcessingReplicas() const;
|
|
|
|
|
|
|
|
void setSkipPredicate(std::function<bool(Connection *)> pred) { skip_predicate = std::move(pred); }
|
|
|
|
|
2021-02-06 00:54:27 +00:00
|
|
|
~HedgedConnectionsFactory();
|
|
|
|
|
|
|
|
private:
|
2021-02-15 13:21:36 +00:00
|
|
|
/// Try to start establishing connection to the new replica. Return
|
|
|
|
/// the index of the new replica or -1 if cannot start new connection.
|
2021-02-21 14:03:24 +00:00
|
|
|
State startNewConnectionImpl(Connection *& connection_out);
|
2021-02-06 00:54:27 +00:00
|
|
|
|
|
|
|
/// Find an index of the next free replica to start connection.
|
|
|
|
/// Return -1 if there is no free replica.
|
|
|
|
int getNextIndex();
|
|
|
|
|
|
|
|
int getReadyFileDescriptor(bool blocking);
|
|
|
|
|
2021-02-21 14:03:24 +00:00
|
|
|
void processFailedConnection(int index, const std::string & fail_message);
|
|
|
|
|
|
|
|
State resumeConnectionEstablisher(int index, Connection *& connection_out);
|
|
|
|
|
|
|
|
State processFinishedConnection(int index, TryResult result, Connection *& connection_out);
|
2021-02-06 00:54:27 +00:00
|
|
|
|
2021-02-21 14:03:24 +00:00
|
|
|
void removeReplicaFromEpoll(int index, int fd);
|
2021-02-06 00:54:27 +00:00
|
|
|
|
2021-02-21 14:03:24 +00:00
|
|
|
void addNewReplicaToEpoll(int index, int fd);
|
2021-02-06 00:54:27 +00:00
|
|
|
|
2021-02-09 02:13:47 +00:00
|
|
|
/// Return NOT_READY state if there is no ready events, READY if replica is ready
|
2021-02-15 13:21:36 +00:00
|
|
|
/// and CANNOT_CHOOSE if there is no more events in epoll.
|
2021-02-09 02:01:09 +00:00
|
|
|
State processEpollEvents(bool blocking, Connection *& connection_out);
|
2021-02-06 00:54:27 +00:00
|
|
|
|
2021-02-09 02:01:09 +00:00
|
|
|
State setBestUsableReplica(Connection *& connection_out);
|
2021-02-06 00:54:27 +00:00
|
|
|
|
|
|
|
const ConnectionPoolWithFailoverPtr pool;
|
|
|
|
const Settings * settings;
|
|
|
|
const ConnectionTimeouts timeouts;
|
|
|
|
|
|
|
|
std::vector<ShuffledPool> shuffled_pools;
|
2021-02-17 17:34:52 +00:00
|
|
|
std::vector<ReplicaStatus> replicas;
|
2021-02-06 00:54:27 +00:00
|
|
|
|
2021-02-09 02:01:09 +00:00
|
|
|
/// Map socket file descriptor to replica index.
|
|
|
|
std::unordered_map<int, int> fd_to_replica_index;
|
2021-02-06 00:54:27 +00:00
|
|
|
|
2021-02-17 17:34:52 +00:00
|
|
|
/// Map timeout for changing replica to replica index.
|
|
|
|
std::unordered_map<int, int> timeout_fd_to_replica_index;
|
|
|
|
|
2021-02-21 14:03:24 +00:00
|
|
|
std::function<bool(Connection *)> skip_predicate;
|
2021-02-06 00:54:27 +00:00
|
|
|
|
2021-02-17 17:34:52 +00:00
|
|
|
std::shared_ptr<QualifiedTableName> table_to_check;
|
2021-02-06 00:54:27 +00:00
|
|
|
int last_used_index = -1;
|
|
|
|
bool fallback_to_stale_replicas;
|
|
|
|
Epoll epoll;
|
|
|
|
Poco::Logger * log;
|
|
|
|
std::string fail_messages;
|
|
|
|
size_t max_tries;
|
2021-02-21 14:03:24 +00:00
|
|
|
size_t entries_count = 0;
|
|
|
|
size_t usable_count = 0;
|
|
|
|
size_t up_to_date_count = 0;
|
|
|
|
size_t failed_pools_count= 0;
|
|
|
|
size_t replicas_in_process_count = 0;
|
|
|
|
size_t requested_connections_count = 0;
|
|
|
|
size_t ready_replicas_count = 0;
|
2021-02-06 00:54:27 +00:00
|
|
|
};
|
|
|
|
|
|
|
|
}
|
|
|
|
#endif
|