Merge pull request #56377 from ClickHouse/minor-improvements-for-s3-queue

Minor improvements for S3Queue
Kseniia Sumarokova 2023-11-10 11:28:09 +01:00 committed by GitHub
commit e48df94772
35 changed files with 176 additions and 89 deletions


@@ -267,7 +267,7 @@ BlockIO InterpreterDropQuery::executeToTableImpl(ContextPtr context_, ASTDropQue
     bool check_loading_deps = !check_ref_deps && getContext()->getSettingsRef().check_table_dependencies;
     DatabaseCatalog::instance().checkTableCanBeRemovedOrRenamed(table_id, check_ref_deps, check_loading_deps, is_drop_or_detach_database);

-    table->flushAndShutdown();
+    table->flushAndShutdown(true);

     TableExclusiveLockHolder table_lock;
     if (database->getUUID() == UUIDHelpers::Nil)


@@ -382,7 +382,7 @@ void StorageFileLog::startup()
         task->holder->activateAndSchedule();
 }

-void StorageFileLog::shutdown()
+void StorageFileLog::shutdown(bool)
 {
     if (task)
     {


@@ -47,7 +47,7 @@ public:
     bool noPushingToViews() const override { return true; }

     void startup() override;
-    void shutdown() override;
+    void shutdown(bool is_drop) override;

     Pipe read(
         const Names & column_names,


@@ -564,10 +564,10 @@ public:
      * @see shutdown()
      * @see flushAndPrepareForShutdown()
      */
-    void flushAndShutdown()
+    void flushAndShutdown(bool is_drop = false)
     {
         flushAndPrepareForShutdown();
-        shutdown();
+        shutdown(is_drop);
     }

     /** If the table have to do some complicated work when destroying an object - do it in advance.
@@ -575,7 +575,7 @@ public:
      * By default, does nothing.
      * Can be called simultaneously from different threads, even after a call to drop().
      */
-    virtual void shutdown() {}
+    virtual void shutdown(bool is_drop = false) { UNUSED(is_drop); } // NOLINT

     /// Called before shutdown() to flush data to underlying storage
     /// Data in memory need to be persistent
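With this change a storage can distinguish DROP from DETACH at shutdown time: InterpreterDropQuery calls flushAndShutdown(true), while destructors and DETACH paths keep passing false, so implementations that ignore the flag behave exactly as before. A minimal sketch of an override (the class and member names below are illustrative, not part of this commit):

    class HypotheticalQueueStorage : public IStorage
    {
    public:
        void shutdown(bool is_drop) override
        {
            stopBackgroundTasks();            /// always: DETACH and DROP alike
            if (is_drop)
                removeSharedKeeperMetadata(); /// only when the table is going away for good
        }
    };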


@@ -435,7 +435,7 @@ void StorageKafka::startup()
 }

-void StorageKafka::shutdown()
+void StorageKafka::shutdown(bool)
 {
     for (auto & task : tasks)
     {


@@ -50,7 +50,7 @@ public:
     bool noPushingToViews() const override { return true; }

     void startup() override;
-    void shutdown() override;
+    void shutdown(bool is_drop) override;

     Pipe read(
         const Names & column_names,


@@ -253,7 +253,7 @@ StorageLiveView::StorageLiveView(
 StorageLiveView::~StorageLiveView()
 {
-    shutdown();
+    shutdown(false);
 }

 NamesAndTypesList StorageLiveView::getVirtuals() const
@@ -289,7 +289,7 @@ void StorageLiveView::startup()
     periodic_refresh_task->activate();
 }

-void StorageLiveView::shutdown()
+void StorageLiveView::shutdown(bool)
 {
     shutdown_called = true;


@@ -81,7 +81,7 @@ public:
     void startup() override;
-    void shutdown() override;
+    void shutdown(bool is_drop) override;

     Pipe read(
         const Names & column_names,


@@ -419,7 +419,7 @@ void StorageNATS::startup()
 }

-void StorageNATS::shutdown()
+void StorageNATS::shutdown(bool /* is_drop */)
 {
     shutdown_called = true;


@@ -31,7 +31,7 @@ public:
     bool noPushingToViews() const override { return true; }

     void startup() override;
-    void shutdown() override;
+    void shutdown(bool is_drop) override;

     /// This is a bad way to let storage know in shutdown() that table is going to be dropped. There are some actions which need
     /// to be done only when table is dropped (not when detached). Also connection must be closed only in shutdown, but those
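For context, the workaround this comment describes looks roughly like the following (a hypothetical sketch, not code from this commit): drop() raises a flag that shutdown() later consults, because broker-side cleanup needs the connection, and the connection is only closed in shutdown().

    /// Hypothetical illustration of the flag-based workaround described above.
    std::atomic<bool> drop_table = false;

    void drop() override { drop_table = true; }

    void shutdown(bool /* is_drop */) override
    {
        if (drop_table)
            cleanupBrokerState(); /// delete queues/consumers; wanted on DROP only
        closeConnection();        /// always, for DETACH and DROP alike
    }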


@@ -228,7 +228,7 @@ void StorageMaterializedPostgreSQL::set(StoragePtr nested_storage)
 }

-void StorageMaterializedPostgreSQL::shutdown()
+void StorageMaterializedPostgreSQL::shutdown(bool)
 {
     if (replication_handler)
         replication_handler->shutdown();


@@ -81,7 +81,7 @@ public:
     String getName() const override { return "MaterializedPostgreSQL"; }

-    void shutdown() override;
+    void shutdown(bool is_drop) override;

     /// Used only for single MaterializedPostgreSQL storage.
     void dropInnerTableIfAny(bool sync, ContextPtr local_context) override;


@@ -801,7 +801,7 @@ void StorageRabbitMQ::startup()
 }

-void StorageRabbitMQ::shutdown()
+void StorageRabbitMQ::shutdown(bool)
 {
     shutdown_called = true;


@@ -34,7 +34,7 @@ public:
     bool noPushingToViews() const override { return true; }

     void startup() override;
-    void shutdown() override;
+    void shutdown(bool is_drop) override;

     /// This is a bad way to let storage know in shutdown() that table is going to be dropped. There are some actions which need
     /// to be done only when table is dropped (not when detached). Also connection must be closed only in shutdown, but those


@@ -197,8 +197,7 @@ S3QueueFilesMetadata::NodeMetadata S3QueueFilesMetadata::createNodeMetadata(
     return metadata;
 }

-std::pair<S3QueueFilesMetadata::ProcessingNodeHolderPtr,
-          S3QueueFilesMetadata::FileStatusPtr> S3QueueFilesMetadata::trySetFileAsProcessing(const std::string & path)
+S3QueueFilesMetadata::ProcessingNodeHolderPtr S3QueueFilesMetadata::trySetFileAsProcessing(const std::string & path)
 {
     auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::S3QueueSetFileProcessingMicroseconds);
     auto file_status = local_file_statuses.get(path, /* create */true);
@@ -261,12 +260,12 @@ std::pair<S3QueueFilesMetadata::ProcessingNodeHolderPtr,
     {
         case S3QueueMode::ORDERED:
         {
-            std::tie(result, processing_node_holder) = trySetFileAsProcessingForOrderedMode(path);
+            std::tie(result, processing_node_holder) = trySetFileAsProcessingForOrderedMode(path, file_status);
             break;
         }
         case S3QueueMode::UNORDERED:
         {
-            std::tie(result, processing_node_holder) = trySetFileAsProcessingForUnorderedMode(path);
+            std::tie(result, processing_node_holder) = trySetFileAsProcessingForUnorderedMode(path, file_status);
             break;
         }
     }
@@ -307,13 +306,13 @@ std::pair<S3QueueFilesMetadata::ProcessingNodeHolderPtr,
     }

     if (result == SetFileProcessingResult::Success)
-        return std::pair(processing_node_holder, file_status);
+        return processing_node_holder;

     return {};
 }

 std::pair<S3QueueFilesMetadata::SetFileProcessingResult,
-          S3QueueFilesMetadata::ProcessingNodeHolderPtr> S3QueueFilesMetadata::trySetFileAsProcessingForUnorderedMode(const std::string & path)
+          S3QueueFilesMetadata::ProcessingNodeHolderPtr> S3QueueFilesMetadata::trySetFileAsProcessingForUnorderedMode(const std::string & path, const FileStatusPtr & file_status)
 {
     /// In one zookeeper transaction do the following:
     /// 1. check that corresponding persistent nodes do not exist in processed/ and failed/;
@@ -340,7 +339,7 @@ std::pair<S3QueueFilesMetadata::SetFileProcessingResult,
     if (code == Coordination::Error::ZOK)
     {
-        auto holder = std::make_unique<ProcessingNodeHolder>(node_metadata.processing_id, path, zookeeper_processing_path / node_name, zk_client);
+        auto holder = std::make_unique<ProcessingNodeHolder>(node_metadata.processing_id, path, zookeeper_processing_path / node_name, file_status, zk_client);
         return std::pair{SetFileProcessingResult::Success, std::move(holder)};
     }
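The claim step above is a single ZooKeeper multi-request. A simplified sketch of its shape, assuming the zkutil helpers already used in this file (not the verbatim transaction from the commit):

    Coordination::Requests requests;
    /// Assert the file is in neither processed/ nor failed/: a create of an
    /// existing node fails the whole multi-op with ZNODEEXISTS, and the
    /// paired remove keeps the check side-effect free on success.
    requests.push_back(zkutil::makeCreateRequest(zookeeper_processed_path / node_name, "", zkutil::CreateMode::Persistent));
    requests.push_back(zkutil::makeRemoveRequest(zookeeper_processed_path / node_name, -1));
    requests.push_back(zkutil::makeCreateRequest(zookeeper_failed_path / node_name, "", zkutil::CreateMode::Persistent));
    requests.push_back(zkutil::makeRemoveRequest(zookeeper_failed_path / node_name, -1));
    /// Claim the file with an ephemeral node in processing/.
    requests.push_back(zkutil::makeCreateRequest(zookeeper_processing_path / node_name, node_metadata.toString(), zkutil::CreateMode::Ephemeral));

    Coordination::Responses responses;
    auto code = zk_client->tryMulti(requests, responses);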
@@ -363,7 +362,7 @@ std::pair<S3QueueFilesMetadata::SetFileProcessingResult,
 }

 std::pair<S3QueueFilesMetadata::SetFileProcessingResult,
-          S3QueueFilesMetadata::ProcessingNodeHolderPtr> S3QueueFilesMetadata::trySetFileAsProcessingForOrderedMode(const std::string & path)
+          S3QueueFilesMetadata::ProcessingNodeHolderPtr> S3QueueFilesMetadata::trySetFileAsProcessingForOrderedMode(const std::string & path, const FileStatusPtr & file_status)
 {
     /// Same as for Unordered mode.
     /// The only difference is the check if the file is already processed.
@@ -394,7 +393,10 @@ std::pair<S3QueueFilesMetadata::SetFileProcessingResult,
         auto max_processed_file_path = processed_node_metadata.file_path;
         if (!max_processed_file_path.empty() && path <= max_processed_file_path)
+        {
+            LOG_TEST(log, "File {} is already processed, max processed file: {}", path, max_processed_file_path);
             return std::pair{SetFileProcessingResult::AlreadyProcessed, nullptr};
+        }

         Coordination::Requests requests;
         requests.push_back(zkutil::makeCreateRequest(zookeeper_failed_path / node_name, "", zkutil::CreateMode::Persistent));
@@ -407,7 +409,7 @@ std::pair<S3QueueFilesMetadata::SetFileProcessingResult,
         auto code = zk_client->tryMulti(requests, responses);
         if (code == Coordination::Error::ZOK)
         {
-            auto holder = std::make_unique<ProcessingNodeHolder>(node_metadata.processing_id, path, zookeeper_processing_path / node_name, zk_client);
+            auto holder = std::make_unique<ProcessingNodeHolder>(node_metadata.processing_id, path, zookeeper_processing_path / node_name, file_status, zk_client);
             return std::pair{SetFileProcessingResult::Success, std::move(holder)};
         }
@@ -423,7 +425,7 @@ std::pair<S3QueueFilesMetadata::SetFileProcessingResult,
         }
         else
         {
-            LOG_TEST(log, "Version of max processed file changed. Retring the check for file `{}`", path);
+            LOG_TEST(log, "Version of max processed file changed. Retrying the check for file `{}`", path);
         }
     }
 }
@@ -431,9 +433,7 @@ std::pair<S3QueueFilesMetadata::SetFileProcessingResult,
 void S3QueueFilesMetadata::setFileProcessed(ProcessingNodeHolderPtr holder)
 {
     auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::S3QueueSetFileProcessedMicroseconds);
-    const auto & path = holder->path;
-    auto file_status = local_file_statuses.get(path, /* create */false);
+    auto file_status = holder->getFileStatus();
     {
         std::lock_guard lock(file_status->metadata_lock);
         file_status->state = FileStatus::State::Processed;
@@ -559,7 +559,7 @@ void S3QueueFilesMetadata::setFileFailed(ProcessingNodeHolderPtr holder, const S
     auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::S3QueueSetFileFailedMicroseconds);
     const auto & path = holder->path;
-    auto file_status = local_file_statuses.get(path, /* create */false);
+    auto file_status = holder->getFileStatus();
     {
         std::lock_guard lock(file_status->metadata_lock);
         file_status->state = FileStatus::State::Failed;
@@ -682,8 +682,10 @@ S3QueueFilesMetadata::ProcessingNodeHolder::ProcessingNodeHolder(
     const std::string & processing_id_,
     const std::string & path_,
     const std::string & zk_node_path_,
+    FileStatusPtr file_status_,
     zkutil::ZooKeeperPtr zk_client_)
     : zk_client(zk_client_)
+    , file_status(file_status_)
     , path(path_)
     , zk_node_path(zk_node_path_)
     , processing_id(processing_id_)
@@ -790,7 +792,18 @@ void S3QueueFilesMetadata::cleanupThreadFuncImpl()
     const bool check_nodes_ttl = max_set_age_sec > 0;

     const auto zk_client = getZooKeeper();
-    auto nodes = zk_client->getChildren(zookeeper_processed_path);
+    Strings nodes;
+    auto code = zk_client->tryGetChildren(zookeeper_processed_path, nodes);
+    if (code != Coordination::Error::ZOK)
+    {
+        if (code == Coordination::Error::ZNONODE)
+        {
+            LOG_TEST(log, "A `processed` node is not yet created");
+            return;
+        }
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected error: {}", magic_enum::enum_name(code));
+    }
+
     if (nodes.empty())
     {
         LOG_TEST(log, "A set of nodes is empty");
@@ -869,7 +882,7 @@ void S3QueueFilesMetadata::cleanupThreadFuncImpl()
         local_file_statuses.remove(node.metadata.file_path, /* if_exists */true);

-        auto code = zk_client->tryRemove(path);
+        code = zk_client->tryRemove(path);
         if (code == Coordination::Error::ZOK)
             --nodes_to_remove;
         else
@@ -886,7 +899,7 @@ void S3QueueFilesMetadata::cleanupThreadFuncImpl()
             local_file_statuses.remove(node.metadata.file_path, /* if_exists */true);

-            auto code = zk_client->tryRemove(path);
+            code = zk_client->tryRemove(path);
             if (code != Coordination::Error::ZOK)
                 LOG_ERROR(log, "Failed to remove a node `{}` (code: {})", path.string(), code);
         }


@@ -70,7 +70,7 @@ public:
     using FileStatuses = std::unordered_map<std::string, FileStatusPtr>;

     /// Set file as processing, if it is not already processed, failed or processing.
-    std::pair<ProcessingNodeHolderPtr, FileStatusPtr> trySetFileAsProcessing(const std::string & path);
+    ProcessingNodeHolderPtr trySetFileAsProcessing(const std::string & path);

     FileStatusPtr getFileStatus(const std::string & path);
@@ -112,8 +112,8 @@ private:
         AlreadyProcessed,
         AlreadyFailed,
     };
-    std::pair<SetFileProcessingResult, ProcessingNodeHolderPtr> trySetFileAsProcessingForOrderedMode(const std::string & path);
-    std::pair<SetFileProcessingResult, ProcessingNodeHolderPtr> trySetFileAsProcessingForUnorderedMode(const std::string & path);
+    std::pair<SetFileProcessingResult, ProcessingNodeHolderPtr> trySetFileAsProcessingForOrderedMode(const std::string & path, const FileStatusPtr & file_status);
+    std::pair<SetFileProcessingResult, ProcessingNodeHolderPtr> trySetFileAsProcessingForUnorderedMode(const std::string & path, const FileStatusPtr & file_status);

     struct NodeMetadata
     {
@@ -153,14 +153,18 @@ public:
         const std::string & processing_id_,
         const std::string & path_,
         const std::string & zk_node_path_,
+        FileStatusPtr file_status_,
         zkutil::ZooKeeperPtr zk_client_);

     ~ProcessingNodeHolder();

+    FileStatusPtr getFileStatus() { return file_status; }
+
 private:
     bool remove(Coordination::Requests * requests = nullptr, Coordination::Responses * responses = nullptr);

     zkutil::ZooKeeperPtr zk_client;
+    FileStatusPtr file_status;
     std::string path;
     std::string zk_node_path;
     std::string processing_id;
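With the FileStatusPtr carried by the holder, callers no longer need a second lookup by path. Sketched usage (simplified, using names from the surrounding diff):

    if (auto holder = metadata->trySetFileAsProcessing(path))
    {
        auto file_status = holder->getFileStatus(); /// created together with the holder
        /// ... process the file ...
        metadata->setFileProcessed(holder);         /// or setFileFailed(holder, reason)
    }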


@@ -33,11 +33,9 @@ namespace ErrorCodes
 StorageS3QueueSource::S3QueueKeyWithInfo::S3QueueKeyWithInfo(
         const std::string & key_,
         std::optional<S3::ObjectInfo> info_,
-        Metadata::ProcessingNodeHolderPtr processing_holder_,
-        FileStatusPtr file_status_)
+        Metadata::ProcessingNodeHolderPtr processing_holder_)
     : StorageS3Source::KeyWithInfo(key_, info_)
     , processing_holder(processing_holder_)
-    , file_status(file_status_)
 {
 }
@@ -57,13 +55,19 @@ StorageS3QueueSource::KeyWithInfoPtr StorageS3QueueSource::FileIterator::next()
     {
         KeyWithInfoPtr val = glob_iterator->next();

-        if (!val || shutdown_called)
+        if (!val)
             return {};

-        if (auto [processing_holder, processing_file_status] = metadata->trySetFileAsProcessing(val->key);
+        if (shutdown_called)
+        {
+            LOG_TEST(&Poco::Logger::get("StorageS3QueueSource"), "Shutdown was called, stopping file iterator");
+            return {};
+        }
+
+        if (auto processing_holder = metadata->trySetFileAsProcessing(val->key);
             processing_holder && !shutdown_called)
         {
-            return std::make_shared<S3QueueKeyWithInfo>(val->key, val->info, processing_holder, processing_file_status);
+            return std::make_shared<S3QueueKeyWithInfo>(val->key, val->info, processing_holder);
         }
     }
     return {};
@@ -84,8 +88,10 @@ StorageS3QueueSource::StorageS3QueueSource(
     const NamesAndTypesList & requested_virtual_columns_,
     ContextPtr context_,
     const std::atomic<bool> & shutdown_called_,
+    const std::atomic<bool> & table_is_being_dropped_,
     std::shared_ptr<S3QueueLog> s3_queue_log_,
-    const StorageID & storage_id_)
+    const StorageID & storage_id_,
+    Poco::Logger * log_)
     : ISource(header_)
     , WithContext(context_)
     , name(std::move(name_))
@@ -94,10 +100,11 @@ StorageS3QueueSource::StorageS3QueueSource(
     , internal_source(std::move(internal_source_))
     , requested_virtual_columns(requested_virtual_columns_)
     , shutdown_called(shutdown_called_)
+    , table_is_being_dropped(table_is_being_dropped_)
     , s3_queue_log(s3_queue_log_)
     , storage_id(storage_id_)
     , remove_file_func(remove_file_func_)
-    , log(&Poco::Logger::get("StorageS3QueueSource"))
+    , log(log_)
 {
 }
@@ -132,29 +139,60 @@ Chunk StorageS3QueueSource::generate()
         if (!reader)
             break;

+        const auto * key_with_info = dynamic_cast<const S3QueueKeyWithInfo *>(&reader.getKeyWithInfo());
+        auto file_status = key_with_info->processing_holder->getFileStatus();
+
         if (isCancelled())
         {
             reader->cancel();
+
+            if (processed_rows_from_file)
+            {
+                try
+                {
+                    files_metadata->setFileFailed(key_with_info->processing_holder, "Cancelled");
+                }
+                catch (...)
+                {
+                    tryLogCurrentException(__PRETTY_FUNCTION__);
+                }
+
+                appendLogElement(reader.getFile(), *file_status, processed_rows_from_file, false);
+            }
+
             break;
         }

         if (shutdown_called)
         {
-            if (processed_rows_from_file)
-            {
-                /// We could delay shutdown until files, which already started processing before the shutdown, finished.
-                /// But if files are big and `s3queue_processing_threads_num` is not small, it can take a significant time.
-                /// Anyway we cannot do anything in case of SIGTERM, so destination table must anyway support deduplication,
-                /// so here we will rely on it here as well.
-                LOG_WARNING(
-                    log, "Shutdown called, {} rows are already processed, but file is not fully processed",
-                    processed_rows_from_file);
-            }
-            break;
-        }
+            if (processed_rows_from_file == 0)
+                break;

-        const auto * key_with_info = dynamic_cast<const S3QueueKeyWithInfo *>(&reader.getKeyWithInfo());
-        auto file_status = key_with_info->file_status;
+            if (table_is_being_dropped)
+            {
+                LOG_DEBUG(
+                    log, "Table is being dropped, {} rows are already processed from {}, but file is not fully processed",
+                    processed_rows_from_file, reader.getFile());
+
+                try
+                {
+                    files_metadata->setFileFailed(key_with_info->processing_holder, "Table is dropped");
+                }
+                catch (...)
+                {
+                    tryLogCurrentException(__PRETTY_FUNCTION__);
+                }
+
+                appendLogElement(reader.getFile(), *file_status, processed_rows_from_file, false);
+
+                /// Leave the file half processed. Table is being dropped, so we do not care.
+                break;
+            }
+
+            LOG_DEBUG(log, "Shutdown called, but file {} is partially processed ({} rows). "
+                      "Will process the file fully and then shutdown",
+                      reader.getFile(), processed_rows_from_file);
+        }

         auto * prev_scope = CurrentThread::get().attachProfileCountersScope(&file_status->profile_counters);
         SCOPE_EXIT({ CurrentThread::get().attachProfileCountersScope(prev_scope); });
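In sketch form, the new shutdown handling in generate() reduces to the following (a simplified restatement of the diff above, not verbatim code):

    if (shutdown_called)
    {
        if (processed_rows_from_file == 0)
            break; /// nothing read from the current file yet, stop cleanly

        if (table_is_being_dropped)
        {
            /// DROP: mark the file as failed and abandon it half processed;
            /// the queue metadata is going away together with the table.
            files_metadata->setFileFailed(key_with_info->processing_holder, "Table is dropped");
            break;
        }

        /// DETACH or server shutdown: finish the current file first,
        /// so it is not left half processed.
    }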


@@ -30,11 +30,9 @@ public:
         S3QueueKeyWithInfo(
             const std::string & key_,
             std::optional<S3::ObjectInfo> info_,
-            Metadata::ProcessingNodeHolderPtr processing_holder_,
-            FileStatusPtr file_status_);
+            Metadata::ProcessingNodeHolderPtr processing_holder_);

         Metadata::ProcessingNodeHolderPtr processing_holder;
-        FileStatusPtr file_status;
     };

     class FileIterator : public IIterator
@@ -66,8 +64,10 @@ public:
         const NamesAndTypesList & requested_virtual_columns_,
         ContextPtr context_,
         const std::atomic<bool> & shutdown_called_,
+        const std::atomic<bool> & table_is_being_dropped_,
         std::shared_ptr<S3QueueLog> s3_queue_log_,
-        const StorageID & storage_id_);
+        const StorageID & storage_id_,
+        Poco::Logger * log_);

     ~StorageS3QueueSource() override;

@@ -84,6 +84,7 @@ private:
     const std::shared_ptr<StorageS3Source> internal_source;
     const NamesAndTypesList requested_virtual_columns;
     const std::atomic<bool> & shutdown_called;
+    const std::atomic<bool> & table_is_being_dropped;
     const std::shared_ptr<S3QueueLog> s3_queue_log;
     const StorageID storage_id;


@@ -161,8 +161,9 @@ void StorageS3Queue::startup()
         task->activateAndSchedule();
 }

-void StorageS3Queue::shutdown()
+void StorageS3Queue::shutdown(bool is_drop)
 {
+    table_is_being_dropped = is_drop;
     shutdown_called = true;

     if (task)
@@ -257,7 +258,7 @@ std::shared_ptr<StorageS3QueueSource> StorageS3Queue::createSource(
     return std::make_shared<StorageS3QueueSource>(
         getName(), read_from_format_info.source_header, std::move(internal_source),
         files_metadata, after_processing, file_deleter, read_from_format_info.requested_virtual_columns,
-        local_context, shutdown_called, s3_queue_log, getStorageID());
+        local_context, shutdown_called, table_is_being_dropped, s3_queue_log, getStorageID(), log);
 }

 bool StorageS3Queue::hasDependencies(const StorageID & table_id)


@@ -73,10 +73,11 @@ private:
     std::atomic<bool> mv_attached = false;
     std::atomic<bool> shutdown_called = false;
+    std::atomic<bool> table_is_being_dropped = false;
     Poco::Logger * log;

     void startup() override;
-    void shutdown() override;
+    void shutdown(bool is_drop) override;
     void drop() override;
     bool supportsSubsetOfColumns(const ContextPtr & context_) const;
     bool supportsSubcolumns() const override { return true; }


@@ -170,7 +170,7 @@ std::shared_ptr<const IDictionary> StorageDictionary::getDictionary() const
     return getContext()->getExternalDictionariesLoader().getDictionary(registered_dictionary_name, getContext());
 }

-void StorageDictionary::shutdown()
+void StorageDictionary::shutdown(bool)
 {
     removeDictionaryConfigurationFromRepository();
 }


@@ -83,7 +83,7 @@ public:
     static NamesAndTypesList getNamesAndTypes(const DictionaryStructure & dictionary_structure);

     bool isDictionary() const override { return true; }
-    void shutdown() override;
+    void shutdown(bool is_drop) override;
     void startup() override;

     void renameInMemory(const StorageID & new_table_id) override;


@@ -1248,7 +1248,7 @@ void StorageDistributed::initializeFromDisk()
 }

-void StorageDistributed::shutdown()
+void StorageDistributed::shutdown(bool)
 {
     async_insert_blocker.cancelForever();
@@ -1269,7 +1269,7 @@ void StorageDistributed::drop()
     // And second time shutdown() should be fast, since none of
     // DirectoryMonitor should not do anything, because ActionBlocker is
     // canceled (in shutdown()).
-    shutdown();
+    shutdown(true);

     // Distributed table without sharding_key does not allows INSERTs
     if (relative_data_path.empty())


@@ -137,7 +137,7 @@ public:
     void alter(const AlterCommands & params, ContextPtr context, AlterLockHolder & table_lock_holder) override;

     void initializeFromDisk();
-    void shutdown() override;
+    void shutdown(bool is_drop) override;
     void flushAndPrepareForShutdown() override;
     void drop() override;


@@ -398,7 +398,7 @@ void StorageMaterializedView::startup()
         DatabaseCatalog::instance().addViewDependency(select_query.select_table_id, getStorageID());
 }

-void StorageMaterializedView::shutdown()
+void StorageMaterializedView::shutdown(bool)
 {
     auto metadata_snapshot = getInMemoryMetadataPtr();
     const auto & select_query = metadata_snapshot->getSelectQuery();


@@ -72,7 +72,7 @@ public:
     void renameInMemory(const StorageID & new_table_id) override;

     void startup() override;
-    void shutdown() override;
+    void shutdown(bool is_drop) override;

     QueryProcessingStage::Enum
     getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageSnapshotPtr &, SelectQueryInfo &) const override;


@@ -158,7 +158,7 @@ void StorageMergeTree::startup()
         /// It means that failed "startup" must not create any background tasks that we will have to wait.
         try
         {
-            shutdown();
+            shutdown(false);
         }
         catch (...)
         {
@@ -170,7 +170,7 @@ void StorageMergeTree::startup()
     }
 }

-void StorageMergeTree::shutdown()
+void StorageMergeTree::shutdown(bool)
 {
     if (shutdown_called.exchange(true))
         return;
@@ -196,7 +196,7 @@ void StorageMergeTree::shutdown()

 StorageMergeTree::~StorageMergeTree()
 {
-    shutdown();
+    shutdown(false);
 }

 void StorageMergeTree::read(
@@ -290,7 +290,7 @@ void StorageMergeTree::checkTableCanBeDropped([[ maybe_unused ]] ContextPtr quer
 void StorageMergeTree::drop()
 {
-    shutdown();
+    shutdown(true);
     /// In case there is read-only disk we cannot allow to call dropAllData(), but dropping tables is allowed.
     if (isStaticStorage())
         return;


@@ -45,7 +45,7 @@ public:
         bool has_force_restore_data_flag);

     void startup() override;
-    void shutdown() override;
+    void shutdown(bool is_drop) override;

     ~StorageMergeTree() override;


@@ -138,7 +138,7 @@ public:
     CancellationCode killMutation(const String & mutation_id) override { return getNested()->killMutation(mutation_id); }

     void startup() override { getNested()->startup(); }
-    void shutdown() override { getNested()->shutdown(); }
+    void shutdown(bool is_drop) override { getNested()->shutdown(is_drop); }
     void flushAndPrepareForShutdown() override { getNested()->flushAndPrepareForShutdown(); }

     ActionLock getActionLock(StorageActionBlockType action_type) override { return getNested()->getActionLock(action_type); }


@@ -4995,7 +4995,7 @@ void StorageReplicatedMergeTree::startupImpl(bool from_attach_thread)
         }
         else
         {
-            shutdown();
+            shutdown(false);
         }
     }
     catch (...)
@@ -5070,7 +5070,7 @@ void StorageReplicatedMergeTree::partialShutdown()
     LOG_TRACE(log, "Threads finished");
 }

-void StorageReplicatedMergeTree::shutdown()
+void StorageReplicatedMergeTree::shutdown(bool)
 {
     if (shutdown_called.exchange(true))
         return;
@@ -5124,7 +5124,7 @@ StorageReplicatedMergeTree::~StorageReplicatedMergeTree()
 {
     try
     {
-        shutdown();
+        shutdown(false);
     }
     catch (...)
     {


@@ -139,7 +139,7 @@ public:
     /// In shutdown we completely terminate table -- remove
     /// is_active node and interserver handler. Also optionally
     /// wait until other replicas will download some parts from our replica.
-    void shutdown() override;
+    void shutdown(bool is_drop) override;

     ~StorageReplicatedMergeTree() override;


@@ -72,11 +72,11 @@ public:
     }

     void startup() override { }
-    void shutdown() override
+    void shutdown(bool is_drop) override
     {
         std::lock_guard lock{nested_mutex};
         if (nested)
-            nested->shutdown();
+            nested->shutdown(is_drop);
     }

     void flushAndPrepareForShutdown() override


@@ -461,7 +461,7 @@ void StorageWindowView::alter(
         modifying_query = false;
     });

-    shutdown();
+    shutdown(false);

     auto inner_query = initInnerQuery(new_select_query->as<ASTSelectQuery &>(), local_context);
@@ -1586,7 +1586,7 @@ void StorageWindowView::startup()
     fire_task->schedule();
 }

-void StorageWindowView::shutdown()
+void StorageWindowView::shutdown(bool)
 {
     shutdown_called = true;


@@ -142,7 +142,7 @@ public:
     void checkAlterIsPossible(const AlterCommands & commands, ContextPtr context) const override;

     void startup() override;
-    void shutdown() override;
+    void shutdown(bool is_drop) override;

     void read(
         QueryPlan & query_plan,


@@ -164,7 +164,6 @@ def generate_random_files(
     values_csv = (
         "\n".join((",".join(map(str, row)) for row in rand_values)) + "\n"
     ).encode()
-    print(f"File {filename}, content: {rand_values}")
     put_s3_file_content(started_cluster, filename, values_csv)
     return total_values

@@ -889,3 +888,33 @@ def test_max_set_size(started_cluster):
     time.sleep(10)
     res1 = [list(map(int, l.split())) for l in run_query(node, get_query).splitlines()]
     assert res1 == [total_values[1]]
+
+
+def test_drop_table(started_cluster):
+    node = started_cluster.instances["instance"]
+    table_name = f"test_drop"
+    dst_table_name = f"{table_name}_dst"
+    keeper_path = f"/clickhouse/test_{table_name}"
+    files_path = f"{table_name}_data"
+    files_to_generate = 300
+
+    create_table(
+        started_cluster,
+        node,
+        table_name,
+        "unordered",
+        files_path,
+        additional_settings={
+            "keeper_path": keeper_path,
+            "s3queue_processing_threads_num": 5,
+        },
+    )
+    total_values = generate_random_files(
+        started_cluster, files_path, files_to_generate, start_ind=0, row_num=100000
+    )
+    create_mv(node, table_name, dst_table_name)
+    node.wait_for_log_line(f"Reading from file: test_drop_data")
+    node.query(f"DROP TABLE {table_name} SYNC")
+    assert node.contains_in_log(
+        f"StorageS3Queue ({table_name}): Table is being dropped"
+    )