From 14947b494f543a32b664ee6135628a1de408f16a Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Sat, 9 Jun 2018 18:53:14 +0300 Subject: [PATCH 1/7] make ASTAlterQuery::Parameters a real AST element ASTAlterCommand [#CLICKHOUSE-3747] --- dbms/src/Interpreters/DDLWorker.cpp | 22 +- .../Interpreters/InterpreterAlterQuery.cpp | 60 +-- dbms/src/Interpreters/InterpreterAlterQuery.h | 2 +- dbms/src/Parsers/ASTAlterQuery.cpp | 277 ++++++------ dbms/src/Parsers/ASTAlterQuery.h | 132 +++--- dbms/src/Parsers/ParserAlterQuery.cpp | 394 ++++++++++-------- dbms/src/Parsers/ParserAlterQuery.h | 17 + 7 files changed, 501 insertions(+), 403 deletions(-) diff --git a/dbms/src/Interpreters/DDLWorker.cpp b/dbms/src/Interpreters/DDLWorker.cpp index 7fed4201b8c..1d46d33ac26 100644 --- a/dbms/src/Interpreters/DDLWorker.cpp +++ b/dbms/src/Interpreters/DDLWorker.cpp @@ -207,12 +207,12 @@ static std::unique_ptr createSimpleZooKeeperLock( static bool isSupportedAlterType(int type) { static const std::unordered_set supported_alter_types{ - ASTAlterQuery::ADD_COLUMN, - ASTAlterQuery::DROP_COLUMN, - ASTAlterQuery::MODIFY_COLUMN, - ASTAlterQuery::MODIFY_PRIMARY_KEY, - ASTAlterQuery::DROP_PARTITION, - ASTAlterQuery::DELETE, + ASTAlterCommand::ADD_COLUMN, + ASTAlterCommand::DROP_COLUMN, + ASTAlterCommand::MODIFY_COLUMN, + ASTAlterCommand::MODIFY_PRIMARY_KEY, + ASTAlterCommand::DROP_PARTITION, + ASTAlterCommand::DELETE, }; return supported_alter_types.count(type) != 0; @@ -621,13 +621,13 @@ void DDLWorker::processTaskAlter( bool execute_once_on_replica = storage->supportsReplication(); bool execute_on_leader_replica = false; - for (const auto & param : ast_alter->parameters) + for (const auto & command : ast_alter->command_list->commands) { - if (!isSupportedAlterType(param.type)) + if (!isSupportedAlterType(command->type)) throw Exception("Unsupported type of ALTER query", ErrorCodes::NOT_IMPLEMENTED); if (execute_once_on_replica) - execute_on_leader_replica |= param.type == ASTAlterQuery::DROP_PARTITION; + execute_on_leader_replica |= command->type == ASTAlterCommand::DROP_PARTITION; } const auto & shard_info = task.cluster->getShardsInfo().at(task.host_shard_num); @@ -1142,9 +1142,9 @@ BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr_, const Context & cont if (auto query_alter = dynamic_cast(query_ptr.get())) { - for (const auto & param : query_alter->parameters) + for (const auto & command : query_alter->command_list->commands) { - if (!isSupportedAlterType(param.type)) + if (!isSupportedAlterType(command->type)) throw Exception("Unsupported type of ALTER query", ErrorCodes::NOT_IMPLEMENTED); } } diff --git a/dbms/src/Interpreters/InterpreterAlterQuery.cpp b/dbms/src/Interpreters/InterpreterAlterQuery.cpp index 8934ef8f9eb..785edd2e229 100644 --- a/dbms/src/Interpreters/InterpreterAlterQuery.cpp +++ b/dbms/src/Interpreters/InterpreterAlterQuery.cpp @@ -51,7 +51,7 @@ BlockIO InterpreterAlterQuery::execute() AlterCommands alter_commands; PartitionCommands partition_commands; MutationCommands mutation_commands; - parseAlter(alter.parameters, alter_commands, partition_commands, mutation_commands); + parseAlter(alter.command_list->commands, alter_commands, partition_commands, mutation_commands); if (!mutation_commands.commands.empty()) { @@ -104,21 +104,21 @@ BlockIO InterpreterAlterQuery::execute() } void InterpreterAlterQuery::parseAlter( - const ASTAlterQuery::ParameterContainer & params_container, + const std::vector & command_asts, AlterCommands & out_alter_commands, PartitionCommands & out_partition_commands, MutationCommands & out_mutation_commands) { const DataTypeFactory & data_type_factory = DataTypeFactory::instance(); - for (const auto & params : params_container) + for (const auto & command_ast : command_asts) { - if (params.type == ASTAlterQuery::ADD_COLUMN) + if (command_ast->type == ASTAlterCommand::ADD_COLUMN) { AlterCommand command; command.type = AlterCommand::ADD_COLUMN; - const auto & ast_col_decl = typeid_cast(*params.col_decl); + const auto & ast_col_decl = typeid_cast(*command_ast->col_decl); command.column_name = ast_col_decl.name; if (ast_col_decl.type) @@ -131,40 +131,40 @@ void InterpreterAlterQuery::parseAlter( command.default_expression = ast_col_decl.default_expression; } - if (params.column) - command.after_column = typeid_cast(*params.column).name; + if (command_ast->column) + command.after_column = typeid_cast(*command_ast->column).name; out_alter_commands.emplace_back(std::move(command)); } - else if (params.type == ASTAlterQuery::DROP_COLUMN) + else if (command_ast->type == ASTAlterCommand::DROP_COLUMN) { - if (params.partition) + if (command_ast->partition) { - if (!params.clear_column) + if (!command_ast->clear_column) throw Exception("Can't DROP COLUMN from partition. It is possible only CLEAR COLUMN in partition", ErrorCodes::BAD_ARGUMENTS); - const Field & column_name = typeid_cast(*(params.column)).name; + const Field & column_name = typeid_cast(*(command_ast->column)).name; - out_partition_commands.emplace_back(PartitionCommand::clearColumn(params.partition, column_name)); + out_partition_commands.emplace_back(PartitionCommand::clearColumn(command_ast->partition, column_name)); } else { - if (params.clear_column) + if (command_ast->clear_column) throw Exception("\"ALTER TABLE table CLEAR COLUMN column\" queries are not supported yet. Use \"CLEAR COLUMN column IN PARTITION\".", ErrorCodes::NOT_IMPLEMENTED); AlterCommand command; command.type = AlterCommand::DROP_COLUMN; - command.column_name = typeid_cast(*(params.column)).name; + command.column_name = typeid_cast(*(command_ast->column)).name; out_alter_commands.emplace_back(std::move(command)); } } - else if (params.type == ASTAlterQuery::MODIFY_COLUMN) + else if (command_ast->type == ASTAlterCommand::MODIFY_COLUMN) { AlterCommand command; command.type = AlterCommand::MODIFY_COLUMN; - const auto & ast_col_decl = typeid_cast(*params.col_decl); + const auto & ast_col_decl = typeid_cast(*command_ast->col_decl); command.column_name = ast_col_decl.name; if (ast_col_decl.type) @@ -180,37 +180,37 @@ void InterpreterAlterQuery::parseAlter( out_alter_commands.emplace_back(std::move(command)); } - else if (params.type == ASTAlterQuery::MODIFY_PRIMARY_KEY) + else if (command_ast->type == ASTAlterCommand::MODIFY_PRIMARY_KEY) { AlterCommand command; command.type = AlterCommand::MODIFY_PRIMARY_KEY; - command.primary_key = params.primary_key; + command.primary_key = command_ast->primary_key; out_alter_commands.emplace_back(std::move(command)); } - else if (params.type == ASTAlterQuery::DROP_PARTITION) + else if (command_ast->type == ASTAlterCommand::DROP_PARTITION) { - out_partition_commands.emplace_back(PartitionCommand::dropPartition(params.partition, params.detach)); + out_partition_commands.emplace_back(PartitionCommand::dropPartition(command_ast->partition, command_ast->detach)); } - else if (params.type == ASTAlterQuery::ATTACH_PARTITION) + else if (command_ast->type == ASTAlterCommand::ATTACH_PARTITION) { - out_partition_commands.emplace_back(PartitionCommand::attachPartition(params.partition, params.part)); + out_partition_commands.emplace_back(PartitionCommand::attachPartition(command_ast->partition, command_ast->part)); } - else if (params.type == ASTAlterQuery::REPLACE_PARTITION) + else if (command_ast->type == ASTAlterCommand::REPLACE_PARTITION) { out_partition_commands.emplace_back( - PartitionCommand::replacePartition(params.partition, params.replace, params.from_database, params.from_table)); + PartitionCommand::replacePartition(command_ast->partition, command_ast->replace, command_ast->from_database, command_ast->from_table)); } - else if (params.type == ASTAlterQuery::FETCH_PARTITION) + else if (command_ast->type == ASTAlterCommand::FETCH_PARTITION) { - out_partition_commands.emplace_back(PartitionCommand::fetchPartition(params.partition, params.from)); + out_partition_commands.emplace_back(PartitionCommand::fetchPartition(command_ast->partition, command_ast->from)); } - else if (params.type == ASTAlterQuery::FREEZE_PARTITION) + else if (command_ast->type == ASTAlterCommand::FREEZE_PARTITION) { - out_partition_commands.emplace_back(PartitionCommand::freezePartition(params.partition, params.with_name)); + out_partition_commands.emplace_back(PartitionCommand::freezePartition(command_ast->partition, command_ast->with_name)); } - else if (params.type == ASTAlterQuery::DELETE) + else if (command_ast->type == ASTAlterCommand::DELETE) { - out_mutation_commands.commands.emplace_back(MutationCommand::delete_(params.predicate)); + out_mutation_commands.commands.emplace_back(MutationCommand::delete_(command_ast->predicate)); } else throw Exception("Wrong parameter type in ALTER query", ErrorCodes::LOGICAL_ERROR); diff --git a/dbms/src/Interpreters/InterpreterAlterQuery.h b/dbms/src/Interpreters/InterpreterAlterQuery.h index ea9fe925a4a..31b89f74864 100644 --- a/dbms/src/Interpreters/InterpreterAlterQuery.h +++ b/dbms/src/Interpreters/InterpreterAlterQuery.h @@ -123,7 +123,7 @@ private: const Context & context; - static void parseAlter(const ASTAlterQuery::ParameterContainer & params, + static void parseAlter(const std::vector & commands, AlterCommands & out_alter_commands, PartitionCommands & out_partition_commands, MutationCommands & out_mutation_commands); diff --git a/dbms/src/Parsers/ASTAlterQuery.cpp b/dbms/src/Parsers/ASTAlterQuery.cpp index 11c21ff134a..cbf1c934b05 100644 --- a/dbms/src/Parsers/ASTAlterQuery.cpp +++ b/dbms/src/Parsers/ASTAlterQuery.cpp @@ -10,45 +10,172 @@ namespace ErrorCodes extern const int UNEXPECTED_AST_STRUCTURE; } -ASTAlterQuery::Parameters::Parameters() {} - -void ASTAlterQuery::Parameters::clone(Parameters & p) const +ASTPtr ASTAlterCommand::clone() const { - p = *this; - if (col_decl) p.col_decl = col_decl->clone(); - if (column) p.column = column->clone(); - if (primary_key) p.primary_key = primary_key->clone(); - if (partition) p.partition = partition->clone(); - if (predicate) p.predicate = predicate->clone(); + auto res = std::make_shared(*this); + res->children.clear(); + + if (col_decl) + { + res->col_decl = col_decl->clone(); + res->children.push_back(res->col_decl); + } + if (column) + { + res->column = column->clone(); + res->children.push_back(res->column); + } + if (primary_key) + { + res->primary_key = primary_key->clone(); + res->children.push_back(res->primary_key); + } + if (partition) + { + res->partition = partition->clone(); + res->children.push_back(res->partition); + } + if (predicate) + { + res->predicate = predicate->clone(); + res->children.push_back(res->predicate); + } + + return res; } -void ASTAlterQuery::addParameters(const Parameters & params) +void ASTAlterCommand::formatImpl( + const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const { - parameters.push_back(params); - if (params.col_decl) - children.push_back(params.col_decl); - if (params.column) - children.push_back(params.column); - if (params.partition) - children.push_back(params.partition); - if (params.primary_key) - children.push_back(params.primary_key); - if (params.predicate) - children.push_back(params.predicate); + std::string indent_str = settings.one_line ? "" : std::string(4 * frame.indent, ' '); + + if (type == ASTAlterCommand::ADD_COLUMN) + { + settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "ADD COLUMN " << (settings.hilite ? hilite_none : ""); + col_decl->formatImpl(settings, state, frame); + + /// AFTER + if (column) + { + settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << " AFTER " << (settings.hilite ? hilite_none : ""); + column->formatImpl(settings, state, frame); + } + } + else if (type == ASTAlterCommand::DROP_COLUMN) + { + settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str + << (clear_column ? "CLEAR " : "DROP ") << "COLUMN " << (settings.hilite ? hilite_none : ""); + column->formatImpl(settings, state, frame); + if (partition) + { + settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str<< " IN PARTITION " << (settings.hilite ? hilite_none : ""); + partition->formatImpl(settings, state, frame); + } + } + else if (type == ASTAlterCommand::MODIFY_COLUMN) + { + settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "MODIFY COLUMN " << (settings.hilite ? hilite_none : ""); + col_decl->formatImpl(settings, state, frame); + } + else if (type == ASTAlterCommand::MODIFY_PRIMARY_KEY) + { + settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "MODIFY PRIMARY KEY " << (settings.hilite ? hilite_none : ""); + settings.ostr << "("; + primary_key->formatImpl(settings, state, frame); + settings.ostr << ")"; + } + else if (type == ASTAlterCommand::DROP_PARTITION) + { + settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << (detach ? "DETACH" : "DROP") << " PARTITION " + << (settings.hilite ? hilite_none : ""); + partition->formatImpl(settings, state, frame); + } + else if (type == ASTAlterCommand::ATTACH_PARTITION) + { + settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "ATTACH " + << (part ? "PART " : "PARTITION ") << (settings.hilite ? hilite_none : ""); + partition->formatImpl(settings, state, frame); + } + else if (type == ASTAlterCommand::REPLACE_PARTITION) + { + settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << (replace ? "REPLACE" : "ATTACH") << " PARTITION " + << (settings.hilite ? hilite_none : ""); + partition->formatImpl(settings, state, frame); + settings.ostr << (settings.hilite ? hilite_keyword : "") << " FROM " << (settings.hilite ? hilite_none : ""); + if (!from_database.empty()) + { + settings.ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(from_database) + << (settings.hilite ? hilite_none : "") << "."; + } + settings.ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(from_table) << (settings.hilite ? hilite_none : ""); + } + else if (type == ASTAlterCommand::FETCH_PARTITION) + { + settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "FETCH " + << "PARTITION " << (settings.hilite ? hilite_none : ""); + partition->formatImpl(settings, state, frame); + settings.ostr << (settings.hilite ? hilite_keyword : "") + << " FROM " << (settings.hilite ? hilite_none : "") << std::quoted(from, '\''); + } + else if (type == ASTAlterCommand::FREEZE_PARTITION) + { + settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "FREEZE PARTITION " << (settings.hilite ? hilite_none : ""); + partition->formatImpl(settings, state, frame); + + if (!with_name.empty()) + { + settings.ostr << " " << (settings.hilite ? hilite_keyword : "") << "WITH NAME" << (settings.hilite ? hilite_none : "") + << " " << std::quoted(with_name, '\''); + } + } + else if (type == ASTAlterCommand::DELETE) + { + settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "DELETE WHERE " << (settings.hilite ? hilite_none : ""); + predicate->formatImpl(settings, state, frame); + } + else + throw Exception("Unexpected type of ALTER", ErrorCodes::UNEXPECTED_AST_STRUCTURE); } + +ASTPtr ASTAlterCommandList::clone() const +{ + auto res = std::make_shared(); + for (ASTAlterCommand * command : commands) + res->add(command->clone()); + return res; +} + +void ASTAlterCommandList::formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const +{ + std::string indent_str = settings.one_line ? "" : std::string(4 * frame.indent, ' '); + + for (size_t i = 0; i < commands.size(); ++i) + { + static_cast(commands[i])->formatImpl(settings, state, frame); + + std::string comma = (i < (commands.size() - 1)) ? "," : ""; + settings.ostr << (settings.hilite ? hilite_keyword : "") << comma << (settings.hilite ? hilite_none : ""); + + settings.ostr << settings.nl_or_ws; + } +} + + /** Get the text that identifies this element. */ String ASTAlterQuery::getID() const { - return ("AlterQuery_" + database + "_" + table); + return "AlterQuery_" + database + "_" + table; } ASTPtr ASTAlterQuery::clone() const { auto res = std::make_shared(*this); - for (ParameterContainer::size_type i = 0; i < parameters.size(); ++i) - parameters[i].clone(res->parameters[i]); - cloneOutputOptions(*res); + res->children.clear(); + + if (command_list) + res->set(res->command_list, command_list->clone()); + return res; } @@ -84,102 +211,10 @@ void ASTAlterQuery::formatQueryImpl(const FormatSettings & settings, FormatState formatOnCluster(settings); settings.ostr << settings.nl_or_ws; - for (size_t i = 0; i < parameters.size(); ++i) - { - const ASTAlterQuery::Parameters & p = parameters[i]; - - if (p.type == ASTAlterQuery::ADD_COLUMN) - { - settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "ADD COLUMN " << (settings.hilite ? hilite_none : ""); - p.col_decl->formatImpl(settings, state, frame); - - /// AFTER - if (p.column) - { - settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << " AFTER " << (settings.hilite ? hilite_none : ""); - p.column->formatImpl(settings, state, frame); - } - } - else if (p.type == ASTAlterQuery::DROP_COLUMN) - { - settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str - << (p.clear_column ? "CLEAR " : "DROP ") << "COLUMN " << (settings.hilite ? hilite_none : ""); - p.column->formatImpl(settings, state, frame); - if (p.partition) - { - settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str<< " IN PARTITION " << (settings.hilite ? hilite_none : ""); - p.partition->formatImpl(settings, state, frame); - } - } - else if (p.type == ASTAlterQuery::MODIFY_COLUMN) - { - settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "MODIFY COLUMN " << (settings.hilite ? hilite_none : ""); - p.col_decl->formatImpl(settings, state, frame); - } - else if (p.type == ASTAlterQuery::MODIFY_PRIMARY_KEY) - { - settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "MODIFY PRIMARY KEY " << (settings.hilite ? hilite_none : ""); - settings.ostr << "("; - p.primary_key->formatImpl(settings, state, frame); - settings.ostr << ")"; - } - else if (p.type == ASTAlterQuery::DROP_PARTITION) - { - settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << (p.detach ? "DETACH" : "DROP") << " PARTITION " - << (settings.hilite ? hilite_none : ""); - p.partition->formatImpl(settings, state, frame); - } - else if (p.type == ASTAlterQuery::ATTACH_PARTITION) - { - settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "ATTACH " - << (p.part ? "PART " : "PARTITION ") << (settings.hilite ? hilite_none : ""); - p.partition->formatImpl(settings, state, frame); - } - else if (p.type == ASTAlterQuery::REPLACE_PARTITION) - { - settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << (p.replace ? "REPLACE" : "ATTACH") << " PARTITION " - << (settings.hilite ? hilite_none : ""); - p.partition->formatImpl(settings, state, frame); - settings.ostr << (settings.hilite ? hilite_keyword : "") << " FROM " << (settings.hilite ? hilite_none : ""); - if (!p.from_database.empty()) - { - settings.ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(p.from_database) - << (settings.hilite ? hilite_none : "") << "."; - } - settings.ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(p.from_table) << (settings.hilite ? hilite_none : ""); - } - else if (p.type == ASTAlterQuery::FETCH_PARTITION) - { - settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "FETCH " - << "PARTITION " << (settings.hilite ? hilite_none : ""); - p.partition->formatImpl(settings, state, frame); - settings.ostr << (settings.hilite ? hilite_keyword : "") - << " FROM " << (settings.hilite ? hilite_none : "") << std::quoted(p.from, '\''); - } - else if (p.type == ASTAlterQuery::FREEZE_PARTITION) - { - settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "FREEZE PARTITION " << (settings.hilite ? hilite_none : ""); - p.partition->formatImpl(settings, state, frame); - - if (!p.with_name.empty()) - { - settings.ostr << " " << (settings.hilite ? hilite_keyword : "") << "WITH NAME" << (settings.hilite ? hilite_none : "") - << " " << std::quoted(p.with_name, '\''); - } - } - else if (p.type == ASTAlterQuery::DELETE) - { - settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "DELETE WHERE " << (settings.hilite ? hilite_none : ""); - p.predicate->formatImpl(settings, state, frame); - } - else - throw Exception("Unexpected type of ALTER", ErrorCodes::UNEXPECTED_AST_STRUCTURE); - - std::string comma = (i < (parameters.size() -1) ) ? "," : ""; - settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << comma << (settings.hilite ? hilite_none : ""); - - settings.ostr << settings.nl_or_ws; - } + FormatStateStacked frame_nested = frame; + frame_nested.need_parens = false; + ++frame_nested.indent; + static_cast(command_list)->formatImpl(settings, state, frame_nested); } } diff --git a/dbms/src/Parsers/ASTAlterQuery.h b/dbms/src/Parsers/ASTAlterQuery.h index cdde19d061a..a97503305f5 100644 --- a/dbms/src/Parsers/ASTAlterQuery.h +++ b/dbms/src/Parsers/ASTAlterQuery.h @@ -16,10 +16,10 @@ namespace DB * DROP PARTITION partition, */ -class ASTAlterQuery : public ASTQueryWithOutput, public ASTQueryWithOnCluster +class ASTAlterCommand : public IAST { public: - enum ParameterType + enum Type { ADD_COLUMN, DROP_COLUMN, @@ -37,70 +37,88 @@ public: NO_TYPE, }; - struct Parameters + Type type = NO_TYPE; + + /** The ADD COLUMN query stores the name and type of the column to add + * This field is not used in the DROP query + * In MODIFY query, the column name and the new type are stored here + */ + ASTPtr col_decl; + + /** The ADD COLUMN query here optionally stores the name of the column following AFTER + * The DROP query stores the column name for deletion here + */ + ASTPtr column; + + /** For MODIFY PRIMARY KEY + */ + ASTPtr primary_key; + + /** Used in DROP PARTITION, RESHARD PARTITION and ATTACH PARTITION FROM queries. + * The value or ID of the partition is stored here. + */ + ASTPtr partition; + + /// For DELETE WHERE: the predicate that filters the rows to delete. + ASTPtr predicate; + + bool detach = false; /// true for DETACH PARTITION + + bool part = false; /// true for ATTACH PART + + bool do_copy = false; /// for RESHARD PARTITION + + bool clear_column = false; /// for CLEAR COLUMN (do not drop column from metadata) + + /** For FETCH PARTITION - the path in ZK to the shard, from which to download the partition. + */ + String from; + + /** For FREEZE PARTITION - place local backup to directory with specified name. + */ + String with_name; + + /// REPLACE(ATTACH) PARTITION partition FROM db.table + String from_database; + String from_table; + /// To distinguish REPLACE and ATTACH PARTITION partition FROM db.table + bool replace = true; + + String getID() const override { return "AlterCommand_" + std::to_string(static_cast(type)); } + + ASTPtr clone() const override; + +protected: + void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override; +}; + +class ASTAlterCommandList : public IAST +{ +public: + std::vector commands; + + void add(const ASTPtr & command) { - Parameters(); + commands.push_back(static_cast(command.get())); + children.push_back(command); + } - int type = NO_TYPE; + String getID() const override { return "AlterCommandList"; } - /** The ADD COLUMN query stores the name and type of the column to add - * This field is not used in the DROP query - * In MODIFY query, the column name and the new type are stored here - */ - ASTPtr col_decl; + ASTPtr clone() const override; - /** The ADD COLUMN query here optionally stores the name of the column following AFTER - * The DROP query stores the column name for deletion here - */ - ASTPtr column; +protected: + void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override; +}; - /** For MODIFY PRIMARY KEY - */ - ASTPtr primary_key; - - /** Used in DROP PARTITION, RESHARD PARTITION and ATTACH PARTITION FROM queries. - * The value or ID of the partition is stored here. - */ - ASTPtr partition; - - /// For DELETE WHERE: the predicate that filters the rows to delete. - ASTPtr predicate; - - bool detach = false; /// true for DETACH PARTITION - - bool part = false; /// true for ATTACH PART - - bool do_copy = false; /// for RESHARD PARTITION - - bool clear_column = false; /// for CLEAR COLUMN (do not drop column from metadata) - - /** For FETCH PARTITION - the path in ZK to the shard, from which to download the partition. - */ - String from; - - /** For FREEZE PARTITION - place local backup to directory with specified name. - */ - String with_name; - - /// REPLACE(ATTACH) PARTITION partition FROM db.table - String from_database; - String from_table; - /// To distinguish REPLACE and ATTACH PARTITION partition FROM db.table - bool replace = true; - - /// deep copy - void clone(Parameters & p) const; - }; - - using ParameterContainer = std::vector; - ParameterContainer parameters; +class ASTAlterQuery : public ASTQueryWithOutput, public ASTQueryWithOnCluster +{ +public: String database; String table; + ASTAlterCommandList * command_list = nullptr; - void addParameters(const Parameters & params); - - /** Get the text that identifies this element. */ String getID() const override; ASTPtr clone() const override; diff --git a/dbms/src/Parsers/ParserAlterQuery.cpp b/dbms/src/Parsers/ParserAlterQuery.cpp index 6715ada2ece..c592cb66d7d 100644 --- a/dbms/src/Parsers/ParserAlterQuery.cpp +++ b/dbms/src/Parsers/ParserAlterQuery.cpp @@ -12,9 +12,11 @@ namespace DB { -bool ParserAlterQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) +bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) { - ParserKeyword s_alter_table("ALTER TABLE"); + auto command = std::make_shared(); + node = command; + ParserKeyword s_add_column("ADD COLUMN"); ParserKeyword s_drop_column("DROP COLUMN"); ParserKeyword s_clear_column("CLEAR COLUMN"); @@ -37,207 +39,233 @@ bool ParserAlterQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) ParserKeyword s_delete_where("DELETE WHERE"); - ParserToken s_dot(TokenType::Dot); - ParserToken s_comma(TokenType::Comma); - - ParserIdentifier table_parser; ParserCompoundIdentifier parser_name; + ParserStringLiteral parser_string_literal; ParserCompoundColumnDeclaration parser_col_decl; ParserPartition parser_partition; - ParserStringLiteral parser_string_literal; ParserExpression exp_elem; - String cluster_str; - ASTPtr col_type; - ASTPtr col_after; - ASTPtr col_drop; + if (s_add_column.ignore(pos, expected)) + { + if (!parser_col_decl.parse(pos, command->col_decl, expected)) + return false; + if (s_after.ignore(pos, expected)) + { + if (!parser_name.parse(pos, command->column, expected)) + return false; + } + + command->type = ASTAlterCommand::ADD_COLUMN; + } + else if (s_drop_partition.ignore(pos, expected)) + { + if (!parser_partition.parse(pos, command->partition, expected)) + return false; + + command->type = ASTAlterCommand::DROP_PARTITION; + } + else if (s_drop_column.ignore(pos, expected)) + { + if (!parser_name.parse(pos, command->column, expected)) + return false; + + command->type = ASTAlterCommand::DROP_COLUMN; + command->detach = false; + } + else if (s_clear_column.ignore(pos, expected)) + { + if (!parser_name.parse(pos, command->column, expected)) + return false; + + command->type = ASTAlterCommand::DROP_COLUMN; + command->clear_column = true; + command->detach = false; + + if (s_in_partition.ignore(pos, expected)) + { + if (!parser_partition.parse(pos, command->partition, expected)) + return false; + } + } + else if (s_detach_partition.ignore(pos, expected)) + { + if (!parser_partition.parse(pos, command->partition, expected)) + return false; + + command->type = ASTAlterCommand::DROP_PARTITION; + command->detach = true; + } + else if (s_attach_partition.ignore(pos, expected)) + { + if (!parser_partition.parse(pos, command->partition, expected)) + return false; + + if (s_from.ignore(pos)) + { + if (!parseDatabaseAndTableName(pos, expected, command->from_database, command->from_table)) + return false; + + command->replace = false; + command->type = ASTAlterCommand::REPLACE_PARTITION; + } + else + { + command->type = ASTAlterCommand::ATTACH_PARTITION; + } + } + else if (ParserKeyword{"REPLACE PARTITION"}.ignore(pos, expected)) + { + if (!parser_partition.parse(pos, command->partition, expected)) + return false; + + if (!s_from.ignore(pos, expected)) + return false; + + if (!parseDatabaseAndTableName(pos, expected, command->from_database, command->from_table)) + return false; + + command->replace = true; + command->type = ASTAlterCommand::REPLACE_PARTITION; + } + else if (s_attach_part.ignore(pos, expected)) + { + if (!parser_string_literal.parse(pos, command->partition, expected)) + return false; + + command->part = true; + command->type = ASTAlterCommand::ATTACH_PARTITION; + } + else if (s_fetch_partition.ignore(pos, expected)) + { + if (!parser_partition.parse(pos, command->partition, expected)) + return false; + + if (!s_from.ignore(pos, expected)) + return false; + + ASTPtr ast_from; + if (!parser_string_literal.parse(pos, ast_from, expected)) + return false; + + command->from = typeid_cast(*ast_from).value.get(); + command->type = ASTAlterCommand::FETCH_PARTITION; + } + else if (s_freeze_partition.ignore(pos, expected)) + { + if (!parser_partition.parse(pos, command->partition, expected)) + return false; + + /// WITH NAME 'name' - place local backup to directory with specified name + if (s_with.ignore(pos, expected)) + { + if (!s_name.ignore(pos, expected)) + return false; + + ASTPtr ast_with_name; + if (!parser_string_literal.parse(pos, ast_with_name, expected)) + return false; + + command->with_name = typeid_cast(*ast_with_name).value.get(); + } + + command->type = ASTAlterCommand::FREEZE_PARTITION; + } + else if (s_modify_column.ignore(pos, expected)) + { + if (!parser_col_decl.parse(pos, command->col_decl, expected)) + return false; + + command->type = ASTAlterCommand::MODIFY_COLUMN; + } + else if (s_modify_primary_key.ignore(pos, expected)) + { + if (pos->type != TokenType::OpeningRoundBracket) + return false; + ++pos; + + if (!ParserNotEmptyExpressionList(false).parse(pos, command->primary_key, expected)) + return false; + + if (pos->type != TokenType::ClosingRoundBracket) + return false; + ++pos; + + command->type = ASTAlterCommand::MODIFY_PRIMARY_KEY; + } + else if (s_delete_where.ignore(pos, expected)) + { + if (!exp_elem.parse(pos, command->predicate, expected)) + return false; + + command->type = ASTAlterCommand::DELETE; + } + else + return false; + + if (command->col_decl) + command->children.push_back(command->col_decl); + if (command->column) + command->children.push_back(command->column); + if (command->primary_key) + command->children.push_back(command->primary_key); + if (command->partition) + command->children.push_back(command->partition); + if (command->predicate) + command->children.push_back(command->predicate); + + return true; +} + + +bool ParserAlterCommandList::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) +{ + auto command_list = std::make_shared(); + node = command_list; + + ParserToken s_comma(TokenType::Comma); + ParserAlterCommand p_command; + + do + { + ASTPtr command; + if (!p_command.parse(pos, command, expected)) + return false; + + command_list->add(command); + } + while (s_comma.ignore(pos, expected)); + + return true; +} + + +bool ParserAlterQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) +{ auto query = std::make_shared(); + node = query; + ParserKeyword s_alter_table("ALTER TABLE"); if (!s_alter_table.ignore(pos, expected)) return false; if (!parseDatabaseAndTableName(pos, expected, query->database, query->table)) return false; + String cluster_str; if (ParserKeyword{"ON"}.ignore(pos, expected)) { if (!ASTQueryWithOnCluster::parse(pos, cluster_str, expected)) return false; } - - bool parsing_finished = false; - do - { - ASTAlterQuery::Parameters params; - - if (s_add_column.ignore(pos, expected)) - { - if (!parser_col_decl.parse(pos, params.col_decl, expected)) - return false; - - if (s_after.ignore(pos, expected)) - { - if (!parser_name.parse(pos, params.column, expected)) - return false; - } - - params.type = ASTAlterQuery::ADD_COLUMN; - } - else if (s_drop_partition.ignore(pos, expected)) - { - if (!parser_partition.parse(pos, params.partition, expected)) - return false; - - params.type = ASTAlterQuery::DROP_PARTITION; - } - else if (s_drop_column.ignore(pos, expected)) - { - if (!parser_name.parse(pos, params.column, expected)) - return false; - - params.type = ASTAlterQuery::DROP_COLUMN; - params.detach = false; - } - else if (s_clear_column.ignore(pos, expected)) - { - if (!parser_name.parse(pos, params.column, expected)) - return false; - - params.type = ASTAlterQuery::DROP_COLUMN; - params.clear_column = true; - params.detach = false; - - if (s_in_partition.ignore(pos, expected)) - { - if (!parser_partition.parse(pos, params.partition, expected)) - return false; - } - } - else if (s_detach_partition.ignore(pos, expected)) - { - if (!parser_partition.parse(pos, params.partition, expected)) - return false; - - params.type = ASTAlterQuery::DROP_PARTITION; - params.detach = true; - } - else if (s_attach_partition.ignore(pos, expected)) - { - if (!parser_partition.parse(pos, params.partition, expected)) - return false; - - if (s_from.ignore(pos)) - { - if (!parseDatabaseAndTableName(pos, expected, params.from_database, params.from_table)) - return false; - - params.replace = false; - params.type = ASTAlterQuery::REPLACE_PARTITION; - } - else - { - params.type = ASTAlterQuery::ATTACH_PARTITION; - } - } - else if (ParserKeyword{"REPLACE PARTITION"}.ignore(pos, expected)) - { - if (!parser_partition.parse(pos, params.partition, expected)) - return false; - - if (!s_from.ignore(pos, expected)) - return false; - - if (!parseDatabaseAndTableName(pos, expected, params.from_database, params.from_table)) - return false; - - params.replace = true; - params.type = ASTAlterQuery::REPLACE_PARTITION; - } - else if (s_attach_part.ignore(pos, expected)) - { - if (!parser_string_literal.parse(pos, params.partition, expected)) - return false; - - params.part = true; - params.type = ASTAlterQuery::ATTACH_PARTITION; - } - else if (s_fetch_partition.ignore(pos, expected)) - { - if (!parser_partition.parse(pos, params.partition, expected)) - return false; - - if (!s_from.ignore(pos, expected)) - return false; - - ASTPtr ast_from; - if (!parser_string_literal.parse(pos, ast_from, expected)) - return false; - - params.from = typeid_cast(*ast_from).value.get(); - params.type = ASTAlterQuery::FETCH_PARTITION; - } - else if (s_freeze_partition.ignore(pos, expected)) - { - if (!parser_partition.parse(pos, params.partition, expected)) - return false; - - /// WITH NAME 'name' - place local backup to directory with specified name - if (s_with.ignore(pos, expected)) - { - if (!s_name.ignore(pos, expected)) - return false; - - ASTPtr ast_with_name; - if (!parser_string_literal.parse(pos, ast_with_name, expected)) - return false; - - params.with_name = typeid_cast(*ast_with_name).value.get(); - } - - params.type = ASTAlterQuery::FREEZE_PARTITION; - } - else if (s_modify_column.ignore(pos, expected)) - { - if (!parser_col_decl.parse(pos, params.col_decl, expected)) - return false; - - params.type = ASTAlterQuery::MODIFY_COLUMN; - } - else if (s_modify_primary_key.ignore(pos, expected)) - { - if (pos->type != TokenType::OpeningRoundBracket) - return false; - ++pos; - - if (!ParserNotEmptyExpressionList(false).parse(pos, params.primary_key, expected)) - return false; - - if (pos->type != TokenType::ClosingRoundBracket) - return false; - ++pos; - - params.type = ASTAlterQuery::MODIFY_PRIMARY_KEY; - } - else if (s_delete_where.ignore(pos, expected)) - { - if (!exp_elem.parse(pos, params.predicate, expected)) - return false; - - params.type = ASTAlterQuery::DELETE; - } - else - return false; - - if (!s_comma.ignore(pos, expected)) - parsing_finished = true; - - query->addParameters(params); - } - while (!parsing_finished); - query->cluster = cluster_str; - node = query; + + ParserAlterCommandList p_command_list; + ASTPtr command_list; + if (!p_command_list.parse(pos, command_list, expected)) + return false; + + query->set(query->command_list, command_list); return true; } diff --git a/dbms/src/Parsers/ParserAlterQuery.h b/dbms/src/Parsers/ParserAlterQuery.h index 03c23c6f47f..c758e0304b4 100644 --- a/dbms/src/Parsers/ParserAlterQuery.h +++ b/dbms/src/Parsers/ParserAlterQuery.h @@ -18,6 +18,23 @@ namespace DB * [FREEZE PARTITION] * [DELETE WHERE ...] */ + +class ParserAlterCommand : public IParserBase +{ +protected: + const char * getName() const { return "ALTER command"; } + bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected); +}; + + +class ParserAlterCommandList : public IParserBase +{ +protected: + const char * getName() const { return "a list of ALTER commands"; } + bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected); +}; + + class ParserAlterQuery : public IParserBase { protected: From c5c601f6622e29e865f802dc172ccfd2067de906 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Wed, 13 Jun 2018 16:49:27 +0300 Subject: [PATCH 2/7] save serialized commands in ReplicatedMergeTreeMutationEntry; split InterpreterAlterQuery::parseAlter() [#CLICKHOUSE-3747] --- .../ApplyingMutationsBlockInputStream.cpp | 4 +- .../Interpreters/InterpreterAlterQuery.cpp | 165 ++---------------- dbms/src/Interpreters/InterpreterAlterQuery.h | 103 +---------- dbms/src/Storages/AlterCommands.cpp | 89 +++++++++- dbms/src/Storages/AlterCommands.h | 16 +- .../ReplicatedMergeTreeMutationEntry.cpp | 30 +++- .../MergeTree/ReplicatedMergeTreeQueue.cpp | 6 +- dbms/src/Storages/MutationCommands.cpp | 91 ++-------- dbms/src/Storages/MutationCommands.h | 27 +-- dbms/src/Storages/PartitionCommands.cpp | 94 ++++++++++ dbms/src/Storages/PartitionCommands.h | 59 +++++++ .../Storages/StorageReplicatedMergeTree.cpp | 2 +- 12 files changed, 319 insertions(+), 367 deletions(-) create mode 100644 dbms/src/Storages/PartitionCommands.cpp create mode 100644 dbms/src/Storages/PartitionCommands.h diff --git a/dbms/src/DataStreams/ApplyingMutationsBlockInputStream.cpp b/dbms/src/DataStreams/ApplyingMutationsBlockInputStream.cpp index 51f155e0bfc..bfcfcb85418 100644 --- a/dbms/src/DataStreams/ApplyingMutationsBlockInputStream.cpp +++ b/dbms/src/DataStreams/ApplyingMutationsBlockInputStream.cpp @@ -2,6 +2,8 @@ #include #include #include +#include + namespace DB { @@ -32,7 +34,7 @@ ApplyingMutationsBlockInputStream::ApplyingMutationsBlockInputStream( break; } default: - throw Exception("Unsupported mutation cmd type: " + toString(static_cast(cmd.type)), + throw Exception("Unsupported mutation cmd type: " + toString(cmd.type), ErrorCodes::LOGICAL_ERROR); } } diff --git a/dbms/src/Interpreters/InterpreterAlterQuery.cpp b/dbms/src/Interpreters/InterpreterAlterQuery.cpp index 785edd2e229..c58d358dd63 100644 --- a/dbms/src/Interpreters/InterpreterAlterQuery.cpp +++ b/dbms/src/Interpreters/InterpreterAlterQuery.cpp @@ -1,21 +1,7 @@ #include #include #include -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include -#include -#include - -#include +#include #include @@ -26,8 +12,6 @@ namespace DB namespace ErrorCodes { extern const int LOGICAL_ERROR; - extern const int ARGUMENT_OUT_OF_BOUND; - extern const int BAD_ARGUMENTS; extern const int ILLEGAL_COLUMN; } @@ -51,9 +35,19 @@ BlockIO InterpreterAlterQuery::execute() AlterCommands alter_commands; PartitionCommands partition_commands; MutationCommands mutation_commands; - parseAlter(alter.command_list->commands, alter_commands, partition_commands, mutation_commands); + for (ASTAlterCommand * command_ast : alter.command_list->commands) + { + if (auto alter_command = AlterCommand::parse(command_ast)) + alter_commands.emplace_back(std::move(*alter_command)); + else if (auto partition_command = PartitionCommand::parse(command_ast)) + partition_commands.emplace_back(std::move(*partition_command)); + else if (auto mut_command = MutationCommand::parse(command_ast)) + mutation_commands.emplace_back(std::move(*mut_command)); + else + throw Exception("Wrong parameter type in ALTER query", ErrorCodes::LOGICAL_ERROR); + } - if (!mutation_commands.commands.empty()) + if (!mutation_commands.empty()) { mutation_commands.validate(*table, context); table->mutate(mutation_commands, context); @@ -103,137 +97,4 @@ BlockIO InterpreterAlterQuery::execute() return {}; } -void InterpreterAlterQuery::parseAlter( - const std::vector & command_asts, - AlterCommands & out_alter_commands, - PartitionCommands & out_partition_commands, - MutationCommands & out_mutation_commands) -{ - const DataTypeFactory & data_type_factory = DataTypeFactory::instance(); - - for (const auto & command_ast : command_asts) - { - if (command_ast->type == ASTAlterCommand::ADD_COLUMN) - { - AlterCommand command; - command.type = AlterCommand::ADD_COLUMN; - - const auto & ast_col_decl = typeid_cast(*command_ast->col_decl); - - command.column_name = ast_col_decl.name; - if (ast_col_decl.type) - { - command.data_type = data_type_factory.get(ast_col_decl.type); - } - if (ast_col_decl.default_expression) - { - command.default_kind = columnDefaultKindFromString(ast_col_decl.default_specifier); - command.default_expression = ast_col_decl.default_expression; - } - - if (command_ast->column) - command.after_column = typeid_cast(*command_ast->column).name; - - out_alter_commands.emplace_back(std::move(command)); - } - else if (command_ast->type == ASTAlterCommand::DROP_COLUMN) - { - if (command_ast->partition) - { - if (!command_ast->clear_column) - throw Exception("Can't DROP COLUMN from partition. It is possible only CLEAR COLUMN in partition", ErrorCodes::BAD_ARGUMENTS); - - const Field & column_name = typeid_cast(*(command_ast->column)).name; - - out_partition_commands.emplace_back(PartitionCommand::clearColumn(command_ast->partition, column_name)); - } - else - { - if (command_ast->clear_column) - throw Exception("\"ALTER TABLE table CLEAR COLUMN column\" queries are not supported yet. Use \"CLEAR COLUMN column IN PARTITION\".", ErrorCodes::NOT_IMPLEMENTED); - - AlterCommand command; - command.type = AlterCommand::DROP_COLUMN; - command.column_name = typeid_cast(*(command_ast->column)).name; - - out_alter_commands.emplace_back(std::move(command)); - } - } - else if (command_ast->type == ASTAlterCommand::MODIFY_COLUMN) - { - AlterCommand command; - command.type = AlterCommand::MODIFY_COLUMN; - - const auto & ast_col_decl = typeid_cast(*command_ast->col_decl); - - command.column_name = ast_col_decl.name; - if (ast_col_decl.type) - { - command.data_type = data_type_factory.get(ast_col_decl.type); - } - - if (ast_col_decl.default_expression) - { - command.default_kind = columnDefaultKindFromString(ast_col_decl.default_specifier); - command.default_expression = ast_col_decl.default_expression; - } - - out_alter_commands.emplace_back(std::move(command)); - } - else if (command_ast->type == ASTAlterCommand::MODIFY_PRIMARY_KEY) - { - AlterCommand command; - command.type = AlterCommand::MODIFY_PRIMARY_KEY; - command.primary_key = command_ast->primary_key; - out_alter_commands.emplace_back(std::move(command)); - } - else if (command_ast->type == ASTAlterCommand::DROP_PARTITION) - { - out_partition_commands.emplace_back(PartitionCommand::dropPartition(command_ast->partition, command_ast->detach)); - } - else if (command_ast->type == ASTAlterCommand::ATTACH_PARTITION) - { - out_partition_commands.emplace_back(PartitionCommand::attachPartition(command_ast->partition, command_ast->part)); - } - else if (command_ast->type == ASTAlterCommand::REPLACE_PARTITION) - { - out_partition_commands.emplace_back( - PartitionCommand::replacePartition(command_ast->partition, command_ast->replace, command_ast->from_database, command_ast->from_table)); - } - else if (command_ast->type == ASTAlterCommand::FETCH_PARTITION) - { - out_partition_commands.emplace_back(PartitionCommand::fetchPartition(command_ast->partition, command_ast->from)); - } - else if (command_ast->type == ASTAlterCommand::FREEZE_PARTITION) - { - out_partition_commands.emplace_back(PartitionCommand::freezePartition(command_ast->partition, command_ast->with_name)); - } - else if (command_ast->type == ASTAlterCommand::DELETE) - { - out_mutation_commands.commands.emplace_back(MutationCommand::delete_(command_ast->predicate)); - } - else - throw Exception("Wrong parameter type in ALTER query", ErrorCodes::LOGICAL_ERROR); - } -} - - -void InterpreterAlterQuery::PartitionCommands::validate(const IStorage & table) -{ - for (const PartitionCommand & command : *this) - { - if (command.type == PartitionCommand::CLEAR_COLUMN) - { - String column_name = command.column_name.safeGet(); - - if (!table.getColumns().hasPhysical(column_name)) - { - throw Exception("Wrong column name. Cannot find column " + column_name + " to clear it from partition", - DB::ErrorCodes::ILLEGAL_COLUMN); - } - } - } -} - - } diff --git a/dbms/src/Interpreters/InterpreterAlterQuery.h b/dbms/src/Interpreters/InterpreterAlterQuery.h index 31b89f74864..3e4453608ca 100644 --- a/dbms/src/Interpreters/InterpreterAlterQuery.h +++ b/dbms/src/Interpreters/InterpreterAlterQuery.h @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -22,111 +23,9 @@ public: BlockIO execute() override; private: - struct PartitionCommand - { - enum Type - { - DROP_PARTITION, - ATTACH_PARTITION, - REPLACE_PARTITION, - FETCH_PARTITION, - FREEZE_PARTITION, - CLEAR_COLUMN, - }; - - Type type; - - ASTPtr partition; - Field column_name; - - /// true for DETACH PARTITION. - bool detach = false; - - /// true for ATTACH PART (and false for PARTITION) - bool part = false; - - /// For ATTACH PARTITION partition FROM db.table - String from_database; - String from_table; - bool replace = true; - - /// For FETCH PARTITION - path in ZK to the shard, from which to download the partition. - String from_zookeeper_path; - - /// For FREEZE PARTITION - String with_name; - - static PartitionCommand dropPartition(const ASTPtr & partition, bool detach) - { - PartitionCommand res; - res.type = DROP_PARTITION; - res.partition = partition; - res.detach = detach; - return res; - } - - static PartitionCommand clearColumn(const ASTPtr & partition, const Field & column_name) - { - PartitionCommand res; - res.type = CLEAR_COLUMN; - res.partition = partition; - res.column_name = column_name; - return res; - } - - static PartitionCommand attachPartition(const ASTPtr & partition, bool part) - { - PartitionCommand res; - res.type = ATTACH_PARTITION; - res.partition = partition; - res.part = part; - return res; - } - - static PartitionCommand replacePartition(const ASTPtr & partition, bool replace, const String & from_database, const String & from_table) - { - PartitionCommand res; - res.type = REPLACE_PARTITION; - res.partition = partition; - res.replace = replace; - res.from_database = from_database; - res.from_table = from_table; - return res; - } - - static PartitionCommand fetchPartition(const ASTPtr & partition, const String & from) - { - PartitionCommand res; - res.type = FETCH_PARTITION; - res.partition = partition; - res.from_zookeeper_path = from; - return res; - } - - static PartitionCommand freezePartition(const ASTPtr & partition, const String & with_name) - { - PartitionCommand res; - res.type = FREEZE_PARTITION; - res.partition = partition; - res.with_name = with_name; - return res; - } - }; - - class PartitionCommands : public std::vector - { - public: - void validate(const IStorage & table); - }; - ASTPtr query_ptr; const Context & context; - - static void parseAlter(const std::vector & commands, - AlterCommands & out_alter_commands, - PartitionCommands & out_partition_commands, - MutationCommands & out_mutation_commands); }; } diff --git a/dbms/src/Storages/AlterCommands.cpp b/dbms/src/Storages/AlterCommands.cpp index cea0a6b68eb..9e6d525f685 100644 --- a/dbms/src/Storages/AlterCommands.cpp +++ b/dbms/src/Storages/AlterCommands.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include #include @@ -9,6 +10,9 @@ #include #include #include +#include +#include +#include namespace DB @@ -21,6 +25,83 @@ namespace ErrorCodes } +std::optional AlterCommand::parse(const ASTAlterCommand * command_ast) +{ + const DataTypeFactory & data_type_factory = DataTypeFactory::instance(); + + if (command_ast->type == ASTAlterCommand::ADD_COLUMN) + { + AlterCommand command; + command.type = AlterCommand::ADD_COLUMN; + + const auto & ast_col_decl = typeid_cast(*command_ast->col_decl); + + command.column_name = ast_col_decl.name; + if (ast_col_decl.type) + { + command.data_type = data_type_factory.get(ast_col_decl.type); + } + if (ast_col_decl.default_expression) + { + command.default_kind = columnDefaultKindFromString(ast_col_decl.default_specifier); + command.default_expression = ast_col_decl.default_expression; + } + + if (command_ast->column) + command.after_column = typeid_cast(*command_ast->column).name; + + return command; + } + else if (command_ast->type == ASTAlterCommand::DROP_COLUMN && !command_ast->partition) + { + if (command_ast->clear_column) + throw Exception("\"ALTER TABLE table CLEAR COLUMN column\" queries are not supported yet. Use \"CLEAR COLUMN column IN PARTITION\".", ErrorCodes::NOT_IMPLEMENTED); + + AlterCommand command; + command.type = AlterCommand::DROP_COLUMN; + command.column_name = typeid_cast(*(command_ast->column)).name; + return command; + } + else if (command_ast->type == ASTAlterCommand::MODIFY_COLUMN) + { + AlterCommand command; + command.type = AlterCommand::MODIFY_COLUMN; + + const auto & ast_col_decl = typeid_cast(*command_ast->col_decl); + + command.column_name = ast_col_decl.name; + if (ast_col_decl.type) + { + command.data_type = data_type_factory.get(ast_col_decl.type); + } + + if (ast_col_decl.default_expression) + { + command.default_kind = columnDefaultKindFromString(ast_col_decl.default_specifier); + command.default_expression = ast_col_decl.default_expression; + } + + return command; + } + else if (command_ast->type == ASTAlterCommand::MODIFY_PRIMARY_KEY) + { + AlterCommand command; + command.type = AlterCommand::MODIFY_PRIMARY_KEY; + command.primary_key = command_ast->primary_key; + return command; + } + else + return {}; +} + + +/// the names are the same if they match the whole name or name_without_dot matches the part of the name up to the dot +static bool namesEqual(const String & name_without_dot, const DB::NameAndTypePair & name_type) +{ + String name_with_dot = name_without_dot + "."; + return (name_with_dot == name_type.name.substr(0, name_without_dot.length() + 1) || name_without_dot == name_type.name); +} + void AlterCommand::apply(ColumnsDescription & columns_description) const { if (type == ADD_COLUMN) @@ -187,7 +268,7 @@ void AlterCommands::validate(const IStorage & table, const Context & context) { const auto & column_name = command.column_name; const auto column_it = std::find_if(std::begin(all_columns), std::end(all_columns), - std::bind(AlterCommand::namesEqual, std::cref(command.column_name), std::placeholders::_1)); + std::bind(namesEqual, std::cref(command.column_name), std::placeholders::_1)); if (command.type == AlterCommand::ADD_COLUMN) { @@ -251,7 +332,7 @@ void AlterCommands::validate(const IStorage & table, const Context & context) auto found = false; for (auto it = std::begin(all_columns); it != std::end(all_columns);) { - if (AlterCommand::namesEqual(command.column_name, *it)) + if (namesEqual(command.column_name, *it)) { found = true; it = all_columns.erase(it); @@ -262,7 +343,7 @@ void AlterCommands::validate(const IStorage & table, const Context & context) for (auto it = std::begin(defaults); it != std::end(defaults);) { - if (AlterCommand::namesEqual(command.column_name, { it->first, nullptr })) + if (namesEqual(command.column_name, { it->first, nullptr })) it = defaults.erase(it); else ++it; @@ -280,7 +361,7 @@ void AlterCommands::validate(const IStorage & table, const Context & context) { const auto & column_name = col_def.first; const auto column_it = std::find_if(all_columns.begin(), all_columns.end(), [&] (const NameAndTypePair & name_type) - { return AlterCommand::namesEqual(column_name, name_type); }); + { return namesEqual(column_name, name_type); }); const auto tmp_column_name = column_name + "_tmp"; const auto & column_type_ptr = column_it->type; diff --git a/dbms/src/Storages/AlterCommands.h b/dbms/src/Storages/AlterCommands.h index 796f48eea1a..fca8a68f70d 100644 --- a/dbms/src/Storages/AlterCommands.h +++ b/dbms/src/Storages/AlterCommands.h @@ -6,6 +6,8 @@ namespace DB { +class ASTAlterCommand; + /// Operation from the ALTER query (except for manipulation with PART/PARTITION). Adding Nested columns is not expanded to add individual columns. struct AlterCommand { @@ -36,15 +38,6 @@ struct AlterCommand /// For MODIFY_PRIMARY_KEY ASTPtr primary_key; - /// the names are the same if they match the whole name or name_without_dot matches the part of the name up to the dot - static bool namesEqual(const String & name_without_dot, const DB::NameAndTypePair & name_type) - { - String name_with_dot = name_without_dot + "."; - return (name_with_dot == name_type.name.substr(0, name_without_dot.length() + 1) || name_without_dot == name_type.name); - } - - void apply(ColumnsDescription & columns_description) const; - AlterCommand() = default; AlterCommand(const Type type, const String & column_name, const DataTypePtr & data_type, const ColumnDefaultKind default_kind, const ASTPtr & default_expression, @@ -52,6 +45,11 @@ struct AlterCommand : type{type}, column_name{column_name}, data_type{data_type}, default_kind{default_kind}, default_expression{default_expression}, after_column{after_column} {} + + static std::optional parse(const ASTAlterCommand * command); + + void apply(ColumnsDescription & columns_description) const; + }; class IStorage; diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeMutationEntry.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeMutationEntry.cpp index a0bc59a0e69..739a9304b23 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeMutationEntry.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeMutationEntry.cpp @@ -1,12 +1,21 @@ #include +#include +#include +#include #include #include #include +#include namespace DB { +namespace ErrorCodes +{ + extern const int UNKNOWN_MUTATION_COMMAND; +} + void ReplicatedMergeTreeMutationEntry::writeText(WriteBuffer & out) const { out << "format version: 1\n" @@ -21,8 +30,9 @@ void ReplicatedMergeTreeMutationEntry::writeText(WriteBuffer & out) const out << partition_id << "\t" << number << "\n"; } - out << "mutation commands:\n"; - commands.writeText(out); + std::stringstream commands_ss; + formatAST(*commands.ast(), commands_ss, /* hilite = */ false, /* one_line = */ true); + out << "commands: " << escape << commands_ss.str(); } void ReplicatedMergeTreeMutationEntry::readText(ReadBuffer & in) @@ -45,8 +55,20 @@ void ReplicatedMergeTreeMutationEntry::readText(ReadBuffer & in) block_numbers[partition_id] = number; } - in >> "mutation commands:\n"; - commands.readText(in); + String commands_str; + in >> "commands: " >> escape >> commands_str; + + ParserAlterCommandList p_alter_commands; + auto commands_ast = parseQuery( + p_alter_commands, commands_str.data(), commands_str.data() + commands_str.length(), "mutation commands list", 0); + for (ASTAlterCommand * command_ast : typeid_cast(*commands_ast).commands) + { + auto command = MutationCommand::parse(command_ast); + if (!command) + throw Exception("Unknown mutation command type: " + DB::toString(command_ast->type), ErrorCodes::UNKNOWN_MUTATION_COMMAND); + commands.push_back(std::move(*command)); + } + } String ReplicatedMergeTreeMutationEntry::toString() const diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index b2dccae82bc..06a01933e9d 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -951,11 +951,11 @@ MutationCommands ReplicatedMergeTreeQueue::getMutationCommands( else ++end; - std::vector commands; + MutationCommands commands; for (auto it = begin; it != end; ++it) - commands.insert(commands.end(), it->second->commands.commands.begin(), it->second->commands.commands.end()); + commands.insert(commands.end(), it->second->commands.begin(), it->second->commands.end()); - return MutationCommands{commands}; + return commands; } void ReplicatedMergeTreeQueue::disableMergesInRange(const String & part_name) diff --git a/dbms/src/Storages/MutationCommands.cpp b/dbms/src/Storages/MutationCommands.cpp index 0119be9132c..ff36f6ed27e 100644 --- a/dbms/src/Storages/MutationCommands.cpp +++ b/dbms/src/Storages/MutationCommands.cpp @@ -11,64 +11,34 @@ namespace DB { -namespace ErrorCodes +std::optional MutationCommand::parse(ASTAlterCommand * command) { - extern const int UNKNOWN_MUTATION_COMMAND; -} - -static String typeToString(MutationCommand::Type type) -{ - switch (type) + if (command->type == ASTAlterCommand::DELETE) { - case MutationCommand::DELETE: return "DELETE"; - default: - throw Exception("Bad mutation type: " + toString(type), ErrorCodes::LOGICAL_ERROR); - } -} - -void MutationCommand::writeText(WriteBuffer & out) const -{ - out << typeToString(type) << "\n"; - - switch (type) - { - case MutationCommand::DELETE: - { - std::stringstream ss; - formatAST(*predicate, ss, /* hilite = */ false, /* one_line = */ true); - out << "predicate: " << escape << ss.str() << "\n"; - break; - } - default: - throw Exception("Bad mutation type: " + toString(type), ErrorCodes::LOGICAL_ERROR); - } -} - -void MutationCommand::readText(ReadBuffer & in) -{ - String type_str; - in >> type_str >> "\n"; - - if (type_str == "DELETE") - { - type = DELETE; - - String predicate_str; - in >> "predicate: " >> escape >> predicate_str >> "\n"; - ParserExpressionWithOptionalAlias p_expr(false); - predicate = parseQuery( - p_expr, predicate_str.data(), predicate_str.data() + predicate_str.length(), "mutation predicate", 0); + MutationCommand res; + res.ast = command->ptr(); + res.type = DELETE; + res.predicate = command->predicate; + return res; } else - throw Exception("Unknown mutation command: `" + type_str + "'", ErrorCodes::UNKNOWN_MUTATION_COMMAND); + return {}; } -void MutationCommands::validate(const IStorage & table, const Context & context) +std::shared_ptr MutationCommands::ast() const +{ + auto res = std::make_shared(); + for (const MutationCommand & command : *this) + res->add(command.ast->clone()); + return res; +} + +void MutationCommands::validate(const IStorage & table, const Context & context) const { auto all_columns = table.getColumns().getAll(); - for (const MutationCommand & command : commands) + for (const MutationCommand & command : *this) { switch (command.type) { @@ -86,29 +56,4 @@ void MutationCommands::validate(const IStorage & table, const Context & context) } } -void MutationCommands::writeText(WriteBuffer & out) const -{ - out << "format version: 1\n" - << "count: " << commands.size() << "\n"; - for (const MutationCommand & command : commands) - { - command.writeText(out); - } -} - -void MutationCommands::readText(ReadBuffer & in) -{ - in >> "format version: 1\n"; - - size_t count; - in >> "count: " >> count >> "\n"; - - for (size_t i = 0; i < count; ++i) - { - MutationCommand command; - command.readText(in); - commands.push_back(std::move(command)); - } -} - } diff --git a/dbms/src/Storages/MutationCommands.h b/dbms/src/Storages/MutationCommands.h index a7a2c24ef8f..0a382d2f80c 100644 --- a/dbms/src/Storages/MutationCommands.h +++ b/dbms/src/Storages/MutationCommands.h @@ -1,7 +1,7 @@ #pragma once -#include -#include +#include +#include namespace DB @@ -12,6 +12,8 @@ class Context; struct MutationCommand { + ASTPtr ast; /// The AST of the whole command + enum Type { EMPTY, /// Not used. @@ -22,26 +24,15 @@ struct MutationCommand ASTPtr predicate; - static MutationCommand delete_(const ASTPtr & predicate) - { - MutationCommand res; - res.type = DELETE; - res.predicate = predicate; - return res; - } - - void writeText(WriteBuffer & out) const; - void readText(ReadBuffer & in); + static std::optional parse(ASTAlterCommand * command); }; -struct MutationCommands +class MutationCommands : public std::vector { - std::vector commands; +public: + std::shared_ptr ast() const; - void validate(const IStorage & table, const Context & context); - - void writeText(WriteBuffer & out) const; - void readText(ReadBuffer & in); + void validate(const IStorage & table, const Context & context) const; }; } diff --git a/dbms/src/Storages/PartitionCommands.cpp b/dbms/src/Storages/PartitionCommands.cpp new file mode 100644 index 00000000000..e7daabb246c --- /dev/null +++ b/dbms/src/Storages/PartitionCommands.cpp @@ -0,0 +1,94 @@ +#include +#include +#include +#include +#include + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int BAD_ARGUMENTS; + extern const int ILLEGAL_COLUMN; +} + +std::optional PartitionCommand::parse(const ASTAlterCommand * command_ast) +{ + if (command_ast->type == ASTAlterCommand::DROP_PARTITION) + { + PartitionCommand res; + res.type = DROP_PARTITION; + res.partition = command_ast->partition; + res.detach = command_ast->detach; + return res; + } + else if (command_ast->type == ASTAlterCommand::ATTACH_PARTITION) + { + PartitionCommand res; + res.type = ATTACH_PARTITION; + res.partition = command_ast->partition; + res.part = command_ast->part; + return res; + } + else if (command_ast->type == ASTAlterCommand::REPLACE_PARTITION) + { + PartitionCommand res; + res.type = REPLACE_PARTITION; + res.partition = command_ast->partition; + res.replace = command_ast->replace; + res.from_database = command_ast->from_database; + res.from_table = command_ast->from_table; + return res; + } + else if (command_ast->type == ASTAlterCommand::FETCH_PARTITION) + { + PartitionCommand res; + res.type = FETCH_PARTITION; + res.partition = command_ast->partition; + res.from_zookeeper_path = command_ast->from; + return res; + } + else if (command_ast->type == ASTAlterCommand::FREEZE_PARTITION) + { + PartitionCommand res; + res.type = FREEZE_PARTITION; + res.partition = command_ast->partition; + res.with_name = command_ast->with_name; + return res; + } + else if (command_ast->type == ASTAlterCommand::DROP_COLUMN && command_ast->partition) + { + if (!command_ast->clear_column) + throw Exception("Can't DROP COLUMN from partition. It is possible only CLEAR COLUMN in partition", ErrorCodes::BAD_ARGUMENTS); + + PartitionCommand res; + res.type = CLEAR_COLUMN; + res.partition = command_ast->partition; + const Field & column_name = typeid_cast(*(command_ast->column)).name; + res.column_name = column_name; + return res; + } + else + return {}; +} + +void PartitionCommands::validate(const IStorage & table) +{ + for (const PartitionCommand & command : *this) + { + if (command.type == PartitionCommand::CLEAR_COLUMN) + { + String column_name = command.column_name.safeGet(); + + if (!table.getColumns().hasPhysical(column_name)) + { + throw Exception("Wrong column name. Cannot find column " + column_name + " to clear it from partition", + DB::ErrorCodes::ILLEGAL_COLUMN); + } + } + } +} + +} diff --git a/dbms/src/Storages/PartitionCommands.h b/dbms/src/Storages/PartitionCommands.h new file mode 100644 index 00000000000..6fa127de899 --- /dev/null +++ b/dbms/src/Storages/PartitionCommands.h @@ -0,0 +1,59 @@ +#pragma once + +#include +#include +#include +#include + + +namespace DB +{ + +class IStorage; +class ASTAlterCommand; + +struct PartitionCommand +{ + enum Type + { + DROP_PARTITION, + ATTACH_PARTITION, + REPLACE_PARTITION, + FETCH_PARTITION, + FREEZE_PARTITION, + CLEAR_COLUMN, + }; + + Type type; + + ASTPtr partition; + Field column_name; + + /// true for DETACH PARTITION. + bool detach = false; + + /// true for ATTACH PART (and false for PARTITION) + bool part = false; + + /// For ATTACH PARTITION partition FROM db.table + String from_database; + String from_table; + bool replace = true; + + /// For FETCH PARTITION - path in ZK to the shard, from which to download the partition. + String from_zookeeper_path; + + /// For FREEZE PARTITION + String with_name; + + static std::optional parse(const ASTAlterCommand * command); +}; + +class PartitionCommands : public std::vector +{ +public: + void validate(const IStorage & table); +}; + + +} diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index aaff1e3f97c..702cc81a7ec 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -1382,7 +1382,7 @@ bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedM try { - new_part = merger_mutator.mutatePartToTemporaryPart(future_mutated_part, commands.commands, context); + new_part = merger_mutator.mutatePartToTemporaryPart(future_mutated_part, commands, context); data.renameTempPartAndReplace(new_part, nullptr, &transaction); try From 2c61a5940cf5d0383507991e46ce3f00ddb7d41f Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Thu, 7 Jun 2018 14:00:43 +0300 Subject: [PATCH 3/7] store mutation entries in shared_ptr --- .../ReplicatedMergeTreeMutationEntry.h | 2 ++ .../MergeTree/ReplicatedMergeTreeQueue.cpp | 18 ++++++++---------- .../MergeTree/ReplicatedMergeTreeQueue.h | 4 ++-- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeMutationEntry.h b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeMutationEntry.h index e92230d3cc6..737270cb024 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeMutationEntry.h +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeMutationEntry.h @@ -29,4 +29,6 @@ struct ReplicatedMergeTreeMutationEntry MutationCommands commands; }; +using ReplicatedMergeTreeMutationEntryPtr = std::shared_ptr; + } diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index 06a01933e9d..b2349ea83a6 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -445,7 +445,7 @@ void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, z for (auto it = mutations_by_znode.begin(); it != mutations_by_znode.end(); ) { - const ReplicatedMergeTreeMutationEntry & entry = it->second; + const ReplicatedMergeTreeMutationEntry & entry = *it->second; if (!entries_in_zk_set.count(entry.znode_name)) { LOG_DEBUG(log, "Removing obsolete mutation " + entry.znode_name + " from local state."); @@ -478,25 +478,23 @@ void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, z for (const String & entry : entries_to_load) futures.emplace_back(zookeeper->asyncGet(zookeeper_path + "/mutations/" + entry)); - std::vector new_mutations; + std::vector new_mutations; for (size_t i = 0; i < entries_to_load.size(); ++i) { - new_mutations.push_back( - ReplicatedMergeTreeMutationEntry::parse(futures[i].get().data, entries_to_load[i])); + new_mutations.push_back(std::make_shared( + ReplicatedMergeTreeMutationEntry::parse(futures[i].get().data, entries_to_load[i]))); } { std::lock_guard lock(target_state_mutex); - for (ReplicatedMergeTreeMutationEntry & entry : new_mutations) + for (const ReplicatedMergeTreeMutationEntryPtr & entry : new_mutations) { - String znode = entry.znode_name; - const ReplicatedMergeTreeMutationEntry & inserted_entry = - mutations_by_znode.emplace(znode, std::move(entry)).first->second; + mutations_by_znode.emplace(entry->znode_name, entry); - for (const auto & partition_and_block_num : inserted_entry.block_numbers) + for (const auto & partition_and_block_num : entry->block_numbers) mutations_by_partition[partition_and_block_num.first].emplace( - partition_and_block_num.second, &inserted_entry); + partition_and_block_num.second, entry); } } diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h index 13ed28f2b35..73bd6cc8feb 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h @@ -91,8 +91,8 @@ private: /// mutations_by_partition is an index partition ID -> block ID -> mutation into this list. /// Note that mutations are updated in such a way that they are always more recent than /// log_pointer (see pullLogsToQueue()). - std::map mutations_by_znode; - std::unordered_map> mutations_by_partition; + std::map mutations_by_znode; + std::unordered_map> mutations_by_partition; /// Provides only one simultaneous call to pullLogsToQueue. From 4ee581117b59acb4ad011e1cf1ac30b211974b71 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Thu, 7 Jun 2018 16:28:39 +0300 Subject: [PATCH 4/7] system.mutations table skeleton [#CLICKHOUSE-3747] --- .../MergeTree/MergeTreeMutationStatus.h | 18 +++ .../ReplicatedMergeTreeMutationEntry.h | 2 +- .../MergeTree/ReplicatedMergeTreeQueue.cpp | 28 ++++ .../MergeTree/ReplicatedMergeTreeQueue.h | 3 + .../Storages/StorageReplicatedMergeTree.cpp | 5 + .../src/Storages/StorageReplicatedMergeTree.h | 2 + .../System/StorageSystemMutations.cpp | 123 ++++++++++++++++++ .../Storages/System/StorageSystemMutations.h | 36 +++++ .../Storages/System/attachSystemTables.cpp | 2 + 9 files changed, 218 insertions(+), 1 deletion(-) create mode 100644 dbms/src/Storages/MergeTree/MergeTreeMutationStatus.h create mode 100644 dbms/src/Storages/System/StorageSystemMutations.cpp create mode 100644 dbms/src/Storages/System/StorageSystemMutations.h diff --git a/dbms/src/Storages/MergeTree/MergeTreeMutationStatus.h b/dbms/src/Storages/MergeTree/MergeTreeMutationStatus.h new file mode 100644 index 00000000000..6df3bf58d20 --- /dev/null +++ b/dbms/src/Storages/MergeTree/MergeTreeMutationStatus.h @@ -0,0 +1,18 @@ +#pragma once + +#include +#include + + +namespace DB +{ + +struct MergeTreeMutationStatus +{ + String id; + String command; + time_t create_time = 0; + std::map block_numbers; +}; + +} diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeMutationEntry.h b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeMutationEntry.h index 737270cb024..68aab6fa021 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeMutationEntry.h +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeMutationEntry.h @@ -25,7 +25,7 @@ struct ReplicatedMergeTreeMutationEntry time_t create_time = 0; String source_replica; - std::unordered_map block_numbers; + std::map block_numbers; MutationCommands commands; }; diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index b2349ea83a6..1c468f859c1 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -1043,6 +1043,33 @@ void ReplicatedMergeTreeQueue::getInsertTimes(time_t & out_min_unprocessed_inser } +std::vector ReplicatedMergeTreeQueue::getMutationsStatus() const +{ + std::lock_guard lock(target_state_mutex); + + std::vector result; + for (const auto & pair : mutations_by_znode) + { + const ReplicatedMergeTreeMutationEntry & entry = *pair.second; + + for (const MutationCommand & command : entry.commands) + { + std::stringstream ss; + formatAST(*command.ast, ss, false, true); + result.push_back(MergeTreeMutationStatus + { + entry.znode_name, + ss.str(), + entry.create_time, + entry.block_numbers, + }); + } + } + + return result; +} + + ReplicatedMergeTreeMergePredicate::ReplicatedMergeTreeMergePredicate( ReplicatedMergeTreeQueue & queue_, zkutil::ZooKeeperPtr & zookeeper) : queue(queue_) @@ -1346,4 +1373,5 @@ String padIndex(Int64 index) String index_str = toString(index); return std::string(10 - index_str.size(), '0') + index_str; } + } diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h index 73bd6cc8feb..5a6a99e7da3 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h @@ -7,6 +7,7 @@ #include #include #include +#include #include #include @@ -304,6 +305,8 @@ public: /// Get information about the insertion times. void getInsertTimes(time_t & out_min_unprocessed_insert_time, time_t & out_max_processed_insert_time) const; + + std::vector getMutationsStatus() const; }; class ReplicatedMergeTreeMergePredicate diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 702cc81a7ec..837da9625e8 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -4050,6 +4050,11 @@ void StorageReplicatedMergeTree::mutate(const MutationCommands & commands, const } } +std::vector StorageReplicatedMergeTree::getMutationsStatus() const +{ + return queue.getMutationsStatus(); +} + void StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZK() { diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.h b/dbms/src/Storages/StorageReplicatedMergeTree.h index bb9d0056a50..b8b670fbdd2 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.h +++ b/dbms/src/Storages/StorageReplicatedMergeTree.h @@ -125,6 +125,8 @@ public: void mutate(const MutationCommands & commands, const Context & context) override; + std::vector getMutationsStatus() const; + /** Removes a replica from ZooKeeper. If there are no other replicas, it deletes the entire table from ZooKeeper. */ void drop() override; diff --git a/dbms/src/Storages/System/StorageSystemMutations.cpp b/dbms/src/Storages/System/StorageSystemMutations.cpp new file mode 100644 index 00000000000..90fce84d709 --- /dev/null +++ b/dbms/src/Storages/System/StorageSystemMutations.cpp @@ -0,0 +1,123 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace DB +{ + +StorageSystemMutations::StorageSystemMutations(const std::string & name_) + : name(name_) +{ + setColumns(ColumnsDescription({ + { "database", std::make_shared() }, + { "table", std::make_shared() }, + { "mutation_id", std::make_shared() }, + { "command", std::make_shared() }, + { "create_time", std::make_shared() }, + { "block_numbers.partition_id", std::make_shared( + std::make_shared()) }, + { "block_numbers.number", std::make_shared( + std::make_shared()) }, + })); +} + + +BlockInputStreams StorageSystemMutations::read( + const Names & column_names, + const SelectQueryInfo & query_info, + const Context & context, + QueryProcessingStage::Enum & processed_stage, + const size_t /*max_block_size*/, + const unsigned /*num_streams*/) +{ + check(column_names); + processed_stage = QueryProcessingStage::FetchColumns; + + /// Collect a set of replicated tables. + std::map> replicated_tables; + for (const auto & db : context.getDatabases()) + for (auto iterator = db.second->getIterator(context); iterator->isValid(); iterator->next()) + if (dynamic_cast(iterator->table().get())) + replicated_tables[db.first][iterator->name()] = iterator->table(); + + MutableColumnPtr col_database_mut = ColumnString::create(); + MutableColumnPtr col_table_mut = ColumnString::create(); + + for (auto & db : replicated_tables) + { + for (auto & table : db.second) + { + col_database_mut->insert(db.first); + col_table_mut->insert(table.first); + } + } + + ColumnPtr col_database = std::move(col_database_mut); + ColumnPtr col_table = std::move(col_table_mut); + + /// Determine what tables are needed by the conditions in the query. + { + Block filtered_block + { + { col_database, std::make_shared(), "database" }, + { col_table, std::make_shared(), "table" }, + }; + + VirtualColumnUtils::filterBlockWithQuery(query_info.query, filtered_block, context); + + if (!filtered_block.rows()) + return BlockInputStreams(); + + col_database = filtered_block.getByName("database").column; + col_table = filtered_block.getByName("table").column; + } + + MutableColumns res_columns = getSampleBlock().cloneEmptyColumns(); + for (size_t i_storage = 0; i_storage < col_database->size(); ++i_storage) + { + auto database = (*col_database)[i_storage].safeGet(); + auto table = (*col_table)[i_storage].safeGet(); + + std::vector states = + dynamic_cast(*replicated_tables[database][table]) + .getMutationsStatus(); + + for (const MergeTreeMutationStatus & status : states) + { + Array block_partition_ids; + block_partition_ids.reserve(status.block_numbers.size()); + Array block_numbers; + block_numbers.reserve(status.block_numbers.size()); + for (const auto & pair : status.block_numbers) + { + block_partition_ids.emplace_back(pair.first); + block_numbers.emplace_back(pair.second); + } + + size_t col_num = 0; + res_columns[col_num++]->insert(database); + res_columns[col_num++]->insert(table); + + res_columns[col_num++]->insert(status.id); + res_columns[col_num++]->insert(status.command); + res_columns[col_num++]->insert(UInt64(status.create_time)); + res_columns[col_num++]->insert(block_partition_ids); + res_columns[col_num++]->insert(block_numbers); + } + } + + Block res = getSampleBlock().cloneEmpty(); + for (size_t i_col = 0; i_col < res.columns(); ++i_col) + res.getByPosition(i_col).column = std::move(res_columns[i_col]); + + return BlockInputStreams(1, std::make_shared(res)); +} + +} diff --git a/dbms/src/Storages/System/StorageSystemMutations.h b/dbms/src/Storages/System/StorageSystemMutations.h new file mode 100644 index 00000000000..3b82f3f46be --- /dev/null +++ b/dbms/src/Storages/System/StorageSystemMutations.h @@ -0,0 +1,36 @@ +#pragma once + +#include +#include + + +namespace DB +{ + +class Context; + + +/// Implements the `mutations` system table, which provides information about the status of mutations +/// in the MergeTree tables. +class StorageSystemMutations : public ext::shared_ptr_helper, public IStorage +{ +public: + String getName() const override { return "SystemMutations"; } + String getTableName() const override { return name; } + + BlockInputStreams read( + const Names & column_names, + const SelectQueryInfo & query_info, + const Context & context, + QueryProcessingStage::Enum & processed_stage, + size_t max_block_size, + unsigned num_streams) override; + +private: + const String name; + +protected: + StorageSystemMutations(const String & name_); +}; + +} diff --git a/dbms/src/Storages/System/attachSystemTables.cpp b/dbms/src/Storages/System/attachSystemTables.cpp index 93d4809b3c7..705d01fb9c2 100644 --- a/dbms/src/Storages/System/attachSystemTables.cpp +++ b/dbms/src/Storages/System/attachSystemTables.cpp @@ -14,6 +14,7 @@ #include #include #include +#include #include #include #include @@ -51,6 +52,7 @@ void attachSystemTablesServer(IDatabase & system_database, bool has_zookeeper) system_database.attachTable("processes", StorageSystemProcesses::create("processes")); system_database.attachTable("metrics", StorageSystemMetrics::create("metrics")); system_database.attachTable("merges", StorageSystemMerges::create("merges")); + system_database.attachTable("mutations", StorageSystemMutations::create("mutations")); system_database.attachTable("replicas", StorageSystemReplicas::create("replicas")); system_database.attachTable("replication_queue", StorageSystemReplicationQueue::create("replication_queue")); system_database.attachTable("dictionaries", StorageSystemDictionaries::create("dictionaries")); From b60a2a90fab820a7bcbec6f5dc85df4b3a77b041 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Wed, 6 Jun 2018 22:15:10 +0300 Subject: [PATCH 5/7] rename method for clarity --- dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp | 4 ++-- dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index 1c468f859c1..50f2b9c3ffa 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -195,7 +195,7 @@ void ReplicatedMergeTreeQueue::updateTimesInZooKeeper( } -void ReplicatedMergeTreeQueue::remove(zkutil::ZooKeeperPtr zookeeper, LogEntryPtr & entry) +void ReplicatedMergeTreeQueue::removeProcessedEntry(zkutil::ZooKeeperPtr zookeeper, LogEntryPtr & entry) { auto code = zookeeper->tryRemove(replica_path + "/queue/" + entry->znode_name); @@ -891,7 +891,7 @@ bool ReplicatedMergeTreeQueue::processEntry( try { if (func(entry)) - remove(get_zookeeper(), entry); + removeProcessedEntry(get_zookeeper(), entry); } catch (...) { diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h index 5a6a99e7da3..cacdab7c288 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h @@ -139,7 +139,7 @@ private: std::lock_guard & target_state_lock, std::lock_guard & queue_lock); - void remove(zkutil::ZooKeeperPtr zookeeper, LogEntryPtr & entry); + void removeProcessedEntry(zkutil::ZooKeeperPtr zookeeper, LogEntryPtr & entry); /** Can I now try this action. If not, you need to leave it in the queue and try another one. * Called under the queue_mutex. From a3bf3e6d20030d002415827881a0a17bf3056dc0 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Wed, 13 Jun 2018 18:47:40 +0300 Subject: [PATCH 6/7] add missing columns to system.parts [#CLICKHOUSE-3747] --- dbms/src/Storages/System/StorageSystemParts.cpp | 4 ++++ dbms/src/Storages/System/StorageSystemPartsColumns.cpp | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/dbms/src/Storages/System/StorageSystemParts.cpp b/dbms/src/Storages/System/StorageSystemParts.cpp index b2f3d2208c7..a5fe5e5285d 100644 --- a/dbms/src/Storages/System/StorageSystemParts.cpp +++ b/dbms/src/Storages/System/StorageSystemParts.cpp @@ -31,9 +31,11 @@ StorageSystemParts::StorageSystemParts(const std::string & name) {"refcount", std::make_shared()}, {"min_date", std::make_shared()}, {"max_date", std::make_shared()}, + {"partition_id", std::make_shared()}, {"min_block_number", std::make_shared()}, {"max_block_number", std::make_shared()}, {"level", std::make_shared()}, + {"data_version", std::make_shared()}, {"primary_key_bytes_in_memory", std::make_shared()}, {"primary_key_bytes_in_memory_allocated", std::make_shared()}, @@ -80,9 +82,11 @@ void StorageSystemParts::processNextStorage(MutableColumns & columns, const Stor columns[i++]->insert(static_cast(part->getMinDate())); columns[i++]->insert(static_cast(part->getMaxDate())); + columns[i++]->insert(part->info.partition_id); columns[i++]->insert(part->info.min_block); columns[i++]->insert(part->info.max_block); columns[i++]->insert(static_cast(part->info.level)); + columns[i++]->insert(static_cast(part->info.getDataVersion())); columns[i++]->insert(static_cast(part->getIndexSizeInBytes())); columns[i++]->insert(static_cast(part->getIndexSizeInAllocatedBytes())); diff --git a/dbms/src/Storages/System/StorageSystemPartsColumns.cpp b/dbms/src/Storages/System/StorageSystemPartsColumns.cpp index 921b32b6c0a..964c256b492 100644 --- a/dbms/src/Storages/System/StorageSystemPartsColumns.cpp +++ b/dbms/src/Storages/System/StorageSystemPartsColumns.cpp @@ -32,9 +32,11 @@ StorageSystemPartsColumns::StorageSystemPartsColumns(const std::string & name) {"refcount", std::make_shared()}, {"min_date", std::make_shared()}, {"max_date", std::make_shared()}, + {"partition_id", std::make_shared()}, {"min_block_number", std::make_shared()}, {"max_block_number", std::make_shared()}, {"level", std::make_shared()}, + {"data_version", std::make_shared()}, {"primary_key_bytes_in_memory", std::make_shared()}, {"primary_key_bytes_in_memory_allocated", std::make_shared()}, @@ -123,9 +125,11 @@ void StorageSystemPartsColumns::processNextStorage(MutableColumns & columns, con columns[j++]->insert(static_cast(min_date)); columns[j++]->insert(static_cast(max_date)); + columns[j++]->insert(part->info.partition_id); columns[j++]->insert(part->info.min_block); columns[j++]->insert(part->info.max_block); columns[j++]->insert(static_cast(part->info.level)); + columns[j++]->insert(static_cast(part->info.getDataVersion())); columns[j++]->insert(static_cast(index_size_in_bytes)); columns[j++]->insert(static_cast(index_size_in_allocated_bytes)); From c1910fef377e01114e94f5b3e3c68e262fa0c40a Mon Sep 17 00:00:00 2001 From: alexey-milovidov Date: Wed, 13 Jun 2018 23:00:10 +0300 Subject: [PATCH 7/7] Update ASTAlterQuery.h --- dbms/src/Parsers/ASTAlterQuery.h | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/dbms/src/Parsers/ASTAlterQuery.h b/dbms/src/Parsers/ASTAlterQuery.h index a97503305f5..683d0780876 100644 --- a/dbms/src/Parsers/ASTAlterQuery.h +++ b/dbms/src/Parsers/ASTAlterQuery.h @@ -54,7 +54,7 @@ public: */ ASTPtr primary_key; - /** Used in DROP PARTITION, RESHARD PARTITION and ATTACH PARTITION FROM queries. + /** Used in DROP PARTITION and ATTACH PARTITION FROM queries. * The value or ID of the partition is stored here. */ ASTPtr partition; @@ -66,8 +66,6 @@ public: bool part = false; /// true for ATTACH PART - bool do_copy = false; /// for RESHARD PARTITION - bool clear_column = false; /// for CLEAR COLUMN (do not drop column from metadata) /** For FETCH PARTITION - the path in ZK to the shard, from which to download the partition.