ClickHouse/src/Storages/StorageMergeTree.cpp
2023-08-23 18:59:43 +02:00

2362 lines
92 KiB
C++

#include "StorageMergeTree.h"
#include "Core/QueryProcessingStage.h"
#include "Storages/MergeTree/IMergeTreeDataPart.h"
#include <optional>
#include <ranges>
#include <base/sort.h>
#include <Backups/BackupEntriesCollector.h>
#include <Databases/IDatabase.h>
#include <Common/MemoryTracker.h>
#include <Common/escapeForFileName.h>
#include <Common/ProfileEventsScope.h>
#include <Common/typeid_cast.h>
#include <Common/ThreadPool.h>
#include <Interpreters/InterpreterAlterQuery.h>
#include <Interpreters/PartLog.h>
#include <Interpreters/MutationsInterpreter.h>
#include <Interpreters/Context.h>
#include <Interpreters/TransactionLog.h>
#include <Interpreters/ClusterProxy/executeQuery.h>
#include <Interpreters/ClusterProxy/SelectStreamFactory.h>
#include <Interpreters/InterpreterSelectQueryAnalyzer.h>
#include <IO/copyData.h>
#include <Parsers/ASTCheckQuery.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTPartition.h>
#include <Parsers/ASTSetQuery.h>
#include <Parsers/queryToString.h>
#include <Parsers/formatAST.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/ActiveDataPartSet.h>
#include <Storages/AlterCommands.h>
#include <Storages/PartitionCommands.h>
#include <Storages/MergeTree/MergeTreeSink.h>
#include <Storages/MergeTree/MergeTreeDataPartInMemory.h>
#include <Storages/MergeTree/MergePlainMergeTreeTask.h>
#include <Storages/MergeTree/PartitionPruner.h>
#include <Storages/MergeTree/MergeList.h>
#include <Storages/MergeTree/checkDataPart.h>
#include <QueryPipeline/Pipe.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <fmt/core.h>
namespace DB
{
/// Error codes referenced from this translation unit.
/// They are only declared here (`extern`); the definitions live elsewhere.
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int LOGICAL_ERROR;
extern const int NOT_ENOUGH_SPACE;
extern const int BAD_ARGUMENTS;
extern const int INCORRECT_DATA;
extern const int CANNOT_ASSIGN_OPTIMIZE;
extern const int TIMEOUT_EXCEEDED;
extern const int UNKNOWN_POLICY;
extern const int NO_SUCH_DATA_PART;
extern const int ABORTED;
}
/// Action block types (merges, TTL merges, moves) referenced from this translation unit.
/// They are only declared here (`extern`); the definitions live elsewhere.
namespace ActionLocks
{
extern const StorageActionBlockType PartsMerge;
extern const StorageActionBlockType PartsTTLMerge;
extern const StorageActionBlockType PartsMove;
}
/// Tries to find the running transaction that created the given mutation entry.
///
/// Returns an empty pointer when the mutation has a prehistoric TID, or when
/// TransactionLog has no running transaction for the mutation's TID hash.
/// In the latter case a warning is written to `log` if one is provided.
static MergeTreeTransactionPtr tryGetTransactionForMutation(const MergeTreeMutationEntry & mutation, Poco::Logger * log = nullptr)
{
    assert(!mutation.tid.isEmpty());

    /// Prehistoric TID: there is no transaction to look up.
    if (mutation.tid.isPrehistoric())
        return {};

    if (auto running_txn = TransactionLog::instance().tryGetRunningTransaction(mutation.tid.getHash()))
        return running_txn;

    if (log)
        LOG_WARNING(log, "Cannot find transaction {} which had started mutation {}, probably it finished", mutation.tid, mutation.file_name);

    return {};
}
/// Constructs the storage on top of MergeTreeData, then loads data parts,
/// mutations and the deduplication log.
///
/// Throws INCORRECT_DATA if the table is being created (not attached), is not a
/// static storage, and data parts were nevertheless found after loading.
StorageMergeTree::StorageMergeTree(
    const StorageID & table_id_,
    const String & relative_data_path_,
    const StorageInMemoryMetadata & metadata_,
    bool attach,
    ContextMutablePtr context_,
    const String & date_column_name,
    const MergingParams & merging_params_,
    std::unique_ptr<MergeTreeSettings> storage_settings_,
    bool has_force_restore_data_flag)
    : MergeTreeData(
        table_id_,
        metadata_,
        context_,
        date_column_name,
        merging_params_,
        std::move(storage_settings_),
        /* require_part_metadata = */ false,
        attach)
    , reader(*this)
    , writer(*this)
    , merger_mutator(*this)
{
    initializeDirectoriesAndFormatVersion(relative_data_path_, attach, date_column_name);
    loadDataParts(has_force_restore_data_flag);

    /// A freshly created (non-attached, non-static) table must start without any parts.
    const bool unexpected_parts_on_create = !attach && !getDataPartsForInternalUsage().empty() && !isStaticStorage();
    if (unexpected_parts_on_create)
    {
        throw Exception(ErrorCodes::INCORRECT_DATA,
            "Data directory for table already containing data parts - probably "
            "it was unclean DROP table or manual intervention. "
            "You must either clear directory by hand or use ATTACH TABLE instead "
            "of CREATE TABLE if you need to use that parts.");
    }

    /// Start the block number counter from the maximum block number reported for loaded parts.
    increment.set(getMaxBlockNumber());

    loadMutations();
    loadDeduplicationLog();
}
/// Performs initial cleanup and, unless the storage is static, starts background tasks.
///
/// If starting the background tasks throws, shutdown() is called before the
/// exception is rethrown; if shutdown() itself throws, the process is terminated.
void StorageMergeTree::startup()
{
    clearOldWriteAheadLogs();
    clearEmptyParts();

    /// Temporary directories hold incomplete results of merges (e.g. after a forced restart)
    /// and cannot be reinitialized, so all of them are removed right away.
    clearOldTemporaryDirectories(0, {"tmp_", "delete_tmp_", "tmp-fetch_"});

    /// NOTE: the background task repeats the cleanups above periodically,
    /// so restart the timers that track the time since the previous cleanup.
    time_after_previous_cleanup_parts.restart();
    time_after_previous_cleanup_temporary_directories.restart();

    /// Storages with static data files never get background jobs.
    if (isStaticStorage())
        return;

    try
    {
        background_operations_assignee.start();
        startBackgroundMovesIfNeeded();
        startOutdatedDataPartsLoadingTask();
    }
    catch (...)
    {
        /// Exception safety: a failed "startup" must not require the caller to call "shutdown",
        /// and the table must be safely destructible afterwards. Therefore no background
        /// task that would have to be waited for may survive a failed "startup".
        try
        {
            shutdown();
        }
        catch (...)
        {
            std::terminate();
        }

        /// Note: after a failed "startup" the only allowed operation on the table is destroying it.
        throw;
    }
}
/// Stops background activity of the table. Only the first call does any work:
/// `shutdown_called` is atomically set and later calls return immediately.
void StorageMergeTree::shutdown()
{
    const bool was_already_called = shutdown_called.exchange(true);
    if (was_already_called)
        return;

    stopOutdatedDataPartsLoadingTask();

    /// Wake up everything that is waiting for a mutation.
    {
        std::lock_guard wait_lock(mutation_wait_mutex);
        mutation_wait_event.notify_all();
    }

    /// Cancel merges and moves, then finish both background assignees.
    merger_mutator.merges_blocker.cancelForever();
    parts_mover.moves_blocker.cancelForever();

    background_operations_assignee.finish();
    background_moves_assignee.finish();

    if (deduplication_log)
        deduplication_log->shutdown();
}
/// The destructor calls shutdown(); shutdown() returns immediately if it has already been called.
StorageMergeTree::~StorageMergeTree()
{
shutdown();
}
/// Builds the query plan for reading from this table.
///
/// Two paths exist:
///  - parallel replicas on the initiator: the SELECT is rewritten for this table and
///    executed through ClusterProxy over the cluster named by `cluster_for_parallel_replicas`;
///  - otherwise: the local MergeTree reader builds the plan (with parallel reading enabled
///    when this node may act as a parallel-replicas follower).
/// Both parallel-replicas modes additionally require `parallel_replicas_for_non_replicated_merge_tree`
/// and that parallel replicas are not disabled in `query_info`.
void StorageMergeTree::read(
    QueryPlan & query_plan,
    const Names & column_names,
    const StorageSnapshotPtr & storage_snapshot,
    SelectQueryInfo & query_info,
    ContextPtr local_context,
    QueryProcessingStage::Enum processed_stage,
    size_t max_block_size,
    size_t num_streams)
{
    const auto & settings = local_context->getSettingsRef();

    const bool read_as_initiator = !query_info.parallel_replicas_disabled
        && local_context->canUseParallelReplicasOnInitiator()
        && settings.parallel_replicas_for_non_replicated_merge_tree;

    if (read_as_initiator)
    {
        auto table_id = getStorageID();

        const auto & modified_query_ast = ClusterProxy::rewriteSelectQuery(
            local_context,
            query_info.query,
            table_id.database_name,
            table_id.table_name,
            /*remote_table_function_ptr*/ nullptr);

        String cluster_for_parallel_replicas = settings.cluster_for_parallel_replicas;
        auto cluster = local_context->getCluster(cluster_for_parallel_replicas);

        /// The header is obtained by analyzing the rewritten query with either the new analyzer
        /// or the old interpreter, depending on `allow_experimental_analyzer`.
        Block header;
        if (settings.allow_experimental_analyzer)
            header = InterpreterSelectQueryAnalyzer::getSampleBlock(modified_query_ast, local_context, SelectQueryOptions(processed_stage).analyze());
        else
            header = InterpreterSelectQuery(modified_query_ast, local_context, SelectQueryOptions(processed_stage).analyze()).getSampleBlock();

        ClusterProxy::SelectStreamFactory select_stream_factory(
            header,
            {},
            storage_snapshot,
            processed_stage);

        ClusterProxy::executeQueryWithParallelReplicas(
            query_plan,
            getStorageID(),
            /*remote_table_function_ptr*/ nullptr,
            select_stream_factory,
            modified_query_ast,
            local_context,
            query_info,
            cluster);

        return;
    }

    const bool enable_parallel_reading = !query_info.parallel_replicas_disabled
        && local_context->canUseParallelReplicasOnFollower()
        && settings.parallel_replicas_for_non_replicated_merge_tree;

    auto plan = reader.read(
        column_names,
        storage_snapshot,
        query_info,
        local_context,
        max_block_size,
        num_streams,
        processed_stage,
        nullptr,
        enable_parallel_reading);

    /// The reader may return no plan; in that case `query_plan` is left untouched.
    if (plan)
        query_plan = std::move(*plan);
}
/// Returns the result of getTotalActiveSizeInRows(). The Settings argument is not used.
std::optional<UInt64> StorageMergeTree::totalRows(const Settings &) const
{
return getTotalActiveSizeInRows();
}
/// Takes the data parts visible in `local_context` and delegates the row counting
/// for the partition predicate of `query_info` to totalRowsByPartitionPredicateImpl().
std::optional<UInt64> StorageMergeTree::totalRowsByPartitionPredicate(const SelectQueryInfo & query_info, ContextPtr local_context) const
{
auto parts = getVisibleDataPartsVector(local_context);
return totalRowsByPartitionPredicateImpl(query_info, local_context, parts);
}
/// Returns the result of getTotalActiveSizeInBytes(). The Settings argument is not used.
std::optional<UInt64> StorageMergeTree::totalBytes(const Settings &) const
{
return getTotalActiveSizeInBytes();
}
/// Creates the sink used for INSERTs into this table.
/// The query AST and the async_insert flag are not used; the sink receives the
/// `max_partitions_per_insert_block` setting taken from `local_context`.
SinkToStoragePtr
StorageMergeTree::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context, bool /*async_insert*/)
{
    return std::make_shared<MergeTreeSink>(
        *this,
        metadata_snapshot,
        local_context->getSettingsRef().max_partitions_per_insert_block,
        local_context);
}
/// Asks the context whether a table of this size may be dropped.
/// The check is skipped entirely for a non-replicated static storage.
void StorageMergeTree::checkTableCanBeDropped() const
{
    const bool is_non_replicated_static = !supportsReplication() && isStaticStorage();
    if (is_non_replicated_static)
        return;

    const auto storage_id = getStorageID();
    getContext()->checkTableCanBeDropped(storage_id.database_name, storage_id.table_name, getTotalActiveSizeInBytes());
}
/// Shuts the table down and removes all of its data.
void StorageMergeTree::drop()
{
    shutdown();

    /// With a read-only disk dropAllData() must not be called, yet dropping the table itself is allowed,
    /// so data removal is performed only for non-static storages.
    if (!isStaticStorage())
        dropAllData();
}
/// Applies ALTER commands to the table metadata and, when the commands imply
/// mutations, starts the mutation and waits for it to finish.
///
/// Throws NOT_IMPLEMENTED when called inside a transaction while the
/// `throw_on_unsupported_query_inside_transaction` setting is enabled, and
/// BAD_ARGUMENTS when a non-zero deduplication window is requested for a table
/// whose format version predates custom partitioning.
void StorageMergeTree::alter(
const AlterCommands & commands,
ContextPtr local_context,
AlterLockHolder & table_lock_holder)
{
if (local_context->getCurrentTransaction() && local_context->getSettingsRef().throw_on_unsupported_query_inside_transaction)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "ALTER METADATA is not supported inside transactions");
auto table_id = getStorageID();
/// Remember the settings as they were before the alter; compared with the new ones at the end.
auto old_storage_settings = getSettings();
/// Two independent copies of the current metadata: `new_metadata` is modified by the commands,
/// `old_metadata` is kept as the reference for checkTTLExpressions()/setProperties().
StorageInMemoryMetadata new_metadata = getInMemoryMetadata();
StorageInMemoryMetadata old_metadata = getInMemoryMetadata();
auto maybe_mutation_commands = commands.getMutationCommands(new_metadata, local_context->getSettingsRef().materialize_ttl_after_modify, local_context);
/// If the alter requires a mutation, apply the same delay/throw policy as for explicit mutations.
if (!maybe_mutation_commands.empty())
delayMutationOrThrowIfNeeded(nullptr, local_context);
/// Stays -1 unless a mutation is started below.
Int64 mutation_version = -1;
commands.apply(new_metadata, local_context);
/// This alter can be performed at new_metadata level only
if (commands.isSettingsAlter())
{
changeSettings(new_metadata.settings_changes, table_lock_holder);
DatabaseCatalog::instance().getDatabase(table_id.database_name)->alterTable(local_context, table_id, new_metadata);
}
else
{
/// A barrier command must not be applied while earlier mutations are still pending:
/// wait for the last entry of current_mutations_by_version (if any) first.
if (!maybe_mutation_commands.empty() && maybe_mutation_commands.containBarrierCommand())
{
int64_t prev_mutation = 0;
{
std::lock_guard lock(currently_processing_in_background_mutex);
auto it = current_mutations_by_version.rbegin();
if (it != current_mutations_by_version.rend())
prev_mutation = it->first;
}
if (prev_mutation != 0)
{
LOG_DEBUG(log, "Cannot change metadata with barrier alter query, will wait for mutation {}", prev_mutation);
waitForMutation(prev_mutation, /* from_another_mutation */ true);
LOG_DEBUG(log, "Mutation {} finished", prev_mutation);
}
}
{
changeSettings(new_metadata.settings_changes, table_lock_holder);
checkTTLExpressions(new_metadata, old_metadata);
/// Reinitialize primary key because primary key column types might have changed.
setProperties(new_metadata, old_metadata, false, local_context);
DatabaseCatalog::instance().getDatabase(table_id.database_name)->alterTable(local_context, table_id, new_metadata);
/// The mutation is started only after the new metadata has been stored in the database.
if (!maybe_mutation_commands.empty())
mutation_version = startMutation(maybe_mutation_commands, local_context);
}
{
/// Reset Object columns, because column of type
/// Object may be added or dropped by alter.
auto parts_lock = lockParts();
resetObjectColumnsFromActiveParts(parts_lock);
}
/// Always execute required mutations synchronously, because alters
/// should be executed in sequential order.
if (!maybe_mutation_commands.empty())
waitForMutation(mutation_version, false);
}
{
/// Some additional changes in settings
auto new_storage_settings = getSettings();
if (old_storage_settings->non_replicated_deduplication_window != new_storage_settings->non_replicated_deduplication_window)
{
/// We cannot place this check into settings sanityCheck because it depends on format_version.
/// sanityCheck must work even without storage.
if (new_storage_settings->non_replicated_deduplication_window != 0 && format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Deduplication for non-replicated MergeTree in old syntax is not supported");
deduplication_log->setDeduplicationWindowSize(new_storage_settings->non_replicated_deduplication_window);
}
}
}
/// While this object exists, the source parts of `future_part_` are registered in
/// `currently_merging_mutating_parts` of the storage and free space is reserved on the filesystem.
///
/// Throws NOT_ENOUGH_SPACE when no reservation could be made and LOGICAL_ERROR when
/// one of the source parts is already tagged.
CurrentlyMergingPartsTagger::CurrentlyMergingPartsTagger(
    FutureMergedMutatedPartPtr future_part_,
    size_t total_size,
    StorageMergeTree & storage_,
    const StorageMetadataPtr & metadata_snapshot,
    bool is_mutation)
    : future_part(future_part_), storage(storage_)
{
    /// The mutex is assumed to be locked already, because this is called from mergeTask.

    if (is_mutation)
    {
        /// A mutation must reserve space on the same disk as the source part,
        /// because mutations may create hardlinks.
        reserved_space = storage.tryReserveSpace(total_size, future_part->parts[0]->getDataPartStorage());

        if (!reserved_space)
            throw Exception(ErrorCodes::NOT_ENOUGH_SPACE, "Not enough space for mutating part '{}'", future_part->parts[0]->name);
    }
    else
    {
        /// Accumulate TTL infos of all source parts and find the largest volume index among their disks.
        IMergeTreeDataPart::TTLInfos ttl_infos;
        size_t max_volume_index = 0;
        for (auto & source_part : future_part->parts)
        {
            ttl_infos.update(source_part->ttl_infos);

            const size_t volume_index
                = storage.getStoragePolicy()->getVolumeIndexByDiskName(source_part->getDataPartStorage().getDiskName());
            max_volume_index = std::max(max_volume_index, volume_index);
        }

        reserved_space = storage.balancedReservation(
            metadata_snapshot,
            total_size,
            max_volume_index,
            future_part->name,
            future_part->part_info,
            future_part->parts,
            &tagger,
            &ttl_infos);

        /// Fall back to a reservation that prefers TTL rules when the balanced one gave nothing.
        if (!reserved_space)
        {
            reserved_space = storage.tryReserveSpacePreferringTTLRules(
                metadata_snapshot, total_size, ttl_infos, time(nullptr), max_volume_index);
        }

        if (!reserved_space)
            throw Exception(ErrorCodes::NOT_ENOUGH_SPACE, "Not enough space for merging parts");
    }

    future_part->updatePath(storage, reserved_space.get());

    /// First verify that none of the parts is tagged, and only then tag all of them at once.
    for (const auto & part : future_part->parts)
    {
        const bool already_tagged = storage.currently_merging_mutating_parts.contains(part);
        if (already_tagged)
            throw Exception(ErrorCodes::LOGICAL_ERROR, "Tagging already tagged part {}. This is a bug.", part->name);
    }

    storage.currently_merging_mutating_parts.insert(future_part->parts.begin(), future_part->parts.end());
}
/// Removes the source parts from `currently_merging_mutating_parts` under
/// `currently_processing_in_background_mutex` and notifies all waiters on
/// `currently_processing_in_background_condition`.
/// A part that is not tagged at this point terminates the process.
CurrentlyMergingPartsTagger::~CurrentlyMergingPartsTagger()
{
    std::lock_guard lock(storage.currently_processing_in_background_mutex);

    for (const auto & part : future_part->parts)
    {
        /// erase() by key reports how many elements were removed; zero means the part was not tagged.
        const auto erased_count = storage.currently_merging_mutating_parts.erase(part);
        if (erased_count == 0)
            std::terminate();
    }

    storage.currently_processing_in_background_condition.notify_all();
}
/// Creates and commits a new mutation entry, registers it in `current_mutations_by_version`
/// and triggers the background operations assignee.
///
/// If `query_context` has a current transaction, the mutation is created with its TID
/// and is added to that transaction. Returns the version assigned to the mutation.
/// Throws LOGICAL_ERROR if a mutation with the same version is already registered.
Int64 StorageMergeTree::startMutation(const MutationCommands & commands, ContextPtr query_context)
{
    /// Any disk will do: when mutations are loaded they are searched for on every disk
    /// where the storage can be placed. See loadMutations().
    auto disk = getStoragePolicy()->getAnyDisk();

    TransactionID current_tid = Tx::PrehistoricTID;
    String additional_info;
    auto txn = query_context->getCurrentTransaction();
    if (txn)
    {
        current_tid = txn->tid;
        additional_info = fmt::format(" (TID: {}; TIDH: {})", current_tid, current_tid.getHash());
    }

    Int64 version;
    {
        std::lock_guard lock(currently_processing_in_background_mutex);

        MergeTreeMutationEntry entry(commands, disk, relative_data_path, insert_increment.get(), current_tid, getContext()->getWriteSettings());
        version = increment.get();
        entry.commit(version);

        /// Copy the file name now: `entry` is moved into the map below.
        String mutation_id = entry.file_name;
        if (txn)
            txn->addMutation(shared_from_this(), mutation_id);

        const auto emplace_result = current_mutations_by_version.try_emplace(version, std::move(entry));
        if (!emplace_result.second)
            throw Exception(ErrorCodes::LOGICAL_ERROR, "Mutation {} already exists, it's a bug", version);

        LOG_INFO(log, "Added mutation: {}{}", mutation_id, additional_info);
    }

    background_operations_assignee.trigger();
    return version;
}
/// Updates the failure information (shown in system.mutations) of the mutation entries
/// whose versions lie in (source data version, result data version], then wakes up
/// everything that waits on `mutation_wait_event`.
///
/// On success the failure info of an entry is cleared if the result part covers the part
/// that previously failed; on failure the first source part and `exception_message` are recorded.
void StorageMergeTree::updateMutationEntriesErrors(FutureMergedMutatedPartPtr result_part, bool is_successful, const String & exception_message)
{
    const auto & first_source_part = result_part->parts.at(0);
    const Int64 sources_data_version = first_source_part->info.getDataVersion();
    const Int64 result_data_version = result_part->part_info.getDataVersion();

    /// Equal versions mean that no mutation was applied, so there is nothing to update.
    if (sources_data_version != result_data_version)
    {
        std::lock_guard lock(currently_processing_in_background_mutex);

        const auto affected_begin = current_mutations_by_version.upper_bound(sources_data_version);
        const auto affected_end = current_mutations_by_version.upper_bound(result_data_version);

        for (auto it = affected_begin; it != affected_end; ++it)
        {
            MergeTreeMutationEntry & entry = it->second;

            if (!is_successful)
            {
                entry.latest_failed_part = first_source_part->name;
                entry.latest_failed_part_info = first_source_part->info;
                entry.latest_fail_time = time(nullptr);
                entry.latest_fail_reason = exception_message;
                continue;
            }

            /// Keep the old failure info unless the new part covers the part that failed.
            if (entry.latest_failed_part.empty() || !result_part->part_info.contains(entry.latest_failed_part_info))
                continue;

            entry.latest_failed_part.clear();
            entry.latest_failed_part_info = MergeTreePartInfo();
            entry.latest_fail_time = 0;
            entry.latest_fail_reason.clear();
        }
    }

    std::unique_lock wait_lock(mutation_wait_mutex);
    mutation_wait_event.notify_all();
}
/// Overload taking only the mutation version: derives the mutation file name from it
/// and forwards to the three-argument waitForMutation().
void StorageMergeTree::waitForMutation(Int64 version, bool wait_for_another_mutation)
{
String mutation_id = MergeTreeMutationEntry::versionToFileName(version);
waitForMutation(version, mutation_id, wait_for_another_mutation);
}
/// Overload taking only the mutation id (file name): parses the version out of it
/// and forwards to the three-argument waitForMutation().
void StorageMergeTree::waitForMutation(const String & mutation_id, bool wait_for_another_mutation)
{
Int64 version = MergeTreeMutationEntry::parseFileName(mutation_id);
waitForMutation(version, mutation_id, wait_for_another_mutation);
}
/// Blocks until the mutation with the given version is no longer tracked, is done,
/// has a recorded failure reason, or the storage is shut down. Afterwards the final
/// status is re-read and passed to checkMutationStatus().
void StorageMergeTree::waitForMutation(Int64 version, const String & mutation_id, bool wait_for_another_mutation)
{
    LOG_INFO(log, "Waiting mutation: {}", mutation_id);

    {
        /// Predicate for the condition variable: returns true when waiting must stop.
        auto should_stop_waiting = [this, version, wait_for_another_mutation]()
        {
            if (shutdown_called)
                return true;

            auto status = getIncompleteMutationsStatus(version, nullptr, wait_for_another_mutation);
            if (!status)
                return true;

            return status->is_done || !status->latest_fail_reason.empty();
        };

        std::unique_lock wait_lock(mutation_wait_mutex);
        mutation_wait_event.wait(wait_lock, should_stop_waiting);
    }

    /// The set always contains at least the mutation we were waiting for;
    /// getIncompleteMutationsStatus() may add other mutation ids to it.
    std::set<String> mutation_ids;
    mutation_ids.insert(mutation_id);

    auto mutation_status = getIncompleteMutationsStatus(version, &mutation_ids, wait_for_another_mutation);
    checkMutationStatus(mutation_status, mutation_ids);

    LOG_INFO(log, "Mutation {} done", mutation_id);
}
/// Writes the given CSN into the entry of the mutation identified by `mutation_id`.
/// Throws LOGICAL_ERROR if no such mutation is registered in `current_mutations_by_version`.
void StorageMergeTree::setMutationCSN(const String & mutation_id, CSN csn)
{
    LOG_INFO(log, "Writing CSN {} for mutation {}", csn, mutation_id);

    UInt64 version = MergeTreeMutationEntry::parseFileName(mutation_id);

    std::lock_guard lock(currently_processing_in_background_mutex);

    auto entry_it = current_mutations_by_version.find(version);
    const bool is_known_mutation = entry_it != current_mutations_by_version.end();
    if (!is_known_mutation)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot find mutation {}", mutation_id);

    entry_it->second.writeCSN(csn);
}
/// Registers a new mutation for the table and, depending on the context, waits for it.
///
/// The mutation is started while the alter lock is held, so that it is ordered
/// with respect to metadata-changing ALTER queries. Waiting happens after the lock
/// has been released, and only if `mutations_sync` is positive or the query runs
/// inside a transaction.
///
/// Throws TIMEOUT_EXCEEDED if the alter lock cannot be acquired within `lock_acquire_timeout`.
void StorageMergeTree::mutate(const MutationCommands & commands, ContextPtr query_context)
{
    delayMutationOrThrowIfNeeded(nullptr, query_context);

    /// Validate partition IDs (if any) before starting mutation
    getPartitionIdsAffectedByCommands(commands, query_context);

    Int64 version;
    {
        /// It's important to serialize order of mutations with alter queries because
        /// they can depend on each other.
        ///
        /// The lock holder is deliberately a named local of this scope. Declaring it in the
        /// init-statement of the `if` below would destroy it (and release the lock) at the
        /// end of the `if` statement, i.e. before startMutation() is called.
        auto alter_lock = tryLockForAlter(query_context->getSettings().lock_acquire_timeout);
        if (alter_lock == std::nullopt)
        {
            throw Exception(ErrorCodes::TIMEOUT_EXCEEDED,
                            "Cannot start mutation in {}ms because some metadata-changing ALTER (MODIFY|RENAME|ADD|DROP) is currently executing. "
                            "You can change this timeout with `lock_acquire_timeout` setting",
                            query_context->getSettings().lock_acquire_timeout.totalMilliseconds());
        }

        version = startMutation(commands, query_context);
    }

    /// The alter lock is released here; waiting for the mutation must not block ALTER queries.
    if (query_context->getSettingsRef().mutations_sync > 0 || query_context->getCurrentTransaction())
        waitForMutation(version, false);
}
/// Returns the current value of the `has_lightweight_delete_parts` flag (relaxed atomic load).
bool StorageMergeTree::hasLightweightDeletedMask() const
{
return has_lightweight_delete_parts.load(std::memory_order_relaxed);
}
namespace
{

/// A data part's version together with the part's name.
struct PartVersionWithName
{
    Int64 version;
    String name;
};

/// Orders PartVersionWithName values by version only; the name does not take part in the comparison.
bool comparator(const PartVersionWithName & lhs, const PartVersionWithName & rhs)
{
    return lhs.version < rhs.version;
}

}
/// Locks `currently_processing_in_background_mutex` and forwards to
/// getIncompleteMutationsStatusUnlocked() with the same arguments.
std::optional<MergeTreeMutationStatus> StorageMergeTree::getIncompleteMutationsStatus(
Int64 mutation_version, std::set<String> * mutation_ids, bool from_another_mutation) const
{
std::unique_lock lock(currently_processing_in_background_mutex);
return getIncompleteMutationsStatusUnlocked(mutation_version, lock, mutation_ids, from_another_mutation);
}
/// Computes the status of the mutation with the given version.
///
/// Returns an empty optional when the mutation is not present in `current_mutations_by_version`.
/// Otherwise returns a status with `is_done == true` when no visible data part has a data version
/// below `mutation_version`; if such a part exists, the status is returned with `is_done == false`
/// and, when available, failure information.
///
/// The lock argument is not used by the body; the caller is expected to pass the lock it holds.
/// If `mutation_ids` is not null, ids of mutations that share the same failure reason are added to it.
std::optional<MergeTreeMutationStatus> StorageMergeTree::getIncompleteMutationsStatusUnlocked(
Int64 mutation_version, std::unique_lock<std::mutex> & /*lock*/, std::set<String> * mutation_ids, bool from_another_mutation) const
{
auto current_mutation_it = current_mutations_by_version.find(mutation_version);
/// Killed
if (current_mutation_it == current_mutations_by_version.end())
return {};
MergeTreeMutationStatus result{.is_done = false};
const auto & mutation_entry = current_mutation_it->second;
auto txn = tryGetTransactionForMutation(mutation_entry, log);
/// There's no way a transaction may finish before a mutation that was started by the transaction.
/// But sometimes we need to check status of an unrelated mutation, in this case we don't care about transactions.
assert(txn || mutation_entry.tid.isPrehistoric() || from_another_mutation);
auto data_parts = getVisibleDataPartsVector(txn);
for (const auto & data_part : data_parts)
{
Int64 data_version = data_part->info.getDataVersion();
/// The first part that is still below the mutation version decides the result:
/// the function returns from inside this branch without looking at further parts.
if (data_version < mutation_version)
{
if (!mutation_entry.latest_fail_reason.empty())
{
result.latest_failed_part = mutation_entry.latest_failed_part;
result.latest_fail_reason = mutation_entry.latest_fail_reason;
result.latest_fail_time = mutation_entry.latest_fail_time;
/// Fill all mutations which failed with the same error
/// (we can execute several mutations together)
if (mutation_ids)
{
auto mutations_begin_it = current_mutations_by_version.upper_bound(data_version);
for (auto it = mutations_begin_it; it != current_mutations_by_version.end(); ++it)
/// All mutations with the same failure
if (it->second.latest_fail_reason == result.latest_fail_reason)
mutation_ids->insert(it->second.file_name);
}
}
else if (txn && !from_another_mutation)
{
/// Part is locked by concurrent transaction, most likely it will never be mutated
TIDHash part_locked = data_part->version.removal_tid_lock.load();
if (part_locked && part_locked != mutation_entry.tid.getHash())
{
result.latest_failed_part = data_part->name;
result.latest_fail_reason = fmt::format("Serialization error: part {} is locked by transaction {}", data_part->name, part_locked);
result.latest_fail_time = time(nullptr);
}
}
return result;
}
}
/// No visible part is below the mutation version.
result.is_done = true;
return result;
}
std::vector<MergeTreeMutationStatus> StorageMergeTree::getMutationsStatus() const
{
std::lock_guard lock(currently_processing_in_background_mutex);
std::vector<PartVersionWithName> part_versions_with_names;
auto data_parts = getDataPartsVectorForInternalUsage();
part_versions_with_names.reserve(data_parts.size());
for (const auto & part : data_parts)
part_versions_with_names.emplace_back(PartVersionWithName{part->info.getDataVersion(), part->name});
std::sort(part_versions_with_names.begin(), part_versions_with_names.end(), comparator);
std::vector<MergeTreeMutationStatus> result;
for (const auto & kv : current_mutations_by_version)
{
Int64 mutation_version = kv.first;
const MergeTreeMutationEntry & entry = kv.second;
const PartVersionWithName needle{mutation_version, ""};
auto versions_it = std::lower_bound(
part_versions_with_names.begin(), part_versions_with_names.end(), needle, comparator);
size_t parts_to_do = versions_it - part_versions_with_names.begin();
Names parts_to_do_names;
parts_to_do_names.reserve(parts_to_do);
for (size_t i = 0; i < parts_to_do; ++i)
parts_to_do_names.push_back(part_versions_with_names[i].name);
std::map<String, Int64> block_numbers_map({{"", entry.block_number}});
for (const MutationCommand & command : entry.commands)
{
WriteBufferFromOwnString buf;
formatAST(*command.ast, buf, false, true);
result.push_back(MergeTreeMutationStatus
{
entry.file_name,
buf.str(),
entry.create_time,
block_numbers_map,
parts_to_do_names,
/* is_done = */parts_to_do_names.empty(),
entry.latest_failed_part,
entry.latest_fail_time,
entry.latest_fail_reason,
});
}
}
return result;
}
/// Removes the mutation with the given id and cancels the work done on its behalf.
///
/// Returns NotFound if the id cannot be parsed or no such mutation is registered,
/// otherwise CancelSent. If the mutation was started by a still-running transaction,
/// that transaction is rolled back.
CancellationCode StorageMergeTree::killMutation(const String & mutation_id)
{
    LOG_TRACE(log, "Killing mutation {}", mutation_id);

    UInt64 mutation_version = MergeTreeMutationEntry::tryParseFileName(mutation_id);
    if (!mutation_version)
        return CancellationCode::NotFound;

    /// Take the entry out of the map under the lock; the rest is done without it.
    std::optional<MergeTreeMutationEntry> to_kill;
    {
        std::lock_guard lock(currently_processing_in_background_mutex);

        auto entry_it = current_mutations_by_version.find(mutation_version);
        if (entry_it != current_mutations_by_version.end())
        {
            to_kill.emplace(std::move(entry_it->second));
            current_mutations_by_version.erase(entry_it);
        }
    }

    if (!to_kill)
        return CancellationCode::NotFound;

    auto txn = tryGetTransactionForMutation(*to_kill, log);
    if (txn)
    {
        LOG_TRACE(log, "Cancelling transaction {} which had started mutation {}", to_kill->tid, mutation_id);
        TransactionLog::instance().rollbackTransaction(txn);
    }

    getContext()->getMergeList().cancelPartMutations(getStorageID(), {}, to_kill->block_number);
    to_kill->removeFile();
    LOG_TRACE(log, "Cancelled part mutations and removed mutation file {}", mutation_id);

    /// Wake up everything that waits for a mutation: the killed one is no longer registered.
    {
        std::lock_guard wait_lock(mutation_wait_mutex);
        mutation_wait_event.notify_all();
    }

    /// Maybe there is another mutation that was blocked by the killed one. Try to execute it immediately.
    background_operations_assignee.trigger();

    return CancellationCode::CancelSent;
}
/// Creates the deduplication log in the "deduplication_logs" subdirectory of the table's
/// data path on the first disk returned by getDisks(), and loads it.
///
/// Throws BAD_ARGUMENTS when a non-zero deduplication window is configured for a table
/// whose format version predates custom partitioning.
void StorageMergeTree::loadDeduplicationLog()
{
    auto settings = getSettings();

    const bool deduplication_requested = settings->non_replicated_deduplication_window != 0;
    if (deduplication_requested && format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
        throw Exception(ErrorCodes::BAD_ARGUMENTS, "Deduplication for non-replicated MergeTree in old syntax is not supported");

    auto disk = getDisks()[0];
    std::string path = fs::path(relative_data_path) / "deduplication_logs";

    deduplication_log = std::make_unique<MergeTreeDeduplicationLog>(
        path,
        settings->non_replicated_deduplication_window,
        format_version,
        disk);
    deduplication_log->load();
}
void StorageMergeTree::loadMutations()
{
for (const auto & disk : getDisks())
{
for (auto it = disk->iterateDirectory(relative_data_path); it->isValid(); it->next())
{
if (startsWith(it->name(), "mutation_"))
{
MergeTreeMutationEntry entry(disk, relative_data_path, it->name());
UInt64 block_number = entry.block_number;
LOG_DEBUG(log, "Loading mutation: {} entry, commands size: {}", it->name(), entry.commands.size());
if (!entry.tid.isPrehistoric() && !entry.csn)
{
if (auto csn = TransactionLog::getCSN(entry.tid))
{
/// Transaction is committed => mutation is finished, but let's load it anyway (so it will be shown in system.mutations)
entry.writeCSN(csn);
}
else
{
TransactionLog::assertTIDIsNotOutdated(entry.tid);
LOG_DEBUG(log, "Mutation entry {} was created by transaction {}, but it was not committed. Removing mutation entry",
it->name(), entry.tid);
disk->removeFile(it->path());
continue;
}
}
auto inserted = current_mutations_by_version.try_emplace(block_number, std::move(entry)).second;
if (!inserted)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Mutation {} already exists, it's a bug", block_number);
}
else if (startsWith(it->name(), "tmp_mutation_"))
{
disk->removeFile(it->path());
}
}
}
if (!current_mutations_by_version.empty())
increment.value = std::max(increment.value.load(), current_mutations_by_version.rbegin()->first);
}
MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMerge(
const StorageMetadataPtr & metadata_snapshot,
bool aggressive,
const String & partition_id,
bool final,
String & out_disable_reason,
TableLockHolder & /* table_lock_holder */,
std::unique_lock<std::mutex> & lock,
const MergeTreeTransactionPtr & txn,
bool optimize_skip_merged_partitions,
SelectPartsDecision * select_decision_out)
{
auto data_settings = getSettings();
auto future_part = std::make_shared<FutureMergedMutatedPart>();
if (storage_settings.get()->assign_part_uuids)
future_part->uuid = UUIDHelpers::generateV4();
/// You must call destructor with unlocked `currently_processing_in_background_mutex`.
CurrentlyMergingPartsTaggerPtr merging_tagger;
MergeList::EntryPtr merge_entry;
auto can_merge = [this, &lock](const DataPartPtr & left, const DataPartPtr & right, const MergeTreeTransaction * tx, String & disable_reason) -> bool
{
if (tx)
{
/// Cannot merge parts if some of them are not visible in current snapshot
/// TODO Transactions: We can use simplified visibility rules (without CSN lookup) here
if ((left && !left->version.isVisible(tx->getSnapshot(), Tx::EmptyTID))
|| (right && !right->version.isVisible(tx->getSnapshot(), Tx::EmptyTID)))
{
disable_reason = "Some part is not visible in transaction";
return false;
}
/// Do not try to merge parts that are locked for removal (merge will probably fail)
if ((left && left->version.isRemovalTIDLocked())
|| (right && right->version.isRemovalTIDLocked()))
{
disable_reason = "Some part is locked for removal in another cuncurrent transaction";
return false;
}
}
/// This predicate is checked for the first part of each range.
/// (left = nullptr, right = "first part of partition")
if (!left)
{
if (currently_merging_mutating_parts.contains(right))
{
disable_reason = "Some part currently in a merging or mutating process";
return false;
}
else
return true;
}
if (currently_merging_mutating_parts.contains(left) || currently_merging_mutating_parts.contains(right))
{
disable_reason = "Some part currently in a merging or mutating process";
return false;
}
if (getCurrentMutationVersion(left, lock) != getCurrentMutationVersion(right, lock))
{
disable_reason = "Some parts have different mutation version";
return false;
}
if (!partsContainSameProjections(left, right))
{
disable_reason = "Some parts contains differ projections";
return false;
}
auto max_possible_level = getMaxLevelInBetween(left, right);
if (max_possible_level > std::max(left->info.level, right->info.level))
{
disable_reason = fmt::format("There is an outdated part in a gap between two active parts ({}, {}) with merge level {} higher than these active parts have", left->name, right->name, max_possible_level);
return false;
}
return true;
};
SelectPartsDecision select_decision = SelectPartsDecision::CANNOT_SELECT;
auto is_background_memory_usage_ok = [](String & disable_reason) -> bool
{
if (canEnqueueBackgroundTask())
return true;
disable_reason = fmt::format("Current background tasks memory usage ({}) is more than the limit ({})",
formatReadableSizeWithBinarySuffix(background_memory_tracker.get()),
formatReadableSizeWithBinarySuffix(background_memory_tracker.getSoftLimit()));
return false;
};
if (partition_id.empty())
{
if (is_background_memory_usage_ok(out_disable_reason))
{
UInt64 max_source_parts_size = merger_mutator.getMaxSourcePartsSizeForMerge();
bool merge_with_ttl_allowed = getTotalMergesWithTTLInMergeList() < data_settings->max_number_of_merges_with_ttl_in_pool;
/// TTL requirements is much more strict than for regular merge, so
/// if regular not possible, than merge with ttl is not also not
/// possible.
if (max_source_parts_size > 0)
{
select_decision = merger_mutator.selectPartsToMerge(
future_part,
aggressive,
max_source_parts_size,
can_merge,
merge_with_ttl_allowed,
txn,
out_disable_reason);
}
else
out_disable_reason = "Current value of max_source_parts_size is zero";
}
}
else
{
while (true)
{
auto timeout_ms = getSettings()->lock_acquire_timeout_for_background_operations.totalMilliseconds();
auto timeout = std::chrono::milliseconds(timeout_ms);
if (!is_background_memory_usage_ok(out_disable_reason))
{
constexpr auto poll_interval = std::chrono::seconds(1);
Int64 attempts = timeout / poll_interval;
bool ok = false;
for (Int64 i = 0; i < attempts; ++i)
{
std::this_thread::sleep_for(poll_interval);
if (is_background_memory_usage_ok(out_disable_reason))
{
ok = true;
break;
}
}
if (!ok)
break;
}
select_decision = merger_mutator.selectAllPartsToMergeWithinPartition(
future_part, can_merge, partition_id, final, metadata_snapshot, txn, out_disable_reason, optimize_skip_merged_partitions);
/// If final - we will wait for currently processing merges to finish and continue.
if (final
&& select_decision != SelectPartsDecision::SELECTED
&& !currently_merging_mutating_parts.empty())
{
LOG_DEBUG(log, "Waiting for currently running merges ({} parts are merging right now) to perform OPTIMIZE FINAL",
currently_merging_mutating_parts.size());
if (std::cv_status::timeout == currently_processing_in_background_condition.wait_for(lock, timeout))
{
out_disable_reason = fmt::format("Timeout ({} ms) while waiting for already running merges before running OPTIMIZE with FINAL", timeout_ms);
break;
}
}
else
break;
}
}
/// In case of final we need to know the decision of select in StorageMergeTree::merge
/// to treat NOTHING_TO_MERGE as successful merge (otherwise optimize final will be uncompleted)
if (select_decision_out)
*select_decision_out = select_decision;
if (select_decision != SelectPartsDecision::SELECTED)
{
if (!out_disable_reason.empty())
out_disable_reason += ". ";
out_disable_reason += "Cannot select parts for optimization";
return {};
}
/// Account TTL merge here to avoid exceeding the max_number_of_merges_with_ttl_in_pool limit
if (isTTLMergeType(future_part->merge_type))
getContext()->getMergeList().bookMergeWithTTL();
merging_tagger = std::make_unique<CurrentlyMergingPartsTagger>(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace(future_part->parts), *this, metadata_snapshot, false);
return std::make_shared<MergeMutateSelectedEntry>(future_part, std::move(merging_tagger), std::make_shared<MutationCommands>());
}
/// Selects parts to merge and, if something was selected, runs the merge task via executeHere().
///
/// The selection is done by selectPartsToMerge() while holding currently_processing_in_background_mutex;
/// the mutex is released before the task is executed.
///
/// Returns true if the selector reported NOTHING_TO_MERGE or once the merge task has been run.
/// Returns false if no parts were selected; `out_disable_reason` is passed to the selector to receive the reason.
/// Throws ABORTED if merges are currently cancelled.
bool StorageMergeTree::merge(
    bool aggressive,
    const String & partition_id,
    bool final,
    bool deduplicate,
    const Names & deduplicate_by_columns,
    bool cleanup,
    const MergeTreeTransactionPtr & txn,
    String & out_disable_reason,
    bool optimize_skip_merged_partitions)
{
    auto table_lock_holder = lockForShare(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
    auto metadata_snapshot = getInMemoryMetadataPtr();

    /// Initialized explicitly: the value is filled in through an out-pointer and read right after,
    /// so it must never be left indeterminate.
    SelectPartsDecision select_decision = SelectPartsDecision::CANNOT_SELECT;

    MergeMutateSelectedEntryPtr merge_mutate_entry;

    {
        std::unique_lock lock(currently_processing_in_background_mutex);
        if (merger_mutator.merges_blocker.isCancelled())
            throw Exception(ErrorCodes::ABORTED, "Cancelled merging parts");

        merge_mutate_entry = selectPartsToMerge(
            metadata_snapshot,
            aggressive,
            partition_id,
            final,
            out_disable_reason,
            table_lock_holder,
            lock,
            txn,
            optimize_skip_merged_partitions,
            &select_decision);
    }

    /// If there is nothing to merge then we treat this merge as successful (needed for optimize final optimization)
    if (select_decision == SelectPartsDecision::NOTHING_TO_MERGE)
        return true;

    if (!merge_mutate_entry)
        return false;

    /// The task is executed right here, so there is nothing to do when it reports its result.
    IExecutableTask::TaskResultCallback f = [](bool) {};
    auto task = std::make_shared<MergePlainMergeTreeTask>(
        *this, metadata_snapshot, deduplicate, deduplicate_by_columns, cleanup, merge_mutate_entry, table_lock_holder, f);

    task->setCurrentTransaction(MergeTreeTransactionHolder{}, MergeTreeTransactionPtr{txn});

    executeHere(task);

    return true;
}
/// Tells whether the part is present in the set of parts that are currently being merged or mutated.
/// The set is read under currently_processing_in_background_mutex.
bool StorageMergeTree::partIsAssignedToBackgroundOperation(const DataPartPtr & part) const
{
    std::lock_guard guard(currently_processing_in_background_mutex);
    const bool is_assigned = currently_merging_mutating_parts.contains(part);
    return is_assigned;
}
/// Selects a single data part that still has unapplied mutations and prepares the entry needed to mutate it.
///
/// Parts are examined in the order returned by getDataPartsVectorForInternalUsage(). The first part that
/// is not already being merged/mutated, has at least one mutation with a version greater than its data version,
/// does not exceed the current size limit and yields a non-empty list of commands is chosen.
///
/// Returns an empty pointer when there are no mutations, when the size limit for mutations is zero,
/// or when no part qualifies. Throws LOGICAL_ERROR if the transaction that started a mutation cannot be found.
///
/// NOTE(review): the lock parameter is unnamed and unused; it presumably exists to make the caller
/// hold currently_processing_in_background_mutex -- confirm against callers.
MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMutate(
const StorageMetadataPtr & metadata_snapshot, String & /* disable_reason */, TableLockHolder & /* table_lock_holder */,
std::unique_lock<std::mutex> & /*currently_processing_in_background_mutex_lock*/)
{
if (current_mutations_by_version.empty())
return {};
size_t max_source_part_size = merger_mutator.getMaxSourcePartSizeForMutation();
if (max_source_part_size == 0)
{
LOG_DEBUG(
log,
"Not enough idle threads to apply mutations at the moment. See settings 'number_of_free_entries_in_pool_to_execute_mutation' "
"and 'background_pool_size'");
return {};
}
/// Upper bound for the total size of the commands squashed into one mutation task.
size_t max_ast_elements = getContext()->getSettingsRef().max_expanded_ast_elements;
auto future_part = std::make_shared<FutureMergedMutatedPart>();
if (storage_settings.get()->assign_part_uuids)
future_part->uuid = UUIDHelpers::generateV4();
CurrentlyMergingPartsTaggerPtr tagger;
auto mutations_end_it = current_mutations_by_version.end();
for (const auto & part : getDataPartsVectorForInternalUsage())
{
if (currently_merging_mutating_parts.contains(part))
continue;
/// First mutation whose version is greater than the data version of the part.
auto mutations_begin_it = current_mutations_by_version.upper_bound(part->info.getDataVersion());
if (mutations_begin_it == mutations_end_it)
continue;
if (max_source_part_size < part->getBytesOnDisk())
{
LOG_DEBUG(
log,
"Current max source part size for mutation is {} but part size {}. Will not mutate part {} yet",
max_source_part_size,
part->getBytesOnDisk(),
part->name);
continue;
}
TransactionID first_mutation_tid = mutations_begin_it->second.tid;
MergeTreeTransactionPtr txn;
if (!first_mutation_tid.isPrehistoric())
{
/// Mutate visible parts only
/// NOTE Do not mutate visible parts in Outdated state, because it does not make sense:
/// mutation will fail anyway due to serialization error.
/// It's possible that both mutation and transaction are already finished,
/// because that part should not be mutated because it was not visible for that transaction.
if (!part->version.isVisible(first_mutation_tid.start_csn, first_mutation_tid))
continue;
txn = tryGetTransactionForMutation(mutations_begin_it->second, log);
if (!txn)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot find transaction {} that has started mutation {} "
"that is going to be applied to part {}",
first_mutation_tid, mutations_begin_it->second.file_name, part->name);
}
/// Commands of consecutive mutations are squashed into `commands`;
/// `last_mutation_to_apply` stays equal to the end iterator until at least one mutation is taken.
auto commands = std::make_shared<MutationCommands>();
size_t current_ast_elements = 0;
auto last_mutation_to_apply = mutations_end_it;
for (auto it = mutations_begin_it; it != mutations_end_it; ++it)
{
/// Do not squash mutations from different transactions to be able to commit/rollback them independently.
if (first_mutation_tid != it->second.tid)
break;
/// Estimate the size of this mutation: DROP_COLUMN / DROP_INDEX / DROP_PROJECTION / RENAME_COLUMN commands
/// are counted by the size of their AST, all other commands are evaluated by MutationsInterpreter below.
size_t commands_size = 0;
MutationCommands commands_for_size_validation;
for (const auto & command : it->second.commands)
{
if (command.type != MutationCommand::Type::DROP_COLUMN
&& command.type != MutationCommand::Type::DROP_INDEX
&& command.type != MutationCommand::Type::DROP_PROJECTION
&& command.type != MutationCommand::Type::RENAME_COLUMN)
{
commands_for_size_validation.push_back(command);
}
else
{
commands_size += command.ast->size();
}
}
if (!commands_for_size_validation.empty())
{
try
{
/// A separate copy of the context is used for the size evaluation.
auto fake_query_context = Context::createCopy(getContext());
fake_query_context->makeQueryContext();
fake_query_context->setCurrentQueryId("");
MutationsInterpreter::Settings settings(false);
MutationsInterpreter interpreter(
shared_from_this(), metadata_snapshot, commands_for_size_validation, fake_query_context, settings);
commands_size += interpreter.evaluateCommandsSize();
}
catch (...)
{
/// Record the failure in the mutation entry and stop squashing at this mutation;
/// mutations collected before it (if any) are still applied.
tryLogCurrentException(log);
MergeTreeMutationEntry & entry = it->second;
entry.latest_fail_time = time(nullptr);
entry.latest_fail_reason = getCurrentExceptionMessage(false);
/// NOTE we should not skip mutations, because exception may be retryable (e.g. MEMORY_LIMIT_EXCEEDED)
break;
}
}
/// Stop before the accumulated size reaches the limit.
if (current_ast_elements + commands_size >= max_ast_elements)
break;
const auto & single_mutation_commands = it->second.commands;
if (single_mutation_commands.containBarrierCommand())
{
/// A mutation with a barrier command is taken only if nothing was collected before it,
/// and nothing is squashed after it.
if (commands->empty())
{
commands->insert(commands->end(), single_mutation_commands.begin(), single_mutation_commands.end());
last_mutation_to_apply = it;
}
break;
}
else
{
current_ast_elements += commands_size;
commands->insert(commands->end(), single_mutation_commands.begin(), single_mutation_commands.end());
last_mutation_to_apply = it;
}
}
assert(commands->empty() == (last_mutation_to_apply == mutations_end_it));
if (!commands->empty())
{
/// The resulting part keeps the info of the source part, except that its mutation version
/// becomes the version of the last squashed mutation.
auto new_part_info = part->info;
new_part_info.mutation = last_mutation_to_apply->first;
future_part->parts.push_back(part);
future_part->part_info = new_part_info;
future_part->name = part->getNewName(new_part_info);
future_part->part_format = part->getFormat();
tagger = std::make_unique<CurrentlyMergingPartsTagger>(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace({part}), *this, metadata_snapshot, true);
return std::make_shared<MergeMutateSelectedEntry>(future_part, std::move(tagger), commands, txn);
}
}
return {};
}
/// Returns the maximum merge level among the entries of data_parts_by_info located in the range
/// that starts at `left` and ends right before `right`.
///
/// Note that the scan starts at `left` itself, so the level of `left` takes part in the result,
/// while the level of `right` does not.
///
/// Throws LOGICAL_ERROR if either part is not found in data_parts_by_info,
/// or if the end of the index is reached before `right` is met.
UInt32 StorageMergeTree::getMaxLevelInBetween(const DataPartPtr & left, const DataPartPtr & right) const
{
    auto parts_lock = lockParts();

    const auto left_it = data_parts_by_info.find(left->info);
    if (left_it == data_parts_by_info.end())
        throw Exception(ErrorCodes::LOGICAL_ERROR, "unable to find left part, left part {}. It's a bug", left->name);

    const auto right_it = data_parts_by_info.find(right->info);
    if (right_it == data_parts_by_info.end())
        throw Exception(ErrorCodes::LOGICAL_ERROR, "unable to find right part, right part {}. It's a bug", right->name);

    UInt32 max_level = 0;
    auto current_it = left_it;
    while (current_it != right_it)
    {
        /// Running off the end of the index means that `right` is located before `left`.
        if (current_it == data_parts_by_info.end())
            throw Exception(ErrorCodes::LOGICAL_ERROR, "left and right parts in the wrong order, left part {}, right part {}. It's a bug", left->name, right->name);

        max_level = std::max(max_level, (*current_it)->info.level);
        ++current_it;
    }

    return max_level;
}
/// Tries to schedule one unit of background work through `assignee`.
///
/// In order of priority: a merge, then (only if no merge was selected) a mutation,
/// then periodic cleanup tasks (old temporary directories and old parts / logs / mutations).
///
/// Returns false if shutdown was requested or merges are cancelled. For a merge or a mutation returns
/// the result of scheduleMergeMutateTask(). Otherwise returns true if at least one cleanup task was scheduled.
bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assignee)
{
if (shutdown_called)
return false;
assert(!isStaticStorage());
auto metadata_snapshot = getInMemoryMetadataPtr();
MergeMutateSelectedEntryPtr merge_entry, mutate_entry;
auto shared_lock = lockForShare(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
MergeTreeTransactionHolder transaction_for_merge;
MergeTreeTransactionPtr txn;
if (transactions_enabled.load(std::memory_order_relaxed))
{
/// TODO Transactions: avoid beginning transaction if there is nothing to merge.
txn = TransactionLog::instance().beginTransaction();
transaction_for_merge = MergeTreeTransactionHolder{txn, /* autocommit = */ false};
}
bool has_mutations = false;
{
/// Selection of parts is done under currently_processing_in_background_mutex.
std::unique_lock lock(currently_processing_in_background_mutex);
if (merger_mutator.merges_blocker.isCancelled())
return false;
String out_reason;
merge_entry = selectPartsToMerge(metadata_snapshot, false, {}, false, out_reason, shared_lock, lock, txn);
/// A mutation is considered only when no merge was selected.
if (!merge_entry && !current_mutations_by_version.empty())
mutate_entry = selectPartsToMutate(metadata_snapshot, out_reason, shared_lock, lock);
has_mutations = !current_mutations_by_version.empty();
}
if (merge_entry)
{
auto task = std::make_shared<MergePlainMergeTreeTask>(*this, metadata_snapshot, /* deduplicate */ false, Names{}, /* cleanup */ false, merge_entry, shared_lock, common_assignee_trigger);
/// The transaction started above is handed over to the merge task.
task->setCurrentTransaction(std::move(transaction_for_merge), std::move(txn));
bool scheduled = assignee.scheduleMergeMutateTask(task);
/// The problem that we already booked a slot for TTL merge, but a merge list entry will be created only in a prepare method
/// in MergePlainMergeTreeTask. So, this slot will never be freed.
if (!scheduled && isTTLMergeType(merge_entry->future_part->merge_type))
getContext()->getMergeList().cancelMergeWithTTL();
return scheduled;
}
if (mutate_entry)
{
/// We take new metadata snapshot here. It's because mutation commands can be executed only with metadata snapshot
/// which is equal or more fresh than commands themselves. In extremely rare case it can happen that we will have alter
/// in between we took snapshot above and selected commands. That is why we take new snapshot here.
auto task = std::make_shared<MutatePlainMergeTreeTask>(*this, getInMemoryMetadataPtr(), mutate_entry, shared_lock, common_assignee_trigger);
return assignee.scheduleMergeMutateTask(task);
}
if (has_mutations)
{
/// Notify in case of errors if no mutation was successfully selected.
/// Otherwise, notification will occur after any of mutations complete.
std::lock_guard lock(mutation_wait_mutex);
mutation_wait_event.notify_all();
}
/// Nothing to merge or mutate: schedule the periodic cleanup tasks whose interval timers allow it.
/// NOTE(review): compareAndRestartDeferred() presumably yields a non-empty lock only once the given
/// number of seconds has elapsed since the previous restart -- confirm against its definition.
bool scheduled = false;
if (auto lock = time_after_previous_cleanup_temporary_directories.compareAndRestartDeferred(
getSettings()->merge_tree_clear_old_temporary_directories_interval_seconds))
{
/// The lambda holds a copy of shared_lock.
assignee.scheduleCommonTask(std::make_shared<ExecutableLambdaAdapter>(
[this, shared_lock] ()
{
return clearOldTemporaryDirectories(getSettings()->temporary_directories_lifetime.totalSeconds());
}, common_assignee_trigger, getStorageID()), /* need_trigger */ false);
scheduled = true;
}
if (auto lock = time_after_previous_cleanup_parts.compareAndRestartDeferred(
getSettings()->merge_tree_clear_old_parts_interval_seconds))
{
assignee.scheduleCommonTask(std::make_shared<ExecutableLambdaAdapter>(
[this, shared_lock] ()
{
/// All use relative_data_path which changes during rename
/// so execute under share lock.
size_t cleared_count = 0;
cleared_count += clearOldPartsFromFilesystem();
cleared_count += clearOldWriteAheadLogs();
cleared_count += clearOldMutations();
cleared_count += clearEmptyParts();
cleared_count += clearOldBrokenPartsFromDetachedDirectory();
return cleared_count;
/// TODO maybe take into account number of cleared objects when calculating backoff
}, common_assignee_trigger, getStorageID()), /* need_trigger */ false);
scheduled = true;
}
return scheduled;
}
/// Counts mutations that are not done yet.
///
/// Mutations are visited from the highest version to the lowest; the walk stops at the first mutation
/// that is reported as done. Mutations for which no status is returned are skipped.
size_t StorageMergeTree::getNumberOfUnfinishedMutations() const
{
    std::unique_lock lock(currently_processing_in_background_mutex);

    size_t unfinished = 0;
    for (auto it = current_mutations_by_version.rbegin(); it != current_mutations_by_version.rend(); ++it)
    {
        auto status = getIncompleteMutationsStatusUnlocked(it->first, lock, nullptr, true);
        if (!status)
            continue;

        if (status->is_done)
            break;

        ++unfinished;
    }

    return unfinished;
}
/// Returns the greatest mutation version that does not exceed the data version of the part,
/// or 0 if there is no such mutation.
UInt64 StorageMergeTree::getCurrentMutationVersion(
    const DataPartPtr & part,
    std::unique_lock<std::mutex> & /*currently_processing_in_background_mutex_lock*/) const
{
    /// First mutation with a version strictly greater than the data version of the part.
    const auto next_mutation_it = current_mutations_by_version.upper_bound(part->info.getDataVersion());

    /// Nothing precedes it, so no mutation has a version <= the data version.
    if (next_mutation_it == current_mutations_by_version.begin())
        return 0;

    return std::prev(next_mutation_it)->first;
}
size_t StorageMergeTree::clearOldMutations(bool truncate)
{
size_t finished_mutations_to_keep = truncate ? 0 : getSettings()->finished_mutations_to_keep;
std::vector<MergeTreeMutationEntry> mutations_to_delete;
{
std::lock_guard lock(currently_processing_in_background_mutex);
if (current_mutations_by_version.size() <= finished_mutations_to_keep)
return 0;
auto end_it = current_mutations_by_version.end();
auto begin_it = current_mutations_by_version.begin();
if (std::optional<Int64> min_version = getMinPartDataVersion())
end_it = current_mutations_by_version.upper_bound(*min_version);
size_t done_count = std::distance(begin_it, end_it);
if (done_count <= finished_mutations_to_keep)
return 0;
for (auto it = begin_it; it != end_it; ++it)
{
if (!it->second.tid.isPrehistoric())
{
done_count = std::distance(begin_it, it);
break;
}
}
if (done_count <= finished_mutations_to_keep)
return 0;
size_t to_delete_count = done_count - finished_mutations_to_keep;
auto it = begin_it;
for (size_t i = 0; i < to_delete_count; ++i)
{
const auto & tid = it->second.tid;
if (!tid.isPrehistoric() && !TransactionLog::getCSN(tid))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot remove mutation {}, because transaction {} is not committed. It's a bug",
it->first, tid);
mutations_to_delete.push_back(std::move(it->second));
it = current_mutations_by_version.erase(it);
}
}
for (auto & mutation : mutations_to_delete)
{
LOG_TRACE(log, "Removing mutation: {}", mutation.file_name);
mutation.removeFile();
}
return mutations_to_delete.size();
}
bool StorageMergeTree::optimize(
const ASTPtr & /*query*/,
const StorageMetadataPtr & /*metadata_snapshot*/,
const ASTPtr & partition,
bool final,
bool deduplicate,
const Names & deduplicate_by_columns,
bool cleanup,
ContextPtr local_context)
{
if (deduplicate)
{
if (deduplicate_by_columns.empty())
LOG_DEBUG(log, "DEDUPLICATE BY all columns");
else
LOG_DEBUG(log, "DEDUPLICATE BY ('{}')", fmt::join(deduplicate_by_columns, "', '"));
}
auto txn = local_context->getCurrentTransaction();
String disable_reason;
if (!partition && final)
{
if (cleanup && this->merging_params.mode != MergingParams::Mode::Replacing)
{
constexpr const char * message = "Cannot OPTIMIZE with CLEANUP table: {}";
disable_reason = "only ReplacingMergeTree can be CLEANUP";
throw Exception(ErrorCodes::CANNOT_ASSIGN_OPTIMIZE, message, disable_reason);
}
DataPartsVector data_parts = getVisibleDataPartsVector(local_context);
std::unordered_set<String> partition_ids;
for (const DataPartPtr & part : data_parts)
partition_ids.emplace(part->info.partition_id);
for (const String & partition_id : partition_ids)
{
if (!merge(
true,
partition_id,
true,
deduplicate,
deduplicate_by_columns,
cleanup,
txn,
disable_reason,
local_context->getSettingsRef().optimize_skip_merged_partitions))
{
constexpr auto message = "Cannot OPTIMIZE table: {}";
if (disable_reason.empty())
disable_reason = "unknown reason";
LOG_INFO(log, message, disable_reason);
if (local_context->getSettingsRef().optimize_throw_if_noop)
throw Exception(ErrorCodes::CANNOT_ASSIGN_OPTIMIZE, message, disable_reason);
return false;
}
}
}
else
{
String partition_id;
if (partition)
partition_id = getPartitionIDFromQuery(partition, local_context);
if (!merge(
true,
partition_id,
final,
deduplicate,
deduplicate_by_columns,
cleanup,
txn,
disable_reason,
local_context->getSettingsRef().optimize_skip_merged_partitions))
{
constexpr auto message = "Cannot OPTIMIZE table: {}";
if (disable_reason.empty())
disable_reason = "unknown reason";
LOG_INFO(log, message, disable_reason);
if (local_context->getSettingsRef().optimize_throw_if_noop)
throw Exception(ErrorCodes::CANNOT_ASSIGN_OPTIMIZE, message, disable_reason);
return false;
}
}
return true;
}
/// Cancels merges and blocks until the set of currently merging/mutating parts becomes empty.
///
/// Returns the action lock obtained from merges_blocker.cancel().
/// Throws TIMEOUT_EXCEEDED if a single wait on the condition variable times out
/// (DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC seconds).
ActionLock StorageMergeTree::stopMergesAndWait()
{
    /// TODO allow to stop merges in specific partition only (like it's done in ReplicatedMergeTree)
    std::unique_lock lock(currently_processing_in_background_mutex);

    /// Asks to complete merges and does not allow them to start.
    /// This protects against "revival" of data for a removed partition after completion of merge.
    auto merge_blocker = merger_mutator.merges_blocker.cancel();

    const auto wait_limit = std::chrono::seconds(DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC);

    while (!currently_merging_mutating_parts.empty())
    {
        LOG_DEBUG(log, "Waiting for currently running merges ({} parts are merging right now)",
            currently_merging_mutating_parts.size());

        const auto wait_status = currently_processing_in_background_condition.wait_for(lock, wait_limit);
        if (wait_status == std::cv_status::timeout)
            throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Timeout while waiting for already running merges");
    }

    return merge_blocker;
}
/// Removes the active part with the given name from the working set and returns it.
///
/// With `force`, merges are stopped first and NO_SUCH_DATA_PART is thrown if the part is not found.
/// Without `force`, nullptr is returned if the part is not found or is currently being merged or mutated.
MergeTreeDataPartPtr StorageMergeTree::outdatePart(MergeTreeTransaction * txn, const String & part_name, bool force, bool clear_without_timeout)
{
    if (!force)
    {
        /// Wait merges selector
        std::unique_lock lock(currently_processing_in_background_mutex);
        auto parts_lock = lockParts();

        auto part = getPartIfExistsUnlocked(part_name, {MergeTreeDataPartState::Active}, parts_lock);

        /// It's okay, part was already removed
        if (!part)
            return nullptr;

        /// Part will be "removed" by merge or mutation, it's OK in case of some
        /// background cleanup processes like removing of empty parts.
        if (currently_merging_mutating_parts.contains(part))
            return nullptr;

        removePartsFromWorkingSet(txn, {part}, clear_without_timeout, &parts_lock);
        return part;
    }

    /// Forcefully stop merges and make part outdated
    auto merge_blocker = stopMergesAndWait();
    auto parts_lock = lockParts();

    auto part = getPartIfExistsUnlocked(part_name, {MergeTreeDataPartState::Active}, parts_lock);
    if (!part)
        throw Exception(ErrorCodes::NO_SUCH_DATA_PART, "Part {} not found, won't try to drop it.", part_name);

    removePartsFromWorkingSet(txn, {part}, clear_without_timeout, &parts_lock);
    return part;
}
/// Drops the part without stopping merges (outdatePart() is called with force = false).
/// Does nothing if outdatePart() returns no part.
void StorageMergeTree::dropPartNoWaitNoThrow(const String & part_name)
{
    auto part = outdatePart(NO_TRANSACTION_RAW, part_name, /*force=*/ false, /*clear_without_timeout=*/ false);

    /// Nothing to do, part was removed in some different way
    if (!part)
        return;

    if (deduplication_log)
        deduplication_log->dropPart(part->info);

    /// Need to destroy part objects before clearing them from filesystem.
    part.reset();
    clearOldPartsFromFilesystem();
}
/// Description of an empty part that is planned to be created in order to cover an existing part.
struct FutureNewEmptyPart
{
/// Info of the new part (initCoverageWithNewEmptyParts() fills it with the info of the old part with level increased by one).
MergeTreePartInfo part_info;
/// Partition of the new part (copied from the old part by initCoverageWithNewEmptyParts()).
MergeTreePartition partition;
/// Name of the new part, derived from `part_info`.
std::string part_name;
/// NOTE(review): not assigned by the helpers next to this struct -- confirm whether it is still used.
StorageMergeTree::MutableDataPartPtr data_part;
};
/// A list of planned empty parts.
using FutureNewEmptyParts = std::vector<FutureNewEmptyPart>;
/// Returns the names of the planned empty parts, in the same order as in `parts`.
Strings getPartsNames(const FutureNewEmptyParts & parts)
{
    Strings part_names;
    /// The final size is known in advance, so allocate once.
    part_names.reserve(parts.size());

    for (const auto & p : parts)
        part_names.push_back(p.part_name);

    return part_names;
}
/// For every old part plans an empty part with the same part info except that the level is greater by one,
/// the same partition and the name produced by getNewName() for the new info.
/// The result has one element per element of `old_parts`, in the same order.
FutureNewEmptyParts initCoverageWithNewEmptyParts(const DataPartsVector & old_parts)
{
    FutureNewEmptyParts future_parts;
    /// Exactly one planned part per old part, so allocate once.
    future_parts.reserve(old_parts.size());

    for (const auto & old_part : old_parts)
    {
        auto & new_part = future_parts.emplace_back();

        new_part.part_info = old_part->info;
        new_part.part_info.level += 1;
        new_part.partition = old_part->partition;
        new_part.part_name = old_part->getNewName(new_part.part_info);
    }

    return future_parts;
}
/// Calls data.createEmptyPart() for every planned part.
/// Returns the created parts together with the temporary directory holders returned alongside them,
/// both in the order of `future_parts`.
std::pair<StorageMergeTree::MutableDataPartsVector, std::vector<scope_guard>> createEmptyDataParts(
    MergeTreeData & data, FutureNewEmptyParts & future_parts, const MergeTreeTransactionPtr & txn)
{
    std::pair<StorageMergeTree::MutableDataPartsVector, std::vector<scope_guard>> result;
    auto & [created_parts, dir_holders] = result;

    for (auto & planned : future_parts)
    {
        auto [empty_part, holder] = data.createEmptyPart(planned.part_info, planned.partition, planned.part_name, txn);
        created_parts.emplace_back(std::move(empty_part));
        dir_holders.emplace_back(std::move(holder));
    }

    return result;
}
/// Renames the temporary empty parts into place, commits the transaction and marks the covered parts
/// for removal without delay (their remove_time is set to 0).
/// Covered parts are also dropped from the deduplication log, if there is one.
///
/// Throws LOGICAL_ERROR if a single new part covers more than one existing part.
void StorageMergeTree::renameAndCommitEmptyParts(MutableDataPartsVector & new_parts, Transaction & transaction)
{
    DataPartsVector all_covered;

    for (auto & new_part : new_parts)
    {
        DataPartsVector covered_by_this = renameTempPartAndReplace(new_part, transaction);

        if (covered_by_this.size() > 1)
            throw Exception(ErrorCodes::LOGICAL_ERROR,
                            "Part {} expected to cover not more then 1 part. "
                            "{} covered parts have been found. This is a bug.",
                            new_part->name, covered_by_this.size());

        std::move(covered_by_this.begin(), covered_by_this.end(), std::back_inserter(all_covered));
    }

    LOG_INFO(log, "Remove {} parts by covering them with empty {} parts. With txn {}.",
             all_covered.size(), new_parts.size(), transaction.getTID());

    transaction.commit();

    /// Remove covered parts without waiting for old_parts_lifetime seconds.
    for (auto & covered : all_covered)
        covered->remove_time.store(0, std::memory_order_relaxed);

    if (!deduplication_log)
        return;

    for (const auto & covered : all_covered)
        deduplication_log->dropPart(covered->info);
}
/// Implements TRUNCATE: makes all visible parts of the table go away.
///
/// Inside a transaction the visible parts are removed from the working set directly.
/// Otherwise every visible part is covered by a newly created empty part.
/// Afterwards old mutations, old parts and empty parts are cleaned up.
void StorageMergeTree::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr query_context, TableExclusiveLockHolder &)
{
/// The inner scope ensures that the merge blocker, locks and part pointers are released before the cleanup below.
{
/// Asks to complete merges and does not allow them to start.
/// This protects against "revival" of data for a removed partition after completion of merge.
waitForOutdatedPartsToBeLoaded();
auto merge_blocker = stopMergesAndWait();
Stopwatch watch;
ProfileEventsScope profile_events_scope;
auto txn = query_context->getCurrentTransaction();
if (txn)
{
auto data_parts_lock = lockParts();
auto parts_to_remove = getVisibleDataPartsVectorUnlocked(query_context, data_parts_lock);
removePartsFromWorkingSet(txn.get(), parts_to_remove, true, data_parts_lock);
LOG_INFO(log, "Removed {} parts: [{}]", parts_to_remove.size(), fmt::join(getPartsNames(parts_to_remove), ", "));
}
else
{
/// `txn` is null in this branch.
MergeTreeData::Transaction transaction(*this, txn.get());
auto operation_data_parts_lock = lockOperationsWithParts();
auto parts = getVisibleDataPartsVector(query_context);
/// Plan one empty part per visible part, create them and commit them in place of the old ones.
auto future_parts = initCoverageWithNewEmptyParts(parts);
LOG_TEST(log, "Made {} empty parts in order to cover {} parts. Empty parts: {}, covered parts: {}. With txn {}",
future_parts.size(), parts.size(),
fmt::join(getPartsNames(future_parts), ", "), fmt::join(getPartsNames(parts), ", "),
transaction.getTID());
auto [new_data_parts, tmp_dir_holders] = createEmptyDataParts(*this, future_parts, txn);
renameAndCommitEmptyParts(new_data_parts, transaction);
PartLog::addNewParts(query_context, PartLog::createPartLogEntries(new_data_parts, watch.elapsed(), profile_events_scope.getSnapshot()));
LOG_INFO(log, "Truncated table with {} parts by replacing them with new empty {} parts. With txn {}",
parts.size(), future_parts.size(),
transaction.getTID());
}
}
/// Old parts are needed to be destroyed before clearing them from filesystem.
clearOldMutations(true);
clearOldPartsFromFilesystem();
clearEmptyParts();
}
/// Drops (or detaches, if `detach` is set) a single active part by name.
///
/// Inside a transaction the part is made outdated via outdatePart() with force = true and then passed to dropPartsImpl().
/// Otherwise the part is optionally cloned into the detached directory and then covered by a new empty part.
/// Throws NO_SUCH_DATA_PART if the part is not found. Afterwards old mutations, old parts and empty parts are cleaned up.
void StorageMergeTree::dropPart(const String & part_name, bool detach, ContextPtr query_context)
{
/// The inner scope ensures that the merge blocker, locks and part pointers are released before the cleanup below.
{
/// Asks to complete merges and does not allow them to start.
/// This protects against "revival" of data for a removed partition after completion of merge.
auto merge_blocker = stopMergesAndWait();
Stopwatch watch;
ProfileEventsScope profile_events_scope;
/// It's important to create it outside of lock scope because
/// otherwise it can lock parts in destructor and deadlock is possible.
auto txn = query_context->getCurrentTransaction();
if (txn)
{
if (auto part = outdatePart(txn.get(), part_name, /*force=*/ true))
dropPartsImpl({part}, detach);
}
else
{
/// `txn` is null in this branch.
MergeTreeData::Transaction transaction(*this, txn.get());
auto operation_data_parts_lock = lockOperationsWithParts();
auto part = getPartIfExists(part_name, {MergeTreeDataPartState::Active});
if (!part)
throw Exception(ErrorCodes::NO_SUCH_DATA_PART, "Part {} not found, won't try to drop it.", part_name);
if (detach)
{
/// Clone the part into the detached directory before it gets covered by an empty part.
auto metadata_snapshot = getInMemoryMetadataPtr();
String part_dir = part->getDataPartStorage().getPartDirectory();
LOG_INFO(log, "Detaching {}", part_dir);
auto holder = getTemporaryPartDirectoryHolder(String(DETACHED_DIR_NAME) + "/" + part_dir);
part->makeCloneInDetached("", metadata_snapshot, /*disk_transaction*/ {});
}
{
/// Plan an empty part covering the part, create it and commit it in place of the old one.
auto future_parts = initCoverageWithNewEmptyParts({part});
LOG_TEST(log, "Made {} empty parts in order to cover {} part. With txn {}",
fmt::join(getPartsNames(future_parts), ", "), fmt::join(getPartsNames({part}), ", "),
transaction.getTID());
auto [new_data_parts, tmp_dir_holders] = createEmptyDataParts(*this, future_parts, txn);
renameAndCommitEmptyParts(new_data_parts, transaction);
PartLog::addNewParts(query_context, PartLog::createPartLogEntries(new_data_parts, watch.elapsed(), profile_events_scope.getSnapshot()));
const auto * op = detach ? "Detached" : "Dropped";
LOG_INFO(log, "{} {} part by replacing it with new empty {} part. With txn {}",
op, part->name, future_parts[0].part_name,
transaction.getTID());
}
}
}
/// Old part objects is needed to be destroyed before clearing them from filesystem.
clearOldMutations(true);
clearOldPartsFromFilesystem();
clearEmptyParts();
}
/// Drops (or detaches, if `detach` is set) all visible parts of one partition, or of the whole table
/// when the partition AST has the `all` flag set.
///
/// Inside a transaction the parts are removed from the working set and then passed to dropPartsImpl().
/// Otherwise the parts are optionally cloned into the detached directory and then covered by new empty parts.
/// Afterwards old mutations, old parts and empty parts are cleaned up.
void StorageMergeTree::dropPartition(const ASTPtr & partition, bool detach, ContextPtr query_context)
{
/// The inner scope ensures that the merge blocker, locks and part pointers are released before the cleanup below.
{
const auto * partition_ast = partition->as<ASTPartition>();
/// Asks to complete merges and does not allow them to start.
/// This protects against "revival" of data for a removed partition after completion of merge.
auto merge_blocker = stopMergesAndWait();
Stopwatch watch;
ProfileEventsScope profile_events_scope;
/// It's important to create it outside of lock scope because
/// otherwise it can lock parts in destructor and deadlock is possible.
auto txn = query_context->getCurrentTransaction();
if (txn)
{
DataPartsVector parts_to_remove;
{
/// Parts are selected and removed from the working set under the same parts lock.
auto data_parts_lock = lockParts();
if (partition_ast && partition_ast->all)
parts_to_remove = getVisibleDataPartsVectorUnlocked(query_context, data_parts_lock);
else
{
String partition_id = getPartitionIDFromQuery(partition, query_context, &data_parts_lock);
parts_to_remove = getVisibleDataPartsVectorInPartition(query_context, partition_id, data_parts_lock);
}
removePartsFromWorkingSet(txn.get(), parts_to_remove, true, data_parts_lock);
}
dropPartsImpl(std::move(parts_to_remove), detach);
}
else
{
/// `txn` is null in this branch.
MergeTreeData::Transaction transaction(*this, txn.get());
auto operation_data_parts_lock = lockOperationsWithParts();
DataPartsVector parts;
{
if (partition_ast && partition_ast->all)
parts = getVisibleDataPartsVector(query_context);
else
{
String partition_id = getPartitionIDFromQuery(partition, query_context);
parts = getVisibleDataPartsVectorInPartition(query_context, partition_id);
}
}
if (detach)
{
/// Clone every part into the detached directory before it gets covered by an empty part.
for (const auto & part : parts)
{
auto metadata_snapshot = getInMemoryMetadataPtr();
String part_dir = part->getDataPartStorage().getPartDirectory();
LOG_INFO(log, "Detaching {}", part_dir);
auto holder = getTemporaryPartDirectoryHolder(String(DETACHED_DIR_NAME) + "/" + part_dir);
part->makeCloneInDetached("", metadata_snapshot, /*disk_transaction*/ {});
}
}
/// Plan one empty part per selected part, create them and commit them in place of the old ones.
auto future_parts = initCoverageWithNewEmptyParts(parts);
LOG_TEST(log, "Made {} empty parts in order to cover {} parts. Empty parts: {}, covered parts: {}. With txn {}",
future_parts.size(), parts.size(),
fmt::join(getPartsNames(future_parts), ", "), fmt::join(getPartsNames(parts), ", "),
transaction.getTID());
auto [new_data_parts, tmp_dir_holders] = createEmptyDataParts(*this, future_parts, txn);
renameAndCommitEmptyParts(new_data_parts, transaction);
PartLog::addNewParts(query_context, PartLog::createPartLogEntries(new_data_parts, watch.elapsed(), profile_events_scope.getSnapshot()));
const auto * op = detach ? "Detached" : "Dropped";
LOG_INFO(log, "{} partition with {} parts by replacing them with new empty {} parts. With txn {}",
op, parts.size(), future_parts.size(),
transaction.getTID());
}
}
/// Old parts are needed to be destroyed before clearing them from filesystem.
clearOldMutations(true);
clearOldPartsFromFilesystem();
clearEmptyParts();
}
/// Finishes dropping of the given parts: when `detach` is set, clones each of them into the detached directory;
/// then drops them from the deduplication log (if there is one) and logs the outcome.
void StorageMergeTree::dropPartsImpl(DataPartsVector && parts_to_remove, bool detach)
{
    auto metadata_snapshot = getInMemoryMetadataPtr();

    /// If DETACH clone parts to detached/ directory
    /// NOTE: no race with background cleanup until we hold pointers to parts
    auto clone_to_detached = [&](const DataPartPtr & part)
    {
        String part_dir = part->getDataPartStorage().getPartDirectory();
        LOG_INFO(log, "Detaching {}", part_dir);

        auto holder = getTemporaryPartDirectoryHolder(String(DETACHED_DIR_NAME) + "/" + part_dir);
        part->makeCloneInDetached("", metadata_snapshot, /*disk_transaction*/ {});
    };

    if (detach)
    {
        for (const auto & part : parts_to_remove)
            clone_to_detached(part);
    }

    if (deduplication_log)
    {
        for (const auto & part : parts_to_remove)
            deduplication_log->dropPart(part->info);
    }

    if (!detach)
    {
        LOG_INFO(log, "Removed {} parts: [{}]", parts_to_remove.size(), fmt::join(getPartsNames(parts_to_remove), ", "));
        return;
    }

    LOG_INFO(log, "Detached {} parts: [{}]", parts_to_remove.size(), fmt::join(getPartsNames(parts_to_remove), ", "));
}
/// Attaches the parts loaded by tryLoadPartsToAttach() from the "detached/" directory.
///
/// Each part gets its creation TID written (the TID of the current transaction, or the prehistoric TID
/// when there is no transaction), receives a new name and is added to the table in its own
/// MergeTreeData::Transaction. Returns one result entry per attached part.
PartitionCommandsResultInfo StorageMergeTree::attachPartition(
    const ASTPtr & partition, const StorageMetadataPtr & /* metadata_snapshot */,
    bool attach_part, ContextPtr local_context)
{
    PartitionCommandsResultInfo results;
    PartsTemporaryRename renamed_parts(*this, "detached/");
    MutableDataPartsVector loaded_parts = tryLoadPartsToAttach(partition, attach_part, local_context, renamed_parts);

    for (size_t i = 0; i < loaded_parts.size(); ++i)
    {
        auto & part = loaded_parts[i];
        auto & rename_entry = renamed_parts.old_and_new_names[i];

        LOG_INFO(log, "Attaching part {} from {}", part->name, rename_entry.new_name);

        /// We should write version metadata on part creation to distinguish it from parts that were created without transaction.
        auto txn = local_context->getCurrentTransaction();
        TransactionID tid = Tx::PrehistoricTID;
        if (txn)
            tid = txn->tid;
        part->version.setCreationTID(tid, nullptr);
        part->storeVersionMetadata();

        /// Keep a copy: the name stored in renamed_parts is cleared below.
        String old_name = rename_entry.old_name;

        /// It's important to create it outside of lock scope because
        /// otherwise it can lock parts in destructor and deadlock is possible.
        MergeTreeData::Transaction transaction(*this, local_context->getCurrentTransaction().get());
        {
            auto lock = lockParts();
            fillNewPartName(part, lock);
            renameTempPartAndAdd(part, transaction, lock);
            transaction.commit(&lock);
        }

        rename_entry.old_name.clear();

        results.push_back(PartitionCommandResultInfo{
            .partition_id = part->info.partition_id,
            .part_name = part->name,
            .old_part_name = old_name,
        });

        LOG_INFO(log, "Finished attaching part");
    }

    /// New parts with other data may appear in place of deleted parts.
    local_context->clearCaches();
    return results;
}
void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, ContextPtr local_context)
{
auto lock1 = lockForShare(local_context->getCurrentQueryId(), local_context->getSettingsRef().lock_acquire_timeout);
auto lock2 = source_table->lockForShare(local_context->getCurrentQueryId(), local_context->getSettingsRef().lock_acquire_timeout);
auto merges_blocker = stopMergesAndWait();
auto source_metadata_snapshot = source_table->getInMemoryMetadataPtr();
auto my_metadata_snapshot = getInMemoryMetadataPtr();
Stopwatch watch;
ProfileEventsScope profile_events_scope;
MergeTreeData & src_data = checkStructureAndGetMergeTreeData(source_table, source_metadata_snapshot, my_metadata_snapshot);
String partition_id = getPartitionIDFromQuery(partition, local_context);
DataPartsVector src_parts = src_data.getVisibleDataPartsVectorInPartition(local_context, partition_id);
MutableDataPartsVector dst_parts;
std::vector<scope_guard> dst_parts_locks;
static const String TMP_PREFIX = "tmp_replace_from_";
for (const DataPartPtr & src_part : src_parts)
{
if (!canReplacePartition(src_part))
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Cannot replace partition '{}' because part '{}' has inconsistent granularity with table",
partition_id, src_part->name);
/// This will generate unique name in scope of current server process.
Int64 temp_index = insert_increment.get();
MergeTreePartInfo dst_part_info(partition_id, temp_index, temp_index, src_part->info.level);
IDataPartStorage::ClonePartParams clone_params{.txn = local_context->getCurrentTransaction()};
auto [dst_part, part_lock] = cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, my_metadata_snapshot, clone_params);
dst_parts.emplace_back(std::move(dst_part));
dst_parts_locks.emplace_back(std::move(part_lock));
}
/// ATTACH empty part set
if (!replace && dst_parts.empty())
return;
MergeTreePartInfo drop_range;
if (replace)
{
drop_range.partition_id = partition_id;
drop_range.min_block = 0;
drop_range.max_block = increment.get(); // there will be a "hole" in block numbers
drop_range.level = std::numeric_limits<decltype(drop_range.level)>::max();
}
/// Atomically add new parts and remove old ones
try
{
{
/// Here we use the transaction just like RAII since rare errors in renameTempPartAndReplace() are possible
/// and we should be able to rollback already added (Precomitted) parts
Transaction transaction(*this, local_context->getCurrentTransaction().get());
auto data_parts_lock = lockParts();
/** It is important that obtaining new block number and adding that block to parts set is done atomically.
* Otherwise there is race condition - merge of blocks could happen in interval that doesn't yet contain new part.
*/
for (auto part : dst_parts)
{
fillNewPartName(part, data_parts_lock);
renameTempPartAndReplaceUnlocked(part, transaction, data_parts_lock);
}
/// Populate transaction
transaction.commit(&data_parts_lock);
/// If it is REPLACE (not ATTACH), remove all parts which max_block_number less then min_block_number of the first new block
if (replace)
removePartsInRangeFromWorkingSet(local_context->getCurrentTransaction().get(), drop_range, data_parts_lock);
}
/// Note: same elapsed time and profile events for all parts is used
PartLog::addNewParts(getContext(), PartLog::createPartLogEntries(dst_parts, watch.elapsed(), profile_events_scope.getSnapshot()));
}
catch (...)
{
PartLog::addNewParts(getContext(), PartLog::createPartLogEntries(dst_parts, watch.elapsed()), ExecutionStatus::fromCurrentException("", true));
throw;
}
}
void StorageMergeTree::movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, ContextPtr local_context)
{
auto lock1 = lockForShare(local_context->getCurrentQueryId(), local_context->getSettingsRef().lock_acquire_timeout);
auto lock2 = dest_table->lockForShare(local_context->getCurrentQueryId(), local_context->getSettingsRef().lock_acquire_timeout);
auto merges_blocker = stopMergesAndWait();
auto dest_table_storage = std::dynamic_pointer_cast<StorageMergeTree>(dest_table);
if (!dest_table_storage)
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"Table {} supports movePartitionToTable only for MergeTree family of table engines. Got {}",
getStorageID().getNameForLogs(), dest_table->getName());
if (dest_table_storage->getStoragePolicy() != this->getStoragePolicy())
throw Exception(ErrorCodes::UNKNOWN_POLICY,
"Destination table {} should have the same storage policy of source table {}. {}: {}, {}: {}",
dest_table_storage->getStorageID().getNameForLogs(),
getStorageID().getNameForLogs(), getStorageID().getNameForLogs(),
this->getStoragePolicy()->getName(), dest_table_storage->getStorageID().getNameForLogs(),
dest_table_storage->getStoragePolicy()->getName());
auto dest_metadata_snapshot = dest_table->getInMemoryMetadataPtr();
auto metadata_snapshot = getInMemoryMetadataPtr();
Stopwatch watch;
ProfileEventsScope profile_events_scope;
MergeTreeData & src_data = dest_table_storage->checkStructureAndGetMergeTreeData(*this, metadata_snapshot, dest_metadata_snapshot);
String partition_id = getPartitionIDFromQuery(partition, local_context);
DataPartsVector src_parts = src_data.getVisibleDataPartsVectorInPartition(local_context, partition_id);
MutableDataPartsVector dst_parts;
std::vector<scope_guard> dst_parts_locks;
static const String TMP_PREFIX = "tmp_move_from_";
for (const DataPartPtr & src_part : src_parts)
{
if (!dest_table_storage->canReplacePartition(src_part))
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Cannot move partition '{}' because part '{}' has inconsistent granularity with table",
partition_id, src_part->name);
/// This will generate unique name in scope of current server process.
Int64 temp_index = insert_increment.get();
MergeTreePartInfo dst_part_info(partition_id, temp_index, temp_index, src_part->info.level);
IDataPartStorage::ClonePartParams clone_params{.txn = local_context->getCurrentTransaction()};
auto [dst_part, part_lock] = dest_table_storage->cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, dest_metadata_snapshot, clone_params);
dst_parts.emplace_back(std::move(dst_part));
dst_parts_locks.emplace_back(std::move(part_lock));
}
/// empty part set
if (dst_parts.empty())
return;
/// Move new parts to the destination table. NOTE It doesn't look atomic.
try
{
{
Transaction transaction(*dest_table_storage, local_context->getCurrentTransaction().get());
auto src_data_parts_lock = lockParts();
auto dest_data_parts_lock = dest_table_storage->lockParts();
for (auto & part : dst_parts)
{
dest_table_storage->fillNewPartName(part, dest_data_parts_lock);
dest_table_storage->renameTempPartAndReplaceUnlocked(part, transaction, dest_data_parts_lock);
}
removePartsFromWorkingSet(local_context->getCurrentTransaction().get(), src_parts, true, src_data_parts_lock);
transaction.commit(&src_data_parts_lock);
}
clearOldPartsFromFilesystem();
/// Note: same elapsed time and profile events for all parts is used
PartLog::addNewParts(getContext(), PartLog::createPartLogEntries(dst_parts, watch.elapsed(), profile_events_scope.getSnapshot()));
}
catch (...)
{
PartLog::addNewParts(getContext(), PartLog::createPartLogEntries(dst_parts, watch.elapsed()), ExecutionStatus::fromCurrentException("", true));
throw;
}
}
/// Returns the lock obtained by cancelling the blocker that corresponds to `action_type`
/// (merges, TTL merges or part moves). Any other action type yields a default-constructed ActionLock.
ActionLock StorageMergeTree::getActionLock(StorageActionBlockType action_type)
{
    if (action_type == ActionLocks::PartsMerge)
        return merger_mutator.merges_blocker.cancel();

    if (action_type == ActionLocks::PartsTTLMerge)
        return merger_mutator.ttl_merges_blocker.cancel();

    if (action_type == ActionLocks::PartsMove)
        return parts_mover.moves_blocker.cancel();

    return {};
}
/// Called when an action lock of the given type is removed: triggers the assignee
/// of the background work that the lock was holding back.
void StorageMergeTree::onActionLockRemove(StorageActionBlockType action_type)
{
    const bool concerns_merges = action_type == ActionLocks::PartsMerge || action_type == ActionLocks::PartsTTLMerge;
    if (concerns_merges)
    {
        background_operations_assignee.trigger();
        return;
    }

    if (action_type == ActionLocks::PartsMove)
        background_moves_assignee.trigger();
}
/// Runs a CHECK over the visible parts of the table, or of a single partition when the query names one.
/// For a part stored on disk without "checksums.txt" the checksums are recalculated, compared with
/// the ones held in memory and written to disk; all other parts get a full check.
/// An Exception raised while checking a part is turned into a failed entry for that part.
CheckResults StorageMergeTree::checkData(const ASTPtr & query, ContextPtr local_context)
{
    CheckResults results;

    const auto & check_query = query->as<ASTCheckQuery &>();
    DataPartsVector parts_to_check;
    if (check_query.partition)
    {
        String partition_id = getPartitionIDFromQuery(check_query.partition, local_context);
        parts_to_check = getVisibleDataPartsVectorInPartition(local_context, partition_id);
    }
    else
    {
        parts_to_check = getVisibleDataPartsVector(local_context);
    }

    for (auto & part : parts_to_check)
    {
        static constexpr auto checksums_path = "checksums.txt";
        const bool checksums_file_missing = part->isStoredOnDisk() && !part->getDataPartStorage().exists(checksums_path);

        if (!checksums_file_missing)
        {
            /// Regular case: verify the part as it is.
            try
            {
                checkDataPart(part, true);
                part->checkMetadata();
                results.emplace_back(part->name, true, "");
            }
            catch (const Exception & ex)
            {
                results.emplace_back(part->name, false, ex.message());
            }
            continue;
        }

        /// The checksums file is not present: calculate the checksums and write them to disk.
        try
        {
            auto recounted_checksums = checkDataPart(part, false);
            recounted_checksums.checkEqual(part->checksums, true);

            auto & writable_part = const_cast<IMergeTreeDataPart &>(*part);
            writable_part.writeChecksums(part->checksums, local_context->getWriteSettings());

            part->checkMetadata();
            results.emplace_back(part->name, true, "Checksums recounted and written to disk.");
        }
        catch (const Exception & ex)
        {
            tryLogCurrentException(log, __PRETTY_FUNCTION__);
            results.emplace_back(part->name, false, "Check of part finished with error: '" + ex.message() + "'");
        }
    }

    return results;
}
/// Adds backup entries for the table data to `backup_entries_collector`:
/// the visible parts (restricted to `partitions` when given) followed by the mutations
/// starting from the smallest "data version + 1" among those parts.
void StorageMergeTree::backupData(BackupEntriesCollector & backup_entries_collector, const String & data_path_in_backup, const std::optional<ASTs> & partitions)
{
    const auto & backup_settings = backup_entries_collector.getBackupSettings();
    const auto & read_settings = backup_entries_collector.getReadSettings();
    auto local_context = backup_entries_collector.getContext();

    DataPartsVector parts_to_backup;
    if (!partitions)
        parts_to_backup = getVisibleDataPartsVector(local_context);
    else
        parts_to_backup = getVisibleDataPartsVectorInPartitions(local_context, getPartitionIDsFromQuery(*partitions, local_context));

    /// Stays at the maximum value when there are no parts at all.
    Int64 min_data_version = std::numeric_limits<Int64>::max();
    for (const auto & part : parts_to_backup)
    {
        const Int64 next_version = part->info.getDataVersion() + 1;
        if (next_version < min_data_version)
            min_data_version = next_version;
    }

    auto entries_per_part = backupParts(parts_to_backup, data_path_in_backup, backup_settings, read_settings, local_context);
    for (auto & part_entries : entries_per_part)
        backup_entries_collector.addBackupEntries(std::move(part_entries.backup_entries));

    backup_entries_collector.addBackupEntries(backupMutations(min_data_version, data_path_in_backup));
}
/// Builds backup entries for every current mutation whose version is not less than `version`.
/// Each entry is placed at "<data_path_in_backup>/mutations/<version padded to 10 digits>.txt".
BackupEntries StorageMergeTree::backupMutations(UInt64 version, const String & data_path_in_backup) const
{
    std::lock_guard lock(currently_processing_in_background_mutex);

    const fs::path mutations_dir = fs::path{data_path_in_backup} / "mutations";

    BackupEntries entries;
    auto pos = current_mutations_by_version.lower_bound(version);
    const auto last = current_mutations_by_version.end();
    while (pos != last)
    {
        const auto & mutation_version = pos->first;
        const auto & mutation_entry = pos->second;
        entries.emplace_back(mutations_dir / fmt::format("{:010}.txt", mutation_version), mutation_entry.backup());
        ++pos;
    }
    return entries;
}
/// Adds restored parts to the table. Every part gets a fresh name and is committed
/// in its own MergeTreeData::Transaction (created with NO_TRANSACTION_RAW).
void StorageMergeTree::attachRestoredParts(MutableDataPartsVector && parts)
{
    /// Iterate by reference: no need to copy the shared pointer for every part.
    for (auto & part : parts)
    {
        /// It's important to create it outside of lock scope because
        /// otherwise it can lock parts in destructor and deadlock is possible.
        MergeTreeData::Transaction transaction(*this, NO_TRANSACTION_RAW);
        {
            auto lock = lockParts();
            fillNewPartName(part, lock);
            renameTempPartAndAdd(part, transaction, lock);
            transaction.commit(&lock);
        }
    }
}
std::map<int64_t, MutationCommands> StorageMergeTree::getAlterMutationCommandsForPart(const DataPartPtr & part) const
{
std::lock_guard lock(currently_processing_in_background_mutex);
UInt64 part_data_version = part->info.getDataVersion();
std::map<int64_t, MutationCommands> result;
for (const auto & [mutation_version, entry] : current_mutations_by_version | std::views::reverse)
{
if (mutation_version > part_data_version)
result[mutation_version] = entry.commands;
else
break;
}
return result;
}
/// Starts the background moves assignee, but only when areBackgroundMovesNeeded() says so.
void StorageMergeTree::startBackgroundMovesIfNeeded()
{
    if (!areBackgroundMovesNeeded())
        return;

    background_moves_assignee.start();
}
std::unique_ptr<MergeTreeSettings> StorageMergeTree::getDefaultSettings() const
{
return std::make_unique<MergeTreeSettings>(getContext()->getMergeTreeSettings());
}
/// Returns the prepared sets cache registered for `mutation_id`, creating and registering
/// a new one when there is none or when the registered one has already expired.
/// The registry is accessed under mutation_prepared_sets_cache_mutex.
PreparedSetsCachePtr StorageMergeTree::getPreparedSetsCache(Int64 mutation_id)
{
    std::lock_guard guard(mutation_prepared_sets_cache_mutex);

    /// Drop expired entries from the front of the registry; stop at the first live one.
    for (;;)
    {
        auto first_entry = mutation_prepared_sets_cache.begin();
        if (first_entry == mutation_prepared_sets_cache.end() || first_entry->second.lock())
            break;
        mutation_prepared_sets_cache.erase(first_entry);
    }

    /// Reuse the registered cache if it is still alive.
    if (auto found = mutation_prepared_sets_cache.find(mutation_id); found != mutation_prepared_sets_cache.end())
    {
        if (auto alive_cache = found->second.lock())
            return alive_cache;
    }

    /// Nothing usable: register a fresh cache.
    auto fresh_cache = std::make_shared<PreparedSetsCache>();
    mutation_prepared_sets_cache[mutation_id] = fresh_cache;
    return fresh_cache;
}
/// Gives `part` a single-block range using the next value of `increment`, resets its mutation
/// number to 0 and renames the part accordingly.
/// The unnamed DataPartsLock parameter is not used in the body.
void StorageMergeTree::fillNewPartName(MutableDataPartPtr & part, DataPartsLock &)
{
    auto & info = part->info;

    info.max_block = increment.get();
    info.min_block = info.max_block;
    info.mutation = 0;

    part->setName(part->getNewName(info));
}
}