From b441e8a40883606ae745b87dc0d93fad36d1fe5a Mon Sep 17 00:00:00 2001
From: Vasily Nemkov
Date: Mon, 6 Jan 2020 07:25:04 +0300
Subject: [PATCH 1/6] Integration test for Distributed over Distributed (from
 #8640)

---
 .../__init__.py                                   |  0
 .../configs/remote_servers.xml                    | 18 ++++
 .../configs/set_distributed_defaults.xml          | 35 +++++++
 .../test_distributed_over_distributed/test.py     | 98 +++++++++++++++++++
 4 files changed, 151 insertions(+)
 create mode 100644 tests/integration/test_distributed_over_distributed/__init__.py
 create mode 100644 tests/integration/test_distributed_over_distributed/configs/remote_servers.xml
 create mode 100644 tests/integration/test_distributed_over_distributed/configs/set_distributed_defaults.xml
 create mode 100644 tests/integration/test_distributed_over_distributed/test.py

diff --git a/tests/integration/test_distributed_over_distributed/__init__.py b/tests/integration/test_distributed_over_distributed/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/tests/integration/test_distributed_over_distributed/configs/remote_servers.xml b/tests/integration/test_distributed_over_distributed/configs/remote_servers.xml
new file mode 100644
index 00000000000..ebce4697529
--- /dev/null
+++ b/tests/integration/test_distributed_over_distributed/configs/remote_servers.xml
@@ -0,0 +1,18 @@
+<yandex>
+    <remote_servers>
+        <test_cluster>
+            <shard>
+                <replica>
+                    <host>node1</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+            <shard>
+                <replica>
+                    <host>node2</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+        </test_cluster>
+    </remote_servers>
+</yandex>
diff --git a/tests/integration/test_distributed_over_distributed/configs/set_distributed_defaults.xml b/tests/integration/test_distributed_over_distributed/configs/set_distributed_defaults.xml
new file mode 100644
index 00000000000..194eb1ebb87
--- /dev/null
+++ b/tests/integration/test_distributed_over_distributed/configs/set_distributed_defaults.xml
@@ -0,0 +1,35 @@
+<yandex>
+    <profiles>
+        <default>
+            <distributed_connections_pool_size>3</distributed_connections_pool_size>
+            <connect_timeout_with_failover_ms>1000</connect_timeout_with_failover_ms>
+            <connections_with_failover_max_tries>1</connections_with_failover_max_tries>
+        </default>
+        <delays>
+            <distributed_connections_pool_size>5</distributed_connections_pool_size>
+            <connect_timeout_with_failover_ms>3000</connect_timeout_with_failover_ms>
+            <connections_with_failover_max_tries>1</connections_with_failover_max_tries>
+        </delays>
+    </profiles>
+
+    <users>
+        <default>
+            <password></password>
+            <networks incl="networks" replace="replace">
+                <ip>::/0</ip>
+            </networks>
+            <profile>default</profile>
+            <quota>default</quota>
+        </default>
+
+        <delays>
+            <password></password>
+            <networks incl="networks" replace="replace">
+                <ip>::/0</ip>
+            </networks>
+            <profile>delays</profile>
+            <quota>default</quota>
+        </delays>
+    </users>
+</yandex>
diff --git a/tests/integration/test_distributed_over_distributed/test.py b/tests/integration/test_distributed_over_distributed/test.py
new file mode 100644
index 00000000000..31d6de55bea
--- /dev/null
+++ b/tests/integration/test_distributed_over_distributed/test.py
@@ -0,0 +1,98 @@
+# This test is a subset of the 01223_dist_on_dist.
+# (just in case, with real separate instances).
+
+from __future__ import print_function
+
+import itertools
+import timeit
+import logging
+
+import pytest
+
+from helpers.cluster import ClickHouseCluster
+from helpers.network import PartitionManager
+from helpers.test_tools import TSV
+
+
+cluster = ClickHouseCluster(__file__)
+
+NODES = {'node' + str(i): cluster.add_instance(
+    'node' + str(i),
+    main_configs=['configs/remote_servers.xml'],
+    user_configs=['configs/set_distributed_defaults.xml'],
+) for i in (1, 2)}
+
+CREATE_TABLES_SQL = '''
+CREATE TABLE
+    base_table(
+        node String,
+        key Int32,
+        value Int32
+    )
+ENGINE = Memory;
+
+CREATE TABLE
+    distributed_table
+AS base_table
+ENGINE = Distributed(test_cluster, default, base_table);
+
+CREATE TABLE
+    distributed_over_distributed_table
+AS distributed_table
+ENGINE = Distributed('test_cluster', default, distributed_table);
+'''

+INSERT_SQL_TEMPLATE = "INSERT INTO base_table VALUES ('{node_id}', {key}, {value})"
+
+@pytest.fixture(scope="session")
+def started_cluster():
+    try:
+        cluster.start()
+        for node_index, (node_name, node) in enumerate(NODES.items()):
+            node.query(CREATE_TABLES_SQL)
+            for i in range(0, 2):
+                node.query(INSERT_SQL_TEMPLATE.format(node_id=node_name, key=i, value=i + (node_index * 10)))
+        yield cluster
+
+    finally:
+        cluster.shutdown()
+
+
+@pytest.mark.parametrize("node", NODES.values())
+@pytest.mark.parametrize("source", ["distributed_over_distributed_table", "cluster('test_cluster', default, distributed_table)"])
+class TestDistributedOverDistributedSuite:
+    def test_select_with_order_by_node(self, started_cluster, node, source):
+        assert node.query("SELECT * FROM {source} ORDER BY node, key".format(source=source)) \
+            == """node1 0 0
+node1 0 0
+node1 1 1
+node1 1 1
+node2 0 10
+node2 0 10
+node2 1 11
+node2 1 11
+"""
+
+    def test_select_with_order_by_key(self, started_cluster, node, source):
+        assert node.query("SELECT * FROM {source} ORDER BY key, node".format(source=source)) \
+            == """node1 0 0
+node1 0 0
+node2 0 10
+node2 0 10
+node1 1 1
+node1 1 1
+node2 1 11
+node2 1 11
+"""
+
+    def test_select_with_group_by_node(self, started_cluster, node, source):
+        assert node.query("SELECT node, SUM(value) FROM {source} GROUP BY node ORDER BY node".format(source=source)) \
+            == "node1 2\nnode2 42\n"
+
+    def test_select_with_group_by_key(self, started_cluster, node, source):
+        assert node.query("SELECT key, SUM(value) FROM {source} GROUP BY key ORDER BY key".format(source=source)) \
+            == "0 20\n1 24\n"
+
+    def test_select_sum(self, started_cluster, node, source):
+        assert node.query("SELECT SUM(value) FROM {source}".format(source=source)) \
+            == "44\n"

From 1777e2fd6b5b0df5e395eeff589c86261676e8ff Mon Sep 17 00:00:00 2001
From: Azat Khuzhin
Date: Wed, 1 Apr 2020 21:38:01 +0300
Subject: [PATCH 2/6] Regression for Distributed-over-Distributed when nested
 table has only one shard

---
 .../0_stateless/01223_dist_on_dist.reference | 17 +++++++++
 .../0_stateless/01223_dist_on_dist.sql       | 36 +++++++++++++++++--
 2 files changed, 50 insertions(+), 3 deletions(-)

diff --git a/tests/queries/0_stateless/01223_dist_on_dist.reference b/tests/queries/0_stateless/01223_dist_on_dist.reference
index 7ca613f70fc..4a5dd8f316c 100644
--- a/tests/queries/0_stateless/01223_dist_on_dist.reference
+++ b/tests/queries/0_stateless/01223_dist_on_dist.reference
@@ -82,3 +82,20 @@ GROUP BY ORDER BY distributed_aggregation_memory_efficient/group_by_two_level_th
 0
 1
 2
+COUNT
+132
+distributed_group_by_no_merge
+33
+33
+33
+33
+only one shard in nested
+66
+distributed_group_by_no_merge
+33
+33
+merge()
+66
+distributed_group_by_no_merge
+33
+33
diff --git a/tests/queries/0_stateless/01223_dist_on_dist.sql b/tests/queries/0_stateless/01223_dist_on_dist.sql
index 1c239d6c666..1b9175f622e 100644
--- a/tests/queries/0_stateless/01223_dist_on_dist.sql
+++ b/tests/queries/0_stateless/01223_dist_on_dist.sql
@@ -1,6 +1,11 @@
-create table if not exists data_01223 (key Int) Engine=Memory();
-create table if not exists dist_layer_01223 as data_01223 Engine=Distributed(test_cluster_two_shards, currentDatabase(), data_01223);
-create table if not exists dist_01223 as data_01223 Engine=Distributed(test_cluster_two_shards, currentDatabase(), dist_layer_01223);
+drop table if exists merge_dist_01223;
+drop table if exists dist_01223;
+drop table if exists dist_layer_01223;
+drop table if exists data_01223;
+
+create table data_01223 (key Int) Engine=Memory();
+create table dist_layer_01223 as data_01223 Engine=Distributed(test_cluster_two_shards, currentDatabase(), data_01223);
+create table dist_01223 as data_01223 Engine=Distributed(test_cluster_two_shards, currentDatabase(), dist_layer_01223);
 
 select * from dist_01223;
 
@@ -53,6 +58,31 @@ group_by_two_level_threshold=1,
 group_by_two_level_threshold_bytes=1,
 distributed_aggregation_memory_efficient=1;
 
+select 'COUNT';
+select count() from dist_01223;
+select 'distributed_group_by_no_merge';
+select count() from dist_01223 settings distributed_group_by_no_merge=1;
+
+drop table dist_01223;
+drop table dist_layer_01223;
+
+-- only one shard in nested
+select 'only one shard in nested';
+create table dist_layer_01223 as data_01223 Engine=Distributed(test_shard_localhost, currentDatabase(), data_01223);
+create table dist_01223 as data_01223 Engine=Distributed(test_cluster_two_shards, currentDatabase(), dist_layer_01223);
+select count() from dist_01223;
+
+select 'distributed_group_by_no_merge';
+select count() from dist_01223 settings distributed_group_by_no_merge=1;
+
+-- wrap with merge()
+select 'merge()';
+create table merge_dist_01223 as dist_01223 engine=Merge(currentDatabase(), 'dist_01223');
+select count() from merge_dist_01223;
+select 'distributed_group_by_no_merge';
+select count() from merge_dist_01223 settings distributed_group_by_no_merge=1;
+
+drop table merge_dist_01223;
 drop table dist_01223;
 drop table dist_layer_01223;
 drop table data_01223;

From 8d372b0be7ecda063943e6899a078294257fc2b0 Mon Sep 17 00:00:00 2001
From: Azat Khuzhin
Date: Wed, 1 Apr 2020 21:38:01 +0300
Subject: [PATCH 3/6] Call getQueryProcessingStage() once, since it is heavy
 for StorageDistributed

Refs: #9808
---
 src/Storages/StorageMerge.cpp | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/src/Storages/StorageMerge.cpp b/src/Storages/StorageMerge.cpp
index f3322c7dfff..dbc85feef05 100644
--- a/src/Storages/StorageMerge.cpp
+++ b/src/Storages/StorageMerge.cpp
@@ -287,7 +287,8 @@ Pipes StorageMerge::createSources(const SelectQueryInfo & query_info, const Quer
         return pipes;
     }
 
-    if (processed_stage <= storage->getQueryProcessingStage(*modified_context, query_info.query))
+    auto storage_stage = storage->getQueryProcessingStage(*modified_context, query_info.query);
+    if (processed_stage <= storage_stage)
     {
         /// If there are only virtual columns in query, you must request at least one other column.
         if (real_column_names.empty())
@@ -295,7 +296,7 @@ Pipes StorageMerge::createSources(const SelectQueryInfo & query_info, const Quer
         pipes = storage->read(real_column_names, modified_query_info, *modified_context, processed_stage, max_block_size, UInt32(streams_num));
     }
-    else if (processed_stage > storage->getQueryProcessingStage(*modified_context, query_info.query))
+    else if (processed_stage > storage_stage)
     {
         modified_query_info.query->as<ASTSelectQuery>()->replaceDatabaseAndTable(source_database, table_name);

From 1232760f7836083121a53434cf19665987c16f71 Mon Sep 17 00:00:00 2001
From: Azat Khuzhin
Date: Wed, 1 Apr 2020 21:38:01 +0300
Subject: [PATCH 4/6] Fix Distributed-over-Distributed when nested table has
 only one shard

---
 src/Interpreters/InterpreterSelectQuery.cpp |  2 +-
 src/Storages/IStorage.h                     | 13 ++++++--
 src/Storages/LiveView/StorageBlocks.h       |  2 +-
 src/Storages/StorageBuffer.cpp              |  4 +--
 src/Storages/StorageBuffer.h                |  2 +-
 src/Storages/StorageDistributed.cpp         | 37 +++++++++++----------
 src/Storages/StorageDistributed.h           |  2 +-
 src/Storages/StorageMaterializedView.cpp    |  4 +--
 src/Storages/StorageMaterializedView.h      |  2 +-
 src/Storages/StorageMerge.cpp               |  6 ++--
 src/Storages/StorageMerge.h                 |  2 +-
 11 files changed, 44 insertions(+), 32 deletions(-)

diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp
index 514efb90a00..47079a4732c 100644
--- a/src/Interpreters/InterpreterSelectQuery.cpp
+++ b/src/Interpreters/InterpreterSelectQuery.cpp
@@ -510,7 +510,7 @@ Block InterpreterSelectQuery::getSampleBlockImpl(bool try_move_to_prewhere)
     }
 
     if (storage && !options.only_analyze)
-        from_stage = storage->getQueryProcessingStage(*context, query_ptr);
+        from_stage = storage->getQueryProcessingStage(*context, options.to_stage, query_ptr);
 
     /// Do I need to perform the first part of the pipeline - running on remote servers during distributed processing.
     bool first_stage = from_stage < QueryProcessingStage::WithMergeableState
diff --git a/src/Storages/IStorage.h b/src/Storages/IStorage.h
index d3cede6e5c8..4cdfb3b29a3 100644
--- a/src/Storages/IStorage.h
+++ b/src/Storages/IStorage.h
@@ -218,9 +218,18 @@ public:
      *
      * SelectQueryInfo is required since the stage can depends on the query
      * (see Distributed() engine and optimize_skip_unused_shards).
+     *
+     * QueryProcessingStage::Enum required for Distributed over Distributed,
+     * since it cannot return Complete for intermediate queries never.
      */
-    QueryProcessingStage::Enum getQueryProcessingStage(const Context & context) const { return getQueryProcessingStage(context, {}); }
-    virtual QueryProcessingStage::Enum getQueryProcessingStage(const Context &, const ASTPtr &) const { return QueryProcessingStage::FetchColumns; }
+    QueryProcessingStage::Enum getQueryProcessingStage(const Context & context) const
+    {
+        return getQueryProcessingStage(context, QueryProcessingStage::Complete, {});
+    }
+    virtual QueryProcessingStage::Enum getQueryProcessingStage(const Context &, QueryProcessingStage::Enum /*to_stage*/, const ASTPtr &) const
+    {
+        return QueryProcessingStage::FetchColumns;
+    }
 
     /** Watch live changes to the table.
      * Accepts a list of columns to read, as well as a description of the query,
diff --git a/src/Storages/LiveView/StorageBlocks.h b/src/Storages/LiveView/StorageBlocks.h
index fd856e27718..a21a9374137 100644
--- a/src/Storages/LiveView/StorageBlocks.h
+++ b/src/Storages/LiveView/StorageBlocks.h
@@ -26,7 +26,7 @@ public:
         return std::make_shared<StorageBlocks>(table_id, columns, std::move(pipes), to_stage);
     }
     std::string getName() const override { return "Blocks"; }
-    QueryProcessingStage::Enum getQueryProcessingStage(const Context &, const ASTPtr &) const override { return to_stage; }
+    QueryProcessingStage::Enum getQueryProcessingStage(const Context &, QueryProcessingStage::Enum /*to_stage*/, const ASTPtr &) const override { return to_stage; }
 
     Pipes read(
         const Names & /*column_names*/,
diff --git a/src/Storages/StorageBuffer.cpp b/src/Storages/StorageBuffer.cpp
index 7699f8379d9..9f5403fec07 100644
--- a/src/Storages/StorageBuffer.cpp
+++ b/src/Storages/StorageBuffer.cpp
@@ -135,7 +135,7 @@ private:
 };
 
 
-QueryProcessingStage::Enum StorageBuffer::getQueryProcessingStage(const Context & context, const ASTPtr & query_ptr) const
+QueryProcessingStage::Enum StorageBuffer::getQueryProcessingStage(const Context &context, QueryProcessingStage::Enum to_stage, const ASTPtr &query_ptr) const
 {
     if (destination_id)
     {
@@ -144,7 +144,7 @@ QueryProcessingStage::Enum StorageBuffer::getQueryProcessingStage(const Context
         if (destination.get() == this)
             throw Exception("Destination table is myself. Read will cause infinite loop.", ErrorCodes::INFINITE_LOOP);
 
-        return destination->getQueryProcessingStage(context, query_ptr);
+        return destination->getQueryProcessingStage(context, to_stage, query_ptr);
     }
 
     return QueryProcessingStage::FetchColumns;
diff --git a/src/Storages/StorageBuffer.h b/src/Storages/StorageBuffer.h
index 7a3d907ae76..93f95692b18 100644
--- a/src/Storages/StorageBuffer.h
+++ b/src/Storages/StorageBuffer.h
@@ -54,7 +54,7 @@ public:
 
     std::string getName() const override { return "Buffer"; }
 
-    QueryProcessingStage::Enum getQueryProcessingStage(const Context & context, const ASTPtr &) const override;
+    QueryProcessingStage::Enum getQueryProcessingStage(const Context &, QueryProcessingStage::Enum /*to_stage*/, const ASTPtr &) const override;
 
     Pipes read(
         const Names & column_names,
diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp
index b4375dd5b0a..410aee748f7 100644
--- a/src/Storages/StorageDistributed.cpp
+++ b/src/Storages/StorageDistributed.cpp
@@ -242,6 +242,24 @@ void replaceConstantExpressions(ASTPtr & node, const Context & context, const Na
     visitor.visit(node);
 }
 
+QueryProcessingStage::Enum getQueryProcessingStageImpl(const Context & context, QueryProcessingStage::Enum to_stage, const ClusterPtr & cluster)
+{
+    const Settings & settings = context.getSettingsRef();
+
+    size_t num_local_shards = cluster->getLocalShardCount();
+    size_t num_remote_shards = cluster->getRemoteShardCount();
+    size_t result_size = (num_remote_shards * settings.max_parallel_replicas) + num_local_shards;
+
+    if (settings.distributed_group_by_no_merge)
+        return QueryProcessingStage::Complete;
+    /// Nested distributed query cannot return Complete stage,
+    /// since the parent query need to aggregate the results after.
+    if (to_stage == QueryProcessingStage::WithMergeableState)
+        return QueryProcessingStage::WithMergeableState;
+    return result_size == 1 ? QueryProcessingStage::Complete
+                            : QueryProcessingStage::WithMergeableState;
+}
+
 }
 
@@ -360,25 +378,10 @@ StoragePtr StorageDistributed::createWithOwnCluster(
 }
 
-static QueryProcessingStage::Enum getQueryProcessingStageImpl(const Context & context, const ClusterPtr & cluster)
-{
-    const Settings & settings = context.getSettingsRef();
-
-    size_t num_local_shards = cluster->getLocalShardCount();
-    size_t num_remote_shards = cluster->getRemoteShardCount();
-    size_t result_size = (num_remote_shards * settings.max_parallel_replicas) + num_local_shards;
-
-    if (settings.distributed_group_by_no_merge)
-        return QueryProcessingStage::Complete;
-    else /// Normal mode.
-        return result_size == 1 ? QueryProcessingStage::Complete
-                                : QueryProcessingStage::WithMergeableState;
-}
-
-QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(const Context & context, const ASTPtr & query_ptr) const
+QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(const Context &context, QueryProcessingStage::Enum to_stage, const ASTPtr &query_ptr) const
 {
     auto cluster = getOptimizedCluster(context, query_ptr);
-    return getQueryProcessingStageImpl(context, cluster);
+    return getQueryProcessingStageImpl(context, to_stage, cluster);
 }
 
 Pipes StorageDistributed::read(
diff --git a/src/Storages/StorageDistributed.h b/src/Storages/StorageDistributed.h
index e12831709f7..81c6b54a63e 100644
--- a/src/Storages/StorageDistributed.h
+++ b/src/Storages/StorageDistributed.h
@@ -67,7 +67,7 @@ public:
 
     bool isRemote() const override { return true; }
 
-    QueryProcessingStage::Enum getQueryProcessingStage(const Context & context, const ASTPtr &) const override;
+    QueryProcessingStage::Enum getQueryProcessingStage(const Context &, QueryProcessingStage::Enum /*to_stage*/, const ASTPtr &) const override;
 
     Pipes read(
         const Names & column_names,
diff --git a/src/Storages/StorageMaterializedView.cpp b/src/Storages/StorageMaterializedView.cpp
index 3fb25bf8275..d974367bb93 100644
--- a/src/Storages/StorageMaterializedView.cpp
+++ b/src/Storages/StorageMaterializedView.cpp
@@ -171,9 +171,9 @@ StorageInMemoryMetadata StorageMaterializedView::getInMemoryMetadata() const
     return result;
 }
 
-QueryProcessingStage::Enum StorageMaterializedView::getQueryProcessingStage(const Context & context, const ASTPtr & query_ptr) const
+QueryProcessingStage::Enum StorageMaterializedView::getQueryProcessingStage(const Context &context, QueryProcessingStage::Enum to_stage, const ASTPtr &query_ptr) const
 {
-    return getTargetTable()->getQueryProcessingStage(context, query_ptr);
+    return getTargetTable()->getQueryProcessingStage(context, to_stage, query_ptr);
 }
 
 Pipes StorageMaterializedView::read(
diff --git a/src/Storages/StorageMaterializedView.h b/src/Storages/StorageMaterializedView.h
index 6284f791f4f..b4cdabe4af2 100644
--- a/src/Storages/StorageMaterializedView.h
+++ b/src/Storages/StorageMaterializedView.h
@@ -59,7 +59,7 @@ public:
     void checkTableCanBeDropped() const override;
     void checkPartitionCanBeDropped(const ASTPtr & partition) override;
 
-    QueryProcessingStage::Enum getQueryProcessingStage(const Context & context, const ASTPtr &) const override;
+    QueryProcessingStage::Enum getQueryProcessingStage(const Context &, QueryProcessingStage::Enum /*to_stage*/, const ASTPtr &) const override;
 
     StoragePtr getTargetTable() const;
     StoragePtr tryGetTargetTable() const;
diff --git a/src/Storages/StorageMerge.cpp b/src/Storages/StorageMerge.cpp
index dbc85feef05..abab85bd2b6 100644
--- a/src/Storages/StorageMerge.cpp
+++ b/src/Storages/StorageMerge.cpp
@@ -136,7 +136,7 @@ bool StorageMerge::mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, cons
 }
 
 
-QueryProcessingStage::Enum StorageMerge::getQueryProcessingStage(const Context & context, const ASTPtr & query_ptr) const
+QueryProcessingStage::Enum StorageMerge::getQueryProcessingStage(const Context &context, QueryProcessingStage::Enum to_stage, const ASTPtr &query_ptr) const
 {
     auto stage_in_source_tables = QueryProcessingStage::FetchColumns;
 
@@ -150,7 +150,7 @@ QueryProcessingStage::Enum StorageMerge::getQueryProcessingStage(const Context &
         if (table.get() != this)
         {
             ++selected_table_size;
-            stage_in_source_tables = std::max(stage_in_source_tables, table->getQueryProcessingStage(context, query_ptr));
+            stage_in_source_tables = std::max(stage_in_source_tables, table->getQueryProcessingStage(context, to_stage, query_ptr));
         }
 
         iterator->next();
@@ -287,7 +287,7 @@ Pipes StorageMerge::createSources(const SelectQueryInfo & query_info, const Quer
         return pipes;
     }
 
-    auto storage_stage = storage->getQueryProcessingStage(*modified_context, query_info.query);
+    auto storage_stage = storage->getQueryProcessingStage(*modified_context, QueryProcessingStage::Complete, query_info.query);
     if (processed_stage <= storage_stage)
     {
         /// If there are only virtual columns in query, you must request at least one other column.
diff --git a/src/Storages/StorageMerge.h b/src/Storages/StorageMerge.h
index 1d2df3cb9ce..b846663bc54 100644
--- a/src/Storages/StorageMerge.h
+++ b/src/Storages/StorageMerge.h
@@ -31,7 +31,7 @@ public:
     NameAndTypePair getColumn(const String & column_name) const override;
     bool hasColumn(const String & column_name) const override;
 
-    QueryProcessingStage::Enum getQueryProcessingStage(const Context &, const ASTPtr &) const override;
+    QueryProcessingStage::Enum getQueryProcessingStage(const Context &, QueryProcessingStage::Enum /*to_stage*/, const ASTPtr &) const override;
 
     Pipes read(
         const Names & column_names,

From 1b704425b6fce6af25aff753d8d0394ec0255587 Mon Sep 17 00:00:00 2001
From: alexey-milovidov
Date: Wed, 8 Apr 2020 03:42:47 +0300
Subject: [PATCH 5/6] Update StorageBuffer.cpp

---
 src/Storages/StorageBuffer.cpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/Storages/StorageBuffer.cpp b/src/Storages/StorageBuffer.cpp
index 9f5403fec07..ede162a180a 100644
--- a/src/Storages/StorageBuffer.cpp
+++ b/src/Storages/StorageBuffer.cpp
@@ -135,7 +135,7 @@ private:
 };
 
 
-QueryProcessingStage::Enum StorageBuffer::getQueryProcessingStage(const Context &context, QueryProcessingStage::Enum to_stage, const ASTPtr &query_ptr) const
+QueryProcessingStage::Enum StorageBuffer::getQueryProcessingStage(const Context & context, QueryProcessingStage::Enum to_stage, const ASTPtr & query_ptr) const
 {
     if (destination_id)
     {

From ebe9ae4fabe3af09b44e7a4b8cfea5c005d78b59 Mon Sep 17 00:00:00 2001
From: alexey-milovidov
Date: Wed, 8 Apr 2020 03:45:11 +0300
Subject: [PATCH 6/6] Update StorageMerge.cpp

---
 src/Storages/StorageMerge.cpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/Storages/StorageMerge.cpp b/src/Storages/StorageMerge.cpp
index abab85bd2b6..06c80a613c2 100644
--- a/src/Storages/StorageMerge.cpp
+++ b/src/Storages/StorageMerge.cpp
@@ -136,7 +136,7 @@ bool StorageMerge::mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, cons
 }
 
 
-QueryProcessingStage::Enum StorageMerge::getQueryProcessingStage(const Context &context, QueryProcessingStage::Enum to_stage, const ASTPtr &query_ptr) const
+QueryProcessingStage::Enum StorageMerge::getQueryProcessingStage(const Context & context, QueryProcessingStage::Enum to_stage, const ASTPtr & query_ptr) const
 {
     auto stage_in_source_tables = QueryProcessingStage::FetchColumns;