This commit is contained in:
Michael Kolupaev 2024-11-20 15:12:05 -08:00 committed by GitHub
commit 84a27dff9d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 237 additions and 84 deletions

View File

@ -11,7 +11,7 @@ tests_with_query_log=( $(
for test_case in "${tests_with_query_log[@]}"; do
grep -qE current_database.*currentDatabase "$test_case" || {
grep -qE 'current_database.*\$CLICKHOUSE_DATABASE' "$test_case"
} || echo "Queries to system.query_log/system.query_thread_log does not have current_database = currentDatabase() condition in $test_case"
} || echo "Query to system.query_log/system.query_thread_log does not have current_database = currentDatabase() condition in $test_case"
done
grep -iE 'SYSTEM STOP MERGES;?$' -R $ROOT_PATH/tests/queries && echo "Merges cannot be disabled globally in fast/stateful/stateless tests, because it will break concurrently running queries"

View File

@ -56,7 +56,7 @@ SELECT * FROM view(column1=value1, column2=value2 ...)
``` sql
CREATE MATERIALIZED VIEW [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster_name] [TO[db.]name] [ENGINE = engine] [POPULATE]
[DEFINER = { user | CURRENT_USER }] [SQL SECURITY { DEFINER | INVOKER | NONE }]
[DEFINER = { user | CURRENT_USER }] [SQL SECURITY { DEFINER | NONE }]
AS SELECT ...
[COMMENT 'comment']
```
@ -154,7 +154,7 @@ This feature is deprecated and will be removed in the future.
For your convenience, the old documentation is located [here](https://pastila.nl/?00f32652/fdf07272a7b54bda7e13b919264e449f.md)
## Refreshable Materialized View [Experimental] {#refreshable-materialized-view}
## Refreshable Materialized View {#refreshable-materialized-view}
```sql
CREATE MATERIALIZED VIEW [IF NOT EXISTS] [db.]table_name
@ -164,6 +164,7 @@ DEPENDS ON [db.]name [, [db.]name [, ...]]
SETTINGS name = value [, name = value [, ...]]
[APPEND]
[TO[db.]name] [(columns)] [ENGINE = engine] [EMPTY]
[DEFINER = { user | CURRENT_USER }] [SQL SECURITY { DEFINER | NONE }]
AS SELECT ...
[COMMENT 'comment']
```

View File

@ -919,7 +919,7 @@ try
query_for_logging,
key.query,
pipeline,
interpreter,
interpreter.get(),
internal,
query_database,
query_table,

View File

@ -256,6 +256,8 @@ String toString(ClientInfo::Interface interface)
return "TCP_INTERSERVER";
case ClientInfo::Interface::PROMETHEUS:
return "PROMETHEUS";
case ClientInfo::Interface::BACKGROUND:
return "BACKGROUND";
}
return std::format("Unknown server interface ({}).", static_cast<int>(interface));

View File

@ -39,6 +39,7 @@ public:
LOCAL = 6,
TCP_INTERSERVER = 7,
PROMETHEUS = 8,
BACKGROUND = 9, // e.g. queries from refreshable materialized views
};
enum class HTTPMethod : uint8_t

View File

@ -1302,7 +1302,7 @@ void InterpreterSystemQuery::flushDistributed(ASTSystemQuery & query)
RefreshTaskList InterpreterSystemQuery::getRefreshTasks()
{
auto ctx = getContext();
ctx->checkAccess(AccessType::SYSTEM_VIEWS);
ctx->checkAccess(AccessType::SYSTEM_VIEWS, table_id.database_name, table_id.table_name);
auto tasks = ctx->getRefreshSet().findTasks(table_id);
if (tasks.empty())
throw Exception(

View File

@ -107,8 +107,9 @@ ColumnsDescription SessionLogElement::getColumnsDescription()
{"Local", static_cast<Int8>(Interface::LOCAL)},
{"TCP_Interserver", static_cast<Int8>(Interface::TCP_INTERSERVER)},
{"Prometheus", static_cast<Int8>(Interface::PROMETHEUS)},
{"Background", static_cast<Int8>(Interface::BACKGROUND)},
});
static_assert(magic_enum::enum_count<Interface>() == 8);
static_assert(magic_enum::enum_count<Interface>() == 9, "Please update the array above to match the enum.");
auto lc_string_datatype = std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>());

View File

@ -389,7 +389,7 @@ QueryLogElement logQueryStart(
const String & query_for_logging,
const ASTPtr & query_ast,
const QueryPipeline & pipeline,
const std::unique_ptr<IInterpreter> & interpreter,
const IInterpreter * interpreter,
bool internal,
const String & query_database,
const String & query_table,
@ -1458,7 +1458,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
query_for_logging,
ast,
pipeline,
interpreter,
interpreter.get(),
internal,
query_database,
query_table,

View File

@ -81,7 +81,7 @@ QueryLogElement logQueryStart(
const String & query_for_logging,
const ASTPtr & query_ast,
const QueryPipeline & pipeline,
const std::unique_ptr<IInterpreter> & interpreter,
const IInterpreter * interpreter,
bool internal,
const String & query_database,
const String & query_table,

View File

@ -229,7 +229,7 @@ void RefreshTask::alterRefreshParams(const DB::ASTRefreshStrategy & new_strategy
RefreshTask::Info RefreshTask::getInfo() const
{
std::lock_guard guard(mutex);
return Info {.view_id = set_handle.getID(), .state = state, .next_refresh_time = next_refresh_time, .znode = coordination.root_znode, .refresh_running = coordination.running_znode_exists, .progress = execution.progress.getValues()};
return Info {.view_id = set_handle.getID(), .state = state, .next_refresh_time = next_refresh_time, .znode = coordination.root_znode, .refresh_running = coordination.running_znode_exists, .progress = execution.progress.getValues(), .unexpected_error = scheduling.unexpected_error};
}
void RefreshTask::start()
@ -237,6 +237,7 @@ void RefreshTask::start()
std::lock_guard guard(mutex);
if (!std::exchange(scheduling.stop_requested, false))
return;
scheduling.unexpected_error = std::nullopt;
refresh_task->schedule();
}
@ -391,10 +392,10 @@ void RefreshTask::refreshTask()
}
/// Check if it's time to refresh.
auto now = currentTime();
auto start_time = std::chrono::floor<std::chrono::seconds>(now);
auto start_time_steady = std::chrono::steady_clock::now();
auto [when, timeslot, start_znode] = determineNextRefreshTime(start_time);
auto start_time = currentTime();
auto start_time_seconds = std::chrono::floor<std::chrono::seconds>(start_time);
Stopwatch stopwatch;
auto [when, timeslot, start_znode] = determineNextRefreshTime(start_time_seconds);
next_refresh_time = when;
bool out_of_schedule = scheduling.out_of_schedule_refresh_requested;
if (out_of_schedule)
@ -402,9 +403,9 @@ void RefreshTask::refreshTask()
chassert(start_znode.attempt_number > 0);
start_znode.attempt_number -= 1;
}
else if (now < when)
else if (start_time < when)
{
size_t delay_ms = std::chrono::duration_cast<std::chrono::milliseconds>(when - now).count();
size_t delay_ms = std::chrono::duration_cast<std::chrono::milliseconds>(when - start_time).count();
/// If we're in a test that fakes the clock, poll every 100ms.
if (scheduling.fake_clock.load(std::memory_order_relaxed) != INT64_MIN)
delay_ms = 100;
@ -443,53 +444,35 @@ void RefreshTask::refreshTask()
int32_t root_znode_version = coordination.coordinated ? coordination.root_znode.version : -1;
CurrentMetrics::Increment metric_inc(CurrentMetrics::RefreshingViews);
String log_comment = fmt::format("refresh of {}", view->getStorageID().getFullTableName());
if (start_znode.attempt_number > 1)
log_comment += fmt::format(" (attempt {}/{})", start_znode.attempt_number, refresh_settings[RefreshSetting::refresh_retries] + 1);
lock.unlock();
bool refreshed = false;
String error_message;
UUID new_table_uuid;
try
{
new_table_uuid = executeRefreshUnlocked(append, root_znode_version);
refreshed = true;
}
catch (...)
{
if (execution.interrupt_execution.load())
{
error_message = "cancelled";
LOG_INFO(log, "{}: Refresh cancelled", view->getStorageID().getFullTableName());
}
else
{
error_message = getCurrentExceptionMessage(true);
LOG_ERROR(log, "{}: Refresh failed (attempt {}/{}): {}", view->getStorageID().getFullTableName(), start_znode.attempt_number, refresh_settings[RefreshSetting::refresh_retries] + 1, error_message);
}
}
auto new_table_uuid = executeRefreshUnlocked(append, root_znode_version, start_time, stopwatch, log_comment, error_message);
bool refreshed = new_table_uuid.has_value();
lock.lock();
setState(RefreshState::Scheduling, lock);
auto end_time = std::chrono::floor<std::chrono::seconds>(currentTime());
auto end_time_seconds = std::chrono::floor<std::chrono::seconds>(currentTime());
auto znode = coordination.root_znode;
znode.last_attempt_time = end_time;
znode.last_attempt_time = end_time_seconds;
znode.last_attempt_error = error_message;
if (refreshed)
{
znode.last_attempt_succeeded = true;
znode.last_completed_timeslot = refresh_schedule.timeslotForCompletedRefresh(znode.last_completed_timeslot, start_time, end_time, out_of_schedule);
znode.last_success_time = start_time;
znode.last_success_duration = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start_time_steady);
znode.last_success_table_uuid = new_table_uuid;
znode.last_completed_timeslot = refresh_schedule.timeslotForCompletedRefresh(znode.last_completed_timeslot, start_time_seconds, end_time_seconds, out_of_schedule);
znode.last_success_time = start_time_seconds;
znode.last_success_duration = std::chrono::milliseconds(stopwatch.elapsedMilliseconds());
znode.last_success_table_uuid = *new_table_uuid;
znode.previous_attempt_error = "";
znode.attempt_number = 0;
znode.randomize();
}
else
{
znode.last_attempt_error = error_message;
}
bool ok = updateCoordinationState(znode, false, zookeeper, lock);
chassert(ok);
@ -517,8 +500,10 @@ void RefreshTask::refreshTask()
if (!lock.owns_lock())
lock.lock();
scheduling.stop_requested = true;
scheduling.unexpected_error = getCurrentExceptionMessage(true);
coordination.watches->should_reread_znodes.store(true);
coordination.running_znode_exists = false;
refresh_task->schedule();
lock.unlock();
tryLogCurrentException(log,
@ -532,34 +517,43 @@ void RefreshTask::refreshTask()
}
}
UUID RefreshTask::executeRefreshUnlocked(bool append, int32_t root_znode_version)
std::optional<UUID> RefreshTask::executeRefreshUnlocked(bool append, int32_t root_znode_version, std::chrono::system_clock::time_point start_time, const Stopwatch & stopwatch, const String & log_comment, String & out_error_message)
{
LOG_DEBUG(log, "Refreshing view {}", view->getStorageID().getFullTableName());
StorageID view_storage_id = view->getStorageID();
LOG_DEBUG(log, "Refreshing view {}", view_storage_id.getFullTableName());
execution.progress.reset();
ContextMutablePtr refresh_context = view->createRefreshContext();
ContextMutablePtr refresh_context = view->createRefreshContext(log_comment);
if (!append)
{
refresh_context->setParentTable(view->getStorageID().uuid);
refresh_context->setParentTable(view_storage_id.uuid);
refresh_context->setDDLQueryCancellation(execution.cancel_ddl_queries.get_token());
if (root_znode_version != -1)
refresh_context->setDDLAdditionalChecksOnEnqueue({zkutil::makeCheckRequest(coordination.path, root_znode_version)});
}
std::optional<QueryLogElement> query_log_elem;
std::shared_ptr<ASTInsertQuery> refresh_query;
String query_for_logging;
std::shared_ptr<OpenTelemetry::SpanHolder> query_span = std::make_shared<OpenTelemetry::SpanHolder>("query");
ProcessList::EntryPtr process_list_entry;
std::optional<StorageID> table_to_drop;
auto new_table_id = StorageID::createEmpty();
try
{
{
/// Create a table.
auto [refresh_query, query_scope] = view->prepareRefresh(append, refresh_context, table_to_drop);
query_for_logging = "(create target table)";
std::unique_ptr<CurrentThread::QueryScope> query_scope;
std::tie(refresh_query, query_scope) = view->prepareRefresh(append, refresh_context, table_to_drop);
new_table_id = refresh_query->table_id;
/// Add the query to system.processes and allow it to be killed with KILL QUERY.
String query_for_logging = refresh_query->formatForLogging(
query_for_logging = refresh_query->formatForLogging(
refresh_context->getSettingsRef()[Setting::log_queries_cut_to_length]);
auto process_list_entry = refresh_context->getProcessList().insert(
process_list_entry = refresh_context->getProcessList().insert(
query_for_logging, refresh_query.get(), refresh_context, Stopwatch{CLOCK_MONOTONIC}.getStart());
refresh_context->setProcessListElement(process_list_entry->getQueryStatus());
refresh_context->setProgressCallback([this](const Progress & prog)
@ -569,15 +563,23 @@ UUID RefreshTask::executeRefreshUnlocked(bool append, int32_t root_znode_version
/// Run the query.
BlockIO block_io = InterpreterInsertQuery(
InterpreterInsertQuery interpreter(
refresh_query,
refresh_context,
/* allow_materialized */ false,
/* no_squash */ false,
/* no_destination */ false,
/* async_isnert */ false).execute();
/* async_isnert */ false);
BlockIO block_io = interpreter.execute();
QueryPipeline & pipeline = block_io.pipeline;
/// We log the refresh as one INSERT SELECT query, but the timespan and exceptions also
/// cover the surrounding CREATE, EXCHANGE, and DROP queries.
query_log_elem = logQueryStart(
start_time, refresh_context, query_for_logging, refresh_query, pipeline,
&interpreter, /*internal*/ false, view_storage_id.database_name,
view_storage_id.table_name, /*async_insert*/ false);
if (!pipeline.completed())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Pipeline for view refresh must be completed");
@ -605,21 +607,51 @@ UUID RefreshTask::executeRefreshUnlocked(bool append, int32_t root_znode_version
/// (specifically, the assert in ~WriteBuffer()).
if (execution.interrupt_execution.load())
throw Exception(ErrorCodes::QUERY_WAS_CANCELLED, "Refresh cancelled");
logQueryFinish(*query_log_elem, refresh_context, refresh_query, pipeline, /*pulling_pipeline*/ false, query_span, QueryCache::Usage::None, /*internal*/ false);
query_log_elem = std::nullopt;
query_span = nullptr;
process_list_entry.reset(); // otherwise it needs to be alive for logQueryException
}
/// Exchange tables.
if (!append)
{
query_for_logging = "(exchange tables)";
table_to_drop = view->exchangeTargetTable(new_table_id, refresh_context);
}
}
catch (...)
{
bool cancelled = execution.interrupt_execution.load();
if (table_to_drop.has_value())
view->dropTempTable(table_to_drop.value(), refresh_context);
throw;
{
String discard_error_message;
view->dropTempTable(table_to_drop.value(), refresh_context, discard_error_message);
}
if (query_log_elem.has_value())
{
logQueryException(*query_log_elem, refresh_context, stopwatch, refresh_query, query_span, /*internal*/ false, /*log_error*/ !cancelled);
}
else
{
/// Failed when creating new table or when swapping tables.
logExceptionBeforeStart(query_for_logging, refresh_context, /*ast*/ nullptr, query_span,
stopwatch.elapsedMilliseconds());
}
if (cancelled)
out_error_message = "cancelled";
else
out_error_message = getCurrentExceptionMessage(true);
return std::nullopt;
}
if (table_to_drop.has_value())
view->dropTempTable(table_to_drop.value(), refresh_context);
view->dropTempTable(table_to_drop.value(), refresh_context, out_error_message);
return new_table_id.uuid;
}

View File

@ -151,6 +151,7 @@ public:
CoordinationZnode znode;
bool refresh_running;
ProgressValues progress;
std::optional<String> unexpected_error; // refreshing is stopped because of unexpected error
};
private:
@ -203,6 +204,8 @@ private:
{
/// Refreshes are stopped, e.g. by SYSTEM STOP VIEW.
bool stop_requested = false;
/// Refreshes are stopped because we got an unexpected error. Can be resumed with SYSTEM START VIEW.
std::optional<String> unexpected_error;
/// An out-of-schedule refresh was requested, e.g. by SYSTEM REFRESH VIEW.
bool out_of_schedule_refresh_requested = false;
@ -257,8 +260,8 @@ private:
void refreshTask();
/// Perform an actual refresh: create new table, run INSERT SELECT, exchange tables, drop old table.
/// Mutex must be unlocked. Called only from refresh_task.
UUID executeRefreshUnlocked(bool append, int32_t root_znode_version);
/// Mutex must be unlocked. Called only from refresh_task. Doesn't throw.
std::optional<UUID> executeRefreshUnlocked(bool append, int32_t root_znode_version, std::chrono::system_clock::time_point start_time, const Stopwatch & stopwatch, const String & log_comment, String & out_error_message);
/// Assigns dependencies_satisfied_until.
void updateDependenciesIfNeeded(std::unique_lock<std::mutex> & lock);

View File

@ -117,7 +117,7 @@ UUID StorageInMemoryMetadata::getDefinerID(DB::ContextPtr context) const
return access_control.getID<User>(*definer);
}
ContextMutablePtr StorageInMemoryMetadata::getSQLSecurityOverriddenContext(ContextPtr context) const
ContextMutablePtr StorageInMemoryMetadata::getSQLSecurityOverriddenContext(ContextPtr context, const ClientInfo * client_info) const
{
if (!sql_security_type)
return Context::createCopy(context);
@ -126,7 +126,10 @@ ContextMutablePtr StorageInMemoryMetadata::getSQLSecurityOverriddenContext(Conte
return Context::createCopy(context);
auto new_context = Context::createCopy(context->getGlobalContext());
new_context->setClientInfo(context->getClientInfo());
if (client_info)
new_context->setClientInfo(*client_info);
else
new_context->setClientInfo(context->getClientInfo());
new_context->makeQueryContext();
const auto & database = context->getCurrentDatabase();

View File

@ -18,6 +18,8 @@
namespace DB
{
class ClientInfo;
/// Common metadata for all storages. Contains all possible parts of CREATE
/// query from all storages, but only some subset used.
struct StorageInMemoryMetadata
@ -122,7 +124,7 @@ struct StorageInMemoryMetadata
/// Returns a copy of the context with the correct user from SQL security options.
/// If the SQL security wasn't set, this is equivalent to `Context::createCopy(context)`.
/// The context from this function must be used every time whenever views execute any read/write operations or subqueries.
ContextMutablePtr getSQLSecurityOverriddenContext(ContextPtr context) const;
ContextMutablePtr getSQLSecurityOverriddenContext(ContextPtr context, const ClientInfo * client_info = nullptr) const;
/// Returns combined set of columns
const ColumnsDescription & getColumns() const;

View File

@ -46,6 +46,7 @@ namespace Setting
{
extern const SettingsBool allow_experimental_analyzer;
extern const SettingsSeconds lock_acquire_timeout;
extern const SettingsUInt64 log_queries_cut_to_length;
}
namespace ServerSetting
@ -191,9 +192,9 @@ StorageMaterializedView::StorageMaterializedView(
}
}
/// Sanity-check the table engine.
if (mode < LoadingStrictnessLevel::ATTACH && !fixed_uuid)
{
/// Sanity-check the table engine.
String inner_engine;
if (has_inner_table)
{
@ -221,6 +222,12 @@ StorageMaterializedView::StorageMaterializedView(
/// each refresh would append to a table on one arbitrarily chosen replica. But in principle it can be useful,
/// e.g. if SELECTs are done using clusterAllReplicas(). (For the two disallowed cases above, clusterAllReplicas() wouldn't work reliably.)
}
/// Sanity-check permissions. This is just for usability, the main checks are done by the
/// actual CREATE/INSERT/SELECT/EXCHANGE/DROP interpreters during refresh.
String inner_db_name = has_inner_table ? table_id_.database_name : to_table_id.database_name;
auto refresh_context = storage_metadata.getSQLSecurityOverriddenContext(getContext());
refresh_context->checkAccess(AccessType::DROP_TABLE | AccessType::CREATE_TABLE | AccessType::SELECT | AccessType::INSERT, inner_db_name);
}
refresher = RefreshTask::create(this, getContext(), *query.refresh_strategy, mode >= LoadingStrictnessLevel::ATTACH, refresh_coordinated, query.is_create_empty);
@ -503,10 +510,16 @@ bool StorageMaterializedView::optimize(
return storage_ptr->optimize(query, metadata_snapshot, partition, final, deduplicate, deduplicate_by_columns, cleanup, local_context);
}
ContextMutablePtr StorageMaterializedView::createRefreshContext() const
ContextMutablePtr StorageMaterializedView::createRefreshContext(const String & log_comment) const
{
auto refresh_context = getInMemoryMetadataPtr()->getSQLSecurityOverriddenContext(getContext());
ContextPtr table_context = getContext();
ClientInfo client_info = table_context->getClientInfo();
client_info.interface = ClientInfo::Interface::BACKGROUND;
client_info.client_name = "refreshable materialized view";
auto refresh_context = getInMemoryMetadataPtr()->getSQLSecurityOverriddenContext(table_context, &client_info);
refresh_context->setClientInfo(client_info);
refresh_context->setSetting("database_replicated_allow_replicated_engine_arguments", 3);
refresh_context->setSetting("log_comment", log_comment);
refresh_context->setQueryKind(ClientInfo::QueryKind::INITIAL_QUERY);
/// Generate a random query id.
refresh_context->setCurrentQueryId("");
@ -530,6 +543,9 @@ StorageMaterializedView::prepareRefresh(bool append, ContextMutablePtr refresh_c
String db_name = db->getDatabaseName();
auto new_table_name = ".tmp" + generateInnerTableName(getStorageID());
/// Pre-check the permissions. Would be awkward if we create a temporary table and can't drop it.
refresh_context->checkAccess(AccessType::DROP_TABLE | AccessType::CREATE_TABLE | AccessType::SELECT | AccessType::INSERT, db_name);
auto create_query = std::dynamic_pointer_cast<ASTCreateQuery>(db->getCreateTableQuery(inner_table_id.table_name, getContext()));
create_query->setTable(new_table_name);
create_query->setDatabase(db_name);
@ -595,24 +611,30 @@ std::optional<StorageID> StorageMaterializedView::exchangeTargetTable(StorageID
return exchange ? std::make_optional(fresh_table) : std::nullopt;
}
void StorageMaterializedView::dropTempTable(StorageID table_id, ContextMutablePtr refresh_context)
void StorageMaterializedView::dropTempTable(StorageID table_id, ContextMutablePtr refresh_context, String & out_exception)
{
CurrentThread::QueryScope query_scope(refresh_context);
auto drop_query = std::make_shared<ASTDropQuery>();
drop_query->setDatabase(table_id.database_name);
drop_query->setTable(table_id.table_name);
drop_query->kind = ASTDropQuery::Kind::Drop;
drop_query->if_exists = true;
drop_query->sync = false;
Stopwatch stopwatch;
try
{
auto drop_query = std::make_shared<ASTDropQuery>();
drop_query->setDatabase(table_id.database_name);
drop_query->setTable(table_id.table_name);
drop_query->kind = ASTDropQuery::Kind::Drop;
drop_query->if_exists = true;
drop_query->sync = false;
InterpreterDropQuery(drop_query, refresh_context).execute();
}
catch (...)
{
tryLogCurrentException(&Poco::Logger::get("StorageMaterializedView"), "Failed to drop temporary table after refresh");
auto query_for_logging = drop_query->formatForLogging(refresh_context->getSettingsRef()[Setting::log_queries_cut_to_length]);
logExceptionBeforeStart(query_for_logging, refresh_context, drop_query, nullptr, stopwatch.elapsedMilliseconds());
LOG_ERROR(getLogger("StorageMaterializedView"),
"{}: Failed to drop temporary table after refresh. Table {} is left behind and requires manual cleanup.",
getStorageID().getFullTableName(), table_id.getFullTableName());
out_exception = getCurrentExceptionMessage(true);
}
}

View File

@ -120,7 +120,7 @@ private:
void checkStatementCanBeForwarded() const;
ContextMutablePtr createRefreshContext() const;
ContextMutablePtr createRefreshContext(const String & log_comment) const;
/// Prepare to refresh a refreshable materialized view: create temporary table (if needed) and
/// form the insert-select query.
/// out_temp_table_id may be assigned before throwing an exception, in which case the caller
@ -128,7 +128,7 @@ private:
std::tuple<std::shared_ptr<ASTInsertQuery>, std::unique_ptr<CurrentThread::QueryScope>>
prepareRefresh(bool append, ContextMutablePtr refresh_context, std::optional<StorageID> & out_temp_table_id) const;
std::optional<StorageID> exchangeTargetTable(StorageID fresh_table, ContextPtr refresh_context) const;
void dropTempTable(StorageID table, ContextMutablePtr refresh_context);
void dropTempTable(StorageID table, ContextMutablePtr refresh_context, String & out_exception);
void updateTargetTableId(std::optional<String> database_name, std::optional<String> table_name);
};

View File

@ -79,14 +79,16 @@ void StorageSystemViewRefreshes::fillData(
res_columns[i++]->insert(std::chrono::duration_cast<std::chrono::seconds>(refresh.next_refresh_time.time_since_epoch()).count());
if (refresh.znode.last_attempt_succeeded || refresh.znode.last_attempt_time.time_since_epoch().count() == 0)
res_columns[i++]->insertDefault();
if (refresh.unexpected_error.has_value())
res_columns[i++]->insert(*refresh.unexpected_error);
else if (!refresh.znode.last_attempt_error.empty())
res_columns[i++]->insert(refresh.znode.last_attempt_error);
else if (refresh.refresh_running)
res_columns[i++]->insert(refresh.znode.previous_attempt_error);
else if (refresh.znode.last_attempt_error.empty())
res_columns[i++]->insert("Replica went away");
else if (refresh.znode.last_attempt_succeeded || refresh.znode.last_attempt_time.time_since_epoch().count() == 0)
res_columns[i++]->insertDefault();
else
res_columns[i++]->insert(refresh.znode.last_attempt_error);
res_columns[i++]->insert("Replica went away");
Int64 retries = refresh.znode.attempt_number;
if (refresh.refresh_running && retries)

View File

@ -0,0 +1,7 @@
hi
ACCESS_DENIED
FUNCTION_THROW_IF_VALUE_IS_NON_ZERO
Ex**ptionBeforeStart 9 refreshable materialized view 1 0
Ex**ptionWhileProcessing 9 refreshable materialized view 1 1
QueryFinish 9 refreshable materialized view 0 1
QueryStart 9 refreshable materialized view 0 1

View File

@ -0,0 +1,77 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
test_user="user_03258_$CLICKHOUSE_DATABASE"
second_db="${CLICKHOUSE_DATABASE}_03258"
$CLICKHOUSE_CLIENT -q "
create user $test_user;
create database $second_db;
create table a (x String) engine Memory;
insert into a values ('hi');
grant create, insert, select on ${second_db}.* to $test_user; -- no drop yet
grant select on a to $test_user;
grant system views on ${second_db}.* to $test_user;
"
# TODO: After https://github.com/ClickHouse/ClickHouse/pull/71336 is merged, remove
# "definer CURRENT_USER sql security definer" part from both queries.
# Check that permissions are checked on creation.
$CLICKHOUSE_CLIENT --user $test_user -q "
create materialized view ${second_db}.v refresh every 2 second (x String) engine Memory definer CURRENT_USER sql security definer as select * from a; -- {serverError ACCESS_DENIED}
"
$CLICKHOUSE_CLIENT -q "
grant drop on ${second_db}.* to $test_user;
"
$CLICKHOUSE_CLIENT --user $test_user -q "
create materialized view ${second_db}.v refresh every 1 second (x String) engine Memory definer CURRENT_USER sql security definer as select * from a;
system wait view ${second_db}.v;
select * from ${second_db}.v;
"
# Check that permissions are checked on refresh.
$CLICKHOUSE_CLIENT -q "revoke select on a from $test_user"
for _ in {1..10}
do
$CLICKHOUSE_CLIENT -q "system refresh view ${second_db}.v"
res=$($CLICKHOUSE_CLIENT -q "system wait view ${second_db}.v" 2>&1)
if [ "$?" != 0 ]
then
echo "$res" | grep -o -m1 ACCESS_DENIED || echo "expected ACCESS_DENIED error, got:" "$res"
break
fi
sleep 1
done
$CLICKHOUSE_CLIENT -q "alter table ${second_db}.v modify query select throwIf(1) as x"
# Get an exception during query execution.
for _ in {1..10}
do
$CLICKHOUSE_CLIENT -q "system refresh view ${second_db}.v"
res=$($CLICKHOUSE_CLIENT -q "system wait view ${second_db}.v" 2>&1)
if [ "$?" == 0 ]
then
echo "unexpected success"
break
else
echo "$res" | grep -o -m1 FUNCTION_THROW_IF_VALUE_IS_NON_ZERO && break
fi
sleep 1
done
# Check that refreshes and both kinds of errors appear in query log.
# (Magic word to silence check-style warning: **current_database = currentDatabase()**.
# It's ok that we don't have this condition in the query, we're checking the db name in `tables` column instead.)
$CLICKHOUSE_CLIENT -q "
system flush logs;
select replaceAll(toString(type), 'Exception', 'Ex**ption'), interface, client_name, exception != '', has(tables, '${second_db}.v') from system.query_log where event_time > now() - interval 30 minute and log_comment = 'refresh of ${second_db}.v' group by all order by all;
"
$CLICKHOUSE_CLIENT -q "
drop user $test_user;
drop database $second_db;
"

View File

@ -181,7 +181,7 @@ tests_with_query_log=( $(
for test_case in "${tests_with_query_log[@]}"; do
grep -qE current_database.*currentDatabase "$test_case" || {
grep -qE 'current_database.*\$CLICKHOUSE_DATABASE' "$test_case"
} || echo "Queries to system.query_log/system.query_thread_log does not have current_database = currentDatabase() condition in $test_case"
} || echo "Query to system.query_log/system.query_thread_log does not have current_database = currentDatabase() condition in $test_case"
done
grep -iE 'SYSTEM STOP MERGES;?$' -R $ROOT_PATH/tests/queries && echo "Merges cannot be disabled globally in fast/stateful/stateless tests, because it will break concurrently running queries"