diff --git a/dbms/src/Functions/FunctionsJSON.h b/dbms/src/Functions/FunctionsJSON.h index 08f42dd2b05..85088bed61c 100644 --- a/dbms/src/Functions/FunctionsJSON.h +++ b/dbms/src/Functions/FunctionsJSON.h @@ -65,7 +65,7 @@ public: { /// Choose JSONParser. #if USE_SIMDJSON - if (context.getSettings().allow_simdjson && Cpu::CpuFlagsCache::have_SSE42 && Cpu::CpuFlagsCache::have_PCLMUL) + if (context.getSettingsRef().allow_simdjson && Cpu::CpuFlagsCache::have_SSE42 && Cpu::CpuFlagsCache::have_PCLMUL) { Executor::run(block, arguments, result_pos, input_rows_count); return; diff --git a/dbms/src/Interpreters/InterpreterAlterQuery.cpp b/dbms/src/Interpreters/InterpreterAlterQuery.cpp index 5798e182b69..94d27a7157b 100644 --- a/dbms/src/Interpreters/InterpreterAlterQuery.cpp +++ b/dbms/src/Interpreters/InterpreterAlterQuery.cpp @@ -75,7 +75,7 @@ BlockIO InterpreterAlterQuery::execute() if (!mutation_commands.empty()) { auto table_lock_holder = table->lockStructureForShare(false /* because mutation is executed asyncronously */, context.getCurrentQueryId()); - MutationsInterpreter(table, mutation_commands, context).validate(table_lock_holder); + MutationsInterpreter(table, mutation_commands, context, false).validate(table_lock_holder); table->mutate(mutation_commands, context); } diff --git a/dbms/src/Interpreters/MutationsInterpreter.cpp b/dbms/src/Interpreters/MutationsInterpreter.cpp index 2641ab2a5c4..9c6982d6f22 100644 --- a/dbms/src/Interpreters/MutationsInterpreter.cpp +++ b/dbms/src/Interpreters/MutationsInterpreter.cpp @@ -91,6 +91,21 @@ std::optional findFirstNonDeterministicFuncName(const MutationCommand & } }; +MutationsInterpreter::MutationsInterpreter( + StoragePtr storage_, + std::vector commands_, + const Context & context_, + bool can_execute_) + : storage(std::move(storage_)) + , commands(std::move(commands_)) + , context(context_) + , can_execute(can_execute_) +{ + mutation_ast = prepare(!can_execute); + auto limits = SelectQueryOptions().analyze(!can_execute).ignoreLimits(); + select_interpreter = std::make_unique(mutation_ast, context, storage, limits); +} + bool MutationsInterpreter::isStorageTouchedByMutations() const { @@ -103,12 +118,17 @@ bool MutationsInterpreter::isStorageTouchedByMutations() const return true; } - auto context_copy = context; + Context context_copy = context; context_copy.getSettingsRef().merge_tree_uniform_read_distribution = 0; context_copy.getSettingsRef().max_threads = 1; - const ASTPtr & select_query = prepareQueryAffectedAST(); - BlockInputStreamPtr in = InterpreterSelectQuery(select_query, context_copy, storage, SelectQueryOptions().ignoreLimits()).execute().in; + ASTPtr select_query = prepareQueryAffectedAST(); + + /// Interpreter must be alive, when we use result of execute() method. + /// For some reason it may copy context and and give it into ExpressionBlockInputStream + /// after that we will use context from destroyed stack frame in our stream. + InterpreterSelectQuery interpreter(select_query, context_copy, storage, SelectQueryOptions().ignoreLimits()); + BlockInputStreamPtr in = interpreter.execute().in; Block block = in->read(); if (!block.rows()) @@ -520,19 +540,18 @@ void MutationsInterpreter::validate(TableStructureReadLockHolder &) } } - const auto & select_query = prepare(/* dry_run = */ true); - InterpreterSelectQuery interpreter{select_query, context, storage, SelectQueryOptions().analyze(/* dry_run = */ true).ignoreLimits()}; /// Do not use getSampleBlock in order to check the whole pipeline. - Block first_stage_header = interpreter.execute().in->getHeader(); + Block first_stage_header = select_interpreter->execute().in->getHeader(); BlockInputStreamPtr in = std::make_shared(first_stage_header); addStreamsForLaterStages(stages, in)->getHeader(); } BlockInputStreamPtr MutationsInterpreter::execute(TableStructureReadLockHolder &) { - const auto & select_query = prepare(/* dry_run = */ false); - InterpreterSelectQuery interpreter{select_query, context, storage, SelectQueryOptions().analyze(/* dry_run = */ false).ignoreLimits()}; - BlockInputStreamPtr in = interpreter.execute().in; + if (!can_execute) + throw Exception("Cannot execute mutations interpreter because can_execute flag set to false", ErrorCodes::LOGICAL_ERROR); + + BlockInputStreamPtr in = select_interpreter->execute().in; auto result_stream = addStreamsForLaterStages(stages, in); if (!updated_header) updated_header = std::make_unique(result_stream->getHeader()); @@ -581,9 +600,9 @@ size_t MutationsInterpreter::evaluateCommandsSize() { for (const MutationCommand & command : commands) if (unlikely(!command.predicate)) /// The command touches all rows. - return prepare(/* dry_run = */ true)->size(); + return mutation_ast->size(); - return std::max(prepareQueryAffectedAST()->size(), prepare(/* dry_run = */ true)->size()); + return std::max(prepareQueryAffectedAST()->size(), mutation_ast->size()); } } diff --git a/dbms/src/Interpreters/MutationsInterpreter.h b/dbms/src/Interpreters/MutationsInterpreter.h index 5df5b1ca2f7..ff9a8ddccc3 100644 --- a/dbms/src/Interpreters/MutationsInterpreter.h +++ b/dbms/src/Interpreters/MutationsInterpreter.h @@ -18,12 +18,9 @@ class Context; class MutationsInterpreter { public: - MutationsInterpreter(StoragePtr storage_, std::vector commands_, const Context & context_) - : storage(std::move(storage_)) - , commands(std::move(commands_)) - , context(context_) - { - } + /// Storage to mutate, array of mutations commands and context. If you really want to execute mutation + /// use can_execute = true, in other cases (validation, amount of commands) it can be false + MutationsInterpreter(StoragePtr storage_, std::vector commands_, const Context & context_, bool can_execute_); void validate(TableStructureReadLockHolder & table_lock_holder); @@ -50,6 +47,13 @@ private: StoragePtr storage; std::vector commands; const Context & context; + bool can_execute; + + ASTPtr mutation_ast; + + /// We have to store interpreter because it use own copy of context + /// and some streams from execute method may use it. + std::unique_ptr select_interpreter; /// A sequence of mutation commands is executed as a sequence of stages. Each stage consists of several /// filters, followed by updating values of some columns. Commands can reuse expressions calculated by the diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index a4332f7df56..47a796fbb3c 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -943,7 +943,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor command.partition, context_for_reading); }); - MutationsInterpreter mutations_interpreter(storage_from_source_part, commands_for_part, context_for_reading); + MutationsInterpreter mutations_interpreter(storage_from_source_part, commands_for_part, context_for_reading, true); if (!mutations_interpreter.isStorageTouchedByMutations()) { diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index bb3f70afd79..b9f5f8dfeda 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -689,7 +689,7 @@ bool StorageMergeTree::tryMutatePart() size_t current_ast_elements = 0; for (auto it = mutations_begin_it; it != mutations_end_it; ++it) { - MutationsInterpreter interpreter(shared_from_this(), it->second.commands, global_context); + MutationsInterpreter interpreter(shared_from_this(), it->second.commands, global_context, false); size_t commands_size = interpreter.evaluateCommandsSize(); if (current_ast_elements + commands_size >= max_ast_elements) diff --git a/dbms/tests/queries/0_stateless/01031_mutations_interpreter_and_context.reference b/dbms/tests/queries/0_stateless/01031_mutations_interpreter_and_context.reference new file mode 100644 index 00000000000..5f1d0ecea5d --- /dev/null +++ b/dbms/tests/queries/0_stateless/01031_mutations_interpreter_and_context.reference @@ -0,0 +1,2 @@ +2 +1 diff --git a/dbms/tests/queries/0_stateless/01031_mutations_interpreter_and_context.sh b/dbms/tests/queries/0_stateless/01031_mutations_interpreter_and_context.sh new file mode 100755 index 00000000000..8ac912bfa0c --- /dev/null +++ b/dbms/tests/queries/0_stateless/01031_mutations_interpreter_and_context.sh @@ -0,0 +1,23 @@ +#!/usr/bin/env bash + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +. $CURDIR/../shell_config.sh + +. $CURDIR/mergetree_mutations.lib + + +${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS json_test" + +${CLICKHOUSE_CLIENT} --query="CREATE TABLE json_test (id UInt32, metadata String) ENGINE = MergeTree() ORDER BY id" + +${CLICKHOUSE_CLIENT} --query="INSERT INTO json_test VALUES (1, '{\"date\": \"2018-01-01\", \"task_id\": \"billing_history__billing_history.load_history_payments_data__20180101\"}'), (2, '{\"date\": \"2018-01-02\", \"task_id\": \"billing_history__billing_history.load_history_payments_data__20180101\"}')" + +${CLICKHOUSE_CLIENT} --query="SELECT COUNT() FROM json_test" + +${CLICKHOUSE_CLIENT} --query="ALTER TABLE json_test DELETE WHERE JSONExtractString(metadata, 'date') = '2018-01-01'" + +wait_for_mutation "json_test" "mutation_2.txt" + +${CLICKHOUSE_CLIENT} --query="SELECT COUNT() FROM json_test" + +${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS json_test"