mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 23:52:03 +00:00
Merge pull request #66618 from ClickHouse/pr-protocol-handling-with-priority
Respond to parallel replicas protocol requests with priority on initiator
This commit is contained in:
commit
b8cdae8542
@ -204,6 +204,8 @@ void ExecutorTasks::processAsyncTasks()
|
||||
while (auto task = async_task_queue.wait(lock))
|
||||
{
|
||||
auto * node = static_cast<ExecutingGraph::Node *>(task.data);
|
||||
node->processor->onAsyncJobReady();
|
||||
|
||||
executor_contexts[task.thread_num]->pushAsyncTask(node);
|
||||
++num_waiting_async_tasks;
|
||||
|
||||
|
@ -28,7 +28,7 @@ class ExecutorTasks
|
||||
TaskQueue<ExecutingGraph::Node> task_queue;
|
||||
|
||||
/// Queue which stores tasks where processors returned Async status after prepare.
|
||||
/// If multiple threads are using, main thread will wait for async tasks.
|
||||
/// If multiple threads are used, main thread will wait for async tasks.
|
||||
/// For single thread, will wait for async tasks only when task_queue is empty.
|
||||
PollingQueue async_task_queue;
|
||||
|
||||
|
@ -9,7 +9,6 @@
|
||||
|
||||
#include <deque>
|
||||
#include <queue>
|
||||
#include <mutex>
|
||||
#include <memory>
|
||||
|
||||
|
||||
|
@ -221,6 +221,23 @@ public:
|
||||
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method 'schedule' is not implemented for {} processor", getName());
|
||||
}
|
||||
|
||||
/* The method is called right after asynchronous job is done
|
||||
* i.e. when file descriptor returned by schedule() is readable.
|
||||
* The sequence of method calls:
|
||||
* ... prepare() -> schedule() -> onAsyncJobReady() -> work() ...
|
||||
* See also comment to schedule() method
|
||||
*
|
||||
* It allows doing some preprocessing immediately after asynchronous job is done.
|
||||
* The implementation should return control quickly, to avoid blocking another asynchronous completed jobs
|
||||
* created by the same pipeline.
|
||||
*
|
||||
* Example, scheduling tasks for remote workers (file descriptor in this case is a socket)
|
||||
* When the remote worker asks for the next task, doing it in onAsyncJobReady() we can provide it immediately.
|
||||
* Otherwise, the returning of the next task for the remote worker can be delayed by current work done in the pipeline
|
||||
* (by other processors), which will create unnecessary latency in query processing by remote workers
|
||||
*/
|
||||
virtual void onAsyncJobReady() {}
|
||||
|
||||
/** You must call this method if 'prepare' returned ExpandPipeline.
|
||||
* This method cannot access any port, but it can create new ports for current processor.
|
||||
*
|
||||
|
@ -98,9 +98,29 @@ void RemoteSource::work()
|
||||
executor_finished = true;
|
||||
return;
|
||||
}
|
||||
|
||||
if (preprocessed_packet)
|
||||
{
|
||||
preprocessed_packet = false;
|
||||
return;
|
||||
}
|
||||
|
||||
ISource::work();
|
||||
}
|
||||
|
||||
void RemoteSource::onAsyncJobReady()
|
||||
{
|
||||
chassert(async_read);
|
||||
|
||||
if (!was_query_sent)
|
||||
return;
|
||||
|
||||
chassert(!preprocessed_packet);
|
||||
preprocessed_packet = query_executor->processParallelReplicaPacketIfAny();
|
||||
if (preprocessed_packet)
|
||||
is_async_state = false;
|
||||
}
|
||||
|
||||
std::optional<Chunk> RemoteSource::tryGenerate()
|
||||
{
|
||||
/// onCancel() will do the cancel if the query was sent.
|
||||
|
@ -32,6 +32,8 @@ public:
|
||||
|
||||
int schedule() override { return fd; }
|
||||
|
||||
void onAsyncJobReady() override;
|
||||
|
||||
void setStorageLimits(const std::shared_ptr<const StorageLimitsList> & storage_limits_) override;
|
||||
|
||||
protected:
|
||||
@ -52,6 +54,7 @@ private:
|
||||
int fd = -1;
|
||||
size_t rows = 0;
|
||||
bool manually_add_rows_before_limit_counter = false;
|
||||
bool preprocessed_packet = false;
|
||||
};
|
||||
|
||||
/// Totals source from RemoteQueryExecutor.
|
||||
|
@ -488,6 +488,17 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::readAsync()
|
||||
if (was_cancelled)
|
||||
return ReadResult(Block());
|
||||
|
||||
if (has_postponed_packet)
|
||||
{
|
||||
has_postponed_packet = false;
|
||||
auto read_result = processPacket(read_context->getPacket());
|
||||
if (read_result.getType() == ReadResult::Type::Data || read_result.getType() == ReadResult::Type::ParallelReplicasToken)
|
||||
return read_result;
|
||||
|
||||
if (got_duplicated_part_uuids)
|
||||
break;
|
||||
}
|
||||
|
||||
read_context->resume();
|
||||
|
||||
if (isReplicaUnavailable() || needToSkipUnavailableShard())
|
||||
@ -508,10 +519,9 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::readAsync()
|
||||
if (read_context->isInProgress())
|
||||
return ReadResult(read_context->getFileDescriptor());
|
||||
|
||||
auto anything = processPacket(read_context->getPacket());
|
||||
|
||||
if (anything.getType() == ReadResult::Type::Data || anything.getType() == ReadResult::Type::ParallelReplicasToken)
|
||||
return anything;
|
||||
auto read_result = processPacket(read_context->getPacket());
|
||||
if (read_result.getType() == ReadResult::Type::Data || read_result.getType() == ReadResult::Type::ParallelReplicasToken)
|
||||
return read_result;
|
||||
|
||||
if (got_duplicated_part_uuids)
|
||||
break;
|
||||
@ -914,4 +924,37 @@ bool RemoteQueryExecutor::needToSkipUnavailableShard() const
|
||||
return context->getSettingsRef().skip_unavailable_shards && (0 == connections->size());
|
||||
}
|
||||
|
||||
bool RemoteQueryExecutor::processParallelReplicaPacketIfAny()
|
||||
{
|
||||
#if defined(OS_LINUX)
|
||||
|
||||
std::lock_guard lock(was_cancelled_mutex);
|
||||
if (was_cancelled)
|
||||
return false;
|
||||
|
||||
if (!read_context || (resent_query && recreate_read_context))
|
||||
{
|
||||
read_context = std::make_unique<ReadContext>(*this);
|
||||
recreate_read_context = false;
|
||||
}
|
||||
|
||||
chassert(!has_postponed_packet);
|
||||
|
||||
read_context->resume();
|
||||
if (read_context->isInProgress()) // <- nothing to process
|
||||
return false;
|
||||
|
||||
const auto packet_type = read_context->getPacketType();
|
||||
if (packet_type == Protocol::Server::MergeTreeReadTaskRequest || packet_type == Protocol::Server::MergeTreeAllRangesAnnouncement)
|
||||
{
|
||||
processPacket(read_context->getPacket());
|
||||
return true;
|
||||
}
|
||||
|
||||
has_postponed_packet = true;
|
||||
|
||||
#endif
|
||||
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
@ -221,6 +221,9 @@ public:
|
||||
|
||||
bool isReplicaUnavailable() const { return extension && extension->parallel_reading_coordinator && connections->size() == 0; }
|
||||
|
||||
/// return true if parallel replica packet was processed
|
||||
bool processParallelReplicaPacketIfAny();
|
||||
|
||||
private:
|
||||
RemoteQueryExecutor(
|
||||
const String & query_,
|
||||
@ -302,6 +305,8 @@ private:
|
||||
*/
|
||||
bool got_duplicated_part_uuids = false;
|
||||
|
||||
bool has_postponed_packet = false;
|
||||
|
||||
/// Parts uuids, collected from remote replicas
|
||||
std::vector<UUID> duplicated_part_uuids;
|
||||
|
||||
|
@ -39,6 +39,8 @@ public:
|
||||
|
||||
Packet getPacket() { return std::move(packet); }
|
||||
|
||||
UInt64 getPacketType() const { return packet.type; }
|
||||
|
||||
private:
|
||||
bool checkTimeout(bool blocking = false);
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user