Merge branch 'master' into fix_trash

Alexander Tokmakov 2022-05-24 17:45:51 +02:00
commit bb5f04efaf
117 changed files with 1441 additions and 324 deletions

View File

@ -105,6 +105,25 @@
# define ASAN_POISON_MEMORY_REGION(a, b)
#endif
#if !defined(ABORT_ON_LOGICAL_ERROR)
#if !defined(NDEBUG) || defined(ADDRESS_SANITIZER) || defined(THREAD_SANITIZER) || defined(MEMORY_SANITIZER) || defined(UNDEFINED_BEHAVIOR_SANITIZER)
#define ABORT_ON_LOGICAL_ERROR
#endif
#endif
/// chassert(x) is similar to assert(x), but:
/// - works in builds with sanitizers, not only in debug builds
/// - tries to print failed assertion into server log
/// It can be used for all assertions except heavy ones.
/// Heavy assertions (that run loops or call complex functions) are allowed in debug builds only.
#if !defined(chassert)
#if defined(ABORT_ON_LOGICAL_ERROR)
#define chassert(x) static_cast<bool>(x) ? void(0) : abortOnFailedAssertion(#x)
#else
#define chassert(x) ((void)0)
#endif
#endif
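
As a quick illustration of how chassert is meant to be used, here is a hedged sketch (the function below and the include path are assumptions for illustration, not part of this diff); in release builds without sanitizers the macro expands to a no-op, so only cheap checks belong here:

```cpp
#include <base/defines.h>   // assumed location of the chassert macro

// Hypothetical helper, not from this commit: a cheap invariant that is fine to
// check even in sanitizer builds, unlike a "heavy" assertion that would loop
// over a container or recompute expensive state.
size_t pickBucket(size_t hash, size_t bucket_count)
{
    chassert(bucket_count > 0);   // calls abortOnFailedAssertion() when ABORT_ON_LOGICAL_ERROR is defined
    return hash % bucket_count;
}
```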
/// A template function for suppressing warnings about unused variables or function results.
template <typename... Args>
constexpr void UNUSED(Args &&... args [[maybe_unused]])

View File

@ -170,7 +170,13 @@ endif ()
target_compile_definitions(_jemalloc PRIVATE -DJEMALLOC_PROF=1)
if (USE_UNWIND)
target_compile_definitions (_jemalloc PRIVATE -DJEMALLOC_PROF_LIBUNWIND=1)
# jemalloc provides support for two different libunwind flavors: the original HP libunwind and the one coming with gcc / g++ / libstdc++.
# The latter is identified by `JEMALLOC_PROF_LIBGCC` and uses the `_Unwind_Backtrace` method instead of `unw_backtrace`.
# At the moment ClickHouse uses LLVM libunwind, which follows libgcc's way of backtracing.
# ClickHouse has to provide the `unw_backtrace` method by means of [commit 8e2b31e](https://github.com/ClickHouse/libunwind/commit/8e2b31e766dd502f6df74909e04a7dbdf5182eb1).
target_compile_definitions (_jemalloc PRIVATE -DJEMALLOC_PROF_LIBGCC=1)
target_link_libraries (_jemalloc PRIVATE unwind)
endif ()

View File

@ -1,4 +1,3 @@
# rebuild in #33610
# docker build -t clickhouse/docs-release .
FROM ubuntu:20.04

View File

@ -3,8 +3,6 @@
from multiprocessing import cpu_count
from subprocess import Popen, call, check_output, STDOUT
import os
import sys
import shutil
import argparse
import logging
import time
@ -31,6 +29,9 @@ def get_options(i, backward_compatibility_check):
if i % 5 == 1:
client_options.append("join_use_nulls=1")
if i % 15 == 1:
client_options.append("join_algorithm='parallel_hash'")
if i % 15 == 6:
client_options.append("join_algorithm='partial_merge'")

View File

@ -9,11 +9,6 @@ cmake .. \
-DCMAKE_C_COMPILER=$(which clang-13) \
-DCMAKE_CXX_COMPILER=$(which clang++-13) \
-DCMAKE_BUILD_TYPE=Debug \
-DENABLE_CLICKHOUSE_ALL=OFF \
-DENABLE_CLICKHOUSE_SERVER=ON \
-DENABLE_CLICKHOUSE_CLIENT=ON \
-DENABLE_LIBRARIES=OFF \
-DUSE_UNWIND=ON \
-DENABLE_UTILS=OFF \
-DENABLE_TESTS=OFF
```

View File

@ -13,11 +13,6 @@ cmake .. \
-DCMAKE_C_COMPILER=$(which clang-13) \
-DCMAKE_CXX_COMPILER=$(which clang++-13) \
-DCMAKE_BUILD_TYPE=Debug \
-DENABLE_CLICKHOUSE_ALL=OFF \
-DENABLE_CLICKHOUSE_SERVER=ON \
-DENABLE_CLICKHOUSE_CLIENT=ON \
-DENABLE_LIBRARIES=OFF \
-DUSE_UNWIND=ON \
-DENABLE_UTILS=OFF \
-DENABLE_TESTS=OFF
```

View File

@ -12,12 +12,11 @@
#
set -ex
BASE_DIR=$(dirname $(readlink -f $0))
BASE_DIR=$(dirname "$(readlink -f "$0")")
GIT_USER=${GIT_USER:-$USER}
GIT_TEST_URI=git@github.com:${GIT_USER}/clickhouse.github.io.git \
GIT_PROD_URI=git@github.com:${GIT_USER}/clickhouse.github.io.git \
BASE_DOMAIN=${GIT_USER}-test.clickhouse.com \
EXTRA_BUILD_ARGS="${@}" \
EXTRA_BUILD_ARGS="${*}" \
CLOUDFLARE_TOKEN="" \
HISTORY_SIZE=3 \
${BASE_DIR}/release.sh
"${BASE_DIR}/release.sh"

View File

@ -1,24 +1,24 @@
#!/usr/bin/env bash
set -ex
BASE_DIR=$(dirname $(readlink -f $0))
BASE_DIR=$(dirname "$(readlink -f "$0")")
BUILD_DIR="${BASE_DIR}/../build"
PUBLISH_DIR="${BASE_DIR}/../publish"
BASE_DOMAIN="${BASE_DOMAIN:-content.clickhouse.com}"
GIT_TEST_URI="${GIT_TEST_URI:-git@github.com:ClickHouse/clickhouse-com-content.git}"
GIT_PROD_URI="git@github.com:ClickHouse/clickhouse-website-content.git"
GIT_PROD_URI="${GIT_PROD_URI:-git@github.com:ClickHouse/clickhouse-com-content.git}"
EXTRA_BUILD_ARGS="${EXTRA_BUILD_ARGS:---verbose}"
if [[ -z "$1" ]]
then
source "${BASE_DIR}/venv/bin/activate"
# shellcheck disable=2086
python3 "${BASE_DIR}/build.py" ${EXTRA_BUILD_ARGS}
rm -rf "${PUBLISH_DIR}"
mkdir "${PUBLISH_DIR}" && cd "${PUBLISH_DIR}"
# Will make a repository with website content as the only commit.
git init
git remote add origin "${GIT_TEST_URI}"
git remote add origin "${GIT_PROD_URI}"
git config user.email "robot-clickhouse@clickhouse.com"
git config user.name "robot-clickhouse"
@ -28,7 +28,7 @@ then
echo -n "" > README.md
echo -n "" > ".nojekyll"
cp "${BASE_DIR}/../../LICENSE" .
git add *
git add ./*
git add ".nojekyll"
git commit --quiet -m "Add new release at $(date)"
@ -40,7 +40,7 @@ then
# Turn off logging.
set +x
if [[ ! -z "${CLOUDFLARE_TOKEN}" ]]
if [[ -n "${CLOUDFLARE_TOKEN}" ]]
then
sleep 1m
# https://api.cloudflare.com/#zone-purge-files-by-cache-tags,-host-or-prefix

View File

@ -81,6 +81,8 @@
{
height: 100%;
margin: 0;
/* This enables position: sticky on controls */
overflow: auto;
}
html
@ -89,9 +91,26 @@
font-family: Liberation Sans, DejaVu Sans, sans-serif, Noto Color Emoji, Apple Color Emoji, Segoe UI Emoji;
background: var(--background-color);
color: var(--text-color);
}
body
{
/* This element will show scroll-bar on overflow, and the scroll-bar will be outside of the padding. */
padding: 0.5rem;
}
#controls
{
/* Make enough space for even huge queries. */
height: 20%;
/* When a page will be scrolled horizontally due to large table size, keep controls in place. */
position: sticky;
left: 0;
/* This allows query textarea to occupy the remaining height while other elements have fixed height. */
display: flex;
flex-direction: column;
}
/* Otherwise Webkit based browsers will display ugly border on focus. */
textarea, input, button
{
@ -129,8 +148,7 @@
#query_div
{
/* Make enough space for even huge queries. */
height: 20%;
height: 100%;
}
#query
@ -380,19 +398,21 @@
</head>
<body>
<div id="inputs">
<input class="monospace shadow" id="url" type="text" value="http://localhost:8123/" placeholder="url" /><input class="monospace shadow" id="user" type="text" value="default" placeholder="user" /><input class="monospace shadow" id="password" type="password" placeholder="password" />
</div>
<div id="query_div">
<textarea autofocus spellcheck="false" class="monospace shadow" id="query"></textarea>
</div>
<div id="run_div">
<button class="shadow" id="run">Run</button>
<span class="hint">&nbsp;(Ctrl/Cmd+Enter)</span>
<span id="hourglass"></span>
<span id="check-mark"></span>
<span id="stats"></span>
<span id="toggle-dark">🌑</span><span id="toggle-light">🌞</span>
<div id="controls">
<div id="inputs">
<input class="monospace shadow" id="url" type="text" value="http://localhost:8123/" placeholder="url" /><input class="monospace shadow" id="user" type="text" value="default" placeholder="user" /><input class="monospace shadow" id="password" type="password" placeholder="password" />
</div>
<div id="query_div">
<textarea autofocus spellcheck="false" class="monospace shadow" id="query"></textarea>
</div>
<div id="run_div">
<button class="shadow" id="run">Run</button>
<span class="hint">&nbsp;(Ctrl/Cmd+Enter)</span>
<span id="hourglass"></span>
<span id="check-mark"></span>
<span id="stats"></span>
<span id="toggle-dark">🌑</span><span id="toggle-light">🌞</span>
</div>
</div>
<div id="data_div">
<table class="monospace-table shadow" id="data-table"></table>

View File

@ -627,6 +627,7 @@
M(656, MEILISEARCH_EXCEPTION) \
M(657, UNSUPPORTED_MEILISEARCH_TYPE) \
M(658, MEILISEARCH_MISSING_SOME_COLUMNS) \
M(659, UNKNOWN_STATUS_OF_TRANSACTION) \
\
M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \

View File

@ -35,6 +35,18 @@ namespace ErrorCodes
extern const int CANNOT_MREMAP;
}
void abortOnFailedAssertion(const String & description)
{
LOG_FATAL(&Poco::Logger::root(), "Logical error: '{}'.", description);
/// This is to suppress -Wmissing-noreturn
volatile bool always_false = false;
if (always_false)
return;
abort();
}
/// - Aborts the process if error code is LOGICAL_ERROR.
/// - Increments error codes statistics.
void handle_error_code([[maybe_unused]] const std::string & msg, int code, bool remote, const Exception::FramePointers & trace)
@ -44,8 +56,7 @@ void handle_error_code([[maybe_unused]] const std::string & msg, int code, bool
#ifdef ABORT_ON_LOGICAL_ERROR
if (code == ErrorCodes::LOGICAL_ERROR)
{
LOG_FATAL(&Poco::Logger::root(), "Logical error: '{}'.", msg);
abort();
abortOnFailedAssertion(msg);
}
#endif

View File

@ -12,16 +12,14 @@
#include <fmt/format.h>
#if !defined(NDEBUG) || defined(ADDRESS_SANITIZER) || defined(THREAD_SANITIZER) || defined(MEMORY_SANITIZER) || defined(UNDEFINED_BEHAVIOR_SANITIZER)
#define ABORT_ON_LOGICAL_ERROR
#endif
namespace Poco { class Logger; }
namespace DB
{
void abortOnFailedAssertion(const String & description);
class Exception : public Poco::Exception
{
public:

View File

@ -22,15 +22,22 @@ void CompressedWriteBuffer::nextImpl()
if (!offset())
return;
UInt32 compressed_size = 0;
size_t decompressed_size = offset();
UInt32 compressed_reserve_size = codec->getCompressedReserveSize(decompressed_size);
if (out.available() > compressed_reserve_size + CHECKSUM_SIZE)
/** During compression we need a buffer with capacity >= compressed_reserve_size + CHECKSUM_SIZE.
*
* If the output buffer has the necessary capacity, we can compress data directly into it
* and then write the checksum at the beginning of the output buffer.
*
* If the output buffer does not have the necessary capacity, we compress data into a temporary buffer
* and then write the checksum and the temporary buffer into the output buffer.
*/
if (out.available() >= compressed_reserve_size + CHECKSUM_SIZE)
{
char * out_checksum_ptr = out.position();
char * out_compressed_ptr = out.position() + CHECKSUM_SIZE;
compressed_size = codec->compress(working_buffer.begin(), decompressed_size, out_compressed_ptr);
UInt32 compressed_size = codec->compress(working_buffer.begin(), decompressed_size, out_compressed_ptr);
CityHash_v1_0_2::uint128 checksum = CityHash_v1_0_2::CityHash128(out_compressed_ptr, compressed_size);
memcpy(out_checksum_ptr, reinterpret_cast<const char *>(&checksum), CHECKSUM_SIZE);
@ -39,7 +46,7 @@ void CompressedWriteBuffer::nextImpl()
else
{
compressed_buffer.resize(compressed_reserve_size);
compressed_size = codec->compress(working_buffer.begin(), decompressed_size, compressed_buffer.data());
UInt32 compressed_size = codec->compress(working_buffer.begin(), decompressed_size, compressed_buffer.data());
CityHash_v1_0_2::uint128 checksum = CityHash_v1_0_2::CityHash128(compressed_buffer.data(), compressed_size);
out.write(reinterpret_cast<const char *>(&checksum), CHECKSUM_SIZE);

View File

@ -466,20 +466,23 @@ nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type typ
{
if (is_recovering)
{
const auto finish_recovering = [&]
{
auto new_params = raft_instance->get_current_params();
new_params.custom_commit_quorum_size_ = 0;
new_params.custom_election_quorum_size_ = 0;
raft_instance->update_params(new_params);
LOG_INFO(log, "Recovery is done. You can continue using cluster normally.");
is_recovering = false;
};
switch (type)
{
case nuraft::cb_func::HeartBeat:
{
if (raft_instance->isClusterHealthy())
{
auto new_params = raft_instance->get_current_params();
new_params.custom_commit_quorum_size_ = 0;
new_params.custom_election_quorum_size_ = 0;
raft_instance->update_params(new_params);
LOG_INFO(log, "Recovery is done. You can continue using cluster normally.");
is_recovering = false;
}
finish_recovering();
break;
}
case nuraft::cb_func::NewConfig:
@ -490,8 +493,19 @@ nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type typ
// Because we manually set the config to commit
// we need to call the reconfigure also
uint64_t log_idx = *static_cast<uint64_t *>(param->ctx);
if (log_idx == state_manager->load_config()->get_log_idx())
raft_instance->forceReconfigure(state_manager->load_config());
auto config = state_manager->load_config();
if (log_idx == config->get_log_idx())
{
raft_instance->forceReconfigure(config);
// Single node cluster doesn't need to wait for any other nodes
// so we can finish recovering immediately after applying
// new configuration
if (config->get_servers().size() == 1)
finish_recovering();
}
break;
}
case nuraft::cb_func::ProcessReq:

View File

@ -156,7 +156,7 @@ inline DecimalComponents<DecimalType> splitWithScaleMultiplier(
using T = typename DecimalType::NativeType;
const auto whole = decimal.value / scale_multiplier;
auto fractional = decimal.value % scale_multiplier;
if (fractional < T(0))
if (whole && fractional < T(0))
fractional *= T(-1);
return {whole, fractional};
@ -199,7 +199,7 @@ inline typename DecimalType::NativeType getFractionalPartWithScaleMultiplier(
/// In any case, we apply modulo before comparing so that scale_multiplier > 1 is unaffected.
T result = decimal.value % scale_multiplier;
if constexpr (!keep_sign)
if (result < T(0))
if (decimal.value / scale_multiplier && result < T(0))
result = -result;
return result;
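
The two hunks above change the sign handling only when the whole part is non-zero; when the whole part is zero the fractional part now keeps its sign, so values such as -0.877 and 0.877 stay distinguishable. A small worked sketch of that rule (simplified types, not the actual DecimalUtils code):

```cpp
#include <cstdint>
#include <utility>

// Simplified illustration of the corrected split for scale 3 (scale_multiplier = 1000).
std::pair<int64_t, int64_t> splitDecimal(int64_t value, int64_t scale_multiplier)
{
    int64_t whole = value / scale_multiplier;
    int64_t fractional = value % scale_multiplier;
    if (whole && fractional < 0)   // only negate when the whole part carries the sign
        fractional = -fractional;
    return {whole, fractional};
}

// splitDecimal(-1877, 1000) -> {-1, 877}   (sign carried by the whole part)
// splitDecimal( -877, 1000) -> { 0, -877}  (whole part is zero, fractional keeps the sign)
```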

View File

@ -592,6 +592,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(String, insert_deduplication_token, "", "If not empty, used for duplicate detection instead of data digest", 0) \
M(Bool, count_distinct_optimization, false, "Rewrite count distinct to subquery of group by", 0) \
M(Bool, throw_on_unsupported_query_inside_transaction, true, "Throw exception if unsupported query is used inside transaction", 0) \
M(TransactionsWaitCSNMode, wait_changes_become_visible_after_commit_mode, TransactionsWaitCSNMode::WAIT_UNKNOWN, "Wait for committed changes to become actually visible in the latest snapshot", 0) \
M(Bool, throw_if_no_data_to_insert, true, "Enables or disables empty INSERTs, enabled by default", 0) \
M(Bool, compatibility_ignore_auto_increment_in_create_table, false, "Ignore AUTO_INCREMENT keyword in column declaration if true, otherwise return error. It simplifies migration from MySQL", 0) \
// End of COMMON_SETTINGS
@ -637,7 +638,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(Bool, output_format_csv_crlf_end_of_line, false, "If it is set true, end of line in CSV format will be \\r\\n instead of \\n.", 0) \
M(Bool, input_format_csv_enum_as_number, false, "Treat inserted enum values in CSV formats as enum indices \\N", 0) \
M(Bool, input_format_csv_arrays_as_nested_csv, false, R"(When reading Array from CSV, expect that its elements were serialized in nested CSV and then put into string. Example: "[""Hello"", ""world"", ""42"""" TV""]". Braces around array can be omitted.)", 0) \
M(Bool, input_format_skip_unknown_fields, false, "Skip columns with unknown names from input data (it works for JSONEachRow, -WithNames, -WithNamesAndTypes and TSKV formats).", 0) \
M(Bool, input_format_skip_unknown_fields, true, "Skip columns with unknown names from input data (it works for JSONEachRow, -WithNames, -WithNamesAndTypes and TSKV formats).", 0) \
M(Bool, input_format_with_names_use_header, true, "For -WithNames input formats this controls whether format parser is to assume that column data appear in the input exactly as they are specified in the header.", 0) \
M(Bool, input_format_with_types_use_header, true, "For -WithNamesAndTypes input formats this controls whether format parser should check if data types from the input match data types from the header.", 0) \
M(Bool, input_format_import_nested_json, false, "Map nested JSON data to nested tables (it works for JSONEachRow format).", 0) \

View File

@ -131,6 +131,11 @@ IMPLEMENT_SETTING_ENUM(ShortCircuitFunctionEvaluation, ErrorCodes::BAD_ARGUMENTS
{"force_enable", ShortCircuitFunctionEvaluation::FORCE_ENABLE},
{"disable", ShortCircuitFunctionEvaluation::DISABLE}})
IMPLEMENT_SETTING_ENUM(TransactionsWaitCSNMode, ErrorCodes::BAD_ARGUMENTS,
{{"async", TransactionsWaitCSNMode::ASYNC},
{"wait", TransactionsWaitCSNMode::WAIT},
{"wait_unknown", TransactionsWaitCSNMode::WAIT_UNKNOWN}})
IMPLEMENT_SETTING_ENUM(EnumComparingMode, ErrorCodes::BAD_ARGUMENTS,
{{"by_names", FormatSettings::EnumComparingMode::BY_NAMES},
{"by_values", FormatSettings::EnumComparingMode::BY_VALUES},

View File

@ -183,6 +183,15 @@ enum class ShortCircuitFunctionEvaluation
DECLARE_SETTING_ENUM(ShortCircuitFunctionEvaluation)
enum class TransactionsWaitCSNMode
{
ASYNC,
WAIT,
WAIT_UNKNOWN,
};
DECLARE_SETTING_ENUM(TransactionsWaitCSNMode)
DECLARE_SETTING_ENUM_WITH_RENAME(EnumComparingMode, FormatSettings::EnumComparingMode)
DECLARE_SETTING_ENUM_WITH_RENAME(EscapingRule, FormatSettings::EscapingRule)

View File

@ -176,7 +176,7 @@ INSTANTIATE_TEST_SUITE_P(Basic,
}
},
{
"When scale is not 0 and whole part is 0.",
"For positive Decimal value, with scale not 0, and whole part is 0.",
123,
3,
{
@ -184,6 +184,16 @@ INSTANTIATE_TEST_SUITE_P(Basic,
123
}
},
{
"For negative Decimal value, with scale not 0, and whole part is 0.",
-123,
3,
{
0,
-123
}
},
{
"For negative Decimal value whole part is negative, fractional is non-negative.",
-1234567'89,
@ -216,6 +226,24 @@ INSTANTIATE_TEST_SUITE_P(Basic,
187618332,
123
}
},
{
"Negative timestamp 1969-12-31 23:59:59.123 UTC",
DateTime64(-877),
3,
{
0,
-877
}
},
{
"Positive timestamp 1970-01-01 00:00:00.123 UTC",
DateTime64(123),
3,
{
0,
123
}
}
})
);

View File

@ -71,7 +71,7 @@ String escapingRuleToString(FormatSettings::EscapingRule escaping_rule)
void skipFieldByEscapingRule(ReadBuffer & buf, FormatSettings::EscapingRule escaping_rule, const FormatSettings & format_settings)
{
String tmp;
NullOutput out;
constexpr const char * field_name = "<SKIPPED COLUMN>";
constexpr size_t field_name_len = 16;
switch (escaping_rule)
@ -80,19 +80,19 @@ void skipFieldByEscapingRule(ReadBuffer & buf, FormatSettings::EscapingRule esca
/// Empty field, just skip spaces
break;
case FormatSettings::EscapingRule::Escaped:
readEscapedString(tmp, buf);
readEscapedStringInto(out, buf);
break;
case FormatSettings::EscapingRule::Quoted:
readQuotedField(tmp, buf);
readQuotedFieldInto(out, buf);
break;
case FormatSettings::EscapingRule::CSV:
readCSVString(tmp, buf, format_settings.csv);
readCSVStringInto(out, buf, format_settings.csv);
break;
case FormatSettings::EscapingRule::JSON:
skipJSONField(buf, StringRef(field_name, field_name_len));
break;
case FormatSettings::EscapingRule::Raw:
readString(tmp, buf);
readStringInto(out, buf);
break;
default:
__builtin_unreachable();

View File

@ -541,19 +541,19 @@ void FormatFactory::markOutputFormatSupportsParallelFormatting(const String & na
}
void FormatFactory::markFormatAsColumnOriented(const String & name)
void FormatFactory::markFormatSupportsSubsetOfColumns(const String & name)
{
auto & target = dict[name].is_column_oriented;
auto & target = dict[name].supports_subset_of_columns;
if (target)
throw Exception("FormatFactory: Format " + name + " is already marked as column oriented", ErrorCodes::LOGICAL_ERROR);
throw Exception("FormatFactory: Format " + name + " is already marked as supporting subset of columns", ErrorCodes::LOGICAL_ERROR);
target = true;
}
bool FormatFactory::checkIfFormatIsColumnOriented(const String & name)
bool FormatFactory::checkIfFormatSupportsSubsetOfColumns(const String & name) const
{
const auto & target = getCreators(name);
return target.is_column_oriented;
return target.supports_subset_of_columns;
}
bool FormatFactory::isInputFormat(const String & name) const
@ -568,19 +568,19 @@ bool FormatFactory::isOutputFormat(const String & name) const
return it != dict.end() && it->second.output_creator;
}
bool FormatFactory::checkIfFormatHasSchemaReader(const String & name)
bool FormatFactory::checkIfFormatHasSchemaReader(const String & name) const
{
const auto & target = getCreators(name);
return bool(target.schema_reader_creator);
}
bool FormatFactory::checkIfFormatHasExternalSchemaReader(const String & name)
bool FormatFactory::checkIfFormatHasExternalSchemaReader(const String & name) const
{
const auto & target = getCreators(name);
return bool(target.external_schema_reader_creator);
}
bool FormatFactory::checkIfFormatHasAnySchemaReader(const String & name)
bool FormatFactory::checkIfFormatHasAnySchemaReader(const String & name) const
{
return checkIfFormatHasSchemaReader(name) || checkIfFormatHasExternalSchemaReader(name);
}

View File

@ -108,7 +108,7 @@ private:
SchemaReaderCreator schema_reader_creator;
ExternalSchemaReaderCreator external_schema_reader_creator;
bool supports_parallel_formatting{false};
bool is_column_oriented{false};
bool supports_subset_of_columns{false};
NonTrivialPrefixAndSuffixChecker non_trivial_prefix_and_suffix_checker;
AppendSupportChecker append_support_checker;
};
@ -194,13 +194,13 @@ public:
void registerExternalSchemaReader(const String & name, ExternalSchemaReaderCreator external_schema_reader_creator);
void markOutputFormatSupportsParallelFormatting(const String & name);
void markFormatAsColumnOriented(const String & name);
void markFormatSupportsSubsetOfColumns(const String & name);
bool checkIfFormatIsColumnOriented(const String & name);
bool checkIfFormatSupportsSubsetOfColumns(const String & name) const;
bool checkIfFormatHasSchemaReader(const String & name);
bool checkIfFormatHasExternalSchemaReader(const String & name);
bool checkIfFormatHasAnySchemaReader(const String & name);
bool checkIfFormatHasSchemaReader(const String & name) const;
bool checkIfFormatHasExternalSchemaReader(const String & name) const;
bool checkIfFormatHasAnySchemaReader(const String & name) const;
const FormatsDictionary & getAllFormats() const
{

View File

@ -23,6 +23,7 @@ namespace ErrorCodes
extern const int INCORRECT_INDEX;
extern const int LOGICAL_ERROR;
extern const int CANNOT_READ_ALL_DATA;
extern const int INCORRECT_DATA;
}
@ -31,8 +32,8 @@ NativeReader::NativeReader(ReadBuffer & istr_, UInt64 server_revision_)
{
}
NativeReader::NativeReader(ReadBuffer & istr_, const Block & header_, UInt64 server_revision_)
: istr(istr_), header(header_), server_revision(server_revision_)
NativeReader::NativeReader(ReadBuffer & istr_, const Block & header_, UInt64 server_revision_, bool skip_unknown_columns_)
: istr(istr_), header(header_), server_revision(server_revision_), skip_unknown_columns(skip_unknown_columns_)
{
}
@ -186,18 +187,29 @@ Block NativeReader::read()
column.column = std::move(read_column);
bool use_in_result = true;
if (header)
{
/// Support insert from old clients without low cardinality type.
auto & header_column = header.getByName(column.name);
if (!header_column.type->equals(*column.type))
if (header.has(column.name))
{
column.column = recursiveTypeConversion(column.column, column.type, header.safeGetByPosition(i).type);
column.type = header.safeGetByPosition(i).type;
/// Support insert from old clients without low cardinality type.
auto & header_column = header.getByName(column.name);
if (!header_column.type->equals(*column.type))
{
column.column = recursiveTypeConversion(column.column, column.type, header.safeGetByPosition(i).type);
column.type = header.safeGetByPosition(i).type;
}
}
else
{
if (!skip_unknown_columns)
throw Exception(ErrorCodes::INCORRECT_DATA, "Unknown column with name {} found while reading data in Native format", column.name);
use_in_result = false;
}
}
res.insert(std::move(column));
if (use_in_result)
res.insert(std::move(column));
if (use_index)
++index_column_it;

View File

@ -24,7 +24,7 @@ public:
/// For cases when data structure (header) is known in advance.
/// NOTE We may use header for data validation and/or type conversions. It is not implemented.
NativeReader(ReadBuffer & istr_, const Block & header_, UInt64 server_revision_);
NativeReader(ReadBuffer & istr_, const Block & header_, UInt64 server_revision_, bool skip_unknown_columns_ = false);
/// For cases when we have an index. It allows to skip columns. Only columns specified in the index will be read.
NativeReader(ReadBuffer & istr_, UInt64 server_revision_,
@ -43,6 +43,7 @@ private:
ReadBuffer & istr;
Block header;
UInt64 server_revision;
bool skip_unknown_columns;
bool use_index = false;
IndexForNativeFormat::Blocks::const_iterator index_block_it;

View File

@ -10,4 +10,10 @@ void registerWithNamesAndTypes(const std::string & base_format_name, RegisterWit
register_func(base_format_name + "WithNamesAndTypes", true, true);
}
void markFormatWithNamesAndTypesSupportsSamplingColumns(const std::string & base_format_name, FormatFactory & factory)
{
factory.markFormatSupportsSubsetOfColumns(base_format_name + "WithNames");
factory.markFormatSupportsSubsetOfColumns(base_format_name + "WithNamesAndTypes");
}
}

View File

@ -2,6 +2,7 @@
#include <string>
#include <functional>
#include <Formats/FormatFactory.h>
namespace DB
{
@ -9,4 +10,6 @@ namespace DB
using RegisterWithNamesAndTypesFunc = std::function<void(const std::string & format_name, bool with_names, bool with_types)>;
void registerWithNamesAndTypes(const std::string & base_format_name, RegisterWithNamesAndTypesFunc register_func);
void markFormatWithNamesAndTypesSupportsSamplingColumns(const std::string & base_format_name, FormatFactory & factory);
}

View File

@ -213,7 +213,7 @@ private:
template <typename Name, Float64(Function)(Float64, Float64)>
struct BinaryFunctionPlain
struct BinaryFunctionVectorized
{
static constexpr auto name = Name::name;
static constexpr auto rows_per_iteration = 1;
@ -225,6 +225,4 @@ struct BinaryFunctionPlain
}
};
#define BinaryFunctionVectorized BinaryFunctionPlain
}

View File

@ -106,7 +106,7 @@ struct NormalizeUTF8Impl
size_t from_size = offsets[i] - current_from_offset - 1;
from_uchars.resize(from_size + 1);
int32_t from_code_points;
int32_t from_code_points = 0;
u_strFromUTF8(
from_uchars.data(),
from_uchars.size(),
@ -133,7 +133,7 @@ struct NormalizeUTF8Impl
if (res_data.size() < max_to_size)
res_data.resize(max_to_size);
int32_t to_size;
int32_t to_size = 0;
u_strToUTF8(
reinterpret_cast<char*>(&res_data[current_to_offset]),
res_data.size() - current_to_offset,
@ -151,6 +151,8 @@ struct NormalizeUTF8Impl
current_from_offset = offsets[i];
}
res_data.resize(current_to_offset);
}
[[noreturn]] static void vectorFixed(const ColumnString::Chars &, size_t, ColumnString::Chars &)

View File

@ -256,6 +256,7 @@ void readString(String & s, ReadBuffer & buf)
template void readStringInto<PaddedPODArray<UInt8>>(PaddedPODArray<UInt8> & s, ReadBuffer & buf);
template void readStringInto<String>(String & s, ReadBuffer & buf);
template void readStringInto<NullOutput>(NullOutput & s, ReadBuffer & buf);
template <typename Vector>
void readStringUntilEOFInto(Vector & s, ReadBuffer & buf)
@ -617,6 +618,12 @@ void readBackQuotedStringWithSQLStyle(String & s, ReadBuffer & buf)
readBackQuotedStringInto<true>(s, buf);
}
template<typename T>
concept WithResize = requires (T value)
{
{ value.resize(1) };
{ value.size() } -> std::integral<>;
};
template <typename Vector>
void readCSVStringInto(Vector & s, ReadBuffer & buf, const FormatSettings::CSV & settings)
@ -700,16 +707,18 @@ void readCSVStringInto(Vector & s, ReadBuffer & buf, const FormatSettings::CSV &
if (!buf.hasPendingData())
continue;
/** CSV format can contain insignificant spaces and tabs.
* Usually the task of skipping them is for the calling code.
* But in this case, it will be difficult to do this, so remove the trailing whitespace by ourself.
*/
size_t size = s.size();
while (size > 0
&& (s[size - 1] == ' ' || s[size - 1] == '\t'))
--size;
if constexpr (WithResize<Vector>)
{
/** CSV format can contain insignificant spaces and tabs.
* Usually the task of skipping them is for the calling code.
* But in this case, it will be difficult to do this, so remove the trailing whitespace ourselves.
*/
size_t size = s.size();
while (size > 0 && (s[size - 1] == ' ' || s[size - 1] == '\t'))
--size;
s.resize(size);
s.resize(size);
}
return;
}
}
@ -741,6 +750,7 @@ void readCSVField(String & s, ReadBuffer & buf, const FormatSettings::CSV & sett
}
template void readCSVStringInto<PaddedPODArray<UInt8>>(PaddedPODArray<UInt8> & s, ReadBuffer & buf, const FormatSettings::CSV & settings);
template void readCSVStringInto<NullOutput>(NullOutput & s, ReadBuffer & buf, const FormatSettings::CSV & settings);
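
The WithResize concept introduced above lets readCSVStringInto trim trailing whitespace only for sinks that can actually be resized (such as String), while NullOutput simply skips that branch. A standalone sketch of the same pattern, using hypothetical types rather than the ReadHelpers code:

```cpp
#include <concepts>
#include <cstddef>
#include <string>

template <typename T>
concept Resizable = requires (T value)
{
    { value.resize(1) };
    { value.size() } -> std::integral;
};

struct DiscardingSink { void push_back(char) {} };   // stand-in for NullOutput

template <typename Sink>
void appendTrimmed(Sink & sink, const std::string & field)
{
    for (char c : field)
        sink.push_back(c);
    if constexpr (Resizable<Sink>)
    {
        std::size_t size = sink.size();
        while (size > 0 && (sink[size - 1] == ' ' || sink[size - 1] == '\t'))
            --size;
        sink.resize(size);   // only compiled for sinks that support resize()
    }
}
```

With Sink = std::string the trimming branch is compiled and runs; with DiscardingSink the whole block is discarded at compile time.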
template <typename Vector, typename ReturnType>
@ -1313,8 +1323,8 @@ void skipToNextRowOrEof(PeekableReadBuffer & buf, const String & row_after_delim
}
// Use PeekableReadBuffer to copy field to string after parsing.
template <typename ParseFunc>
static void readParsedValueIntoString(String & s, ReadBuffer & buf, ParseFunc parse_func)
template <typename Vector, typename ParseFunc>
static void readParsedValueInto(Vector & s, ReadBuffer & buf, ParseFunc parse_func)
{
PeekableReadBuffer peekable_buf(buf);
peekable_buf.setCheckpoint();
@ -1326,8 +1336,8 @@ static void readParsedValueIntoString(String & s, ReadBuffer & buf, ParseFunc pa
peekable_buf.position() = end;
}
template <char opening_bracket, char closing_bracket>
static void readQuotedFieldInBrackets(String & s, ReadBuffer & buf)
template <char opening_bracket, char closing_bracket, typename Vector>
static void readQuotedFieldInBracketsInto(Vector & s, ReadBuffer & buf)
{
assertChar(opening_bracket, buf);
s.push_back(opening_bracket);
@ -1363,10 +1373,9 @@ static void readQuotedFieldInBrackets(String & s, ReadBuffer & buf)
}
}
void readQuotedField(String & s, ReadBuffer & buf)
template <typename Vector>
void readQuotedFieldInto(Vector & s, ReadBuffer & buf)
{
s.clear();
if (buf.eof())
return;
@ -1386,11 +1395,11 @@ void readQuotedField(String & s, ReadBuffer & buf)
s.push_back('\'');
}
else if (*buf.position() == '[')
readQuotedFieldInBrackets<'[', ']'>(s, buf);
readQuotedFieldInBracketsInto<'[', ']'>(s, buf);
else if (*buf.position() == '(')
readQuotedFieldInBrackets<'(', ')'>(s, buf);
readQuotedFieldInBracketsInto<'(', ')'>(s, buf);
else if (*buf.position() == '{')
readQuotedFieldInBrackets<'{', '}'>(s, buf);
readQuotedFieldInBracketsInto<'{', '}'>(s, buf);
else if (checkCharCaseInsensitive('n', buf))
{
/// NULL or NaN
@ -1423,15 +1432,23 @@ void readQuotedField(String & s, ReadBuffer & buf)
Float64 tmp;
readFloatText(tmp, in);
};
readParsedValueIntoString(s, buf, parse_func);
readParsedValueInto(s, buf, parse_func);
}
}
template void readQuotedFieldInto<NullOutput>(NullOutput & s, ReadBuffer & buf);
void readQuotedField(String & s, ReadBuffer & buf)
{
s.clear();
readQuotedFieldInto(s, buf);
}
void readJSONField(String & s, ReadBuffer & buf)
{
s.clear();
auto parse_func = [](ReadBuffer & in) { skipJSONField(in, "json_field"); };
readParsedValueIntoString(s, buf, parse_func);
readParsedValueInto(s, buf, parse_func);
}
}

View File

@ -618,6 +618,8 @@ void readStringUntilNewlineInto(Vector & s, ReadBuffer & buf);
struct NullOutput
{
void append(const char *, size_t) {}
void append(const char *) {}
void append(const char *, const char *) {}
void push_back(char) {} /// NOLINT
};
@ -931,12 +933,29 @@ inline ReturnType readDateTimeTextImpl(DateTime64 & datetime64, UInt32 scale, Re
++buf.position();
/// Keep sign of fractional part the same with whole part if datetime64 is negative
/// 1965-12-12 12:12:12.123 => whole = -127914468, fraction = 123(sign>0) -> new whole = -127914467, new fraction = 877(sign<0)
/// Case1:
/// 1965-12-12 12:12:12.123
/// => whole = -127914468, fractional = 123(coefficient>0)
/// => new whole = -127914467, new fractional = 877(coefficient<0)
///
/// Case2:
/// 1969-12-31 23:59:59.123
/// => whole = -1, fractional = 123(coefficient>0)
/// => new whole = 0, new fractional = -877(coefficient>0)
if (components.whole < 0 && components.fractional != 0)
{
const auto scale_multiplier = DecimalUtils::scaleMultiplier<DateTime64::NativeType>(scale);
++components.whole;
components.fractional = scale_multiplier - components.fractional;
if (components.whole)
{
/// whole keeps the sign, fractional should be non-negative
components.fractional = scale_multiplier - components.fractional;
}
else
{
/// when whole is zero, fractional should keep the sign
components.fractional = components.fractional - scale_multiplier;
}
}
}
/// 9908870400 is time_t value for 2184-01-01 UTC (a bit over the last year supported by DateTime64)
@ -1425,6 +1444,9 @@ struct PcgDeserializer
}
};
template <typename Vector>
void readQuotedFieldInto(Vector & s, ReadBuffer & buf);
void readQuotedField(String & s, ReadBuffer & buf);
void readJSONField(String & s, ReadBuffer & buf);

View File

@ -805,11 +805,21 @@ inline void writeDateTimeText(DateTime64 datetime64, UInt32 scale, WriteBuffer &
scale = scale > MaxScale ? MaxScale : scale;
auto components = DecimalUtils::split(datetime64, scale);
/// -127914467.877 => whole = -127914467, fraction = 877 => new whole = -127914468(1965-12-12 12:12:12), new fraction = 123(.123) => 1965-12-12 12:12:12.123
if (components.whole < 0 && components.fractional != 0)
/// Case1:
/// -127914467.877
/// => whole = -127914467, fraction = 877(After DecimalUtils::split)
/// => new whole = -127914468(1965-12-12 12:12:12), new fraction = 1000 - 877 = 123(.123)
/// => 1965-12-12 12:12:12.123
///
/// Case2:
/// -0.877
/// => whole = 0, fractional = -877(After DecimalUtils::split)
/// => whole = -1(1969-12-31 23:59:59), fractional = 1000 + (-877) = 123(.123)
using T = typename DateTime64::NativeType;
if (datetime64.value < 0 && components.fractional)
{
components.fractional = DecimalUtils::scaleMultiplier<T>(scale) + (components.whole ? T(-1) : T(1)) * components.fractional;
--components.whole;
components.fractional = DecimalUtils::scaleMultiplier<DateTime64::NativeType>(scale) - components.fractional;
}
writeDateTimeText<date_delimeter, time_delimeter, between_date_time_delimiter>(LocalDateTime(components.whole, time_zone), buf);
@ -989,7 +999,12 @@ void writeText(Decimal<T> x, UInt32 scale, WriteBuffer & ostr, bool trailing_zer
{
part = DecimalUtils::getFractionalPart(x, scale);
if (part || trailing_zeros)
{
if (part < 0)
part *= T(-1);
writeDecimalFractional(part, scale, ostr, trailing_zeros);
}
}
}

View File

@ -997,8 +997,8 @@ void ActionsDAG::addMaterializingOutputActions()
const ActionsDAG::Node & ActionsDAG::materializeNode(const Node & node)
{
FunctionOverloadResolverPtr func_builder_materialize = std::make_unique<FunctionToOverloadResolverAdaptor>(
std::make_shared<FunctionMaterialize>());
FunctionOverloadResolverPtr func_builder_materialize
= std::make_unique<FunctionToOverloadResolverAdaptor>(std::make_shared<FunctionMaterialize>());
const auto & name = node.result_name;
const auto * func = &addFunction(func_builder_materialize, {&node}, {});
@ -1102,7 +1102,8 @@ ActionsDAGPtr ActionsDAG::makeConvertingActions(
const auto * left_arg = dst_node;
FunctionCastBase::Diagnostic diagnostic = {dst_node->result_name, res_elem.name};
FunctionOverloadResolverPtr func_builder_cast = CastInternalOverloadResolver<CastType::nonAccurate>::createImpl(std::move(diagnostic));
FunctionOverloadResolverPtr func_builder_cast
= CastInternalOverloadResolver<CastType::nonAccurate>::createImpl(std::move(diagnostic));
NodeRawConstPtrs children = { left_arg, right_arg };
dst_node = &actions_dag->addFunction(func_builder_cast, std::move(children), {});
@ -1150,7 +1151,8 @@ ActionsDAGPtr ActionsDAG::makeConvertingActions(
ActionsDAGPtr ActionsDAG::makeAddingColumnActions(ColumnWithTypeAndName column)
{
auto adding_column_action = std::make_shared<ActionsDAG>();
FunctionOverloadResolverPtr func_builder_materialize = std::make_unique<FunctionToOverloadResolverAdaptor>(std::make_shared<FunctionMaterialize>());
FunctionOverloadResolverPtr func_builder_materialize
= std::make_unique<FunctionToOverloadResolverAdaptor>(std::make_shared<FunctionMaterialize>());
auto column_name = column.name;
const auto * column_node = &adding_column_action->addColumn(std::move(column));
@ -1612,7 +1614,7 @@ ConjunctionNodes getConjunctionNodes(ActionsDAG::Node * predicate, std::unordere
std::stack<Frame> stack;
std::unordered_set<const ActionsDAG::Node *> visited_nodes;
stack.push(Frame{.node = predicate});
stack.push({.node = predicate});
visited_nodes.insert(predicate);
while (!stack.empty())
{
@ -1798,9 +1800,8 @@ ActionsDAGPtr ActionsDAG::cloneActionsForFilterPushDown(
{
Node * predicate = const_cast<Node *>(tryFindInIndex(filter_name));
if (!predicate)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Index for ActionsDAG does not contain filter column name {}. DAG:\n{}",
filter_name, dumpDAG());
throw Exception(
ErrorCodes::LOGICAL_ERROR, "Index for ActionsDAG does not contain filter column name {}. DAG:\n{}", filter_name, dumpDAG());
/// If condition is constant let's do nothing.
/// It means there is nothing to push down or optimization was already applied.
@ -1870,8 +1871,6 @@ ActionsDAGPtr ActionsDAG::cloneActionsForFilterPushDown(
index_node = new_predicate;
}
}
removeUnusedActions(false);
}
else
{
@ -1926,10 +1925,9 @@ ActionsDAGPtr ActionsDAG::cloneActionsForFilterPushDown(
predicate->function_base = predicate->function_builder->build(arguments);
predicate->function = predicate->function_base->prepare(arguments);
}
removeUnusedActions(false);
}
removeUnusedActions(false);
return actions;
}

View File

@ -287,7 +287,7 @@ void DDLWorker::scheduleTasks(bool reinitialized)
Strings queue_nodes = zookeeper->getChildren(queue_dir, &queue_node_stat, queue_updated_event);
size_t size_before_filtering = queue_nodes.size();
filterAndSortQueueNodes(queue_nodes);
/// The following message is too verbose, but it can be useful too debug mysterious test failures in CI
/// The following message is too verbose, but it can be useful to debug mysterious test failures in CI
LOG_TRACE(log, "scheduleTasks: initialized={}, size_before_filtering={}, queue_size={}, "
"entries={}..{}, "
"first_failed_task_name={}, current_tasks_size={}, "

View File

@ -1041,7 +1041,7 @@ void ExpressionActionsChain::ArrayJoinStep::finalize(const NameSet & required_ou
ExpressionActionsChain::JoinStep::JoinStep(
std::shared_ptr<TableJoin> analyzed_join_,
JoinPtr join_,
ColumnsWithTypeAndName required_columns_)
const ColumnsWithTypeAndName & required_columns_)
: Step({})
, analyzed_join(std::move(analyzed_join_))
, join(std::move(join_))
@ -1049,11 +1049,8 @@ ExpressionActionsChain::JoinStep::JoinStep(
for (const auto & column : required_columns_)
required_columns.emplace_back(column.name, column.type);
NamesAndTypesList result_names_and_types = required_columns;
analyzed_join->addJoinedColumnsAndCorrectTypes(result_names_and_types, true);
for (const auto & [name, type] : result_names_and_types)
/// `column` is `nullptr` because we don't care on constness here, it may be changed in join
result_columns.emplace_back(nullptr, type, name);
result_columns = required_columns_;
analyzed_join->addJoinedColumnsAndCorrectTypes(result_columns, true);
}
void ExpressionActionsChain::JoinStep::finalize(const NameSet & required_output_)

View File

@ -233,7 +233,7 @@ struct ExpressionActionsChain : WithContext
NamesAndTypesList required_columns;
ColumnsWithTypeAndName result_columns;
JoinStep(std::shared_ptr<TableJoin> analyzed_join_, JoinPtr join_, ColumnsWithTypeAndName required_columns_);
JoinStep(std::shared_ptr<TableJoin> analyzed_join_, JoinPtr join_, const ColumnsWithTypeAndName & required_columns_);
NamesAndTypesList getRequiredColumns() const override { return required_columns; }
ColumnsWithTypeAndName getResultColumns() const override { return result_columns; }
void finalize(const NameSet & required_output_) override;

View File

@ -10,6 +10,7 @@ namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int INVALID_TRANSACTION;
extern const int UNKNOWN_STATUS_OF_TRANSACTION;
}
BlockIO InterpreterTransactionControlQuery::execute()
@ -55,7 +56,42 @@ BlockIO InterpreterTransactionControlQuery::executeCommit(ContextMutablePtr sess
if (txn->getState() != MergeTreeTransaction::RUNNING)
throw Exception(ErrorCodes::INVALID_TRANSACTION, "Transaction is not in RUNNING state");
TransactionLog::instance().commitTransaction(txn);
TransactionsWaitCSNMode mode = query_context->getSettingsRef().wait_changes_become_visible_after_commit_mode;
CSN csn;
try
{
csn = TransactionLog::instance().commitTransaction(txn, /* throw_on_unknown_status */ mode != TransactionsWaitCSNMode::WAIT_UNKNOWN);
}
catch (const Exception & e)
{
if (e.code() == ErrorCodes::UNKNOWN_STATUS_OF_TRANSACTION)
{
/// Detach transaction from current context if connection was lost and its status is unknown
session_context->setCurrentTransaction(NO_TRANSACTION_PTR);
}
throw;
}
if (csn == Tx::CommittingCSN)
{
chassert(mode == TransactionsWaitCSNMode::WAIT_UNKNOWN);
/// Try to wait for connection to be restored and its status to be loaded.
/// It's useful for testing. It allows enabling fault injection (after commit) without breaking tests.
txn->waitStateChange(Tx::CommittingCSN);
if (txn->getState() == MergeTreeTransaction::ROLLED_BACK)
throw Exception(ErrorCodes::INVALID_TRANSACTION, "Transaction {} was rolled back", txn->tid);
if (txn->getState() != MergeTreeTransaction::COMMITTED)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Transaction {} has invalid state {}", txn->tid, txn->getState());
csn = txn->getCSN();
}
/// Wait for committed changes to become actually visible, so the next transaction in this session will see the changes
if (mode != TransactionsWaitCSNMode::ASYNC)
TransactionLog::instance().waitForCSNLoaded(csn);
session_context->setCurrentTransaction(NO_TRANSACTION_PTR);
return {};
}
@ -67,6 +103,8 @@ BlockIO InterpreterTransactionControlQuery::executeRollback(ContextMutablePtr se
throw Exception(ErrorCodes::INVALID_TRANSACTION, "There is no current transaction");
if (txn->getState() == MergeTreeTransaction::COMMITTED)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Transaction is in COMMITTED state");
if (txn->getState() == MergeTreeTransaction::COMMITTING)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Transaction is in COMMITTING state");
if (txn->getState() == MergeTreeTransaction::RUNNING)
TransactionLog::instance().rollbackTransaction(txn);

View File

@ -22,7 +22,7 @@ public:
private:
BlockIO executeBegin(ContextMutablePtr session_context);
static BlockIO executeCommit(ContextMutablePtr session_context);
BlockIO executeCommit(ContextMutablePtr session_context);
static BlockIO executeRollback(ContextMutablePtr session_context);
static BlockIO executeSetSnapshot(ContextMutablePtr session_context, UInt64 snapshot);

View File

@ -66,7 +66,7 @@ void JoinSwitcher::switchJoin()
for (const auto & sample_column : right_sample_block)
{
positions.emplace_back(tmp_block.getPositionByName(sample_column.name));
is_nullable.emplace_back(sample_column.type->isNullable());
is_nullable.emplace_back(JoinCommon::isNullable(sample_column.type));
}
}

View File

@ -38,13 +38,26 @@ void MergeTreeTransaction::setSnapshot(CSN new_snapshot)
MergeTreeTransaction::State MergeTreeTransaction::getState() const
{
CSN c = csn.load();
if (c == Tx::UnknownCSN || c == Tx::CommittingCSN)
if (c == Tx::UnknownCSN)
return RUNNING;
if (c == Tx::CommittingCSN)
return COMMITTING;
if (c == Tx::RolledBackCSN)
return ROLLED_BACK;
return COMMITTED;
}
bool MergeTreeTransaction::waitStateChange(CSN current_state_csn) const
{
CSN current_value = current_state_csn;
while (current_value == current_state_csn && !TransactionLog::instance().isShuttingDown())
{
csn.wait(current_value);
current_value = csn.load();
}
return current_value != current_state_csn;
}
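
waitStateChange above relies on C++20 std::atomic wait/notify: the committing thread stores a new CSN and notifies, while waiters block until the value changes. A minimal, self-contained sketch of that pattern with hypothetical names (not the MergeTreeTransaction code):

```cpp
#include <atomic>
#include <cstdint>
#include <thread>

std::atomic<uint64_t> csn{0};   // 0 plays the role of "unknown CSN"

void waiter()
{
    uint64_t seen = 0;
    while (seen == 0)        // wait until the value moves away from the initial state
    {
        csn.wait(seen);      // blocks while csn == seen (C++20)
        seen = csn.load();
    }
}

void committer(uint64_t assigned)
{
    csn.store(assigned);
    csn.notify_all();        // wake every thread blocked in csn.wait()
}

int main()
{
    std::thread t(waiter);
    committer(42);
    t.join();
}
```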
void MergeTreeTransaction::checkIsNotCancelled() const
{
CSN c = csn.load();
@ -158,7 +171,7 @@ void MergeTreeTransaction::addMutation(const StoragePtr & table, const String &
bool MergeTreeTransaction::isReadOnly() const
{
std::lock_guard lock{mutex};
assert((creating_parts.empty() && removing_parts.empty() && mutations.empty()) == storages.empty());
chassert((creating_parts.empty() && removing_parts.empty() && mutations.empty()) == storages.empty());
return storages.empty();
}
@ -204,7 +217,7 @@ void MergeTreeTransaction::afterCommit(CSN assigned_csn) noexcept
/// and we will be able to remove old entries from transaction log in ZK.
/// It's not a problem if the server crashes before the CSN is written, because we already have the TID in the data part and an entry in the log.
[[maybe_unused]] CSN prev_value = csn.exchange(assigned_csn);
assert(prev_value == Tx::CommittingCSN);
chassert(prev_value == Tx::CommittingCSN);
for (const auto & part : creating_parts)
{
part->version.creation_csn.store(csn);
@ -321,7 +334,7 @@ String MergeTreeTransaction::dumpDescription() const
{
String info = fmt::format("{} (created by {}, {})", part->name, part->version.getCreationTID(), part->version.creation_csn);
std::get<1>(storage_to_changes[&(part->storage)]).push_back(std::move(info));
assert(!part->version.creation_csn || part->version.creation_csn <= snapshot);
chassert(!part->version.creation_csn || part->version.creation_csn <= snapshot);
}
for (const auto & mutation : mutations)

View File

@ -26,6 +26,7 @@ public:
enum State
{
RUNNING,
COMMITTING,
COMMITTED,
ROLLED_BACK,
};
@ -55,6 +56,11 @@ public:
Float64 elapsedSeconds() const { return elapsed.elapsedSeconds(); }
/// Waits for transaction state to become not equal to the state corresponding to current_state_csn
bool waitStateChange(CSN current_state_csn) const;
CSN getCSN() const { return csn; }
private:
scope_guard beforeCommit();
void afterCommit(CSN assigned_csn) noexcept;

View File

@ -53,7 +53,7 @@ void MergeTreeTransactionHolder::onDestroy() noexcept
{
try
{
TransactionLog::instance().commitTransaction(txn);
TransactionLog::instance().commitTransaction(txn, /* throw_on_unknown_status */ false);
return;
}
catch (...)

View File

@ -27,6 +27,7 @@
#include <Common/logger_useful.h>
#include <algorithm>
#include <string>
#include <type_traits>
#include <vector>
@ -328,6 +329,21 @@ NamesAndTypesList TableJoin::correctedColumnsAddedByJoin() const
void TableJoin::addJoinedColumnsAndCorrectTypes(NamesAndTypesList & left_columns, bool correct_nullability)
{
addJoinedColumnsAndCorrectTypesImpl(left_columns, correct_nullability);
}
void TableJoin::addJoinedColumnsAndCorrectTypes(ColumnsWithTypeAndName & left_columns, bool correct_nullability)
{
addJoinedColumnsAndCorrectTypesImpl(left_columns, correct_nullability);
}
template <typename TColumns>
void TableJoin::addJoinedColumnsAndCorrectTypesImpl(TColumns & left_columns, bool correct_nullability)
{
static_assert(std::is_same_v<typename TColumns::value_type, ColumnWithTypeAndName> ||
std::is_same_v<typename TColumns::value_type, NameAndTypePair>);
constexpr bool has_column = std::is_same_v<typename TColumns::value_type, ColumnWithTypeAndName>;
for (auto & col : left_columns)
{
if (hasUsing())
@ -342,15 +358,26 @@ void TableJoin::addJoinedColumnsAndCorrectTypes(NamesAndTypesList & left_columns
inferJoinKeyCommonType(left_columns, columns_from_joined_table, !isSpecialStorage());
if (auto it = left_type_map.find(col.name); it != left_type_map.end())
{
col.type = it->second;
if constexpr (has_column)
col.column = nullptr;
}
}
if (correct_nullability && leftBecomeNullable(col.type))
{
col.type = JoinCommon::convertTypeToNullable(col.type);
if constexpr (has_column)
col.column = nullptr;
}
}
for (const auto & col : correctedColumnsAddedByJoin())
left_columns.emplace_back(col.name, col.type);
if constexpr (has_column)
left_columns.emplace_back(nullptr, col.type, col.name);
else
left_columns.emplace_back(col.name, col.type);
}
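
addJoinedColumnsAndCorrectTypesImpl above serves both NamesAndTypesList and ColumnsWithTypeAndName from a single template by branching on the element type with if constexpr. A stripped-down sketch of that approach (the element structs below are stand-ins, not the real ClickHouse types):

```cpp
#include <string>
#include <type_traits>
#include <vector>

struct NameAndType { std::string name; std::string type; };
struct ColumnWithTypeAndNameLite { const void * column = nullptr; std::string type; std::string name; };

template <typename TColumns>
void makeAllNullable(TColumns & columns)
{
    using Elem = typename TColumns::value_type;
    static_assert(std::is_same_v<Elem, NameAndType> || std::is_same_v<Elem, ColumnWithTypeAndNameLite>);
    constexpr bool has_column = std::is_same_v<Elem, ColumnWithTypeAndNameLite>;

    for (auto & col : columns)
    {
        col.type = "Nullable(" + col.type + ")";
        if constexpr (has_column)
            col.column = nullptr;   // a changed type invalidates any cached column, as in the diff above
    }
}
```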
bool TableJoin::sameStrictnessAndKind(ASTTableJoin::Strictness strictness_, ASTTableJoin::Kind kind_) const

View File

@ -254,7 +254,11 @@ public:
bool rightBecomeNullable(const DataTypePtr & column_type) const;
void addJoinedColumn(const NameAndTypePair & joined_column);
template <typename TColumns>
void addJoinedColumnsAndCorrectTypesImpl(TColumns & left_columns, bool correct_nullability);
void addJoinedColumnsAndCorrectTypes(NamesAndTypesList & left_columns, bool correct_nullability);
void addJoinedColumnsAndCorrectTypes(ColumnsWithTypeAndName & left_columns, bool correct_nullability);
/// Calculate converting actions, rename key columns in required
/// For `USING` join we will convert key columns in place, which affects the types in the result table

View File

@ -21,6 +21,7 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int UNKNOWN_STATUS_OF_TRANSACTION;
}
static void tryWriteEventToSystemLog(Poco::Logger * log, ContextPtr context,
@ -52,6 +53,8 @@ TransactionLog::TransactionLog()
zookeeper_path = global_context->getConfigRef().getString("transaction_log.zookeeper_path", "/clickhouse/txn");
zookeeper_path_log = zookeeper_path + "/log";
fault_probability_before_commit = global_context->getConfigRef().getDouble("transaction_log.fault_probability_before_commit", 0);
fault_probability_after_commit = global_context->getConfigRef().getDouble("transaction_log.fault_probability_after_commit", 0);
loadLogFromZooKeeper();
@ -145,24 +148,29 @@ void TransactionLog::loadEntries(Strings::const_iterator beg, Strings::const_ite
NOEXCEPT_SCOPE;
LockMemoryExceptionInThread lock_memory_tracker(VariableContext::Global);
std::lock_guard lock{mutex};
for (const auto & entry : loaded)
{
if (entry.first == Tx::EmptyTID.getHash())
continue;
std::lock_guard lock{mutex};
for (const auto & entry : loaded)
{
if (entry.first == Tx::EmptyTID.getHash())
continue;
tid_to_csn.emplace(entry.first, entry.second);
tid_to_csn.emplace(entry.first, entry.second);
}
last_loaded_entry = last_entry;
}
{
std::lock_guard lock{running_list_mutex};
latest_snapshot = loaded.back().second.csn;
local_tid_counter = Tx::MaxReservedLocalTID;
}
last_loaded_entry = last_entry;
latest_snapshot = loaded.back().second.csn;
local_tid_counter = Tx::MaxReservedLocalTID;
}
void TransactionLog::loadLogFromZooKeeper()
{
assert(!zookeeper);
assert(tid_to_csn.empty());
assert(last_loaded_entry.empty());
chassert(!zookeeper);
chassert(tid_to_csn.empty());
chassert(last_loaded_entry.empty());
zookeeper = global_context->getZooKeeper();
/// We do not write local_tid_counter to disk or zk and maintain it only in memory.
@ -172,7 +180,7 @@ void TransactionLog::loadLogFromZooKeeper()
if (code != Coordination::Error::ZOK)
{
/// Log probably does not exist, create it
assert(code == Coordination::Error::ZNONODE);
chassert(code == Coordination::Error::ZNONODE);
zookeeper->createAncestors(zookeeper_path_log);
Coordination::Requests ops;
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/tail_ptr", serializeCSN(Tx::MaxReservedCSN), zkutil::CreateMode::Persistent));
@ -192,11 +200,11 @@ void TransactionLog::loadLogFromZooKeeper()
/// 2. simplify log rotation
/// 3. support 64-bit CSNs on top of Apache ZooKeeper (it uses Int32 for sequential numbers)
Strings entries_list = zookeeper->getChildren(zookeeper_path_log, nullptr, log_updated_event);
assert(!entries_list.empty());
chassert(!entries_list.empty());
std::sort(entries_list.begin(), entries_list.end());
loadEntries(entries_list.begin(), entries_list.end());
assert(!last_loaded_entry.empty());
assert(latest_snapshot == deserializeCSN(last_loaded_entry));
chassert(!last_loaded_entry.empty());
chassert(latest_snapshot == deserializeCSN(last_loaded_entry));
local_tid_counter = Tx::MaxReservedLocalTID;
tail_ptr = deserializeCSN(zookeeper->get(zookeeper_path + "/tail_ptr"));
@ -208,19 +216,31 @@ void TransactionLog::runUpdatingThread()
{
try
{
log_updated_event->wait();
/// Do not wait if we have some transactions to finalize
if (!unknown_state_list_loaded.empty())
log_updated_event->wait();
if (stop_flag.load())
return;
if (getZooKeeper()->expired())
bool connection_loss = getZooKeeper()->expired();
if (connection_loss)
{
auto new_zookeeper = global_context->getZooKeeper();
std::lock_guard lock{mutex};
zookeeper = new_zookeeper;
{
std::lock_guard lock{mutex};
zookeeper = new_zookeeper;
}
/// It's possible that we connected to a different [Zoo]Keeper instance,
/// so we may read slightly stale state. Run a write request before loading log entries
/// to make that instance up-to-date.
zookeeper->set(zookeeper_path_log, "");
}
loadNewEntries();
removeOldEntries();
tryFinalizeUnknownStateTransactions();
}
catch (const Coordination::Exception &)
{
@ -241,12 +261,12 @@ void TransactionLog::runUpdatingThread()
void TransactionLog::loadNewEntries()
{
Strings entries_list = zookeeper->getChildren(zookeeper_path_log, nullptr, log_updated_event);
assert(!entries_list.empty());
chassert(!entries_list.empty());
std::sort(entries_list.begin(), entries_list.end());
auto it = std::upper_bound(entries_list.begin(), entries_list.end(), last_loaded_entry);
loadEntries(it, entries_list.end());
assert(last_loaded_entry == entries_list.back());
assert(latest_snapshot == deserializeCSN(last_loaded_entry));
chassert(last_loaded_entry == entries_list.back());
chassert(latest_snapshot == deserializeCSN(last_loaded_entry));
latest_snapshot.notify_all();
}
@ -309,6 +329,46 @@ void TransactionLog::removeOldEntries()
tid_to_csn.erase(tid_hash);
}
void TransactionLog::tryFinalizeUnknownStateTransactions()
{
/// We just recovered connection to [Zoo]Keeper.
/// Check whether transactions in unknown state were actually committed or not, and finalize them or roll them back.
UnknownStateList list;
{
/// We must be sure that the corresponding CSN entry is loaded from ZK.
/// Otherwise we may accidentally roll back a committed transaction in case of a race condition like this:
/// - runUpdatingThread: loaded some entries, ready to call tryFinalizeUnknownStateTransactions()
/// - commitTransaction: creates CSN entry in the log (txn is committed)
/// - [session expires]
/// - commitTransaction: catches Coordination::Exception (maybe due to fault injection), appends txn to unknown_state_list
/// - runUpdatingThread: calls tryFinalizeUnknownStateTransactions(), fails to find CSN for this txn, rolls it back
/// So all CSN entries that might exist at the moment of appending txn to unknown_state_list
/// must be loaded from ZK before we start finalizing that txn.
/// That's why we use two lists here:
/// 1. At first we put txn into unknown_state_list
/// 2. We move it to unknown_state_list_loaded when runUpdatingThread done at least one iteration
/// 3. Then we can safely finalize txns from unknown_state_list_loaded, because all required entries are loaded
std::lock_guard lock{running_list_mutex};
std::swap(list, unknown_state_list);
std::swap(list, unknown_state_list_loaded);
}
for (auto & [txn, state_guard] : list)
{
/// CSNs must already be loaded; we only need to check whether the corresponding mapping exists.
if (auto csn = getCSN(txn->tid))
{
finalizeCommittedTransaction(txn, csn, state_guard);
}
else
{
assertTIDIsNotOutdated(txn->tid);
state_guard = {};
rollbackTransaction(txn->shared_from_this());
}
}
}
CSN TransactionLog::getLatestSnapshot() const
{
return latest_snapshot.load();
@ -334,58 +394,117 @@ MergeTreeTransactionPtr TransactionLog::beginTransaction()
return txn;
}
CSN TransactionLog::commitTransaction(const MergeTreeTransactionPtr & txn)
CSN TransactionLog::commitTransaction(const MergeTreeTransactionPtr & txn, bool throw_on_unknown_status)
{
/// Some precommit checks, may throw
auto committing_lock = txn->beforeCommit();
auto state_guard = txn->beforeCommit();
CSN new_csn;
CSN allocated_csn = Tx::UnknownCSN;
if (txn->isReadOnly())
{
/// Don't need to allocate CSN in ZK for readonly transactions, it's safe to use snapshot/start_csn as "commit" timestamp
LOG_TEST(log, "Closing readonly transaction {}", txn->tid);
new_csn = txn->snapshot;
tryWriteEventToSystemLog(log, global_context, TransactionsInfoLogElement::COMMIT, txn->tid, new_csn);
}
else
{
LOG_TEST(log, "Committing transaction {}", txn->dumpDescription());
/// TODO handle connection loss
/// TODO support batching
auto current_zookeeper = getZooKeeper();
String path_created = current_zookeeper->create(zookeeper_path_log + "/csn-", serializeTID(txn->tid), zkutil::CreateMode::PersistentSequential); /// Commit point
NOEXCEPT_SCOPE;
String csn_path_created;
try
{
if (unlikely(fault_probability_before_commit))
{
std::bernoulli_distribution fault(fault_probability_before_commit);
if (fault(thread_local_rng))
throw Coordination::Exception("Fault injected (before commit)", Coordination::Error::ZCONNECTIONLOSS);
}
/// Commit point
csn_path_created = current_zookeeper->create(zookeeper_path_log + "/csn-", serializeTID(txn->tid), zkutil::CreateMode::PersistentSequential);
if (unlikely(fault_probability_after_commit))
{
std::bernoulli_distribution fault(fault_probability_after_commit);
if (fault(thread_local_rng))
throw Coordination::Exception("Fault injected (after commit)", Coordination::Error::ZCONNECTIONLOSS);
}
}
catch (const Coordination::Exception & e)
{
if (!Coordination::isHardwareError(e.code))
throw;
/// We don't know whether the transaction has actually been committed or not.
/// The only thing we can do is to postpone its finalization.
{
std::lock_guard lock{running_list_mutex};
unknown_state_list.emplace_back(txn.get(), std::move(state_guard));
}
log_updated_event->set();
if (throw_on_unknown_status)
throw Exception(ErrorCodes::UNKNOWN_STATUS_OF_TRANSACTION,
"Connection lost on attempt to commit transaction {}, will finalize it later: {}",
txn->tid, e.message());
LOG_INFO(log, "Connection lost on attempt to commit transaction {}, will finalize it later: {}", txn->tid, e.message());
return Tx::CommittingCSN;
}
/// Do not allow exceptions between the commit point and the end of transaction finalization
/// (otherwise it may get stuck in the COMMITTING state while holding a snapshot).
NOEXCEPT_SCOPE;
/// FIXME Transactions: Sequential node numbers in ZooKeeper are Int32, but 31 bits are not enough for production use
/// (overflow is possible within several weeks/months of active usage)
new_csn = deserializeCSN(path_created.substr(zookeeper_path_log.size() + 1));
allocated_csn = deserializeCSN(csn_path_created.substr(zookeeper_path_log.size() + 1));
}
LOG_INFO(log, "Transaction {} committed with CSN={}", txn->tid, new_csn);
tryWriteEventToSystemLog(log, global_context, TransactionsInfoLogElement::COMMIT, txn->tid, new_csn);
return finalizeCommittedTransaction(txn.get(), allocated_csn, state_guard);
}
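The CSN itself is recovered from the name of the sequential node created at the commit point (the `csn-` prefix plus a zero-padded sequence number appended by ZooKeeper); the FIXME above refers to this sequence counter being a signed 32-bit value. A hedged sketch of the parsing step, assuming node names like `csn-0000000123` — the exact helper used by the codebase may validate differently:

```
#include <cstdint>
#include <stdexcept>
#include <string>

// "csn-0000000123" -> 123. Simplified illustration only.
uint64_t parseCSNFromNodeName(const std::string & node_name)
{
    const std::string prefix = "csn-";
    if (node_name.size() <= prefix.size() || node_name.compare(0, prefix.size(), prefix) != 0)
        throw std::invalid_argument("unexpected CSN node name: " + node_name);
    return std::stoull(node_name.substr(prefix.size()));
}
```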
/// Wait for committed changes to become actually visible, so the next transaction in this session will see the changes
/// TODO it's optional, add a setting for this
auto current_latest_snapshot = latest_snapshot.load();
while (current_latest_snapshot < new_csn && !stop_flag)
{
latest_snapshot.wait(current_latest_snapshot);
current_latest_snapshot = latest_snapshot.load();
}
CSN TransactionLog::finalizeCommittedTransaction(MergeTreeTransaction * txn, CSN allocated_csn, scope_guard & state_guard) noexcept
{
chassert(!allocated_csn == txn->isReadOnly());
if (allocated_csn)
{
LOG_INFO(log, "Transaction {} committed with CSN={}", txn->tid, allocated_csn);
tryWriteEventToSystemLog(log, global_context, TransactionsInfoLogElement::COMMIT, txn->tid, allocated_csn);
}
else
{
/// Transaction was readonly
allocated_csn = txn->snapshot;
tryWriteEventToSystemLog(log, global_context, TransactionsInfoLogElement::COMMIT, txn->tid, allocated_csn);
}
/// Write allocated CSN, so we will be able to cleanup log in ZK. This method is noexcept.
txn->afterCommit(new_csn);
txn->afterCommit(allocated_csn);
state_guard = {};
{
/// Finally we can remove transaction from the list and release the snapshot
std::lock_guard lock{running_list_mutex};
snapshots_in_use.erase(txn->snapshot_in_use_it);
bool removed = running_list.erase(txn->tid.getHash());
if (!removed)
throw Exception(ErrorCodes::LOGICAL_ERROR, "I's a bug: TID {} {} doesn't exist", txn->tid.getHash(), txn->tid);
snapshots_in_use.erase(txn->snapshot_in_use_it);
{
LOG_ERROR(log , "I's a bug: TID {} {} doesn't exist", txn->tid.getHash(), txn->tid);
abort();
}
}
return new_csn;
return allocated_csn;
}
bool TransactionLog::waitForCSNLoaded(CSN csn) const
{
auto current_latest_snapshot = latest_snapshot.load();
while (current_latest_snapshot < csn && !stop_flag)
{
latest_snapshot.wait(current_latest_snapshot);
current_latest_snapshot = latest_snapshot.load();
}
return csn <= current_latest_snapshot;
}
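waitForCSNLoaded is built on the C++20 `std::atomic` wait/notify facility: waiters block on `latest_snapshot` until its value reaches the CSN they need, and the updating thread publishes new values and notifies. A self-contained illustration of just that pattern (not the actual class):

```
#include <atomic>
#include <cstdint>

std::atomic<uint64_t> latest_csn{0};
std::atomic<bool> stop{false};

// Waiter: returns true once `csn` (and everything before it) has been published.
bool waitForCSN(uint64_t csn)
{
    auto current = latest_csn.load();
    while (current < csn && !stop.load())
    {
        latest_csn.wait(current);      // blocks until the stored value differs from `current`
        current = latest_csn.load();
    }
    return csn <= current;
}

// Publisher: called after new log entries have been applied.
void publishCSN(uint64_t csn)
{
    latest_csn.store(csn);
    latest_csn.notify_all();
}
```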
void TransactionLog::rollbackTransaction(const MergeTreeTransactionPtr & txn) noexcept
@ -395,8 +514,8 @@ void TransactionLog::rollbackTransaction(const MergeTreeTransactionPtr & txn) no
if (!txn->rollback())
{
/// Transaction was cancelled concurrently, it's already rolled back.
assert(txn->csn == Tx::RolledBackCSN);
/// Transaction was cancelled or committed concurrently
chassert(txn->csn != Tx::UnknownCSN);
return;
}
@ -438,8 +557,8 @@ CSN TransactionLog::getCSN(const TIDHash & tid)
CSN TransactionLog::getCSNImpl(const TIDHash & tid_hash) const
{
assert(tid_hash);
assert(tid_hash != Tx::EmptyTID.getHash());
chassert(tid_hash);
chassert(tid_hash != Tx::EmptyTID.getHash());
std::lock_guard lock{mutex};
auto it = tid_to_csn.find(tid_hash);
@ -467,6 +586,8 @@ CSN TransactionLog::getOldestSnapshot() const
std::lock_guard lock{running_list_mutex};
if (snapshots_in_use.empty())
return getLatestSnapshot();
chassert(running_list.size() == snapshots_in_use.size());
chassert(snapshots_in_use.size() < 2 || snapshots_in_use.front() <= *++snapshots_in_use.begin());
return snapshots_in_use.front();
}

View File

@ -97,7 +97,8 @@ public:
/// Tries to commit transaction. Returns Commit Sequence Number.
/// Throw if transaction was concurrently killed or if some precommit check failed.
/// May throw if ZK connection is lost. Transaction status is unknown in this case.
CSN commitTransaction(const MergeTreeTransactionPtr & txn);
/// Returns CommittingCSN if throw_on_unknown_status is false and connection was lost.
CSN commitTransaction(const MergeTreeTransactionPtr & txn, bool throw_on_unknown_status);
/// Releases locks that were acquired by the transaction, releases the snapshot, and removes the transaction from the list of active transactions.
/// Normally it should not throw, but if it does for some reason (global memory limit exceeded, disk failure, etc)
@ -119,6 +120,12 @@ public:
/// Returns copy of list of running transactions.
TransactionsList getTransactionsList() const;
/// Waits for provided CSN (and all previous ones) to be loaded from the log.
/// Returns false if waiting was interrupted (e.g. by shutdown)
bool waitForCSNLoaded(CSN csn) const;
bool isShuttingDown() const { return stop_flag.load(); }
private:
void loadLogFromZooKeeper();
void runUpdatingThread();
@ -127,6 +134,10 @@ private:
void loadNewEntries();
void removeOldEntries();
CSN finalizeCommittedTransaction(MergeTreeTransaction * txn, CSN allocated_csn, scope_guard & state_guard) noexcept;
void tryFinalizeUnknownStateTransactions();
static UInt64 deserializeCSN(const String & csn_node_name);
static String serializeCSN(CSN csn);
static TransactionID deserializeTID(const String & csn_node_content);
@ -159,6 +170,10 @@ private:
mutable std::mutex running_list_mutex;
/// Transactions that are currently processed
TransactionsList running_list;
/// If we lost connection on attempt to create csn- node then we don't know transaction's state.
using UnknownStateList = std::vector<std::pair<MergeTreeTransaction *, scope_guard>>;
UnknownStateList unknown_state_list;
UnknownStateList unknown_state_list_loaded;
/// Ordered list of snapshots that are currently used by some transactions. Needed for background cleanup.
std::list<CSN> snapshots_in_use;
@ -175,6 +190,9 @@ private:
std::atomic_bool stop_flag = false;
ThreadFromGlobalPool updating_thread;
Float64 fault_probability_before_commit = 0;
Float64 fault_probability_after_commit = 0;
};
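Taken together, the new public surface means a caller that opts out of throwing on unknown status must treat `Tx::CommittingCSN` as "outcome unknown". A hypothetical caller-side pattern; `tlog` stands for a `TransactionLog` reference obtained elsewhere, and the rest is an illustrative fragment rather than code taken from the interpreter:

```
// Illustrative sketch only; `tlog` and `txn` are assumed to exist in the caller.
CSN csn = tlog.commitTransaction(txn, /* throw_on_unknown_status */ false);
if (csn == Tx::CommittingCSN)
{
    // Connection was lost at the commit point: the updating thread will finalize the
    // transaction after re-reading the log; at this point the outcome is unknown.
}
else
{
    // Committed. Optionally wait until the CSN is visible to later queries in this session.
    tlog.waitForCSNLoaded(csn);
}
```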
template <typename Derived>

View File

@ -88,8 +88,8 @@ void VersionMetadata::lockRemovalTID(const TransactionID & tid, const Transactio
bool VersionMetadata::tryLockRemovalTID(const TransactionID & tid, const TransactionInfoContext & context, TIDHash * locked_by_id)
{
assert(!tid.isEmpty());
assert(!creation_tid.isEmpty());
chassert(!tid.isEmpty());
chassert(!creation_tid.isEmpty());
TIDHash removal_lock_value = tid.getHash();
TIDHash expected_removal_lock_value = 0;
bool locked = removal_tid_lock.compare_exchange_strong(expected_removal_lock_value, removal_lock_value);
@ -115,7 +115,7 @@ bool VersionMetadata::tryLockRemovalTID(const TransactionID & tid, const Transac
void VersionMetadata::unlockRemovalTID(const TransactionID & tid, const TransactionInfoContext & context)
{
LOG_TEST(log, "Unlocking removal_tid by {}, table: {}, part: {}", tid, context.table.getNameForLogs(), context.part_name);
assert(!tid.isEmpty());
chassert(!tid.isEmpty());
TIDHash removal_lock_value = tid.getHash();
TIDHash locked_by = removal_tid_lock.load();
@ -145,7 +145,7 @@ bool VersionMetadata::isRemovalTIDLocked() const
void VersionMetadata::setCreationTID(const TransactionID & tid, TransactionInfoContext * context)
{
/// NOTE ReplicatedMergeTreeSink may add one part multiple times
assert(creation_tid.isEmpty() || creation_tid == tid);
chassert(creation_tid.isEmpty() || creation_tid == tid);
creation_tid = tid;
if (context)
tryWriteEventToSystemLog(log, TransactionsInfoLogElement::ADD_PART, tid, *context);
@ -158,7 +158,7 @@ bool VersionMetadata::isVisible(const MergeTreeTransaction & txn)
bool VersionMetadata::isVisible(CSN snapshot_version, TransactionID current_tid)
{
assert(!creation_tid.isEmpty());
chassert(!creation_tid.isEmpty());
CSN creation = creation_csn.load(std::memory_order_relaxed);
TIDHash removal_lock = removal_tid_lock.load(std::memory_order_relaxed);
CSN removal = removal_csn.load(std::memory_order_relaxed);
@ -166,10 +166,10 @@ bool VersionMetadata::isVisible(CSN snapshot_version, TransactionID current_tid)
[[maybe_unused]] bool had_creation_csn = creation;
[[maybe_unused]] bool had_removal_tid = removal_lock;
[[maybe_unused]] bool had_removal_csn = removal;
assert(!had_removal_csn || had_removal_tid);
assert(!had_removal_csn || had_creation_csn);
assert(creation == Tx::UnknownCSN || creation == Tx::PrehistoricCSN || Tx::MaxReservedCSN < creation);
assert(removal == Tx::UnknownCSN || removal == Tx::PrehistoricCSN || Tx::MaxReservedCSN < removal);
chassert(!had_removal_csn || had_removal_tid);
chassert(!had_removal_csn || had_creation_csn);
chassert(creation == Tx::UnknownCSN || creation == Tx::PrehistoricCSN || Tx::MaxReservedCSN < creation);
chassert(removal == Tx::UnknownCSN || removal == Tx::PrehistoricCSN || Tx::MaxReservedCSN < removal);
/// Special snapshot for introspection purposes
if (unlikely(snapshot_version == Tx::EverythingVisibleCSN))
@ -204,8 +204,8 @@ bool VersionMetadata::isVisible(CSN snapshot_version, TransactionID current_tid)
/// Data part has creation_tid/removal_tid, but does not have creation_csn/removal_csn.
/// It means that some transaction is creating/removing the part right now or has done it recently
/// and we don't know if it was already committed or not.
assert(!had_creation_csn || (had_removal_tid && !had_removal_csn));
assert(current_tid.isEmpty() || (creation_tid != current_tid && removal_lock != current_tid.getHash()));
chassert(!had_creation_csn || (had_removal_tid && !had_removal_csn));
chassert(current_tid.isEmpty() || (creation_tid != current_tid && removal_lock != current_tid.getHash()));
/// Before doing CSN lookup, let's check some extra conditions.
/// If snapshot_version <= some_tid.start_csn, then changes of the transaction with some_tid
@ -347,8 +347,8 @@ void VersionMetadata::write(WriteBuffer & buf) const
if (removal_tid_lock)
{
assert(!removal_tid.isEmpty());
assert(removal_tid.getHash() == removal_tid_lock);
chassert(!removal_tid.isEmpty());
chassert(removal_tid.getHash() == removal_tid_lock);
writeRemovalTID(buf);
writeCSN(buf, REMOVAL, /* internal */ true);
}
@ -384,21 +384,23 @@ void VersionMetadata::read(ReadBuffer & buf)
if (name == CREATION_CSN_STR)
{
assert(!creation_csn);
chassert(!creation_csn);
creation_csn = read_csn();
}
else if (name == REMOVAL_TID_STR)
{
/// NOTE Metadata file may actually contain multiple creation TIDs, we need the last one.
removal_tid = TransactionID::read(buf);
if (!removal_tid.isEmpty())
if (removal_tid.isEmpty())
removal_tid_lock = 0;
else
removal_tid_lock = removal_tid.getHash();
}
else if (name == REMOVAL_CSN_STR)
{
if (removal_tid.isEmpty())
throw Exception(ErrorCodes::CANNOT_PARSE_TEXT, "Found removal_csn in metadata file, but removal_tid is {}", removal_tid);
assert(!removal_csn);
chassert(!removal_csn);
removal_csn = read_csn();
}
else

View File

@ -444,9 +444,10 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
if (auto txn = context->getCurrentTransaction())
{
assert(txn->getState() != MergeTreeTransaction::COMMITTED);
chassert(txn->getState() != MergeTreeTransaction::COMMITTING);
chassert(txn->getState() != MergeTreeTransaction::COMMITTED);
if (txn->getState() == MergeTreeTransaction::ROLLED_BACK && !ast->as<ASTTransactionControl>() && !ast->as<ASTExplainQuery>())
throw Exception(ErrorCodes::INVALID_TRANSACTION, "Cannot execute query: transaction is rolled back");
throw Exception(ErrorCodes::INVALID_TRANSACTION, "Cannot execute query because current transaction failed. Expecting ROLLBACK statement.");
}
/// Interpret SETTINGS clauses as early as possible (before invoking the corresponding interpreter),

View File

@ -187,7 +187,7 @@ void registerInputFormatArrow(FormatFactory & factory)
{
return std::make_shared<ArrowBlockInputFormat>(buf, sample, false, format_settings);
});
factory.markFormatAsColumnOriented("Arrow");
factory.markFormatSupportsSubsetOfColumns("Arrow");
factory.registerInputFormat(
"ArrowStream",
[](ReadBuffer & buf,

View File

@ -36,7 +36,6 @@
#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/case_conv.hpp>
/// UINT16 and UINT32 are processed separately, see comments in readColumnFromArrowColumn.
#define FOR_ARROW_NUMERIC_TYPES(M) \
M(arrow::Type::UINT8, DB::UInt8) \

View File

@ -11,7 +11,6 @@
#include <Columns/ColumnLowCardinality.h>
#include <Columns/ColumnMap.h>
#include <Core/callOnTypeIndex.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypeArray.h>
@ -215,14 +214,16 @@ namespace DB
std::unordered_map<String, std::shared_ptr<arrow::Array>> & dictionary_values)
{
const auto * column_tuple = assert_cast<const ColumnTuple *>(column.get());
const auto & nested_types = assert_cast<const DataTypeTuple *>(column_type.get())->getElements();
const auto * type_tuple = assert_cast<const DataTypeTuple *>(column_type.get());
const auto & nested_types = type_tuple->getElements();
const auto & nested_names = type_tuple->getElementNames();
arrow::StructBuilder & builder = assert_cast<arrow::StructBuilder &>(*array_builder);
for (size_t i = 0; i != column_tuple->tupleSize(); ++i)
{
ColumnPtr nested_column = column_tuple->getColumnPtr(i);
fillArrowArray(column_name + "." + std::to_string(i), nested_column, nested_types[i], null_bytemap, builder.field_builder(i), format_name, start, end, output_string_as_string, dictionary_values);
fillArrowArray(column_name + "." + nested_names[i], nested_column, nested_types[i], null_bytemap, builder.field_builder(i), format_name, start, end, output_string_as_string, dictionary_values);
}
for (size_t i = start; i != end; ++i)
@ -661,14 +662,15 @@ namespace DB
if (isTuple(column_type))
{
const auto & nested_types = assert_cast<const DataTypeTuple *>(column_type.get())->getElements();
const auto & tuple_type = assert_cast<const DataTypeTuple *>(column_type.get());
const auto & nested_types = tuple_type->getElements();
const auto & nested_names = tuple_type->getElementNames();
const auto * tuple_column = assert_cast<const ColumnTuple *>(column.get());
std::vector<std::shared_ptr<arrow::Field>> nested_fields;
for (size_t i = 0; i != nested_types.size(); ++i)
{
String name = column_name + "." + std::to_string(i);
auto nested_arrow_type = getArrowType(nested_types[i], tuple_column->getColumnPtr(i), name, format_name, output_string_as_string, out_is_column_nullable);
nested_fields.push_back(std::make_shared<arrow::Field>(name, nested_arrow_type, *out_is_column_nullable));
auto nested_arrow_type = getArrowType(nested_types[i], tuple_column->getColumnPtr(i), nested_names[i], format_name, output_string_as_string, out_is_column_nullable);
nested_fields.push_back(std::make_shared<arrow::Field>(nested_names[i], nested_arrow_type, *out_is_column_nullable));
}
return arrow::struct_(nested_fields);
}

View File

@ -112,7 +112,9 @@ String CSVFormatReader::readCSVFieldIntoString()
void CSVFormatReader::skipField()
{
readCSVFieldIntoString<true>();
skipWhitespacesAndTabs(*in);
NullOutput out;
readCSVStringInto(out, *in, format_settings.csv);
}
void CSVFormatReader::skipRowEndDelimiter()
@ -374,6 +376,7 @@ void registerFileSegmentationEngineCSV(FormatFactory & factory)
};
registerWithNamesAndTypes("CSV", register_func);
markFormatWithNamesAndTypesSupportsSamplingColumns("CSV", factory);
}
void registerCSVSchemaReader(FormatFactory & factory)

View File

@ -310,6 +310,7 @@ void registerInputFormatCapnProto(FormatFactory & factory)
return std::make_shared<CapnProtoRowInputFormat>(buf, sample, std::move(params),
FormatSchemaInfo(settings, "CapnProto", true), settings);
});
factory.markFormatSupportsSubsetOfColumns("CapnProto");
factory.registerFileExtension("capnp", "CapnProto");
}

View File

@ -333,6 +333,7 @@ void registerInputFormatCustomSeparated(FormatFactory & factory)
});
};
registerWithNamesAndTypes(ignore_spaces ? "CustomSeparatedIgnoreSpaces" : "CustomSeparated", register_func);
markFormatWithNamesAndTypesSupportsSamplingColumns(ignore_spaces ? "CustomSeparatedIgnoreSpaces" : "CustomSeparated", factory);
}
}

View File

@ -54,6 +54,7 @@ void registerInputFormatJSONColumns(FormatFactory & factory)
return std::make_shared<JSONColumnsBlockInputFormatBase>(buf, sample, settings, std::make_unique<JSONColumnsReader>(buf));
}
);
factory.markFormatSupportsSubsetOfColumns("JSONColumns");
}
void registerJSONColumnsSchemaReader(FormatFactory & factory)

View File

@ -229,6 +229,7 @@ void registerInputFormatJSONCompactEachRow(FormatFactory & factory)
};
registerWithNamesAndTypes(yield_strings ? "JSONCompactStringsEachRow" : "JSONCompactEachRow", register_func);
markFormatWithNamesAndTypesSupportsSamplingColumns(yield_strings ? "JSONCompactStringsEachRow" : "JSONCompactEachRow", factory);
}
}

View File

@ -393,6 +393,11 @@ void registerInputFormatJSONEachRow(FormatFactory & factory)
{
return std::make_shared<JSONEachRowRowInputFormat>(buf, sample, std::move(params), settings, true);
});
factory.markFormatSupportsSubsetOfColumns("JSONEachRow");
factory.markFormatSupportsSubsetOfColumns("JSONLines");
factory.markFormatSupportsSubsetOfColumns("NDJSON");
factory.markFormatSupportsSubsetOfColumns("JSONStringsEachRow");
}
void registerFileSegmentationEngineJSONEachRow(FormatFactory & factory)

View File

@ -397,8 +397,8 @@ bool MySQLDumpRowInputFormat::readField(IColumn & column, size_t column_idx)
void MySQLDumpRowInputFormat::skipField()
{
String tmp;
readQuotedField(tmp, *in);
NullOutput out;
readQuotedFieldInto(out, *in);
}
MySQLDumpSchemaReader::MySQLDumpSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_)

View File

@ -15,9 +15,9 @@ namespace DB
class NativeInputFormat final : public IInputFormat
{
public:
NativeInputFormat(ReadBuffer & buf, const Block & header_)
NativeInputFormat(ReadBuffer & buf, const Block & header_, const FormatSettings & settings)
: IInputFormat(header_, buf)
, reader(std::make_unique<NativeReader>(buf, header_, 0))
, reader(std::make_unique<NativeReader>(buf, header_, 0, settings.skip_unknown_fields))
, header(header_) {}
String getName() const override { return "Native"; }
@ -112,10 +112,11 @@ void registerInputFormatNative(FormatFactory & factory)
ReadBuffer & buf,
const Block & sample,
const RowInputFormatParams &,
const FormatSettings &)
const FormatSettings & settings)
{
return std::make_shared<NativeInputFormat>(buf, sample);
return std::make_shared<NativeInputFormat>(buf, sample, settings);
});
factory.markFormatSupportsSubsetOfColumns("Native");
}
void registerOutputFormatNative(FormatFactory & factory)

View File

@ -198,7 +198,7 @@ void registerInputFormatORC(FormatFactory & factory)
{
return std::make_shared<ORCBlockInputFormat>(buf, sample, settings);
});
factory.markFormatAsColumnOriented("ORC");
factory.markFormatSupportsSubsetOfColumns("ORC");
}
void registerORCSchemaReader(FormatFactory & factory)

View File

@ -55,7 +55,7 @@ ORCBlockOutputFormat::ORCBlockOutputFormat(WriteBuffer & out_, const Block & hea
data_types.push_back(recursiveRemoveLowCardinality(type));
}
ORC_UNIQUE_PTR<orc::Type> ORCBlockOutputFormat::getORCType(const DataTypePtr & type, const std::string & column_name)
ORC_UNIQUE_PTR<orc::Type> ORCBlockOutputFormat::getORCType(const DataTypePtr & type)
{
switch (type->getTypeId())
{
@ -106,12 +106,12 @@ ORC_UNIQUE_PTR<orc::Type> ORCBlockOutputFormat::getORCType(const DataTypePtr & t
}
case TypeIndex::Nullable:
{
return getORCType(removeNullable(type), column_name);
return getORCType(removeNullable(type));
}
case TypeIndex::Array:
{
const auto * array_type = assert_cast<const DataTypeArray *>(type.get());
return orc::createListType(getORCType(array_type->getNestedType(), column_name));
return orc::createListType(getORCType(array_type->getNestedType()));
}
case TypeIndex::Decimal32:
{
@ -131,21 +131,19 @@ ORC_UNIQUE_PTR<orc::Type> ORCBlockOutputFormat::getORCType(const DataTypePtr & t
case TypeIndex::Tuple:
{
const auto * tuple_type = assert_cast<const DataTypeTuple *>(type.get());
const auto & nested_names = tuple_type->getElementNames();
const auto & nested_types = tuple_type->getElements();
auto struct_type = orc::createStructType();
for (size_t i = 0; i < nested_types.size(); ++i)
{
String name = column_name + "." + std::to_string(i);
struct_type->addStructField(name, getORCType(nested_types[i], name));
}
struct_type->addStructField(nested_names[i], getORCType(nested_types[i]));
return struct_type;
}
case TypeIndex::Map:
{
const auto * map_type = assert_cast<const DataTypeMap *>(type.get());
return orc::createMapType(
getORCType(map_type->getKeyType(), column_name),
getORCType(map_type->getValueType(), column_name)
getORCType(map_type->getKeyType()),
getORCType(map_type->getValueType())
);
}
default:
@ -514,7 +512,7 @@ void ORCBlockOutputFormat::prepareWriter()
options.setCompression(orc::CompressionKind::CompressionKind_NONE);
size_t columns_count = header.columns();
for (size_t i = 0; i != columns_count; ++i)
schema->addStructField(header.safeGetByPosition(i).name, getORCType(recursiveRemoveLowCardinality(data_types[i]), header.safeGetByPosition(i).name));
schema->addStructField(header.safeGetByPosition(i).name, getORCType(recursiveRemoveLowCardinality(data_types[i])));
writer = orc::createWriter(*schema, &output_stream, options);
}

View File

@ -42,7 +42,7 @@ private:
void consume(Chunk chunk) override;
void finalizeImpl() override;
ORC_UNIQUE_PTR<orc::Type> getORCType(const DataTypePtr & type, const std::string & column_name);
ORC_UNIQUE_PTR<orc::Type> getORCType(const DataTypePtr & type);
/// ConvertFunc is needed for type UInt8, because firstly UInt8 (char8_t) must be
/// converted to unsigned char (bugprone-signed-char-misuse in clang).

View File

@ -192,7 +192,7 @@ void registerInputFormatParquet(FormatFactory & factory)
{
return std::make_shared<ParquetBlockInputFormat>(buf, sample, settings);
});
factory.markFormatAsColumnOriented("Parquet");
factory.markFormatSupportsSubsetOfColumns("Parquet");
}
void registerParquetSchemaReader(FormatFactory & factory)

View File

@ -79,7 +79,7 @@ void registerInputFormatProtobufList(FormatFactory & factory)
return std::make_shared<ProtobufListInputFormat>(buf, sample, std::move(params),
FormatSchemaInfo(settings, "Protobuf", true), settings.protobuf.input_flatten_google_wrappers);
});
factory.markFormatAsColumnOriented("ProtobufList");
factory.markFormatSupportsSubsetOfColumns("ProtobufList");
}
void registerProtobufListSchemaReader(FormatFactory & factory)

View File

@ -69,6 +69,7 @@ void registerInputFormatProtobuf(FormatFactory & factory)
with_length_delimiter,
settings.protobuf.input_flatten_google_wrappers);
});
factory.markFormatSupportsSubsetOfColumns(with_length_delimiter ? "Protobuf" : "ProtobufSingle");
}
}

View File

@ -277,6 +277,8 @@ void registerInputFormatTSKV(FormatFactory & factory)
{
return std::make_shared<TSKVRowInputFormat>(buf, sample, std::move(params), settings);
});
factory.markFormatSupportsSubsetOfColumns("TSKV");
}
void registerTSKVSchemaReader(FormatFactory & factory)
{

View File

@ -80,7 +80,11 @@ String TabSeparatedFormatReader::readFieldIntoString()
void TabSeparatedFormatReader::skipField()
{
readFieldIntoString();
NullOutput out;
if (is_raw)
readStringInto(out, *in);
else
readEscapedStringInto(out, *in);
}
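The change replaces "read the field into a temporary String and throw it away" with reading into a null sink, so skipping a column no longer allocates. A hedged standalone sketch of the null-sink idea, using a toy reader instead of the real ReadBuffer/NullOutput machinery:

```
#include <cstddef>
#include <string>

// Toy sink that satisfies an "append characters" interface but stores nothing.
struct NullSink
{
    void append(const char *, std::size_t) {}   // discard
};

// Toy field reader templated on the sink: the same parsing code can either materialize
// a value (std::string also has append(const char *, size_t)) or skip it allocation-free.
template <typename Sink>
void readToyField(const char *& pos, const char * end, Sink & sink)
{
    const char * start = pos;
    while (pos != end && *pos != '\t' && *pos != '\n')
        ++pos;
    sink.append(start, static_cast<std::size_t>(pos - start));
}
```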
void TabSeparatedFormatReader::skipHeaderRow()
@ -347,6 +351,8 @@ void registerFileSegmentationEngineTabSeparated(FormatFactory & factory)
registerWithNamesAndTypes(is_raw ? "TSVRaw" : "TSV", register_func);
registerWithNamesAndTypes(is_raw ? "TabSeparatedRaw" : "TabSeparated", register_func);
markFormatWithNamesAndTypesSupportsSamplingColumns(is_raw ? "TSVRaw" : "TSV", factory);
markFormatWithNamesAndTypesSupportsSamplingColumns(is_raw ? "TabSeparatedRaw" : "TabSeparated", factory);
}
// We can use the same segmentation engine for TSKV.

View File

@ -77,8 +77,7 @@ static size_t tryAddNewFilterStep(
/// New filter column is the first one.
auto split_filter_column_name = (*split_filter->getIndex().begin())->result_name;
node.step = std::make_unique<FilterStep>(
node.children.at(0)->step->getOutputStream(),
std::move(split_filter), std::move(split_filter_column_name), true);
node.children.at(0)->step->getOutputStream(), std::move(split_filter), std::move(split_filter_column_name), true);
return 3;
}
@ -194,13 +193,13 @@ size_t tryPushDownFilter(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes
/// Push down is for left table only. We need to update JoinStep for push down into right.
/// Only inner and left join are supported. Other types may generate default values for left table keys.
/// So, if we push down a condition like `key != 0`, not all rows may be filtered.
if (table_join.oneDisjunct() && (table_join.kind() == ASTTableJoin::Kind::Inner || table_join.kind() == ASTTableJoin::Kind::Left))
if (table_join.kind() == ASTTableJoin::Kind::Inner || table_join.kind() == ASTTableJoin::Kind::Left)
{
const auto & left_header = join->getInputStreams().front().header;
const auto & res_header = join->getOutputStream().header;
Names allowed_keys;
const auto & key_names_left = table_join.getOnlyClause().key_names_left;
for (const auto & name : key_names_left)
const auto & source_columns = left_header.getNames();
for (const auto & name : source_columns)
{
/// Skip key if it is renamed.
/// I don't know if it is possible. Just in case.

View File

@ -34,7 +34,7 @@ void optimizeTree(const QueryPlanOptimizationSettings & settings, QueryPlan::Nod
};
std::stack<Frame> stack;
stack.push(Frame{.node = &root});
stack.push({.node = &root});
size_t max_optimizations_to_apply = settings.max_optimizations_to_apply;
size_t total_applied_optimizations = 0;
@ -50,10 +50,10 @@ void optimizeTree(const QueryPlanOptimizationSettings & settings, QueryPlan::Nod
/// Traverse all children first.
if (frame.next_child < frame.node->children.size())
{
stack.push(Frame
stack.push(
{
.node = frame.node->children[frame.next_child],
.depth_limit = frame.depth_limit ? (frame.depth_limit - 1) : 0,
.node = frame.node->children[frame.next_child],
.depth_limit = frame.depth_limit ? (frame.depth_limit - 1) : 0,
});
++frame.next_child;

View File

@ -476,9 +476,9 @@ private:
};
bool StorageHDFS::isColumnOriented() const
bool StorageHDFS::supportsSubsetOfColumns() const
{
return format_name != "Distributed" && FormatFactory::instance().checkIfFormatIsColumnOriented(format_name);
return format_name != "Distributed" && FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(format_name);
}
Pipe StorageHDFS::read(
@ -527,7 +527,7 @@ Pipe StorageHDFS::read(
ColumnsDescription columns_description;
Block block_for_format;
if (isColumnOriented())
if (supportsSubsetOfColumns())
{
auto fetch_columns = column_names;
const auto & virtuals = getVirtuals();

View File

@ -57,7 +57,7 @@ public:
/// It is useful because column-oriented formats could effectively skip unknown columns
/// So we can create a header of only required columns in read method and ask
/// format to read only them. Note: this hack cannot be done with ordinary formats like TSV.
bool isColumnOriented() const override;
bool supportsSubsetOfColumns() const override;
static ColumnsDescription getTableStructureFromData(
const String & format,

View File

@ -668,7 +668,7 @@ HiveFilePtr StorageHive::getHiveFileIfNeeded(
return hive_file;
}
bool StorageHive::isColumnOriented() const
bool StorageHive::supportsSubsetOfColumns() const
{
return format_name == "Parquet" || format_name == "ORC";
}
@ -822,7 +822,7 @@ std::optional<UInt64>
StorageHive::totalRowsImpl(const Settings & settings, const SelectQueryInfo & query_info, ContextPtr context_, PruneLevel prune_level) const
{
/// Row-based format like Text doesn't support totalRowsByPartitionPredicate
if (!isColumnOriented())
if (!supportsSubsetOfColumns())
return {};
auto hive_metastore_client = HiveMetastoreClientFactory::instance().getOrCreate(hive_metastore_url);

View File

@ -63,7 +63,7 @@ public:
NamesAndTypesList getVirtuals() const override;
bool isColumnOriented() const override;
bool supportsSubsetOfColumns() const override;
std::optional<UInt64> totalRows(const Settings & settings) const override;
std::optional<UInt64> totalRowsByPartitionPredicate(const SelectQueryInfo & query_info, ContextPtr context_) const override;

View File

@ -585,7 +585,7 @@ public:
/// Returns true if all disks of storage are read-only.
virtual bool isStaticStorage() const;
virtual bool isColumnOriented() const { return false; }
virtual bool supportsSubsetOfColumns() const { return false; }
/// If it is possible to quickly determine exact number of rows in the table at this moment of time, then return it.
/// Used for:

View File

@ -1282,12 +1282,12 @@ void IMergeTreeDataPart::storeVersionMetadata() const
void IMergeTreeDataPart::appendCSNToVersionMetadata(VersionMetadata::WhichCSN which_csn) const
{
assert(!version.creation_tid.isEmpty());
assert(!(which_csn == VersionMetadata::WhichCSN::CREATION && version.creation_tid.isPrehistoric()));
assert(!(which_csn == VersionMetadata::WhichCSN::CREATION && version.creation_csn == 0));
assert(!(which_csn == VersionMetadata::WhichCSN::REMOVAL && (version.removal_tid.isPrehistoric() || version.removal_tid.isEmpty())));
assert(!(which_csn == VersionMetadata::WhichCSN::REMOVAL && version.removal_csn == 0));
assert(isStoredOnDisk());
chassert(!version.creation_tid.isEmpty());
chassert(!(which_csn == VersionMetadata::WhichCSN::CREATION && version.creation_tid.isPrehistoric()));
chassert(!(which_csn == VersionMetadata::WhichCSN::CREATION && version.creation_csn == 0));
chassert(!(which_csn == VersionMetadata::WhichCSN::REMOVAL && (version.removal_tid.isPrehistoric() || version.removal_tid.isEmpty())));
chassert(!(which_csn == VersionMetadata::WhichCSN::REMOVAL && version.removal_csn == 0));
chassert(isStoredOnDisk());
/// Small enough appends to file are usually atomic,
/// so we append new metadata instead of rewriting file to reduce number of fsyncs.
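The comment leans on the practical observation that a small record written with a single write() to an O_APPEND descriptor lands in the file as one contiguous chunk, so appending one more metadata record is cheaper and safer than rewriting the whole file. A simplified sketch of that append-one-record idea (error handling reduced to a boolean; not the actual IMergeTreeDataPart code):

```
#include <fcntl.h>
#include <string>
#include <unistd.h>

// Append one small record with a single write() on an O_APPEND descriptor.
bool appendRecord(const std::string & path, const std::string & record)
{
    int fd = ::open(path.c_str(), O_WRONLY | O_APPEND | O_CREAT, 0644);
    if (fd < 0)
        return false;
    ssize_t written = ::write(fd, record.data(), record.size());  // one syscall per record
    bool ok = written == static_cast<ssize_t>(record.size());
    ok = ::fsync(fd) == 0 && ok;
    ::close(fd);
    return ok;
}
```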
@ -1303,10 +1303,10 @@ void IMergeTreeDataPart::appendCSNToVersionMetadata(VersionMetadata::WhichCSN wh
void IMergeTreeDataPart::appendRemovalTIDToVersionMetadata(bool clear) const
{
assert(!version.creation_tid.isEmpty());
assert(version.removal_csn == 0);
assert(!version.removal_tid.isEmpty());
assert(isStoredOnDisk());
chassert(!version.creation_tid.isEmpty());
chassert(version.removal_csn == 0);
chassert(!version.removal_tid.isEmpty());
chassert(isStoredOnDisk());
if (version.creation_tid.isPrehistoric() && !clear)
{
@ -1437,7 +1437,9 @@ bool IMergeTreeDataPart::assertHasValidVersionMetadata() const
bool valid_removal_tid = version.removal_tid == file.removal_tid || version.removal_tid == Tx::PrehistoricTID;
bool valid_creation_csn = version.creation_csn == file.creation_csn || version.creation_csn == Tx::RolledBackCSN;
bool valid_removal_csn = version.removal_csn == file.removal_csn || version.removal_csn == Tx::PrehistoricCSN;
if (!valid_creation_tid || !valid_removal_tid || !valid_creation_csn || !valid_removal_csn)
bool valid_removal_tid_lock = (version.removal_tid.isEmpty() && version.removal_tid_lock == 0)
|| (version.removal_tid_lock == version.removal_tid.getHash());
if (!valid_creation_tid || !valid_removal_tid || !valid_creation_csn || !valid_removal_csn || !valid_removal_tid_lock)
throw Exception(ErrorCodes::CORRUPTED_DATA, "Invalid version metadata file");
return true;
}
@ -1445,7 +1447,8 @@ bool IMergeTreeDataPart::assertHasValidVersionMetadata() const
{
WriteBufferFromOwnString expected;
version.write(expected);
tryLogCurrentException(storage.log, fmt::format("File {} contains:\n{}\nexpected:\n{}", version_file_name, content, expected.str()));
tryLogCurrentException(storage.log, fmt::format("File {} contains:\n{}\nexpected:\n{}\nlock: {}",
version_file_name, content, expected.str(), version.removal_tid_lock));
return false;
}
}

View File

@ -1364,7 +1364,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
/// Check if CSNs were written after committing the transaction, update and write if needed.
bool version_updated = false;
assert(!version.creation_tid.isEmpty());
chassert(!version.creation_tid.isEmpty());
if (!part->version.creation_csn)
{
auto min = TransactionLog::getCSN(version.creation_tid);

View File

@ -316,9 +316,9 @@ ColumnsDescription StorageFile::getTableStructureFromFile(
return readSchemaFromFormat(format, format_settings, read_buffer_iterator, paths.size() > 1, context);
}
bool StorageFile::isColumnOriented() const
bool StorageFile::supportsSubsetOfColumns() const
{
return format_name != "Distributed" && FormatFactory::instance().checkIfFormatIsColumnOriented(format_name);
return format_name != "Distributed" && FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(format_name);
}
StorageFile::StorageFile(int table_fd_, CommonArguments args)
@ -465,7 +465,7 @@ public:
const ColumnsDescription & columns_description,
const FilesInfoPtr & files_info)
{
if (storage->isColumnOriented())
if (storage->supportsSubsetOfColumns())
return storage_snapshot->getSampleBlockForColumns(columns_description.getNamesOfPhysical());
else
return getHeader(storage_snapshot->metadata, files_info->need_path_column, files_info->need_file_column);
@ -530,7 +530,7 @@ public:
auto get_block_for_format = [&]() -> Block
{
if (storage->isColumnOriented())
if (storage->supportsSubsetOfColumns())
return storage_snapshot->getSampleBlockForColumns(columns_description.getNamesOfPhysical());
return storage_snapshot->metadata->getSampleBlock();
};
@ -690,7 +690,7 @@ Pipe StorageFile::read(
{
const auto get_columns_for_format = [&]() -> ColumnsDescription
{
if (isColumnOriented())
if (supportsSubsetOfColumns())
return storage_snapshot->getDescriptionForColumns(column_names);
else
return storage_snapshot->metadata->getColumns();

View File

@ -69,11 +69,11 @@ public:
static Strings getPathsList(const String & table_path, const String & user_files_path, ContextPtr context, size_t & total_bytes_to_read);
/// Check if the format is column-oriented.
/// It is useful because column-oriented formats could effectively skip unknown columns
/// Check if the format supports reading only some subset of columns.
/// It is useful because such formats could effectively skip unknown columns
/// So we can create a header of only required columns in read method and ask
/// format to read only them. Note: this hack cannot be done with ordinary formats like TSV.
bool isColumnOriented() const override;
bool supportsSubsetOfColumns() const override;
bool supportsPartitionBy() const override { return true; }

View File

@ -676,9 +676,9 @@ std::shared_ptr<StorageS3Source::IteratorWrapper> StorageS3::createFileIterator(
}
}
bool StorageS3::isColumnOriented() const
bool StorageS3::supportsSubsetOfColumns() const
{
return FormatFactory::instance().checkIfFormatIsColumnOriented(format_name);
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(format_name);
}
Pipe StorageS3::read(
@ -707,7 +707,7 @@ Pipe StorageS3::read(
ColumnsDescription columns_description;
Block block_for_format;
if (isColumnOriented())
if (supportsSubsetOfColumns())
{
auto fetch_columns = column_names;
const auto & virtuals = getVirtuals();

View File

@ -234,7 +234,7 @@ private:
ContextPtr ctx,
std::vector<String> * read_keys_in_distributed_processing = nullptr);
bool isColumnOriented() const override;
bool supportsSubsetOfColumns() const override;
};
}

View File

@ -92,10 +92,32 @@ NameAndTypePair StorageSnapshot::getColumn(const GetColumnsOptions & options, co
Block StorageSnapshot::getSampleBlockForColumns(const Names & column_names) const
{
Block res;
auto columns_description = getDescriptionForColumns(column_names);
for (const auto & column : columns_description)
res.insert({column.type->createColumn(), column.type, column.name});
const auto & columns = getMetadataForQuery()->getColumns();
for (const auto & name : column_names)
{
auto column = columns.tryGetColumnOrSubcolumn(GetColumnsOptions::All, name);
auto object_column = object_columns.tryGetColumnOrSubcolumn(GetColumnsOptions::All, name);
if (column && !object_column)
{
res.insert({column->type->createColumn(), column->type, column->name});
}
else if (object_column)
{
res.insert({object_column->type->createColumn(), object_column->type, object_column->name});
}
else if (auto it = virtual_columns.find(name); it != virtual_columns.end())
{
/// Virtual columns must be appended after ordinary ones, because the user can
/// override them.
const auto & type = it->second;
res.insert({type->createColumn(), type, name});
}
else
{
throw Exception(ErrorCodes::NOT_FOUND_COLUMN_IN_BLOCK,
"Column {} not found in table {}", backQuote(name), storage.getStorageID().getNameForLogs());
}
}
return res;
}

View File

@ -582,9 +582,9 @@ ColumnsDescription IStorageURLBase::getTableStructureFromData(
return readSchemaFromFormat(format, format_settings, read_buffer_iterator, urls_to_check.size() > 1, context);
}
bool IStorageURLBase::isColumnOriented() const
bool IStorageURLBase::supportsSubsetOfColumns() const
{
return FormatFactory::instance().checkIfFormatIsColumnOriented(format_name);
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(format_name);
}
Pipe IStorageURLBase::read(
@ -600,7 +600,7 @@ Pipe IStorageURLBase::read(
ColumnsDescription columns_description;
Block block_for_format;
if (isColumnOriented())
if (supportsSubsetOfColumns())
{
columns_description = storage_snapshot->getDescriptionForColumns(column_names);
block_for_format = storage_snapshot->getSampleBlockForColumns(columns_description.getNamesOfPhysical());
@ -687,7 +687,7 @@ Pipe StorageURLWithFailover::read(
{
ColumnsDescription columns_description;
Block block_for_format;
if (isColumnOriented())
if (supportsSubsetOfColumns())
{
columns_description = storage_snapshot->getDescriptionForColumns(column_names);
block_for_format = storage_snapshot->getSampleBlockForColumns(columns_description.getNamesOfPhysical());

View File

@ -93,7 +93,7 @@ protected:
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size) const;
bool isColumnOriented() const override;
bool supportsSubsetOfColumns() const override;
private:
virtual Block getHeaderBlock(const Names & column_names, const StorageSnapshotPtr & storage_snapshot) const = 0;

View File

@ -140,7 +140,7 @@ SinkToStoragePtr StorageXDBC::write(const ASTPtr & /* query */, const StorageMet
chooseCompressionMethod(uri, compression_method));
}
bool StorageXDBC::isColumnOriented() const
bool StorageXDBC::supportsSubsetOfColumns() const
{
return true;
}

View File

@ -67,7 +67,7 @@ private:
Block getHeaderBlock(const Names & column_names, const StorageSnapshotPtr & storage_snapshot) const override;
bool isColumnOriented() const override;
bool supportsSubsetOfColumns() const override;
};
}

View File

@ -85,6 +85,7 @@ StorageSystemParts::StorageSystemParts(const StorageID & table_id_)
{"visible", std::make_shared<DataTypeUInt8>()},
{"creation_tid", getTransactionIDDataType()},
{"removal_tid_lock", std::make_shared<DataTypeUInt64>()},
{"removal_tid", getTransactionIDDataType()},
{"creation_csn", std::make_shared<DataTypeUInt64>()},
{"removal_csn", std::make_shared<DataTypeUInt64>()},
@ -295,6 +296,8 @@ void StorageSystemParts::processNextStorage(
if (columns_mask[src_index++])
columns[res_index++]->insert(get_tid_as_field(part->version.creation_tid));
if (columns_mask[src_index++])
columns[res_index++]->insert(part->version.removal_tid_lock.load(std::memory_order_relaxed));
if (columns_mask[src_index++])
columns[res_index++]->insert(get_tid_as_field(part->version.getRemovalTID()));
if (columns_mask[src_index++])

View File

@ -15,6 +15,7 @@ static DataTypePtr getStateEnumType()
DataTypeEnum8::Values
{
{"RUNNING", static_cast<Int8>(MergeTreeTransaction::State::RUNNING)},
{"COMMITTING", static_cast<Int8>(MergeTreeTransaction::State::COMMITTING)},
{"COMMITTED", static_cast<Int8>(MergeTreeTransaction::State::COMMITTED)},
{"ROLLED_BACK", static_cast<Int8>(MergeTreeTransaction::State::ROLLED_BACK)},
});

View File

@ -404,7 +404,11 @@ def main():
elif args.image_path:
pr_info.changed_files = set(i for i in args.image_path)
else:
pr_info.fetch_changed_files()
try:
pr_info.fetch_changed_files()
except TypeError:
# If the event does not contain diff, nothing will be built
pass
changed_images = get_changed_docker_images(pr_info, images_dict)
if changed_images:

View File

@ -1,4 +1,5 @@
#!/usr/bin/env python3
import argparse
import logging
import subprocess
import os
@ -15,17 +16,31 @@ from upload_result_helper import upload_results
from docker_pull_helper import get_image_with_version
from commit_status_helper import get_commit
from rerun_helper import RerunHelper
from tee_popen import TeePopen
NAME = "Docs Release (actions)"
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
description="ClickHouse building script using prebuilt Docker image",
)
parser.add_argument(
"--as-root", action="store_true", help="if the container should run as root"
)
return parser.parse_args()
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
args = parse_args()
temp_path = TEMP_PATH
repo_path = REPO_COPY
gh = Github(get_best_robot_token())
pr_info = PRInfo(need_changed_files=True)
pr_info = PRInfo()
rerun_helper = RerunHelper(gh, pr_info, NAME)
if rerun_helper.is_already_finished_by_status():
logging.info("Check is already finished according to github status, exiting")
@ -40,20 +55,23 @@ if __name__ == "__main__":
if not os.path.exists(test_output):
os.makedirs(test_output)
token = CLOUDFLARE_TOKEN
cmd = (
"docker run --cap-add=SYS_PTRACE --volume=$SSH_AUTH_SOCK:/ssh-agent "
f"-e SSH_AUTH_SOCK=/ssh-agent -e CLOUDFLARE_TOKEN={token} "
f"-e EXTRA_BUILD_ARGS='--verbose' --volume={repo_path}:/repo_path"
f" --volume={test_output}:/output_path {docker_image}"
)
if args.as_root:
user = "0:0"
else:
user = f"{os.geteuid()}:{os.getegid()}"
run_log_path = os.path.join(test_output, "runlog.log")
with open(run_log_path, "w", encoding="utf-8") as log, SSHKey(
"ROBOT_CLICKHOUSE_SSH_KEY"
):
with subprocess.Popen(cmd, shell=True, stderr=log, stdout=log) as process:
with SSHKey("ROBOT_CLICKHOUSE_SSH_KEY"):
cmd = (
f"docker run --cap-add=SYS_PTRACE --user={user} "
f"--volume='{os.getenv('SSH_AUTH_SOCK', '')}:/ssh-agent' "
f"--volume={repo_path}:/repo_path --volume={test_output}:/output_path "
f"-e SSH_AUTH_SOCK=/ssh-agent -e EXTRA_BUILD_ARGS='--verbose' "
f"-e CLOUDFLARE_TOKEN={CLOUDFLARE_TOKEN} {docker_image}"
)
logging.info("Running command: %s", cmd)
with TeePopen(cmd, run_log_path) as process:
retcode = process.wait()
if retcode == 0:
logging.info("Run successfully")
@ -98,3 +116,6 @@ if __name__ == "__main__":
commit.create_status(
context=NAME, description=description, state=status, target_url=report_url
)
if status == "failure":
sys.exit(1)

View File

@ -186,6 +186,7 @@ class PRInfo:
else:
self.diff_url = pull_request["diff_url"]
else:
print("event.json does not match pull_request or push:")
print(json.dumps(github_event, sort_keys=True, indent=4))
self.sha = os.getenv("GITHUB_SHA")
self.number = 0
@ -204,8 +205,8 @@ class PRInfo:
self.fetch_changed_files()
def fetch_changed_files(self):
if not self.diff_url:
raise Exception("Diff URL cannot be find for event")
if not getattr(self, "diff_url", False):
raise TypeError("The event does not have diff URL")
response = get_with_retries(
self.diff_url,

View File

@ -10,4 +10,12 @@
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
</transactions_info_log>
<transaction_log>
<zookeeper_path>/test/clickhouse/txn</zookeeper_path>
<fault_probability_before_commit>0.0</fault_probability_before_commit>
<!-- Fault injection after commit should not affect tests, because default waiting mode is WAIT_UNKNOWN -->
<fault_probability_after_commit>0.01</fault_probability_after_commit>
</transaction_log>
</yandex>

View File

@ -0,0 +1 @@
#!/usr/bin/env python3

View File

@ -0,0 +1,33 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<snapshot_distance>75</snapshot_distance>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>9234</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>9234</port>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,24 @@
<clickhouse>
<keeper_server>
<force_recovery>1</force_recovery>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<snapshot_distance>75</snapshot_distance>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>9234</port>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,33 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>2</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<snapshot_distance>75</snapshot_distance>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>9234</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>9234</port>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,33 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>3</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<snapshot_distance>75</snapshot_distance>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>9234</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>9234</port>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,16 @@
<clickhouse>
<zookeeper>
<node index="1">
<host>node1</host>
<port>9181</port>
</node>
<node index="2">
<host>node2</host>
<port>9181</port>
</node>
<node index="3">
<host>node3</host>
<port>9181</port>
</node>
</zookeeper>
</clickhouse>

View File

@ -0,0 +1,157 @@
import os
import pytest
import socket
from helpers.cluster import ClickHouseCluster
import time
from kazoo.client import KazooClient
CLUSTER_SIZE = 3
cluster = ClickHouseCluster(__file__)
CONFIG_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "configs")
def get_nodes():
nodes = []
for i in range(CLUSTER_SIZE):
nodes.append(
cluster.add_instance(
f"node{i+1}",
main_configs=[
f"configs/enable_keeper{i+1}.xml",
f"configs/use_keeper.xml",
],
stay_alive=True,
)
)
return nodes
nodes = get_nodes()
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def get_fake_zk(nodename, timeout=30.0):
_fake_zk_instance = KazooClient(
hosts=cluster.get_instance_ip(nodename) + ":9181", timeout=timeout
)
_fake_zk_instance.start()
return _fake_zk_instance
def get_keeper_socket(node_name):
hosts = cluster.get_instance_ip(node_name)
client = socket.socket()
client.settimeout(10)
client.connect((hosts, 9181))
return client
def send_4lw_cmd(node_name, cmd="ruok"):
client = None
try:
client = get_keeper_socket(node_name)
client.send(cmd.encode())
data = client.recv(100_000)
data = data.decode()
return data
finally:
if client is not None:
client.close()
def wait_until_connected(node_name):
while send_4lw_cmd(node_name, "mntr") == NOT_SERVING_REQUESTS_ERROR_MSG:
time.sleep(0.1)
def wait_nodes(nodes):
for node in nodes:
wait_until_connected(node.name)
def wait_and_assert_data(zk, path, data):
while zk.exists(path) is None:
time.sleep(0.1)
assert zk.get(path)[0] == data.encode()
def close_zk(zk):
zk.stop()
zk.close()
NOT_SERVING_REQUESTS_ERROR_MSG = "This instance is not currently serving requests"
def test_cluster_recovery(started_cluster):
node_zks = []
try:
wait_nodes(nodes)
node_zks = [get_fake_zk(node.name) for node in nodes]
data_in_cluster = []
def add_data(zk, path, data):
zk.create(path, data.encode())
data_in_cluster.append((path, data))
def assert_all_data(zk):
for path, data in data_in_cluster:
wait_and_assert_data(zk, path, data)
for i, zk in enumerate(node_zks):
add_data(zk, f"/test_force_recovery_node{i+1}", f"somedata{i+1}")
for zk in node_zks:
assert_all_data(zk)
nodes[0].stop_clickhouse()
add_data(node_zks[1], "/test_force_recovery_extra", "somedataextra")
for node_zk in node_zks[2:CLUSTER_SIZE]:
wait_and_assert_data(node_zk, "/test_force_recovery_extra", "somedataextra")
nodes[0].start_clickhouse()
wait_until_connected(nodes[0].name)
node_zks[0] = get_fake_zk(nodes[0].name)
wait_and_assert_data(node_zks[0], "/test_force_recovery_extra", "somedataextra")
# stop all nodes
for node_zk in node_zks:
close_zk(node_zk)
node_zks = []
for node in nodes:
node.stop_clickhouse()
nodes[0].copy_file_to_container(
os.path.join(CONFIG_DIR, "enable_keeper1_solo.xml"),
"/etc/clickhouse-server/config.d/enable_keeper1.xml",
)
nodes[0].start_clickhouse()
wait_until_connected(nodes[0].name)
assert_all_data(get_fake_zk(nodes[0].name))
finally:
try:
for zk_conn in node_zks:
close_zk(zk_conn)
except:
pass

View File

@ -0,0 +1,32 @@
<test>
<settings>
<max_threads>1</max_threads>
</settings>
<substitutions>
<substitution>
<name>format</name>
<values>
<value>TabSeparatedWithNames</value>
<value>CustomSeparatedWithNames</value>
<value>CSVWithNames</value>
<value>JSONEachRow</value>
<value>JSONCompactEachRowWithNames</value>
<value>TSKV</value>
<value>Avro</value>
<value>ORC</value>
<value>Parquet</value>
<value>Arrow</value>
<value>Native</value>
</values>
</substitution>
</substitutions>
<create_query>CREATE TABLE IF NOT EXISTS table_{format} ENGINE = File({format}) AS test.hits</create_query>
<fill_query>INSERT INTO table_{format} SELECT * FROM test.hits LIMIT 100000</fill_query>
<query>SELECT WatchID FROM table_{format} FORMAT Null</query>
<drop_query>DROP TABLE IF EXISTS table_{format}</drop_query>
</test>

View File

@ -0,0 +1,100 @@
<test>
<substitutions>
<substitution>
<name>element_type</name>
<values>
<value>UInt8</value>
<value>Int16</value>
<value>Int32</value>
<value>Int64</value>
<value>Float32</value>
<value>Float64</value>
</values>
</substitution>
</substitutions>
<create_query>
CREATE TABLE vecs_{element_type} (
v Array({element_type})
) ENGINE=Memory;
</create_query>
<!-- Generate arrays with random data -->
<fill_query>
INSERT INTO vecs_{element_type}
SELECT v FROM (
SELECT
number AS n,
[
rand(n*10),
rand(n*10+1),
rand(n*10+2),
rand(n*10+3),
rand(n*10+4),
rand(n*10+5),
rand(n*10+6),
rand(n*10+7),
rand(n*10+8),
rand(n*10+9)
] AS v
FROM system.numbers
LIMIT 10000000
);
</fill_query>
<!-- The same data in the form of tuples -->
<create_query>
CREATE TABLE tuples_{element_type} (
t Tuple(
{element_type},
{element_type},
{element_type},
{element_type},
{element_type},
{element_type},
{element_type},
{element_type},
{element_type},
{element_type}
)
) ENGINE=Memory;
</create_query>
<fill_query>
INSERT INTO tuples_{element_type}
SELECT (v[1], v[2], v[3], v[4], v[5], v[6], v[7], v[8], v[9], v[10]) FROM vecs_{element_type};
</fill_query>
<settings>
<max_threads>1</max_threads>
</settings>
<!-- Norm kinds-->
<substitutions>
<substitution>
<name>norm</name>
<values>
<value>L1</value>
<value>L2</value>
<value>Linf</value>
</values>
</substitution>
</substitutions>
<!-- Tuples -->
<query>SELECT sum(dist) FROM (SELECT {norm}Norm(t) AS dist FROM tuples_{element_type})</query>
<query>WITH (SELECT t FROM tuples_{element_type} limit 1) AS a SELECT sum(dist) FROM (SELECT {norm}Distance(a, t) AS dist FROM tuples_{element_type})</query>
<query>WITH (SELECT t FROM tuples_{element_type} limit 1) AS a SELECT sum(dist) FROM (SELECT cosineDistance(a, t) AS dist FROM tuples_{element_type})</query>
<!-- Arrays -->
<query>SELECT sum(dist) FROM (SELECT array{norm}Norm(v) AS dist FROM vecs_{element_type})</query>
<query>WITH (SELECT v FROM vecs_{element_type} limit 1) AS a SELECT sum(dist) FROM (SELECT array{norm}Distance(a, v) AS dist FROM vecs_{element_type})</query>
<query>WITH (SELECT v FROM vecs_{element_type} limit 1) AS a SELECT sum(dist) FROM (SELECT arrayCosineDistance(a, v) AS dist FROM vecs_{element_type})</query>
<drop_query>DROP TABLE vecs_{element_type}</drop_query>
<drop_query>DROP TABLE tuples_{element_type}</drop_query>
</test>

Some files were not shown because too many files have changed in this diff.