diff --git a/ci/jobs/scripts/check_style/checks_to_refactor.sh b/ci/jobs/scripts/check_style/checks_to_refactor.sh index ae4aae23c12..4da7595151b 100755 --- a/ci/jobs/scripts/check_style/checks_to_refactor.sh +++ b/ci/jobs/scripts/check_style/checks_to_refactor.sh @@ -11,7 +11,7 @@ tests_with_query_log=( $( for test_case in "${tests_with_query_log[@]}"; do grep -qE current_database.*currentDatabase "$test_case" || { grep -qE 'current_database.*\$CLICKHOUSE_DATABASE' "$test_case" - } || echo "Queries to system.query_log/system.query_thread_log does not have current_database = currentDatabase() condition in $test_case" + } || echo "Query to system.query_log/system.query_thread_log does not have current_database = currentDatabase() condition in $test_case" done grep -iE 'SYSTEM STOP MERGES;?$' -R $ROOT_PATH/tests/queries && echo "Merges cannot be disabled globally in fast/stateful/stateless tests, because it will break concurrently running queries" diff --git a/docs/en/sql-reference/statements/create/view.md b/docs/en/sql-reference/statements/create/view.md index c770348bce0..a0576b36392 100644 --- a/docs/en/sql-reference/statements/create/view.md +++ b/docs/en/sql-reference/statements/create/view.md @@ -56,7 +56,7 @@ SELECT * FROM view(column1=value1, column2=value2 ...) ``` sql CREATE MATERIALIZED VIEW [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster_name] [TO[db.]name] [ENGINE = engine] [POPULATE] -[DEFINER = { user | CURRENT_USER }] [SQL SECURITY { DEFINER | INVOKER | NONE }] +[DEFINER = { user | CURRENT_USER }] [SQL SECURITY { DEFINER | NONE }] AS SELECT ... [COMMENT 'comment'] ``` @@ -154,7 +154,7 @@ This feature is deprecated and will be removed in the future. For your convenience, the old documentation is located [here](https://pastila.nl/?00f32652/fdf07272a7b54bda7e13b919264e449f.md) -## Refreshable Materialized View [Experimental] {#refreshable-materialized-view} +## Refreshable Materialized View {#refreshable-materialized-view} ```sql CREATE MATERIALIZED VIEW [IF NOT EXISTS] [db.]table_name @@ -164,6 +164,7 @@ DEPENDS ON [db.]name [, [db.]name [, ...]] SETTINGS name = value [, name = value [, ...]] [APPEND] [TO[db.]name] [(columns)] [ENGINE = engine] [EMPTY] +[DEFINER = { user | CURRENT_USER }] [SQL SECURITY { DEFINER | NONE }] AS SELECT ... [COMMENT 'comment'] ``` diff --git a/src/Interpreters/AsynchronousInsertQueue.cpp b/src/Interpreters/AsynchronousInsertQueue.cpp index ceda7adbcd4..aa97672123f 100644 --- a/src/Interpreters/AsynchronousInsertQueue.cpp +++ b/src/Interpreters/AsynchronousInsertQueue.cpp @@ -919,7 +919,7 @@ try query_for_logging, key.query, pipeline, - interpreter, + interpreter.get(), internal, query_database, query_table, diff --git a/src/Interpreters/ClientInfo.cpp b/src/Interpreters/ClientInfo.cpp index daf1e300046..d5a1a5892da 100644 --- a/src/Interpreters/ClientInfo.cpp +++ b/src/Interpreters/ClientInfo.cpp @@ -256,6 +256,8 @@ String toString(ClientInfo::Interface interface) return "TCP_INTERSERVER"; case ClientInfo::Interface::PROMETHEUS: return "PROMETHEUS"; + case ClientInfo::Interface::BACKGROUND: + return "BACKGROUND"; } return std::format("Unknown server interface ({}).", static_cast(interface)); diff --git a/src/Interpreters/ClientInfo.h b/src/Interpreters/ClientInfo.h index 48dea3cc3ea..84f7b9b816a 100644 --- a/src/Interpreters/ClientInfo.h +++ b/src/Interpreters/ClientInfo.h @@ -39,6 +39,7 @@ public: LOCAL = 6, TCP_INTERSERVER = 7, PROMETHEUS = 8, + BACKGROUND = 9, // e.g. queries from refreshable materialized views }; enum class HTTPMethod : uint8_t diff --git a/src/Interpreters/InterpreterSystemQuery.cpp b/src/Interpreters/InterpreterSystemQuery.cpp index 21b8be93f92..a8f16a8b73a 100644 --- a/src/Interpreters/InterpreterSystemQuery.cpp +++ b/src/Interpreters/InterpreterSystemQuery.cpp @@ -1302,7 +1302,7 @@ void InterpreterSystemQuery::flushDistributed(ASTSystemQuery & query) RefreshTaskList InterpreterSystemQuery::getRefreshTasks() { auto ctx = getContext(); - ctx->checkAccess(AccessType::SYSTEM_VIEWS); + ctx->checkAccess(AccessType::SYSTEM_VIEWS, table_id.database_name, table_id.table_name); auto tasks = ctx->getRefreshSet().findTasks(table_id); if (tasks.empty()) throw Exception( diff --git a/src/Interpreters/SessionLog.cpp b/src/Interpreters/SessionLog.cpp index afbd4ed45a1..c3bf3dd82a8 100644 --- a/src/Interpreters/SessionLog.cpp +++ b/src/Interpreters/SessionLog.cpp @@ -107,8 +107,9 @@ ColumnsDescription SessionLogElement::getColumnsDescription() {"Local", static_cast(Interface::LOCAL)}, {"TCP_Interserver", static_cast(Interface::TCP_INTERSERVER)}, {"Prometheus", static_cast(Interface::PROMETHEUS)}, + {"Background", static_cast(Interface::BACKGROUND)}, }); - static_assert(magic_enum::enum_count() == 8); + static_assert(magic_enum::enum_count() == 9, "Please update the array above to match the enum."); auto lc_string_datatype = std::make_shared(std::make_shared()); diff --git a/src/Interpreters/executeQuery.cpp b/src/Interpreters/executeQuery.cpp index a94eca3d7b1..ec9b626cac9 100644 --- a/src/Interpreters/executeQuery.cpp +++ b/src/Interpreters/executeQuery.cpp @@ -389,7 +389,7 @@ QueryLogElement logQueryStart( const String & query_for_logging, const ASTPtr & query_ast, const QueryPipeline & pipeline, - const std::unique_ptr & interpreter, + const IInterpreter * interpreter, bool internal, const String & query_database, const String & query_table, @@ -1458,7 +1458,7 @@ static std::tuple executeQueryImpl( query_for_logging, ast, pipeline, - interpreter, + interpreter.get(), internal, query_database, query_table, diff --git a/src/Interpreters/executeQuery.h b/src/Interpreters/executeQuery.h index c6b3e1fc34e..f2b6be5836e 100644 --- a/src/Interpreters/executeQuery.h +++ b/src/Interpreters/executeQuery.h @@ -81,7 +81,7 @@ QueryLogElement logQueryStart( const String & query_for_logging, const ASTPtr & query_ast, const QueryPipeline & pipeline, - const std::unique_ptr & interpreter, + const IInterpreter * interpreter, bool internal, const String & query_database, const String & query_table, diff --git a/src/Storages/MaterializedView/RefreshTask.cpp b/src/Storages/MaterializedView/RefreshTask.cpp index 3b893d4677a..98b0d88c21e 100644 --- a/src/Storages/MaterializedView/RefreshTask.cpp +++ b/src/Storages/MaterializedView/RefreshTask.cpp @@ -229,7 +229,7 @@ void RefreshTask::alterRefreshParams(const DB::ASTRefreshStrategy & new_strategy RefreshTask::Info RefreshTask::getInfo() const { std::lock_guard guard(mutex); - return Info {.view_id = set_handle.getID(), .state = state, .next_refresh_time = next_refresh_time, .znode = coordination.root_znode, .refresh_running = coordination.running_znode_exists, .progress = execution.progress.getValues()}; + return Info {.view_id = set_handle.getID(), .state = state, .next_refresh_time = next_refresh_time, .znode = coordination.root_znode, .refresh_running = coordination.running_znode_exists, .progress = execution.progress.getValues(), .unexpected_error = scheduling.unexpected_error}; } void RefreshTask::start() @@ -237,6 +237,7 @@ void RefreshTask::start() std::lock_guard guard(mutex); if (!std::exchange(scheduling.stop_requested, false)) return; + scheduling.unexpected_error = std::nullopt; refresh_task->schedule(); } @@ -391,10 +392,10 @@ void RefreshTask::refreshTask() } /// Check if it's time to refresh. - auto now = currentTime(); - auto start_time = std::chrono::floor(now); - auto start_time_steady = std::chrono::steady_clock::now(); - auto [when, timeslot, start_znode] = determineNextRefreshTime(start_time); + auto start_time = currentTime(); + auto start_time_seconds = std::chrono::floor(start_time); + Stopwatch stopwatch; + auto [when, timeslot, start_znode] = determineNextRefreshTime(start_time_seconds); next_refresh_time = when; bool out_of_schedule = scheduling.out_of_schedule_refresh_requested; if (out_of_schedule) @@ -402,9 +403,9 @@ void RefreshTask::refreshTask() chassert(start_znode.attempt_number > 0); start_znode.attempt_number -= 1; } - else if (now < when) + else if (start_time < when) { - size_t delay_ms = std::chrono::duration_cast(when - now).count(); + size_t delay_ms = std::chrono::duration_cast(when - start_time).count(); /// If we're in a test that fakes the clock, poll every 100ms. if (scheduling.fake_clock.load(std::memory_order_relaxed) != INT64_MIN) delay_ms = 100; @@ -443,53 +444,35 @@ void RefreshTask::refreshTask() int32_t root_znode_version = coordination.coordinated ? coordination.root_znode.version : -1; CurrentMetrics::Increment metric_inc(CurrentMetrics::RefreshingViews); + String log_comment = fmt::format("refresh of {}", view->getStorageID().getFullTableName()); + if (start_znode.attempt_number > 1) + log_comment += fmt::format(" (attempt {}/{})", start_znode.attempt_number, refresh_settings[RefreshSetting::refresh_retries] + 1); + lock.unlock(); - bool refreshed = false; String error_message; - UUID new_table_uuid; - - try - { - new_table_uuid = executeRefreshUnlocked(append, root_znode_version); - refreshed = true; - } - catch (...) - { - if (execution.interrupt_execution.load()) - { - error_message = "cancelled"; - LOG_INFO(log, "{}: Refresh cancelled", view->getStorageID().getFullTableName()); - } - else - { - error_message = getCurrentExceptionMessage(true); - LOG_ERROR(log, "{}: Refresh failed (attempt {}/{}): {}", view->getStorageID().getFullTableName(), start_znode.attempt_number, refresh_settings[RefreshSetting::refresh_retries] + 1, error_message); - } - } + auto new_table_uuid = executeRefreshUnlocked(append, root_znode_version, start_time, stopwatch, log_comment, error_message); + bool refreshed = new_table_uuid.has_value(); lock.lock(); setState(RefreshState::Scheduling, lock); - auto end_time = std::chrono::floor(currentTime()); + auto end_time_seconds = std::chrono::floor(currentTime()); auto znode = coordination.root_znode; - znode.last_attempt_time = end_time; + znode.last_attempt_time = end_time_seconds; + znode.last_attempt_error = error_message; if (refreshed) { znode.last_attempt_succeeded = true; - znode.last_completed_timeslot = refresh_schedule.timeslotForCompletedRefresh(znode.last_completed_timeslot, start_time, end_time, out_of_schedule); - znode.last_success_time = start_time; - znode.last_success_duration = std::chrono::duration_cast(std::chrono::steady_clock::now() - start_time_steady); - znode.last_success_table_uuid = new_table_uuid; + znode.last_completed_timeslot = refresh_schedule.timeslotForCompletedRefresh(znode.last_completed_timeslot, start_time_seconds, end_time_seconds, out_of_schedule); + znode.last_success_time = start_time_seconds; + znode.last_success_duration = std::chrono::milliseconds(stopwatch.elapsedMilliseconds()); + znode.last_success_table_uuid = *new_table_uuid; znode.previous_attempt_error = ""; znode.attempt_number = 0; znode.randomize(); } - else - { - znode.last_attempt_error = error_message; - } bool ok = updateCoordinationState(znode, false, zookeeper, lock); chassert(ok); @@ -517,8 +500,10 @@ void RefreshTask::refreshTask() if (!lock.owns_lock()) lock.lock(); scheduling.stop_requested = true; + scheduling.unexpected_error = getCurrentExceptionMessage(true); coordination.watches->should_reread_znodes.store(true); coordination.running_znode_exists = false; + refresh_task->schedule(); lock.unlock(); tryLogCurrentException(log, @@ -532,34 +517,43 @@ void RefreshTask::refreshTask() } } -UUID RefreshTask::executeRefreshUnlocked(bool append, int32_t root_znode_version) +std::optional RefreshTask::executeRefreshUnlocked(bool append, int32_t root_znode_version, std::chrono::system_clock::time_point start_time, const Stopwatch & stopwatch, const String & log_comment, String & out_error_message) { - LOG_DEBUG(log, "Refreshing view {}", view->getStorageID().getFullTableName()); + StorageID view_storage_id = view->getStorageID(); + LOG_DEBUG(log, "Refreshing view {}", view_storage_id.getFullTableName()); execution.progress.reset(); - ContextMutablePtr refresh_context = view->createRefreshContext(); + ContextMutablePtr refresh_context = view->createRefreshContext(log_comment); if (!append) { - refresh_context->setParentTable(view->getStorageID().uuid); + refresh_context->setParentTable(view_storage_id.uuid); refresh_context->setDDLQueryCancellation(execution.cancel_ddl_queries.get_token()); if (root_znode_version != -1) refresh_context->setDDLAdditionalChecksOnEnqueue({zkutil::makeCheckRequest(coordination.path, root_znode_version)}); } + std::optional query_log_elem; + std::shared_ptr refresh_query; + String query_for_logging; + std::shared_ptr query_span = std::make_shared("query"); + ProcessList::EntryPtr process_list_entry; + std::optional table_to_drop; auto new_table_id = StorageID::createEmpty(); try { { /// Create a table. - auto [refresh_query, query_scope] = view->prepareRefresh(append, refresh_context, table_to_drop); + query_for_logging = "(create target table)"; + std::unique_ptr query_scope; + std::tie(refresh_query, query_scope) = view->prepareRefresh(append, refresh_context, table_to_drop); new_table_id = refresh_query->table_id; /// Add the query to system.processes and allow it to be killed with KILL QUERY. - String query_for_logging = refresh_query->formatForLogging( + query_for_logging = refresh_query->formatForLogging( refresh_context->getSettingsRef()[Setting::log_queries_cut_to_length]); - auto process_list_entry = refresh_context->getProcessList().insert( + process_list_entry = refresh_context->getProcessList().insert( query_for_logging, refresh_query.get(), refresh_context, Stopwatch{CLOCK_MONOTONIC}.getStart()); refresh_context->setProcessListElement(process_list_entry->getQueryStatus()); refresh_context->setProgressCallback([this](const Progress & prog) @@ -569,15 +563,23 @@ UUID RefreshTask::executeRefreshUnlocked(bool append, int32_t root_znode_version /// Run the query. - BlockIO block_io = InterpreterInsertQuery( + InterpreterInsertQuery interpreter( refresh_query, refresh_context, /* allow_materialized */ false, /* no_squash */ false, /* no_destination */ false, - /* async_isnert */ false).execute(); + /* async_isnert */ false); + BlockIO block_io = interpreter.execute(); QueryPipeline & pipeline = block_io.pipeline; + /// We log the refresh as one INSERT SELECT query, but the timespan and exceptions also + /// cover the surrounding CREATE, EXCHANGE, and DROP queries. + query_log_elem = logQueryStart( + start_time, refresh_context, query_for_logging, refresh_query, pipeline, + &interpreter, /*internal*/ false, view_storage_id.database_name, + view_storage_id.table_name, /*async_insert*/ false); + if (!pipeline.completed()) throw Exception(ErrorCodes::LOGICAL_ERROR, "Pipeline for view refresh must be completed"); @@ -605,21 +607,51 @@ UUID RefreshTask::executeRefreshUnlocked(bool append, int32_t root_znode_version /// (specifically, the assert in ~WriteBuffer()). if (execution.interrupt_execution.load()) throw Exception(ErrorCodes::QUERY_WAS_CANCELLED, "Refresh cancelled"); + + logQueryFinish(*query_log_elem, refresh_context, refresh_query, pipeline, /*pulling_pipeline*/ false, query_span, QueryCache::Usage::None, /*internal*/ false); + query_log_elem = std::nullopt; + query_span = nullptr; + process_list_entry.reset(); // otherwise it needs to be alive for logQueryException } /// Exchange tables. if (!append) + { + query_for_logging = "(exchange tables)"; table_to_drop = view->exchangeTargetTable(new_table_id, refresh_context); + } } catch (...) { + bool cancelled = execution.interrupt_execution.load(); + if (table_to_drop.has_value()) - view->dropTempTable(table_to_drop.value(), refresh_context); - throw; + { + String discard_error_message; + view->dropTempTable(table_to_drop.value(), refresh_context, discard_error_message); + } + + if (query_log_elem.has_value()) + { + logQueryException(*query_log_elem, refresh_context, stopwatch, refresh_query, query_span, /*internal*/ false, /*log_error*/ !cancelled); + } + else + { + /// Failed when creating new table or when swapping tables. + logExceptionBeforeStart(query_for_logging, refresh_context, /*ast*/ nullptr, query_span, + stopwatch.elapsedMilliseconds()); + } + + if (cancelled) + out_error_message = "cancelled"; + else + out_error_message = getCurrentExceptionMessage(true); + + return std::nullopt; } if (table_to_drop.has_value()) - view->dropTempTable(table_to_drop.value(), refresh_context); + view->dropTempTable(table_to_drop.value(), refresh_context, out_error_message); return new_table_id.uuid; } diff --git a/src/Storages/MaterializedView/RefreshTask.h b/src/Storages/MaterializedView/RefreshTask.h index 59898390984..eb509278263 100644 --- a/src/Storages/MaterializedView/RefreshTask.h +++ b/src/Storages/MaterializedView/RefreshTask.h @@ -151,6 +151,7 @@ public: CoordinationZnode znode; bool refresh_running; ProgressValues progress; + std::optional unexpected_error; // refreshing is stopped because of unexpected error }; private: @@ -203,6 +204,8 @@ private: { /// Refreshes are stopped, e.g. by SYSTEM STOP VIEW. bool stop_requested = false; + /// Refreshes are stopped because we got an unexpected error. Can be resumed with SYSTEM START VIEW. + std::optional unexpected_error; /// An out-of-schedule refresh was requested, e.g. by SYSTEM REFRESH VIEW. bool out_of_schedule_refresh_requested = false; @@ -257,8 +260,8 @@ private: void refreshTask(); /// Perform an actual refresh: create new table, run INSERT SELECT, exchange tables, drop old table. - /// Mutex must be unlocked. Called only from refresh_task. - UUID executeRefreshUnlocked(bool append, int32_t root_znode_version); + /// Mutex must be unlocked. Called only from refresh_task. Doesn't throw. + std::optional executeRefreshUnlocked(bool append, int32_t root_znode_version, std::chrono::system_clock::time_point start_time, const Stopwatch & stopwatch, const String & log_comment, String & out_error_message); /// Assigns dependencies_satisfied_until. void updateDependenciesIfNeeded(std::unique_lock & lock); diff --git a/src/Storages/StorageInMemoryMetadata.cpp b/src/Storages/StorageInMemoryMetadata.cpp index 4c74c8f56d1..62565eff2ce 100644 --- a/src/Storages/StorageInMemoryMetadata.cpp +++ b/src/Storages/StorageInMemoryMetadata.cpp @@ -117,7 +117,7 @@ UUID StorageInMemoryMetadata::getDefinerID(DB::ContextPtr context) const return access_control.getID(*definer); } -ContextMutablePtr StorageInMemoryMetadata::getSQLSecurityOverriddenContext(ContextPtr context) const +ContextMutablePtr StorageInMemoryMetadata::getSQLSecurityOverriddenContext(ContextPtr context, const ClientInfo * client_info) const { if (!sql_security_type) return Context::createCopy(context); @@ -126,7 +126,10 @@ ContextMutablePtr StorageInMemoryMetadata::getSQLSecurityOverriddenContext(Conte return Context::createCopy(context); auto new_context = Context::createCopy(context->getGlobalContext()); - new_context->setClientInfo(context->getClientInfo()); + if (client_info) + new_context->setClientInfo(*client_info); + else + new_context->setClientInfo(context->getClientInfo()); new_context->makeQueryContext(); const auto & database = context->getCurrentDatabase(); diff --git a/src/Storages/StorageInMemoryMetadata.h b/src/Storages/StorageInMemoryMetadata.h index 64ae499ec6e..fbea916a2f3 100644 --- a/src/Storages/StorageInMemoryMetadata.h +++ b/src/Storages/StorageInMemoryMetadata.h @@ -18,6 +18,8 @@ namespace DB { +class ClientInfo; + /// Common metadata for all storages. Contains all possible parts of CREATE /// query from all storages, but only some subset used. struct StorageInMemoryMetadata @@ -122,7 +124,7 @@ struct StorageInMemoryMetadata /// Returns a copy of the context with the correct user from SQL security options. /// If the SQL security wasn't set, this is equivalent to `Context::createCopy(context)`. /// The context from this function must be used every time whenever views execute any read/write operations or subqueries. - ContextMutablePtr getSQLSecurityOverriddenContext(ContextPtr context) const; + ContextMutablePtr getSQLSecurityOverriddenContext(ContextPtr context, const ClientInfo * client_info = nullptr) const; /// Returns combined set of columns const ColumnsDescription & getColumns() const; diff --git a/src/Storages/StorageMaterializedView.cpp b/src/Storages/StorageMaterializedView.cpp index 9491c40b65f..d5cdaa3501a 100644 --- a/src/Storages/StorageMaterializedView.cpp +++ b/src/Storages/StorageMaterializedView.cpp @@ -46,6 +46,7 @@ namespace Setting { extern const SettingsBool allow_experimental_analyzer; extern const SettingsSeconds lock_acquire_timeout; + extern const SettingsUInt64 log_queries_cut_to_length; } namespace ServerSetting @@ -191,9 +192,9 @@ StorageMaterializedView::StorageMaterializedView( } } - /// Sanity-check the table engine. if (mode < LoadingStrictnessLevel::ATTACH && !fixed_uuid) { + /// Sanity-check the table engine. String inner_engine; if (has_inner_table) { @@ -221,6 +222,12 @@ StorageMaterializedView::StorageMaterializedView( /// each refresh would append to a table on one arbitrarily chosen replica. But in principle it can be useful, /// e.g. if SELECTs are done using clusterAllReplicas(). (For the two disallowed cases above, clusterAllReplicas() wouldn't work reliably.) } + + /// Sanity-check permissions. This is just for usability, the main checks are done by the + /// actual CREATE/INSERT/SELECT/EXCHANGE/DROP interpreters during refresh. + String inner_db_name = has_inner_table ? table_id_.database_name : to_table_id.database_name; + auto refresh_context = storage_metadata.getSQLSecurityOverriddenContext(getContext()); + refresh_context->checkAccess(AccessType::DROP_TABLE | AccessType::CREATE_TABLE | AccessType::SELECT | AccessType::INSERT, inner_db_name); } refresher = RefreshTask::create(this, getContext(), *query.refresh_strategy, mode >= LoadingStrictnessLevel::ATTACH, refresh_coordinated, query.is_create_empty); @@ -503,10 +510,16 @@ bool StorageMaterializedView::optimize( return storage_ptr->optimize(query, metadata_snapshot, partition, final, deduplicate, deduplicate_by_columns, cleanup, local_context); } -ContextMutablePtr StorageMaterializedView::createRefreshContext() const +ContextMutablePtr StorageMaterializedView::createRefreshContext(const String & log_comment) const { - auto refresh_context = getInMemoryMetadataPtr()->getSQLSecurityOverriddenContext(getContext()); + ContextPtr table_context = getContext(); + ClientInfo client_info = table_context->getClientInfo(); + client_info.interface = ClientInfo::Interface::BACKGROUND; + client_info.client_name = "refreshable materialized view"; + auto refresh_context = getInMemoryMetadataPtr()->getSQLSecurityOverriddenContext(table_context, &client_info); + refresh_context->setClientInfo(client_info); refresh_context->setSetting("database_replicated_allow_replicated_engine_arguments", 3); + refresh_context->setSetting("log_comment", log_comment); refresh_context->setQueryKind(ClientInfo::QueryKind::INITIAL_QUERY); /// Generate a random query id. refresh_context->setCurrentQueryId(""); @@ -530,6 +543,9 @@ StorageMaterializedView::prepareRefresh(bool append, ContextMutablePtr refresh_c String db_name = db->getDatabaseName(); auto new_table_name = ".tmp" + generateInnerTableName(getStorageID()); + /// Pre-check the permissions. Would be awkward if we create a temporary table and can't drop it. + refresh_context->checkAccess(AccessType::DROP_TABLE | AccessType::CREATE_TABLE | AccessType::SELECT | AccessType::INSERT, db_name); + auto create_query = std::dynamic_pointer_cast(db->getCreateTableQuery(inner_table_id.table_name, getContext())); create_query->setTable(new_table_name); create_query->setDatabase(db_name); @@ -595,24 +611,30 @@ std::optional StorageMaterializedView::exchangeTargetTable(StorageID return exchange ? std::make_optional(fresh_table) : std::nullopt; } -void StorageMaterializedView::dropTempTable(StorageID table_id, ContextMutablePtr refresh_context) +void StorageMaterializedView::dropTempTable(StorageID table_id, ContextMutablePtr refresh_context, String & out_exception) { CurrentThread::QueryScope query_scope(refresh_context); + auto drop_query = std::make_shared(); + drop_query->setDatabase(table_id.database_name); + drop_query->setTable(table_id.table_name); + drop_query->kind = ASTDropQuery::Kind::Drop; + drop_query->if_exists = true; + drop_query->sync = false; + + Stopwatch stopwatch; try { - auto drop_query = std::make_shared(); - drop_query->setDatabase(table_id.database_name); - drop_query->setTable(table_id.table_name); - drop_query->kind = ASTDropQuery::Kind::Drop; - drop_query->if_exists = true; - drop_query->sync = false; - InterpreterDropQuery(drop_query, refresh_context).execute(); } catch (...) { - tryLogCurrentException(&Poco::Logger::get("StorageMaterializedView"), "Failed to drop temporary table after refresh"); + auto query_for_logging = drop_query->formatForLogging(refresh_context->getSettingsRef()[Setting::log_queries_cut_to_length]); + logExceptionBeforeStart(query_for_logging, refresh_context, drop_query, nullptr, stopwatch.elapsedMilliseconds()); + LOG_ERROR(getLogger("StorageMaterializedView"), + "{}: Failed to drop temporary table after refresh. Table {} is left behind and requires manual cleanup.", + getStorageID().getFullTableName(), table_id.getFullTableName()); + out_exception = getCurrentExceptionMessage(true); } } diff --git a/src/Storages/StorageMaterializedView.h b/src/Storages/StorageMaterializedView.h index e39642066b4..3689608dd65 100644 --- a/src/Storages/StorageMaterializedView.h +++ b/src/Storages/StorageMaterializedView.h @@ -120,7 +120,7 @@ private: void checkStatementCanBeForwarded() const; - ContextMutablePtr createRefreshContext() const; + ContextMutablePtr createRefreshContext(const String & log_comment) const; /// Prepare to refresh a refreshable materialized view: create temporary table (if needed) and /// form the insert-select query. /// out_temp_table_id may be assigned before throwing an exception, in which case the caller @@ -128,7 +128,7 @@ private: std::tuple, std::unique_ptr> prepareRefresh(bool append, ContextMutablePtr refresh_context, std::optional & out_temp_table_id) const; std::optional exchangeTargetTable(StorageID fresh_table, ContextPtr refresh_context) const; - void dropTempTable(StorageID table, ContextMutablePtr refresh_context); + void dropTempTable(StorageID table, ContextMutablePtr refresh_context, String & out_exception); void updateTargetTableId(std::optional database_name, std::optional table_name); }; diff --git a/src/Storages/System/StorageSystemViewRefreshes.cpp b/src/Storages/System/StorageSystemViewRefreshes.cpp index d70671dc39b..95840d48ab4 100644 --- a/src/Storages/System/StorageSystemViewRefreshes.cpp +++ b/src/Storages/System/StorageSystemViewRefreshes.cpp @@ -79,14 +79,16 @@ void StorageSystemViewRefreshes::fillData( res_columns[i++]->insert(std::chrono::duration_cast(refresh.next_refresh_time.time_since_epoch()).count()); - if (refresh.znode.last_attempt_succeeded || refresh.znode.last_attempt_time.time_since_epoch().count() == 0) - res_columns[i++]->insertDefault(); + if (refresh.unexpected_error.has_value()) + res_columns[i++]->insert(*refresh.unexpected_error); + else if (!refresh.znode.last_attempt_error.empty()) + res_columns[i++]->insert(refresh.znode.last_attempt_error); else if (refresh.refresh_running) res_columns[i++]->insert(refresh.znode.previous_attempt_error); - else if (refresh.znode.last_attempt_error.empty()) - res_columns[i++]->insert("Replica went away"); + else if (refresh.znode.last_attempt_succeeded || refresh.znode.last_attempt_time.time_since_epoch().count() == 0) + res_columns[i++]->insertDefault(); else - res_columns[i++]->insert(refresh.znode.last_attempt_error); + res_columns[i++]->insert("Replica went away"); Int64 retries = refresh.znode.attempt_number; if (refresh.refresh_running && retries) diff --git a/tests/queries/0_stateless/03258_refreshable_mv_misc.reference b/tests/queries/0_stateless/03258_refreshable_mv_misc.reference new file mode 100644 index 00000000000..1a0e0e4ffc6 --- /dev/null +++ b/tests/queries/0_stateless/03258_refreshable_mv_misc.reference @@ -0,0 +1,7 @@ +hi +ACCESS_DENIED +FUNCTION_THROW_IF_VALUE_IS_NON_ZERO +Ex**ptionBeforeStart 9 refreshable materialized view 1 0 +Ex**ptionWhileProcessing 9 refreshable materialized view 1 1 +QueryFinish 9 refreshable materialized view 0 1 +QueryStart 9 refreshable materialized view 0 1 diff --git a/tests/queries/0_stateless/03258_refreshable_mv_misc.sh b/tests/queries/0_stateless/03258_refreshable_mv_misc.sh new file mode 100755 index 00000000000..981364b5e01 --- /dev/null +++ b/tests/queries/0_stateless/03258_refreshable_mv_misc.sh @@ -0,0 +1,77 @@ +#!/usr/bin/env bash + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CURDIR"/../shell_config.sh + +test_user="user_03258_$CLICKHOUSE_DATABASE" +second_db="${CLICKHOUSE_DATABASE}_03258" +$CLICKHOUSE_CLIENT -q " + create user $test_user; + create database $second_db; + create table a (x String) engine Memory; + insert into a values ('hi'); + grant create, insert, select on ${second_db}.* to $test_user; -- no drop yet + grant select on a to $test_user; + grant system views on ${second_db}.* to $test_user; +" + +# TODO: After https://github.com/ClickHouse/ClickHouse/pull/71336 is merged, remove +# "definer CURRENT_USER sql security definer" part from both queries. + +# Check that permissions are checked on creation. +$CLICKHOUSE_CLIENT --user $test_user -q " + create materialized view ${second_db}.v refresh every 2 second (x String) engine Memory definer CURRENT_USER sql security definer as select * from a; -- {serverError ACCESS_DENIED} +" +$CLICKHOUSE_CLIENT -q " + grant drop on ${second_db}.* to $test_user; +" +$CLICKHOUSE_CLIENT --user $test_user -q " + create materialized view ${second_db}.v refresh every 1 second (x String) engine Memory definer CURRENT_USER sql security definer as select * from a; + system wait view ${second_db}.v; + select * from ${second_db}.v; +" + +# Check that permissions are checked on refresh. +$CLICKHOUSE_CLIENT -q "revoke select on a from $test_user" +for _ in {1..10} +do + $CLICKHOUSE_CLIENT -q "system refresh view ${second_db}.v" + res=$($CLICKHOUSE_CLIENT -q "system wait view ${second_db}.v" 2>&1) + if [ "$?" != 0 ] + then + echo "$res" | grep -o -m1 ACCESS_DENIED || echo "expected ACCESS_DENIED error, got:" "$res" + break + fi + sleep 1 +done + +$CLICKHOUSE_CLIENT -q "alter table ${second_db}.v modify query select throwIf(1) as x" + +# Get an exception during query execution. +for _ in {1..10} +do + $CLICKHOUSE_CLIENT -q "system refresh view ${second_db}.v" + res=$($CLICKHOUSE_CLIENT -q "system wait view ${second_db}.v" 2>&1) + if [ "$?" == 0 ] + then + echo "unexpected success" + break + else + echo "$res" | grep -o -m1 FUNCTION_THROW_IF_VALUE_IS_NON_ZERO && break + fi + sleep 1 +done + +# Check that refreshes and both kinds of errors appear in query log. +# (Magic word to silence check-style warning: **current_database = currentDatabase()**. +# It's ok that we don't have this condition in the query, we're checking the db name in `tables` column instead.) +$CLICKHOUSE_CLIENT -q " + system flush logs; + select replaceAll(toString(type), 'Exception', 'Ex**ption'), interface, client_name, exception != '', has(tables, '${second_db}.v') from system.query_log where event_time > now() - interval 30 minute and log_comment = 'refresh of ${second_db}.v' group by all order by all; +" + +$CLICKHOUSE_CLIENT -q " + drop user $test_user; + drop database $second_db; +" diff --git a/utils/check-style/check-style b/utils/check-style/check-style index c3b42be1519..a184f7734f5 100755 --- a/utils/check-style/check-style +++ b/utils/check-style/check-style @@ -181,7 +181,7 @@ tests_with_query_log=( $( for test_case in "${tests_with_query_log[@]}"; do grep -qE current_database.*currentDatabase "$test_case" || { grep -qE 'current_database.*\$CLICKHOUSE_DATABASE' "$test_case" - } || echo "Queries to system.query_log/system.query_thread_log does not have current_database = currentDatabase() condition in $test_case" + } || echo "Query to system.query_log/system.query_thread_log does not have current_database = currentDatabase() condition in $test_case" done grep -iE 'SYSTEM STOP MERGES;?$' -R $ROOT_PATH/tests/queries && echo "Merges cannot be disabled globally in fast/stateful/stateless tests, because it will break concurrently running queries"