First stupid implementation of non-blocking alter for vanilla merge tree

alesapin 2020-03-11 18:51:04 +03:00
parent 50a88885e1
commit 406b48a20e
4 changed files with 38 additions and 91 deletions
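In short, the change makes ALTER on a plain (non-replicated) MergeTree non-blocking: alters that only change table settings are applied at the metadata level, while data-modifying alters are converted into mutation commands and executed through the regular mutation pipeline instead of rewriting every data part synchronously under an exclusive lock. The self-contained toy sketch below illustrates that dispatch; isSettingsAlter and getMutationCommands mirror the names used in the diff, but the types and the rest of the code are illustrative only, not ClickHouse API.

// Toy model of the new StorageMergeTree::alter() dispatch (illustration only, not ClickHouse
// code and not part of the diff): settings-only ALTERs are metadata changes, everything else
// becomes mutation commands for the asynchronous mutation pipeline.
#include <iostream>
#include <string>
#include <vector>

enum class AlterType { MODIFY_SETTING, MODIFY_COLUMN, DROP_COLUMN };

struct AlterCommand { AlterType type; std::string column_name; };
struct MutationCommand { std::string description; };

/// True when every command only changes table settings (mirrors params.isSettingsAlter()).
bool isSettingsAlter(const std::vector<AlterCommand> & commands)
{
    for (const auto & command : commands)
        if (command.type != AlterType::MODIFY_SETTING)
            return false;
    return true;
}

/// Commands that actually rewrite data become mutations (mirrors params.getMutationCommands()).
std::vector<MutationCommand> getMutationCommands(const std::vector<AlterCommand> & commands)
{
    std::vector<MutationCommand> result;
    for (const auto & command : commands)
        if (command.type != AlterType::MODIFY_SETTING)
            result.push_back({"rewrite parts for column " + command.column_name});
    return result;
}

int main()
{
    std::vector<AlterCommand> alter{{AlterType::MODIFY_COLUMN, "value"}};

    if (isSettingsAlter(alter))
    {
        std::cout << "settings ALTER: update metadata only, no parts touched\n";
        return 0;
    }

    /// Otherwise: persist the new metadata first, then hand the data rewrite to mutate().
    std::cout << "update metadata, then run mutations (mutations_sync = 1 makes ALTER wait):\n";
    for (const auto & mutation : getMutationCommands(alter))
        std::cout << "  " << mutation.description << '\n';
    return 0;
}

Routing the heavy rewrite through mutations means the ALTER no longer needs to cancel merges and moves for its whole duration, which is exactly the blocker code removed from StorageMergeTree::alter() below.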

View File

@@ -799,7 +799,6 @@ protected:
friend class IMergeTreeDataPart;
friend class MergeTreeDataMergerMutator;
friend class ReplicatedMergeTreeAlterThread;
friend struct ReplicatedMergeTreeTableMetadata;
friend class StorageReplicatedMergeTree;

View File

@@ -206,53 +206,6 @@ void StorageMergeTree::truncate(const ASTPtr &, const Context &, TableStructureW
}
std::vector<MergeTreeData::AlterDataPartTransactionPtr> StorageMergeTree::prepareAlterTransactions(
const ColumnsDescription & new_columns, const IndicesDescription & new_indices, const Context & context)
{
auto parts = getDataParts({MergeTreeDataPartState::PreCommitted,
MergeTreeDataPartState::Committed,
MergeTreeDataPartState::Outdated});
std::vector<MergeTreeData::AlterDataPartTransactionPtr> transactions;
transactions.reserve(parts.size());
const auto & columns_for_parts = new_columns.getAllPhysical();
const Settings & settings_ = context.getSettingsRef();
size_t thread_pool_size = std::min<size_t>(parts.size(), settings_.max_alter_threads);
std::optional<ThreadPool> thread_pool;
if (thread_pool_size > 1)
thread_pool.emplace(thread_pool_size);
for (const auto & part : parts)
{
transactions.push_back(std::make_unique<MergeTreeData::AlterDataPartTransaction>(part));
auto job = [this, & transaction = transactions.back(), & columns_for_parts, & new_indices = new_indices.indices]
{
this->alterDataPart(columns_for_parts, new_indices, false, transaction);
};
if (thread_pool)
thread_pool->scheduleOrThrowOnError(job);
else
job();
}
if (thread_pool)
thread_pool->wait();
auto erase_pos = std::remove_if(transactions.begin(), transactions.end(),
[](const MergeTreeData::AlterDataPartTransactionPtr & transaction)
{
return !transaction->isValid();
});
transactions.erase(erase_pos, transactions.end());
return transactions;
}
void StorageMergeTree::alter(
const AlterCommands & params,
const Context & context,
@@ -262,55 +215,38 @@ void StorageMergeTree::alter(
lockNewDataStructureExclusively(table_lock_holder, context.getCurrentQueryId());
StorageInMemoryMetadata metadata = getInMemoryMetadata();
StorageInMemoryMetadata current_metadata = getInMemoryMetadata();
params.apply(metadata);
/// Update metadata in memory
auto update_metadata = [&metadata, &table_lock_holder, this]()
{
changeSettings(metadata.settings_ast, table_lock_holder);
/// Reinitialize primary key because primary key column types might have changed.
setProperties(metadata);
setTTLExpressions(metadata.columns.getColumnTTLs(), metadata.ttl_for_table_ast);
};
auto maybe_mutation_commands = params.getMutationCommands(current_metadata);
params.apply(current_metadata);
/// This alter can be performed at the metadata level only
if (!params.isModifyingData())
if (params.isSettingsAlter())
{
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId());
context.getDatabase(table_id.database_name)->alterTable(context, table_id.table_name, metadata);
changeSettings(current_metadata.settings_ast, table_lock_holder);
update_metadata();
context.getDatabase(table_id.database_name)->alterTable(context, table_id.table_name, current_metadata);
}
else
{
/// NOTE: Here, as in ReplicatedMergeTree, an ALTER can be performed that does not block data writes for a long time.
/// Also block moves, because they can replace a part with one in the old state.
auto merge_blocker = merger_mutator.merges_blocker.cancel();
auto moves_blocked = parts_mover.moves_blocker.cancel();
auto transactions = prepareAlterTransactions(metadata.columns, metadata.indices, context);
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId());
context.getDatabase(table_id.database_name)->alterTable(context, table_id.table_name, metadata);
update_metadata();
for (auto & transaction : transactions)
{
transaction->commit();
transaction.reset();
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId());
changeSettings(current_metadata.settings_ast, table_lock_holder);
/// Reinitialize primary key because primary key column types might have changed.
setProperties(current_metadata);
setTTLExpressions(current_metadata.columns.getColumnTTLs(), current_metadata.ttl_for_table_ast);
context.getDatabase(table_id.database_name)->alterTable(context, table_id.table_name, current_metadata);
}
/// Column sizes could have changed
recalculateColumnSizes();
Context copy = context;
copy.getSettings().mutations_sync = 1;
if (!maybe_mutation_commands.empty())
mutate(maybe_mutation_commands, copy);
}
}
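For contrast, the removed prepareAlterTransactions() above rewrote every part eagerly: one AlterDataPartTransaction per part, with the per-part jobs run inline for a single worker or scheduled on a thread pool of min(parts, max_alter_threads) workers, and the ALTER waited for all of them. The self-contained toy below shows that scheduling pattern; plain std::thread stands in for ClickHouse's ThreadPool, and runPerPartJobs is an illustrative name, not ClickHouse API.

// Toy version of the scheduling pattern in the removed prepareAlterTransactions():
// one job per part, executed inline for a single worker and on a small pool otherwise.
#include <algorithm>
#include <atomic>
#include <cstddef>
#include <functional>
#include <iostream>
#include <thread>
#include <vector>

void runPerPartJobs(std::vector<std::function<void()>> jobs, std::size_t max_alter_threads)
{
    const std::size_t pool_size = std::min(jobs.size(), max_alter_threads);

    if (pool_size <= 1)
    {
        for (auto & job : jobs)   /// single part or single worker: no pool, run inline
            job();
        return;
    }

    std::atomic<std::size_t> next_job{0};
    std::vector<std::thread> workers;
    for (std::size_t i = 0; i < pool_size; ++i)
        workers.emplace_back([&]
        {
            for (std::size_t j = next_job++; j < jobs.size(); j = next_job++)
                jobs[j]();
        });

    for (auto & worker : workers)   /// analogue of thread_pool->wait()
        worker.join();
}

int main()
{
    std::atomic<int> parts_rewritten{0};
    std::vector<std::function<void()>> jobs;
    for (int part = 0; part < 8; ++part)
        jobs.push_back([&parts_rewritten] { ++parts_rewritten; });   /// stand-in for alterDataPart()

    runPerPartJobs(std::move(jobs), /*max_alter_threads=*/4);
    std::cout << "parts rewritten: " << parts_rewritten.load() << '\n';
}

In the new code this machinery is gone: the per-part rewrite becomes mutation commands handled by the background mutation pipeline, so merges and moves no longer have to be cancelled while every part is rewritten.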
@@ -737,9 +673,26 @@ bool StorageMergeTree::tryMutatePart()
size_t current_ast_elements = 0;
for (auto it = mutations_begin_it; it != mutations_end_it; ++it)
{
MutationsInterpreter interpreter(shared_from_this(), it->second.commands, global_context, false);
size_t commands_size = 0;
MutationCommands commands_for_size_validation;
for (const auto & command : it->second.commands)
{
if (command.type != MutationCommand::Type::DROP_COLUMN && command.type != MutationCommand::Type::DROP_INDEX)
{
commands_for_size_validation.push_back(command);
}
else
{
commands_size += command.ast->size();
}
}
if (commands_for_size_validation.size())
{
MutationsInterpreter interpreter(shared_from_this(), commands_for_size_validation, global_context, false);
commands_size += interpreter.evaluateCommandsSize();
}
size_t commands_size = interpreter.evaluateCommandsSize();
if (current_ast_elements + commands_size >= max_ast_elements)
break;
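The tryMutatePart() change above only affects how the size of queued mutation commands is estimated against max_ast_elements: DROP_COLUMN and DROP_INDEX commands are now costed by their AST size directly, and only the remaining commands are passed through a MutationsInterpreter for size validation, presumably because a bare DROP gives the interpreter nothing useful to evaluate. A self-contained toy of the estimation follows; evaluateCommandsSize() here is a stub standing in for MutationsInterpreter::evaluateCommandsSize(), and MutationCmd / estimateCommandsSize are illustrative names, not ClickHouse API.

// Toy version of the new size-estimation loop in tryMutatePart(): DROP commands are costed
// by AST size alone, the rest go through the (more expensive) interpreter-based validation.
// Illustration only, not part of the diff.
#include <cstddef>
#include <iostream>
#include <vector>

enum class MutationType { DROP_COLUMN, DROP_INDEX, UPDATE, MATERIALIZE_INDEX };

struct MutationCmd
{
    MutationType type;
    std::size_t ast_size;   /// stand-in for command.ast->size()
};

/// Stub standing in for MutationsInterpreter::evaluateCommandsSize().
std::size_t evaluateCommandsSize(const std::vector<MutationCmd> & commands)
{
    std::size_t total = 0;
    for (const auto & command : commands)
        total += command.ast_size;
    return total;
}

std::size_t estimateCommandsSize(const std::vector<MutationCmd> & commands)
{
    std::size_t commands_size = 0;
    std::vector<MutationCmd> commands_for_size_validation;

    for (const auto & command : commands)
    {
        if (command.type != MutationType::DROP_COLUMN && command.type != MutationType::DROP_INDEX)
            commands_for_size_validation.push_back(command);   /// validated via the interpreter
        else
            commands_size += command.ast_size;                  /// DROPs are costed directly
    }

    if (!commands_for_size_validation.empty())
        commands_size += evaluateCommandsSize(commands_for_size_validation);

    return commands_size;
}

int main()
{
    std::vector<MutationCmd> commands{{MutationType::DROP_COLUMN, 3}, {MutationType::UPDATE, 10}};
    std::cout << "estimated size: " << estimateCommandsSize(commands) << '\n';   /// prints 13
}

As in the diff, the resulting estimate is accumulated into current_ast_elements and the loop stops once max_ast_elements would be exceeded, which bounds how many queued mutation commands are applied to a part at once.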

View File

@@ -109,9 +109,6 @@ private:
BackgroundProcessingPool::TaskHandle merging_mutating_task_handle;
BackgroundProcessingPool::TaskHandle moving_task_handle;
std::vector<MergeTreeData::AlterDataPartTransactionPtr> prepareAlterTransactions(
const ColumnsDescription & new_columns, const IndicesDescription & new_indices, const Context & context);
void loadMutations();
/** Determines what parts should be merged and merges it.

View File

@@ -3246,8 +3246,6 @@ bool StorageReplicatedMergeTree::executeMetadataAlter(const StorageReplicatedMer
/// This transaction may not happen, but it's OK, because on the next retry we will eventually create/update this node
zookeeper->createOrUpdate(replica_path + "/metadata_version", std::to_string(metadata_version), zkutil::CreateMode::Persistent);
recalculateColumnSizes();
return true;
}