Merge branch 'master' into keeper-parallel-storage

This commit is contained in:
Antonio Andelic 2023-09-05 08:46:38 +00:00
commit 57943798b7
111 changed files with 2353 additions and 645 deletions

View File

@ -580,6 +580,47 @@ jobs:
docker ps --quiet | xargs --no-run-if-empty docker kill ||:
docker ps --all --quiet | xargs --no-run-if-empty docker rm -f ||:
sudo rm -fr "$TEMP_PATH" "$CACHES_PATH"
BuilderFuzzers:
needs: [DockerHubPush, FastTest, StyleCheck]
runs-on: [self-hosted, builder]
steps:
- name: Set envs
run: |
cat >> "$GITHUB_ENV" << 'EOF'
TEMP_PATH=${{runner.temp}}/build_check
IMAGES_PATH=${{runner.temp}}/images_path
REPO_COPY=${{runner.temp}}/build_check/ClickHouse
CACHES_PATH=${{runner.temp}}/../ccaches
BUILD_NAME=fuzzers
EOF
- name: Download changed images
uses: actions/download-artifact@v3
with:
name: changed_images
path: ${{ env.IMAGES_PATH }}
- name: Check out repository code
uses: ClickHouse/checkout@v1
with:
clear-repository: true
submodules: true
- name: Build
run: |
sudo rm -fr "$TEMP_PATH"
mkdir -p "$TEMP_PATH"
cp -r "$GITHUB_WORKSPACE" "$TEMP_PATH"
cd "$REPO_COPY/tests/ci" && python3 build_check.py "$BUILD_NAME"
- name: Upload build URLs to artifacts
if: ${{ success() || failure() }}
uses: actions/upload-artifact@v3
with:
name: ${{ env.BUILD_URLS }}
path: ${{ env.TEMP_PATH }}/${{ env.BUILD_URLS }}.json
- name: Cleanup
if: always()
run: |
docker ps --quiet | xargs --no-run-if-empty docker kill ||:
docker ps --all --quiet | xargs --no-run-if-empty docker rm -f ||:
sudo rm -fr "$TEMP_PATH" "$CACHES_PATH"
##########################################################################################
##################################### SPECIAL BUILDS #####################################
##########################################################################################

View File

@ -118,7 +118,11 @@ endif()
# - sanitize.cmake
add_library(global-libs INTERFACE)
include (cmake/fuzzer.cmake)
# We don't want to instrument everything with fuzzer, but only specific targets (see below),
# also, since we build our own llvm, we specifically don't want to instrument
# libFuzzer library itself - it would result in infinite recursion
#include (cmake/fuzzer.cmake)
include (cmake/sanitize.cmake)
option(ENABLE_COLORED_BUILD "Enable colors in compiler output" ON)
@ -558,6 +562,46 @@ add_subdirectory (programs)
add_subdirectory (tests)
add_subdirectory (utils)
# Function get_all_targets collects all targets recursively
function(get_all_targets var)
macro(get_all_targets_recursive targets dir)
get_property(subdirectories DIRECTORY ${dir} PROPERTY SUBDIRECTORIES)
foreach(subdir ${subdirectories})
get_all_targets_recursive(${targets} ${subdir})
endforeach()
get_property(current_targets DIRECTORY ${dir} PROPERTY BUILDSYSTEM_TARGETS)
list(APPEND ${targets} ${current_targets})
endmacro()
set(targets)
get_all_targets_recursive(targets ${CMAKE_CURRENT_SOURCE_DIR})
set(${var} ${targets} PARENT_SCOPE)
endfunction()
if (FUZZER)
# Bundle fuzzers target
add_custom_target(fuzzers)
# Instrument all targets fuzzer and link with libfuzzer
get_all_targets(all_targets)
foreach(target ${all_targets})
if (NOT(target STREQUAL "_fuzzer" OR target STREQUAL "_fuzzer_no_main"))
get_target_property(target_type ${target} TYPE)
if (NOT(target_type STREQUAL "INTERFACE_LIBRARY" OR target_type STREQUAL "UTILITY"))
target_compile_options(${target} PRIVATE "-fsanitize=fuzzer-no-link")
endif()
# clickhouse fuzzer isn't working correctly
# initial PR https://github.com/ClickHouse/ClickHouse/pull/27526
#if (target MATCHES ".+_fuzzer" OR target STREQUAL "clickhouse")
if (target MATCHES ".+_fuzzer")
message(STATUS "${target} instrumented with fuzzer")
target_link_libraries(${target} PUBLIC ch_contrib::fuzzer)
# Add to fuzzers bundle
add_dependencies(fuzzers ${target})
endif()
endif()
endforeach()
endif()
include (cmake/sanitize_targets.cmake)
# Build native targets if necessary

View File

@ -32,7 +32,7 @@ RUN arch=${TARGETARCH:-amd64} \
esac
ARG REPOSITORY="https://s3.amazonaws.com/clickhouse-builds/22.4/31c367d3cd3aefd316778601ff6565119fe36682/package_release"
ARG VERSION="23.8.1.2992"
ARG VERSION="23.8.2.7"
ARG PACKAGES="clickhouse-keeper"
# user/group precreated explicitly with fixed uid/gid on purpose.

View File

@ -97,9 +97,11 @@ if [ -n "$MAKE_DEB" ]; then
bash -x /build/packages/build
fi
mv ./programs/clickhouse* /output
[ -x ./programs/self-extracting/clickhouse ] && mv ./programs/self-extracting/clickhouse /output
mv ./src/unit_tests_dbms /output ||: # may not exist for some binary builds
if [ "$BUILD_TARGET" != "fuzzers" ]; then
mv ./programs/clickhouse* /output
[ -x ./programs/self-extracting/clickhouse ] && mv ./programs/self-extracting/clickhouse /output
mv ./src/unit_tests_dbms /output ||: # may not exist for some binary builds
fi
prepare_combined_output () {
local OUTPUT

View File

@ -149,7 +149,10 @@ def parse_env_variables(
result = []
result.append("OUTPUT_DIR=/output")
cmake_flags = ["$CMAKE_FLAGS"]
build_target = "clickhouse-bundle"
if package_type == "fuzzers":
build_target = "fuzzers"
else:
build_target = "clickhouse-bundle"
is_cross_darwin = compiler.endswith(DARWIN_SUFFIX)
is_cross_darwin_arm = compiler.endswith(DARWIN_ARM_SUFFIX)
@ -258,6 +261,17 @@ def parse_env_variables(
cmake_flags.append("-DBUILD_STANDALONE_KEEPER=1")
else:
result.append("BUILD_MUSL_KEEPER=1")
elif package_type == "fuzzers":
cmake_flags.append("-DENABLE_FUZZING=1")
cmake_flags.append("-DENABLE_PROTOBUF=1")
cmake_flags.append("-DUSE_INTERNAL_PROTOBUF_LIBRARY=1")
cmake_flags.append("-DWITH_COVERAGE=1")
cmake_flags.append("-DCMAKE_AUTOGEN_VERBOSE=ON")
# cmake_flags.append("-DCMAKE_INSTALL_PREFIX=/usr")
# cmake_flags.append("-DCMAKE_INSTALL_SYSCONFDIR=/etc")
# cmake_flags.append("-DCMAKE_INSTALL_LOCALSTATEDIR=/var")
# Reduce linking and building time by avoid *install/all dependencies
cmake_flags.append("-DCMAKE_SKIP_INSTALL_ALL_DEPENDENCY=ON")
result.append(f"CC={cc}")
result.append(f"CXX={cxx}")
@ -365,7 +379,7 @@ def parse_args() -> argparse.Namespace:
)
parser.add_argument(
"--package-type",
choices=["deb", "binary"],
choices=["deb", "binary", "fuzzers"],
required=True,
)
parser.add_argument(

View File

@ -33,7 +33,7 @@ RUN arch=${TARGETARCH:-amd64} \
# lts / testing / prestable / etc
ARG REPO_CHANNEL="stable"
ARG REPOSITORY="https://packages.clickhouse.com/tgz/${REPO_CHANNEL}"
ARG VERSION="23.8.1.2992"
ARG VERSION="23.8.2.7"
ARG PACKAGES="clickhouse-client clickhouse-server clickhouse-common-static"
# user/group precreated explicitly with fixed uid/gid on purpose.

View File

@ -23,7 +23,7 @@ RUN sed -i "s|http://archive.ubuntu.com|${apt_archive}|g" /etc/apt/sources.list
ARG REPO_CHANNEL="stable"
ARG REPOSITORY="deb [signed-by=/usr/share/keyrings/clickhouse-keyring.gpg] https://packages.clickhouse.com/deb ${REPO_CHANNEL} main"
ARG VERSION="23.8.1.2992"
ARG VERSION="23.8.2.7"
ARG PACKAGES="clickhouse-client clickhouse-server clickhouse-common-static"
# set non-empty deb_location_url url to create a docker image

View File

@ -0,0 +1,20 @@
---
sidebar_position: 1
sidebar_label: 2023
---
# 2023 Changelog
### ClickHouse release v23.3.12.11-lts (414317bed21) FIXME as compared to v23.3.11.5-lts (5762a23a76d)
#### Bug Fix (user-visible misbehavior in an official stable release)
* Fix async insert with deduplication for ReplicatedMergeTree using merging algorithms [#51676](https://github.com/ClickHouse/ClickHouse/pull/51676) ([Antonio Andelic](https://github.com/antonio2368)).
* Fix deadlock on DatabaseCatalog shutdown [#51908](https://github.com/ClickHouse/ClickHouse/pull/51908) ([Alexander Tokmakov](https://github.com/tavplubix)).
* Fix crash in join on sparse column [#53548](https://github.com/ClickHouse/ClickHouse/pull/53548) ([vdimir](https://github.com/vdimir)).
* Fix rows_before_limit_at_least for DelayedSource. [#54122](https://github.com/ClickHouse/ClickHouse/pull/54122) ([Nikolai Kochetov](https://github.com/KochetovNicolai)).
#### NOT FOR CHANGELOG / INSIGNIFICANT
* Fix broken `02862_sorted_distinct_sparse_fix` [#53738](https://github.com/ClickHouse/ClickHouse/pull/53738) ([Antonio Andelic](https://github.com/antonio2368)).

View File

@ -0,0 +1,18 @@
---
sidebar_position: 1
sidebar_label: 2023
---
# 2023 Changelog
### ClickHouse release v23.8.2.7-lts (f73c8f37874) FIXME as compared to v23.8.1.2992-lts (ebc7d9a9f3b)
#### Bug Fix (user-visible misbehavior in an official stable release)
* Fix: parallel replicas over distributed don't read from all replicas [#54199](https://github.com/ClickHouse/ClickHouse/pull/54199) ([Igor Nikonov](https://github.com/devcrafter)).
* Fix: allow IPv6 for bloom filter [#54200](https://github.com/ClickHouse/ClickHouse/pull/54200) ([Yakov Olkhovskiy](https://github.com/yakov-olkhovskiy)).
#### NOT FOR CHANGELOG / INSIGNIFICANT
* S3Queue is experimental [#54214](https://github.com/ClickHouse/ClickHouse/pull/54214) ([Alexey Milovidov](https://github.com/alexey-milovidov)).

View File

@ -8,17 +8,21 @@ Contains information about [distributed ddl queries (ON CLUSTER clause)](../../s
Columns:
- `entry` ([String](../../sql-reference/data-types/string.md)) — Query id.
- `host_name` ([String](../../sql-reference/data-types/string.md)) — Hostname.
- `host_address` ([String](../../sql-reference/data-types/string.md)) — IP address that the Hostname resolves to.
- `port` ([UInt16](../../sql-reference/data-types/int-uint.md)) — Host Port.
- `status` ([Enum8](../../sql-reference/data-types/enum.md)) — Status of the query.
- `entry_version` ([Nullable(UInt8)](../../sql-reference/data-types/int-uint.md)) - Version of the entry
- `initiator_host` ([Nullable(String)](../../sql-reference/data-types/string.md)) - Host that initiated the DDL operation
- `initiator_port` ([Nullable(UInt16)](../../sql-reference/data-types/int-uint.md)) - Port used by the initiator
- `cluster` ([String](../../sql-reference/data-types/string.md)) — Cluster name.
- `query` ([String](../../sql-reference/data-types/string.md)) — Query executed.
- `initiator` ([String](../../sql-reference/data-types/string.md)) — Node that executed the query.
- `query_start_time` ([DateTime](../../sql-reference/data-types/datetime.md)) — Query start time.
- `settings` ([Map(String, String)](../../sql-reference/data-types/map.md)) - Settings used in the DDL operation
- `query_create_time` ([DateTime](../../sql-reference/data-types/datetime.md)) — Query created time.
- `host` ([String](../../sql-reference/data-types/string.md)) — Hostname
- `port` ([UInt16](../../sql-reference/data-types/int-uint.md)) — Host Port.
- `status` ([Enum8](../../sql-reference/data-types/enum.md)) — Status of the query.
- `exception_code` ([Enum8](../../sql-reference/data-types/enum.md)) — Exception code.
- `exception_text` ([Nullable(String)](../../sql-reference/data-types/string.md)) - Exception message
- `query_finish_time` ([DateTime](../../sql-reference/data-types/datetime.md)) — Query finish time.
- `query_duration_ms` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Duration of query execution (in milliseconds).
- `exception_code` ([Enum8](../../sql-reference/data-types/enum.md)) — Exception code from [ClickHouse Keeper](../../operations/tips.md#zookeeper).
**Example**
@ -34,32 +38,38 @@ Query id: f544e72a-6641-43f1-836b-24baa1c9632a
Row 1:
──────
entry: query-0000000000
host_name: clickhouse01
host_address: 172.23.0.11
port: 9000
status: Finished
entry_version: 5
initiator_host: clickhouse01
initiator_port: 9000
cluster: test_cluster
query: CREATE DATABASE test_db UUID '4a82697e-c85e-4e5b-a01e-a36f2a758456' ON CLUSTER test_cluster
initiator: clickhouse01:9000
query_start_time: 2020-12-30 13:07:51
query_finish_time: 2020-12-30 13:07:51
query_duration_ms: 6
exception_code: ZOK
settings: {'max_threads':'16','use_uncompressed_cache':'0'}
query_create_time: 2023-09-01 16:15:14
host: clickhouse-01
port: 9000
status: Finished
exception_code: 0
exception_text:
query_finish_time: 2023-09-01 16:15:14
query_duration_ms: 154
Row 2:
──────
entry: query-0000000000
host_name: clickhouse02
host_address: 172.23.0.12
port: 9000
status: Finished
entry: query-0000000001
entry_version: 5
initiator_host: clickhouse01
initiator_port: 9000
cluster: test_cluster
query: CREATE DATABASE test_db UUID '4a82697e-c85e-4e5b-a01e-a36f2a758456' ON CLUSTER test_cluster
initiator: clickhouse01:9000
query_start_time: 2020-12-30 13:07:51
query_finish_time: 2020-12-30 13:07:51
query_duration_ms: 6
exception_code: ZOK
settings: {'max_threads':'16','use_uncompressed_cache':'0'}
query_create_time: 2023-09-01 16:15:14
host: clickhouse-01
port: 9000
status: Finished
exception_code: 630
exception_text: Code: 630. DB::Exception: Cannot drop or rename test_db, because some tables depend on it:
query_finish_time: 2023-09-01 16:15:14
query_duration_ms: 154
2 rows in set. Elapsed: 0.025 sec.
```

View File

@ -0,0 +1,38 @@
---
slug: /en/operations/utilities/clickhouse-disks
sidebar_position: 59
sidebar_label: clickhouse-disks
---
# clickhouse-disks
A utility providing filesystem-like operations for ClickHouse disks.
Program-wide options:
* `--config-file, -C` -- path to ClickHouse config, defaults to `/etc/clickhouse-server/config.xml`.
* `--save-logs` -- Log progress of invoked commands to `/var/log/clickhouse-server/clickhouse-disks.log`.
* `--log-level` -- What [type](../server-configuration-parameters/settings#server_configuration_parameters-logger) of events to log, defaults to `none`.
* `--disk` -- what disk to use for `mkdir, move, read, write, remove` commands. Defaults to `default`.
## Commands
* `copy [--disk-from d1] [--disk-to d2] <FROM_PATH> <TO_PATH>`.
Recursively copy data from `FROM_PATH` at disk `d1` (defaults to `disk` value if not provided)
to `TO_PATH` at disk `d2` (defaults to `disk` value if not provided).
* `move <FROM_PATH> <TO_PATH>`.
Move file or directory from `FROM_PATH` to `TO_PATH`.
* `remove <PATH>`.
Remove `PATH` recursively.
* `link <FROM_PATH> <TO_PATH>`.
Create a hardlink from `FROM_PATH` to `TO_PATH`.
* `list [--recursive] <PATH>...`
List files at `PATH`s. Non-recursive by default.
* `list-disks`.
List disks names.
* `mkdir [--recursive] <PATH>`.
Create a directory. Non-recursive by default.
* `read: <FROM_PATH> [<TO_PATH>]`
Read a file from `FROM_PATH` to `TO_PATH` (`stdout` if not supplied).
* `write [FROM_PATH] <TO_PATH>`.
Write a file from `FROM_PATH` (`stdin` if not supplied) to `TO_PATH`.

View File

@ -13,4 +13,6 @@ pagination_next: 'en/operations/utilities/clickhouse-copier'
- [clickhouse-format](../../operations/utilities/clickhouse-format.md) — Enables formatting input queries.
- [ClickHouse obfuscator](../../operations/utilities/clickhouse-obfuscator.md) — Obfuscates data.
- [ClickHouse compressor](../../operations/utilities/clickhouse-compressor.md) — Compresses and decompresses data.
- [clickhouse-disks](../../operations/utilities/clickhouse-disks.md) -- Provides filesystem-like operations
on files among different ClickHouse disks.
- [clickhouse-odbc-bridge](../../operations/utilities/odbc-bridge.md) — A proxy server for ODBC driver.

View File

@ -738,16 +738,16 @@ age('unit', startdate, enddate, [timezone])
- `unit` — The type of interval for result. [String](../../sql-reference/data-types/string.md).
Possible values:
- `microsecond` (possible abbreviations: `us`, `u`)
- `millisecond` (possible abbreviations: `ms`)
- `second` (possible abbreviations: `ss`, `s`)
- `minute` (possible abbreviations: `mi`, `n`)
- `hour` (possible abbreviations: `hh`, `h`)
- `day` (possible abbreviations: `dd`, `d`)
- `week` (possible abbreviations: `wk`, `ww`)
- `month` (possible abbreviations: `mm`, `m`)
- `quarter` (possible abbreviations: `qq`, `q`)
- `year` (possible abbreviations: `yyyy`, `yy`)
- `microsecond` `microseconds` `us` `u`
- `millisecond` `milliseconds` `ms`
- `second` `seconds` `ss` `s`
- `minute` `minutes` `mi` `n`
- `hour` `hours` `hh` `h`
- `day` `days` `dd` `d`
- `week` `weeks` `wk` `ww`
- `month` `months` `mm` `m`
- `quarter` `quarters` `qq` `q`
- `year` `years` `yyyy` `yy`
- `startdate` — The first time value to subtract (the subtrahend). [Date](../../sql-reference/data-types/date.md), [Date32](../../sql-reference/data-types/date32.md), [DateTime](../../sql-reference/data-types/datetime.md) or [DateTime64](../../sql-reference/data-types/datetime64.md).
@ -815,16 +815,16 @@ Aliases: `dateDiff`, `DATE_DIFF`, `timestampDiff`, `timestamp_diff`, `TIMESTAMP_
- `unit` — The type of interval for result. [String](../../sql-reference/data-types/string.md).
Possible values:
- `microsecond` (possible abbreviations: `microseconds`, `us`, `u`)
- `millisecond` (possible abbreviations: `milliseconds`, `ms`)
- `second` (possible abbreviations: `seconds`, `ss`, `s`)
- `minute` (possible abbreviations: `minutes`, `mi`, `n`)
- `hour` (possible abbreviations: `hours`, `hh`, `h`)
- `day` (possible abbreviations: `days`, `dd`, `d`)
- `week` (possible abbreviations: `weeks`, `wk`, `ww`)
- `month` (possible abbreviations: `months`, `mm`, `m`)
- `quarter` (possible abbreviations: `quarters`, `qq`, `q`)
- `year` (possible abbreviations: `years`, `yyyy`, `yy`)
- `microsecond` `microseconds` `us` `u`
- `millisecond` `milliseconds` `ms`
- `second` `seconds` `ss` `s`
- `minute` `minutes` `mi` `n`
- `hour` `hours` `hh` `h`
- `day` `days` `dd` `d`
- `week` `weeks` `wk` `ww`
- `month` `months` `mm` `m`
- `quarter` `quarters` `qq` `q`
- `year` `years` `yyyy` `yy`
- `startdate` — The first time value to subtract (the subtrahend). [Date](../../sql-reference/data-types/date.md), [Date32](../../sql-reference/data-types/date32.md), [DateTime](../../sql-reference/data-types/datetime.md) or [DateTime64](../../sql-reference/data-types/datetime64.md).

View File

@ -94,8 +94,10 @@ Result:
│ 1 │ 1 │ 3 │ 3 │
└─────────────┴───────────────┴───────┴───────────────┘
```
All following examples are executed against this state with 5 rows.
When columns for deduplication are not specified, all of them are taken into account. Row is removed only if all values in all columns are equal to corresponding values in previous row:
#### `DEDUPLICATE`
When columns for deduplication are not specified, all of them are taken into account. The row is removed only if all values in all columns are equal to corresponding values in the previous row:
``` sql
OPTIMIZE TABLE example FINAL DEDUPLICATE;
@ -116,7 +118,7 @@ Result:
│ 1 │ 1 │ 3 │ 3 │
└─────────────┴───────────────┴───────┴───────────────┘
```
#### `DEDUPLICATE BY *`
When columns are specified implicitly, the table is deduplicated by all columns that are not `ALIAS` or `MATERIALIZED`. Considering the table above, these are `primary_key`, `secondary_key`, `value`, and `partition_key` columns:
```sql
OPTIMIZE TABLE example FINAL DEDUPLICATE BY *;
@ -137,7 +139,7 @@ Result:
│ 1 │ 1 │ 3 │ 3 │
└─────────────┴───────────────┴───────┴───────────────┘
```
#### `DEDUPLICATE BY * EXCEPT`
Deduplicate by all columns that are not `ALIAS` or `MATERIALIZED` and explicitly not `value`: `primary_key`, `secondary_key`, and `partition_key` columns.
``` sql
@ -158,7 +160,7 @@ Result:
│ 1 │ 1 │ 2 │ 3 │
└─────────────┴───────────────┴───────┴───────────────┘
```
#### `DEDUPLICATE BY <list of columns>`
Deduplicate explicitly by `primary_key`, `secondary_key`, and `partition_key` columns:
```sql
OPTIMIZE TABLE example FINAL DEDUPLICATE BY primary_key, secondary_key, partition_key;
@ -178,8 +180,8 @@ Result:
│ 1 │ 1 │ 2 │ 3 │
└─────────────┴───────────────┴───────┴───────────────┘
```
Deduplicate by any column matching a regex: `primary_key`, `secondary_key`, and `partition_key` columns:
#### `DEDUPLICATE BY COLUMNS(<regex>)`
Deduplicate by all columns matching a regex: `primary_key`, `secondary_key`, and `partition_key` columns:
```sql
OPTIMIZE TABLE example FINAL DEDUPLICATE BY COLUMNS('.*_key');
```

View File

@ -17,23 +17,21 @@ public:
{
command_name = "copy";
command_option_description.emplace(createOptionsDescription("Allowed options", getTerminalWidth()));
description = "Recursively copy data containing at `from_path` to `to_path`\nPath should be in format './' or './path' or 'path'";
description = "Recursively copy data from `FROM_PATH` to `TO_PATH`";
usage = "copy [OPTION]... <FROM_PATH> <TO_PATH>";
command_option_description->add_options()
("diskFrom", po::value<String>(), "set name for disk from which we do operations")
("diskTo", po::value<String>(), "set name for disk to which we do operations")
;
("disk-from", po::value<String>(), "disk from which we copy")
("disk-to", po::value<String>(), "disk to which we copy");
}
void processOptions(
Poco::Util::LayeredConfiguration & config,
po::variables_map & options) const override
{
if (options.count("diskFrom"))
config.setString("diskFrom", options["diskFrom"].as<String>());
if (options.count("diskTo"))
config.setString("diskTo", options["diskTo"].as<String>());
if (options.count("disk-from"))
config.setString("disk-from", options["disk-from"].as<String>());
if (options.count("disk-to"))
config.setString("disk-to", options["disk-to"].as<String>());
}
void execute(
@ -47,8 +45,8 @@ public:
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Bad Arguments");
}
String disk_name_from = config.getString("diskFrom", config.getString("disk", "default"));
String disk_name_to = config.getString("diskTo", config.getString("disk", "default"));
String disk_name_from = config.getString("disk-from", config.getString("disk", "default"));
String disk_name_to = config.getString("disk-to", config.getString("disk", "default"));
const String & path_from = command_arguments[0];
const String & path_to = command_arguments[1];

View File

@ -15,7 +15,7 @@ public:
CommandLink()
{
command_name = "link";
description = "Create hardlink from `from_path` to `to_path`\nPath should be in format './' or './path' or 'path'";
description = "Create hardlink from `from_path` to `to_path`";
usage = "link [OPTION]... <FROM_PATH> <TO_PATH>";
}

View File

@ -17,11 +17,10 @@ public:
{
command_name = "list";
command_option_description.emplace(createOptionsDescription("Allowed options", getTerminalWidth()));
description = "List files (the default disk is used by default)\nPath should be in format './' or './path' or 'path'";
description = "List files at path[s]";
usage = "list [OPTION]... <PATH>...";
command_option_description->add_options()
("recursive", "recursively list all directories")
;
("recursive", "recursively list all directories");
}
void processOptions(

View File

@ -18,11 +18,10 @@ public:
{
command_name = "mkdir";
command_option_description.emplace(createOptionsDescription("Allowed options", getTerminalWidth()));
description = "Create directory or directories recursively";
description = "Create a directory";
usage = "mkdir [OPTION]... <PATH>";
command_option_description->add_options()
("recursive", "recursively create directories")
;
("recursive", "recursively create directories");
}
void processOptions(

View File

@ -15,7 +15,7 @@ public:
CommandMove()
{
command_name = "move";
description = "Move file or directory from `from_path` to `to_path`\nPath should be in format './' or './path' or 'path'";
description = "Move file or directory from `from_path` to `to_path`";
usage = "move [OPTION]... <FROM_PATH> <TO_PATH>";
}

View File

@ -20,11 +20,10 @@ public:
{
command_name = "read";
command_option_description.emplace(createOptionsDescription("Allowed options", getTerminalWidth()));
description = "read File `from_path` to `to_path` or to stdout\nPath should be in format './' or './path' or 'path'";
usage = "read [OPTION]... <FROM_PATH> <TO_PATH>\nor\nread [OPTION]... <FROM_PATH>";
description = "Read a file from `FROM_PATH` to `TO_PATH`";
usage = "read [OPTION]... <FROM_PATH> [<TO_PATH>]";
command_option_description->add_options()
("output", po::value<String>(), "set path to file to which we are read")
;
("output", po::value<String>(), "file to which we are reading, defaults to `stdout`");
}
void processOptions(

View File

@ -21,11 +21,10 @@ public:
{
command_name = "write";
command_option_description.emplace(createOptionsDescription("Allowed options", getTerminalWidth()));
description = "Write File `from_path` or stdin to `to_path`";
usage = "write [OPTION]... <FROM_PATH> <TO_PATH>\nor\nstdin | write [OPTION]... <TO_PATH>\nPath should be in format './' or './path' or 'path'";
description = "Write a file from `FROM_PATH` to `TO_PATH`";
usage = "write [OPTION]... [<FROM_PATH>] <TO_PATH>";
command_option_description->add_options()
("input", po::value<String>(), "set path to file to which we are write")
;
("input", po::value<String>(), "file from which we are reading, defaults to `stdin`");
}
void processOptions(

View File

@ -948,48 +948,66 @@ int mainEntryClickHouseLocal(int argc, char ** argv)
#if defined(FUZZING_MODE)
// linked from programs/main.cpp
bool isClickhouseApp(const std::string & app_suffix, std::vector<char *> & argv);
std::optional<DB::LocalServer> fuzz_app;
extern "C" int LLVMFuzzerInitialize(int * pargc, char *** pargv)
{
int & argc = *pargc;
char ** argv = *pargv;
std::vector<char *> argv(*pargv, *pargv + (*pargc + 1));
if (!isClickhouseApp("local", argv))
{
std::cerr << "\033[31m" << "ClickHouse compiled in fuzzing mode, only clickhouse local is available." << "\033[0m" << std::endl;
exit(1);
}
/// As a user you can add flags to clickhouse binary in fuzzing mode as follows
/// clickhouse <set of clickhouse-local specific flag> -- <set of libfuzzer flags>
/// clickhouse local <set of clickhouse-local specific flag> -- <set of libfuzzer flags>
/// Calculate the position of delimiter "--" that separates arguments
/// of clickhouse-local and libfuzzer
int pos_delim = argc;
for (int i = 0; i < argc; ++i)
{
if (strcmp(argv[i], "--") == 0)
char **p = &(*pargv)[1];
auto it = argv.begin() + 1;
for (; *it; ++it)
if (strcmp(*it, "--") == 0)
{
pos_delim = i;
++it;
break;
}
}
while (*it)
if (strncmp(*it, "--", 2) != 0)
{
*(p++) = *it;
it = argv.erase(it);
}
else
++it;
*pargc = static_cast<int>(p - &(*pargv)[0]);
*p = nullptr;
/// Initialize clickhouse-local app
fuzz_app.emplace();
fuzz_app->init(pos_delim, argv);
fuzz_app->init(static_cast<int>(argv.size() - 1), argv.data());
/// We will leave clickhouse-local specific arguments as is, because libfuzzer will ignore
/// all keys starting with --
return 0;
}
extern "C" int LLVMFuzzerTestOneInput(const uint8_t * data, size_t size)
try
{
auto input = String(reinterpret_cast<const char *>(data), size);
DB::FunctionGetFuzzerData::update(input);
fuzz_app->run();
try
{
auto input = String(reinterpret_cast<const char *>(data), size);
DB::FunctionGetFuzzerData::update(input);
fuzz_app->run();
}
catch (...)
{
}
return 0;
}
catch (...)
{
return 1;
}
#endif

View File

@ -165,26 +165,6 @@ int printHelp(int, char **)
std::cerr << "clickhouse " << application.first << " [args] " << std::endl;
return -1;
}
bool isClickhouseApp(const std::string & app_suffix, std::vector<char *> & argv)
{
/// Use app if the first arg 'app' is passed (the arg should be quietly removed)
if (argv.size() >= 2)
{
auto first_arg = argv.begin() + 1;
/// 'clickhouse --client ...' and 'clickhouse client ...' are Ok
if (*first_arg == "--" + app_suffix || *first_arg == app_suffix)
{
argv.erase(first_arg);
return true;
}
}
/// Use app if clickhouse binary is run through symbolic link with name clickhouse-app
std::string app_name = "clickhouse-" + app_suffix;
return !argv.empty() && (app_name == argv[0] || endsWith(argv[0], "/" + app_name));
}
#endif
@ -351,7 +331,7 @@ struct Checker
;
#if !defined(USE_MUSL)
#if !defined(FUZZING_MODE) && !defined(USE_MUSL)
/// NOTE: We will migrate to full static linking or our own dynamic loader to make this code obsolete.
void checkHarmfulEnvironmentVariables(char ** argv)
{
@ -407,6 +387,25 @@ void checkHarmfulEnvironmentVariables(char ** argv)
}
bool isClickhouseApp(const std::string & app_suffix, std::vector<char *> & argv)
{
/// Use app if the first arg 'app' is passed (the arg should be quietly removed)
if (argv.size() >= 2)
{
auto first_arg = argv.begin() + 1;
/// 'clickhouse --client ...' and 'clickhouse client ...' are Ok
if (*first_arg == "--" + app_suffix || *first_arg == app_suffix)
{
argv.erase(first_arg);
return true;
}
}
/// Use app if clickhouse binary is run through symbolic link with name clickhouse-app
std::string app_name = "clickhouse-" + app_suffix;
return !argv.empty() && (app_name == argv[0] || endsWith(argv[0], "/" + app_name));
}
/// Don't allow dlopen in the main ClickHouse binary, because it is harmful and insecure.
/// We don't use it. But it can be used by some libraries for implementation of "plugins".

View File

@ -30,3 +30,7 @@ endif()
clickhouse_program_add(server)
install(FILES config.xml users.xml DESTINATION "${CLICKHOUSE_ETC_DIR}/clickhouse-server" COMPONENT clickhouse)
if (ENABLE_FUZZING)
add_subdirectory(fuzzers)
endif()

View File

@ -0,0 +1,15 @@
clickhouse_add_executable(tcp_protocol_fuzzer tcp_protocol_fuzzer.cpp ../Server.cpp ../MetricsTransmitter.cpp)
set (TCP_PROTOCOL_FUZZER_LINK
PRIVATE
daemon
clickhouse_aggregate_functions
clickhouse_functions
clickhouse_table_functions
)
if (TARGET ch_contrib::jemalloc)
list(APPEND TCP_PROTOCOL_FUZZER_LINK PRIVATE ch_contrib::jemalloc)
endif()
target_link_libraries(tcp_protocol_fuzzer ${TCP_PROTOCOL_FUZZER_LINK})

View File

@ -0,0 +1,119 @@
#include <cstdint>
#include <future>
#include <thread>
#include <utility>
#include <vector>
#include <iostream>
#include <chrono>
#include <Poco/Net/PollSet.h>
#include <Poco/Net/SocketAddress.h>
#include <Poco/Net/StreamSocket.h>
#include <Interpreters/Context.h>
int mainEntryClickHouseServer(int argc, char ** argv);
static std::string clickhouse("clickhouse-server");
static std::vector<char *> args{clickhouse.data()};
static std::future<int> main_app;
static std::string s_host("0.0.0.0");
static char * host = s_host.data();
static int64_t port = 9000;
using namespace std::chrono_literals;
extern "C"
int LLVMFuzzerInitialize(int * argc, char ***argv)
{
for (int i = 1; i < *argc; ++i)
{
if ((*argv)[i][0] == '-')
{
if ((*argv)[i][1] == '-')
args.push_back((*argv)[i]);
else
{
if (strncmp((*argv)[i], "-host=", 6) == 0)
{
host = (*argv)[i] + 6;
}
else if (strncmp((*argv)[i], "-port=", 6) == 0)
{
char * p_end = nullptr;
port = strtol((*argv)[i] + 6, &p_end, 10);
}
}
}
}
args.push_back(nullptr);
main_app = std::async(std::launch::async, mainEntryClickHouseServer, args.size() - 1, args.data());
while (!DB::Context::getGlobalContextInstance() || !DB::Context::getGlobalContextInstance()->isServerCompletelyStarted())
{
std::this_thread::sleep_for(100ms);
if (main_app.wait_for(0s) == std::future_status::ready)
exit(-1);
}
return 0;
}
extern "C"
int LLVMFuzzerTestOneInput(const uint8_t * data, size_t size)
{
try
{
if (main_app.wait_for(0s) == std::future_status::ready)
return -1;
if (size == 0)
return -1;
Poco::Net::SocketAddress address(host, port);
Poco::Net::StreamSocket socket;
socket.connectNB(address);
Poco::Net::PollSet ps;
ps.add(socket, Poco::Net::PollSet::POLL_READ | Poco::Net::PollSet::POLL_WRITE);
std::vector<char> buf(1048576);
size_t sent = 0;
while (true)
{
auto m = ps.poll(Poco::Timespan(1000000));
if (m.empty())
continue;
if (m.begin()->second & Poco::Net::PollSet::POLL_READ)
{
if (int n = socket.receiveBytes(buf.data(), static_cast<int>(buf.size())); n == 0)
{
socket.close();
break;
}
continue;
}
if (sent < size && m.begin()->second & Poco::Net::PollSet::POLL_WRITE)
{
sent += socket.sendBytes(data + sent, static_cast<int>(size - sent));
if (sent == size)
{
socket.shutdownSend();
continue;
}
}
}
}
catch (...)
{
}
return 0;
}

View File

@ -1,2 +1,2 @@
clickhouse_add_executable(aggregate_function_state_deserialization_fuzzer aggregate_function_state_deserialization_fuzzer.cpp ${SRCS})
target_link_libraries(aggregate_function_state_deserialization_fuzzer PRIVATE dbms clickhouse_aggregate_functions ${LIB_FUZZING_ENGINE})
target_link_libraries(aggregate_function_state_deserialization_fuzzer PRIVATE dbms clickhouse_aggregate_functions)

View File

@ -6,6 +6,7 @@
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <Common/Arena.h>
#include <Common/MemoryTracker.h>
#include <Common/CurrentThread.h>
@ -16,68 +17,69 @@
#include <base/scope_guard.h>
extern "C" int LLVMFuzzerTestOneInput(const uint8_t * data, size_t size)
try
{
using namespace DB;
static SharedContextHolder shared_context;
static ContextMutablePtr context;
auto initialize = [&]() mutable
try
{
shared_context = Context::createShared();
context = Context::createGlobal(shared_context.get());
context->makeGlobalContext();
context->setApplicationType(Context::ApplicationType::LOCAL);
using namespace DB;
MainThreadStatus::getInstance();
static SharedContextHolder shared_context;
static ContextMutablePtr context;
registerAggregateFunctions();
return true;
};
auto initialize = [&]() mutable
{
shared_context = Context::createShared();
context = Context::createGlobal(shared_context.get());
context->makeGlobalContext();
context->setApplicationType(Context::ApplicationType::LOCAL);
static bool initialized = initialize();
(void) initialized;
MainThreadStatus::getInstance();
total_memory_tracker.resetCounters();
total_memory_tracker.setHardLimit(1_GiB);
CurrentThread::get().memory_tracker.resetCounters();
CurrentThread::get().memory_tracker.setHardLimit(1_GiB);
registerAggregateFunctions();
return true;
};
/// The input format is as follows:
/// - the aggregate function name on the first line, possible with parameters, then data types of the arguments,
/// example: quantile(0.5), Float64
/// - the serialized aggregation state for the rest of the input.
static bool initialized = initialize();
(void) initialized;
/// Compile the code as follows:
/// mkdir build_asan_fuzz
/// cd build_asan_fuzz
/// CC=clang CXX=clang++ cmake -D SANITIZE=address -D ENABLE_FUZZING=1 -D WITH_COVERAGE=1 ..
///
/// The corpus is located here:
/// https://github.com/ClickHouse/fuzz-corpus/tree/main/aggregate_function_state_deserialization
///
/// The fuzzer can be run as follows:
/// ../../../build_asan_fuzz/src/DataTypes/fuzzers/aggregate_function_state_deserialization corpus -jobs=64 -rss_limit_mb=8192
total_memory_tracker.resetCounters();
total_memory_tracker.setHardLimit(1_GiB);
CurrentThread::get().memory_tracker.resetCounters();
CurrentThread::get().memory_tracker.setHardLimit(1_GiB);
DB::ReadBufferFromMemory in(data, size);
/// The input format is as follows:
/// - the aggregate function name on the first line, possible with parameters, then data types of the arguments,
/// example: quantile(0.5), Float64
/// - the serialized aggregation state for the rest of the input.
String args;
readStringUntilNewlineInto(args, in);
assertChar('\n', in);
/// Compile the code as follows:
/// mkdir build_asan_fuzz
/// cd build_asan_fuzz
/// CC=clang CXX=clang++ cmake -D SANITIZE=address -D ENABLE_FUZZING=1 -D WITH_COVERAGE=1 ..
///
/// The corpus is located here:
/// https://github.com/ClickHouse/fuzz-corpus/tree/main/aggregate_function_state_deserialization
///
/// The fuzzer can be run as follows:
/// ../../../build_asan_fuzz/src/DataTypes/fuzzers/aggregate_function_state_deserialization corpus -jobs=64 -rss_limit_mb=8192
DataTypePtr type = DataTypeFactory::instance().get(fmt::format("AggregateFunction({})", args));
AggregateFunctionPtr func = assert_cast<const DataTypeAggregateFunction &>(*type).getFunction();
DB::ReadBufferFromMemory in(data, size);
Arena arena;
char * place = arena.alignedAlloc(func->sizeOfData(), func->alignOfData());
func->create(place);
SCOPE_EXIT(func->destroy(place));
func->deserialize(place, in, {}, &arena);
String args;
readStringUntilNewlineInto(args, in);
assertChar('\n', in);
DataTypePtr type = DataTypeFactory::instance().get(fmt::format("AggregateFunction({})", args));
AggregateFunctionPtr func = assert_cast<const DataTypeAggregateFunction &>(*type).getFunction();
Arena arena;
char * place = arena.alignedAlloc(func->sizeOfData(), func->alignOfData());
func->create(place);
SCOPE_EXIT(func->destroy(place));
func->deserialize(place, in, {}, &arena);
}
catch (...)
{
}
return 0;
}
catch (...)
{
return 1;
}

View File

@ -0,0 +1,195 @@
#include "UniqToCountPass.h"
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/IAggregateFunction.h>
#include <Analyzer/ColumnNode.h>
#include <Analyzer/FunctionNode.h>
#include <Analyzer/InDepthQueryTreeVisitor.h>
#include <Analyzer/QueryNode.h>
namespace DB
{
namespace
{
bool matchFnUniq(String func_name)
{
auto name = Poco::toLower(func_name);
return name == "uniq" || name == "uniqHLL12" || name == "uniqExact" || name == "uniqTheta" || name == "uniqCombined"
|| name == "uniqCombined64";
}
/// Extract the corresponding projection columns for group by node list.
/// For example:
/// SELECT a as aa, any(b) FROM table group by a; -> aa(ColumnNode)
NamesAndTypes extractProjectionColumnsForGroupBy(const QueryNode * query_node)
{
if (!query_node->hasGroupBy())
return {};
NamesAndTypes result;
for (const auto & group_by_ele : query_node->getGroupByNode()->getChildren())
{
const auto & projection_columns = query_node->getProjectionColumns();
const auto & projection_nodes = query_node->getProjection().getNodes();
assert(projection_columns.size() == projection_nodes.size());
for (size_t i = 0; i < projection_columns.size(); i++)
{
if (projection_nodes[i]->isEqual(*group_by_ele))
result.push_back(projection_columns[i]);
}
}
return result;
}
/// Whether query_columns equals subquery_columns.
/// query_columns: query columns from query
/// subquery_columns: projection columns from subquery
bool nodeListEquals(const QueryTreeNodes & query_columns, const NamesAndTypes & subquery_columns)
{
if (query_columns.size() != subquery_columns.size())
return false;
for (const auto & query_column : query_columns)
{
auto find = std::find_if(
subquery_columns.begin(),
subquery_columns.end(),
[&](const auto & subquery_column) -> bool
{
if (auto * column_node = query_column->as<ColumnNode>())
{
return subquery_column == column_node->getColumn();
}
return false;
});
if (find == subquery_columns.end())
return false;
}
return true;
}
/// Whether subquery_columns contains all columns in subquery_columns.
/// query_columns: query columns from query
/// subquery_columns: projection columns from subquery
bool nodeListContainsAll(const QueryTreeNodes & query_columns, const NamesAndTypes & subquery_columns)
{
if (query_columns.size() > subquery_columns.size())
return false;
for (const auto & query_column : query_columns)
{
auto find = std::find_if(
subquery_columns.begin(),
subquery_columns.end(),
[&](const auto & subquery_column) -> bool
{
if (auto * column_node = query_column->as<ColumnNode>())
{
return subquery_column == column_node->getColumn();
}
return false;
});
if (find == subquery_columns.end())
return false;
}
return true;
}
}
class UniqToCountVisitor : public InDepthQueryTreeVisitorWithContext<UniqToCountVisitor>
{
public:
using Base = InDepthQueryTreeVisitorWithContext<UniqToCountVisitor>;
using Base::Base;
void enterImpl(QueryTreeNodePtr & node)
{
if (!getSettings().optimize_uniq_to_count)
return;
auto * query_node = node->as<QueryNode>();
if (!query_node)
return;
/// Check that query has only single table expression which is subquery
auto * subquery_node = query_node->getJoinTree()->as<QueryNode>();
if (!subquery_node)
return;
/// Check that query has only single node in projection
auto & projection_nodes = query_node->getProjection().getNodes();
if (projection_nodes.size() != 1)
return;
/// Check that projection_node is a function
auto & projection_node = projection_nodes[0];
auto * function_node = projection_node->as<FunctionNode>();
if (!function_node)
return;
/// Check that query single projection node is `uniq` or its variants
if (!matchFnUniq(function_node->getFunctionName()))
return;
auto & uniq_arguments_nodes = function_node->getArguments().getNodes();
/// Whether query matches 'SELECT uniq(x ...) FROM (SELECT DISTINCT x ...)'
auto match_subquery_with_distinct = [&]() -> bool
{
if (!subquery_node->isDistinct())
return false;
/// uniq expression list == subquery projection columns
if (!nodeListEquals(uniq_arguments_nodes, subquery_node->getProjectionColumns()))
return false;
return true;
};
/// Whether query matches 'SELECT uniq(x ...) FROM (SELECT x ... GROUP BY x ...)'
auto match_subquery_with_group_by = [&]() -> bool
{
if (!subquery_node->hasGroupBy())
return false;
/// uniq argument node list == subquery group by node list
auto group_by_columns = extractProjectionColumnsForGroupBy(subquery_node);
if (!nodeListEquals(uniq_arguments_nodes, group_by_columns))
return false;
/// subquery projection columns must contain all columns in uniq argument node list
if (!nodeListContainsAll(uniq_arguments_nodes, subquery_node->getProjectionColumns()))
return false;
return true;
};
/// Replace uniq of initial query to count
if (match_subquery_with_distinct() || match_subquery_with_group_by())
{
AggregateFunctionProperties properties;
auto aggregate_function = AggregateFunctionFactory::instance().get("count", {}, {}, properties);
function_node->getArguments().getNodes().clear();
function_node->resolveAsAggregateFunction(std::move(aggregate_function));
}
}
};
void UniqToCountPass::run(QueryTreeNodePtr query_tree_node, ContextPtr context)
{
UniqToCountVisitor visitor(context);
visitor.visit(query_tree_node);
}
}

View File

@ -0,0 +1,30 @@
#pragma once
#include <Analyzer/IQueryTreePass.h>
namespace DB
{
/** Optimize `uniq` and its variants(except uniqUpTo) into `count` over subquery.
* Example: 'SELECT uniq(x ...) FROM (SELECT DISTINCT x ...)' to
* Result: 'SELECT count() FROM (SELECT DISTINCT x ...)'
*
* Example: 'SELECT uniq(x ...) FROM (SELECT x ... GROUP BY x ...)' to
* Result: 'SELECT count() FROM (SELECT x ... GROUP BY x ...)'
*
* Note that we can rewrite all uniq variants except uniqUpTo.
*/
class UniqToCountPass final : public IQueryTreePass
{
public:
String getName() override { return "UniqToCount"; }
String getDescription() override
{
return "Rewrite uniq and its variants(except uniqUpTo) to count if subquery has distinct or group by clause.";
}
void run(QueryTreeNodePtr query_tree_node, ContextPtr context) override;
};
}

View File

@ -18,6 +18,7 @@
#include <Analyzer/Utils.h>
#include <Analyzer/Passes/QueryAnalysisPass.h>
#include <Analyzer/Passes/CountDistinctPass.h>
#include <Analyzer/Passes/UniqToCountPass.h>
#include <Analyzer/Passes/FunctionToSubcolumnsPass.h>
#include <Analyzer/Passes/RewriteAggregateFunctionWithIfPass.h>
#include <Analyzer/Passes/SumIfToCountIfPass.h>
@ -247,6 +248,7 @@ void addQueryTreePasses(QueryTreePassManager & manager)
manager.addPass(std::make_unique<ConvertLogicalExpressionToCNFPass>());
manager.addPass(std::make_unique<CountDistinctPass>());
manager.addPass(std::make_unique<UniqToCountPass>());
manager.addPass(std::make_unique<RewriteAggregateFunctionWithIfPass>());
manager.addPass(std::make_unique<SumIfToCountIfPass>());
manager.addPass(std::make_unique<RewriteArrayExistsToHasPass>());

View File

@ -1147,7 +1147,18 @@ void ClientBase::onEndOfStream()
bool is_running = false;
output_format->setStartTime(
clock_gettime_ns(CLOCK_MONOTONIC) - static_cast<UInt64>(progress_indication.elapsedSeconds() * 1000000000), is_running);
output_format->finalize();
try
{
output_format->finalize();
}
catch (...)
{
/// Format should be reset to make it work for subsequent query
/// (otherwise it will throw again in resetOutput())
output_format.reset();
throw;
}
}
resetOutput();

View File

@ -1013,8 +1013,8 @@ Packet Connection::receivePacket()
case Protocol::Server::ReadTaskRequest:
return res;
case Protocol::Server::MergeTreeAllRangesAnnounecement:
res.announcement = receiveInitialParallelReadAnnounecement();
case Protocol::Server::MergeTreeAllRangesAnnouncement:
res.announcement = receiveInitialParallelReadAnnouncement();
return res;
case Protocol::Server::MergeTreeReadTaskRequest:
@ -1181,7 +1181,7 @@ ParallelReadRequest Connection::receiveParallelReadRequest() const
return ParallelReadRequest::deserialize(*in);
}
InitialAllRangesAnnouncement Connection::receiveInitialParallelReadAnnounecement() const
InitialAllRangesAnnouncement Connection::receiveInitialParallelReadAnnouncement() const
{
return InitialAllRangesAnnouncement::deserialize(*in);
}

View File

@ -276,7 +276,7 @@ private:
std::unique_ptr<Exception> receiveException() const;
Progress receiveProgress() const;
ParallelReadRequest receiveParallelReadRequest() const;
InitialAllRangesAnnouncement receiveInitialParallelReadAnnounecement() const;
InitialAllRangesAnnouncement receiveInitialParallelReadAnnouncement() const;
ProfileInfo receiveProfileInfo() const;
void initInputBuffers();

View File

@ -113,14 +113,15 @@ ConnectionPoolWithFailover::Status ConnectionPoolWithFailover::getStatus() const
std::vector<IConnectionPool::Entry> ConnectionPoolWithFailover::getMany(const ConnectionTimeouts & timeouts,
const Settings * settings,
PoolMode pool_mode,
AsyncCallback async_callback)
AsyncCallback async_callback,
std::optional<bool> skip_unavailable_endpoints)
{
TryGetEntryFunc try_get_entry = [&](NestedPool & pool, std::string & fail_message)
{
return tryGetEntry(pool, timeouts, fail_message, settings, nullptr, async_callback);
};
std::vector<TryResult> results = getManyImpl(settings, pool_mode, try_get_entry);
std::vector<TryResult> results = getManyImpl(settings, pool_mode, try_get_entry, skip_unavailable_endpoints);
std::vector<Entry> entries;
entries.reserve(results.size());
@ -146,14 +147,15 @@ std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::g
const ConnectionTimeouts & timeouts,
const Settings * settings, PoolMode pool_mode,
const QualifiedTableName & table_to_check,
AsyncCallback async_callback)
AsyncCallback async_callback,
std::optional<bool> skip_unavailable_endpoints)
{
TryGetEntryFunc try_get_entry = [&](NestedPool & pool, std::string & fail_message)
{
return tryGetEntry(pool, timeouts, fail_message, settings, &table_to_check, async_callback);
};
return getManyImpl(settings, pool_mode, try_get_entry);
return getManyImpl(settings, pool_mode, try_get_entry, skip_unavailable_endpoints);
}
ConnectionPoolWithFailover::Base::GetPriorityFunc ConnectionPoolWithFailover::makeGetPriorityFunc(const Settings * settings)
@ -172,13 +174,18 @@ ConnectionPoolWithFailover::Base::GetPriorityFunc ConnectionPoolWithFailover::ma
std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::getManyImpl(
const Settings * settings,
PoolMode pool_mode,
const TryGetEntryFunc & try_get_entry)
const TryGetEntryFunc & try_get_entry,
std::optional<bool> skip_unavailable_endpoints)
{
if (nested_pools.empty())
throw DB::Exception(DB::ErrorCodes::ALL_CONNECTION_TRIES_FAILED,
"Cannot get connection from ConnectionPoolWithFailover cause nested pools are empty");
size_t min_entries = (settings && settings->skip_unavailable_shards) ? 0 : 1;
if (!skip_unavailable_endpoints.has_value())
skip_unavailable_endpoints = (settings && settings->skip_unavailable_shards);
size_t min_entries = skip_unavailable_endpoints.value() ? 0 : 1;
size_t max_tries = (settings ?
size_t{settings->connections_with_failover_max_tries} :
size_t{DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES});

View File

@ -55,7 +55,8 @@ public:
*/
std::vector<Entry> getMany(const ConnectionTimeouts & timeouts,
const Settings * settings, PoolMode pool_mode,
AsyncCallback async_callback = {});
AsyncCallback async_callback = {},
std::optional<bool> skip_unavailable_endpoints = std::nullopt);
/// The same as getMany(), but return std::vector<TryResult>.
std::vector<TryResult> getManyForTableFunction(const ConnectionTimeouts & timeouts,
@ -71,7 +72,8 @@ public:
const Settings * settings,
PoolMode pool_mode,
const QualifiedTableName & table_to_check,
AsyncCallback async_callback = {});
AsyncCallback async_callback = {},
std::optional<bool> skip_unavailable_endpoints = std::nullopt);
struct NestedPoolStatus
{
@ -98,7 +100,8 @@ private:
std::vector<TryResult> getManyImpl(
const Settings * settings,
PoolMode pool_mode,
const TryGetEntryFunc & try_get_entry);
const TryGetEntryFunc & try_get_entry,
std::optional<bool> skip_unavailable_endpoints = std::nullopt);
/// Try to get a connection from the pool and check that it is good.
/// If table_to_check is not null and the check is enabled in settings, check that replication delay

View File

@ -260,7 +260,7 @@ Packet MultiplexedConnections::drain()
switch (packet.type)
{
case Protocol::Server::TimezoneUpdate:
case Protocol::Server::MergeTreeAllRangesAnnounecement:
case Protocol::Server::MergeTreeAllRangesAnnouncement:
case Protocol::Server::MergeTreeReadTaskRequest:
case Protocol::Server::ReadTaskRequest:
case Protocol::Server::PartUUIDs:
@ -339,7 +339,7 @@ Packet MultiplexedConnections::receivePacketUnlocked(AsyncCallback async_callbac
switch (packet.type)
{
case Protocol::Server::TimezoneUpdate:
case Protocol::Server::MergeTreeAllRangesAnnounecement:
case Protocol::Server::MergeTreeAllRangesAnnouncement:
case Protocol::Server::MergeTreeReadTaskRequest:
case Protocol::Server::ReadTaskRequest:
case Protocol::Server::PartUUIDs:

View File

@ -429,17 +429,20 @@ PreformattedMessage getCurrentExceptionMessageAndPattern(bool with_stacktrace, b
}
catch (...) {}
// #ifdef ABORT_ON_LOGICAL_ERROR
// try
// {
// throw;
// }
// catch (const std::logic_error &)
// {
// abortOnFailedAssertion(stream.str());
// }
// catch (...) {}
// #endif
#ifdef ABORT_ON_LOGICAL_ERROR
try
{
throw;
}
catch (const std::logic_error &)
{
if (!with_stacktrace)
stream << ", Stack trace:\n\n" << getExceptionStackTraceString(e);
abortOnFailedAssertion(stream.str());
}
catch (...) {}
#endif
}
catch (...)
{

View File

@ -119,7 +119,7 @@ public:
return true;
}
template <typename TValue = Value, bool = true, typename... Args>
template <typename TValue = Value, typename... Args>
requires(!std::is_same_v<TValue, IntervalTreeVoidValue>)
ALWAYS_INLINE bool emplace(Interval interval, Args &&... args)
{

View File

@ -1516,7 +1516,7 @@ bool ZooKeeper::hasReachedDeadline() const
void ZooKeeper::maybeInjectSendFault()
{
if (unlikely(inject_setup.test() && send_inject_fault && send_inject_fault.value()(thread_local_rng)))
throw Exception::fromMessage(Error::ZSESSIONEXPIRED, "Session expired (fault injected on recv)");
throw Exception::fromMessage(Error::ZSESSIONEXPIRED, "Session expired (fault injected on send)");
}
void ZooKeeper::maybeInjectRecvFault()

View File

@ -5,16 +5,16 @@
# If you want really small size of the resulted binary, just link with fuzz_compression and clickhouse_common_io
clickhouse_add_executable (compressed_buffer_fuzzer compressed_buffer_fuzzer.cpp)
target_link_libraries (compressed_buffer_fuzzer PRIVATE dbms ${LIB_FUZZING_ENGINE})
target_link_libraries (compressed_buffer_fuzzer PRIVATE dbms)
clickhouse_add_executable (lz4_decompress_fuzzer lz4_decompress_fuzzer.cpp)
target_link_libraries (lz4_decompress_fuzzer PUBLIC dbms ch_contrib::lz4 ${LIB_FUZZING_ENGINE})
target_link_libraries (lz4_decompress_fuzzer PUBLIC dbms ch_contrib::lz4)
clickhouse_add_executable (delta_decompress_fuzzer delta_decompress_fuzzer.cpp)
target_link_libraries (delta_decompress_fuzzer PRIVATE dbms ${LIB_FUZZING_ENGINE})
target_link_libraries (delta_decompress_fuzzer PRIVATE dbms)
clickhouse_add_executable (double_delta_decompress_fuzzer double_delta_decompress_fuzzer.cpp)
target_link_libraries (double_delta_decompress_fuzzer PRIVATE dbms ${LIB_FUZZING_ENGINE})
target_link_libraries (double_delta_decompress_fuzzer PRIVATE dbms)
clickhouse_add_executable (encrypted_decompress_fuzzer encrypted_decompress_fuzzer.cpp)
target_link_libraries (encrypted_decompress_fuzzer PRIVATE dbms ${LIB_FUZZING_ENGINE})
target_link_libraries (encrypted_decompress_fuzzer PRIVATE dbms)

View File

@ -4,17 +4,18 @@
extern "C" int LLVMFuzzerTestOneInput(const uint8_t * data, size_t size)
try
{
DB::ReadBufferFromMemory from(data, size);
DB::CompressedReadBuffer in{from};
try
{
DB::ReadBufferFromMemory from(data, size);
DB::CompressedReadBuffer in{from};
while (!in.eof())
in.next();
while (!in.eof())
in.next();
}
catch (...)
{
}
return 0;
}
catch (...)
{
return 1;
}

View File

@ -15,29 +15,30 @@ struct AuxiliaryRandomData
};
extern "C" int LLVMFuzzerTestOneInput(const uint8_t * data, size_t size)
try
{
if (size < sizeof(AuxiliaryRandomData))
return 0;
try
{
if (size < sizeof(AuxiliaryRandomData))
return 0;
const auto * p = reinterpret_cast<const AuxiliaryRandomData *>(data);
auto codec = DB::getCompressionCodecDelta(p->delta_size_bytes);
const auto * p = reinterpret_cast<const AuxiliaryRandomData *>(data);
auto codec = DB::getCompressionCodecDelta(p->delta_size_bytes);
size_t output_buffer_size = p->decompressed_size % 65536;
size -= sizeof(AuxiliaryRandomData);
data += sizeof(AuxiliaryRandomData) / sizeof(uint8_t);
size_t output_buffer_size = p->decompressed_size % 65536;
size -= sizeof(AuxiliaryRandomData);
data += sizeof(AuxiliaryRandomData) / sizeof(uint8_t);
// std::string input = std::string(reinterpret_cast<const char*>(data), size);
// fmt::print(stderr, "Using input {} of size {}, output size is {}. \n", input, size, output_buffer_size);
// std::string input = std::string(reinterpret_cast<const char*>(data), size);
// fmt::print(stderr, "Using input {} of size {}, output size is {}. \n", input, size, output_buffer_size);
DB::Memory<> memory;
memory.resize(output_buffer_size + codec->getAdditionalSizeAtTheEndOfBuffer());
DB::Memory<> memory;
memory.resize(output_buffer_size + codec->getAdditionalSizeAtTheEndOfBuffer());
codec->doDecompressData(reinterpret_cast<const char *>(data), size, memory.data(), output_buffer_size);
codec->doDecompressData(reinterpret_cast<const char *>(data), static_cast<UInt32>(size), memory.data(), static_cast<UInt32>(output_buffer_size));
}
catch (...)
{
}
return 0;
}
catch (...)
{
return 1;
}

View File

@ -15,29 +15,30 @@ struct AuxiliaryRandomData
};
extern "C" int LLVMFuzzerTestOneInput(const uint8_t * data, size_t size)
try
{
if (size < sizeof(AuxiliaryRandomData))
return 0;
try
{
if (size < sizeof(AuxiliaryRandomData))
return 0;
const auto * p = reinterpret_cast<const AuxiliaryRandomData *>(data);
auto codec = DB::getCompressionCodecDoubleDelta(p->data_bytes_size);
const auto * p = reinterpret_cast<const AuxiliaryRandomData *>(data);
auto codec = DB::getCompressionCodecDoubleDelta(p->data_bytes_size);
size_t output_buffer_size = p->decompressed_size % 65536;
size -= sizeof(AuxiliaryRandomData);
data += sizeof(AuxiliaryRandomData) / sizeof(uint8_t);
size_t output_buffer_size = p->decompressed_size % 65536;
size -= sizeof(AuxiliaryRandomData);
data += sizeof(AuxiliaryRandomData) / sizeof(uint8_t);
// std::string input = std::string(reinterpret_cast<const char*>(data), size);
// fmt::print(stderr, "Using input {} of size {}, output size is {}. \n", input, size, output_buffer_size);
// std::string input = std::string(reinterpret_cast<const char*>(data), size);
// fmt::print(stderr, "Using input {} of size {}, output size is {}. \n", input, size, output_buffer_size);
DB::Memory<> memory;
memory.resize(output_buffer_size + codec->getAdditionalSizeAtTheEndOfBuffer());
DB::Memory<> memory;
memory.resize(output_buffer_size + codec->getAdditionalSizeAtTheEndOfBuffer());
codec->doDecompressData(reinterpret_cast<const char *>(data), size, memory.data(), output_buffer_size);
codec->doDecompressData(reinterpret_cast<const char *>(data), static_cast<UInt32>(size), memory.data(), static_cast<UInt32>(output_buffer_size));
}
catch (...)
{
}
return 0;
}
catch (...)
{
return 1;
}

View File

@ -271,33 +271,35 @@ void XMLGenerator::generate()
extern "C" int LLVMFuzzerTestOneInput(const uint8_t * data, size_t size)
try
{
XMLGenerator generator(data, size);
try
{
XMLGenerator generator(data, size);
generator.generate();
if (generator.hasError())
return 0;
generator.generate();
if (generator.hasError())
return 0;
auto config = generator.getResult();
auto codec_128 = getCompressionCodecEncrypted(DB::AES_128_GCM_SIV);
auto codec_256 = getCompressionCodecEncrypted(DB::AES_256_GCM_SIV);
DB::CompressionCodecEncrypted::Configuration::instance().tryLoad(*config, "");
auto config = generator.getResult();
auto codec_128 = getCompressionCodecEncrypted(DB::AES_128_GCM_SIV);
auto codec_256 = getCompressionCodecEncrypted(DB::AES_256_GCM_SIV);
DB::CompressionCodecEncrypted::Configuration::instance().tryLoad(*config, "");
size_t data_size = size - generator.keySize();
size_t data_size = size - generator.keySize();
std::string input = std::string(reinterpret_cast<const char*>(data), data_size);
fmt::print(stderr, "Using input {} of size {}, output size is {}. \n", input, data_size, input.size() - 31);
std::string input = std::string(reinterpret_cast<const char*>(data), data_size);
fmt::print(stderr, "Using input {} of size {}, output size is {}. \n", input, data_size, input.size() - 31);
DB::Memory<> memory;
memory.resize(input.size() + codec_128->getAdditionalSizeAtTheEndOfBuffer());
codec_128->doDecompressData(input.data(), input.size(), memory.data(), input.size() - 31);
DB::Memory<> memory;
memory.resize(input.size() + codec_128->getAdditionalSizeAtTheEndOfBuffer());
codec_128->doDecompressData(input.data(), static_cast<UInt32>(input.size()), memory.data(), static_cast<UInt32>(input.size()) - 31);
memory.resize(input.size() + codec_128->getAdditionalSizeAtTheEndOfBuffer());
codec_256->doDecompressData(input.data(), static_cast<UInt32>(input.size()), memory.data(), static_cast<UInt32>(input.size()) - 31);
}
catch (...)
{
}
memory.resize(input.size() + codec_128->getAdditionalSizeAtTheEndOfBuffer());
codec_256->doDecompressData(input.data(), input.size(), memory.data(), input.size() - 31);
return 0;
}
catch (...)
{
return 1;
}

View File

@ -16,31 +16,31 @@ struct AuxiliaryRandomData
};
extern "C" int LLVMFuzzerTestOneInput(const uint8_t * data, size_t size)
try
{
try
{
if (size < sizeof(AuxiliaryRandomData) + LZ4::ADDITIONAL_BYTES_AT_END_OF_BUFFER)
return 0;
if (size < sizeof(AuxiliaryRandomData) + LZ4::ADDITIONAL_BYTES_AT_END_OF_BUFFER)
return 0;
const auto * p = reinterpret_cast<const AuxiliaryRandomData *>(data);
auto codec = DB::getCompressionCodecLZ4(static_cast<int>(p->level));
const auto * p = reinterpret_cast<const AuxiliaryRandomData *>(data);
auto codec = DB::getCompressionCodecLZ4(p->level);
size_t output_buffer_size = p->decompressed_size % 65536;
size -= sizeof(AuxiliaryRandomData);
size -= LZ4::ADDITIONAL_BYTES_AT_END_OF_BUFFER;
data += sizeof(AuxiliaryRandomData) / sizeof(uint8_t);
size_t output_buffer_size = p->decompressed_size % 65536;
size -= sizeof(AuxiliaryRandomData);
size -= LZ4::ADDITIONAL_BYTES_AT_END_OF_BUFFER;
data += sizeof(AuxiliaryRandomData) / sizeof(uint8_t);
// std::string input = std::string(reinterpret_cast<const char*>(data), size);
// fmt::print(stderr, "Using input {} of size {}, output size is {}. \n", input, size, output_buffer_size);
// std::string input = std::string(reinterpret_cast<const char*>(data), size);
// fmt::print(stderr, "Using input {} of size {}, output size is {}. \n", input, size, output_buffer_size);
DB::Memory<> memory;
memory.resize(output_buffer_size + LZ4::ADDITIONAL_BYTES_AT_END_OF_BUFFER);
DB::Memory<> memory;
memory.resize(output_buffer_size + LZ4::ADDITIONAL_BYTES_AT_END_OF_BUFFER);
codec->doDecompressData(reinterpret_cast<const char *>(data), size, memory.data(), output_buffer_size);
codec->doDecompressData(reinterpret_cast<const char *>(data), static_cast<UInt32>(size), memory.data(), static_cast<UInt32>(output_buffer_size));
}
catch (...)
{
}
return 0;
}
catch (...)
{
return 1;
}

View File

@ -870,7 +870,7 @@ NearestFieldType<std::decay_t<T>> & Field::get()
// Disregard signedness when converting between int64 types.
constexpr Field::Types::Which target = TypeToEnum<StoredType>::value;
if (target != which
&& (!isInt64OrUInt64orBoolFieldType(target) || !isInt64OrUInt64orBoolFieldType(which)))
&& (!isInt64OrUInt64orBoolFieldType(target) || !isInt64OrUInt64orBoolFieldType(which)) && target != Field::Types::IPv4)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Invalid Field get from type {} to type {}", which, target);
#endif

View File

@ -81,7 +81,7 @@ namespace Protocol
/// This is such an inverted logic, where server sends requests
/// And client returns back response
ProfileEvents = 14, /// Packet with profile events from server.
MergeTreeAllRangesAnnounecement = 15,
MergeTreeAllRangesAnnouncement = 15,
MergeTreeReadTaskRequest = 16, /// Request from a MergeTree replica to a coordinator
TimezoneUpdate = 17, /// Receive server's (session-wide) default timezone
MAX = TimezoneUpdate,
@ -110,7 +110,7 @@ namespace Protocol
"PartUUIDs",
"ReadTaskRequest",
"ProfileEvents",
"MergeTreeAllRangesAnnounecement",
"MergeTreeAllRangesAnnouncement",
"MergeTreeReadTaskRequest",
"TimezoneUpdate",
};

View File

@ -775,6 +775,7 @@ class IColumn;
M(Bool, function_json_value_return_type_allow_nullable, false, "Allow function JSON_VALUE to return nullable type.", 0) \
M(Bool, function_json_value_return_type_allow_complex, false, "Allow function JSON_VALUE to return complex type, such as: struct, array, map.", 0) \
M(Bool, use_with_fill_by_sorting_prefix, true, "Columns preceding WITH FILL columns in ORDER BY clause form sorting prefix. Rows with different values in sorting prefix are filled independently", 0) \
M(Bool, optimize_uniq_to_count, true, "Rewrite uniq and its variants(except uniqUpTo) to count if subquery has distinct or group by clause.", 0) \
\
/** Experimental functions */ \
M(Bool, allow_experimental_funnel_functions, false, "Enable experimental functions for funnel analysis.", 0) \

View File

@ -1,2 +1,2 @@
clickhouse_add_executable (names_and_types_fuzzer names_and_types_fuzzer.cpp)
target_link_libraries (names_and_types_fuzzer PRIVATE dbms ${LIB_FUZZING_ENGINE})
target_link_libraries (names_and_types_fuzzer PRIVATE dbms)

View File

@ -3,15 +3,16 @@
extern "C" int LLVMFuzzerTestOneInput(const uint8_t * data, size_t size)
try
{
DB::ReadBufferFromMemory in(data, size);
DB::NamesAndTypesList res;
res.readText(in);
try
{
DB::ReadBufferFromMemory in(data, size);
DB::NamesAndTypesList res;
res.readText(in);
}
catch (...)
{
}
return 0;
}
catch (...)
{
return 1;
}

View File

@ -189,7 +189,7 @@ void SerializationNullable::serializeBinary(const IColumn & column, size_t row_n
/// Deserialize value into ColumnNullable.
/// We need to insert both to nested column and to null byte map, or, in case of exception, to not insert at all.
template <typename ReturnType = void, typename CheckForNull, typename DeserializeNested, ReturnType * = nullptr>
template <typename ReturnType = void, typename CheckForNull, typename DeserializeNested>
requires std::same_as<ReturnType, void>
static ReturnType
safeDeserialize(IColumn & column, const ISerialization &, CheckForNull && check_for_null, DeserializeNested && deserialize_nested)
@ -217,7 +217,7 @@ safeDeserialize(IColumn & column, const ISerialization &, CheckForNull && check_
}
/// Deserialize value into non-nullable column. In case of NULL, insert default value and return false.
template <typename ReturnType = void, typename CheckForNull, typename DeserializeNested, ReturnType * = nullptr>
template <typename ReturnType = void, typename CheckForNull, typename DeserializeNested>
requires std::same_as<ReturnType, bool>
static ReturnType
safeDeserialize(IColumn & column, const ISerialization &, CheckForNull && check_for_null, DeserializeNested && deserialize_nested)

View File

@ -1,2 +1,2 @@
clickhouse_add_executable(data_type_deserialization_fuzzer data_type_deserialization_fuzzer.cpp ${SRCS})
target_link_libraries(data_type_deserialization_fuzzer PRIVATE dbms clickhouse_aggregate_functions ${LIB_FUZZING_ENGINE})
target_link_libraries(data_type_deserialization_fuzzer PRIVATE dbms clickhouse_aggregate_functions)

View File

@ -14,69 +14,70 @@
extern "C" int LLVMFuzzerTestOneInput(const uint8_t * data, size_t size)
try
{
using namespace DB;
static SharedContextHolder shared_context;
static ContextMutablePtr context;
auto initialize = [&]() mutable
try
{
shared_context = Context::createShared();
context = Context::createGlobal(shared_context.get());
context->makeGlobalContext();
context->setApplicationType(Context::ApplicationType::LOCAL);
using namespace DB;
MainThreadStatus::getInstance();
static SharedContextHolder shared_context;
static ContextMutablePtr context;
registerAggregateFunctions();
return true;
};
auto initialize = [&]() mutable
{
shared_context = Context::createShared();
context = Context::createGlobal(shared_context.get());
context->makeGlobalContext();
context->setApplicationType(Context::ApplicationType::LOCAL);
static bool initialized = initialize();
(void) initialized;
MainThreadStatus::getInstance();
total_memory_tracker.resetCounters();
total_memory_tracker.setHardLimit(1_GiB);
CurrentThread::get().memory_tracker.resetCounters();
CurrentThread::get().memory_tracker.setHardLimit(1_GiB);
registerAggregateFunctions();
return true;
};
/// The input format is as follows:
/// - data type name on the first line,
/// - the data for the rest of the input.
static bool initialized = initialize();
(void) initialized;
/// Compile the code as follows:
/// mkdir build_asan_fuzz
/// cd build_asan_fuzz
/// CC=clang CXX=clang++ cmake -D SANITIZE=address -D ENABLE_FUZZING=1 -D WITH_COVERAGE=1 ..
///
/// The corpus is located here:
/// https://github.com/ClickHouse/fuzz-corpus/tree/main/data_type_deserialization
///
/// The fuzzer can be run as follows:
/// ../../../build_asan_fuzz/src/DataTypes/fuzzers/data_type_deserialization_fuzzer corpus -jobs=64 -rss_limit_mb=8192
total_memory_tracker.resetCounters();
total_memory_tracker.setHardLimit(1_GiB);
CurrentThread::get().memory_tracker.resetCounters();
CurrentThread::get().memory_tracker.setHardLimit(1_GiB);
/// clickhouse-local --query "SELECT toJSONString(*) FROM (SELECT name FROM system.functions UNION ALL SELECT name FROM system.data_type_families)" > dictionary
/// The input format is as follows:
/// - data type name on the first line,
/// - the data for the rest of the input.
DB::ReadBufferFromMemory in(data, size);
/// Compile the code as follows:
/// mkdir build_asan_fuzz
/// cd build_asan_fuzz
/// CC=clang CXX=clang++ cmake -D SANITIZE=address -D ENABLE_FUZZING=1 -D WITH_COVERAGE=1 ..
///
/// The corpus is located here:
/// https://github.com/ClickHouse/fuzz-corpus/tree/main/data_type_deserialization
///
/// The fuzzer can be run as follows:
/// ../../../build_asan_fuzz/src/DataTypes/fuzzers/data_type_deserialization_fuzzer corpus -jobs=64 -rss_limit_mb=8192
String data_type;
readStringUntilNewlineInto(data_type, in);
assertChar('\n', in);
/// clickhouse-local --query "SELECT toJSONString(*) FROM (SELECT name FROM system.functions UNION ALL SELECT name FROM system.data_type_families)" > dictionary
DataTypePtr type = DataTypeFactory::instance().get(data_type);
DB::ReadBufferFromMemory in(data, size);
FormatSettings settings;
settings.max_binary_string_size = 100;
settings.max_binary_array_size = 100;
String data_type;
readStringUntilNewlineInto(data_type, in);
assertChar('\n', in);
Field field;
type->getDefaultSerialization()->deserializeBinary(field, in, settings);
DataTypePtr type = DataTypeFactory::instance().get(data_type);
FormatSettings settings;
settings.max_binary_string_size = 100;
settings.max_binary_array_size = 100;
Field field;
type->getDefaultSerialization()->deserializeBinary(field, in, settings);
}
catch (...)
{
}
return 0;
}
catch (...)
{
return 1;
}

View File

@ -1,2 +1,2 @@
clickhouse_add_executable(format_fuzzer format_fuzzer.cpp ${SRCS})
target_link_libraries(format_fuzzer PRIVATE dbms clickhouse_aggregate_functions ${LIB_FUZZING_ENGINE})
target_link_libraries(format_fuzzer PRIVATE dbms clickhouse_aggregate_functions)

View File

@ -22,112 +22,113 @@
extern "C" int LLVMFuzzerTestOneInput(const uint8_t * data, size_t size)
try
{
using namespace DB;
static SharedContextHolder shared_context;
static ContextMutablePtr context;
auto initialize = [&]() mutable
try
{
shared_context = Context::createShared();
context = Context::createGlobal(shared_context.get());
context->makeGlobalContext();
context->setApplicationType(Context::ApplicationType::LOCAL);
using namespace DB;
MainThreadStatus::getInstance();
static SharedContextHolder shared_context;
static ContextMutablePtr context;
registerAggregateFunctions();
registerFormats();
auto initialize = [&]() mutable
{
shared_context = Context::createShared();
context = Context::createGlobal(shared_context.get());
context->makeGlobalContext();
context->setApplicationType(Context::ApplicationType::LOCAL);
return true;
};
MainThreadStatus::getInstance();
static bool initialized = initialize();
(void) initialized;
registerAggregateFunctions();
registerFormats();
total_memory_tracker.resetCounters();
total_memory_tracker.setHardLimit(1_GiB);
CurrentThread::get().memory_tracker.resetCounters();
CurrentThread::get().memory_tracker.setHardLimit(1_GiB);
return true;
};
/// The input format is as follows:
/// - format name on the first line,
/// - table structure on the second line,
/// - the data for the rest of the input.
static bool initialized = initialize();
(void) initialized;
/** The corpus was generated as follows:
total_memory_tracker.resetCounters();
total_memory_tracker.setHardLimit(1_GiB);
CurrentThread::get().memory_tracker.resetCounters();
CurrentThread::get().memory_tracker.setHardLimit(1_GiB);
i=0; find ../../../../tests/queries -name '*.sql' |
xargs -I{} bash -c "tr '\n' ' ' <{}; echo" |
rg -o -i 'CREATE TABLE\s+\w+\s+\(.+?\) ENGINE' |
sed -r -e 's/CREATE TABLE\s+\w+\s+\((.+?)\) ENGINE/\1/i' | sort | uniq |
while read line; do
i=$((i+1));
clickhouse-local --query "SELECT name FROM system.formats ORDER BY rand() LIMIT 1" >> $i;
echo "$line" >> $i;
echo $RANDOM >> $i;
echo $i;
/// The input format is as follows:
/// - format name on the first line,
/// - table structure on the second line,
/// - the data for the rest of the input.
/** The corpus was generated as follows:
i=0; find ../../../../tests/queries -name '*.sql' |
xargs -I{} bash -c "tr '\n' ' ' <{}; echo" |
rg -o -i 'CREATE TABLE\s+\w+\s+\(.+?\) ENGINE' |
sed -r -e 's/CREATE TABLE\s+\w+\s+\((.+?)\) ENGINE/\1/i' | sort | uniq |
while read line; do
i=$((i+1));
clickhouse-local --query "SELECT name FROM system.formats ORDER BY rand() LIMIT 1" >> $i;
echo "$line" >> $i;
echo $RANDOM >> $i;
echo $i;
done
*/
/** And:
for format in $(clickhouse-client --query "SELECT name FROM system.formats WHERE is_output"); do
echo $format;
echo $format >> $format;
echo "WatchID Int64, JavaEnable Int16, Title String, GoodEvent Int16, EventTime DateTime, EventDate Date, CounterID Int32, ClientIP Int32, RegionID Int32, UserID Int64, CounterClass Int16, OS Int16, UserAgent Int16, URL String, Referer String, IsRefresh Int16, RefererCategoryID Int16, RefererRegionID Int32, URLCategoryID Int16, URLRegionID Int32, ResolutionWidth Int16, ResolutionHeight Int16, ResolutionDepth Int16, FlashMajor Int16, FlashMinor Int16, FlashMinor2 String, NetMajor Int16, NetMinor Int16, UserAgentMajor Int16, UserAgentMinor String, CookieEnable Int16, JavascriptEnable Int16, IsMobile Int16, MobilePhone Int16, MobilePhoneModel String, Params String, IPNetworkID Int32, TraficSourceID Int16, SearchEngineID Int16, SearchPhrase String, AdvEngineID Int16, IsArtifical Int16, WindowClientWidth Int16, WindowClientHeight Int16, ClientTimeZone Int16, ClientEventTime DateTime, SilverlightVersion1 Int16, SilverlightVersion2 Int16, SilverlightVersion3 Int32, SilverlightVersion4 Int16, PageCharset String, CodeVersion Int32, IsLink Int16, IsDownload Int16, IsNotBounce Int16, FUniqID Int64, OriginalURL String, HID Int32, IsOldCounter Int16, IsEvent Int16, IsParameter Int16, DontCountHits Int16, WithHash Int16, HitColor String, LocalEventTime DateTime, Age Int16, Sex Int16, Income Int16, Interests Int16, Robotness Int16, RemoteIP Int32, WindowName Int32, OpenerName Int32, HistoryLength Int16, BrowserLanguage String, BrowserCountry String, SocialNetwork String, SocialAction String, HTTPError Int16, SendTiming Int32, DNSTiming Int32, ConnectTiming Int32, ResponseStartTiming Int32, ResponseEndTiming Int32, FetchTiming Int32, SocialSourceNetworkID Int16, SocialSourcePage String, ParamPrice Int64, ParamOrderID String, ParamCurrency String, ParamCurrencyID Int16, OpenstatServiceName String, OpenstatCampaignID String, OpenstatAdID String, OpenstatSourceID String, UTMSource String, UTMMedium String, UTMCampaign String, UTMContent String, UTMTerm String, FromTag String, HasGCLID Int16, RefererHash Int64, URLHash Int64, CLID Int32" >> $format;
clickhouse-client --query "SELECT * FROM hits LIMIT 10 FORMAT $format" >> $format || rm $format;
done
*/
/** And:
*/
for format in $(clickhouse-client --query "SELECT name FROM system.formats WHERE is_output"); do
echo $format;
echo $format >> $format;
echo "WatchID Int64, JavaEnable Int16, Title String, GoodEvent Int16, EventTime DateTime, EventDate Date, CounterID Int32, ClientIP Int32, RegionID Int32, UserID Int64, CounterClass Int16, OS Int16, UserAgent Int16, URL String, Referer String, IsRefresh Int16, RefererCategoryID Int16, RefererRegionID Int32, URLCategoryID Int16, URLRegionID Int32, ResolutionWidth Int16, ResolutionHeight Int16, ResolutionDepth Int16, FlashMajor Int16, FlashMinor Int16, FlashMinor2 String, NetMajor Int16, NetMinor Int16, UserAgentMajor Int16, UserAgentMinor String, CookieEnable Int16, JavascriptEnable Int16, IsMobile Int16, MobilePhone Int16, MobilePhoneModel String, Params String, IPNetworkID Int32, TraficSourceID Int16, SearchEngineID Int16, SearchPhrase String, AdvEngineID Int16, IsArtifical Int16, WindowClientWidth Int16, WindowClientHeight Int16, ClientTimeZone Int16, ClientEventTime DateTime, SilverlightVersion1 Int16, SilverlightVersion2 Int16, SilverlightVersion3 Int32, SilverlightVersion4 Int16, PageCharset String, CodeVersion Int32, IsLink Int16, IsDownload Int16, IsNotBounce Int16, FUniqID Int64, OriginalURL String, HID Int32, IsOldCounter Int16, IsEvent Int16, IsParameter Int16, DontCountHits Int16, WithHash Int16, HitColor String, LocalEventTime DateTime, Age Int16, Sex Int16, Income Int16, Interests Int16, Robotness Int16, RemoteIP Int32, WindowName Int32, OpenerName Int32, HistoryLength Int16, BrowserLanguage String, BrowserCountry String, SocialNetwork String, SocialAction String, HTTPError Int16, SendTiming Int32, DNSTiming Int32, ConnectTiming Int32, ResponseStartTiming Int32, ResponseEndTiming Int32, FetchTiming Int32, SocialSourceNetworkID Int16, SocialSourcePage String, ParamPrice Int64, ParamOrderID String, ParamCurrency String, ParamCurrencyID Int16, OpenstatServiceName String, OpenstatCampaignID String, OpenstatAdID String, OpenstatSourceID String, UTMSource String, UTMMedium String, UTMCampaign String, UTMContent String, UTMTerm String, FromTag String, HasGCLID Int16, RefererHash Int64, URLHash Int64, CLID Int32" >> $format;
clickhouse-client --query "SELECT * FROM hits LIMIT 10 FORMAT $format" >> $format || rm $format;
done
/// Compile the code as follows:
/// mkdir build_asan_fuzz
/// cd build_asan_fuzz
/// CC=clang CXX=clang++ cmake -D SANITIZE=address -D ENABLE_FUZZING=1 -D WITH_COVERAGE=1 ..
///
/// The corpus is located here:
/// https://github.com/ClickHouse/fuzz-corpus/tree/main/format_fuzzer
///
/// The fuzzer can be run as follows:
/// ../../../build_asan_fuzz/src/Formats/fuzzers/format_fuzzer corpus -jobs=64 -rss_limit_mb=8192
*/
DB::ReadBufferFromMemory in(data, size);
/// Compile the code as follows:
/// mkdir build_asan_fuzz
/// cd build_asan_fuzz
/// CC=clang CXX=clang++ cmake -D SANITIZE=address -D ENABLE_FUZZING=1 -D WITH_COVERAGE=1 ..
///
/// The corpus is located here:
/// https://github.com/ClickHouse/fuzz-corpus/tree/main/format_fuzzer
///
/// The fuzzer can be run as follows:
/// ../../../build_asan_fuzz/src/Formats/fuzzers/format_fuzzer corpus -jobs=64 -rss_limit_mb=8192
String format;
readStringUntilNewlineInto(format, in);
assertChar('\n', in);
DB::ReadBufferFromMemory in(data, size);
String structure;
readStringUntilNewlineInto(structure, in);
assertChar('\n', in);
String format;
readStringUntilNewlineInto(format, in);
assertChar('\n', in);
ColumnsDescription description = parseColumnsListFromString(structure, context);
auto columns_info = description.getOrdinary();
String structure;
readStringUntilNewlineInto(structure, in);
assertChar('\n', in);
Block header;
for (const auto & info : columns_info)
{
ColumnWithTypeAndName column;
column.name = info.name;
column.type = info.type;
column.column = column.type->createColumn();
header.insert(std::move(column));
}
ColumnsDescription description = parseColumnsListFromString(structure, context);
auto columns_info = description.getOrdinary();
InputFormatPtr input_format = context->getInputFormat(format, in, header, 13 /* small block size */);
Block header;
for (const auto & info : columns_info)
{
ColumnWithTypeAndName column;
column.name = info.name;
column.type = info.type;
column.column = column.type->createColumn();
header.insert(std::move(column));
QueryPipeline pipeline(Pipe(std::move(input_format)));
PullingPipelineExecutor executor(pipeline);
Block res;
while (executor.pull(res))
;
}
catch (...)
{
}
InputFormatPtr input_format = context->getInputFormat(format, in, header, 13 /* small block size */);
QueryPipeline pipeline(Pipe(std::move(input_format)));
PullingPipelineExecutor executor(pipeline);
Block res;
while (executor.pull(res))
;
return 0;
}
catch (...)
{
return 1;
}

View File

@ -594,14 +594,14 @@ struct JavaHashImpl
static_cast<uint32_t>(x) ^ static_cast<uint32_t>(static_cast<uint64_t>(x) >> 32));
}
template <class T, T * = nullptr>
template <class T>
requires std::same_as<T, int8_t> || std::same_as<T, int16_t> || std::same_as<T, int32_t>
static ReturnType apply(T x)
{
return x;
}
template <class T, T * = nullptr>
template <class T>
requires(!std::same_as<T, int8_t> && !std::same_as<T, int16_t> && !std::same_as<T, int32_t>)
static ReturnType apply(T x)
{

View File

@ -89,7 +89,7 @@ public:
}
template <typename T, typename... Args>
requires (!std::same_as<T, DateTime64>)
requires(!std::same_as<T, DateTime64>)
inline auto execute(const T & t, Args &&... args) const
{
return wrapped_transform.execute(t, std::forward<Args>(args)...);

View File

@ -2743,6 +2743,8 @@ zkutil::ZooKeeperPtr Context::getZooKeeper() const
Stopwatch watch;
LOG_DEBUG(shared->log, "Trying to establish a new connection with ZooKeeper");
shared->zookeeper = shared->zookeeper->startNewSession();
if (isServerCompletelyStarted())
shared->zookeeper->setServerCompletelyStarted();
LOG_DEBUG(shared->log, "Establishing a new connection with ZooKeeper took {} ms", watch.elapsedMilliseconds());
}

View File

@ -39,6 +39,7 @@
#include <Interpreters/QueryLog.h>
#include <Interpreters/replaceAliasColumnsInQuery.h>
#include <Interpreters/RewriteCountDistinctVisitor.h>
#include <Interpreters/RewriteUniqToCountVisitor.h>
#include <Interpreters/getCustomKeyFilterForParallelReplicas.h>
#include <QueryPipeline/Pipe.h>
@ -421,6 +422,12 @@ InterpreterSelectQuery::InterpreterSelectQuery(
RewriteCountDistinctFunctionVisitor(data_rewrite_countdistinct).visit(query_ptr);
}
if (settings.optimize_uniq_to_count)
{
RewriteUniqToCountMatcher::Data data_rewrite_uniq_count;
RewriteUniqToCountVisitor(data_rewrite_uniq_count).visit(query_ptr);
}
JoinedTables joined_tables(getSubqueryContext(context), getSelectQuery(), options.with_all_cols, options_.is_create_parameterized_view);
bool got_storage_from_query = false;
@ -463,12 +470,6 @@ InterpreterSelectQuery::InterpreterSelectQuery(
}
}
/// Set skip_unavailable_shards to true only if it wasn't disabled explicitly
if (settings.allow_experimental_parallel_reading_from_replicas > 0 && !settings.skip_unavailable_shards && !settings.isChanged("skip_unavailable_shards"))
{
context->setSetting("skip_unavailable_shards", true);
}
/// Check support for JOIN for parallel replicas with custom key
if (joined_tables.tablesCount() > 1 && !settings.parallel_replicas_custom_key.value.empty())
{

View File

@ -0,0 +1,163 @@
#include <Interpreters/RewriteUniqToCountVisitor.h>
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/ASTSubquery.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Parsers/parseQuery.h>
namespace DB
{
using Aliases = std::unordered_map<String, ASTPtr>;
namespace
{
bool matchFnUniq(String func_name)
{
auto name = Poco::toLower(func_name);
return name == "uniq" || name == "uniqHLL12" || name == "uniqExact" || name == "uniqTheta" || name == "uniqCombined"
|| name == "uniqCombined64";
}
bool expressionEquals(const ASTPtr & lhs, const ASTPtr & rhs, const Aliases & alias)
{
if (lhs->getTreeHash() == rhs->getTreeHash())
{
return true;
}
else
{
auto * lhs_idf = lhs->as<ASTIdentifier>();
auto * rhs_idf = rhs->as<ASTIdentifier>();
if (lhs_idf && rhs_idf)
{
/// compound identifiers, such as: <t.name, name>
if (lhs_idf->shortName() == rhs_idf->shortName())
return true;
/// translate alias
if (alias.find(lhs_idf->shortName()) != alias.end())
lhs_idf = alias.find(lhs_idf->shortName())->second->as<ASTIdentifier>();
if (alias.find(rhs_idf->shortName()) != alias.end())
rhs_idf = alias.find(rhs_idf->shortName())->second->as<ASTIdentifier>();
if (lhs_idf->shortName() == rhs_idf->shortName())
return true;
}
}
return false;
}
bool expressionListEquals(ASTExpressionList * lhs, ASTExpressionList * rhs, const Aliases & alias)
{
if (!lhs || !rhs)
return false;
if (lhs->children.size() != rhs->children.size())
return false;
for (size_t i = 0; i < lhs->children.size(); i++)
{
if (!expressionEquals(lhs->children[i], rhs->children[i], alias))
return false;
}
return true;
}
/// Test whether lhs contains all expressions in rhs.
bool expressionListContainsAll(ASTExpressionList * lhs, ASTExpressionList * rhs, const Aliases & alias)
{
if (!lhs || !rhs)
return false;
if (lhs->children.size() < rhs->children.size())
return false;
for (const auto & re : rhs->children)
{
auto predicate = [&re, &alias](ASTPtr & le) { return expressionEquals(le, re, alias); };
if (std::find_if(lhs->children.begin(), lhs->children.end(), predicate) == lhs->children.end())
return false;
}
return true;
}
}
void RewriteUniqToCountMatcher::visit(ASTPtr & ast, Data & /*data*/)
{
auto * selectq = ast->as<ASTSelectQuery>();
if (!selectq || !selectq->tables() || selectq->tables()->children.size() != 1)
return;
auto expr_list = selectq->select();
if (!expr_list || expr_list->children.size() != 1)
return;
auto * func = expr_list->children[0]->as<ASTFunction>();
if (!func || !matchFnUniq(func->name))
return;
if (selectq->tables()->as<ASTTablesInSelectQuery>()->children[0]->as<ASTTablesInSelectQueryElement>()->children.size() != 1)
return;
auto * table_expr = selectq->tables()
->as<ASTTablesInSelectQuery>()
->children[0]
->as<ASTTablesInSelectQueryElement>()
->children[0]
->as<ASTTableExpression>();
if (!table_expr || table_expr->children.size() != 1 || !table_expr->subquery)
return;
auto * subquery = table_expr->subquery->as<ASTSubquery>();
if (!subquery)
return;
auto * sub_selectq = subquery->children[0]
->as<ASTSelectWithUnionQuery>()->children[0]
->as<ASTExpressionList>()->children[0]
->as<ASTSelectQuery>();
if (!sub_selectq)
return;
auto sub_expr_list = sub_selectq->select();
if (!sub_expr_list)
return;
/// collect subquery select expressions alias
Aliases alias;
for (const auto & expr : sub_expr_list->children)
{
if (!expr->tryGetAlias().empty())
alias.insert({expr->tryGetAlias(), expr});
}
/// Whether query matches 'SELECT uniq(x ...) FROM (SELECT DISTINCT x ...)'
auto match_subquery_with_distinct = [&]() -> bool
{
if (!sub_selectq->distinct)
return false;
/// uniq expression list == subquery group by expression list
if (!expressionListEquals(func->children[0]->as<ASTExpressionList>(), sub_expr_list->as<ASTExpressionList>(), alias))
return false;
return true;
};
/// Whether query matches 'SELECT uniq(x ...) FROM (SELECT x ... GROUP BY x ...)'
auto match_subquery_with_group_by = [&]() -> bool
{
auto group_by = sub_selectq->groupBy();
if (!group_by)
return false;
/// uniq expression list == subquery group by expression list
if (!expressionListEquals(func->children[0]->as<ASTExpressionList>(), group_by->as<ASTExpressionList>(), alias))
return false;
/// subquery select expression list must contain all columns in uniq expression list
if (!expressionListContainsAll(sub_expr_list->as<ASTExpressionList>(), func->children[0]->as<ASTExpressionList>(), alias))
return false;
return true;
};
if (match_subquery_with_distinct() || match_subquery_with_group_by())
expr_list->children[0] = makeASTFunction("count");
}
}

View File

@ -0,0 +1,30 @@
#pragma once
#include <Parsers/IAST.h>
#include <Interpreters/InDepthNodeVisitor.h>
#include "Interpreters/TreeRewriter.h"
namespace DB
{
class ASTFunction;
/** Optimize `uniq` into `count` over subquery.
* Example: 'SELECT uniq(x ...) FROM (SELECT DISTINCT x ...)' to
* Result: 'SELECT count() FROM (SELECT DISTINCT x ...)'
*
* Example: 'SELECT uniq(x ...) FROM (SELECT x ... GROUP BY x ...)' to
* Result: 'SELECT count() FROM (SELECT x ... GROUP BY x ...)'
*
* Note that we can rewrite all uniq variants except uniqUpTo.
*/
class RewriteUniqToCountMatcher
{
public:
struct Data {};
static void visit(ASTPtr & ast, Data &);
static bool needChildVisit(const ASTPtr &, const ASTPtr &) { return true; }
};
using RewriteUniqToCountVisitor = InDepthNodeVisitor<RewriteUniqToCountMatcher, true>;
}

View File

@ -283,6 +283,11 @@ Field convertFieldToTypeImpl(const Field & src, const IDataType & type, const ID
/// Already in needed type.
return src;
}
if (which_type.isIPv4() && src.getType() == Field::Types::UInt64)
{
/// convert to UInt32 which is the underlying type for native IPv4
return convertNumericType<UInt32>(src, type);
}
}
else if (which_type.isUUID() && src.getType() == Field::Types::UUID)
{

View File

@ -5,5 +5,4 @@ target_link_libraries(execute_query_fuzzer PRIVATE
clickhouse_table_functions
clickhouse_aggregate_functions
clickhouse_dictionaries
clickhouse_dictionaries_embedded
${LIB_FUZZING_ENGINE})
clickhouse_dictionaries_embedded)

View File

@ -13,43 +13,44 @@
using namespace DB;
extern "C" int LLVMFuzzerTestOneInput(const uint8_t * data, size_t size)
try
{
std::string input = std::string(reinterpret_cast<const char*>(data), size);
static SharedContextHolder shared_context;
static ContextMutablePtr context;
auto initialize = [&]() mutable
try
{
shared_context = Context::createShared();
context = Context::createGlobal(shared_context.get());
context->makeGlobalContext();
context->setApplicationType(Context::ApplicationType::LOCAL);
std::string input = std::string(reinterpret_cast<const char*>(data), size);
registerFunctions();
registerAggregateFunctions();
registerTableFunctions();
registerStorages();
registerDictionaries();
registerDisks(/* global_skip_access_check= */ true);
registerFormats();
static SharedContextHolder shared_context;
static ContextMutablePtr context;
return true;
};
auto initialize = [&]() mutable
{
shared_context = Context::createShared();
context = Context::createGlobal(shared_context.get());
context->makeGlobalContext();
context->setApplicationType(Context::ApplicationType::LOCAL);
static bool initialized = initialize();
(void) initialized;
registerFunctions();
registerAggregateFunctions();
registerTableFunctions();
registerStorages();
registerDictionaries();
registerDisks(/* global_skip_access_check= */ true);
registerFormats();
auto io = DB::executeQuery(input, context, true, QueryProcessingStage::Complete);
return true;
};
PullingPipelineExecutor executor(io.pipeline);
Block res;
while (!res && executor.pull(res));
static bool initialized = initialize();
(void) initialized;
auto io = DB::executeQuery(input, context, true, QueryProcessingStage::Complete);
PullingPipelineExecutor executor(io.pipeline);
Block res;
while (!res && executor.pull(res));
}
catch (...)
{
}
return 0;
}
catch (...)
{
return 1;
}

View File

@ -1,11 +1,11 @@
clickhouse_add_executable(lexer_fuzzer lexer_fuzzer.cpp ${SRCS})
target_link_libraries(lexer_fuzzer PRIVATE clickhouse_parsers ${LIB_FUZZING_ENGINE})
target_link_libraries(lexer_fuzzer PRIVATE clickhouse_parsers)
clickhouse_add_executable(select_parser_fuzzer select_parser_fuzzer.cpp ${SRCS})
target_link_libraries(select_parser_fuzzer PRIVATE clickhouse_parsers ${LIB_FUZZING_ENGINE})
target_link_libraries(select_parser_fuzzer PRIVATE clickhouse_parsers dbms)
clickhouse_add_executable(create_parser_fuzzer create_parser_fuzzer.cpp ${SRCS})
target_link_libraries(create_parser_fuzzer PRIVATE clickhouse_parsers ${LIB_FUZZING_ENGINE})
target_link_libraries(create_parser_fuzzer PRIVATE clickhouse_parsers dbms)
add_subdirectory(codegen_fuzzer)

View File

@ -41,5 +41,10 @@ clickhouse_add_executable(codegen_select_fuzzer ${FUZZER_SRCS})
set_source_files_properties("${PROTO_SRCS}" "out.cpp" PROPERTIES COMPILE_FLAGS "-Wno-reserved-identifier")
# contrib/libprotobuf-mutator/src/libfuzzer/libfuzzer_macro.h:143:44: error: no newline at end of file [-Werror,-Wnewline-eof]
target_compile_options (codegen_select_fuzzer PRIVATE -Wno-newline-eof)
target_link_libraries(protoc ch_contrib::fuzzer)
target_include_directories(codegen_select_fuzzer SYSTEM BEFORE PRIVATE "${CMAKE_CURRENT_BINARY_DIR}")
target_link_libraries(codegen_select_fuzzer PRIVATE ch_contrib::protobuf_mutator ch_contrib::protoc dbms ${LIB_FUZZING_ENGINE})
target_link_libraries(codegen_select_fuzzer PRIVATE ch_contrib::protobuf_mutator ch_contrib::protoc dbms)

View File

@ -8,27 +8,28 @@
extern "C" int LLVMFuzzerTestOneInput(const uint8_t * data, size_t size)
try
{
std::string input = std::string(reinterpret_cast<const char*>(data), size);
try
{
std::string input = std::string(reinterpret_cast<const char*>(data), size);
DB::ParserCreateQuery parser;
DB::ASTPtr ast = parseQuery(parser, input.data(), input.data() + input.size(), "", 0, 1000);
DB::ParserCreateQuery parser;
DB::ASTPtr ast = parseQuery(parser, input.data(), input.data() + input.size(), "", 0, 1000);
const UInt64 max_ast_depth = 1000;
ast->checkDepth(max_ast_depth);
const UInt64 max_ast_depth = 1000;
ast->checkDepth(max_ast_depth);
const UInt64 max_ast_elements = 50000;
ast->checkSize(max_ast_elements);
const UInt64 max_ast_elements = 50000;
ast->checkSize(max_ast_elements);
DB::WriteBufferFromOwnString wb;
DB::formatAST(*ast, wb);
DB::WriteBufferFromOwnString wb;
DB::formatAST(*ast, wb);
std::cerr << wb.str() << std::endl;
std::cerr << wb.str() << std::endl;
}
catch (...)
{
}
return 0;
}
catch (...)
{
return 1;
}

View File

@ -8,21 +8,27 @@
extern "C" int LLVMFuzzerTestOneInput(const uint8_t * data, size_t size)
{
DB::String query;
DB::ReadBufferFromMemory in(data, size);
readStringUntilEOF(query, in);
DB::Lexer lexer(query.data(), query.data() + query.size());
while (true)
try
{
DB::Token token = lexer.nextToken();
DB::String query;
DB::ReadBufferFromMemory in(data, size);
readStringUntilEOF(query, in);
if (token.isEnd())
break;
DB::Lexer lexer(query.data(), query.data() + query.size());
if (token.isError())
return 1;
while (true)
{
DB::Token token = lexer.nextToken();
if (token.isEnd())
break;
if (token.isError())
return 0;
}
}
catch (...)
{
}
return 0;

View File

@ -7,29 +7,30 @@
extern "C" int LLVMFuzzerTestOneInput(const uint8_t * data, size_t size)
try
{
std::string input = std::string(reinterpret_cast<const char*>(data), size);
try
{
std::string input = std::string(reinterpret_cast<const char*>(data), size);
DB::ParserQueryWithOutput parser(input.data() + input.size());
DB::ParserQueryWithOutput parser(input.data() + input.size());
const UInt64 max_parser_depth = 1000;
DB::ASTPtr ast = parseQuery(parser, input.data(), input.data() + input.size(), "", 0, max_parser_depth);
const UInt64 max_parser_depth = 1000;
DB::ASTPtr ast = parseQuery(parser, input.data(), input.data() + input.size(), "", 0, max_parser_depth);
const UInt64 max_ast_depth = 1000;
ast->checkDepth(max_ast_depth);
const UInt64 max_ast_depth = 1000;
ast->checkDepth(max_ast_depth);
const UInt64 max_ast_elements = 50000;
ast->checkSize(max_ast_elements);
const UInt64 max_ast_elements = 50000;
ast->checkSize(max_ast_elements);
DB::WriteBufferFromOwnString wb;
DB::formatAST(*ast, wb);
DB::WriteBufferFromOwnString wb;
DB::formatAST(*ast, wb);
std::cerr << wb.str() << std::endl;
std::cerr << wb.str() << std::endl;
}
catch (...)
{
}
return 0;
}
catch (...)
{
return 1;
}

View File

@ -206,6 +206,27 @@ void PipelineExecutor::finalizeExecution()
all_processors_finished = false;
break;
}
else if (node->processor && read_progress_callback)
{
/// Some executors might have reported progress as part of their finish() call
/// For example, when reading from parallel replicas the coordinator will cancel the queries as soon as it
/// enough data (on LIMIT), but as the progress report is asynchronous it might not be reported until the
/// connection is cancelled and all packets drained
/// To cover these cases we check if there is any pending progress in the processors to report
if (auto read_progress = node->processor->getReadProgress())
{
if (read_progress->counters.total_rows_approx)
read_progress_callback->addTotalRowsApprox(read_progress->counters.total_rows_approx);
if (read_progress->counters.total_bytes)
read_progress_callback->addTotalBytes(read_progress->counters.total_bytes);
/// We are finalizing the execution, so no need to call onProgress if there is nothing to report
if (read_progress->counters.read_rows || read_progress->counters.read_bytes)
read_progress_callback->onProgress(
read_progress->counters.read_rows, read_progress->counters.read_bytes, read_progress->limits);
}
}
}
if (!all_processors_finished)

View File

@ -66,12 +66,14 @@ void ISource::progress(size_t read_rows, size_t read_bytes)
{
//std::cerr << "========= Progress " << read_rows << " from " << getName() << std::endl << StackTrace().toString() << std::endl;
read_progress_was_set = true;
std::lock_guard lock(read_progress_mutex);
read_progress.read_rows += read_rows;
read_progress.read_bytes += read_bytes;
}
std::optional<ISource::ReadProgress> ISource::getReadProgress()
{
std::lock_guard lock(read_progress_mutex);
if (finished && read_progress.read_bytes == 0 && read_progress.total_rows_approx == 0)
return {};
@ -85,6 +87,18 @@ std::optional<ISource::ReadProgress> ISource::getReadProgress()
return ReadProgress{res_progress, empty_limits};
}
void ISource::addTotalRowsApprox(size_t value)
{
std::lock_guard lock(read_progress_mutex);
read_progress.total_rows_approx += value;
}
void ISource::addTotalBytes(size_t value)
{
std::lock_guard lock(read_progress_mutex);
read_progress.total_bytes += value;
}
void ISource::work()
{
try

View File

@ -2,6 +2,9 @@
#include <Processors/IProcessor.h>
#include <atomic>
#include <mutex>
namespace DB
{
@ -9,8 +12,9 @@ namespace DB
class ISource : public IProcessor
{
private:
std::mutex read_progress_mutex;
ReadProgressCounters read_progress;
bool read_progress_was_set = false;
std::atomic_bool read_progress_was_set = false;
bool auto_progress;
protected:
@ -42,8 +46,8 @@ public:
/// Default implementation for all the sources.
std::optional<ReadProgress> getReadProgress() final;
void addTotalRowsApprox(size_t value) { read_progress.total_rows_approx += value; }
void addTotalBytes(size_t value) { read_progress.total_bytes += value; }
void addTotalRowsApprox(size_t value);
void addTotalBytes(size_t value);
};
using SourcePtr = std::shared_ptr<ISource>;

View File

@ -108,7 +108,7 @@ RemoteQueryExecutor::RemoteQueryExecutor(
, scalars(scalars_), external_tables(external_tables_), stage(stage_)
, extension(extension_)
{
create_connections = [this, pool, throttler, extension_](AsyncCallback async_callback)->std::unique_ptr<IConnections>
create_connections = [this, pool, throttler](AsyncCallback async_callback)->std::unique_ptr<IConnections>
{
const Settings & current_settings = context->getSettingsRef();
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings);
@ -121,26 +121,32 @@ RemoteQueryExecutor::RemoteQueryExecutor(
table_to_check = std::make_shared<QualifiedTableName>(main_table.getQualifiedName());
auto res = std::make_unique<HedgedConnections>(pool, context, timeouts, throttler, pool_mode, table_to_check, std::move(async_callback));
if (extension_ && extension_->replica_info)
res->setReplicaInfo(*extension_->replica_info);
if (extension && extension->replica_info)
res->setReplicaInfo(*extension->replica_info);
return res;
}
#endif
std::vector<IConnectionPool::Entry> connection_entries;
std::optional<bool> skip_unavailable_endpoints;
if (extension && extension->parallel_reading_coordinator)
skip_unavailable_endpoints = true;
if (main_table)
{
auto try_results = pool->getManyChecked(timeouts, &current_settings, pool_mode, main_table.getQualifiedName(), std::move(async_callback));
auto try_results = pool->getManyChecked(timeouts, &current_settings, pool_mode, main_table.getQualifiedName(), std::move(async_callback), skip_unavailable_endpoints);
connection_entries.reserve(try_results.size());
for (auto & try_result : try_results)
connection_entries.emplace_back(std::move(try_result.entry));
}
else
connection_entries = pool->getMany(timeouts, &current_settings, pool_mode, std::move(async_callback));
{
connection_entries = pool->getMany(timeouts, &current_settings, pool_mode, std::move(async_callback), skip_unavailable_endpoints);
}
auto res = std::make_unique<MultiplexedConnections>(std::move(connection_entries), current_settings, throttler);
if (extension_ && extension_->replica_info)
res->setReplicaInfo(*extension_->replica_info);
if (extension && extension->replica_info)
res->setReplicaInfo(*extension->replica_info);
return res;
};
}
@ -237,7 +243,7 @@ void RemoteQueryExecutor::sendQueryUnlocked(ClientInfo::QueryKind query_kind, As
AsyncCallbackSetter async_callback_setter(connections.get(), async_callback);
const auto & settings = context->getSettingsRef();
if (needToSkipUnavailableShard())
if (isReplicaUnavailable() || needToSkipUnavailableShard())
{
/// To avoid sending the query again in the read(), we need to update the following flags:
was_cancelled = true;
@ -363,7 +369,7 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::readAsync()
read_context->resume();
if (needToSkipUnavailableShard())
if (isReplicaUnavailable() || needToSkipUnavailableShard())
{
/// We need to tell the coordinator not to wait for this replica.
/// But at this point it may lead to an incomplete result set, because
@ -438,9 +444,9 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::processPacket(Packet packet
processMergeTreeReadTaskRequest(packet.request.value());
return ReadResult(ReadResult::Type::ParallelReplicasToken);
case Protocol::Server::MergeTreeAllRangesAnnounecement:
case Protocol::Server::MergeTreeAllRangesAnnouncement:
chassert(packet.announcement.has_value());
processMergeTreeInitialReadAnnounecement(packet.announcement.value());
processMergeTreeInitialReadAnnouncement(packet.announcement.value());
return ReadResult(ReadResult::Type::ParallelReplicasToken);
case Protocol::Server::ReadTaskRequest:
@ -562,7 +568,7 @@ void RemoteQueryExecutor::processMergeTreeReadTaskRequest(ParallelReadRequest re
connections->sendMergeTreeReadTaskResponse(response);
}
void RemoteQueryExecutor::processMergeTreeInitialReadAnnounecement(InitialAllRangesAnnouncement announcement)
void RemoteQueryExecutor::processMergeTreeInitialReadAnnouncement(InitialAllRangesAnnouncement announcement)
{
if (!extension || !extension->parallel_reading_coordinator)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Coordinator for parallel reading from replicas is not initialized");
@ -596,39 +602,51 @@ void RemoteQueryExecutor::finish()
return;
/// Get the remaining packets so that there is no out of sync in the connections to the replicas.
Packet packet = connections->drain();
switch (packet.type)
/// We do this manually instead of calling drain() because we want to process Log, ProfileEvents and Progress
/// packets that had been sent before the connection is fully finished in order to have final statistics of what
/// was executed in the remote queries
while (connections->hasActiveConnections() && !finished)
{
case Protocol::Server::EndOfStream:
finished = true;
break;
Packet packet = connections->receivePacket();
case Protocol::Server::Log:
/// Pass logs from remote server to client
if (auto log_queue = CurrentThread::getInternalTextLogsQueue())
log_queue->pushBlock(std::move(packet.block));
break;
switch (packet.type)
{
case Protocol::Server::EndOfStream:
finished = true;
break;
case Protocol::Server::Exception:
got_exception_from_replica = true;
packet.exception->rethrow();
break;
case Protocol::Server::Exception:
got_exception_from_replica = true;
packet.exception->rethrow();
break;
case Protocol::Server::ProfileEvents:
/// Pass profile events from remote server to client
if (auto profile_queue = CurrentThread::getInternalProfileEventsQueue())
if (!profile_queue->emplace(std::move(packet.block)))
throw Exception(ErrorCodes::SYSTEM_ERROR, "Could not push into profile queue");
break;
case Protocol::Server::Log:
/// Pass logs from remote server to client
if (auto log_queue = CurrentThread::getInternalTextLogsQueue())
log_queue->pushBlock(std::move(packet.block));
break;
case Protocol::Server::TimezoneUpdate:
break;
case Protocol::Server::ProfileEvents:
/// Pass profile events from remote server to client
if (auto profile_queue = CurrentThread::getInternalProfileEventsQueue())
if (!profile_queue->emplace(std::move(packet.block)))
throw Exception(ErrorCodes::SYSTEM_ERROR, "Could not push into profile queue");
break;
default:
got_unknown_packet_from_replica = true;
throw Exception(ErrorCodes::UNKNOWN_PACKET_FROM_SERVER, "Unknown packet {} from one of the following replicas: {}",
toString(packet.type),
connections->dumpAddresses());
case Protocol::Server::ProfileInfo:
/// Use own (client-side) info about read bytes, it is more correct info than server-side one.
if (profile_info_callback)
profile_info_callback(packet.profile_info);
break;
case Protocol::Server::Progress:
if (progress_callback)
progress_callback(packet.progress);
break;
default:
break;
}
}
}

View File

@ -186,6 +186,8 @@ public:
bool needToSkipUnavailableShard() const { return context->getSettingsRef().skip_unavailable_shards && (0 == connections->size()); }
bool isReplicaUnavailable() const { return extension && extension->parallel_reading_coordinator && connections->size() == 0; }
private:
RemoteQueryExecutor(
const String & query_, const Block & header_, ContextPtr context_,
@ -283,7 +285,7 @@ private:
void processReadTaskRequest();
void processMergeTreeReadTaskRequest(ParallelReadRequest request);
void processMergeTreeInitialReadAnnounecement(InitialAllRangesAnnouncement announcement);
void processMergeTreeInitialReadAnnouncement(InitialAllRangesAnnouncement announcement);
/// Cancel query and restart it with info about duplicate UUIDs
/// only for `allow_experimental_query_deduplication`.

View File

@ -471,7 +471,7 @@ void TCPHandler::runImpl()
if (state.cancellation_status == CancellationStatus::FULLY_CANCELLED)
return;
sendMergeTreeAllRangesAnnounecementAssumeLocked(announcement);
sendMergeTreeAllRangesAnnouncementAssumeLocked(announcement);
ProfileEvents::increment(ProfileEvents::MergeTreeAllRangesAnnouncementsSent);
ProfileEvents::increment(ProfileEvents::MergeTreeAllRangesAnnouncementsSentElapsedMicroseconds, watch.elapsedMicroseconds());
});
@ -1044,9 +1044,9 @@ void TCPHandler::sendReadTaskRequestAssumeLocked()
}
void TCPHandler::sendMergeTreeAllRangesAnnounecementAssumeLocked(InitialAllRangesAnnouncement announcement)
void TCPHandler::sendMergeTreeAllRangesAnnouncementAssumeLocked(InitialAllRangesAnnouncement announcement)
{
writeVarUInt(Protocol::Server::MergeTreeAllRangesAnnounecement, *out);
writeVarUInt(Protocol::Server::MergeTreeAllRangesAnnouncement, *out);
announcement.serialize(*out);
out->next();
}

View File

@ -264,7 +264,7 @@ private:
void sendEndOfStream();
void sendPartUUIDs();
void sendReadTaskRequestAssumeLocked();
void sendMergeTreeAllRangesAnnounecementAssumeLocked(InitialAllRangesAnnouncement announcement);
void sendMergeTreeAllRangesAnnouncementAssumeLocked(InitialAllRangesAnnouncement announcement);
void sendMergeTreeReadTaskRequestAssumeLocked(ParallelReadRequest request);
void sendProfileInfo(const ProfileInfo & info);
void sendTotals(const Block & totals);

View File

@ -4852,17 +4852,18 @@ void MergeTreeData::removePartContributionToColumnAndSecondaryIndexSizes(const D
log_subtract(total_column_size.marks, part_column_size.marks, ".marks");
}
auto indexes_descriptions = getInMemoryMetadataPtr()->secondary_indices;
for (const auto & index : indexes_descriptions)
for (auto & [secondary_index_name, total_secondary_index_size] : secondary_index_sizes)
{
IndexSize & total_secondary_index_size = secondary_index_sizes[index.name];
IndexSize part_secondary_index_size = part->getSecondaryIndexSize(index.name);
if (!part->hasSecondaryIndex(secondary_index_name))
continue;
IndexSize part_secondary_index_size = part->getSecondaryIndexSize(secondary_index_name);
auto log_subtract = [&](size_t & from, size_t value, const char * field)
{
if (value > from)
LOG_ERROR(log, "Possibly incorrect index size subtraction: {} - {} = {}, index: {}, field: {}",
from, value, from - value, index.name, field);
from, value, from - value, secondary_index_name, field);
from -= value;
};
@ -8604,7 +8605,7 @@ std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> MergeTreeData::createE
bool MergeTreeData::allowRemoveStaleMovingParts() const
{
return ConfigHelper::getBool(getContext()->getConfigRef(), "allow_remove_stale_moving_parts");
return ConfigHelper::getBool(getContext()->getConfigRef(), "allow_remove_stale_moving_parts", /* default_ = */ true);
}
CurrentlySubmergingEmergingTagger::~CurrentlySubmergingEmergingTagger()

View File

@ -1,7 +1,7 @@
clickhouse_add_executable (mergetree_checksum_fuzzer mergetree_checksum_fuzzer.cpp)
# Look at comment around fuzz_compression target declaration
target_link_libraries (mergetree_checksum_fuzzer PRIVATE dbms ${LIB_FUZZING_ENGINE})
target_link_libraries (mergetree_checksum_fuzzer PRIVATE dbms)
clickhouse_add_executable (columns_description_fuzzer columns_description_fuzzer.cpp)
target_link_libraries (columns_description_fuzzer PRIVATE dbms ${LIB_FUZZING_ENGINE})
target_link_libraries (columns_description_fuzzer PRIVATE dbms)

View File

@ -2,14 +2,16 @@
extern "C" int LLVMFuzzerTestOneInput(const uint8_t * data, size_t size)
try
{
using namespace DB;
ColumnsDescription columns = ColumnsDescription::parse(std::string(reinterpret_cast<const char *>(data), size));
std::cerr << columns.toString() << "\n";
try
{
using namespace DB;
ColumnsDescription columns = ColumnsDescription::parse(std::string(reinterpret_cast<const char *>(data), size));
std::cerr << columns.toString() << "\n";
}
catch (...)
{
}
return 0;
}
catch (...)
{
return 1;
}

View File

@ -5,19 +5,20 @@
extern "C" int LLVMFuzzerTestOneInput(const uint8_t * data, size_t size)
try
{
DB::ReadBufferFromMemory in(data, size);
DB::MergeTreeDataPartChecksums res;
DB::WriteBufferFromFileDescriptor out(STDOUT_FILENO);
try
{
DB::ReadBufferFromMemory in(data, size);
DB::MergeTreeDataPartChecksums res;
DB::WriteBufferFromFileDescriptor out(STDOUT_FILENO);
if (!res.read(in))
return 1;
res.write(out);
if (!res.read(in))
return 0;
res.write(out);
}
catch (...)
{
}
return 0;
}
catch (...)
{
return 1;
}

View File

@ -95,3 +95,4 @@ test_odbc_interaction/test.py::test_postgres_insert
test_zookeeper_config/test.py::test_chroot_with_different_root
test_zookeeper_config/test.py::test_chroot_with_same_root
test_merge_tree_azure_blob_storage/test.py::test_table_manipulations
test_parallel_replicas_skip_shards/test.py::test_skip_unavailable_shards

View File

@ -96,8 +96,17 @@ def get_packager_cmd(
return cmd
def _expect_artifacts(build_config: BuildConfig) -> bool:
if build_config.package_type == "fuzzers":
return False
return True
def build_clickhouse(
packager_cmd: str, logs_path: Path, build_output_path: Path
packager_cmd: str,
logs_path: Path,
build_output_path: Path,
expect_artifacts: bool = True,
) -> Tuple[Path, bool]:
build_log_path = logs_path / BUILD_LOG_NAME
success = False
@ -109,7 +118,7 @@ def build_clickhouse(
build_results = []
if retcode == 0:
if len(build_results) > 0:
if not expect_artifacts or len(build_results) > 0:
success = True
logging.info("Built successfully")
else:
@ -312,7 +321,9 @@ def main():
os.makedirs(logs_path, exist_ok=True)
start = time.time()
log_path, success = build_clickhouse(packager_cmd, logs_path, build_output_path)
log_path, success = build_clickhouse(
packager_cmd, logs_path, build_output_path, _expect_artifacts(build_config)
)
elapsed = int(time.time() - start)
subprocess.check_call(
f"sudo chown -R ubuntu:ubuntu {build_output_path}", shell=True

View File

@ -9,7 +9,7 @@ from typing import Callable, Dict, List, Literal
@dataclass
class BuildConfig:
compiler: str
package_type: Literal["deb", "binary"]
package_type: Literal["deb", "binary", "fuzzers"]
additional_pkgs: bool = False
debug_build: bool = False
sanitizer: str = ""
@ -182,6 +182,10 @@ CI_CONFIG = CiConfig(
package_type="binary",
static_binary_name="s390x",
),
"fuzzers": BuildConfig(
compiler="clang-16",
package_type="fuzzers",
),
},
builds_report_config={
"ClickHouse build check": [
@ -193,6 +197,7 @@ CI_CONFIG = CiConfig(
"package_msan",
"package_debug",
"binary_release",
"fuzzers",
],
"ClickHouse special build check": [
"binary_tidy",

View File

@ -1,28 +1,28 @@
#!/usr/bin/env python3
# A trivial stateless slack bot that notifies about new broken tests in ClickHouse CI.
# It checks what happened to our CI during the last check_period hours (1 hour) and notifies us in slack if necessary.
# This script should be executed once each check_period hours (1 hour).
# It will post duplicate messages if you run it more often; it will lose some messages if you run it less often.
#
# You can run it locally with no arguments, it will work in a dry-run mode. Or you can set your own SLACK_URL_DEFAULT.
# Feel free to add more checks, more details to messages, or better heuristics.
# NOTE There's no deployment automation for now,
# an AWS Lambda (slack-ci-bot-test lambda in CI-CD) has to be updated manually after changing this script.
#
# See also: https://aretestsgreenyet.com/
"""
A trivial stateless slack bot that notifies about new broken tests in ClickHouse CI.
It checks what happened to our CI during the last check_period hours (1 hour) and
notifies us in slack if necessary.
This script should be executed once each check_period hours (1 hour).
It will post duplicate messages if you run it more often; it will lose some messages
if you run it less often.
You can run it locally with no arguments, it will work in a dry-run mode.
Or you can set your own SLACK_URL_DEFAULT.
Feel free to add more checks, more details to messages, or better heuristics.
It's deployed to slack-bot-ci-lambda in CI/CD account
See also: https://aretestsgreenyet.com/
"""
import os
import json
import base64
import random
if os.environ.get("AWS_LAMBDA_ENV", "0") == "1":
# For AWS labmda (python 3.7)
from botocore.vendored import requests
else:
# For running locally
import requests
import requests # type: ignore
DRY_RUN_MARK = "<no url, dry run>"
@ -34,7 +34,8 @@ REPORT_NO_FAILURES_PROBABILITY = 0.99
MAX_TESTS_TO_REPORT = 4
# Slack has a stupid limitation on message size, it splits long messages into multiple ones breaking formatting
# Slack has a stupid limitation on message size, it splits long messages into multiple,
# ones breaking formatting
MESSAGE_LENGTH_LIMIT = 4000
# Find tests that failed in master during the last check_period * 24 hours,
@ -61,7 +62,7 @@ WHERE 1
AND test_name NOT IN (
SELECT test_name FROM checks WHERE 1
AND check_start_time >= now - INTERVAL 1 MONTH
AND (check_start_time + check_duration_ms / 1000) BETWEEN now - INTERVAL 2 WEEK AND now - INTERVAL extended_check_period HOUR
AND (check_start_time + check_duration_ms / 1000) BETWEEN now - INTERVAL 2 WEEK AND now - INTERVAL extended_check_period HOUR
AND pull_request_number = 0
AND check_status != 'success'
AND test_status LIKE 'F%')
@ -95,11 +96,11 @@ FAILED_CHECKS_PERCENTAGE_QUERY = """
SELECT if(toHour(now('Europe/Amsterdam')) = 12, v, 0)
FROM
(
SELECT
countDistinctIf((commit_sha, check_name), (test_status LIKE 'F%') AND (check_status != 'success'))
SELECT
countDistinctIf((commit_sha, check_name), (test_status LIKE 'F%') AND (check_status != 'success'))
/ countDistinct((commit_sha, check_name)) AS v
FROM checks
WHERE 1
WHERE 1
AND (pull_request_number = 0)
AND (test_status != 'SKIPPED')
AND (check_start_time > (now() - toIntervalDay(1)))
@ -111,7 +112,7 @@ ALL_RECENT_FAILURES_QUERY = """
WITH
'{}' AS name_substr,
90 AS interval_days,
('Stateless tests (asan)', 'Stateless tests (address)', 'Stateless tests (address, actions)') AS backport_and_release_specific_checks
('Stateless tests (asan)', 'Stateless tests (address)', 'Stateless tests (address, actions)', 'Integration tests (asan) [1/3]', 'Stateless tests (tsan) [1/3]') AS backport_and_release_specific_checks
SELECT
toStartOfDay(check_start_time) AS d,
count(),
@ -315,14 +316,14 @@ def check_and_alert():
)
def lambda_handler(event, context):
def handler(event, context):
try:
check_and_alert()
return {"statusCode": 200, "body": "OK"}
except Exception as e:
send_to_slack(
"I failed, please help me (see ClickHouse/utils/ci-slack-bot/ci-slack-bot.py): "
+ str(e)
"I failed, please help me "
f"(see ClickHouse/ClickHouse/tests/ci/slack_bot_ci_lambda/app.py): {e}"
)
return {"statusCode": 200, "body": "FAIL"}

View File

@ -0,0 +1 @@
../team_keys_lambda/build_and_deploy_archive.sh

View File

@ -0,0 +1 @@
../lambda_shared_package

View File

@ -23,7 +23,7 @@ cp app.py "$PACKAGE"
if [ -f requirements.txt ]; then
VENV=lambda-venv
rm -rf "$VENV" lambda-package.zip
docker run --rm --user="${UID}" -e HOME=/tmp --entrypoint=/bin/bash \
docker run --net=host --rm --user="${UID}" -e HOME=/tmp --entrypoint=/bin/bash \
--volume="${WORKDIR}/..:/ci" --workdir="/ci/${DIR_NAME}" "${DOCKER_IMAGE}" \
-exc "
'$PY_EXEC' -m venv '$VENV' &&

View File

@ -114,9 +114,9 @@ def test_disks_app_func_cp(started_cluster):
"/usr/bin/clickhouse",
"disks",
"copy",
"--diskFrom",
"--disk-from",
"test1",
"--diskTo",
"--disk-to",
"test2",
".",
".",

View File

@ -0,0 +1,54 @@
<clickhouse>
<remote_servers>
<test_multiple_shards_multiple_replicas>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>n1</host>
<port>9000</port>
</replica>
<replica>
<host>n2</host>
<port>9000</port>
</replica>
<replica>
<host>n3</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>n4</host>
<port>9000</port>
</replica>
<replica>
<host>n5</host>
<port>9000</port>
</replica>
<replica>
<host>n6</host>
<port>9000</port>
</replica>
</shard>
</test_multiple_shards_multiple_replicas>
<test_single_shard_multiple_replicas>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>n1</host>
<port>9000</port>
</replica>
<replica>
<host>n2</host>
<port>9000</port>
</replica>
<replica>
<host>n3</host>
<port>9000</port>
</replica>
</shard>
</test_single_shard_multiple_replicas>
</remote_servers>
</clickhouse>

View File

@ -0,0 +1,164 @@
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.client import QueryRuntimeException
cluster = ClickHouseCluster(__file__)
# create only 2 nodes out of 3 nodes in cluster with 1 shard
# and out of 6 nodes in first shard in cluster with 2 shards
node1 = cluster.add_instance(
"n1", main_configs=["configs/remote_servers.xml"], with_zookeeper=True
)
node2 = cluster.add_instance(
"n2", main_configs=["configs/remote_servers.xml"], with_zookeeper=True
)
@pytest.fixture(scope="module", autouse=True)
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def create_tables(cluster, table_name):
# create replicated tables
node1.query(f"DROP TABLE IF EXISTS {table_name} SYNC")
node2.query(f"DROP TABLE IF EXISTS {table_name} SYNC")
node1.query(
f"CREATE TABLE IF NOT EXISTS {table_name} (key Int64, value String) Engine=ReplicatedMergeTree('/test_parallel_replicas/shard1/{table_name}', 'r1') ORDER BY (key)"
)
node2.query(
f"CREATE TABLE IF NOT EXISTS {table_name} (key Int64, value String) Engine=ReplicatedMergeTree('/test_parallel_replicas/shard1/{table_name}', 'r2') ORDER BY (key)"
)
# create distributed table
node1.query(f"DROP TABLE IF EXISTS {table_name}_d SYNC")
node1.query(
f"""
CREATE TABLE {table_name}_d AS {table_name}
Engine=Distributed(
{cluster},
currentDatabase(),
{table_name},
key
)
"""
)
# populate data
node1.query(f"INSERT INTO {table_name} SELECT number, number FROM numbers(1000)")
node2.query(f"INSERT INTO {table_name} SELECT -number, -number FROM numbers(1000)")
node1.query(f"INSERT INTO {table_name} SELECT number, number FROM numbers(3)")
@pytest.mark.parametrize(
"prefer_localhost_replica",
[
pytest.param(0),
pytest.param(1),
],
)
def test_skip_unavailable_shards(start_cluster, prefer_localhost_replica):
cluster = "test_multiple_shards_multiple_replicas"
table_name = "test_table"
create_tables(cluster, table_name)
expected_result = f"2003\t-999\t999\t3\n"
# w/o parallel replicas
assert (
node1.query(
f"SELECT count(), min(key), max(key), sum(key) FROM {table_name}_d settings skip_unavailable_shards=1"
)
== expected_result
)
# parallel replicas
assert (
node1.query(
f"SELECT count(), min(key), max(key), sum(key) FROM {table_name}_d",
settings={
"allow_experimental_parallel_reading_from_replicas": 2,
"max_parallel_replicas": 3,
"use_hedged_requests": 0,
"prefer_localhost_replica": prefer_localhost_replica,
"skip_unavailable_shards": 1,
"connections_with_failover_max_tries": 0, # just don't wait for unavailable replicas
},
)
== expected_result
)
@pytest.mark.parametrize(
"prefer_localhost_replica",
[
pytest.param(0),
pytest.param(1),
],
)
def test_error_on_unavailable_shards(start_cluster, prefer_localhost_replica):
cluster = "test_multiple_shards_multiple_replicas"
table_name = "test_table"
create_tables(cluster, table_name)
# w/o parallel replicas
with pytest.raises(QueryRuntimeException):
node1.query(
f"SELECT count(), min(key), max(key), sum(key) FROM {table_name}_d settings skip_unavailable_shards=0"
)
# parallel replicas
with pytest.raises(QueryRuntimeException):
node1.query(
f"SELECT count(), min(key), max(key), sum(key) FROM {table_name}_d",
settings={
"allow_experimental_parallel_reading_from_replicas": 2,
"max_parallel_replicas": 3,
"use_hedged_requests": 0,
"prefer_localhost_replica": prefer_localhost_replica,
"skip_unavailable_shards": 0,
},
)
@pytest.mark.parametrize(
"skip_unavailable_shards",
[
pytest.param(0),
pytest.param(1),
],
)
def test_no_unavailable_shards(start_cluster, skip_unavailable_shards):
cluster = "test_single_shard_multiple_replicas"
table_name = "test_table"
create_tables(cluster, table_name)
expected_result = f"2003\t-999\t999\t3\n"
# w/o parallel replicas
assert (
node1.query(
f"SELECT count(), min(key), max(key), sum(key) FROM {table_name}_d settings skip_unavailable_shards={skip_unavailable_shards}"
)
== expected_result
)
# parallel replicas
assert (
node1.query(
f"SELECT count(), min(key), max(key), sum(key) FROM {table_name}_d",
settings={
"allow_experimental_parallel_reading_from_replicas": 2,
"max_parallel_replicas": 3,
"use_hedged_requests": 0,
"prefer_localhost_replica": 0,
"skip_unavailable_shards": skip_unavailable_shards,
},
)
== expected_result
)

View File

@ -0,0 +1,34 @@
<clickhouse>
<remote_servers>
<two_shards>
<shard>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
<replica>
<host>node3</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>node4</host>
<port>9000</port>
</replica>
<replica>
<host>node5</host>
<port>9000</port>
</replica>
<replica>
<host>node6</host>
<port>9000</port>
</replica>
</shard>
</two_shards>
</remote_servers>
</clickhouse>

View File

@ -0,0 +1,71 @@
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.client import QueryRuntimeException
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance("node1", main_configs=["configs/remote_servers.xml"])
node2 = cluster.add_instance("node2", main_configs=["configs/remote_servers.xml"])
node3 = cluster.add_instance("node3", main_configs=["configs/remote_servers.xml"])
@pytest.fixture(scope="module")
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_skip_unavailable_shards(start_cluster):
expected = "node1\nnode2\nnode3\n"
assert (
node1.query(
"SELECT hostName() as h FROM clusterAllReplicas('two_shards', system.one) order by h",
settings={
"allow_experimental_parallel_reading_from_replicas": 0,
"skip_unavailable_shards": 1,
},
)
== expected
)
assert (
node1.query(
"SELECT hostName() as h FROM clusterAllReplicas('two_shards', system.one) order by h",
settings={
"allow_experimental_parallel_reading_from_replicas": 2,
"max_parallel_replicas": 3,
"use_hedged_requests": 0,
"skip_unavailable_shards": 1,
# "async_socket_for_remote" : 0,
# "async_query_sending_for_remote" : 0,
# "connections_with_failover_max_tries": 0,
},
)
== expected
)
def test_error_on_unavailable_shards(start_cluster):
with pytest.raises(QueryRuntimeException):
node1.query(
"SELECT hostName() as h FROM clusterAllReplicas('two_shards', system.one) order by h",
settings={
"allow_experimental_parallel_reading_from_replicas": 0,
"skip_unavailable_shards": 0,
},
)
with pytest.raises(QueryRuntimeException):
node1.query(
"SELECT hostName() as h FROM clusterAllReplicas('two_shards', system.one) order by h",
settings={
"allow_experimental_parallel_reading_from_replicas": 2,
"max_parallel_replicas": 3,
"use_hedged_requests": 0,
"skip_unavailable_shards": 0,
},
)

View File

@ -0,0 +1,8 @@
<test>
<query>select uniq(number) from (select DISTINCT number from numbers(1000000))</query>
<query>select uniq(number) from (select number from numbers(1000000) group by number)</query>
<!--For new analyzer-->
<query>select uniq(number) from (select DISTINCT number from numbers(1000000)) SETTINGS allow_experimental_analyzer=1</query>
<query>select uniq(number) from (select number from numbers(1000000) group by number) SETTINGS allow_experimental_analyzer=1</query>
</test>

View File

@ -1 +1,2 @@
default test_table value_index minmax minmax value 1 38 12 24
default test_table value_index minmax minmax value 1 38 12 24

View File

@ -12,4 +12,10 @@ ORDER BY key SETTINGS compress_marks=false;
INSERT INTO test_table VALUES (0, 'Value');
SELECT * FROM system.data_skipping_indices WHERE database = currentDatabase();
ALTER TABLE test_table DROP INDEX value_index;
ALTER TABLE test_table ADD INDEX value_index value TYPE minmax GRANULARITY 1;
ALTER TABLE test_table MATERIALIZE INDEX value_index SETTINGS mutations_sync=1;
SELECT * FROM system.data_skipping_indices WHERE database = currentDatabase();
DROP TABLE test_table;

View File

@ -4,7 +4,7 @@ INSERT INTO test_parallel_replicas_unavailable_shards SELECT * FROM numbers(10);
SYSTEM FLUSH LOGS;
SET skip_unavailable_shards=1, allow_experimental_parallel_reading_from_replicas=1, max_parallel_replicas=11, use_hedged_requests=0, cluster_for_parallel_replicas='parallel_replicas', parallel_replicas_for_non_replicated_merge_tree=1;
SET allow_experimental_parallel_reading_from_replicas=2, max_parallel_replicas=11, use_hedged_requests=0, cluster_for_parallel_replicas='parallel_replicas', parallel_replicas_for_non_replicated_merge_tree=1;
SET send_logs_level='error';
SELECT count() FROM test_parallel_replicas_unavailable_shards WHERE NOT ignore(*);

View File

@ -0,0 +1,4 @@
1
1
02841_summary_default_interactive_0 2
02841_summary_default_interactive_high 2

Some files were not shown because too many files have changed in this diff Show More