Compare commits

...

55 Commits

Author SHA1 Message Date
zhanglistar
c84c2b980d
Merge 21b52c95d2 into 44b4bd38b9 2024-11-21 01:08:44 +00:00
zhanglistar
21b52c95d2 fix function doc check 2024-11-21 09:08:02 +08:00
Mikhail Artemenko
44b4bd38b9
Merge pull request #72045 from ClickHouse/issues/70174/cluster_versions
Enable cluster table functions for DataLake Storages
2024-11-20 21:22:37 +00:00
Shichao Jin
40c7d5fd1a
Merge pull request #71894 from udiz/fix-arrayWithConstant-size-estimation
Fix: arrayWithConstant size estimation using row's element size
2024-11-20 19:56:27 +00:00
Mikhail Artemenko
4ccebd9a24 fix syntax for iceberg in docs 2024-11-20 11:15:39 +00:00
Mikhail Artemenko
99177c0daf remove icebergCluster alias 2024-11-20 11:15:12 +00:00
zhanglistar
0b6cf8e5ee change name of tuple vector functions 2024-11-20 18:14:23 +08:00
zhanglistar
1d52b1f020 Merge branch 'div-zero' of https://github.com/bigo-sg/ClickHouse into div-zero 2024-11-20 17:32:04 +08:00
zhanglistar
d922427270 prepare merge master 2024-11-20 17:31:39 +08:00
zhanglistar
52d85ca77f
Merge branch 'master' into div-zero 2024-11-20 17:28:12 +08:00
Mikhail Artemenko
0951991c1d update aspell-dict.txt 2024-11-19 13:10:42 +00:00
Mikhail Artemenko
19aec5e572 Merge branch 'issues/70174/cluster_versions' of github.com:ClickHouse/ClickHouse into issues/70174/cluster_versions 2024-11-19 12:51:56 +00:00
Mikhail Artemenko
a367de9977 add docs 2024-11-19 12:49:59 +00:00
Mikhail Artemenko
6894e280b2 fix pr issues 2024-11-19 12:34:42 +00:00
Mikhail Artemenko
39ebe113d9 Merge branch 'master' into issues/70174/cluster_versions 2024-11-19 11:28:46 +00:00
udiz
239bbaa133 use length 2024-11-19 00:00:43 +00:00
udiz
07fac5808d format null on test 2024-11-18 23:08:48 +00:00
udiz
ed95e0781f test uses less memory 2024-11-18 22:48:38 +00:00
robot-clickhouse
014608fb6b Automatic style fix 2024-11-18 17:51:51 +00:00
Mikhail Artemenko
a29ded4941 add test for iceberg 2024-11-18 17:39:46 +00:00
Mikhail Artemenko
d2efae7511 enable cluster versions for datalake storages 2024-11-18 17:35:21 +00:00
udiz
6879aa130a newline 2024-11-13 22:47:54 +00:00
udiz
43f3c886a2 add test 2024-11-13 22:46:36 +00:00
udiz
c383a743f7 arrayWithConstant size estimation using single value size 2024-11-13 20:02:31 +00:00
zhanglistar
6765941f8c
Merge branch 'ClickHouse:master' into div-zero 2024-11-11 11:13:59 +08:00
zhanglistar
1b4ba7b9ff fix fuzzer fail 2024-11-05 09:07:09 +08:00
zhanglistar
439edf6297
Merge branch 'ClickHouse:master' into div-zero 2024-11-05 09:06:13 +08:00
zhanglistar
50a22196a2 fix msan fail and clean code 2024-11-01 17:15:36 +08:00
zhanglistar
aa18de924b fix doc check fail 2024-10-29 10:42:20 +08:00
zhanglistar
d3941df46a fix style check 2024-10-28 15:51:22 +08:00
zhanglistar
e3201464c1 fix failures 2024-10-28 15:44:09 +08:00
zhanglistar
10597ffe62 fix style check 2024-10-25 10:13:22 +08:00
zhanglistar
fb505a84ba
Merge branch 'ClickHouse:master' into div-zero 2024-10-25 09:27:49 +08:00
zhanglistar
5b841d994e clean code 2024-10-15 12:28:01 +08:00
zhanglistar
0410f38dec clean code 2024-10-10 17:09:16 +08:00
zhanglistar
40136b1ac4 clean code 2024-10-10 17:03:50 +08:00
zhanglistar
549e7865d3 reset some code 2024-10-10 16:41:00 +08:00
zhanglistar
708efabd6b delete debug code 2024-10-10 16:13:34 +08:00
zhanglistar
2cf83d2cee refactor code2 2024-10-10 16:10:14 +08:00
zhanglistar
4a1535cc37 rafactor code 2024-10-10 08:55:22 +08:00
zhanglistar
d58080742a clean code 2024-10-09 09:15:35 +08:00
zhanglistar
e223d7a642
Merge branch 'ClickHouse:master' into div-zero 2024-10-08 11:29:44 +08:00
zhanglistar
90bdf2d770 modify doc 2024-09-25 12:22:48 +08:00
zhanglistar
10987107bd make divideOrNull and moduloOrNull result Nullable 2024-09-25 12:20:31 +08:00
zhanglistar
a4ec4bab79 fix tests 2024-09-25 10:59:10 +08:00
zhanglistar
0e4710deb8 delete useless code 2024-09-24 10:58:43 +08:00
zhanglistar
8f9bad0d03 add tests 2024-09-24 10:22:48 +08:00
zhanglistar
68bba802ab add doc 2024-09-24 10:00:57 +08:00
zhanglistar
671b826805 Merge branch 'master' of https://github.com/bigo-sg/ClickHouse into div-zero 2024-09-24 09:45:44 +08:00
zhanglistar
9b562ea3b7 add divOrNull and tests 2024-09-24 09:44:57 +08:00
zhanglistar
06a4e7908b add moduloOrNull 2024-09-24 09:41:37 +08:00
zhanglistar
f60acdf2cf delete blank 2024-09-18 17:36:06 +08:00
zhanglistar
6d545acf30 revert useless code. 2024-09-18 17:35:22 +08:00
zhanglistar
3891f6c936 fix file format 2024-09-18 16:42:47 +08:00
zhanglistar
ac77203725 make NULL if result is Nullable when div by zero 2024-09-18 16:33:04 +08:00
35 changed files with 1429 additions and 142 deletions

View File

@ -89,6 +89,14 @@ divide(a, b)
Alias: `a / b` (operator)
## divideOrNull
Like [divide](#divide) but returns NULL when the divisor is zero.
**Syntax**
```sql
divideOrNull(a, b)
```
## intDiv
Performs an integer division of two values `a` by `b`, i.e. computes the quotient rounded down to the next smallest integer.
@ -264,6 +272,15 @@ Result:
└────────────────────────┘
```
## moduloOrNull
Like [modulo](#modulo) but returns NULL when the divisor is zero.
**Syntax**
```sql
moduloOrNull(a, b)
```
## negate
Negates a value `a`. The result is always signed.

View File

@ -457,6 +457,41 @@ Result:
└─────────────────────────────┘
```
## tupleDivideOrNull
Like [tupleDivide](#tupleDivide), but division by zero will return `NULL`.
**Syntax**
```sql
tupleDivideOrNull(tuple1, tuple2)
```
**Arguments**
- `tuple1` — First tuple. [Tuple](../data-types/tuple.md).
- `tuple2` — Second tuple. [Tuple](../data-types/tuple.md).
**Returned value**
- Tuple with the result of division. [Tuple](../data-types/tuple.md).
**Example**
Query:
```sql
SELECT tupleDivideOrNull((1, 2, 3), (2, 3, 0));
```
Result:
```text
┌─tupleDivideOrNull((1, 2, 3), (2, 3, 0))─┐
│ (0.5,0.6666666666666666,NULL) │
└─────────────────────────────────────────┘
```
## tupleNegate
Calculates the negation of the tuple values.
@ -561,6 +596,41 @@ Result:
└──────────────────────────────────┘
```
## tupleDivideByNumberOrNull
Like [tupleDivideByNumber](#tupleDivideByNumber), but division by zero will return `NULL`.
**Syntax**
```sql
tupleDivideByNumberOrNull(tuple, number)
```
**Arguments**
- `tuple` — [Tuple](../data-types/tuple.md).
- `number` — Divider. [Int/UInt](../data-types/int-uint.md), [Float](../data-types/float.md) or [Decimal](../data-types/decimal.md).
**Returned value**
- Tuple with divided values. [Tuple](../data-types/tuple.md).
**Example**
Query:
```sql
SELECT tupleDivideByNumberOrNull((1, 2), 0.0);
```
Result:
```text
┌─tupleDivideByNumberOrNull((1, 2), 0.)─┐
│ (NULL,NULL) │
└───────────────────────────────────────┘
```
## tupleConcat
Combines tuples passed as arguments.
@ -607,7 +677,7 @@ tupleIntDiv(tuple_num, tuple_div)
**Implementation details**
- If either `tuple_num` or `tuple_div` contain non-integer values then the result is calculated by rounding to the nearest integer for each non-integer numerator or divisor.
- An error will be thrown for division by 0.
- An error will be thrown for division by 0.
**Examples**
@ -641,7 +711,7 @@ Result:
## tupleIntDivOrZero
Like [tupleIntDiv](#tupleintdiv) it does integer division of a tuple of numerators and a tuple of denominators, and returns a tuple of the quotients. It does not throw an error for 0 divisors, but rather returns the quotient as 0.
Like [tupleIntDiv](#tupleIntDiv) it does integer division of a tuple of numerators and a tuple of denominators, and returns a tuple of the quotients. It does not throw an error for 0 divisors, but rather returns the quotient as 0.
**Syntax**
@ -699,7 +769,7 @@ tupleIntDivByNumber(tuple_num, div)
**Implementation details**
- If either `tuple_num` or `div` contain non-integer values then the result is calculated by rounding to the nearest integer for each non-integer numerator or divisor.
- An error will be thrown for division by 0.
- An error will be thrown for division by 0.
**Examples**
@ -821,6 +891,42 @@ Result:
└─────────────────────────────────────┘
```
## tupleModuloOrNull
Like [tupleModulo](#tuplemodulo) it returns a tuple of the moduli (remainders) of division operations of two tuples. It does not throw an error for division by zero, but rather returns `NULL`.
**Syntax**
```sql
tupleModuloOrNull(tuple_num, tuple_mod)
```
**Parameters**
- `tuple_num`: Tuple of numerator values. [Tuple](../data-types/tuple) of numeric type.
- `tuple_div`: Tuple of modulus values. [Tuple](../data-types/tuple) of numeric type.
**Returned value**
- Tuple of the remainders of division of `tuple_num` and `tuple_div`. [Tuple](../data-types/tuple) of non-zero integer values.
- NULL is returned for division by zero.
**Examples**
Query:
``` sql
SELECT tupleModuloOrNull((15, 10, 5), (0, 3, 2));
```
Result:
``` text
┌─tupleModuloOrNull((15, 10, 5), (0, 3, 2))─┐
│ (NULL,1,1) │
└───────────────────────────────────────────┘
```
## tupleModuloByNumber
Returns a tuple of the moduli (remainders) of division operations of a tuple and a given divisor.
@ -857,6 +963,42 @@ Result:
└─────────────────────────────────────┘
```
## tupleModuloByNumberOrNull
Like [tupleModuloByNumber](#tuplemodulobynumber) it returns a tuple of the moduli (remainders) of division operations of a tuple and a given divisor. It does not throw an error for division by zero, but rather returns `NULL`.
**Syntax**
```sql
tupleModuloByNumberOrNull(tuple_num, div)
```
**Parameters**
- `tuple_num`: Tuple of numerator values. [Tuple](../data-types/tuple) of numeric type.
- `div`: The divisor value. [Numeric](../data-types/int-uint.md) type.
**Returned value**
- Tuple of the remainders of division of `tuple_num` and `div`. [Tuple](../data-types/tuple) of non-zero integer values.
- NULL is thrown for division by zero.
**Examples**
Query:
``` sql
SELECT tupleModuloByNumberOrNull((15, 10, 5), 0);
```
Result:
``` text
┌─tupleModuloByNumberOrNull((15, 10, 5), 0)─┐
│ (NULL,NULL,NULL) │
└───────────────────────────────────────────┘
```
## flattenTuple
Returns a flattened `output` tuple from a nested named `input` tuple. Elements of the `output` tuple are the paths from the original `input` tuple. For instance: `Tuple(a Int, Tuple(b Int, c Int)) -> Tuple(a Int, b Int, c Int)`. `flattenTuple` can be used to select all paths from type `Object` as separate columns.

View File

@ -49,4 +49,4 @@ LIMIT 2
**See Also**
- [DeltaLake engine](/docs/en/engines/table-engines/integrations/deltalake.md)
- [DeltaLake cluster table function](/docs/en/sql-reference/table-functions/deltalakeCluster.md)

View File

@ -0,0 +1,30 @@
---
slug: /en/sql-reference/table-functions/deltalakeCluster
sidebar_position: 46
sidebar_label: deltaLakeCluster
title: "deltaLakeCluster Table Function"
---
This is an extension to the [deltaLake](/docs/en/sql-reference/table-functions/deltalake.md) table function.
Allows processing files from [Delta Lake](https://github.com/delta-io/delta) tables in Amazon S3 in parallel from many nodes in a specified cluster. On initiator it creates a connection to all nodes in the cluster and dispatches each file dynamically. On the worker node it asks the initiator about the next task to process and processes it. This is repeated until all tasks are finished.
**Syntax**
``` sql
deltaLakeCluster(cluster_name, url [,aws_access_key_id, aws_secret_access_key] [,format] [,structure] [,compression])
```
**Arguments**
- `cluster_name` — Name of a cluster that is used to build a set of addresses and connection parameters to remote and local servers.
- Description of all other arguments coincides with description of arguments in equivalent [deltaLake](/docs/en/sql-reference/table-functions/deltalake.md) table function.
**Returned value**
A table with the specified structure for reading data from cluster in the specified Delta Lake table in S3.
**See Also**
- [deltaLake engine](/docs/en/engines/table-engines/integrations/deltalake.md)
- [deltaLake table function](/docs/en/sql-reference/table-functions/deltalake.md)

View File

@ -29,4 +29,4 @@ A table with the specified structure for reading data in the specified Hudi tabl
**See Also**
- [Hudi engine](/docs/en/engines/table-engines/integrations/hudi.md)
- [Hudi cluster table function](/docs/en/sql-reference/table-functions/hudiCluster.md)

View File

@ -0,0 +1,30 @@
---
slug: /en/sql-reference/table-functions/hudiCluster
sidebar_position: 86
sidebar_label: hudiCluster
title: "hudiCluster Table Function"
---
This is an extension to the [hudi](/docs/en/sql-reference/table-functions/hudi.md) table function.
Allows processing files from Apache [Hudi](https://hudi.apache.org/) tables in Amazon S3 in parallel from many nodes in a specified cluster. On initiator it creates a connection to all nodes in the cluster and dispatches each file dynamically. On the worker node it asks the initiator about the next task to process and processes it. This is repeated until all tasks are finished.
**Syntax**
``` sql
hudiCluster(cluster_name, url [,aws_access_key_id, aws_secret_access_key] [,format] [,structure] [,compression])
```
**Arguments**
- `cluster_name` — Name of a cluster that is used to build a set of addresses and connection parameters to remote and local servers.
- Description of all other arguments coincides with description of arguments in equivalent [hudi](/docs/en/sql-reference/table-functions/hudi.md) table function.
**Returned value**
A table with the specified structure for reading data from cluster in the specified Hudi table in S3.
**See Also**
- [Hudi engine](/docs/en/engines/table-engines/integrations/hudi.md)
- [Hudi table function](/docs/en/sql-reference/table-functions/hudi.md)

View File

@ -72,3 +72,4 @@ Table function `iceberg` is an alias to `icebergS3` now.
**See Also**
- [Iceberg engine](/docs/en/engines/table-engines/integrations/iceberg.md)
- [Iceberg cluster table function](/docs/en/sql-reference/table-functions/icebergCluster.md)

View File

@ -0,0 +1,43 @@
---
slug: /en/sql-reference/table-functions/icebergCluster
sidebar_position: 91
sidebar_label: icebergCluster
title: "icebergCluster Table Function"
---
This is an extension to the [iceberg](/docs/en/sql-reference/table-functions/iceberg.md) table function.
Allows processing files from Apache [Iceberg](https://iceberg.apache.org/) in parallel from many nodes in a specified cluster. On initiator it creates a connection to all nodes in the cluster and dispatches each file dynamically. On the worker node it asks the initiator about the next task to process and processes it. This is repeated until all tasks are finished.
**Syntax**
``` sql
icebergS3Cluster(cluster_name, url [, NOSIGN | access_key_id, secret_access_key, [session_token]] [,format] [,compression_method])
icebergS3Cluster(cluster_name, named_collection[, option=value [,..]])
icebergAzureCluster(cluster_name, connection_string|storage_account_url, container_name, blobpath, [,account_name], [,account_key] [,format] [,compression_method])
icebergAzureCluster(cluster_name, named_collection[, option=value [,..]])
icebergHDFSCluster(cluster_name, path_to_table, [,format] [,compression_method])
icebergHDFSCluster(cluster_name, named_collection[, option=value [,..]])
```
**Arguments**
- `cluster_name` — Name of a cluster that is used to build a set of addresses and connection parameters to remote and local servers.
- Description of all other arguments coincides with description of arguments in equivalent [iceberg](/docs/en/sql-reference/table-functions/iceberg.md) table function.
**Returned value**
A table with the specified structure for reading data from cluster in the specified Iceberg table.
**Examples**
```sql
SELECT * FROM icebergS3Cluster('cluster_simple', 'http://test.s3.amazonaws.com/clickhouse-bucket/test_table', 'test', 'test')
```
**See Also**
- [Iceberg engine](/docs/en/engines/table-engines/integrations/iceberg.md)
- [Iceberg table function](/docs/en/sql-reference/table-functions/iceberg.md)

View File

@ -15,6 +15,7 @@ namespace DB
namespace ErrorCodes
{
extern const int ILLEGAL_DIVISION;
extern const int LOGICAL_ERROR;
}
template <typename A, typename B>
@ -26,8 +27,9 @@ inline void throwIfDivisionLeadsToFPE(A a, B b)
throw Exception(ErrorCodes::ILLEGAL_DIVISION, "Division by zero");
/// http://avva.livejournal.com/2548306.html
if (unlikely(is_signed_v<A> && is_signed_v<B> && a == std::numeric_limits<A>::min() && b == -1))
throw Exception(ErrorCodes::ILLEGAL_DIVISION, "Division of minimal signed number by minus one");
if constexpr (is_signed_v<A> && is_signed_v<B>)
if (unlikely(a == std::numeric_limits<A>::min() && b == -1))
throw Exception(ErrorCodes::ILLEGAL_DIVISION, "Division of minimal signed number by minus one");
}
template <typename A, typename B>
@ -36,8 +38,9 @@ inline bool divisionLeadsToFPE(A a, B b)
if (unlikely(b == 0))
return true;
if (unlikely(is_signed_v<A> && is_signed_v<B> && a == std::numeric_limits<A>::min() && b == -1))
return true;
if constexpr (is_signed_v<A> && is_signed_v<B>)
if (unlikely(a == std::numeric_limits<A>::min() && b == -1))
return true;
return false;
}
@ -68,7 +71,7 @@ struct DivideIntegralImpl
static const constexpr bool allow_string_integer = false;
template <typename Result = ResultType>
static Result apply(A a, B b)
static Result apply(A a, B b, NullMap::value_type * m [[maybe_unused]] = nullptr)
{
using CastA = std::conditional_t<is_big_int_v<B> && std::is_same_v<A, UInt8>, uint8_t, A>;
using CastB = std::conditional_t<is_big_int_v<A> && std::is_same_v<B, UInt8>, uint8_t, B>;
@ -120,7 +123,7 @@ struct ModuloImpl
static const constexpr bool allow_string_integer = false;
template <typename Result = ResultType>
static Result apply(A a, B b)
static Result apply(A a, B b, NullMap::value_type * m [[maybe_unused]] = nullptr)
{
if constexpr (is_floating_point<ResultType>)
{
@ -175,7 +178,7 @@ struct PositiveModuloImpl : ModuloImpl<A, B>
using ResultType = typename NumberTraits::ResultOfPositiveModulo<A, B>::Type;
template <typename Result = ResultType>
static Result apply(A a, B b)
static Result apply(A a, B b, NullMap::value_type * m [[maybe_unused]] = nullptr)
{
auto res = ModuloImpl<A, B>::template apply<OriginResultType>(a, b);
if constexpr (is_signed_v<A>)
@ -196,4 +199,29 @@ struct PositiveModuloImpl : ModuloImpl<A, B>
}
};
template <typename A, typename B>
struct DivideFloatingImpl
{
using ResultType = typename NumberTraits::ResultOfFloatingPointDivision<A, B>::Type;
static const constexpr bool allow_fixed_string = false;
static const constexpr bool allow_string_integer = false;
template <typename Result = ResultType>
static NO_SANITIZE_UNDEFINED Result apply(A a [[maybe_unused]], B b [[maybe_unused]], NullMap::value_type * m [[maybe_unused]] = nullptr)
{
return static_cast<Result>(a) / static_cast<Result>(b);
}
#if USE_EMBEDDED_COMPILER
static constexpr bool compilable = true;
static llvm::Value * compile(llvm::IRBuilder<> & b, llvm::Value * left, llvm::Value * right, bool)
{
if (left->getType()->isIntegerTy())
throw Exception(ErrorCodes::LOGICAL_ERROR, "DivideFloatingImpl expected a floating-point type");
return b.CreateFDiv(left, right);
}
#endif
};
}

View File

@ -277,7 +277,7 @@ struct BinaryOperation
static const constexpr bool allow_string_integer = false;
template <OpCase op_case>
static void NO_INLINE process(const A * __restrict a, const B * __restrict b, ResultType * __restrict c, size_t size, const NullMap * right_nullmap = nullptr)
static void NO_INLINE process(const A * __restrict a, const B * __restrict b, ResultType * __restrict c, size_t size, const NullMap * right_nullmap = nullptr, NullMap * res_nullmap [[maybe_unused]] = nullptr)
{
if constexpr (op_case == OpCase::RightConstant)
{
@ -305,6 +305,8 @@ struct BinaryOperation
static ResultType process(A a, B b) { return Op::template apply<ResultType>(a, b); }
static ResultType process(A a, B b, NullMap::value_type * m [[maybe_unused]] = nullptr) { return Op::template apply<ResultType>(a, b); }
private:
template <OpCase op_case>
static void apply(const A * __restrict a, const B * __restrict b, ResultType * __restrict c, size_t i)
@ -572,7 +574,7 @@ private:
public:
template <OpCase op_case, bool is_decimal_a, bool is_decimal_b>
static void NO_INLINE process(const auto & a, const auto & b, ResultContainerType & c,
NativeResultType scale_a, NativeResultType scale_b, const NullMap * right_nullmap = nullptr)
NativeResultType scale_a, NativeResultType scale_b, const NullMap * right_nullmap = nullptr, NullMap * res_nullmap = nullptr)
{
if constexpr (op_case == OpCase::LeftConstant) static_assert(!is_decimal<decltype(a)>);
if constexpr (op_case == OpCase::RightConstant) static_assert(!is_decimal<decltype(b)>);
@ -628,79 +630,188 @@ public:
}
else if constexpr (is_division && is_decimal_b)
{
processWithRightNullmapImpl<op_case>(a, b, c, size, right_nullmap, [&scale_a](const auto & left, const auto & right)
if (res_nullmap)
{
return applyScaledDiv<is_decimal_a>(
static_cast<NativeResultType>(left), right, scale_a);
});
if (right_nullmap)
res_nullmap->assign(*right_nullmap);
processWithRightNullmapImpl<true, op_case>(a, b, c, size, res_nullmap, [&scale_a](const auto & left, const auto & right)
{
return applyScaledDiv<is_decimal_a>(
static_cast<NativeResultType>(left), right, scale_a);
});
}
else
processWithRightNullmapImpl<false, op_case>(a, b, c, size, right_nullmap, [&scale_a](const auto & left, const auto & right)
{
return applyScaledDiv<is_decimal_a>(
static_cast<NativeResultType>(left), right, scale_a);
});
return;
}
processWithRightNullmapImpl<op_case>(
a, b, c, size, right_nullmap,
[](const auto & left, const auto & right)
{
return apply(
static_cast<NativeResultType>(left),
static_cast<NativeResultType>(right));
});
if (res_nullmap)
{
if (right_nullmap)
res_nullmap->assign(*right_nullmap);
processWithRightNullmapImpl<true, op_case>(
a, b, c, size, res_nullmap,
[](const auto & left, const auto & right)
{
return apply(
static_cast<NativeResultType>(left),
static_cast<NativeResultType>(right));
});
}
else
processWithRightNullmapImpl<false, op_case>(
a, b, c, size, right_nullmap,
[](const auto & left, const auto & right)
{
return apply(
static_cast<NativeResultType>(left),
static_cast<NativeResultType>(right));
});
}
template <bool is_decimal_a, bool is_decimal_b, class A, class B>
static ResultType process(A a, B b, NativeResultType scale_a, NativeResultType scale_b)
static ResultType process(A a, B b, NativeResultType scale_a, NativeResultType scale_b, NullMap * m)
requires(!is_decimal<A> && !is_decimal<B>)
{
if constexpr (is_division && is_decimal_b)
return applyScaledDiv<is_decimal_a>(a, b, scale_a);
else if constexpr (is_plus_minus_compare)
try
{
if (scale_a != 1)
return applyScaled<true>(a, b, scale_a);
if (scale_b != 1)
return applyScaled<false>(a, b, scale_b);
ResultType res{};
if constexpr (is_division && is_decimal_b)
res = applyScaledDiv<is_decimal_a>(a, b, scale_a);
else if constexpr (is_plus_minus_compare)
{
if (scale_a != 1)
return applyScaled<true>(a, b, scale_a);
if (scale_b != 1)
return applyScaled<false>(a, b, scale_b);
return res = apply(a, b);
}
else
res = apply(a, b);
if constexpr (std::is_floating_point_v<ResultType>)
if (unlikely(!std::isfinite(res)) && m)
(*m)[0] = 1;
return res;
}
catch (const std::exception&)
{
if (m)
(*m)[0] = 1;
else
throw;
return ResultType(); /// Unreachable to disable compiler error.
}
return apply(a, b);
}
private:
template <OpCase op_case, typename ApplyFunc>
static void processWithRightNullmapImpl(const auto & a, const auto & b, ResultContainerType & c, size_t size, const NullMap * right_nullmap, ApplyFunc apply_func)
template <bool may_gen_null, OpCase op_case, typename ApplyFunc>
static void processWithRightNullmapImpl(const auto & a, const auto & b, ResultContainerType & c, size_t size, std::conditional_t<may_gen_null, NullMap *, const NullMap *> nullmap, ApplyFunc apply_func)
{
if (right_nullmap)
/// may_gen_null is false, means res_nullmap is nullptr here, nullmap is right_nullmap
if constexpr (!may_gen_null)
{
const NullMap * right_nullmap = nullmap;
if (right_nullmap)
{
if constexpr (op_case == OpCase::RightConstant)
{
if ((*right_nullmap)[0])
{
for (size_t i = 0; i < size; ++i)
c[i] = ResultType();
return;
}
for (size_t i = 0; i < size; ++i)
c[i] = apply_func(undec(a[i]), undec(b));
}
else
{
for (size_t i = 0; i < size; ++i)
{
if ((*right_nullmap)[i])
c[i] = ResultType();
else
c[i] = apply_func(unwrap<op_case, OpCase::LeftConstant>(a, i), undec(b[i]));
}
}
}
else
for (size_t i = 0; i < size; ++i)
c[i] = apply_func(unwrap<op_case, OpCase::LeftConstant>(a, i), unwrap<op_case, OpCase::RightConstant>(b, i));
return;
}
/// may_gen_null is true, means res_nullmap is not nullptr, and initialized with right_nullmap if it's not nullptr
else
{
auto & res_nullmap = nullmap;
if constexpr (op_case == OpCase::RightConstant)
{
if ((*right_nullmap)[0])
if (res_nullmap->size() && (*res_nullmap)[0])
{
for (size_t i = 0; i < size; ++i)
c[i] = ResultType();
return;
}
for (size_t i = 0; i < size; ++i)
c[i] = apply_func(undec(a[i]), undec(b));
}
else
{
for (size_t i = 0; i < size; ++i)
{
if ((*right_nullmap)[i])
c[i] = ResultType();
else
c[i] = apply_func(unwrap<op_case, OpCase::LeftConstant>(a, i), undec(b[i]));
try
{
c[i] = apply_func(undec(a[i]), undec(b));
if constexpr (std::is_floating_point_v<ResultContainerType>)
if (unlikely(!std::isfinite(c[i])))
{
c[i] = ResultType();
(*res_nullmap)[i] = 1;
}
}
catch (const std::exception&)
{
c[i] = ResultType(); /// dismiss msan
(*res_nullmap)[i] = 1;
}
}
}
else
for (size_t i = 0; i < size; ++i)
{
if ((*res_nullmap)[i])
{
c[i] = ResultType();
continue;
}
try
{
c[i] = apply_func(unwrap<op_case, OpCase::LeftConstant>(a, i), undec(b[i]));
if constexpr (std::is_floating_point_v<ResultContainerType>)
if (unlikely(!std::isfinite(c[i])))
{
c[i] = ResultType();
(*res_nullmap)[i] = 1;
}
}
catch (const std::exception&)
{
c[i] = ResultType();
(*res_nullmap)[i] = 1;
}
}
}
else
for (size_t i = 0; i < size; ++i)
c[i] = apply_func(unwrap<op_case, OpCase::LeftConstant>(a, i), unwrap<op_case, OpCase::RightConstant>(b, i));
}
static constexpr bool is_plus_minus = IsOperation<Operation>::plus ||
IsOperation<Operation>::minus;
static constexpr bool is_multiply = IsOperation<Operation>::multiply;
static constexpr bool is_float_division = IsOperation<Operation>::div_floating;
static constexpr bool is_float_division = IsOperation<Operation>::div_floating || IsOperation<Operation>::divide_or_null;
static constexpr bool is_int_division = IsOperation<Operation>::int_div ||
IsOperation<Operation>::int_div_or_zero;
static constexpr bool is_division = is_float_division || is_int_division;
@ -808,8 +919,10 @@ class FunctionBinaryArithmetic : public IFunction
static constexpr bool is_minus = IsOperation<Op>::minus;
static constexpr bool is_multiply = IsOperation<Op>::multiply;
static constexpr bool is_division = IsOperation<Op>::division;
static constexpr bool is_divide_or_null = IsOperation<Op>::divide_or_null;
static constexpr bool is_bit_hamming_distance = IsOperation<Op>::bit_hamming_distance;
static constexpr bool is_modulo = IsOperation<Op>::modulo;
static constexpr bool is_modulo_or_null = IsOperation<Op>::modulo_or_null;
static constexpr bool is_int_div = IsOperation<Op>::int_div;
static constexpr bool is_int_div_or_zero = IsOperation<Op>::int_div_or_zero;
@ -1363,12 +1476,12 @@ class FunctionBinaryArithmetic : public IFunction
}
template <OpCase op_case, bool left_decimal, bool right_decimal, typename OpImpl, typename OpImplCheck>
void helperInvokeEither(const auto& left, const auto& right, auto& vec_res, auto scale_a, auto scale_b, const NullMap * right_nullmap) const
void helperInvokeEither(const auto& left, const auto& right, auto& vec_res, auto scale_a, auto scale_b, const NullMap * right_nullmap, NullMap * res_nullmap) const
{
if (check_decimal_overflow)
OpImplCheck::template process<op_case, left_decimal, right_decimal>(left, right, vec_res, scale_a, scale_b, right_nullmap);
OpImplCheck::template process<op_case, left_decimal, right_decimal>(left, right, vec_res, scale_a, scale_b, right_nullmap, res_nullmap);
else
OpImpl::template process<op_case, left_decimal, right_decimal>(left, right, vec_res, scale_a, scale_b, right_nullmap);
OpImpl::template process<op_case, left_decimal, right_decimal>(left, right, vec_res, scale_a, scale_b, right_nullmap, res_nullmap);
}
template <class LeftDataType, class RightDataType, class ResultDataType>
@ -1376,7 +1489,7 @@ class FunctionBinaryArithmetic : public IFunction
const auto & left, const auto & right,
const ColumnConst * const col_left_const, const ColumnConst * const col_right_const,
const auto * const col_left, const auto * const col_right,
size_t col_left_size, const NullMap * right_nullmap) const
size_t col_left_size, const NullMap * right_nullmap, NullMap * res_nullmap) const
{
using T0 = typename LeftDataType::FieldType;
using T1 = typename RightDataType::FieldType;
@ -1431,8 +1544,8 @@ class FunctionBinaryArithmetic : public IFunction
ResultType res = {};
if (!right_nullmap || !(*right_nullmap)[0])
res = check_decimal_overflow
? OpImplCheck::template process<left_is_decimal, right_is_decimal>(const_a, const_b, scale_a, scale_b)
: OpImpl::template process<left_is_decimal, right_is_decimal>(const_a, const_b, scale_a, scale_b);
? OpImplCheck::template process<left_is_decimal, right_is_decimal>(const_a, const_b, scale_a, scale_b, res_nullmap)
: OpImpl::template process<left_is_decimal, right_is_decimal>(const_a, const_b, scale_a, scale_b, res_nullmap);
return ResultDataType(type.getPrecision(), type.getScale())
.createColumnConst(col_left_const->size(), toField(res, type.getScale()));
@ -1446,7 +1559,7 @@ class FunctionBinaryArithmetic : public IFunction
if (col_left && col_right)
{
helperInvokeEither<OpCase::Vector, left_is_decimal, right_is_decimal, OpImpl, OpImplCheck>(
col_left->getData(), col_right->getData(), vec_res, scale_a, scale_b, right_nullmap);
col_left->getData(), col_right->getData(), vec_res, scale_a, scale_b, right_nullmap, res_nullmap);
}
else if (col_left_const && col_right)
{
@ -1454,7 +1567,7 @@ class FunctionBinaryArithmetic : public IFunction
helperGetOrConvert<T0, ResultDataType>(col_left_const, left));
helperInvokeEither<OpCase::LeftConstant, left_is_decimal, right_is_decimal, OpImpl, OpImplCheck>(
const_a, col_right->getData(), vec_res, scale_a, scale_b, right_nullmap);
const_a, col_right->getData(), vec_res, scale_a, scale_b, right_nullmap, res_nullmap);
}
else if (col_left && col_right_const)
{
@ -1462,7 +1575,7 @@ class FunctionBinaryArithmetic : public IFunction
helperGetOrConvert<T1, ResultDataType>(col_right_const, right));
helperInvokeEither<OpCase::RightConstant, left_is_decimal, right_is_decimal, OpImpl, OpImplCheck>(
col_left->getData(), const_b, vec_res, scale_a, scale_b, right_nullmap);
col_left->getData(), const_b, vec_res, scale_a, scale_b, right_nullmap, res_nullmap);
}
else
return nullptr;
@ -1774,6 +1887,10 @@ public:
}
else
type_res = std::make_shared<ResultDataType>();
if constexpr (is_divide_or_null || is_modulo_or_null)
type_res = std::make_shared<DataTypeNullable>(type_res);
return true;
}
}
@ -2064,13 +2181,12 @@ ColumnPtr executeStringInteger(const ColumnsWithTypeAndName & arguments, const A
}
template <typename A, typename B>
ColumnPtr executeNumeric(const ColumnsWithTypeAndName & arguments, const A & left, const B & right, const NullMap * right_nullmap) const
ColumnPtr executeNumeric(const ColumnsWithTypeAndName & arguments, const A & left, const B & right, const NullMap * right_nullmap, NullMap * res_nullmap [[maybe_unused]]) const
{
using LeftDataType = std::decay_t<decltype(left)>;
using RightDataType = std::decay_t<decltype(right)>;
using ResultDataType = typename BinaryOperationTraits<Op, LeftDataType, RightDataType>::ResultDataType;
using DecimalResultType = typename BinaryOperationTraits<Op, LeftDataType, RightDataType>::DecimalResultDataType;
if constexpr (std::is_same_v<ResultDataType, InvalidType>)
{
return nullptr;
@ -2127,7 +2243,8 @@ ColumnPtr executeStringInteger(const ColumnsWithTypeAndName & arguments, const A
col_left_const, col_right_const,
col_left, col_right,
col_left_size,
right_nullmap);
right_nullmap,
res_nullmap);
}
/// Here we check if we have `intDiv` or `intDivOrZero` and at least one of the arguments is decimal, because in this case originally we had result as decimal, so we need to convert result into integer after calculations
else if constexpr (!decimal_with_float && (is_int_div || is_int_div_or_zero) && (IsDataTypeDecimal<LeftDataType> || IsDataTypeDecimal<RightDataType>))
@ -2150,7 +2267,8 @@ ColumnPtr executeStringInteger(const ColumnsWithTypeAndName & arguments, const A
col_left_const, col_right_const,
col_left, col_right,
col_left_size,
right_nullmap);
right_nullmap,
res_nullmap);
auto col = ColumnWithTypeAndName(res, type_res, name);
return castColumn(col, std::make_shared<ResultDataType>());
@ -2167,7 +2285,8 @@ ColumnPtr executeStringInteger(const ColumnsWithTypeAndName & arguments, const A
{
const auto res = right_nullmap && (*right_nullmap)[0] ? ResultType() : OpImpl::process(
col_left_const->template getValue<T0>(),
col_right_const->template getValue<T1>());
col_right_const->template getValue<T1>(),
res_nullmap ? res_nullmap->data() : nullptr);
return ResultDataType().createColumnConst(col_left_const->size(), toField(res));
}
@ -2184,7 +2303,8 @@ ColumnPtr executeStringInteger(const ColumnsWithTypeAndName & arguments, const A
col_right->getData().data(),
vec_res.data(),
vec_res.size(),
right_nullmap);
right_nullmap,
res_nullmap);
}
else if (col_left_const && col_right)
{
@ -2195,14 +2315,15 @@ ColumnPtr executeStringInteger(const ColumnsWithTypeAndName & arguments, const A
col_right->getData().data(),
vec_res.data(),
vec_res.size(),
right_nullmap);
right_nullmap,
res_nullmap);
}
else if (col_left && col_right_const)
{
const T1 value = col_right_const->template getValue<T1>();
OpImpl::template process<OpCase::RightConstant>(
col_left->getData().data(), &value, vec_res.data(), vec_res.size(), right_nullmap);
col_left->getData().data(), &value, vec_res.data(), vec_res.size(), right_nullmap, res_nullmap);
}
else
return nullptr;
@ -2259,7 +2380,7 @@ ColumnPtr executeStringInteger(const ColumnsWithTypeAndName & arguments, const A
return executeImpl2(arguments, result_type, input_rows_count);
}
ColumnPtr executeImpl2(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count, const NullMap * right_nullmap = nullptr) const
ColumnPtr executeImpl2(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count, const NullMap * right_nullmap = nullptr, NullMap * res_nullmap [[maybe_unused]] = nullptr) const
{
const auto & left_argument = arguments[0];
const auto & right_argument = arguments[1];
@ -2273,10 +2394,27 @@ ColumnPtr executeStringInteger(const ColumnsWithTypeAndName & arguments, const A
bool is_const = checkColumnConst<ColumnNullable>(right_argument.column.get());
const ColumnNullable * nullable_column = is_const ? checkAndGetColumnConstData<ColumnNullable>(right_argument.column.get())
: checkAndGetColumn<ColumnNullable>(right_argument.column.get());
const auto & right_null_map = nullable_column->getNullMapData();
/// Process operation is divideOrNull, moduloOrNull etc. which may return NULL when divide zero.
if constexpr (is_divide_or_null || is_modulo_or_null)
{
NullMap res_null_map(right_null_map.begin(), right_null_map.end());
if (is_const)
res_null_map.resize_fill(input_rows_count, 0);
const auto & null_bytemap = nullable_column->getNullMapData();
auto res = executeImpl2(createBlockWithNestedColumns(arguments), removeNullable(result_type), input_rows_count, &null_bytemap);
return wrapInNullable(res, arguments, result_type, input_rows_count);
auto res = executeImpl2(createBlockWithNestedColumns(arguments), removeNullable(result_type), input_rows_count, &right_null_map, &res_null_map);
return wrapInNullable(res, arguments, result_type, input_rows_count, &res_null_map);
}
auto res = executeImpl2(createBlockWithNestedColumns(arguments), removeNullable(result_type), input_rows_count, &right_null_map, nullptr);
return wrapInNullable(res, arguments, result_type, input_rows_count, nullptr);
}
/// Process when operation is divideOrNull and moduloOrNull, when right argument is not Nullable.
else if ((is_divide_or_null || is_modulo_or_null) && !res_nullmap)
{
NullMap result_null_map(input_rows_count, 0);
auto res = executeImpl2(arguments, result_type, input_rows_count, nullptr, &result_null_map);
return wrapInNullable(res, arguments, result_type, input_rows_count, &result_null_map);
}
/// Special case - one or both arguments are IPv4
@ -2354,7 +2492,7 @@ ColumnPtr executeStringInteger(const ColumnsWithTypeAndName & arguments, const A
return (res = executeStringInteger<ColumnString>(arguments, left, right)) != nullptr;
}
else
return (res = executeNumeric(arguments, left, right, right_nullmap)) != nullptr;
return (res = executeNumeric(arguments, left, right, right_nullmap, res_nullmap)) != nullptr;
});
if (isArray(result_type))
@ -2659,10 +2797,9 @@ public:
/// Check the case when operation is divide, intDiv or modulo and denominator is Nullable(Something).
/// For divide operation we should check only Nullable(Decimal), because only this case can throw division by zero error.
bool division_by_nullable = !arguments[0].type->onlyNull() && !arguments[1].type->onlyNull() && arguments[1].type->isNullable()
&& (IsOperation<Op>::int_div || IsOperation<Op>::modulo || IsOperation<Op>::positive_modulo
&& (IsOperation<Op>::int_div || IsOperation<Op>::modulo || IsOperation<Op>::positive_modulo || IsOperation<Op>::modulo_or_null || IsOperation<Op>::divide_or_null
|| (IsOperation<Op>::div_floating
&& (isDecimalOrNullableDecimal(arguments[0].type) || isDecimalOrNullableDecimal(arguments[1].type))));
/// More efficient specialization for two numeric arguments.
if (arguments.size() == 2
&& ((arguments[0].column && isColumnConst(*arguments[0].column))

View File

@ -1,5 +1,6 @@
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnLowCardinality.h>
#include <Columns/ColumnsCommon.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnTuple.h>
@ -218,7 +219,7 @@ checkAndGetNestedArrayOffset(const IColumn ** columns, size_t num_arguments)
}
ColumnPtr
wrapInNullable(const ColumnPtr & src, const ColumnsWithTypeAndName & args, const DataTypePtr & result_type, size_t input_rows_count)
wrapInNullable(const ColumnPtr & src, const ColumnsWithTypeAndName & args, const DataTypePtr & result_type, size_t input_rows_count, const NullMap * res_null_map)
{
ColumnPtr result_null_map_column;
@ -233,6 +234,19 @@ wrapInNullable(const ColumnPtr & src, const ColumnsWithTypeAndName & args, const
result_null_map_column = nullable->getNullMapColumnPtr();
}
if (res_null_map && !memoryIsZero(res_null_map->data(), 0, res_null_map->size()))
{
if (!result_null_map_column)
result_null_map_column = ColumnUInt8::create(input_rows_count, 0);
MutableColumnPtr mutable_result_null_map_column = IColumn::mutate(std::move(result_null_map_column));
NullMap & result_null_map = assert_cast<ColumnUInt8 &>(*mutable_result_null_map_column).getData();
for (size_t i = 0, size = result_null_map.size(); i < size; ++i)
result_null_map[i] |= (*res_null_map)[i];
result_null_map_column = std::move(mutable_result_null_map_column);
}
for (const auto & elem : args)
{
if (!elem.type->isNullable())

View File

@ -2,6 +2,7 @@
#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>
#include <Columns/ColumnNullable.h>
#include <DataTypes/IDataType.h>
#include <DataTypes/DataTypeNullable.h>
#include <Columns/IColumn.h>
@ -167,7 +168,7 @@ checkAndGetNestedArrayOffset(const IColumn ** columns, size_t num_arguments);
/// Return ColumnNullable of src, with null map as OR-ed null maps of args columns.
/// Or ColumnConst(ColumnNullable) if the result is always NULL or if the result is constant and always not NULL.
ColumnPtr wrapInNullable(const ColumnPtr & src, const ColumnsWithTypeAndName & args, const DataTypePtr & result_type, size_t input_rows_count);
ColumnPtr wrapInNullable(const ColumnPtr & src, const ColumnsWithTypeAndName & args, const DataTypePtr & result_type, size_t input_rows_count, const NullMap * res_null_map = nullptr);
/** Return ColumnNullable of src, with input null map
* Or ColumnConst(ColumnNullable) if the result is always NULL or if the result is constant and always not NULL.

View File

@ -12,11 +12,13 @@ template <typename, typename> struct PlusImpl;
template <typename, typename> struct MinusImpl;
template <typename, typename> struct MultiplyImpl;
template <typename, typename> struct DivideFloatingImpl;
template <typename, typename> struct DivideOrNullImpl;
template <typename, typename> struct DivideIntegralImpl;
template <typename, typename> struct DivideIntegralOrZeroImpl;
template <typename, typename> struct LeastBaseImpl;
template <typename, typename> struct GreatestBaseImpl;
template <typename, typename> struct ModuloImpl;
template <typename, typename> struct ModuloOrNullImpl;
template <typename, typename> struct PositiveModuloImpl;
template <typename, typename> struct EqualsOp;
template <typename, typename> struct NotEqualsOp;
@ -51,16 +53,18 @@ struct IsOperation
static constexpr bool minus = IsSameOperation<Op, MinusImpl>::value;
static constexpr bool multiply = IsSameOperation<Op, MultiplyImpl>::value;
static constexpr bool div_floating = IsSameOperation<Op, DivideFloatingImpl>::value;
static constexpr bool divide_or_null = IsSameOperation<Op, DivideOrNullImpl>::value;
static constexpr bool int_div = IsSameOperation<Op, DivideIntegralImpl>::value;
static constexpr bool int_div_or_zero = IsSameOperation<Op, DivideIntegralOrZeroImpl>::value;
static constexpr bool modulo = IsSameOperation<Op, ModuloImpl>::value;
static constexpr bool modulo_or_null = IsSameOperation<Op, ModuloOrNullImpl>::value;
static constexpr bool positive_modulo = IsSameOperation<Op, PositiveModuloImpl>::value;
static constexpr bool least = IsSameOperation<Op, LeastBaseImpl>::value;
static constexpr bool greatest = IsSameOperation<Op, GreatestBaseImpl>::value;
static constexpr bool bit_hamming_distance = IsSameOperation<Op, BitHammingDistanceImpl>::value;
static constexpr bool division = div_floating || int_div || int_div_or_zero || modulo;
static constexpr bool division = div_floating || int_div || int_div_or_zero || modulo || modulo_or_null || divide_or_null;
// NOTE: allow_decimal should not fully contain `division` because of divInt
static constexpr bool allow_decimal = plus || minus || multiply || division || least || greatest;
};

View File

@ -62,16 +62,17 @@ public:
for (size_t i = 0; i < num_rows; ++i)
{
auto array_size = col_num->getInt(i);
auto element_size = col_value->byteSizeAt(i);
if (unlikely(array_size < 0))
throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE, "Array size {} cannot be negative: while executing function {}", array_size, getName());
Int64 estimated_size = 0;
if (unlikely(common::mulOverflow(array_size, col_value->byteSize(), estimated_size)))
throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE, "Array size {} with element size {} bytes is too large: while executing function {}", array_size, col_value->byteSize(), getName());
if (unlikely(common::mulOverflow(array_size, element_size, estimated_size)))
throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE, "Array size {} with element size {} bytes is too large: while executing function {}", array_size, element_size, getName());
if (unlikely(estimated_size > max_array_size_in_columns_bytes))
throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE, "Array size {} with element size {} bytes is too large: while executing function {}", array_size, col_value->byteSize(), getName());
throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE, "Array size {} with element size {} bytes is too large: while executing function {}", array_size, element_size, getName());
offset += array_size;

View File

@ -1,37 +1,9 @@
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionBinaryArithmetic.h>
#include <Functions/DivisionUtils.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
template <typename A, typename B>
struct DivideFloatingImpl
{
using ResultType = typename NumberTraits::ResultOfFloatingPointDivision<A, B>::Type;
static const constexpr bool allow_fixed_string = false;
static const constexpr bool allow_string_integer = false;
template <typename Result = ResultType>
static NO_SANITIZE_UNDEFINED Result apply(A a [[maybe_unused]], B b [[maybe_unused]])
{
return static_cast<Result>(a) / static_cast<Result>(b);
}
#if USE_EMBEDDED_COMPILER
static constexpr bool compilable = true;
static llvm::Value * compile(llvm::IRBuilder<> & b, llvm::Value * left, llvm::Value * right, bool)
{
if (left->getType()->isIntegerTy())
throw Exception(ErrorCodes::LOGICAL_ERROR, "DivideFloatingImpl expected a floating-point type");
return b.CreateFDiv(left, right);
}
#endif
};
struct NameDivide { static constexpr auto name = "divide"; };
using FunctionDivide = BinaryArithmeticOverloadResolver<DivideFloatingImpl, NameDivide>;

View File

@ -0,0 +1,161 @@
#include <cmath>
#include <limits>
#include <type_traits>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionBinaryArithmetic.h>
#include <Functions/DivisionUtils.h>
namespace DB
{
template <typename A, typename B>
struct DivideOrNullImpl
: BinaryOperation<A, B, DivideFloatingImpl<A, B>>
{
using Op = DivideFloatingImpl<A, B>;
using ResultType = typename NumberTraits::ResultOfFloatingPointDivision<A, B>::Type;
static const constexpr bool allow_fixed_string = false;
static const constexpr bool allow_string_integer = false;
template <OpCase op_case>
static void NO_INLINE process(const A * __restrict a, const B * __restrict b, ResultType * __restrict c, size_t size, const NullMap * right_nullmap [[maybe_unused]], NullMap * res_nullmap)
{
chassert(res_nullmap);
if constexpr (op_case == OpCase::RightConstant)
{
if (right_nullmap && (*right_nullmap)[0])
return;
if (unlikely(*b == 0))
{
for (size_t i = 0; i < size; ++i)
{
c[i] = ResultType();
(*res_nullmap)[i] = 1;
}
return;
}
if constexpr (std::is_signed_v<B>)
{
if (*b == -1)
{
for (size_t i = 0; i < size; ++i)
{
if (unlikely(a[i] == std::numeric_limits<A>::min()))
{
(*res_nullmap)[i] = 1;
c[i] = ResultType();
}
else
c[i] = static_cast<ResultType>(-a[i]);
}
return;
}
}
for (size_t i = 0; i < size; ++i)
c[i] = Op::template apply<ResultType>(a[i], *b);
}
else
{
for (size_t i = 0; i < size; ++i)
if ((*res_nullmap)[i])
c[i] = ResultType();
else
apply<op_case>(a, b, c, i, &((*res_nullmap)[i]));
}
}
static ResultType process(A a, B b, NullMap::value_type * m)
{
chassert(m);
ResultType res{};
try
{
if (b == 0)
{
*m = 1;
return res;
}
if constexpr (std::is_signed_v<B>)
{
if (b == -1)
{
if (unlikely(a == std::numeric_limits<A>::min()))
{
*m = 1;
return res;
}
return static_cast<ResultType>(-a);
}
}
return static_cast<ResultType>(a) / b;
}
catch (const std::exception&)
{
*m = 1;
}
return res;
}
template <typename Result = ResultType>
static NO_SANITIZE_UNDEFINED Result apply(A a, B b)
{
return static_cast<Result>(a) / b;
}
template <typename Result = ResultType>
static NO_SANITIZE_UNDEFINED Result apply(A a, B b, NullMap::value_type * m)
{
chassert(m);
auto res = static_cast<Result>(a) / b;
if constexpr (std::is_floating_point_v<ResultType>)
if (unlikely(!std::isfinite(res)))
*m = 1;
return res;
}
private:
template <OpCase op_case>
static void apply(const A * __restrict a, const B * __restrict b, ResultType * __restrict c, size_t i, NullMap::value_type * m)
{
chassert(m);
try
{
if constexpr (op_case == OpCase::Vector)
c[i] = Op::template apply<ResultType>(a[i], b[i]);
else
c[i] = Op::template apply<ResultType>(*a, b[i]);
if constexpr (std::is_floating_point_v<ResultType>)
{
if (unlikely(!std::isfinite(c[i])))
*m = 1;
}
}
catch (const std::exception&)
{
*m = 1;
}
}
public:
#if USE_EMBEDDED_COMPILER
static constexpr bool compilable = false;
#endif
};
namespace impl_
{
template <typename A, typename B> struct BinaryOperationImpl<A, B, DivideOrNullImpl<A, B>> : DivideOrNullImpl<A, B> {};
}
struct NameDivideOrNull { static constexpr auto name = "divideOrNull"; };
using FunctionDivideOrNull = BinaryArithmeticOverloadResolver<DivideOrNullImpl, NameDivideOrNull>;
REGISTER_FUNCTION(DivideOrNull)
{
factory.registerFunction<FunctionDivideOrNull>();
}
}

View File

@ -26,7 +26,7 @@ struct DivideIntegralByConstantImpl
static const constexpr bool allow_string_integer = false;
template <OpCase op_case>
static void NO_INLINE process(const A * __restrict a, const B * __restrict b, ResultType * __restrict c, size_t size, const NullMap * right_nullmap)
static void NO_INLINE process(const A * __restrict a, const B * __restrict b, ResultType * __restrict c, size_t size, const NullMap * right_nullmap, NullMap * res_nullmap[[maybe_unused]])
{
if constexpr (op_case == OpCase::RightConstant)
{
@ -51,7 +51,7 @@ struct DivideIntegralByConstantImpl
}
}
static ResultType process(A a, B b) { return Op::template apply<ResultType>(a, b); }
static ResultType process(A a, B b, NullMap::value_type * m [[maybe_unused]] = nullptr) { return Op::template apply<ResultType>(a, b); }
static void NO_INLINE NO_SANITIZE_UNDEFINED vectorConstant(const A * __restrict a_pos, B b, ResultType * __restrict c_pos, size_t size)
{

View File

@ -27,7 +27,7 @@ struct ModuloByConstantImpl
static const constexpr bool allow_string_integer = false;
template <OpCase op_case>
static void NO_INLINE process(const A * __restrict a, const B * __restrict b, ResultType * __restrict c, size_t size, const NullMap * right_nullmap)
static void NO_INLINE process(const A * __restrict a, const B * __restrict b, ResultType * __restrict c, size_t size, const NullMap * right_nullmap, NullMap * res_nullmap [[maybe_unused]] = nullptr)
{
if constexpr (op_case == OpCase::RightConstant)
{
@ -51,12 +51,21 @@ struct ModuloByConstantImpl
}
}
static ResultType process(A a, B b) { return Op::template apply<ResultType>(a, b); }
static ResultType process(A a, B b, NullMap::value_type * res_nullmap [[maybe_unused]] = nullptr) { return Op::template apply<ResultType>(a, b); }
static void NO_INLINE NO_SANITIZE_UNDEFINED vectorConstant(const A * __restrict src, B b, ResultType * __restrict dst, size_t size)
{
/// Modulo with too small divisor.
if (unlikely((std::is_signed_v<B> && b == -1) || b == 1))
if constexpr (std::is_signed_v<B>)
{
if (unlikely((b == -1)))
{
for (size_t i = 0; i < size; ++i)
dst[i] = 0;
return;
}
}
if (b == 1)
{
for (size_t i = 0; i < size; ++i)
dst[i] = 0;

View File

@ -0,0 +1,201 @@
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionBinaryArithmetic.h>
#include <libdivide.h>
namespace DB
{
template <typename A, typename B>
struct ModuloOrNullImpl
: BinaryOperation<A, B, ModuloImpl<A, B>>
{
using Op = ModuloImpl<A, B>;
using ResultType = typename Op::ResultType;
static const constexpr bool allow_fixed_string = false;
static const constexpr bool allow_string_integer = false;
template <OpCase op_case>
static void NO_INLINE process(const A * __restrict a, const B * __restrict b, ResultType * __restrict c, size_t size, const NullMap * right_nullmap [[maybe_unused]], NullMap * res_nullmap)
{
chassert(res_nullmap);
if constexpr (op_case == OpCase::RightConstant)
{
if (right_nullmap && (*right_nullmap)[0])
return;
if constexpr (!std::is_same_v<ResultType, Float64> && !is_big_int_v<A> && !is_big_int_v<B>
&& !std::is_same_v<A, Int8> && !std::is_same_v<A, UInt8>)
vectorConstant(a, *b, c, size, res_nullmap);
else
for (size_t i = 0; i < size; ++i)
apply<op_case>(a, b, c, i, &((*res_nullmap)[i]));
}
else
{
if (right_nullmap)
{
for (size_t i = 0; i < size; ++i)
if ((*right_nullmap)[i])
c[i] = ResultType();
else
apply<op_case>(a, b, c, i, &((*res_nullmap)[i]));
}
else
for (size_t i = 0; i < size; ++i)
apply<op_case>(a, b, c, i, &((*res_nullmap)[i]));
}
}
static ResultType process(A a, B b, NullMap::value_type * m)
{
chassert(m);
ResultType res{};
try
{
res = Op::template apply<ResultType>(a, b);
if constexpr (std::is_floating_point_v<ResultType>)
if (unlikely(!std::isfinite(res)))
*m = 1;
}
catch (const std::exception&)
{
*m = 1;
}
return res;
}
static void NO_INLINE NO_SANITIZE_UNDEFINED vectorConstant(const A * __restrict src, B b, ResultType * __restrict dst, size_t size, NullMap * res_nullmap)
{
/// Modulo with too small divisor.
if constexpr (std::is_signed_v<B>)
{
if (unlikely((b == -1)))
{
for (size_t i = 0; i < size; ++i)
dst[i] = 0;
return;
}
}
if (b == 1)
{
for (size_t i = 0; i < size; ++i)
dst[i] = 0;
return;
}
/// Modulo with too large divisor.
if constexpr ((std::is_signed_v<B> && std::is_signed_v<A>) || (std::is_unsigned_v<B> && std::is_unsigned_v<A>))
{
if (unlikely(b > std::numeric_limits<A>::max()
|| (std::is_signed_v<A> && std::is_signed_v<B> && b < std::numeric_limits<A>::lowest())))
{
for (size_t i = 0; i < size; ++i)
dst[i] = static_cast<ResultType>(src[i]);
return;
}
}
/// Set result to NULL if divide by zero or too large divisor.
if (unlikely(static_cast<A>(b) == 0 || std::is_signed_v<B> && b == std::numeric_limits<B>::lowest()))
{
for (size_t i = 0; i < size; ++i)
(*res_nullmap)[i] = 1;
return;
}
/// Modulo of division by negative number is the same as the positive number.
if (b < 0)
b = -b;
/// Here we failed to make the SSE variant from libdivide give an advantage.
if (b & (b - 1))
{
libdivide::divider<A> divider(static_cast<A>(b));
for (size_t i = 0; i < size; ++i)
{
/// NOTE: perhaps, the division semantics with the remainder of negative numbers is not preserved.
dst[i] = static_cast<ResultType>(src[i] - (src[i] / divider) * b);
}
}
else
{
// gcc libdivide doesn't work well for pow2 division
auto mask = b - 1;
for (size_t i = 0; i < size; ++i)
dst[i] = static_cast<ResultType>(src[i] & mask);
}
}
template <typename Result = ResultType>
static Result apply(A a, B b, NullMap::value_type * m)
{
chassert(m);
Result res{};
try
{
res = Op::template apply<Result>(a, b);
if constexpr (std::is_floating_point_v<Result>)
if (unlikely(!std::isfinite(res)))
*m = 1;
}
catch (const std::exception&)
{
*m = 1;
}
return res;
}
template <typename Result = ResultType>
static Result apply(A a, B b)
{
return Op::template apply<Result>(a, b);
}
private:
template <OpCase op_case>
static void apply(const A * __restrict a, const B * __restrict b, ResultType * __restrict c, size_t i, NullMap::value_type * m)
{
try
{
c[i] = ResultType();
if constexpr (op_case == OpCase::Vector)
c[i] = Op::template apply<ResultType>(a[i], b[i]);
else if constexpr (op_case == OpCase::RightConstant)
c[i] = Op::template apply<ResultType>(a[i], *b);
else
c[i] = Op::template apply<ResultType>(*a, b[i]);
if constexpr (std::is_floating_point_v<ResultType>)
if (unlikely(!std::isfinite(c[i])))
{
*m = 1;
}
}
catch (const std::exception&)
{
*m = 1;
}
}
public:
#if USE_EMBEDDED_COMPILER
static constexpr bool compilable = false;
#endif
};
namespace impl_
{
template <typename A, typename B> struct BinaryOperationImpl<A, B, ModuloOrNullImpl<A, B>> : ModuloOrNullImpl<A, B> {};
}
struct NameModuloOrNull { static constexpr auto name = "moduloOrNull"; };
using FunctionModuloOrNull = BinaryArithmeticOverloadResolver<ModuloOrNullImpl, NameModuloOrNull>;
REGISTER_FUNCTION(ModuloOrNull)
{
factory.registerFunction<FunctionModuloOrNull>();
}
}

View File

@ -54,7 +54,9 @@ struct PlusName { static constexpr auto name = "plus"; };
struct MinusName { static constexpr auto name = "minus"; };
struct MultiplyName { static constexpr auto name = "multiply"; };
struct DivideName { static constexpr auto name = "divide"; };
struct DivideOrNullName { static constexpr auto name = "divideOrNull"; };
struct ModuloName { static constexpr auto name = "modulo"; };
struct ModuloOrNullName { static constexpr auto name = "moduloOrNull"; };
struct IntDivName { static constexpr auto name = "intDiv"; };
struct IntDivOrZeroName { static constexpr auto name = "intDivOrZero"; };
@ -71,6 +73,16 @@ constexpr std::string makeFirstLetterUppercase(const std::string & str)
return res;
}
constexpr bool endWith(const std::string & str, const std::string & needle)
{
return str.size() >= needle.size() && str.compare(str.size() - needle.size(), needle.size(), needle) == 0;
}
constexpr std::string dropNeedle(const std::string & str, const std::string & needle)
{
return endWith(str, needle) ? str.substr(0, str.size() - needle.size()) : str;
}
template <class FuncName>
class FunctionTupleOperator : public ITupleFunction
{
@ -152,8 +164,12 @@ using FunctionTupleMultiply = FunctionTupleOperator<MultiplyName>;
using FunctionTupleDivide = FunctionTupleOperator<DivideName>;
using FunctionTupleDivideOrNull = FunctionTupleOperator<DivideOrNullName>;
using FunctionTupleModulo = FunctionTupleOperator<ModuloName>;
using FunctionTupleModuloOrNull = FunctionTupleOperator<ModuloOrNullName>;
using FunctionTupleIntDiv = FunctionTupleOperator<IntDivName>;
using FunctionTupleIntDivOrZero = FunctionTupleOperator<IntDivOrZeroName>;
@ -234,7 +250,7 @@ class FunctionTupleOperatorByNumber : public ITupleFunction
{
public:
/// constexpr cannot be used due to std::string has not constexpr constructor in this compiler version
static inline auto name = "tuple" + makeFirstLetterUppercase(FuncName::name) + "ByNumber";
static inline auto name = "tuple" + makeFirstLetterUppercase(dropNeedle(FuncName::name, "OrNull")) + "ByNumber" + (endWith(FuncName::name, "OrNull") ? "OrNull" : "");
explicit FunctionTupleOperatorByNumber(ContextPtr context_) : ITupleFunction(context_) {}
static FunctionPtr create(ContextPtr context_) { return std::make_shared<FunctionTupleOperatorByNumber>(context_); }
@ -307,12 +323,16 @@ using FunctionTupleMultiplyByNumber = FunctionTupleOperatorByNumber<MultiplyName
using FunctionTupleDivideByNumber = FunctionTupleOperatorByNumber<DivideName>;
using FunctionTupleDivideOrNullByNumber = FunctionTupleOperatorByNumber<DivideOrNullName>;
using FunctionTupleModuloByNumber = FunctionTupleOperatorByNumber<ModuloName>;
using FunctionTupleIntDivByNumber = FunctionTupleOperatorByNumber<IntDivName>;
using FunctionTupleIntDivOrZeroByNumber = FunctionTupleOperatorByNumber<IntDivOrZeroName>;
using FunctionTupleModuloOrNullByNumber = FunctionTupleOperatorByNumber<ModuloOrNullName>;
class FunctionDotProduct : public ITupleFunction
{
public:
@ -1581,7 +1601,9 @@ REGISTER_FUNCTION(VectorFunctions)
factory.registerAlias("vectorDifference", FunctionTupleMinus::name, FunctionFactory::Case::Insensitive);
factory.registerFunction<FunctionTupleMultiply>();
factory.registerFunction<FunctionTupleDivide>();
factory.registerFunction<FunctionTupleDivideOrNull>();
factory.registerFunction<FunctionTupleModulo>();
factory.registerFunction<FunctionTupleModuloOrNull>();
factory.registerFunction<FunctionTupleIntDiv>();
factory.registerFunction<FunctionTupleIntDivOrZero>();
factory.registerFunction<FunctionTupleNegate>();
@ -1647,7 +1669,9 @@ If the types of the first interval (or the interval in the tuple) and the second
factory.registerFunction<FunctionTupleMultiplyByNumber>();
factory.registerFunction<FunctionTupleDivideByNumber>();
factory.registerFunction<FunctionTupleDivideOrNullByNumber>();
factory.registerFunction<FunctionTupleModuloByNumber>();
factory.registerFunction<FunctionTupleModuloOrNullByNumber>();
factory.registerFunction<FunctionTupleIntDivByNumber>();
factory.registerFunction<FunctionTupleIntDivOrZeroByNumber>();

View File

@ -226,6 +226,26 @@ template class TableFunctionObjectStorage<HDFSClusterDefinition, StorageHDFSConf
#endif
template class TableFunctionObjectStorage<LocalDefinition, StorageLocalConfiguration>;
#if USE_AVRO && USE_AWS_S3
template class TableFunctionObjectStorage<IcebergS3ClusterDefinition, StorageS3IcebergConfiguration>;
#endif
#if USE_AVRO && USE_AZURE_BLOB_STORAGE
template class TableFunctionObjectStorage<IcebergAzureClusterDefinition, StorageAzureIcebergConfiguration>;
#endif
#if USE_AVRO && USE_HDFS
template class TableFunctionObjectStorage<IcebergHDFSClusterDefinition, StorageHDFSIcebergConfiguration>;
#endif
#if USE_PARQUET && USE_AWS_S3
template class TableFunctionObjectStorage<DeltaLakeClusterDefinition, StorageS3DeltaLakeConfiguration>;
#endif
#if USE_AWS_S3
template class TableFunctionObjectStorage<HudiClusterDefinition, StorageS3HudiConfiguration>;
#endif
#if USE_AVRO
void registerTableFunctionIceberg(TableFunctionFactory & factory)
{

View File

@ -96,7 +96,7 @@ void registerTableFunctionObjectStorageCluster(TableFunctionFactory & factory)
{
.documentation = {
.description=R"(The table function can be used to read the data stored on HDFS in parallel for many nodes in a specified cluster.)",
.examples{{"HDFSCluster", "SELECT * FROM HDFSCluster(cluster_name, uri, format)", ""}}},
.examples{{"HDFSCluster", "SELECT * FROM HDFSCluster(cluster, uri, format)", ""}}},
.allow_readonly = false
}
);
@ -105,15 +105,77 @@ void registerTableFunctionObjectStorageCluster(TableFunctionFactory & factory)
UNUSED(factory);
}
#if USE_AVRO
void registerTableFunctionIcebergCluster(TableFunctionFactory & factory)
{
UNUSED(factory);
#if USE_AWS_S3
template class TableFunctionObjectStorageCluster<S3ClusterDefinition, StorageS3Configuration>;
factory.registerFunction<TableFunctionIcebergS3Cluster>(
{.documentation
= {.description = R"(The table function can be used to read the Iceberg table stored on S3 object store in parallel for many nodes in a specified cluster.)",
.examples{{"icebergS3Cluster", "SELECT * FROM icebergS3Cluster(cluster, url, [, NOSIGN | access_key_id, secret_access_key, [session_token]], format, [,compression])", ""}},
.categories{"DataLake"}},
.allow_readonly = false});
#endif
#if USE_AZURE_BLOB_STORAGE
template class TableFunctionObjectStorageCluster<AzureClusterDefinition, StorageAzureConfiguration>;
factory.registerFunction<TableFunctionIcebergAzureCluster>(
{.documentation
= {.description = R"(The table function can be used to read the Iceberg table stored on Azure object store in parallel for many nodes in a specified cluster.)",
.examples{{"icebergAzureCluster", "SELECT * FROM icebergAzureCluster(cluster, connection_string|storage_account_url, container_name, blobpath, [account_name, account_key, format, compression])", ""}},
.categories{"DataLake"}},
.allow_readonly = false});
#endif
#if USE_HDFS
template class TableFunctionObjectStorageCluster<HDFSClusterDefinition, StorageHDFSConfiguration>;
factory.registerFunction<TableFunctionIcebergHDFSCluster>(
{.documentation
= {.description = R"(The table function can be used to read the Iceberg table stored on HDFS virtual filesystem in parallel for many nodes in a specified cluster.)",
.examples{{"icebergHDFSCluster", "SELECT * FROM icebergHDFSCluster(cluster, uri, [format], [structure], [compression_method])", ""}},
.categories{"DataLake"}},
.allow_readonly = false});
#endif
}
#endif
#if USE_AWS_S3
#if USE_PARQUET
void registerTableFunctionDeltaLakeCluster(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionDeltaLakeCluster>(
{.documentation
= {.description = R"(The table function can be used to read the DeltaLake table stored on object store in parallel for many nodes in a specified cluster.)",
.examples{{"deltaLakeCluster", "SELECT * FROM deltaLakeCluster(cluster, url, access_key_id, secret_access_key)", ""}},
.categories{"DataLake"}},
.allow_readonly = false});
}
#endif
void registerTableFunctionHudiCluster(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionHudiCluster>(
{.documentation
= {.description = R"(The table function can be used to read the Hudi table stored on object store in parallel for many nodes in a specified cluster.)",
.examples{{"hudiCluster", "SELECT * FROM hudiCluster(cluster, url, access_key_id, secret_access_key)", ""}},
.categories{"DataLake"}},
.allow_readonly = false});
}
#endif
void registerDataLakeClusterTableFunctions(TableFunctionFactory & factory)
{
UNUSED(factory);
#if USE_AVRO
registerTableFunctionIcebergCluster(factory);
#endif
#if USE_AWS_S3
#if USE_PARQUET
registerTableFunctionDeltaLakeCluster(factory);
#endif
registerTableFunctionHudiCluster(factory);
#endif
}
}

View File

@ -33,6 +33,36 @@ struct HDFSClusterDefinition
static constexpr auto storage_type_name = "HDFSCluster";
};
struct IcebergS3ClusterDefinition
{
static constexpr auto name = "icebergS3Cluster";
static constexpr auto storage_type_name = "IcebergS3Cluster";
};
struct IcebergAzureClusterDefinition
{
static constexpr auto name = "icebergAzureCluster";
static constexpr auto storage_type_name = "IcebergAzureCluster";
};
struct IcebergHDFSClusterDefinition
{
static constexpr auto name = "icebergHDFSCluster";
static constexpr auto storage_type_name = "IcebergHDFSCluster";
};
struct DeltaLakeClusterDefinition
{
static constexpr auto name = "deltaLakeCluster";
static constexpr auto storage_type_name = "DeltaLakeS3Cluster";
};
struct HudiClusterDefinition
{
static constexpr auto name = "hudiCluster";
static constexpr auto storage_type_name = "HudiS3Cluster";
};
/**
* Class implementing s3/hdfs/azureBlobStorageCluster(...) table functions,
* which allow to process many files from S3/HDFS/Azure blob storage on a specific cluster.
@ -79,4 +109,25 @@ using TableFunctionAzureBlobCluster = TableFunctionObjectStorageCluster<AzureClu
#if USE_HDFS
using TableFunctionHDFSCluster = TableFunctionObjectStorageCluster<HDFSClusterDefinition, StorageHDFSConfiguration>;
#endif
#if USE_AVRO && USE_AWS_S3
using TableFunctionIcebergS3Cluster = TableFunctionObjectStorageCluster<IcebergS3ClusterDefinition, StorageS3IcebergConfiguration>;
#endif
#if USE_AVRO && USE_AZURE_BLOB_STORAGE
using TableFunctionIcebergAzureCluster = TableFunctionObjectStorageCluster<IcebergAzureClusterDefinition, StorageAzureIcebergConfiguration>;
#endif
#if USE_AVRO && USE_HDFS
using TableFunctionIcebergHDFSCluster = TableFunctionObjectStorageCluster<IcebergHDFSClusterDefinition, StorageHDFSIcebergConfiguration>;
#endif
#if USE_AWS_S3 && USE_PARQUET
using TableFunctionDeltaLakeCluster = TableFunctionObjectStorageCluster<DeltaLakeClusterDefinition, StorageS3DeltaLakeConfiguration>;
#endif
#if USE_AWS_S3
using TableFunctionHudiCluster = TableFunctionObjectStorageCluster<HudiClusterDefinition, StorageS3HudiConfiguration>;
#endif
}

View File

@ -66,6 +66,7 @@ void registerTableFunctions(bool use_legacy_mongodb_integration [[maybe_unused]]
registerTableFunctionObjectStorage(factory);
registerTableFunctionObjectStorageCluster(factory);
registerDataLakeTableFunctions(factory);
registerDataLakeClusterTableFunctions(factory);
}
}

View File

@ -70,6 +70,7 @@ void registerTableFunctionExplain(TableFunctionFactory & factory);
void registerTableFunctionObjectStorage(TableFunctionFactory & factory);
void registerTableFunctionObjectStorageCluster(TableFunctionFactory & factory);
void registerDataLakeTableFunctions(TableFunctionFactory & factory);
void registerDataLakeClusterTableFunctions(TableFunctionFactory & factory);
void registerTableFunctionTimeSeries(TableFunctionFactory & factory);

View File

@ -0,0 +1,20 @@
<clickhouse>
<remote_servers>
<cluster_simple>
<shard>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
<replica>
<host>node3</host>
<port>9000</port>
</replica>
</shard>
</cluster_simple>
</remote_servers>
</clickhouse>

View File

@ -0,0 +1,6 @@
<clickhouse>
<query_log>
<database>system</database>
<table>query_log</table>
</query_log>
</clickhouse>

View File

@ -73,14 +73,38 @@ def started_cluster():
cluster.add_instance(
"node1",
main_configs=[
"configs/config.d/query_log.xml",
"configs/config.d/cluster.xml",
"configs/config.d/named_collections.xml",
"configs/config.d/filesystem_caches.xml",
],
user_configs=["configs/users.d/users.xml"],
with_minio=True,
with_azurite=True,
stay_alive=True,
with_hdfs=with_hdfs,
stay_alive=True,
)
cluster.add_instance(
"node2",
main_configs=[
"configs/config.d/query_log.xml",
"configs/config.d/cluster.xml",
"configs/config.d/named_collections.xml",
"configs/config.d/filesystem_caches.xml",
],
user_configs=["configs/users.d/users.xml"],
stay_alive=True,
)
cluster.add_instance(
"node3",
main_configs=[
"configs/config.d/query_log.xml",
"configs/config.d/cluster.xml",
"configs/config.d/named_collections.xml",
"configs/config.d/filesystem_caches.xml",
],
user_configs=["configs/users.d/users.xml"],
stay_alive=True,
)
logging.info("Starting cluster...")
@ -182,6 +206,7 @@ def get_creation_expression(
cluster,
format="Parquet",
table_function=False,
run_on_cluster=False,
**kwargs,
):
if storage_type == "s3":
@ -189,35 +214,56 @@ def get_creation_expression(
bucket = kwargs["bucket"]
else:
bucket = cluster.minio_bucket
print(bucket)
if table_function:
return f"icebergS3(s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"
if run_on_cluster:
assert table_function
return f"icebergS3Cluster('cluster_simple', s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"
else:
return f"""
DROP TABLE IF EXISTS {table_name};
CREATE TABLE {table_name}
ENGINE=IcebergS3(s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"""
if table_function:
return f"icebergS3(s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"
else:
return f"""
DROP TABLE IF EXISTS {table_name};
CREATE TABLE {table_name}
ENGINE=IcebergS3(s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"""
elif storage_type == "azure":
if table_function:
if run_on_cluster:
assert table_function
return f"""
icebergAzure(azure, container = '{cluster.azure_container_name}', storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})
icebergAzureCluster('cluster_simple', azure, container = '{cluster.azure_container_name}', storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})
"""
else:
return f"""
DROP TABLE IF EXISTS {table_name};
CREATE TABLE {table_name}
ENGINE=IcebergAzure(azure, container = {cluster.azure_container_name}, storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})"""
if table_function:
return f"""
icebergAzure(azure, container = '{cluster.azure_container_name}', storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})
"""
else:
return f"""
DROP TABLE IF EXISTS {table_name};
CREATE TABLE {table_name}
ENGINE=IcebergAzure(azure, container = {cluster.azure_container_name}, storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})"""
elif storage_type == "hdfs":
if table_function:
if run_on_cluster:
assert table_function
return f"""
icebergHDFS(hdfs, filename= 'iceberg_data/default/{table_name}/', format={format}, url = 'hdfs://hdfs1:9000/')
icebergHDFSCluster('cluster_simple', hdfs, filename= 'iceberg_data/default/{table_name}/', format={format}, url = 'hdfs://hdfs1:9000/')
"""
else:
return f"""
DROP TABLE IF EXISTS {table_name};
CREATE TABLE {table_name}
ENGINE=IcebergHDFS(hdfs, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'hdfs://hdfs1:9000/');"""
if table_function:
return f"""
icebergHDFS(hdfs, filename= 'iceberg_data/default/{table_name}/', format={format}, url = 'hdfs://hdfs1:9000/')
"""
else:
return f"""
DROP TABLE IF EXISTS {table_name};
CREATE TABLE {table_name}
ENGINE=IcebergHDFS(hdfs, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'hdfs://hdfs1:9000/');"""
elif storage_type == "local":
assert not run_on_cluster
if table_function:
return f"""
icebergLocal(local, path = '/iceberg_data/default/{table_name}/', format={format})
@ -227,6 +273,7 @@ def get_creation_expression(
DROP TABLE IF EXISTS {table_name};
CREATE TABLE {table_name}
ENGINE=IcebergLocal(local, path = '/iceberg_data/default/{table_name}/', format={format});"""
else:
raise Exception(f"Unknown iceberg storage type: {storage_type}")
@ -492,6 +539,108 @@ def test_types(started_cluster, format_version, storage_type):
)
@pytest.mark.parametrize("format_version", ["1", "2"])
@pytest.mark.parametrize("storage_type", ["s3", "azure", "hdfs"])
def test_cluster_table_function(started_cluster, format_version, storage_type):
if is_arm() and storage_type == "hdfs":
pytest.skip("Disabled test IcebergHDFS for aarch64")
instance = started_cluster.instances["node1"]
spark = started_cluster.spark_session
TABLE_NAME = (
"test_iceberg_cluster_"
+ format_version
+ "_"
+ storage_type
+ "_"
+ get_uuid_str()
)
def add_df(mode):
write_iceberg_from_df(
spark,
generate_data(spark, 0, 100),
TABLE_NAME,
mode=mode,
format_version=format_version,
)
files = default_upload_directory(
started_cluster,
storage_type,
f"/iceberg_data/default/{TABLE_NAME}/",
f"/iceberg_data/default/{TABLE_NAME}/",
)
logging.info(f"Adding another dataframe. result files: {files}")
return files
files = add_df(mode="overwrite")
for i in range(1, len(started_cluster.instances)):
files = add_df(mode="append")
logging.info(f"Setup complete. files: {files}")
assert len(files) == 5 + 4 * (len(started_cluster.instances) - 1)
clusters = instance.query(f"SELECT * FROM system.clusters")
logging.info(f"Clusters setup: {clusters}")
# Regular Query only node1
table_function_expr = get_creation_expression(
storage_type, TABLE_NAME, started_cluster, table_function=True
)
select_regular = (
instance.query(f"SELECT * FROM {table_function_expr}").strip().split()
)
# Cluster Query with node1 as coordinator
table_function_expr_cluster = get_creation_expression(
storage_type,
TABLE_NAME,
started_cluster,
table_function=True,
run_on_cluster=True,
)
select_cluster = (
instance.query(f"SELECT * FROM {table_function_expr_cluster}").strip().split()
)
# Simple size check
assert len(select_regular) == 600
assert len(select_cluster) == 600
# Actual check
assert select_cluster == select_regular
# Check query_log
for replica in started_cluster.instances.values():
replica.query("SYSTEM FLUSH LOGS")
for node_name, replica in started_cluster.instances.items():
cluster_secondary_queries = (
replica.query(
f"""
SELECT query, type, is_initial_query, read_rows, read_bytes FROM system.query_log
WHERE
type = 'QueryStart' AND
positionCaseInsensitive(query, '{storage_type}Cluster') != 0 AND
position(query, '{TABLE_NAME}') != 0 AND
position(query, 'system.query_log') = 0 AND
NOT is_initial_query
"""
)
.strip()
.split("\n")
)
logging.info(
f"[{node_name}] cluster_secondary_queries: {cluster_secondary_queries}"
)
assert len(cluster_secondary_queries) == 1
@pytest.mark.parametrize("format_version", ["1", "2"])
@pytest.mark.parametrize("storage_type", ["s3", "azure", "hdfs", "local"])
def test_delete_files(started_cluster, format_version, storage_type):

View File

@ -241,6 +241,7 @@ defaultValueOfTypeName
degrees
demangle
divide
divideOrNull
dotProduct
dumpColumnStructure
e
@ -434,6 +435,7 @@ minSampleSizeConversion
minus
modulo
moduloLegacy
moduloOrNull
moduloOrZero
monthName
multiFuzzyMatchAllIndices
@ -882,6 +884,8 @@ tumbleStart
tupleConcat
tupleDivide
tupleDivideByNumber
tupleDivideByNumberOrNull
tupleDivideOrNull
tupleElement
tupleHammingDistance
tupleIntDiv
@ -891,6 +895,8 @@ tupleIntDivOrZeroByNumber
tupleMinus
tupleModulo
tupleModuloByNumber
tupleModuloByNumberOrNull
tupleModuloOrNull
tupleMultiply
tupleMultiplyByNumber
tupleNegate

View File

@ -1,3 +1,6 @@
SELECT arrayWithConstant(96142475, ['qMUF']); -- { serverError TOO_LARGE_ARRAY_SIZE }
SELECT arrayWithConstant(100000000, materialize([[[[[[[[[['Hello, world!']]]]]]]]]])); -- { serverError TOO_LARGE_ARRAY_SIZE }
SELECT length(arrayWithConstant(10000000, materialize([[[[[[[[[['Hello world']]]]]]]]]])));
CREATE TEMPORARY TABLE args (value Array(Int)) ENGINE=Memory AS SELECT [1, 1, 1, 1] as value FROM numbers(1, 100);
SELECT length(arrayWithConstant(1000000, value)) FROM args FORMAT NULL;

View File

@ -0,0 +1,32 @@
10
0
91
1
Nullable(UInt8)
\N
\N
\N
\N
\N
\N
\N
\N
\N
\N
\N
\N
\N
\N
\N
\N
\N
\N
\N
\N
\N
\N
\N
(NULL,1,1)
(0,1,1)
(NULL,NULL,NULL)
(1,0,1)

View File

@ -0,0 +1,39 @@
select moduloOrNull(10, toNullable(materialize(100)));
select moduloOrNull(93, toNullable(materialize(93)));
select moduloOrNull(91, toNullable(materialize(93)));
select moduloOrNull(94, toNullable(materialize(93)));
select toTypeName(moduloOrNull(1, 0));
select moduloOrNull(1, 0);
select moduloOrNull(1, materialize(0));
select moduloOrNull(materialize(1), 0);
select moduloOrNull(materialize(1), materialize(0));
select moduloOrNull(1.1, toNullable(materialize(toUInt64(0))));
select moduloOrNull(materialize(1), toNullable(materialize(toUInt64(0))));
select moduloOrNull(toNullable(materialize(1)), toNullable(materialize(toUInt64(0))));
select moduloOrNull(toNullable(materialize(toFloat32(1))), toNullable(materialize(toInt64(0))));
select moduloOrNull(1.1, toNullable(materialize(toInt128(0))));
select moduloOrNull(toNullable(materialize(toFloat64(1))), toNullable(materialize(toInt128(0))));
select moduloOrNull(toNullable(materialize(toFloat64(1))), toNullable(materialize(toInt256(0))));
select moduloOrNull(1.0, toNullable(materialize(toInt256(0))));
SELECT moduloOrNull(toNullable(materialize(1)), toNullable(materialize(0)));
SELECT moduloOrNull(toNullable(materialize(toFloat32(1))), toNullable(materialize(0)));
SELECT moduloOrNull(toNullable(materialize(toFloat32(1))), materialize(0));
SELECT moduloOrNull(toNullable(materialize(toFloat32(1))), toNullable(0));
SELECT moduloOrNull(materialize(1), CAST(materialize(NULL), 'Nullable(Float32)'));
SELECT moduloOrNull(toDecimal32(16.2, 2), 0.0);
SELECT moduloOrNull(toDecimal32(16.2, 2), toDecimal32(0.0, 2));
SELECT moduloOrNull((16.2), 0.0);
SELECT moduloOrNull(materialize(16.2), 0.0);
SELECT moduloOrNull(16.2, materialize(0.0));
SELECT moduloOrNull(materialize(16.2), materialize(0.0));
SELECT tupleModuloOrNull((15, 10, 5), (0, 3, 2));
SELECT tupleModuloOrNull((15, 10, 5), (5, 3, 2));
SELECT tupleModuloByNumberOrNull((15, 10, 5), 0);
SELECT tupleModuloByNumberOrNull((15, 10, 5), 2);

View File

@ -0,0 +1,30 @@
\N
\N
\N
\N
\N
\N
\N
\N
1.1
11.1
8.333333002196431
10
1.7777777777777777
1.7777777777777777
1.7777777777777777
1.7777777777777777
1.7777777777777777
1.7777777777777777
1.7777777777777777
\N
\N
\N
\N
\N
\N
\N
(NULL,NULL,NULL)
(3,NULL,NULL)
(3,2,1)
(NULL,NULL,NULL)

View File

@ -0,0 +1,38 @@
SELECT divideOrNull(1 , CAST(NULL, 'Nullable(Float32)'));
SELECT divideOrNull(materialize(1), CAST(NULL, 'Nullable(Float32)'));
SELECT divideOrNull(1 , 0);
SELECT divideOrNull(materialize(1) , 0);
SELECT divideOrNull(1 , materialize(0));
SELECT divideOrNull(materialize(1) , materialize(0));
SELECT divideOrNull(1, CAST(materialize(0.0), 'Nullable(Float32)'));
SELECT divideOrNull(materialize(1), CAST(materialize(0.0), 'Nullable(Float32)'));
SELECT divideOrNull(1.1, CAST(1, 'Nullable(Float32)'));
SELECT divideOrNull(materialize(11.1), CAST(1, 'Nullable(Float32)'));
SELECT divideOrNull(10, CAST(materialize(1.2), 'Nullable(Float32)'));
SELECT divideOrNull(10, CAST(materialize(1), 'Nullable(Int128)'));
SELECT divideOrNull(CAST(16.0, 'Float32'), CAST(materialize(9), 'Nullable(Int128)'));
SELECT divideOrNull(CAST(16, 'Int64'), CAST(materialize(9), 'Nullable(Int128)'));
SELECT divideOrNull(CAST(16, 'Int32'), CAST(materialize(9), 'Nullable(Int128)'));
SELECT divideOrNull(CAST(16, 'Int8'), CAST(materialize(9), 'Nullable(Int128)'));
SELECT divideOrNull(CAST(16, 'Int128'), CAST(materialize(9), 'Nullable(Int128)'));
SELECT divideOrNull(CAST(16, 'UInt256'), CAST(materialize(9), 'Nullable(UInt128)'));
SELECT divideOrNull(CAST(16, 'UInt256'), CAST(materialize(9), 'Nullable(UInt128)'));
SELECT divideOrNull(toDecimal32(16.2, 2), toDecimal32(0.0, 1));
SELECT divideOrNull(toDecimal32(16.2, 2), materialize(toDecimal32(0.0, 1)));
SELECT divideOrNull(materialize(toDecimal32(16.2, 2)), toDecimal32(0.0, 1));
SELECT divideOrNull(materialize(toDecimal32(16.2, 2)), materialize(toDecimal32(0.0, 1)));
SELECT divideOrNull(toDecimal32(16.2, 2), 0.0);
SELECT divideOrNull(toDecimal32(16.2, 2), materialize(0.0));
SELECT divideOrNull(materialize(toDecimal32(16.2, 2)), materialize(0.0));
SELECT tupleDivideOrNull((15, 10, 5), (0, 0, 0));
SELECT tupleDivideOrNull((15, 10, 5), (5, 0, 0));
SELECT tupleDivideByNumberOrNull((15, 10, 5), 5);
SELECT tupleDivideByNumberOrNull((15, 10, 5), 0);

View File

@ -244,7 +244,10 @@ Deduplication
DefaultTableEngine
DelayedInserts
DeliveryTag
Deltalake
DeltaLake
deltalakeCluster
deltaLakeCluster
Denormalize
DestroyAggregatesThreads
DestroyAggregatesThreadsActive
@ -377,10 +380,15 @@ Homebrew's
HorizontalDivide
Hostname
HouseOps
hudi
Hudi
hudiCluster
HudiCluster
HyperLogLog
Hypot
IANA
icebergCluster
IcebergCluster
IDE
IDEs
IDNA
@ -3157,4 +3165,10 @@ znode
znodes
zookeeperSessionUptime
zstd
divideOrNull
moduloOrNull
tupleDivideOrNull
tupleDivideByNumberOrNull
tupleModuloByNumberOrNull
tupleModuloOrNull
BFloat