diff --git a/.gitmodules b/.gitmodules index 293029ad171..618cfe6e76b 100644 --- a/.gitmodules +++ b/.gitmodules @@ -290,3 +290,6 @@ [submodule "contrib/morton-nd"] path = contrib/morton-nd url = https://github.com/morton-nd/morton-nd +[submodule "contrib/xxHash"] + path = contrib/xxHash + url = https://github.com/Cyan4973/xxHash.git diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index 8ebd4ab55d3..ec7382846c2 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -167,7 +167,9 @@ add_contrib (c-ares-cmake c-ares) add_contrib (qpl-cmake qpl) add_contrib (morton-nd-cmake morton-nd) -add_contrib(annoy-cmake annoy) +add_contrib (annoy-cmake annoy) + +add_contrib (xxHash-cmake xxHash) # Put all targets defined here and in subdirectories under "contrib/" folders in GUI-based IDEs. # Some of third-party projects may override CMAKE_FOLDER or FOLDER property of their targets, so they would not appear diff --git a/contrib/xxHash b/contrib/xxHash new file mode 160000 index 00000000000..3078dc6039f --- /dev/null +++ b/contrib/xxHash @@ -0,0 +1 @@ +Subproject commit 3078dc6039f8c0bffcb1904f81cfe6b2c3209435 diff --git a/contrib/xxHash-cmake/CMakeLists.txt b/contrib/xxHash-cmake/CMakeLists.txt new file mode 100644 index 00000000000..314094e9523 --- /dev/null +++ b/contrib/xxHash-cmake/CMakeLists.txt @@ -0,0 +1,13 @@ +set (LIBRARY_DIR "${ClickHouse_SOURCE_DIR}/contrib/xxHash") +set (SRCS + "${LIBRARY_DIR}/xxhash.c" +) + +add_library(xxHash ${SRCS}) +target_include_directories(xxHash SYSTEM BEFORE INTERFACE "${LIBRARY_DIR}") + +# XXH_INLINE_ALL - Make all functions inline, with implementations being directly included within xxhash.h. Inlining functions is beneficial for speed on small keys. +# https://github.com/Cyan4973/xxHash/tree/v0.8.1#build-modifiers +target_compile_definitions(xxHash PUBLIC XXH_INLINE_ALL) + +add_library(ch_contrib::xxHash ALIAS xxHash) diff --git a/docker/packager/binary/Dockerfile b/docker/packager/binary/Dockerfile index 06c3c0d80f0..b3da09facda 100644 --- a/docker/packager/binary/Dockerfile +++ b/docker/packager/binary/Dockerfile @@ -6,29 +6,24 @@ FROM clickhouse/test-util:$FROM_TAG # Rust toolchain and libraries ENV RUSTUP_HOME=/rust/rustup ENV CARGO_HOME=/rust/cargo -RUN curl https://sh.rustup.rs -sSf | bash -s -- -y -RUN chmod 777 -R /rust ENV PATH="/rust/cargo/env:${PATH}" ENV PATH="/rust/cargo/bin:${PATH}" -RUN rustup target add aarch64-unknown-linux-gnu && \ - rustup target add x86_64-apple-darwin && \ - rustup target add x86_64-unknown-freebsd && \ - rustup target add aarch64-apple-darwin && \ - rustup target add powerpc64le-unknown-linux-gnu -RUN apt-get install \ +RUN curl https://sh.rustup.rs -sSf | bash -s -- -y && \ + chmod 777 -R /rust && \ + rustup target add aarch64-unknown-linux-gnu && \ + rustup target add x86_64-apple-darwin && \ + rustup target add x86_64-unknown-freebsd && \ + rustup target add aarch64-apple-darwin && \ + rustup target add powerpc64le-unknown-linux-gnu + +RUN apt-get update && \ + apt-get install --yes \ gcc-aarch64-linux-gnu \ build-essential \ libc6 \ libc6-dev \ - libc6-dev-arm64-cross \ - --yes - -# Install CMake 3.20+ for Rust compilation -# Used https://askubuntu.com/a/1157132 as reference -RUN apt purge cmake --yes -RUN wget -O - https://apt.kitware.com/keys/kitware-archive-latest.asc 2>/dev/null | gpg --dearmor - | tee /etc/apt/trusted.gpg.d/kitware.gpg >/dev/null -RUN apt-add-repository 'deb https://apt.kitware.com/ubuntu/ focal main' -RUN apt update && apt install cmake --yes + libc6-dev-arm64-cross && \ + 
apt-get clean ENV CC=clang-${LLVM_VERSION} ENV CXX=clang++-${LLVM_VERSION} diff --git a/docker/test/fasttest/run.sh b/docker/test/fasttest/run.sh index de9125d565b..b4d3405bfd9 100755 --- a/docker/test/fasttest/run.sh +++ b/docker/test/fasttest/run.sh @@ -137,6 +137,7 @@ function clone_submodules contrib/hashidsxx contrib/c-ares contrib/morton-nd + contrib/xxHash ) git submodule sync diff --git a/docker/test/stress/run.sh b/docker/test/stress/run.sh index 0afd6cc19f3..e290a959b23 100644 --- a/docker/test/stress/run.sh +++ b/docker/test/stress/run.sh @@ -458,7 +458,7 @@ else zgrep -Fav -e "Code: 236. DB::Exception: Cancelled merging parts" \ -e "Code: 236. DB::Exception: Cancelled mutating parts" \ -e "REPLICA_IS_ALREADY_ACTIVE" \ - -e "REPLICA_IS_ALREADY_EXIST" \ + -e "REPLICA_ALREADY_EXISTS" \ -e "ALL_REPLICAS_LOST" \ -e "DDLWorker: Cannot parse DDL task query" \ -e "RaftInstance: failed to accept a rpc connection due to error 125" \ diff --git a/docker/test/util/Dockerfile b/docker/test/util/Dockerfile index 57544bdc090..f1cf029e9a2 100644 --- a/docker/test/util/Dockerfile +++ b/docker/test/util/Dockerfile @@ -13,6 +13,7 @@ RUN apt-get update \ apt-transport-https \ apt-utils \ ca-certificates \ + curl \ dnsutils \ gnupg \ iputils-ping \ @@ -24,10 +25,16 @@ RUN apt-get update \ && echo "${LLVM_PUBKEY_HASH} /tmp/llvm-snapshot.gpg.key" | sha384sum -c \ && apt-key add /tmp/llvm-snapshot.gpg.key \ && export CODENAME="$(lsb_release --codename --short | tr 'A-Z' 'a-z')" \ - && echo "deb [trusted=yes] https://apt.llvm.org/${CODENAME}/ llvm-toolchain-${CODENAME}-${LLVM_VERSION} main" >> \ + && echo "deb https://apt.llvm.org/${CODENAME}/ llvm-toolchain-${CODENAME}-${LLVM_VERSION} main" >> \ /etc/apt/sources.list \ && apt-get clean +# Install cmake 3.20+ for rust support +# Used https://askubuntu.com/a/1157132 as reference +RUN curl -s https://apt.kitware.com/keys/kitware-archive-latest.asc | \ + gpg --dearmor - > /etc/apt/trusted.gpg.d/kitware.gpg && \ + echo "deb https://apt.kitware.com/ubuntu/ $(lsb_release -cs) main" >> /etc/apt/sources.list + # initial packages RUN apt-get update \ && apt-get install \ @@ -37,7 +44,6 @@ RUN apt-get update \ clang-${LLVM_VERSION} \ clang-tidy-${LLVM_VERSION} \ cmake \ - curl \ fakeroot \ gdb \ git \ diff --git a/docs/en/sql-reference/functions/arithmetic-functions.md b/docs/en/sql-reference/functions/arithmetic-functions.md index ece50591ef9..56f3a88b28b 100644 --- a/docs/en/sql-reference/functions/arithmetic-functions.md +++ b/docs/en/sql-reference/functions/arithmetic-functions.md @@ -161,3 +161,140 @@ Result: │ -1 │ └─────────────┘ ``` + +## multiplyDecimal(a, b[, result_scale]) + +Performs multiplication on two decimals. Result value will be of type [Decimal256](../../sql-reference/data-types/decimal.md). +Result scale can be explicitly specified by `result_scale` argument (const Integer in range `[0, 76]`). If not specified, the result scale is the max scale of given arguments. + +:::note +These functions work significantly slower than usual `multiply`. +In case you don't really need controlled precision and/or need fast computation, consider using [multiply](#multiply) +::: + +**Syntax** + +```sql +multiplyDecimal(a, b[, result_scale]) +``` + +**Arguments** + +- `a` — First value: [Decimal](../../sql-reference/data-types/decimal.md). +- `b` — Second value: [Decimal](../../sql-reference/data-types/decimal.md). +- `result_scale` — Scale of result: [Int/UInt](../../sql-reference/data-types/int-uint.md). 
+ +**Returned value** + +- The result of multiplication with given scale. + +Type: [Decimal256](../../sql-reference/data-types/decimal.md). + +**Example** + +```text +┌─multiplyDecimal(toDecimal256(-12, 0), toDecimal32(-2.1, 1), 1)─┐ +│ 25.2 │ +└────────────────────────────────────────────────────────────────┘ +``` + +**Difference from regular multiplication:** +```sql +SELECT toDecimal64(-12.647, 3) * toDecimal32(2.1239, 4); +SELECT toDecimal64(-12.647, 3) as a, toDecimal32(2.1239, 4) as b, multiplyDecimal(a, b); +``` + +```text +┌─multiply(toDecimal64(-12.647, 3), toDecimal32(2.1239, 4))─┐ +│ -26.8609633 │ +└───────────────────────────────────────────────────────────┘ +┌─multiplyDecimal(toDecimal64(-12.647, 3), toDecimal32(2.1239, 4))─┐ +│ -26.8609 │ +└──────────────────────────────────────────────────────────────────┘ +``` + +```sql +SELECT + toDecimal64(-12.647987876, 9) AS a, + toDecimal64(123.967645643, 9) AS b, + multiplyDecimal(a, b); + +SELECT + toDecimal64(-12.647987876, 9) AS a, + toDecimal64(123.967645643, 9) AS b, + a * b; +``` + +```text +┌─────────────a─┬─────────────b─┬─multiplyDecimal(toDecimal64(-12.647987876, 9), toDecimal64(123.967645643, 9))─┐ +│ -12.647987876 │ 123.967645643 │ -1567.941279108 │ +└───────────────┴───────────────┴───────────────────────────────────────────────────────────────────────────────┘ + +Received exception from server (version 22.11.1): +Code: 407. DB::Exception: Received from localhost:9000. DB::Exception: Decimal math overflow: While processing toDecimal64(-12.647987876, 9) AS a, toDecimal64(123.967645643, 9) AS b, a * b. (DECIMAL_OVERFLOW) +``` + +## divideDecimal(a, b[, result_scale]) + +Performs division on two decimals. Result value will be of type [Decimal256](../../sql-reference/data-types/decimal.md). +Result scale can be explicitly specified by `result_scale` argument (const Integer in range `[0, 76]`). If not specified, the result scale is the max scale of given arguments. + +:::note +These function work significantly slower than usual `divide`. +In case you don't really need controlled precision and/or need fast computation, consider using [divide](#divide). +::: + +**Syntax** + +```sql +divideDecimal(a, b[, result_scale]) +``` + +**Arguments** + +- `a` — First value: [Decimal](../../sql-reference/data-types/decimal.md). +- `b` — Second value: [Decimal](../../sql-reference/data-types/decimal.md). +- `result_scale` — Scale of result: [Int/UInt](../../sql-reference/data-types/int-uint.md). + +**Returned value** + +- The result of division with given scale. + +Type: [Decimal256](../../sql-reference/data-types/decimal.md). 
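The output shown in the example below is produced by the following query:

```sql
SELECT divideDecimal(toDecimal256(-12, 0), toDecimal32(2.1, 1), 10);
```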
+ +**Example** + +```text +┌─divideDecimal(toDecimal256(-12, 0), toDecimal32(2.1, 1), 10)─┐ +│ -5.7142857142 │ +└──────────────────────────────────────────────────────────────┘ +``` + +**Difference from regular division:** +```sql +SELECT toDecimal64(-12, 1) / toDecimal32(2.1, 1); +SELECT toDecimal64(-12, 1) as a, toDecimal32(2.1, 1) as b, divideDecimal(a, b, 1), divideDecimal(a, b, 5); +``` + +```text +┌─divide(toDecimal64(-12, 1), toDecimal32(2.1, 1))─┐ +│ -5.7 │ +└──────────────────────────────────────────────────┘ + +┌───a─┬───b─┬─divideDecimal(toDecimal64(-12, 1), toDecimal32(2.1, 1), 1)─┬─divideDecimal(toDecimal64(-12, 1), toDecimal32(2.1, 1), 5)─┐ +│ -12 │ 2.1 │ -5.7 │ -5.71428 │ +└─────┴─────┴────────────────────────────────────────────────────────────┴────────────────────────────────────────────────────────────┘ +``` + +```sql +SELECT toDecimal64(-12, 0) / toDecimal32(2.1, 1); +SELECT toDecimal64(-12, 0) as a, toDecimal32(2.1, 1) as b, divideDecimal(a, b, 1), divideDecimal(a, b, 5); +``` + +```text +DB::Exception: Decimal result's scale is less than argument's one: While processing toDecimal64(-12, 0) / toDecimal32(2.1, 1). (ARGUMENT_OUT_OF_BOUND) + +┌───a─┬───b─┬─divideDecimal(toDecimal64(-12, 0), toDecimal32(2.1, 1), 1)─┬─divideDecimal(toDecimal64(-12, 0), toDecimal32(2.1, 1), 5)─┐ +│ -12 │ 2.1 │ -5.7 │ -5.71428 │ +└─────┴─────┴────────────────────────────────────────────────────────────┴────────────────────────────────────────────────────────────┘ +``` diff --git a/docs/ru/sql-reference/functions/arithmetic-functions.md b/docs/ru/sql-reference/functions/arithmetic-functions.md index bc1d0a55128..4e040edcc70 100644 --- a/docs/ru/sql-reference/functions/arithmetic-functions.md +++ b/docs/ru/sql-reference/functions/arithmetic-functions.md @@ -159,3 +159,150 @@ SELECT min2(-1, 2); └─────────────┘ ``` +## multiplyDecimal(a, b[, result_scale]) + +Совершает умножение двух Decimal. Результат будет иметь тип [Decimal256](../../sql-reference/data-types/decimal.md). +Scale (размер дробной части) результат можно явно задать аргументом `result_scale` (целочисленная константа из интервала `[0, 76]`). +Если этот аргумент не задан, то scale результата будет равен наибольшему из scale обоих аргументов. + +**Синтаксис** + +```sql +multiplyDecimal(a, b[, result_scale]) +``` + +:::note +Эта функция работают гораздо медленнее обычной `multiply`. +В случае, если нет необходимости иметь фиксированную точность и/или нужны быстрые вычисления, следует использовать [multiply](#multiply). +::: + +**Аргументы** + +- `a` — Первый сомножитель/делимое: [Decimal](../../sql-reference/data-types/decimal.md). +- `b` — Второй сомножитель/делитель: [Decimal](../../sql-reference/data-types/decimal.md). +- `result_scale` — Scale результата: [Int/UInt](../../sql-reference/data-types/int-uint.md). + +**Возвращаемое значение** + +- Результат умножения с заданным scale. + +Тип: [Decimal256](../../sql-reference/data-types/decimal.md). 
+ +**Примеры** + +```sql +SELECT multiplyDecimal(toDecimal256(-12, 0), toDecimal32(-2.1, 1), 1); +``` + +```text +┌─multiplyDecimal(toDecimal256(-12, 0), toDecimal32(-2.1, 1), 1)─┐ +│ 25.2 │ +└────────────────────────────────────────────────────────────────┘ +``` + +**Отличие от стандартных функций** +```sql +SELECT toDecimal64(-12.647, 3) * toDecimal32(2.1239, 4); +SELECT toDecimal64(-12.647, 3) as a, toDecimal32(2.1239, 4) as b, multiplyDecimal(a, b); +``` + +```text +┌─multiply(toDecimal64(-12.647, 3), toDecimal32(2.1239, 4))─┐ +│ -26.8609633 │ +└───────────────────────────────────────────────────────────┘ +┌─multiplyDecimal(toDecimal64(-12.647, 3), toDecimal32(2.1239, 4))─┐ +│ -26.8609 │ +└──────────────────────────────────────────────────────────────────┘ +``` + +```sql +SELECT + toDecimal64(-12.647987876, 9) AS a, + toDecimal64(123.967645643, 9) AS b, + multiplyDecimal(a, b); + +SELECT + toDecimal64(-12.647987876, 9) AS a, + toDecimal64(123.967645643, 9) AS b, + a * b; +``` + +```text +┌─────────────a─┬─────────────b─┬─multiplyDecimal(toDecimal64(-12.647987876, 9), toDecimal64(123.967645643, 9))─┐ +│ -12.647987876 │ 123.967645643 │ -1567.941279108 │ +└───────────────┴───────────────┴───────────────────────────────────────────────────────────────────────────────┘ + +Received exception from server (version 22.11.1): +Code: 407. DB::Exception: Received from localhost:9000. DB::Exception: Decimal math overflow: While processing toDecimal64(-12.647987876, 9) AS a, toDecimal64(123.967645643, 9) AS b, a * b. (DECIMAL_OVERFLOW) +``` + +## divideDecimal(a, b[, result_scale]) + +Совершает деление двух Decimal. Результат будет иметь тип [Decimal256](../../sql-reference/data-types/decimal.md). +Scale (размер дробной части) результат можно явно задать аргументом `result_scale` (целочисленная константа из интервала `[0, 76]`). +Если этот аргумент не задан, то scale результата будет равен наибольшему из scale обоих аргументов. + +**Синтаксис** + +```sql +divideDecimal(a, b[, result_scale]) +``` + +:::note +Эта функция работает гораздо медленнее обычной `divide`. +В случае, если нет необходимости иметь фиксированную точность и/или нужны быстрые вычисления, следует использовать [divide](#divide). +::: + +**Аргументы** + +- `a` — Первый сомножитель/делимое: [Decimal](../../sql-reference/data-types/decimal.md). +- `b` — Второй сомножитель/делитель: [Decimal](../../sql-reference/data-types/decimal.md). +- `result_scale` — Scale результата: [Int/UInt](../../sql-reference/data-types/int-uint.md). + +**Возвращаемое значение** + +- Результат деления с заданным scale. + +Тип: [Decimal256](../../sql-reference/data-types/decimal.md). 
+ +**Примеры** + +```sql +SELECT divideDecimal(toDecimal256(-12, 0), toDecimal32(2.1, 1), 10); +``` + +```text +┌─divideDecimal(toDecimal256(-12, 0), toDecimal32(2.1, 1), 10)─┐ +│ -5.7142857142 │ +└──────────────────────────────────────────────────────────────┘ +``` + +**Отличие от стандартных функций** +```sql +SELECT toDecimal64(-12, 1) / toDecimal32(2.1, 1); +SELECT toDecimal64(-12, 1) as a, toDecimal32(2.1, 1) as b, divideDecimal(a, b, 1), divideDecimal(a, b, 5); +``` + +```text +┌─divide(toDecimal64(-12, 1), toDecimal32(2.1, 1))─┐ +│ -5.7 │ +└──────────────────────────────────────────────────┘ + +┌───a─┬───b─┬─divideDecimal(toDecimal64(-12, 1), toDecimal32(2.1, 1), 1)─┬─divideDecimal(toDecimal64(-12, 1), toDecimal32(2.1, 1), 5)─┐ +│ -12 │ 2.1 │ -5.7 │ -5.71428 │ +└─────┴─────┴────────────────────────────────────────────────────────────┴────────────────────────────────────────────────────────────┘ +``` + +```sql +SELECT toDecimal64(-12, 0) / toDecimal32(2.1, 1); +SELECT toDecimal64(-12, 0) as a, toDecimal32(2.1, 1) as b, divideDecimal(a, b, 1), divideDecimal(a, b, 5); +``` + +```text +DB::Exception: Decimal result's scale is less than argument's one: While processing toDecimal64(-12, 0) / toDecimal32(2.1, 1). (ARGUMENT_OUT_OF_BOUND) + +┌───a─┬───b─┬─divideDecimal(toDecimal64(-12, 0), toDecimal32(2.1, 1), 1)─┬─divideDecimal(toDecimal64(-12, 0), toDecimal32(2.1, 1), 5)─┐ +│ -12 │ 2.1 │ -5.7 │ -5.71428 │ +└─────┴─────┴────────────────────────────────────────────────────────────┴────────────────────────────────────────────────────────────┘ +``` + diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index 4702985c0ae..cda9dbbcf28 100644 --- a/programs/server/Server.cpp +++ b/programs/server/Server.cpp @@ -1475,8 +1475,7 @@ try if (settings.async_insert_threads) global_context->setAsynchronousInsertQueue(std::make_shared( global_context, - settings.async_insert_threads, - settings.async_insert_cleanup_timeout_ms)); + settings.async_insert_threads)); /// Size of cache for marks (index of MergeTree family of tables). size_t mark_cache_size = config().getUInt64("mark_cache_size", 5368709120); diff --git a/src/Analyzer/Passes/SumIfToCountIfPass.cpp b/src/Analyzer/Passes/SumIfToCountIfPass.cpp index f43c90e10eb..5413d1b4670 100644 --- a/src/Analyzer/Passes/SumIfToCountIfPass.cpp +++ b/src/Analyzer/Passes/SumIfToCountIfPass.cpp @@ -61,7 +61,7 @@ public: function_node_arguments_nodes[0] = std::move(function_node_arguments_nodes[1]); function_node_arguments_nodes.resize(1); - resolveAggregateFunctionNode(*function_node, "countIf"); + resolveAsCountIfAggregateFunction(*function_node, function_node_arguments_nodes[0]->getResultType()); return; } @@ -102,15 +102,16 @@ public: function_node_arguments_nodes[0] = std::move(nested_if_function_arguments_nodes[0]); function_node_arguments_nodes.resize(1); - resolveAggregateFunctionNode(*function_node, "countIf"); + resolveAsCountIfAggregateFunction(*function_node, function_node_arguments_nodes[0]->getResultType()); return; } /// Rewrite `sum(if(cond, 0, 1))` into `countIf(not(cond))`. 
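        /// For illustration, with hypothetical names (`status` and `requests` are placeholders, not from this patch):
        ///     SELECT sum(if(status = 200, 0, 1)) FROM requests
        /// becomes the equivalent
        ///     SELECT countIf(not(status = 200)) FROM requests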
if (if_true_condition_value == 0 && if_false_condition_value == 1) { - auto condition_result_type = nested_if_function_arguments_nodes[0]->getResultType(); DataTypePtr not_function_result_type = std::make_shared(); + + const auto & condition_result_type = nested_if_function_arguments_nodes[0]->getResultType(); if (condition_result_type->isNullable()) not_function_result_type = makeNullable(not_function_result_type); @@ -123,23 +124,21 @@ public: function_node_arguments_nodes[0] = std::move(not_function); function_node_arguments_nodes.resize(1); - resolveAggregateFunctionNode(*function_node, "countIf"); + resolveAsCountIfAggregateFunction(*function_node, function_node_arguments_nodes[0]->getResultType()); return; } } private: - static inline void resolveAggregateFunctionNode(FunctionNode & function_node, const String & aggregate_function_name) + static inline void resolveAsCountIfAggregateFunction(FunctionNode & function_node, const DataTypePtr & argument_type) { - auto function_result_type = function_node.getResultType(); - auto function_aggregate_function = function_node.getAggregateFunction(); - AggregateFunctionProperties properties; - auto aggregate_function = AggregateFunctionFactory::instance().get(aggregate_function_name, - function_aggregate_function->getArgumentTypes(), - function_aggregate_function->getParameters(), + auto aggregate_function = AggregateFunctionFactory::instance().get("countIf", + {argument_type}, + function_node.getAggregateFunction()->getParameters(), properties); + auto function_result_type = function_node.getResultType(); function_node.resolveAsAggregateFunction(std::move(aggregate_function), std::move(function_result_type)); } diff --git a/src/Common/ErrorCodes.cpp b/src/Common/ErrorCodes.cpp index 2bc5d70421a..e312a84d0f5 100644 --- a/src/Common/ErrorCodes.cpp +++ b/src/Common/ErrorCodes.cpp @@ -258,7 +258,7 @@ M(250, NOT_ENOUGH_BLOCK_NUMBERS) \ M(251, NO_SUCH_REPLICA) \ M(252, TOO_MANY_PARTS) \ - M(253, REPLICA_IS_ALREADY_EXIST) \ + M(253, REPLICA_ALREADY_EXISTS) \ M(254, NO_ACTIVE_REPLICAS) \ M(255, TOO_MANY_RETRIES_TO_FETCH_PARTS) \ M(256, PARTITION_ALREADY_EXISTS) \ diff --git a/src/Core/Settings.h b/src/Core/Settings.h index cfc7df6c853..ef1a87c5501 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -604,7 +604,6 @@ static constexpr UInt64 operator""_GiB(unsigned long long value) M(Seconds, wait_for_async_insert_timeout, DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC, "Timeout for waiting for processing asynchronous insertion", 0) \ M(UInt64, async_insert_max_data_size, 1000000, "Maximum size in bytes of unparsed data collected per query before being inserted", 0) \ M(Milliseconds, async_insert_busy_timeout_ms, 200, "Maximum time to wait before dumping collected data per query since the first data appeared", 0) \ - M(Milliseconds, async_insert_cleanup_timeout_ms, 1000, "Time to wait before each iteration of cleaning up buffers for INSERT queries which don't appear anymore. 
Only has meaning at server startup.", 0) \ \ M(UInt64, remote_fs_read_max_backoff_ms, 10000, "Max wait time when trying to read data for remote disk", 0) \ M(UInt64, remote_fs_read_backoff_max_tries, 5, "Max attempts to read with backoff", 0) \ @@ -705,6 +704,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value) MAKE_OBSOLETE(M, DefaultDatabaseEngine, default_database_engine, DefaultDatabaseEngine::Atomic) \ MAKE_OBSOLETE(M, UInt64, max_pipeline_depth, 0) \ MAKE_OBSOLETE(M, Seconds, temporary_live_view_timeout, 1) \ + MAKE_OBSOLETE(M, Milliseconds, async_insert_cleanup_timeout_ms, 1000) \ /** The section above is for obsolete settings. Do not add anything there. */ diff --git a/src/Databases/DatabaseReplicated.cpp b/src/Databases/DatabaseReplicated.cpp index 20fa11e90e2..c0bc9d3f3a2 100644 --- a/src/Databases/DatabaseReplicated.cpp +++ b/src/Databases/DatabaseReplicated.cpp @@ -39,7 +39,7 @@ namespace ErrorCodes extern const int NO_ZOOKEEPER; extern const int LOGICAL_ERROR; extern const int BAD_ARGUMENTS; - extern const int REPLICA_IS_ALREADY_EXIST; + extern const int REPLICA_ALREADY_EXISTS; extern const int DATABASE_REPLICATION_FAILED; extern const int UNKNOWN_DATABASE; extern const int UNKNOWN_TABLE; @@ -297,7 +297,7 @@ void DatabaseReplicated::tryConnectToZooKeeperAndInitDatabase(LoadingStrictnessL if (is_create_query || replica_host_id != host_id) { throw Exception( - ErrorCodes::REPLICA_IS_ALREADY_EXIST, + ErrorCodes::REPLICA_ALREADY_EXISTS, "Replica {} of shard {} of replicated database at {} already exists. Replica host ID: '{}', current host ID: '{}'", replica_name, shard_name, zookeeper_path, replica_host_id, host_id); } diff --git a/src/Functions/CMakeLists.txt b/src/Functions/CMakeLists.txt index c84e23da85b..93374f933b7 100644 --- a/src/Functions/CMakeLists.txt +++ b/src/Functions/CMakeLists.txt @@ -29,9 +29,9 @@ list (APPEND PRIVATE_LIBS ch_contrib::zlib boost::filesystem divide_impl + ch_contrib::xxHash ) - if (TARGET ch_rust::blake3) list (APPEND PUBLIC_LIBS ch_rust::blake3 @@ -66,8 +66,6 @@ if (TARGET ch_contrib::base64) list (APPEND PRIVATE_LIBS ch_contrib::base64) endif() -list (APPEND PRIVATE_LIBS ch_contrib::lz4) - if (ENABLE_NLP) list (APPEND PRIVATE_LIBS ch_contrib::cld2) endif() diff --git a/src/Functions/FunctionsConversion.h b/src/Functions/FunctionsConversion.h index 208da8a78fe..d7be0ecf701 100644 --- a/src/Functions/FunctionsConversion.h +++ b/src/Functions/FunctionsConversion.h @@ -2297,6 +2297,10 @@ struct ToStringMonotonicity if (const auto * low_cardinality_type = checkAndGetDataType(type_ptr)) type_ptr = low_cardinality_type->getDictionaryType().get(); + /// Order on enum values (which is the order on integers) is completely arbitrary in respect to the order on strings. + if (WhichDataType(type).isEnum()) + return not_monotonic; + /// `toString` function is monotonous if the argument is Date or Date32 or DateTime or String, or non-negative numbers with the same number of symbols. if (checkDataTypes(type_ptr)) return positive; diff --git a/src/Functions/FunctionsDecimalArithmetics.cpp b/src/Functions/FunctionsDecimalArithmetics.cpp new file mode 100644 index 00000000000..f275f169914 --- /dev/null +++ b/src/Functions/FunctionsDecimalArithmetics.cpp @@ -0,0 +1,17 @@ +#include +#include + +namespace DB +{ +REGISTER_FUNCTION(DivideDecimals) +{ + factory.registerFunction>(Documentation( + "Decimal division with given precision. 
Slower than simple `divide`, but has controlled precision and no silent overflows")); +} + +REGISTER_FUNCTION(MultiplyDecimals) +{ + factory.registerFunction<FunctionsDecimalArithmetics<MultiplyDecimalsImpl>>(Documentation( + "Decimal multiplication with given precision. Slower than simple `multiply`, but has controlled precision and no silent overflows")); +} +} diff --git a/src/Functions/FunctionsDecimalArithmetics.h b/src/Functions/FunctionsDecimalArithmetics.h new file mode 100644 index 00000000000..9806d13ed30 --- /dev/null +++ b/src/Functions/FunctionsDecimalArithmetics.h @@ -0,0 +1,457 @@ +#pragma once +#include +#include + +#include +#include +#include +#include +#include +#include + +#include +#include +#include + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int DECIMAL_OVERFLOW; + extern const int ILLEGAL_COLUMN; + extern const int ILLEGAL_TYPE_OF_ARGUMENT; + extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; + extern const int ILLEGAL_DIVISION; +} + + +struct DecimalOpHelpers +{ + /* These functions perform the main arithmetic logic. + * Since intermediate results may not fit Decimal256 (e.g. 1e36, scale 10), + * we may not operate with Decimals. Later on this big number may be shrunk (e.g. result scale is 0 in the case above). + * That's why we need to store intermediate results in a flexible extendable storage (here we use std::vector) + * Here we operate on numbers using simple digit arithmetic. + * This is the reason these functions are slower than traditional ones. + * + * Here and below we use UInt8 for storing digits (the 0-9 range with a maximum carry of 9 definitely fits this) + */ + static std::vector<UInt8> multiply(const std::vector<UInt8> & num1, const std::vector<UInt8> & num2) + { + UInt16 const len1 = num1.size(); + UInt16 const len2 = num2.size(); + if (len1 == 0 || len2 == 0) + return {0}; + + std::vector<UInt8> result(len1 + len2, 0); + UInt16 i_n1 = 0; + UInt16 i_n2; + + for (Int32 i = len1 - 1; i >= 0; --i) + { + UInt16 carry = 0; + i_n2 = 0; + for (Int32 j = len2 - 1; j >= 0; --j) + { + if (unlikely(i_n1 + i_n2 >= len1 + len2)) + throw DB::Exception("Numeric overflow: result bigger than Decimal256", ErrorCodes::DECIMAL_OVERFLOW); + UInt16 sum = num1[i] * num2[j] + result[i_n1 + i_n2] + carry; + carry = sum / 10; + result[i_n1 + i_n2] = sum % 10; + ++i_n2; + } + + if (carry > 0) + { + if (unlikely(i_n1 + i_n2 >= len1 + len2)) + throw DB::Exception("Numeric overflow: result bigger than Decimal256", ErrorCodes::DECIMAL_OVERFLOW); + result[i_n1 + i_n2] += carry; + } + + ++i_n1; + } + + // The maximum Int32 value exceeds 2 billion, so we can safely use it for storing the array length + Int32 i = static_cast<Int32>(result.size() - 1); + + while (i >= 0 && result[i] == 0) + { + result.pop_back(); + --i; + } + if (i == -1) + return {0}; + + std::reverse(result.begin(), result.end()); + return result; + } + + static std::vector<UInt8> divide(const std::vector<UInt8> & number, const Int256 & divisor) + { + std::vector<UInt8> result; + const auto max_index = number.size() - 1; + + UInt16 idx = 0; + Int256 temp = 0; + + while (temp < divisor && max_index > idx) + { + temp = temp * 10 + number[idx]; + ++idx; + } + + if (unlikely(temp == 0)) + return {0}; + + while (max_index >= idx) + { + result.push_back(temp / divisor); + temp = (temp % divisor) * 10 + number[idx]; + ++idx; + } + result.push_back(temp / divisor); + + return result; + } + + static std::vector<UInt8> toDigits(Int256 x) + { + std::vector<UInt8> result; + if (x >= 10) + result = toDigits(x / 10); + + result.push_back(x % 10); + return result; + } + + static UInt256 fromDigits(const std::vector<UInt8> & digits) + { + Int256 result
= 0; + Int256 scale = 0; + for (auto i = digits.rbegin(); i != digits.rend(); ++i) + { + result += DecimalUtils::scaleMultiplier(scale) * (*i); + ++scale; + } + return result; + } +}; + + +struct DivideDecimalsImpl +{ + static constexpr auto name = "divideDecimal"; + + template + static inline Decimal256 + execute(FirstType a, SecondType b, UInt16 scale_a, UInt16 scale_b, UInt16 result_scale) + { + if (b.value == 0) + throw DB::Exception("Division by zero", ErrorCodes::ILLEGAL_DIVISION); + if (a.value == 0) + return Decimal256(0); + + Int256 sign_a = a.value < 0 ? -1 : 1; + Int256 sign_b = b.value < 0 ? -1 : 1; + + std::vector a_digits = DecimalOpHelpers::toDigits(a.value * sign_a); + + while (scale_a < scale_b + result_scale) + { + a_digits.push_back(0); + ++scale_a; + } + + while (scale_a > scale_b + result_scale && !a_digits.empty()) + { + a_digits.pop_back(); + --scale_a; + } + + if (a_digits.empty()) + return Decimal256(0); + + std::vector divided = DecimalOpHelpers::divide(a_digits, b.value * sign_b); + + if (divided.size() > DecimalUtils::max_precision) + throw DB::Exception("Numeric overflow: result bigger that Decimal256", ErrorCodes::DECIMAL_OVERFLOW); + return Decimal256(sign_a * sign_b * DecimalOpHelpers::fromDigits(divided)); + } +}; + + +struct MultiplyDecimalsImpl +{ + static constexpr auto name = "multiplyDecimal"; + + template + static inline Decimal256 + execute(FirstType a, SecondType b, UInt16 scale_a, UInt16 scale_b, UInt16 result_scale) + { + if (a.value == 0 || b.value == 0) + return Decimal256(0); + + Int256 sign_a = a.value < 0 ? -1 : 1; + Int256 sign_b = b.value < 0 ? -1 : 1; + + std::vector a_digits = DecimalOpHelpers::toDigits(a.value * sign_a); + std::vector b_digits = DecimalOpHelpers::toDigits(b.value * sign_b); + + std::vector multiplied = DecimalOpHelpers::multiply(a_digits, b_digits); + + UInt16 product_scale = scale_a + scale_b; + while (product_scale < result_scale) + { + multiplied.push_back(0); + ++product_scale; + } + + while (product_scale > result_scale&& !multiplied.empty()) + { + multiplied.pop_back(); + --product_scale; + } + + if (multiplied.empty()) + return Decimal256(0); + + if (multiplied.size() > DecimalUtils::max_precision) + throw DB::Exception("Numeric overflow: result bigger that Decimal256", ErrorCodes::DECIMAL_OVERFLOW); + + return Decimal256(sign_a * sign_b * DecimalOpHelpers::fromDigits(multiplied)); + } +}; + + +template +struct Processor +{ + const Transform transform; + + explicit Processor(Transform transform_) + : transform(std::move(transform_)) + {} + + template + void NO_INLINE + vectorConstant(const FirstArgVectorType & vec_first, const SecondArgType second_value, + PaddedPODArray & vec_to, UInt16 scale_a, UInt16 scale_b, UInt16 result_scale) const + { + size_t size = vec_first.size(); + vec_to.resize(size); + + for (size_t i = 0; i < size; ++i) + vec_to[i] = transform.execute(vec_first[i], second_value, scale_a, scale_b, result_scale); + } + + template + void NO_INLINE + vectorVector(const FirstArgVectorType & vec_first, const SecondArgVectorType & vec_second, + PaddedPODArray & vec_to, UInt16 scale_a, UInt16 scale_b, UInt16 result_scale) const + { + size_t size = vec_first.size(); + vec_to.resize(size); + + for (size_t i = 0; i < size; ++i) + vec_to[i] = transform.execute(vec_first[i], vec_second[i], scale_a, scale_b, result_scale); + } + + template + void NO_INLINE + constantVector(const FirstArgType & first_value, const SecondArgVectorType & vec_second, + PaddedPODArray & vec_to, UInt16 scale_a, UInt16 scale_b, UInt16 
result_scale) const + { + size_t size = vec_second.size(); + vec_to.resize(size); + + for (size_t i = 0; i < size; ++i) + vec_to[i] = transform.execute(first_value, vec_second[i], scale_a, scale_b, result_scale); + } +}; + + +template +struct DecimalArithmeticsImpl +{ + static ColumnPtr execute(Transform transform, const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type) + { + using FirstArgValueType = typename FirstArgType::FieldType; + using FirstArgColumnType = typename FirstArgType::ColumnType; + using SecondArgValueType = typename SecondArgType::FieldType; + using SecondArgColumnType = typename SecondArgType::ColumnType; + using ResultColumnType = typename ResultType::ColumnType; + + UInt16 scale_a = getDecimalScale(*arguments[0].type); + UInt16 scale_b = getDecimalScale(*arguments[1].type); + UInt16 result_scale = getDecimalScale(*result_type->getPtr()); + + auto op = Processor{std::move(transform)}; + + auto result_col = result_type->createColumn(); + auto col_to = assert_cast(result_col.get()); + + const auto * first_col = checkAndGetColumn(arguments[0].column.get()); + const auto * second_col = checkAndGetColumn(arguments[1].column.get()); + const auto * first_col_const = typeid_cast(arguments[0].column.get()); + const auto * second_col_const = typeid_cast(arguments[1].column.get()); + + if (first_col) + { + if (second_col_const) + op.vectorConstant(first_col->getData(), second_col_const->template getValue(), col_to->getData(), scale_a, scale_b, result_scale); + else + op.vectorVector(first_col->getData(), second_col->getData(), col_to->getData(), scale_a, scale_b, result_scale); + } + else if (first_col_const) + { + op.constantVector(first_col_const->template getValue(), second_col->getData(), col_to->getData(), scale_a, scale_b, result_scale); + } + else + { + throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Illegal column {} of first argument of function {}", + arguments[0].column->getName(), Transform::name); + } + + return result_col; + } +}; + + +template +class FunctionsDecimalArithmetics : public IFunction +{ +public: + static constexpr auto name = Transform::name; + static FunctionPtr create(ContextPtr) { return std::make_shared(); } + + String getName() const override + { + return name; + } + + bool isVariadic() const override { return true; } + size_t getNumberOfArguments() const override { return 0; } + bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return false; } + + DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override + { + if (arguments.size() != 2 && arguments.size() != 3) + throw Exception("Number of arguments for function " + getName() + " does not match: 2 or 3 expected", + ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); + + if (!isDecimal(arguments[0].type) || !isDecimal(arguments[1].type)) + throw Exception("Arguments for " + getName() + " function must be Decimal", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); + + UInt8 scale = std::max(getDecimalScale(*arguments[0].type->getPtr()), getDecimalScale(*arguments[1].type->getPtr())); + + if (arguments.size() == 3) + { + WhichDataType which_scale(arguments[2].type.get()); + + if (!which_scale.isUInt8()) + throw Exception( + "Illegal type " + arguments[2].type->getName() + " of third argument of function " + getName() + + ". 
Should be constant UInt8 from range[0, 76]", + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); + + const ColumnConst * scale_column = checkAndGetColumnConst(arguments[2].column.get()); + + if (!scale_column) + throw Exception( + "Illegal column of third argument of function " + getName() + ". Should be constant UInt8", + ErrorCodes::ILLEGAL_COLUMN); + + scale = scale_column->getValue(); + } + + /** + At compile time, result is unknown. We only know the Scale (number of fractional digits) at runtime. + Also nothing is known about size of whole part. + As in simple division/multiplication for decimals, we scale the result up, but is is explicit here and no downscale is performed. + It guarantees that result will have given scale and it can also be MANUALLY converted to other decimal types later. + **/ + if (scale > DecimalUtils::max_precision) + throw Exception("Illegal value of third argument of function " + this->getName() + ": must be integer in range [0, 76]", + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); + + return std::make_shared(DecimalUtils::max_precision, scale); + } + + bool useDefaultImplementationForConstants() const override { return true; } + ColumnNumbers getArgumentsThatAreAlwaysConstant() const override { return {2}; } + + ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t /*input_rows_count*/) const override + { + return resolveOverload(arguments, result_type); + } + +private: + //long resolver to call proper templated func + ColumnPtr resolveOverload(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type) const + { + WhichDataType which_dividend(arguments[0].type.get()); + WhichDataType which_divisor(arguments[1].type.get()); + if (which_dividend.isDecimal32()) + { + using DividendType = DataTypeDecimal32; + if (which_divisor.isDecimal32()) + return DecimalArithmeticsImpl::execute(Transform{}, arguments, result_type); + else if (which_divisor.isDecimal64()) + return DecimalArithmeticsImpl::execute(Transform{}, arguments, result_type); + else if (which_divisor.isDecimal128()) + return DecimalArithmeticsImpl::execute(Transform{}, arguments, result_type); + else if (which_divisor.isDecimal256()) + return DecimalArithmeticsImpl::execute(Transform{}, arguments, result_type); + } + + else if (which_dividend.isDecimal64()) + { + using DividendType = DataTypeDecimal64; + if (which_divisor.isDecimal32()) + return DecimalArithmeticsImpl::execute(Transform{}, arguments, result_type); + else if (which_divisor.isDecimal64()) + return DecimalArithmeticsImpl::execute(Transform{}, arguments, result_type); + else if (which_divisor.isDecimal128()) + return DecimalArithmeticsImpl::execute(Transform{}, arguments, result_type); + else if (which_divisor.isDecimal256()) + return DecimalArithmeticsImpl::execute(Transform{}, arguments, result_type); + + } + + else if (which_dividend.isDecimal128()) + { + using DividendType = DataTypeDecimal128; + if (which_divisor.isDecimal32()) + return DecimalArithmeticsImpl::execute(Transform{}, arguments, result_type); + else if (which_divisor.isDecimal64()) + return DecimalArithmeticsImpl::execute(Transform{}, arguments, result_type); + else if (which_divisor.isDecimal128()) + return DecimalArithmeticsImpl::execute(Transform{}, arguments, result_type); + else if (which_divisor.isDecimal256()) + return DecimalArithmeticsImpl::execute(Transform{}, arguments, result_type); + + } + + else if (which_dividend.isDecimal256()) + { + using DividendType = DataTypeDecimal256; + if (which_divisor.isDecimal32()) + 
return DecimalArithmeticsImpl::execute(Transform{}, arguments, result_type); + else if (which_divisor.isDecimal64()) + return DecimalArithmeticsImpl::execute(Transform{}, arguments, result_type); + else if (which_divisor.isDecimal128()) + return DecimalArithmeticsImpl::execute(Transform{}, arguments, result_type); + else if (which_divisor.isDecimal256()) + return DecimalArithmeticsImpl::execute(Transform{}, arguments, result_type); + + } + + // the compiler is happy now + return nullptr; + } +}; + +} + diff --git a/src/Functions/FunctionsHashing.cpp b/src/Functions/FunctionsHashing.cpp index fb631deb4b1..8f616b0be94 100644 --- a/src/Functions/FunctionsHashing.cpp +++ b/src/Functions/FunctionsHashing.cpp @@ -39,6 +39,13 @@ REGISTER_FUNCTION(Hashing) factory.registerFunction(); factory.registerFunction(); + factory.registerFunction( + { + "Calculates value of XXH3 64-bit hash function. Refer to https://github.com/Cyan4973/xxHash for detailed documentation.", + Documentation::Examples{{"hash", "SELECT xxh3('ClickHouse')"}}, + Documentation::Categories{"Hash"} + }, + FunctionFactory::CaseSensitive); factory.registerFunction(); diff --git a/src/Functions/FunctionsHashing.h b/src/Functions/FunctionsHashing.h index ec0a489471b..ee5f3ea86b5 100644 --- a/src/Functions/FunctionsHashing.h +++ b/src/Functions/FunctionsHashing.h @@ -3,12 +3,18 @@ #include #include #include +#include #include #include -#include #include "config.h" +#ifdef __clang__ +# pragma clang diagnostic push +# pragma clang diagnostic ignored "-Wused-but-marked-unused" +#endif +#include + #if USE_BLAKE3 # include #endif @@ -17,7 +23,6 @@ #include #include #include -#include #if USE_SSL # include @@ -588,7 +593,7 @@ struct ImplXxHash32 static constexpr auto name = "xxHash32"; using ReturnType = UInt32; - static auto apply(const char * s, const size_t len) { return XXH32(s, len, 0); } + static auto apply(const char * s, const size_t len) { return XXH_INLINE_XXH32(s, len, 0); } /** * With current implementation with more than 1 arguments it will give the results * non-reproducible from outside of CH. @@ -609,7 +614,24 @@ struct ImplXxHash64 using ReturnType = UInt64; using uint128_t = CityHash_v1_0_2::uint128; - static auto apply(const char * s, const size_t len) { return XXH64(s, len, 0); } + static auto apply(const char * s, const size_t len) { return XXH_INLINE_XXH64(s, len, 0); } + + /* + With current implementation with more than 1 arguments it will give the results + non-reproducible from outside of CH. (see comment on ImplXxHash32). 
+ */ + static auto combineHashes(UInt64 h1, UInt64 h2) { return CityHash_v1_0_2::Hash128to64(uint128_t(h1, h2)); } + + static constexpr bool use_int_hash_for_pods = false; +}; + +struct ImplXXH3 +{ + static constexpr auto name = "xxh3"; + using ReturnType = UInt64; + using uint128_t = CityHash_v1_0_2::uint128; + + static auto apply(const char * s, const size_t len) { return XXH_INLINE_XXH3_64bits(s, len); } /* With current implementation with more than 1 arguments it will give the results @@ -1508,7 +1530,12 @@ using FunctionHiveHash = FunctionAnyHash; using FunctionXxHash32 = FunctionAnyHash; using FunctionXxHash64 = FunctionAnyHash; +using FunctionXXH3 = FunctionAnyHash; using FunctionWyHash64 = FunctionAnyHash; using FunctionBLAKE3 = FunctionStringHashFixedString; } + +#ifdef __clang__ +# pragma clang diagnostic pop +#endif diff --git a/src/Interpreters/AsynchronousInsertQueue.cpp b/src/Interpreters/AsynchronousInsertQueue.cpp index bf85affcb90..a61f4bdc530 100644 --- a/src/Interpreters/AsynchronousInsertQueue.cpp +++ b/src/Interpreters/AsynchronousInsertQueue.cpp @@ -48,15 +48,22 @@ namespace ErrorCodes extern const int TIMEOUT_EXCEEDED; extern const int UNKNOWN_EXCEPTION; extern const int UNKNOWN_FORMAT; + extern const int BAD_ARGUMENTS; } AsynchronousInsertQueue::InsertQuery::InsertQuery(const ASTPtr & query_, const Settings & settings_) - : query(query_->clone()), settings(settings_) + : query(query_->clone()) + , query_str(queryToString(query)) + , settings(settings_) + , hash(calculateHash()) { } AsynchronousInsertQueue::InsertQuery::InsertQuery(const InsertQuery & other) - : query(other.query->clone()), settings(other.settings) + : query(other.query->clone()) + , query_str(other.query_str) + , settings(other.settings) + , hash(other.hash) { } @@ -66,29 +73,33 @@ AsynchronousInsertQueue::InsertQuery::operator=(const InsertQuery & other) if (this != &other) { query = other.query->clone(); + query_str = other.query_str; settings = other.settings; + hash = other.hash; } return *this; } -UInt64 AsynchronousInsertQueue::InsertQuery::Hash::operator()(const InsertQuery & insert_query) const +UInt128 AsynchronousInsertQueue::InsertQuery::calculateHash() const { - SipHash hash; - insert_query.query->updateTreeHash(hash); + SipHash siphash; + query->updateTreeHash(siphash); - for (const auto & setting : insert_query.settings.allChanged()) + for (const auto & setting : settings.allChanged()) { - hash.update(setting.getName()); - applyVisitor(FieldVisitorHash(hash), setting.getValue()); + siphash.update(setting.getName()); + applyVisitor(FieldVisitorHash(siphash), setting.getValue()); } - return hash.get64(); + UInt128 res; + siphash.get128(res); + return res; } bool AsynchronousInsertQueue::InsertQuery::operator==(const InsertQuery & other) const { - return queryToString(query) == queryToString(other.query) && settings == other.settings; + return query_str == other.query_str && settings == other.settings; } AsynchronousInsertQueue::InsertData::Entry::Entry(String && bytes_, String && query_id_) @@ -100,43 +111,31 @@ AsynchronousInsertQueue::InsertData::Entry::Entry(String && bytes_, String && qu void AsynchronousInsertQueue::InsertData::Entry::finish(std::exception_ptr exception_) { - std::lock_guard lock(mutex); - finished = true; + if (finished.exchange(true)) + return; + if (exception_) + { + promise.set_exception(exception_); ProfileEvents::increment(ProfileEvents::FailedAsyncInsertQuery, 1); - exception = exception_; - cv.notify_all(); + } + else + { + promise.set_value(); + } } -bool 
AsynchronousInsertQueue::InsertData::Entry::wait(const Milliseconds & timeout) const -{ - std::unique_lock lock(mutex); - return cv.wait_for(lock, timeout, [&] { return finished; }); -} - -bool AsynchronousInsertQueue::InsertData::Entry::isFinished() const -{ - std::lock_guard lock(mutex); - return finished; -} - -std::exception_ptr AsynchronousInsertQueue::InsertData::Entry::getException() const -{ - std::lock_guard lock(mutex); - return exception; -} - - -AsynchronousInsertQueue::AsynchronousInsertQueue(ContextPtr context_, size_t pool_size, Milliseconds cleanup_timeout_) +AsynchronousInsertQueue::AsynchronousInsertQueue(ContextPtr context_, size_t pool_size_) : WithContext(context_) - , cleanup_timeout(cleanup_timeout_) + , pool_size(pool_size_) + , queue_shards(pool_size) , pool(pool_size) - , dump_by_first_update_thread(&AsynchronousInsertQueue::busyCheck, this) - , cleanup_thread(&AsynchronousInsertQueue::cleanup, this) { - using namespace std::chrono; + if (!pool_size) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "pool_size cannot be zero"); - assert(pool_size); + for (size_t i = 0; i < pool_size; ++i) + dump_by_first_update_threads.emplace_back([this, i] { processBatchDeadlines(i); }); } AsynchronousInsertQueue::~AsynchronousInsertQueue() @@ -144,34 +143,31 @@ AsynchronousInsertQueue::~AsynchronousInsertQueue() /// TODO: add a setting for graceful shutdown. LOG_TRACE(log, "Shutting down the asynchronous insertion queue"); - shutdown = true; - { - std::lock_guard lock(deadline_mutex); - are_tasks_available.notify_one(); - } - { - std::lock_guard lock(cleanup_mutex); - cleanup_can_run.notify_one(); - } - assert(dump_by_first_update_thread.joinable()); - dump_by_first_update_thread.join(); + for (size_t i = 0; i < pool_size; ++i) + { + auto & shard = queue_shards[i]; - assert(cleanup_thread.joinable()); - cleanup_thread.join(); + shard.are_tasks_available.notify_one(); + assert(dump_by_first_update_threads[i].joinable()); + dump_by_first_update_threads[i].join(); + + { + std::lock_guard lock(shard.mutex); + + for (auto & [_, elem] : shard.queue) + { + for (const auto & entry : elem.data->entries) + { + entry->finish(std::make_exception_ptr(Exception( + ErrorCodes::TIMEOUT_EXCEEDED, "Wait for async insert timeout exceeded)"))); + } + } + } + } pool.wait(); - - std::lock_guard lock(currently_processing_mutex); - for (const auto & [_, entry] : currently_processing_queries) - { - if (!entry->isFinished()) - entry->finish(std::make_exception_ptr(Exception( - ErrorCodes::TIMEOUT_EXCEEDED, - "Wait for async insert timeout exceeded)"))); - } - LOG_TRACE(log, "Asynchronous insertion queue finished"); } @@ -185,7 +181,7 @@ void AsynchronousInsertQueue::scheduleDataProcessingJob(const InsertQuery & key, }); } -void AsynchronousInsertQueue::push(ASTPtr query, ContextPtr query_context) +std::future AsynchronousInsertQueue::push(ASTPtr query, ContextPtr query_context) { query = query->clone(); const auto & settings = query_context->getSettingsRef(); @@ -214,97 +210,77 @@ void AsynchronousInsertQueue::push(ASTPtr query, ContextPtr query_context) quota->used(QuotaType::WRITTEN_BYTES, bytes.size()); auto entry = std::make_shared(std::move(bytes), query_context->getCurrentQueryId()); + InsertQuery key{query, settings}; + InsertDataPtr data_to_process; + std::future insert_future; + + auto shard_num = key.hash % pool_size; + auto & shard = queue_shards[shard_num]; { - /// Firstly try to get entry from queue without exclusive lock. 
- std::shared_lock read_lock(rwlock); - if (auto it = queue.find(key); it != queue.end()) + std::lock_guard lock(shard.mutex); + + auto [it, inserted] = shard.iterators.try_emplace(key.hash); + if (inserted) { - pushImpl(std::move(entry), it); - return; + auto now = std::chrono::steady_clock::now(); + auto timeout = now + Milliseconds{key.settings.async_insert_busy_timeout_ms}; + it->second = shard.queue.emplace(timeout, Container{key, std::make_unique()}).first; } + + auto queue_it = it->second; + auto & data = queue_it->second.data; + size_t entry_data_size = entry->bytes.size(); + + assert(data); + data->size_in_bytes += entry_data_size; + data->entries.emplace_back(entry); + insert_future = entry->getFuture(); + + LOG_TRACE(log, "Have {} pending inserts with total {} bytes of data for query '{}'", + data->entries.size(), data->size_in_bytes, key.query_str); + + /// Here we check whether we hit the limit on maximum data size in the buffer. + /// And use setting from query context. + /// It works, because queries with the same set of settings are already grouped together. + if (data->size_in_bytes > key.settings.async_insert_max_data_size) + { + data_to_process = std::move(data); + shard.iterators.erase(it); + shard.queue.erase(queue_it); + } + + CurrentMetrics::add(CurrentMetrics::PendingAsyncInsert); + ProfileEvents::increment(ProfileEvents::AsyncInsertQuery); + ProfileEvents::increment(ProfileEvents::AsyncInsertBytes, entry_data_size); } - std::lock_guard write_lock(rwlock); - auto it = queue.emplace(key, std::make_shared()).first; - pushImpl(std::move(entry), it); + if (data_to_process) + scheduleDataProcessingJob(key, std::move(data_to_process), getContext()); + else + shard.are_tasks_available.notify_one(); + + return insert_future; } -void AsynchronousInsertQueue::pushImpl(InsertData::EntryPtr entry, QueueIterator it) +void AsynchronousInsertQueue::processBatchDeadlines(size_t shard_num) { - auto & [data_mutex, data] = *it->second; - std::lock_guard data_lock(data_mutex); + auto & shard = queue_shards[shard_num]; - if (!data) - { - auto now = std::chrono::steady_clock::now(); - data = std::make_unique(now); - - std::lock_guard lock(deadline_mutex); - deadline_queue.insert({now + Milliseconds{it->first.settings.async_insert_busy_timeout_ms}, it}); - are_tasks_available.notify_one(); - } - - size_t entry_data_size = entry->bytes.size(); - - data->size += entry_data_size; - data->entries.emplace_back(entry); - - { - std::lock_guard currently_processing_lock(currently_processing_mutex); - currently_processing_queries.emplace(entry->query_id, entry); - } - - LOG_TRACE(log, "Have {} pending inserts with total {} bytes of data for query '{}'", - data->entries.size(), data->size, queryToString(it->first.query)); - - /// Here we check whether we hit the limit on maximum data size in the buffer. - /// And use setting from query context! - /// It works, because queries with the same set of settings are already grouped together. 
- if (data->size > it->first.settings.async_insert_max_data_size) - scheduleDataProcessingJob(it->first, std::move(data), getContext()); - - CurrentMetrics::add(CurrentMetrics::PendingAsyncInsert); - ProfileEvents::increment(ProfileEvents::AsyncInsertQuery); - ProfileEvents::increment(ProfileEvents::AsyncInsertBytes, entry_data_size); -} - -void AsynchronousInsertQueue::waitForProcessingQuery(const String & query_id, const Milliseconds & timeout) -{ - InsertData::EntryPtr entry; - - { - std::lock_guard lock(currently_processing_mutex); - auto it = currently_processing_queries.find(query_id); - if (it == currently_processing_queries.end()) - return; - - entry = it->second; - } - - bool finished = entry->wait(timeout); - - if (!finished) - throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Wait for async insert timeout ({} ms) exceeded)", timeout.count()); - - if (auto exception = entry->getException()) - std::rethrow_exception(exception); -} - -void AsynchronousInsertQueue::busyCheck() -{ while (!shutdown) { - std::vector entries_to_flush; + std::vector entries_to_flush; { - std::unique_lock deadline_lock(deadline_mutex); - are_tasks_available.wait_for(deadline_lock, Milliseconds(getContext()->getSettingsRef().async_insert_busy_timeout_ms), [this]() + std::unique_lock lock(shard.mutex); + + shard.are_tasks_available.wait_for(lock, + Milliseconds(getContext()->getSettingsRef().async_insert_busy_timeout_ms), [&shard, this] { if (shutdown) return true; - if (!deadline_queue.empty() && deadline_queue.begin()->first < std::chrono::steady_clock::now()) + if (!shard.queue.empty() && shard.queue.begin()->first < std::chrono::steady_clock::now()) return true; return false; @@ -317,91 +293,22 @@ void AsynchronousInsertQueue::busyCheck() while (true) { - if (deadline_queue.empty() || deadline_queue.begin()->first > now) + if (shard.queue.empty() || shard.queue.begin()->first > now) break; - entries_to_flush.emplace_back(deadline_queue.begin()->second); - deadline_queue.erase(deadline_queue.begin()); + auto it = shard.queue.begin(); + shard.iterators.erase(it->second.key.hash); + + entries_to_flush.emplace_back(std::move(it->second)); + shard.queue.erase(it); } } - std::shared_lock read_lock(rwlock); for (auto & entry : entries_to_flush) - { - auto & [key, elem] = *entry; - std::lock_guard data_lock(elem->mutex); - if (!elem->data) - continue; - - scheduleDataProcessingJob(key, std::move(elem->data), getContext()); - } + scheduleDataProcessingJob(entry.key, std::move(entry.data), getContext()); } } -void AsynchronousInsertQueue::cleanup() -{ - while (true) - { - { - std::unique_lock cleanup_lock(cleanup_mutex); - cleanup_can_run.wait_for(cleanup_lock, Milliseconds(cleanup_timeout), [this]() -> bool { return shutdown; }); - - if (shutdown) - return; - } - - std::vector keys_to_remove; - - { - std::shared_lock read_lock(rwlock); - - for (auto & [key, elem] : queue) - { - std::lock_guard data_lock(elem->mutex); - if (!elem->data) - keys_to_remove.push_back(key); - } - } - - if (!keys_to_remove.empty()) - { - std::lock_guard write_lock(rwlock); - size_t total_removed = 0; - - for (const auto & key : keys_to_remove) - { - auto it = queue.find(key); - if (it != queue.end() && !it->second->data) - { - queue.erase(it); - ++total_removed; - } - } - - if (total_removed) - LOG_TRACE(log, "Removed stale entries for {} queries from asynchronous insertion queue", total_removed); - } - - { - std::vector ids_to_remove; - std::lock_guard lock(currently_processing_mutex); - - for (const auto & [query_id, entry] : 
currently_processing_queries) - if (entry->isFinished()) - ids_to_remove.push_back(query_id); - - if (!ids_to_remove.empty()) - { - for (const auto & id : ids_to_remove) - currently_processing_queries.erase(id); - - LOG_TRACE(log, "Removed {} finished entries from asynchronous insertion queue", ids_to_remove.size()); - } - } - } -} - - static void appendElementsToLogSafe( AsynchronousInsertLog & log, std::vector elements, @@ -464,7 +371,7 @@ try { current_exception = e.displayText(); LOG_ERROR(log, "Failed parsing for query '{}' with query id {}. {}", - queryToString(key.query), current_entry->query_id, current_exception); + key.query_str, current_entry->query_id, current_exception); for (const auto & column : result_columns) if (column->size() > total_rows) @@ -546,7 +453,7 @@ try completed_executor.execute(); LOG_INFO(log, "Flushed {} rows, {} bytes for query '{}'", - total_rows, total_bytes, queryToString(key.query)); + total_rows, total_bytes, key.query_str); } catch (...) { diff --git a/src/Interpreters/AsynchronousInsertQueue.h b/src/Interpreters/AsynchronousInsertQueue.h index fcf4e3d98d2..71a3bce235e 100644 --- a/src/Interpreters/AsynchronousInsertQueue.h +++ b/src/Interpreters/AsynchronousInsertQueue.h @@ -4,10 +4,7 @@ #include #include #include - -#include -#include - +#include namespace DB { @@ -19,25 +16,29 @@ class AsynchronousInsertQueue : public WithContext public: using Milliseconds = std::chrono::milliseconds; - AsynchronousInsertQueue(ContextPtr context_, size_t pool_size, Milliseconds cleanup_timeout); + AsynchronousInsertQueue(ContextPtr context_, size_t pool_size_); ~AsynchronousInsertQueue(); - void push(ASTPtr query, ContextPtr query_context); - void waitForProcessingQuery(const String & query_id, const Milliseconds & timeout); + std::future push(ASTPtr query, ContextPtr query_context); + size_t getPoolSize() const { return pool_size; } private: struct InsertQuery { + public: ASTPtr query; + String query_str; Settings settings; + UInt128 hash; InsertQuery(const ASTPtr & query_, const Settings & settings_); InsertQuery(const InsertQuery & other); InsertQuery & operator=(const InsertQuery & other); - bool operator==(const InsertQuery & other) const; - struct Hash { UInt64 operator()(const InsertQuery & insert_query) const; }; + + private: + UInt128 calculateHash() const; }; struct InsertData @@ -47,109 +48,84 @@ private: public: const String bytes; const String query_id; - std::chrono::time_point create_time; + const std::chrono::time_point create_time; Entry(String && bytes_, String && query_id_); void finish(std::exception_ptr exception_ = nullptr); - bool wait(const Milliseconds & timeout) const; - bool isFinished() const; - std::exception_ptr getException() const; + std::future getFuture() { return promise.get_future(); } + bool isFinished() const { return finished; } private: - mutable std::mutex mutex; - mutable std::condition_variable cv; - - bool finished = false; - std::exception_ptr exception; + std::promise promise; + std::atomic_bool finished = false; }; - explicit InsertData(std::chrono::steady_clock::time_point now) - : first_update(now) - {} - using EntryPtr = std::shared_ptr; std::list entries; - size_t size = 0; - - /// Timestamp of the first insert into queue, or after the last queue dump. - /// Used to detect for how long the queue is active, so we can dump it by timer. - std::chrono::time_point first_update; + size_t size_in_bytes = 0; }; using InsertDataPtr = std::unique_ptr; - /// A separate container, that holds a data and a mutex for it. 
- /// When it's needed to process current chunk of data, it can be moved for processing - /// and new data can be recreated without holding a lock during processing. struct Container { - std::mutex mutex; + InsertQuery key; InsertDataPtr data; }; - using Queue = std::unordered_map, InsertQuery::Hash>; - using QueueIterator = Queue::iterator; /// Ordered container - using DeadlineQueue = std::map; + /// Key is a timestamp of the first insert into batch. + /// Used to detect for how long the batch is active, so we can dump it by timer. + using Queue = std::map; + using QueueIterator = Queue::iterator; + using QueueIteratorByKey = std::unordered_map; + struct QueueShard + { + mutable std::mutex mutex; + mutable std::condition_variable are_tasks_available; - mutable std::shared_mutex rwlock; - Queue queue; + Queue queue; + QueueIteratorByKey iterators; + }; - /// This is needed only for using inside cleanup() function and correct signaling about shutdown - mutable std::mutex cleanup_mutex; - mutable std::condition_variable cleanup_can_run; - - mutable std::mutex deadline_mutex; - mutable std::condition_variable are_tasks_available; - DeadlineQueue deadline_queue; - - using QueryIdToEntry = std::unordered_map; - mutable std::mutex currently_processing_mutex; - QueryIdToEntry currently_processing_queries; + const size_t pool_size; + std::vector queue_shards; /// Logic and events behind queue are as follows: - /// - busy_timeout: if queue is active for too long and there are a lot of rapid inserts, then we dump the data, so it doesn't - /// grow for a long period of time and users will be able to select new data in deterministic manner. - /// - stale_timeout: if queue is stale for too long, then we dump the data too, so that users will be able to select the last - /// piece of inserted data. + /// - async_insert_busy_timeout_ms: + /// if queue is active for too long and there are a lot of rapid inserts, then we dump the data, so it doesn't + /// grow for a long period of time and users will be able to select new data in deterministic manner. /// - /// During processing incoming INSERT queries we can also check whether the maximum size of data in buffer is reached (async_insert_max_data_size setting) - /// If so, then again we dump the data. - - const Milliseconds cleanup_timeout; + /// During processing incoming INSERT queries we can also check whether the maximum size of data in buffer is reached + /// (async_insert_max_data_size setting). If so, then again we dump the data. std::atomic shutdown{false}; - ThreadPool pool; /// dump the data only inside this pool. - ThreadFromGlobalPool dump_by_first_update_thread; /// uses busy_timeout and busyCheck() - ThreadFromGlobalPool cleanup_thread; /// uses busy_timeout and cleanup() + /// Dump the data only inside this pool. + ThreadPool pool; + + /// Uses async_insert_busy_timeout_ms and processBatchDeadlines() + std::vector dump_by_first_update_threads; Poco::Logger * log = &Poco::Logger::get("AsynchronousInsertQueue"); - void busyCheck(); - void cleanup(); - - /// Should be called with shared or exclusively locked 'rwlock'. 
- void pushImpl(InsertData::EntryPtr entry, QueueIterator it); - + void processBatchDeadlines(size_t shard_num); void scheduleDataProcessingJob(const InsertQuery & key, InsertDataPtr data, ContextPtr global_context); + static void processData(InsertQuery key, InsertDataPtr data, ContextPtr global_context); template static void finishWithException(const ASTPtr & query, const std::list & entries, const E & exception); - /// @param timeout - time to wait - /// @return true if shutdown requested - bool waitForShutdown(const Milliseconds & timeout); - public: - auto getQueueLocked() const + auto getQueueLocked(size_t shard_num) const { - std::shared_lock lock(rwlock); - return std::make_pair(std::ref(queue), std::move(lock)); + auto & shard = queue_shards[shard_num]; + std::unique_lock lock(shard.mutex); + return std::make_pair(std::ref(shard.queue), std::move(lock)); } }; diff --git a/src/Interpreters/executeQuery.cpp b/src/Interpreters/executeQuery.cpp index b44db316f90..c2680e27444 100644 --- a/src/Interpreters/executeQuery.cpp +++ b/src/Interpreters/executeQuery.cpp @@ -592,13 +592,12 @@ static std::tuple executeQueryImpl( quota->checkExceeded(QuotaType::ERRORS); } - queue->push(ast, context); + auto insert_future = queue->push(ast, context); if (settings.wait_for_async_insert) { auto timeout = settings.wait_for_async_insert_timeout.totalMilliseconds(); - auto query_id = context->getCurrentQueryId(); - auto source = std::make_shared(query_id, timeout, *queue); + auto source = std::make_shared(std::move(insert_future), timeout); res.pipeline = QueryPipeline(Pipe(std::move(source))); } diff --git a/src/Processors/Formats/Impl/ArrowColumnToCHColumn.cpp b/src/Processors/Formats/Impl/ArrowColumnToCHColumn.cpp index e9b01ec7dda..8b546f48116 100644 --- a/src/Processors/Formats/Impl/ArrowColumnToCHColumn.cpp +++ b/src/Processors/Formats/Impl/ArrowColumnToCHColumn.cpp @@ -324,14 +324,31 @@ static ColumnPtr readOffsetsFromArrowListColumn(std::shared_ptr &>(*offsets_column).getData(); offsets_data.reserve(arrow_column->length()); + uint64_t start_offset = 0u; + for (int chunk_i = 0, num_chunks = arrow_column->num_chunks(); chunk_i < num_chunks; ++chunk_i) { arrow::ListArray & list_chunk = dynamic_cast(*(arrow_column->chunk(chunk_i))); auto arrow_offsets_array = list_chunk.offsets(); auto & arrow_offsets = dynamic_cast(*arrow_offsets_array); - auto start = offsets_data.back(); + + /* + * It seems like arrow::ListArray::values() (nested column data) might or might not be shared across chunks. + * When it is shared, the offsets will be monotonically increasing. Otherwise, the offsets will be zero based. + * In order to account for both cases, the starting offset is updated whenever a zero-based offset is found. 
+ * More info can be found in: https://lists.apache.org/thread/rrwfb9zo2dc58dhd9rblf20xd7wmy7jm and + * https://github.com/ClickHouse/ClickHouse/pull/43297 + * */ + if (list_chunk.offset() == 0) + { + start_offset = offsets_data.back(); + } + for (int64_t i = 1; i < arrow_offsets.length(); ++i) - offsets_data.emplace_back(start + arrow_offsets.Value(i)); + { + auto offset = arrow_offsets.Value(i); + offsets_data.emplace_back(start_offset + offset); + } } return offsets_column; } @@ -467,8 +484,23 @@ static std::shared_ptr getNestedArrowColumn(std::shared_ptr for (int chunk_i = 0, num_chunks = arrow_column->num_chunks(); chunk_i < num_chunks; ++chunk_i) { arrow::ListArray & list_chunk = dynamic_cast(*(arrow_column->chunk(chunk_i))); - std::shared_ptr chunk = list_chunk.values(); - array_vector.emplace_back(std::move(chunk)); + + /* + * It seems like arrow::ListArray::values() (nested column data) might or might not be shared across chunks. + * Therefore, simply appending arrow::ListArray::values() could lead to duplicated data to be appended. + * To properly handle this, arrow::ListArray::values() needs to be sliced based on the chunk offsets. + * arrow::ListArray::Flatten does that. More info on: https://lists.apache.org/thread/rrwfb9zo2dc58dhd9rblf20xd7wmy7jm and + * https://github.com/ClickHouse/ClickHouse/pull/43297 + * */ + auto flatten_result = list_chunk.Flatten(); + if (flatten_result.ok()) + { + array_vector.emplace_back(flatten_result.ValueOrDie()); + } + else + { + throw Exception(ErrorCodes::INCORRECT_DATA, "Failed to flatten chunk '{}' of column of type '{}' ", chunk_i, arrow_column->type()->id()); + } } return std::make_shared(array_vector); } diff --git a/src/Processors/Sources/WaitForAsyncInsertSource.h b/src/Processors/Sources/WaitForAsyncInsertSource.h index 40871a59125..1029c164941 100644 --- a/src/Processors/Sources/WaitForAsyncInsertSource.h +++ b/src/Processors/Sources/WaitForAsyncInsertSource.h @@ -6,18 +6,24 @@ namespace DB { +namespace ErrorCodes +{ + extern const int TIMEOUT_EXCEEDED; + extern const int LOGICAL_ERROR; +} + /// Source, that allow to wait until processing of /// asynchronous insert for specified query_id will be finished. 
class WaitForAsyncInsertSource : public ISource, WithContext { public: WaitForAsyncInsertSource( - const String & query_id_, size_t timeout_ms_, AsynchronousInsertQueue & queue_) + std::future insert_future_, size_t timeout_ms_) : ISource(Block()) - , query_id(query_id_) + , insert_future(std::move(insert_future_)) , timeout_ms(timeout_ms_) - , queue(queue_) { + assert(insert_future.valid()); } String getName() const override { return "WaitForAsyncInsert"; } @@ -25,14 +31,20 @@ public: protected: Chunk generate() override { - queue.waitForProcessingQuery(query_id, std::chrono::milliseconds(timeout_ms)); + auto status = insert_future.wait_for(std::chrono::milliseconds(timeout_ms)); + if (status == std::future_status::deferred) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: got future in deferred state"); + + if (status == std::future_status::timeout) + throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Wait for async insert timeout ({} ms) exceeded)", timeout_ms); + + insert_future.get(); return Chunk(); } private: - String query_id; + std::future insert_future; size_t timeout_ms; - AsynchronousInsertQueue & queue; }; } diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 8a1e4ebaec8..d8e24a655ec 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -130,7 +130,7 @@ namespace ErrorCodes extern const int NO_ZOOKEEPER; extern const int INCORRECT_DATA; extern const int INCOMPATIBLE_COLUMNS; - extern const int REPLICA_IS_ALREADY_EXIST; + extern const int REPLICA_ALREADY_EXISTS; extern const int NO_REPLICA_HAS_PART; extern const int LOGICAL_ERROR; extern const int TOO_MANY_UNEXPECTED_DATA_PARTS; @@ -778,7 +778,7 @@ bool StorageReplicatedMergeTree::createTableIfNotExists(const StorageMetadataPtr /// Do not use LOGICAL_ERROR code, because it may happen if user has specified wrong zookeeper_path throw Exception("Cannot create table, because it is created concurrently every time " "or because of wrong zookeeper_path " - "or because of logical error", ErrorCodes::REPLICA_IS_ALREADY_EXIST); + "or because of logical error", ErrorCodes::REPLICA_ALREADY_EXISTS); } void StorageReplicatedMergeTree::createReplica(const StorageMetadataPtr & metadata_snapshot) @@ -842,7 +842,7 @@ void StorageReplicatedMergeTree::createReplica(const StorageMetadataPtr & metada switch (code) { case Coordination::Error::ZNODEEXISTS: - throw Exception(ErrorCodes::REPLICA_IS_ALREADY_EXIST, "Replica {} already exists", replica_path); + throw Exception(ErrorCodes::REPLICA_ALREADY_EXISTS, "Replica {} already exists", replica_path); case Coordination::Error::ZBADVERSION: LOG_ERROR(log, "Retrying createReplica(), because some other replicas were created at the same time"); break; diff --git a/src/Storages/System/StorageSystemAsynchronousInserts.cpp b/src/Storages/System/StorageSystemAsynchronousInserts.cpp index 5ebdb828c34..15258ccfd7f 100644 --- a/src/Storages/System/StorageSystemAsynchronousInserts.cpp +++ b/src/Storages/System/StorageSystemAsynchronousInserts.cpp @@ -27,8 +27,6 @@ NamesAndTypesList StorageSystemAsynchronousInserts::getNamesAndTypes() {"total_bytes", std::make_shared()}, {"entries.query_id", std::make_shared(std::make_shared())}, {"entries.bytes", std::make_shared(std::make_shared())}, - {"entries.finished", std::make_shared(std::make_shared())}, - {"entries.exception", std::make_shared(std::make_shared())}, }; } @@ -40,78 +38,56 @@ void StorageSystemAsynchronousInserts::fillData(MutableColumns & 
res_columns, Co if (!insert_queue) return; - auto [queue, queue_lock] = insert_queue->getQueueLocked(); - for (const auto & [key, elem] : queue) + for (size_t shard_num = 0; shard_num < insert_queue->getPoolSize(); ++shard_num) { - std::lock_guard elem_lock(elem->mutex); + auto [queue, queue_lock] = insert_queue->getQueueLocked(shard_num); - if (!elem->data) - continue; - - auto time_in_microseconds = [](const time_point & timestamp) + for (const auto & [first_update, elem] : queue) { - auto time_diff = duration_cast(steady_clock::now() - timestamp); - auto time_us = (system_clock::now() - time_diff).time_since_epoch().count(); + const auto & [key, data] = elem; - DecimalUtils::DecimalComponents components{time_us / 1'000'000, time_us % 1'000'000}; - return DecimalField(DecimalUtils::decimalFromComponents(components, TIME_SCALE), TIME_SCALE); - }; - - const auto & insert_query = key.query->as(); - size_t i = 0; - - res_columns[i++]->insert(queryToString(insert_query)); - - /// If query is "INSERT INTO FUNCTION" then table_id is empty. - if (insert_query.table_id) - { - res_columns[i++]->insert(insert_query.table_id.getDatabaseName()); - res_columns[i++]->insert(insert_query.table_id.getTableName()); - } - else - { - res_columns[i++]->insertDefault(); - res_columns[i++]->insertDefault(); - } - - res_columns[i++]->insert(insert_query.format); - res_columns[i++]->insert(time_in_microseconds(elem->data->first_update)); - res_columns[i++]->insert(elem->data->size); - - Array arr_query_id; - Array arr_bytes; - Array arr_finished; - Array arr_exception; - - for (const auto & entry : elem->data->entries) - { - arr_query_id.push_back(entry->query_id); - arr_bytes.push_back(entry->bytes.size()); - arr_finished.push_back(entry->isFinished()); - - if (auto exception = entry->getException()) + auto time_in_microseconds = [](const time_point & timestamp) { - try - { - std::rethrow_exception(exception); - } - catch (const Exception & e) - { - arr_exception.push_back(e.displayText()); - } - catch (...) - { - arr_exception.push_back("Unknown exception"); - } + auto time_diff = duration_cast(steady_clock::now() - timestamp); + auto time_us = (system_clock::now() - time_diff).time_since_epoch().count(); + + DecimalUtils::DecimalComponents components{time_us / 1'000'000, time_us % 1'000'000}; + return DecimalField(DecimalUtils::decimalFromComponents(components, TIME_SCALE), TIME_SCALE); + }; + + const auto & insert_query = key.query->as(); + size_t i = 0; + + res_columns[i++]->insert(queryToString(insert_query)); + + /// If query is "INSERT INTO FUNCTION" then table_id is empty. 
+ if (insert_query.table_id) + { + res_columns[i++]->insert(insert_query.table_id.getDatabaseName()); + res_columns[i++]->insert(insert_query.table_id.getTableName()); } else - arr_exception.push_back(""); - } + { + res_columns[i++]->insertDefault(); + res_columns[i++]->insertDefault(); + } - res_columns[i++]->insert(arr_query_id); - res_columns[i++]->insert(arr_bytes); - res_columns[i++]->insert(arr_finished); - res_columns[i++]->insert(arr_exception); + res_columns[i++]->insert(insert_query.format); + res_columns[i++]->insert(time_in_microseconds(first_update)); + res_columns[i++]->insert(data->size_in_bytes); + + Array arr_query_id; + Array arr_bytes; + + for (const auto & entry : data->entries) + { + arr_query_id.push_back(entry->query_id); + arr_bytes.push_back(entry->bytes.size()); + } + + res_columns[i++]->insert(arr_query_id); + res_columns[i++]->insert(arr_bytes); + } } } diff --git a/tests/ci/metrics_lambda/app.py b/tests/ci/ci_runners_metrics_lambda/app.py similarity index 93% rename from tests/ci/metrics_lambda/app.py rename to tests/ci/ci_runners_metrics_lambda/app.py index 4a1921bf312..54df910ee8c 100644 --- a/tests/ci/metrics_lambda/app.py +++ b/tests/ci/ci_runners_metrics_lambda/app.py @@ -11,6 +11,17 @@ import requests import boto3 from botocore.exceptions import ClientError +UNIVERSAL_LABEL = "universal" +RUNNER_TYPE_LABELS = [ + "builder", + "func-tester", + "func-tester-aarch64", + "fuzzer-unit-tester", + "stress-tester", + "style-checker", + "style-checker-aarch64", +] + def get_dead_runners_in_ec2(runners): ids = { @@ -170,26 +181,23 @@ def list_runners(access_token): def group_runners_by_tag(listed_runners): result = {} - RUNNER_TYPE_LABELS = [ - "builder", - "func-tester", - "func-tester-aarch64", - "fuzzer-unit-tester", - "stress-tester", - "style-checker", - "style-checker-aarch64", - ] + def add_to_result(tag, runner): + if tag not in result: + result[tag] = [] + result[tag].append(runner) + for runner in listed_runners: + if UNIVERSAL_LABEL in runner.tags: + # Do not proceed other labels if UNIVERSAL_LABEL is included + add_to_result(UNIVERSAL_LABEL, runner) + continue + for tag in runner.tags: if tag in RUNNER_TYPE_LABELS: - if tag not in result: - result[tag] = [] - result[tag].append(runner) + add_to_result(tag, runner) break else: - if "unlabeled" not in result: - result["unlabeled"] = [] - result["unlabeled"].append(runner) + add_to_result("unlabeled", runner) return result diff --git a/tests/ci/ci_runners_metrics_lambda/build_and_deploy_archive.sh b/tests/ci/ci_runners_metrics_lambda/build_and_deploy_archive.sh new file mode 120000 index 00000000000..96ba3fa024e --- /dev/null +++ b/tests/ci/ci_runners_metrics_lambda/build_and_deploy_archive.sh @@ -0,0 +1 @@ +../team_keys_lambda/build_and_deploy_archive.sh \ No newline at end of file diff --git a/tests/ci/metrics_lambda/requirements.txt b/tests/ci/ci_runners_metrics_lambda/requirements.txt similarity index 100% rename from tests/ci/metrics_lambda/requirements.txt rename to tests/ci/ci_runners_metrics_lambda/requirements.txt diff --git a/tests/ci/docker_images_check.py b/tests/ci/docker_images_check.py index fb7228628fd..873aee9aabf 100644 --- a/tests/ci/docker_images_check.py +++ b/tests/ci/docker_images_check.py @@ -441,11 +441,15 @@ def main(): result_images = {} images_processing_result = [] + additional_cache = "" + if pr_info.release_pr or pr_info.merged_pr: + additional_cache = str(pr_info.release_pr or pr_info.merged_pr) + for image in changed_images: # If we are in backport PR, then 
pr_info.release_pr is defined # We use it as tag to reduce rebuilding time images_processing_result += process_image_with_parents( - image, image_versions, pr_info.release_pr, args.push + image, image_versions, additional_cache, args.push ) result_images[image.repo] = result_version diff --git a/tests/ci/env_helper.py b/tests/ci/env_helper.py index a18f47497fd..ab0c3c6f688 100644 --- a/tests/ci/env_helper.py +++ b/tests/ci/env_helper.py @@ -42,11 +42,13 @@ def GITHUB_JOB_ID() -> str: if _GITHUB_JOB_ID: return _GITHUB_JOB_ID jobs = [] + page = 1 while not _GITHUB_JOB_ID: response = get_with_retries( f"https://api.github.com/repos/{GITHUB_REPOSITORY}/" - f"actions/runs/{GITHUB_RUN_ID}/jobs?per_page=100" + f"actions/runs/{GITHUB_RUN_ID}/jobs?per_page=100&page={page}" ) + page += 1 data = response.json() jobs.extend(data["jobs"]) for job in data["jobs"]: @@ -55,7 +57,10 @@ def GITHUB_JOB_ID() -> str: _GITHUB_JOB_ID = job["id"] _GITHUB_JOB_URL = job["html_url"] return _GITHUB_JOB_ID - if len(jobs) == data["total_count"]: + if ( + len(jobs) >= data["total_count"] # just in case of inconsistency + or len(data["jobs"]) == 0 # if we excided pages + ): _GITHUB_JOB_ID = "0" return _GITHUB_JOB_ID diff --git a/tests/ci/metrics_lambda/Dockerfile b/tests/ci/metrics_lambda/Dockerfile deleted file mode 100644 index 0d50224c51d..00000000000 --- a/tests/ci/metrics_lambda/Dockerfile +++ /dev/null @@ -1,13 +0,0 @@ -FROM public.ecr.aws/lambda/python:3.9 - -# Install the function's dependencies using file requirements.txt -# from your project folder. - -COPY requirements.txt . -RUN pip3 install -r requirements.txt --target "${LAMBDA_TASK_ROOT}" - -# Copy function code -COPY app.py ${LAMBDA_TASK_ROOT} - -# Set the CMD to your handler (could also be done as a parameter override outside of the Dockerfile) -CMD [ "app.handler" ] diff --git a/tests/ci/pr_info.py b/tests/ci/pr_info.py index 5f725a61b3e..6a2fac0a291 100644 --- a/tests/ci/pr_info.py +++ b/tests/ci/pr_info.py @@ -64,6 +64,7 @@ def get_pr_for_commit(sha, ref): class PRInfo: default_event = { "commits": 1, + "head_commit": {"message": "commit_message"}, "before": "HEAD~", "after": "HEAD", "ref": None, @@ -86,7 +87,9 @@ class PRInfo: self.changed_files = set() # type: Set[str] self.body = "" self.diff_urls = [] + # release_pr and merged_pr are used for docker images additional cache self.release_pr = 0 + self.merged_pr = 0 ref = github_event.get("ref", "refs/heads/master") if ref and ref.startswith("refs/heads/"): ref = ref[11:] @@ -158,6 +161,14 @@ class PRInfo: self.diff_urls.append(github_event["pull_request"]["diff_url"]) elif "commits" in github_event: + # `head_commit` always comes with `commits` + commit_message = github_event["head_commit"]["message"] + if commit_message.startswith("Merge pull request #"): + merged_pr = commit_message.split(maxsplit=4)[3] + try: + self.merged_pr = int(merged_pr[1:]) + except ValueError: + logging.error("Failed to convert %s to integer", merged_pr) self.sha = github_event["after"] pull_request = get_pr_for_commit(self.sha, github_event["ref"]) repo_prefix = f"{GITHUB_SERVER_URL}/{GITHUB_REPOSITORY}" diff --git a/tests/ci/team_keys_lambda/build_and_deploy_archive.sh b/tests/ci/team_keys_lambda/build_and_deploy_archive.sh index defa400453f..1ea2935c445 100644 --- a/tests/ci/team_keys_lambda/build_and_deploy_archive.sh +++ b/tests/ci/team_keys_lambda/build_and_deploy_archive.sh @@ -2,10 +2,13 @@ set -xeo pipefail WORKDIR=$(dirname "$0") +WORKDIR=$(readlink -f "${WORKDIR}") cd "$WORKDIR" -PY_EXEC=python3.9 
-LAMBDA_NAME=$(basename "$PWD") +PY_VERSION=3.9 +PY_EXEC="python${PY_VERSION}" +DOCKER_IMAGE="python:${PY_VERSION}-slim" +LAMBDA_NAME=$(basename "$WORKDIR") LAMBDA_NAME=${LAMBDA_NAME//_/-} PACKAGE=lambda-package rm -rf "$PACKAGE" "$PACKAGE".zip @@ -14,10 +17,12 @@ cp app.py "$PACKAGE" if [ -f requirements.txt ]; then VENV=lambda-venv rm -rf "$VENV" lambda-package.zip - "$PY_EXEC" -m venv "$VENV" - # shellcheck disable=SC1091 - source "$VENV/bin/activate" - pip install -r requirements.txt + docker run --rm --user="${UID}" --volume="${WORKDIR}:/lambda" --workdir="/lambda" "${DOCKER_IMAGE}" \ + /bin/bash -c " + '$PY_EXEC' -m venv '$VENV' && + source '$VENV/bin/activate' && + pip install -r requirements.txt + " cp -rT "$VENV/lib/$PY_EXEC/site-packages/" "$PACKAGE" rm -r "$PACKAGE"/{pip,pip-*,setuptools,setuptools-*} fi diff --git a/tests/ci/worker/init_runner.sh b/tests/ci/worker/init_runner.sh index 66a38a6a37d..64f11b41777 100644 --- a/tests/ci/worker/init_runner.sh +++ b/tests/ci/worker/init_runner.sh @@ -46,15 +46,17 @@ curl "${TEAM_KEYS_URL}" > /home/ubuntu/.ssh/authorized_keys2 chown ubuntu: /home/ubuntu/.ssh -R -# Create a pre-run script that will restart docker daemon before the job started +# Create a pre-run script that will provide diagnostics info mkdir -p /tmp/actions-hooks -cat > /tmp/actions-hooks/pre-run.sh << 'EOF' +cat > /tmp/actions-hooks/pre-run.sh << EOF #!/bin/bash -set -xuo pipefail +set -uo pipefail echo "Runner's public DNS: $(ec2metadata --public-hostname)" +echo "Runner's labels: ${LABELS}" EOF +# Create a post-run script that will restart docker daemon before the job started cat > /tmp/actions-hooks/post-run.sh << 'EOF' #!/bin/bash set -xuo pipefail diff --git a/tests/performance/general_purpose_hashes.xml b/tests/performance/general_purpose_hashes.xml index f34554360cf..ba4e8f93859 100644 --- a/tests/performance/general_purpose_hashes.xml +++ b/tests/performance/general_purpose_hashes.xml @@ -15,6 +15,7 @@ hiveHash xxHash32 xxHash64 + xxh3 CRC32 diff --git a/tests/queries/0_stateless/01111_create_drop_replicated_db_stress.sh b/tests/queries/0_stateless/01111_create_drop_replicated_db_stress.sh index a95029de257..983cb515d8e 100755 --- a/tests/queries/0_stateless/01111_create_drop_replicated_db_stress.sh +++ b/tests/queries/0_stateless/01111_create_drop_replicated_db_stress.sh @@ -16,7 +16,7 @@ function create_db() # So CREATE TABLE queries will fail on all replicas except one. But it's still makes sense for a stress test. 
$CLICKHOUSE_CLIENT --allow_experimental_database_replicated=1 --query \ "create database if not exists ${CLICKHOUSE_DATABASE}_repl_$SUFFIX engine=Replicated('/test/01111/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX', '$SHARD', '$REPLICA')" \ - 2>&1| grep -Fa "Exception: " | grep -Fv "REPLICA_IS_ALREADY_EXIST" | grep -Fiv "Will not try to start it up" | \ + 2>&1| grep -Fa "Exception: " | grep -Fv "REPLICA_ALREADY_EXISTS" | grep -Fiv "Will not try to start it up" | \ grep -Fv "Coordination::Exception" | grep -Fv "already contains some data and it does not look like Replicated database path" sleep 0.$RANDOM done diff --git a/tests/queries/0_stateless/02117_show_create_table_system.reference b/tests/queries/0_stateless/02117_show_create_table_system.reference index ce881422f63..c206a41a03e 100644 --- a/tests/queries/0_stateless/02117_show_create_table_system.reference +++ b/tests/queries/0_stateless/02117_show_create_table_system.reference @@ -14,9 +14,7 @@ CREATE TABLE system.asynchronous_inserts `first_update` DateTime64(6), `total_bytes` UInt64, `entries.query_id` Array(String), - `entries.bytes` Array(UInt64), - `entries.finished` Array(UInt8), - `entries.exception` Array(String) + `entries.bytes` Array(UInt64) ) ENGINE = SystemAsynchronousInserts COMMENT 'SYSTEM TABLE is built on the fly.' diff --git a/tests/queries/0_stateless/02156_async_insert_query_log.reference b/tests/queries/0_stateless/02156_async_insert_query_log.reference index 404dbfe753d..f4fd93b21b4 100644 --- a/tests/queries/0_stateless/02156_async_insert_query_log.reference +++ b/tests/queries/0_stateless/02156_async_insert_query_log.reference @@ -1,4 +1,4 @@ 1 a 2 b -INSERT INTO async_inserts_2156 VALUES 1 Insert 1 0 -INSERT INTO async_inserts_2156 VALUES 1 Insert 1 +INSERT INTO async_inserts_2156 VALUES 1 Insert 1 +INSERT INTO async_inserts_2156 VALUES 1 Insert 1 diff --git a/tests/queries/0_stateless/02156_async_insert_query_log.sh b/tests/queries/0_stateless/02156_async_insert_query_log.sh index d7177fbe70c..a0a2db312ad 100755 --- a/tests/queries/0_stateless/02156_async_insert_query_log.sh +++ b/tests/queries/0_stateless/02156_async_insert_query_log.sh @@ -7,7 +7,7 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) ${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS async_inserts_2156" ${CLICKHOUSE_CLIENT} -q "CREATE TABLE async_inserts_2156 (id UInt32, s String) ENGINE = Memory" -${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&async_insert=1&wait_for_async_insert=0" -d "INSERT INTO async_inserts_2156 VALUES (1, 'a')" +${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&async_insert=1&wait_for_async_insert=1" -d "INSERT INTO async_inserts_2156 VALUES (1, 'a')" ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&async_insert=1&wait_for_async_insert=1" -d "INSERT INTO async_inserts_2156 VALUES (2, 'b')" ${CLICKHOUSE_CLIENT} -q "SELECT * FROM async_inserts_2156 ORDER BY id" @@ -15,7 +15,7 @@ ${CLICKHOUSE_CLIENT} -q "SELECT * FROM async_inserts_2156 ORDER BY id" ${CLICKHOUSE_CLIENT} -q "SYSTEM FLUSH LOGS" ${CLICKHOUSE_CLIENT} -q "SELECT query, arrayExists(x -> x LIKE '%async_inserts_2156', tables), \ - query_kind, Settings['async_insert'], Settings['wait_for_async_insert'] FROM system.query_log \ + query_kind, Settings['async_insert'] FROM system.query_log \ WHERE event_date >= yesterday() AND current_database = '$CLICKHOUSE_DATABASE' \ AND query ILIKE 'INSERT INTO async_inserts_2156 VALUES%' AND type = 'QueryFinish' \ ORDER BY query_start_time_microseconds" diff --git a/tests/queries/0_stateless/02475_precise_decimal_arithmetics.reference 
b/tests/queries/0_stateless/02475_precise_decimal_arithmetics.reference new file mode 100644 index 00000000000..6ffc8602640 --- /dev/null +++ b/tests/queries/0_stateless/02475_precise_decimal_arithmetics.reference @@ -0,0 +1,23 @@ +0 +0 +0 +9999999999999999550522436926092261716351992671467843175339166479588690755584 +9999999999999999451597035424131548206707486713696660676795842648250000000000 +11.126038 +10.8 +-11.126038 +-10.8 +10.8 +1376.638914 +1403.6 +-1376.638914 +-1403.6 +1403.6 +332833500 +999 +1000 +1000 +1000 +0.1 +0.1 +0.1 diff --git a/tests/queries/0_stateless/02475_precise_decimal_arithmetics.sql b/tests/queries/0_stateless/02475_precise_decimal_arithmetics.sql new file mode 100644 index 00000000000..3bd7906c7d8 --- /dev/null +++ b/tests/queries/0_stateless/02475_precise_decimal_arithmetics.sql @@ -0,0 +1,45 @@ +-- Tags: no-fasttest + +-- check cases when one of operands is zero +SELECT divideDecimal(toDecimal32(0, 2), toDecimal128(11.123456, 6)); +SELECT divideDecimal(toDecimal64(123.123, 3), toDecimal64(0, 1)); -- { serverError 153 } +SELECT multiplyDecimal(toDecimal32(0, 2), toDecimal128(11.123456, 6)); +SELECT multiplyDecimal(toDecimal32(123.123, 3), toDecimal128(0, 1)); + +-- don't look at strange query result -- it happens due to bad float precision: toUInt256(1e38) == 99999999999999997752612184630461283328 +SELECT multiplyDecimal(toDecimal256(1e38, 0), toDecimal256(1e38, 0)); +SELECT divideDecimal(toDecimal256(1e66, 0), toDecimal256(1e-10, 10), 0); + +-- fits Decimal256, but scale is too big to fit +SELECT multiplyDecimal(toDecimal256(1e38, 0), toDecimal256(1e38, 0), 2); -- { serverError 407 } +SELECT divideDecimal(toDecimal256(1e72, 0), toDecimal256(1e-5, 5), 2); -- { serverError 407 } + +-- does not fit Decimal256 +SELECT multiplyDecimal(toDecimal256('1e38', 0), toDecimal256('1e38', 0)); -- { serverError 407 } +SELECT multiplyDecimal(toDecimal256(1e39, 0), toDecimal256(1e39, 0), 0); -- { serverError 407 } +SELECT divideDecimal(toDecimal256(1e39, 0), toDecimal256(1e-38, 39)); -- { serverError 407 } + +-- test different signs +SELECT divideDecimal(toDecimal128(123.76, 2), toDecimal128(11.123456, 6)); +SELECT divideDecimal(toDecimal32(123.123, 3), toDecimal128(11.4, 1), 2); +SELECT divideDecimal(toDecimal128(-123.76, 2), toDecimal128(11.123456, 6)); +SELECT divideDecimal(toDecimal32(123.123, 3), toDecimal128(-11.4, 1), 2); +SELECT divideDecimal(toDecimal32(-123.123, 3), toDecimal128(-11.4, 1), 2); + +SELECT multiplyDecimal(toDecimal64(123.76, 2), toDecimal128(11.123456, 6)); +SELECT multiplyDecimal(toDecimal32(123.123, 3), toDecimal128(11.4, 1), 2); +SELECT multiplyDecimal(toDecimal64(-123.76, 2), toDecimal128(11.123456, 6)); +SELECT multiplyDecimal(toDecimal32(123.123, 3), toDecimal128(-11.4, 1), 2); +SELECT multiplyDecimal(toDecimal32(-123.123, 3), toDecimal128(-11.4, 1), 2); + +-- check against non-const columns +SELECT sum(multiplyDecimal(toDecimal64(number, 1), toDecimal64(number, 5))) FROM numbers(1000); +SELECT sum(divideDecimal(toDecimal64(number, 1), toDecimal64(number, 5))) FROM (select * from numbers(1000) OFFSET 1); + +-- check against Nullable type +SELECT multiplyDecimal(toNullable(toDecimal64(10, 1)), toDecimal64(100, 5)); +SELECT multiplyDecimal(toDecimal64(10, 1), toNullable(toDecimal64(100, 5))); +SELECT multiplyDecimal(toNullable(toDecimal64(10, 1)), toNullable(toDecimal64(100, 5))); +SELECT divideDecimal(toNullable(toDecimal64(10, 1)), toDecimal64(100, 5)); +SELECT divideDecimal(toDecimal64(10, 1), toNullable(toDecimal64(100, 5))); +SELECT 
divideDecimal(toNullable(toDecimal64(10, 1)), toNullable(toDecimal64(100, 5))); diff --git a/tests/queries/0_stateless/02481_async_insert_race_long.reference b/tests/queries/0_stateless/02481_async_insert_race_long.reference new file mode 100644 index 00000000000..d86bac9de59 --- /dev/null +++ b/tests/queries/0_stateless/02481_async_insert_race_long.reference @@ -0,0 +1 @@ +OK diff --git a/tests/queries/0_stateless/02481_async_insert_race_long.sh b/tests/queries/0_stateless/02481_async_insert_race_long.sh new file mode 100755 index 00000000000..cec9278c127 --- /dev/null +++ b/tests/queries/0_stateless/02481_async_insert_race_long.sh @@ -0,0 +1,63 @@ +#!/usr/bin/env bash +# Tags: no-random-settings, no-fasttest, long + +set -e + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CURDIR"/../shell_config.sh + +export MY_CLICKHOUSE_CLIENT="$CLICKHOUSE_CLIENT --async_insert_busy_timeout_ms 10 --async_insert_max_data_size 1 --async_insert 1" + +function insert1() +{ + while true; do + ${MY_CLICKHOUSE_CLIENT} --wait_for_async_insert 0 -q 'INSERT INTO async_inserts_race FORMAT CSV 1,"a"' + done +} + +function insert2() +{ + while true; do + ${MY_CLICKHOUSE_CLIENT} --wait_for_async_insert 0 -q 'INSERT INTO async_inserts_race FORMAT JSONEachRow {"id": 5, "s": "e"} {"id": 6, "s": "f"}' + done +} + +function insert3() +{ + while true; do + ${MY_CLICKHOUSE_CLIENT} --wait_for_async_insert 1 -q "INSERT INTO async_inserts_race VALUES (7, 'g') (8, 'h')" & + sleep 0.05 + done +} + +function select1() +{ + while true; do + ${MY_CLICKHOUSE_CLIENT} -q "SELECT * FROM async_inserts_race FORMAT Null" + done + +} + +${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS async_inserts_race" +${CLICKHOUSE_CLIENT} -q "CREATE TABLE async_inserts_race (id UInt32, s String) ENGINE = MergeTree ORDER BY id" + +TIMEOUT=10 + +export -f insert1 +export -f insert2 +export -f insert3 +export -f select1 + +for _ in {1..3}; do + timeout $TIMEOUT bash -c insert1 & + timeout $TIMEOUT bash -c insert2 & + timeout $TIMEOUT bash -c insert3 & +done + +timeout $TIMEOUT bash -c select1 & + +wait +echo "OK" + +${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS async_inserts_race"; diff --git a/tests/queries/0_stateless/02481_parquet_int_list_multiple_chunks.reference b/tests/queries/0_stateless/02481_parquet_int_list_multiple_chunks.reference new file mode 100644 index 00000000000..285856e363a --- /dev/null +++ b/tests/queries/0_stateless/02481_parquet_int_list_multiple_chunks.reference @@ -0,0 +1,3 @@ +Parquet +3d94071a2fe62a3b3285f170ca6f42e5 - +70000 diff --git a/tests/queries/0_stateless/02481_parquet_int_list_multiple_chunks.sh b/tests/queries/0_stateless/02481_parquet_int_list_multiple_chunks.sh new file mode 100755 index 00000000000..c2c6f689851 --- /dev/null +++ b/tests/queries/0_stateless/02481_parquet_int_list_multiple_chunks.sh @@ -0,0 +1,42 @@ +#!/usr/bin/env bash +# Tags: no-ubsan, no-fasttest + +CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. 
"$CUR_DIR"/../shell_config.sh + +echo "Parquet" + +# File generated with the below script + +#import pyarrow as pa +#import pyarrow.parquet as pq +#import random +# +# +#def gen_array(offset): +# array = [] +# array_length = random.randint(0, 9) +# for i in range(array_length): +# array.append(i + offset) +# +# return array +# +# +#def gen_arrays(number_of_arrays): +# list_of_arrays = [] +# for i in range(number_of_arrays): +# list_of_arrays.append(gen_array(i)) +# return list_of_arrays +# +#arr = pa.array(gen_arrays(70000)) +#table = pa.table([arr], ["arr"]) +#pq.write_table(table, "int-list-zero-based-chunked-array.parquet") + +DATA_FILE=$CUR_DIR/data_parquet/int-list-zero-based-chunked-array.parquet +${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS parquet_load" +${CLICKHOUSE_CLIENT} --query="CREATE TABLE parquet_load (arr Array(Int64)) ENGINE = Memory" +cat "$DATA_FILE" | ${CLICKHOUSE_CLIENT} -q "INSERT INTO parquet_load FORMAT Parquet" +${CLICKHOUSE_CLIENT} --query="SELECT * FROM parquet_load" | md5sum +${CLICKHOUSE_CLIENT} --query="SELECT count() FROM parquet_load" +${CLICKHOUSE_CLIENT} --query="drop table parquet_load" \ No newline at end of file diff --git a/tests/queries/0_stateless/02481_parquet_list_monotonically_increasing_offsets.reference b/tests/queries/0_stateless/02481_parquet_list_monotonically_increasing_offsets.reference new file mode 100644 index 00000000000..2db066c0f87 --- /dev/null +++ b/tests/queries/0_stateless/02481_parquet_list_monotonically_increasing_offsets.reference @@ -0,0 +1,3 @@ +Parquet +e1cfe4265689ead763b18489b363344d - +39352 diff --git a/tests/queries/0_stateless/02481_parquet_list_monotonically_increasing_offsets.sh b/tests/queries/0_stateless/02481_parquet_list_monotonically_increasing_offsets.sh new file mode 100755 index 00000000000..47245eeb940 --- /dev/null +++ b/tests/queries/0_stateless/02481_parquet_list_monotonically_increasing_offsets.sh @@ -0,0 +1,16 @@ +#!/usr/bin/env bash +# Tags: no-ubsan, no-fasttest + +CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. 
"$CUR_DIR"/../shell_config.sh + +echo "Parquet" + +DATA_FILE=$CUR_DIR/data_parquet/list_monotonically_increasing_offsets.parquet +${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS parquet_load" +${CLICKHOUSE_CLIENT} --query="CREATE TABLE parquet_load (list Array(Int64), json Nullable(String)) ENGINE = Memory" +cat "$DATA_FILE" | ${CLICKHOUSE_CLIENT} -q "INSERT INTO parquet_load FORMAT Parquet" +${CLICKHOUSE_CLIENT} --query="SELECT * FROM parquet_load" | md5sum +${CLICKHOUSE_CLIENT} --query="SELECT count() FROM parquet_load" +${CLICKHOUSE_CLIENT} --query="drop table parquet_load" \ No newline at end of file diff --git a/tests/queries/0_stateless/02481_pk_analysis_with_enum_to_string.reference b/tests/queries/0_stateless/02481_pk_analysis_with_enum_to_string.reference new file mode 100644 index 00000000000..b6a7d89c68e --- /dev/null +++ b/tests/queries/0_stateless/02481_pk_analysis_with_enum_to_string.reference @@ -0,0 +1 @@ +16 diff --git a/tests/queries/0_stateless/02481_pk_analysis_with_enum_to_string.sql b/tests/queries/0_stateless/02481_pk_analysis_with_enum_to_string.sql new file mode 100644 index 00000000000..91402bbed60 --- /dev/null +++ b/tests/queries/0_stateless/02481_pk_analysis_with_enum_to_string.sql @@ -0,0 +1,23 @@ +CREATE TABLE gen +( + repo_name String, + event_type Enum8('CommitCommentEvent' = 1, 'CreateEvent' = 2, 'DeleteEvent' = 3, 'ForkEvent' = 4, 'GollumEvent' = 5, 'IssueCommentEvent' = 6, 'IssuesEvent' = 7, 'MemberEvent' = 8, 'PublicEvent' = 9, 'PullRequestEvent' = 10, 'PullRequestReviewCommentEvent' = 11, 'PushEvent' = 12, 'ReleaseEvent' = 13, 'SponsorshipEvent' = 14, 'WatchEvent' = 15, 'GistEvent' = 16, 'FollowEvent' = 17, 'DownloadEvent' = 18, 'PullRequestReviewEvent' = 19, 'ForkApplyEvent' = 20, 'Event' = 21, 'TeamAddEvent' = 22), + actor_login String, + created_at DateTime, + action Enum8('none' = 0, 'created' = 1, 'added' = 2, 'edited' = 3, 'deleted' = 4, 'opened' = 5, 'closed' = 6, 'reopened' = 7, 'assigned' = 8, 'unassigned' = 9, 'labeled' = 10, 'unlabeled' = 11, 'review_requested' = 12, 'review_request_removed' = 13, 'synchronize' = 14, 'started' = 15, 'published' = 16, 'update' = 17, 'create' = 18, 'fork' = 19, 'merged' = 20), + number UInt32, + merged_at DateTime +) +ENGINE = GenerateRandom; + +CREATE TABLE github_events AS gen ENGINE=MergeTree ORDER BY (event_type, repo_name, created_at); + +INSERT INTO github_events SELECT * FROM gen LIMIT 100000; + +INSERT INTO github_events VALUES ('apache/pulsar','PullRequestEvent','hangc0276','2021-01-22 06:58:03','opened',9276,'1970-01-01 00:00:00') ('apache/pulsar','PullRequestEvent','hangc0276','2021-01-25 02:38:07','closed',9276,'1970-01-01 00:00:00') ('apache/pulsar','PullRequestEvent','hangc0276','2021-01-25 02:38:09','reopened',9276,'1970-01-01 00:00:00') ('apache/pulsar','PullRequestEvent','hangc0276','2021-04-22 06:05:09','closed',9276,'2021-04-22 06:05:08') ('apache/pulsar','IssueCommentEvent','hangc0276','2021-01-23 00:32:09','created',9276,'1970-01-01 00:00:00') ('apache/pulsar','IssueCommentEvent','hangc0276','2021-01-23 02:52:11','created',9276,'1970-01-01 00:00:00') ('apache/pulsar','IssueCommentEvent','hangc0276','2021-01-24 03:02:31','created',9276,'1970-01-01 00:00:00') ('apache/pulsar','IssueCommentEvent','hangc0276','2021-01-25 02:16:42','created',9276,'1970-01-01 00:00:00') ('apache/pulsar','IssueCommentEvent','hangc0276','2021-01-26 06:52:42','created',9276,'1970-01-01 00:00:00') ('apache/pulsar','IssueCommentEvent','hangc0276','2021-01-27 01:10:33','created',9276,'1970-01-01 00:00:00') 
('apache/pulsar','IssueCommentEvent','hangc0276','2021-01-29 02:11:41','created',9276,'1970-01-01 00:00:00') ('apache/pulsar','IssueCommentEvent','hangc0276','2021-02-02 07:35:40','created',9276,'1970-01-01 00:00:00') ('apache/pulsar','IssueCommentEvent','hangc0276','2021-02-03 00:44:26','created',9276,'1970-01-01 00:00:00') ('apache/pulsar','IssueCommentEvent','hangc0276','2021-02-03 02:14:26','created',9276,'1970-01-01 00:00:00') ('apache/pulsar','PullRequestReviewEvent','codelipenghui','2021-03-29 14:31:25','created',9276,'1970-01-01 00:00:00') ('apache/pulsar','PullRequestReviewEvent','eolivelli','2021-03-29 16:34:02','created',9276,'1970-01-01 00:00:00'); + +OPTIMIZE TABLE github_events FINAL; + +SELECT count() +FROM github_events +WHERE (repo_name = 'apache/pulsar') AND (toString(event_type) IN ('PullRequestEvent', 'PullRequestReviewCommentEvent', 'PullRequestReviewEvent', 'IssueCommentEvent')) AND (actor_login NOT IN ('github-actions[bot]', 'codecov-commenter')) AND (number = 9276); diff --git a/tests/queries/0_stateless/02481_xxh3_hash_function.reference b/tests/queries/0_stateless/02481_xxh3_hash_function.reference new file mode 100644 index 00000000000..73276fe135e --- /dev/null +++ b/tests/queries/0_stateless/02481_xxh3_hash_function.reference @@ -0,0 +1 @@ +18009318874338624809 diff --git a/tests/queries/0_stateless/02481_xxh3_hash_function.sql b/tests/queries/0_stateless/02481_xxh3_hash_function.sql new file mode 100644 index 00000000000..cd87f08a68e --- /dev/null +++ b/tests/queries/0_stateless/02481_xxh3_hash_function.sql @@ -0,0 +1 @@ +SELECT xxh3('ClickHouse'); diff --git a/tests/queries/0_stateless/02493_analyzer_sum_if_to_count_if.reference b/tests/queries/0_stateless/02493_analyzer_sum_if_to_count_if.reference new file mode 100644 index 00000000000..eccf51501ed --- /dev/null +++ b/tests/queries/0_stateless/02493_analyzer_sum_if_to_count_if.reference @@ -0,0 +1,77 @@ +QUERY id: 0 + PROJECTION COLUMNS + sumIf(1, equals(modulo(number, 2), 0)) UInt64 + PROJECTION + LIST id: 1, nodes: 1 + FUNCTION id: 2, function_name: countIf, function_type: aggregate, result_type: UInt64 + ARGUMENTS + LIST id: 3, nodes: 1 + FUNCTION id: 4, function_name: equals, function_type: ordinary, result_type: UInt8 + ARGUMENTS + LIST id: 5, nodes: 2 + FUNCTION id: 6, function_name: modulo, function_type: ordinary, result_type: UInt8 + ARGUMENTS + LIST id: 7, nodes: 2 + COLUMN id: 8, column_name: number, result_type: UInt64, source_id: 9 + CONSTANT id: 10, constant_value: UInt64_2, constant_value_type: UInt8 + CONSTANT id: 11, constant_value: UInt64_0, constant_value_type: UInt8 + JOIN TREE + TABLE_FUNCTION id: 9, table_function_name: numbers + ARGUMENTS + LIST id: 12, nodes: 1 + CONSTANT id: 13, constant_value: UInt64_10, constant_value_type: UInt8 +-- +5 +-- +QUERY id: 0 + PROJECTION COLUMNS + sum(if(equals(modulo(number, 2), 0), 1, 0)) UInt64 + PROJECTION + LIST id: 1, nodes: 1 + FUNCTION id: 2, function_name: countIf, function_type: aggregate, result_type: UInt64 + ARGUMENTS + LIST id: 3, nodes: 1 + FUNCTION id: 4, function_name: equals, function_type: ordinary, result_type: UInt8 + ARGUMENTS + LIST id: 5, nodes: 2 + FUNCTION id: 6, function_name: modulo, function_type: ordinary, result_type: UInt8 + ARGUMENTS + LIST id: 7, nodes: 2 + COLUMN id: 8, column_name: number, result_type: UInt64, source_id: 9 + CONSTANT id: 10, constant_value: UInt64_2, constant_value_type: UInt8 + CONSTANT id: 11, constant_value: UInt64_0, constant_value_type: UInt8 + JOIN TREE + TABLE_FUNCTION id: 9, 
table_function_name: numbers + ARGUMENTS + LIST id: 12, nodes: 1 + CONSTANT id: 13, constant_value: UInt64_10, constant_value_type: UInt8 +-- +5 +-- +QUERY id: 0 + PROJECTION COLUMNS + sum(if(equals(modulo(number, 2), 0), 0, 1)) UInt64 + PROJECTION + LIST id: 1, nodes: 1 + FUNCTION id: 2, function_name: countIf, function_type: aggregate, result_type: UInt64 + ARGUMENTS + LIST id: 3, nodes: 1 + FUNCTION id: 4, function_name: not, function_type: ordinary, result_type: UInt8 + ARGUMENTS + LIST id: 5, nodes: 1 + FUNCTION id: 6, function_name: equals, function_type: ordinary, result_type: UInt8 + ARGUMENTS + LIST id: 7, nodes: 2 + FUNCTION id: 8, function_name: modulo, function_type: ordinary, result_type: UInt8 + ARGUMENTS + LIST id: 9, nodes: 2 + COLUMN id: 10, column_name: number, result_type: UInt64, source_id: 11 + CONSTANT id: 12, constant_value: UInt64_2, constant_value_type: UInt8 + CONSTANT id: 13, constant_value: UInt64_0, constant_value_type: UInt8 + JOIN TREE + TABLE_FUNCTION id: 11, table_function_name: numbers + ARGUMENTS + LIST id: 14, nodes: 1 + CONSTANT id: 15, constant_value: UInt64_10, constant_value_type: UInt8 +-- +5 diff --git a/tests/queries/0_stateless/02493_analyzer_sum_if_to_count_if.sql b/tests/queries/0_stateless/02493_analyzer_sum_if_to_count_if.sql new file mode 100644 index 00000000000..f1dbfa1f32a --- /dev/null +++ b/tests/queries/0_stateless/02493_analyzer_sum_if_to_count_if.sql @@ -0,0 +1,24 @@ +SET allow_experimental_analyzer = 1; +SET optimize_rewrite_sum_if_to_count_if = 1; + +EXPLAIN QUERY TREE (SELECT sumIf(1, (number % 2) == 0) FROM numbers(10)); + +SELECT '--'; + +SELECT sumIf(1, (number % 2) == 0) FROM numbers(10); + +SELECT '--'; + +EXPLAIN QUERY TREE (SELECT sum(if((number % 2) == 0, 1, 0)) FROM numbers(10)); + +SELECT '--'; + +SELECT sum(if((number % 2) == 0, 1, 0)) FROM numbers(10); + +SELECT '--'; + +EXPLAIN QUERY TREE (SELECT sum(if((number % 2) == 0, 0, 1)) FROM numbers(10)); + +SELECT '--'; + +SELECT sum(if((number % 2) == 0, 0, 1)) FROM numbers(10); diff --git a/tests/queries/0_stateless/data_parquet/int-list-zero-based-chunked-array.parquet b/tests/queries/0_stateless/data_parquet/int-list-zero-based-chunked-array.parquet new file mode 100644 index 00000000000..2eb3ba3ab15 Binary files /dev/null and b/tests/queries/0_stateless/data_parquet/int-list-zero-based-chunked-array.parquet differ diff --git a/tests/queries/0_stateless/data_parquet/list_monotonically_increasing_offsets.parquet b/tests/queries/0_stateless/data_parquet/list_monotonically_increasing_offsets.parquet new file mode 100644 index 00000000000..1c23e27db65 Binary files /dev/null and b/tests/queries/0_stateless/data_parquet/list_monotonically_increasing_offsets.parquet differ
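
The replacement of waitForProcessingQuery() with a std::future returned by push(), consumed by WaitForAsyncInsertSource, can be illustrated in isolation. The sketch below is a minimal model under assumed names (Entry, waitForInsert are illustrative placeholders, not the ClickHouse API): the flush job calls finish() with or without an exception, and the waiting side either times out or rethrows whatever was stored.

// Minimal promise/future hand-off, mirroring the idea behind Entry::finish() and the wait in generate().
#include <chrono>
#include <future>
#include <stdexcept>

struct Entry
{
    std::promise<void> promise;

    std::future<void> getFuture() { return promise.get_future(); }

    // Called by the flushing thread: either completes the insert or stores its error.
    void finish(std::exception_ptr exception = nullptr)
    {
        if (exception)
            promise.set_exception(exception);
        else
            promise.set_value();
    }
};

// Waiting side: throws on timeout, otherwise get() rethrows whatever finish() stored.
void waitForInsert(std::future<void> insert_future, std::chrono::milliseconds timeout)
{
    if (insert_future.wait_for(timeout) == std::future_status::timeout)
        throw std::runtime_error("Wait for async insert timeout exceeded");
    insert_future.get();
}

One future per entry lets the query pipeline block on exactly its own batch without the old query_id lookup and per-entry condition variable.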
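
The per-shard deadline queue introduced in AsynchronousInsertQueue.h can also be summarized with a small standalone model: each shard owns a mutex, a condition variable, an ordered map keyed by the batch deadline, and a hash-to-iterator index for fast lookup by insert key. This is a simplified sketch only; Shard, BatchKey and popExpired are assumed names, and the real class stores InsertQuery/InsertData pairs and hands them to scheduleDataProcessingJob().

// Toy model of one queue shard: deadline-ordered batches plus an index by key.
#include <chrono>
#include <condition_variable>
#include <map>
#include <mutex>
#include <string>
#include <unordered_map>
#include <vector>

using TimePoint = std::chrono::steady_clock::time_point;
using BatchKey = std::string; // stands in for the InsertQuery hash

struct Shard
{
    std::mutex mutex;
    std::condition_variable are_tasks_available; // signalled when a new, earlier deadline appears
    std::map<TimePoint, BatchKey> queue;         // deadline -> batch
    std::unordered_map<BatchKey, std::map<TimePoint, BatchKey>::iterator> iterators;
};

// Collects every batch whose deadline has passed. The caller flushes the returned
// batches outside the lock, which mirrors how processBatchDeadlines() schedules jobs.
std::vector<BatchKey> popExpired(Shard & shard)
{
    std::vector<BatchKey> expired;
    std::lock_guard lock(shard.mutex);
    const auto now = std::chrono::steady_clock::now();
    while (!shard.queue.empty() && shard.queue.begin()->first <= now)
    {
        auto it = shard.queue.begin();
        shard.iterators.erase(it->second);
        expired.push_back(std::move(it->second));
        shard.queue.erase(it);
    }
    return expired;
}

Sharding by key hash keeps each mutex lightly contended and removes the need for the old global rwlock and the separate cleanup thread.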