2021-01-19 19:21:06 +00:00
|
|
|
#pragma once
|
2021-01-29 15:46:28 +00:00
|
|
|
#if defined(OS_LINUX)
|
2021-01-19 19:21:06 +00:00
|
|
|
|
|
|
|
#include <Client/GetHedgedConnections.h>
|
|
|
|
#include <Client/IConnections.h>
|
|
|
|
#include <functional>
|
2021-01-27 09:33:11 +00:00
|
|
|
#include <queue>
|
2021-01-19 19:21:06 +00:00
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
|
|
|
class HedgedConnections : public IConnections
|
|
|
|
{
|
|
|
|
public:
|
|
|
|
using ReplicaStatePtr = GetHedgedConnections::ReplicaStatePtr;
|
|
|
|
using Replicas = GetHedgedConnections::Replicas;
|
|
|
|
|
|
|
|
HedgedConnections(const ConnectionPoolWithFailoverPtr & pool_,
|
|
|
|
const Settings & settings_,
|
|
|
|
const ConnectionTimeouts & timeouts_,
|
|
|
|
const ThrottlerPtr & throttler,
|
2021-01-27 09:33:11 +00:00
|
|
|
PoolMode pool_mode,
|
2021-01-19 19:21:06 +00:00
|
|
|
std::shared_ptr<QualifiedTableName> table_to_check_ = nullptr);
|
|
|
|
|
|
|
|
void sendScalarsData(Scalars & data) override;
|
|
|
|
|
|
|
|
void sendExternalTablesData(std::vector<ExternalTablesData> & data) override;
|
|
|
|
|
|
|
|
void sendQuery(
|
|
|
|
const ConnectionTimeouts & timeouts,
|
|
|
|
const String & query,
|
|
|
|
const String & query_id,
|
|
|
|
UInt64 stage,
|
|
|
|
const ClientInfo & client_info,
|
|
|
|
bool with_pending_data) override;
|
|
|
|
|
|
|
|
Packet receivePacket() override;
|
|
|
|
|
2021-01-29 15:46:28 +00:00
|
|
|
Packet receivePacketUnlocked(AsyncCallback async_callback) override;
|
2021-01-19 19:21:06 +00:00
|
|
|
|
|
|
|
void disconnect() override;
|
|
|
|
|
|
|
|
void sendCancel() override;
|
|
|
|
|
|
|
|
Packet drain() override;
|
|
|
|
|
|
|
|
std::string dumpAddresses() const override;
|
|
|
|
|
|
|
|
size_t size() const override;
|
|
|
|
|
2021-01-27 09:33:11 +00:00
|
|
|
bool hasActiveConnections() const override { return !active_connections_count_by_offset.empty(); }
|
2021-01-19 19:21:06 +00:00
|
|
|
|
|
|
|
private:
|
|
|
|
class Pipeline
|
|
|
|
{
|
|
|
|
public:
|
2021-01-27 09:33:11 +00:00
|
|
|
void add(std::function<void(ReplicaStatePtr &)> send_function);
|
2021-01-19 19:21:06 +00:00
|
|
|
|
2021-01-27 09:33:11 +00:00
|
|
|
void run(ReplicaStatePtr & replica);
|
2021-01-19 19:21:06 +00:00
|
|
|
|
|
|
|
bool empty() const { return pipeline.empty(); }
|
|
|
|
|
|
|
|
private:
|
2021-01-27 09:33:11 +00:00
|
|
|
std::vector<std::function<void(ReplicaStatePtr &)>> pipeline;
|
2021-01-19 19:21:06 +00:00
|
|
|
};
|
|
|
|
|
2021-01-27 09:33:11 +00:00
|
|
|
Packet receivePacketFromReplica(ReplicaStatePtr & replica, AsyncCallback async_callback = {});
|
2021-01-19 19:21:06 +00:00
|
|
|
|
|
|
|
Packet receivePacketImpl(AsyncCallback async_callback = {});
|
|
|
|
|
2021-01-27 09:33:11 +00:00
|
|
|
void processReceiveData(ReplicaStatePtr & replica);
|
2021-01-19 19:21:06 +00:00
|
|
|
|
|
|
|
void processTimeoutEvent(ReplicaStatePtr & replica, TimerDescriptorPtr timeout_descriptor);
|
|
|
|
|
2021-01-27 09:33:11 +00:00
|
|
|
void tryGetNewReplica();
|
2021-01-19 19:21:06 +00:00
|
|
|
|
2021-01-27 09:33:11 +00:00
|
|
|
void finishProcessReplica(ReplicaStatePtr & replica, bool disconnect);
|
2021-01-19 19:21:06 +00:00
|
|
|
|
2021-01-27 09:33:11 +00:00
|
|
|
int getReadyFileDescriptor(AsyncCallback async_callback = {});
|
2021-01-19 19:21:06 +00:00
|
|
|
|
|
|
|
GetHedgedConnections get_hedged_connections;
|
2021-01-27 09:33:11 +00:00
|
|
|
std::vector<std::vector<ReplicaStatePtr>> replicas;
|
|
|
|
std::unordered_map<int, ReplicaStatePtr> fd_to_replica;
|
|
|
|
std::unordered_map<int, ReplicaStatePtr> timeout_fd_to_replica;
|
|
|
|
std::queue<int> offsets_queue;
|
2021-01-19 19:21:06 +00:00
|
|
|
Epoll epoll;
|
|
|
|
const Settings & settings;
|
|
|
|
ThrottlerPtr throttler;
|
|
|
|
Poco::Logger * log;
|
2021-01-27 09:33:11 +00:00
|
|
|
Pipeline pipeline_for_new_replicas;
|
2021-01-19 19:21:06 +00:00
|
|
|
bool sent_query = false;
|
|
|
|
bool cancelled = false;
|
2021-01-27 09:33:11 +00:00
|
|
|
std::unordered_map<size_t, size_t> active_connections_count_by_offset;
|
|
|
|
bool next_replica_in_process = false;
|
|
|
|
bool has_two_level_aggregation_incompatibility = false;
|
|
|
|
std::unordered_set<size_t> offsets_with_received_data;
|
2021-01-19 19:21:06 +00:00
|
|
|
|
|
|
|
mutable std::mutex cancel_mutex;
|
|
|
|
};
|
|
|
|
|
|
|
|
}
|
2021-01-29 15:46:28 +00:00
|
|
|
#endif
|