From cf12e3924f51fd08c884519352a46495412a771a Mon Sep 17 00:00:00 2001 From: flynn Date: Wed, 11 Sep 2024 09:31:46 +0000 Subject: [PATCH] Speed up inserting data with a vector similarity index by adding data to the index in parallel --- src/Common/CurrentMetrics.cpp | 3 ++ .../MergeTreeIndexVectorSimilarity.cpp | 42 +++++++++++++++++-- 2 files changed, 42 insertions(+), 3 deletions(-) diff --git a/src/Common/CurrentMetrics.cpp b/src/Common/CurrentMetrics.cpp index 4bf2b0704f1..1d37b4761e1 100644 --- a/src/Common/CurrentMetrics.cpp +++ b/src/Common/CurrentMetrics.cpp @@ -178,6 +178,9 @@ M(ObjectStorageAzureThreads, "Number of threads in the AzureObjectStorage thread pool.") \ M(ObjectStorageAzureThreadsActive, "Number of threads in the AzureObjectStorage thread pool running a task.") \ M(ObjectStorageAzureThreadsScheduled, "Number of queued or active jobs in the AzureObjectStorage thread pool.") \ + M(UsearchUpdateThreads, "Number of threads in the vector similarity index update thread pool.") \ + M(UsearchUpdateThreadsActive, "Number of threads in the vector similarity index update thread pool running a task.") \ + M(UsearchUpdateThreadsScheduled, "Number of queued or active jobs in the vector similarity index update thread pool.") \ \ M(DiskPlainRewritableAzureDirectoryMapSize, "Number of local-to-remote path entries in the 'plain_rewritable' in-memory map for AzureObjectStorage.") \ M(DiskPlainRewritableLocalDirectoryMapSize, "Number of local-to-remote path entries in the 'plain_rewritable' in-memory map for LocalObjectStorage.") \ diff --git a/src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp b/src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp index 58892d0dbf2..ad166839ce3 100644 --- a/src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp +++ b/src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp @@ -4,16 +4,20 @@ #include #include +#include #include +#include #include #include #include +#include #include #include #include #include #include + namespace ProfileEvents { extern const Event USearchAddCount; @@ 
-24,6 +28,13 @@ namespace ProfileEvents extern const Event USearchSearchComputedDistances; } +namespace CurrentMetrics +{ +extern const Metric UsearchUpdateThreads; +extern const Metric UsearchUpdateThreadsActive; +extern const Metric UsearchUpdateThreadsScheduled; +} + namespace DB { @@ -273,17 +284,42 @@ void updateImpl(const ColumnArray * column_array, const ColumnArray::Offsets & c if (!index->try_reserve(roundUpToPowerOfTwoOrZero(index->size() + rows))) throw Exception(ErrorCodes::CANNOT_ALLOCATE_MEMORY, "Could not reserve memory for vector similarity index"); - for (size_t row = 0; row < rows; ++row) + size_t max_threads = Context::getGlobalContextInstance()->getSettingsRef().max_threads; + max_threads = max_threads > 0 ? max_threads : getNumberOfPhysicalCPUCores(); + + auto thread_pool = std::make_unique( + CurrentMetrics::UsearchUpdateThreads, + CurrentMetrics::UsearchUpdateThreadsActive, + CurrentMetrics::UsearchUpdateThreadsScheduled, + max_threads); + + auto add_vector_to_index = [&](USearchIndex::vector_key_t key, size_t row, ThreadGroupPtr thread_group) { - if (auto result = index->add(static_cast(index->size()), &column_array_data_float_data[column_array_offsets[row - 1]]); !result) - throw Exception(ErrorCodes::INCORRECT_DATA, "Could not add data to vector similarity index. Error: {}", String(result.error.release())); + SCOPE_EXIT_SAFE(if (thread_group) CurrentThread::detachFromGroupIfNotDetached();); + + if (thread_group) + CurrentThread::attachToGroupIfDetached(thread_group); + + if (auto result = index->add(key, &column_array_data_float_data[column_array_offsets[row - 1]]); !result) + throw Exception( + ErrorCodes::INCORRECT_DATA, "Could not add data to vector similarity index. 
Error: {}", String(result.error.release())); else { ProfileEvents::increment(ProfileEvents::USearchAddCount); ProfileEvents::increment(ProfileEvents::USearchAddVisitedMembers, result.visited_members); ProfileEvents::increment(ProfileEvents::USearchAddComputedDistances, result.computed_distances); } + }; + + size_t current_index_size = index->size(); + + for (size_t row = 0; row < rows; ++row) + { + auto key = static_cast(current_index_size + row); + auto task = [group = CurrentThread::getGroup(), &add_vector_to_index, key, row] { add_vector_to_index(key, row, group); }; + thread_pool->scheduleOrThrowOnError(task); } + thread_pool->wait(); } }