execute ALTER UPDATE [#CLICKHOUSE-13]

This commit is contained in:
Alexey Zatelepin 2018-08-09 22:17:55 +03:00
parent 8e7c58dd75
commit dc2a4c21e9
6 changed files with 123 additions and 65 deletions

View File

@ -1,6 +1,9 @@
#include <DataStreams/ApplyingMutationsBlockInputStream.h>
#include <DataStreams/FilterBlockInputStream.h>
#include <DataStreams/ExpressionBlockInputStream.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <IO/WriteHelpers.h>
@ -9,60 +12,54 @@ namespace DB
{
ApplyingMutationsBlockInputStream::ApplyingMutationsBlockInputStream(
const BlockInputStreamPtr & input, const std::vector<MutationCommand> & commands, const Context & context)
const BlockInputStreamPtr & input, const MutationCommand & command, const Context & context)
{
children.push_back(input);
if (commands.empty())
switch (command.type)
{
impl = input;
return;
case MutationCommand::DELETE:
{
auto negated_predicate = makeASTFunction("not", command.predicate);
auto predicate_expr = ExpressionAnalyzer(
negated_predicate, context, nullptr, input->getHeader().getNamesAndTypesList())
.getActions(false);
String col_name = negated_predicate->getColumnName();
impl = std::make_shared<FilterBlockInputStream>(input, predicate_expr, col_name);
break;
}
/// Create a total predicate for all mutations and then pass it to a single FilterBlockInputStream
/// because ExpressionAnalyzer won't detect that some columns in the block are already calculated
/// and will try to calculate them twice. This works as long as all mutations are DELETE.
/// TODO: fix ExpressionAnalyzer.
std::vector<ASTPtr> predicates;
for (const MutationCommand & cmd : commands)
case MutationCommand::UPDATE:
{
switch (cmd.type)
auto new_column_exprs = std::make_shared<ASTExpressionList>();
std::unordered_map<String, ASTPtr> column_to_updated;
for (const auto & pair : command.column_to_update_expression)
{
case MutationCommand::DELETE:
{
auto predicate = std::make_shared<ASTFunction>();
predicate->name = "not";
predicate->arguments = std::make_shared<ASTExpressionList>();
predicate->arguments->children.push_back(cmd.predicate);
predicate->children.push_back(predicate->arguments);
predicates.push_back(predicate);
break;
auto new_col = makeASTFunction("CAST",
makeASTFunction("if",
command.predicate,
pair.second->clone(),
std::make_shared<ASTIdentifier>(pair.first)),
std::make_shared<ASTLiteral>(input->getHeader().getByName(pair.first).type->getName()));
new_column_exprs->children.push_back(new_col);
column_to_updated.emplace(pair.first, new_col);
}
default:
throw Exception("Unsupported mutation cmd type: " + toString<int>(cmd.type),
ErrorCodes::LOGICAL_ERROR);
}
}
ASTPtr total_predicate;
if (predicates.size() == 1)
total_predicate = predicates[0];
else
{
auto and_func = std::make_shared<ASTFunction>();
and_func->name = "and";
and_func->arguments = std::make_shared<ASTExpressionList>();
and_func->children.push_back(and_func->arguments);
and_func->arguments->children = predicates;
total_predicate = and_func;
}
auto updating_expr = ExpressionAnalyzer(
new_column_exprs, context, nullptr, input->getHeader().getNamesAndTypesList())
.getActions(false);
auto predicate_expr = ExpressionAnalyzer(
total_predicate, context, nullptr, input->getHeader().getNamesAndTypesList()).getActions(false);
String col_name = total_predicate->getColumnName();
impl = std::make_shared<FilterBlockInputStream>(input, predicate_expr, col_name);
/// Calling getColumnName() for updating expressions after the ExpressionAnalyzer pass, because
/// it can change the AST of the expressions.
for (const auto & pair : column_to_updated)
updating_expr->add(ExpressionAction::copyColumn(pair.second->getColumnName(), pair.first, /* can_replace = */ true));
impl = std::make_shared<ExpressionBlockInputStream>(input, updating_expr);
break;
}
default:
throw Exception("Unsupported mutation command type: " + toString<int>(command.type),
ErrorCodes::LOGICAL_ERROR);
}
}
Block ApplyingMutationsBlockInputStream::getHeader() const

View File

@ -7,14 +7,14 @@
namespace DB
{
/// A stream that pulls the blocks from `input` and executes mutation commands on these blocks
/// A stream that pulls the blocks from `input` and executes a mutation command on these blocks
/// (see Storages/MutationCommands.h for a full list).
/// Example: if mutation commands contain ALTER DELETE, this stream will delete rows that satisfy the predicate specified in the command.
class ApplyingMutationsBlockInputStream : public IProfilingBlockInputStream
{
public:
ApplyingMutationsBlockInputStream(
const BlockInputStreamPtr & input, const std::vector<MutationCommand> & commands, const Context & context);
const BlockInputStreamPtr & input, const MutationCommand & command, const Context & context);
String getName() const override { return "ApplyingMutations"; }

View File

@ -106,12 +106,13 @@ ExpressionAction ExpressionAction::removeColumn(const std::string & removed_name
return a;
}
ExpressionAction ExpressionAction::copyColumn(const std::string & from_name, const std::string & to_name)
ExpressionAction ExpressionAction::copyColumn(const std::string & from_name, const std::string & to_name, bool can_replace)
{
ExpressionAction a;
a.type = COPY_COLUMN;
a.source_name = from_name;
a.result_name = to_name;
a.can_replace = can_replace;
return a;
}
@ -284,8 +285,23 @@ void ExpressionAction::prepare(Block & sample_block)
case COPY_COLUMN:
{
result_type = sample_block.getByName(source_name).type;
sample_block.insert(ColumnWithTypeAndName(sample_block.getByName(source_name).column, result_type, result_name));
const auto & source = sample_block.getByName(source_name);
result_type = source.type;
if (sample_block.has(result_name))
{
if (can_replace)
{
auto & result = sample_block.getByName(result_name);
result.type = result_type;
result.column = source.column;
}
else
throw Exception("Column '" + result_name + "' already exists", ErrorCodes::DUPLICATE_COLUMN);
}
else
sample_block.insert(ColumnWithTypeAndName(source.column, result_type, result_name));
break;
}
@ -333,7 +349,7 @@ void ExpressionAction::execute(Block & block, std::unordered_map<std::string, si
if (!block.has(source_name))
throw Exception("Not found column '" + source_name + "'. There are columns: " + block.dumpNames(), ErrorCodes::NOT_FOUND_COLUMN_IN_BLOCK);
if (type == ADD_COLUMN || type == COPY_COLUMN || type == APPLY_FUNCTION)
if (type == ADD_COLUMN || (type == COPY_COLUMN && !can_replace) || type == APPLY_FUNCTION)
if (block.has(result_name))
throw Exception("Column '" + result_name + "' already exists", ErrorCodes::DUPLICATE_COLUMN);
@ -459,7 +475,15 @@ void ExpressionAction::execute(Block & block, std::unordered_map<std::string, si
break;
case COPY_COLUMN:
block.insert({ block.getByName(source_name).column, result_type, result_name });
if (can_replace && block.has(result_name))
{
auto & result = block.getByName(result_name);
result.type = result_type;
result.column = block.getByName(source_name).column;
}
else
block.insert({ block.getByName(source_name).column, result_type, result_name });
break;
default:
@ -496,6 +520,8 @@ std::string ExpressionAction::toString() const
case COPY_COLUMN:
ss << "COPY " << result_name << " = " << source_name;
if (can_replace)
ss << " (can replace)";
break;
case APPLY_FUNCTION:
@ -602,9 +628,6 @@ void ExpressionActions::add(const ExpressionAction & action)
void ExpressionActions::addImpl(ExpressionAction action, Names & new_names)
{
if (sample_block.has(action.result_name))
return;
if (action.result_name != "")
new_names.push_back(action.result_name);
new_names.insert(new_names.end(), action.array_joined_columns.begin(), action.array_joined_columns.end());

View File

@ -70,6 +70,9 @@ public:
std::string result_name;
DataTypePtr result_type;
/// If COPY_COLUMN can replace the result column.
bool can_replace = false;
/// For conditional projections (projections on subset of rows)
std::string row_projection_column;
bool is_row_projection_complementary = false;
@ -103,7 +106,7 @@ public:
const std::string & row_projection_column,
bool is_row_projection_complementary);
static ExpressionAction removeColumn(const std::string & removed_name);
static ExpressionAction copyColumn(const std::string & from_name, const std::string & to_name);
static ExpressionAction copyColumn(const std::string & from_name, const std::string & to_name, bool can_replace = false);
static ExpressionAction project(const NamesWithAliases & projected_columns_);
static ExpressionAction project(const Names & projected_columns_);
static ExpressionAction arrayJoin(const NameSet & array_joined_columns, bool array_join_is_left, const Context & context);

View File

@ -833,7 +833,9 @@ static bool isStorageTouchedByMutation(
}
/// Execute `SELECT count() FROM storage WHERE predicate1 OR predicate2 OR ...` query.
/// The result is tne number of affected rows.
/// The result can differ from tne number of affected rows (e.g. if there is an UPDATE command that
/// changes how many rows satisfy the predicates of the subsequent commands).
/// But we can be sure that if count = 0, then no rows will be touched.
auto select = std::make_shared<ASTSelectQuery>();
@ -933,8 +935,8 @@ static BlockInputStreamPtr createInputStreamWithMutatedData(
InterpreterSelectQuery interpreter_select(select, context, storage);
BlockInputStreamPtr in = interpreter_select.execute().in;
if (!commands.empty())
in = std::make_shared<ApplyingMutationsBlockInputStream>(in, commands, context);
for (const auto & command : commands)
in = std::make_shared<ApplyingMutationsBlockInputStream>(in, command, context);
return in;
}

View File

@ -19,6 +19,7 @@ namespace ErrorCodes
{
extern const int UNKNOWN_MUTATION_COMMAND;
extern const int MULTIPLE_ASSIGNMENTS_TO_COLUMN;
extern const int NO_SUCH_COLUMN_IN_TABLE;
}
std::optional<MutationCommand> MutationCommand::parse(ASTAlterCommand * command)
@ -64,21 +65,53 @@ void MutationCommands::validate(const IStorage & table, const Context & context)
{
auto all_columns = table.getColumns().getAll();
auto validate_predicate = [&](const ASTPtr & predicate)
{
auto actions = ExpressionAnalyzer(predicate, context, {}, all_columns).getActions(true);
/// Try executing the resulting actions on the table sample block to detect malformed queries.
auto table_sample_block = table.getSampleBlock();
actions->execute(table_sample_block);
const ColumnWithTypeAndName & predicate_column = actions->getSampleBlock().getByName(
predicate->getColumnName());
checkColumnCanBeUsedAsFilter(predicate_column);
};
for (const MutationCommand & command : *this)
{
switch (command.type)
{
case MutationCommand::DELETE:
{
auto actions = ExpressionAnalyzer(command.predicate, context, {}, all_columns).getActions(true);
validate_predicate(command.predicate);
break;
}
case MutationCommand::UPDATE:
{
/// TODO: better and more thorough validation.
validate_predicate(command.predicate);
/// Try executing the resulting actions on the table sample block to detect malformed queries.
auto table_sample_block = table.getSampleBlock();
actions->execute(table_sample_block);
for (const auto & pair : command.column_to_update_expression)
{
const String & column_name = pair.first;
auto found = false;
for (const auto & col : table.getColumns().ordinary)
{
if (col.name == column_name)
{
found = true;
break;
}
}
/// TODO: separate error message for the case when the query tries to update a
/// MATERIALIZED column.
if (!found)
throw Exception("There is no updateable column " + column_name + " in table.", ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
}
const ColumnWithTypeAndName & predicate_column = actions->getSampleBlock().getByName(
command.predicate->getColumnName());
checkColumnCanBeUsedAsFilter(predicate_column);
break;
}
default: