More cleanup

This commit is contained in:
Igor Nikonov 2024-06-25 16:11:04 +00:00
parent 64cfe1628f
commit 3d35d31655
3 changed files with 13 additions and 8 deletions

View File

@ -69,9 +69,13 @@ std::unique_ptr<QueryPlan> createLocalPlanForParallelReplicas(
chassert(reading);
ReadFromMergeTree * analyzed_merge_tree = nullptr;
ReadFromMergeTree::AnalysisResultPtr analyzed_result_ptr;
if (analyzed_read_from_merge_tree.get())
analyzed_merge_tree = typeid_cast<ReadFromMergeTree *>(analyzed_read_from_merge_tree.get());
{
auto * analyzed_merge_tree = typeid_cast<ReadFromMergeTree *>(analyzed_read_from_merge_tree.get());
if (analyzed_merge_tree)
analyzed_result_ptr = analyzed_merge_tree->getAnalyzedResult();
}
MergeTreeAllRangesCallback all_ranges_cb
= [coordinator](InitialAllRangesAnnouncement announcement) { coordinator->handleInitialAllRangesAnnouncement(announcement); };
@ -80,7 +84,7 @@ std::unique_ptr<QueryPlan> createLocalPlanForParallelReplicas(
{ return coordinator->handleRequest(std::move(req)); };
auto read_from_merge_tree_parallel_replicas
= reading->createLocalParallelReplicasReadingStep(analyzed_merge_tree, all_ranges_cb, read_task_cb);
= reading->createLocalParallelReplicasReadingStep(analyzed_result_ptr, all_ranges_cb, read_task_cb);
node->step = std::move(read_from_merge_tree_parallel_replicas);
addConvertingActions(*query_plan, header, /*has_missing_objects=*/false);

View File

@ -337,11 +337,12 @@ ReadFromMergeTree::ReadFromMergeTree(
}
std::unique_ptr<ReadFromMergeTree> ReadFromMergeTree::createLocalParallelReplicasReadingStep(
const ReadFromMergeTree * analyzed_merge_tree,
AnalysisResultPtr analyzed_result_ptr_,
std::optional<MergeTreeAllRangesCallback> all_ranges_callback_,
std::optional<MergeTreeReadTaskCallback> read_task_callback_)
{
const auto number_of_local_replica = 0;
const bool enable_parallel_reading = true;
return std::make_unique<ReadFromMergeTree>(
prepared_parts,
alter_conversions_for_parts,
@ -354,8 +355,8 @@ std::unique_ptr<ReadFromMergeTree> ReadFromMergeTree::createLocalParallelReplica
requested_num_streams,
max_block_numbers_to_read,
log,
(analyzed_merge_tree ? analyzed_merge_tree->analyzed_result_ptr : nullptr),
true,
analyzed_result_ptr_,
enable_parallel_reading,
all_ranges_callback_,
read_task_callback_,
number_of_local_replica);

View File

@ -127,9 +127,9 @@ public:
std::optional<size_t> number_of_current_replica_ = std::nullopt);
std::unique_ptr<ReadFromMergeTree> createLocalParallelReplicasReadingStep(
const ReadFromMergeTree * analyzed_merge_tree,
AnalysisResultPtr analyzed_result_ptr_,
std::optional<MergeTreeAllRangesCallback> all_ranges_callback_,
std::optional<MergeTreeReadTaskCallback> read_task_callback);
std::optional<MergeTreeReadTaskCallback> read_task_callback_);
static constexpr auto name = "ReadFromMergeTree";
String getName() const override { return name; }