Mirror of https://github.com/ClickHouse/ClickHouse.git, synced 2024-11-09 17:14:47 +00:00
Merge pull request #69493 from ucasfl/vector-index-insert

Speedup insert performance of vector similarity index by parallelization

commit 1a4c7b7c61
@@ -107,6 +107,10 @@ The vector similarity index currently does not work with per-table, non-default
 [here](https://github.com/ClickHouse/ClickHouse/pull/51325#issuecomment-1605920475)). If necessary, the value must be changed in config.xml.
 :::

+Vector index creation is known to be slow. To speed the process up, index creation can be parallelized. The maximum number of threads can be
+configured using server configuration
+setting [max_build_vector_similarity_index_thread_pool_size](../../../operations/server-configuration-parameters/settings.md#server_configuration_parameters_max_build_vector_similarity_index_thread_pool_size).
+
 ANN indexes are built during column insertion and merge. As a result, `INSERT` and `OPTIMIZE` statements will be slower than for ordinary
 tables. ANNIndexes are ideally used only with immutable or rarely changed data, respectively when are far more read requests than write
 requests.
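To make the parallel build described in this documentation change concrete, here is a minimal, self-contained sketch of the idea (generic C++, not the ClickHouse implementation; the helper name buildIndexInParallel and the add_row callback are illustrative): the rows to index are split across a bounded number of worker threads, and that bound is what the new server setting controls. The actual code in this commit (shown further down) instead schedules one task per row on a shared thread pool, but the concurrency limit comes from the same setting.

    #include <algorithm>
    #include <cstddef>
    #include <thread>
    #include <vector>

    /// Split index-build work across at most `max_threads` workers (0 = use all cores).
    /// `add_row` must be safe to call concurrently, as the real index's add operation is.
    template <typename AddRow>
    void buildIndexInParallel(size_t rows, size_t max_threads, AddRow add_row)
    {
        size_t hw = std::thread::hardware_concurrency();        /// may be 0 if unknown
        size_t threads = max_threads != 0 ? max_threads : std::max<size_t>(hw, 1);
        threads = std::min(threads, std::max<size_t>(rows, 1));

        std::vector<std::thread> workers;
        for (size_t t = 0; t < threads; ++t)
            workers.emplace_back([=]
            {
                /// Each worker handles the contiguous slice of rows [begin, end).
                size_t begin = rows * t / threads;
                size_t end = rows * (t + 1) / threads;
                for (size_t row = begin; row < end; ++row)
                    add_row(row);
            });

        for (auto & worker : workers)
            worker.join();
    }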
@@ -491,6 +491,14 @@ Type: Double

 Default: 0.9

+## max_build_vector_similarity_index_thread_pool_size {#server_configuration_parameters_max_build_vector_similarity_index_thread_pool_size}
+
+The maximum number of threads to use for building vector indexes. 0 means all cores.
+
+Type: UInt64
+
+Default: 16
+
 ## cgroups_memory_usage_observer_wait_time

 Interval in seconds during which the server's maximum allowed memory consumption is adjusted by the corresponding threshold in cgroups. (see
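As a sketch of the "0 means all cores" rule documented for the new setting (illustrative only; the server itself uses its own core-detection helper, which appears later in this commit as getNumberOfPhysicalCPUCores):

    #include <algorithm>
    #include <cstddef>
    #include <thread>

    /// Effective pool size: a non-zero configured value wins; 0 falls back to the core count.
    size_t resolvePoolSize(size_t configured)   /// `configured` defaults to 16 per the setting above
    {
        if (configured != 0)
            return configured;
        size_t cores = std::thread::hardware_concurrency();   /// may report 0 when unknown
        return std::max<size_t>(cores, 1);
    }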
@@ -178,6 +178,9 @@
     M(ObjectStorageAzureThreads, "Number of threads in the AzureObjectStorage thread pool.") \
     M(ObjectStorageAzureThreadsActive, "Number of threads in the AzureObjectStorage thread pool running a task.") \
     M(ObjectStorageAzureThreadsScheduled, "Number of queued or active jobs in the AzureObjectStorage thread pool.") \
+    M(BuildVectorSimilarityIndexThreads, "Number of threads in the build vector similarity index thread pool.") \
+    M(BuildVectorSimilarityIndexThreadsActive, "Number of threads in the build vector similarity index thread pool running a task.") \
+    M(BuildVectorSimilarityIndexThreadsScheduled, "Number of queued or active jobs in the build vector similarity index thread pool.") \
     \
     M(DiskPlainRewritableAzureDirectoryMapSize, "Number of local-to-remote path entries in the 'plain_rewritable' in-memory map for AzureObjectStorage.") \
     M(DiskPlainRewritableLocalDirectoryMapSize, "Number of local-to-remote path entries in the 'plain_rewritable' in-memory map for LocalObjectStorage.") \
@@ -50,7 +50,7 @@ namespace DB
     M(UInt32, asynchronous_heavy_metrics_update_period_s, 120, "Period in seconds for updating heavy asynchronous metrics.", 0) \
     M(String, default_database, "default", "Default database name.", 0) \
     M(String, tmp_policy, "", "Policy for storage with temporary data.", 0) \
-    M(UInt64, max_temporary_data_on_disk_size, 0, "The maximum amount of storage that could be used for external aggregation, joins or sorting., ", 0) \
+    M(UInt64, max_temporary_data_on_disk_size, 0, "The maximum amount of storage that could be used for external aggregation, joins or sorting.", 0) \
     M(String, temporary_data_in_cache, "", "Cache disk name for temporary data.", 0) \
     M(UInt64, aggregate_function_group_array_max_element_size, 0xFFFFFF, "Max array element size in bytes for groupArray function. This limit is checked at serialization and help to avoid large state size.", 0) \
     M(GroupArrayActionWhenLimitReached, aggregate_function_group_array_action_when_limit_is_reached, GroupArrayActionWhenLimitReached::THROW, "Action to execute when max array element size is exceeded in groupArray: `throw` exception, or `discard` extra values", 0) \
@@ -65,6 +65,7 @@ namespace DB
     M(UInt64, async_insert_threads, 16, "Maximum number of threads to actually parse and insert data in background. Zero means asynchronous mode is disabled", 0) \
     M(Bool, async_insert_queue_flush_on_shutdown, true, "If true queue of asynchronous inserts is flushed on graceful shutdown", 0) \
     M(Bool, ignore_empty_sql_security_in_create_view_query, true, "If true, ClickHouse doesn't write defaults for empty SQL security statement in CREATE VIEW queries. This setting is only necessary for the migration period and will become obsolete in 24.4", 0) \
+    M(UInt64, max_build_vector_similarity_index_thread_pool_size, 16, "The maximum number of threads to use to build vector similarity indexes. 0 means all cores.", 0) \
     \
     /* Database Catalog */ \
     M(UInt64, database_atomic_delay_before_drop_table_sec, 8 * 60, "The delay during which a dropped table can be restored using the UNDROP statement. If DROP TABLE ran with a SYNC modifier, the setting is ignored.", 0) \
@@ -10,6 +10,7 @@
 #include <Common/SensitiveDataMasker.h>
 #include <Common/Macros.h>
 #include <Common/EventNotifier.h>
+#include <Common/getNumberOfPhysicalCPUCores.h>
 #include <Common/Stopwatch.h>
 #include <Common/formatReadable.h>
 #include <Common/Throttler.h>
@@ -121,7 +122,6 @@
 #include <Interpreters/InterpreterSelectWithUnionQuery.h>
 #include <base/defines.h>


 namespace fs = std::filesystem;

 namespace ProfileEvents
@@ -164,6 +164,9 @@ namespace CurrentMetrics
     extern const Metric TablesLoaderForegroundThreadsActive;
     extern const Metric TablesLoaderForegroundThreadsScheduled;
     extern const Metric IOWriterThreadsScheduled;
+    extern const Metric BuildVectorSimilarityIndexThreads;
+    extern const Metric BuildVectorSimilarityIndexThreadsActive;
+    extern const Metric BuildVectorSimilarityIndexThreadsScheduled;
     extern const Metric AttachedTable;
     extern const Metric AttachedView;
     extern const Metric AttachedDictionary;
@@ -297,6 +300,8 @@ struct ContextSharedPart : boost::noncopyable
     mutable std::unique_ptr<ThreadPool> load_marks_threadpool; /// Threadpool for loading marks cache.
     mutable OnceFlag prefetch_threadpool_initialized;
     mutable std::unique_ptr<ThreadPool> prefetch_threadpool; /// Threadpool for loading marks cache.
+    mutable OnceFlag build_vector_similarity_index_threadpool_initialized;
+    mutable std::unique_ptr<ThreadPool> build_vector_similarity_index_threadpool; /// Threadpool for vector-similarity index creation.
     mutable UncompressedCachePtr index_uncompressed_cache TSA_GUARDED_BY(mutex); /// The cache of decompressed blocks for MergeTree indices.
     mutable QueryCachePtr query_cache TSA_GUARDED_BY(mutex); /// Cache of query results.
     mutable MarkCachePtr index_mark_cache TSA_GUARDED_BY(mutex); /// Cache of marks in compressed files of MergeTree indices.
@@ -3297,6 +3302,21 @@ size_t Context::getPrefetchThreadpoolSize() const
     return config.getUInt(".prefetch_threadpool_pool_size", 100);
 }

+ThreadPool & Context::getBuildVectorSimilarityIndexThreadPool() const
+{
+    callOnce(shared->build_vector_similarity_index_threadpool_initialized, [&] {
+        size_t pool_size = shared->server_settings.max_build_vector_similarity_index_thread_pool_size > 0
+            ? shared->server_settings.max_build_vector_similarity_index_thread_pool_size
+            : getNumberOfPhysicalCPUCores();
+        shared->build_vector_similarity_index_threadpool = std::make_unique<ThreadPool>(
+            CurrentMetrics::BuildVectorSimilarityIndexThreads,
+            CurrentMetrics::BuildVectorSimilarityIndexThreadsActive,
+            CurrentMetrics::BuildVectorSimilarityIndexThreadsScheduled,
+            pool_size);
+    });
+    return *shared->build_vector_similarity_index_threadpool;
+}
+
 BackgroundSchedulePool & Context::getBufferFlushSchedulePool() const
 {
     callOnce(shared->buffer_flush_schedule_pool_initialized, [&] {
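getBuildVectorSimilarityIndexThreadPool above creates one process-wide pool lazily, on first use; per the comment added later in this commit, keeping the pool global avoids oversubscription when several indexes are built at the same time. A rough standard-library analogy of the once-only initialization (a sketch; OnceFlag/callOnce and ThreadPool are ClickHouse's own facilities, and SimplePool below is a hypothetical stand-in):

    #include <cstddef>
    #include <memory>
    #include <mutex>
    #include <thread>

    struct SimplePool                      /// hypothetical stand-in for the real ThreadPool
    {
        explicit SimplePool(size_t size) : max_threads(size) {}
        size_t max_threads;
    };

    std::once_flag pool_initialized;       /// plays the role of OnceFlag
    std::unique_ptr<SimplePool> pool;      /// plays the role of the shared context member

    SimplePool & getSharedPool(size_t configured_size)
    {
        /// Runs the lambda exactly once, even if many threads call this concurrently.
        std::call_once(pool_initialized, [&]
        {
            size_t size = configured_size != 0 ? configured_size : std::thread::hardware_concurrency();
            pool = std::make_unique<SimplePool>(size != 0 ? size : 1);
        });
        return *pool;
    }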
@@ -1097,6 +1097,8 @@ public:
     /// and make a prefetch by putting a read task to threadpoolReader.
     size_t getPrefetchThreadpoolSize() const;

+    ThreadPool & getBuildVectorSimilarityIndexThreadPool() const;
+
     /// Settings for MergeTree background tasks stored in config.xml
     BackgroundTaskSchedulingSettings getBackgroundProcessingTaskSchedulingSettings() const;
     BackgroundTaskSchedulingSettings getBackgroundMoveTaskSchedulingSettings() const;
@@ -5,9 +5,11 @@
 #include <Columns/ColumnArray.h>
 #include <Common/BitHelpers.h>
 #include <Common/formatReadable.h>
+#include <Common/getNumberOfPhysicalCPUCores.h>
 #include <Common/logger_useful.h>
 #include <Common/typeid_cast.h>
 #include <Core/Field.h>
+#include <Core/ServerSettings.h>
 #include <DataTypes/DataTypeArray.h>
 #include <IO/ReadHelpers.h>
 #include <IO/WriteHelpers.h>
@@ -29,7 +31,6 @@ namespace DB

 namespace ErrorCodes
 {
-    extern const int CANNOT_ALLOCATE_MEMORY;
     extern const int FORMAT_VERSION_TOO_OLD;
     extern const int ILLEGAL_COLUMN;
     extern const int INCORRECT_DATA;
@@ -131,8 +132,7 @@ void USearchIndexWithSerialization::deserialize(ReadBuffer & istr)
         /// See the comment in MergeTreeIndexGranuleVectorSimilarity::deserializeBinary why we throw here
         throw Exception(ErrorCodes::INCORRECT_DATA, "Could not load vector similarity index. Please drop the index and create it again. Error: {}", String(result.error.release()));

-    if (!try_reserve(limits()))
-        throw Exception(ErrorCodes::CANNOT_ALLOCATE_MEMORY, "Could not reserve memory for usearch index");
+    try_reserve(limits());
 }

 USearchIndexWithSerialization::Statistics USearchIndexWithSerialization::getStatistics() const
@@ -270,20 +270,49 @@ void updateImpl(const ColumnArray * column_array, const ColumnArray::Offsets & c
         throw Exception(ErrorCodes::INCORRECT_DATA, "All arrays in column with vector similarity index must have equal length");

     /// Reserving space is mandatory
-    if (!index->try_reserve(roundUpToPowerOfTwoOrZero(index->size() + rows)))
-        throw Exception(ErrorCodes::CANNOT_ALLOCATE_MEMORY, "Could not reserve memory for vector similarity index");
+    size_t max_thread_pool_size = Context::getGlobalContextInstance()->getServerSettings().max_build_vector_similarity_index_thread_pool_size;
+    if (max_thread_pool_size == 0)
+        max_thread_pool_size = getNumberOfPhysicalCPUCores();
+    unum::usearch::index_limits_t limits(roundUpToPowerOfTwoOrZero(index->size() + rows), max_thread_pool_size);
+    index->reserve(limits);

-    for (size_t row = 0; row < rows; ++row)
-    {
-        if (auto result = index->add(static_cast<USearchIndex::vector_key_t>(index->size()), &column_array_data_float_data[column_array_offsets[row - 1]]); !result)
+    /// Vector index creation is slooooow. Add the new rows in parallel. The threadpool is global to avoid oversubscription when multiple
+    /// indexes are build simultaneously (e.g. multiple merges run at the same time).
+    auto & thread_pool = Context::getGlobalContextInstance()->getBuildVectorSimilarityIndexThreadPool();
+
+    auto add_vector_to_index = [&](USearchIndex::vector_key_t key, size_t row, ThreadGroupPtr thread_group)
+    {
+        SCOPE_EXIT_SAFE(
+            if (thread_group)
+                CurrentThread::detachFromGroupIfNotDetached();
+        );
+
+        if (thread_group)
+            CurrentThread::attachToGroupIfDetached(thread_group);
+
+        /// add is thread-safe
+        if (auto result = index->add(key, &column_array_data_float_data[column_array_offsets[row - 1]]); !result)
+        {
             throw Exception(ErrorCodes::INCORRECT_DATA, "Could not add data to vector similarity index. Error: {}", String(result.error.release()));
+        }
         else
         {
             ProfileEvents::increment(ProfileEvents::USearchAddCount);
             ProfileEvents::increment(ProfileEvents::USearchAddVisitedMembers, result.visited_members);
             ProfileEvents::increment(ProfileEvents::USearchAddComputedDistances, result.computed_distances);
         }
+    };
+
+    size_t index_size = index->size();
+
+    for (size_t row = 0; row < rows; ++row)
+    {
+        auto key = static_cast<USearchIndex::vector_key_t>(index_size + row);
+        auto task = [group = CurrentThread::getGroup(), &add_vector_to_index, key, row] { add_vector_to_index(key, row, group); };
+        thread_pool.scheduleOrThrowOnError(task);
    }
+
+    thread_pool.wait();
 }

 }
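The rewritten updateImpl above boils down to three steps: reserve capacity for the existing entries plus all incoming rows (and for the worker threads), schedule one add task per row on the shared pool, then wait for every task. A self-contained sketch of that shape (std::async stands in for the ClickHouse ThreadPool, and ToyIndex is a hypothetical index whose add is made thread-safe with a mutex; the real usearch add is thread-safe on its own):

    #include <cstddef>
    #include <future>
    #include <mutex>
    #include <stdexcept>
    #include <vector>

    struct ToyIndex                        /// hypothetical stand-in for the usearch index
    {
        void reserve(size_t capacity) { keys.reserve(capacity); }

        void add(size_t key, const std::vector<float> & vec)   /// safe to call concurrently
        {
            if (vec.empty())
                throw std::runtime_error("cannot add an empty vector");
            std::lock_guard lock(mutex);
            keys.push_back(key);
        }

        size_t size() const { return keys.size(); }

        std::mutex mutex;
        std::vector<size_t> keys;
    };

    void addRowsInParallel(ToyIndex & index, const std::vector<std::vector<float>> & rows)
    {
        /// 1. Reserve up front so concurrent adds never have to grow the structure.
        index.reserve(index.size() + rows.size());

        /// 2. Schedule one task per row; each gets a unique key derived from the current size.
        size_t base_key = index.size();
        std::vector<std::future<void>> tasks;
        for (size_t row = 0; row < rows.size(); ++row)
            tasks.push_back(std::async(std::launch::async, [&index, &rows, base_key, row]
            {
                index.add(base_key + row, rows[row]);
            }));

        /// 3. Wait for everything; get() rethrows a task's exception, much like ThreadPool::wait().
        for (auto & task : tasks)
            task.get();
    }

Reserving up front mirrors the "Reserving space is mandatory" comment in the hunk: the unum::usearch::index_limits_t constructed there carries both the target capacity and the maximum number of concurrent threads before any parallel adds begin.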