ClickHouse Cloud promotion (#57638)

Co-authored-by: Alexander Sapin <alesapin@gmail.com>
This commit is contained in:
Nikita Mikhaylov 2023-12-15 15:23:23 +01:00 committed by GitHub
parent 7844fcc196
commit 1780671443
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 232 additions and 41 deletions

View File

@ -77,6 +77,7 @@ remove_keeper_config "create_if_not_exists" "[01]"
# it contains some new settings, but we can safely remove it
rm /etc/clickhouse-server/config.d/merge_tree.xml
rm /etc/clickhouse-server/config.d/enable_wait_for_shutdown_replicated_tables.xml
rm /etc/clickhouse-server/config.d/zero_copy_destructive_operations.xml
rm /etc/clickhouse-server/users.d/nonconst_timezone.xml
rm /etc/clickhouse-server/users.d/s3_cache_new.xml
rm /etc/clickhouse-server/users.d/replicated_ddl_entry.xml
@ -115,6 +116,7 @@ sudo chgrp clickhouse /etc/clickhouse-server/config.d/s3_storage_policy_by_defau
# it contains some new settings, but we can safely remove it
rm /etc/clickhouse-server/config.d/merge_tree.xml
rm /etc/clickhouse-server/config.d/enable_wait_for_shutdown_replicated_tables.xml
rm /etc/clickhouse-server/config.d/zero_copy_destructive_operations.xml
rm /etc/clickhouse-server/users.d/nonconst_timezone.xml
rm /etc/clickhouse-server/users.d/s3_cache_new.xml
rm /etc/clickhouse-server/users.d/replicated_ddl_entry.xml

View File

@ -90,7 +90,8 @@ void Settings::checkNoSettingNamesAtTopLevel(const Poco::Util::AbstractConfigura
for (const auto & setting : settings.all())
{
const auto & name = setting.getName();
if (config.has(name) && !setting.isObsolete())
bool should_skip_check = name == "max_table_size_to_drop" || name == "max_partition_size_to_drop";
if (config.has(name) && !setting.isObsolete() && !should_skip_check)
{
throw Exception(ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG, "A setting '{}' appeared at top level in config {}."
" But it is user-level setting that should be located in users.xml inside <profiles> section for specific profile."

View File

@ -527,6 +527,8 @@ class IColumn;
M(Int64, max_partitions_to_read, -1, "Limit the max number of partitions that can be accessed in one query. <= 0 means unlimited.", 0) \
M(Bool, check_query_single_value_result, true, "Return check query result as single 1/0 value", 0) \
M(Bool, allow_drop_detached, false, "Allow ALTER TABLE ... DROP DETACHED PART[ITION] ... queries", 0) \
M(UInt64, max_table_size_to_drop, 0, "Only available in ClickHouse Cloud", 0) \
M(UInt64, max_partition_size_to_drop, 0, "Only available in ClickHouse Cloud", 0) \
\
M(UInt64, postgresql_connection_pool_size, 16, "Connection pool size for PostgreSQL table engine and database engine.", 0) \
M(UInt64, postgresql_connection_pool_wait_timeout, 5000, "Connection pool push/pop timeout on empty pool for PostgreSQL table engine and database engine. By default it will block on empty pool.", 0) \
@ -620,6 +622,7 @@ class IColumn;
M(Bool, describe_include_subcolumns, false, "If true, subcolumns of all table columns will be included into result of DESCRIBE query", 0) \
M(Bool, describe_include_virtual_columns, false, "If true, virtual columns of table will be included into result of DESCRIBE query", 0) \
M(Bool, describe_compact_output, false, "If true, include only column names and types into result of DESCRIBE query", 0) \
M(Bool, apply_mutations_on_fly, false, "Only available in ClickHouse Cloud", 0) \
M(Bool, mutations_execute_nondeterministic_on_initiator, false, "If true nondeterministic function are executed on initiator and replaced to literals in UPDATE and DELETE queries", 0) \
M(Bool, mutations_execute_subqueries_on_initiator, false, "If true scalar subqueries are executed on initiator and replaced to literals in UPDATE and DELETE queries", 0) \
M(UInt64, mutations_max_literal_size_to_replace, 16384, "The maximum size of serialized literal in bytes to replace in UPDATE and DELETE queries", 0) \
@ -671,6 +674,8 @@ class IColumn;
M(Bool, database_replicated_always_detach_permanently, false, "Execute DETACH TABLE as DETACH TABLE PERMANENTLY if database engine is Replicated", 0) \
M(Bool, database_replicated_allow_only_replicated_engine, false, "Allow to create only Replicated tables in database with engine Replicated", 0) \
M(Bool, database_replicated_allow_replicated_engine_arguments, true, "Allow to create only Replicated tables in database with engine Replicated with explicit arguments", 0) \
M(Bool, cloud_mode, false, "Only available in ClickHouse Cloud", 0) \
M(UInt64, cloud_mode_engine, 1, "Only available in ClickHouse Cloud", 0) \
M(DistributedDDLOutputMode, distributed_ddl_output_mode, DistributedDDLOutputMode::THROW, "Format of distributed DDL query result, one of: 'none', 'throw', 'null_status_on_timeout', 'never_throw'", 0) \
M(UInt64, distributed_ddl_entry_format_version, 5, "Compatibility version of distributed DDL (ON CLUSTER) queries", 0) \
\
@ -724,6 +729,7 @@ class IColumn;
M(UInt64, merge_tree_min_bytes_per_task_for_remote_reading, 4 * DBMS_DEFAULT_BUFFER_SIZE, "Min bytes to read per task.", 0) \
M(Bool, merge_tree_use_const_size_tasks_for_remote_reading, true, "Whether to use constant size tasks for reading from a remote table.", 0) \
M(Bool, merge_tree_determine_task_size_by_prewhere_columns, true, "Whether to use only prewhere columns size to determine reading task size.", 0) \
M(UInt64, merge_tree_compact_parts_min_granules_to_multibuffer_read, 16, "Only available in ClickHouse Cloud", 0) \
\
M(Bool, async_insert, false, "If true, data from INSERT query is stored in queue and later flushed to table in background. If wait_for_async_insert is false, INSERT query is processed almost instantly, otherwise client will wait until data will be flushed to table", 0) \
M(Bool, wait_for_async_insert, true, "If true wait for processing of asynchronous insertion", 0) \
@ -835,6 +841,10 @@ class IColumn;
M(Bool, print_pretty_type_names, false, "Print pretty type names in DESCRIBE query and toTypeName() function", 0) \
M(Bool, create_table_empty_primary_key_by_default, false, "Allow to create *MergeTree tables with empty primary key when ORDER BY and PRIMARY KEY not specified", 0) \
M(Bool, allow_named_collection_override_by_default, true, "Allow named collections' fields override by default.", 0)\
M(Bool, allow_experimental_shared_merge_tree, false, "Only available in ClickHouse Cloud", 0) \
M(UInt64, cache_warmer_threads, 4, "Only available in ClickHouse Cloud", 0) \
M(Int64, ignore_cold_parts_seconds, 0, "Only available in ClickHouse Cloud", 0) \
M(Int64, prefer_warmed_unmerged_parts_seconds, 0, "Only available in ClickHouse Cloud", 0) \
// End of COMMON_SETTINGS
// Please add settings related to formats into the FORMAT_FACTORY_SETTINGS, move obsolete settings to OBSOLETE_SETTINGS and obsolete format settings to OBSOLETE_FORMAT_SETTINGS.

View File

@ -39,6 +39,38 @@ bool ActiveDataPartSet::add(const MergeTreePartInfo & part_info, const String &
return outcome == AddPartOutcome::Added;
}
void ActiveDataPartSet::checkIntersectingParts(const MergeTreePartInfo & part_info) const
{
auto it = part_info_to_name.lower_bound(part_info);
/// Let's go left.
while (it != part_info_to_name.begin())
{
--it;
if (!part_info.contains(it->first))
{
if (!part_info.isDisjoint(it->first))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Part {} intersects previous part {}. It is a bug or a result of manual intervention in the ZooKeeper data.", part_info.getPartNameForLogs(), it->first.getPartNameForLogs());
++it;
break;
}
}
/// Let's go to the right.
while (it != part_info_to_name.end() && part_info.contains(it->first))
{
assert(part_info != it->first);
++it;
}
if (it != part_info_to_name.end() && !part_info.isDisjoint(it->first))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Part {} intersects next part {}. It is a bug or a result of manual intervention in the ZooKeeper data.", part_info.getPartNameForLogs(), it->first.getPartNameForLogs());
}
void ActiveDataPartSet::checkIntersectingParts(const String & name) const
{
auto part_info = MergeTreePartInfo::fromPartName(name, format_version);
checkIntersectingParts(part_info);
}
bool ActiveDataPartSet::add(const String & name, Strings * out_replaced_parts)
{

View File

@ -106,11 +106,16 @@ public:
MergeTreeDataFormatVersion getFormatVersion() const { return format_version; }
void checkIntersectingParts(const MergeTreePartInfo & part_info) const;
void checkIntersectingParts(const String & name) const;
private:
AddPartOutcome addImpl(const MergeTreePartInfo & part_info, const String & name, Strings * out_replaced_parts = nullptr, String * out_reason = nullptr);
MergeTreeDataFormatVersion format_version;
std::map<MergeTreePartInfo, String> part_info_to_name;
using PartInfoToName = std::map<MergeTreePartInfo, String>;
PartInfoToName part_info_to_name;
std::vector<std::map<MergeTreePartInfo, String>::const_iterator> getPartsCoveredByImpl(const MergeTreePartInfo & part_info) const;

View File

@ -60,10 +60,11 @@ bool BackgroundJobsAssignee::scheduleMergeMutateTask(ExecutableTaskPtr merge_tas
}
void BackgroundJobsAssignee::scheduleFetchTask(ExecutableTaskPtr fetch_task)
bool BackgroundJobsAssignee::scheduleFetchTask(ExecutableTaskPtr fetch_task)
{
bool res = getContext()->getFetchesExecutor()->trySchedule(fetch_task);
res ? trigger() : postpone();
return res;
}
@ -75,10 +76,11 @@ bool BackgroundJobsAssignee::scheduleMoveTask(ExecutableTaskPtr move_task)
}
void BackgroundJobsAssignee::scheduleCommonTask(ExecutableTaskPtr common_task, bool need_trigger)
bool BackgroundJobsAssignee::scheduleCommonTask(ExecutableTaskPtr common_task, bool need_trigger)
{
bool res = getContext()->getCommonExecutor()->trySchedule(common_task) && need_trigger;
res ? trigger() : postpone();
bool schedule_res = getContext()->getCommonExecutor()->trySchedule(common_task);
schedule_res && need_trigger ? trigger() : postpone();
return schedule_res;
}

View File

@ -66,9 +66,9 @@ public:
void finish();
bool scheduleMergeMutateTask(ExecutableTaskPtr merge_task);
void scheduleFetchTask(ExecutableTaskPtr fetch_task);
bool scheduleFetchTask(ExecutableTaskPtr fetch_task);
bool scheduleMoveTask(ExecutableTaskPtr move_task);
void scheduleCommonTask(ExecutableTaskPtr common_task, bool need_trigger);
bool scheduleCommonTask(ExecutableTaskPtr common_task, bool need_trigger);
/// Just call finish
~BackgroundJobsAssignee();

View File

@ -184,6 +184,23 @@ void DataPartStorageOnDiskFull::createHardLinkFrom(const IDataPartStorage & sour
});
}
void DataPartStorageOnDiskFull::copyFileFrom(const IDataPartStorage & source, const std::string & from, const std::string & to)
{
const auto * source_on_disk = typeid_cast<const DataPartStorageOnDiskFull *>(&source);
if (!source_on_disk)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot create copy file from different storage. Expected DataPartStorageOnDiskFull, got {}",
typeid(source).name());
/// Copying files between different disks is
/// not supported in disk transactions.
source_on_disk->getDisk()->copyFile(
fs::path(source_on_disk->getRelativePath()) / from,
*volume->getDisk(),
fs::path(root_path) / part_dir / to);
}
void DataPartStorageOnDiskFull::createProjection(const std::string & name)
{
executeWriteOperation([&](auto & disk) { disk.createDirectory(fs::path(root_path) / part_dir / name); });

View File

@ -48,6 +48,7 @@ public:
void removeFileIfExists(const String & name) override;
void createHardLinkFrom(const IDataPartStorage & source, const std::string & from, const std::string & to) override;
void copyFileFrom(const IDataPartStorage & source, const std::string & from, const std::string & to) override;
void beginTransaction() override;
void commitTransaction() override;

View File

@ -12,6 +12,7 @@
#include <optional>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Disks/IDiskTransaction.h>
#include <Storages/MergeTree/MergeTreeDataPartChecksum.h>
namespace DB
{
@ -304,6 +305,7 @@ public:
virtual SyncGuardPtr getDirectorySyncGuard() const { return nullptr; }
virtual void createHardLinkFrom(const IDataPartStorage & source, const std::string & from, const std::string & to) = 0;
virtual void copyFileFrom(const IDataPartStorage & source, const std::string & from, const std::string & to) = 0;
/// Rename part.
/// Ideally, new_root_path should be the same as current root (but it is not true).

View File

@ -86,13 +86,13 @@ protected:
/// Actual serialization of columns in part.
Serializations serializations;
UncompressedCache * uncompressed_cache;
MarkCache * mark_cache;
UncompressedCache * const uncompressed_cache;
MarkCache * const mark_cache;
MergeTreeReaderSettings settings;
StorageSnapshotPtr storage_snapshot;
MarkRanges all_mark_ranges;
const StorageSnapshotPtr storage_snapshot;
const MarkRanges all_mark_ranges;
/// Position and level (of nesting).
using ColumnNameLevel = std::optional<std::pair<String, size_t>>;

View File

@ -165,7 +165,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
std::optional<MergeTreeDataPartBuilder> builder;
if (global_ctx->parent_part)
{
auto data_part_storage = global_ctx->parent_part->getDataPartStorage().getProjection(local_tmp_part_basename);
auto data_part_storage = global_ctx->parent_part->getDataPartStorage().getProjection(local_tmp_part_basename, /* use parent transaction */ false);
builder.emplace(*global_ctx->data, global_ctx->future_part->name, data_part_storage);
builder->withParentPart(global_ctx->parent_part);
}
@ -185,11 +185,11 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
if (data_part_storage->exists())
throw Exception(ErrorCodes::DIRECTORY_ALREADY_EXISTS, "Directory {} already exists", data_part_storage->getFullPath());
data_part_storage->beginTransaction();
/// Background temp dirs cleaner will not touch tmp projection directory because
/// it's located inside part's directory
if (!global_ctx->parent_part)
{
data_part_storage->beginTransaction();
global_ctx->temporary_directory_lock = global_ctx->data->getTemporaryPartDirectoryHolder(local_tmp_part_basename);
}
global_ctx->all_column_names = global_ctx->metadata_snapshot->getColumns().getNamesOfPhysical();
global_ctx->storage_columns = global_ctx->metadata_snapshot->getColumns().getAllPhysical();

View File

@ -113,6 +113,13 @@ public:
return global_ctx->promise.get_future();
}
MergeTreeData::MutableDataPartPtr getUnfinishedPart()
{
if (global_ctx)
return global_ctx->new_data_part;
return nullptr;
}
bool execute();
private:

View File

@ -2080,6 +2080,9 @@ size_t MergeTreeData::clearOldTemporaryDirectories(const String & root_path, siz
if (disk->isBroken())
continue;
if (!disk->exists(root_path))
continue;
for (auto it = disk->iterateDirectory(root_path); it->isValid(); it->next())
{
const std::string & basename = it->name();
@ -7284,6 +7287,8 @@ PartitionCommandsResultInfo MergeTreeData::freezePartitionsByMatcher(
const String & with_name,
ContextPtr local_context)
{
auto settings = getSettings();
String clickhouse_path = fs::canonical(local_context->getPath());
String default_shadow_path = fs::path(clickhouse_path) / "shadow/";
fs::create_directories(default_shadow_path);
@ -7294,6 +7299,20 @@ PartitionCommandsResultInfo MergeTreeData::freezePartitionsByMatcher(
/// Acquire a snapshot of active data parts to prevent removing while doing backup.
const auto data_parts = getVisibleDataPartsVector(local_context);
bool has_zero_copy_part = false;
for (const auto & part : data_parts)
{
if (part->isStoredOnRemoteDiskWithZeroCopySupport())
{
has_zero_copy_part = true;
break;
}
}
if (supportsReplication() && settings->disable_freeze_partition_for_zero_copy_replication
&& settings->allow_remote_fs_zero_copy_replication && has_zero_copy_part)
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "FREEZE PARTITION queries are disabled.");
String backup_name = (!with_name.empty() ? escapeForFileName(with_name) : toString(increment));
String backup_path = fs::path(shadow_path) / backup_name / "";

View File

@ -36,6 +36,12 @@ struct Settings;
M(Float, ratio_of_defaults_for_sparse_serialization, 0.9375f, "Minimal ratio of number of default values to number of all values in column to store it in sparse serializations. If >= 1, columns will be always written in full serialization.", 0) \
M(Bool, replace_long_file_name_to_hash, false, "If the file name for column is too long (more than 'max_file_name_length' bytes) replace it to SipHash128", 0) \
M(UInt64, max_file_name_length, 127, "The maximal length of the file name to keep it as is without hashing", 0) \
M(UInt64, min_bytes_for_full_part_storage, 0, "Only available in ClickHouse Cloud", 0) \
M(UInt64, min_rows_for_full_part_storage, 0, "Only available in ClickHouse Cloud", 0) \
M(UInt64, compact_parts_max_bytes_to_buffer, 128 * 1024 * 1024, "Only available in ClickHouse Cloud", 0) \
M(UInt64, compact_parts_max_granules_to_buffer, 128, "Only available in ClickHouse Cloud", 0) \
M(UInt64, compact_parts_merge_max_bytes_to_prefetch_part, 16 * 1024 * 1024, "Only available in ClickHouse Cloud", 0) \
\
/** Merge settings. */ \
M(UInt64, merge_max_block_size, 8192, "How many rows in blocks should be formed for merge operations. By default has the same value as `index_granularity`.", 0) \
M(UInt64, merge_max_block_size_bytes, 10 * 1024 * 1024, "How many bytes in blocks should be formed for merge operations. By default has the same value as `index_granularity_bytes`.", 0) \
@ -124,6 +130,9 @@ struct Settings;
M(Milliseconds, wait_for_unique_parts_send_before_shutdown_ms, 0, "Before shutdown table will wait for required amount time for unique parts (exist only on current replica) to be fetched by other replicas (0 means disabled).", 0) \
M(Float, fault_probability_before_part_commit, 0, "For testing. Do not change it.", 0) \
M(Float, fault_probability_after_part_commit, 0, "For testing. Do not change it.", 0) \
M(Bool, shared_merge_tree_disable_merges_and_mutations_assignment, false, "Only available in ClickHouse Cloud", 0) \
M(Float, shared_merge_tree_partitions_hint_ratio_to_reload_merge_pred_for_mutations, 0.5, "Only available in ClickHouse Cloud", 0) \
M(UInt64, shared_merge_tree_parts_load_batch_size, 32, "Only available in ClickHouse Cloud", 0) \
\
/** Check delay of replicas settings. */ \
M(UInt64, min_relative_delay_to_measure, 120, "Calculate relative replica delay only if absolute delay is not less that this value.", 0) \
@ -131,6 +140,10 @@ struct Settings;
M(UInt64, max_cleanup_delay_period, 300, "Maximum period to clean old queue logs, blocks hashes and parts.", 0) \
M(UInt64, cleanup_delay_period_random_add, 10, "Add uniformly distributed value from 0 to x seconds to cleanup_delay_period to avoid thundering herd effect and subsequent DoS of ZooKeeper in case of very large number of tables.", 0) \
M(UInt64, cleanup_thread_preferred_points_per_iteration, 150, "Preferred batch size for background cleanup (points are abstract but 1 point is approximately equivalent to 1 inserted block).", 0) \
M(UInt64, cleanup_threads, 128, "Only available in ClickHouse Cloud", 0) \
M(UInt64, kill_delay_period, 30, "Only available in ClickHouse Cloud", 0) \
M(UInt64, kill_delay_period_random_add, 10, "Only available in ClickHouse Cloud", 0) \
M(UInt64, kill_threads, 128, "Only available in ClickHouse Cloud", 0) \
M(UInt64, min_relative_delay_to_close, 300, "Minimal delay from other replicas to close, stop serving requests and not return Ok during status check.", 0) \
M(UInt64, min_absolute_delay_to_close, 0, "Minimal absolute delay to close, stop serving requests and not return Ok during status check.", 0) \
M(UInt64, enable_vertical_merge_algorithm, 1, "Enable usage of Vertical merge algorithm.", 0) \
@ -170,13 +183,18 @@ struct Settings;
M(UInt64, zero_copy_merge_mutation_min_parts_size_sleep_before_lock, 1ULL * 1024 * 1024 * 1024, "If zero copy replication is enabled sleep random amount of time before trying to lock depending on parts size for merge or mutation", 0) \
M(Bool, allow_floating_point_partition_key, false, "Allow floating point as partition key", 0) \
M(UInt64, sleep_before_loading_outdated_parts_ms, 0, "For testing. Do not change it.", 0) \
M(Bool, always_use_copy_instead_of_hardlinks, false, "Always copy data instead of hardlinking during mutations/replaces/detaches and so on.", 0) \
M(Bool, disable_freeze_partition_for_zero_copy_replication, true, "Disable FREEZE PARTITION query for zero copy replication.", 0) \
M(Bool, disable_detach_partition_for_zero_copy_replication, true, "Disable DETACH PARTITION query for zero copy replication.", 0) \
M(Bool, disable_fetch_partition_for_zero_copy_replication, true, "Disable FETCH PARTITION query for zero copy replication.", 0) \
\
/** Experimental/work in progress feature. Unsafe for production. */ \
M(UInt64, part_moves_between_shards_enable, 0, "Experimental/Incomplete feature to move parts between shards. Does not take into account sharding expressions.", 0) \
M(UInt64, part_moves_between_shards_delay_seconds, 30, "Time to wait before/after moving parts between shards.", 0) \
M(Bool, allow_remote_fs_zero_copy_replication, false, "Don't use this setting in production, because it is not ready.", 0) \
M(String, remote_fs_zero_copy_zookeeper_path, "/clickhouse/zero_copy", "ZooKeeper path for zero-copy table-independent info.", 0) \
M(Bool, remote_fs_zero_copy_path_compatible_mode, false, "Run zero-copy in compatible mode during conversion process.", 0) \
M(Bool, remote_fs_zero_copy_path_compatible_mode, false, "Run zero-copy in compatible mode during conversion process.", 0) \
M(Bool, cache_populated_by_fetch, false, "Only available in ClickHouse Cloud", 0) \
M(Bool, allow_experimental_block_number_column, false, "Enable persisting column _block_number for each row.", 0) \
\
/** Compress marks and primary key. */ \

View File

@ -1357,6 +1357,7 @@ private:
NameSet removed_stats;
/// A stat file need to be renamed iff the column is renamed.
NameToNameMap renamed_stats;
for (const auto & command : ctx->for_file_renames)
{
if (command.type == MutationCommand::DROP_INDEX)
@ -1652,6 +1653,8 @@ private:
ctx->new_data_part->version.setCreationTID(tid, nullptr);
ctx->new_data_part->storeVersionMetadata();
auto settings = ctx->source_part->storage.getSettings();
NameSet hardlinked_files;
/// NOTE: Renames must be done in order
@ -1691,9 +1694,18 @@ private:
if (it->isFile())
{
ctx->new_data_part->getDataPartStorage().createHardLinkFrom(
ctx->source_part->getDataPartStorage(), file_name, destination);
hardlinked_files.insert(file_name);
if (settings->always_use_copy_instead_of_hardlinks)
{
ctx->new_data_part->getDataPartStorage().copyFileFrom(
ctx->source_part->getDataPartStorage(), it->name(), destination);
}
else
{
ctx->new_data_part->getDataPartStorage().createHardLinkFrom(
ctx->source_part->getDataPartStorage(), it->name(), destination);
hardlinked_files.insert(it->name());
}
}
else if (!endsWith(it->name(), ".tmp_proj")) // ignore projection tmp merge dir
{
@ -1705,11 +1717,20 @@ private:
for (auto p_it = projection_data_part_storage_src->iterate(); p_it->isValid(); p_it->next())
{
auto file_name_with_projection_prefix = fs::path(projection_data_part_storage_src->getPartDirectory()) / p_it->name();
projection_data_part_storage_dst->createHardLinkFrom(
*projection_data_part_storage_src, p_it->name(), p_it->name());
if (settings->always_use_copy_instead_of_hardlinks)
{
projection_data_part_storage_dst->copyFileFrom(
*projection_data_part_storage_src, p_it->name(), p_it->name());
}
else
{
auto file_name_with_projection_prefix = fs::path(projection_data_part_storage_src->getPartDirectory()) / p_it->name();
hardlinked_files.insert(file_name_with_projection_prefix);
projection_data_part_storage_dst->createHardLinkFrom(
*projection_data_part_storage_src, p_it->name(), p_it->name());
hardlinked_files.insert(file_name_with_projection_prefix);
}
}
}
}
@ -1973,19 +1994,20 @@ bool MutateTask::prepare()
IDataPartStorage::ClonePartParams clone_params
{
.txn = ctx->txn, .hardlinked_files = &ctx->hardlinked_files,
.files_to_copy_instead_of_hardlinks = std::move(files_to_copy_instead_of_hardlinks), .keep_metadata_version = true
.copy_instead_of_hardlink = settings_ptr->always_use_copy_instead_of_hardlinks,
.files_to_copy_instead_of_hardlinks = std::move(files_to_copy_instead_of_hardlinks),
.keep_metadata_version = true,
};
auto [part, lock] = ctx->data->cloneAndLoadDataPartOnSameDisk(
ctx->source_part,
prefix,
ctx->future_part->part_info,
ctx->metadata_snapshot,
clone_params,
ctx->context->getReadSettings(),
ctx->context->getWriteSettings());
part->getDataPartStorage().beginTransaction();
MergeTreeData::MutableDataPartPtr part;
scope_guard lock;
{
std::tie(part, lock) = ctx->data->cloneAndLoadDataPartOnSameDisk(
ctx->source_part, prefix, ctx->future_part->part_info, ctx->metadata_snapshot, clone_params, ctx->context->getReadSettings(), ctx->context->getWriteSettings());
part->getDataPartStorage().beginTransaction();
ctx->temporary_directory_lock = std::move(lock);
}
ctx->temporary_directory_lock = std::move(lock);
promise.set_value(std::move(part));
return false;
}

View File

@ -8,6 +8,7 @@
#include <base/sort.h>
#include <Backups/BackupEntriesCollector.h>
#include <Databases/IDatabase.h>
#include "Common/Exception.h"
#include <Common/MemoryTracker.h>
#include <Common/escapeForFileName.h>
#include <Common/ProfileEventsScope.h>
@ -2143,7 +2144,12 @@ void StorageMergeTree::movePartitionToTable(const StoragePtr & dest_table, const
Int64 temp_index = insert_increment.get();
MergeTreePartInfo dst_part_info(partition_id, temp_index, temp_index, src_part->info.level);
IDataPartStorage::ClonePartParams clone_params{.txn = local_context->getCurrentTransaction()};
IDataPartStorage::ClonePartParams clone_params
{
.txn = local_context->getCurrentTransaction(),
.copy_instead_of_hardlink = getSettings()->always_use_copy_instead_of_hardlinks,
};
auto [dst_part, part_lock] = dest_table_storage->cloneAndLoadDataPartOnSameDisk(
src_part,
TMP_PREFIX,
@ -2153,6 +2159,7 @@ void StorageMergeTree::movePartitionToTable(const StoragePtr & dest_table, const
local_context->getReadSettings(),
local_context->getWriteSettings()
);
dst_parts.emplace_back(std::move(dst_part));
dst_parts_locks.emplace_back(std::move(part_lock));
}

View File

@ -190,6 +190,7 @@ namespace ErrorCodes
extern const int TOO_LARGE_DISTRIBUTED_DEPTH;
extern const int TABLE_IS_DROPPED;
extern const int CANNOT_BACKUP_TABLE;
extern const int SUPPORT_IS_DISABLED;
}
namespace ActionLocks
@ -2688,7 +2689,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(LogEntry & entry)
IDataPartStorage::ClonePartParams clone_params
{
.copy_instead_of_hardlink = (our_zero_copy_enabled || source_zero_copy_enabled) && part_desc->src_table_part->isStoredOnRemoteDiskWithZeroCopySupport(),
.copy_instead_of_hardlink = storage_settings_ptr->always_use_copy_instead_of_hardlinks || ((our_zero_copy_enabled || source_zero_copy_enabled) && part_desc->src_table_part->isStoredOnRemoteDiskWithZeroCopySupport()),
.metadata_version_to_write = metadata_snapshot->getMetadataVersion()
};
auto [res_part, temporary_part_lock] = cloneAndLoadDataPartOnSameDisk(
@ -4777,7 +4778,12 @@ bool StorageReplicatedMergeTree::fetchPart(
get_part = [&, part_to_clone]()
{
chassert(!is_zero_copy_part(part_to_clone));
IDataPartStorage::ClonePartParams clone_params{ .keep_metadata_version = true };
IDataPartStorage::ClonePartParams clone_params
{
.copy_instead_of_hardlink = getSettings()->always_use_copy_instead_of_hardlinks,
.keep_metadata_version = true,
};
auto [cloned_part, lock] = cloneAndLoadDataPartOnSameDisk(
part_to_clone,
"tmp_clone_",
@ -4786,6 +4792,7 @@ bool StorageReplicatedMergeTree::fetchPart(
clone_params,
getContext()->getReadSettings(),
getContext()->getWriteSettings());
part_directory_lock = std::move(lock);
return cloned_part;
};
@ -6363,6 +6370,18 @@ void StorageReplicatedMergeTree::dropPartition(const ASTPtr & partition, bool de
if (!is_leader)
throw Exception(ErrorCodes::NOT_A_LEADER, "DROP PARTITION cannot be done on this replica because it is not a leader");
auto settings = getSettings();
if (detach && settings->disable_detach_partition_for_zero_copy_replication
&& settings->allow_remote_fs_zero_copy_replication)
{
for (const auto & disk : getDisks())
{
if (disk->supportZeroCopyReplication())
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "DETACH PARTITION queries are disabled.");
}
}
zkutil::ZooKeeperPtr zookeeper = getZooKeeperAndAssertNotReadonly();
const auto * partition_ast = partition->as<ASTPartition>();
@ -7022,6 +7041,8 @@ void StorageReplicatedMergeTree::fetchPartition(
bool fetch_part,
ContextPtr query_context)
{
auto settings = getSettings();
Macros::MacroExpansionInfo info;
info.expand_special_macros_only = false;
info.table_id = getStorageID();
@ -7032,6 +7053,16 @@ void StorageReplicatedMergeTree::fetchPartition(
if (from.empty())
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "ZooKeeper path should not be empty");
if (settings->disable_fetch_partition_for_zero_copy_replication
&& settings->allow_remote_fs_zero_copy_replication)
{
for (const auto & disk : getDisks())
{
if (disk->supportZeroCopyReplication())
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "FETCH PARTITION queries are disabled.");
}
}
zkutil::ZooKeeperPtr zookeeper;
if (from_zookeeper_name != default_zookeeper_name)
zookeeper = getContext()->getAuxiliaryZooKeeper(from_zookeeper_name);
@ -7884,7 +7915,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
|| dynamic_cast<const MergeTreeData *>(source_table.get())->getSettings()->allow_remote_fs_zero_copy_replication;
IDataPartStorage::ClonePartParams clone_params
{
.copy_instead_of_hardlink = zero_copy_enabled && src_part->isStoredOnRemoteDiskWithZeroCopySupport(),
.copy_instead_of_hardlink = storage_settings_ptr->always_use_copy_instead_of_hardlinks || (zero_copy_enabled && src_part->isStoredOnRemoteDiskWithZeroCopySupport()),
.metadata_version_to_write = metadata_snapshot->getMetadataVersion()
};
auto [dst_part, part_lock] = cloneAndLoadDataPartOnSameDisk(
@ -8132,7 +8163,7 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
IDataPartStorage::ClonePartParams clone_params
{
.copy_instead_of_hardlink = zero_copy_enabled && src_part->isStoredOnRemoteDiskWithZeroCopySupport(),
.copy_instead_of_hardlink = storage_settings_ptr->always_use_copy_instead_of_hardlinks || (zero_copy_enabled && src_part->isStoredOnRemoteDiskWithZeroCopySupport()),
.metadata_version_to_write = dest_metadata_snapshot->getMetadataVersion()
};
auto [dst_part, dst_part_lock] = dest_table_storage->cloneAndLoadDataPartOnSameDisk(

View File

@ -0,0 +1,7 @@
<clickhouse>
<merge_tree>
<disable_freeze_partition_for_zero_copy_replication>0</disable_freeze_partition_for_zero_copy_replication>
<disable_detach_partition_for_zero_copy_replication>0</disable_detach_partition_for_zero_copy_replication>
<disable_fetch_partition_for_zero_copy_replication>0</disable_fetch_partition_for_zero_copy_replication>
</merge_tree>
</clickhouse>

View File

@ -63,6 +63,7 @@ ln -sf $SRC_PATH/config.d/enable_wait_for_shutdown_replicated_tables.xml $DEST_S
ln -sf $SRC_PATH/config.d/backups.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/filesystem_caches_path.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/validate_tcp_client_information.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/zero_copy_destructive_operations.xml $DEST_SERVER_PATH/config.d/
# Not supported with fasttest.
if [ "${DEST_SERVER_PATH}" = "/etc/clickhouse-server" ]

View File

@ -156,6 +156,10 @@
<merge_tree>
<min_bytes_for_wide_part>0</min_bytes_for_wide_part>
<ratio_of_defaults_for_sparse_serialization>1.0</ratio_of_defaults_for_sparse_serialization>
<min_bytes_for_full_part_storage>0</min_bytes_for_full_part_storage>
<disable_freeze_partition_for_zero_copy_replication>0</disable_freeze_partition_for_zero_copy_replication>
<disable_detach_partition_for_zero_copy_replication>0</disable_detach_partition_for_zero_copy_replication>
<disable_fetch_partition_for_zero_copy_replication>0</disable_fetch_partition_for_zero_copy_replication>
</merge_tree>
<database_catalog_unused_dir_hide_timeout_sec>0</database_catalog_unused_dir_hide_timeout_sec>

View File

@ -69,6 +69,9 @@
<merge_tree>
<min_bytes_for_wide_part>1024</min_bytes_for_wide_part>
<old_parts_lifetime>1</old_parts_lifetime>
<disable_freeze_partition_for_zero_copy_replication>0</disable_freeze_partition_for_zero_copy_replication>
<disable_detach_partition_for_zero_copy_replication>0</disable_detach_partition_for_zero_copy_replication>
<disable_fetch_partition_for_zero_copy_replication>0</disable_fetch_partition_for_zero_copy_replication>
<allow_remote_fs_zero_copy_replication>true</allow_remote_fs_zero_copy_replication>
<ratio_of_defaults_for_sparse_serialization>1.0</ratio_of_defaults_for_sparse_serialization>
</merge_tree>

View File

@ -13,7 +13,7 @@ FREEZE_OUT_STRUCTURE='backup_name String, backup_path String , part_backup_path
# setup
${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS table_for_freeze_replicated SYNC;"
${CLICKHOUSE_CLIENT} --query "CREATE TABLE table_for_freeze_replicated (key UInt64, value String) ENGINE = ReplicatedMergeTree('/test/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/table_for_freeze_replicated', '1') ORDER BY key PARTITION BY key % 10;"
${CLICKHOUSE_CLIENT} --query "CREATE TABLE table_for_freeze_replicated (key UInt64, value String) ENGINE = ReplicatedMergeTree('/test/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/table_for_freeze_replicated', '1') ORDER BY key PARTITION BY key % 10 SETTINGS disable_freeze_partition_for_zero_copy_replication=0;"
${CLICKHOUSE_CLIENT} --insert_keeper_fault_injection_probability=0 --query "INSERT INTO table_for_freeze_replicated SELECT number, toString(number) from numbers(10);"
${CLICKHOUSE_CLIENT} --query "ALTER TABLE table_for_freeze_replicated FREEZE WITH NAME 'test_01417' FORMAT TSVWithNames SETTINGS alter_partition_verbose_result = 1;" \