diff --git a/src/Core/ServerSettings.h b/src/Core/ServerSettings.h index 79173503f28..2077967e5b6 100644 --- a/src/Core/ServerSettings.h +++ b/src/Core/ServerSettings.h @@ -50,7 +50,7 @@ namespace DB M(UInt32, asynchronous_heavy_metrics_update_period_s, 120, "Period in seconds for updating heavy asynchronous metrics.", 0) \ M(String, default_database, "default", "Default database name.", 0) \ M(String, tmp_policy, "", "Policy for storage with temporary data.", 0) \ - M(UInt64, max_temporary_data_on_disk_size, 0, "The maximum amount of storage that could be used for external aggregation, joins or sorting., ", 0) \ + M(UInt64, max_temporary_data_on_disk_size, 0, "The maximum amount of storage that could be used for external aggregation, joins or sorting.", 0) \ M(String, temporary_data_in_cache, "", "Cache disk name for temporary data.", 0) \ M(UInt64, aggregate_function_group_array_max_element_size, 0xFFFFFF, "Max array element size in bytes for groupArray function. This limit is checked at serialization and help to avoid large state size.", 0) \ M(GroupArrayActionWhenLimitReached, aggregate_function_group_array_action_when_limit_is_reached, GroupArrayActionWhenLimitReached::THROW, "Action to execute when max array element size is exceeded in groupArray: `throw` exception, or `discard` extra values", 0) \ @@ -65,6 +65,7 @@ namespace DB M(UInt64, async_insert_threads, 16, "Maximum number of threads to actually parse and insert data in background. Zero means asynchronous mode is disabled", 0) \ M(Bool, async_insert_queue_flush_on_shutdown, true, "If true queue of asynchronous inserts is flushed on graceful shutdown", 0) \ M(Bool, ignore_empty_sql_security_in_create_view_query, true, "If true, ClickHouse doesn't write defaults for empty SQL security statement in CREATE VIEW queries. This setting is only necessary for the migration period and will become obsolete in 24.4", 0) \ + M(UInt64, max_thread_pool_size_for_vector_similarity_index_creation, 16, "The maximum number of threads that could be allocated from the OS and used for vector similarity index creation(0 means all cores).", 0) \ \ /* Database Catalog */ \ M(UInt64, database_atomic_delay_before_drop_table_sec, 8 * 60, "The delay during which a dropped table can be restored using the UNDROP statement. If DROP TABLE ran with a SYNC modifier, the setting is ignored.", 0) \ diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 373cc91ebcb..051378ef6d4 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include #include @@ -120,7 +121,6 @@ #include #include - namespace fs = std::filesystem; namespace ProfileEvents @@ -168,6 +168,9 @@ namespace CurrentMetrics extern const Metric AttachedDictionary; extern const Metric AttachedDatabase; extern const Metric PartsActive; + extern const Metric UsearchUpdateThreads; + extern const Metric UsearchUpdateThreadsActive; + extern const Metric UsearchUpdateThreadsScheduled; } @@ -296,6 +299,8 @@ struct ContextSharedPart : boost::noncopyable mutable std::unique_ptr load_marks_threadpool; /// Threadpool for loading marks cache. mutable OnceFlag prefetch_threadpool_initialized; mutable std::unique_ptr prefetch_threadpool; /// Threadpool for loading marks cache. + mutable OnceFlag vector_similarity_index_creation_threadpool_initialized; + mutable std::unique_ptr vector_similarity_index_creation_threadpool; /// Threadpool for vector-similarity index creation. mutable UncompressedCachePtr index_uncompressed_cache TSA_GUARDED_BY(mutex); /// The cache of decompressed blocks for MergeTree indices. mutable QueryCachePtr query_cache TSA_GUARDED_BY(mutex); /// Cache of query results. mutable MarkCachePtr index_mark_cache TSA_GUARDED_BY(mutex); /// Cache of marks in compressed files of MergeTree indices. @@ -3095,6 +3100,25 @@ ThreadPool & Context::getLoadMarksThreadpool() const return *shared->load_marks_threadpool; } +ThreadPool & Context::getVectorSimilarityIndexCreationThreadPool() const +{ + callOnce( + shared->vector_similarity_index_creation_threadpool_initialized, + [&] + { + const auto & server_setting = getServerSettings(); + size_t max_thread_pool_size = server_setting.max_thread_pool_size_for_vector_similarity_index_creation > 0 + ? server_setting.max_thread_pool_size_for_vector_similarity_index_creation + : getNumberOfPhysicalCPUCores(); + shared->vector_similarity_index_creation_threadpool = std::make_unique( + CurrentMetrics::UsearchUpdateThreads, + CurrentMetrics::UsearchUpdateThreadsActive, + CurrentMetrics::UsearchUpdateThreadsScheduled, + max_thread_pool_size); + }); + return *shared->vector_similarity_index_creation_threadpool; +} + void Context::setIndexUncompressedCache(const String & cache_policy, size_t max_size_in_bytes, double size_ratio) { std::lock_guard lock(shared->mutex); diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index fb5337158ba..e8f6ce9b3e1 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -1096,6 +1096,8 @@ public: /// and make a prefetch by putting a read task to threadpoolReader. size_t getPrefetchThreadpoolSize() const; + ThreadPool & getVectorSimilarityIndexCreationThreadPool() const; + /// Settings for MergeTree background tasks stored in config.xml BackgroundTaskSchedulingSettings getBackgroundProcessingTaskSchedulingSettings() const; BackgroundTaskSchedulingSettings getBackgroundMoveTaskSchedulingSettings() const; diff --git a/src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp b/src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp index 6379c837281..e5ed19585f8 100644 --- a/src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp +++ b/src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp @@ -6,7 +6,6 @@ #include #include #include -#include #include #include #include @@ -27,13 +26,6 @@ namespace ProfileEvents extern const Event USearchSearchComputedDistances; } -namespace CurrentMetrics -{ -extern const Metric UsearchUpdateThreads; -extern const Metric UsearchUpdateThreadsActive; -extern const Metric UsearchUpdateThreadsScheduled; -} - namespace DB { @@ -283,14 +275,7 @@ void updateImpl(const ColumnArray * column_array, const ColumnArray::Offsets & c if (!index->try_reserve(roundUpToPowerOfTwoOrZero(index->size() + rows))) throw Exception(ErrorCodes::CANNOT_ALLOCATE_MEMORY, "Could not reserve memory for vector similarity index"); - size_t max_threads = Context::getGlobalContextInstance()->getSettingsRef().max_threads; - max_threads = max_threads > 0 ? max_threads : getNumberOfPhysicalCPUCores(); - - auto thread_pool = std::make_unique( - CurrentMetrics::UsearchUpdateThreads, - CurrentMetrics::UsearchUpdateThreadsActive, - CurrentMetrics::UsearchUpdateThreadsScheduled, - max_threads); + auto & thread_pool = Context::getGlobalContextInstance()->getVectorSimilarityIndexCreationThreadPool(); auto add_vector_to_index = [&](USearchIndex::vector_key_t key, size_t row, ThreadGroupPtr thread_group) { @@ -316,9 +301,9 @@ void updateImpl(const ColumnArray * column_array, const ColumnArray::Offsets & c { auto key = static_cast(current_index_size + row); auto task = [group = CurrentThread::getGroup(), &add_vector_to_index, key, row] { add_vector_to_index(key, row, group); }; - thread_pool->scheduleOrThrowOnError(task); + thread_pool.scheduleOrThrowOnError(task); } - thread_pool->wait(); + thread_pool.wait(); } }