Merge branch 'master' into jepen-multiple-tests

Antonio Andelic 2022-11-04 13:05:03 +00:00
commit 2744cea382
56 changed files with 616 additions and 286 deletions

View File

@ -3,10 +3,20 @@ option (ENABLE_CLANG_TIDY "Use clang-tidy static analyzer" OFF)
if (ENABLE_CLANG_TIDY)
find_program (CLANG_TIDY_PATH NAMES "clang-tidy" "clang-tidy-15" "clang-tidy-14" "clang-tidy-13" "clang-tidy-12")
find_program (CLANG_TIDY_CACHE_PATH NAMES "clang-tidy-cache")
if (CLANG_TIDY_CACHE_PATH)
find_program (_CLANG_TIDY_PATH NAMES "clang-tidy" "clang-tidy-15" "clang-tidy-14" "clang-tidy-13" "clang-tidy-12")
# Why do we use ';' here?
# It's CMake black magic: https://cmake.org/cmake/help/latest/prop_tgt/LANG_CLANG_TIDY.html#prop_tgt:%3CLANG%3E_CLANG_TIDY
# The CLANG_TIDY_PATH is passed to CMAKE_CXX_CLANG_TIDY, which follows CXX_CLANG_TIDY syntax.
set (CLANG_TIDY_PATH "${CLANG_TIDY_CACHE_PATH};${_CLANG_TIDY_PATH}" CACHE STRING "A combined command to run clang-tidy with caching wrapper")
else ()
find_program (CLANG_TIDY_PATH NAMES "clang-tidy" "clang-tidy-15" "clang-tidy-14" "clang-tidy-13" "clang-tidy-12")
endif ()
if (CLANG_TIDY_PATH)
message(STATUS
message (STATUS
"Using clang-tidy: ${CLANG_TIDY_PATH}.
The checks will be run during build process.
See the .clang-tidy file at the root directory to configure the checks.")
@ -15,11 +25,15 @@ if (ENABLE_CLANG_TIDY)
# clang-tidy requires assertions to guide the analysis
# Note that NDEBUG is set implicitly by CMake for non-debug builds
set(COMPILER_FLAGS "${COMPILER_FLAGS} -UNDEBUG")
set (COMPILER_FLAGS "${COMPILER_FLAGS} -UNDEBUG")
# The variable CMAKE_CXX_CLANG_TIDY will be set inside src and base directories with non third-party code.
# The variable CMAKE_CXX_CLANG_TIDY will be set inside the following directories with non third-party code.
# - base
# - programs
# - src
# - utils
# set (CMAKE_CXX_CLANG_TIDY "${CLANG_TIDY_PATH}")
else ()
message(${RECONFIGURE_MESSAGE_LEVEL} "clang-tidy is not found")
message (${RECONFIGURE_MESSAGE_LEVEL} "clang-tidy is not found")
endif ()
endif ()
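
The ';' trick above relies on CMake treating the value of `CMAKE_CXX_CLANG_TIDY` as a semicolon-separated command list. A minimal sketch of the expansion, with hypothetical paths:

```python
# Hypothetical paths; CMake splits the cached value on ';' and runs the first
# element (the cache wrapper) with the rest (the real clang-tidy) as arguments.
clang_tidy_path = "/usr/bin/clang-tidy-cache;/usr/bin/clang-tidy-15"
command = clang_tidy_path.split(";")
print(command)  # ['/usr/bin/clang-tidy-cache', '/usr/bin/clang-tidy-15']
```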

View File

@ -91,6 +91,9 @@ ENV PATH="$PATH:/usr/local/go/bin"
ENV GOPATH=/workdir/go
ENV GOCACHE=/workdir/
RUN curl https://raw.githubusercontent.com/matus-chochlik/ctcache/7fd516e91c17779cbc6fc18bd119313d9532dd90/clang-tidy-cache -Lo /usr/bin/clang-tidy-cache \
&& chmod +x /usr/bin/clang-tidy-cache
RUN mkdir /workdir && chmod 777 /workdir
WORKDIR /workdir

View File

@ -258,6 +258,10 @@ def parse_env_variables(
if clang_tidy:
# 15G is not enough for tidy build
cache_maxsize = "25G"
# `CTCACHE_DIR` has the same purpose as the `CCACHE_DIR` above.
# It's there to have the clang-tidy cache embedded into our standard `CCACHE_DIR`
result.append("CTCACHE_DIR=/ccache/clang-tidy-cache")
result.append(f"CCACHE_MAXSIZE={cache_maxsize}")
if distcc_hosts:
@ -282,9 +286,7 @@ def parse_env_variables(
cmake_flags.append("-DENABLE_TESTS=1")
if shared_libraries:
cmake_flags.append(
"-DUSE_STATIC_LIBRARIES=0 -DSPLIT_SHARED_LIBRARIES=1"
)
cmake_flags.append("-DUSE_STATIC_LIBRARIES=0 -DSPLIT_SHARED_LIBRARIES=1")
# We can't always build utils because it requires too much space, but
# we have to build them at least in some way in CI. The shared library
# build is probably the least heavy disk-wise.

View File

@ -2939,7 +2939,7 @@ Possible values:
- 0 — Projection optimization disabled.
- 1 — Projection optimization enabled.
Default value: `0`.
Default value: `1`.
## force_optimize_projection {#force-optimize-projection}

View File

@ -28,18 +28,34 @@ sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 8919F6BD2B48D7
sudo apt-get update
```
### You Get the Unsupported Architecture Warning with Apt-get {#you-get-the-unsupported-architecture-warning-with-apt-get}
### You Get Different Warnings with `apt-get update` {#you-get-different-warnings-with-apt-get-update}
- The completed warning message is as follows:
- The complete warning messages are one of the following:
```
N: Skipping acquire of configured file 'main/binary-i386/Packages' as repository 'https://packages.clickhouse.com/deb stable InRelease' doesn't support architecture 'i386'
```
```
E: Failed to fetch https://packages.clickhouse.com/deb/dists/stable/main/binary-amd64/Packages.gz File has unexpected size (30451 != 28154). Mirror sync in progress?
```
```
E: Repository 'https://packages.clickhouse.com/deb stable InRelease' changed its 'Origin' value from 'Artifactory' to 'ClickHouse'
E: Repository 'https://packages.clickhouse.com/deb stable InRelease' changed its 'Label' value from 'Artifactory' to 'ClickHouse'
N: Repository 'https://packages.clickhouse.com/deb stable InRelease' changed its 'Suite' value from 'stable' to ''
N: This must be accepted explicitly before updates for this repository can be applied. See apt-secure(8) manpage for details.
```
```
Err:11 https://packages.clickhouse.com/deb stable InRelease
400 Bad Request [IP: 172.66.40.249 443]
```
To resolve the above issue, please use the following script:
```bash
sudo rm /var/lib/apt/lists/packages.clickhouse.com_* /var/lib/dpkg/arch
sudo rm /var/lib/apt/lists/packages.clickhouse.com_* /var/lib/dpkg/arch /var/lib/apt/lists/partial/packages.clickhouse.com_*
sudo apt-get clean
sudo apt-get autoclean
```

View File

@ -571,13 +571,13 @@ Similar to base58Decode, but returns an empty string in case of error.
## base64Encode(s)
Encodes s string into base64
Encodes s FixedString or String into base64.
Alias: `TO_BASE64`.
## base64Decode(s)
Decode base64-encoded string s into original string. In case of failure raises an exception.
Decodes the base64-encoded FixedString or String s into the original string. In case of failure, raises an exception.
Alias: `FROM_BASE64`.
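
For illustration, a rough Python analogue of these semantics; ClickHouse itself uses the Turbo-Base64 library, and `tryBase64Decode` (touched later in this commit) returns an empty string instead of throwing:

```python
import base64
import binascii

assert base64.b64encode(b"clickhouse") == b"Y2xpY2tob3VzZQ=="  # base64Encode
assert base64.b64decode(b"Y2xpY2tob3VzZQ==") == b"clickhouse"  # base64Decode

def try_base64_decode(s: bytes) -> bytes:
    """Like tryBase64Decode: empty result instead of an exception on bad input."""
    try:
        return base64.b64decode(s, validate=True)
    except binascii.Error:
        return b""

assert try_base64_decode(b"not base64!") == b""
```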

View File

@ -6,28 +6,29 @@ sidebar_label: For Replacing in Strings
# Functions for Searching and Replacing in Strings
:::note
Functions for [searching](../../sql-reference/functions/string-search-functions.md) and [other manipulations with strings](../../sql-reference/functions/string-functions.md) are described separately.
:::
## replaceOne(haystack, pattern, replacement)
Replaces the first occurrence, if it exists, of the pattern substring in haystack with the replacement substring.
Hereafter, pattern and replacement must be constants.
Replaces the first occurrence of the substring pattern (if it exists) in haystack by the replacement string.
pattern and replacement must be constants.
## replaceAll(haystack, pattern, replacement), replace(haystack, pattern, replacement)
Replaces all occurrences of the pattern substring in haystack with the replacement substring.
Replaces all occurrences of the substring pattern in haystack by the replacement string.
## replaceRegexpOne(haystack, pattern, replacement)
Replacement using the pattern regular expression. A re2 regular expression.
Replaces only the first occurrence, if it exists.
A pattern can be specified as replacement. This pattern can include substitutions `\0-\9`.
The substitution `\0` includes the entire regular expression. Substitutions `\1-\9` correspond to the subpattern numbers. To use the `\` character in a template, escape it using `\`.
Also keep in mind that a string literal requires an extra escape.
Replaces the first occurrence of the substring matching the regular expression pattern in haystack by the replacement string.
pattern must be a constant [re2 regular expression](https://github.com/google/re2/wiki/Syntax).
replacement must be a plain constant string or a constant string containing substitutions `\0-\9`.
Substitutions `\1-\9` correspond to the 1st to 9th capturing group (submatch), substitution `\0` corresponds to the entire match.
To use a verbatim `\` character in the pattern or replacement string, escape it using `\`.
Also keep in mind that string literals require extra escaping.
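
Before the SQL examples, the same substitution rules expressed with Python's `re` module as a quick sanity check (`\g<0>` is Python's spelling of the whole-match substitution `\0`; re2 and Python agree on these simple patterns):

```python
import re

# replaceRegexpOne-style: first match only, capturing groups \1..\3 reordered
print(re.sub(r"(\d{4})-(\d{2})-(\d{2})", r"\3/\2/\1", "2014-03-17", count=1))
# 17/03/2014

# Whole match (\0) pasted twice per character, as in the replaceRegexpAll example below
print(re.sub(r".", r"\g<0>\g<0>", "Hi"))
# HHii
```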
Example 1. Converting the date to American format:
Example 1. Converting ISO dates to American format:
``` sql
SELECT DISTINCT
@ -62,7 +63,7 @@ SELECT replaceRegexpOne('Hello, World!', '.*', '\\0\\0\\0\\0\\0\\0\\0\\0\\0\\0')
## replaceRegexpAll(haystack, pattern, replacement)
This does the same thing, but replaces all the occurrences. Example:
Like replaceRegexpOne, but replaces all occurrences of the pattern. Example:
``` sql
SELECT replaceRegexpAll('Hello, World!', '.', '\\0\\0') AS res

View File

@ -37,7 +37,7 @@ deb:
contents:
- src: root/etc/clickhouse-client/config.xml
dst: /etc/clickhouse-client/config.xml
type: config
type: config|noreplace
- src: root/usr/bin/clickhouse-benchmark
dst: /usr/bin/clickhouse-benchmark
- src: root/usr/bin/clickhouse-compressor

View File

@ -29,7 +29,7 @@ deb:
contents:
- src: root/etc/clickhouse-keeper/keeper_config.xml
dst: /etc/clickhouse-keeper/keeper_config.xml
type: config
type: config|noreplace
- src: root/usr/bin/clickhouse-keeper
dst: /usr/bin/clickhouse-keeper
# docs

View File

@ -44,10 +44,10 @@ deb:
contents:
- src: root/etc/clickhouse-server/config.xml
dst: /etc/clickhouse-server/config.xml
type: config
type: config|noreplace
- src: root/etc/clickhouse-server/users.xml
dst: /etc/clickhouse-server/users.xml
type: config
type: config|noreplace
- src: clickhouse-server.init
dst: /etc/init.d/clickhouse-server
- src: clickhouse-server.service

View File

@ -18,8 +18,10 @@ AggregateFunctionPtr createAggregateFunctionAnalysisOfVariance(const std::string
assertNoParameters(name, parameters);
assertBinary(name, arguments);
if (!isNumber(arguments[0]) || !isNumber(arguments[1]))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Aggregate function {} only supports numerical types", name);
if (!isNumber(arguments[0]))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Aggregate function {} only supports numerical argument types", name);
if (!WhichDataType(arguments[1]).isNativeUInt())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Second argument of aggregate function {} should be a native unsigned integer", name);
return std::make_shared<AggregateFunctionAnalysisOfVariance>(arguments, parameters);
}
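
For context, a self-contained sketch of the one-way ANOVA F statistic behind this aggregate: the first argument supplies the observed values, the second the group numbers, which is why the code above now insists on a native unsigned integer. Illustration only, with none of the degenerate-input guards the real code has:

```python
from collections import defaultdict

def anova_f(pairs):
    """pairs of (value, group_number) -> one-way ANOVA F statistic."""
    groups = defaultdict(list)
    for value, group in pairs:
        groups[group].append(value)
    n = sum(len(v) for v in groups.values())  # total observations
    k = len(groups)                           # number of groups
    grand_mean = sum(sum(v) for v in groups.values()) / n
    ss_between = sum(len(v) * (sum(v) / len(v) - grand_mean) ** 2
                     for v in groups.values())
    ss_within = sum((x - sum(v) / len(v)) ** 2
                    for v in groups.values() for x in v)
    return (ss_between / (k - 1)) / (ss_within / (n - k))

print(anova_f([(1.0, 0), (2.0, 0), (4.0, 1), (5.0, 1)]))  # 18.0
```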

View File

@ -77,7 +77,7 @@ public:
void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override
{
auto f_stat = data(place).getFStatistic();
if (std::isinf(f_stat) || isNaN(f_stat))
if (std::isinf(f_stat) || isNaN(f_stat) || f_stat < 0)
throw Exception("F statistic is not defined or infinite for these arguments", ErrorCodes::BAD_ARGUMENTS);
auto p_value = data(place).getPValue(f_stat);

View File

@ -482,6 +482,8 @@ struct ZTestMoments
template <typename T>
struct AnalysisOfVarianceMoments
{
constexpr static size_t MAX_GROUPS_NUMBER = 1024 * 1024;
/// Sums of values within a group
std::vector<T> xs1{};
/// Sums of squared values within a group
@ -494,6 +496,10 @@ struct AnalysisOfVarianceMoments
if (xs1.size() >= possible_size)
return;
if (possible_size > MAX_GROUPS_NUMBER)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Too many groups for analysis of variance (should be no more than {}, got {})",
MAX_GROUPS_NUMBER, possible_size);
xs1.resize(possible_size, 0.0);
xs2.resize(possible_size, 0.0);
ns.resize(possible_size, 0);

View File

@ -126,6 +126,7 @@ BackupWriterS3::BackupWriterS3(
, max_single_read_retries(context_->getSettingsRef().s3_max_single_read_retries)
, read_settings(context_->getReadSettings())
, rw_settings(context_->getStorageS3Settings().getSettings(s3_uri.uri.toString()).rw_settings)
, log(&Poco::Logger::get("BackupWriterS3"))
{
rw_settings.updateFromSettingsIfEmpty(context_->getSettingsRef());
}
@ -146,9 +147,12 @@ void BackupWriterS3::copyObjectImpl(
const String & src_key,
const String & dst_bucket,
const String & dst_key,
std::optional<Aws::S3::Model::HeadObjectResult> head,
std::optional<ObjectAttributes> metadata) const
const Aws::S3::Model::HeadObjectResult & head,
const std::optional<ObjectAttributes> & metadata) const
{
size_t size = head.GetContentLength();
LOG_TRACE(log, "Copying {} bytes using single-operation copy", size);
Aws::S3::Model::CopyObjectRequest request;
request.SetCopySource(src_bucket + "/" + src_key);
request.SetBucket(dst_bucket);
@ -186,13 +190,11 @@ void BackupWriterS3::copyObjectMultipartImpl(
const String & src_key,
const String & dst_bucket,
const String & dst_key,
std::optional<Aws::S3::Model::HeadObjectResult> head,
std::optional<ObjectAttributes> metadata) const
const Aws::S3::Model::HeadObjectResult & head,
const std::optional<ObjectAttributes> & metadata) const
{
if (!head)
head = requestObjectHeadData(src_bucket, src_key).GetResult();
size_t size = head->GetContentLength();
size_t size = head.GetContentLength();
LOG_TRACE(log, "Copying {} bytes using multipart upload copy", size);
String multipart_upload_id;
@ -213,16 +215,20 @@ void BackupWriterS3::copyObjectMultipartImpl(
std::vector<String> part_tags;
size_t position = 0;
size_t upload_part_size = rw_settings.min_upload_part_size;
for (size_t position = 0, part_number = 1; position < size; ++part_number, position += upload_part_size)
for (size_t part_number = 1; position < size; ++part_number)
{
size_t next_position = std::min(position + upload_part_size, size);
Aws::S3::Model::UploadPartCopyRequest part_request;
part_request.SetCopySource(src_bucket + "/" + src_key);
part_request.SetBucket(dst_bucket);
part_request.SetKey(dst_key);
part_request.SetUploadId(multipart_upload_id);
part_request.SetPartNumber(static_cast<int>(part_number));
part_request.SetCopySourceRange(fmt::format("bytes={}-{}", position, std::min(size, position + upload_part_size) - 1));
part_request.SetCopySourceRange(fmt::format("bytes={}-{}", position, next_position - 1));
auto outcome = client->UploadPartCopy(part_request);
if (!outcome.IsSuccess())
@ -239,6 +245,14 @@ void BackupWriterS3::copyObjectMultipartImpl(
auto etag = outcome.GetResult().GetCopyPartResult().GetETag();
part_tags.push_back(etag);
position = next_position;
if (part_number % rw_settings.upload_part_size_multiply_parts_count_threshold == 0)
{
upload_part_size *= rw_settings.upload_part_size_multiply_factor;
upload_part_size = std::min(upload_part_size, rw_settings.max_upload_part_size);
}
}
{
@ -280,15 +294,14 @@ void BackupWriterS3::copyFileNative(DiskPtr from_disk, const String & file_name_
auto file_path = fs::path(s3_uri.key) / file_name_to;
auto head = requestObjectHeadData(source_bucket, objects[0].absolute_path).GetResult();
static constexpr int64_t multipart_upload_threashold = 5UL * 1024 * 1024 * 1024;
if (head.GetContentLength() >= multipart_upload_threashold)
if (static_cast<size_t>(head.GetContentLength()) < rw_settings.max_single_operation_copy_size)
{
copyObjectMultipartImpl(
copyObjectImpl(
source_bucket, objects[0].absolute_path, s3_uri.bucket, file_path, head);
}
else
{
copyObjectImpl(
copyObjectMultipartImpl(
source_bucket, objects[0].absolute_path, s3_uri.bucket, file_path, head);
}
}
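
A sketch of the part-size schedule implemented by the loop above: parts start at `min_upload_part_size`, the size is multiplied by `upload_part_size_multiply_factor` after every `upload_part_size_multiply_parts_count_threshold` parts, and is capped at `max_upload_part_size`:

```python
def part_ranges(size, min_part_size, factor, threshold, max_part_size):
    """Yield (part_number, first_byte, last_byte) like the UploadPartCopy loop."""
    position, part_number, part_size = 0, 1, min_part_size
    while position < size:
        next_position = min(position + part_size, size)
        yield part_number, position, next_position - 1  # inclusive byte range
        position = next_position
        if part_number % threshold == 0:
            part_size = min(part_size * factor, max_part_size)
        part_number += 1

# 30 MiB object, 5 MiB initial part, double every 3 parts
for part in part_ranges(30 * 2**20, 5 * 2**20, 2, 3, 5 * 2**30):
    print(part)
```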

View File

@ -61,7 +61,6 @@ public:
void copyFileNative(DiskPtr from_disk, const String & file_name_from, const String & file_name_to) override;
private:
Aws::S3::Model::HeadObjectOutcome requestObjectHeadData(const std::string & bucket_from, const std::string & key) const;
void copyObjectImpl(
@ -69,22 +68,23 @@ private:
const String & src_key,
const String & dst_bucket,
const String & dst_key,
std::optional<Aws::S3::Model::HeadObjectResult> head = std::nullopt,
std::optional<ObjectAttributes> metadata = std::nullopt) const;
const Aws::S3::Model::HeadObjectResult & head,
const std::optional<ObjectAttributes> & metadata = std::nullopt) const;
void copyObjectMultipartImpl(
const String & src_bucket,
const String & src_key,
const String & dst_bucket,
const String & dst_key,
std::optional<Aws::S3::Model::HeadObjectResult> head = std::nullopt,
std::optional<ObjectAttributes> metadata = std::nullopt) const;
const Aws::S3::Model::HeadObjectResult & head,
const std::optional<ObjectAttributes> & metadata = std::nullopt) const;
S3::URI s3_uri;
std::shared_ptr<Aws::S3::S3Client> client;
UInt64 max_single_read_retries;
ReadSettings read_settings;
S3Settings::ReadWriteSettings rw_settings;
Poco::Logger * log;
};
}

View File

@ -12,6 +12,7 @@
M(FailedQuery, "Number of failed queries.") \
M(FailedSelectQuery, "Same as FailedQuery, but only for SELECT queries.") \
M(FailedInsertQuery, "Same as FailedQuery, but only for INSERT queries.") \
M(FailedAsyncInsertQuery, "Number of failed ASYNC INSERT queries.") \
M(QueryTimeMicroseconds, "Total time of all queries.") \
M(SelectQueryTimeMicroseconds, "Total time of SELECT queries.") \
M(InsertQueryTimeMicroseconds, "Total time of INSERT queries.") \

View File

@ -2,21 +2,19 @@
#include "config.h"
#if USE_BASE64
# include <Columns/ColumnConst.h>
# include <Common/MemorySanitizer.h>
# include <Columns/ColumnFixedString.h>
# include <Columns/ColumnString.h>
# include <DataTypes/DataTypeString.h>
# include <Functions/FunctionFactory.h>
# include <Functions/FunctionHelpers.h>
# include <Functions/GatherUtils/Algorithms.h>
# include <IO/WriteHelpers.h>
# include <Functions/IFunction.h>
# include <Interpreters/Context_fwd.h>
# include <turbob64.h>
# include <Common/MemorySanitizer.h>
# include <span>
namespace DB
{
using namespace GatherUtils;
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
@ -25,33 +23,86 @@ namespace ErrorCodes
extern const int INCORRECT_DATA;
}
namespace Detail
{
inline size_t base64Decode(const std::span<const UInt8> src, UInt8 * dst)
{
# if defined(__aarch64__)
return tb64sdec(reinterpret_cast<const uint8_t *>(src.data()), src.size(), reinterpret_cast<uint8_t *>(dst));
# else
return _tb64d(reinterpret_cast<const uint8_t *>(src.data()), src.size(), reinterpret_cast<uint8_t *>(dst));
# endif
}
}
struct Base64Encode
{
static constexpr auto name = "base64Encode";
static size_t getBufferSize(size_t string_length, size_t string_count)
static size_t getBufferSize(const size_t string_length, const size_t string_count)
{
return ((string_length - string_count) / 3 + string_count) * 4 + string_count;
}
static size_t performCoding(const std::span<const UInt8> src, UInt8 * dst)
{
/*
* Some bug in sse arm64 implementation?
* `base64Encode(repeat('a', 46))` returns wrong padding character
*/
# if defined(__aarch64__)
return tb64senc(reinterpret_cast<const uint8_t *>(src.data()), src.size(), reinterpret_cast<uint8_t *>(dst));
# else
return _tb64e(reinterpret_cast<const uint8_t *>(src.data()), src.size(), reinterpret_cast<uint8_t *>(dst));
# endif
}
};
struct Base64Decode
{
static constexpr auto name = "base64Decode";
static size_t getBufferSize(size_t string_length, size_t string_count)
static size_t getBufferSize(const size_t string_length, const size_t string_count)
{
return ((string_length - string_count) / 4 + string_count) * 3 + string_count;
}
static size_t performCoding(const std::span<const UInt8> src, UInt8 * dst)
{
const auto outlen = Detail::base64Decode(src, dst);
if (src.size() > 0 && !outlen)
throw Exception(
ErrorCodes::INCORRECT_DATA,
"Failed to {} input '{}'",
name,
String(reinterpret_cast<const char *>(src.data()), src.size()));
return outlen;
}
};
struct TryBase64Decode
{
static constexpr auto name = "tryBase64Decode";
static size_t getBufferSize(size_t string_length, size_t string_count)
static size_t getBufferSize(const size_t string_length, const size_t string_count)
{
return Base64Decode::getBufferSize(string_length, string_count);
}
static size_t performCoding(const std::span<const UInt8> src, UInt8 * dst)
{
if (src.empty())
return 0;
const auto outlen = Detail::base64Decode(src, dst);
// During decoding, the character array can be partially polluted.
// If decoding fails, revert and clean up.
if (!outlen)
*dst = 0;
return outlen;
}
};
template <typename Func>
@ -71,99 +122,60 @@ public:
if (arguments.size() != 1)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Wrong number of arguments for function {}: 1 expected.", getName());
if (!WhichDataType(arguments[0].type).isString())
if (!WhichDataType(arguments[0].type).isStringOrFixedString())
throw Exception(
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type {} of 1st argument of function {}. Must be String.",
arguments[0].type->getName(), getName());
"Illegal type {} of 1st argument of function {}. Must be FixedString or String.",
arguments[0].type->getName(),
getName());
return std::make_shared<DataTypeString>();
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, const size_t input_rows_count) const override
{
const ColumnPtr column_string = arguments[0].column;
const ColumnString * input = checkAndGetColumn<ColumnString>(column_string.get());
const auto & input_column = arguments[0].column;
if (const auto * src_column_as_fixed_string = checkAndGetColumn<ColumnFixedString>(*input_column))
return execute(*src_column_as_fixed_string, input_rows_count);
else if (const auto * src_column_as_string = checkAndGetColumn<ColumnString>(*input_column))
return execute(*src_column_as_string, input_rows_count);
if (!input)
throw Exception(
ErrorCodes::ILLEGAL_COLUMN,
"Illegal column {} of first argument of function {}, must be of type String",
arguments[0].column->getName(), getName());
throw Exception(
ErrorCodes::ILLEGAL_COLUMN,
"Illegal column {} of first argument of function {}, must be of type FixedString or String.",
input_column->getName(),
getName());
}
private:
static ColumnPtr execute(const ColumnString & src_column, const size_t src_row_count)
{
auto dst_column = ColumnString::create();
auto & dst_data = dst_column->getChars();
auto & dst_chars = dst_column->getChars();
auto & dst_offsets = dst_column->getOffsets();
size_t reserve = Func::getBufferSize(input->getChars().size(), input->size());
dst_data.resize(reserve);
dst_offsets.resize(input_rows_count);
const auto reserve = Func::getBufferSize(src_column.byteSize(), src_column.size());
dst_chars.resize(reserve);
dst_offsets.resize(src_row_count);
const ColumnString::Offsets & src_offsets = input->getOffsets();
const auto & src_chars = src_column.getChars();
const auto & src_offsets = src_column.getOffsets();
const auto * source = input->getChars().data();
auto * dst = dst_data.data();
auto * dst = dst_chars.data();
auto * dst_pos = dst;
const auto * src = src_chars.data();
size_t src_offset_prev = 0;
for (size_t row = 0; row < input_rows_count; ++row)
for (size_t row = 0; row < src_row_count; ++row)
{
size_t srclen = src_offsets[row] - src_offset_prev - 1;
size_t outlen = 0;
if constexpr (std::is_same_v<Func, Base64Encode>)
{
/*
* Some bug in sse arm64 implementation?
* `base64Encode(repeat('a', 46))` returns wrong padding character
*/
#if defined(__aarch64__)
outlen = tb64senc(reinterpret_cast<const uint8_t *>(source), srclen, reinterpret_cast<uint8_t *>(dst_pos));
#else
outlen = _tb64e(reinterpret_cast<const uint8_t *>(source), srclen, reinterpret_cast<uint8_t *>(dst_pos));
#endif
}
else if constexpr (std::is_same_v<Func, Base64Decode>)
{
if (srclen > 0)
{
#if defined(__aarch64__)
outlen = tb64sdec(reinterpret_cast<const uint8_t *>(source), srclen, reinterpret_cast<uint8_t *>(dst_pos));
#else
outlen = _tb64d(reinterpret_cast<const uint8_t *>(source), srclen, reinterpret_cast<uint8_t *>(dst_pos));
#endif
if (!outlen)
throw Exception(
ErrorCodes::INCORRECT_DATA,
"Failed to {} input '{}'",
getName(), String(reinterpret_cast<const char *>(source), srclen));
}
}
else
{
if (srclen > 0)
{
// during decoding character array can be partially polluted
// if fail, revert back and clean
auto * savepoint = dst_pos;
outlen = _tb64d(reinterpret_cast<const uint8_t *>(source), srclen, reinterpret_cast<uint8_t *>(dst_pos));
if (!outlen)
{
outlen = 0;
dst_pos = savepoint; //-V1048
// clean the symbol
dst_pos[0] = 0;
}
}
}
const size_t src_length = src_offsets[row] - src_offset_prev - 1;
const auto outlen = Func::performCoding({src, src_length}, dst_pos);
/// The Base64 library uses AVX-512 with some shuffle operations.
/// The memory sanitizer does not understand that uninitialized memory in a SIMD register is harmless when it is not used in the shuffle result.
__msan_unpoison(dst_pos, outlen);
source += srclen + 1;
src += src_length + 1;
dst_pos += outlen;
*dst_pos = '\0';
dst_pos += 1;
@ -172,8 +184,44 @@ public:
src_offset_prev = src_offsets[row];
}
dst_data.resize(dst_pos - dst);
dst_chars.resize(dst_pos - dst);
return dst_column;
}
static ColumnPtr execute(const ColumnFixedString & src_column, const size_t src_row_count)
{
auto dst_column = ColumnString::create();
auto & dst_chars = dst_column->getChars();
auto & dst_offsets = dst_column->getOffsets();
const auto reserve = Func::getBufferSize(src_column.byteSize(), src_column.size());
dst_chars.resize(reserve);
dst_offsets.resize(src_row_count);
const auto & src_chars = src_column.getChars();
const auto & src_n = src_column.getN();
auto * dst = dst_chars.data();
auto * dst_pos = dst;
const auto * src = src_chars.data();
for (size_t row = 0; row < src_row_count; ++row)
{
const auto outlen = Func::performCoding({src, src_n}, dst_pos);
/// The Base64 library uses AVX-512 with some shuffle operations.
/// The memory sanitizer does not understand that uninitialized memory in a SIMD register is harmless when it is not used in the shuffle result.
__msan_unpoison(dst_pos, outlen);
src += src_n;
dst_pos += outlen;
*dst_pos = '\0';
dst_pos += 1;
dst_offsets[row] = dst_pos - dst;
}
dst_chars.resize(dst_pos - dst);
return dst_column;
}
};
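
To see that `getBufferSize` is a safe upper bound for encoding, a quick check in Python, assuming `string_length` counts the payload bytes plus one terminating zero per string: each string of n payload bytes encodes to 4 * ceil(n / 3) bytes plus its own terminator:

```python
import base64

def encode_buffer_size(string_length: int, string_count: int) -> int:
    # Mirrors Base64Encode::getBufferSize (integer division, as in C++)
    return ((string_length - string_count) // 3 + string_count) * 4 + string_count

for payload in (b"", b"a", b"ab", b"abc", b"abcd", b"a" * 46):
    needed = len(base64.b64encode(payload)) + 1  # encoded bytes + NUL terminator
    assert needed <= encode_buffer_size(len(payload) + 1, 1)
print("bound holds")
```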

View File

@ -38,18 +38,21 @@ public:
{
if (!isStringOrFixedString(arguments[0]))
throw Exception(
"Illegal type " + arguments[0]->getName() + " of first argument of function " + getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type {} of first argument of function {}",
arguments[0]->getName(), getName());
if (!isStringOrFixedString(arguments[1]))
throw Exception(
"Illegal type " + arguments[1]->getName() + " of second argument of function " + getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type {} of second argument of function {}",
arguments[1]->getName(), getName());
if (!isStringOrFixedString(arguments[2]))
throw Exception(
"Illegal type " + arguments[2]->getName() + " of third argument of function " + getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type {} of third argument of function {}",
arguments[2]->getName(), getName());
return std::make_shared<DataTypeString>();
}
@ -61,7 +64,10 @@ public:
const ColumnPtr column_replacement = arguments[2].column;
if (!isColumnConst(*column_needle) || !isColumnConst(*column_replacement))
throw Exception("2nd and 3rd arguments of function " + getName() + " must be constants.", ErrorCodes::ILLEGAL_COLUMN);
throw Exception(
ErrorCodes::ILLEGAL_COLUMN,
"2nd and 3rd arguments of function {} must be constants.",
getName());
const IColumn * c1 = arguments[1].column.get();
const IColumn * c2 = arguments[2].column.get();
@ -71,7 +77,9 @@ public:
String replacement = c2_const->getValue<String>();
if (needle.empty())
throw Exception("Length of the second argument of function replace must be greater than 0.", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
throw Exception(
ErrorCodes::ARGUMENT_OUT_OF_BOUND,
"Length of the second argument of function replace must be greater than 0.");
if (const ColumnString * col = checkAndGetColumn<ColumnString>(column_src.get()))
{
@ -87,8 +95,9 @@ public:
}
else
throw Exception(
"Illegal column " + arguments[0].column->getName() + " of first argument of function " + getName(),
ErrorCodes::ILLEGAL_COLUMN);
ErrorCodes::ILLEGAL_COLUMN,
"Illegal column {} of first argument of function {}",
arguments[0].column->getName(), getName());
}
};

View File

@ -91,7 +91,7 @@ struct MultiMatchAllIndicesImpl
hs_error_t err = hs_clone_scratch(regexps->getScratch(), &scratch);
if (err != HS_SUCCESS)
throw Exception("Could not clone scratch space for hyperscan", ErrorCodes::CANNOT_ALLOCATE_MEMORY);
throw Exception("Could not clone scratch space for vectorscan", ErrorCodes::CANNOT_ALLOCATE_MEMORY);
MultiRegexps::ScratchPtr smart_scratch(scratch);
@ -203,7 +203,7 @@ struct MultiMatchAllIndicesImpl
hs_error_t err = hs_clone_scratch(regexps->getScratch(), &scratch);
if (err != HS_SUCCESS)
throw Exception("Could not clone scratch space for hyperscan", ErrorCodes::CANNOT_ALLOCATE_MEMORY);
throw Exception("Could not clone scratch space for vectorscan", ErrorCodes::CANNOT_ALLOCATE_MEMORY);
MultiRegexps::ScratchPtr smart_scratch(scratch);

View File

@ -38,6 +38,7 @@ namespace ErrorCodes
namespace Regexps
{
using Regexp = OptimizedRegularExpressionSingleThreaded;
using RegexpPtr = std::shared_ptr<Regexp>;
@ -112,11 +113,11 @@ struct HyperscanDeleter
};
/// Helper unique pointers to correctly delete the allocated space when hyperscan cannot compile something and we throw an exception.
using CompilerError = std::unique_ptr<hs_compile_error_t, HyperscanDeleter<decltype(&hs_free_compile_error), &hs_free_compile_error>>;
using CompilerErrorPtr = std::unique_ptr<hs_compile_error_t, HyperscanDeleter<decltype(&hs_free_compile_error), &hs_free_compile_error>>;
using ScratchPtr = std::unique_ptr<hs_scratch_t, HyperscanDeleter<decltype(&hs_free_scratch), &hs_free_scratch>>;
using DataBasePtr = std::unique_ptr<hs_database_t, HyperscanDeleter<decltype(&hs_free_database), &hs_free_database>>;
/// Database is thread safe across multiple threads and Scratch is not but we can copy it whenever we use it in the searcher.
/// Database is immutable/thread-safe across multiple threads. Scratch is not, but we can copy it whenever we use it in the searcher.
class Regexps
{
public:
@ -154,7 +155,7 @@ private:
using DeferredConstructedRegexpsPtr = std::shared_ptr<DeferredConstructedRegexps>;
template <bool save_indices, bool WithEditDistance>
template <bool save_indices, bool with_edit_distance>
inline Regexps constructRegexps(const std::vector<String> & str_patterns, [[maybe_unused]] std::optional<UInt32> edit_distance)
{
/// Common pointers
@ -168,7 +169,7 @@ inline Regexps constructRegexps(const std::vector<String> & str_patterns, [[mayb
patterns.reserve(str_patterns.size());
flags.reserve(str_patterns.size());
if constexpr (WithEditDistance)
if constexpr (with_edit_distance)
{
ext_exprs.reserve(str_patterns.size());
ext_exprs_ptrs.reserve(str_patterns.size());
@ -186,7 +187,7 @@ inline Regexps constructRegexps(const std::vector<String> & str_patterns, [[mayb
* as it is said in the Hyperscan documentation. https://intel.github.io/hyperscan/dev-reference/performance.html#single-match-flag
*/
flags.push_back(HS_FLAG_DOTALL | HS_FLAG_SINGLEMATCH | HS_FLAG_ALLOWEMPTY | HS_FLAG_UTF8);
if constexpr (WithEditDistance)
if constexpr (with_edit_distance)
{
/// Hyperscan currently does not support UTF8 matching with edit distance.
flags.back() &= ~HS_FLAG_UTF8;
@ -211,7 +212,7 @@ inline Regexps constructRegexps(const std::vector<String> & str_patterns, [[mayb
}
hs_error_t err;
if constexpr (!WithEditDistance)
if constexpr (!with_edit_distance)
err = hs_compile_multi(
patterns.data(),
flags.data(),
@ -236,7 +237,7 @@ inline Regexps constructRegexps(const std::vector<String> & str_patterns, [[mayb
if (err != HS_SUCCESS)
{
/// CompilerError is a unique_ptr, so correct memory free after the exception is thrown.
CompilerError error(compile_error);
CompilerErrorPtr error(compile_error);
if (error->expression < 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, String(error->message));
@ -253,7 +254,7 @@ inline Regexps constructRegexps(const std::vector<String> & str_patterns, [[mayb
/// If not HS_SUCCESS, it is guaranteed that the memory would not be allocated for scratch.
if (err != HS_SUCCESS)
throw Exception(ErrorCodes::CANNOT_ALLOCATE_MEMORY, "Could not allocate scratch space for hyperscan");
throw Exception(ErrorCodes::CANNOT_ALLOCATE_MEMORY, "Could not allocate scratch space for vectorscan");
return {db, scratch};
}
@ -288,9 +289,9 @@ struct GlobalCacheTable
}
};
/// If WithEditDistance is False, edit_distance must be nullopt. Also, we use templates here because each instantiation of function template
/// If with_edit_distance is False, edit_distance must be nullopt. Also, we use templates here because each instantiation of function template
/// has its own copy of local static variables which must not be the same for different hyperscan compilations.
template <bool save_indices, bool WithEditDistance>
template <bool save_indices, bool with_edit_distance>
inline DeferredConstructedRegexpsPtr getOrSet(const std::vector<std::string_view> & patterns, std::optional<UInt32> edit_distance)
{
static GlobalCacheTable pool; /// Different variables for different pattern parameters, thread-safe in C++11
@ -320,7 +321,7 @@ inline DeferredConstructedRegexpsPtr getOrSet(const std::vector<std::string_view
auto deferred_constructed_regexps = std::make_shared<DeferredConstructedRegexps>(
[str_patterns, edit_distance]()
{
return constructRegexps<save_indices, WithEditDistance>(str_patterns, edit_distance);
return constructRegexps<save_indices, with_edit_distance>(str_patterns, edit_distance);
});
bucket = {std::move(str_patterns), edit_distance, deferred_constructed_regexps};
}
@ -331,7 +332,7 @@ inline DeferredConstructedRegexpsPtr getOrSet(const std::vector<std::string_view
auto deferred_constructed_regexps = std::make_shared<DeferredConstructedRegexps>(
[str_patterns, edit_distance]()
{
return constructRegexps<save_indices, WithEditDistance>(str_patterns, edit_distance);
return constructRegexps<save_indices, with_edit_distance>(str_patterns, edit_distance);
});
bucket = {std::move(str_patterns), edit_distance, deferred_constructed_regexps};
}
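
Aside from the renames, the `DeferredConstructedRegexps` machinery this hunk passes through amounts to a lazily built, thread-safe shared value; a toy model, not the ClickHouse code:

```python
import re
import threading

class DeferredConstructedRegexps:
    """Toy model: the first caller pays the compilation cost, later callers share it."""
    def __init__(self, construct):
        self._construct = construct
        self._value = None
        self._lock = threading.Lock()

    def get(self):
        with self._lock:
            if self._value is None:
                self._value = self._construct()
            return self._value

deferred = DeferredConstructedRegexps(
    lambda: [re.compile(p) for p in ("foo.*", "bar")]
)
print(deferred.get()[0].match("foobar") is not None)  # True
```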

View File

@ -1,7 +1,6 @@
#pragma once
#include <base/types.h>
#include <Common/Volnitsky.h>
#include <Columns/ColumnString.h>
#include <IO/WriteHelpers.h>
@ -17,131 +16,130 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
struct ReplaceRegexpTraits
{
enum class Replace
{
First,
All
};
};
/** Replace all matches of regexp 'needle' to string 'replacement'. 'needle' and 'replacement' are constants.
* 'replacement' could contain substitutions, for example: '\2-\3-\1'
* 'replacement' can contain substitutions, for example: '\2-\3-\1'
*/
template <bool replace_one = false>
template <ReplaceRegexpTraits::Replace replace>
struct ReplaceRegexpImpl
{
/// Sequence of instructions, describing how to get resulting string.
struct Instruction
{
/// If not negative - perform substitution of n-th subpattern from the regexp match.
/// If not negative, perform substitution of n-th subpattern from the regexp match.
int substitution_num = -1;
/// Otherwise - paste this string verbatim.
std::string literal;
/// Otherwise, paste this literal string verbatim.
String literal;
Instruction(int substitution_num_) : substitution_num(substitution_num_) {} /// NOLINT
Instruction(std::string literal_) : literal(std::move(literal_)) {} /// NOLINT
explicit Instruction(int substitution_num_) : substitution_num(substitution_num_) {}
explicit Instruction(String literal_) : literal(std::move(literal_)) {}
};
/// Decomposes the replacement string into a sequence of substitutions and literals.
/// E.g. "abc\1de\2fg\1\2" --> inst("abc"), inst(1), inst("de"), inst(2), inst("fg"), inst(1), inst(2)
using Instructions = std::vector<Instruction>;
static const size_t max_captures = 10;
static constexpr int max_captures = 10;
static Instructions createInstructions(const std::string & s, int num_captures)
static Instructions createInstructions(std::string_view replacement, int num_captures)
{
Instructions instructions;
String now;
for (size_t i = 0; i < s.size(); ++i)
String literals;
for (size_t i = 0; i < replacement.size(); ++i)
{
if (s[i] == '\\' && i + 1 < s.size())
if (replacement[i] == '\\' && i + 1 < replacement.size())
{
if (isNumericASCII(s[i + 1])) /// Substitution
if (isNumericASCII(replacement[i + 1])) /// Substitution
{
if (!now.empty())
if (!literals.empty())
{
instructions.emplace_back(now);
now = "";
instructions.emplace_back(literals);
literals = "";
}
instructions.emplace_back(s[i + 1] - '0');
instructions.emplace_back(replacement[i + 1] - '0');
}
else
now += s[i + 1]; /// Escaping
literals += replacement[i + 1]; /// Escaping
++i;
}
else
now += s[i]; /// Plain character
literals += replacement[i]; /// Plain character
}
if (!now.empty())
{
instructions.emplace_back(now);
now = "";
}
if (!literals.empty())
instructions.emplace_back(literals);
for (const auto & it : instructions)
if (it.substitution_num >= num_captures)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Invalid replace instruction in replacement string. Id: {}, but regexp has only {} subpatterns",
it.substitution_num, num_captures - 1);
for (const auto & instr : instructions)
if (instr.substitution_num >= num_captures)
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Id {} in replacement string is an invalid substitution, regexp has only {} capturing groups",
instr.substitution_num, num_captures - 1);
return instructions;
}
static void processString(
const re2_st::StringPiece & input,
const char * haystack_data,
size_t haystack_length,
ColumnString::Chars & res_data,
ColumnString::Offset & res_offset,
re2_st::RE2 & searcher,
const re2_st::RE2 & searcher,
int num_captures,
const Instructions & instructions)
{
re2_st::StringPiece haystack(haystack_data, haystack_length);
re2_st::StringPiece matches[max_captures];
size_t copy_pos = 0;
size_t match_pos = 0;
while (match_pos < static_cast<size_t>(input.length()))
while (match_pos < haystack_length)
{
/// If no more replacements are possible for the current string
bool can_finish_current_string = false;
if (searcher.Match(input, match_pos, input.length(), re2_st::RE2::Anchor::UNANCHORED, matches, num_captures))
if (searcher.Match(haystack, match_pos, haystack_length, re2_st::RE2::Anchor::UNANCHORED, matches, num_captures))
{
const auto & match = matches[0];
size_t bytes_to_copy = (match.data() - input.data()) - copy_pos;
const auto & match = matches[0]; /// Complete match (\0)
size_t bytes_to_copy = (match.data() - haystack.data()) - copy_pos;
/// Copy prefix before matched regexp without modification
/// Copy prefix before current match without modification
res_data.resize(res_data.size() + bytes_to_copy);
memcpySmallAllowReadWriteOverflow15(&res_data[res_offset], input.data() + copy_pos, bytes_to_copy);
memcpySmallAllowReadWriteOverflow15(&res_data[res_offset], haystack.data() + copy_pos, bytes_to_copy);
res_offset += bytes_to_copy;
copy_pos += bytes_to_copy + match.length();
match_pos = copy_pos;
/// Do substitution instructions
for (const auto & it : instructions)
/// Substitute inside current match using instructions
for (const auto & instr : instructions)
{
if (it.substitution_num >= 0)
{
const auto & substitution = matches[it.substitution_num];
res_data.resize(res_data.size() + substitution.length());
memcpy(&res_data[res_offset], substitution.data(), substitution.length());
res_offset += substitution.length();
}
std::string_view replacement;
if (instr.substitution_num >= 0)
replacement = std::string_view(matches[instr.substitution_num].data(), matches[instr.substitution_num].size());
else
{
const auto & literal = it.literal;
res_data.resize(res_data.size() + literal.size());
memcpy(&res_data[res_offset], literal.data(), literal.size());
res_offset += literal.size();
}
replacement = instr.literal;
res_data.resize(res_data.size() + replacement.size());
memcpy(&res_data[res_offset], replacement.data(), replacement.size());
res_offset += replacement.size();
}
if (replace_one)
if constexpr (replace == ReplaceRegexpTraits::Replace::First)
can_finish_current_string = true;
if (match.length() == 0)
if (match.empty())
{
/// Step one character to avoid infinite loop
++match_pos;
if (match_pos >= static_cast<size_t>(input.length()))
if (match_pos >= haystack_length)
can_finish_current_string = true;
}
}
@ -151,10 +149,10 @@ struct ReplaceRegexpImpl
/// If ready, append suffix after match to end of string.
if (can_finish_current_string)
{
res_data.resize(res_data.size() + input.length() - copy_pos);
memcpySmallAllowReadWriteOverflow15(&res_data[res_offset], input.data() + copy_pos, input.length() - copy_pos);
res_offset += input.length() - copy_pos;
copy_pos = input.length();
res_data.resize(res_data.size() + haystack_length - copy_pos);
memcpySmallAllowReadWriteOverflow15(&res_data[res_offset], haystack.data() + copy_pos, haystack_length - copy_pos);
res_offset += haystack_length - copy_pos;
copy_pos = haystack_length;
match_pos = copy_pos;
}
}
@ -164,12 +162,11 @@ struct ReplaceRegexpImpl
++res_offset;
}
static void vector(
const ColumnString::Chars & data,
const ColumnString::Offsets & offsets,
const std::string & needle,
const std::string & replacement,
const String & needle,
const String & replacement,
ColumnString::Chars & res_data,
ColumnString::Offsets & res_offsets)
{
@ -178,11 +175,19 @@ struct ReplaceRegexpImpl
size_t size = offsets.size();
res_offsets.resize(size);
typename re2_st::RE2::Options regexp_options;
/// Never write error messages to stderr. It's ignorant to do it from library code.
re2_st::RE2::Options regexp_options;
/// Don't write error messages to stderr.
regexp_options.set_log_errors(false);
re2_st::RE2 searcher(needle, regexp_options);
int num_captures = std::min(searcher.NumberOfCapturingGroups() + 1, static_cast<int>(max_captures));
if (!searcher.ok())
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"The pattern argument is not a valid re2 pattern: {}",
searcher.error());
int num_captures = std::min(searcher.NumberOfCapturingGroups() + 1, max_captures);
Instructions instructions = createInstructions(replacement, num_captures);
@ -190,9 +195,10 @@ struct ReplaceRegexpImpl
for (size_t i = 0; i < size; ++i)
{
size_t from = i > 0 ? offsets[i - 1] : 0;
re2_st::StringPiece input(reinterpret_cast<const char *>(data.data() + from), offsets[i] - from - 1);
const char * haystack_data = reinterpret_cast<const char *>(data.data() + from);
const size_t haystack_length = static_cast<unsigned>(offsets[i] - from - 1);
processString(input, res_data, res_offset, searcher, num_captures, instructions);
processString(haystack_data, haystack_length, res_data, res_offset, searcher, num_captures, instructions);
res_offsets[i] = res_offset;
}
}
@ -200,8 +206,8 @@ struct ReplaceRegexpImpl
static void vectorFixed(
const ColumnString::Chars & data,
size_t n,
const std::string & needle,
const std::string & replacement,
const String & needle,
const String & replacement,
ColumnString::Chars & res_data,
ColumnString::Offsets & res_offsets)
{
@ -210,20 +216,29 @@ struct ReplaceRegexpImpl
res_data.reserve(data.size());
res_offsets.resize(size);
typename re2_st::RE2::Options regexp_options;
/// Never write error messages to stderr. It's ignorant to do it from library code.
re2_st::RE2::Options regexp_options;
/// Don't write error messages to stderr.
regexp_options.set_log_errors(false);
re2_st::RE2 searcher(needle, regexp_options);
int num_captures = std::min(searcher.NumberOfCapturingGroups() + 1, static_cast<int>(max_captures));
if (!searcher.ok())
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"The pattern argument is not a valid re2 pattern: {}",
searcher.error());
int num_captures = std::min(searcher.NumberOfCapturingGroups() + 1, max_captures);
Instructions instructions = createInstructions(replacement, num_captures);
for (size_t i = 0; i < size; ++i)
{
size_t from = i * n;
re2_st::StringPiece input(reinterpret_cast<const char *>(data.data() + from), n);
const char * haystack_data = reinterpret_cast<const char *>(data.data() + from);
const size_t haystack_length = n;
processString(input, res_data, res_offset, searcher, num_captures, instructions);
processString(haystack_data, haystack_length, res_data, res_offset, searcher, num_captures, instructions);
res_offsets[i] = res_offset;
}
}
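
A compact Python rendering of `createInstructions` above, to make the decomposition concrete (the real code additionally validates every group id against `num_captures`):

```python
def create_instructions(replacement: str):
    """Split a replacement string into literal chunks and group numbers (ints)."""
    instructions, literals = [], ""
    i = 0
    while i < len(replacement):
        if replacement[i] == "\\" and i + 1 < len(replacement):
            nxt = replacement[i + 1]
            if nxt.isdigit():             # substitution \0-\9
                if literals:
                    instructions.append(literals)
                    literals = ""
                instructions.append(int(nxt))
            else:                         # escaped character
                literals += nxt
            i += 2
        else:                             # plain character
            literals += replacement[i]
            i += 1
    if literals:
        instructions.append(literals)
    return instructions

print(create_instructions("abc\\1de\\2fg\\1\\2"))
# ['abc', 1, 'de', 2, 'fg', 1, 2]
```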

View File

@ -8,9 +8,17 @@
namespace DB
{
struct ReplaceStringTraits
{
enum class Replace
{
First,
All
};
};
/** Replaces one or all occurrences of the substring 'needle' with 'replacement'. 'needle' and 'replacement' are constants.
*/
template <bool replace_one = false>
template <ReplaceStringTraits::Replace replace>
struct ReplaceStringImpl
{
static void vector(
@ -66,7 +74,7 @@ struct ReplaceStringImpl
memcpy(&res_data[res_offset], replacement.data(), replacement.size());
res_offset += replacement.size();
pos = match + needle.size();
if (replace_one)
if constexpr (replace == ReplaceStringTraits::Replace::First)
can_finish_current_string = true;
}
else
@ -155,7 +163,7 @@ struct ReplaceStringImpl
memcpy(&res_data[res_offset], replacement.data(), replacement.size());
res_offset += replacement.size();
pos = match + needle.size();
if (replace_one || pos == begin + n * (i + 1))
if (replace == ReplaceStringTraits::Replace::First || pos == begin + n * (i + 1))
can_finish_current_string = true;
}
else

View File

@ -1,8 +1,7 @@
#include <Functions/FunctionBase64Conversion.h>
#if USE_BASE64
#include <Functions/FunctionFactory.h>
#include <DataTypes/DataTypeString.h>
namespace DB
{
@ -15,4 +14,5 @@ REGISTER_FUNCTION(Base64Decode)
factory.registerAlias("FROM_BASE64", "base64Decode", FunctionFactory::CaseInsensitive);
}
}
#endif

View File

@ -1,10 +1,7 @@
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionBase64Conversion.h>
#include "config.h"
#if USE_BASE64
# include <DataTypes/DataTypeString.h>
#include <Functions/FunctionFactory.h>
namespace DB
{
@ -17,4 +14,5 @@ REGISTER_FUNCTION(Base64Encode)
factory.registerAlias("TO_BASE64", "base64Encode", FunctionFactory::CaseInsensitive);
}
}
#endif

View File

@ -13,7 +13,7 @@ struct NameReplaceAll
static constexpr auto name = "replaceAll";
};
using FunctionReplaceAll = FunctionStringReplace<ReplaceStringImpl<false>, NameReplaceAll>;
using FunctionReplaceAll = FunctionStringReplace<ReplaceStringImpl<ReplaceStringTraits::Replace::All>, NameReplaceAll>;
}

View File

@ -13,7 +13,7 @@ struct NameReplaceOne
static constexpr auto name = "replaceOne";
};
using FunctionReplaceOne = FunctionStringReplace<ReplaceStringImpl<true>, NameReplaceOne>;
using FunctionReplaceOne = FunctionStringReplace<ReplaceStringImpl<ReplaceStringTraits::Replace::First>, NameReplaceOne>;
}

View File

@ -13,7 +13,7 @@ struct NameReplaceRegexpAll
static constexpr auto name = "replaceRegexpAll";
};
using FunctionReplaceRegexpAll = FunctionStringReplace<ReplaceRegexpImpl<false>, NameReplaceRegexpAll>;
using FunctionReplaceRegexpAll = FunctionStringReplace<ReplaceRegexpImpl<ReplaceRegexpTraits::Replace::All>, NameReplaceRegexpAll>;
}

View File

@ -13,7 +13,7 @@ struct NameReplaceRegexpOne
static constexpr auto name = "replaceRegexpOne";
};
using FunctionReplaceRegexpOne = FunctionStringReplace<ReplaceRegexpImpl<true>, NameReplaceRegexpOne>;
using FunctionReplaceRegexpOne = FunctionStringReplace<ReplaceRegexpImpl<ReplaceRegexpTraits::Replace::First>, NameReplaceRegexpOne>;
}

View File

@ -1,7 +1,7 @@
#include <Functions/FunctionBase64Conversion.h>
#if USE_BASE64
#include <Functions/FunctionFactory.h>
#include <DataTypes/DataTypeString.h>
namespace DB
{
@ -10,4 +10,5 @@ REGISTER_FUNCTION(TryBase64Decode)
factory.registerFunction<FunctionBase64Conversion<TryBase64Decode>>();
}
}
#endif

View File

@ -528,16 +528,17 @@ namespace detail
auto on_retriable_error = [&]()
{
retry_with_range_header = true;
impl.reset();
auto http_session = session->getSession();
http_session->reset();
sleepForMilliseconds(milliseconds_to_wait);
};
for (size_t i = 0; i < settings.http_max_tries; ++i)
{
exception = nullptr;
initialization_error = InitializeError::NONE;
try
{

View File

@ -123,7 +123,10 @@ void WriteBufferFromS3::nextImpl()
void WriteBufferFromS3::allocateBuffer()
{
if (total_parts_uploaded != 0 && total_parts_uploaded % s3_settings.upload_part_size_multiply_parts_count_threshold == 0)
{
upload_part_size *= s3_settings.upload_part_size_multiply_factor;
upload_part_size = std::min(upload_part_size, s3_settings.max_upload_part_size);
}
temporary_buffer = Aws::MakeShared<Aws::StringStream>("temporary buffer");
temporary_buffer->exceptions(std::ios::badbit);

View File

@ -37,6 +37,7 @@ namespace ProfileEvents
{
extern const Event AsyncInsertQuery;
extern const Event AsyncInsertBytes;
extern const Event FailedAsyncInsertQuery;
}
namespace DB
@ -101,6 +102,8 @@ void AsynchronousInsertQueue::InsertData::Entry::finish(std::exception_ptr excep
{
std::lock_guard lock(mutex);
finished = true;
if (exception_)
ProfileEvents::increment(ProfileEvents::FailedAsyncInsertQuery, 1);
exception = exception_;
cv.notify_all();
}

View File

@ -217,7 +217,7 @@ bool ClusterDiscovery::needUpdate(const Strings & node_uuids, const NodesInfo &
ClusterPtr ClusterDiscovery::makeCluster(const ClusterInfo & cluster_info)
{
std::vector<std::vector<String>> shards;
std::vector<Strings> shards;
{
std::map<size_t, Strings> replica_adresses;
@ -244,7 +244,7 @@ ClusterPtr ClusterDiscovery::makeCluster(const ClusterInfo & cluster_info)
/* password= */ "",
/* clickhouse_port= */ secure ? context->getTCPPortSecure().value_or(DBMS_DEFAULT_SECURE_PORT) : context->getTCPPort(),
/* treat_local_as_remote= */ false,
/* treat_local_port_as_remote= */ context->getApplicationType() == Context::ApplicationType::LOCAL,
/* treat_local_port_as_remote= */ false, /// should be set only for clickhouse-local, but cluster discovery is not used there
/* secure= */ secure);
return cluster;
}

View File

@ -658,7 +658,9 @@ void HashJoin::initRightBlockStructure(Block & saved_block_sample)
/// Save non key columns
for (auto & column : sample_block_with_columns_to_add)
{
if (!saved_block_sample.findByName(column.name))
if (auto * col = saved_block_sample.findByName(column.name))
*col = column;
else
saved_block_sample.insert(column);
}
}

View File

@ -5,13 +5,23 @@
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/Exception.h>
#include <Interpreters/Context.h>
#include <base/unit.h>
#include <boost/algorithm/string/predicate.hpp>
namespace DB
{
namespace
{
/// An object up to 5 GB can be copied in a single atomic operation.
constexpr UInt64 DEFAULT_MAX_SINGLE_OPERATION_COPY_SIZE = 5_GiB;
/// The maximum size of an uploaded part.
constexpr UInt64 DEFAULT_MAX_UPLOAD_PART_SIZE = 5_GiB;
}
void StorageS3Settings::loadFromConfig(const String & config_elem, const Poco::Util::AbstractConfiguration & config, const Settings & settings)
{
std::lock_guard lock(mutex);
@ -50,9 +60,11 @@ void StorageS3Settings::loadFromConfig(const String & config_elem, const Poco::U
S3Settings::ReadWriteSettings rw_settings;
rw_settings.max_single_read_retries = get_uint_for_key(key, "max_single_read_retries", true, settings.s3_max_single_read_retries);
rw_settings.min_upload_part_size = get_uint_for_key(key, "min_upload_part_size", true, settings.s3_min_upload_part_size);
rw_settings.max_upload_part_size = get_uint_for_key(key, "max_upload_part_size", true, DEFAULT_MAX_UPLOAD_PART_SIZE);
rw_settings.upload_part_size_multiply_factor = get_uint_for_key(key, "upload_part_size_multiply_factor", true, settings.s3_upload_part_size_multiply_factor);
rw_settings.upload_part_size_multiply_parts_count_threshold = get_uint_for_key(key, "upload_part_size_multiply_parts_count_threshold", true, settings.s3_upload_part_size_multiply_parts_count_threshold);
rw_settings.max_single_part_upload_size = get_uint_for_key(key, "max_single_part_upload_size", true, settings.s3_max_single_part_upload_size);
rw_settings.max_single_operation_copy_size = get_uint_for_key(key, "max_single_operation_copy_size", true, DEFAULT_MAX_SINGLE_OPERATION_COPY_SIZE);
rw_settings.max_connections = get_uint_for_key(key, "max_connections", true, settings.s3_max_connections);
rw_settings.check_objects_after_upload = get_bool_for_key(key, "check_objects_after_upload", true, false);
@ -95,12 +107,16 @@ void S3Settings::ReadWriteSettings::updateFromSettingsIfEmpty(const Settings & s
max_single_read_retries = settings.s3_max_single_read_retries;
if (!min_upload_part_size)
min_upload_part_size = settings.s3_min_upload_part_size;
if (!max_upload_part_size)
max_upload_part_size = DEFAULT_MAX_UPLOAD_PART_SIZE;
if (!upload_part_size_multiply_factor)
upload_part_size_multiply_factor = settings.s3_upload_part_size_multiply_factor;
if (!upload_part_size_multiply_parts_count_threshold)
upload_part_size_multiply_parts_count_threshold = settings.s3_upload_part_size_multiply_parts_count_threshold;
if (!max_single_part_upload_size)
max_single_part_upload_size = settings.s3_max_single_part_upload_size;
if (!max_single_operation_copy_size)
max_single_operation_copy_size = DEFAULT_MAX_SINGLE_OPERATION_COPY_SIZE;
if (!max_connections)
max_connections = settings.s3_max_connections;
if (!max_unexpected_write_error_retries)

View File

@ -27,9 +27,11 @@ struct S3Settings
{
size_t max_single_read_retries = 0;
size_t min_upload_part_size = 0;
size_t max_upload_part_size = 0;
size_t upload_part_size_multiply_factor = 0;
size_t upload_part_size_multiply_parts_count_threshold = 0;
size_t max_single_part_upload_size = 0;
size_t max_single_operation_copy_size = 0;
size_t max_connections = 0;
bool check_objects_after_upload = false;
size_t max_unexpected_write_error_retries = 0;
@ -41,9 +43,11 @@ struct S3Settings
{
return max_single_read_retries == other.max_single_read_retries
&& min_upload_part_size == other.min_upload_part_size
&& max_upload_part_size == other.max_upload_part_size
&& upload_part_size_multiply_factor == other.upload_part_size_multiply_factor
&& upload_part_size_multiply_parts_count_threshold == other.upload_part_size_multiply_parts_count_threshold
&& max_single_part_upload_size == other.max_single_part_upload_size
&& max_single_operation_copy_size == other.max_single_operation_copy_size
&& max_connections == other.max_connections
&& check_objects_after_upload == other.check_objects_after_upload
&& max_unexpected_write_error_retries == other.max_unexpected_write_error_retries;

View File

@ -280,7 +280,7 @@ namespace
timeouts,
credentials,
settings.max_http_get_redirects,
DBMS_DEFAULT_BUFFER_SIZE,
settings.max_read_buffer_size,
read_settings,
headers,
ReadWriteBufferFromHTTP::Range{0, std::nullopt},
@ -341,7 +341,7 @@ namespace
timeouts,
credentials,
settings.max_http_get_redirects,
DBMS_DEFAULT_BUFFER_SIZE,
settings.max_read_buffer_size,
read_settings,
headers,
&context->getRemoteHostFilter(),
@ -378,7 +378,7 @@ namespace
timeouts,
credentials,
settings.max_http_get_redirects,
DBMS_DEFAULT_BUFFER_SIZE,
settings.max_read_buffer_size,
read_settings,
headers,
ReadWriteBufferFromHTTP::Range{},
@ -863,6 +863,8 @@ std::optional<time_t> IStorageURLBase::getLastModificationTime(
const Poco::Net::HTTPBasicCredentials & credentials,
const ContextPtr & context)
{
auto settings = context->getSettingsRef();
try
{
ReadWriteBufferFromHTTP buf(
@ -871,8 +873,8 @@ std::optional<time_t> IStorageURLBase::getLastModificationTime(
{},
ConnectionTimeouts::getHTTPTimeouts(context),
credentials,
context->getSettingsRef().max_http_get_redirects,
DBMS_DEFAULT_BUFFER_SIZE,
settings.max_http_get_redirects,
settings.max_read_buffer_size,
context->getReadSettings(),
headers,
ReadWriteBufferFromHTTP::Range{},

View File

@ -3,6 +3,7 @@
import argparse
import csv
import itertools
import logging
import os
from github import Github
@ -37,6 +38,8 @@ def process_result(file_path):
state, report_url, description = post_commit_status_from_file(file_path)
prefix = os.path.basename(os.path.dirname(file_path))
is_ok = state == "success"
if is_ok and report_url == "null":
return is_ok, None
status = f'OK: Bug reproduced (<a href="{report_url}">Report</a>'
if not is_ok:
@ -51,15 +54,23 @@ def process_all_results(file_paths):
for status_path in file_paths:
is_ok, test_results = process_result(status_path)
any_ok = any_ok or is_ok
all_results.extend(test_results)
if test_results is not None:
all_results.extend(test_results)
return any_ok, all_results
def main(args):
logging.basicConfig(level=logging.INFO)
check_name_with_group = "Bugfix validate check"
is_ok, test_results = process_all_results(args.status)
if not test_results:
logging.info("No results to upload")
return
pr_info = PRInfo()
report_url = upload_results(
S3Helper(),

View File

@ -1,3 +1,3 @@
requests
PyJWT
cryptography
cryptography==37.0.4

View File

@ -1,3 +1,3 @@
requests
PyJWT
cryptography
cryptography==37.0.4

View File

@ -1,3 +1,3 @@
requests
PyJWT
cryptography
cryptography==37.0.4

View File

@ -1,3 +1,3 @@
requests
PyJWT
cryptography
cryptography==37.0.4

View File

@ -1,3 +1,3 @@
requests
PyJWT
cryptography
cryptography==37.0.4

View File

@ -0,0 +1,12 @@
<clickhouse>
<s3>
<multipart_upload_copy>
<endpoint>http://minio1:9001/root/data/backups/multipart_upload_copy/</endpoint>
<!-- We set max_single_operation_copy_size=1 here so multipart upload copy will always be chosen for that test. -->
<max_single_operation_copy_size>1</max_single_operation_copy_size>
<min_upload_part_size>5242880</min_upload_part_size>
<upload_part_size_multiply_parts_count_threshold>3</upload_part_size_multiply_parts_count_threshold>
<upload_part_size_multiply_factor>2</upload_part_size_multiply_factor>
</multipart_upload_copy>
</s3>
</clickhouse>
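
A hedged note on the three knobs above: parts start at min_upload_part_size and, in a simplified reading of the S3 upload logic, the part size is multiplied by upload_part_size_multiply_factor each time the part count crosses a multiple of upload_part_size_multiply_parts_count_threshold. A minimal sketch of that growth pattern (illustrative only, not ClickHouse's actual code):

def plan_upload_parts(total_size, min_part=5242880, threshold=3, factor=2):
    # Grow the part size by `factor` every `threshold` parts, as configured above.
    part_size, offset, parts = min_part, 0, []
    while offset < total_size:
        if parts and len(parts) % threshold == 0:
            part_size *= factor
        parts.append(min(part_size, total_size - offset))
        offset += parts[-1]
    return parts

# A ~50 MiB object: three 5 MiB parts, three 10 MiB parts, then the remainder.
print([p // 1048576 for p in plan_upload_parts(50 * 1048576)])  # [5, 5, 5, 10, 10, 10, 5]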

View File

@ -4,7 +4,11 @@ from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance(
"node",
main_configs=["configs/disk_s3.xml", "configs/named_collection_s3_backups.xml"],
main_configs=[
"configs/disk_s3.xml",
"configs/named_collection_s3_backups.xml",
"configs/s3_settings.xml",
],
with_minio=True,
)
@ -27,17 +31,17 @@ def new_backup_name():
return f"backup{backup_id_counter}"
def check_backup_and_restore(storage_policy, backup_destination):
def check_backup_and_restore(storage_policy, backup_destination, size=1000):
node.query(
f"""
DROP TABLE IF EXISTS data NO DELAY;
CREATE TABLE data (key Int, value String, array Array(String)) Engine=MergeTree() ORDER BY tuple() SETTINGS storage_policy='{storage_policy}';
INSERT INTO data SELECT * FROM generateRandom('key Int, value String, array Array(String)') LIMIT 1000;
INSERT INTO data SELECT * FROM generateRandom('key Int, value String, array Array(String)') LIMIT {size};
BACKUP TABLE data TO {backup_destination};
RESTORE TABLE data AS data_restored FROM {backup_destination};
SELECT throwIf(
(SELECT groupArray(tuple(*)) FROM data) !=
(SELECT groupArray(tuple(*)) FROM data_restored),
(SELECT count(), sum(sipHash64(*)) FROM data) !=
(SELECT count(), sum(sipHash64(*)) FROM data_restored),
'Data does not match after BACKUP/RESTORE'
);
DROP TABLE data NO DELAY;
@ -106,9 +110,10 @@ def test_backup_to_s3_native_copy():
)
check_backup_and_restore(storage_policy, backup_destination)
assert node.contains_in_log("using native copy")
assert node.contains_in_log("single-operation copy")
def test_backup_to_s3_other_bucket_native_copy():
def test_backup_to_s3_native_copy_other_bucket():
storage_policy = "policy_s3_other_bucket"
backup_name = new_backup_name()
backup_destination = (
@ -116,3 +121,13 @@ def test_backup_to_s3_other_bucket_native_copy():
)
check_backup_and_restore(storage_policy, backup_destination)
assert node.contains_in_log("using native copy")
assert node.contains_in_log("single-operation copy")
def test_backup_to_s3_native_copy_multipart_upload():
storage_policy = "policy_s3"
backup_name = new_backup_name()
backup_destination = f"S3('http://minio1:9001/root/data/backups/multipart_upload_copy/{backup_name}', 'minio', 'minio123')"
check_backup_and_restore(storage_policy, backup_destination, size=1000000)
assert node.contains_in_log("using native copy")
assert node.contains_in_log("multipart upload copy")

View File

@ -0,0 +1,3 @@
<clickhouse>
<max_server_memory_usage>1000</max_server_memory_usage>
</clickhouse>

View File

@ -0,0 +1,54 @@
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance(
"node", main_configs=["configs/config.xml"], with_zookeeper=True
)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_failed_async_inserts(started_cluster):
node = started_cluster.instances["node"]
node.query(
"CREATE TABLE async_insert_30_10_2022 (id UInt32, s String) ENGINE = Memory"
)
node.query(
"INSERT INTO async_insert_30_10_2022 SETTINGS async_insert = 1 VALUES ()",
ignore_error=True,
)
node.query(
"INSERT INTO async_insert_30_10_2022 SETTINGS async_insert = 1 VALUES ([1,2,3], 1)",
ignore_error=True,
)
node.query(
'INSERT INTO async_insert_30_10_2022 SETTINGS async_insert = 1 FORMAT JSONEachRow {"id" : 1} {"x"}',
ignore_error=True,
)
node.query(
"INSERT INTO async_insert_30_10_2022 SETTINGS async_insert = 1 VALUES (throwIf(4),'')",
ignore_error=True,
)
select_query = (
"SELECT value FROM system.events WHERE event == 'FailedAsyncInsertQuery'"
)
assert node.query(select_query) == "4\n"
node.query("DROP TABLE IF EXISTS async_insert_30_10_2022 NO DELAY")

View File

@ -14,3 +14,5 @@ fooba
foobar
1 1
Zm9v
foo

View File

@ -14,3 +14,6 @@ SELECT base64Decode(val, 'excess argument') FROM (select arrayJoin(['', 'Zg==',
SELECT tryBase64Decode('Zm9vYmF=Zm9v', 'excess argument'); -- { serverError 42 }
SELECT base64Decode('Zm9vYmF=Zm9v'); -- { serverError 117 }
select base64Encode(toFixedString('foo', 3));
select base64Decode(toFixedString('Zm9v', 4));
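
Both FixedString cases round-trip cleanly because the fixed width matches the payload exactly, so no trailing NUL padding is involved. The equivalent in Python, for reference:

import base64

# Mirrors the FixedString cases above: 'foo' fills FixedString(3) and
# 'Zm9v' fills FixedString(4) exactly, so no NUL bytes interfere.
assert base64.b64encode(b"foo") == b"Zm9v"  # base64Encode(toFixedString('foo', 3))
assert base64.b64decode(b"Zm9v") == b"foo"  # base64Decode(toFixedString('Zm9v', 4))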

View File

@ -120,8 +120,9 @@ class HttpProcessor(BaseHTTPRequestHandler):
allow_range = False
range_used = False
get_call_num = 0
responses_to_get = []
def send_head(self):
def send_head(self, from_get=False):
if self.headers["Range"] and HttpProcessor.allow_range:
try:
self.range = parse_byte_range(self.headers["Range"])
@ -145,7 +146,14 @@ class HttpProcessor(BaseHTTPRequestHandler):
self.send_error(416, "Requested Range Not Satisfiable")
return None
self.send_response(206 if HttpProcessor.allow_range else 200)
retry_range_request = first != 0 and from_get and len(HttpProcessor.responses_to_get) > 0
if retry_range_request:
code = HttpProcessor.responses_to_get.pop()
if code not in HttpProcessor.responses:
self.send_response(int(code))
else:
self.send_response(206 if HttpProcessor.allow_range else 200)
self.send_header("Content-type", "application/json")
if HttpProcessor.allow_range:
@ -169,7 +177,7 @@ class HttpProcessor(BaseHTTPRequestHandler):
self.send_head()
def do_GET(self):
result = self.send_head()
result = self.send_head(True)
if result is None:
return
@ -211,26 +219,36 @@ def start_server():
#####################################################################
def test_select(download_buffer_size):
def test_select(settings):
global HTTP_SERVER_URL_STR
query = f"SELECT * FROM url('{HTTP_SERVER_URL_STR}','JSONAsString') SETTINGS max_download_buffer_size={download_buffer_size};"
query = f"SELECT * FROM url('{HTTP_SERVER_URL_STR}','JSONAsString') SETTINGS {','.join((k+'='+repr(v) for k, v in settings.items()))};"
check_answers(query, EXPECTED_ANSWER)
def run_test(allow_range, download_buffer_size=20):
def run_test(allow_range, settings, check_retries=False):
HttpProcessor.range_used = False
HttpProcessor.get_call_num = 0
HttpProcessor.allow_range = allow_range
retries_num = 0
if check_retries:
HttpProcessor.responses_to_get = ["500", "200", "206"]
retries_num = len(HttpProcessor.responses_to_get)
t, httpd = start_server()
t.start()
test_select(download_buffer_size)
test_select(settings)
download_buffer_size = settings["max_download_buffer_size"]
expected_get_call_num = (PAYLOAD_LEN - 1) // download_buffer_size + 1
if allow_range:
if not HttpProcessor.range_used:
raise Exception("HTTP Range was not used when supported")
if check_retries and len(HttpProcessor.responses_to_get) > 0:
raise Exception("Expected to get http response 500, which had to be retried, but 200 ok returned and then retried")
if retries_num > 0:
expected_get_call_num += retries_num - 1
if expected_get_call_num != HttpProcessor.get_call_num:
raise Exception(
f"Invalid amount of GET calls with Range. Expected {expected_get_call_num}, actual {HttpProcessor.get_call_num}"
@ -245,9 +263,23 @@ def run_test(allow_range, download_buffer_size=20):
def main():
run_test(allow_range=False)
run_test(allow_range=True, download_buffer_size=20)
run_test(allow_range=True, download_buffer_size=10)
settings = {"max_download_buffer_size" : 20}
# Test Accept-Ranges=False
run_test(allow_range=False, settings=settings)
# Test Accept-Ranges=True, parallel download is used
run_test(allow_range=True, settings=settings)
# Test Accept-Ranges=True, parallel download is used with a smaller buffer
settings = {"max_download_buffer_size": 10}
run_test(allow_range=True, settings=settings)
# Test Accept-Ranges=True, parallel download is not used:
# the first GET returns a 500 response,
# the second GET returns a 200 OK response,
# the third GET (the retry) returns a 206 response.
settings["max_download_threads"] = 2
run_test(allow_range=True, settings=settings, check_retries=True)
if __name__ == "__main__":
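
The expected GET count is a ceiling division of the payload over the download buffer size, plus the extra retries when check_retries is set. A worked example with an illustrative payload length (the real PAYLOAD_LEN is defined earlier in the script):

# Illustrative numbers only; 120 is a stand-in, not the script's PAYLOAD_LEN.
PAYLOAD_LEN = 120
for buf in (20, 10):
    print((PAYLOAD_LEN - 1) // buf + 1)  # ceil(120/20) = 6, ceil(120/10) = 12

# With check_retries=True, the queue ["500", "200", "206"] gives retries_num = 3,
# so the expectation grows by retries_num - 1 = 2 extra GET calls.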

View File

@ -1,3 +1,4 @@
PASSED
PASSED
PASSED
PASSED

View File

@ -0,0 +1,10 @@
SELECT analysisOfVariance(number, number % 2) FROM numbers(10) FORMAT Null;
SELECT analysisOfVariance(number :: Decimal32(5), number % 2) FROM numbers(10) FORMAT Null;
SELECT analysisOfVariance(number :: Decimal256(5), number % 2) FROM numbers(10) FORMAT Null;
SELECT analysisOfVariance(1.11, -20); -- { serverError BAD_ARGUMENTS }
SELECT analysisOfVariance(1.11, 20 :: UInt128); -- { serverError BAD_ARGUMENTS }
SELECT analysisOfVariance(1.11, 9000000000000000); -- { serverError BAD_ARGUMENTS }
SELECT analysisOfVariance(number, number % 2), analysisOfVariance(100000000000000000000., number % 65535) FROM numbers(1048575); -- { serverError BAD_ARGUMENTS }
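
analysisOfVariance is a one-way ANOVA. A minimal sketch of the F-statistic it is assumed to compute, applied to the first query's grouping by number % 2:

def anova_f(groups):
    # One-way ANOVA: between-group variance over within-group variance.
    k = len(groups)
    n = sum(len(g) for g in groups)
    grand = sum(sum(g) for g in groups) / n
    means = [sum(g) / len(g) for g in groups]
    ss_between = sum(len(g) * (m - grand) ** 2 for g, m in zip(groups, means))
    ss_within = sum(sum((x - m) ** 2 for x in g) for g, m in zip(groups, means))
    return (ss_between / (k - 1)) / (ss_within / (n - k))

evens = [x for x in range(10) if x % 2 == 0]
odds = [x for x in range(10) if x % 2 == 1]
print(anova_f([evens, odds]))  # 0.25 for numbers(10) grouped by number % 2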

View File

@ -0,0 +1,2 @@
4 6
4 4

View File

@ -0,0 +1,16 @@
DROP TABLE IF EXISTS tab1;
DROP TABLE IF EXISTS tab2;
SET allow_suspicious_low_cardinality_types = 1;
CREATE TABLE tab1 (a1 Int32, b1 Int32, val UInt64) ENGINE = MergeTree ORDER BY a1;
CREATE TABLE tab2 (a2 LowCardinality(Int32), b2 Int32) ENGINE = MergeTree ORDER BY a2;
INSERT INTO tab1 SELECT number, number, 1 from numbers(4);
INSERT INTO tab2 SELECT number + 2, number + 2 from numbers(4);
SELECT sum(val), count(val) FROM tab1 FULL OUTER JOIN tab2 ON b1 - 2 = a2 OR a1 = b2 SETTINGS join_use_nulls = 0;
SELECT sum(val), count(val) FROM tab1 FULL OUTER JOIN tab2 ON b1 - 2 = a2 OR a1 = b2 SETTINGS join_use_nulls = 1;
DROP TABLE IF EXISTS tab1;
DROP TABLE IF EXISTS tab2;
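
Why the reference file shows 4 6 and then 4 4: the two tab2 rows with no match contribute val = 0 (a default value, which count() includes) under join_use_nulls = 0, but NULL (which count() skips) under join_use_nulls = 1. A hedged simulation of the join above, not ClickHouse code:

tab1 = [(i, i, 1) for i in range(4)]       # (a1, b1, val)
tab2 = [(i + 2, i + 2) for i in range(4)]  # (a2, b2)

def full_outer_join(join_use_nulls):
    vals, matched_right = [], set()
    for a1, b1, val in tab1:
        hits = [(a2, b2) for a2, b2 in tab2 if b1 - 2 == a2 or a1 == b2]
        matched_right.update(hits)
        vals.extend([val] * len(hits) if hits else [val])
    for row in tab2:
        if row not in matched_right:
            vals.append(None if join_use_nulls else 0)  # NULL vs default 0
    non_null = [v for v in vals if v is not None]
    return sum(non_null), len(non_null)  # sum(val), count(val)

print(full_outer_join(0))  # (4, 6)
print(full_outer_join(1))  # (4, 4)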