Better system queue log management

This commit is contained in:
kssenii 2024-06-25 12:40:09 +02:00
parent 5447145c7a
commit 304f8c7cff
4 changed files with 30 additions and 13 deletions

View File

@ -23,8 +23,12 @@ void ObjectStorageQueueSettings::loadFromQuery(ASTStorage & storage_def)
{
/// We support settings starting with s3_ for compatibility.
for (auto & change : storage_def.settings->changes)
{
if (change.name.starts_with("s3queue_"))
change.name = change.name.substr(std::strlen("s3queue_"));
if (change.name == "enable_logging_to_s3queue_log")
change.name = "enable_logging_to_queue_log";
}
applyChanges(storage_def.settings->changes);
}

View File

@ -21,8 +21,7 @@ class ASTStorage;
M(String, keeper_path, "", "Zookeeper node path", 0) \
M(UInt32, loading_retries, 0, "Retry loading up to specified number of times", 0) \
M(UInt32, processing_threads_num, 1, "Number of processing threads", 0) \
M(UInt32, enable_logging_to_s3queue_log, 1, "Enable logging to system table system.s3queue_log", 0) \
M(UInt32, enable_logging_to_azure_queue_log, 1, "Enable logging to system table system.s3queue_log", 0) \
M(UInt32, enable_logging_to_queue_log, 1, "Enable logging to system table system.(s3/azure_)queue_log", 0) \
M(String, last_processed_path, "", "For Ordered mode. Files that have lexicographically smaller file name are considered already processed", 0) \
M(UInt32, tracked_file_ttl_sec, 0, "Maximum number of seconds to store processed files in ZooKeeper node (store forever by default)", 0) \
M(UInt32, polling_min_timeout_ms, 1000, "Minimal timeout before next polling", 0) \

View File

@ -60,7 +60,7 @@ namespace
return zkutil::extractZooKeeperPath(result_zk_path, true);
}
void checkAndAdjustSettings(ObjectStorageQueueSettings & queue_settings, const Settings & settings, bool is_attach)
void checkAndAdjustSettings(ObjectStorageQueueSettings & queue_settings, bool is_attach)
{
if (!is_attach && !queue_settings.mode.changed)
{
@ -73,11 +73,6 @@ namespace
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Setting `processing_threads_num` cannot be set to zero");
}
if (!queue_settings.enable_logging_to_s3queue_log.changed)
{
queue_settings.enable_logging_to_s3queue_log = settings.s3queue_enable_logging_to_s3queue_log;
}
if (queue_settings.cleanup_interval_min_ms > queue_settings.cleanup_interval_max_ms)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
@ -85,6 +80,28 @@ namespace
queue_settings.cleanup_interval_min_ms, queue_settings.cleanup_interval_max_ms);
}
}
std::shared_ptr<ObjectStorageQueueLog> getQueueLog(const ObjectStoragePtr & storage, const ContextPtr & context, const ObjectStorageQueueSettings & table_settings)
{
const auto & settings = context->getSettingsRef();
switch (storage->getType())
{
case DB::ObjectStorageType::S3:
{
if (table_settings.enable_logging_to_queue_log || settings.s3queue_enable_logging_to_s3queue_log)
return context->getS3QueueLog();
return nullptr;
}
case DB::ObjectStorageType::Azure:
{
if (table_settings.enable_logging_to_queue_log)
return context->getAzureQueueLog();
return nullptr;
}
default:
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected object storage type: {}", storage->getType());
}
}
}
StorageObjectStorageQueue::StorageObjectStorageQueue(
@ -120,7 +137,7 @@ StorageObjectStorageQueue::StorageObjectStorageQueue(
throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "ObjectStorageQueue url must either end with '/' or contain globs");
}
checkAndAdjustSettings(*queue_settings, context_->getSettingsRef(), mode > LoadingStrictnessLevel::CREATE);
checkAndAdjustSettings(*queue_settings, mode > LoadingStrictnessLevel::CREATE);
object_storage = configuration->createObjectStorage(context_, /* is_readonly */true);
FormatFactory::instance().checkFormatName(configuration->format);
@ -332,9 +349,6 @@ std::shared_ptr<ObjectStorageQueueSource> StorageObjectStorageQueue::createSourc
{
object_storage->removeObject(StoredObject(path));
};
auto system_queue_log = queue_settings->enable_logging_to_s3queue_log
? local_context->getS3QueueLog()
: queue_settings->enable_logging_to_azure_queue_log ? local_context->getAzureQueueLog() : nullptr;
return std::make_shared<ObjectStorageQueueSource>(
getName(),
@ -348,7 +362,7 @@ std::shared_ptr<ObjectStorageQueueSource> StorageObjectStorageQueue::createSourc
local_context,
shutdown_called,
table_is_being_dropped,
system_queue_log,
getQueueLog(object_storage, local_context, *queue_settings),
getStorageID(),
log);
}