Merge branch 'master' into native-types-conversions

Kruglov Pavel, 2023-03-09 13:03:25 +01:00, committed by GitHub
commit fe973f3d6f
218 changed files with 4335 additions and 759 deletions


@ -452,7 +452,8 @@ jobs:
- name: Check docker clickhouse/clickhouse-server building
run: |
cd "$GITHUB_WORKSPACE/tests/ci"
python3 docker_server.py --release-type head --no-push
python3 docker_server.py --release-type head --no-push \
--image-repo clickhouse/clickhouse-server --image-path docker/server
python3 docker_server.py --release-type head --no-push --no-ubuntu \
--image-repo clickhouse/clickhouse-keeper --image-path docker/keeper
- name: Cleanup


@ -35,7 +35,6 @@ jobs:
fetch-depth: 0
- name: Cherry pick
run: |
sudo pip install GitPython
cd "$GITHUB_WORKSPACE/tests/ci"
python3 cherry_pick.py
- name: Cleanup


@ -860,7 +860,8 @@ jobs:
- name: Check docker clickhouse/clickhouse-server building
run: |
cd "$GITHUB_WORKSPACE/tests/ci"
python3 docker_server.py --release-type head
python3 docker_server.py --release-type head \
--image-repo clickhouse/clickhouse-server --image-path docker/server
python3 docker_server.py --release-type head --no-ubuntu \
--image-repo clickhouse/clickhouse-keeper --image-path docker/keeper
- name: Cleanup


@ -37,7 +37,6 @@ jobs:
cd "$GITHUB_WORKSPACE/tests/ci"
python3 run_check.py
PythonUnitTests:
needs: CheckLabels
runs-on: [self-hosted, style-checker]
steps:
- name: Check out repository code
@ -917,7 +916,8 @@ jobs:
- name: Check docker clickhouse/clickhouse-server building
run: |
cd "$GITHUB_WORKSPACE/tests/ci"
python3 docker_server.py --release-type head --no-push
python3 docker_server.py --release-type head --no-push \
--image-repo clickhouse/clickhouse-server --image-path docker/server
python3 docker_server.py --release-type head --no-push --no-ubuntu \
--image-repo clickhouse/clickhouse-keeper --image-path docker/keeper
- name: Cleanup


@ -7,15 +7,28 @@ on: # yamllint disable-line rule:truthy
release:
types:
- published
workflow_dispatch:
inputs:
tag:
description: 'Release tag'
required: true
type: string
jobs:
ReleasePublish:
runs-on: [self-hosted, style-checker]
steps:
- name: Set tag from input
if: github.event_name == 'workflow_dispatch'
run: |
echo "GITHUB_TAG=${{ github.event.inputs.tag }}" >> "$GITHUB_ENV"
- name: Set tag from REF
if: github.event_name == 'release'
run: |
echo "GITHUB_TAG=${GITHUB_REF#refs/tags/}" >> "$GITHUB_ENV"
- name: Deploy packages and assets
run: |
GITHUB_TAG="${GITHUB_REF#refs/tags/}"
curl --silent --data '' \
curl --silent --data '' --no-buffer \
'${{ secrets.PACKAGES_RELEASE_URL }}/release/'"${GITHUB_TAG}"'?binary=binary_darwin&binary=binary_darwin_aarch64&sync=true'
############################################################################################
##################################### Docker images #######################################
@ -23,16 +36,26 @@ jobs:
DockerServerImages:
runs-on: [self-hosted, style-checker]
steps:
- name: Set tag from input
if: github.event_name == 'workflow_dispatch'
run: |
echo "GITHUB_TAG=${{ github.event.inputs.tag }}" >> "$GITHUB_ENV"
- name: Set tag from REF
if: github.event_name == 'release'
run: |
echo "GITHUB_TAG=${GITHUB_REF#refs/tags/}" >> "$GITHUB_ENV"
- name: Check out repository code
uses: ClickHouse/checkout@v1
with:
clear-repository: true
fetch-depth: 0 # otherwise we will have no version info
ref: ${{ env.GITHUB_TAG }}
- name: Check docker clickhouse/clickhouse-server building
run: |
cd "$GITHUB_WORKSPACE/tests/ci"
python3 docker_server.py --release-type auto --version "${{ github.ref }}"
python3 docker_server.py --release-type auto --version "${{ github.ref }}" --no-ubuntu \
python3 docker_server.py --release-type auto --version "$GITHUB_TAG" \
--image-repo clickhouse/clickhouse-server --image-path docker/server
python3 docker_server.py --release-type auto --version "$GITHUB_TAG" --no-ubuntu \
--image-repo clickhouse/clickhouse-keeper --image-path docker/keeper
- name: Cleanup
if: always()


@ -525,7 +525,8 @@ jobs:
- name: Check docker clickhouse/clickhouse-server building
run: |
cd "$GITHUB_WORKSPACE/tests/ci"
python3 docker_server.py --release-type head --no-push
python3 docker_server.py --release-type head --no-push \
--image-repo clickhouse/clickhouse-server --image-path docker/server
python3 docker_server.py --release-type head --no-push --no-ubuntu \
--image-repo clickhouse/clickhouse-keeper --image-path docker/keeper
- name: Cleanup

base/base/hex.h (new file, 214 lines)

@ -0,0 +1,214 @@
#pragma once
#include <bit>
#include <cstring>
#include <string>
#include <string_view>
#include "types.h"
/// Maps 0..15 to 0..9A..F or 0..9a..f correspondingly.
constexpr inline std::string_view hex_digit_to_char_uppercase_table = "0123456789ABCDEF";
constexpr inline std::string_view hex_digit_to_char_lowercase_table = "0123456789abcdef";
constexpr char hexDigitUppercase(unsigned char c)
{
return hex_digit_to_char_uppercase_table[c];
}
constexpr char hexDigitLowercase(unsigned char c)
{
return hex_digit_to_char_lowercase_table[c];
}
/// Maps 0..255 to 00..FF or 00..ff correspondingly
constexpr inline std::string_view hex_byte_to_char_uppercase_table = //
"000102030405060708090A0B0C0D0E0F"
"101112131415161718191A1B1C1D1E1F"
"202122232425262728292A2B2C2D2E2F"
"303132333435363738393A3B3C3D3E3F"
"404142434445464748494A4B4C4D4E4F"
"505152535455565758595A5B5C5D5E5F"
"606162636465666768696A6B6C6D6E6F"
"707172737475767778797A7B7C7D7E7F"
"808182838485868788898A8B8C8D8E8F"
"909192939495969798999A9B9C9D9E9F"
"A0A1A2A3A4A5A6A7A8A9AAABACADAEAF"
"B0B1B2B3B4B5B6B7B8B9BABBBCBDBEBF"
"C0C1C2C3C4C5C6C7C8C9CACBCCCDCECF"
"D0D1D2D3D4D5D6D7D8D9DADBDCDDDEDF"
"E0E1E2E3E4E5E6E7E8E9EAEBECEDEEEF"
"F0F1F2F3F4F5F6F7F8F9FAFBFCFDFEFF";
constexpr inline std::string_view hex_byte_to_char_lowercase_table = //
"000102030405060708090a0b0c0d0e0f"
"101112131415161718191a1b1c1d1e1f"
"202122232425262728292a2b2c2d2e2f"
"303132333435363738393a3b3c3d3e3f"
"404142434445464748494a4b4c4d4e4f"
"505152535455565758595a5b5c5d5e5f"
"606162636465666768696a6b6c6d6e6f"
"707172737475767778797a7b7c7d7e7f"
"808182838485868788898a8b8c8d8e8f"
"909192939495969798999a9b9c9d9e9f"
"a0a1a2a3a4a5a6a7a8a9aaabacadaeaf"
"b0b1b2b3b4b5b6b7b8b9babbbcbdbebf"
"c0c1c2c3c4c5c6c7c8c9cacbcccdcecf"
"d0d1d2d3d4d5d6d7d8d9dadbdcdddedf"
"e0e1e2e3e4e5e6e7e8e9eaebecedeeef"
"f0f1f2f3f4f5f6f7f8f9fafbfcfdfeff";
inline void writeHexByteUppercase(UInt8 byte, void * out)
{
memcpy(out, &hex_byte_to_char_uppercase_table[static_cast<size_t>(byte) * 2], 2);
}
inline void writeHexByteLowercase(UInt8 byte, void * out)
{
memcpy(out, &hex_byte_to_char_lowercase_table[static_cast<size_t>(byte) * 2], 2);
}
constexpr inline std::string_view bin_byte_to_char_table = //
"0000000000000001000000100000001100000100000001010000011000000111"
"0000100000001001000010100000101100001100000011010000111000001111"
"0001000000010001000100100001001100010100000101010001011000010111"
"0001100000011001000110100001101100011100000111010001111000011111"
"0010000000100001001000100010001100100100001001010010011000100111"
"0010100000101001001010100010101100101100001011010010111000101111"
"0011000000110001001100100011001100110100001101010011011000110111"
"0011100000111001001110100011101100111100001111010011111000111111"
"0100000001000001010000100100001101000100010001010100011001000111"
"0100100001001001010010100100101101001100010011010100111001001111"
"0101000001010001010100100101001101010100010101010101011001010111"
"0101100001011001010110100101101101011100010111010101111001011111"
"0110000001100001011000100110001101100100011001010110011001100111"
"0110100001101001011010100110101101101100011011010110111001101111"
"0111000001110001011100100111001101110100011101010111011001110111"
"0111100001111001011110100111101101111100011111010111111001111111"
"1000000010000001100000101000001110000100100001011000011010000111"
"1000100010001001100010101000101110001100100011011000111010001111"
"1001000010010001100100101001001110010100100101011001011010010111"
"1001100010011001100110101001101110011100100111011001111010011111"
"1010000010100001101000101010001110100100101001011010011010100111"
"1010100010101001101010101010101110101100101011011010111010101111"
"1011000010110001101100101011001110110100101101011011011010110111"
"1011100010111001101110101011101110111100101111011011111010111111"
"1100000011000001110000101100001111000100110001011100011011000111"
"1100100011001001110010101100101111001100110011011100111011001111"
"1101000011010001110100101101001111010100110101011101011011010111"
"1101100011011001110110101101101111011100110111011101111011011111"
"1110000011100001111000101110001111100100111001011110011011100111"
"1110100011101001111010101110101111101100111011011110111011101111"
"1111000011110001111100101111001111110100111101011111011011110111"
"1111100011111001111110101111101111111100111111011111111011111111";
inline void writeBinByte(UInt8 byte, void * out)
{
memcpy(out, &bin_byte_to_char_table[static_cast<size_t>(byte) * 8], 8);
}
/// Produces hex representation of an unsigned int with leading zeros (for checksums)
template <typename TUInt>
inline void writeHexUIntImpl(TUInt uint_, char * out, std::string_view table)
{
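/// A union gives byte-level access to the integer; the loop below emits the most significant byte first on either endianness.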
union
{
TUInt value;
UInt8 uint8[sizeof(TUInt)];
};
value = uint_;
for (size_t i = 0; i < sizeof(TUInt); ++i)
{
if constexpr (std::endian::native == std::endian::little)
memcpy(out + i * 2, &table[static_cast<size_t>(uint8[sizeof(TUInt) - 1 - i]) * 2], 2);
else
memcpy(out + i * 2, &table[static_cast<size_t>(uint8[i]) * 2], 2);
}
}
template <typename TUInt>
inline void writeHexUIntUppercase(TUInt uint_, char * out)
{
writeHexUIntImpl(uint_, out, hex_byte_to_char_uppercase_table);
}
template <typename TUInt>
inline void writeHexUIntLowercase(TUInt uint_, char * out)
{
writeHexUIntImpl(uint_, out, hex_byte_to_char_lowercase_table);
}
template <typename TUInt>
std::string getHexUIntUppercase(TUInt uint_)
{
std::string res(sizeof(TUInt) * 2, '\0');
writeHexUIntUppercase(uint_, res.data());
return res;
}
template <typename TUInt>
std::string getHexUIntLowercase(TUInt uint_)
{
std::string res(sizeof(TUInt) * 2, '\0');
writeHexUIntLowercase(uint_, res.data());
return res;
}
/// Maps 0..9, A..F, a..f to 0..15. Other chars are mapped to an implementation-specific value.
constexpr inline std::string_view hex_char_to_digit_table
= {"\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff"
"\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff"
"\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff"
"\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\xff\xff\xff\xff\xff\xff" //0-9
"\xff\x0a\x0b\x0c\x0d\x0e\x0f\xff\xff\xff\xff\xff\xff\xff\xff\xff" //A-Z
"\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff"
"\xff\x0a\x0b\x0c\x0d\x0e\x0f\xff\xff\xff\xff\xff\xff\xff\xff\xff" //a-z
"\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff"
"\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff"
"\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff"
"\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff"
"\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff"
"\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff"
"\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff"
"\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff"
"\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff",
256};
constexpr UInt8 unhex(char c)
{
return hex_char_to_digit_table[static_cast<UInt8>(c)];
}
constexpr UInt8 unhex2(const char * data)
{
return static_cast<UInt8>(unhex(data[0])) * 0x10 + static_cast<UInt8>(unhex(data[1]));
}
constexpr UInt16 unhex4(const char * data)
{
return static_cast<UInt16>(unhex(data[0])) * 0x1000 + static_cast<UInt16>(unhex(data[1])) * 0x100
+ static_cast<UInt16>(unhex(data[2])) * 0x10 + static_cast<UInt16>(unhex(data[3]));
}
template <typename TUInt>
constexpr TUInt unhexUInt(const char * data)
{
TUInt res = 0;
if constexpr ((sizeof(TUInt) <= 8) || ((sizeof(TUInt) % 8) != 0))
{
for (size_t i = 0; i < sizeof(TUInt) * 2; ++i, ++data)
{
res <<= 4;
res += unhex(*data);
}
}
else
{
for (size_t i = 0; i < sizeof(TUInt) / 8; ++i, data += 16)
{
res <<= 64;
res += unhexUInt<UInt64>(data);
}
}
return res;
}
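
Taken together, these helpers allow a fixed-width, allocation-light hex round trip. A minimal usage sketch (assuming the ClickHouse include paths, so that `base/hex.h` resolves to this header and `UInt64` comes from `types.h`):

``` cpp
#include <iostream>
#include <base/hex.h>

int main()
{
    UInt64 value = 0xDEADBEEFCAFEBABEULL;

    /// Zero-padded, fixed-width hex: sizeof(UInt64) * 2 = 16 characters.
    std::string hex = getHexUIntLowercase(value);  /// "deadbeefcafebabe"

    /// unhexUInt expects exactly sizeof(T) * 2 hex digits and inverts the above.
    UInt64 parsed = unhexUInt<UInt64>(hex.data());

    /// writeBinByte writes 8 characters and no null terminator.
    char bits[8];
    writeBinByte(0xA5, bits);  /// "10100101"

    std::cout << hex << ' ' << (parsed == value) << ' ';
    std::cout.write(bits, 8);
    std::cout << '\n';
}
```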

base/base/interpolate.h (new file, 13 lines)

@ -0,0 +1,13 @@
#pragma once
#include <cassert>
#include <cmath>
/** Linear interpolation in logarithmic coordinates.
* Exponential interpolation is related to linear interpolation
* in exactly the same way as the geometric mean is related to the arithmetic mean.
*/
constexpr double interpolateExponential(double min, double max, double ratio)
{
assert(min > 0 && ratio >= 0 && ratio <= 1);
return min * std::pow(max / min, ratio);
}
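
A quick way to see the "linear in logarithmic coordinates" claim: at `ratio = 0.5` the result is the geometric mean of the endpoints. A small sketch, again assuming the ClickHouse include paths:

``` cpp
#include <cstdio>
#include <base/interpolate.h>

int main()
{
    /// The log-space midpoint is the geometric mean: sqrt(1 * 1024) = 32.
    std::printf("%g %g %g\n",
                interpolateExponential(1.0, 1024.0, 0.0),   /// 1
                interpolateExponential(1.0, 1024.0, 0.5),   /// 32
                interpolateExponential(1.0, 1024.0, 1.0));  /// 1024
}
```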


@ -115,6 +115,13 @@ configure_file("${ORC_SOURCE_SRC_DIR}/Adaptor.hh.in" "${ORC_BUILD_INCLUDE_DIR}/A
# ARROW_ORC + adapters/orc/CMakefiles
set(ORC_SRCS
"${CMAKE_CURRENT_BINARY_DIR}/orc_proto.pb.h"
"${ORC_SOURCE_SRC_DIR}/sargs/ExpressionTree.cc"
"${ORC_SOURCE_SRC_DIR}/sargs/Literal.cc"
"${ORC_SOURCE_SRC_DIR}/sargs/PredicateLeaf.cc"
"${ORC_SOURCE_SRC_DIR}/sargs/SargsApplier.cc"
"${ORC_SOURCE_SRC_DIR}/sargs/SearchArgument.cc"
"${ORC_SOURCE_SRC_DIR}/sargs/TruthValue.cc"
"${ORC_SOURCE_SRC_DIR}/Exceptions.cc"
"${ORC_SOURCE_SRC_DIR}/OrcFile.cc"
"${ORC_SOURCE_SRC_DIR}/Reader.cc"
@ -129,13 +136,20 @@ set(ORC_SRCS
"${ORC_SOURCE_SRC_DIR}/MemoryPool.cc"
"${ORC_SOURCE_SRC_DIR}/RLE.cc"
"${ORC_SOURCE_SRC_DIR}/RLEv1.cc"
"${ORC_SOURCE_SRC_DIR}/RLEv2.cc"
"${ORC_SOURCE_SRC_DIR}/RleDecoderV2.cc"
"${ORC_SOURCE_SRC_DIR}/RleEncoderV2.cc"
"${ORC_SOURCE_SRC_DIR}/RLEV2Util.cc"
"${ORC_SOURCE_SRC_DIR}/Statistics.cc"
"${ORC_SOURCE_SRC_DIR}/StripeStream.cc"
"${ORC_SOURCE_SRC_DIR}/Timezone.cc"
"${ORC_SOURCE_SRC_DIR}/TypeImpl.cc"
"${ORC_SOURCE_SRC_DIR}/Vector.cc"
"${ORC_SOURCE_SRC_DIR}/Writer.cc"
"${ORC_SOURCE_SRC_DIR}/Adaptor.cc"
"${ORC_SOURCE_SRC_DIR}/BloomFilter.cc"
"${ORC_SOURCE_SRC_DIR}/Murmur3.cc"
"${ORC_SOURCE_SRC_DIR}/BlockBuffer.cc"
"${ORC_SOURCE_SRC_DIR}/wrap/orc-proto-wrapper.cc"
"${ORC_SOURCE_SRC_DIR}/io/InputStream.cc"
"${ORC_SOURCE_SRC_DIR}/io/OutputStream.cc"
"${ORC_ADDITION_SOURCE_DIR}/orc_proto.pb.cc"
@ -358,6 +372,9 @@ SET(ARROW_SRCS "${LIBRARY_DIR}/util/compression_zlib.cc" ${ARROW_SRCS})
add_definitions(-DARROW_WITH_ZSTD)
SET(ARROW_SRCS "${LIBRARY_DIR}/util/compression_zstd.cc" ${ARROW_SRCS})
add_definitions(-DARROW_WITH_BROTLI)
SET(ARROW_SRCS "${LIBRARY_DIR}/util/compression_brotli.cc" ${ARROW_SRCS})
add_library(_arrow ${ARROW_SRCS})
@ -372,6 +389,7 @@ target_link_libraries(_arrow PRIVATE
ch_contrib::snappy
ch_contrib::zlib
ch_contrib::zstd
ch_contrib::brotli
)
target_link_libraries(_arrow PUBLIC _orc)

contrib/orc (vendored submodule)

@ -1 +1 @@
Subproject commit f9a393ed2433a60034795284f82d093b348f2102
Subproject commit c5d7755ba0b9a95631c8daea4d094101f26ec761


@ -67,7 +67,7 @@ It generally means that the SSH keys for connecting to GitHub are missing. These
You can also clone the repository via https protocol:
git clone --recursive--shallow-submodules https://github.com/ClickHouse/ClickHouse.git
git clone --recursive --shallow-submodules https://github.com/ClickHouse/ClickHouse.git
This, however, will not let you send your changes to the server. You can still use it temporarily and add the SSH keys later, replacing the remote address of the repository with the `git remote` command.


@ -19,8 +19,8 @@ Kafka lets you:
``` sql
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
name1 [type1],
name2 [type2],
...
) ENGINE = Kafka()
SETTINGS
@ -113,6 +113,10 @@ Kafka(kafka_broker_list, kafka_topic_list, kafka_group_name, kafka_format
</details>
:::info
The Kafka table engine doesn't support columns with a [default value](../../../sql-reference/statements/create/table.md#default_value). If you need columns with a default value, you can add them at the materialized view level (see below).
:::
## Description {#description}
The delivered messages are tracked automatically, so each message in a group is only counted once. If you want to get the data twice, then create a copy of the table with another group name.


@ -1981,6 +1981,7 @@ To exchange data with Hadoop, you can use [HDFS table engine](/docs/en/engines/t
- [input_format_parquet_skip_columns_with_unsupported_types_in_schema_inference](/docs/en/operations/settings/settings-formats.md/#input_format_parquet_skip_columns_with_unsupported_types_in_schema_inference) - allow skipping columns with unsupported types while schema inference for Parquet format. Default value - `false`.
- [output_format_parquet_fixed_string_as_fixed_byte_array](/docs/en/operations/settings/settings-formats.md/#output_format_parquet_fixed_string_as_fixed_byte_array) - use Parquet FIXED_LENGTH_BYTE_ARRAY type instead of Binary/String for FixedString columns. Default value - `true`.
- [output_format_parquet_version](/docs/en/operations/settings/settings-formats.md/#output_format_parquet_version) - The version of Parquet format used in output format. Default value - `2.latest`.
- [output_format_parquet_compression_method](/docs/en/operations/settings/settings-formats.md/#output_format_parquet_compression_method) - compression method used in output Parquet format. Default value - `snappy`.
## Arrow {#data-format-arrow}
@ -2051,6 +2052,7 @@ $ clickhouse-client --query="SELECT * FROM {some_table} FORMAT Arrow" > {filenam
- [input_format_arrow_allow_missing_columns](/docs/en/operations/settings/settings-formats.md/#input_format_arrow_allow_missing_columns) - allow missing columns while reading Arrow data. Default value - `false`.
- [input_format_arrow_skip_columns_with_unsupported_types_in_schema_inference](/docs/en/operations/settings/settings-formats.md/#input_format_arrow_skip_columns_with_unsupported_types_in_schema_inference) - allow skipping columns with unsupported types while schema inference for Arrow format. Default value - `false`.
- [output_format_arrow_fixed_string_as_fixed_byte_array](/docs/en/operations/settings/settings-formats.md/#output_format_arrow_fixed_string_as_fixed_byte_array) - use Arrow FIXED_SIZE_BINARY type instead of Binary/String for FixedString columns. Default value - `true`.
- [output_format_arrow_compression_method](/docs/en/operations/settings/settings-formats.md/#output_format_arrow_compression_method) - compression method used in output Arrow format. Default value - `none`.
## ArrowStream {#data-format-arrow-stream}
@ -2107,6 +2109,7 @@ $ clickhouse-client --query="SELECT * FROM {some_table} FORMAT ORC" > {filename.
### Arrow format settings {#parquet-format-settings}
- [output_format_arrow_string_as_string](/docs/en/operations/settings/settings-formats.md/#output_format_arrow_string_as_string) - use Arrow String type instead of Binary for String columns. Default value - `false`.
- [output_format_orc_compression_method](/docs/en/operations/settings/settings-formats.md/#output_format_orc_compression_method) - compression method used in output ORC format. Default value - `none`.
- [input_format_arrow_import_nested](/docs/en/operations/settings/settings-formats.md/#input_format_arrow_import_nested) - allow inserting array of structs into Nested table in Arrow input format. Default value - `false`.
- [input_format_arrow_case_insensitive_column_matching](/docs/en/operations/settings/settings-formats.md/#input_format_arrow_case_insensitive_column_matching) - ignore case when matching Arrow columns with ClickHouse columns. Default value - `false`.
- [input_format_arrow_allow_missing_columns](/docs/en/operations/settings/settings-formats.md/#input_format_arrow_allow_missing_columns) - allow missing columns while reading Arrow data. Default value - `false`.


@ -967,6 +967,7 @@ The maximum number of jobs that can be scheduled on the Global Thread pool. Incr
Possible values:
- Positive integer.
- 0 — No limit.
Default value: `10000`.
@ -976,6 +977,69 @@ Default value: `10000`.
<thread_pool_queue_size>12000</thread_pool_queue_size>
```
## max_io_thread_pool_size {#max-io-thread-pool-size}
ClickHouse uses threads from the IO Thread pool to do some IO operations (e.g. to interact with S3). `max_io_thread_pool_size` limits the maximum number of threads in the pool.
Possible values:
- Positive integer.
Default value: `100`.
## max_io_thread_pool_free_size {#max-io-thread-pool-free-size}
If the number of **idle** threads in the IO Thread pool exceeds `max_io_thread_pool_free_size`, ClickHouse will release resources occupied by idling threads and decrease the pool size. Threads can be created again if necessary.
Possible values:
- Positive integer.
Default value: `0`.
## io_thread_pool_queue_size {#io-thread-pool-queue-size}
The maximum number of jobs that can be scheduled on the IO Thread pool.
Possible values:
- Positive integer.
- 0 — No limit.
Default value: `10000`.
## max_backups_io_thread_pool_size {#max-backups-io-thread-pool-size}
ClickHouse uses threads from the Backups IO Thread pool to do S3 backup IO operations. `max_backups_io_thread_pool_size` limits the maximum number of threads in the pool.
Possible values:
- Positive integer.
Default value: `1000`.
## max_backups_io_thread_pool_free_size {#max-backups-io-thread-pool-free-size}
If the number of **idle** threads in the Backups IO Thread pool exceeds `max_backups_io_thread_pool_free_size`, ClickHouse will release resources occupied by idling threads and decrease the pool size. Threads can be created again if necessary.
Possible values:
- Positive integer.
- Zero.
Default value: `0`.
## backups_io_thread_pool_queue_size {#backups-io-thread-pool-queue-size}
The maximum number of jobs that can be scheduled on the Backups IO Thread pool. It is recommended to keep this queue unlimited due to the current S3 backup logic.
Possible values:
- Positive integer.
- 0 — No limit.
Default value: `0`.
## background_pool_size {#background_pool_size}
Sets the number of threads performing background merges and mutations for tables with MergeTree engines. For backward compatibility, this setting can also be applied at server startup from the `default` profile configuration. You can only increase the number of threads at runtime. To lower the number of threads you have to restart the server. By adjusting this setting, you manage CPU and disk load. A smaller pool size utilizes less CPU and disk resources, but background processes advance more slowly, which might eventually impact query performance.


@ -1014,6 +1014,12 @@ Use Arrow FIXED_SIZE_BINARY type instead of Binary/String for FixedString column
Enabled by default.
### output_format_arrow_compression_method {#output_format_arrow_compression_method}
Compression method used in output Arrow format. Supported codecs: `lz4_frame`, `zstd`, `none` (uncompressed)
Default value: `none`.
## ORC format settings {#orc-format-settings}
### input_format_orc_import_nested {#input_format_orc_import_nested}
@ -1057,6 +1063,12 @@ Use ORC String type instead of Binary for String columns.
Disabled by default.
### output_format_orc_compression_method {#output_format_orc_compression_method}
Compression method used in output ORC format. Supported codecs: `lz4`, `snappy`, `zlib`, `zstd`, `none` (uncompressed)
Default value: `none`.
## Parquet format settings {#parquet-format-settings}
### input_format_parquet_import_nested {#input_format_parquet_import_nested}
@ -1112,6 +1124,12 @@ The version of Parquet format used in output format. Supported versions: `1.0`,
Default value: `2.latest`.
### output_format_parquet_compression_method {#output_format_parquet_compression_method}
Compression method used in output Parquet format. Supported codecs: `snappy`, `lz4`, `brotli`, `zstd`, `gzip`, `none` (uncompressed)
Default value: `snappy`.
## Hive format settings {#hive-format-settings}
### input_format_hive_text_fields_delimiter {#input_format_hive_text_fields_delimiter}


@ -1248,7 +1248,9 @@ Possible values:
Default value: 1.
:::warning
Disable this setting if you use [max_parallel_replicas](#settings-max_parallel_replicas).
Disable this setting if you use [max_parallel_replicas](#settings-max_parallel_replicas) without [parallel_replicas_custom_key](#settings-parallel_replicas_custom_key).
If [parallel_replicas_custom_key](#settings-parallel_replicas_custom_key) is set, disable this setting only if it's used on a cluster with multiple shards containing multiple replicas.
If it's used on a cluster with a single shard and multiple replicas, disabling this setting will have negative effects.
:::
## totals_mode {#totals-mode}
@ -1273,16 +1275,47 @@ Default value: `1`.
**Additional Info**
This setting is useful for replicated tables with a sampling key. A query may be processed faster if it is executed on several servers in parallel. But the query performance may degrade in the following cases:
This option will produce different results depending on the settings used.
:::warning
This setting will produce incorrect results when joins or subqueries are involved and not all tables meet certain requirements. See [Distributed Subqueries and max_parallel_replicas](../../sql-reference/operators/in.md/#max_parallel_replica-subqueries) for more details.
:::
### Parallel processing using `SAMPLE` key
A query may be processed faster if it is executed on several servers in parallel. But the query performance may degrade in the following cases:
- The position of the sampling key in the partitioning key does not allow efficient range scans.
- Adding a sampling key to the table makes filtering by other columns less efficient.
- The sampling key is an expression that is expensive to calculate.
- The cluster latency distribution has a long tail, so that querying more servers increases the query overall latency.
:::warning
This setting will produce incorrect results when joins or subqueries are involved and not all tables meet certain requirements. See [Distributed Subqueries and max_parallel_replicas](../../sql-reference/operators/in.md/#max_parallel_replica-subqueries) for more details.
:::
### Parallel processing using [parallel_replicas_custom_key](#settings-parallel_replicas_custom_key)
This setting is useful for any replicated table.
## parallel_replicas_custom_key {#settings-parallel_replicas_custom_key}
An arbitrary integer expression that can be used to split work between replicas for a specific table.
The value can be any integer expression.
A query may be processed faster if it is executed on several servers in parallel, but this depends on the [parallel_replicas_custom_key](#settings-parallel_replicas_custom_key)
and [parallel_replicas_custom_key_filter_type](#settings-parallel_replicas_custom_key_filter_type) settings used.
Simple expressions using primary keys are preferred.
If the setting is used on a cluster that consists of a single shard with multiple replicas, those replicas will be converted into virtual shards.
Otherwise, it will behave the same as with the `SAMPLE` key: it will use multiple replicas of each shard.
## parallel_replicas_custom_key_filter_type {#settings-parallel_replicas_custom_key_filter_type}
How to use `parallel_replicas_custom_key` expression for splitting work between replicas.
Possible values:
- `default` — Use the default implementation using modulo operation on the `parallel_replicas_custom_key`.
- `range` — Split the entire value space of the expression into ranges. This type of filtering is useful if the values of `parallel_replicas_custom_key` are uniformly spread across the entire integer space, e.g. hash values.
Default value: `default`.
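
To make the difference between the two modes concrete, here is a purely illustrative sketch (not ClickHouse's actual implementation) of how a custom-key value could be mapped to one of N replicas under each strategy:

``` cpp
#include <cstdint>
#include <cstddef>

/// `default`: modulo on the custom key. Works for any key distribution,
/// but clusters of nearby key values still land on different replicas.
bool belongsToReplicaModulo(uint64_t key, size_t replica_num, size_t total_replicas)
{
    return key % total_replicas == replica_num;
}

/// `range`: split the whole UInt64 value space into equal ranges. Effective
/// when keys are uniformly spread over the integer space, e.g. hash values.
/// Assumes total_replicas >= 2 to avoid overflow when computing range_size.
bool belongsToReplicaRange(uint64_t key, size_t replica_num, size_t total_replicas)
{
    uint64_t range_size = UINT64_MAX / total_replicas + 1;
    return key / range_size == replica_num;
}
```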
## compile_expressions {#compile-expressions}


@ -330,7 +330,7 @@ repeat(s, n)
**Arguments**
- `s` — The string to repeat. [String](../../sql-reference/data-types/string.md).
- `n` — The number of times to repeat the string. [UInt](../../sql-reference/data-types/int-uint.md).
- `n` — The number of times to repeat the string. [UInt or Int](../../sql-reference/data-types/int-uint.md).
**Returned value**


@ -233,8 +233,9 @@ If `some_predicate` is not selective enough, it will return large amount of data
### Distributed Subqueries and max_parallel_replicas
When max_parallel_replicas is greater than 1, distributed queries are further transformed. For example, the following:
When [max_parallel_replicas](#settings-max_parallel_replicas) is greater than 1, distributed queries are further transformed.
For example, the following:
```sql
SELECT CounterID, count() FROM distributed_table_1 WHERE UserID IN (SELECT UserID FROM local_table_2 WHERE CounterID < 100)
SETTINGS max_parallel_replicas=3
@ -247,8 +248,12 @@ SELECT CounterID, count() FROM local_table_1 WHERE UserID IN (SELECT UserID FROM
SETTINGS parallel_replicas_count=3, parallel_replicas_offset=M
```
where M is between 1 and 3 depending on which replica the local query is executing on. These settings affect every MergeTree-family table in the query and have the same effect as applying `SAMPLE 1/3 OFFSET (M-1)/3` on each table.
where M is between 1 and 3 depending on which replica the local query is executing on.
Therefore adding the max_parallel_replicas setting will only produce correct results if both tables have the same replication scheme and are sampled by UserID or a subkey of it. In particular, if local_table_2 does not have a sampling key, incorrect results will be produced. The same rule applies to JOIN.
These settings affect every MergeTree-family table in the query and have the same effect as applying `SAMPLE 1/3 OFFSET (M-1)/3` on each table.
Therefore adding the [max_parallel_replicas](#settings-max_parallel_replicas) setting will only produce correct results if both tables have the same replication scheme and are sampled by UserID or a subkey of it. In particular, if local_table_2 does not have a sampling key, incorrect results will be produced. The same rule applies to JOIN.
One workaround, if local_table_2 does not meet the requirements, is to use `GLOBAL IN` or `GLOBAL JOIN`.
If a table doesn't have a sampling key, the more flexible [parallel_replicas_custom_key](#settings-parallel_replicas_custom_key) options can be used, which can produce different and more optimal behaviour.


@ -110,7 +110,7 @@ If the type is not `Nullable` and if `NULL` is specified, it will be treated as
See also [data_type_default_nullable](../../../operations/settings/settings.md#data_type_default_nullable) setting.
## Default Values
## Default Values {#default_values}
The column description can specify an expression for a default value, in one of the following ways: `DEFAULT expr`, `MATERIALIZED expr`, `ALIAS expr`.
@ -576,7 +576,7 @@ SELECT * FROM base.t1;
You can add a comment to the table when creating it.
:::note
The comment is supported for all table engines except [Kafka](../../../engines/table-engines/integrations/kafka.md), [RabbitMQ](../../../engines/table-engines/integrations/rabbitmq.md) and [EmbeddedRocksDB](../../../engines/table-engines/integrations/embedded-rocksdb.md).
The comment clause is supported by all table engines except [Kafka](../../../engines/table-engines/integrations/kafka.md), [RabbitMQ](../../../engines/table-engines/integrations/rabbitmq.md) and [EmbeddedRocksDB](../../../engines/table-engines/integrations/embedded-rocksdb.md).
:::


@ -70,6 +70,12 @@ A materialized view is implemented as follows: when inserting data to the table
Materialized views in ClickHouse use **column names** instead of column order during insertion into destination table. If some column names are not present in the `SELECT` query result, ClickHouse uses a default value, even if the column is not [Nullable](../../data-types/nullable.md). A safe practice would be to add aliases for every column when using Materialized views.
Materialized views in ClickHouse are implemented more like insert triggers. If there's some aggregation in the view query, it's applied only to the batch of freshly inserted data. Any changes to the existing data of the source table (like update, delete, drop partition, etc.) do not change the materialized view.
Materialized views in ClickHouse do not have deterministic behaviour in case of errors. This means that blocks that have already been written will be preserved in the destination table, but all blocks after the error will not.
By default, if pushing to one of the views fails, the INSERT query fails too, and some blocks may not be written to the destination table. This can be changed using the `materialized_views_ignore_errors` setting (set it for the `INSERT` query): with `materialized_views_ignore_errors=true`, any errors while pushing to views are ignored and all blocks are written to the destination table.
Also note that `materialized_views_ignore_errors` is set to `true` by default for `system.*_log` tables.
:::
If you specify `POPULATE`, the existing table data is inserted into the view when creating it, as if making a `CREATE TABLE ... AS SELECT ...` . Otherwise, the query contains only the data inserted in the table after creating the view. We **do not recommend** using `POPULATE`, since data inserted in the table during the view creation will not be inserted in it.


@ -20,7 +20,7 @@
#include <Common/formatReadable.h>
#include <Common/Config/ConfigProcessor.h>
#include <Common/OpenSSLHelpers.h>
#include <Common/hex.h>
#include <base/hex.h>
#include <Common/getResource.h>
#include <base/sleep.h>
#include <IO/ReadBufferFromFileDescriptor.h>


@ -44,10 +44,11 @@ private:
friend class RoleCache;
explicit EnabledRoles(const Params & params_);
void setRolesInfo(const std::shared_ptr<const EnabledRolesInfo> & info_, scope_guard * notifications);
const Params params;
/// Called by RoleCache to store `EnabledRolesInfo` in this `EnabledRoles` after the calculation is done.
void setRolesInfo(const std::shared_ptr<const EnabledRolesInfo> & info_, scope_guard * notifications);
std::shared_ptr<const EnabledRolesInfo> info;
mutable std::mutex info_mutex;


@ -57,7 +57,9 @@ namespace
RoleCache::RoleCache(const AccessControl & access_control_)
: access_control(access_control_), cache(600000 /* 10 minutes */) {}
: access_control(access_control_), cache(600000 /* 10 minutes */)
{
}
RoleCache::~RoleCache() = default;
@ -70,18 +72,18 @@ RoleCache::getEnabledRoles(const std::vector<UUID> & roles, const std::vector<UU
EnabledRoles::Params params;
params.current_roles.insert(roles.begin(), roles.end());
params.current_roles_with_admin_option.insert(roles_with_admin_option.begin(), roles_with_admin_option.end());
auto it = enabled_roles.find(params);
if (it != enabled_roles.end())
auto it = enabled_roles_by_params.find(params);
if (it != enabled_roles_by_params.end())
{
auto from_cache = it->second.lock();
if (from_cache)
return from_cache;
enabled_roles.erase(it);
if (auto enabled_roles = it->second.enabled_roles.lock())
return enabled_roles;
enabled_roles_by_params.erase(it);
}
auto res = std::shared_ptr<EnabledRoles>(new EnabledRoles(params));
collectEnabledRoles(*res, nullptr);
enabled_roles.emplace(std::move(params), res);
SubscriptionsOnRoles subscriptions_on_roles;
collectEnabledRoles(*res, subscriptions_on_roles, nullptr);
enabled_roles_by_params.emplace(std::move(params), EnabledRolesWithSubscriptions{res, std::move(subscriptions_on_roles)});
return res;
}
@ -90,21 +92,23 @@ void RoleCache::collectEnabledRoles(scope_guard * notifications)
{
/// `mutex` is already locked.
for (auto i = enabled_roles.begin(), e = enabled_roles.end(); i != e;)
for (auto i = enabled_roles_by_params.begin(), e = enabled_roles_by_params.end(); i != e;)
{
auto elem = i->second.lock();
if (!elem)
i = enabled_roles.erase(i);
auto & item = i->second;
if (auto enabled_roles = item.enabled_roles.lock())
{
collectEnabledRoles(*enabled_roles, item.subscriptions_on_roles, notifications);
++i;
}
else
{
collectEnabledRoles(*elem, notifications);
++i;
i = enabled_roles_by_params.erase(i);
}
}
}
void RoleCache::collectEnabledRoles(EnabledRoles & enabled, scope_guard * notifications)
void RoleCache::collectEnabledRoles(EnabledRoles & enabled_roles, SubscriptionsOnRoles & subscriptions_on_roles, scope_guard * notifications)
{
/// `mutex` is already locked.
@ -112,43 +116,57 @@ void RoleCache::collectEnabledRoles(EnabledRoles & enabled, scope_guard * notifi
auto new_info = std::make_shared<EnabledRolesInfo>();
boost::container::flat_set<UUID> skip_ids;
auto get_role_function = [this](const UUID & id) { return getRole(id); };
/// We need to collect and keep not only enabled roles but also subscriptions for them to be able to recalculate EnabledRolesInfo when some of the roles change.
SubscriptionsOnRoles new_subscriptions_on_roles;
new_subscriptions_on_roles.reserve(subscriptions_on_roles.size());
for (const auto & current_role : enabled.params.current_roles)
auto get_role_function = [this, &subscriptions_on_roles](const UUID & id) TSA_NO_THREAD_SAFETY_ANALYSIS { return getRole(id, subscriptions_on_roles); };
for (const auto & current_role : enabled_roles.params.current_roles)
collectRoles(*new_info, skip_ids, get_role_function, current_role, true, false);
for (const auto & current_role : enabled.params.current_roles_with_admin_option)
for (const auto & current_role : enabled_roles.params.current_roles_with_admin_option)
collectRoles(*new_info, skip_ids, get_role_function, current_role, true, true);
/// Remove duplicates from `subscriptions_on_roles`.
std::sort(new_subscriptions_on_roles.begin(), new_subscriptions_on_roles.end());
new_subscriptions_on_roles.erase(std::unique(new_subscriptions_on_roles.begin(), new_subscriptions_on_roles.end()), new_subscriptions_on_roles.end());
subscriptions_on_roles = std::move(new_subscriptions_on_roles);
/// Collect data from the collected roles.
enabled.setRolesInfo(new_info, notifications);
enabled_roles.setRolesInfo(new_info, notifications);
}
RolePtr RoleCache::getRole(const UUID & role_id)
RolePtr RoleCache::getRole(const UUID & role_id, SubscriptionsOnRoles & subscriptions_on_roles)
{
/// `mutex` is already locked.
auto role_from_cache = cache.get(role_id);
if (role_from_cache)
{
subscriptions_on_roles.emplace_back(role_from_cache->second);
return role_from_cache->first;
}
auto subscription = access_control.subscribeForChanges(role_id,
[this, role_id](const UUID &, const AccessEntityPtr & entity)
auto on_role_changed_or_removed = [this, role_id](const UUID &, const AccessEntityPtr & entity)
{
auto changed_role = entity ? typeid_cast<RolePtr>(entity) : nullptr;
if (changed_role)
roleChanged(role_id, changed_role);
else
roleRemoved(role_id);
});
};
auto subscription_on_role = std::make_shared<scope_guard>(access_control.subscribeForChanges(role_id, on_role_changed_or_removed));
auto role = access_control.tryRead<Role>(role_id);
if (role)
{
auto cache_value = Poco::SharedPtr<std::pair<RolePtr, scope_guard>>(
new std::pair<RolePtr, scope_guard>{role, std::move(subscription)});
auto cache_value = Poco::SharedPtr<std::pair<RolePtr, std::shared_ptr<scope_guard>>>(
new std::pair<RolePtr, std::shared_ptr<scope_guard>>{role, subscription_on_role});
cache.add(role_id, cache_value);
subscriptions_on_roles.emplace_back(subscription_on_role);
return role;
}
@ -162,12 +180,17 @@ void RoleCache::roleChanged(const UUID & role_id, const RolePtr & changed_role)
scope_guard notifications;
std::lock_guard lock{mutex};
auto role_from_cache = cache.get(role_id);
if (!role_from_cache)
return;
role_from_cache->first = changed_role;
cache.update(role_id, role_from_cache);
collectEnabledRoles(&notifications);
if (role_from_cache)
{
/// We update the role stored in a cache entry only if that entry has not expired yet.
role_from_cache->first = changed_role;
cache.update(role_id, role_from_cache);
}
/// An enabled role for some users has been changed, we need to recalculate the access rights.
collectEnabledRoles(&notifications); /// collectEnabledRoles() must be called with the `mutex` locked.
}
@ -177,8 +200,12 @@ void RoleCache::roleRemoved(const UUID & role_id)
scope_guard notifications;
std::lock_guard lock{mutex};
/// If a cache entry with the role has expired already, that remove() will do nothing.
cache.remove(role_id);
collectEnabledRoles(&notifications);
/// An enabled role for some users has been removed, we need to recalculate the access rights.
collectEnabledRoles(&notifications); /// collectEnabledRoles() must be called with the `mutex` locked.
}
}


@ -24,15 +24,29 @@ public:
const std::vector<UUID> & current_roles_with_admin_option);
private:
void collectEnabledRoles(scope_guard * notifications);
void collectEnabledRoles(EnabledRoles & enabled, scope_guard * notifications);
RolePtr getRole(const UUID & role_id);
using SubscriptionsOnRoles = std::vector<std::shared_ptr<scope_guard>>;
void collectEnabledRoles(scope_guard * notifications) TSA_REQUIRES(mutex);
void collectEnabledRoles(EnabledRoles & enabled_roles, SubscriptionsOnRoles & subscriptions_on_roles, scope_guard * notifications) TSA_REQUIRES(mutex);
RolePtr getRole(const UUID & role_id, SubscriptionsOnRoles & subscriptions_on_roles) TSA_REQUIRES(mutex);
void roleChanged(const UUID & role_id, const RolePtr & changed_role);
void roleRemoved(const UUID & role_id);
const AccessControl & access_control;
Poco::AccessExpireCache<UUID, std::pair<RolePtr, scope_guard>> cache;
std::map<EnabledRoles::Params, std::weak_ptr<EnabledRoles>> enabled_roles;
Poco::AccessExpireCache<UUID, std::pair<RolePtr, std::shared_ptr<scope_guard>>> TSA_GUARDED_BY(mutex) cache;
struct EnabledRolesWithSubscriptions
{
std::weak_ptr<EnabledRoles> enabled_roles;
/// We need to keep subscriptions for all enabled roles to be able to recalculate EnabledRolesInfo when some of the roles change.
/// `cache` also keeps subscriptions but that's not enough because values can be purged from the `cache` anytime.
SubscriptionsOnRoles subscriptions_on_roles;
};
std::map<EnabledRoles::Params, EnabledRolesWithSubscriptions> TSA_GUARDED_BY(mutex) enabled_roles_by_params;
mutable std::mutex mutex;
};


@ -3,6 +3,7 @@
#include <Core/NamesAndTypes.h>
#include <Analyzer/IQueryTreeNode.h>
#include <DataTypes/DataTypeNullable.h>
namespace DB
{
@ -117,6 +118,11 @@ public:
return column.type;
}
void convertToNullable() override
{
column.type = makeNullableSafe(column.type);
}
void dumpTreeImpl(WriteBuffer & buffer, FormatState & state, size_t indent) const override;
protected:


@ -99,7 +99,7 @@ void FunctionNode::dumpTreeImpl(WriteBuffer & buffer, FormatState & format_state
buffer << ", function_type: " << function_type;
if (function)
buffer << ", result_type: " + function->getResultType()->getName();
buffer << ", result_type: " + getResultType()->getName();
const auto & parameters = getParameters();
if (!parameters.getNodes().empty())
@ -177,6 +177,7 @@ QueryTreeNodePtr FunctionNode::cloneImpl() const
*/
result_function->function = function;
result_function->kind = kind;
result_function->wrap_with_nullable = wrap_with_nullable;
return result_function;
}


@ -8,6 +8,7 @@
#include <Common/typeid_cast.h>
#include <Core/ColumnsWithTypeAndName.h>
#include <Core/IResolvedFunction.h>
#include <DataTypes/DataTypeNullable.h>
#include <Functions/IFunction.h>
namespace DB
@ -187,7 +188,16 @@ public:
throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
"Function node with name '{}' is not resolved",
function_name);
return function->getResultType();
auto type = function->getResultType();
if (wrap_with_nullable)
return makeNullableSafe(type);
return type;
}
void convertToNullable() override
{
chassert(kind == FunctionKind::ORDINARY);
wrap_with_nullable = true;
}
void dumpTreeImpl(WriteBuffer & buffer, FormatState & format_state, size_t indent) const override;
@ -205,6 +215,7 @@ private:
String function_name;
FunctionKind kind = FunctionKind::UNKNOWN;
IResolvedFunctionPtr function;
bool wrap_with_nullable = false;
static constexpr size_t parameters_child_index = 0;
static constexpr size_t arguments_child_index = 1;


@ -90,6 +90,11 @@ public:
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Method getResultType is not supported for {} query node", getNodeTypeName());
}
virtual void convertToNullable()
{
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Method convertToNullable is not supported for {} query node", getNodeTypeName());
}
struct CompareOptions
{
bool compare_aliases = true;


@ -1,3 +1,5 @@
#include <Analyzer/Passes/ArrayExistsToHasPass.h>
#include <Functions/FunctionFactory.h>
#include <Interpreters/Context.h>
@ -8,71 +10,85 @@
#include <Analyzer/InDepthQueryTreeVisitor.h>
#include <Analyzer/LambdaNode.h>
#include "ArrayExistsToHasPass.h"
namespace DB
{
namespace
{
class RewriteArrayExistsToHasVisitor : public InDepthQueryTreeVisitorWithContext<RewriteArrayExistsToHasVisitor>
class RewriteArrayExistsToHasVisitor : public InDepthQueryTreeVisitorWithContext<RewriteArrayExistsToHasVisitor>
{
public:
using Base = InDepthQueryTreeVisitorWithContext<RewriteArrayExistsToHasVisitor>;
using Base::Base;
void visitImpl(QueryTreeNodePtr & node)
{
public:
using Base = InDepthQueryTreeVisitorWithContext<RewriteArrayExistsToHasVisitor>;
using Base::Base;
if (!getSettings().optimize_rewrite_array_exists_to_has)
return;
void visitImpl(QueryTreeNodePtr & node)
auto * array_exists_function_node = node->as<FunctionNode>();
if (!array_exists_function_node || array_exists_function_node->getFunctionName() != "arrayExists")
return;
auto & array_exists_function_arguments_nodes = array_exists_function_node->getArguments().getNodes();
if (array_exists_function_arguments_nodes.size() != 2)
return;
/// lambda function must be like: x -> x = elem
auto * lambda_node = array_exists_function_arguments_nodes[0]->as<LambdaNode>();
if (!lambda_node)
return;
auto & lambda_arguments_nodes = lambda_node->getArguments().getNodes();
if (lambda_arguments_nodes.size() != 1)
return;
const auto & lambda_argument_column_node = lambda_arguments_nodes[0];
if (lambda_argument_column_node->getNodeType() != QueryTreeNodeType::COLUMN)
return;
auto * filter_node = lambda_node->getExpression()->as<FunctionNode>();
if (!filter_node || filter_node->getFunctionName() != "equals")
return;
const auto & filter_arguments_nodes = filter_node->getArguments().getNodes();
if (filter_arguments_nodes.size() != 2)
return;
const auto & filter_lhs_argument_node = filter_arguments_nodes[0];
auto filter_lhs_argument_node_type = filter_lhs_argument_node->getNodeType();
const auto & filter_rhs_argument_node = filter_arguments_nodes[1];
auto filter_rhs_argument_node_type = filter_rhs_argument_node->getNodeType();
QueryTreeNodePtr has_constant_element_argument;
if (filter_lhs_argument_node_type == QueryTreeNodeType::COLUMN &&
filter_rhs_argument_node_type == QueryTreeNodeType::CONSTANT &&
filter_lhs_argument_node->isEqual(*lambda_argument_column_node))
{
if (!getSettings().optimize_rewrite_array_exists_to_has)
return;
auto * function_node = node->as<FunctionNode>();
if (!function_node || function_node->getFunctionName() != "arrayExists")
return;
auto & function_arguments_nodes = function_node->getArguments().getNodes();
if (function_arguments_nodes.size() != 2)
return;
/// lambda function must be like: x -> x = elem
auto * lambda_node = function_arguments_nodes[0]->as<LambdaNode>();
if (!lambda_node)
return;
auto & lambda_arguments_nodes = lambda_node->getArguments().getNodes();
if (lambda_arguments_nodes.size() != 1)
return;
auto * column_node = lambda_arguments_nodes[0]->as<ColumnNode>();
auto * filter_node = lambda_node->getExpression()->as<FunctionNode>();
if (!filter_node || filter_node->getFunctionName() != "equals")
return;
auto filter_arguments_nodes = filter_node->getArguments().getNodes();
if (filter_arguments_nodes.size() != 2)
return;
ColumnNode * filter_column_node = nullptr;
if (filter_arguments_nodes[1]->as<ConstantNode>() && (filter_column_node = filter_arguments_nodes[0]->as<ColumnNode>())
&& filter_column_node->getColumnName() == column_node->getColumnName())
{
/// Rewrite arrayExists(x -> x = elem, arr) -> has(arr, elem)
function_arguments_nodes[0] = std::move(function_arguments_nodes[1]);
function_arguments_nodes[1] = std::move(filter_arguments_nodes[1]);
function_node->resolveAsFunction(
FunctionFactory::instance().get("has", getContext())->build(function_node->getArgumentColumns()));
}
else if (
filter_arguments_nodes[0]->as<ConstantNode>() && (filter_column_node = filter_arguments_nodes[1]->as<ColumnNode>())
&& filter_column_node->getColumnName() == column_node->getColumnName())
{
/// Rewrite arrayExists(x -> elem = x, arr) -> has(arr, elem)
function_arguments_nodes[0] = std::move(function_arguments_nodes[1]);
function_arguments_nodes[1] = std::move(filter_arguments_nodes[0]);
function_node->resolveAsFunction(
FunctionFactory::instance().get("has", getContext())->build(function_node->getArgumentColumns()));
}
/// Rewrite arrayExists(x -> x = elem, arr) -> has(arr, elem)
has_constant_element_argument = filter_rhs_argument_node;
}
};
else if (filter_lhs_argument_node_type == QueryTreeNodeType::CONSTANT &&
filter_rhs_argument_node_type == QueryTreeNodeType::COLUMN &&
filter_rhs_argument_node->isEqual(*lambda_argument_column_node))
{
/// Rewrite arrayExists(x -> elem = x, arr) -> has(arr, elem)
has_constant_element_argument = filter_lhs_argument_node;
}
else
{
return;
}
auto has_function = FunctionFactory::instance().get("has", getContext());
array_exists_function_arguments_nodes[0] = std::move(array_exists_function_arguments_nodes[1]);
array_exists_function_arguments_nodes[1] = std::move(has_constant_element_argument);
array_exists_function_node->resolveAsFunction(has_function->build(array_exists_function_node->getArgumentColumns()));
}
};
}


@ -4,8 +4,15 @@
namespace DB
{
/// Rewrite possible 'arrayExists(func, arr)' to 'has(arr, elem)' to improve performance
/// arrayExists(x -> x = 1, arr) -> has(arr, 1)
/** Rewrite possible 'arrayExists(func, arr)' to 'has(arr, elem)' to improve performance.
*
* Example: SELECT arrayExists(x -> x = 1, arr);
* Result: SELECT has(arr, 1);
*
* Example: SELECT arrayExists(x -> 1 = x, arr);
* Result: SELECT has(arr, 1);
*/
class RewriteArrayExistsToHasPass final : public IQueryTreePass
{
public:
@ -15,4 +22,5 @@ public:
void run(QueryTreeNodePtr query_tree_node, ContextPtr context) override;
};
}


@ -22,8 +22,7 @@ public:
void visitImpl(QueryTreeNodePtr & node)
{
const auto & context = getContext();
if (!context->getSettingsRef().final)
if (!getSettings().final)
return;
const auto * query_node = node->as<QueryNode>();


@ -0,0 +1,237 @@
#include <Analyzer/Passes/LogicalExpressionOptimizerPass.h>
#include <Functions/FunctionFactory.h>
#include <Analyzer/InDepthQueryTreeVisitor.h>
#include <Analyzer/FunctionNode.h>
#include <Analyzer/ConstantNode.h>
#include <Analyzer/HashUtils.h>
#include <DataTypes/DataTypeString.h>
namespace DB
{
class LogicalExpressionOptimizerVisitor : public InDepthQueryTreeVisitorWithContext<LogicalExpressionOptimizerVisitor>
{
public:
using Base = InDepthQueryTreeVisitorWithContext<LogicalExpressionOptimizerVisitor>;
explicit LogicalExpressionOptimizerVisitor(ContextPtr context)
: Base(std::move(context))
{}
void visitImpl(QueryTreeNodePtr & node)
{
auto * function_node = node->as<FunctionNode>();
if (!function_node)
return;
if (function_node->getFunctionName() == "or")
{
tryReplaceOrEqualsChainWithIn(node);
return;
}
if (function_node->getFunctionName() == "and")
{
tryReplaceAndEqualsChainsWithConstant(node);
return;
}
}
private:
void tryReplaceAndEqualsChainsWithConstant(QueryTreeNodePtr & node)
{
auto & function_node = node->as<FunctionNode &>();
assert(function_node.getFunctionName() == "and");
if (function_node.getResultType()->isNullable())
return;
QueryTreeNodes and_operands;
QueryTreeNodePtrWithHashMap<const ConstantNode *> node_to_constants;
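/// For each non-constant side of an equality, remember the constant it must equal;
/// seeing the same expression required to equal a different constant means the whole AND chain is always false.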
for (const auto & argument : function_node.getArguments())
{
auto * argument_function = argument->as<FunctionNode>();
if (!argument_function || argument_function->getFunctionName() != "equals")
{
and_operands.push_back(argument);
continue;
}
const auto & equals_arguments = argument_function->getArguments().getNodes();
const auto & lhs = equals_arguments[0];
const auto & rhs = equals_arguments[1];
const auto has_and_with_different_constant = [&](const QueryTreeNodePtr & expression, const ConstantNode * constant)
{
if (auto it = node_to_constants.find(expression); it != node_to_constants.end())
{
if (!it->second->isEqual(*constant))
return true;
}
else
{
node_to_constants.emplace(expression, constant);
and_operands.push_back(argument);
}
return false;
};
bool collapse_to_false = false;
if (const auto * lhs_literal = lhs->as<ConstantNode>())
collapse_to_false = has_and_with_different_constant(rhs, lhs_literal);
else if (const auto * rhs_literal = rhs->as<ConstantNode>())
collapse_to_false = has_and_with_different_constant(lhs, rhs_literal);
else
and_operands.push_back(argument);
if (collapse_to_false)
{
auto false_value = std::make_shared<ConstantValue>(0u, function_node.getResultType());
auto false_node = std::make_shared<ConstantNode>(std::move(false_value));
node = std::move(false_node);
return;
}
}
if (and_operands.size() == 1)
{
/// The AND operator can have UInt8 or bool as its type.
/// bool is used if at least one operand is a bool constant.
/// Because we reduce the number of operands here by eliminating identical equality checks,
/// the only way to end up here is an AND where all the equality checks were the same, so we know the type is UInt8.
/// Otherwise, we will have > 1 operands and we don't have to do anything.
assert(!function_node.getResultType()->isNullable() && and_operands[0]->getResultType()->equals(*function_node.getResultType()));
node = std::move(and_operands[0]);
return;
}
auto and_function_resolver = FunctionFactory::instance().get("and", getContext());
function_node.getArguments().getNodes() = std::move(and_operands);
function_node.resolveAsFunction(and_function_resolver);
}
void tryReplaceOrEqualsChainWithIn(QueryTreeNodePtr & node)
{
auto & function_node = node->as<FunctionNode &>();
assert(function_node.getFunctionName() == "or");
QueryTreeNodes or_operands;
QueryTreeNodePtrWithHashMap<QueryTreeNodes> node_to_equals_functions;
QueryTreeNodePtrWithHashMap<QueryTreeNodeConstRawPtrWithHashSet> node_to_constants;
for (const auto & argument : function_node.getArguments())
{
auto * argument_function = argument->as<FunctionNode>();
if (!argument_function || argument_function->getFunctionName() != "equals")
{
or_operands.push_back(argument);
continue;
}
/// collect all equality checks (x = value)
const auto & equals_arguments = argument_function->getArguments().getNodes();
const auto & lhs = equals_arguments[0];
const auto & rhs = equals_arguments[1];
const auto add_equals_function_if_not_present = [&](const auto & expression_node, const ConstantNode * constant)
{
auto & constant_set = node_to_constants[expression_node];
if (!constant_set.contains(constant))
{
constant_set.insert(constant);
node_to_equals_functions[expression_node].push_back(argument);
}
};
if (const auto * lhs_literal = lhs->as<ConstantNode>())
add_equals_function_if_not_present(rhs, lhs_literal);
else if (const auto * rhs_literal = rhs->as<ConstantNode>())
add_equals_function_if_not_present(lhs, rhs_literal);
else
or_operands.push_back(argument);
}
auto in_function_resolver = FunctionFactory::instance().get("in", getContext());
for (auto & [expression, equals_functions] : node_to_equals_functions)
{
const auto & settings = getSettings();
if (equals_functions.size() < settings.optimize_min_equality_disjunction_chain_length && !expression.node->getResultType()->lowCardinality())
{
std::move(equals_functions.begin(), equals_functions.end(), std::back_inserter(or_operands));
continue;
}
Tuple args;
args.reserve(equals_functions.size());
/// first we create tuple from RHS of equals functions
for (const auto & equals : equals_functions)
{
const auto * equals_function = equals->as<FunctionNode>();
assert(equals_function && equals_function->getFunctionName() == "equals");
const auto & equals_arguments = equals_function->getArguments().getNodes();
if (const auto * rhs_literal = equals_arguments[1]->as<ConstantNode>())
{
args.push_back(rhs_literal->getValue());
}
else
{
const auto * lhs_literal = equals_arguments[0]->as<ConstantNode>();
assert(lhs_literal);
args.push_back(lhs_literal->getValue());
}
}
auto rhs_node = std::make_shared<ConstantNode>(std::move(args));
auto in_function = std::make_shared<FunctionNode>("in");
QueryTreeNodes in_arguments;
in_arguments.reserve(2);
in_arguments.push_back(expression.node);
in_arguments.push_back(std::move(rhs_node));
in_function->getArguments().getNodes() = std::move(in_arguments);
in_function->resolveAsFunction(in_function_resolver);
or_operands.push_back(std::move(in_function));
}
if (or_operands.size() == 1)
{
/// If the result type of the operand is the same as the result type of the OR,
/// we can replace the OR with the operand
if (or_operands[0]->getResultType()->equals(*function_node.getResultType()))
{
assert(!function_node.getResultType()->isNullable());
node = std::move(or_operands[0]);
return;
}
/// Otherwise, add a stub `0` operand so the OR keeps its original result type (`x OR 0` == `x`)
or_operands.push_back(std::make_shared<ConstantNode>(static_cast<UInt8>(0)));
}
auto or_function_resolver = FunctionFactory::instance().get("or", getContext());
function_node.getArguments().getNodes() = std::move(or_operands);
function_node.resolveAsFunction(or_function_resolver);
}
};
void LogicalExpressionOptimizerPass::run(QueryTreeNodePtr query_tree_node, ContextPtr context)
{
LogicalExpressionOptimizerVisitor visitor(std::move(context));
visitor.visit(query_tree_node);
}
}
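For orientation, here is one way to observe this pass end to end; the session below is illustrative (it assumes the experimental analyzer is enabled so query tree passes run, and a table `t` with columns `a` and `b`):

/** Illustrative session (not part of this diff):
 *
 *   SET allow_experimental_analyzer = 1;
 *   EXPLAIN QUERY TREE SELECT * FROM t WHERE a = 1 OR a = 2 OR b = 'x';
 *
 * With optimize_min_equality_disjunction_chain_length <= 2 (or a
 * LowCardinality column `a`), the OR chain over `a` should appear in the
 * tree as a single `a IN (1, 2)`.
 */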

View File

@ -0,0 +1,82 @@
#pragma once
#include <Analyzer/IQueryTreePass.h>
namespace DB
{
/**
* This pass tries to do optimizations on logical expression:
*
* 1. Replaces chains of equality functions inside an OR with a single IN operator.
* The replacement is done if:
* - one of the operands of the equality function is a constant
* - the chain is at least 'optimize_min_equality_disjunction_chain_length' checks long, OR the expression has a LowCardinality type
*
* E.g. (optimize_min_equality_disjunction_chain_length = 2)
* -------------------------------
* SELECT *
* FROM table
* WHERE a = 1 OR b = 'test' OR a = 2;
*
* will be transformed into
*
* SELECT *
* FROM table
* WHERE b = 'test' OR a IN (1, 2);
* -------------------------------
*
* 2. Removes duplicate OR checks
* -------------------------------
* SELECT *
* FROM table
* WHERE a = 1 OR b = 'test' OR a = 1;
*
* will be transformed into
*
* SELECT *
* FROM table
* WHERE a = 1 OR b = 'test';
* -------------------------------
*
* 3. Replaces an AND chain with a single constant.
* The replacement is done if:
* - one of the operands of the equality function is a constant
* - the same expression is compared for equality with different constants
*   (such a conjunction can never be true)
* -------------------------------
* SELECT *
* FROM table
* WHERE a = 1 AND b = 'test' AND a = 2;
*
* will be transformed into
*
* SELECT *
* FROM table
* WHERE 0;
* -------------------------------
*
* 4. Removes duplicate AND checks
* -------------------------------
* SELECT *
* FROM table
* WHERE a = 1 AND b = 'test' AND a = 1;
*
* will be transformed into
*
* SELECT *
* FROM table
* WHERE a = 1 AND b = 'test';
* -------------------------------
*/
class LogicalExpressionOptimizerPass final : public IQueryTreePass
{
public:
String getName() override { return "LogicalExpressionOptimizer"; }
String getDescription() override { return "Transform equality chain to a single IN function or a constant if possible"; }
void run(QueryTreeNodePtr query_tree_node, ContextPtr context) override;
};
}

View File

@ -199,7 +199,6 @@ namespace ErrorCodes
* TODO: SELECT (compound_expression).*, (compound_expression).COLUMNS are not supported on parser level.
* TODO: SELECT a.b.c.*, a.b.c.COLUMNS. Qualified matcher where identifier size is greater than 2 are not supported on parser level.
* TODO: Support function identifier resolve from parent query scope, if lambda in parent scope does not capture any columns.
* TODO: Support group_by_use_nulls.
* TODO: Scalar subqueries cache.
*/
@ -472,6 +471,12 @@ public:
alias_name_to_expressions[node_alias].push_back(node);
}
if (const auto * function = node->as<FunctionNode>())
{
if (AggregateFunctionFactory::instance().isAggregateFunctionName(function->getFunctionName()))
++aggregate_functions_counter;
}
expressions.emplace_back(node);
}
@ -490,6 +495,12 @@ public:
alias_name_to_expressions.erase(it);
}
if (const auto * function = top_expression->as<FunctionNode>())
{
if (AggregateFunctionFactory::instance().isAggregateFunctionName(function->getFunctionName()))
--aggregate_functions_counter;
}
expressions.pop_back();
}
@ -508,6 +519,11 @@ public:
return alias_name_to_expressions.contains(alias);
}
bool hasAggregateFunction() const
{
return aggregate_functions_counter > 0;
}
QueryTreeNodePtr getExpressionWithAlias(const std::string & alias) const
{
auto expression_it = alias_name_to_expressions.find(alias);
@ -554,6 +570,7 @@ public:
private:
QueryTreeNodes expressions;
size_t aggregate_functions_counter = 0;
std::unordered_map<std::string, QueryTreeNodes> alias_name_to_expressions;
};
@ -686,7 +703,11 @@ struct IdentifierResolveScope
if (auto * union_node = scope_node->as<UnionNode>())
context = union_node->getContext();
else if (auto * query_node = scope_node->as<QueryNode>())
{
context = query_node->getContext();
group_by_use_nulls = context->getSettingsRef().group_by_use_nulls &&
(query_node->isGroupByWithGroupingSets() || query_node->isGroupByWithRollup() || query_node->isGroupByWithCube());
}
}
QueryTreeNodePtr scope_node;
@ -734,9 +755,14 @@ struct IdentifierResolveScope
/// Table expression node to data
std::unordered_map<QueryTreeNodePtr, TableExpressionData> table_expression_node_to_data;
QueryTreeNodePtrWithHashSet nullable_group_by_keys;
/// Use identifier lookup to result cache
bool use_identifier_lookup_to_result_cache = true;
/// Apply nullability to aggregation keys
bool group_by_use_nulls = false;
/// JOINs count
size_t joins_count = 0;
@ -5407,10 +5433,18 @@ ProjectionNames QueryAnalyzer::resolveExpressionNode(QueryTreeNodePtr & node, Id
}
}
if (node
&& scope.nullable_group_by_keys.contains(node)
&& !scope.expressions_in_resolve_process_stack.hasAggregateFunction())
{
node = node->clone();
node->convertToNullable();
}
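To make the clone-and-convert step above concrete, a hedged sketch of the query-level effect (illustrative query; it assumes group_by_use_nulls behaves as these changes describe):

/** Illustrative only:
 *
 *   SELECT number % 2 AS k, count() FROM numbers(4)
 *   GROUP BY ROLLUP(k)
 *   SETTINGS group_by_use_nulls = 1;
 *
 * The rollup's super-aggregate row reports k = NULL, so the key `k` must be
 * resolved as Nullable(UInt8) rather than UInt8; that is what the
 * convertToNullable() call above arranges for nullable GROUP BY keys.
 */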
/** Update aliases after expression node was resolved.
* Do not update node in alias table if we resolve it for duplicate alias.
*/
if (!node_alias.empty() && use_alias_table)
if (!node_alias.empty() && use_alias_table && !scope.group_by_use_nulls)
{
auto it = scope.alias_name_to_expression_node.find(node_alias);
if (it != scope.alias_name_to_expression_node.end())
@ -6418,9 +6452,6 @@ void QueryAnalyzer::resolveQuery(const QueryTreeNodePtr & query_node, Identifier
auto & query_node_typed = query_node->as<QueryNode &>();
const auto & settings = scope.context->getSettingsRef();
if (settings.group_by_use_nulls)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "GROUP BY use nulls is not supported");
bool is_rollup_or_cube = query_node_typed.isGroupByWithRollup() || query_node_typed.isGroupByWithCube();
if (query_node_typed.isGroupByWithGroupingSets() && query_node_typed.isGroupByWithTotals())
@ -6556,16 +6587,11 @@ void QueryAnalyzer::resolveQuery(const QueryTreeNodePtr & query_node, Identifier
resolveQueryJoinTreeNode(query_node_typed.getJoinTree(), scope, visitor);
}
scope.use_identifier_lookup_to_result_cache = true;
if (!scope.group_by_use_nulls)
scope.use_identifier_lookup_to_result_cache = true;
/// Resolve query node sections.
auto projection_columns = resolveProjectionExpressionNodeList(query_node_typed.getProjectionNode(), scope);
if (query_node_typed.getProjection().getNodes().empty())
throw Exception(ErrorCodes::EMPTY_LIST_OF_COLUMNS_QUERIED,
"Empty list of columns in projection. In scope {}",
scope.scope_node->formatASTForErrorMessage());
if (query_node_typed.hasWith())
resolveExpressionNodeList(query_node_typed.getWithNode(), scope, true /*allow_lambda_expression*/, false /*allow_table_expression*/);
@ -6586,6 +6612,15 @@ void QueryAnalyzer::resolveQuery(const QueryTreeNodePtr & query_node, Identifier
resolveExpressionNodeList(grouping_sets_keys_list_node, scope, false /*allow_lambda_expression*/, false /*allow_table_expression*/);
}
if (scope.group_by_use_nulls)
{
for (const auto & grouping_set : query_node_typed.getGroupBy().getNodes())
{
for (const auto & group_by_elem : grouping_set->as<ListNode>()->getNodes())
scope.nullable_group_by_keys.insert(group_by_elem);
}
}
}
else
{
@ -6593,6 +6628,12 @@ void QueryAnalyzer::resolveQuery(const QueryTreeNodePtr & query_node, Identifier
replaceNodesWithPositionalArguments(query_node_typed.getGroupByNode(), query_node_typed.getProjection().getNodes(), scope);
resolveExpressionNodeList(query_node_typed.getGroupByNode(), scope, false /*allow_lambda_expression*/, false /*allow_table_expression*/);
if (scope.group_by_use_nulls)
{
for (const auto & group_by_elem : query_node_typed.getGroupBy().getNodes())
scope.nullable_group_by_keys.insert(group_by_elem);
}
}
}
@ -6645,6 +6686,12 @@ void QueryAnalyzer::resolveQuery(const QueryTreeNodePtr & query_node, Identifier
convertLimitOffsetExpression(query_node_typed.getOffset(), "OFFSET", scope);
}
auto projection_columns = resolveProjectionExpressionNodeList(query_node_typed.getProjectionNode(), scope);
if (query_node_typed.getProjection().getNodes().empty())
throw Exception(ErrorCodes::EMPTY_LIST_OF_COLUMNS_QUERIED,
"Empty list of columns in projection. In scope {}",
scope.scope_node->formatASTForErrorMessage());
/** Resolve nodes with duplicate aliases.
* Table expressions cannot have duplicate aliases.
*
@ -6708,7 +6755,7 @@ void QueryAnalyzer::resolveQuery(const QueryTreeNodePtr & query_node, Identifier
"ARRAY JOIN",
"in PREWHERE");
validateAggregates(query_node);
validateAggregates(query_node, { .group_by_use_nulls = scope.group_by_use_nulls });
/** WITH section can be safely removed, because WITH section only can provide aliases to query expressions
* and CTE for other sections to use.

View File

@ -6,6 +6,9 @@ namespace DB
{
/** Rewrite _shard_num column into shardNum() function.
*
* Example: SELECT _shard_num FROM distributed_table;
* Result: SELECT shardNum() FROM distributed_table;
*/
class ShardNumColumnToFunctionPass final : public IQueryTreePass
{

View File

@ -38,6 +38,7 @@
#include <Analyzer/Passes/AutoFinalOnQueryPass.h>
#include <Analyzer/Passes/ArrayExistsToHasPass.h>
#include <Analyzer/Passes/ComparisonTupleEliminationPass.h>
#include <Analyzer/Passes/LogicalExpressionOptimizerPass.h>
#include <Analyzer/Passes/CrossToInnerJoinPass.h>
#include <Analyzer/Passes/ShardNumColumnToFunctionPass.h>
@ -147,7 +148,6 @@ private:
/** ClickHouse query tree pass manager.
*
* TODO: Support logical expressions optimizer.
* TODO: Support setting convert_query_to_cnf.
* TODO: Support setting optimize_using_constraints.
* TODO: Support setting optimize_substitute_columns.
@ -262,6 +262,8 @@ void addQueryTreePasses(QueryTreePassManager & manager)
manager.addPass(std::make_unique<ConvertOrLikeChainPass>());
manager.addPass(std::make_unique<LogicalExpressionOptimizerPass>());
manager.addPass(std::make_unique<GroupingFunctionsResolvePass>());
manager.addPass(std::make_unique<AutoFinalOnQueryPass>());
manager.addPass(std::make_unique<CrossToInnerJoinPass>());

View File

@ -105,7 +105,7 @@ private:
const QueryTreeNodePtr & query_node;
};
void validateAggregates(const QueryTreeNodePtr & query_node)
void validateAggregates(const QueryTreeNodePtr & query_node, ValidationParams params)
{
const auto & query_node_typed = query_node->as<QueryNode &>();
auto join_tree_node_type = query_node_typed.getJoinTree()->getNodeType();
@ -182,7 +182,9 @@ void validateAggregates(const QueryTreeNodePtr & query_node)
if (grouping_set_key->as<ConstantNode>())
continue;
group_by_keys_nodes.push_back(grouping_set_key);
group_by_keys_nodes.push_back(grouping_set_key->clone());
if (params.group_by_use_nulls)
group_by_keys_nodes.back()->convertToNullable();
}
}
else
@ -190,7 +192,9 @@ void validateAggregates(const QueryTreeNodePtr & query_node)
if (node->as<ConstantNode>())
continue;
group_by_keys_nodes.push_back(node);
group_by_keys_nodes.push_back(node->clone());
if (params.group_by_use_nulls)
group_by_keys_nodes.back()->convertToNullable();
}
}

View File

@ -5,6 +5,11 @@
namespace DB
{
struct ValidationParams
{
bool group_by_use_nulls;
};
/** Validate aggregates in query node.
*
* 1. Check that there are no aggregate functions and GROUPING function in JOIN TREE, WHERE, PREWHERE, in another aggregate functions.
@ -15,7 +20,7 @@ namespace DB
* PROJECTION.
* 5. Throws exception if there is GROUPING SETS or ROLLUP or CUBE or WITH TOTALS without aggregation.
*/
void validateAggregates(const QueryTreeNodePtr & query_node);
void validateAggregates(const QueryTreeNodePtr & query_node, ValidationParams params);
/** Assert that there are no function nodes with specified function name in node children.
* Do not visit subqueries.

View File

@ -6,7 +6,7 @@
#include <IO/WriteHelpers.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/escapeForFileName.h>
#include <Common/hex.h>
#include <base/hex.h>
#include <Backups/BackupCoordinationStage.h>

View File

@ -6,7 +6,7 @@
#include <Backups/BackupCoordinationLocal.h>
#include <Backups/BackupCoordinationRemote.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/hex.h>
#include <base/hex.h>
#include <Common/quoteString.h>
#include <Common/XMLUtils.h>
#include <Interpreters/Context.h>

View File

@ -2,7 +2,7 @@
#include <optional>
#include <fmt/format.h>
#include <Common/hex.h>
#include <base/hex.h>
#include <Core/Types.h>

View File

@ -14,7 +14,7 @@
#include <DataTypes/DataTypesNumber.h>
#include <Common/WeakHash.h>
#include <Common/hex.h>
#include <base/hex.h>
#include <unordered_map>
#include <iostream>

View File

@ -1,6 +1,6 @@
#pragma once
#include <Common/hex.h>
#include <base/hex.h>
namespace DB
{

View File

@ -3,7 +3,7 @@
#include <random>
#include <base/getThreadId.h>
#include <Common/Exception.h>
#include <Common/hex.h>
#include <base/hex.h>
#include <Core/Settings.h>
#include <IO/Operators.h>

View File

@ -1,7 +1,7 @@
#if defined(__ELF__) && !defined(OS_FREEBSD)
#include <Common/SymbolIndex.h>
#include <Common/hex.h>
#include <base/hex.h>
#include <algorithm>
#include <optional>

View File

@ -1,4 +1,4 @@
#include <Common/hex.h>
#include <base/hex.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/escapeForFileName.h>

View File

@ -383,6 +383,14 @@ bool isSymlink(const fs::path & path)
return fs::is_symlink(path); /// STYLE_CHECK_ALLOW_STD_FS_SYMLINK
}
bool isSymlinkNoThrow(const fs::path & path)
{
std::error_code dummy;
if (path.filename().empty())
return fs::is_symlink(path.parent_path(), dummy); /// STYLE_CHECK_ALLOW_STD_FS_SYMLINK
return fs::is_symlink(path, dummy); /// STYLE_CHECK_ALLOW_STD_FS_SYMLINK
}
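A small usage sketch for the new helper (hypothetical paths; shown because the parent_path() fallback is easy to miss: a trailing slash makes filename() empty):

/// Illustrative only:
///
///   fs::create_directory("/tmp/target");                      /// hypothetical paths
///   fs::create_directory_symlink("/tmp/target", "/tmp/link");
///   isSymlinkNoThrow("/tmp/link");    /// true
///   isSymlinkNoThrow("/tmp/link/");   /// also true: the empty filename() routes
///                                     /// the check to parent_path(), i.e. the link itself
///   isSymlinkNoThrow("/no/such");     /// false, and no exception is thrown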
fs::path readSymlink(const fs::path & path)
{
/// See the comment for isSymlink

View File

@ -95,6 +95,7 @@ void setModificationTime(const std::string & path, time_t time);
time_t getChangeTime(const std::string & path);
bool isSymlink(const fs::path & path);
bool isSymlinkNoThrow(const fs::path & path);
fs::path readSymlink(const fs::path & path);
}

View File

@ -1,5 +1,5 @@
#include <Common/formatIPv6.h>
#include <Common/hex.h>
#include <base/hex.h>
#include <Common/StringUtils/StringUtils.h>
#include <base/range.h>

View File

@ -7,7 +7,7 @@
#include <utility>
#include <base/range.h>
#include <base/unaligned.h>
#include <Common/hex.h>
#include <base/hex.h>
#include <Common/StringUtils/StringUtils.h>
constexpr size_t IPV4_BINARY_LENGTH = 4;

View File

@ -4,7 +4,7 @@
#include <link.h>
#include <array>
#include <Common/hex.h>
#include <base/hex.h>
static int callback(dl_phdr_info * info, size_t, void * data)

View File

@ -4,7 +4,7 @@
#if defined(OS_LINUX)
#include <Common/StringUtils/StringUtils.h>
#include <Common/hex.h>
#include <base/hex.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadHelpers.h>

View File

@ -1,92 +0,0 @@
#include <Common/hex.h>
const char * const hex_digit_to_char_uppercase_table = "0123456789ABCDEF";
const char * const hex_digit_to_char_lowercase_table = "0123456789abcdef";
const char * const hex_byte_to_char_uppercase_table =
"000102030405060708090A0B0C0D0E0F"
"101112131415161718191A1B1C1D1E1F"
"202122232425262728292A2B2C2D2E2F"
"303132333435363738393A3B3C3D3E3F"
"404142434445464748494A4B4C4D4E4F"
"505152535455565758595A5B5C5D5E5F"
"606162636465666768696A6B6C6D6E6F"
"707172737475767778797A7B7C7D7E7F"
"808182838485868788898A8B8C8D8E8F"
"909192939495969798999A9B9C9D9E9F"
"A0A1A2A3A4A5A6A7A8A9AAABACADAEAF"
"B0B1B2B3B4B5B6B7B8B9BABBBCBDBEBF"
"C0C1C2C3C4C5C6C7C8C9CACBCCCDCECF"
"D0D1D2D3D4D5D6D7D8D9DADBDCDDDEDF"
"E0E1E2E3E4E5E6E7E8E9EAEBECEDEEEF"
"F0F1F2F3F4F5F6F7F8F9FAFBFCFDFEFF";
const char * const hex_byte_to_char_lowercase_table =
"000102030405060708090a0b0c0d0e0f"
"101112131415161718191a1b1c1d1e1f"
"202122232425262728292a2b2c2d2e2f"
"303132333435363738393a3b3c3d3e3f"
"404142434445464748494a4b4c4d4e4f"
"505152535455565758595a5b5c5d5e5f"
"606162636465666768696a6b6c6d6e6f"
"707172737475767778797a7b7c7d7e7f"
"808182838485868788898a8b8c8d8e8f"
"909192939495969798999a9b9c9d9e9f"
"a0a1a2a3a4a5a6a7a8a9aaabacadaeaf"
"b0b1b2b3b4b5b6b7b8b9babbbcbdbebf"
"c0c1c2c3c4c5c6c7c8c9cacbcccdcecf"
"d0d1d2d3d4d5d6d7d8d9dadbdcdddedf"
"e0e1e2e3e4e5e6e7e8e9eaebecedeeef"
"f0f1f2f3f4f5f6f7f8f9fafbfcfdfeff";
const char * const hex_char_to_digit_table =
"\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff"
"\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff"
"\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff"
"\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\xff\xff\xff\xff\xff\xff" //0-9
"\xff\x0a\x0b\x0c\x0d\x0e\x0f\xff\xff\xff\xff\xff\xff\xff\xff\xff" //A-Z
"\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff"
"\xff\x0a\x0b\x0c\x0d\x0e\x0f\xff\xff\xff\xff\xff\xff\xff\xff\xff" //a-z
"\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff"
"\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff"
"\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff"
"\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff"
"\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff"
"\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff"
"\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff"
"\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff"
"\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff";
const char * const bin_byte_to_char_table =
"0000000000000001000000100000001100000100000001010000011000000111"
"0000100000001001000010100000101100001100000011010000111000001111"
"0001000000010001000100100001001100010100000101010001011000010111"
"0001100000011001000110100001101100011100000111010001111000011111"
"0010000000100001001000100010001100100100001001010010011000100111"
"0010100000101001001010100010101100101100001011010010111000101111"
"0011000000110001001100100011001100110100001101010011011000110111"
"0011100000111001001110100011101100111100001111010011111000111111"
"0100000001000001010000100100001101000100010001010100011001000111"
"0100100001001001010010100100101101001100010011010100111001001111"
"0101000001010001010100100101001101010100010101010101011001010111"
"0101100001011001010110100101101101011100010111010101111001011111"
"0110000001100001011000100110001101100100011001010110011001100111"
"0110100001101001011010100110101101101100011011010110111001101111"
"0111000001110001011100100111001101110100011101010111011001110111"
"0111100001111001011110100111101101111100011111010111111001111111"
"1000000010000001100000101000001110000100100001011000011010000111"
"1000100010001001100010101000101110001100100011011000111010001111"
"1001000010010001100100101001001110010100100101011001011010010111"
"1001100010011001100110101001101110011100100111011001111010011111"
"1010000010100001101000101010001110100100101001011010011010100111"
"1010100010101001101010101010101110101100101011011010111010101111"
"1011000010110001101100101011001110110100101101011011011010110111"
"1011100010111001101110101011101110111100101111011011111010111111"
"1100000011000001110000101100001111000100110001011100011011000111"
"1100100011001001110010101100101111001100110011011100111011001111"
"1101000011010001110100101101001111010100110101011101011011010111"
"1101100011011001110110101101101111011100110111011101111011011111"
"1110000011100001111000101110001111100100111001011110011011100111"
"1110100011101001111010101110101111101100111011011110111011101111"
"1111000011110001111100101111001111110100111101011111011011110111"
"1111100011111001111110101111101111111100111111011111111011111111";

View File

@ -1,145 +0,0 @@
#pragma once
#include <string>
/// Maps 0..15 to 0..9A..F or 0..9a..f respectively.
extern const char * const hex_digit_to_char_uppercase_table;
extern const char * const hex_digit_to_char_lowercase_table;
inline char hexDigitUppercase(unsigned char c)
{
return hex_digit_to_char_uppercase_table[c];
}
inline char hexDigitLowercase(unsigned char c)
{
return hex_digit_to_char_lowercase_table[c];
}
#include <cstring>
#include <cstddef>
#include <base/types.h>
/// Maps 0..255 to 00..FF or 00..ff respectively
extern const char * const hex_byte_to_char_uppercase_table;
extern const char * const hex_byte_to_char_lowercase_table;
inline void writeHexByteUppercase(UInt8 byte, void * out)
{
memcpy(out, &hex_byte_to_char_uppercase_table[static_cast<size_t>(byte) * 2], 2);
}
inline void writeHexByteLowercase(UInt8 byte, void * out)
{
memcpy(out, &hex_byte_to_char_lowercase_table[static_cast<size_t>(byte) * 2], 2);
}
extern const char * const bin_byte_to_char_table;
inline void writeBinByte(UInt8 byte, void * out)
{
memcpy(out, &bin_byte_to_char_table[static_cast<size_t>(byte) * 8], 8);
}
/// Produces hex representation of an unsigned int with leading zeros (for checksums)
template <typename TUInt>
inline void writeHexUIntImpl(TUInt uint_, char * out, const char * const table)
{
union
{
TUInt value;
UInt8 uint8[sizeof(TUInt)];
};
value = uint_;
for (size_t i = 0; i < sizeof(TUInt); ++i)
{
if constexpr (std::endian::native == std::endian::little)
memcpy(out + i * 2, &table[static_cast<size_t>(uint8[sizeof(TUInt) - 1 - i]) * 2], 2);
else
memcpy(out + i * 2, &table[static_cast<size_t>(uint8[i]) * 2], 2);
}
}
template <typename TUInt>
inline void writeHexUIntUppercase(TUInt uint_, char * out)
{
writeHexUIntImpl(uint_, out, hex_byte_to_char_uppercase_table);
}
template <typename TUInt>
inline void writeHexUIntLowercase(TUInt uint_, char * out)
{
writeHexUIntImpl(uint_, out, hex_byte_to_char_lowercase_table);
}
template <typename TUInt>
std::string getHexUIntUppercase(TUInt uint_)
{
std::string res(sizeof(TUInt) * 2, '\0');
writeHexUIntUppercase(uint_, res.data());
return res;
}
template <typename TUInt>
std::string getHexUIntLowercase(TUInt uint_)
{
std::string res(sizeof(TUInt) * 2, '\0');
writeHexUIntLowercase(uint_, res.data());
return res;
}
/// Maps 0..9, A..F, a..f to 0..15. Other chars are mapped to an implementation-specific value.
extern const char * const hex_char_to_digit_table;
inline UInt8 unhex(char c)
{
return hex_char_to_digit_table[static_cast<UInt8>(c)];
}
inline UInt8 unhex2(const char * data)
{
return
static_cast<UInt8>(unhex(data[0])) * 0x10
+ static_cast<UInt8>(unhex(data[1]));
}
inline UInt16 unhex4(const char * data)
{
return
static_cast<UInt16>(unhex(data[0])) * 0x1000
+ static_cast<UInt16>(unhex(data[1])) * 0x100
+ static_cast<UInt16>(unhex(data[2])) * 0x10
+ static_cast<UInt16>(unhex(data[3]));
}
template <typename TUInt>
TUInt unhexUInt(const char * data)
{
TUInt res = 0;
if constexpr ((sizeof(TUInt) <= 8) || ((sizeof(TUInt) % 8) != 0))
{
for (size_t i = 0; i < sizeof(TUInt) * 2; ++i, ++data)
{
res <<= 4;
res += unhex(*data);
}
}
else
{
for (size_t i = 0; i < sizeof(TUInt) / 8; ++i, data += 16)
{
res <<= 64;
res += unhexUInt<UInt64>(data);
}
}
return res;
}
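These helpers are removed here because they moved verbatim to base/hex.h (see the include changes throughout this commit). A hedged round-trip sketch of the API:

// Round-trip sketch (assumes base/hex.h, the header's new home; the
// function name is hypothetical):
#include <base/hex.h>
#include <cassert>
#include <string>

void hexRoundTripDemo()
{
    UInt32 x = 0xDEADBEEF;
    std::string hex = getHexUIntLowercase(x);  // "deadbeef": fixed width, leading zeros kept
    UInt32 y = unhexUInt<UInt32>(hex.data());
    assert(x == y);
}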

View File

@ -1,20 +0,0 @@
#pragma once
#include <cmath>
inline double interpolateLinear(double min, double max, double ratio)
{
return min + (max - min) * ratio;
}
/** It is linear interpolation in logarithmic coordinates.
* Exponential interpolation is related to linear interpolation
* in exactly the same way as the geometric mean is related to the arithmetic mean.
* 'min' must be greater than zero, 'ratio' must be from 0 to 1.
*/
inline double interpolateExponential(double min, double max, double ratio)
{
return min * std::pow(max / min, ratio);
}
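A worked example of the two interpolations (the values follow directly from the formulas above; this header is likewise being removed in this commit):

// interpolateLinear(0.0, 10.0, 0.5)        == 5.0   (arithmetic midpoint)
// interpolateExponential(1.0, 100.0, 0.5)  == 10.0  (geometric midpoint: 1 * pow(100 / 1, 0.5))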

View File

@ -6,7 +6,7 @@
#include <city.h>
#include <Common/ProfileEvents.h>
#include <Common/Exception.h>
#include <Common/hex.h>
#include <base/hex.h>
#include <Compression/ICompressionCodec.h>
#include <Compression/CompressionFactory.h>
#include <IO/ReadBuffer.h>

View File

@ -8,7 +8,7 @@
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <Common/hex.h>
#include <base/hex.h>
namespace DB

View File

@ -4,7 +4,7 @@
#include <Poco/Path.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/hex.h>
#include <base/hex.h>
#include <Common/setThreadName.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/checkStackSize.h>

View File

@ -11,7 +11,7 @@
#include <Common/ZooKeeper/ZooKeeperConstants.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/ZooKeeper/IKeeper.h>
#include <Common/hex.h>
#include <base/hex.h>
#include <Common/logger_useful.h>
#include <Common/setThreadName.h>
#include <Common/LockMemoryExceptionInThread.h>

View File

@ -147,6 +147,8 @@ class IColumn;
M(UInt64, max_parallel_replicas, 1, "The maximum number of replicas of each shard used when the query is executed. For consistency (to get different parts of the same partition), this option only works for the specified sampling key. The lag of the replicas is not controlled.", 0) \
M(UInt64, parallel_replicas_count, 0, "This is internal setting that should not be used directly and represents an implementation detail of the 'parallel replicas' mode. This setting will be automatically set up by the initiator server for distributed queries to the number of parallel replicas participating in query processing.", 0) \
M(UInt64, parallel_replica_offset, 0, "This is internal setting that should not be used directly and represents an implementation detail of the 'parallel replicas' mode. This setting will be automatically set up by the initiator server for distributed queries to the index of the replica participating in query processing among parallel replicas.", 0) \
M(String, parallel_replicas_custom_key, "", "Custom key assigning work to replicas when parallel replicas are used.", 0) \
M(ParallelReplicasCustomKeyFilterType, parallel_replicas_custom_key_filter_type, ParallelReplicasCustomKeyFilterType::DEFAULT, "Type of filter to use with custom key for parallel replicas. default - use modulo operation on the custom key, range - use range filter on custom key using all possible values for the value type of custom key.", 0) \
\
M(String, cluster_for_parallel_replicas, "default", "Cluster for the shard in which the current server is located", 0) \
M(Bool, allow_experimental_parallel_reading_from_replicas, false, "If true, ClickHouse will send a SELECT query to all replicas of a table. It will work for any kind of MergeTree table.", 0) \
@ -514,6 +516,7 @@ class IColumn;
M(Bool, allow_experimental_alter_materialized_view_structure, false, "Allow atomic alter on Materialized views. Work in progress.", 0) \
M(Bool, enable_early_constant_folding, true, "Enable query optimization where we analyze function and subqueries results and rewrite query if there're constants there", 0) \
M(Bool, deduplicate_blocks_in_dependent_materialized_views, false, "Should deduplicate blocks for materialized views if the block is not a duplicate for the table. Use true to always deduplicate in dependent tables.", 0) \
M(Bool, materialized_views_ignore_errors, false, "Allows ignoring errors for MATERIALIZED VIEWs and delivering the original block to the table regardless of the MVs", 0) \
M(Bool, use_compact_format_in_distributed_parts_names, true, "Changes format of directories names for distributed table insert parts.", 0) \
M(Bool, validate_polygons, true, "Throw exception if polygon is invalid in function pointInPolygon (e.g. self-tangent, self-intersecting). If the setting is false, the function will accept invalid polygons but may silently return wrong result.", 0) \
M(UInt64, max_parser_depth, DBMS_DEFAULT_MAX_PARSER_DEPTH, "Maximum parser depth (recursion depth of recursive descend parser).", 0) \
@ -864,6 +867,7 @@ class IColumn;
M(Bool, output_format_parquet_string_as_string, false, "Use Parquet String type instead of Binary for String columns.", 0) \
M(Bool, output_format_parquet_fixed_string_as_fixed_byte_array, true, "Use Parquet FIXED_LENGTH_BYTE_ARRAY type instead of Binary for FixedString columns.", 0) \
M(ParquetVersion, output_format_parquet_version, "2.latest", "Parquet format version for output format. Supported versions: 1.0, 2.4, 2.6 and 2.latest (default)", 0) \
M(ParquetCompression, output_format_parquet_compression_method, "lz4", "Compression method for Parquet output format. Supported codecs: snappy, lz4, brotli, zstd, gzip, none (uncompressed)", 0) \
M(String, output_format_avro_codec, "", "Compression codec used for output. Possible values: 'null', 'deflate', 'snappy'.", 0) \
M(UInt64, output_format_avro_sync_interval, 16 * 1024, "Sync interval in bytes.", 0) \
M(String, output_format_avro_string_column_pattern, "", "For Avro format: regexp of String columns to select as AVRO string.", 0) \
@ -906,8 +910,10 @@ class IColumn;
M(Bool, output_format_arrow_low_cardinality_as_dictionary, false, "Enable output LowCardinality type as Dictionary Arrow type", 0) \
M(Bool, output_format_arrow_string_as_string, false, "Use Arrow String type instead of Binary for String columns", 0) \
M(Bool, output_format_arrow_fixed_string_as_fixed_byte_array, true, "Use Arrow FIXED_SIZE_BINARY type instead of Binary for FixedString columns.", 0) \
M(ArrowCompression, output_format_arrow_compression_method, "lz4_frame", "Compression method for Arrow output format. Supported codecs: lz4_frame, zstd, none (uncompressed)", 0) \
\
M(Bool, output_format_orc_string_as_string, false, "Use ORC String type instead of Binary for String columns", 0) \
M(ORCCompression, output_format_orc_compression_method, "lz4", "Compression method for ORC output format. Supported codecs: lz4, snappy, zlib, zstd, none (uncompressed)", 0) \
\
M(EnumComparingMode, format_capn_proto_enum_comparising_mode, FormatSettings::EnumComparingMode::BY_VALUES, "How to map ClickHouse Enum and CapnProto Enum", 0) \
\

View File

@ -82,7 +82,10 @@ static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> sett
{
{"23.3", {{"output_format_parquet_version", "1.0", "2.latest", "Use latest Parquet format version for output format"},
{"input_format_json_ignore_unknown_keys_in_named_tuple", false, true, "Improve parsing JSON objects as named tuples"},
{"input_format_native_allow_types_conversion", false, true, "Allow types conversion in Native input format"}}},
{"input_format_native_allow_types_conversion", false, true, "Allow types conversion in Native input format"},
{"output_format_arrow_compression_method", "none", "lz4_frame", "Use lz4 compression in Arrow output format by default"},
{"output_format_parquet_compression_method", "snappy", "lz4", "Use lz4 compression in Parquet output format by default"},
{"output_format_orc_compression_method", "none", "lz4", "Use lz4 compression in ORC output format by default"}}},
{"23.2", {{"output_format_parquet_fixed_string_as_fixed_byte_array", false, true, "Use Parquet FIXED_LENGTH_BYTE_ARRAY type for FixedString by default"},
{"output_format_arrow_fixed_string_as_fixed_byte_array", false, true, "Use Arrow FIXED_SIZE_BINARY type for FixedString by default"},
{"query_plan_remove_redundant_distinct", false, true, "Remove redundant Distinct step in query plan"},

View File

@ -158,7 +158,7 @@ IMPLEMENT_SETTING_ENUM(EscapingRule, ErrorCodes::BAD_ARGUMENTS,
{"XML", FormatSettings::EscapingRule::XML},
{"Raw", FormatSettings::EscapingRule::Raw}})
IMPLEMENT_SETTING_ENUM(MsgPackUUIDRepresentation , ErrorCodes::BAD_ARGUMENTS,
IMPLEMENT_SETTING_ENUM(MsgPackUUIDRepresentation, ErrorCodes::BAD_ARGUMENTS,
{{"bin", FormatSettings::MsgPackUUIDRepresentation::BIN},
{"str", FormatSettings::MsgPackUUIDRepresentation::STR},
{"ext", FormatSettings::MsgPackUUIDRepresentation::EXT}})
@ -167,16 +167,39 @@ IMPLEMENT_SETTING_ENUM(Dialect, ErrorCodes::BAD_ARGUMENTS,
{{"clickhouse", Dialect::clickhouse},
{"kusto", Dialect::kusto}})
IMPLEMENT_SETTING_ENUM(ParallelReplicasCustomKeyFilterType, ErrorCodes::BAD_ARGUMENTS,
{{"default", ParallelReplicasCustomKeyFilterType::DEFAULT},
{"range", ParallelReplicasCustomKeyFilterType::RANGE}})
IMPLEMENT_SETTING_ENUM(LocalFSReadMethod, ErrorCodes::BAD_ARGUMENTS,
{{"mmap", LocalFSReadMethod::mmap},
{"pread", LocalFSReadMethod::pread},
{"read", LocalFSReadMethod::read}})
IMPLEMENT_SETTING_ENUM_WITH_RENAME(ParquetVersion, ErrorCodes::BAD_ARGUMENTS,
{{"1.0", FormatSettings::ParquetVersion::V1_0},
{"2.4", FormatSettings::ParquetVersion::V2_4},
{"2.6", FormatSettings::ParquetVersion::V2_6},
{"2.latest", FormatSettings::ParquetVersion::V2_LATEST}})
IMPLEMENT_SETTING_ENUM(ParquetCompression, ErrorCodes::BAD_ARGUMENTS,
{{"none", FormatSettings::ParquetCompression::NONE},
{"snappy", FormatSettings::ParquetCompression::SNAPPY},
{"zstd", FormatSettings::ParquetCompression::ZSTD},
{"gzip", FormatSettings::ParquetCompression::GZIP},
{"lz4", FormatSettings::ParquetCompression::LZ4},
{"brotli", FormatSettings::ParquetCompression::BROTLI}})
IMPLEMENT_SETTING_ENUM(ArrowCompression, ErrorCodes::BAD_ARGUMENTS,
{{"none", FormatSettings::ArrowCompression::NONE},
{"lz4_frame", FormatSettings::ArrowCompression::LZ4_FRAME},
{"zstd", FormatSettings::ArrowCompression::ZSTD}})
IMPLEMENT_SETTING_ENUM(ORCCompression, ErrorCodes::BAD_ARGUMENTS,
{{"none", FormatSettings::ORCCompression::NONE},
{"snappy", FormatSettings::ORCCompression::SNAPPY},
{"zstd", FormatSettings::ORCCompression::ZSTD},
{"zlib", FormatSettings::ORCCompression::ZLIB},
{"lz4", FormatSettings::ORCCompression::LZ4}})
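With these enums registered, the new output compression settings become switchable per session. An illustrative use (the table name is hypothetical):

/// Illustrative only:
///
///   SET output_format_parquet_compression_method = 'zstd';
///   SET output_format_orc_compression_method = 'zlib';
///   SELECT * FROM t FORMAT Parquet;  /// now written with zstd-compressed pages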
}

View File

@ -194,6 +194,12 @@ DECLARE_SETTING_ENUM_WITH_RENAME(EscapingRule, FormatSettings::EscapingRule)
DECLARE_SETTING_ENUM_WITH_RENAME(MsgPackUUIDRepresentation, FormatSettings::MsgPackUUIDRepresentation)
DECLARE_SETTING_ENUM_WITH_RENAME(ParquetCompression, FormatSettings::ParquetCompression)
DECLARE_SETTING_ENUM_WITH_RENAME(ArrowCompression, FormatSettings::ArrowCompression)
DECLARE_SETTING_ENUM_WITH_RENAME(ORCCompression, FormatSettings::ORCCompression)
enum class Dialect
{
clickhouse,
@ -203,5 +209,13 @@ enum class Dialect
DECLARE_SETTING_ENUM(Dialect)
enum class ParallelReplicasCustomKeyFilterType : uint8_t
{
DEFAULT,
RANGE,
};
DECLARE_SETTING_ENUM(ParallelReplicasCustomKeyFilterType)
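For context, a hedged sketch of how the custom-key settings fit together (table and key names are hypothetical; the semantics paraphrase the Settings.h descriptions earlier in this commit):

/// Illustrative only:
///
///   SELECT count()
///   FROM distributed_table
///   SETTINGS max_parallel_replicas = 3,
///            parallel_replicas_custom_key = 'cityHash64(user_id)',
///            parallel_replicas_custom_key_filter_type = 'default';
///
/// With 'default', each replica keeps rows where the key modulo the replica
/// count matches its index; with 'range', the key's value domain is split
/// into one contiguous range per replica.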
DECLARE_SETTING_ENUM(LocalFSReadMethod)
}

View File

@ -13,7 +13,7 @@
#include <Common/StackTrace.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <Core/ServerUUID.h>
#include <Common/hex.h>
#include <base/hex.h>
#include "config.h"
#include "config_version.h"

View File

@ -273,7 +273,7 @@ void DatabaseAtomic::renameTable(ContextPtr local_context, const String & table_
else
renameNoReplace(old_metadata_path, new_metadata_path);
/// After metadata was successfully moved, the following methods should not throw (if them do, it's a logical error)
/// After metadata was successfully moved, the following methods should not throw (if they do, it's a logical error)
table_data_path = detach(*this, table_name, table->storesDataOnDisk());
if (exchange)
other_table_data_path = detach(other_db, to_table_name, other_table->storesDataOnDisk());
@ -509,6 +509,9 @@ void DatabaseAtomic::tryCreateMetadataSymlink()
{
try
{
/// fs::exists could return false for a broken symlink
if (FS::isSymlinkNoThrow(metadata_symlink))
fs::remove(metadata_symlink);
fs::create_directory_symlink(metadata_path, path_to_metadata_symlink);
}
catch (...)

View File

@ -4,7 +4,7 @@
#include <IO/ReadBufferFromFile.h>
#include <base/scope_guard.h>
#include <Common/assert_cast.h>
#include <Common/hex.h>
#include <base/hex.h>
#include <Common/getRandomASCIIString.h>
#include <Interpreters/Context.h>

View File

@ -5,7 +5,7 @@
#include <Disks/IO/CachedOnDiskReadBufferFromFile.h>
#include <Common/logger_useful.h>
#include <iostream>
#include <Common/hex.h>
#include <base/hex.h>
#include <Interpreters/FilesystemCacheLog.h>
#include <Interpreters/Context.h>

View File

@ -6,9 +6,13 @@
#include <Disks/DiskSelector.h>
#include <Parsers/formatAST.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/isDiskFunction.h>
#include <Interpreters/Context.h>
#include <Parsers/IAST.h>
#include <Interpreters/InDepthNodeVisitor.h>
namespace DB
{
@ -18,43 +22,85 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
std::string getOrCreateDiskFromDiskAST(const ASTFunction & function, ContextPtr context)
namespace
{
/// We need a unique name for the created custom disk, but it needs to stay the same
/// after the table is reattached or the server is restarted, so take a hash of the disk
/// configuration's serialized AST as a disk name suffix.
auto disk_setting_string = serializeAST(function, true);
auto disk_name = DiskSelector::TMP_INTERNAL_DISK_PREFIX
+ toString(sipHash128(disk_setting_string.data(), disk_setting_string.size()));
auto result_disk = context->getOrCreateDisk(disk_name, [&](const DisksMap & disks_map) -> DiskPtr {
const auto * function_args_expr = assert_cast<const ASTExpressionList *>(function.arguments.get());
const auto & function_args = function_args_expr->children;
auto config = getDiskConfigurationFromAST(disk_name, function_args, context);
auto disk = DiskFactory::instance().create(disk_name, *config, disk_name, context, disks_map);
/// Mark that disk can be used without storage policy.
disk->markDiskAsCustom();
return disk;
});
if (!result_disk->isRemote())
std::string getOrCreateDiskFromDiskAST(const ASTFunction & function, ContextPtr context)
{
static constexpr auto custom_disks_base_dir_in_config = "custom_local_disks_base_directory";
auto disk_path_expected_prefix = context->getConfigRef().getString(custom_disks_base_dir_in_config, "");
/// We need a unique name for the created custom disk, but it needs to stay the same
/// after the table is reattached or the server is restarted, so take a hash of the disk
/// configuration's serialized AST as a disk name suffix.
auto disk_setting_string = serializeAST(function, true);
auto disk_name = DiskSelector::TMP_INTERNAL_DISK_PREFIX
+ toString(sipHash128(disk_setting_string.data(), disk_setting_string.size()));
if (disk_path_expected_prefix.empty())
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Base path for custom local disks must be defined in config file by `{}`",
custom_disks_base_dir_in_config);
auto result_disk = context->getOrCreateDisk(disk_name, [&](const DisksMap & disks_map) -> DiskPtr {
const auto * function_args_expr = assert_cast<const ASTExpressionList *>(function.arguments.get());
const auto & function_args = function_args_expr->children;
auto config = getDiskConfigurationFromAST(disk_name, function_args, context);
auto disk = DiskFactory::instance().create(disk_name, *config, disk_name, context, disks_map);
/// Mark that disk can be used without storage policy.
disk->markDiskAsCustom();
return disk;
});
if (!pathStartsWith(result_disk->getPath(), disk_path_expected_prefix))
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Path of the custom local disk must be inside `{}` directory",
disk_path_expected_prefix);
if (!result_disk->isRemote())
{
static constexpr auto custom_disks_base_dir_in_config = "custom_local_disks_base_directory";
auto disk_path_expected_prefix = context->getConfigRef().getString(custom_disks_base_dir_in_config, "");
if (disk_path_expected_prefix.empty())
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Base path for custom local disks must be defined in config file by `{}`",
custom_disks_base_dir_in_config);
if (!pathStartsWith(result_disk->getPath(), disk_path_expected_prefix))
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Path of the custom local disk must be inside `{}` directory",
disk_path_expected_prefix);
}
return disk_name;
}
class DiskConfigurationFlattener
{
public:
struct Data
{
ContextPtr context;
};
static bool needChildVisit(const ASTPtr &, const ASTPtr &) { return true; }
static void visit(ASTPtr & ast, Data & data)
{
if (isDiskFunction(ast))
{
auto disk_name = getOrCreateDiskFromDiskAST(*ast->as<ASTFunction>(), data.context);
ast = std::make_shared<ASTLiteral>(disk_name);
}
}
};
/// Visits children first.
using FlattenDiskConfigurationVisitor = InDepthNodeVisitor<DiskConfigurationFlattener, false>;
}
std::string getOrCreateDiskFromDiskAST(const ASTPtr & disk_function, ContextPtr context)
{
if (!isDiskFunction(disk_function))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected a disk function");
auto ast = disk_function->clone();
FlattenDiskConfigurationVisitor::Data data{context};
FlattenDiskConfigurationVisitor{data}.visit(ast);
auto disk_name = assert_cast<const ASTLiteral &>(*ast).value.get<String>();
LOG_TRACE(&Poco::Logger::get("getOrCreateDiskFromDiskAST"), "Result disk name: {}", disk_name);
return disk_name;
}
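For orientation, the kind of DDL this code path serves (illustrative; the disk arguments are examples, and the path must live under custom_local_disks_base_directory per the check above):

/// Illustrative only:
///
///   CREATE TABLE t (x UInt64) ENGINE = MergeTree ORDER BY x
///   SETTINGS disk = disk(type = 'local', path = 'custom_disk/');
///
/// The visitor replaces the disk(...) function with a literal disk name
/// derived from a hash of its serialized arguments, so the same definition
/// maps to the same disk across reattaches and server restarts.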

View File

@ -13,6 +13,6 @@ class ASTFunction;
* add it to DiskSelector by a unique (but always the same for given configuration) disk name
* and return this name.
*/
std::string getOrCreateDiskFromDiskAST(const ASTFunction & function, ContextPtr context);
std::string getOrCreateDiskFromDiskAST(const ASTPtr & disk_function, ContextPtr context);
}

View File

@ -1,6 +1,6 @@
#include <Formats/BSONTypes.h>
#include <Common/Exception.h>
#include <Common/hex.h>
#include <base/hex.h>
namespace DB
{

View File

@ -118,6 +118,7 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
format_settings.parquet.output_string_as_string = settings.output_format_parquet_string_as_string;
format_settings.parquet.output_fixed_string_as_fixed_byte_array = settings.output_format_parquet_fixed_string_as_fixed_byte_array;
format_settings.parquet.max_block_size = settings.input_format_parquet_max_block_size;
format_settings.parquet.output_compression_method = settings.output_format_parquet_compression_method;
format_settings.pretty.charset = settings.output_format_pretty_grid_charset.toString() == "ASCII" ? FormatSettings::Pretty::Charset::ASCII : FormatSettings::Pretty::Charset::UTF8;
format_settings.pretty.color = settings.output_format_pretty_color;
format_settings.pretty.max_column_pad_width = settings.output_format_pretty_max_column_pad_width;
@ -158,6 +159,7 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
format_settings.arrow.case_insensitive_column_matching = settings.input_format_arrow_case_insensitive_column_matching;
format_settings.arrow.output_string_as_string = settings.output_format_arrow_string_as_string;
format_settings.arrow.output_fixed_string_as_fixed_byte_array = settings.output_format_arrow_fixed_string_as_fixed_byte_array;
format_settings.arrow.output_compression_method = settings.output_format_arrow_compression_method;
format_settings.orc.import_nested = settings.input_format_orc_import_nested;
format_settings.orc.allow_missing_columns = settings.input_format_orc_allow_missing_columns;
format_settings.orc.row_batch_size = settings.input_format_orc_row_batch_size;
@ -168,6 +170,7 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
format_settings.orc.skip_columns_with_unsupported_types_in_schema_inference = settings.input_format_orc_skip_columns_with_unsupported_types_in_schema_inference;
format_settings.orc.case_insensitive_column_matching = settings.input_format_orc_case_insensitive_column_matching;
format_settings.orc.output_string_as_string = settings.output_format_orc_string_as_string;
format_settings.orc.output_compression_method = settings.output_format_orc_compression_method;
format_settings.defaults_for_omitted_fields = settings.input_format_defaults_for_omitted_fields;
format_settings.capn_proto.enum_comparing_mode = settings.format_capn_proto_enum_comparising_mode;
format_settings.capn_proto.skip_fields_with_unsupported_types_in_schema_inference = settings.input_format_capn_proto_skip_fields_with_unsupported_types_in_schema_inference;

View File

@ -86,6 +86,13 @@ struct FormatSettings
UInt64 max_parser_depth = DBMS_DEFAULT_MAX_PARSER_DEPTH;
enum class ArrowCompression
{
NONE,
LZ4_FRAME,
ZSTD
};
struct
{
UInt64 row_group_size = 1000000;
@ -96,6 +103,7 @@ struct FormatSettings
bool case_insensitive_column_matching = false;
bool output_string_as_string = false;
bool output_fixed_string_as_fixed_byte_array = true;
ArrowCompression output_compression_method = ArrowCompression::NONE;
} arrow;
struct
@ -183,6 +191,16 @@ struct FormatSettings
V2_LATEST,
};
enum class ParquetCompression
{
NONE,
SNAPPY,
ZSTD,
LZ4,
GZIP,
BROTLI,
};
struct
{
UInt64 row_group_size = 1000000;
@ -195,6 +213,7 @@ struct FormatSettings
bool output_fixed_string_as_fixed_byte_array = true;
UInt64 max_block_size = 8192;
ParquetVersion output_version;
ParquetCompression output_compression_method = ParquetCompression::SNAPPY;
} parquet;
struct Pretty
@ -276,6 +295,15 @@ struct FormatSettings
bool accurate_types_of_literals = true;
} values;
enum class ORCCompression
{
NONE,
LZ4,
SNAPPY,
ZSTD,
ZLIB,
};
struct
{
bool import_nested = false;
@ -285,6 +313,7 @@ struct FormatSettings
bool case_insensitive_column_matching = false;
std::unordered_set<int> skip_stripes = {};
bool output_string_as_string = false;
ORCCompression output_compression_method = ORCCompression::NONE;
} orc;
/// For capnProto format we should determine how to

View File

@ -1,5 +1,5 @@
#include <Formats/verbosePrintString.h>
#include <Common/hex.h>
#include <base/hex.h>
#include <IO/Operators.h>

View File

@ -26,7 +26,7 @@
#include <IO/WriteHelpers.h>
#include <Common/IPv6ToBinary.h>
#include <Common/formatIPv6.h>
#include <Common/hex.h>
#include <base/hex.h>
#include <Common/typeid_cast.h>
#include <arpa/inet.h>

View File

@ -3,7 +3,7 @@
#include <Columns/ColumnString.h>
#include <Columns/ColumnVector.h>
#include <Common/BitHelpers.h>
#include <Common/hex.h>
#include <base/hex.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeFixedString.h>
#include <Functions/FunctionFactory.h>

View File

@ -1,4 +1,4 @@
#include <Common/hex.h>
#include <base/hex.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionStringToString.h>
#include <base/find_symbols.h>

View File

@ -1,6 +1,6 @@
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionBinaryArithmetic.h>
#include <Common/hex.h>
#include <base/hex.h>
namespace DB
{

View File

@ -2,7 +2,7 @@
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionStringToString.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/hex.h>
#include <base/hex.h>
#include <base/find_symbols.h>

View File

@ -39,13 +39,15 @@ struct RepeatImpl
size, max_string_size);
}
template <typename T>
static void vectorStrConstRepeat(
const ColumnString::Chars & data,
const ColumnString::Offsets & offsets,
ColumnString::Chars & res_data,
ColumnString::Offsets & res_offsets,
UInt64 repeat_time)
T repeat_time)
{
repeat_time = repeat_time < 0 ? 0 : repeat_time;
checkRepeatTime(repeat_time);
UInt64 data_size = 0;
@ -77,7 +79,8 @@ struct RepeatImpl
res_offsets.assign(offsets);
for (UInt64 i = 0; i < col_num.size(); ++i)
{
size_t repeated_size = (offsets[i] - offsets[i - 1] - 1) * col_num[i] + 1;
T repeat_time = col_num[i] < 0 ? 0 : col_num[i];
size_t repeated_size = (offsets[i] - offsets[i - 1] - 1) * repeat_time + 1;
checkStringSize(repeated_size);
data_size += repeated_size;
res_offsets[i] = data_size;
@ -86,7 +89,7 @@ struct RepeatImpl
for (UInt64 i = 0; i < col_num.size(); ++i)
{
T repeat_time = col_num[i];
T repeat_time = col_num[i] < 0 ? 0 : col_num[i];
checkRepeatTime(repeat_time);
process(data.data() + offsets[i - 1], res_data.data() + res_offsets[i - 1], offsets[i] - offsets[i - 1], repeat_time);
}
@ -105,7 +108,8 @@ struct RepeatImpl
UInt64 col_size = col_num.size();
for (UInt64 i = 0; i < col_size; ++i)
{
size_t repeated_size = str_size * col_num[i] + 1;
T repeat_time = col_num[i] < 0 ? 0 : col_num[i];
size_t repeated_size = str_size * repeat_time + 1;
checkStringSize(repeated_size);
data_size += repeated_size;
res_offsets[i] = data_size;
@ -113,7 +117,7 @@ struct RepeatImpl
res_data.resize(data_size);
for (UInt64 i = 0; i < col_size; ++i)
{
T repeat_time = col_num[i];
T repeat_time = col_num[i] < 0 ? 0 : col_num[i];
checkRepeatTime(repeat_time);
process(
reinterpret_cast<UInt8 *>(const_cast<char *>(copy_str.data())),
@ -168,7 +172,8 @@ class FunctionRepeat : public IFunction
template <typename F>
static bool castType(const IDataType * type, F && f)
{
return castTypeToEither<DataTypeUInt8, DataTypeUInt16, DataTypeUInt32, DataTypeUInt64>(type, std::forward<F>(f));
return castTypeToEither<DataTypeInt8, DataTypeInt16, DataTypeInt32, DataTypeInt64,
DataTypeUInt8, DataTypeUInt16, DataTypeUInt32, DataTypeUInt64>(type, std::forward<F>(f));
}
public:
@ -186,7 +191,7 @@ public:
if (!isString(arguments[0]))
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal type {} of argument of function {}",
arguments[0]->getName(), getName());
if (!isUnsignedInteger(arguments[1]))
if (!isInteger(arguments[1]))
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal type {} of argument of function {}",
arguments[1]->getName(), getName());
return arguments[0];
@ -204,9 +209,15 @@ public:
{
if (const ColumnConst * scale_column_num = checkAndGetColumn<ColumnConst>(numcolumn.get()))
{
UInt64 repeat_time = scale_column_num->getValue<UInt64>();
auto col_res = ColumnString::create();
RepeatImpl::vectorStrConstRepeat(col->getChars(), col->getOffsets(), col_res->getChars(), col_res->getOffsets(), repeat_time);
castType(arguments[1].type.get(), [&](const auto & type)
{
using DataType = std::decay_t<decltype(type)>;
using T = typename DataType::FieldType;
T repeat_time = scale_column_num->getValue<T>();
RepeatImpl::vectorStrConstRepeat(col->getChars(), col->getOffsets(), col_res->getChars(), col_res->getOffsets(), repeat_time);
return true;
});
return col_res;
}
else if (castType(arguments[1].type.get(), [&](const auto & type)
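The user-visible effect of accepting signed types, sketched as queries (illustrative; the clamping behavior is inferred from the `< 0 ? 0 :` checks above):

/// Illustrative only:
///
///   SELECT repeat('ab', 3);           /// 'ababab', as before
///   SELECT repeat('ab', toInt8(-5));  /// '' : negative counts clamp to zero
///                                     /// instead of failing with an illegal
///                                     /// argument type error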

View File

@ -70,7 +70,7 @@ private:
if (!has_prev_value)
{
dst[i] = is_first_line_zero ? 0 : src[i];
dst[i] = is_first_line_zero ? static_cast<Dst>(0) : static_cast<Dst>(src[i]);
prev = src[i];
has_prev_value = true;
}
@ -102,6 +102,10 @@ private:
f(UInt32());
else if (which.isUInt64())
f(UInt64());
else if (which.isUInt128())
f(UInt128());
else if (which.isUInt256())
f(UInt256());
else if (which.isInt8())
f(Int8());
else if (which.isInt16())
@ -110,6 +114,10 @@ private:
f(Int32());
else if (which.isInt64())
f(Int64());
else if (which.isInt128())
f(Int128());
else if (which.isInt256())
f(Int256());
else if (which.isFloat32())
f(Float32());
else if (which.isFloat64())

View File

@ -2,7 +2,7 @@
#include <IO/ReadHelpers.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/hex.h>
#include <base/hex.h>
#include <base/arithmeticOverflow.h>

View File

@ -1,5 +1,5 @@
#include <Core/Defines.h>
#include <Common/hex.h>
#include <base/hex.h>
#include <Common/PODArray.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/memcpySmall.h>

View File

@ -2,7 +2,7 @@
#include <cinttypes>
#include <utility>
#include <Common/formatIPv6.h>
#include <Common/hex.h>
#include <base/hex.h>
namespace DB

View File

@ -14,7 +14,7 @@
#include <Poco/StreamCopier.h>
#include <Poco/String.h>
#include <Common/SipHash.h>
#include <Common/hex.h>
#include <base/hex.h>
using namespace DB;
TEST(HadoopSnappyDecoder, repeatNeedMoreInput)
{

View File

@ -1,5 +1,6 @@
#include <Interpreters/ActionsDAG.h>
#include <Analyzer/FunctionNode.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeString.h>
#include <Functions/IFunction.h>
@ -199,6 +200,23 @@ const ActionsDAG::Node & ActionsDAG::addFunction(
std::move(children),
std::move(arguments),
std::move(result_name),
function_base->getResultType(),
all_const);
}
const ActionsDAG::Node & ActionsDAG::addFunction(
const FunctionNode & function,
NodeRawConstPtrs children,
std::string result_name)
{
auto [arguments, all_const] = getFunctionArguments(children);
return addFunctionImpl(
function.getFunction(),
std::move(children),
std::move(arguments),
std::move(result_name),
function.getResultType(),
all_const);
}
@ -214,6 +232,7 @@ const ActionsDAG::Node & ActionsDAG::addFunction(
std::move(children),
std::move(arguments),
std::move(result_name),
function_base->getResultType(),
all_const);
}
@ -238,6 +257,7 @@ const ActionsDAG::Node & ActionsDAG::addFunctionImpl(
NodeRawConstPtrs children,
ColumnsWithTypeAndName arguments,
std::string result_name,
DataTypePtr result_type,
bool all_const)
{
size_t num_arguments = children.size();
@ -247,7 +267,7 @@ const ActionsDAG::Node & ActionsDAG::addFunctionImpl(
node.children = std::move(children);
node.function_base = function_base;
node.result_type = node.function_base->getResultType();
node.result_type = result_type;
node.function = node.function_base->prepare(arguments);
node.is_deterministic = node.function_base->isDeterministic();
@ -2264,7 +2284,15 @@ ActionsDAGPtr ActionsDAG::buildFilterActionsDAG(
for (const auto & child : node->children)
function_children.push_back(node_to_result_node.find(child)->second);
result_node = &result_dag->addFunction(node->function_base, std::move(function_children), {});
auto [arguments, all_const] = getFunctionArguments(function_children);
result_node = &result_dag->addFunctionImpl(
node->function_base,
std::move(function_children),
std::move(arguments),
{},
node->result_type,
all_const);
break;
}
}
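A plausible reading of why the result type is now threaded through explicitly instead of being re-derived (an inference from the group_by_use_nulls changes elsewhere in this commit, not something the diff states):

/// Hypothetical illustration:
///
///   function_base->getResultType();  /// e.g. UInt64, as the function infers it
///   function.getResultType();        /// e.g. Nullable(UInt64), after a GROUP BY
///                                    /// key was converted to Nullable
///
/// Reusing the FunctionNode's already-resolved type keeps the ActionsDAG
/// consistent with the query tree instead of recomputing a narrower type.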

View File

@ -23,6 +23,8 @@ using FunctionBasePtr = std::shared_ptr<const IFunctionBase>;
class IFunctionOverloadResolver;
using FunctionOverloadResolverPtr = std::shared_ptr<IFunctionOverloadResolver>;
class FunctionNode;
class IDataType;
using DataTypePtr = std::shared_ptr<const IDataType>;
@ -139,6 +141,10 @@ public:
const FunctionOverloadResolverPtr & function,
NodeRawConstPtrs children,
std::string result_name);
const Node & addFunction(
const FunctionNode & function,
NodeRawConstPtrs children,
std::string result_name);
const Node & addFunction(
const FunctionBasePtr & function_base,
NodeRawConstPtrs children,
@ -358,6 +364,7 @@ private:
NodeRawConstPtrs children,
ColumnsWithTypeAndName arguments,
std::string result_name,
DataTypePtr result_type,
bool all_const);
#if USE_EMBEDDED_COMPILER

View File

@ -1,6 +1,6 @@
#pragma once
#include <Core/Types.h>
#include <Common/hex.h>
#include <base/hex.h>
#include <Core/UUID.h>
namespace DB

View File

@ -2,7 +2,7 @@
#include <base/getThreadId.h>
#include <Common/scope_guard_safe.h>
#include <Common/hex.h>
#include <base/hex.h>
#include <Common/logger_useful.h>
#include <Interpreters/Cache/FileCache.h>
#include <IO/WriteBufferFromString.h>

View File

@ -15,6 +15,7 @@
#include <base/sort.h>
#include <boost/range/algorithm_ext/erase.hpp>
#include <span>
namespace DB
{
@ -509,7 +510,6 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config,
shard_local_addresses.push_back(replica);
shard_all_addresses.push_back(replica);
}
ConnectionPoolWithFailoverPtr shard_pool = std::make_shared<ConnectionPoolWithFailover>(
all_replicas_pools, settings.load_balancing,
settings.distributed_replica_error_half_life.totalSeconds(), settings.distributed_replica_error_cap);
@ -653,9 +653,9 @@ void Cluster::initMisc()
}
}
std::unique_ptr<Cluster> Cluster::getClusterWithReplicasAsShards(const Settings & settings) const
std::unique_ptr<Cluster> Cluster::getClusterWithReplicasAsShards(const Settings & settings, size_t max_replicas_from_shard) const
{
return std::unique_ptr<Cluster>{ new Cluster(ReplicasAsShardsTag{}, *this, settings)};
return std::unique_ptr<Cluster>{ new Cluster(ReplicasAsShardsTag{}, *this, settings, max_replicas_from_shard)};
}
std::unique_ptr<Cluster> Cluster::getClusterWithSingleShard(size_t index) const
@ -668,7 +668,44 @@ std::unique_ptr<Cluster> Cluster::getClusterWithMultipleShards(const std::vector
return std::unique_ptr<Cluster>{ new Cluster(SubclusterTag{}, *this, indices) };
}
Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Settings & settings)
namespace
{
void shuffleReplicas(std::vector<Cluster::Address> & replicas, const Settings & settings, size_t replicas_needed)
{
std::random_device rd;
std::mt19937 gen{rd()};
if (settings.prefer_localhost_replica)
{
// force the local replica to always be included
auto first_non_local_replica = std::partition(replicas.begin(), replicas.end(), [](const auto & replica) { return replica.is_local; });
size_t local_replicas_count = first_non_local_replica - replicas.begin();
if (local_replicas_count == replicas_needed)
{
/// we have exactly as many local replicas as needed, no need to do anything
return;
}
if (local_replicas_count > replicas_needed)
{
/// we can use only local replicas, shuffle them
std::shuffle(replicas.begin(), first_non_local_replica, gen);
return;
}
/// shuffle just the non-local replicas
std::shuffle(first_non_local_replica, replicas.end(), gen);
return;
}
std::shuffle(replicas.begin(), replicas.end(), gen);
}
}
Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Settings & settings, size_t max_replicas_from_shard)
{
if (from.addresses_with_failover.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cluster is empty");
@ -677,40 +714,55 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti
std::set<std::pair<String, int>> unique_hosts;
for (size_t shard_index : collections::range(0, from.shards_info.size()))
{
const auto & replicas = from.addresses_with_failover[shard_index];
for (const auto & address : replicas)
auto create_shards_from_replicas = [&](std::span<const Address> replicas)
{
if (!unique_hosts.emplace(address.host_name, address.port).second)
continue; /// Duplicate host, skip.
for (const auto & address : replicas)
{
if (!unique_hosts.emplace(address.host_name, address.port).second)
continue; /// Duplicate host, skip.
ShardInfo info;
info.shard_num = ++shard_num;
ShardInfo info;
info.shard_num = ++shard_num;
if (address.is_local)
info.local_addresses.push_back(address);
if (address.is_local)
info.local_addresses.push_back(address);
info.all_addresses.push_back(address);
info.all_addresses.push_back(address);
auto pool = ConnectionPoolFactory::instance().get(
static_cast<unsigned>(settings.distributed_connections_pool_size),
address.host_name,
address.port,
address.default_database,
address.user,
address.password,
address.quota_key,
address.cluster,
address.cluster_secret,
"server",
address.compression,
address.secure,
address.priority);
auto pool = ConnectionPoolFactory::instance().get(
static_cast<unsigned>(settings.distributed_connections_pool_size),
address.host_name,
address.port,
address.default_database,
address.user,
address.password,
address.quota_key,
address.cluster,
address.cluster_secret,
"server",
address.compression,
address.secure,
address.priority);
info.pool = std::make_shared<ConnectionPoolWithFailover>(ConnectionPoolPtrs{pool}, settings.load_balancing);
info.per_replica_pools = {std::move(pool)};
info.pool = std::make_shared<ConnectionPoolWithFailover>(ConnectionPoolPtrs{pool}, settings.load_balancing);
info.per_replica_pools = {std::move(pool)};
addresses_with_failover.emplace_back(Addresses{address});
shards_info.emplace_back(std::move(info));
addresses_with_failover.emplace_back(Addresses{address});
shards_info.emplace_back(std::move(info));
}
};
const auto & replicas = from.addresses_with_failover[shard_index];
if (!max_replicas_from_shard || replicas.size() <= max_replicas_from_shard)
{
create_shards_from_replicas(replicas);
}
else
{
auto shuffled_replicas = replicas;
// shuffle replicas so we don't always pick the same subset
shuffleReplicas(shuffled_replicas, settings, max_replicas_from_shard);
create_shards_from_replicas(std::span{shuffled_replicas.begin(), max_replicas_from_shard});
}
}
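
To restate the selection policy compactly, here is a self-contained sketch under two stated assumptions: a simplified Replica struct stands in for Cluster::Address, and prefer_localhost_replica is treated as always on. Local replicas are partitioned to the front so the preference survives the cap, only the part with remaining freedom is shuffled, and the caller reads a prefix of at most `limit` entries, mirroring the std::span prefix passed to create_shards_from_replicas above.

#include <algorithm>
#include <random>
#include <span>
#include <vector>

struct Replica { bool is_local = false; int id = 0; };

std::span<const Replica> pickReplicas(std::vector<Replica> & replicas, size_t limit)
{
    if (limit == 0 || replicas.size() <= limit)
        return {replicas.begin(), replicas.size()};   /// no cap: take everything

    std::mt19937 gen{std::random_device{}()};
    auto first_remote = std::partition(replicas.begin(), replicas.end(),
                                       [](const Replica & r) { return r.is_local; });
    const auto locals = static_cast<size_t>(first_remote - replicas.begin());
    if (locals > limit)
        std::shuffle(replicas.begin(), first_remote, gen);   /// choose among locals
    else if (locals < limit)
        std::shuffle(first_remote, replicas.end(), gen);     /// locals are kept, vary the rest

    return {replicas.begin(), limit};
}

Passing limit == 0 mirrors the new max_replicas_from_shard default, which keeps the old behaviour of turning every replica into its own shard.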

View File

@ -250,7 +250,7 @@ public:
std::unique_ptr<Cluster> getClusterWithMultipleShards(const std::vector<size_t> & indices) const;
/// Get a new Cluster that contains all servers (all shards with all replicas) from the existing cluster as independent shards.
std::unique_ptr<Cluster> getClusterWithReplicasAsShards(const Settings & settings) const;
std::unique_ptr<Cluster> getClusterWithReplicasAsShards(const Settings & settings, size_t max_replicas_from_shard = 0) const;
/// Returns false if cluster configuration doesn't allow to use it for cross-replication.
/// NOTE: true does not mean, that it's actually a cross-replication cluster.
@ -271,7 +271,7 @@ private:
/// For getClusterWithReplicasAsShards implementation
struct ReplicasAsShardsTag {};
Cluster(ReplicasAsShardsTag, const Cluster & from, const Settings & settings);
Cluster(ReplicasAsShardsTag, const Cluster & from, const Settings & settings, size_t max_replicas_from_shard);
/// Inter-server secret
String secret;

View File

@ -10,6 +10,7 @@
#include <Interpreters/IInterpreter.h>
#include <Interpreters/OptimizeShardingKeyRewriteInVisitor.h>
#include <Parsers/queryToString.h>
#include <Parsers/ASTFunction.h>
#include <Interpreters/ProcessList.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/ReadFromRemote.h>
@ -20,6 +21,7 @@
#include <Storages/SelectQueryInfo.h>
#include <Storages/StorageReplicatedMergeTree.h>
namespace DB
{
@ -157,7 +159,8 @@ void executeQuery(
const ASTPtr & query_ast, ContextPtr context, const SelectQueryInfo & query_info,
const ExpressionActionsPtr & sharding_key_expr,
const std::string & sharding_key_column_name,
const ClusterPtr & not_optimized_cluster)
const ClusterPtr & not_optimized_cluster,
AdditionalShardFilterGenerator shard_filter_generator)
{
const Settings & settings = context->getSettingsRef();
@ -189,7 +192,22 @@ void executeQuery(
visitor.visit(query_ast_for_shard);
}
else
query_ast_for_shard = query_ast;
query_ast_for_shard = query_ast->clone();
if (shard_filter_generator)
{
auto shard_filter = shard_filter_generator(shard_info.shard_num);
if (shard_filter)
{
auto & select_query = query_ast_for_shard->as<ASTSelectQuery &>();
auto where_expression = select_query.where();
if (where_expression)
shard_filter = makeASTFunction("and", where_expression, shard_filter);
select_query.setExpression(ASTSelectQuery::Expression::WHERE, std::move(shard_filter));
}
}
stream_factory.createForShard(shard_info,
query_ast_for_shard, main_table, table_func_ptr,

View File

@ -37,6 +37,7 @@ class SelectStreamFactory;
ContextMutablePtr updateSettingsForCluster(
const Cluster & cluster, ContextPtr context, const Settings & settings, const StorageID & main_table, const SelectQueryInfo * query_info = nullptr, Poco::Logger * log = nullptr);
using AdditionalShardFilterGenerator = std::function<ASTPtr(uint64_t)>;
/// Execute a distributed query, creating a query plan, from which the query pipeline can be built.
/// `stream_factory` object encapsulates the logic of creating plans for a different type of query
/// (currently SELECT, DESCRIBE).
@ -50,7 +51,8 @@ void executeQuery(
const ASTPtr & query_ast, ContextPtr context, const SelectQueryInfo & query_info,
const ExpressionActionsPtr & sharding_key_expr,
const std::string & sharding_key_column_name,
const ClusterPtr & not_optimized_cluster);
const ClusterPtr & not_optimized_cluster,
AdditionalShardFilterGenerator shard_filter_generator = {});
void executeQueryWithParallelReplicas(
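
For illustration, a hedged sketch of a generator a caller could pass through the new parameter. The column name user_id and the modulo scheme are hypothetical, not taken from this diff; only makeASTFunction, ASTIdentifier, ASTLiteral, and the std::function alias come from the code above. executeQuery then ANDs the returned predicate into each shard's WHERE clause, as shown in the hunk above.

#include <functional>
#include <memory>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>

using namespace DB;

/// Local mirror of AdditionalShardFilterGenerator from the header above.
using ShardFilterGenerator = std::function<ASTPtr(uint64_t)>;

ShardFilterGenerator makeUserIdShardFilter(uint64_t shard_count)
{
    return [shard_count](uint64_t shard_num) -> ASTPtr
    {
        /// Builds the AST for: user_id % shard_count = shard_num - 1
        /// (shard numbers are 1-based, hence the -1).
        return makeASTFunction(
            "equals",
            makeASTFunction(
                "modulo",
                std::make_shared<ASTIdentifier>("user_id"),
                std::make_shared<ASTLiteral>(shard_count)),
            std::make_shared<ASTLiteral>(shard_num - 1));
    };
}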

View File

@ -4056,21 +4056,34 @@ std::shared_ptr<AsyncReadCounters> Context::getAsyncReadCounters() const
return async_read_counters;
}
Context::ParallelReplicasMode Context::getParallelReplicasMode() const
{
const auto & settings = getSettingsRef();
using enum Context::ParallelReplicasMode;
if (!settings.parallel_replicas_custom_key.value.empty())
return CUSTOM_KEY;
if (settings.allow_experimental_parallel_reading_from_replicas
&& !settings.use_hedged_requests)
return READ_TASKS;
return SAMPLE_KEY;
}
bool Context::canUseParallelReplicasOnInitiator() const
{
const auto & settings = getSettingsRef();
return settings.allow_experimental_parallel_reading_from_replicas
return getParallelReplicasMode() == ParallelReplicasMode::READ_TASKS
&& settings.max_parallel_replicas > 1
&& !settings.use_hedged_requests
&& !getClientInfo().collaborate_with_initiator;
}
bool Context::canUseParallelReplicasOnFollower() const
{
const auto & settings = getSettingsRef();
return settings.allow_experimental_parallel_reading_from_replicas
return getParallelReplicasMode() == ParallelReplicasMode::READ_TASKS
&& settings.max_parallel_replicas > 1
&& !settings.use_hedged_requests
&& getClientInfo().collaborate_with_initiator;
}
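
Restated as a standalone function (a sketch, not Context itself), the precedence that getParallelReplicasMode encodes is: a non-empty custom key always wins; otherwise experimental parallel reading selects READ_TASKS, but only while hedged requests are disabled; SAMPLE_KEY is the fallback.

#include <cstdint>
#include <string>

enum class ParallelReplicasMode : uint8_t { SAMPLE_KEY, CUSTOM_KEY, READ_TASKS };

ParallelReplicasMode parallelReplicasModeFor(
    const std::string & custom_key, bool experimental_parallel_reading, bool use_hedged_requests)
{
    if (!custom_key.empty())
        return ParallelReplicasMode::CUSTOM_KEY;
    if (experimental_parallel_reading && !use_hedged_requests)
        return ParallelReplicasMode::READ_TASKS;
    return ParallelReplicasMode::SAMPLE_KEY;
}

This is also why the canUseParallelReplicasOnInitiator/Follower predicates above can check the mode instead of the individual settings.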

View File

@ -1123,6 +1123,15 @@ public:
bool canUseParallelReplicasOnInitiator() const;
bool canUseParallelReplicasOnFollower() const;
enum class ParallelReplicasMode : uint8_t
{
SAMPLE_KEY,
CUSTOM_KEY,
READ_TASKS,
};
ParallelReplicasMode getParallelReplicasMode() const;
private:
std::unique_lock<std::recursive_mutex> getLock() const;

View File

@ -10,7 +10,6 @@
#include <Core/ProtocolDefines.h>
#include <Disks/IVolume.h>
#include <Disks/TemporaryFileOnDisk.h>
#include <IO/WriteBufferFromTemporaryFile.h>
#include <Common/logger_useful.h>
#include <Common/thread_local_rng.h>

View File

@ -139,6 +139,7 @@ private:
mutable SharedMutex rehash_mutex;
FileBucket * current_bucket = nullptr;
mutable std::mutex current_bucket_mutex;
InMemoryJoinPtr hash_join;

View File

@ -9,7 +9,7 @@
#include <Common/Macros.h>
#include <Common/randomSeed.h>
#include <Common/atomicRename.h>
#include <Common/hex.h>
#include <base/hex.h>
#include <Core/Defines.h>
#include <Core/SettingsEnums.h>

View File

@ -161,6 +161,8 @@ public:
if (curr_process.processed)
continue;
LOG_DEBUG(&Poco::Logger::get("KillQuery"), "Will kill query {} (synchronously)", curr_process.query_id);
auto code = process_list.sendCancelToQuery(curr_process.query_id, curr_process.user, true);
if (code != CancellationCode::QueryIsNotInitializedYet && code != CancellationCode::CancelSent)
@ -226,6 +228,8 @@ BlockIO InterpreterKillQueryQuery::execute()
MutableColumns res_columns = header.cloneEmptyColumns();
for (const auto & query_desc : queries_to_stop)
{
if (!query.test)
LOG_DEBUG(&Poco::Logger::get("KillQuery"), "Will kill query {} (asynchronously)", query_desc.query_id);
auto code = (query.test) ? CancellationCode::Unknown : process_list.sendCancelToQuery(query_desc.query_id, query_desc.user, true);
insertResultRow(query_desc.source_num, code, processes_block, header, res_columns);
}

View File

@ -38,6 +38,7 @@
#include <Interpreters/QueryLog.h>
#include <Interpreters/replaceAliasColumnsInQuery.h>
#include <Interpreters/RewriteCountDistinctVisitor.h>
#include <Interpreters/getCustomKeyFilterForParallelReplicas.h>
#include <QueryPipeline/Pipe.h>
#include <Processors/QueryPlan/AggregatingStep.h>
@ -114,6 +115,7 @@ namespace ErrorCodes
extern const int INVALID_WITH_FILL_EXPRESSION;
extern const int ACCESS_DENIED;
extern const int UNKNOWN_IDENTIFIER;
extern const int BAD_ARGUMENTS;
}
/// Assumes `storage` is set and the table filter (row-level security) is not empty.
@ -229,10 +231,13 @@ InterpreterSelectQuery::InterpreterSelectQuery(
InterpreterSelectQuery::~InterpreterSelectQuery() = default;
namespace
{
/** There are no limits on the maximum size of the result for the subquery,
  * since the subquery's result is not the result of the entire query.
  */
static ContextPtr getSubqueryContext(const ContextPtr & context)
ContextPtr getSubqueryContext(const ContextPtr & context)
{
auto subquery_context = Context::createCopy(context);
Settings subquery_settings = context->getSettings();
@ -244,7 +249,7 @@ static ContextPtr getSubqueryContext(const ContextPtr & context)
return subquery_context;
}
static void rewriteMultipleJoins(ASTPtr & query, const TablesWithColumns & tables, const String & database, const Settings & settings)
void rewriteMultipleJoins(ASTPtr & query, const TablesWithColumns & tables, const String & database, const Settings & settings)
{
ASTSelectQuery & select = query->as<ASTSelectQuery &>();
@ -264,7 +269,7 @@ static void rewriteMultipleJoins(ASTPtr & query, const TablesWithColumns & table
}
/// Checks that the current user has the SELECT privilege.
static void checkAccessRightsForSelect(
void checkAccessRightsForSelect(
const ContextPtr & context,
const StorageID & table_id,
const StorageMetadataPtr & table_metadata,
@ -294,7 +299,7 @@ static void checkAccessRightsForSelect(
context->checkAccess(AccessType::SELECT, table_id, syntax_analyzer_result.requiredSourceColumnsForAccessCheck());
}
static ASTPtr parseAdditionalFilterConditionForTable(
ASTPtr parseAdditionalFilterConditionForTable(
const Map & setting,
const DatabaseAndTableWithAlias & target,
const Context & context)
@ -322,7 +327,7 @@ static ASTPtr parseAdditionalFilterConditionForTable(
}
/// Returns true if we should ignore quotas and limits for a specified table in the system database.
static bool shouldIgnoreQuotaAndLimits(const StorageID & table_id)
bool shouldIgnoreQuotaAndLimits(const StorageID & table_id)
{
if (table_id.database_name == DatabaseCatalog::SYSTEM_DATABASE)
{
@ -333,6 +338,8 @@ static bool shouldIgnoreQuotaAndLimits(const StorageID & table_id)
return false;
}
}
InterpreterSelectQuery::InterpreterSelectQuery(
const ASTPtr & query_ptr_,
const ContextPtr & context_,
@ -448,10 +455,11 @@ InterpreterSelectQuery::InterpreterSelectQuery(
}
}
if (joined_tables.tablesCount() > 1 && settings.allow_experimental_parallel_reading_from_replicas)
if (joined_tables.tablesCount() > 1 && (!settings.parallel_replicas_custom_key.value.empty() || settings.allow_experimental_parallel_reading_from_replicas))
{
LOG_WARNING(log, "Joins are not supported with parallel replicas. Query will be executed without using them.");
context->setSetting("allow_experimental_parallel_reading_from_replicas", false);
context->setSetting("parallel_replicas_custom_key", String{""});
}
/// Rewrite JOINs
@ -509,6 +517,42 @@ InterpreterSelectQuery::InterpreterSelectQuery(
query_info.additional_filter_ast = parseAdditionalFilterConditionForTable(
settings.additional_table_filters, joined_tables.tablesWithColumns().front().table, *context);
ASTPtr parallel_replicas_custom_filter_ast = nullptr;
if (context->getParallelReplicasMode() == Context::ParallelReplicasMode::CUSTOM_KEY && !joined_tables.tablesWithColumns().empty())
{
if (settings.parallel_replicas_count > 1)
{
if (auto custom_key_ast = parseCustomKeyForTable(settings.parallel_replicas_custom_key, *context))
{
LOG_TRACE(log, "Processing query on a replica using custom_key '{}'", settings.parallel_replicas_custom_key.value);
if (!storage)
throw DB::Exception(ErrorCodes::BAD_ARGUMENTS, "Storage is unknown when trying to parse custom key for parallel replica");
parallel_replicas_custom_filter_ast = getCustomKeyFilterForParallelReplica(
settings.parallel_replicas_count,
settings.parallel_replica_offset,
std::move(custom_key_ast),
settings.parallel_replicas_custom_key_filter_type,
*storage,
context);
}
else if (settings.parallel_replica_offset > 0)
{
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Parallel replicas processing with custom_key has been requested "
"(setting 'max_parallel_replicas') but the table does not have custom_key defined for it "
"or it's invalid (settings `parallel_replicas_custom_key`)");
}
}
else if (auto * distributed = dynamic_cast<StorageDistributed *>(storage.get());
distributed && canUseCustomKey(settings, *distributed->getCluster(), *context))
{
query_info.use_custom_key = true;
context->setSetting("distributed_group_by_no_merge", 2);
}
}
if (autoFinalOnQuery(query))
{
query.setFinal();
@ -693,6 +737,16 @@ InterpreterSelectQuery::InterpreterSelectQuery(
query_info.filter_asts.push_back(query_info.additional_filter_ast);
}
if (parallel_replicas_custom_filter_ast)
{
parallel_replicas_custom_filter_info = generateFilterActions(
table_id, parallel_replicas_custom_filter_ast, context, storage, storage_snapshot, metadata_snapshot, required_columns,
prepared_sets);
parallel_replicas_custom_filter_info->do_remove_column = true;
query_info.filter_asts.push_back(parallel_replicas_custom_filter_ast);
}
source_header = storage_snapshot->getSampleBlockForColumns(required_columns, parameter_values);
}
@ -1435,17 +1489,23 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
query_plan.addStep(std::move(row_level_security_step));
}
if (additional_filter_info)
const auto add_filter_step = [&](const auto & new_filter_info, const std::string & description)
{
auto additional_filter_step = std::make_unique<FilterStep>(
auto filter_step = std::make_unique<FilterStep>(
query_plan.getCurrentDataStream(),
additional_filter_info->actions,
additional_filter_info->column_name,
additional_filter_info->do_remove_column);
new_filter_info->actions,
new_filter_info->column_name,
new_filter_info->do_remove_column);
additional_filter_step->setStepDescription("Additional filter");
query_plan.addStep(std::move(additional_filter_step));
}
filter_step->setStepDescription(description);
query_plan.addStep(std::move(filter_step));
};
if (additional_filter_info)
add_filter_step(additional_filter_info, "Additional filter");
if (parallel_replicas_custom_filter_info)
add_filter_step(parallel_replicas_custom_filter_info, "Parallel replica custom key filter");
if (expressions.before_array_join)
{
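
To make the custom-key path concrete, a hedged sketch of the kind of predicate it resolves to; the helper below is illustrative only, and the real construction (including the range-based filter type selected by parallel_replicas_custom_key_filter_type) lives in getCustomKeyFilterForParallelReplica. With the default filter type, replica i of n keeps the rows where custom_key % n = i, so the per-replica filters partition the table between the replicas.

#include <cstdint>
#include <memory>
#include <utility>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/IAST_fwd.h>

using namespace DB;

/// Hypothetical helper: builds the AST for custom_key % replica_count = replica_offset.
ASTPtr makeModuloCustomKeyFilter(ASTPtr custom_key, uint64_t replica_count, uint64_t replica_offset)
{
    return makeASTFunction(
        "equals",
        makeASTFunction("modulo", std::move(custom_key), std::make_shared<ASTLiteral>(replica_count)),
        std::make_shared<ASTLiteral>(replica_offset));
}

The resulting AST is what generateFilterActions above turns into the "Parallel replica custom key filter" step.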

View File

@ -215,6 +215,9 @@ private:
/// For additional_filter setting.
FilterDAGInfoPtr additional_filter_info;
/// For "per replica" filter when multiple replicas are used
FilterDAGInfoPtr parallel_replicas_custom_filter_info;
QueryProcessingStage::Enum from_stage = QueryProcessingStage::FetchColumns;
/// List of columns to read to execute the query.

Some files were not shown because too many files have changed in this diff.