mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-13 09:52:38 +00:00
remove strange multimap from mutations
This commit is contained in:
parent
6000872658
commit
52885db5d7
@ -1,5 +1,6 @@
|
||||
#include <Storages/MergeTree/MergeTreeMutationEntry.h>
|
||||
#include <IO/Operators.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <IO/WriteBufferFromFile.h>
|
||||
#include <IO/ReadBufferFromFile.h>
|
||||
#include <IO/ReadBufferFromString.h>
|
||||
@ -10,7 +11,39 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
MergeTreeMutationEntry::MergeTreeMutationEntry(MutationCommands commands_, DiskPtr disk_, const String & path_prefix_, Int64 tmp_number)
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int BAD_ARGUMENTS;
|
||||
}
|
||||
|
||||
String MergeTreeMutationEntry::versionToFileName(UInt64 block_number_)
|
||||
{
|
||||
assert(block_number_);
|
||||
return fmt::format("mutation_{}.txt", block_number_);
|
||||
}
|
||||
|
||||
UInt64 MergeTreeMutationEntry::tryParseFileName(const String & file_name_)
|
||||
{
|
||||
UInt64 maybe_block_number = 0;
|
||||
ReadBufferFromString file_name_buf(file_name_);
|
||||
if (!checkString("mutation_", file_name_buf))
|
||||
return 0;
|
||||
if (!tryReadIntText(maybe_block_number, file_name_buf))
|
||||
return 0;
|
||||
if (!checkString(".txt", file_name_buf))
|
||||
return 0;
|
||||
assert(maybe_block_number);
|
||||
return maybe_block_number;
|
||||
}
|
||||
|
||||
UInt64 MergeTreeMutationEntry::parseFileName(const String & file_name_)
|
||||
{
|
||||
if (UInt64 maybe_block_number = tryParseFileName(file_name_))
|
||||
return maybe_block_number;
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot parse mutation version from file name, expected 'mutation_<UInt64>.txt', got '{}'", file_name_);
|
||||
}
|
||||
|
||||
MergeTreeMutationEntry::MergeTreeMutationEntry(MutationCommands commands_, DiskPtr disk_, const String & path_prefix_, UInt64 tmp_number)
|
||||
: create_time(time(nullptr))
|
||||
, commands(std::move(commands_))
|
||||
, disk(std::move(disk_))
|
||||
@ -35,10 +68,11 @@ MergeTreeMutationEntry::MergeTreeMutationEntry(MutationCommands commands_, DiskP
|
||||
}
|
||||
}
|
||||
|
||||
void MergeTreeMutationEntry::commit(Int64 block_number_)
|
||||
void MergeTreeMutationEntry::commit(UInt64 block_number_)
|
||||
{
|
||||
assert(block_number_);
|
||||
block_number = block_number_;
|
||||
String new_file_name = "mutation_" + toString(block_number) + ".txt";
|
||||
String new_file_name = versionToFileName(block_number);
|
||||
disk->moveFile(path_prefix + file_name, path_prefix + new_file_name);
|
||||
is_temp = false;
|
||||
file_name = new_file_name;
|
||||
@ -62,10 +96,7 @@ MergeTreeMutationEntry::MergeTreeMutationEntry(DiskPtr disk_, const String & pat
|
||||
, file_name(file_name_)
|
||||
, is_temp(false)
|
||||
{
|
||||
ReadBufferFromString file_name_buf(file_name);
|
||||
file_name_buf >> "mutation_" >> block_number >> ".txt";
|
||||
assertEOF(file_name_buf);
|
||||
|
||||
block_number = parseFileName(file_name);
|
||||
auto buf = disk->readFile(path_prefix + file_name);
|
||||
|
||||
*buf >> "format version: 1\n";
|
||||
|
@ -21,7 +21,7 @@ struct MergeTreeMutationEntry
|
||||
String file_name;
|
||||
bool is_temp = false;
|
||||
|
||||
Int64 block_number = 0;
|
||||
UInt64 block_number = 0;
|
||||
|
||||
String latest_failed_part;
|
||||
MergeTreePartInfo latest_failed_part_info;
|
||||
@ -29,15 +29,19 @@ struct MergeTreeMutationEntry
|
||||
String latest_fail_reason;
|
||||
|
||||
/// Create a new entry and write it to a temporary file.
|
||||
MergeTreeMutationEntry(MutationCommands commands_, DiskPtr disk, const String & path_prefix_, Int64 tmp_number);
|
||||
MergeTreeMutationEntry(MutationCommands commands_, DiskPtr disk, const String & path_prefix_, UInt64 tmp_number);
|
||||
MergeTreeMutationEntry(const MergeTreeMutationEntry &) = delete;
|
||||
MergeTreeMutationEntry(MergeTreeMutationEntry &&) = default;
|
||||
|
||||
/// Commit entry and rename it to a permanent file.
|
||||
void commit(Int64 block_number_);
|
||||
void commit(UInt64 block_number_);
|
||||
|
||||
void removeFile();
|
||||
|
||||
static String versionToFileName(UInt64 block_number_);
|
||||
static UInt64 tryParseFileName(const String & file_name_);
|
||||
static UInt64 parseFileName(const String & file_name_);
|
||||
|
||||
/// Load an existing entry.
|
||||
MergeTreeMutationEntry(DiskPtr disk_, const String & path_prefix_, const String & file_name_);
|
||||
|
||||
|
@ -412,8 +412,9 @@ Int64 StorageMergeTree::startMutation(const MutationCommands & commands, String
|
||||
version = increment.get();
|
||||
entry.commit(version);
|
||||
mutation_file_name = entry.file_name;
|
||||
auto insertion = current_mutations_by_id.emplace(mutation_file_name, std::move(entry));
|
||||
current_mutations_by_version.emplace(version, insertion.first->second);
|
||||
bool inserted = current_mutations_by_version.try_emplace(version, std::move(entry)).second;
|
||||
if (!inserted)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Mutation {} already exists, it's a bug", version);
|
||||
|
||||
LOG_INFO(log, "Added mutation: {}", mutation_file_name);
|
||||
}
|
||||
@ -618,16 +619,18 @@ std::vector<MergeTreeMutationStatus> StorageMergeTree::getMutationsStatus() cons
|
||||
CancellationCode StorageMergeTree::killMutation(const String & mutation_id)
|
||||
{
|
||||
LOG_TRACE(log, "Killing mutation {}", mutation_id);
|
||||
UInt64 mutation_version = MergeTreeMutationEntry::tryParseFileName(mutation_id);
|
||||
if (!mutation_version)
|
||||
return CancellationCode::NotFound;
|
||||
|
||||
std::optional<MergeTreeMutationEntry> to_kill;
|
||||
{
|
||||
std::lock_guard lock(currently_processing_in_background_mutex);
|
||||
auto it = current_mutations_by_id.find(mutation_id);
|
||||
if (it != current_mutations_by_id.end())
|
||||
auto it = current_mutations_by_version.find(mutation_version);
|
||||
if (it != current_mutations_by_version.end())
|
||||
{
|
||||
to_kill.emplace(std::move(it->second));
|
||||
current_mutations_by_id.erase(it);
|
||||
current_mutations_by_version.erase(to_kill->block_number);
|
||||
current_mutations_by_version.erase(it);
|
||||
}
|
||||
}
|
||||
|
||||
@ -668,10 +671,11 @@ void StorageMergeTree::loadMutations()
|
||||
if (startsWith(it->name(), "mutation_"))
|
||||
{
|
||||
MergeTreeMutationEntry entry(disk, path, it->name());
|
||||
Int64 block_number = entry.block_number;
|
||||
UInt64 block_number = entry.block_number;
|
||||
LOG_DEBUG(log, "Loading mutation: {} entry, commands size: {}", it->name(), entry.commands.size());
|
||||
auto insertion = current_mutations_by_id.emplace(it->name(), std::move(entry));
|
||||
current_mutations_by_version.emplace(block_number, insertion.first->second);
|
||||
auto inserted = current_mutations_by_version.try_emplace(block_number, std::move(entry)).second;
|
||||
if (!inserted)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Mutation {} already exists, it's a bug", block_number);
|
||||
}
|
||||
else if (startsWith(it->name(), "tmp_mutation_"))
|
||||
{
|
||||
@ -1111,7 +1115,6 @@ size_t StorageMergeTree::clearOldMutations(bool truncate)
|
||||
for (size_t i = 0; i < to_delete_count; ++i)
|
||||
{
|
||||
mutations_to_delete.push_back(std::move(it->second));
|
||||
current_mutations_by_id.erase(mutations_to_delete.back().file_name);
|
||||
it = current_mutations_by_version.erase(it);
|
||||
}
|
||||
}
|
||||
|
@ -131,9 +131,7 @@ private:
|
||||
/// This set have to be used with `currently_processing_in_background_mutex`.
|
||||
DataParts currently_merging_mutating_parts;
|
||||
|
||||
|
||||
std::map<String, MergeTreeMutationEntry> current_mutations_by_id;
|
||||
std::multimap<Int64, MergeTreeMutationEntry &> current_mutations_by_version;
|
||||
std::map<UInt64, MergeTreeMutationEntry> current_mutations_by_version;
|
||||
|
||||
std::atomic<bool> shutdown_called {false};
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user