Merge pull request #9808 from azat/optimize_skip_unused_shards-DISTINCT

Fix DISTINCT for Distributed and optimize_skip_unused_shards
alexey-milovidov committed 2020-03-28 04:31:50 +03:00 (via GitHub)
commit aa9ab6bae0
13 changed files with 103 additions and 67 deletions

View File

@@ -510,7 +510,7 @@ Block InterpreterSelectQuery::getSampleBlockImpl(bool try_move_to_prewhere)
}
if (storage && !options.only_analyze)
- from_stage = storage->getQueryProcessingStage(*context);
+ from_stage = storage->getQueryProcessingStage(*context, query_ptr);
/// Do I need to perform the first part of the pipeline - running on remote servers during distributed processing.
bool first_stage = from_stage < QueryProcessingStage::WithMergeableState
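For context on the comparison above (not part of this diff): QueryProcessingStage is an ordered enum, so stage checks are plain integer comparisons. A minimal sketch, assuming the definitions in QueryProcessingStage.h at the time:

    // Sketch of the ordering that makes "from_stage < WithMergeableState" meaningful.
    namespace QueryProcessingStage
    {
        enum Enum
        {
            FetchColumns       = 0,  /// only read the columns the query needs
            WithMergeableState = 1,  /// processed up to an intermediate, mergeable state
            Complete           = 2,  /// fully processed result; nothing left for the initiator
        };
    }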

View File

@@ -220,8 +220,12 @@ public:
/** Returns stage to which query is going to be processed in read() function.
* (Normally, the function only reads the columns from the list, but in other cases,
* for example, the request can be partially processed on a remote server.)
+ *
+ * SelectQueryInfo is required since the stage can depend on the query
+ * (see Distributed() engine and optimize_skip_unused_shards).
*/
- virtual QueryProcessingStage::Enum getQueryProcessingStage(const Context &) const { return QueryProcessingStage::FetchColumns; }
+ QueryProcessingStage::Enum getQueryProcessingStage(const Context & context) const { return getQueryProcessingStage(context, {}); }
+ virtual QueryProcessingStage::Enum getQueryProcessingStage(const Context &, const ASTPtr &) const { return QueryProcessingStage::FetchColumns; }
/** Watch live changes to the table.
* Accepts a list of columns to read, as well as a description of the query,
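To illustrate the new contract (a hypothetical storage, not from this PR): an engine can now inspect the query AST when choosing its stage, while the one-argument overload keeps existing call sites working:

    // StorageExample and canFinishRemotely are illustrative names only.
    class StorageExample : public IStorage
    {
    public:
        QueryProcessingStage::Enum getQueryProcessingStage(
            const Context & context, const ASTPtr & query_ptr) const override
        {
            if (!query_ptr)
                return QueryProcessingStage::FetchColumns;  // nothing to analyze
            // A query-dependent decision, e.g. "can the remote side finish this?"
            return canFinishRemotely(query_ptr, context)
                ? QueryProcessingStage::Complete
                : QueryProcessingStage::WithMergeableState;
        }

    private:
        bool canFinishRemotely(const ASTPtr &, const Context &) const;  // hypothetical helper
    };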

View File

@@ -26,7 +26,7 @@ public:
return std::make_shared<StorageBlocks>(table_id, columns, std::move(pipes), to_stage);
}
std::string getName() const override { return "Blocks"; }
- QueryProcessingStage::Enum getQueryProcessingStage(const Context & /*context*/) const override { return to_stage; }
+ QueryProcessingStage::Enum getQueryProcessingStage(const Context &, const ASTPtr &) const override { return to_stage; }
Pipes read(
const Names & /*column_names*/,

View File

@@ -135,7 +135,7 @@ private:
};
- QueryProcessingStage::Enum StorageBuffer::getQueryProcessingStage(const Context & context) const
+ QueryProcessingStage::Enum StorageBuffer::getQueryProcessingStage(const Context & context, const ASTPtr & query_ptr) const
{
if (destination_id)
{
@@ -144,7 +144,7 @@ QueryProcessingStage::Enum StorageBuffer::getQueryProcessingStage(const Context
if (destination.get() == this)
throw Exception("Destination table is myself. Read will cause infinite loop.", ErrorCodes::INFINITE_LOOP);
- return destination->getQueryProcessingStage(context);
+ return destination->getQueryProcessingStage(context, query_ptr);
}
return QueryProcessingStage::FetchColumns;

View File

@@ -54,7 +54,7 @@ public:
std::string getName() const override { return "Buffer"; }
- QueryProcessingStage::Enum getQueryProcessingStage(const Context & context) const override;
+ QueryProcessingStage::Enum getQueryProcessingStage(const Context & context, const ASTPtr &) const override;
Pipes read(
const Names & column_names,

View File

@@ -369,9 +369,9 @@ static QueryProcessingStage::Enum getQueryProcessingStageImpl(const Context & co
: QueryProcessingStage::WithMergeableState;
}
- QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(const Context & context) const
+ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(const Context & context, const ASTPtr & query_ptr) const
{
- auto cluster = getCluster();
+ auto cluster = getOptimizedCluster(context, query_ptr);
return getQueryProcessingStageImpl(context, cluster);
}
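This is the heart of the fix: the stage is now computed from the same pruned cluster that read() will use, so the plan the interpreter builds matches what is actually queried. The helper above (unchanged by this commit) roughly does the following - a simplified sketch, not the verbatim source:

    // If only one shard participates (or distributed_group_by_no_merge is set),
    // the remote side can produce the Complete result, including the final
    // DISTINCT; otherwise it stops at a state the initiator still has to merge.
    static QueryProcessingStage::Enum getQueryProcessingStageImpl(
        const Context & context, const ClusterPtr & cluster)
    {
        const Settings & settings = context.getSettingsRef();
        size_t num_shards = cluster->getLocalShardCount() + cluster->getRemoteShardCount();
        if (settings.distributed_group_by_no_merge || num_shards == 1)
            return QueryProcessingStage::Complete;
        return QueryProcessingStage::WithMergeableState;
    }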
@@ -383,9 +383,7 @@ Pipes StorageDistributed::read(
const size_t /*max_block_size*/,
const unsigned /*num_streams*/)
{
- auto cluster = getCluster();
- const Settings & settings = context.getSettingsRef();
+ auto cluster = getOptimizedCluster(context, query_info.query);
const auto & modified_query_ast = rewriteSelectQuery(
query_info.query, remote_database, remote_table, remote_table_function_ptr);
@@ -405,50 +403,8 @@ Pipes StorageDistributed::read(
: ClusterProxy::SelectStreamFactory(
header, processed_stage, StorageID{remote_database, remote_table}, scalars, has_virtual_shard_num_column, context.getExternalTables());
- UInt64 force = settings.force_optimize_skip_unused_shards;
- if (settings.optimize_skip_unused_shards)
- {
- ClusterPtr smaller_cluster;
- auto table_id = getStorageID();
- if (has_sharding_key)
- {
- smaller_cluster = skipUnusedShards(cluster, query_info, context);
- if (smaller_cluster)
- {
- cluster = smaller_cluster;
- LOG_DEBUG(log, "Reading from " << table_id.getNameForLogs() << ": "
- "Skipping irrelevant shards - the query will be sent to the following shards of the cluster (shard numbers): "
- " " << makeFormattedListOfShards(cluster));
- }
- }
- if (!smaller_cluster)
- {
- LOG_DEBUG(log, "Reading from " << table_id.getNameForLogs() <<
- (has_sharding_key ? "" : " (no sharding key)") << ": "
- "Unable to figure out irrelevant shards from WHERE/PREWHERE clauses - "
- "the query will be sent to all shards of the cluster");
- if (force)
- {
- std::stringstream exception_message;
- if (!has_sharding_key)
- exception_message << "No sharding key";
- else
- exception_message << "Sharding key " << sharding_key_column_name << " is not used";
- if (force == FORCE_OPTIMIZE_SKIP_UNUSED_SHARDS_ALWAYS)
- throw Exception(exception_message.str(), ErrorCodes::UNABLE_TO_SKIP_UNUSED_SHARDS);
- if (force == FORCE_OPTIMIZE_SKIP_UNUSED_SHARDS_HAS_SHARDING_KEY && has_sharding_key)
- throw Exception(exception_message.str(), ErrorCodes::UNABLE_TO_SKIP_UNUSED_SHARDS);
- }
- }
- }
return ClusterProxy::executeQuery(
- select_stream_factory, cluster, modified_query_ast, context, settings, query_info);
+ select_stream_factory, cluster, modified_query_ast, context, context.getSettingsRef(), query_info);
}
@@ -631,6 +587,51 @@ ClusterPtr StorageDistributed::getCluster() const
return owned_cluster ? owned_cluster : global_context.getCluster(cluster_name);
}
+ ClusterPtr StorageDistributed::getOptimizedCluster(const Context & context, const ASTPtr & query_ptr) const
+ {
+ ClusterPtr cluster = getCluster();
+ const Settings & settings = context.getSettingsRef();
+ auto table_id = getStorageID();
+ if (!settings.optimize_skip_unused_shards)
+ return cluster;
+ if (has_sharding_key)
+ {
+ ClusterPtr optimized = skipUnusedShards(cluster, query_ptr, context);
+ if (optimized)
+ {
+ LOG_DEBUG(log, "Reading from " << table_id.getNameForLogs() << ": "
+ "Skipping irrelevant shards - the query will be sent to the following shards of the cluster (shard numbers): "
+ " " << makeFormattedListOfShards(optimized));
+ return optimized;
+ }
+ }
+ LOG_DEBUG(log, "Reading from " << table_id.getNameForLogs() <<
+ (has_sharding_key ? "" : " (no sharding key)") << ": "
+ "Unable to figure out irrelevant shards from WHERE/PREWHERE clauses - "
+ "the query will be sent to all shards of the cluster");
+ UInt64 force = settings.force_optimize_skip_unused_shards;
+ if (force)
+ {
+ std::stringstream exception_message;
+ if (!has_sharding_key)
+ exception_message << "No sharding key";
+ else
+ exception_message << "Sharding key " << sharding_key_column_name << " is not used";
+ if (force == FORCE_OPTIMIZE_SKIP_UNUSED_SHARDS_ALWAYS)
+ throw Exception(exception_message.str(), ErrorCodes::UNABLE_TO_SKIP_UNUSED_SHARDS);
+ if (force == FORCE_OPTIMIZE_SKIP_UNUSED_SHARDS_HAS_SHARDING_KEY && has_sharding_key)
+ throw Exception(exception_message.str(), ErrorCodes::UNABLE_TO_SKIP_UNUSED_SHARDS);
+ }
+ return cluster;
+ }
void StorageDistributed::ClusterNodeData::flushAllData()
{
directory_monitor->flushAllData();
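The force handling in getOptimizedCluster distinguishes two non-zero levels. As a reading aid, the FORCE_* names below are the ones used in this file, but the enum name and numeric values are an assumption - check the actual constants in StorageDistributed.cpp:

    // Assumed mapping of settings.force_optimize_skip_unused_shards:
    enum ForceSkipUnusedShards : UInt64  // hypothetical name
    {
        FORCE_OPTIMIZE_SKIP_UNUSED_SHARDS_DISABLED         = 0,  // never throw
        FORCE_OPTIMIZE_SKIP_UNUSED_SHARDS_ALWAYS           = 1,  // throw whenever shards cannot be skipped
        FORCE_OPTIMIZE_SKIP_UNUSED_SHARDS_HAS_SHARDING_KEY = 2,  // throw only if the table has a sharding key
    };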
@@ -643,9 +644,9 @@ void StorageDistributed::ClusterNodeData::shutdownAndDropAllData()
/// Returns a new cluster with fewer shards if constant folding for `sharding_key_expr` is possible
/// using constraints from "PREWHERE" and "WHERE" conditions, otherwise returns `nullptr`
- ClusterPtr StorageDistributed::skipUnusedShards(ClusterPtr cluster, const SelectQueryInfo & query_info, const Context & context)
+ ClusterPtr StorageDistributed::skipUnusedShards(ClusterPtr cluster, const ASTPtr & query_ptr, const Context & context) const
{
- const auto & select = query_info.query->as<ASTSelectQuery &>();
+ const auto & select = query_ptr->as<ASTSelectQuery &>();
if (!select.prewhere() && !select.where())
{
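The comment above is the whole idea: constant-fold the sharding expression over the values that WHERE/PREWHERE pin the key to, then keep only the shards those values land on. A self-contained sketch of that idea (a simplified model, not ClickHouse code - real shards carry weights and the key is an arbitrary expression):

    #include <cstdint>
    #include <functional>
    #include <optional>
    #include <set>
    #include <vector>

    // The sharding expression, already reduced to "row value -> UInt64".
    using ShardingExpr = std::function<uint64_t(int64_t)>;

    // Returns the shard numbers that can match, or nullopt when the key could
    // not be constant-folded (the caller must then query every shard).
    std::optional<std::set<size_t>> shardsToQuery(
        const std::vector<int64_t> & key_constants,  // e.g. {1} for "WHERE id = 1"
        const ShardingExpr & sharding_expr,
        size_t shard_count)
    {
        if (key_constants.empty())
            return std::nullopt;
        std::set<size_t> shards;
        for (int64_t value : key_constants)
            shards.insert(sharding_expr(value) % shard_count);  // unweighted for brevity
        return shards;
    }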

View File

@@ -66,7 +66,7 @@ public:
bool isRemote() const override { return true; }
- QueryProcessingStage::Enum getQueryProcessingStage(const Context & context) const override;
+ QueryProcessingStage::Enum getQueryProcessingStage(const Context & context, const ASTPtr &) const override;
Pipes read(
const Names & column_names,
@@ -114,6 +114,12 @@ public:
ClusterPtr getCluster() const;
+ /// Apply the following settings:
+ /// - optimize_skip_unused_shards
+ /// - force_optimize_skip_unused_shards
+ ClusterPtr getOptimizedCluster(const Context &, const ASTPtr & query_ptr) const;
+ ClusterPtr skipUnusedShards(ClusterPtr cluster, const ASTPtr & query_ptr, const Context & context) const;
ActionLock getActionLock(StorageActionBlockType type) override;
String remote_database;
@@ -164,8 +170,6 @@ protected:
const String & relative_data_path_,
bool attach);
- ClusterPtr skipUnusedShards(ClusterPtr cluster, const SelectQueryInfo & query_info, const Context & context);
void createStorage();
String storage_policy;

View File

@@ -171,9 +171,9 @@ StorageInMemoryMetadata StorageMaterializedView::getInMemoryMetadata() const
return result;
}
- QueryProcessingStage::Enum StorageMaterializedView::getQueryProcessingStage(const Context & context) const
+ QueryProcessingStage::Enum StorageMaterializedView::getQueryProcessingStage(const Context & context, const ASTPtr & query_ptr) const
{
- return getTargetTable()->getQueryProcessingStage(context);
+ return getTargetTable()->getQueryProcessingStage(context, query_ptr);
}
Pipes StorageMaterializedView::read(

View File

@@ -59,7 +59,7 @@ public:
void checkTableCanBeDropped() const override;
void checkPartitionCanBeDropped(const ASTPtr & partition) override;
- QueryProcessingStage::Enum getQueryProcessingStage(const Context & context) const override;
+ QueryProcessingStage::Enum getQueryProcessingStage(const Context & context, const ASTPtr &) const override;
StoragePtr getTargetTable() const;
StoragePtr tryGetTargetTable() const;

View File

@@ -136,7 +136,7 @@ bool StorageMerge::mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, cons
}
- QueryProcessingStage::Enum StorageMerge::getQueryProcessingStage(const Context & context) const
+ QueryProcessingStage::Enum StorageMerge::getQueryProcessingStage(const Context & context, const ASTPtr & query_ptr) const
{
auto stage_in_source_tables = QueryProcessingStage::FetchColumns;
@@ -150,7 +150,7 @@ QueryProcessingStage::Enum StorageMerge::getQueryProcessingStage(const Context &
if (table.get() != this)
{
++selected_table_size;
- stage_in_source_tables = std::max(stage_in_source_tables, table->getQueryProcessingStage(context));
+ stage_in_source_tables = std::max(stage_in_source_tables, table->getQueryProcessingStage(context, query_ptr));
}
iterator->next();
@@ -287,7 +287,7 @@ Pipes StorageMerge::createSources(const SelectQueryInfo & query_info, const Quer
return pipes;
}
- if (processed_stage <= storage->getQueryProcessingStage(*modified_context))
+ if (processed_stage <= storage->getQueryProcessingStage(*modified_context, query_info.query))
{
/// If there are only virtual columns in query, you must request at least one other column.
if (real_column_names.empty())
@@ -295,7 +295,7 @@ Pipes StorageMerge::createSources(const SelectQueryInfo & query_info, const Quer
pipes = storage->read(real_column_names, modified_query_info, *modified_context, processed_stage, max_block_size, UInt32(streams_num));
}
- else if (processed_stage > storage->getQueryProcessingStage(*modified_context))
+ else if (processed_stage > storage->getQueryProcessingStage(*modified_context, query_info.query))
{
modified_query_info.query->as<ASTSelectQuery>()->replaceDatabaseAndTable(source_database, table_name);

View File

@@ -31,7 +31,7 @@ public:
NameAndTypePair getColumn(const String & column_name) const override;
bool hasColumn(const String & column_name) const override;
- QueryProcessingStage::Enum getQueryProcessingStage(const Context &) const override;
+ QueryProcessingStage::Enum getQueryProcessingStage(const Context &, const ASTPtr &) const override;
Pipes read(
const Names & column_names,

View File

@@ -0,0 +1,8 @@
distributed_group_by_no_merge
1
1
optimize_skip_unused_shards
1
optimize_skip_unused_shards lack of WHERE
0
1

View File

@@ -0,0 +1,19 @@
CREATE TABLE IF NOT EXISTS local_01213 (id Int) ENGINE = MergeTree ORDER BY tuple();
CREATE TABLE IF NOT EXISTS dist_01213 AS local_01213 ENGINE = Distributed(test_cluster_two_shards_localhost, currentDatabase(), local_01213, id);
-- at least two parts
INSERT INTO local_01213 SELECT toString(number) FROM numbers(2);
INSERT INTO local_01213 SELECT toString(number) FROM numbers(2);
-- check that without merge we will have two rows
SELECT 'distributed_group_by_no_merge';
SELECT DISTINCT id FROM dist_01213 WHERE id = 1 SETTINGS distributed_group_by_no_merge=1;
-- check that with merge there will be only one
SELECT 'optimize_skip_unused_shards';
SELECT DISTINCT id FROM dist_01213 WHERE id = 1 SETTINGS optimize_skip_unused_shards=1;
-- check that querying all shards is ok
SELECT 'optimize_skip_unused_shards lack of WHERE';
SELECT DISTINCT id FROM dist_01213 SETTINGS optimize_skip_unused_shards=1;
DROP TABLE local_01213;
DROP TABLE dist_01213;