diff --git a/dbms/src/Common/ErrorCodes.cpp b/dbms/src/Common/ErrorCodes.cpp index f151f59f7f1..4e724c995c4 100644 --- a/dbms/src/Common/ErrorCodes.cpp +++ b/dbms/src/Common/ErrorCodes.cpp @@ -393,7 +393,9 @@ namespace ErrorCodes extern const int REPLICA_STATUS_CHANGED = 416; extern const int EXPECTED_ALL_OR_ANY = 417; extern const int UNKNOWN_JOIN_STRICTNESS = 418; - extern const int CANNOT_ADD_DIFFERENT_AGGREGATE_STATES = 419; + extern const int MULTIPLE_ASSIGNMENTS_TO_COLUMN = 419; + extern const int CANNOT_UPDATE_COLUMN = 420; + extern const int CANNOT_ADD_DIFFERENT_AGGREGATE_STATES = 421; extern const int KEEPER_EXCEPTION = 999; extern const int POCO_EXCEPTION = 1000; diff --git a/dbms/src/Common/createHardLink.cpp b/dbms/src/Common/createHardLink.cpp new file mode 100644 index 00000000000..06647cd437c --- /dev/null +++ b/dbms/src/Common/createHardLink.cpp @@ -0,0 +1,36 @@ +#include +#include +#include +#include +#include + + +namespace DB +{ + +void createHardLink(const String & source_path, const String & destination_path) +{ + if (0 != link(source_path.c_str(), destination_path.c_str())) + { + if (errno == EEXIST) + { + auto link_errno = errno; + + struct stat source_descr; + struct stat destination_descr; + + if (0 != lstat(source_path.c_str(), &source_descr)) + throwFromErrno("Cannot stat " + source_path); + + if (0 != lstat(destination_path.c_str(), &destination_descr)) + throwFromErrno("Cannot stat " + destination_path); + + if (source_descr.st_ino != destination_descr.st_ino) + throwFromErrno("Destination file " + destination_path + " is already exist and have different inode.", 0, link_errno); + } + else + throwFromErrno("Cannot link " + source_path + " to " + destination_path); + } +} + +} diff --git a/dbms/src/Common/createHardLink.h b/dbms/src/Common/createHardLink.h new file mode 100644 index 00000000000..8f8e5c27d9f --- /dev/null +++ b/dbms/src/Common/createHardLink.h @@ -0,0 +1,12 @@ +#pragma once + +#include + +namespace DB +{ + +/// Create a hard link `destination_path` pointing to `source_path`. +/// If the destination already exists, check that it has the same inode (and throw if they are different). +void createHardLink(const String & source_path, const String & destination_path); + +} diff --git a/dbms/src/Common/localBackup.cpp b/dbms/src/Common/localBackup.cpp index aec7da1f65d..d7a0786e320 100644 --- a/dbms/src/Common/localBackup.cpp +++ b/dbms/src/Common/localBackup.cpp @@ -1,22 +1,21 @@ -#include "localBackup.h" -#include -#include -#include +#include +#include +#include #include #include -#include -#include +#include +#include #include namespace DB { + namespace ErrorCodes { extern const int TOO_DEEP_RECURSION; extern const int DIRECTORY_ALREADY_EXISTS; } -} static void localBackupImpl(const Poco::Path & source_path, const Poco::Path & destination_path, size_t level, @@ -41,33 +40,7 @@ static void localBackupImpl(const Poco::Path & source_path, const Poco::Path & d { dir_it->setReadOnly(); - std::string source_str = source.toString(); - std::string destination_str = destination.toString(); - - /** We are trying to create a hard link. - * If it already exists, we check that source and destination point to the same inode. - */ - if (0 != link(source_str.c_str(), destination_str.c_str())) - { - if (errno == EEXIST) - { - auto link_errno = errno; - - struct stat source_descr; - struct stat destination_descr; - - if (0 != lstat(source_str.c_str(), &source_descr)) - DB::throwFromErrno("Cannot stat " + source_str); - - if (0 != lstat(destination_str.c_str(), &destination_descr)) - DB::throwFromErrno("Cannot stat " + destination_str); - - if (source_descr.st_ino != destination_descr.st_ino) - DB::throwFromErrno("Destination file " + destination_str + " is already exist and have different inode.", 0, link_errno); - } - else - DB::throwFromErrno("Cannot link " + source_str + " to " + destination_str); - } + createHardLink(source.toString(), destination.toString()); } else { @@ -120,3 +93,5 @@ void localBackup(const Poco::Path & source_path, const Poco::Path & destination_ break; } } + +} diff --git a/dbms/src/Common/localBackup.h b/dbms/src/Common/localBackup.h index 49dca80b8a6..604c323ba48 100644 --- a/dbms/src/Common/localBackup.h +++ b/dbms/src/Common/localBackup.h @@ -4,6 +4,9 @@ #include +namespace DB +{ + /** Creates a local (at the same mount point) backup (snapshot) directory. * * In the specified destination directory, it creates a hard links on all source-directory files @@ -19,3 +22,4 @@ */ void localBackup(const Poco::Path & source_path, const Poco::Path & destination_path, std::optional max_level = {}); +} diff --git a/dbms/src/DataStreams/ApplyingMutationsBlockInputStream.cpp b/dbms/src/DataStreams/ApplyingMutationsBlockInputStream.cpp deleted file mode 100644 index 0de7b5acbaf..00000000000 --- a/dbms/src/DataStreams/ApplyingMutationsBlockInputStream.cpp +++ /dev/null @@ -1,86 +0,0 @@ -#include -#include -#include -#include -#include - - -namespace DB -{ - -ApplyingMutationsBlockInputStream::ApplyingMutationsBlockInputStream( - const BlockInputStreamPtr & input, const std::vector & commands, const Context & context) -{ - children.push_back(input); - - if (commands.empty()) - { - impl = input; - return; - } - - /// Create a total predicate for all mutations and then pass it to a single FilterBlockInputStream - /// because ExpressionAnalyzer won't detect that some columns in the block are already calculated - /// and will try to calculate them twice. This works as long as all mutations are DELETE. - /// TODO: fix ExpressionAnalyzer. - - std::vector predicates; - - for (const MutationCommand & cmd : commands) - { - switch (cmd.type) - { - case MutationCommand::DELETE: - { - auto predicate = std::make_shared(); - predicate->name = "not"; - predicate->arguments = std::make_shared(); - predicate->arguments->children.push_back(cmd.predicate); - predicate->children.push_back(predicate->arguments); - predicates.push_back(predicate); - break; - } - default: - throw Exception("Unsupported mutation cmd type: " + toString(cmd.type), - ErrorCodes::LOGICAL_ERROR); - } - } - - ASTPtr total_predicate; - if (predicates.size() == 1) - total_predicate = predicates[0]; - else - { - auto and_func = std::make_shared(); - and_func->name = "and"; - and_func->arguments = std::make_shared(); - and_func->children.push_back(and_func->arguments); - and_func->arguments->children = predicates; - total_predicate = and_func; - } - - auto predicate_expr = ExpressionAnalyzer( - total_predicate, context, nullptr, input->getHeader().getNamesAndTypesList()).getActions(false); - String col_name = total_predicate->getColumnName(); - impl = std::make_shared(input, predicate_expr, col_name); -} - -Block ApplyingMutationsBlockInputStream::getHeader() const -{ - return impl->getHeader(); -} - -Block ApplyingMutationsBlockInputStream::getTotals() -{ - if (IProfilingBlockInputStream * profiling = dynamic_cast(impl.get())) - return profiling->getTotals(); - - return IProfilingBlockInputStream::getTotals(); -} - -Block ApplyingMutationsBlockInputStream::readImpl() -{ - return impl->read(); -} - -} diff --git a/dbms/src/DataStreams/ApplyingMutationsBlockInputStream.h b/dbms/src/DataStreams/ApplyingMutationsBlockInputStream.h deleted file mode 100644 index 40a25ea5cd9..00000000000 --- a/dbms/src/DataStreams/ApplyingMutationsBlockInputStream.h +++ /dev/null @@ -1,30 +0,0 @@ -#pragma once - -#include -#include -#include - -namespace DB -{ - -/// A stream that pulls the blocks from `input` and executes mutation commands on these blocks -/// (see Storages/MutationCommands.h for a full list). -/// Example: if mutation commands contain ALTER DELETE, this stream will delete rows that satisfy the predicate specified in the command. -class ApplyingMutationsBlockInputStream : public IProfilingBlockInputStream -{ -public: - ApplyingMutationsBlockInputStream( - const BlockInputStreamPtr & input, const std::vector & commands, const Context & context); - - String getName() const override { return "ApplyingMutations"; } - - Block getHeader() const override; - Block getTotals() override; - -private: - Block readImpl() override; - - BlockInputStreamPtr impl; -}; - -} diff --git a/dbms/src/Interpreters/ExpressionActions.cpp b/dbms/src/Interpreters/ExpressionActions.cpp index 9ad5637b1bc..97f38c627e0 100644 --- a/dbms/src/Interpreters/ExpressionActions.cpp +++ b/dbms/src/Interpreters/ExpressionActions.cpp @@ -110,12 +110,13 @@ ExpressionAction ExpressionAction::removeColumn(const std::string & removed_name return a; } -ExpressionAction ExpressionAction::copyColumn(const std::string & from_name, const std::string & to_name) +ExpressionAction ExpressionAction::copyColumn(const std::string & from_name, const std::string & to_name, bool can_replace) { ExpressionAction a; a.type = COPY_COLUMN; a.source_name = from_name; a.result_name = to_name; + a.can_replace = can_replace; return a; } @@ -309,8 +310,23 @@ void ExpressionAction::prepare(Block & sample_block) case COPY_COLUMN: { - result_type = sample_block.getByName(source_name).type; - sample_block.insert(ColumnWithTypeAndName(sample_block.getByName(source_name).column, result_type, result_name)); + const auto & source = sample_block.getByName(source_name); + result_type = source.type; + + if (sample_block.has(result_name)) + { + if (can_replace) + { + auto & result = sample_block.getByName(result_name); + result.type = result_type; + result.column = source.column; + } + else + throw Exception("Column '" + result_name + "' already exists", ErrorCodes::DUPLICATE_COLUMN); + } + else + sample_block.insert(ColumnWithTypeAndName(source.column, result_type, result_name)); + break; } @@ -358,7 +374,7 @@ void ExpressionAction::execute(Block & block, std::unordered_mapgetColumnName()); +} + + void ExpressionAnalyzer::getActionsBeforeAggregation(const ASTPtr & ast, ExpressionActionsPtr & actions, bool no_subqueries) { ASTFunction * node = typeid_cast(ast.get()); diff --git a/dbms/src/Interpreters/ExpressionAnalyzer.h b/dbms/src/Interpreters/ExpressionAnalyzer.h index 8b11a8225a2..86c5cb8ce5a 100644 --- a/dbms/src/Interpreters/ExpressionAnalyzer.h +++ b/dbms/src/Interpreters/ExpressionAnalyzer.h @@ -156,6 +156,8 @@ public: /// Deletes all columns except mentioned by SELECT, arranges the remaining columns and renames them to aliases. void appendProjectResult(ExpressionActionsChain & chain) const; + void appendExpression(ExpressionActionsChain & chain, const ASTPtr & expr, bool only_types); + /// If `ast` is not a SELECT query, just gets all the actions to evaluate the expression. /// If add_aliases, only the calculated values in the desired order and add aliases. /// If also project_result, than only aliases remain in the output block. @@ -171,9 +173,9 @@ public: * That is, you need to call getSetsWithSubqueries after all calls of `append*` or `getActions` * and create all the returned sets before performing the actions. */ - SubqueriesForSets getSubqueriesForSets() const { return subqueries_for_sets; } + const SubqueriesForSets & getSubqueriesForSets() const { return subqueries_for_sets; } - PreparedSets getPreparedSets() { return prepared_sets; } + const PreparedSets & getPreparedSets() const { return prepared_sets; } /** Tables that will need to be sent to remote servers for distributed query processing. */ diff --git a/dbms/src/Interpreters/InterpreterAlterQuery.cpp b/dbms/src/Interpreters/InterpreterAlterQuery.cpp index e652065f2e1..1d20f163925 100644 --- a/dbms/src/Interpreters/InterpreterAlterQuery.cpp +++ b/dbms/src/Interpreters/InterpreterAlterQuery.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include @@ -49,7 +50,7 @@ BlockIO InterpreterAlterQuery::execute() if (!mutation_commands.empty()) { - mutation_commands.validate(*table, context); + MutationsInterpreter(table, mutation_commands, context).validate(); table->mutate(mutation_commands, context); } diff --git a/dbms/src/Interpreters/MutationsInterpreter.cpp b/dbms/src/Interpreters/MutationsInterpreter.cpp new file mode 100644 index 00000000000..16517532e38 --- /dev/null +++ b/dbms/src/Interpreters/MutationsInterpreter.cpp @@ -0,0 +1,424 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int UNKNOWN_MUTATION_COMMAND; + extern const int NO_SUCH_COLUMN_IN_TABLE; + extern const int CANNOT_UPDATE_COLUMN; +} + +bool MutationsInterpreter::isStorageTouchedByMutations() const +{ + if (commands.empty()) + return false; + + for (const MutationCommand & command : commands) + { + if (!command.predicate) /// The command touches all rows. + return true; + } + + /// Execute `SELECT count() FROM storage WHERE predicate1 OR predicate2 OR ...` query. + /// The result can differ from tne number of affected rows (e.g. if there is an UPDATE command that + /// changes how many rows satisfy the predicates of the subsequent commands). + /// But we can be sure that if count = 0, then no rows will be touched. + + auto select = std::make_shared(); + + select->select_expression_list = std::make_shared(); + select->children.push_back(select->select_expression_list); + auto count_func = std::make_shared(); + count_func->name = "count"; + count_func->arguments = std::make_shared(); + select->select_expression_list->children.push_back(count_func); + + if (commands.size() == 1) + select->where_expression = commands[0].predicate; + else + { + auto coalesced_predicates = std::make_shared(); + coalesced_predicates->name = "or"; + coalesced_predicates->arguments = std::make_shared(); + coalesced_predicates->children.push_back(coalesced_predicates->arguments); + + for (const MutationCommand & command : commands) + coalesced_predicates->arguments->children.push_back(command.predicate); + + select->where_expression = std::move(coalesced_predicates); + } + select->children.push_back(select->where_expression); + + auto context_copy = context; + context_copy.getSettingsRef().merge_tree_uniform_read_distribution = 0; + context_copy.getSettingsRef().max_threads = 1; + + InterpreterSelectQuery interpreter_select(select, context_copy, storage, QueryProcessingStage::Complete); + BlockInputStreamPtr in = interpreter_select.execute().in; + + Block block = in->read(); + if (!block.rows()) + return false; + else if (block.rows() != 1) + throw Exception("count() expression returned " + toString(block.rows()) + " rows, not 1", + ErrorCodes::LOGICAL_ERROR); + + auto count = (*block.getByName("count()").column)[0].get(); + return count != 0; +} + + +static NameSet getKeyColumns(const StoragePtr & storage) +{ + const MergeTreeData * merge_tree_data = nullptr; + if (auto merge_tree = dynamic_cast(storage.get())) + merge_tree_data = &merge_tree->getData(); + else if (auto replicated_merge_tree = dynamic_cast(storage.get())) + merge_tree_data = &replicated_merge_tree->getData(); + else + return {}; + + NameSet key_columns; + + if (merge_tree_data->partition_expr) + for (const String & col : merge_tree_data->partition_expr->getRequiredColumns()) + key_columns.insert(col); + + auto primary_expr = merge_tree_data->getPrimaryExpression(); + if (primary_expr) + for (const String & col : primary_expr->getRequiredColumns()) + key_columns.insert(col); + /// We don't process sampling_expression separately because it must be among the primary key columns. + + auto secondary_sort_expr = merge_tree_data->getSecondarySortExpression(); + if (secondary_sort_expr) + for (const String & col : secondary_sort_expr->getRequiredColumns()) + key_columns.insert(col); + + if (!merge_tree_data->merging_params.sign_column.empty()) + key_columns.insert(merge_tree_data->merging_params.sign_column); + + return key_columns; +} + +static void validateUpdateColumns( + const StoragePtr & storage, const NameSet & updated_columns, + const std::unordered_map & column_to_affected_materialized) +{ + NameSet key_columns = getKeyColumns(storage); + + for (const String & column_name : updated_columns) + { + auto found = false; + for (const auto & col : storage->getColumns().ordinary) + { + if (col.name == column_name) + { + found = true; + break; + } + } + + if (!found) + { + for (const auto & col : storage->getColumns().materialized) + { + if (col.name == column_name) + throw Exception("Cannot UPDATE materialized column `" + column_name + "`", ErrorCodes::CANNOT_UPDATE_COLUMN); + } + + throw Exception("There is no column `" + column_name + "` in table", ErrorCodes::NO_SUCH_COLUMN_IN_TABLE); + } + + if (key_columns.count(column_name)) + throw Exception("Cannot UPDATE key column `" + column_name + "`", ErrorCodes::CANNOT_UPDATE_COLUMN); + + auto materialized_it = column_to_affected_materialized.find(column_name); + if (materialized_it != column_to_affected_materialized.end()) + { + for (const String & materialized : materialized_it->second) + { + if (key_columns.count(materialized)) + throw Exception("Updated column `" + column_name + "` affects MATERIALIZED column `" + + materialized + "`, which is a key column. Cannot UPDATE it.", + ErrorCodes::CANNOT_UPDATE_COLUMN); + } + } + } +} + + +void MutationsInterpreter::prepare(bool dry_run) +{ + if (is_prepared) + throw Exception("MutationsInterpreter is already prepared. It is a bug.", ErrorCodes::LOGICAL_ERROR); + + if (commands.empty()) + throw Exception("Empty mutation commands list", ErrorCodes::LOGICAL_ERROR); + + const ColumnsDescription & columns_desc = storage->getColumns(); + NamesAndTypesList all_columns = columns_desc.getAllPhysical(); + + NameSet updated_columns; + for (const MutationCommand & command : commands) + { + for (const auto & kv : command.column_to_update_expression) + updated_columns.insert(kv.first); + } + + /// We need to know which columns affect which MATERIALIZED columns to recalculate them if dependencies + /// are updated. + std::unordered_map column_to_affected_materialized; + if (!updated_columns.empty()) + { + for (const auto & kv : columns_desc.defaults) + { + const String & column = kv.first; + const ColumnDefault & col_default = kv.second; + if (col_default.kind == ColumnDefaultKind::Materialized) + { + ExpressionAnalyzer analyzer(col_default.expression->clone(), context, nullptr, all_columns); + for (const String & dependency : analyzer.getRequiredSourceColumns()) + { + if (updated_columns.count(dependency)) + column_to_affected_materialized[dependency].push_back(column); + } + } + } + } + + if (!updated_columns.empty()) + validateUpdateColumns(storage, updated_columns, column_to_affected_materialized); + + /// First, break a sequence of commands into stages. + stages.emplace_back(context); + for (const auto & command : commands) + { + if (!stages.back().column_to_updated.empty()) + stages.emplace_back(context); + + if (command.type == MutationCommand::DELETE) + { + auto negated_predicate = makeASTFunction("not", command.predicate->clone()); + stages.back().filters.push_back(negated_predicate); + } + else if (command.type == MutationCommand::UPDATE) + { + if (stages.size() == 1) /// First stage only supports filtering and can't update columns. + stages.emplace_back(context); + + NameSet affected_materialized; + + for (const auto & kv : command.column_to_update_expression) + { + const String & column = kv.first; + + auto materialized_it = column_to_affected_materialized.find(column); + if (materialized_it != column_to_affected_materialized.end()) + { + for (const String & mat_column : materialized_it->second) + affected_materialized.emplace(mat_column); + } + + const auto & update_expr = kv.second; + auto updated_column = makeASTFunction("CAST", + makeASTFunction("if", + command.predicate->clone(), + update_expr->clone(), + std::make_shared(column)), + std::make_shared(columns_desc.getPhysical(column).type->getName())); + stages.back().column_to_updated.emplace(column, updated_column); + } + + if (!affected_materialized.empty()) + { + stages.emplace_back(context); + for (const auto & column : columns_desc.materialized) + { + stages.back().column_to_updated.emplace( + column.name, + columns_desc.defaults.at(column.name).expression->clone()); + } + } + } + else + throw Exception("Unknown mutation command type: " + DB::toString(command.type), ErrorCodes::UNKNOWN_MUTATION_COMMAND); + } + + /// Next, for each stage calculate columns changed by this and previous stages. + for (size_t i = 0; i < stages.size(); ++i) + { + if (!stages[i].filters.empty()) + { + for (const auto & column : all_columns) + stages[i].output_columns.insert(column.name); + continue; + } + + if (i > 0) + stages[i].output_columns = stages[i - 1].output_columns; + + if (stages[i].output_columns.size() < all_columns.size()) + { + for (const auto & kv : stages[i].column_to_updated) + stages[i].output_columns.insert(kv.first); + } + } + + /// Now, calculate `expressions_chain` for each stage except the first. + /// Do it backwards to propagate information about columns required as input for a stage to the previous stage. + for (size_t i = stages.size() - 1; i > 0; --i) + { + auto & stage = stages[i]; + + ASTPtr all_asts = std::make_shared(); + + for (const auto & ast : stage.filters) + all_asts->children.push_back(ast); + + for (const auto & kv : stage.column_to_updated) + all_asts->children.push_back(kv.second); + + /// Add all output columns to prevent ExpressionAnalyzer from deleting them from source columns. + for (const String & column : stage.output_columns) + all_asts->children.push_back(std::make_shared(column)); + + stage.analyzer = std::make_unique(all_asts, context, nullptr, all_columns); + + ExpressionActionsChain & actions_chain = stage.expressions_chain; + + for (const auto & ast : stage.filters) + { + if (!actions_chain.steps.empty()) + actions_chain.addStep(); + stage.analyzer->appendExpression(actions_chain, ast, dry_run); + stage.filter_column_names.push_back(ast->getColumnName()); + } + + if (!stage.column_to_updated.empty()) + { + if (!actions_chain.steps.empty()) + actions_chain.addStep(); + + for (const auto & kv : stage.column_to_updated) + stage.analyzer->appendExpression(actions_chain, kv.second, dry_run); + + for (const auto & kv : stage.column_to_updated) + { + actions_chain.getLastActions()->add(ExpressionAction::copyColumn( + kv.second->getColumnName(), kv.first, /* can_replace = */ true)); + } + } + + /// Remove all intermediate columns. + actions_chain.addStep(); + actions_chain.getLastStep().required_output.assign(stage.output_columns.begin(), stage.output_columns.end()); + + actions_chain.finalize(); + + /// Propagate information about columns needed as input. + for (const auto & column : actions_chain.steps.front().actions->getRequiredColumnsWithTypes()) + stages[i - 1].output_columns.insert(column.name); + } + + /// Execute first stage as a SELECT statement. + + auto select = std::make_shared(); + + select->select_expression_list = std::make_shared(); + select->children.push_back(select->select_expression_list); + for (const auto & column_name : stages[0].output_columns) + select->select_expression_list->children.push_back(std::make_shared(column_name)); + + if (!stages[0].filters.empty()) + { + ASTPtr where_expression; + if (stages[0].filters.size() == 1) + where_expression = stages[0].filters[0]; + else + { + auto coalesced_predicates = std::make_shared(); + coalesced_predicates->name = "and"; + coalesced_predicates->arguments = std::make_shared(); + coalesced_predicates->children.push_back(coalesced_predicates->arguments); + coalesced_predicates->arguments->children = stages[0].filters; + where_expression = std::move(coalesced_predicates); + } + select->where_expression = where_expression; + select->children.push_back(where_expression); + } + + interpreter_select = std::make_unique(select, context, storage, QueryProcessingStage::Complete, dry_run); + + is_prepared = true; +} + +BlockInputStreamPtr MutationsInterpreter::addStreamsForLaterStages(BlockInputStreamPtr in) const +{ + for (size_t i_stage = 1; i_stage < stages.size(); ++i_stage) + { + const Stage & stage = stages[i_stage]; + + for (size_t i = 0; i < stage.expressions_chain.steps.size(); ++i) + { + const auto & step = stage.expressions_chain.steps[i]; + if (i < stage.filter_column_names.size()) + { + /// Execute DELETEs. + in = std::make_shared(in, step.actions, stage.filter_column_names[i]); + } + else + { + /// Execute UPDATE or final projection. + in = std::make_shared(in, step.actions); + } + } + + const SubqueriesForSets & subqueries_for_sets = stage.analyzer->getSubqueriesForSets(); + if (!subqueries_for_sets.empty()) + { + const auto & settings = context.getSettingsRef(); + in = std::make_shared(in, subqueries_for_sets, + SizeLimits(settings.max_rows_to_transfer, settings.max_bytes_to_transfer, settings.transfer_overflow_mode)); + } + } + + in = std::make_shared(in); + + return in; +} + +void MutationsInterpreter::validate() +{ + prepare(/* dry_run = */ true); + Block first_stage_header = interpreter_select->getSampleBlock(); + BlockInputStreamPtr in = std::make_shared(first_stage_header); + addStreamsForLaterStages(in)->getHeader(); +} + +BlockInputStreamPtr MutationsInterpreter::execute() +{ + prepare(/* dry_run = */ false); + BlockInputStreamPtr in = interpreter_select->execute().in; + return addStreamsForLaterStages(in); +} + +} diff --git a/dbms/src/Interpreters/MutationsInterpreter.h b/dbms/src/Interpreters/MutationsInterpreter.h new file mode 100644 index 00000000000..8c91c9cef60 --- /dev/null +++ b/dbms/src/Interpreters/MutationsInterpreter.h @@ -0,0 +1,85 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + + +namespace DB +{ + +class Context; + +/// Create an input stream that will read data from storage and apply mutation commands (UPDATEs, DELETEs) +/// to this data. +class MutationsInterpreter +{ +public: + MutationsInterpreter(StoragePtr storage_, std::vector commands_, const Context & context_) + : storage(std::move(storage_)) + , commands(std::move(commands_)) + , context(context_) + { + } + + void validate(); + + /// Return false if the data isn't going to be changed by mutations. + bool isStorageTouchedByMutations() const; + + /// The resulting stream will return blocks containing changed columns only. + BlockInputStreamPtr execute(); + +private: + void prepare(bool dry_run); + + BlockInputStreamPtr addStreamsForLaterStages(BlockInputStreamPtr in) const; + +private: + StoragePtr storage; + std::vector commands; + const Context & context; + + /// A sequence of mutation commands is executed as a sequence of stages. Each stage consists of several + /// filters, followed by updating values of some columns. Commands can reuse expressions calculated by the + /// previous commands in the same stage, but at the end of each stage intermediate columns are thrown away + /// (they may contain wrong values because the column values have been updated). + /// + /// If an UPDATE command changes some columns that some MATERIALIZED columns depend on, a stage to + /// recalculate these columns is added. + /// + /// Each stage has output_columns that contain columns that are changed at the end of that stage + /// plus columns needed for the next mutations. + /// + /// First stage is special: it can contain only DELETEs and is executed using InterpreterSelectQuery + /// to take advantage of table indexes (if there are any). + + struct Stage + { + Stage(const Context & context) : expressions_chain(context) {} + + ASTs filters; + std::unordered_map column_to_updated; + + /// Contains columns that are changed by this stage, + /// columns changed by the previous stages and also columns needed by the next stages. + NameSet output_columns; + + std::unique_ptr analyzer; + + /// A chain of actions needed to execute this stage. + /// First steps calculate filter columns for DELETEs (in the same order as in `filter_column_names`), + /// then there is (possibly) an UPDATE stage, and finally a projection stage. + ExpressionActionsChain expressions_chain; + Names filter_column_names; + }; + + std::unique_ptr interpreter_select; + std::vector stages; + bool is_prepared = false; /// Has the sequence of stages been prepared. +}; + +} diff --git a/dbms/src/Parsers/ASTAlterQuery.cpp b/dbms/src/Parsers/ASTAlterQuery.cpp index cbf1c934b05..7081b512247 100644 --- a/dbms/src/Parsers/ASTAlterQuery.cpp +++ b/dbms/src/Parsers/ASTAlterQuery.cpp @@ -133,6 +133,14 @@ void ASTAlterCommand::formatImpl( settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "DELETE WHERE " << (settings.hilite ? hilite_none : ""); predicate->formatImpl(settings, state, frame); } + else if (type == ASTAlterCommand::UPDATE) + { + settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "UPDATE " << (settings.hilite ? hilite_none : ""); + update_assignments->formatImpl(settings, state, frame); + + settings.ostr << (settings.hilite ? hilite_keyword : "") << " WHERE " << (settings.hilite ? hilite_none : ""); + predicate->formatImpl(settings, state, frame); + } else throw Exception("Unexpected type of ALTER", ErrorCodes::UNEXPECTED_AST_STRUCTURE); } diff --git a/dbms/src/Parsers/ASTAlterQuery.h b/dbms/src/Parsers/ASTAlterQuery.h index 683d0780876..c79f9ba8b2f 100644 --- a/dbms/src/Parsers/ASTAlterQuery.h +++ b/dbms/src/Parsers/ASTAlterQuery.h @@ -33,6 +33,7 @@ public: FREEZE_PARTITION, DELETE, + UPDATE, NO_TYPE, }; @@ -59,9 +60,12 @@ public: */ ASTPtr partition; - /// For DELETE WHERE: the predicate that filters the rows to delete. + /// For DELETE/UPDATE WHERE: the predicate that filters the rows to delete/update. ASTPtr predicate; + /// A list of expressions of the form `column = expr` for the UPDATE command. + ASTPtr update_assignments; + bool detach = false; /// true for DETACH PARTITION bool part = false; /// true for ATTACH PART diff --git a/dbms/src/Parsers/ASTAssignment.h b/dbms/src/Parsers/ASTAssignment.h new file mode 100644 index 00000000000..18bf46c171c --- /dev/null +++ b/dbms/src/Parsers/ASTAssignment.h @@ -0,0 +1,44 @@ +#pragma once + +#include + +namespace DB +{ + +/// Part of the ALTER UPDATE statement of the form: column = expr +class ASTAssignment : public IAST +{ +public: + String column_name; + ASTPtr expression; + + String getID() const override { return "Assignment_" + column_name; } + + ASTPtr clone() const override + { + auto res = std::make_shared(*this); + res->children.clear(); + + if (expression) + { + res->expression = expression->clone(); + res->children.push_back(res->expression); + } + + return res; + } + +protected: + void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override + { + settings.ostr << (settings.hilite ? hilite_identifier : ""); + settings.writeIdentifier(column_name); + settings.ostr << (settings.hilite ? hilite_none : ""); + + settings.ostr << (settings.hilite ? hilite_operator : "") << " = " << (settings.hilite ? hilite_none : ""); + + expression->formatImpl(settings, state, frame); + } +}; + +} diff --git a/dbms/src/Parsers/ASTColumnDeclaration.h b/dbms/src/Parsers/ASTColumnDeclaration.h index 57a1f7695d7..308e9b66526 100644 --- a/dbms/src/Parsers/ASTColumnDeclaration.h +++ b/dbms/src/Parsers/ASTColumnDeclaration.h @@ -21,8 +21,6 @@ public: ASTPtr clone() const override { const auto res = std::make_shared(*this); - ASTPtr ptr{res}; - res->children.clear(); if (type) @@ -37,7 +35,7 @@ public: res->children.push_back(res->default_expression); } - return ptr; + return res; } protected: diff --git a/dbms/src/Parsers/ASTIdentifier.cpp b/dbms/src/Parsers/ASTIdentifier.cpp index f2de63848c3..efe796c3018 100644 --- a/dbms/src/Parsers/ASTIdentifier.cpp +++ b/dbms/src/Parsers/ASTIdentifier.cpp @@ -11,11 +11,7 @@ void ASTIdentifier::formatImplWithoutAlias(const FormatSettings & settings, Form auto format_element = [&](const String & elem_name) { settings.ostr << (settings.hilite ? hilite_identifier : ""); - - WriteBufferFromOStream wb(settings.ostr, 32); - settings.writeIdentifier(elem_name, wb); - wb.next(); - + settings.writeIdentifier(elem_name); settings.ostr << (settings.hilite ? hilite_none : ""); }; diff --git a/dbms/src/Parsers/ASTWithAlias.cpp b/dbms/src/Parsers/ASTWithAlias.cpp index ef261386cd0..67a4401f9a5 100644 --- a/dbms/src/Parsers/ASTWithAlias.cpp +++ b/dbms/src/Parsers/ASTWithAlias.cpp @@ -9,11 +9,7 @@ namespace DB void ASTWithAlias::writeAlias(const String & name, const FormatSettings & settings) const { settings.ostr << (settings.hilite ? hilite_keyword : "") << " AS " << (settings.hilite ? hilite_alias : ""); - - WriteBufferFromOStream wb(settings.ostr, 32); - settings.writeIdentifier(name, wb); - wb.next(); - + settings.writeIdentifier(name); settings.ostr << (settings.hilite ? hilite_none : ""); } @@ -25,8 +21,7 @@ void ASTWithAlias::formatImpl(const FormatSettings & settings, FormatState & sta /// If we have previously output this node elsewhere in the query, now it is enough to output only the alias. if (!state.printed_asts_with_alias.emplace(frame.current_select, alias).second) { - WriteBufferFromOStream wb(settings.ostr, 32); - settings.writeIdentifier(alias, wb); + settings.writeIdentifier(alias); return; } } diff --git a/dbms/src/Parsers/IAST.cpp b/dbms/src/Parsers/IAST.cpp index e5d9b7fc29d..a3244e1abeb 100644 --- a/dbms/src/Parsers/IAST.cpp +++ b/dbms/src/Parsers/IAST.cpp @@ -99,8 +99,10 @@ String IAST::getColumnName() const } -void IAST::FormatSettings::writeIdentifier(const String & name, WriteBuffer & out) const +void IAST::FormatSettings::writeIdentifier(const String & name) const { + WriteBufferFromOStream out(ostr, 32); + switch (identifier_quoting_style) { case IdentifierQuotingStyle::None: @@ -128,6 +130,8 @@ void IAST::FormatSettings::writeIdentifier(const String & name, WriteBuffer & ou break; } } + + out.next(); } } diff --git a/dbms/src/Parsers/IAST.h b/dbms/src/Parsers/IAST.h index b3d06abf584..5135e08a965 100644 --- a/dbms/src/Parsers/IAST.h +++ b/dbms/src/Parsers/IAST.h @@ -164,7 +164,7 @@ public: nl_or_ws = one_line ? ' ' : '\n'; } - void writeIdentifier(const String & name, WriteBuffer & out) const; + void writeIdentifier(const String & name) const; }; /// State. For example, a set of nodes can be remembered, which we already walk through. diff --git a/dbms/src/Parsers/ParserAlterQuery.cpp b/dbms/src/Parsers/ParserAlterQuery.cpp index c592cb66d7d..56eaddb38ee 100644 --- a/dbms/src/Parsers/ParserAlterQuery.cpp +++ b/dbms/src/Parsers/ParserAlterQuery.cpp @@ -1,11 +1,13 @@ #include #include #include +#include #include #include #include #include #include +#include #include @@ -38,12 +40,17 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected ParserKeyword s_name("NAME"); ParserKeyword s_delete_where("DELETE WHERE"); + ParserKeyword s_update("UPDATE"); + ParserKeyword s_where("WHERE"); ParserCompoundIdentifier parser_name; ParserStringLiteral parser_string_literal; ParserCompoundColumnDeclaration parser_col_decl; ParserPartition parser_partition; - ParserExpression exp_elem; + ParserExpression parser_exp_elem; + ParserList parser_assignment_list( + std::make_unique(), std::make_unique(TokenType::Comma), + /* allow_empty = */ false); if (s_add_column.ignore(pos, expected)) { @@ -195,11 +202,24 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected } else if (s_delete_where.ignore(pos, expected)) { - if (!exp_elem.parse(pos, command->predicate, expected)) + if (!parser_exp_elem.parse(pos, command->predicate, expected)) return false; command->type = ASTAlterCommand::DELETE; } + else if (s_update.ignore(pos, expected)) + { + if (!parser_assignment_list.parse(pos, command->update_assignments, expected)) + return false; + + if (!s_where.ignore(pos, expected)) + return false; + + if (!parser_exp_elem.parse(pos, command->predicate, expected)) + return false; + + command->type = ASTAlterCommand::UPDATE; + } else return false; @@ -213,6 +233,8 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected command->children.push_back(command->partition); if (command->predicate) command->children.push_back(command->predicate); + if (command->update_assignments) + command->children.push_back(command->update_assignments); return true; } @@ -240,6 +262,33 @@ bool ParserAlterCommandList::parseImpl(Pos & pos, ASTPtr & node, Expected & expe } +bool ParserAssignment::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) +{ + auto assignment = std::make_shared(); + node = assignment; + + ParserIdentifier p_identifier; + ParserToken s_equals(TokenType::Equals); + ParserExpression p_expression; + + ASTPtr column; + if (!p_identifier.parse(pos, column, expected)) + return false; + + if (!s_equals.ignore(pos, expected)) + return false; + + if (!p_expression.parse(pos, assignment->expression, expected)) + return false; + + assignment->column_name = typeid_cast(*column).name; + if (assignment->expression) + assignment->children.push_back(assignment->expression); + + return true; +} + + bool ParserAlterQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) { auto query = std::make_shared(); @@ -269,4 +318,5 @@ bool ParserAlterQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) return true; } + } diff --git a/dbms/src/Parsers/ParserAlterQuery.h b/dbms/src/Parsers/ParserAlterQuery.h index c758e0304b4..46908ae135d 100644 --- a/dbms/src/Parsers/ParserAlterQuery.h +++ b/dbms/src/Parsers/ParserAlterQuery.h @@ -17,12 +17,13 @@ namespace DB * [FETCH PARTITION partition FROM ...] * [FREEZE PARTITION] * [DELETE WHERE ...] + * [UPDATE col_name = expr, ... WHERE ...] */ -class ParserAlterCommand : public IParserBase +class ParserAlterQuery : public IParserBase { protected: - const char * getName() const { return "ALTER command"; } + const char * getName() const { return "ALTER query"; } bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected); }; @@ -35,10 +36,19 @@ protected: }; -class ParserAlterQuery : public IParserBase +class ParserAlterCommand : public IParserBase { protected: - const char * getName() const { return "ALTER query"; } + const char * getName() const { return "ALTER command"; } + bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected); +}; + + +/// Part of the UPDATE command of the form: col_name = expr +class ParserAssignment : public IParserBase +{ +protected: + const char * getName() const { return "column assignment"; } bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected); }; diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp index ea7a52876ed..f140fbaa5b5 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp @@ -78,6 +78,7 @@ namespace ErrorCodes extern const int PART_IS_TEMPORARILY_LOCKED; extern const int TOO_MANY_PARTS; extern const int INCOMPATIBLE_COLUMNS; + extern const int CANNOT_UPDATE_COLUMN; } diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index a8088e1f6cb..a5d247d82e3 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -19,9 +19,7 @@ #include #include #include -#include -#include -#include +#include #include #include #include @@ -30,8 +28,10 @@ #include #include #include +#include #include +#include #include #include @@ -851,132 +851,20 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor } -static bool isStorageTouchedByMutation( - const StoragePtr & storage, const std::vector & commands, const Context & context) -{ - if (commands.empty()) - return false; - - for (const MutationCommand & command : commands) - { - if (!command.predicate) /// The command touches all rows. - return true; - } - - /// Execute `SELECT count() FROM storage WHERE predicate1 OR predicate2 OR ...` query. - /// The result is tne number of affected rows. - - auto select = std::make_shared(); - - select->select_expression_list = std::make_shared(); - select->children.push_back(select->select_expression_list); - auto count_func = std::make_shared(); - count_func->name = "count"; - count_func->arguments = std::make_shared(); - select->select_expression_list->children.push_back(count_func); - - if (commands.size() == 1) - select->where_expression = commands[0].predicate; - else - { - auto coalesced_predicates = std::make_shared(); - coalesced_predicates->name = "or"; - coalesced_predicates->arguments = std::make_shared(); - coalesced_predicates->children.push_back(coalesced_predicates->arguments); - - for (const MutationCommand & command : commands) - coalesced_predicates->arguments->children.push_back(command.predicate); - - select->where_expression = std::move(coalesced_predicates); - } - select->children.push_back(select->where_expression); - - auto context_copy = context; - context_copy.getSettingsRef().merge_tree_uniform_read_distribution = 0; - context_copy.getSettingsRef().max_threads = 1; - - InterpreterSelectQuery interpreter_select(select, context_copy, storage, QueryProcessingStage::Complete); - BlockInputStreamPtr in = interpreter_select.execute().in; - - Block block = in->read(); - if (!block.rows()) - return false; - else if (block.rows() != 1) - throw Exception("count() expression returned " + toString(block.rows()) + " rows, not 1", - ErrorCodes::LOGICAL_ERROR); - - auto count = (*block.getByName("count()").column)[0].get(); - return count != 0; -} - -static BlockInputStreamPtr createInputStreamWithMutatedData( - const StoragePtr & storage, std::vector commands, const Context & context) -{ - auto select = std::make_shared(); - - select->select_expression_list = std::make_shared(); - select->children.push_back(select->select_expression_list); - for (const auto & column : storage->getColumns().getAllPhysical()) - select->select_expression_list->children.push_back(std::make_shared(column.name)); - - /// For all commands that are in front of the list and are DELETE commands, we can push them down - /// to the SELECT statement and remove them from commands. - - auto deletes_end = commands.begin(); - for (; deletes_end != commands.end(); ++deletes_end) - { - if (deletes_end->type != MutationCommand::DELETE) - break; - } - - std::vector predicates; - for (auto it = commands.begin(); it != deletes_end; ++it) - { - auto predicate = std::make_shared(); - predicate->name = "not"; - predicate->arguments = std::make_shared(); - predicate->arguments->children.push_back(it->predicate); - predicate->children.push_back(predicate->arguments); - predicates.push_back(predicate); - } - - commands.erase(commands.begin(), deletes_end); - - if (!predicates.empty()) - { - ASTPtr where_expression; - if (predicates.size() == 1) - where_expression = predicates[0]; - else - { - auto coalesced_predicates = std::make_shared(); - coalesced_predicates->name = "and"; - coalesced_predicates->arguments = std::make_shared(); - coalesced_predicates->children.push_back(coalesced_predicates->arguments); - coalesced_predicates->arguments->children = predicates; - - where_expression = std::move(coalesced_predicates); - } - select->where_expression = where_expression; - select->children.push_back(where_expression); - } - - InterpreterSelectQuery interpreter_select(select, context, storage); - BlockInputStreamPtr in = interpreter_select.execute().in; - - if (!commands.empty()) - in = std::make_shared(in, commands, context); - - return in; -} - MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTemporaryPart( const FuturePart & future_part, const std::vector & commands, const Context & context) { - if (actions_blocker.isCancelled()) - throw Exception("Cancelled mutating parts", ErrorCodes::ABORTED); + auto check_not_cancelled = [&]() + { + if (actions_blocker.isCancelled()) + throw Exception("Cancelled mutating parts", ErrorCodes::ABORTED); + + return true; + }; + + check_not_cancelled(); if (future_part.parts.size() != 1) throw Exception("Trying to mutate " + toString(future_part.parts.size()) + " parts, not one. " @@ -991,7 +879,9 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor context_for_reading.getSettingsRef().merge_tree_uniform_read_distribution = 0; context_for_reading.getSettingsRef().max_threads = 1; - if (!isStorageTouchedByMutation(storage_from_source_part, commands, context_for_reading)) + MutationsInterpreter mutations_interpreter(storage_from_source_part, commands, context_for_reading); + + if (!mutations_interpreter.isStorageTouchedByMutations()) { LOG_TRACE(log, "Part " << source_part->name << " doesn't change up to mutation version " << future_part.part_info.mutation); return data.cloneAndLoadDataPart(source_part, "tmp_clone_", future_part.part_info); @@ -1015,38 +905,105 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor source_part->bytes_on_disk, static_cast(source_part->bytes_on_disk) / data.getTotalActiveSizeInBytes()); - auto in = createInputStreamWithMutatedData(storage_from_source_part, commands, context_for_reading); - - if (data.hasPrimaryKey()) - in = std::make_shared( - std::make_shared(in, data.getPrimaryExpression())); - Poco::File(new_part_tmp_path).createDirectories(); + auto in = mutations_interpreter.execute(); NamesAndTypesList all_columns = data.getColumns().getAllPhysical(); - MergedBlockOutputStream out(data, new_part_tmp_path, all_columns, compression_settings); - - MergeTreeDataPart::MinMaxIndex minmax_idx; - - in->readPrefix(); - out.writePrefix(); - - Block block; - while (!actions_blocker.isCancelled() && (block = in->read())) + if (in->getHeader().columns() == all_columns.size()) { - minmax_idx.update(block, data.minmax_idx_columns); - out.write(block); + /// All columns are modified, proceed to write a new part from scratch. + + if (data.hasPrimaryKey()) + in = std::make_shared( + std::make_shared(in, data.getPrimaryExpression())); + + MergeTreeDataPart::MinMaxIndex minmax_idx; + + MergedBlockOutputStream out(data, new_part_tmp_path, all_columns, compression_settings); + + in->readPrefix(); + out.writePrefix(); + + Block block; + while (check_not_cancelled() && (block = in->read())) + { + minmax_idx.update(block, data.minmax_idx_columns); + out.write(block); + } + + new_data_part->partition.assign(source_part->partition); + new_data_part->minmax_idx = std::move(minmax_idx); + + in->readSuffix(); + out.writeSuffixAndFinalizePart(new_data_part); } + else + { + /// We will modify only some of the columns. Other columns and key values can be copied as-is. + /// TODO: check that we modify only non-key columns in this case. - if (actions_blocker.isCancelled()) - throw Exception("Cancelled mutating parts", ErrorCodes::ABORTED); + NameSet files_to_skip = {"checksums.txt", "columns.txt"}; + for (const auto & entry : in->getHeader()) + { + IDataType::StreamCallback callback = [&](const IDataType::SubstreamPath & substream_path) + { + String stream_name = IDataType::getFileNameForStream(entry.name, substream_path); + files_to_skip.insert(stream_name + ".bin"); + files_to_skip.insert(stream_name + ".mrk"); + }; - new_data_part->partition.assign(source_part->partition); - new_data_part->minmax_idx = std::move(minmax_idx); + IDataType::SubstreamPath stream_path; + entry.type->enumerateStreams(callback, stream_path); + } - in->readSuffix(); - out.writeSuffixAndFinalizePart(new_data_part); + Poco::DirectoryIterator dir_end; + for (Poco::DirectoryIterator dir_it(source_part->getFullPath()); dir_it != dir_end; ++dir_it) + { + if (files_to_skip.count(dir_it.name())) + continue; + + Poco::Path destination(new_part_tmp_path); + destination.append(dir_it.name()); + + createHardLink(dir_it.path().toString(), destination.toString()); + } + + MergedColumnOnlyOutputStream out(data, in->getHeader(), new_part_tmp_path, /* sync = */ false, compression_settings, /* skip_offsets = */ false); + + in->readPrefix(); + out.writePrefix(); + + Block block; + while (check_not_cancelled() && (block = in->read())) + out.write(block); + + in->readSuffix(); + auto changed_checksums = out.writeSuffixAndGetChecksums(); + + new_data_part->checksums = source_part->checksums; + new_data_part->checksums.add(std::move(changed_checksums)); + { + /// Write file with checksums. + WriteBufferFromFile out(new_part_tmp_path + "checksums.txt", 4096); + new_data_part->checksums.write(out); + } + + new_data_part->columns = all_columns; + { + /// Write a file with a description of columns. + WriteBufferFromFile out(new_part_tmp_path + "columns.txt", 4096); + all_columns.writeText(out); + } + + new_data_part->rows_count = source_part->rows_count; + new_data_part->marks_count = source_part->marks_count; + new_data_part->index = source_part->index; + new_data_part->partition.assign(source_part->partition); + new_data_part->minmax_idx = source_part->minmax_idx; + new_data_part->modification_time = time(nullptr); + new_data_part->bytes_on_disk = MergeTreeData::DataPart::calculateTotalSizeOnDisk(new_data_part->getFullPath()); + } return new_data_part; } diff --git a/dbms/src/Storages/MergeTree/StorageFromMergeTreeDataPart.h b/dbms/src/Storages/MergeTree/StorageFromMergeTreeDataPart.h index 60331a4049c..e1e38e9b18a 100644 --- a/dbms/src/Storages/MergeTree/StorageFromMergeTreeDataPart.h +++ b/dbms/src/Storages/MergeTree/StorageFromMergeTreeDataPart.h @@ -30,6 +30,13 @@ public: {part}, column_names, query_info, context, max_block_size, num_streams, 0); } + bool supportsIndexForIn() const override { return true; } + + bool mayBenefitFromIndexForIn(const ASTPtr & left_in_operand) const override + { + return part->storage.mayBenefitFromIndexForIn(left_in_operand); + } + protected: StorageFromMergeTreeDataPart(const MergeTreeData::DataPartPtr & part_) : IStorage(part_->storage.getColumns()), part(part_) diff --git a/dbms/src/Storages/MutationCommands.cpp b/dbms/src/Storages/MutationCommands.cpp index 38bacbfd9b6..685aa883190 100644 --- a/dbms/src/Storages/MutationCommands.cpp +++ b/dbms/src/Storages/MutationCommands.cpp @@ -1,12 +1,11 @@ #include -#include -#include -#include #include #include #include #include #include +#include +#include #include @@ -16,6 +15,7 @@ namespace DB namespace ErrorCodes { extern const int UNKNOWN_MUTATION_COMMAND; + extern const int MULTIPLE_ASSIGNMENTS_TO_COLUMN; } std::optional MutationCommand::parse(ASTAlterCommand * command) @@ -28,6 +28,22 @@ std::optional MutationCommand::parse(ASTAlterCommand * command) res.predicate = command->predicate; return res; } + else if (command->type == ASTAlterCommand::UPDATE) + { + MutationCommand res; + res.ast = command->ptr(); + res.type = UPDATE; + res.predicate = command->predicate; + for (const ASTPtr & assignment_ast : command->update_assignments->children) + { + const auto & assignment = typeid_cast(*assignment_ast); + auto insertion = res.column_to_update_expression.emplace(assignment.column_name, assignment.expression); + if (!insertion.second) + throw Exception("Multiple assignments in the single statement to column `" + assignment.column_name + "`", + ErrorCodes::MULTIPLE_ASSIGNMENTS_TO_COLUMN); + } + return res; + } else return {}; } @@ -41,33 +57,6 @@ std::shared_ptr MutationCommands::ast() const return res; } -void MutationCommands::validate(const IStorage & table, const Context & context) const -{ - auto all_columns = table.getColumns().getAll(); - - for (const MutationCommand & command : *this) - { - switch (command.type) - { - case MutationCommand::DELETE: - { - auto actions = ExpressionAnalyzer(command.predicate, context, {}, all_columns).getActions(true); - - /// Try executing the resulting actions on the table sample block to detect malformed queries. - auto table_sample_block = table.getSampleBlock(); - actions->execute(table_sample_block); - - const ColumnWithTypeAndName & predicate_column = actions->getSampleBlock().getByName( - command.predicate->getColumnName()); - checkColumnCanBeUsedAsFilter(predicate_column); - break; - } - default: - throw Exception("Bad mutation type: " + toString(command.type), ErrorCodes::LOGICAL_ERROR); - } - } -} - void MutationCommands::writeText(WriteBuffer & out) const { std::stringstream commands_ss; diff --git a/dbms/src/Storages/MutationCommands.h b/dbms/src/Storages/MutationCommands.h index 2d9d9fa5fde..b5894c93598 100644 --- a/dbms/src/Storages/MutationCommands.h +++ b/dbms/src/Storages/MutationCommands.h @@ -2,6 +2,7 @@ #include #include +#include namespace DB @@ -20,12 +21,15 @@ struct MutationCommand { EMPTY, /// Not used. DELETE, + UPDATE, }; Type type = EMPTY; ASTPtr predicate; + std::unordered_map column_to_update_expression; + static std::optional parse(ASTAlterCommand * command); }; @@ -34,8 +38,6 @@ class MutationCommands : public std::vector public: std::shared_ptr ast() const; - void validate(const IStorage & table, const Context & context) const; - void writeText(WriteBuffer & out) const; void readText(ReadBuffer & in); }; diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index a69e19b3d9d..8fe68a561b2 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -305,15 +305,17 @@ struct CurrentlyMergingPartsTagger void StorageMergeTree::mutate(const MutationCommands & commands, const Context &) { MergeTreeMutationEntry entry(commands, full_path, data.insert_increment.get()); + String file_name; { std::lock_guard lock(currently_merging_mutex); Int64 version = increment.get(); entry.commit(version); + file_name = entry.file_name; current_mutations_by_version.emplace(version, std::move(entry)); } - LOG_INFO(log, "Added mutation: " << entry.file_name); + LOG_INFO(log, "Added mutation: " << file_name); background_task_handle->wake(); } diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index eb826023b90..4c0ba3c5e64 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -4166,7 +4166,6 @@ void StorageReplicatedMergeTree::mutate(const MutationCommands & commands, const /// /// After all needed parts are mutated (i.e. all active parts have the mutation version greater than /// the version of this mutation), the mutation is considered done and can be deleted. - /// TODO: add a way to track the progress of mutations and a process to clean old mutations. ReplicatedMergeTreeMutationEntry entry; entry.source_replica = replica_name; diff --git a/dbms/tests/queries/0_stateless/00652_mergetree_mutations.sh b/dbms/tests/queries/0_stateless/00652_mergetree_mutations.sh index bb29f6d31fe..d762d49a6ce 100755 --- a/dbms/tests/queries/0_stateless/00652_mergetree_mutations.sh +++ b/dbms/tests/queries/0_stateless/00652_mergetree_mutations.sh @@ -43,6 +43,8 @@ ${CLICKHOUSE_CLIENT} --query="SELECT d, x, s, m FROM test.mutations ORDER BY d, ${CLICKHOUSE_CLIENT} --query="SELECT mutation_id, command, block_numbers.partition_id, block_numbers.number, parts_to_do, is_done \ FROM system.mutations WHERE table = 'mutations' ORDER BY mutation_id" +${CLICKHOUSE_CLIENT} --query="DROP TABLE test.mutations" + ${CLICKHOUSE_CLIENT} --query="SELECT '*** Test mutations cleaner ***'" @@ -69,5 +71,4 @@ sleep 0.1 # Check that the first mutation is cleaned ${CLICKHOUSE_CLIENT} --query="SELECT mutation_id, command, is_done FROM system.mutations WHERE table = 'mutations_cleaner' ORDER BY mutation_id" -${CLICKHOUSE_CLIENT} --query="DROP TABLE test.mutations" ${CLICKHOUSE_CLIENT} --query="DROP TABLE test.mutations_cleaner" diff --git a/dbms/tests/queries/0_stateless/00652_mutations_alter_update.reference b/dbms/tests/queries/0_stateless/00652_mutations_alter_update.reference new file mode 100644 index 00000000000..5c9affb978a --- /dev/null +++ b/dbms/tests/queries/0_stateless/00652_mutations_alter_update.reference @@ -0,0 +1,24 @@ +*** Test expected failures *** +Updating partition key should fail +Updating primary key should fail +Updating MATERIALIZED column should fail +Updating with non-UInt8 predicate should fail +*** Test updating according to a predicate *** +2000-01-01 123 aaa 101 +2000-01-01 234 cde 2 +*** Test several UPDATE commands with common subexpressions *** +2000-01-01 123 abc 26 +*** Test predicates with IN operator *** +2000-01-01 234 cdeccc 20 +2000-01-01 345 fghccc 30 +2000-01-01 456 iii 40 +*** Test UPDATE of columns that DELETE depends on *** +2000-01-01 234 cde 30 +*** Test complex mixture of UPDATEs and DELETEs *** +2000-01-01 456 ijk_materialized_47 40 +*** Test updating columns that MATERIALIZED columns depend on *** +Updating column that MATERIALIZED key column depends on should fail +17 materialized_24 +27 materialized_34 +30 materialized_37 +40 materialized_47 diff --git a/dbms/tests/queries/0_stateless/00652_mutations_alter_update.sh b/dbms/tests/queries/0_stateless/00652_mutations_alter_update.sh new file mode 100755 index 00000000000..01a17481590 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00652_mutations_alter_update.sh @@ -0,0 +1,134 @@ +#!/usr/bin/env bash + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +. $CURDIR/../shell_config.sh + +. $CURDIR/mergetree_mutations.lib + +${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS test.alter_update" + +${CLICKHOUSE_CLIENT} --query="CREATE TABLE test.alter_update \ + (d Date, key UInt32, value1 String, value2 UInt64, materialized_value String MATERIALIZED concat('materialized_', toString(value2 + 7))) \ + ENGINE MergeTree ORDER BY key PARTITION BY toYYYYMM(d)" + + +${CLICKHOUSE_CLIENT} --query="SELECT '*** Test expected failures ***'" + +${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.alter_update UPDATE d = today() WHERE 1" 2>/dev/null || echo "Updating partition key should fail" +${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.alter_update UPDATE key = 1 WHERE 1" 2>/dev/null || echo "Updating primary key should fail" +${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.alter_update UPDATE materialized_value = 'aaa' WHERE 1" 2>/dev/null || echo "Updating MATERIALIZED column should fail" +${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.alter_update UPDATE value1 = 'aaa' WHERE 'string'" 2>/dev/null || echo "Updating with non-UInt8 predicate should fail" + + +${CLICKHOUSE_CLIENT} --query="SELECT '*** Test updating according to a predicate ***'" + +${CLICKHOUSE_CLIENT} --query="INSERT INTO test.alter_update VALUES \ + ('2000-01-01', 123, 'abc', 1), \ + ('2000-01-01', 234, 'cde', 2)" + +${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.alter_update UPDATE value1 = 'aaa', value2 = value2 + 100 WHERE key < 200" +wait_for_mutation "alter_update" "mutation_2.txt" + +${CLICKHOUSE_CLIENT} --query="SELECT * FROM test.alter_update ORDER BY key" + +${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.alter_update DROP PARTITION 200001" + + +${CLICKHOUSE_CLIENT} --query="SELECT '*** Test several UPDATE commands with common subexpressions ***'" + +${CLICKHOUSE_CLIENT} --query="INSERT INTO test.alter_update VALUES ('2000-01-01', 123, 'abc', 49)" + +${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.alter_update \ + UPDATE value2 = (value2 + 1) / 2 WHERE 1, \ + UPDATE value2 = value2 + 1 WHERE 1" +wait_for_mutation "alter_update" "mutation_4.txt" + +${CLICKHOUSE_CLIENT} --query="SELECT * FROM test.alter_update ORDER BY key" + +${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.alter_update DROP PARTITION 200001" + + +${CLICKHOUSE_CLIENT} --query="SELECT '*** Test predicates with IN operator ***'" + +${CLICKHOUSE_CLIENT} --query="INSERT INTO test.alter_update VALUES \ + ('2000-01-01', 123, 'abc', 10), \ + ('2000-01-01', 234, 'cde', 20), \ + ('2000-01-01', 345, 'fgh', 30), \ + ('2000-01-01', 456, 'ijk', 40)" + +${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.alter_update \ + DELETE WHERE key IN (SELECT toUInt32(arrayJoin([121, 122, 123]))), \ + UPDATE value1 = concat(value1, 'ccc') WHERE value2 IN (20, 30), \ + UPDATE value1 = 'iii' WHERE value2 IN (SELECT toUInt64(40))" +wait_for_mutation "alter_update" "mutation_6.txt" + +${CLICKHOUSE_CLIENT} --query="SELECT * FROM test.alter_update ORDER BY key" + +${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.alter_update DROP PARTITION 200001" + + +${CLICKHOUSE_CLIENT} --query="SELECT '*** Test UPDATE of columns that DELETE depends on ***'" + +${CLICKHOUSE_CLIENT} --query="INSERT INTO test.alter_update VALUES \ + ('2000-01-01', 123, 'abc', 10), \ + ('2000-01-01', 234, 'cde', 20)" + +${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.alter_update \ + UPDATE value2 = value2 + 10 WHERE 1, \ + DELETE WHERE value2 = 20" +wait_for_mutation "alter_update" "mutation_8.txt" + +${CLICKHOUSE_CLIENT} --query="SELECT * FROM test.alter_update ORDER BY key" + +${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.alter_update DROP PARTITION 200001" + + +${CLICKHOUSE_CLIENT} --query="SELECT '*** Test complex mixture of UPDATEs and DELETEs ***'" + +${CLICKHOUSE_CLIENT} --query="INSERT INTO test.alter_update VALUES \ + ('2000-01-01', 123, 'abc', 10), \ + ('2000-01-01', 234, 'cde', 20), \ + ('2000-01-01', 345, 'fgh', 30), \ + ('2000-01-01', 456, 'ijk', 40)" + +${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.alter_update \ + DELETE WHERE value2 IN (8, 9, 10), \ + UPDATE value2 = value2 + 10 WHERE value2 <= 10, \ + DELETE WHERE length(value1) + value2 = 23, \ + DELETE WHERE materialized_value = 'materialized_37', \ + UPDATE value1 = concat(value1, '_', materialized_value) WHERE key = 456" +wait_for_mutation "alter_update" "mutation_10.txt" + +${CLICKHOUSE_CLIENT} --query="SELECT * FROM test.alter_update ORDER BY key" + +${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.alter_update DROP PARTITION 200001" + + +${CLICKHOUSE_CLIENT} --query="SELECT '*** Test updating columns that MATERIALIZED columns depend on ***'" + +${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS test.materialized_key" + +${CLICKHOUSE_CLIENT} --query="CREATE TABLE test.materialized_key \ + (key UInt32 MATERIALIZED value + 1, value UInt32) \ + ENGINE MergeTree ORDER BY key" + +${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.materialized_key UPDATE value = 1 WHERE 1" 2>/dev/null || echo "Updating column that MATERIALIZED key column depends on should fail" + +${CLICKHOUSE_CLIENT} --query="DROP TABLE test.materialized_key" + +${CLICKHOUSE_CLIENT} --query="INSERT INTO test.alter_update VALUES \ + ('2000-01-01', 123, 'abc', 10), \ + ('2000-01-01', 234, 'cde', 20), \ + ('2000-01-01', 345, 'fgh', 30), \ + ('2000-01-01', 456, 'ijk', 40)" + +${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.alter_update \ + UPDATE value2 = value2 + 7 WHERE value2 <= 20" +wait_for_mutation "alter_update" "mutation_12.txt" + +${CLICKHOUSE_CLIENT} --query="SELECT value2, materialized_value FROM test.alter_update ORDER BY key" + +${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.alter_update DROP PARTITION 200001" + + +${CLICKHOUSE_CLIENT} --query="DROP TABLE test.alter_update" diff --git a/dbms/tests/queries/0_stateless/mergetree_mutations.lib b/dbms/tests/queries/0_stateless/mergetree_mutations.lib index 0df275092fe..eb2f4030eba 100644 --- a/dbms/tests/queries/0_stateless/mergetree_mutations.lib +++ b/dbms/tests/queries/0_stateless/mergetree_mutations.lib @@ -8,7 +8,7 @@ function wait_for_mutation() for i in {1..100} do sleep 0.1 - if [[ $(${CLICKHOUSE_CLIENT} --query="SELECT is_done FROM system.mutations WHERE table='$table' AND mutation_id='$mutation_id'") -eq 1 ]]; then + if [[ $(${CLICKHOUSE_CLIENT} --query="SELECT min(is_done) FROM system.mutations WHERE table='$table' AND mutation_id='$mutation_id'") -eq 1 ]]; then break fi diff --git a/docs/en/query_language/alter.md b/docs/en/query_language/alter.md index d5db2ae5abb..420a943b17b 100644 --- a/docs/en/query_language/alter.md +++ b/docs/en/query_language/alter.md @@ -230,13 +230,19 @@ The functionality is in beta stage and is available starting with the 1.1.54388 Existing tables are ready for mutations as-is (no conversion necessary), but after the first mutation is applied to a table, its metadata format becomes incompatible with previous server versions and falling back to a previous version becomes impossible. -At the moment the `ALTER DELETE` command is available: +Currently available commands: ```sql -ALTER TABLE [db.]table DELETE WHERE expr +ALTER TABLE [db.]table DELETE WHERE filter_expr ``` -The expression `expr` must be of UInt8 type. The query deletes rows for which this expression evaluates to a non-zero value. +The `filter_expr` must be of type UInt8. The query deletes rows in the table for which this expression takes a non-zero value. + +```sql +ALTER TABLE [db.]table UPDATE column1 = expr1 [, ...] WHERE filter_expr +``` + +The `filter_expr` must be of type UInt8. This query updates values of specified columns to the values of corresponding expressions in rows for which the `filter_expr` takes a non-zero value. Values are casted to the column type using the `CAST` operator. Updating columns that are used in the calculation of the primary or the partition key is not supported. One query can contain several commands separated by commas. diff --git a/docs/ru/query_language/alter.md b/docs/ru/query_language/alter.md index 2478954f26e..d6c8a5137ed 100644 --- a/docs/ru/query_language/alter.md +++ b/docs/ru/query_language/alter.md @@ -229,13 +229,19 @@ ALTER TABLE [db.]table FETCH PARTITION 'name' FROM 'path-in-zookeeper' Конвертировать существующие таблицы для работы с мутациями не нужно. Но после применения первой мутации формат данных таблицы становится несовместимым с предыдущими версиями и откатиться на предыдущую версию уже не получится. -На данный момент доступна команда `ALTER DELETE`: +На данный момент доступны команды: ```sql -ALTER TABLE [db.]table DELETE WHERE expr +ALTER TABLE [db.]table DELETE WHERE filter_expr ``` -Выражение `expr` должно иметь тип UInt8. Запрос удаляет строки таблицы, для которых это выражение принимает ненулевое значение. +Выражение `filter_expr` должно иметь тип UInt8. Запрос удаляет строки таблицы, для которых это выражение принимает ненулевое значение. + +```sql +ALTER TABLE [db.]table UPDATE column1 = expr1 [, ...] WHERE filter_expr +``` + +Выражение `filter_expr` должно иметь тип UInt8. Запрос изменяет значение указанных столбцов на вычисленное значение соответствующих выражений в каждой строке, для которой `filter_expr` пронимает ненулевое значение. Вычисленные значения преобразуются к типу столбца с помощью оператора `CAST`. Изменение столбцов, которые используются при вычислении первичного ключа или ключа партиционирования, не поддерживается. В одном запросе можно указать несколько команд через запятую.