Quick fix for s3queue problem

This commit is contained in:
kssenii 2024-09-10 17:41:52 +02:00
parent a34a544f4a
commit d43264c44e

View File

@ -64,9 +64,7 @@ namespace
void checkAndAdjustSettings( void checkAndAdjustSettings(
ObjectStorageQueueSettings & queue_settings, ObjectStorageQueueSettings & queue_settings,
ASTStorage * engine_args, bool is_attach)
bool is_attach,
const LoggerPtr & log)
{ {
if (!is_attach && !queue_settings.mode.changed) if (!is_attach && !queue_settings.mode.changed)
{ {
@ -85,16 +83,6 @@ namespace
"Setting `cleanup_interval_min_ms` ({}) must be less or equal to `cleanup_interval_max_ms` ({})", "Setting `cleanup_interval_min_ms` ({}) must be less or equal to `cleanup_interval_max_ms` ({})",
queue_settings.cleanup_interval_min_ms, queue_settings.cleanup_interval_max_ms); queue_settings.cleanup_interval_min_ms, queue_settings.cleanup_interval_max_ms);
} }
if (!is_attach && !queue_settings.processing_threads_num.changed)
{
queue_settings.processing_threads_num = std::max<uint32_t>(getNumberOfPhysicalCPUCores(), 16);
engine_args->settings->as<ASTSetQuery>()->changes.insertSetting(
"processing_threads_num",
queue_settings.processing_threads_num.value);
LOG_TRACE(log, "Set `processing_threads_num` to {}", queue_settings.processing_threads_num);
}
} }
std::shared_ptr<ObjectStorageQueueLog> getQueueLog(const ObjectStoragePtr & storage, const ContextPtr & context, const ObjectStorageQueueSettings & table_settings) std::shared_ptr<ObjectStorageQueueLog> getQueueLog(const ObjectStoragePtr & storage, const ContextPtr & context, const ObjectStorageQueueSettings & table_settings)
@ -154,7 +142,7 @@ StorageObjectStorageQueue::StorageObjectStorageQueue(
throw Exception(ErrorCodes::BAD_QUERY_PARAMETER, "ObjectStorageQueue url must either end with '/' or contain globs"); throw Exception(ErrorCodes::BAD_QUERY_PARAMETER, "ObjectStorageQueue url must either end with '/' or contain globs");
} }
checkAndAdjustSettings(*queue_settings, engine_args, mode > LoadingStrictnessLevel::CREATE, log); checkAndAdjustSettings(*queue_settings, mode > LoadingStrictnessLevel::CREATE);
object_storage = configuration->createObjectStorage(context_, /* is_readonly */true); object_storage = configuration->createObjectStorage(context_, /* is_readonly */true);
FormatFactory::instance().checkFormatName(configuration->format); FormatFactory::instance().checkFormatName(configuration->format);