Merge branch 'master' into parallel-log-appending

commit 599f381217 by Antonio Andelic, 2022-11-26 09:04:27 +01:00, committed by GitHub
GPG Key ID: 4AEE18F83AFDEB23
58 changed files with 1520 additions and 460 deletions

.gitmodules

@ -290,3 +290,6 @@
[submodule "contrib/morton-nd"]
path = contrib/morton-nd
url = https://github.com/morton-nd/morton-nd
[submodule "contrib/xxHash"]
path = contrib/xxHash
url = https://github.com/Cyan4973/xxHash.git


@ -167,7 +167,9 @@ add_contrib (c-ares-cmake c-ares)
add_contrib (qpl-cmake qpl)
add_contrib (morton-nd-cmake morton-nd)
add_contrib(annoy-cmake annoy)
add_contrib (annoy-cmake annoy)
add_contrib (xxHash-cmake xxHash)
# Put all targets defined here and in subdirectories under "contrib/<immediate-subdir>" folders in GUI-based IDEs.
# Some of third-party projects may override CMAKE_FOLDER or FOLDER property of their targets, so they would not appear

contrib/xxHash (submodule)

@ -0,0 +1 @@
Subproject commit 3078dc6039f8c0bffcb1904f81cfe6b2c3209435


@ -0,0 +1,13 @@
set (LIBRARY_DIR "${ClickHouse_SOURCE_DIR}/contrib/xxHash")
set (SRCS
"${LIBRARY_DIR}/xxhash.c"
)
add_library(xxHash ${SRCS})
target_include_directories(xxHash SYSTEM BEFORE INTERFACE "${LIBRARY_DIR}")
# XXH_INLINE_ALL - Make all functions inline, with implementations being directly included within xxhash.h. Inlining functions is beneficial for speed on small keys.
# https://github.com/Cyan4973/xxHash/tree/v0.8.1#build-modifiers
target_compile_definitions(xxHash PUBLIC XXH_INLINE_ALL)
add_library(ch_contrib::xxHash ALIAS xxHash)


@ -6,29 +6,24 @@ FROM clickhouse/test-util:$FROM_TAG
# Rust toolchain and libraries
ENV RUSTUP_HOME=/rust/rustup
ENV CARGO_HOME=/rust/cargo
RUN curl https://sh.rustup.rs -sSf | bash -s -- -y
RUN chmod 777 -R /rust
ENV PATH="/rust/cargo/env:${PATH}"
ENV PATH="/rust/cargo/bin:${PATH}"
RUN rustup target add aarch64-unknown-linux-gnu && \
rustup target add x86_64-apple-darwin && \
rustup target add x86_64-unknown-freebsd && \
rustup target add aarch64-apple-darwin && \
rustup target add powerpc64le-unknown-linux-gnu
RUN apt-get install \
RUN curl https://sh.rustup.rs -sSf | bash -s -- -y && \
chmod 777 -R /rust && \
rustup target add aarch64-unknown-linux-gnu && \
rustup target add x86_64-apple-darwin && \
rustup target add x86_64-unknown-freebsd && \
rustup target add aarch64-apple-darwin && \
rustup target add powerpc64le-unknown-linux-gnu
RUN apt-get update && \
apt-get install --yes \
gcc-aarch64-linux-gnu \
build-essential \
libc6 \
libc6-dev \
libc6-dev-arm64-cross \
--yes
# Install CMake 3.20+ for Rust compilation
# Used https://askubuntu.com/a/1157132 as reference
RUN apt purge cmake --yes
RUN wget -O - https://apt.kitware.com/keys/kitware-archive-latest.asc 2>/dev/null | gpg --dearmor - | tee /etc/apt/trusted.gpg.d/kitware.gpg >/dev/null
RUN apt-add-repository 'deb https://apt.kitware.com/ubuntu/ focal main'
RUN apt update && apt install cmake --yes
libc6-dev-arm64-cross && \
apt-get clean
ENV CC=clang-${LLVM_VERSION}
ENV CXX=clang++-${LLVM_VERSION}


@ -137,6 +137,7 @@ function clone_submodules
contrib/hashidsxx
contrib/c-ares
contrib/morton-nd
contrib/xxHash
)
git submodule sync


@ -458,7 +458,7 @@ else
zgrep -Fav -e "Code: 236. DB::Exception: Cancelled merging parts" \
-e "Code: 236. DB::Exception: Cancelled mutating parts" \
-e "REPLICA_IS_ALREADY_ACTIVE" \
-e "REPLICA_IS_ALREADY_EXIST" \
-e "REPLICA_ALREADY_EXISTS" \
-e "ALL_REPLICAS_LOST" \
-e "DDLWorker: Cannot parse DDL task query" \
-e "RaftInstance: failed to accept a rpc connection due to error 125" \


@ -13,6 +13,7 @@ RUN apt-get update \
apt-transport-https \
apt-utils \
ca-certificates \
curl \
dnsutils \
gnupg \
iputils-ping \
@ -24,10 +25,16 @@ RUN apt-get update \
&& echo "${LLVM_PUBKEY_HASH} /tmp/llvm-snapshot.gpg.key" | sha384sum -c \
&& apt-key add /tmp/llvm-snapshot.gpg.key \
&& export CODENAME="$(lsb_release --codename --short | tr 'A-Z' 'a-z')" \
&& echo "deb [trusted=yes] https://apt.llvm.org/${CODENAME}/ llvm-toolchain-${CODENAME}-${LLVM_VERSION} main" >> \
&& echo "deb https://apt.llvm.org/${CODENAME}/ llvm-toolchain-${CODENAME}-${LLVM_VERSION} main" >> \
/etc/apt/sources.list \
&& apt-get clean
# Install cmake 3.20+ for rust support
# Used https://askubuntu.com/a/1157132 as reference
RUN curl -s https://apt.kitware.com/keys/kitware-archive-latest.asc | \
gpg --dearmor - > /etc/apt/trusted.gpg.d/kitware.gpg && \
echo "deb https://apt.kitware.com/ubuntu/ $(lsb_release -cs) main" >> /etc/apt/sources.list
# initial packages
RUN apt-get update \
&& apt-get install \
@ -37,7 +44,6 @@ RUN apt-get update \
clang-${LLVM_VERSION} \
clang-tidy-${LLVM_VERSION} \
cmake \
curl \
fakeroot \
gdb \
git \


@ -161,3 +161,140 @@ Result:
│ -1 │
└─────────────┘
```
## multiplyDecimal(a, b[, result_scale])
Performs multiplication on two decimals. The result value has type [Decimal256](../../sql-reference/data-types/decimal.md).
The result scale can be explicitly specified with the `result_scale` argument (a constant integer in the range `[0, 76]`). If it is not specified, the result scale is the maximum scale of the given arguments.
:::note
This function is significantly slower than regular `multiply`.
If you do not need controlled precision or need fast computation, consider using [multiply](#multiply) instead.
:::
**Syntax**
```sql
multiplyDecimal(a, b[, result_scale])
```
**Arguments**
- `a` — First value: [Decimal](../../sql-reference/data-types/decimal.md).
- `b` — Second value: [Decimal](../../sql-reference/data-types/decimal.md).
- `result_scale` — Scale of the result: [Int/UInt](../../sql-reference/data-types/int-uint.md).
**Returned value**
- The result of multiplication with the given scale.
Type: [Decimal256](../../sql-reference/data-types/decimal.md).
**Example**
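Query:
```sql
SELECT multiplyDecimal(toDecimal256(-12, 0), toDecimal32(-2.1, 1), 1);
```
Result: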
```text
┌─multiplyDecimal(toDecimal256(-12, 0), toDecimal32(-2.1, 1), 1)─┐
│ 25.2 │
└────────────────────────────────────────────────────────────────┘
```
**Difference from regular multiplication:**
```sql
SELECT toDecimal64(-12.647, 3) * toDecimal32(2.1239, 4);
SELECT toDecimal64(-12.647, 3) as a, toDecimal32(2.1239, 4) as b, multiplyDecimal(a, b);
```
```text
┌─multiply(toDecimal64(-12.647, 3), toDecimal32(2.1239, 4))─┐
│ -26.8609633 │
└───────────────────────────────────────────────────────────┘
┌─multiplyDecimal(toDecimal64(-12.647, 3), toDecimal32(2.1239, 4))─┐
│ -26.8609 │
└──────────────────────────────────────────────────────────────────┘
```
```sql
SELECT
toDecimal64(-12.647987876, 9) AS a,
toDecimal64(123.967645643, 9) AS b,
multiplyDecimal(a, b);
SELECT
toDecimal64(-12.647987876, 9) AS a,
toDecimal64(123.967645643, 9) AS b,
a * b;
```
```text
┌─────────────a─┬─────────────b─┬─multiplyDecimal(toDecimal64(-12.647987876, 9), toDecimal64(123.967645643, 9))─┐
│ -12.647987876 │ 123.967645643 │ -1567.941279108 │
└───────────────┴───────────────┴───────────────────────────────────────────────────────────────────────────────┘
Received exception from server (version 22.11.1):
Code: 407. DB::Exception: Received from localhost:9000. DB::Exception: Decimal math overflow: While processing toDecimal64(-12.647987876, 9) AS a, toDecimal64(123.967645643, 9) AS b, a * b. (DECIMAL_OVERFLOW)
```
## divideDecimal(a, b[, result_scale])
Performs division on two decimals. The result value has type [Decimal256](../../sql-reference/data-types/decimal.md).
The result scale can be explicitly specified with the `result_scale` argument (a constant integer in the range `[0, 76]`). If it is not specified, the result scale is the maximum scale of the given arguments.
:::note
This function is significantly slower than regular `divide`.
If you do not need controlled precision or need fast computation, consider using [divide](#divide) instead.
:::
**Syntax**
```sql
divideDecimal(a, b[, result_scale])
```
**Arguments**
- `a` — First value: [Decimal](../../sql-reference/data-types/decimal.md).
- `b` — Second value: [Decimal](../../sql-reference/data-types/decimal.md).
- `result_scale` — Scale of the result: [Int/UInt](../../sql-reference/data-types/int-uint.md).
**Returned value**
- The result of division with the given scale.
Type: [Decimal256](../../sql-reference/data-types/decimal.md).
**Example**
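Query:
```sql
SELECT divideDecimal(toDecimal256(-12, 0), toDecimal32(2.1, 1), 10);
```
Result: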
```text
┌─divideDecimal(toDecimal256(-12, 0), toDecimal32(2.1, 1), 10)─┐
│ -5.7142857142 │
└──────────────────────────────────────────────────────────────┘
```
**Difference from regular division:**
```sql
SELECT toDecimal64(-12, 1) / toDecimal32(2.1, 1);
SELECT toDecimal64(-12, 1) as a, toDecimal32(2.1, 1) as b, divideDecimal(a, b, 1), divideDecimal(a, b, 5);
```
```text
┌─divide(toDecimal64(-12, 1), toDecimal32(2.1, 1))─┐
│ -5.7 │
└──────────────────────────────────────────────────┘
┌───a─┬───b─┬─divideDecimal(toDecimal64(-12, 1), toDecimal32(2.1, 1), 1)─┬─divideDecimal(toDecimal64(-12, 1), toDecimal32(2.1, 1), 5)─┐
│ -12 │ 2.1 │ -5.7 │ -5.71428 │
└─────┴─────┴────────────────────────────────────────────────────────────┴────────────────────────────────────────────────────────────┘
```
```sql
SELECT toDecimal64(-12, 0) / toDecimal32(2.1, 1);
SELECT toDecimal64(-12, 0) as a, toDecimal32(2.1, 1) as b, divideDecimal(a, b, 1), divideDecimal(a, b, 5);
```
```text
DB::Exception: Decimal result's scale is less than argument's one: While processing toDecimal64(-12, 0) / toDecimal32(2.1, 1). (ARGUMENT_OUT_OF_BOUND)
┌───a─┬───b─┬─divideDecimal(toDecimal64(-12, 0), toDecimal32(2.1, 1), 1)─┬─divideDecimal(toDecimal64(-12, 0), toDecimal32(2.1, 1), 5)─┐
│ -12 │ 2.1 │ -5.7 │ -5.71428 │
└─────┴─────┴────────────────────────────────────────────────────────────┴────────────────────────────────────────────────────────────┘
```


@ -159,3 +159,150 @@ SELECT min2(-1, 2);
└─────────────┘
```
## multiplyDecimal(a, b[, result_scale])
Multiplies two Decimal values. The result has type [Decimal256](../../sql-reference/data-types/decimal.md).
The scale (number of fractional digits) of the result can be set explicitly with the `result_scale` argument (an integer constant in the range `[0, 76]`).
If this argument is not given, the result scale equals the larger of the scales of the two arguments.
**Syntax**
```sql
multiplyDecimal(a, b[, result_scale])
```
:::note
This function is much slower than regular `multiply`.
If you do not need fixed precision or need fast computation, use [multiply](#multiply) instead.
:::
**Arguments**
- `a` — First value: [Decimal](../../sql-reference/data-types/decimal.md).
- `b` — Second value: [Decimal](../../sql-reference/data-types/decimal.md).
- `result_scale` — Scale of the result: [Int/UInt](../../sql-reference/data-types/int-uint.md).
**Returned value**
- The result of multiplication with the given scale.
Type: [Decimal256](../../sql-reference/data-types/decimal.md).
**Examples**
```sql
SELECT multiplyDecimal(toDecimal256(-12, 0), toDecimal32(-2.1, 1), 1);
```
```text
┌─multiplyDecimal(toDecimal256(-12, 0), toDecimal32(-2.1, 1), 1)─┐
│ 25.2 │
└────────────────────────────────────────────────────────────────┘
```
**Difference from standard functions**
```sql
SELECT toDecimal64(-12.647, 3) * toDecimal32(2.1239, 4);
SELECT toDecimal64(-12.647, 3) as a, toDecimal32(2.1239, 4) as b, multiplyDecimal(a, b);
```
```text
┌─multiply(toDecimal64(-12.647, 3), toDecimal32(2.1239, 4))─┐
│ -26.8609633 │
└───────────────────────────────────────────────────────────┘
┌─multiplyDecimal(toDecimal64(-12.647, 3), toDecimal32(2.1239, 4))─┐
│ -26.8609 │
└──────────────────────────────────────────────────────────────────┘
```
```sql
SELECT
toDecimal64(-12.647987876, 9) AS a,
toDecimal64(123.967645643, 9) AS b,
multiplyDecimal(a, b);
SELECT
toDecimal64(-12.647987876, 9) AS a,
toDecimal64(123.967645643, 9) AS b,
a * b;
```
```text
┌─────────────a─┬─────────────b─┬─multiplyDecimal(toDecimal64(-12.647987876, 9), toDecimal64(123.967645643, 9))─┐
│ -12.647987876 │ 123.967645643 │ -1567.941279108 │
└───────────────┴───────────────┴───────────────────────────────────────────────────────────────────────────────┘
Received exception from server (version 22.11.1):
Code: 407. DB::Exception: Received from localhost:9000. DB::Exception: Decimal math overflow: While processing toDecimal64(-12.647987876, 9) AS a, toDecimal64(123.967645643, 9) AS b, a * b. (DECIMAL_OVERFLOW)
```
## divideDecimal(a, b[, result_scale])
Divides two Decimal values. The result has type [Decimal256](../../sql-reference/data-types/decimal.md).
The scale (number of fractional digits) of the result can be set explicitly with the `result_scale` argument (an integer constant in the range `[0, 76]`).
If this argument is not given, the result scale equals the larger of the scales of the two arguments.
**Syntax**
```sql
divideDecimal(a, b[, result_scale])
```
:::note
This function is much slower than regular `divide`.
If you do not need fixed precision or need fast computation, use [divide](#divide) instead.
:::
**Arguments**
- `a` — Dividend: [Decimal](../../sql-reference/data-types/decimal.md).
- `b` — Divisor: [Decimal](../../sql-reference/data-types/decimal.md).
- `result_scale` — Scale of the result: [Int/UInt](../../sql-reference/data-types/int-uint.md).
**Returned value**
- The result of division with the given scale.
Type: [Decimal256](../../sql-reference/data-types/decimal.md).
**Examples**
```sql
SELECT divideDecimal(toDecimal256(-12, 0), toDecimal32(2.1, 1), 10);
```
```text
┌─divideDecimal(toDecimal256(-12, 0), toDecimal32(2.1, 1), 10)─┐
│ -5.7142857142 │
└──────────────────────────────────────────────────────────────┘
```
**Difference from standard functions**
```sql
SELECT toDecimal64(-12, 1) / toDecimal32(2.1, 1);
SELECT toDecimal64(-12, 1) as a, toDecimal32(2.1, 1) as b, divideDecimal(a, b, 1), divideDecimal(a, b, 5);
```
```text
┌─divide(toDecimal64(-12, 1), toDecimal32(2.1, 1))─┐
│ -5.7 │
└──────────────────────────────────────────────────┘
┌───a─┬───b─┬─divideDecimal(toDecimal64(-12, 1), toDecimal32(2.1, 1), 1)─┬─divideDecimal(toDecimal64(-12, 1), toDecimal32(2.1, 1), 5)─┐
│ -12 │ 2.1 │ -5.7 │ -5.71428 │
└─────┴─────┴────────────────────────────────────────────────────────────┴────────────────────────────────────────────────────────────┘
```
```sql
SELECT toDecimal64(-12, 0) / toDecimal32(2.1, 1);
SELECT toDecimal64(-12, 0) as a, toDecimal32(2.1, 1) as b, divideDecimal(a, b, 1), divideDecimal(a, b, 5);
```
```text
DB::Exception: Decimal result's scale is less than argument's one: While processing toDecimal64(-12, 0) / toDecimal32(2.1, 1). (ARGUMENT_OUT_OF_BOUND)
┌───a─┬───b─┬─divideDecimal(toDecimal64(-12, 0), toDecimal32(2.1, 1), 1)─┬─divideDecimal(toDecimal64(-12, 0), toDecimal32(2.1, 1), 5)─┐
│ -12 │ 2.1 │ -5.7 │ -5.71428 │
└─────┴─────┴────────────────────────────────────────────────────────────┴────────────────────────────────────────────────────────────┘
```


@ -1475,8 +1475,7 @@ try
if (settings.async_insert_threads)
global_context->setAsynchronousInsertQueue(std::make_shared<AsynchronousInsertQueue>(
global_context,
settings.async_insert_threads,
settings.async_insert_cleanup_timeout_ms));
settings.async_insert_threads));
/// Size of cache for marks (index of MergeTree family of tables).
size_t mark_cache_size = config().getUInt64("mark_cache_size", 5368709120);


@ -61,7 +61,7 @@ public:
function_node_arguments_nodes[0] = std::move(function_node_arguments_nodes[1]);
function_node_arguments_nodes.resize(1);
resolveAggregateFunctionNode(*function_node, "countIf");
resolveAsCountIfAggregateFunction(*function_node, function_node_arguments_nodes[0]->getResultType());
return;
}
@ -102,15 +102,16 @@ public:
function_node_arguments_nodes[0] = std::move(nested_if_function_arguments_nodes[0]);
function_node_arguments_nodes.resize(1);
resolveAggregateFunctionNode(*function_node, "countIf");
resolveAsCountIfAggregateFunction(*function_node, function_node_arguments_nodes[0]->getResultType());
return;
}
/// Rewrite `sum(if(cond, 0, 1))` into `countIf(not(cond))`.
if (if_true_condition_value == 0 && if_false_condition_value == 1)
{
auto condition_result_type = nested_if_function_arguments_nodes[0]->getResultType();
DataTypePtr not_function_result_type = std::make_shared<DataTypeUInt8>();
const auto & condition_result_type = nested_if_function_arguments_nodes[0]->getResultType();
if (condition_result_type->isNullable())
not_function_result_type = makeNullable(not_function_result_type);
@ -123,23 +124,21 @@ public:
function_node_arguments_nodes[0] = std::move(not_function);
function_node_arguments_nodes.resize(1);
resolveAggregateFunctionNode(*function_node, "countIf");
resolveAsCountIfAggregateFunction(*function_node, function_node_arguments_nodes[0]->getResultType());
return;
}
}
private:
static inline void resolveAggregateFunctionNode(FunctionNode & function_node, const String & aggregate_function_name)
static inline void resolveAsCountIfAggregateFunction(FunctionNode & function_node, const DataTypePtr & argument_type)
{
auto function_result_type = function_node.getResultType();
auto function_aggregate_function = function_node.getAggregateFunction();
AggregateFunctionProperties properties;
auto aggregate_function = AggregateFunctionFactory::instance().get(aggregate_function_name,
function_aggregate_function->getArgumentTypes(),
function_aggregate_function->getParameters(),
auto aggregate_function = AggregateFunctionFactory::instance().get("countIf",
{argument_type},
function_node.getAggregateFunction()->getParameters(),
properties);
auto function_result_type = function_node.getResultType();
function_node.resolveAsAggregateFunction(std::move(aggregate_function), std::move(function_result_type));
}


@ -258,7 +258,7 @@
M(250, NOT_ENOUGH_BLOCK_NUMBERS) \
M(251, NO_SUCH_REPLICA) \
M(252, TOO_MANY_PARTS) \
M(253, REPLICA_IS_ALREADY_EXIST) \
M(253, REPLICA_ALREADY_EXISTS) \
M(254, NO_ACTIVE_REPLICAS) \
M(255, TOO_MANY_RETRIES_TO_FETCH_PARTS) \
M(256, PARTITION_ALREADY_EXISTS) \


@ -604,7 +604,6 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(Seconds, wait_for_async_insert_timeout, DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC, "Timeout for waiting for processing asynchronous insertion", 0) \
M(UInt64, async_insert_max_data_size, 1000000, "Maximum size in bytes of unparsed data collected per query before being inserted", 0) \
M(Milliseconds, async_insert_busy_timeout_ms, 200, "Maximum time to wait before dumping collected data per query since the first data appeared", 0) \
M(Milliseconds, async_insert_cleanup_timeout_ms, 1000, "Time to wait before each iteration of cleaning up buffers for INSERT queries which don't appear anymore. Only has meaning at server startup.", 0) \
\
M(UInt64, remote_fs_read_max_backoff_ms, 10000, "Max wait time when trying to read data for remote disk", 0) \
M(UInt64, remote_fs_read_backoff_max_tries, 5, "Max attempts to read with backoff", 0) \
@ -705,6 +704,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
MAKE_OBSOLETE(M, DefaultDatabaseEngine, default_database_engine, DefaultDatabaseEngine::Atomic) \
MAKE_OBSOLETE(M, UInt64, max_pipeline_depth, 0) \
MAKE_OBSOLETE(M, Seconds, temporary_live_view_timeout, 1) \
MAKE_OBSOLETE(M, Milliseconds, async_insert_cleanup_timeout_ms, 1000) \
/** The section above is for obsolete settings. Do not add anything there. */


@ -39,7 +39,7 @@ namespace ErrorCodes
extern const int NO_ZOOKEEPER;
extern const int LOGICAL_ERROR;
extern const int BAD_ARGUMENTS;
extern const int REPLICA_IS_ALREADY_EXIST;
extern const int REPLICA_ALREADY_EXISTS;
extern const int DATABASE_REPLICATION_FAILED;
extern const int UNKNOWN_DATABASE;
extern const int UNKNOWN_TABLE;
@ -297,7 +297,7 @@ void DatabaseReplicated::tryConnectToZooKeeperAndInitDatabase(LoadingStrictnessL
if (is_create_query || replica_host_id != host_id)
{
throw Exception(
ErrorCodes::REPLICA_IS_ALREADY_EXIST,
ErrorCodes::REPLICA_ALREADY_EXISTS,
"Replica {} of shard {} of replicated database at {} already exists. Replica host ID: '{}', current host ID: '{}'",
replica_name, shard_name, zookeeper_path, replica_host_id, host_id);
}


@ -29,9 +29,9 @@ list (APPEND PRIVATE_LIBS
ch_contrib::zlib
boost::filesystem
divide_impl
ch_contrib::xxHash
)
if (TARGET ch_rust::blake3)
list (APPEND PUBLIC_LIBS
ch_rust::blake3
@ -66,8 +66,6 @@ if (TARGET ch_contrib::base64)
list (APPEND PRIVATE_LIBS ch_contrib::base64)
endif()
list (APPEND PRIVATE_LIBS ch_contrib::lz4)
if (ENABLE_NLP)
list (APPEND PRIVATE_LIBS ch_contrib::cld2)
endif()


@ -2297,6 +2297,10 @@ struct ToStringMonotonicity
if (const auto * low_cardinality_type = checkAndGetDataType<DataTypeLowCardinality>(type_ptr))
type_ptr = low_cardinality_type->getDictionaryType().get();
/// Order on enum values (which is the order on the underlying integers) is completely arbitrary with respect to the order on strings.
if (WhichDataType(type).isEnum())
return not_monotonic;
/// The `toString` function is monotonic if the argument is Date, Date32, DateTime, or String, or a non-negative number with the same number of digits.
if (checkDataTypes<DataTypeDate, DataTypeDate32, DataTypeDateTime, DataTypeString>(type_ptr))
return positive;


@ -0,0 +1,17 @@
#include <Functions/FunctionsDecimalArithmetics.h>
#include <Functions/FunctionFactory.h>
namespace DB
{
REGISTER_FUNCTION(DivideDecimals)
{
factory.registerFunction<FunctionsDecimalArithmetics<DivideDecimalsImpl>>(Documentation(
"Decimal division with given precision. Slower than simple `divide`, but has controlled precision and no sound overflows"));
}
REGISTER_FUNCTION(MultiplyDecimals)
{
factory.registerFunction<FunctionsDecimalArithmetics<MultiplyDecimalsImpl>>(Documentation(
"Decimal multiplication with given precision. Slower than simple `divide`, but has controlled precision and no sound overflows"));
}
}


@ -0,0 +1,457 @@
#pragma once
#include <type_traits>
#include <Core/AccurateComparison.h>
#include <DataTypes/DataTypesDecimal.h>
#include <Columns/ColumnsNumber.h>
#include <Functions/IFunction.h>
#include <Functions/FunctionHelpers.h>
#include <Functions/castTypeToEither.h>
#include <IO/WriteHelpers.h>
#include <Common/logger_useful.h>
#include <Poco/Logger.h>
#include <Loggers/Loggers.h>
namespace DB
{
namespace ErrorCodes
{
extern const int DECIMAL_OVERFLOW;
extern const int ILLEGAL_COLUMN;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int ILLEGAL_DIVISION;
}
struct DecimalOpHelpers
{
/* These functions implement the main arithmetic logic.
* Since intermediate results may not fit into Decimal256 (e.g. 1e36 with scale 10),
* we cannot operate on Decimals directly. The big intermediate number may be shrunk later
* (e.g. when the result scale is 0, as in the case above).
* That's why intermediate results are stored in a flexible, extendable storage (here std::vector)
* and the numbers are processed with simple digit-by-digit arithmetic,
* e.g. 12 * 34 becomes {1,2} x {3,4} -> {4,0,8}, i.e. 408.
* This is the reason these functions are slower than the traditional ones.
*
* Here and below UInt8 is used for storing digits (the 0-9 range plus a maximum carry of 9 certainly fits).
*/
static std::vector<UInt8> multiply(const std::vector<UInt8> & num1, const std::vector<UInt8> & num2)
{
UInt16 const len1 = num1.size();
UInt16 const len2 = num2.size();
if (len1 == 0 || len2 == 0)
return {0};
std::vector<UInt8> result(len1 + len2, 0);
UInt16 i_n1 = 0;
UInt16 i_n2;
for (Int32 i = len1 - 1; i >= 0; --i)
{
UInt16 carry = 0;
i_n2 = 0;
for (Int32 j = len2 - 1; j >= 0; --j)
{
if (unlikely(i_n1 + i_n2 >= len1 + len2))
throw DB::Exception("Numeric overflow: result bigger that Decimal256", ErrorCodes::DECIMAL_OVERFLOW);
UInt16 sum = num1[i] * num2[j] + result[i_n1 + i_n2] + carry;
carry = sum / 10;
result[i_n1 + i_n2] = sum % 10;
++i_n2;
}
if (carry > 0)
{
if (unlikely(i_n1 + i_n2 >= len1 + len2))
throw DB::Exception("Numeric overflow: result bigger that Decimal256", ErrorCodes::DECIMAL_OVERFLOW);
result[i_n1 + i_n2] += carry;
}
++i_n1;
}
// The maximum Int32 value exceeds 2 billion, so it can safely hold the array length.
Int32 i = static_cast<Int32>(result.size() - 1);
while (i >= 0 && result[i] == 0)
{
result.pop_back();
--i;
}
if (i == -1)
return {0};
std::reverse(result.begin(), result.end());
return result;
}
static std::vector<UInt8> divide(const std::vector<UInt8> & number, const Int256 & divisor)
{
std::vector<UInt8> result;
const auto max_index = number.size() - 1;
UInt16 idx = 0;
Int256 temp = 0;
while (temp < divisor && max_index > idx)
{
temp = temp * 10 + number[idx];
++idx;
}
if (unlikely(temp == 0))
return {0};
while (max_index >= idx)
{
result.push_back(temp / divisor);
temp = (temp % divisor) * 10 + number[idx];
++idx;
}
result.push_back(temp / divisor);
return result;
}
static std::vector<UInt8> toDigits(Int256 x)
{
std::vector<UInt8> result;
if (x >= 10)
result = toDigits(x / 10);
result.push_back(x % 10);
return result;
}
static UInt256 fromDigits(const std::vector<UInt8> & digits)
{
Int256 result = 0;
Int256 scale = 0;
for (auto i = digits.rbegin(); i != digits.rend(); ++i)
{
result += DecimalUtils::scaleMultiplier<Decimal256>(scale) * (*i);
++scale;
}
return result;
}
};
struct DivideDecimalsImpl
{
static constexpr auto name = "divideDecimal";
template <typename FirstType, typename SecondType>
static inline Decimal256
execute(FirstType a, SecondType b, UInt16 scale_a, UInt16 scale_b, UInt16 result_scale)
{
if (b.value == 0)
throw DB::Exception("Division by zero", ErrorCodes::ILLEGAL_DIVISION);
if (a.value == 0)
return Decimal256(0);
Int256 sign_a = a.value < 0 ? -1 : 1;
Int256 sign_b = b.value < 0 ? -1 : 1;
std::vector<UInt8> a_digits = DecimalOpHelpers::toDigits(a.value * sign_a);
while (scale_a < scale_b + result_scale)
{
a_digits.push_back(0);
++scale_a;
}
while (scale_a > scale_b + result_scale && !a_digits.empty())
{
a_digits.pop_back();
--scale_a;
}
if (a_digits.empty())
return Decimal256(0);
std::vector<UInt8> divided = DecimalOpHelpers::divide(a_digits, b.value * sign_b);
if (divided.size() > DecimalUtils::max_precision<Decimal256>)
throw DB::Exception("Numeric overflow: result bigger that Decimal256", ErrorCodes::DECIMAL_OVERFLOW);
return Decimal256(sign_a * sign_b * DecimalOpHelpers::fromDigits(divided));
}
};
struct MultiplyDecimalsImpl
{
static constexpr auto name = "multiplyDecimal";
template <typename FirstType, typename SecondType>
static inline Decimal256
execute(FirstType a, SecondType b, UInt16 scale_a, UInt16 scale_b, UInt16 result_scale)
{
if (a.value == 0 || b.value == 0)
return Decimal256(0);
Int256 sign_a = a.value < 0 ? -1 : 1;
Int256 sign_b = b.value < 0 ? -1 : 1;
std::vector<UInt8> a_digits = DecimalOpHelpers::toDigits(a.value * sign_a);
std::vector<UInt8> b_digits = DecimalOpHelpers::toDigits(b.value * sign_b);
std::vector<UInt8> multiplied = DecimalOpHelpers::multiply(a_digits, b_digits);
UInt16 product_scale = scale_a + scale_b;
while (product_scale < result_scale)
{
multiplied.push_back(0);
++product_scale;
}
while (product_scale > result_scale && !multiplied.empty())
{
multiplied.pop_back();
--product_scale;
}
if (multiplied.empty())
return Decimal256(0);
if (multiplied.size() > DecimalUtils::max_precision<Decimal256>)
throw DB::Exception("Numeric overflow: result bigger that Decimal256", ErrorCodes::DECIMAL_OVERFLOW);
return Decimal256(sign_a * sign_b * DecimalOpHelpers::fromDigits(multiplied));
}
};
template <typename ResultType, typename Transform>
struct Processor
{
const Transform transform;
explicit Processor(Transform transform_)
: transform(std::move(transform_))
{}
template <typename FirstArgVectorType, typename SecondArgType>
void NO_INLINE
vectorConstant(const FirstArgVectorType & vec_first, const SecondArgType second_value,
PaddedPODArray<typename ResultType::FieldType> & vec_to, UInt16 scale_a, UInt16 scale_b, UInt16 result_scale) const
{
size_t size = vec_first.size();
vec_to.resize(size);
for (size_t i = 0; i < size; ++i)
vec_to[i] = transform.execute(vec_first[i], second_value, scale_a, scale_b, result_scale);
}
template <typename FirstArgVectorType, typename SecondArgVectorType>
void NO_INLINE
vectorVector(const FirstArgVectorType & vec_first, const SecondArgVectorType & vec_second,
PaddedPODArray<typename ResultType::FieldType> & vec_to, UInt16 scale_a, UInt16 scale_b, UInt16 result_scale) const
{
size_t size = vec_first.size();
vec_to.resize(size);
for (size_t i = 0; i < size; ++i)
vec_to[i] = transform.execute(vec_first[i], vec_second[i], scale_a, scale_b, result_scale);
}
template <typename FirstArgType, typename SecondArgVectorType>
void NO_INLINE
constantVector(const FirstArgType & first_value, const SecondArgVectorType & vec_second,
PaddedPODArray<typename ResultType::FieldType> & vec_to, UInt16 scale_a, UInt16 scale_b, UInt16 result_scale) const
{
size_t size = vec_second.size();
vec_to.resize(size);
for (size_t i = 0; i < size; ++i)
vec_to[i] = transform.execute(first_value, vec_second[i], scale_a, scale_b, result_scale);
}
};
template <typename FirstArgType, typename SecondArgType, typename ResultType, typename Transform>
struct DecimalArithmeticsImpl
{
static ColumnPtr execute(Transform transform, const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type)
{
using FirstArgValueType = typename FirstArgType::FieldType;
using FirstArgColumnType = typename FirstArgType::ColumnType;
using SecondArgValueType = typename SecondArgType::FieldType;
using SecondArgColumnType = typename SecondArgType::ColumnType;
using ResultColumnType = typename ResultType::ColumnType;
UInt16 scale_a = getDecimalScale(*arguments[0].type);
UInt16 scale_b = getDecimalScale(*arguments[1].type);
UInt16 result_scale = getDecimalScale(*result_type->getPtr());
auto op = Processor<ResultType, Transform>{std::move(transform)};
auto result_col = result_type->createColumn();
auto col_to = assert_cast<ResultColumnType *>(result_col.get());
const auto * first_col = checkAndGetColumn<FirstArgColumnType>(arguments[0].column.get());
const auto * second_col = checkAndGetColumn<SecondArgColumnType>(arguments[1].column.get());
const auto * first_col_const = typeid_cast<const ColumnConst *>(arguments[0].column.get());
const auto * second_col_const = typeid_cast<const ColumnConst *>(arguments[1].column.get());
if (first_col)
{
if (second_col_const)
op.vectorConstant(first_col->getData(), second_col_const->template getValue<SecondArgValueType>(), col_to->getData(), scale_a, scale_b, result_scale);
else
op.vectorVector(first_col->getData(), second_col->getData(), col_to->getData(), scale_a, scale_b, result_scale);
}
else if (first_col_const)
{
op.constantVector(first_col_const->template getValue<FirstArgValueType>(), second_col->getData(), col_to->getData(), scale_a, scale_b, result_scale);
}
else
{
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Illegal column {} of first argument of function {}",
arguments[0].column->getName(), Transform::name);
}
return result_col;
}
};
template <typename Transform>
class FunctionsDecimalArithmetics : public IFunction
{
public:
static constexpr auto name = Transform::name;
static FunctionPtr create(ContextPtr) { return std::make_shared<FunctionsDecimalArithmetics>(); }
String getName() const override
{
return name;
}
bool isVariadic() const override { return true; }
size_t getNumberOfArguments() const override { return 0; }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return false; }
DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override
{
if (arguments.size() != 2 && arguments.size() != 3)
throw Exception("Number of arguments for function " + getName() + " does not match: 2 or 3 expected",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
if (!isDecimal(arguments[0].type) || !isDecimal(arguments[1].type))
throw Exception("Arguments for " + getName() + " function must be Decimal", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
UInt8 scale = std::max(getDecimalScale(*arguments[0].type->getPtr()), getDecimalScale(*arguments[1].type->getPtr()));
if (arguments.size() == 3)
{
WhichDataType which_scale(arguments[2].type.get());
if (!which_scale.isUInt8())
throw Exception(
"Illegal type " + arguments[2].type->getName() + " of third argument of function " + getName()
+ ". Should be constant UInt8 from range[0, 76]",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
const ColumnConst * scale_column = checkAndGetColumnConst<ColumnUInt8>(arguments[2].column.get());
if (!scale_column)
throw Exception(
"Illegal column of third argument of function " + getName() + ". Should be constant UInt8",
ErrorCodes::ILLEGAL_COLUMN);
scale = scale_column->getValue<UInt8>();
}
/**
At compile time the result type is unknown; only the scale (number of fractional digits) is known at runtime.
Nothing is known about the size of the whole (integer) part either.
As in plain decimal division/multiplication, we scale the result up, but here it is explicit and no downscaling is performed.
This guarantees that the result has the given scale and can later be converted MANUALLY to other decimal types.
**/
if (scale > DecimalUtils::max_precision<Decimal256>)
throw Exception("Illegal value of third argument of function " + this->getName() + ": must be integer in range [0, 76]",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
return std::make_shared<DataTypeDecimal256>(DecimalUtils::max_precision<Decimal256>, scale);
}
bool useDefaultImplementationForConstants() const override { return true; }
ColumnNumbers getArgumentsThatAreAlwaysConstant() const override { return {2}; }
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t /*input_rows_count*/) const override
{
return resolveOverload(arguments, result_type);
}
private:
// Long resolver that dispatches to the proper templated function.
ColumnPtr resolveOverload(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type) const
{
WhichDataType which_dividend(arguments[0].type.get());
WhichDataType which_divisor(arguments[1].type.get());
if (which_dividend.isDecimal32())
{
using DividendType = DataTypeDecimal32;
if (which_divisor.isDecimal32())
return DecimalArithmeticsImpl<DividendType, DataTypeDecimal32, DataTypeDecimal256, Transform>::execute(Transform{}, arguments, result_type);
else if (which_divisor.isDecimal64())
return DecimalArithmeticsImpl<DividendType, DataTypeDecimal64, DataTypeDecimal256, Transform>::execute(Transform{}, arguments, result_type);
else if (which_divisor.isDecimal128())
return DecimalArithmeticsImpl<DividendType, DataTypeDecimal128, DataTypeDecimal256, Transform>::execute(Transform{}, arguments, result_type);
else if (which_divisor.isDecimal256())
return DecimalArithmeticsImpl<DividendType, DataTypeDecimal256, DataTypeDecimal256, Transform>::execute(Transform{}, arguments, result_type);
}
else if (which_dividend.isDecimal64())
{
using DividendType = DataTypeDecimal64;
if (which_divisor.isDecimal32())
return DecimalArithmeticsImpl<DividendType, DataTypeDecimal32, DataTypeDecimal256, Transform>::execute(Transform{}, arguments, result_type);
else if (which_divisor.isDecimal64())
return DecimalArithmeticsImpl<DividendType, DataTypeDecimal64, DataTypeDecimal256, Transform>::execute(Transform{}, arguments, result_type);
else if (which_divisor.isDecimal128())
return DecimalArithmeticsImpl<DividendType, DataTypeDecimal128, DataTypeDecimal256, Transform>::execute(Transform{}, arguments, result_type);
else if (which_divisor.isDecimal256())
return DecimalArithmeticsImpl<DividendType, DataTypeDecimal256, DataTypeDecimal256, Transform>::execute(Transform{}, arguments, result_type);
}
else if (which_dividend.isDecimal128())
{
using DividendType = DataTypeDecimal128;
if (which_divisor.isDecimal32())
return DecimalArithmeticsImpl<DividendType, DataTypeDecimal32, DataTypeDecimal256, Transform>::execute(Transform{}, arguments, result_type);
else if (which_divisor.isDecimal64())
return DecimalArithmeticsImpl<DividendType, DataTypeDecimal64, DataTypeDecimal256, Transform>::execute(Transform{}, arguments, result_type);
else if (which_divisor.isDecimal128())
return DecimalArithmeticsImpl<DividendType, DataTypeDecimal128, DataTypeDecimal256, Transform>::execute(Transform{}, arguments, result_type);
else if (which_divisor.isDecimal256())
return DecimalArithmeticsImpl<DividendType, DataTypeDecimal256, DataTypeDecimal256, Transform>::execute(Transform{}, arguments, result_type);
}
else if (which_dividend.isDecimal256())
{
using DividendType = DataTypeDecimal256;
if (which_divisor.isDecimal32())
return DecimalArithmeticsImpl<DividendType, DataTypeDecimal32, DataTypeDecimal256, Transform>::execute(Transform{}, arguments, result_type);
else if (which_divisor.isDecimal64())
return DecimalArithmeticsImpl<DividendType, DataTypeDecimal64, DataTypeDecimal256, Transform>::execute(Transform{}, arguments, result_type);
else if (which_divisor.isDecimal128())
return DecimalArithmeticsImpl<DividendType, DataTypeDecimal128, DataTypeDecimal256, Transform>::execute(Transform{}, arguments, result_type);
else if (which_divisor.isDecimal256())
return DecimalArithmeticsImpl<DividendType, DataTypeDecimal256, DataTypeDecimal256, Transform>::execute(Transform{}, arguments, result_type);
}
// Unreachable: all decimal type combinations are handled above; this keeps the compiler happy.
return nullptr;
}
};
}
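As the comment in `getReturnTypeImpl` notes, the result type is always `Decimal256` with the requested scale and can be converted manually afterwards. A minimal SQL sketch of that (the query and the cast target `Decimal(18, 4)` are illustrative, not part of this change):
```sql
-- divideDecimal returns Decimal256 with the requested scale (4 here);
-- cast explicitly if a narrower decimal type is needed downstream.
SELECT CAST(divideDecimal(toDecimal64(10, 2), toDecimal64(3, 2), 4) AS Decimal(18, 4)) AS ratio;
```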


@ -39,6 +39,13 @@ REGISTER_FUNCTION(Hashing)
factory.registerFunction<FunctionXxHash32>();
factory.registerFunction<FunctionXxHash64>();
factory.registerFunction<FunctionXXH3>(
{
"Calculates value of XXH3 64-bit hash function. Refer to https://github.com/Cyan4973/xxHash for detailed documentation.",
Documentation::Examples{{"hash", "SELECT xxh3('ClickHouse')"}},
Documentation::Categories{"Hash"}
},
FunctionFactory::CaseSensitive);
factory.registerFunction<FunctionWyHash64>();


@ -3,12 +3,18 @@
#include <city.h>
#include <farmhash.h>
#include <metrohash.h>
#include <wyhash.h>
#include <MurmurHash2.h>
#include <MurmurHash3.h>
#include <wyhash.h>
#include "config.h"
#ifdef __clang__
# pragma clang diagnostic push
# pragma clang diagnostic ignored "-Wused-but-marked-unused"
#endif
#include <xxhash.h>
#if USE_BLAKE3
# include <blake3.h>
#endif
@ -17,7 +23,6 @@
#include <Common/typeid_cast.h>
#include <Common/safe_cast.h>
#include <Common/HashTable/Hash.h>
#include <xxhash.h>
#if USE_SSL
# include <openssl/md4.h>
@ -588,7 +593,7 @@ struct ImplXxHash32
static constexpr auto name = "xxHash32";
using ReturnType = UInt32;
static auto apply(const char * s, const size_t len) { return XXH32(s, len, 0); }
static auto apply(const char * s, const size_t len) { return XXH_INLINE_XXH32(s, len, 0); }
/**
* With the current implementation, results for more than one argument are
* not reproducible from outside of ClickHouse.
@ -609,7 +614,24 @@ struct ImplXxHash64
using ReturnType = UInt64;
using uint128_t = CityHash_v1_0_2::uint128;
static auto apply(const char * s, const size_t len) { return XXH64(s, len, 0); }
static auto apply(const char * s, const size_t len) { return XXH_INLINE_XXH64(s, len, 0); }
/*
With the current implementation, results for more than one argument are
not reproducible from outside of ClickHouse (see the comment on ImplXxHash32).
*/
static auto combineHashes(UInt64 h1, UInt64 h2) { return CityHash_v1_0_2::Hash128to64(uint128_t(h1, h2)); }
static constexpr bool use_int_hash_for_pods = false;
};
struct ImplXXH3
{
static constexpr auto name = "xxh3";
using ReturnType = UInt64;
using uint128_t = CityHash_v1_0_2::uint128;
static auto apply(const char * s, const size_t len) { return XXH_INLINE_XXH3_64bits(s, len); }
/*
With the current implementation, results for more than one argument are
@ -1508,7 +1530,12 @@ using FunctionHiveHash = FunctionAnyHash<HiveHashImpl>;
using FunctionXxHash32 = FunctionAnyHash<ImplXxHash32>;
using FunctionXxHash64 = FunctionAnyHash<ImplXxHash64>;
using FunctionXXH3 = FunctionAnyHash<ImplXXH3>;
using FunctionWyHash64 = FunctionAnyHash<ImplWyHash64>;
using FunctionBLAKE3 = FunctionStringHashFixedString<ImplBLAKE3>;
}
#ifdef __clang__
# pragma clang diagnostic pop
#endif
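The comments above note that a single-argument hash matches the reference xxHash implementation, while the multi-argument form combines per-argument hashes via `Hash128to64` and is ClickHouse-specific. A small illustrative query (assumed usage, not part of the diff):
```sql
-- New in this change set: xxh3 (case-sensitive), matching the registered documentation example.
SELECT xxh3('ClickHouse');
-- Single-argument xxHash64 is reproducible with the reference library;
-- the two-argument form combines the per-argument hashes and is ClickHouse-specific.
SELECT xxHash64('ClickHouse'), xxHash64('Click', 'House');
```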


@ -48,15 +48,22 @@ namespace ErrorCodes
extern const int TIMEOUT_EXCEEDED;
extern const int UNKNOWN_EXCEPTION;
extern const int UNKNOWN_FORMAT;
extern const int BAD_ARGUMENTS;
}
AsynchronousInsertQueue::InsertQuery::InsertQuery(const ASTPtr & query_, const Settings & settings_)
: query(query_->clone()), settings(settings_)
: query(query_->clone())
, query_str(queryToString(query))
, settings(settings_)
, hash(calculateHash())
{
}
AsynchronousInsertQueue::InsertQuery::InsertQuery(const InsertQuery & other)
: query(other.query->clone()), settings(other.settings)
: query(other.query->clone())
, query_str(other.query_str)
, settings(other.settings)
, hash(other.hash)
{
}
@ -66,29 +73,33 @@ AsynchronousInsertQueue::InsertQuery::operator=(const InsertQuery & other)
if (this != &other)
{
query = other.query->clone();
query_str = other.query_str;
settings = other.settings;
hash = other.hash;
}
return *this;
}
UInt64 AsynchronousInsertQueue::InsertQuery::Hash::operator()(const InsertQuery & insert_query) const
UInt128 AsynchronousInsertQueue::InsertQuery::calculateHash() const
{
SipHash hash;
insert_query.query->updateTreeHash(hash);
SipHash siphash;
query->updateTreeHash(siphash);
for (const auto & setting : insert_query.settings.allChanged())
for (const auto & setting : settings.allChanged())
{
hash.update(setting.getName());
applyVisitor(FieldVisitorHash(hash), setting.getValue());
siphash.update(setting.getName());
applyVisitor(FieldVisitorHash(siphash), setting.getValue());
}
return hash.get64();
UInt128 res;
siphash.get128(res);
return res;
}
bool AsynchronousInsertQueue::InsertQuery::operator==(const InsertQuery & other) const
{
return queryToString(query) == queryToString(other.query) && settings == other.settings;
return query_str == other.query_str && settings == other.settings;
}
AsynchronousInsertQueue::InsertData::Entry::Entry(String && bytes_, String && query_id_)
@ -100,43 +111,31 @@ AsynchronousInsertQueue::InsertData::Entry::Entry(String && bytes_, String && qu
void AsynchronousInsertQueue::InsertData::Entry::finish(std::exception_ptr exception_)
{
std::lock_guard lock(mutex);
finished = true;
if (finished.exchange(true))
return;
if (exception_)
{
promise.set_exception(exception_);
ProfileEvents::increment(ProfileEvents::FailedAsyncInsertQuery, 1);
exception = exception_;
cv.notify_all();
}
else
{
promise.set_value();
}
}
bool AsynchronousInsertQueue::InsertData::Entry::wait(const Milliseconds & timeout) const
{
std::unique_lock lock(mutex);
return cv.wait_for(lock, timeout, [&] { return finished; });
}
bool AsynchronousInsertQueue::InsertData::Entry::isFinished() const
{
std::lock_guard lock(mutex);
return finished;
}
std::exception_ptr AsynchronousInsertQueue::InsertData::Entry::getException() const
{
std::lock_guard lock(mutex);
return exception;
}
AsynchronousInsertQueue::AsynchronousInsertQueue(ContextPtr context_, size_t pool_size, Milliseconds cleanup_timeout_)
AsynchronousInsertQueue::AsynchronousInsertQueue(ContextPtr context_, size_t pool_size_)
: WithContext(context_)
, cleanup_timeout(cleanup_timeout_)
, pool_size(pool_size_)
, queue_shards(pool_size)
, pool(pool_size)
, dump_by_first_update_thread(&AsynchronousInsertQueue::busyCheck, this)
, cleanup_thread(&AsynchronousInsertQueue::cleanup, this)
{
using namespace std::chrono;
if (!pool_size)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "pool_size cannot be zero");
assert(pool_size);
for (size_t i = 0; i < pool_size; ++i)
dump_by_first_update_threads.emplace_back([this, i] { processBatchDeadlines(i); });
}
AsynchronousInsertQueue::~AsynchronousInsertQueue()
@ -144,34 +143,31 @@ AsynchronousInsertQueue::~AsynchronousInsertQueue()
/// TODO: add a setting for graceful shutdown.
LOG_TRACE(log, "Shutting down the asynchronous insertion queue");
shutdown = true;
{
std::lock_guard lock(deadline_mutex);
are_tasks_available.notify_one();
}
{
std::lock_guard lock(cleanup_mutex);
cleanup_can_run.notify_one();
}
assert(dump_by_first_update_thread.joinable());
dump_by_first_update_thread.join();
for (size_t i = 0; i < pool_size; ++i)
{
auto & shard = queue_shards[i];
assert(cleanup_thread.joinable());
cleanup_thread.join();
shard.are_tasks_available.notify_one();
assert(dump_by_first_update_threads[i].joinable());
dump_by_first_update_threads[i].join();
{
std::lock_guard lock(shard.mutex);
for (auto & [_, elem] : shard.queue)
{
for (const auto & entry : elem.data->entries)
{
entry->finish(std::make_exception_ptr(Exception(
ErrorCodes::TIMEOUT_EXCEEDED, "Wait for async insert timeout exceeded)")));
}
}
}
}
pool.wait();
std::lock_guard lock(currently_processing_mutex);
for (const auto & [_, entry] : currently_processing_queries)
{
if (!entry->isFinished())
entry->finish(std::make_exception_ptr(Exception(
ErrorCodes::TIMEOUT_EXCEEDED,
"Wait for async insert timeout exceeded)")));
}
LOG_TRACE(log, "Asynchronous insertion queue finished");
}
@ -185,7 +181,7 @@ void AsynchronousInsertQueue::scheduleDataProcessingJob(const InsertQuery & key,
});
}
void AsynchronousInsertQueue::push(ASTPtr query, ContextPtr query_context)
std::future<void> AsynchronousInsertQueue::push(ASTPtr query, ContextPtr query_context)
{
query = query->clone();
const auto & settings = query_context->getSettingsRef();
@ -214,97 +210,77 @@ void AsynchronousInsertQueue::push(ASTPtr query, ContextPtr query_context)
quota->used(QuotaType::WRITTEN_BYTES, bytes.size());
auto entry = std::make_shared<InsertData::Entry>(std::move(bytes), query_context->getCurrentQueryId());
InsertQuery key{query, settings};
InsertDataPtr data_to_process;
std::future<void> insert_future;
auto shard_num = key.hash % pool_size;
auto & shard = queue_shards[shard_num];
{
/// Firstly try to get entry from queue without exclusive lock.
std::shared_lock read_lock(rwlock);
if (auto it = queue.find(key); it != queue.end())
std::lock_guard lock(shard.mutex);
auto [it, inserted] = shard.iterators.try_emplace(key.hash);
if (inserted)
{
pushImpl(std::move(entry), it);
return;
auto now = std::chrono::steady_clock::now();
auto timeout = now + Milliseconds{key.settings.async_insert_busy_timeout_ms};
it->second = shard.queue.emplace(timeout, Container{key, std::make_unique<InsertData>()}).first;
}
auto queue_it = it->second;
auto & data = queue_it->second.data;
size_t entry_data_size = entry->bytes.size();
assert(data);
data->size_in_bytes += entry_data_size;
data->entries.emplace_back(entry);
insert_future = entry->getFuture();
LOG_TRACE(log, "Have {} pending inserts with total {} bytes of data for query '{}'",
data->entries.size(), data->size_in_bytes, key.query_str);
/// Here we check whether we hit the limit on maximum data size in the buffer.
/// And use setting from query context.
/// It works, because queries with the same set of settings are already grouped together.
if (data->size_in_bytes > key.settings.async_insert_max_data_size)
{
data_to_process = std::move(data);
shard.iterators.erase(it);
shard.queue.erase(queue_it);
}
CurrentMetrics::add(CurrentMetrics::PendingAsyncInsert);
ProfileEvents::increment(ProfileEvents::AsyncInsertQuery);
ProfileEvents::increment(ProfileEvents::AsyncInsertBytes, entry_data_size);
}
std::lock_guard write_lock(rwlock);
auto it = queue.emplace(key, std::make_shared<Container>()).first;
pushImpl(std::move(entry), it);
if (data_to_process)
scheduleDataProcessingJob(key, std::move(data_to_process), getContext());
else
shard.are_tasks_available.notify_one();
return insert_future;
}
void AsynchronousInsertQueue::pushImpl(InsertData::EntryPtr entry, QueueIterator it)
void AsynchronousInsertQueue::processBatchDeadlines(size_t shard_num)
{
auto & [data_mutex, data] = *it->second;
std::lock_guard data_lock(data_mutex);
auto & shard = queue_shards[shard_num];
if (!data)
{
auto now = std::chrono::steady_clock::now();
data = std::make_unique<InsertData>(now);
std::lock_guard lock(deadline_mutex);
deadline_queue.insert({now + Milliseconds{it->first.settings.async_insert_busy_timeout_ms}, it});
are_tasks_available.notify_one();
}
size_t entry_data_size = entry->bytes.size();
data->size += entry_data_size;
data->entries.emplace_back(entry);
{
std::lock_guard currently_processing_lock(currently_processing_mutex);
currently_processing_queries.emplace(entry->query_id, entry);
}
LOG_TRACE(log, "Have {} pending inserts with total {} bytes of data for query '{}'",
data->entries.size(), data->size, queryToString(it->first.query));
/// Here we check whether we hit the limit on maximum data size in the buffer.
/// And use setting from query context!
/// It works, because queries with the same set of settings are already grouped together.
if (data->size > it->first.settings.async_insert_max_data_size)
scheduleDataProcessingJob(it->first, std::move(data), getContext());
CurrentMetrics::add(CurrentMetrics::PendingAsyncInsert);
ProfileEvents::increment(ProfileEvents::AsyncInsertQuery);
ProfileEvents::increment(ProfileEvents::AsyncInsertBytes, entry_data_size);
}
void AsynchronousInsertQueue::waitForProcessingQuery(const String & query_id, const Milliseconds & timeout)
{
InsertData::EntryPtr entry;
{
std::lock_guard lock(currently_processing_mutex);
auto it = currently_processing_queries.find(query_id);
if (it == currently_processing_queries.end())
return;
entry = it->second;
}
bool finished = entry->wait(timeout);
if (!finished)
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Wait for async insert timeout ({} ms) exceeded)", timeout.count());
if (auto exception = entry->getException())
std::rethrow_exception(exception);
}
void AsynchronousInsertQueue::busyCheck()
{
while (!shutdown)
{
std::vector<QueueIterator> entries_to_flush;
std::vector<Container> entries_to_flush;
{
std::unique_lock deadline_lock(deadline_mutex);
are_tasks_available.wait_for(deadline_lock, Milliseconds(getContext()->getSettingsRef().async_insert_busy_timeout_ms), [this]()
std::unique_lock lock(shard.mutex);
shard.are_tasks_available.wait_for(lock,
Milliseconds(getContext()->getSettingsRef().async_insert_busy_timeout_ms), [&shard, this]
{
if (shutdown)
return true;
if (!deadline_queue.empty() && deadline_queue.begin()->first < std::chrono::steady_clock::now())
if (!shard.queue.empty() && shard.queue.begin()->first < std::chrono::steady_clock::now())
return true;
return false;
@ -317,91 +293,22 @@ void AsynchronousInsertQueue::busyCheck()
while (true)
{
if (deadline_queue.empty() || deadline_queue.begin()->first > now)
if (shard.queue.empty() || shard.queue.begin()->first > now)
break;
entries_to_flush.emplace_back(deadline_queue.begin()->second);
deadline_queue.erase(deadline_queue.begin());
auto it = shard.queue.begin();
shard.iterators.erase(it->second.key.hash);
entries_to_flush.emplace_back(std::move(it->second));
shard.queue.erase(it);
}
}
std::shared_lock read_lock(rwlock);
for (auto & entry : entries_to_flush)
{
auto & [key, elem] = *entry;
std::lock_guard data_lock(elem->mutex);
if (!elem->data)
continue;
scheduleDataProcessingJob(key, std::move(elem->data), getContext());
}
scheduleDataProcessingJob(entry.key, std::move(entry.data), getContext());
}
}
void AsynchronousInsertQueue::cleanup()
{
while (true)
{
{
std::unique_lock cleanup_lock(cleanup_mutex);
cleanup_can_run.wait_for(cleanup_lock, Milliseconds(cleanup_timeout), [this]() -> bool { return shutdown; });
if (shutdown)
return;
}
std::vector<InsertQuery> keys_to_remove;
{
std::shared_lock read_lock(rwlock);
for (auto & [key, elem] : queue)
{
std::lock_guard data_lock(elem->mutex);
if (!elem->data)
keys_to_remove.push_back(key);
}
}
if (!keys_to_remove.empty())
{
std::lock_guard write_lock(rwlock);
size_t total_removed = 0;
for (const auto & key : keys_to_remove)
{
auto it = queue.find(key);
if (it != queue.end() && !it->second->data)
{
queue.erase(it);
++total_removed;
}
}
if (total_removed)
LOG_TRACE(log, "Removed stale entries for {} queries from asynchronous insertion queue", total_removed);
}
{
std::vector<String> ids_to_remove;
std::lock_guard lock(currently_processing_mutex);
for (const auto & [query_id, entry] : currently_processing_queries)
if (entry->isFinished())
ids_to_remove.push_back(query_id);
if (!ids_to_remove.empty())
{
for (const auto & id : ids_to_remove)
currently_processing_queries.erase(id);
LOG_TRACE(log, "Removed {} finished entries from asynchronous insertion queue", ids_to_remove.size());
}
}
}
}
static void appendElementsToLogSafe(
AsynchronousInsertLog & log,
std::vector<AsynchronousInsertLogElement> elements,
@ -464,7 +371,7 @@ try
{
current_exception = e.displayText();
LOG_ERROR(log, "Failed parsing for query '{}' with query id {}. {}",
queryToString(key.query), current_entry->query_id, current_exception);
key.query_str, current_entry->query_id, current_exception);
for (const auto & column : result_columns)
if (column->size() > total_rows)
@ -546,7 +453,7 @@ try
completed_executor.execute();
LOG_INFO(log, "Flushed {} rows, {} bytes for query '{}'",
total_rows, total_bytes, queryToString(key.query));
total_rows, total_bytes, key.query_str);
}
catch (...)
{


@ -4,10 +4,7 @@
#include <Common/ThreadPool.h>
#include <Core/Settings.h>
#include <Poco/Logger.h>
#include <atomic>
#include <unordered_map>
#include <future>
namespace DB
{
@ -19,25 +16,29 @@ class AsynchronousInsertQueue : public WithContext
public:
using Milliseconds = std::chrono::milliseconds;
AsynchronousInsertQueue(ContextPtr context_, size_t pool_size, Milliseconds cleanup_timeout);
AsynchronousInsertQueue(ContextPtr context_, size_t pool_size_);
~AsynchronousInsertQueue();
void push(ASTPtr query, ContextPtr query_context);
void waitForProcessingQuery(const String & query_id, const Milliseconds & timeout);
std::future<void> push(ASTPtr query, ContextPtr query_context);
size_t getPoolSize() const { return pool_size; }
private:
struct InsertQuery
{
public:
ASTPtr query;
String query_str;
Settings settings;
UInt128 hash;
InsertQuery(const ASTPtr & query_, const Settings & settings_);
InsertQuery(const InsertQuery & other);
InsertQuery & operator=(const InsertQuery & other);
bool operator==(const InsertQuery & other) const;
struct Hash { UInt64 operator()(const InsertQuery & insert_query) const; };
private:
UInt128 calculateHash() const;
};
struct InsertData
@ -47,109 +48,84 @@ private:
public:
const String bytes;
const String query_id;
std::chrono::time_point<std::chrono::system_clock> create_time;
const std::chrono::time_point<std::chrono::system_clock> create_time;
Entry(String && bytes_, String && query_id_);
void finish(std::exception_ptr exception_ = nullptr);
bool wait(const Milliseconds & timeout) const;
bool isFinished() const;
std::exception_ptr getException() const;
std::future<void> getFuture() { return promise.get_future(); }
bool isFinished() const { return finished; }
private:
mutable std::mutex mutex;
mutable std::condition_variable cv;
bool finished = false;
std::exception_ptr exception;
std::promise<void> promise;
std::atomic_bool finished = false;
};
explicit InsertData(std::chrono::steady_clock::time_point now)
: first_update(now)
{}
using EntryPtr = std::shared_ptr<Entry>;
std::list<EntryPtr> entries;
size_t size = 0;
/// Timestamp of the first insert into queue, or after the last queue dump.
/// Used to detect for how long the queue is active, so we can dump it by timer.
std::chrono::time_point<std::chrono::steady_clock> first_update;
size_t size_in_bytes = 0;
};
using InsertDataPtr = std::unique_ptr<InsertData>;
/// A separate container, that holds a data and a mutex for it.
/// When it's needed to process current chunk of data, it can be moved for processing
/// and new data can be recreated without holding a lock during processing.
struct Container
{
std::mutex mutex;
InsertQuery key;
InsertDataPtr data;
};
using Queue = std::unordered_map<InsertQuery, std::shared_ptr<Container>, InsertQuery::Hash>;
using QueueIterator = Queue::iterator;
/// Ordered container
using DeadlineQueue = std::map<std::chrono::steady_clock::time_point, QueueIterator>;
/// Key is a timestamp of the first insert into batch.
/// Used to detect for how long the batch is active, so we can dump it by timer.
using Queue = std::map<std::chrono::steady_clock::time_point, Container>;
using QueueIterator = Queue::iterator;
using QueueIteratorByKey = std::unordered_map<UInt128, QueueIterator>;
struct QueueShard
{
mutable std::mutex mutex;
mutable std::condition_variable are_tasks_available;
mutable std::shared_mutex rwlock;
Queue queue;
Queue queue;
QueueIteratorByKey iterators;
};
/// This is needed only for using inside cleanup() function and correct signaling about shutdown
mutable std::mutex cleanup_mutex;
mutable std::condition_variable cleanup_can_run;
mutable std::mutex deadline_mutex;
mutable std::condition_variable are_tasks_available;
DeadlineQueue deadline_queue;
using QueryIdToEntry = std::unordered_map<String, InsertData::EntryPtr>;
mutable std::mutex currently_processing_mutex;
QueryIdToEntry currently_processing_queries;
const size_t pool_size;
std::vector<QueueShard> queue_shards;
/// The logic and events behind the queue are as follows:
/// - busy_timeout: if the queue has been active for too long and there are a lot of rapid inserts, then we dump the data, so it doesn't
/// grow for a long period of time and users will be able to select new data in a deterministic manner.
/// - stale_timeout: if the queue has been stale for too long, then we dump the data too, so that users will be able to select the last
/// piece of inserted data.
/// - async_insert_busy_timeout_ms:
/// if the queue has been active for too long and there are a lot of rapid inserts, then we dump the data, so it doesn't
/// grow for a long period of time and users will be able to select new data in a deterministic manner.
///
/// While processing incoming INSERT queries we also check whether the maximum size of data in the buffer is reached (async_insert_max_data_size setting).
/// If so, we dump the data as well.
const Milliseconds cleanup_timeout;
/// While processing incoming INSERT queries we also check whether the maximum size of data in the buffer is reached
/// (async_insert_max_data_size setting). If so, we dump the data as well.
std::atomic<bool> shutdown{false};
ThreadPool pool; /// dump the data only inside this pool.
ThreadFromGlobalPool dump_by_first_update_thread; /// uses busy_timeout and busyCheck()
ThreadFromGlobalPool cleanup_thread; /// uses busy_timeout and cleanup()
/// Dump the data only inside this pool.
ThreadPool pool;
/// Uses async_insert_busy_timeout_ms and processBatchDeadlines()
std::vector<ThreadFromGlobalPool> dump_by_first_update_threads;
Poco::Logger * log = &Poco::Logger::get("AsynchronousInsertQueue");
void busyCheck();
void cleanup();
/// Should be called with shared or exclusively locked 'rwlock'.
void pushImpl(InsertData::EntryPtr entry, QueueIterator it);
void processBatchDeadlines(size_t shard_num);
void scheduleDataProcessingJob(const InsertQuery & key, InsertDataPtr data, ContextPtr global_context);
static void processData(InsertQuery key, InsertDataPtr data, ContextPtr global_context);
template <typename E>
static void finishWithException(const ASTPtr & query, const std::list<InsertData::EntryPtr> & entries, const E & exception);
/// @param timeout - time to wait
/// @return true if shutdown requested
bool waitForShutdown(const Milliseconds & timeout);
public:
auto getQueueLocked() const
auto getQueueLocked(size_t shard_num) const
{
std::shared_lock lock(rwlock);
return std::make_pair(std::ref(queue), std::move(lock));
auto & shard = queue_shards[shard_num];
std::unique_lock lock(shard.mutex);
return std::make_pair(std::ref(shard.queue), std::move(lock));
}
};
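The members above sketch the new flushing architecture: inserts land in one of pool_size queue shards (presumably selected by the insert-query hash), each shard keeps batches keyed by the timestamp of their first insert, and a deadline queue plus the are_tasks_available condition variable drive timed flushes. A minimal standalone sketch of such a deadline-driven flush loop in plain C++17, with hypothetical names; the real processBatchDeadlines implementation is not part of this diff:

#include <chrono>
#include <condition_variable>
#include <iostream>
#include <map>
#include <mutex>
#include <string>
#include <thread>

using Clock = std::chrono::steady_clock;

// Hypothetical stand-in for one queue shard's flush machinery.
struct DeadlineFlusher
{
    std::mutex mutex;
    std::condition_variable cv;                         // plays the role of are_tasks_available
    std::map<Clock::time_point, std::string> deadlines; // deadline -> batch key (illustrative)
    bool shutdown = false;

    // Producer side: register a batch that must be flushed no later than `deadline`.
    void schedule(Clock::time_point deadline, std::string key)
    {
        {
            std::lock_guard lock(mutex);
            deadlines.emplace(deadline, std::move(key));
        }
        cv.notify_one();
    }

    // Flush loop: sleep until the earliest deadline, wake early if an earlier deadline arrives or shutdown is requested.
    void run()
    {
        std::unique_lock lock(mutex);
        while (!shutdown)
        {
            if (deadlines.empty())
            {
                cv.wait(lock, [&] { return shutdown || !deadlines.empty(); });
                continue;
            }
            auto next = deadlines.begin()->first;
            cv.wait_until(lock, next, [&] { return shutdown || deadlines.begin()->first < next; });
            if (shutdown)
                break;
            // Flush every batch whose deadline has passed; this stands in for scheduleDataProcessingJob().
            while (!deadlines.empty() && deadlines.begin()->first <= Clock::now())
            {
                auto batch = std::move(deadlines.begin()->second);
                deadlines.erase(deadlines.begin());
                lock.unlock();
                std::cout << "flushing " << batch << '\n';
                lock.lock();
            }
        }
    }
};

int main()
{
    DeadlineFlusher flusher;
    std::thread worker([&] { flusher.run(); });
    flusher.schedule(Clock::now() + std::chrono::milliseconds(50), "shard0/INSERT INTO t ...");
    flusher.schedule(Clock::now() + std::chrono::milliseconds(20), "shard0/INSERT INTO u ...");
    std::this_thread::sleep_for(std::chrono::milliseconds(200));
    {
        std::lock_guard lock(flusher.mutex);
        flusher.shutdown = true;
    }
    flusher.cv.notify_all();
    worker.join();
}

Keeping the deadlines in a single ordered map makes the wait target cheap to compute: the earliest deadline is always deadlines.begin().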

View File

@ -592,13 +592,12 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
quota->checkExceeded(QuotaType::ERRORS);
}
queue->push(ast, context);
auto insert_future = queue->push(ast, context);
if (settings.wait_for_async_insert)
{
auto timeout = settings.wait_for_async_insert_timeout.totalMilliseconds();
auto query_id = context->getCurrentQueryId();
auto source = std::make_shared<WaitForAsyncInsertSource>(query_id, timeout, *queue);
auto source = std::make_shared<WaitForAsyncInsertSource>(std::move(insert_future), timeout);
res.pipeline = QueryPipeline(Pipe(std::move(source)));
}

View File

@ -324,14 +324,31 @@ static ColumnPtr readOffsetsFromArrowListColumn(std::shared_ptr<arrow::ChunkedAr
ColumnArray::Offsets & offsets_data = assert_cast<ColumnVector<UInt64> &>(*offsets_column).getData();
offsets_data.reserve(arrow_column->length());
uint64_t start_offset = 0u;
for (int chunk_i = 0, num_chunks = arrow_column->num_chunks(); chunk_i < num_chunks; ++chunk_i)
{
arrow::ListArray & list_chunk = dynamic_cast<arrow::ListArray &>(*(arrow_column->chunk(chunk_i)));
auto arrow_offsets_array = list_chunk.offsets();
auto & arrow_offsets = dynamic_cast<arrow::Int32Array &>(*arrow_offsets_array);
auto start = offsets_data.back();
/*
* It seems like arrow::ListArray::values() (nested column data) might or might not be shared across chunks.
* When it is shared, the offsets will be monotonically increasing. Otherwise, the offsets will be zero based.
* In order to account for both cases, the starting offset is updated whenever a zero-based offset is found.
* More info can be found in: https://lists.apache.org/thread/rrwfb9zo2dc58dhd9rblf20xd7wmy7jm and
* https://github.com/ClickHouse/ClickHouse/pull/43297
* */
if (list_chunk.offset() == 0)
{
start_offset = offsets_data.back();
}
for (int64_t i = 1; i < arrow_offsets.length(); ++i)
offsets_data.emplace_back(start + arrow_offsets.Value(i));
{
auto offset = arrow_offsets.Value(i);
offsets_data.emplace_back(start_offset + offset);
}
}
return offsets_column;
}
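The comment block above distinguishes two offset layouts across chunks: monotonically increasing offsets when the nested values buffer is shared, and zero-based offsets when it is not. A standalone illustration of that normalization using plain vectors; no Arrow dependency, the zero_based flag is a stand-in for the arrow::ListArray::offset() == 0 check, and the chunk contents are made up:

#include <cstdint>
#include <iostream>
#include <utility>
#include <vector>

// Concatenate per-chunk list offsets into global offsets, handling both layouts:
// chunks whose offsets continue monotonically from the previous chunk, and chunks
// whose offsets restart from zero because their nested values are not shared.
std::vector<uint64_t> concatOffsets(const std::vector<std::pair<bool, std::vector<int32_t>>> & chunks)
{
    std::vector<uint64_t> offsets_data;
    uint64_t start_offset = 0;
    for (const auto & [zero_based, arrow_offsets] : chunks)
    {
        if (zero_based)
            start_offset = offsets_data.empty() ? 0 : offsets_data.back();
        for (size_t i = 1; i < arrow_offsets.size(); ++i)
            offsets_data.push_back(start_offset + static_cast<uint64_t>(arrow_offsets[i]));
    }
    return offsets_data;
}

int main()
{
    // Two chunks of 3 lists each; both restart from zero, as if their nested values were not shared.
    std::vector<std::pair<bool, std::vector<int32_t>>> chunks = {
        {true, {0, 2, 5, 6}},
        {true, {0, 1, 4, 4}},
    };
    for (auto off : concatOffsets(chunks))
        std::cout << off << ' ';   // expected: 2 5 6 7 10 10
    std::cout << '\n';
}

The key point mirrors the code above: start_offset is only rebased when a chunk's offsets restart from zero; chunks with monotonically increasing offsets keep accumulating on top of the previous total.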
@ -467,8 +484,23 @@ static std::shared_ptr<arrow::ChunkedArray> getNestedArrowColumn(std::shared_ptr
for (int chunk_i = 0, num_chunks = arrow_column->num_chunks(); chunk_i < num_chunks; ++chunk_i)
{
arrow::ListArray & list_chunk = dynamic_cast<arrow::ListArray &>(*(arrow_column->chunk(chunk_i)));
std::shared_ptr<arrow::Array> chunk = list_chunk.values();
array_vector.emplace_back(std::move(chunk));
/*
* It seems like arrow::ListArray::values() (nested column data) might or might not be shared across chunks.
* Therefore, simply appending arrow::ListArray::values() could lead to duplicated data to be appended.
* To properly handle this, arrow::ListArray::values() needs to be sliced based on the chunk offsets.
* arrow::ListArray::Flatten does that. More info on: https://lists.apache.org/thread/rrwfb9zo2dc58dhd9rblf20xd7wmy7jm and
* https://github.com/ClickHouse/ClickHouse/pull/43297
* */
auto flatten_result = list_chunk.Flatten();
if (flatten_result.ok())
{
array_vector.emplace_back(flatten_result.ValueOrDie());
}
else
{
throw Exception(ErrorCodes::INCORRECT_DATA, "Failed to flatten chunk '{}' of column of type '{}'", chunk_i, arrow_column->type()->id());
}
}
return std::make_shared<arrow::ChunkedArray>(array_vector);
}

View File

@ -6,18 +6,24 @@
namespace DB
{
namespace ErrorCodes
{
extern const int TIMEOUT_EXCEEDED;
extern const int LOGICAL_ERROR;
}
/// Source that allows waiting until the processing of
/// an asynchronous insert for the specified query_id is finished.
class WaitForAsyncInsertSource : public ISource, WithContext
{
public:
WaitForAsyncInsertSource(
const String & query_id_, size_t timeout_ms_, AsynchronousInsertQueue & queue_)
std::future<void> insert_future_, size_t timeout_ms_)
: ISource(Block())
, query_id(query_id_)
, insert_future(std::move(insert_future_))
, timeout_ms(timeout_ms_)
, queue(queue_)
{
assert(insert_future.valid());
}
String getName() const override { return "WaitForAsyncInsert"; }
@ -25,14 +31,20 @@ public:
protected:
Chunk generate() override
{
queue.waitForProcessingQuery(query_id, std::chrono::milliseconds(timeout_ms));
auto status = insert_future.wait_for(std::chrono::milliseconds(timeout_ms));
if (status == std::future_status::deferred)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: got future in deferred state");
if (status == std::future_status::timeout)
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Wait for async insert timeout ({} ms) exceeded", timeout_ms);
insert_future.get();
return Chunk();
}
private:
String query_id;
std::future<void> insert_future;
size_t timeout_ms;
AsynchronousInsertQueue & queue;
};
}
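generate() above waits on the std::future returned by push() and distinguishes deferred, timed-out, and ready states before calling get(), which rethrows any exception stored on the producer side. A self-contained sketch of the same pattern in plain C++17; not ClickHouse code, names are illustrative:

#include <chrono>
#include <future>
#include <iostream>
#include <stdexcept>
#include <thread>

// Wait for an asynchronous result with a timeout, mirroring the three-way status check above.
void waitWithTimeout(std::future<void> & f, std::chrono::milliseconds timeout)
{
    auto status = f.wait_for(timeout);
    if (status == std::future_status::deferred)
        throw std::logic_error("got future in deferred state");
    if (status == std::future_status::timeout)
        throw std::runtime_error("wait for async insert timeout exceeded");
    f.get();  // rethrows the exception set via promise.set_exception(), if any
}

int main()
{
    std::promise<void> promise;           // stands in for InsertData::Entry::promise
    auto future = promise.get_future();   // what push() would hand back to the caller

    // A worker finishes the "insert" after 100 ms; Entry::finish() would call
    // set_value() or set_exception() in the same way.
    std::thread worker([&] {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        promise.set_value();
    });

    waitWithTimeout(future, std::chrono::milliseconds(500));
    std::cout << "insert flushed\n";
    worker.join();
}

The only difference in the real source is that the error codes and exception types are ClickHouse's own.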

View File

@ -130,7 +130,7 @@ namespace ErrorCodes
extern const int NO_ZOOKEEPER;
extern const int INCORRECT_DATA;
extern const int INCOMPATIBLE_COLUMNS;
extern const int REPLICA_IS_ALREADY_EXIST;
extern const int REPLICA_ALREADY_EXISTS;
extern const int NO_REPLICA_HAS_PART;
extern const int LOGICAL_ERROR;
extern const int TOO_MANY_UNEXPECTED_DATA_PARTS;
@ -778,7 +778,7 @@ bool StorageReplicatedMergeTree::createTableIfNotExists(const StorageMetadataPtr
/// Do not use LOGICAL_ERROR code, because it may happen if user has specified wrong zookeeper_path
throw Exception("Cannot create table, because it is created concurrently every time "
"or because of wrong zookeeper_path "
"or because of logical error", ErrorCodes::REPLICA_IS_ALREADY_EXIST);
"or because of logical error", ErrorCodes::REPLICA_ALREADY_EXISTS);
}
void StorageReplicatedMergeTree::createReplica(const StorageMetadataPtr & metadata_snapshot)
@ -842,7 +842,7 @@ void StorageReplicatedMergeTree::createReplica(const StorageMetadataPtr & metada
switch (code)
{
case Coordination::Error::ZNODEEXISTS:
throw Exception(ErrorCodes::REPLICA_IS_ALREADY_EXIST, "Replica {} already exists", replica_path);
throw Exception(ErrorCodes::REPLICA_ALREADY_EXISTS, "Replica {} already exists", replica_path);
case Coordination::Error::ZBADVERSION:
LOG_ERROR(log, "Retrying createReplica(), because some other replicas were created at the same time");
break;

View File

@ -27,8 +27,6 @@ NamesAndTypesList StorageSystemAsynchronousInserts::getNamesAndTypes()
{"total_bytes", std::make_shared<DataTypeUInt64>()},
{"entries.query_id", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())},
{"entries.bytes", std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt64>())},
{"entries.finished", std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt8>())},
{"entries.exception", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())},
};
}
@ -40,78 +38,56 @@ void StorageSystemAsynchronousInserts::fillData(MutableColumns & res_columns, Co
if (!insert_queue)
return;
auto [queue, queue_lock] = insert_queue->getQueueLocked();
for (const auto & [key, elem] : queue)
for (size_t shard_num = 0; shard_num < insert_queue->getPoolSize(); ++shard_num)
{
std::lock_guard elem_lock(elem->mutex);
auto [queue, queue_lock] = insert_queue->getQueueLocked(shard_num);
if (!elem->data)
continue;
auto time_in_microseconds = [](const time_point<steady_clock> & timestamp)
for (const auto & [first_update, elem] : queue)
{
auto time_diff = duration_cast<microseconds>(steady_clock::now() - timestamp);
auto time_us = (system_clock::now() - time_diff).time_since_epoch().count();
const auto & [key, data] = elem;
DecimalUtils::DecimalComponents<DateTime64> components{time_us / 1'000'000, time_us % 1'000'000};
return DecimalField(DecimalUtils::decimalFromComponents<DateTime64>(components, TIME_SCALE), TIME_SCALE);
};
const auto & insert_query = key.query->as<const ASTInsertQuery &>();
size_t i = 0;
res_columns[i++]->insert(queryToString(insert_query));
/// If query is "INSERT INTO FUNCTION" then table_id is empty.
if (insert_query.table_id)
{
res_columns[i++]->insert(insert_query.table_id.getDatabaseName());
res_columns[i++]->insert(insert_query.table_id.getTableName());
}
else
{
res_columns[i++]->insertDefault();
res_columns[i++]->insertDefault();
}
res_columns[i++]->insert(insert_query.format);
res_columns[i++]->insert(time_in_microseconds(elem->data->first_update));
res_columns[i++]->insert(elem->data->size);
Array arr_query_id;
Array arr_bytes;
Array arr_finished;
Array arr_exception;
for (const auto & entry : elem->data->entries)
{
arr_query_id.push_back(entry->query_id);
arr_bytes.push_back(entry->bytes.size());
arr_finished.push_back(entry->isFinished());
if (auto exception = entry->getException())
auto time_in_microseconds = [](const time_point<steady_clock> & timestamp)
{
try
{
std::rethrow_exception(exception);
}
catch (const Exception & e)
{
arr_exception.push_back(e.displayText());
}
catch (...)
{
arr_exception.push_back("Unknown exception");
}
auto time_diff = duration_cast<microseconds>(steady_clock::now() - timestamp);
auto time_us = (system_clock::now() - time_diff).time_since_epoch().count();
DecimalUtils::DecimalComponents<DateTime64> components{time_us / 1'000'000, time_us % 1'000'000};
return DecimalField(DecimalUtils::decimalFromComponents<DateTime64>(components, TIME_SCALE), TIME_SCALE);
};
const auto & insert_query = key.query->as<const ASTInsertQuery &>();
size_t i = 0;
res_columns[i++]->insert(queryToString(insert_query));
/// If query is "INSERT INTO FUNCTION" then table_id is empty.
if (insert_query.table_id)
{
res_columns[i++]->insert(insert_query.table_id.getDatabaseName());
res_columns[i++]->insert(insert_query.table_id.getTableName());
}
else
arr_exception.push_back("");
}
{
res_columns[i++]->insertDefault();
res_columns[i++]->insertDefault();
}
res_columns[i++]->insert(arr_query_id);
res_columns[i++]->insert(arr_bytes);
res_columns[i++]->insert(arr_finished);
res_columns[i++]->insert(arr_exception);
res_columns[i++]->insert(insert_query.format);
res_columns[i++]->insert(time_in_microseconds(first_update));
res_columns[i++]->insert(data->size_in_bytes);
Array arr_query_id;
Array arr_bytes;
for (const auto & entry : data->entries)
{
arr_query_id.push_back(entry->query_id);
arr_bytes.push_back(entry->bytes.size());
}
res_columns[i++]->insert(arr_query_id);
res_columns[i++]->insert(arr_bytes);
}
}
}
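The time_in_microseconds lambda above maps a steady_clock timestamp (which is how first_update is recorded) onto the wall clock by subtracting the elapsed steady-clock duration from system_clock::now(), before packing the result into a DateTime64. A minimal sketch of the same conversion without the ClickHouse decimal types:

#include <chrono>
#include <cstdint>
#include <iostream>

// Map a steady_clock time_point onto the system (wall) clock by measuring how long
// ago it was and subtracting that from the current wall-clock time. The result is
// approximate: it shifts with any wall-clock adjustments made since the event.
int64_t toWallClockMicroseconds(std::chrono::steady_clock::time_point timestamp)
{
    using namespace std::chrono;
    auto time_diff = duration_cast<microseconds>(steady_clock::now() - timestamp);
    auto wall = system_clock::now() - time_diff;
    return duration_cast<microseconds>(wall.time_since_epoch()).count();
}

int main()
{
    auto first_update = std::chrono::steady_clock::now();
    // ... some time passes while entries accumulate ...
    auto us = toWallClockMicroseconds(first_update);
    std::cout << "first_update ~ " << us / 1'000'000 << " s and " << us % 1'000'000 << " us since the Unix epoch\n";
}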

View File

@ -11,6 +11,17 @@ import requests
import boto3
from botocore.exceptions import ClientError
UNIVERSAL_LABEL = "universal"
RUNNER_TYPE_LABELS = [
"builder",
"func-tester",
"func-tester-aarch64",
"fuzzer-unit-tester",
"stress-tester",
"style-checker",
"style-checker-aarch64",
]
def get_dead_runners_in_ec2(runners):
ids = {
@ -170,26 +181,23 @@ def list_runners(access_token):
def group_runners_by_tag(listed_runners):
result = {}
RUNNER_TYPE_LABELS = [
"builder",
"func-tester",
"func-tester-aarch64",
"fuzzer-unit-tester",
"stress-tester",
"style-checker",
"style-checker-aarch64",
]
def add_to_result(tag, runner):
if tag not in result:
result[tag] = []
result[tag].append(runner)
for runner in listed_runners:
if UNIVERSAL_LABEL in runner.tags:
# Do not process other labels if UNIVERSAL_LABEL is included
add_to_result(UNIVERSAL_LABEL, runner)
continue
for tag in runner.tags:
if tag in RUNNER_TYPE_LABELS:
if tag not in result:
result[tag] = []
result[tag].append(runner)
add_to_result(tag, runner)
break
else:
if "unlabeled" not in result:
result["unlabeled"] = []
result["unlabeled"].append(runner)
add_to_result("unlabeled", runner)
return result

View File

@ -0,0 +1 @@
../team_keys_lambda/build_and_deploy_archive.sh

View File

@ -441,11 +441,15 @@ def main():
result_images = {}
images_processing_result = []
additional_cache = ""
if pr_info.release_pr or pr_info.merged_pr:
additional_cache = str(pr_info.release_pr or pr_info.merged_pr)
for image in changed_images:
# If we are in a backport PR, then pr_info.release_pr is defined.
# We use it as a tag to reduce rebuilding time
images_processing_result += process_image_with_parents(
image, image_versions, pr_info.release_pr, args.push
image, image_versions, additional_cache, args.push
)
result_images[image.repo] = result_version

View File

@ -42,11 +42,13 @@ def GITHUB_JOB_ID() -> str:
if _GITHUB_JOB_ID:
return _GITHUB_JOB_ID
jobs = []
page = 1
while not _GITHUB_JOB_ID:
response = get_with_retries(
f"https://api.github.com/repos/{GITHUB_REPOSITORY}/"
f"actions/runs/{GITHUB_RUN_ID}/jobs?per_page=100"
f"actions/runs/{GITHUB_RUN_ID}/jobs?per_page=100&page={page}"
)
page += 1
data = response.json()
jobs.extend(data["jobs"])
for job in data["jobs"]:
@ -55,7 +57,10 @@ def GITHUB_JOB_ID() -> str:
_GITHUB_JOB_ID = job["id"]
_GITHUB_JOB_URL = job["html_url"]
return _GITHUB_JOB_ID
if len(jobs) == data["total_count"]:
if (
len(jobs) >= data["total_count"] # just in case of inconsistency
or len(data["jobs"]) == 0 # if we exceeded the number of pages
):
_GITHUB_JOB_ID = "0"
return _GITHUB_JOB_ID

View File

@ -1,13 +0,0 @@
FROM public.ecr.aws/lambda/python:3.9
# Install the function's dependencies using file requirements.txt
# from your project folder.
COPY requirements.txt .
RUN pip3 install -r requirements.txt --target "${LAMBDA_TASK_ROOT}"
# Copy function code
COPY app.py ${LAMBDA_TASK_ROOT}
# Set the CMD to your handler (could also be done as a parameter override outside of the Dockerfile)
CMD [ "app.handler" ]

View File

@ -64,6 +64,7 @@ def get_pr_for_commit(sha, ref):
class PRInfo:
default_event = {
"commits": 1,
"head_commit": {"message": "commit_message"},
"before": "HEAD~",
"after": "HEAD",
"ref": None,
@ -86,7 +87,9 @@ class PRInfo:
self.changed_files = set() # type: Set[str]
self.body = ""
self.diff_urls = []
# release_pr and merged_pr are used for the additional cache of docker images
self.release_pr = 0
self.merged_pr = 0
ref = github_event.get("ref", "refs/heads/master")
if ref and ref.startswith("refs/heads/"):
ref = ref[11:]
@ -158,6 +161,14 @@ class PRInfo:
self.diff_urls.append(github_event["pull_request"]["diff_url"])
elif "commits" in github_event:
# `head_commit` always comes with `commits`
commit_message = github_event["head_commit"]["message"]
if commit_message.startswith("Merge pull request #"):
merged_pr = commit_message.split(maxsplit=4)[3]
try:
self.merged_pr = int(merged_pr[1:])
except ValueError:
logging.error("Failed to convert %s to integer", merged_pr)
self.sha = github_event["after"]
pull_request = get_pr_for_commit(self.sha, github_event["ref"])
repo_prefix = f"{GITHUB_SERVER_URL}/{GITHUB_REPOSITORY}"

View File

@ -2,10 +2,13 @@
set -xeo pipefail
WORKDIR=$(dirname "$0")
WORKDIR=$(readlink -f "${WORKDIR}")
cd "$WORKDIR"
PY_EXEC=python3.9
LAMBDA_NAME=$(basename "$PWD")
PY_VERSION=3.9
PY_EXEC="python${PY_VERSION}"
DOCKER_IMAGE="python:${PY_VERSION}-slim"
LAMBDA_NAME=$(basename "$WORKDIR")
LAMBDA_NAME=${LAMBDA_NAME//_/-}
PACKAGE=lambda-package
rm -rf "$PACKAGE" "$PACKAGE".zip
@ -14,10 +17,12 @@ cp app.py "$PACKAGE"
if [ -f requirements.txt ]; then
VENV=lambda-venv
rm -rf "$VENV" lambda-package.zip
"$PY_EXEC" -m venv "$VENV"
# shellcheck disable=SC1091
source "$VENV/bin/activate"
pip install -r requirements.txt
docker run --rm --user="${UID}" --volume="${WORKDIR}:/lambda" --workdir="/lambda" "${DOCKER_IMAGE}" \
/bin/bash -c "
'$PY_EXEC' -m venv '$VENV' &&
source '$VENV/bin/activate' &&
pip install -r requirements.txt
"
cp -rT "$VENV/lib/$PY_EXEC/site-packages/" "$PACKAGE"
rm -r "$PACKAGE"/{pip,pip-*,setuptools,setuptools-*}
fi

View File

@ -46,15 +46,17 @@ curl "${TEAM_KEYS_URL}" > /home/ubuntu/.ssh/authorized_keys2
chown ubuntu: /home/ubuntu/.ssh -R
# Create a pre-run script that will restart docker daemon before the job started
# Create a pre-run script that will provide diagnostics info
mkdir -p /tmp/actions-hooks
cat > /tmp/actions-hooks/pre-run.sh << 'EOF'
cat > /tmp/actions-hooks/pre-run.sh << EOF
#!/bin/bash
set -xuo pipefail
set -uo pipefail
echo "Runner's public DNS: $(ec2metadata --public-hostname)"
echo "Runner's labels: ${LABELS}"
EOF
# Create a post-run script that will restart docker daemon before the job started
cat > /tmp/actions-hooks/post-run.sh << 'EOF'
#!/bin/bash
set -xuo pipefail

View File

@ -15,6 +15,7 @@
<value>hiveHash</value>
<value>xxHash32</value>
<value>xxHash64</value>
<value>xxh3</value>
<value>CRC32</value>
</values>
</substitution>

View File

@ -16,7 +16,7 @@ function create_db()
# So CREATE TABLE queries will fail on all replicas except one. But it still makes sense for a stress test.
$CLICKHOUSE_CLIENT --allow_experimental_database_replicated=1 --query \
"create database if not exists ${CLICKHOUSE_DATABASE}_repl_$SUFFIX engine=Replicated('/test/01111/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX', '$SHARD', '$REPLICA')" \
2>&1| grep -Fa "Exception: " | grep -Fv "REPLICA_IS_ALREADY_EXIST" | grep -Fiv "Will not try to start it up" | \
2>&1| grep -Fa "Exception: " | grep -Fv "REPLICA_ALREADY_EXISTS" | grep -Fiv "Will not try to start it up" | \
grep -Fv "Coordination::Exception" | grep -Fv "already contains some data and it does not look like Replicated database path"
sleep 0.$RANDOM
done

View File

@ -14,9 +14,7 @@ CREATE TABLE system.asynchronous_inserts
`first_update` DateTime64(6),
`total_bytes` UInt64,
`entries.query_id` Array(String),
`entries.bytes` Array(UInt64),
`entries.finished` Array(UInt8),
`entries.exception` Array(String)
`entries.bytes` Array(UInt64)
)
ENGINE = SystemAsynchronousInserts
COMMENT 'SYSTEM TABLE is built on the fly.'

View File

@ -1,4 +1,4 @@
1 a
2 b
INSERT INTO async_inserts_2156 VALUES 1 Insert 1 0
INSERT INTO async_inserts_2156 VALUES 1 Insert 1
INSERT INTO async_inserts_2156 VALUES 1 Insert 1
INSERT INTO async_inserts_2156 VALUES 1 Insert 1

View File

@ -7,7 +7,7 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS async_inserts_2156"
${CLICKHOUSE_CLIENT} -q "CREATE TABLE async_inserts_2156 (id UInt32, s String) ENGINE = Memory"
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&async_insert=1&wait_for_async_insert=0" -d "INSERT INTO async_inserts_2156 VALUES (1, 'a')"
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&async_insert=1&wait_for_async_insert=1" -d "INSERT INTO async_inserts_2156 VALUES (1, 'a')"
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&async_insert=1&wait_for_async_insert=1" -d "INSERT INTO async_inserts_2156 VALUES (2, 'b')"
${CLICKHOUSE_CLIENT} -q "SELECT * FROM async_inserts_2156 ORDER BY id"
@ -15,7 +15,7 @@ ${CLICKHOUSE_CLIENT} -q "SELECT * FROM async_inserts_2156 ORDER BY id"
${CLICKHOUSE_CLIENT} -q "SYSTEM FLUSH LOGS"
${CLICKHOUSE_CLIENT} -q "SELECT query, arrayExists(x -> x LIKE '%async_inserts_2156', tables), \
query_kind, Settings['async_insert'], Settings['wait_for_async_insert'] FROM system.query_log \
query_kind, Settings['async_insert'] FROM system.query_log \
WHERE event_date >= yesterday() AND current_database = '$CLICKHOUSE_DATABASE' \
AND query ILIKE 'INSERT INTO async_inserts_2156 VALUES%' AND type = 'QueryFinish' \
ORDER BY query_start_time_microseconds"

View File

@ -0,0 +1,23 @@
0
0
0
9999999999999999550522436926092261716351992671467843175339166479588690755584
9999999999999999451597035424131548206707486713696660676795842648250000000000
11.126038
10.8
-11.126038
-10.8
10.8
1376.638914
1403.6
-1376.638914
-1403.6
1403.6
332833500
999
1000
1000
1000
0.1
0.1
0.1

View File

@ -0,0 +1,45 @@
-- Tags: no-fasttest
-- check cases when one of the operands is zero
SELECT divideDecimal(toDecimal32(0, 2), toDecimal128(11.123456, 6));
SELECT divideDecimal(toDecimal64(123.123, 3), toDecimal64(0, 1)); -- { serverError 153 }
SELECT multiplyDecimal(toDecimal32(0, 2), toDecimal128(11.123456, 6));
SELECT multiplyDecimal(toDecimal32(123.123, 3), toDecimal128(0, 1));
-- don't be surprised by the strange query result -- it happens due to limited float precision: toUInt256(1e38) == 99999999999999997752612184630461283328
SELECT multiplyDecimal(toDecimal256(1e38, 0), toDecimal256(1e38, 0));
SELECT divideDecimal(toDecimal256(1e66, 0), toDecimal256(1e-10, 10), 0);
-- fits Decimal256, but the scale is too big for the result to fit
SELECT multiplyDecimal(toDecimal256(1e38, 0), toDecimal256(1e38, 0), 2); -- { serverError 407 }
SELECT divideDecimal(toDecimal256(1e72, 0), toDecimal256(1e-5, 5), 2); -- { serverError 407 }
-- does not fit Decimal256
SELECT multiplyDecimal(toDecimal256('1e38', 0), toDecimal256('1e38', 0)); -- { serverError 407 }
SELECT multiplyDecimal(toDecimal256(1e39, 0), toDecimal256(1e39, 0), 0); -- { serverError 407 }
SELECT divideDecimal(toDecimal256(1e39, 0), toDecimal256(1e-38, 39)); -- { serverError 407 }
-- test different signs
SELECT divideDecimal(toDecimal128(123.76, 2), toDecimal128(11.123456, 6));
SELECT divideDecimal(toDecimal32(123.123, 3), toDecimal128(11.4, 1), 2);
SELECT divideDecimal(toDecimal128(-123.76, 2), toDecimal128(11.123456, 6));
SELECT divideDecimal(toDecimal32(123.123, 3), toDecimal128(-11.4, 1), 2);
SELECT divideDecimal(toDecimal32(-123.123, 3), toDecimal128(-11.4, 1), 2);
SELECT multiplyDecimal(toDecimal64(123.76, 2), toDecimal128(11.123456, 6));
SELECT multiplyDecimal(toDecimal32(123.123, 3), toDecimal128(11.4, 1), 2);
SELECT multiplyDecimal(toDecimal64(-123.76, 2), toDecimal128(11.123456, 6));
SELECT multiplyDecimal(toDecimal32(123.123, 3), toDecimal128(-11.4, 1), 2);
SELECT multiplyDecimal(toDecimal32(-123.123, 3), toDecimal128(-11.4, 1), 2);
-- check against non-const columns
SELECT sum(multiplyDecimal(toDecimal64(number, 1), toDecimal64(number, 5))) FROM numbers(1000);
SELECT sum(divideDecimal(toDecimal64(number, 1), toDecimal64(number, 5))) FROM (select * from numbers(1000) OFFSET 1);
-- check against Nullable type
SELECT multiplyDecimal(toNullable(toDecimal64(10, 1)), toDecimal64(100, 5));
SELECT multiplyDecimal(toDecimal64(10, 1), toNullable(toDecimal64(100, 5)));
SELECT multiplyDecimal(toNullable(toDecimal64(10, 1)), toNullable(toDecimal64(100, 5)));
SELECT divideDecimal(toNullable(toDecimal64(10, 1)), toDecimal64(100, 5));
SELECT divideDecimal(toDecimal64(10, 1), toNullable(toDecimal64(100, 5)));
SELECT divideDecimal(toNullable(toDecimal64(10, 1)), toNullable(toDecimal64(100, 5)));

View File

@ -0,0 +1 @@
OK

View File

@ -0,0 +1,63 @@
#!/usr/bin/env bash
# Tags: no-random-settings, no-fasttest, long
set -e
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
export MY_CLICKHOUSE_CLIENT="$CLICKHOUSE_CLIENT --async_insert_busy_timeout_ms 10 --async_insert_max_data_size 1 --async_insert 1"
function insert1()
{
while true; do
${MY_CLICKHOUSE_CLIENT} --wait_for_async_insert 0 -q 'INSERT INTO async_inserts_race FORMAT CSV 1,"a"'
done
}
function insert2()
{
while true; do
${MY_CLICKHOUSE_CLIENT} --wait_for_async_insert 0 -q 'INSERT INTO async_inserts_race FORMAT JSONEachRow {"id": 5, "s": "e"} {"id": 6, "s": "f"}'
done
}
function insert3()
{
while true; do
${MY_CLICKHOUSE_CLIENT} --wait_for_async_insert 1 -q "INSERT INTO async_inserts_race VALUES (7, 'g') (8, 'h')" &
sleep 0.05
done
}
function select1()
{
while true; do
${MY_CLICKHOUSE_CLIENT} -q "SELECT * FROM async_inserts_race FORMAT Null"
done
}
${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS async_inserts_race"
${CLICKHOUSE_CLIENT} -q "CREATE TABLE async_inserts_race (id UInt32, s String) ENGINE = MergeTree ORDER BY id"
TIMEOUT=10
export -f insert1
export -f insert2
export -f insert3
export -f select1
for _ in {1..3}; do
timeout $TIMEOUT bash -c insert1 &
timeout $TIMEOUT bash -c insert2 &
timeout $TIMEOUT bash -c insert3 &
done
timeout $TIMEOUT bash -c select1 &
wait
echo "OK"
${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS async_inserts_race";

View File

@ -0,0 +1,3 @@
Parquet
3d94071a2fe62a3b3285f170ca6f42e5 -
70000

View File

@ -0,0 +1,42 @@
#!/usr/bin/env bash
# Tags: no-ubsan, no-fasttest
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
echo "Parquet"
# File generated with the below script
#import pyarrow as pa
#import pyarrow.parquet as pq
#import random
#
#
#def gen_array(offset):
# array = []
# array_length = random.randint(0, 9)
# for i in range(array_length):
# array.append(i + offset)
#
# return array
#
#
#def gen_arrays(number_of_arrays):
# list_of_arrays = []
# for i in range(number_of_arrays):
# list_of_arrays.append(gen_array(i))
# return list_of_arrays
#
#arr = pa.array(gen_arrays(70000))
#table = pa.table([arr], ["arr"])
#pq.write_table(table, "int-list-zero-based-chunked-array.parquet")
DATA_FILE=$CUR_DIR/data_parquet/int-list-zero-based-chunked-array.parquet
${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS parquet_load"
${CLICKHOUSE_CLIENT} --query="CREATE TABLE parquet_load (arr Array(Int64)) ENGINE = Memory"
cat "$DATA_FILE" | ${CLICKHOUSE_CLIENT} -q "INSERT INTO parquet_load FORMAT Parquet"
${CLICKHOUSE_CLIENT} --query="SELECT * FROM parquet_load" | md5sum
${CLICKHOUSE_CLIENT} --query="SELECT count() FROM parquet_load"
${CLICKHOUSE_CLIENT} --query="drop table parquet_load"

View File

@ -0,0 +1,3 @@
Parquet
e1cfe4265689ead763b18489b363344d -
39352

View File

@ -0,0 +1,16 @@
#!/usr/bin/env bash
# Tags: no-ubsan, no-fasttest
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
echo "Parquet"
DATA_FILE=$CUR_DIR/data_parquet/list_monotonically_increasing_offsets.parquet
${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS parquet_load"
${CLICKHOUSE_CLIENT} --query="CREATE TABLE parquet_load (list Array(Int64), json Nullable(String)) ENGINE = Memory"
cat "$DATA_FILE" | ${CLICKHOUSE_CLIENT} -q "INSERT INTO parquet_load FORMAT Parquet"
${CLICKHOUSE_CLIENT} --query="SELECT * FROM parquet_load" | md5sum
${CLICKHOUSE_CLIENT} --query="SELECT count() FROM parquet_load"
${CLICKHOUSE_CLIENT} --query="drop table parquet_load"

View File

@ -0,0 +1,23 @@
CREATE TABLE gen
(
repo_name String,
event_type Enum8('CommitCommentEvent' = 1, 'CreateEvent' = 2, 'DeleteEvent' = 3, 'ForkEvent' = 4, 'GollumEvent' = 5, 'IssueCommentEvent' = 6, 'IssuesEvent' = 7, 'MemberEvent' = 8, 'PublicEvent' = 9, 'PullRequestEvent' = 10, 'PullRequestReviewCommentEvent' = 11, 'PushEvent' = 12, 'ReleaseEvent' = 13, 'SponsorshipEvent' = 14, 'WatchEvent' = 15, 'GistEvent' = 16, 'FollowEvent' = 17, 'DownloadEvent' = 18, 'PullRequestReviewEvent' = 19, 'ForkApplyEvent' = 20, 'Event' = 21, 'TeamAddEvent' = 22),
actor_login String,
created_at DateTime,
action Enum8('none' = 0, 'created' = 1, 'added' = 2, 'edited' = 3, 'deleted' = 4, 'opened' = 5, 'closed' = 6, 'reopened' = 7, 'assigned' = 8, 'unassigned' = 9, 'labeled' = 10, 'unlabeled' = 11, 'review_requested' = 12, 'review_request_removed' = 13, 'synchronize' = 14, 'started' = 15, 'published' = 16, 'update' = 17, 'create' = 18, 'fork' = 19, 'merged' = 20),
number UInt32,
merged_at DateTime
)
ENGINE = GenerateRandom;
CREATE TABLE github_events AS gen ENGINE=MergeTree ORDER BY (event_type, repo_name, created_at);
INSERT INTO github_events SELECT * FROM gen LIMIT 100000;
INSERT INTO github_events VALUES ('apache/pulsar','PullRequestEvent','hangc0276','2021-01-22 06:58:03','opened',9276,'1970-01-01 00:00:00') ('apache/pulsar','PullRequestEvent','hangc0276','2021-01-25 02:38:07','closed',9276,'1970-01-01 00:00:00') ('apache/pulsar','PullRequestEvent','hangc0276','2021-01-25 02:38:09','reopened',9276,'1970-01-01 00:00:00') ('apache/pulsar','PullRequestEvent','hangc0276','2021-04-22 06:05:09','closed',9276,'2021-04-22 06:05:08') ('apache/pulsar','IssueCommentEvent','hangc0276','2021-01-23 00:32:09','created',9276,'1970-01-01 00:00:00') ('apache/pulsar','IssueCommentEvent','hangc0276','2021-01-23 02:52:11','created',9276,'1970-01-01 00:00:00') ('apache/pulsar','IssueCommentEvent','hangc0276','2021-01-24 03:02:31','created',9276,'1970-01-01 00:00:00') ('apache/pulsar','IssueCommentEvent','hangc0276','2021-01-25 02:16:42','created',9276,'1970-01-01 00:00:00') ('apache/pulsar','IssueCommentEvent','hangc0276','2021-01-26 06:52:42','created',9276,'1970-01-01 00:00:00') ('apache/pulsar','IssueCommentEvent','hangc0276','2021-01-27 01:10:33','created',9276,'1970-01-01 00:00:00') ('apache/pulsar','IssueCommentEvent','hangc0276','2021-01-29 02:11:41','created',9276,'1970-01-01 00:00:00') ('apache/pulsar','IssueCommentEvent','hangc0276','2021-02-02 07:35:40','created',9276,'1970-01-01 00:00:00') ('apache/pulsar','IssueCommentEvent','hangc0276','2021-02-03 00:44:26','created',9276,'1970-01-01 00:00:00') ('apache/pulsar','IssueCommentEvent','hangc0276','2021-02-03 02:14:26','created',9276,'1970-01-01 00:00:00') ('apache/pulsar','PullRequestReviewEvent','codelipenghui','2021-03-29 14:31:25','created',9276,'1970-01-01 00:00:00') ('apache/pulsar','PullRequestReviewEvent','eolivelli','2021-03-29 16:34:02','created',9276,'1970-01-01 00:00:00');
OPTIMIZE TABLE github_events FINAL;
SELECT count()
FROM github_events
WHERE (repo_name = 'apache/pulsar') AND (toString(event_type) IN ('PullRequestEvent', 'PullRequestReviewCommentEvent', 'PullRequestReviewEvent', 'IssueCommentEvent')) AND (actor_login NOT IN ('github-actions[bot]', 'codecov-commenter')) AND (number = 9276);

View File

@ -0,0 +1 @@
18009318874338624809

View File

@ -0,0 +1 @@
SELECT xxh3('ClickHouse');
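For reference, the new xxh3 SQL function presumably delegates to XXH3_64bits from the bundled xxHash library. A minimal C++ sketch of calling that API directly; the include path and the exact bytes the SQL function hashes are assumptions:

#include <cstdio>

#define XXH_INLINE_ALL   // use the header-only, inlined implementation
#include <xxhash.h>      // assumed include path for the bundled library

int main()
{
    const char data[] = "ClickHouse";
    // Hash the string bytes (excluding the trailing '\0') with the 64-bit XXH3 variant.
    XXH64_hash_t hash = XXH3_64bits(data, sizeof(data) - 1);
    std::printf("%llu\n", static_cast<unsigned long long>(hash));
}

Whether this reproduces the reference value above depends on how the SQL function feeds the string bytes into the hash, so treat it purely as an API illustration.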

View File

@ -0,0 +1,77 @@
QUERY id: 0
PROJECTION COLUMNS
sumIf(1, equals(modulo(number, 2), 0)) UInt64
PROJECTION
LIST id: 1, nodes: 1
FUNCTION id: 2, function_name: countIf, function_type: aggregate, result_type: UInt64
ARGUMENTS
LIST id: 3, nodes: 1
FUNCTION id: 4, function_name: equals, function_type: ordinary, result_type: UInt8
ARGUMENTS
LIST id: 5, nodes: 2
FUNCTION id: 6, function_name: modulo, function_type: ordinary, result_type: UInt8
ARGUMENTS
LIST id: 7, nodes: 2
COLUMN id: 8, column_name: number, result_type: UInt64, source_id: 9
CONSTANT id: 10, constant_value: UInt64_2, constant_value_type: UInt8
CONSTANT id: 11, constant_value: UInt64_0, constant_value_type: UInt8
JOIN TREE
TABLE_FUNCTION id: 9, table_function_name: numbers
ARGUMENTS
LIST id: 12, nodes: 1
CONSTANT id: 13, constant_value: UInt64_10, constant_value_type: UInt8
--
5
--
QUERY id: 0
PROJECTION COLUMNS
sum(if(equals(modulo(number, 2), 0), 1, 0)) UInt64
PROJECTION
LIST id: 1, nodes: 1
FUNCTION id: 2, function_name: countIf, function_type: aggregate, result_type: UInt64
ARGUMENTS
LIST id: 3, nodes: 1
FUNCTION id: 4, function_name: equals, function_type: ordinary, result_type: UInt8
ARGUMENTS
LIST id: 5, nodes: 2
FUNCTION id: 6, function_name: modulo, function_type: ordinary, result_type: UInt8
ARGUMENTS
LIST id: 7, nodes: 2
COLUMN id: 8, column_name: number, result_type: UInt64, source_id: 9
CONSTANT id: 10, constant_value: UInt64_2, constant_value_type: UInt8
CONSTANT id: 11, constant_value: UInt64_0, constant_value_type: UInt8
JOIN TREE
TABLE_FUNCTION id: 9, table_function_name: numbers
ARGUMENTS
LIST id: 12, nodes: 1
CONSTANT id: 13, constant_value: UInt64_10, constant_value_type: UInt8
--
5
--
QUERY id: 0
PROJECTION COLUMNS
sum(if(equals(modulo(number, 2), 0), 0, 1)) UInt64
PROJECTION
LIST id: 1, nodes: 1
FUNCTION id: 2, function_name: countIf, function_type: aggregate, result_type: UInt64
ARGUMENTS
LIST id: 3, nodes: 1
FUNCTION id: 4, function_name: not, function_type: ordinary, result_type: UInt8
ARGUMENTS
LIST id: 5, nodes: 1
FUNCTION id: 6, function_name: equals, function_type: ordinary, result_type: UInt8
ARGUMENTS
LIST id: 7, nodes: 2
FUNCTION id: 8, function_name: modulo, function_type: ordinary, result_type: UInt8
ARGUMENTS
LIST id: 9, nodes: 2
COLUMN id: 10, column_name: number, result_type: UInt64, source_id: 11
CONSTANT id: 12, constant_value: UInt64_2, constant_value_type: UInt8
CONSTANT id: 13, constant_value: UInt64_0, constant_value_type: UInt8
JOIN TREE
TABLE_FUNCTION id: 11, table_function_name: numbers
ARGUMENTS
LIST id: 14, nodes: 1
CONSTANT id: 15, constant_value: UInt64_10, constant_value_type: UInt8
--
5

View File

@ -0,0 +1,24 @@
SET allow_experimental_analyzer = 1;
SET optimize_rewrite_sum_if_to_count_if = 1;
EXPLAIN QUERY TREE (SELECT sumIf(1, (number % 2) == 0) FROM numbers(10));
SELECT '--';
SELECT sumIf(1, (number % 2) == 0) FROM numbers(10);
SELECT '--';
EXPLAIN QUERY TREE (SELECT sum(if((number % 2) == 0, 1, 0)) FROM numbers(10));
SELECT '--';
SELECT sum(if((number % 2) == 0, 1, 0)) FROM numbers(10);
SELECT '--';
EXPLAIN QUERY TREE (SELECT sum(if((number % 2) == 0, 0, 1)) FROM numbers(10));
SELECT '--';
SELECT sum(if((number % 2) == 0, 0, 1)) FROM numbers(10);