diff --git a/programs/server/config.xml b/programs/server/config.xml
index 5b69d9f6283..d18b4cb2ac9 100644
--- a/programs/server/config.xml
+++ b/programs/server/config.xml
@@ -909,6 +909,11 @@
<host>127.0.0.10</host>
<port>9000</port>
+
+ <replica>
+ <host>127.0.0.11</host>
+ <port>1234</port>
+ </replica>
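+ <!-- Nothing listens on this port: queries against this cluster exercise the handling of unavailable replicas. -->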
diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp
index a4ea474e4e9..d2be48dafb3 100644
--- a/src/Interpreters/InterpreterSelectQuery.cpp
+++ b/src/Interpreters/InterpreterSelectQuery.cpp
@@ -468,6 +468,12 @@ InterpreterSelectQuery::InterpreterSelectQuery(
}
}
+ /// Set skip_unavailable_shards to true only if it wasn't disabled explicitly
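+ /// With parallel replicas every replica of the cluster is addressed like a separate shard,
+ /// so enabling this lets the query survive replicas that cannot be reached.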
+ if (settings.allow_experimental_parallel_reading_from_replicas > 0 && !settings.skip_unavailable_shards && !settings.isChanged("skip_unavailable_shards"))
+ {
+ context->setSetting("skip_unavailable_shards", true);
+ }
+
/// Check support for JOIN for parallel replicas with custom key
if (joined_tables.tablesCount() > 1 && !settings.parallel_replicas_custom_key.value.empty())
{
diff --git a/src/QueryPipeline/RemoteQueryExecutor.cpp b/src/QueryPipeline/RemoteQueryExecutor.cpp
index 991876eccbd..3f9f945fd45 100644
--- a/src/QueryPipeline/RemoteQueryExecutor.cpp
+++ b/src/QueryPipeline/RemoteQueryExecutor.cpp
@@ -47,8 +47,7 @@ RemoteQueryExecutor::RemoteQueryExecutor(
QueryProcessingStage::Enum stage_, std::optional<Extension> extension_)
: header(header_), query(query_), context(context_), scalars(scalars_)
, external_tables(external_tables_), stage(stage_)
- , task_iterator(extension_ ? extension_->task_iterator : nullptr)
- , parallel_reading_coordinator(extension_ ? extension_->parallel_reading_coordinator : nullptr)
+ , extension(extension_)
{}
RemoteQueryExecutor::RemoteQueryExecutor(
@@ -90,8 +89,7 @@ RemoteQueryExecutor::RemoteQueryExecutor(
QueryProcessingStage::Enum stage_, std::optional<Extension> extension_)
: header(header_), query(query_), context(context_)
, scalars(scalars_), external_tables(external_tables_), stage(stage_)
- , task_iterator(extension_ ? extension_->task_iterator : nullptr)
- , parallel_reading_coordinator(extension_ ? extension_->parallel_reading_coordinator : nullptr)
+ , extension(extension_)
{
create_connections = [this, connections_, throttler, extension_](AsyncCallback) mutable {
auto res = std::make_unique<MultiplexedConnections>(std::move(connections_), context->getSettingsRef(), throttler);
@@ -108,8 +106,7 @@ RemoteQueryExecutor::RemoteQueryExecutor(
QueryProcessingStage::Enum stage_, std::optional<Extension> extension_)
: header(header_), query(query_), context(context_)
, scalars(scalars_), external_tables(external_tables_), stage(stage_)
- , task_iterator(extension_ ? extension_->task_iterator : nullptr)
- , parallel_reading_coordinator(extension_ ? extension_->parallel_reading_coordinator : nullptr)
+ , extension(extension_)
{
create_connections = [this, pool, throttler, extension_](AsyncCallback async_callback) -> std::unique_ptr<IConnections>
{
@@ -247,6 +244,13 @@ void RemoteQueryExecutor::sendQueryUnlocked(ClientInfo::QueryKind query_kind, As
finished = true;
sent_query = true;
+ /// We need to tell the coordinator not to wait for this replica.
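+ /// The query was never even sent to this replica, so no announcement or read request
+ /// will ever arrive from it.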
+ if (extension && extension->parallel_reading_coordinator)
+ {
+ chassert(extension->replica_info);
+ extension->parallel_reading_coordinator->markReplicaAsUnavailable(extension->replica_info->number_of_current_replica);
+ }
+
return;
}
@@ -360,7 +364,18 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::readAsync()
read_context->resume();
if (needToSkipUnavailableShard())
+ {
+ /// We need to tell the coordinator not to wait for this replica.
+ /// But at this point it may lead to an incomplete result set, because
+ /// this replica committed to read some part of the data and then died.
+ if (extension && extension->parallel_reading_coordinator)
+ {
+ chassert(extension->replica_info);
+ extension->parallel_reading_coordinator->markReplicaAsUnavailable(extension->replica_info->number_of_current_replica);
+ }
+
return ReadResult(Block());
+ }
/// Check if packet is not ready yet.
if (read_context->isInProgress())
@@ -524,30 +539,30 @@ bool RemoteQueryExecutor::setPartUUIDs(const std::vector<UUID> & uuids)
void RemoteQueryExecutor::processReadTaskRequest()
{
- if (!task_iterator)
+ if (!extension || !extension->task_iterator)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Distributed task iterator is not initialized");
ProfileEvents::increment(ProfileEvents::ReadTaskRequestsReceived);
- auto response = (*task_iterator)();
+ auto response = (*extension->task_iterator)();
connections->sendReadTaskResponse(response);
}
void RemoteQueryExecutor::processMergeTreeReadTaskRequest(ParallelReadRequest request)
{
- if (!parallel_reading_coordinator)
+ if (!extension || !extension->parallel_reading_coordinator)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Coordinator for parallel reading from replicas is not initialized");
ProfileEvents::increment(ProfileEvents::MergeTreeReadTaskRequestsReceived);
- auto response = parallel_reading_coordinator->handleRequest(std::move(request));
+ auto response = extension->parallel_reading_coordinator->handleRequest(std::move(request));
connections->sendMergeTreeReadTaskResponse(response);
}
void RemoteQueryExecutor::processMergeTreeInitialReadAnnounecement(InitialAllRangesAnnouncement announcement)
{
- if (!parallel_reading_coordinator)
+ if (!extension || !extension->parallel_reading_coordinator)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Coordinator for parallel reading from replicas is not initialized");
- parallel_reading_coordinator->handleInitialAllRangesAnnouncement(announcement);
+ extension->parallel_reading_coordinator->handleInitialAllRangesAnnouncement(announcement);
}
void RemoteQueryExecutor::finish()
diff --git a/src/QueryPipeline/RemoteQueryExecutor.h b/src/QueryPipeline/RemoteQueryExecutor.h
index 96dc5510bf4..a843ce520de 100644
--- a/src/QueryPipeline/RemoteQueryExecutor.h
+++ b/src/QueryPipeline/RemoteQueryExecutor.h
@@ -212,11 +212,11 @@ private:
/// Temporary tables needed to be sent to remote servers
Tables external_tables;
QueryProcessingStage::Enum stage;
+
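+ /// Groups the extra state used for distributed task processing and parallel reading
+ /// from replicas: the task iterator, the reading coordinator and this replica's info.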
+ std::optional<Extension> extension;
/// Initiator identifier for distributed task processing
std::shared_ptr<TaskIterator> task_iterator;
- std::shared_ptr<ParallelReplicasReadingCoordinator> parallel_reading_coordinator;
-
/// This is needed only for parallel reading from replicas, because
/// we create a RemoteQueryExecutor per replica and have to store additional info
/// about the number of the current replica and the total number of replicas.
diff --git a/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp b/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp
index ee38cecb9c4..bb044d15ba2 100644
--- a/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp
+++ b/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp
@@ -19,6 +19,7 @@
#include "Storages/MergeTree/RequestResponse.h"
#include <Storages/MergeTree/MarkRange.h>
#include <Storages/MergeTree/IntersectionsIndexes.h>
+#include <fmt/format.h>
#include <fmt/core.h>
namespace DB
@@ -61,18 +62,22 @@ public:
{
size_t number_of_requests{0};
size_t sum_marks{0};
+ bool is_unavailable{false};
};
using Stats = std::vector<Stat>;
static String toString(Stats stats)
{
String result = "Statistics: ";
+ std::vector<String> stats_by_replica;
for (size_t i = 0; i < stats.size(); ++i)
- result += fmt::format("-- replica {}, requests: {} marks: {} ", i, stats[i].number_of_requests, stats[i].sum_marks);
+ stats_by_replica.push_back(fmt::format("replica {}{} - {{requests: {} marks: {}}}", i, stats[i].is_unavailable ? " is unavailable" : "", stats[i].number_of_requests, stats[i].sum_marks));
+ result += fmt::format("{}", fmt::join(stats_by_replica, "; "));
return result;
}
Stats stats;
- size_t replicas_count;
+ size_t replicas_count{0};
+ size_t unavailable_replicas_count{0};
explicit ImplInterface(size_t replicas_count_)
: stats{replicas_count_}
@@ -82,6 +87,7 @@ public:
virtual ~ImplInterface() = default;
virtual ParallelReadResponse handleRequest(ParallelReadRequest request) = 0;
virtual void handleInitialAllRangesAnnouncement(InitialAllRangesAnnouncement announcement) = 0;
+ virtual void markReplicaAsUnavailable(size_t replica_number) = 0;
};
using Parts = std::set<Part>;
@@ -128,6 +134,7 @@ public:
ParallelReadResponse handleRequest(ParallelReadRequest request) override;
void handleInitialAllRangesAnnouncement(InitialAllRangesAnnouncement announcement) override;
+ void markReplicaAsUnavailable(size_t replica_number) override;
void updateReadingState(const InitialAllRangesAnnouncement & announcement);
void finalizeReadingState();
@@ -199,6 +206,17 @@ void DefaultCoordinator::updateReadingState(const InitialAllRangesAnnouncement &
}
}
+void DefaultCoordinator::markReplicaAsUnavailable(size_t replica_number)
+{
+ LOG_DEBUG(log, "Replica number {} is unavailable", replica_number);
+
+ ++unavailable_replicas_count;
+ stats[replica_number].is_unavailable = true;
+
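+ /// If every replica that is still available has already sent its initial announcement,
+ /// no more announcements will arrive, so the reading state can be finalized right away.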
+ if (sent_initial_requests == replicas_count - unavailable_replicas_count)
+ finalizeReadingState();
+}
+
void DefaultCoordinator::finalizeReadingState()
{
/// Clear all the delayed queue
@@ -345,12 +363,23 @@ public:
ParallelReadResponse handleRequest([[ maybe_unused ]] ParallelReadRequest request) override;
void handleInitialAllRangesAnnouncement([[ maybe_unused ]] InitialAllRangesAnnouncement announcement) override;
+ void markReplicaAsUnavailable(size_t replica_number) override;
Parts all_parts_to_read;
Poco::Logger * log = &Poco::Logger::get(fmt::format("{}{}", magic_enum::enum_name(mode), "Coordinator"));
};
+template <CoordinationMode mode>
+void InOrderCoordinator<mode>::markReplicaAsUnavailable(size_t replica_number)
+{
+ LOG_DEBUG(log, "Replica number {} is unavailable", replica_number);
+
+ stats[replica_number].is_unavailable = true;
+ ++unavailable_replicas_count;
+
+ /// There is nothing else to do.
+}
template <CoordinationMode mode>
void InOrderCoordinator<mode>::handleInitialAllRangesAnnouncement(InitialAllRangesAnnouncement announcement)
@@ -388,7 +417,6 @@ void InOrderCoordinator<mode>::handleInitialAllRangesAnnouncement(InitialAllRangesAnnouncement announcement)
}
}
-
template <CoordinationMode mode>
ParallelReadResponse InOrderCoordinator<mode>::handleRequest(ParallelReadRequest request)
{
@@ -486,7 +514,7 @@ void ParallelReplicasReadingCoordinator::handleInitialAllRangesAnnouncement(InitialAllRangesAnnouncement announcement)
if (!pimpl)
{
- setMode(announcement.mode);
+ mode = announcement.mode;
initialize();
}
@@ -500,16 +528,23 @@ ParallelReadResponse ParallelReplicasReadingCoordinator::handleRequest(ParallelReadRequest request)
if (!pimpl)
{
- setMode(request.mode);
+ mode = request.mode;
initialize();
}
return pimpl->handleRequest(std::move(request));
}
-void ParallelReplicasReadingCoordinator::setMode(CoordinationMode mode_)
+void ParallelReplicasReadingCoordinator::markReplicaAsUnavailable(size_t replica_number)
{
- mode = mode_;
+ std::lock_guard lock(mutex);
+
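+ /// A replica may fail before any announcement or request has reached the coordinator;
+ /// in that case the implementation is created here, with the currently set mode.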
+ if (!pimpl)
+ {
+ initialize();
+ }
+
+ return pimpl->markReplicaAsUnavailable(replica_number);
}
void ParallelReplicasReadingCoordinator::initialize()
diff --git a/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.h b/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.h
index 0f41d24a9c6..ad8229be2d0 100644
--- a/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.h
+++ b/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.h
@@ -18,10 +18,15 @@ public:
explicit ParallelReplicasReadingCoordinator(size_t replicas_count_);
~ParallelReplicasReadingCoordinator();
- void setMode(CoordinationMode mode);
void handleInitialAllRangesAnnouncement(InitialAllRangesAnnouncement);
ParallelReadResponse handleRequest(ParallelReadRequest request);
+ /// Called when a replica is unavailable and we have skipped it.
+ /// This is needed to "finalize" the reading state, e.g. to spread all the marks between
+ /// the remaining replicas using consistent hashing, because otherwise the coordinator
+ /// would keep working in a "pending" state, waiting for the unavailable replica's announcement.
+ void markReplicaAsUnavailable(size_t replica_number);
+
private:
void initialize();
diff --git a/tests/queries/0_stateless/02769_parallel_replicas_unavailable_shards.reference b/tests/queries/0_stateless/02769_parallel_replicas_unavailable_shards.reference
new file mode 100644
index 00000000000..af81158ecae
--- /dev/null
+++ b/tests/queries/0_stateless/02769_parallel_replicas_unavailable_shards.reference
@@ -0,0 +1,2 @@
+10
+1
diff --git a/tests/queries/0_stateless/02769_parallel_replicas_unavailable_shards.sql b/tests/queries/0_stateless/02769_parallel_replicas_unavailable_shards.sql
new file mode 100644
index 00000000000..ecc243b9c89
--- /dev/null
+++ b/tests/queries/0_stateless/02769_parallel_replicas_unavailable_shards.sql
@@ -0,0 +1,15 @@
+DROP TABLE IF EXISTS test_parallel_replicas_unavailable_shards;
+CREATE TABLE test_parallel_replicas_unavailable_shards (n UInt64) ENGINE=MergeTree() ORDER BY tuple();
+INSERT INTO test_parallel_replicas_unavailable_shards SELECT * FROM numbers(10);
+
+SYSTEM FLUSH LOGS;
+
+SET skip_unavailable_shards=1, allow_experimental_parallel_reading_from_replicas=1, max_parallel_replicas=11, use_hedged_requests=0, cluster_for_parallel_replicas='parallel_replicas', parallel_replicas_for_non_replicated_merge_tree=1;
+SET send_logs_level='error';
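+-- The cluster has 11 replicas, but nothing is running on 127.0.0.11:1234, so the coordinator
+-- should mark replica number 10 as unavailable while the query still returns the full result.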
+SELECT count() FROM test_parallel_replicas_unavailable_shards WHERE NOT ignore(*);
+
+SYSTEM FLUSH LOGS;
+
+SELECT count() > 0 FROM system.text_log WHERE yesterday() <= event_date AND message LIKE '%Replica number 10 is unavailable%';
+
+DROP TABLE test_parallel_replicas_unavailable_shards;