Mirror of https://github.com/ClickHouse/ClickHouse.git
Write view refreshes to query_log, pre-check permissions
Commit 6e292cef71 (parent 3fb367e038)
@@ -56,7 +56,7 @@ SELECT * FROM view(column1=value1, column2=value2 ...)
 
 ``` sql
 CREATE MATERIALIZED VIEW [IF NOT EXISTS] [db.]table_name [ON CLUSTER] [TO[db.]name] [ENGINE = engine] [POPULATE]
-    [DEFINER = { user | CURRENT_USER }] [SQL SECURITY { DEFINER | INVOKER | NONE }]
+    [DEFINER = { user | CURRENT_USER }] [SQL SECURITY { DEFINER | NONE }]
 AS SELECT ...
 [COMMENT 'comment']
 ```
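For orientation, a minimal statement matching the documented grammar above; the table and database names are illustrative, not taken from this commit:

```sql
CREATE MATERIALIZED VIEW db.mv TO db.dest
DEFINER = CURRENT_USER SQL SECURITY DEFINER
AS SELECT * FROM db.src;
```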
@@ -154,7 +154,7 @@ This feature is deprecated and will be removed in the future.
 
 For your convenience, the old documentation is located [here](https://pastila.nl/?00f32652/fdf07272a7b54bda7e13b919264e449f.md)
 
-## Refreshable Materialized View [Experimental] {#refreshable-materialized-view}
+## Refreshable Materialized View {#refreshable-materialized-view}
 
 ```sql
 CREATE MATERIALIZED VIEW [IF NOT EXISTS] [db.]table_name
@@ -164,6 +164,7 @@ DEPENDS ON [db.]name [, [db.]name [, ...]]
 SETTINGS name = value [, name = value [, ...]]
 [APPEND]
 [TO[db.]name] [(columns)] [ENGINE = engine] [EMPTY]
+[DEFINER = { user | CURRENT_USER }] [SQL SECURITY { DEFINER | NONE }]
 AS SELECT ...
 [COMMENT 'comment']
 ```
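A minimal sketch of the refreshable form, modeled on the test added at the bottom of this commit (names are illustrative):

```sql
CREATE MATERIALIZED VIEW db.v
REFRESH EVERY 10 MINUTE
(x String) ENGINE = Memory
DEFINER = CURRENT_USER SQL SECURITY DEFINER
AS SELECT x FROM db.src;
```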
@@ -921,7 +921,7 @@ try
         query_for_logging,
         key.query,
         pipeline,
-        interpreter,
+        interpreter.get(),
         internal,
         query_database,
         query_table,
@@ -256,6 +256,8 @@ String toString(ClientInfo::Interface interface)
             return "TCP_INTERSERVER";
         case ClientInfo::Interface::PROMETHEUS:
             return "PROMETHEUS";
+        case ClientInfo::Interface::BACKGROUND:
+            return "BACKGROUND";
     }
 
     return std::format("Unknown server interface ({}).", static_cast<int>(interface));
@@ -39,6 +39,7 @@ public:
         LOCAL = 6,
         TCP_INTERSERVER = 7,
         PROMETHEUS = 8,
+        BACKGROUND = 9, // e.g. queries from refreshable materialized views
     };
 
     enum class HTTPMethod : uint8_t
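Refresh queries now carry the new `BACKGROUND` interface (numeric value 9) and the client name set later in `StorageMaterializedView::createRefreshContext`, so they can be picked out of `system.query_log`; a sketch, assuming logs have been flushed (names are illustrative):

```sql
SELECT type, query
FROM system.query_log
WHERE interface = 9                      -- ClientInfo::Interface::BACKGROUND
  AND client_name = 'refreshable materialized view'
ORDER BY event_time DESC
LIMIT 10;
```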
@@ -1299,7 +1299,7 @@ void InterpreterSystemQuery::flushDistributed(ASTSystemQuery & query)
 RefreshTaskList InterpreterSystemQuery::getRefreshTasks()
 {
     auto ctx = getContext();
-    ctx->checkAccess(AccessType::SYSTEM_VIEWS);
+    ctx->checkAccess(AccessType::SYSTEM_VIEWS, table_id.database_name, table_id.table_name);
     auto tasks = ctx->getRefreshSet().findTasks(table_id);
     if (tasks.empty())
         throw Exception(
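Because the check is now scoped to the view's own database and table, a database-level grant is enough for the SYSTEM commands used in the new test (illustrative names):

```sql
GRANT SYSTEM VIEWS ON db.* TO mv_user;

-- As mv_user:
SYSTEM REFRESH VIEW db.v;
SYSTEM WAIT VIEW db.v;
```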
@@ -107,8 +107,9 @@ ColumnsDescription SessionLogElement::getColumnsDescription()
         {"Local", static_cast<Int8>(Interface::LOCAL)},
         {"TCP_Interserver", static_cast<Int8>(Interface::TCP_INTERSERVER)},
         {"Prometheus", static_cast<Int8>(Interface::PROMETHEUS)},
+        {"Background", static_cast<Int8>(Interface::BACKGROUND)},
     });
-    static_assert(magic_enum::enum_count<Interface>() == 8);
+    static_assert(magic_enum::enum_count<Interface>() == 9, "Please update the array above to match the enum.");
 
     auto lc_string_datatype = std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>());
 
@@ -386,7 +386,7 @@ QueryLogElement logQueryStart(
     const String & query_for_logging,
     const ASTPtr & query_ast,
     const QueryPipeline & pipeline,
-    const std::unique_ptr<IInterpreter> & interpreter,
+    const IInterpreter * interpreter,
     bool internal,
     const String & query_database,
     const String & query_table,
@@ -1489,7 +1489,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
         query_for_logging,
         ast,
         pipeline,
-        interpreter,
+        interpreter.get(),
         internal,
         query_database,
         query_table,
@@ -81,7 +81,7 @@ QueryLogElement logQueryStart(
     const String & query_for_logging,
     const ASTPtr & query_ast,
     const QueryPipeline & pipeline,
-    const std::unique_ptr<IInterpreter> & interpreter,
+    const IInterpreter * interpreter,
     bool internal,
     const String & query_database,
     const String & query_table,
@@ -229,7 +229,7 @@ void RefreshTask::alterRefreshParams(const DB::ASTRefreshStrategy & new_strategy
 RefreshTask::Info RefreshTask::getInfo() const
 {
     std::lock_guard guard(mutex);
-    return Info {.view_id = set_handle.getID(), .state = state, .next_refresh_time = next_refresh_time, .znode = coordination.root_znode, .refresh_running = coordination.running_znode_exists, .progress = execution.progress.getValues()};
+    return Info {.view_id = set_handle.getID(), .state = state, .next_refresh_time = next_refresh_time, .znode = coordination.root_znode, .refresh_running = coordination.running_znode_exists, .progress = execution.progress.getValues(), .unexpected_error = scheduling.unexpected_error};
 }
 
 void RefreshTask::start()
@@ -237,6 +237,7 @@ void RefreshTask::start()
     std::lock_guard guard(mutex);
     if (!std::exchange(scheduling.stop_requested, false))
         return;
+    scheduling.unexpected_error = std::nullopt;
     refresh_task->schedule();
 }
 
@@ -391,10 +392,10 @@ void RefreshTask::refreshTask()
             }
 
             /// Check if it's time to refresh.
-            auto now = currentTime();
-            auto start_time = std::chrono::floor<std::chrono::seconds>(now);
-            auto start_time_steady = std::chrono::steady_clock::now();
-            auto [when, timeslot, start_znode] = determineNextRefreshTime(start_time);
+            auto start_time = currentTime();
+            auto start_time_seconds = std::chrono::floor<std::chrono::seconds>(start_time);
+            Stopwatch stopwatch;
+            auto [when, timeslot, start_znode] = determineNextRefreshTime(start_time_seconds);
             next_refresh_time = when;
             bool out_of_schedule = scheduling.out_of_schedule_refresh_requested;
             if (out_of_schedule)
@@ -402,9 +403,9 @@ void RefreshTask::refreshTask()
                 chassert(start_znode.attempt_number > 0);
                 start_znode.attempt_number -= 1;
             }
-            else if (now < when)
+            else if (start_time < when)
             {
-                size_t delay_ms = std::chrono::duration_cast<std::chrono::milliseconds>(when - now).count();
+                size_t delay_ms = std::chrono::duration_cast<std::chrono::milliseconds>(when - start_time).count();
                 /// If we're in a test that fakes the clock, poll every 100ms.
                 if (scheduling.fake_clock.load(std::memory_order_relaxed) != INT64_MIN)
                     delay_ms = 100;
@@ -443,53 +444,35 @@ void RefreshTask::refreshTask()
             int32_t root_znode_version = coordination.coordinated ? coordination.root_znode.version : -1;
             CurrentMetrics::Increment metric_inc(CurrentMetrics::RefreshingViews);
 
+            String log_comment = fmt::format("refresh of {}", view->getStorageID().getFullTableName());
+            if (start_znode.attempt_number > 1)
+                log_comment += fmt::format(" (attempt {}/{})", start_znode.attempt_number, refresh_settings[RefreshSetting::refresh_retries] + 1);
 
             lock.unlock();
 
-            bool refreshed = false;
             String error_message;
-            UUID new_table_uuid;
-            try
-            {
-                new_table_uuid = executeRefreshUnlocked(append, root_znode_version);
-                refreshed = true;
-            }
-            catch (...)
-            {
-                if (execution.interrupt_execution.load())
-                {
-                    error_message = "cancelled";
-                    LOG_INFO(log, "{}: Refresh cancelled", view->getStorageID().getFullTableName());
-                }
-                else
-                {
-                    error_message = getCurrentExceptionMessage(true);
-                    LOG_ERROR(log, "{}: Refresh failed (attempt {}/{}): {}", view->getStorageID().getFullTableName(), start_znode.attempt_number, refresh_settings[RefreshSetting::refresh_retries] + 1, error_message);
-                }
-            }
+            auto new_table_uuid = executeRefreshUnlocked(append, root_znode_version, start_time, stopwatch, log_comment, error_message);
+            bool refreshed = new_table_uuid.has_value();
 
             lock.lock();
 
             setState(RefreshState::Scheduling, lock);
 
-            auto end_time = std::chrono::floor<std::chrono::seconds>(currentTime());
+            auto end_time_seconds = std::chrono::floor<std::chrono::seconds>(currentTime());
             auto znode = coordination.root_znode;
-            znode.last_attempt_time = end_time;
+            znode.last_attempt_time = end_time_seconds;
+            znode.last_attempt_error = error_message;
             if (refreshed)
             {
                 znode.last_attempt_succeeded = true;
-                znode.last_completed_timeslot = refresh_schedule.timeslotForCompletedRefresh(znode.last_completed_timeslot, start_time, end_time, out_of_schedule);
-                znode.last_success_time = start_time;
-                znode.last_success_duration = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start_time_steady);
-                znode.last_success_table_uuid = new_table_uuid;
+                znode.last_completed_timeslot = refresh_schedule.timeslotForCompletedRefresh(znode.last_completed_timeslot, start_time_seconds, end_time_seconds, out_of_schedule);
+                znode.last_success_time = start_time_seconds;
+                znode.last_success_duration = std::chrono::milliseconds(stopwatch.elapsedMilliseconds());
+                znode.last_success_table_uuid = *new_table_uuid;
                 znode.previous_attempt_error = "";
                 znode.attempt_number = 0;
                 znode.randomize();
             }
-            else
-            {
-                znode.last_attempt_error = error_message;
-            }
 
             bool ok = updateCoordinationState(znode, false, zookeeper, lock);
             chassert(ok);
@@ -517,8 +500,10 @@ void RefreshTask::refreshTask()
         if (!lock.owns_lock())
             lock.lock();
         scheduling.stop_requested = true;
+        scheduling.unexpected_error = getCurrentExceptionMessage(true);
         coordination.watches->should_reread_znodes.store(true);
         coordination.running_znode_exists = false;
+        refresh_task->schedule();
         lock.unlock();
 
         tryLogCurrentException(log,
@@ -532,34 +517,43 @@ void RefreshTask::refreshTask()
     }
 }
 
-UUID RefreshTask::executeRefreshUnlocked(bool append, int32_t root_znode_version)
+std::optional<UUID> RefreshTask::executeRefreshUnlocked(bool append, int32_t root_znode_version, std::chrono::system_clock::time_point start_time, const Stopwatch & stopwatch, const String & log_comment, String & out_error_message)
 {
-    LOG_DEBUG(log, "Refreshing view {}", view->getStorageID().getFullTableName());
+    StorageID view_storage_id = view->getStorageID();
+    LOG_DEBUG(log, "Refreshing view {}", view_storage_id.getFullTableName());
     execution.progress.reset();
 
-    ContextMutablePtr refresh_context = view->createRefreshContext();
+    ContextMutablePtr refresh_context = view->createRefreshContext(log_comment);
 
     if (!append)
    {
-        refresh_context->setParentTable(view->getStorageID().uuid);
+        refresh_context->setParentTable(view_storage_id.uuid);
         refresh_context->setDDLQueryCancellation(execution.cancel_ddl_queries.get_token());
         if (root_znode_version != -1)
             refresh_context->setDDLAdditionalChecksOnEnqueue({zkutil::makeCheckRequest(coordination.path, root_znode_version)});
     }
 
+    std::optional<QueryLogElement> query_log_elem;
+    std::shared_ptr<ASTInsertQuery> refresh_query;
+    String query_for_logging;
+    std::shared_ptr<OpenTelemetry::SpanHolder> query_span = std::make_shared<OpenTelemetry::SpanHolder>("query");
+    ProcessList::EntryPtr process_list_entry;
+
     std::optional<StorageID> table_to_drop;
     auto new_table_id = StorageID::createEmpty();
     try
     {
         {
             /// Create a table.
-            auto [refresh_query, query_scope] = view->prepareRefresh(append, refresh_context, table_to_drop);
+            query_for_logging = "(create target table)";
+            std::unique_ptr<CurrentThread::QueryScope> query_scope;
+            std::tie(refresh_query, query_scope) = view->prepareRefresh(append, refresh_context, table_to_drop);
             new_table_id = refresh_query->table_id;
 
             /// Add the query to system.processes and allow it to be killed with KILL QUERY.
-            String query_for_logging = refresh_query->formatForLogging(
+            query_for_logging = refresh_query->formatForLogging(
                 refresh_context->getSettingsRef()[Setting::log_queries_cut_to_length]);
-            auto process_list_entry = refresh_context->getProcessList().insert(
+            process_list_entry = refresh_context->getProcessList().insert(
                 query_for_logging, refresh_query.get(), refresh_context, Stopwatch{CLOCK_MONOTONIC}.getStart());
             refresh_context->setProcessListElement(process_list_entry->getQueryStatus());
             refresh_context->setProgressCallback([this](const Progress & prog)
@@ -569,15 +563,23 @@ UUID RefreshTask::executeRefreshUnlocked(bool append, int32_t root_znode_version
 
             /// Run the query.
 
-            BlockIO block_io = InterpreterInsertQuery(
+            InterpreterInsertQuery interpreter(
                 refresh_query,
                 refresh_context,
                 /* allow_materialized */ false,
                 /* no_squash */ false,
                 /* no_destination */ false,
-                /* async_isnert */ false).execute();
+                /* async_isnert */ false);
+            BlockIO block_io = interpreter.execute();
             QueryPipeline & pipeline = block_io.pipeline;
 
+            /// We log the refresh as one INSERT SELECT query, but the timespan and exceptions also
+            /// cover the surrounding CREATE, EXCHANGE, and DROP queries.
+            query_log_elem = logQueryStart(
+                start_time, refresh_context, query_for_logging, refresh_query, pipeline,
+                &interpreter, /*internal*/ false, view_storage_id.database_name,
+                view_storage_id.table_name, /*async_insert*/ false);
+
             if (!pipeline.completed())
                 throw Exception(ErrorCodes::LOGICAL_ERROR, "Pipeline for view refresh must be completed");
 
@@ -605,21 +607,51 @@ UUID RefreshTask::executeRefreshUnlocked(bool append, int32_t root_znode_version
             /// (specifically, the assert in ~WriteBuffer()).
             if (execution.interrupt_execution.load())
                 throw Exception(ErrorCodes::QUERY_WAS_CANCELLED, "Refresh cancelled");
 
+            logQueryFinish(*query_log_elem, refresh_context, refresh_query, pipeline, /*pulling_pipeline*/ false, query_span, QueryCache::Usage::None, /*internal*/ false);
+            query_log_elem = std::nullopt;
+            query_span = nullptr;
+            process_list_entry.reset(); // otherwise it needs to be alive for logQueryException
         }
 
         /// Exchange tables.
         if (!append)
+        {
+            query_for_logging = "(exchange tables)";
             table_to_drop = view->exchangeTargetTable(new_table_id, refresh_context);
+        }
     }
     catch (...)
     {
+        bool cancelled = execution.interrupt_execution.load();
+
         if (table_to_drop.has_value())
-            view->dropTempTable(table_to_drop.value(), refresh_context);
-        throw;
+        {
+            String discard_error_message;
+            view->dropTempTable(table_to_drop.value(), refresh_context, discard_error_message);
+        }
+
+        if (query_log_elem.has_value())
+        {
+            logQueryException(*query_log_elem, refresh_context, stopwatch, refresh_query, query_span, /*internal*/ false, /*log_error*/ !cancelled);
+        }
+        else
+        {
+            /// Failed when creating new table or when swapping tables.
+            logExceptionBeforeStart(query_for_logging, refresh_context, /*ast*/ nullptr, query_span,
+                stopwatch.elapsedMilliseconds());
+        }
+
+        if (cancelled)
+            out_error_message = "cancelled";
+        else
+            out_error_message = getCurrentExceptionMessage(true);
+
+        return std::nullopt;
     }
 
     if (table_to_drop.has_value())
-        view->dropTempTable(table_to_drop.value(), refresh_context);
+        view->dropTempTable(table_to_drop.value(), refresh_context, out_error_message);
 
     return new_table_id.uuid;
 }
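With the logging above in place, each refresh shows up in `system.query_log` as one INSERT SELECT entry whose `log_comment` is `refresh of <db>.<table>`, and its timing and exceptions also cover the surrounding CREATE/EXCHANGE/DROP steps. A sketch for inspecting it, with illustrative names:

```sql
SYSTEM FLUSH LOGS;

SELECT type, exception != '' AS failed, query_duration_ms
FROM system.query_log
WHERE log_comment = 'refresh of db.v'
ORDER BY event_time DESC;
```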
@@ -151,6 +151,7 @@ public:
         CoordinationZnode znode;
         bool refresh_running;
         ProgressValues progress;
+        std::optional<String> unexpected_error; // refreshing is stopped because of unexpected error
     };
 
 private:
@@ -203,6 +204,8 @@ private:
     {
         /// Refreshes are stopped, e.g. by SYSTEM STOP VIEW.
         bool stop_requested = false;
+        /// Refreshes are stopped because we got an unexpected error. Can be resumed with SYSTEM START VIEW.
+        std::optional<String> unexpected_error;
         /// An out-of-schedule refresh was requested, e.g. by SYSTEM REFRESH VIEW.
         bool out_of_schedule_refresh_requested = false;
 
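As the new comment notes, a view whose refreshes stopped on an unexpected error can be resumed manually; `RefreshTask::start()` above clears the stored error on resume. Illustrative names:

```sql
SYSTEM STOP VIEW db.v;   -- pause scheduled refreshes
SYSTEM START VIEW db.v;  -- resume them and clear the stored unexpected error
```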
@@ -257,8 +260,8 @@ private:
     void refreshTask();
 
     /// Perform an actual refresh: create new table, run INSERT SELECT, exchange tables, drop old table.
-    /// Mutex must be unlocked. Called only from refresh_task.
-    UUID executeRefreshUnlocked(bool append, int32_t root_znode_version);
+    /// Mutex must be unlocked. Called only from refresh_task. Doesn't throw.
+    std::optional<UUID> executeRefreshUnlocked(bool append, int32_t root_znode_version, std::chrono::system_clock::time_point start_time, const Stopwatch & stopwatch, const String & log_comment, String & out_error_message);
 
     /// Assigns dependencies_satisfied_until.
     void updateDependenciesIfNeeded(std::unique_lock<std::mutex> & lock);
@@ -117,7 +117,7 @@ UUID StorageInMemoryMetadata::getDefinerID(DB::ContextPtr context) const
     return access_control.getID<User>(*definer);
 }
 
-ContextMutablePtr StorageInMemoryMetadata::getSQLSecurityOverriddenContext(ContextPtr context) const
+ContextMutablePtr StorageInMemoryMetadata::getSQLSecurityOverriddenContext(ContextPtr context, const ClientInfo * client_info) const
 {
     if (!sql_security_type)
         return Context::createCopy(context);
@@ -126,7 +126,10 @@ ContextMutablePtr StorageInMemoryMetadata::getSQLSecurityOverriddenContext(Conte
         return Context::createCopy(context);
 
     auto new_context = Context::createCopy(context->getGlobalContext());
-    new_context->setClientInfo(context->getClientInfo());
+    if (client_info)
+        new_context->setClientInfo(*client_info);
+    else
+        new_context->setClientInfo(context->getClientInfo());
     new_context->makeQueryContext();
 
     const auto & database = context->getCurrentDatabase();
@@ -18,6 +18,8 @@
 namespace DB
 {
 
+class ClientInfo;
+
 /// Common metadata for all storages. Contains all possible parts of CREATE
 /// query from all storages, but only some subset used.
 struct StorageInMemoryMetadata
@@ -122,7 +124,7 @@ struct StorageInMemoryMetadata
     /// Returns a copy of the context with the correct user from SQL security options.
     /// If the SQL security wasn't set, this is equivalent to `Context::createCopy(context)`.
     /// The context from this function must be used every time whenever views execute any read/write operations or subqueries.
-    ContextMutablePtr getSQLSecurityOverriddenContext(ContextPtr context) const;
+    ContextMutablePtr getSQLSecurityOverriddenContext(ContextPtr context, const ClientInfo * client_info = nullptr) const;
 
     /// Returns combined set of columns
     const ColumnsDescription & getColumns() const;
@@ -46,6 +46,7 @@ namespace Setting
 {
     extern const SettingsBool allow_experimental_analyzer;
     extern const SettingsSeconds lock_acquire_timeout;
+    extern const SettingsUInt64 log_queries_cut_to_length;
 }
 
 namespace ServerSetting
@@ -191,9 +192,9 @@ StorageMaterializedView::StorageMaterializedView(
         }
     }
 
-    /// Sanity-check the table engine.
     if (mode < LoadingStrictnessLevel::ATTACH && !fixed_uuid)
     {
+        /// Sanity-check the table engine.
         String inner_engine;
         if (has_inner_table)
         {
@@ -221,6 +222,12 @@ StorageMaterializedView::StorageMaterializedView(
         /// each refresh would append to a table on one arbitrarily chosen replica. But in principle it can be useful,
         /// e.g. if SELECTs are done using clusterAllReplicas(). (For the two disallowed cases above, clusterAllReplicas() wouldn't work reliably.)
         }
+
+        /// Sanity-check permissions. This is just for usability, the main checks are done by the
+        /// actual CREATE/INSERT/SELECT/EXCHANGE/DROP interpreters during refresh.
+        String inner_db_name = has_inner_table ? table_id_.database_name : to_table_id.database_name;
+        auto refresh_context = storage_metadata.getSQLSecurityOverriddenContext(getContext());
+        refresh_context->checkAccess(AccessType::DROP_TABLE | AccessType::CREATE_TABLE | AccessType::SELECT | AccessType::INSERT, inner_db_name);
     }
 
     refresher = RefreshTask::create(this, getContext(), *query.refresh_strategy, mode >= LoadingStrictnessLevel::ATTACH, refresh_coordinated, query.is_create_empty);
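In practice the pre-check means the effective (definer) user needs create/drop/select/insert rights on the database that holds the target table before a refreshable view can be created; a sketch mirroring the grants in the new test (illustrative names):

```sql
GRANT CREATE TABLE, DROP TABLE, SELECT, INSERT ON db.* TO mv_owner;
```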
@@ -491,10 +498,16 @@ bool StorageMaterializedView::optimize(
     return storage_ptr->optimize(query, metadata_snapshot, partition, final, deduplicate, deduplicate_by_columns, cleanup, local_context);
 }
 
-ContextMutablePtr StorageMaterializedView::createRefreshContext() const
+ContextMutablePtr StorageMaterializedView::createRefreshContext(const String & log_comment) const
 {
-    auto refresh_context = getInMemoryMetadataPtr()->getSQLSecurityOverriddenContext(getContext());
+    ContextPtr table_context = getContext();
+    ClientInfo client_info = table_context->getClientInfo();
+    client_info.interface = ClientInfo::Interface::BACKGROUND;
+    client_info.client_name = "refreshable materialized view";
+    auto refresh_context = getInMemoryMetadataPtr()->getSQLSecurityOverriddenContext(table_context, &client_info);
+    refresh_context->setClientInfo(client_info);
     refresh_context->setSetting("database_replicated_allow_replicated_engine_arguments", 3);
+    refresh_context->setSetting("log_comment", log_comment);
     refresh_context->setQueryKind(ClientInfo::QueryKind::INITIAL_QUERY);
     /// Generate a random query id.
     refresh_context->setCurrentQueryId("");
@@ -518,6 +531,9 @@ StorageMaterializedView::prepareRefresh(bool append, ContextMutablePtr refresh_c
     String db_name = db->getDatabaseName();
     auto new_table_name = ".tmp" + generateInnerTableName(getStorageID());
 
+    /// Pre-check the permissions. Would be awkward if we create a temporary table and can't drop it.
+    refresh_context->checkAccess(AccessType::DROP_TABLE | AccessType::CREATE_TABLE | AccessType::SELECT | AccessType::INSERT, db_name);
+
     auto create_query = std::dynamic_pointer_cast<ASTCreateQuery>(db->getCreateTableQuery(inner_table_id.table_name, getContext()));
     create_query->setTable(new_table_name);
     create_query->setDatabase(db_name);
@@ -583,24 +599,30 @@ std::optional<StorageID> StorageMaterializedView::exchangeTargetTable(StorageID
     return exchange ? std::make_optional(fresh_table) : std::nullopt;
 }
 
-void StorageMaterializedView::dropTempTable(StorageID table_id, ContextMutablePtr refresh_context)
+void StorageMaterializedView::dropTempTable(StorageID table_id, ContextMutablePtr refresh_context, String & out_exception)
 {
     CurrentThread::QueryScope query_scope(refresh_context);
 
+    auto drop_query = std::make_shared<ASTDropQuery>();
+    drop_query->setDatabase(table_id.database_name);
+    drop_query->setTable(table_id.table_name);
+    drop_query->kind = ASTDropQuery::Kind::Drop;
+    drop_query->if_exists = true;
+    drop_query->sync = false;
+
+    Stopwatch stopwatch;
     try
     {
-        auto drop_query = std::make_shared<ASTDropQuery>();
-        drop_query->setDatabase(table_id.database_name);
-        drop_query->setTable(table_id.table_name);
-        drop_query->kind = ASTDropQuery::Kind::Drop;
-        drop_query->if_exists = true;
-        drop_query->sync = false;
-
         InterpreterDropQuery(drop_query, refresh_context).execute();
     }
     catch (...)
     {
-        tryLogCurrentException(&Poco::Logger::get("StorageMaterializedView"), "Failed to drop temporary table after refresh");
+        auto query_for_logging = drop_query->formatForLogging(refresh_context->getSettingsRef()[Setting::log_queries_cut_to_length]);
+        logExceptionBeforeStart(query_for_logging, refresh_context, drop_query, nullptr, stopwatch.elapsedMilliseconds());
+        LOG_ERROR(getLogger("StorageMaterializedView"),
+            "{}: Failed to drop temporary table after refresh. Table {} is left behind and requires manual cleanup!",
+            getStorageID().getFullTableName(), table_id.getFullTableName());
+        out_exception = getCurrentExceptionMessage(true);
     }
 }
 
@@ -120,7 +120,7 @@ private:
 
     void checkStatementCanBeForwarded() const;
 
-    ContextMutablePtr createRefreshContext() const;
+    ContextMutablePtr createRefreshContext(const String & log_comment) const;
     /// Prepare to refresh a refreshable materialized view: create temporary table (if needed) and
     /// form the insert-select query.
    /// out_temp_table_id may be assigned before throwing an exception, in which case the caller
@@ -128,7 +128,7 @@ private:
     std::tuple<std::shared_ptr<ASTInsertQuery>, std::unique_ptr<CurrentThread::QueryScope>>
         prepareRefresh(bool append, ContextMutablePtr refresh_context, std::optional<StorageID> & out_temp_table_id) const;
     std::optional<StorageID> exchangeTargetTable(StorageID fresh_table, ContextPtr refresh_context) const;
-    void dropTempTable(StorageID table, ContextMutablePtr refresh_context);
+    void dropTempTable(StorageID table, ContextMutablePtr refresh_context, String & out_exception);
 
     void updateTargetTableId(std::optional<String> database_name, std::optional<String> table_name);
 };
@@ -79,14 +79,16 @@ void StorageSystemViewRefreshes::fillData(
 
         res_columns[i++]->insert(std::chrono::duration_cast<std::chrono::seconds>(refresh.next_refresh_time.time_since_epoch()).count());
 
-        if (refresh.znode.last_attempt_succeeded || refresh.znode.last_attempt_time.time_since_epoch().count() == 0)
-            res_columns[i++]->insertDefault();
+        if (refresh.unexpected_error.has_value())
+            res_columns[i++]->insert(*refresh.unexpected_error);
+        else if (!refresh.znode.last_attempt_error.empty())
+            res_columns[i++]->insert(refresh.znode.last_attempt_error);
         else if (refresh.refresh_running)
             res_columns[i++]->insert(refresh.znode.previous_attempt_error);
-        else if (refresh.znode.last_attempt_error.empty())
-            res_columns[i++]->insert("Replica went away");
+        else if (refresh.znode.last_attempt_succeeded || refresh.znode.last_attempt_time.time_since_epoch().count() == 0)
+            res_columns[i++]->insertDefault();
         else
-            res_columns[i++]->insert(refresh.znode.last_attempt_error);
+            res_columns[i++]->insert("Replica went away");
 
         Int64 retries = refresh.znode.attempt_number;
         if (refresh.refresh_running && retries)
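The new branch surfaces `unexpected_error` through `system.view_refreshes`; a sketch for inspecting it, assuming the value lands in the `exception` column of that table (illustrative names):

```sql
SELECT database, view, status, exception
FROM system.view_refreshes
WHERE database = 'db' AND view = 'v';
```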
@@ -0,0 +1,7 @@
+hi
+ACCESS_DENIED
+FUNCTION_THROW_IF_VALUE_IS_NON_ZERO
+Ex**ptionBeforeStart 9 refreshable materialized view 1 0
+Ex**ptionWhileProcessing 9 refreshable materialized view 1 1
+QueryFinish 9 refreshable materialized view 0 1
+QueryStart 9 refreshable materialized view 0 1
tests/queries/0_stateless/03258_refreshable_mv_misc.sh (new executable file, 72 lines)
@@ -0,0 +1,72 @@
+#!/usr/bin/env bash
+
+CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+# shellcheck source=../shell_config.sh
+. "$CURDIR"/../shell_config.sh
+
+test_user="user_03258_$CLICKHOUSE_DATABASE"
+second_db="${CLICKHOUSE_DATABASE}_03258"
+$CLICKHOUSE_CLIENT -q "
+    create user $test_user;
+    create database $second_db;
+    create table a (x String) engine Memory;
+    insert into a values ('hi');
+    grant create, insert, select on ${second_db}.* to $test_user; -- no drop yet
+    grant select on a to $test_user;
+    grant system views on ${second_db}.* to $test_user;
+"
+
+# Check that permissions are checked on creation.
+$CLICKHOUSE_CLIENT --user $test_user -q "
+    create materialized view ${second_db}.v refresh every 2 second (x String) engine Memory definer CURRENT_USER sql security definer as select * from a; -- {serverError ACCESS_DENIED}
+"
+$CLICKHOUSE_CLIENT -q "
+    grant drop on ${second_db}.* to $test_user;
+"
+$CLICKHOUSE_CLIENT --user $test_user -q "
+    create materialized view ${second_db}.v refresh every 1 second (x String) engine Memory definer CURRENT_USER sql security definer as select * from a;
+    system wait view ${second_db}.v;
+    select * from ${second_db}.v;
+"
+
+# Check that permissions are checked on refresh.
+$CLICKHOUSE_CLIENT -q "revoke select on a from $test_user"
+for attempt in {1..10}
+do
+    $CLICKHOUSE_CLIENT -q "system refresh view ${second_db}.v"
+    res=$($CLICKHOUSE_CLIENT -q "system wait view ${second_db}.v" 2>&1)
+    if [ "$?" != 0 ]
+    then
+        echo "$res" | grep -o -m1 ACCESS_DENIED || echo "expected ACCESS_DENIED error, got:" "$res"
+        break
+    fi
+    sleep 1
+done
+
+$CLICKHOUSE_CLIENT -q "alter table ${second_db}.v modify query select throwIf(1) as x"
+
+# Get an exception during query execution.
+for attempt in {1..10}
+do
+    $CLICKHOUSE_CLIENT -q "system refresh view ${second_db}.v"
+    res=$($CLICKHOUSE_CLIENT -q "system wait view ${second_db}.v" 2>&1)
+    if [ "$?" == 0 ]
+    then
+        echo "unexpected success"
+        break
+    else
+        echo "$res" | grep -o -m1 FUNCTION_THROW_IF_VALUE_IS_NON_ZERO && break
+    fi
+    sleep 1
+done
+
+# Check that refreshes and both kinds of errors appear in query log.
+$CLICKHOUSE_CLIENT -q "
+    system flush logs;
+    select replaceAll(toString(type), 'Exception', 'Ex**ption'), interface, client_name, exception != '', has(tables, '${second_db}.v') from system.query_log where event_time > now() - interval 30 minute and log_comment = 'refresh of ${second_db}.v' group by all order by all;
+"
+
+$CLICKHOUSE_CLIENT -q "
+    drop user $test_user;
+    drop database $second_db;
+"