mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 23:21:59 +00:00
Merge pull request #40031 from ClickHouse/fix_old_tmp_dirs_cleanup
Fix old temporary directories cleanup
This commit is contained in:
commit
46aaf842f8
@ -314,6 +314,8 @@ else
|
||||
|
||||
# Avoid "Setting allow_deprecated_database_ordinary is neither a builtin setting..."
|
||||
rm -f /etc/clickhouse-server/users.d/database_ordinary.xml ||:
|
||||
# Disable aggressive cleanup of tmp dirs (it worked incorrectly before 22.8)
|
||||
rm -f /etc/clickhouse-server/config.d/merge_tree_old_dirs_cleanup.xml ||:
|
||||
|
||||
start
|
||||
|
||||
|
@ -420,6 +420,13 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
|
||||
if (blocker.isCancelled())
|
||||
throw Exception("Fetching of part was cancelled", ErrorCodes::ABORTED);
|
||||
|
||||
/// It should be "tmp-fetch_" and not "tmp_fetch_", because we can fetch part to detached/,
|
||||
/// but detached part name prefix should not contain underscore.
|
||||
static const String TMP_PREFIX = "tmp-fetch_";
|
||||
String tmp_prefix = tmp_prefix_.empty() ? TMP_PREFIX : tmp_prefix_;
|
||||
String part_dir = tmp_prefix + part_name;
|
||||
auto temporary_directory_lock = data.getTemporaryPartDirectoryHolder(part_dir);
|
||||
|
||||
/// Validation of the input that may come from malicious replica.
|
||||
auto part_info = MergeTreePartInfo::fromPartName(part_name, data.format_version);
|
||||
const auto data_settings = data.getSettings();
|
||||
@ -555,7 +562,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
|
||||
|
||||
try
|
||||
{
|
||||
return downloadPartToDiskRemoteMeta(part_name, replica_path, to_detached, tmp_prefix_, disk, *in, throttler);
|
||||
return downloadPartToDiskRemoteMeta(part_name, replica_path, to_detached, tmp_prefix, disk, *in, throttler);
|
||||
}
|
||||
catch (const Exception & e)
|
||||
{
|
||||
@ -575,9 +582,11 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
|
||||
tryLogCurrentException(log);
|
||||
}
|
||||
|
||||
temporary_directory_lock = {};
|
||||
|
||||
/// Try again but without zero-copy
|
||||
return fetchPart(metadata_snapshot, context, part_name, replica_path, host, port, timeouts,
|
||||
user, password, interserver_scheme, throttler, to_detached, tmp_prefix_, nullptr, false, disk);
|
||||
user, password, interserver_scheme, throttler, to_detached, tmp_prefix, nullptr, false, disk);
|
||||
}
|
||||
}
|
||||
|
||||
@ -597,7 +606,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
|
||||
MergeTreeData::DataPart::Checksums checksums;
|
||||
return part_type == "InMemory"
|
||||
? downloadPartToMemory(part_name, part_uuid, metadata_snapshot, context, disk, *in, projections, throttler)
|
||||
: downloadPartToDisk(part_name, replica_path, to_detached, tmp_prefix_, sync, disk, *in, projections, checksums, throttler);
|
||||
: downloadPartToDisk(part_name, replica_path, to_detached, tmp_prefix, sync, disk, *in, projections, checksums, throttler);
|
||||
}
|
||||
|
||||
MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
|
||||
@ -755,7 +764,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDisk(
|
||||
const String & part_name,
|
||||
const String & replica_path,
|
||||
bool to_detached,
|
||||
const String & tmp_prefix_,
|
||||
const String & tmp_prefix,
|
||||
bool sync,
|
||||
DiskPtr disk,
|
||||
PooledReadWriteBufferFromHTTP & in,
|
||||
@ -763,8 +772,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDisk(
|
||||
MergeTreeData::DataPart::Checksums & checksums,
|
||||
ThrottlerPtr throttler)
|
||||
{
|
||||
static const String TMP_PREFIX = "tmp-fetch_";
|
||||
String tmp_prefix = tmp_prefix_.empty() ? TMP_PREFIX : tmp_prefix_;
|
||||
assert(!tmp_prefix.empty());
|
||||
|
||||
/// We will remove the directory if it already exists. Take precautions.
|
||||
if (tmp_prefix.empty() //-V560
|
||||
@ -836,7 +844,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDiskRemoteMeta(
|
||||
const String & part_name,
|
||||
const String & replica_path,
|
||||
bool to_detached,
|
||||
const String & tmp_prefix_,
|
||||
const String & tmp_prefix,
|
||||
DiskPtr disk,
|
||||
PooledReadWriteBufferFromHTTP & in,
|
||||
ThrottlerPtr throttler)
|
||||
@ -854,8 +862,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDiskRemoteMeta(
|
||||
|
||||
data.lockSharedDataTemporary(part_name, part_id, disk);
|
||||
|
||||
static const String TMP_PREFIX = "tmp-fetch_";
|
||||
String tmp_prefix = tmp_prefix_.empty() ? TMP_PREFIX : tmp_prefix_;
|
||||
assert(!tmp_prefix.empty());
|
||||
|
||||
String part_dir = tmp_prefix + part_name;
|
||||
String part_relative_path = data.getRelativeDataPath() + String(to_detached ? "detached/" : "");
|
||||
|
@ -143,10 +143,8 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
|
||||
if (global_ctx->data_part_storage_builder->exists())
|
||||
throw Exception("Directory " + global_ctx->data_part_storage_builder->getFullPath() + " already exists", ErrorCodes::DIRECTORY_ALREADY_EXISTS);
|
||||
|
||||
global_ctx->data->temporary_parts.add(local_tmp_part_basename);
|
||||
SCOPE_EXIT(
|
||||
global_ctx->data->temporary_parts.remove(local_tmp_part_basename);
|
||||
);
|
||||
if (!global_ctx->parent_part)
|
||||
global_ctx->temporary_directory_lock = global_ctx->data->getTemporaryPartDirectoryHolder(local_tmp_part_basename);
|
||||
|
||||
global_ctx->all_column_names = global_ctx->metadata_snapshot->getColumns().getNamesOfPhysical();
|
||||
global_ctx->storage_columns = global_ctx->metadata_snapshot->getColumns().getAllPhysical();
|
||||
|
@ -178,6 +178,8 @@ private:
|
||||
IMergedBlockOutputStream::WrittenOffsetColumns written_offset_columns{};
|
||||
|
||||
MergeTreeTransactionPtr txn;
|
||||
|
||||
scope_guard temporary_directory_lock;
|
||||
};
|
||||
|
||||
using GlobalRuntimeContextPtr = std::shared_ptr<GlobalRuntimeContext>;
|
||||
|
@ -1599,6 +1599,12 @@ size_t MergeTreeData::clearOldTemporaryDirectories(size_t custom_directories_lif
|
||||
return cleared_count;
|
||||
}
|
||||
|
||||
/// Marks the temporary part directory `part_dir_name` as in use by adding it to `temporary_parts`,
/// and returns a scope_guard whose callback removes that entry again.
/// NOTE(review): the callback captures `this`, so presumably the returned guard must not outlive
/// this MergeTreeData object - confirm against callers.
scope_guard MergeTreeData::getTemporaryPartDirectoryHolder(const String & part_dir_name)
|
||||
{
|
||||
temporary_parts.add(part_dir_name);
|
||||
/// `part_dir_name` is captured by value, so the callback keeps its own copy of the name.
return [this, part_dir_name]() { temporary_parts.remove(part_dir_name); };
|
||||
}
|
||||
|
||||
|
||||
MergeTreeData::DataPartsVector MergeTreeData::grabOldParts(bool force)
|
||||
{
|
||||
@ -2858,6 +2864,13 @@ void MergeTreeData::preparePartForCommit(MutableDataPartPtr & part, Transaction
|
||||
part->is_temp = false;
|
||||
part->setState(DataPartState::PreActive);
|
||||
|
||||
assert([&]()
|
||||
{
|
||||
String dir_name = fs::path(part->data_part_storage->getRelativePath()).filename();
|
||||
bool may_be_cleaned_up = dir_name.starts_with("tmp_") || dir_name.starts_with("tmp-fetch_");
|
||||
return !may_be_cleaned_up || temporary_parts.contains(dir_name);
|
||||
}());
|
||||
|
||||
part->renameTo(part->name, true, builder);
|
||||
|
||||
data_parts_indexes.insert(part);
|
||||
@ -5968,7 +5981,7 @@ MergeTreeData & MergeTreeData::checkStructureAndGetMergeTreeData(
|
||||
return checkStructureAndGetMergeTreeData(*source_table, src_snapshot, my_snapshot);
|
||||
}
|
||||
|
||||
MergeTreeData::MutableDataPartPtr MergeTreeData::cloneAndLoadDataPartOnSameDisk(
|
||||
std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> MergeTreeData::cloneAndLoadDataPartOnSameDisk(
|
||||
const MergeTreeData::DataPartPtr & src_part,
|
||||
const String & tmp_part_prefix,
|
||||
const MergeTreePartInfo & dst_part_info,
|
||||
@ -5996,6 +6009,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::cloneAndLoadDataPartOnSameDisk(
|
||||
String dst_part_name = src_part->getNewName(dst_part_info);
|
||||
assert(!tmp_part_prefix.empty());
|
||||
String tmp_dst_part_name = tmp_part_prefix + dst_part_name;
|
||||
auto temporary_directory_lock = getTemporaryPartDirectoryHolder(tmp_dst_part_name);
|
||||
|
||||
/// Why it is needed if we only hardlink files?
|
||||
auto reservation = src_part->data_part_storage->reserve(src_part->getBytesOnDisk());
|
||||
@ -6043,7 +6057,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::cloneAndLoadDataPartOnSameDisk(
|
||||
|
||||
dst_data_part->loadColumnsChecksumsIndexes(require_part_metadata, true);
|
||||
dst_data_part->modification_time = dst_part_storage->getLastModified().epochTime();
|
||||
return dst_data_part;
|
||||
return std::make_pair(dst_data_part, std::move(temporary_directory_lock));
|
||||
}
|
||||
|
||||
String MergeTreeData::getFullPathOnDisk(const DiskPtr & disk) const
|
||||
|
@ -638,7 +638,7 @@ public:
|
||||
|
||||
/// Delete all directories whose names begin with "tmp"
|
||||
/// Must be called with locked lockForShare() because it's using relative_data_path.
|
||||
size_t clearOldTemporaryDirectories(size_t custom_directories_lifetime_seconds, const NameSet & valid_prefixes = {"tmp_", });
|
||||
size_t clearOldTemporaryDirectories(size_t custom_directories_lifetime_seconds, const NameSet & valid_prefixes = {"tmp_", "tmp-fetch_"});
|
||||
|
||||
size_t clearEmptyParts();
|
||||
|
||||
@ -792,7 +792,7 @@ public:
|
||||
NameSet hardlinks_from_source_part;
|
||||
};
|
||||
|
||||
MergeTreeData::MutableDataPartPtr cloneAndLoadDataPartOnSameDisk(
|
||||
std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> cloneAndLoadDataPartOnSameDisk(
|
||||
const MergeTreeData::DataPartPtr & src_part, const String & tmp_part_prefix,
|
||||
const MergeTreePartInfo & dst_part_info, const StorageMetadataPtr & metadata_snapshot,
|
||||
const MergeTreeTransactionPtr & txn, HardlinkedFiles * hardlinked_files,
|
||||
@ -1001,6 +1001,9 @@ public:
|
||||
/// Used for freezePartitionsByMatcher and unfreezePartitionsByMatcher
|
||||
using MatcherFn = std::function<bool(const String &)>;
|
||||
|
||||
/// Returns an object that protects temporary directory from cleanup
|
||||
scope_guard getTemporaryPartDirectoryHolder(const String & part_dir_name);
|
||||
|
||||
protected:
|
||||
friend class IMergeTreeDataPart;
|
||||
friend class MergeTreeDataMergerMutator;
|
||||
|
@ -321,6 +321,9 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPart(
|
||||
else
|
||||
part_name = new_part_info.getPartName();
|
||||
|
||||
String part_dir = TMP_PREFIX + part_name;
|
||||
temp_part.temporary_directory_lock = data.getTemporaryPartDirectoryHolder(part_dir);
|
||||
|
||||
/// If we need to calculate some columns to sort.
|
||||
if (metadata_snapshot->hasSortingKey() || metadata_snapshot->hasSecondaryIndices())
|
||||
data.getSortingKeyAndSkipIndicesExpression(metadata_snapshot)->execute(block);
|
||||
|
@ -64,6 +64,8 @@ public:
|
||||
|
||||
std::vector<Stream> streams;
|
||||
|
||||
scope_guard temporary_directory_lock;
|
||||
|
||||
void finalize();
|
||||
};
|
||||
|
||||
|
@ -736,6 +736,8 @@ struct MutationContext
|
||||
MergeTreeTransactionPtr txn;
|
||||
|
||||
MergeTreeData::HardlinkedFiles hardlinked_files;
|
||||
|
||||
scope_guard temporary_directory_lock;
|
||||
};
|
||||
|
||||
using MutationContextPtr = std::shared_ptr<MutationContext>;
|
||||
@ -1501,7 +1503,9 @@ bool MutateTask::prepare()
|
||||
ctx->storage_from_source_part, ctx->metadata_snapshot, ctx->commands_for_part, Context::createCopy(context_for_reading)))
|
||||
{
|
||||
LOG_TRACE(ctx->log, "Part {} doesn't change up to mutation version {}", ctx->source_part->name, ctx->future_part->part_info.mutation);
|
||||
promise.set_value(ctx->data->cloneAndLoadDataPartOnSameDisk(ctx->source_part, "tmp_clone_", ctx->future_part->part_info, ctx->metadata_snapshot, ctx->txn, &ctx->hardlinked_files, false));
|
||||
auto [part, lock] = ctx->data->cloneAndLoadDataPartOnSameDisk(ctx->source_part, "tmp_clone_", ctx->future_part->part_info, ctx->metadata_snapshot, ctx->txn, &ctx->hardlinked_files, false);
|
||||
ctx->temporary_directory_lock = std::move(lock);
|
||||
promise.set_value(std::move(part));
|
||||
return false;
|
||||
}
|
||||
else
|
||||
@ -1532,15 +1536,18 @@ bool MutateTask::prepare()
|
||||
/// FIXME new_data_part is not used in the case when we clone part with cloneAndLoadDataPartOnSameDisk and return false
|
||||
/// Is it possible to handle this case earlier?
|
||||
|
||||
String tmp_part_dir_name = "tmp_mut_" + ctx->future_part->name;
|
||||
ctx->temporary_directory_lock = ctx->data->getTemporaryPartDirectoryHolder(tmp_part_dir_name);
|
||||
|
||||
auto data_part_storage = std::make_shared<DataPartStorageOnDisk>(
|
||||
single_disk_volume,
|
||||
ctx->data->getRelativeDataPath(),
|
||||
"tmp_mut_" + ctx->future_part->name);
|
||||
tmp_part_dir_name);
|
||||
|
||||
ctx->data_part_storage_builder = std::make_shared<DataPartStorageBuilderOnDisk>(
|
||||
single_disk_volume,
|
||||
ctx->data->getRelativeDataPath(),
|
||||
"tmp_mut_" + ctx->future_part->name);
|
||||
tmp_part_dir_name);
|
||||
|
||||
ctx->new_data_part = ctx->data->createPart(
|
||||
ctx->future_part->name, ctx->future_part->type, ctx->future_part->part_info, data_part_storage);
|
||||
@ -1608,7 +1615,11 @@ bool MutateTask::prepare()
|
||||
&& ctx->files_to_rename.empty())
|
||||
{
|
||||
LOG_TRACE(ctx->log, "Part {} doesn't change up to mutation version {} (optimized)", ctx->source_part->name, ctx->future_part->part_info.mutation);
|
||||
promise.set_value(ctx->data->cloneAndLoadDataPartOnSameDisk(ctx->source_part, "tmp_mut_", ctx->future_part->part_info, ctx->metadata_snapshot, ctx->txn, &ctx->hardlinked_files, false));
|
||||
/// new_data_part is not used here, another part is created instead (see the comment above)
|
||||
ctx->temporary_directory_lock = {};
|
||||
auto [part, lock] = ctx->data->cloneAndLoadDataPartOnSameDisk(ctx->source_part, "tmp_mut_", ctx->future_part->part_info, ctx->metadata_snapshot, ctx->txn, &ctx->hardlinked_files, false);
|
||||
ctx->temporary_directory_lock = std::move(lock);
|
||||
promise.set_value(std::move(part));
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -1,24 +1,34 @@
|
||||
#include <Storages/MergeTree/TemporaryParts.h>
|
||||
#include <Common/Exception.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
/// Returns true if `basename` is currently present in `parts`.
/// The lookup is performed while holding `mutex`.
bool TemporaryParts::contains(const std::string & basename) const
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
return parts.contains(basename);
|
||||
}
|
||||
|
||||
/// Inserts `basename` into `parts` while holding `mutex`.
/// The variant that checks `inserted` throws LOGICAL_ERROR when `basename` is already present,
/// i.e. the same temporary directory name must not be registered twice.
/// NOTE(review): this span shows two variants of the function interleaved (by-value signature with
/// an unchecked emplace, and const-reference signature with the `inserted` check); the latter
/// appears to be the post-change version - confirm against the actual file.
void TemporaryParts::add(std::string basename)
|
||||
void TemporaryParts::add(const std::string & basename)
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
parts.emplace(std::move(basename));
|
||||
/// `.second` of the emplace result is false when an equal element already existed.
bool inserted = parts.emplace(basename).second;
|
||||
if (!inserted)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Temporary part {} already added", basename);
|
||||
}
|
||||
|
||||
/// Erases `basename` from `parts` while holding `mutex`.
/// The variant that checks `removed` throws LOGICAL_ERROR when `basename` was not present.
/// NOTE(review): MergeTreeData::getTemporaryPartDirectoryHolder calls this function from the
/// callback of the scope_guard it returns. If that callback is run from the guard's destructor,
/// an exception thrown here could not propagate safely (possible std::terminate) - confirm
/// scope_guard's behavior and consider reporting the error without throwing.
void TemporaryParts::remove(const std::string & basename)
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
parts.erase(basename);
|
||||
/// erase-by-key returns the number of removed elements; zero converts to false.
bool removed = parts.erase(basename);
|
||||
if (!removed)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Temporary part {} does not exist", basename);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -18,6 +18,10 @@ private:
|
||||
/// NOTE: It is pretty short, so using STL is fine.
|
||||
std::unordered_set<std::string> parts;
|
||||
|
||||
void add(const std::string & basename);
|
||||
void remove(const std::string & basename);
|
||||
|
||||
friend class MergeTreeData;
|
||||
public:
|
||||
/// Returns true if passed part name is active.
|
||||
/// (is the destination for one of active mutation/merge).
|
||||
@ -25,9 +29,6 @@ public:
|
||||
/// NOTE: it accepts a basename (i.e. dirname), not the path,
|
||||
/// since the latter requires canonical form.
|
||||
bool contains(const std::string & basename) const;
|
||||
|
||||
void add(std::string basename);
|
||||
void remove(const std::string & basename);
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -123,7 +123,7 @@ void StorageMergeTree::startup()
|
||||
|
||||
/// Temporary directories contain incomplete results of merges (after forced restart)
|
||||
/// and don't allow to reinitialize them, so delete each of them immediately
|
||||
clearOldTemporaryDirectories(0, {"tmp_", "delete_tmp_"});
|
||||
clearOldTemporaryDirectories(0, {"tmp_", "delete_tmp_", "tmp-fetch_"});
|
||||
|
||||
/// NOTE background task will also do the above cleanups periodically.
|
||||
time_after_previous_cleanup_parts.restart();
|
||||
@ -1594,6 +1594,7 @@ void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, con
|
||||
|
||||
DataPartsVector src_parts = src_data.getVisibleDataPartsVectorInPartition(local_context, partition_id);
|
||||
MutableDataPartsVector dst_parts;
|
||||
std::vector<scope_guard> dst_parts_locks;
|
||||
|
||||
static const String TMP_PREFIX = "tmp_replace_from_";
|
||||
|
||||
@ -1608,8 +1609,9 @@ void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, con
|
||||
Int64 temp_index = insert_increment.get();
|
||||
MergeTreePartInfo dst_part_info(partition_id, temp_index, temp_index, src_part->info.level);
|
||||
|
||||
auto dst_part = cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, my_metadata_snapshot, local_context->getCurrentTransaction(), {}, false);
|
||||
auto [dst_part, part_lock] = cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, my_metadata_snapshot, local_context->getCurrentTransaction(), {}, false);
|
||||
dst_parts.emplace_back(std::move(dst_part));
|
||||
dst_parts_locks.emplace_back(std::move(part_lock));
|
||||
}
|
||||
|
||||
/// ATTACH empty part set
|
||||
@ -1687,6 +1689,7 @@ void StorageMergeTree::movePartitionToTable(const StoragePtr & dest_table, const
|
||||
|
||||
DataPartsVector src_parts = src_data.getVisibleDataPartsVectorInPartition(local_context, partition_id);
|
||||
MutableDataPartsVector dst_parts;
|
||||
std::vector<scope_guard> dst_parts_locks;
|
||||
|
||||
static const String TMP_PREFIX = "tmp_move_from_";
|
||||
|
||||
@ -1701,8 +1704,9 @@ void StorageMergeTree::movePartitionToTable(const StoragePtr & dest_table, const
|
||||
Int64 temp_index = insert_increment.get();
|
||||
MergeTreePartInfo dst_part_info(partition_id, temp_index, temp_index, src_part->info.level);
|
||||
|
||||
auto dst_part = dest_table_storage->cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, dest_metadata_snapshot, local_context->getCurrentTransaction(), {}, false);
|
||||
auto [dst_part, part_lock] = dest_table_storage->cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, dest_metadata_snapshot, local_context->getCurrentTransaction(), {}, false);
|
||||
dst_parts.emplace_back(std::move(dst_part));
|
||||
dst_parts_locks.emplace_back(std::move(part_lock));
|
||||
}
|
||||
|
||||
/// empty part set
|
||||
|
@ -466,7 +466,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
|
||||
}
|
||||
/// Temporary directories contain uninitialized results of Merges or Fetches (after forced restart),
|
||||
/// don't allow to reinitialize them, delete each of them immediately.
|
||||
clearOldTemporaryDirectories(0, {"tmp_", "delete_tmp_"});
|
||||
clearOldTemporaryDirectories(0, {"tmp_", "delete_tmp_", "tmp-fetch_"});
|
||||
clearOldWriteAheadLogs();
|
||||
if (getSettings()->merge_tree_enable_clear_old_broken_detached)
|
||||
clearOldBrokenPartsFromDetachedDirecory();
|
||||
@ -1957,6 +1957,8 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
|
||||
String replica;
|
||||
|
||||
MergeTreeData::HardlinkedFiles hardlinked_files;
|
||||
|
||||
scope_guard temporary_part_lock;
|
||||
};
|
||||
|
||||
using PartDescriptionPtr = std::shared_ptr<PartDescription>;
|
||||
@ -2203,8 +2205,10 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
|
||||
if (part_desc->checksum_hex != part_desc->src_table_part->checksums.getTotalChecksumHex())
|
||||
throw Exception("Checksums of " + part_desc->src_table_part->name + " is suddenly changed", ErrorCodes::UNFINISHED);
|
||||
|
||||
part_desc->res_part = cloneAndLoadDataPartOnSameDisk(
|
||||
auto [res_part, temporary_part_lock] = cloneAndLoadDataPartOnSameDisk(
|
||||
part_desc->src_table_part, TMP_PREFIX + "clone_", part_desc->new_part_info, metadata_snapshot, NO_TRANSACTION_PTR, &part_desc->hardlinked_files, false);
|
||||
part_desc->res_part = std::move(res_part);
|
||||
part_desc->temporary_part_lock = std::move(temporary_part_lock);
|
||||
}
|
||||
else if (!part_desc->replica.empty())
|
||||
{
|
||||
@ -3943,12 +3947,15 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Stora
|
||||
std::optional<CurrentlySubmergingEmergingTagger> tagger_ptr;
|
||||
std::function<MutableDataPartPtr()> get_part;
|
||||
MergeTreeData::HardlinkedFiles hardlinked_files;
|
||||
scope_guard part_to_clone_lock;
|
||||
|
||||
if (part_to_clone)
|
||||
{
|
||||
get_part = [&, part_to_clone]()
|
||||
{
|
||||
return cloneAndLoadDataPartOnSameDisk(part_to_clone, "tmp_clone_", part_info, metadata_snapshot, NO_TRANSACTION_PTR, &hardlinked_files, false);
|
||||
auto [cloned_part, lock] = cloneAndLoadDataPartOnSameDisk(part_to_clone, "tmp_clone_", part_info, metadata_snapshot, NO_TRANSACTION_PTR, &hardlinked_files, false);
|
||||
part_to_clone_lock = std::move(lock);
|
||||
return cloned_part;
|
||||
};
|
||||
}
|
||||
else
|
||||
@ -4044,7 +4051,10 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Stora
|
||||
/// The same part is being written right now (but probably it's not committed yet).
|
||||
/// We will check the need for fetch later.
|
||||
if (e.code() == ErrorCodes::DIRECTORY_ALREADY_EXISTS)
|
||||
{
|
||||
LOG_TRACE(log, "Not fetching part: {}", e.message());
|
||||
return false;
|
||||
}
|
||||
|
||||
throw;
|
||||
}
|
||||
@ -4156,7 +4166,10 @@ DataPartStoragePtr StorageReplicatedMergeTree::fetchExistsPart(
|
||||
/// The same part is being written right now (but probably it's not committed yet).
|
||||
/// We will check the need for fetch later.
|
||||
if (e.code() == ErrorCodes::DIRECTORY_ALREADY_EXISTS)
|
||||
{
|
||||
LOG_TRACE(log, "Not fetching part: {}", e.message());
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
throw;
|
||||
}
|
||||
@ -6377,6 +6390,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
|
||||
{
|
||||
DataPartsVector src_parts;
|
||||
MutableDataPartsVector dst_parts;
|
||||
std::vector<scope_guard> dst_parts_locks;
|
||||
Strings block_id_paths;
|
||||
Strings part_checksums;
|
||||
std::vector<EphemeralLockInZooKeeper> ephemeral_locks;
|
||||
@ -6443,10 +6457,11 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
|
||||
bool copy_instead_of_hardlink = storage_settings_ptr->allow_remote_fs_zero_copy_replication
|
||||
&& src_part->isStoredOnRemoteDiskWithZeroCopySupport();
|
||||
|
||||
auto dst_part = cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, metadata_snapshot, NO_TRANSACTION_PTR, &hardlinked_files, copy_instead_of_hardlink);
|
||||
auto [dst_part, part_lock] = cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, metadata_snapshot, NO_TRANSACTION_PTR, &hardlinked_files, copy_instead_of_hardlink);
|
||||
|
||||
src_parts.emplace_back(src_part);
|
||||
dst_parts.emplace_back(dst_part);
|
||||
dst_parts_locks.emplace_back(std::move(part_lock));
|
||||
ephemeral_locks.emplace_back(std::move(*lock));
|
||||
block_id_paths.emplace_back(block_id_path);
|
||||
part_checksums.emplace_back(hash_hex);
|
||||
@ -6646,6 +6661,7 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
|
||||
Coordination::Stat dest_alter_partition_version_stat;
|
||||
zookeeper->get(dest_alter_partition_version_path, &dest_alter_partition_version_stat);
|
||||
std::vector<MergeTreeData::HardlinkedFiles> hardlinked_files_for_parts;
|
||||
std::vector<scope_guard> temporary_parts_locks;
|
||||
|
||||
for (const auto & src_part : src_all_parts)
|
||||
{
|
||||
@ -6672,10 +6688,11 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
|
||||
bool copy_instead_of_hardlink = storage_settings_ptr->allow_remote_fs_zero_copy_replication
|
||||
&& src_part->isStoredOnRemoteDiskWithZeroCopySupport();
|
||||
|
||||
auto dst_part = dest_table_storage->cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, dest_metadata_snapshot, NO_TRANSACTION_PTR, &hardlinked_files, copy_instead_of_hardlink);
|
||||
auto [dst_part, dst_part_lock] = dest_table_storage->cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, dest_metadata_snapshot, NO_TRANSACTION_PTR, &hardlinked_files, copy_instead_of_hardlink);
|
||||
|
||||
src_parts.emplace_back(src_part);
|
||||
dst_parts.emplace_back(dst_part);
|
||||
temporary_parts_locks.emplace_back(std::move(dst_part_lock));
|
||||
ephemeral_locks.emplace_back(std::move(*lock));
|
||||
block_id_paths.emplace_back(block_id_path);
|
||||
part_checksums.emplace_back(hash_hex);
|
||||
|
8
tests/config/config.d/merge_tree_old_dirs_cleanup.xml
Normal file
8
tests/config/config.d/merge_tree_old_dirs_cleanup.xml
Normal file
@ -0,0 +1,8 @@
|
||||
<clickhouse>
|
||||
<merge_tree>
|
||||
<!-- Default is 86400 (1 day), but we have protection from removal of tmp dirs that are currently in use -->
|
||||
<temporary_directories_lifetime>1</temporary_directories_lifetime>
|
||||
<!-- Default is 60 seconds, but let's make tests more aggressive -->
|
||||
<merge_tree_clear_old_temporary_directories_interval_seconds>10</merge_tree_clear_old_temporary_directories_interval_seconds>
|
||||
</merge_tree>
|
||||
</clickhouse>
|
@ -29,6 +29,7 @@ ln -sf $SRC_PATH/config.d/graphite.xml $DEST_SERVER_PATH/config.d/
|
||||
ln -sf $SRC_PATH/config.d/database_atomic.xml $DEST_SERVER_PATH/config.d/
|
||||
ln -sf $SRC_PATH/config.d/max_concurrent_queries.xml $DEST_SERVER_PATH/config.d/
|
||||
ln -sf $SRC_PATH/config.d/merge_tree_settings.xml $DEST_SERVER_PATH/config.d/
|
||||
ln -sf $SRC_PATH/config.d/merge_tree_old_dirs_cleanup.xml $DEST_SERVER_PATH/config.d/
|
||||
ln -sf $SRC_PATH/config.d/test_cluster_with_incorrect_pw.xml $DEST_SERVER_PATH/config.d/
|
||||
ln -sf $SRC_PATH/config.d/keeper_port.xml $DEST_SERVER_PATH/config.d/
|
||||
ln -sf $SRC_PATH/config.d/logging_no_rotate.xml $DEST_SERVER_PATH/config.d/
|
||||
|
@ -0,0 +1 @@
|
||||
10 45
|
@ -0,0 +1,11 @@
|
||||
DROP TABLE IF EXISTS test_inserts;
|
||||
|
||||
CREATE TABLE test_inserts (`key` Int, `part` Int) ENGINE = MergeTree PARTITION BY part ORDER BY key
|
||||
SETTINGS temporary_directories_lifetime = 0, merge_tree_clear_old_temporary_directories_interval_seconds = 0;
|
||||
|
||||
INSERT INTO test_inserts SELECT sleep(1), number FROM numbers(10)
|
||||
SETTINGS max_insert_delayed_streams_for_parallel_write = 100, max_insert_block_size = 1, min_insert_block_size_rows = 1;
|
||||
|
||||
SELECT count(), sum(part) FROM test_inserts;
|
||||
|
||||
DROP TABLE test_inserts;
|
Loading…
Reference in New Issue
Block a user