Merge remote-tracking branch 'origin/master' into pr-local-plan

This commit is contained in:
Igor Nikonov 2024-07-30 16:59:07 +00:00
commit 0705bbb42b
8 changed files with 97 additions and 5 deletions

View File

@ -204,6 +204,8 @@ void ExecutorTasks::processAsyncTasks()
while (auto task = async_task_queue.wait(lock))
{
auto * node = static_cast<ExecutingGraph::Node *>(task.data);
node->processor->onAsyncJobReady();
executor_contexts[task.thread_num]->pushAsyncTask(node);
++num_waiting_async_tasks;

View File

@ -28,7 +28,7 @@ class ExecutorTasks
TaskQueue<ExecutingGraph::Node> task_queue;
/// Queue which stores tasks where processors returned Async status after prepare.
/// If multiple threads are using, main thread will wait for async tasks.
/// If multiple threads are used, main thread will wait for async tasks.
/// For single thread, will wait for async tasks only when task_queue is empty.
PollingQueue async_task_queue;

View File

@ -221,6 +221,23 @@ public:
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method 'schedule' is not implemented for {} processor", getName());
}
/* The method is called right after asynchronous job is done
* i.e. when file descriptor returned by schedule() is readable.
* The sequence of method calls:
* ... prepare() -> schedule() -> onAsyncJobReady() -> work() ...
* See also comment to schedule() method
*
* It allows doing some preprocessing immediately after asynchronous job is done.
* The implementation should return control quickly, to avoid blocking another asynchronous completed jobs
* created by the same pipeline.
*
* Example, scheduling tasks for remote workers (file descriptor in this case is a socket)
* When the remote worker asks for the next task, doing it in onAsyncJobReady() we can provide it immediately.
* Otherwise, the returning of the next task for the remote worker can be delayed by current work done in the pipeline
* (by other processors), which will create unnecessary latency in query processing by remote workers
*/
virtual void onAsyncJobReady() {}
/** You must call this method if 'prepare' returned ExpandPipeline.
* This method cannot access any port, but it can create new ports for current processor.
*

View File

@ -98,9 +98,29 @@ void RemoteSource::work()
executor_finished = true;
return;
}
if (preprocessed_packet)
{
preprocessed_packet = false;
return;
}
ISource::work();
}
void RemoteSource::onAsyncJobReady()
{
chassert(async_read);
if (!was_query_sent)
return;
chassert(!preprocessed_packet);
preprocessed_packet = query_executor->processParallelReplicaPacketIfAny();
if (preprocessed_packet)
is_async_state = false;
}
std::optional<Chunk> RemoteSource::tryGenerate()
{
/// onCancel() will do the cancel if the query was sent.

View File

@ -32,6 +32,8 @@ public:
int schedule() override { return fd; }
void onAsyncJobReady() override;
void setStorageLimits(const std::shared_ptr<const StorageLimitsList> & storage_limits_) override;
protected:
@ -52,6 +54,7 @@ private:
int fd = -1;
size_t rows = 0;
bool manually_add_rows_before_limit_counter = false;
bool preprocessed_packet = false;
};
/// Totals source from RemoteQueryExecutor.

View File

@ -488,6 +488,17 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::readAsync()
if (was_cancelled)
return ReadResult(Block());
if (has_postponed_packet)
{
has_postponed_packet = false;
auto read_result = processPacket(read_context->getPacket());
if (read_result.getType() == ReadResult::Type::Data || read_result.getType() == ReadResult::Type::ParallelReplicasToken)
return read_result;
if (got_duplicated_part_uuids)
break;
}
read_context->resume();
if (isReplicaUnavailable() || needToSkipUnavailableShard())
@ -508,10 +519,9 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::readAsync()
if (read_context->isInProgress())
return ReadResult(read_context->getFileDescriptor());
auto anything = processPacket(read_context->getPacket());
if (anything.getType() == ReadResult::Type::Data || anything.getType() == ReadResult::Type::ParallelReplicasToken)
return anything;
auto read_result = processPacket(read_context->getPacket());
if (read_result.getType() == ReadResult::Type::Data || read_result.getType() == ReadResult::Type::ParallelReplicasToken)
return read_result;
if (got_duplicated_part_uuids)
break;
@ -914,4 +924,37 @@ bool RemoteQueryExecutor::needToSkipUnavailableShard() const
return context->getSettingsRef().skip_unavailable_shards && (0 == connections->size());
}
bool RemoteQueryExecutor::processParallelReplicaPacketIfAny()
{
#if defined(OS_LINUX)
std::lock_guard lock(was_cancelled_mutex);
if (was_cancelled)
return false;
if (!read_context || (resent_query && recreate_read_context))
{
read_context = std::make_unique<ReadContext>(*this);
recreate_read_context = false;
}
chassert(!has_postponed_packet);
read_context->resume();
if (read_context->isInProgress()) // <- nothing to process
return false;
const auto packet_type = read_context->getPacketType();
if (packet_type == Protocol::Server::MergeTreeReadTaskRequest || packet_type == Protocol::Server::MergeTreeAllRangesAnnouncement)
{
processPacket(read_context->getPacket());
return true;
}
has_postponed_packet = true;
#endif
return false;
}
}

View File

@ -221,6 +221,9 @@ public:
bool isReplicaUnavailable() const { return extension && extension->parallel_reading_coordinator && connections->size() == 0; }
/// return true if parallel replica packet was processed
bool processParallelReplicaPacketIfAny();
private:
RemoteQueryExecutor(
const String & query_,
@ -302,6 +305,8 @@ private:
*/
bool got_duplicated_part_uuids = false;
bool has_postponed_packet = false;
/// Parts uuids, collected from remote replicas
std::vector<UUID> duplicated_part_uuids;

View File

@ -39,6 +39,8 @@ public:
Packet getPacket() { return std::move(packet); }
UInt64 getPacketType() const { return packet.type; }
private:
bool checkTimeout(bool blocking = false);