Fixes for parallel replicas (#50195)

This commit is contained in:
Nikita Mikhaylov 2023-05-25 14:41:04 +02:00 committed by GitHub
parent 5c59d9a0ed
commit 1c3b6738f4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 113 additions and 76 deletions

View File

@ -174,7 +174,7 @@ void HedgedConnections::sendQuery(
modified_settings.group_by_two_level_threshold_bytes = 0;
}
const bool enable_sample_offset_parallel_processing = settings.max_parallel_replicas > 1 && !settings.allow_experimental_parallel_reading_from_replicas;
const bool enable_sample_offset_parallel_processing = settings.max_parallel_replicas > 1 && settings.allow_experimental_parallel_reading_from_replicas == 0;
if (offset_states.size() > 1 && enable_sample_offset_parallel_processing)
{

View File

@ -142,7 +142,7 @@ void MultiplexedConnections::sendQuery(
}
}
const bool enable_sample_offset_parallel_processing = settings.max_parallel_replicas > 1 && !settings.allow_experimental_parallel_reading_from_replicas;
const bool enable_sample_offset_parallel_processing = settings.max_parallel_replicas > 1 && settings.allow_experimental_parallel_reading_from_replicas == 0;
size_t num_replicas = replica_states.size();
if (num_replicas > 1)

View File

@ -154,7 +154,7 @@ class IColumn;
M(ParallelReplicasCustomKeyFilterType, parallel_replicas_custom_key_filter_type, ParallelReplicasCustomKeyFilterType::DEFAULT, "Type of filter to use with custom key for parallel replicas. default - use modulo operation on the custom key, range - use range filter on custom key using all possible values for the value type of custom key.", 0) \
\
M(String, cluster_for_parallel_replicas, "default", "Cluster for a shard in which current server is located", 0) \
M(Bool, allow_experimental_parallel_reading_from_replicas, false, "If true, ClickHouse will send a SELECT query to all replicas of a table. It will work for any kind on MergeTree table.", 0) \
M(UInt64, allow_experimental_parallel_reading_from_replicas, 0, "Use all the replicas from a shard for SELECT query execution. Reading is parallelized and coordinated dynamically. 0 - disabled, 1 - enabled, silently disable them in case of failure, 2 - enabled, throw an exception in case of failure", 0) \
M(Float, parallel_replicas_single_task_marks_count_multiplier, 2, "A multiplier which will be added during calculation for minimal number of marks to retrieve from coordinator. This will be applied only for remote replicas.", 0) \
M(Bool, parallel_replicas_for_non_replicated_merge_tree, false, "If true, ClickHouse will use parallel replicas algorithm also for non-replicated MergeTree tables", 0) \
\

View File

@ -4324,7 +4324,7 @@ Context::ParallelReplicasMode Context::getParallelReplicasMode() const
if (!settings_.parallel_replicas_custom_key.value.empty())
return CUSTOM_KEY;
if (settings_.allow_experimental_parallel_reading_from_replicas
if (settings_.allow_experimental_parallel_reading_from_replicas > 0
&& !settings_.use_hedged_requests)
return READ_TASKS;

View File

@ -116,6 +116,7 @@ namespace ErrorCodes
extern const int ACCESS_DENIED;
extern const int UNKNOWN_IDENTIFIER;
extern const int BAD_ARGUMENTS;
extern const int SUPPORT_IS_DISABLED;
}
/// Assumes `storage` is set and the table filter (row-level security) is not empty.
@ -409,6 +410,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
ApplyWithSubqueryVisitor().visit(query_ptr);
}
query_info.query = query_ptr->clone();
query_info.original_query = query_ptr->clone();
if (settings.count_distinct_optimization)
@ -456,25 +458,35 @@ InterpreterSelectQuery::InterpreterSelectQuery(
}
}
if (joined_tables.tablesCount() > 1 && (!settings.parallel_replicas_custom_key.value.empty() || settings.allow_experimental_parallel_reading_from_replicas))
/// Check support for JOINs for parallel replicas
if (joined_tables.tablesCount() > 1 && (!settings.parallel_replicas_custom_key.value.empty() || settings.allow_experimental_parallel_reading_from_replicas > 0))
{
LOG_WARNING(log, "Joins are not supported with parallel replicas. Query will be executed without using them.");
context->setSetting("allow_experimental_parallel_reading_from_replicas", false);
context->setSetting("parallel_replicas_custom_key", String{""});
if (settings.allow_experimental_parallel_reading_from_replicas == 1)
{
LOG_WARNING(log, "JOINs are not supported with parallel replicas. Query will be executed without using them.");
context->setSetting("allow_experimental_parallel_reading_from_replicas", Field(0));
context->setSetting("parallel_replicas_custom_key", String{""});
}
else if (settings.allow_experimental_parallel_reading_from_replicas == 2)
{
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "JOINs are not supported with parallel replicas");
}
}
/// Try to execute query without parallel replicas if we find that there is a FINAL modifier there.
bool is_query_with_final = false;
if (query_info.table_expression_modifiers)
is_query_with_final = query_info.table_expression_modifiers->hasFinal();
else if (query_info.query)
is_query_with_final = query_info.query->as<ASTSelectQuery &>().final();
if (is_query_with_final && (!settings.parallel_replicas_custom_key.value.empty() || settings.allow_experimental_parallel_reading_from_replicas))
/// Check support for FINAL for parallel replicas
bool is_query_with_final = isQueryWithFinal(query_info);
if (is_query_with_final && (!settings.parallel_replicas_custom_key.value.empty() || settings.allow_experimental_parallel_reading_from_replicas > 0))
{
LOG_WARNING(log, "FINAL modifier is supported with parallel replicas. Will try to execute the query without using them.");
context->setSetting("allow_experimental_parallel_reading_from_replicas", false);
context->setSetting("parallel_replicas_custom_key", String{""});
if (settings.allow_experimental_parallel_reading_from_replicas == 1)
{
LOG_WARNING(log, "FINAL modifier is not supported with parallel replicas. Query will be executed without using them.");
context->setSetting("allow_experimental_parallel_reading_from_replicas", Field(0));
context->setSetting("parallel_replicas_custom_key", String{""});
}
else if (settings.allow_experimental_parallel_reading_from_replicas == 2)
{
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "FINAL modifier is not supported with parallel replicas");
}
}
/// Rewrite JOINs
@ -3134,4 +3146,14 @@ void InterpreterSelectQuery::initSettings()
}
}
/// Tells whether the query described by `info` is read with the FINAL modifier.
///
/// FINAL is reported when either source says so:
///  - the SELECT AST stored in `info.query`, or
///  - `info.table_expression_modifiers`, when they are set.
///
/// An empty `info.query` is treated as "no FINAL in the AST" rather than dereferenced,
/// so the helper stays safe for callers that previously guarded the pointer themselves.
/// A non-empty `info.query` must still be an ASTSelectQuery (the reference cast is kept as is).
bool InterpreterSelectQuery::isQueryWithFinal(const SelectQueryInfo & info)
{
    bool result = false;
    if (info.query)
        result = info.query->as<ASTSelectQuery &>().final();
    if (info.table_expression_modifiers)
        result |= info.table_expression_modifiers->hasFinal();
    return result;
}
}

View File

@ -131,6 +131,8 @@ public:
static SortDescription getSortDescription(const ASTSelectQuery & query, const ContextPtr & context);
static UInt64 getLimitForSorting(const ASTSelectQuery & query, const ContextPtr & context);
static bool isQueryWithFinal(const SelectQueryInfo & info);
private:
InterpreterSelectQuery(
const ASTPtr & query_ptr_,

View File

@ -113,7 +113,7 @@ std::shared_ptr<InterpreterSelectWithUnionQuery> interpretSubquery(
}
/// We don't want to execute reading for subqueries in parallel
subquery_context->setSetting("allow_experimental_parallel_reading_from_replicas", false);
subquery_context->setSetting("allow_experimental_parallel_reading_from_replicas", Field(0));
return std::make_shared<InterpreterSelectWithUnionQuery>(query, subquery_context, subquery_options, required_source_columns);
}

View File

@ -83,6 +83,7 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
extern const int TOO_DEEP_SUBQUERIES;
extern const int NOT_IMPLEMENTED;
extern const int SUPPORT_IS_DISABLED;
}
/** ClickHouse query planner.
@ -1192,16 +1193,25 @@ void Planner::buildPlanForQueryNode()
const auto & settings = query_context->getSettingsRef();
if (planner_context->getTableExpressionNodeToData().size() > 1
&& (!settings.parallel_replicas_custom_key.value.empty() || settings.allow_experimental_parallel_reading_from_replicas))
&& (!settings.parallel_replicas_custom_key.value.empty() || settings.allow_experimental_parallel_reading_from_replicas > 0))
{
LOG_WARNING(
&Poco::Logger::get("Planner"), "Joins are not supported with parallel replicas. Query will be executed without using them.");
if (settings.allow_experimental_parallel_reading_from_replicas == 1)
{
LOG_WARNING(
&Poco::Logger::get("Planner"), "JOINs are not supported with parallel replicas. Query will be executed without using them.");
auto & mutable_context = planner_context->getMutableQueryContext();
mutable_context->setSetting("allow_experimental_parallel_reading_from_replicas", false);
mutable_context->setSetting("parallel_replicas_custom_key", String{""});
auto & mutable_context = planner_context->getMutableQueryContext();
mutable_context->setSetting("allow_experimental_parallel_reading_from_replicas", Field(0));
mutable_context->setSetting("parallel_replicas_custom_key", String{""});
}
else if (settings.allow_experimental_parallel_reading_from_replicas == 2)
{
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "JOINs are not supported with parallel replicas");
}
}
/// TODO: Also disable parallel replicas in case of FINAL
auto top_level_identifiers = collectTopLevelColumnIdentifiers(query_tree, planner_context);
auto join_tree_query_plan = buildJoinTreeQueryPlan(query_tree,
select_query_info,

View File

@ -3,6 +3,7 @@
#include <IO/Operators.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/TreeRewriter.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
@ -99,7 +100,6 @@ namespace ErrorCodes
extern const int INDEX_NOT_USED;
extern const int LOGICAL_ERROR;
extern const int TOO_MANY_ROWS;
extern const int SUPPORT_IS_DISABLED;
}
static MergeTreeReaderSettings getMergeTreeReaderSettings(
@ -1314,7 +1314,7 @@ MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToReadImpl(
auto reader_settings = getMergeTreeReaderSettings(context, query_info);
bool use_skip_indexes = settings.use_skip_indexes;
bool final = isFinal(query_info);
bool final = InterpreterSelectQuery::isQueryWithFinal(query_info);
if (final && !settings.use_skip_indexes_if_final)
use_skip_indexes = false;
@ -1377,7 +1377,7 @@ bool ReadFromMergeTree::requestReadingInOrder(size_t prefix_size, int direction,
/// Disable read-in-order optimization for reverse order with final.
/// Otherwise, it can lead to incorrect final behavior because the implementation may rely on the reading in direct order).
if (direction != 1 && isFinal(query_info))
if (direction != 1 && isQueryWithFinal())
return false;
auto order_info = std::make_shared<InputOrderInfo>(SortDescription{}, prefix_size, direction, limit);
@ -1500,11 +1500,7 @@ ReadFromMergeTree::AnalysisResult ReadFromMergeTree::getAnalysisResult() const
bool ReadFromMergeTree::isQueryWithFinal() const
{
const auto & select = query_info.query->as<ASTSelectQuery &>();
if (query_info.table_expression_modifiers)
return query_info.table_expression_modifiers->hasFinal();
else
return select.final();
return InterpreterSelectQuery::isQueryWithFinal(query_info);
}
bool ReadFromMergeTree::isQueryWithSampling() const
@ -1522,7 +1518,7 @@ bool ReadFromMergeTree::isQueryWithSampling() const
Pipe ReadFromMergeTree::spreadMarkRanges(
RangesInDataParts && parts_with_ranges, size_t num_streams, AnalysisResult & result, ActionsDAGPtr & result_projection)
{
bool final = isQueryWithFinal();
const bool final = isQueryWithFinal();
const auto & input_order_info = query_info.getInputOrderInfo();
Names column_names_to_read = result.column_names_to_read;
@ -1539,8 +1535,7 @@ Pipe ReadFromMergeTree::spreadMarkRanges(
if (final)
{
if (is_parallel_reading_from_replicas)
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "FINAL modifier is not supported with parallel replicas");
chassert(!is_parallel_reading_from_replicas);
if (output_each_partition_through_separate_port)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Optimisation isn't supposed to be used for queries with final");
@ -1960,15 +1955,6 @@ void ReadFromMergeTree::describeIndexes(JSONBuilder::JSONMap & map) const
}
}
/// Reports whether `query_info` requests FINAL.
/// Table expression modifiers take precedence when they are set; the SELECT AST is consulted only otherwise.
bool ReadFromMergeTree::isFinal(const SelectQueryInfo & query_info)
{
    if (!query_info.table_expression_modifiers)
        return query_info.query->as<ASTSelectQuery &>().final();

    return query_info.table_expression_modifiers->hasFinal();
}
bool MergeTreeDataSelectAnalysisResult::error() const
{
return std::holds_alternative<std::exception_ptr>(result);

View File

@ -159,7 +159,6 @@ public:
void updatePrewhereInfo(const PrewhereInfoPtr & prewhere_info_value);
static bool isFinal(const SelectQueryInfo & query_info);
bool isQueryWithFinal() const;
bool isQueryWithSampling() const;

View File

@ -7198,9 +7198,17 @@ QueryProcessingStage::Enum MergeTreeData::getQueryProcessingStage(
if (query_context->getClientInfo().collaborate_with_initiator)
return QueryProcessingStage::Enum::FetchColumns;
if (query_context->canUseParallelReplicasOnInitiator()
&& to_stage >= QueryProcessingStage::WithMergeableState)
return QueryProcessingStage::Enum::WithMergeableState;
/// Parallel replicas
if (query_context->canUseParallelReplicasOnInitiator() && to_stage >= QueryProcessingStage::WithMergeableState)
{
/// ReplicatedMergeTree
if (supportsReplication())
return QueryProcessingStage::Enum::WithMergeableState;
/// For non-replicated MergeTree we allow them only if parallel_replicas_for_non_replicated_merge_tree is enabled
if (query_context->getSettingsRef().parallel_replicas_for_non_replicated_merge_tree)
return QueryProcessingStage::Enum::WithMergeableState;
}
if (to_stage >= QueryProcessingStage::Enum::WithMergeableState)
{

View File

@ -145,7 +145,6 @@ namespace ErrorCodes
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int INCORRECT_NUMBER_OF_COLUMNS;
extern const int INFINITE_LOOP;
extern const int ILLEGAL_FINAL;
extern const int TYPE_MISMATCH;
extern const int TOO_MANY_ROWS;
extern const int UNABLE_TO_SKIP_UNUSED_SHARDS;
@ -1045,10 +1044,6 @@ void StorageDistributed::read(
const size_t /*max_block_size*/,
const size_t /*num_streams*/)
{
const auto * select_query = query_info.query->as<ASTSelectQuery>();
if (select_query->final() && local_context->getSettingsRef().allow_experimental_parallel_reading_from_replicas)
throw Exception(ErrorCodes::ILLEGAL_FINAL, "Final modifier is not allowed together with parallel reading from replicas feature");
Block header;
ASTPtr query_ast;

View File

@ -633,10 +633,7 @@ QueryPipelineBuilderPtr ReadFromMerge::createSources(
auto & modified_select = modified_query_info.query->as<ASTSelectQuery &>();
QueryPipelineBuilderPtr builder;
bool final = isFinal(modified_query_info);
if (!final && storage->needRewriteQueryWithFinal(real_column_names))
if (!InterpreterSelectQuery::isQueryWithFinal(modified_query_info) && storage->needRewriteQueryWithFinal(real_column_names))
{
/// NOTE: It may not work correctly in some cases, because query was analyzed without final.
/// However, it's needed for MaterializedMySQL and it's unlikely that someone will use it with Merge tables.
@ -1010,21 +1007,13 @@ bool ReadFromMerge::requestReadingInOrder(InputOrderInfoPtr order_info_)
{
/// Disable read-in-order optimization for reverse order with final.
/// Otherwise, it can lead to incorrect final behavior because the implementation may rely on the reading in direct order).
if (order_info_->direction != 1 && isFinal(query_info))
if (order_info_->direction != 1 && InterpreterSelectQuery::isQueryWithFinal(query_info))
return false;
order_info = order_info_;
return true;
}
bool ReadFromMerge::isFinal(const SelectQueryInfo & query_info)
{
if (query_info.table_expression_modifiers)
return query_info.table_expression_modifiers->hasFinal();
const auto & select_query = query_info.query->as<ASTSelectQuery &>();
return select_query.final();
}
IStorage::ColumnSizeByName StorageMerge::getColumnSizes() const
{
ColumnSizeByName column_sizes;

View File

@ -145,7 +145,6 @@ public:
/// Returns `false` if requested reading cannot be performed.
bool requestReadingInOrder(InputOrderInfoPtr order_info_);
static bool isFinal(const SelectQueryInfo & query_info);
private:
const size_t required_max_block_size;

View File

@ -139,3 +139,6 @@
02703_row_policy_for_database
02721_url_cluster
02534_s3_cluster_insert_select_schema_inference
02764_parallel_replicas_plain_merge_tree
02765_parallel_replicas_final_modifier

View File

@ -41,6 +41,6 @@ run_count_with_custom_key "y"
run_count_with_custom_key "cityHash64(y)"
run_count_with_custom_key "cityHash64(y) + 1"
$CLICKHOUSE_CLIENT --query="SELECT count() FROM cluster(test_cluster_one_shard_three_replicas_localhost, currentDatabase(), 02535_custom_key) as t1 JOIN 02535_custom_key USING y" --parallel_replicas_custom_key="y" --send_logs_level="trace" 2>&1 | grep -Fac "Joins are not supported with parallel replicas"
$CLICKHOUSE_CLIENT --query="SELECT count() FROM cluster(test_cluster_one_shard_three_replicas_localhost, currentDatabase(), 02535_custom_key) as t1 JOIN 02535_custom_key USING y" --parallel_replicas_custom_key="y" --send_logs_level="trace" 2>&1 | grep -Fac "JOINs are not supported with parallel replicas"
$CLICKHOUSE_CLIENT --query="DROP TABLE 02535_custom_key"

View File

@ -1,5 +0,0 @@
DROP TABLE IF EXISTS t_02709;
CREATE TABLE t_02709 (key UInt32, sign Int8, date Datetime) ENGINE=CollapsingMergeTree(sign) PARTITION BY date ORDER BY key;
INSERT INTO t_02709 VALUES (1, 1, '2020-01-01'), (2, 1, '2020-01-02'), (1, -1, '2020-01-01'), (2, -1, '2020-01-02'), (1, 1, '2020-01-01');
SELECT * FROM t_02709 FINAL ORDER BY key SETTINGS max_parallel_replicas=3, allow_experimental_parallel_reading_from_replicas=1, use_hedged_requests=0, cluster_for_parallel_replicas='parallel_replicas';
DROP TABLE t_02709;

View File

@ -0,0 +1,16 @@
-- Runs SELECTs against a plain (non-replicated) MergeTree table with parallel replicas requested,
-- once with parallel_replicas_for_non_replicated_merge_tree disabled and once with it enabled.
-- Every SELECT uses FORMAT Null, so the queries themselves produce no output.
CREATE TABLE IF NOT EXISTS parallel_replicas_plain (x String) ENGINE=MergeTree() ORDER BY x;
INSERT INTO parallel_replicas_plain SELECT toString(number) FROM numbers(10);
-- Request parallel reading from replicas (mode 1) with hedged requests switched off.
SET max_parallel_replicas=3, allow_experimental_parallel_reading_from_replicas=1, use_hedged_requests=0, cluster_for_parallel_replicas='parallel_replicas';
-- Case 1: parallel replicas are not allowed for non-replicated MergeTree.
SET parallel_replicas_for_non_replicated_merge_tree = 0;
SELECT x FROM parallel_replicas_plain LIMIT 1 FORMAT Null;
SELECT max(length(x)) FROM parallel_replicas_plain FORMAT Null;
-- Case 2: parallel replicas are allowed for non-replicated MergeTree.
SET parallel_replicas_for_non_replicated_merge_tree = 1;
SELECT x FROM parallel_replicas_plain LIMIT 1 FORMAT Null;
SELECT max(length(x)) FROM parallel_replicas_plain FORMAT Null;
-- Clean up the table created by this test.
DROP TABLE IF EXISTS parallel_replicas_plain;

View File

@ -0,0 +1,14 @@
-- Exercises SELECT ... FINAL together with parallel replicas under both values of
-- allow_experimental_parallel_reading_from_replicas that enable the feature.
CREATE TABLE IF NOT EXISTS parallel_replicas_final (x String) ENGINE=ReplacingMergeTree() ORDER BY x;
INSERT INTO parallel_replicas_final SELECT toString(number) FROM numbers(10);
-- Mode 1: request parallel replicas with hedged requests switched off.
SET max_parallel_replicas=3, allow_experimental_parallel_reading_from_replicas=1, use_hedged_requests=0, cluster_for_parallel_replicas='parallel_replicas';
-- The table is not replicated, so allow parallel replicas for plain MergeTree-family tables.
SET parallel_replicas_for_non_replicated_merge_tree = 1;
-- In mode 1 the FINAL query is expected to complete; output is discarded by FORMAT Null.
SELECT * FROM parallel_replicas_final FINAL FORMAT Null;
-- Mode 2: the same FINAL query must be rejected with SUPPORT_IS_DISABLED.
SET allow_experimental_parallel_reading_from_replicas=2;
SELECT * FROM parallel_replicas_final FINAL FORMAT Null; -- { serverError SUPPORT_IS_DISABLED }
-- Clean up the table created by this test.
DROP TABLE IF EXISTS parallel_replicas_final;