proper tmp table cleanup

This commit is contained in:
koloshmet 2023-02-15 04:58:26 +02:00 committed by Michael Kolupaev
parent d1932763f3
commit f14114dafc
4 changed files with 37 additions and 17 deletions

View File

@ -4,6 +4,7 @@
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/InterpreterDropQuery.h>
#include <Parsers/ASTCreateQuery.h>
#include <Processors/Executors/ManualPipelineExecutor.h>
@ -168,12 +169,14 @@ void RefreshTask::refresh()
storeLastState(LastTaskState::Finished);
break;
case ExecutionResult::Cancelled:
cancelRefresh(view);
storeLastState(LastTaskState::Canceled);
break;
}
refresh_executor.reset();
refresh_block.reset();
refresh_query.reset();
storeLastRefresh(std::chrono::system_clock::now());
scheduleRefresh(last_refresh);
@ -193,9 +196,12 @@ RefreshTask::ExecutionResult RefreshTask::executeRefresh()
}
void RefreshTask::initializeRefresh(std::shared_ptr<StorageMaterializedView> view)
void RefreshTask::initializeRefresh(std::shared_ptr<const StorageMaterializedView> view)
{
auto fresh_table = view->createFreshTable();
refresh_query = view->prepareRefreshQuery();
refresh_query->setTable(fresh_table.table_name);
refresh_query->setDatabase(fresh_table.database_name);
auto refresh_context = Context::createCopy(view->getContext());
refresh_block = InterpreterInsertQuery(refresh_query, refresh_context).execute();
refresh_block->pipeline.setProgressCallback([this](const Progress & progress){ progressCallback(progress); });
@ -208,8 +214,17 @@ void RefreshTask::initializeRefresh(std::shared_ptr<StorageMaterializedView> vie
void RefreshTask::completeRefresh(std::shared_ptr<StorageMaterializedView> view)
{
view->updateInnerTableAfterRefresh(refresh_query);
auto stale_table = view->exchangeTargetTable(refresh_query->table_id);
dependencies.notifyAll(view->getStorageID());
auto drop_context = Context::createCopy(view->getContext());
InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, drop_context, drop_context, stale_table, /*sync=*/true);
}
void RefreshTask::cancelRefresh(std::shared_ptr<const StorageMaterializedView> view)
{
auto drop_context = Context::createCopy(view->getContext());
InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, drop_context, drop_context, refresh_query->table_id, /*sync=*/true);
}
void RefreshTask::scheduleRefresh(std::chrono::system_clock::time_point now)

View File

@ -85,10 +85,12 @@ private:
ExecutionResult executeRefresh();
void initializeRefresh(std::shared_ptr<StorageMaterializedView> view);
void initializeRefresh(std::shared_ptr<const StorageMaterializedView> view);
void completeRefresh(std::shared_ptr<StorageMaterializedView> view);
void cancelRefresh(std::shared_ptr<const StorageMaterializedView> view);
std::chrono::sys_seconds calculateRefreshTime(std::chrono::system_clock::time_point now) const;
std::chrono::seconds genSpreadSeconds();

View File

@ -285,7 +285,7 @@ bool StorageMaterializedView::optimize(
return getTargetTable()->optimize(query, metadata_snapshot, partition, final, deduplicate, deduplicate_by_columns, local_context);
}
std::shared_ptr<ASTInsertQuery> StorageMaterializedView::prepareRefreshQuery()
StorageID StorageMaterializedView::createFreshTable() const
{
auto inner_table_id = getTargetTableId();
auto new_table_name = ".tmp" + generateInnerTableName(getStorageID());
@ -305,26 +305,29 @@ std::shared_ptr<ASTInsertQuery> StorageMaterializedView::prepareRefreshQuery()
create_interpreter.setInternal(true);
create_interpreter.execute();
auto insert_query = std::make_shared<ASTInsertQuery>();
insert_query->setTable(new_table_name);
insert_query->setDatabase(db->getDatabaseName());
insert_query->select = getInMemoryMetadataPtr()->getSelectQuery().select_query;
return DatabaseCatalog::instance().getTable({create_query.getDatabase(), create_query.getTable()}, getContext())->getStorageID();
}
std::shared_ptr<ASTInsertQuery> StorageMaterializedView::prepareRefreshQuery() const
{
auto insert_query = std::make_shared<ASTInsertQuery>();
insert_query->select = getInMemoryMetadataPtr()->getSelectQuery().select_query;
return insert_query;
}
void StorageMaterializedView::updateInnerTableAfterRefresh(std::shared_ptr<ASTInsertQuery> refresh_query)
StorageID StorageMaterializedView::exchangeTargetTable(const StorageID & fresh_table)
{
auto inner_table_id = getTargetTableId();
auto stale_table_id = getTargetTableId();
auto db = DatabaseCatalog::instance().getDatabase(inner_table_id.database_name);
auto target_db = DatabaseCatalog::instance().getDatabase(refresh_query->getDatabase());
auto db = DatabaseCatalog::instance().getDatabase(stale_table_id.database_name);
auto target_db = DatabaseCatalog::instance().getDatabase(fresh_table.database_name);
auto rename_ctx = Context::createCopy(getContext());
target_db->renameTable(
rename_ctx, refresh_query->getTable(), *db, inner_table_id.table_name, /*exchange=*/true, /*dictionary=*/false);
rename_ctx, fresh_table.table_name, *db, stale_table_id.table_name, /*exchange=*/true, /*dictionary=*/false);
setTargetTableId(db->getTable(refresh_query->getTable(), getContext())->getStorageID());
setTargetTableId(fresh_table);
return stale_table_id;
}
void StorageMaterializedView::alter(

View File

@ -119,9 +119,9 @@ private:
void checkStatementCanBeForwarded() const;
std::shared_ptr<ASTInsertQuery> prepareRefreshQuery();
void updateInnerTableAfterRefresh(std::shared_ptr<ASTInsertQuery> refresh_query);
StorageID createFreshTable() const;
std::shared_ptr<ASTInsertQuery> prepareRefreshQuery() const;
StorageID exchangeTargetTable(const StorageID & fresh_table);
StorageID getTargetTableId() const;
void setTargetTableId(StorageID id);