Merge pull request #69480 from ClickHouse/backport/24.8/69454

Backport #69454 to 24.8: Quick fix for s3queue problem
This commit is contained in:
Kseniia Sumarokova 2024-09-17 13:08:34 +02:00 committed by GitHub
commit b998263132
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -64,9 +64,7 @@ namespace
void checkAndAdjustSettings( void checkAndAdjustSettings(
ObjectStorageQueueSettings & queue_settings, ObjectStorageQueueSettings & queue_settings,
ASTStorage * engine_args, bool is_attach)
bool is_attach,
const LoggerPtr & log)
{ {
if (!is_attach && !queue_settings.mode.changed) if (!is_attach && !queue_settings.mode.changed)
{ {
@ -85,16 +83,6 @@ namespace
"Setting `cleanup_interval_min_ms` ({}) must be less or equal to `cleanup_interval_max_ms` ({})", "Setting `cleanup_interval_min_ms` ({}) must be less or equal to `cleanup_interval_max_ms` ({})",
queue_settings.cleanup_interval_min_ms, queue_settings.cleanup_interval_max_ms); queue_settings.cleanup_interval_min_ms, queue_settings.cleanup_interval_max_ms);
} }
if (!is_attach && !queue_settings.processing_threads_num.changed)
{
queue_settings.processing_threads_num = std::max<uint32_t>(getNumberOfPhysicalCPUCores(), 16);
engine_args->settings->as<ASTSetQuery>()->changes.insertSetting(
"processing_threads_num",
queue_settings.processing_threads_num.value);
LOG_TRACE(log, "Set `processing_threads_num` to {}", queue_settings.processing_threads_num);
}
} }
std::shared_ptr<ObjectStorageQueueLog> getQueueLog(const ObjectStoragePtr & storage, const ContextPtr & context, const ObjectStorageQueueSettings & table_settings) std::shared_ptr<ObjectStorageQueueLog> getQueueLog(const ObjectStoragePtr & storage, const ContextPtr & context, const ObjectStorageQueueSettings & table_settings)
@ -130,7 +118,7 @@ StorageObjectStorageQueue::StorageObjectStorageQueue(
const String & comment, const String & comment,
ContextPtr context_, ContextPtr context_,
std::optional<FormatSettings> format_settings_, std::optional<FormatSettings> format_settings_,
ASTStorage * engine_args, ASTStorage * /* engine_args */,
LoadingStrictnessLevel mode) LoadingStrictnessLevel mode)
: IStorage(table_id_) : IStorage(table_id_)
, WithContext(context_) , WithContext(context_)
@ -154,7 +142,7 @@ StorageObjectStorageQueue::StorageObjectStorageQueue(
throw Exception(ErrorCodes::BAD_QUERY_PARAMETER, "ObjectStorageQueue url must either end with '/' or contain globs"); throw Exception(ErrorCodes::BAD_QUERY_PARAMETER, "ObjectStorageQueue url must either end with '/' or contain globs");
} }
checkAndAdjustSettings(*queue_settings, engine_args, mode > LoadingStrictnessLevel::CREATE, log); checkAndAdjustSettings(*queue_settings, mode > LoadingStrictnessLevel::CREATE);
object_storage = configuration->createObjectStorage(context_, /* is_readonly */true); object_storage = configuration->createObjectStorage(context_, /* is_readonly */true);
FormatFactory::instance().checkFormatName(configuration->format); FormatFactory::instance().checkFormatName(configuration->format);