Fix conflicts, 'context has expired', lint

This commit is contained in:
Michael Kolupaev 2024-06-11 03:52:50 +00:00
parent 39ed484e10
commit c7bf29970f
6 changed files with 30 additions and 17 deletions

View File

@ -29,7 +29,7 @@ private:
StopStatePtr state;
StopToken(StopStatePtr s) : state(std::move(s)) {}
explicit StopToken(StopStatePtr s) : state(std::move(s)) {}
};
class StopSource

View File

@ -414,6 +414,7 @@ void RefreshTask::refreshTask()
int32_t root_znode_version = coordination.coordinated ? coordination.root_znode.version : -1;
CurrentMetrics::Increment metric_inc(CurrentMetrics::RefreshingViews);
lock.unlock();
bool refreshed = false;
@ -435,7 +436,7 @@ void RefreshTask::refreshTask()
else
{
error_message = getCurrentExceptionMessage(true);
LOG_ERROR(log, "{}: Refresh failed (attempt {}/{}): {}", view->getStorageID().getFullTableName(), znode.attempt_number, refresh_settings.refresh_retries + 1, error_message);
LOG_ERROR(log, "{}: Refresh failed (attempt {}/{}): {}", view->getStorageID().getFullTableName(), start_znode.attempt_number, refresh_settings.refresh_retries + 1, error_message);
}
}
@ -519,16 +520,15 @@ UUID RefreshTask::executeRefreshUnlocked(bool append, int32_t root_znode_version
}
std::optional<StorageID> table_to_drop;
UUID new_table_uuid;
auto new_table_id = StorageID::createEmpty();
try
{
/// Create a table.
auto refresh_query = view->prepareRefresh(append, refresh_context, table_to_drop);
new_table_uuid = refresh_query->table_id.uuid;
/// Run the query.
{
CurrentThread::QueryScope query_scope(refresh_context); // create a thread group for the query
/// Create a table.
auto [refresh_query, query_scope] = view->prepareRefresh(append, refresh_context, table_to_drop);
new_table_id = refresh_query->table_id;
/// Run the query.
BlockIO block_io = InterpreterInsertQuery(
refresh_query,
@ -584,7 +584,7 @@ UUID RefreshTask::executeRefreshUnlocked(bool append, int32_t root_znode_version
/// Exchange tables.
if (!append)
table_to_drop = view->exchangeTargetTable(refresh_query->table_id, refresh_context);
table_to_drop = view->exchangeTargetTable(new_table_id, refresh_context);
}
catch (...)
{
@ -596,7 +596,7 @@ UUID RefreshTask::executeRefreshUnlocked(bool append, int32_t root_znode_version
if (table_to_drop.has_value())
view->dropTempTable(table_to_drop.value(), refresh_context);
return new_table_uuid;
return new_table_id.uuid;
}
void RefreshTask::updateDependenciesIfNeeded(std::unique_lock<std::mutex> & lock)

View File

@ -467,7 +467,8 @@ ContextMutablePtr StorageMaterializedView::createRefreshContext() const
return refresh_context;
}
std::shared_ptr<ASTInsertQuery> StorageMaterializedView::prepareRefresh(bool append, ContextMutablePtr refresh_context, std::optional<StorageID> & out_temp_table_id) const
std::tuple<std::shared_ptr<ASTInsertQuery>, std::unique_ptr<CurrentThread::QueryScope>>
StorageMaterializedView::prepareRefresh(bool append, ContextMutablePtr refresh_context, std::optional<StorageID> & out_temp_table_id) const
{
auto inner_table_id = getTargetTableId();
StorageID target_table = inner_table_id;
@ -498,6 +499,9 @@ std::shared_ptr<ASTInsertQuery> StorageMaterializedView::prepareRefresh(bool app
out_temp_table_id = target_table;
}
// Create a thread group for the query.
auto query_scope = std::make_unique<CurrentThread::QueryScope>(refresh_context);
auto insert_query = std::make_shared<ASTInsertQuery>();
insert_query->select = getInMemoryMetadataPtr()->getSelectQuery().select_query;
insert_query->setTable(target_table.table_name);
@ -515,7 +519,7 @@ std::shared_ptr<ASTInsertQuery> StorageMaterializedView::prepareRefresh(bool app
columns->children.push_back(std::make_shared<ASTIdentifier>(name));
insert_query->columns = std::move(columns);
return insert_query;
return {std::move(insert_query), std::move(query_scope)};
}
std::optional<StorageID> StorageMaterializedView::exchangeTargetTable(StorageID fresh_table, ContextPtr refresh_context)

View File

@ -2,9 +2,10 @@
#include <Parsers/IAST_fwd.h>
#include <Common/CurrentThread.h>
#include <Storages/IStorage.h>
#include <Storages/StorageInMemoryMetadata.h>
#include <Storages/MaterializedView/RefreshTask.h>
namespace DB
@ -125,11 +126,12 @@ private:
void checkStatementCanBeForwarded() const;
ContextMutablePtr createRefreshContext() const;
/// Prepare to refresh a refreshable materialized view: create temporary table and form the
/// insert-select query.
/// Prepare to refresh a refreshable materialized view: create temporary table (if needed) and
/// form the insert-select query.
/// out_temp_table_id may be assigned before throwing an exception, in which case the caller
/// must drop the temp table before rethrowing.
std::shared_ptr<ASTInsertQuery> prepareRefresh(bool append, ContextMutablePtr refresh_context, std::optional<StorageID> & out_temp_table_id) const;
std::tuple<std::shared_ptr<ASTInsertQuery>, std::unique_ptr<CurrentThread::QueryScope>>
prepareRefresh(bool append, ContextMutablePtr refresh_context, std::optional<StorageID> & out_temp_table_id) const;
std::optional<StorageID> exchangeTargetTable(StorageID fresh_table, ContextPtr refresh_context);
void dropTempTable(StorageID table, ContextMutablePtr refresh_context);

View File

@ -27,3 +27,4 @@ ALTER from APPEND to non-APPEND failed, as expected
<38: append chain> 100
<38: append chain> 200
creating MergeTree without ORDER BY failed, as expected
2 1

View File

@ -204,5 +204,11 @@ $CLICKHOUSE_CLIENT -nq "
create materialized view o refresh every 1 second (x Int64) engine Memory as select x from nope.nonexist settings allow_materialized_view_with_bad_select = 1;
drop table o;"
$CLICKHOUSE_CLIENT -nq "
create materialized view o refresh every 1 second append (number UInt64) engine Memory as select number from numbers(2);
system wait view o;
select count(), sum(number) from o;
drop table o;"
$CLICKHOUSE_CLIENT -nq "
drop table refreshes;"