Merge remote-tracking branch 'origin/master' into optimize-join-performance-by-extracting-common-exprs

János Benjamin Antal 2024-11-28 15:05:11 +00:00
commit 217d779840
19 changed files with 99 additions and 42 deletions

View File

@@ -43,8 +43,6 @@ Keep an eye out for upcoming meetups and events around the world. Somewhere else
Upcoming meetups
* [Ghent Meetup](https://www.meetup.com/clickhouse-belgium-user-group/events/303049405/) - November 19
* [Paris Meetup](https://www.meetup.com/clickhouse-france-user-group/events/303096434) - November 26
* [Amsterdam Meetup](https://www.meetup.com/clickhouse-netherlands-user-group/events/303638814) - December 3
* [Stockholm Meetup](https://www.meetup.com/clickhouse-stockholm-user-group/events/304382411) - December 9
* [New York Meetup](https://www.meetup.com/clickhouse-new-york-user-group/events/304268174) - December 9
@@ -54,6 +52,8 @@ Upcoming meetups
Recently completed meetups
* [Paris Meetup](https://www.meetup.com/clickhouse-france-user-group/events/303096434) - November 26
* [Ghent Meetup](https://www.meetup.com/clickhouse-belgium-user-group/events/303049405/) - November 19
* [Barcelona Meetup](https://www.meetup.com/clickhouse-spain-user-group/events/303096876/) - November 12
* [Madrid Meetup](https://www.meetup.com/clickhouse-spain-user-group/events/303096564/) - October 22
* [Singapore Meetup](https://www.meetup.com/clickhouse-singapore-meetup-group/events/303212064/) - October 3

View File

@@ -5652,6 +5652,9 @@ Parts virtually divided into segments to be distributed between replicas for par
)", BETA) \
DECLARE(Bool, parallel_replicas_local_plan, true, R"(
Build local plan for local replica
)", BETA) \
DECLARE(Bool, parallel_replicas_index_analysis_only_on_coordinator, true, R"(
Index analysis is done only on the replica-coordinator and skipped on other replicas. Effective only when parallel_replicas_local_plan is enabled
)", BETA) \
\
DECLARE(Bool, allow_experimental_analyzer, true, R"(

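For context, a minimal sketch of how the new setting could be toggled per query together with parallel_replicas_local_plan; the table and cluster names are placeholders, only the settings come from this change:

-- Hypothetical table and cluster names; the settings are the ones declared above.
SELECT sum(value)
FROM example_table
SETTINGS
    allow_experimental_parallel_reading_from_replicas = 2,
    max_parallel_replicas = 100,
    cluster_for_parallel_replicas = 'parallel_replicas',
    parallel_replicas_local_plan = 1,
    parallel_replicas_index_analysis_only_on_coordinator = 1;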
View File

@@ -61,6 +61,7 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
{"24.12",
{
{"optimize_extract_common_expressions", false, false, "Introduce setting to optimize WHERE, PREWHERE, ON, HAVING and QUALIFY expressions by extracting common expressions out from disjunction of conjunctions."},
{"parallel_replicas_index_analysis_only_on_coordinator", false, true, "Index analysis done only on replica-coordinator and skipped on other replicas. Effective only with enabled parallel_replicas_local_plan"},
{"use_async_executor_for_materialized_views", false, false, "New setting."},
}
},

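To illustrate the rewrite described by the optimize_extract_common_expressions entry above (the table and columns are made up for this sketch), the optimization pulls a shared conjunct out of a disjunction of conjunctions:

-- Hypothetical schema, shown only to illustrate the description above.
SET optimize_extract_common_expressions = 1;
-- A disjunction of conjunctions that share the predicate a = 1 ...
SELECT count() FROM t WHERE (a = 1 AND b = 2) OR (a = 1 AND c = 3);
-- ... can then be evaluated as WHERE a = 1 AND (b = 2 OR c = 3).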
View File

@@ -781,6 +781,7 @@ void IMergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checks
loadIndexGranularity();
/// It's important to load index after index granularity.
if (!(*storage.getSettings())[MergeTreeSetting::primary_key_lazy_load])
index = loadIndex();
@@ -959,13 +960,18 @@ void IMergeTreeDataPart::appendFilesOfIndexGranularity(Strings & /* files */) co
template <typename Columns>
void IMergeTreeDataPart::optimizeIndexColumns(size_t marks_count, Columns & index_columns) const
{
if (marks_count == 0)
{
chassert(isEmpty());
return;
}
size_t key_size = index_columns.size();
Float64 ratio_to_drop_suffix_columns = (*storage.getSettings())[MergeTreeSetting::primary_key_ratio_of_unique_prefix_values_to_skip_suffix_columns];
/// Cut useless suffix columns, if necessary.
if (key_size > 1 && ratio_to_drop_suffix_columns > 0 && ratio_to_drop_suffix_columns < 1)
{
chassert(marks_count > 0);
for (size_t j = 0; j < key_size - 1; ++j)
{
size_t num_changes = 0;

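As a rough sketch of when the suffix-column cut above can kick in (the table below is hypothetical; the setting name is taken from the code):

-- If nearly every mark has a distinct value of id, keeping ts in the in-memory
-- primary index adds little, so its column may be dropped from the loaded index.
CREATE TABLE t_wide_pk (id UInt64, ts DateTime)
ENGINE = MergeTree ORDER BY (id, ts)
SETTINGS primary_key_ratio_of_unique_prefix_values_to_skip_suffix_columns = 0.9;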
View File

@@ -80,6 +80,8 @@ namespace Setting
extern const SettingsUInt64 parallel_replica_offset;
extern const SettingsUInt64 parallel_replicas_count;
extern const SettingsParallelReplicasMode parallel_replicas_mode;
extern const SettingsBool parallel_replicas_local_plan;
extern const SettingsBool parallel_replicas_index_analysis_only_on_coordinator;
}
namespace MergeTreeSetting
@@ -631,10 +633,23 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
bool use_skip_indexes,
bool find_exact_ranges)
{
RangesInDataParts parts_with_ranges;
parts_with_ranges.resize(parts.size());
const Settings & settings = context->getSettingsRef();
if (context->canUseParallelReplicasOnFollower() && settings[Setting::parallel_replicas_local_plan]
&& settings[Setting::parallel_replicas_index_analysis_only_on_coordinator])
{
// Skip index analysis and return parts with all marks
// The coordinator will choose ranges to read for workers based on index analysis on its side
RangesInDataParts parts_with_ranges;
parts_with_ranges.reserve(parts.size());
for (size_t part_index = 0; part_index < parts.size(); ++part_index)
{
const auto & part = parts[part_index];
parts_with_ranges.emplace_back(part, part_index, MarkRanges{{0, part->getMarksCount()}});
}
return parts_with_ranges;
}
if (use_skip_indexes && settings[Setting::force_data_skipping_indices].changed)
{
const auto & indices_str = settings[Setting::force_data_skipping_indices].toString();
@@ -673,6 +688,8 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
std::atomic<size_t> sum_marks_pk = 0;
std::atomic<size_t> sum_parts_pk = 0;
RangesInDataParts parts_with_ranges(parts.size());
/// Let's find what range to read from each part.
{
auto mark_cache = context->getIndexMarkCache();

View File

@@ -270,6 +270,7 @@ MergedBlockOutputStream::Finalizer MergedBlockOutputStream::finalizePartAsync(
new_part->index_granularity = std::move(new_index_granularity);
}
/// It's important to set index after index granularity.
if (auto computed_index = writer->releaseIndexColumns())
new_part->setIndex(std::move(*computed_index));

View File

@@ -994,6 +994,7 @@ void finalizeMutatedPart(
new_data_part->index_granularity = std::move(new_index_granularity);
}
/// It's important to set index after index granularity.
if (!new_data_part->storage.getPrimaryIndexCache())
new_data_part->setIndex(*source_part->getIndex());

View File

@@ -42,6 +42,7 @@ TRUSTED_CONTRIBUTORS = {
"tsolodov", # ClickHouse, Inc
"justindeguzman", # ClickHouse, Inc
"XuJia0210", # ClickHouse, Inc
"nauu", # ClickHouse, Inc
]
}

View File

@@ -984,6 +984,8 @@ class MergeTreeSettingsRandomizer:
"prewarm_mark_cache": lambda: random.randint(0, 1),
"use_const_adaptive_granularity": lambda: random.randint(0, 1),
"enable_index_granularity_compression": lambda: random.randint(0, 1),
"use_primary_key_cache": lambda: random.randint(0, 1),
"prewarm_primary_key_cache": lambda: random.randint(0, 1),
}
@staticmethod

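For reference, a minimal sketch of pinning the two new MergeTree settings explicitly instead of leaving them to the randomizer (table and column names are placeholders), mirroring how the functional tests below disable the cache:

CREATE TABLE t_pk_cache_example (a UInt64)
ENGINE = MergeTree ORDER BY a
SETTINGS use_primary_key_cache = 1, prewarm_primary_key_cache = 1;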
View File

@@ -184,6 +184,9 @@ function stop_server()
# Preserve the pid, since the server can hang after the PID file is deleted.
pid="$(cat /var/run/clickhouse-server/clickhouse-server.pid)"
# FIXME: workaround for slow shutdown of Distributed tables (due to sequential tables shutdown)
# Refs: https://github.com/ClickHouse/ClickHouse/issues/72557
clickhouse client -q "SYSTEM STOP DISTRIBUTED SENDS" ||:
clickhouse stop --max-tries "$max_tries" --do-not-kill && return
if [ "$check_hang" == true ]

View File

@@ -71,6 +71,7 @@ def _get_result_with_parallel_replicas(
"cluster_for_parallel_replicas": f"{cluster_name}",
"parallel_replicas_mark_segment_size": parallel_replicas_mark_segment_size,
"query_id": query_id,
"parallel_replicas_index_analysis_only_on_coordinator": False,
},
)

View File

@@ -1,4 +1,5 @@
import re
import uuid
from random import randint
import pytest
@@ -18,15 +19,6 @@ nodes = [
]
@pytest.fixture(scope="module", autouse=True)
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def _create_tables(table_name):
nodes[0].query(
f"DROP TABLE IF EXISTS {table_name} ON CLUSTER 'parallel_replicas'",
@@ -48,28 +40,41 @@ def _create_tables(table_name):
nodes[0].query(f"SYSTEM SYNC REPLICA ON CLUSTER 'parallel_replicas' {table_name}")
table_name = "t"
@pytest.fixture(scope="module", autouse=True)
def start_cluster():
try:
cluster.start()
_create_tables(table_name)
yield cluster
finally:
cluster.shutdown()
# now mark_segment_size is part of the protocol and is communicated to the initiator.
# let's check that the correct value is actually used by the coordinator
def test_mark_segment_size_communicated_correctly(start_cluster):
table_name = "t"
_create_tables(table_name)
@pytest.mark.parametrize("local_plan", [0, 1])
@pytest.mark.parametrize("index_analysis_only_on_coordinator", [0, 1])
def test_mark_segment_size_communicated_correctly(
start_cluster, local_plan, index_analysis_only_on_coordinator
):
for local_plan in [0, 1]:
query_id = f"query_id_{randint(0, 1000000)}"
nodes[0].query(
f"SELECT sum(value) FROM {table_name}",
settings={
"allow_experimental_parallel_reading_from_replicas": 2,
"max_parallel_replicas": 100,
"cluster_for_parallel_replicas": "parallel_replicas",
"parallel_replicas_mark_segment_size": 0,
"parallel_replicas_local_plan": local_plan,
"query_id": query_id,
},
)
query_id = f"query_id_{str(uuid.uuid4())}"
nodes[0].query(
f"SELECT sum(value) FROM {table_name}",
settings={
"allow_experimental_parallel_reading_from_replicas": 2,
"max_parallel_replicas": 100,
"cluster_for_parallel_replicas": "parallel_replicas",
"parallel_replicas_mark_segment_size": 0,
"parallel_replicas_local_plan": local_plan,
"query_id": query_id,
"parallel_replicas_index_analysis_only_on_coordinator": index_analysis_only_on_coordinator,
},
)
nodes[0].query("SYSTEM FLUSH LOGS")
log_line = nodes[0].grep_in_log(
f"{query_id}.*Reading state is fully initialized"
)
assert re.search(r"mark_segment_size: (\d+)", log_line).group(1) == "16384"
nodes[0].query("SYSTEM FLUSH LOGS")
log_line = nodes[0].grep_in_log(f"{query_id}.*Reading state is fully initialized")
assert re.search(r"mark_segment_size: (\d+)", log_line).group(1) == "16384"

View File

@@ -1,5 +1,5 @@
DROP TABLE IF EXISTS test;
CREATE TABLE test (s String) ENGINE = MergeTree ORDER BY s SETTINGS index_granularity = 1;
CREATE TABLE test (s String) ENGINE = MergeTree ORDER BY s SETTINGS index_granularity = 1, use_primary_key_cache = 0;
SET optimize_trivial_insert_select = 1;
INSERT INTO test SELECT randomString(1000) FROM numbers(100000);

View File

@@ -1,8 +1,8 @@
DROP TABLE IF EXISTS test;
DROP TABLE IF EXISTS test2;
CREATE TABLE test (s String) ENGINE = MergeTree ORDER BY s SETTINGS index_granularity = 1;
CREATE TABLE test2 (s String) ENGINE = MergeTree ORDER BY s SETTINGS index_granularity = 1;
CREATE TABLE test (s String) ENGINE = MergeTree ORDER BY s SETTINGS index_granularity = 1, use_primary_key_cache = 0;
CREATE TABLE test2 (s String) ENGINE = MergeTree ORDER BY s SETTINGS index_granularity = 1, use_primary_key_cache = 0;
INSERT INTO test SELECT randomString(1000) FROM numbers(100000);
SELECT round(primary_key_bytes_in_memory, -7), round(primary_key_bytes_in_memory_allocated, -7) FROM system.parts WHERE database = currentDatabase() AND table IN ('test', 'test2');

View File

@@ -2,8 +2,8 @@
DROP TABLE IF EXISTS test;
DROP TABLE IF EXISTS test2;
CREATE TABLE test (s String) ENGINE = MergeTree ORDER BY s SETTINGS index_granularity = 1;
CREATE TABLE test2 (s String) ENGINE = MergeTree ORDER BY s SETTINGS index_granularity = 1;
CREATE TABLE test (s String) ENGINE = MergeTree ORDER BY s SETTINGS index_granularity = 1, use_primary_key_cache = 0;
CREATE TABLE test2 (s String) ENGINE = MergeTree ORDER BY s SETTINGS index_granularity = 1, use_primary_key_cache = 0;
INSERT INTO test SELECT randomString(1000) FROM numbers(100000);
INSERT INTO test2 SELECT randomString(1000) FROM numbers(100000);

View File

@@ -9,7 +9,7 @@ $CLICKHOUSE_CLIENT "
CREATE TABLE t_unload_primary_key (a UInt64, b UInt64)
ENGINE = MergeTree ORDER BY a
SETTINGS old_parts_lifetime = 10000;
SETTINGS old_parts_lifetime = 10000, use_primary_key_cache = 0;
INSERT INTO t_unload_primary_key VALUES (1, 1);

View File

@@ -0,0 +1 @@
0 0

View File

@@ -0,0 +1,15 @@
DROP TABLE IF EXISTS t_index_empty_part;
CREATE TABLE t_index_empty_part (c0 Int, c1 Int)
ENGINE = MergeTree() PRIMARY KEY (c0, c1)
SETTINGS primary_key_lazy_load = 0, remove_empty_parts = 0;
INSERT INTO TABLE t_index_empty_part (c0, c1) VALUES (1, 1);
TRUNCATE t_index_empty_part;
DETACH TABLE t_index_empty_part;
ATTACH TABLE t_index_empty_part;
SELECT rows, primary_key_bytes_in_memory FROM system.parts WHERE database = currentDatabase() AND table = 't_index_empty_part';
DROP TABLE t_index_empty_part;

View File

@@ -2,7 +2,7 @@ DROP TABLE IF EXISTS t_index_lazy_load;
CREATE TABLE t_index_lazy_load (a UInt64)
ENGINE = MergeTree ORDER BY a
SETTINGS index_granularity = 4, index_granularity_bytes = '10M', primary_key_lazy_load = 1;
SETTINGS index_granularity = 4, index_granularity_bytes = '10M', primary_key_lazy_load = 1, use_primary_key_cache = 0;
INSERT INTO t_index_lazy_load SELECT number FROM numbers(15);