Alter primary key: development [#METR-21119].

This commit is contained in:
Alexey Milovidov 2016-05-17 02:04:03 +03:00
parent 161e1c3ace
commit bf265e264a
8 changed files with 123 additions and 30 deletions

View File

@ -78,7 +78,7 @@ private:
}
};
typedef std::vector<PartitionCommand> PartitionCommands;
using PartitionCommands = std::vector<PartitionCommand>;
ASTPtr query_ptr;

View File

@ -409,7 +409,7 @@ public:
AlterDataPartTransactionPtr alterDataPart(
const DataPartPtr & part,
const NamesAndTypesList & new_columns,
const NamesAndTypesList & new_primary_key,
const ASTPtr & new_primary_key,
bool skip_sanity_checks);
/// Нужно вызывать под залоченным lockStructureForAlter().
@ -469,12 +469,13 @@ public:
const MergeTreeSettings settings;
const ASTPtr primary_expr_ast;
ASTPtr primary_expr_ast;
Block primary_key_sample;
DataTypes primary_key_data_types;
private:
friend struct MergeTreeDataPart;
friend class StorageMergeTree;
bool require_part_metadata;
@ -512,6 +513,8 @@ private:
*/
PerShardDataParts per_shard_data_parts;
void initPrimaryKey();
/** Выражение, преобразующее типы столбцов.
* Если преобразований типов нет, out_expression=nullptr.
* out_rename_map отображает файлы-столбцы на выходе выражения в новые файлы таблицы.

View File

@ -474,7 +474,7 @@ void DatabaseOrdinary::alterTable(
if (engine_modifier)
engine_modifier(ast_create_query.storage);
statement = getTableDefinitionFromCreateQuery(ast_create_query);
statement = getTableDefinitionFromCreateQuery(ast);
{
WriteBufferFromFile out(table_metadata_tmp_path, statement.size(), O_WRONLY | O_CREAT | O_EXCL);

View File

@ -82,30 +82,35 @@ MergeTreeData::MergeTreeData(
Poco::File(full_path + "detached").createDirectory();
if (primary_expr_ast)
{
/// инициализируем описание сортировки
sort_descr.reserve(primary_expr_ast->children.size());
for (const ASTPtr & ast : primary_expr_ast->children)
{
String name = ast->getColumnName();
sort_descr.push_back(SortColumnDescription(name, 1));
}
primary_expr = ExpressionAnalyzer(primary_expr_ast, context, nullptr, getColumnsList()).getActions(false);
ExpressionActionsPtr projected_expr = ExpressionAnalyzer(primary_expr_ast, context, nullptr, getColumnsList()).getActions(true);
primary_key_sample = projected_expr->getSampleBlock();
size_t primary_key_size = primary_key_sample.columns();
primary_key_data_types.resize(primary_key_size);
for (size_t i = 0; i < primary_key_size; ++i)
primary_key_data_types[i] = primary_key_sample.unsafeGetByPosition(i).type;
}
initPrimaryKey();
else if (merging_params.mode != MergingParams::Unsorted)
throw Exception("Primary key could be empty only for UnsortedMergeTree", ErrorCodes::BAD_ARGUMENTS);
}
void MergeTreeData::initPrimaryKey()
{
/// инициализируем описание сортировки
sort_descr.clear();
sort_descr.reserve(primary_expr_ast->children.size());
for (const ASTPtr & ast : primary_expr_ast->children)
{
String name = ast->getColumnName();
sort_descr.emplace_back(name, 1);
}
primary_expr = ExpressionAnalyzer(primary_expr_ast, context, nullptr, getColumnsList()).getActions(false);
ExpressionActionsPtr projected_expr = ExpressionAnalyzer(primary_expr_ast, context, nullptr, getColumnsList()).getActions(true);
primary_key_sample = projected_expr->getSampleBlock();
size_t primary_key_size = primary_key_sample.columns();
primary_key_data_types.resize(primary_key_size);
for (size_t i = 0; i < primary_key_size; ++i)
primary_key_data_types[i] = primary_key_sample.unsafeGetByPosition(i).type;
}
void MergeTreeData::MergingParams::check(const NamesAndTypesList & columns) const
{
/// Проверяем, что столбец sign_column, если нужен, существует, и имеет тип Int8.
@ -209,6 +214,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
std::lock_guard<std::mutex> lock_all(all_data_parts_mutex);
data_parts.clear();
all_data_parts.clear();
Strings part_file_names;
Poco::DirectoryIterator end;
@ -537,7 +543,7 @@ void MergeTreeData::checkAlter(const AlterCommands & params)
createConvertExpression(nullptr, getColumnsList(), new_columns, unused_expression, unused_map, unused_bool);
}
void MergeTreeData::createConvertExpression(const DataPartPtr & part, const NamesAndTypesList & old_columns, const NamesAndTypesList & new_columns,
@ -638,7 +644,7 @@ void MergeTreeData::createConvertExpression(const DataPartPtr & part, const Name
MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart(
const DataPartPtr & part,
const NamesAndTypesList & new_columns,
const NamesAndTypesList & new_primary_key,
const ASTPtr & new_primary_key,
bool skip_sanity_checks)
{
ExpressionActionsPtr expression;
@ -646,6 +652,53 @@ MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart(
bool force_update_metadata;
createConvertExpression(part, part->columns, new_columns, expression, transaction->rename_map, force_update_metadata);
/// Обновление первичного ключа, если нужно.
if (new_primary_key.get() != primary_expr_ast.get())
{
ExpressionActionsPtr new_primary_expr = ExpressionAnalyzer(new_primary_key, context, nullptr, getColumnsList()).getActions(true);
Block new_primary_key_sample = new_primary_expr->getSampleBlock();
size_t new_key_size = new_primary_key_sample.columns();
Columns new_index(new_key_size);
/// Копируем существующие столбцы первичного ключа. Новые заполняем значениями по-умолчанию.
/// NOTE Не поддерживаются вычислимые значения по-умолчанию.
ssize_t prev_position_of_existing_column = -1;
for (size_t i = 0; i < new_key_size; ++i)
{
const String & column_name = new_primary_key_sample.getByPosition(i).name;
if (primary_key_sample.has(column_name))
{
ssize_t position_of_existing_column = primary_key_sample.getPositionByName(column_name);
if (position_of_existing_column < prev_position_of_existing_column)
throw Exception("Permuting of columns of primary key is not supported", ErrorCodes::BAD_ARGUMENTS);
new_index[i] = part->index.at(position_of_existing_column);
prev_position_of_existing_column = position_of_existing_column;
}
else
{
const IDataType & type = *new_primary_key_sample.getByPosition(i).type;
new_index[i] = type.createConstColumn(part->size, type.getDefault());
}
}
if (prev_position_of_existing_column == -1)
throw Exception("No common columns while modifying primary key", ErrorCodes::BAD_ARGUMENTS);
String index_tmp_path = full_path + part->name + "/primary.idx.tmp";
WriteBufferFromFile index_file(index_tmp_path);
for (size_t i = 0, size = part->size; i < size; ++i)
for (size_t j = 0; j < new_key_size; ++j)
new_primary_key_sample.unsafeGetByPosition(j).type.get()->serializeBinary(*new_index[j].get(), index_file);
transaction->rename_map["primary.idx.tmp"] = "primary.idx";
}
if (!skip_sanity_checks && transaction->rename_map.size() > settings.max_files_to_modify_in_alter_columns)
{
transaction->clear();

View File

@ -437,6 +437,7 @@ void MergeTreeDataPart::loadIndex()
if (key_size)
{
index.clear();
index.resize(key_size);
for (size_t i = 0; i < key_size; ++i)

View File

@ -159,7 +159,8 @@ void ReplicatedMergeTreeAlterThread::run()
/// Обновим кусок и запишем результат во временные файлы.
/// TODO: Можно пропускать проверку на слишком большие изменения, если в ZooKeeper есть, например,
/// нода /flags/force_alter.
auto transaction = storage.data.alterDataPart(part, columns_plus_materialized, false);
auto transaction = storage.data.alterDataPart(
part, columns_plus_materialized, storage.data.primary_expr_ast, false);
if (!transaction)
continue;
@ -197,7 +198,8 @@ void ReplicatedMergeTreeAlterThread::run()
for (const MergeTreeData::DataPartPtr & part : parts)
{
auto transaction = storage.unreplicated_data->alterDataPart(part, columns_plus_materialized, false);
auto transaction = storage.unreplicated_data->alterDataPart(
part, columns_plus_materialized, storage.data.primary_expr_ast, false);
if (!transaction)
continue;

View File

@ -7,6 +7,7 @@
#include <DB/Databases/IDatabase.h>
#include <DB/Common/escapeForFileName.h>
#include <DB/Interpreters/InterpreterAlterQuery.h>
#include <DB/Interpreters/ExpressionAnalyzer.h>
#include <Poco/DirectoryIterator.h>
@ -159,7 +160,11 @@ void StorageMergeTree::rename(const String & new_path_to_db, const String & new_
/// TODO: Можно обновить названия логгеров у this, data, reader, writer, merger.
}
void StorageMergeTree::alter(const AlterCommands & params, const String & database_name, const String & table_name, const Context & context)
void StorageMergeTree::alter(
const AlterCommands & params,
const String & database_name,
const String & table_name,
const Context & context)
{
/// NOTE: Здесь так же как в ReplicatedMergeTree можно сделать ALTER, не блокирующий запись данных надолго.
const MergeTreeMergeBlocker merge_blocker{merger};
@ -182,15 +187,31 @@ void StorageMergeTree::alter(const AlterCommands & params, const String & databa
MergeTreeData::DataParts parts = data.getDataParts();
std::vector<MergeTreeData::AlterDataPartTransactionPtr> transactions;
bool primary_key_is_modified = false;
ASTPtr new_primary_key_ast = data.primary_expr_ast;
for (const AlterCommand & param : params)
{
if (param.type == AlterCommand::MODIFY_PRIMARY_KEY)
{
primary_key_is_modified = true;
new_primary_key_ast = param.primary_key;
}
}
if (primary_key_is_modified && data.merging_params.mode == MergeTreeData::MergingParams::Unsorted)
throw Exception("UnsortedMergeTree cannot have primary key", ErrorCodes::BAD_ARGUMENTS);
for (const MergeTreeData::DataPartPtr & part : parts)
if (auto transaction = data.alterDataPart(part, columns_for_parts, false))
if (auto transaction = data.alterDataPart(part, columns_for_parts, new_primary_key_ast, false))
transactions.push_back(std::move(transaction));
auto table_hard_lock = lockStructureForAlter();
context.getDatabase(database_name)->alterTable(
context, table_name,
new_columns, new_materialized_columns, new_alias_columns, new_column_defaults, {});
new_columns, new_materialized_columns, new_alias_columns, new_column_defaults,
[&new_primary_key_ast] (ASTPtr & primary_key_ast) { primary_key_ast = new_primary_key_ast; });
materialized_columns = new_materialized_columns;
alias_columns = new_alias_columns;
@ -201,8 +222,17 @@ void StorageMergeTree::alter(const AlterCommands & params, const String & databa
data.alias_columns = std::move(new_alias_columns);
data.column_defaults = std::move(new_column_defaults);
if (primary_key_is_modified)
{
data.primary_expr_ast = new_primary_key_ast;
data.initPrimaryKey();
}
for (auto & transaction : transactions)
transaction->commit();
if (primary_key_is_modified)
data.loadDataParts(false);
}
bool StorageMergeTree::merge(

View File

@ -2302,6 +2302,10 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params,
data.checkAlter(params);
for (const AlterCommand & param : params)
if (param.type == AlterCommand::MODIFY_PRIMARY_KEY)
throw Exception("Modification of primary key is not supported for replicated tables", ErrorCodes::NOT_IMPLEMENTED);
new_columns = data.getColumnsListNonMaterialized();
new_materialized_columns = data.materialized_columns;
new_alias_columns = data.alias_columns;