Merge pull request #2504 from yandex/system-mutations-table

[WIP] system.mutations table skeleton
This commit is contained in:
alexey-milovidov 2018-06-13 23:00:30 +03:00 committed by GitHub
commit 08ec751a8c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 1028 additions and 755 deletions

View File

@ -2,6 +2,8 @@
#include <DataStreams/FilterBlockInputStream.h> #include <DataStreams/FilterBlockInputStream.h>
#include <Parsers/ASTFunction.h> #include <Parsers/ASTFunction.h>
#include <Interpreters/ExpressionAnalyzer.h> #include <Interpreters/ExpressionAnalyzer.h>
#include <IO/WriteHelpers.h>
namespace DB namespace DB
{ {
@ -32,7 +34,7 @@ ApplyingMutationsBlockInputStream::ApplyingMutationsBlockInputStream(
break; break;
} }
default: default:
throw Exception("Unsupported mutation cmd type: " + toString(static_cast<int>(cmd.type)), throw Exception("Unsupported mutation cmd type: " + toString<int>(cmd.type),
ErrorCodes::LOGICAL_ERROR); ErrorCodes::LOGICAL_ERROR);
} }
} }

View File

@ -207,12 +207,12 @@ static std::unique_ptr<zkutil::Lock> createSimpleZooKeeperLock(
static bool isSupportedAlterType(int type) static bool isSupportedAlterType(int type)
{ {
static const std::unordered_set<int> supported_alter_types{ static const std::unordered_set<int> supported_alter_types{
ASTAlterQuery::ADD_COLUMN, ASTAlterCommand::ADD_COLUMN,
ASTAlterQuery::DROP_COLUMN, ASTAlterCommand::DROP_COLUMN,
ASTAlterQuery::MODIFY_COLUMN, ASTAlterCommand::MODIFY_COLUMN,
ASTAlterQuery::MODIFY_PRIMARY_KEY, ASTAlterCommand::MODIFY_PRIMARY_KEY,
ASTAlterQuery::DROP_PARTITION, ASTAlterCommand::DROP_PARTITION,
ASTAlterQuery::DELETE, ASTAlterCommand::DELETE,
}; };
return supported_alter_types.count(type) != 0; return supported_alter_types.count(type) != 0;
@ -621,13 +621,13 @@ void DDLWorker::processTaskAlter(
bool execute_once_on_replica = storage->supportsReplication(); bool execute_once_on_replica = storage->supportsReplication();
bool execute_on_leader_replica = false; bool execute_on_leader_replica = false;
for (const auto & param : ast_alter->parameters) for (const auto & command : ast_alter->command_list->commands)
{ {
if (!isSupportedAlterType(param.type)) if (!isSupportedAlterType(command->type))
throw Exception("Unsupported type of ALTER query", ErrorCodes::NOT_IMPLEMENTED); throw Exception("Unsupported type of ALTER query", ErrorCodes::NOT_IMPLEMENTED);
if (execute_once_on_replica) if (execute_once_on_replica)
execute_on_leader_replica |= param.type == ASTAlterQuery::DROP_PARTITION; execute_on_leader_replica |= command->type == ASTAlterCommand::DROP_PARTITION;
} }
const auto & shard_info = task.cluster->getShardsInfo().at(task.host_shard_num); const auto & shard_info = task.cluster->getShardsInfo().at(task.host_shard_num);
@ -1142,9 +1142,9 @@ BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr_, const Context & cont
if (auto query_alter = dynamic_cast<const ASTAlterQuery *>(query_ptr.get())) if (auto query_alter = dynamic_cast<const ASTAlterQuery *>(query_ptr.get()))
{ {
for (const auto & param : query_alter->parameters) for (const auto & command : query_alter->command_list->commands)
{ {
if (!isSupportedAlterType(param.type)) if (!isSupportedAlterType(command->type))
throw Exception("Unsupported type of ALTER query", ErrorCodes::NOT_IMPLEMENTED); throw Exception("Unsupported type of ALTER query", ErrorCodes::NOT_IMPLEMENTED);
} }
} }

View File

@ -1,21 +1,7 @@
#include <Interpreters/InterpreterAlterQuery.h> #include <Interpreters/InterpreterAlterQuery.h>
#include <Interpreters/DDLWorker.h> #include <Interpreters/DDLWorker.h>
#include <Parsers/ASTAlterQuery.h> #include <Parsers/ASTAlterQuery.h>
#include <Parsers/ASTCreateQuery.h> #include <Common/typeid_cast.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTNameTypePair.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ParserCreateQuery.h>
#include <IO/copyData.h>
#include <IO/ReadBufferFromFile.h>
#include <Common/escapeForFileName.h>
#include <DataTypes/DataTypeFactory.h>
#include <Parsers/formatAST.h>
#include <Parsers/parseQuery.h>
#include <Poco/FileStream.h>
#include <algorithm> #include <algorithm>
@ -26,8 +12,6 @@ namespace DB
namespace ErrorCodes namespace ErrorCodes
{ {
extern const int LOGICAL_ERROR; extern const int LOGICAL_ERROR;
extern const int ARGUMENT_OUT_OF_BOUND;
extern const int BAD_ARGUMENTS;
extern const int ILLEGAL_COLUMN; extern const int ILLEGAL_COLUMN;
} }
@ -51,9 +35,19 @@ BlockIO InterpreterAlterQuery::execute()
AlterCommands alter_commands; AlterCommands alter_commands;
PartitionCommands partition_commands; PartitionCommands partition_commands;
MutationCommands mutation_commands; MutationCommands mutation_commands;
parseAlter(alter.parameters, alter_commands, partition_commands, mutation_commands); for (ASTAlterCommand * command_ast : alter.command_list->commands)
{
if (auto alter_command = AlterCommand::parse(command_ast))
alter_commands.emplace_back(std::move(*alter_command));
else if (auto partition_command = PartitionCommand::parse(command_ast))
partition_commands.emplace_back(std::move(*partition_command));
else if (auto mut_command = MutationCommand::parse(command_ast))
mutation_commands.emplace_back(std::move(*mut_command));
else
throw Exception("Wrong parameter type in ALTER query", ErrorCodes::LOGICAL_ERROR);
}
if (!mutation_commands.commands.empty()) if (!mutation_commands.empty())
{ {
mutation_commands.validate(*table, context); mutation_commands.validate(*table, context);
table->mutate(mutation_commands, context); table->mutate(mutation_commands, context);
@ -103,137 +97,4 @@ BlockIO InterpreterAlterQuery::execute()
return {}; return {};
} }
void InterpreterAlterQuery::parseAlter(
const ASTAlterQuery::ParameterContainer & params_container,
AlterCommands & out_alter_commands,
PartitionCommands & out_partition_commands,
MutationCommands & out_mutation_commands)
{
const DataTypeFactory & data_type_factory = DataTypeFactory::instance();
for (const auto & params : params_container)
{
if (params.type == ASTAlterQuery::ADD_COLUMN)
{
AlterCommand command;
command.type = AlterCommand::ADD_COLUMN;
const auto & ast_col_decl = typeid_cast<const ASTColumnDeclaration &>(*params.col_decl);
command.column_name = ast_col_decl.name;
if (ast_col_decl.type)
{
command.data_type = data_type_factory.get(ast_col_decl.type);
}
if (ast_col_decl.default_expression)
{
command.default_kind = columnDefaultKindFromString(ast_col_decl.default_specifier);
command.default_expression = ast_col_decl.default_expression;
}
if (params.column)
command.after_column = typeid_cast<const ASTIdentifier &>(*params.column).name;
out_alter_commands.emplace_back(std::move(command));
}
else if (params.type == ASTAlterQuery::DROP_COLUMN)
{
if (params.partition)
{
if (!params.clear_column)
throw Exception("Can't DROP COLUMN from partition. It is possible only CLEAR COLUMN in partition", ErrorCodes::BAD_ARGUMENTS);
const Field & column_name = typeid_cast<const ASTIdentifier &>(*(params.column)).name;
out_partition_commands.emplace_back(PartitionCommand::clearColumn(params.partition, column_name));
}
else
{
if (params.clear_column)
throw Exception("\"ALTER TABLE table CLEAR COLUMN column\" queries are not supported yet. Use \"CLEAR COLUMN column IN PARTITION\".", ErrorCodes::NOT_IMPLEMENTED);
AlterCommand command;
command.type = AlterCommand::DROP_COLUMN;
command.column_name = typeid_cast<const ASTIdentifier &>(*(params.column)).name;
out_alter_commands.emplace_back(std::move(command));
}
}
else if (params.type == ASTAlterQuery::MODIFY_COLUMN)
{
AlterCommand command;
command.type = AlterCommand::MODIFY_COLUMN;
const auto & ast_col_decl = typeid_cast<const ASTColumnDeclaration &>(*params.col_decl);
command.column_name = ast_col_decl.name;
if (ast_col_decl.type)
{
command.data_type = data_type_factory.get(ast_col_decl.type);
}
if (ast_col_decl.default_expression)
{
command.default_kind = columnDefaultKindFromString(ast_col_decl.default_specifier);
command.default_expression = ast_col_decl.default_expression;
}
out_alter_commands.emplace_back(std::move(command));
}
else if (params.type == ASTAlterQuery::MODIFY_PRIMARY_KEY)
{
AlterCommand command;
command.type = AlterCommand::MODIFY_PRIMARY_KEY;
command.primary_key = params.primary_key;
out_alter_commands.emplace_back(std::move(command));
}
else if (params.type == ASTAlterQuery::DROP_PARTITION)
{
out_partition_commands.emplace_back(PartitionCommand::dropPartition(params.partition, params.detach));
}
else if (params.type == ASTAlterQuery::ATTACH_PARTITION)
{
out_partition_commands.emplace_back(PartitionCommand::attachPartition(params.partition, params.part));
}
else if (params.type == ASTAlterQuery::REPLACE_PARTITION)
{
out_partition_commands.emplace_back(
PartitionCommand::replacePartition(params.partition, params.replace, params.from_database, params.from_table));
}
else if (params.type == ASTAlterQuery::FETCH_PARTITION)
{
out_partition_commands.emplace_back(PartitionCommand::fetchPartition(params.partition, params.from));
}
else if (params.type == ASTAlterQuery::FREEZE_PARTITION)
{
out_partition_commands.emplace_back(PartitionCommand::freezePartition(params.partition, params.with_name));
}
else if (params.type == ASTAlterQuery::DELETE)
{
out_mutation_commands.commands.emplace_back(MutationCommand::delete_(params.predicate));
}
else
throw Exception("Wrong parameter type in ALTER query", ErrorCodes::LOGICAL_ERROR);
}
}
void InterpreterAlterQuery::PartitionCommands::validate(const IStorage & table)
{
for (const PartitionCommand & command : *this)
{
if (command.type == PartitionCommand::CLEAR_COLUMN)
{
String column_name = command.column_name.safeGet<String>();
if (!table.getColumns().hasPhysical(column_name))
{
throw Exception("Wrong column name. Cannot find column " + column_name + " to clear it from partition",
DB::ErrorCodes::ILLEGAL_COLUMN);
}
}
}
}
} }

View File

@ -3,6 +3,7 @@
#include <Storages/IStorage.h> #include <Storages/IStorage.h>
#include <Storages/AlterCommands.h> #include <Storages/AlterCommands.h>
#include <Storages/MutationCommands.h> #include <Storages/MutationCommands.h>
#include <Storages/PartitionCommands.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Interpreters/IInterpreter.h> #include <Interpreters/IInterpreter.h>
#include <Parsers/ASTAlterQuery.h> #include <Parsers/ASTAlterQuery.h>
@ -22,111 +23,9 @@ public:
BlockIO execute() override; BlockIO execute() override;
private: private:
struct PartitionCommand
{
enum Type
{
DROP_PARTITION,
ATTACH_PARTITION,
REPLACE_PARTITION,
FETCH_PARTITION,
FREEZE_PARTITION,
CLEAR_COLUMN,
};
Type type;
ASTPtr partition;
Field column_name;
/// true for DETACH PARTITION.
bool detach = false;
/// true for ATTACH PART (and false for PARTITION)
bool part = false;
/// For ATTACH PARTITION partition FROM db.table
String from_database;
String from_table;
bool replace = true;
/// For FETCH PARTITION - path in ZK to the shard, from which to download the partition.
String from_zookeeper_path;
/// For FREEZE PARTITION
String with_name;
static PartitionCommand dropPartition(const ASTPtr & partition, bool detach)
{
PartitionCommand res;
res.type = DROP_PARTITION;
res.partition = partition;
res.detach = detach;
return res;
}
static PartitionCommand clearColumn(const ASTPtr & partition, const Field & column_name)
{
PartitionCommand res;
res.type = CLEAR_COLUMN;
res.partition = partition;
res.column_name = column_name;
return res;
}
static PartitionCommand attachPartition(const ASTPtr & partition, bool part)
{
PartitionCommand res;
res.type = ATTACH_PARTITION;
res.partition = partition;
res.part = part;
return res;
}
static PartitionCommand replacePartition(const ASTPtr & partition, bool replace, const String & from_database, const String & from_table)
{
PartitionCommand res;
res.type = REPLACE_PARTITION;
res.partition = partition;
res.replace = replace;
res.from_database = from_database;
res.from_table = from_table;
return res;
}
static PartitionCommand fetchPartition(const ASTPtr & partition, const String & from)
{
PartitionCommand res;
res.type = FETCH_PARTITION;
res.partition = partition;
res.from_zookeeper_path = from;
return res;
}
static PartitionCommand freezePartition(const ASTPtr & partition, const String & with_name)
{
PartitionCommand res;
res.type = FREEZE_PARTITION;
res.partition = partition;
res.with_name = with_name;
return res;
}
};
class PartitionCommands : public std::vector<PartitionCommand>
{
public:
void validate(const IStorage & table);
};
ASTPtr query_ptr; ASTPtr query_ptr;
const Context & context; const Context & context;
static void parseAlter(const ASTAlterQuery::ParameterContainer & params,
AlterCommands & out_alter_commands,
PartitionCommands & out_partition_commands,
MutationCommands & out_mutation_commands);
}; };
} }

View File

@ -10,45 +10,172 @@ namespace ErrorCodes
extern const int UNEXPECTED_AST_STRUCTURE; extern const int UNEXPECTED_AST_STRUCTURE;
} }
ASTAlterQuery::Parameters::Parameters() {} ASTPtr ASTAlterCommand::clone() const
void ASTAlterQuery::Parameters::clone(Parameters & p) const
{ {
p = *this; auto res = std::make_shared<ASTAlterCommand>(*this);
if (col_decl) p.col_decl = col_decl->clone(); res->children.clear();
if (column) p.column = column->clone();
if (primary_key) p.primary_key = primary_key->clone(); if (col_decl)
if (partition) p.partition = partition->clone(); {
if (predicate) p.predicate = predicate->clone(); res->col_decl = col_decl->clone();
res->children.push_back(res->col_decl);
}
if (column)
{
res->column = column->clone();
res->children.push_back(res->column);
}
if (primary_key)
{
res->primary_key = primary_key->clone();
res->children.push_back(res->primary_key);
}
if (partition)
{
res->partition = partition->clone();
res->children.push_back(res->partition);
}
if (predicate)
{
res->predicate = predicate->clone();
res->children.push_back(res->predicate);
}
return res;
} }
void ASTAlterQuery::addParameters(const Parameters & params) void ASTAlterCommand::formatImpl(
const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const
{ {
parameters.push_back(params); std::string indent_str = settings.one_line ? "" : std::string(4 * frame.indent, ' ');
if (params.col_decl)
children.push_back(params.col_decl); if (type == ASTAlterCommand::ADD_COLUMN)
if (params.column) {
children.push_back(params.column); settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "ADD COLUMN " << (settings.hilite ? hilite_none : "");
if (params.partition) col_decl->formatImpl(settings, state, frame);
children.push_back(params.partition);
if (params.primary_key) /// AFTER
children.push_back(params.primary_key); if (column)
if (params.predicate) {
children.push_back(params.predicate); settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << " AFTER " << (settings.hilite ? hilite_none : "");
column->formatImpl(settings, state, frame);
}
}
else if (type == ASTAlterCommand::DROP_COLUMN)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str
<< (clear_column ? "CLEAR " : "DROP ") << "COLUMN " << (settings.hilite ? hilite_none : "");
column->formatImpl(settings, state, frame);
if (partition)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str<< " IN PARTITION " << (settings.hilite ? hilite_none : "");
partition->formatImpl(settings, state, frame);
}
}
else if (type == ASTAlterCommand::MODIFY_COLUMN)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "MODIFY COLUMN " << (settings.hilite ? hilite_none : "");
col_decl->formatImpl(settings, state, frame);
}
else if (type == ASTAlterCommand::MODIFY_PRIMARY_KEY)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "MODIFY PRIMARY KEY " << (settings.hilite ? hilite_none : "");
settings.ostr << "(";
primary_key->formatImpl(settings, state, frame);
settings.ostr << ")";
}
else if (type == ASTAlterCommand::DROP_PARTITION)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << (detach ? "DETACH" : "DROP") << " PARTITION "
<< (settings.hilite ? hilite_none : "");
partition->formatImpl(settings, state, frame);
}
else if (type == ASTAlterCommand::ATTACH_PARTITION)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "ATTACH "
<< (part ? "PART " : "PARTITION ") << (settings.hilite ? hilite_none : "");
partition->formatImpl(settings, state, frame);
}
else if (type == ASTAlterCommand::REPLACE_PARTITION)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << (replace ? "REPLACE" : "ATTACH") << " PARTITION "
<< (settings.hilite ? hilite_none : "");
partition->formatImpl(settings, state, frame);
settings.ostr << (settings.hilite ? hilite_keyword : "") << " FROM " << (settings.hilite ? hilite_none : "");
if (!from_database.empty())
{
settings.ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(from_database)
<< (settings.hilite ? hilite_none : "") << ".";
}
settings.ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(from_table) << (settings.hilite ? hilite_none : "");
}
else if (type == ASTAlterCommand::FETCH_PARTITION)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "FETCH "
<< "PARTITION " << (settings.hilite ? hilite_none : "");
partition->formatImpl(settings, state, frame);
settings.ostr << (settings.hilite ? hilite_keyword : "")
<< " FROM " << (settings.hilite ? hilite_none : "") << std::quoted(from, '\'');
}
else if (type == ASTAlterCommand::FREEZE_PARTITION)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "FREEZE PARTITION " << (settings.hilite ? hilite_none : "");
partition->formatImpl(settings, state, frame);
if (!with_name.empty())
{
settings.ostr << " " << (settings.hilite ? hilite_keyword : "") << "WITH NAME" << (settings.hilite ? hilite_none : "")
<< " " << std::quoted(with_name, '\'');
}
}
else if (type == ASTAlterCommand::DELETE)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "DELETE WHERE " << (settings.hilite ? hilite_none : "");
predicate->formatImpl(settings, state, frame);
}
else
throw Exception("Unexpected type of ALTER", ErrorCodes::UNEXPECTED_AST_STRUCTURE);
} }
ASTPtr ASTAlterCommandList::clone() const
{
auto res = std::make_shared<ASTAlterCommandList>();
for (ASTAlterCommand * command : commands)
res->add(command->clone());
return res;
}
void ASTAlterCommandList::formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const
{
std::string indent_str = settings.one_line ? "" : std::string(4 * frame.indent, ' ');
for (size_t i = 0; i < commands.size(); ++i)
{
static_cast<IAST *>(commands[i])->formatImpl(settings, state, frame);
std::string comma = (i < (commands.size() - 1)) ? "," : "";
settings.ostr << (settings.hilite ? hilite_keyword : "") << comma << (settings.hilite ? hilite_none : "");
settings.ostr << settings.nl_or_ws;
}
}
/** Get the text that identifies this element. */ /** Get the text that identifies this element. */
String ASTAlterQuery::getID() const String ASTAlterQuery::getID() const
{ {
return ("AlterQuery_" + database + "_" + table); return "AlterQuery_" + database + "_" + table;
} }
ASTPtr ASTAlterQuery::clone() const ASTPtr ASTAlterQuery::clone() const
{ {
auto res = std::make_shared<ASTAlterQuery>(*this); auto res = std::make_shared<ASTAlterQuery>(*this);
for (ParameterContainer::size_type i = 0; i < parameters.size(); ++i) res->children.clear();
parameters[i].clone(res->parameters[i]);
cloneOutputOptions(*res); if (command_list)
res->set(res->command_list, command_list->clone());
return res; return res;
} }
@ -84,102 +211,10 @@ void ASTAlterQuery::formatQueryImpl(const FormatSettings & settings, FormatState
formatOnCluster(settings); formatOnCluster(settings);
settings.ostr << settings.nl_or_ws; settings.ostr << settings.nl_or_ws;
for (size_t i = 0; i < parameters.size(); ++i) FormatStateStacked frame_nested = frame;
{ frame_nested.need_parens = false;
const ASTAlterQuery::Parameters & p = parameters[i]; ++frame_nested.indent;
static_cast<IAST *>(command_list)->formatImpl(settings, state, frame_nested);
if (p.type == ASTAlterQuery::ADD_COLUMN)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "ADD COLUMN " << (settings.hilite ? hilite_none : "");
p.col_decl->formatImpl(settings, state, frame);
/// AFTER
if (p.column)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << " AFTER " << (settings.hilite ? hilite_none : "");
p.column->formatImpl(settings, state, frame);
}
}
else if (p.type == ASTAlterQuery::DROP_COLUMN)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str
<< (p.clear_column ? "CLEAR " : "DROP ") << "COLUMN " << (settings.hilite ? hilite_none : "");
p.column->formatImpl(settings, state, frame);
if (p.partition)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str<< " IN PARTITION " << (settings.hilite ? hilite_none : "");
p.partition->formatImpl(settings, state, frame);
}
}
else if (p.type == ASTAlterQuery::MODIFY_COLUMN)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "MODIFY COLUMN " << (settings.hilite ? hilite_none : "");
p.col_decl->formatImpl(settings, state, frame);
}
else if (p.type == ASTAlterQuery::MODIFY_PRIMARY_KEY)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "MODIFY PRIMARY KEY " << (settings.hilite ? hilite_none : "");
settings.ostr << "(";
p.primary_key->formatImpl(settings, state, frame);
settings.ostr << ")";
}
else if (p.type == ASTAlterQuery::DROP_PARTITION)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << (p.detach ? "DETACH" : "DROP") << " PARTITION "
<< (settings.hilite ? hilite_none : "");
p.partition->formatImpl(settings, state, frame);
}
else if (p.type == ASTAlterQuery::ATTACH_PARTITION)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "ATTACH "
<< (p.part ? "PART " : "PARTITION ") << (settings.hilite ? hilite_none : "");
p.partition->formatImpl(settings, state, frame);
}
else if (p.type == ASTAlterQuery::REPLACE_PARTITION)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << (p.replace ? "REPLACE" : "ATTACH") << " PARTITION "
<< (settings.hilite ? hilite_none : "");
p.partition->formatImpl(settings, state, frame);
settings.ostr << (settings.hilite ? hilite_keyword : "") << " FROM " << (settings.hilite ? hilite_none : "");
if (!p.from_database.empty())
{
settings.ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(p.from_database)
<< (settings.hilite ? hilite_none : "") << ".";
}
settings.ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(p.from_table) << (settings.hilite ? hilite_none : "");
}
else if (p.type == ASTAlterQuery::FETCH_PARTITION)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "FETCH "
<< "PARTITION " << (settings.hilite ? hilite_none : "");
p.partition->formatImpl(settings, state, frame);
settings.ostr << (settings.hilite ? hilite_keyword : "")
<< " FROM " << (settings.hilite ? hilite_none : "") << std::quoted(p.from, '\'');
}
else if (p.type == ASTAlterQuery::FREEZE_PARTITION)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "FREEZE PARTITION " << (settings.hilite ? hilite_none : "");
p.partition->formatImpl(settings, state, frame);
if (!p.with_name.empty())
{
settings.ostr << " " << (settings.hilite ? hilite_keyword : "") << "WITH NAME" << (settings.hilite ? hilite_none : "")
<< " " << std::quoted(p.with_name, '\'');
}
}
else if (p.type == ASTAlterQuery::DELETE)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "DELETE WHERE " << (settings.hilite ? hilite_none : "");
p.predicate->formatImpl(settings, state, frame);
}
else
throw Exception("Unexpected type of ALTER", ErrorCodes::UNEXPECTED_AST_STRUCTURE);
std::string comma = (i < (parameters.size() -1) ) ? "," : "";
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << comma << (settings.hilite ? hilite_none : "");
settings.ostr << settings.nl_or_ws;
}
} }
} }

View File

@ -16,10 +16,10 @@ namespace DB
* DROP PARTITION partition, * DROP PARTITION partition,
*/ */
class ASTAlterQuery : public ASTQueryWithOutput, public ASTQueryWithOnCluster class ASTAlterCommand : public IAST
{ {
public: public:
enum ParameterType enum Type
{ {
ADD_COLUMN, ADD_COLUMN,
DROP_COLUMN, DROP_COLUMN,
@ -37,70 +37,86 @@ public:
NO_TYPE, NO_TYPE,
}; };
struct Parameters Type type = NO_TYPE;
/** The ADD COLUMN query stores the name and type of the column to add
* This field is not used in the DROP query
* In MODIFY query, the column name and the new type are stored here
*/
ASTPtr col_decl;
/** The ADD COLUMN query here optionally stores the name of the column following AFTER
* The DROP query stores the column name for deletion here
*/
ASTPtr column;
/** For MODIFY PRIMARY KEY
*/
ASTPtr primary_key;
/** Used in DROP PARTITION and ATTACH PARTITION FROM queries.
* The value or ID of the partition is stored here.
*/
ASTPtr partition;
/// For DELETE WHERE: the predicate that filters the rows to delete.
ASTPtr predicate;
bool detach = false; /// true for DETACH PARTITION
bool part = false; /// true for ATTACH PART
bool clear_column = false; /// for CLEAR COLUMN (do not drop column from metadata)
/** For FETCH PARTITION - the path in ZK to the shard, from which to download the partition.
*/
String from;
/** For FREEZE PARTITION - place local backup to directory with specified name.
*/
String with_name;
/// REPLACE(ATTACH) PARTITION partition FROM db.table
String from_database;
String from_table;
/// To distinguish REPLACE and ATTACH PARTITION partition FROM db.table
bool replace = true;
String getID() const override { return "AlterCommand_" + std::to_string(static_cast<int>(type)); }
ASTPtr clone() const override;
protected:
void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;
};
class ASTAlterCommandList : public IAST
{
public:
std::vector<ASTAlterCommand *> commands;
void add(const ASTPtr & command)
{ {
Parameters(); commands.push_back(static_cast<ASTAlterCommand *>(command.get()));
children.push_back(command);
}
int type = NO_TYPE; String getID() const override { return "AlterCommandList"; }
/** The ADD COLUMN query stores the name and type of the column to add ASTPtr clone() const override;
* This field is not used in the DROP query
* In MODIFY query, the column name and the new type are stored here
*/
ASTPtr col_decl;
/** The ADD COLUMN query here optionally stores the name of the column following AFTER protected:
* The DROP query stores the column name for deletion here void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;
*/ };
ASTPtr column;
/** For MODIFY PRIMARY KEY class ASTAlterQuery : public ASTQueryWithOutput, public ASTQueryWithOnCluster
*/ {
ASTPtr primary_key; public:
/** Used in DROP PARTITION, RESHARD PARTITION and ATTACH PARTITION FROM queries.
* The value or ID of the partition is stored here.
*/
ASTPtr partition;
/// For DELETE WHERE: the predicate that filters the rows to delete.
ASTPtr predicate;
bool detach = false; /// true for DETACH PARTITION
bool part = false; /// true for ATTACH PART
bool do_copy = false; /// for RESHARD PARTITION
bool clear_column = false; /// for CLEAR COLUMN (do not drop column from metadata)
/** For FETCH PARTITION - the path in ZK to the shard, from which to download the partition.
*/
String from;
/** For FREEZE PARTITION - place local backup to directory with specified name.
*/
String with_name;
/// REPLACE(ATTACH) PARTITION partition FROM db.table
String from_database;
String from_table;
/// To distinguish REPLACE and ATTACH PARTITION partition FROM db.table
bool replace = true;
/// deep copy
void clone(Parameters & p) const;
};
using ParameterContainer = std::vector<Parameters>;
ParameterContainer parameters;
String database; String database;
String table; String table;
ASTAlterCommandList * command_list = nullptr;
void addParameters(const Parameters & params);
/** Get the text that identifies this element. */
String getID() const override; String getID() const override;
ASTPtr clone() const override; ASTPtr clone() const override;

View File

@ -12,9 +12,11 @@
namespace DB namespace DB
{ {
bool ParserAlterQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{ {
ParserKeyword s_alter_table("ALTER TABLE"); auto command = std::make_shared<ASTAlterCommand>();
node = command;
ParserKeyword s_add_column("ADD COLUMN"); ParserKeyword s_add_column("ADD COLUMN");
ParserKeyword s_drop_column("DROP COLUMN"); ParserKeyword s_drop_column("DROP COLUMN");
ParserKeyword s_clear_column("CLEAR COLUMN"); ParserKeyword s_clear_column("CLEAR COLUMN");
@ -37,207 +39,233 @@ bool ParserAlterQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ParserKeyword s_delete_where("DELETE WHERE"); ParserKeyword s_delete_where("DELETE WHERE");
ParserToken s_dot(TokenType::Dot);
ParserToken s_comma(TokenType::Comma);
ParserIdentifier table_parser;
ParserCompoundIdentifier parser_name; ParserCompoundIdentifier parser_name;
ParserStringLiteral parser_string_literal;
ParserCompoundColumnDeclaration parser_col_decl; ParserCompoundColumnDeclaration parser_col_decl;
ParserPartition parser_partition; ParserPartition parser_partition;
ParserStringLiteral parser_string_literal;
ParserExpression exp_elem; ParserExpression exp_elem;
String cluster_str; if (s_add_column.ignore(pos, expected))
ASTPtr col_type; {
ASTPtr col_after; if (!parser_col_decl.parse(pos, command->col_decl, expected))
ASTPtr col_drop; return false;
if (s_after.ignore(pos, expected))
{
if (!parser_name.parse(pos, command->column, expected))
return false;
}
command->type = ASTAlterCommand::ADD_COLUMN;
}
else if (s_drop_partition.ignore(pos, expected))
{
if (!parser_partition.parse(pos, command->partition, expected))
return false;
command->type = ASTAlterCommand::DROP_PARTITION;
}
else if (s_drop_column.ignore(pos, expected))
{
if (!parser_name.parse(pos, command->column, expected))
return false;
command->type = ASTAlterCommand::DROP_COLUMN;
command->detach = false;
}
else if (s_clear_column.ignore(pos, expected))
{
if (!parser_name.parse(pos, command->column, expected))
return false;
command->type = ASTAlterCommand::DROP_COLUMN;
command->clear_column = true;
command->detach = false;
if (s_in_partition.ignore(pos, expected))
{
if (!parser_partition.parse(pos, command->partition, expected))
return false;
}
}
else if (s_detach_partition.ignore(pos, expected))
{
if (!parser_partition.parse(pos, command->partition, expected))
return false;
command->type = ASTAlterCommand::DROP_PARTITION;
command->detach = true;
}
else if (s_attach_partition.ignore(pos, expected))
{
if (!parser_partition.parse(pos, command->partition, expected))
return false;
if (s_from.ignore(pos))
{
if (!parseDatabaseAndTableName(pos, expected, command->from_database, command->from_table))
return false;
command->replace = false;
command->type = ASTAlterCommand::REPLACE_PARTITION;
}
else
{
command->type = ASTAlterCommand::ATTACH_PARTITION;
}
}
else if (ParserKeyword{"REPLACE PARTITION"}.ignore(pos, expected))
{
if (!parser_partition.parse(pos, command->partition, expected))
return false;
if (!s_from.ignore(pos, expected))
return false;
if (!parseDatabaseAndTableName(pos, expected, command->from_database, command->from_table))
return false;
command->replace = true;
command->type = ASTAlterCommand::REPLACE_PARTITION;
}
else if (s_attach_part.ignore(pos, expected))
{
if (!parser_string_literal.parse(pos, command->partition, expected))
return false;
command->part = true;
command->type = ASTAlterCommand::ATTACH_PARTITION;
}
else if (s_fetch_partition.ignore(pos, expected))
{
if (!parser_partition.parse(pos, command->partition, expected))
return false;
if (!s_from.ignore(pos, expected))
return false;
ASTPtr ast_from;
if (!parser_string_literal.parse(pos, ast_from, expected))
return false;
command->from = typeid_cast<const ASTLiteral &>(*ast_from).value.get<const String &>();
command->type = ASTAlterCommand::FETCH_PARTITION;
}
else if (s_freeze_partition.ignore(pos, expected))
{
if (!parser_partition.parse(pos, command->partition, expected))
return false;
/// WITH NAME 'name' - place local backup to directory with specified name
if (s_with.ignore(pos, expected))
{
if (!s_name.ignore(pos, expected))
return false;
ASTPtr ast_with_name;
if (!parser_string_literal.parse(pos, ast_with_name, expected))
return false;
command->with_name = typeid_cast<const ASTLiteral &>(*ast_with_name).value.get<const String &>();
}
command->type = ASTAlterCommand::FREEZE_PARTITION;
}
else if (s_modify_column.ignore(pos, expected))
{
if (!parser_col_decl.parse(pos, command->col_decl, expected))
return false;
command->type = ASTAlterCommand::MODIFY_COLUMN;
}
else if (s_modify_primary_key.ignore(pos, expected))
{
if (pos->type != TokenType::OpeningRoundBracket)
return false;
++pos;
if (!ParserNotEmptyExpressionList(false).parse(pos, command->primary_key, expected))
return false;
if (pos->type != TokenType::ClosingRoundBracket)
return false;
++pos;
command->type = ASTAlterCommand::MODIFY_PRIMARY_KEY;
}
else if (s_delete_where.ignore(pos, expected))
{
if (!exp_elem.parse(pos, command->predicate, expected))
return false;
command->type = ASTAlterCommand::DELETE;
}
else
return false;
if (command->col_decl)
command->children.push_back(command->col_decl);
if (command->column)
command->children.push_back(command->column);
if (command->primary_key)
command->children.push_back(command->primary_key);
if (command->partition)
command->children.push_back(command->partition);
if (command->predicate)
command->children.push_back(command->predicate);
return true;
}
bool ParserAlterCommandList::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
auto command_list = std::make_shared<ASTAlterCommandList>();
node = command_list;
ParserToken s_comma(TokenType::Comma);
ParserAlterCommand p_command;
do
{
ASTPtr command;
if (!p_command.parse(pos, command, expected))
return false;
command_list->add(command);
}
while (s_comma.ignore(pos, expected));
return true;
}
bool ParserAlterQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
auto query = std::make_shared<ASTAlterQuery>(); auto query = std::make_shared<ASTAlterQuery>();
node = query;
ParserKeyword s_alter_table("ALTER TABLE");
if (!s_alter_table.ignore(pos, expected)) if (!s_alter_table.ignore(pos, expected))
return false; return false;
if (!parseDatabaseAndTableName(pos, expected, query->database, query->table)) if (!parseDatabaseAndTableName(pos, expected, query->database, query->table))
return false; return false;
String cluster_str;
if (ParserKeyword{"ON"}.ignore(pos, expected)) if (ParserKeyword{"ON"}.ignore(pos, expected))
{ {
if (!ASTQueryWithOnCluster::parse(pos, cluster_str, expected)) if (!ASTQueryWithOnCluster::parse(pos, cluster_str, expected))
return false; return false;
} }
bool parsing_finished = false;
do
{
ASTAlterQuery::Parameters params;
if (s_add_column.ignore(pos, expected))
{
if (!parser_col_decl.parse(pos, params.col_decl, expected))
return false;
if (s_after.ignore(pos, expected))
{
if (!parser_name.parse(pos, params.column, expected))
return false;
}
params.type = ASTAlterQuery::ADD_COLUMN;
}
else if (s_drop_partition.ignore(pos, expected))
{
if (!parser_partition.parse(pos, params.partition, expected))
return false;
params.type = ASTAlterQuery::DROP_PARTITION;
}
else if (s_drop_column.ignore(pos, expected))
{
if (!parser_name.parse(pos, params.column, expected))
return false;
params.type = ASTAlterQuery::DROP_COLUMN;
params.detach = false;
}
else if (s_clear_column.ignore(pos, expected))
{
if (!parser_name.parse(pos, params.column, expected))
return false;
params.type = ASTAlterQuery::DROP_COLUMN;
params.clear_column = true;
params.detach = false;
if (s_in_partition.ignore(pos, expected))
{
if (!parser_partition.parse(pos, params.partition, expected))
return false;
}
}
else if (s_detach_partition.ignore(pos, expected))
{
if (!parser_partition.parse(pos, params.partition, expected))
return false;
params.type = ASTAlterQuery::DROP_PARTITION;
params.detach = true;
}
else if (s_attach_partition.ignore(pos, expected))
{
if (!parser_partition.parse(pos, params.partition, expected))
return false;
if (s_from.ignore(pos))
{
if (!parseDatabaseAndTableName(pos, expected, params.from_database, params.from_table))
return false;
params.replace = false;
params.type = ASTAlterQuery::REPLACE_PARTITION;
}
else
{
params.type = ASTAlterQuery::ATTACH_PARTITION;
}
}
else if (ParserKeyword{"REPLACE PARTITION"}.ignore(pos, expected))
{
if (!parser_partition.parse(pos, params.partition, expected))
return false;
if (!s_from.ignore(pos, expected))
return false;
if (!parseDatabaseAndTableName(pos, expected, params.from_database, params.from_table))
return false;
params.replace = true;
params.type = ASTAlterQuery::REPLACE_PARTITION;
}
else if (s_attach_part.ignore(pos, expected))
{
if (!parser_string_literal.parse(pos, params.partition, expected))
return false;
params.part = true;
params.type = ASTAlterQuery::ATTACH_PARTITION;
}
else if (s_fetch_partition.ignore(pos, expected))
{
if (!parser_partition.parse(pos, params.partition, expected))
return false;
if (!s_from.ignore(pos, expected))
return false;
ASTPtr ast_from;
if (!parser_string_literal.parse(pos, ast_from, expected))
return false;
params.from = typeid_cast<const ASTLiteral &>(*ast_from).value.get<const String &>();
params.type = ASTAlterQuery::FETCH_PARTITION;
}
else if (s_freeze_partition.ignore(pos, expected))
{
if (!parser_partition.parse(pos, params.partition, expected))
return false;
/// WITH NAME 'name' - place local backup to directory with specified name
if (s_with.ignore(pos, expected))
{
if (!s_name.ignore(pos, expected))
return false;
ASTPtr ast_with_name;
if (!parser_string_literal.parse(pos, ast_with_name, expected))
return false;
params.with_name = typeid_cast<const ASTLiteral &>(*ast_with_name).value.get<const String &>();
}
params.type = ASTAlterQuery::FREEZE_PARTITION;
}
else if (s_modify_column.ignore(pos, expected))
{
if (!parser_col_decl.parse(pos, params.col_decl, expected))
return false;
params.type = ASTAlterQuery::MODIFY_COLUMN;
}
else if (s_modify_primary_key.ignore(pos, expected))
{
if (pos->type != TokenType::OpeningRoundBracket)
return false;
++pos;
if (!ParserNotEmptyExpressionList(false).parse(pos, params.primary_key, expected))
return false;
if (pos->type != TokenType::ClosingRoundBracket)
return false;
++pos;
params.type = ASTAlterQuery::MODIFY_PRIMARY_KEY;
}
else if (s_delete_where.ignore(pos, expected))
{
if (!exp_elem.parse(pos, params.predicate, expected))
return false;
params.type = ASTAlterQuery::DELETE;
}
else
return false;
if (!s_comma.ignore(pos, expected))
parsing_finished = true;
query->addParameters(params);
}
while (!parsing_finished);
query->cluster = cluster_str; query->cluster = cluster_str;
node = query;
ParserAlterCommandList p_command_list;
ASTPtr command_list;
if (!p_command_list.parse(pos, command_list, expected))
return false;
query->set(query->command_list, command_list);
return true; return true;
} }

View File

@ -18,6 +18,23 @@ namespace DB
* [FREEZE PARTITION] * [FREEZE PARTITION]
* [DELETE WHERE ...] * [DELETE WHERE ...]
*/ */
class ParserAlterCommand : public IParserBase
{
protected:
const char * getName() const { return "ALTER command"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected);
};
class ParserAlterCommandList : public IParserBase
{
protected:
const char * getName() const { return "a list of ALTER commands"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected);
};
class ParserAlterQuery : public IParserBase class ParserAlterQuery : public IParserBase
{ {
protected: protected:

View File

@ -1,5 +1,6 @@
#include <Storages/AlterCommands.h> #include <Storages/AlterCommands.h>
#include <Storages/IStorage.h> #include <Storages/IStorage.h>
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/DataTypesNumber.h> #include <DataTypes/DataTypesNumber.h>
#include <DataTypes/NestedUtils.h> #include <DataTypes/NestedUtils.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
@ -9,6 +10,9 @@
#include <Parsers/ASTExpressionList.h> #include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTLiteral.h> #include <Parsers/ASTLiteral.h>
#include <Parsers/ASTFunction.h> #include <Parsers/ASTFunction.h>
#include <Parsers/ASTAlterQuery.h>
#include <Parsers/ASTColumnDeclaration.h>
#include <Common/typeid_cast.h>
namespace DB namespace DB
@ -21,6 +25,83 @@ namespace ErrorCodes
} }
std::optional<AlterCommand> AlterCommand::parse(const ASTAlterCommand * command_ast)
{
const DataTypeFactory & data_type_factory = DataTypeFactory::instance();
if (command_ast->type == ASTAlterCommand::ADD_COLUMN)
{
AlterCommand command;
command.type = AlterCommand::ADD_COLUMN;
const auto & ast_col_decl = typeid_cast<const ASTColumnDeclaration &>(*command_ast->col_decl);
command.column_name = ast_col_decl.name;
if (ast_col_decl.type)
{
command.data_type = data_type_factory.get(ast_col_decl.type);
}
if (ast_col_decl.default_expression)
{
command.default_kind = columnDefaultKindFromString(ast_col_decl.default_specifier);
command.default_expression = ast_col_decl.default_expression;
}
if (command_ast->column)
command.after_column = typeid_cast<const ASTIdentifier &>(*command_ast->column).name;
return command;
}
else if (command_ast->type == ASTAlterCommand::DROP_COLUMN && !command_ast->partition)
{
if (command_ast->clear_column)
throw Exception("\"ALTER TABLE table CLEAR COLUMN column\" queries are not supported yet. Use \"CLEAR COLUMN column IN PARTITION\".", ErrorCodes::NOT_IMPLEMENTED);
AlterCommand command;
command.type = AlterCommand::DROP_COLUMN;
command.column_name = typeid_cast<const ASTIdentifier &>(*(command_ast->column)).name;
return command;
}
else if (command_ast->type == ASTAlterCommand::MODIFY_COLUMN)
{
AlterCommand command;
command.type = AlterCommand::MODIFY_COLUMN;
const auto & ast_col_decl = typeid_cast<const ASTColumnDeclaration &>(*command_ast->col_decl);
command.column_name = ast_col_decl.name;
if (ast_col_decl.type)
{
command.data_type = data_type_factory.get(ast_col_decl.type);
}
if (ast_col_decl.default_expression)
{
command.default_kind = columnDefaultKindFromString(ast_col_decl.default_specifier);
command.default_expression = ast_col_decl.default_expression;
}
return command;
}
else if (command_ast->type == ASTAlterCommand::MODIFY_PRIMARY_KEY)
{
AlterCommand command;
command.type = AlterCommand::MODIFY_PRIMARY_KEY;
command.primary_key = command_ast->primary_key;
return command;
}
else
return {};
}
/// the names are the same if they match the whole name or name_without_dot matches the part of the name up to the dot
static bool namesEqual(const String & name_without_dot, const DB::NameAndTypePair & name_type)
{
String name_with_dot = name_without_dot + ".";
return (name_with_dot == name_type.name.substr(0, name_without_dot.length() + 1) || name_without_dot == name_type.name);
}
void AlterCommand::apply(ColumnsDescription & columns_description) const void AlterCommand::apply(ColumnsDescription & columns_description) const
{ {
if (type == ADD_COLUMN) if (type == ADD_COLUMN)
@ -187,7 +268,7 @@ void AlterCommands::validate(const IStorage & table, const Context & context)
{ {
const auto & column_name = command.column_name; const auto & column_name = command.column_name;
const auto column_it = std::find_if(std::begin(all_columns), std::end(all_columns), const auto column_it = std::find_if(std::begin(all_columns), std::end(all_columns),
std::bind(AlterCommand::namesEqual, std::cref(command.column_name), std::placeholders::_1)); std::bind(namesEqual, std::cref(command.column_name), std::placeholders::_1));
if (command.type == AlterCommand::ADD_COLUMN) if (command.type == AlterCommand::ADD_COLUMN)
{ {
@ -251,7 +332,7 @@ void AlterCommands::validate(const IStorage & table, const Context & context)
auto found = false; auto found = false;
for (auto it = std::begin(all_columns); it != std::end(all_columns);) for (auto it = std::begin(all_columns); it != std::end(all_columns);)
{ {
if (AlterCommand::namesEqual(command.column_name, *it)) if (namesEqual(command.column_name, *it))
{ {
found = true; found = true;
it = all_columns.erase(it); it = all_columns.erase(it);
@ -262,7 +343,7 @@ void AlterCommands::validate(const IStorage & table, const Context & context)
for (auto it = std::begin(defaults); it != std::end(defaults);) for (auto it = std::begin(defaults); it != std::end(defaults);)
{ {
if (AlterCommand::namesEqual(command.column_name, { it->first, nullptr })) if (namesEqual(command.column_name, { it->first, nullptr }))
it = defaults.erase(it); it = defaults.erase(it);
else else
++it; ++it;
@ -280,7 +361,7 @@ void AlterCommands::validate(const IStorage & table, const Context & context)
{ {
const auto & column_name = col_def.first; const auto & column_name = col_def.first;
const auto column_it = std::find_if(all_columns.begin(), all_columns.end(), [&] (const NameAndTypePair & name_type) const auto column_it = std::find_if(all_columns.begin(), all_columns.end(), [&] (const NameAndTypePair & name_type)
{ return AlterCommand::namesEqual(column_name, name_type); }); { return namesEqual(column_name, name_type); });
const auto tmp_column_name = column_name + "_tmp"; const auto tmp_column_name = column_name + "_tmp";
const auto & column_type_ptr = column_it->type; const auto & column_type_ptr = column_it->type;

View File

@ -6,6 +6,8 @@
namespace DB namespace DB
{ {
class ASTAlterCommand;
/// Operation from the ALTER query (except for manipulation with PART/PARTITION). Adding Nested columns is not expanded to add individual columns. /// Operation from the ALTER query (except for manipulation with PART/PARTITION). Adding Nested columns is not expanded to add individual columns.
struct AlterCommand struct AlterCommand
{ {
@ -36,15 +38,6 @@ struct AlterCommand
/// For MODIFY_PRIMARY_KEY /// For MODIFY_PRIMARY_KEY
ASTPtr primary_key; ASTPtr primary_key;
/// the names are the same if they match the whole name or name_without_dot matches the part of the name up to the dot
static bool namesEqual(const String & name_without_dot, const DB::NameAndTypePair & name_type)
{
String name_with_dot = name_without_dot + ".";
return (name_with_dot == name_type.name.substr(0, name_without_dot.length() + 1) || name_without_dot == name_type.name);
}
void apply(ColumnsDescription & columns_description) const;
AlterCommand() = default; AlterCommand() = default;
AlterCommand(const Type type, const String & column_name, const DataTypePtr & data_type, AlterCommand(const Type type, const String & column_name, const DataTypePtr & data_type,
const ColumnDefaultKind default_kind, const ASTPtr & default_expression, const ColumnDefaultKind default_kind, const ASTPtr & default_expression,
@ -52,6 +45,11 @@ struct AlterCommand
: type{type}, column_name{column_name}, data_type{data_type}, default_kind{default_kind}, : type{type}, column_name{column_name}, data_type{data_type}, default_kind{default_kind},
default_expression{default_expression}, after_column{after_column} default_expression{default_expression}, after_column{after_column}
{} {}
static std::optional<AlterCommand> parse(const ASTAlterCommand * command);
void apply(ColumnsDescription & columns_description) const;
}; };
class IStorage; class IStorage;

View File

@ -0,0 +1,18 @@
#pragma once
#include <Core/Types.h>
#include <map>
namespace DB
{
struct MergeTreeMutationStatus
{
String id;
String command;
time_t create_time = 0;
std::map<String, Int64> block_numbers;
};
}

View File

@ -1,12 +1,21 @@
#include <Storages/MergeTree/ReplicatedMergeTreeMutationEntry.h> #include <Storages/MergeTree/ReplicatedMergeTreeMutationEntry.h>
#include <Parsers/ParserAlterQuery.h>
#include <Parsers/parseQuery.h>
#include <Parsers/formatAST.h>
#include <IO/Operators.h> #include <IO/Operators.h>
#include <IO/ReadBufferFromString.h> #include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromString.h> #include <IO/WriteBufferFromString.h>
#include <Common/typeid_cast.h>
namespace DB namespace DB
{ {
namespace ErrorCodes
{
extern const int UNKNOWN_MUTATION_COMMAND;
}
void ReplicatedMergeTreeMutationEntry::writeText(WriteBuffer & out) const void ReplicatedMergeTreeMutationEntry::writeText(WriteBuffer & out) const
{ {
out << "format version: 1\n" out << "format version: 1\n"
@ -21,8 +30,9 @@ void ReplicatedMergeTreeMutationEntry::writeText(WriteBuffer & out) const
out << partition_id << "\t" << number << "\n"; out << partition_id << "\t" << number << "\n";
} }
out << "mutation commands:\n"; std::stringstream commands_ss;
commands.writeText(out); formatAST(*commands.ast(), commands_ss, /* hilite = */ false, /* one_line = */ true);
out << "commands: " << escape << commands_ss.str();
} }
void ReplicatedMergeTreeMutationEntry::readText(ReadBuffer & in) void ReplicatedMergeTreeMutationEntry::readText(ReadBuffer & in)
@ -45,8 +55,20 @@ void ReplicatedMergeTreeMutationEntry::readText(ReadBuffer & in)
block_numbers[partition_id] = number; block_numbers[partition_id] = number;
} }
in >> "mutation commands:\n"; String commands_str;
commands.readText(in); in >> "commands: " >> escape >> commands_str;
ParserAlterCommandList p_alter_commands;
auto commands_ast = parseQuery(
p_alter_commands, commands_str.data(), commands_str.data() + commands_str.length(), "mutation commands list", 0);
for (ASTAlterCommand * command_ast : typeid_cast<const ASTAlterCommandList &>(*commands_ast).commands)
{
auto command = MutationCommand::parse(command_ast);
if (!command)
throw Exception("Unknown mutation command type: " + DB::toString<int>(command_ast->type), ErrorCodes::UNKNOWN_MUTATION_COMMAND);
commands.push_back(std::move(*command));
}
} }
String ReplicatedMergeTreeMutationEntry::toString() const String ReplicatedMergeTreeMutationEntry::toString() const

View File

@ -25,8 +25,10 @@ struct ReplicatedMergeTreeMutationEntry
time_t create_time = 0; time_t create_time = 0;
String source_replica; String source_replica;
std::unordered_map<String, Int64> block_numbers; std::map<String, Int64> block_numbers;
MutationCommands commands; MutationCommands commands;
}; };
using ReplicatedMergeTreeMutationEntryPtr = std::shared_ptr<const ReplicatedMergeTreeMutationEntry>;
} }

View File

@ -195,7 +195,7 @@ void ReplicatedMergeTreeQueue::updateTimesInZooKeeper(
} }
void ReplicatedMergeTreeQueue::remove(zkutil::ZooKeeperPtr zookeeper, LogEntryPtr & entry) void ReplicatedMergeTreeQueue::removeProcessedEntry(zkutil::ZooKeeperPtr zookeeper, LogEntryPtr & entry)
{ {
auto code = zookeeper->tryRemove(replica_path + "/queue/" + entry->znode_name); auto code = zookeeper->tryRemove(replica_path + "/queue/" + entry->znode_name);
@ -445,7 +445,7 @@ void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, z
for (auto it = mutations_by_znode.begin(); it != mutations_by_znode.end(); ) for (auto it = mutations_by_znode.begin(); it != mutations_by_znode.end(); )
{ {
const ReplicatedMergeTreeMutationEntry & entry = it->second; const ReplicatedMergeTreeMutationEntry & entry = *it->second;
if (!entries_in_zk_set.count(entry.znode_name)) if (!entries_in_zk_set.count(entry.znode_name))
{ {
LOG_DEBUG(log, "Removing obsolete mutation " + entry.znode_name + " from local state."); LOG_DEBUG(log, "Removing obsolete mutation " + entry.znode_name + " from local state.");
@ -478,25 +478,23 @@ void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, z
for (const String & entry : entries_to_load) for (const String & entry : entries_to_load)
futures.emplace_back(zookeeper->asyncGet(zookeeper_path + "/mutations/" + entry)); futures.emplace_back(zookeeper->asyncGet(zookeeper_path + "/mutations/" + entry));
std::vector<ReplicatedMergeTreeMutationEntry> new_mutations; std::vector<ReplicatedMergeTreeMutationEntryPtr> new_mutations;
for (size_t i = 0; i < entries_to_load.size(); ++i) for (size_t i = 0; i < entries_to_load.size(); ++i)
{ {
new_mutations.push_back( new_mutations.push_back(std::make_shared<ReplicatedMergeTreeMutationEntry>(
ReplicatedMergeTreeMutationEntry::parse(futures[i].get().data, entries_to_load[i])); ReplicatedMergeTreeMutationEntry::parse(futures[i].get().data, entries_to_load[i])));
} }
{ {
std::lock_guard lock(target_state_mutex); std::lock_guard lock(target_state_mutex);
for (ReplicatedMergeTreeMutationEntry & entry : new_mutations) for (const ReplicatedMergeTreeMutationEntryPtr & entry : new_mutations)
{ {
String znode = entry.znode_name; mutations_by_znode.emplace(entry->znode_name, entry);
const ReplicatedMergeTreeMutationEntry & inserted_entry =
mutations_by_znode.emplace(znode, std::move(entry)).first->second;
for (const auto & partition_and_block_num : inserted_entry.block_numbers) for (const auto & partition_and_block_num : entry->block_numbers)
mutations_by_partition[partition_and_block_num.first].emplace( mutations_by_partition[partition_and_block_num.first].emplace(
partition_and_block_num.second, &inserted_entry); partition_and_block_num.second, entry);
} }
} }
@ -893,7 +891,7 @@ bool ReplicatedMergeTreeQueue::processEntry(
try try
{ {
if (func(entry)) if (func(entry))
remove(get_zookeeper(), entry); removeProcessedEntry(get_zookeeper(), entry);
} }
catch (...) catch (...)
{ {
@ -951,11 +949,11 @@ MutationCommands ReplicatedMergeTreeQueue::getMutationCommands(
else else
++end; ++end;
std::vector<MutationCommand> commands; MutationCommands commands;
for (auto it = begin; it != end; ++it) for (auto it = begin; it != end; ++it)
commands.insert(commands.end(), it->second->commands.commands.begin(), it->second->commands.commands.end()); commands.insert(commands.end(), it->second->commands.begin(), it->second->commands.end());
return MutationCommands{commands}; return commands;
} }
void ReplicatedMergeTreeQueue::disableMergesInRange(const String & part_name) void ReplicatedMergeTreeQueue::disableMergesInRange(const String & part_name)
@ -1045,6 +1043,33 @@ void ReplicatedMergeTreeQueue::getInsertTimes(time_t & out_min_unprocessed_inser
} }
std::vector<MergeTreeMutationStatus> ReplicatedMergeTreeQueue::getMutationsStatus() const
{
std::lock_guard lock(target_state_mutex);
std::vector<MergeTreeMutationStatus> result;
for (const auto & pair : mutations_by_znode)
{
const ReplicatedMergeTreeMutationEntry & entry = *pair.second;
for (const MutationCommand & command : entry.commands)
{
std::stringstream ss;
formatAST(*command.ast, ss, false, true);
result.push_back(MergeTreeMutationStatus
{
entry.znode_name,
ss.str(),
entry.create_time,
entry.block_numbers,
});
}
}
return result;
}
ReplicatedMergeTreeMergePredicate::ReplicatedMergeTreeMergePredicate( ReplicatedMergeTreeMergePredicate::ReplicatedMergeTreeMergePredicate(
ReplicatedMergeTreeQueue & queue_, zkutil::ZooKeeperPtr & zookeeper) ReplicatedMergeTreeQueue & queue_, zkutil::ZooKeeperPtr & zookeeper)
: queue(queue_) : queue(queue_)
@ -1348,4 +1373,5 @@ String padIndex(Int64 index)
String index_str = toString(index); String index_str = toString(index);
return std::string(10 - index_str.size(), '0') + index_str; return std::string(10 - index_str.size(), '0') + index_str;
} }
} }

View File

@ -7,6 +7,7 @@
#include <Storages/MergeTree/ReplicatedMergeTreeMutationEntry.h> #include <Storages/MergeTree/ReplicatedMergeTreeMutationEntry.h>
#include <Storages/MergeTree/ActiveDataPartSet.h> #include <Storages/MergeTree/ActiveDataPartSet.h>
#include <Storages/MergeTree/MergeTreeData.h> #include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeMutationStatus.h>
#include <Common/ZooKeeper/ZooKeeper.h> #include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/BackgroundSchedulePool.h> #include <Common/BackgroundSchedulePool.h>
@ -91,8 +92,8 @@ private:
/// mutations_by_partition is an index partition ID -> block ID -> mutation into this list. /// mutations_by_partition is an index partition ID -> block ID -> mutation into this list.
/// Note that mutations are updated in such a way that they are always more recent than /// Note that mutations are updated in such a way that they are always more recent than
/// log_pointer (see pullLogsToQueue()). /// log_pointer (see pullLogsToQueue()).
std::map<String, ReplicatedMergeTreeMutationEntry> mutations_by_znode; std::map<String, ReplicatedMergeTreeMutationEntryPtr> mutations_by_znode;
std::unordered_map<String, std::map<Int64, const ReplicatedMergeTreeMutationEntry *>> mutations_by_partition; std::unordered_map<String, std::map<Int64, ReplicatedMergeTreeMutationEntryPtr>> mutations_by_partition;
/// Provides only one simultaneous call to pullLogsToQueue. /// Provides only one simultaneous call to pullLogsToQueue.
@ -138,7 +139,7 @@ private:
std::lock_guard<std::mutex> & target_state_lock, std::lock_guard<std::mutex> & target_state_lock,
std::lock_guard<std::mutex> & queue_lock); std::lock_guard<std::mutex> & queue_lock);
void remove(zkutil::ZooKeeperPtr zookeeper, LogEntryPtr & entry); void removeProcessedEntry(zkutil::ZooKeeperPtr zookeeper, LogEntryPtr & entry);
/** Can I now try this action. If not, you need to leave it in the queue and try another one. /** Can I now try this action. If not, you need to leave it in the queue and try another one.
* Called under the queue_mutex. * Called under the queue_mutex.
@ -304,6 +305,8 @@ public:
/// Get information about the insertion times. /// Get information about the insertion times.
void getInsertTimes(time_t & out_min_unprocessed_insert_time, time_t & out_max_processed_insert_time) const; void getInsertTimes(time_t & out_min_unprocessed_insert_time, time_t & out_max_processed_insert_time) const;
std::vector<MergeTreeMutationStatus> getMutationsStatus() const;
}; };
class ReplicatedMergeTreeMergePredicate class ReplicatedMergeTreeMergePredicate

View File

@ -11,64 +11,34 @@
namespace DB namespace DB
{ {
namespace ErrorCodes std::optional<MutationCommand> MutationCommand::parse(ASTAlterCommand * command)
{ {
extern const int UNKNOWN_MUTATION_COMMAND; if (command->type == ASTAlterCommand::DELETE)
}
static String typeToString(MutationCommand::Type type)
{
switch (type)
{ {
case MutationCommand::DELETE: return "DELETE"; MutationCommand res;
default: res.ast = command->ptr();
throw Exception("Bad mutation type: " + toString<int>(type), ErrorCodes::LOGICAL_ERROR); res.type = DELETE;
} res.predicate = command->predicate;
} return res;
void MutationCommand::writeText(WriteBuffer & out) const
{
out << typeToString(type) << "\n";
switch (type)
{
case MutationCommand::DELETE:
{
std::stringstream ss;
formatAST(*predicate, ss, /* hilite = */ false, /* one_line = */ true);
out << "predicate: " << escape << ss.str() << "\n";
break;
}
default:
throw Exception("Bad mutation type: " + toString<int>(type), ErrorCodes::LOGICAL_ERROR);
}
}
void MutationCommand::readText(ReadBuffer & in)
{
String type_str;
in >> type_str >> "\n";
if (type_str == "DELETE")
{
type = DELETE;
String predicate_str;
in >> "predicate: " >> escape >> predicate_str >> "\n";
ParserExpressionWithOptionalAlias p_expr(false);
predicate = parseQuery(
p_expr, predicate_str.data(), predicate_str.data() + predicate_str.length(), "mutation predicate", 0);
} }
else else
throw Exception("Unknown mutation command: `" + type_str + "'", ErrorCodes::UNKNOWN_MUTATION_COMMAND); return {};
} }
void MutationCommands::validate(const IStorage & table, const Context & context) std::shared_ptr<ASTAlterCommandList> MutationCommands::ast() const
{
auto res = std::make_shared<ASTAlterCommandList>();
for (const MutationCommand & command : *this)
res->add(command.ast->clone());
return res;
}
void MutationCommands::validate(const IStorage & table, const Context & context) const
{ {
auto all_columns = table.getColumns().getAll(); auto all_columns = table.getColumns().getAll();
for (const MutationCommand & command : commands) for (const MutationCommand & command : *this)
{ {
switch (command.type) switch (command.type)
{ {
@ -86,29 +56,4 @@ void MutationCommands::validate(const IStorage & table, const Context & context)
} }
} }
void MutationCommands::writeText(WriteBuffer & out) const
{
out << "format version: 1\n"
<< "count: " << commands.size() << "\n";
for (const MutationCommand & command : commands)
{
command.writeText(out);
}
}
void MutationCommands::readText(ReadBuffer & in)
{
in >> "format version: 1\n";
size_t count;
in >> "count: " >> count >> "\n";
for (size_t i = 0; i < count; ++i)
{
MutationCommand command;
command.readText(in);
commands.push_back(std::move(command));
}
}
} }

View File

@ -1,7 +1,7 @@
#pragma once #pragma once
#include <Parsers/IAST.h> #include <Parsers/ASTAlterQuery.h>
#include <IO/WriteHelpers.h> #include <optional>
namespace DB namespace DB
@ -12,6 +12,8 @@ class Context;
struct MutationCommand struct MutationCommand
{ {
ASTPtr ast; /// The AST of the whole command
enum Type enum Type
{ {
EMPTY, /// Not used. EMPTY, /// Not used.
@ -22,26 +24,15 @@ struct MutationCommand
ASTPtr predicate; ASTPtr predicate;
static MutationCommand delete_(const ASTPtr & predicate) static std::optional<MutationCommand> parse(ASTAlterCommand * command);
{
MutationCommand res;
res.type = DELETE;
res.predicate = predicate;
return res;
}
void writeText(WriteBuffer & out) const;
void readText(ReadBuffer & in);
}; };
struct MutationCommands class MutationCommands : public std::vector<MutationCommand>
{ {
std::vector<MutationCommand> commands; public:
std::shared_ptr<ASTAlterCommandList> ast() const;
void validate(const IStorage & table, const Context & context); void validate(const IStorage & table, const Context & context) const;
void writeText(WriteBuffer & out) const;
void readText(ReadBuffer & in);
}; };
} }

View File

@ -0,0 +1,94 @@
#include <Storages/PartitionCommands.h>
#include <Storages/IStorage.h>
#include <Parsers/ASTAlterQuery.h>
#include <Parsers/ASTIdentifier.h>
#include <Common/typeid_cast.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int ILLEGAL_COLUMN;
}
std::optional<PartitionCommand> PartitionCommand::parse(const ASTAlterCommand * command_ast)
{
if (command_ast->type == ASTAlterCommand::DROP_PARTITION)
{
PartitionCommand res;
res.type = DROP_PARTITION;
res.partition = command_ast->partition;
res.detach = command_ast->detach;
return res;
}
else if (command_ast->type == ASTAlterCommand::ATTACH_PARTITION)
{
PartitionCommand res;
res.type = ATTACH_PARTITION;
res.partition = command_ast->partition;
res.part = command_ast->part;
return res;
}
else if (command_ast->type == ASTAlterCommand::REPLACE_PARTITION)
{
PartitionCommand res;
res.type = REPLACE_PARTITION;
res.partition = command_ast->partition;
res.replace = command_ast->replace;
res.from_database = command_ast->from_database;
res.from_table = command_ast->from_table;
return res;
}
else if (command_ast->type == ASTAlterCommand::FETCH_PARTITION)
{
PartitionCommand res;
res.type = FETCH_PARTITION;
res.partition = command_ast->partition;
res.from_zookeeper_path = command_ast->from;
return res;
}
else if (command_ast->type == ASTAlterCommand::FREEZE_PARTITION)
{
PartitionCommand res;
res.type = FREEZE_PARTITION;
res.partition = command_ast->partition;
res.with_name = command_ast->with_name;
return res;
}
else if (command_ast->type == ASTAlterCommand::DROP_COLUMN && command_ast->partition)
{
if (!command_ast->clear_column)
throw Exception("Can't DROP COLUMN from partition. It is possible only CLEAR COLUMN in partition", ErrorCodes::BAD_ARGUMENTS);
PartitionCommand res;
res.type = CLEAR_COLUMN;
res.partition = command_ast->partition;
const Field & column_name = typeid_cast<const ASTIdentifier &>(*(command_ast->column)).name;
res.column_name = column_name;
return res;
}
else
return {};
}
void PartitionCommands::validate(const IStorage & table)
{
for (const PartitionCommand & command : *this)
{
if (command.type == PartitionCommand::CLEAR_COLUMN)
{
String column_name = command.column_name.safeGet<String>();
if (!table.getColumns().hasPhysical(column_name))
{
throw Exception("Wrong column name. Cannot find column " + column_name + " to clear it from partition",
DB::ErrorCodes::ILLEGAL_COLUMN);
}
}
}
}
}

View File

@ -0,0 +1,59 @@
#pragma once
#include <Core/Types.h>
#include <Core/Field.h>
#include <Parsers/IAST.h>
#include <vector>
namespace DB
{
class IStorage;
class ASTAlterCommand;
struct PartitionCommand
{
enum Type
{
DROP_PARTITION,
ATTACH_PARTITION,
REPLACE_PARTITION,
FETCH_PARTITION,
FREEZE_PARTITION,
CLEAR_COLUMN,
};
Type type;
ASTPtr partition;
Field column_name;
/// true for DETACH PARTITION.
bool detach = false;
/// true for ATTACH PART (and false for PARTITION)
bool part = false;
/// For ATTACH PARTITION partition FROM db.table
String from_database;
String from_table;
bool replace = true;
/// For FETCH PARTITION - path in ZK to the shard, from which to download the partition.
String from_zookeeper_path;
/// For FREEZE PARTITION
String with_name;
static std::optional<PartitionCommand> parse(const ASTAlterCommand * command);
};
class PartitionCommands : public std::vector<PartitionCommand>
{
public:
void validate(const IStorage & table);
};
}

View File

@ -1382,7 +1382,7 @@ bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedM
try try
{ {
new_part = merger_mutator.mutatePartToTemporaryPart(future_mutated_part, commands.commands, context); new_part = merger_mutator.mutatePartToTemporaryPart(future_mutated_part, commands, context);
data.renameTempPartAndReplace(new_part, nullptr, &transaction); data.renameTempPartAndReplace(new_part, nullptr, &transaction);
try try
@ -4049,6 +4049,11 @@ void StorageReplicatedMergeTree::mutate(const MutationCommands & commands, const
} }
} }
std::vector<MergeTreeMutationStatus> StorageReplicatedMergeTree::getMutationsStatus() const
{
return queue.getMutationsStatus();
}
void StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZK() void StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZK()
{ {

View File

@ -125,6 +125,8 @@ public:
void mutate(const MutationCommands & commands, const Context & context) override; void mutate(const MutationCommands & commands, const Context & context) override;
std::vector<MergeTreeMutationStatus> getMutationsStatus() const;
/** Removes a replica from ZooKeeper. If there are no other replicas, it deletes the entire table from ZooKeeper. /** Removes a replica from ZooKeeper. If there are no other replicas, it deletes the entire table from ZooKeeper.
*/ */
void drop() override; void drop() override;

View File

@ -0,0 +1,123 @@
#include <Storages/System/StorageSystemMutations.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeArray.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/VirtualColumnUtils.h>
#include <Databases/IDatabase.h>
namespace DB
{
StorageSystemMutations::StorageSystemMutations(const std::string & name_)
: name(name_)
{
setColumns(ColumnsDescription({
{ "database", std::make_shared<DataTypeString>() },
{ "table", std::make_shared<DataTypeString>() },
{ "mutation_id", std::make_shared<DataTypeString>() },
{ "command", std::make_shared<DataTypeString>() },
{ "create_time", std::make_shared<DataTypeDateTime>() },
{ "block_numbers.partition_id", std::make_shared<DataTypeArray>(
std::make_shared<DataTypeString>()) },
{ "block_numbers.number", std::make_shared<DataTypeArray>(
std::make_shared<DataTypeInt64>()) },
}));
}
BlockInputStreams StorageSystemMutations::read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
const size_t /*max_block_size*/,
const unsigned /*num_streams*/)
{
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;
/// Collect a set of replicated tables.
std::map<String, std::map<String, StoragePtr>> replicated_tables;
for (const auto & db : context.getDatabases())
for (auto iterator = db.second->getIterator(context); iterator->isValid(); iterator->next())
if (dynamic_cast<const StorageReplicatedMergeTree *>(iterator->table().get()))
replicated_tables[db.first][iterator->name()] = iterator->table();
MutableColumnPtr col_database_mut = ColumnString::create();
MutableColumnPtr col_table_mut = ColumnString::create();
for (auto & db : replicated_tables)
{
for (auto & table : db.second)
{
col_database_mut->insert(db.first);
col_table_mut->insert(table.first);
}
}
ColumnPtr col_database = std::move(col_database_mut);
ColumnPtr col_table = std::move(col_table_mut);
/// Determine what tables are needed by the conditions in the query.
{
Block filtered_block
{
{ col_database, std::make_shared<DataTypeString>(), "database" },
{ col_table, std::make_shared<DataTypeString>(), "table" },
};
VirtualColumnUtils::filterBlockWithQuery(query_info.query, filtered_block, context);
if (!filtered_block.rows())
return BlockInputStreams();
col_database = filtered_block.getByName("database").column;
col_table = filtered_block.getByName("table").column;
}
MutableColumns res_columns = getSampleBlock().cloneEmptyColumns();
for (size_t i_storage = 0; i_storage < col_database->size(); ++i_storage)
{
auto database = (*col_database)[i_storage].safeGet<String>();
auto table = (*col_table)[i_storage].safeGet<String>();
std::vector<MergeTreeMutationStatus> states =
dynamic_cast<StorageReplicatedMergeTree &>(*replicated_tables[database][table])
.getMutationsStatus();
for (const MergeTreeMutationStatus & status : states)
{
Array block_partition_ids;
block_partition_ids.reserve(status.block_numbers.size());
Array block_numbers;
block_numbers.reserve(status.block_numbers.size());
for (const auto & pair : status.block_numbers)
{
block_partition_ids.emplace_back(pair.first);
block_numbers.emplace_back(pair.second);
}
size_t col_num = 0;
res_columns[col_num++]->insert(database);
res_columns[col_num++]->insert(table);
res_columns[col_num++]->insert(status.id);
res_columns[col_num++]->insert(status.command);
res_columns[col_num++]->insert(UInt64(status.create_time));
res_columns[col_num++]->insert(block_partition_ids);
res_columns[col_num++]->insert(block_numbers);
}
}
Block res = getSampleBlock().cloneEmpty();
for (size_t i_col = 0; i_col < res.columns(); ++i_col)
res.getByPosition(i_col).column = std::move(res_columns[i_col]);
return BlockInputStreams(1, std::make_shared<OneBlockInputStream>(res));
}
}

View File

@ -0,0 +1,36 @@
#pragma once
#include <ext/shared_ptr_helper.h>
#include <Storages/IStorage.h>
namespace DB
{
class Context;
/// Implements the `mutations` system table, which provides information about the status of mutations
/// in the MergeTree tables.
class StorageSystemMutations : public ext::shared_ptr_helper<StorageSystemMutations>, public IStorage
{
public:
String getName() const override { return "SystemMutations"; }
String getTableName() const override { return name; }
BlockInputStreams read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,
unsigned num_streams) override;
private:
const String name;
protected:
StorageSystemMutations(const String & name_);
};
}

View File

@ -31,9 +31,11 @@ StorageSystemParts::StorageSystemParts(const std::string & name)
{"refcount", std::make_shared<DataTypeUInt32>()}, {"refcount", std::make_shared<DataTypeUInt32>()},
{"min_date", std::make_shared<DataTypeDate>()}, {"min_date", std::make_shared<DataTypeDate>()},
{"max_date", std::make_shared<DataTypeDate>()}, {"max_date", std::make_shared<DataTypeDate>()},
{"partition_id", std::make_shared<DataTypeString>()},
{"min_block_number", std::make_shared<DataTypeInt64>()}, {"min_block_number", std::make_shared<DataTypeInt64>()},
{"max_block_number", std::make_shared<DataTypeInt64>()}, {"max_block_number", std::make_shared<DataTypeInt64>()},
{"level", std::make_shared<DataTypeUInt32>()}, {"level", std::make_shared<DataTypeUInt32>()},
{"data_version", std::make_shared<DataTypeUInt64>()},
{"primary_key_bytes_in_memory", std::make_shared<DataTypeUInt64>()}, {"primary_key_bytes_in_memory", std::make_shared<DataTypeUInt64>()},
{"primary_key_bytes_in_memory_allocated", std::make_shared<DataTypeUInt64>()}, {"primary_key_bytes_in_memory_allocated", std::make_shared<DataTypeUInt64>()},
@ -80,9 +82,11 @@ void StorageSystemParts::processNextStorage(MutableColumns & columns, const Stor
columns[i++]->insert(static_cast<UInt64>(part->getMinDate())); columns[i++]->insert(static_cast<UInt64>(part->getMinDate()));
columns[i++]->insert(static_cast<UInt64>(part->getMaxDate())); columns[i++]->insert(static_cast<UInt64>(part->getMaxDate()));
columns[i++]->insert(part->info.partition_id);
columns[i++]->insert(part->info.min_block); columns[i++]->insert(part->info.min_block);
columns[i++]->insert(part->info.max_block); columns[i++]->insert(part->info.max_block);
columns[i++]->insert(static_cast<UInt64>(part->info.level)); columns[i++]->insert(static_cast<UInt64>(part->info.level));
columns[i++]->insert(static_cast<UInt64>(part->info.getDataVersion()));
columns[i++]->insert(static_cast<UInt64>(part->getIndexSizeInBytes())); columns[i++]->insert(static_cast<UInt64>(part->getIndexSizeInBytes()));
columns[i++]->insert(static_cast<UInt64>(part->getIndexSizeInAllocatedBytes())); columns[i++]->insert(static_cast<UInt64>(part->getIndexSizeInAllocatedBytes()));

View File

@ -32,9 +32,11 @@ StorageSystemPartsColumns::StorageSystemPartsColumns(const std::string & name)
{"refcount", std::make_shared<DataTypeUInt32>()}, {"refcount", std::make_shared<DataTypeUInt32>()},
{"min_date", std::make_shared<DataTypeDate>()}, {"min_date", std::make_shared<DataTypeDate>()},
{"max_date", std::make_shared<DataTypeDate>()}, {"max_date", std::make_shared<DataTypeDate>()},
{"partition_id", std::make_shared<DataTypeString>()},
{"min_block_number", std::make_shared<DataTypeInt64>()}, {"min_block_number", std::make_shared<DataTypeInt64>()},
{"max_block_number", std::make_shared<DataTypeInt64>()}, {"max_block_number", std::make_shared<DataTypeInt64>()},
{"level", std::make_shared<DataTypeUInt32>()}, {"level", std::make_shared<DataTypeUInt32>()},
{"data_version", std::make_shared<DataTypeUInt64>()},
{"primary_key_bytes_in_memory", std::make_shared<DataTypeUInt64>()}, {"primary_key_bytes_in_memory", std::make_shared<DataTypeUInt64>()},
{"primary_key_bytes_in_memory_allocated", std::make_shared<DataTypeUInt64>()}, {"primary_key_bytes_in_memory_allocated", std::make_shared<DataTypeUInt64>()},
@ -123,9 +125,11 @@ void StorageSystemPartsColumns::processNextStorage(MutableColumns & columns, con
columns[j++]->insert(static_cast<UInt64>(min_date)); columns[j++]->insert(static_cast<UInt64>(min_date));
columns[j++]->insert(static_cast<UInt64>(max_date)); columns[j++]->insert(static_cast<UInt64>(max_date));
columns[j++]->insert(part->info.partition_id);
columns[j++]->insert(part->info.min_block); columns[j++]->insert(part->info.min_block);
columns[j++]->insert(part->info.max_block); columns[j++]->insert(part->info.max_block);
columns[j++]->insert(static_cast<UInt64>(part->info.level)); columns[j++]->insert(static_cast<UInt64>(part->info.level));
columns[j++]->insert(static_cast<UInt64>(part->info.getDataVersion()));
columns[j++]->insert(static_cast<UInt64>(index_size_in_bytes)); columns[j++]->insert(static_cast<UInt64>(index_size_in_bytes));
columns[j++]->insert(static_cast<UInt64>(index_size_in_allocated_bytes)); columns[j++]->insert(static_cast<UInt64>(index_size_in_allocated_bytes));

View File

@ -14,6 +14,7 @@
#include <Storages/System/StorageSystemMerges.h> #include <Storages/System/StorageSystemMerges.h>
#include <Storages/System/StorageSystemMetrics.h> #include <Storages/System/StorageSystemMetrics.h>
#include <Storages/System/StorageSystemModels.h> #include <Storages/System/StorageSystemModels.h>
#include <Storages/System/StorageSystemMutations.h>
#include <Storages/System/StorageSystemNumbers.h> #include <Storages/System/StorageSystemNumbers.h>
#include <Storages/System/StorageSystemOne.h> #include <Storages/System/StorageSystemOne.h>
#include <Storages/System/StorageSystemParts.h> #include <Storages/System/StorageSystemParts.h>
@ -51,6 +52,7 @@ void attachSystemTablesServer(IDatabase & system_database, bool has_zookeeper)
system_database.attachTable("processes", StorageSystemProcesses::create("processes")); system_database.attachTable("processes", StorageSystemProcesses::create("processes"));
system_database.attachTable("metrics", StorageSystemMetrics::create("metrics")); system_database.attachTable("metrics", StorageSystemMetrics::create("metrics"));
system_database.attachTable("merges", StorageSystemMerges::create("merges")); system_database.attachTable("merges", StorageSystemMerges::create("merges"));
system_database.attachTable("mutations", StorageSystemMutations::create("mutations"));
system_database.attachTable("replicas", StorageSystemReplicas::create("replicas")); system_database.attachTable("replicas", StorageSystemReplicas::create("replicas"));
system_database.attachTable("replication_queue", StorageSystemReplicationQueue::create("replication_queue")); system_database.attachTable("replication_queue", StorageSystemReplicationQueue::create("replication_queue"));
system_database.attachTable("dictionaries", StorageSystemDictionaries::create("dictionaries")); system_database.attachTable("dictionaries", StorageSystemDictionaries::create("dictionaries"));