This commit is contained in:
Michael Kolupaev 2014-07-14 16:02:17 +04:00
commit 516789157b
20 changed files with 538 additions and 665 deletions

View File

@ -252,6 +252,7 @@ namespace ErrorCodes
TABLE_IS_READ_ONLY, TABLE_IS_READ_ONLY,
NOT_ENOUGH_SPACE, NOT_ENOUGH_SPACE,
UNEXPECTED_ZOOKEEPER_ERROR, UNEXPECTED_ZOOKEEPER_ERROR,
INVALID_NESTED_NAME,
POCO_EXCEPTION = 1000, POCO_EXCEPTION = 1000,
STD_EXCEPTION, STD_EXCEPTION,

View File

@ -1,11 +1,14 @@
#pragma once #pragma once
#include <DB/Storages/IStorage.h> #include <DB/Storages/IStorage.h>
#include <DB/Storages/AlterCommands.h>
#include <DB/Interpreters/Context.h> #include <DB/Interpreters/Context.h>
#include <DB/Parsers/ASTIdentifier.h>
namespace DB namespace DB
{ {
class ASTIdentifier;
/** Позволяет добавить или удалить столбец в таблице. /** Позволяет добавить или удалить столбец в таблице.
*/ */
@ -16,10 +19,12 @@ public:
void execute(); void execute();
private: /** Изменяет список столбцов в метаданных таблицы на диске. Нужно вызывать под TableStructureLock соответствующей таблицы.
void dropColumnFromAST(const ASTIdentifier & drop_column, ASTs & columns); */
void addColumnToAST(StoragePtr table, ASTs & columns, const ASTPtr & add_column_ptr, const ASTPtr & after_column_ptr); static void updateMetadata(const String & database, const String & table, const NamesAndTypesList & columns, Context & context);
static AlterCommands parseAlter(const ASTAlterQuery::ParameterContainer & params, const DataTypeFactory & data_type_factory);
private:
ASTPtr query_ptr; ASTPtr query_ptr;
Context context; Context context;

View File

@ -21,6 +21,11 @@ public:
* (для случая выполнения запроса из существующего файла с метаданными). * (для случая выполнения запроса из существующего файла с метаданными).
*/ */
StoragePtr execute(bool assume_metadata_exists = false); StoragePtr execute(bool assume_metadata_exists = false);
/** AST в список столбцов с типами и обратно. Столбцы типа Nested развернуты в список настоящих столбцов.
*/
static NamesAndTypesList parseColumns(ASTPtr expression_list, const DataTypeFactory & data_type_factory);
static ASTPtr formatColumns(const NamesAndTypesList & columns);
private: private:
ASTPtr query_ptr; ASTPtr query_ptr;

View File

@ -0,0 +1,120 @@
#pragma once
#include <DB/Core/NamesAndTypes.h>
#include <DB/DataTypes/DataTypeNested.h>
#include <DB/DataTypes/DataTypeArray.h>
namespace DB
{
/// Операция из запроса ALTER. Добавление столбцов типа Nested не развернуто в добавление отдельных столбцов.
struct AlterCommand
{
enum Type
{
ADD,
DROP,
MODIFY
};
Type type;
String column_name;
/// Для ADD и MODIFY - новый тип столбца.
DataTypePtr data_type;
/// Для ADD - после какого столбца добавить новый. Если пустая строка, добавить в конец. Добавить в начало сейчас нельзя.
String after_column;
/// одинаковыми считаются имена, если они совпадают целиком или name_without_dot совпадает с частью имени до точки
static bool namesEqual(const String & name_without_dot, const DB::NameAndTypePair & name_type)
{
String name_with_dot = name_without_dot + ".";
return (name_with_dot == name_type.name.substr(0, name_without_dot.length() + 1) || name_without_dot == name_type.name);
}
void apply(NamesAndTypesList & columns) const
{
if (type == ADD)
{
if (std::count_if(columns.begin(), columns.end(), std::bind(namesEqual, column_name, std::placeholders::_1)))
throw Exception("Cannot add column " + column_name + ": column with this name already exisits.",
DB::ErrorCodes::ILLEGAL_COLUMN);
if (DataTypeNested::extractNestedTableName(column_name) != column_name &&
!typeid_cast<const DataTypeArray *>(&*data_type))
throw Exception("Can't add nested column " + column_name + " of non-array type " + data_type->getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
NamesAndTypesList::iterator insert_it = columns.end();
if (!after_column.empty())
{
/// Пытаемся найти первую с конца колонку с именем column_name или с именем, начинающимся с column_name и ".".
/// Например "fruits.bananas"
/// одинаковыми считаются имена, если они совпадают целиком или name_without_dot совпадает с частью имени до точки
NamesAndTypesList::reverse_iterator reverse_insert_it = std::find_if(columns.rbegin(), columns.rend(),
std::bind(namesEqual, after_column, std::placeholders::_1));
if (reverse_insert_it == columns.rend())
throw Exception("Wrong column name. Cannot find column " + column_name + " to insert after",
DB::ErrorCodes::ILLEGAL_COLUMN);
else
{
/// base возвращает итератор, уже смещенный на один элемент вправо
insert_it = reverse_insert_it.base();
}
}
columns.insert(insert_it, NameAndTypePair(column_name, data_type));
/// Медленно, так как каждый раз копируется список
columns = *DataTypeNested::expandNestedColumns(columns);
}
else if (type == DROP)
{
bool is_first = true;
NamesAndTypesList::iterator column_it;
do
{
column_it = std::find_if(columns.begin(), columns.end(), std::bind(namesEqual, column_name, std::placeholders::_1));
if (column_it == columns.end())
{
if (is_first)
throw Exception("Wrong column name. Cannot find column " + column_name + " to drop",
DB::ErrorCodes::ILLEGAL_COLUMN);
}
else
columns.erase(column_it);
is_first = false;
}
while (column_it != columns.end());
}
else if (type == MODIFY)
{
NamesAndTypesList::iterator column_it = std::find_if(columns.begin(), columns.end(),
std::bind(namesEqual, column_name, std::placeholders::_1) );
if (column_it == columns.end())
throw Exception("Wrong column name. Cannot find column " + column_name + " to modify.",
DB::ErrorCodes::ILLEGAL_COLUMN);
column_it->type = data_type;
}
else
throw Exception("Wrong parameter type in ALTER query", ErrorCodes::LOGICAL_ERROR);
}
};
class AlterCommands : public std::vector<AlterCommand>
{
public:
void apply(NamesAndTypesList & columns) const
{
NamesAndTypesList new_columns = columns;
for (const AlterCommand & command : *this)
command.apply(new_columns);
columns = new_columns;
}
};
}

View File

@ -11,6 +11,7 @@
#include <DB/Parsers/ASTAlterQuery.h> #include <DB/Parsers/ASTAlterQuery.h>
#include <DB/Interpreters/Settings.h> #include <DB/Interpreters/Settings.h>
#include <DB/Storages/ITableDeclaration.h> #include <DB/Storages/ITableDeclaration.h>
#include <DB/Storages/AlterCommands.h>
#include <Poco/File.h> #include <Poco/File.h>
#include <Poco/RWLock.h> #include <Poco/RWLock.h>
@ -196,30 +197,14 @@ public:
} }
/** ALTER таблицы в виде изменения столбцов, не затрагивающий изменение Storage или его параметров. /** ALTER таблицы в виде изменения столбцов, не затрагивающий изменение Storage или его параметров.
* (ALTER, затрагивающий изменение движка, делается внешним кодом, путём копирования данных.) * Этот метод должен полностью выполнить запрос ALTER, самостоятельно заботясь о блокировках.
* Вызывается при заблокированной на запись структуре таблицы. * Для обновления метаданных таблицы на диске этот метод должен вызвать InterpreterAlterQuery::updateMetadata.
* Для ALTER MODIFY можно использовать другие методы (см. ниже).
*/ */
virtual void alter(const ASTAlterQuery::Parameters & params) virtual void alter(const AlterCommands & params, const String & database_name, const String & table_name, Context & context)
{ {
throw Exception("Method alter is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED); throw Exception("Method alter is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
} }
/** ALTER MODIFY (изменение типа столбца) выполняется в два вызова:
* Сначала вызывается prepareAlterModify при заблокированной записи данных, но незаблокированной структуре таблицы.
* В нем можно выполнить долгую работу по записи сконвертированных данных, оставляя доступными существующие данные.
* Потом вызывается commitAlterModify при заблокированной структуре таблицы.
* В нем нужно закончить изменение типа столбца.
* Для движков с тривиальным ALTER MODIFY можно оставить реализацию по умолчанию, вызывающую alter.
*/
virtual void prepareAlterModify(const ASTAlterQuery::Parameters & params) {}
virtual void commitAlterModify(const ASTAlterQuery::Parameters & params)
{
alter(params);
}
/** Выполнить какую-либо фоновую работу. Например, объединение кусков в таблице типа MergeTree. /** Выполнить какую-либо фоновую работу. Например, объединение кусков в таблице типа MergeTree.
* Возвращает - была ли выполнена какая-либо работа. * Возвращает - была ли выполнена какая-либо работа.
*/ */

View File

@ -4,7 +4,6 @@
#include <DB/Core/NamesAndTypes.h> #include <DB/Core/NamesAndTypes.h>
#include <DB/Core/Exception.h> #include <DB/Core/Exception.h>
#include <DB/Core/Block.h> #include <DB/Core/Block.h>
#include <DB/Parsers/ASTAlterQuery.h>
namespace DB namespace DB
{ {
@ -62,9 +61,6 @@ public:
*/ */
void check(const Block & block, bool need_all = false) const; void check(const Block & block, bool need_all = false) const;
/// реализация alter, модифицирующая список столбцов.
static void alterColumns(const ASTAlterQuery::Parameters & params, NamesAndTypesListPtr & columns, const Context & context);
virtual ~ITableDeclaration() {} virtual ~ITableDeclaration() {}
}; };

View File

@ -338,6 +338,13 @@ public:
} }
} }
} }
bool hasColumnFiles(const String & column) const
{
String escaped_column = escapeForFileName(column);
return Poco::File(storage.full_path + name + "/" + escaped_column + ".bin").exists() &&
Poco::File(storage.full_path + name + "/" + escaped_column + ".mrk").exists();
}
}; };
typedef std::shared_ptr<DataPart> MutableDataPartPtr; typedef std::shared_ptr<DataPart> MutableDataPartPtr;
@ -385,6 +392,28 @@ public:
DataPartsVector added_parts; DataPartsVector added_parts;
}; };
/// Объект, помнящий какие временные файлы были созданы в директории с куском в ходе изменения (ALTER) его столбцов.
class AlterDataPartTransaction : private boost::noncopyable
{
public:
/// Переименовывает временные файлы, завершая ALTER куска.
void commit();
/// Если не был вызван commit(), удаляет временные файлы, отменяя ALTER куска.
~AlterDataPartTransaction();
private:
friend class MergeTreeData;
AlterDataPartTransaction(DataPartPtr data_part_) : data_part(data_part_) {}
DataPartPtr data_part;
/// Если значение - пустая строка, файл нужно удалить, и он не временный.
NameToNameMap rename_map;
};
typedef std::unique_ptr<AlterDataPartTransaction> AlterDataPartTransactionPtr;
/// Режим работы. См. выше. /// Режим работы. См. выше.
enum Mode enum Mode
{ {
@ -489,13 +518,23 @@ public:
/** Перемещает всю директорию с данными. /** Перемещает всю директорию с данными.
* Сбрасывает кеши разжатых блоков и засечек. * Сбрасывает кеши разжатых блоков и засечек.
* Нужно вызывать под залоченным lockStructure(). * Нужно вызывать под залоченным lockStructureForAlter().
*/ */
void setPath(const String & full_path); void setPath(const String & full_path);
void alter(const ASTAlterQuery::Parameters & params); /* Проверить, что такой ALTER можно выполнить:
void prepareAlterModify(const ASTAlterQuery::Parameters & params); * - Есть все нужные столбцы.
void commitAlterModify(const ASTAlterQuery::Parameters & params); * - Все преобразования типов допустимы.
* - Не затронуты столбцы ключа, знака и семплирования.
* Бросает исключение, если что-то не так.
*/
void checkAlter(const AlterCommands & params);
/// Выполняет ALTER куска данных и записывает результат во временные файлы.
AlterDataPartTransactionPtr alterDataPart(DataPartPtr part, const NamesAndTypesList & new_columns);
/// Нужно вызывать под залоченным lockStructureForAlter().
void setColumnsList(const NamesAndTypesList & new_columns) { columns = new NamesAndTypesList(new_columns); }
ExpressionActionsPtr getPrimaryExpression() const { return primary_expr; } ExpressionActionsPtr getPrimaryExpression() const { return primary_expr; }
SortDescription getSortDescription() const { return sort_descr; } SortDescription getSortDescription() const { return sort_descr; }
@ -540,12 +579,17 @@ private:
/// Загрузить множество кусков с данными с диска. Вызывается один раз - при создании объекта. /// Загрузить множество кусков с данными с диска. Вызывается один раз - при создании объекта.
void loadDataParts(); void loadDataParts();
void removeColumnFiles(String column_name, bool remove_array_size_files);
/// Определить, не битые ли данные в директории. Проверяет индекс и засечеки, но не сами данные. /// Определить, не битые ли данные в директории. Проверяет индекс и засечеки, но не сами данные.
bool isBrokenPart(const String & path); bool isBrokenPart(const String & path);
void createConvertExpression(const String & in_column_name, const String & out_type, ExpressionActionsPtr & out_expression, String & out_column); /** Выражение, преобразующее типы столбцов.
* Если преобразований типов нет, out_expression=nullptr.
* out_rename_map отображает файлы-столбцы на выходе выражения в новые файлы таблицы.
* Файлы, которые нужно удалить, в out_rename_map отображаются в пустую строку.
* Если !part, просто проверяет, что все нужные преобразования типов допустимы.
*/
void createConvertExpression(DataPartPtr part, const NamesAndTypesList & old_columns, const NamesAndTypesList & new_columns,
ExpressionActionsPtr & out_expression, NameToNameMap & out_rename_map);
}; };
} }

View File

@ -59,7 +59,7 @@ public:
void rename(const String & new_path_to_db, const String & new_name) { name = new_name; } void rename(const String & new_path_to_db, const String & new_name) { name = new_name; }
/// в подтаблицах добавлять и удалять столбы нужно вручную /// в подтаблицах добавлять и удалять столбы нужно вручную
/// структура подтаблиц не проверяется /// структура подтаблиц не проверяется
void alter(const ASTAlterQuery::Parameters &params); void alter(const AlterCommands & params, const String & database_name, const String & table_name, Context & context);
private: private:
StorageDistributed( StorageDistributed(

View File

@ -49,7 +49,7 @@ public:
/// в подтаблицах добавлять и удалять столбы нужно вручную /// в подтаблицах добавлять и удалять столбы нужно вручную
/// структура подтаблиц не проверяется /// структура подтаблиц не проверяется
void alter(const ASTAlterQuery::Parameters & params); void alter(const AlterCommands & params, const String & database_name, const String & table_name, Context & context);
Block getBlockWithVirtualColumns(const std::vector<StoragePtr> & selected_tables) const; Block getBlockWithVirtualColumns(const std::vector<StoragePtr> & selected_tables) const;
private: private:

View File

@ -71,9 +71,7 @@ public:
void rename(const String & new_path_to_db, const String & new_name); void rename(const String & new_path_to_db, const String & new_name);
void alter(const ASTAlterQuery::Parameters & params); void alter(const AlterCommands & params, const String & database_name, const String & table_name, Context & context);
void prepareAlterModify(const ASTAlterQuery::Parameters & params);
void commitAlterModify(const ASTAlterQuery::Parameters & params);
bool supportsIndexForIn() const override { return true; } bool supportsIndexForIn() const override { return true; }

View File

@ -33,15 +33,21 @@ std::string DataTypeNested::concatenateNestedName(const std::string & nested_tab
std::string DataTypeNested::extractNestedTableName(const std::string & nested_name) std::string DataTypeNested::extractNestedTableName(const std::string & nested_name)
{ {
const char * pos = strchr(nested_name.data(), '.'); const char * first_pos = strchr(nested_name.data(), '.');
return pos == nullptr ? nested_name : nested_name.substr(0, pos - nested_name.data()); const char * last_pos = strrchr(nested_name.data(), '.');
if (first_pos != last_pos)
throw Exception("Invalid nested column name: " + nested_name, ErrorCodes::INVALID_NESTED_NAME);
return first_pos == nullptr ? nested_name : nested_name.substr(0, first_pos - nested_name.data());
} }
std::string DataTypeNested::extractNestedColumnName(const std::string & nested_name) std::string DataTypeNested::extractNestedColumnName(const std::string & nested_name)
{ {
const char * pos = strrchr(nested_name.data(), '.'); const char * first_pos = strchr(nested_name.data(), '.');
return pos == nullptr ? nested_name : nested_name.substr(pos - nested_name.data() + 1); const char * last_pos = strrchr(nested_name.data(), '.');
if (first_pos != last_pos)
throw Exception("Invalid nested column name: " + nested_name, ErrorCodes::INVALID_NESTED_NAME);
return last_pos == nullptr ? nested_name : nested_name.substr(last_pos - nested_name.data() + 1);
} }

View File

@ -1,4 +1,5 @@
#include <DB/Interpreters/InterpreterAlterQuery.h> #include <DB/Interpreters/InterpreterAlterQuery.h>
#include <DB/Interpreters/InterpreterCreateQuery.h>
#include <DB/Parsers/ASTAlterQuery.h> #include <DB/Parsers/ASTAlterQuery.h>
#include <DB/Parsers/ASTCreateQuery.h> #include <DB/Parsers/ASTCreateQuery.h>
#include <DB/Parsers/ASTExpressionList.h> #include <DB/Parsers/ASTExpressionList.h>
@ -27,297 +28,108 @@ InterpreterAlterQuery::InterpreterAlterQuery(ASTPtr query_ptr_, Context & contex
{ {
} }
static bool namesEqual(const String &name, const ASTPtr & name_type_)
{
const ASTNameTypePair & name_type = typeid_cast<const ASTNameTypePair &>(*name_type_);
return name_type.name == name;
}
/// одинаковыми считаются имена, если они совпадают целиком или name_without_dot совпадает с частью имени до точки
static bool namesEqualIgnoreAfterDot(const String & name_without_dot, const ASTPtr & name_type_)
{
const ASTNameTypePair & name_type = typeid_cast<const ASTNameTypePair &>(*name_type_);
String name_with_dot = name_without_dot + ".";
return (name_without_dot == name_type.name || name_with_dot == name_type.name.substr(0, name_with_dot.length()));
}
void InterpreterAlterQuery::dropColumnFromAST(const ASTIdentifier & drop_column, ASTs & columns)
{
Exception e("Wrong column name. Cannot find column " + drop_column.name + " to drop", DB::ErrorCodes::ILLEGAL_COLUMN);
ASTs::iterator drop_it;
size_t dot_pos = drop_column.name.find('.');
/// случай удаления nested столбца
if (dot_pos != std::string::npos)
{
/// в Distributed таблицах столбцы имеют название "nested.column"
drop_it = std::find_if(columns.begin(), columns.end(), boost::bind(namesEqual, drop_column.name, _1));
if (drop_it != columns.end())
columns.erase(drop_it);
else
{
try
{
/// в MergeTree таблицах есть ASTFunction "nested"
/// в аргументах которой записаны столбцы
ASTs::iterator nested_it;
std::string nested_base_name = drop_column.name.substr(0, dot_pos);
nested_it = std::find_if(columns.begin(), columns.end(), boost::bind(namesEqual, nested_base_name, _1));
if (nested_it == columns.end())
throw e;
if ((**nested_it).children.size() != 1)
throw e;
ASTFunction & f = typeid_cast<ASTFunction &>(*(**nested_it).children.back());
if (f.name != "Nested")
throw e;
ASTs & nested_columns = typeid_cast<ASTExpressionList &>(*f.arguments).children;
drop_it = std::find_if(nested_columns.begin(), nested_columns.end(), boost::bind(namesEqual, drop_column.name.substr(dot_pos + 1), _1));
if (drop_it == nested_columns.end())
throw e;
else
nested_columns.erase(drop_it);
if (nested_columns.empty())
columns.erase(nested_it);
}
catch (std::bad_cast & bad_cast_err)
{
throw e;
}
}
}
else
{
drop_it = std::find_if(columns.begin(), columns.end(), boost::bind(namesEqual, drop_column.name, _1));
if (drop_it == columns.end())
throw e;
else
columns.erase(drop_it);
}
}
void addColumnToAST1(ASTs & columns, const ASTPtr & add_column_ptr, const ASTPtr & after_column_ptr)
{
const ASTNameTypePair & add_column = typeid_cast<const ASTNameTypePair &>(*add_column_ptr);
const ASTIdentifier * col_after = after_column_ptr ? &typeid_cast<const ASTIdentifier &>(*after_column_ptr) : nullptr;
if (std::find_if(columns.begin(), columns.end(), boost::bind(namesEqual, add_column.name, _1)) != columns.end())
{
throw Exception("Fail to add column " + add_column.name + ". Column already exists");
}
ASTs::iterator insert_it = columns.end();
if (col_after)
{
/// если есть точка, то нас просят вставить после nested столбца
auto find_functor = col_after->name.find('.') != std::string ::npos ? boost::bind(namesEqual, col_after->name, _1) : boost::bind(namesEqualIgnoreAfterDot, col_after->name, _1);
insert_it = std::find_if(columns.begin(), columns.end(), find_functor);
if (insert_it == columns.end())
throw Exception("Wrong column name. Cannot find column " + col_after->name + " to insert after");
++insert_it;
}
columns.insert(insert_it, add_column_ptr);
}
void InterpreterAlterQuery::addColumnToAST(StoragePtr table, ASTs & columns, const ASTPtr & add_column_ptr, const ASTPtr & after_column_ptr)
{
/// хотим исключение если приведение зафейлится
const ASTNameTypePair & add_column = typeid_cast<const ASTNameTypePair &>(*add_column_ptr);
const ASTIdentifier * after_col = after_column_ptr ? &typeid_cast<const ASTIdentifier &>(*after_column_ptr) : nullptr;
size_t dot_pos = add_column.name.find('.');
bool insert_nested_column = dot_pos != std::string::npos;
const DataTypeFactory & data_type_factory = context.getDataTypeFactory();
StringRange type_range = add_column.type->range;
String type(type_range.first, type_range.second - type_range.first);
DataTypePtr datatype = data_type_factory.get(type);
if (insert_nested_column)
{
if (!typeid_cast<DataTypeArray *>(datatype.get()))
{
throw Exception("Cannot add column " + add_column.name + ". Because it is not an array. Only arrays could be nested and consist '.' in their names");
}
}
if ((typeid_cast<StorageMergeTree *>(table.get()) || typeid_cast<StorageReplicatedMergeTree *>(table.get())) && insert_nested_column)
{
/// специальный случай для вставки nested столбцов в MergeTree
/// в MergeTree таблицах есть ASTFunction "Nested" в аргументах которой записаны столбцы
std::string nested_base_name = add_column.name.substr(0, dot_pos);
ASTs::iterator nested_it = std::find_if(columns.begin(), columns.end(), boost::bind(namesEqual, nested_base_name, _1));
if (nested_it != columns.end())
{
/// нужно добавить колонку в уже существующий nested столбец
ASTFunction * nested_func = typeid_cast<ASTFunction *>((*nested_it)->children.back().get());
if (!(**nested_it).children.size() || !nested_func || nested_func->name != "Nested")
throw Exception("Column with name " + nested_base_name + " already exists. But it is not nested.");
ASTs & nested_columns = typeid_cast<ASTExpressionList &>(*nested_func->arguments).children;
ASTPtr new_nested_column_ptr = add_column_ptr->clone();
ASTNameTypePair& new_nested_column = typeid_cast<ASTNameTypePair &>(*new_nested_column_ptr);
new_nested_column.name = add_column.name.substr(dot_pos + 1);
ASTPtr new_after_column = after_column_ptr ? after_column_ptr->clone() : nullptr;
if (new_after_column)
{
size_t after_dot_pos = after_col->name.find('.');
if (after_dot_pos == std::string::npos)
throw Exception("Nested column " + add_column.name + " should be inserted only after nested column");
if (add_column.name.substr(0, dot_pos) != after_col->name.substr(0, after_dot_pos))
throw Exception("Nested column " + add_column.name + "should be inserted after column with the same name before the '.'");
typeid_cast<ASTIdentifier &>(*new_after_column).name = after_col->name.substr(after_dot_pos + 1);
}
{
/// удаляем массив из типа, т.е. Array(String) -> String
ParserIdentifierWithOptionalParameters type_parser;
Expected expected;
const char * begin = new_nested_column.type->range.first + strlen("Array(");
const char * end = new_nested_column.type->range.second - static_cast<int>(strlen(")"));
if (!type_parser.parse(begin, end, new_nested_column.type, expected))
throw Exception("Fail to convert type like Array(SomeType) -> SomeType for type " + type);
}
addColumnToAST1(nested_columns, new_nested_column_ptr, new_after_column);
}
else
{
throw Exception("If you want to create new Nested column use syntax like. ALTER TABLE table ADD COLUMN MyColumn Nested(Name1 Type1, Name2 Type2...) [AFTER BeforeColumn]");
}
}
else
{
/// в Distributed и Merge таблицах столбцы имеют название "nested.column"
addColumnToAST1(columns, add_column_ptr, after_column_ptr);
}
}
void InterpreterAlterQuery::execute() void InterpreterAlterQuery::execute()
{ {
ASTAlterQuery & alter = typeid_cast<ASTAlterQuery &>(*query_ptr); ASTAlterQuery & alter = typeid_cast<ASTAlterQuery &>(*query_ptr);
String & table_name = alter.table; String & table_name = alter.table;
String database_name = alter.database.empty() ? context.getCurrentDatabase() : alter.database; String database_name = alter.database.empty() ? context.getCurrentDatabase() : alter.database;
AlterCommands commands = parseAlter(alter.parameters, context.getDataTypeFactory());
StoragePtr table = context.getTable(database_name, table_name); StoragePtr table = context.getTable(database_name, table_name);
auto table_soft_lock = table->lockDataForAlter(); table->alter(commands, database_name, table_name, context);
}
const DataTypeFactory & data_type_factory = context.getDataTypeFactory(); AlterCommands InterpreterAlterQuery::parseAlter(
const ASTAlterQuery::ParameterContainer & params_container, const DataTypeFactory & data_type_factory)
{
AlterCommands res;
for (const auto & params : params_container)
{
res.push_back(AlterCommand());
AlterCommand & command = res.back();
if (params.type == ASTAlterQuery::ADD)
{
command.type = AlterCommand::ADD;
const ASTNameTypePair & ast_name_type = typeid_cast<const ASTNameTypePair &>(*params.name_type);
StringRange type_range = ast_name_type.type->range;
String type_string = String(type_range.first, type_range.second - type_range.first);
command.column_name = ast_name_type.name;
command.data_type = data_type_factory.get(type_string);
if (params.column)
command.after_column = typeid_cast<const ASTIdentifier &>(*params.column).name;
}
else if (params.type == ASTAlterQuery::DROP)
{
command.type = AlterCommand::DROP;
command.column_name = typeid_cast<const ASTIdentifier &>(*(params.column)).name;
}
else if (params.type == ASTAlterQuery::MODIFY)
{
command.type = AlterCommand::MODIFY;
const ASTNameTypePair & ast_name_type = typeid_cast<const ASTNameTypePair &>(*params.name_type);
StringRange type_range = ast_name_type.type->range;
String type_string = String(type_range.first, type_range.second - type_range.first);
command.column_name = ast_name_type.name;
command.data_type = data_type_factory.get(type_string);
}
else
throw Exception("Wrong parameter type in ALTER query", ErrorCodes::LOGICAL_ERROR);
}
return res;
}
void InterpreterAlterQuery::updateMetadata(
const String& database_name, const String& table_name, const NamesAndTypesList& columns, Context& context)
{
String path = context.getPath(); String path = context.getPath();
String database_name_escaped = escapeForFileName(database_name); String database_name_escaped = escapeForFileName(database_name);
String table_name_escaped = escapeForFileName(table_name); String table_name_escaped = escapeForFileName(table_name);
String metadata_path = path + "metadata/" + database_name_escaped + "/" + table_name_escaped + ".sql"; String metadata_path = path + "metadata/" + database_name_escaped + "/" + table_name_escaped + ".sql";
String metadata_temp_path = metadata_path + ".tmp";
ASTPtr attach_ptr = context.getCreateQuery(database_name, table_name); StringPtr query = new String();
ASTCreateQuery & attach = typeid_cast<ASTCreateQuery &>(*attach_ptr);
attach.attach = true;
ASTs & columns = typeid_cast<ASTExpressionList &>(*attach.columns).children;
/// Различные проверки, на возможность выполнения запроса
ASTs columns_copy;
for (const auto & ast : columns)
columns_copy.push_back(ast->clone());
IdentifierNameSet identifier_names;
attach.storage->collectIdentifierNames(identifier_names);
for (ASTAlterQuery::ParameterContainer::const_iterator alter_it = alter.parameters.begin();
alter_it != alter.parameters.end(); ++alter_it)
{ {
const ASTAlterQuery::Parameters & params = *alter_it; ReadBufferFromFile in(metadata_path);
WriteBufferFromString out(*query);
if (params.type == ASTAlterQuery::ADD) copyData(in, out);
{
addColumnToAST(table, columns_copy, params.name_type, params.column);
}
else if (params.type == ASTAlterQuery::DROP)
{
const ASTIdentifier & drop_column = typeid_cast<const ASTIdentifier &>(*params.column);
/// Проверяем, что поле не является ключевым
if (identifier_names.find(drop_column.name) != identifier_names.end())
throw Exception("Cannot drop key column", DB::ErrorCodes::ILLEGAL_COLUMN);
dropColumnFromAST(drop_column, columns_copy);
}
else if (params.type == ASTAlterQuery::MODIFY)
{
const ASTNameTypePair & name_type = typeid_cast<const ASTNameTypePair &>(*params.name_type);
StringRange type_range = name_type.type->range;
/// проверяем корректность типа. В случае некоректного типа будет исключение
String type(type_range.first, type_range.second - type_range.first);
data_type_factory.get(type);
/// проверяем, что колонка существует
auto modified_column = std::find_if(columns_copy.begin(), columns_copy.end(), boost::bind(namesEqual, name_type.name, _1));
if ( modified_column == columns_copy.end())
throw Exception("Wrong column name. Column " + name_type.name + " not exists", DB::ErrorCodes::ILLEGAL_COLUMN);
/// Проверяем, что поле не является ключевым
if (identifier_names.find(name_type.name) != identifier_names.end())
throw Exception("Modification of primary column not supported", DB::ErrorCodes::ILLEGAL_COLUMN);
/// к сожалению, проверить на возможно ли это приведение типов можно только во время выполнения
}
} }
/// Пока разрешим читать из таблицы. Запретим при первой попытке изменить структуру таблицы. const char * begin = query->data();
/// Это позволит сделать большую часть первого MODIFY, не останавливая чтение из таблицы. const char * end = begin + query->size();
IStorage::TableStructureWriteLockPtr table_hard_lock; const char * pos = begin;
ParserCreateQuery parser;
ASTPtr ast;
Expected expected = "";
bool parse_res = parser.parse(pos, end, ast, expected);
/// Распарсенный запрос должен заканчиваться на конец входных данных или на точку с запятой.
if (!parse_res || (pos != end && *pos != ';'))
throw Exception(getSyntaxErrorMessage(parse_res, begin, end, pos, expected, "in file " + metadata_path),
DB::ErrorCodes::SYNTAX_ERROR);
ast->query_string = query;
ASTCreateQuery & attach = typeid_cast<ASTCreateQuery &>(*ast);
ASTPtr new_columns = InterpreterCreateQuery::formatColumns(columns);
*std::find(attach.children.begin(), attach.children.end(), attach.columns) = new_columns;
attach.columns = new_columns;
/// todo cycle over sub tables and tables
/// Применяем изменения
for (ASTAlterQuery::ParameterContainer::const_iterator alter_it = alter.parameters.begin();
alter_it != alter.parameters.end(); ++alter_it)
{ {
const ASTAlterQuery::Parameters & params = *alter_it; Poco::FileOutputStream ostr(metadata_temp_path);
if (params.type == ASTAlterQuery::MODIFY)
{
table->prepareAlterModify(params);
if (!table_hard_lock)
table_hard_lock = table->lockStructureForAlter();
table->commitAlterModify(params);
}
else
{
if (!table_hard_lock)
table_hard_lock = table->lockStructureForAlter();
table->alter(params);
}
if (params.type == ASTAlterQuery::ADD)
{
addColumnToAST(table, columns, params.name_type, params.column);
}
else if (params.type == ASTAlterQuery::DROP)
{
const ASTIdentifier & drop_column = typeid_cast<const ASTIdentifier &>(*params.column);
dropColumnFromAST(drop_column, columns);
}
else if (params.type == ASTAlterQuery::MODIFY)
{
const ASTNameTypePair & name_type = typeid_cast<const ASTNameTypePair &>(*params.name_type);
ASTs::iterator modify_it = std::find_if(columns.begin(), columns.end(), boost::bind(namesEqual, name_type.name, _1));
ASTNameTypePair & modified_column = typeid_cast<ASTNameTypePair &>(**modify_it);
modified_column.type = name_type.type;
}
/// Перезаписываем файл метадата каждую итерацию
Poco::FileOutputStream ostr(metadata_path);
formatAST(attach, ostr, 0, false); formatAST(attach, ostr, 0, false);
} }
Poco::File(metadata_temp_path).renameTo(metadata_path);
} }

View File

@ -20,6 +20,7 @@
#include <DB/Interpreters/InterpreterSelectQuery.h> #include <DB/Interpreters/InterpreterSelectQuery.h>
#include <DB/Interpreters/InterpreterCreateQuery.h> #include <DB/Interpreters/InterpreterCreateQuery.h>
#include <DB/DataTypes/DataTypeNested.h>
namespace DB namespace DB
@ -114,15 +115,7 @@ StoragePtr InterpreterCreateQuery::execute(bool assume_metadata_exists)
/// Получаем список столбцов /// Получаем список столбцов
if (create.columns) if (create.columns)
{ {
ASTExpressionList & columns_list = typeid_cast<ASTExpressionList &>(*create.columns); columns = new NamesAndTypesList(parseColumns(create.columns, context.getDataTypeFactory()));
for (ASTs::iterator it = columns_list.children.begin(); it != columns_list.children.end(); ++it)
{
ASTNameTypePair & name_and_type_pair = typeid_cast<ASTNameTypePair &>(**it);
StringRange type_range = name_and_type_pair.type->range;
columns->push_back(NameAndTypePair(
name_and_type_pair.name,
context.getDataTypeFactory().get(String(type_range.first, type_range.second - type_range.first))));
}
} }
else if (!create.as_table.empty()) else if (!create.as_table.empty())
columns = new NamesAndTypesList(as_storage->getColumnsList()); columns = new NamesAndTypesList(as_storage->getColumnsList());
@ -135,34 +128,13 @@ StoragePtr InterpreterCreateQuery::execute(bool assume_metadata_exists)
else else
throw Exception("Incorrect CREATE query: required list of column descriptions or AS section or SELECT.", ErrorCodes::INCORRECT_QUERY); throw Exception("Incorrect CREATE query: required list of column descriptions or AS section or SELECT.", ErrorCodes::INCORRECT_QUERY);
/// Дополняем запрос списком столбцов из другой таблицы, если его не было. /// Даже если в запросе был список столбцов, на всякий случай приведем его к стандартному виду (развернем Nested).
if (!create.columns) ASTPtr new_columns = formatColumns(*columns);
{ if (create.columns)
ASTPtr columns_list_ptr = new ASTExpressionList; *std::find(create.children.begin(), create.children.end(), create.columns) = new_columns;
ASTExpressionList & columns_list = typeid_cast<ASTExpressionList &>(*columns_list_ptr); else
create.children.push_back(new_columns);
for (NamesAndTypesList::const_iterator it = columns->begin(); it != columns->end(); ++it) create.columns = new_columns;
{
ASTPtr name_and_type_pair_ptr = new ASTNameTypePair;
ASTNameTypePair & name_and_type_pair = typeid_cast<ASTNameTypePair &>(*name_and_type_pair_ptr);
name_and_type_pair.name = it->name;
StringPtr type_name = new String(it->type->getName());
ParserIdentifierWithOptionalParameters storage_p;
Expected expected = "";
const char * pos = type_name->data();
const char * end = pos + type_name->size();
if (!storage_p.parse(pos, end, name_and_type_pair.type, expected))
throw Exception("Cannot parse data type.", ErrorCodes::SYNTAX_ERROR);
name_and_type_pair.type->query_string = type_name;
columns_list.children.push_back(name_and_type_pair_ptr);
}
create.columns = columns_list_ptr;
create.children.push_back(create.columns);
}
/// Выбор нужного движка таблицы /// Выбор нужного движка таблицы
if (create.storage) if (create.storage)
@ -238,7 +210,6 @@ StoragePtr InterpreterCreateQuery::execute(bool assume_metadata_exists)
if (create.is_temporary) if (create.is_temporary)
{ {
// res->is_dropped = true;
context.getSessionContext().addExternalTable(table_name, res); context.getSessionContext().addExternalTable(table_name, res);
} }
else else
@ -255,4 +226,47 @@ StoragePtr InterpreterCreateQuery::execute(bool assume_metadata_exists)
return res; return res;
} }
NamesAndTypesList InterpreterCreateQuery::parseColumns(ASTPtr expression_list, const DataTypeFactory & data_type_factory)
{
NamesAndTypesList columns;
ASTExpressionList & columns_list = typeid_cast<ASTExpressionList &>(*expression_list);
for (const ASTPtr & ast : columns_list.children)
{
const ASTNameTypePair & name_and_type_pair = typeid_cast<const ASTNameTypePair &>(*ast);
StringRange type_range = name_and_type_pair.type->range;
columns.push_back(NameAndTypePair(
name_and_type_pair.name,
data_type_factory.get(String(type_range.first, type_range.second - type_range.first))));
}
columns = *DataTypeNested::expandNestedColumns(columns);
return columns;
}
ASTPtr InterpreterCreateQuery::formatColumns(const NamesAndTypesList & columns)
{
ASTPtr columns_list_ptr = new ASTExpressionList;
ASTExpressionList & columns_list = typeid_cast<ASTExpressionList &>(*columns_list_ptr);
for (const NameAndTypePair & it : columns)
{
ASTPtr name_and_type_pair_ptr = new ASTNameTypePair;
ASTNameTypePair & name_and_type_pair = typeid_cast<ASTNameTypePair &>(*name_and_type_pair_ptr);
name_and_type_pair.name = it.name;
StringPtr type_name = new String(it.type->getName());
ParserIdentifierWithOptionalParameters storage_p;
Expected expected = "";
const char * pos = type_name->data();
const char * end = pos + type_name->size();
if (!storage_p.parse(pos, end, name_and_type_pair.type, expected))
throw Exception("Cannot parse data type.", ErrorCodes::SYNTAX_ERROR);
name_and_type_pair.type->query_string = type_name;
columns_list.children.push_back(name_and_type_pair_ptr);
}
return columns_list_ptr;
}
} }

View File

@ -176,84 +176,4 @@ void ITableDeclaration::check(const Block & block, bool need_all) const
} }
} }
/// одинаковыми считаются имена, если они совпадают целиком или name_without_dot совпадает с частью имени до точки
static bool namesEqual(const String & name_without_dot, const DB::NameAndTypePair & name_type)
{
String name_with_dot = name_without_dot + ".";
return (name_with_dot == name_type.name.substr(0, name_without_dot.length() + 1) || name_without_dot == name_type.name);
}
void ITableDeclaration::alterColumns(const ASTAlterQuery::Parameters & params, NamesAndTypesListPtr & columns, const Context & context)
{
if (params.type == ASTAlterQuery::ADD)
{
NamesAndTypesList::iterator insert_it = columns->end();
if (params.column)
{
String column_name = typeid_cast<const ASTIdentifier &>(*params.column).name;
/// Пытаемся найти первую с конца колонку с именем column_name или с именем, начинающимся с column_name и ".".
/// Например "fruits.bananas"
NamesAndTypesList::reverse_iterator reverse_insert_it = std::find_if(columns->rbegin(), columns->rend(), boost::bind(namesEqual, column_name, _1) );
if (reverse_insert_it == columns->rend())
throw Exception("Wrong column name. Cannot find column " + column_name + " to insert after", DB::ErrorCodes::ILLEGAL_COLUMN);
else
{
/// base возвращает итератор уже смещенный на один элемент вправо
insert_it = reverse_insert_it.base();
}
}
const ASTNameTypePair & ast_name_type = typeid_cast<const ASTNameTypePair &>(*params.name_type);
StringRange type_range = ast_name_type.type->range;
String type_string = String(type_range.first, type_range.second - type_range.first);
DB::DataTypePtr data_type = context.getDataTypeFactory().get(type_string);
NameAndTypePair pair(ast_name_type.name, data_type );
columns->insert(insert_it, pair);
/// Медленно, так как каждый раз копируется список
columns = DataTypeNested::expandNestedColumns(*columns);
return;
}
else if (params.type == ASTAlterQuery::DROP)
{
String column_name = typeid_cast<const ASTIdentifier &>(*(params.column)).name;
/// Удаляем колонки из листа columns
bool is_first = true;
NamesAndTypesList::iterator column_it;
do
{
column_it = std::find_if(columns->begin(), columns->end(), boost::bind(namesEqual, column_name, _1));
if (column_it == columns->end())
{
if (is_first)
throw Exception("Wrong column name. Cannot find column " + column_name + " to drop", DB::ErrorCodes::ILLEGAL_COLUMN);
}
else
columns->erase(column_it);
is_first = false;
}
while (column_it != columns->end());
}
else if (params.type == ASTAlterQuery::MODIFY)
{
const ASTNameTypePair & ast_name_type = typeid_cast<const ASTNameTypePair &>(*params.name_type);
StringRange type_range = ast_name_type.type->range;
String type_string = String(type_range.first, type_range.second - type_range.first);
DB::DataTypePtr data_type = context.getDataTypeFactory().get(type_string);
NameAndTypePair pair(ast_name_type.name, data_type);
NamesAndTypesList::iterator column_it = std::find_if(columns->begin(), columns->end(), boost::bind(namesEqual, ast_name_type.name, _1) );
if (column_it == columns->end())
throw Exception("Wrong column name. Cannot find column " + ast_name_type.name + " to modify.", DB::ErrorCodes::ILLEGAL_COLUMN);
column_it->type = data_type;
}
else
throw Exception("Wrong parameter type in ALTER query", ErrorCodes::LOGICAL_ERROR);
}
} }

View File

@ -316,272 +316,219 @@ void MergeTreeData::dropAllData()
Poco::File(full_path).remove(true); Poco::File(full_path).remove(true);
} }
void MergeTreeData::removeColumnFiles(String column_name, bool remove_array_size_files)
void MergeTreeData::checkAlter(const AlterCommands & params)
{ {
Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex); /// Проверим, что указанные преобразования можно совершить над списком столбцов без учета типов.
Poco::ScopedLock<Poco::FastMutex> lock_all(all_data_parts_mutex); NamesAndTypesList new_columns = *columns;
params.apply(new_columns);
size_t dot_pos = column_name.find('.'); /// Список столбцов, которые нельзя трогать.
if (dot_pos != std::string::npos) /// sampling_expression можно не учитывать, потому что он обязан содержаться в первичном ключе.
Names keys = primary_expr->getRequiredColumns();
keys.push_back(sign_column);
std::sort(keys.begin(), keys.end());
for (const AlterCommand & command : params)
{ {
std::string nested_column = column_name.substr(0, dot_pos); if (std::binary_search(keys.begin(), keys.end(), command.column_name))
column_name = nested_column + "%2E" + column_name.substr(dot_pos + 1); throw Exception("trying to ALTER key column " + command.column_name, ErrorCodes::ILLEGAL_COLUMN);
if (remove_array_size_files)
column_name = std::string("(?:") + nested_column + "|" + column_name + ")";
} }
/// Регэксп выбирает файлы столбца для удаления /// Проверим, что преобразования типов возможны.
Poco::RegularExpression re(column_name + "(?:(?:\\.|\\%2E).+){0,1}" +"(?:\\.mrk|\\.bin|\\.size\\d+\\.bin|\\.size\\d+\\.mrk)"); ExpressionActionsPtr unused_expression;
/// Цикл по всем директориям кусочков NameToNameMap unused_map;
Poco::RegularExpression::MatchVec matches; createConvertExpression(nullptr, *columns, new_columns, unused_expression, unused_map);
Poco::DirectoryIterator end; }
for (Poco::DirectoryIterator it_dir = Poco::DirectoryIterator(full_path); it_dir != end; ++it_dir)
void MergeTreeData::createConvertExpression(DataPartPtr part, const NamesAndTypesList & old_columns, const NamesAndTypesList & new_columns,
ExpressionActionsPtr & out_expression, NameToNameMap & out_rename_map)
{
out_expression = nullptr;
out_rename_map.clear();
typedef std::map<String, DataTypePtr> NameToType;
NameToType new_types;
for (const NameAndTypePair & column : new_columns)
{ {
std::string dir_name = it_dir.name(); new_types[column.name] = column.type;
}
if (!ActiveDataPartSet::isPartDirectory(dir_name, matches)) /// Сколько столбцов сейчас в каждой вложенной структуре. Столбцы не из вложенных структур сюда тоже попадут и не помешают.
continue; std::map<String, int> nested_table_counts;
for (const NameAndTypePair & column : old_columns)
{
++nested_table_counts[DataTypeNested::extractNestedTableName(column.name)];
}
/// Цикл по каждому из файлов в директории кусочков for (const NameAndTypePair & column : old_columns)
String full_dir_name = full_path + dir_name + "/"; {
for (Poco::DirectoryIterator it_file(full_dir_name); it_file != end; ++it_file) if (!new_types.count(column.name))
{ {
if (re.match(it_file.name())) if (!part || part->hasColumnFiles(column.name))
{ {
Poco::File file(full_dir_name + it_file.name()); /// Столбец нужно удалить.
if (file.exists())
file.remove(); String escaped_column = escapeForFileName(column.name);
out_rename_map[escaped_column + ".bin"] = "";
out_rename_map[escaped_column + ".mrk"] = "";
/// Если это массив или последний столбец вложенной структуры, нужно удалить файлы с размерами.
if (typeid_cast<const DataTypeArray *>(&*column.type))
{
String nested_table = DataTypeNested::extractNestedTableName(column.name);
/// Если это был последний столбец, относящийся к этим файлам .size0, удалим файлы.
if (!--nested_table_counts[nested_table])
{
String escaped_nested_table = escapeForFileName(nested_table);
out_rename_map[escaped_nested_table + ".size0.bin"] = "";
out_rename_map[escaped_nested_table + ".size0.mrk"] = "";
}
}
} }
} }
else
/// Удаляем лишние столбцы из checksums.txt
for (auto & part : all_data_parts)
{ {
if (!part) String new_type_name = new_types[column.name]->getName();
continue;
for (auto it = part->checksums.files.lower_bound(column_name); if (new_type_name != column.type->getName() &&
(it != part->checksums.files.end()) && (it->first.substr(0, column_name.size()) == column_name);) (!part || part->hasColumnFiles(column.name)))
{ {
if (re.match(it->first)) /// Нужно изменить тип столбца.
it = const_cast<DataPart::Checksums::FileChecksums &>(part->checksums.files).erase(it);
else if (!out_expression)
++it; out_expression = new ExpressionActions(NamesAndTypesList(), context.getSettingsRef());
out_expression->addInput(ColumnWithNameAndType(nullptr, column.type, column.name));
FunctionPtr function = context.getFunctionFactory().get("to" + new_type_name, context);
Names out_names;
out_expression->add(ExpressionAction::applyFunction(function, Names(1, column.name)), out_names);
out_expression->add(ExpressionAction::removeColumn(column.name));
String escaped_expr = escapeForFileName(out_names[0]);
String escaped_column = escapeForFileName(column.name);
out_rename_map[escaped_expr + ".bin"] = escaped_column + ".bin";
out_rename_map[escaped_expr + ".mrk"] = escaped_column + ".mrk";
} }
/// Записываем файл с чексуммами.
WriteBufferFromFile out(full_path + part->name + "/" + "checksums.txt", 1024);
part->checksums.writeText(out);
} }
} }
} }
void MergeTreeData::createConvertExpression( MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart(DataPartPtr part, const NamesAndTypesList & new_columns)
const String & in_column_name,
const String & out_type,
ExpressionActionsPtr & out_expression,
String & out_column)
{ {
Names out_names; ExpressionActionsPtr expression;
out_expression = new ExpressionActions( AlterDataPartTransactionPtr transaction(new AlterDataPartTransaction(part));
NamesAndTypesList(1, NameAndTypePair(in_column_name, getDataTypeByName(in_column_name))), context.getSettingsRef()); createConvertExpression(part, *columns, new_columns, expression, transaction->rename_map); // TODO: part->columns
FunctionPtr function = context.getFunctionFactory().get("to" + out_type, context); if (transaction->rename_map.empty())
out_expression->add(ExpressionAction::applyFunction(function, Names(1, in_column_name)), out_names);
out_expression->add(ExpressionAction::removeColumn(in_column_name));
out_column = out_names[0];
}
static DataTypePtr getDataTypeByName(const String & name, const NamesAndTypesList & columns)
{
for (const auto & it : columns)
{ {
if (it.name == name) transaction->data_part = nullptr;
return it.type; return transaction;
}
throw Exception("No column " + name + " in table", ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
}
/// одинаковыми считаются имена, вида "name.*"
static bool namesWithDotEqual(const String & name_with_dot, const NameAndTypePair & name_type)
{
return (name_with_dot == name_type.name.substr(0, name_with_dot.length()));
}
void MergeTreeData::alter(const ASTAlterQuery::Parameters & params)
{
{
Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
Poco::ScopedLock<Poco::FastMutex> lock_all(all_data_parts_mutex);
alterColumns(params, columns, context);
} }
if (params.type == ASTAlterQuery::DROP) DataPart::Checksums add_checksums;
{
String column_name = typeid_cast<const ASTIdentifier &>(*params.column).name;
/// Если нет колонок вида nested_name.*, то удалим столбцы размера массивов /// Применим выражение и запишем результат во временные файлы.
bool remove_array_size_files = false; if (expression)
size_t dot_pos = column_name.find('.');
if (dot_pos != std::string::npos)
{
remove_array_size_files = (columns->end() == std::find_if(columns->begin(), columns->end(), boost::bind(namesWithDotEqual, column_name.substr(0, dot_pos), _1)));
}
removeColumnFiles(column_name, remove_array_size_files);
context.resetCaches();
}
}
void MergeTreeData::prepareAlterModify(const ASTAlterQuery::Parameters & params)
{
DataPartsVector parts;
{
Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
parts = DataPartsVector(data_parts.begin(), data_parts.end());
}
Names column_name;
const ASTNameTypePair & name_type = typeid_cast<const ASTNameTypePair &>(*params.name_type);
StringRange type_range = name_type.type->range;
String type(type_range.first, type_range.second - type_range.first);
DataTypePtr old_type_ptr = DB::getDataTypeByName(name_type.name, *columns);
DataTypePtr new_type_ptr = context.getDataTypeFactory().get(type);
if (typeid_cast<DataTypeNested *>(old_type_ptr.get()) || typeid_cast<DataTypeArray *>(old_type_ptr.get()) ||
typeid_cast<DataTypeNested *>(new_type_ptr.get()) || typeid_cast<DataTypeArray *>(new_type_ptr.get()))
throw Exception("ALTER MODIFY not supported for nested and array types");
column_name.push_back(name_type.name);
ExpressionActionsPtr expr;
String out_column;
createConvertExpression(name_type.name, type, expr, out_column);
ColumnNumbers num(1, 0);
for (DataPartPtr & part : parts)
{ {
MarkRanges ranges(1, MarkRange(0, part->size)); MarkRanges ranges(1, MarkRange(0, part->size));
ExpressionBlockInputStream in(new MergeTreeBlockInputStream(full_path + part->name + '/', BlockInputStreamPtr part_in = new MergeTreeBlockInputStream(full_path + part->name + '/',
DEFAULT_MERGE_BLOCK_SIZE, column_name, *this, part, ranges, false, nullptr, ""), expr); DEFAULT_MERGE_BLOCK_SIZE, expression->getRequiredColumns(), *this, part, ranges, false, nullptr, "");
ExpressionBlockInputStream in(part_in, expression);
MergedColumnOnlyOutputStream out(*this, full_path + part->name + '/', true); MergedColumnOnlyOutputStream out(*this, full_path + part->name + '/', true);
in.readPrefix(); in.readPrefix();
out.writePrefix(); out.writePrefix();
try while (Block b = in.read())
out.write(b);
in.readSuffix();
add_checksums = out.writeSuffixAndGetChecksums();
}
/// Обновим контрольные суммы.
DataPart::Checksums new_checksums = part->checksums;
for (auto it : transaction->rename_map)
{
if (it.second == "")
{ {
while (Block b = in.read()) new_checksums.files.erase(it.first);
out.write(b); }
else
{
new_checksums.files[it.second] = add_checksums.files[it.first];
}
}
in.readSuffix(); /// Запишем обновленные контрольные суммы во временный файл
DataPart::Checksums add_checksums = out.writeSuffixAndGetChecksums(); if (!part->checksums.empty())
{
WriteBufferFromFile checksums_file(full_path + part->name + "/checksums.txt.tmp", 4096);
new_checksums.writeText(checksums_file);
transaction->rename_map["checksums.txt.tmp"] = "checksums.txt";
}
/// Запишем обновленные контрольные суммы во временный файл. return transaction;
if (!part->checksums.empty()) }
void MergeTreeData::AlterDataPartTransaction::commit()
{
if (!data_part)
return;
try
{
String path = data_part->storage.full_path + data_part->name + "/";
for (auto it : rename_map)
{
if (it.second.empty())
{ {
DataPart::Checksums new_checksums = part->checksums; Poco::File(path + it.first).renameTo(path + it.first + ".removing");
std::string escaped_name = escapeForFileName(name_type.name); Poco::File(path + it.first + ".removing").remove();
std::string escaped_out_column = escapeForFileName(out_column); }
new_checksums.files[escaped_name + ".bin"] = add_checksums.files[escaped_out_column + ".bin"]; else
new_checksums.files[escaped_name + ".mrk"] = add_checksums.files[escaped_out_column + ".mrk"]; {
Poco::File(path + it.first).renameTo(path + it.second);
WriteBufferFromFile checksums_file(full_path + part->name + '/' + escaped_out_column + ".checksums.txt", 1024);
new_checksums.writeText(checksums_file);
} }
} }
catch (const Exception & e) data_part = nullptr;
{ }
if (e.code() != ErrorCodes::ALL_REQUESTED_COLUMNS_ARE_MISSING) catch (...)
throw; {
} /// Если что-то пошло не так, не будем удалять временные файлы в деструкторе.
data_part = nullptr;
throw;
} }
} }
void MergeTreeData::commitAlterModify(const ASTAlterQuery::Parameters & params) MergeTreeData::AlterDataPartTransaction::~AlterDataPartTransaction()
{ {
DataPartsVector parts; try
{ {
Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex); if (!data_part)
parts = DataPartsVector(data_parts.begin(), data_parts.end()); return;
}
const ASTNameTypePair & name_type = typeid_cast<const ASTNameTypePair &>(*params.name_type); LOG_WARNING(data_part->storage.log, "Aborting ALTER of part " << data_part->name);
StringRange type_range = name_type.type->range;
String type(type_range.first, type_range.second - type_range.first);
ExpressionActionsPtr expr; String path = data_part->storage.full_path + data_part->name + "/";
String out_column; for (auto it : rename_map)
createConvertExpression(name_type.name, type, expr, out_column);
/// переименовываем файлы
/// переименовываем старые столбцы, добавляя расширение .old
for (DataPartPtr & part : parts)
{
std::string part_path = full_path + part->name + '/';
std::string path = part_path + escapeForFileName(name_type.name);
if (Poco::File(path + ".bin").exists())
{ {
LOG_TRACE(log, "Renaming " << path + ".bin" << " to " << path + ".bin" + ".old"); if (!it.second.empty())
Poco::File(path + ".bin").renameTo(path + ".bin" + ".old");
LOG_TRACE(log, "Renaming " << path + ".mrk" << " to " << path + ".mrk" + ".old");
Poco::File(path + ".mrk").renameTo(path + ".mrk" + ".old");
if (Poco::File(part_path + "checksums.txt").exists())
{ {
LOG_TRACE(log, "Renaming " << part_path + "checksums.txt" << " to " << part_path + "checksums.txt" + ".old"); try
Poco::File(part_path + "checksums.txt").renameTo(part_path + "checksums.txt" + ".old"); {
Poco::File(path + it.first).remove();
}
catch (Poco::Exception & e)
{
LOG_WARNING(data_part->storage.log, "Can't remove " << path + it.first << ": " << e.displayText());
}
} }
} }
} }
catch (...)
/// переименовываем временные столбцы
for (DataPartPtr & part : parts)
{ {
std::string part_path = full_path + part->name + '/'; tryLogCurrentException(__PRETTY_FUNCTION__);
std::string name = escapeForFileName(out_column);
std::string new_name = escapeForFileName(name_type.name);
std::string path = part_path + name;
std::string new_path = part_path + new_name;
if (Poco::File(path + ".bin").exists())
{
LOG_TRACE(log, "Renaming " << path + ".bin" << " to " << new_path + ".bin");
Poco::File(path + ".bin").renameTo(new_path + ".bin");
LOG_TRACE(log, "Renaming " << path + ".mrk" << " to " << new_path + ".mrk");
Poco::File(path + ".mrk").renameTo(new_path + ".mrk");
if (Poco::File(path + ".checksums.txt").exists())
{
LOG_TRACE(log, "Renaming " << path + ".checksums.txt" << " to " << part_path + ".checksums.txt");
Poco::File(path + ".checksums.txt").renameTo(part_path + "checksums.txt");
}
}
}
// удаляем старые столбцы
for (DataPartPtr & part : parts)
{
std::string part_path = full_path + part->name + '/';
std::string path = part_path + escapeForFileName(name_type.name);
if (Poco::File(path + ".bin" + ".old").exists())
{
LOG_TRACE(log, "Removing old column " << path + ".bin" + ".old");
Poco::File(path + ".bin" + ".old").remove();
LOG_TRACE(log, "Removing old column " << path + ".mrk" + ".old");
Poco::File(path + ".mrk" + ".old").remove();
if (Poco::File(part_path + "checksums.txt" + ".old").exists())
{
LOG_TRACE(log, "Removing old checksums " << part_path + "checksums.txt" + ".old");
Poco::File(part_path + "checksums.txt" + ".old").remove();
}
}
}
context.resetCaches();
{
Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
Poco::ScopedLock<Poco::FastMutex> lock_all(all_data_parts_mutex);
alterColumns(params, columns, context);
} }
} }

View File

@ -9,6 +9,7 @@
#include <DB/Client/ConnectionPool.h> #include <DB/Client/ConnectionPool.h>
#include <DB/Interpreters/InterpreterSelectQuery.h> #include <DB/Interpreters/InterpreterSelectQuery.h>
#include <DB/Interpreters/InterpreterAlterQuery.h>
#include <boost/bind.hpp> #include <boost/bind.hpp>
#include <DB/Core/Field.h> #include <DB/Core/Field.h>
@ -132,9 +133,11 @@ BlockInputStreams StorageDistributed::read(
return res; return res;
} }
void StorageDistributed::alter(const ASTAlterQuery::Parameters &params) void StorageDistributed::alter(const AlterCommands & params, const String & database_name, const String & table_name, Context & context)
{ {
alterColumns(params, columns, context); auto lock = lockStructureForAlter();
params.apply(*columns);
InterpreterAlterQuery::updateMetadata(database_name, table_name, *columns, context);
} }
} }

View File

@ -75,8 +75,6 @@ StoragePtr StorageFactory::get(
NamesAndTypesListPtr columns, NamesAndTypesListPtr columns,
bool attach) const bool attach) const
{ {
columns = DataTypeNested::expandNestedColumns(*columns);
if (name == "Log") if (name == "Log")
{ {
return StorageLog::create(data_path, table_name, columns, context.getSettings().max_compress_block_size); return StorageLog::create(data_path, table_name, columns, context.getSettings().max_compress_block_size);

View File

@ -1,6 +1,7 @@
#include <DB/DataStreams/narrowBlockInputStreams.h> #include <DB/DataStreams/narrowBlockInputStreams.h>
#include <DB/Storages/StorageMerge.h> #include <DB/Storages/StorageMerge.h>
#include <DB/Common/VirtualColumnUtils.h> #include <DB/Common/VirtualColumnUtils.h>
#include <DB/Interpreters/InterpreterAlterQuery.h>
namespace DB namespace DB
{ {
@ -166,9 +167,12 @@ void StorageMerge::getSelectedTables(StorageVector & selected_tables) const
} }
void StorageMerge::alter(const ASTAlterQuery::Parameters & params) void StorageMerge::alter(const AlterCommands & params, const String & database_name, const String & table_name, Context & context)
{ {
alterColumns(params, columns, context); auto lock = lockStructureForAlter();
} params.apply(*columns);
InterpreterAlterQuery::updateMetadata(database_name, table_name, *columns, context);
}
} }

View File

@ -2,6 +2,7 @@
#include <DB/Storages/MergeTree/MergeTreeBlockOutputStream.h> #include <DB/Storages/MergeTree/MergeTreeBlockOutputStream.h>
#include <DB/Storages/MergeTree/DiskSpaceMonitor.h> #include <DB/Storages/MergeTree/DiskSpaceMonitor.h>
#include <DB/Common/escapeForFileName.h> #include <DB/Common/escapeForFileName.h>
#include <DB/Interpreters/InterpreterAlterQuery.h>
namespace DB namespace DB
{ {
@ -103,19 +104,33 @@ void StorageMergeTree::rename(const String & new_path_to_db, const String & new_
/// TODO: Можно обновить названия логгеров у this, data, reader, writer, merger. /// TODO: Можно обновить названия логгеров у this, data, reader, writer, merger.
} }
void StorageMergeTree::alter(const ASTAlterQuery::Parameters & params) void StorageMergeTree::alter(const AlterCommands & params, const String & database_name, const String & table_name, Context & context)
{ {
data.alter(params); /// NOTE: Здесь так же как в ReplicatedMergeTree можно сделать ALTER, не блокирующий запись данных надолго.
}
void StorageMergeTree::prepareAlterModify(const ASTAlterQuery::Parameters & params) auto table_soft_lock = lockDataForAlter();
{
data.prepareAlterModify(params);
}
void StorageMergeTree::commitAlterModify(const ASTAlterQuery::Parameters & params) data.checkAlter(params);
{
data.commitAlterModify(params); NamesAndTypesList new_columns = data.getColumnsList();
params.apply(new_columns);
MergeTreeData::DataParts parts = data.getDataParts();
std::vector<MergeTreeData::AlterDataPartTransactionPtr> transactions;
for (MergeTreeData::DataPartPtr part : parts)
{
transactions.push_back(data.alterDataPart(part, new_columns));
}
auto table_hard_lock = lockStructureForAlter();
InterpreterAlterQuery::updateMetadata(database_name, table_name, new_columns, context);
data.setColumnsList(new_columns);
for (auto & transaction : transactions)
{
transaction->commit();
}
} }
bool StorageMergeTree::merge(bool aggressive, BackgroundProcessingPool::Context * pool_context) bool StorageMergeTree::merge(bool aggressive, BackgroundProcessingPool::Context * pool_context)

View File

@ -107,7 +107,7 @@ private:
} }
if (zookeeper.exists(path + "/" + *(it - 1), nullptr, event)) if (zookeeper.exists(path + "/" + *(it - 1), nullptr, event))
event->tryWait(60 * 1000); event->wait();
success = true; success = true;
} }