mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 15:42:02 +00:00
Merge remote-tracking branch 'origin/master' into optimize-tests-2
This commit is contained in:
commit
85d2f73a1a
@ -647,7 +647,8 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
|
||||
auto table_expression_query_info = select_query_info;
|
||||
table_expression_query_info.table_expression = table_expression;
|
||||
table_expression_query_info.filter_actions_dag = table_expression_data.getFilterActions();
|
||||
table_expression_query_info.analyzer_can_use_parallel_replicas_on_follower = table_node == planner_context->getGlobalPlannerContext()->parallel_replicas_table;
|
||||
table_expression_query_info.current_table_chosen_for_reading_with_parallel_replicas
|
||||
= table_node == planner_context->getGlobalPlannerContext()->parallel_replicas_table;
|
||||
|
||||
size_t max_streams = settings.max_threads;
|
||||
size_t max_threads_execute_query = settings.max_threads;
|
||||
@ -862,6 +863,15 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
|
||||
from_stage = storage->getQueryProcessingStage(
|
||||
query_context, select_query_options.to_stage, storage_snapshot, table_expression_query_info);
|
||||
|
||||
/// It is just a safety check needed until we have a proper sending plan to replicas.
|
||||
/// If we have a non-trivial storage like View it might create its own Planner inside read(), run findTableForParallelReplicas()
|
||||
/// and find some other table that might be used for reading with parallel replicas. It will lead to errors.
|
||||
const bool other_table_already_chosen_for_reading_with_parallel_replicas
|
||||
= planner_context->getGlobalPlannerContext()->parallel_replicas_table
|
||||
&& !table_expression_query_info.current_table_chosen_for_reading_with_parallel_replicas;
|
||||
if (other_table_already_chosen_for_reading_with_parallel_replicas)
|
||||
planner_context->getMutableQueryContext()->setSetting("allow_experimental_parallel_reading_from_replicas", Field(0));
|
||||
|
||||
storage->read(
|
||||
query_plan,
|
||||
columns_names,
|
||||
|
@ -444,6 +444,9 @@ void DefaultCoordinator::doHandleInitialAllRangesAnnouncement(InitialAllRangesAn
|
||||
ErrorCodes::LOGICAL_ERROR, "Replica number ({}) is bigger than total replicas count ({})", replica_num, stats.size());
|
||||
|
||||
++stats[replica_num].number_of_requests;
|
||||
|
||||
if (replica_status[replica_num].is_announcement_received)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Duplicate announcement received for replica number {}", replica_num);
|
||||
replica_status[replica_num].is_announcement_received = true;
|
||||
|
||||
LOG_DEBUG(log, "Sent initial requests: {} Replicas count: {}", sent_initial_requests, replicas_count);
|
||||
|
@ -166,7 +166,7 @@ struct SelectQueryInfo
|
||||
/// It's guaranteed to be present in JOIN TREE of `query_tree`
|
||||
QueryTreeNodePtr table_expression;
|
||||
|
||||
bool analyzer_can_use_parallel_replicas_on_follower = false;
|
||||
bool current_table_chosen_for_reading_with_parallel_replicas = false;
|
||||
|
||||
/// Table expression modifiers for storage
|
||||
std::optional<TableExpressionModifiers> table_expression_modifiers;
|
||||
|
@ -252,7 +252,7 @@ void StorageMergeTree::read(
|
||||
|
||||
const bool enable_parallel_reading = local_context->canUseParallelReplicasOnFollower()
|
||||
&& local_context->getSettingsRef().parallel_replicas_for_non_replicated_merge_tree
|
||||
&& (!local_context->getSettingsRef().allow_experimental_analyzer || query_info.analyzer_can_use_parallel_replicas_on_follower);
|
||||
&& (!local_context->getSettingsRef().allow_experimental_analyzer || query_info.current_table_chosen_for_reading_with_parallel_replicas);
|
||||
|
||||
if (auto plan = reader.read(
|
||||
column_names,
|
||||
|
@ -5540,7 +5540,8 @@ void StorageReplicatedMergeTree::readLocalImpl(
|
||||
const size_t num_streams)
|
||||
{
|
||||
const bool enable_parallel_reading = local_context->canUseParallelReplicasOnFollower()
|
||||
&& (!local_context->getSettingsRef().allow_experimental_analyzer || query_info.analyzer_can_use_parallel_replicas_on_follower);
|
||||
&& (!local_context->getSettingsRef().allow_experimental_analyzer
|
||||
|| query_info.current_table_chosen_for_reading_with_parallel_replicas);
|
||||
|
||||
auto plan = reader.read(
|
||||
column_names, storage_snapshot, query_info,
|
||||
|
@ -0,0 +1,10 @@
|
||||
a1451105-722e-4fe7-bfaa-65ad2ae249c2 whatever
|
||||
a1451105-722e-4fe7-bfaa-65ad2ae249c2 whatever
|
||||
---------------------------
|
||||
a1451105-722e-4fe7-bfaa-65ad2ae249c2
|
||||
a1451105-722e-4fe7-bfaa-65ad2ae249c2
|
||||
---------------------------
|
||||
a1451105-722e-4fe7-bfaa-65ad2ae249c2
|
||||
---------------------------
|
||||
a1451105-722e-4fe7-bfaa-65ad2ae249c2
|
||||
a1451105-722e-4fe7-bfaa-65ad2ae249c2
|
83
tests/queries/0_stateless/03173_parallel_replicas_join_bug.sh
Executable file
83
tests/queries/0_stateless/03173_parallel_replicas_join_bug.sh
Executable file
@ -0,0 +1,83 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
# shellcheck source=../shell_config.sh
|
||||
. "$CURDIR"/../shell_config.sh
|
||||
|
||||
|
||||
$CLICKHOUSE_CLIENT -nq "
|
||||
CREATE TABLE ids (id UUID, whatever String) Engine=MergeTree ORDER BY tuple();
|
||||
INSERT INTO ids VALUES ('a1451105-722e-4fe7-bfaa-65ad2ae249c2', 'whatever');
|
||||
|
||||
CREATE TABLE data (id UUID, event_time DateTime, status String) Engine=MergeTree ORDER BY tuple();
|
||||
INSERT INTO data VALUES ('a1451105-722e-4fe7-bfaa-65ad2ae249c2', '2000-01-01', 'CREATED');
|
||||
|
||||
CREATE TABLE data2 (id UUID, event_time DateTime, status String) Engine=MergeTree ORDER BY tuple();
|
||||
INSERT INTO data2 VALUES ('a1451105-722e-4fe7-bfaa-65ad2ae249c2', '2000-01-02', 'CREATED');
|
||||
"
|
||||
|
||||
$CLICKHOUSE_CLIENT -nq "
|
||||
SET allow_experimental_analyzer = 1, cluster_for_parallel_replicas = 'parallel_replicas', max_parallel_replicas = 10, allow_experimental_parallel_reading_from_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, max_threads = 1;
|
||||
|
||||
SELECT
|
||||
id,
|
||||
whatever
|
||||
FROM ids AS l
|
||||
INNER JOIN view(
|
||||
SELECT *
|
||||
FROM merge($CLICKHOUSE_DATABASE, 'data.*')
|
||||
) AS s ON l.id = s.id
|
||||
WHERE status IN ['CREATED', 'CREATING']
|
||||
ORDER BY event_time DESC;
|
||||
|
||||
SELECT '---------------------------';
|
||||
|
||||
with
|
||||
results1 as (
|
||||
SELECT id
|
||||
FROM data t1
|
||||
inner join ids t2
|
||||
on t1.id = t2.id
|
||||
),
|
||||
results2 as (
|
||||
SELECT id
|
||||
FROM ids t1
|
||||
inner join data t2
|
||||
on t1.id = t2.id
|
||||
)
|
||||
select * from results1 union all select * from results2;
|
||||
|
||||
SELECT '---------------------------';
|
||||
|
||||
with
|
||||
results1 as (
|
||||
SELECT id
|
||||
FROM data t1
|
||||
inner join ids t2
|
||||
on t1.id = t2.id
|
||||
),
|
||||
results2 as (
|
||||
SELECT id
|
||||
FROM ids t1
|
||||
inner join data t2
|
||||
on t1.id = t2.id
|
||||
)
|
||||
select * from results1 t1 inner join results2 t2 using (id);
|
||||
|
||||
SELECT '---------------------------';
|
||||
|
||||
with
|
||||
results1 as (
|
||||
SELECT t1.id
|
||||
FROM data t1
|
||||
inner join ids t2 on t1.id = t2.id
|
||||
left join data t3 on t2.id = t3.id
|
||||
),
|
||||
results2 as (
|
||||
SELECT id
|
||||
FROM ids t1
|
||||
inner join data t2
|
||||
on t1.id = t2.id
|
||||
)
|
||||
select * from results1 union all select * from results2;
|
||||
"
|
Loading…
Reference in New Issue
Block a user