ClickHouse/dbms/include/DB/Storages/AlterCommands.h

121 lines
4.4 KiB
C
Raw Normal View History

2014-07-11 08:10:45 +00:00
#pragma once
#include <DB/Core/NamesAndTypes.h>
#include <DB/DataTypes/DataTypeNested.h>
2014-07-14 09:12:50 +00:00
#include <DB/DataTypes/DataTypeArray.h>
2014-07-11 08:10:45 +00:00
namespace DB
{
2014-09-12 00:32:27 +00:00
/// Операция из запроса ALTER (кроме манипуляции с PART/PARTITION). Добавление столбцов типа Nested не развернуто в добавление отдельных столбцов.
2014-07-11 08:10:45 +00:00
struct AlterCommand
{
enum Type
{
ADD,
DROP,
MODIFY
};
Type type;
String column_name;
/// Для ADD и MODIFY - новый тип столбца.
DataTypePtr data_type;
/// Для ADD - после какого столбца добавить новый. Если пустая строка, добавить в конец. Добавить в начало сейчас нельзя.
String after_column;
/// одинаковыми считаются имена, если они совпадают целиком или name_without_dot совпадает с частью имени до точки
static bool namesEqual(const String & name_without_dot, const DB::NameAndTypePair & name_type)
{
String name_with_dot = name_without_dot + ".";
return (name_with_dot == name_type.name.substr(0, name_without_dot.length() + 1) || name_without_dot == name_type.name);
}
void apply(NamesAndTypesList & columns) const
2014-07-11 08:10:45 +00:00
{
if (type == ADD)
{
2014-07-14 09:12:50 +00:00
if (std::count_if(columns.begin(), columns.end(), std::bind(namesEqual, column_name, std::placeholders::_1)))
throw Exception("Cannot add column " + column_name + ": column with this name already exisits.",
DB::ErrorCodes::ILLEGAL_COLUMN);
if (DataTypeNested::extractNestedTableName(column_name) != column_name &&
!typeid_cast<const DataTypeArray *>(&*data_type))
throw Exception("Can't add nested column " + column_name + " of non-array type " + data_type->getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
2014-07-11 08:10:45 +00:00
NamesAndTypesList::iterator insert_it = columns.end();
if (!after_column.empty())
{
/// Пытаемся найти первую с конца колонку с именем column_name или с именем, начинающимся с column_name и ".".
/// Например "fruits.bananas"
/// одинаковыми считаются имена, если они совпадают целиком или name_without_dot совпадает с частью имени до точки
NamesAndTypesList::reverse_iterator reverse_insert_it = std::find_if(columns.rbegin(), columns.rend(),
std::bind(namesEqual, after_column, std::placeholders::_1));
2014-07-11 08:10:45 +00:00
if (reverse_insert_it == columns.rend())
throw Exception("Wrong column name. Cannot find column " + column_name + " to insert after",
DB::ErrorCodes::ILLEGAL_COLUMN);
else
{
/// base возвращает итератор, уже смещенный на один элемент вправо
insert_it = reverse_insert_it.base();
}
}
columns.insert(insert_it, NameAndTypePair(column_name, data_type));
/// Медленно, так как каждый раз копируется список
columns = *DataTypeNested::expandNestedColumns(columns);
}
else if (type == DROP)
{
bool is_first = true;
NamesAndTypesList::iterator column_it;
do
{
column_it = std::find_if(columns.begin(), columns.end(), std::bind(namesEqual, column_name, std::placeholders::_1));
2014-07-11 08:10:45 +00:00
if (column_it == columns.end())
{
if (is_first)
throw Exception("Wrong column name. Cannot find column " + column_name + " to drop",
DB::ErrorCodes::ILLEGAL_COLUMN);
}
else
columns.erase(column_it);
is_first = false;
}
while (column_it != columns.end());
}
else if (type == MODIFY)
{
NamesAndTypesList::iterator column_it = std::find_if(columns.begin(), columns.end(),
std::bind(namesEqual, column_name, std::placeholders::_1) );
if (column_it == columns.end())
2014-07-11 08:10:45 +00:00
throw Exception("Wrong column name. Cannot find column " + column_name + " to modify.",
DB::ErrorCodes::ILLEGAL_COLUMN);
column_it->type = data_type;
}
else
throw Exception("Wrong parameter type in ALTER query", ErrorCodes::LOGICAL_ERROR);
}
};
class AlterCommands : public std::vector<AlterCommand>
{
public:
void apply(NamesAndTypesList & columns) const
2014-07-11 08:10:45 +00:00
{
2014-07-11 13:34:12 +00:00
NamesAndTypesList new_columns = columns;
2014-07-11 08:10:45 +00:00
for (const AlterCommand & command : *this)
command.apply(new_columns);
columns = new_columns;
}
};
}