Merge pull request #45108 from ClickHouse/custom-key-parallel-replicas

Add support for custom key in parallel replicas
This commit is contained in:
Antonio Andelic 2023-03-08 17:46:11 +01:00 committed by GitHub
commit a04b38db90
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 863 additions and 85 deletions

View File

@ -1248,7 +1248,9 @@ Possible values:
Default value: 1.
:::warning
Disable this setting if you use [max_parallel_replicas](#settings-max_parallel_replicas).
Disable this setting if you use [max_parallel_replicas](#settings-max_parallel_replicas) without [parallel_replicas_custom_key](#settings-parallel_replicas_custom_key).
If [parallel_replicas_custom_key](#settings-parallel_replicas_custom_key) is set, disable this setting only if it's used on a cluster with multiple shards containing multiple replicas.
If it's used on a cluster with a single shard and multiple replicas, disabling this setting will have negative effects.
:::
## totals_mode {#totals-mode}
@ -1273,16 +1275,47 @@ Default value: `1`.
**Additional Info**
This setting is useful for replicated tables with a sampling key. A query may be processed faster if it is executed on several servers in parallel. But the query performance may degrade in the following cases:
This options will produce different results depending on the settings used.
:::warning
This setting will produce incorrect results when joins or subqueries are involved, and all tables don't meet certain requirements. See [Distributed Subqueries and max_parallel_replicas](../../sql-reference/operators/in.md/#max_parallel_replica-subqueries) for more details.
:::
### Parallel processing using `SAMPLE` key
A query may be processed faster if it is executed on several servers in parallel. But the query performance may degrade in the following cases:
- The position of the sampling key in the partitioning key does not allow efficient range scans.
- Adding a sampling key to the table makes filtering by other columns less efficient.
- The sampling key is an expression that is expensive to calculate.
- The cluster latency distribution has a long tail, so that querying more servers increases the query overall latency.
:::warning
This setting will produce incorrect results when joins or subqueries are involved, and all tables don't meet certain requirements. See [Distributed Subqueries and max_parallel_replicas](../../sql-reference/operators/in.md/#max_parallel_replica-subqueries) for more details.
:::
### Parallel processing using [parallel_replicas_custom_key](#settings-parallel_replicas_custom_key)
This setting is useful for any replicated table.
## parallel_replicas_custom_key {#settings-parallel_replicas_custom_key}
An arbitrary integer expression that can be used to split work between replicas for a specific table.
The value can be any integer expression.
A query may be processed faster if it is executed on several servers in parallel but it depends on the used [parallel_replicas_custom_key](#settings-parallel_replicas_custom_key)
and [parallel_replicas_custom_key_filter_type](#settings-parallel_replicas_custom_key_filter_type).
Simple expressions using primary keys are preferred.
If the setting is used on a cluster that consists of a single shard with multiple replicas, those replicas will be converted into virtual shards.
Otherwise, it will behave same as for `SAMPLE` key, it will use multiple replicas of each shard.
## parallel_replicas_custom_key_filter_type {#settings-parallel_replicas_custom_key_filter_type}
How to use `parallel_replicas_custom_key` expression for splitting work between replicas.
Possible values:
- `default` — Use the default implementation using modulo operation on the `parallel_replicas_custom_key`.
- `range` — Split the entire value space of the expression in the ranges. This type of filtering is useful if values of `parallel_replicas_custom_key` are uniformly spread across the entire integer space, e.g. hash values.
Default value: `default`.
## compile_expressions {#compile-expressions}

View File

@ -233,8 +233,9 @@ If `some_predicate` is not selective enough, it will return large amount of data
### Distributed Subqueries and max_parallel_replicas
When max_parallel_replicas is greater than 1, distributed queries are further transformed. For example, the following:
When [max_parallel_replicas](#settings-max_parallel_replicas) is greater than 1, distributed queries are further transformed.
For example, the following:
```sql
SELECT CounterID, count() FROM distributed_table_1 WHERE UserID IN (SELECT UserID FROM local_table_2 WHERE CounterID < 100)
SETTINGS max_parallel_replicas=3
@ -247,8 +248,12 @@ SELECT CounterID, count() FROM local_table_1 WHERE UserID IN (SELECT UserID FROM
SETTINGS parallel_replicas_count=3, parallel_replicas_offset=M
```
where M is between 1 and 3 depending on which replica the local query is executing on. These settings affect every MergeTree-family table in the query and have the same effect as applying `SAMPLE 1/3 OFFSET (M-1)/3` on each table.
where M is between 1 and 3 depending on which replica the local query is executing on.
Therefore adding the max_parallel_replicas setting will only produce correct results if both tables have the same replication scheme and are sampled by UserID or a subkey of it. In particular, if local_table_2 does not have a sampling key, incorrect results will be produced. The same rule applies to JOIN.
These settings affect every MergeTree-family table in the query and have the same effect as applying `SAMPLE 1/3 OFFSET (M-1)/3` on each table.
Therefore adding the [max_parallel_replicas](#settings-max_parallel_replicas) setting will only produce correct results if both tables have the same replication scheme and are sampled by UserID or a subkey of it. In particular, if local_table_2 does not have a sampling key, incorrect results will be produced. The same rule applies to JOIN.
One workaround if local_table_2 does not meet the requirements, is to use `GLOBAL IN` or `GLOBAL JOIN`.
If a table doesn't have a sampling key, more flexible options for [parallel_replicas_custom_key](#settings-parallel_replicas_custom_key) can be used that can produce different and more optimal behaviour.

View File

@ -147,6 +147,8 @@ class IColumn;
M(UInt64, max_parallel_replicas, 1, "The maximum number of replicas of each shard used when the query is executed. For consistency (to get different parts of the same partition), this option only works for the specified sampling key. The lag of the replicas is not controlled.", 0) \
M(UInt64, parallel_replicas_count, 0, "This is internal setting that should not be used directly and represents an implementation detail of the 'parallel replicas' mode. This setting will be automatically set up by the initiator server for distributed queries to the number of parallel replicas participating in query processing.", 0) \
M(UInt64, parallel_replica_offset, 0, "This is internal setting that should not be used directly and represents an implementation detail of the 'parallel replicas' mode. This setting will be automatically set up by the initiator server for distributed queries to the index of the replica participating in query processing among parallel replicas.", 0) \
M(String, parallel_replicas_custom_key, "", "Custom key assigning work to replicas when parallel replicas are used.", 0) \
M(ParallelReplicasCustomKeyFilterType, parallel_replicas_custom_key_filter_type, ParallelReplicasCustomKeyFilterType::DEFAULT, "Type of filter to use with custom key for parallel replicas. default - use modulo operation on the custom key, range - use range filter on custom key using all possible values for the value type of custom key.", 0) \
\
M(String, cluster_for_parallel_replicas, "default", "Cluster for a shard in which current server is located", 0) \
M(Bool, allow_experimental_parallel_reading_from_replicas, false, "If true, ClickHouse will send a SELECT query to all replicas of a table. It will work for any kind on MergeTree table.", 0) \

View File

@ -167,6 +167,10 @@ IMPLEMENT_SETTING_ENUM(Dialect, ErrorCodes::BAD_ARGUMENTS,
{{"clickhouse", Dialect::clickhouse},
{"kusto", Dialect::kusto}})
IMPLEMENT_SETTING_ENUM(ParallelReplicasCustomKeyFilterType, ErrorCodes::BAD_ARGUMENTS,
{{"default", ParallelReplicasCustomKeyFilterType::DEFAULT},
{"range", ParallelReplicasCustomKeyFilterType::RANGE}})
IMPLEMENT_SETTING_ENUM(LocalFSReadMethod, ErrorCodes::BAD_ARGUMENTS,
{{"mmap", LocalFSReadMethod::mmap},
{"pread", LocalFSReadMethod::pread},

View File

@ -203,5 +203,13 @@ enum class Dialect
DECLARE_SETTING_ENUM(Dialect)
enum class ParallelReplicasCustomKeyFilterType : uint8_t
{
DEFAULT,
RANGE,
};
DECLARE_SETTING_ENUM(ParallelReplicasCustomKeyFilterType)
DECLARE_SETTING_ENUM(LocalFSReadMethod)
}

View File

@ -15,6 +15,7 @@
#include <base/sort.h>
#include <boost/range/algorithm_ext/erase.hpp>
#include <span>
namespace DB
{
@ -509,7 +510,6 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config,
shard_local_addresses.push_back(replica);
shard_all_addresses.push_back(replica);
}
ConnectionPoolWithFailoverPtr shard_pool = std::make_shared<ConnectionPoolWithFailover>(
all_replicas_pools, settings.load_balancing,
settings.distributed_replica_error_half_life.totalSeconds(), settings.distributed_replica_error_cap);
@ -653,9 +653,9 @@ void Cluster::initMisc()
}
}
std::unique_ptr<Cluster> Cluster::getClusterWithReplicasAsShards(const Settings & settings) const
std::unique_ptr<Cluster> Cluster::getClusterWithReplicasAsShards(const Settings & settings, size_t max_replicas_from_shard) const
{
return std::unique_ptr<Cluster>{ new Cluster(ReplicasAsShardsTag{}, *this, settings)};
return std::unique_ptr<Cluster>{ new Cluster(ReplicasAsShardsTag{}, *this, settings, max_replicas_from_shard)};
}
std::unique_ptr<Cluster> Cluster::getClusterWithSingleShard(size_t index) const
@ -668,7 +668,44 @@ std::unique_ptr<Cluster> Cluster::getClusterWithMultipleShards(const std::vector
return std::unique_ptr<Cluster>{ new Cluster(SubclusterTag{}, *this, indices) };
}
Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Settings & settings)
namespace
{
void shuffleReplicas(std::vector<Cluster::Address> & replicas, const Settings & settings, size_t replicas_needed)
{
std::random_device rd;
std::mt19937 gen{rd()};
if (settings.prefer_localhost_replica)
{
// force for local replica to always be included
auto first_non_local_replica = std::partition(replicas.begin(), replicas.end(), [](const auto & replica) { return replica.is_local; });
size_t local_replicas_count = first_non_local_replica - replicas.begin();
if (local_replicas_count == replicas_needed)
{
/// we have exact amount of local replicas as needed, no need to do anything
return;
}
if (local_replicas_count > replicas_needed)
{
/// we can use only local replicas, shuffle them
std::shuffle(replicas.begin(), first_non_local_replica, gen);
return;
}
/// shuffle just non local replicas
std::shuffle(first_non_local_replica, replicas.end(), gen);
return;
}
std::shuffle(replicas.begin(), replicas.end(), gen);
}
}
Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Settings & settings, size_t max_replicas_from_shard)
{
if (from.addresses_with_failover.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cluster is empty");
@ -677,40 +714,55 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti
std::set<std::pair<String, int>> unique_hosts;
for (size_t shard_index : collections::range(0, from.shards_info.size()))
{
const auto & replicas = from.addresses_with_failover[shard_index];
for (const auto & address : replicas)
auto create_shards_from_replicas = [&](std::span<const Address> replicas)
{
if (!unique_hosts.emplace(address.host_name, address.port).second)
continue; /// Duplicate host, skip.
for (const auto & address : replicas)
{
if (!unique_hosts.emplace(address.host_name, address.port).second)
continue; /// Duplicate host, skip.
ShardInfo info;
info.shard_num = ++shard_num;
ShardInfo info;
info.shard_num = ++shard_num;
if (address.is_local)
info.local_addresses.push_back(address);
if (address.is_local)
info.local_addresses.push_back(address);
info.all_addresses.push_back(address);
info.all_addresses.push_back(address);
auto pool = ConnectionPoolFactory::instance().get(
static_cast<unsigned>(settings.distributed_connections_pool_size),
address.host_name,
address.port,
address.default_database,
address.user,
address.password,
address.quota_key,
address.cluster,
address.cluster_secret,
"server",
address.compression,
address.secure,
address.priority);
auto pool = ConnectionPoolFactory::instance().get(
static_cast<unsigned>(settings.distributed_connections_pool_size),
address.host_name,
address.port,
address.default_database,
address.user,
address.password,
address.quota_key,
address.cluster,
address.cluster_secret,
"server",
address.compression,
address.secure,
address.priority);
info.pool = std::make_shared<ConnectionPoolWithFailover>(ConnectionPoolPtrs{pool}, settings.load_balancing);
info.per_replica_pools = {std::move(pool)};
info.pool = std::make_shared<ConnectionPoolWithFailover>(ConnectionPoolPtrs{pool}, settings.load_balancing);
info.per_replica_pools = {std::move(pool)};
addresses_with_failover.emplace_back(Addresses{address});
shards_info.emplace_back(std::move(info));
addresses_with_failover.emplace_back(Addresses{address});
shards_info.emplace_back(std::move(info));
}
};
const auto & replicas = from.addresses_with_failover[shard_index];
if (!max_replicas_from_shard || replicas.size() <= max_replicas_from_shard)
{
create_shards_from_replicas(replicas);
}
else
{
auto shuffled_replicas = replicas;
// shuffle replicas so we don't always pick the same subset
shuffleReplicas(shuffled_replicas, settings, max_replicas_from_shard);
create_shards_from_replicas(std::span{shuffled_replicas.begin(), max_replicas_from_shard});
}
}

View File

@ -250,7 +250,7 @@ public:
std::unique_ptr<Cluster> getClusterWithMultipleShards(const std::vector<size_t> & indices) const;
/// Get a new Cluster that contains all servers (all shards with all replicas) from existing cluster as independent shards.
std::unique_ptr<Cluster> getClusterWithReplicasAsShards(const Settings & settings) const;
std::unique_ptr<Cluster> getClusterWithReplicasAsShards(const Settings & settings, size_t max_replicas_from_shard = 0) const;
/// Returns false if cluster configuration doesn't allow to use it for cross-replication.
/// NOTE: true does not mean, that it's actually a cross-replication cluster.
@ -271,7 +271,7 @@ private:
/// For getClusterWithReplicasAsShards implementation
struct ReplicasAsShardsTag {};
Cluster(ReplicasAsShardsTag, const Cluster & from, const Settings & settings);
Cluster(ReplicasAsShardsTag, const Cluster & from, const Settings & settings, size_t max_replicas_from_shard);
/// Inter-server secret
String secret;

View File

@ -10,6 +10,7 @@
#include <Interpreters/IInterpreter.h>
#include <Interpreters/OptimizeShardingKeyRewriteInVisitor.h>
#include <Parsers/queryToString.h>
#include <Parsers/ASTFunction.h>
#include <Interpreters/ProcessList.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/ReadFromRemote.h>
@ -20,6 +21,7 @@
#include <Storages/SelectQueryInfo.h>
#include <Storages/StorageReplicatedMergeTree.h>
namespace DB
{
@ -157,7 +159,8 @@ void executeQuery(
const ASTPtr & query_ast, ContextPtr context, const SelectQueryInfo & query_info,
const ExpressionActionsPtr & sharding_key_expr,
const std::string & sharding_key_column_name,
const ClusterPtr & not_optimized_cluster)
const ClusterPtr & not_optimized_cluster,
AdditionalShardFilterGenerator shard_filter_generator)
{
const Settings & settings = context->getSettingsRef();
@ -189,7 +192,22 @@ void executeQuery(
visitor.visit(query_ast_for_shard);
}
else
query_ast_for_shard = query_ast;
query_ast_for_shard = query_ast->clone();
if (shard_filter_generator)
{
auto shard_filter = shard_filter_generator(shard_info.shard_num);
if (shard_filter)
{
auto & select_query = query_ast_for_shard->as<ASTSelectQuery &>();
auto where_expression = select_query.where();
if (where_expression)
shard_filter = makeASTFunction("and", where_expression, shard_filter);
select_query.setExpression(ASTSelectQuery::Expression::WHERE, std::move(shard_filter));
}
}
stream_factory.createForShard(shard_info,
query_ast_for_shard, main_table, table_func_ptr,

View File

@ -37,6 +37,7 @@ class SelectStreamFactory;
ContextMutablePtr updateSettingsForCluster(
const Cluster & cluster, ContextPtr context, const Settings & settings, const StorageID & main_table, const SelectQueryInfo * query_info = nullptr, Poco::Logger * log = nullptr);
using AdditionalShardFilterGenerator = std::function<ASTPtr(uint64_t)>;
/// Execute a distributed query, creating a query plan, from which the query pipeline can be built.
/// `stream_factory` object encapsulates the logic of creating plans for a different type of query
/// (currently SELECT, DESCRIBE).
@ -50,7 +51,8 @@ void executeQuery(
const ASTPtr & query_ast, ContextPtr context, const SelectQueryInfo & query_info,
const ExpressionActionsPtr & sharding_key_expr,
const std::string & sharding_key_column_name,
const ClusterPtr & not_optimized_cluster);
const ClusterPtr & not_optimized_cluster,
AdditionalShardFilterGenerator shard_filter_generator = {});
void executeQueryWithParallelReplicas(

View File

@ -4056,21 +4056,34 @@ std::shared_ptr<AsyncReadCounters> Context::getAsyncReadCounters() const
return async_read_counters;
}
Context::ParallelReplicasMode Context::getParallelReplicasMode() const
{
const auto & settings = getSettingsRef();
using enum Context::ParallelReplicasMode;
if (!settings.parallel_replicas_custom_key.value.empty())
return CUSTOM_KEY;
if (settings.allow_experimental_parallel_reading_from_replicas
&& !settings.use_hedged_requests)
return READ_TASKS;
return SAMPLE_KEY;
}
bool Context::canUseParallelReplicasOnInitiator() const
{
const auto & settings = getSettingsRef();
return settings.allow_experimental_parallel_reading_from_replicas
return getParallelReplicasMode() == ParallelReplicasMode::READ_TASKS
&& settings.max_parallel_replicas > 1
&& !settings.use_hedged_requests
&& !getClientInfo().collaborate_with_initiator;
}
bool Context::canUseParallelReplicasOnFollower() const
{
const auto & settings = getSettingsRef();
return settings.allow_experimental_parallel_reading_from_replicas
return getParallelReplicasMode() == ParallelReplicasMode::READ_TASKS
&& settings.max_parallel_replicas > 1
&& !settings.use_hedged_requests
&& getClientInfo().collaborate_with_initiator;
}

View File

@ -1123,6 +1123,15 @@ public:
bool canUseParallelReplicasOnInitiator() const;
bool canUseParallelReplicasOnFollower() const;
enum class ParallelReplicasMode : uint8_t
{
SAMPLE_KEY,
CUSTOM_KEY,
READ_TASKS,
};
ParallelReplicasMode getParallelReplicasMode() const;
private:
std::unique_lock<std::recursive_mutex> getLock() const;

View File

@ -38,6 +38,7 @@
#include <Interpreters/QueryLog.h>
#include <Interpreters/replaceAliasColumnsInQuery.h>
#include <Interpreters/RewriteCountDistinctVisitor.h>
#include <Interpreters/getCustomKeyFilterForParallelReplicas.h>
#include <QueryPipeline/Pipe.h>
#include <Processors/QueryPlan/AggregatingStep.h>
@ -114,6 +115,7 @@ namespace ErrorCodes
extern const int INVALID_WITH_FILL_EXPRESSION;
extern const int ACCESS_DENIED;
extern const int UNKNOWN_IDENTIFIER;
extern const int BAD_ARGUMENTS;
}
/// Assumes `storage` is set and the table filter (row-level security) is not empty.
@ -229,10 +231,13 @@ InterpreterSelectQuery::InterpreterSelectQuery(
InterpreterSelectQuery::~InterpreterSelectQuery() = default;
namespace
{
/** There are no limits on the maximum size of the result for the subquery.
* Since the result of the query is not the result of the entire query.
*/
static ContextPtr getSubqueryContext(const ContextPtr & context)
ContextPtr getSubqueryContext(const ContextPtr & context)
{
auto subquery_context = Context::createCopy(context);
Settings subquery_settings = context->getSettings();
@ -244,7 +249,7 @@ static ContextPtr getSubqueryContext(const ContextPtr & context)
return subquery_context;
}
static void rewriteMultipleJoins(ASTPtr & query, const TablesWithColumns & tables, const String & database, const Settings & settings)
void rewriteMultipleJoins(ASTPtr & query, const TablesWithColumns & tables, const String & database, const Settings & settings)
{
ASTSelectQuery & select = query->as<ASTSelectQuery &>();
@ -264,7 +269,7 @@ static void rewriteMultipleJoins(ASTPtr & query, const TablesWithColumns & table
}
/// Checks that the current user has the SELECT privilege.
static void checkAccessRightsForSelect(
void checkAccessRightsForSelect(
const ContextPtr & context,
const StorageID & table_id,
const StorageMetadataPtr & table_metadata,
@ -294,7 +299,7 @@ static void checkAccessRightsForSelect(
context->checkAccess(AccessType::SELECT, table_id, syntax_analyzer_result.requiredSourceColumnsForAccessCheck());
}
static ASTPtr parseAdditionalFilterConditionForTable(
ASTPtr parseAdditionalFilterConditionForTable(
const Map & setting,
const DatabaseAndTableWithAlias & target,
const Context & context)
@ -322,7 +327,7 @@ static ASTPtr parseAdditionalFilterConditionForTable(
}
/// Returns true if we should ignore quotas and limits for a specified table in the system database.
static bool shouldIgnoreQuotaAndLimits(const StorageID & table_id)
bool shouldIgnoreQuotaAndLimits(const StorageID & table_id)
{
if (table_id.database_name == DatabaseCatalog::SYSTEM_DATABASE)
{
@ -333,6 +338,8 @@ static bool shouldIgnoreQuotaAndLimits(const StorageID & table_id)
return false;
}
}
InterpreterSelectQuery::InterpreterSelectQuery(
const ASTPtr & query_ptr_,
const ContextPtr & context_,
@ -448,10 +455,11 @@ InterpreterSelectQuery::InterpreterSelectQuery(
}
}
if (joined_tables.tablesCount() > 1 && settings.allow_experimental_parallel_reading_from_replicas)
if (joined_tables.tablesCount() > 1 && (!settings.parallel_replicas_custom_key.value.empty() || settings.allow_experimental_parallel_reading_from_replicas))
{
LOG_WARNING(log, "Joins are not supported with parallel replicas. Query will be executed without using them.");
context->setSetting("allow_experimental_parallel_reading_from_replicas", false);
context->setSetting("parallel_replicas_custom_key", String{""});
}
/// Rewrite JOINs
@ -509,6 +517,42 @@ InterpreterSelectQuery::InterpreterSelectQuery(
query_info.additional_filter_ast = parseAdditionalFilterConditionForTable(
settings.additional_table_filters, joined_tables.tablesWithColumns().front().table, *context);
ASTPtr parallel_replicas_custom_filter_ast = nullptr;
if (context->getParallelReplicasMode() == Context::ParallelReplicasMode::CUSTOM_KEY && !joined_tables.tablesWithColumns().empty())
{
if (settings.parallel_replicas_count > 1)
{
if (auto custom_key_ast = parseCustomKeyForTable(settings.parallel_replicas_custom_key, *context))
{
LOG_TRACE(log, "Processing query on a replica using custom_key '{}'", settings.parallel_replicas_custom_key.value);
if (!storage)
throw DB::Exception(ErrorCodes::BAD_ARGUMENTS, "Storage is unknown when trying to parse custom key for parallel replica");
parallel_replicas_custom_filter_ast = getCustomKeyFilterForParallelReplica(
settings.parallel_replicas_count,
settings.parallel_replica_offset,
std::move(custom_key_ast),
settings.parallel_replicas_custom_key_filter_type,
*storage,
context);
}
else if (settings.parallel_replica_offset > 0)
{
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Parallel replicas processing with custom_key has been requested "
"(setting 'max_parallel_replicas') but the table does not have custom_key defined for it "
"or it's invalid (settings `parallel_replicas_custom_key`)");
}
}
else if (auto * distributed = dynamic_cast<StorageDistributed *>(storage.get());
distributed && canUseCustomKey(settings, *distributed->getCluster(), *context))
{
query_info.use_custom_key = true;
context->setSetting("distributed_group_by_no_merge", 2);
}
}
if (autoFinalOnQuery(query))
{
query.setFinal();
@ -693,6 +737,16 @@ InterpreterSelectQuery::InterpreterSelectQuery(
query_info.filter_asts.push_back(query_info.additional_filter_ast);
}
if (parallel_replicas_custom_filter_ast)
{
parallel_replicas_custom_filter_info = generateFilterActions(
table_id, parallel_replicas_custom_filter_ast, context, storage, storage_snapshot, metadata_snapshot, required_columns,
prepared_sets);
parallel_replicas_custom_filter_info->do_remove_column = true;
query_info.filter_asts.push_back(parallel_replicas_custom_filter_ast);
}
source_header = storage_snapshot->getSampleBlockForColumns(required_columns, parameter_values);
}
@ -1435,17 +1489,23 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
query_plan.addStep(std::move(row_level_security_step));
}
if (additional_filter_info)
const auto add_filter_step = [&](const auto & new_filter_info, const std::string & description)
{
auto additional_filter_step = std::make_unique<FilterStep>(
auto filter_step = std::make_unique<FilterStep>(
query_plan.getCurrentDataStream(),
additional_filter_info->actions,
additional_filter_info->column_name,
additional_filter_info->do_remove_column);
new_filter_info->actions,
new_filter_info->column_name,
new_filter_info->do_remove_column);
additional_filter_step->setStepDescription("Additional filter");
query_plan.addStep(std::move(additional_filter_step));
}
filter_step->setStepDescription(description);
query_plan.addStep(std::move(filter_step));
};
if (additional_filter_info)
add_filter_step(additional_filter_info, "Additional filter");
if (parallel_replicas_custom_filter_info)
add_filter_step(parallel_replicas_custom_filter_info, "Parallel replica custom key filter");
if (expressions.before_array_join)
{

View File

@ -215,6 +215,9 @@ private:
/// For additional_filter setting.
FilterDAGInfoPtr additional_filter_info;
/// For "per replica" filter when multiple replicas are used
FilterDAGInfoPtr parallel_replicas_custom_filter_info;
QueryProcessingStage::Enum from_stage = QueryProcessingStage::FetchColumns;
/// List of columns to read to execute the query.

View File

@ -0,0 +1,134 @@
#include <Interpreters/getCustomKeyFilterForParallelReplicas.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTSampleRatio.h>
#include <Parsers/ExpressionListParsers.h>
#include <Parsers/parseQuery.h>
#include <Interpreters/Context.h>
#include <DataTypes/DataTypesNumber.h>
#include <boost/rational.hpp>
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER;
}
bool canUseCustomKey(const Settings & settings, const Cluster & cluster, const Context & context)
{
return settings.max_parallel_replicas > 1 && context.getParallelReplicasMode() == Context::ParallelReplicasMode::CUSTOM_KEY
&& cluster.getShardCount() == 1 && cluster.getShardsInfo()[0].getAllNodeCount() > 1;
}
ASTPtr getCustomKeyFilterForParallelReplica(
size_t replicas_count,
size_t replica_num,
ASTPtr custom_key_ast,
ParallelReplicasCustomKeyFilterType filter_type,
const IStorage & storage,
const ContextPtr & context)
{
assert(replicas_count > 1);
if (filter_type == ParallelReplicasCustomKeyFilterType::DEFAULT)
{
// first we do modulo with replica count
auto modulo_function = makeASTFunction("positiveModulo", custom_key_ast, std::make_shared<ASTLiteral>(replicas_count));
/// then we compare result to the current replica number (offset)
auto equals_function = makeASTFunction("equals", std::move(modulo_function), std::make_shared<ASTLiteral>(replica_num));
return equals_function;
}
assert(filter_type == ParallelReplicasCustomKeyFilterType::RANGE);
KeyDescription custom_key_description
= KeyDescription::getKeyFromAST(custom_key_ast, storage.getInMemoryMetadataPtr()->columns, context);
using RelativeSize = boost::rational<ASTSampleRatio::BigNum>;
RelativeSize size_of_universum = 0;
DataTypePtr custom_key_column_type = custom_key_description.data_types[0];
size_of_universum = RelativeSize(std::numeric_limits<UInt32>::max()) + RelativeSize(1);
if (custom_key_description.data_types.size() == 1)
{
if (typeid_cast<const DataTypeUInt64 *>(custom_key_column_type.get()))
size_of_universum = RelativeSize(std::numeric_limits<UInt64>::max()) + RelativeSize(1);
else if (typeid_cast<const DataTypeUInt32 *>(custom_key_column_type.get()))
size_of_universum = RelativeSize(std::numeric_limits<UInt32>::max()) + RelativeSize(1);
else if (typeid_cast<const DataTypeUInt16 *>(custom_key_column_type.get()))
size_of_universum = RelativeSize(std::numeric_limits<UInt16>::max()) + RelativeSize(1);
else if (typeid_cast<const DataTypeUInt8 *>(custom_key_column_type.get()))
size_of_universum = RelativeSize(std::numeric_limits<UInt8>::max()) + RelativeSize(1);
}
if (size_of_universum == RelativeSize(0))
throw Exception(
ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER,
"Invalid custom key column type: {}. Must be one unsigned integer type",
custom_key_column_type->getName());
RelativeSize relative_range_size = RelativeSize(1) / replicas_count;
RelativeSize relative_range_offset = relative_range_size * RelativeSize(replica_num);
/// Calculate the half-interval of `[lower, upper)` column values.
bool has_lower_limit = false;
bool has_upper_limit = false;
RelativeSize lower_limit_rational = relative_range_offset * size_of_universum;
RelativeSize upper_limit_rational = (relative_range_offset + relative_range_size) * size_of_universum;
UInt64 lower = boost::rational_cast<ASTSampleRatio::BigNum>(lower_limit_rational);
UInt64 upper = boost::rational_cast<ASTSampleRatio::BigNum>(upper_limit_rational);
if (lower > 0)
has_lower_limit = true;
if (upper_limit_rational < size_of_universum)
has_upper_limit = true;
assert(has_lower_limit || has_upper_limit);
/// Let's add the conditions to cut off something else when the index is scanned again and when the request is processed.
std::shared_ptr<ASTFunction> lower_function;
std::shared_ptr<ASTFunction> upper_function;
if (has_lower_limit)
{
lower_function = makeASTFunction("greaterOrEquals", custom_key_ast, std::make_shared<ASTLiteral>(lower));
if (!has_upper_limit)
return lower_function;
}
if (has_upper_limit)
{
upper_function = makeASTFunction("less", custom_key_ast, std::make_shared<ASTLiteral>(upper));
if (!has_lower_limit)
return upper_function;
}
assert(upper_function && lower_function);
return makeASTFunction("and", std::move(lower_function), std::move(upper_function));
}
ASTPtr parseCustomKeyForTable(const String & custom_key, const Context & context)
{
/// Try to parse expression
ParserExpression parser;
const auto & settings = context.getSettingsRef();
return parseQuery(
parser, custom_key.data(), custom_key.data() + custom_key.size(),
"parallel replicas custom key", settings.max_query_size, settings.max_parser_depth);
}
}

View File

@ -0,0 +1,27 @@
#pragma once
#include <Interpreters/Context_fwd.h>
#include <Interpreters/Cluster.h>
#include <Parsers/IAST_fwd.h>
#include <Storages/IStorage.h>
#include <Core/SettingsEnums.h>
#include <Interpreters/DatabaseAndTableWithAlias.h>
namespace DB
{
bool canUseCustomKey(const Settings & settings, const Cluster & cluster, const Context & context);
/// Get AST for filter created from custom_key
/// replica_num is the number of the replica for which we are generating filter starting from 0
ASTPtr getCustomKeyFilterForParallelReplica(
size_t replicas_count,
size_t replica_num,
ASTPtr custom_key_ast,
ParallelReplicasCustomKeyFilterType filter_type,
const IStorage & storage,
const ContextPtr & context);
ASTPtr parseCustomKeyForTable(const String & custom_keys, const Context & context);
}

View File

@ -588,18 +588,24 @@ MergeTreeDataSelectSamplingData MergeTreeDataSelectExecutor::getSampling(
* It is also important that the entire universe can be covered using SAMPLE 0.1 OFFSET 0, ... OFFSET 0.9 and similar decimals.
*/
auto parallel_replicas_mode = context->getParallelReplicasMode();
/// Parallel replicas has been requested but there is no way to sample data.
/// Select all data from first replica and no data from other replicas.
if (settings.parallel_replicas_count > 1 && !data.supportsSampling() && settings.parallel_replica_offset > 0)
if (settings.parallel_replicas_count > 1 && parallel_replicas_mode == Context::ParallelReplicasMode::SAMPLE_KEY
&& !data.supportsSampling() && settings.parallel_replica_offset > 0)
{
LOG_DEBUG(log, "Will use no data on this replica because parallel replicas processing has been requested"
LOG_DEBUG(
log,
"Will use no data on this replica because parallel replicas processing has been requested"
" (the setting 'max_parallel_replicas') but the table does not support sampling and this replica is not the first.");
sampling.read_nothing = true;
return sampling;
}
sampling.use_sampling = relative_sample_size > 0 || (settings.parallel_replicas_count > 1 && data.supportsSampling());
bool no_data = false; /// There is nothing left after sampling.
sampling.use_sampling = relative_sample_size > 0
|| (settings.parallel_replicas_count > 1 && parallel_replicas_mode == Context::ParallelReplicasMode::SAMPLE_KEY
&& data.supportsSampling());
bool no_data = false; /// There is nothing left after sampling.
if (sampling.use_sampling)
{

View File

@ -207,6 +207,8 @@ struct SelectQueryInfo
///
/// Configured in StorageDistributed::getQueryProcessingStage()
ClusterPtr optimized_cluster;
/// should we use custom key with the cluster
bool use_custom_key = false;
mutable ParallelReplicasReadingCoordinatorPtr coordinator;
@ -218,6 +220,8 @@ struct SelectQueryInfo
/// It is needed for PK analysis based on row_level_policy and additional_filters.
ASTs filter_asts;
ASTPtr parallel_replica_custom_key_ast;
/// Filter actions dag for current storage
ActionsDAGPtr filter_actions_dag;

View File

@ -51,6 +51,7 @@
#include <Interpreters/ClusterProxy/SelectStreamFactory.h>
#include <Interpreters/ClusterProxy/executeQuery.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/DatabaseAndTableWithAlias.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/InterpreterDescribeQuery.h>
#include <Interpreters/InterpreterSelectQuery.h>
@ -65,6 +66,8 @@
#include <Interpreters/getClusterName.h>
#include <Interpreters/getTableExpressions.h>
#include <Interpreters/RequiredSourceColumnsVisitor.h>
#include <Interpreters/getCustomKeyFilterForParallelReplicas.h>
#include <Functions/IFunction.h>
#include <TableFunctions/TableFunctionView.h>
#include <TableFunctions/TableFunctionFactory.h>
@ -82,6 +85,7 @@
#include <Processors/Sinks/EmptySink.h>
#include <Core/Settings.h>
#include <Core/SettingsEnums.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromString.h>
@ -262,7 +266,6 @@ size_t getClusterQueriedNodes(const Settings & settings, const ClusterPtr & clus
}
/// For destruction of std::unique_ptr of type that is incomplete in class definition.
StorageDistributed::~StorageDistributed() = default;
@ -400,29 +403,38 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(
const auto & settings = local_context->getSettingsRef();
ClusterPtr cluster = getCluster();
query_info.cluster = cluster;
size_t nodes = getClusterQueriedNodes(settings, cluster);
/// Always calculate optimized cluster here, to avoid conditions during read()
/// (Anyway it will be calculated in the read())
if (nodes > 1 && settings.optimize_skip_unused_shards)
if (query_info.use_custom_key)
{
ClusterPtr optimized_cluster = getOptimizedCluster(local_context, storage_snapshot, query_info.query);
if (optimized_cluster)
{
LOG_DEBUG(log, "Skipping irrelevant shards - the query will be sent to the following shards of the cluster (shard numbers): {}",
makeFormattedListOfShards(optimized_cluster));
LOG_INFO(log, "Single shard cluster used with custom_key, transforming replicas into virtual shards");
query_info.cluster = cluster->getClusterWithReplicasAsShards(settings, settings.max_parallel_replicas);
}
else
{
query_info.cluster = cluster;
cluster = optimized_cluster;
query_info.optimized_cluster = cluster;
nodes = getClusterQueriedNodes(settings, cluster);
}
else
if (nodes > 1 && settings.optimize_skip_unused_shards)
{
LOG_DEBUG(log, "Unable to figure out irrelevant shards from WHERE/PREWHERE clauses - the query will be sent to all shards of the cluster{}",
has_sharding_key ? "" : " (no sharding key)");
/// Always calculate optimized cluster here, to avoid conditions during read()
/// (Anyway it will be calculated in the read())
ClusterPtr optimized_cluster = getOptimizedCluster(local_context, storage_snapshot, query_info.query);
if (optimized_cluster)
{
LOG_DEBUG(log, "Skipping irrelevant shards - the query will be sent to the following shards of the cluster (shard numbers): {}",
makeFormattedListOfShards(optimized_cluster));
cluster = optimized_cluster;
query_info.optimized_cluster = cluster;
nodes = getClusterQueriedNodes(settings, cluster);
}
else
{
LOG_DEBUG(log, "Unable to figure out irrelevant shards from WHERE/PREWHERE clauses - the query will be sent to all shards of the cluster{}",
has_sharding_key ? "" : " (no sharding key)");
}
}
}
@ -728,13 +740,36 @@ void StorageDistributed::read(
storage_snapshot,
processed_stage);
auto settings = local_context->getSettingsRef();
ClusterProxy::AdditionalShardFilterGenerator additional_shard_filter_generator;
if (query_info.use_custom_key)
{
if (auto custom_key_ast = parseCustomKeyForTable(settings.parallel_replicas_custom_key, *local_context))
{
if (query_info.getCluster()->getShardCount() == 1)
{
// we are reading from single shard with multiple replicas but didn't transform replicas
// into virtual shards with custom_key set
throw Exception(ErrorCodes::LOGICAL_ERROR, "Replicas weren't transformed into virtual shards");
}
additional_shard_filter_generator =
[&, custom_key_ast = std::move(custom_key_ast), shard_count = query_info.cluster->getShardCount()](uint64_t shard_num) -> ASTPtr
{
return getCustomKeyFilterForParallelReplica(
shard_count, shard_num - 1, custom_key_ast, settings.parallel_replicas_custom_key_filter_type, *this, local_context);
};
}
}
ClusterProxy::executeQuery(
query_plan, header, processed_stage,
main_table, remote_table_function_ptr,
select_stream_factory, log, modified_query_ast,
local_context, query_info,
sharding_key_expr, sharding_key_column_name,
query_info.cluster);
query_info.cluster, additional_shard_filter_generator);
/// This is a bug, it is possible only when there is no shards to query, and this is handled earlier.
if (!query_plan.isInitialized())

View File

@ -0,0 +1,50 @@
<clickhouse>
<remote_servers>
<test_multiple_shards_multiple_replicas>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>n1</host>
<port>9000</port>
</replica>
<replica>
<host>n2</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>n3</host>
<port>9000</port>
</replica>
<replica>
<host>n4</host>
<port>9000</port>
</replica>
</shard>
</test_multiple_shards_multiple_replicas>
<test_single_shard_multiple_replicas>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>n1</host>
<port>9000</port>
</replica>
<replica>
<host>n2</host>
<port>9000</port>
</replica>
<replica>
<host>n3</host>
<port>9000</port>
</replica>
<replica>
<host>n4</host>
<port>9000</port>
</replica>
</shard>
</test_single_shard_multiple_replicas>
</remote_servers>
</clickhouse>

View File

@ -0,0 +1,94 @@
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
nodes = [
cluster.add_instance(
f"n{i}", main_configs=["configs/remote_servers.xml"], with_zookeeper=True
)
for i in range(1, 5)
]
@pytest.fixture(scope="module", autouse=True)
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def create_tables(cluster):
n1 = nodes[0]
n1.query("DROP TABLE IF EXISTS dist_table")
n1.query(f"DROP TABLE IF EXISTS test_table ON CLUSTER {cluster}")
n1.query(
f"CREATE TABLE test_table ON CLUSTER {cluster} (key Int32, value String) Engine=MergeTree ORDER BY (key, sipHash64(value))"
)
n1.query(
f"""
CREATE TABLE dist_table AS test_table
Engine=Distributed(
{cluster},
currentDatabase(),
test_table,
rand()
)
"""
)
def insert_data(cluster, row_num):
create_tables(cluster)
n1 = nodes[0]
n1.query(
f"INSERT INTO dist_table SELECT number % 4, number FROM numbers({row_num})"
)
n1.query("SYSTEM FLUSH DISTRIBUTED dist_table")
@pytest.mark.parametrize("custom_key", ["sipHash64(key)", "key"])
@pytest.mark.parametrize("filter_type", ["default", "range"])
@pytest.mark.parametrize(
"cluster",
["test_multiple_shards_multiple_replicas", "test_single_shard_multiple_replicas"],
)
def test_parallel_replicas_custom_key(start_cluster, cluster, custom_key, filter_type):
for node in nodes:
node.rotate_logs()
row_num = 1000
insert_data(cluster, row_num)
expected_result = ""
for i in range(4):
expected_result += f"{i}\t250\n"
n1 = nodes[0]
assert (
n1.query(
"SELECT key, count() FROM dist_table GROUP BY key ORDER BY key",
settings={
"prefer_localhost_replica": 0,
"max_parallel_replicas": 4,
"parallel_replicas_custom_key": custom_key,
"parallel_replicas_custom_key_filter_type": filter_type,
},
)
== expected_result
)
if cluster == "test_multiple_shards_multiple_replicas":
# we simply process query on all replicas for each shard by appending the filter on replica
assert all(
node.contains_in_log("Processing query on a replica using custom_key")
for node in nodes
)
else:
# we first transform all replicas into shards and then append for each shard filter
assert n1.contains_in_log(
"Single shard cluster used with custom_key, transforming replicas into virtual shards"
)

View File

@ -0,0 +1,173 @@
query='SELECT * FROM cluster(test_cluster_one_shard_three_replicas_localhost, currentDatabase(), 02535_custom_key)' with custom_key='sipHash64(x)'
filter_type='default' max_replicas=1 prefer_localhost_replica=0
Hello
filter_type='default' max_replicas=2 prefer_localhost_replica=0
Hello
filter_type='default' max_replicas=3 prefer_localhost_replica=0
Hello
filter_type='range' max_replicas=1 prefer_localhost_replica=0
Hello
filter_type='range' max_replicas=2 prefer_localhost_replica=0
Hello
filter_type='range' max_replicas=3 prefer_localhost_replica=0
Hello
filter_type='default' max_replicas=1 prefer_localhost_replica=1
Hello
filter_type='default' max_replicas=2 prefer_localhost_replica=1
Hello
filter_type='default' max_replicas=3 prefer_localhost_replica=1
Hello
filter_type='range' max_replicas=1 prefer_localhost_replica=1
Hello
filter_type='range' max_replicas=2 prefer_localhost_replica=1
Hello
filter_type='range' max_replicas=3 prefer_localhost_replica=1
Hello
query='SELECT y, count() FROM cluster(test_cluster_one_shard_three_replicas_localhost, currentDatabase(), 02535_custom_key) GROUP BY y ORDER BY y' with custom_key='y'
filter_type='default' max_replicas=1 prefer_localhost_replica=0
0 334
1 333
2 333
filter_type='default' max_replicas=2 prefer_localhost_replica=0
0 334
1 333
2 333
filter_type='default' max_replicas=3 prefer_localhost_replica=0
0 334
1 333
2 333
filter_type='range' max_replicas=1 prefer_localhost_replica=0
0 334
1 333
2 333
filter_type='range' max_replicas=2 prefer_localhost_replica=0
0 334
1 333
2 333
filter_type='range' max_replicas=3 prefer_localhost_replica=0
0 334
1 333
2 333
filter_type='default' max_replicas=1 prefer_localhost_replica=1
0 334
1 333
2 333
filter_type='default' max_replicas=2 prefer_localhost_replica=1
0 334
1 333
2 333
filter_type='default' max_replicas=3 prefer_localhost_replica=1
0 334
1 333
2 333
filter_type='range' max_replicas=1 prefer_localhost_replica=1
0 334
1 333
2 333
filter_type='range' max_replicas=2 prefer_localhost_replica=1
0 334
1 333
2 333
filter_type='range' max_replicas=3 prefer_localhost_replica=1
0 334
1 333
2 333
query='SELECT y, count() FROM cluster(test_cluster_one_shard_three_replicas_localhost, currentDatabase(), 02535_custom_key) GROUP BY y ORDER BY y' with custom_key='cityHash64(y)'
filter_type='default' max_replicas=1 prefer_localhost_replica=0
0 334
1 333
2 333
filter_type='default' max_replicas=2 prefer_localhost_replica=0
0 334
1 333
2 333
filter_type='default' max_replicas=3 prefer_localhost_replica=0
0 334
1 333
2 333
filter_type='range' max_replicas=1 prefer_localhost_replica=0
0 334
1 333
2 333
filter_type='range' max_replicas=2 prefer_localhost_replica=0
0 334
1 333
2 333
filter_type='range' max_replicas=3 prefer_localhost_replica=0
0 334
1 333
2 333
filter_type='default' max_replicas=1 prefer_localhost_replica=1
0 334
1 333
2 333
filter_type='default' max_replicas=2 prefer_localhost_replica=1
0 334
1 333
2 333
filter_type='default' max_replicas=3 prefer_localhost_replica=1
0 334
1 333
2 333
filter_type='range' max_replicas=1 prefer_localhost_replica=1
0 334
1 333
2 333
filter_type='range' max_replicas=2 prefer_localhost_replica=1
0 334
1 333
2 333
filter_type='range' max_replicas=3 prefer_localhost_replica=1
0 334
1 333
2 333
query='SELECT y, count() FROM cluster(test_cluster_one_shard_three_replicas_localhost, currentDatabase(), 02535_custom_key) GROUP BY y ORDER BY y' with custom_key='cityHash64(y) + 1'
filter_type='default' max_replicas=1 prefer_localhost_replica=0
0 334
1 333
2 333
filter_type='default' max_replicas=2 prefer_localhost_replica=0
0 334
1 333
2 333
filter_type='default' max_replicas=3 prefer_localhost_replica=0
0 334
1 333
2 333
filter_type='range' max_replicas=1 prefer_localhost_replica=0
0 334
1 333
2 333
filter_type='range' max_replicas=2 prefer_localhost_replica=0
0 334
1 333
2 333
filter_type='range' max_replicas=3 prefer_localhost_replica=0
0 334
1 333
2 333
filter_type='default' max_replicas=1 prefer_localhost_replica=1
0 334
1 333
2 333
filter_type='default' max_replicas=2 prefer_localhost_replica=1
0 334
1 333
2 333
filter_type='default' max_replicas=3 prefer_localhost_replica=1
0 334
1 333
2 333
filter_type='range' max_replicas=1 prefer_localhost_replica=1
0 334
1 333
2 333
filter_type='range' max_replicas=2 prefer_localhost_replica=1
0 334
1 333
2 333
filter_type='range' max_replicas=3 prefer_localhost_replica=1
0 334
1 333
2 333
1

View File

@ -0,0 +1,46 @@
#!/usr/bin/env bash
# Tags: no-parallel, long
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
function run_with_custom_key {
echo "query='$1' with custom_key='$2'"
for prefer_localhost_replica in 0 1; do
for filter_type in 'default' 'range'; do
for max_replicas in {1..3}; do
echo "filter_type='$filter_type' max_replicas=$max_replicas prefer_localhost_replica=$prefer_localhost_replica"
query="$1 SETTINGS max_parallel_replicas=$max_replicas\
, parallel_replicas_custom_key='$2'\
, parallel_replicas_custom_key_filter_type='$filter_type'\
, prefer_localhost_replica=$prefer_localhost_replica"
$CLICKHOUSE_CLIENT --query="$query"
done
done
done
}
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS 02535_custom_key";
$CLICKHOUSE_CLIENT --query="CREATE TABLE 02535_custom_key (x String) ENGINE = MergeTree ORDER BY x";
$CLICKHOUSE_CLIENT --query="INSERT INTO 02535_custom_key VALUES ('Hello')";
run_with_custom_key "SELECT * FROM cluster(test_cluster_one_shard_three_replicas_localhost, currentDatabase(), 02535_custom_key)" "sipHash64(x)"
$CLICKHOUSE_CLIENT --query="DROP TABLE 02535_custom_key"
$CLICKHOUSE_CLIENT --query="CREATE TABLE 02535_custom_key (x String, y Int32) ENGINE = MergeTree ORDER BY cityHash64(x)"
$CLICKHOUSE_CLIENT --query="INSERT INTO 02535_custom_key SELECT toString(number), number % 3 FROM numbers(1000)"
function run_count_with_custom_key {
run_with_custom_key "SELECT y, count() FROM cluster(test_cluster_one_shard_three_replicas_localhost, currentDatabase(), 02535_custom_key) GROUP BY y ORDER BY y" "$1"
}
run_count_with_custom_key "y"
run_count_with_custom_key "cityHash64(y)"
run_count_with_custom_key "cityHash64(y) + 1"
$CLICKHOUSE_CLIENT --query="SELECT count() FROM cluster(test_cluster_one_shard_three_replicas_localhost, currentDatabase(), 02535_custom_key) as t1 JOIN 02535_custom_key USING y" --parallel_replicas_custom_key="y" --send_logs_level="trace" 2>&1 | grep -Fac "Joins are not supported with parallel replicas"
$CLICKHOUSE_CLIENT --query="DROP TABLE 02535_custom_key"