mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-18 04:12:19 +00:00
Merge pull request #21511 from azat/optimize_skip_unused_shards-improvement
Rewrite IN in query for remote shards to exclude values that do not belong to the shard
This commit is contained in:
commit
d239c66fd3
@ -1565,6 +1565,17 @@ Possible values:
|
|||||||
|
|
||||||
Default value: 0
|
Default value: 0
|
||||||
|
|
||||||
|
## optimize_skip_unused_shards_rewrite_in {#optimize-skip-unused-shards-rewrite-in}
|
||||||
|
|
||||||
|
Rewrite IN in query for remote shards to exclude values that do not belong to the shard (requires `optimize_skip_unused_shards`).
|
||||||
|
|
||||||
|
Possible values:
|
||||||
|
|
||||||
|
- 0 — Disabled.
|
||||||
|
- 1 — Enabled.
|
||||||
|
|
||||||
|
Default value: 1 (since it requires `optimize_skip_unused_shards` anyway, which is `0` by default)
|
||||||
|
|
||||||
## allow_nondeterministic_optimize_skip_unused_shards {#allow-nondeterministic-optimize-skip-unused-shards}
|
## allow_nondeterministic_optimize_skip_unused_shards {#allow-nondeterministic-optimize-skip-unused-shards}
|
||||||
|
|
||||||
Allow nondeterministic functions (like `rand` or `dictGet`, since the latter has some caveats with updates) in sharding key.
|
Allow nondeterministic functions (like `rand` or `dictGet`, since the latter has some caveats with updates) in sharding key.
|
||||||
|
@ -118,6 +118,7 @@ class IColumn;
|
|||||||
M(Bool, optimize_distributed_group_by_sharding_key, false, "Optimize GROUP BY sharding_key queries (by avoiding costly aggregation on the initiator server).", 0) \
|
M(Bool, optimize_distributed_group_by_sharding_key, false, "Optimize GROUP BY sharding_key queries (by avoiding costly aggregation on the initiator server).", 0) \
|
||||||
M(UInt64, optimize_skip_unused_shards_limit, 1000, "Limit for number of sharding key values, turns off optimize_skip_unused_shards if the limit is reached", 0) \
|
M(UInt64, optimize_skip_unused_shards_limit, 1000, "Limit for number of sharding key values, turns off optimize_skip_unused_shards if the limit is reached", 0) \
|
||||||
M(Bool, optimize_skip_unused_shards, false, "Assumes that data is distributed by sharding_key. Optimization to skip unused shards if SELECT query filters by sharding_key.", 0) \
|
M(Bool, optimize_skip_unused_shards, false, "Assumes that data is distributed by sharding_key. Optimization to skip unused shards if SELECT query filters by sharding_key.", 0) \
|
||||||
|
M(Bool, optimize_skip_unused_shards_rewrite_in, true, "Rewrite IN in query for remote shards to exclude values that does not belong to the shard (requires optimize_skip_unused_shards)", 0) \
|
||||||
M(Bool, allow_nondeterministic_optimize_skip_unused_shards, false, "Allow non-deterministic functions (includes dictGet) in sharding_key for optimize_skip_unused_shards", 0) \
|
M(Bool, allow_nondeterministic_optimize_skip_unused_shards, false, "Allow non-deterministic functions (includes dictGet) in sharding_key for optimize_skip_unused_shards", 0) \
|
||||||
M(UInt64, force_optimize_skip_unused_shards, 0, "Throw an exception if unused shards cannot be skipped (1 - throw only if the table has the sharding key, 2 - always throw.", 0) \
|
M(UInt64, force_optimize_skip_unused_shards, 0, "Throw an exception if unused shards cannot be skipped (1 - throw only if the table has the sharding key, 2 - always throw.", 0) \
|
||||||
M(UInt64, optimize_skip_unused_shards_nesting, 0, "Same as optimize_skip_unused_shards, but accept nesting level until which it will work.", 0) \
|
M(UInt64, optimize_skip_unused_shards_nesting, 0, "Same as optimize_skip_unused_shards, but accept nesting level until which it will work.", 0) \
|
||||||
|
@ -55,6 +55,8 @@ public:
|
|||||||
static Poco::Timespan saturate(const Poco::Timespan & v, const Poco::Timespan & limit);
|
static Poco::Timespan saturate(const Poco::Timespan & v, const Poco::Timespan & limit);
|
||||||
|
|
||||||
public:
|
public:
|
||||||
|
using SlotToShard = std::vector<UInt64>;
|
||||||
|
|
||||||
struct Address
|
struct Address
|
||||||
{
|
{
|
||||||
/** In configuration file,
|
/** In configuration file,
|
||||||
@ -232,7 +234,6 @@ public:
|
|||||||
bool maybeCrossReplication() const;
|
bool maybeCrossReplication() const;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
using SlotToShard = std::vector<UInt64>;
|
|
||||||
SlotToShard slot_to_shard;
|
SlotToShard slot_to_shard;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
|
@ -30,7 +30,7 @@ public:
|
|||||||
|
|
||||||
virtual void createForShard(
|
virtual void createForShard(
|
||||||
const Cluster::ShardInfo & shard_info,
|
const Cluster::ShardInfo & shard_info,
|
||||||
const String & query, const ASTPtr & query_ast,
|
const ASTPtr & query_ast,
|
||||||
ContextPtr context, const ThrottlerPtr & throttler,
|
ContextPtr context, const ThrottlerPtr & throttler,
|
||||||
const SelectQueryInfo & query_info,
|
const SelectQueryInfo & query_info,
|
||||||
std::vector<QueryPlanPtr> & res,
|
std::vector<QueryPlanPtr> & res,
|
||||||
|
@ -115,7 +115,7 @@ String formattedAST(const ASTPtr & ast)
|
|||||||
|
|
||||||
void SelectStreamFactory::createForShard(
|
void SelectStreamFactory::createForShard(
|
||||||
const Cluster::ShardInfo & shard_info,
|
const Cluster::ShardInfo & shard_info,
|
||||||
const String &, const ASTPtr & query_ast,
|
const ASTPtr & query_ast,
|
||||||
ContextPtr context, const ThrottlerPtr & throttler,
|
ContextPtr context, const ThrottlerPtr & throttler,
|
||||||
const SelectQueryInfo &,
|
const SelectQueryInfo &,
|
||||||
std::vector<QueryPlanPtr> & plans,
|
std::vector<QueryPlanPtr> & plans,
|
||||||
|
@ -34,7 +34,7 @@ public:
|
|||||||
|
|
||||||
void createForShard(
|
void createForShard(
|
||||||
const Cluster::ShardInfo & shard_info,
|
const Cluster::ShardInfo & shard_info,
|
||||||
const String & query, const ASTPtr & query_ast,
|
const ASTPtr & query_ast,
|
||||||
ContextPtr context, const ThrottlerPtr & throttler,
|
ContextPtr context, const ThrottlerPtr & throttler,
|
||||||
const SelectQueryInfo & query_info,
|
const SelectQueryInfo & query_info,
|
||||||
std::vector<QueryPlanPtr> & plans,
|
std::vector<QueryPlanPtr> & plans,
|
||||||
|
@ -5,7 +5,7 @@
|
|||||||
#include <Interpreters/Cluster.h>
|
#include <Interpreters/Cluster.h>
|
||||||
#include <Interpreters/IInterpreter.h>
|
#include <Interpreters/IInterpreter.h>
|
||||||
#include <Interpreters/ProcessList.h>
|
#include <Interpreters/ProcessList.h>
|
||||||
#include <Parsers/queryToString.h>
|
#include <Interpreters/OptimizeShardingKeyRewriteInVisitor.h>
|
||||||
#include <Processors/Pipe.h>
|
#include <Processors/Pipe.h>
|
||||||
#include <Processors/QueryPlan/QueryPlan.h>
|
#include <Processors/QueryPlan/QueryPlan.h>
|
||||||
#include <Processors/QueryPlan/ReadFromPreparedSource.h>
|
#include <Processors/QueryPlan/ReadFromPreparedSource.h>
|
||||||
@ -91,7 +91,10 @@ ContextPtr updateSettingsForCluster(const Cluster & cluster, ContextPtr context,
|
|||||||
void executeQuery(
|
void executeQuery(
|
||||||
QueryPlan & query_plan,
|
QueryPlan & query_plan,
|
||||||
IStreamFactory & stream_factory, Poco::Logger * log,
|
IStreamFactory & stream_factory, Poco::Logger * log,
|
||||||
const ASTPtr & query_ast, ContextPtr context, const SelectQueryInfo & query_info)
|
const ASTPtr & query_ast, ContextPtr context, const SelectQueryInfo & query_info,
|
||||||
|
const ExpressionActionsPtr & sharding_key_expr,
|
||||||
|
const std::string & sharding_key_column_name,
|
||||||
|
const ClusterPtr & not_optimized_cluster)
|
||||||
{
|
{
|
||||||
assert(log);
|
assert(log);
|
||||||
|
|
||||||
@ -104,9 +107,7 @@ void executeQuery(
|
|||||||
Pipes remote_pipes;
|
Pipes remote_pipes;
|
||||||
Pipes delayed_pipes;
|
Pipes delayed_pipes;
|
||||||
|
|
||||||
const std::string query = queryToString(query_ast);
|
auto new_context = updateSettingsForCluster(*query_info.getCluster(), context, settings, log);
|
||||||
|
|
||||||
auto new_context = updateSettingsForCluster(*query_info.cluster, context, settings, log);
|
|
||||||
|
|
||||||
new_context->getClientInfo().distributed_depth += 1;
|
new_context->getClientInfo().distributed_depth += 1;
|
||||||
|
|
||||||
@ -127,9 +128,28 @@ void executeQuery(
|
|||||||
else
|
else
|
||||||
throttler = user_level_throttler;
|
throttler = user_level_throttler;
|
||||||
|
|
||||||
for (const auto & shard_info : query_info.cluster->getShardsInfo())
|
size_t shards = query_info.getCluster()->getShardCount();
|
||||||
|
for (const auto & shard_info : query_info.getCluster()->getShardsInfo())
|
||||||
{
|
{
|
||||||
stream_factory.createForShard(shard_info, query, query_ast,
|
ASTPtr query_ast_for_shard;
|
||||||
|
if (query_info.optimized_cluster && settings.optimize_skip_unused_shards_rewrite_in && shards > 1)
|
||||||
|
{
|
||||||
|
query_ast_for_shard = query_ast->clone();
|
||||||
|
|
||||||
|
OptimizeShardingKeyRewriteInVisitor::Data visitor_data{
|
||||||
|
sharding_key_expr,
|
||||||
|
sharding_key_column_name,
|
||||||
|
shard_info,
|
||||||
|
not_optimized_cluster->getSlotToShard(),
|
||||||
|
};
|
||||||
|
OptimizeShardingKeyRewriteInVisitor visitor(visitor_data);
|
||||||
|
visitor.visit(query_ast_for_shard);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
query_ast_for_shard = query_ast;
|
||||||
|
|
||||||
|
stream_factory.createForShard(shard_info,
|
||||||
|
query_ast_for_shard,
|
||||||
new_context, throttler, query_info, plans,
|
new_context, throttler, query_info, plans,
|
||||||
remote_pipes, delayed_pipes, log);
|
remote_pipes, delayed_pipes, log);
|
||||||
}
|
}
|
||||||
|
@ -8,11 +8,15 @@ namespace DB
|
|||||||
|
|
||||||
struct Settings;
|
struct Settings;
|
||||||
class Cluster;
|
class Cluster;
|
||||||
|
using ClusterPtr = std::shared_ptr<Cluster>;
|
||||||
struct SelectQueryInfo;
|
struct SelectQueryInfo;
|
||||||
|
|
||||||
class Pipe;
|
class Pipe;
|
||||||
class QueryPlan;
|
class QueryPlan;
|
||||||
|
|
||||||
|
class ExpressionActions;
|
||||||
|
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
|
||||||
|
|
||||||
namespace ClusterProxy
|
namespace ClusterProxy
|
||||||
{
|
{
|
||||||
|
|
||||||
@ -35,7 +39,10 @@ ContextPtr updateSettingsForCluster(const Cluster & cluster, ContextPtr context,
|
|||||||
void executeQuery(
|
void executeQuery(
|
||||||
QueryPlan & query_plan,
|
QueryPlan & query_plan,
|
||||||
IStreamFactory & stream_factory, Poco::Logger * log,
|
IStreamFactory & stream_factory, Poco::Logger * log,
|
||||||
const ASTPtr & query_ast, ContextPtr context, const SelectQueryInfo & query_info);
|
const ASTPtr & query_ast, ContextPtr context, const SelectQueryInfo & query_info,
|
||||||
|
const ExpressionActionsPtr & sharding_key_expr,
|
||||||
|
const std::string & sharding_key_column_name,
|
||||||
|
const ClusterPtr & not_optimized_cluster);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
110
src/Interpreters/OptimizeShardingKeyRewriteInVisitor.cpp
Normal file
110
src/Interpreters/OptimizeShardingKeyRewriteInVisitor.cpp
Normal file
@ -0,0 +1,110 @@
|
|||||||
|
#include <Interpreters/ExpressionActions.h>
|
||||||
|
#include <Interpreters/convertFieldToType.h>
|
||||||
|
#include <Parsers/ASTFunction.h>
|
||||||
|
#include <Parsers/ASTLiteral.h>
|
||||||
|
#include <Parsers/ASTIdentifier.h>
|
||||||
|
#include <DataTypes/FieldToDataType.h>
|
||||||
|
#include <Interpreters/OptimizeShardingKeyRewriteInVisitor.h>
|
||||||
|
|
||||||
|
namespace
|
||||||
|
{
|
||||||
|
|
||||||
|
using namespace DB;
|
||||||
|
|
||||||
|
/// Evaluate `expr` over a single constant value and return the resulting value.
///
/// @param field value that is placed into a one-row constant column
/// @param name name given to that column in the block passed to `expr`
/// @param expr actions to execute over the block
/// @param sharding_key_column_name name of the column to read from the block after execution
/// @return the first row of the column named `sharding_key_column_name`
Field executeFunctionOnField(
    const Field & field, const std::string & name,
    const ExpressionActionsPtr & expr,
    const std::string & sharding_key_column_name)
{
    /// The data type is deduced from the value itself.
    const DataTypePtr field_type = applyVisitor(FieldToDataType{}, field);

    /// Block with exactly one constant column of one row.
    Block block{ColumnWithTypeAndName(field_type->createColumnConst(1, field), field_type, name)};

    size_t rows = 1;
    expr->execute(block, rows);

    const ColumnWithTypeAndName & result = block.getByName(sharding_key_column_name);
    return (*result.column)[0];
}
|
||||||
|
|
||||||
|
/// Return true if shard may contain such value (or it is unknown), otherwise false.
|
||||||
|
bool shardContains(
|
||||||
|
const Field & sharding_column_value,
|
||||||
|
const std::string & sharding_column_name,
|
||||||
|
const ExpressionActionsPtr & expr,
|
||||||
|
const std::string & sharding_key_column_name,
|
||||||
|
const Cluster::ShardInfo & shard_info,
|
||||||
|
const Cluster::SlotToShard & slots)
|
||||||
|
{
|
||||||
|
/// NULL is not allowed in sharding key,
|
||||||
|
/// so it should be safe to assume that shard cannot contain it.
|
||||||
|
if (sharding_column_value.isNull())
|
||||||
|
return false;
|
||||||
|
|
||||||
|
Field sharding_value = executeFunctionOnField(sharding_column_value, sharding_column_name, expr, sharding_key_column_name);
|
||||||
|
UInt64 value = sharding_value.get<UInt64>();
|
||||||
|
const auto shard_num = slots[value % slots.size()] + 1;
|
||||||
|
return shard_info.shard_num == shard_num;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
|
||||||
|
/// Every child is visited: the answer does not depend on the node or on the child.
bool OptimizeShardingKeyRewriteInMatcher::needChildVisit(ASTPtr & /*node*/, const ASTPtr & /*child*/)
{
    return true;
}
|
||||||
|
|
||||||
|
/// Entry point for a generic node: only function nodes are of interest,
/// they are forwarded to the ASTFunction overload, everything else is ignored.
void OptimizeShardingKeyRewriteInMatcher::visit(ASTPtr & node, Data & data)
{
    auto * function = node->as<ASTFunction>();
    if (!function)
        return;

    visit(*function, data);
}
|
||||||
|
|
||||||
|
/// Rewrite `identifier IN (...)` so that only values that may belong to data.shard_info are left.
///
/// The function is left untouched when:
/// - it is not `in`, or it does not have exactly two arguments;
/// - the left argument is not an identifier, or the identifier is not
///   one of the columns required by the sharding key expression;
/// - the right argument is neither a `tuple(...)` function nor a Tuple literal;
/// - filtering would remove every element (an empty IN list must not be sent to the shard).
///
/// Inside `tuple(...)` only literal elements are filtered, other elements are always kept.
void OptimizeShardingKeyRewriteInMatcher::visit(ASTFunction & function, Data & data)
{
    if (function.name != "in")
        return;

    /// Do not assume the shape of the arguments, check it.
    if (!function.arguments || function.arguments->children.size() != 2)
        return;

    auto * left = function.arguments->children.front().get();
    auto * right = function.arguments->children.back().get();
    auto * identifier = left->as<ASTIdentifier>();
    if (!identifier)
        return;

    const auto & expr = data.sharding_key_expr;
    const auto & sharding_key_column_name = data.sharding_key_column_name;

    if (!expr->getRequiredColumnsWithTypes().contains(identifier->name()))
        return;

    if (auto * tuple_func = right->as<ASTFunction>(); tuple_func && tuple_func->name == "tuple")
    {
        if (tuple_func->children.empty())
            return;

        auto * tuple_elements = tuple_func->children.front()->as<ASTExpressionList>();
        if (!tuple_elements)
            return;

        ASTs kept;
        kept.reserve(tuple_elements->children.size());
        for (const auto & child : tuple_elements->children)
        {
            auto * literal = child->as<ASTLiteral>();
            if (!literal || shardContains(literal->value, identifier->name(), expr, sharding_key_column_name, data.shard_info, data.slots))
                kept.push_back(child);
        }

        /// Never leave an empty tuple: in this case the original list is preserved.
        if (!kept.empty())
            tuple_elements->children = std::move(kept);
    }
    else if (auto * tuple_literal = right->as<ASTLiteral>();
        tuple_literal && tuple_literal->value.getType() == Field::Types::Tuple)
    {
        auto & tuple = tuple_literal->value.get<Tuple &>();

        Tuple kept;
        kept.reserve(tuple.size());
        for (const auto & child : tuple)
        {
            if (shardContains(child, identifier->name(), expr, sharding_key_column_name, data.shard_info, data.slots))
                kept.push_back(child);
        }

        /// Never leave an empty tuple: in this case the original literal is preserved.
        if (!kept.empty())
            tuple = std::move(kept);
    }
}
|
||||||
|
|
||||||
|
}
|
41
src/Interpreters/OptimizeShardingKeyRewriteInVisitor.h
Normal file
41
src/Interpreters/OptimizeShardingKeyRewriteInVisitor.h
Normal file
@ -0,0 +1,41 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <Interpreters/InDepthNodeVisitor.h>
|
||||||
|
#include <Interpreters/Cluster.h>
|
||||||
|
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
|
||||||
|
class ExpressionActions;
|
||||||
|
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
|
||||||
|
|
||||||
|
class ASTFunction;
|
||||||
|
|
||||||
|
/// Rewrite `sharding_key IN (...)` for specific shard,
|
||||||
|
/// so that it will contain only values that belong to this specific shard.
|
||||||
|
///
|
||||||
|
/// See also:
|
||||||
|
/// - evaluateExpressionOverConstantCondition()
|
||||||
|
/// - StorageDistributed::createSelector()
|
||||||
|
/// - createBlockSelector()
|
||||||
|
struct OptimizeShardingKeyRewriteInMatcher
|
||||||
|
{
|
||||||
|
/// Cluster::SlotToShard
|
||||||
|
using SlotToShard = std::vector<UInt64>;
|
||||||
|
|
||||||
|
struct Data
|
||||||
|
{
|
||||||
|
const ExpressionActionsPtr & sharding_key_expr;
|
||||||
|
const std::string & sharding_key_column_name;
|
||||||
|
const Cluster::ShardInfo & shard_info;
|
||||||
|
const Cluster::SlotToShard & slots;
|
||||||
|
};
|
||||||
|
|
||||||
|
static bool needChildVisit(ASTPtr & /*node*/, const ASTPtr & /*child*/);
|
||||||
|
static void visit(ASTPtr & node, Data & data);
|
||||||
|
static void visit(ASTFunction & function, Data & data);
|
||||||
|
};
|
||||||
|
|
||||||
|
using OptimizeShardingKeyRewriteInVisitor = InDepthNodeVisitor<OptimizeShardingKeyRewriteInMatcher, true>;
|
||||||
|
|
||||||
|
}
|
@ -118,6 +118,7 @@ SRCS(
|
|||||||
OpenTelemetrySpanLog.cpp
|
OpenTelemetrySpanLog.cpp
|
||||||
OptimizeIfChains.cpp
|
OptimizeIfChains.cpp
|
||||||
OptimizeIfWithConstantConditionVisitor.cpp
|
OptimizeIfWithConstantConditionVisitor.cpp
|
||||||
|
OptimizeShardingKeyRewriteInVisitor.cpp
|
||||||
PartLog.cpp
|
PartLog.cpp
|
||||||
PredicateExpressionsOptimizer.cpp
|
PredicateExpressionsOptimizer.cpp
|
||||||
PredicateRewriteVisitor.cpp
|
PredicateRewriteVisitor.cpp
|
||||||
|
@ -119,9 +119,13 @@ struct SelectQueryInfo
|
|||||||
ASTPtr query;
|
ASTPtr query;
|
||||||
ASTPtr view_query; /// Optimized VIEW query
|
ASTPtr view_query; /// Optimized VIEW query
|
||||||
|
|
||||||
/// For optimize_skip_unused_shards.
|
/// Cluster for the query.
|
||||||
/// Can be modified in getQueryProcessingStage()
|
|
||||||
ClusterPtr cluster;
|
ClusterPtr cluster;
|
||||||
|
/// Optimized cluster for the query.
|
||||||
|
/// In case of optimize_skip_unused_shards it may differ from the original cluster.
|
||||||
|
///
|
||||||
|
/// Configured in StorageDistributed::getQueryProcessingStage()
|
||||||
|
ClusterPtr optimized_cluster;
|
||||||
|
|
||||||
TreeRewriterResultPtr syntax_analyzer_result;
|
TreeRewriterResultPtr syntax_analyzer_result;
|
||||||
|
|
||||||
@ -134,6 +138,8 @@ struct SelectQueryInfo
|
|||||||
/// Prepared sets are used for indices by storage engine.
|
/// Prepared sets are used for indices by storage engine.
|
||||||
/// Example: x IN (1, 2, 3)
|
/// Example: x IN (1, 2, 3)
|
||||||
PreparedSets sets;
|
PreparedSets sets;
|
||||||
|
|
||||||
|
/// Cluster to use for the query: optimized_cluster when it is set, otherwise cluster.
ClusterPtr getCluster() const { return optimized_cluster ? optimized_cluster : cluster; }
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -478,7 +478,7 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(
|
|||||||
"Skipping irrelevant shards - the query will be sent to the following shards of the cluster (shard numbers): {}",
|
"Skipping irrelevant shards - the query will be sent to the following shards of the cluster (shard numbers): {}",
|
||||||
makeFormattedListOfShards(optimized_cluster));
|
makeFormattedListOfShards(optimized_cluster));
|
||||||
cluster = optimized_cluster;
|
cluster = optimized_cluster;
|
||||||
query_info.cluster = cluster;
|
query_info.optimized_cluster = cluster;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@ -558,7 +558,7 @@ void StorageDistributed::read(
|
|||||||
InterpreterSelectQuery(query_info.query, local_context, SelectQueryOptions(processed_stage).analyze()).getSampleBlock();
|
InterpreterSelectQuery(query_info.query, local_context, SelectQueryOptions(processed_stage).analyze()).getSampleBlock();
|
||||||
|
|
||||||
/// Return directly (with correct header) if no shard to query.
|
/// Return directly (with correct header) if no shard to query.
|
||||||
if (query_info.cluster->getShardsInfo().empty())
|
if (query_info.getCluster()->getShardsInfo().empty())
|
||||||
{
|
{
|
||||||
Pipe pipe(std::make_shared<NullSource>(header));
|
Pipe pipe(std::make_shared<NullSource>(header));
|
||||||
auto read_from_pipe = std::make_unique<ReadFromPreparedSource>(std::move(pipe));
|
auto read_from_pipe = std::make_unique<ReadFromPreparedSource>(std::move(pipe));
|
||||||
@ -586,7 +586,9 @@ void StorageDistributed::read(
|
|||||||
local_context->getExternalTables());
|
local_context->getExternalTables());
|
||||||
|
|
||||||
ClusterProxy::executeQuery(query_plan, select_stream_factory, log,
|
ClusterProxy::executeQuery(query_plan, select_stream_factory, log,
|
||||||
modified_query_ast, local_context, query_info);
|
modified_query_ast, local_context, query_info,
|
||||||
|
sharding_key_expr, sharding_key_column_name,
|
||||||
|
getCluster());
|
||||||
|
|
||||||
/// This is a bug, it is possible only when there is no shards to query, and this is handled earlier.
|
/// This is a bug, it is possible only when there is no shards to query, and this is handled earlier.
|
||||||
if (!query_plan.isInitialized())
|
if (!query_plan.isInitialized())
|
||||||
@ -952,7 +954,7 @@ ClusterPtr StorageDistributed::getOptimizedCluster(
|
|||||||
throw Exception(exception_message.str(), ErrorCodes::UNABLE_TO_SKIP_UNUSED_SHARDS);
|
throw Exception(exception_message.str(), ErrorCodes::UNABLE_TO_SKIP_UNUSED_SHARDS);
|
||||||
}
|
}
|
||||||
|
|
||||||
return cluster;
|
return {};
|
||||||
}
|
}
|
||||||
|
|
||||||
IColumn::Selector StorageDistributed::createSelector(const ClusterPtr cluster, const ColumnWithTypeAndName & result)
|
IColumn::Selector StorageDistributed::createSelector(const ClusterPtr cluster, const ColumnWithTypeAndName & result)
|
||||||
|
@ -0,0 +1,22 @@
|
|||||||
|
(0, 2)
|
||||||
|
0 0
|
||||||
|
0 0
|
||||||
|
WITH CAST(\'default\', \'String\') AS id_no SELECT one.dummy, ignore(id_no) FROM system.one WHERE dummy IN (0, 2)
|
||||||
|
WITH CAST(\'default\', \'String\') AS id_no SELECT one.dummy, ignore(id_no) FROM system.one WHERE dummy IN (0, 2)
|
||||||
|
optimize_skip_unused_shards_rewrite_in(0, 2)
|
||||||
|
0 0
|
||||||
|
WITH CAST(\'default\', \'String\') AS id_02 SELECT one.dummy, ignore(id_02) FROM system.one WHERE dummy IN tuple(0)
|
||||||
|
WITH CAST(\'default\', \'String\') AS id_02 SELECT one.dummy, ignore(id_02) FROM system.one WHERE dummy IN tuple(2)
|
||||||
|
optimize_skip_unused_shards_rewrite_in(2,)
|
||||||
|
WITH CAST(\'default\', \'String\') AS id_2 SELECT one.dummy, ignore(id_2) FROM system.one WHERE dummy IN tuple(2)
|
||||||
|
optimize_skip_unused_shards_rewrite_in(0,)
|
||||||
|
0 0
|
||||||
|
WITH CAST(\'default\', \'String\') AS id_0 SELECT one.dummy, ignore(id_0) FROM system.one WHERE dummy IN tuple(0)
|
||||||
|
errors
|
||||||
|
others
|
||||||
|
0
|
||||||
|
0
|
||||||
|
0
|
||||||
|
optimize_skip_unused_shards_limit
|
||||||
|
0
|
||||||
|
0
|
@ -0,0 +1,127 @@
|
|||||||
|
-- NOTE: this test cannot use 'current_database = currentDatabase()',
|
||||||
|
-- because it is not propagated via remote queries,
|
||||||
|
-- hence it uses 'with (select currentDatabase()) as X'
|
||||||
|
-- (with subquery to expand it on the initiator).
|
||||||
|
|
||||||
|
drop table if exists dist_01756;
|
||||||
|
drop table if exists dist_01756_str;
|
||||||
|
drop table if exists data_01756_str;
|
||||||
|
|
||||||
|
-- SELECT
|
||||||
|
-- intHash64(0) % 2,
|
||||||
|
-- intHash64(2) % 2
|
||||||
|
-- ┌─modulo(intHash64(0), 2)─┬─modulo(intHash64(2), 2)─┐
|
||||||
|
-- │ 0 │ 1 │
|
||||||
|
-- └─────────────────────────┴─────────────────────────┘
|
||||||
|
create table dist_01756 as system.one engine=Distributed(test_cluster_two_shards, system, one, intHash64(dummy));
|
||||||
|
|
||||||
|
-- separate log entry for localhost queries
|
||||||
|
set prefer_localhost_replica=0;
|
||||||
|
set force_optimize_skip_unused_shards=2;
|
||||||
|
set optimize_skip_unused_shards=1;
|
||||||
|
set optimize_skip_unused_shards_rewrite_in=0;
|
||||||
|
set log_queries=1;
|
||||||
|
|
||||||
|
--
|
||||||
|
-- w/o optimize_skip_unused_shards_rewrite_in=1
|
||||||
|
--
|
||||||
|
select '(0, 2)';
|
||||||
|
with (select currentDatabase()) as id_no select *, ignore(id_no) from dist_01756 where dummy in (0, 2);
|
||||||
|
system flush logs;
|
||||||
|
select query from system.query_log where
|
||||||
|
event_date = today() and
|
||||||
|
event_time > now() - interval 1 hour and
|
||||||
|
not is_initial_query and
|
||||||
|
query not like '%system.query_log%' and
|
||||||
|
query like concat('WITH%', currentDatabase(), '%AS id_no %') and
|
||||||
|
type = 'QueryFinish'
|
||||||
|
order by query;
|
||||||
|
|
||||||
|
--
|
||||||
|
-- w/ optimize_skip_unused_shards_rewrite_in=1
|
||||||
|
--
|
||||||
|
|
||||||
|
set optimize_skip_unused_shards_rewrite_in=1;
|
||||||
|
|
||||||
|
-- detailed coverage for realistic examples
|
||||||
|
select 'optimize_skip_unused_shards_rewrite_in(0, 2)';
|
||||||
|
with (select currentDatabase()) as id_02 select *, ignore(id_02) from dist_01756 where dummy in (0, 2);
|
||||||
|
system flush logs;
|
||||||
|
select query from system.query_log where
|
||||||
|
event_date = today() and
|
||||||
|
event_time > now() - interval 1 hour and
|
||||||
|
not is_initial_query and
|
||||||
|
query not like '%system.query_log%' and
|
||||||
|
query like concat('WITH%', currentDatabase(), '%AS id_02 %') and
|
||||||
|
type = 'QueryFinish'
|
||||||
|
order by query;
|
||||||
|
|
||||||
|
select 'optimize_skip_unused_shards_rewrite_in(2,)';
|
||||||
|
with (select currentDatabase()) as id_2 select *, ignore(id_2) from dist_01756 where dummy in (2,);
|
||||||
|
system flush logs;
|
||||||
|
select query from system.query_log where
|
||||||
|
event_date = today() and
|
||||||
|
event_time > now() - interval 1 hour and
|
||||||
|
not is_initial_query and
|
||||||
|
query not like '%system.query_log%' and
|
||||||
|
query like concat('WITH%', currentDatabase(), '%AS id_2 %') and
|
||||||
|
type = 'QueryFinish'
|
||||||
|
order by query;
|
||||||
|
|
||||||
|
select 'optimize_skip_unused_shards_rewrite_in(0,)';
|
||||||
|
with (select currentDatabase()) as id_0 select *, ignore(id_0) from dist_01756 where dummy in (0,);
|
||||||
|
system flush logs;
|
||||||
|
select query from system.query_log where
|
||||||
|
event_date = today() and
|
||||||
|
event_time > now() - interval 1 hour and
|
||||||
|
not is_initial_query and
|
||||||
|
query not like '%system.query_log%' and
|
||||||
|
query like concat('WITH%', currentDatabase(), '%AS id_0 %') and
|
||||||
|
type = 'QueryFinish'
|
||||||
|
order by query;
|
||||||
|
|
||||||
|
--
|
||||||
|
-- errors
|
||||||
|
--
|
||||||
|
select 'errors';
|
||||||
|
|
||||||
|
-- not tuple
|
||||||
|
select * from dist_01756 where dummy in (0); -- { serverError 507 }
|
||||||
|
-- optimize_skip_unused_shards does not support non-constants
|
||||||
|
select * from dist_01756 where dummy in (select * from system.one); -- { serverError 507 }
|
||||||
|
select * from dist_01756 where dummy in (toUInt8(0)); -- { serverError 507 }
|
||||||
|
-- wrong type
|
||||||
|
select * from dist_01756 where dummy in ('0'); -- { serverError 507 }
|
||||||
|
-- NOT IN is not supported
|
||||||
|
select * from dist_01756 where dummy not in (0, 2); -- { serverError 507 }
|
||||||
|
|
||||||
|
--
|
||||||
|
-- others
|
||||||
|
--
|
||||||
|
select 'others';
|
||||||
|
|
||||||
|
select * from dist_01756 where dummy not in (2, 3) and dummy in (0, 2);
|
||||||
|
select * from dist_01756 where dummy in tuple(0, 2);
|
||||||
|
select * from dist_01756 where dummy in tuple(0);
|
||||||
|
select * from dist_01756 where dummy in tuple(2);
|
||||||
|
-- Identifier is NULL
|
||||||
|
select (2 IN (2,)), * from dist_01756 where dummy in (0, 2) format Null;
|
||||||
|
-- Literal is NULL
|
||||||
|
select (dummy IN (toUInt8(2),)), * from dist_01756 where dummy in (0, 2) format Null;
|
||||||
|
|
||||||
|
-- different type
|
||||||
|
create table data_01756_str (key String) engine=Memory();
|
||||||
|
create table dist_01756_str as data_01756_str engine=Distributed(test_cluster_two_shards, currentDatabase(), data_01756_str, cityHash64(key));
|
||||||
|
select * from dist_01756_str where key in ('0', '2');
|
||||||
|
select * from dist_01756_str where key in ('0', Null); -- { serverError 507 }
|
||||||
|
select * from dist_01756_str where key in (0, 2); -- { serverError 53 }
|
||||||
|
select * from dist_01756_str where key in (0, Null); -- { serverError 53 }
|
||||||
|
|
||||||
|
-- optimize_skip_unused_shards_limit
|
||||||
|
select 'optimize_skip_unused_shards_limit';
|
||||||
|
select * from dist_01756 where dummy in (0, 2) settings optimize_skip_unused_shards_limit=1; -- { serverError 507 }
|
||||||
|
select * from dist_01756 where dummy in (0, 2) settings optimize_skip_unused_shards_limit=1, force_optimize_skip_unused_shards=0;
|
||||||
|
|
||||||
|
drop table dist_01756;
|
||||||
|
drop table dist_01756_str;
|
||||||
|
drop table data_01756_str;
|
Loading…
Reference in New Issue
Block a user