Make vector similarity index creation thread pool globally

This commit is contained in:
flynn 2024-09-12 03:54:25 +00:00
parent 7425d4aa1a
commit 22c3b71196
4 changed files with 32 additions and 20 deletions

View File

@ -50,7 +50,7 @@ namespace DB
M(UInt32, asynchronous_heavy_metrics_update_period_s, 120, "Period in seconds for updating heavy asynchronous metrics.", 0) \ M(UInt32, asynchronous_heavy_metrics_update_period_s, 120, "Period in seconds for updating heavy asynchronous metrics.", 0) \
M(String, default_database, "default", "Default database name.", 0) \ M(String, default_database, "default", "Default database name.", 0) \
M(String, tmp_policy, "", "Policy for storage with temporary data.", 0) \ M(String, tmp_policy, "", "Policy for storage with temporary data.", 0) \
M(UInt64, max_temporary_data_on_disk_size, 0, "The maximum amount of storage that could be used for external aggregation, joins or sorting., ", 0) \ M(UInt64, max_temporary_data_on_disk_size, 0, "The maximum amount of storage that could be used for external aggregation, joins or sorting.", 0) \
M(String, temporary_data_in_cache, "", "Cache disk name for temporary data.", 0) \ M(String, temporary_data_in_cache, "", "Cache disk name for temporary data.", 0) \
M(UInt64, aggregate_function_group_array_max_element_size, 0xFFFFFF, "Max array element size in bytes for groupArray function. This limit is checked at serialization and help to avoid large state size.", 0) \ M(UInt64, aggregate_function_group_array_max_element_size, 0xFFFFFF, "Max array element size in bytes for groupArray function. This limit is checked at serialization and help to avoid large state size.", 0) \
M(GroupArrayActionWhenLimitReached, aggregate_function_group_array_action_when_limit_is_reached, GroupArrayActionWhenLimitReached::THROW, "Action to execute when max array element size is exceeded in groupArray: `throw` exception, or `discard` extra values", 0) \ M(GroupArrayActionWhenLimitReached, aggregate_function_group_array_action_when_limit_is_reached, GroupArrayActionWhenLimitReached::THROW, "Action to execute when max array element size is exceeded in groupArray: `throw` exception, or `discard` extra values", 0) \
@ -65,6 +65,7 @@ namespace DB
M(UInt64, async_insert_threads, 16, "Maximum number of threads to actually parse and insert data in background. Zero means asynchronous mode is disabled", 0) \ M(UInt64, async_insert_threads, 16, "Maximum number of threads to actually parse and insert data in background. Zero means asynchronous mode is disabled", 0) \
M(Bool, async_insert_queue_flush_on_shutdown, true, "If true queue of asynchronous inserts is flushed on graceful shutdown", 0) \ M(Bool, async_insert_queue_flush_on_shutdown, true, "If true queue of asynchronous inserts is flushed on graceful shutdown", 0) \
M(Bool, ignore_empty_sql_security_in_create_view_query, true, "If true, ClickHouse doesn't write defaults for empty SQL security statement in CREATE VIEW queries. This setting is only necessary for the migration period and will become obsolete in 24.4", 0) \ M(Bool, ignore_empty_sql_security_in_create_view_query, true, "If true, ClickHouse doesn't write defaults for empty SQL security statement in CREATE VIEW queries. This setting is only necessary for the migration period and will become obsolete in 24.4", 0) \
M(UInt64, max_thread_pool_size_for_vector_similarity_index_creation, 16, "The maximum number of threads that could be allocated from the OS and used for vector similarity index creation(0 means all cores).", 0) \
\ \
/* Database Catalog */ \ /* Database Catalog */ \
M(UInt64, database_atomic_delay_before_drop_table_sec, 8 * 60, "The delay during which a dropped table can be restored using the UNDROP statement. If DROP TABLE ran with a SYNC modifier, the setting is ignored.", 0) \ M(UInt64, database_atomic_delay_before_drop_table_sec, 8 * 60, "The delay during which a dropped table can be restored using the UNDROP statement. If DROP TABLE ran with a SYNC modifier, the setting is ignored.", 0) \

View File

@ -9,6 +9,7 @@
#include <Common/SensitiveDataMasker.h> #include <Common/SensitiveDataMasker.h>
#include <Common/Macros.h> #include <Common/Macros.h>
#include <Common/EventNotifier.h> #include <Common/EventNotifier.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <Common/Stopwatch.h> #include <Common/Stopwatch.h>
#include <Common/formatReadable.h> #include <Common/formatReadable.h>
#include <Common/Throttler.h> #include <Common/Throttler.h>
@ -120,7 +121,6 @@
#include <Interpreters/InterpreterSelectWithUnionQuery.h> #include <Interpreters/InterpreterSelectWithUnionQuery.h>
#include <base/defines.h> #include <base/defines.h>
namespace fs = std::filesystem; namespace fs = std::filesystem;
namespace ProfileEvents namespace ProfileEvents
@ -168,6 +168,9 @@ namespace CurrentMetrics
extern const Metric AttachedDictionary; extern const Metric AttachedDictionary;
extern const Metric AttachedDatabase; extern const Metric AttachedDatabase;
extern const Metric PartsActive; extern const Metric PartsActive;
extern const Metric UsearchUpdateThreads;
extern const Metric UsearchUpdateThreadsActive;
extern const Metric UsearchUpdateThreadsScheduled;
} }
@ -296,6 +299,8 @@ struct ContextSharedPart : boost::noncopyable
mutable std::unique_ptr<ThreadPool> load_marks_threadpool; /// Threadpool for loading marks cache. mutable std::unique_ptr<ThreadPool> load_marks_threadpool; /// Threadpool for loading marks cache.
mutable OnceFlag prefetch_threadpool_initialized; mutable OnceFlag prefetch_threadpool_initialized;
mutable std::unique_ptr<ThreadPool> prefetch_threadpool; /// Threadpool for loading marks cache. mutable std::unique_ptr<ThreadPool> prefetch_threadpool; /// Threadpool for loading marks cache.
mutable OnceFlag vector_similarity_index_creation_threadpool_initialized;
mutable std::unique_ptr<ThreadPool> vector_similarity_index_creation_threadpool; /// Threadpool for vector-similarity index creation.
mutable UncompressedCachePtr index_uncompressed_cache TSA_GUARDED_BY(mutex); /// The cache of decompressed blocks for MergeTree indices. mutable UncompressedCachePtr index_uncompressed_cache TSA_GUARDED_BY(mutex); /// The cache of decompressed blocks for MergeTree indices.
mutable QueryCachePtr query_cache TSA_GUARDED_BY(mutex); /// Cache of query results. mutable QueryCachePtr query_cache TSA_GUARDED_BY(mutex); /// Cache of query results.
mutable MarkCachePtr index_mark_cache TSA_GUARDED_BY(mutex); /// Cache of marks in compressed files of MergeTree indices. mutable MarkCachePtr index_mark_cache TSA_GUARDED_BY(mutex); /// Cache of marks in compressed files of MergeTree indices.
@ -3095,6 +3100,25 @@ ThreadPool & Context::getLoadMarksThreadpool() const
return *shared->load_marks_threadpool; return *shared->load_marks_threadpool;
} }
ThreadPool & Context::getVectorSimilarityIndexCreationThreadPool() const
{
callOnce(
shared->vector_similarity_index_creation_threadpool_initialized,
[&]
{
const auto & server_setting = getServerSettings();
size_t max_thread_pool_size = server_setting.max_thread_pool_size_for_vector_similarity_index_creation > 0
? server_setting.max_thread_pool_size_for_vector_similarity_index_creation
: getNumberOfPhysicalCPUCores();
shared->vector_similarity_index_creation_threadpool = std::make_unique<ThreadPool>(
CurrentMetrics::UsearchUpdateThreads,
CurrentMetrics::UsearchUpdateThreadsActive,
CurrentMetrics::UsearchUpdateThreadsScheduled,
max_thread_pool_size);
});
return *shared->vector_similarity_index_creation_threadpool;
}
void Context::setIndexUncompressedCache(const String & cache_policy, size_t max_size_in_bytes, double size_ratio) void Context::setIndexUncompressedCache(const String & cache_policy, size_t max_size_in_bytes, double size_ratio)
{ {
std::lock_guard lock(shared->mutex); std::lock_guard lock(shared->mutex);

View File

@ -1096,6 +1096,8 @@ public:
/// and make a prefetch by putting a read task to threadpoolReader. /// and make a prefetch by putting a read task to threadpoolReader.
size_t getPrefetchThreadpoolSize() const; size_t getPrefetchThreadpoolSize() const;
ThreadPool & getVectorSimilarityIndexCreationThreadPool() const;
/// Settings for MergeTree background tasks stored in config.xml /// Settings for MergeTree background tasks stored in config.xml
BackgroundTaskSchedulingSettings getBackgroundProcessingTaskSchedulingSettings() const; BackgroundTaskSchedulingSettings getBackgroundProcessingTaskSchedulingSettings() const;
BackgroundTaskSchedulingSettings getBackgroundMoveTaskSchedulingSettings() const; BackgroundTaskSchedulingSettings getBackgroundMoveTaskSchedulingSettings() const;

View File

@ -6,7 +6,6 @@
#include <Common/BitHelpers.h> #include <Common/BitHelpers.h>
#include <Common/ThreadPool.h> #include <Common/ThreadPool.h>
#include <Common/formatReadable.h> #include <Common/formatReadable.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <Common/logger_useful.h> #include <Common/logger_useful.h>
#include <Common/typeid_cast.h> #include <Common/typeid_cast.h>
#include <Core/Field.h> #include <Core/Field.h>
@ -27,13 +26,6 @@ namespace ProfileEvents
extern const Event USearchSearchComputedDistances; extern const Event USearchSearchComputedDistances;
} }
namespace CurrentMetrics
{
extern const Metric UsearchUpdateThreads;
extern const Metric UsearchUpdateThreadsActive;
extern const Metric UsearchUpdateThreadsScheduled;
}
namespace DB namespace DB
{ {
@ -283,14 +275,7 @@ void updateImpl(const ColumnArray * column_array, const ColumnArray::Offsets & c
if (!index->try_reserve(roundUpToPowerOfTwoOrZero(index->size() + rows))) if (!index->try_reserve(roundUpToPowerOfTwoOrZero(index->size() + rows)))
throw Exception(ErrorCodes::CANNOT_ALLOCATE_MEMORY, "Could not reserve memory for vector similarity index"); throw Exception(ErrorCodes::CANNOT_ALLOCATE_MEMORY, "Could not reserve memory for vector similarity index");
size_t max_threads = Context::getGlobalContextInstance()->getSettingsRef().max_threads; auto & thread_pool = Context::getGlobalContextInstance()->getVectorSimilarityIndexCreationThreadPool();
max_threads = max_threads > 0 ? max_threads : getNumberOfPhysicalCPUCores();
auto thread_pool = std::make_unique<ThreadPool>(
CurrentMetrics::UsearchUpdateThreads,
CurrentMetrics::UsearchUpdateThreadsActive,
CurrentMetrics::UsearchUpdateThreadsScheduled,
max_threads);
auto add_vector_to_index = [&](USearchIndex::vector_key_t key, size_t row, ThreadGroupPtr thread_group) auto add_vector_to_index = [&](USearchIndex::vector_key_t key, size_t row, ThreadGroupPtr thread_group)
{ {
@ -316,9 +301,9 @@ void updateImpl(const ColumnArray * column_array, const ColumnArray::Offsets & c
{ {
auto key = static_cast<USearchIndex::vector_key_t>(current_index_size + row); auto key = static_cast<USearchIndex::vector_key_t>(current_index_size + row);
auto task = [group = CurrentThread::getGroup(), &add_vector_to_index, key, row] { add_vector_to_index(key, row, group); }; auto task = [group = CurrentThread::getGroup(), &add_vector_to_index, key, row] { add_vector_to_index(key, row, group); };
thread_pool->scheduleOrThrowOnError(task); thread_pool.scheduleOrThrowOnError(task);
} }
thread_pool->wait(); thread_pool.wait();
} }
} }