ClickHouse/src/Client/MultiplexedConnections.h

110 lines
3.4 KiB
C++
Raw Normal View History

#pragma once
#include <mutex>
#include <Common/Throttler.h>
#include <Client/Connection.h>
#include <Client/ConnectionPoolWithFailover.h>
#include <IO/ConnectionTimeouts.h>
2021-01-19 19:21:06 +00:00
#include <Client/IConnections.h>
namespace DB
{
/** To retrieve data directly from multiple replicas (connections) from one shard
2017-03-09 00:56:38 +00:00
* within a single thread. As a degenerate case, it can also work with one connection.
* It is assumed that all functions except sendCancel are always executed in one thread.
*
2017-03-09 00:56:38 +00:00
* The interface is almost the same as Connection.
*/
2021-01-19 19:21:06 +00:00
class MultiplexedConnections final : public IConnections
{
public:
/// Accepts ready connection.
MultiplexedConnections(Connection & connection, const Settings & settings_, const ThrottlerPtr & throttler_);
2018-11-28 14:33:40 +00:00
/// Accepts a vector of connections to replicas of one shard already taken from pool.
MultiplexedConnections(
2018-11-28 14:33:40 +00:00
std::vector<IConnectionPool::Entry> && connections,
const Settings & settings_, const ThrottlerPtr & throttler_);
2021-01-19 19:21:06 +00:00
void sendScalarsData(Scalars & data) override;
void sendExternalTablesData(std::vector<ExternalTablesData> & data) override;
void sendQuery(
const ConnectionTimeouts & timeouts,
const String & query,
2020-05-17 05:45:20 +00:00
const String & query_id,
UInt64 stage,
const ClientInfo & client_info,
2021-01-19 19:21:06 +00:00
bool with_pending_data) override;
2021-04-13 10:59:02 +00:00
void sendReadTaskResponse(const String &) override;
2021-01-19 19:21:06 +00:00
Packet receivePacket() override;
2021-01-19 19:21:06 +00:00
void disconnect() override;
2021-01-19 19:21:06 +00:00
void sendCancel() override;
/// Send parts' uuids to replicas to exclude them from query processing
void sendIgnoredPartUUIDs(const std::vector<UUID> & uuids) override;
2021-01-19 19:21:06 +00:00
Packet drain() override;
2021-01-19 19:21:06 +00:00
std::string dumpAddresses() const override;
/// Without locking, because sendCancel() does not change this number.
2021-01-19 19:21:06 +00:00
size_t size() const override { return replica_states.size(); }
/// Without locking, because sendCancel() does not change the state of the replicas.
2021-01-19 19:21:06 +00:00
bool hasActiveConnections() const override { return active_connection_count > 0; }
private:
2021-07-14 13:17:30 +00:00
Packet receivePacketUnlocked(AsyncCallback async_callback, bool is_draining) override;
/// Internal version of `dumpAddresses` function without locking.
std::string dumpAddressesUnlocked() const;
/// Description of a single replica.
struct ReplicaState
{
Connection * connection = nullptr;
ConnectionPool::Entry pool_entry;
};
/// Get a replica where you can read the data.
2021-07-14 13:17:30 +00:00
ReplicaState & getReplicaForReading(bool is_draining);
/// Mark the replica as invalid.
void invalidateReplica(ReplicaState & replica_state);
private:
const Settings & settings;
2015-11-06 17:44:01 +00:00
2021-07-14 13:17:30 +00:00
/// The following two fields are from settings but can be referenced outside the lifetime of
/// settings when connection is drained asynchronously.
Poco::Timespan drain_timeout;
Poco::Timespan receive_timeout;
/// The current number of valid connections to the replicas of this shard.
size_t active_connection_count = 0;
std::vector<ReplicaState> replica_states;
std::unordered_map<int, size_t> fd_to_replica_state_idx;
/// Connection that received last block.
Connection * current_connection = nullptr;
2015-10-12 14:53:16 +00:00
bool sent_query = false;
bool cancelled = false;
/// A mutex for the sendCancel function to execute safely
/// in separate thread.
mutable std::mutex cancel_mutex;
2020-12-04 13:35:24 +00:00
friend struct RemoteQueryExecutorRoutine;
};
}