Disable analyzer in WindowView correctly

This commit is contained in:
Dmitry Novik 2024-09-17 18:10:56 +02:00
parent 6eed426b14
commit 17e4616924
4 changed files with 48 additions and 34 deletions

View File

@ -79,6 +79,19 @@ namespace ErrorCodes
extern const int UNSUPPORTED_METHOD;
}
namespace
{
ContextPtr getCorrectContext()
{
auto result = Context::createCopy(Context::getGlobalContextInstance());
result->setSetting("allow_experimental_analyzer", false);
return result;
}
}
namespace
{
/// Fetch all window info and replace tumble or hop node names with windowID
@ -428,7 +441,7 @@ ASTPtr StorageWindowView::getCleanupQuery()
void StorageWindowView::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr local_context, TableExclusiveLockHolder &)
{
InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Truncate, getContext(), local_context, inner_table_id, true);
InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Truncate, getCorrectContext(), local_context, inner_table_id, true);
}
bool StorageWindowView::optimize(
@ -473,7 +486,7 @@ void StorageWindowView::alter(
output_header.clear();
InterpreterDropQuery::executeDropQuery(
ASTDropQuery::Kind::Drop, getContext(), local_context, inner_table_id, true);
ASTDropQuery::Kind::Drop, getCorrectContext(), local_context, inner_table_id, true);
/// create inner table
auto create_context = Context::createCopy(local_context);
@ -486,8 +499,8 @@ void StorageWindowView::alter(
shutdown_called = false;
clean_cache_task = getContext()->getSchedulePool().createTask(getStorageID().getFullTableName(), [this] { threadFuncCleanup(); });
fire_task = getContext()->getSchedulePool().createTask(
clean_cache_task = getCorrectContext()->getSchedulePool().createTask(getStorageID().getFullTableName(), [this] { threadFuncCleanup(); });
fire_task = getCorrectContext()->getSchedulePool().createTask(
getStorageID().getFullTableName(), [this] { is_proctime ? threadFuncFireProc() : threadFuncFireEvent(); });
clean_cache_task->deactivate();
fire_task->deactivate();
@ -530,7 +543,7 @@ std::pair<BlocksPtr, Block> StorageWindowView::getNewBlocks(UInt32 watermark)
auto inner_table = getInnerTable();
InterpreterSelectQuery fetch(
inner_fetch_query,
getContext(),
getCorrectContext(),
inner_table,
inner_table->getInMemoryMetadataPtr(),
SelectQueryOptions(QueryProcessingStage::FetchColumns));
@ -564,8 +577,8 @@ std::pair<BlocksPtr, Block> StorageWindowView::getNewBlocks(UInt32 watermark)
filter_function = makeASTFunction("has", func_array, std::make_shared<ASTIdentifier>(window_id_name));
}
auto syntax_result = TreeRewriter(getContext()).analyze(filter_function, builder.getHeader().getNamesAndTypesList());
auto filter_expression = ExpressionAnalyzer(filter_function, syntax_result, getContext()).getActionsDAG(false);
auto syntax_result = TreeRewriter(getCorrectContext()).analyze(filter_function, builder.getHeader().getNamesAndTypesList());
auto filter_expression = ExpressionAnalyzer(filter_function, syntax_result, getCorrectContext()).getActionsDAG(false);
auto filter_actions = std::make_shared<ExpressionActions>(std::move(filter_expression));
builder.addSimpleTransform([&](const Block & header)
@ -581,7 +594,7 @@ std::pair<BlocksPtr, Block> StorageWindowView::getNewBlocks(UInt32 watermark)
column.column = column.type->createColumnConst(0, Tuple{w_start, watermark});
auto adding_column_dag = ActionsDAG::makeAddingColumnActions(std::move(column));
auto adding_column_actions
= std::make_shared<ExpressionActions>(std::move(adding_column_dag), ExpressionActionsSettings::fromContext(getContext()));
= std::make_shared<ExpressionActions>(std::move(adding_column_dag), ExpressionActionsSettings::fromContext(getCorrectContext()));
builder.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ExpressionTransform>(header, adding_column_actions);
@ -595,7 +608,7 @@ std::pair<BlocksPtr, Block> StorageWindowView::getNewBlocks(UInt32 watermark)
new_header.getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Name);
auto actions = std::make_shared<ExpressionActions>(
std::move(convert_actions_dag), ExpressionActionsSettings::fromContext(getContext(), CompileExpressions::yes));
std::move(convert_actions_dag), ExpressionActionsSettings::fromContext(getCorrectContext(), CompileExpressions::yes));
builder.addSimpleTransform([&](const Block & stream_header)
{
return std::make_shared<ExpressionTransform>(stream_header, actions);
@ -619,11 +632,11 @@ std::pair<BlocksPtr, Block> StorageWindowView::getNewBlocks(UInt32 watermark)
return StorageBlocks::createStorage(blocks_id_global, required_columns, std::move(pipes), QueryProcessingStage::WithMergeableState);
};
TemporaryTableHolder blocks_storage(getContext(), creator);
TemporaryTableHolder blocks_storage(getCorrectContext(), creator);
InterpreterSelectQuery select(
getFinalQuery(),
getContext(),
getCorrectContext(),
blocks_storage.getTable(),
blocks_storage.getTable()->getInMemoryMetadataPtr(),
SelectQueryOptions(QueryProcessingStage::Complete));
@ -638,8 +651,8 @@ std::pair<BlocksPtr, Block> StorageWindowView::getNewBlocks(UInt32 watermark)
{
return std::make_shared<SquashingTransform>(
current_header,
getContext()->getSettingsRef().min_insert_block_size_rows,
getContext()->getSettingsRef().min_insert_block_size_bytes);
getCorrectContext()->getSettingsRef().min_insert_block_size_rows,
getCorrectContext()->getSettingsRef().min_insert_block_size_bytes);
});
auto header = builder.getHeader();
@ -694,7 +707,7 @@ inline void StorageWindowView::fire(UInt32 watermark)
insert->table_id = target_table->getStorageID();
InterpreterInsertQuery interpreter(
insert,
getContext(),
getCorrectContext(),
/* allow_materialized */ false,
/* no_squash */ false,
/* no_destination */ false,
@ -707,8 +720,8 @@ inline void StorageWindowView::fire(UInt32 watermark)
pipe.getHeader(),
block_io.pipeline.getHeader().getNamesAndTypesList(),
getTargetTable()->getInMemoryMetadataPtr()->getColumns(),
getContext(),
getContext()->getSettingsRef().insert_null_as_default);
getCorrectContext(),
getCorrectContext()->getSettingsRef().insert_null_as_default);
auto adding_missing_defaults_actions = std::make_shared<ExpressionActions>(std::move(adding_missing_defaults_dag));
pipe.addSimpleTransform([&](const Block & stream_header)
{
@ -721,7 +734,7 @@ inline void StorageWindowView::fire(UInt32 watermark)
ActionsDAG::MatchColumnsMode::Position);
auto actions = std::make_shared<ExpressionActions>(
std::move(convert_actions_dag),
ExpressionActionsSettings::fromContext(getContext(), CompileExpressions::yes));
ExpressionActionsSettings::fromContext(getCorrectContext(), CompileExpressions::yes));
pipe.addSimpleTransform([&](const Block & stream_header)
{
return std::make_shared<ExpressionTransform>(stream_header, actions);
@ -742,7 +755,7 @@ ASTPtr StorageWindowView::getSourceTableSelectQuery()
if (hasJoin(modified_select))
{
auto analyzer_res = TreeRewriterResult({});
removeJoin(modified_select, analyzer_res, getContext());
removeJoin(modified_select, analyzer_res, getCorrectContext());
}
else
{
@ -791,13 +804,13 @@ ASTPtr StorageWindowView::getInnerTableCreateQuery(const ASTPtr & inner_query, c
Aliases aliases;
QueryAliasesVisitor(aliases).visit(inner_query);
auto inner_query_normalized = inner_query->clone();
QueryNormalizer::Data normalizer_data(aliases, {}, false, getContext()->getSettingsRef(), false);
QueryNormalizer::Data normalizer_data(aliases, {}, false, getCorrectContext()->getSettingsRef(), false);
QueryNormalizer(normalizer_data).visit(inner_query_normalized);
auto inner_select_query = std::static_pointer_cast<ASTSelectQuery>(inner_query_normalized);
auto t_sample_block
= InterpreterSelectQuery(inner_select_query, getContext(), SelectQueryOptions(QueryProcessingStage::WithMergeableState))
= InterpreterSelectQuery(inner_select_query, getCorrectContext(), SelectQueryOptions(QueryProcessingStage::WithMergeableState))
.getSampleBlock();
ASTPtr columns_list = InterpreterCreateQuery::formatColumns(t_sample_block.getNamesAndTypesList());
@ -1015,7 +1028,7 @@ void StorageWindowView::cleanup()
std::lock_guard mutex_lock(mutex);
auto alter_query = getCleanupQuery();
auto cleanup_context = Context::createCopy(getContext());
auto cleanup_context = Context::createCopy(getCorrectContext());
cleanup_context->makeQueryContext();
cleanup_context->setCurrentQueryId("");
cleanup_context->setQueryKindReplicatedDatabaseInternal();
@ -1214,7 +1227,6 @@ StorageWindowView::StorageWindowView(
const String & comment,
LoadingStrictnessLevel mode)
: IStorage(table_id_)
, WithContext(context_->getGlobalContext())
, log(getLogger(fmt::format("StorageWindowView({}.{})", table_id_.database_name, table_id_.table_name)))
, fire_signal_timeout_s(context_->getSettingsRef().wait_for_window_view_fire_signal_timeout.totalSeconds())
, clean_interval_usec(context_->getSettingsRef().window_view_clean_interval.totalMicroseconds())
@ -1294,8 +1306,8 @@ StorageWindowView::StorageWindowView(
if (disabled_due_to_analyzer)
return;
clean_cache_task = getContext()->getSchedulePool().createTask(getStorageID().getFullTableName(), [this] { threadFuncCleanup(); });
fire_task = getContext()->getSchedulePool().createTask(
clean_cache_task = getCorrectContext()->getSchedulePool().createTask(getStorageID().getFullTableName(), [this] { threadFuncCleanup(); });
fire_task = getCorrectContext()->getSchedulePool().createTask(
getStorageID().getFullTableName(), [this] { is_proctime ? threadFuncFireProc() : threadFuncFireEvent(); });
clean_cache_task->deactivate();
fire_task->deactivate();
@ -1306,7 +1318,7 @@ ASTPtr StorageWindowView::initInnerQuery(ASTSelectQuery query, ContextPtr contex
select_query = query.clone();
output_header.clear();
String select_database_name = getContext()->getCurrentDatabase();
String select_database_name = getCorrectContext()->getCurrentDatabase();
String select_table_name;
auto select_query_tmp = query.clone();
extractDependentTable(context_, select_query_tmp, select_database_name, select_table_name);
@ -1723,7 +1735,7 @@ void StorageWindowView::drop()
{
/// Must be guaranteed at this point for database engine Atomic that has_inner_table == false,
/// because otherwise will be a deadlock.
dropInnerTableIfAny(false, getContext());
dropInnerTableIfAny(false, getCorrectContext());
}
void StorageWindowView::dropInnerTableIfAny(bool sync, ContextPtr local_context)
@ -1734,10 +1746,10 @@ void StorageWindowView::dropInnerTableIfAny(bool sync, ContextPtr local_context)
try
{
InterpreterDropQuery::executeDropQuery(
ASTDropQuery::Kind::Drop, getContext(), local_context, inner_table_id, sync);
ASTDropQuery::Kind::Drop, getCorrectContext(), local_context, inner_table_id, sync);
if (has_inner_target_table)
InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, getContext(), local_context, target_table_id, sync, /* ignore_sync_setting */ true);
InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, getCorrectContext(), local_context, target_table_id, sync, /* ignore_sync_setting */ true);
}
catch (...)
{
@ -1757,7 +1769,7 @@ const Block & StorageWindowView::getOutputHeader() const
std::lock_guard lock(sample_block_lock);
if (!output_header)
{
output_header = InterpreterSelectQuery(select_query->clone(), getContext(), SelectQueryOptions(QueryProcessingStage::Complete))
output_header = InterpreterSelectQuery(select_query->clone(), getCorrectContext(), SelectQueryOptions(QueryProcessingStage::Complete))
.getSampleBlock();
}
return output_header;
@ -1765,17 +1777,17 @@ const Block & StorageWindowView::getOutputHeader() const
StoragePtr StorageWindowView::getSourceTable() const
{
return DatabaseCatalog::instance().getTable(select_table_id, getContext());
return DatabaseCatalog::instance().getTable(select_table_id, getCorrectContext());
}
StoragePtr StorageWindowView::getInnerTable() const
{
return DatabaseCatalog::instance().getTable(inner_table_id, getContext());
return DatabaseCatalog::instance().getTable(inner_table_id, getCorrectContext());
}
StoragePtr StorageWindowView::getTargetTable() const
{
return DatabaseCatalog::instance().getTable(target_table_id, getContext());
return DatabaseCatalog::instance().getTable(target_table_id, getCorrectContext());
}
void StorageWindowView::throwIfWindowViewIsDisabled(ContextPtr local_context) const

View File

@ -100,7 +100,7 @@ using ASTPtr = std::shared_ptr<IAST>;
* Users need to take these duplicated results into account.
*/
class StorageWindowView final : public IStorage, WithContext
class StorageWindowView final : public IStorage
{
friend class WindowViewSource;
friend class WatermarkTransform;

View File

@ -1 +1,2 @@
Unknown function
Function with name 'xyz' does not exist.

View File

@ -8,7 +8,8 @@ ${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS test"
${CLICKHOUSE_CLIENT} --query "CREATE TABLE test (s String) ENGINE = Memory"
# Calling an unknown function should not lead to creation of a 'user_defined' directory in the current directory
${CLICKHOUSE_CLIENT} --query "INSERT INTO test VALUES (xyz('abc'))" 2>&1 | grep -o -F 'Unknown function'
${CLICKHOUSE_CLIENT} --query "SET allow_experimental_analyzer = 0;INSERT INTO test VALUES (xyz('abc'))" 2>&1 | grep -o -F 'Unknown function'
${CLICKHOUSE_CLIENT} --query "SET allow_experimental_analyzer = 1;INSERT INTO test VALUES (xyz('abc'))" 2>&1 | grep -o -F "Function with name 'xyz' does not exist."
ls -ld user_defined 2> /dev/null