diff --git a/README.md b/README.md
index 2292966bebd..1b22826e13f 100644
--- a/README.md
+++ b/README.md
@@ -43,8 +43,6 @@ Keep an eye out for upcoming meetups and events around the world. Somewhere else
 
 Upcoming meetups
 
-* [Ghent Meetup](https://www.meetup.com/clickhouse-belgium-user-group/events/303049405/) - November 19
-* [Paris Meetup](https://www.meetup.com/clickhouse-france-user-group/events/303096434) - November 26
 * [Amsterdam Meetup](https://www.meetup.com/clickhouse-netherlands-user-group/events/303638814) - December 3
 * [Stockholm Meetup](https://www.meetup.com/clickhouse-stockholm-user-group/events/304382411) - December 9
 * [New York Meetup](https://www.meetup.com/clickhouse-new-york-user-group/events/304268174) - December 9
@@ -54,6 +52,8 @@ Upcoming meetups
 
 Recently completed meetups
 
+* [Paris Meetup](https://www.meetup.com/clickhouse-france-user-group/events/303096434) - November 26
+* [Ghent Meetup](https://www.meetup.com/clickhouse-belgium-user-group/events/303049405/) - November 19
 * [Barcelona Meetup](https://www.meetup.com/clickhouse-spain-user-group/events/303096876/) - November 12
 * [Madrid Meetup](https://www.meetup.com/clickhouse-spain-user-group/events/303096564/) - October 22
 * [Singapore Meetup](https://www.meetup.com/clickhouse-singapore-meetup-group/events/303212064/) - October 3
diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp
index d61d0fca544..cf9b0e48892 100644
--- a/src/Core/Settings.cpp
+++ b/src/Core/Settings.cpp
@@ -5652,6 +5652,9 @@ Parts virtually divided into segments to be distributed between replicas for par
 )", BETA) \
     DECLARE(Bool, parallel_replicas_local_plan, true, R"(
 Build local plan for local replica
+)", BETA) \
+    DECLARE(Bool, parallel_replicas_index_analysis_only_on_coordinator, true, R"(
+Index analysis is done only on the replica-coordinator and skipped on other replicas. Effective only when parallel_replicas_local_plan is enabled
 )", BETA) \
     \
     DECLARE(Bool, allow_experimental_analyzer, true, R"(
diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp
index 4edbd63cfb1..f951c5b0ed9 100644
--- a/src/Core/SettingsChangesHistory.cpp
+++ b/src/Core/SettingsChangesHistory.cpp
@@ -61,6 +61,7 @@ static std::initializer_list
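
The new setting is applied per query like any other query-level setting. A minimal SQL sketch of how it might be used, mirroring the settings exercised by the integration test later in this diff (the table name is an assumption, not part of this diff):

-- Hypothetical usage: confine index analysis to the coordinator in a
-- parallel-replicas query. Requires parallel_replicas_local_plan.
SELECT sum(value) FROM example_table
SETTINGS
    allow_experimental_parallel_reading_from_replicas = 2,
    max_parallel_replicas = 100,
    cluster_for_parallel_replicas = 'parallel_replicas',
    parallel_replicas_local_plan = 1,
    parallel_replicas_index_analysis_only_on_coordinator = 1;
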
diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/src/Storages/MergeTree/IMergeTreeDataPart.cpp
--- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp
+++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp
@@ ... @@ void IMergeTreeDataPart::optimizeIndexColumns(size_t marks_count, Columns & index_columns) const
 {
+    if (marks_count == 0)
+    {
+        chassert(isEmpty());
+        return;
+    }
+
     size_t key_size = index_columns.size();
     Float64 ratio_to_drop_suffix_columns = (*storage.getSettings())[MergeTreeSetting::primary_key_ratio_of_unique_prefix_values_to_skip_suffix_columns];
 
     /// Cut useless suffix columns, if necessary.
     if (key_size > 1 && ratio_to_drop_suffix_columns > 0 && ratio_to_drop_suffix_columns < 1)
     {
-        chassert(marks_count > 0);
         for (size_t j = 0; j < key_size - 1; ++j)
         {
             size_t num_changes = 0;
diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
index 56b9eaff261..2934dc1c91c 100644
--- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
+++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
@@ -80,6 +80,8 @@ namespace Setting
     extern const SettingsUInt64 parallel_replica_offset;
     extern const SettingsUInt64 parallel_replicas_count;
     extern const SettingsParallelReplicasMode parallel_replicas_mode;
+    extern const SettingsBool parallel_replicas_local_plan;
+    extern const SettingsBool parallel_replicas_index_analysis_only_on_coordinator;
 }
 
 namespace MergeTreeSetting
@@ -631,10 +633,23 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
     bool use_skip_indexes,
     bool find_exact_ranges)
 {
-    RangesInDataParts parts_with_ranges;
-    parts_with_ranges.resize(parts.size());
     const Settings & settings = context->getSettingsRef();
 
+    if (context->canUseParallelReplicasOnFollower() && settings[Setting::parallel_replicas_local_plan]
+        && settings[Setting::parallel_replicas_index_analysis_only_on_coordinator])
+    {
+        // Skip index analysis and return parts with all marks.
+        // The coordinator will choose ranges to read for workers based on index analysis on its side.
+        RangesInDataParts parts_with_ranges;
+        parts_with_ranges.reserve(parts.size());
+        for (size_t part_index = 0; part_index < parts.size(); ++part_index)
+        {
+            const auto & part = parts[part_index];
+            parts_with_ranges.emplace_back(part, part_index, MarkRanges{{0, part->getMarksCount()}});
+        }
+        return parts_with_ranges;
+    }
+
     if (use_skip_indexes && settings[Setting::force_data_skipping_indices].changed)
     {
         const auto & indices_str = settings[Setting::force_data_skipping_indices].toString();
@@ -673,6 +688,8 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
     std::atomic<size_t> sum_marks_pk = 0;
     std::atomic<size_t> sum_parts_pk = 0;
 
+    RangesInDataParts parts_with_ranges(parts.size());
+
     /// Let's find what range to read from each part.
     {
         auto mark_cache = context->getIndexMarkCache();
diff --git a/src/Storages/MergeTree/MergedBlockOutputStream.cpp b/src/Storages/MergeTree/MergedBlockOutputStream.cpp
index 32a8fb0c5f6..f965857a61d 100644
--- a/src/Storages/MergeTree/MergedBlockOutputStream.cpp
+++ b/src/Storages/MergeTree/MergedBlockOutputStream.cpp
@@ -270,6 +270,7 @@ MergedBlockOutputStream::Finalizer MergedBlockOutputStream::finalizePartAsync(
         new_part->index_granularity = std::move(new_index_granularity);
     }
 
+    /// It's important to set index after index granularity.
     if (auto computed_index = writer->releaseIndexColumns())
         new_part->setIndex(std::move(*computed_index));
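
The MergeTreeDataSelectExecutor change above is the heart of the feature: a follower replica that can defer to the coordinator returns every part with its full mark range instead of running primary-key and skip-index analysis itself. One way the effect might be observed, sketched under the assumption that the standard SelectedMarks profile event and system.query_log layout apply:

-- Hypothetical check: compare marks selected per replica for one query,
-- with the setting on vs. off (initial_query_id is the coordinator's id).
SYSTEM FLUSH LOGS;
SELECT hostName(), sum(ProfileEvents['SelectedMarks'])
FROM clusterAllReplicas('parallel_replicas', system.query_log)
WHERE initial_query_id = '<your-query-id>' AND type = 'QueryFinish'
GROUP BY hostName();
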
diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp
index 8c8c07fa266..e54511166a2 100644
--- a/src/Storages/MergeTree/MutateTask.cpp
+++ b/src/Storages/MergeTree/MutateTask.cpp
@@ -994,6 +994,7 @@ void finalizeMutatedPart(
         new_data_part->index_granularity = std::move(new_index_granularity);
     }
 
+    /// It's important to set index after index granularity.
     if (!new_data_part->storage.getPrimaryIndexCache())
         new_data_part->setIndex(*source_part->getIndex());
diff --git a/tests/ci/run_check.py b/tests/ci/run_check.py
index 61c3c0c4ec4..ee92f0714b8 100644
--- a/tests/ci/run_check.py
+++ b/tests/ci/run_check.py
@@ -42,6 +42,7 @@ TRUSTED_CONTRIBUTORS = {
         "tsolodov",  # ClickHouse, Inc
         "justindeguzman",  # ClickHouse, Inc
         "XuJia0210",  # ClickHouse, Inc
+        "nauu",  # ClickHouse, Inc
     ]
 }
diff --git a/tests/clickhouse-test b/tests/clickhouse-test
index 274232f0985..aad3e619e81 100755
--- a/tests/clickhouse-test
+++ b/tests/clickhouse-test
@@ -984,6 +984,8 @@ class MergeTreeSettingsRandomizer:
         "prewarm_mark_cache": lambda: random.randint(0, 1),
         "use_const_adaptive_granularity": lambda: random.randint(0, 1),
         "enable_index_granularity_compression": lambda: random.randint(0, 1),
+        "use_primary_key_cache": lambda: random.randint(0, 1),
+        "prewarm_primary_key_cache": lambda: random.randint(0, 1),
     }
 
     @staticmethod
diff --git a/tests/docker_scripts/stress_tests.lib b/tests/docker_scripts/stress_tests.lib
index 5c346a2d17f..77865436f35 100644
--- a/tests/docker_scripts/stress_tests.lib
+++ b/tests/docker_scripts/stress_tests.lib
@@ -184,6 +184,9 @@ function stop_server()
     # Preserve the pid, since the server can hung after the PID will be deleted.
     pid="$(cat /var/run/clickhouse-server/clickhouse-server.pid)"
 
+    # FIXME: workaround for slow shutdown of Distributed tables (due to sequential tables shutdown)
+    # Refs: https://github.com/ClickHouse/ClickHouse/issues/72557
+    clickhouse client -q "SYSTEM STOP DISTRIBUTED SENDS" ||:
     clickhouse stop --max-tries "$max_tries" --do-not-kill && return
 
     if [ "$check_hang" == true ]
diff --git a/tests/integration/test_parallel_replicas_all_marks_read/test.py b/tests/integration/test_parallel_replicas_all_marks_read/test.py
index 593b98126df..c93e8c7c09b 100644
--- a/tests/integration/test_parallel_replicas_all_marks_read/test.py
+++ b/tests/integration/test_parallel_replicas_all_marks_read/test.py
@@ -71,6 +71,7 @@ def _get_result_with_parallel_replicas(
             "cluster_for_parallel_replicas": f"{cluster_name}",
             "parallel_replicas_mark_segment_size": parallel_replicas_mark_segment_size,
             "query_id": query_id,
+            "parallel_replicas_index_analysis_only_on_coordinator": False,
        },
    )
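
As the hunk above shows, a test that asserts on per-replica mark pruning now has to opt out explicitly, since followers would otherwise report all marks as selected. The equivalent opt-out in plain SQL (the table name is assumed):

-- Restore per-replica index analysis for a single query.
SELECT count() FROM example_table
SETTINGS parallel_replicas_index_analysis_only_on_coordinator = 0;
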
diff --git a/tests/integration/test_parallel_replicas_protocol/test.py b/tests/integration/test_parallel_replicas_protocol/test.py
index 2ed39a3273f..c9a8ffa287b 100644
--- a/tests/integration/test_parallel_replicas_protocol/test.py
+++ b/tests/integration/test_parallel_replicas_protocol/test.py
@@ -1,4 +1,5 @@
 import re
+import uuid
 from random import randint
 
 import pytest
@@ -18,15 +19,6 @@ nodes = [
 ]
 
 
-@pytest.fixture(scope="module", autouse=True)
-def start_cluster():
-    try:
-        cluster.start()
-        yield cluster
-    finally:
-        cluster.shutdown()
-
-
 def _create_tables(table_name):
     nodes[0].query(
         f"DROP TABLE IF EXISTS {table_name} ON CLUSTER 'parallel_replicas'",
@@ -48,28 +40,41 @@ def _create_tables(table_name):
     nodes[0].query(f"SYSTEM SYNC REPLICA ON CLUSTER 'parallel_replicas' {table_name}")
 
 
+table_name = "t"
+
+
+@pytest.fixture(scope="module", autouse=True)
+def start_cluster():
+    try:
+        cluster.start()
+        _create_tables(table_name)
+        yield cluster
+    finally:
+        cluster.shutdown()
+
+
 # now mark_segment_size is part of the protocol and is communicated to the initiator.
 # let's check that the correct value is actually used by the coordinator
-def test_mark_segment_size_communicated_correctly(start_cluster):
-    table_name = "t"
-    _create_tables(table_name)
+@pytest.mark.parametrize("local_plan", [0, 1])
+@pytest.mark.parametrize("index_analysis_only_on_coordinator", [0, 1])
+def test_mark_segment_size_communicated_correctly(
+    start_cluster, local_plan, index_analysis_only_on_coordinator
+):
-    for local_plan in [0, 1]:
-        query_id = f"query_id_{randint(0, 1000000)}"
-        nodes[0].query(
-            f"SELECT sum(value) FROM {table_name}",
-            settings={
-                "allow_experimental_parallel_reading_from_replicas": 2,
-                "max_parallel_replicas": 100,
-                "cluster_for_parallel_replicas": "parallel_replicas",
-                "parallel_replicas_mark_segment_size": 0,
-                "parallel_replicas_local_plan": local_plan,
-                "query_id": query_id,
-            },
-        )
+    query_id = f"query_id_{str(uuid.uuid4())}"
+    nodes[0].query(
+        f"SELECT sum(value) FROM {table_name}",
+        settings={
+            "allow_experimental_parallel_reading_from_replicas": 2,
+            "max_parallel_replicas": 100,
+            "cluster_for_parallel_replicas": "parallel_replicas",
+            "parallel_replicas_mark_segment_size": 0,
+            "parallel_replicas_local_plan": local_plan,
+            "query_id": query_id,
+            "parallel_replicas_index_analysis_only_on_coordinator": index_analysis_only_on_coordinator,
+        },
+    )
 
-    nodes[0].query("SYSTEM FLUSH LOGS")
-    log_line = nodes[0].grep_in_log(
-        f"{query_id}.*Reading state is fully initialized"
-    )
-    assert re.search(r"mark_segment_size: (\d+)", log_line).group(1) == "16384"
+    nodes[0].query("SYSTEM FLUSH LOGS")
+    log_line = nodes[0].grep_in_log(f"{query_id}.*Reading state is fully initialized")
+    assert re.search(r"mark_segment_size: (\d+)", log_line).group(1) == "16384"
diff --git a/tests/queries/0_stateless/02993_lazy_index_loading.sql b/tests/queries/0_stateless/02993_lazy_index_loading.sql
index 5d4233e7d5e..df1912483d2 100644
--- a/tests/queries/0_stateless/02993_lazy_index_loading.sql
+++ b/tests/queries/0_stateless/02993_lazy_index_loading.sql
@@ -1,5 +1,5 @@
 DROP TABLE IF EXISTS test;
-CREATE TABLE test (s String) ENGINE = MergeTree ORDER BY s SETTINGS index_granularity = 1;
+CREATE TABLE test (s String) ENGINE = MergeTree ORDER BY s SETTINGS index_granularity = 1, use_primary_key_cache = 0;
 
 SET optimize_trivial_insert_select = 1;
 INSERT INTO test SELECT randomString(1000) FROM numbers(100000);
diff --git a/tests/queries/0_stateless/03127_system_unload_primary_key_table.sql b/tests/queries/0_stateless/03127_system_unload_primary_key_table.sql
index 414e661c5ba..088a56910f1 100644
--- a/tests/queries/0_stateless/03127_system_unload_primary_key_table.sql
+++ b/tests/queries/0_stateless/03127_system_unload_primary_key_table.sql
@@ -1,8 +1,8 @@
 DROP TABLE IF EXISTS test;
 DROP TABLE IF EXISTS test2;
 
-CREATE TABLE test (s String) ENGINE = MergeTree ORDER BY s SETTINGS index_granularity = 1;
-CREATE TABLE test2 (s String) ENGINE = MergeTree ORDER BY s SETTINGS index_granularity = 1;
+CREATE TABLE test (s String) ENGINE = MergeTree ORDER BY s SETTINGS index_granularity = 1, use_primary_key_cache = 0;
+CREATE TABLE test2 (s String) ENGINE = MergeTree ORDER BY s SETTINGS index_granularity = 1, use_primary_key_cache = 0;
 
 INSERT INTO test SELECT randomString(1000) FROM numbers(100000);
 SELECT round(primary_key_bytes_in_memory, -7), round(primary_key_bytes_in_memory_allocated, -7) FROM system.parts WHERE database = currentDatabase() AND table IN ('test', 'test2');
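
These stateless tests assert on primary_key_bytes_in_memory in system.parts, so they pin the new table setting off; presumably with the cache enabled the index is accounted in the primary key cache rather than in the part, which would break the expected numbers. The flip side, which the test randomizer above now exercises (values illustrative, table name assumed):

-- Assumed sketch: opt a table into the primary key cache and prewarming.
CREATE TABLE t_pk_cache (s String) ENGINE = MergeTree ORDER BY s
SETTINGS use_primary_key_cache = 1, prewarm_primary_key_cache = 1;
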
diff --git a/tests/queries/0_stateless/03128_system_unload_primary_key.sql b/tests/queries/0_stateless/03128_system_unload_primary_key.sql
index 907afb331c5..521ccfb131d 100644
--- a/tests/queries/0_stateless/03128_system_unload_primary_key.sql
+++ b/tests/queries/0_stateless/03128_system_unload_primary_key.sql
@@ -2,8 +2,8 @@
 DROP TABLE IF EXISTS test;
 DROP TABLE IF EXISTS test2;
 
-CREATE TABLE test (s String) ENGINE = MergeTree ORDER BY s SETTINGS index_granularity = 1;
-CREATE TABLE test2 (s String) ENGINE = MergeTree ORDER BY s SETTINGS index_granularity = 1;
+CREATE TABLE test (s String) ENGINE = MergeTree ORDER BY s SETTINGS index_granularity = 1, use_primary_key_cache = 0;
+CREATE TABLE test2 (s String) ENGINE = MergeTree ORDER BY s SETTINGS index_granularity = 1, use_primary_key_cache = 0;
 
 INSERT INTO test SELECT randomString(1000) FROM numbers(100000);
 INSERT INTO test2 SELECT randomString(1000) FROM numbers(100000);
diff --git a/tests/queries/0_stateless/03198_unload_primary_key_outdated.sh b/tests/queries/0_stateless/03198_unload_primary_key_outdated.sh
index c759cc34425..f43fd6bc310 100755
--- a/tests/queries/0_stateless/03198_unload_primary_key_outdated.sh
+++ b/tests/queries/0_stateless/03198_unload_primary_key_outdated.sh
@@ -9,7 +9,7 @@ $CLICKHOUSE_CLIENT "
     CREATE TABLE t_unload_primary_key (a UInt64, b UInt64)
     ENGINE = MergeTree ORDER BY a
-    SETTINGS old_parts_lifetime = 10000;
+    SETTINGS old_parts_lifetime = 10000, use_primary_key_cache = 0;
 
     INSERT INTO t_unload_primary_key VALUES (1, 1);
diff --git a/tests/queries/0_stateless/03276_index_empty_part.reference b/tests/queries/0_stateless/03276_index_empty_part.reference
new file mode 100644
index 00000000000..d05b1f927f4
--- /dev/null
+++ b/tests/queries/0_stateless/03276_index_empty_part.reference
@@ -0,0 +1 @@
+0 0
diff --git a/tests/queries/0_stateless/03276_index_empty_part.sql b/tests/queries/0_stateless/03276_index_empty_part.sql
new file mode 100644
index 00000000000..19d160c5cdf
--- /dev/null
+++ b/tests/queries/0_stateless/03276_index_empty_part.sql
@@ -0,0 +1,15 @@
+DROP TABLE IF EXISTS t_index_empty_part;
+
+CREATE TABLE t_index_empty_part (c0 Int, c1 Int)
+ENGINE = MergeTree() PRIMARY KEY (c0, c1)
+SETTINGS primary_key_lazy_load = 0, remove_empty_parts = 0;
+
+INSERT INTO TABLE t_index_empty_part (c0, c1) VALUES (1, 1);
+
+TRUNCATE t_index_empty_part;
+DETACH TABLE t_index_empty_part;
+ATTACH TABLE t_index_empty_part;
+
+SELECT rows, primary_key_bytes_in_memory FROM system.parts WHERE database = currentDatabase() AND table = 't_index_empty_part';
+
+DROP TABLE t_index_empty_part;
diff --git a/tests/queries/0_stateless/03276_merge_tree_index_lazy_load.sql b/tests/queries/0_stateless/03276_merge_tree_index_lazy_load.sql
index 4e3137c5514..70b4ee282be 100644
--- a/tests/queries/0_stateless/03276_merge_tree_index_lazy_load.sql
+++ b/tests/queries/0_stateless/03276_merge_tree_index_lazy_load.sql
@@ -2,7 +2,7 @@ DROP TABLE IF EXISTS t_index_lazy_load;
 
 CREATE TABLE t_index_lazy_load (a UInt64)
 ENGINE = MergeTree ORDER BY a
-SETTINGS index_granularity = 4, index_granularity_bytes = '10M', primary_key_lazy_load = 1;
+SETTINGS index_granularity = 4, index_granularity_bytes = '10M', primary_key_lazy_load = 1, use_primary_key_cache = 0;
 
 INSERT INTO t_index_lazy_load SELECT number FROM numbers(15);
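
The new 03276_index_empty_part test ties back to the optimizeIndexColumns guard: it keeps the empty part alive (remove_empty_parts = 0) and forces eager index loading on ATTACH (primary_key_lazy_load = 0), a sequence that previously tripped the removed marks_count > 0 assertion. A follow-up query one might run to inspect the surviving part (the column set is standard system.parts):

-- The empty part should report zero rows and no primary key memory,
-- matching the reference output "0 0".
SELECT rows, primary_key_bytes_in_memory
FROM system.parts
WHERE database = currentDatabase() AND table = 't_index_empty_part' AND active;
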