Antonio Andelic 2023-01-11 09:57:13 +00:00
parent 347ccab8f9
commit 19e5988d33
3 changed files with 61 additions and 64 deletions

View File: src/Interpreters/InterpreterSelectQuery.cpp

@@ -510,10 +510,37 @@ InterpreterSelectQuery::InterpreterSelectQuery(
query_info.additional_filter_ast = parseAdditionalFilterConditionForTable(
settings.additional_table_filters, joined_tables.tablesWithColumns().front().table, *context);
if (settings.parallel_replicas_mode == ParallelReplicasMode::CUSTOM_KEY && !settings.parallel_replicas_custom_key.value.empty())
ASTPtr parallel_replicas_custom_filter_ast = nullptr;
if (settings.parallel_replicas_count > 1 && settings.parallel_replicas_mode == ParallelReplicasMode::CUSTOM_KEY)
{
query_info.parallel_replica_custom_key_ast = parseParallelReplicaCustomKey(
if (settings.parallel_replicas_custom_key.value.empty())
throw Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Parallel replicas mode set to 'custom_key' but 'parallel_replicas_custom_key' has no value");
auto custom_key_ast = parseParallelReplicaCustomKey(
settings.parallel_replicas_custom_key, *context);
/// First, take the custom key modulo the replica count
ASTPtr args = std::make_shared<ASTExpressionList>();
args->children.push_back(custom_key_ast);
args->children.push_back(std::make_shared<ASTLiteral>(settings.parallel_replicas_count.value));
auto modulo_function = std::make_shared<ASTFunction>();
modulo_function->name = "positiveModulo";
modulo_function->arguments = args;
modulo_function->children.push_back(modulo_function->arguments);
/// Then compare the result to the current replica number (offset)
args = std::make_shared<ASTExpressionList>();
args->children.push_back(modulo_function);
args->children.push_back(std::make_shared<ASTLiteral>(settings.parallel_replica_offset.value));
auto equals_function = std::make_shared<ASTFunction>();
equals_function->name = "equals";
equals_function->arguments = args;
equals_function->children.push_back(equals_function->arguments);
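/// The assembled AST is equivalent to the expression
///   positiveModulo(<custom_key>, parallel_replicas_count) = parallel_replica_offset
/// (illustrative: with custom key `user_id`, 3 replicas and offset 1 this is positiveModulo(user_id, 3) = 1),
/// so each replica keeps only the rows whose key maps to its own offset.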
parallel_replicas_custom_filter_ast = equals_function;
}
auto analyze = [&] (bool try_move_to_prewhere)
@@ -653,6 +680,16 @@ InterpreterSelectQuery::InterpreterSelectQuery(
query_info.filter_asts.push_back(query_info.additional_filter_ast);
}
if (parallel_replicas_custom_filter_ast)
{
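/// Turn the custom-key comparison AST into executable filter actions over the required columns;
/// the resulting FilterDAGInfo is attached to the query plan later in executeImpl().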
custom_key_filter_info = generateFilterActions(
table_id, parallel_replicas_custom_filter_ast, context, storage, storage_snapshot, metadata_snapshot, required_columns,
prepared_sets);
custom_key_filter_info->do_remove_column = true;
query_info.filter_asts.push_back(parallel_replicas_custom_filter_ast);
}
source_header = storage_snapshot->getSampleBlockForColumns(required_columns);
}
@@ -1396,17 +1433,23 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
query_plan.addStep(std::move(row_level_security_step));
}
if (additional_filter_info)
const auto add_filter_step = [&](const auto & new_filter_info, const std::string & description)
{
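/// Helper shared by the additional filter and the custom key filter:
/// wraps the given FilterDAGInfo in a FilterStep and appends it to the query plan.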
auto additional_filter_step = std::make_unique<FilterStep>(
auto filter_step = std::make_unique<FilterStep>(
query_plan.getCurrentDataStream(),
additional_filter_info->actions,
additional_filter_info->column_name,
additional_filter_info->do_remove_column);
new_filter_info->actions,
new_filter_info->column_name,
new_filter_info->do_remove_column);
additional_filter_step->setStepDescription("Additional filter");
query_plan.addStep(std::move(additional_filter_step));
}
filter_step->setStepDescription(description);
query_plan.addStep(std::move(filter_step));
};
if (additional_filter_info)
add_filter_step(additional_filter_info, "Additional filter");
if (custom_key_filter_info)
add_filter_step(custom_key_filter_info, "Parallel replica custom key filter");
if (expressions.before_array_join)
{

View File: src/Interpreters/InterpreterSelectQuery.h

@@ -224,6 +224,8 @@ private:
/// For additional_filter setting.
FilterDAGInfoPtr additional_filter_info;
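/// Filter built from 'parallel_replicas_custom_key' when parallel replicas run in CUSTOM_KEY mode.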
FilterDAGInfoPtr custom_key_filter_info;
QueryProcessingStage::Enum from_stage = QueryProcessingStage::FetchColumns;
/// List of columns to read to execute the query.

View File: src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp

@@ -34,7 +34,6 @@
#include <Processors/Transforms/AggregatingTransform.h>
#include <Core/UUID.h>
#include <Core/SettingsEnums.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeUUID.h>
@@ -45,8 +44,6 @@
#include <IO/WriteBufferFromOStream.h>
#include <Storages/MergeTree/CommonANNIndexes.h>
#include <Storages/KeyDescription.h>
#include <Storages/SelectQueryInfo.h>
namespace DB
{
@@ -64,7 +61,6 @@ namespace ErrorCodes
extern const int DUPLICATED_PART_UUIDS;
extern const int NO_SUCH_COLUMN_IN_TABLE;
extern const int PROJECTION_NOT_USED;
extern const int BAD_ARGUMENTS;
}
@@ -474,25 +470,6 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
return plan;
}
namespace
{
bool supportsSamplingForParallelReplicas(const SelectQueryInfo & select_query_info, const MergeTreeData & data, const Settings & settings)
{
if (settings.parallel_replicas_mode == ParallelReplicasMode::CUSTOM_KEY)
{
/// maybe just don't use sampling or try to fallback to SAMPLE_KEY?
if (select_query_info.parallel_replica_custom_key_ast == nullptr)
throw Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Parallel replicas mode set to 'custom_key' but no 'parallel_replicas_custom_key' defined");
return true;
}
return data.supportsSampling();
}
}
MergeTreeDataSelectSamplingData MergeTreeDataSelectExecutor::getSampling(
const SelectQueryInfo & select_query_info,
NamesAndTypesList available_real_columns,
@@ -610,10 +587,9 @@ MergeTreeDataSelectSamplingData MergeTreeDataSelectExecutor::getSampling(
* It is also important that the entire universe can be covered using SAMPLE 0.1 OFFSET 0, ... OFFSET 0.9 and similar decimals.
*/
bool supports_sampling_for_parallel_replicas = supportsSamplingForParallelReplicas(select_query_info, data, settings);
/// Parallel replicas has been requested but there is no way to sample data.
/// Select all data from first replica and no data from other replicas.
if (settings.parallel_replicas_count > 1 && !supports_sampling_for_parallel_replicas && settings.parallel_replica_offset > 0)
if (settings.parallel_replicas_count > 1 && !data.supportsSampling() && settings.parallel_replica_offset > 0)
{
LOG_DEBUG(log, "Will use no data on this replica because parallel replicas processing has been requested"
" (the setting 'max_parallel_replicas') but the table does not support sampling and this replica is not the first.");
@@ -621,34 +597,16 @@ MergeTreeDataSelectSamplingData MergeTreeDataSelectExecutor::getSampling(
return sampling;
}
sampling.use_sampling = relative_sample_size > 0 || (settings.parallel_replicas_count > 1 && supports_sampling_for_parallel_replicas);
sampling.use_sampling = relative_sample_size > 0 || (settings.parallel_replicas_count > 1 && settings.parallel_replicas_mode == ParallelReplicasMode::SAMPLE_KEY && data.supportsSampling());
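/// In CUSTOM_KEY mode the per-replica filter is now added in InterpreterSelectQuery,
/// so sampling-based splitting here applies only to SAMPLE_KEY mode.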
bool no_data = false; /// There is nothing left after sampling.
std::optional<KeyDescription> parallel_replicas_custom_key_description;
const auto get_sampling_key = [&]() -> const KeyDescription &
{
if (settings.parallel_replicas_count > 1 && settings.parallel_replicas_mode == ParallelReplicasMode::CUSTOM_KEY)
{
assert(select_query_info.parallel_replica_custom_key_ast);
LOG_INFO(log, "Using custom key for sampling while processing with multiple replicas");
if (!parallel_replicas_custom_key_description)
parallel_replicas_custom_key_description = KeyDescription::getKeyFromAST(select_query_info.parallel_replica_custom_key_ast, metadata_snapshot->columns, context);
return *parallel_replicas_custom_key_description;
}
return metadata_snapshot->getSamplingKey();
};
if (sampling.use_sampling)
{
if (sample_factor_column_queried && relative_sample_size != RelativeSize(0))
sampling.used_sample_factor = 1.0 / boost::rational_cast<Float64>(relative_sample_size);
RelativeSize size_of_universum = 0;
const auto & sampling_key = get_sampling_key();
const auto & sampling_key = metadata_snapshot->getSamplingKey();
DataTypePtr sampling_column_type = sampling_key.data_types[0];
if (sampling_key.data_types.size() == 1)
@@ -723,7 +681,7 @@ MergeTreeDataSelectSamplingData MergeTreeDataSelectExecutor::getSampling(
/// If sample and final are used together no need to calculate sampling expression twice.
/// The first time it was calculated for final, because sample key is a part of the PK.
/// So, assume that we already have calculated column.
ASTPtr sampling_key_ast = sampling_key.definition_ast;
ASTPtr sampling_key_ast = metadata_snapshot->getSamplingKeyAST();
if (final)
{
@@ -735,11 +693,8 @@ MergeTreeDataSelectSamplingData MergeTreeDataSelectExecutor::getSampling(
if (has_lower_limit)
{
if (!key_condition.addCondition(
sampling_key.column_names[0], Range::createLeftBounded(lower, true, sampling_key.data_types[0]->isNullable()))
&& (settings.parallel_replicas_count <= 1 || settings.parallel_replicas_mode == ParallelReplicasMode::SAMPLE_KEY))
{
sampling_key.column_names[0], Range::createLeftBounded(lower, true, sampling_key.data_types[0]->isNullable())))
throw Exception("Sampling column not in primary key", ErrorCodes::ILLEGAL_COLUMN);
}
ASTPtr args = std::make_shared<ASTExpressionList>();
args->children.push_back(sampling_key_ast);
@@ -756,11 +711,8 @@ MergeTreeDataSelectSamplingData MergeTreeDataSelectExecutor::getSampling(
if (has_upper_limit)
{
if (!key_condition.addCondition(
sampling_key.column_names[0], Range::createRightBounded(upper, false, sampling_key.data_types[0]->isNullable()))
&& (settings.parallel_replicas_count <= 1 || settings.parallel_replicas_mode == ParallelReplicasMode::SAMPLE_KEY))
{
sampling_key.column_names[0], Range::createRightBounded(upper, false, sampling_key.data_types[0]->isNullable())))
throw Exception("Sampling column not in primary key", ErrorCodes::ILLEGAL_COLUMN);
}
ASTPtr args = std::make_shared<ASTExpressionList>();
args->children.push_back(sampling_key_ast);