diff --git a/src/Processors/Executors/ExecutorTasks.cpp b/src/Processors/Executors/ExecutorTasks.cpp index 7e3bee239ef..d045f59a2e2 100644 --- a/src/Processors/Executors/ExecutorTasks.cpp +++ b/src/Processors/Executors/ExecutorTasks.cpp @@ -204,6 +204,8 @@ void ExecutorTasks::processAsyncTasks() while (auto task = async_task_queue.wait(lock)) { auto * node = static_cast(task.data); + node->processor->onAsyncJobReady(); + executor_contexts[task.thread_num]->pushAsyncTask(node); ++num_waiting_async_tasks; diff --git a/src/Processors/Executors/ExecutorTasks.h b/src/Processors/Executors/ExecutorTasks.h index 202ca253c6c..b2201873edf 100644 --- a/src/Processors/Executors/ExecutorTasks.h +++ b/src/Processors/Executors/ExecutorTasks.h @@ -28,7 +28,7 @@ class ExecutorTasks TaskQueue task_queue; /// Queue which stores tasks where processors returned Async status after prepare. - /// If multiple threads are using, main thread will wait for async tasks. + /// If multiple threads are used, main thread will wait for async tasks. /// For single thread, will wait for async tasks only when task_queue is empty. PollingQueue async_task_queue; diff --git a/src/Processors/IProcessor.h b/src/Processors/IProcessor.h index 02f7b6b3d12..4fd00d5e164 100644 --- a/src/Processors/IProcessor.h +++ b/src/Processors/IProcessor.h @@ -221,6 +221,23 @@ public: throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method 'schedule' is not implemented for {} processor", getName()); } + /* The method is called right after an asynchronous job is done, + * i.e. when the file descriptor returned by schedule() is readable. + * The sequence of method calls: + * ... prepare() -> schedule() -> onAsyncJobReady() -> work() ... + * See also the comment to the schedule() method. + * + * It allows doing some preprocessing immediately after the asynchronous job is done. + * The implementation should return control quickly, to avoid blocking other completed asynchronous jobs + * created by the same pipeline. 
+ * Example, scheduling tasks for remote workers (file descriptor in this case is a socket) + * When the remote worker asks for the next task, doing it in onAsyncJobReady() we can provide it immediately. + * Otherwise, the returning of the next task for the remote worker can be delayed by current work done in the pipeline + * (by other processors), which will create unnecessary latency in query processing by remote workers + */ + virtual void onAsyncJobReady() {} + /** You must call this method if 'prepare' returned ExpandPipeline. * This method cannot access any port, but it can create new ports for current processor. * diff --git a/src/Processors/Sources/RemoteSource.cpp b/src/Processors/Sources/RemoteSource.cpp index 1578bd389c9..2f9a30296be 100644 --- a/src/Processors/Sources/RemoteSource.cpp +++ b/src/Processors/Sources/RemoteSource.cpp @@ -98,9 +98,29 @@ void RemoteSource::work() executor_finished = true; return; } + + if (preprocessed_packet) + { + preprocessed_packet = false; + return; + } + ISource::work(); } +void RemoteSource::onAsyncJobReady() +{ + chassert(async_read); + + if (!was_query_sent) + return; + + chassert(!preprocessed_packet); + preprocessed_packet = query_executor->processParallelReplicaPacketIfAny(); + if (preprocessed_packet) + is_async_state = false; +} + std::optional RemoteSource::tryGenerate() { /// onCancel() will do the cancel if the query was sent. 
diff --git a/src/Processors/Sources/RemoteSource.h b/src/Processors/Sources/RemoteSource.h index 052567bc261..22d3921708b 100644 --- a/src/Processors/Sources/RemoteSource.h +++ b/src/Processors/Sources/RemoteSource.h @@ -32,6 +32,8 @@ public: int schedule() override { return fd; } + void onAsyncJobReady() override; + void setStorageLimits(const std::shared_ptr & storage_limits_) override; protected: @@ -52,6 +54,7 @@ private: int fd = -1; size_t rows = 0; bool manually_add_rows_before_limit_counter = false; + bool preprocessed_packet = false; }; /// Totals source from RemoteQueryExecutor. diff --git a/src/QueryPipeline/RemoteQueryExecutor.cpp b/src/QueryPipeline/RemoteQueryExecutor.cpp index b08f2002f64..61a512bcfc5 100644 --- a/src/QueryPipeline/RemoteQueryExecutor.cpp +++ b/src/QueryPipeline/RemoteQueryExecutor.cpp @@ -488,6 +488,17 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::readAsync() if (was_cancelled) return ReadResult(Block()); + if (has_postponed_packet) /// A packet was left unprocessed by processParallelReplicaPacketIfAny(): process it before resuming the read. + { + has_postponed_packet = false; + auto read_result = processPacket(read_context->getPacket()); + if (read_result.getType() == ReadResult::Type::Data || read_result.getType() == ReadResult::Type::ParallelReplicasToken) + return read_result; + + if (got_duplicated_part_uuids) + break; + } + read_context->resume(); if (isReplicaUnavailable() || needToSkipUnavailableShard()) @@ -508,10 +519,9 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::readAsync() if (read_context->isInProgress()) return ReadResult(read_context->getFileDescriptor()); - auto anything = processPacket(read_context->getPacket()); - - if (anything.getType() == ReadResult::Type::Data || anything.getType() == ReadResult::Type::ParallelReplicasToken) - return anything; + auto read_result = processPacket(read_context->getPacket()); + if (read_result.getType() == ReadResult::Type::Data || read_result.getType() == ReadResult::Type::ParallelReplicasToken) + return read_result; if (got_duplicated_part_uuids) break; @@ -914,4 
+924,37 @@ bool RemoteQueryExecutor::needToSkipUnavailableShard() const return context->getSettingsRef().skip_unavailable_shards && (0 == connections->size()); } +bool RemoteQueryExecutor::processParallelReplicaPacketIfAny() +{ +#if defined(OS_LINUX) + + std::lock_guard lock(was_cancelled_mutex); + if (was_cancelled) + return false; + + if (!read_context || (resent_query && recreate_read_context)) + { + read_context = std::make_unique(*this); + recreate_read_context = false; + } + + chassert(!has_postponed_packet); + + read_context->resume(); + if (read_context->isInProgress()) // <- the read is still in progress, there is no packet to process yet + return false; + + const auto packet_type = read_context->getPacketType(); + if (packet_type == Protocol::Server::MergeTreeReadTaskRequest || packet_type == Protocol::Server::MergeTreeAllRangesAnnouncement) + { + processPacket(read_context->getPacket()); + return true; + } + + has_postponed_packet = true; + +#endif + + return false; +} } diff --git a/src/QueryPipeline/RemoteQueryExecutor.h b/src/QueryPipeline/RemoteQueryExecutor.h index 78e0b02c38b..83f33607dbf 100644 --- a/src/QueryPipeline/RemoteQueryExecutor.h +++ b/src/QueryPipeline/RemoteQueryExecutor.h @@ -221,6 +221,9 @@ public: bool isReplicaUnavailable() const { return extension && extension->parallel_reading_coordinator && connections->size() == 0; } + /// Resumes the async read and returns true if a MergeTreeReadTaskRequest or MergeTreeAllRangesAnnouncement packet was received and processed right away. A packet of any other type is not processed here: it is marked as postponed and handled by readAsync(). Returns false if the query was cancelled or the read is still in progress (and always on non-Linux builds). + bool processParallelReplicaPacketIfAny(); + private: RemoteQueryExecutor( const String & query_, @@ -302,6 +305,8 @@ private: */ bool got_duplicated_part_uuids = false; + bool has_postponed_packet = false; + /// Parts uuids, collected from remote replicas std::vector duplicated_part_uuids; diff --git a/src/QueryPipeline/RemoteQueryExecutorReadContext.h b/src/QueryPipeline/RemoteQueryExecutorReadContext.h index fbd55278f86..ef30971fdbe 100644 --- a/src/QueryPipeline/RemoteQueryExecutorReadContext.h +++ b/src/QueryPipeline/RemoteQueryExecutorReadContext.h @@ -39,6 +39,8 @@ public: Packet getPacket() { 
return std::move(packet); } + UInt64 getPacketType() const { return packet.type; } + private: bool checkTimeout(bool blocking = false);