Speed up inserting data into tables with a vector similarity index by adding data to the index in parallel

This commit is contained in:
flynn 2024-09-11 09:31:46 +00:00
parent 26f21a0d94
commit cf12e3924f
2 changed files with 42 additions and 3 deletions

View File

@ -178,6 +178,9 @@
M(ObjectStorageAzureThreads, "Number of threads in the AzureObjectStorage thread pool.") \ M(ObjectStorageAzureThreads, "Number of threads in the AzureObjectStorage thread pool.") \
M(ObjectStorageAzureThreadsActive, "Number of threads in the AzureObjectStorage thread pool running a task.") \ M(ObjectStorageAzureThreadsActive, "Number of threads in the AzureObjectStorage thread pool running a task.") \
M(ObjectStorageAzureThreadsScheduled, "Number of queued or active jobs in the AzureObjectStorage thread pool.") \ M(ObjectStorageAzureThreadsScheduled, "Number of queued or active jobs in the AzureObjectStorage thread pool.") \
M(UsearchUpdateThreads, "Number of threads in the thread pool that adds vectors to a vector similarity (USearch) index.") \
M(UsearchUpdateThreadsActive, "Number of threads in the thread pool that adds vectors to a vector similarity (USearch) index running a task.") \
M(UsearchUpdateThreadsScheduled, "Number of queued or active jobs in the thread pool that adds vectors to a vector similarity (USearch) index.") \
\ \
M(DiskPlainRewritableAzureDirectoryMapSize, "Number of local-to-remote path entries in the 'plain_rewritable' in-memory map for AzureObjectStorage.") \ M(DiskPlainRewritableAzureDirectoryMapSize, "Number of local-to-remote path entries in the 'plain_rewritable' in-memory map for AzureObjectStorage.") \
M(DiskPlainRewritableLocalDirectoryMapSize, "Number of local-to-remote path entries in the 'plain_rewritable' in-memory map for LocalObjectStorage.") \ M(DiskPlainRewritableLocalDirectoryMapSize, "Number of local-to-remote path entries in the 'plain_rewritable' in-memory map for LocalObjectStorage.") \

View File

@ -4,16 +4,20 @@
#include <Columns/ColumnArray.h> #include <Columns/ColumnArray.h>
#include <Common/BitHelpers.h> #include <Common/BitHelpers.h>
#include <Common/ThreadPool.h>
#include <Common/formatReadable.h> #include <Common/formatReadable.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <Common/logger_useful.h> #include <Common/logger_useful.h>
#include <Common/typeid_cast.h> #include <Common/typeid_cast.h>
#include <Core/Field.h> #include <Core/Field.h>
#include <Core/Settings.h>
#include <DataTypes/DataTypeArray.h> #include <DataTypes/DataTypeArray.h>
#include <IO/ReadHelpers.h> #include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Interpreters/castColumn.h> #include <Interpreters/castColumn.h>
namespace ProfileEvents namespace ProfileEvents
{ {
extern const Event USearchAddCount; extern const Event USearchAddCount;
@ -24,6 +28,13 @@ namespace ProfileEvents
extern const Event USearchSearchComputedDistances; extern const Event USearchSearchComputedDistances;
} }
/// Metrics handed to the ThreadPool that updateImpl() creates to add vectors to the index in parallel.
/// They are only declared here (extern); their definitions live outside this file.
namespace CurrentMetrics
{
extern const Metric UsearchUpdateThreads;
extern const Metric UsearchUpdateThreadsActive;
extern const Metric UsearchUpdateThreadsScheduled;
}
namespace DB namespace DB
{ {
@ -273,17 +284,42 @@ void updateImpl(const ColumnArray * column_array, const ColumnArray::Offsets & c
if (!index->try_reserve(roundUpToPowerOfTwoOrZero(index->size() + rows))) if (!index->try_reserve(roundUpToPowerOfTwoOrZero(index->size() + rows)))
throw Exception(ErrorCodes::CANNOT_ALLOCATE_MEMORY, "Could not reserve memory for vector similarity index"); throw Exception(ErrorCodes::CANNOT_ALLOCATE_MEMORY, "Could not reserve memory for vector similarity index");
for (size_t row = 0; row < rows; ++row) size_t max_threads = Context::getGlobalContextInstance()->getSettingsRef().max_threads;
max_threads = max_threads > 0 ? max_threads : getNumberOfPhysicalCPUCores();
auto thread_pool = std::make_unique<ThreadPool>(
CurrentMetrics::UsearchUpdateThreads,
CurrentMetrics::UsearchUpdateThreadsActive,
CurrentMetrics::UsearchUpdateThreadsScheduled,
max_threads);
auto add_vector_to_index = [&](USearchIndex::vector_key_t key, size_t row, ThreadGroupPtr thread_group)
{ {
if (auto result = index->add(static_cast<USearchIndex::vector_key_t>(index->size()), &column_array_data_float_data[column_array_offsets[row - 1]]); !result) SCOPE_EXIT_SAFE(if (thread_group) CurrentThread::detachFromGroupIfNotDetached(););
throw Exception(ErrorCodes::INCORRECT_DATA, "Could not add data to vector similarity index. Error: {}", String(result.error.release()));
if (thread_group)
CurrentThread::attachToGroupIfDetached(thread_group);
if (auto result = index->add(key, &column_array_data_float_data[column_array_offsets[row - 1]]); !result)
throw Exception(
ErrorCodes::INCORRECT_DATA, "Could not add data to vector similarity index. Error: {}", String(result.error.release()));
else else
{ {
ProfileEvents::increment(ProfileEvents::USearchAddCount); ProfileEvents::increment(ProfileEvents::USearchAddCount);
ProfileEvents::increment(ProfileEvents::USearchAddVisitedMembers, result.visited_members); ProfileEvents::increment(ProfileEvents::USearchAddVisitedMembers, result.visited_members);
ProfileEvents::increment(ProfileEvents::USearchAddComputedDistances, result.computed_distances); ProfileEvents::increment(ProfileEvents::USearchAddComputedDistances, result.computed_distances);
} }
};
size_t current_index_size = index->size();
for (size_t row = 0; row < rows; ++row)
{
auto key = static_cast<USearchIndex::vector_key_t>(current_index_size + row);
auto task = [group = CurrentThread::getGroup(), &add_vector_to_index, key, row] { add_vector_to_index(key, row, group); };
thread_pool->scheduleOrThrowOnError(task);
} }
thread_pool->wait();
} }
} }