From 7a75144ce372da6134ae624668006c1348889d1e Mon Sep 17 00:00:00 2001 From: Antonio Andelic Date: Thu, 19 Jan 2023 09:20:40 +0000 Subject: [PATCH] Refactor --- src/Interpreters/Cluster.cpp | 11 +- src/Interpreters/Cluster.h | 4 +- .../ClusterProxy/executeQuery.cpp | 13 +- src/Storages/StorageDistributed.cpp | 207 +++++++----------- 4 files changed, 92 insertions(+), 143 deletions(-) diff --git a/src/Interpreters/Cluster.cpp b/src/Interpreters/Cluster.cpp index 9f0a9d3b35c..7e3e1baf6f2 100644 --- a/src/Interpreters/Cluster.cpp +++ b/src/Interpreters/Cluster.cpp @@ -653,9 +653,9 @@ void Cluster::initMisc() } } -std::unique_ptr Cluster::getClusterWithReplicasAsShards(const Settings & settings) const +std::unique_ptr Cluster::getClusterWithReplicasAsShards(const Settings & settings, size_t max_replicas_from_shard) const { - return std::unique_ptr{ new Cluster(ReplicasAsShardsTag{}, *this, settings)}; + return std::unique_ptr{ new Cluster(ReplicasAsShardsTag{}, *this, settings, max_replicas_from_shard)}; } std::unique_ptr Cluster::getClusterWithSingleShard(size_t index) const @@ -668,7 +668,7 @@ std::unique_ptr Cluster::getClusterWithMultipleShards(const std::vector return std::unique_ptr{ new Cluster(SubclusterTag{}, *this, indices) }; } -Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Settings & settings) +Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Settings & settings, size_t max_replicas_from_shard) { if (from.addresses_with_failover.empty()) throw Exception("Cluster is empty", ErrorCodes::LOGICAL_ERROR); @@ -678,6 +678,7 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti for (size_t shard_index : collections::range(0, from.shards_info.size())) { const auto & replicas = from.addresses_with_failover[shard_index]; + size_t replicas_used = 0; for (const auto & address : replicas) { if (!unique_hosts.emplace(address.host_name, address.port).second) @@ -685,6 +686,7 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti ShardInfo info; info.shard_num = ++shard_num; + ++replicas_used; if (address.is_local) info.local_addresses.push_back(address); @@ -711,6 +713,9 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti addresses_with_failover.emplace_back(Addresses{address}); shards_info.emplace_back(std::move(info)); + + if (max_replicas_from_shard && replicas_used == max_replicas_from_shard) + break; } } diff --git a/src/Interpreters/Cluster.h b/src/Interpreters/Cluster.h index ada04aa1cae..77e87e48ca7 100644 --- a/src/Interpreters/Cluster.h +++ b/src/Interpreters/Cluster.h @@ -250,7 +250,7 @@ public: std::unique_ptr getClusterWithMultipleShards(const std::vector & indices) const; /// Get a new Cluster that contains all servers (all shards with all replicas) from existing cluster as independent shards. - std::unique_ptr getClusterWithReplicasAsShards(const Settings & settings) const; + std::unique_ptr getClusterWithReplicasAsShards(const Settings & settings, size_t max_replicas_from_shard = 0) const; /// Returns false if cluster configuration doesn't allow to use it for cross-replication. /// NOTE: true does not mean, that it's actually a cross-replication cluster. @@ -271,7 +271,7 @@ private: /// For getClusterWithReplicasAsShards implementation struct ReplicasAsShardsTag {}; - Cluster(ReplicasAsShardsTag, const Cluster & from, const Settings & settings); + Cluster(ReplicasAsShardsTag, const Cluster & from, const Settings & settings, size_t max_replicas_from_shard); /// Inter-server secret String secret; diff --git a/src/Interpreters/ClusterProxy/executeQuery.cpp b/src/Interpreters/ClusterProxy/executeQuery.cpp index d8d55b5486b..02673b9f7ac 100644 --- a/src/Interpreters/ClusterProxy/executeQuery.cpp +++ b/src/Interpreters/ClusterProxy/executeQuery.cpp @@ -191,18 +191,7 @@ void executeQuery( auto where_expression = select_query.where(); if (where_expression) - { - ASTPtr args = std::make_shared(); - args->children.push_back(where_expression); - args->children.push_back(shard_filter); - - auto and_function = std::make_shared(); - and_function->name = "and"; - and_function->arguments = args; - and_function->children.push_back(and_function->arguments); - - shard_filter = std::move(and_function); - } + shard_filter = makeASTFunction("and", where_expression, shard_filter); select_query.setExpression(ASTSelectQuery::Expression::WHERE, std::move(shard_filter)); } diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index a7c0dafaf62..82c38868cb4 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -445,10 +445,6 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage( ClusterPtr cluster = getCluster(); - // if it's custom_key we will turn replicas into shards and filter specific data on each of them - if (settings.max_parallel_replicas > 1 && cluster->getShardCount() == 1 && settings.parallel_replicas_mode == ParallelReplicasMode::CUSTOM_KEY) - cluster = cluster->getClusterWithReplicasAsShards(settings); - query_info.cluster = cluster; size_t nodes = getClusterQueriedNodes(settings, cluster); @@ -758,15 +754,19 @@ void StorageDistributed::read( bool parallel_replicas = settings.max_parallel_replicas > 1 && settings.allow_experimental_parallel_reading_from_replicas && !settings.use_hedged_requests && settings.parallel_replicas_mode == ParallelReplicasMode::READ_TASKS; - auto shard_count = query_info.getCluster()->getShardCount(); - ClusterProxy::AdditionalShardFilterGenerator additional_shard_filter_generator; if (settings.max_parallel_replicas > 1 - && settings.parallel_replicas_mode == ParallelReplicasMode::CUSTOM_KEY) + && settings.parallel_replicas_mode == ParallelReplicasMode::CUSTOM_KEY + && getCluster()->getShardCount() == 1) { + LOG_INFO(log, "Single shard cluster used with custom_key, transforming replicas into shards"); + + query_info.cluster = getCluster()->getClusterWithReplicasAsShards(settings, settings.max_parallel_replicas); + query_info.optimized_cluster = nullptr; // it's a single shard cluster so nothing could've been optimized const std::string_view custom_key = settings.parallel_replicas_custom_key.value; - assert(!custom_key.empty()); + if (custom_key.empty()) + throw Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Parallel replicas mode set to 'custom_key' but 'parallel_replicas_custom_key' has no value"); ParserExpression parser; auto custom_key_ast = parseQuery( @@ -777,135 +777,90 @@ void StorageDistributed::read( settings.max_query_size, settings.max_parser_depth); - additional_shard_filter_generator = [&](uint64_t shard_num) -> ASTPtr + additional_shard_filter_generator = [&, custom_key_ast = std::move(custom_key_ast), shard_count = query_info.cluster->getShardCount()](uint64_t shard_num) mutable -> ASTPtr { - ParserExpression parser; - auto custom_key_ast = parseQuery( - parser, settings.parallel_replicas_custom_key.value.data(), settings.parallel_replicas_custom_key.value.data() + settings.parallel_replicas_custom_key.value.size(), - "parallel replicas custom key", settings.max_query_size, settings.max_parser_depth); - - ASTPtr shard_filter = nullptr ; if (settings.parallel_replicas_custom_key_filter_type == ParallelReplicasCustomKeyFilterType::DEFAULT) { // first we do modulo with replica count - ASTPtr args = std::make_shared(); - args->children.push_back(custom_key_ast); - args->children.push_back(std::make_shared(shard_count)); - - auto modulo_function = std::make_shared(); - modulo_function->name = "positiveModulo"; - modulo_function->arguments = args; - modulo_function->children.push_back(modulo_function->arguments); + auto modulo_function = makeASTFunction("positiveModulo", custom_key_ast, std::make_shared(shard_count)); /// then we compare result to the current replica number (offset) - args = std::make_shared(); - args->children.push_back(modulo_function); - args->children.push_back(std::make_shared(shard_num - 1)); - - auto equals_function = std::make_shared(); - equals_function->name = "equals"; - equals_function->arguments = args; - equals_function->children.push_back(equals_function->arguments); + auto equals_function = makeASTFunction("equals", std::move(modulo_function), std::make_shared(shard_num - 1)); return equals_function; } - else + + assert(settings.parallel_replicas_custom_key_filter_type == ParallelReplicasCustomKeyFilterType::RANGE); + + KeyDescription custom_key_description = KeyDescription::getKeyFromAST(custom_key_ast, getInMemoryMetadataPtr()->columns, local_context); + + using RelativeSize = boost::rational; + + RelativeSize size_of_universum = 0; + DataTypePtr custom_key_column_type = custom_key_description.data_types[0]; + + size_of_universum = RelativeSize(std::numeric_limits::max()) + RelativeSize(1); + if (custom_key_description.data_types.size() == 1) { - assert(settings.parallel_replicas_custom_key_filter_type == ParallelReplicasCustomKeyFilterType::RANGE); - - KeyDescription custom_key_description = KeyDescription::getKeyFromAST(custom_key_ast, getInMemoryMetadataPtr()->columns, local_context); - - using RelativeSize = boost::rational; - - RelativeSize size_of_universum = 0; - DataTypePtr custom_key_column_type = custom_key_description.data_types[0]; - - size_of_universum = RelativeSize(std::numeric_limits::max()) + RelativeSize(1); - if (custom_key_description.data_types.size() == 1) - { - if (typeid_cast(custom_key_column_type.get())) - size_of_universum = RelativeSize(std::numeric_limits::max()) + RelativeSize(1); - else if (typeid_cast(custom_key_column_type.get())) - size_of_universum = RelativeSize(std::numeric_limits::max()) + RelativeSize(1); - else if (typeid_cast(custom_key_column_type.get())) - size_of_universum = RelativeSize(std::numeric_limits::max()) + RelativeSize(1); - else if (typeid_cast(custom_key_column_type.get())) - size_of_universum = RelativeSize(std::numeric_limits::max()) + RelativeSize(1); - } - - if (size_of_universum == RelativeSize(0)) - throw Exception( - ErrorCodes::LOGICAL_ERROR, - "Invalid custom key column type: {}. Must be one unsigned integer type", custom_key_column_type->getName()); - - RelativeSize relative_range_size = RelativeSize(1) / query_info.getCluster()->getShardCount(); - RelativeSize relative_range_offset = relative_range_size * RelativeSize(shard_num - 1); - - /// Calculate the half-interval of `[lower, upper)` column values. - bool has_lower_limit = false; - bool has_upper_limit = false; - - RelativeSize lower_limit_rational = relative_range_offset * size_of_universum; - RelativeSize upper_limit_rational = (relative_range_offset + relative_range_size) * size_of_universum; - - UInt64 lower = boost::rational_cast(lower_limit_rational); - UInt64 upper = boost::rational_cast(upper_limit_rational); - - if (lower > 0) - has_lower_limit = true; - - if (upper_limit_rational < size_of_universum) - has_upper_limit = true; - - assert(has_lower_limit || has_upper_limit); - - /// Let's add the conditions to cut off something else when the index is scanned again and when the request is processed. - std::shared_ptr lower_function; - std::shared_ptr upper_function; - - if (has_lower_limit) - { - ASTPtr args = std::make_shared(); - args->children.push_back(custom_key_ast); - args->children.push_back(std::make_shared(lower)); - - lower_function = std::make_shared(); - lower_function->name = "greaterOrEquals"; - lower_function->arguments = args; - lower_function->children.push_back(lower_function->arguments); - - if (!has_upper_limit) - return lower_function; - } - - if (has_upper_limit) - { - ASTPtr args = std::make_shared(); - args->children.push_back(custom_key_ast); - args->children.push_back(std::make_shared(upper)); - - upper_function = std::make_shared(); - upper_function->name = "less"; - upper_function->arguments = args; - upper_function->children.push_back(upper_function->arguments); - - if (!has_lower_limit) - return upper_function; - } - - assert(has_lower_limit && has_upper_limit); - - ASTPtr args = std::make_shared(); - args->children.push_back(lower_function); - args->children.push_back(upper_function); - - auto and_function = std::make_shared(); - and_function->name = "and"; - and_function->arguments = args; - and_function->children.push_back(and_function->arguments); - - return and_function; + if (typeid_cast(custom_key_column_type.get())) + size_of_universum = RelativeSize(std::numeric_limits::max()) + RelativeSize(1); + else if (typeid_cast(custom_key_column_type.get())) + size_of_universum = RelativeSize(std::numeric_limits::max()) + RelativeSize(1); + else if (typeid_cast(custom_key_column_type.get())) + size_of_universum = RelativeSize(std::numeric_limits::max()) + RelativeSize(1); + else if (typeid_cast(custom_key_column_type.get())) + size_of_universum = RelativeSize(std::numeric_limits::max()) + RelativeSize(1); } + + if (size_of_universum == RelativeSize(0)) + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Invalid custom key column type: {}. Must be one unsigned integer type", custom_key_column_type->getName()); + + RelativeSize relative_range_size = RelativeSize(1) / shard_count; + RelativeSize relative_range_offset = relative_range_size * RelativeSize(shard_num - 1); + + /// Calculate the half-interval of `[lower, upper)` column values. + bool has_lower_limit = false; + bool has_upper_limit = false; + + RelativeSize lower_limit_rational = relative_range_offset * size_of_universum; + RelativeSize upper_limit_rational = (relative_range_offset + relative_range_size) * size_of_universum; + + UInt64 lower = boost::rational_cast(lower_limit_rational); + UInt64 upper = boost::rational_cast(upper_limit_rational); + + if (lower > 0) + has_lower_limit = true; + + if (upper_limit_rational < size_of_universum) + has_upper_limit = true; + + assert(has_lower_limit || has_upper_limit); + + /// Let's add the conditions to cut off something else when the index is scanned again and when the request is processed. + std::shared_ptr lower_function; + std::shared_ptr upper_function; + + if (has_lower_limit) + { + lower_function = makeASTFunction("greaterOrEquals", custom_key_ast, std::make_shared(lower)); + + if (!has_upper_limit) + return lower_function; + } + + if (has_upper_limit) + { + upper_function = makeASTFunction("less", custom_key_ast, std::make_shared(upper)); + + if (!has_lower_limit) + return upper_function; + } + + assert(upper_function && lower_function); + + return makeASTFunction("and", std::move(lower_function), std::move(upper_function)); }; }