Rewrite IN in query for remote shards to exclude values that does not belongs to shard

v2: fix optimize_skip_unused_shards_rewrite_in for sharding_key wrapped into function
v3: fix column name for optimize_skip_unused_shards_rewrite_in
v4: fix optimize_skip_unused_shards_rewrite_in with Null
v5:
- squash with Remove query argument for IStreamFactory::createForShard()
- use proper column after function execution (using sharding_key_column_name)
- update the test reference since (X) now is tuple(X)
This commit is contained in:
Azat Khuzhin 2021-03-07 18:51:01 +03:00
parent 773212529b
commit fbb386dca5
13 changed files with 338 additions and 10 deletions

View File

@ -1565,6 +1565,17 @@ Possible values:
Default value: 0 Default value: 0
## optimize_skip_unused_shards_rewrite_in {#optimize-skip-unused-shardslrewrite-in}
Rewrite IN in query for remote shards to exclude values that does not belong to the shard (requires optimize_skip_unused_shards).
Possible values:
- 0 — Disabled.
- 1 — Enabled.
Default value: 1 (since it requires `optimize_skip_unused_shards` anyway, which `0` by default)
## allow_nondeterministic_optimize_skip_unused_shards {#allow-nondeterministic-optimize-skip-unused-shards} ## allow_nondeterministic_optimize_skip_unused_shards {#allow-nondeterministic-optimize-skip-unused-shards}
Allow nondeterministic (like `rand` or `dictGet`, since later has some caveats with updates) functions in sharding key. Allow nondeterministic (like `rand` or `dictGet`, since later has some caveats with updates) functions in sharding key.

View File

@ -118,6 +118,7 @@ class IColumn;
M(Bool, optimize_distributed_group_by_sharding_key, false, "Optimize GROUP BY sharding_key queries (by avoiding costly aggregation on the initiator server).", 0) \ M(Bool, optimize_distributed_group_by_sharding_key, false, "Optimize GROUP BY sharding_key queries (by avoiding costly aggregation on the initiator server).", 0) \
M(UInt64, optimize_skip_unused_shards_limit, 1000, "Limit for number of sharding key values, turns off optimize_skip_unused_shards if the limit is reached", 0) \ M(UInt64, optimize_skip_unused_shards_limit, 1000, "Limit for number of sharding key values, turns off optimize_skip_unused_shards if the limit is reached", 0) \
M(Bool, optimize_skip_unused_shards, false, "Assumes that data is distributed by sharding_key. Optimization to skip unused shards if SELECT query filters by sharding_key.", 0) \ M(Bool, optimize_skip_unused_shards, false, "Assumes that data is distributed by sharding_key. Optimization to skip unused shards if SELECT query filters by sharding_key.", 0) \
M(Bool, optimize_skip_unused_shards_rewrite_in, true, "Rewrite IN in query for remote shards to exclude values that does not belong to the shard (requires optimize_skip_unused_shards)", 0) \
M(Bool, allow_nondeterministic_optimize_skip_unused_shards, false, "Allow non-deterministic functions (includes dictGet) in sharding_key for optimize_skip_unused_shards", 0) \ M(Bool, allow_nondeterministic_optimize_skip_unused_shards, false, "Allow non-deterministic functions (includes dictGet) in sharding_key for optimize_skip_unused_shards", 0) \
M(UInt64, force_optimize_skip_unused_shards, 0, "Throw an exception if unused shards cannot be skipped (1 - throw only if the table has the sharding key, 2 - always throw.", 0) \ M(UInt64, force_optimize_skip_unused_shards, 0, "Throw an exception if unused shards cannot be skipped (1 - throw only if the table has the sharding key, 2 - always throw.", 0) \
M(UInt64, optimize_skip_unused_shards_nesting, 0, "Same as optimize_skip_unused_shards, but accept nesting level until which it will work.", 0) \ M(UInt64, optimize_skip_unused_shards_nesting, 0, "Same as optimize_skip_unused_shards, but accept nesting level until which it will work.", 0) \

View File

@ -30,7 +30,7 @@ public:
virtual void createForShard( virtual void createForShard(
const Cluster::ShardInfo & shard_info, const Cluster::ShardInfo & shard_info,
const String & query, const ASTPtr & query_ast, const ASTPtr & query_ast,
ContextPtr context, const ThrottlerPtr & throttler, ContextPtr context, const ThrottlerPtr & throttler,
const SelectQueryInfo & query_info, const SelectQueryInfo & query_info,
std::vector<QueryPlanPtr> & res, std::vector<QueryPlanPtr> & res,

View File

@ -115,7 +115,7 @@ String formattedAST(const ASTPtr & ast)
void SelectStreamFactory::createForShard( void SelectStreamFactory::createForShard(
const Cluster::ShardInfo & shard_info, const Cluster::ShardInfo & shard_info,
const String &, const ASTPtr & query_ast, const ASTPtr & query_ast,
ContextPtr context, const ThrottlerPtr & throttler, ContextPtr context, const ThrottlerPtr & throttler,
const SelectQueryInfo &, const SelectQueryInfo &,
std::vector<QueryPlanPtr> & plans, std::vector<QueryPlanPtr> & plans,

View File

@ -34,7 +34,7 @@ public:
void createForShard( void createForShard(
const Cluster::ShardInfo & shard_info, const Cluster::ShardInfo & shard_info,
const String & query, const ASTPtr & query_ast, const ASTPtr & query_ast,
ContextPtr context, const ThrottlerPtr & throttler, ContextPtr context, const ThrottlerPtr & throttler,
const SelectQueryInfo & query_info, const SelectQueryInfo & query_info,
std::vector<QueryPlanPtr> & plans, std::vector<QueryPlanPtr> & plans,

View File

@ -5,7 +5,7 @@
#include <Interpreters/Cluster.h> #include <Interpreters/Cluster.h>
#include <Interpreters/IInterpreter.h> #include <Interpreters/IInterpreter.h>
#include <Interpreters/ProcessList.h> #include <Interpreters/ProcessList.h>
#include <Parsers/queryToString.h> #include <Interpreters/OptimizeShardingKeyRewriteInVisitor.h>
#include <Processors/Pipe.h> #include <Processors/Pipe.h>
#include <Processors/QueryPlan/QueryPlan.h> #include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/ReadFromPreparedSource.h> #include <Processors/QueryPlan/ReadFromPreparedSource.h>
@ -91,7 +91,10 @@ ContextPtr updateSettingsForCluster(const Cluster & cluster, ContextPtr context,
void executeQuery( void executeQuery(
QueryPlan & query_plan, QueryPlan & query_plan,
IStreamFactory & stream_factory, Poco::Logger * log, IStreamFactory & stream_factory, Poco::Logger * log,
const ASTPtr & query_ast, ContextPtr context, const SelectQueryInfo & query_info) const ASTPtr & query_ast, ContextPtr context, const SelectQueryInfo & query_info,
const ExpressionActionsPtr & sharding_key_expr,
const std::string & sharding_key_column_name,
const ClusterPtr & not_optimized_cluster)
{ {
assert(log); assert(log);
@ -104,8 +107,6 @@ void executeQuery(
Pipes remote_pipes; Pipes remote_pipes;
Pipes delayed_pipes; Pipes delayed_pipes;
const std::string query = queryToString(query_ast);
auto new_context = updateSettingsForCluster(*query_info.cluster, context, settings, log); auto new_context = updateSettingsForCluster(*query_info.cluster, context, settings, log);
new_context->getClientInfo().distributed_depth += 1; new_context->getClientInfo().distributed_depth += 1;
@ -127,9 +128,28 @@ void executeQuery(
else else
throttler = user_level_throttler; throttler = user_level_throttler;
size_t shards = query_info.cluster->getShardCount();
for (const auto & shard_info : query_info.cluster->getShardsInfo()) for (const auto & shard_info : query_info.cluster->getShardsInfo())
{ {
stream_factory.createForShard(shard_info, query, query_ast, ASTPtr query_ast_for_shard;
if (settings.optimize_skip_unused_shards && settings.optimize_skip_unused_shards_rewrite_in && shards > 1)
{
query_ast_for_shard = query_ast->clone();
OptimizeShardingKeyRewriteInVisitor::Data visitor_data{
sharding_key_expr,
sharding_key_column_name,
shard_info,
not_optimized_cluster->getSlotToShard(),
};
OptimizeShardingKeyRewriteInVisitor visitor(visitor_data);
visitor.visit(query_ast_for_shard);
}
else
query_ast_for_shard = query_ast;
stream_factory.createForShard(shard_info,
query_ast_for_shard,
new_context, throttler, query_info, plans, new_context, throttler, query_info, plans,
remote_pipes, delayed_pipes, log); remote_pipes, delayed_pipes, log);
} }

View File

@ -8,11 +8,15 @@ namespace DB
struct Settings; struct Settings;
class Cluster; class Cluster;
using ClusterPtr = std::shared_ptr<Cluster>;
struct SelectQueryInfo; struct SelectQueryInfo;
class Pipe; class Pipe;
class QueryPlan; class QueryPlan;
class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
namespace ClusterProxy namespace ClusterProxy
{ {
@ -35,7 +39,10 @@ ContextPtr updateSettingsForCluster(const Cluster & cluster, ContextPtr context,
void executeQuery( void executeQuery(
QueryPlan & query_plan, QueryPlan & query_plan,
IStreamFactory & stream_factory, Poco::Logger * log, IStreamFactory & stream_factory, Poco::Logger * log,
const ASTPtr & query_ast, ContextPtr context, const SelectQueryInfo & query_info); const ASTPtr & query_ast, ContextPtr context, const SelectQueryInfo & query_info,
const ExpressionActionsPtr & sharding_key_expr,
const std::string & sharding_key_column_name,
const ClusterPtr & not_optimized_cluster);
} }

View File

@ -0,0 +1,110 @@
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/convertFieldToType.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTIdentifier.h>
#include <DataTypes/FieldToDataType.h>
#include <Interpreters/OptimizeShardingKeyRewriteInVisitor.h>
namespace
{
using namespace DB;
Field executeFunctionOnField(
const Field & field, const std::string & name,
const ExpressionActionsPtr & expr,
const std::string & sharding_key_column_name)
{
DataTypePtr type = applyVisitor(FieldToDataType{}, field);
ColumnWithTypeAndName column;
column.column = type->createColumnConst(1, field);
column.name = name;
column.type = type;
Block block{column};
size_t num_rows = 1;
expr->execute(block, num_rows);
ColumnWithTypeAndName & ret = block.getByName(sharding_key_column_name);
return (*ret.column)[0];
}
/// Return true if shard may contain such value (or it is unknown), otherwise false.
bool shardContains(
const Field & sharding_column_value,
const std::string & sharding_column_name,
const ExpressionActionsPtr & expr,
const std::string & sharding_key_column_name,
const Cluster::ShardInfo & shard_info,
const Cluster::SlotToShard & slots)
{
/// NULL is not allowed in sharding key,
/// so it should be safe to assume that shard cannot contain it.
if (sharding_column_value.isNull())
return false;
Field sharding_value = executeFunctionOnField(sharding_column_value, sharding_column_name, expr, sharding_key_column_name);
UInt64 value = sharding_value.get<UInt64>();
const auto shard_num = slots[value % slots.size()] + 1;
return shard_info.shard_num == shard_num;
}
}
namespace DB
{
bool OptimizeShardingKeyRewriteInMatcher::needChildVisit(ASTPtr & /*node*/, const ASTPtr & /*child*/)
{
return true;
}
void OptimizeShardingKeyRewriteInMatcher::visit(ASTPtr & node, Data & data)
{
if (auto * function = node->as<ASTFunction>())
visit(*function, data);
}
void OptimizeShardingKeyRewriteInMatcher::visit(ASTFunction & function, Data & data)
{
if (function.name != "in")
return;
auto * left = function.arguments->children.front().get();
auto * right = function.arguments->children.back().get();
auto * identifier = left->as<ASTIdentifier>();
if (!identifier)
return;
const auto & expr = data.sharding_key_expr;
const auto & sharding_key_column_name = data.sharding_key_column_name;
if (!expr->getRequiredColumnsWithTypes().contains(identifier->name()))
return;
/// NOTE: that we should not take care about empty tuple,
/// since after optimize_skip_unused_shards,
/// at least one element should match each shard.
if (auto * tuple_func = right->as<ASTFunction>(); tuple_func && tuple_func->name == "tuple")
{
auto * tuple_elements = tuple_func->children.front()->as<ASTExpressionList>();
std::erase_if(tuple_elements->children, [&](auto & child)
{
auto * literal = child->template as<ASTLiteral>();
return literal && !shardContains(literal->value, identifier->name(), expr, sharding_key_column_name, data.shard_info, data.slots);
});
}
else if (auto * tuple_literal = right->as<ASTLiteral>();
tuple_literal && tuple_literal->value.getType() == Field::Types::Tuple)
{
auto & tuple = tuple_literal->value.get<Tuple &>();
std::erase_if(tuple, [&](auto & child)
{
return !shardContains(child, identifier->name(), expr, sharding_key_column_name, data.shard_info, data.slots);
});
}
}
}

View File

@ -0,0 +1,41 @@
#pragma once
#include <Interpreters/InDepthNodeVisitor.h>
#include <Interpreters/Cluster.h>
namespace DB
{
class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
class ASTFunction;
/// Rewrite `sharding_key IN (...)` for specific shard,
/// so that it will contain only values that belong to this specific shard.
///
/// See also:
/// - evaluateExpressionOverConstantCondition()
/// - StorageDistributed::createSelector()
/// - createBlockSelector()
struct OptimizeShardingKeyRewriteInMatcher
{
/// Cluster::SlotToShard
using SlotToShard = std::vector<UInt64>;
struct Data
{
const ExpressionActionsPtr & sharding_key_expr;
const std::string & sharding_key_column_name;
const Cluster::ShardInfo & shard_info;
const Cluster::SlotToShard & slots;
};
static bool needChildVisit(ASTPtr & /*node*/, const ASTPtr & /*child*/);
static void visit(ASTPtr & node, Data & data);
static void visit(ASTFunction & function, Data & data);
};
using OptimizeShardingKeyRewriteInVisitor = InDepthNodeVisitor<OptimizeShardingKeyRewriteInMatcher, true>;
}

View File

@ -118,6 +118,7 @@ SRCS(
OpenTelemetrySpanLog.cpp OpenTelemetrySpanLog.cpp
OptimizeIfChains.cpp OptimizeIfChains.cpp
OptimizeIfWithConstantConditionVisitor.cpp OptimizeIfWithConstantConditionVisitor.cpp
OptimizeShardingKeyRewriteInVisitor.cpp
PartLog.cpp PartLog.cpp
PredicateExpressionsOptimizer.cpp PredicateExpressionsOptimizer.cpp
PredicateRewriteVisitor.cpp PredicateRewriteVisitor.cpp

View File

@ -586,7 +586,9 @@ void StorageDistributed::read(
local_context->getExternalTables()); local_context->getExternalTables());
ClusterProxy::executeQuery(query_plan, select_stream_factory, log, ClusterProxy::executeQuery(query_plan, select_stream_factory, log,
modified_query_ast, local_context, query_info); modified_query_ast, local_context, query_info,
sharding_key_expr, sharding_key_column_name,
getCluster());
/// This is a bug, it is possible only when there is no shards to query, and this is handled earlier. /// This is a bug, it is possible only when there is no shards to query, and this is handled earlier.
if (!query_plan.isInitialized()) if (!query_plan.isInitialized())

View File

@ -0,0 +1,19 @@
(0, 2)
0 0
0 0
WITH CAST(\'default\', \'String\') AS id_no SELECT one.dummy, ignore(id_no) FROM system.one WHERE dummy IN (0, 2)
WITH CAST(\'default\', \'String\') AS id_no SELECT one.dummy, ignore(id_no) FROM system.one WHERE dummy IN (0, 2)
optimize_skip_unused_shards_rewrite_in(0, 2)
0 0
WITH CAST(\'default\', \'String\') AS id_02 SELECT one.dummy, ignore(id_02) FROM system.one WHERE dummy IN tuple(0)
WITH CAST(\'default\', \'String\') AS id_02 SELECT one.dummy, ignore(id_02) FROM system.one WHERE dummy IN tuple(2)
optimize_skip_unused_shards_rewrite_in(2,)
WITH CAST(\'default\', \'String\') AS id_2 SELECT one.dummy, ignore(id_2) FROM system.one WHERE dummy IN tuple(2)
optimize_skip_unused_shards_rewrite_in(0,)
0 0
WITH CAST(\'default\', \'String\') AS id_0 SELECT one.dummy, ignore(id_0) FROM system.one WHERE dummy IN tuple(0)
errors
others
0
0
0

View File

@ -0,0 +1,116 @@
-- NOTE: this test cannot use 'current_database = currentDatabase()',
-- because it does not propagated via remote queries,
-- hence it uses 'with (select currentDatabase()) as X'
-- (with subquery to expand it on the initiator).
drop table if exists dist_01756;
-- SELECT
-- intHash64(0) % 2,
-- intHash64(2) % 2
-- ┌─modulo(intHash64(0), 2)─┬─modulo(intHash64(2), 2)─┐
-- │ 0 │ 1 │
-- └─────────────────────────┴─────────────────────────┘
create table dist_01756 as system.one engine=Distributed(test_cluster_two_shards, system, one, intHash64(dummy));
-- separate log entry for localhost queries
set prefer_localhost_replica=0;
set force_optimize_skip_unused_shards=2;
set optimize_skip_unused_shards=1;
set optimize_skip_unused_shards_rewrite_in=0;
set log_queries=1;
--
-- w/o optimize_skip_unused_shards_rewrite_in=1
--
select '(0, 2)';
with (select currentDatabase()) as id_no select *, ignore(id_no) from dist_01756 where dummy in (0, 2);
system flush logs;
select query from system.query_log where
event_date = today() and
event_time > now() - interval 1 hour and
not is_initial_query and
query not like '%system.query_log%' and
query like concat('WITH%', currentDatabase(), '%AS id_no %') and
type = 'QueryFinish'
order by query;
--
-- w/ optimize_skip_unused_shards_rewrite_in=1
--
set optimize_skip_unused_shards_rewrite_in=1;
-- detailed coverage for realistic examples
select 'optimize_skip_unused_shards_rewrite_in(0, 2)';
with (select currentDatabase()) as id_02 select *, ignore(id_02) from dist_01756 where dummy in (0, 2);
system flush logs;
select query from system.query_log where
event_date = today() and
event_time > now() - interval 1 hour and
not is_initial_query and
query not like '%system.query_log%' and
query like concat('WITH%', currentDatabase(), '%AS id_02 %') and
type = 'QueryFinish'
order by query;
select 'optimize_skip_unused_shards_rewrite_in(2,)';
with (select currentDatabase()) as id_2 select *, ignore(id_2) from dist_01756 where dummy in (2,);
system flush logs;
select query from system.query_log where
event_date = today() and
event_time > now() - interval 1 hour and
not is_initial_query and
query not like '%system.query_log%' and
query like concat('WITH%', currentDatabase(), '%AS id_2 %') and
type = 'QueryFinish'
order by query;
select 'optimize_skip_unused_shards_rewrite_in(0,)';
with (select currentDatabase()) as id_0 select *, ignore(id_0) from dist_01756 where dummy in (0,);
system flush logs;
select query from system.query_log where
event_date = today() and
event_time > now() - interval 1 hour and
not is_initial_query and
query not like '%system.query_log%' and
query like concat('WITH%', currentDatabase(), '%AS id_0 %') and
type = 'QueryFinish'
order by query;
--
-- errors
--
select 'errors';
-- not tuple
select * from dist_01756 where dummy in (0); -- { serverError 507 }
-- optimize_skip_unused_shards does not support non-constants
select * from dist_01756 where dummy in (select * from system.one); -- { serverError 507 }
select * from dist_01756 where dummy in (toUInt8(0)); -- { serverError 507 }
-- wrong type
select * from dist_01756 where dummy in ('0'); -- { serverError 507 }
-- NOT IN does not supported
select * from dist_01756 where dummy not in (0, 2); -- { serverError 507 }
--
-- others
--
select 'others';
select * from dist_01756 where dummy not in (2, 3) and dummy in (0, 2);
select * from dist_01756 where dummy in tuple(0, 2);
select * from dist_01756 where dummy in tuple(0);
select * from dist_01756 where dummy in tuple(2);
-- Identifier is NULL
select (2 IN (2,)), * from dist_01756 where dummy in (0, 2) format Null;
-- Literal is NULL
select (dummy IN (toUInt8(2),)), * from dist_01756 where dummy in (0, 2) format Null;
-- different type
create table data_01756_str (key String) engine=Memory();
create table dist_01756_str as data_01756_str engine=Distributed(test_cluster_two_shards, currentDatabase(), data_01756_str, cityHash64(key));
select * from dist_01756_str where key in ('0', '2');
select * from dist_01756_str where key in ('0', Null); -- { serverError 507 }
select * from dist_01756_str where key in (0, 2); -- { serverError 53 }
select * from dist_01756_str where key in (0, Null); -- { serverError 53 }