Compare commits

...

20 Commits

Author SHA1 Message Date
Robert Schulze
3d73d115a9
Merge fa81342d6a into 44b4bd38b9 2024-11-21 00:06:10 +01:00
Mikhail Artemenko
44b4bd38b9
Merge pull request #72045 from ClickHouse/issues/70174/cluster_versions
Enable cluster table functions for DataLake Storages
2024-11-20 21:22:37 +00:00
Shichao Jin
40c7d5fd1a
Merge pull request #71894 from udiz/fix-arrayWithConstant-size-estimation
Fix: arrayWithConstant size estimation using row's element size
2024-11-20 19:56:27 +00:00
Mikhail Artemenko
4ccebd9a24 fix syntax for iceberg in docs 2024-11-20 11:15:39 +00:00
Mikhail Artemenko
99177c0daf remove icebergCluster alias 2024-11-20 11:15:12 +00:00
Mikhail Artemenko
0951991c1d update aspell-dict.txt 2024-11-19 13:10:42 +00:00
Mikhail Artemenko
19aec5e572 Merge branch 'issues/70174/cluster_versions' of github.com:ClickHouse/ClickHouse into issues/70174/cluster_versions 2024-11-19 12:51:56 +00:00
Mikhail Artemenko
a367de9977 add docs 2024-11-19 12:49:59 +00:00
Mikhail Artemenko
6894e280b2 fix pr issues 2024-11-19 12:34:42 +00:00
Mikhail Artemenko
39ebe113d9 Merge branch 'master' into issues/70174/cluster_versions 2024-11-19 11:28:46 +00:00
udiz
239bbaa133 use length 2024-11-19 00:00:43 +00:00
udiz
07fac5808d format null on test 2024-11-18 23:08:48 +00:00
Robert Schulze
fa81342d6a
Skipping index cache 2024-11-18 22:54:10 +00:00
udiz
ed95e0781f test uses less memory 2024-11-18 22:48:38 +00:00
robot-clickhouse
014608fb6b Automatic style fix 2024-11-18 17:51:51 +00:00
Mikhail Artemenko
a29ded4941 add test for iceberg 2024-11-18 17:39:46 +00:00
Mikhail Artemenko
d2efae7511 enable cluster versions for datalake storages 2024-11-18 17:35:21 +00:00
udiz
6879aa130a newline 2024-11-13 22:47:54 +00:00
udiz
43f3c886a2 add test 2024-11-13 22:46:36 +00:00
udiz
c383a743f7 arrayWithConstant size estimation using single value size 2024-11-13 20:02:31 +00:00
54 changed files with 880 additions and 80 deletions

View File

@ -250,7 +250,7 @@ Default: 0
## index_mark_cache_size_ratio
The size of the protected queue in the index mark cache relative to the cache's total size.
The size of the protected queue (in case of SLRU policy) in the index mark cache relative to the cache's total size.
Type: Double
@ -278,12 +278,48 @@ Default: 0
## index_uncompressed_cache_size_ratio
The size of the protected queue in the index uncompressed cache relative to the cache's total size.
The size of the protected queue (in case of SLRU policy) in the index uncompressed cache relative to the cache's total size.
Type: Double
Default: 0.5
## skipping_index_cache_policy
Skipping index cache policy name.
Type: String
Default: SLRU
## skipping_index_cache_size
Size of cache for skipping index granules. Zero means disabled.
:::note
This setting can be modified at runtime and will take effect immediately.
:::
Type: UInt64
Default: 5368709120 (= 5 GiB)
## skipping_index_cache_size_ratio
The size of the protected queue (in case of SLRU policy) in the skipping index cache relative to the cache's total size.
Type: Double
Default: 0.5
## skipping_index_cache_max_entries
The maximum number of entries in the skipping index cache.
Type: UInt64
Default: 10000000
## io_thread_pool_queue_size
Queue size for IO thread pool. Zero means unlimited.
@ -314,7 +350,7 @@ Default: 5368709120
## mark_cache_size_ratio
The size of the protected queue in the mark cache relative to the cache's total size.
The size of the protected queue (in case of SLRU policy) in the mark cache relative to the cache's total size.
Type: Double
@ -825,7 +861,7 @@ Default: 0
## uncompressed_cache_size_ratio
The size of the protected queue in the uncompressed cache relative to the cache's total size.
The size of the protected queue (in case of SLRU policy) in the uncompressed cache relative to the cache's total size.
Type: Double

View File

@ -49,4 +49,4 @@ LIMIT 2
**See Also**
- [DeltaLake engine](/docs/en/engines/table-engines/integrations/deltalake.md)
- [DeltaLake cluster table function](/docs/en/sql-reference/table-functions/deltalakeCluster.md)

View File

@ -0,0 +1,30 @@
---
slug: /en/sql-reference/table-functions/deltalakeCluster
sidebar_position: 46
sidebar_label: deltaLakeCluster
title: "deltaLakeCluster Table Function"
---
This is an extension to the [deltaLake](/docs/en/sql-reference/table-functions/deltalake.md) table function.
Allows processing files from [Delta Lake](https://github.com/delta-io/delta) tables in Amazon S3 in parallel from many nodes in a specified cluster. On initiator it creates a connection to all nodes in the cluster and dispatches each file dynamically. On the worker node it asks the initiator about the next task to process and processes it. This is repeated until all tasks are finished.
**Syntax**
``` sql
deltaLakeCluster(cluster_name, url [,aws_access_key_id, aws_secret_access_key] [,format] [,structure] [,compression])
```
**Arguments**
- `cluster_name` — Name of a cluster that is used to build a set of addresses and connection parameters to remote and local servers.
- Description of all other arguments coincides with description of arguments in equivalent [deltaLake](/docs/en/sql-reference/table-functions/deltalake.md) table function.
**Returned value**
A table with the specified structure for reading data from cluster in the specified Delta Lake table in S3.
**See Also**
- [deltaLake engine](/docs/en/engines/table-engines/integrations/deltalake.md)
- [deltaLake table function](/docs/en/sql-reference/table-functions/deltalake.md)

View File

@ -29,4 +29,4 @@ A table with the specified structure for reading data in the specified Hudi tabl
**See Also**
- [Hudi engine](/docs/en/engines/table-engines/integrations/hudi.md)
- [Hudi cluster table function](/docs/en/sql-reference/table-functions/hudiCluster.md)

View File

@ -0,0 +1,30 @@
---
slug: /en/sql-reference/table-functions/hudiCluster
sidebar_position: 86
sidebar_label: hudiCluster
title: "hudiCluster Table Function"
---
This is an extension to the [hudi](/docs/en/sql-reference/table-functions/hudi.md) table function.
Allows processing files from Apache [Hudi](https://hudi.apache.org/) tables in Amazon S3 in parallel from many nodes in a specified cluster. On initiator it creates a connection to all nodes in the cluster and dispatches each file dynamically. On the worker node it asks the initiator about the next task to process and processes it. This is repeated until all tasks are finished.
**Syntax**
``` sql
hudiCluster(cluster_name, url [,aws_access_key_id, aws_secret_access_key] [,format] [,structure] [,compression])
```
**Arguments**
- `cluster_name` — Name of a cluster that is used to build a set of addresses and connection parameters to remote and local servers.
- Description of all other arguments coincides with description of arguments in equivalent [hudi](/docs/en/sql-reference/table-functions/hudi.md) table function.
**Returned value**
A table with the specified structure for reading data from cluster in the specified Hudi table in S3.
**See Also**
- [Hudi engine](/docs/en/engines/table-engines/integrations/hudi.md)
- [Hudi table function](/docs/en/sql-reference/table-functions/hudi.md)

View File

@ -72,3 +72,4 @@ Table function `iceberg` is an alias to `icebergS3` now.
**See Also**
- [Iceberg engine](/docs/en/engines/table-engines/integrations/iceberg.md)
- [Iceberg cluster table function](/docs/en/sql-reference/table-functions/icebergCluster.md)

View File

@ -0,0 +1,43 @@
---
slug: /en/sql-reference/table-functions/icebergCluster
sidebar_position: 91
sidebar_label: icebergCluster
title: "icebergCluster Table Function"
---
This is an extension to the [iceberg](/docs/en/sql-reference/table-functions/iceberg.md) table function.
Allows processing files from Apache [Iceberg](https://iceberg.apache.org/) in parallel from many nodes in a specified cluster. On initiator it creates a connection to all nodes in the cluster and dispatches each file dynamically. On the worker node it asks the initiator about the next task to process and processes it. This is repeated until all tasks are finished.
**Syntax**
``` sql
icebergS3Cluster(cluster_name, url [, NOSIGN | access_key_id, secret_access_key, [session_token]] [,format] [,compression_method])
icebergS3Cluster(cluster_name, named_collection[, option=value [,..]])
icebergAzureCluster(cluster_name, connection_string|storage_account_url, container_name, blobpath, [,account_name], [,account_key] [,format] [,compression_method])
icebergAzureCluster(cluster_name, named_collection[, option=value [,..]])
icebergHDFSCluster(cluster_name, path_to_table, [,format] [,compression_method])
icebergHDFSCluster(cluster_name, named_collection[, option=value [,..]])
```
**Arguments**
- `cluster_name` — Name of a cluster that is used to build a set of addresses and connection parameters to remote and local servers.
- Description of all other arguments coincides with description of arguments in equivalent [iceberg](/docs/en/sql-reference/table-functions/iceberg.md) table function.
**Returned value**
A table with the specified structure for reading data from cluster in the specified Iceberg table.
**Examples**
```sql
SELECT * FROM icebergS3Cluster('cluster_simple', 'http://test.s3.amazonaws.com/clickhouse-bucket/test_table', 'test', 'test')
```
**See Also**
- [Iceberg engine](/docs/en/engines/table-engines/integrations/iceberg.md)
- [Iceberg table function](/docs/en/sql-reference/table-functions/iceberg.md)

View File

@ -91,6 +91,10 @@ namespace ServerSetting
extern const ServerSettingsString index_uncompressed_cache_policy;
extern const ServerSettingsUInt64 index_uncompressed_cache_size;
extern const ServerSettingsDouble index_uncompressed_cache_size_ratio;
extern const ServerSettingsString skipping_index_cache_policy;
extern const ServerSettingsUInt64 skipping_index_cache_size;
extern const ServerSettingsUInt64 skipping_index_cache_max_entries;
extern const ServerSettingsDouble skipping_index_cache_size_ratio;
extern const ServerSettingsUInt64 io_thread_pool_queue_size;
extern const ServerSettingsString mark_cache_policy;
extern const ServerSettingsUInt64 mark_cache_size;
@ -779,6 +783,17 @@ void LocalServer::processConfig()
}
global_context->setIndexMarkCache(index_mark_cache_policy, index_mark_cache_size, index_mark_cache_size_ratio);
String skipping_index_cache_policy = server_settings[ServerSetting::skipping_index_cache_policy];
size_t skipping_index_cache_size = server_settings[ServerSetting::skipping_index_cache_size];
size_t skipping_index_cache_max_count = server_settings[ServerSetting::skipping_index_cache_max_entries];
double skipping_index_cache_size_ratio = server_settings[ServerSetting::skipping_index_cache_size_ratio];
if (skipping_index_cache_size > max_cache_size)
{
skipping_index_cache_size = max_cache_size;
LOG_INFO(log, "Lowered skipping index cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(skipping_index_cache_size));
}
global_context->setSkippingIndexCache(skipping_index_cache_policy, skipping_index_cache_size, skipping_index_cache_max_count, skipping_index_cache_size_ratio);
size_t mmap_cache_size = server_settings[ServerSetting::mmap_cache_size];
if (mmap_cache_size > max_cache_size)
{

View File

@ -214,6 +214,10 @@ namespace ServerSetting
extern const ServerSettingsString index_mark_cache_policy;
extern const ServerSettingsUInt64 index_mark_cache_size;
extern const ServerSettingsDouble index_mark_cache_size_ratio;
extern const ServerSettingsString skipping_index_cache_policy;
extern const ServerSettingsUInt64 skipping_index_cache_size;
extern const ServerSettingsUInt64 skipping_index_cache_max_entries;
extern const ServerSettingsDouble skipping_index_cache_size_ratio;
extern const ServerSettingsString index_uncompressed_cache_policy;
extern const ServerSettingsUInt64 index_uncompressed_cache_size;
extern const ServerSettingsDouble index_uncompressed_cache_size_ratio;
@ -1590,6 +1594,17 @@ try
}
global_context->setIndexMarkCache(index_mark_cache_policy, index_mark_cache_size, index_mark_cache_size_ratio);
String skipping_index_cache_policy = server_settings[ServerSetting::skipping_index_cache_policy];
size_t skipping_index_cache_size = server_settings[ServerSetting::skipping_index_cache_size];
size_t skipping_index_cache_max_entries = server_settings[ServerSetting::skipping_index_cache_max_entries];
double skipping_index_cache_size_ratio = server_settings[ServerSetting::skipping_index_cache_size_ratio];
if (skipping_index_cache_size > max_cache_size)
{
skipping_index_cache_size = max_cache_size;
LOG_INFO(log, "Lowered skipping index cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(skipping_index_cache_size));
}
global_context->setSkippingIndexCache(skipping_index_cache_policy, skipping_index_cache_size, skipping_index_cache_max_entries, skipping_index_cache_size_ratio);
size_t mmap_cache_size = server_settings[ServerSetting::mmap_cache_size];
if (mmap_cache_size > max_cache_size)
{
@ -1899,6 +1914,7 @@ try
global_context->updateMarkCacheConfiguration(*config);
global_context->updateIndexUncompressedCacheConfiguration(*config);
global_context->updateIndexMarkCacheConfiguration(*config);
global_context->updateSkippingIndexCacheConfiguration(*config);
global_context->updateMMappedFileCacheConfiguration(*config);
global_context->updateQueryCacheConfiguration(*config);

View File

@ -166,6 +166,7 @@ enum class AccessType : uint8_t
M(SYSTEM_PREWARM_MARK_CACHE, "SYSTEM PREWARM MARK, PREWARM MARK CACHE, PREWARM MARKS", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_MARK_CACHE, "SYSTEM DROP MARK, DROP MARK CACHE, DROP MARKS", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_UNCOMPRESSED_CACHE, "SYSTEM DROP UNCOMPRESSED, DROP UNCOMPRESSED CACHE, DROP UNCOMPRESSED", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_SKIPPING_INDEX_CACHE, "SYSTEM DROP SKIPPING INDEX CACHE, DROP SKIPPING INDEX CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_MMAP_CACHE, "SYSTEM DROP MMAP, DROP MMAP CACHE, DROP MMAP", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_QUERY_CACHE, "SYSTEM DROP QUERY, DROP QUERY CACHE, DROP QUERY", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_COMPILED_EXPRESSION_CACHE, "SYSTEM DROP COMPILED EXPRESSION, DROP COMPILED EXPRESSION CACHE, DROP COMPILED EXPRESSIONS", GLOBAL, SYSTEM_DROP_CACHE) \

View File

@ -286,6 +286,7 @@
M(FilesystemCacheDelayedCleanupElements, "Filesystem cache elements in background cleanup queue") \
M(FilesystemCacheHoldFileSegments, "Filesystem cache file segment which are currently hold as unreleasable") \
M(AsyncInsertCacheSize, "Number of async insert hash id in cache") \
M(SkippingIndexCacheSize, "Size of the skipping index cache in bytes") \
M(S3Requests, "S3 requests count") \
M(KeeperAliveConnections, "Number of alive connections") \
M(KeeperOutstandingRequests, "Number of outstanding requests") \

View File

@ -65,6 +65,9 @@
M(DefaultImplementationForNullsRowsWithNulls, "Number of rows which contain null values processed by default implementation for nulls in function execution", ValueType::Number) \
M(MarkCacheHits, "Number of times an entry has been found in the mark cache, so we didn't have to load a mark file.", ValueType::Number) \
M(MarkCacheMisses, "Number of times an entry has not been found in the mark cache, so we had to load a mark file in memory, which is a costly operation, adding to query latency.", ValueType::Number) \
M(SkippingIndexCacheHits, "Number of times an index granule has been found in the skipping index cache.", ValueType::Number) \
M(SkippingIndexCacheMisses, "Number of times an index granule has not been found in the skipping index cache and had to be read from disk.", ValueType::Number) \
M(SkippingIndexCacheWeightLost, "Approximate number of bytes evicted from the secondary index cache.", ValueType::Number) \
M(QueryCacheHits, "Number of times a query result has been found in the query cache (and query computation was avoided). Only updated for SELECT queries with SETTING use_query_cache = 1.", ValueType::Number) \
M(QueryCacheMisses, "Number of times a query result has not been found in the query cache (and required query computation). Only updated for SELECT queries with SETTING use_query_cache = 1.", ValueType::Number) \
/* Each page cache chunk access increments exactly one of the following 5 PageCacheChunk* counters. */ \

View File

@ -101,6 +101,10 @@ static constexpr auto DEFAULT_INDEX_UNCOMPRESSED_CACHE_SIZE_RATIO = 0.5;
static constexpr auto DEFAULT_INDEX_MARK_CACHE_POLICY = "SLRU";
static constexpr auto DEFAULT_INDEX_MARK_CACHE_MAX_SIZE = 5_GiB;
static constexpr auto DEFAULT_INDEX_MARK_CACHE_SIZE_RATIO = 0.3;
static constexpr auto DEFAULT_SKIPPING_INDEX_CACHE_POLICY = "SLRU";
static constexpr auto DEFAULT_SKIPPING_INDEX_CACHE_MAX_SIZE = 5_GiB;
static constexpr auto DEFAULT_SKIPPING_INDEX_CACHE_SIZE_RATIO = 0.5;
static constexpr auto DEFAULT_SKIPPING_INDEX_CACHE_MAX_ENTRIES = 10'000'000;
static constexpr auto DEFAULT_MMAP_CACHE_MAX_SIZE = 1_KiB; /// chosen by rolling dice
static constexpr auto DEFAULT_COMPILED_EXPRESSION_CACHE_MAX_SIZE = 128_MiB;
static constexpr auto DEFAULT_COMPILED_EXPRESSION_CACHE_MAX_ENTRIES = 10'000;

View File

@ -38,11 +38,10 @@ struct FieldRef : public Field
size_t column_idx = 0;
};
/** Range with open or closed ends; possibly unbounded.
*/
/// Range with open or closed ends; possibly unbounded.
struct Range;
/** A serious of range who can overlap or non-overlap.
*/
/// A series of ranges which may overlap.
using Ranges = std::vector<Range>;
/** Range with open or closed ends; possibly unbounded.

View File

@ -98,18 +98,22 @@ namespace DB
\
DECLARE(Double, cache_size_to_ram_max_ratio, 0.5, "Set cache size to RAM max ratio. Allows to lower cache size on low-memory systems.", 0) \
DECLARE(String, uncompressed_cache_policy, DEFAULT_UNCOMPRESSED_CACHE_POLICY, "Uncompressed cache policy name.", 0) \
DECLARE(UInt64, uncompressed_cache_size, DEFAULT_UNCOMPRESSED_CACHE_MAX_SIZE, "Size of cache for uncompressed blocks. Zero means disabled.", 0) \
DECLARE(Double, uncompressed_cache_size_ratio, DEFAULT_UNCOMPRESSED_CACHE_SIZE_RATIO, "The size of the protected queue in the uncompressed cache relative to the cache's total size.", 0) \
DECLARE(UInt64, uncompressed_cache_size, DEFAULT_UNCOMPRESSED_CACHE_MAX_SIZE, "Maximum size of cache for uncompressed blocks in bytes. Zero means disabled.", 0) \
DECLARE(Double, uncompressed_cache_size_ratio, DEFAULT_UNCOMPRESSED_CACHE_SIZE_RATIO, "The size of the protected queue (in case of SLRU policy) in the uncompressed cache relative to the cache's total size.", 0) \
DECLARE(String, mark_cache_policy, DEFAULT_MARK_CACHE_POLICY, "Mark cache policy name.", 0) \
DECLARE(UInt64, mark_cache_size, DEFAULT_MARK_CACHE_MAX_SIZE, "Size of cache for marks (index of MergeTree family of tables).", 0) \
DECLARE(Double, mark_cache_size_ratio, DEFAULT_MARK_CACHE_SIZE_RATIO, "The size of the protected queue in the mark cache relative to the cache's total size.", 0) \
DECLARE(UInt64, mark_cache_size, DEFAULT_MARK_CACHE_MAX_SIZE, "Maximum size of cache for marks (index of MergeTree family of tables) in bytes.", 0) \
DECLARE(Double, mark_cache_size_ratio, DEFAULT_MARK_CACHE_SIZE_RATIO, "The size of the protected queue (in case of SLRU policy) in the mark cache relative to the cache's total size.", 0) \
DECLARE(Double, mark_cache_prewarm_ratio, 0.95, "The ratio of total size of mark cache to fill during prewarm.", 0) \
DECLARE(String, index_uncompressed_cache_policy, DEFAULT_INDEX_UNCOMPRESSED_CACHE_POLICY, "Secondary index uncompressed cache policy name.", 0) \
DECLARE(UInt64, index_uncompressed_cache_size, DEFAULT_INDEX_UNCOMPRESSED_CACHE_MAX_SIZE, "Size of cache for uncompressed blocks of secondary indices. Zero means disabled.", 0) \
DECLARE(Double, index_uncompressed_cache_size_ratio, DEFAULT_INDEX_UNCOMPRESSED_CACHE_SIZE_RATIO, "The size of the protected queue in the secondary index uncompressed cache relative to the cache's total size.", 0) \
DECLARE(UInt64, index_uncompressed_cache_size, DEFAULT_INDEX_UNCOMPRESSED_CACHE_MAX_SIZE, "Maximum size of cache for uncompressed blocks of secondary indices in bytes. Zero means disabled.", 0) \
DECLARE(Double, index_uncompressed_cache_size_ratio, DEFAULT_INDEX_UNCOMPRESSED_CACHE_SIZE_RATIO, "The size of the protected queue (in case of SLRU policy) in the secondary index uncompressed cache relative to the cache's total size.", 0) \
DECLARE(String, index_mark_cache_policy, DEFAULT_INDEX_MARK_CACHE_POLICY, "Secondary index mark cache policy name.", 0) \
DECLARE(UInt64, index_mark_cache_size, DEFAULT_INDEX_MARK_CACHE_MAX_SIZE, "Size of cache for secondary index marks. Zero means disabled.", 0) \
DECLARE(Double, index_mark_cache_size_ratio, DEFAULT_INDEX_MARK_CACHE_SIZE_RATIO, "The size of the protected queue in the secondary index mark cache relative to the cache's total size.", 0) \
DECLARE(UInt64, index_mark_cache_size, DEFAULT_INDEX_MARK_CACHE_MAX_SIZE, "Maximum size of cache for secondary index marks in bytes. Zero means disabled.", 0) \
DECLARE(Double, index_mark_cache_size_ratio, DEFAULT_INDEX_MARK_CACHE_SIZE_RATIO, "The size of the protected queue (in case of SLRU policy) in the secondary index mark cache relative to the cache's total size.", 0) \
DECLARE(String, skipping_index_cache_policy, DEFAULT_SKIPPING_INDEX_CACHE_POLICY, "Skipping index granule cache policy name.", 0) \
DECLARE(UInt64, skipping_index_cache_size, DEFAULT_SKIPPING_INDEX_CACHE_MAX_SIZE, "Size of cache for secondary index granules in bytes. Zero means disabled.", 0) \
DECLARE(UInt64, skipping_index_cache_max_entries, DEFAULT_SKIPPING_INDEX_CACHE_MAX_ENTRIES, "Size of cache for secondary index granules in entries. Zero means disabled.", 0) \
DECLARE(Double, skipping_index_cache_size_ratio, DEFAULT_SKIPPING_INDEX_CACHE_SIZE_RATIO, "The size of the protected queue (in case of SLRU policy) in the skipping index granule cache relative to the cache's total size.", 0) \
DECLARE(UInt64, page_cache_chunk_size, 2 << 20, "Bytes per chunk in userspace page cache. Rounded up to a multiple of page size (typically 4 KiB) or huge page size (typically 2 MiB, only if page_cache_use_thp is enabled).", 0) \
DECLARE(UInt64, page_cache_mmap_size, 1 << 30, "Bytes per memory mapping in userspace page cache. Not important.", 0) \
DECLARE(UInt64, page_cache_size, 0, "Amount of virtual memory to map for userspace page cache. If page_cache_use_madv_free is enabled, it's recommended to set this higher than the machine's RAM size. Use 0 to disable userspace page cache.", 0) \

View File

@ -62,16 +62,17 @@ public:
for (size_t i = 0; i < num_rows; ++i)
{
auto array_size = col_num->getInt(i);
auto element_size = col_value->byteSizeAt(i);
if (unlikely(array_size < 0))
throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE, "Array size {} cannot be negative: while executing function {}", array_size, getName());
Int64 estimated_size = 0;
if (unlikely(common::mulOverflow(array_size, col_value->byteSize(), estimated_size)))
throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE, "Array size {} with element size {} bytes is too large: while executing function {}", array_size, col_value->byteSize(), getName());
if (unlikely(common::mulOverflow(array_size, element_size, estimated_size)))
throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE, "Array size {} with element size {} bytes is too large: while executing function {}", array_size, element_size, getName());
if (unlikely(estimated_size > max_array_size_in_columns_bytes))
throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE, "Array size {} with element size {} bytes is too large: while executing function {}", array_size, col_value->byteSize(), getName());
throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE, "Array size {} with element size {} bytes is too large: while executing function {}", array_size, element_size, getName());
offset += array_size;

View File

@ -101,6 +101,11 @@ UInt64 BloomFilter::isEmpty() const
return true;
}
size_t BloomFilter::memoryUsageBytes() const
{
return filter.capacity() * sizeof(UnderType);
}
bool operator== (const BloomFilter & a, const BloomFilter & b)
{
for (size_t i = 0; i < a.words; ++i)

View File

@ -55,6 +55,8 @@ public:
/// For debug.
UInt64 isEmpty() const;
size_t memoryUsageBytes() const;
friend bool operator== (const BloomFilter & a, const BloomFilter & b);
private:

View File

@ -34,6 +34,7 @@
#include <Storages/MergeTree/ReplicatedFetchList.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeSettings.h>
#include <Storages/MergeTree/SkippingIndexCache.h>
#include <Storages/Distributed/DistributedSettings.h>
#include <Storages/CompressionCodecSelector.h>
#include <IO/S3Settings.h>
@ -415,6 +416,7 @@ struct ContextSharedPart : boost::noncopyable
mutable OnceFlag build_vector_similarity_index_threadpool_initialized;
mutable std::unique_ptr<ThreadPool> build_vector_similarity_index_threadpool; /// Threadpool for vector-similarity index creation.
mutable UncompressedCachePtr index_uncompressed_cache TSA_GUARDED_BY(mutex); /// The cache of decompressed blocks for MergeTree indices.
mutable SkippingIndexCachePtr skipping_index_cache TSA_GUARDED_BY(mutex); /// Cache of deserialized secondary index granules.
mutable QueryCachePtr query_cache TSA_GUARDED_BY(mutex); /// Cache of query results.
mutable MarkCachePtr index_mark_cache TSA_GUARDED_BY(mutex); /// Cache of marks in compressed files of MergeTree indices.
mutable MMappedFileCachePtr mmap_cache TSA_GUARDED_BY(mutex); /// Cache of mmapped files to avoid frequent open/map/unmap/close and to reuse from several threads.
@ -3324,6 +3326,43 @@ void Context::clearIndexMarkCache() const
shared->index_mark_cache->clear();
}
void Context::setSkippingIndexCache(const String & cache_policy, size_t max_size_in_bytes, size_t max_entries, double size_ratio)
{
std::lock_guard lock(shared->mutex);
if (shared->skipping_index_cache)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Skipping index cache has been already created.");
shared->skipping_index_cache = std::make_shared<SkippingIndexCache>(cache_policy, max_size_in_bytes, max_entries, size_ratio);
}
void Context::updateSkippingIndexCacheConfiguration(const Poco::Util::AbstractConfiguration & config)
{
std::lock_guard lock(shared->mutex);
if (!shared->skipping_index_cache)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Skipping index cache was not created yet.");
size_t max_size_in_bytes = config.getUInt64("skipping_index_cache_size", DEFAULT_SKIPPING_INDEX_CACHE_MAX_SIZE);
size_t max_entries = config.getUInt64("skipping_index_cache_max_entries", DEFAULT_SKIPPING_INDEX_CACHE_MAX_ENTRIES);
shared->skipping_index_cache->setMaxSizeInBytes(max_size_in_bytes);
shared->skipping_index_cache->setMaxCount(max_entries);
}
SkippingIndexCachePtr Context::getSkippingIndexCache() const
{
SharedLockGuard lock(shared->mutex);
return shared->skipping_index_cache;
}
void Context::clearSkippingIndexCache() const
{
std::lock_guard lock(shared->mutex);
if (shared->skipping_index_cache)
shared->skipping_index_cache->clear();
}
void Context::setMMappedFileCache(size_t max_cache_size_in_num_entries)
{
std::lock_guard lock(shared->mutex);
@ -3417,6 +3456,10 @@ void Context::clearCaches() const
throw Exception(ErrorCodes::LOGICAL_ERROR, "Index mark cache was not created yet.");
shared->index_mark_cache->clear();
if (!shared->skipping_index_cache)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Skipping index cache was not created yet.");
shared->skipping_index_cache->clear();
if (!shared->mmap_cache)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Mmapped file cache was not created yet.");
shared->mmap_cache->clear();

View File

@ -92,6 +92,7 @@ class MarkCache;
class PageCache;
class MMappedFileCache;
class UncompressedCache;
class SkippingIndexCache;
class ProcessList;
class QueryStatus;
using QueryStatusPtr = std::shared_ptr<QueryStatus>;
@ -1086,6 +1087,11 @@ public:
std::shared_ptr<MarkCache> getIndexMarkCache() const;
void clearIndexMarkCache() const;
void setSkippingIndexCache(const String & cache_policy, size_t max_size_in_bytes, size_t max_entries, double size_ratio);
void updateSkippingIndexCacheConfiguration(const Poco::Util::AbstractConfiguration & config);
std::shared_ptr<SkippingIndexCache> getSkippingIndexCache() const;
void clearSkippingIndexCache() const;
void setMMappedFileCache(size_t max_cache_size_in_num_entries);
void updateMMappedFileCacheConfiguration(const Poco::Util::AbstractConfiguration & config);
std::shared_ptr<MMappedFileCache> getMMappedFileCache() const;

View File

@ -77,6 +77,17 @@ public:
const GinSegmentWithRowIdRangeVector & getFilter() const { return rowid_ranges; }
GinSegmentWithRowIdRangeVector & getFilter() { return rowid_ranges; }
size_t memoryUsageBytes() const
{
size_t term_memory = 0;
for (const auto & term : terms)
term_memory += term.capacity();
return query_string.capacity()
+ term_memory
+ (rowid_ranges.capacity() * sizeof(rowid_ranges[0]));
}
private:
/// Filter parameters
const GinFilterParameters & params;

View File

@ -387,6 +387,10 @@ BlockIO InterpreterSystemQuery::execute()
getContext()->checkAccess(AccessType::SYSTEM_DROP_UNCOMPRESSED_CACHE);
system_context->clearIndexUncompressedCache();
break;
case Type::DROP_SKIPPING_INDEX_CACHE:
getContext()->checkAccess(AccessType::SYSTEM_DROP_SKIPPING_INDEX_CACHE);
system_context->clearSkippingIndexCache();
break;
case Type::DROP_MMAP_CACHE:
getContext()->checkAccess(AccessType::SYSTEM_DROP_MMAP_CACHE);
system_context->clearMMappedFileCache();
@ -1357,6 +1361,7 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster()
case Type::DROP_UNCOMPRESSED_CACHE:
case Type::DROP_INDEX_MARK_CACHE:
case Type::DROP_INDEX_UNCOMPRESSED_CACHE:
case Type::DROP_SKIPPING_INDEX_CACHE:
case Type::DROP_FILESYSTEM_CACHE:
case Type::SYNC_FILESYSTEM_CACHE:
case Type::DROP_PAGE_CACHE:

View File

@ -411,6 +411,7 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState & s
case Type::DROP_INDEX_MARK_CACHE:
case Type::DROP_UNCOMPRESSED_CACHE:
case Type::DROP_INDEX_UNCOMPRESSED_CACHE:
case Type::DROP_SKIPPING_INDEX_CACHE:
case Type::DROP_COMPILED_EXPRESSION_CACHE:
case Type::DROP_S3_CLIENT_CACHE:
case Type::RESET_COVERAGE:

View File

@ -28,6 +28,7 @@ public:
DROP_UNCOMPRESSED_CACHE,
DROP_INDEX_MARK_CACHE,
DROP_INDEX_UNCOMPRESSED_CACHE,
DROP_SKIPPING_INDEX_CACHE,
DROP_MMAP_CACHE,
DROP_QUERY_CACHE,
DROP_COMPILED_EXPRESSION_CACHE,

View File

@ -661,10 +661,10 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
struct IndexStat
{
std::atomic<size_t> total_granules{0};
std::atomic<size_t> granules_dropped{0};
std::atomic<size_t> total_parts{0};
std::atomic<size_t> parts_dropped{0};
std::atomic<size_t> total_granules = 0;
std::atomic<size_t> granules_dropped = 0;
std::atomic<size_t> total_parts = 0;
std::atomic<size_t> parts_dropped = 0;
};
std::vector<IndexStat> useful_indices_stat(skip_indexes.useful_indices.size());
@ -677,6 +677,7 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
{
auto mark_cache = context->getIndexMarkCache();
auto uncompressed_cache = context->getIndexUncompressedCache();
auto skipping_index_cache = context->getSkippingIndexCache();
auto query_status = context->getProcessListElement();
@ -734,6 +735,7 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
reader_settings,
mark_cache.get(),
uncompressed_cache.get(),
skipping_index_cache.get(),
log);
stat.granules_dropped.fetch_add(total_granules - ranges.ranges.getNumberOfMarks(), std::memory_order_relaxed);
@ -755,7 +757,7 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
indices_and_condition.indices, indices_and_condition.condition,
part, ranges.ranges,
settings, reader_settings,
mark_cache.get(), uncompressed_cache.get(), log);
mark_cache.get(), uncompressed_cache.get(), skipping_index_cache.get(), log);
stat.total_granules.fetch_add(total_granules, std::memory_order_relaxed);
stat.granules_dropped.fetch_add(total_granules - ranges.ranges.getNumberOfMarks(), std::memory_order_relaxed);
@ -1364,6 +1366,7 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex(
const MergeTreeReaderSettings & reader_settings,
MarkCache * mark_cache,
UncompressedCache * uncompressed_cache,
SkippingIndexCache * skipping_index_cache,
LoggerPtr log)
{
if (!index_helper->getDeserializedFormat(part->getDataPartStorage(), index_helper->getFileName()))
@ -1399,6 +1402,7 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex(
index_ranges,
mark_cache,
uncompressed_cache,
skipping_index_cache,
reader_settings);
MarkRanges res;
@ -1416,13 +1420,10 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex(
{
const MarkRange & index_range = index_ranges[i];
if (last_index_mark != index_range.begin || !granule)
reader.seek(index_range.begin);
for (size_t index_mark = index_range.begin; index_mark < index_range.end; ++index_mark)
{
if (index_mark != index_range.begin || !granule || last_index_mark != index_range.begin)
reader.read(granule);
granule = reader.read(index_mark);
if (index_helper->isVectorSimilarityIndex())
{
@ -1484,6 +1485,7 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingMergedIndex(
const MergeTreeReaderSettings & reader_settings,
MarkCache * mark_cache,
UncompressedCache * uncompressed_cache,
SkippingIndexCache * skipping_index_cache,
LoggerPtr log)
{
for (const auto & index_helper : indices)
@ -1517,6 +1519,7 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingMergedIndex(
ranges,
mark_cache,
uncompressed_cache,
skipping_index_cache,
reader_settings));
}
@ -1533,17 +1536,13 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingMergedIndex(
range.begin / index_granularity,
(range.end + index_granularity - 1) / index_granularity);
if (last_index_mark != index_range.begin || !granules_filled)
for (auto & reader : readers)
reader->seek(index_range.begin);
for (size_t index_mark = index_range.begin; index_mark < index_range.end; ++index_mark)
{
if (index_mark != index_range.begin || !granules_filled || last_index_mark != index_range.begin)
{
for (size_t i = 0; i < readers.size(); ++i)
{
readers[i]->read(granules[i]);
granules[i] = readers[i]->read(index_mark);
granules_filled = true;
}
}

View File

@ -94,6 +94,7 @@ private:
const MergeTreeReaderSettings & reader_settings,
MarkCache * mark_cache,
UncompressedCache * uncompressed_cache,
SkippingIndexCache * skipping_index_cache,
LoggerPtr log);
static MarkRanges filterMarksUsingMergedIndex(
@ -105,6 +106,7 @@ private:
const MergeTreeReaderSettings & reader_settings,
MarkCache * mark_cache,
UncompressedCache * uncompressed_cache,
SkippingIndexCache * skipping_index_cache,
LoggerPtr log);
struct PartFilterCounters

View File

@ -85,6 +85,14 @@ bool MergeTreeIndexGranuleBloomFilter::empty() const
return !total_rows;
}
size_t MergeTreeIndexGranuleBloomFilter::memoryUsageBytes() const
{
size_t sum = 0;
for (const auto & bloom_filter : bloom_filters)
sum += bloom_filter->memoryUsageBytes();
return sum;
}
void MergeTreeIndexGranuleBloomFilter::deserializeBinary(ReadBuffer & istr, MergeTreeIndexVersion version)
{
if (version != 1)

View File

@ -23,6 +23,8 @@ public:
bool empty() const override;
size_t memoryUsageBytes() const override;
void serializeBinary(WriteBuffer & ostr) const override;
void deserializeBinary(ReadBuffer & istr, MergeTreeIndexVersion version) override;

View File

@ -66,6 +66,15 @@ void MergeTreeIndexGranuleBloomFilterText::deserializeBinary(ReadBuffer & istr,
}
size_t MergeTreeIndexGranuleBloomFilterText::memoryUsageBytes() const
{
size_t sum = 0;
for (const auto & bloom_filter : bloom_filters)
sum += bloom_filter.memoryUsageBytes();
return sum;
}
MergeTreeIndexAggregatorBloomFilterText::MergeTreeIndexAggregatorBloomFilterText(
const Names & index_columns_,
const String & index_name_,

View File

@ -25,6 +25,8 @@ struct MergeTreeIndexGranuleBloomFilterText final : public IMergeTreeIndexGranul
bool empty() const override { return !has_elems; }
size_t memoryUsageBytes() const override;
const String index_name;
const BloomFilterParameters params;

View File

@ -86,6 +86,15 @@ void MergeTreeIndexGranuleFullText::deserializeBinary(ReadBuffer & istr, MergeTr
}
size_t MergeTreeIndexGranuleFullText::memoryUsageBytes() const
{
size_t sum = 0;
for (const auto & gin_filter : gin_filters)
sum += gin_filter.memoryUsageBytes();
return sum;
}
MergeTreeIndexAggregatorFullText::MergeTreeIndexAggregatorFullText(
GinIndexStorePtr store_,
const Names & index_columns_,

View File

@ -23,6 +23,8 @@ struct MergeTreeIndexGranuleFullText final : public IMergeTreeIndexGranule
bool empty() const override { return !has_elems; }
size_t memoryUsageBytes() const override;
const String index_name;
const GinFilterParameters params;
GinFilters gin_filters;

View File

@ -23,6 +23,8 @@ public:
bool empty() const override { return is_empty; }
size_t memoryUsageBytes() const override { return sizeof(*this); }
~MergeTreeIndexGranuleHypothesis() override = default;
const String & index_name;

View File

@ -23,6 +23,8 @@ struct MergeTreeIndexGranuleMinMax final : public IMergeTreeIndexGranule
bool empty() const override { return hyperrectangle.empty(); }
size_t memoryUsageBytes() const override { return hyperrectangle.capacity() * sizeof(Range); }
const String index_name;
const Block index_sample_block;

View File

@ -1,6 +1,7 @@
#include <Storages/MergeTree/MergeTreeIndexReader.h>
#include <Interpreters/Context.h>
#include <Storages/MergeTree/LoadedMergeTreeDataPartInfoForReader.h>
#include <Storages/MergeTree/SkippingIndexCache.h>
namespace
{
@ -51,40 +52,62 @@ MergeTreeIndexReader::MergeTreeIndexReader(
MergeTreeData::DataPartPtr part_,
size_t marks_count_,
const MarkRanges & all_mark_ranges_,
MarkCache * mark_cache,
UncompressedCache * uncompressed_cache,
MergeTreeReaderSettings settings)
MarkCache * mark_cache_,
UncompressedCache * uncompressed_cache_,
SkippingIndexCache * skipping_index_cache_,
MergeTreeReaderSettings settings_)
: index(index_)
, part(std::move(part_))
, marks_count(marks_count_)
, all_mark_ranges(all_mark_ranges_)
, mark_cache(mark_cache_)
, uncompressed_cache(uncompressed_cache_)
, skipping_index_cache(skipping_index_cache_)
, settings(std::move(settings_))
{
auto index_format = index->getDeserializedFormat(part_->getDataPartStorage(), index->getFileName());
}
void MergeTreeIndexReader::initStreamIfNeeded()
{
if (stream)
return;
auto index_format = index->getDeserializedFormat(part->getDataPartStorage(), index->getFileName());
stream = makeIndexReader(
index_format.extension,
index_,
part_,
marks_count_,
all_mark_ranges_,
index,
part,
marks_count,
all_mark_ranges,
mark_cache,
uncompressed_cache,
std::move(settings));
version = index_format.version;
stream->adjustRightMark(getLastMark(all_mark_ranges_));
stream->adjustRightMark(getLastMark(all_mark_ranges));
stream->seekToStart();
}
void MergeTreeIndexReader::seek(size_t mark)
MergeTreeIndexGranulePtr MergeTreeIndexReader::read(size_t mark)
{
stream->seekToMark(mark);
}
auto load_func = [&] {
initStreamIfNeeded();
if (stream_mark != mark)
stream->seekToMark(mark);
void MergeTreeIndexReader::read(MergeTreeIndexGranulePtr & granule)
{
if (granule == nullptr)
granule = index->createIndexGranule();
auto granule = index->createIndexGranule();
granule->deserializeBinary(*stream->getDataBuffer(), version);
stream_mark = mark + 1;
return granule;
};
granule->deserializeBinary(*stream->getDataBuffer(), version);
UInt128 key = SkippingIndexCache::hash(
part->getDataPartStorage().getFullPath(),
index->getFileName(),
mark);
return skipping_index_cache->getOrSet(key, load_func);
}
}

View File

@ -18,16 +18,26 @@ public:
const MarkRanges & all_mark_ranges_,
MarkCache * mark_cache,
UncompressedCache * uncompressed_cache,
MergeTreeReaderSettings settings);
SkippingIndexCache * skipping_index_cache,
MergeTreeReaderSettings settings_);
void seek(size_t mark);
void read(MergeTreeIndexGranulePtr & granule);
MergeTreeIndexGranulePtr read(size_t mark);
private:
MergeTreeIndexPtr index;
MergeTreeData::DataPartPtr part;
size_t marks_count;
const MarkRanges & all_mark_ranges;
MarkCache * mark_cache;
UncompressedCache * uncompressed_cache;
SkippingIndexCache * skipping_index_cache;
MergeTreeReaderSettings settings;
std::unique_ptr<MergeTreeReaderStream> stream;
uint8_t version = 0;
size_t stream_mark = 0;
void initStreamIfNeeded();
};
}

View File

@ -31,6 +31,8 @@ struct MergeTreeIndexGranuleSet final : public IMergeTreeIndexGranule
size_t size() const { return block.rows(); }
bool empty() const override { return !size(); }
size_t memoryUsageBytes() const override { return block.bytes() + (set_hyperrectangle.capacity() * sizeof(Range)); }
~MergeTreeIndexGranuleSet() override = default;
const String index_name;

View File

@ -176,6 +176,12 @@ String USearchIndexWithSerialization::Statistics::toString() const
max_level, connectivity, size, capacity, ReadableSize(memory_usage), bytes_per_vector, scalar_words, nodes, edges, max_edges);
}
size_t USearchIndexWithSerialization::memoryUsageBytes() const
{
return Base::memory_usage();
}
MergeTreeIndexGranuleVectorSimilarity::MergeTreeIndexGranuleVectorSimilarity(
const String & index_name_,
unum::usearch::metric_kind_t metric_kind_,

View File

@ -60,6 +60,8 @@ public:
};
Statistics getStatistics() const;
size_t memoryUsageBytes() const;
};
using USearchIndexWithSerializationPtr = std::shared_ptr<USearchIndexWithSerialization>;
@ -87,6 +89,8 @@ struct MergeTreeIndexGranuleVectorSimilarity final : public IMergeTreeIndexGranu
bool empty() const override { return !index || index->size() == 0; }
size_t memoryUsageBytes() const override { return index->memoryUsageBytes(); }
const String index_name;
const unum::usearch::metric_kind_t metric_kind;
const unum::usearch::scalar_kind_t scalar_kind;

View File

@ -62,6 +62,9 @@ struct IMergeTreeIndexGranule
virtual void deserializeBinary(ReadBuffer & istr, MergeTreeIndexVersion version) = 0;
virtual bool empty() const = 0;
/// The in-memory size of the granule. Not expected to be 100% accurate.
virtual size_t memoryUsageBytes() const = 0;
};
using MergeTreeIndexGranulePtr = std::shared_ptr<IMergeTreeIndexGranule>;

View File

@ -0,0 +1,103 @@
#pragma once
#include <Common/CacheBase.h>
#include <Common/ProfileEvents.h>
#include <Common/SipHash.h>
#include <Storages/MergeTree/MergeTreeIndices.h>
namespace ProfileEvents
{
extern const Event SkippingIndexCacheMisses;
extern const Event SkippingIndexCacheHits;
extern const Event SkippingIndexCacheWeightLost;
}
namespace CurrentMetrics
{
extern const Metric SkippingIndexCacheSize;
}
namespace DB
{
struct SkippingIndexCacheCell
{
/// memoryUsageBytes() gives only approximate results ... adding some excess bytes should make it less bad
static constexpr auto ENTRY_OVERHEAD_BYTES_GUESS = 200uz;
MergeTreeIndexGranulePtr granule;
size_t memory_bytes;
explicit SkippingIndexCacheCell(MergeTreeIndexGranulePtr granule_)
: granule(std::move(granule_))
, memory_bytes(granule->memoryUsageBytes() + ENTRY_OVERHEAD_BYTES_GUESS)
{
CurrentMetrics::add(CurrentMetrics::SkippingIndexCacheSize, memory_bytes);
}
~SkippingIndexCacheCell()
{
CurrentMetrics::sub(CurrentMetrics::SkippingIndexCacheSize, memory_bytes);
}
SkippingIndexCacheCell(const SkippingIndexCacheCell &) = delete;
SkippingIndexCacheCell & operator=(const SkippingIndexCacheCell &) = delete;
};
struct SkippingIndexCacheWeightFunction
{
size_t operator()(const SkippingIndexCacheCell & cell) const
{
return cell.memory_bytes;
}
};
/// Cache of deserialized skipping index granules.
class SkippingIndexCache : public CacheBase<UInt128, SkippingIndexCacheCell, UInt128TrivialHash, SkippingIndexCacheWeightFunction>
{
public:
using Base = CacheBase<UInt128, SkippingIndexCacheCell, UInt128TrivialHash, SkippingIndexCacheWeightFunction>;
SkippingIndexCache(const String & cache_policy, size_t max_size_in_bytes, size_t max_count, double size_ratio)
: Base(cache_policy, max_size_in_bytes, max_count, size_ratio)
{}
static UInt128 hash(const String & path_to_data_part, const String & index_name, size_t index_mark)
{
SipHash hash;
hash.update(path_to_data_part.data(), path_to_data_part.size() + 1);
hash.update(index_name.data(), index_name.size() + 1);
hash.update(index_mark);
return hash.get128();
}
/// LoadFunc should have signature () -> MergeTreeIndexGranulePtr.
template <typename LoadFunc>
MergeTreeIndexGranulePtr getOrSet(const Key & key, LoadFunc && load)
{
auto wrapped_load = [&]() -> std::shared_ptr<SkippingIndexCacheCell> {
MergeTreeIndexGranulePtr granule = load();
return std::make_shared<SkippingIndexCacheCell>(std::move(granule));
};
auto result = Base::getOrSet(key, wrapped_load);
if (result.second)
ProfileEvents::increment(ProfileEvents::SkippingIndexCacheMisses);
else
ProfileEvents::increment(ProfileEvents::SkippingIndexCacheHits);
return result.first->granule;
}
private:
void onRemoveOverflowWeightLoss(size_t weight_loss) override
{
ProfileEvents::increment(ProfileEvents::SkippingIndexCacheWeightLost, weight_loss);
}
};
using SkippingIndexCachePtr = std::shared_ptr<SkippingIndexCache>;
}

View File

@ -226,6 +226,26 @@ template class TableFunctionObjectStorage<HDFSClusterDefinition, StorageHDFSConf
#endif
template class TableFunctionObjectStorage<LocalDefinition, StorageLocalConfiguration>;
#if USE_AVRO && USE_AWS_S3
template class TableFunctionObjectStorage<IcebergS3ClusterDefinition, StorageS3IcebergConfiguration>;
#endif
#if USE_AVRO && USE_AZURE_BLOB_STORAGE
template class TableFunctionObjectStorage<IcebergAzureClusterDefinition, StorageAzureIcebergConfiguration>;
#endif
#if USE_AVRO && USE_HDFS
template class TableFunctionObjectStorage<IcebergHDFSClusterDefinition, StorageHDFSIcebergConfiguration>;
#endif
#if USE_PARQUET && USE_AWS_S3
template class TableFunctionObjectStorage<DeltaLakeClusterDefinition, StorageS3DeltaLakeConfiguration>;
#endif
#if USE_AWS_S3
template class TableFunctionObjectStorage<HudiClusterDefinition, StorageS3HudiConfiguration>;
#endif
#if USE_AVRO
void registerTableFunctionIceberg(TableFunctionFactory & factory)
{

View File

@ -96,7 +96,7 @@ void registerTableFunctionObjectStorageCluster(TableFunctionFactory & factory)
{
.documentation = {
.description=R"(The table function can be used to read the data stored on HDFS in parallel for many nodes in a specified cluster.)",
.examples{{"HDFSCluster", "SELECT * FROM HDFSCluster(cluster_name, uri, format)", ""}}},
.examples{{"HDFSCluster", "SELECT * FROM HDFSCluster(cluster, uri, format)", ""}}},
.allow_readonly = false
}
);
@ -105,15 +105,77 @@ void registerTableFunctionObjectStorageCluster(TableFunctionFactory & factory)
UNUSED(factory);
}
#if USE_AVRO
void registerTableFunctionIcebergCluster(TableFunctionFactory & factory)
{
UNUSED(factory);
#if USE_AWS_S3
template class TableFunctionObjectStorageCluster<S3ClusterDefinition, StorageS3Configuration>;
factory.registerFunction<TableFunctionIcebergS3Cluster>(
{.documentation
= {.description = R"(The table function can be used to read the Iceberg table stored on S3 object store in parallel for many nodes in a specified cluster.)",
.examples{{"icebergS3Cluster", "SELECT * FROM icebergS3Cluster(cluster, url, [, NOSIGN | access_key_id, secret_access_key, [session_token]], format, [,compression])", ""}},
.categories{"DataLake"}},
.allow_readonly = false});
#endif
#if USE_AZURE_BLOB_STORAGE
template class TableFunctionObjectStorageCluster<AzureClusterDefinition, StorageAzureConfiguration>;
factory.registerFunction<TableFunctionIcebergAzureCluster>(
{.documentation
= {.description = R"(The table function can be used to read the Iceberg table stored on Azure object store in parallel for many nodes in a specified cluster.)",
.examples{{"icebergAzureCluster", "SELECT * FROM icebergAzureCluster(cluster, connection_string|storage_account_url, container_name, blobpath, [account_name, account_key, format, compression])", ""}},
.categories{"DataLake"}},
.allow_readonly = false});
#endif
#if USE_HDFS
template class TableFunctionObjectStorageCluster<HDFSClusterDefinition, StorageHDFSConfiguration>;
factory.registerFunction<TableFunctionIcebergHDFSCluster>(
{.documentation
= {.description = R"(The table function can be used to read the Iceberg table stored on HDFS virtual filesystem in parallel for many nodes in a specified cluster.)",
.examples{{"icebergHDFSCluster", "SELECT * FROM icebergHDFSCluster(cluster, uri, [format], [structure], [compression_method])", ""}},
.categories{"DataLake"}},
.allow_readonly = false});
#endif
}
#endif
#if USE_AWS_S3
#if USE_PARQUET
void registerTableFunctionDeltaLakeCluster(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionDeltaLakeCluster>(
{.documentation
= {.description = R"(The table function can be used to read the DeltaLake table stored on object store in parallel for many nodes in a specified cluster.)",
.examples{{"deltaLakeCluster", "SELECT * FROM deltaLakeCluster(cluster, url, access_key_id, secret_access_key)", ""}},
.categories{"DataLake"}},
.allow_readonly = false});
}
#endif
void registerTableFunctionHudiCluster(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionHudiCluster>(
{.documentation
= {.description = R"(The table function can be used to read the Hudi table stored on object store in parallel for many nodes in a specified cluster.)",
.examples{{"hudiCluster", "SELECT * FROM hudiCluster(cluster, url, access_key_id, secret_access_key)", ""}},
.categories{"DataLake"}},
.allow_readonly = false});
}
#endif
void registerDataLakeClusterTableFunctions(TableFunctionFactory & factory)
{
UNUSED(factory);
#if USE_AVRO
registerTableFunctionIcebergCluster(factory);
#endif
#if USE_AWS_S3
#if USE_PARQUET
registerTableFunctionDeltaLakeCluster(factory);
#endif
registerTableFunctionHudiCluster(factory);
#endif
}
}

View File

@ -33,6 +33,36 @@ struct HDFSClusterDefinition
static constexpr auto storage_type_name = "HDFSCluster";
};
struct IcebergS3ClusterDefinition
{
static constexpr auto name = "icebergS3Cluster";
static constexpr auto storage_type_name = "IcebergS3Cluster";
};
struct IcebergAzureClusterDefinition
{
static constexpr auto name = "icebergAzureCluster";
static constexpr auto storage_type_name = "IcebergAzureCluster";
};
struct IcebergHDFSClusterDefinition
{
static constexpr auto name = "icebergHDFSCluster";
static constexpr auto storage_type_name = "IcebergHDFSCluster";
};
struct DeltaLakeClusterDefinition
{
static constexpr auto name = "deltaLakeCluster";
static constexpr auto storage_type_name = "DeltaLakeS3Cluster";
};
struct HudiClusterDefinition
{
static constexpr auto name = "hudiCluster";
static constexpr auto storage_type_name = "HudiS3Cluster";
};
/**
* Class implementing s3/hdfs/azureBlobStorageCluster(...) table functions,
* which allow to process many files from S3/HDFS/Azure blob storage on a specific cluster.
@ -79,4 +109,25 @@ using TableFunctionAzureBlobCluster = TableFunctionObjectStorageCluster<AzureClu
#if USE_HDFS
using TableFunctionHDFSCluster = TableFunctionObjectStorageCluster<HDFSClusterDefinition, StorageHDFSConfiguration>;
#endif
#if USE_AVRO && USE_AWS_S3
using TableFunctionIcebergS3Cluster = TableFunctionObjectStorageCluster<IcebergS3ClusterDefinition, StorageS3IcebergConfiguration>;
#endif
#if USE_AVRO && USE_AZURE_BLOB_STORAGE
using TableFunctionIcebergAzureCluster = TableFunctionObjectStorageCluster<IcebergAzureClusterDefinition, StorageAzureIcebergConfiguration>;
#endif
#if USE_AVRO && USE_HDFS
using TableFunctionIcebergHDFSCluster = TableFunctionObjectStorageCluster<IcebergHDFSClusterDefinition, StorageHDFSIcebergConfiguration>;
#endif
#if USE_AWS_S3 && USE_PARQUET
using TableFunctionDeltaLakeCluster = TableFunctionObjectStorageCluster<DeltaLakeClusterDefinition, StorageS3DeltaLakeConfiguration>;
#endif
#if USE_AWS_S3
using TableFunctionHudiCluster = TableFunctionObjectStorageCluster<HudiClusterDefinition, StorageS3HudiConfiguration>;
#endif
}

View File

@ -66,6 +66,7 @@ void registerTableFunctions(bool use_legacy_mongodb_integration [[maybe_unused]]
registerTableFunctionObjectStorage(factory);
registerTableFunctionObjectStorageCluster(factory);
registerDataLakeTableFunctions(factory);
registerDataLakeClusterTableFunctions(factory);
}
}

View File

@ -70,6 +70,7 @@ void registerTableFunctionExplain(TableFunctionFactory & factory);
void registerTableFunctionObjectStorage(TableFunctionFactory & factory);
void registerTableFunctionObjectStorageCluster(TableFunctionFactory & factory);
void registerDataLakeTableFunctions(TableFunctionFactory & factory);
void registerDataLakeClusterTableFunctions(TableFunctionFactory & factory);
void registerTableFunctionTimeSeries(TableFunctionFactory & factory);

View File

@ -0,0 +1,20 @@
<clickhouse>
<remote_servers>
<cluster_simple>
<shard>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
<replica>
<host>node3</host>
<port>9000</port>
</replica>
</shard>
</cluster_simple>
</remote_servers>
</clickhouse>

View File

@ -0,0 +1,6 @@
<clickhouse>
<query_log>
<database>system</database>
<table>query_log</table>
</query_log>
</clickhouse>

View File

@ -73,14 +73,38 @@ def started_cluster():
cluster.add_instance(
"node1",
main_configs=[
"configs/config.d/query_log.xml",
"configs/config.d/cluster.xml",
"configs/config.d/named_collections.xml",
"configs/config.d/filesystem_caches.xml",
],
user_configs=["configs/users.d/users.xml"],
with_minio=True,
with_azurite=True,
stay_alive=True,
with_hdfs=with_hdfs,
stay_alive=True,
)
cluster.add_instance(
"node2",
main_configs=[
"configs/config.d/query_log.xml",
"configs/config.d/cluster.xml",
"configs/config.d/named_collections.xml",
"configs/config.d/filesystem_caches.xml",
],
user_configs=["configs/users.d/users.xml"],
stay_alive=True,
)
cluster.add_instance(
"node3",
main_configs=[
"configs/config.d/query_log.xml",
"configs/config.d/cluster.xml",
"configs/config.d/named_collections.xml",
"configs/config.d/filesystem_caches.xml",
],
user_configs=["configs/users.d/users.xml"],
stay_alive=True,
)
logging.info("Starting cluster...")
@ -182,6 +206,7 @@ def get_creation_expression(
cluster,
format="Parquet",
table_function=False,
run_on_cluster=False,
**kwargs,
):
if storage_type == "s3":
@ -189,35 +214,56 @@ def get_creation_expression(
bucket = kwargs["bucket"]
else:
bucket = cluster.minio_bucket
print(bucket)
if table_function:
return f"icebergS3(s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"
if run_on_cluster:
assert table_function
return f"icebergS3Cluster('cluster_simple', s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"
else:
return f"""
DROP TABLE IF EXISTS {table_name};
CREATE TABLE {table_name}
ENGINE=IcebergS3(s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"""
if table_function:
return f"icebergS3(s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"
else:
return f"""
DROP TABLE IF EXISTS {table_name};
CREATE TABLE {table_name}
ENGINE=IcebergS3(s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"""
elif storage_type == "azure":
if table_function:
if run_on_cluster:
assert table_function
return f"""
icebergAzure(azure, container = '{cluster.azure_container_name}', storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})
icebergAzureCluster('cluster_simple', azure, container = '{cluster.azure_container_name}', storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})
"""
else:
return f"""
DROP TABLE IF EXISTS {table_name};
CREATE TABLE {table_name}
ENGINE=IcebergAzure(azure, container = {cluster.azure_container_name}, storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})"""
if table_function:
return f"""
icebergAzure(azure, container = '{cluster.azure_container_name}', storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})
"""
else:
return f"""
DROP TABLE IF EXISTS {table_name};
CREATE TABLE {table_name}
ENGINE=IcebergAzure(azure, container = {cluster.azure_container_name}, storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})"""
elif storage_type == "hdfs":
if table_function:
if run_on_cluster:
assert table_function
return f"""
icebergHDFS(hdfs, filename= 'iceberg_data/default/{table_name}/', format={format}, url = 'hdfs://hdfs1:9000/')
icebergHDFSCluster('cluster_simple', hdfs, filename= 'iceberg_data/default/{table_name}/', format={format}, url = 'hdfs://hdfs1:9000/')
"""
else:
return f"""
DROP TABLE IF EXISTS {table_name};
CREATE TABLE {table_name}
ENGINE=IcebergHDFS(hdfs, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'hdfs://hdfs1:9000/');"""
if table_function:
return f"""
icebergHDFS(hdfs, filename= 'iceberg_data/default/{table_name}/', format={format}, url = 'hdfs://hdfs1:9000/')
"""
else:
return f"""
DROP TABLE IF EXISTS {table_name};
CREATE TABLE {table_name}
ENGINE=IcebergHDFS(hdfs, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'hdfs://hdfs1:9000/');"""
elif storage_type == "local":
assert not run_on_cluster
if table_function:
return f"""
icebergLocal(local, path = '/iceberg_data/default/{table_name}/', format={format})
@ -227,6 +273,7 @@ def get_creation_expression(
DROP TABLE IF EXISTS {table_name};
CREATE TABLE {table_name}
ENGINE=IcebergLocal(local, path = '/iceberg_data/default/{table_name}/', format={format});"""
else:
raise Exception(f"Unknown iceberg storage type: {storage_type}")
@ -492,6 +539,108 @@ def test_types(started_cluster, format_version, storage_type):
)
@pytest.mark.parametrize("format_version", ["1", "2"])
@pytest.mark.parametrize("storage_type", ["s3", "azure", "hdfs"])
def test_cluster_table_function(started_cluster, format_version, storage_type):
if is_arm() and storage_type == "hdfs":
pytest.skip("Disabled test IcebergHDFS for aarch64")
instance = started_cluster.instances["node1"]
spark = started_cluster.spark_session
TABLE_NAME = (
"test_iceberg_cluster_"
+ format_version
+ "_"
+ storage_type
+ "_"
+ get_uuid_str()
)
def add_df(mode):
write_iceberg_from_df(
spark,
generate_data(spark, 0, 100),
TABLE_NAME,
mode=mode,
format_version=format_version,
)
files = default_upload_directory(
started_cluster,
storage_type,
f"/iceberg_data/default/{TABLE_NAME}/",
f"/iceberg_data/default/{TABLE_NAME}/",
)
logging.info(f"Adding another dataframe. result files: {files}")
return files
files = add_df(mode="overwrite")
for i in range(1, len(started_cluster.instances)):
files = add_df(mode="append")
logging.info(f"Setup complete. files: {files}")
assert len(files) == 5 + 4 * (len(started_cluster.instances) - 1)
clusters = instance.query(f"SELECT * FROM system.clusters")
logging.info(f"Clusters setup: {clusters}")
# Regular Query only node1
table_function_expr = get_creation_expression(
storage_type, TABLE_NAME, started_cluster, table_function=True
)
select_regular = (
instance.query(f"SELECT * FROM {table_function_expr}").strip().split()
)
# Cluster Query with node1 as coordinator
table_function_expr_cluster = get_creation_expression(
storage_type,
TABLE_NAME,
started_cluster,
table_function=True,
run_on_cluster=True,
)
select_cluster = (
instance.query(f"SELECT * FROM {table_function_expr_cluster}").strip().split()
)
# Simple size check
assert len(select_regular) == 600
assert len(select_cluster) == 600
# Actual check
assert select_cluster == select_regular
# Check query_log
for replica in started_cluster.instances.values():
replica.query("SYSTEM FLUSH LOGS")
for node_name, replica in started_cluster.instances.items():
cluster_secondary_queries = (
replica.query(
f"""
SELECT query, type, is_initial_query, read_rows, read_bytes FROM system.query_log
WHERE
type = 'QueryStart' AND
positionCaseInsensitive(query, '{storage_type}Cluster') != 0 AND
position(query, '{TABLE_NAME}') != 0 AND
position(query, 'system.query_log') = 0 AND
NOT is_initial_query
"""
)
.strip()
.split("\n")
)
logging.info(
f"[{node_name}] cluster_secondary_queries: {cluster_secondary_queries}"
)
assert len(cluster_secondary_queries) == 1
@pytest.mark.parametrize("format_version", ["1", "2"])
@pytest.mark.parametrize("storage_type", ["s3", "azure", "hdfs", "local"])
def test_delete_files(started_cluster, format_version, storage_type):

View File

@ -115,6 +115,7 @@ SYSTEM DROP CONNECTIONS CACHE ['SYSTEM DROP CONNECTIONS CACHE','DROP CONNECTIONS
SYSTEM PREWARM MARK CACHE ['SYSTEM PREWARM MARK','PREWARM MARK CACHE','PREWARM MARKS'] GLOBAL SYSTEM DROP CACHE
SYSTEM DROP MARK CACHE ['SYSTEM DROP MARK','DROP MARK CACHE','DROP MARKS'] GLOBAL SYSTEM DROP CACHE
SYSTEM DROP UNCOMPRESSED CACHE ['SYSTEM DROP UNCOMPRESSED','DROP UNCOMPRESSED CACHE','DROP UNCOMPRESSED'] GLOBAL SYSTEM DROP CACHE
SYSTEM DROP SKIPPING INDEX CACHE ['SYSTEM DROP SKIPPING INDEX CACHE','DROP SKIPPING INDEX CACHE'] GLOBAL SYSTEM DROP CACHE
SYSTEM DROP MMAP CACHE ['SYSTEM DROP MMAP','DROP MMAP CACHE','DROP MMAP'] GLOBAL SYSTEM DROP CACHE
SYSTEM DROP QUERY CACHE ['SYSTEM DROP QUERY','DROP QUERY CACHE','DROP QUERY'] GLOBAL SYSTEM DROP CACHE
SYSTEM DROP COMPILED EXPRESSION CACHE ['SYSTEM DROP COMPILED EXPRESSION','DROP COMPILED EXPRESSION CACHE','DROP COMPILED EXPRESSIONS'] GLOBAL SYSTEM DROP CACHE

View File

@ -0,0 +1,7 @@
SkippingIndexCacheSize 0
SkippingIndexCacheSize 0
0
SkippingIndexCacheSize 1 1
1
SELECT count() FROM tab where val = 25; 0 10
SELECT count() FROM tab WHERE val = 30; 10 0

View File

@ -0,0 +1,25 @@
-- Tags: no-parallel
-- no-parallel: looks at server-wide metrics
-- Tests the skipping index cache.
SYSTEM DROP SKIPPING INDEX CACHE;
SELECT metric, value FROM system.metrics WHERE metric = 'SkippingIndexCacheSize';
DROP TABLE IF EXISTS tab;
CREATE TABLE tab (id Int64, val Int64, INDEX idx val TYPE set(0) GRANULARITY 1) ENGINE = MergeTree ORDER BY id SETTINGS index_granularity = 128;
INSERT INTO tab SELECT number, number * 10 FROM numbers(1280);
SELECT metric, value FROM system.metrics WHERE metric = 'SkippingIndexCacheSize';
SELECT count() FROM tab where val = 25;
SELECT metric, value >= 1280 * 8, value < 1280 * 8 * 2 FROM system.metrics WHERE metric = 'SkippingIndexCacheSize';
SELECT count() FROM tab WHERE val = 30;
SELECT metric, value >= 1280 * 8, value < 1280 * 8 * 2 FROM system.metrics WHERE metric = 'SecondaryIndexCacheSize';
SYSTEM FLUSH LOGS;
SELECT query, ProfileEvents['SkippingIndexCacheHits'], ProfileEvents['SkippingIndexCacheMisses']
FROM system.query_log
WHERE event_date >= yesterday() AND current_database = currentDatabase() AND type = 'QueryFinish' AND query LIKE 'SELECT count()%'
ORDER BY event_time_microseconds;

View File

@ -1,3 +1,6 @@
SELECT arrayWithConstant(96142475, ['qMUF']); -- { serverError TOO_LARGE_ARRAY_SIZE }
SELECT arrayWithConstant(100000000, materialize([[[[[[[[[['Hello, world!']]]]]]]]]])); -- { serverError TOO_LARGE_ARRAY_SIZE }
SELECT length(arrayWithConstant(10000000, materialize([[[[[[[[[['Hello world']]]]]]]]]])));
CREATE TEMPORARY TABLE args (value Array(Int)) ENGINE=Memory AS SELECT [1, 1, 1, 1] as value FROM numbers(1, 100);
SELECT length(arrayWithConstant(1000000, value)) FROM args FORMAT NULL;

View File

@ -244,7 +244,10 @@ Deduplication
DefaultTableEngine
DelayedInserts
DeliveryTag
Deltalake
DeltaLake
deltalakeCluster
deltaLakeCluster
Denormalize
DestroyAggregatesThreads
DestroyAggregatesThreadsActive
@ -377,10 +380,15 @@ Homebrew's
HorizontalDivide
Hostname
HouseOps
hudi
Hudi
hudiCluster
HudiCluster
HyperLogLog
Hypot
IANA
icebergCluster
IcebergCluster
IDE
IDEs
IDNA