2021-01-29 15:46:28 +00:00
|
|
|
#if defined(OS_LINUX)
|
|
|
|
|
2021-01-19 19:21:06 +00:00
|
|
|
#include <Client/HedgedConnections.h>
|
|
|
|
#include <Interpreters/ClientInfo.h>
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
/// Error codes thrown from this file. They are only declared here (extern).
namespace ErrorCodes
{
    /// Programming / state-machine errors.
    extern const int LOGICAL_ERROR;
    extern const int MISMATCH_REPLICAS_DATA_SOURCES;

    /// Network related errors.
    extern const int ALL_CONNECTION_TRIES_FAILED;
    extern const int SOCKET_TIMEOUT;
}
|
|
|
|
|
|
|
|
/// Asks the factory for the initial set of connections and registers every one of them
/// (packet receiver descriptor and change-replica timer descriptor) in epoll.
/// Each initial connection becomes replica 0 of its own offset.
HedgedConnections::HedgedConnections(
    const ConnectionPoolWithFailoverPtr & pool_,
    const Settings & settings_,
    const ConnectionTimeouts & timeouts_,
    const ThrottlerPtr & throttler_,
    PoolMode pool_mode,
    std::shared_ptr<QualifiedTableName> table_to_check_)
    : hedged_connections_factory(pool_, &settings_, timeouts_, table_to_check_)
    , settings(settings_)
    , throttler(throttler_)
{
    std::vector<Connection *> initial_connections = hedged_connections_factory.getManyConnections(pool_mode);

    /// Without connections there is nothing to register; the counters and the pipeline stay as they are.
    if (initial_connections.empty())
        return;

    const size_t connections_count = initial_connections.size();
    offset_states.reserve(connections_count);

    for (size_t offset = 0; offset < connections_count; ++offset)
    {
        offset_states.emplace_back();
        auto & offset_state = offset_states[offset];
        offset_state.replicas.emplace_back(initial_connections[offset]);
        offset_state.active_connection_count = 1;

        ReplicaState & replica = offset_state.replicas.back();
        replica.connection->setThrottler(throttler_);

        /// Descriptor of the packet receiver: used to find the replica when data arrives.
        const auto receiver_fd = replica.packet_receiver->getFileDescriptor();
        epoll.add(receiver_fd);
        fd_to_replica_location[receiver_fd] = ReplicaLocation{offset, 0};

        /// Descriptor of the change-replica timer: used to find the replica when the timer fires.
        const auto timeout_fd = replica.change_replica_timeout.getDescriptor();
        epoll.add(timeout_fd);
        timeout_fd_to_replica_location[timeout_fd] = ReplicaLocation{offset, 0};
    }

    active_connection_count = connections_count;
    offsets_with_received_first_data_packet = 0;

    /// Replicas started later must get the same throttler.
    pipeline_for_new_replicas.add([throttler_](ReplicaState & new_replica) { new_replica.connection->setThrottler(throttler_); });
}
|
|
|
|
|
2021-02-09 02:01:09 +00:00
|
|
|
void HedgedConnections::Pipeline::add(std::function<void(ReplicaState & replica)> send_function)
|
2021-01-19 19:21:06 +00:00
|
|
|
{
|
|
|
|
pipeline.push_back(send_function);
|
|
|
|
}
|
|
|
|
|
2021-02-09 02:01:09 +00:00
|
|
|
/// Applies every stored action to `replica`, in the order the actions were added.
void HedgedConnections::Pipeline::run(ReplicaState & replica)
{
    for (auto & action : pipeline)
    {
        action(replica);
    }
}
|
|
|
|
|
|
|
|
/// Sends scalars data to every replica that still has a connection and stores the same
/// action in the pipeline so that replicas added later receive it too.
/// Throws LOGICAL_ERROR if the query has not been sent yet.
void HedgedConnections::sendScalarsData(Scalars & data)
{
    std::lock_guard lock(cancel_mutex);

    if (!sent_query)
        throw Exception("Cannot send scalars data: query not yet sent.", ErrorCodes::LOGICAL_ERROR);

    /// NOTE: `data` is captured by reference and the callable is kept in the pipeline,
    /// so the caller's object has to outlive any later run of the pipeline.
    auto send_to_replica = [&data](ReplicaState & replica) { replica.connection->sendScalarsData(data); };

    for (auto & state : offset_states)
    {
        for (auto & replica : state.replicas)
        {
            /// Replicas whose connection was already released are skipped.
            if (!replica.connection)
                continue;

            send_to_replica(replica);
        }
    }

    pipeline_for_new_replicas.add(send_to_replica);
}
|
|
|
|
|
|
|
|
void HedgedConnections::sendExternalTablesData(std::vector<ExternalTablesData> & data)
|
|
|
|
{
|
|
|
|
std::lock_guard lock(cancel_mutex);
|
|
|
|
|
|
|
|
if (!sent_query)
|
|
|
|
throw Exception("Cannot send external tables data: query not yet sent.", ErrorCodes::LOGICAL_ERROR);
|
|
|
|
|
|
|
|
if (data.size() != size())
|
|
|
|
throw Exception("Mismatch between replicas and data sources", ErrorCodes::MISMATCH_REPLICAS_DATA_SOURCES);
|
|
|
|
|
2021-02-09 02:01:09 +00:00
|
|
|
auto send_external_tables_data = [&data](ReplicaState & replica) { replica.connection->sendExternalTablesData(data[0]); };
|
2021-01-19 19:21:06 +00:00
|
|
|
|
2021-02-06 00:54:27 +00:00
|
|
|
for (auto & offset_state : offset_states)
|
|
|
|
for (auto & replica : offset_state.replicas)
|
2021-02-09 02:01:09 +00:00
|
|
|
if (replica.connection)
|
2021-01-27 09:33:11 +00:00
|
|
|
send_external_tables_data(replica);
|
2021-01-19 19:21:06 +00:00
|
|
|
|
2021-01-27 09:33:11 +00:00
|
|
|
pipeline_for_new_replicas.add(send_external_tables_data);
|
2021-01-19 19:21:06 +00:00
|
|
|
}
|
|
|
|
|
2021-02-06 15:23:41 +00:00
|
|
|
void HedgedConnections::sendIgnoredPartUUIDs(const std::vector<UUID> & uuids)
|
|
|
|
{
|
|
|
|
std::lock_guard lock(cancel_mutex);
|
|
|
|
|
|
|
|
if (sent_query)
|
|
|
|
throw Exception("Cannot send uuids after query is sent.", ErrorCodes::LOGICAL_ERROR);
|
|
|
|
|
2021-02-09 02:01:09 +00:00
|
|
|
auto send_ignored_part_uuids = [&uuids](ReplicaState & replica) { replica.connection->sendIgnoredPartUUIDs(uuids); };
|
2021-02-06 15:23:41 +00:00
|
|
|
|
|
|
|
for (auto & offset_state : offset_states)
|
|
|
|
for (auto & replica : offset_state.replicas)
|
2021-02-09 02:01:09 +00:00
|
|
|
if (replica.connection)
|
2021-02-06 15:23:41 +00:00
|
|
|
send_ignored_part_uuids(replica);
|
|
|
|
|
|
|
|
pipeline_for_new_replicas.add(send_ignored_part_uuids);
|
|
|
|
}
|
|
|
|
|
2021-01-19 19:21:06 +00:00
|
|
|
void HedgedConnections::sendQuery(
|
|
|
|
const ConnectionTimeouts & timeouts,
|
|
|
|
const String & query,
|
|
|
|
const String & query_id,
|
|
|
|
UInt64 stage,
|
|
|
|
const ClientInfo & client_info,
|
|
|
|
bool with_pending_data)
|
|
|
|
{
|
|
|
|
std::lock_guard lock(cancel_mutex);
|
|
|
|
|
|
|
|
if (sent_query)
|
|
|
|
throw Exception("Query already sent.", ErrorCodes::LOGICAL_ERROR);
|
|
|
|
|
2021-02-06 00:54:27 +00:00
|
|
|
for (auto & offset_state : offset_states)
|
2021-01-19 19:21:06 +00:00
|
|
|
{
|
2021-02-06 00:54:27 +00:00
|
|
|
for (auto & replica : offset_state.replicas)
|
2021-01-19 19:21:06 +00:00
|
|
|
{
|
2021-02-09 02:01:09 +00:00
|
|
|
if (replica.connection->getServerRevision(timeouts) < DBMS_MIN_REVISION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD)
|
2021-01-27 09:33:11 +00:00
|
|
|
{
|
2021-02-02 12:14:31 +00:00
|
|
|
disable_two_level_aggregation = true;
|
2021-01-27 09:33:11 +00:00
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
2021-02-02 12:14:31 +00:00
|
|
|
if (disable_two_level_aggregation)
|
2021-01-27 09:33:11 +00:00
|
|
|
break;
|
|
|
|
}
|
|
|
|
|
2021-02-21 14:03:24 +00:00
|
|
|
if (!disable_two_level_aggregation)
|
|
|
|
{
|
|
|
|
/// Tell hedged_connections_factory to skip replicas that doesn't support two-level aggregation.
|
2021-02-26 15:53:40 +00:00
|
|
|
hedged_connections_factory.skipReplicasWithTwoLevelAggregationIncompatibility();
|
2021-02-21 14:03:24 +00:00
|
|
|
}
|
|
|
|
|
2021-02-09 02:01:09 +00:00
|
|
|
auto send_query = [this, timeouts, query, query_id, stage, client_info, with_pending_data](ReplicaState & replica)
|
2021-02-08 14:02:11 +00:00
|
|
|
{
|
2021-02-06 00:54:27 +00:00
|
|
|
Settings modified_settings = settings;
|
2021-01-27 09:33:11 +00:00
|
|
|
|
2021-02-06 00:54:27 +00:00
|
|
|
if (disable_two_level_aggregation)
|
2021-01-27 09:33:11 +00:00
|
|
|
{
|
|
|
|
/// Disable two-level aggregation due to version incompatibility.
|
2021-01-19 19:21:06 +00:00
|
|
|
modified_settings.group_by_two_level_threshold = 0;
|
|
|
|
modified_settings.group_by_two_level_threshold_bytes = 0;
|
|
|
|
}
|
|
|
|
|
2021-02-06 00:54:27 +00:00
|
|
|
if (offset_states.size() > 1)
|
2021-01-27 09:33:11 +00:00
|
|
|
{
|
2021-02-06 00:54:27 +00:00
|
|
|
modified_settings.parallel_replicas_count = offset_states.size();
|
2021-02-21 14:03:24 +00:00
|
|
|
modified_settings.parallel_replica_offset = fd_to_replica_location[replica.packet_receiver->getFileDescriptor()].offset;
|
2021-01-27 09:33:11 +00:00
|
|
|
}
|
|
|
|
|
2021-02-09 02:01:09 +00:00
|
|
|
replica.connection->sendQuery(timeouts, query, query_id, stage, &modified_settings, &client_info, with_pending_data);
|
2021-02-15 13:21:36 +00:00
|
|
|
replica.change_replica_timeout.setRelative(timeouts.receive_data_timeout);
|
2021-01-19 19:21:06 +00:00
|
|
|
};
|
|
|
|
|
2021-02-06 00:54:27 +00:00
|
|
|
for (auto & offset_status : offset_states)
|
|
|
|
for (auto & replica : offset_status.replicas)
|
2021-01-27 09:33:11 +00:00
|
|
|
send_query(replica);
|
2021-01-19 19:21:06 +00:00
|
|
|
|
2021-01-27 09:33:11 +00:00
|
|
|
pipeline_for_new_replicas.add(send_query);
|
2021-01-19 19:21:06 +00:00
|
|
|
sent_query = true;
|
|
|
|
}
|
|
|
|
|
|
|
|
void HedgedConnections::disconnect()
|
|
|
|
{
|
|
|
|
std::lock_guard lock(cancel_mutex);
|
|
|
|
|
2021-02-06 00:54:27 +00:00
|
|
|
for (auto & offset_status : offset_states)
|
|
|
|
for (auto & replica : offset_status.replicas)
|
2021-02-09 02:01:09 +00:00
|
|
|
if (replica.connection)
|
2021-01-27 09:33:11 +00:00
|
|
|
finishProcessReplica(replica, true);
|
|
|
|
|
2021-02-06 00:54:27 +00:00
|
|
|
if (hedged_connections_factory.hasEventsInProcess())
|
2021-01-19 19:21:06 +00:00
|
|
|
{
|
2021-02-21 14:03:24 +00:00
|
|
|
if (hedged_connections_factory.numberOfProcessingReplicas() > 0)
|
2021-02-06 00:54:27 +00:00
|
|
|
epoll.remove(hedged_connections_factory.getFileDescriptor());
|
|
|
|
|
|
|
|
hedged_connections_factory.stopChoosingReplicas();
|
2021-01-19 19:21:06 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
std::string HedgedConnections::dumpAddresses() const
|
|
|
|
{
|
|
|
|
std::lock_guard lock(cancel_mutex);
|
|
|
|
|
2021-01-29 15:46:28 +00:00
|
|
|
std::string addresses;
|
2021-01-27 09:33:11 +00:00
|
|
|
bool is_first = true;
|
2021-01-19 19:21:06 +00:00
|
|
|
|
2021-02-06 00:54:27 +00:00
|
|
|
for (const auto & offset_state : offset_states)
|
2021-01-27 09:33:11 +00:00
|
|
|
{
|
2021-02-06 00:54:27 +00:00
|
|
|
for (const auto & replica : offset_state.replicas)
|
2021-01-27 09:33:11 +00:00
|
|
|
{
|
2021-02-09 02:01:09 +00:00
|
|
|
if (replica.connection)
|
2021-01-27 09:33:11 +00:00
|
|
|
{
|
2021-02-09 02:01:09 +00:00
|
|
|
addresses += (is_first ? "" : "; ") + replica.connection->getDescription();
|
2021-01-27 09:33:11 +00:00
|
|
|
is_first = false;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2021-01-19 19:21:06 +00:00
|
|
|
|
|
|
|
return addresses;
|
|
|
|
}
|
|
|
|
|
|
|
|
void HedgedConnections::sendCancel()
|
|
|
|
{
|
|
|
|
std::lock_guard lock(cancel_mutex);
|
|
|
|
|
|
|
|
if (!sent_query || cancelled)
|
|
|
|
throw Exception("Cannot cancel. Either no query sent or already cancelled.", ErrorCodes::LOGICAL_ERROR);
|
|
|
|
|
2021-02-06 00:54:27 +00:00
|
|
|
for (auto & offset_status : offset_states)
|
|
|
|
for (auto & replica : offset_status.replicas)
|
2021-02-09 02:01:09 +00:00
|
|
|
if (replica.connection)
|
|
|
|
replica.connection->sendCancel();
|
2021-01-19 19:21:06 +00:00
|
|
|
|
|
|
|
cancelled = true;
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Reads packets until epoll has no descriptors left and returns the last exception or
/// unknown packet that was seen, or an EndOfStream packet if there was none.
/// Throws LOGICAL_ERROR if sendCancel() was not called before.
Packet HedgedConnections::drain()
{
    std::lock_guard lock(cancel_mutex);

    if (!cancelled)
        throw Exception("Cannot drain connections: cancel first.", ErrorCodes::LOGICAL_ERROR);

    Packet result;
    result.type = Protocol::Server::EndOfStream;

    while (!epoll.empty())
    {
        ReplicaLocation ready_location = getReadyReplicaLocation();
        Packet received = receivePacketFromReplica(ready_location);

        switch (received.type)
        {
            /// Regular packets are dropped.
            case Protocol::Server::Data:
            case Protocol::Server::PartUUIDs:
            case Protocol::Server::Progress:
            case Protocol::Server::ProfileInfo:
            case Protocol::Server::Totals:
            case Protocol::Server::Extremes:
            case Protocol::Server::EndOfStream:
                break;

            /// If we receive an exception or an unknown packet, we save it.
            case Protocol::Server::Exception:
            default:
                result = std::move(received);
                break;
        }
    }

    return result;
}
|
|
|
|
|
|
|
|
/// Locking wrapper around receivePacketUnlocked() that passes an empty async callback.
Packet HedgedConnections::receivePacket()
{
    std::lock_guard lock(cancel_mutex);

    Packet packet = receivePacketUnlocked({});
    return packet;
}
|
|
|
|
|
|
|
|
/// Waits for a replica with a ready packet and returns that packet.
/// Does not take cancel_mutex itself (receivePacket() is the locking wrapper).
/// Throws LOGICAL_ERROR if no query was sent, there are no active connections or epoll is empty.
Packet HedgedConnections::receivePacketUnlocked(AsyncCallback async_callback)
{
    /// Preconditions, checked in this exact order.
    if (!sent_query)
        throw Exception("Cannot receive packets: no query sent.", ErrorCodes::LOGICAL_ERROR);

    if (!hasActiveConnections())
        throw Exception("No more packets are available.", ErrorCodes::LOGICAL_ERROR);

    if (epoll.empty())
        throw Exception("No pending events in epoll.", ErrorCodes::LOGICAL_ERROR);

    ReplicaLocation ready_location = getReadyReplicaLocation(std::move(async_callback));
    return receivePacketFromReplica(ready_location);
}
|
|
|
|
|
2021-02-15 13:21:36 +00:00
|
|
|
/// Blocks until some replica has a packet ready (stored in last_received_packet by
/// resumePacketReceiver()) and returns the location of that replica.
/// While waiting it also handles the two other kinds of epoll events: the factory
/// descriptor (a new replica may be ready) and change-replica timers (start one more replica).
/// Throws LOGICAL_ERROR if epoll reports a descriptor that belongs to none of these.
HedgedConnections::ReplicaLocation HedgedConnections::getReadyReplicaLocation(AsyncCallback async_callback)
{
    /// Firstly, resume replica with the last received packet if needed.
    if (replica_with_last_received_packet)
    {
        ReplicaLocation location = replica_with_last_received_packet.value();
        replica_with_last_received_packet.reset();
        if (offset_states[location.offset].replicas[location.index].connection->hasReadPendingData() && resumePacketReceiver(location))
            return location;
    }

    while (true)
    {
        /// Get ready file descriptor from epoll and process it.
        const int event_fd = getReadyFileDescriptor(async_callback);

        if (event_fd == hedged_connections_factory.getFileDescriptor())
        {
            checkNewReplica();
            continue;
        }

        if (auto it = fd_to_replica_location.find(event_fd); it != fd_to_replica_location.end())
        {
            /// Copy the location first: resumePacketReceiver() may erase this map entry.
            ReplicaLocation location = it->second;
            if (resumePacketReceiver(location))
                return location;
            continue;
        }

        if (auto it = timeout_fd_to_replica_location.find(event_fd); it != timeout_fd_to_replica_location.end())
        {
            /// The replica did not answer in time: disarm its timer and start one more replica for the same offset.
            ReplicaLocation location = it->second;
            offset_states[location.offset].replicas[location.index].change_replica_timeout.reset();
            offset_states[location.offset].next_replica_in_process = true;
            offsets_queue.push(location.offset);
            startNewReplica();
            continue;
        }

        throw Exception("Unknown event from epoll", ErrorCodes::LOGICAL_ERROR);
    }
}
|
|
|
|
|
2021-02-26 15:53:40 +00:00
|
|
|
/// Resumes the packet receiver of the replica at `location`.
/// Returns true if a packet was received (it is stored in last_received_packet), false otherwise.
/// If the receiver reports a Poco::Timespan, the replica is finished with disconnect;
/// SOCKET_TIMEOUT is thrown when its offset is then left without active connections
/// and without a new replica in process.
bool HedgedConnections::resumePacketReceiver(const HedgedConnections::ReplicaLocation & location)
{
    ReplicaState & replica = offset_states[location.offset].replicas[location.index];
    auto resume_result = replica.packet_receiver->resume();

    if (std::holds_alternative<Packet>(resume_result))
    {
        last_received_packet = std::move(std::get<Packet>(resume_result));
        return true;
    }

    /// Neither a packet nor a timeout: nothing to do for this replica yet.
    if (!std::holds_alternative<Poco::Timespan>(resume_result))
        return false;

    finishProcessReplica(replica, true);

    /// Check if there is no more active connections with the same offset and there is no new replica in process.
    auto & offset_state = offset_states[location.offset];
    if (offset_state.active_connection_count == 0 && !offset_state.next_replica_in_process)
        throw NetException("Receive timeout expired", ErrorCodes::SOCKET_TIMEOUT);

    return false;
}
|
|
|
|
|
2021-01-27 09:33:11 +00:00
|
|
|
/// Polls epoll (third getManyReady() argument is `false`) until one event is ready
/// and returns the file descriptor stored in that event.
/// After every poll without events `async_callback`, if set, is invoked with the epoll descriptor.
int HedgedConnections::getReadyFileDescriptor(AsyncCallback async_callback)
{
    epoll_event ready_event;
    ready_event.data.fd = -1;

    size_t ready_count = 0;
    do
    {
        ready_count = epoll.getManyReady(1, &ready_event, false);

        if (ready_count == 0 && async_callback)
            async_callback(epoll.getFileDescriptor(), 0, epoll.getDescription());
    } while (ready_count == 0);

    return ready_event.data.fd;
}
|
|
|
|
|
2021-02-17 17:34:52 +00:00
|
|
|
/// Takes the packet stored in last_received_packet and updates the state of the replica
/// at `replica_location` according to the packet type, then returns the packet.
Packet HedgedConnections::receivePacketFromReplica(const ReplicaLocation & replica_location)
{
    ReplicaState & replica = offset_states[replica_location.offset].replicas[replica_location.index];
    Packet received = std::move(last_received_packet);

    switch (received.type)
    {
        case Protocol::Server::Data:
            /// The first data packet of an offset needs extra processing.
            if (!offset_states[replica_location.offset].first_packet_of_data_received)
                processReceivedFirstDataPacket(replica_location);
            [[fallthrough]];

        case Protocol::Server::PartUUIDs:
        case Protocol::Server::Progress:
        case Protocol::Server::ProfileInfo:
        case Protocol::Server::Totals:
        case Protocol::Server::Extremes:
        case Protocol::Server::Log:
            /// Remember the replica so that getReadyReplicaLocation() tries it first next time.
            replica_with_last_received_packet = replica_location;
            break;

        case Protocol::Server::EndOfStream:
            /// Finished without disconnecting.
            finishProcessReplica(replica, false);
            break;

        case Protocol::Server::Exception:
        default:
            /// An exception or an unknown packet: finish the replica and disconnect.
            finishProcessReplica(replica, true);
            break;
    }

    return received;
}
|
|
|
|
|
2021-02-15 13:21:36 +00:00
|
|
|
void HedgedConnections::processReceivedFirstDataPacket(const ReplicaLocation & replica_location)
|
2021-01-19 19:21:06 +00:00
|
|
|
{
|
2021-02-02 12:14:31 +00:00
|
|
|
/// When we receive first packet of data from replica, we stop working with replicas, that are
|
|
|
|
/// responsible for the same offset.
|
2021-02-09 02:01:09 +00:00
|
|
|
OffsetState & offset_state = offset_states[replica_location.offset];
|
2021-02-15 13:21:36 +00:00
|
|
|
offset_state.replicas[replica_location.index].change_replica_timeout.reset();
|
2021-02-06 00:54:27 +00:00
|
|
|
++offsets_with_received_first_data_packet;
|
|
|
|
offset_state.first_packet_of_data_received = true;
|
2021-01-19 19:21:06 +00:00
|
|
|
|
2021-02-09 02:01:09 +00:00
|
|
|
for (size_t i = 0; i != offset_state.replicas.size(); ++i)
|
2021-01-19 19:21:06 +00:00
|
|
|
{
|
2021-02-09 02:01:09 +00:00
|
|
|
if (i != replica_location.index && offset_state.replicas[i].connection)
|
2021-01-27 09:33:11 +00:00
|
|
|
{
|
2021-02-09 02:01:09 +00:00
|
|
|
offset_state.replicas[i].connection->sendCancel();
|
|
|
|
finishProcessReplica(offset_state.replicas[i], true);
|
2021-01-27 09:33:11 +00:00
|
|
|
}
|
2021-01-19 19:21:06 +00:00
|
|
|
}
|
2021-01-27 09:33:11 +00:00
|
|
|
|
2021-02-02 12:14:31 +00:00
|
|
|
/// If we received data from replicas with all offsets, we need to stop choosing new replicas.
|
2021-02-06 00:54:27 +00:00
|
|
|
if (hedged_connections_factory.hasEventsInProcess() && offsets_with_received_first_data_packet == offset_states.size())
|
2021-01-19 19:21:06 +00:00
|
|
|
{
|
2021-02-21 14:03:24 +00:00
|
|
|
if (hedged_connections_factory.numberOfProcessingReplicas() > 0)
|
2021-02-06 00:54:27 +00:00
|
|
|
epoll.remove(hedged_connections_factory.getFileDescriptor());
|
|
|
|
hedged_connections_factory.stopChoosingReplicas();
|
2021-01-19 19:21:06 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-02-21 14:03:24 +00:00
|
|
|
void HedgedConnections::startNewReplica()
|
2021-01-19 19:21:06 +00:00
|
|
|
{
|
2021-02-06 00:54:27 +00:00
|
|
|
Connection * connection = nullptr;
|
2021-02-21 14:03:24 +00:00
|
|
|
HedgedConnectionsFactory::State state = hedged_connections_factory.startNewConnection(connection);
|
2021-01-19 19:21:06 +00:00
|
|
|
|
2021-02-21 14:03:24 +00:00
|
|
|
/// Check if we need to add hedged_connections_factory file descriptor to epoll.
|
|
|
|
if (state == HedgedConnectionsFactory::State::NOT_READY && hedged_connections_factory.numberOfProcessingReplicas() == 1)
|
|
|
|
epoll.add(hedged_connections_factory.getFileDescriptor());
|
2021-01-27 09:33:11 +00:00
|
|
|
|
2021-02-21 14:03:24 +00:00
|
|
|
processNewReplicaState(state, connection);
|
|
|
|
}
|
2021-02-06 00:54:27 +00:00
|
|
|
|
2021-02-21 14:03:24 +00:00
|
|
|
void HedgedConnections::checkNewReplica()
|
|
|
|
{
|
|
|
|
Connection * connection = nullptr;
|
2021-02-26 15:53:40 +00:00
|
|
|
HedgedConnectionsFactory::State state = hedged_connections_factory.waitForReadyConnections(connection);
|
2021-02-17 17:34:52 +00:00
|
|
|
|
2021-02-21 14:03:24 +00:00
|
|
|
processNewReplicaState(state, connection);
|
2021-02-17 17:34:52 +00:00
|
|
|
|
2021-02-21 14:03:24 +00:00
|
|
|
/// Check if we don't need to listen hedged_connections_factory file descriptor in epoll anymore.
|
|
|
|
if (hedged_connections_factory.numberOfProcessingReplicas() == 0)
|
|
|
|
epoll.remove(hedged_connections_factory.getFileDescriptor());
|
|
|
|
}
|
2021-01-19 19:21:06 +00:00
|
|
|
|
2021-02-21 14:03:24 +00:00
|
|
|
/// Reacts to the state reported by the factory after an attempt to get a new replica.
///  - READY: `connection` becomes a new replica of the offset at the front of offsets_queue.
///  - CANNOT_CHOOSE: the queue is emptied; ALL_CONNECTION_TRIES_FAILED is thrown if a queued
///    offset has no active connection.
///  - NOT_READY: nothing to do.
void HedgedConnections::processNewReplicaState(HedgedConnectionsFactory::State state, Connection * connection)
{
    switch (state)
    {
        case HedgedConnectionsFactory::State::READY:
        {
            const size_t offset = offsets_queue.front();
            offsets_queue.pop();

            auto & offset_state = offset_states[offset];
            offset_state.replicas.emplace_back(connection);
            ++offset_state.active_connection_count;
            offset_state.next_replica_in_process = false;
            ++active_connection_count;

            ReplicaState & replica = offset_state.replicas.back();
            const size_t replica_index = offset_state.replicas.size() - 1;

            /// Register both descriptors of the new replica before running the pipeline.
            epoll.add(replica.packet_receiver->getFileDescriptor());
            fd_to_replica_location[replica.packet_receiver->getFileDescriptor()] = ReplicaLocation{offset, replica_index};
            epoll.add(replica.change_replica_timeout.getDescriptor());
            timeout_fd_to_replica_location[replica.change_replica_timeout.getDescriptor()] = ReplicaLocation{offset, replica_index};

            /// Replay on the new replica everything that was already sent to the existing ones.
            pipeline_for_new_replicas.run(replica);
            break;
        }

        case HedgedConnectionsFactory::State::CANNOT_CHOOSE:
        {
            while (!offsets_queue.empty())
            {
                auto & waiting_offset_state = offset_states[offsets_queue.front()];

                /// Check if there is no active replica with needed offsets.
                if (waiting_offset_state.active_connection_count == 0)
                    throw Exception("Cannot find enough connections to replicas", ErrorCodes::ALL_CONNECTION_TRIES_FAILED);

                waiting_offset_state.next_replica_in_process = false;
                offsets_queue.pop();
            }
            break;
        }

        case HedgedConnectionsFactory::State::NOT_READY:
            break;
    }
}
|
|
|
|
|
2021-02-09 02:01:09 +00:00
|
|
|
/// Stops working with `replica`: cancels its packet receiver, disarms its timer, removes both
/// of its descriptors from epoll and from the lookup maps and decrements the connection counters.
/// If `disconnect` is true the connection is disconnected; the connection pointer is cleared in any case.
void HedgedConnections::finishProcessReplica(ReplicaState & replica, bool disconnect)
{
    replica.packet_receiver->cancel();
    replica.change_replica_timeout.reset();

    /// Unregister the packet receiver. The offset has to be looked up before the map entry is erased.
    const int receiver_fd = replica.packet_receiver->getFileDescriptor();
    epoll.remove(receiver_fd);
    --offset_states[fd_to_replica_location[receiver_fd].offset].active_connection_count;
    fd_to_replica_location.erase(receiver_fd);

    /// Unregister the change-replica timer.
    const int timeout_fd = replica.change_replica_timeout.getDescriptor();
    epoll.remove(timeout_fd);
    timeout_fd_to_replica_location.erase(timeout_fd);

    --active_connection_count;

    if (disconnect)
    {
        replica.connection->disconnect();
    }

    /// A null connection marks the replica as finished for all the loops over replicas.
    replica.connection = nullptr;
}
|
|
|
|
|
2021-01-19 19:21:06 +00:00
|
|
|
}
|
2021-01-29 15:46:28 +00:00
|
|
|
#endif
|