More comments, better code

This commit is contained in:
alesapin 2020-04-03 14:09:27 +03:00
parent 20d40b8bce
commit 9f19a25fb8
4 changed files with 36 additions and 28 deletions

View File

@ -1,6 +1,5 @@
#include <Storages/ColumnsDescription.h> #include <Storages/ColumnsDescription.h>
#include <boost/algorithm/string/replace.hpp>
#include <Parsers/ASTLiteral.h> #include <Parsers/ASTLiteral.h>
#include <Parsers/ExpressionElementParsers.h> #include <Parsers/ExpressionElementParsers.h>
#include <Parsers/ExpressionListParsers.h> #include <Parsers/ExpressionListParsers.h>

View File

@ -78,6 +78,7 @@ protected:
MarkRanges all_mark_ranges; MarkRanges all_mark_ranges;
friend class MergeTreeRangeReader::DelayedStream; friend class MergeTreeRangeReader::DelayedStream;
/// Alter conversions, which must be applied on fly if required
MergeTreeData::AlterConversions alter_conversions; MergeTreeData::AlterConversions alter_conversions;
}; };

View File

@ -241,6 +241,11 @@ void StorageMergeTree::alter(
DatabaseCatalog::instance().getDatabase(table_id.database_name)->alterTable(context, table_id.table_name, metadata); DatabaseCatalog::instance().getDatabase(table_id.database_name)->alterTable(context, table_id.table_name, metadata);
String mutation_file_name;
Int64 mutation_version = -1;
if (!maybe_mutation_commands.empty())
mutation_version = startMutation(maybe_mutation_commands, mutation_file_name);
/// We release all locks except alter_intention_lock which allows /// We release all locks except alter_intention_lock which allows
/// to execute alter queries sequentially /// to execute alter queries sequentially
table_lock_holder.releaseAllExceptAlterIntention(); table_lock_holder.releaseAllExceptAlterIntention();
@ -248,7 +253,7 @@ void StorageMergeTree::alter(
/// Always execute required mutations synchronously, because alters /// Always execute required mutations synchronously, because alters
/// should be executed in sequential order. /// should be executed in sequential order.
if (!maybe_mutation_commands.empty()) if (!maybe_mutation_commands.empty())
mutateImpl(maybe_mutation_commands, /* mutations_sync = */ 1); waitForMutation(mutation_version, mutation_file_name);
} }
} }
@ -351,43 +356,42 @@ public:
}; };
void StorageMergeTree::mutateImpl(const MutationCommands & commands, size_t mutations_sync) Int64 StorageMergeTree::startMutation(const MutationCommands & commands, String & mutation_file_name)
{ {
/// Choose any disk, because when we load mutations we search them at each disk /// Choose any disk, because when we load mutations we search them at each disk
/// where storage can be placed. See loadMutations(). /// where storage can be placed. See loadMutations().
auto disk = getStoragePolicy()->getAnyDisk(); auto disk = getStoragePolicy()->getAnyDisk();
String file_name;
Int64 version; Int64 version;
{
std::lock_guard lock(currently_processing_in_background_mutex); std::lock_guard lock(currently_processing_in_background_mutex);
MergeTreeMutationEntry entry(commands, disk, relative_data_path, insert_increment.get()); MergeTreeMutationEntry entry(commands, disk, relative_data_path, insert_increment.get());
version = increment.get(); version = increment.get();
entry.commit(version); entry.commit(version);
file_name = entry.file_name; mutation_file_name = entry.file_name;
auto insertion = current_mutations_by_id.emplace(file_name, std::move(entry)); auto insertion = current_mutations_by_id.emplace(mutation_file_name, std::move(entry));
current_mutations_by_version.emplace(version, insertion.first->second); current_mutations_by_version.emplace(version, insertion.first->second);
LOG_INFO(log, "Added mutation: " << file_name); LOG_INFO(log, "Added mutation: " << mutation_file_name);
merging_mutating_task_handle->wake(); merging_mutating_task_handle->wake();
} return version;
}
/// We have to wait mutation end void StorageMergeTree::waitForMutation(Int64 version, const String & file_name)
if (mutations_sync > 0) {
{
LOG_INFO(log, "Waiting mutation: " << file_name); LOG_INFO(log, "Waiting mutation: " << file_name);
auto check = [version, this]() { return shutdown_called || isMutationDone(version); }; auto check = [version, this]() { return shutdown_called || isMutationDone(version); };
std::unique_lock lock(mutation_wait_mutex); std::unique_lock lock(mutation_wait_mutex);
mutation_wait_event.wait(lock, check); mutation_wait_event.wait(lock, check);
LOG_INFO(log, "Mutation " << file_name << " done"); LOG_INFO(log, "Mutation " << file_name << " done");
}
} }
void StorageMergeTree::mutate(const MutationCommands & commands, const Context & query_context) void StorageMergeTree::mutate(const MutationCommands & commands, const Context & query_context)
{ {
mutateImpl(commands, query_context.getSettingsRef().mutations_sync); String mutation_file_name;
Int64 version = startMutation(commands, mutation_file_name);
if (query_context.getSettingsRef().mutations_sync > 0)
waitForMutation(version, mutation_file_name);
} }
namespace namespace

View File

@ -120,7 +120,11 @@ private:
BackgroundProcessingPoolTaskResult movePartsTask(); BackgroundProcessingPoolTaskResult movePartsTask();
void mutateImpl(const MutationCommands & commands, size_t mutations_sync); /// Allocate block number for new mutation, write mutation to disk
/// and into in-memory structures. Wake up merge-mutation task.
Int64 startMutation(const MutationCommands & commands, String & mutation_file_name);
/// Wait until mutation with version will finish mutation for all parts
void waitForMutation(Int64 version, const String & filename);
/// Try and find a single part to mutate and mutate it. If some part was successfully mutated, return true. /// Try and find a single part to mutate and mutate it. If some part was successfully mutated, return true.
bool tryMutatePart(); bool tryMutatePart();