From 406b48a20ef9568e79ff514f4323e4efc2953267 Mon Sep 17 00:00:00 2001 From: alesapin Date: Wed, 11 Mar 2020 18:51:04 +0300 Subject: [PATCH] First stupid implementation of non-blocking alter for vanilla merge tree --- dbms/src/Storages/MergeTree/MergeTreeData.h | 1 - dbms/src/Storages/StorageMergeTree.cpp | 123 ++++++------------ dbms/src/Storages/StorageMergeTree.h | 3 - .../Storages/StorageReplicatedMergeTree.cpp | 2 - 4 files changed, 38 insertions(+), 91 deletions(-) diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.h b/dbms/src/Storages/MergeTree/MergeTreeData.h index f5382ca64d8..c06567b3457 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.h +++ b/dbms/src/Storages/MergeTree/MergeTreeData.h @@ -799,7 +799,6 @@ protected: friend class IMergeTreeDataPart; friend class MergeTreeDataMergerMutator; - friend class ReplicatedMergeTreeAlterThread; friend struct ReplicatedMergeTreeTableMetadata; friend class StorageReplicatedMergeTree; diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index 9393f0de5d2..adc039d45fd 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -206,53 +206,6 @@ void StorageMergeTree::truncate(const ASTPtr &, const Context &, TableStructureW } -std::vector StorageMergeTree::prepareAlterTransactions( - const ColumnsDescription & new_columns, const IndicesDescription & new_indices, const Context & context) -{ - auto parts = getDataParts({MergeTreeDataPartState::PreCommitted, - MergeTreeDataPartState::Committed, - MergeTreeDataPartState::Outdated}); - std::vector transactions; - transactions.reserve(parts.size()); - - const auto & columns_for_parts = new_columns.getAllPhysical(); - - const Settings & settings_ = context.getSettingsRef(); - size_t thread_pool_size = std::min(parts.size(), settings_.max_alter_threads); - - std::optional thread_pool; - - if (thread_pool_size > 1) - thread_pool.emplace(thread_pool_size); - - for (const auto & part : parts) - { - transactions.push_back(std::make_unique(part)); - - auto job = [this, & transaction = transactions.back(), & columns_for_parts, & new_indices = new_indices.indices] - { - this->alterDataPart(columns_for_parts, new_indices, false, transaction); - }; - - if (thread_pool) - thread_pool->scheduleOrThrowOnError(job); - else - job(); - } - - if (thread_pool) - thread_pool->wait(); - - auto erase_pos = std::remove_if(transactions.begin(), transactions.end(), - [](const MergeTreeData::AlterDataPartTransactionPtr & transaction) - { - return !transaction->isValid(); - }); - transactions.erase(erase_pos, transactions.end()); - - return transactions; -} - void StorageMergeTree::alter( const AlterCommands & params, const Context & context, @@ -262,55 +215,38 @@ void StorageMergeTree::alter( lockNewDataStructureExclusively(table_lock_holder, context.getCurrentQueryId()); - StorageInMemoryMetadata metadata = getInMemoryMetadata(); + StorageInMemoryMetadata current_metadata = getInMemoryMetadata(); - params.apply(metadata); - - /// Update metdata in memory - auto update_metadata = [&metadata, &table_lock_holder, this]() - { - - changeSettings(metadata.settings_ast, table_lock_holder); - /// Reinitialize primary key because primary key column types might have changed. - setProperties(metadata); - - setTTLExpressions(metadata.columns.getColumnTTLs(), metadata.ttl_for_table_ast); - }; + auto maybe_mutation_commands = params.getMutationCommands(current_metadata); + params.apply(current_metadata); /// This alter can be performed at metadata level only - if (!params.isModifyingData()) + if (params.isSettingsAlter()) { lockStructureExclusively(table_lock_holder, context.getCurrentQueryId()); - context.getDatabase(table_id.database_name)->alterTable(context, table_id.table_name, metadata); + changeSettings(current_metadata.settings_ast, table_lock_holder); - update_metadata(); + context.getDatabase(table_id.database_name)->alterTable(context, table_id.table_name, current_metadata); } else { - - /// NOTE: Here, as in ReplicatedMergeTree, you can do ALTER which does not block the writing of data for a long time. - /// Also block moves, because they can replace part with old state. - auto merge_blocker = merger_mutator.merges_blocker.cancel(); - auto moves_blocked = parts_mover.moves_blocker.cancel(); - - - auto transactions = prepareAlterTransactions(metadata.columns, metadata.indices, context); - - lockStructureExclusively(table_lock_holder, context.getCurrentQueryId()); - - context.getDatabase(table_id.database_name)->alterTable(context, table_id.table_name, metadata); - - update_metadata(); - - for (auto & transaction : transactions) { - transaction->commit(); - transaction.reset(); + lockStructureExclusively(table_lock_holder, context.getCurrentQueryId()); + + changeSettings(current_metadata.settings_ast, table_lock_holder); + /// Reinitialize primary key because primary key column types might have changed. + setProperties(current_metadata); + + setTTLExpressions(current_metadata.columns.getColumnTTLs(), current_metadata.ttl_for_table_ast); + + context.getDatabase(table_id.database_name)->alterTable(context, table_id.table_name, current_metadata); } - /// Columns sizes could be changed - recalculateColumnSizes(); + Context copy = context; + copy.getSettings().mutations_sync = 1; + if (!maybe_mutation_commands.empty()) + mutate(maybe_mutation_commands, copy); } } @@ -737,9 +673,26 @@ bool StorageMergeTree::tryMutatePart() size_t current_ast_elements = 0; for (auto it = mutations_begin_it; it != mutations_end_it; ++it) { - MutationsInterpreter interpreter(shared_from_this(), it->second.commands, global_context, false); + size_t commands_size = 0; + MutationCommands commands_for_size_validation; + for (const auto & command : it->second.commands) + { + if (command.type != MutationCommand::Type::DROP_COLUMN && command.type != MutationCommand::Type::DROP_INDEX) + { + commands_for_size_validation.push_back(command); + } + else + { + commands_size += command.ast->size(); + } + } + + if (commands_for_size_validation.size()) + { + MutationsInterpreter interpreter(shared_from_this(), commands_for_size_validation, global_context, false); + commands_size += interpreter.evaluateCommandsSize(); + } - size_t commands_size = interpreter.evaluateCommandsSize(); if (current_ast_elements + commands_size >= max_ast_elements) break; diff --git a/dbms/src/Storages/StorageMergeTree.h b/dbms/src/Storages/StorageMergeTree.h index 33d48cf1a85..ba9efde900b 100644 --- a/dbms/src/Storages/StorageMergeTree.h +++ b/dbms/src/Storages/StorageMergeTree.h @@ -109,9 +109,6 @@ private: BackgroundProcessingPool::TaskHandle merging_mutating_task_handle; BackgroundProcessingPool::TaskHandle moving_task_handle; - std::vector prepareAlterTransactions( - const ColumnsDescription & new_columns, const IndicesDescription & new_indices, const Context & context); - void loadMutations(); /** Determines what parts should be merged and merges it. diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 1f651f0c3d6..aba53085b63 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -3246,8 +3246,6 @@ bool StorageReplicatedMergeTree::executeMetadataAlter(const StorageReplicatedMer /// This transaction may not happen, but it's OK, because on the next retry we will eventually create/update this node zookeeper->createOrUpdate(replica_path + "/metadata_version", std::to_string(metadata_version), zkutil::CreateMode::Persistent); - recalculateColumnSizes(); - return true; }