diff --git a/src/Interpreters/InterpreterDropQuery.cpp b/src/Interpreters/InterpreterDropQuery.cpp index dd52b6c2e14..b8c9d5dabb5 100644 --- a/src/Interpreters/InterpreterDropQuery.cpp +++ b/src/Interpreters/InterpreterDropQuery.cpp @@ -267,7 +267,7 @@ BlockIO InterpreterDropQuery::executeToTableImpl(ContextPtr context_, ASTDropQue bool check_loading_deps = !check_ref_deps && getContext()->getSettingsRef().check_table_dependencies; DatabaseCatalog::instance().checkTableCanBeRemovedOrRenamed(table_id, check_ref_deps, check_loading_deps, is_drop_or_detach_database); - table->flushAndShutdown(); + table->flushAndShutdown(true); TableExclusiveLockHolder table_lock; if (database->getUUID() == UUIDHelpers::Nil) diff --git a/src/Storages/FileLog/StorageFileLog.cpp b/src/Storages/FileLog/StorageFileLog.cpp index de41ede8a5c..2eea619d654 100644 --- a/src/Storages/FileLog/StorageFileLog.cpp +++ b/src/Storages/FileLog/StorageFileLog.cpp @@ -382,7 +382,7 @@ void StorageFileLog::startup() task->holder->activateAndSchedule(); } -void StorageFileLog::shutdown() +void StorageFileLog::shutdown(bool) { if (task) { diff --git a/src/Storages/FileLog/StorageFileLog.h b/src/Storages/FileLog/StorageFileLog.h index 0fd62a22a18..3cb6ac1fbbf 100644 --- a/src/Storages/FileLog/StorageFileLog.h +++ b/src/Storages/FileLog/StorageFileLog.h @@ -47,7 +47,7 @@ public: bool noPushingToViews() const override { return true; } void startup() override; - void shutdown() override; + void shutdown(bool is_drop) override; Pipe read( const Names & column_names, diff --git a/src/Storages/IStorage.h b/src/Storages/IStorage.h index 83249ae5151..0521f14e0f9 100644 --- a/src/Storages/IStorage.h +++ b/src/Storages/IStorage.h @@ -564,10 +564,10 @@ public: * @see shutdown() * @see flushAndPrepareForShutdown() */ - void flushAndShutdown() + void flushAndShutdown(bool is_drop = false) { flushAndPrepareForShutdown(); - shutdown(); + shutdown(is_drop); } /** If the table have to do some complicated work when destroying an object - do it in advance. @@ -575,7 +575,7 @@ public: * By default, does nothing. * Can be called simultaneously from different threads, even after a call to drop(). */ - virtual void shutdown() {} + virtual void shutdown(bool is_drop = false) { UNUSED(is_drop); } // NOLINT /// Called before shutdown() to flush data to underlying storage /// Data in memory need to be persistent diff --git a/src/Storages/Kafka/StorageKafka.cpp b/src/Storages/Kafka/StorageKafka.cpp index b9abe175e5f..423d295cdf2 100644 --- a/src/Storages/Kafka/StorageKafka.cpp +++ b/src/Storages/Kafka/StorageKafka.cpp @@ -435,7 +435,7 @@ void StorageKafka::startup() } -void StorageKafka::shutdown() +void StorageKafka::shutdown(bool) { for (auto & task : tasks) { diff --git a/src/Storages/Kafka/StorageKafka.h b/src/Storages/Kafka/StorageKafka.h index e08baf9fc80..9280809be0e 100644 --- a/src/Storages/Kafka/StorageKafka.h +++ b/src/Storages/Kafka/StorageKafka.h @@ -50,7 +50,7 @@ public: bool noPushingToViews() const override { return true; } void startup() override; - void shutdown() override; + void shutdown(bool is_drop) override; Pipe read( const Names & column_names, diff --git a/src/Storages/LiveView/StorageLiveView.cpp b/src/Storages/LiveView/StorageLiveView.cpp index aec2405b973..3c116321083 100644 --- a/src/Storages/LiveView/StorageLiveView.cpp +++ b/src/Storages/LiveView/StorageLiveView.cpp @@ -253,7 +253,7 @@ StorageLiveView::StorageLiveView( StorageLiveView::~StorageLiveView() { - shutdown(); + shutdown(false); } NamesAndTypesList StorageLiveView::getVirtuals() const @@ -289,7 +289,7 @@ void StorageLiveView::startup() periodic_refresh_task->activate(); } -void StorageLiveView::shutdown() +void StorageLiveView::shutdown(bool) { shutdown_called = true; diff --git a/src/Storages/LiveView/StorageLiveView.h b/src/Storages/LiveView/StorageLiveView.h index 92ffd4dc642..e0566d586ee 100644 --- a/src/Storages/LiveView/StorageLiveView.h +++ b/src/Storages/LiveView/StorageLiveView.h @@ -81,7 +81,7 @@ public: void startup() override; - void shutdown() override; + void shutdown(bool is_drop) override; Pipe read( const Names & column_names, diff --git a/src/Storages/NATS/StorageNATS.cpp b/src/Storages/NATS/StorageNATS.cpp index f3ed8ed7825..cd7e99a6d18 100644 --- a/src/Storages/NATS/StorageNATS.cpp +++ b/src/Storages/NATS/StorageNATS.cpp @@ -419,7 +419,7 @@ void StorageNATS::startup() } -void StorageNATS::shutdown() +void StorageNATS::shutdown(bool /* is_drop */) { shutdown_called = true; diff --git a/src/Storages/NATS/StorageNATS.h b/src/Storages/NATS/StorageNATS.h index cc7b0d88be5..16a162b8500 100644 --- a/src/Storages/NATS/StorageNATS.h +++ b/src/Storages/NATS/StorageNATS.h @@ -31,7 +31,7 @@ public: bool noPushingToViews() const override { return true; } void startup() override; - void shutdown() override; + void shutdown(bool is_drop) override; /// This is a bad way to let storage know in shutdown() that table is going to be dropped. There are some actions which need /// to be done only when table is dropped (not when detached). Also connection must be closed only in shutdown, but those diff --git a/src/Storages/PostgreSQL/StorageMaterializedPostgreSQL.cpp b/src/Storages/PostgreSQL/StorageMaterializedPostgreSQL.cpp index d83722dba6c..a7650983db8 100644 --- a/src/Storages/PostgreSQL/StorageMaterializedPostgreSQL.cpp +++ b/src/Storages/PostgreSQL/StorageMaterializedPostgreSQL.cpp @@ -228,7 +228,7 @@ void StorageMaterializedPostgreSQL::set(StoragePtr nested_storage) } -void StorageMaterializedPostgreSQL::shutdown() +void StorageMaterializedPostgreSQL::shutdown(bool) { if (replication_handler) replication_handler->shutdown(); diff --git a/src/Storages/PostgreSQL/StorageMaterializedPostgreSQL.h b/src/Storages/PostgreSQL/StorageMaterializedPostgreSQL.h index af0adb10f9f..ca7b801cb7c 100644 --- a/src/Storages/PostgreSQL/StorageMaterializedPostgreSQL.h +++ b/src/Storages/PostgreSQL/StorageMaterializedPostgreSQL.h @@ -81,7 +81,7 @@ public: String getName() const override { return "MaterializedPostgreSQL"; } - void shutdown() override; + void shutdown(bool is_drop) override; /// Used only for single MaterializedPostgreSQL storage. void dropInnerTableIfAny(bool sync, ContextPtr local_context) override; diff --git a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp index 3fca458310c..ec2e002b285 100644 --- a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp +++ b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp @@ -801,7 +801,7 @@ void StorageRabbitMQ::startup() } -void StorageRabbitMQ::shutdown() +void StorageRabbitMQ::shutdown(bool) { shutdown_called = true; diff --git a/src/Storages/RabbitMQ/StorageRabbitMQ.h b/src/Storages/RabbitMQ/StorageRabbitMQ.h index a5ff60f0c6e..120930cf01d 100644 --- a/src/Storages/RabbitMQ/StorageRabbitMQ.h +++ b/src/Storages/RabbitMQ/StorageRabbitMQ.h @@ -34,7 +34,7 @@ public: bool noPushingToViews() const override { return true; } void startup() override; - void shutdown() override; + void shutdown(bool is_drop) override; /// This is a bad way to let storage know in shutdown() that table is going to be dropped. There are some actions which need /// to be done only when table is dropped (not when detached). Also connection must be closed only in shutdown, but those diff --git a/src/Storages/S3Queue/S3QueueFilesMetadata.cpp b/src/Storages/S3Queue/S3QueueFilesMetadata.cpp index d952160491c..f49e1d6f25c 100644 --- a/src/Storages/S3Queue/S3QueueFilesMetadata.cpp +++ b/src/Storages/S3Queue/S3QueueFilesMetadata.cpp @@ -197,8 +197,7 @@ S3QueueFilesMetadata::NodeMetadata S3QueueFilesMetadata::createNodeMetadata( return metadata; } -std::pair S3QueueFilesMetadata::trySetFileAsProcessing(const std::string & path) +S3QueueFilesMetadata::ProcessingNodeHolderPtr S3QueueFilesMetadata::trySetFileAsProcessing(const std::string & path) { auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::S3QueueSetFileProcessingMicroseconds); auto file_status = local_file_statuses.get(path, /* create */true); @@ -261,12 +260,12 @@ std::pair S3QueueFilesMetadata::trySetFileAsProcessingForUnorderedMode(const std::string & path) + S3QueueFilesMetadata::ProcessingNodeHolderPtr> S3QueueFilesMetadata::trySetFileAsProcessingForUnorderedMode(const std::string & path, const FileStatusPtr & file_status) { /// In one zookeeper transaction do the following: /// 1. check that corresponding persistent nodes do not exist in processed/ and failed/; @@ -340,7 +339,7 @@ std::pair(node_metadata.processing_id, path, zookeeper_processing_path / node_name, zk_client); + auto holder = std::make_unique(node_metadata.processing_id, path, zookeeper_processing_path / node_name, file_status, zk_client); return std::pair{SetFileProcessingResult::Success, std::move(holder)}; } @@ -363,7 +362,7 @@ std::pair S3QueueFilesMetadata::trySetFileAsProcessingForOrderedMode(const std::string & path) + S3QueueFilesMetadata::ProcessingNodeHolderPtr> S3QueueFilesMetadata::trySetFileAsProcessingForOrderedMode(const std::string & path, const FileStatusPtr & file_status) { /// Same as for Unordered mode. /// The only difference is the check if the file is already processed. @@ -394,7 +393,10 @@ std::pairtryMulti(requests, responses); if (code == Coordination::Error::ZOK) { - auto holder = std::make_unique(node_metadata.processing_id, path, zookeeper_processing_path / node_name, zk_client); + auto holder = std::make_unique(node_metadata.processing_id, path, zookeeper_processing_path / node_name, file_status, zk_client); return std::pair{SetFileProcessingResult::Success, std::move(holder)}; } @@ -423,7 +425,7 @@ std::pairpath; - - auto file_status = local_file_statuses.get(path, /* create */false); + auto file_status = holder->getFileStatus(); { std::lock_guard lock(file_status->metadata_lock); file_status->state = FileStatus::State::Processed; @@ -559,7 +559,7 @@ void S3QueueFilesMetadata::setFileFailed(ProcessingNodeHolderPtr holder, const S auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::S3QueueSetFileFailedMicroseconds); const auto & path = holder->path; - auto file_status = local_file_statuses.get(path, /* create */false); + auto file_status = holder->getFileStatus(); { std::lock_guard lock(file_status->metadata_lock); file_status->state = FileStatus::State::Failed; @@ -682,8 +682,10 @@ S3QueueFilesMetadata::ProcessingNodeHolder::ProcessingNodeHolder( const std::string & processing_id_, const std::string & path_, const std::string & zk_node_path_, + FileStatusPtr file_status_, zkutil::ZooKeeperPtr zk_client_) : zk_client(zk_client_) + , file_status(file_status_) , path(path_) , zk_node_path(zk_node_path_) , processing_id(processing_id_) @@ -790,7 +792,18 @@ void S3QueueFilesMetadata::cleanupThreadFuncImpl() const bool check_nodes_ttl = max_set_age_sec > 0; const auto zk_client = getZooKeeper(); - auto nodes = zk_client->getChildren(zookeeper_processed_path); + Strings nodes; + auto code = zk_client->tryGetChildren(zookeeper_processed_path, nodes); + if (code != Coordination::Error::ZOK) + { + if (code == Coordination::Error::ZNONODE) + { + LOG_TEST(log, "A `processed` not is not yet created"); + return; + } + throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected error: {}", magic_enum::enum_name(code)); + } + if (nodes.empty()) { LOG_TEST(log, "A set of nodes is empty"); @@ -869,7 +882,7 @@ void S3QueueFilesMetadata::cleanupThreadFuncImpl() local_file_statuses.remove(node.metadata.file_path, /* if_exists */true); - auto code = zk_client->tryRemove(path); + code = zk_client->tryRemove(path); if (code == Coordination::Error::ZOK) --nodes_to_remove; else @@ -886,7 +899,7 @@ void S3QueueFilesMetadata::cleanupThreadFuncImpl() local_file_statuses.remove(node.metadata.file_path, /* if_exists */true); - auto code = zk_client->tryRemove(path); + code = zk_client->tryRemove(path); if (code != Coordination::Error::ZOK) LOG_ERROR(log, "Failed to remove a node `{}` (code: {})", path.string(), code); } diff --git a/src/Storages/S3Queue/S3QueueFilesMetadata.h b/src/Storages/S3Queue/S3QueueFilesMetadata.h index df9db87a621..f3be7c5c3a0 100644 --- a/src/Storages/S3Queue/S3QueueFilesMetadata.h +++ b/src/Storages/S3Queue/S3QueueFilesMetadata.h @@ -70,7 +70,7 @@ public: using FileStatuses = std::unordered_map; /// Set file as processing, if it is not alreaty processed, failed or processing. - std::pair trySetFileAsProcessing(const std::string & path); + ProcessingNodeHolderPtr trySetFileAsProcessing(const std::string & path); FileStatusPtr getFileStatus(const std::string & path); @@ -112,8 +112,8 @@ private: AlreadyProcessed, AlreadyFailed, }; - std::pair trySetFileAsProcessingForOrderedMode(const std::string & path); - std::pair trySetFileAsProcessingForUnorderedMode(const std::string & path); + std::pair trySetFileAsProcessingForOrderedMode(const std::string & path, const FileStatusPtr & file_status); + std::pair trySetFileAsProcessingForUnorderedMode(const std::string & path, const FileStatusPtr & file_status); struct NodeMetadata { @@ -153,14 +153,18 @@ public: const std::string & processing_id_, const std::string & path_, const std::string & zk_node_path_, + FileStatusPtr file_status_, zkutil::ZooKeeperPtr zk_client_); ~ProcessingNodeHolder(); + FileStatusPtr getFileStatus() { return file_status; } + private: bool remove(Coordination::Requests * requests = nullptr, Coordination::Responses * responses = nullptr); zkutil::ZooKeeperPtr zk_client; + FileStatusPtr file_status; std::string path; std::string zk_node_path; std::string processing_id; diff --git a/src/Storages/S3Queue/S3QueueSource.cpp b/src/Storages/S3Queue/S3QueueSource.cpp index 5d957d885f5..1afd17edbe1 100644 --- a/src/Storages/S3Queue/S3QueueSource.cpp +++ b/src/Storages/S3Queue/S3QueueSource.cpp @@ -33,11 +33,9 @@ namespace ErrorCodes StorageS3QueueSource::S3QueueKeyWithInfo::S3QueueKeyWithInfo( const std::string & key_, std::optional info_, - Metadata::ProcessingNodeHolderPtr processing_holder_, - FileStatusPtr file_status_) + Metadata::ProcessingNodeHolderPtr processing_holder_) : StorageS3Source::KeyWithInfo(key_, info_) , processing_holder(processing_holder_) - , file_status(file_status_) { } @@ -57,13 +55,19 @@ StorageS3QueueSource::KeyWithInfoPtr StorageS3QueueSource::FileIterator::next() { KeyWithInfoPtr val = glob_iterator->next(); - if (!val || shutdown_called) + if (!val) return {}; - if (auto [processing_holder, processing_file_status] = metadata->trySetFileAsProcessing(val->key); + if (shutdown_called) + { + LOG_TEST(&Poco::Logger::get("StorageS3QueueSource"), "Shutdown was called, stopping file iterator"); + return {}; + } + + if (auto processing_holder = metadata->trySetFileAsProcessing(val->key); processing_holder && !shutdown_called) { - return std::make_shared(val->key, val->info, processing_holder, processing_file_status); + return std::make_shared(val->key, val->info, processing_holder); } } return {}; @@ -84,8 +88,10 @@ StorageS3QueueSource::StorageS3QueueSource( const NamesAndTypesList & requested_virtual_columns_, ContextPtr context_, const std::atomic & shutdown_called_, + const std::atomic & table_is_being_dropped_, std::shared_ptr s3_queue_log_, - const StorageID & storage_id_) + const StorageID & storage_id_, + Poco::Logger * log_) : ISource(header_) , WithContext(context_) , name(std::move(name_)) @@ -94,10 +100,11 @@ StorageS3QueueSource::StorageS3QueueSource( , internal_source(std::move(internal_source_)) , requested_virtual_columns(requested_virtual_columns_) , shutdown_called(shutdown_called_) + , table_is_being_dropped(table_is_being_dropped_) , s3_queue_log(s3_queue_log_) , storage_id(storage_id_) , remove_file_func(remove_file_func_) - , log(&Poco::Logger::get("StorageS3QueueSource")) + , log(log_) { } @@ -132,29 +139,60 @@ Chunk StorageS3QueueSource::generate() if (!reader) break; + const auto * key_with_info = dynamic_cast(&reader.getKeyWithInfo()); + auto file_status = key_with_info->processing_holder->getFileStatus(); + if (isCancelled()) { reader->cancel(); + + if (processed_rows_from_file) + { + try + { + files_metadata->setFileFailed(key_with_info->processing_holder, "Cancelled"); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + } + + appendLogElement(reader.getFile(), *file_status, processed_rows_from_file, false); + } + break; } if (shutdown_called) { - if (processed_rows_from_file) - { - /// We could delay shutdown until files, which already started processing before the shutdown, finished. - /// But if files are big and `s3queue_processing_threads_num` is not small, it can take a significant time. - /// Anyway we cannot do anything in case of SIGTERM, so destination table must anyway support deduplication, - /// so here we will rely on it here as well. - LOG_WARNING( - log, "Shutdown called, {} rows are already processed, but file is not fully processed", - processed_rows_from_file); - } - break; - } + if (processed_rows_from_file == 0) + break; - const auto * key_with_info = dynamic_cast(&reader.getKeyWithInfo()); - auto file_status = key_with_info->file_status; + if (table_is_being_dropped) + { + LOG_DEBUG( + log, "Table is being dropped, {} rows are already processed from {}, but file is not fully processed", + processed_rows_from_file, reader.getFile()); + + try + { + files_metadata->setFileFailed(key_with_info->processing_holder, "Table is dropped"); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + } + + appendLogElement(reader.getFile(), *file_status, processed_rows_from_file, false); + + /// Leave the file half processed. Table is being dropped, so we do not care. + break; + } + + LOG_DEBUG(log, "Shutdown called, but file {} is partially processed ({} rows). " + "Will process the file fully and then shutdown", + reader.getFile(), processed_rows_from_file); + } auto * prev_scope = CurrentThread::get().attachProfileCountersScope(&file_status->profile_counters); SCOPE_EXIT({ CurrentThread::get().attachProfileCountersScope(prev_scope); }); diff --git a/src/Storages/S3Queue/S3QueueSource.h b/src/Storages/S3Queue/S3QueueSource.h index 8af5256899a..542f8e8fd8c 100644 --- a/src/Storages/S3Queue/S3QueueSource.h +++ b/src/Storages/S3Queue/S3QueueSource.h @@ -30,11 +30,9 @@ public: S3QueueKeyWithInfo( const std::string & key_, std::optional info_, - Metadata::ProcessingNodeHolderPtr processing_holder_, - FileStatusPtr file_status_); + Metadata::ProcessingNodeHolderPtr processing_holder_); Metadata::ProcessingNodeHolderPtr processing_holder; - FileStatusPtr file_status; }; class FileIterator : public IIterator @@ -66,8 +64,10 @@ public: const NamesAndTypesList & requested_virtual_columns_, ContextPtr context_, const std::atomic & shutdown_called_, + const std::atomic & table_is_being_dropped_, std::shared_ptr s3_queue_log_, - const StorageID & storage_id_); + const StorageID & storage_id_, + Poco::Logger * log_); ~StorageS3QueueSource() override; @@ -84,6 +84,7 @@ private: const std::shared_ptr internal_source; const NamesAndTypesList requested_virtual_columns; const std::atomic & shutdown_called; + const std::atomic & table_is_being_dropped; const std::shared_ptr s3_queue_log; const StorageID storage_id; diff --git a/src/Storages/S3Queue/StorageS3Queue.cpp b/src/Storages/S3Queue/StorageS3Queue.cpp index 72e74d3c2a0..99699aab709 100644 --- a/src/Storages/S3Queue/StorageS3Queue.cpp +++ b/src/Storages/S3Queue/StorageS3Queue.cpp @@ -161,8 +161,9 @@ void StorageS3Queue::startup() task->activateAndSchedule(); } -void StorageS3Queue::shutdown() +void StorageS3Queue::shutdown(bool is_drop) { + table_is_being_dropped = is_drop; shutdown_called = true; if (task) @@ -257,7 +258,7 @@ std::shared_ptr StorageS3Queue::createSource( return std::make_shared( getName(), read_from_format_info.source_header, std::move(internal_source), files_metadata, after_processing, file_deleter, read_from_format_info.requested_virtual_columns, - local_context, shutdown_called, s3_queue_log, getStorageID()); + local_context, shutdown_called, table_is_being_dropped, s3_queue_log, getStorageID(), log); } bool StorageS3Queue::hasDependencies(const StorageID & table_id) diff --git a/src/Storages/S3Queue/StorageS3Queue.h b/src/Storages/S3Queue/StorageS3Queue.h index 000015951ea..e594ddcce3e 100644 --- a/src/Storages/S3Queue/StorageS3Queue.h +++ b/src/Storages/S3Queue/StorageS3Queue.h @@ -73,10 +73,11 @@ private: std::atomic mv_attached = false; std::atomic shutdown_called = false; + std::atomic table_is_being_dropped = false; Poco::Logger * log; void startup() override; - void shutdown() override; + void shutdown(bool is_drop) override; void drop() override; bool supportsSubsetOfColumns(const ContextPtr & context_) const; bool supportsSubcolumns() const override { return true; } diff --git a/src/Storages/StorageDictionary.cpp b/src/Storages/StorageDictionary.cpp index 592195a918d..4c354371574 100644 --- a/src/Storages/StorageDictionary.cpp +++ b/src/Storages/StorageDictionary.cpp @@ -170,7 +170,7 @@ std::shared_ptr StorageDictionary::getDictionary() const return getContext()->getExternalDictionariesLoader().getDictionary(registered_dictionary_name, getContext()); } -void StorageDictionary::shutdown() +void StorageDictionary::shutdown(bool) { removeDictionaryConfigurationFromRepository(); } diff --git a/src/Storages/StorageDictionary.h b/src/Storages/StorageDictionary.h index aab704305e1..995a0192269 100644 --- a/src/Storages/StorageDictionary.h +++ b/src/Storages/StorageDictionary.h @@ -83,7 +83,7 @@ public: static NamesAndTypesList getNamesAndTypes(const DictionaryStructure & dictionary_structure); bool isDictionary() const override { return true; } - void shutdown() override; + void shutdown(bool is_drop) override; void startup() override; void renameInMemory(const StorageID & new_table_id) override; diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index 7705d0f193f..7d6f8aa5812 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -1248,7 +1248,7 @@ void StorageDistributed::initializeFromDisk() } -void StorageDistributed::shutdown() +void StorageDistributed::shutdown(bool) { async_insert_blocker.cancelForever(); @@ -1269,7 +1269,7 @@ void StorageDistributed::drop() // And second time shutdown() should be fast, since none of // DirectoryMonitor should not do anything, because ActionBlocker is // canceled (in shutdown()). - shutdown(); + shutdown(true); // Distributed table without sharding_key does not allows INSERTs if (relative_data_path.empty()) diff --git a/src/Storages/StorageDistributed.h b/src/Storages/StorageDistributed.h index 520e1445d09..a9e5e93cc92 100644 --- a/src/Storages/StorageDistributed.h +++ b/src/Storages/StorageDistributed.h @@ -137,7 +137,7 @@ public: void alter(const AlterCommands & params, ContextPtr context, AlterLockHolder & table_lock_holder) override; void initializeFromDisk(); - void shutdown() override; + void shutdown(bool is_drop) override; void flushAndPrepareForShutdown() override; void drop() override; diff --git a/src/Storages/StorageMaterializedView.cpp b/src/Storages/StorageMaterializedView.cpp index d54a7aaccf3..97cfd550769 100644 --- a/src/Storages/StorageMaterializedView.cpp +++ b/src/Storages/StorageMaterializedView.cpp @@ -398,7 +398,7 @@ void StorageMaterializedView::startup() DatabaseCatalog::instance().addViewDependency(select_query.select_table_id, getStorageID()); } -void StorageMaterializedView::shutdown() +void StorageMaterializedView::shutdown(bool) { auto metadata_snapshot = getInMemoryMetadataPtr(); const auto & select_query = metadata_snapshot->getSelectQuery(); diff --git a/src/Storages/StorageMaterializedView.h b/src/Storages/StorageMaterializedView.h index 0f6a6fd3db7..ae38cfb7e59 100644 --- a/src/Storages/StorageMaterializedView.h +++ b/src/Storages/StorageMaterializedView.h @@ -72,7 +72,7 @@ public: void renameInMemory(const StorageID & new_table_id) override; void startup() override; - void shutdown() override; + void shutdown(bool is_drop) override; QueryProcessingStage::Enum getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageSnapshotPtr &, SelectQueryInfo &) const override; diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index 474171ba1b1..256f46607e7 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -158,7 +158,7 @@ void StorageMergeTree::startup() /// It means that failed "startup" must not create any background tasks that we will have to wait. try { - shutdown(); + shutdown(false); } catch (...) { @@ -170,7 +170,7 @@ void StorageMergeTree::startup() } } -void StorageMergeTree::shutdown() +void StorageMergeTree::shutdown(bool) { if (shutdown_called.exchange(true)) return; @@ -196,7 +196,7 @@ void StorageMergeTree::shutdown() StorageMergeTree::~StorageMergeTree() { - shutdown(); + shutdown(false); } void StorageMergeTree::read( @@ -290,7 +290,7 @@ void StorageMergeTree::checkTableCanBeDropped([[ maybe_unused ]] ContextPtr quer void StorageMergeTree::drop() { - shutdown(); + shutdown(true); /// In case there is read-only disk we cannot allow to call dropAllData(), but dropping tables is allowed. if (isStaticStorage()) return; diff --git a/src/Storages/StorageMergeTree.h b/src/Storages/StorageMergeTree.h index bd992bd09ac..539037a90ae 100644 --- a/src/Storages/StorageMergeTree.h +++ b/src/Storages/StorageMergeTree.h @@ -45,7 +45,7 @@ public: bool has_force_restore_data_flag); void startup() override; - void shutdown() override; + void shutdown(bool is_drop) override; ~StorageMergeTree() override; diff --git a/src/Storages/StorageProxy.h b/src/Storages/StorageProxy.h index 991d37b0b35..5d57f75a620 100644 --- a/src/Storages/StorageProxy.h +++ b/src/Storages/StorageProxy.h @@ -138,7 +138,7 @@ public: CancellationCode killMutation(const String & mutation_id) override { return getNested()->killMutation(mutation_id); } void startup() override { getNested()->startup(); } - void shutdown() override { getNested()->shutdown(); } + void shutdown(bool is_drop) override { getNested()->shutdown(is_drop); } void flushAndPrepareForShutdown() override { getNested()->flushAndPrepareForShutdown(); } ActionLock getActionLock(StorageActionBlockType action_type) override { return getNested()->getActionLock(action_type); } diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index c96b376f8b0..74821a9186c 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -4995,7 +4995,7 @@ void StorageReplicatedMergeTree::startupImpl(bool from_attach_thread) } else { - shutdown(); + shutdown(false); } } catch (...) @@ -5070,7 +5070,7 @@ void StorageReplicatedMergeTree::partialShutdown() LOG_TRACE(log, "Threads finished"); } -void StorageReplicatedMergeTree::shutdown() +void StorageReplicatedMergeTree::shutdown(bool) { if (shutdown_called.exchange(true)) return; @@ -5124,7 +5124,7 @@ StorageReplicatedMergeTree::~StorageReplicatedMergeTree() { try { - shutdown(); + shutdown(false); } catch (...) { diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index f196991ae07..8c90d0e2679 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -139,7 +139,7 @@ public: /// In shutdown we completely terminate table -- remove /// is_active node and interserver handler. Also optionally /// wait until other replicas will download some parts from our replica. - void shutdown() override; + void shutdown(bool is_drop) override; ~StorageReplicatedMergeTree() override; diff --git a/src/Storages/StorageTableFunction.h b/src/Storages/StorageTableFunction.h index 8f96cb46910..9d966fb899b 100644 --- a/src/Storages/StorageTableFunction.h +++ b/src/Storages/StorageTableFunction.h @@ -72,11 +72,11 @@ public: } void startup() override { } - void shutdown() override + void shutdown(bool is_drop) override { std::lock_guard lock{nested_mutex}; if (nested) - nested->shutdown(); + nested->shutdown(is_drop); } void flushAndPrepareForShutdown() override diff --git a/src/Storages/WindowView/StorageWindowView.cpp b/src/Storages/WindowView/StorageWindowView.cpp index e3fcd6249d1..46c38ffa129 100644 --- a/src/Storages/WindowView/StorageWindowView.cpp +++ b/src/Storages/WindowView/StorageWindowView.cpp @@ -461,7 +461,7 @@ void StorageWindowView::alter( modifying_query = false; }); - shutdown(); + shutdown(false); auto inner_query = initInnerQuery(new_select_query->as(), local_context); @@ -1586,7 +1586,7 @@ void StorageWindowView::startup() fire_task->schedule(); } -void StorageWindowView::shutdown() +void StorageWindowView::shutdown(bool) { shutdown_called = true; diff --git a/src/Storages/WindowView/StorageWindowView.h b/src/Storages/WindowView/StorageWindowView.h index 231616ff820..de8f880c602 100644 --- a/src/Storages/WindowView/StorageWindowView.h +++ b/src/Storages/WindowView/StorageWindowView.h @@ -142,7 +142,7 @@ public: void checkAlterIsPossible(const AlterCommands & commands, ContextPtr context) const override; void startup() override; - void shutdown() override; + void shutdown(bool is_drop) override; void read( QueryPlan & query_plan, diff --git a/tests/integration/test_storage_s3_queue/test.py b/tests/integration/test_storage_s3_queue/test.py index a4f6739046b..9f41cfd176d 100644 --- a/tests/integration/test_storage_s3_queue/test.py +++ b/tests/integration/test_storage_s3_queue/test.py @@ -164,7 +164,6 @@ def generate_random_files( values_csv = ( "\n".join((",".join(map(str, row)) for row in rand_values)) + "\n" ).encode() - print(f"File {filename}, content: {rand_values}") put_s3_file_content(started_cluster, filename, values_csv) return total_values @@ -889,3 +888,33 @@ def test_max_set_size(started_cluster): time.sleep(10) res1 = [list(map(int, l.split())) for l in run_query(node, get_query).splitlines()] assert res1 == [total_values[1]] + + +def test_drop_table(started_cluster): + node = started_cluster.instances["instance"] + table_name = f"test_drop" + dst_table_name = f"{table_name}_dst" + keeper_path = f"/clickhouse/test_{table_name}" + files_path = f"{table_name}_data" + files_to_generate = 300 + + create_table( + started_cluster, + node, + table_name, + "unordered", + files_path, + additional_settings={ + "keeper_path": keeper_path, + "s3queue_processing_threads_num": 5, + }, + ) + total_values = generate_random_files( + started_cluster, files_path, files_to_generate, start_ind=0, row_num=100000 + ) + create_mv(node, table_name, dst_table_name) + node.wait_for_log_line(f"Reading from file: test_drop_data") + node.query(f"DROP TABLE {table_name} SYNC") + assert node.contains_in_log( + f"StorageS3Queue ({table_name}): Table is being dropped" + )