execute part mutations in a background pool [#CLICKHOUSE-3748]

This commit is contained in:
Alexey Zatelepin 2018-07-09 18:34:11 +03:00 committed by alexey-milovidov
parent 88300258a7
commit 78b83d78f0
4 changed files with 134 additions and 89 deletions

View File

@ -27,7 +27,7 @@ void MergeTreeBlockOutputStream::write(const Block & block)
PartLog::addNewPart(storage.context, part, watch.elapsed());
/// Initiate async merge - it will be done if it's good time for merge and if there are space in 'background_pool'.
storage.merge_task_handle->wake();
storage.background_task_handle->wake();
}
}

View File

@ -78,7 +78,7 @@ StorageMergeTree::StorageMergeTree(
void StorageMergeTree::startup()
{
merge_task_handle = background_pool.addTask([this] { return mergeTask(); });
background_task_handle = background_pool.addTask([this] { return backgroundTask(); });
data.clearOldPartsFromFilesystem();
@ -95,8 +95,8 @@ void StorageMergeTree::shutdown()
return;
shutdown_called = true;
merger_mutator.actions_blocker.cancelForever();
if (merge_task_handle)
background_pool.removeTask(merge_task_handle);
if (background_task_handle)
background_pool.removeTask(background_task_handle);
}
@ -285,89 +285,22 @@ struct CurrentlyMergingPartsTagger
};
void StorageMergeTree::mutate(const MutationCommands & commands, const Context & context)
void StorageMergeTree::mutate(const MutationCommands & commands, const Context &)
{
Int64 version;
decltype(current_mutations_by_version.end()) mutation_it;
{
std::lock_guard lock(currently_merging_mutex);
version = increment.get();
Int64 version = increment.get();
MergeTreeMutationEntry entry;
entry.create_time = time(nullptr);
entry.block_number = version;
entry.commands = commands;
mutation_it = current_mutations_by_version.emplace(version, std::move(entry));
current_mutations_by_version.emplace(version, std::move(entry));
}
size_t parts_mutated = 0;
while (true)
{
std::optional<CurrentlyMergingPartsTagger> tagger;
MergeTreeDataMergerMutator::FuturePart future_mutated_part;
bool some_locked = false;
{
std::lock_guard lock(currently_merging_mutex);
Int64 prev_version = 0;
if (mutation_it != current_mutations_by_version.begin())
prev_version = std::prev(mutation_it)->first;
for (const auto & part : data.getDataPartsVector())
{
Int64 part_mutation_version = getCurrentMutationVersion(part, lock);
if (part_mutation_version >= version)
continue;
if (part_mutation_version < prev_version)
{
LOG_TRACE(log,
"Part " << part->name << " has mutation version " << part_mutation_version
<< ", will wait until it has version " << prev_version);
some_locked = true;
continue;
}
if (currently_merging.count(part))
{
LOG_TRACE(log, "Part " << part->name << " is currently locked, will wait.");
some_locked = true;
continue;
}
auto new_part_info = part->info;
new_part_info.mutation = version;
future_mutated_part.parts.push_back(part);
future_mutated_part.part_info = new_part_info;
future_mutated_part.name = part->getNewName(new_part_info);
tagger.emplace({part}, part->bytes_on_disk, *this);
break;
}
}
if (!future_mutated_part.parts.empty())
{
auto new_part = merger_mutator.mutatePartToTemporaryPart(
future_mutated_part, mutation_it->second.commands, context);
data.renameTempPartAndReplace(new_part);
++parts_mutated;
}
else if (some_locked)
sleep(1);
else
break;
}
{
std::lock_guard lock(currently_merging_mutex);
current_mutations_by_version.erase(mutation_it);
}
LOG_TRACE(log, "Finished, mutated " << parts_mutated << " parts.");
background_task_handle->wake();
}
@ -420,17 +353,8 @@ bool StorageMergeTree::merge(
bool deduplicate,
String * out_disable_reason)
{
/// Clear old parts. It does not matter to do it more frequently than each second.
if (auto lock = time_after_previous_cleanup.compareAndRestartDeferred(1))
{
data.clearOldPartsFromFilesystem();
data.clearOldTemporaryDirectories();
}
auto structure_lock = lockStructure(true, __PRETTY_FUNCTION__);
size_t disk_space = DiskSpaceMonitor::getUnreservedFreeSpace(full_path);
MergeTreeDataMergerMutator::FuturePart future_part;
/// You must call destructor with unlocked `currently_merging_mutex`.
@ -455,6 +379,7 @@ bool StorageMergeTree::merge(
}
else
{
size_t disk_space = DiskSpaceMonitor::getUnreservedFreeSpace(full_path);
selected = merger_mutator.selectAllPartsToMergeWithinPartition(future_part, disk_space, can_merge, partition_id, final, out_disable_reason);
}
@ -531,7 +456,114 @@ bool StorageMergeTree::merge(
}
bool StorageMergeTree::mergeTask()
bool StorageMergeTree::tryMutatePart()
{
auto structure_lock = lockStructure(true, __PRETTY_FUNCTION__);
MergeTreeDataMergerMutator::FuturePart future_part;
MutationCommands commands;
/// You must call destructor with unlocked `currently_merging_mutex`.
std::optional<CurrentlyMergingPartsTagger> tagger;
{
auto disk_space = DiskSpaceMonitor::getUnreservedFreeSpace(full_path);
std::lock_guard<std::mutex> lock(currently_merging_mutex);
if (current_mutations_by_version.empty())
return false;
auto mutations_end_it = current_mutations_by_version.end();
for (const auto & part : data.getDataPartsVector())
{
if (currently_merging.count(part))
continue;
auto mutations_begin_it = current_mutations_by_version.upper_bound(part->info.getDataVersion());
if (mutations_begin_it == mutations_end_it)
continue;
auto estimated_needed_space = MergeTreeDataMergerMutator::estimateNeededDiskSpace({part});
if (estimated_needed_space > disk_space)
continue;
for (auto it = mutations_begin_it; it != mutations_end_it; ++it)
commands.insert(commands.end(), it->second.commands.begin(), it->second.commands.end());
auto new_part_info = part->info;
new_part_info.mutation = current_mutations_by_version.rbegin()->first;
future_part.parts.push_back(part);
future_part.part_info = new_part_info;
future_part.name = part->getNewName(new_part_info);
tagger.emplace({part}, estimated_needed_space, *this);
break;
}
}
if (!tagger)
return false;
Stopwatch stopwatch;
MergeTreeData::MutableDataPartPtr new_part;
auto write_part_log = [&] (const ExecutionStatus & execution_status)
{
try
{
auto part_log = context.getPartLog(database_name);
if (!part_log)
return;
PartLogElement part_log_elem;
part_log_elem.event_type = PartLogElement::MUTATE_PART;
part_log_elem.error = static_cast<UInt16>(execution_status.code);
part_log_elem.exception = execution_status.message;
part_log_elem.event_time = time(nullptr);
part_log_elem.duration_ms = stopwatch.elapsed() / 1000000;
part_log_elem.database_name = database_name;
part_log_elem.table_name = table_name;
part_log_elem.part_name = future_part.name;
if (new_part)
{
part_log_elem.bytes_compressed_on_disk = new_part->bytes_on_disk;
part_log_elem.rows = new_part->rows_count;
}
part_log_elem.source_part_names.reserve(future_part.parts.size());
for (const auto & source_part : future_part.parts)
part_log_elem.source_part_names.push_back(source_part->name);
part_log->add(part_log_elem);
}
catch (...)
{
tryLogCurrentException(log, __PRETTY_FUNCTION__);
}
};
try
{
new_part = merger_mutator.mutatePartToTemporaryPart(future_part, commands, context);
data.renameTempPartAndReplace(new_part);
write_part_log({});
}
catch (...)
{
write_part_log(ExecutionStatus::fromCurrentException());
throw;
}
return true;
}
bool StorageMergeTree::backgroundTask()
{
if (shutdown_called)
return false;
@ -541,8 +573,19 @@ bool StorageMergeTree::mergeTask()
try
{
/// Clear old parts. It is unnecessary to do it more than once a second.
if (auto lock = time_after_previous_cleanup.compareAndRestartDeferred(1))
{
data.clearOldPartsFromFilesystem();
data.clearOldTemporaryDirectories();
}
size_t aio_threshold = context.getSettings().min_bytes_to_use_direct_io;
return merge(aio_threshold, false /*aggressive*/, {} /*partition_id*/, false /*final*/, false /*deduplicate*/); ///TODO: read deduplicate option from table config
///TODO: read deduplicate option from table config
if (merge(aio_threshold, false /*aggressive*/, {} /*partition_id*/, false /*final*/, false /*deduplicate*/))
return true;
return tryMutatePart();
}
catch (Exception & e)
{

View File

@ -120,7 +120,7 @@ private:
std::atomic<bool> shutdown_called {false};
BackgroundProcessingPool::TaskHandle merge_task_handle;
BackgroundProcessingPool::TaskHandle background_task_handle;
/** Determines what parts should be merged and merges it.
* If aggressive - when selects parts don't takes into account their ratio size and novelty (used for OPTIMIZE query).
@ -129,7 +129,10 @@ private:
bool merge(size_t aio_threshold, bool aggressive, const String & partition_id, bool final, bool deduplicate,
String * out_disable_reason = nullptr);
bool mergeTask();
/// Try and find a single part to mutate and mutate it. If some part was successfully mutated, return true.
bool tryMutatePart();
bool backgroundTask();
Int64 getCurrentMutationVersion(
const MergeTreeData::DataPartPtr & part,

View File

@ -1155,7 +1155,6 @@ void StorageReplicatedMergeTree::writePartLog(
if (merge_entry)
{
part_log_elem.rows_read = (*merge_entry)->bytes_read_uncompressed;
part_log_elem.bytes_read_uncompressed = (*merge_entry)->bytes_read_uncompressed;