disable vertical merges with cleanup

This commit is contained in:
Alexander Tokmakov 2023-12-27 19:28:50 +01:00
parent f924848347
commit f5bcfaffa5
8 changed files with 73 additions and 22 deletions

View File

@ -41,6 +41,7 @@ namespace ErrorCodes
extern const int ABORTED;
extern const int DIRECTORY_ALREADY_EXISTS;
extern const int LOGICAL_ERROR;
extern const int SUPPORT_IS_DISABLED;
}
@ -1005,10 +1006,13 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream()
break;
case MergeTreeData::MergingParams::Replacing:
if (global_ctx->cleanup && !data_settings->allow_experimental_replacing_merge_with_cleanup)
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Experimental merges with CLEANUP are not allowed");
merged_transform = std::make_shared<ReplacingSortedTransform>(
header, pipes.size(), sort_description, ctx->merging_params.is_deleted_column, ctx->merging_params.version_column,
merge_block_size_rows, merge_block_size_bytes, ctx->rows_sources_write_buf.get(), ctx->blocks_are_granules_size,
(data_settings->clean_deleted_rows != CleanDeletedRows::Never) || global_ctx->cleanup);
global_ctx->cleanup);
break;
case MergeTreeData::MergingParams::Graphite:
@ -1086,6 +1090,8 @@ MergeAlgorithm MergeTask::ExecuteAndFinalizeHorizontalPart::chooseMergeAlgorithm
return MergeAlgorithm::Horizontal;
if (global_ctx->future_part->part_format.storage_type != MergeTreeDataPartStorageType::Full)
return MergeAlgorithm::Horizontal;
if (global_ctx->cleanup)
return MergeAlgorithm::Horizontal;
if (!data_settings->allow_vertical_merges_from_compact_to_wide_parts)
{

View File

@ -74,7 +74,6 @@ struct Settings;
M(Bool, min_age_to_force_merge_on_partition_only, false, "Whether min_age_to_force_merge_seconds should be applied only on the entire partition and not on subset.", false) \
M(UInt64, number_of_free_entries_in_pool_to_execute_optimize_entire_partition, 25, "When there is less than specified number of free entries in pool, do not try to execute optimize entire partition with a merge (this merge is created when set min_age_to_force_merge_seconds > 0 and min_age_to_force_merge_on_partition_only = true). This is to leave free threads for regular merges and avoid \"Too many parts\"", 0) \
M(Bool, remove_rolled_back_parts_immediately, 1, "Setting for an incomplete experimental feature.", 0) \
M(CleanDeletedRows, clean_deleted_rows, CleanDeletedRows::Never, "Is the Replicated Merge cleanup has to be done automatically at each merge or manually (possible values are 'Always'/'Never' (default))", 0) \
M(UInt64, replicated_max_mutations_in_one_entry, 10000, "Max number of mutation commands that can be merged together and executed in one MUTATE_PART entry (0 means unlimited)", 0) \
M(UInt64, number_of_mutations_to_delay, 500, "If table has at least that many unfinished mutations, artificially slow down mutations of table. Disabled if set to 0", 0) \
M(UInt64, number_of_mutations_to_throw, 1000, "If table has at least that many unfinished mutations, throw 'Too many mutations' exception. Disabled if set to 0", 0) \
@ -193,6 +192,7 @@ struct Settings;
M(Bool, remote_fs_zero_copy_path_compatible_mode, false, "Run zero-copy in compatible mode during conversion process.", 0) \
M(Bool, cache_populated_by_fetch, false, "Only available in ClickHouse Cloud", 0) \
M(Bool, allow_experimental_block_number_column, false, "Enable persisting column _block_number for each row.", 0) \
M(Bool, allow_experimental_replacing_merge_with_cleanup, false, "Allow experimental CLEANUP merges for ReplacingMergeTree with is_deleted column.", 0) \
\
/** Compress marks and primary key. */ \
M(Bool, compress_marks, true, "Marks support compression, reduce mark file size and speed up network transmission.", 0) \
@ -233,6 +233,7 @@ struct Settings;
MAKE_OBSOLETE_MERGE_TREE_SETTING(M, Seconds, replicated_fetches_http_send_timeout, 0) \
MAKE_OBSOLETE_MERGE_TREE_SETTING(M, Seconds, replicated_fetches_http_receive_timeout, 0) \
MAKE_OBSOLETE_MERGE_TREE_SETTING(M, UInt64, replicated_max_parallel_fetches_for_host, DEFAULT_COUNT_OF_HTTP_CONNECTIONS_PER_ENDPOINT) \
MAKE_OBSOLETE_MERGE_TREE_SETTING(M, CleanDeletedRows, clean_deleted_rows, CleanDeletedRows::Never) \
/// Settings that should not change after the creation of a table.
/// NOLINTNEXTLINE

View File

@ -62,6 +62,7 @@ namespace ErrorCodes
extern const int UNKNOWN_POLICY;
extern const int NO_SUCH_DATA_PART;
extern const int ABORTED;
extern const int SUPPORT_IS_DISABLED;
}
namespace ActionLocks
@ -1530,6 +1531,9 @@ bool StorageMergeTree::optimize(
throw Exception(ErrorCodes::CANNOT_ASSIGN_OPTIMIZE, message, disable_reason);
}
if (cleanup && !getSettings()->allow_experimental_replacing_merge_with_cleanup)
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Experimental merges with CLEANUP are not allowed");
DataPartsVector data_parts = getVisibleDataPartsVector(local_context);
std::unordered_set<String> partition_ids;

View File

@ -3643,7 +3643,6 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
const auto storage_settings_ptr = getSettings();
const bool deduplicate = false; /// TODO: read deduplicate option from table config
const Names deduplicate_by_columns = {};
const bool cleanup = (storage_settings_ptr->clean_deleted_rows != CleanDeletedRows::Never);
CreateMergeEntryResult create_result = CreateMergeEntryResult::Other;
enum class AttemptStatus
@ -3727,7 +3726,7 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
future_merged_part->part_format,
deduplicate,
deduplicate_by_columns,
cleanup,
/*cleanup*/ false,
nullptr,
merge_pred->getVersion(),
future_merged_part->merge_type);
@ -5637,7 +5636,11 @@ bool StorageReplicatedMergeTree::optimize(
throw Exception(ErrorCodes::NOT_A_LEADER, "OPTIMIZE cannot be done on this replica because it is not a leader");
if (cleanup)
{
if (!getSettings()->allow_experimental_replacing_merge_with_cleanup)
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Experimental merges with CLEANUP are not allowed");
LOG_DEBUG(log, "Cleanup the ReplicatedMergeTree.");
}
auto handle_noop = [&]<typename... Args>(FormatStringHelper<Args...> fmt_string, Args && ...args)
{

View File

@ -3,7 +3,7 @@ set optimize_on_insert = 0;
drop table if exists tab_00577;
create table tab_00577 (date Date, version UInt64, val UInt64) engine = ReplacingMergeTree(version) partition by date order by date settings enable_vertical_merge_algorithm = 1,
vertical_merge_algorithm_min_rows_to_activate = 0, vertical_merge_algorithm_min_columns_to_activate = 0, min_rows_for_wide_part = 0,
min_bytes_for_wide_part = 0;
min_bytes_for_wide_part = 0, allow_experimental_replacing_merge_with_cleanup=1;
insert into tab_00577 values ('2018-01-01', 2, 2), ('2018-01-01', 1, 1);
insert into tab_00577 values ('2018-01-01', 0, 0);
select * from tab_00577 order by version;
@ -16,7 +16,7 @@ DROP TABLE IF EXISTS testCleanupR1;
CREATE TABLE testCleanupR1 (uid String, version UInt32, is_deleted UInt8)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/{database}/tables/test_cleanup/', 'r1', version, is_deleted)
ORDER BY uid SETTINGS enable_vertical_merge_algorithm = 1, vertical_merge_algorithm_min_rows_to_activate = 0, vertical_merge_algorithm_min_columns_to_activate = 0, min_rows_for_wide_part = 0,
min_bytes_for_wide_part = 0;
min_bytes_for_wide_part = 0, allow_experimental_replacing_merge_with_cleanup=1;
INSERT INTO testCleanupR1 (*) VALUES ('d1', 1, 0),('d2', 1, 0),('d3', 1, 0),('d4', 1, 0);
INSERT INTO testCleanupR1 (*) VALUES ('d3', 2, 1);
INSERT INTO testCleanupR1 (*) VALUES ('d1', 2, 1);

View File

@ -91,6 +91,28 @@ d4 1 0
== (Replicas) Test settings ==
c2 1 0
c4 1 0
no cleanup 1 d1 5 0
no cleanup 1 d2 1 0
no cleanup 1 d3 1 0
no cleanup 1 d4 3 0
no cleanup 1 d5 1 0
no cleanup 2 d1 5 0
no cleanup 2 d2 1 0
no cleanup 2 d3 1 0
no cleanup 2 d4 3 0
no cleanup 2 d5 1 0
no cleanup 2 d6 2 1
no cleanup 3 d1 5 0
no cleanup 3 d2 1 0
no cleanup 3 d3 1 0
no cleanup 3 d4 3 0
no cleanup 3 d5 1 0
no cleanup 4 d1 5 0
no cleanup 4 d2 1 0
no cleanup 4 d3 1 0
no cleanup 4 d4 3 0
no cleanup 4 d5 1 0
no cleanup 4 d6 2 1
== Check cleanup & settings for other merge trees ==
d1 1 1
d1 1 1

View File

@ -5,7 +5,7 @@ set allow_deprecated_syntax_for_merge_tree=0;
-- Test the behaviour without the is_deleted column
DROP TABLE IF EXISTS test;
CREATE TABLE test (uid String, version UInt32, is_deleted UInt8) ENGINE = ReplacingMergeTree(version) Order by (uid);
CREATE TABLE test (uid String, version UInt32, is_deleted UInt8) ENGINE = ReplacingMergeTree(version) Order by (uid) settings allow_experimental_replacing_merge_with_cleanup=1;
INSERT INTO test (*) VALUES ('d1', 1, 0), ('d2', 1, 0), ('d6', 1, 0), ('d4', 1, 0), ('d6', 2, 1), ('d3', 1, 0), ('d1', 2, 1), ('d5', 1, 0), ('d4', 2, 1), ('d1', 3, 0), ('d1', 4, 1), ('d4', 3, 0), ('d1', 5, 0);
SELECT '== Test SELECT ... FINAL - no is_deleted ==';
select * from test FINAL order by uid;
@ -13,7 +13,7 @@ OPTIMIZE TABLE test FINAL CLEANUP;
select * from test order by uid;
DROP TABLE IF EXISTS test;
CREATE TABLE test (uid String, version UInt32, is_deleted UInt8) ENGINE = ReplacingMergeTree(version) Order by (uid) SETTINGS clean_deleted_rows='Always';
CREATE TABLE test (uid String, version UInt32, is_deleted UInt8) ENGINE = ReplacingMergeTree(version) Order by (uid) SETTINGS clean_deleted_rows='Always', allow_experimental_replacing_merge_with_cleanup=1;
INSERT INTO test (*) VALUES ('d1', 1, 0), ('d2', 1, 0), ('d6', 1, 0), ('d4', 1, 0), ('d6', 2, 1), ('d3', 1, 0), ('d1', 2, 1), ('d5', 1, 0), ('d4', 2, 1), ('d1', 3, 0), ('d1', 4, 1), ('d4', 3, 0), ('d1', 5, 0);
SELECT '== Test SELECT ... FINAL - no is_deleted SETTINGS clean_deleted_rows=Always ==';
select * from test FINAL order by uid;
@ -22,7 +22,7 @@ select * from test order by uid;
-- Test the new behaviour
DROP TABLE IF EXISTS test;
CREATE TABLE test (uid String, version UInt32, is_deleted UInt8) ENGINE = ReplacingMergeTree(version, is_deleted) Order by (uid);
CREATE TABLE test (uid String, version UInt32, is_deleted UInt8) ENGINE = ReplacingMergeTree(version, is_deleted) Order by (uid) settings allow_experimental_replacing_merge_with_cleanup=1;
INSERT INTO test (*) VALUES ('d1', 1, 0), ('d2', 1, 0), ('d6', 1, 0), ('d4', 1, 0), ('d6', 2, 1), ('d3', 1, 0), ('d1', 2, 1), ('d5', 1, 0), ('d4', 2, 1), ('d1', 3, 0), ('d1', 4, 1), ('d4', 3, 0), ('d1', 5, 0);
SELECT '== Test SELECT ... FINAL ==';
select * from test FINAL order by uid;
@ -37,7 +37,7 @@ INSERT INTO test (*) VALUES ('d4', 1, 0), ('d6', 2, 1), ('d3', 1, 0), ('d1', 2,
select * from test FINAL order by uid;
DROP TABLE IF EXISTS test;
CREATE TABLE test (uid String, version UInt32, is_deleted UInt8) ENGINE = ReplacingMergeTree(version, is_deleted) Order by (uid);
CREATE TABLE test (uid String, version UInt32, is_deleted UInt8) ENGINE = ReplacingMergeTree(version, is_deleted) Order by (uid) settings allow_experimental_replacing_merge_with_cleanup=1;
-- Expect d6 to be version=3 is_deleted=false
INSERT INTO test (*) VALUES ('d1', 1, 0), ('d1', 2, 1), ('d1', 3, 0), ('d1', 4, 1), ('d1', 5, 0), ('d2', 1, 0), ('d3', 1, 0), ('d4', 1, 0), ('d5', 1, 0), ('d6', 1, 0), ('d6', 3, 0);
@ -56,7 +56,7 @@ OPTIMIZE TABLE test FINAL CLEANUP;
select * from test order by uid;
DROP TABLE IF EXISTS test;
CREATE TABLE test (uid String, version UInt32, is_deleted UInt8) ENGINE = ReplacingMergeTree(version, is_deleted) Order by (uid) SETTINGS clean_deleted_rows='Always';
CREATE TABLE test (uid String, version UInt32, is_deleted UInt8) ENGINE = ReplacingMergeTree(version, is_deleted) Order by (uid) SETTINGS clean_deleted_rows='Always', allow_experimental_replacing_merge_with_cleanup=1;
SELECT '== Test of the SETTINGS clean_deleted_rows as Always ==';
INSERT INTO test (*) VALUES ('d1', 1, 0), ('d2', 1, 0), ('d6', 1, 0), ('d4', 1, 0), ('d6', 2, 1), ('d3', 1, 0), ('d1', 2, 1), ('d5', 1, 0), ('d4', 2, 1), ('d1', 3, 0), ('d1', 4, 1), ('d4', 3, 0), ('d1', 5, 0);
@ -66,7 +66,7 @@ select * from test order by uid;
OPTIMIZE TABLE test FINAL;
-- d6 has to be removed since we set clean_deleted_rows as 'Always'
select * from test order by uid;
select * from test where is_deleted=0 order by uid;
SELECT '== Test of the SETTINGS clean_deleted_rows as Never ==';
ALTER TABLE test MODIFY SETTING clean_deleted_rows='Never';
@ -80,7 +80,7 @@ DROP TABLE IF EXISTS testCleanupR1;
CREATE TABLE testCleanupR1 (uid String, version UInt32, is_deleted UInt8)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/{database}/tables/test_cleanup/', 'r1', version, is_deleted)
ORDER BY uid;
ORDER BY uid settings allow_experimental_replacing_merge_with_cleanup=1;
INSERT INTO testCleanupR1 (*) VALUES ('d1', 1, 0),('d2', 1, 0),('d3', 1, 0),('d4', 1, 0);
@ -101,7 +101,7 @@ DROP TABLE IF EXISTS testSettingsR1;
CREATE TABLE testSettingsR1 (col1 String, version UInt32, is_deleted UInt8)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/{database}/tables/test_setting/', 'r1', version, is_deleted)
ORDER BY col1
SETTINGS clean_deleted_rows = 'Always';
SETTINGS clean_deleted_rows = 'Always', allow_experimental_replacing_merge_with_cleanup=1;
INSERT INTO testSettingsR1 (*) VALUES ('c1', 1, 1),('c2', 1, 0),('c3', 1, 1),('c4', 1, 0);
SYSTEM SYNC REPLICA testSettingsR1; -- Avoid "Cannot select parts for optimization: Entry for part all_2_2_0 hasn't been read from the replication log yet"
@ -110,13 +110,13 @@ OPTIMIZE TABLE testSettingsR1 FINAL;
-- Only d3 to d5 remain
SELECT '== (Replicas) Test settings ==';
SELECT * FROM testSettingsR1 order by col1;
SELECT * FROM testSettingsR1 where is_deleted=0 order by col1;
------------------------------
-- Check errors
DROP TABLE IF EXISTS test;
CREATE TABLE test (uid String, version UInt32, is_deleted UInt8) ENGINE = ReplacingMergeTree(version, is_deleted) Order by (uid);
CREATE TABLE test (uid String, version UInt32, is_deleted UInt8) ENGINE = ReplacingMergeTree(version, is_deleted) Order by (uid) settings allow_experimental_replacing_merge_with_cleanup=1;
-- is_deleted == 0/1
INSERT INTO test (*) VALUES ('d1', 1, 2); -- { serverError INCORRECT_DATA }
@ -125,35 +125,49 @@ DROP TABLE IF EXISTS test;
-- check is_deleted type
CREATE TABLE test (uid String, version UInt32, is_deleted String) ENGINE = ReplacingMergeTree(version, is_deleted) Order by (uid); -- { serverError BAD_TYPE_OF_FIELD }
CREATE TABLE test (uid String, version UInt32, is_deleted UInt8) ENGINE = ReplacingMergeTree(version, is_deleted) Order by (uid);
INSERT INTO test (*) VALUES ('d1', 1, 0), ('d2', 1, 0), ('d6', 1, 0), ('d4', 1, 0), ('d6', 2, 1), ('d3', 1, 0), ('d1', 2, 1), ('d5', 1, 0), ('d4', 2, 1), ('d1', 3, 0), ('d1', 4, 1), ('d4', 3, 0), ('d1', 5, 0);
select 'no cleanup 1', * from test FINAL order by uid;
OPTIMIZE TABLE test FINAL CLEANUP; -- { serverError SUPPORT_IS_DISABLED }
select 'no cleanup 2', * from test order by uid;
DROP TABLE test;
CREATE TABLE test (uid String, version UInt32, is_deleted UInt8) ENGINE = ReplicatedReplacingMergeTree('/clickhouse/{database}/tables/no_cleanup/', 'r1', version, is_deleted) Order by (uid);
INSERT INTO test (*) VALUES ('d1', 1, 0), ('d2', 1, 0), ('d6', 1, 0), ('d4', 1, 0), ('d6', 2, 1), ('d3', 1, 0), ('d1', 2, 1), ('d5', 1, 0), ('d4', 2, 1), ('d1', 3, 0), ('d1', 4, 1), ('d4', 3, 0), ('d1', 5, 0);
select 'no cleanup 3', * from test FINAL order by uid;
OPTIMIZE TABLE test FINAL CLEANUP; -- { serverError SUPPORT_IS_DISABLED }
select 'no cleanup 4', * from test order by uid;
DROP TABLE test;
-- is_deleted column for other mergeTrees - ErrorCodes::LOGICAL_ERROR)
-- Check clean_deleted_rows='Always' for other MergeTrees
SELECT '== Check cleanup & settings for other merge trees ==';
CREATE TABLE testMT (uid String, version UInt32, is_deleted UInt8) ENGINE = MergeTree() Order by (uid) SETTINGS clean_deleted_rows='Always';
CREATE TABLE testMT (uid String, version UInt32, is_deleted UInt8) ENGINE = MergeTree() Order by (uid) SETTINGS clean_deleted_rows='Always', allow_experimental_replacing_merge_with_cleanup=1;
INSERT INTO testMT (*) VALUES ('d1', 1, 1);
OPTIMIZE TABLE testMT FINAL CLEANUP; -- { serverError CANNOT_ASSIGN_OPTIMIZE }
OPTIMIZE TABLE testMT FINAL;
SELECT * FROM testMT order by uid;
CREATE TABLE testSummingMT (uid String, version UInt32, is_deleted UInt8) ENGINE = SummingMergeTree() Order by (uid) SETTINGS clean_deleted_rows='Always';
CREATE TABLE testSummingMT (uid String, version UInt32, is_deleted UInt8) ENGINE = SummingMergeTree() Order by (uid) SETTINGS clean_deleted_rows='Always', allow_experimental_replacing_merge_with_cleanup=1;
INSERT INTO testSummingMT (*) VALUES ('d1', 1, 1);
OPTIMIZE TABLE testSummingMT FINAL CLEANUP; -- { serverError CANNOT_ASSIGN_OPTIMIZE }
OPTIMIZE TABLE testSummingMT FINAL;
SELECT * FROM testSummingMT order by uid;
CREATE TABLE testAggregatingMT (uid String, version UInt32, is_deleted UInt8) ENGINE = AggregatingMergeTree() Order by (uid) SETTINGS clean_deleted_rows='Always';
CREATE TABLE testAggregatingMT (uid String, version UInt32, is_deleted UInt8) ENGINE = AggregatingMergeTree() Order by (uid) SETTINGS clean_deleted_rows='Always', allow_experimental_replacing_merge_with_cleanup=1;
INSERT INTO testAggregatingMT (*) VALUES ('d1', 1, 1);
OPTIMIZE TABLE testAggregatingMT FINAL CLEANUP; -- { serverError CANNOT_ASSIGN_OPTIMIZE }
OPTIMIZE TABLE testAggregatingMT FINAL;
SELECT * FROM testAggregatingMT order by uid;
CREATE TABLE testCollapsingMT (uid String, version UInt32, is_deleted UInt8, sign Int8) ENGINE = CollapsingMergeTree(sign) Order by (uid) SETTINGS clean_deleted_rows='Always';
CREATE TABLE testCollapsingMT (uid String, version UInt32, is_deleted UInt8, sign Int8) ENGINE = CollapsingMergeTree(sign) Order by (uid) SETTINGS clean_deleted_rows='Always', allow_experimental_replacing_merge_with_cleanup=1;
INSERT INTO testCollapsingMT (*) VALUES ('d1', 1, 1, 1);
OPTIMIZE TABLE testCollapsingMT FINAL CLEANUP; -- { serverError CANNOT_ASSIGN_OPTIMIZE }
OPTIMIZE TABLE testCollapsingMT FINAL;
SELECT * FROM testCollapsingMT order by uid;
CREATE TABLE testVersionedCMT (uid String, version UInt32, is_deleted UInt8, sign Int8) ENGINE = VersionedCollapsingMergeTree(sign, version) Order by (uid) SETTINGS clean_deleted_rows='Always';
CREATE TABLE testVersionedCMT (uid String, version UInt32, is_deleted UInt8, sign Int8) ENGINE = VersionedCollapsingMergeTree(sign, version) Order by (uid) SETTINGS clean_deleted_rows='Always', allow_experimental_replacing_merge_with_cleanup=1;
INSERT INTO testVersionedCMT (*) VALUES ('d1', 1, 1, 1);
OPTIMIZE TABLE testVersionedCMT FINAL CLEANUP; -- { serverError CANNOT_ASSIGN_OPTIMIZE }
OPTIMIZE TABLE testVersionedCMT FINAL;

View File

@ -2,7 +2,8 @@ DROP TABLE IF EXISTS test;
CREATE TABLE test (uid String, version UInt32, is_deleted UInt8) ENGINE = ReplacingMergeTree(version, is_deleted) Order by (uid) SETTINGS vertical_merge_algorithm_min_rows_to_activate = 1,
vertical_merge_algorithm_min_columns_to_activate = 0,
min_rows_for_wide_part = 1,
min_bytes_for_wide_part = 1;
min_bytes_for_wide_part = 1,
allow_experimental_replacing_merge_with_cleanup=1;
-- Expect d6 to be version=3 is_deleted=false
INSERT INTO test (*) VALUES ('d1', 1, 0), ('d1', 2, 1), ('d1', 3, 0), ('d1', 4, 1), ('d1', 5, 0), ('d2', 1, 0), ('d3', 1, 0), ('d4', 1, 0), ('d5', 1, 0), ('d6', 1, 0), ('d6', 3, 0);