More safier, but still ugly mutations interpreter

This commit is contained in:
alesapin 2019-11-06 20:28:03 +03:00
parent 233212ac68
commit 8c7deaa70d
8 changed files with 69 additions and 21 deletions

View File

@ -65,7 +65,7 @@ public:
{
/// Choose JSONParser.
#if USE_SIMDJSON
if (context.getSettings().allow_simdjson && Cpu::CpuFlagsCache::have_SSE42 && Cpu::CpuFlagsCache::have_PCLMUL)
if (context.getSettingsRef().allow_simdjson && Cpu::CpuFlagsCache::have_SSE42 && Cpu::CpuFlagsCache::have_PCLMUL)
{
Executor<SimdJSONParser>::run(block, arguments, result_pos, input_rows_count);
return;

View File

@ -75,7 +75,7 @@ BlockIO InterpreterAlterQuery::execute()
if (!mutation_commands.empty())
{
auto table_lock_holder = table->lockStructureForShare(false /* because mutation is executed asyncronously */, context.getCurrentQueryId());
MutationsInterpreter(table, mutation_commands, context).validate(table_lock_holder);
MutationsInterpreter(table, mutation_commands, context, false).validate(table_lock_holder);
table->mutate(mutation_commands, context);
}

View File

@ -91,6 +91,21 @@ std::optional<String> findFirstNonDeterministicFuncName(const MutationCommand &
}
};
MutationsInterpreter::MutationsInterpreter(
StoragePtr storage_,
std::vector<MutationCommand> commands_,
const Context & context_,
bool can_execute_)
: storage(std::move(storage_))
, commands(std::move(commands_))
, context(context_)
, can_execute(can_execute_)
{
mutation_ast = prepare(!can_execute);
auto limits = SelectQueryOptions().analyze(!can_execute).ignoreLimits();
select_interpreter = std::make_unique<InterpreterSelectQuery>(mutation_ast, context, storage, limits);
}
bool MutationsInterpreter::isStorageTouchedByMutations() const
{
@ -103,12 +118,17 @@ bool MutationsInterpreter::isStorageTouchedByMutations() const
return true;
}
auto context_copy = context;
Context context_copy = context;
context_copy.getSettingsRef().merge_tree_uniform_read_distribution = 0;
context_copy.getSettingsRef().max_threads = 1;
const ASTPtr & select_query = prepareQueryAffectedAST();
BlockInputStreamPtr in = InterpreterSelectQuery(select_query, context_copy, storage, SelectQueryOptions().ignoreLimits()).execute().in;
ASTPtr select_query = prepareQueryAffectedAST();
/// Interpreter must be alive, when we use result of execute() method.
/// For some reason it may copy context and and give it into ExpressionBlockInputStream
/// after that we will use context from destroyed stack frame in our stream.
InterpreterSelectQuery interpreter(select_query, context_copy, storage, SelectQueryOptions().ignoreLimits());
BlockInputStreamPtr in = interpreter.execute().in;
Block block = in->read();
if (!block.rows())
@ -520,19 +540,18 @@ void MutationsInterpreter::validate(TableStructureReadLockHolder &)
}
}
const auto & select_query = prepare(/* dry_run = */ true);
InterpreterSelectQuery interpreter{select_query, context, storage, SelectQueryOptions().analyze(/* dry_run = */ true).ignoreLimits()};
/// Do not use getSampleBlock in order to check the whole pipeline.
Block first_stage_header = interpreter.execute().in->getHeader();
Block first_stage_header = select_interpreter->execute().in->getHeader();
BlockInputStreamPtr in = std::make_shared<NullBlockInputStream>(first_stage_header);
addStreamsForLaterStages(stages, in)->getHeader();
}
BlockInputStreamPtr MutationsInterpreter::execute(TableStructureReadLockHolder &)
{
const auto & select_query = prepare(/* dry_run = */ false);
InterpreterSelectQuery interpreter{select_query, context, storage, SelectQueryOptions().analyze(/* dry_run = */ false).ignoreLimits()};
BlockInputStreamPtr in = interpreter.execute().in;
if (!can_execute)
throw Exception("Cannot execute mutations interpreter because can_execute flag set to false", ErrorCodes::LOGICAL_ERROR);
BlockInputStreamPtr in = select_interpreter->execute().in;
auto result_stream = addStreamsForLaterStages(stages, in);
if (!updated_header)
updated_header = std::make_unique<Block>(result_stream->getHeader());
@ -581,9 +600,9 @@ size_t MutationsInterpreter::evaluateCommandsSize()
{
for (const MutationCommand & command : commands)
if (unlikely(!command.predicate)) /// The command touches all rows.
return prepare(/* dry_run = */ true)->size();
return mutation_ast->size();
return std::max(prepareQueryAffectedAST()->size(), prepare(/* dry_run = */ true)->size());
return std::max(prepareQueryAffectedAST()->size(), mutation_ast->size());
}
}

View File

@ -18,12 +18,9 @@ class Context;
class MutationsInterpreter
{
public:
MutationsInterpreter(StoragePtr storage_, std::vector<MutationCommand> commands_, const Context & context_)
: storage(std::move(storage_))
, commands(std::move(commands_))
, context(context_)
{
}
/// Storage to mutate, array of mutations commands and context. If you really want to execute mutation
/// use can_execute = true, in other cases (validation, amount of commands) it can be false
MutationsInterpreter(StoragePtr storage_, std::vector<MutationCommand> commands_, const Context & context_, bool can_execute_);
void validate(TableStructureReadLockHolder & table_lock_holder);
@ -50,6 +47,13 @@ private:
StoragePtr storage;
std::vector<MutationCommand> commands;
const Context & context;
bool can_execute;
ASTPtr mutation_ast;
/// We have to store interpreter because it use own copy of context
/// and some streams from execute method may use it.
std::unique_ptr<InterpreterSelectQuery> select_interpreter;
/// A sequence of mutation commands is executed as a sequence of stages. Each stage consists of several
/// filters, followed by updating values of some columns. Commands can reuse expressions calculated by the

View File

@ -943,7 +943,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
command.partition, context_for_reading);
});
MutationsInterpreter mutations_interpreter(storage_from_source_part, commands_for_part, context_for_reading);
MutationsInterpreter mutations_interpreter(storage_from_source_part, commands_for_part, context_for_reading, true);
if (!mutations_interpreter.isStorageTouchedByMutations())
{

View File

@ -689,7 +689,7 @@ bool StorageMergeTree::tryMutatePart()
size_t current_ast_elements = 0;
for (auto it = mutations_begin_it; it != mutations_end_it; ++it)
{
MutationsInterpreter interpreter(shared_from_this(), it->second.commands, global_context);
MutationsInterpreter interpreter(shared_from_this(), it->second.commands, global_context, false);
size_t commands_size = interpreter.evaluateCommandsSize();
if (current_ast_elements + commands_size >= max_ast_elements)

View File

@ -0,0 +1,23 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
. $CURDIR/mergetree_mutations.lib
${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS json_test"
${CLICKHOUSE_CLIENT} --query="CREATE TABLE json_test (id UInt32, metadata String) ENGINE = MergeTree() ORDER BY id"
${CLICKHOUSE_CLIENT} --query="INSERT INTO json_test VALUES (1, '{\"date\": \"2018-01-01\", \"task_id\": \"billing_history__billing_history.load_history_payments_data__20180101\"}'), (2, '{\"date\": \"2018-01-02\", \"task_id\": \"billing_history__billing_history.load_history_payments_data__20180101\"}')"
${CLICKHOUSE_CLIENT} --query="SELECT COUNT() FROM json_test"
${CLICKHOUSE_CLIENT} --query="ALTER TABLE json_test DELETE WHERE JSONExtractString(metadata, 'date') = '2018-01-01'"
wait_for_mutation "json_test" "mutation_2.txt"
${CLICKHOUSE_CLIENT} --query="SELECT COUNT() FROM json_test"
${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS json_test"