This commit is contained in:
Pavel Kartavyy 2013-08-07 13:07:42 +00:00
parent 82af5f8e41
commit a5c799d7a3
12 changed files with 486 additions and 5 deletions

View File

@ -0,0 +1,23 @@
#pragma once
#include <DB/Storages/IStorage.h>
#include <DB/Interpreters/Context.h>
namespace DB
{
/** Позволяет добавить или удалить столбец в таблицу
*/
class InterpreterAlterQuery
{
public:
InterpreterAlterQuery(ASTPtr query_ptr_, Context & context_);
void execute();
private:
ASTPtr query_ptr;
Context context;
};
}

View File

@ -0,0 +1,64 @@
#pragma once
#include <DB/Parsers/IAST.h>
#include <boost/concept_check.hpp>
namespace DB
{
/** ALTER запрос
* ALTER TABLE [db.]name_type
* ADD COLUMN col_name type [AFTER col_after],
* DROP COLUMN col_drop,
* ...
*/
class ASTAlterQuery : public IAST
{
public:
enum ParameterType
{
ADD,
DROP,
NO_TYPE
};
struct Parameters
{
Parameters() : type(NO_TYPE) {}
int type;
/// Пара имя, тип используется в ADD COLUMN
ASTPtr name_type;
ASTPtr column;
/// deep copy
void clone( Parameters & p) const
{
p.type = type;
p.column = column->clone();
p.name_type = name_type->clone();
}
};
typedef std::vector<Parameters> ParameterContainer;
ParameterContainer parameters;
String database;
String table;
ASTAlterQuery(StringRange range_) : IAST(range_) {};
/** Получить текст, который идентифицирует этот элемент. */
String getID() const { return ("AlterQuery_" + database + "_" + table); };
ASTPtr clone() const
{
ASTAlterQuery * res = new ASTAlterQuery(*this);
for(ParameterContainer::size_type i = 0; i < parameters.size(); ++i)
{
parameters[i].clone(res->parameters[i]);
}
return res;
}
};
}

View File

@ -0,0 +1,20 @@
#pragma once
#include <DB/Parsers/IParserBase.h>
#include <DB/Parsers/ExpressionElementParsers.h>
namespace DB
{
/** Запрос типа такого:
* ALTER TABLE [db.]name
* [ADD COLUMN col_type [AFTER col_after],]
* [DROP COLUMN col_drop, ...]
*/
class ParserAlterQuery : public IParserBase
{
protected:
String getName() { return "ALTER query"; }
bool parseImpl(Pos & pos, Pos end, ASTPtr & node, String & expected);
};
}

View File

@ -21,6 +21,7 @@
#include <DB/Parsers/ASTAsterisk.h>
#include <DB/Parsers/ASTOrderByElement.h>
#include <DB/Parsers/ASTSubquery.h>
#include <DB/Parsers/ASTAlterQuery.h>
namespace DB
@ -52,5 +53,6 @@ void formatAST(const ASTAsterisk & ast, std::ostream & s, size_t indent = 0, bo
void formatAST(const ASTOrderByElement & ast, std::ostream & s, size_t indent = 0, bool hilite = true, bool one_line = false);
void formatAST(const ASTSubquery & ast, std::ostream & s, size_t indent = 0, bool hilite = true, bool one_line = false);
void formatAST(const ASTQueryWithTableAndOutput & ast, std::string name, std::ostream & s, size_t indent = 0, bool hilite = true, bool one_line = false);
void formatAST(const ASTAlterQuery & ast, std::ostream & s, size_t indent = 0, bool hilite = true, bool one_line = false);
}

View File

@ -10,6 +10,7 @@
#include <DB/DataStreams/IBlockInputStream.h>
#include <DB/DataStreams/IBlockOutputStream.h>
#include <DB/Parsers/IAST.h>
#include <DB/Parsers/ASTAlterQuery.h>
#include <DB/Interpreters/Settings.h>
#include <DB/Storages/StoragePtr.h>
#include "DatabaseDropper.h"
@ -122,7 +123,7 @@ public:
/** ALTER таблицы в виде изменения столбцов, не затрагивающий изменение Storage или его параметров.
* (ALTER, затрагивающий изменение движка, делается внешним кодом, путём копирования данных.)
*/
virtual void alter(NamesAndTypesListPtr columns)
virtual void alter(const ASTAlterQuery::Parameters &params)
{
throw Exception("Method alter is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}

View File

@ -152,6 +152,8 @@ public:
void rename(const String & new_path_to_db, const String & new_name);
void alter(const ASTAlterQuery::Parameters &params);
private:
String path;
String name;
@ -180,6 +182,9 @@ private:
Logger * log;
/// Регулярное выражение соответсвующее названию директории с кусочками
Poco::RegularExpression file_name_regexp;
/// Описание куска с данными.
struct DataPart
{
@ -319,6 +324,12 @@ private:
void joinMergeThreads();
Poco::SharedPtr<boost::threadpool::pool> merge_threads;
/// берет нерекурсивные блокировки data_parts_mutex и all_data_parts_mutex
void removeColumnFiles(String column_name);
/// Возвращает true если имя директории совпадает с форматом имени директории кусочков
bool isBlockDirectory(const String dir_name, Poco::RegularExpression::MatchVec & matches);
};
}

View File

@ -0,0 +1,117 @@
#include <DB/Interpreters/InterpreterAlterQuery.h>
#include <DB/Parsers/ASTAlterQuery.h>
#include <DB/Parsers/ASTCreateQuery.h>
#include <DB/Parsers/ASTExpressionList.h>
#include <DB/Parsers/ASTNameTypePair.h>
#include <DB/Parsers/ASTIdentifier.h>
#include <DB/Parsers/ParserCreateQuery.h>
#include <DB/IO/copyData.h>
#include <DB/Common/escapeForFileName.h>
#include <DB/Parsers/formatAST.h>
#include <algorithm>
#include <boost/bind.hpp>
#include <boost/bind/placeholders.hpp>
namespace DB
{
InterpreterAlterQuery::InterpreterAlterQuery(ASTPtr query_ptr_, Context & context_)
: query_ptr(query_ptr_), context(context_)
{
}
bool namesEqual( const String &name, const ASTPtr & name_type_)
{
const ASTNameTypePair &name_type = dynamic_cast<const ASTNameTypePair &>(*name_type_);
return name_type.name == name;
}
void InterpreterAlterQuery::execute()
{
/// Poco::Mutex является рекурсивным, т.е. взятие мьютекса дважды из одного потока не приводит к блокировке
Poco::ScopedLock<Poco::Mutex> lock(context.getMutex());
ASTAlterQuery &alter = dynamic_cast<ASTAlterQuery &>(*query_ptr);
String database_name = alter.database.empty() ? context.getCurrentDatabase() : alter.database;
String &table_name = alter.table;
String database_name_escaped = escapeForFileName(database_name);
String table_name_escaped = escapeForFileName(table_name);
StoragePtr table = context.getTable(database_name, table_name);
String path = context.getPath();
String metadata_path = path + "metadata/" + database_name_escaped + "/" + (!table_name.empty() ? table_name_escaped + ".sql" : "");
ASTPtr attach_ptr = context.getCreateQuery(database_name, table_name);
ASTCreateQuery & attach = dynamic_cast< ASTCreateQuery &>(*attach_ptr);
attach.attach = true;
ASTs & columns = dynamic_cast<ASTExpressionList &>( *attach.columns).children;
const ASTFunction & storage = dynamic_cast<const ASTFunction &>(*attach.storage);
String engine_string = storage.getColumnName();
Poco::RegularExpression::MatchVec matches;
const DataTypeFactory & data_type_factory = context.getDataTypeFactory();
for (ASTAlterQuery::ParameterContainer::const_iterator alter_it = alter.parameters.begin();
alter_it != alter.parameters.end(); ++alter_it)
{
const ASTAlterQuery::Parameters & params = *alter_it;
if (params.type == ASTAlterQuery::ADD)
{
const ASTNameTypePair &name_type = dynamic_cast<const ASTNameTypePair &>(*params.name_type);
StringRange type_range = name_type.type->range;
/// проверяем корректность типа. В случае некоректного типа будет исключение
data_type_factory.get(String(type_range.first, type_range.second - type_range.first));
/// Проверяем, что колонка еще не существует
if (std::find_if(columns.begin(), columns.end(), boost::bind(namesEqual, name_type.name, _1)) != columns.end())
throw DB::Exception("Wrong column name. Column already exists", DB::ErrorCodes::ILLEGAL_COLUMN);
/// Проверяем опциональный аргумент AFTER
if (params.column)
{
const ASTIdentifier & col_after = dynamic_cast<const ASTIdentifier &>(*params.column);
ASTs::iterator insert_it = std::find_if(columns.begin(), columns.end(), boost::bind(namesEqual, col_after.name, _1)) ;
if (insert_it == columns.end())
throw DB::Exception("Wrong column name. Cannot find column to insert after", DB::ErrorCodes::ILLEGAL_COLUMN);
else
{
// increase iterator because we want to insert after founded element not before
++insert_it;
columns.insert(insert_it, params.name_type);
}
}
else
columns.push_back(params.name_type);
}
else
if (params.type == ASTAlterQuery::DROP)
{
/// Проверяем, что поле не является ключевым
const ASTIdentifier & drop_column = dynamic_cast <const ASTIdentifier &>(*params.column);
Poco::RegularExpression key_column_re("[\\(,\\s]\\s*" + drop_column.name + "\\s*[,\\)]");
if (key_column_re.match(engine_string, 0, matches))
throw DB::Exception("Cannot drop key column", DB::ErrorCodes::ILLEGAL_COLUMN);
ASTs::iterator drop_it = std::find_if(columns.begin(), columns.end(), boost::bind(namesEqual, drop_column.name, _1));
if (drop_it == columns.end())
throw DB::Exception("Wrong column name. Cannot find column to drop", DB::ErrorCodes::ILLEGAL_COLUMN);
else
columns.erase(drop_it);
}
table->alter(params);
/// Перезаписываем файл метадата каждую итерацию
Poco::FileOutputStream ostr(metadata_path);
formatAST(attach, ostr, 0, false);
}
}
}

View File

@ -7,6 +7,7 @@
#include <DB/Parsers/ASTUseQuery.h>
#include <DB/Parsers/ASTSetQuery.h>
#include <DB/Parsers/ASTOptimizeQuery.h>
#include <DB/Parsers/ASTAlterQuery.h>
#include <DB/Parsers/TablePropertiesQueriesASTs.h>
#include <DB/Interpreters/InterpreterSelectQuery.h>
@ -22,6 +23,7 @@
#include <DB/Interpreters/InterpreterDescribeQuery.h>
#include <DB/Interpreters/InterpreterShowCreateQuery.h>
#include <DB/Interpreters/InterpreterQuery.h>
#include <DB/Interpreters/InterpreterAlterQuery.h>
namespace DB
@ -102,6 +104,12 @@ void InterpreterQuery::execute(WriteBuffer & ostr, ReadBuffer * remaining_data_i
InterpreterDescribeQuery interpreter(query_ptr, context);
query_plan = interpreter.executeAndFormat(ostr);
}
else if (dynamic_cast<ASTAlterQuery *>(&*query_ptr))
{
throwIfReadOnly();
InterpreterAlterQuery interpreter(query_ptr, context);
interpreter.execute();
}
else
throw Exception("Unknown type of query: " + query_ptr->getID(), ErrorCodes::UNKNOWN_TYPE_OF_QUERY);
}
@ -179,6 +187,12 @@ BlockIO InterpreterQuery::execute()
InterpreterDescribeQuery interpreter(query_ptr, context);
res = interpreter.execute();
}
else if (dynamic_cast<ASTAlterQuery *>(&*query_ptr))
{
throwIfReadOnly();
InterpreterAlterQuery interpreter(query_ptr, context);
interpreter.execute();
}
else
throw Exception("Unknown type of query: " + query_ptr->getID(), ErrorCodes::UNKNOWN_TYPE_OF_QUERY);

View File

@ -0,0 +1,131 @@
#include <DB/Parsers/ParserAlterQuery.h>
#include <DB/Parsers/CommonParsers.h>
#include <DB/Parsers/ExpressionElementParsers.h>
#include <DB/Parsers/ParserCreateQuery.h>
#include <boost/concept_check.hpp>
#include <DB/Parsers/ASTIdentifier.h>
#include <DB/Parsers/ASTAlterQuery.h>
namespace DB
{
bool ParserAlterQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, String & expected)
{
Pos begin = pos;
ParserWhiteSpaceOrComments ws;
ParserString s_alter("ALTER", true, true);
ParserString s_table("TABLE", true, true);
ParserString s_dot(".");
ParserString s_add("ADD", true, true);
ParserString s_column("COLUMN", true, true);
ParserString s_after("AFTER", true, true);
ParserString s_drop("DROP", true, true);
ParserString s_comma(",");
ParserIdentifier parser_name;
ParserNameTypePair parser_name_type;
ASTPtr table;
ASTPtr database;
ASTPtr col_type;
ASTPtr col_after;
ASTPtr col_drop;
ASTAlterQuery * query = new ASTAlterQuery(StringRange(begin, pos));
ASTPtr temp_node = query;
ws.ignore(pos, end);
if (!s_alter.ignore(pos, end, expected))
{
return false;
}
ws.ignore(pos, end);
if (!s_table.ignore(pos, end, expected))
{
return false;
}
ws.ignore(pos, end);
if (!parser_name.parse(pos, end, database, expected))
return false;
// Parse [db].name
if (s_dot.ignore(pos, end))
{
if (!parser_name.parse(pos, end, table, expected))
return false;
query->table = dynamic_cast<ASTIdentifier &>(*table).name;
query->database = dynamic_cast<ASTIdentifier &>(*database).name;
}
else
{
table = database;
query->table = dynamic_cast<ASTIdentifier &>(*table).name;
}
bool parsing_finished = false;
do
{
ASTAlterQuery::Parameters params;
ws.ignore(pos, end);
if (s_add.ignore(pos, end, expected))
{
ws.ignore(pos, end);
s_column.ignore(pos, end, expected);
ws.ignore(pos, end);
parser_name_type.parse(pos, end, params.name_type, expected);
ws.ignore(pos, end);
if (s_after.ignore(pos, end, expected))
{
ws.ignore(pos, end);
if(!parser_name.parse(pos, end, params.column, expected))
return false;
}
params.type = ASTAlterQuery::ADD;
}
else
{
if (s_drop.ignore(pos, end, expected))
{
ws.ignore(pos, end);
s_column.ignore(pos, end, expected);
ws.ignore(pos, end);
parser_name.parse(pos, end, params.column, expected);
params.type = ASTAlterQuery::DROP;
} else
return false;
}
ws.ignore(pos, end);
if (!s_comma.ignore(pos, end, expected))
{
ws.ignore(pos, end);
parsing_finished = true;
}
query->parameters.push_back(params);
}
while (!parsing_finished);
node = temp_node;
return true;
}
}

View File

@ -10,6 +10,7 @@
#include <DB/Parsers/ParserSetQuery.h>
#include <DB/Parsers/ParserQuery.h>
#include <DB/Parsers/ParserTablePropertiesQuery.h>
#include <DB/Parsers/ParserAlterQuery.h>
namespace DB
@ -24,6 +25,7 @@ bool ParserQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, String & expected
ParserCreateQuery create_p;
ParserRenameQuery rename_p;
ParserDropQuery drop_p;
ParserAlterQuery alter_p;
ParserUseQuery use_p;
ParserSetQuery set_p;
ParserOptimizeQuery optimize_p;
@ -35,13 +37,14 @@ bool ParserQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, String & expected
|| create_p.parse(pos, end, node, expected)
|| rename_p.parse(pos, end, node, expected)
|| drop_p.parse(pos, end, node, expected)
|| alter_p.parse(pos, end, node, expected)
|| use_p.parse(pos, end, node, expected)
|| set_p.parse(pos, end, node, expected)
|| optimize_p.parse(pos, end, node, expected)
|| table_p.parse(pos, end, node, expected);
if (!res)
expected = "One of: SHOW TABLES, SHOW DATABASES, SHOW CREATE TABLE, SELECT, INSERT, CREATE, ATTACH, RENAME, DROP, DETACH, USE, SET, OPTIMIZE, EXISTS, DESCRIBE, DESC";
expected = "One of: SHOW TABLES, SHOW DATABASES, SHOW CREATE TABLE, SELECT, INSERT, CREATE, ATTACH, RENAME, DROP, DETACH, USE, SET, OPTIMIZE, EXISTS, DESCRIBE, DESC, ALTER";
return res;
}

View File

@ -65,6 +65,7 @@ void formatAST(const IAST & ast, std::ostream & s, size_t indent, bool hilite, b
DISPATCH(Asterisk)
DISPATCH(OrderByElement)
DISPATCH(Subquery)
DISPATCH(AlterQuery)
else
throw DB::Exception("Unknown element in AST: " + ast.getID() + " '" + std::string(ast.range.first, ast.range.second - ast.range.first) + "'",
ErrorCodes::UNKNOWN_ELEMENT_IN_AST);
@ -450,5 +451,53 @@ void formatAST(const ASTOrderByElement & ast, std::ostream & s, size_t indent,
}
}
void formatAST(const ASTAlterQuery & ast, std::ostream & s, size_t indent, bool hilite, bool one_line)
{
std::string nl_or_nothing = one_line ? "" : "\n";
std::string indent_str = one_line ? "" : std::string(4 * indent, ' ');
std::string nl_or_ws = one_line ? " " : "\n";
s << (hilite ? hilite_keyword : "") << indent_str << "ALTER TABLE " << (hilite ? hilite_none : "");
if (!ast.table.empty())
{
if (!ast.database.empty())
{
s << (hilite ? hilite_keyword : "") << indent_str << ast.database << (hilite ? hilite_none : "");
s << ".";
}
s << (hilite ? hilite_keyword : "") << indent_str << ast.table << (hilite ? hilite_none : "");
}
s << nl_or_ws;
for( std::size_t i = 0; i < ast.parameters.size(); ++i)
{
const ASTAlterQuery::Parameters &p = ast.parameters[i];
if (p.type == ASTAlterQuery::ADD)
{
s << (hilite ? hilite_keyword : "") << indent_str << "ADD COLUMN " << (hilite ? hilite_none : "");
formatAST(*p.name_type, s, indent, hilite, true);
/// AFTER
if (p.column)
{
s << (hilite ? hilite_keyword : "") << indent_str << " AFTER " << (hilite ? hilite_none : "");
formatAST(*p.column, s, indent, hilite, one_line);
}
}
else if (p.type == ASTAlterQuery::DROP)
{
s << (hilite ? hilite_keyword : "") << indent_str << "DROP COLUMN " << (hilite ? hilite_none : "");
formatAST(*p.column, s, indent, hilite, true);
}
std::string comma = (i < (ast.parameters.size() -1) ) ? "," : "";
s << (hilite ? hilite_keyword : "") << indent_str << comma << (hilite ? hilite_none : "");
s << nl_or_ws;
}
}
}

View File

@ -38,6 +38,7 @@
#include <DB/Parsers/ASTFunction.h>
#include <DB/Parsers/ASTLiteral.h>
#include <DB/Parsers/ASTIdentifier.h>
#include <DB/Parsers/ASTNameTypePair.h>
#include <DB/Interpreters/sortBlock.h>
#include <DB/Interpreters/ExpressionAnalyzer.h>
@ -66,7 +67,8 @@ StorageMergeTree::StorageMergeTree(
index_granularity(index_granularity_),
sign_column(sign_column_),
settings(settings_),
increment(full_path + "increment.txt"), log(&Logger::get("StorageMergeTree: " + name))
increment(full_path + "increment.txt"), log(&Logger::get("StorageMergeTree: " + name)),
file_name_regexp("^(\\d{8})_(\\d{8})_(\\d+)_(\\d+)_(\\d+)")
{
min_marks_for_seek = (settings.min_rows_for_seek + index_granularity - 1) / index_granularity;
min_marks_for_concurrent_read = (settings.min_rows_for_concurrent_read + index_granularity - 1) / index_granularity;
@ -490,14 +492,13 @@ void StorageMergeTree::loadDataParts()
Yandex::DateLUTSingleton & date_lut = Yandex::DateLUTSingleton::instance();
data_parts.clear();
static Poco::RegularExpression file_name_regexp("^(\\d{8})_(\\d{8})_(\\d+)_(\\d+)_(\\d+)");
Poco::DirectoryIterator end;
Poco::RegularExpression::MatchVec matches;
for (Poco::DirectoryIterator it(full_path); it != end; ++it)
{
std::string file_name = it.name();
if (!(file_name_regexp.match(file_name, 0, matches) && 6 == matches.size()))
if (!isBlockDirectory(file_name, matches))
continue;
DataPartPtr part = new DataPart(*this);
@ -914,4 +915,49 @@ void StorageMergeTree::dropImpl()
Poco::File(full_path).remove(true);
}
void StorageMergeTree::removeColumnFiles(String column_name)
{
Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
Poco::ScopedLock<Poco::FastMutex> lock_all(all_data_parts_mutex);
Poco::DirectoryIterator end;
Poco::RegularExpression::MatchVec matches;
for (Poco::DirectoryIterator it = Poco::DirectoryIterator(full_path); it != end; ++it)
{
std::string file_name = it.name();
if (!isBlockDirectory(file_name, matches))
continue;
String full_dir_name = full_path + file_name + "/";
Poco::File file(full_dir_name + column_name + ".mrk");
if (file.exists())
file.remove();
file = Poco::File(full_dir_name + column_name + ".bin");
if (file.exists())
file.remove();
}
}
void StorageMergeTree::alter(const ASTAlterQuery::Parameters &params)
{
if (params.type == ASTAlterQuery::ADD)
return;
if (params.type == ASTAlterQuery::DROP)
{
const ASTIdentifier & column = dynamic_cast<const ASTIdentifier &>(*params.column);
removeColumnFiles(column.name);
}
else
throw Exception("Wrong parameter type in alter query");
}
bool StorageMergeTree::isBlockDirectory(const String dir_name, Poco::RegularExpression::MatchVec & matches)
{
return (file_name_regexp.match(dir_name, 0, matches) && 6 == matches.size());
}
}