mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 15:12:02 +00:00
Merge pull request #62367 from ClickHouse/experimental_conflict
Disable window view with analyzer properly
This commit is contained in:
commit
57985d9fb4
@ -439,6 +439,7 @@ bool StorageWindowView::optimize(
|
||||
bool cleanup,
|
||||
ContextPtr local_context)
|
||||
{
|
||||
throwIfWindowViewIsDisabled(local_context);
|
||||
auto storage_ptr = getInnerTable();
|
||||
auto metadata_snapshot = storage_ptr->getInMemoryMetadataPtr();
|
||||
return getInnerTable()->optimize(query, metadata_snapshot, partition, final, deduplicate, deduplicate_by_columns, cleanup, local_context);
|
||||
@ -449,6 +450,7 @@ void StorageWindowView::alter(
|
||||
ContextPtr local_context,
|
||||
AlterLockHolder &)
|
||||
{
|
||||
throwIfWindowViewIsDisabled(local_context);
|
||||
auto table_id = getStorageID();
|
||||
StorageInMemoryMetadata new_metadata = getInMemoryMetadata();
|
||||
StorageInMemoryMetadata old_metadata = getInMemoryMetadata();
|
||||
@ -508,8 +510,9 @@ void StorageWindowView::alter(
|
||||
startup();
|
||||
}
|
||||
|
||||
void StorageWindowView::checkAlterIsPossible(const AlterCommands & commands, ContextPtr /*local_context*/) const
|
||||
void StorageWindowView::checkAlterIsPossible(const AlterCommands & commands, ContextPtr local_context) const
|
||||
{
|
||||
throwIfWindowViewIsDisabled(local_context);
|
||||
for (const auto & command : commands)
|
||||
{
|
||||
if (!command.isCommentAlter() && command.type != AlterCommand::MODIFY_QUERY)
|
||||
@ -519,6 +522,7 @@ void StorageWindowView::checkAlterIsPossible(const AlterCommands & commands, Con
|
||||
|
||||
std::pair<BlocksPtr, Block> StorageWindowView::getNewBlocks(UInt32 watermark)
|
||||
{
|
||||
throwIfWindowViewIsDisabled();
|
||||
UInt32 w_start = addTime(watermark, window_kind, -window_num_units, *time_zone);
|
||||
|
||||
auto inner_table = getInnerTable();
|
||||
@ -654,6 +658,7 @@ std::pair<BlocksPtr, Block> StorageWindowView::getNewBlocks(UInt32 watermark)
|
||||
|
||||
inline void StorageWindowView::fire(UInt32 watermark)
|
||||
{
|
||||
throwIfWindowViewIsDisabled();
|
||||
LOG_TRACE(log, "Watch streams number: {}, target table: {}",
|
||||
watch_streams.size(),
|
||||
target_table_id.empty() ? "None" : target_table_id.getNameForLogs());
|
||||
@ -722,6 +727,7 @@ inline void StorageWindowView::fire(UInt32 watermark)
|
||||
|
||||
ASTPtr StorageWindowView::getSourceTableSelectQuery()
|
||||
{
|
||||
throwIfWindowViewIsDisabled();
|
||||
auto query = select_query->clone();
|
||||
auto & modified_select = query->as<ASTSelectQuery &>();
|
||||
|
||||
@ -947,6 +953,7 @@ UInt32 StorageWindowView::getWindowUpperBound(UInt32 time_sec)
|
||||
|
||||
void StorageWindowView::addFireSignal(std::set<UInt32> & signals)
|
||||
{
|
||||
throwIfWindowViewIsDisabled();
|
||||
std::lock_guard lock(fire_signal_mutex);
|
||||
for (const auto & signal : signals)
|
||||
fire_signal.push_back(signal);
|
||||
@ -962,6 +969,7 @@ void StorageWindowView::updateMaxTimestamp(UInt32 timestamp)
|
||||
|
||||
void StorageWindowView::updateMaxWatermark(UInt32 watermark)
|
||||
{
|
||||
throwIfWindowViewIsDisabled();
|
||||
if (is_proctime)
|
||||
{
|
||||
max_watermark = watermark;
|
||||
@ -1014,6 +1022,7 @@ void StorageWindowView::cleanup()
|
||||
|
||||
void StorageWindowView::threadFuncCleanup()
|
||||
{
|
||||
throwIfWindowViewIsDisabled();
|
||||
if (shutdown_called)
|
||||
return;
|
||||
|
||||
@ -1033,6 +1042,7 @@ void StorageWindowView::threadFuncCleanup()
|
||||
|
||||
void StorageWindowView::threadFuncFireProc()
|
||||
{
|
||||
throwIfWindowViewIsDisabled();
|
||||
if (shutdown_called)
|
||||
return;
|
||||
|
||||
@ -1069,6 +1079,7 @@ void StorageWindowView::threadFuncFireProc()
|
||||
|
||||
void StorageWindowView::threadFuncFireEvent()
|
||||
{
|
||||
throwIfWindowViewIsDisabled();
|
||||
std::lock_guard lock(fire_signal_mutex);
|
||||
|
||||
LOG_TRACE(log, "Fire events: {}", fire_signal.size());
|
||||
@ -1100,6 +1111,7 @@ void StorageWindowView::read(
|
||||
const size_t max_block_size,
|
||||
const size_t num_streams)
|
||||
{
|
||||
throwIfWindowViewIsDisabled(local_context);
|
||||
if (target_table_id.empty())
|
||||
return;
|
||||
|
||||
@ -1140,6 +1152,7 @@ Pipe StorageWindowView::watch(
|
||||
size_t /*max_block_size*/,
|
||||
const size_t /*num_streams*/)
|
||||
{
|
||||
throwIfWindowViewIsDisabled(local_context);
|
||||
ASTWatchQuery & query = typeid_cast<ASTWatchQuery &>(*query_info.query);
|
||||
|
||||
bool has_limit = false;
|
||||
@ -1178,8 +1191,10 @@ StorageWindowView::StorageWindowView(
|
||||
, clean_interval_usec(context_->getSettingsRef().window_view_clean_interval.totalMicroseconds())
|
||||
{
|
||||
if (context_->getSettingsRef().allow_experimental_analyzer)
|
||||
throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
|
||||
"Experimental WINDOW VIEW feature is not supported with new infrastructure for query analysis (the setting 'allow_experimental_analyzer')");
|
||||
disabled_due_to_analyzer = true;
|
||||
|
||||
if (mode <= LoadingStrictnessLevel::CREATE)
|
||||
throwIfWindowViewIsDisabled();
|
||||
|
||||
if (!query.select)
|
||||
throw Exception(ErrorCodes::INCORRECT_QUERY, "SELECT query is not specified for {}", getName());
|
||||
@ -1243,6 +1258,9 @@ StorageWindowView::StorageWindowView(
|
||||
}
|
||||
}
|
||||
|
||||
if (disabled_due_to_analyzer)
|
||||
return;
|
||||
|
||||
clean_cache_task = getContext()->getSchedulePool().createTask(getStorageID().getFullTableName(), [this] { threadFuncCleanup(); });
|
||||
fire_task = getContext()->getSchedulePool().createTask(
|
||||
getStorageID().getFullTableName(), [this] { is_proctime ? threadFuncFireProc() : threadFuncFireEvent(); });
|
||||
@ -1400,6 +1418,7 @@ void StorageWindowView::eventTimeParser(const ASTCreateQuery & query)
|
||||
void StorageWindowView::writeIntoWindowView(
|
||||
StorageWindowView & window_view, const Block & block, ContextPtr local_context)
|
||||
{
|
||||
window_view.throwIfWindowViewIsDisabled(local_context);
|
||||
while (window_view.modifying_query)
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
|
||||
@ -1589,6 +1608,9 @@ void StorageWindowView::writeIntoWindowView(
|
||||
|
||||
void StorageWindowView::startup()
|
||||
{
|
||||
if (disabled_due_to_analyzer)
|
||||
return;
|
||||
|
||||
DatabaseCatalog::instance().addViewDependency(select_table_id, getStorageID());
|
||||
|
||||
fire_task->activate();
|
||||
@ -1602,6 +1624,8 @@ void StorageWindowView::startup()
|
||||
void StorageWindowView::shutdown(bool)
|
||||
{
|
||||
shutdown_called = true;
|
||||
if (disabled_due_to_analyzer)
|
||||
return;
|
||||
|
||||
fire_condition.notify_all();
|
||||
|
||||
@ -1657,6 +1681,7 @@ Block StorageWindowView::getInputHeader() const
|
||||
|
||||
const Block & StorageWindowView::getOutputHeader() const
|
||||
{
|
||||
throwIfWindowViewIsDisabled();
|
||||
std::lock_guard lock(sample_block_lock);
|
||||
if (!output_header)
|
||||
{
|
||||
@ -1681,6 +1706,13 @@ StoragePtr StorageWindowView::getTargetTable() const
|
||||
return DatabaseCatalog::instance().getTable(target_table_id, getContext());
|
||||
}
|
||||
|
||||
void StorageWindowView::throwIfWindowViewIsDisabled(ContextPtr local_context) const
|
||||
{
|
||||
if (disabled_due_to_analyzer || (local_context && local_context->getSettingsRef().allow_experimental_analyzer))
|
||||
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Experimental WINDOW VIEW feature is not supported "
|
||||
"in the current infrastructure for query analysis (the setting 'allow_experimental_analyzer')");
|
||||
}
|
||||
|
||||
void registerStorageWindowView(StorageFactory & factory)
|
||||
{
|
||||
factory.registerStorage("WindowView", [](const StorageFactory::Arguments & args)
|
||||
|
@ -271,5 +271,9 @@ private:
|
||||
StoragePtr getSourceTable() const;
|
||||
StoragePtr getInnerTable() const;
|
||||
StoragePtr getTargetTable() const;
|
||||
|
||||
bool disabled_due_to_analyzer = false;
|
||||
|
||||
void throwIfWindowViewIsDisabled(ContextPtr local_context = nullptr) const;
|
||||
};
|
||||
}
|
||||
|
@ -28,6 +28,8 @@ with client(name="client1>", log=log) as client1, client(
|
||||
client1.expect(prompt)
|
||||
client2.send("SET allow_experimental_window_view = 1")
|
||||
client2.expect(prompt)
|
||||
client2.send("SET allow_experimental_analyzer = 0")
|
||||
client2.expect(prompt)
|
||||
|
||||
client1.send("CREATE DATABASE IF NOT EXISTS 01056_window_view_proc_hop_watch")
|
||||
client1.expect(prompt)
|
||||
|
@ -26,6 +26,8 @@ with client(name="client1>", log=log) as client1, client(
|
||||
client1.expect(prompt)
|
||||
client1.send("SET window_view_heartbeat_interval = 1")
|
||||
client1.expect(prompt)
|
||||
client2.send("SET allow_experimental_analyzer = 0")
|
||||
client2.expect(prompt)
|
||||
|
||||
client1.send("CREATE DATABASE IF NOT EXISTS db_01059_event_hop_watch_strict_asc")
|
||||
client1.expect(prompt)
|
||||
|
@ -28,6 +28,8 @@ with client(name="client1>", log=log) as client1, client(
|
||||
client1.expect(prompt)
|
||||
client2.send("SET allow_experimental_window_view = 1")
|
||||
client2.expect(prompt)
|
||||
client2.send("SET allow_experimental_analyzer = 0")
|
||||
client2.expect(prompt)
|
||||
|
||||
client1.send("CREATE DATABASE IF NOT EXISTS 01062_window_view_event_hop_watch_asc")
|
||||
client1.expect(prompt)
|
||||
|
@ -27,6 +27,8 @@ with client(name="client1>", log=log) as client1, client(
|
||||
client1.expect(prompt)
|
||||
client2.send("SET allow_experimental_window_view = 1")
|
||||
client2.expect(prompt)
|
||||
client2.send("SET allow_experimental_analyzer = 0")
|
||||
client2.expect(prompt)
|
||||
|
||||
client1.send(
|
||||
"CREATE DATABASE IF NOT EXISTS 01065_window_view_event_hop_watch_bounded"
|
||||
|
@ -28,6 +28,8 @@ with client(name="client1>", log=log) as client1, client(
|
||||
client1.expect(prompt)
|
||||
client2.send("SET allow_experimental_window_view = 1")
|
||||
client2.expect(prompt)
|
||||
client2.send("SET allow_experimental_analyzer = 0")
|
||||
client2.expect(prompt)
|
||||
|
||||
client1.send("CREATE DATABASE 01069_window_view_proc_tumble_watch")
|
||||
client1.expect(prompt)
|
||||
|
@ -28,6 +28,8 @@ with client(name="client1>", log=log) as client1, client(
|
||||
client1.expect(prompt)
|
||||
client2.send("SET allow_experimental_window_view = 1")
|
||||
client2.expect(prompt)
|
||||
client2.send("SET allow_experimental_analyzer = 0")
|
||||
client2.expect(prompt)
|
||||
|
||||
client1.send("CREATE DATABASE IF NOT EXISTS 01070_window_view_watch_events")
|
||||
client1.expect(prompt)
|
||||
|
@ -28,10 +28,14 @@ with client(name="client1>", log=log) as client1, client(
|
||||
client1.expect(prompt)
|
||||
client2.send("SET allow_experimental_window_view = 1")
|
||||
client2.expect(prompt)
|
||||
client2.send("SET allow_experimental_analyzer = 0")
|
||||
client2.expect(prompt)
|
||||
client3.send("SET allow_experimental_window_view = 1")
|
||||
client3.expect(prompt)
|
||||
client3.send("SET window_view_heartbeat_interval = 1")
|
||||
client3.expect(prompt)
|
||||
client3.send("SET allow_experimental_analyzer = 0")
|
||||
client3.expect(prompt)
|
||||
|
||||
client1.send("CREATE DATABASE IF NOT EXISTS 01078_window_view_alter_query_watch")
|
||||
client1.expect(prompt)
|
||||
|
@ -27,6 +27,8 @@ with client(name="client1>", log=log) as client1, client(
|
||||
client1.expect(prompt)
|
||||
client2.send("SET allow_experimental_window_view = 1")
|
||||
client2.expect(prompt)
|
||||
client2.send("SET allow_experimental_analyzer = 0")
|
||||
client2.expect(prompt)
|
||||
|
||||
client1.send("CREATE DATABASE IF NOT EXISTS 01082_window_view_watch_limit")
|
||||
client1.expect(prompt)
|
||||
|
Loading…
Reference in New Issue
Block a user