From fe5e061fffe3ef448dfc8d22b5f1236b09e036ca Mon Sep 17 00:00:00 2001 From: Robert Schulze Date: Thu, 12 Sep 2024 10:16:29 +0000 Subject: [PATCH] Some fixups --- .../mergetree-family/annindexes.md | 4 ++ .../settings.md | 8 ++++ src/Common/CurrentMetrics.cpp | 6 +-- src/Core/ServerSettings.h | 2 +- src/Interpreters/Context.cpp | 44 +++++++++---------- src/Interpreters/Context.h | 2 +- .../MergeTreeIndexVectorSimilarity.cpp | 29 ++++++------ 7 files changed, 53 insertions(+), 42 deletions(-) diff --git a/docs/en/engines/table-engines/mergetree-family/annindexes.md b/docs/en/engines/table-engines/mergetree-family/annindexes.md index 3c75b8dbef0..b73700c40f4 100644 --- a/docs/en/engines/table-engines/mergetree-family/annindexes.md +++ b/docs/en/engines/table-engines/mergetree-family/annindexes.md @@ -107,6 +107,10 @@ The vector similarity index currently does not work with per-table, non-default [here](https://github.com/ClickHouse/ClickHouse/pull/51325#issuecomment-1605920475)). If necessary, the value must be changed in config.xml. ::: +Vector index creation is known to be slow. To speed the process up, index creation can be parallelized. The maximum number of threads can be +configured using server configuration +setting [max_build_vector_similarity_index_thread_pool_size](server-configuration-parameters/settings.md#server_configuration_parameters_max_build_vector_similarity_index_thread_pool_size). + ANN indexes are built during column insertion and merge. As a result, `INSERT` and `OPTIMIZE` statements will be slower than for ordinary tables. ANNIndexes are ideally used only with immutable or rarely changed data, respectively when are far more read requests than write requests. diff --git a/docs/en/operations/server-configuration-parameters/settings.md b/docs/en/operations/server-configuration-parameters/settings.md index ccc8cf017ca..14a23964100 100644 --- a/docs/en/operations/server-configuration-parameters/settings.md +++ b/docs/en/operations/server-configuration-parameters/settings.md @@ -491,6 +491,14 @@ Type: Double Default: 0.9 +## max_build_vector_similarity_index_thread_pool_size {#server_configuration_parameters_max_build_vector_similarity_index_thread_pool_size} + +The maximum number of threads to use for building vector indexes. 0 means all cores. + +Type: UInt64 + +Default: 16 + ## cgroups_memory_usage_observer_wait_time Interval in seconds during which the server's maximum allowed memory consumption is adjusted by the corresponding threshold in cgroups. (see diff --git a/src/Common/CurrentMetrics.cpp b/src/Common/CurrentMetrics.cpp index 1d37b4761e1..658eaedbda1 100644 --- a/src/Common/CurrentMetrics.cpp +++ b/src/Common/CurrentMetrics.cpp @@ -178,9 +178,9 @@ M(ObjectStorageAzureThreads, "Number of threads in the AzureObjectStorage thread pool.") \ M(ObjectStorageAzureThreadsActive, "Number of threads in the AzureObjectStorage thread pool running a task.") \ M(ObjectStorageAzureThreadsScheduled, "Number of queued or active jobs in the AzureObjectStorage thread pool.") \ - M(UsearchUpdateThreads, "Number of threads in the Aggregator thread pool.") \ - M(UsearchUpdateThreadsActive, "Number of threads in the Aggregator thread pool running a task.") \ - M(UsearchUpdateThreadsScheduled, "Number of queued or active jobs in the Aggregator thread pool.") \ + M(BuildVectorSimilarityIndexThreads, "Number of threads in the build vector similarity index thread pool.") \ + M(BuildVectorSimilarityIndexThreadsActive, "Number of threads in the build vector similarity index thread pool running a task.") \ + M(BuildVectorSimilarityIndexThreadsScheduled, "Number of queued or active jobs in the build vector similarity index thread pool.") \ \ M(DiskPlainRewritableAzureDirectoryMapSize, "Number of local-to-remote path entries in the 'plain_rewritable' in-memory map for AzureObjectStorage.") \ M(DiskPlainRewritableLocalDirectoryMapSize, "Number of local-to-remote path entries in the 'plain_rewritable' in-memory map for LocalObjectStorage.") \ diff --git a/src/Core/ServerSettings.h b/src/Core/ServerSettings.h index b5071e6fa5d..689b18cb74f 100644 --- a/src/Core/ServerSettings.h +++ b/src/Core/ServerSettings.h @@ -65,7 +65,7 @@ namespace DB M(UInt64, async_insert_threads, 16, "Maximum number of threads to actually parse and insert data in background. Zero means asynchronous mode is disabled", 0) \ M(Bool, async_insert_queue_flush_on_shutdown, true, "If true queue of asynchronous inserts is flushed on graceful shutdown", 0) \ M(Bool, ignore_empty_sql_security_in_create_view_query, true, "If true, ClickHouse doesn't write defaults for empty SQL security statement in CREATE VIEW queries. This setting is only necessary for the migration period and will become obsolete in 24.4", 0) \ - M(UInt64, max_thread_pool_size_for_vector_similarity_index_creation, 16, "The maximum number of threads that could be allocated from the OS and used for vector similarity index creation(0 means all cores).", 0) \ + M(UInt64, max_build_vector_similarity_index_thread_pool_size, 16, "The maximum number of threads to use to build vector similarity indexes. 0 means all cores.", 0) \ \ /* Database Catalog */ \ M(UInt64, database_atomic_delay_before_drop_table_sec, 8 * 60, "The delay during which a dropped table can be restored using the UNDROP statement. If DROP TABLE ran with a SYNC modifier, the setting is ignored.", 0) \ diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 7350fdecd25..311fd094706 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -164,14 +164,14 @@ namespace CurrentMetrics extern const Metric TablesLoaderForegroundThreadsActive; extern const Metric TablesLoaderForegroundThreadsScheduled; extern const Metric IOWriterThreadsScheduled; + extern const Metric BuildVectorSimilarityIndexThreads; + extern const Metric BuildVectorSimilarityIndexThreadsActive; + extern const Metric BuildVectorSimilarityIndexThreadsScheduled; extern const Metric AttachedTable; extern const Metric AttachedView; extern const Metric AttachedDictionary; extern const Metric AttachedDatabase; extern const Metric PartsActive; - extern const Metric UsearchUpdateThreads; - extern const Metric UsearchUpdateThreadsActive; - extern const Metric UsearchUpdateThreadsScheduled; } @@ -300,8 +300,8 @@ struct ContextSharedPart : boost::noncopyable mutable std::unique_ptr load_marks_threadpool; /// Threadpool for loading marks cache. mutable OnceFlag prefetch_threadpool_initialized; mutable std::unique_ptr prefetch_threadpool; /// Threadpool for loading marks cache. - mutable OnceFlag vector_similarity_index_creation_threadpool_initialized; - mutable std::unique_ptr vector_similarity_index_creation_threadpool; /// Threadpool for vector-similarity index creation. + mutable OnceFlag build_vector_similarity_index_threadpool_initialized; + mutable std::unique_ptr build_vector_similarity_index_threadpool; /// Threadpool for vector-similarity index creation. mutable UncompressedCachePtr index_uncompressed_cache TSA_GUARDED_BY(mutex); /// The cache of decompressed blocks for MergeTree indices. mutable QueryCachePtr query_cache TSA_GUARDED_BY(mutex); /// Cache of query results. mutable MarkCachePtr index_mark_cache TSA_GUARDED_BY(mutex); /// Cache of marks in compressed files of MergeTree indices. @@ -3101,25 +3101,6 @@ ThreadPool & Context::getLoadMarksThreadpool() const return *shared->load_marks_threadpool; } -ThreadPool & Context::getVectorSimilarityIndexCreationThreadPool() const -{ - callOnce( - shared->vector_similarity_index_creation_threadpool_initialized, - [&] - { - const auto & server_setting = getServerSettings(); - size_t max_thread_pool_size = server_setting.max_thread_pool_size_for_vector_similarity_index_creation > 0 - ? server_setting.max_thread_pool_size_for_vector_similarity_index_creation - : getNumberOfPhysicalCPUCores(); - shared->vector_similarity_index_creation_threadpool = std::make_unique( - CurrentMetrics::UsearchUpdateThreads, - CurrentMetrics::UsearchUpdateThreadsActive, - CurrentMetrics::UsearchUpdateThreadsScheduled, - max_thread_pool_size); - }); - return *shared->vector_similarity_index_creation_threadpool; -} - void Context::setIndexUncompressedCache(const String & cache_policy, size_t max_size_in_bytes, double size_ratio) { std::lock_guard lock(shared->mutex); @@ -3321,6 +3302,21 @@ size_t Context::getPrefetchThreadpoolSize() const return config.getUInt(".prefetch_threadpool_pool_size", 100); } +ThreadPool & Context::getBuildVectorSimilarityIndexThreadPool() const +{ + callOnce(shared->build_vector_similarity_index_threadpool_initialized, [&] { + size_t pool_size = shared->server_settings.max_build_vector_similarity_index_thread_pool_size > 0 + ? shared->server_settings.max_build_vector_similarity_index_thread_pool_size + : getNumberOfPhysicalCPUCores(); + shared->build_vector_similarity_index_threadpool = std::make_unique( + CurrentMetrics::BuildVectorSimilarityIndexThreads, + CurrentMetrics::BuildVectorSimilarityIndexThreadsActive, + CurrentMetrics::BuildVectorSimilarityIndexThreadsScheduled, + pool_size); + }); + return *shared->build_vector_similarity_index_threadpool; +} + BackgroundSchedulePool & Context::getBufferFlushSchedulePool() const { callOnce(shared->buffer_flush_schedule_pool_initialized, [&] { diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index d309b964dbd..0daef2243aa 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -1097,7 +1097,7 @@ public: /// and make a prefetch by putting a read task to threadpoolReader. size_t getPrefetchThreadpoolSize() const; - ThreadPool & getVectorSimilarityIndexCreationThreadPool() const; + ThreadPool & getBuildVectorSimilarityIndexThreadPool() const; /// Settings for MergeTree background tasks stored in config.xml BackgroundTaskSchedulingSettings getBackgroundProcessingTaskSchedulingSettings() const; diff --git a/src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp b/src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp index e5ed19585f8..8a850141a67 100644 --- a/src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp +++ b/src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp @@ -4,12 +4,10 @@ #include #include -#include #include #include #include #include -#include #include #include #include @@ -31,7 +29,6 @@ namespace DB namespace ErrorCodes { - extern const int CANNOT_ALLOCATE_MEMORY; extern const int FORMAT_VERSION_TOO_OLD; extern const int ILLEGAL_COLUMN; extern const int INCORRECT_DATA; @@ -133,8 +130,7 @@ void USearchIndexWithSerialization::deserialize(ReadBuffer & istr) /// See the comment in MergeTreeIndexGranuleVectorSimilarity::deserializeBinary why we throw here throw Exception(ErrorCodes::INCORRECT_DATA, "Could not load vector similarity index. Please drop the index and create it again. Error: {}", String(result.error.release())); - if (!try_reserve(limits())) - throw Exception(ErrorCodes::CANNOT_ALLOCATE_MEMORY, "Could not reserve memory for usearch index"); + try_reserve(limits()); } USearchIndexWithSerialization::Statistics USearchIndexWithSerialization::getStatistics() const @@ -272,21 +268,27 @@ void updateImpl(const ColumnArray * column_array, const ColumnArray::Offsets & c throw Exception(ErrorCodes::INCORRECT_DATA, "All arrays in column with vector similarity index must have equal length"); /// Reserving space is mandatory - if (!index->try_reserve(roundUpToPowerOfTwoOrZero(index->size() + rows))) - throw Exception(ErrorCodes::CANNOT_ALLOCATE_MEMORY, "Could not reserve memory for vector similarity index"); + index->reserve(roundUpToPowerOfTwoOrZero(index->size() + rows)); - auto & thread_pool = Context::getGlobalContextInstance()->getVectorSimilarityIndexCreationThreadPool(); + /// Vector index creation is slooooow. Add the new rows in parallel. The threadpool is global to avoid oversubscription when multiple + /// indexes are build simultaneously (e.g. multiple merges run at the same time). + auto & thread_pool = Context::getGlobalContextInstance()->getBuildVectorSimilarityIndexThreadPool(); auto add_vector_to_index = [&](USearchIndex::vector_key_t key, size_t row, ThreadGroupPtr thread_group) { - SCOPE_EXIT_SAFE(if (thread_group) CurrentThread::detachFromGroupIfNotDetached();); + SCOPE_EXIT_SAFE( + if (thread_group) + CurrentThread::detachFromGroupIfNotDetached(); + ); if (thread_group) CurrentThread::attachToGroupIfDetached(thread_group); + /// add is thread-safe if (auto result = index->add(key, &column_array_data_float_data[column_array_offsets[row - 1]]); !result) - throw Exception( - ErrorCodes::INCORRECT_DATA, "Could not add data to vector similarity index. Error: {}", String(result.error.release())); + { + throw Exception(ErrorCodes::INCORRECT_DATA, "Could not add data to vector similarity index. Error: {}", String(result.error.release())); + } else { ProfileEvents::increment(ProfileEvents::USearchAddCount); @@ -295,14 +297,15 @@ void updateImpl(const ColumnArray * column_array, const ColumnArray::Offsets & c } }; - size_t current_index_size = index->size(); + size_t index_size = index->size(); for (size_t row = 0; row < rows; ++row) { - auto key = static_cast(current_index_size + row); + auto key = static_cast(index_size + row); auto task = [group = CurrentThread::getGroup(), &add_vector_to_index, key, row] { add_vector_to_index(key, row, group); }; thread_pool.scheduleOrThrowOnError(task); } + thread_pool.wait(); }