diff --git a/src/Storages/MaterializedView/RefreshTask.cpp b/src/Storages/MaterializedView/RefreshTask.cpp index 163ab3362eb..fdf3948de70 100644 --- a/src/Storages/MaterializedView/RefreshTask.cpp +++ b/src/Storages/MaterializedView/RefreshTask.cpp @@ -4,6 +4,7 @@ #include #include +#include #include #include @@ -168,12 +169,14 @@ void RefreshTask::refresh() storeLastState(LastTaskState::Finished); break; case ExecutionResult::Cancelled: + cancelRefresh(view); storeLastState(LastTaskState::Canceled); break; } refresh_executor.reset(); refresh_block.reset(); + refresh_query.reset(); storeLastRefresh(std::chrono::system_clock::now()); scheduleRefresh(last_refresh); @@ -193,9 +196,12 @@ RefreshTask::ExecutionResult RefreshTask::executeRefresh() } -void RefreshTask::initializeRefresh(std::shared_ptr view) +void RefreshTask::initializeRefresh(std::shared_ptr view) { + auto fresh_table = view->createFreshTable(); refresh_query = view->prepareRefreshQuery(); + refresh_query->setTable(fresh_table.table_name); + refresh_query->setDatabase(fresh_table.database_name); auto refresh_context = Context::createCopy(view->getContext()); refresh_block = InterpreterInsertQuery(refresh_query, refresh_context).execute(); refresh_block->pipeline.setProgressCallback([this](const Progress & progress){ progressCallback(progress); }); @@ -208,8 +214,17 @@ void RefreshTask::initializeRefresh(std::shared_ptr vie void RefreshTask::completeRefresh(std::shared_ptr view) { - view->updateInnerTableAfterRefresh(refresh_query); + auto stale_table = view->exchangeTargetTable(refresh_query->table_id); dependencies.notifyAll(view->getStorageID()); + + auto drop_context = Context::createCopy(view->getContext()); + InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, drop_context, drop_context, stale_table, /*sync=*/true); +} + +void RefreshTask::cancelRefresh(std::shared_ptr view) +{ + auto drop_context = Context::createCopy(view->getContext()); + InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, drop_context, drop_context, refresh_query->table_id, /*sync=*/true); } void RefreshTask::scheduleRefresh(std::chrono::system_clock::time_point now) diff --git a/src/Storages/MaterializedView/RefreshTask.h b/src/Storages/MaterializedView/RefreshTask.h index ea289562c7e..36c03e2c7df 100644 --- a/src/Storages/MaterializedView/RefreshTask.h +++ b/src/Storages/MaterializedView/RefreshTask.h @@ -85,10 +85,12 @@ private: ExecutionResult executeRefresh(); - void initializeRefresh(std::shared_ptr view); + void initializeRefresh(std::shared_ptr view); void completeRefresh(std::shared_ptr view); + void cancelRefresh(std::shared_ptr view); + std::chrono::sys_seconds calculateRefreshTime(std::chrono::system_clock::time_point now) const; std::chrono::seconds genSpreadSeconds(); diff --git a/src/Storages/StorageMaterializedView.cpp b/src/Storages/StorageMaterializedView.cpp index 43e8e0d6f33..42191bde90e 100644 --- a/src/Storages/StorageMaterializedView.cpp +++ b/src/Storages/StorageMaterializedView.cpp @@ -285,7 +285,7 @@ bool StorageMaterializedView::optimize( return getTargetTable()->optimize(query, metadata_snapshot, partition, final, deduplicate, deduplicate_by_columns, local_context); } -std::shared_ptr StorageMaterializedView::prepareRefreshQuery() +StorageID StorageMaterializedView::createFreshTable() const { auto inner_table_id = getTargetTableId(); auto new_table_name = ".tmp" + generateInnerTableName(getStorageID()); @@ -305,26 +305,29 @@ std::shared_ptr StorageMaterializedView::prepareRefreshQuery() create_interpreter.setInternal(true); create_interpreter.execute(); - auto insert_query = std::make_shared(); - insert_query->setTable(new_table_name); - insert_query->setDatabase(db->getDatabaseName()); - insert_query->select = getInMemoryMetadataPtr()->getSelectQuery().select_query; + return DatabaseCatalog::instance().getTable({create_query.getDatabase(), create_query.getTable()}, getContext())->getStorageID(); +} +std::shared_ptr StorageMaterializedView::prepareRefreshQuery() const +{ + auto insert_query = std::make_shared(); + insert_query->select = getInMemoryMetadataPtr()->getSelectQuery().select_query; return insert_query; } -void StorageMaterializedView::updateInnerTableAfterRefresh(std::shared_ptr refresh_query) +StorageID StorageMaterializedView::exchangeTargetTable(const StorageID & fresh_table) { - auto inner_table_id = getTargetTableId(); + auto stale_table_id = getTargetTableId(); - auto db = DatabaseCatalog::instance().getDatabase(inner_table_id.database_name); - auto target_db = DatabaseCatalog::instance().getDatabase(refresh_query->getDatabase()); + auto db = DatabaseCatalog::instance().getDatabase(stale_table_id.database_name); + auto target_db = DatabaseCatalog::instance().getDatabase(fresh_table.database_name); auto rename_ctx = Context::createCopy(getContext()); target_db->renameTable( - rename_ctx, refresh_query->getTable(), *db, inner_table_id.table_name, /*exchange=*/true, /*dictionary=*/false); + rename_ctx, fresh_table.table_name, *db, stale_table_id.table_name, /*exchange=*/true, /*dictionary=*/false); - setTargetTableId(db->getTable(refresh_query->getTable(), getContext())->getStorageID()); + setTargetTableId(fresh_table); + return stale_table_id; } void StorageMaterializedView::alter( diff --git a/src/Storages/StorageMaterializedView.h b/src/Storages/StorageMaterializedView.h index 2dce8355b47..f3b777d34fa 100644 --- a/src/Storages/StorageMaterializedView.h +++ b/src/Storages/StorageMaterializedView.h @@ -119,9 +119,9 @@ private: void checkStatementCanBeForwarded() const; - std::shared_ptr prepareRefreshQuery(); - - void updateInnerTableAfterRefresh(std::shared_ptr refresh_query); + StorageID createFreshTable() const; + std::shared_ptr prepareRefreshQuery() const; + StorageID exchangeTargetTable(const StorageID & fresh_table); StorageID getTargetTableId() const; void setTargetTableId(StorageID id);