mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 15:42:02 +00:00
Merge
This commit is contained in:
parent
baa434c7c3
commit
1bf0490191
@ -32,7 +32,7 @@ class PODArray : private boost::noncopyable, private std::allocator<char> /// em
|
||||
{
|
||||
private:
|
||||
typedef std::allocator<char> Allocator;
|
||||
static const size_t initial_size = 4096;
|
||||
static const size_t initial_size;
|
||||
|
||||
char * c_start;
|
||||
char * c_end;
|
||||
@ -264,5 +264,9 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
template <typename T>
|
||||
const size_t PODArray<T>::initial_size = 4096;
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
@ -188,8 +188,11 @@ public:
|
||||
protected:
|
||||
IStorage() : drop_on_destroy(false) {}
|
||||
|
||||
/// реализация alter, модифицирующая список столбцов.
|
||||
void alter_columns(const ASTAlterQuery::Parameters & params, NamesAndTypesListPtr & columns, const Context & context) const;
|
||||
private:
|
||||
boost::weak_ptr<StoragePtr::Wrapper> this_ptr;
|
||||
};
|
||||
|
||||
typedef std::vector<StoragePtr> StorageVector;
|
||||
}
|
||||
|
@ -4,6 +4,7 @@
|
||||
#include <DB/Client/ConnectionPool.h>
|
||||
#include <DB/Client/ConnectionPoolWithFailover.h>
|
||||
#include <DB/Interpreters/Settings.h>
|
||||
#include <DB/Interpreters/Context.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -42,6 +43,7 @@ public:
|
||||
const String & remote_table_, /// Имя таблицы на удалённых серверах.
|
||||
const DataTypeFactory & data_type_factory_,
|
||||
const Settings & settings,
|
||||
const Context & context_,
|
||||
const String & sign_column_name_ = "");
|
||||
|
||||
/// Использовать реплики для отказоустойчивости.
|
||||
@ -53,6 +55,7 @@ public:
|
||||
const String & remote_table_, /// Имя таблицы на удалённых серверах.
|
||||
const DataTypeFactory & data_type_factory_,
|
||||
const Settings & settings,
|
||||
const Context & context_,
|
||||
const String & sign_column_name_ = "");
|
||||
|
||||
std::string getName() const { return "Distributed"; }
|
||||
@ -75,6 +78,9 @@ public:
|
||||
|
||||
void dropImpl() {}
|
||||
void rename(const String & new_path_to_db, const String & new_name) { name = new_name; }
|
||||
/// в подтаблицах добавлять и удалять столбы нужно вручную
|
||||
/// структура подтаблиц не проверяется
|
||||
void alter(const ASTAlterQuery::Parameters ¶ms);
|
||||
|
||||
private:
|
||||
StorageDistributed(
|
||||
@ -85,6 +91,7 @@ private:
|
||||
const String & remote_table_,
|
||||
const DataTypeFactory & data_type_factory_,
|
||||
const Settings & settings,
|
||||
const Context & context_,
|
||||
const String & sign_column_name_ = "");
|
||||
|
||||
/// Использовать реплики для отказоустойчивости.
|
||||
@ -96,6 +103,7 @@ private:
|
||||
const String & remote_table_,
|
||||
const DataTypeFactory & data_type_factory_,
|
||||
const Settings & settings,
|
||||
const Context & context_,
|
||||
const String & sign_column_name_ = "");
|
||||
|
||||
String name;
|
||||
@ -105,6 +113,7 @@ private:
|
||||
const DataTypeFactory & data_type_factory;
|
||||
String sign_column_name;
|
||||
|
||||
const Context & context;
|
||||
/// Соединения с удалёнными серверами.
|
||||
ConnectionPools pools;
|
||||
};
|
||||
|
@ -9,6 +9,9 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class StorageMerge;
|
||||
typedef Poco::SharedPtr<StorageMerge> StorageMergePtr;
|
||||
|
||||
/** Таблица, представляющая собой объединение произвольного количества других таблиц.
|
||||
* У всех таблиц должна быть одинаковая структура.
|
||||
*/
|
||||
@ -38,7 +41,12 @@ public:
|
||||
|
||||
void dropImpl() {}
|
||||
void rename(const String & new_path_to_db, const String & new_name) { name = new_name; }
|
||||
|
||||
void getSelectedTables(StorageVector & selected_tables);
|
||||
|
||||
/// в подтаблицах добавлять и удалять столбы нужно вручную
|
||||
/// структура подтаблиц не проверяется
|
||||
void alter(const ASTAlterQuery::Parameters & params);
|
||||
private:
|
||||
String name;
|
||||
NamesAndTypesListPtr columns;
|
||||
|
@ -335,8 +335,7 @@ private:
|
||||
|
||||
Poco::SharedPtr<boost::threadpool::pool> merge_threads;
|
||||
|
||||
/// берет нерекурсивные блокировки data_parts_mutex и all_data_parts_mutex
|
||||
void removeColumn(String column_name);
|
||||
void removeColumnFiles(String column_name);
|
||||
|
||||
/// Возвращает true если имя директории совпадает с форматом имени директории кусочков
|
||||
bool isPartDirectory(const String & dir_name, Poco::RegularExpression::MatchVec & matches) const;
|
||||
|
@ -9,14 +9,15 @@
|
||||
#include <DB/IO/copyData.h>
|
||||
#include <DB/Common/escapeForFileName.h>
|
||||
#include <DB/Parsers/formatAST.h>
|
||||
#include <DB/Storages/StorageMerge.h>
|
||||
|
||||
#include <algorithm>
|
||||
#include <boost/bind.hpp>
|
||||
#include <boost/bind/placeholders.hpp>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
using namespace DB;
|
||||
|
||||
InterpreterAlterQuery::InterpreterAlterQuery(ASTPtr query_ptr_, Context & context_)
|
||||
: query_ptr(query_ptr_), context(context_)
|
||||
{
|
||||
@ -32,16 +33,15 @@ void InterpreterAlterQuery::execute()
|
||||
{
|
||||
/// Poco::Mutex является рекурсивным, т.е. взятие мьютекса дважды из одного потока не приводит к блокировке
|
||||
Poco::ScopedLock<Poco::Mutex> lock(context.getMutex());
|
||||
|
||||
const DataTypeFactory & data_type_factory = context.getDataTypeFactory();
|
||||
ASTAlterQuery & alter = dynamic_cast<ASTAlterQuery &>(*query_ptr);
|
||||
|
||||
String database_name = alter.database.empty() ? context.getCurrentDatabase() : alter.database;
|
||||
String & table_name = alter.table;
|
||||
String database_name_escaped = escapeForFileName(database_name);
|
||||
String table_name_escaped = escapeForFileName(table_name);
|
||||
|
||||
StoragePtr table = context.getTable(database_name, table_name);
|
||||
String path = context.getPath();
|
||||
|
||||
String database_name = alter.database.empty() ? context.getCurrentDatabase() : alter.database;
|
||||
String database_name_escaped = escapeForFileName(database_name);
|
||||
String & table_name = alter.table;
|
||||
String table_name_escaped = escapeForFileName(table_name);
|
||||
StoragePtr table = context.getTable(database_name, table_name);
|
||||
String metadata_path = path + "metadata/" + database_name_escaped + "/" + table_name_escaped + ".sql";
|
||||
|
||||
ASTPtr attach_ptr = context.getCreateQuery(database_name, table_name);
|
||||
@ -49,12 +49,7 @@ void InterpreterAlterQuery::execute()
|
||||
attach.attach = true;
|
||||
ASTs & columns = dynamic_cast<ASTExpressionList &>(*attach.columns).children;
|
||||
|
||||
const ASTFunction & storage = dynamic_cast<const ASTFunction &>(*attach.storage);
|
||||
|
||||
Poco::RegularExpression::MatchVec matches;
|
||||
|
||||
const DataTypeFactory & data_type_factory = context.getDataTypeFactory();
|
||||
|
||||
/// Различные проверки, на возможность выполнения запроса
|
||||
IdentifierNameSet identifier_names;
|
||||
attach.storage->collectIdentifierNames(identifier_names);
|
||||
for (ASTAlterQuery::ParameterContainer::const_iterator alter_it = alter.parameters.begin();
|
||||
@ -75,21 +70,14 @@ void InterpreterAlterQuery::execute()
|
||||
throw DB::Exception("Wrong column name. Column already exists", DB::ErrorCodes::ILLEGAL_COLUMN);
|
||||
|
||||
/// Проверяем опциональный аргумент AFTER
|
||||
ASTs::iterator insert_it = columns.end();
|
||||
if (params.column)
|
||||
{
|
||||
const ASTIdentifier & col_after = dynamic_cast<const ASTIdentifier &>(*params.column);
|
||||
ASTs::iterator insert_it = std::find_if(columns.begin(), columns.end(), boost::bind(namesEqual, col_after.name, _1)) ;
|
||||
insert_it = std::find_if(columns.begin(), columns.end(), boost::bind(namesEqual, col_after.name, _1)) ;
|
||||
if (insert_it == columns.end())
|
||||
throw DB::Exception("Wrong column name. Cannot find column to insert after", DB::ErrorCodes::ILLEGAL_COLUMN);
|
||||
else
|
||||
{
|
||||
/// increase iterator because we want to insert after found element not before
|
||||
++insert_it;
|
||||
columns.insert(insert_it, params.name_type);
|
||||
}
|
||||
}
|
||||
else
|
||||
columns.push_back(params.name_type);
|
||||
}
|
||||
else if (params.type == ASTAlterQuery::DROP)
|
||||
{
|
||||
@ -102,16 +90,37 @@ void InterpreterAlterQuery::execute()
|
||||
ASTs::iterator drop_it = std::find_if(columns.begin(), columns.end(), boost::bind(namesEqual, drop_column.name, _1));
|
||||
if (drop_it == columns.end())
|
||||
throw DB::Exception("Wrong column name. Cannot find column to drop", DB::ErrorCodes::ILLEGAL_COLUMN);
|
||||
else
|
||||
columns.erase(drop_it);
|
||||
}
|
||||
}
|
||||
|
||||
/// todo cycle over sub tables and tables
|
||||
/// Применяем изменения
|
||||
for (ASTAlterQuery::ParameterContainer::const_iterator alter_it = alter.parameters.begin();
|
||||
alter_it != alter.parameters.end(); ++alter_it)
|
||||
{
|
||||
const ASTAlterQuery::Parameters & params = *alter_it;
|
||||
table->alter(params);
|
||||
|
||||
if (params.type == ASTAlterQuery::ADD)
|
||||
{
|
||||
ASTs::iterator insert_it = columns.end();
|
||||
if (params.column)
|
||||
{
|
||||
const ASTIdentifier & col_after = dynamic_cast<const ASTIdentifier &>(*params.column);
|
||||
insert_it = std::find_if(columns.begin(), columns.end(), boost::bind(namesEqual, col_after.name, _1)) ;
|
||||
++insert_it; /// increase iterator because we want to insert after found element not before
|
||||
}
|
||||
columns.insert(insert_it, params.name_type);
|
||||
}
|
||||
else if (params.type == ASTAlterQuery::DROP)
|
||||
{
|
||||
const ASTIdentifier & drop_column = dynamic_cast <const ASTIdentifier &>(*params.column);
|
||||
ASTs::iterator drop_it = std::find_if(columns.begin(), columns.end(), boost::bind(namesEqual, drop_column.name, _1));
|
||||
columns.erase(drop_it);
|
||||
}
|
||||
|
||||
/// Перезаписываем файл метадата каждую итерацию
|
||||
Poco::FileOutputStream ostr(metadata_path);
|
||||
formatAST(attach, ostr, 0, false);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -1,9 +1,15 @@
|
||||
#include <set>
|
||||
#include <boost/bind.hpp>
|
||||
|
||||
#include <sparsehash/dense_hash_set>
|
||||
#include <sparsehash/dense_hash_map>
|
||||
|
||||
#include <DB/Columns/ColumnNested.h>
|
||||
#include <DB/DataTypes/DataTypeNested.h>
|
||||
#include <DB/Storages/IStorage.h>
|
||||
#include <DB/Parsers/ASTIdentifier.h>
|
||||
#include <DB/Parsers/ASTNameTypePair.h>
|
||||
#include <DB/Interpreters/Context.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -138,4 +144,78 @@ void IStorage::check(const Block & block, bool need_all) const
|
||||
}
|
||||
|
||||
|
||||
/// одинаковыми считаются имена, если они совпадают целиком или nameWithoutDot совпадает с частью имени до точки
|
||||
bool namesEqual(const String & nameWithoutDot, const DB::NameAndTypePair & name_type)
|
||||
{
|
||||
String nameWithDot = nameWithoutDot + ".";
|
||||
return (nameWithDot == name_type.first.substr(0, nameWithoutDot.length() + 1) || nameWithoutDot == name_type.first);
|
||||
}
|
||||
|
||||
void IStorage::alter_columns(const ASTAlterQuery::Parameters & params, NamesAndTypesListPtr & columns, const Context & context) const
|
||||
{
|
||||
if (params.type == ASTAlterQuery::ADD)
|
||||
{
|
||||
/// TODO: нужны ли блокировки
|
||||
//Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
|
||||
//Poco::ScopedLock<Poco::FastMutex> lock_all(all_data_parts_mutex);
|
||||
|
||||
NamesAndTypesList::iterator insert_it = columns->end();
|
||||
if (params.column)
|
||||
{
|
||||
String column_name = dynamic_cast<const ASTIdentifier &>(*params.column).name;
|
||||
|
||||
/// Пытаемся найти первую с конца колонку с именем column_name или column_name.*
|
||||
NamesAndTypesList::reverse_iterator reverse_insert_it = std::find_if(columns->rbegin(), columns->rend(), boost::bind(namesEqual, column_name, _1) );
|
||||
|
||||
if (reverse_insert_it == columns->rend())
|
||||
throw DB::Exception("Wrong column name. Cannot find column to insert after", DB::ErrorCodes::ILLEGAL_COLUMN);
|
||||
else
|
||||
{
|
||||
/// base возвращает итератор уже смещенный на один элемент вправо
|
||||
insert_it = reverse_insert_it.base();
|
||||
}
|
||||
}
|
||||
|
||||
const ASTNameTypePair & ast_name_type = dynamic_cast<const ASTNameTypePair &>(*params.name_type);
|
||||
StringRange type_range = ast_name_type.type->range;
|
||||
String type_string = String(type_range.first, type_range.second - type_range.first);
|
||||
|
||||
DB::DataTypePtr data_type = context.getDataTypeFactory().get(type_string);
|
||||
NameAndTypePair pair(ast_name_type.name, data_type );
|
||||
columns->insert(insert_it, pair);
|
||||
|
||||
/// Медленно, так как каждый раз копируется список
|
||||
columns = DataTypeNested::expandNestedColumns(*columns);
|
||||
return;
|
||||
}
|
||||
else if (params.type == ASTAlterQuery::DROP)
|
||||
{
|
||||
String column_name = dynamic_cast<const ASTIdentifier &>(*(params.column)).name;
|
||||
|
||||
/// TODO: нужны ли блокировки
|
||||
///Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
|
||||
///Poco::ScopedLock<Poco::FastMutex> lock_all(all_data_parts_mutex);
|
||||
|
||||
/// Удаляем колонки из листа columns
|
||||
bool is_first = true;
|
||||
NamesAndTypesList::iterator column_it;
|
||||
do
|
||||
{
|
||||
column_it = std::find_if(columns->begin(), columns->end(), boost::bind(namesEqual, column_name, _1));
|
||||
|
||||
if (column_it == columns->end())
|
||||
{
|
||||
if (is_first)
|
||||
throw DB::Exception("Wrong column name. Cannot find column to drop", DB::ErrorCodes::ILLEGAL_COLUMN);
|
||||
}
|
||||
else
|
||||
columns->erase(column_it);
|
||||
is_first = false;
|
||||
}
|
||||
while (column_it != columns->end());
|
||||
}
|
||||
else
|
||||
throw Exception("Wrong parameter type in ALTER query", ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -16,11 +16,13 @@ StorageDistributed::StorageDistributed(
|
||||
const String & remote_table_,
|
||||
const DataTypeFactory & data_type_factory_,
|
||||
const Settings & settings,
|
||||
const Context & context_,
|
||||
const String & sign_column_name_)
|
||||
: name(name_), columns(columns_),
|
||||
remote_database(remote_database_), remote_table(remote_table_),
|
||||
data_type_factory(data_type_factory_),
|
||||
sign_column_name(sign_column_name_)
|
||||
sign_column_name(sign_column_name_),
|
||||
context(context_)
|
||||
{
|
||||
for (Addresses::const_iterator it = addresses.begin(); it != addresses.end(); ++it)
|
||||
pools.push_back(new ConnectionPool(
|
||||
@ -37,11 +39,13 @@ StorageDistributed::StorageDistributed(
|
||||
const String & remote_table_,
|
||||
const DataTypeFactory & data_type_factory_,
|
||||
const Settings & settings,
|
||||
const Context & context_,
|
||||
const String & sign_column_name_)
|
||||
: name(name_), columns(columns_),
|
||||
remote_database(remote_database_), remote_table(remote_table_),
|
||||
data_type_factory(data_type_factory_),
|
||||
sign_column_name(sign_column_name_)
|
||||
sign_column_name(sign_column_name_),
|
||||
context(context_)
|
||||
{
|
||||
for (AddressesWithFailover::const_iterator it = addresses.begin(); it != addresses.end(); ++it)
|
||||
{
|
||||
@ -66,9 +70,10 @@ StoragePtr StorageDistributed::create(
|
||||
const String & remote_table_,
|
||||
const DataTypeFactory & data_type_factory_,
|
||||
const Settings & settings,
|
||||
const Context & context_,
|
||||
const String & sign_column_name_)
|
||||
{
|
||||
return (new StorageDistributed(name_, columns_, addresses, remote_database_, remote_table_, data_type_factory_, settings, sign_column_name_))->thisPtr();
|
||||
return (new StorageDistributed(name_, columns_, addresses, remote_database_, remote_table_, data_type_factory_, settings, context_, sign_column_name_))->thisPtr();
|
||||
}
|
||||
|
||||
StoragePtr StorageDistributed::create(
|
||||
@ -79,9 +84,10 @@ StoragePtr StorageDistributed::create(
|
||||
const String & remote_table_,
|
||||
const DataTypeFactory & data_type_factory_,
|
||||
const Settings & settings,
|
||||
const Context & context_,
|
||||
const String & sign_column_name_)
|
||||
{
|
||||
return (new StorageDistributed(name_, columns_, addresses, remote_database_, remote_table_, data_type_factory_, settings, sign_column_name_))->thisPtr();
|
||||
return (new StorageDistributed(name_, columns_, addresses, remote_database_, remote_table_, data_type_factory_, settings, context_, sign_column_name_))->thisPtr();
|
||||
}
|
||||
|
||||
|
||||
@ -119,4 +125,8 @@ BlockInputStreams StorageDistributed::read(
|
||||
return res;
|
||||
}
|
||||
|
||||
void StorageDistributed::alter(const ASTAlterQuery::Parameters ¶ms)
|
||||
{
|
||||
alter_columns(params, columns, context);
|
||||
}
|
||||
}
|
||||
|
@ -204,10 +204,10 @@ StoragePtr StorageFactory::get(
|
||||
|
||||
if (!addresses_with_failover.empty())
|
||||
return StorageDistributed::create(table_name, columns, addresses_with_failover, remote_database, remote_table,
|
||||
context.getDataTypeFactory(), context.getSettings(), sign_column_name);
|
||||
context.getDataTypeFactory(), context.getSettings(), context, sign_column_name);
|
||||
else if (!addresses.empty())
|
||||
return StorageDistributed::create(table_name, columns, addresses, remote_database, remote_table,
|
||||
context.getDataTypeFactory(), context.getSettings(), sign_column_name);
|
||||
context.getDataTypeFactory(), context.getSettings(), context, sign_column_name);
|
||||
else
|
||||
throw Exception("No addresses listed in config", ErrorCodes::NO_ELEMENTS_IN_CONFIG);
|
||||
}
|
||||
|
@ -47,15 +47,11 @@ BlockInputStreams StorageMerge::read(
|
||||
{
|
||||
Poco::ScopedLock<Poco::Mutex> lock(context.getMutex());
|
||||
context.assertDatabaseExists(source_database);
|
||||
|
||||
const Tables & tables = context.getDatabases().at(source_database);
|
||||
|
||||
|
||||
/** Сначала составим список выбранных таблиц, чтобы узнать его размер.
|
||||
* Это нужно, чтобы правильно передать в каждую таблицу рекомендацию по количеству потоков.
|
||||
*/
|
||||
for (Tables::const_iterator it = tables.begin(); it != tables.end(); ++it)
|
||||
if (it->second != this && table_name_regexp.match(it->first))
|
||||
selected_tables.push_back(it->second);
|
||||
getSelectedTables(selected_tables);
|
||||
}
|
||||
|
||||
for (SelectedTables::iterator it = selected_tables.begin(); it != selected_tables.end(); ++it)
|
||||
@ -83,4 +79,18 @@ BlockInputStreams StorageMerge::read(
|
||||
return res;
|
||||
}
|
||||
|
||||
void StorageMerge::getSelectedTables(StorageVector & selected_tables)
|
||||
{
|
||||
const Tables & tables = context.getDatabases().at(source_database);
|
||||
for (Tables::const_iterator it = tables.begin(); it != tables.end(); ++it)
|
||||
if (it->second != this && table_name_regexp.match(it->first))
|
||||
selected_tables.push_back(it->second);
|
||||
}
|
||||
|
||||
|
||||
void StorageMerge::alter(const ASTAlterQuery::Parameters & params)
|
||||
{
|
||||
alter_columns(params, columns, context);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -950,37 +950,11 @@ void StorageMergeTree::dropImpl()
|
||||
Poco::File(full_path).remove(true);
|
||||
}
|
||||
|
||||
/// одинаковыми считаются имена, если они совпадают целиком или nameWithoutDot совпадает с частью имени до точки
|
||||
bool namesEqual(const String & nameWithoutDot, const DB::NameAndTypePair & name_type)
|
||||
{
|
||||
String nameWithDot = nameWithoutDot + ".";
|
||||
return (nameWithDot == name_type.first.substr(0, nameWithoutDot.length() + 1) || nameWithoutDot == name_type.first);
|
||||
}
|
||||
|
||||
|
||||
void StorageMergeTree::removeColumn(String column_name)
|
||||
void StorageMergeTree::removeColumnFiles(String column_name)
|
||||
{
|
||||
Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
|
||||
Poco::ScopedLock<Poco::FastMutex> lock_all(all_data_parts_mutex);
|
||||
|
||||
/// Удаляем колонки из листа columns
|
||||
bool is_first = true;
|
||||
NamesAndTypesList::iterator column_it;
|
||||
do
|
||||
{
|
||||
column_it = std::find_if(columns->begin(), columns->end(), boost::bind(namesEqual, column_name, _1));
|
||||
|
||||
if (column_it == columns->end())
|
||||
{
|
||||
if (is_first)
|
||||
throw DB::Exception("Wrong column name. Cannot find column to drop", DB::ErrorCodes::ILLEGAL_COLUMN);
|
||||
}
|
||||
else
|
||||
columns->erase(column_it);
|
||||
is_first = false;
|
||||
}
|
||||
while (column_it != columns->end());
|
||||
|
||||
/// Регэксп выбирает файлы столбца для удаления
|
||||
Poco::RegularExpression re(column_name + "(?:(?:\\.|\\%2E).+){0,1}" +"(?:\\.mrk|\\.bin|\\.size\\d+\\.bin|\\.size\\d+\\.mrk)");
|
||||
/// Цикл по всем директориям кусочков
|
||||
@ -1007,51 +981,18 @@ void StorageMergeTree::removeColumn(String column_name)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void StorageMergeTree::alter(const ASTAlterQuery::Parameters & params)
|
||||
{
|
||||
if (params.type == ASTAlterQuery::ADD)
|
||||
{
|
||||
Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
|
||||
Poco::ScopedLock<Poco::FastMutex> lock_all(all_data_parts_mutex);
|
||||
|
||||
NamesAndTypesList::iterator insert_it = columns->end();
|
||||
if (params.column)
|
||||
{
|
||||
String column_name = dynamic_cast<const ASTIdentifier &>(*params.column).name;
|
||||
|
||||
/// Пытаемся найти первую с конца колонку с именем column_name или column_name.*
|
||||
NamesAndTypesList::reverse_iterator reverse_insert_it = std::find_if(columns->rbegin(), columns->rend(), boost::bind(namesEqual, column_name, _1) );
|
||||
|
||||
if (reverse_insert_it == columns->rend())
|
||||
throw DB::Exception("Wrong column name. Cannot find column to insert after", DB::ErrorCodes::ILLEGAL_COLUMN);
|
||||
else
|
||||
{
|
||||
/// base возвращает итератор уже смещенный на один элемент вправо
|
||||
insert_it = reverse_insert_it.base();
|
||||
}
|
||||
}
|
||||
|
||||
const ASTNameTypePair & ast_name_type = dynamic_cast<const ASTNameTypePair &>(*params.name_type);
|
||||
StringRange type_range = ast_name_type.type->range;
|
||||
String type_string = String(type_range.first, type_range.second - type_range.first);
|
||||
|
||||
DB::DataTypePtr data_type = context.getDataTypeFactory().get(type_string);
|
||||
NameAndTypePair pair(ast_name_type.name, data_type );
|
||||
columns->insert(insert_it, pair);
|
||||
|
||||
/// Медленно, так как каждый раз копируется список
|
||||
columns = DataTypeNested::expandNestedColumns(*columns);
|
||||
return;
|
||||
alter_columns(params, columns, context);
|
||||
}
|
||||
else if (params.type == ASTAlterQuery::DROP)
|
||||
if (params.type == ASTAlterQuery::DROP)
|
||||
{
|
||||
String column_name = dynamic_cast<const ASTIdentifier &>(*params.column).name;
|
||||
removeColumn(column_name);
|
||||
removeColumnFiles(column_name);
|
||||
}
|
||||
else
|
||||
throw Exception("Wrong parameter type in ALTER query", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
}
|
||||
|
||||
|
||||
@ -1082,7 +1023,7 @@ bool StorageMergeTree::removeIfBroken(const String & path)
|
||||
{
|
||||
Poco::File marks_file(path + "/" + escapeForFileName(it->first) + ".mrk");
|
||||
|
||||
/// Такое случается при добавлении нового столбца в таблицу. Не будем ничего удалять.
|
||||
/// при добавлении нового столбца в таблицу файлы .mrk не создаются. Не будем ничего удалять.
|
||||
if (!marks_file.exists())
|
||||
return false;
|
||||
|
||||
|
28
dbms/src/Storages/tests/test_alter_distributed.sql
Normal file
28
dbms/src/Storages/tests/test_alter_distributed.sql
Normal file
@ -0,0 +1,28 @@
|
||||
create database if not exists test;
|
||||
|
||||
drop table if exists test.merge_distributed;
|
||||
drop table if exists test.merge_distributed1;
|
||||
|
||||
create table test.merge_distributed1 ( CounterID UInt32, StartDate Date, Sign Int8, VisitID UInt64, UserID UInt64, StartTime DateTime, ClickLogID UInt64) ENGINE = CollapsingMergeTree(StartDate, intHash32(UserID), tuple(CounterID, StartDate, intHash32(UserID), VisitID, ClickLogID), 8192, Sign);
|
||||
insert into test.merge_distributed1 values (1, '2013-09-19', 1, 0, 2, '2013-09-19 12:43:06', 3);
|
||||
|
||||
create table test.merge_distributed ( CounterID UInt32, StartDate Date, Sign Int8, VisitID UInt64, UserID UInt64, StartTime DateTime, ClickLogID UInt64) ENGINE = Distributed(self, test, merge_distributed1);
|
||||
|
||||
alter table test.merge_distributed1 add column dummy String after CounterID;
|
||||
alter table test.merge_distributed add column dummy String after CounterID;
|
||||
|
||||
describe table test.merge_distributed;
|
||||
show create table test.merge_distributed;
|
||||
|
||||
insert into test.merge_distributed1 values (1, 'Hello, Alter Table!','2013-09-19', 1, 0, 2, '2013-09-19 12:43:06', 3);
|
||||
select CounterID, dummy from test.merge_distributed where dummy <> '' limit 10;
|
||||
|
||||
alter table test.merge_distributed drop column dummy;
|
||||
|
||||
describe table test.merge_distributed;
|
||||
show create table test.merge_distributed;
|
||||
|
||||
--error: должен упасть, так как не существует столбца dummy1
|
||||
alter table test.merge_distributed add column dummy1 String after CounterID;
|
||||
select CounterID, dummy1 from test.merge_distributed where dummy1 <> '' limit 10;
|
||||
|
35
dbms/src/Storages/tests/test_alter_merge.sql
Normal file
35
dbms/src/Storages/tests/test_alter_merge.sql
Normal file
@ -0,0 +1,35 @@
|
||||
create database if not exists test;
|
||||
|
||||
drop table if exists test.merge;
|
||||
drop table if exists test.merge1;
|
||||
drop table if exists test.merge2;
|
||||
|
||||
create table test.merge1 ( CounterID UInt32, StartDate Date, Sign Int8, VisitID UInt64, UserID UInt64, StartTime DateTime, ClickLogID UInt64) ENGINE = CollapsingMergeTree(StartDate, intHash32(UserID), tuple(CounterID, StartDate, intHash32(UserID), VisitID, ClickLogID), 8192, Sign);
|
||||
insert into test.merge1 values (1, '2013-09-19', 1, 0, 2, '2013-09-19 12:43:06', 3);
|
||||
|
||||
create table test.merge2 ( CounterID UInt32, StartDate Date, Sign Int8, VisitID UInt64, UserID UInt64, StartTime DateTime, ClickLogID UInt64) ENGINE = CollapsingMergeTree(StartDate, intHash32(UserID), tuple(CounterID, StartDate, intHash32(UserID), VisitID, ClickLogID), 8192, Sign);
|
||||
insert into test.merge2 values (2, '2013-09-19', 1, 0, 2, '2013-09-19 12:43:06', 3);
|
||||
|
||||
create table test.merge ( CounterID UInt32, StartDate Date, Sign Int8, VisitID UInt64, UserID UInt64, StartTime DateTime, ClickLogID UInt64) ENGINE = Merge(test, 'merge\[0-9\]');
|
||||
|
||||
alter table test.merge1 add column dummy String after CounterID;
|
||||
alter table test.merge2 add column dummy String after CounterID;
|
||||
alter table test.merge add column dummy String after CounterID;
|
||||
|
||||
describe table test.merge;
|
||||
show create table test.merge;
|
||||
|
||||
insert into test.merge1 values (1, 'Hello, Alter Table!','2013-09-19', 1, 0, 2, '2013-09-19 12:43:06', 3);
|
||||
|
||||
select CounterID, dummy from test.merge where dummy <> '' limit 10;
|
||||
|
||||
|
||||
alter table test.merge drop column dummy;
|
||||
|
||||
describe table test.merge;
|
||||
show create table test.merge;
|
||||
|
||||
--error: должно правильно упасть в alter
|
||||
alter table test.merge add column dummy1 String after CounterID;
|
||||
select CounterID, dummy1 from test.merge where dummy1 <> '' limit 10;
|
||||
|
17
dbms/src/Storages/tests/test_alter_merge_tree.sql
Normal file
17
dbms/src/Storages/tests/test_alter_merge_tree.sql
Normal file
@ -0,0 +1,17 @@
|
||||
create database if not exists test;
|
||||
|
||||
drop table if exists test.merge_tree;
|
||||
|
||||
create table test.merge_tree ( CounterID UInt32, StartDate Date, Sign Int8, VisitID UInt64, UserID UInt64, StartTime DateTime, ClickLogID UInt64) ENGINE = CollapsingMergeTree(StartDate, intHash32(UserID), tuple(CounterID, StartDate, intHash32(UserID), VisitID, ClickLogID), 8192, Sign);
|
||||
|
||||
insert into test.merge_tree values (1, '2013-09-19', 1, 0, 2, '2013-09-19 12:43:06', 3)
|
||||
alter table test.merge_tree add column dummy String after CounterID;
|
||||
describe table test.merge_tree;
|
||||
|
||||
insert into test.merge_tree values (1, 'Hello, Alter Table!','2013-09-19', 1, 0, 2, '2013-09-19 12:43:06', 3)
|
||||
|
||||
select CounterID, dummy from test.merge_tree where dummy <> '' limit 10;
|
||||
|
||||
alter table test.merge_tree drop column dummy;
|
||||
|
||||
describe table test.merge_tree;
|
91
dbms/tests/test_runner.sh
Executable file
91
dbms/tests/test_runner.sh
Executable file
@ -0,0 +1,91 @@
|
||||
#!/bin/bash
|
||||
# script to run query to databases
|
||||
|
||||
function usage()
|
||||
{
|
||||
cat <<EOF
|
||||
usage: $0 test_queries.sql
|
||||
|
||||
This script run queries from test file and check output
|
||||
|
||||
OPTIONS:
|
||||
-h Show this message
|
||||
EOF
|
||||
}
|
||||
|
||||
TIMES=1
|
||||
test_file=$1
|
||||
log=$test_file"_log_$(date +%H_%M_%d_%m_%Y)"
|
||||
echo "Log to $log"
|
||||
|
||||
while getopts “h” OPTION
|
||||
do
|
||||
case $OPTION in
|
||||
h)
|
||||
usage
|
||||
exit 0
|
||||
;;
|
||||
?)
|
||||
usage
|
||||
exit 0
|
||||
;;
|
||||
esac
|
||||
done
|
||||
|
||||
if [[ ! -f $test_file ]]; then
|
||||
echo "Not found: test file"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
function execute()
|
||||
{
|
||||
echo "start time: $(date)" > $log
|
||||
queries=("${@}")
|
||||
queries_count=${#queries[@]}
|
||||
|
||||
index=0
|
||||
while [ "$index" -lt "$queries_count" ]; do
|
||||
query=${queries[$index]}
|
||||
|
||||
if [[ $query == "" ]]; then
|
||||
let "index = $index + 1"
|
||||
continue
|
||||
fi
|
||||
|
||||
comment_re='--.*'
|
||||
if [[ $query =~ $comment_re ]]; then
|
||||
echo "$query"
|
||||
echo
|
||||
else
|
||||
echo "query:" "$query"
|
||||
expect -c "#!/bin/bash
|
||||
#!/bin/expect
|
||||
|
||||
# Set timeout
|
||||
set timeout 600
|
||||
|
||||
# Get arguments
|
||||
set query [lindex $argv 0]
|
||||
|
||||
spawn clickhouse-client --multiline;
|
||||
expect \":) \"
|
||||
send \"$query;\r\";
|
||||
expect \":) \"
|
||||
send \"quit\";" >> "$log"
|
||||
fi
|
||||
let "index = $index + 1"
|
||||
done
|
||||
|
||||
echo "stop time: $(date)" >> $log
|
||||
}
|
||||
|
||||
mapfile -t test_queries < $test_file
|
||||
|
||||
execute "${test_queries[@]}"
|
||||
|
||||
echo "Error list"
|
||||
cat $log
|
||||
|
||||
echo
|
||||
echo Error list\:
|
||||
cat $log | grep -iP 'error|exception'
|
Loading…
Reference in New Issue
Block a user