From 1780671443c6863cdb5fbbd230443efe7edbf1b2 Mon Sep 17 00:00:00 2001
From: Nikita Mikhaylov
Date: Fri, 15 Dec 2023 15:23:23 +0100
Subject: [PATCH] ClickHouse Cloud promotion (#57638)

Co-authored-by: Alexander Sapin
---
 docker/test/upgrade/run.sh                    |  2 +
 src/Core/Settings.cpp                         |  3 +-
 src/Core/Settings.h                           | 10 ++++
 src/Storages/MergeTree/ActiveDataPartSet.cpp  | 32 ++++++++++
 src/Storages/MergeTree/ActiveDataPartSet.h    |  7 ++-
 .../MergeTree/BackgroundJobsAssignee.cpp      | 10 ++--
 .../MergeTree/BackgroundJobsAssignee.h        |  4 +-
 .../MergeTree/DataPartStorageOnDiskFull.cpp   | 17 ++++++
 .../MergeTree/DataPartStorageOnDiskFull.h     |  1 +
 src/Storages/MergeTree/IDataPartStorage.h     |  2 +
 src/Storages/MergeTree/IMergeTreeReader.h     |  8 +--
 src/Storages/MergeTree/MergeTask.cpp          |  8 +--
 src/Storages/MergeTree/MergeTask.h            |  7 +++
 src/Storages/MergeTree/MergeTreeData.cpp      | 19 ++++++
 src/Storages/MergeTree/MergeTreeSettings.h    | 20 ++++++-
 src/Storages/MergeTree/MutateTask.cpp         | 58 +++++++++++++------
 src/Storages/StorageMergeTree.cpp             |  9 ++-
 src/Storages/StorageReplicatedMergeTree.cpp   | 39 +++++++++++--
 .../zero_copy_destructive_operations.xml      |  7 +++
 tests/config/install.sh                       |  1 +
 .../configs/config.d/storage_conf.xml         |  4 ++
 .../configs/config.d/s3.xml                   |  3 +
 ...1417_freeze_partition_verbose_zookeeper.sh |  2 +-
 23 files changed, 232 insertions(+), 41 deletions(-)
 create mode 100644 tests/config/config.d/zero_copy_destructive_operations.xml

diff --git a/docker/test/upgrade/run.sh b/docker/test/upgrade/run.sh
index 158ac19229e..f014fce49f6 100644
--- a/docker/test/upgrade/run.sh
+++ b/docker/test/upgrade/run.sh
@@ -77,6 +77,7 @@ remove_keeper_config "create_if_not_exists" "[01]"
 # it contains some new settings, but we can safely remove it
 rm /etc/clickhouse-server/config.d/merge_tree.xml
 rm /etc/clickhouse-server/config.d/enable_wait_for_shutdown_replicated_tables.xml
+rm /etc/clickhouse-server/config.d/zero_copy_destructive_operations.xml
 rm /etc/clickhouse-server/users.d/nonconst_timezone.xml
 rm /etc/clickhouse-server/users.d/s3_cache_new.xml
 rm /etc/clickhouse-server/users.d/replicated_ddl_entry.xml
@@ -115,6 +116,7 @@ sudo chgrp clickhouse /etc/clickhouse-server/config.d/s3_storage_policy_by_defau
 # it contains some new settings, but we can safely remove it
 rm /etc/clickhouse-server/config.d/merge_tree.xml
 rm /etc/clickhouse-server/config.d/enable_wait_for_shutdown_replicated_tables.xml
+rm /etc/clickhouse-server/config.d/zero_copy_destructive_operations.xml
 rm /etc/clickhouse-server/users.d/nonconst_timezone.xml
 rm /etc/clickhouse-server/users.d/s3_cache_new.xml
 rm /etc/clickhouse-server/users.d/replicated_ddl_entry.xml

diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp
index 33db7660abd..5e5194eeb68 100644
--- a/src/Core/Settings.cpp
+++ b/src/Core/Settings.cpp
@@ -90,7 +90,8 @@ void Settings::checkNoSettingNamesAtTopLevel(const Poco::Util::AbstractConfigura
     for (const auto & setting : settings.all())
     {
         const auto & name = setting.getName();
-        if (config.has(name) && !setting.isObsolete())
+        bool should_skip_check = name == "max_table_size_to_drop" || name == "max_partition_size_to_drop";
+        if (config.has(name) && !setting.isObsolete() && !should_skip_check)
         {
             throw Exception(ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG, "A setting '{}' appeared at top level in config {}."
                 " But it is user-level setting that should be located in users.xml inside <profiles> section for specific profile."
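Context for the should_skip_check exception above: max_table_size_to_drop and max_partition_size_to_drop have long been accepted at the top level of the server's config.xml, so the top-level-settings check has to tolerate them now that the same names also exist as user-level settings. A minimal sketch of such a server config; the 50000000000-byte (50 GB) values are illustrative and not taken from this patch:

    <clickhouse>
        <max_table_size_to_drop>50000000000</max_table_size_to_drop>
        <max_partition_size_to_drop>50000000000</max_partition_size_to_drop>
    </clickhouse>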
diff --git a/src/Core/Settings.h b/src/Core/Settings.h
index fae2e3ebf89..69efedf5d3e 100644
--- a/src/Core/Settings.h
+++ b/src/Core/Settings.h
@@ -527,6 +527,8 @@ class IColumn;
     M(Int64, max_partitions_to_read, -1, "Limit the max number of partitions that can be accessed in one query. <= 0 means unlimited.", 0) \
     M(Bool, check_query_single_value_result, true, "Return check query result as single 1/0 value", 0) \
     M(Bool, allow_drop_detached, false, "Allow ALTER TABLE ... DROP DETACHED PART[ITION] ... queries", 0) \
+    M(UInt64, max_table_size_to_drop, 0, "Only available in ClickHouse Cloud", 0) \
+    M(UInt64, max_partition_size_to_drop, 0, "Only available in ClickHouse Cloud", 0) \
     \
     M(UInt64, postgresql_connection_pool_size, 16, "Connection pool size for PostgreSQL table engine and database engine.", 0) \
     M(UInt64, postgresql_connection_pool_wait_timeout, 5000, "Connection pool push/pop timeout on empty pool for PostgreSQL table engine and database engine. By default it will block on empty pool.", 0) \
@@ -620,6 +622,7 @@ class IColumn;
     M(Bool, describe_include_subcolumns, false, "If true, subcolumns of all table columns will be included into result of DESCRIBE query", 0) \
     M(Bool, describe_include_virtual_columns, false, "If true, virtual columns of table will be included into result of DESCRIBE query", 0) \
     M(Bool, describe_compact_output, false, "If true, include only column names and types into result of DESCRIBE query", 0) \
+    M(Bool, apply_mutations_on_fly, false, "Only available in ClickHouse Cloud", 0) \
     M(Bool, mutations_execute_nondeterministic_on_initiator, false, "If true nondeterministic function are executed on initiator and replaced to literals in UPDATE and DELETE queries", 0) \
     M(Bool, mutations_execute_subqueries_on_initiator, false, "If true scalar subqueries are executed on initiator and replaced to literals in UPDATE and DELETE queries", 0) \
     M(UInt64, mutations_max_literal_size_to_replace, 16384, "The maximum size of serialized literal in bytes to replace in UPDATE and DELETE queries", 0) \
@@ -671,6 +674,8 @@ class IColumn;
     M(Bool, database_replicated_always_detach_permanently, false, "Execute DETACH TABLE as DETACH TABLE PERMANENTLY if database engine is Replicated", 0) \
     M(Bool, database_replicated_allow_only_replicated_engine, false, "Allow to create only Replicated tables in database with engine Replicated", 0) \
     M(Bool, database_replicated_allow_replicated_engine_arguments, true, "Allow to create only Replicated tables in database with engine Replicated with explicit arguments", 0) \
+    M(Bool, cloud_mode, false, "Only available in ClickHouse Cloud", 0) \
+    M(UInt64, cloud_mode_engine, 1, "Only available in ClickHouse Cloud", 0) \
     M(DistributedDDLOutputMode, distributed_ddl_output_mode, DistributedDDLOutputMode::THROW, "Format of distributed DDL query result, one of: 'none', 'throw', 'null_status_on_timeout', 'never_throw'", 0) \
     M(UInt64, distributed_ddl_entry_format_version, 5, "Compatibility version of distributed DDL (ON CLUSTER) queries", 0) \
     \
@@ -724,6 +729,7 @@ class IColumn;
     M(UInt64, merge_tree_min_bytes_per_task_for_remote_reading, 4 * DBMS_DEFAULT_BUFFER_SIZE, "Min bytes to read per task.", 0) \
     M(Bool, merge_tree_use_const_size_tasks_for_remote_reading, true, "Whether to use constant size tasks for reading from a remote table.", 0) \
     M(Bool, merge_tree_determine_task_size_by_prewhere_columns, true, "Whether to use only prewhere columns size to determine reading task size.", 0) \
+    M(UInt64, merge_tree_compact_parts_min_granules_to_multibuffer_read, 16, "Only available in ClickHouse Cloud", 0) \
     \
     M(Bool, async_insert, false, "If true, data from INSERT query is stored in queue and later flushed to table in background. If wait_for_async_insert is false, INSERT query is processed almost instantly, otherwise client will wait until data will be flushed to table", 0) \
     M(Bool, wait_for_async_insert, true, "If true wait for processing of asynchronous insertion", 0) \
@@ -835,6 +841,10 @@ class IColumn;
     M(Bool, print_pretty_type_names, false, "Print pretty type names in DESCRIBE query and toTypeName() function", 0) \
     M(Bool, create_table_empty_primary_key_by_default, false, "Allow to create *MergeTree tables with empty primary key when ORDER BY and PRIMARY KEY not specified", 0) \
     M(Bool, allow_named_collection_override_by_default, true, "Allow named collections' fields override by default.", 0)\
+    M(Bool, allow_experimental_shared_merge_tree, false, "Only available in ClickHouse Cloud", 0) \
+    M(UInt64, cache_warmer_threads, 4, "Only available in ClickHouse Cloud", 0) \
+    M(Int64, ignore_cold_parts_seconds, 0, "Only available in ClickHouse Cloud", 0) \
+    M(Int64, prefer_warmed_unmerged_parts_seconds, 0, "Only available in ClickHouse Cloud", 0) \
 
 // End of COMMON_SETTINGS
 // Please add settings related to formats into the FORMAT_FACTORY_SETTINGS, move obsolete settings to OBSOLETE_SETTINGS and obsolete format settings to OBSOLETE_FORMAT_SETTINGS.

diff --git a/src/Storages/MergeTree/ActiveDataPartSet.cpp b/src/Storages/MergeTree/ActiveDataPartSet.cpp
index 89e01008945..ba95da37b29 100644
--- a/src/Storages/MergeTree/ActiveDataPartSet.cpp
+++ b/src/Storages/MergeTree/ActiveDataPartSet.cpp
@@ -39,6 +39,38 @@ bool ActiveDataPartSet::add(const MergeTreePartInfo & part_info, const String &
     return outcome == AddPartOutcome::Added;
 }
 
+void ActiveDataPartSet::checkIntersectingParts(const MergeTreePartInfo & part_info) const
+{
+    auto it = part_info_to_name.lower_bound(part_info);
+    /// Let's go left.
+    while (it != part_info_to_name.begin())
+    {
+        --it;
+        if (!part_info.contains(it->first))
+        {
+            if (!part_info.isDisjoint(it->first))
+                throw Exception(ErrorCodes::LOGICAL_ERROR, "Part {} intersects previous part {}. It is a bug or a result of manual intervention in the ZooKeeper data.", part_info.getPartNameForLogs(), it->first.getPartNameForLogs());
+            ++it;
+            break;
+        }
+    }
+    /// Let's go to the right.
+    while (it != part_info_to_name.end() && part_info.contains(it->first))
+    {
+        assert(part_info != it->first);
+        ++it;
+    }
+
+    if (it != part_info_to_name.end() && !part_info.isDisjoint(it->first))
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Part {} intersects next part {}. It is a bug or a result of manual intervention in the ZooKeeper data.", part_info.getPartNameForLogs(), it->first.getPartNameForLogs());
+
+}
+
+void ActiveDataPartSet::checkIntersectingParts(const String & name) const
+{
+    auto part_info = MergeTreePartInfo::fromPartName(name, format_version);
+    checkIntersectingParts(part_info);
+}
 
 bool ActiveDataPartSet::add(const String & name, Strings * out_replaced_parts)
 {

diff --git a/src/Storages/MergeTree/ActiveDataPartSet.h b/src/Storages/MergeTree/ActiveDataPartSet.h
index 98a06e02ae8..3c644c89b8c 100644
--- a/src/Storages/MergeTree/ActiveDataPartSet.h
+++ b/src/Storages/MergeTree/ActiveDataPartSet.h
@@ -106,11 +106,16 @@ public:
 
     MergeTreeDataFormatVersion getFormatVersion() const { return format_version; }
 
+    void checkIntersectingParts(const MergeTreePartInfo & part_info) const;
+    void checkIntersectingParts(const String & name) const;
+
 private:
     AddPartOutcome addImpl(const MergeTreePartInfo & part_info, const String & name, Strings * out_replaced_parts = nullptr, String * out_reason = nullptr);
 
     MergeTreeDataFormatVersion format_version;
-    std::map<MergeTreePartInfo, String> part_info_to_name;
+
+    using PartInfoToName = std::map<MergeTreePartInfo, String>;
+    PartInfoToName part_info_to_name;
 
     std::vector<std::map<MergeTreePartInfo, String>::const_iterator> getPartsCoveredByImpl(const MergeTreePartInfo & part_info) const;

diff --git a/src/Storages/MergeTree/BackgroundJobsAssignee.cpp b/src/Storages/MergeTree/BackgroundJobsAssignee.cpp
index 32714b3b07f..56a4378cf9a 100644
--- a/src/Storages/MergeTree/BackgroundJobsAssignee.cpp
+++ b/src/Storages/MergeTree/BackgroundJobsAssignee.cpp
@@ -60,10 +60,11 @@ bool BackgroundJobsAssignee::scheduleMergeMutateTask(ExecutableTaskPtr merge_tas
 }
 
 
-void BackgroundJobsAssignee::scheduleFetchTask(ExecutableTaskPtr fetch_task)
+bool BackgroundJobsAssignee::scheduleFetchTask(ExecutableTaskPtr fetch_task)
 {
     bool res = getContext()->getFetchesExecutor()->trySchedule(fetch_task);
     res ? trigger() : postpone();
+    return res;
 }
 
 
@@ -75,10 +76,11 @@ bool BackgroundJobsAssignee::scheduleMoveTask(ExecutableTaskPtr move_task)
 }
 
 
-void BackgroundJobsAssignee::scheduleCommonTask(ExecutableTaskPtr common_task, bool need_trigger)
+bool BackgroundJobsAssignee::scheduleCommonTask(ExecutableTaskPtr common_task, bool need_trigger)
 {
-    bool res = getContext()->getCommonExecutor()->trySchedule(common_task) && need_trigger;
-    res ? trigger() : postpone();
+    bool schedule_res = getContext()->getCommonExecutor()->trySchedule(common_task);
+    schedule_res && need_trigger ? trigger() : postpone();
+    return schedule_res;
 }

diff --git a/src/Storages/MergeTree/BackgroundJobsAssignee.h b/src/Storages/MergeTree/BackgroundJobsAssignee.h
index f1c7eadf5f7..65fefce0917 100644
--- a/src/Storages/MergeTree/BackgroundJobsAssignee.h
+++ b/src/Storages/MergeTree/BackgroundJobsAssignee.h
@@ -66,9 +66,9 @@ public:
     void finish();
 
     bool scheduleMergeMutateTask(ExecutableTaskPtr merge_task);
-    void scheduleFetchTask(ExecutableTaskPtr fetch_task);
+    bool scheduleFetchTask(ExecutableTaskPtr fetch_task);
     bool scheduleMoveTask(ExecutableTaskPtr move_task);
-    void scheduleCommonTask(ExecutableTaskPtr common_task, bool need_trigger);
+    bool scheduleCommonTask(ExecutableTaskPtr common_task, bool need_trigger);
 
     /// Just call finish
     ~BackgroundJobsAssignee();

diff --git a/src/Storages/MergeTree/DataPartStorageOnDiskFull.cpp b/src/Storages/MergeTree/DataPartStorageOnDiskFull.cpp
index 20b6c5a919e..94f636423cc 100644
--- a/src/Storages/MergeTree/DataPartStorageOnDiskFull.cpp
+++ b/src/Storages/MergeTree/DataPartStorageOnDiskFull.cpp
@@ -184,6 +184,23 @@ void DataPartStorageOnDiskFull::createHardLinkFrom(const IDataPartStorage & sour
     });
 }
 
+void DataPartStorageOnDiskFull::copyFileFrom(const IDataPartStorage & source, const std::string & from, const std::string & to)
+{
+    const auto * source_on_disk = typeid_cast<const DataPartStorageOnDiskFull *>(&source);
+    if (!source_on_disk)
+        throw Exception(
+            ErrorCodes::LOGICAL_ERROR,
+            "Cannot copy file from different storage. Expected DataPartStorageOnDiskFull, got {}",
+            typeid(source).name());
+
+    /// Copying files between different disks is
+    /// not supported in disk transactions.
+    source_on_disk->getDisk()->copyFile(
+        fs::path(source_on_disk->getRelativePath()) / from,
+        *volume->getDisk(),
+        fs::path(root_path) / part_dir / to);
+}
+
 void DataPartStorageOnDiskFull::createProjection(const std::string & name)
 {
     executeWriteOperation([&](auto & disk) { disk.createDirectory(fs::path(root_path) / part_dir / name); });

diff --git a/src/Storages/MergeTree/DataPartStorageOnDiskFull.h b/src/Storages/MergeTree/DataPartStorageOnDiskFull.h
index 5d70404fcfa..15c6d42c721 100644
--- a/src/Storages/MergeTree/DataPartStorageOnDiskFull.h
+++ b/src/Storages/MergeTree/DataPartStorageOnDiskFull.h
@@ -48,6 +48,7 @@ public:
     void removeFileIfExists(const String & name) override;
 
     void createHardLinkFrom(const IDataPartStorage & source, const std::string & from, const std::string & to) override;
+    void copyFileFrom(const IDataPartStorage & source, const std::string & from, const std::string & to) override;
 
     void beginTransaction() override;
     void commitTransaction() override;

diff --git a/src/Storages/MergeTree/IDataPartStorage.h b/src/Storages/MergeTree/IDataPartStorage.h
index 59251e40626..afbe91a8a6d 100644
--- a/src/Storages/MergeTree/IDataPartStorage.h
+++ b/src/Storages/MergeTree/IDataPartStorage.h
@@ -12,6 +12,7 @@
 #include
 #include
 #include
+#include
 
 namespace DB
 {
@@ -304,6 +305,7 @@ public:
     virtual SyncGuardPtr getDirectorySyncGuard() const { return nullptr; }
 
     virtual void createHardLinkFrom(const IDataPartStorage & source, const std::string & from, const std::string & to) = 0;
+    virtual void copyFileFrom(const IDataPartStorage & source, const std::string & from, const std::string & to) = 0;
 
     /// Rename part.
     /// Ideally, new_root_path should be the same as current root (but it is not true).
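copyFileFrom is the copying counterpart of createHardLinkFrom in the IDataPartStorage interface above. A condensed sketch of the dispatch pattern that the MutateTask.cpp hunk further down introduces; the helper name and parameters are illustrative, not part of the patch, and only hardlinked files are tracked because zero-copy bookkeeping must know which blobs are shared:

    /// Sketch, mirroring the MutateTask.cpp change later in this patch.
    void attachFileToNewPart(
        DB::IDataPartStorage & new_part_storage,
        const DB::IDataPartStorage & source_part_storage,
        const std::string & file_name,
        const std::string & destination,
        bool always_use_copy_instead_of_hardlinks,
        DB::NameSet & hardlinked_files)
    {
        if (always_use_copy_instead_of_hardlinks)
        {
            /// Physically duplicate the bytes; the new part owns its own blob.
            new_part_storage.copyFileFrom(source_part_storage, file_name, destination);
        }
        else
        {
            /// Share the blob with the source part and record the share,
            /// so the file's removal can be coordinated between parts.
            new_part_storage.createHardLinkFrom(source_part_storage, file_name, destination);
            hardlinked_files.insert(file_name);
        }
    }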
diff --git a/src/Storages/MergeTree/IMergeTreeReader.h b/src/Storages/MergeTree/IMergeTreeReader.h
index 02faebf4b41..f3ea0c6c361 100644
--- a/src/Storages/MergeTree/IMergeTreeReader.h
+++ b/src/Storages/MergeTree/IMergeTreeReader.h
@@ -86,13 +86,13 @@ protected:
     /// Actual serialization of columns in part.
     Serializations serializations;
 
-    UncompressedCache * uncompressed_cache;
-    MarkCache * mark_cache;
+    UncompressedCache * const uncompressed_cache;
+    MarkCache * const mark_cache;
 
     MergeTreeReaderSettings settings;
 
-    StorageSnapshotPtr storage_snapshot;
-    MarkRanges all_mark_ranges;
+    const StorageSnapshotPtr storage_snapshot;
+    const MarkRanges all_mark_ranges;
 
     /// Position and level (of nesting).
    using ColumnNameLevel = std::optional<std::pair<String, size_t>>;

diff --git a/src/Storages/MergeTree/MergeTask.cpp b/src/Storages/MergeTree/MergeTask.cpp
index 2d1191033c7..822b3ae72a5 100644
--- a/src/Storages/MergeTree/MergeTask.cpp
+++ b/src/Storages/MergeTree/MergeTask.cpp
@@ -165,7 +165,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
     std::optional<MergeTreeDataPartBuilder> builder;
     if (global_ctx->parent_part)
     {
-        auto data_part_storage = global_ctx->parent_part->getDataPartStorage().getProjection(local_tmp_part_basename);
+        auto data_part_storage = global_ctx->parent_part->getDataPartStorage().getProjection(local_tmp_part_basename, /* use parent transaction */ false);
         builder.emplace(*global_ctx->data, global_ctx->future_part->name, data_part_storage);
         builder->withParentPart(global_ctx->parent_part);
     }
@@ -185,11 +185,11 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
     if (data_part_storage->exists())
         throw Exception(ErrorCodes::DIRECTORY_ALREADY_EXISTS, "Directory {} already exists", data_part_storage->getFullPath());
 
+    data_part_storage->beginTransaction();
+    /// Background temp dirs cleaner will not touch tmp projection directory because
+    /// it's located inside part's directory
     if (!global_ctx->parent_part)
-    {
-        data_part_storage->beginTransaction();
        global_ctx->temporary_directory_lock = global_ctx->data->getTemporaryPartDirectoryHolder(local_tmp_part_basename);
-    }
 
     global_ctx->all_column_names = global_ctx->metadata_snapshot->getColumns().getNamesOfPhysical();
     global_ctx->storage_columns = global_ctx->metadata_snapshot->getColumns().getAllPhysical();

diff --git a/src/Storages/MergeTree/MergeTask.h b/src/Storages/MergeTree/MergeTask.h
index 268d5f86cef..8a96ceb8c40 100644
--- a/src/Storages/MergeTree/MergeTask.h
+++ b/src/Storages/MergeTree/MergeTask.h
@@ -113,6 +113,13 @@ public:
         return global_ctx->promise.get_future();
     }
 
+    MergeTreeData::MutableDataPartPtr getUnfinishedPart()
+    {
+        if (global_ctx)
+            return global_ctx->new_data_part;
+        return nullptr;
+    }
+
     bool execute();
 
 private:

diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp
index 8445c513372..d0c22e4e2b5 100644
--- a/src/Storages/MergeTree/MergeTreeData.cpp
+++ b/src/Storages/MergeTree/MergeTreeData.cpp
@@ -2080,6 +2080,9 @@ size_t MergeTreeData::clearOldTemporaryDirectories(const String & root_path, siz
         if (disk->isBroken())
             continue;
 
+        if (!disk->exists(root_path))
+            continue;
+
         for (auto it = disk->iterateDirectory(root_path); it->isValid(); it->next())
         {
             const std::string & basename = it->name();
@@ -7284,6 +7287,8 @@ PartitionCommandsResultInfo MergeTreeData::freezePartitionsByMatcher(
     const String & with_name,
     ContextPtr local_context)
 {
+    auto settings = getSettings();
+
     String clickhouse_path = fs::canonical(local_context->getPath());
     String default_shadow_path = fs::path(clickhouse_path) / "shadow/";
     fs::create_directories(default_shadow_path);
@@ -7294,6 +7299,20 @@ PartitionCommandsResultInfo MergeTreeData::freezePartitionsByMatcher(
     /// Acquire a snapshot of active data parts to prevent removing while doing backup.
     const auto data_parts = getVisibleDataPartsVector(local_context);
 
+    bool has_zero_copy_part = false;
+    for (const auto & part : data_parts)
+    {
+        if (part->isStoredOnRemoteDiskWithZeroCopySupport())
+        {
+            has_zero_copy_part = true;
+            break;
+        }
+    }
+
+    if (supportsReplication() && settings->disable_freeze_partition_for_zero_copy_replication
+        && settings->allow_remote_fs_zero_copy_replication && has_zero_copy_part)
+        throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "FREEZE PARTITION queries are disabled.");
+
     String backup_name = (!with_name.empty() ? escapeForFileName(with_name) : toString(increment));
     String backup_path = fs::path(shadow_path) / backup_name / "";

diff --git a/src/Storages/MergeTree/MergeTreeSettings.h b/src/Storages/MergeTree/MergeTreeSettings.h
index b921458feab..2a73668715d 100644
--- a/src/Storages/MergeTree/MergeTreeSettings.h
+++ b/src/Storages/MergeTree/MergeTreeSettings.h
@@ -36,6 +36,12 @@ struct Settings;
     M(Float, ratio_of_defaults_for_sparse_serialization, 0.9375f, "Minimal ratio of number of default values to number of all values in column to store it in sparse serializations. If >= 1, columns will be always written in full serialization.", 0) \
     M(Bool, replace_long_file_name_to_hash, false, "If the file name for column is too long (more than 'max_file_name_length' bytes) replace it to SipHash128", 0) \
     M(UInt64, max_file_name_length, 127, "The maximal length of the file name to keep it as is without hashing", 0) \
+    M(UInt64, min_bytes_for_full_part_storage, 0, "Only available in ClickHouse Cloud", 0) \
+    M(UInt64, min_rows_for_full_part_storage, 0, "Only available in ClickHouse Cloud", 0) \
+    M(UInt64, compact_parts_max_bytes_to_buffer, 128 * 1024 * 1024, "Only available in ClickHouse Cloud", 0) \
+    M(UInt64, compact_parts_max_granules_to_buffer, 128, "Only available in ClickHouse Cloud", 0) \
+    M(UInt64, compact_parts_merge_max_bytes_to_prefetch_part, 16 * 1024 * 1024, "Only available in ClickHouse Cloud", 0) \
+    \
     /** Merge settings. */ \
     M(UInt64, merge_max_block_size, 8192, "How many rows in blocks should be formed for merge operations. By default has the same value as `index_granularity`.", 0) \
     M(UInt64, merge_max_block_size_bytes, 10 * 1024 * 1024, "How many bytes in blocks should be formed for merge operations. By default has the same value as `index_granularity_bytes`.", 0) \
@@ -124,6 +130,9 @@ struct Settings;
     M(Milliseconds, wait_for_unique_parts_send_before_shutdown_ms, 0, "Before shutdown table will wait for required amount time for unique parts (exist only on current replica) to be fetched by other replicas (0 means disabled).", 0) \
     M(Float, fault_probability_before_part_commit, 0, "For testing. Do not change it.", 0) \
     M(Float, fault_probability_after_part_commit, 0, "For testing. Do not change it.", 0) \
+    M(Bool, shared_merge_tree_disable_merges_and_mutations_assignment, false, "Only available in ClickHouse Cloud", 0) \
+    M(Float, shared_merge_tree_partitions_hint_ratio_to_reload_merge_pred_for_mutations, 0.5, "Only available in ClickHouse Cloud", 0) \
+    M(UInt64, shared_merge_tree_parts_load_batch_size, 32, "Only available in ClickHouse Cloud", 0) \
     \
     /** Check delay of replicas settings. */ \
     M(UInt64, min_relative_delay_to_measure, 120, "Calculate relative replica delay only if absolute delay is not less that this value.", 0) \
@@ -131,6 +140,10 @@ struct Settings;
     M(UInt64, max_cleanup_delay_period, 300, "Maximum period to clean old queue logs, blocks hashes and parts.", 0) \
     M(UInt64, cleanup_delay_period_random_add, 10, "Add uniformly distributed value from 0 to x seconds to cleanup_delay_period to avoid thundering herd effect and subsequent DoS of ZooKeeper in case of very large number of tables.", 0) \
     M(UInt64, cleanup_thread_preferred_points_per_iteration, 150, "Preferred batch size for background cleanup (points are abstract but 1 point is approximately equivalent to 1 inserted block).", 0) \
+    M(UInt64, cleanup_threads, 128, "Only available in ClickHouse Cloud", 0) \
+    M(UInt64, kill_delay_period, 30, "Only available in ClickHouse Cloud", 0) \
+    M(UInt64, kill_delay_period_random_add, 10, "Only available in ClickHouse Cloud", 0) \
+    M(UInt64, kill_threads, 128, "Only available in ClickHouse Cloud", 0) \
     M(UInt64, min_relative_delay_to_close, 300, "Minimal delay from other replicas to close, stop serving requests and not return Ok during status check.", 0) \
     M(UInt64, min_absolute_delay_to_close, 0, "Minimal absolute delay to close, stop serving requests and not return Ok during status check.", 0) \
     M(UInt64, enable_vertical_merge_algorithm, 1, "Enable usage of Vertical merge algorithm.", 0) \
@@ -170,13 +183,18 @@ struct Settings;
     M(UInt64, zero_copy_merge_mutation_min_parts_size_sleep_before_lock, 1ULL * 1024 * 1024 * 1024, "If zero copy replication is enabled sleep random amount of time before trying to lock depending on parts size for merge or mutation", 0) \
     M(Bool, allow_floating_point_partition_key, false, "Allow floating point as partition key", 0) \
     M(UInt64, sleep_before_loading_outdated_parts_ms, 0, "For testing. Do not change it.", 0) \
+    M(Bool, always_use_copy_instead_of_hardlinks, false, "Always copy data instead of hardlinking during mutations/replaces/detaches and so on.", 0) \
+    M(Bool, disable_freeze_partition_for_zero_copy_replication, true, "Disable FREEZE PARTITION query for zero copy replication.", 0) \
+    M(Bool, disable_detach_partition_for_zero_copy_replication, true, "Disable DETACH PARTITION query for zero copy replication.", 0) \
+    M(Bool, disable_fetch_partition_for_zero_copy_replication, true, "Disable FETCH PARTITION query for zero copy replication.", 0) \
     \
     /** Experimental/work in progress feature. Unsafe for production. */ \
     M(UInt64, part_moves_between_shards_enable, 0, "Experimental/Incomplete feature to move parts between shards. Does not take into account sharding expressions.", 0) \
     M(UInt64, part_moves_between_shards_delay_seconds, 30, "Time to wait before/after moving parts between shards.", 0) \
     M(Bool, allow_remote_fs_zero_copy_replication, false, "Don't use this setting in production, because it is not ready.", 0) \
     M(String, remote_fs_zero_copy_zookeeper_path, "/clickhouse/zero_copy", "ZooKeeper path for zero-copy table-independent info.", 0) \
-    M(Bool, remote_fs_zero_copy_path_compatible_mode, false, "Run zero-copy in compatible mode during conversion process.", 0) \
+    M(Bool, remote_fs_zero_copy_path_compatible_mode, false, "Run zero-copy in compatible mode during conversion process.", 0) \
+    M(Bool, cache_populated_by_fetch, false, "Only available in ClickHouse Cloud", 0) \
     M(Bool, allow_experimental_block_number_column, false, "Enable persisting column _block_number for each row.", 0) \
     \
     /** Compress marks and primary key. */ \

diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp
index 214a7ea56a1..ab4fb7efd88 100644
--- a/src/Storages/MergeTree/MutateTask.cpp
+++ b/src/Storages/MergeTree/MutateTask.cpp
@@ -1357,6 +1357,7 @@ private:
     NameSet removed_stats;
     /// A stat file need to be renamed iff the column is renamed.
     NameToNameMap renamed_stats;
+
     for (const auto & command : ctx->for_file_renames)
     {
         if (command.type == MutationCommand::DROP_INDEX)
@@ -1652,6 +1653,8 @@ private:
     ctx->new_data_part->version.setCreationTID(tid, nullptr);
     ctx->new_data_part->storeVersionMetadata();
 
+    auto settings = ctx->source_part->storage.getSettings();
+
     NameSet hardlinked_files;
 
     /// NOTE: Renames must be done in order
@@ -1691,9 +1694,18 @@ private:
 
         if (it->isFile())
         {
-            ctx->new_data_part->getDataPartStorage().createHardLinkFrom(
-                ctx->source_part->getDataPartStorage(), file_name, destination);
-            hardlinked_files.insert(file_name);
+            if (settings->always_use_copy_instead_of_hardlinks)
+            {
+                ctx->new_data_part->getDataPartStorage().copyFileFrom(
+                    ctx->source_part->getDataPartStorage(), it->name(), destination);
+            }
+            else
+            {
+                ctx->new_data_part->getDataPartStorage().createHardLinkFrom(
+                    ctx->source_part->getDataPartStorage(), it->name(), destination);
+
+                hardlinked_files.insert(it->name());
+            }
         }
         else if (!endsWith(it->name(), ".tmp_proj")) // ignore projection tmp merge dir
         {
@@ -1705,11 +1717,20 @@
 
             for (auto p_it = projection_data_part_storage_src->iterate(); p_it->isValid(); p_it->next())
             {
-                auto file_name_with_projection_prefix = fs::path(projection_data_part_storage_src->getPartDirectory()) / p_it->name();
-                projection_data_part_storage_dst->createHardLinkFrom(
-                    *projection_data_part_storage_src, p_it->name(), p_it->name());
-
-                hardlinked_files.insert(file_name_with_projection_prefix);
+                if (settings->always_use_copy_instead_of_hardlinks)
+                {
+                    projection_data_part_storage_dst->copyFileFrom(
+                        *projection_data_part_storage_src, p_it->name(), p_it->name());
+                }
+                else
+                {
+                    auto file_name_with_projection_prefix = fs::path(projection_data_part_storage_src->getPartDirectory()) / p_it->name();
+
+                    projection_data_part_storage_dst->createHardLinkFrom(
+                        *projection_data_part_storage_src, p_it->name(), p_it->name());
+
+                    hardlinked_files.insert(file_name_with_projection_prefix);
+                }
             }
         }
     }
@@ -1973,19 +1994,20 @@ bool MutateTask::prepare()
         IDataPartStorage::ClonePartParams clone_params
         {
            .txn = ctx->txn, .hardlinked_files = &ctx->hardlinked_files,
-            .files_to_copy_instead_of_hardlinks = std::move(files_to_copy_instead_of_hardlinks), .keep_metadata_version = true
+            .copy_instead_of_hardlink = settings_ptr->always_use_copy_instead_of_hardlinks,
+            .files_to_copy_instead_of_hardlinks = std::move(files_to_copy_instead_of_hardlinks),
+            .keep_metadata_version = true,
         };
-        auto [part, lock] = ctx->data->cloneAndLoadDataPartOnSameDisk(
-            ctx->source_part,
-            prefix,
-            ctx->future_part->part_info,
-            ctx->metadata_snapshot,
-            clone_params,
-            ctx->context->getReadSettings(),
-            ctx->context->getWriteSettings());
-        part->getDataPartStorage().beginTransaction();
+        MergeTreeData::MutableDataPartPtr part;
+        scope_guard lock;
+
+        {
+            std::tie(part, lock) = ctx->data->cloneAndLoadDataPartOnSameDisk(
+                ctx->source_part, prefix, ctx->future_part->part_info, ctx->metadata_snapshot, clone_params, ctx->context->getReadSettings(), ctx->context->getWriteSettings());
+            part->getDataPartStorage().beginTransaction();
+            ctx->temporary_directory_lock = std::move(lock);
+        }
 
-        ctx->temporary_directory_lock = std::move(lock);
         promise.set_value(std::move(part));
         return false;
     }

diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp
index ac1880b75fb..ed2e9c6d3ff 100644
--- a/src/Storages/StorageMergeTree.cpp
+++ b/src/Storages/StorageMergeTree.cpp
@@ -8,6 +8,7 @@
 #include
 #include
 #include
+#include "Common/Exception.h"
 #include
 #include
 #include
@@ -2143,7 +2144,12 @@ void StorageMergeTree::movePartitionToTable(const StoragePtr & dest_table, const
         Int64 temp_index = insert_increment.get();
         MergeTreePartInfo dst_part_info(partition_id, temp_index, temp_index, src_part->info.level);
 
-        IDataPartStorage::ClonePartParams clone_params{.txn = local_context->getCurrentTransaction()};
+        IDataPartStorage::ClonePartParams clone_params
+        {
+            .txn = local_context->getCurrentTransaction(),
+            .copy_instead_of_hardlink = getSettings()->always_use_copy_instead_of_hardlinks,
+        };
+
         auto [dst_part, part_lock] = dest_table_storage->cloneAndLoadDataPartOnSameDisk(
             src_part,
             TMP_PREFIX,
@@ -2153,6 +2159,7 @@ void StorageMergeTree::movePartitionToTable(const StoragePtr & dest_table, const
             local_context->getReadSettings(),
             local_context->getWriteSettings()
         );
+
         dst_parts.emplace_back(std::move(dst_part));
         dst_parts_locks.emplace_back(std::move(part_lock));
     }

diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp
index 307870aaf4c..d0f7edf08ce 100644
--- a/src/Storages/StorageReplicatedMergeTree.cpp
+++ b/src/Storages/StorageReplicatedMergeTree.cpp
@@ -190,6 +190,7 @@ namespace ErrorCodes
     extern const int TOO_LARGE_DISTRIBUTED_DEPTH;
     extern const int TABLE_IS_DROPPED;
     extern const int CANNOT_BACKUP_TABLE;
+    extern const int SUPPORT_IS_DISABLED;
 }
 
 namespace ActionLocks
@@ -2688,7 +2689,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(LogEntry & entry)
 
                 IDataPartStorage::ClonePartParams clone_params
                 {
-                    .copy_instead_of_hardlink = (our_zero_copy_enabled || source_zero_copy_enabled) && part_desc->src_table_part->isStoredOnRemoteDiskWithZeroCopySupport(),
+                    .copy_instead_of_hardlink = storage_settings_ptr->always_use_copy_instead_of_hardlinks || ((our_zero_copy_enabled || source_zero_copy_enabled) && part_desc->src_table_part->isStoredOnRemoteDiskWithZeroCopySupport()),
                     .metadata_version_to_write = metadata_snapshot->getMetadataVersion()
                 };
                 auto [res_part, temporary_part_lock] = cloneAndLoadDataPartOnSameDisk(
@@ -4777,7 +4778,12 @@ bool StorageReplicatedMergeTree::fetchPart(
             get_part = [&, part_to_clone]()
             {
                 chassert(!is_zero_copy_part(part_to_clone));
-                IDataPartStorage::ClonePartParams clone_params{ .keep_metadata_version = true };
+                IDataPartStorage::ClonePartParams clone_params
+                {
+                    .copy_instead_of_hardlink = getSettings()->always_use_copy_instead_of_hardlinks,
+                    .keep_metadata_version = true,
+                };
+
                 auto [cloned_part, lock] = cloneAndLoadDataPartOnSameDisk(
                     part_to_clone,
                     "tmp_clone_",
@@ -4786,6 +4792,7 @@ bool StorageReplicatedMergeTree::fetchPart(
                     clone_params,
                     getContext()->getReadSettings(),
                     getContext()->getWriteSettings());
+
                 part_directory_lock = std::move(lock);
                 return cloned_part;
             };
@@ -6363,6 +6370,18 @@ void StorageReplicatedMergeTree::dropPartition(const ASTPtr & partition, bool de
     if (!is_leader)
         throw Exception(ErrorCodes::NOT_A_LEADER, "DROP PARTITION cannot be done on this replica because it is not a leader");
 
+    auto settings = getSettings();
+
+    if (detach && settings->disable_detach_partition_for_zero_copy_replication
+        && settings->allow_remote_fs_zero_copy_replication)
+    {
+        for (const auto & disk : getDisks())
+        {
+            if (disk->supportZeroCopyReplication())
+                throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "DETACH PARTITION queries are disabled.");
+        }
+    }
+
     zkutil::ZooKeeperPtr zookeeper = getZooKeeperAndAssertNotReadonly();
 
     const auto * partition_ast = partition->as<ASTPartition>();
@@ -7022,6 +7041,8 @@ void StorageReplicatedMergeTree::fetchPartition(
     bool fetch_part,
     ContextPtr query_context)
 {
+    auto settings = getSettings();
+
     Macros::MacroExpansionInfo info;
     info.expand_special_macros_only = false;
     info.table_id = getStorageID();
@@ -7032,6 +7053,16 @@ void StorageReplicatedMergeTree::fetchPartition(
     if (from.empty())
         throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "ZooKeeper path should not be empty");
 
+    if (settings->disable_fetch_partition_for_zero_copy_replication
+        && settings->allow_remote_fs_zero_copy_replication)
+    {
+        for (const auto & disk : getDisks())
+        {
+            if (disk->supportZeroCopyReplication())
+                throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "FETCH PARTITION queries are disabled.");
+        }
+    }
+
     zkutil::ZooKeeperPtr zookeeper;
     if (from_zookeeper_name != default_zookeeper_name)
         zookeeper = getContext()->getAuxiliaryZooKeeper(from_zookeeper_name);
@@ -7884,7 +7915,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
                 || dynamic_cast<const MergeTreeData *>(source_table.get())->getSettings()->allow_remote_fs_zero_copy_replication;
             IDataPartStorage::ClonePartParams clone_params
             {
-                .copy_instead_of_hardlink = zero_copy_enabled && src_part->isStoredOnRemoteDiskWithZeroCopySupport(),
+                .copy_instead_of_hardlink = storage_settings_ptr->always_use_copy_instead_of_hardlinks || (zero_copy_enabled && src_part->isStoredOnRemoteDiskWithZeroCopySupport()),
                 .metadata_version_to_write = metadata_snapshot->getMetadataVersion()
             };
             auto [dst_part, part_lock] = cloneAndLoadDataPartOnSameDisk(
@@ -8132,7 +8163,7 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
 
             IDataPartStorage::ClonePartParams clone_params
             {
-                .copy_instead_of_hardlink = zero_copy_enabled && src_part->isStoredOnRemoteDiskWithZeroCopySupport(),
+                .copy_instead_of_hardlink = storage_settings_ptr->always_use_copy_instead_of_hardlinks || (zero_copy_enabled && src_part->isStoredOnRemoteDiskWithZeroCopySupport()),
                 .metadata_version_to_write = dest_metadata_snapshot->getMetadataVersion()
             };
             auto [dst_part, dst_part_lock] = dest_table_storage->cloneAndLoadDataPartOnSameDisk(

diff --git a/tests/config/config.d/zero_copy_destructive_operations.xml b/tests/config/config.d/zero_copy_destructive_operations.xml
new file mode 100644
index 00000000000..20663137f7f
--- /dev/null
+++ b/tests/config/config.d/zero_copy_destructive_operations.xml
@@ -0,0 +1,7 @@
+<clickhouse>
+    <merge_tree>
+        <disable_freeze_partition_for_zero_copy_replication>0</disable_freeze_partition_for_zero_copy_replication>
+        <disable_detach_partition_for_zero_copy_replication>0</disable_detach_partition_for_zero_copy_replication>
+        <disable_fetch_partition_for_zero_copy_replication>0</disable_fetch_partition_for_zero_copy_replication>
+    </merge_tree>
+</clickhouse>

diff --git a/tests/config/install.sh b/tests/config/install.sh
index bcc8a8425d2..6046f05c922 100755
--- a/tests/config/install.sh
+++ b/tests/config/install.sh
@@ -63,6 +63,7 @@ ln -sf $SRC_PATH/config.d/enable_wait_for_shutdown_replicated_tables.xml $DEST_S
 ln -sf $SRC_PATH/config.d/backups.xml $DEST_SERVER_PATH/config.d/
 ln -sf $SRC_PATH/config.d/filesystem_caches_path.xml $DEST_SERVER_PATH/config.d/
 ln -sf $SRC_PATH/config.d/validate_tcp_client_information.xml $DEST_SERVER_PATH/config.d/
+ln -sf $SRC_PATH/config.d/zero_copy_destructive_operations.xml $DEST_SERVER_PATH/config.d/
 
 # Not supported with fasttest.
 if [ "${DEST_SERVER_PATH}" = "/etc/clickhouse-server" ]

diff --git a/tests/integration/test_merge_tree_s3/configs/config.d/storage_conf.xml b/tests/integration/test_merge_tree_s3/configs/config.d/storage_conf.xml
index c12bdf064ce..7087c348072 100644
--- a/tests/integration/test_merge_tree_s3/configs/config.d/storage_conf.xml
+++ b/tests/integration/test_merge_tree_s3/configs/config.d/storage_conf.xml
@@ -156,6 +156,10 @@
         0
         1.0
+        <disable_freeze_partition_for_zero_copy_replication>0</disable_freeze_partition_for_zero_copy_replication>
+        <disable_detach_partition_for_zero_copy_replication>0</disable_detach_partition_for_zero_copy_replication>
+        <disable_fetch_partition_for_zero_copy_replication>0</disable_fetch_partition_for_zero_copy_replication>
+        0
         0

diff --git a/tests/integration/test_s3_zero_copy_replication/configs/config.d/s3.xml b/tests/integration/test_s3_zero_copy_replication/configs/config.d/s3.xml
index 7cb7f50582c..8df9e8e8c26 100644
--- a/tests/integration/test_s3_zero_copy_replication/configs/config.d/s3.xml
+++ b/tests/integration/test_s3_zero_copy_replication/configs/config.d/s3.xml
@@ -69,6 +69,9 @@
         1024
         1
+        <disable_freeze_partition_for_zero_copy_replication>0</disable_freeze_partition_for_zero_copy_replication>
+        <disable_detach_partition_for_zero_copy_replication>0</disable_detach_partition_for_zero_copy_replication>
+        <disable_fetch_partition_for_zero_copy_replication>0</disable_fetch_partition_for_zero_copy_replication>
         true
         1.0

diff --git a/tests/queries/0_stateless/01417_freeze_partition_verbose_zookeeper.sh b/tests/queries/0_stateless/01417_freeze_partition_verbose_zookeeper.sh
index 1fd8a2b29c6..bf97a8e4f9d 100755
--- a/tests/queries/0_stateless/01417_freeze_partition_verbose_zookeeper.sh
+++ b/tests/queries/0_stateless/01417_freeze_partition_verbose_zookeeper.sh
@@ -13,7 +13,7 @@ FREEZE_OUT_STRUCTURE='backup_name String, backup_path String , part_backup_path
 # setup
 ${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS table_for_freeze_replicated SYNC;"
-${CLICKHOUSE_CLIENT} --query "CREATE TABLE table_for_freeze_replicated (key UInt64, value String) ENGINE = ReplicatedMergeTree('/test/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/table_for_freeze_replicated', '1') ORDER BY key PARTITION BY key % 10;"
+${CLICKHOUSE_CLIENT} --query "CREATE TABLE table_for_freeze_replicated (key UInt64, value String) ENGINE = ReplicatedMergeTree('/test/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/table_for_freeze_replicated', '1') ORDER BY key PARTITION BY key % 10 SETTINGS disable_freeze_partition_for_zero_copy_replication=0;"
 ${CLICKHOUSE_CLIENT} --insert_keeper_fault_injection_probability=0 --query "INSERT INTO table_for_freeze_replicated SELECT number, toString(number) from numbers(10);"
 ${CLICKHOUSE_CLIENT} --query "ALTER TABLE table_for_freeze_replicated FREEZE WITH NAME 'test_01417' FORMAT TSVWithNames SETTINGS alter_partition_verbose_result = 1;" \
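Taken together, the three new MergeTree settings default to rejecting FREEZE, DETACH and FETCH PARTITION on tables with zero-copy parts (throwing SUPPORT_IS_DISABLED), and tests opt back in either through the shared zero_copy_destructive_operations.xml config or per table, as in the 01417 test above. A sketch of the per-table opt-out in SQL; the table definition is abbreviated and the ZooKeeper path and backup name are illustrative:

    -- Illustrative only: re-enable FREEZE for one table. DETACH and FETCH have the
    -- analogous disable_detach_/disable_fetch_partition_for_zero_copy_replication settings.
    CREATE TABLE table_for_freeze_replicated (key UInt64, value String)
    ENGINE = ReplicatedMergeTree('/test/table_for_freeze_replicated', '1')
    ORDER BY key
    SETTINGS disable_freeze_partition_for_zero_copy_replication = 0;

    ALTER TABLE table_for_freeze_replicated FREEZE WITH NAME 'test_backup';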