execute mutation merge entries

This commit is contained in:
Alexey Zatelepin 2018-04-20 22:11:20 +03:00
parent 0b784a197e
commit 8d37c71f41
5 changed files with 165 additions and 20 deletions

View File

@ -100,6 +100,9 @@ MergeTreeData::MutableDataPartPtr MergeTreeMutation::executeOnPart(const MergeTr
in->readSuffix();
out.writeSuffixAndFinalizePart(new_data_part);
if (!new_data_part->rows_count)
return nullptr;
return new_data_part;
}

View File

@ -976,6 +976,27 @@ bool ReplicatedMergeTreeQueue::canMutatePart(const MergeTreePartInfo & part_info
return true;
}
MutationCommands ReplicatedMergeTreeQueue::getMutationCommands(
const MergeTreePartInfo & part_info, Int64 desired_mutation_version) const
{
std::lock_guard lock(mutex);
Int64 data_version = part_info.version ? part_info.version : part_info.min_block;
auto begin = mutations_by_block_number.upper_bound(data_version);
auto end = mutations_by_block_number.find(desired_mutation_version);
if (end == mutations_by_block_number.end())
throw Exception("Mutation with version " + toString(desired_mutation_version) + " not found",
ErrorCodes::LOGICAL_ERROR);
++end;
std::vector<MutationCommand> commands;
for (auto it = begin; it != end; ++it)
commands.insert(commands.end(), it->second->commands.commands.begin(), it->second->commands.commands.end());
return MutationCommands{commands};
}
void ReplicatedMergeTreeQueue::disableMergesInRange(const String & part_name)
{
virtual_parts.add(part_name);

View File

@ -217,6 +217,8 @@ public:
bool canMutatePart(const MergeTreePartInfo & part_info, Int64 & desired_mutation_version) const;
MutationCommands getMutationCommands(const MergeTreePartInfo & part_info, Int64 desired_mutation_version) const;
/// Prohibit merges in the specified range.
void disableMergesInRange(const String & part_name);

View File

@ -11,6 +11,7 @@
#include <Storages/MergeTree/ReplicatedMergeTreeMutationEntry.h>
#include <Storages/MergeTree/MergeList.h>
#include <Storages/MergeTree/ReplicatedMergeTreeAddress.h>
#include <Storages/MergeTree/MergeTreeMutation.h>
#include <Databases/IDatabase.h>
@ -1042,7 +1043,8 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry)
}
if (entry.type == LogEntry::GET_PART ||
entry.type == LogEntry::MERGE_PARTS)
entry.type == LogEntry::MERGE_PARTS ||
entry.type == LogEntry::MUTATE_PART)
{
/// If we already have this part or a part covering it, we do not need to do anything.
/// The part may be still in the PreCommitted -> Committed transition so we first search
@ -1079,7 +1081,11 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry)
}
else if (entry.type == LogEntry::MERGE_PARTS)
{
tryExecuteMerge(entry, do_fetch);
do_fetch = !tryExecuteMerge(entry);
}
else if (entry.type == LogEntry::MUTATE_PART)
{
do_fetch = !tryExecutePartMutation(entry);
}
else
{
@ -1093,12 +1099,8 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry)
}
void StorageReplicatedMergeTree::tryExecuteMerge(const StorageReplicatedMergeTree::LogEntry & entry, bool & do_fetch)
bool StorageReplicatedMergeTree::tryExecuteMerge(const StorageReplicatedMergeTree::LogEntry & entry)
{
/// The caller has already decided to make the fetch
if (do_fetch)
return;
// Log source part names just in case
{
std::stringstream log_message;
@ -1133,8 +1135,8 @@ void StorageReplicatedMergeTree::tryExecuteMerge(const StorageReplicatedMergeTre
if (!have_all_parts)
{
/// If you do not have all the necessary parts, try to take some already merged part from someone.
do_fetch = true;
LOG_DEBUG(log, "Don't have all parts for merge " << entry.new_part_name << "; will try to fetch it instead");
return false;
}
else if (entry.create_time + data.settings.prefer_fetch_merged_part_time_threshold.totalSeconds() <= time(nullptr))
{
@ -1150,15 +1152,12 @@ void StorageReplicatedMergeTree::tryExecuteMerge(const StorageReplicatedMergeTre
String replica = findReplicaHavingPart(entry.new_part_name, true); /// NOTE excessive ZK requests for same data later, may remove.
if (!replica.empty())
{
do_fetch = true;
LOG_DEBUG(log, "Prefer to fetch " << entry.new_part_name << " from replica " << replica);
return false;
}
}
}
if (do_fetch)
return;
/// Start to make the main work
size_t estimated_space_for_merge = MergeTreeDataMerger::estimateDiskSpaceForMerge(parts);
@ -1169,7 +1168,6 @@ void StorageReplicatedMergeTree::tryExecuteMerge(const StorageReplicatedMergeTre
auto table_lock = lockStructure(false, __PRETTY_FUNCTION__);
MergeList::EntryPtr merge_entry = context.getMergeList().insert(database_name, table_name, entry.new_part_name, parts);
MergeTreeData::Transaction transaction;
size_t aio_threshold = context.getSettings().min_bytes_to_use_direct_io;
MergeTreeDataMerger::FuturePart future_merged_part(parts);
@ -1179,11 +1177,11 @@ void StorageReplicatedMergeTree::tryExecuteMerge(const StorageReplicatedMergeTre
+ entry.new_part_name + "`", ErrorCodes::BAD_DATA_PART_NAME);
}
/// Logging
Stopwatch stopwatch;
ExecutionStatus execution_status;
MergeTreeData::Transaction transaction;
MergeTreeData::MutableDataPartPtr part;
/// Logging
Stopwatch stopwatch;
auto write_part_log = [&] (const ExecutionStatus & execution_status)
{
try
@ -1242,7 +1240,6 @@ void StorageReplicatedMergeTree::tryExecuteMerge(const StorageReplicatedMergeTre
{
if (MergeTreeDataPartChecksums::isBadChecksumsErrorCode(e.code()))
{
do_fetch = true;
transaction.rollback();
ProfileEvents::increment(ProfileEvents::DataAfterMergeDiffersFromReplica);
@ -1261,13 +1258,14 @@ void StorageReplicatedMergeTree::tryExecuteMerge(const StorageReplicatedMergeTre
"We will download merged part from replica to force byte-identical result.");
write_part_log(ExecutionStatus::fromCurrentException());
return;
return false;
}
throw;
}
/** Removing old chunks from ZK and from the disk is delayed - see ReplicatedMergeTreeCleanupThread, clearOldParts.
/** Removing old parts from ZK and from the disk is delayed - see ReplicatedMergeTreeCleanupThread, clearOldParts.
*/
/** With `ZSESSIONEXPIRED` or `ZOPERATIONTIMEOUT`, we can inadvertently roll back local changes to the parts.
@ -1277,6 +1275,125 @@ void StorageReplicatedMergeTree::tryExecuteMerge(const StorageReplicatedMergeTre
ProfileEvents::increment(ProfileEvents::ReplicatedPartMerges);
write_part_log({});
return true;
}
catch (...)
{
write_part_log(ExecutionStatus::fromCurrentException());
throw;
}
}
bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedMergeTree::LogEntry & entry)
{
const String & source_part_name = entry.parts_to_merge.at(0);
LOG_TRACE(log, "Executing log entry to mutate part " << source_part_name << " to " << entry.new_part_name);
MergeTreeData::DataPartPtr source_part = data.getActiveContainingPart(source_part_name);
if (!source_part)
{
LOG_DEBUG(log, "Source part for " << entry.new_part_name << " is not ready; will try to fetch it instead");
return false;
}
if (source_part->name != source_part_name)
{
throw Exception("Part " + source_part_name + " is covered by " + source_part->name
+ " but should be mutated to " + entry.new_part_name + ". This is a bug.",
ErrorCodes::LOGICAL_ERROR);
}
/// TODO - some better heuristic?
size_t estimated_space_for_result = source_part->bytes_on_disk;
if (entry.create_time + data.settings.prefer_fetch_merged_part_time_threshold.totalSeconds() <= time(nullptr)
&& estimated_space_for_result >= data.settings.prefer_fetch_merged_part_size_threshold)
{
/// If entry is old enough, and have enough size, and some replica has the desired part,
/// then prefer fetching from replica.
String replica = findReplicaHavingPart(entry.new_part_name, true); /// NOTE excessive ZK requests for same data later, may remove.
if (!replica.empty())
{
LOG_DEBUG(log, "Prefer to fetch " << entry.new_part_name << " from replica " << replica);
return false;
}
}
MergeTreePartInfo new_part_info = MergeTreePartInfo::fromPartName(
entry.new_part_name, data.format_version);
MutationCommands commands = queue.getMutationCommands(source_part->info, new_part_info.version);
/// Can throw an exception.
DiskSpaceMonitor::ReservationPtr reserved_space = DiskSpaceMonitor::reserve(full_path, estimated_space_for_result);
auto table_lock = lockStructure(false, __PRETTY_FUNCTION__);
MergeTreeData::MutableDataPartPtr new_part;
MergeTreeData::Transaction transaction;
/// Logging
Stopwatch stopwatch;
auto write_part_log = [&] (const ExecutionStatus & /* execution_status */)
{
try
{
/// TODO
return;
}
catch (...)
{
tryLogCurrentException(log, __PRETTY_FUNCTION__);
}
};
try
{
MergeTreeMutation mutation_executor(data, new_part_info.version, commands.commands);
new_part = mutation_executor.executeOnPart(source_part, context);
if (new_part)
data.renameTempPartAndReplace(new_part, nullptr, &transaction);
else
throw Exception("The part was fully deleted, this case is not implemented yet",
ErrorCodes::NOT_IMPLEMENTED); /// TODO
try
{
checkPartChecksumsAndCommit(transaction, new_part);
}
catch (const Exception & e)
{
if (MergeTreeDataPartChecksums::isBadChecksumsErrorCode(e.code()))
{
transaction.rollback();
ProfileEvents::increment(ProfileEvents::DataAfterMergeDiffersFromReplica);
LOG_ERROR(log, getCurrentExceptionMessage(false) << ". "
"Data after mutation is not byte-identical to data on another replicas. "
"We will download merged part from replica to force byte-identical result.");
write_part_log(ExecutionStatus::fromCurrentException());
return false;
}
throw;
}
/// TODO immediately delete the old part so that it doesn't waste space.
/** With `ZSESSIONEXPIRED` or `ZOPERATIONTIMEOUT`, we can inadvertently roll back local changes to the parts.
* This is not a problem, because in this case the entry will remain in the queue, and we will try again.
*/
merge_selecting_event.set();
/// TODO metrics (ProfileEvents::increment(ProfileEvents::ReplicatedPartMerges);)
write_part_log({});
return true;
}
catch (...)
{

View File

@ -359,7 +359,9 @@ private:
void executeDropRange(const LogEntry & entry);
/// Do the merge or recommend to make the fetch instead of the merge
void tryExecuteMerge(const LogEntry & entry, bool & do_fetch);
bool tryExecuteMerge(const LogEntry & entry);
bool tryExecutePartMutation(const LogEntry & entry);
bool executeFetch(const LogEntry & entry);