ClickHouse/src/Client/HedgedConnections.h

201 lines
7.0 KiB
C++
Raw Normal View History

2021-01-19 19:21:06 +00:00
#pragma once
2021-01-29 15:46:28 +00:00
#if defined(OS_LINUX)
2021-01-19 19:21:06 +00:00
#include <functional>
#include <queue>
2021-02-26 15:53:40 +00:00
#include <optional>
2021-02-06 00:54:27 +00:00
#include <Client/HedgedConnectionsFactory.h>
#include <Client/IConnections.h>
2021-02-17 17:34:52 +00:00
#include <Client/PacketReceiver.h>
#include <Common/FiberStack.h>
#include <Common/Fiber.h>
2021-01-19 19:21:06 +00:00
namespace DB
{
2021-04-12 17:07:01 +00:00
/// Error codes referenced by this header. They are only declared here (`extern`),
/// so the definitions have to be provided by another translation unit.
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
/** To receive data from multiple replicas (connections) from one shard asynchronously.
  * The principle of Hedged Connections is used to reduce tail latency:
  * if we don't receive data from a replica and there is no progress in query execution
  * for a long time, we try to get a new replica and send the query to it,
  * without cancelling the work with the previous replica. This class
  * supports all functionality that MultiplexedConnections has.
  */
2021-01-19 19:21:06 +00:00
class HedgedConnections : public IConnections
{
public:
2021-02-21 14:03:24 +00:00
using PacketReceiverPtr = std::unique_ptr<PacketReceiver>;
2021-02-06 00:54:27 +00:00
struct ReplicaState
{
2021-02-21 14:03:24 +00:00
explicit ReplicaState(Connection * connection_) : connection(connection_), packet_receiver(std::make_unique<PacketReceiver>(connection_))
2021-02-15 13:21:36 +00:00
{
}
2021-02-06 00:54:27 +00:00
Connection * connection = nullptr;
2021-02-21 14:03:24 +00:00
PacketReceiverPtr packet_receiver;
2021-02-15 13:21:36 +00:00
TimerDescriptor change_replica_timeout;
2021-03-02 12:40:24 +00:00
bool is_change_replica_timeout_expired = false;
2021-02-06 00:54:27 +00:00
};
2021-02-15 13:21:36 +00:00
struct OffsetState
2021-02-09 02:01:09 +00:00
{
2021-02-15 13:21:36 +00:00
/// Replicas with the same offset.
std::vector<ReplicaState> replicas;
/// An amount of active replicas. When can_change_replica is false,
/// active_connection_count is always <= 1 (because we stopped working with
/// other replicas with the same offset)
2021-02-15 13:21:36 +00:00
size_t active_connection_count = 0;
bool can_change_replica = true;
2021-02-17 17:34:52 +00:00
/// This flag is true when this offset is in queue for
/// new replicas. It's needed to process receive timeout
/// (throw an exception when receive timeout expired and there is no
/// new replica in process)
bool next_replica_in_process = false;
2021-02-09 02:01:09 +00:00
};
2021-02-06 00:54:27 +00:00
2021-02-15 13:21:36 +00:00
/// We process events in epoll, so we need to determine replica by it's
/// file descriptor. We store map fd -> replica location. To determine
/// where replica is, we need a replica offset
/// (the same as parallel_replica_offset), and index, which is needed because
/// we can have many replicas with same offset (when receive_data_timeout has expired).
struct ReplicaLocation
2021-02-06 00:54:27 +00:00
{
2021-02-15 13:21:36 +00:00
size_t offset;
size_t index;
2021-02-06 00:54:27 +00:00
};
2021-01-19 19:21:06 +00:00
HedgedConnections(const ConnectionPoolWithFailoverPtr & pool_,
const Settings & settings_,
const ConnectionTimeouts & timeouts_,
const ThrottlerPtr & throttler,
PoolMode pool_mode,
2021-01-19 19:21:06 +00:00
std::shared_ptr<QualifiedTableName> table_to_check_ = nullptr);
void sendScalarsData(Scalars & data) override;
void sendExternalTablesData(std::vector<ExternalTablesData> & data) override;
void sendQuery(
const ConnectionTimeouts & timeouts,
const String & query,
const String & query_id,
UInt64 stage,
const ClientInfo & client_info,
bool with_pending_data) override;
2021-04-10 02:21:18 +00:00
void sendReadTaskResponse(const std::optional<String> &) override
2021-04-08 19:00:39 +00:00
{
throw Exception("sendReadTaskResponse in not supported with HedgedConnections", ErrorCodes::LOGICAL_ERROR);
}
2021-01-19 19:21:06 +00:00
Packet receivePacket() override;
2021-01-29 15:46:28 +00:00
Packet receivePacketUnlocked(AsyncCallback async_callback) override;
2021-01-19 19:21:06 +00:00
void disconnect() override;
void sendCancel() override;
void sendIgnoredPartUUIDs(const std::vector<UUID> & uuids) override;
2021-01-19 19:21:06 +00:00
Packet drain() override;
std::string dumpAddresses() const override;
2021-02-06 00:54:27 +00:00
size_t size() const override { return offset_states.size(); }
2021-01-19 19:21:06 +00:00
2021-02-06 00:54:27 +00:00
bool hasActiveConnections() const override { return active_connection_count > 0; }
2021-01-19 19:21:06 +00:00
private:
/// If we don't receive data from replica and there is no progress in query
/// execution for receive_data_timeout, we are trying to get new
/// replica and send query to it. Beside sending query, there are some
2021-02-15 13:21:36 +00:00
/// additional actions like sendScalarsData or sendExternalTablesData and we need
/// to perform these actions in the same order on the new replica. So, we will
/// save actions with replicas in pipeline to perform them on the new replicas.
2021-01-19 19:21:06 +00:00
class Pipeline
{
public:
2021-02-09 02:01:09 +00:00
void add(std::function<void(ReplicaState &)> send_function);
2021-01-19 19:21:06 +00:00
2021-02-09 02:01:09 +00:00
void run(ReplicaState & replica);
2021-01-19 19:21:06 +00:00
private:
2021-02-09 02:01:09 +00:00
std::vector<std::function<void(ReplicaState &)>> pipeline;
2021-01-19 19:21:06 +00:00
};
2021-02-17 17:34:52 +00:00
Packet receivePacketFromReplica(const ReplicaLocation & replica_location);
2021-01-19 19:21:06 +00:00
2021-02-15 13:21:36 +00:00
ReplicaLocation getReadyReplicaLocation(AsyncCallback async_callback = {});
2021-01-19 19:21:06 +00:00
2021-02-26 15:53:40 +00:00
bool resumePacketReceiver(const ReplicaLocation & replica_location);
void disableChangingReplica(const ReplicaLocation & replica_location);
2021-01-19 19:21:06 +00:00
2021-02-21 14:03:24 +00:00
void startNewReplica();
void checkNewReplica();
void processNewReplicaState(HedgedConnectionsFactory::State state, Connection * connection);
2021-01-19 19:21:06 +00:00
2021-02-09 02:01:09 +00:00
void finishProcessReplica(ReplicaState & replica, bool disconnect);
2021-01-19 19:21:06 +00:00
int getReadyFileDescriptor(AsyncCallback async_callback = {});
2021-01-19 19:21:06 +00:00
2021-02-06 00:54:27 +00:00
HedgedConnectionsFactory hedged_connections_factory;
2021-02-02 12:14:31 +00:00
2021-02-06 00:54:27 +00:00
/// All replicas in offset_states[offset] is responsible for process query
2021-02-02 12:14:31 +00:00
/// with setting parallel_replica_offset = offset. In common situations
2021-02-06 00:54:27 +00:00
/// replica_states[offset].replicas.size() = 1 (like in MultiplexedConnections).
std::vector<OffsetState> offset_states;
2021-02-02 12:14:31 +00:00
2021-02-09 02:01:09 +00:00
/// Map socket file descriptor to replica location (it's offset and index in OffsetState.replicas).
std::unordered_map<int, ReplicaLocation> fd_to_replica_location;
2021-02-02 12:14:31 +00:00
2021-02-17 17:34:52 +00:00
/// Map receive data timeout file descriptor to replica location.
std::unordered_map<int, ReplicaLocation> timeout_fd_to_replica_location;
2021-02-02 12:14:31 +00:00
/// A queue of offsets for new replicas. When we get RECEIVE_DATA_TIMEOUT from
/// the replica, we push it's offset to this queue and start trying to get
/// new replica.
std::queue<int> offsets_queue;
2021-02-02 12:14:31 +00:00
2021-02-06 00:54:27 +00:00
/// The current number of valid connections to the replicas of this shard.
size_t active_connection_count;
2021-02-02 12:14:31 +00:00
/// We count offsets in which we can't change replica anymore,
/// it's needed to cancel choosing new replicas when we
/// disabled replica changing in all offsets.
size_t offsets_with_disabled_changing_replica;
2021-02-02 12:14:31 +00:00
Pipeline pipeline_for_new_replicas;
/// New replica may not support two-level aggregation due to version incompatibility.
/// If we didn't disabled it, we need to skip this replica.
bool disable_two_level_aggregation = false;
2021-02-26 15:53:40 +00:00
/// We will save replica with last received packet
/// (except cases when packet type is EndOfStream or Exception)
/// to resume it's packet receiver when new packet is needed.
std::optional<ReplicaLocation> replica_with_last_received_packet;
2021-02-21 14:03:24 +00:00
Packet last_received_packet;
2021-02-02 12:14:31 +00:00
2021-01-19 19:21:06 +00:00
Epoll epoll;
const Settings & settings;
ThrottlerPtr throttler;
bool sent_query = false;
bool cancelled = false;
mutable std::mutex cancel_mutex;
};
}
2021-01-29 15:46:28 +00:00
#endif