Merge pull request #9997 from azat/dist-on-dist-fixes

Fix Distributed-over-Distributed with only one shard in the nested table
Committed by alexey-milovidov via GitHub, 2020-04-08 03:45:29 +03:00
commit 09a397a68f
17 changed files with 247 additions and 36 deletions
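
The gist of the fix, as a minimal SQL sketch (grounded in the updated 01223 test case below; table names are illustrative, and test_shard_localhost / test_cluster_two_shards are the stock test clusters that the 01223 test itself uses): the nested Distributed table has a single shard, so before this change it answered the intermediate query with the Complete stage and the outer Distributed table skipped the final merge.

    create table data (key Int) engine = Memory();
    -- nested layer: only one shard
    create table dist_inner as data
        engine = Distributed(test_shard_localhost, currentDatabase(), data);
    -- outer layer: two shards (both resolve to the same local server in the test setup)
    create table dist_outer as data
        engine = Distributed(test_cluster_two_shards, currentDatabase(), dist_inner);

    insert into data select * from numbers(33);

    -- With to_stage threaded through, the inner query now returns
    -- WithMergeableState instead of Complete and the initiator merges: 66.
    select count() from dist_outer;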

InterpreterSelectQuery.cpp

@@ -510,7 +510,7 @@ Block InterpreterSelectQuery::getSampleBlockImpl(bool try_move_to_prewhere)
     }

     if (storage && !options.only_analyze)
-        from_stage = storage->getQueryProcessingStage(*context, query_ptr);
+        from_stage = storage->getQueryProcessingStage(*context, options.to_stage, query_ptr);

     /// Do I need to perform the first part of the pipeline - running on remote servers during distributed processing.
     bool first_stage = from_stage < QueryProcessingStage::WithMergeableState

IStorage.h

@@ -218,9 +218,18 @@ public:
      *
      * SelectQueryInfo is required since the stage can depend on the query
      * (see Distributed() engine and optimize_skip_unused_shards).
+     *
+     * QueryProcessingStage::Enum is required for Distributed over Distributed,
+     * since an intermediate query must never return Complete.
      */
-    QueryProcessingStage::Enum getQueryProcessingStage(const Context & context) const { return getQueryProcessingStage(context, {}); }
-    virtual QueryProcessingStage::Enum getQueryProcessingStage(const Context &, const ASTPtr &) const { return QueryProcessingStage::FetchColumns; }
+    QueryProcessingStage::Enum getQueryProcessingStage(const Context & context) const
+    {
+        return getQueryProcessingStage(context, QueryProcessingStage::Complete, {});
+    }
+
+    virtual QueryProcessingStage::Enum getQueryProcessingStage(const Context &, QueryProcessingStage::Enum /*to_stage*/, const ASTPtr &) const
+    {
+        return QueryProcessingStage::FetchColumns;
+    }

    /** Watch live changes to the table.
     * Accepts a list of columns to read, as well as a description of the query,
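
In SQL terms, WithMergeableState means the shards return partial aggregation states that the initiator still has to merge, while Complete means a shard's result is already final. A hedged illustration using aggregate-function combinators (the partials table is hypothetical, purely to show the two halves; base_table is from the integration test below):

    -- shard side (WithMergeableState): partial states, not final values
    select node, sumState(value) as partial from base_table group by node;
    -- initiator side (finishing to Complete): merge the partial states
    select node, sumMerge(partial) from partials group by node;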

StorageBlocks.h

@@ -26,7 +26,7 @@ public:
         return std::make_shared<StorageBlocks>(table_id, columns, std::move(pipes), to_stage);
     }
     std::string getName() const override { return "Blocks"; }
-    QueryProcessingStage::Enum getQueryProcessingStage(const Context &, const ASTPtr &) const override { return to_stage; }
+    QueryProcessingStage::Enum getQueryProcessingStage(const Context &, QueryProcessingStage::Enum /*to_stage*/, const ASTPtr &) const override { return to_stage; }

     Pipes read(
         const Names & /*column_names*/,

StorageBuffer.cpp

@@ -135,7 +135,7 @@ private:
 };

-QueryProcessingStage::Enum StorageBuffer::getQueryProcessingStage(const Context & context, const ASTPtr & query_ptr) const
+QueryProcessingStage::Enum StorageBuffer::getQueryProcessingStage(const Context & context, QueryProcessingStage::Enum to_stage, const ASTPtr & query_ptr) const
 {
     if (destination_id)
     {
@@ -144,7 +144,7 @@ QueryProcessingStage::Enum StorageBuffer::getQueryProcessingStage(const Context
         if (destination.get() == this)
             throw Exception("Destination table is myself. Read will cause infinite loop.", ErrorCodes::INFINITE_LOOP);

-        return destination->getQueryProcessingStage(context, query_ptr);
+        return destination->getQueryProcessingStage(context, to_stage, query_ptr);
     }

     return QueryProcessingStage::FetchColumns;

StorageBuffer.h

@@ -54,7 +54,7 @@ public:
     std::string getName() const override { return "Buffer"; }

-    QueryProcessingStage::Enum getQueryProcessingStage(const Context & context, const ASTPtr &) const override;
+    QueryProcessingStage::Enum getQueryProcessingStage(const Context &, QueryProcessingStage::Enum /*to_stage*/, const ASTPtr &) const override;

     Pipes read(
         const Names & column_names,
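
Since Buffer just forwards the call to its destination, a Buffer table in front of a Distributed table inherits the Distributed stage decision. A hypothetical sketch (not part of this PR's tests; the Buffer flush parameters are arbitrary, and dist_outer is the sketch table from the top of this page):

    create table buf as dist_outer
        engine = Buffer(currentDatabase(), dist_outer, 16, 10, 100, 10000, 1000000, 10000000, 100000000);
    -- the stage for this query is decided by dist_outer, not by the Buffer itself
    select count() from buf;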

StorageDistributed.cpp

@@ -242,6 +242,24 @@ void replaceConstantExpressions(ASTPtr & node, const Context & context, const Na
     visitor.visit(node);
 }

+QueryProcessingStage::Enum getQueryProcessingStageImpl(const Context & context, QueryProcessingStage::Enum to_stage, const ClusterPtr & cluster)
+{
+    const Settings & settings = context.getSettingsRef();
+
+    size_t num_local_shards = cluster->getLocalShardCount();
+    size_t num_remote_shards = cluster->getRemoteShardCount();
+    size_t result_size = (num_remote_shards * settings.max_parallel_replicas) + num_local_shards;
+
+    if (settings.distributed_group_by_no_merge)
+        return QueryProcessingStage::Complete;
+
+    /// A nested distributed query cannot return the Complete stage,
+    /// since the parent query still needs to aggregate the results.
+    if (to_stage == QueryProcessingStage::WithMergeableState)
+        return QueryProcessingStage::WithMergeableState;
+
+    return result_size == 1 ? QueryProcessingStage::Complete
+                            : QueryProcessingStage::WithMergeableState;
+}

 }
@@ -360,25 +378,10 @@ StoragePtr StorageDistributed::createWithOwnCluster(
 }

-static QueryProcessingStage::Enum getQueryProcessingStageImpl(const Context & context, const ClusterPtr & cluster)
-{
-    const Settings & settings = context.getSettingsRef();
-
-    size_t num_local_shards = cluster->getLocalShardCount();
-    size_t num_remote_shards = cluster->getRemoteShardCount();
-    size_t result_size = (num_remote_shards * settings.max_parallel_replicas) + num_local_shards;
-
-    if (settings.distributed_group_by_no_merge)
-        return QueryProcessingStage::Complete;
-    else /// Normal mode.
-        return result_size == 1 ? QueryProcessingStage::Complete
-                                : QueryProcessingStage::WithMergeableState;
-}
-
-QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(const Context & context, const ASTPtr & query_ptr) const
+QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(const Context & context, QueryProcessingStage::Enum to_stage, const ASTPtr & query_ptr) const
 {
     auto cluster = getOptimizedCluster(context, query_ptr);
-    return getQueryProcessingStageImpl(context, cluster);
+    return getQueryProcessingStageImpl(context, to_stage, cluster);
 }

 Pipes StorageDistributed::read(
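
The order of the checks above is what fixes the bug: distributed_group_by_no_merge wins outright, then a nested (intermediate) query is pinned to WithMergeableState, and only a top-level query over a single shard may return Complete. The effect is observable from SQL (values match the updated 01223 reference file below):

    -- dist_01223: two outer shards over a single-shard dist_layer_01223
    select count() from dist_01223;  -- 66: the initiator merges the two shard results
    select count() from dist_01223 settings distributed_group_by_no_merge=1;
    -- returns one row per shard (33 and 33): every shard answers with Complete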

StorageDistributed.h

@@ -67,7 +67,7 @@ public:
     bool isRemote() const override { return true; }

-    QueryProcessingStage::Enum getQueryProcessingStage(const Context & context, const ASTPtr &) const override;
+    QueryProcessingStage::Enum getQueryProcessingStage(const Context &, QueryProcessingStage::Enum /*to_stage*/, const ASTPtr &) const override;

     Pipes read(
         const Names & column_names,

StorageMaterializedView.cpp

@@ -171,9 +171,9 @@ StorageInMemoryMetadata StorageMaterializedView::getInMemoryMetadata() const
     return result;
 }

-QueryProcessingStage::Enum StorageMaterializedView::getQueryProcessingStage(const Context & context, const ASTPtr & query_ptr) const
+QueryProcessingStage::Enum StorageMaterializedView::getQueryProcessingStage(const Context & context, QueryProcessingStage::Enum to_stage, const ASTPtr & query_ptr) const
 {
-    return getTargetTable()->getQueryProcessingStage(context, query_ptr);
+    return getTargetTable()->getQueryProcessingStage(context, to_stage, query_ptr);
 }

 Pipes StorageMaterializedView::read(

StorageMaterializedView.h

@@ -59,7 +59,7 @@ public:
     void checkTableCanBeDropped() const override;
     void checkPartitionCanBeDropped(const ASTPtr & partition) override;

-    QueryProcessingStage::Enum getQueryProcessingStage(const Context & context, const ASTPtr &) const override;
+    QueryProcessingStage::Enum getQueryProcessingStage(const Context &, QueryProcessingStage::Enum /*to_stage*/, const ASTPtr &) const override;

     StoragePtr getTargetTable() const;
     StoragePtr tryGetTargetTable() const;
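
A materialized view is therefore staged exactly like its target table; with a Distributed target, the stage decision above lands in StorageDistributed. A hypothetical sketch (not exercised by this PR's tests; data and dist_outer are the sketch tables from the top of this page):

    create materialized view mv to dist_outer as select key from data;
    -- reading from mv reads the target table, so dist_outer picks the stage
    select count() from mv;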

StorageMerge.cpp

@@ -136,7 +136,7 @@ bool StorageMerge::mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, cons
 }

-QueryProcessingStage::Enum StorageMerge::getQueryProcessingStage(const Context & context, const ASTPtr & query_ptr) const
+QueryProcessingStage::Enum StorageMerge::getQueryProcessingStage(const Context & context, QueryProcessingStage::Enum to_stage, const ASTPtr & query_ptr) const
 {
     auto stage_in_source_tables = QueryProcessingStage::FetchColumns;
@@ -150,7 +150,7 @@ QueryProcessingStage::Enum StorageMerge::getQueryProcessingStage(const Context &
         if (table.get() != this)
         {
             ++selected_table_size;
-            stage_in_source_tables = std::max(stage_in_source_tables, table->getQueryProcessingStage(context, query_ptr));
+            stage_in_source_tables = std::max(stage_in_source_tables, table->getQueryProcessingStage(context, to_stage, query_ptr));
         }

         iterator->next();
@@ -287,7 +287,8 @@ Pipes StorageMerge::createSources(const SelectQueryInfo & query_info, const Quer
         return pipes;
     }

-    if (processed_stage <= storage->getQueryProcessingStage(*modified_context, query_info.query))
+    auto storage_stage = storage->getQueryProcessingStage(*modified_context, QueryProcessingStage::Complete, query_info.query);
+    if (processed_stage <= storage_stage)
     {
         /// If there are only virtual columns in query, you must request at least one other column.
         if (real_column_names.empty())
@@ -295,7 +296,7 @@ Pipes StorageMerge::createSources(const SelectQueryInfo & query_info, const Quer
         pipes = storage->read(real_column_names, modified_query_info, *modified_context, processed_stage, max_block_size, UInt32(streams_num));
     }
-    else if (processed_stage > storage->getQueryProcessingStage(*modified_context, query_info.query))
+    else if (processed_stage > storage_stage)
     {
         modified_query_info.query->as<ASTSelectQuery>()->replaceDatabaseAndTable(source_database, table_name);
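
This is what makes the merge() case in the updated 01223 test work: a Merge table takes the maximum stage across its source tables, so wrapping a Distributed table preserves its staging. Mirroring the test with the sketch tables from the top of this page:

    create table merge_dist as dist_outer engine = Merge(currentDatabase(), 'dist_outer');
    select count() from merge_dist;  -- 66, same as querying dist_outer directly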

StorageMerge.h

@@ -31,7 +31,7 @@ public:
     NameAndTypePair getColumn(const String & column_name) const override;
     bool hasColumn(const String & column_name) const override;

-    QueryProcessingStage::Enum getQueryProcessingStage(const Context &, const ASTPtr &) const override;
+    QueryProcessingStage::Enum getQueryProcessingStage(const Context &, QueryProcessingStage::Enum /*to_stage*/, const ASTPtr &) const override;

     Pipes read(
         const Names & column_names,

configs/remote_servers.xml

@@ -0,0 +1,18 @@
<yandex>
    <remote_servers>
        <test_cluster>
            <shard>
                <replica>
                    <host>node1</host>
                    <port>9000</port>
                </replica>
            </shard>
            <shard>
                <replica>
                    <host>node2</host>
                    <port>9000</port>
                </replica>
            </shard>
        </test_cluster>
    </remote_servers>
</yandex>

configs/set_distributed_defaults.xml

@@ -0,0 +1,35 @@
<yandex>
    <profiles>
        <default>
            <connections_with_failover_max_tries>3</connections_with_failover_max_tries>
            <connect_timeout_with_failover_ms>1000</connect_timeout_with_failover_ms>
            <min_insert_block_size_rows>1</min_insert_block_size_rows>
        </default>
        <delays>
            <connections_with_failover_max_tries>5</connections_with_failover_max_tries>
            <connect_timeout_with_failover_ms>3000</connect_timeout_with_failover_ms>
            <min_insert_block_size_rows>1</min_insert_block_size_rows>
        </delays>
    </profiles>
    <users>
        <default>
            <password></password>
            <networks incl="networks" replace="replace">
                <ip>::/0</ip>
            </networks>
            <profile>default</profile>
            <quota>default</quota>
        </default>
        <ready_to_wait>
            <password></password>
            <networks incl="networks" replace="replace">
                <ip>::/0</ip>
            </networks>
            <profile>delays</profile>
            <quota>default</quota>
        </ready_to_wait>
    </users>
    <quotas><default></default></quotas>
</yandex>

test.py

@@ -0,0 +1,98 @@
# This test is a subset of 01223_dist_on_dist
# (just in case, with real separate instances).

from __future__ import print_function

import itertools
import timeit
import logging

import pytest

from helpers.cluster import ClickHouseCluster
from helpers.network import PartitionManager
from helpers.test_tools import TSV

cluster = ClickHouseCluster(__file__)

NODES = {'node' + str(i): cluster.add_instance(
    'node' + str(i),
    main_configs=['configs/remote_servers.xml'],
    user_configs=['configs/set_distributed_defaults.xml'],
) for i in (1, 2)}

CREATE_TABLES_SQL = '''
CREATE TABLE
    base_table(
        node String,
        key Int32,
        value Int32
    )
ENGINE = Memory;

CREATE TABLE
    distributed_table
AS base_table
ENGINE = Distributed(test_cluster, default, base_table);

CREATE TABLE
    distributed_over_distributed_table
AS distributed_table
ENGINE = Distributed('test_cluster', default, distributed_table);
'''

INSERT_SQL_TEMPLATE = "INSERT INTO base_table VALUES ('{node_id}', {key}, {value})"


@pytest.fixture(scope="session")
def started_cluster():
    try:
        cluster.start()
        for node_index, (node_name, node) in enumerate(NODES.items()):
            node.query(CREATE_TABLES_SQL)
            for i in range(0, 2):
                node.query(INSERT_SQL_TEMPLATE.format(node_id=node_name, key=i, value=i + (node_index * 10)))
        yield cluster
    finally:
        cluster.shutdown()


@pytest.mark.parametrize("node", NODES.values())
@pytest.mark.parametrize("source", ["distributed_over_distributed_table", "cluster('test_cluster', default, distributed_table)"])
class TestDistributedOverDistributedSuite:
    def test_select_with_order_by_node(self, started_cluster, node, source):
        assert node.query("SELECT * FROM {source} ORDER BY node, key".format(source=source)) \
            == """node1 0 0
node1 0 0
node1 1 1
node1 1 1
node2 0 10
node2 0 10
node2 1 11
node2 1 11
"""

    def test_select_with_order_by_key(self, started_cluster, node, source):
        assert node.query("SELECT * FROM {source} ORDER BY key, node".format(source=source)) \
            == """node1 0 0
node1 0 0
node2 0 10
node2 0 10
node1 1 1
node1 1 1
node2 1 11
node2 1 11
"""

    def test_select_with_group_by_node(self, started_cluster, node, source):
        assert node.query("SELECT node, SUM(value) FROM {source} GROUP BY node ORDER BY node".format(source=source)) \
            == "node1 2\nnode2 42\n"

    def test_select_with_group_by_key(self, started_cluster, node, source):
        assert node.query("SELECT key, SUM(value) FROM {source} GROUP BY key ORDER BY key".format(source=source)) \
            == "0 20\n1 24\n"

    def test_select_sum(self, started_cluster, node, source):
        assert node.query("SELECT SUM(value) FROM {source}".format(source=source)) \
            == "44\n"

01223_dist_on_dist.reference

@@ -82,3 +82,20 @@ GROUP BY ORDER BY distributed_aggregation_memory_efficient/group_by_two_level_th
 0
 1
 2
+COUNT
+132
+distributed_group_by_no_merge
+33
+33
+33
+33
+only one shard in nested
+66
+distributed_group_by_no_merge
+33
+33
+merge()
+66
+distributed_group_by_no_merge
+33
+33

01223_dist_on_dist.sql

@@ -1,6 +1,11 @@
-create table if not exists data_01223 (key Int) Engine=Memory();
-create table if not exists dist_layer_01223 as data_01223 Engine=Distributed(test_cluster_two_shards, currentDatabase(), data_01223);
-create table if not exists dist_01223 as data_01223 Engine=Distributed(test_cluster_two_shards, currentDatabase(), dist_layer_01223);
+drop table if exists merge_dist_01223;
+drop table if exists dist_01223;
+drop table if exists dist_layer_01223;
+drop table if exists data_01223;
+
+create table data_01223 (key Int) Engine=Memory();
+create table dist_layer_01223 as data_01223 Engine=Distributed(test_cluster_two_shards, currentDatabase(), data_01223);
+create table dist_01223 as data_01223 Engine=Distributed(test_cluster_two_shards, currentDatabase(), dist_layer_01223);

 select * from dist_01223;
@@ -53,6 +58,31 @@ group_by_two_level_threshold=1,
 group_by_two_level_threshold_bytes=1,
 distributed_aggregation_memory_efficient=1;

+select 'COUNT';
+select count() from dist_01223;
+select 'distributed_group_by_no_merge';
+select count() from dist_01223 settings distributed_group_by_no_merge=1;
+
+drop table dist_01223;
+drop table dist_layer_01223;
+
+-- only one shard in nested
+select 'only one shard in nested';
+create table dist_layer_01223 as data_01223 Engine=Distributed(test_shard_localhost, currentDatabase(), data_01223);
+create table dist_01223 as data_01223 Engine=Distributed(test_cluster_two_shards, currentDatabase(), dist_layer_01223);
+select count() from dist_01223;
+select 'distributed_group_by_no_merge';
+select count() from dist_01223 settings distributed_group_by_no_merge=1;
+
+-- wrap with merge()
+select 'merge()';
+create table merge_dist_01223 as dist_01223 engine=Merge(currentDatabase(), 'dist_01223');
+select count() from merge_dist_01223;
+select 'distributed_group_by_no_merge';
+select count() from merge_dist_01223 settings distributed_group_by_no_merge=1;
+
+drop table merge_dist_01223;
 drop table dist_01223;
 drop table dist_layer_01223;
 drop table data_01223;