StorageDistributed: cleanup skipping unused shards

This commit is contained in:
Azat Khuzhin 2020-03-24 10:51:54 +03:00
parent 66ccbf5d11
commit 4707dd827a
2 changed files with 56 additions and 51 deletions

View File

@ -236,7 +236,7 @@ void replaceConstantExpressions(ASTPtr & node, const Context & context, const Na
visitor.visit(node);
}
}
} // \anonymous
/// For destruction of std::unique_ptr of type that is incomplete in class definition.
@ -383,9 +383,7 @@ Pipes StorageDistributed::read(
const size_t /*max_block_size*/,
const unsigned /*num_streams*/)
{
auto cluster = getCluster();
const Settings & settings = context.getSettingsRef();
auto cluster = getOptimizedCluster(context, query_info.query);
const auto & modified_query_ast = rewriteSelectQuery(
query_info.query, remote_database, remote_table, remote_table_function_ptr);
@ -405,50 +403,8 @@ Pipes StorageDistributed::read(
: ClusterProxy::SelectStreamFactory(
header, processed_stage, StorageID{remote_database, remote_table}, scalars, has_virtual_shard_num_column, context.getExternalTables());
UInt64 force = settings.force_optimize_skip_unused_shards;
if (settings.optimize_skip_unused_shards)
{
ClusterPtr smaller_cluster;
auto table_id = getStorageID();
if (has_sharding_key)
{
smaller_cluster = skipUnusedShards(cluster, query_info, context);
if (smaller_cluster)
{
cluster = smaller_cluster;
LOG_DEBUG(log, "Reading from " << table_id.getNameForLogs() << ": "
"Skipping irrelevant shards - the query will be sent to the following shards of the cluster (shard numbers): "
" " << makeFormattedListOfShards(cluster));
}
}
if (!smaller_cluster)
{
LOG_DEBUG(log, "Reading from " << table_id.getNameForLogs() <<
(has_sharding_key ? "" : " (no sharding key)") << ": "
"Unable to figure out irrelevant shards from WHERE/PREWHERE clauses - "
"the query will be sent to all shards of the cluster");
if (force)
{
std::stringstream exception_message;
if (!has_sharding_key)
exception_message << "No sharding key";
else
exception_message << "Sharding key " << sharding_key_column_name << " is not used";
if (force == FORCE_OPTIMIZE_SKIP_UNUSED_SHARDS_ALWAYS)
throw Exception(exception_message.str(), ErrorCodes::UNABLE_TO_SKIP_UNUSED_SHARDS);
if (force == FORCE_OPTIMIZE_SKIP_UNUSED_SHARDS_HAS_SHARDING_KEY && has_sharding_key)
throw Exception(exception_message.str(), ErrorCodes::UNABLE_TO_SKIP_UNUSED_SHARDS);
}
}
}
return ClusterProxy::executeQuery(
select_stream_factory, cluster, modified_query_ast, context, settings, query_info);
select_stream_factory, cluster, modified_query_ast, context, context.getSettingsRef(), query_info);
}
@ -631,6 +587,51 @@ ClusterPtr StorageDistributed::getCluster() const
return owned_cluster ? owned_cluster : global_context.getCluster(cluster_name);
}
/// Returns the cluster a SELECT should be sent to, honouring the settings
/// `optimize_skip_unused_shards` and `force_optimize_skip_unused_shards`.
///
/// - If `optimize_skip_unused_shards` is off, the result of getCluster() is returned as is.
/// - If the table has a sharding key and skipUnusedShards() yields a non-null cluster,
///   that reduced cluster is returned.
/// - Otherwise the full cluster is returned, unless `force_optimize_skip_unused_shards`
///   demands that the optimization succeed, in which case an exception with code
///   UNABLE_TO_SKIP_UNUSED_SHARDS is thrown.
ClusterPtr StorageDistributed::getOptimizedCluster(const Context & context, const ASTPtr & query_ptr) const
{
    ClusterPtr cluster = getCluster();
    const Settings & settings = context.getSettingsRef();

    if (!settings.optimize_skip_unused_shards)
        return cluster;

    /// Only needed for log messages, so fetch it after the early return above.
    auto table_id = getStorageID();

    if (has_sharding_key)
    {
        ClusterPtr optimized = skipUnusedShards(cluster, query_ptr, context);
        if (optimized)
        {
            /// Log the shards of the *optimized* cluster: `cluster` still refers to the full one here.
            LOG_DEBUG(log, "Reading from " << table_id.getNameForLogs() << ": "
                "Skipping irrelevant shards - the query will be sent to the following shards of the cluster (shard numbers): "
                " " << makeFormattedListOfShards(optimized));
            return optimized;
        }
    }

    LOG_DEBUG(log, "Reading from " << table_id.getNameForLogs() <<
        (has_sharding_key ? "" : " (no sharding key)") << ": "
        "Unable to figure out irrelevant shards from WHERE/PREWHERE clauses - "
        "the query will be sent to all shards of the cluster");

    /// The optimization was requested but could not be applied; check whether that is fatal.
    const UInt64 force = settings.force_optimize_skip_unused_shards;
    const bool must_throw = force == FORCE_OPTIMIZE_SKIP_UNUSED_SHARDS_ALWAYS
        || (force == FORCE_OPTIMIZE_SKIP_UNUSED_SHARDS_HAS_SHARDING_KEY && has_sharding_key);

    if (must_throw)
    {
        std::stringstream exception_message;
        if (!has_sharding_key)
            exception_message << "No sharding key";
        else
            exception_message << "Sharding key " << sharding_key_column_name << " is not used";

        throw Exception(exception_message.str(), ErrorCodes::UNABLE_TO_SKIP_UNUSED_SHARDS);
    }

    return cluster;
}
void StorageDistributed::ClusterNodeData::flushAllData()
{
directory_monitor->flushAllData();
@ -643,9 +644,9 @@ void StorageDistributed::ClusterNodeData::shutdownAndDropAllData()
/// Returns a new cluster with fewer shards if constant folding for `sharding_key_expr` is possible
/// using constraints from "PREWHERE" and "WHERE" conditions, otherwise returns `nullptr`
ClusterPtr StorageDistributed::skipUnusedShards(ClusterPtr cluster, const SelectQueryInfo & query_info, const Context & context)
ClusterPtr StorageDistributed::skipUnusedShards(ClusterPtr cluster, const ASTPtr & query_ptr, const Context & context) const
{
const auto & select = query_info.query->as<ASTSelectQuery &>();
const auto & select = query_ptr->as<ASTSelectQuery &>();
if (!select.prewhere() && !select.where())
{

View File

@ -114,6 +114,12 @@ public:
ClusterPtr getCluster() const;
/// Returns the cluster to read from after applying the following settings:
/// - optimize_skip_unused_shards
/// - force_optimize_skip_unused_shards
///
/// When optimize_skip_unused_shards is disabled, the result is the same as getCluster().
/// When it is enabled and skipUnusedShards() produces a cluster, that cluster is returned.
/// If no shards could be skipped, the full cluster is returned, or an exception with code
/// UNABLE_TO_SKIP_UNUSED_SHARDS is thrown when force_optimize_skip_unused_shards requires it.
ClusterPtr getOptimizedCluster(const Context &, const ASTPtr & query_ptr) const;
/// Returns a new cluster with fewer shards if constant folding for the sharding key expression
/// is possible using constraints from the PREWHERE and WHERE conditions of `query_ptr`
/// (which is accessed as an ASTSelectQuery); otherwise returns nullptr.
ClusterPtr skipUnusedShards(ClusterPtr cluster, const ASTPtr & query_ptr, const Context & context) const;
ActionLock getActionLock(StorageActionBlockType type) override;
String remote_database;
@ -164,8 +170,6 @@ protected:
const String & relative_data_path_,
bool attach);
ClusterPtr skipUnusedShards(ClusterPtr cluster, const SelectQueryInfo & query_info, const Context & context);
void createStorage();
String storage_policy;