2018-09-03 13:36:58 +00:00
|
|
|
#include <Interpreters/MutationsInterpreter.h>
|
2018-11-08 15:43:14 +00:00
|
|
|
#include <Interpreters/SyntaxAnalyzer.h>
|
2018-09-03 13:36:58 +00:00
|
|
|
#include <Interpreters/InterpreterSelectQuery.h>
|
2019-05-03 02:00:57 +00:00
|
|
|
#include <Storages/MergeTree/MergeTreeData.h>
|
2018-09-03 13:36:58 +00:00
|
|
|
#include <DataStreams/FilterBlockInputStream.h>
|
|
|
|
#include <DataStreams/ExpressionBlockInputStream.h>
|
2018-09-04 11:38:41 +00:00
|
|
|
#include <DataStreams/CreatingSetsBlockInputStream.h>
|
2018-09-03 13:36:58 +00:00
|
|
|
#include <DataStreams/MaterializingBlockInputStream.h>
|
2018-09-04 13:45:39 +00:00
|
|
|
#include <DataStreams/NullBlockInputStream.h>
|
2018-09-03 13:36:58 +00:00
|
|
|
#include <Parsers/ASTIdentifier.h>
|
|
|
|
#include <Parsers/ASTFunction.h>
|
|
|
|
#include <Parsers/ASTLiteral.h>
|
|
|
|
#include <Parsers/ASTExpressionList.h>
|
|
|
|
#include <Parsers/ASTSelectQuery.h>
|
|
|
|
#include <Parsers/formatAST.h>
|
|
|
|
#include <IO/WriteHelpers.h>
|
|
|
|
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
|
|
|
/// Error codes referenced in this file.
/// LOGICAL_ERROR and BAD_ARGUMENTS are used below as well, so they are declared here explicitly
/// instead of relying on a declaration that happens to be visible through an included header.
namespace ErrorCodes
{
    extern const int BAD_ARGUMENTS;
    extern const int LOGICAL_ERROR;
    extern const int UNKNOWN_MUTATION_COMMAND;
    extern const int NO_SUCH_COLUMN_IN_TABLE;
    extern const int CANNOT_UPDATE_COLUMN;
}
|
|
|
|
|
|
|
|
bool MutationsInterpreter::isStorageTouchedByMutations() const
|
|
|
|
{
|
|
|
|
if (commands.empty())
|
|
|
|
return false;
|
|
|
|
|
|
|
|
for (const MutationCommand & command : commands)
|
|
|
|
{
|
|
|
|
if (!command.predicate) /// The command touches all rows.
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Execute `SELECT count() FROM storage WHERE predicate1 OR predicate2 OR ...` query.
|
|
|
|
/// The result can differ from tne number of affected rows (e.g. if there is an UPDATE command that
|
|
|
|
/// changes how many rows satisfy the predicates of the subsequent commands).
|
|
|
|
/// But we can be sure that if count = 0, then no rows will be touched.
|
|
|
|
|
|
|
|
auto select = std::make_shared<ASTSelectQuery>();
|
|
|
|
|
2019-04-09 14:22:35 +00:00
|
|
|
select->setExpression(ASTSelectQuery::Expression::SELECT, std::make_shared<ASTExpressionList>());
|
2018-09-03 13:36:58 +00:00
|
|
|
auto count_func = std::make_shared<ASTFunction>();
|
|
|
|
count_func->name = "count";
|
|
|
|
count_func->arguments = std::make_shared<ASTExpressionList>();
|
2019-04-09 14:22:35 +00:00
|
|
|
select->select()->children.push_back(count_func);
|
2018-09-03 13:36:58 +00:00
|
|
|
|
|
|
|
if (commands.size() == 1)
|
2019-04-09 14:22:35 +00:00
|
|
|
select->setExpression(ASTSelectQuery::Expression::WHERE, commands[0].predicate->clone());
|
2018-09-03 13:36:58 +00:00
|
|
|
else
|
|
|
|
{
|
|
|
|
auto coalesced_predicates = std::make_shared<ASTFunction>();
|
|
|
|
coalesced_predicates->name = "or";
|
|
|
|
coalesced_predicates->arguments = std::make_shared<ASTExpressionList>();
|
|
|
|
coalesced_predicates->children.push_back(coalesced_predicates->arguments);
|
|
|
|
|
|
|
|
for (const MutationCommand & command : commands)
|
2019-01-23 13:27:01 +00:00
|
|
|
coalesced_predicates->arguments->children.push_back(command.predicate->clone());
|
2018-09-03 13:36:58 +00:00
|
|
|
|
2019-04-09 14:22:35 +00:00
|
|
|
select->setExpression(ASTSelectQuery::Expression::WHERE, std::move(coalesced_predicates));
|
2018-09-03 13:36:58 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
auto context_copy = context;
|
|
|
|
context_copy.getSettingsRef().merge_tree_uniform_read_distribution = 0;
|
|
|
|
context_copy.getSettingsRef().max_threads = 1;
|
|
|
|
|
2019-06-14 19:27:53 +00:00
|
|
|
BlockInputStreamPtr in = InterpreterSelectQuery(select, context_copy, storage, SelectQueryOptions().ignoreLimits()).execute().in;
|
2018-09-03 13:36:58 +00:00
|
|
|
|
|
|
|
Block block = in->read();
|
|
|
|
if (!block.rows())
|
|
|
|
return false;
|
|
|
|
else if (block.rows() != 1)
|
|
|
|
throw Exception("count() expression returned " + toString(block.rows()) + " rows, not 1",
|
|
|
|
ErrorCodes::LOGICAL_ERROR);
|
|
|
|
|
|
|
|
auto count = (*block.getByName("count()").column)[0].get<UInt64>();
|
|
|
|
return count != 0;
|
|
|
|
}
|
|
|
|
|
2018-09-04 13:45:39 +00:00
|
|
|
|
2018-09-07 15:44:51 +00:00
|
|
|
/// Collects the names of columns that take part in the partition key, the sorting key,
/// or serve as the sign / version column of a MergeTree table.
/// Returns an empty set when the storage is not a MergeTreeData.
static NameSet getKeyColumns(const StoragePtr & storage)
{
    NameSet result;

    const auto * merge_tree = dynamic_cast<const MergeTreeData *>(storage.get());
    if (merge_tree == nullptr)
        return result;

    /// Adds every column required by a key expression; a missing expression contributes nothing.
    auto add_required_columns = [&result](const auto & key_expr)
    {
        if (!key_expr)
            return;

        for (const String & name : key_expr->getRequiredColumns())
            result.insert(name);
    };

    add_required_columns(merge_tree->partition_key_expr);
    add_required_columns(merge_tree->sorting_key_expr);
    /// We don't process sample_by_ast separately because it must be among the primary key columns.

    const auto & merging_params = merge_tree->merging_params;

    if (!merging_params.sign_column.empty())
        result.insert(merging_params.sign_column);

    if (!merging_params.version_column.empty())
        result.insert(merging_params.version_column);

    return result;
}
|
|
|
|
|
|
|
|
static void validateUpdateColumns(
|
|
|
|
const StoragePtr & storage, const NameSet & updated_columns,
|
|
|
|
const std::unordered_map<String, Names> & column_to_affected_materialized)
|
|
|
|
{
|
|
|
|
NameSet key_columns = getKeyColumns(storage);
|
|
|
|
|
|
|
|
for (const String & column_name : updated_columns)
|
|
|
|
{
|
|
|
|
auto found = false;
|
2019-03-14 15:20:51 +00:00
|
|
|
for (const auto & col : storage->getColumns().getOrdinary())
|
2018-09-04 13:45:39 +00:00
|
|
|
{
|
2018-09-07 15:44:51 +00:00
|
|
|
if (col.name == column_name)
|
|
|
|
{
|
|
|
|
found = true;
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
2018-09-04 13:45:39 +00:00
|
|
|
|
2018-09-07 15:44:51 +00:00
|
|
|
if (!found)
|
|
|
|
{
|
2019-03-14 15:20:51 +00:00
|
|
|
for (const auto & col : storage->getColumns().getMaterialized())
|
2018-09-04 13:45:39 +00:00
|
|
|
{
|
|
|
|
if (col.name == column_name)
|
2019-06-15 12:06:22 +00:00
|
|
|
throw Exception("Cannot UPDATE materialized column " + backQuote(column_name), ErrorCodes::CANNOT_UPDATE_COLUMN);
|
2018-09-04 13:45:39 +00:00
|
|
|
}
|
|
|
|
|
2019-06-15 12:06:22 +00:00
|
|
|
throw Exception("There is no column " + backQuote(column_name) + " in table", ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
|
2018-09-07 15:44:51 +00:00
|
|
|
}
|
2018-09-04 13:45:39 +00:00
|
|
|
|
2018-09-07 15:44:51 +00:00
|
|
|
if (key_columns.count(column_name))
|
2019-06-15 12:06:22 +00:00
|
|
|
throw Exception("Cannot UPDATE key column " + backQuote(column_name), ErrorCodes::CANNOT_UPDATE_COLUMN);
|
2018-09-07 15:44:51 +00:00
|
|
|
|
|
|
|
auto materialized_it = column_to_affected_materialized.find(column_name);
|
|
|
|
if (materialized_it != column_to_affected_materialized.end())
|
|
|
|
{
|
|
|
|
for (const String & materialized : materialized_it->second)
|
|
|
|
{
|
|
|
|
if (key_columns.count(materialized))
|
2019-06-15 12:06:22 +00:00
|
|
|
throw Exception("Updated column " + backQuote(column_name) + " affects MATERIALIZED column "
|
|
|
|
+ backQuote(materialized) + ", which is a key column. Cannot UPDATE it.",
|
2018-09-07 15:44:51 +00:00
|
|
|
ErrorCodes::CANNOT_UPDATE_COLUMN);
|
2018-09-04 13:45:39 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
void MutationsInterpreter::prepare(bool dry_run)
|
2018-09-03 13:36:58 +00:00
|
|
|
{
|
|
|
|
if (is_prepared)
|
2018-09-04 13:45:39 +00:00
|
|
|
throw Exception("MutationsInterpreter is already prepared. It is a bug.", ErrorCodes::LOGICAL_ERROR);
|
2018-09-03 13:36:58 +00:00
|
|
|
|
|
|
|
if (commands.empty())
|
|
|
|
throw Exception("Empty mutation commands list", ErrorCodes::LOGICAL_ERROR);
|
|
|
|
|
2018-09-07 15:44:51 +00:00
|
|
|
const ColumnsDescription & columns_desc = storage->getColumns();
|
2019-05-05 11:34:10 +00:00
|
|
|
const IndicesDescription & indices_desc = storage->getIndices();
|
2018-09-07 15:44:51 +00:00
|
|
|
NamesAndTypesList all_columns = columns_desc.getAllPhysical();
|
|
|
|
|
|
|
|
NameSet updated_columns;
|
|
|
|
for (const MutationCommand & command : commands)
|
|
|
|
{
|
|
|
|
for (const auto & kv : command.column_to_update_expression)
|
|
|
|
updated_columns.insert(kv.first);
|
|
|
|
}
|
|
|
|
|
2019-04-17 17:07:07 +00:00
|
|
|
/// We need to know which columns affect which MATERIALIZED columns and data skipping indices
|
|
|
|
/// to recalculate them if dependencies are updated.
|
2018-09-07 15:44:51 +00:00
|
|
|
std::unordered_map<String, Names> column_to_affected_materialized;
|
2019-04-17 17:07:07 +00:00
|
|
|
NameSet affected_indices_columns;
|
2018-09-07 15:44:51 +00:00
|
|
|
if (!updated_columns.empty())
|
|
|
|
{
|
2019-03-14 15:20:51 +00:00
|
|
|
for (const auto & column : columns_desc)
|
2018-09-07 15:44:51 +00:00
|
|
|
{
|
2019-03-14 15:20:51 +00:00
|
|
|
if (column.default_desc.kind == ColumnDefaultKind::Materialized)
|
2018-09-07 15:44:51 +00:00
|
|
|
{
|
2019-03-14 15:20:51 +00:00
|
|
|
auto query = column.default_desc.expression->clone();
|
2019-01-09 16:16:59 +00:00
|
|
|
auto syntax_result = SyntaxAnalyzer(context).analyze(query, all_columns);
|
2019-08-09 14:50:04 +00:00
|
|
|
for (const String & dependency : syntax_result->requiredSourceColumns())
|
2018-09-07 15:44:51 +00:00
|
|
|
{
|
|
|
|
if (updated_columns.count(dependency))
|
2019-03-14 15:20:51 +00:00
|
|
|
column_to_affected_materialized[dependency].push_back(column.name);
|
2018-09-07 15:44:51 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2019-04-17 17:07:07 +00:00
|
|
|
for (const auto & index : indices_desc.indices)
|
|
|
|
{
|
|
|
|
auto query = index->expr->clone();
|
|
|
|
auto syntax_result = SyntaxAnalyzer(context).analyze(query, all_columns);
|
2019-08-15 17:46:35 +00:00
|
|
|
const auto required_columns = syntax_result->requiredSourceColumns();
|
2019-04-17 17:07:07 +00:00
|
|
|
|
|
|
|
for (const String & dependency : required_columns)
|
|
|
|
{
|
|
|
|
if (updated_columns.count(dependency))
|
|
|
|
{
|
|
|
|
affected_indices_columns.insert(std::cbegin(required_columns), std::cend(required_columns));
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2018-09-04 13:45:39 +00:00
|
|
|
|
2018-09-07 15:44:51 +00:00
|
|
|
validateUpdateColumns(storage, updated_columns, column_to_affected_materialized);
|
2019-01-09 15:44:20 +00:00
|
|
|
}
|
2018-09-03 13:36:58 +00:00
|
|
|
|
|
|
|
/// First, break a sequence of commands into stages.
|
|
|
|
for (const auto & command : commands)
|
|
|
|
{
|
|
|
|
if (command.type == MutationCommand::DELETE)
|
2018-09-07 15:44:51 +00:00
|
|
|
{
|
2019-08-05 18:06:05 +00:00
|
|
|
if (stages.empty() || !stages.back().column_to_updated.empty())
|
|
|
|
stages.emplace_back(context);
|
|
|
|
|
2018-09-07 15:44:51 +00:00
|
|
|
auto negated_predicate = makeASTFunction("not", command.predicate->clone());
|
|
|
|
stages.back().filters.push_back(negated_predicate);
|
|
|
|
}
|
2018-09-03 13:36:58 +00:00
|
|
|
else if (command.type == MutationCommand::UPDATE)
|
|
|
|
{
|
2019-08-05 18:06:05 +00:00
|
|
|
if (stages.empty() || !stages.back().column_to_updated.empty())
|
|
|
|
stages.emplace_back(context);
|
2018-09-07 15:44:51 +00:00
|
|
|
if (stages.size() == 1) /// First stage only supports filtering and can't update columns.
|
2018-09-07 19:14:05 +00:00
|
|
|
stages.emplace_back(context);
|
2018-09-03 13:36:58 +00:00
|
|
|
|
2018-09-07 15:44:51 +00:00
|
|
|
NameSet affected_materialized;
|
|
|
|
|
|
|
|
for (const auto & kv : command.column_to_update_expression)
|
|
|
|
{
|
|
|
|
const String & column = kv.first;
|
|
|
|
|
|
|
|
auto materialized_it = column_to_affected_materialized.find(column);
|
|
|
|
if (materialized_it != column_to_affected_materialized.end())
|
|
|
|
{
|
|
|
|
for (const String & mat_column : materialized_it->second)
|
|
|
|
affected_materialized.emplace(mat_column);
|
|
|
|
}
|
|
|
|
|
|
|
|
const auto & update_expr = kv.second;
|
|
|
|
auto updated_column = makeASTFunction("CAST",
|
|
|
|
makeASTFunction("if",
|
|
|
|
command.predicate->clone(),
|
|
|
|
update_expr->clone(),
|
|
|
|
std::make_shared<ASTIdentifier>(column)),
|
|
|
|
std::make_shared<ASTLiteral>(columns_desc.getPhysical(column).type->getName()));
|
|
|
|
stages.back().column_to_updated.emplace(column, updated_column);
|
|
|
|
}
|
|
|
|
|
|
|
|
if (!affected_materialized.empty())
|
|
|
|
{
|
2018-09-07 19:14:05 +00:00
|
|
|
stages.emplace_back(context);
|
2019-03-14 15:20:51 +00:00
|
|
|
for (const auto & column : columns_desc)
|
2018-09-07 15:44:51 +00:00
|
|
|
{
|
2019-03-14 15:20:51 +00:00
|
|
|
if (column.default_desc.kind == ColumnDefaultKind::Materialized)
|
|
|
|
{
|
|
|
|
stages.back().column_to_updated.emplace(
|
|
|
|
column.name,
|
|
|
|
column.default_desc.expression->clone());
|
|
|
|
}
|
2018-09-07 15:44:51 +00:00
|
|
|
}
|
|
|
|
}
|
2018-09-03 13:36:58 +00:00
|
|
|
}
|
2019-05-05 17:01:54 +00:00
|
|
|
else if (command.type == MutationCommand::MATERIALIZE_INDEX)
|
|
|
|
{
|
|
|
|
auto it = std::find_if(
|
|
|
|
std::cbegin(indices_desc.indices), std::end(indices_desc.indices),
|
|
|
|
[&](const std::shared_ptr<ASTIndexDeclaration> & index)
|
|
|
|
{
|
|
|
|
return index->name == command.index_name;
|
|
|
|
});
|
|
|
|
if (it == std::cend(indices_desc.indices))
|
|
|
|
throw Exception("Unknown index: " + command.index_name, ErrorCodes::BAD_ARGUMENTS);
|
|
|
|
|
|
|
|
auto query = (*it)->expr->clone();
|
|
|
|
auto syntax_result = SyntaxAnalyzer(context).analyze(query, all_columns);
|
2019-08-15 17:46:35 +00:00
|
|
|
const auto required_columns = syntax_result->requiredSourceColumns();
|
2019-05-05 18:19:07 +00:00
|
|
|
affected_indices_columns.insert(std::cbegin(required_columns), std::cend(required_columns));
|
2019-05-05 17:01:54 +00:00
|
|
|
}
|
2018-09-03 13:36:58 +00:00
|
|
|
else
|
|
|
|
throw Exception("Unknown mutation command type: " + DB::toString<int>(command.type), ErrorCodes::UNKNOWN_MUTATION_COMMAND);
|
|
|
|
}
|
|
|
|
|
2019-04-17 17:07:07 +00:00
|
|
|
if (!affected_indices_columns.empty())
|
|
|
|
{
|
2019-08-05 18:06:05 +00:00
|
|
|
if (!stages.empty())
|
|
|
|
{
|
|
|
|
std::vector<Stage> stages_copy;
|
|
|
|
/// Copy all filled stages except index calculation stage.
|
|
|
|
for (const auto &stage : stages)
|
|
|
|
{
|
|
|
|
stages_copy.emplace_back(context);
|
|
|
|
stages_copy.back().column_to_updated = stage.column_to_updated;
|
|
|
|
stages_copy.back().output_columns = stage.output_columns;
|
|
|
|
stages_copy.back().filters = stage.filters;
|
|
|
|
}
|
2019-08-05 18:22:44 +00:00
|
|
|
auto first_stage_header = prepareInterpreterSelect(stages_copy, /* dry_run = */ true)->getSampleBlock();
|
2019-08-05 18:06:05 +00:00
|
|
|
auto in = std::make_shared<NullBlockInputStream>(first_stage_header);
|
2019-08-05 18:22:44 +00:00
|
|
|
updated_header = std::make_unique<Block>(addStreamsForLaterStages(stages_copy, in)->getHeader());
|
2019-08-05 18:06:05 +00:00
|
|
|
}
|
|
|
|
/// Special step to recalculate affected indices.
|
2019-04-17 17:07:07 +00:00
|
|
|
stages.emplace_back(context);
|
|
|
|
for (const auto & column : affected_indices_columns)
|
|
|
|
stages.back().column_to_updated.emplace(
|
2019-04-18 15:41:07 +00:00
|
|
|
column, std::make_shared<ASTIdentifier>(column));
|
2019-04-17 17:07:07 +00:00
|
|
|
}
|
|
|
|
|
2019-08-05 18:22:44 +00:00
|
|
|
interpreter_select = prepareInterpreterSelect(stages, dry_run);
|
2019-08-05 18:06:05 +00:00
|
|
|
|
|
|
|
is_prepared = true;
|
|
|
|
}
|
|
|
|
|
2019-08-05 18:22:44 +00:00
|
|
|
std::unique_ptr<InterpreterSelectQuery> MutationsInterpreter::prepareInterpreterSelect(std::vector<Stage> & prepared_stages, bool dry_run)
|
2019-08-05 18:06:05 +00:00
|
|
|
{
|
|
|
|
NamesAndTypesList all_columns = storage->getColumns().getAllPhysical();
|
2019-04-17 17:07:07 +00:00
|
|
|
|
2018-09-03 13:36:58 +00:00
|
|
|
/// Next, for each stage calculate columns changed by this and previous stages.
|
2019-08-05 18:22:44 +00:00
|
|
|
for (size_t i = 0; i < prepared_stages.size(); ++i)
|
2018-09-03 13:36:58 +00:00
|
|
|
{
|
2019-08-05 18:22:44 +00:00
|
|
|
if (!prepared_stages[i].filters.empty())
|
2018-09-03 13:36:58 +00:00
|
|
|
{
|
|
|
|
for (const auto & column : all_columns)
|
2019-08-05 18:22:44 +00:00
|
|
|
prepared_stages[i].output_columns.insert(column.name);
|
2018-09-03 13:36:58 +00:00
|
|
|
continue;
|
|
|
|
}
|
|
|
|
|
|
|
|
if (i > 0)
|
2019-08-05 18:22:44 +00:00
|
|
|
prepared_stages[i].output_columns = prepared_stages[i - 1].output_columns;
|
2018-09-03 13:36:58 +00:00
|
|
|
|
2019-08-05 18:22:44 +00:00
|
|
|
if (prepared_stages[i].output_columns.size() < all_columns.size())
|
2018-09-03 13:36:58 +00:00
|
|
|
{
|
2019-08-05 18:22:44 +00:00
|
|
|
for (const auto & kv : prepared_stages[i].column_to_updated)
|
|
|
|
prepared_stages[i].output_columns.insert(kv.first);
|
2018-09-03 13:36:58 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Now, calculate `expressions_chain` for each stage except the first.
|
|
|
|
/// Do it backwards to propagate information about columns required as input for a stage to the previous stage.
|
2019-08-05 18:22:44 +00:00
|
|
|
for (size_t i = prepared_stages.size() - 1; i > 0; --i)
|
2018-09-03 13:36:58 +00:00
|
|
|
{
|
2019-08-05 18:22:44 +00:00
|
|
|
auto & stage = prepared_stages[i];
|
2018-09-03 13:36:58 +00:00
|
|
|
|
|
|
|
ASTPtr all_asts = std::make_shared<ASTExpressionList>();
|
|
|
|
|
2018-09-07 15:44:51 +00:00
|
|
|
for (const auto & ast : stage.filters)
|
|
|
|
all_asts->children.push_back(ast);
|
2018-09-03 13:36:58 +00:00
|
|
|
|
2018-09-07 15:44:51 +00:00
|
|
|
for (const auto & kv : stage.column_to_updated)
|
|
|
|
all_asts->children.push_back(kv.second);
|
2018-09-03 13:36:58 +00:00
|
|
|
|
|
|
|
/// Add all output columns to prevent ExpressionAnalyzer from deleting them from source columns.
|
|
|
|
for (const String & column : stage.output_columns)
|
|
|
|
all_asts->children.push_back(std::make_shared<ASTIdentifier>(column));
|
|
|
|
|
2019-01-09 16:16:59 +00:00
|
|
|
auto syntax_result = SyntaxAnalyzer(context).analyze(all_asts, all_columns);
|
2018-11-08 17:28:52 +00:00
|
|
|
stage.analyzer = std::make_unique<ExpressionAnalyzer>(all_asts, syntax_result, context);
|
2018-09-03 13:36:58 +00:00
|
|
|
|
|
|
|
ExpressionActionsChain & actions_chain = stage.expressions_chain;
|
|
|
|
|
2018-09-07 15:44:51 +00:00
|
|
|
for (const auto & ast : stage.filters)
|
2018-09-03 13:36:58 +00:00
|
|
|
{
|
|
|
|
if (!actions_chain.steps.empty())
|
|
|
|
actions_chain.addStep();
|
2018-09-04 13:45:39 +00:00
|
|
|
stage.analyzer->appendExpression(actions_chain, ast, dry_run);
|
2018-09-07 15:44:51 +00:00
|
|
|
stage.filter_column_names.push_back(ast->getColumnName());
|
2018-09-03 13:36:58 +00:00
|
|
|
}
|
|
|
|
|
2018-09-07 15:44:51 +00:00
|
|
|
if (!stage.column_to_updated.empty())
|
2018-09-03 13:36:58 +00:00
|
|
|
{
|
|
|
|
if (!actions_chain.steps.empty())
|
|
|
|
actions_chain.addStep();
|
|
|
|
|
2018-09-07 15:44:51 +00:00
|
|
|
for (const auto & kv : stage.column_to_updated)
|
2018-09-04 13:45:39 +00:00
|
|
|
stage.analyzer->appendExpression(actions_chain, kv.second, dry_run);
|
2018-09-03 13:36:58 +00:00
|
|
|
|
2018-09-07 15:44:51 +00:00
|
|
|
for (const auto & kv : stage.column_to_updated)
|
2018-09-03 13:36:58 +00:00
|
|
|
{
|
|
|
|
actions_chain.getLastActions()->add(ExpressionAction::copyColumn(
|
2019-08-05 18:06:05 +00:00
|
|
|
kv.second->getColumnName(), kv.first, /* can_replace = */ true));
|
2018-09-03 13:36:58 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Remove all intermediate columns.
|
|
|
|
actions_chain.addStep();
|
|
|
|
actions_chain.getLastStep().required_output.assign(stage.output_columns.begin(), stage.output_columns.end());
|
|
|
|
|
|
|
|
actions_chain.finalize();
|
|
|
|
|
|
|
|
/// Propagate information about columns needed as input.
|
|
|
|
for (const auto & column : actions_chain.steps.front().actions->getRequiredColumnsWithTypes())
|
2019-08-05 18:22:44 +00:00
|
|
|
prepared_stages[i - 1].output_columns.insert(column.name);
|
2018-09-03 13:36:58 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
/// Execute first stage as a SELECT statement.
|
|
|
|
|
|
|
|
auto select = std::make_shared<ASTSelectQuery>();
|
|
|
|
|
2019-04-09 14:22:35 +00:00
|
|
|
select->setExpression(ASTSelectQuery::Expression::SELECT, std::make_shared<ASTExpressionList>());
|
2019-08-05 18:22:44 +00:00
|
|
|
for (const auto & column_name : prepared_stages[0].output_columns)
|
2019-04-09 14:22:35 +00:00
|
|
|
select->select()->children.push_back(std::make_shared<ASTIdentifier>(column_name));
|
2018-09-03 13:36:58 +00:00
|
|
|
|
2019-08-05 18:22:44 +00:00
|
|
|
if (!prepared_stages[0].filters.empty())
|
2018-09-03 13:36:58 +00:00
|
|
|
{
|
|
|
|
ASTPtr where_expression;
|
2019-08-05 18:22:44 +00:00
|
|
|
if (prepared_stages[0].filters.size() == 1)
|
|
|
|
where_expression = prepared_stages[0].filters[0];
|
2018-09-03 13:36:58 +00:00
|
|
|
else
|
|
|
|
{
|
|
|
|
auto coalesced_predicates = std::make_shared<ASTFunction>();
|
|
|
|
coalesced_predicates->name = "and";
|
|
|
|
coalesced_predicates->arguments = std::make_shared<ASTExpressionList>();
|
|
|
|
coalesced_predicates->children.push_back(coalesced_predicates->arguments);
|
2019-08-05 18:22:44 +00:00
|
|
|
coalesced_predicates->arguments->children = prepared_stages[0].filters;
|
2018-09-03 13:36:58 +00:00
|
|
|
where_expression = std::move(coalesced_predicates);
|
|
|
|
}
|
2019-04-09 14:22:35 +00:00
|
|
|
select->setExpression(ASTSelectQuery::Expression::WHERE, std::move(where_expression));
|
2018-09-03 13:36:58 +00:00
|
|
|
}
|
|
|
|
|
2019-08-05 18:06:05 +00:00
|
|
|
return std::make_unique<InterpreterSelectQuery>(select, context, storage, SelectQueryOptions().analyze(dry_run).ignoreLimits());
|
2018-09-03 13:36:58 +00:00
|
|
|
}
|
|
|
|
|
2019-08-05 18:22:44 +00:00
|
|
|
/// Wraps `in` with the streams implementing every stage after the first one
/// and returns the resulting stream wrapped in a MaterializingBlockInputStream.
BlockInputStreamPtr MutationsInterpreter::addStreamsForLaterStages(const std::vector<Stage> & prepared_stages, BlockInputStreamPtr in) const
{
    for (size_t stage_no = 1; stage_no < prepared_stages.size(); ++stage_no)
    {
        const Stage & stage = prepared_stages[stage_no];
        const auto & steps = stage.expressions_chain.steps;
        const size_t filters_count = stage.filter_column_names.size();

        for (size_t step_no = 0; step_no < steps.size(); ++step_no)
        {
            const auto & actions = steps[step_no].actions;

            if (step_no >= filters_count)
            {
                /// Execute UPDATE or final projection.
                in = std::make_shared<ExpressionBlockInputStream>(in, actions);
            }
            else
            {
                /// Execute DELETEs.
                in = std::make_shared<FilterBlockInputStream>(in, actions, stage.filter_column_names[step_no]);
            }
        }

        const SubqueriesForSets & sets_to_create = stage.analyzer->getSubqueriesForSets();
        if (!sets_to_create.empty())
            in = std::make_shared<CreatingSetsBlockInputStream>(in, sets_to_create, context);
    }

    in = std::make_shared<MaterializingBlockInputStream>(in);

    return in;
}
|
|
|
|
|
2019-08-19 19:02:20 +00:00
|
|
|
/// Prepares the interpreter in dry-run mode and builds the whole pipeline over a null input,
/// so that errors in any stage surface as exceptions.
void MutationsInterpreter::validate(TableStructureReadLockHolder &)
{
    prepare(/* dry_run = */ true);

    /// Do not use getSampleBlock in order to check the whole pipeline.
    Block header_of_first_stage = interpreter_select->execute().in->getHeader();

    BlockInputStreamPtr null_source = std::make_shared<NullBlockInputStream>(header_of_first_stage);
    addStreamsForLaterStages(stages, null_source)->getHeader();
}
|
|
|
|
|
2019-08-19 19:02:20 +00:00
|
|
|
/// Prepares the interpreter and returns the stream that produces the mutated data.
/// Stores the header of that stream as the updated header unless one has been set already.
BlockInputStreamPtr MutationsInterpreter::execute(TableStructureReadLockHolder &)
{
    prepare(/* dry_run = */ false);

    BlockInputStreamPtr first_stage_stream = interpreter_select->execute().in;
    auto mutated_stream = addStreamsForLaterStages(stages, first_stage_stream);

    if (updated_header == nullptr)
        updated_header = std::make_unique<Block>(mutated_stream->getHeader());

    return mutated_stream;
}
|
|
|
|
|
|
|
|
/// Returns the header stored by prepare() (only when indices are affected and stages exist)
/// or by execute().
/// Throws LOGICAL_ERROR if no header has been stored yet, instead of dereferencing a null pointer.
const Block & MutationsInterpreter::getUpdatedHeader() const
{
    if (!updated_header)
        throw Exception("MutationsInterpreter has no updated header yet. It is a bug.", ErrorCodes::LOGICAL_ERROR);

    return *updated_header;
}
|
|
|
|
|
2018-09-03 13:36:58 +00:00
|
|
|
}
|