Parallel replicas always skip unavailable ones (#50293)

Nikita Mikhaylov 2023-05-31 22:10:33 +02:00 committed by GitHub
parent aedd3afb8a
commit 3543d95980
8 changed files with 105 additions and 22 deletions

View File

@@ -909,6 +909,11 @@
<host>127.0.0.10</host>
<port>9000</port>
</replica>
+    <!-- Unavailable replica -->
+    <replica>
+        <host>127.0.0.11</host>
+        <port>1234</port>
+    </replica>
</shard>
</parallel_replicas>
<test_cluster_two_shards_localhost>
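The new replica entry deliberately points at 127.0.0.11:1234, a host/port pair where no server listens, so every connection attempt to it fails; that is what makes it the "unavailable replica" exercised by the test at the end of this commit. For illustration only, a minimal, self-contained sketch of what "unavailable" means at the TCP level (plain POSIX sockets, not ClickHouse's actual connection code):

    #include <arpa/inet.h>
    #include <netinet/in.h>
    #include <sys/socket.h>
    #include <unistd.h>
    #include <cstdint>
    #include <cstdio>

    // An "unavailable" replica is simply one we cannot connect to:
    // nothing listens on 127.0.0.11:1234 in the test cluster config above.
    static bool replicaReachable(const char * host, uint16_t port)
    {
        int fd = socket(AF_INET, SOCK_STREAM, 0);
        if (fd < 0)
            return false;
        sockaddr_in addr{};
        addr.sin_family = AF_INET;
        addr.sin_port = htons(port);
        inet_pton(AF_INET, host, &addr.sin_addr);
        bool ok = connect(fd, reinterpret_cast<sockaddr *>(&addr), sizeof(addr)) == 0;
        close(fd);
        return ok;
    }

    int main()
    {
        std::printf("reachable: %d\n", replicaReachable("127.0.0.11", 1234)); // prints 0
    }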

View File

@@ -468,6 +468,12 @@ InterpreterSelectQuery::InterpreterSelectQuery(
}
}
+    /// Set skip_unavailable_shards to true only if it wasn't disabled explicitly
+    if (settings.allow_experimental_parallel_reading_from_replicas > 0 && !settings.skip_unavailable_shards && !settings.isChanged("skip_unavailable_shards"))
+    {
+        context->setSetting("skip_unavailable_shards", true);
+    }
/// Check support for JOIN for parallel replicas with custom key
if (joined_tables.tablesCount() > 1 && !settings.parallel_replicas_custom_key.value.empty())
{
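The guard above has three parts: parallel replicas must be enabled, the setting must currently be off, and the user must not have set it explicitly — so an explicit SET skip_unavailable_shards = 0 still wins over the new default. A minimal sketch of this "override the default unless the user touched it" pattern, using a hypothetical BoolSetting type rather than ClickHouse's real Settings class:

    #include <iostream>

    // Hypothetical setting that records whether the user assigned it explicitly.
    struct BoolSetting
    {
        bool value = false;
        bool changed = false;                  // becomes true on an explicit SET
        void set(bool v) { value = v; changed = true; }
    };

    int main()
    {
        BoolSetting skip_unavailable_shards;
        bool parallel_replicas_enabled = true; // allow_experimental_parallel_reading_from_replicas > 0

        // Mirrors the new InterpreterSelectQuery logic: flip the default only
        // if the user never set the value themselves. Calling
        // skip_unavailable_shards.set(false) beforehand would keep it off.
        if (parallel_replicas_enabled && !skip_unavailable_shards.value && !skip_unavailable_shards.changed)
            skip_unavailable_shards.value = true;

        std::cout << skip_unavailable_shards.value << '\n'; // 1
    }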

View File

@@ -47,8 +47,7 @@ RemoteQueryExecutor::RemoteQueryExecutor(
QueryProcessingStage::Enum stage_, std::optional<Extension> extension_)
: header(header_), query(query_), context(context_), scalars(scalars_)
, external_tables(external_tables_), stage(stage_)
-    , task_iterator(extension_ ? extension_->task_iterator : nullptr)
-    , parallel_reading_coordinator(extension_ ? extension_->parallel_reading_coordinator : nullptr)
+    , extension(extension_)
{}
RemoteQueryExecutor::RemoteQueryExecutor(
@@ -90,8 +89,7 @@ RemoteQueryExecutor::RemoteQueryExecutor(
QueryProcessingStage::Enum stage_, std::optional<Extension> extension_)
: header(header_), query(query_), context(context_)
, scalars(scalars_), external_tables(external_tables_), stage(stage_)
-    , task_iterator(extension_ ? extension_->task_iterator : nullptr)
-    , parallel_reading_coordinator(extension_ ? extension_->parallel_reading_coordinator : nullptr)
+    , extension(extension_)
{
create_connections = [this, connections_, throttler, extension_](AsyncCallback) mutable {
auto res = std::make_unique<MultiplexedConnections>(std::move(connections_), context->getSettingsRef(), throttler);
@@ -108,8 +106,7 @@ RemoteQueryExecutor::RemoteQueryExecutor(
QueryProcessingStage::Enum stage_, std::optional<Extension> extension_)
: header(header_), query(query_), context(context_)
, scalars(scalars_), external_tables(external_tables_), stage(stage_)
-    , task_iterator(extension_ ? extension_->task_iterator : nullptr)
-    , parallel_reading_coordinator(extension_ ? extension_->parallel_reading_coordinator : nullptr)
+    , extension(extension_)
{
create_connections = [this, pool, throttler, extension_](AsyncCallback async_callback)->std::unique_ptr<IConnections>
{
@@ -247,6 +244,13 @@ void RemoteQueryExecutor::sendQueryUnlocked(ClientInfo::QueryKind query_kind, As
finished = true;
sent_query = true;
+        /// We need to tell the coordinator not to wait for this replica.
+        if (extension && extension->parallel_reading_coordinator)
+        {
+            chassert(extension->replica_info);
+            extension->parallel_reading_coordinator->markReplicaAsUnavailable(extension->replica_info->number_of_current_replica);
+        }
return;
}
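This is the path where the replica could not even receive the query (all connection attempts failed and skip_unavailable_shards is in effect): the executor marks itself finished and, crucially, tells the coordinator not to wait for an announcement from that replica. The next hunk applies the same notification when a replica dies mid-read. A rough sketch of the bookkeeping this enables on the coordinator side, with hypothetical names (the real coordinator changes appear further down in this commit):

    #include <cstddef>
    #include <iostream>
    #include <vector>

    // Hypothetical stand-in for the coordinator: it expects an initial
    // announcement from every replica not marked unavailable.
    struct Coordinator
    {
        std::vector<bool> unavailable;
        size_t announced = 0;

        explicit Coordinator(size_t replicas) : unavailable(replicas, false) {}

        void markReplicaAsUnavailable(size_t n) { unavailable[n] = true; }

        bool allLiveReplicasAnnounced() const
        {
            size_t dead = 0;
            for (bool u : unavailable)
                dead += u;
            return announced == unavailable.size() - dead;
        }
    };

    int main()
    {
        Coordinator c(3);
        c.announced = 2;                                   // replicas 0 and 1 announced
        std::cout << c.allLiveReplicasAnnounced() << '\n'; // 0: still waiting for replica 2
        c.markReplicaAsUnavailable(2);                     // replica 2 never connected
        std::cout << c.allLiveReplicasAnnounced() << '\n'; // 1: nothing left to wait for
    }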
@@ -360,7 +364,18 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::readAsync()
read_context->resume();
if (needToSkipUnavailableShard())
{
+        /// We need to tell the coordinator not to wait for this replica.
+        /// But at this point it may lead to an incomplete result set, because
+        /// this replica committed to read some part of the data and then died.
+        if (extension && extension->parallel_reading_coordinator)
+        {
+            chassert(extension->replica_info);
+            extension->parallel_reading_coordinator->markReplicaAsUnavailable(extension->replica_info->number_of_current_replica);
+        }
return ReadResult(Block());
}
/// Check if packet is not ready yet.
if (read_context->isInProgress())
@@ -524,30 +539,30 @@ bool RemoteQueryExecutor::setPartUUIDs(const std::vector<UUID> & uuids)
void RemoteQueryExecutor::processReadTaskRequest()
{
-    if (!task_iterator)
+    if (!extension || !extension->task_iterator)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Distributed task iterator is not initialized");
ProfileEvents::increment(ProfileEvents::ReadTaskRequestsReceived);
-    auto response = (*task_iterator)();
+    auto response = (*extension->task_iterator)();
connections->sendReadTaskResponse(response);
}
void RemoteQueryExecutor::processMergeTreeReadTaskRequest(ParallelReadRequest request)
{
-    if (!parallel_reading_coordinator)
+    if (!extension || !extension->parallel_reading_coordinator)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Coordinator for parallel reading from replicas is not initialized");
ProfileEvents::increment(ProfileEvents::MergeTreeReadTaskRequestsReceived);
-    auto response = parallel_reading_coordinator->handleRequest(std::move(request));
+    auto response = extension->parallel_reading_coordinator->handleRequest(std::move(request));
connections->sendMergeTreeReadTaskResponse(response);
}
void RemoteQueryExecutor::processMergeTreeInitialReadAnnounecement(InitialAllRangesAnnouncement announcement)
{
-    if (!parallel_reading_coordinator)
+    if (!extension || !extension->parallel_reading_coordinator)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Coordinator for parallel reading from replicas is not initialized");
-    parallel_reading_coordinator->handleInitialAllRangesAnnouncement(announcement);
+    extension->parallel_reading_coordinator->handleInitialAllRangesAnnouncement(announcement);
}
void RemoteQueryExecutor::finish()
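Throughout this file, the two members that used to be copied out of the optional extension (task_iterator and parallel_reading_coordinator) are replaced by storing the whole std::optional<Extension> — the header change in the next file — so every access now needs a two-level check: the optional may be empty, and an engaged optional may still hold null pointers. A minimal sketch of that guard pattern, with a hypothetical Extension mirroring the real one:

    #include <memory>
    #include <optional>
    #include <stdexcept>

    struct TaskIterator {};
    struct ParallelReplicasReadingCoordinator {};

    // Hypothetical mirror of RemoteQueryExecutor::Extension.
    struct Extension
    {
        std::shared_ptr<TaskIterator> task_iterator;
        std::shared_ptr<ParallelReplicasReadingCoordinator> parallel_reading_coordinator;
    };

    struct Executor
    {
        std::optional<Extension> extension;

        void processReadTaskRequest()
        {
            // Both halves of the check matter: `!extension` catches a plain
            // (non-distributed) query, the second a missing iterator.
            if (!extension || !extension->task_iterator)
                throw std::logic_error("Distributed task iterator is not initialized");
            // ... use *extension->task_iterator ...
        }
    };

    int main()
    {
        Executor e;
        try { e.processReadTaskRequest(); }
        catch (const std::logic_error &) { /* expected: extension was never set */ }
    }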

View File

@@ -212,11 +212,11 @@ private:
/// Temporary tables needed to be sent to remote servers
Tables external_tables;
QueryProcessingStage::Enum stage;
-    /// Initiator identifier for distributed task processing
-    std::shared_ptr<TaskIterator> task_iterator;
-    std::shared_ptr<ParallelReplicasReadingCoordinator> parallel_reading_coordinator;
+    std::optional<Extension> extension;
/// This is needed only for parallel reading from replicas, because
/// we create a RemoteQueryExecutor per replica and have to store additional info
/// about the number of the current replica or the total number of replicas.

View File

@@ -19,6 +19,7 @@
#include "Storages/MergeTree/RequestResponse.h"
#include <Storages/MergeTree/MarkRange.h>
#include <Storages/MergeTree/IntersectionsIndexes.h>
+#include <fmt/core.h>
#include <fmt/format.h>
namespace DB
@@ -61,18 +62,22 @@ public:
{
size_t number_of_requests{0};
size_t sum_marks{0};
+        bool is_unavailable{false};
};
using Stats = std::vector<Stat>;
static String toString(Stats stats)
{
String result = "Statistics: ";
+        std::vector<String> stats_by_replica;
for (size_t i = 0; i < stats.size(); ++i)
result += fmt::format("-- replica {}, requests: {} marks: {} ", i, stats[i].number_of_requests, stats[i].sum_marks);
stats_by_replica.push_back(fmt::format("replica {}{} - {{requests: {} marks: {}}}", i, stats[i].is_unavailable ? " is unavailable" : "", stats[i].number_of_requests, stats[i].sum_marks));
result += fmt::format("{}", fmt::join(stats_by_replica, "; "));
return result;
}
Stats stats;
-    size_t replicas_count;
+    size_t replicas_count{0};
+    size_t unavailable_replicas_count{0};
explicit ImplInterface(size_t replicas_count_)
: stats{replicas_count_}
@@ -82,6 +87,7 @@ public:
virtual ~ImplInterface() = default;
virtual ParallelReadResponse handleRequest(ParallelReadRequest request) = 0;
virtual void handleInitialAllRangesAnnouncement(InitialAllRangesAnnouncement announcement) = 0;
+    virtual void markReplicaAsUnavailable(size_t replica_number) = 0;
};
using Parts = std::set<Part>;
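The reworked Stat::toString() above now tags unavailable replicas and joins the per-replica entries with "; " instead of concatenating them inside the loop. A self-contained version of just that formatting, runnable against the {fmt} library (fmt::join lives in fmt/ranges.h in recent {fmt} releases; older ones exposed it from fmt/format.h):

    #include <cstddef>
    #include <string>
    #include <vector>
    #include <fmt/format.h>
    #include <fmt/ranges.h>

    struct Stat
    {
        size_t number_of_requests{0};
        size_t sum_marks{0};
        bool is_unavailable{false};
    };

    // Same formatting logic as the patched toString(), extracted so it can run alone.
    static std::string statsToString(const std::vector<Stat> & stats)
    {
        std::string result = "Statistics: ";
        std::vector<std::string> stats_by_replica;
        for (size_t i = 0; i < stats.size(); ++i)
            stats_by_replica.push_back(fmt::format("replica {}{} - {{requests: {} marks: {}}}",
                i, stats[i].is_unavailable ? " is unavailable" : "",
                stats[i].number_of_requests, stats[i].sum_marks));
        result += fmt::format("{}", fmt::join(stats_by_replica, "; "));
        return result;
    }

    int main()
    {
        std::vector<Stat> stats{{3, 120, false}, {0, 0, true}};
        fmt::print("{}\n", statsToString(stats));
        // Statistics: replica 0 - {requests: 3 marks: 120}; replica 1 is unavailable - {requests: 0 marks: 0}
    }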
@@ -128,6 +134,7 @@
ParallelReadResponse handleRequest(ParallelReadRequest request) override;
void handleInitialAllRangesAnnouncement(InitialAllRangesAnnouncement announcement) override;
+    void markReplicaAsUnavailable(size_t replica_number) override;
void updateReadingState(const InitialAllRangesAnnouncement & announcement);
void finalizeReadingState();
@@ -199,6 +206,17 @@ void DefaultCoordinator::updateReadingState(const InitialAllRangesAnnouncement &
}
}
+void DefaultCoordinator::markReplicaAsUnavailable(size_t replica_number)
+{
+    LOG_DEBUG(log, "Replica number {} is unavailable", replica_number);
+
+    ++unavailable_replicas_count;
+    stats[replica_number].is_unavailable = true;
+
+    if (sent_initial_requests == replicas_count - unavailable_replicas_count)
+        finalizeReadingState();
+}
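The finalize check here is the heart of the fix: the coordinator previously waited for an initial announcement from every one of replicas_count replicas, so a dead replica left it pending forever; now the target shrinks by unavailable_replicas_count. A small numeric sketch of the condition, using the counts from the test at the end of this commit (11 replicas, one of them dead):

    #include <cstddef>
    #include <iostream>

    int main()
    {
        size_t replicas_count = 11;
        size_t unavailable_replicas_count = 0;
        size_t sent_initial_requests = 10;  // the ten live replicas have announced

        // 10 != 11 - 0: without the fix the coordinator would keep waiting.
        std::cout << (sent_initial_requests == replicas_count - unavailable_replicas_count) << '\n'; // 0

        ++unavailable_replicas_count;       // markReplicaAsUnavailable(10)

        // 10 == 11 - 1: now finalizeReadingState() can run.
        std::cout << (sent_initial_requests == replicas_count - unavailable_replicas_count) << '\n'; // 1
    }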
void DefaultCoordinator::finalizeReadingState()
{
/// Clear all the delayed queue
@@ -345,12 +363,23 @@ public:
ParallelReadResponse handleRequest([[ maybe_unused ]] ParallelReadRequest request) override;
void handleInitialAllRangesAnnouncement([[ maybe_unused ]] InitialAllRangesAnnouncement announcement) override;
+    void markReplicaAsUnavailable(size_t replica_number) override;
Parts all_parts_to_read;
Poco::Logger * log = &Poco::Logger::get(fmt::format("{}{}", magic_enum::enum_name(mode), "Coordinator"));
};
+template <CoordinationMode mode>
+void InOrderCoordinator<mode>::markReplicaAsUnavailable(size_t replica_number)
+{
+    LOG_DEBUG(log, "Replica number {} is unavailable", replica_number);
+
+    stats[replica_number].is_unavailable = true;
+    ++unavailable_replicas_count;
+
+    /// There is nothing else to do.
+}
template <CoordinationMode mode>
void InOrderCoordinator<mode>::handleInitialAllRangesAnnouncement(InitialAllRangesAnnouncement announcement)
@@ -388,7 +417,6 @@ void InOrderCoordinator<mode>::handleInitialAllRangesAnnouncement(InitialAllRang
}
}
template <CoordinationMode mode>
ParallelReadResponse InOrderCoordinator<mode>::handleRequest(ParallelReadRequest request)
{
@@ -486,7 +514,7 @@ void ParallelReplicasReadingCoordinator::handleInitialAllRangesAnnouncement(Init
if (!pimpl)
{
-        setMode(announcement.mode);
+        mode = announcement.mode;
initialize();
}
@@ -500,16 +528,23 @@ ParallelReadResponse ParallelReplicasReadingCoordinator::handleRequest(ParallelR
if (!pimpl)
{
-        setMode(request.mode);
+        mode = request.mode;
initialize();
}
return pimpl->handleRequest(std::move(request));
}
-void ParallelReplicasReadingCoordinator::setMode(CoordinationMode mode_)
+void ParallelReplicasReadingCoordinator::markReplicaAsUnavailable(size_t replica_number)
{
-    mode = mode_;
+    std::lock_guard lock(mutex);
+
+    if (!pimpl)
+    {
+        initialize();
+    }
+
+    return pimpl->markReplicaAsUnavailable(replica_number);
}
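Note the lazy initialization: markReplicaAsUnavailable may be the very first call the coordinator receives, because a dead replica never sends the announcement that would normally create the pimpl, so the method must initialize it itself under the same mutex as the other entry points. A minimal sketch of the pattern, with a hypothetical Impl standing in for the real coordinator implementations:

    #include <cstddef>
    #include <iostream>
    #include <memory>
    #include <mutex>

    struct Impl
    {
        void markReplicaAsUnavailable(size_t n) { std::cout << "replica " << n << " marked\n"; }
    };

    struct Coordinator
    {
        std::mutex mutex;
        std::unique_ptr<Impl> pimpl;

        void markReplicaAsUnavailable(size_t replica_number)
        {
            std::lock_guard lock(mutex);
            if (!pimpl)
                pimpl = std::make_unique<Impl>(); // lazy initialize(), as in the patch
            pimpl->markReplicaAsUnavailable(replica_number);
        }
    };

    int main()
    {
        Coordinator c;
        c.markReplicaAsUnavailable(10); // safe even before any announcement arrived
    }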
void ParallelReplicasReadingCoordinator::initialize()

View File

@@ -18,10 +18,15 @@ public:
explicit ParallelReplicasReadingCoordinator(size_t replicas_count_);
~ParallelReplicasReadingCoordinator();
-    void setMode(CoordinationMode mode);
void handleInitialAllRangesAnnouncement(InitialAllRangesAnnouncement);
ParallelReadResponse handleRequest(ParallelReadRequest request);
+    /// Called when some replica is unavailable and we skipped it.
+    /// This is needed to "finalize" the reading state, e.g. to spread all the marks using
+    /// consistent hashing, because otherwise the coordinator will keep working in a
+    /// "pending" state, waiting for the unavailable replica to send its announcement.
+    void markReplicaAsUnavailable(size_t replica_number);
private:
void initialize();

View File

@@ -0,0 +1,15 @@
+DROP TABLE IF EXISTS test_parallel_replicas_unavailable_shards;
+CREATE TABLE test_parallel_replicas_unavailable_shards (n UInt64) ENGINE=MergeTree() ORDER BY tuple();
+INSERT INTO test_parallel_replicas_unavailable_shards SELECT * FROM numbers(10);
+
+SYSTEM FLUSH LOGS;
+
+SET skip_unavailable_shards=1, allow_experimental_parallel_reading_from_replicas=1, max_parallel_replicas=11, use_hedged_requests=0, cluster_for_parallel_replicas='parallel_replicas', parallel_replicas_for_non_replicated_merge_tree=1;
+SET send_logs_level='error';
+SELECT count() FROM test_parallel_replicas_unavailable_shards WHERE NOT ignore(*);
+
+SYSTEM FLUSH LOGS;
+
+SELECT count() > 0 FROM system.text_log WHERE yesterday() <= event_date AND message LIKE '%Replica number 10 is unavailable%';
+
+DROP TABLE test_parallel_replicas_unavailable_shards;
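A note on the numbers in this test: the cluster configured above appears to consist of eleven replicas (127.0.0.1 through 127.0.0.11, hence max_parallel_replicas=11), of which only the last is unreachable. The query is therefore expected to return the full count of 10 rows despite the dead replica, and the text_log probe asserts that "Replica number 10 is unavailable" was logged — 10 being the zero-based index of the eleventh replica.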