mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 01:25:21 +00:00
Some fixups
This commit is contained in:
parent
f6b965872f
commit
fe5e061fff
@ -107,6 +107,10 @@ The vector similarity index currently does not work with per-table, non-default
|
|||||||
[here](https://github.com/ClickHouse/ClickHouse/pull/51325#issuecomment-1605920475)). If necessary, the value must be changed in config.xml.
|
[here](https://github.com/ClickHouse/ClickHouse/pull/51325#issuecomment-1605920475)). If necessary, the value must be changed in config.xml.
|
||||||
:::
|
:::
|
||||||
|
|
||||||
|
Vector index creation is known to be slow. To speed the process up, index creation can be parallelized. The maximum number of threads can be
|
||||||
|
configured using server configuration
|
||||||
|
setting [max_build_vector_similarity_index_thread_pool_size](server-configuration-parameters/settings.md#server_configuration_parameters_max_build_vector_similarity_index_thread_pool_size).
|
||||||
|
|
||||||
ANN indexes are built during column insertion and merge. As a result, `INSERT` and `OPTIMIZE` statements will be slower than for ordinary
|
ANN indexes are built during column insertion and merge. As a result, `INSERT` and `OPTIMIZE` statements will be slower than for ordinary
|
||||||
tables. ANNIndexes are ideally used only with immutable or rarely changed data, respectively when are far more read requests than write
|
tables. ANNIndexes are ideally used only with immutable or rarely changed data, respectively when are far more read requests than write
|
||||||
requests.
|
requests.
|
||||||
|
@ -491,6 +491,14 @@ Type: Double
|
|||||||
|
|
||||||
Default: 0.9
|
Default: 0.9
|
||||||
|
|
||||||
|
## max_build_vector_similarity_index_thread_pool_size {#server_configuration_parameters_max_build_vector_similarity_index_thread_pool_size}
|
||||||
|
|
||||||
|
The maximum number of threads to use for building vector indexes. 0 means all cores.
|
||||||
|
|
||||||
|
Type: UInt64
|
||||||
|
|
||||||
|
Default: 16
|
||||||
|
|
||||||
## cgroups_memory_usage_observer_wait_time
|
## cgroups_memory_usage_observer_wait_time
|
||||||
|
|
||||||
Interval in seconds during which the server's maximum allowed memory consumption is adjusted by the corresponding threshold in cgroups. (see
|
Interval in seconds during which the server's maximum allowed memory consumption is adjusted by the corresponding threshold in cgroups. (see
|
||||||
|
@ -178,9 +178,9 @@
|
|||||||
M(ObjectStorageAzureThreads, "Number of threads in the AzureObjectStorage thread pool.") \
|
M(ObjectStorageAzureThreads, "Number of threads in the AzureObjectStorage thread pool.") \
|
||||||
M(ObjectStorageAzureThreadsActive, "Number of threads in the AzureObjectStorage thread pool running a task.") \
|
M(ObjectStorageAzureThreadsActive, "Number of threads in the AzureObjectStorage thread pool running a task.") \
|
||||||
M(ObjectStorageAzureThreadsScheduled, "Number of queued or active jobs in the AzureObjectStorage thread pool.") \
|
M(ObjectStorageAzureThreadsScheduled, "Number of queued or active jobs in the AzureObjectStorage thread pool.") \
|
||||||
M(UsearchUpdateThreads, "Number of threads in the Aggregator thread pool.") \
|
M(BuildVectorSimilarityIndexThreads, "Number of threads in the build vector similarity index thread pool.") \
|
||||||
M(UsearchUpdateThreadsActive, "Number of threads in the Aggregator thread pool running a task.") \
|
M(BuildVectorSimilarityIndexThreadsActive, "Number of threads in the build vector similarity index thread pool running a task.") \
|
||||||
M(UsearchUpdateThreadsScheduled, "Number of queued or active jobs in the Aggregator thread pool.") \
|
M(BuildVectorSimilarityIndexThreadsScheduled, "Number of queued or active jobs in the build vector similarity index thread pool.") \
|
||||||
\
|
\
|
||||||
M(DiskPlainRewritableAzureDirectoryMapSize, "Number of local-to-remote path entries in the 'plain_rewritable' in-memory map for AzureObjectStorage.") \
|
M(DiskPlainRewritableAzureDirectoryMapSize, "Number of local-to-remote path entries in the 'plain_rewritable' in-memory map for AzureObjectStorage.") \
|
||||||
M(DiskPlainRewritableLocalDirectoryMapSize, "Number of local-to-remote path entries in the 'plain_rewritable' in-memory map for LocalObjectStorage.") \
|
M(DiskPlainRewritableLocalDirectoryMapSize, "Number of local-to-remote path entries in the 'plain_rewritable' in-memory map for LocalObjectStorage.") \
|
||||||
|
@ -65,7 +65,7 @@ namespace DB
|
|||||||
M(UInt64, async_insert_threads, 16, "Maximum number of threads to actually parse and insert data in background. Zero means asynchronous mode is disabled", 0) \
|
M(UInt64, async_insert_threads, 16, "Maximum number of threads to actually parse and insert data in background. Zero means asynchronous mode is disabled", 0) \
|
||||||
M(Bool, async_insert_queue_flush_on_shutdown, true, "If true queue of asynchronous inserts is flushed on graceful shutdown", 0) \
|
M(Bool, async_insert_queue_flush_on_shutdown, true, "If true queue of asynchronous inserts is flushed on graceful shutdown", 0) \
|
||||||
M(Bool, ignore_empty_sql_security_in_create_view_query, true, "If true, ClickHouse doesn't write defaults for empty SQL security statement in CREATE VIEW queries. This setting is only necessary for the migration period and will become obsolete in 24.4", 0) \
|
M(Bool, ignore_empty_sql_security_in_create_view_query, true, "If true, ClickHouse doesn't write defaults for empty SQL security statement in CREATE VIEW queries. This setting is only necessary for the migration period and will become obsolete in 24.4", 0) \
|
||||||
M(UInt64, max_thread_pool_size_for_vector_similarity_index_creation, 16, "The maximum number of threads that could be allocated from the OS and used for vector similarity index creation(0 means all cores).", 0) \
|
M(UInt64, max_build_vector_similarity_index_thread_pool_size, 16, "The maximum number of threads to use to build vector similarity indexes. 0 means all cores.", 0) \
|
||||||
\
|
\
|
||||||
/* Database Catalog */ \
|
/* Database Catalog */ \
|
||||||
M(UInt64, database_atomic_delay_before_drop_table_sec, 8 * 60, "The delay during which a dropped table can be restored using the UNDROP statement. If DROP TABLE ran with a SYNC modifier, the setting is ignored.", 0) \
|
M(UInt64, database_atomic_delay_before_drop_table_sec, 8 * 60, "The delay during which a dropped table can be restored using the UNDROP statement. If DROP TABLE ran with a SYNC modifier, the setting is ignored.", 0) \
|
||||||
|
@ -164,14 +164,14 @@ namespace CurrentMetrics
|
|||||||
extern const Metric TablesLoaderForegroundThreadsActive;
|
extern const Metric TablesLoaderForegroundThreadsActive;
|
||||||
extern const Metric TablesLoaderForegroundThreadsScheduled;
|
extern const Metric TablesLoaderForegroundThreadsScheduled;
|
||||||
extern const Metric IOWriterThreadsScheduled;
|
extern const Metric IOWriterThreadsScheduled;
|
||||||
|
extern const Metric BuildVectorSimilarityIndexThreads;
|
||||||
|
extern const Metric BuildVectorSimilarityIndexThreadsActive;
|
||||||
|
extern const Metric BuildVectorSimilarityIndexThreadsScheduled;
|
||||||
extern const Metric AttachedTable;
|
extern const Metric AttachedTable;
|
||||||
extern const Metric AttachedView;
|
extern const Metric AttachedView;
|
||||||
extern const Metric AttachedDictionary;
|
extern const Metric AttachedDictionary;
|
||||||
extern const Metric AttachedDatabase;
|
extern const Metric AttachedDatabase;
|
||||||
extern const Metric PartsActive;
|
extern const Metric PartsActive;
|
||||||
extern const Metric UsearchUpdateThreads;
|
|
||||||
extern const Metric UsearchUpdateThreadsActive;
|
|
||||||
extern const Metric UsearchUpdateThreadsScheduled;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -300,8 +300,8 @@ struct ContextSharedPart : boost::noncopyable
|
|||||||
mutable std::unique_ptr<ThreadPool> load_marks_threadpool; /// Threadpool for loading marks cache.
|
mutable std::unique_ptr<ThreadPool> load_marks_threadpool; /// Threadpool for loading marks cache.
|
||||||
mutable OnceFlag prefetch_threadpool_initialized;
|
mutable OnceFlag prefetch_threadpool_initialized;
|
||||||
mutable std::unique_ptr<ThreadPool> prefetch_threadpool; /// Threadpool for loading marks cache.
|
mutable std::unique_ptr<ThreadPool> prefetch_threadpool; /// Threadpool for loading marks cache.
|
||||||
mutable OnceFlag vector_similarity_index_creation_threadpool_initialized;
|
mutable OnceFlag build_vector_similarity_index_threadpool_initialized;
|
||||||
mutable std::unique_ptr<ThreadPool> vector_similarity_index_creation_threadpool; /// Threadpool for vector-similarity index creation.
|
mutable std::unique_ptr<ThreadPool> build_vector_similarity_index_threadpool; /// Threadpool for vector-similarity index creation.
|
||||||
mutable UncompressedCachePtr index_uncompressed_cache TSA_GUARDED_BY(mutex); /// The cache of decompressed blocks for MergeTree indices.
|
mutable UncompressedCachePtr index_uncompressed_cache TSA_GUARDED_BY(mutex); /// The cache of decompressed blocks for MergeTree indices.
|
||||||
mutable QueryCachePtr query_cache TSA_GUARDED_BY(mutex); /// Cache of query results.
|
mutable QueryCachePtr query_cache TSA_GUARDED_BY(mutex); /// Cache of query results.
|
||||||
mutable MarkCachePtr index_mark_cache TSA_GUARDED_BY(mutex); /// Cache of marks in compressed files of MergeTree indices.
|
mutable MarkCachePtr index_mark_cache TSA_GUARDED_BY(mutex); /// Cache of marks in compressed files of MergeTree indices.
|
||||||
@ -3101,25 +3101,6 @@ ThreadPool & Context::getLoadMarksThreadpool() const
|
|||||||
return *shared->load_marks_threadpool;
|
return *shared->load_marks_threadpool;
|
||||||
}
|
}
|
||||||
|
|
||||||
ThreadPool & Context::getVectorSimilarityIndexCreationThreadPool() const
|
|
||||||
{
|
|
||||||
callOnce(
|
|
||||||
shared->vector_similarity_index_creation_threadpool_initialized,
|
|
||||||
[&]
|
|
||||||
{
|
|
||||||
const auto & server_setting = getServerSettings();
|
|
||||||
size_t max_thread_pool_size = server_setting.max_thread_pool_size_for_vector_similarity_index_creation > 0
|
|
||||||
? server_setting.max_thread_pool_size_for_vector_similarity_index_creation
|
|
||||||
: getNumberOfPhysicalCPUCores();
|
|
||||||
shared->vector_similarity_index_creation_threadpool = std::make_unique<ThreadPool>(
|
|
||||||
CurrentMetrics::UsearchUpdateThreads,
|
|
||||||
CurrentMetrics::UsearchUpdateThreadsActive,
|
|
||||||
CurrentMetrics::UsearchUpdateThreadsScheduled,
|
|
||||||
max_thread_pool_size);
|
|
||||||
});
|
|
||||||
return *shared->vector_similarity_index_creation_threadpool;
|
|
||||||
}
|
|
||||||
|
|
||||||
void Context::setIndexUncompressedCache(const String & cache_policy, size_t max_size_in_bytes, double size_ratio)
|
void Context::setIndexUncompressedCache(const String & cache_policy, size_t max_size_in_bytes, double size_ratio)
|
||||||
{
|
{
|
||||||
std::lock_guard lock(shared->mutex);
|
std::lock_guard lock(shared->mutex);
|
||||||
@ -3321,6 +3302,21 @@ size_t Context::getPrefetchThreadpoolSize() const
|
|||||||
return config.getUInt(".prefetch_threadpool_pool_size", 100);
|
return config.getUInt(".prefetch_threadpool_pool_size", 100);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ThreadPool & Context::getBuildVectorSimilarityIndexThreadPool() const
|
||||||
|
{
|
||||||
|
callOnce(shared->build_vector_similarity_index_threadpool_initialized, [&] {
|
||||||
|
size_t pool_size = shared->server_settings.max_build_vector_similarity_index_thread_pool_size > 0
|
||||||
|
? shared->server_settings.max_build_vector_similarity_index_thread_pool_size
|
||||||
|
: getNumberOfPhysicalCPUCores();
|
||||||
|
shared->build_vector_similarity_index_threadpool = std::make_unique<ThreadPool>(
|
||||||
|
CurrentMetrics::BuildVectorSimilarityIndexThreads,
|
||||||
|
CurrentMetrics::BuildVectorSimilarityIndexThreadsActive,
|
||||||
|
CurrentMetrics::BuildVectorSimilarityIndexThreadsScheduled,
|
||||||
|
pool_size);
|
||||||
|
});
|
||||||
|
return *shared->build_vector_similarity_index_threadpool;
|
||||||
|
}
|
||||||
|
|
||||||
BackgroundSchedulePool & Context::getBufferFlushSchedulePool() const
|
BackgroundSchedulePool & Context::getBufferFlushSchedulePool() const
|
||||||
{
|
{
|
||||||
callOnce(shared->buffer_flush_schedule_pool_initialized, [&] {
|
callOnce(shared->buffer_flush_schedule_pool_initialized, [&] {
|
||||||
|
@ -1097,7 +1097,7 @@ public:
|
|||||||
/// and make a prefetch by putting a read task to threadpoolReader.
|
/// and make a prefetch by putting a read task to threadpoolReader.
|
||||||
size_t getPrefetchThreadpoolSize() const;
|
size_t getPrefetchThreadpoolSize() const;
|
||||||
|
|
||||||
ThreadPool & getVectorSimilarityIndexCreationThreadPool() const;
|
ThreadPool & getBuildVectorSimilarityIndexThreadPool() const;
|
||||||
|
|
||||||
/// Settings for MergeTree background tasks stored in config.xml
|
/// Settings for MergeTree background tasks stored in config.xml
|
||||||
BackgroundTaskSchedulingSettings getBackgroundProcessingTaskSchedulingSettings() const;
|
BackgroundTaskSchedulingSettings getBackgroundProcessingTaskSchedulingSettings() const;
|
||||||
|
@ -4,12 +4,10 @@
|
|||||||
|
|
||||||
#include <Columns/ColumnArray.h>
|
#include <Columns/ColumnArray.h>
|
||||||
#include <Common/BitHelpers.h>
|
#include <Common/BitHelpers.h>
|
||||||
#include <Common/ThreadPool.h>
|
|
||||||
#include <Common/formatReadable.h>
|
#include <Common/formatReadable.h>
|
||||||
#include <Common/logger_useful.h>
|
#include <Common/logger_useful.h>
|
||||||
#include <Common/typeid_cast.h>
|
#include <Common/typeid_cast.h>
|
||||||
#include <Core/Field.h>
|
#include <Core/Field.h>
|
||||||
#include <Core/Settings.h>
|
|
||||||
#include <DataTypes/DataTypeArray.h>
|
#include <DataTypes/DataTypeArray.h>
|
||||||
#include <IO/ReadHelpers.h>
|
#include <IO/ReadHelpers.h>
|
||||||
#include <IO/WriteHelpers.h>
|
#include <IO/WriteHelpers.h>
|
||||||
@ -31,7 +29,6 @@ namespace DB
|
|||||||
|
|
||||||
namespace ErrorCodes
|
namespace ErrorCodes
|
||||||
{
|
{
|
||||||
extern const int CANNOT_ALLOCATE_MEMORY;
|
|
||||||
extern const int FORMAT_VERSION_TOO_OLD;
|
extern const int FORMAT_VERSION_TOO_OLD;
|
||||||
extern const int ILLEGAL_COLUMN;
|
extern const int ILLEGAL_COLUMN;
|
||||||
extern const int INCORRECT_DATA;
|
extern const int INCORRECT_DATA;
|
||||||
@ -133,8 +130,7 @@ void USearchIndexWithSerialization::deserialize(ReadBuffer & istr)
|
|||||||
/// See the comment in MergeTreeIndexGranuleVectorSimilarity::deserializeBinary why we throw here
|
/// See the comment in MergeTreeIndexGranuleVectorSimilarity::deserializeBinary why we throw here
|
||||||
throw Exception(ErrorCodes::INCORRECT_DATA, "Could not load vector similarity index. Please drop the index and create it again. Error: {}", String(result.error.release()));
|
throw Exception(ErrorCodes::INCORRECT_DATA, "Could not load vector similarity index. Please drop the index and create it again. Error: {}", String(result.error.release()));
|
||||||
|
|
||||||
if (!try_reserve(limits()))
|
try_reserve(limits());
|
||||||
throw Exception(ErrorCodes::CANNOT_ALLOCATE_MEMORY, "Could not reserve memory for usearch index");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
USearchIndexWithSerialization::Statistics USearchIndexWithSerialization::getStatistics() const
|
USearchIndexWithSerialization::Statistics USearchIndexWithSerialization::getStatistics() const
|
||||||
@ -272,21 +268,27 @@ void updateImpl(const ColumnArray * column_array, const ColumnArray::Offsets & c
|
|||||||
throw Exception(ErrorCodes::INCORRECT_DATA, "All arrays in column with vector similarity index must have equal length");
|
throw Exception(ErrorCodes::INCORRECT_DATA, "All arrays in column with vector similarity index must have equal length");
|
||||||
|
|
||||||
/// Reserving space is mandatory
|
/// Reserving space is mandatory
|
||||||
if (!index->try_reserve(roundUpToPowerOfTwoOrZero(index->size() + rows)))
|
index->reserve(roundUpToPowerOfTwoOrZero(index->size() + rows));
|
||||||
throw Exception(ErrorCodes::CANNOT_ALLOCATE_MEMORY, "Could not reserve memory for vector similarity index");
|
|
||||||
|
|
||||||
auto & thread_pool = Context::getGlobalContextInstance()->getVectorSimilarityIndexCreationThreadPool();
|
/// Vector index creation is slooooow. Add the new rows in parallel. The threadpool is global to avoid oversubscription when multiple
|
||||||
|
/// indexes are build simultaneously (e.g. multiple merges run at the same time).
|
||||||
|
auto & thread_pool = Context::getGlobalContextInstance()->getBuildVectorSimilarityIndexThreadPool();
|
||||||
|
|
||||||
auto add_vector_to_index = [&](USearchIndex::vector_key_t key, size_t row, ThreadGroupPtr thread_group)
|
auto add_vector_to_index = [&](USearchIndex::vector_key_t key, size_t row, ThreadGroupPtr thread_group)
|
||||||
{
|
{
|
||||||
SCOPE_EXIT_SAFE(if (thread_group) CurrentThread::detachFromGroupIfNotDetached(););
|
SCOPE_EXIT_SAFE(
|
||||||
|
if (thread_group)
|
||||||
|
CurrentThread::detachFromGroupIfNotDetached();
|
||||||
|
);
|
||||||
|
|
||||||
if (thread_group)
|
if (thread_group)
|
||||||
CurrentThread::attachToGroupIfDetached(thread_group);
|
CurrentThread::attachToGroupIfDetached(thread_group);
|
||||||
|
|
||||||
|
/// add is thread-safe
|
||||||
if (auto result = index->add(key, &column_array_data_float_data[column_array_offsets[row - 1]]); !result)
|
if (auto result = index->add(key, &column_array_data_float_data[column_array_offsets[row - 1]]); !result)
|
||||||
throw Exception(
|
{
|
||||||
ErrorCodes::INCORRECT_DATA, "Could not add data to vector similarity index. Error: {}", String(result.error.release()));
|
throw Exception(ErrorCodes::INCORRECT_DATA, "Could not add data to vector similarity index. Error: {}", String(result.error.release()));
|
||||||
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
ProfileEvents::increment(ProfileEvents::USearchAddCount);
|
ProfileEvents::increment(ProfileEvents::USearchAddCount);
|
||||||
@ -295,14 +297,15 @@ void updateImpl(const ColumnArray * column_array, const ColumnArray::Offsets & c
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
size_t current_index_size = index->size();
|
size_t index_size = index->size();
|
||||||
|
|
||||||
for (size_t row = 0; row < rows; ++row)
|
for (size_t row = 0; row < rows; ++row)
|
||||||
{
|
{
|
||||||
auto key = static_cast<USearchIndex::vector_key_t>(current_index_size + row);
|
auto key = static_cast<USearchIndex::vector_key_t>(index_size + row);
|
||||||
auto task = [group = CurrentThread::getGroup(), &add_vector_to_index, key, row] { add_vector_to_index(key, row, group); };
|
auto task = [group = CurrentThread::getGroup(), &add_vector_to_index, key, row] { add_vector_to_index(key, row, group); };
|
||||||
thread_pool.scheduleOrThrowOnError(task);
|
thread_pool.scheduleOrThrowOnError(task);
|
||||||
}
|
}
|
||||||
|
|
||||||
thread_pool.wait();
|
thread_pool.wait();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user