2019-10-10 14:38:08 +00:00
|
|
|
#include <Functions/FunctionFactory.h>
|
|
|
|
#include <Functions/IFunction.h>
|
|
|
|
#include <Interpreters/InDepthNodeVisitor.h>
|
|
|
|
#include <Interpreters/InterpreterSelectQuery.h>
|
2018-09-03 13:36:58 +00:00
|
|
|
#include <Interpreters/MutationsInterpreter.h>
|
2020-07-22 17:13:05 +00:00
|
|
|
#include <Interpreters/TreeRewriter.h>
|
2019-05-03 02:00:57 +00:00
|
|
|
#include <Storages/MergeTree/MergeTreeData.h>
|
2020-11-10 10:23:46 +00:00
|
|
|
#include <Storages/MergeTree/StorageFromMergeTreeDataPart.h>
|
2020-08-31 12:45:20 +00:00
|
|
|
#include <Processors/Transforms/FilterTransform.h>
|
|
|
|
#include <Processors/Transforms/ExpressionTransform.h>
|
|
|
|
#include <Processors/Transforms/CreatingSetsTransform.h>
|
|
|
|
#include <Processors/Transforms/MaterializingTransform.h>
|
|
|
|
#include <Processors/Sources/NullSource.h>
|
|
|
|
#include <Processors/QueryPipeline.h>
|
2020-09-15 17:13:13 +00:00
|
|
|
#include <Processors/QueryPlan/QueryPlan.h>
|
|
|
|
#include <Processors/QueryPlan/ExpressionStep.h>
|
|
|
|
#include <Processors/QueryPlan/FilterStep.h>
|
|
|
|
#include <Processors/QueryPlan/ReadFromPreparedSource.h>
|
|
|
|
#include <Processors/Executors/PipelineExecutingBlockInputStream.h>
|
2020-03-26 15:59:09 +00:00
|
|
|
#include <DataStreams/CheckSortedBlockInputStream.h>
|
2018-09-03 13:36:58 +00:00
|
|
|
#include <Parsers/ASTIdentifier.h>
|
|
|
|
#include <Parsers/ASTFunction.h>
|
|
|
|
#include <Parsers/ASTLiteral.h>
|
|
|
|
#include <Parsers/ASTExpressionList.h>
|
|
|
|
#include <Parsers/ASTSelectQuery.h>
|
|
|
|
#include <Parsers/formatAST.h>
|
|
|
|
#include <IO/WriteHelpers.h>
|
2020-09-15 17:13:13 +00:00
|
|
|
#include <Processors/QueryPlan/CreatingSetsStep.h>
|
2021-04-02 11:31:33 +00:00
|
|
|
#include <DataTypes/NestedUtils.h>
|
2018-09-03 13:36:58 +00:00
|
|
|
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
|
|
|
namespace ErrorCodes
{
    /// Error codes raised by the mutation-interpreter code in this file (kept in alphabetical order).
    extern const int BAD_ARGUMENTS;
    extern const int CANNOT_UPDATE_COLUMN;
    extern const int LOGICAL_ERROR;
    extern const int NO_SUCH_COLUMN_IN_TABLE;
    extern const int NOT_IMPLEMENTED;
    extern const int UNKNOWN_MUTATION_COMMAND;
}
|
|
|
|
|
2019-10-10 14:38:08 +00:00
|
|
|
namespace
|
|
|
|
{
|
2020-07-06 02:07:55 +00:00
|
|
|
|
2020-04-13 14:34:11 +00:00
|
|
|
/// Helps to detect situations, where non-deterministic functions may be used in mutations of Replicated*MergeTree.
|
2020-07-06 02:07:55 +00:00
|
|
|
class FirstNonDeterministicFunctionMatcher
|
2019-10-10 14:38:08 +00:00
|
|
|
{
|
2020-04-13 14:34:11 +00:00
|
|
|
public:
|
2020-04-13 17:04:17 +00:00
|
|
|
struct Data
|
|
|
|
{
|
2021-04-10 23:33:54 +00:00
|
|
|
ContextPtr context;
|
2020-04-13 14:34:11 +00:00
|
|
|
std::optional<String> nondeterministic_function_name;
|
|
|
|
};
|
2019-10-10 14:38:08 +00:00
|
|
|
|
2020-04-13 14:34:11 +00:00
|
|
|
static bool needChildVisit(const ASTPtr & /*node*/, const ASTPtr & child)
|
|
|
|
{
|
|
|
|
return child != nullptr;
|
|
|
|
}
|
2019-10-10 14:38:08 +00:00
|
|
|
|
2020-04-13 14:34:11 +00:00
|
|
|
static void visit(const ASTPtr & node, Data & data)
|
2019-10-10 14:38:08 +00:00
|
|
|
{
|
2020-04-13 14:34:11 +00:00
|
|
|
if (data.nondeterministic_function_name)
|
2019-10-10 14:38:08 +00:00
|
|
|
return;
|
|
|
|
|
2020-04-13 14:34:11 +00:00
|
|
|
if (const auto * function = typeid_cast<const ASTFunction *>(node.get()))
|
|
|
|
{
|
2020-04-13 20:02:44 +00:00
|
|
|
/// Property of being deterministic for lambda expression is completely determined
|
|
|
|
/// by the contents of its definition, so we just proceed to it.
|
2020-04-13 14:34:11 +00:00
|
|
|
if (function->name != "lambda")
|
|
|
|
{
|
|
|
|
const auto func = FunctionFactory::instance().get(function->name, data.context);
|
|
|
|
if (!func->isDeterministic())
|
|
|
|
data.nondeterministic_function_name = func->getName();
|
|
|
|
}
|
|
|
|
}
|
2019-10-10 14:38:08 +00:00
|
|
|
}
|
|
|
|
};
|
|
|
|
|
2020-07-06 02:07:55 +00:00
|
|
|
using FirstNonDeterministicFunctionFinder = InDepthNodeVisitor<FirstNonDeterministicFunctionMatcher, true>;
|
2019-10-10 14:38:08 +00:00
|
|
|
|
2021-04-10 23:33:54 +00:00
|
|
|
std::optional<String> findFirstNonDeterministicFunctionName(const MutationCommand & command, ContextPtr context)
|
2019-10-10 14:38:08 +00:00
|
|
|
{
|
2020-07-06 02:07:55 +00:00
|
|
|
FirstNonDeterministicFunctionMatcher::Data finder_data{context, std::nullopt};
|
2019-10-10 14:38:08 +00:00
|
|
|
|
|
|
|
switch (command.type)
|
|
|
|
{
|
|
|
|
case MutationCommand::UPDATE:
|
|
|
|
{
|
|
|
|
auto update_assignments_ast = command.ast->as<const ASTAlterCommand &>().update_assignments->clone();
|
2020-07-06 02:07:55 +00:00
|
|
|
FirstNonDeterministicFunctionFinder(finder_data).visit(update_assignments_ast);
|
2019-10-10 14:38:08 +00:00
|
|
|
|
|
|
|
if (finder_data.nondeterministic_function_name)
|
|
|
|
return finder_data.nondeterministic_function_name;
|
|
|
|
|
2020-11-10 10:23:46 +00:00
|
|
|
/// Currently UPDATE and DELETE both always have predicates so we can use fallthrough
|
2019-10-10 14:38:08 +00:00
|
|
|
[[fallthrough]];
|
|
|
|
}
|
|
|
|
|
|
|
|
case MutationCommand::DELETE:
|
|
|
|
{
|
|
|
|
auto predicate_ast = command.predicate->clone();
|
2020-07-06 02:07:55 +00:00
|
|
|
FirstNonDeterministicFunctionFinder(finder_data).visit(predicate_ast);
|
2019-10-10 14:38:08 +00:00
|
|
|
|
|
|
|
return finder_data.nondeterministic_function_name;
|
|
|
|
}
|
|
|
|
|
|
|
|
default:
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
|
|
|
|
return {};
|
|
|
|
}
|
|
|
|
|
2021-04-10 23:33:54 +00:00
|
|
|
/// Builds the query `SELECT count() FROM storage WHERE predicate1 OR predicate2 OR ...` for `commands`.
///
/// The result can differ from the number of affected rows (e.g. if there is an UPDATE command that
/// changes how many rows satisfy the predicates of the subsequent commands).
/// But we can be sure that if count = 0, then no rows will be touched.
///
/// Each command contributes the expression returned by getPartitionAndPredicateExpressionForMutationCommand.
/// Commands for which it returns null are skipped. Several conditions are combined with `or`.
/// If no command contributes a condition, the query has no WHERE clause.
ASTPtr prepareQueryAffectedAST(const std::vector<MutationCommand> & commands, const StoragePtr & storage, ContextPtr context)
{
    auto select = std::make_shared<ASTSelectQuery>();

    /// Build count() through makeASTFunction so that its (empty) `arguments` list is also registered
    /// in `children`, matching every other ASTFunction built in this file. The previous hand-built
    /// node left `arguments` out of `children`, so child-walking visitors saw an inconsistent node.
    select->setExpression(ASTSelectQuery::Expression::SELECT, std::make_shared<ASTExpressionList>());
    select->select()->children.push_back(makeASTFunction("count"));

    ASTs conditions;
    for (const MutationCommand & command : commands)
    {
        if (ASTPtr condition = getPartitionAndPredicateExpressionForMutationCommand(command, storage, context))
            conditions.push_back(std::move(condition));
    }

    if (conditions.size() > 1)
    {
        auto coalesced_predicates = makeASTFunction("or");
        coalesced_predicates->arguments->children = std::move(conditions);
        select->setExpression(ASTSelectQuery::Expression::WHERE, std::move(coalesced_predicates));
    }
    else if (conditions.size() == 1)
    {
        select->setExpression(ASTSelectQuery::Expression::WHERE, std::move(conditions.front()));
    }

    return select;
}
|
|
|
|
|
2020-06-16 12:19:21 +00:00
|
|
|
/// Collects all column dependencies reachable from `updated_columns`.
/// Every dependency returned by the metadata is recorded. When a newly seen dependency is not
/// read-only, its column is treated as updated too, and its own dependencies are explored in the
/// next round. The loop stops once a round yields no new non-read-only dependencies.
ColumnDependencies getAllColumnDependencies(const StorageMetadataPtr & metadata_snapshot, const NameSet & updated_columns)
{
    ColumnDependencies collected;
    NameSet frontier = updated_columns;

    while (!frontier.empty())
    {
        NameSet next_frontier;
        for (const auto & dependency : metadata_snapshot->getColumnDependencies(frontier))
        {
            const bool first_time_seen = collected.insert(dependency).second;
            if (first_time_seen && !dependency.isReadOnly())
                next_frontier.insert(dependency.column_name);
        }
        frontier = std::move(next_frontier);
    }

    return collected;
}
|
|
|
|
|
2020-08-31 12:45:20 +00:00
|
|
|
}
|
2019-10-10 14:38:08 +00:00
|
|
|
|
2020-11-10 10:23:46 +00:00
|
|
|
|
2019-11-07 08:40:36 +00:00
|
|
|
bool isStorageTouchedByMutations(
|
2020-11-10 10:23:46 +00:00
|
|
|
const StoragePtr & storage,
|
2020-06-17 11:52:19 +00:00
|
|
|
const StorageMetadataPtr & metadata_snapshot,
|
2019-11-07 08:40:36 +00:00
|
|
|
const std::vector<MutationCommand> & commands,
|
2021-04-10 23:33:54 +00:00
|
|
|
ContextPtr context_copy)
|
2018-09-03 13:36:58 +00:00
|
|
|
{
|
|
|
|
if (commands.empty())
|
|
|
|
return false;
|
|
|
|
|
2020-11-10 10:23:46 +00:00
|
|
|
bool all_commands_can_be_skipped = true;
|
|
|
|
auto storage_from_merge_tree_data_part = std::dynamic_pointer_cast<StorageFromMergeTreeDataPart>(storage);
|
2018-09-03 13:36:58 +00:00
|
|
|
for (const MutationCommand & command : commands)
|
|
|
|
{
|
|
|
|
if (!command.predicate) /// The command touches all rows.
|
|
|
|
return true;
|
2020-11-10 10:23:46 +00:00
|
|
|
|
|
|
|
if (command.partition && !storage_from_merge_tree_data_part)
|
|
|
|
throw Exception("ALTER UPDATE/DELETE ... IN PARTITION is not supported for non-MergeTree tables", ErrorCodes::NOT_IMPLEMENTED);
|
|
|
|
|
|
|
|
if (command.partition && storage_from_merge_tree_data_part)
|
|
|
|
{
|
|
|
|
const String partition_id = storage_from_merge_tree_data_part->getPartitionIDFromQuery(command.partition, context_copy);
|
|
|
|
if (partition_id == storage_from_merge_tree_data_part->getPartitionId())
|
|
|
|
all_commands_can_be_skipped = false;
|
|
|
|
}
|
|
|
|
else
|
|
|
|
all_commands_can_be_skipped = false;
|
2018-09-03 13:36:58 +00:00
|
|
|
}
|
|
|
|
|
2020-11-10 10:23:46 +00:00
|
|
|
if (all_commands_can_be_skipped)
|
|
|
|
return false;
|
|
|
|
|
2021-04-10 23:33:54 +00:00
|
|
|
context_copy->setSetting("max_streams_to_max_threads_ratio", 1);
|
|
|
|
context_copy->setSetting("max_threads", 1);
|
2018-09-03 13:36:58 +00:00
|
|
|
|
2020-11-10 10:23:46 +00:00
|
|
|
ASTPtr select_query = prepareQueryAffectedAST(commands, storage, context_copy);
|
2019-11-06 17:28:03 +00:00
|
|
|
|
|
|
|
/// Interpreter must be alive, when we use result of execute() method.
|
|
|
|
/// For some reason it may copy context and and give it into ExpressionBlockInputStream
|
|
|
|
/// after that we will use context from destroyed stack frame in our stream.
|
2020-06-17 11:52:19 +00:00
|
|
|
InterpreterSelectQuery interpreter(select_query, context_copy, storage, metadata_snapshot, SelectQueryOptions().ignoreLimits());
|
2020-05-28 14:03:32 +00:00
|
|
|
BlockInputStreamPtr in = interpreter.execute().getInputStream();
|
2018-09-03 13:36:58 +00:00
|
|
|
|
|
|
|
Block block = in->read();
|
|
|
|
if (!block.rows())
|
|
|
|
return false;
|
|
|
|
else if (block.rows() != 1)
|
|
|
|
throw Exception("count() expression returned " + toString(block.rows()) + " rows, not 1",
|
|
|
|
ErrorCodes::LOGICAL_ERROR);
|
|
|
|
|
|
|
|
auto count = (*block.getByName("count()").column)[0].get<UInt64>();
|
|
|
|
return count != 0;
|
2020-11-10 10:23:46 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/// Returns the row filter that a mutation command applies to:
/// - only a predicate: a clone of the predicate;
/// - only IN PARTITION: `_partition_id = '<id>'`;
/// - both: `and(<predicate clone>, _partition_id = '<id>')`;
/// - neither: a null ASTPtr.
/// The partition id is resolved through MergeTreeData or StorageFromMergeTreeDataPart. Any other storage
/// combined with IN PARTITION throws NOT_IMPLEMENTED.
ASTPtr getPartitionAndPredicateExpressionForMutationCommand(
    const MutationCommand & command,
    const StoragePtr & storage,
    ContextPtr context
)
{
    if (!command.partition)
        return command.predicate ? command.predicate->clone() : ASTPtr{};

    String partition_id;
    if (auto merge_tree_storage = std::dynamic_pointer_cast<MergeTreeData>(storage))
        partition_id = merge_tree_storage->getPartitionIDFromQuery(command.partition, context);
    else if (auto data_part_storage = std::dynamic_pointer_cast<StorageFromMergeTreeDataPart>(storage))
        partition_id = data_part_storage->getPartitionIDFromQuery(command.partition, context);
    else
        throw Exception("ALTER UPDATE/DELETE ... IN PARTITION is not supported for non-MergeTree tables", ErrorCodes::NOT_IMPLEMENTED);

    ASTPtr partition_filter = makeASTFunction("equals",
        std::make_shared<ASTIdentifier>("_partition_id"),
        std::make_shared<ASTLiteral>(partition_id));

    if (!command.predicate)
        return partition_filter;

    return makeASTFunction("and", command.predicate->clone(), std::move(partition_filter));
}
|
|
|
|
|
2020-11-10 10:23:46 +00:00
|
|
|
|
2019-11-07 08:40:36 +00:00
|
|
|
/// Captures the storage, its metadata snapshot, the commands and the context, then runs prepare().
/// When `can_execute_` is false, prepare() runs in dry-run mode and the select options are built
/// with analyze(true); limits are always ignored.
MutationsInterpreter::MutationsInterpreter(
    StoragePtr storage_,
    const StorageMetadataPtr & metadata_snapshot_,
    MutationCommands commands_,
    ContextPtr context_,
    bool can_execute_)
    : storage(std::move(storage_))
    , metadata_snapshot(metadata_snapshot_)
    , commands(std::move(commands_))
    , context(context_)
    , can_execute(can_execute_)
    , select_limits(SelectQueryOptions().analyze(!can_execute).ignoreLimits())
{
    const bool dry_run = !can_execute;
    mutation_ast = prepare(dry_run);
}
|
2018-09-04 13:45:39 +00:00
|
|
|
|
2020-06-17 10:34:23 +00:00
|
|
|
/// Returns the columns of a MergeTree table that ALTER UPDATE is not allowed to modify:
/// the columns required by the partition key and the sorting key, plus the sign and version
/// columns from the merging params when they are set. Non-MergeTree storages yield an empty set.
static NameSet getKeyColumns(const StoragePtr & storage, const StorageMetadataPtr & metadata_snapshot)
{
    const auto * merge_tree = dynamic_cast<const MergeTreeData *>(storage.get());
    if (merge_tree == nullptr)
        return {};

    const auto & partition_key_columns = metadata_snapshot->getColumnsRequiredForPartitionKey();
    NameSet result(partition_key_columns.begin(), partition_key_columns.end());

    const auto & sorting_key_columns = metadata_snapshot->getColumnsRequiredForSortingKey();
    result.insert(sorting_key_columns.begin(), sorting_key_columns.end());

    /// We don't process sample_by_ast separately because it must be among the primary key columns.

    const auto & params = merge_tree->merging_params;
    if (!params.sign_column.empty())
        result.insert(params.sign_column);
    if (!params.version_column.empty())
        result.insert(params.version_column);

    return result;
}
|
|
|
|
|
|
|
|
static void validateUpdateColumns(
|
2020-06-17 10:34:23 +00:00
|
|
|
const StoragePtr & storage,
|
|
|
|
const StorageMetadataPtr & metadata_snapshot, const NameSet & updated_columns,
|
2018-09-07 15:44:51 +00:00
|
|
|
const std::unordered_map<String, Names> & column_to_affected_materialized)
|
|
|
|
{
|
2020-06-17 10:34:23 +00:00
|
|
|
NameSet key_columns = getKeyColumns(storage, metadata_snapshot);
|
2018-09-07 15:44:51 +00:00
|
|
|
|
|
|
|
for (const String & column_name : updated_columns)
|
|
|
|
{
|
|
|
|
auto found = false;
|
2020-06-17 10:34:23 +00:00
|
|
|
for (const auto & col : metadata_snapshot->getColumns().getOrdinary())
|
2018-09-04 13:45:39 +00:00
|
|
|
{
|
2018-09-07 15:44:51 +00:00
|
|
|
if (col.name == column_name)
|
|
|
|
{
|
|
|
|
found = true;
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
2018-09-04 13:45:39 +00:00
|
|
|
|
2018-09-07 15:44:51 +00:00
|
|
|
if (!found)
|
|
|
|
{
|
2020-06-17 10:34:23 +00:00
|
|
|
for (const auto & col : metadata_snapshot->getColumns().getMaterialized())
|
2018-09-04 13:45:39 +00:00
|
|
|
{
|
|
|
|
if (col.name == column_name)
|
2019-06-15 12:06:22 +00:00
|
|
|
throw Exception("Cannot UPDATE materialized column " + backQuote(column_name), ErrorCodes::CANNOT_UPDATE_COLUMN);
|
2018-09-04 13:45:39 +00:00
|
|
|
}
|
|
|
|
|
2019-06-15 12:06:22 +00:00
|
|
|
throw Exception("There is no column " + backQuote(column_name) + " in table", ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
|
2018-09-07 15:44:51 +00:00
|
|
|
}
|
2018-09-04 13:45:39 +00:00
|
|
|
|
2018-09-07 15:44:51 +00:00
|
|
|
if (key_columns.count(column_name))
|
2019-06-15 12:06:22 +00:00
|
|
|
throw Exception("Cannot UPDATE key column " + backQuote(column_name), ErrorCodes::CANNOT_UPDATE_COLUMN);
|
2018-09-07 15:44:51 +00:00
|
|
|
|
|
|
|
auto materialized_it = column_to_affected_materialized.find(column_name);
|
|
|
|
if (materialized_it != column_to_affected_materialized.end())
|
|
|
|
{
|
|
|
|
for (const String & materialized : materialized_it->second)
|
|
|
|
{
|
|
|
|
if (key_columns.count(materialized))
|
2019-06-15 12:06:22 +00:00
|
|
|
throw Exception("Updated column " + backQuote(column_name) + " affects MATERIALIZED column "
|
|
|
|
+ backQuote(materialized) + ", which is a key column. Cannot UPDATE it.",
|
2018-09-07 15:44:51 +00:00
|
|
|
ErrorCodes::CANNOT_UPDATE_COLUMN);
|
2018-09-04 13:45:39 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-04-16 14:19:22 +00:00
|
|
|
/// Returns ASTs of updated nested subcolumns, if all of subcolumns were updated.
|
|
|
|
/// They are used to validate sizes of nested arrays.
|
|
|
|
/// If some of subcolumns were updated and some weren't,
|
|
|
|
/// it makes sense to validate only updated columns with their old versions,
|
|
|
|
/// because their sizes couldn't change, since sizes of all nested subcolumns must be consistent.
|
|
|
|
static std::optional<std::vector<ASTPtr>> getExpressionsOfUpdatedNestedSubcolumns(
|
|
|
|
const String & column_name,
|
|
|
|
const NamesAndTypesList & all_columns,
|
|
|
|
const std::unordered_map<String, ASTPtr> & column_to_update_expression)
|
2021-04-02 11:31:33 +00:00
|
|
|
{
|
|
|
|
std::vector<ASTPtr> res;
|
2021-04-16 14:19:22 +00:00
|
|
|
auto source_name = Nested::splitName(column_name).first;
|
2021-04-02 11:31:33 +00:00
|
|
|
|
2021-04-13 09:31:49 +00:00
|
|
|
/// Check this nested subcolumn
|
2021-04-16 14:19:22 +00:00
|
|
|
for (const auto & column : all_columns)
|
2021-04-13 09:31:49 +00:00
|
|
|
{
|
2021-04-16 14:19:22 +00:00
|
|
|
auto split = Nested::splitName(column.name);
|
|
|
|
if (isArray(column.type) && split.first == source_name && !split.second.empty())
|
2021-04-02 11:31:33 +00:00
|
|
|
{
|
2021-04-16 14:19:22 +00:00
|
|
|
auto it = column_to_update_expression.find(column.name);
|
|
|
|
if (it == column_to_update_expression.end())
|
|
|
|
return {};
|
|
|
|
|
|
|
|
res.push_back(it->second);
|
2021-04-02 11:31:33 +00:00
|
|
|
}
|
2021-04-13 09:31:49 +00:00
|
|
|
}
|
2021-04-02 11:31:33 +00:00
|
|
|
|
2021-04-16 14:19:22 +00:00
|
|
|
return res;
|
2021-04-02 11:31:33 +00:00
|
|
|
}
|
2018-09-04 13:45:39 +00:00
|
|
|
|
2019-08-05 03:24:59 +00:00
|
|
|
ASTPtr MutationsInterpreter::prepare(bool dry_run)
|
2018-09-03 13:36:58 +00:00
|
|
|
{
|
|
|
|
if (is_prepared)
|
2018-09-04 13:45:39 +00:00
|
|
|
throw Exception("MutationsInterpreter is already prepared. It is a bug.", ErrorCodes::LOGICAL_ERROR);
|
2018-09-03 13:36:58 +00:00
|
|
|
|
|
|
|
if (commands.empty())
|
|
|
|
throw Exception("Empty mutation commands list", ErrorCodes::LOGICAL_ERROR);
|
|
|
|
|
2020-01-15 13:00:08 +00:00
|
|
|
|
2020-06-17 09:38:47 +00:00
|
|
|
const ColumnsDescription & columns_desc = metadata_snapshot->getColumns();
|
|
|
|
const IndicesDescription & indices_desc = metadata_snapshot->getSecondaryIndices();
|
2018-09-07 15:44:51 +00:00
|
|
|
NamesAndTypesList all_columns = columns_desc.getAllPhysical();
|
|
|
|
|
|
|
|
NameSet updated_columns;
|
|
|
|
for (const MutationCommand & command : commands)
|
|
|
|
{
|
|
|
|
for (const auto & kv : command.column_to_update_expression)
|
2020-01-15 13:00:08 +00:00
|
|
|
{
|
2018-09-07 15:44:51 +00:00
|
|
|
updated_columns.insert(kv.first);
|
2020-01-15 13:00:08 +00:00
|
|
|
}
|
2018-09-07 15:44:51 +00:00
|
|
|
}
|
|
|
|
|
2019-04-17 17:07:07 +00:00
|
|
|
/// We need to know which columns affect which MATERIALIZED columns and data skipping indices
|
|
|
|
/// to recalculate them if dependencies are updated.
|
2018-09-07 15:44:51 +00:00
|
|
|
std::unordered_map<String, Names> column_to_affected_materialized;
|
|
|
|
if (!updated_columns.empty())
|
|
|
|
{
|
2019-03-14 15:20:51 +00:00
|
|
|
for (const auto & column : columns_desc)
|
2018-09-07 15:44:51 +00:00
|
|
|
{
|
2019-03-14 15:20:51 +00:00
|
|
|
if (column.default_desc.kind == ColumnDefaultKind::Materialized)
|
2018-09-07 15:44:51 +00:00
|
|
|
{
|
2019-03-14 15:20:51 +00:00
|
|
|
auto query = column.default_desc.expression->clone();
|
2020-07-22 17:13:05 +00:00
|
|
|
auto syntax_result = TreeRewriter(context).analyze(query, all_columns);
|
2019-08-09 14:50:04 +00:00
|
|
|
for (const String & dependency : syntax_result->requiredSourceColumns())
|
2018-09-07 15:44:51 +00:00
|
|
|
{
|
|
|
|
if (updated_columns.count(dependency))
|
2019-03-14 15:20:51 +00:00
|
|
|
column_to_affected_materialized[dependency].push_back(column.name);
|
2018-09-07 15:44:51 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2018-09-04 13:45:39 +00:00
|
|
|
|
2020-06-17 10:34:23 +00:00
|
|
|
validateUpdateColumns(storage, metadata_snapshot, updated_columns, column_to_affected_materialized);
|
2019-01-09 15:44:20 +00:00
|
|
|
}
|
2018-09-03 13:36:58 +00:00
|
|
|
|
2020-02-18 19:03:40 +00:00
|
|
|
/// Columns, that we need to read for calculation of skip indices or TTL expressions.
|
2020-06-16 12:19:21 +00:00
|
|
|
auto dependencies = getAllColumnDependencies(metadata_snapshot, updated_columns);
|
2020-02-17 20:39:24 +00:00
|
|
|
|
2018-09-03 13:36:58 +00:00
|
|
|
/// First, break a sequence of commands into stages.
|
2021-04-13 09:31:49 +00:00
|
|
|
for (auto & command : commands)
|
2018-09-03 13:36:58 +00:00
|
|
|
{
|
|
|
|
if (command.type == MutationCommand::DELETE)
|
2018-09-07 15:44:51 +00:00
|
|
|
{
|
2019-08-05 18:06:05 +00:00
|
|
|
if (stages.empty() || !stages.back().column_to_updated.empty())
|
|
|
|
stages.emplace_back(context);
|
|
|
|
|
2020-11-10 10:23:46 +00:00
|
|
|
auto negated_predicate = makeASTFunction("isZeroOrNull", getPartitionAndPredicateExpressionForMutationCommand(command));
|
2018-09-07 15:44:51 +00:00
|
|
|
stages.back().filters.push_back(negated_predicate);
|
|
|
|
}
|
2018-09-03 13:36:58 +00:00
|
|
|
else if (command.type == MutationCommand::UPDATE)
|
|
|
|
{
|
2019-08-05 18:06:05 +00:00
|
|
|
if (stages.empty() || !stages.back().column_to_updated.empty())
|
|
|
|
stages.emplace_back(context);
|
2018-09-07 15:44:51 +00:00
|
|
|
if (stages.size() == 1) /// First stage only supports filtering and can't update columns.
|
2018-09-07 19:14:05 +00:00
|
|
|
stages.emplace_back(context);
|
2018-09-03 13:36:58 +00:00
|
|
|
|
2018-09-07 15:44:51 +00:00
|
|
|
NameSet affected_materialized;
|
|
|
|
|
|
|
|
for (const auto & kv : command.column_to_update_expression)
|
|
|
|
{
|
|
|
|
const String & column = kv.first;
|
|
|
|
|
|
|
|
auto materialized_it = column_to_affected_materialized.find(column);
|
|
|
|
if (materialized_it != column_to_affected_materialized.end())
|
|
|
|
{
|
|
|
|
for (const String & mat_column : materialized_it->second)
|
|
|
|
affected_materialized.emplace(mat_column);
|
|
|
|
}
|
|
|
|
|
2020-07-21 13:31:59 +00:00
|
|
|
/// When doing UPDATE column = expression WHERE condition
|
|
|
|
/// we will replace column to the result of the following expression:
|
|
|
|
///
|
|
|
|
/// CAST(if(condition, CAST(expression, type), column), type)
|
|
|
|
///
|
|
|
|
/// Inner CAST is needed to make 'if' work when branches have no common type,
|
|
|
|
/// example: type is UInt64, UPDATE x = -1 or UPDATE x = x - 1.
|
|
|
|
///
|
|
|
|
/// Outer CAST is added just in case if we don't trust the returning type of 'if'.
|
|
|
|
|
2021-04-16 14:19:22 +00:00
|
|
|
const auto & type = columns_desc.getPhysical(column).type;
|
|
|
|
auto type_literal = std::make_shared<ASTLiteral>(type->getName());
|
2020-07-21 13:31:59 +00:00
|
|
|
|
2018-09-07 15:44:51 +00:00
|
|
|
const auto & update_expr = kv.second;
|
2021-04-02 11:31:33 +00:00
|
|
|
|
|
|
|
ASTPtr condition = getPartitionAndPredicateExpressionForMutationCommand(command);
|
|
|
|
|
|
|
|
/// And new check validateNestedArraySizes for Nested subcolumns
|
2021-04-16 14:19:22 +00:00
|
|
|
if (isArray(type) && !Nested::splitName(column).second.empty())
|
2021-04-02 11:31:33 +00:00
|
|
|
{
|
|
|
|
std::shared_ptr<ASTFunction> function = nullptr;
|
|
|
|
|
2021-04-16 14:19:22 +00:00
|
|
|
auto nested_update_exprs = getExpressionsOfUpdatedNestedSubcolumns(column, all_columns, command.column_to_update_expression);
|
|
|
|
if (!nested_update_exprs)
|
2021-04-02 11:31:33 +00:00
|
|
|
{
|
|
|
|
function = makeASTFunction("validateNestedArraySizes",
|
|
|
|
condition,
|
|
|
|
update_expr->clone(),
|
|
|
|
std::make_shared<ASTIdentifier>(column));
|
2021-04-07 02:39:01 +00:00
|
|
|
condition = makeASTFunction("and", condition, function);
|
2021-04-02 11:31:33 +00:00
|
|
|
}
|
2021-04-16 14:19:22 +00:00
|
|
|
else if (nested_update_exprs->size() > 1)
|
2021-04-02 11:31:33 +00:00
|
|
|
{
|
|
|
|
function = std::make_shared<ASTFunction>();
|
|
|
|
function->name = "validateNestedArraySizes";
|
|
|
|
function->arguments = std::make_shared<ASTExpressionList>();
|
|
|
|
function->children.push_back(function->arguments);
|
|
|
|
function->arguments->children.push_back(condition);
|
2021-04-16 14:19:22 +00:00
|
|
|
for (const auto & it : *nested_update_exprs)
|
2021-04-02 11:31:33 +00:00
|
|
|
function->arguments->children.push_back(it->clone());
|
2021-04-07 02:39:01 +00:00
|
|
|
condition = makeASTFunction("and", condition, function);
|
2021-04-02 11:31:33 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-02-14 11:09:36 +00:00
|
|
|
auto updated_column = makeASTFunction("CAST",
|
2018-09-07 15:44:51 +00:00
|
|
|
makeASTFunction("if",
|
2021-04-02 11:31:33 +00:00
|
|
|
condition,
|
2021-02-14 11:09:36 +00:00
|
|
|
makeASTFunction("CAST",
|
2020-07-21 13:31:59 +00:00
|
|
|
update_expr->clone(),
|
|
|
|
type_literal),
|
2018-09-07 15:44:51 +00:00
|
|
|
std::make_shared<ASTIdentifier>(column)),
|
2020-07-21 13:31:59 +00:00
|
|
|
type_literal);
|
|
|
|
|
2018-09-07 15:44:51 +00:00
|
|
|
stages.back().column_to_updated.emplace(column, updated_column);
|
|
|
|
}
|
|
|
|
|
|
|
|
if (!affected_materialized.empty())
|
|
|
|
{
|
2018-09-07 19:14:05 +00:00
|
|
|
stages.emplace_back(context);
|
2019-03-14 15:20:51 +00:00
|
|
|
for (const auto & column : columns_desc)
|
2018-09-07 15:44:51 +00:00
|
|
|
{
|
2019-03-14 15:20:51 +00:00
|
|
|
if (column.default_desc.kind == ColumnDefaultKind::Materialized)
|
|
|
|
{
|
|
|
|
stages.back().column_to_updated.emplace(
|
|
|
|
column.name,
|
|
|
|
column.default_desc.expression->clone());
|
|
|
|
}
|
2018-09-07 15:44:51 +00:00
|
|
|
}
|
|
|
|
}
|
2018-09-03 13:36:58 +00:00
|
|
|
}
|
2019-05-05 17:01:54 +00:00
|
|
|
else if (command.type == MutationCommand::MATERIALIZE_INDEX)
|
|
|
|
{
|
|
|
|
auto it = std::find_if(
|
2020-05-28 12:47:17 +00:00
|
|
|
std::cbegin(indices_desc), std::end(indices_desc),
|
2020-05-28 13:09:03 +00:00
|
|
|
[&](const IndexDescription & index)
|
2019-05-05 17:01:54 +00:00
|
|
|
{
|
2020-05-28 12:37:05 +00:00
|
|
|
return index.name == command.index_name;
|
2019-05-05 17:01:54 +00:00
|
|
|
});
|
2020-05-28 12:47:17 +00:00
|
|
|
if (it == std::cend(indices_desc))
|
2019-05-05 17:01:54 +00:00
|
|
|
throw Exception("Unknown index: " + command.index_name, ErrorCodes::BAD_ARGUMENTS);
|
|
|
|
|
2020-05-28 12:37:05 +00:00
|
|
|
auto query = (*it).expression_list_ast->clone();
|
2020-07-22 17:13:05 +00:00
|
|
|
auto syntax_result = TreeRewriter(context).analyze(query, all_columns);
|
2019-08-15 17:46:35 +00:00
|
|
|
const auto required_columns = syntax_result->requiredSourceColumns();
|
2020-02-17 20:39:24 +00:00
|
|
|
for (const auto & column : required_columns)
|
|
|
|
dependencies.emplace(column, ColumnDependency::SKIP_INDEX);
|
2019-05-05 17:01:54 +00:00
|
|
|
}
|
2020-01-22 13:24:20 +00:00
|
|
|
else if (command.type == MutationCommand::MATERIALIZE_TTL)
|
|
|
|
{
|
2020-06-17 13:39:26 +00:00
|
|
|
if (metadata_snapshot->hasRowsTTL())
|
2020-02-18 19:03:40 +00:00
|
|
|
{
|
|
|
|
for (const auto & column : all_columns)
|
|
|
|
dependencies.emplace(column.name, ColumnDependency::TTL_TARGET);
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
NameSet new_updated_columns;
|
2020-06-17 16:39:58 +00:00
|
|
|
auto column_ttls = metadata_snapshot->getColumns().getColumnTTLs();
|
2020-02-18 19:03:40 +00:00
|
|
|
for (const auto & elem : column_ttls)
|
|
|
|
{
|
|
|
|
dependencies.emplace(elem.first, ColumnDependency::TTL_TARGET);
|
|
|
|
new_updated_columns.insert(elem.first);
|
|
|
|
}
|
2020-01-22 13:24:20 +00:00
|
|
|
|
2020-02-18 19:03:40 +00:00
|
|
|
auto all_columns_vec = all_columns.getNames();
|
2020-06-16 12:19:21 +00:00
|
|
|
auto all_dependencies = getAllColumnDependencies(metadata_snapshot, NameSet(all_columns_vec.begin(), all_columns_vec.end()));
|
2020-01-22 13:24:20 +00:00
|
|
|
|
2020-02-18 19:03:40 +00:00
|
|
|
for (const auto & dependency : all_dependencies)
|
|
|
|
{
|
|
|
|
if (dependency.kind == ColumnDependency::TTL_EXPRESSION)
|
|
|
|
dependencies.insert(dependency);
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Recalc only skip indices of columns, that could be updated by TTL.
|
2020-06-16 12:19:21 +00:00
|
|
|
auto new_dependencies = metadata_snapshot->getColumnDependencies(new_updated_columns);
|
2020-02-18 19:03:40 +00:00
|
|
|
for (const auto & dependency : new_dependencies)
|
|
|
|
{
|
|
|
|
if (dependency.kind == ColumnDependency::SKIP_INDEX)
|
|
|
|
dependencies.insert(dependency);
|
|
|
|
}
|
2020-02-19 17:18:12 +00:00
|
|
|
|
|
|
|
if (dependencies.empty())
|
|
|
|
{
|
|
|
|
/// Very rare case. It can happen if we have only one MOVE TTL with constant expression.
|
|
|
|
/// But we still have to read at least one column.
|
|
|
|
dependencies.emplace(all_columns.front().name, ColumnDependency::TTL_EXPRESSION);
|
|
|
|
}
|
2020-02-18 19:03:40 +00:00
|
|
|
}
|
2019-05-05 17:01:54 +00:00
|
|
|
}
|
2020-01-17 13:54:22 +00:00
|
|
|
else if (command.type == MutationCommand::READ_COLUMN)
|
2020-01-13 16:39:20 +00:00
|
|
|
{
|
2020-01-15 13:00:08 +00:00
|
|
|
if (stages.empty() || !stages.back().column_to_updated.empty())
|
|
|
|
stages.emplace_back(context);
|
|
|
|
if (stages.size() == 1) /// First stage only supports filtering and can't update columns.
|
|
|
|
stages.emplace_back(context);
|
|
|
|
|
2020-01-17 13:54:22 +00:00
|
|
|
stages.back().column_to_updated.emplace(command.column_name, std::make_shared<ASTIdentifier>(command.column_name));
|
2020-01-13 16:39:20 +00:00
|
|
|
}
|
2018-09-03 13:36:58 +00:00
|
|
|
else
|
|
|
|
throw Exception("Unknown mutation command type: " + DB::toString<int>(command.type), ErrorCodes::UNKNOWN_MUTATION_COMMAND);
|
|
|
|
}
|
|
|
|
|
2020-07-21 17:48:39 +00:00
|
|
|
/// We care about affected indices because we also need to rewrite them
|
2020-02-17 20:39:24 +00:00
|
|
|
/// when one of index columns updated or filtered with delete.
|
2020-08-08 01:01:47 +00:00
|
|
|
/// The same about columns, that are needed for calculation of TTL expressions.
|
2020-02-17 20:39:24 +00:00
|
|
|
if (!dependencies.empty())
|
2019-04-17 17:07:07 +00:00
|
|
|
{
|
2020-02-18 19:03:40 +00:00
|
|
|
NameSet changed_columns;
|
|
|
|
NameSet unchanged_columns;
|
|
|
|
for (const auto & dependency : dependencies)
|
2019-08-05 18:06:05 +00:00
|
|
|
{
|
2020-02-18 19:03:40 +00:00
|
|
|
if (dependency.isReadOnly())
|
|
|
|
unchanged_columns.insert(dependency.column_name);
|
|
|
|
else
|
|
|
|
changed_columns.insert(dependency.column_name);
|
|
|
|
}
|
2019-09-03 05:48:02 +00:00
|
|
|
|
2020-02-18 19:03:40 +00:00
|
|
|
if (!changed_columns.empty())
|
|
|
|
{
|
|
|
|
if (stages.empty() || !stages.back().column_to_updated.empty())
|
|
|
|
stages.emplace_back(context);
|
|
|
|
if (stages.size() == 1) /// First stage only supports filtering and can't update columns.
|
|
|
|
stages.emplace_back(context);
|
2019-09-03 05:48:02 +00:00
|
|
|
|
2020-02-18 19:03:40 +00:00
|
|
|
for (const auto & column : changed_columns)
|
|
|
|
stages.back().column_to_updated.emplace(
|
|
|
|
column, std::make_shared<ASTIdentifier>(column));
|
2019-08-05 18:06:05 +00:00
|
|
|
}
|
2020-02-18 19:03:40 +00:00
|
|
|
|
|
|
|
if (!unchanged_columns.empty())
|
2020-01-17 14:39:18 +00:00
|
|
|
{
|
2020-02-18 19:03:40 +00:00
|
|
|
if (!stages.empty())
|
|
|
|
{
|
|
|
|
std::vector<Stage> stages_copy;
|
|
|
|
/// Copy all filled stages except index calculation stage.
|
|
|
|
for (const auto & stage : stages)
|
|
|
|
{
|
|
|
|
stages_copy.emplace_back(context);
|
|
|
|
stages_copy.back().column_to_updated = stage.column_to_updated;
|
|
|
|
stages_copy.back().output_columns = stage.output_columns;
|
|
|
|
stages_copy.back().filters = stage.filters;
|
|
|
|
}
|
|
|
|
|
|
|
|
const ASTPtr select_query = prepareInterpreterSelectQuery(stages_copy, /* dry_run = */ true);
|
2020-07-06 01:57:10 +00:00
|
|
|
InterpreterSelectQuery interpreter{
|
|
|
|
select_query, context, storage, metadata_snapshot,
|
|
|
|
SelectQueryOptions().analyze(/* dry_run = */ false).ignoreLimits()};
|
2020-02-18 19:03:40 +00:00
|
|
|
|
|
|
|
auto first_stage_header = interpreter.getSampleBlock();
|
2020-09-15 17:13:13 +00:00
|
|
|
QueryPlan plan;
|
|
|
|
auto source = std::make_shared<NullSource>(first_stage_header);
|
|
|
|
plan.addStep(std::make_unique<ReadFromPreparedSource>(Pipe(std::move(source))));
|
|
|
|
auto pipeline = addStreamsForLaterStages(stages_copy, plan);
|
|
|
|
updated_header = std::make_unique<Block>(pipeline->getHeader());
|
2020-02-18 19:03:40 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
/// Special step to recalculate affected indices and TTL expressions.
|
|
|
|
stages.emplace_back(context);
|
|
|
|
for (const auto & column : unchanged_columns)
|
2020-02-17 20:39:24 +00:00
|
|
|
stages.back().column_to_updated.emplace(
|
2019-04-18 15:41:07 +00:00
|
|
|
column, std::make_shared<ASTIdentifier>(column));
|
2020-01-17 14:39:18 +00:00
|
|
|
}
|
2019-04-17 17:07:07 +00:00
|
|
|
}
|
|
|
|
|
2019-08-05 18:06:05 +00:00
|
|
|
is_prepared = true;
|
2019-08-05 03:24:59 +00:00
|
|
|
|
|
|
|
return prepareInterpreterSelectQuery(stages, dry_run);
|
2019-08-05 18:06:05 +00:00
|
|
|
}
|
|
|
|
|
2019-12-12 16:24:03 +00:00
|
|
|
/// Analyze `prepared_stages` and return the SELECT query that executes the first stage.
///
/// Stage 0 becomes a plain `SELECT <output columns> [WHERE <filters combined with and()>]`.
/// Every later stage gets an ExpressionAnalyzer and an `expressions_chain` made of one step per
/// filter, then one step for all column updates (if any), then a final projection step that keeps
/// only the stage's output columns. addStreamsForLaterStages() turns those steps into plan steps.
///
/// @param prepared_stages  Stages to analyze. `output_columns`, `analyzer`, `expressions_chain`
///                         and `filter_column_names` are filled in place.
/// @param dry_run          Forwarded to ExpressionAnalyzer::appendExpression.
/// @return The SELECT query for the first stage.
/// @throws Exception(LOGICAL_ERROR) if `prepared_stages` is empty.
ASTPtr MutationsInterpreter::prepareInterpreterSelectQuery(std::vector<Stage> & prepared_stages, bool dry_run)
{
    /// The backward loop below starts at `size() - 1` (unsigned) and the SELECT construction reads
    /// prepared_stages[0]; with no stages both would be out of bounds.
    if (prepared_stages.empty())
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Mutation interpreter has no stages to prepare");

    NamesAndTypesList all_columns = metadata_snapshot->getColumns().getAllPhysical();

    /// Next, for each stage calculate columns changed by this and previous stages.
    for (size_t i = 0; i < prepared_stages.size(); ++i)
    {
        if (!prepared_stages[i].filters.empty())
        {
            /// A filtering stage passes every physical column through.
            for (const auto & column : all_columns)
                prepared_stages[i].output_columns.insert(column.name);
            continue;
        }

        if (i > 0)
            prepared_stages[i].output_columns = prepared_stages[i - 1].output_columns;

        if (prepared_stages[i].output_columns.size() < all_columns.size())
        {
            for (const auto & kv : prepared_stages[i].column_to_updated)
                prepared_stages[i].output_columns.insert(kv.first);
        }
    }

    /// Now, calculate `expressions_chain` for each stage except the first.
    /// Do it backwards to propagate information about columns required as input for a stage to the previous stage.
    for (size_t i = prepared_stages.size() - 1; i > 0; --i)
    {
        auto & stage = prepared_stages[i];

        ASTPtr all_asts = std::make_shared<ASTExpressionList>();

        for (const auto & ast : stage.filters)
            all_asts->children.push_back(ast);

        for (const auto & kv : stage.column_to_updated)
            all_asts->children.push_back(kv.second);

        /// Add all output columns to prevent ExpressionAnalyzer from deleting them from source columns.
        for (const String & column : stage.output_columns)
            all_asts->children.push_back(std::make_shared<ASTIdentifier>(column));

        auto syntax_result = TreeRewriter(context).analyze(all_asts, all_columns, storage, metadata_snapshot);
        if (context->hasQueryContext())
            for (const auto & it : syntax_result->getScalars())
                context->getQueryContext()->addScalar(it.first, it.second);

        stage.analyzer = std::make_unique<ExpressionAnalyzer>(all_asts, syntax_result, context);

        ExpressionActionsChain & actions_chain = stage.expressions_chain;

        /// One chain step per filter; filter_column_names[k] names the result column of step k.
        for (const auto & ast : stage.filters)
        {
            if (!actions_chain.steps.empty())
                actions_chain.addStep();
            stage.analyzer->appendExpression(actions_chain, ast, dry_run);
            stage.filter_column_names.push_back(ast->getColumnName());
        }

        if (!stage.column_to_updated.empty())
        {
            if (!actions_chain.steps.empty())
                actions_chain.addStep();

            for (const auto & kv : stage.column_to_updated)
                stage.analyzer->appendExpression(actions_chain, kv.second, dry_run);

            auto & actions = actions_chain.getLastStep().actions();

            /// Expose each update expression's result under the name of the column it updates.
            for (const auto & kv : stage.column_to_updated)
            {
                auto column_name = kv.second->getColumnName();
                const auto & dag_node = actions->findInIndex(column_name);
                const auto & alias = actions->addAlias(dag_node, kv.first);
                actions->addOrReplaceInIndex(alias);
            }
        }

        /// Remove all intermediate columns.
        actions_chain.addStep();
        actions_chain.getLastStep().required_output.clear();
        for (const auto & name : stage.output_columns)
            actions_chain.getLastStep().addRequiredOutput(name);

        /// NOTE(review): the returned actions are discarded. This looks like a leftover; confirm it
        /// has no needed side effect before removing it.
        actions_chain.getLastActions();

        actions_chain.finalize();

        /// Propagate information about columns needed as input.
        for (const auto & column : actions_chain.steps.front()->getRequiredColumns())
            prepared_stages[i - 1].output_columns.insert(column.name);
    }

    /// Execute first stage as a SELECT statement.

    auto select = std::make_shared<ASTSelectQuery>();

    select->setExpression(ASTSelectQuery::Expression::SELECT, std::make_shared<ASTExpressionList>());
    for (const auto & column_name : prepared_stages[0].output_columns)
        select->select()->children.push_back(std::make_shared<ASTIdentifier>(column_name));

    /// Don't let select list be empty.
    if (select->select()->children.empty())
        select->select()->children.push_back(std::make_shared<ASTLiteral>(Field(0)));

    if (!prepared_stages[0].filters.empty())
    {
        ASTPtr where_expression;
        if (prepared_stages[0].filters.size() == 1)
            where_expression = prepared_stages[0].filters[0];
        else
        {
            /// Several filters: combine them into a single and(...) call.
            auto coalesced_predicates = std::make_shared<ASTFunction>();
            coalesced_predicates->name = "and";
            coalesced_predicates->arguments = std::make_shared<ASTExpressionList>();
            coalesced_predicates->children.push_back(coalesced_predicates->arguments);
            coalesced_predicates->arguments->children = prepared_stages[0].filters;
            where_expression = std::move(coalesced_predicates);
        }
        select->setExpression(ASTSelectQuery::Expression::WHERE, std::move(where_expression));
    }

    return select;
}
|
|
|
|
|
2020-09-15 17:13:13 +00:00
|
|
|
/// Append plan steps for every stage after the first to `plan`, then build the pipeline.
///
/// For each later stage, the expression-chain steps are converted in order:
/// - the first `filter_column_names.size()` steps become FilterSteps (DELETEs);
/// - the remaining steps become ExpressionSteps (UPDATEs and the final projection).
/// If the stage's analyzer collected subqueries for sets, a creating-sets step is added after that
/// stage's steps, bounded by the transfer limits from the context settings. Finally a
/// MaterializingTransform is appended to every stream of the built pipeline.
///
/// NOTE(review): the subqueries are passed with std::move out of the analyzer, so a later call on
/// the same stages presumably sees them empty. Confirm callers rely on this only once per stage set.
QueryPipelinePtr MutationsInterpreter::addStreamsForLaterStages(const std::vector<Stage> & prepared_stages, QueryPlan & plan) const
{
    for (size_t stage_index = 1; stage_index < prepared_stages.size(); ++stage_index)
    {
        const Stage & current = prepared_stages[stage_index];
        const auto & chain_steps = current.expressions_chain.steps;
        const size_t filter_count = current.filter_column_names.size();

        for (size_t step_index = 0; step_index < chain_steps.size(); ++step_index)
        {
            const auto & chain_step = chain_steps[step_index];
            const bool is_filter_step = step_index < filter_count;

            if (is_filter_step)
            {
                /// DELETE: keep only rows where the filter column holds.
                plan.addStep(std::make_unique<FilterStep>(
                    plan.getCurrentDataStream(), chain_step->actions(), current.filter_column_names[step_index], false));
            }
            else
            {
                /// UPDATE assignments or the final projection.
                plan.addStep(std::make_unique<ExpressionStep>(plan.getCurrentDataStream(), chain_step->actions()));
            }
        }

        SubqueriesForSets & pending_sets = current.analyzer->getSubqueriesForSets();
        if (pending_sets.empty())
            continue;

        const Settings & query_settings = context->getSettingsRef();
        SizeLimits transfer_limits(
            query_settings.max_rows_to_transfer, query_settings.max_bytes_to_transfer, query_settings.transfer_overflow_mode);
        addCreatingSetsStep(plan, std::move(pending_sets), transfer_limits, context);
    }

    auto built_pipeline = plan.buildQueryPipeline(
        QueryPlanOptimizationSettings::fromContext(context),
        BuildQueryPipelineSettings::fromContext(context));

    built_pipeline->addSimpleTransform([&](const Block & header)
    {
        return std::make_shared<MaterializingTransform>(header);
    });

    return built_pipeline;
}
|
|
|
|
|
2020-06-18 16:10:47 +00:00
|
|
|
void MutationsInterpreter::validate()
|
2018-09-04 13:45:39 +00:00
|
|
|
{
|
2021-03-05 12:32:55 +00:00
|
|
|
if (!select_interpreter)
|
|
|
|
select_interpreter = std::make_unique<InterpreterSelectQuery>(mutation_ast, context, storage, metadata_snapshot, select_limits);
|
|
|
|
|
2021-04-10 23:33:54 +00:00
|
|
|
const Settings & settings = context->getSettingsRef();
|
2020-04-10 20:01:10 +00:00
|
|
|
|
2019-10-10 14:38:08 +00:00
|
|
|
/// For Replicated* storages mutations cannot employ non-deterministic functions
|
|
|
|
/// because that produces inconsistencies between replicas
|
2020-04-10 20:01:10 +00:00
|
|
|
if (startsWith(storage->getName(), "Replicated") && !settings.allow_nondeterministic_mutations)
|
2019-10-10 14:38:08 +00:00
|
|
|
{
|
|
|
|
for (const auto & command : commands)
|
|
|
|
{
|
2020-07-06 02:07:55 +00:00
|
|
|
const auto nondeterministic_func_name = findFirstNonDeterministicFunctionName(command, context);
|
2019-10-10 14:38:08 +00:00
|
|
|
if (nondeterministic_func_name)
|
|
|
|
throw Exception(
|
|
|
|
"ALTER UPDATE/ALTER DELETE statements must use only deterministic functions! "
|
|
|
|
"Function '" + *nondeterministic_func_name + "' is non-deterministic",
|
|
|
|
ErrorCodes::BAD_ARGUMENTS);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-09-15 17:13:13 +00:00
|
|
|
QueryPlan plan;
|
|
|
|
select_interpreter->buildQueryPlan(plan);
|
2020-09-16 08:58:27 +00:00
|
|
|
auto pipeline = addStreamsForLaterStages(stages, plan);
|
2018-09-04 13:45:39 +00:00
|
|
|
}
|
|
|
|
|
2020-06-18 16:10:47 +00:00
|
|
|
/// Build and return the stream of mutated blocks.
///
/// Throws LOGICAL_ERROR when `can_execute` is false and lazily creates the select interpreter.
/// If every sorting-key column is present in the result header, the stream is wrapped in
/// CheckSortedBlockInputStream. If `updated_header` has not been set yet, it is set to the
/// result stream's header.
BlockInputStreamPtr MutationsInterpreter::execute()
{
    if (!can_execute)
        throw Exception("Cannot execute mutations interpreter because can_execute flag set to false", ErrorCodes::LOGICAL_ERROR);

    if (!select_interpreter)
        select_interpreter = std::make_unique<InterpreterSelectQuery>(mutation_ast, context, storage, metadata_snapshot, select_limits);

    QueryPlan execution_plan;
    select_interpreter->buildQueryPlan(execution_plan);

    auto pipeline = addStreamsForLaterStages(stages, execution_plan);
    BlockInputStreamPtr stream = std::make_shared<PipelineExecutingBlockInputStream>(std::move(*pipeline));

    /// Some mutations (an UPDATE, for example) read only part of the columns. When the sorting key
    /// is not fully present in the header there is nothing to check.
    auto sort_description = getStorageSortDescriptionIfPossible(stream->getHeader());
    if (sort_description)
        stream = std::make_shared<CheckSortedBlockInputStream>(stream, *sort_description);

    if (!updated_header)
        updated_header = std::make_unique<Block>(stream->getHeader());

    return stream;
}
|
|
|
|
|
|
|
|
/// Return the header of the blocks produced by this mutation.
///
/// `updated_header` is set either while preparing the index/TTL recalculation stage or by
/// execute(). Calling this before either has happened used to dereference a null pointer.
/// @throws Exception(LOGICAL_ERROR) if the header has not been recorded yet.
const Block & MutationsInterpreter::getUpdatedHeader() const
{
    if (!updated_header)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Updated header of mutation interpreter is not initialized");

    return *updated_header;
}
|
|
|
|
|
2019-08-05 08:36:41 +00:00
|
|
|
|
|
|
|
size_t MutationsInterpreter::evaluateCommandsSize()
|
|
|
|
{
|
2019-09-03 05:48:02 +00:00
|
|
|
for (const MutationCommand & command : commands)
|
2020-11-10 10:23:46 +00:00
|
|
|
if (unlikely(!command.predicate && !command.partition)) /// The command touches all rows.
|
2019-11-06 17:28:03 +00:00
|
|
|
return mutation_ast->size();
|
2019-09-03 05:48:02 +00:00
|
|
|
|
2020-11-10 10:23:46 +00:00
|
|
|
return std::max(prepareQueryAffectedAST(commands, storage, context)->size(), mutation_ast->size());
|
2019-08-05 03:24:59 +00:00
|
|
|
}
|
|
|
|
|
2020-03-26 15:59:09 +00:00
|
|
|
std::optional<SortDescription> MutationsInterpreter::getStorageSortDescriptionIfPossible(const Block & header) const
|
|
|
|
{
|
2020-06-17 11:05:11 +00:00
|
|
|
Names sort_columns = metadata_snapshot->getSortingKeyColumns();
|
2020-03-26 15:59:09 +00:00
|
|
|
SortDescription sort_description;
|
|
|
|
size_t sort_columns_size = sort_columns.size();
|
|
|
|
sort_description.reserve(sort_columns_size);
|
|
|
|
|
|
|
|
for (size_t i = 0; i < sort_columns_size; ++i)
|
|
|
|
{
|
|
|
|
if (header.has(sort_columns[i]))
|
|
|
|
sort_description.emplace_back(header.getPositionByName(sort_columns[i]), 1, 1);
|
|
|
|
else
|
|
|
|
return {};
|
|
|
|
}
|
|
|
|
|
|
|
|
return sort_description;
|
|
|
|
}
|
|
|
|
|
2020-11-10 10:23:46 +00:00
|
|
|
/// Member wrapper that forwards `command`, together with this interpreter's `storage` and
/// `context`, to the namespace-level DB::getPartitionAndPredicateExpressionForMutationCommand.
ASTPtr MutationsInterpreter::getPartitionAndPredicateExpressionForMutationCommand(const MutationCommand & command) const
{
    ASTPtr expression = DB::getPartitionAndPredicateExpressionForMutationCommand(command, storage, context);
    return expression;
}
|
|
|
|
|
2020-07-26 14:21:57 +00:00
|
|
|
bool MutationsInterpreter::Stage::isAffectingAllColumns(const Names & storage_columns) const
|
|
|
|
{
|
|
|
|
/// is subset
|
|
|
|
for (const auto & storage_column : storage_columns)
|
2020-07-27 13:04:13 +00:00
|
|
|
if (!output_columns.count(storage_column))
|
2020-07-26 14:21:57 +00:00
|
|
|
return false;
|
|
|
|
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
|
|
|
bool MutationsInterpreter::isAffectingAllColumns() const
|
|
|
|
{
|
|
|
|
auto storage_columns = metadata_snapshot->getColumns().getNamesOfPhysical();
|
2020-07-27 09:42:37 +00:00
|
|
|
if (stages.empty())
|
|
|
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Mutation interpreter has no stages");
|
|
|
|
|
|
|
|
return stages.back().isAffectingAllColumns(storage_columns);
|
2020-07-26 14:21:57 +00:00
|
|
|
}
|
|
|
|
|
2018-09-03 13:36:58 +00:00
|
|
|
}
|