disable window view with analyzer properly

This commit is contained in:
Alexander Tokmakov 2024-04-06 21:51:28 +02:00
parent f8ef9fc5d3
commit 982d3db5b1
2 changed files with 39 additions and 3 deletions

View File

@ -439,6 +439,7 @@ bool StorageWindowView::optimize(
bool cleanup,
ContextPtr local_context)
{
throwIfWindowViewIsDisabled(local_context);
auto storage_ptr = getInnerTable();
auto metadata_snapshot = storage_ptr->getInMemoryMetadataPtr();
return getInnerTable()->optimize(query, metadata_snapshot, partition, final, deduplicate, deduplicate_by_columns, cleanup, local_context);
@ -449,6 +450,7 @@ void StorageWindowView::alter(
ContextPtr local_context,
AlterLockHolder &)
{
throwIfWindowViewIsDisabled(local_context);
auto table_id = getStorageID();
StorageInMemoryMetadata new_metadata = getInMemoryMetadata();
StorageInMemoryMetadata old_metadata = getInMemoryMetadata();
@ -508,8 +510,9 @@ void StorageWindowView::alter(
startup();
}
void StorageWindowView::checkAlterIsPossible(const AlterCommands & commands, ContextPtr /*local_context*/) const
void StorageWindowView::checkAlterIsPossible(const AlterCommands & commands, ContextPtr local_context) const
{
throwIfWindowViewIsDisabled(local_context);
for (const auto & command : commands)
{
if (!command.isCommentAlter() && command.type != AlterCommand::MODIFY_QUERY)
@ -519,6 +522,7 @@ void StorageWindowView::checkAlterIsPossible(const AlterCommands & commands, Con
std::pair<BlocksPtr, Block> StorageWindowView::getNewBlocks(UInt32 watermark)
{
throwIfWindowViewIsDisabled();
UInt32 w_start = addTime(watermark, window_kind, -window_num_units, *time_zone);
auto inner_table = getInnerTable();
@ -654,6 +658,7 @@ std::pair<BlocksPtr, Block> StorageWindowView::getNewBlocks(UInt32 watermark)
inline void StorageWindowView::fire(UInt32 watermark)
{
throwIfWindowViewIsDisabled();
LOG_TRACE(log, "Watch streams number: {}, target table: {}",
watch_streams.size(),
target_table_id.empty() ? "None" : target_table_id.getNameForLogs());
@ -722,6 +727,7 @@ inline void StorageWindowView::fire(UInt32 watermark)
ASTPtr StorageWindowView::getSourceTableSelectQuery()
{
throwIfWindowViewIsDisabled();
auto query = select_query->clone();
auto & modified_select = query->as<ASTSelectQuery &>();
@ -947,6 +953,7 @@ UInt32 StorageWindowView::getWindowUpperBound(UInt32 time_sec)
void StorageWindowView::addFireSignal(std::set<UInt32> & signals)
{
throwIfWindowViewIsDisabled();
std::lock_guard lock(fire_signal_mutex);
for (const auto & signal : signals)
fire_signal.push_back(signal);
@ -962,6 +969,7 @@ void StorageWindowView::updateMaxTimestamp(UInt32 timestamp)
void StorageWindowView::updateMaxWatermark(UInt32 watermark)
{
throwIfWindowViewIsDisabled();
if (is_proctime)
{
max_watermark = watermark;
@ -1014,6 +1022,7 @@ void StorageWindowView::cleanup()
void StorageWindowView::threadFuncCleanup()
{
throwIfWindowViewIsDisabled();
if (shutdown_called)
return;
@ -1033,6 +1042,7 @@ void StorageWindowView::threadFuncCleanup()
void StorageWindowView::threadFuncFireProc()
{
throwIfWindowViewIsDisabled();
if (shutdown_called)
return;
@ -1069,6 +1079,7 @@ void StorageWindowView::threadFuncFireProc()
void StorageWindowView::threadFuncFireEvent()
{
throwIfWindowViewIsDisabled();
std::lock_guard lock(fire_signal_mutex);
LOG_TRACE(log, "Fire events: {}", fire_signal.size());
@ -1100,6 +1111,7 @@ void StorageWindowView::read(
const size_t max_block_size,
const size_t num_streams)
{
throwIfWindowViewIsDisabled(local_context);
if (target_table_id.empty())
return;
@ -1140,6 +1152,7 @@ Pipe StorageWindowView::watch(
size_t /*max_block_size*/,
const size_t /*num_streams*/)
{
throwIfWindowViewIsDisabled(local_context);
ASTWatchQuery & query = typeid_cast<ASTWatchQuery &>(*query_info.query);
bool has_limit = false;
@ -1178,8 +1191,10 @@ StorageWindowView::StorageWindowView(
, clean_interval_usec(context_->getSettingsRef().window_view_clean_interval.totalMicroseconds())
{
if (context_->getSettingsRef().allow_experimental_analyzer)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
"Experimental WINDOW VIEW feature is not supported with new infrastructure for query analysis (the setting 'allow_experimental_analyzer')");
disabled_due_to_analyzer = true;
if (mode <= LoadingStrictnessLevel::CREATE)
throwIfWindowViewIsDisabled();
if (!query.select)
throw Exception(ErrorCodes::INCORRECT_QUERY, "SELECT query is not specified for {}", getName());
@ -1243,6 +1258,9 @@ StorageWindowView::StorageWindowView(
}
}
if (disabled_due_to_analyzer)
return;
clean_cache_task = getContext()->getSchedulePool().createTask(getStorageID().getFullTableName(), [this] { threadFuncCleanup(); });
fire_task = getContext()->getSchedulePool().createTask(
getStorageID().getFullTableName(), [this] { is_proctime ? threadFuncFireProc() : threadFuncFireEvent(); });
@ -1400,6 +1418,7 @@ void StorageWindowView::eventTimeParser(const ASTCreateQuery & query)
void StorageWindowView::writeIntoWindowView(
StorageWindowView & window_view, const Block & block, ContextPtr local_context)
{
window_view.throwIfWindowViewIsDisabled(local_context);
while (window_view.modifying_query)
std::this_thread::sleep_for(std::chrono::milliseconds(100));
@ -1589,6 +1608,9 @@ void StorageWindowView::writeIntoWindowView(
void StorageWindowView::startup()
{
if (disabled_due_to_analyzer)
return;
DatabaseCatalog::instance().addViewDependency(select_table_id, getStorageID());
fire_task->activate();
@ -1602,6 +1624,8 @@ void StorageWindowView::startup()
void StorageWindowView::shutdown(bool)
{
shutdown_called = true;
if (disabled_due_to_analyzer)
return;
fire_condition.notify_all();
@ -1657,6 +1681,7 @@ Block StorageWindowView::getInputHeader() const
const Block & StorageWindowView::getOutputHeader() const
{
throwIfWindowViewIsDisabled();
std::lock_guard lock(sample_block_lock);
if (!output_header)
{
@ -1681,6 +1706,13 @@ StoragePtr StorageWindowView::getTargetTable() const
return DatabaseCatalog::instance().getTable(target_table_id, getContext());
}
void StorageWindowView::throwIfWindowViewIsDisabled(ContextPtr local_context) const
{
if (disabled_due_to_analyzer || (local_context && local_context->getSettingsRef().allow_experimental_analyzer))
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Experimental WINDOW VIEW feature is not supported "
"with new infrastructure for query analysis (the setting 'allow_experimental_analyzer')");
}
void registerStorageWindowView(StorageFactory & factory)
{
factory.registerStorage("WindowView", [](const StorageFactory::Arguments & args)

View File

@ -271,5 +271,9 @@ private:
StoragePtr getSourceTable() const;
StoragePtr getInnerTable() const;
StoragePtr getTargetTable() const;
bool disabled_due_to_analyzer = false;
void throwIfWindowViewIsDisabled(ContextPtr local_context = nullptr) const;
};
}