From cf12e3924f51fd08c884519352a46495412a771a Mon Sep 17 00:00:00 2001 From: flynn Date: Wed, 11 Sep 2024 09:31:46 +0000 Subject: [PATCH 1/7] Speed up inserting data with vector similarity index by adding data to the index in parallel --- src/Common/CurrentMetrics.cpp | 3 ++ .../MergeTreeIndexVectorSimilarity.cpp | 42 +++++++++++++++++-- 2 files changed, 42 insertions(+), 3 deletions(-) diff --git a/src/Common/CurrentMetrics.cpp b/src/Common/CurrentMetrics.cpp index 4bf2b0704f1..1d37b4761e1 100644 --- a/src/Common/CurrentMetrics.cpp +++ b/src/Common/CurrentMetrics.cpp @@ -178,6 +178,9 @@ M(ObjectStorageAzureThreads, "Number of threads in the AzureObjectStorage thread pool.") \ M(ObjectStorageAzureThreadsActive, "Number of threads in the AzureObjectStorage thread pool running a task.") \ M(ObjectStorageAzureThreadsScheduled, "Number of queued or active jobs in the AzureObjectStorage thread pool.") \ + M(UsearchUpdateThreads, "Number of threads in the Aggregator thread pool.") \ + M(UsearchUpdateThreadsActive, "Number of threads in the Aggregator thread pool running a task.") \ + M(UsearchUpdateThreadsScheduled, "Number of queued or active jobs in the Aggregator thread pool.") \ \ M(DiskPlainRewritableAzureDirectoryMapSize, "Number of local-to-remote path entries in the 'plain_rewritable' in-memory map for AzureObjectStorage.") \ M(DiskPlainRewritableLocalDirectoryMapSize, "Number of local-to-remote path entries in the 'plain_rewritable' in-memory map for LocalObjectStorage.") \ diff --git a/src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp b/src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp index 58892d0dbf2..ad166839ce3 100644 --- a/src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp +++ b/src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp @@ -4,16 +4,20 @@ #include #include +#include #include +#include #include #include #include +#include #include #include #include #include #include + namespace ProfileEvents { extern const Event USearchAddCount; @@ 
-24,6 +28,13 @@ namespace ProfileEvents extern const Event USearchSearchComputedDistances; } +namespace CurrentMetrics +{ +extern const Metric UsearchUpdateThreads; +extern const Metric UsearchUpdateThreadsActive; +extern const Metric UsearchUpdateThreadsScheduled; +} + namespace DB { @@ -273,17 +284,42 @@ void updateImpl(const ColumnArray * column_array, const ColumnArray::Offsets & c if (!index->try_reserve(roundUpToPowerOfTwoOrZero(index->size() + rows))) throw Exception(ErrorCodes::CANNOT_ALLOCATE_MEMORY, "Could not reserve memory for vector similarity index"); - for (size_t row = 0; row < rows; ++row) + size_t max_threads = Context::getGlobalContextInstance()->getSettingsRef().max_threads; + max_threads = max_threads > 0 ? max_threads : getNumberOfPhysicalCPUCores(); + + auto thread_pool = std::make_unique( + CurrentMetrics::UsearchUpdateThreads, + CurrentMetrics::UsearchUpdateThreadsActive, + CurrentMetrics::UsearchUpdateThreadsScheduled, + max_threads); + + auto add_vector_to_index = [&](USearchIndex::vector_key_t key, size_t row, ThreadGroupPtr thread_group) { - if (auto result = index->add(static_cast(index->size()), &column_array_data_float_data[column_array_offsets[row - 1]]); !result) - throw Exception(ErrorCodes::INCORRECT_DATA, "Could not add data to vector similarity index. Error: {}", String(result.error.release())); + SCOPE_EXIT_SAFE(if (thread_group) CurrentThread::detachFromGroupIfNotDetached();); + + if (thread_group) + CurrentThread::attachToGroupIfDetached(thread_group); + + if (auto result = index->add(key, &column_array_data_float_data[column_array_offsets[row - 1]]); !result) + throw Exception( + ErrorCodes::INCORRECT_DATA, "Could not add data to vector similarity index. 
Error: {}", String(result.error.release())); else { ProfileEvents::increment(ProfileEvents::USearchAddCount); ProfileEvents::increment(ProfileEvents::USearchAddVisitedMembers, result.visited_members); ProfileEvents::increment(ProfileEvents::USearchAddComputedDistances, result.computed_distances); } + }; + + size_t current_index_size = index->size(); + + for (size_t row = 0; row < rows; ++row) + { + auto key = static_cast(current_index_size + row); + auto task = [group = CurrentThread::getGroup(), &add_vector_to_index, key, row] { add_vector_to_index(key, row, group); }; + thread_pool->scheduleOrThrowOnError(task); } + thread_pool->wait(); } } From 7425d4aa1ae690810db444796ec4c0a4469d3e76 Mon Sep 17 00:00:00 2001 From: flynn Date: Wed, 11 Sep 2024 10:12:42 +0000 Subject: [PATCH 2/7] remove blank line --- src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp b/src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp index ad166839ce3..6379c837281 100644 --- a/src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp +++ b/src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp @@ -17,7 +17,6 @@ #include #include - namespace ProfileEvents { extern const Event USearchAddCount; From 22c3b71196bc15c3229edae5f6abd7e59950bbe6 Mon Sep 17 00:00:00 2001 From: flynn Date: Thu, 12 Sep 2024 03:54:25 +0000 Subject: [PATCH 3/7] Make vector similarity index creation thread pool globally --- src/Core/ServerSettings.h | 3 ++- src/Interpreters/Context.cpp | 26 ++++++++++++++++++- src/Interpreters/Context.h | 2 ++ .../MergeTreeIndexVectorSimilarity.cpp | 21 +++------------ 4 files changed, 32 insertions(+), 20 deletions(-) diff --git a/src/Core/ServerSettings.h b/src/Core/ServerSettings.h index 79173503f28..2077967e5b6 100644 --- a/src/Core/ServerSettings.h +++ b/src/Core/ServerSettings.h @@ -50,7 +50,7 @@ namespace DB M(UInt32, asynchronous_heavy_metrics_update_period_s, 
120, "Period in seconds for updating heavy asynchronous metrics.", 0) \ M(String, default_database, "default", "Default database name.", 0) \ M(String, tmp_policy, "", "Policy for storage with temporary data.", 0) \ - M(UInt64, max_temporary_data_on_disk_size, 0, "The maximum amount of storage that could be used for external aggregation, joins or sorting., ", 0) \ + M(UInt64, max_temporary_data_on_disk_size, 0, "The maximum amount of storage that could be used for external aggregation, joins or sorting.", 0) \ M(String, temporary_data_in_cache, "", "Cache disk name for temporary data.", 0) \ M(UInt64, aggregate_function_group_array_max_element_size, 0xFFFFFF, "Max array element size in bytes for groupArray function. This limit is checked at serialization and help to avoid large state size.", 0) \ M(GroupArrayActionWhenLimitReached, aggregate_function_group_array_action_when_limit_is_reached, GroupArrayActionWhenLimitReached::THROW, "Action to execute when max array element size is exceeded in groupArray: `throw` exception, or `discard` extra values", 0) \ @@ -65,6 +65,7 @@ namespace DB M(UInt64, async_insert_threads, 16, "Maximum number of threads to actually parse and insert data in background. Zero means asynchronous mode is disabled", 0) \ M(Bool, async_insert_queue_flush_on_shutdown, true, "If true queue of asynchronous inserts is flushed on graceful shutdown", 0) \ M(Bool, ignore_empty_sql_security_in_create_view_query, true, "If true, ClickHouse doesn't write defaults for empty SQL security statement in CREATE VIEW queries. 
This setting is only necessary for the migration period and will become obsolete in 24.4", 0) \ + M(UInt64, max_thread_pool_size_for_vector_similarity_index_creation, 16, "The maximum number of threads that could be allocated from the OS and used for vector similarity index creation(0 means all cores).", 0) \ \ /* Database Catalog */ \ M(UInt64, database_atomic_delay_before_drop_table_sec, 8 * 60, "The delay during which a dropped table can be restored using the UNDROP statement. If DROP TABLE ran with a SYNC modifier, the setting is ignored.", 0) \ diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 373cc91ebcb..051378ef6d4 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include #include @@ -120,7 +121,6 @@ #include #include - namespace fs = std::filesystem; namespace ProfileEvents @@ -168,6 +168,9 @@ namespace CurrentMetrics extern const Metric AttachedDictionary; extern const Metric AttachedDatabase; extern const Metric PartsActive; + extern const Metric UsearchUpdateThreads; + extern const Metric UsearchUpdateThreadsActive; + extern const Metric UsearchUpdateThreadsScheduled; } @@ -296,6 +299,8 @@ struct ContextSharedPart : boost::noncopyable mutable std::unique_ptr load_marks_threadpool; /// Threadpool for loading marks cache. mutable OnceFlag prefetch_threadpool_initialized; mutable std::unique_ptr prefetch_threadpool; /// Threadpool for loading marks cache. + mutable OnceFlag vector_similarity_index_creation_threadpool_initialized; + mutable std::unique_ptr vector_similarity_index_creation_threadpool; /// Threadpool for vector-similarity index creation. mutable UncompressedCachePtr index_uncompressed_cache TSA_GUARDED_BY(mutex); /// The cache of decompressed blocks for MergeTree indices. mutable QueryCachePtr query_cache TSA_GUARDED_BY(mutex); /// Cache of query results. 
mutable MarkCachePtr index_mark_cache TSA_GUARDED_BY(mutex); /// Cache of marks in compressed files of MergeTree indices. @@ -3095,6 +3100,25 @@ ThreadPool & Context::getLoadMarksThreadpool() const return *shared->load_marks_threadpool; } +ThreadPool & Context::getVectorSimilarityIndexCreationThreadPool() const +{ + callOnce( + shared->vector_similarity_index_creation_threadpool_initialized, + [&] + { + const auto & server_setting = getServerSettings(); + size_t max_thread_pool_size = server_setting.max_thread_pool_size_for_vector_similarity_index_creation > 0 + ? server_setting.max_thread_pool_size_for_vector_similarity_index_creation + : getNumberOfPhysicalCPUCores(); + shared->vector_similarity_index_creation_threadpool = std::make_unique( + CurrentMetrics::UsearchUpdateThreads, + CurrentMetrics::UsearchUpdateThreadsActive, + CurrentMetrics::UsearchUpdateThreadsScheduled, + max_thread_pool_size); + }); + return *shared->vector_similarity_index_creation_threadpool; +} + void Context::setIndexUncompressedCache(const String & cache_policy, size_t max_size_in_bytes, double size_ratio) { std::lock_guard lock(shared->mutex); diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index fb5337158ba..e8f6ce9b3e1 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -1096,6 +1096,8 @@ public: /// and make a prefetch by putting a read task to threadpoolReader. 
size_t getPrefetchThreadpoolSize() const; + ThreadPool & getVectorSimilarityIndexCreationThreadPool() const; + /// Settings for MergeTree background tasks stored in config.xml BackgroundTaskSchedulingSettings getBackgroundProcessingTaskSchedulingSettings() const; BackgroundTaskSchedulingSettings getBackgroundMoveTaskSchedulingSettings() const; diff --git a/src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp b/src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp index 6379c837281..e5ed19585f8 100644 --- a/src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp +++ b/src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp @@ -6,7 +6,6 @@ #include #include #include -#include #include #include #include @@ -27,13 +26,6 @@ namespace ProfileEvents extern const Event USearchSearchComputedDistances; } -namespace CurrentMetrics -{ -extern const Metric UsearchUpdateThreads; -extern const Metric UsearchUpdateThreadsActive; -extern const Metric UsearchUpdateThreadsScheduled; -} - namespace DB { @@ -283,14 +275,7 @@ void updateImpl(const ColumnArray * column_array, const ColumnArray::Offsets & c if (!index->try_reserve(roundUpToPowerOfTwoOrZero(index->size() + rows))) throw Exception(ErrorCodes::CANNOT_ALLOCATE_MEMORY, "Could not reserve memory for vector similarity index"); - size_t max_threads = Context::getGlobalContextInstance()->getSettingsRef().max_threads; - max_threads = max_threads > 0 ? 
max_threads : getNumberOfPhysicalCPUCores(); - - auto thread_pool = std::make_unique( - CurrentMetrics::UsearchUpdateThreads, - CurrentMetrics::UsearchUpdateThreadsActive, - CurrentMetrics::UsearchUpdateThreadsScheduled, - max_threads); + auto & thread_pool = Context::getGlobalContextInstance()->getVectorSimilarityIndexCreationThreadPool(); auto add_vector_to_index = [&](USearchIndex::vector_key_t key, size_t row, ThreadGroupPtr thread_group) { @@ -316,9 +301,9 @@ void updateImpl(const ColumnArray * column_array, const ColumnArray::Offsets & c { auto key = static_cast(current_index_size + row); auto task = [group = CurrentThread::getGroup(), &add_vector_to_index, key, row] { add_vector_to_index(key, row, group); }; - thread_pool->scheduleOrThrowOnError(task); + thread_pool.scheduleOrThrowOnError(task); } - thread_pool->wait(); + thread_pool.wait(); } } From fe5e061fffe3ef448dfc8d22b5f1236b09e036ca Mon Sep 17 00:00:00 2001 From: Robert Schulze Date: Thu, 12 Sep 2024 10:16:29 +0000 Subject: [PATCH 4/7] Some fixups --- .../mergetree-family/annindexes.md | 4 ++ .../settings.md | 8 ++++ src/Common/CurrentMetrics.cpp | 6 +-- src/Core/ServerSettings.h | 2 +- src/Interpreters/Context.cpp | 44 +++++++++---------- src/Interpreters/Context.h | 2 +- .../MergeTreeIndexVectorSimilarity.cpp | 29 ++++++------ 7 files changed, 53 insertions(+), 42 deletions(-) diff --git a/docs/en/engines/table-engines/mergetree-family/annindexes.md b/docs/en/engines/table-engines/mergetree-family/annindexes.md index 3c75b8dbef0..b73700c40f4 100644 --- a/docs/en/engines/table-engines/mergetree-family/annindexes.md +++ b/docs/en/engines/table-engines/mergetree-family/annindexes.md @@ -107,6 +107,10 @@ The vector similarity index currently does not work with per-table, non-default [here](https://github.com/ClickHouse/ClickHouse/pull/51325#issuecomment-1605920475)). If necessary, the value must be changed in config.xml. ::: +Vector index creation is known to be slow. 
To speed the process up, index creation can be parallelized. The maximum number of threads can be +configured using server configuration +setting [max_build_vector_similarity_index_thread_pool_size](server-configuration-parameters/settings.md#server_configuration_parameters_max_build_vector_similarity_index_thread_pool_size). + ANN indexes are built during column insertion and merge. As a result, `INSERT` and `OPTIMIZE` statements will be slower than for ordinary tables. ANNIndexes are ideally used only with immutable or rarely changed data, respectively when are far more read requests than write requests. diff --git a/docs/en/operations/server-configuration-parameters/settings.md b/docs/en/operations/server-configuration-parameters/settings.md index ccc8cf017ca..14a23964100 100644 --- a/docs/en/operations/server-configuration-parameters/settings.md +++ b/docs/en/operations/server-configuration-parameters/settings.md @@ -491,6 +491,14 @@ Type: Double Default: 0.9 +## max_build_vector_similarity_index_thread_pool_size {#server_configuration_parameters_max_build_vector_similarity_index_thread_pool_size} + +The maximum number of threads to use for building vector indexes. 0 means all cores. + +Type: UInt64 + +Default: 16 + ## cgroups_memory_usage_observer_wait_time Interval in seconds during which the server's maximum allowed memory consumption is adjusted by the corresponding threshold in cgroups. 
(see diff --git a/src/Common/CurrentMetrics.cpp b/src/Common/CurrentMetrics.cpp index 1d37b4761e1..658eaedbda1 100644 --- a/src/Common/CurrentMetrics.cpp +++ b/src/Common/CurrentMetrics.cpp @@ -178,9 +178,9 @@ M(ObjectStorageAzureThreads, "Number of threads in the AzureObjectStorage thread pool.") \ M(ObjectStorageAzureThreadsActive, "Number of threads in the AzureObjectStorage thread pool running a task.") \ M(ObjectStorageAzureThreadsScheduled, "Number of queued or active jobs in the AzureObjectStorage thread pool.") \ - M(UsearchUpdateThreads, "Number of threads in the Aggregator thread pool.") \ - M(UsearchUpdateThreadsActive, "Number of threads in the Aggregator thread pool running a task.") \ - M(UsearchUpdateThreadsScheduled, "Number of queued or active jobs in the Aggregator thread pool.") \ + M(BuildVectorSimilarityIndexThreads, "Number of threads in the build vector similarity index thread pool.") \ + M(BuildVectorSimilarityIndexThreadsActive, "Number of threads in the build vector similarity index thread pool running a task.") \ + M(BuildVectorSimilarityIndexThreadsScheduled, "Number of queued or active jobs in the build vector similarity index thread pool.") \ \ M(DiskPlainRewritableAzureDirectoryMapSize, "Number of local-to-remote path entries in the 'plain_rewritable' in-memory map for AzureObjectStorage.") \ M(DiskPlainRewritableLocalDirectoryMapSize, "Number of local-to-remote path entries in the 'plain_rewritable' in-memory map for LocalObjectStorage.") \ diff --git a/src/Core/ServerSettings.h b/src/Core/ServerSettings.h index b5071e6fa5d..689b18cb74f 100644 --- a/src/Core/ServerSettings.h +++ b/src/Core/ServerSettings.h @@ -65,7 +65,7 @@ namespace DB M(UInt64, async_insert_threads, 16, "Maximum number of threads to actually parse and insert data in background. 
Zero means asynchronous mode is disabled", 0) \ M(Bool, async_insert_queue_flush_on_shutdown, true, "If true queue of asynchronous inserts is flushed on graceful shutdown", 0) \ M(Bool, ignore_empty_sql_security_in_create_view_query, true, "If true, ClickHouse doesn't write defaults for empty SQL security statement in CREATE VIEW queries. This setting is only necessary for the migration period and will become obsolete in 24.4", 0) \ - M(UInt64, max_thread_pool_size_for_vector_similarity_index_creation, 16, "The maximum number of threads that could be allocated from the OS and used for vector similarity index creation(0 means all cores).", 0) \ + M(UInt64, max_build_vector_similarity_index_thread_pool_size, 16, "The maximum number of threads to use to build vector similarity indexes. 0 means all cores.", 0) \ \ /* Database Catalog */ \ M(UInt64, database_atomic_delay_before_drop_table_sec, 8 * 60, "The delay during which a dropped table can be restored using the UNDROP statement. If DROP TABLE ran with a SYNC modifier, the setting is ignored.", 0) \ diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 7350fdecd25..311fd094706 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -164,14 +164,14 @@ namespace CurrentMetrics extern const Metric TablesLoaderForegroundThreadsActive; extern const Metric TablesLoaderForegroundThreadsScheduled; extern const Metric IOWriterThreadsScheduled; + extern const Metric BuildVectorSimilarityIndexThreads; + extern const Metric BuildVectorSimilarityIndexThreadsActive; + extern const Metric BuildVectorSimilarityIndexThreadsScheduled; extern const Metric AttachedTable; extern const Metric AttachedView; extern const Metric AttachedDictionary; extern const Metric AttachedDatabase; extern const Metric PartsActive; - extern const Metric UsearchUpdateThreads; - extern const Metric UsearchUpdateThreadsActive; - extern const Metric UsearchUpdateThreadsScheduled; } @@ -300,8 +300,8 @@ struct 
ContextSharedPart : boost::noncopyable mutable std::unique_ptr load_marks_threadpool; /// Threadpool for loading marks cache. mutable OnceFlag prefetch_threadpool_initialized; mutable std::unique_ptr prefetch_threadpool; /// Threadpool for loading marks cache. - mutable OnceFlag vector_similarity_index_creation_threadpool_initialized; - mutable std::unique_ptr vector_similarity_index_creation_threadpool; /// Threadpool for vector-similarity index creation. + mutable OnceFlag build_vector_similarity_index_threadpool_initialized; + mutable std::unique_ptr build_vector_similarity_index_threadpool; /// Threadpool for vector-similarity index creation. mutable UncompressedCachePtr index_uncompressed_cache TSA_GUARDED_BY(mutex); /// The cache of decompressed blocks for MergeTree indices. mutable QueryCachePtr query_cache TSA_GUARDED_BY(mutex); /// Cache of query results. mutable MarkCachePtr index_mark_cache TSA_GUARDED_BY(mutex); /// Cache of marks in compressed files of MergeTree indices. @@ -3101,25 +3101,6 @@ ThreadPool & Context::getLoadMarksThreadpool() const return *shared->load_marks_threadpool; } -ThreadPool & Context::getVectorSimilarityIndexCreationThreadPool() const -{ - callOnce( - shared->vector_similarity_index_creation_threadpool_initialized, - [&] - { - const auto & server_setting = getServerSettings(); - size_t max_thread_pool_size = server_setting.max_thread_pool_size_for_vector_similarity_index_creation > 0 - ? 
server_setting.max_thread_pool_size_for_vector_similarity_index_creation - : getNumberOfPhysicalCPUCores(); - shared->vector_similarity_index_creation_threadpool = std::make_unique( - CurrentMetrics::UsearchUpdateThreads, - CurrentMetrics::UsearchUpdateThreadsActive, - CurrentMetrics::UsearchUpdateThreadsScheduled, - max_thread_pool_size); - }); - return *shared->vector_similarity_index_creation_threadpool; -} - void Context::setIndexUncompressedCache(const String & cache_policy, size_t max_size_in_bytes, double size_ratio) { std::lock_guard lock(shared->mutex); @@ -3321,6 +3302,21 @@ size_t Context::getPrefetchThreadpoolSize() const return config.getUInt(".prefetch_threadpool_pool_size", 100); } +ThreadPool & Context::getBuildVectorSimilarityIndexThreadPool() const +{ + callOnce(shared->build_vector_similarity_index_threadpool_initialized, [&] { + size_t pool_size = shared->server_settings.max_build_vector_similarity_index_thread_pool_size > 0 + ? shared->server_settings.max_build_vector_similarity_index_thread_pool_size + : getNumberOfPhysicalCPUCores(); + shared->build_vector_similarity_index_threadpool = std::make_unique( + CurrentMetrics::BuildVectorSimilarityIndexThreads, + CurrentMetrics::BuildVectorSimilarityIndexThreadsActive, + CurrentMetrics::BuildVectorSimilarityIndexThreadsScheduled, + pool_size); + }); + return *shared->build_vector_similarity_index_threadpool; +} + BackgroundSchedulePool & Context::getBufferFlushSchedulePool() const { callOnce(shared->buffer_flush_schedule_pool_initialized, [&] { diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index d309b964dbd..0daef2243aa 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -1097,7 +1097,7 @@ public: /// and make a prefetch by putting a read task to threadpoolReader. 
size_t getPrefetchThreadpoolSize() const; - ThreadPool & getVectorSimilarityIndexCreationThreadPool() const; + ThreadPool & getBuildVectorSimilarityIndexThreadPool() const; /// Settings for MergeTree background tasks stored in config.xml BackgroundTaskSchedulingSettings getBackgroundProcessingTaskSchedulingSettings() const; diff --git a/src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp b/src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp index e5ed19585f8..8a850141a67 100644 --- a/src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp +++ b/src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp @@ -4,12 +4,10 @@ #include #include -#include #include #include #include #include -#include #include #include #include @@ -31,7 +29,6 @@ namespace DB namespace ErrorCodes { - extern const int CANNOT_ALLOCATE_MEMORY; extern const int FORMAT_VERSION_TOO_OLD; extern const int ILLEGAL_COLUMN; extern const int INCORRECT_DATA; @@ -133,8 +130,7 @@ void USearchIndexWithSerialization::deserialize(ReadBuffer & istr) /// See the comment in MergeTreeIndexGranuleVectorSimilarity::deserializeBinary why we throw here throw Exception(ErrorCodes::INCORRECT_DATA, "Could not load vector similarity index. Please drop the index and create it again. 
Error: {}", String(result.error.release())); - if (!try_reserve(limits())) - throw Exception(ErrorCodes::CANNOT_ALLOCATE_MEMORY, "Could not reserve memory for usearch index"); + try_reserve(limits()); } USearchIndexWithSerialization::Statistics USearchIndexWithSerialization::getStatistics() const @@ -272,21 +268,27 @@ void updateImpl(const ColumnArray * column_array, const ColumnArray::Offsets & c throw Exception(ErrorCodes::INCORRECT_DATA, "All arrays in column with vector similarity index must have equal length"); /// Reserving space is mandatory - if (!index->try_reserve(roundUpToPowerOfTwoOrZero(index->size() + rows))) - throw Exception(ErrorCodes::CANNOT_ALLOCATE_MEMORY, "Could not reserve memory for vector similarity index"); + index->reserve(roundUpToPowerOfTwoOrZero(index->size() + rows)); - auto & thread_pool = Context::getGlobalContextInstance()->getVectorSimilarityIndexCreationThreadPool(); + /// Vector index creation is slooooow. Add the new rows in parallel. The threadpool is global to avoid oversubscription when multiple + /// indexes are build simultaneously (e.g. multiple merges run at the same time). + auto & thread_pool = Context::getGlobalContextInstance()->getBuildVectorSimilarityIndexThreadPool(); auto add_vector_to_index = [&](USearchIndex::vector_key_t key, size_t row, ThreadGroupPtr thread_group) { - SCOPE_EXIT_SAFE(if (thread_group) CurrentThread::detachFromGroupIfNotDetached();); + SCOPE_EXIT_SAFE( + if (thread_group) + CurrentThread::detachFromGroupIfNotDetached(); + ); if (thread_group) CurrentThread::attachToGroupIfDetached(thread_group); + /// add is thread-safe if (auto result = index->add(key, &column_array_data_float_data[column_array_offsets[row - 1]]); !result) - throw Exception( - ErrorCodes::INCORRECT_DATA, "Could not add data to vector similarity index. Error: {}", String(result.error.release())); + { + throw Exception(ErrorCodes::INCORRECT_DATA, "Could not add data to vector similarity index. 
Error: {}", String(result.error.release())); + } else { ProfileEvents::increment(ProfileEvents::USearchAddCount); @@ -295,14 +297,15 @@ void updateImpl(const ColumnArray * column_array, const ColumnArray::Offsets & c } }; - size_t current_index_size = index->size(); + size_t index_size = index->size(); for (size_t row = 0; row < rows; ++row) { - auto key = static_cast(current_index_size + row); + auto key = static_cast(index_size + row); auto task = [group = CurrentThread::getGroup(), &add_vector_to_index, key, row] { add_vector_to_index(key, row, group); }; thread_pool.scheduleOrThrowOnError(task); } + thread_pool.wait(); } From 38b5ea9066a8eae29222e26595957d022d2de26c Mon Sep 17 00:00:00 2001 From: Robert Schulze Date: Thu, 12 Sep 2024 12:43:27 +0000 Subject: [PATCH 5/7] Fix docs --- docs/en/engines/table-engines/mergetree-family/annindexes.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/engines/table-engines/mergetree-family/annindexes.md b/docs/en/engines/table-engines/mergetree-family/annindexes.md index b73700c40f4..f507e2b9f86 100644 --- a/docs/en/engines/table-engines/mergetree-family/annindexes.md +++ b/docs/en/engines/table-engines/mergetree-family/annindexes.md @@ -109,7 +109,7 @@ The vector similarity index currently does not work with per-table, non-default Vector index creation is known to be slow. To speed the process up, index creation can be parallelized. The maximum number of threads can be configured using server configuration -setting [max_build_vector_similarity_index_thread_pool_size](server-configuration-parameters/settings.md#server_configuration_parameters_max_build_vector_similarity_index_thread_pool_size). +setting [max_build_vector_similarity_index_thread_pool_size](../../../operations/server-configuration-parameters/settings.md#server_configuration_parameters_max_build_vector_similarity_index_thread_pool_size). ANN indexes are built during column insertion and merge. 
As a result, `INSERT` and `OPTIMIZE` statements will be slower than for ordinary tables. ANNIndexes are ideally used only with immutable or rarely changed data, respectively when are far more read requests than write From 9ca149a487beef10de0864f1381927ebf4514b76 Mon Sep 17 00:00:00 2001 From: Robert Schulze Date: Fri, 13 Sep 2024 11:07:09 +0000 Subject: [PATCH 6/7] Fix GWP-asan crash --- src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp b/src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp index 8a850141a67..641770b16e9 100644 --- a/src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp +++ b/src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -268,7 +269,9 @@ void updateImpl(const ColumnArray * column_array, const ColumnArray::Offsets & c throw Exception(ErrorCodes::INCORRECT_DATA, "All arrays in column with vector similarity index must have equal length"); /// Reserving space is mandatory - index->reserve(roundUpToPowerOfTwoOrZero(index->size() + rows)); + size_t max_thread_pool_size = Context::getGlobalContextInstance()->getServerSettings().max_build_vector_similarity_index_thread_pool_size; + unum::usearch::index_limits_t limits(roundUpToPowerOfTwoOrZero(index->size() + rows), max_thread_pool_size); + index->reserve(limits); /// Vector index creation is slooooow. Add the new rows in parallel. The threadpool is global to avoid oversubscription when multiple /// indexes are build simultaneously (e.g. multiple merges run at the same time). 
From 37411bf240e7c94a0e9e07c645a7ec74d6758aab Mon Sep 17 00:00:00 2001 From: Robert Schulze Date: Sun, 15 Sep 2024 15:06:14 +0000 Subject: [PATCH 7/7] Fix sizing with unconstrained thread pool size --- src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp b/src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp index 641770b16e9..bf9aad6545d 100644 --- a/src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp +++ b/src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -270,6 +271,8 @@ void updateImpl(const ColumnArray * column_array, const ColumnArray::Offsets & c /// Reserving space is mandatory size_t max_thread_pool_size = Context::getGlobalContextInstance()->getServerSettings().max_build_vector_similarity_index_thread_pool_size; + if (max_thread_pool_size == 0) + max_thread_pool_size = getNumberOfPhysicalCPUCores(); unum::usearch::index_limits_t limits(roundUpToPowerOfTwoOrZero(index->size() + rows), max_thread_pool_size); index->reserve(limits);