execute mutations synchronously for StorageMergeTree [#CLICKHOUSE-3748]

This commit is contained in:
Alexey Zatelepin 2018-03-30 22:25:37 +03:00 committed by alexey-milovidov
parent ad3d882b67
commit ce17868a5c
5 changed files with 150 additions and 19 deletions

View File

@ -425,7 +425,10 @@ Int64 MergeTreeData::getMaxBlockNumber()
Int64 max_block_num = 0;
for (const DataPartPtr & part : data_parts_by_info)
{
max_block_num = std::max(max_block_num, part->info.max_block);
max_block_num = std::max(max_block_num, part->info.mutation);
}
return max_block_num;
}

View File

@ -0,0 +1,19 @@
#pragma once
#include <Core/Types.h>
#include <Storages/MutationCommands.h>
namespace DB
{
/// A mutation entry for non-replicated MergeTree storage engines.
/// Every field has a well-defined value after default construction.
struct MergeTreeMutationEntry
{
    /// Creation time of the entry; stays 0 until the creator fills it in.
    time_t create_time = 0;

    /// Version number assigned to the mutation.
    /// Zero-initialized so that a default-constructed entry never holds an indeterminate value.
    Int64 block_number = 0;

    /// Commands that make up the mutation.
    MutationCommands commands;
};
}

View File

@ -61,7 +61,7 @@ StorageMergeTree::StorageMergeTree(
context_, primary_expr_ast_, secondary_sorting_expr_list_, date_column_name, partition_expr_ast_,
sampling_expression_, merging_params_,
settings_, false, attach),
reader(data), writer(data), merger(data, context.getBackgroundPool()),
reader(data), writer(data), merger_mutator(data, context.getBackgroundPool()),
log(&Logger::get(database_name_ + "." + table_name + " (StorageMergeTree)"))
{
if (path_.empty())
@ -94,7 +94,7 @@ void StorageMergeTree::shutdown()
if (shutdown_called)
return;
shutdown_called = true;
merger.actions_blocker.cancelForever();
merger_mutator.actions_blocker.cancelForever();
if (merge_task_handle)
background_pool.removeTask(merge_task_handle);
}
@ -139,7 +139,7 @@ void StorageMergeTree::truncate(const ASTPtr &)
{
/// Asks to complete merges and does not allow them to start.
/// This protects against "revival" of data for a removed partition after completion of merge.
auto merge_blocker = merger.actions_blocker.cancel();
auto merge_blocker = merger_mutator.actions_blocker.cancel();
/// NOTE: It's assumed that this method is called under lockForAlter.
@ -173,7 +173,7 @@ void StorageMergeTree::alter(
const Context & context)
{
/// NOTE: Here, as in ReplicatedMergeTree, you can do ALTER which does not block the writing of data for a long time.
auto merge_blocker = merger.actions_blocker.cancel();
auto merge_blocker = merger_mutator.actions_blocker.cancel();
auto table_soft_lock = lockDataForAlter(__PRETTY_FUNCTION__);
@ -284,6 +284,94 @@ struct CurrentlyMergingPartsTagger
}
};
void StorageMergeTree::mutate(const MutationCommands & commands, const Context & context)
{
Int64 version;
decltype(current_mutations_by_version.end()) mutation_it;
{
std::lock_guard lock(currently_merging_mutex);
version = increment.get();
MergeTreeMutationEntry entry;
entry.create_time = time(nullptr);
entry.block_number = version;
entry.commands = commands;
mutation_it = current_mutations_by_version.emplace(version, std::move(entry));
}
size_t parts_mutated = 0;
while (true)
{
std::optional<CurrentlyMergingPartsTagger> tagger;
MergeTreeDataMergerMutator::FuturePart future_mutated_part;
bool some_locked = false;
{
std::lock_guard lock(currently_merging_mutex);
Int64 prev_version = 0;
if (mutation_it != current_mutations_by_version.begin())
prev_version = std::prev(mutation_it)->first;
for (const auto & part : data.getDataPartsVector())
{
Int64 part_mutation_version = getCurrentMutationVersion(part, lock);
if (part_mutation_version >= version)
continue;
if (part_mutation_version < prev_version)
{
LOG_TRACE(log,
"Part " << part->name << " has mutation version " << part_mutation_version
<< ", will wait until it has version " << prev_version);
some_locked = true;
continue;
}
if (currently_merging.count(part))
{
LOG_TRACE(log, "Part " << part->name << " is currently locked, will wait.");
some_locked = true;
continue;
}
auto new_part_info = part->info;
new_part_info.mutation = version;
future_mutated_part.parts.push_back(part);
future_mutated_part.part_info = new_part_info;
future_mutated_part.name = part->getNewName(new_part_info);
tagger.emplace({part}, part->bytes_on_disk, *this);
break;
}
}
if (!future_mutated_part.parts.empty())
{
auto new_part = merger_mutator.mutatePartToTemporaryPart(
future_mutated_part, mutation_it->second.commands, context);
data.renameTempPartAndReplace(new_part);
++parts_mutated;
}
else if (some_locked)
sleep(1);
else
break;
}
{
std::lock_guard lock(currently_merging_mutex);
current_mutations_by_version.erase(mutation_it);
}
LOG_TRACE(log, "Finished, mutated " << parts_mutated << " parts.");
}
bool StorageMergeTree::merge(
size_t aio_threshold,
bool aggressive,
@ -311,22 +399,23 @@ bool StorageMergeTree::merge(
{
std::lock_guard<std::mutex> lock(currently_merging_mutex);
auto can_merge = [this] (const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right, String *)
auto can_merge = [this, &lock] (const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right, String *)
{
return !currently_merging.count(left) && !currently_merging.count(right);
return !currently_merging.count(left) && !currently_merging.count(right)
&& getCurrentMutationVersion(left, lock) == getCurrentMutationVersion(right, lock);
};
bool selected = false;
if (partition_id.empty())
{
size_t max_source_parts_size = merger.getMaxSourcePartsSize();
size_t max_source_parts_size = merger_mutator.getMaxSourcePartsSize();
if (max_source_parts_size > 0)
selected = merger.selectPartsToMerge(future_part, aggressive, max_source_parts_size, can_merge, out_disable_reason);
selected = merger_mutator.selectPartsToMerge(future_part, aggressive, max_source_parts_size, can_merge, out_disable_reason);
}
else
{
selected = merger.selectAllPartsToMergeWithinPartition(future_part, disk_space, can_merge, partition_id, final, out_disable_reason);
selected = merger_mutator.selectAllPartsToMergeWithinPartition(future_part, disk_space, can_merge, partition_id, final, out_disable_reason);
}
if (!selected)
@ -385,9 +474,10 @@ bool StorageMergeTree::merge(
try
{
new_part = merger.mergePartsToTemporaryPart(future_part, *merge_entry, aio_threshold, time(nullptr),
merging_tagger->reserved_space.get(), deduplicate);
merger.renameMergedTemporaryPart(new_part, future_part.parts, nullptr);
new_part = merger_mutator.mergePartsToTemporaryPart(
future_part, *merge_entry, aio_threshold, time(nullptr),
merging_tagger->reserved_space.get(), deduplicate);
merger_mutator.renameMergedTemporaryPart(new_part, future_part.parts, nullptr);
write_part_log({});
}
@ -406,7 +496,7 @@ bool StorageMergeTree::mergeTask()
if (shutdown_called)
return false;
if (merger.actions_blocker.isCancelled())
if (merger_mutator.actions_blocker.isCancelled())
return false;
try
@ -426,12 +516,23 @@ bool StorageMergeTree::mergeTask()
}
}
/// Returns the version of the latest registered mutation whose version does not exceed
/// the data version of `part`, or 0 if there is no such mutation.
/// The lock argument is unused; it only makes the caller show that it holds a
/// std::lock_guard<std::mutex> (named currently_merging_mutex_lock in the signature).
Int64 StorageMergeTree::getCurrentMutationVersion(
    const MergeTreeData::DataPartPtr & part,
    std::lock_guard<std::mutex> & /* currently_merging_mutex_lock */) const
{
    /// First registered mutation with a version strictly greater than the part's data version.
    auto it = current_mutations_by_version.upper_bound(part->info.getDataVersion());

    /// Every registered mutation is newer than the part.
    if (it == current_mutations_by_version.begin())
        return 0;

    /// The element right before `it` is the newest mutation that is not newer than the part.
    return std::prev(it)->first;
}
void StorageMergeTree::clearColumnInPartition(const ASTPtr & partition, const Field & column_name, const Context & context)
{
/// Asks to complete merges and does not allow them to start.
/// This protects against "revival" of data for a removed partition after completion of merge.
auto merge_blocker = merger.actions_blocker.cancel();
auto merge_blocker = merger_mutator.actions_blocker.cancel();
/// We don't change table structure, only data in some parts, parts are locked inside alterDataPart() function
auto lock_read_structure = lockStructure(false, __PRETTY_FUNCTION__);
@ -516,7 +617,7 @@ void StorageMergeTree::dropPartition(const ASTPtr & /*query*/, const ASTPtr & pa
{
/// Asks to complete merges and does not allow them to start.
/// This protects against "revival" of data for a removed partition after completion of merge.
auto merge_blocker = merger.actions_blocker.cancel();
auto merge_blocker = merger_mutator.actions_blocker.cancel();
/// Waits for completion of merge and does not start new ones.
auto lock = lockForAlter(__PRETTY_FUNCTION__);
@ -672,7 +773,7 @@ void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, con
ActionLock StorageMergeTree::getActionLock(StorageActionBlockType action_type)
{
if (action_type == ActionLocks::PartsMerge)
return merger.actions_blocker.cancel();
return merger_mutator.actions_blocker.cancel();
return {};
}

View File

@ -7,6 +7,7 @@
#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
#include <Storages/MergeTree/MergeTreeDataWriter.h>
#include <Storages/MergeTree/MergeTreeDataMergerMutator.h>
#include <Storages/MergeTree/MergeTreeMutationEntry.h>
#include <Storages/MergeTree/DiskSpaceMonitor.h>
#include <Storages/MergeTree/BackgroundProcessingPool.h>
#include <Common/SimpleIncrement.h>
@ -70,6 +71,8 @@ public:
void replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, const Context & context) override;
void freezePartition(const ASTPtr & partition, const String & with_name, const Context & context) override;
void mutate(const MutationCommands & commands, const Context & context) override;
void drop() override;
void truncate(const ASTPtr &) override;
@ -98,7 +101,7 @@ private:
MergeTreeData data;
MergeTreeDataSelectExecutor reader;
MergeTreeDataWriter writer;
MergeTreeDataMergerMutator merger;
MergeTreeDataMergerMutator merger_mutator;
/// For block numbers.
SimpleIncrement increment{0};
@ -106,8 +109,9 @@ private:
/// For clearOldParts, clearOldTemporaryDirectories.
AtomicStopwatch time_after_previous_cleanup;
MergeTreeData::DataParts currently_merging;
std::mutex currently_merging_mutex;
MergeTreeData::DataParts currently_merging;
std::multimap<Int64, MergeTreeMutationEntry> current_mutations_by_version;
Logger * log;
@ -124,6 +128,10 @@ private:
bool mergeTask();
Int64 getCurrentMutationVersion(
const MergeTreeData::DataPartPtr & part,
std::lock_guard<std::mutex> & /* currently_merging_mutex_lock */) const;
friend class MergeTreeBlockOutputStream;
friend class MergeTreeData;
friend struct CurrentlyMergingPartsTagger;

View File

@ -4084,7 +4084,7 @@ void StorageReplicatedMergeTree::mutate(const MutationCommands & commands, const
const String & path_created =
static_cast<const zkutil::CreateResponse *>(responses[1].get())->path_created;
entry.znode_name = path_created.substr(path_created.find_last_of('/') + 1);
LOG_TRACE(log, "Created mutation with id " << entry.znode_name);
LOG_TRACE(log, "Created mutation with ID " << entry.znode_name);
break;
}
else if (rc == ZooKeeperImpl::ZooKeeper::ZBADVERSION)