Merge remote-tracking branch 'rschu1ze/master' into clang-17

Robert Schulze 2023-09-20 09:15:58 +00:00
commit cbcff6ef08
45 changed files with 995 additions and 118 deletions

.github/workflows/libfuzzer.yml vendored Normal file
View File

@ -0,0 +1,95 @@
name: libFuzzer
env:
# Force the stdout and stderr streams to be unbuffered
PYTHONUNBUFFERED: 1
on: # yamllint disable-line rule:truthy
# schedule:
# - cron: '0 0 2 31 1' # never for now
workflow_call:
jobs:
BuilderFuzzers:
runs-on: [self-hosted, builder]
steps:
- name: Set envs
run: |
cat >> "$GITHUB_ENV" << 'EOF'
TEMP_PATH=${{runner.temp}}/build_check
IMAGES_PATH=${{runner.temp}}/images_path
REPO_COPY=${{runner.temp}}/build_check/ClickHouse
CACHES_PATH=${{runner.temp}}/../ccaches
BUILD_NAME=fuzzers
EOF
- name: Download changed images
# even if the artifact does not exist, e.g. on the `do not test` label or a failed Docker job
continue-on-error: true
uses: actions/download-artifact@v3
with:
name: changed_images
path: ${{ env.IMAGES_PATH }}
- name: Check out repository code
uses: ClickHouse/checkout@v1
with:
clear-repository: true
submodules: true
ref: ${{github.ref}}
- name: Build
run: |
sudo rm -fr "$TEMP_PATH"
mkdir -p "$TEMP_PATH"
cp -r "$GITHUB_WORKSPACE" "$TEMP_PATH"
cd "$REPO_COPY/tests/ci" && python3 build_check.py "$BUILD_NAME"
- name: Upload build URLs to artifacts
if: ${{ success() || failure() }}
uses: actions/upload-artifact@v3
with:
name: ${{ env.BUILD_URLS }}
path: ${{ env.TEMP_PATH }}/${{ env.BUILD_URLS }}.json
- name: Cleanup
if: always()
run: |
docker ps --quiet | xargs --no-run-if-empty docker kill ||:
docker ps --all --quiet | xargs --no-run-if-empty docker rm -f ||:
sudo rm -fr "$TEMP_PATH" "$CACHES_PATH"
libFuzzerTest:
needs: [BuilderFuzzers]
runs-on: [self-hosted, func-tester]
steps:
- name: Set envs
run: |
cat >> "$GITHUB_ENV" << 'EOF'
TEMP_PATH=${{runner.temp}}/libfuzzer
REPORTS_PATH=${{runner.temp}}/reports_dir
CHECK_NAME=libFuzzer tests
REPO_COPY=${{runner.temp}}/libfuzzer/ClickHouse
KILL_TIMEOUT=10800
EOF
- name: Download changed images
# even if the artifact does not exist, e.g. on the `do not test` label or a failed Docker job
continue-on-error: true
uses: actions/download-artifact@v3
with:
name: changed_images
path: ${{ env.TEMP_PATH }}
- name: Download json reports
uses: actions/download-artifact@v3
with:
path: ${{ env.REPORTS_PATH }}
- name: Check out repository code
uses: ClickHouse/checkout@v1
with:
clear-repository: true
- name: libFuzzer test
run: |
sudo rm -fr "$TEMP_PATH"
mkdir -p "$TEMP_PATH"
cp -r "$GITHUB_WORKSPACE" "$TEMP_PATH"
cd "$REPO_COPY/tests/ci"
python3 libfuzzer_test_check.py "$CHECK_NAME" "$KILL_TIMEOUT"
- name: Cleanup
if: always()
run: |
docker ps --quiet | xargs --no-run-if-empty docker kill ||:
docker ps --all --quiet | xargs --no-run-if-empty docker rm -f ||:
sudo rm -fr "$TEMP_PATH"

View File

@ -5187,9 +5187,16 @@ jobs:
cd "$GITHUB_WORKSPACE/tests/ci"
python3 finish_check.py
python3 merge_pr.py --check-approved
##############################################################################################
########################### SQLLOGIC TEST ###################################################
##############################################################################################
#############################################################################################
####################################### libFuzzer ###########################################
#############################################################################################
libFuzzer:
if: contains(github.event.pull_request.labels.*.name, 'libFuzzer')
needs: [DockerHubPush, StyleCheck]
uses: ./.github/workflows/libfuzzer.yml
##############################################################################################
############################ SQLLOGIC TEST ###################################################
##############################################################################################
SQLLogicTestRelease:
needs: [BuilderDebRelease]
runs-on: [self-hosted, func-tester]

View File

@ -19,6 +19,7 @@ include (cmake/tools.cmake)
include (cmake/ccache.cmake)
include (cmake/clang_tidy.cmake)
include (cmake/git.cmake)
include (cmake/utils.cmake)
# Ignore export() since we don't use it,
# but it gets broken with a global targets via link_libraries()
@ -551,22 +552,6 @@ add_subdirectory (programs)
add_subdirectory (tests)
add_subdirectory (utils)
# Function get_all_targets collects all targets recursively
function(get_all_targets var)
macro(get_all_targets_recursive targets dir)
get_property(subdirectories DIRECTORY ${dir} PROPERTY SUBDIRECTORIES)
foreach(subdir ${subdirectories})
get_all_targets_recursive(${targets} ${subdir})
endforeach()
get_property(current_targets DIRECTORY ${dir} PROPERTY BUILDSYSTEM_TARGETS)
list(APPEND ${targets} ${current_targets})
endmacro()
set(targets)
get_all_targets_recursive(targets ${CMAKE_CURRENT_SOURCE_DIR})
set(${var} ${targets} PARENT_SCOPE)
endfunction()
if (FUZZER)
# Bundle fuzzers target
add_custom_target(fuzzers)
@ -581,14 +566,18 @@ if (FUZZER)
# clickhouse fuzzer isn't working correctly
# initial PR https://github.com/ClickHouse/ClickHouse/pull/27526
#if (target MATCHES ".+_fuzzer" OR target STREQUAL "clickhouse")
if (target MATCHES ".+_fuzzer")
if (target_type STREQUAL "EXECUTABLE" AND target MATCHES ".+_fuzzer")
message(STATUS "${target} instrumented with fuzzer")
target_link_libraries(${target} PUBLIC ch_contrib::fuzzer)
# Add to fuzzers bundle
add_dependencies(fuzzers ${target})
get_target_filename(${target} target_bin_name)
get_target_property(target_bin_dir ${target} BINARY_DIR)
add_custom_command(TARGET fuzzers POST_BUILD COMMAND mv "${target_bin_dir}/${target_bin_name}" "${CMAKE_CURRENT_BINARY_DIR}/programs/" VERBATIM)
endif()
endif()
endforeach()
add_custom_command(TARGET fuzzers POST_BUILD COMMAND SRC=${CMAKE_SOURCE_DIR} BIN=${CMAKE_BINARY_DIR} OUT=${CMAKE_BINARY_DIR}/programs ${CMAKE_SOURCE_DIR}/tests/fuzz/build.sh VERBATIM)
endif()
include (cmake/sanitize_targets.cmake)

cmake/utils.cmake Normal file
View File

@ -0,0 +1,120 @@
# Useful stuff
# Function get_all_targets collects all targets recursively
function(get_all_targets outvar)
macro(get_all_targets_recursive targets dir)
get_property(subdirectories DIRECTORY ${dir} PROPERTY SUBDIRECTORIES)
foreach(subdir ${subdirectories})
get_all_targets_recursive(${targets} ${subdir})
endforeach()
get_property(current_targets DIRECTORY ${dir} PROPERTY BUILDSYSTEM_TARGETS)
list(APPEND ${targets} ${current_targets})
endmacro()
set(targets)
get_all_targets_recursive(targets ${CMAKE_CURRENT_SOURCE_DIR})
set(${outvar} ${targets} PARENT_SCOPE)
endfunction()
# Function get_target_filename calculates target's output file name
function(get_target_filename target outvar)
get_target_property(prop_type "${target}" TYPE)
get_target_property(prop_is_framework "${target}" FRAMEWORK)
get_target_property(prop_outname "${target}" OUTPUT_NAME)
get_target_property(prop_archive_outname "${target}" ARCHIVE_OUTPUT_NAME)
get_target_property(prop_library_outname "${target}" LIBRARY_OUTPUT_NAME)
get_target_property(prop_runtime_outname "${target}" RUNTIME_OUTPUT_NAME)
# message("prop_archive_outname: ${prop_archive_outname}")
# message("prop_library_outname: ${prop_library_outname}")
# message("prop_runtime_outname: ${prop_runtime_outname}")
if(DEFINED CMAKE_BUILD_TYPE)
get_target_property(prop_cfg_outname "${target}" "${OUTPUT_NAME}_${CMAKE_BUILD_TYPE}")
get_target_property(prop_archive_cfg_outname "${target}" "${ARCHIVE_OUTPUT_NAME}_${CMAKE_BUILD_TYPE}")
get_target_property(prop_library_cfg_outname "${target}" "${LIBRARY_OUTPUT_NAME}_${CMAKE_BUILD_TYPE}")
get_target_property(prop_runtime_cfg_outname "${target}" "${RUNTIME_OUTPUT_NAME}_${CMAKE_BUILD_TYPE}")
# message("prop_archive_cfg_outname: ${prop_archive_cfg_outname}")
# message("prop_library_cfg_outname: ${prop_library_cfg_outname}")
# message("prop_runtime_cfg_outname: ${prop_runtime_cfg_outname}")
if(NOT ("${prop_cfg_outname}" STREQUAL "prop_cfg_outname-NOTFOUND"))
set(prop_outname "${prop_cfg_outname}")
endif()
if(NOT ("${prop_archive_cfg_outname}" STREQUAL "prop_archive_cfg_outname-NOTFOUND"))
set(prop_archive_outname "${prop_archive_cfg_outname}")
endif()
if(NOT ("${prop_library_cfg_outname}" STREQUAL "prop_library_cfg_outname-NOTFOUND"))
set(prop_library_outname "${prop_library_cfg_outname}")
endif()
if(NOT ("${prop_runtime_cfg_outname}" STREQUAL "prop_runtime_cfg_outname-NOTFOUND"))
set(prop_runtime_outname "${prop_runtime_cfg_outname}")
endif()
endif()
set(outname "${target}")
if(NOT ("${prop_outname}" STREQUAL "prop_outname-NOTFOUND"))
set(outname "${prop_outname}")
endif()
if("${prop_is_framework}")
set(filename "${outname}")
elseif(prop_type STREQUAL "STATIC_LIBRARY")
if(NOT ("${prop_archive_outname}" STREQUAL "prop_archive_outname-NOTFOUND"))
set(outname "${prop_archive_outname}")
endif()
set(filename "${CMAKE_STATIC_LIBRARY_PREFIX}${outname}${CMAKE_STATIC_LIBRARY_SUFFIX}")
elseif(prop_type STREQUAL "MODULE_LIBRARY")
if(NOT ("${prop_library_outname}" STREQUAL "prop_library_outname-NOTFOUND"))
set(outname "${prop_library_outname}")
endif()
set(filename "${CMAKE_SHARED_MODULE_LIBRARY_PREFIX}${outname}${CMAKE_SHARED_MODULE_LIBRARY_SUFFIX}")
elseif(prop_type STREQUAL "SHARED_LIBRARY")
if(WIN32)
if(NOT ("${prop_runtime_outname}" STREQUAL "prop_runtime_outname-NOTFOUND"))
set(outname "${prop_runtime_outname}")
endif()
else()
if(NOT ("${prop_library_outname}" STREQUAL "prop_library_outname-NOTFOUND"))
set(outname "${prop_library_outname}")
endif()
endif()
set(filename "${CMAKE_SHARED_LIBRARY_PREFIX}${outname}${CMAKE_SHARED_LIBRARY_SUFFIX}")
elseif(prop_type STREQUAL "EXECUTABLE")
if(NOT ("${prop_runtime_outname}" STREQUAL "prop_runtime_outname-NOTFOUND"))
set(outname "${prop_runtime_outname}")
endif()
set(filename "${CMAKE_EXECUTABLE_PREFIX}${outname}${CMAKE_EXECUTABLE_SUFFIX}")
else()
message(FATAL_ERROR "target \"${target}\" is not of type STATIC_LIBRARY, MODULE_LIBRARY, SHARED_LIBRARY, or EXECUTABLE.")
endif()
set("${outvar}" "${filename}" PARENT_SCOPE)
endfunction()
# Function get_cmake_properties returns the list of all properties that CMake supports
function(get_cmake_properties outvar)
execute_process(COMMAND cmake --help-property-list OUTPUT_VARIABLE cmake_properties)
# Convert command output into a CMake list
string(REGEX REPLACE ";" "\\\\;" cmake_properties "${cmake_properties}")
string(REGEX REPLACE "\n" ";" cmake_properties "${cmake_properties}")
list(REMOVE_DUPLICATES cmake_properties)
set("${outvar}" "${cmake_properties}" PARENT_SCOPE)
endfunction()
# Function get_target_property_list returns the list of all properties set for a target
function(get_target_property_list target outvar)
get_cmake_properties(cmake_property_list)
foreach(property ${cmake_property_list})
string(REPLACE "<CONFIG>" "${CMAKE_BUILD_TYPE}" property ${property})
# https://stackoverflow.com/questions/32197663/how-can-i-remove-the-the-location-property-may-not-be-read-from-target-error-i
if(property STREQUAL "LOCATION" OR property MATCHES "^LOCATION_" OR property MATCHES "_LOCATION$")
continue()
endif()
get_property(was_set TARGET ${target} PROPERTY ${property} SET)
if(was_set)
get_target_property(value ${target} ${property})
string(REGEX REPLACE ";" "\\\\\\\\;" value "${value}")
list(APPEND outvar "${property} = ${value}")
endif()
endforeach()
set(${outvar} ${${outvar}} PARENT_SCOPE)
endfunction()

View File

@ -19,6 +19,10 @@
"name": "clickhouse/fuzzer",
"dependent": []
},
"docker/test/libfuzzer": {
"name": "clickhouse/libfuzzer",
"dependent": []
},
"docker/test/performance-comparison": {
"name": "clickhouse/performance-comparison",
"dependent": []
@ -115,6 +119,7 @@
"name": "clickhouse/test-base",
"dependent": [
"docker/test/fuzzer",
"docker/test/libfuzzer",
"docker/test/integration/base",
"docker/test/keeper-jepsen",
"docker/test/server-jepsen",

View File

@ -78,6 +78,7 @@ RUN add-apt-repository ppa:ubuntu-toolchain-r/test --yes \
python3-boto3 \
yasm \
zstd \
zip \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists

View File

@ -97,11 +97,10 @@ if [ -n "$MAKE_DEB" ]; then
bash -x /build/packages/build
fi
if [ "$BUILD_TARGET" != "fuzzers" ]; then
mv ./programs/clickhouse* /output
[ -x ./programs/self-extracting/clickhouse ] && mv ./programs/self-extracting/clickhouse /output
mv ./src/unit_tests_dbms /output ||: # may not exist for some binary builds
fi
mv ./programs/clickhouse* /output || mv ./programs/*_fuzzer /output
[ -x ./programs/self-extracting/clickhouse ] && mv ./programs/self-extracting/clickhouse /output
mv ./src/unit_tests_dbms /output ||: # may not exist for some binary builds
mv ./programs/*.dict ./programs/*.options ./programs/*_seed_corpus.zip /output ||: # libFuzzer oss-fuzz compatible infrastructure
prepare_combined_output () {
local OUTPUT

View File

@ -0,0 +1,43 @@
ARG FROM_TAG=latest
FROM clickhouse/test-base:$FROM_TAG
# ARG for quick switch to a given ubuntu mirror
ARG apt_archive="http://archive.ubuntu.com"
RUN sed -i "s|http://archive.ubuntu.com|$apt_archive|g" /etc/apt/sources.list
ENV LANG=C.UTF-8
ENV TZ=Europe/Amsterdam
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
RUN apt-get update \
&& DEBIAN_FRONTEND=noninteractive apt-get install --yes --no-install-recommends \
ca-certificates \
libc6-dbg \
moreutils \
ncdu \
p7zip-full \
parallel \
psmisc \
python3 \
python3-pip \
rsync \
tree \
tzdata \
vim \
wget \
&& apt-get autoremove --yes \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*
RUN pip3 install Jinja2
COPY * /
ENV FUZZER_ARGS="-max_total_time=60"
SHELL ["/bin/bash", "-c"]
CMD set -o pipefail \
&& timeout -s 9 1h /run_libfuzzer.py 2>&1 | ts "$(printf '%%Y-%%m-%%d %%H:%%M:%%S\t')" | tee main.log
# docker run --network=host --volume <workspace>:/workspace -e PR_TO_TEST=<> -e SHA_TO_TEST=<> clickhouse/libfuzzer

View File

@ -0,0 +1,77 @@
#!/usr/bin/env python3
import configparser
import logging
import os
from pathlib import Path
import subprocess
DEBUGGER = os.getenv("DEBUGGER", "")
FUZZER_ARGS = os.getenv("FUZZER_ARGS", "")
def run_fuzzer(fuzzer: str):
logging.info(f"Running fuzzer {fuzzer}...")
corpus_dir = f"{fuzzer}.in"
with Path(corpus_dir) as path:
if not path.exists() or not path.is_dir():
corpus_dir = ""
options_file = f"{fuzzer}.options"
custom_libfuzzer_options = ""
with Path(options_file) as path:
if path.exists() and path.is_file():
parser = configparser.ConfigParser()
parser.read(path)
if parser.has_section("asan"):
os.environ[
"ASAN_OPTIONS"
] = f"{os.environ['ASAN_OPTIONS']}:{':'.join('%s=%s' % (key, value) for key, value in parser['asan'].items())}"
if parser.has_section("msan"):
os.environ[
"MSAN_OPTIONS"
] = f"{os.environ['MSAN_OPTIONS']}:{':'.join('%s=%s' % (key, value) for key, value in parser['msan'].items())}"
if parser.has_section("ubsan"):
os.environ[
"UBSAN_OPTIONS"
] = f"{os.environ['UBSAN_OPTIONS']}:{':'.join('%s=%s' % (key, value) for key, value in parser['ubsan'].items())}"
if parser.has_section("libfuzzer"):
custom_libfuzzer_options = " ".join(
"-%s=%s" % (key, value)
for key, value in parser["libfuzzer"].items()
)
cmd_line = f"{DEBUGGER} ./{fuzzer} {FUZZER_ARGS} {corpus_dir}"
if custom_libfuzzer_options:
cmd_line += f" {custom_libfuzzer_options}"
if not "-dict=" in cmd_line and Path(f"{fuzzer}.dict").exists():
cmd_line += f" -dict={fuzzer}.dict"
cmd_line += " < /dev/null"
logging.info(f"...will execute: {cmd_line}")
subprocess.check_call(cmd_line, shell=True)
def main():
logging.basicConfig(level=logging.INFO)
subprocess.check_call("ls -al", shell=True)
with Path() as current:
for fuzzer in current.iterdir():
if (current / fuzzer).is_file() and os.access(current / fuzzer, os.X_OK):
run_fuzzer(fuzzer)
exit(0)
if __name__ == "__main__":
main()
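
For reference, a hypothetical `<fuzzer>.options` file and the flags run_libfuzzer.py would derive from it. The section contents below are invented for illustration; only the section names ([libfuzzer], [asan], ...) match what the script looks for.

```python
# Hypothetical example: how a `<fuzzer>.options` file maps to libFuzzer flags
# and sanitizer options, mirroring the parsing logic of run_libfuzzer.py above.
import configparser
import io

example_options = """
[libfuzzer]
max_len = 4096
timeout = 30

[asan]
detect_leaks = 1
"""

parser = configparser.ConfigParser()
parser.read_file(io.StringIO(example_options))

# [libfuzzer] entries become "-key=value" command-line flags.
libfuzzer_flags = " ".join(f"-{key}={value}" for key, value in parser["libfuzzer"].items())
# [asan]/[msan]/[ubsan] entries are appended to the corresponding *_OPTIONS env var.
asan_options = ":".join(f"{key}={value}" for key, value in parser["asan"].items())

print(libfuzzer_flags)  # -max_len=4096 -timeout=30
print(asan_options)     # detect_leaks=1
```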

View File

@ -157,7 +157,7 @@ At this time, it is not checked for one of the sorting stages, or when merging a
The `max_execution_time` parameter can be a bit tricky to understand.
It operates based on interpolation relative to the current query execution speed (this behaviour is controlled by [timeout_before_checking_execution_speed](#timeout-before-checking-execution-speed)).
ClickHouse will interrupt a query if the projected execution time exceeds the specified `max_execution_time`.
By default, the timeout_before_checking_execution_speed is set to 1 second. This means that after just one second of query execution, ClickHouse will begin estimating the total execution time.
By default, the timeout_before_checking_execution_speed is set to 10 seconds. This means that after 10 seconds of query execution, ClickHouse will begin estimating the total execution time.
If, for example, `max_execution_time` is set to 3600 seconds (1 hour), ClickHouse will terminate the query if the estimated time exceeds this 3600-second limit.
If you set `timeout_before_checking_execution_speed` to 0, ClickHouse will use clock time as the basis for `max_execution_time`.
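
To make the projection concrete, below is a minimal Python sketch of the kind of check described above. The function name, its arguments, and the row-count progress measure are illustrative assumptions, not ClickHouse's actual implementation.

```python
# Illustrative sketch only: interrupt a query when the execution time
# projected from the current speed exceeds max_execution_time, but only
# after timeout_before_checking_execution_speed has elapsed.
def should_interrupt(elapsed_s: float, rows_read: int, total_rows: int,
                     max_execution_time_s: float,
                     timeout_before_checking_s: float = 10.0) -> bool:
    # During the initial grace period no projection-based check is done.
    if elapsed_s < timeout_before_checking_s or rows_read == 0:
        return False
    # Extrapolate the total runtime from the current execution speed.
    projected_total_s = elapsed_s * total_rows / rows_read
    return projected_total_s > max_execution_time_s

# After 10 s the query has read 1% of the rows, so the projected total is
# ~1000 s: it keeps running with max_execution_time = 3600 but would be
# interrupted with max_execution_time = 600.
print(should_interrupt(10.0, 1_000, 100_000, 3600.0))  # False
print(should_interrupt(10.0, 1_000, 100_000, 600.0))   # True
```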

View File

@ -169,7 +169,7 @@ class IColumn;
M(String, parallel_replicas_custom_key, "", "Custom key assigning work to replicas when parallel replicas are used.", 0) \
M(ParallelReplicasCustomKeyFilterType, parallel_replicas_custom_key_filter_type, ParallelReplicasCustomKeyFilterType::DEFAULT, "Type of filter to use with custom key for parallel replicas. default - use modulo operation on the custom key, range - use range filter on custom key using all possible values for the value type of custom key.", 0) \
\
M(String, cluster_for_parallel_replicas, "default", "Cluster for a shard in which current server is located", 0) \
M(String, cluster_for_parallel_replicas, "", "Cluster for a shard in which current server is located", 0) \
M(UInt64, allow_experimental_parallel_reading_from_replicas, 0, "Use all the replicas from a shard for SELECT query execution. Reading is parallelized and coordinated dynamically. 0 - disabled, 1 - enabled, silently disable them in case of failure, 2 - enabled, throw an exception in case of failure", 0) \
M(Float, parallel_replicas_single_task_marks_count_multiplier, 2, "A multiplier which will be added during calculation for minimal number of marks to retrieve from coordinator. This will be applied only for remote replicas.", 0) \
M(Bool, parallel_replicas_for_non_replicated_merge_tree, false, "If true, ClickHouse will use parallel replicas algorithm also for non-replicated MergeTree tables", 0) \
@ -796,7 +796,7 @@ class IColumn;
M(UInt64, grace_hash_join_initial_buckets, 1, "Initial number of grace hash join buckets", 0) \
M(UInt64, grace_hash_join_max_buckets, 1024, "Limit on the number of grace hash join buckets", 0) \
M(Bool, optimize_distinct_in_order, true, "Enable DISTINCT optimization if some columns in DISTINCT form a prefix of sorting. For example, prefix of sorting key in merge tree or ORDER BY statement", 0) \
M(Bool, allow_experimental_undrop_table_query, false, "Allow to use undrop query to restore dropped table in a limited time", 0) \
M(Bool, allow_experimental_undrop_table_query, true, "Allow to use undrop query to restore dropped table in a limited time", 0) \
M(Bool, keeper_map_strict_mode, false, "Enforce additional checks during operations on KeeperMap. E.g. throw an exception on an insert for already existing key", 0) \
M(UInt64, extract_kvp_max_pairs_per_row, 1000, "Max number pairs that can be produced by extractKeyValuePairs function. Used to safeguard against consuming too much memory.", 0) \
M(Timezone, session_timezone, "", "This setting can be removed in the future due to potential caveats. It is experimental and is not suitable for production usage. The default timezone for current session or query. The server default timezone if empty.", 0) \

View File

@ -687,10 +687,9 @@ namespace JSONUtils
return names_and_types;
}
NamesAndTypesList readMetadataAndValidateHeader(ReadBuffer & in, const Block & header)
void validateMetadataByHeader(const NamesAndTypesList & names_and_types_from_metadata, const Block & header)
{
auto names_and_types = JSONUtils::readMetadata(in);
for (const auto & [name, type] : names_and_types)
for (const auto & [name, type] : names_and_types_from_metadata)
{
if (!header.has(name))
continue;
@ -698,10 +697,16 @@ namespace JSONUtils
auto header_type = header.getByName(name).type;
if (!type->equals(*header_type))
throw Exception(
ErrorCodes::INCORRECT_DATA,
"Type {} of column '{}' from metadata is not the same as type in header {}",
type->getName(), name, header_type->getName());
ErrorCodes::INCORRECT_DATA,
"Type {} of column '{}' from metadata is not the same as type in header {}",
type->getName(), name, header_type->getName());
}
}
NamesAndTypesList readMetadataAndValidateHeader(ReadBuffer & in, const Block & header)
{
auto names_and_types = JSONUtils::readMetadata(in);
validateMetadataByHeader(names_and_types, header);
return names_and_types;
}

View File

@ -124,6 +124,7 @@ namespace JSONUtils
NamesAndTypesList readMetadata(ReadBuffer & in);
NamesAndTypesList readMetadataAndValidateHeader(ReadBuffer & in, const Block & header);
void validateMetadataByHeader(const NamesAndTypesList & names_and_types_from_metadata, const Block & header);
bool skipUntilFieldInObject(ReadBuffer & in, const String & desired_field_name);
void skipTheRestOfObject(ReadBuffer & in);

View File

@ -5,6 +5,7 @@
#include <Storages/IStorage.h>
#include <Common/assert_cast.h>
#include <IO/WithFileName.h>
#include <IO/WithFileSize.h>
namespace DB
@ -86,7 +87,16 @@ try
buf = read_buffer_iterator.next();
if (!buf)
break;
is_eof = buf->eof();
/// We just want to check for eof, but eof() can be pretty expensive.
/// So we use getFileSize() when available, which has a better worst case.
/// (For remote files, typically eof() would read 1 MB from S3, which may be much
/// more than what the schema reader and even data reader will read).
auto size = tryGetFileSizeFromReadBuffer(*buf);
if (size.has_value())
is_eof = *size == 0;
else
is_eof = buf->eof();
}
catch (Exception & e)
{

View File

@ -53,7 +53,6 @@ public:
}
bool useDefaultImplementationForConstants() const override { return true; }
ColumnNumbers getArgumentsThatAreAlwaysConstant() const override { return {0, 2}; }
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override
{

View File

@ -114,7 +114,8 @@ void SelectStreamFactory::createForShard(
ContextPtr context,
std::vector<QueryPlanPtr> & local_plans,
Shards & remote_shards,
UInt32 shard_count)
UInt32 shard_count,
bool parallel_replicas_enabled)
{
auto it = objects_by_shard.find(shard_info.shard_num);
if (it != objects_by_shard.end())
@ -146,7 +147,10 @@ void SelectStreamFactory::createForShard(
return;
});
if (settings.prefer_localhost_replica && shard_info.isLocal())
// prefer_localhost_replica is not effective in case of parallel replicas
// (1) prefer_localhost_replica is about choosing one replica on a shard
// (2) parallel replica coordinator has its own logic to choose replicas to read from
if (settings.prefer_localhost_replica && shard_info.isLocal() && !parallel_replicas_enabled)
{
StoragePtr main_table_storage;
@ -187,7 +191,7 @@ void SelectStreamFactory::createForShard(
return;
}
UInt64 max_allowed_delay = settings.max_replica_delay_for_distributed_queries;
const UInt64 max_allowed_delay = settings.max_replica_delay_for_distributed_queries;
if (!max_allowed_delay)
{

View File

@ -78,7 +78,8 @@ public:
ContextPtr context,
std::vector<QueryPlanPtr> & local_plans,
Shards & remote_shards,
UInt32 shard_count);
UInt32 shard_count,
bool parallel_replicas_enabled);
struct ShardPlans
{

View File

@ -178,9 +178,12 @@ void executeQuery(
main_table, query_info.additional_filter_ast, log);
new_context->increaseDistributedDepth();
size_t shards = query_info.getCluster()->getShardCount();
for (const auto & shard_info : query_info.getCluster()->getShardsInfo())
ClusterPtr cluster = query_info.getCluster();
const size_t shards = cluster->getShardCount();
for (size_t i = 0, s = cluster->getShardsInfo().size(); i < s; ++i)
{
const auto & shard_info = cluster->getShardsInfo()[i];
ASTPtr query_ast_for_shard = query_ast->clone();
if (sharding_key_expr && query_info.optimized_cluster && settings.optimize_skip_unused_shards_rewrite_in && shards > 1)
{
@ -210,9 +213,15 @@ void executeQuery(
}
}
// decide for each shard if parallel reading from replicas should be enabled
// according to settings and number of replicas declared per shard
const auto & addresses = cluster->getShardsAddresses().at(i);
bool parallel_replicas_enabled = addresses.size() > 1 && context->canUseParallelReplicas();
stream_factory.createForShard(shard_info,
query_ast_for_shard, main_table, table_func_ptr,
new_context, plans, remote_shards, static_cast<UInt32>(shards));
new_context, plans, remote_shards, static_cast<UInt32>(shards),
parallel_replicas_enabled);
}
if (!remote_shards.empty())
@ -236,7 +245,7 @@ void executeQuery(
log,
shards,
query_info.storage_limits,
query_info.getCluster()->getName());
not_optimized_cluster->getName());
read_from_remote->setStepDescription("Read from remote replica");
plan->addStep(std::move(read_from_remote));

View File

@ -4629,18 +4629,20 @@ Context::ParallelReplicasMode Context::getParallelReplicasMode() const
return SAMPLE_KEY;
}
bool Context::canUseParallelReplicasOnInitiator() const
bool Context::canUseParallelReplicas() const
{
const auto & settings_ref = getSettingsRef();
return getParallelReplicasMode() == ParallelReplicasMode::READ_TASKS && settings_ref.max_parallel_replicas > 1
&& !getClientInfo().collaborate_with_initiator;
return getParallelReplicasMode() == ParallelReplicasMode::READ_TASKS && settings_ref.max_parallel_replicas > 1;
}
bool Context::canUseParallelReplicasOnInitiator() const
{
return canUseParallelReplicas() && !getClientInfo().collaborate_with_initiator;
}
bool Context::canUseParallelReplicasOnFollower() const
{
const auto & settings_ref = getSettingsRef();
return getParallelReplicasMode() == ParallelReplicasMode::READ_TASKS && settings_ref.max_parallel_replicas > 1
&& getClientInfo().collaborate_with_initiator;
return canUseParallelReplicas() && getClientInfo().collaborate_with_initiator;
}
void Context::setPreparedSetsCache(const PreparedSetsCachePtr & cache)

View File

@ -1182,6 +1182,7 @@ public:
WriteSettings getWriteSettings() const;
/** There are multiple conditions that have to be met to be able to use parallel replicas */
bool canUseParallelReplicas() const;
bool canUseParallelReplicasOnInitiator() const;
bool canUseParallelReplicasOnFollower() const;

View File

@ -95,6 +95,7 @@ namespace ErrorCodes
extern const int SUSPICIOUS_TYPE_FOR_LOW_CARDINALITY;
extern const int ILLEGAL_SYNTAX_FOR_DATA_TYPE;
extern const int ILLEGAL_COLUMN;
extern const int ILLEGAL_INDEX;
extern const int LOGICAL_ERROR;
extern const int UNKNOWN_DATABASE;
extern const int PATH_ACCESS_DENIED;
@ -697,6 +698,8 @@ InterpreterCreateQuery::TableProperties InterpreterCreateQuery::getTableProperti
for (const auto & index : create.columns_list->indices->children)
{
IndexDescription index_desc = IndexDescription::getIndexFromAST(index->clone(), properties.columns, getContext());
if (properties.indices.has(index_desc.name))
throw Exception(ErrorCodes::ILLEGAL_INDEX, "Duplicated index name {}", backQuoteIfNeed(index_desc.name));
const auto & settings = getContext()->getSettingsRef();
if (index_desc.type == INVERTED_INDEX_NAME && !settings.allow_experimental_inverted_index)
{
@ -711,6 +714,7 @@ InterpreterCreateQuery::TableProperties InterpreterCreateQuery::getTableProperti
properties.indices.push_back(index_desc);
}
if (create.columns_list->projections)
for (const auto & projection_ast : create.columns_list->projections->children)
{

View File

@ -18,6 +18,7 @@
#include <string_view>
#include <regex>
#include <gtest/gtest.h>
#include <boost/algorithm/string/replace.hpp>
namespace
{
@ -39,7 +40,11 @@ std::ostream & operator<<(std::ostream & ostr, const std::shared_ptr<IParser> pa
std::ostream & operator<<(std::ostream & ostr, const ParserTestCase & test_case)
{
return ostr << "ParserTestCase input: " << test_case.input_text;
// New line characters are removed because, at the time of writing, the unit test results are parsed from the
// command-line output, and multi-line string representations break the parsing logic.
std::string input_text{test_case.input_text};
boost::replace_all(input_text, "\n", "\\n");
return ostr << "ParserTestCase input: " << input_text;
}
class ParserTest : public ::testing::TestWithParam<std::tuple<std::shared_ptr<IParser>, ParserTestCase>>
@ -494,11 +499,11 @@ INSTANTIATE_TEST_SUITE_P(
::testing::Values(std::make_shared<ParserPRQLQuery>(kDummyMaxQuerySize, kDummyMaxParserDepth)),
::testing::ValuesIn(std::initializer_list<ParserTestCase>{
{
"from albums\ngroup [author_id] (\n aggregate [first_pushlied = min published]\n)\njoin a=author side:left [==author_id]\njoin p=purchases side:right [==author_id]\ngroup [a.id, p.purchase_id] (\n aggregate [avg_sell = min first_pushlied]\n)",
"WITH table_1 AS\n (\n SELECT\n MIN(published) AS _expr_0,\n author_id\n FROM albums\n GROUP BY author_id\n )\nSELECT\n a.id,\n p.purchase_id,\n MIN(table_0._expr_0) AS avg_sell\nFROM table_1 AS table_0\nLEFT JOIN author AS a ON table_0.author_id = a.author_id\nRIGHT JOIN purchases AS p ON table_0.author_id = p.author_id\nGROUP BY\n a.id,\n p.purchase_id",
"from albums\ngroup {author_id} (\n aggregate {first_published = min published}\n)\njoin a=author side:left (==author_id)\njoin p=purchases side:right (==author_id)\ngroup {a.id, p.purchase_id} (\n aggregate {avg_sell = min first_published}\n)",
"WITH table_0 AS\n (\n SELECT\n MIN(published) AS _expr_0,\n author_id\n FROM albums\n GROUP BY author_id\n )\nSELECT\n a.id,\n p.purchase_id,\n MIN(table_0._expr_0) AS avg_sell\nFROM table_0\nLEFT JOIN author AS a ON table_0.author_id = a.author_id\nRIGHT JOIN purchases AS p ON table_0.author_id = p.author_id\nGROUP BY\n a.id,\n p.purchase_id",
},
{
"from matches\nfilter start_date > @2023-05-30 # Some comment here\nderive [\n some_derived_value_1 = a + (b ?? 0), # And there\n some_derived_value_2 = c + some_derived_value\n]\nfilter some_derived_value_2 > 0\ngroup [country, city] (\n aggregate [\n average some_derived_value_2,\n aggr = max some_derived_value_2,\n ]\n)\nderive place = f\"{city} in {country}\"\nderive country_code = s\"LEFT(country, 2)\"\nsort [aggr, -country]\ntake 1..20",
"WITH\n table_3 AS\n (\n SELECT\n country,\n city,\n c + some_derived_value AS _expr_1\n FROM matches\n WHERE start_date > toDate('2023-05-30')\n ),\n table_1 AS\n (\n SELECT\n country,\n city,\n AVG(_expr_1) AS _expr_0,\n MAX(_expr_1) AS aggr\n FROM table_3 AS table_2\n WHERE _expr_1 > 0\n GROUP BY\n country,\n city\n )\nSELECT\n country,\n city,\n _expr_0,\n aggr,\n CONCAT(city, ' in ', country) AS place,\n LEFT(country, 2) AS country_code\nFROM table_1 AS table_0\nORDER BY\n aggr ASC,\n country DESC\nLIMIT 20",
"from matches\nfilter start_date > @2023-05-30 # Some comment here\nderive {\n some_derived_value_1 = a + (b ?? 0), # And there\n some_derived_value_2 = c + some_derived_value\n}\nfilter some_derived_value_2 > 0\ngroup {country, city} (\n aggregate {\n average some_derived_value_2,\n aggr = max some_derived_value_2\n }\n)\nderive place = f\"{city} in {country}\"\nderive country_code = s\"LEFT(country, 2)\"\nsort {aggr, -country}\ntake 1..20",
"WITH\n table_1 AS\n (\n SELECT\n country,\n city,\n c + some_derived_value AS _expr_1\n FROM matches\n WHERE start_date > toDate('2023-05-30')\n ),\n table_0 AS\n (\n SELECT\n country,\n city,\n AVG(_expr_1) AS _expr_0,\n MAX(_expr_1) AS aggr\n FROM table_1\n WHERE _expr_1 > 0\n GROUP BY\n country,\n city\n )\nSELECT\n country,\n city,\n _expr_0,\n aggr,\n CONCAT(city, ' in ', country) AS place,\n LEFT(country, 2) AS country_code\nFROM table_0\nORDER BY\n aggr ASC,\n country DESC\nLIMIT 20",
},
})));

View File

@ -32,10 +32,11 @@ public:
String getName() const override { return "JSONEachRowRowInputFormat"; }
void resetParser() override;
private:
protected:
void readPrefix() override;
void readSuffix() override;
private:
bool readRow(MutableColumns & columns, RowReadExtension & ext) override;
bool allowSyncAfterError() const override { return true; }
void syncAfterError() override;

View File

@ -12,42 +12,106 @@ namespace ErrorCodes
}
JSONRowInputFormat::JSONRowInputFormat(ReadBuffer & in_, const Block & header_, Params params_, const FormatSettings & format_settings_)
: JSONEachRowRowInputFormat(in_, header_, params_, format_settings_, false), validate_types_from_metadata(format_settings_.json.validate_types_from_metadata)
: JSONRowInputFormat(std::make_unique<PeekableReadBuffer>(in_), header_, params_, format_settings_)
{
}
JSONRowInputFormat::JSONRowInputFormat(std::unique_ptr<PeekableReadBuffer> buf, const DB::Block & header_, DB::IRowInputFormat::Params params_, const DB::FormatSettings & format_settings_)
: JSONEachRowRowInputFormat(*buf, header_, params_, format_settings_, false), validate_types_from_metadata(format_settings_.json.validate_types_from_metadata), peekable_buf(std::move(buf))
{
}
void JSONRowInputFormat::readPrefix()
{
skipBOMIfExists(*in);
JSONUtils::skipObjectStart(*in);
if (validate_types_from_metadata)
JSONUtils::readMetadataAndValidateHeader(*in, getPort().getHeader());
else
JSONUtils::readMetadata(*in);
skipBOMIfExists(*peekable_buf);
JSONUtils::skipComma(*in);
if (!JSONUtils::skipUntilFieldInObject(*in, "data"))
throw Exception(ErrorCodes::INCORRECT_DATA, "Expected field \"data\" with table content");
PeekableReadBufferCheckpoint checkpoint(*peekable_buf);
NamesAndTypesList names_and_types_from_metadata;
JSONUtils::skipArrayStart(*in);
data_in_square_brackets = true;
/// Try to parse metadata, if failed, try to parse data as JSONEachRow format.
try
{
JSONUtils::skipObjectStart(*peekable_buf);
names_and_types_from_metadata = JSONUtils::readMetadata(*peekable_buf);
JSONUtils::skipComma(*peekable_buf);
if (!JSONUtils::skipUntilFieldInObject(*peekable_buf, "data"))
throw Exception(ErrorCodes::INCORRECT_DATA, "Expected field \"data\" with table content");
JSONUtils::skipArrayStart(*peekable_buf);
data_in_square_brackets = true;
}
catch (const ParsingException &)
{
parse_as_json_each_row = true;
}
catch (const Exception & e)
{
if (e.code() != ErrorCodes::INCORRECT_DATA)
throw;
parse_as_json_each_row = true;
}
if (parse_as_json_each_row)
{
peekable_buf->rollbackToCheckpoint();
JSONEachRowRowInputFormat::readPrefix();
}
else if (validate_types_from_metadata)
{
JSONUtils::validateMetadataByHeader(names_and_types_from_metadata, getPort().getHeader());
}
}
void JSONRowInputFormat::readSuffix()
{
JSONUtils::skipArrayEnd(*in);
JSONUtils::skipTheRestOfObject(*in);
if (parse_as_json_each_row)
{
JSONEachRowRowInputFormat::readSuffix();
}
else
{
JSONUtils::skipArrayEnd(*peekable_buf);
JSONUtils::skipTheRestOfObject(*peekable_buf);
}
}
JSONRowSchemaReader::JSONRowSchemaReader(ReadBuffer & in_) : ISchemaReader(in_)
void JSONRowInputFormat::setReadBuffer(DB::ReadBuffer & in_)
{
peekable_buf->setSubBuffer(in_);
}
void JSONRowInputFormat::resetParser()
{
JSONEachRowRowInputFormat::resetParser();
peekable_buf->reset();
}
JSONRowSchemaReader::JSONRowSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_)
: JSONRowSchemaReader(std::make_unique<PeekableReadBuffer>(in_), format_settings_)
{
}
JSONRowSchemaReader::JSONRowSchemaReader(std::unique_ptr<PeekableReadBuffer> buf, const DB::FormatSettings & format_settings_)
: JSONEachRowSchemaReader(*buf, format_settings_), peekable_buf(std::move(buf))
{
}
NamesAndTypesList JSONRowSchemaReader::readSchema()
{
skipBOMIfExists(in);
JSONUtils::skipObjectStart(in);
return JSONUtils::readMetadata(in);
skipBOMIfExists(*peekable_buf);
PeekableReadBufferCheckpoint checkpoint(*peekable_buf);
/// Try to parse metadata, if failed, try to parse data as JSONEachRow format
try
{
JSONUtils::skipObjectStart(*peekable_buf);
return JSONUtils::readMetadata(*peekable_buf);
}
catch (...)
{
peekable_buf->rollbackToCheckpoint(true);
return JSONEachRowSchemaReader::readSchema();
}
}
void registerInputFormatJSON(FormatFactory & factory)
@ -69,7 +133,7 @@ void registerJSONSchemaReader(FormatFactory & factory)
auto register_schema_reader = [&](const String & format)
{
factory.registerSchemaReader(
format, [](ReadBuffer & buf, const FormatSettings &) { return std::make_unique<JSONRowSchemaReader>(buf); });
format, [](ReadBuffer & buf, const FormatSettings & format_settings) { return std::make_unique<JSONRowSchemaReader>(buf, format_settings); });
};
register_schema_reader("JSON");
/// JSONCompact has the same suffix with metadata.

View File

@ -23,21 +23,38 @@ public:
String getName() const override { return "JSONRowInputFormat"; }
void setReadBuffer(ReadBuffer & in_) override;
void resetParser() override;
private:
JSONRowInputFormat(
std::unique_ptr<PeekableReadBuffer> buf,
const Block & header_,
Params params_,
const FormatSettings & format_settings_);
void readPrefix() override;
void readSuffix() override;
const bool validate_types_from_metadata;
bool parse_as_json_each_row = false;
std::unique_ptr<PeekableReadBuffer> peekable_buf;
std::exception_ptr reading_metadata_exception;
};
class JSONRowSchemaReader : public ISchemaReader
class JSONRowSchemaReader : public JSONEachRowSchemaReader
{
public:
JSONRowSchemaReader(ReadBuffer & in_);
JSONRowSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_);
NamesAndTypesList readSchema() override;
bool hasStrictOrderOfColumns() const override { return false; }
private:
JSONRowSchemaReader(std::unique_ptr<PeekableReadBuffer> buf, const FormatSettings & format_settings_);
std::unique_ptr<PeekableReadBuffer> peekable_buf;
};
}

View File

@ -0,0 +1,26 @@
#pragma once
#include <Processors/Chunk.h>
#include <Interpreters/Context.h>
namespace DB
{
/// This interface is meant to be used by the SinkToStorage processor.
/// SinkToStorage delegates to it the creation of the data chunk that will be delivered to the next stages of the query pipeline.
/// The default implementation (createDefault() factory method) just forwards everything it receives.
class IOutputChunkGenerator
{
public:
static std::unique_ptr<IOutputChunkGenerator> createCopyRanges(bool deduplicate_later);
static std::unique_ptr<IOutputChunkGenerator> createDefault();
virtual ~IOutputChunkGenerator() = default;
virtual void onNewChunkArrived(Chunk chunk) = 0;
virtual void onRowsProcessed(size_t row_count, bool append) = 0;
virtual Chunk generateChunk() = 0;
};
}

View File

@ -0,0 +1,91 @@
#include <Processors/Sinks/IOutputChunkGenerator.h>
namespace DB
{
/// Default implementation. The new chunk received is forwarded as-is to the next stages of the query
class ForwardEverythingGenerator : public IOutputChunkGenerator
{
public:
explicit ForwardEverythingGenerator() = default;
void onNewChunkArrived(Chunk chunk) override
{
in_chunk = chunk.clone();
}
void onRowsProcessed(size_t /*row_count*/, bool /*append*/) override
{}
Chunk generateChunk() override
{
return std::move(in_chunk);
}
private:
Chunk in_chunk;
};
/// Specific implementation which generates a chunk with just a subset of the rows received originally.
/// Rows are assumed to be processed in the same order as they appear in the original chunk.
/// It is up to the client to decide how many rows to process at once, but after each processed range,
/// onRowsProcessed() has to be called, indicating whether to append that range to the output chunk or not.
class CopyRangesGenerator : public IOutputChunkGenerator
{
public:
explicit CopyRangesGenerator() = default;
void onNewChunkArrived(Chunk chunk) override
{
out_cols = chunk.cloneEmptyColumns();
in_chunk = std::move(chunk);
row_offset = 0;
final_chunk_rows = 0;
}
void onRowsProcessed(size_t row_count, bool append) override
{
if (append)
{
const Columns& in_cols = in_chunk.getColumns();
for (size_t i = 0; i < out_cols.size(); i++)
{
out_cols[i]->insertRangeFrom(*(in_cols[i]), row_offset, row_count);
}
final_chunk_rows += row_count;
}
row_offset += row_count;
}
Chunk generateChunk() override
{
return Chunk(std::move(out_cols), final_chunk_rows);
}
private:
Chunk in_chunk;
MutableColumns out_cols;
size_t row_offset = 0;
size_t final_chunk_rows = 0;
};
std::unique_ptr<IOutputChunkGenerator> IOutputChunkGenerator::createCopyRanges(bool deduplicate_later)
{
// If MV is responsible for deduplication, block won't be considered duplicated.
// So default implementation, forwarding all the data, is used
if (deduplicate_later)
{
return createDefault();
}
return std::make_unique<CopyRangesGenerator>();
}
std::unique_ptr<IOutputChunkGenerator> IOutputChunkGenerator::createDefault()
{
return std::make_unique<ForwardEverythingGenerator>();
}
}
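
The comments above describe a per-range contract between the sink and the chunk generator. The following minimal Python sketch (class and method names are invented for illustration and are not part of the codebase) mirrors the onNewChunkArrived / onRowsProcessed / generateChunk flow on a plain list of rows:

```python
# Illustrative sketch of the CopyRanges contract: ranges are processed in
# arrival order, and only the ranges flagged with append=True end up in the
# generated output chunk.
class CopyRangesSketch:
    def __init__(self):
        self.rows = []
        self.kept = []
        self.offset = 0

    def on_new_chunk_arrived(self, rows):
        self.rows = list(rows)
        self.kept = []
        self.offset = 0

    def on_rows_processed(self, row_count, append):
        if append:
            self.kept.extend(self.rows[self.offset:self.offset + row_count])
        self.offset += row_count

    def generate_chunk(self):
        return self.kept

gen = CopyRangesSketch()
gen.on_new_chunk_arrived(["r0", "r1", "r2", "r3"])
gen.on_rows_processed(2, append=True)   # first block committed, keep it
gen.on_rows_processed(2, append=False)  # second block was deduplicated, drop it
print(gen.generate_chunk())             # ['r0', 'r1']
```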

View File

@ -4,7 +4,12 @@
namespace DB
{
SinkToStorage::SinkToStorage(const Block & header) : ExceptionKeepingTransform(header, header, false) {}
SinkToStorage::SinkToStorage(const Block & header) : SinkToStorage(header, IOutputChunkGenerator::createDefault()) {}
SinkToStorage::SinkToStorage(const Block & header, std::unique_ptr<IOutputChunkGenerator> output_generator_)
: ExceptionKeepingTransform(header, header, false),
output_generator(std::move(output_generator_))
{ }
void SinkToStorage::onConsume(Chunk chunk)
{
@ -15,15 +20,15 @@ void SinkToStorage::onConsume(Chunk chunk)
*/
Nested::validateArraySizes(getHeader().cloneWithColumns(chunk.getColumns()));
output_generator->onNewChunkArrived(chunk.clone());
consume(chunk.clone());
if (!lastBlockIsDuplicate())
cur_chunk = std::move(chunk);
}
SinkToStorage::GenerateResult SinkToStorage::onGenerate()
{
GenerateResult res;
res.chunk = std::move(cur_chunk);
res.chunk = output_generator->generateChunk();
res.is_done = true;
return res;
}

View File

@ -1,6 +1,7 @@
#pragma once
#include <Storages/TableLockHolder.h>
#include <Processors/Transforms/ExceptionKeepingTransform.h>
#include <Processors/Sinks/IOutputChunkGenerator.h>
namespace DB
{
@ -13,13 +14,15 @@ friend class PartitionedSink;
public:
explicit SinkToStorage(const Block & header);
explicit SinkToStorage(const Block & header, std::unique_ptr<IOutputChunkGenerator> output_generator_);
const Block & getHeader() const { return inputs.front().getHeader(); }
void addTableLock(const TableLockHolder & lock) { table_locks.push_back(lock); }
protected:
virtual void consume(Chunk chunk) = 0;
virtual bool lastBlockIsDuplicate() const { return false; }
IOutputChunkGenerator& getOutputGenerator() { return *output_generator; }
private:
std::vector<TableLockHolder> table_locks;
@ -27,7 +30,7 @@ private:
void onConsume(Chunk chunk) override;
GenerateResult onGenerate() override;
Chunk cur_chunk;
std::unique_ptr<IOutputChunkGenerator> output_generator;
};
using SinkToStoragePtr = std::shared_ptr<SinkToStorage>;

View File

@ -130,7 +130,8 @@ ReplicatedMergeTreeSinkImpl<async_insert>::ReplicatedMergeTreeSinkImpl(
bool majority_quorum,
ContextPtr context_,
bool is_attach_)
: SinkToStorage(metadata_snapshot_->getSampleBlock())
: SinkToStorage(metadata_snapshot_->getSampleBlock(),
IOutputChunkGenerator::createCopyRanges(context_->getSettingsRef().deduplicate_blocks_in_dependent_materialized_views))
, storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, required_quorum_size(majority_quorum ? std::nullopt : std::make_optional<size_t>(quorum_size))
@ -386,13 +387,7 @@ void ReplicatedMergeTreeSinkImpl<async_insert>::consume(Chunk chunk)
finishDelayedChunk(zookeeper);
delayed_chunk = std::make_unique<ReplicatedMergeTreeSinkImpl::DelayedChunk>();
delayed_chunk->partitions = std::move(partitions);
/// If deduplicated data should not be inserted into MV, we need to set proper
/// value for `last_block_is_duplicate`, which is possible only after the part is committed.
/// Othervide we can delay commit.
/// TODO: we can also delay commit if there is no MVs.
if (!settings.deduplicate_blocks_in_dependent_materialized_views)
finishDelayedChunk(zookeeper);
finishDelayedChunk(zookeeper);
++num_blocks_processed;
}
@ -403,8 +398,6 @@ void ReplicatedMergeTreeSinkImpl<false>::finishDelayedChunk(const ZooKeeperWithF
if (!delayed_chunk)
return;
last_block_is_duplicate = false;
for (auto & partition : delayed_chunk->partitions)
{
ProfileEventsScope scoped_attach(&partition.part_counters);
@ -415,9 +408,10 @@ void ReplicatedMergeTreeSinkImpl<false>::finishDelayedChunk(const ZooKeeperWithF
try
{
bool deduplicated = commitPart(zookeeper, part, partition.block_id, delayed_chunk->replicas_num, false).second;
const size_t rowsCount = partition.temp_part.part->rows_count;
const bool deduplicated = commitPart(zookeeper, part, partition.block_id, delayed_chunk->replicas_num, false).second;
last_block_is_duplicate = last_block_is_duplicate || deduplicated;
getOutputGenerator().onRowsProcessed(rowsCount, !deduplicated);
/// Set a special error code if the block is duplicate
int error = (deduplicate && deduplicated) ? ErrorCodes::INSERT_WAS_DEDUPLICATED : 0;
@ -1092,13 +1086,6 @@ void ReplicatedMergeTreeSinkImpl<async_insert>::onStart()
storage.delayInsertOrThrowIfNeeded(&storage.partial_shutdown_event, context, true);
}
template<bool async_insert>
void ReplicatedMergeTreeSinkImpl<async_insert>::onFinish()
{
auto zookeeper = storage.getZooKeeper();
finishDelayedChunk(std::make_shared<ZooKeeperWithFaultInjection>(zookeeper));
}
template<bool async_insert>
void ReplicatedMergeTreeSinkImpl<async_insert>::waitForQuorum(
const ZooKeeperWithFaultInjectionPtr & zookeeper,

View File

@ -51,23 +51,12 @@ public:
void onStart() override;
void consume(Chunk chunk) override;
void onFinish() override;
String getName() const override { return "ReplicatedMergeTreeSink"; }
/// For ATTACHing existing data on filesystem.
bool writeExistingPart(MergeTreeData::MutableDataPartPtr & part);
/// For proper deduplication in MaterializedViews
bool lastBlockIsDuplicate() const override
{
/// If MV is responsible for deduplication, block is not considered duplicating.
if (context->getSettingsRef().deduplicate_blocks_in_dependent_materialized_views)
return false;
return last_block_is_duplicate;
}
struct DelayedChunk;
private:
using BlockIDsType = std::conditional_t<async_insert, std::vector<String>, String>;
@ -122,7 +111,6 @@ private:
bool is_attach = false;
bool quorum_parallel = false;
const bool deduplicate = true;
bool last_block_is_duplicate = false;
UInt64 num_blocks_processed = 0;
using Logger = Poco::Logger;

View File

@ -210,3 +210,12 @@ def download_performance_build(check_name, reports_path, result_path):
result_path,
lambda x: x.endswith("performance.tar.zst"),
)
def download_fuzzers(check_name, reports_path, result_path):
download_builds_filter(
check_name,
reports_path,
result_path,
lambda x: x.endswith(("_fuzzer", ".dict", ".options", "_seed_corpus.zip")),
)

View File

@ -282,6 +282,7 @@ CI_CONFIG = CiConfig(
"SQLancer (debug)": TestConfig("package_debug"),
"Sqllogic test (release)": TestConfig("package_release"),
"SQLTest": TestConfig("package_release"),
"libFuzzer tests": TestConfig("fuzzers"),
},
)
CI_CONFIG.validate()

View File

@ -0,0 +1,190 @@
#!/usr/bin/env python3
import argparse
import logging
import os
import sys
import atexit
import zipfile
from pathlib import Path
from typing import List
from github import Github
from build_download_helper import download_fuzzers
from clickhouse_helper import (
CiLogsCredentials,
)
from commit_status_helper import (
RerunHelper,
get_commit,
update_mergeable_check,
)
from docker_pull_helper import DockerImage, get_image_with_version
from env_helper import TEMP_PATH, REPO_COPY, REPORTS_PATH
from get_robot_token import get_best_robot_token
from pr_info import PRInfo
from report import TestResults
from stopwatch import Stopwatch
from tee_popen import TeePopen
NO_CHANGES_MSG = "Nothing to run"
def get_additional_envs(check_name, run_by_hash_num, run_by_hash_total):
result = []
if "DatabaseReplicated" in check_name:
result.append("USE_DATABASE_REPLICATED=1")
if "DatabaseOrdinary" in check_name:
result.append("USE_DATABASE_ORDINARY=1")
if "wide parts enabled" in check_name:
result.append("USE_POLYMORPHIC_PARTS=1")
if "ParallelReplicas" in check_name:
result.append("USE_PARALLEL_REPLICAS=1")
if "s3 storage" in check_name:
result.append("USE_S3_STORAGE_FOR_MERGE_TREE=1")
if "analyzer" in check_name:
result.append("USE_NEW_ANALYZER=1")
if run_by_hash_total != 0:
result.append(f"RUN_BY_HASH_NUM={run_by_hash_num}")
result.append(f"RUN_BY_HASH_TOTAL={run_by_hash_total}")
return result
def get_run_command(
fuzzers_path: Path,
repo_path: Path,
result_path: Path,
kill_timeout: int,
additional_envs: List[str],
ci_logs_args: str,
image: DockerImage,
) -> str:
additional_options = ["--hung-check"]
additional_options.append("--print-time")
additional_options_str = (
'-e ADDITIONAL_OPTIONS="' + " ".join(additional_options) + '"'
)
envs = [
f"-e MAX_RUN_TIME={int(0.9 * kill_timeout)}",
# a static link, don't use S3_URL or S3_DOWNLOAD
'-e S3_URL="https://s3.amazonaws.com/clickhouse-datasets"',
]
envs += [f"-e {e}" for e in additional_envs]
env_str = " ".join(envs)
return (
f"docker run "
f"{ci_logs_args} "
f"--workdir=/fuzzers "
f"--volume={fuzzers_path}:/fuzzers "
f"--volume={repo_path}/tests:/usr/share/clickhouse-test "
f"--volume={result_path}:/test_output "
f"--cap-add=SYS_PTRACE {env_str} {additional_options_str} {image}"
)
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument("check_name")
parser.add_argument("kill_timeout", type=int)
return parser.parse_args()
def main():
logging.basicConfig(level=logging.INFO)
stopwatch = Stopwatch()
temp_path = Path(TEMP_PATH)
repo_path = Path(REPO_COPY)
reports_path = REPORTS_PATH
args = parse_args()
check_name = args.check_name
kill_timeout = args.kill_timeout
gh = Github(get_best_robot_token(), per_page=100)
pr_info = PRInfo()
commit = get_commit(gh, pr_info.sha)
atexit.register(update_mergeable_check, gh, pr_info, check_name)
temp_path.mkdir(parents=True, exist_ok=True)
if "RUN_BY_HASH_NUM" in os.environ:
run_by_hash_num = int(os.getenv("RUN_BY_HASH_NUM", "0"))
run_by_hash_total = int(os.getenv("RUN_BY_HASH_TOTAL", "0"))
check_name_with_group = (
check_name + f" [{run_by_hash_num + 1}/{run_by_hash_total}]"
)
else:
run_by_hash_num = 0
run_by_hash_total = 0
check_name_with_group = check_name
rerun_helper = RerunHelper(commit, check_name_with_group)
if rerun_helper.is_already_finished_by_status():
logging.info("Check is already finished according to github status, exiting")
sys.exit(0)
docker_image = get_image_with_version(reports_path, "clickhouse/libfuzzer")
fuzzers_path = temp_path / "fuzzers"
fuzzers_path.mkdir(parents=True, exist_ok=True)
download_fuzzers(check_name, reports_path, fuzzers_path)
for file in os.listdir(fuzzers_path):
if file.endswith("_fuzzer"):
os.chmod(fuzzers_path / file, 0o777)
elif file.endswith("_seed_corpus.zip"):
corpus_path = fuzzers_path / (file.removesuffix("_seed_corpus.zip") + ".in")
zipfile.ZipFile(fuzzers_path / file, "r").extractall(corpus_path)
result_path = temp_path / "result_path"
result_path.mkdir(parents=True, exist_ok=True)
run_log_path = result_path / "run.log"
additional_envs = get_additional_envs(
check_name, run_by_hash_num, run_by_hash_total
)
ci_logs_credentials = CiLogsCredentials(Path(temp_path) / "export-logs-config.sh")
ci_logs_args = ci_logs_credentials.get_docker_arguments(
pr_info, stopwatch.start_time_str, check_name
)
run_command = get_run_command(
fuzzers_path,
repo_path,
result_path,
kill_timeout,
additional_envs,
ci_logs_args,
docker_image,
)
logging.info("Going to run libFuzzer tests: %s", run_command)
with TeePopen(run_command, run_log_path) as process:
retcode = process.wait()
if retcode == 0:
logging.info("Run successfully")
else:
logging.info("Run failed")
sys.exit(0)
if __name__ == "__main__":
main()

View File

@ -55,6 +55,7 @@ class TeePopen:
stderr=STDOUT,
stdout=PIPE,
bufsize=1,
errors="backslashreplace",
)
if self.timeout is not None and self.timeout > 0:
t = Thread(target=self._check_timeout)

tests/fuzz/build.sh Executable file
View File

@ -0,0 +1,28 @@
#!/bin/bash -eu
# copy fuzzer options and dictionaries
cp $SRC/tests/fuzz/*.dict $OUT/
cp $SRC/tests/fuzz/*.options $OUT/
# prepare corpus dirs
mkdir -p $BIN/tests/fuzz/lexer_fuzzer.in/
mkdir -p $BIN/tests/fuzz/select_parser_fuzzer.in/
mkdir -p $BIN/tests/fuzz/create_parser_fuzzer.in/
mkdir -p $BIN/tests/fuzz/execute_query_fuzzer.in/
# prepare corpus
cp $SRC/tests/queries/0_stateless/*.sql $BIN/tests/fuzz/lexer_fuzzer.in/
cp $SRC/tests/queries/0_stateless/*.sql $BIN/tests/fuzz/select_parser_fuzzer.in/
cp $SRC/tests/queries/0_stateless/*.sql $BIN/tests/fuzz/create_parser_fuzzer.in/
cp $SRC/tests/queries/0_stateless/*.sql $BIN/tests/fuzz/execute_query_fuzzer.in/
cp $SRC/tests/queries/1_stateful/*.sql $BIN/tests/fuzz/lexer_fuzzer.in/
cp $SRC/tests/queries/1_stateful/*.sql $BIN/tests/fuzz/select_parser_fuzzer.in/
cp $SRC/tests/queries/1_stateful/*.sql $BIN/tests/fuzz/create_parser_fuzzer.in/
cp $SRC/tests/queries/1_stateful/*.sql $BIN/tests/fuzz/execute_query_fuzzer.in/
# build corpus archives
cd $BIN/tests/fuzz
for dir in *_fuzzer.in; do
fuzzer=$(basename $dir .in)
zip -rj "$OUT/${fuzzer}_seed_corpus.zip" "${dir}/"
done

View File

@ -95,7 +95,14 @@ def create_tables(cluster, table_name):
return "60\t0\t59\t1770\n"
def test_read_equally_from_each_replica(start_cluster):
@pytest.mark.parametrize(
"prefer_localhost_replica",
[
pytest.param(0),
pytest.param(1),
],
)
def test_read_equally_from_each_replica(start_cluster, prefer_localhost_replica):
"""create and populate table in special way (see create_table()),
so parallel replicas will read equal number of rows from each replica
"""
@ -110,7 +117,7 @@ def test_read_equally_from_each_replica(start_cluster):
f"SELECT count(), min(key), max(key), sum(key) FROM {table_name}_d",
settings={
"allow_experimental_parallel_reading_from_replicas": 2,
"prefer_localhost_replica": 0,
"prefer_localhost_replica": prefer_localhost_replica,
"max_parallel_replicas": 3,
"use_hedged_requests": 0,
},

View File

@ -106,10 +106,18 @@ def create_tables(cluster, table_name):
pytest.param("test_single_shard_multiple_replicas", 3, 0),
pytest.param("test_single_shard_multiple_replicas", 4, 0),
pytest.param("test_single_shard_multiple_replicas", 10, 0),
pytest.param("test_single_shard_multiple_replicas", 2, 1),
pytest.param("test_single_shard_multiple_replicas", 3, 1),
pytest.param("test_single_shard_multiple_replicas", 4, 1),
pytest.param("test_single_shard_multiple_replicas", 10, 1),
pytest.param("test_multiple_shards_multiple_replicas", 2, 0),
pytest.param("test_multiple_shards_multiple_replicas", 3, 0),
pytest.param("test_multiple_shards_multiple_replicas", 4, 0),
pytest.param("test_multiple_shards_multiple_replicas", 10, 0),
pytest.param("test_multiple_shards_multiple_replicas", 2, 1),
pytest.param("test_multiple_shards_multiple_replicas", 3, 1),
pytest.param("test_multiple_shards_multiple_replicas", 4, 1),
pytest.param("test_multiple_shards_multiple_replicas", 10, 1),
],
)
def test_parallel_replicas_over_distributed(

View File

@ -1,6 +1,6 @@
SET session_timezone = 'UTC';
SELECT ADDDATE('2022-05-07'::Date, INTERVAL 5 MINUTE);
SELECT ADDDATE(materialize('2022-05-07'::Date), INTERVAL 5 MINUTE);
SELECT addDate('2022-05-07'::Date, INTERVAL 5 MINUTE);
SELECT addDate('2022-05-07'::Date32, INTERVAL 5 MINUTE);
@ -14,7 +14,7 @@ SELECT addDate('1234', INTERVAL 5 MINUTE); -- { serverError ILLEGAL_TYPE_OF_ARG
SELECT '---';
SELECT SUBDATE('2022-05-07'::Date, INTERVAL 5 MINUTE);
SELECT SUBDATE(materialize('2022-05-07'::Date), INTERVAL 5 MINUTE);
SELECT subDate('2022-05-07'::Date, INTERVAL 5 MINUTE);
SELECT subDate('2022-05-07'::Date32, INTERVAL 5 MINUTE);

View File

@ -0,0 +1,14 @@
Initial
2020-01-01 13:00:00 24
Last block is duplicate
2020-01-01 13:00:00 24
2021-09-01 11:00:00 24
One block is duplicate (default setting)
2020-01-01 13:00:00 24
2021-09-01 11:00:00 24
2022-01-01 12:00:00 24
One block is duplicate (changed setting)
2020-01-01 13:00:00 24
2021-09-01 11:00:00 24
2022-01-01 12:00:00 24
2023-01-01 12:00:00 24

View File

@ -0,0 +1,44 @@
-- Tags: zookeeper
DROP TABLE IF EXISTS landing SYNC;
DROP TABLE IF EXISTS mv SYNC;
CREATE TABLE landing
(
`time` DateTime,
`number` Int64
)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/{database}/tables/landing/', 'r1')
PARTITION BY toYYYYMMDD(time)
ORDER BY time;
CREATE MATERIALIZED VIEW mv
ENGINE = ReplicatedSummingMergeTree('/clickhouse/{database}/tables/mv', 'r1')
PARTITION BY toYYYYMMDD(hour) ORDER BY hour
AS SELECT
toStartOfHour(time) AS hour,
sum(number) AS sum_amount
FROM landing GROUP BY hour;
SELECT 'Initial';
INSERT INTO landing VALUES ('2020-01-01 13:23:34', 24);
SELECT * FROM mv ORDER BY hour;
SELECT 'Last block is duplicate';
INSERT INTO landing VALUES ('2021-09-01 11:00:00', 24), ('2020-01-01 13:23:34', 24);
SELECT * FROM mv ORDER BY hour;
SELECT 'One block is duplicate (default setting)';
SET max_insert_delayed_streams_for_parallel_write = 0;
INSERT INTO landing VALUES ('2021-09-01 11:00:00', 24), ('2022-01-01 12:03:00', 24);
SELECT * FROM mv ORDER BY hour;
SELECT 'One block is duplicate (changed setting)';
SET max_insert_delayed_streams_for_parallel_write = 5;
INSERT INTO landing VALUES ('2021-09-01 11:00:00', 24), ('2023-01-01 12:03:00', 24);
SELECT * FROM mv ORDER BY hour;
DROP TABLE mv;
DROP TABLE landing;

View File

@ -0,0 +1,3 @@
a Nullable(Int64)
b Nullable(String)
10 Hello

View File

@ -0,0 +1,3 @@
desc format(JSON, '{"a" : 10, "b" : "Hello"}');
select * from format(JSON, '{"a" : 10, "b" : "Hello"}');

View File

@ -0,0 +1,10 @@
DROP TABLE IF EXISTS test_dup_index;
CREATE TABLE test_dup_index
(
a Int64,
b Int64,
INDEX idx_a a TYPE minmax,
INDEX idx_a b TYPE minmax
) Engine = MergeTree()
ORDER BY a; -- { serverError ILLEGAL_INDEX }