Init coordinator separately

This commit is contained in:
Igor Nikonov 2024-05-28 12:16:18 +00:00
parent 5cdf8d336c
commit 29346f6073
4 changed files with 35 additions and 5 deletions

View File

@ -892,16 +892,19 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
if (!node->children.empty())
node = node->children.at(0);
else
node = nullptr;
}
chassert(reading);
auto result_ptr = reading->selectRangesToRead();
UInt64 rows_to_read = result_ptr->selected_rows;
reading->setAnalyzedResult(std::move(result_ptr));
// (2) if it's ReadFromMergeTree - run index analysis and check number of rows to read
if (settings.parallel_replicas_min_number_of_rows_per_replica > 0)
{
auto result_ptr = reading->selectRangesToRead();
UInt64 rows_to_read = result_ptr->selected_rows;
if (table_expression_query_info.limit > 0 && table_expression_query_info.limit < rows_to_read)
rows_to_read = table_expression_query_info.limit;

View File

@ -268,10 +268,32 @@ std::unique_ptr<QueryPlan> createLocalPlanForParallelReplicas(
if (!node->children.empty())
node = node->children.at(0);
else
node = nullptr;
}
chassert(reading);
const auto * analyzed_merge_tree = typeid_cast<const ReadFromMergeTree *>(read_from_merge_tree.get());
if (!analyzed_merge_tree->hasAnalyzedResult())
analyzed_merge_tree->selectRangesToRead();
switch (analyzed_merge_tree->getReadType())
{
case ReadFromMergeTree::ReadType::Default:
coordinator->initialize(CoordinationMode::Default);
break;
case ReadFromMergeTree::ReadType::InOrder:
coordinator->initialize(CoordinationMode::WithOrder);
break;
case ReadFromMergeTree::ReadType::InReverseOrder:
coordinator->initialize(CoordinationMode::ReverseOrder);
break;
case ReadFromMergeTree::ReadType::ParallelReplicas:
chassert(false);
UNREACHABLE();
}
MergeTreeAllRangesCallback all_ranges_cb = [coordinator](InitialAllRangesAnnouncement announcement)
{
chassert(coordinator);
@ -281,7 +303,6 @@ std::unique_ptr<QueryPlan> createLocalPlanForParallelReplicas(
MergeTreeReadTaskCallback read_task_cb = [coordinator](ParallelReadRequest req) -> std::optional<ParallelReadResponse>
{ return coordinator->handleRequest(std::move(req)); };
const auto * analyzed_merge_tree = typeid_cast<const ReadFromMergeTree *>(read_from_merge_tree.get());
auto read_from_merge_tree_parallel_replicas = reading->createLocalParallelReplicasReadingStep(analyzed_merge_tree, true, all_ranges_cb, read_task_cb);
node->step = std::move(read_from_merge_tree_parallel_replicas);

View File

@ -203,6 +203,12 @@ public:
void applyFilters(ActionDAGNodes added_filter_nodes) override;
ReadType getReadType() const
{
chassert(analyzed_result_ptr);
return analyzed_result_ptr->read_type;
}
private:
static AnalysisResultPtr selectRangesToReadImpl(
MergeTreeData::DataPartsVector parts,

View File

@ -30,8 +30,8 @@ public:
/// needed to report total rows to read
void setProgressCallback(ProgressCallback callback);
private:
void initialize(CoordinationMode mode);
private:
std::mutex mutex;
size_t replicas_count{0};