Revert changes in WindowView

This commit is contained in:
Dmitry Novik 2024-09-17 22:41:59 +02:00 committed by GitHub
parent f44960bb63
commit 1ca79602ad
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 33 additions and 45 deletions

View File

@ -79,19 +79,6 @@ namespace ErrorCodes
extern const int UNSUPPORTED_METHOD;
}
namespace
{
ContextPtr getCorrectContext()
{
auto result = Context::createCopy(Context::getGlobalContextInstance());
result->setSetting("allow_experimental_analyzer", false);
return result;
}
}
namespace
{
/// Fetch all window info and replace tumble or hop node names with windowID
@ -441,7 +428,7 @@ ASTPtr StorageWindowView::getCleanupQuery()
void StorageWindowView::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr local_context, TableExclusiveLockHolder &)
{
InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Truncate, getCorrectContext(), local_context, inner_table_id, true);
InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Truncate, getContext(), local_context, inner_table_id, true);
}
bool StorageWindowView::optimize(
@ -486,7 +473,7 @@ void StorageWindowView::alter(
output_header.clear();
InterpreterDropQuery::executeDropQuery(
ASTDropQuery::Kind::Drop, getCorrectContext(), local_context, inner_table_id, true);
ASTDropQuery::Kind::Drop, getContext(), local_context, inner_table_id, true);
/// create inner table
auto create_context = Context::createCopy(local_context);
@ -499,8 +486,8 @@ void StorageWindowView::alter(
shutdown_called = false;
clean_cache_task = getCorrectContext()->getSchedulePool().createTask(getStorageID().getFullTableName(), [this] { threadFuncCleanup(); });
fire_task = getCorrectContext()->getSchedulePool().createTask(
clean_cache_task = getContext()->getSchedulePool().createTask(getStorageID().getFullTableName(), [this] { threadFuncCleanup(); });
fire_task = getContext()->getSchedulePool().createTask(
getStorageID().getFullTableName(), [this] { is_proctime ? threadFuncFireProc() : threadFuncFireEvent(); });
clean_cache_task->deactivate();
fire_task->deactivate();
@ -543,7 +530,7 @@ std::pair<BlocksPtr, Block> StorageWindowView::getNewBlocks(UInt32 watermark)
auto inner_table = getInnerTable();
InterpreterSelectQuery fetch(
inner_fetch_query,
getCorrectContext(),
getContext(),
inner_table,
inner_table->getInMemoryMetadataPtr(),
SelectQueryOptions(QueryProcessingStage::FetchColumns));
@ -577,8 +564,8 @@ std::pair<BlocksPtr, Block> StorageWindowView::getNewBlocks(UInt32 watermark)
filter_function = makeASTFunction("has", func_array, std::make_shared<ASTIdentifier>(window_id_name));
}
auto syntax_result = TreeRewriter(getCorrectContext()).analyze(filter_function, builder.getHeader().getNamesAndTypesList());
auto filter_expression = ExpressionAnalyzer(filter_function, syntax_result, getCorrectContext()).getActionsDAG(false);
auto syntax_result = TreeRewriter(getContext()).analyze(filter_function, builder.getHeader().getNamesAndTypesList());
auto filter_expression = ExpressionAnalyzer(filter_function, syntax_result, getContext()).getActionsDAG(false);
auto filter_actions = std::make_shared<ExpressionActions>(std::move(filter_expression));
builder.addSimpleTransform([&](const Block & header)
@ -594,7 +581,7 @@ std::pair<BlocksPtr, Block> StorageWindowView::getNewBlocks(UInt32 watermark)
column.column = column.type->createColumnConst(0, Tuple{w_start, watermark});
auto adding_column_dag = ActionsDAG::makeAddingColumnActions(std::move(column));
auto adding_column_actions
= std::make_shared<ExpressionActions>(std::move(adding_column_dag), ExpressionActionsSettings::fromContext(getCorrectContext()));
= std::make_shared<ExpressionActions>(std::move(adding_column_dag), ExpressionActionsSettings::fromContext(getContext()));
builder.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ExpressionTransform>(header, adding_column_actions);
@ -608,7 +595,7 @@ std::pair<BlocksPtr, Block> StorageWindowView::getNewBlocks(UInt32 watermark)
new_header.getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Name);
auto actions = std::make_shared<ExpressionActions>(
std::move(convert_actions_dag), ExpressionActionsSettings::fromContext(getCorrectContext(), CompileExpressions::yes));
std::move(convert_actions_dag), ExpressionActionsSettings::fromContext(getContext(), CompileExpressions::yes));
builder.addSimpleTransform([&](const Block & stream_header)
{
return std::make_shared<ExpressionTransform>(stream_header, actions);
@ -632,11 +619,11 @@ std::pair<BlocksPtr, Block> StorageWindowView::getNewBlocks(UInt32 watermark)
return StorageBlocks::createStorage(blocks_id_global, required_columns, std::move(pipes), QueryProcessingStage::WithMergeableState);
};
TemporaryTableHolder blocks_storage(getCorrectContext(), creator);
TemporaryTableHolder blocks_storage(getContext(), creator);
InterpreterSelectQuery select(
getFinalQuery(),
getCorrectContext(),
getContext(),
blocks_storage.getTable(),
blocks_storage.getTable()->getInMemoryMetadataPtr(),
SelectQueryOptions(QueryProcessingStage::Complete));
@ -651,8 +638,8 @@ std::pair<BlocksPtr, Block> StorageWindowView::getNewBlocks(UInt32 watermark)
{
return std::make_shared<SquashingTransform>(
current_header,
getCorrectContext()->getSettingsRef().min_insert_block_size_rows,
getCorrectContext()->getSettingsRef().min_insert_block_size_bytes);
getContext()->getSettingsRef().min_insert_block_size_rows,
getContext()->getSettingsRef().min_insert_block_size_bytes);
});
auto header = builder.getHeader();
@ -707,7 +694,7 @@ inline void StorageWindowView::fire(UInt32 watermark)
insert->table_id = target_table->getStorageID();
InterpreterInsertQuery interpreter(
insert,
getCorrectContext(),
getContext(),
/* allow_materialized */ false,
/* no_squash */ false,
/* no_destination */ false,
@ -720,8 +707,8 @@ inline void StorageWindowView::fire(UInt32 watermark)
pipe.getHeader(),
block_io.pipeline.getHeader().getNamesAndTypesList(),
getTargetTable()->getInMemoryMetadataPtr()->getColumns(),
getCorrectContext(),
getCorrectContext()->getSettingsRef().insert_null_as_default);
getContext(),
getContext()->getSettingsRef().insert_null_as_default);
auto adding_missing_defaults_actions = std::make_shared<ExpressionActions>(std::move(adding_missing_defaults_dag));
pipe.addSimpleTransform([&](const Block & stream_header)
{
@ -734,7 +721,7 @@ inline void StorageWindowView::fire(UInt32 watermark)
ActionsDAG::MatchColumnsMode::Position);
auto actions = std::make_shared<ExpressionActions>(
std::move(convert_actions_dag),
ExpressionActionsSettings::fromContext(getCorrectContext(), CompileExpressions::yes));
ExpressionActionsSettings::fromContext(getContext(), CompileExpressions::yes));
pipe.addSimpleTransform([&](const Block & stream_header)
{
return std::make_shared<ExpressionTransform>(stream_header, actions);
@ -755,7 +742,7 @@ ASTPtr StorageWindowView::getSourceTableSelectQuery()
if (hasJoin(modified_select))
{
auto analyzer_res = TreeRewriterResult({});
removeJoin(modified_select, analyzer_res, getCorrectContext());
removeJoin(modified_select, analyzer_res, getContext());
}
else
{
@ -804,13 +791,13 @@ ASTPtr StorageWindowView::getInnerTableCreateQuery(const ASTPtr & inner_query, c
Aliases aliases;
QueryAliasesVisitor(aliases).visit(inner_query);
auto inner_query_normalized = inner_query->clone();
QueryNormalizer::Data normalizer_data(aliases, {}, false, getCorrectContext()->getSettingsRef(), false);
QueryNormalizer::Data normalizer_data(aliases, {}, false, getContext()->getSettingsRef(), false);
QueryNormalizer(normalizer_data).visit(inner_query_normalized);
auto inner_select_query = std::static_pointer_cast<ASTSelectQuery>(inner_query_normalized);
auto t_sample_block
= InterpreterSelectQuery(inner_select_query, getCorrectContext(), SelectQueryOptions(QueryProcessingStage::WithMergeableState))
= InterpreterSelectQuery(inner_select_query, getContext(), SelectQueryOptions(QueryProcessingStage::WithMergeableState))
.getSampleBlock();
ASTPtr columns_list = InterpreterCreateQuery::formatColumns(t_sample_block.getNamesAndTypesList());
@ -1028,7 +1015,7 @@ void StorageWindowView::cleanup()
std::lock_guard mutex_lock(mutex);
auto alter_query = getCleanupQuery();
auto cleanup_context = Context::createCopy(getCorrectContext());
auto cleanup_context = Context::createCopy(getContext());
cleanup_context->makeQueryContext();
cleanup_context->setCurrentQueryId("");
cleanup_context->setQueryKindReplicatedDatabaseInternal();
@ -1227,6 +1214,7 @@ StorageWindowView::StorageWindowView(
const String & comment,
LoadingStrictnessLevel mode)
: IStorage(table_id_)
, WithContext(context_->getGlobalContext())
, log(getLogger(fmt::format("StorageWindowView({}.{})", table_id_.database_name, table_id_.table_name)))
, fire_signal_timeout_s(context_->getSettingsRef().wait_for_window_view_fire_signal_timeout.totalSeconds())
, clean_interval_usec(context_->getSettingsRef().window_view_clean_interval.totalMicroseconds())
@ -1306,8 +1294,8 @@ StorageWindowView::StorageWindowView(
if (disabled_due_to_analyzer)
return;
clean_cache_task = getCorrectContext()->getSchedulePool().createTask(getStorageID().getFullTableName(), [this] { threadFuncCleanup(); });
fire_task = getCorrectContext()->getSchedulePool().createTask(
clean_cache_task = getContext()->getSchedulePool().createTask(getStorageID().getFullTableName(), [this] { threadFuncCleanup(); });
fire_task = getContext()->getSchedulePool().createTask(
getStorageID().getFullTableName(), [this] { is_proctime ? threadFuncFireProc() : threadFuncFireEvent(); });
clean_cache_task->deactivate();
fire_task->deactivate();
@ -1318,7 +1306,7 @@ ASTPtr StorageWindowView::initInnerQuery(ASTSelectQuery query, ContextPtr contex
select_query = query.clone();
output_header.clear();
String select_database_name = getCorrectContext()->getCurrentDatabase();
String select_database_name = getContext()->getCurrentDatabase();
String select_table_name;
auto select_query_tmp = query.clone();
extractDependentTable(context_, select_query_tmp, select_database_name, select_table_name);
@ -1735,7 +1723,7 @@ void StorageWindowView::drop()
{
/// Must be guaranteed at this point for database engine Atomic that has_inner_table == false,
/// because otherwise will be a deadlock.
dropInnerTableIfAny(false, getCorrectContext());
dropInnerTableIfAny(false, getContext());
}
void StorageWindowView::dropInnerTableIfAny(bool sync, ContextPtr local_context)
@ -1746,10 +1734,10 @@ void StorageWindowView::dropInnerTableIfAny(bool sync, ContextPtr local_context)
try
{
InterpreterDropQuery::executeDropQuery(
ASTDropQuery::Kind::Drop, getCorrectContext(), local_context, inner_table_id, sync);
ASTDropQuery::Kind::Drop, getContext(), local_context, inner_table_id, sync);
if (has_inner_target_table)
InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, getCorrectContext(), local_context, target_table_id, sync, /* ignore_sync_setting */ true);
InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, getContext(), local_context, target_table_id, sync, /* ignore_sync_setting */ true);
}
catch (...)
{
@ -1769,7 +1757,7 @@ const Block & StorageWindowView::getOutputHeader() const
std::lock_guard lock(sample_block_lock);
if (!output_header)
{
output_header = InterpreterSelectQuery(select_query->clone(), getCorrectContext(), SelectQueryOptions(QueryProcessingStage::Complete))
output_header = InterpreterSelectQuery(select_query->clone(), getContext(), SelectQueryOptions(QueryProcessingStage::Complete))
.getSampleBlock();
}
return output_header;
@ -1777,17 +1765,17 @@ const Block & StorageWindowView::getOutputHeader() const
StoragePtr StorageWindowView::getSourceTable() const
{
return DatabaseCatalog::instance().getTable(select_table_id, getCorrectContext());
return DatabaseCatalog::instance().getTable(select_table_id, getContext());
}
StoragePtr StorageWindowView::getInnerTable() const
{
return DatabaseCatalog::instance().getTable(inner_table_id, getCorrectContext());
return DatabaseCatalog::instance().getTable(inner_table_id, getContext());
}
StoragePtr StorageWindowView::getTargetTable() const
{
return DatabaseCatalog::instance().getTable(target_table_id, getCorrectContext());
return DatabaseCatalog::instance().getTable(target_table_id, getContext());
}
void StorageWindowView::throwIfWindowViewIsDisabled(ContextPtr local_context) const

View File

@ -100,7 +100,7 @@ using ASTPtr = std::shared_ptr<IAST>;
* Users need to take these duplicated results into account.
*/
class StorageWindowView final : public IStorage
class StorageWindowView final : public IStorage, WithContext
{
friend class WindowViewSource;
friend class WatermarkTransform;