This commit is contained in:
Nikita Mikhaylov 2023-04-05 15:19:45 +00:00
parent 7b526d3df9
commit 3f44c6d936
7 changed files with 27 additions and 3 deletions

View File

@ -462,6 +462,20 @@ InterpreterSelectQuery::InterpreterSelectQuery(
context->setSetting("parallel_replicas_custom_key", String{""});
}
/// Try to execute query without parallel replicas if we find that there is a FINAL modifier there.
bool is_query_with_final = false;
if (query_info.table_expression_modifiers)
is_query_with_final = query_info.table_expression_modifiers->hasFinal();
else if (query_info.query)
is_query_with_final = query_info.query->as<ASTSelectQuery &>().final();
if (is_query_with_final && (!settings.parallel_replicas_custom_key.value.empty() || settings.allow_experimental_parallel_reading_from_replicas))
{
LOG_WARNING(log, "FINAL modifier is supported with parallel replicas. Will try to execute the query without using them.");
context->setSetting("allow_experimental_parallel_reading_from_replicas", false);
context->setSetting("parallel_replicas_custom_key", String{""});
}
/// Rewrite JOINs
if (!has_input && joined_tables.tablesCount() > 1)
{

View File

@ -99,6 +99,7 @@ namespace ErrorCodes
extern const int INDEX_NOT_USED;
extern const int LOGICAL_ERROR;
extern const int TOO_MANY_ROWS;
extern const int SUPPORT_IS_DISABLED;
}
static MergeTreeReaderSettings getMergeTreeReaderSettings(
@ -1539,7 +1540,7 @@ Pipe ReadFromMergeTree::spreadMarkRanges(
if (final)
{
if (is_parallel_reading_from_replicas)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Final modifier is not supported with parallel replicas");
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "FINAL modifier is not supported with parallel replicas");
if (output_each_partition_through_separate_port)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Optimisation isn't supposed to be used for queries with final");

View File

@ -6939,8 +6939,7 @@ QueryProcessingStage::Enum MergeTreeData::getQueryProcessingStage(
if (query_context->getClientInfo().collaborate_with_initiator)
return QueryProcessingStage::Enum::FetchColumns;
if (query_context->getSettingsRef().allow_experimental_parallel_reading_from_replicas
&& !query_context->getClientInfo().collaborate_with_initiator
if (query_context->canUseParallelReplicasOnInitiator()
&& to_stage >= QueryProcessingStage::WithMergeableState)
return QueryProcessingStage::Enum::WithMergeableState;

View File

@ -0,0 +1,3 @@
CREATE TABLE IF NOT EXISTS t_02708(x DateTime) ENGINE = MergeTree ORDER BY tuple();
SELECT count() FROM t_02708 SETTINGS allow_experimental_parallel_reading_from_replicas=1;
DROP TABLE t_02708;

View File

@ -0,0 +1 @@
1 1 2020-01-01 00:00:00

View File

@ -0,0 +1,5 @@
DROP TABLE IF EXISTS t_02709;
CREATE TABLE t_02709 (key UInt32, sign Int8, date Datetime) ENGINE=CollapsingMergeTree(sign) PARTITION BY date ORDER BY key;
INSERT INTO t_02709 VALUES (1, 1, '2020-01-01'), (2, 1, '2020-01-02'), (1, -1, '2020-01-01'), (2, -1, '2020-01-02'), (1, 1, '2020-01-01');
SELECT * FROM t_02709 FINAL ORDER BY key SETTINGS max_parallel_replicas=3, allow_experimental_parallel_reading_from_replicas=1, use_hedged_requests=0, cluster_for_parallel_replicas='parallel_replicas';
DROP TABLE t_02709;