Merge pull request #3035 from yandex/alter-update

ALTER UPDATE
This commit is contained in:
alexey-milovidov 2018-09-13 06:03:35 +03:00 committed by GitHub
commit 0223225a05
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
37 changed files with 1074 additions and 377 deletions

View File

@ -393,7 +393,9 @@ namespace ErrorCodes
extern const int REPLICA_STATUS_CHANGED = 416;
extern const int EXPECTED_ALL_OR_ANY = 417;
extern const int UNKNOWN_JOIN_STRICTNESS = 418;
extern const int CANNOT_ADD_DIFFERENT_AGGREGATE_STATES = 419;
extern const int MULTIPLE_ASSIGNMENTS_TO_COLUMN = 419;
extern const int CANNOT_UPDATE_COLUMN = 420;
extern const int CANNOT_ADD_DIFFERENT_AGGREGATE_STATES = 421;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;

View File

@ -0,0 +1,36 @@
#include <Common/createHardLink.h>
#include <Common/Exception.h>
#include <errno.h>
#include <unistd.h>
#include <sys/stat.h>
namespace DB
{
/// Create a hard link `destination_path` that refers to the same file as `source_path`.
///
/// If `link` fails with EEXIST, the call still succeeds when the existing destination is
/// already the very same file as the source. File identity is the pair (st_dev, st_ino):
/// inode numbers are only unique within one device, so comparing st_ino alone could accept
/// an unrelated file that lives on another filesystem.
///
/// Throws (via throwFromErrno) when:
///  - `link` fails for any reason other than EEXIST;
///  - `lstat` fails for either path;
///  - the destination exists but is a different file (reported with the errno of `link`).
void createHardLink(const String & source_path, const String & destination_path)
{
    if (0 == link(source_path.c_str(), destination_path.c_str()))
        return;

    if (errno != EEXIST)
        throwFromErrno("Cannot link " + source_path + " to " + destination_path);

    /// Remember the errno of link(): the lstat() calls below may overwrite it.
    auto link_errno = errno;

    struct stat source_descr;
    struct stat destination_descr;

    if (0 != lstat(source_path.c_str(), &source_descr))
        throwFromErrno("Cannot stat " + source_path);

    if (0 != lstat(destination_path.c_str(), &destination_descr))
        throwFromErrno("Cannot stat " + destination_path);

    const bool same_file = source_descr.st_dev == destination_descr.st_dev
        && source_descr.st_ino == destination_descr.st_ino;

    if (!same_file)
        throwFromErrno("Destination file " + destination_path + " is already exist and have different inode.", 0, link_errno);
}
}

View File

@ -0,0 +1,12 @@
#pragma once
#include <Core/Types.h>
namespace DB
{
/// Create a hard link `destination_path` pointing to `source_path`.
/// If the destination already exists, check that it has the same inode as the source
/// (and throw if they are different).
/// Also throws if the link cannot be created for any other reason, or if either path cannot be stat'ed
/// while performing the check above.
void createHardLink(const String & source_path, const String & destination_path);
}

View File

@ -1,22 +1,21 @@
#include "localBackup.h"
#include <sys/stat.h>
#include <string>
#include <iostream>
#include <Common/localBackup.h>
#include <Common/createHardLink.h>
#include <Common/Exception.h>
#include <Poco/DirectoryIterator.h>
#include <Poco/File.h>
#include <Common/Exception.h>
#include <port/unistd.h>
#include <string>
#include <iostream>
#include <errno.h>
namespace DB
{
namespace ErrorCodes
{
extern const int TOO_DEEP_RECURSION;
extern const int DIRECTORY_ALREADY_EXISTS;
}
}
static void localBackupImpl(const Poco::Path & source_path, const Poco::Path & destination_path, size_t level,
@ -41,33 +40,7 @@ static void localBackupImpl(const Poco::Path & source_path, const Poco::Path & d
{
dir_it->setReadOnly();
std::string source_str = source.toString();
std::string destination_str = destination.toString();
/** We are trying to create a hard link.
* If it already exists, we check that source and destination point to the same inode.
*/
if (0 != link(source_str.c_str(), destination_str.c_str()))
{
if (errno == EEXIST)
{
auto link_errno = errno;
struct stat source_descr;
struct stat destination_descr;
if (0 != lstat(source_str.c_str(), &source_descr))
DB::throwFromErrno("Cannot stat " + source_str);
if (0 != lstat(destination_str.c_str(), &destination_descr))
DB::throwFromErrno("Cannot stat " + destination_str);
if (source_descr.st_ino != destination_descr.st_ino)
DB::throwFromErrno("Destination file " + destination_str + " is already exist and have different inode.", 0, link_errno);
}
else
DB::throwFromErrno("Cannot link " + source_str + " to " + destination_str);
}
createHardLink(source.toString(), destination.toString());
}
else
{
@ -120,3 +93,5 @@ void localBackup(const Poco::Path & source_path, const Poco::Path & destination_
break;
}
}
}

View File

@ -4,6 +4,9 @@
#include <optional>
namespace DB
{
/** Creates a local (at the same mount point) backup (snapshot) directory.
*
* In the specified destination directory, it creates hard links to all source-directory files
@ -19,3 +22,4 @@
*/
void localBackup(const Poco::Path & source_path, const Poco::Path & destination_path, std::optional<size_t> max_level = {});
}

View File

@ -1,86 +0,0 @@
#include <DataStreams/ApplyingMutationsBlockInputStream.h>
#include <DataStreams/FilterBlockInputStream.h>
#include <Parsers/ASTFunction.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <IO/WriteHelpers.h>
namespace DB
{
/// Wraps `input` into a stream that applies `commands` to every block.
/// With no commands the input is used as is; otherwise all commands must be DELETEs and are
/// executed by one FilterBlockInputStream that keeps only the rows matched by none of them.
ApplyingMutationsBlockInputStream::ApplyingMutationsBlockInputStream(
    const BlockInputStreamPtr & input, const std::vector<MutationCommand> & commands, const Context & context)
{
    children.push_back(input);

    if (commands.empty())
    {
        impl = input;
        return;
    }

    /// All mutations are folded into a single predicate for a single FilterBlockInputStream:
    /// ExpressionAnalyzer won't detect that some columns in the block are already calculated
    /// and would try to calculate them twice. This works as long as all mutations are DELETE.
    /// TODO: fix ExpressionAnalyzer.
    std::vector<ASTPtr> negated_predicates;
    for (const MutationCommand & command : commands)
    {
        if (command.type != MutationCommand::DELETE)
            throw Exception("Unsupported mutation cmd type: " + toString<int>(command.type),
                ErrorCodes::LOGICAL_ERROR);

        /// A row survives the DELETE when `not(predicate)` holds.
        auto negation = std::make_shared<ASTFunction>();
        negation->name = "not";
        negation->arguments = std::make_shared<ASTExpressionList>();
        negation->arguments->children.push_back(command.predicate);
        negation->children.push_back(negation->arguments);
        negated_predicates.push_back(negation);
    }

    ASTPtr combined_predicate;
    if (negated_predicates.size() != 1)
    {
        /// Several DELETEs: a row is kept only if every negated predicate holds.
        auto conjunction = std::make_shared<ASTFunction>();
        conjunction->name = "and";
        conjunction->arguments = std::make_shared<ASTExpressionList>();
        conjunction->children.push_back(conjunction->arguments);
        conjunction->arguments->children = negated_predicates;
        combined_predicate = conjunction;
    }
    else
        combined_predicate = negated_predicates[0];

    auto filter_actions = ExpressionAnalyzer(
        combined_predicate, context, nullptr, input->getHeader().getNamesAndTypesList()).getActions(false);
    String filter_column_name = combined_predicate->getColumnName();

    impl = std::make_shared<FilterBlockInputStream>(input, filter_actions, filter_column_name);
}
/// The header is whatever the wrapped stream (`impl`) reports.
Block ApplyingMutationsBlockInputStream::getHeader() const
{
    Block header = impl->getHeader();
    return header;
}
/// Takes totals from the wrapped stream when it is a profiling stream,
/// otherwise falls back to the base class implementation.
Block ApplyingMutationsBlockInputStream::getTotals()
{
    auto * profiling_impl = dynamic_cast<IProfilingBlockInputStream *>(impl.get());
    if (!profiling_impl)
        return IProfilingBlockInputStream::getTotals();

    return profiling_impl->getTotals();
}
/// Reading is fully delegated to the wrapped stream (`impl`).
Block ApplyingMutationsBlockInputStream::readImpl()
{
    Block next_block = impl->read();
    return next_block;
}
}

View File

@ -1,30 +0,0 @@
#pragma once
#include <DataStreams/IProfilingBlockInputStream.h>
#include <Storages/MutationCommands.h>
#include <Interpreters/Context.h>
namespace DB
{
/// A stream that pulls the blocks from `input` and executes mutation commands on these blocks
/// (see Storages/MutationCommands.h for a full list).
/// Example: if mutation commands contain ALTER DELETE, this stream will delete rows that satisfy the predicate specified in the command.
class ApplyingMutationsBlockInputStream : public IProfilingBlockInputStream
{
public:
/// `input` is the stream to read from, `commands` are the mutations to apply to its blocks,
/// `context` is the context used to analyze the command expressions.
ApplyingMutationsBlockInputStream(
const BlockInputStreamPtr & input, const std::vector<MutationCommand> & commands, const Context & context);
/// Name of this stream type.
String getName() const override { return "ApplyingMutations"; }
Block getHeader() const override;
Block getTotals() override;
private:
Block readImpl() override;
/// The stream that actually produces the blocks returned by this one.
BlockInputStreamPtr impl;
};
}

View File

@ -110,12 +110,13 @@ ExpressionAction ExpressionAction::removeColumn(const std::string & removed_name
return a;
}
ExpressionAction ExpressionAction::copyColumn(const std::string & from_name, const std::string & to_name)
/// Builds a COPY_COLUMN action that copies column `from_name` into a column named `to_name`.
/// `can_replace` is stored in the action as is; it tells whether an already existing `to_name`
/// column may be replaced.
ExpressionAction ExpressionAction::copyColumn(const std::string & from_name, const std::string & to_name, bool can_replace)
{
    ExpressionAction copy_action;

    copy_action.type = COPY_COLUMN;
    copy_action.can_replace = can_replace;
    copy_action.result_name = to_name;
    copy_action.source_name = from_name;

    return copy_action;
}
@ -309,8 +310,23 @@ void ExpressionAction::prepare(Block & sample_block)
case COPY_COLUMN:
{
result_type = sample_block.getByName(source_name).type;
sample_block.insert(ColumnWithTypeAndName(sample_block.getByName(source_name).column, result_type, result_name));
const auto & source = sample_block.getByName(source_name);
result_type = source.type;
if (sample_block.has(result_name))
{
if (can_replace)
{
auto & result = sample_block.getByName(result_name);
result.type = result_type;
result.column = source.column;
}
else
throw Exception("Column '" + result_name + "' already exists", ErrorCodes::DUPLICATE_COLUMN);
}
else
sample_block.insert(ColumnWithTypeAndName(source.column, result_type, result_name));
break;
}
@ -358,7 +374,7 @@ void ExpressionAction::execute(Block & block, std::unordered_map<std::string, si
if (!block.has(source_name))
throw Exception("Not found column '" + source_name + "'. There are columns: " + block.dumpNames(), ErrorCodes::NOT_FOUND_COLUMN_IN_BLOCK);
if (type == ADD_COLUMN || type == COPY_COLUMN || type == APPLY_FUNCTION)
if (type == ADD_COLUMN || (type == COPY_COLUMN && !can_replace) || type == APPLY_FUNCTION)
if (block.has(result_name))
throw Exception("Column '" + result_name + "' already exists", ErrorCodes::DUPLICATE_COLUMN);
@ -501,7 +517,15 @@ void ExpressionAction::execute(Block & block, std::unordered_map<std::string, si
break;
case COPY_COLUMN:
if (can_replace && block.has(result_name))
{
auto & result = block.getByName(result_name);
result.type = result_type;
result.column = block.getByName(source_name).column;
}
else
block.insert({ block.getByName(source_name).column, result_type, result_name });
break;
default:
@ -538,6 +562,8 @@ std::string ExpressionAction::toString() const
case COPY_COLUMN:
ss << "COPY " << result_name << " = " << source_name;
if (can_replace)
ss << " (can replace)";
break;
case APPLY_FUNCTION:
@ -645,9 +671,6 @@ void ExpressionActions::add(const ExpressionAction & action)
void ExpressionActions::addImpl(ExpressionAction action, Names & new_names)
{
if (sample_block.has(action.result_name))
return;
if (action.result_name != "")
new_names.push_back(action.result_name);
new_names.insert(new_names.end(), action.array_joined_columns.begin(), action.array_joined_columns.end());

View File

@ -75,6 +75,9 @@ public:
std::string result_name;
DataTypePtr result_type;
/// If COPY_COLUMN can replace the result column.
bool can_replace = false;
/// For conditional projections (projections on subset of rows)
std::string row_projection_column;
bool is_row_projection_complementary = false;
@ -109,7 +112,7 @@ public:
const std::string & row_projection_column,
bool is_row_projection_complementary);
static ExpressionAction removeColumn(const std::string & removed_name);
static ExpressionAction copyColumn(const std::string & from_name, const std::string & to_name);
static ExpressionAction copyColumn(const std::string & from_name, const std::string & to_name, bool can_replace = false);
static ExpressionAction project(const NamesWithAliases & projected_columns_);
static ExpressionAction project(const Names & projected_columns_);
static ExpressionAction addAliases(const NamesWithAliases & aliased_columns_);

View File

@ -2906,6 +2906,15 @@ void ExpressionAnalyzer::appendProjectResult(ExpressionActionsChain & chain) con
}
/// Adds the actions that calculate `expr` to the last step of `chain`
/// and marks the resulting column as a required output of that step.
void ExpressionAnalyzer::appendExpression(ExpressionActionsChain & chain, const ASTPtr & expr, bool only_types)
{
    initChain(chain, source_columns);

    auto & last_step = chain.steps.back();
    getRootActions(expr, only_types, false, last_step.actions);

    /// Keep the calculated column in the output of this step.
    last_step.required_output.push_back(expr->getColumnName());
}
void ExpressionAnalyzer::getActionsBeforeAggregation(const ASTPtr & ast, ExpressionActionsPtr & actions, bool no_subqueries)
{
ASTFunction * node = typeid_cast<ASTFunction *>(ast.get());

View File

@ -156,6 +156,8 @@ public:
/// Deletes all columns except mentioned by SELECT, arranges the remaining columns and renames them to aliases.
void appendProjectResult(ExpressionActionsChain & chain) const;
void appendExpression(ExpressionActionsChain & chain, const ASTPtr & expr, bool only_types);
/// If `ast` is not a SELECT query, just gets all the actions to evaluate the expression.
/// If add_aliases, leaves only the calculated values in the desired order and adds aliases.
/// If also project_result, then only aliases remain in the output block.
@ -171,9 +173,9 @@ public:
* That is, you need to call getSetsWithSubqueries after all calls of `append*` or `getActions`
* and create all the returned sets before performing the actions.
*/
SubqueriesForSets getSubqueriesForSets() const { return subqueries_for_sets; }
const SubqueriesForSets & getSubqueriesForSets() const { return subqueries_for_sets; }
PreparedSets getPreparedSets() { return prepared_sets; }
const PreparedSets & getPreparedSets() const { return prepared_sets; }
/** Tables that will need to be sent to remote servers for distributed query processing.
*/

View File

@ -1,5 +1,6 @@
#include <Interpreters/InterpreterAlterQuery.h>
#include <Interpreters/DDLWorker.h>
#include <Interpreters/MutationsInterpreter.h>
#include <Parsers/ASTAlterQuery.h>
#include <Common/typeid_cast.h>
@ -49,7 +50,7 @@ BlockIO InterpreterAlterQuery::execute()
if (!mutation_commands.empty())
{
mutation_commands.validate(*table, context);
MutationsInterpreter(table, mutation_commands, context).validate();
table->mutate(mutation_commands, context);
}

View File

@ -0,0 +1,424 @@
#include <Interpreters/MutationsInterpreter.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <DataStreams/FilterBlockInputStream.h>
#include <DataStreams/ExpressionBlockInputStream.h>
#include <DataStreams/CreatingSetsBlockInputStream.h>
#include <DataStreams/MaterializingBlockInputStream.h>
#include <DataStreams/NullBlockInputStream.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/formatAST.h>
#include <IO/WriteHelpers.h>
namespace DB
{
/// Error codes used in this file. Each of them is defined elsewhere; here they are only declared.
/// LOGICAL_ERROR is listed explicitly because this file throws it, so that it does not depend on
/// some other header happening to declare it.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
    extern const int UNKNOWN_MUTATION_COMMAND;
    extern const int NO_SUCH_COLUMN_IN_TABLE;
    extern const int CANNOT_UPDATE_COLUMN;
}
bool MutationsInterpreter::isStorageTouchedByMutations() const
{
if (commands.empty())
return false;
for (const MutationCommand & command : commands)
{
if (!command.predicate) /// The command touches all rows.
return true;
}
/// Execute `SELECT count() FROM storage WHERE predicate1 OR predicate2 OR ...` query.
/// The result can differ from tne number of affected rows (e.g. if there is an UPDATE command that
/// changes how many rows satisfy the predicates of the subsequent commands).
/// But we can be sure that if count = 0, then no rows will be touched.
auto select = std::make_shared<ASTSelectQuery>();
select->select_expression_list = std::make_shared<ASTExpressionList>();
select->children.push_back(select->select_expression_list);
auto count_func = std::make_shared<ASTFunction>();
count_func->name = "count";
count_func->arguments = std::make_shared<ASTExpressionList>();
select->select_expression_list->children.push_back(count_func);
if (commands.size() == 1)
select->where_expression = commands[0].predicate;
else
{
auto coalesced_predicates = std::make_shared<ASTFunction>();
coalesced_predicates->name = "or";
coalesced_predicates->arguments = std::make_shared<ASTExpressionList>();
coalesced_predicates->children.push_back(coalesced_predicates->arguments);
for (const MutationCommand & command : commands)
coalesced_predicates->arguments->children.push_back(command.predicate);
select->where_expression = std::move(coalesced_predicates);
}
select->children.push_back(select->where_expression);
auto context_copy = context;
context_copy.getSettingsRef().merge_tree_uniform_read_distribution = 0;
context_copy.getSettingsRef().max_threads = 1;
InterpreterSelectQuery interpreter_select(select, context_copy, storage, QueryProcessingStage::Complete);
BlockInputStreamPtr in = interpreter_select.execute().in;
Block block = in->read();
if (!block.rows())
return false;
else if (block.rows() != 1)
throw Exception("count() expression returned " + toString(block.rows()) + " rows, not 1",
ErrorCodes::LOGICAL_ERROR);
auto count = (*block.getByName("count()").column)[0].get<UInt64>();
return count != 0;
}
/// Collects the names of the columns that the partition, primary key and secondary sort expressions
/// of a (Replicated)MergeTree table require, plus the sign column if it is set.
/// For any other storage the result is empty.
static NameSet getKeyColumns(const StoragePtr & storage)
{
    const MergeTreeData * data = nullptr;

    if (auto plain = dynamic_cast<StorageMergeTree *>(storage.get()))
        data = &plain->getData();
    else if (auto replicated = dynamic_cast<StorageReplicatedMergeTree *>(storage.get()))
        data = &replicated->getData();

    if (!data)
        return {};

    NameSet result;

    /// Adds every column required by `expr`; an unset expression contributes nothing.
    auto add_required_columns = [&result](const auto & expr)
    {
        if (!expr)
            return;
        for (const String & name : expr->getRequiredColumns())
            result.insert(name);
    };

    add_required_columns(data->partition_expr);
    add_required_columns(data->getPrimaryExpression());
    /// We don't process sampling_expression separately because it must be among the primary key columns.
    add_required_columns(data->getSecondarySortExpression());

    const auto & sign_column = data->merging_params.sign_column;
    if (!sign_column.empty())
        result.insert(sign_column);

    return result;
}
static void validateUpdateColumns(
const StoragePtr & storage, const NameSet & updated_columns,
const std::unordered_map<String, Names> & column_to_affected_materialized)
{
NameSet key_columns = getKeyColumns(storage);
for (const String & column_name : updated_columns)
{
auto found = false;
for (const auto & col : storage->getColumns().ordinary)
{
if (col.name == column_name)
{
found = true;
break;
}
}
if (!found)
{
for (const auto & col : storage->getColumns().materialized)
{
if (col.name == column_name)
throw Exception("Cannot UPDATE materialized column `" + column_name + "`", ErrorCodes::CANNOT_UPDATE_COLUMN);
}
throw Exception("There is no column `" + column_name + "` in table", ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
}
if (key_columns.count(column_name))
throw Exception("Cannot UPDATE key column `" + column_name + "`", ErrorCodes::CANNOT_UPDATE_COLUMN);
auto materialized_it = column_to_affected_materialized.find(column_name);
if (materialized_it != column_to_affected_materialized.end())
{
for (const String & materialized : materialized_it->second)
{
if (key_columns.count(materialized))
throw Exception("Updated column `" + column_name + "` affects MATERIALIZED column `"
+ materialized + "`, which is a key column. Cannot UPDATE it.",
ErrorCodes::CANNOT_UPDATE_COLUMN);
}
}
}
}
/// Builds everything needed to run (or just validate) the mutation commands:
/// fills `stages` and creates `interpreter_select` for the first stage.
/// `dry_run` is forwarded to ExpressionAnalyzer::appendExpression (as its `only_types` argument)
/// and to the InterpreterSelectQuery constructor.
/// Throws LOGICAL_ERROR if called a second time or if there are no commands, UNKNOWN_MUTATION_COMMAND
/// for a command that is neither DELETE nor UPDATE, and whatever validateUpdateColumns throws.
void MutationsInterpreter::prepare(bool dry_run)
{
if (is_prepared)
throw Exception("MutationsInterpreter is already prepared. It is a bug.", ErrorCodes::LOGICAL_ERROR);
if (commands.empty())
throw Exception("Empty mutation commands list", ErrorCodes::LOGICAL_ERROR);
const ColumnsDescription & columns_desc = storage->getColumns();
NamesAndTypesList all_columns = columns_desc.getAllPhysical();
/// Names of all columns assigned by any UPDATE command.
NameSet updated_columns;
for (const MutationCommand & command : commands)
{
for (const auto & kv : command.column_to_update_expression)
updated_columns.insert(kv.first);
}
/// We need to know which columns affect which MATERIALIZED columns to recalculate them if dependencies
/// are updated.
std::unordered_map<String, Names> column_to_affected_materialized;
if (!updated_columns.empty())
{
for (const auto & kv : columns_desc.defaults)
{
const String & column = kv.first;
const ColumnDefault & col_default = kv.second;
if (col_default.kind == ColumnDefaultKind::Materialized)
{
/// The source columns required by the default expression are the dependencies of the MATERIALIZED column.
ExpressionAnalyzer analyzer(col_default.expression->clone(), context, nullptr, all_columns);
for (const String & dependency : analyzer.getRequiredSourceColumns())
{
if (updated_columns.count(dependency))
column_to_affected_materialized[dependency].push_back(column);
}
}
}
}
if (!updated_columns.empty())
validateUpdateColumns(storage, updated_columns, column_to_affected_materialized);
/// First, break a sequence of commands into stages.
stages.emplace_back(context);
for (const auto & command : commands)
{
/// A stage that already updates columns is finished: the next command starts a new stage.
if (!stages.back().column_to_updated.empty())
stages.emplace_back(context);
if (command.type == MutationCommand::DELETE)
{
/// Rows to keep are those that do NOT satisfy the DELETE predicate.
auto negated_predicate = makeASTFunction("not", command.predicate->clone());
stages.back().filters.push_back(negated_predicate);
}
else if (command.type == MutationCommand::UPDATE)
{
if (stages.size() == 1) /// First stage only supports filtering and can't update columns.
stages.emplace_back(context);
NameSet affected_materialized;
for (const auto & kv : command.column_to_update_expression)
{
const String & column = kv.first;
auto materialized_it = column_to_affected_materialized.find(column);
if (materialized_it != column_to_affected_materialized.end())
{
for (const String & mat_column : materialized_it->second)
affected_materialized.emplace(mat_column);
}
/// The new value is CAST(if(predicate, update_expr, column), '<type name of the column>'):
/// rows that do not match the predicate keep the old value.
const auto & update_expr = kv.second;
auto updated_column = makeASTFunction("CAST",
makeASTFunction("if",
command.predicate->clone(),
update_expr->clone(),
std::make_shared<ASTIdentifier>(column)),
std::make_shared<ASTLiteral>(columns_desc.getPhysical(column).type->getName()));
stages.back().column_to_updated.emplace(column, updated_column);
}
if (!affected_materialized.empty())
{
/// Add a separate stage that recalculates MATERIALIZED columns from their default expressions.
/// Note: every MATERIALIZED column of the table is added here, not only the affected ones.
stages.emplace_back(context);
for (const auto & column : columns_desc.materialized)
{
stages.back().column_to_updated.emplace(
column.name,
columns_desc.defaults.at(column.name).expression->clone());
}
}
}
else
throw Exception("Unknown mutation command type: " + DB::toString<int>(command.type), ErrorCodes::UNKNOWN_MUTATION_COMMAND);
}
/// Next, for each stage calculate columns changed by this and previous stages.
for (size_t i = 0; i < stages.size(); ++i)
{
/// A stage with filters outputs all columns.
if (!stages[i].filters.empty())
{
for (const auto & column : all_columns)
stages[i].output_columns.insert(column.name);
continue;
}
if (i > 0)
stages[i].output_columns = stages[i - 1].output_columns;
/// Nothing to add if the stage already outputs as many columns as there are in `all_columns`.
if (stages[i].output_columns.size() < all_columns.size())
{
for (const auto & kv : stages[i].column_to_updated)
stages[i].output_columns.insert(kv.first);
}
}
/// Now, calculate `expressions_chain` for each stage except the first.
/// Do it backwards to propagate information about columns required as input for a stage to the previous stage.
for (size_t i = stages.size() - 1; i > 0; --i)
{
auto & stage = stages[i];
ASTPtr all_asts = std::make_shared<ASTExpressionList>();
for (const auto & ast : stage.filters)
all_asts->children.push_back(ast);
for (const auto & kv : stage.column_to_updated)
all_asts->children.push_back(kv.second);
/// Add all output columns to prevent ExpressionAnalyzer from deleting them from source columns.
for (const String & column : stage.output_columns)
all_asts->children.push_back(std::make_shared<ASTIdentifier>(column));
stage.analyzer = std::make_unique<ExpressionAnalyzer>(all_asts, context, nullptr, all_columns);
ExpressionActionsChain & actions_chain = stage.expressions_chain;
/// One step per filter, in the same order as `filter_column_names`.
for (const auto & ast : stage.filters)
{
if (!actions_chain.steps.empty())
actions_chain.addStep();
stage.analyzer->appendExpression(actions_chain, ast, dry_run);
stage.filter_column_names.push_back(ast->getColumnName());
}
if (!stage.column_to_updated.empty())
{
if (!actions_chain.steps.empty())
actions_chain.addStep();
/// All update expressions are calculated first and only then copied over the columns they replace.
for (const auto & kv : stage.column_to_updated)
stage.analyzer->appendExpression(actions_chain, kv.second, dry_run);
for (const auto & kv : stage.column_to_updated)
{
actions_chain.getLastActions()->add(ExpressionAction::copyColumn(
kv.second->getColumnName(), kv.first, /* can_replace = */ true));
}
}
/// Remove all intermediate columns.
actions_chain.addStep();
actions_chain.getLastStep().required_output.assign(stage.output_columns.begin(), stage.output_columns.end());
actions_chain.finalize();
/// Propagate information about columns needed as input.
for (const auto & column : actions_chain.steps.front().actions->getRequiredColumnsWithTypes())
stages[i - 1].output_columns.insert(column.name);
}
/// Execute first stage as a SELECT statement.
auto select = std::make_shared<ASTSelectQuery>();
select->select_expression_list = std::make_shared<ASTExpressionList>();
select->children.push_back(select->select_expression_list);
for (const auto & column_name : stages[0].output_columns)
select->select_expression_list->children.push_back(std::make_shared<ASTIdentifier>(column_name));
if (!stages[0].filters.empty())
{
/// The filters of the first stage become the WHERE clause: a single filter is used as is,
/// several filters are combined with `and`.
ASTPtr where_expression;
if (stages[0].filters.size() == 1)
where_expression = stages[0].filters[0];
else
{
auto coalesced_predicates = std::make_shared<ASTFunction>();
coalesced_predicates->name = "and";
coalesced_predicates->arguments = std::make_shared<ASTExpressionList>();
coalesced_predicates->children.push_back(coalesced_predicates->arguments);
coalesced_predicates->arguments->children = stages[0].filters;
where_expression = std::move(coalesced_predicates);
}
select->where_expression = where_expression;
select->children.push_back(where_expression);
}
interpreter_select = std::make_unique<InterpreterSelectQuery>(select, context, storage, QueryProcessingStage::Complete, dry_run);
is_prepared = true;
}
/// Stacks the streams of all stages after the first one on top of `in`
/// and wraps the result into a MaterializingBlockInputStream.
BlockInputStreamPtr MutationsInterpreter::addStreamsForLaterStages(BlockInputStreamPtr in) const
{
    for (size_t stage_index = 1; stage_index < stages.size(); ++stage_index)
    {
        const Stage & current = stages[stage_index];
        const auto & steps = current.expressions_chain.steps;
        const auto & filter_names = current.filter_column_names;

        for (size_t step_index = 0; step_index < steps.size(); ++step_index)
        {
            const auto & actions = steps[step_index].actions;

            /// The leading steps (one per filter column name) execute DELETEs;
            /// the remaining ones execute UPDATE or the final projection.
            const bool is_filter_step = step_index < filter_names.size();

            if (is_filter_step)
                in = std::make_shared<FilterBlockInputStream>(in, actions, filter_names[step_index]);
            else
                in = std::make_shared<ExpressionBlockInputStream>(in, actions);
        }

        const SubqueriesForSets & sets = current.analyzer->getSubqueriesForSets();
        if (sets.empty())
            continue;

        const auto & settings = context.getSettingsRef();
        in = std::make_shared<CreatingSetsBlockInputStream>(in, sets,
            SizeLimits(settings.max_rows_to_transfer, settings.max_bytes_to_transfer, settings.transfer_overflow_mode));
    }

    return std::make_shared<MaterializingBlockInputStream>(in);
}
void MutationsInterpreter::validate()
{
prepare(/* dry_run = */ true);
Block first_stage_header = interpreter_select->getSampleBlock();
BlockInputStreamPtr in = std::make_shared<NullBlockInputStream>(first_stage_header);
addStreamsForLaterStages(in)->getHeader();
}
/// Prepares the stages for real execution, runs the first stage as a SELECT
/// and returns its stream with the streams of the later stages stacked on top.
BlockInputStreamPtr MutationsInterpreter::execute()
{
    prepare(/* dry_run = */ false);

    BlockInputStreamPtr first_stage_stream = interpreter_select->execute().in;

    return addStreamsForLaterStages(first_stage_stream);
}
}

View File

@ -0,0 +1,85 @@
#pragma once
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <DataStreams/IBlockInputStream.h>
#include <Storages/IStorage.h>
#include <Storages/MutationCommands.h>
namespace DB
{
class Context;
/// Create an input stream that will read data from storage and apply mutation commands (UPDATEs, DELETEs)
/// to this data.
class MutationsInterpreter
{
public:
/// Note: `context_` is stored by reference (see the `context` member below), not copied.
MutationsInterpreter(StoragePtr storage_, std::vector<MutationCommand> commands_, const Context & context_)
: storage(std::move(storage_))
, commands(std::move(commands_))
, context(context_)
{
}
/// Check that the commands can be executed without reading any data: prepares the stages in dry-run mode
/// and builds the stream pipeline over an empty source. Throws if something is wrong.
void validate();
/// Return false if the data isn't going to be changed by mutations.
bool isStorageTouchedByMutations() const;
/// The resulting stream will return blocks containing changed columns only.
BlockInputStreamPtr execute();
private:
/// Builds `stages` and `interpreter_select`. Must be called only once per object (it throws otherwise).
void prepare(bool dry_run);
/// Adds the streams of all stages except the first one on top of `in`.
BlockInputStreamPtr addStreamsForLaterStages(BlockInputStreamPtr in) const;
private:
StoragePtr storage;
std::vector<MutationCommand> commands;
const Context & context;
/// A sequence of mutation commands is executed as a sequence of stages. Each stage consists of several
/// filters, followed by updating values of some columns. Commands can reuse expressions calculated by the
/// previous commands in the same stage, but at the end of each stage intermediate columns are thrown away
/// (they may contain wrong values because the column values have been updated).
///
/// If an UPDATE command changes some columns that some MATERIALIZED columns depend on, a stage to
/// recalculate these columns is added.
///
/// Each stage has output_columns that contain columns that are changed at the end of that stage
/// plus columns needed for the next mutations.
///
/// First stage is special: it can contain only DELETEs and is executed using InterpreterSelectQuery
/// to take advantage of table indexes (if there are any).
struct Stage
{
Stage(const Context & context) : expressions_chain(context) {}
/// Predicates of the rows to keep (negated DELETE predicates).
ASTs filters;
/// For each updated column: the expression that calculates its new value.
std::unordered_map<String, ASTPtr> column_to_updated;
/// Contains columns that are changed by this stage,
/// columns changed by the previous stages and also columns needed by the next stages.
NameSet output_columns;
std::unique_ptr<ExpressionAnalyzer> analyzer;
/// A chain of actions needed to execute this stage.
/// First steps calculate filter columns for DELETEs (in the same order as in `filter_column_names`),
/// then there is (possibly) an UPDATE stage, and finally a projection stage.
ExpressionActionsChain expressions_chain;
Names filter_column_names;
};
/// Executes the first stage; created by prepare().
std::unique_ptr<InterpreterSelectQuery> interpreter_select;
std::vector<Stage> stages;
bool is_prepared = false; /// Has the sequence of stages been prepared.
};
}

View File

@ -133,6 +133,14 @@ void ASTAlterCommand::formatImpl(
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "DELETE WHERE " << (settings.hilite ? hilite_none : "");
predicate->formatImpl(settings, state, frame);
}
else if (type == ASTAlterCommand::UPDATE)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "UPDATE " << (settings.hilite ? hilite_none : "");
update_assignments->formatImpl(settings, state, frame);
settings.ostr << (settings.hilite ? hilite_keyword : "") << " WHERE " << (settings.hilite ? hilite_none : "");
predicate->formatImpl(settings, state, frame);
}
else
throw Exception("Unexpected type of ALTER", ErrorCodes::UNEXPECTED_AST_STRUCTURE);
}

View File

@ -33,6 +33,7 @@ public:
FREEZE_PARTITION,
DELETE,
UPDATE,
NO_TYPE,
};
@ -59,9 +60,12 @@ public:
*/
ASTPtr partition;
/// For DELETE WHERE: the predicate that filters the rows to delete.
/// For DELETE/UPDATE WHERE: the predicate that filters the rows to delete/update.
ASTPtr predicate;
/// A list of expressions of the form `column = expr` for the UPDATE command.
ASTPtr update_assignments;
bool detach = false; /// true for DETACH PARTITION
bool part = false; /// true for ATTACH PART

View File

@ -0,0 +1,44 @@
#pragma once
#include <Parsers/IAST.h>
namespace DB
{
/// A single `column = expr` element of the assignment list of ALTER UPDATE.
class ASTAssignment : public IAST
{
public:
    /// Name of the column being assigned.
    String column_name;

    /// Expression that produces the new value.
    ASTPtr expression;

    String getID() const override { return "Assignment_" + column_name; }

    /// Deep copy: the expression (if there is one) is cloned and becomes the only child of the copy.
    ASTPtr clone() const override
    {
        auto copy = std::make_shared<ASTAssignment>(*this);
        copy->children.clear();

        if (!expression)
            return copy;

        copy->expression = expression->clone();
        copy->children.push_back(copy->expression);
        return copy;
    }

protected:
    /// Writes `<column_name> = <expression>`, with highlighting sequences when `settings.hilite` is set.
    void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override
    {
        settings.ostr << (settings.hilite ? hilite_identifier : "");
        settings.writeIdentifier(column_name);

        settings.ostr << (settings.hilite ? hilite_none : "")
            << (settings.hilite ? hilite_operator : "") << " = " << (settings.hilite ? hilite_none : "");

        expression->formatImpl(settings, state, frame);
    }
};
}

View File

@ -21,8 +21,6 @@ public:
ASTPtr clone() const override
{
const auto res = std::make_shared<ASTColumnDeclaration>(*this);
ASTPtr ptr{res};
res->children.clear();
if (type)
@ -37,7 +35,7 @@ public:
res->children.push_back(res->default_expression);
}
return ptr;
return res;
}
protected:

View File

@ -11,11 +11,7 @@ void ASTIdentifier::formatImplWithoutAlias(const FormatSettings & settings, Form
auto format_element = [&](const String & elem_name)
{
settings.ostr << (settings.hilite ? hilite_identifier : "");
WriteBufferFromOStream wb(settings.ostr, 32);
settings.writeIdentifier(elem_name, wb);
wb.next();
settings.writeIdentifier(elem_name);
settings.ostr << (settings.hilite ? hilite_none : "");
};

View File

@ -9,11 +9,7 @@ namespace DB
/// Writes ` AS <name>` to settings.ostr.
/// The keyword and the alias are wrapped in highlighting sequences when settings.hilite is set.
void ASTWithAlias::writeAlias(const String & name, const FormatSettings & settings) const
{
    settings.ostr << (settings.hilite ? hilite_keyword : "") << " AS " << (settings.hilite ? hilite_alias : "");
    /// FormatSettings::writeIdentifier(name) creates and flushes its own buffer over `ostr`,
    /// so no local WriteBufferFromOStream is needed here and the name is written exactly once.
    settings.writeIdentifier(name);
    settings.ostr << (settings.hilite ? hilite_none : "");
}
@ -25,8 +21,7 @@ void ASTWithAlias::formatImpl(const FormatSettings & settings, FormatState & sta
/// If we have previously output this node elsewhere in the query, now it is enough to output only the alias.
if (!state.printed_asts_with_alias.emplace(frame.current_select, alias).second)
{
WriteBufferFromOStream wb(settings.ostr, 32);
settings.writeIdentifier(alias, wb);
settings.writeIdentifier(alias);
return;
}
}

View File

@ -99,8 +99,10 @@ String IAST::getColumnName() const
}
void IAST::FormatSettings::writeIdentifier(const String & name, WriteBuffer & out) const
void IAST::FormatSettings::writeIdentifier(const String & name) const
{
WriteBufferFromOStream out(ostr, 32);
switch (identifier_quoting_style)
{
case IdentifierQuotingStyle::None:
@ -128,6 +130,8 @@ void IAST::FormatSettings::writeIdentifier(const String & name, WriteBuffer & ou
break;
}
}
out.next();
}
}

View File

@ -164,7 +164,7 @@ public:
nl_or_ws = one_line ? ' ' : '\n';
}
void writeIdentifier(const String & name, WriteBuffer & out) const;
void writeIdentifier(const String & name) const;
};
/// State. For example, a set of nodes can be remembered, which we already walk through.

View File

@ -1,11 +1,13 @@
#include <Parsers/ParserAlterQuery.h>
#include <Parsers/CommonParsers.h>
#include <Parsers/ExpressionElementParsers.h>
#include <Parsers/ExpressionListParsers.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/ParserPartition.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTAlterQuery.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTAssignment.h>
#include <Parsers/parseDatabaseAndTableName.h>
@ -38,12 +40,17 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
ParserKeyword s_name("NAME");
ParserKeyword s_delete_where("DELETE WHERE");
ParserKeyword s_update("UPDATE");
ParserKeyword s_where("WHERE");
ParserCompoundIdentifier parser_name;
ParserStringLiteral parser_string_literal;
ParserCompoundColumnDeclaration parser_col_decl;
ParserPartition parser_partition;
ParserExpression exp_elem;
ParserExpression parser_exp_elem;
ParserList parser_assignment_list(
std::make_unique<ParserAssignment>(), std::make_unique<ParserToken>(TokenType::Comma),
/* allow_empty = */ false);
if (s_add_column.ignore(pos, expected))
{
@ -195,11 +202,24 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
}
else if (s_delete_where.ignore(pos, expected))
{
if (!exp_elem.parse(pos, command->predicate, expected))
if (!parser_exp_elem.parse(pos, command->predicate, expected))
return false;
command->type = ASTAlterCommand::DELETE;
}
else if (s_update.ignore(pos, expected))
{
if (!parser_assignment_list.parse(pos, command->update_assignments, expected))
return false;
if (!s_where.ignore(pos, expected))
return false;
if (!parser_exp_elem.parse(pos, command->predicate, expected))
return false;
command->type = ASTAlterCommand::UPDATE;
}
else
return false;
@ -213,6 +233,8 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
command->children.push_back(command->partition);
if (command->predicate)
command->children.push_back(command->predicate);
if (command->update_assignments)
command->children.push_back(command->update_assignments);
return true;
}
@ -240,6 +262,33 @@ bool ParserAlterCommandList::parseImpl(Pos & pos, ASTPtr & node, Expected & expe
}
/// Parses a single `column = expr` element and stores it in `node` as an ASTAssignment.
/// Returns false as soon as the identifier, the `=` token or the expression fails to parse.
bool ParserAssignment::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
    auto result = std::make_shared<ASTAssignment>();
    /// The output node is assigned up front, so it is set even when parsing fails below.
    node = result;

    ParserIdentifier identifier_parser;
    ParserToken equals_parser(TokenType::Equals);
    ParserExpression expression_parser;

    ASTPtr column_ast;

    /// Short-circuit evaluation keeps the steps strictly sequential: identifier, '=', expression.
    const bool parsed =
        identifier_parser.parse(pos, column_ast, expected)
        && equals_parser.ignore(pos, expected)
        && expression_parser.parse(pos, result->expression, expected);

    if (!parsed)
        return false;

    result->column_name = typeid_cast<const ASTIdentifier &>(*column_ast).name;

    if (result->expression)
        result->children.push_back(result->expression);

    return true;
}
bool ParserAlterQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
auto query = std::make_shared<ASTAlterQuery>();
@ -269,4 +318,5 @@ bool ParserAlterQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
return true;
}
}

View File

@ -17,12 +17,13 @@ namespace DB
* [FETCH PARTITION partition FROM ...]
* [FREEZE PARTITION]
* [DELETE WHERE ...]
* [UPDATE col_name = expr, ... WHERE ...]
*/
class ParserAlterCommand : public IParserBase
class ParserAlterQuery : public IParserBase
{
protected:
const char * getName() const { return "ALTER command"; }
const char * getName() const { return "ALTER query"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected);
};
@ -35,10 +36,19 @@ protected:
};
class ParserAlterQuery : public IParserBase
class ParserAlterCommand : public IParserBase
{
protected:
const char * getName() const { return "ALTER query"; }
const char * getName() const { return "ALTER command"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected);
};
/// Part of the UPDATE command of the form: col_name = expr
/// Used as the element parser of the comma-separated assignment list in ALTER ... UPDATE.
class ParserAssignment : public IParserBase
{
protected:
/// Name of the grammar element handled by this parser: "column assignment".
const char * getName() const { return "column assignment"; }
/// Parses one assignment starting at `pos`; the result is returned through `node`.
/// Only declared here, the definition is not part of this header.
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected);
};

View File

@ -78,6 +78,7 @@ namespace ErrorCodes
extern const int PART_IS_TEMPORARILY_LOCKED;
extern const int TOO_MANY_PARTS;
extern const int INCOMPATIBLE_COLUMNS;
extern const int CANNOT_UPDATE_COLUMN;
}

View File

@ -19,9 +19,7 @@
#include <DataStreams/MaterializingBlockInputStream.h>
#include <DataStreams/ConcatBlockInputStream.h>
#include <DataStreams/ColumnGathererStream.h>
#include <DataStreams/ApplyingMutationsBlockInputStream.h>
#include <Parsers/ASTIdentifier.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/MutationsInterpreter.h>
#include <IO/CompressedWriteBuffer.h>
#include <IO/CompressedReadBufferFromFile.h>
#include <DataTypes/NestedUtils.h>
@ -30,8 +28,10 @@
#include <Common/interpolate.h>
#include <Common/typeid_cast.h>
#include <Common/localBackup.h>
#include <Common/createHardLink.h>
#include <Poco/File.h>
#include <Poco/DirectoryIterator.h>
#include <cmath>
#include <numeric>
@ -851,133 +851,21 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
}
static bool isStorageTouchedByMutation(
const StoragePtr & storage, const std::vector<MutationCommand> & commands, const Context & context)
{
if (commands.empty())
return false;
for (const MutationCommand & command : commands)
{
if (!command.predicate) /// The command touches all rows.
return true;
}
/// Execute `SELECT count() FROM storage WHERE predicate1 OR predicate2 OR ...` query.
/// The result is tne number of affected rows.
auto select = std::make_shared<ASTSelectQuery>();
select->select_expression_list = std::make_shared<ASTExpressionList>();
select->children.push_back(select->select_expression_list);
auto count_func = std::make_shared<ASTFunction>();
count_func->name = "count";
count_func->arguments = std::make_shared<ASTExpressionList>();
select->select_expression_list->children.push_back(count_func);
if (commands.size() == 1)
select->where_expression = commands[0].predicate;
else
{
auto coalesced_predicates = std::make_shared<ASTFunction>();
coalesced_predicates->name = "or";
coalesced_predicates->arguments = std::make_shared<ASTExpressionList>();
coalesced_predicates->children.push_back(coalesced_predicates->arguments);
for (const MutationCommand & command : commands)
coalesced_predicates->arguments->children.push_back(command.predicate);
select->where_expression = std::move(coalesced_predicates);
}
select->children.push_back(select->where_expression);
auto context_copy = context;
context_copy.getSettingsRef().merge_tree_uniform_read_distribution = 0;
context_copy.getSettingsRef().max_threads = 1;
InterpreterSelectQuery interpreter_select(select, context_copy, storage, QueryProcessingStage::Complete);
BlockInputStreamPtr in = interpreter_select.execute().in;
Block block = in->read();
if (!block.rows())
return false;
else if (block.rows() != 1)
throw Exception("count() expression returned " + toString(block.rows()) + " rows, not 1",
ErrorCodes::LOGICAL_ERROR);
auto count = (*block.getByName("count()").column)[0].get<UInt64>();
return count != 0;
}
/// Creates a stream over all physical columns of `storage` with `commands` applied.
/// The leading run of DELETE commands is expressed as the WHERE clause of the SELECT
/// (each predicate negated, all of them AND-ed); the commands that remain after it, if any,
/// are applied on top of the stream by ApplyingMutationsBlockInputStream.
static BlockInputStreamPtr createInputStreamWithMutatedData(
    const StoragePtr & storage, std::vector<MutationCommand> commands, const Context & context)
{
    auto select_query = std::make_shared<ASTSelectQuery>();

    select_query->select_expression_list = std::make_shared<ASTExpressionList>();
    select_query->children.push_back(select_query->select_expression_list);
    for (const auto & column : storage->getColumns().getAllPhysical())
        select_query->select_expression_list->children.push_back(std::make_shared<ASTIdentifier>(column.name));

    /// DELETE commands at the front of the list are pushed down to the SELECT statement
    /// and removed from `commands`.
    auto first_non_delete = commands.begin();
    while (first_non_delete != commands.end() && first_non_delete->type == MutationCommand::DELETE)
        ++first_non_delete;

    std::vector<ASTPtr> negated_predicates;
    for (auto it = commands.begin(); it != first_non_delete; ++it)
    {
        /// A row survives a DELETE when its predicate does not hold.
        auto negation = std::make_shared<ASTFunction>();
        negation->name = "not";
        negation->arguments = std::make_shared<ASTExpressionList>();
        negation->arguments->children.push_back(it->predicate);
        negation->children.push_back(negation->arguments);
        negated_predicates.push_back(negation);
    }

    commands.erase(commands.begin(), first_non_delete);

    if (!negated_predicates.empty())
    {
        ASTPtr where_expression;
        if (negated_predicates.size() > 1)
        {
            auto conjunction = std::make_shared<ASTFunction>();
            conjunction->name = "and";
            conjunction->arguments = std::make_shared<ASTExpressionList>();
            conjunction->children.push_back(conjunction->arguments);
            conjunction->arguments->children = negated_predicates;
            where_expression = std::move(conjunction);
        }
        else
            where_expression = negated_predicates[0];

        select_query->where_expression = where_expression;
        select_query->children.push_back(where_expression);
    }

    InterpreterSelectQuery interpreter(select_query, context, storage);
    BlockInputStreamPtr stream = interpreter.execute().in;

    /// Whatever was not pushed down is applied to the blocks read from the SELECT.
    if (!commands.empty())
        stream = std::make_shared<ApplyingMutationsBlockInputStream>(stream, commands, context);

    return stream;
}
MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTemporaryPart(
const FuturePart & future_part,
const std::vector<MutationCommand> & commands,
const Context & context)
{
auto check_not_cancelled = [&]()
{
if (actions_blocker.isCancelled())
throw Exception("Cancelled mutating parts", ErrorCodes::ABORTED);
return true;
};
check_not_cancelled();
if (future_part.parts.size() != 1)
throw Exception("Trying to mutate " + toString(future_part.parts.size()) + " parts, not one. "
"This is a bug.", ErrorCodes::LOGICAL_ERROR);
@ -991,7 +879,9 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
context_for_reading.getSettingsRef().merge_tree_uniform_read_distribution = 0;
context_for_reading.getSettingsRef().max_threads = 1;
if (!isStorageTouchedByMutation(storage_from_source_part, commands, context_for_reading))
MutationsInterpreter mutations_interpreter(storage_from_source_part, commands, context_for_reading);
if (!mutations_interpreter.isStorageTouchedByMutations())
{
LOG_TRACE(log, "Part " << source_part->name << " doesn't change up to mutation version " << future_part.part_info.mutation);
return data.cloneAndLoadDataPart(source_part, "tmp_clone_", future_part.part_info);
@ -1015,38 +905,105 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
source_part->bytes_on_disk,
static_cast<double>(source_part->bytes_on_disk) / data.getTotalActiveSizeInBytes());
auto in = createInputStreamWithMutatedData(storage_from_source_part, commands, context_for_reading);
Poco::File(new_part_tmp_path).createDirectories();
auto in = mutations_interpreter.execute();
NamesAndTypesList all_columns = data.getColumns().getAllPhysical();
if (in->getHeader().columns() == all_columns.size())
{
/// All columns are modified, proceed to write a new part from scratch.
if (data.hasPrimaryKey())
in = std::make_shared<MaterializingBlockInputStream>(
std::make_shared<ExpressionBlockInputStream>(in, data.getPrimaryExpression()));
Poco::File(new_part_tmp_path).createDirectories();
NamesAndTypesList all_columns = data.getColumns().getAllPhysical();
MergeTreeDataPart::MinMaxIndex minmax_idx;
MergedBlockOutputStream out(data, new_part_tmp_path, all_columns, compression_settings);
MergeTreeDataPart::MinMaxIndex minmax_idx;
in->readPrefix();
out.writePrefix();
Block block;
while (!actions_blocker.isCancelled() && (block = in->read()))
while (check_not_cancelled() && (block = in->read()))
{
minmax_idx.update(block, data.minmax_idx_columns);
out.write(block);
}
if (actions_blocker.isCancelled())
throw Exception("Cancelled mutating parts", ErrorCodes::ABORTED);
new_data_part->partition.assign(source_part->partition);
new_data_part->minmax_idx = std::move(minmax_idx);
in->readSuffix();
out.writeSuffixAndFinalizePart(new_data_part);
}
else
{
/// We will modify only some of the columns. Other columns and key values can be copied as-is.
/// TODO: check that we modify only non-key columns in this case.
NameSet files_to_skip = {"checksums.txt", "columns.txt"};
for (const auto & entry : in->getHeader())
{
IDataType::StreamCallback callback = [&](const IDataType::SubstreamPath & substream_path)
{
String stream_name = IDataType::getFileNameForStream(entry.name, substream_path);
files_to_skip.insert(stream_name + ".bin");
files_to_skip.insert(stream_name + ".mrk");
};
IDataType::SubstreamPath stream_path;
entry.type->enumerateStreams(callback, stream_path);
}
Poco::DirectoryIterator dir_end;
for (Poco::DirectoryIterator dir_it(source_part->getFullPath()); dir_it != dir_end; ++dir_it)
{
if (files_to_skip.count(dir_it.name()))
continue;
Poco::Path destination(new_part_tmp_path);
destination.append(dir_it.name());
createHardLink(dir_it.path().toString(), destination.toString());
}
MergedColumnOnlyOutputStream out(data, in->getHeader(), new_part_tmp_path, /* sync = */ false, compression_settings, /* skip_offsets = */ false);
in->readPrefix();
out.writePrefix();
Block block;
while (check_not_cancelled() && (block = in->read()))
out.write(block);
in->readSuffix();
auto changed_checksums = out.writeSuffixAndGetChecksums();
new_data_part->checksums = source_part->checksums;
new_data_part->checksums.add(std::move(changed_checksums));
{
/// Write file with checksums.
WriteBufferFromFile out(new_part_tmp_path + "checksums.txt", 4096);
new_data_part->checksums.write(out);
}
new_data_part->columns = all_columns;
{
/// Write a file with a description of columns.
WriteBufferFromFile out(new_part_tmp_path + "columns.txt", 4096);
all_columns.writeText(out);
}
new_data_part->rows_count = source_part->rows_count;
new_data_part->marks_count = source_part->marks_count;
new_data_part->index = source_part->index;
new_data_part->partition.assign(source_part->partition);
new_data_part->minmax_idx = source_part->minmax_idx;
new_data_part->modification_time = time(nullptr);
new_data_part->bytes_on_disk = MergeTreeData::DataPart::calculateTotalSizeOnDisk(new_data_part->getFullPath());
}
return new_data_part;
}

View File

@ -30,6 +30,13 @@ public:
{part}, column_names, query_info, context, max_block_size, num_streams, 0);
}
bool supportsIndexForIn() const override { return true; }
bool mayBenefitFromIndexForIn(const ASTPtr & left_in_operand) const override
{
return part->storage.mayBenefitFromIndexForIn(left_in_operand);
}
protected:
StorageFromMergeTreeDataPart(const MergeTreeData::DataPartPtr & part_)
: IStorage(part_->storage.getColumns()), part(part_)

View File

@ -1,12 +1,11 @@
#include <Storages/MutationCommands.h>
#include <Storages/IStorage.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Columns/FilterDescription.h>
#include <IO/Operators.h>
#include <Parsers/formatAST.h>
#include <Parsers/ExpressionListParsers.h>
#include <Parsers/ParserAlterQuery.h>
#include <Parsers/parseQuery.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTAssignment.h>
#include <Common/typeid_cast.h>
@ -16,6 +15,7 @@ namespace DB
namespace ErrorCodes
{
extern const int UNKNOWN_MUTATION_COMMAND;
extern const int MULTIPLE_ASSIGNMENTS_TO_COLUMN;
}
std::optional<MutationCommand> MutationCommand::parse(ASTAlterCommand * command)
@ -28,6 +28,22 @@ std::optional<MutationCommand> MutationCommand::parse(ASTAlterCommand * command)
res.predicate = command->predicate;
return res;
}
else if (command->type == ASTAlterCommand::UPDATE)
{
MutationCommand res;
res.ast = command->ptr();
res.type = UPDATE;
res.predicate = command->predicate;
for (const ASTPtr & assignment_ast : command->update_assignments->children)
{
const auto & assignment = typeid_cast<const ASTAssignment &>(*assignment_ast);
auto insertion = res.column_to_update_expression.emplace(assignment.column_name, assignment.expression);
if (!insertion.second)
throw Exception("Multiple assignments in the single statement to column `" + assignment.column_name + "`",
ErrorCodes::MULTIPLE_ASSIGNMENTS_TO_COLUMN);
}
return res;
}
else
return {};
}
@ -41,33 +57,6 @@ std::shared_ptr<ASTAlterCommandList> MutationCommands::ast() const
return res;
}
/// Checks every command of the list against `table`.
/// Only DELETE commands are accepted; any other command type results in a LOGICAL_ERROR exception.
void MutationCommands::validate(const IStorage & table, const Context & context) const
{
    auto all_columns = table.getColumns().getAll();

    for (const MutationCommand & command : *this)
    {
        if (command.type != MutationCommand::DELETE)
            throw Exception("Bad mutation type: " + toString<int>(command.type), ErrorCodes::LOGICAL_ERROR);

        auto actions = ExpressionAnalyzer(command.predicate, context, {}, all_columns).getActions(true);

        /// Try executing the resulting actions on the table sample block to detect malformed queries.
        auto sample_block = table.getSampleBlock();
        actions->execute(sample_block);

        /// The column computed for the predicate has to be usable as a filter.
        const ColumnWithTypeAndName & predicate_column = actions->getSampleBlock().getByName(
            command.predicate->getColumnName());
        checkColumnCanBeUsedAsFilter(predicate_column);
    }
}
void MutationCommands::writeText(WriteBuffer & out) const
{
std::stringstream commands_ss;

View File

@ -2,6 +2,7 @@
#include <Parsers/ASTAlterQuery.h>
#include <optional>
#include <unordered_map>
namespace DB
@ -20,12 +21,15 @@ struct MutationCommand
{
EMPTY, /// Not used.
DELETE,
UPDATE,
};
Type type = EMPTY;
ASTPtr predicate;
std::unordered_map<String, ASTPtr> column_to_update_expression;
static std::optional<MutationCommand> parse(ASTAlterCommand * command);
};
@ -34,8 +38,6 @@ class MutationCommands : public std::vector<MutationCommand>
public:
std::shared_ptr<ASTAlterCommandList> ast() const;
void validate(const IStorage & table, const Context & context) const;
void writeText(WriteBuffer & out) const;
void readText(ReadBuffer & in);
};

View File

@ -305,15 +305,17 @@ struct CurrentlyMergingPartsTagger
/// Registers the mutation described by `commands`.
/// Under `currently_merging_mutex` the entry is committed with a new version taken from `increment`
/// and stored in `current_mutations_by_version`; after that the mutation file name is logged
/// and the background task is woken up.
void StorageMergeTree::mutate(const MutationCommands & commands, const Context &)
{
    MergeTreeMutationEntry entry(commands, full_path, data.insert_increment.get());
    String file_name;
    {
        std::lock_guard lock(currently_merging_mutex);

        Int64 version = increment.get();
        entry.commit(version);
        /// Save the name now: `entry` is moved into the map on the next line
        /// and must not be read afterwards.
        file_name = entry.file_name;
        current_mutations_by_version.emplace(version, std::move(entry));
    }

    LOG_INFO(log, "Added mutation: " << file_name);
    background_task_handle->wake();
}

View File

@ -4166,7 +4166,6 @@ void StorageReplicatedMergeTree::mutate(const MutationCommands & commands, const
///
/// After all needed parts are mutated (i.e. all active parts have the mutation version greater than
/// the version of this mutation), the mutation is considered done and can be deleted.
/// TODO: add a way to track the progress of mutations and a process to clean old mutations.
ReplicatedMergeTreeMutationEntry entry;
entry.source_replica = replica_name;

View File

@ -43,6 +43,8 @@ ${CLICKHOUSE_CLIENT} --query="SELECT d, x, s, m FROM test.mutations ORDER BY d,
${CLICKHOUSE_CLIENT} --query="SELECT mutation_id, command, block_numbers.partition_id, block_numbers.number, parts_to_do, is_done \
FROM system.mutations WHERE table = 'mutations' ORDER BY mutation_id"
${CLICKHOUSE_CLIENT} --query="DROP TABLE test.mutations"
${CLICKHOUSE_CLIENT} --query="SELECT '*** Test mutations cleaner ***'"
@ -69,5 +71,4 @@ sleep 0.1
# Check that the first mutation is cleaned
${CLICKHOUSE_CLIENT} --query="SELECT mutation_id, command, is_done FROM system.mutations WHERE table = 'mutations_cleaner' ORDER BY mutation_id"
${CLICKHOUSE_CLIENT} --query="DROP TABLE test.mutations"
${CLICKHOUSE_CLIENT} --query="DROP TABLE test.mutations_cleaner"

View File

@ -0,0 +1,24 @@
*** Test expected failures ***
Updating partition key should fail
Updating primary key should fail
Updating MATERIALIZED column should fail
Updating with non-UInt8 predicate should fail
*** Test updating according to a predicate ***
2000-01-01 123 aaa 101
2000-01-01 234 cde 2
*** Test several UPDATE commands with common subexpressions ***
2000-01-01 123 abc 26
*** Test predicates with IN operator ***
2000-01-01 234 cdeccc 20
2000-01-01 345 fghccc 30
2000-01-01 456 iii 40
*** Test UPDATE of columns that DELETE depends on ***
2000-01-01 234 cde 30
*** Test complex mixture of UPDATEs and DELETEs ***
2000-01-01 456 ijk_materialized_47 40
*** Test updating columns that MATERIALIZED columns depend on ***
Updating column that MATERIALIZED key column depends on should fail
17 materialized_24
27 materialized_34
30 materialized_37
40 materialized_47

View File

@ -0,0 +1,134 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
. $CURDIR/mergetree_mutations.lib
${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS test.alter_update"
${CLICKHOUSE_CLIENT} --query="CREATE TABLE test.alter_update \
(d Date, key UInt32, value1 String, value2 UInt64, materialized_value String MATERIALIZED concat('materialized_', toString(value2 + 7))) \
ENGINE MergeTree ORDER BY key PARTITION BY toYYYYMM(d)"
${CLICKHOUSE_CLIENT} --query="SELECT '*** Test expected failures ***'"
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.alter_update UPDATE d = today() WHERE 1" 2>/dev/null || echo "Updating partition key should fail"
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.alter_update UPDATE key = 1 WHERE 1" 2>/dev/null || echo "Updating primary key should fail"
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.alter_update UPDATE materialized_value = 'aaa' WHERE 1" 2>/dev/null || echo "Updating MATERIALIZED column should fail"
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.alter_update UPDATE value1 = 'aaa' WHERE 'string'" 2>/dev/null || echo "Updating with non-UInt8 predicate should fail"
${CLICKHOUSE_CLIENT} --query="SELECT '*** Test updating according to a predicate ***'"
${CLICKHOUSE_CLIENT} --query="INSERT INTO test.alter_update VALUES \
('2000-01-01', 123, 'abc', 1), \
('2000-01-01', 234, 'cde', 2)"
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.alter_update UPDATE value1 = 'aaa', value2 = value2 + 100 WHERE key < 200"
wait_for_mutation "alter_update" "mutation_2.txt"
${CLICKHOUSE_CLIENT} --query="SELECT * FROM test.alter_update ORDER BY key"
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.alter_update DROP PARTITION 200001"
${CLICKHOUSE_CLIENT} --query="SELECT '*** Test several UPDATE commands with common subexpressions ***'"
${CLICKHOUSE_CLIENT} --query="INSERT INTO test.alter_update VALUES ('2000-01-01', 123, 'abc', 49)"
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.alter_update \
UPDATE value2 = (value2 + 1) / 2 WHERE 1, \
UPDATE value2 = value2 + 1 WHERE 1"
wait_for_mutation "alter_update" "mutation_4.txt"
${CLICKHOUSE_CLIENT} --query="SELECT * FROM test.alter_update ORDER BY key"
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.alter_update DROP PARTITION 200001"
${CLICKHOUSE_CLIENT} --query="SELECT '*** Test predicates with IN operator ***'"
${CLICKHOUSE_CLIENT} --query="INSERT INTO test.alter_update VALUES \
('2000-01-01', 123, 'abc', 10), \
('2000-01-01', 234, 'cde', 20), \
('2000-01-01', 345, 'fgh', 30), \
('2000-01-01', 456, 'ijk', 40)"
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.alter_update \
DELETE WHERE key IN (SELECT toUInt32(arrayJoin([121, 122, 123]))), \
UPDATE value1 = concat(value1, 'ccc') WHERE value2 IN (20, 30), \
UPDATE value1 = 'iii' WHERE value2 IN (SELECT toUInt64(40))"
wait_for_mutation "alter_update" "mutation_6.txt"
${CLICKHOUSE_CLIENT} --query="SELECT * FROM test.alter_update ORDER BY key"
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.alter_update DROP PARTITION 200001"
${CLICKHOUSE_CLIENT} --query="SELECT '*** Test UPDATE of columns that DELETE depends on ***'"
${CLICKHOUSE_CLIENT} --query="INSERT INTO test.alter_update VALUES \
('2000-01-01', 123, 'abc', 10), \
('2000-01-01', 234, 'cde', 20)"
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.alter_update \
UPDATE value2 = value2 + 10 WHERE 1, \
DELETE WHERE value2 = 20"
wait_for_mutation "alter_update" "mutation_8.txt"
${CLICKHOUSE_CLIENT} --query="SELECT * FROM test.alter_update ORDER BY key"
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.alter_update DROP PARTITION 200001"
${CLICKHOUSE_CLIENT} --query="SELECT '*** Test complex mixture of UPDATEs and DELETEs ***'"
${CLICKHOUSE_CLIENT} --query="INSERT INTO test.alter_update VALUES \
('2000-01-01', 123, 'abc', 10), \
('2000-01-01', 234, 'cde', 20), \
('2000-01-01', 345, 'fgh', 30), \
('2000-01-01', 456, 'ijk', 40)"
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.alter_update \
DELETE WHERE value2 IN (8, 9, 10), \
UPDATE value2 = value2 + 10 WHERE value2 <= 10, \
DELETE WHERE length(value1) + value2 = 23, \
DELETE WHERE materialized_value = 'materialized_37', \
UPDATE value1 = concat(value1, '_', materialized_value) WHERE key = 456"
wait_for_mutation "alter_update" "mutation_10.txt"
${CLICKHOUSE_CLIENT} --query="SELECT * FROM test.alter_update ORDER BY key"
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.alter_update DROP PARTITION 200001"
${CLICKHOUSE_CLIENT} --query="SELECT '*** Test updating columns that MATERIALIZED columns depend on ***'"
${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS test.materialized_key"
${CLICKHOUSE_CLIENT} --query="CREATE TABLE test.materialized_key \
(key UInt32 MATERIALIZED value + 1, value UInt32) \
ENGINE MergeTree ORDER BY key"
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.materialized_key UPDATE value = 1 WHERE 1" 2>/dev/null || echo "Updating column that MATERIALIZED key column depends on should fail"
${CLICKHOUSE_CLIENT} --query="DROP TABLE test.materialized_key"
${CLICKHOUSE_CLIENT} --query="INSERT INTO test.alter_update VALUES \
('2000-01-01', 123, 'abc', 10), \
('2000-01-01', 234, 'cde', 20), \
('2000-01-01', 345, 'fgh', 30), \
('2000-01-01', 456, 'ijk', 40)"
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.alter_update \
UPDATE value2 = value2 + 7 WHERE value2 <= 20"
wait_for_mutation "alter_update" "mutation_12.txt"
${CLICKHOUSE_CLIENT} --query="SELECT value2, materialized_value FROM test.alter_update ORDER BY key"
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.alter_update DROP PARTITION 200001"
${CLICKHOUSE_CLIENT} --query="DROP TABLE test.alter_update"

View File

@ -8,7 +8,7 @@ function wait_for_mutation()
for i in {1..100}
do
sleep 0.1
if [[ $(${CLICKHOUSE_CLIENT} --query="SELECT is_done FROM system.mutations WHERE table='$table' AND mutation_id='$mutation_id'") -eq 1 ]]; then
if [[ $(${CLICKHOUSE_CLIENT} --query="SELECT min(is_done) FROM system.mutations WHERE table='$table' AND mutation_id='$mutation_id'") -eq 1 ]]; then
break
fi

View File

@ -230,13 +230,19 @@ The functionality is in beta stage and is available starting with the 1.1.54388
Existing tables are ready for mutations as-is (no conversion necessary), but after the first mutation is applied to a table, its metadata format becomes incompatible with previous server versions and falling back to a previous version becomes impossible.
At the moment the `ALTER DELETE` command is available:
Currently available commands:
```sql
ALTER TABLE [db.]table DELETE WHERE expr
ALTER TABLE [db.]table DELETE WHERE filter_expr
```
The expression `expr` must be of UInt8 type. The query deletes rows for which this expression evaluates to a non-zero value.
The `filter_expr` must be of type UInt8. The query deletes rows in the table for which this expression takes a non-zero value.
```sql
ALTER TABLE [db.]table UPDATE column1 = expr1 [, ...] WHERE filter_expr
```
The `filter_expr` must be of type UInt8. This query updates values of specified columns to the values of corresponding expressions in rows for which the `filter_expr` takes a non-zero value. Values are cast to the column type using the `CAST` operator. Updating columns that are used in the calculation of the primary or the partition key is not supported.
One query can contain several commands separated by commas.

View File

@ -229,13 +229,19 @@ ALTER TABLE [db.]table FETCH PARTITION 'name' FROM 'path-in-zookeeper'
Конвертировать существующие таблицы для работы с мутациями не нужно. Но после применения первой мутации формат данных таблицы становится несовместимым с предыдущими версиями и откатиться на предыдущую версию уже не получится.
На данный момент доступна команда `ALTER DELETE`:
На данный момент доступны команды:
```sql
ALTER TABLE [db.]table DELETE WHERE expr
ALTER TABLE [db.]table DELETE WHERE filter_expr
```
Выражение `expr` должно иметь тип UInt8. Запрос удаляет строки таблицы, для которых это выражение принимает ненулевое значение.
Выражение `filter_expr` должно иметь тип UInt8. Запрос удаляет строки таблицы, для которых это выражение принимает ненулевое значение.
```sql
ALTER TABLE [db.]table UPDATE column1 = expr1 [, ...] WHERE filter_expr
```
Выражение `filter_expr` должно иметь тип UInt8. Запрос изменяет значение указанных столбцов на вычисленное значение соответствующих выражений в каждой строке, для которой `filter_expr` принимает ненулевое значение. Вычисленные значения преобразуются к типу столбца с помощью оператора `CAST`. Изменение столбцов, которые используются при вычислении первичного ключа или ключа партиционирования, не поддерживается.
В одном запросе можно указать несколько команд через запятую.