This commit is contained in:
Igor Nikonov 2024-06-25 14:47:38 +00:00
parent 318af3af95
commit 510cb961a1
5 changed files with 6 additions and 13 deletions

View File

@ -524,8 +524,7 @@ void executeQueryWithParallelReplicas(
new_context,
processed_stage,
coordinator,
std::move(analyzed_read_from_merge_tree),
/*has_missing_objects=*/false);
std::move(analyzed_read_from_merge_tree));
DataStreams input_streams;
input_streams.reserve(2);

View File

@ -9,9 +9,6 @@
namespace DB
{
namespace
{
void addConvertingActions(QueryPlan & plan, const Block & header, bool has_missing_objects)
{
if (blocksHaveEqualStructure(plan.getCurrentDataStream().header, header))
@ -36,8 +33,6 @@ void addConvertingActions(QueryPlan & plan, const Block & header, bool has_missi
plan.addStep(std::move(converting));
}
}
std::unique_ptr<QueryPlan> createLocalPlan(
const ASTPtr & query_ast,
const Block & header,

View File

@ -79,9 +79,8 @@ std::unique_ptr<QueryPlan> createLocalPlanForParallelReplicas(
MergeTreeReadTaskCallback read_task_cb = [coordinator](ParallelReadRequest req) -> std::optional<ParallelReadResponse>
{ return coordinator->handleRequest(std::move(req)); };
const auto number_of_local_replica = 0;
auto read_from_merge_tree_parallel_replicas
= reading->createLocalParallelReplicasReadingStep(analyzed_merge_tree, all_ranges_cb, read_task_cb, number_of_local_replica);
= reading->createLocalParallelReplicasReadingStep(analyzed_merge_tree, all_ranges_cb, read_task_cb);
node->step = std::move(read_from_merge_tree_parallel_replicas);
addConvertingActions(*query_plan, header, /*has_missing_objects=*/false);

View File

@ -356,8 +356,8 @@ std::unique_ptr<ReadFromMergeTree> ReadFromMergeTree::createLocalParallelReplica
log,
(analyzed_merge_tree ? analyzed_merge_tree->analyzed_result_ptr : nullptr),
true,
all_ranges_callback,
read_task_callback,
all_ranges_callback_,
read_task_callback_,
number_of_local_replica);
}

View File

@ -123,12 +123,12 @@ public:
AnalysisResultPtr analyzed_result_ptr_,
bool enable_parallel_reading_,
std::optional<MergeTreeAllRangesCallback> all_ranges_callback_ = std::nullopt,
std::optional<MergeTreeReadTaskCallback> read_task_callback_ = std::nullopt);
std::optional<MergeTreeReadTaskCallback> read_task_callback_ = std::nullopt,
std::optional<size_t> number_of_current_replica_ = std::nullopt);
std::unique_ptr<ReadFromMergeTree> createLocalParallelReplicasReadingStep(
const ReadFromMergeTree * analyzed_merge_tree,
std::optional<MergeTreeAllRangesCallback> all_ranges_callback,
std::optional<MergeTreeAllRangesCallback> all_ranges_callback_,
std::optional<MergeTreeReadTaskCallback> read_task_callback);
static constexpr auto name = "ReadFromMergeTree";