Compare commits

...

13 Commits

Author | SHA1 | Message | Date
Robert Schulze | 06c1a4c019 | Merge 7cc4f8460f into 44b4bd38b9 | 2024-11-20 15:25:26 -08:00
Mikhail Artemenko | 44b4bd38b9 | Merge pull request #72045 from ClickHouse/issues/70174/cluster_versions (Enable cluster table functions for DataLake Storages) | 2024-11-20 21:22:37 +00:00
Mikhail Artemenko | 4ccebd9a24 | fix syntax for iceberg in docs | 2024-11-20 11:15:39 +00:00
Mikhail Artemenko | 99177c0daf | remove icebergCluster alias | 2024-11-20 11:15:12 +00:00
Mikhail Artemenko | 0951991c1d | update aspell-dict.txt | 2024-11-19 13:10:42 +00:00
Mikhail Artemenko | 19aec5e572 | Merge branch 'issues/70174/cluster_versions' of github.com:ClickHouse/ClickHouse into issues/70174/cluster_versions | 2024-11-19 12:51:56 +00:00
Mikhail Artemenko | a367de9977 | add docs | 2024-11-19 12:49:59 +00:00
Mikhail Artemenko | 6894e280b2 | fix pr issues | 2024-11-19 12:34:42 +00:00
Mikhail Artemenko | 39ebe113d9 | Merge branch 'master' into issues/70174/cluster_versions | 2024-11-19 11:28:46 +00:00
Robert Schulze | 7cc4f8460f | Scalar quantization for i8 | 2024-11-19 10:29:48 +00:00
robot-clickhouse | 014608fb6b | Automatic style fix | 2024-11-18 17:51:51 +00:00
Mikhail Artemenko | a29ded4941 | add test for iceberg | 2024-11-18 17:39:46 +00:00
Mikhail Artemenko | d2efae7511 | enable cluster versions for datalake storages | 2024-11-18 17:35:21 +00:00
26 changed files with 737 additions and 62 deletions

contrib/SimSIMD vendored

@@ -1 +1 @@
-Subproject commit fa60f1b8e3582c50978f0ae86c2ebb6c9af957f3
+Subproject commit da2d38537299ade247c2499131d936fb8db38f03

contrib/usearch vendored

@@ -1 +1 @@
-Subproject commit 7efe8b710c9831bfe06573b1df0fad001b04a2b5
+Subproject commit 9561fcae1249ea8effbf71250e8a7a7ea97e5dfe

View File

@@ -93,7 +93,10 @@ Vector similarity indexes currently support two distance functions:
 ([Wikipedia](https://en.wikipedia.org/wiki/Cosine_similarity)).

 Vector similarity indexes allows storing the vectors in reduced precision formats. Supported scalar kinds are `f64`, `f32`, `f16`, `bf16`,
-and `i8`. If no scalar kind was specified during index creation, `bf16` is used as default.
+and `i8`. If no scalar kind was specified during index creation, `bf16` is used as default. For scalar kinds `f64`, `f32`, and `bf16`, the
+values are simply downsampled. For `i8`, the values are mapped to range [-127, 127]. To improve precision, scalar quantization is applied to
+the uncompressed values. The quantization quantile can be specified using MergeTree setting
+`scalar_quantization_quantile_for_vector_similarity_index` (default: 0.99).

 For normalized data, `L2Distance` is usually a better choice, otherwise `cosineDistance` is recommended to compensate for scale. If no
 distance function was specified during index creation, `L2Distance` is used as default.
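
For reference, creating an index that exercises the `i8` path together with the new MergeTree settings could look like the following sketch. Table and column names, the 0.95 quantile, and the buffer size are illustrative; the five-argument index syntax mirrors the SQL test added later in this PR:

```sql
SET allow_experimental_vector_similarity_index = 1;

-- Illustrative values only; the trailing (0, 0) keeps the default HNSW parameters.
CREATE TABLE vectors
(
    id UInt64,
    vec Array(Float32),
    INDEX idx vec TYPE vector_similarity('hnsw', 'cosineDistance', 'i8', 0, 0) GRANULARITY 4
)
ENGINE = MergeTree
ORDER BY id
SETTINGS scalar_quantization_quantile_for_vector_similarity_index = 0.95,
         scalar_quantization_buffer_size_for_vector_similarity_index = 10000;
```

Per the implementation changes below, scalar quantization is only applied for the `i8` scalar kind combined with `cosineDistance`; setting the buffer size to 0 disables it.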

View File

@@ -49,4 +49,4 @@ LIMIT 2
 **See Also**

 - [DeltaLake engine](/docs/en/engines/table-engines/integrations/deltalake.md)
+- [DeltaLake cluster table function](/docs/en/sql-reference/table-functions/deltalakeCluster.md)

View File

@@ -0,0 +1,30 @@
---
slug: /en/sql-reference/table-functions/deltalakeCluster
sidebar_position: 46
sidebar_label: deltaLakeCluster
title: "deltaLakeCluster Table Function"
---

This is an extension to the [deltaLake](/docs/en/sql-reference/table-functions/deltalake.md) table function.

Allows processing files from [Delta Lake](https://github.com/delta-io/delta) tables in Amazon S3 in parallel from many nodes in a specified cluster. On initiator it creates a connection to all nodes in the cluster and dispatches each file dynamically. On the worker node it asks the initiator about the next task to process and processes it. This is repeated until all tasks are finished.

**Syntax**

``` sql
deltaLakeCluster(cluster_name, url [,aws_access_key_id, aws_secret_access_key] [,format] [,structure] [,compression])
```

**Arguments**

- `cluster_name` — Name of a cluster that is used to build a set of addresses and connection parameters to remote and local servers.
- Description of all other arguments coincides with description of arguments in equivalent [deltaLake](/docs/en/sql-reference/table-functions/deltalake.md) table function.

**Returned value**

A table with the specified structure for reading data from cluster in the specified Delta Lake table in S3.

**See Also**

- [deltaLake engine](/docs/en/engines/table-engines/integrations/deltalake.md)
- [deltaLake table function](/docs/en/sql-reference/table-functions/deltalake.md)
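
The new page stops short of a usage example; a hypothetical query following the syntax above (cluster name, bucket URL, and credentials are placeholders) could look like:

```sql
-- Placeholders only: substitute your own cluster, Delta Lake table URL, and credentials.
SELECT *
FROM deltaLakeCluster('cluster_simple', 'https://example-bucket.s3.amazonaws.com/delta_table/', 'ABC123', 'Abc+123')
LIMIT 2;
```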

View File

@@ -29,4 +29,4 @@ A table with the specified structure for reading data in the specified Hudi tabl
 **See Also**

 - [Hudi engine](/docs/en/engines/table-engines/integrations/hudi.md)
+- [Hudi cluster table function](/docs/en/sql-reference/table-functions/hudiCluster.md)

View File

@@ -0,0 +1,30 @@
---
slug: /en/sql-reference/table-functions/hudiCluster
sidebar_position: 86
sidebar_label: hudiCluster
title: "hudiCluster Table Function"
---

This is an extension to the [hudi](/docs/en/sql-reference/table-functions/hudi.md) table function.

Allows processing files from Apache [Hudi](https://hudi.apache.org/) tables in Amazon S3 in parallel from many nodes in a specified cluster. On initiator it creates a connection to all nodes in the cluster and dispatches each file dynamically. On the worker node it asks the initiator about the next task to process and processes it. This is repeated until all tasks are finished.

**Syntax**

``` sql
hudiCluster(cluster_name, url [,aws_access_key_id, aws_secret_access_key] [,format] [,structure] [,compression])
```

**Arguments**

- `cluster_name` — Name of a cluster that is used to build a set of addresses and connection parameters to remote and local servers.
- Description of all other arguments coincides with description of arguments in equivalent [hudi](/docs/en/sql-reference/table-functions/hudi.md) table function.

**Returned value**

A table with the specified structure for reading data from cluster in the specified Hudi table in S3.

**See Also**

- [Hudi engine](/docs/en/engines/table-engines/integrations/hudi.md)
- [Hudi table function](/docs/en/sql-reference/table-functions/hudi.md)
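
As with the Delta Lake counterpart, a hypothetical query following this syntax (all identifiers and credentials are placeholders) could be:

```sql
-- Placeholders only: substitute your own cluster, Hudi table URL, and credentials.
SELECT *
FROM hudiCluster('cluster_simple', 'https://example-bucket.s3.amazonaws.com/hudi_table/', 'ABC123', 'Abc+123')
LIMIT 2;
```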

View File

@@ -72,3 +72,4 @@ Table function `iceberg` is an alias to `icebergS3` now.
 **See Also**

 - [Iceberg engine](/docs/en/engines/table-engines/integrations/iceberg.md)
+- [Iceberg cluster table function](/docs/en/sql-reference/table-functions/icebergCluster.md)

View File

@@ -0,0 +1,43 @@
---
slug: /en/sql-reference/table-functions/icebergCluster
sidebar_position: 91
sidebar_label: icebergCluster
title: "icebergCluster Table Function"
---

This is an extension to the [iceberg](/docs/en/sql-reference/table-functions/iceberg.md) table function.

Allows processing files from Apache [Iceberg](https://iceberg.apache.org/) in parallel from many nodes in a specified cluster. On initiator it creates a connection to all nodes in the cluster and dispatches each file dynamically. On the worker node it asks the initiator about the next task to process and processes it. This is repeated until all tasks are finished.

**Syntax**

``` sql
icebergS3Cluster(cluster_name, url [, NOSIGN | access_key_id, secret_access_key, [session_token]] [,format] [,compression_method])
icebergS3Cluster(cluster_name, named_collection[, option=value [,..]])

icebergAzureCluster(cluster_name, connection_string|storage_account_url, container_name, blobpath, [,account_name], [,account_key] [,format] [,compression_method])
icebergAzureCluster(cluster_name, named_collection[, option=value [,..]])

icebergHDFSCluster(cluster_name, path_to_table, [,format] [,compression_method])
icebergHDFSCluster(cluster_name, named_collection[, option=value [,..]])
```

**Arguments**

- `cluster_name` — Name of a cluster that is used to build a set of addresses and connection parameters to remote and local servers.
- Description of all other arguments coincides with description of arguments in equivalent [iceberg](/docs/en/sql-reference/table-functions/iceberg.md) table function.

**Returned value**

A table with the specified structure for reading data from cluster in the specified Iceberg table.

**Examples**

```sql
SELECT * FROM icebergS3Cluster('cluster_simple', 'http://test.s3.amazonaws.com/clickhouse-bucket/test_table', 'test', 'test')
```

**See Also**

- [Iceberg engine](/docs/en/engines/table-engines/integrations/iceberg.md)
- [Iceberg table function](/docs/en/sql-reference/table-functions/iceberg.md)

View File

@@ -26,6 +26,8 @@ namespace MergeTreeSetting
     extern const MergeTreeSettingsString primary_key_compression_codec;
     extern const MergeTreeSettingsBool use_adaptive_write_buffer_for_dynamic_subcolumns;
     extern const MergeTreeSettingsBool use_compact_variant_discriminators_serialization;
+    extern const MergeTreeSettingsFloat scalar_quantization_quantile_for_vector_similarity_index;
+    extern const MergeTreeSettingsUInt64 scalar_quantization_buffer_size_for_vector_similarity_index;
 }

 MergeTreeWriterSettings::MergeTreeWriterSettings(

@@ -55,6 +57,8 @@ MergeTreeWriterSettings::MergeTreeWriterSettings(
     , use_compact_variant_discriminators_serialization((*storage_settings)[MergeTreeSetting::use_compact_variant_discriminators_serialization])
     , use_adaptive_write_buffer_for_dynamic_subcolumns((*storage_settings)[MergeTreeSetting::use_adaptive_write_buffer_for_dynamic_subcolumns])
     , adaptive_write_buffer_initial_size((*storage_settings)[MergeTreeSetting::adaptive_write_buffer_initial_size])
+    , scalar_quantization_quantile_for_vector_similarity_index((*storage_settings)[MergeTreeSetting::scalar_quantization_quantile_for_vector_similarity_index])
+    , scalar_quantization_buffer_size_for_vector_similarity_index((*storage_settings)[MergeTreeSetting::scalar_quantization_buffer_size_for_vector_similarity_index])
 {
 }

View File

@@ -87,6 +87,9 @@ struct MergeTreeWriterSettings
     bool use_compact_variant_discriminators_serialization;
     bool use_adaptive_write_buffer_for_dynamic_subcolumns;
     size_t adaptive_write_buffer_initial_size;
+    Float64 scalar_quantization_quantile_for_vector_similarity_index;
+    UInt64 scalar_quantization_buffer_size_for_vector_similarity_index;
 };

 }

View File

@@ -72,6 +72,13 @@ const std::unordered_map<String, unum::usearch::scalar_kind_t> quantizationToSca
     {"i8", unum::usearch::scalar_kind_t::i8_k}};
 /// Usearch provides more quantizations but ^^ above ones seem the only ones comprehensively supported across all distance functions.

+/// The vector similarity index implements scalar quantization on top of Usearch. This is the target type (currently, only i8 is supported).
+using QuantizedValue = unum::usearch::i8_t;
+
+/// The maximum number of dimensions for scalar quantization. The purpose is to be able to allocate space for the result row on the stack
+/// (std::array) instead of the heap (std::vector). The value can be chosen randomly as long as the stack doesn't overflow.
+constexpr size_t MAX_DIMENSIONS_FOR_SCALAR_QUANTIZATION = 3000;
+
 template<typename T>
 concept is_set = std::same_as<T, std::set<typename T::key_type, typename T::key_compare, typename T::allocator_type>>;

@@ -214,6 +221,16 @@ void MergeTreeIndexGranuleVectorSimilarity::serializeBinary(WriteBuffer & ostr)
     index->serialize(ostr);

+    writeIntBinary(index->scalar_quantization_codebooks ? static_cast<UInt64>(1) : static_cast<UInt64>(0), ostr);
+    if (index->scalar_quantization_codebooks)
+    {
+        for (const auto codebook : *(index->scalar_quantization_codebooks))
+        {
+            writeFloatBinary(codebook.min, ostr);
+            writeFloatBinary(codebook.max, ostr);
+        }
+    }
+
     auto statistics = index->getStatistics();
     LOG_TRACE(logger, "Wrote vector similarity index: {}", statistics.toString());
 }

@@ -232,12 +249,27 @@ void MergeTreeIndexGranuleVectorSimilarity::deserializeBinary(ReadBuffer & istr,
     /// More fancy error handling would be: Set a flag on the index that it failed to load. During usage return all granules, i.e.
     /// behave as if the index does not exist. Since format changes are expected to happen only rarely and it is "only" an index, keep it simple for now.

-    UInt64 dimension;
-    readIntBinary(dimension, istr);
-    index = std::make_shared<USearchIndexWithSerialization>(dimension, metric_kind, scalar_kind, usearch_hnsw_params);
+    UInt64 dimensions;
+    readIntBinary(dimensions, istr);
+    index = std::make_shared<USearchIndexWithSerialization>(dimensions, metric_kind, scalar_kind, usearch_hnsw_params);
     index->deserialize(istr);

+    UInt64 has_scalar_quantization_codebooks;
+    readIntBinary(has_scalar_quantization_codebooks, istr);
+    if (has_scalar_quantization_codebooks)
+    {
+        index->scalar_quantization_codebooks = std::make_optional<ScalarQuantizationCodebooks>();
+        for (size_t dimension = 0; dimension < dimensions; ++dimension)
+        {
+            Float64 min;
+            Float64 max;
+            readFloatBinary(min, istr);
+            readFloatBinary(max, istr);
+            index->scalar_quantization_codebooks->push_back({min, max});
+        }
+    }
+
     auto statistics = index->getStatistics();
     LOG_TRACE(logger, "Loaded vector similarity index: {}", statistics.toString());
 }
@@ -247,12 +279,16 @@ MergeTreeIndexAggregatorVectorSimilarity::MergeTreeIndexAggregatorVectorSimilari
     const Block & index_sample_block_,
     unum::usearch::metric_kind_t metric_kind_,
     unum::usearch::scalar_kind_t scalar_kind_,
-    UsearchHnswParams usearch_hnsw_params_)
+    UsearchHnswParams usearch_hnsw_params_,
+    Float64 scalar_quantization_quantile_,
+    size_t scalar_quantization_buffer_size_)
     : index_name(index_name_)
     , index_sample_block(index_sample_block_)
     , metric_kind(metric_kind_)
     , scalar_kind(scalar_kind_)
     , usearch_hnsw_params(usearch_hnsw_params_)
+    , scalar_quantization_quantile(scalar_quantization_quantile_)
+    , scalar_quantization_buffer_size(scalar_quantization_buffer_size_)
 {
 }

@@ -266,8 +302,80 @@ MergeTreeIndexGranulePtr MergeTreeIndexAggregatorVectorSimilarity::getGranuleAnd
 namespace
 {

+template <typename Value>
+ScalarQuantizationCodebook calculateCodebook(std::vector<Value> & values, Float64 quantile)
+{
+    if (values.empty())
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "values is empty");
+
+    std::ranges::sort(values);
+
+    size_t minimum_element_index = static_cast<size_t>(values.size() * (1.0 - quantile));
+    size_t maximum_element_index = std::min(static_cast<size_t>(values.size() * quantile), values.size() - 1);
+
+    return {values[minimum_element_index], values[maximum_element_index]};
+}
+
+template <typename Value>
+void quantize(
+    const Value * values, size_t dimensions, const ScalarQuantizationCodebooks & codebooks,
+    std::array<QuantizedValue, MAX_DIMENSIONS_FOR_SCALAR_QUANTIZATION> & quantized_vector)
+{
+    /// Does a similar calculation as in Usearch's cast_to_i8_gt::try_(byte_t const* input, std::size_t dim, byte_t* output)
+    /// For some reason, USearch does not map into range [-std::numeric_limits<Int8>, std::numeric_limits<Int8>]
+    /// aka. [-128, 127], it maps into [-127, 127]. Do the same here.
+
+    constexpr QuantizedValue i8_min = -127;
+    constexpr QuantizedValue i8_max = 127;
+
+    Float64 magnitude = 0.0;
+    for (size_t dimension = 0; dimension != dimensions; ++dimension)
+    {
+        Float64 value = static_cast<Float64>(*(values + dimension));
+        magnitude += value * value;
+    }
+    magnitude = std::sqrt(magnitude);
+
+    if (magnitude == 0.0)
+    {
+        for (std::size_t dimension = 0; dimension != dimensions; ++dimension)
+            quantized_vector[dimension] = 0;
+        return;
+    }
+
+    for (std::size_t dimension = 0; dimension != dimensions; ++dimension)
+    {
+        Float64 value = static_cast<Float64>(*(values + dimension));
+        const ScalarQuantizationCodebook & codebook = codebooks[dimension];
+
+        if (value < codebook.min)
+        {
+            quantized_vector[dimension] = i8_min;
+            continue;
+        }
+
+        if (value > codebook.max)
+        {
+            quantized_vector[dimension] = i8_max;
+            continue;
+        }
+
+        quantized_vector[dimension] = static_cast<QuantizedValue>(std::clamp(value * i8_max / magnitude, static_cast<Float64>(i8_min), static_cast<Float64>(i8_max)));
+    }
+
+    /// for (size_t dimension = 0; dimension < dimensions; ++dimension)
+    /// {
+    ///     const ScalarQuantizationCodebook & codebook = codebooks[dimension];
+    ///     Float64 value = static_cast<Float64>(*(values + dimension));
+    ///     LOG_TRACE(getLogger("Vector Similarity Index"), "{}: {} --> {} (cb: [{}, {}])", dimension, value, quantized_vector[dimension], codebook.min, codebook.max);
+    /// }
+}
+
 template <typename Column>
-void updateImpl(const ColumnArray * column_array, const ColumnArray::Offsets & column_array_offsets, USearchIndexWithSerializationPtr & index, size_t dimensions, size_t rows)
+void updateImpl(
+    const ColumnArray * column_array, const ColumnArray::Offsets & column_array_offsets, USearchIndexWithSerializationPtr & index,
+    size_t dimensions, size_t rows,
+    Float64 scalar_quantization_quantile, size_t scalar_quantization_buffer_size)
 {
     const auto & column_array_data = column_array->getData();
     const auto & column_array_data_float = typeid_cast<const Column &>(column_array_data);
@@ -278,6 +386,51 @@ void updateImpl(const ColumnArray * column_array, const ColumnArray::Offsets & c
         if (column_array_offsets[row + 1] - column_array_offsets[row] != dimensions)
             throw Exception(ErrorCodes::INCORRECT_DATA, "All arrays in column with vector similarity index must have equal length");

+    /// ------------------
+    /// "Quantization" in Usearch means mere downsampling. We implement scalar quantization by ourselves.
+    /// The math only works for i8 and cosine distance.
+    /// --> compute for every dimension the quantiles and store them as "codebook" in the index.
+    if (index->scalar_kind() == unum::usearch::scalar_kind_t::i8_k
+        && index->metric_kind() == unum::usearch::metric_kind_t::cos_k
+        && scalar_quantization_buffer_size != 0 && dimensions < MAX_DIMENSIONS_FOR_SCALAR_QUANTIZATION)
+    {
+        const size_t buffer_size = std::min(rows, scalar_quantization_buffer_size);
+
+        /// Note: This function (update) can theoretically be called in a chunked fashion but this is currently not done, i.e. update is
+        /// called exactly once per index granule. This simplifies the code, so we make this assumption for now (otherwise, we'd need to
+        /// integrate with getGranuleAndReset which "finalizes" the insert of rows).
+
+        using ColumnValue = std::conditional_t<std::is_same_v<Column, ColumnFloat32>, Float32, Float64>;
+
+        std::vector<std::vector<ColumnValue>> values_per_dimension;
+        values_per_dimension.resize(dimensions);
+        for (auto & values : values_per_dimension)
+            values.resize(buffer_size);
+
+        /// Row-to-column conversion, needed because calculateCodebook sorts along each dimension
+        for (size_t i = 0; i < buffer_size * dimensions; ++i)
+        {
+            ColumnValue value = column_array_data_float_data[i];
+            size_t x = i % dimensions;
+            size_t y = i / dimensions;
+            values_per_dimension[x][y] = value;
+        }
+
+        index->scalar_quantization_codebooks = std::make_optional<ScalarQuantizationCodebooks>();
+
+        for (size_t dimension = 0; dimension < dimensions; ++dimension)
+        {
+            ScalarQuantizationCodebook codebook = calculateCodebook(values_per_dimension[dimension], scalar_quantization_quantile);
+
+            /// Invalid codebook that would lead to division-by-0 during quantizaiton. May happen if buffer size is too small or the data
+            /// distribution is too weird. Continue without quantization.
+            if (codebook.min == codebook.max)
+            {
+                index->scalar_quantization_codebooks = std::nullopt;
+                break;
+            }
+
+            index->scalar_quantization_codebooks->push_back(codebook);
+        }
+    }
+    /// ------------------
+
     /// Reserving space is mandatory
     size_t max_thread_pool_size = Context::getGlobalContextInstance()->getServerSettings()[ServerSetting::max_build_vector_similarity_index_thread_pool_size];
     if (max_thread_pool_size == 0)

@@ -299,24 +452,33 @@ void updateImpl(const ColumnArray * column_array, const ColumnArray::Offsets & c
         if (thread_group)
             CurrentThread::attachToGroupIfDetached(thread_group);

-        /// add is thread-safe
-        auto result = index->add(key, &column_array_data_float_data[column_array_offsets[row - 1]]);
-        if (!result)
+        USearchIndexWithSerialization::add_result_t add_result;
+
+        if (index->scalar_quantization_codebooks)
         {
-            throw Exception(ErrorCodes::INCORRECT_DATA, "Could not add data to vector similarity index. Error: {}", String(result.error.release()));
+            const ScalarQuantizationCodebooks & codebooks = *(index->scalar_quantization_codebooks);
+            std::array<QuantizedValue, MAX_DIMENSIONS_FOR_SCALAR_QUANTIZATION> quantized_vector;
+            quantize(&column_array_data_float_data[column_array_offsets[row - 1]], dimensions, codebooks, quantized_vector);
+            add_result = index->add(key, quantized_vector.data());
+        }
+        else
+        {
+            add_result = index->add(key, &column_array_data_float_data[column_array_offsets[row - 1]]);
         }

+        if (!add_result)
+            throw Exception(ErrorCodes::INCORRECT_DATA, "Could not add data to vector similarity index. Error: {}", String(add_result.error.release()));
+
         ProfileEvents::increment(ProfileEvents::USearchAddCount);
-        ProfileEvents::increment(ProfileEvents::USearchAddVisitedMembers, result.visited_members);
-        ProfileEvents::increment(ProfileEvents::USearchAddComputedDistances, result.computed_distances);
+        ProfileEvents::increment(ProfileEvents::USearchAddVisitedMembers, add_result.visited_members);
+        ProfileEvents::increment(ProfileEvents::USearchAddComputedDistances, add_result.computed_distances);
     };

-    size_t index_size = index->size();
+    const size_t index_size = index->size();

     for (size_t row = 0; row < rows; ++row)
     {
         auto key = static_cast<USearchIndex::vector_key_t>(index_size + row);
-        auto task = [group = CurrentThread::getGroup(), &add_vector_to_index, key, row] { add_vector_to_index(key, row, group); };
+        auto task = [&add_vector_to_index, key, row, thread_group = CurrentThread::getGroup()] { add_vector_to_index(key, row, thread_group); };
         thread_pool.scheduleOrThrowOnError(task);
     }
@@ -386,13 +548,12 @@ void MergeTreeIndexAggregatorVectorSimilarity::update(const Block & block, size_
     const TypeIndex nested_type_index = data_type_array->getNestedType()->getTypeId();

     if (WhichDataType(nested_type_index).isFloat32())
-        updateImpl<ColumnFloat32>(column_array, column_array_offsets, index, dimensions, rows);
+        updateImpl<ColumnFloat32>(column_array, column_array_offsets, index, dimensions, rows, scalar_quantization_quantile, scalar_quantization_buffer_size);
     else if (WhichDataType(nested_type_index).isFloat64())
-        updateImpl<ColumnFloat64>(column_array, column_array_offsets, index, dimensions, rows);
+        updateImpl<ColumnFloat64>(column_array, column_array_offsets, index, dimensions, rows, scalar_quantization_quantile, scalar_quantization_buffer_size);
     else
         throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected data type Array(Float*)");

     *pos += rows_read;
 }

@@ -448,12 +609,35 @@ std::vector<UInt64> MergeTreeIndexConditionVectorSimilarity::calculateApproximat
     /// synchronize index access, see https://github.com/unum-cloud/usearch/issues/500. As a workaround, we extended USearch' search method
     /// to accept a custom expansion_add setting. The config value is only used on the fly, i.e. not persisted in the index.

-    auto search_result = index->search(reference_vector.data(), limit, USearchIndex::any_thread(), false, expansion_search);
-    if (!search_result)
-        throw Exception(ErrorCodes::INCORRECT_DATA, "Could not search in vector similarity index. Error: {}", String(search_result.error.release()));
+    std::vector<USearchIndex::vector_key_t> neighbors; /// indexes of vectors which were closest to the reference vector

-    std::vector<USearchIndex::vector_key_t> neighbors(search_result.size()); /// indexes of vectors which were closest to the reference vector
-    search_result.dump_to(neighbors.data());
+    if (index->scalar_quantization_codebooks)
+    {
+        const ScalarQuantizationCodebooks & codebooks = *(index->scalar_quantization_codebooks);
+        std::array<QuantizedValue, MAX_DIMENSIONS_FOR_SCALAR_QUANTIZATION> quantized_vector;
+        quantize(reference_vector.data(), index->dimensions(), codebooks, quantized_vector);
+
+        auto search_result = index->search(quantized_vector.data(), limit, USearchIndex::any_thread(), false, expansion_search);
+        if (!search_result)
+            throw Exception(ErrorCodes::INCORRECT_DATA, "Could not search in vector similarity index. Error: {}", search_result.error.release());
+
+        neighbors.resize(search_result.size());
+        search_result.dump_to(neighbors.data());
+
+        ProfileEvents::increment(ProfileEvents::USearchSearchCount);
+        ProfileEvents::increment(ProfileEvents::USearchSearchVisitedMembers, search_result.visited_members);
+        ProfileEvents::increment(ProfileEvents::USearchSearchComputedDistances, search_result.computed_distances);
+    }
+    else
+    {
+        auto search_result = index->search(reference_vector.data(), limit, USearchIndex::any_thread(), false, expansion_search);
+        if (!search_result)
+            throw Exception(ErrorCodes::INCORRECT_DATA, "Could not search in vector similarity index. Error: {}", search_result.error.release());
+
+        neighbors.resize(search_result.size());
+        search_result.dump_to(neighbors.data());
+
+        ProfileEvents::increment(ProfileEvents::USearchSearchCount);
+        ProfileEvents::increment(ProfileEvents::USearchSearchVisitedMembers, search_result.visited_members);
+        ProfileEvents::increment(ProfileEvents::USearchSearchComputedDistances, search_result.computed_distances);
+    }

     std::sort(neighbors.begin(), neighbors.end());

@@ -466,10 +650,6 @@ std::vector<UInt64> MergeTreeIndexConditionVectorSimilarity::calculateApproximat
     neighbors.erase(std::unique(neighbors.begin(), neighbors.end()), neighbors.end());
 #endif

-    ProfileEvents::increment(ProfileEvents::USearchSearchCount);
-    ProfileEvents::increment(ProfileEvents::USearchSearchVisitedMembers, search_result.visited_members);
-    ProfileEvents::increment(ProfileEvents::USearchSearchComputedDistances, search_result.computed_distances);
-
     return neighbors;
 }

@@ -490,9 +670,15 @@ MergeTreeIndexGranulePtr MergeTreeIndexVectorSimilarity::createIndexGranule() co
     return std::make_shared<MergeTreeIndexGranuleVectorSimilarity>(index.name, metric_kind, scalar_kind, usearch_hnsw_params);
 }

-MergeTreeIndexAggregatorPtr MergeTreeIndexVectorSimilarity::createIndexAggregator(const MergeTreeWriterSettings & /*settings*/) const
+MergeTreeIndexAggregatorPtr MergeTreeIndexVectorSimilarity::createIndexAggregator(const MergeTreeWriterSettings & settings) const
 {
-    return std::make_shared<MergeTreeIndexAggregatorVectorSimilarity>(index.name, index.sample_block, metric_kind, scalar_kind, usearch_hnsw_params);
+    Float64 scalar_quantization_quantile = settings.scalar_quantization_quantile_for_vector_similarity_index;
+    size_t scalar_quantization_buffer_size = settings.scalar_quantization_buffer_size_for_vector_similarity_index;
+
+    if (scalar_quantization_quantile < 0.5 || scalar_quantization_quantile > 1.0)
+        throw Exception(ErrorCodes::INVALID_SETTING_VALUE, "Setting 'scalar_quantization_quantile_for_vector_similarity_index' must be in [0.5, 1.0]");
+
+    return std::make_shared<MergeTreeIndexAggregatorVectorSimilarity>(index.name, index.sample_block, metric_kind, scalar_kind, usearch_hnsw_params, scalar_quantization_quantile, scalar_quantization_buffer_size);
 }

 MergeTreeIndexConditionPtr MergeTreeIndexVectorSimilarity::createIndexCondition(const SelectQueryInfo & query, ContextPtr context) const
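
As a reading aid for the diff above: for a vector $v$ with Euclidean norm $\lVert v \rVert$, the added `quantize` function maps each component to

$$
q_d = \operatorname{clamp}\!\left(\frac{127 \, v_d}{\lVert v \rVert},\ -127,\ 127\right),
$$

except that components falling outside the per-dimension codebook interval $[\mathrm{min}_d, \mathrm{max}_d]$ (the $1-q$ and $q$ empirical quantiles computed from the buffered rows in `calculateCodebook`) are clipped directly to $-127$ or $127$, and a zero-norm vector is mapped to all zeros. Quantization is only attempted for the `i8` scalar kind with cosine distance, and is abandoned if any dimension's codebook collapses to a single point.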

View File

@@ -4,6 +4,7 @@

 #if USE_USEARCH

+#include <Storages/MergeTree/MergeTreeIOSettings.h>
 #include <Storages/MergeTree/VectorSimilarityCondition.h>
 #include <Common/Logger.h>
 #include <usearch/index_dense.hpp>

@@ -25,6 +26,16 @@ struct UsearchHnswParams
     size_t expansion_add = default_expansion_add;
 };

+/// Statistics required to apply scalar quantization to a single dimension of a vector.
+struct ScalarQuantizationCodebook
+{
+    Float64 min;
+    Float64 max;
+};
+
+/// Statistics required to apply scalar quantization to all dimensions of a vector.
+using ScalarQuantizationCodebooks = std::vector<ScalarQuantizationCodebook>;
+
 using USearchIndex = unum::usearch::index_dense_t;

 class USearchIndexWithSerialization : public USearchIndex

@@ -59,6 +70,8 @@ public:
         String toString() const;
     };

+    std::optional<ScalarQuantizationCodebooks> scalar_quantization_codebooks;
+
     Statistics getStatistics() const;
 };

@@ -100,7 +113,7 @@ private:
     /// Note: USearch prefixes the serialized data with its own version header. We can't rely on that because 1. the index in ClickHouse
     /// is (at least in theory) agnostic of specific vector search libraries, and 2. additional data (e.g. the number of dimensions)
     /// outside USearch exists which we should version separately.
-    static constexpr UInt64 FILE_FORMAT_VERSION = 1;
+    static constexpr UInt64 FILE_FORMAT_VERSION = 2;
 };

@@ -111,7 +124,9 @@ struct MergeTreeIndexAggregatorVectorSimilarity final : IMergeTreeIndexAggregato
         const Block & index_sample_block,
         unum::usearch::metric_kind_t metric_kind_,
         unum::usearch::scalar_kind_t scalar_kind_,
-        UsearchHnswParams usearch_hnsw_params_);
+        UsearchHnswParams usearch_hnsw_params_,
+        Float64 scalar_quantization_quantile_,
+        size_t quantization_buffer_size_);

     ~MergeTreeIndexAggregatorVectorSimilarity() override = default;

@@ -124,6 +139,8 @@ struct MergeTreeIndexAggregatorVectorSimilarity final : IMergeTreeIndexAggregato
     const unum::usearch::metric_kind_t metric_kind;
     const unum::usearch::scalar_kind_t scalar_kind;
     const UsearchHnswParams usearch_hnsw_params;
+    const Float64 scalar_quantization_quantile;
+    const size_t scalar_quantization_buffer_size;

     USearchIndexWithSerializationPtr index;
 };

View File

@@ -118,8 +118,8 @@ namespace ErrorCodes
     DECLARE(Bool, add_implicit_sign_column_constraint_for_collapsing_engine, false, "If true, add implicit constraint for sign column for CollapsingMergeTree engine.", 0) \
     DECLARE(Milliseconds, sleep_before_commit_local_part_in_replicated_table_ms, 0, "For testing. Do not change it.", 0) \
     DECLARE(Bool, optimize_row_order, false, "Allow reshuffling of rows during part inserts and merges to improve the compressibility of the new part", 0) \
-    DECLARE(Bool, use_adaptive_write_buffer_for_dynamic_subcolumns, true, "Allow to use adaptive writer buffers during writing dynamic subcolumns to reduce memory usage", 0) \
     DECLARE(UInt64, adaptive_write_buffer_initial_size, 16 * 1024, "Initial size of an adaptive write buffer", 0) \
+    DECLARE(Bool, use_adaptive_write_buffer_for_dynamic_subcolumns, true, "Allow to use adaptive writer buffers during writing dynamic subcolumns to reduce memory usage", 0) \
     DECLARE(UInt64, min_free_disk_bytes_to_perform_insert, 0, "Minimum free disk space bytes to perform an insert.", 0) \
     DECLARE(Float, min_free_disk_ratio_to_perform_insert, 0.0, "Minimum free disk space ratio to perform an insert.", 0) \
     \

@@ -229,6 +229,8 @@ namespace ErrorCodes
     DECLARE(Bool, cache_populated_by_fetch, false, "Only available in ClickHouse Cloud", EXPERIMENTAL) \
     DECLARE(Bool, force_read_through_cache_for_merges, false, "Force read-through filesystem cache for merges", EXPERIMENTAL) \
     DECLARE(Bool, allow_experimental_replacing_merge_with_cleanup, false, "Allow experimental CLEANUP merges for ReplacingMergeTree with is_deleted column.", EXPERIMENTAL) \
+    DECLARE(Float, scalar_quantization_quantile_for_vector_similarity_index, 0.99f, "The quantile for scalar quantization in the vector similarity index. Must be in [0.5, 1.0].", EXPERIMENTAL) \
+    DECLARE(UInt64, scalar_quantization_buffer_size_for_vector_similarity_index, 10'000, "The buffer size for scalar quantization in the vector similarity index. 0 disables scalar quantization.", EXPERIMENTAL) \
     \
     /** Compress marks and primary key. */ \
     DECLARE(Bool, compress_marks, true, "Marks support compression, reduce mark file size and speed up network transmission.", 0) \

View File

@@ -226,6 +226,26 @@ template class TableFunctionObjectStorage<HDFSClusterDefinition, StorageHDFSConf
 #endif
 template class TableFunctionObjectStorage<LocalDefinition, StorageLocalConfiguration>;

+#if USE_AVRO && USE_AWS_S3
+template class TableFunctionObjectStorage<IcebergS3ClusterDefinition, StorageS3IcebergConfiguration>;
+#endif
+
+#if USE_AVRO && USE_AZURE_BLOB_STORAGE
+template class TableFunctionObjectStorage<IcebergAzureClusterDefinition, StorageAzureIcebergConfiguration>;
+#endif
+
+#if USE_AVRO && USE_HDFS
+template class TableFunctionObjectStorage<IcebergHDFSClusterDefinition, StorageHDFSIcebergConfiguration>;
+#endif
+
+#if USE_PARQUET && USE_AWS_S3
+template class TableFunctionObjectStorage<DeltaLakeClusterDefinition, StorageS3DeltaLakeConfiguration>;
+#endif
+
+#if USE_AWS_S3
+template class TableFunctionObjectStorage<HudiClusterDefinition, StorageS3HudiConfiguration>;
+#endif
+
 #if USE_AVRO
 void registerTableFunctionIceberg(TableFunctionFactory & factory)
 {

View File

@@ -96,7 +96,7 @@ void registerTableFunctionObjectStorageCluster(TableFunctionFactory & factory)
     {
         .documentation = {
             .description=R"(The table function can be used to read the data stored on HDFS in parallel for many nodes in a specified cluster.)",
-            .examples{{"HDFSCluster", "SELECT * FROM HDFSCluster(cluster_name, uri, format)", ""}}},
+            .examples{{"HDFSCluster", "SELECT * FROM HDFSCluster(cluster, uri, format)", ""}}},
         .allow_readonly = false
     }
     );

@@ -105,15 +105,77 @@ void registerTableFunctionObjectStorageCluster(TableFunctionFactory & factory)
     UNUSED(factory);
 }

+#if USE_AVRO
+void registerTableFunctionIcebergCluster(TableFunctionFactory & factory)
+{
+    UNUSED(factory);
+
 #if USE_AWS_S3
-template class TableFunctionObjectStorageCluster<S3ClusterDefinition, StorageS3Configuration>;
+    factory.registerFunction<TableFunctionIcebergS3Cluster>(
+        {.documentation
+         = {.description = R"(The table function can be used to read the Iceberg table stored on S3 object store in parallel for many nodes in a specified cluster.)",
+            .examples{{"icebergS3Cluster", "SELECT * FROM icebergS3Cluster(cluster, url, [, NOSIGN | access_key_id, secret_access_key, [session_token]], format, [,compression])", ""}},
+            .categories{"DataLake"}},
+         .allow_readonly = false});
 #endif

 #if USE_AZURE_BLOB_STORAGE
-template class TableFunctionObjectStorageCluster<AzureClusterDefinition, StorageAzureConfiguration>;
+    factory.registerFunction<TableFunctionIcebergAzureCluster>(
+        {.documentation
+         = {.description = R"(The table function can be used to read the Iceberg table stored on Azure object store in parallel for many nodes in a specified cluster.)",
+            .examples{{"icebergAzureCluster", "SELECT * FROM icebergAzureCluster(cluster, connection_string|storage_account_url, container_name, blobpath, [account_name, account_key, format, compression])", ""}},
+            .categories{"DataLake"}},
+         .allow_readonly = false});
 #endif

 #if USE_HDFS
-template class TableFunctionObjectStorageCluster<HDFSClusterDefinition, StorageHDFSConfiguration>;
+    factory.registerFunction<TableFunctionIcebergHDFSCluster>(
+        {.documentation
+         = {.description = R"(The table function can be used to read the Iceberg table stored on HDFS virtual filesystem in parallel for many nodes in a specified cluster.)",
+            .examples{{"icebergHDFSCluster", "SELECT * FROM icebergHDFSCluster(cluster, uri, [format], [structure], [compression_method])", ""}},
+            .categories{"DataLake"}},
+         .allow_readonly = false});
 #endif
 }
+#endif
+
+#if USE_AWS_S3
+#if USE_PARQUET
+void registerTableFunctionDeltaLakeCluster(TableFunctionFactory & factory)
+{
+    factory.registerFunction<TableFunctionDeltaLakeCluster>(
+        {.documentation
+         = {.description = R"(The table function can be used to read the DeltaLake table stored on object store in parallel for many nodes in a specified cluster.)",
+            .examples{{"deltaLakeCluster", "SELECT * FROM deltaLakeCluster(cluster, url, access_key_id, secret_access_key)", ""}},
+            .categories{"DataLake"}},
+         .allow_readonly = false});
+}
+#endif
+
+void registerTableFunctionHudiCluster(TableFunctionFactory & factory)
+{
+    factory.registerFunction<TableFunctionHudiCluster>(
+        {.documentation
+         = {.description = R"(The table function can be used to read the Hudi table stored on object store in parallel for many nodes in a specified cluster.)",
+            .examples{{"hudiCluster", "SELECT * FROM hudiCluster(cluster, url, access_key_id, secret_access_key)", ""}},
+            .categories{"DataLake"}},
+         .allow_readonly = false});
+}
+#endif
+
+void registerDataLakeClusterTableFunctions(TableFunctionFactory & factory)
+{
+    UNUSED(factory);
+#if USE_AVRO
+    registerTableFunctionIcebergCluster(factory);
+#endif
+#if USE_AWS_S3
+#if USE_PARQUET
+    registerTableFunctionDeltaLakeCluster(factory);
+#endif
+    registerTableFunctionHudiCluster(factory);
+#endif
+}
 }

View File

@@ -33,6 +33,36 @@ struct HDFSClusterDefinition
     static constexpr auto storage_type_name = "HDFSCluster";
 };

+struct IcebergS3ClusterDefinition
+{
+    static constexpr auto name = "icebergS3Cluster";
+    static constexpr auto storage_type_name = "IcebergS3Cluster";
+};
+
+struct IcebergAzureClusterDefinition
+{
+    static constexpr auto name = "icebergAzureCluster";
+    static constexpr auto storage_type_name = "IcebergAzureCluster";
+};
+
+struct IcebergHDFSClusterDefinition
+{
+    static constexpr auto name = "icebergHDFSCluster";
+    static constexpr auto storage_type_name = "IcebergHDFSCluster";
+};
+
+struct DeltaLakeClusterDefinition
+{
+    static constexpr auto name = "deltaLakeCluster";
+    static constexpr auto storage_type_name = "DeltaLakeS3Cluster";
+};
+
+struct HudiClusterDefinition
+{
+    static constexpr auto name = "hudiCluster";
+    static constexpr auto storage_type_name = "HudiS3Cluster";
+};
+
 /**
  * Class implementing s3/hdfs/azureBlobStorageCluster(...) table functions,
  * which allow to process many files from S3/HDFS/Azure blob storage on a specific cluster.

@@ -79,4 +109,25 @@ using TableFunctionAzureBlobCluster = TableFunctionObjectStorageCluster<AzureClu
 #if USE_HDFS
 using TableFunctionHDFSCluster = TableFunctionObjectStorageCluster<HDFSClusterDefinition, StorageHDFSConfiguration>;
 #endif
+
+#if USE_AVRO && USE_AWS_S3
+using TableFunctionIcebergS3Cluster = TableFunctionObjectStorageCluster<IcebergS3ClusterDefinition, StorageS3IcebergConfiguration>;
+#endif
+
+#if USE_AVRO && USE_AZURE_BLOB_STORAGE
+using TableFunctionIcebergAzureCluster = TableFunctionObjectStorageCluster<IcebergAzureClusterDefinition, StorageAzureIcebergConfiguration>;
+#endif
+
+#if USE_AVRO && USE_HDFS
+using TableFunctionIcebergHDFSCluster = TableFunctionObjectStorageCluster<IcebergHDFSClusterDefinition, StorageHDFSIcebergConfiguration>;
+#endif
+
+#if USE_AWS_S3 && USE_PARQUET
+using TableFunctionDeltaLakeCluster = TableFunctionObjectStorageCluster<DeltaLakeClusterDefinition, StorageS3DeltaLakeConfiguration>;
+#endif
+
+#if USE_AWS_S3
+using TableFunctionHudiCluster = TableFunctionObjectStorageCluster<HudiClusterDefinition, StorageS3HudiConfiguration>;
+#endif
 }

View File

@@ -66,6 +66,7 @@ void registerTableFunctions(bool use_legacy_mongodb_integration [[maybe_unused]]
     registerTableFunctionObjectStorage(factory);
     registerTableFunctionObjectStorageCluster(factory);
     registerDataLakeTableFunctions(factory);
+    registerDataLakeClusterTableFunctions(factory);
 }

 }

View File

@@ -70,6 +70,7 @@ void registerTableFunctionExplain(TableFunctionFactory & factory);
 void registerTableFunctionObjectStorage(TableFunctionFactory & factory);
 void registerTableFunctionObjectStorageCluster(TableFunctionFactory & factory);
 void registerDataLakeTableFunctions(TableFunctionFactory & factory);
+void registerDataLakeClusterTableFunctions(TableFunctionFactory & factory);

 void registerTableFunctionTimeSeries(TableFunctionFactory & factory);

View File

@@ -0,0 +1,20 @@
<clickhouse>
    <remote_servers>
        <cluster_simple>
            <shard>
                <replica>
                    <host>node1</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>node2</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>node3</host>
                    <port>9000</port>
                </replica>
            </shard>
        </cluster_simple>
    </remote_servers>
</clickhouse>
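
A quick sanity check that this test cluster definition is loaded is to query it by name; assuming the `cluster_simple` name above, the following should return one row per replica:

```sql
-- Expect three rows: node1, node2, node3.
SELECT hostName()
FROM clusterAllReplicas('cluster_simple', system.one);
```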

View File

@@ -0,0 +1,6 @@
<clickhouse>
    <query_log>
        <database>system</database>
        <table>query_log</table>
    </query_log>
</clickhouse>

View File

@@ -73,14 +73,38 @@ def started_cluster():
         cluster.add_instance(
             "node1",
             main_configs=[
+                "configs/config.d/query_log.xml",
+                "configs/config.d/cluster.xml",
                 "configs/config.d/named_collections.xml",
                 "configs/config.d/filesystem_caches.xml",
             ],
             user_configs=["configs/users.d/users.xml"],
             with_minio=True,
             with_azurite=True,
-            stay_alive=True,
             with_hdfs=with_hdfs,
+            stay_alive=True,
+        )
+        cluster.add_instance(
+            "node2",
+            main_configs=[
+                "configs/config.d/query_log.xml",
+                "configs/config.d/cluster.xml",
+                "configs/config.d/named_collections.xml",
+                "configs/config.d/filesystem_caches.xml",
+            ],
+            user_configs=["configs/users.d/users.xml"],
+            stay_alive=True,
+        )
+        cluster.add_instance(
+            "node3",
+            main_configs=[
+                "configs/config.d/query_log.xml",
+                "configs/config.d/cluster.xml",
+                "configs/config.d/named_collections.xml",
+                "configs/config.d/filesystem_caches.xml",
+            ],
+            user_configs=["configs/users.d/users.xml"],
+            stay_alive=True,
         )

         logging.info("Starting cluster...")
logging.info("Starting cluster...") logging.info("Starting cluster...")
@@ -182,6 +206,7 @@ def get_creation_expression(
     cluster,
     format="Parquet",
     table_function=False,
+    run_on_cluster=False,
     **kwargs,
 ):
     if storage_type == "s3":

@@ -189,35 +214,56 @@ def get_creation_expression(
             bucket = kwargs["bucket"]
         else:
             bucket = cluster.minio_bucket
-        print(bucket)

-        if table_function:
-            return f"icebergS3(s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"
+        if run_on_cluster:
+            assert table_function
+            return f"icebergS3Cluster('cluster_simple', s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"
         else:
-            return f"""
-                DROP TABLE IF EXISTS {table_name};
-                CREATE TABLE {table_name}
-                ENGINE=IcebergS3(s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"""
+            if table_function:
+                return f"icebergS3(s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"
+            else:
+                return f"""
+                    DROP TABLE IF EXISTS {table_name};
+                    CREATE TABLE {table_name}
+                    ENGINE=IcebergS3(s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"""

     elif storage_type == "azure":
-        if table_function:
+        if run_on_cluster:
+            assert table_function
             return f"""
-                icebergAzure(azure, container = '{cluster.azure_container_name}', storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})
+                icebergAzureCluster('cluster_simple', azure, container = '{cluster.azure_container_name}', storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})
             """
         else:
-            return f"""
-                DROP TABLE IF EXISTS {table_name};
-                CREATE TABLE {table_name}
-                ENGINE=IcebergAzure(azure, container = {cluster.azure_container_name}, storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})"""
+            if table_function:
+                return f"""
+                    icebergAzure(azure, container = '{cluster.azure_container_name}', storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})
+                """
+            else:
+                return f"""
+                    DROP TABLE IF EXISTS {table_name};
+                    CREATE TABLE {table_name}
+                    ENGINE=IcebergAzure(azure, container = {cluster.azure_container_name}, storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})"""

     elif storage_type == "hdfs":
-        if table_function:
+        if run_on_cluster:
+            assert table_function
             return f"""
-                icebergHDFS(hdfs, filename= 'iceberg_data/default/{table_name}/', format={format}, url = 'hdfs://hdfs1:9000/')
+                icebergHDFSCluster('cluster_simple', hdfs, filename= 'iceberg_data/default/{table_name}/', format={format}, url = 'hdfs://hdfs1:9000/')
             """
         else:
-            return f"""
-                DROP TABLE IF EXISTS {table_name};
-                CREATE TABLE {table_name}
-                ENGINE=IcebergHDFS(hdfs, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'hdfs://hdfs1:9000/');"""
+            if table_function:
+                return f"""
+                    icebergHDFS(hdfs, filename= 'iceberg_data/default/{table_name}/', format={format}, url = 'hdfs://hdfs1:9000/')
+                """
+            else:
+                return f"""
+                    DROP TABLE IF EXISTS {table_name};
+                    CREATE TABLE {table_name}
+                    ENGINE=IcebergHDFS(hdfs, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'hdfs://hdfs1:9000/');"""

     elif storage_type == "local":
+        assert not run_on_cluster
+
         if table_function:
             return f"""
                 icebergLocal(local, path = '/iceberg_data/default/{table_name}/', format={format})

@@ -227,6 +273,7 @@ def get_creation_expression(
                 DROP TABLE IF EXISTS {table_name};
                 CREATE TABLE {table_name}
                 ENGINE=IcebergLocal(local, path = '/iceberg_data/default/{table_name}/', format={format});"""
+
     else:
         raise Exception(f"Unknown iceberg storage type: {storage_type}")
@@ -492,6 +539,108 @@ def test_types(started_cluster, format_version, storage_type):
     )


+@pytest.mark.parametrize("format_version", ["1", "2"])
+@pytest.mark.parametrize("storage_type", ["s3", "azure", "hdfs"])
+def test_cluster_table_function(started_cluster, format_version, storage_type):
+    if is_arm() and storage_type == "hdfs":
+        pytest.skip("Disabled test IcebergHDFS for aarch64")
+
+    instance = started_cluster.instances["node1"]
+    spark = started_cluster.spark_session
+
+    TABLE_NAME = (
+        "test_iceberg_cluster_"
+        + format_version
+        + "_"
+        + storage_type
+        + "_"
+        + get_uuid_str()
+    )
+
+    def add_df(mode):
+        write_iceberg_from_df(
+            spark,
+            generate_data(spark, 0, 100),
+            TABLE_NAME,
+            mode=mode,
+            format_version=format_version,
+        )
+
+        files = default_upload_directory(
+            started_cluster,
+            storage_type,
+            f"/iceberg_data/default/{TABLE_NAME}/",
+            f"/iceberg_data/default/{TABLE_NAME}/",
+        )
+
+        logging.info(f"Adding another dataframe. result files: {files}")
+
+        return files
+
+    files = add_df(mode="overwrite")
+    for i in range(1, len(started_cluster.instances)):
+        files = add_df(mode="append")
+
+    logging.info(f"Setup complete. files: {files}")
+    assert len(files) == 5 + 4 * (len(started_cluster.instances) - 1)
+
+    clusters = instance.query(f"SELECT * FROM system.clusters")
+    logging.info(f"Clusters setup: {clusters}")
+
+    # Regular Query only node1
+    table_function_expr = get_creation_expression(
+        storage_type, TABLE_NAME, started_cluster, table_function=True
+    )
+    select_regular = (
+        instance.query(f"SELECT * FROM {table_function_expr}").strip().split()
+    )
+
+    # Cluster Query with node1 as coordinator
+    table_function_expr_cluster = get_creation_expression(
+        storage_type,
+        TABLE_NAME,
+        started_cluster,
+        table_function=True,
+        run_on_cluster=True,
+    )
+    select_cluster = (
+        instance.query(f"SELECT * FROM {table_function_expr_cluster}").strip().split()
+    )
+
+    # Simple size check
+    assert len(select_regular) == 600
+    assert len(select_cluster) == 600
+
+    # Actual check
+    assert select_cluster == select_regular
+
+    # Check query_log
+    for replica in started_cluster.instances.values():
+        replica.query("SYSTEM FLUSH LOGS")
+
+    for node_name, replica in started_cluster.instances.items():
+        cluster_secondary_queries = (
+            replica.query(
+                f"""
+                SELECT query, type, is_initial_query, read_rows, read_bytes FROM system.query_log
+                WHERE
+                    type = 'QueryStart' AND
+                    positionCaseInsensitive(query, '{storage_type}Cluster') != 0 AND
+                    position(query, '{TABLE_NAME}') != 0 AND
+                    position(query, 'system.query_log') = 0 AND
+                    NOT is_initial_query
+                """
+            )
+            .strip()
+            .split("\n")
+        )
+
+        logging.info(
+            f"[{node_name}] cluster_secondary_queries: {cluster_secondary_queries}"
+        )
+        assert len(cluster_secondary_queries) == 1
+
+
 @pytest.mark.parametrize("format_version", ["1", "2"])
 @pytest.mark.parametrize("storage_type", ["s3", "azure", "hdfs", "local"])
 def test_delete_files(started_cluster, format_version, storage_type):

View File

@@ -247,7 +247,7 @@ Expression (Projection)
             Name: idx
             Description: vector_similarity GRANULARITY 2
             Parts: 1/1
-            Granules: 3/4
+            Granules: 4/4
 -- Index on Array(Float64) column
 6 [0,2] 0
 7 [0,2.1] 0.10000000000000009

View File

@@ -0,0 +1,3 @@
4 [1.4,2.4] 9.830249853916663e-8
14 [1.4,2.4] 9.830249853916663e-8
3 [1.3,2.3] 0.14142142367226698

View File

@@ -0,0 +1,34 @@
-- Tags: no-fasttest, no-ordinary-database

-- Tests various scalar quantization for vector similarity indexes with i8 quantization.
-- The effect of quantization is extremely subtle and hard to test, so we are only testing the related settings.

SET allow_experimental_vector_similarity_index = 1;
SET enable_analyzer = 0;

DROP TABLE IF EXISTS tab;

-- Quantization interval invalid
CREATE TABLE tab(id Int32, vec Array(Float32), INDEX idx vec TYPE vector_similarity('hnsw', 'L2Distance', 'i8', 0, 0) GRANULARITY 4) ENGINE = MergeTree ORDER BY id SETTINGS index_granularity = 5, scalar_quantization_quantile_for_vector_similarity_index = 1.1;
INSERT INTO tab VALUES (0, [1.0, 2.0]), (1, [1.1, 2.1]), (2, [1.2, 2.2]), (3, [1.3, 2.3]), (4, [1.4, 2.4]), (5, [1.5, 2.5]), (6, [1.6, 2.6]), (7, [1.7, 2.7]), (8, [1.8, 2.8]), (9, [1.9, 2.9]), (10, [1.0, 2.0]), (11, [1.1, 2.1]), (12, [1.2, 2.2]), (13, [1.3, 2.3]), (14, [1.4, 2.4]), (15, [1.5, 2.5]), (16, [1.6, 2.6]), (17, [1.7, 2.7]), (18, [1.8, 2.8]), (19, [1.9, 2.9]); -- { serverError INVALID_SETTING_VALUE }
DROP TABLE tab;

CREATE TABLE tab(id Int32, vec Array(Float32), INDEX idx vec TYPE vector_similarity('hnsw', 'L2Distance', 'i8', 0, 0) GRANULARITY 4) ENGINE = MergeTree ORDER BY id SETTINGS index_granularity = 5, scalar_quantization_quantile_for_vector_similarity_index = 0.4;
INSERT INTO tab VALUES (0, [1.0, 2.0]), (1, [1.1, 2.1]), (2, [1.2, 2.2]), (3, [1.3, 2.3]), (4, [1.4, 2.4]), (5, [1.5, 2.5]), (6, [1.6, 2.6]), (7, [1.7, 2.7]), (8, [1.8, 2.8]), (9, [1.9, 2.9]), (10, [1.0, 2.0]), (11, [1.1, 2.1]), (12, [1.2, 2.2]), (13, [1.3, 2.3]), (14, [1.4, 2.4]), (15, [1.5, 2.5]), (16, [1.6, 2.6]), (17, [1.7, 2.7]), (18, [1.8, 2.8]), (19, [1.9, 2.9]); -- { serverError INVALID_SETTING_VALUE }
DROP TABLE tab;

-- Test that no bad things happen
CREATE TABLE tab(id Int32, vec Array(Float32), INDEX idx vec TYPE vector_similarity('hnsw', 'L2Distance', 'i8', 0, 0) GRANULARITY 4) ENGINE = MergeTree ORDER BY id SETTINGS index_granularity = 5, scalar_quantization_quantile_for_vector_similarity_index = 0.9;
INSERT INTO tab VALUES (0, [1.0, 2.0]), (1, [1.1, 2.1]), (2, [1.2, 2.2]), (3, [1.3, 2.3]), (4, [1.4, 2.4]), (5, [1.5, 2.5]), (6, [1.6, 2.6]), (7, [1.7, 2.7]), (8, [1.8, 2.8]), (9, [1.9, 2.9]), (10, [1.0, 2.0]), (11, [1.1, 2.1]), (12, [1.2, 2.2]), (13, [1.3, 2.3]), (14, [1.4, 2.4]), (15, [1.5, 2.5]), (16, [1.6, 2.6]), (17, [1.7, 2.7]), (18, [1.8, 2.8]), (19, [1.9, 2.9]);

WITH [1.4, 2.4] AS reference_vec
SELECT id, vec, L2Distance(vec, reference_vec)
FROM tab
ORDER BY L2Distance(vec, reference_vec), id
LIMIT 3;

DROP TABLE tab;

View File

@@ -49,6 +49,7 @@ AutoML
 Autocompletion
 AvroConfluent
 AzureQueue
+BFloat
 BIGINT
 BIGSERIAL
 BORO

@@ -244,7 +245,10 @@ Deduplication
 DefaultTableEngine
 DelayedInserts
 DeliveryTag
+Deltalake
 DeltaLake
+deltalakeCluster
+deltaLakeCluster
 Denormalize
 DestroyAggregatesThreads
 DestroyAggregatesThreadsActive

@@ -377,10 +381,15 @@ Homebrew's
 HorizontalDivide
 Hostname
 HouseOps
+hudi
 Hudi
+hudiCluster
+HudiCluster
 HyperLogLog
 Hypot
 IANA
+icebergCluster
+IcebergCluster
 IDE
 IDEs
 IDNA

@@ -1668,6 +1677,7 @@ domainWithoutWWWRFC
 dont
 dotProduct
 dotall
+downsampled
 downsampling
 dplyr
 dragonbox

@@ -3157,4 +3167,3 @@ znode
 znodes
 zookeeperSessionUptime
 zstd
-BFloat