add setting to disable vertical merges from compact parts

This commit is contained in:
Anton Popov 2023-02-10 16:33:46 +00:00
parent a252d3a139
commit 1205932a19
4 changed files with 124 additions and 1 deletions

View File

@ -958,6 +958,15 @@ MergeAlgorithm MergeTask::ExecuteAndFinalizeHorizontalPart::chooseMergeAlgorithm
if (global_ctx->future_part->part_format.storage_type != MergeTreeDataPartStorageType::Full)
return MergeAlgorithm::Horizontal;
if (!data_settings->allow_vertical_merges_from_compact_to_wide_parts)
{
for (const auto & part : global_ctx->future_part->parts)
{
if (!isWidePart(part))
return MergeAlgorithm::Horizontal;
}
}
bool is_supported_storage =
ctx->merging_params.mode == MergeTreeData::MergingParams::Ordinary ||
ctx->merging_params.mode == MergeTreeData::MergingParams::Collapsing ||

View File

@ -149,6 +149,7 @@ struct Settings;
M(UInt64, min_marks_to_honor_max_concurrent_queries, 0, "Minimal number of marks to honor the MergeTree-level's max_concurrent_queries (0 - disabled). Queries will still be limited by other max_concurrent_queries settings.", 0) \
M(UInt64, min_bytes_to_rebalance_partition_over_jbod, 0, "Minimal amount of bytes to enable part rebalance over JBOD array (0 - disabled).", 0) \
M(Bool, check_sample_column_is_correct, true, "Check columns or columns by hash for sampling are unsigned integer.", 0) \
M(Bool, allow_vertical_merges_from_compact_to_wide_parts, false, "Allows vertical merges from compact to wide parts. This settings must have the same value on all replicas", 0) \
\
/** Experimental/work in progress feature. Unsafe for production. */ \
M(UInt64, part_moves_between_shards_enable, 0, "Experimental/Incomplete feature to move parts between shards. Does not take into account sharding expressions.", 0) \

View File

@ -0,0 +1,112 @@
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node_old = cluster.add_instance(
"node1",
image="clickhouse/clickhouse-server",
tag="22.8",
stay_alive=True,
with_installed_binary=True,
with_zookeeper=True,
)
node_new = cluster.add_instance(
"node2",
with_zookeeper=True,
stay_alive=True,
)
@pytest.fixture(scope="module")
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_vertical_merges_from_comapact_parts(start_cluster):
for i, node in enumerate([node_old, node_new]):
node.query(
"""
CREATE TABLE t_vertical_merges (id UInt64, v1 UInt64, v2 UInt64)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/0/t_vertical_merges', '{}')
ORDER BY id
SETTINGS
index_granularity = 50,
vertical_merge_algorithm_min_rows_to_activate = 1,
vertical_merge_algorithm_min_columns_to_activate = 1,
min_bytes_for_wide_part = 0,
min_rows_for_wide_part = 100;
""".format(
i
)
)
node_new.query(
"INSERT INTO t_vertical_merges SELECT number, number, number FROM numbers(60)"
)
node_new.query(
"INSERT INTO t_vertical_merges SELECT number * 2, number, number FROM numbers(60)"
)
node_new.query("OPTIMIZE TABLE t_vertical_merges FINAL")
node_old.query("SYSTEM SYNC REPLICA t_vertical_merges")
check_query = """
SELECT merge_algorithm, part_type FROM system.part_log
WHERE event_type = 'MergeParts' AND table = 't_vertical_merges' AND part_name = '{}';
"""
node_new.query("SYSTEM FLUSH LOGS")
node_old.query("SYSTEM FLUSH LOGS")
assert node_new.query(check_query.format("all_0_1_1")) == "Horizontal\tWide\n"
assert node_old.query(check_query.format("all_0_1_1")) == "Horizontal\tWide\n"
node_new.query(
"ALTER TABLE t_vertical_merges MODIFY SETTING allow_vertical_merges_from_compact_to_wide_parts = 1"
)
node_new.query(
"INSERT INTO t_vertical_merges SELECT number * 3, number, number FROM numbers(60)"
)
node_new.query("OPTIMIZE TABLE t_vertical_merges FINAL")
node_old.query("SYSTEM SYNC REPLICA t_vertical_merges")
node_new.query("SYSTEM FLUSH LOGS")
node_old.query("SYSTEM FLUSH LOGS")
assert node_old.contains_in_log(
"CHECKSUM_DOESNT_MATCH"
) or node_new.contains_in_log("CHECKSUM_DOESNT_MATCH")
assert node_new.query(check_query.format("all_0_2_2")) == "Vertical\tWide\n"
assert node_old.query(check_query.format("all_0_2_2")) == "Horizontal\tWide\n"
node_old.restart_with_latest_version()
node_new.restart_clickhouse()
node_old.query(
"ALTER TABLE t_vertical_merges MODIFY SETTING allow_vertical_merges_from_compact_to_wide_parts = 1"
)
node_new.query(
"INSERT INTO t_vertical_merges SELECT number * 4, number, number FROM numbers(60)"
)
node_new.query("OPTIMIZE TABLE t_vertical_merges FINAL")
node_old.query("SYSTEM SYNC REPLICA t_vertical_merges")
node_new.query("SYSTEM FLUSH LOGS")
node_old.query("SYSTEM FLUSH LOGS")
assert not (
node_old.contains_in_log("CHECKSUM_DOESNT_MATCH")
or node_new.contains_in_log("CHECKSUM_DOESNT_MATCH")
)
assert node_new.query(check_query.format("all_0_3_3")) == "Vertical\tWide\n"
assert node_old.query(check_query.format("all_0_3_3")) == "Vertical\tWide\n"

View File

@ -7,7 +7,8 @@ SETTINGS
min_bytes_for_wide_part = 0,
min_rows_for_wide_part = 100,
vertical_merge_algorithm_min_rows_to_activate = 1,
vertical_merge_algorithm_min_columns_to_activate = 1;
vertical_merge_algorithm_min_columns_to_activate = 1,
allow_vertical_merges_from_compact_to_wide_parts = 1;
INSERT INTO t_compact_vertical_merge SELECT number, toString(number), range(number % 10) FROM numbers(40);
INSERT INTO t_compact_vertical_merge SELECT number, toString(number), range(number % 10) FROM numbers(40);