From 9f19a25fb862ede0c22d73ad0c39dd52970e2c9a Mon Sep 17 00:00:00 2001 From: alesapin Date: Fri, 3 Apr 2020 14:09:27 +0300 Subject: [PATCH] More comments, better code --- dbms/Storages/ColumnsDescription.cpp | 1 - dbms/Storages/MergeTree/IMergeTreeReader.h | 1 + dbms/Storages/StorageMergeTree.cpp | 56 ++++++++++++---------- dbms/Storages/StorageMergeTree.h | 6 ++- 4 files changed, 36 insertions(+), 28 deletions(-) diff --git a/dbms/Storages/ColumnsDescription.cpp b/dbms/Storages/ColumnsDescription.cpp index f7d00ea2d54..76a55d059ee 100644 --- a/dbms/Storages/ColumnsDescription.cpp +++ b/dbms/Storages/ColumnsDescription.cpp @@ -1,6 +1,5 @@ #include -#include #include #include #include diff --git a/dbms/Storages/MergeTree/IMergeTreeReader.h b/dbms/Storages/MergeTree/IMergeTreeReader.h index 6f588276855..622e11dae8b 100644 --- a/dbms/Storages/MergeTree/IMergeTreeReader.h +++ b/dbms/Storages/MergeTree/IMergeTreeReader.h @@ -78,6 +78,7 @@ protected: MarkRanges all_mark_ranges; friend class MergeTreeRangeReader::DelayedStream; + /// Alter conversions, which must be applied on fly if required MergeTreeData::AlterConversions alter_conversions; }; diff --git a/dbms/Storages/StorageMergeTree.cpp b/dbms/Storages/StorageMergeTree.cpp index 5742b3a811b..ce65f5748cc 100644 --- a/dbms/Storages/StorageMergeTree.cpp +++ b/dbms/Storages/StorageMergeTree.cpp @@ -241,6 +241,11 @@ void StorageMergeTree::alter( DatabaseCatalog::instance().getDatabase(table_id.database_name)->alterTable(context, table_id.table_name, metadata); + String mutation_file_name; + Int64 mutation_version = -1; + if (!maybe_mutation_commands.empty()) + mutation_version = startMutation(maybe_mutation_commands, mutation_file_name); + /// We release all locks except alter_intention_lock which allows /// to execute alter queries sequentially table_lock_holder.releaseAllExceptAlterIntention(); @@ -248,7 +253,7 @@ void StorageMergeTree::alter( /// Always execute required mutations synchronously, because alters /// should be executed in sequential order. if (!maybe_mutation_commands.empty()) - mutateImpl(maybe_mutation_commands, /* mutations_sync = */ 1); + waitForMutation(mutation_version, mutation_file_name); } } @@ -351,43 +356,42 @@ public: }; -void StorageMergeTree::mutateImpl(const MutationCommands & commands, size_t mutations_sync) +Int64 StorageMergeTree::startMutation(const MutationCommands & commands, String & mutation_file_name) { /// Choose any disk, because when we load mutations we search them at each disk /// where storage can be placed. See loadMutations(). auto disk = getStoragePolicy()->getAnyDisk(); - String file_name; Int64 version; + std::lock_guard lock(currently_processing_in_background_mutex); - { - std::lock_guard lock(currently_processing_in_background_mutex); + MergeTreeMutationEntry entry(commands, disk, relative_data_path, insert_increment.get()); + version = increment.get(); + entry.commit(version); + mutation_file_name = entry.file_name; + auto insertion = current_mutations_by_id.emplace(mutation_file_name, std::move(entry)); + current_mutations_by_version.emplace(version, insertion.first->second); - MergeTreeMutationEntry entry(commands, disk, relative_data_path, insert_increment.get()); - version = increment.get(); - entry.commit(version); - file_name = entry.file_name; - auto insertion = current_mutations_by_id.emplace(file_name, std::move(entry)); - current_mutations_by_version.emplace(version, insertion.first->second); - - LOG_INFO(log, "Added mutation: " << file_name); - merging_mutating_task_handle->wake(); - } - - /// We have to wait mutation end - if (mutations_sync > 0) - { - LOG_INFO(log, "Waiting mutation: " << file_name); - auto check = [version, this]() { return shutdown_called || isMutationDone(version); }; - std::unique_lock lock(mutation_wait_mutex); - mutation_wait_event.wait(lock, check); - LOG_INFO(log, "Mutation " << file_name << " done"); - } + LOG_INFO(log, "Added mutation: " << mutation_file_name); + merging_mutating_task_handle->wake(); + return version; +} +void StorageMergeTree::waitForMutation(Int64 version, const String & file_name) +{ + LOG_INFO(log, "Waiting mutation: " << file_name); + auto check = [version, this]() { return shutdown_called || isMutationDone(version); }; + std::unique_lock lock(mutation_wait_mutex); + mutation_wait_event.wait(lock, check); + LOG_INFO(log, "Mutation " << file_name << " done"); } void StorageMergeTree::mutate(const MutationCommands & commands, const Context & query_context) { - mutateImpl(commands, query_context.getSettingsRef().mutations_sync); + String mutation_file_name; + Int64 version = startMutation(commands, mutation_file_name); + + if (query_context.getSettingsRef().mutations_sync > 0) + waitForMutation(version, mutation_file_name); } namespace diff --git a/dbms/Storages/StorageMergeTree.h b/dbms/Storages/StorageMergeTree.h index 44ebce03802..fc65692e726 100644 --- a/dbms/Storages/StorageMergeTree.h +++ b/dbms/Storages/StorageMergeTree.h @@ -120,7 +120,11 @@ private: BackgroundProcessingPoolTaskResult movePartsTask(); - void mutateImpl(const MutationCommands & commands, size_t mutations_sync); + /// Allocate block number for new mutation, write mutation to disk + /// and into in-memory structures. Wake up merge-mutation task. + Int64 startMutation(const MutationCommands & commands, String & mutation_file_name); + /// Wait until mutation with version will finish mutation for all parts + void waitForMutation(Int64 version, const String & filename); /// Try and find a single part to mutate and mutate it. If some part was successfully mutated, return true. bool tryMutatePart();