Backport #69454 to 24.8: Quick fix for s3queue problem

This commit is contained in:
robot-clickhouse 2024-09-10 22:07:34 +00:00
parent 9f3412bf9a
commit 55ef8386a7

View File

@ -64,9 +64,7 @@ namespace
void checkAndAdjustSettings( void checkAndAdjustSettings(
ObjectStorageQueueSettings & queue_settings, ObjectStorageQueueSettings & queue_settings,
ASTStorage * engine_args, bool is_attach)
bool is_attach,
const LoggerPtr & log)
{ {
if (!is_attach && !queue_settings.mode.changed) if (!is_attach && !queue_settings.mode.changed)
{ {
@ -85,16 +83,6 @@ namespace
"Setting `cleanup_interval_min_ms` ({}) must be less or equal to `cleanup_interval_max_ms` ({})", "Setting `cleanup_interval_min_ms` ({}) must be less or equal to `cleanup_interval_max_ms` ({})",
queue_settings.cleanup_interval_min_ms, queue_settings.cleanup_interval_max_ms); queue_settings.cleanup_interval_min_ms, queue_settings.cleanup_interval_max_ms);
} }
if (!is_attach && !queue_settings.processing_threads_num.changed)
{
queue_settings.processing_threads_num = std::max<uint32_t>(getNumberOfPhysicalCPUCores(), 16);
engine_args->settings->as<ASTSetQuery>()->changes.insertSetting(
"processing_threads_num",
queue_settings.processing_threads_num.value);
LOG_TRACE(log, "Set `processing_threads_num` to {}", queue_settings.processing_threads_num);
}
} }
std::shared_ptr<ObjectStorageQueueLog> getQueueLog(const ObjectStoragePtr & storage, const ContextPtr & context, const ObjectStorageQueueSettings & table_settings) std::shared_ptr<ObjectStorageQueueLog> getQueueLog(const ObjectStoragePtr & storage, const ContextPtr & context, const ObjectStorageQueueSettings & table_settings)
@ -130,7 +118,7 @@ StorageObjectStorageQueue::StorageObjectStorageQueue(
const String & comment, const String & comment,
ContextPtr context_, ContextPtr context_,
std::optional<FormatSettings> format_settings_, std::optional<FormatSettings> format_settings_,
ASTStorage * engine_args, ASTStorage * /* engine_args */,
LoadingStrictnessLevel mode) LoadingStrictnessLevel mode)
: IStorage(table_id_) : IStorage(table_id_)
, WithContext(context_) , WithContext(context_)
@ -154,7 +142,7 @@ StorageObjectStorageQueue::StorageObjectStorageQueue(
throw Exception(ErrorCodes::BAD_QUERY_PARAMETER, "ObjectStorageQueue url must either end with '/' or contain globs"); throw Exception(ErrorCodes::BAD_QUERY_PARAMETER, "ObjectStorageQueue url must either end with '/' or contain globs");
} }
checkAndAdjustSettings(*queue_settings, engine_args, mode > LoadingStrictnessLevel::CREATE, log); checkAndAdjustSettings(*queue_settings, mode > LoadingStrictnessLevel::CREATE);
object_storage = configuration->createObjectStorage(context_, /* is_readonly */true); object_storage = configuration->createObjectStorage(context_, /* is_readonly */true);
FormatFactory::instance().checkFormatName(configuration->format); FormatFactory::instance().checkFormatName(configuration->format);