From b6a790124cd670749b4c504f58a4854307bf7d83 Mon Sep 17 00:00:00 2001 From: Igor Nikonov Date: Tue, 16 Jul 2024 20:16:47 +0000 Subject: [PATCH 01/11] Handling parallel replicas protocol with priority for async communication --- src/Processors/IProcessor.h | 2 + src/Processors/Sources/RemoteSource.cpp | 23 +++++++++++ src/Processors/Sources/RemoteSource.h | 3 ++ src/QueryPipeline/RemoteQueryExecutor.cpp | 38 +++++++++++++++++-- src/QueryPipeline/RemoteQueryExecutor.h | 4 +- .../RemoteQueryExecutorReadContext.h | 2 + 6 files changed, 67 insertions(+), 5 deletions(-) diff --git a/src/Processors/IProcessor.h b/src/Processors/IProcessor.h index 02f7b6b3d12..358983a2179 100644 --- a/src/Processors/IProcessor.h +++ b/src/Processors/IProcessor.h @@ -221,6 +221,8 @@ public: throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method 'schedule' is not implemented for {} processor", getName()); } + virtual void asyncJobReady() {} + /** You must call this method if 'prepare' returned ExpandPipeline. * This method cannot access any port, but it can create new ports for current processor. * diff --git a/src/Processors/Sources/RemoteSource.cpp b/src/Processors/Sources/RemoteSource.cpp index 3d7dd3f76b8..f1d47f69782 100644 --- a/src/Processors/Sources/RemoteSource.cpp +++ b/src/Processors/Sources/RemoteSource.cpp @@ -89,6 +89,12 @@ ISource::Status RemoteSource::prepare() void RemoteSource::work() { + if (async_immediate_work.exchange(false)) + { + LOG_DEBUG(getLogger(__PRETTY_FUNCTION__), "async_immediate_work was true"); + return; + } + /// Connection drain is a heavy operation that may take a long time. /// Therefore we move connection drain from prepare() to work(), and drain multiple connections in parallel. /// See issue: https://github.com/ClickHouse/ClickHouse/issues/60844 @@ -101,6 +107,23 @@ void RemoteSource::work() ISource::work(); } +void RemoteSource::asyncJobReady() +{ + chassert(async_read); + + if (!was_query_sent) + return; + + auto res = query_executor->readAsync(/*probe=*/true); + if (res.type == RemoteQueryExecutor::ReadResult::Type::ParallelReplicasToken) + { + LOG_DEBUG(getLogger(__PRETTY_FUNCTION__), "async_immediate_work is {}", async_immediate_work); + work(); + async_immediate_work = true; + LOG_DEBUG(getLogger(__PRETTY_FUNCTION__), "async_immediate_work is true"); + } +} + std::optional RemoteSource::tryGenerate() { /// onCancel() will do the cancel if the query was sent. diff --git a/src/Processors/Sources/RemoteSource.h b/src/Processors/Sources/RemoteSource.h index 052567bc261..fa04985f101 100644 --- a/src/Processors/Sources/RemoteSource.h +++ b/src/Processors/Sources/RemoteSource.h @@ -32,6 +32,8 @@ public: int schedule() override { return fd; } + void asyncJobReady() override; + void setStorageLimits(const std::shared_ptr & storage_limits_) override; protected: @@ -52,6 +54,7 @@ private: int fd = -1; size_t rows = 0; bool manually_add_rows_before_limit_counter = false; + std::atomic_bool async_immediate_work{false}; }; /// Totals source from RemoteQueryExecutor. diff --git a/src/QueryPipeline/RemoteQueryExecutor.cpp b/src/QueryPipeline/RemoteQueryExecutor.cpp index b08f2002f64..3ca05b53417 100644 --- a/src/QueryPipeline/RemoteQueryExecutor.cpp +++ b/src/QueryPipeline/RemoteQueryExecutor.cpp @@ -469,7 +469,7 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::read() return restartQueryWithoutDuplicatedUUIDs(); } -RemoteQueryExecutor::ReadResult RemoteQueryExecutor::readAsync() +RemoteQueryExecutor::ReadResult RemoteQueryExecutor::readAsync(bool check_packet_type_only) { #if defined(OS_LINUX) if (!read_context || (resent_query && recreate_read_context)) @@ -486,7 +486,21 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::readAsync() { std::lock_guard lock(was_cancelled_mutex); if (was_cancelled) + { + LOG_DEBUG(getLogger(__PRETTY_FUNCTION__), "was_cancelled"); return ReadResult(Block()); + } + + if (has_postponed_packet) + { + has_postponed_packet = false; + auto read_result = processPacket(read_context->getPacket()); + if (read_result.getType() == ReadResult::Type::Data || read_result.getType() == ReadResult::Type::ParallelReplicasToken) + return read_result; + + if (got_duplicated_part_uuids) + break; + } read_context->resume(); @@ -506,12 +520,28 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::readAsync() /// Check if packet is not ready yet. if (read_context->isInProgress()) + { + LOG_DEBUG(getLogger(__PRETTY_FUNCTION__), "read_context still in progress"); return ReadResult(read_context->getFileDescriptor()); + } - auto anything = processPacket(read_context->getPacket()); + const auto packet_type = read_context->getPacketType(); + LOG_DEBUG(getLogger(__PRETTY_FUNCTION__), "Packet type: {}", packet_type); - if (anything.getType() == ReadResult::Type::Data || anything.getType() == ReadResult::Type::ParallelReplicasToken) - return anything; + if (check_packet_type_only) + { + has_postponed_packet = true; + if (packet_type == Protocol::Server::MergeTreeReadTaskRequest + || packet_type == Protocol::Server::MergeTreeAllRangesAnnouncement) + { + return ReadResult(ReadResult::Type::ParallelReplicasToken); + } + return ReadResult(ReadResult::Type::Nothing); + } + + auto read_result = processPacket(read_context->getPacket()); + if (read_result.getType() == ReadResult::Type::Data || read_result.getType() == ReadResult::Type::ParallelReplicasToken) + return read_result; if (got_duplicated_part_uuids) break; diff --git a/src/QueryPipeline/RemoteQueryExecutor.h b/src/QueryPipeline/RemoteQueryExecutor.h index 04a59cc3b7e..6849c3e0a07 100644 --- a/src/QueryPipeline/RemoteQueryExecutor.h +++ b/src/QueryPipeline/RemoteQueryExecutor.h @@ -183,7 +183,7 @@ public: ReadResult read(); /// Async variant of read. Returns ready block or file descriptor which may be used for polling. - ReadResult readAsync(); + ReadResult readAsync(bool check_packet_type_only = false); /// Receive all remain packets and finish query. /// It should be cancelled after read returned empty block. @@ -303,6 +303,8 @@ private: */ bool got_duplicated_part_uuids = false; + bool has_postponed_packet = false; + /// Parts uuids, collected from remote replicas std::vector duplicated_part_uuids; diff --git a/src/QueryPipeline/RemoteQueryExecutorReadContext.h b/src/QueryPipeline/RemoteQueryExecutorReadContext.h index b8aa8bb9111..c054e75f6f1 100644 --- a/src/QueryPipeline/RemoteQueryExecutorReadContext.h +++ b/src/QueryPipeline/RemoteQueryExecutorReadContext.h @@ -39,6 +39,8 @@ public: Packet getPacket() { return std::move(packet); } + UInt64 getPacketType() const { return packet.type; } + private: bool checkTimeout(bool blocking = false); From 523e0abb4ec329c0535602c43c17991f4ef043a3 Mon Sep 17 00:00:00 2001 From: Igor Nikonov Date: Wed, 17 Jul 2024 13:15:14 +0000 Subject: [PATCH 02/11] Remove debug logs --- src/Processors/Sources/RemoteSource.cpp | 7 +------ src/QueryPipeline/RemoteQueryExecutor.cpp | 10 +--------- 2 files changed, 2 insertions(+), 15 deletions(-) diff --git a/src/Processors/Sources/RemoteSource.cpp b/src/Processors/Sources/RemoteSource.cpp index f1d47f69782..e33613564a2 100644 --- a/src/Processors/Sources/RemoteSource.cpp +++ b/src/Processors/Sources/RemoteSource.cpp @@ -90,10 +90,7 @@ ISource::Status RemoteSource::prepare() void RemoteSource::work() { if (async_immediate_work.exchange(false)) - { - LOG_DEBUG(getLogger(__PRETTY_FUNCTION__), "async_immediate_work was true"); return; - } /// Connection drain is a heavy operation that may take a long time. /// Therefore we move connection drain from prepare() to work(), and drain multiple connections in parallel. @@ -114,13 +111,11 @@ void RemoteSource::asyncJobReady() if (!was_query_sent) return; - auto res = query_executor->readAsync(/*probe=*/true); + auto res = query_executor->readAsync(/*check_packet_type_only=*/true); if (res.type == RemoteQueryExecutor::ReadResult::Type::ParallelReplicasToken) { - LOG_DEBUG(getLogger(__PRETTY_FUNCTION__), "async_immediate_work is {}", async_immediate_work); work(); async_immediate_work = true; - LOG_DEBUG(getLogger(__PRETTY_FUNCTION__), "async_immediate_work is true"); } } diff --git a/src/QueryPipeline/RemoteQueryExecutor.cpp b/src/QueryPipeline/RemoteQueryExecutor.cpp index 3ca05b53417..87f634b8334 100644 --- a/src/QueryPipeline/RemoteQueryExecutor.cpp +++ b/src/QueryPipeline/RemoteQueryExecutor.cpp @@ -486,10 +486,7 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::readAsync(bool check_packet { std::lock_guard lock(was_cancelled_mutex); if (was_cancelled) - { - LOG_DEBUG(getLogger(__PRETTY_FUNCTION__), "was_cancelled"); return ReadResult(Block()); - } if (has_postponed_packet) { @@ -520,17 +517,12 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::readAsync(bool check_packet /// Check if packet is not ready yet. if (read_context->isInProgress()) - { - LOG_DEBUG(getLogger(__PRETTY_FUNCTION__), "read_context still in progress"); return ReadResult(read_context->getFileDescriptor()); - } - - const auto packet_type = read_context->getPacketType(); - LOG_DEBUG(getLogger(__PRETTY_FUNCTION__), "Packet type: {}", packet_type); if (check_packet_type_only) { has_postponed_packet = true; + const auto packet_type = read_context->getPacketType(); if (packet_type == Protocol::Server::MergeTreeReadTaskRequest || packet_type == Protocol::Server::MergeTreeAllRangesAnnouncement) { From e806123856f5ded0f2e92f4f4b42c38132276c15 Mon Sep 17 00:00:00 2001 From: Igor Nikonov Date: Thu, 18 Jul 2024 20:30:56 +0000 Subject: [PATCH 03/11] Fix non x86 build --- src/QueryPipeline/RemoteQueryExecutor.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/QueryPipeline/RemoteQueryExecutor.cpp b/src/QueryPipeline/RemoteQueryExecutor.cpp index 87f634b8334..d7edbc9ed35 100644 --- a/src/QueryPipeline/RemoteQueryExecutor.cpp +++ b/src/QueryPipeline/RemoteQueryExecutor.cpp @@ -469,7 +469,7 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::read() return restartQueryWithoutDuplicatedUUIDs(); } -RemoteQueryExecutor::ReadResult RemoteQueryExecutor::readAsync(bool check_packet_type_only) +RemoteQueryExecutor::ReadResult RemoteQueryExecutor::readAsync([[maybe_unused]] bool check_packet_type_only) { #if defined(OS_LINUX) if (!read_context || (resent_query && recreate_read_context)) From 4cb862432c50848e3406899f5c7079b4cf1d62a8 Mon Sep 17 00:00:00 2001 From: Igor Nikonov Date: Fri, 19 Jul 2024 09:34:20 +0000 Subject: [PATCH 04/11] Rename method --- src/Processors/IProcessor.h | 2 +- src/Processors/Sources/RemoteSource.cpp | 2 +- src/Processors/Sources/RemoteSource.h | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Processors/IProcessor.h b/src/Processors/IProcessor.h index 358983a2179..0776921a814 100644 --- a/src/Processors/IProcessor.h +++ b/src/Processors/IProcessor.h @@ -221,7 +221,7 @@ public: throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method 'schedule' is not implemented for {} processor", getName()); } - virtual void asyncJobReady() {} + virtual void onAsyncJobReady() {} /** You must call this method if 'prepare' returned ExpandPipeline. * This method cannot access any port, but it can create new ports for current processor. diff --git a/src/Processors/Sources/RemoteSource.cpp b/src/Processors/Sources/RemoteSource.cpp index 3ec2356a121..587f6e2001b 100644 --- a/src/Processors/Sources/RemoteSource.cpp +++ b/src/Processors/Sources/RemoteSource.cpp @@ -104,7 +104,7 @@ void RemoteSource::work() ISource::work(); } -void RemoteSource::asyncJobReady() +void RemoteSource::onAsyncJobReady() { chassert(async_read); diff --git a/src/Processors/Sources/RemoteSource.h b/src/Processors/Sources/RemoteSource.h index fa04985f101..2247c781584 100644 --- a/src/Processors/Sources/RemoteSource.h +++ b/src/Processors/Sources/RemoteSource.h @@ -32,7 +32,7 @@ public: int schedule() override { return fd; } - void asyncJobReady() override; + void onAsyncJobReady() override; void setStorageLimits(const std::shared_ptr & storage_limits_) override; From 8349d260952a6daeb84c653c37ac000cf5302cfd Mon Sep 17 00:00:00 2001 From: Igor Nikonov Date: Fri, 19 Jul 2024 11:25:34 +0000 Subject: [PATCH 05/11] Simplified implementation --- src/Processors/Sources/RemoteSource.cpp | 17 +++++------ src/Processors/Sources/RemoteSource.h | 2 +- src/QueryPipeline/RemoteQueryExecutor.cpp | 37 +++++++++++++++++++++++ src/QueryPipeline/RemoteQueryExecutor.h | 3 ++ 4 files changed, 49 insertions(+), 10 deletions(-) diff --git a/src/Processors/Sources/RemoteSource.cpp b/src/Processors/Sources/RemoteSource.cpp index 587f6e2001b..46c27676e12 100644 --- a/src/Processors/Sources/RemoteSource.cpp +++ b/src/Processors/Sources/RemoteSource.cpp @@ -89,9 +89,6 @@ ISource::Status RemoteSource::prepare() void RemoteSource::work() { - if (async_immediate_work.exchange(false)) - return; - /// Connection drain is a heavy operation that may take a long time. /// Therefore we move connection drain from prepare() to work(), and drain multiple connections in parallel. /// See issue: https://github.com/ClickHouse/ClickHouse/issues/60844 @@ -101,6 +98,13 @@ void RemoteSource::work() executor_finished = true; return; } + + if (preprocessed_packet) + { + preprocessed_packet = false; + return; + } + ISource::work(); } @@ -111,12 +115,7 @@ void RemoteSource::onAsyncJobReady() if (!was_query_sent) return; - auto res = query_executor->readAsync(/*check_packet_type_only=*/true); - if (res.type == RemoteQueryExecutor::ReadResult::Type::ParallelReplicasToken) - { - work(); - async_immediate_work = true; - } + preprocessed_packet = query_executor->processParallelReplicaPacketIfAny(); } std::optional RemoteSource::tryGenerate() diff --git a/src/Processors/Sources/RemoteSource.h b/src/Processors/Sources/RemoteSource.h index 2247c781584..22d3921708b 100644 --- a/src/Processors/Sources/RemoteSource.h +++ b/src/Processors/Sources/RemoteSource.h @@ -54,7 +54,7 @@ private: int fd = -1; size_t rows = 0; bool manually_add_rows_before_limit_counter = false; - std::atomic_bool async_immediate_work{false}; + bool preprocessed_packet = false; }; /// Totals source from RemoteQueryExecutor. diff --git a/src/QueryPipeline/RemoteQueryExecutor.cpp b/src/QueryPipeline/RemoteQueryExecutor.cpp index d7edbc9ed35..b15e31a120f 100644 --- a/src/QueryPipeline/RemoteQueryExecutor.cpp +++ b/src/QueryPipeline/RemoteQueryExecutor.cpp @@ -936,4 +936,41 @@ bool RemoteQueryExecutor::needToSkipUnavailableShard() const return context->getSettingsRef().skip_unavailable_shards && (0 == connections->size()); } +bool RemoteQueryExecutor::processParallelReplicaPacketIfAny() +{ +#if defined(OS_LINUX) + if (!read_context || (resent_query && recreate_read_context)) + { + std::lock_guard lock(was_cancelled_mutex); + if (was_cancelled) + return false; + + read_context = std::make_unique(*this); + recreate_read_context = false; + } + + { + std::lock_guard lock(was_cancelled_mutex); + if (was_cancelled) + return false; + + chassert(!has_postponed_packet); + + read_context->resume(); + if (read_context->isInProgress()) // <- nothing to process + return false; + + const auto packet_type = read_context->getPacketType(); + if (packet_type == Protocol::Server::MergeTreeReadTaskRequest || packet_type == Protocol::Server::MergeTreeAllRangesAnnouncement) + { + processPacket(read_context->getPacket()); + return true; + } + + has_postponed_packet = true; + return false; + } +#endif +} + } diff --git a/src/QueryPipeline/RemoteQueryExecutor.h b/src/QueryPipeline/RemoteQueryExecutor.h index 6849c3e0a07..6f56df71f1d 100644 --- a/src/QueryPipeline/RemoteQueryExecutor.h +++ b/src/QueryPipeline/RemoteQueryExecutor.h @@ -222,6 +222,9 @@ public: bool isReplicaUnavailable() const { return extension && extension->parallel_reading_coordinator && connections->size() == 0; } + /// return true if parallel replica packet was processed + bool processParallelReplicaPacketIfAny(); + private: RemoteQueryExecutor( const String & query_, From 53ea5510143ded0862fd51922077a7cdc1344fe2 Mon Sep 17 00:00:00 2001 From: Igor Nikonov Date: Fri, 19 Jul 2024 11:30:55 +0000 Subject: [PATCH 06/11] Remove unused code --- src/QueryPipeline/RemoteQueryExecutor.cpp | 14 +------------- src/QueryPipeline/RemoteQueryExecutor.h | 2 +- 2 files changed, 2 insertions(+), 14 deletions(-) diff --git a/src/QueryPipeline/RemoteQueryExecutor.cpp b/src/QueryPipeline/RemoteQueryExecutor.cpp index b15e31a120f..b78c38a4134 100644 --- a/src/QueryPipeline/RemoteQueryExecutor.cpp +++ b/src/QueryPipeline/RemoteQueryExecutor.cpp @@ -469,7 +469,7 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::read() return restartQueryWithoutDuplicatedUUIDs(); } -RemoteQueryExecutor::ReadResult RemoteQueryExecutor::readAsync([[maybe_unused]] bool check_packet_type_only) +RemoteQueryExecutor::ReadResult RemoteQueryExecutor::readAsync() { #if defined(OS_LINUX) if (!read_context || (resent_query && recreate_read_context)) @@ -519,18 +519,6 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::readAsync([[maybe_unused]] if (read_context->isInProgress()) return ReadResult(read_context->getFileDescriptor()); - if (check_packet_type_only) - { - has_postponed_packet = true; - const auto packet_type = read_context->getPacketType(); - if (packet_type == Protocol::Server::MergeTreeReadTaskRequest - || packet_type == Protocol::Server::MergeTreeAllRangesAnnouncement) - { - return ReadResult(ReadResult::Type::ParallelReplicasToken); - } - return ReadResult(ReadResult::Type::Nothing); - } - auto read_result = processPacket(read_context->getPacket()); if (read_result.getType() == ReadResult::Type::Data || read_result.getType() == ReadResult::Type::ParallelReplicasToken) return read_result; diff --git a/src/QueryPipeline/RemoteQueryExecutor.h b/src/QueryPipeline/RemoteQueryExecutor.h index 6f56df71f1d..7289e2a2243 100644 --- a/src/QueryPipeline/RemoteQueryExecutor.h +++ b/src/QueryPipeline/RemoteQueryExecutor.h @@ -183,7 +183,7 @@ public: ReadResult read(); /// Async variant of read. Returns ready block or file descriptor which may be used for polling. - ReadResult readAsync(bool check_packet_type_only = false); + ReadResult readAsync(); /// Receive all remain packets and finish query. /// It should be cancelled after read returned empty block. From f3fb729f53860d55db1d72ccfc88f9c5d018aea1 Mon Sep 17 00:00:00 2001 From: Igor Nikonov Date: Fri, 19 Jul 2024 20:12:14 +0000 Subject: [PATCH 07/11] Call onAsyncJobReady() --- src/Processors/Executors/ExecutorTasks.cpp | 2 ++ src/Processors/Executors/ExecutorTasks.h | 2 +- src/Processors/Executors/PipelineExecutor.h | 1 - 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/Processors/Executors/ExecutorTasks.cpp b/src/Processors/Executors/ExecutorTasks.cpp index 7e3bee239ef..d045f59a2e2 100644 --- a/src/Processors/Executors/ExecutorTasks.cpp +++ b/src/Processors/Executors/ExecutorTasks.cpp @@ -204,6 +204,8 @@ void ExecutorTasks::processAsyncTasks() while (auto task = async_task_queue.wait(lock)) { auto * node = static_cast(task.data); + node->processor->onAsyncJobReady(); + executor_contexts[task.thread_num]->pushAsyncTask(node); ++num_waiting_async_tasks; diff --git a/src/Processors/Executors/ExecutorTasks.h b/src/Processors/Executors/ExecutorTasks.h index 202ca253c6c..b2201873edf 100644 --- a/src/Processors/Executors/ExecutorTasks.h +++ b/src/Processors/Executors/ExecutorTasks.h @@ -28,7 +28,7 @@ class ExecutorTasks TaskQueue task_queue; /// Queue which stores tasks where processors returned Async status after prepare. - /// If multiple threads are using, main thread will wait for async tasks. + /// If multiple threads are used, main thread will wait for async tasks. /// For single thread, will wait for async tasks only when task_queue is empty. PollingQueue async_task_queue; diff --git a/src/Processors/Executors/PipelineExecutor.h b/src/Processors/Executors/PipelineExecutor.h index 03f0f7f1a0a..ae119355cb5 100644 --- a/src/Processors/Executors/PipelineExecutor.h +++ b/src/Processors/Executors/PipelineExecutor.h @@ -9,7 +9,6 @@ #include #include -#include #include From 465a34d3dfe3e313471e10d59cab8219b3e5837e Mon Sep 17 00:00:00 2001 From: Igor Nikonov Date: Fri, 19 Jul 2024 20:27:57 +0000 Subject: [PATCH 08/11] Simplify, fix build --- src/QueryPipeline/RemoteQueryExecutor.cpp | 48 +++++++++++------------ 1 file changed, 22 insertions(+), 26 deletions(-) diff --git a/src/QueryPipeline/RemoteQueryExecutor.cpp b/src/QueryPipeline/RemoteQueryExecutor.cpp index b78c38a4134..61a512bcfc5 100644 --- a/src/QueryPipeline/RemoteQueryExecutor.cpp +++ b/src/QueryPipeline/RemoteQueryExecutor.cpp @@ -924,41 +924,37 @@ bool RemoteQueryExecutor::needToSkipUnavailableShard() const return context->getSettingsRef().skip_unavailable_shards && (0 == connections->size()); } -bool RemoteQueryExecutor::processParallelReplicaPacketIfAny() +bool RemoteQueryExecutor::processParallelReplicaPacketIfAny() { #if defined(OS_LINUX) + + std::lock_guard lock(was_cancelled_mutex); + if (was_cancelled) + return false; + if (!read_context || (resent_query && recreate_read_context)) { - std::lock_guard lock(was_cancelled_mutex); - if (was_cancelled) - return false; - read_context = std::make_unique(*this); recreate_read_context = false; } - { - std::lock_guard lock(was_cancelled_mutex); - if (was_cancelled) - return false; + chassert(!has_postponed_packet); - chassert(!has_postponed_packet); - - read_context->resume(); - if (read_context->isInProgress()) // <- nothing to process - return false; - - const auto packet_type = read_context->getPacketType(); - if (packet_type == Protocol::Server::MergeTreeReadTaskRequest || packet_type == Protocol::Server::MergeTreeAllRangesAnnouncement) - { - processPacket(read_context->getPacket()); - return true; - } - - has_postponed_packet = true; + read_context->resume(); + if (read_context->isInProgress()) // <- nothing to process return false; - } -#endif -} + const auto packet_type = read_context->getPacketType(); + if (packet_type == Protocol::Server::MergeTreeReadTaskRequest || packet_type == Protocol::Server::MergeTreeAllRangesAnnouncement) + { + processPacket(read_context->getPacket()); + return true; + } + + has_postponed_packet = true; + +#endif + + return false; +} } From 71cdf82643fb17b5b68003df314c54ebbca0842f Mon Sep 17 00:00:00 2001 From: Igor Nikonov Date: Mon, 22 Jul 2024 18:41:30 +0000 Subject: [PATCH 09/11] Fix: reset is_async_state flag --- src/Processors/Sources/RemoteSource.cpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/Processors/Sources/RemoteSource.cpp b/src/Processors/Sources/RemoteSource.cpp index 46c27676e12..2f9a30296be 100644 --- a/src/Processors/Sources/RemoteSource.cpp +++ b/src/Processors/Sources/RemoteSource.cpp @@ -115,7 +115,10 @@ void RemoteSource::onAsyncJobReady() if (!was_query_sent) return; + chassert(!preprocessed_packet); preprocessed_packet = query_executor->processParallelReplicaPacketIfAny(); + if (preprocessed_packet) + is_async_state = false; } std::optional RemoteSource::tryGenerate() From 223eee3f46b07c38de3223fb56575f9ecbc5bea7 Mon Sep 17 00:00:00 2001 From: Igor Nikonov Date: Tue, 23 Jul 2024 07:57:03 +0000 Subject: [PATCH 10/11] Comment to new IProcessor method --- src/Processors/IProcessor.h | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/Processors/IProcessor.h b/src/Processors/IProcessor.h index 0776921a814..94e93595f4e 100644 --- a/src/Processors/IProcessor.h +++ b/src/Processors/IProcessor.h @@ -221,6 +221,21 @@ public: throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method 'schedule' is not implemented for {} processor", getName()); } + /* The method is called right after asynchronous job is done + * i.e. when file descriptor returned by schedule() is readable. + * The sequence of method calls: + * ... prepare() -> schedule() -> onAsyncJobReady() -> work() ... + * See also comment to schedule() method + * + * It allows doing some preprocessing immediately after asynchronous job is done. + * The implementation should return control quickly, to avoid blocking another asynchronous completed jobs + * created by the same pipeline. + * + * Example, scheduling tasks for remote workers (file descriptor in this case is a socket) + * When the remote worker asks for the next task, doing it in onAsyncJobReady() we can provide it immediately. + * Otherwise, the returning of the next task for the remote worker can be delayed by current work done in the pipeline + * i.e. processor->work(), which will create unnecessary latency in query processing by remote workers Not Committed Yet + */ virtual void onAsyncJobReady() {} /** You must call this method if 'prepare' returned ExpandPipeline. From d74dc587d7a183225b7cf0846b85e8213dcb7fc0 Mon Sep 17 00:00:00 2001 From: Igor Nikonov Date: Tue, 23 Jul 2024 13:06:58 +0000 Subject: [PATCH 11/11] Fix comment --- src/Processors/IProcessor.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Processors/IProcessor.h b/src/Processors/IProcessor.h index 94e93595f4e..4fd00d5e164 100644 --- a/src/Processors/IProcessor.h +++ b/src/Processors/IProcessor.h @@ -234,7 +234,7 @@ public: * Example, scheduling tasks for remote workers (file descriptor in this case is a socket) * When the remote worker asks for the next task, doing it in onAsyncJobReady() we can provide it immediately. * Otherwise, the returning of the next task for the remote worker can be delayed by current work done in the pipeline - * i.e. processor->work(), which will create unnecessary latency in query processing by remote workers Not Committed Yet + * (by other processors), which will create unnecessary latency in query processing by remote workers */ virtual void onAsyncJobReady() {}