diff --git a/dbms/include/DB/Core/ErrorCodes.h b/dbms/include/DB/Core/ErrorCodes.h index c8e9da0bb27..da77be0aa10 100644 --- a/dbms/include/DB/Core/ErrorCodes.h +++ b/dbms/include/DB/Core/ErrorCodes.h @@ -252,6 +252,7 @@ namespace ErrorCodes TABLE_IS_READ_ONLY, NOT_ENOUGH_SPACE, UNEXPECTED_ZOOKEEPER_ERROR, + INVALID_NESTED_NAME, POCO_EXCEPTION = 1000, STD_EXCEPTION, diff --git a/dbms/include/DB/Interpreters/InterpreterAlterQuery.h b/dbms/include/DB/Interpreters/InterpreterAlterQuery.h index c990b59ce8b..8c2a635d535 100644 --- a/dbms/include/DB/Interpreters/InterpreterAlterQuery.h +++ b/dbms/include/DB/Interpreters/InterpreterAlterQuery.h @@ -1,11 +1,14 @@ #pragma once #include +#include #include +#include namespace DB { -class ASTIdentifier; + + /** Позволяет добавить или удалить столбец в таблице. */ @@ -16,10 +19,12 @@ public: void execute(); -private: - void dropColumnFromAST(const ASTIdentifier & drop_column, ASTs & columns); - void addColumnToAST(StoragePtr table, ASTs & columns, const ASTPtr & add_column_ptr, const ASTPtr & after_column_ptr); + /** Изменяет список столбцов в метаданных таблицы на диске. Нужно вызывать под TableStructureLock соответствующей таблицы. + */ + static void updateMetadata(const String & database, const String & table, const NamesAndTypesList & columns, Context & context); + static AlterCommands parseAlter(const ASTAlterQuery::ParameterContainer & params, const DataTypeFactory & data_type_factory); +private: ASTPtr query_ptr; Context context; diff --git a/dbms/include/DB/Interpreters/InterpreterCreateQuery.h b/dbms/include/DB/Interpreters/InterpreterCreateQuery.h index 64333192b3c..ddcf7672e48 100644 --- a/dbms/include/DB/Interpreters/InterpreterCreateQuery.h +++ b/dbms/include/DB/Interpreters/InterpreterCreateQuery.h @@ -21,6 +21,11 @@ public: * (для случая выполнения запроса из существующего файла с метаданными). */ StoragePtr execute(bool assume_metadata_exists = false); + + /** AST в список столбцов с типами и обратно. Столбцы типа Nested развернуты в список настоящих столбцов. + */ + static NamesAndTypesList parseColumns(ASTPtr expression_list, const DataTypeFactory & data_type_factory); + static ASTPtr formatColumns(const NamesAndTypesList & columns); private: ASTPtr query_ptr; diff --git a/dbms/include/DB/Storages/AlterCommands.h b/dbms/include/DB/Storages/AlterCommands.h new file mode 100644 index 00000000000..c2dc7485fa2 --- /dev/null +++ b/dbms/include/DB/Storages/AlterCommands.h @@ -0,0 +1,120 @@ +#pragma once +#include +#include +#include + +namespace DB +{ + +/// Операция из запроса ALTER. Добавление столбцов типа Nested не развернуто в добавление отдельных столбцов. +struct AlterCommand +{ + enum Type + { + ADD, + DROP, + MODIFY + }; + + Type type; + + String column_name; + + /// Для ADD и MODIFY - новый тип столбца. + DataTypePtr data_type; + + /// Для ADD - после какого столбца добавить новый. Если пустая строка, добавить в конец. Добавить в начало сейчас нельзя. + String after_column; + + + /// одинаковыми считаются имена, если они совпадают целиком или name_without_dot совпадает с частью имени до точки + static bool namesEqual(const String & name_without_dot, const DB::NameAndTypePair & name_type) + { + String name_with_dot = name_without_dot + "."; + return (name_with_dot == name_type.name.substr(0, name_without_dot.length() + 1) || name_without_dot == name_type.name); + } + + void apply(NamesAndTypesList & columns) const + { + if (type == ADD) + { + if (std::count_if(columns.begin(), columns.end(), std::bind(namesEqual, column_name, std::placeholders::_1))) + throw Exception("Cannot add column " + column_name + ": column with this name already exisits.", + DB::ErrorCodes::ILLEGAL_COLUMN); + + if (DataTypeNested::extractNestedTableName(column_name) != column_name && + !typeid_cast(&*data_type)) + throw Exception("Can't add nested column " + column_name + " of non-array type " + data_type->getName(), + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); + + NamesAndTypesList::iterator insert_it = columns.end(); + if (!after_column.empty()) + { + /// Пытаемся найти первую с конца колонку с именем column_name или с именем, начинающимся с column_name и ".". + /// Например "fruits.bananas" + /// одинаковыми считаются имена, если они совпадают целиком или name_without_dot совпадает с частью имени до точки + NamesAndTypesList::reverse_iterator reverse_insert_it = std::find_if(columns.rbegin(), columns.rend(), + std::bind(namesEqual, after_column, std::placeholders::_1)); + + if (reverse_insert_it == columns.rend()) + throw Exception("Wrong column name. Cannot find column " + column_name + " to insert after", + DB::ErrorCodes::ILLEGAL_COLUMN); + else + { + /// base возвращает итератор, уже смещенный на один элемент вправо + insert_it = reverse_insert_it.base(); + } + } + + columns.insert(insert_it, NameAndTypePair(column_name, data_type)); + + /// Медленно, так как каждый раз копируется список + columns = *DataTypeNested::expandNestedColumns(columns); + } + else if (type == DROP) + { + bool is_first = true; + NamesAndTypesList::iterator column_it; + do + { + column_it = std::find_if(columns.begin(), columns.end(), std::bind(namesEqual, column_name, std::placeholders::_1)); + + if (column_it == columns.end()) + { + if (is_first) + throw Exception("Wrong column name. Cannot find column " + column_name + " to drop", + DB::ErrorCodes::ILLEGAL_COLUMN); + } + else + columns.erase(column_it); + is_first = false; + } + while (column_it != columns.end()); + } + else if (type == MODIFY) + { + NamesAndTypesList::iterator column_it = std::find_if(columns.begin(), columns.end(), + std::bind(namesEqual, column_name, std::placeholders::_1) ); + if (column_it == columns.end()) + throw Exception("Wrong column name. Cannot find column " + column_name + " to modify.", + DB::ErrorCodes::ILLEGAL_COLUMN); + column_it->type = data_type; + } + else + throw Exception("Wrong parameter type in ALTER query", ErrorCodes::LOGICAL_ERROR); + } +}; + +class AlterCommands : public std::vector +{ +public: + void apply(NamesAndTypesList & columns) const + { + NamesAndTypesList new_columns = columns; + for (const AlterCommand & command : *this) + command.apply(new_columns); + columns = new_columns; + } +}; + +} diff --git a/dbms/include/DB/Storages/IStorage.h b/dbms/include/DB/Storages/IStorage.h index 07d21bd5fcb..e34583f2dda 100644 --- a/dbms/include/DB/Storages/IStorage.h +++ b/dbms/include/DB/Storages/IStorage.h @@ -11,6 +11,7 @@ #include #include #include +#include #include #include @@ -196,30 +197,14 @@ public: } /** ALTER таблицы в виде изменения столбцов, не затрагивающий изменение Storage или его параметров. - * (ALTER, затрагивающий изменение движка, делается внешним кодом, путём копирования данных.) - * Вызывается при заблокированной на запись структуре таблицы. - * Для ALTER MODIFY можно использовать другие методы (см. ниже). + * Этот метод должен полностью выполнить запрос ALTER, самостоятельно заботясь о блокировках. + * Для обновления метаданных таблицы на диске этот метод должен вызвать InterpreterAlterQuery::updateMetadata. */ - virtual void alter(const ASTAlterQuery::Parameters & params) + virtual void alter(const AlterCommands & params, const String & database_name, const String & table_name, Context & context) { throw Exception("Method alter is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED); } - /** ALTER MODIFY (изменение типа столбца) выполняется в два вызова: - * Сначала вызывается prepareAlterModify при заблокированной записи данных, но незаблокированной структуре таблицы. - * В нем можно выполнить долгую работу по записи сконвертированных данных, оставляя доступными существующие данные. - * Потом вызывается commitAlterModify при заблокированной структуре таблицы. - * В нем нужно закончить изменение типа столбца. - * Для движков с тривиальным ALTER MODIFY можно оставить реализацию по умолчанию, вызывающую alter. - */ - - virtual void prepareAlterModify(const ASTAlterQuery::Parameters & params) {} - - virtual void commitAlterModify(const ASTAlterQuery::Parameters & params) - { - alter(params); - } - /** Выполнить какую-либо фоновую работу. Например, объединение кусков в таблице типа MergeTree. * Возвращает - была ли выполнена какая-либо работа. */ diff --git a/dbms/include/DB/Storages/ITableDeclaration.h b/dbms/include/DB/Storages/ITableDeclaration.h index dfff175a701..cd153bed04c 100644 --- a/dbms/include/DB/Storages/ITableDeclaration.h +++ b/dbms/include/DB/Storages/ITableDeclaration.h @@ -4,7 +4,6 @@ #include #include #include -#include namespace DB { @@ -62,9 +61,6 @@ public: */ void check(const Block & block, bool need_all = false) const; - /// реализация alter, модифицирующая список столбцов. - static void alterColumns(const ASTAlterQuery::Parameters & params, NamesAndTypesListPtr & columns, const Context & context); - virtual ~ITableDeclaration() {} }; diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeData.h b/dbms/include/DB/Storages/MergeTree/MergeTreeData.h index bc7f510e26b..e0cfb5a5cd5 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeData.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeData.h @@ -338,6 +338,13 @@ public: } } } + + bool hasColumnFiles(const String & column) const + { + String escaped_column = escapeForFileName(column); + return Poco::File(storage.full_path + name + "/" + escaped_column + ".bin").exists() && + Poco::File(storage.full_path + name + "/" + escaped_column + ".mrk").exists(); + } }; typedef std::shared_ptr MutableDataPartPtr; @@ -385,6 +392,28 @@ public: DataPartsVector added_parts; }; + /// Объект, помнящий какие временные файлы были созданы в директории с куском в ходе изменения (ALTER) его столбцов. + class AlterDataPartTransaction : private boost::noncopyable + { + public: + /// Переименовывает временные файлы, завершая ALTER куска. + void commit(); + + /// Если не был вызван commit(), удаляет временные файлы, отменяя ALTER куска. + ~AlterDataPartTransaction(); + + private: + friend class MergeTreeData; + + AlterDataPartTransaction(DataPartPtr data_part_) : data_part(data_part_) {} + + DataPartPtr data_part; + /// Если значение - пустая строка, файл нужно удалить, и он не временный. + NameToNameMap rename_map; + }; + + typedef std::unique_ptr AlterDataPartTransactionPtr; + /// Режим работы. См. выше. enum Mode { @@ -489,13 +518,23 @@ public: /** Перемещает всю директорию с данными. * Сбрасывает кеши разжатых блоков и засечек. - * Нужно вызывать под залоченным lockStructure(). + * Нужно вызывать под залоченным lockStructureForAlter(). */ void setPath(const String & full_path); - void alter(const ASTAlterQuery::Parameters & params); - void prepareAlterModify(const ASTAlterQuery::Parameters & params); - void commitAlterModify(const ASTAlterQuery::Parameters & params); + /* Проверить, что такой ALTER можно выполнить: + * - Есть все нужные столбцы. + * - Все преобразования типов допустимы. + * - Не затронуты столбцы ключа, знака и семплирования. + * Бросает исключение, если что-то не так. + */ + void checkAlter(const AlterCommands & params); + + /// Выполняет ALTER куска данных и записывает результат во временные файлы. + AlterDataPartTransactionPtr alterDataPart(DataPartPtr part, const NamesAndTypesList & new_columns); + + /// Нужно вызывать под залоченным lockStructureForAlter(). + void setColumnsList(const NamesAndTypesList & new_columns) { columns = new NamesAndTypesList(new_columns); } ExpressionActionsPtr getPrimaryExpression() const { return primary_expr; } SortDescription getSortDescription() const { return sort_descr; } @@ -540,12 +579,17 @@ private: /// Загрузить множество кусков с данными с диска. Вызывается один раз - при создании объекта. void loadDataParts(); - void removeColumnFiles(String column_name, bool remove_array_size_files); - /// Определить, не битые ли данные в директории. Проверяет индекс и засечеки, но не сами данные. bool isBrokenPart(const String & path); - void createConvertExpression(const String & in_column_name, const String & out_type, ExpressionActionsPtr & out_expression, String & out_column); + /** Выражение, преобразующее типы столбцов. + * Если преобразований типов нет, out_expression=nullptr. + * out_rename_map отображает файлы-столбцы на выходе выражения в новые файлы таблицы. + * Файлы, которые нужно удалить, в out_rename_map отображаются в пустую строку. + * Если !part, просто проверяет, что все нужные преобразования типов допустимы. + */ + void createConvertExpression(DataPartPtr part, const NamesAndTypesList & old_columns, const NamesAndTypesList & new_columns, + ExpressionActionsPtr & out_expression, NameToNameMap & out_rename_map); }; } diff --git a/dbms/include/DB/Storages/StorageDistributed.h b/dbms/include/DB/Storages/StorageDistributed.h index 320354041cd..d0c6579affd 100644 --- a/dbms/include/DB/Storages/StorageDistributed.h +++ b/dbms/include/DB/Storages/StorageDistributed.h @@ -59,7 +59,7 @@ public: void rename(const String & new_path_to_db, const String & new_name) { name = new_name; } /// в подтаблицах добавлять и удалять столбы нужно вручную /// структура подтаблиц не проверяется - void alter(const ASTAlterQuery::Parameters ¶ms); + void alter(const AlterCommands & params, const String & database_name, const String & table_name, Context & context); private: StorageDistributed( diff --git a/dbms/include/DB/Storages/StorageMerge.h b/dbms/include/DB/Storages/StorageMerge.h index 651ce90e8c9..55e0a7d048b 100644 --- a/dbms/include/DB/Storages/StorageMerge.h +++ b/dbms/include/DB/Storages/StorageMerge.h @@ -49,7 +49,7 @@ public: /// в подтаблицах добавлять и удалять столбы нужно вручную /// структура подтаблиц не проверяется - void alter(const ASTAlterQuery::Parameters & params); + void alter(const AlterCommands & params, const String & database_name, const String & table_name, Context & context); Block getBlockWithVirtualColumns(const std::vector & selected_tables) const; private: diff --git a/dbms/include/DB/Storages/StorageMergeTree.h b/dbms/include/DB/Storages/StorageMergeTree.h index 75b2223a711..80269000d35 100644 --- a/dbms/include/DB/Storages/StorageMergeTree.h +++ b/dbms/include/DB/Storages/StorageMergeTree.h @@ -71,9 +71,7 @@ public: void rename(const String & new_path_to_db, const String & new_name); - void alter(const ASTAlterQuery::Parameters & params); - void prepareAlterModify(const ASTAlterQuery::Parameters & params); - void commitAlterModify(const ASTAlterQuery::Parameters & params); + void alter(const AlterCommands & params, const String & database_name, const String & table_name, Context & context); bool supportsIndexForIn() const override { return true; } diff --git a/dbms/src/DataTypes/DataTypeNested.cpp b/dbms/src/DataTypes/DataTypeNested.cpp index cf3f0453547..b2c07073e3d 100644 --- a/dbms/src/DataTypes/DataTypeNested.cpp +++ b/dbms/src/DataTypes/DataTypeNested.cpp @@ -33,15 +33,21 @@ std::string DataTypeNested::concatenateNestedName(const std::string & nested_tab std::string DataTypeNested::extractNestedTableName(const std::string & nested_name) { - const char * pos = strchr(nested_name.data(), '.'); - return pos == nullptr ? nested_name : nested_name.substr(0, pos - nested_name.data()); + const char * first_pos = strchr(nested_name.data(), '.'); + const char * last_pos = strrchr(nested_name.data(), '.'); + if (first_pos != last_pos) + throw Exception("Invalid nested column name: " + nested_name, ErrorCodes::INVALID_NESTED_NAME); + return first_pos == nullptr ? nested_name : nested_name.substr(0, first_pos - nested_name.data()); } std::string DataTypeNested::extractNestedColumnName(const std::string & nested_name) { - const char * pos = strrchr(nested_name.data(), '.'); - return pos == nullptr ? nested_name : nested_name.substr(pos - nested_name.data() + 1); + const char * first_pos = strchr(nested_name.data(), '.'); + const char * last_pos = strrchr(nested_name.data(), '.'); + if (first_pos != last_pos) + throw Exception("Invalid nested column name: " + nested_name, ErrorCodes::INVALID_NESTED_NAME); + return last_pos == nullptr ? nested_name : nested_name.substr(last_pos - nested_name.data() + 1); } diff --git a/dbms/src/Interpreters/InterpreterAlterQuery.cpp b/dbms/src/Interpreters/InterpreterAlterQuery.cpp index d1f1d685c5d..12d1cec5b5f 100644 --- a/dbms/src/Interpreters/InterpreterAlterQuery.cpp +++ b/dbms/src/Interpreters/InterpreterAlterQuery.cpp @@ -1,4 +1,5 @@ #include +#include #include #include #include @@ -27,297 +28,108 @@ InterpreterAlterQuery::InterpreterAlterQuery(ASTPtr query_ptr_, Context & contex { } -static bool namesEqual(const String &name, const ASTPtr & name_type_) -{ - const ASTNameTypePair & name_type = typeid_cast(*name_type_); - return name_type.name == name; -} - -/// одинаковыми считаются имена, если они совпадают целиком или name_without_dot совпадает с частью имени до точки -static bool namesEqualIgnoreAfterDot(const String & name_without_dot, const ASTPtr & name_type_) -{ - const ASTNameTypePair & name_type = typeid_cast(*name_type_); - - String name_with_dot = name_without_dot + "."; - return (name_without_dot == name_type.name || name_with_dot == name_type.name.substr(0, name_with_dot.length())); -} - -void InterpreterAlterQuery::dropColumnFromAST(const ASTIdentifier & drop_column, ASTs & columns) -{ - Exception e("Wrong column name. Cannot find column " + drop_column.name + " to drop", DB::ErrorCodes::ILLEGAL_COLUMN); - ASTs::iterator drop_it; - - size_t dot_pos = drop_column.name.find('.'); - /// случай удаления nested столбца - if (dot_pos != std::string::npos) - { - /// в Distributed таблицах столбцы имеют название "nested.column" - drop_it = std::find_if(columns.begin(), columns.end(), boost::bind(namesEqual, drop_column.name, _1)); - if (drop_it != columns.end()) - columns.erase(drop_it); - else - { - try - { - /// в MergeTree таблицах есть ASTFunction "nested" - /// в аргументах которой записаны столбцы - ASTs::iterator nested_it; - std::string nested_base_name = drop_column.name.substr(0, dot_pos); - nested_it = std::find_if(columns.begin(), columns.end(), boost::bind(namesEqual, nested_base_name, _1)); - if (nested_it == columns.end()) - throw e; - - if ((**nested_it).children.size() != 1) - throw e; - - ASTFunction & f = typeid_cast(*(**nested_it).children.back()); - if (f.name != "Nested") - throw e; - - ASTs & nested_columns = typeid_cast(*f.arguments).children; - - drop_it = std::find_if(nested_columns.begin(), nested_columns.end(), boost::bind(namesEqual, drop_column.name.substr(dot_pos + 1), _1)); - if (drop_it == nested_columns.end()) - throw e; - else - nested_columns.erase(drop_it); - - if (nested_columns.empty()) - columns.erase(nested_it); - } - catch (std::bad_cast & bad_cast_err) - { - throw e; - } - } - } - else - { - drop_it = std::find_if(columns.begin(), columns.end(), boost::bind(namesEqual, drop_column.name, _1)); - if (drop_it == columns.end()) - throw e; - else - columns.erase(drop_it); - } -} - -void addColumnToAST1(ASTs & columns, const ASTPtr & add_column_ptr, const ASTPtr & after_column_ptr) -{ - const ASTNameTypePair & add_column = typeid_cast(*add_column_ptr); - const ASTIdentifier * col_after = after_column_ptr ? &typeid_cast(*after_column_ptr) : nullptr; - - if (std::find_if(columns.begin(), columns.end(), boost::bind(namesEqual, add_column.name, _1)) != columns.end()) - { - throw Exception("Fail to add column " + add_column.name + ". Column already exists"); - } - ASTs::iterator insert_it = columns.end(); - if (col_after) - { - /// если есть точка, то нас просят вставить после nested столбца - auto find_functor = col_after->name.find('.') != std::string ::npos ? boost::bind(namesEqual, col_after->name, _1) : boost::bind(namesEqualIgnoreAfterDot, col_after->name, _1); - - insert_it = std::find_if(columns.begin(), columns.end(), find_functor); - if (insert_it == columns.end()) - throw Exception("Wrong column name. Cannot find column " + col_after->name + " to insert after"); - ++insert_it; - } - columns.insert(insert_it, add_column_ptr); -} - -void InterpreterAlterQuery::addColumnToAST(StoragePtr table, ASTs & columns, const ASTPtr & add_column_ptr, const ASTPtr & after_column_ptr) -{ - /// хотим исключение если приведение зафейлится - const ASTNameTypePair & add_column = typeid_cast(*add_column_ptr); - const ASTIdentifier * after_col = after_column_ptr ? &typeid_cast(*after_column_ptr) : nullptr; - - size_t dot_pos = add_column.name.find('.'); - bool insert_nested_column = dot_pos != std::string::npos; - - const DataTypeFactory & data_type_factory = context.getDataTypeFactory(); - StringRange type_range = add_column.type->range; - String type(type_range.first, type_range.second - type_range.first); - DataTypePtr datatype = data_type_factory.get(type); - if (insert_nested_column) - { - if (!typeid_cast(datatype.get())) - { - throw Exception("Cannot add column " + add_column.name + ". Because it is not an array. Only arrays could be nested and consist '.' in their names"); - } - } - - if ((typeid_cast(table.get()) || typeid_cast(table.get())) && insert_nested_column) - { - /// специальный случай для вставки nested столбцов в MergeTree - /// в MergeTree таблицах есть ASTFunction "Nested" в аргументах которой записаны столбцы - std::string nested_base_name = add_column.name.substr(0, dot_pos); - ASTs::iterator nested_it = std::find_if(columns.begin(), columns.end(), boost::bind(namesEqual, nested_base_name, _1)); - - if (nested_it != columns.end()) - { - /// нужно добавить колонку в уже существующий nested столбец - ASTFunction * nested_func = typeid_cast((*nested_it)->children.back().get()); - if (!(**nested_it).children.size() || !nested_func || nested_func->name != "Nested") - throw Exception("Column with name " + nested_base_name + " already exists. But it is not nested."); - - ASTs & nested_columns = typeid_cast(*nested_func->arguments).children; - - ASTPtr new_nested_column_ptr = add_column_ptr->clone(); - ASTNameTypePair& new_nested_column = typeid_cast(*new_nested_column_ptr); - new_nested_column.name = add_column.name.substr(dot_pos + 1); - ASTPtr new_after_column = after_column_ptr ? after_column_ptr->clone() : nullptr; - - if (new_after_column) - { - size_t after_dot_pos = after_col->name.find('.'); - if (after_dot_pos == std::string::npos) - throw Exception("Nested column " + add_column.name + " should be inserted only after nested column"); - if (add_column.name.substr(0, dot_pos) != after_col->name.substr(0, after_dot_pos)) - throw Exception("Nested column " + add_column.name + "should be inserted after column with the same name before the '.'"); - - typeid_cast(*new_after_column).name = after_col->name.substr(after_dot_pos + 1); - } - - { - /// удаляем массив из типа, т.е. Array(String) -> String - ParserIdentifierWithOptionalParameters type_parser; - Expected expected; - const char * begin = new_nested_column.type->range.first + strlen("Array("); - const char * end = new_nested_column.type->range.second - static_cast(strlen(")")); - if (!type_parser.parse(begin, end, new_nested_column.type, expected)) - throw Exception("Fail to convert type like Array(SomeType) -> SomeType for type " + type); - } - - addColumnToAST1(nested_columns, new_nested_column_ptr, new_after_column); - } - else - { - throw Exception("If you want to create new Nested column use syntax like. ALTER TABLE table ADD COLUMN MyColumn Nested(Name1 Type1, Name2 Type2...) [AFTER BeforeColumn]"); - } - } - else - { - /// в Distributed и Merge таблицах столбцы имеют название "nested.column" - addColumnToAST1(columns, add_column_ptr, after_column_ptr); - } -} - void InterpreterAlterQuery::execute() { ASTAlterQuery & alter = typeid_cast(*query_ptr); String & table_name = alter.table; String database_name = alter.database.empty() ? context.getCurrentDatabase() : alter.database; + AlterCommands commands = parseAlter(alter.parameters, context.getDataTypeFactory()); StoragePtr table = context.getTable(database_name, table_name); - auto table_soft_lock = table->lockDataForAlter(); + table->alter(commands, database_name, table_name, context); +} - const DataTypeFactory & data_type_factory = context.getDataTypeFactory(); +AlterCommands InterpreterAlterQuery::parseAlter( + const ASTAlterQuery::ParameterContainer & params_container, const DataTypeFactory & data_type_factory) +{ + AlterCommands res; + + for (const auto & params : params_container) + { + res.push_back(AlterCommand()); + AlterCommand & command = res.back(); + + if (params.type == ASTAlterQuery::ADD) + { + command.type = AlterCommand::ADD; + + const ASTNameTypePair & ast_name_type = typeid_cast(*params.name_type); + StringRange type_range = ast_name_type.type->range; + String type_string = String(type_range.first, type_range.second - type_range.first); + + command.column_name = ast_name_type.name; + command.data_type = data_type_factory.get(type_string); + + if (params.column) + command.after_column = typeid_cast(*params.column).name; + } + else if (params.type == ASTAlterQuery::DROP) + { + command.type = AlterCommand::DROP; + command.column_name = typeid_cast(*(params.column)).name; + } + else if (params.type == ASTAlterQuery::MODIFY) + { + command.type = AlterCommand::MODIFY; + + const ASTNameTypePair & ast_name_type = typeid_cast(*params.name_type); + StringRange type_range = ast_name_type.type->range; + String type_string = String(type_range.first, type_range.second - type_range.first); + + command.column_name = ast_name_type.name; + command.data_type = data_type_factory.get(type_string); + } + else + throw Exception("Wrong parameter type in ALTER query", ErrorCodes::LOGICAL_ERROR); + } + + return res; +} + +void InterpreterAlterQuery::updateMetadata( + const String& database_name, const String& table_name, const NamesAndTypesList& columns, Context& context) +{ String path = context.getPath(); String database_name_escaped = escapeForFileName(database_name); String table_name_escaped = escapeForFileName(table_name); String metadata_path = path + "metadata/" + database_name_escaped + "/" + table_name_escaped + ".sql"; + String metadata_temp_path = metadata_path + ".tmp"; - ASTPtr attach_ptr = context.getCreateQuery(database_name, table_name); - ASTCreateQuery & attach = typeid_cast(*attach_ptr); - attach.attach = true; - ASTs & columns = typeid_cast(*attach.columns).children; - - /// Различные проверки, на возможность выполнения запроса - ASTs columns_copy; - for (const auto & ast : columns) - columns_copy.push_back(ast->clone()); - - IdentifierNameSet identifier_names; - attach.storage->collectIdentifierNames(identifier_names); - for (ASTAlterQuery::ParameterContainer::const_iterator alter_it = alter.parameters.begin(); - alter_it != alter.parameters.end(); ++alter_it) + StringPtr query = new String(); { - const ASTAlterQuery::Parameters & params = *alter_it; - - if (params.type == ASTAlterQuery::ADD) - { - addColumnToAST(table, columns_copy, params.name_type, params.column); - } - else if (params.type == ASTAlterQuery::DROP) - { - const ASTIdentifier & drop_column = typeid_cast(*params.column); - - /// Проверяем, что поле не является ключевым - if (identifier_names.find(drop_column.name) != identifier_names.end()) - throw Exception("Cannot drop key column", DB::ErrorCodes::ILLEGAL_COLUMN); - - dropColumnFromAST(drop_column, columns_copy); - } - else if (params.type == ASTAlterQuery::MODIFY) - { - const ASTNameTypePair & name_type = typeid_cast(*params.name_type); - StringRange type_range = name_type.type->range; - - /// проверяем корректность типа. В случае некоректного типа будет исключение - String type(type_range.first, type_range.second - type_range.first); - data_type_factory.get(type); - - /// проверяем, что колонка существует - auto modified_column = std::find_if(columns_copy.begin(), columns_copy.end(), boost::bind(namesEqual, name_type.name, _1)); - if ( modified_column == columns_copy.end()) - throw Exception("Wrong column name. Column " + name_type.name + " not exists", DB::ErrorCodes::ILLEGAL_COLUMN); - - /// Проверяем, что поле не является ключевым - if (identifier_names.find(name_type.name) != identifier_names.end()) - throw Exception("Modification of primary column not supported", DB::ErrorCodes::ILLEGAL_COLUMN); - - /// к сожалению, проверить на возможно ли это приведение типов можно только во время выполнения - } + ReadBufferFromFile in(metadata_path); + WriteBufferFromString out(*query); + copyData(in, out); } - /// Пока разрешим читать из таблицы. Запретим при первой попытке изменить структуру таблицы. - /// Это позволит сделать большую часть первого MODIFY, не останавливая чтение из таблицы. - IStorage::TableStructureWriteLockPtr table_hard_lock; + const char * begin = query->data(); + const char * end = begin + query->size(); + const char * pos = begin; + + ParserCreateQuery parser; + ASTPtr ast; + Expected expected = ""; + bool parse_res = parser.parse(pos, end, ast, expected); + + /// Распарсенный запрос должен заканчиваться на конец входных данных или на точку с запятой. + if (!parse_res || (pos != end && *pos != ';')) + throw Exception(getSyntaxErrorMessage(parse_res, begin, end, pos, expected, "in file " + metadata_path), + DB::ErrorCodes::SYNTAX_ERROR); + + ast->query_string = query; + + ASTCreateQuery & attach = typeid_cast(*ast); + + ASTPtr new_columns = InterpreterCreateQuery::formatColumns(columns); + *std::find(attach.children.begin(), attach.children.end(), attach.columns) = new_columns; + attach.columns = new_columns; - /// todo cycle over sub tables and tables - /// Применяем изменения - for (ASTAlterQuery::ParameterContainer::const_iterator alter_it = alter.parameters.begin(); - alter_it != alter.parameters.end(); ++alter_it) { - const ASTAlterQuery::Parameters & params = *alter_it; - - if (params.type == ASTAlterQuery::MODIFY) - { - table->prepareAlterModify(params); - - if (!table_hard_lock) - table_hard_lock = table->lockStructureForAlter(); - - table->commitAlterModify(params); - } - else - { - if (!table_hard_lock) - table_hard_lock = table->lockStructureForAlter(); - - table->alter(params); - } - - if (params.type == ASTAlterQuery::ADD) - { - addColumnToAST(table, columns, params.name_type, params.column); - } - else if (params.type == ASTAlterQuery::DROP) - { - const ASTIdentifier & drop_column = typeid_cast(*params.column); - dropColumnFromAST(drop_column, columns); - } - else if (params.type == ASTAlterQuery::MODIFY) - { - const ASTNameTypePair & name_type = typeid_cast(*params.name_type); - ASTs::iterator modify_it = std::find_if(columns.begin(), columns.end(), boost::bind(namesEqual, name_type.name, _1)); - ASTNameTypePair & modified_column = typeid_cast(**modify_it); - modified_column.type = name_type.type; - } - - /// Перезаписываем файл метадата каждую итерацию - Poco::FileOutputStream ostr(metadata_path); + Poco::FileOutputStream ostr(metadata_temp_path); formatAST(attach, ostr, 0, false); } + + Poco::File(metadata_temp_path).renameTo(metadata_path); } diff --git a/dbms/src/Interpreters/InterpreterCreateQuery.cpp b/dbms/src/Interpreters/InterpreterCreateQuery.cpp index b0031878625..540f2585b7d 100644 --- a/dbms/src/Interpreters/InterpreterCreateQuery.cpp +++ b/dbms/src/Interpreters/InterpreterCreateQuery.cpp @@ -20,6 +20,7 @@ #include #include +#include namespace DB @@ -114,15 +115,7 @@ StoragePtr InterpreterCreateQuery::execute(bool assume_metadata_exists) /// Получаем список столбцов if (create.columns) { - ASTExpressionList & columns_list = typeid_cast(*create.columns); - for (ASTs::iterator it = columns_list.children.begin(); it != columns_list.children.end(); ++it) - { - ASTNameTypePair & name_and_type_pair = typeid_cast(**it); - StringRange type_range = name_and_type_pair.type->range; - columns->push_back(NameAndTypePair( - name_and_type_pair.name, - context.getDataTypeFactory().get(String(type_range.first, type_range.second - type_range.first)))); - } + columns = new NamesAndTypesList(parseColumns(create.columns, context.getDataTypeFactory())); } else if (!create.as_table.empty()) columns = new NamesAndTypesList(as_storage->getColumnsList()); @@ -135,34 +128,13 @@ StoragePtr InterpreterCreateQuery::execute(bool assume_metadata_exists) else throw Exception("Incorrect CREATE query: required list of column descriptions or AS section or SELECT.", ErrorCodes::INCORRECT_QUERY); - /// Дополняем запрос списком столбцов из другой таблицы, если его не было. - if (!create.columns) - { - ASTPtr columns_list_ptr = new ASTExpressionList; - ASTExpressionList & columns_list = typeid_cast(*columns_list_ptr); - - for (NamesAndTypesList::const_iterator it = columns->begin(); it != columns->end(); ++it) - { - ASTPtr name_and_type_pair_ptr = new ASTNameTypePair; - ASTNameTypePair & name_and_type_pair = typeid_cast(*name_and_type_pair_ptr); - name_and_type_pair.name = it->name; - StringPtr type_name = new String(it->type->getName()); - - ParserIdentifierWithOptionalParameters storage_p; - Expected expected = ""; - const char * pos = type_name->data(); - const char * end = pos + type_name->size(); - - if (!storage_p.parse(pos, end, name_and_type_pair.type, expected)) - throw Exception("Cannot parse data type.", ErrorCodes::SYNTAX_ERROR); - - name_and_type_pair.type->query_string = type_name; - columns_list.children.push_back(name_and_type_pair_ptr); - } - - create.columns = columns_list_ptr; - create.children.push_back(create.columns); - } + /// Даже если в запросе был список столбцов, на всякий случай приведем его к стандартному виду (развернем Nested). + ASTPtr new_columns = formatColumns(*columns); + if (create.columns) + *std::find(create.children.begin(), create.children.end(), create.columns) = new_columns; + else + create.children.push_back(new_columns); + create.columns = new_columns; /// Выбор нужного движка таблицы if (create.storage) @@ -238,7 +210,6 @@ StoragePtr InterpreterCreateQuery::execute(bool assume_metadata_exists) if (create.is_temporary) { -// res->is_dropped = true; context.getSessionContext().addExternalTable(table_name, res); } else @@ -255,4 +226,47 @@ StoragePtr InterpreterCreateQuery::execute(bool assume_metadata_exists) return res; } +NamesAndTypesList InterpreterCreateQuery::parseColumns(ASTPtr expression_list, const DataTypeFactory & data_type_factory) +{ + NamesAndTypesList columns; + ASTExpressionList & columns_list = typeid_cast(*expression_list); + for (const ASTPtr & ast : columns_list.children) + { + const ASTNameTypePair & name_and_type_pair = typeid_cast(*ast); + StringRange type_range = name_and_type_pair.type->range; + columns.push_back(NameAndTypePair( + name_and_type_pair.name, + data_type_factory.get(String(type_range.first, type_range.second - type_range.first)))); + } + columns = *DataTypeNested::expandNestedColumns(columns); + return columns; +} + +ASTPtr InterpreterCreateQuery::formatColumns(const NamesAndTypesList & columns) +{ + ASTPtr columns_list_ptr = new ASTExpressionList; + ASTExpressionList & columns_list = typeid_cast(*columns_list_ptr); + + for (const NameAndTypePair & it : columns) + { + ASTPtr name_and_type_pair_ptr = new ASTNameTypePair; + ASTNameTypePair & name_and_type_pair = typeid_cast(*name_and_type_pair_ptr); + name_and_type_pair.name = it.name; + StringPtr type_name = new String(it.type->getName()); + + ParserIdentifierWithOptionalParameters storage_p; + Expected expected = ""; + const char * pos = type_name->data(); + const char * end = pos + type_name->size(); + + if (!storage_p.parse(pos, end, name_and_type_pair.type, expected)) + throw Exception("Cannot parse data type.", ErrorCodes::SYNTAX_ERROR); + + name_and_type_pair.type->query_string = type_name; + columns_list.children.push_back(name_and_type_pair_ptr); + } + + return columns_list_ptr; +} + } diff --git a/dbms/src/Storages/ITableDeclaration.cpp b/dbms/src/Storages/ITableDeclaration.cpp index 580bacb8fe0..07181186b72 100644 --- a/dbms/src/Storages/ITableDeclaration.cpp +++ b/dbms/src/Storages/ITableDeclaration.cpp @@ -176,84 +176,4 @@ void ITableDeclaration::check(const Block & block, bool need_all) const } } -/// одинаковыми считаются имена, если они совпадают целиком или name_without_dot совпадает с частью имени до точки -static bool namesEqual(const String & name_without_dot, const DB::NameAndTypePair & name_type) -{ - String name_with_dot = name_without_dot + "."; - return (name_with_dot == name_type.name.substr(0, name_without_dot.length() + 1) || name_without_dot == name_type.name); -} - -void ITableDeclaration::alterColumns(const ASTAlterQuery::Parameters & params, NamesAndTypesListPtr & columns, const Context & context) -{ - if (params.type == ASTAlterQuery::ADD) - { - NamesAndTypesList::iterator insert_it = columns->end(); - if (params.column) - { - String column_name = typeid_cast(*params.column).name; - - /// Пытаемся найти первую с конца колонку с именем column_name или с именем, начинающимся с column_name и ".". - /// Например "fruits.bananas" - NamesAndTypesList::reverse_iterator reverse_insert_it = std::find_if(columns->rbegin(), columns->rend(), boost::bind(namesEqual, column_name, _1) ); - - if (reverse_insert_it == columns->rend()) - throw Exception("Wrong column name. Cannot find column " + column_name + " to insert after", DB::ErrorCodes::ILLEGAL_COLUMN); - else - { - /// base возвращает итератор уже смещенный на один элемент вправо - insert_it = reverse_insert_it.base(); - } - } - - const ASTNameTypePair & ast_name_type = typeid_cast(*params.name_type); - StringRange type_range = ast_name_type.type->range; - String type_string = String(type_range.first, type_range.second - type_range.first); - - DB::DataTypePtr data_type = context.getDataTypeFactory().get(type_string); - NameAndTypePair pair(ast_name_type.name, data_type ); - columns->insert(insert_it, pair); - - /// Медленно, так как каждый раз копируется список - columns = DataTypeNested::expandNestedColumns(*columns); - return; - } - else if (params.type == ASTAlterQuery::DROP) - { - String column_name = typeid_cast(*(params.column)).name; - - /// Удаляем колонки из листа columns - bool is_first = true; - NamesAndTypesList::iterator column_it; - do - { - column_it = std::find_if(columns->begin(), columns->end(), boost::bind(namesEqual, column_name, _1)); - - if (column_it == columns->end()) - { - if (is_first) - throw Exception("Wrong column name. Cannot find column " + column_name + " to drop", DB::ErrorCodes::ILLEGAL_COLUMN); - } - else - columns->erase(column_it); - is_first = false; - } - while (column_it != columns->end()); - } - else if (params.type == ASTAlterQuery::MODIFY) - { - const ASTNameTypePair & ast_name_type = typeid_cast(*params.name_type); - StringRange type_range = ast_name_type.type->range; - String type_string = String(type_range.first, type_range.second - type_range.first); - - DB::DataTypePtr data_type = context.getDataTypeFactory().get(type_string); - NameAndTypePair pair(ast_name_type.name, data_type); - NamesAndTypesList::iterator column_it = std::find_if(columns->begin(), columns->end(), boost::bind(namesEqual, ast_name_type.name, _1) ); - if (column_it == columns->end()) - throw Exception("Wrong column name. Cannot find column " + ast_name_type.name + " to modify.", DB::ErrorCodes::ILLEGAL_COLUMN); - column_it->type = data_type; - } - else - throw Exception("Wrong parameter type in ALTER query", ErrorCodes::LOGICAL_ERROR); -} - } diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp index ccf78037cbf..8f68d353075 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp @@ -316,272 +316,219 @@ void MergeTreeData::dropAllData() Poco::File(full_path).remove(true); } -void MergeTreeData::removeColumnFiles(String column_name, bool remove_array_size_files) + +void MergeTreeData::checkAlter(const AlterCommands & params) { - Poco::ScopedLock lock(data_parts_mutex); - Poco::ScopedLock lock_all(all_data_parts_mutex); + /// Проверим, что указанные преобразования можно совершить над списком столбцов без учета типов. + NamesAndTypesList new_columns = *columns; + params.apply(new_columns); - size_t dot_pos = column_name.find('.'); - if (dot_pos != std::string::npos) + /// Список столбцов, которые нельзя трогать. + /// sampling_expression можно не учитывать, потому что он обязан содержаться в первичном ключе. + Names keys = primary_expr->getRequiredColumns(); + keys.push_back(sign_column); + std::sort(keys.begin(), keys.end()); + + for (const AlterCommand & command : params) { - std::string nested_column = column_name.substr(0, dot_pos); - column_name = nested_column + "%2E" + column_name.substr(dot_pos + 1); - - if (remove_array_size_files) - column_name = std::string("(?:") + nested_column + "|" + column_name + ")"; + if (std::binary_search(keys.begin(), keys.end(), command.column_name)) + throw Exception("trying to ALTER key column " + command.column_name, ErrorCodes::ILLEGAL_COLUMN); } - /// Регэксп выбирает файлы столбца для удаления - Poco::RegularExpression re(column_name + "(?:(?:\\.|\\%2E).+){0,1}" +"(?:\\.mrk|\\.bin|\\.size\\d+\\.bin|\\.size\\d+\\.mrk)"); - /// Цикл по всем директориям кусочков - Poco::RegularExpression::MatchVec matches; - Poco::DirectoryIterator end; - for (Poco::DirectoryIterator it_dir = Poco::DirectoryIterator(full_path); it_dir != end; ++it_dir) + /// Проверим, что преобразования типов возможны. + ExpressionActionsPtr unused_expression; + NameToNameMap unused_map; + createConvertExpression(nullptr, *columns, new_columns, unused_expression, unused_map); +} + +void MergeTreeData::createConvertExpression(DataPartPtr part, const NamesAndTypesList & old_columns, const NamesAndTypesList & new_columns, + ExpressionActionsPtr & out_expression, NameToNameMap & out_rename_map) +{ + out_expression = nullptr; + out_rename_map.clear(); + + typedef std::map NameToType; + NameToType new_types; + for (const NameAndTypePair & column : new_columns) { - std::string dir_name = it_dir.name(); + new_types[column.name] = column.type; + } - if (!ActiveDataPartSet::isPartDirectory(dir_name, matches)) - continue; + /// Сколько столбцов сейчас в каждой вложенной структуре. Столбцы не из вложенных структур сюда тоже попадут и не помешают. + std::map nested_table_counts; + for (const NameAndTypePair & column : old_columns) + { + ++nested_table_counts[DataTypeNested::extractNestedTableName(column.name)]; + } - /// Цикл по каждому из файлов в директории кусочков - String full_dir_name = full_path + dir_name + "/"; - for (Poco::DirectoryIterator it_file(full_dir_name); it_file != end; ++it_file) + for (const NameAndTypePair & column : old_columns) + { + if (!new_types.count(column.name)) { - if (re.match(it_file.name())) + if (!part || part->hasColumnFiles(column.name)) { - Poco::File file(full_dir_name + it_file.name()); - if (file.exists()) - file.remove(); + /// Столбец нужно удалить. + + String escaped_column = escapeForFileName(column.name); + out_rename_map[escaped_column + ".bin"] = ""; + out_rename_map[escaped_column + ".mrk"] = ""; + + /// Если это массив или последний столбец вложенной структуры, нужно удалить файлы с размерами. + if (typeid_cast(&*column.type)) + { + String nested_table = DataTypeNested::extractNestedTableName(column.name); + /// Если это был последний столбец, относящийся к этим файлам .size0, удалим файлы. + if (!--nested_table_counts[nested_table]) + { + String escaped_nested_table = escapeForFileName(nested_table); + out_rename_map[escaped_nested_table + ".size0.bin"] = ""; + out_rename_map[escaped_nested_table + ".size0.mrk"] = ""; + } + } } } - - /// Удаляем лишние столбцы из checksums.txt - for (auto & part : all_data_parts) + else { - if (!part) - continue; + String new_type_name = new_types[column.name]->getName(); - for (auto it = part->checksums.files.lower_bound(column_name); - (it != part->checksums.files.end()) && (it->first.substr(0, column_name.size()) == column_name);) + if (new_type_name != column.type->getName() && + (!part || part->hasColumnFiles(column.name))) { - if (re.match(it->first)) - it = const_cast(part->checksums.files).erase(it); - else - ++it; + /// Нужно изменить тип столбца. + + if (!out_expression) + out_expression = new ExpressionActions(NamesAndTypesList(), context.getSettingsRef()); + + out_expression->addInput(ColumnWithNameAndType(nullptr, column.type, column.name)); + + FunctionPtr function = context.getFunctionFactory().get("to" + new_type_name, context); + Names out_names; + out_expression->add(ExpressionAction::applyFunction(function, Names(1, column.name)), out_names); + out_expression->add(ExpressionAction::removeColumn(column.name)); + + String escaped_expr = escapeForFileName(out_names[0]); + String escaped_column = escapeForFileName(column.name); + out_rename_map[escaped_expr + ".bin"] = escaped_column + ".bin"; + out_rename_map[escaped_expr + ".mrk"] = escaped_column + ".mrk"; } - /// Записываем файл с чексуммами. - WriteBufferFromFile out(full_path + part->name + "/" + "checksums.txt", 1024); - part->checksums.writeText(out); } } } -void MergeTreeData::createConvertExpression( - const String & in_column_name, - const String & out_type, - ExpressionActionsPtr & out_expression, - String & out_column) +MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart(DataPartPtr part, const NamesAndTypesList & new_columns) { - Names out_names; - out_expression = new ExpressionActions( - NamesAndTypesList(1, NameAndTypePair(in_column_name, getDataTypeByName(in_column_name))), context.getSettingsRef()); + ExpressionActionsPtr expression; + AlterDataPartTransactionPtr transaction(new AlterDataPartTransaction(part)); + createConvertExpression(part, *columns, new_columns, expression, transaction->rename_map); // TODO: part->columns - FunctionPtr function = context.getFunctionFactory().get("to" + out_type, context); - out_expression->add(ExpressionAction::applyFunction(function, Names(1, in_column_name)), out_names); - out_expression->add(ExpressionAction::removeColumn(in_column_name)); - - out_column = out_names[0]; -} - -static DataTypePtr getDataTypeByName(const String & name, const NamesAndTypesList & columns) -{ - for (const auto & it : columns) + if (transaction->rename_map.empty()) { - if (it.name == name) - return it.type; - } - throw Exception("No column " + name + " in table", ErrorCodes::NO_SUCH_COLUMN_IN_TABLE); -} - -/// одинаковыми считаются имена, вида "name.*" -static bool namesWithDotEqual(const String & name_with_dot, const NameAndTypePair & name_type) -{ - return (name_with_dot == name_type.name.substr(0, name_with_dot.length())); -} - -void MergeTreeData::alter(const ASTAlterQuery::Parameters & params) -{ - { - Poco::ScopedLock lock(data_parts_mutex); - Poco::ScopedLock lock_all(all_data_parts_mutex); - alterColumns(params, columns, context); + transaction->data_part = nullptr; + return transaction; } - if (params.type == ASTAlterQuery::DROP) - { - String column_name = typeid_cast(*params.column).name; + DataPart::Checksums add_checksums; - /// Если нет колонок вида nested_name.*, то удалим столбцы размера массивов - bool remove_array_size_files = false; - size_t dot_pos = column_name.find('.'); - if (dot_pos != std::string::npos) - { - remove_array_size_files = (columns->end() == std::find_if(columns->begin(), columns->end(), boost::bind(namesWithDotEqual, column_name.substr(0, dot_pos), _1))); - } - removeColumnFiles(column_name, remove_array_size_files); - - context.resetCaches(); - } -} - -void MergeTreeData::prepareAlterModify(const ASTAlterQuery::Parameters & params) -{ - DataPartsVector parts; - { - Poco::ScopedLock lock(data_parts_mutex); - parts = DataPartsVector(data_parts.begin(), data_parts.end()); - } - - Names column_name; - const ASTNameTypePair & name_type = typeid_cast(*params.name_type); - StringRange type_range = name_type.type->range; - String type(type_range.first, type_range.second - type_range.first); - DataTypePtr old_type_ptr = DB::getDataTypeByName(name_type.name, *columns); - DataTypePtr new_type_ptr = context.getDataTypeFactory().get(type); - if (typeid_cast(old_type_ptr.get()) || typeid_cast(old_type_ptr.get()) || - typeid_cast(new_type_ptr.get()) || typeid_cast(new_type_ptr.get())) - throw Exception("ALTER MODIFY not supported for nested and array types"); - - column_name.push_back(name_type.name); - ExpressionActionsPtr expr; - String out_column; - createConvertExpression(name_type.name, type, expr, out_column); - - ColumnNumbers num(1, 0); - for (DataPartPtr & part : parts) + /// Применим выражение и запишем результат во временные файлы. + if (expression) { MarkRanges ranges(1, MarkRange(0, part->size)); - ExpressionBlockInputStream in(new MergeTreeBlockInputStream(full_path + part->name + '/', - DEFAULT_MERGE_BLOCK_SIZE, column_name, *this, part, ranges, false, nullptr, ""), expr); + BlockInputStreamPtr part_in = new MergeTreeBlockInputStream(full_path + part->name + '/', + DEFAULT_MERGE_BLOCK_SIZE, expression->getRequiredColumns(), *this, part, ranges, false, nullptr, ""); + ExpressionBlockInputStream in(part_in, expression); MergedColumnOnlyOutputStream out(*this, full_path + part->name + '/', true); in.readPrefix(); out.writePrefix(); - try + while (Block b = in.read()) + out.write(b); + + in.readSuffix(); + add_checksums = out.writeSuffixAndGetChecksums(); + } + + /// Обновим контрольные суммы. + DataPart::Checksums new_checksums = part->checksums; + for (auto it : transaction->rename_map) + { + if (it.second == "") { - while (Block b = in.read()) - out.write(b); + new_checksums.files.erase(it.first); + } + else + { + new_checksums.files[it.second] = add_checksums.files[it.first]; + } + } - in.readSuffix(); - DataPart::Checksums add_checksums = out.writeSuffixAndGetChecksums(); + /// Запишем обновленные контрольные суммы во временный файл + if (!part->checksums.empty()) + { + WriteBufferFromFile checksums_file(full_path + part->name + "/checksums.txt.tmp", 4096); + new_checksums.writeText(checksums_file); + transaction->rename_map["checksums.txt.tmp"] = "checksums.txt"; + } - /// Запишем обновленные контрольные суммы во временный файл. - if (!part->checksums.empty()) + return transaction; +} + +void MergeTreeData::AlterDataPartTransaction::commit() +{ + if (!data_part) + return; + try + { + String path = data_part->storage.full_path + data_part->name + "/"; + for (auto it : rename_map) + { + if (it.second.empty()) { - DataPart::Checksums new_checksums = part->checksums; - std::string escaped_name = escapeForFileName(name_type.name); - std::string escaped_out_column = escapeForFileName(out_column); - new_checksums.files[escaped_name + ".bin"] = add_checksums.files[escaped_out_column + ".bin"]; - new_checksums.files[escaped_name + ".mrk"] = add_checksums.files[escaped_out_column + ".mrk"]; - - WriteBufferFromFile checksums_file(full_path + part->name + '/' + escaped_out_column + ".checksums.txt", 1024); - new_checksums.writeText(checksums_file); + Poco::File(path + it.first).renameTo(path + it.first + ".removing"); + Poco::File(path + it.first + ".removing").remove(); + } + else + { + Poco::File(path + it.first).renameTo(path + it.second); } } - catch (const Exception & e) - { - if (e.code() != ErrorCodes::ALL_REQUESTED_COLUMNS_ARE_MISSING) - throw; - } + data_part = nullptr; + } + catch (...) + { + /// Если что-то пошло не так, не будем удалять временные файлы в деструкторе. + data_part = nullptr; + throw; } } -void MergeTreeData::commitAlterModify(const ASTAlterQuery::Parameters & params) +MergeTreeData::AlterDataPartTransaction::~AlterDataPartTransaction() { - DataPartsVector parts; + try { - Poco::ScopedLock lock(data_parts_mutex); - parts = DataPartsVector(data_parts.begin(), data_parts.end()); - } + if (!data_part) + return; - const ASTNameTypePair & name_type = typeid_cast(*params.name_type); - StringRange type_range = name_type.type->range; - String type(type_range.first, type_range.second - type_range.first); + LOG_WARNING(data_part->storage.log, "Aborting ALTER of part " << data_part->name); - ExpressionActionsPtr expr; - String out_column; - createConvertExpression(name_type.name, type, expr, out_column); - - /// переименовываем файлы - /// переименовываем старые столбцы, добавляя расширение .old - for (DataPartPtr & part : parts) - { - std::string part_path = full_path + part->name + '/'; - std::string path = part_path + escapeForFileName(name_type.name); - if (Poco::File(path + ".bin").exists()) + String path = data_part->storage.full_path + data_part->name + "/"; + for (auto it : rename_map) { - LOG_TRACE(log, "Renaming " << path + ".bin" << " to " << path + ".bin" + ".old"); - Poco::File(path + ".bin").renameTo(path + ".bin" + ".old"); - LOG_TRACE(log, "Renaming " << path + ".mrk" << " to " << path + ".mrk" + ".old"); - Poco::File(path + ".mrk").renameTo(path + ".mrk" + ".old"); - - if (Poco::File(part_path + "checksums.txt").exists()) + if (!it.second.empty()) { - LOG_TRACE(log, "Renaming " << part_path + "checksums.txt" << " to " << part_path + "checksums.txt" + ".old"); - Poco::File(part_path + "checksums.txt").renameTo(part_path + "checksums.txt" + ".old"); + try + { + Poco::File(path + it.first).remove(); + } + catch (Poco::Exception & e) + { + LOG_WARNING(data_part->storage.log, "Can't remove " << path + it.first << ": " << e.displayText()); + } } } } - - /// переименовываем временные столбцы - for (DataPartPtr & part : parts) + catch (...) { - std::string part_path = full_path + part->name + '/'; - std::string name = escapeForFileName(out_column); - std::string new_name = escapeForFileName(name_type.name); - std::string path = part_path + name; - std::string new_path = part_path + new_name; - if (Poco::File(path + ".bin").exists()) - { - LOG_TRACE(log, "Renaming " << path + ".bin" << " to " << new_path + ".bin"); - Poco::File(path + ".bin").renameTo(new_path + ".bin"); - LOG_TRACE(log, "Renaming " << path + ".mrk" << " to " << new_path + ".mrk"); - Poco::File(path + ".mrk").renameTo(new_path + ".mrk"); - - if (Poco::File(path + ".checksums.txt").exists()) - { - LOG_TRACE(log, "Renaming " << path + ".checksums.txt" << " to " << part_path + ".checksums.txt"); - Poco::File(path + ".checksums.txt").renameTo(part_path + "checksums.txt"); - } - } - } - - // удаляем старые столбцы - for (DataPartPtr & part : parts) - { - std::string part_path = full_path + part->name + '/'; - std::string path = part_path + escapeForFileName(name_type.name); - if (Poco::File(path + ".bin" + ".old").exists()) - { - LOG_TRACE(log, "Removing old column " << path + ".bin" + ".old"); - Poco::File(path + ".bin" + ".old").remove(); - LOG_TRACE(log, "Removing old column " << path + ".mrk" + ".old"); - Poco::File(path + ".mrk" + ".old").remove(); - - if (Poco::File(part_path + "checksums.txt" + ".old").exists()) - { - LOG_TRACE(log, "Removing old checksums " << part_path + "checksums.txt" + ".old"); - Poco::File(part_path + "checksums.txt" + ".old").remove(); - } - } - } - - context.resetCaches(); - - { - Poco::ScopedLock lock(data_parts_mutex); - Poco::ScopedLock lock_all(all_data_parts_mutex); - alterColumns(params, columns, context); + tryLogCurrentException(__PRETTY_FUNCTION__); } } diff --git a/dbms/src/Storages/StorageDistributed.cpp b/dbms/src/Storages/StorageDistributed.cpp index 6493eea55ea..eea0f17ff37 100644 --- a/dbms/src/Storages/StorageDistributed.cpp +++ b/dbms/src/Storages/StorageDistributed.cpp @@ -9,6 +9,7 @@ #include #include +#include #include #include @@ -132,9 +133,11 @@ BlockInputStreams StorageDistributed::read( return res; } -void StorageDistributed::alter(const ASTAlterQuery::Parameters ¶ms) +void StorageDistributed::alter(const AlterCommands & params, const String & database_name, const String & table_name, Context & context) { - alterColumns(params, columns, context); + auto lock = lockStructureForAlter(); + params.apply(*columns); + InterpreterAlterQuery::updateMetadata(database_name, table_name, *columns, context); } } diff --git a/dbms/src/Storages/StorageFactory.cpp b/dbms/src/Storages/StorageFactory.cpp index 50e768c924b..7d8a1507e7b 100644 --- a/dbms/src/Storages/StorageFactory.cpp +++ b/dbms/src/Storages/StorageFactory.cpp @@ -75,8 +75,6 @@ StoragePtr StorageFactory::get( NamesAndTypesListPtr columns, bool attach) const { - columns = DataTypeNested::expandNestedColumns(*columns); - if (name == "Log") { return StorageLog::create(data_path, table_name, columns, context.getSettings().max_compress_block_size); diff --git a/dbms/src/Storages/StorageMerge.cpp b/dbms/src/Storages/StorageMerge.cpp index 772b573630d..edce707e500 100644 --- a/dbms/src/Storages/StorageMerge.cpp +++ b/dbms/src/Storages/StorageMerge.cpp @@ -1,6 +1,7 @@ #include #include #include +#include namespace DB { @@ -166,9 +167,12 @@ void StorageMerge::getSelectedTables(StorageVector & selected_tables) const } -void StorageMerge::alter(const ASTAlterQuery::Parameters & params) +void StorageMerge::alter(const AlterCommands & params, const String & database_name, const String & table_name, Context & context) { - alterColumns(params, columns, context); -} + auto lock = lockStructureForAlter(); + params.apply(*columns); + InterpreterAlterQuery::updateMetadata(database_name, table_name, *columns, context); +} + } diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index d5eff8241c0..c84c0391535 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -2,6 +2,7 @@ #include #include #include +#include namespace DB { @@ -103,19 +104,33 @@ void StorageMergeTree::rename(const String & new_path_to_db, const String & new_ /// TODO: Можно обновить названия логгеров у this, data, reader, writer, merger. } -void StorageMergeTree::alter(const ASTAlterQuery::Parameters & params) +void StorageMergeTree::alter(const AlterCommands & params, const String & database_name, const String & table_name, Context & context) { - data.alter(params); -} + /// NOTE: Здесь так же как в ReplicatedMergeTree можно сделать ALTER, не блокирующий запись данных надолго. -void StorageMergeTree::prepareAlterModify(const ASTAlterQuery::Parameters & params) -{ - data.prepareAlterModify(params); -} + auto table_soft_lock = lockDataForAlter(); -void StorageMergeTree::commitAlterModify(const ASTAlterQuery::Parameters & params) -{ - data.commitAlterModify(params); + data.checkAlter(params); + + NamesAndTypesList new_columns = data.getColumnsList(); + params.apply(new_columns); + + MergeTreeData::DataParts parts = data.getDataParts(); + std::vector transactions; + for (MergeTreeData::DataPartPtr part : parts) + { + transactions.push_back(data.alterDataPart(part, new_columns)); + } + + auto table_hard_lock = lockStructureForAlter(); + + InterpreterAlterQuery::updateMetadata(database_name, table_name, new_columns, context); + data.setColumnsList(new_columns); + + for (auto & transaction : transactions) + { + transaction->commit(); + } } bool StorageMergeTree::merge(bool aggressive, BackgroundProcessingPool::Context * pool_context) diff --git a/libs/libzkutil/include/zkutil/LeaderElection.h b/libs/libzkutil/include/zkutil/LeaderElection.h index 90df7434d93..c5bee69fdf8 100644 --- a/libs/libzkutil/include/zkutil/LeaderElection.h +++ b/libs/libzkutil/include/zkutil/LeaderElection.h @@ -107,7 +107,7 @@ private: } if (zookeeper.exists(path + "/" + *(it - 1), nullptr, event)) - event->tryWait(60 * 1000); + event->wait(); success = true; }