Merge branch 'master' into new-nav

Rich Raposa 2023-03-14 13:31:33 -06:00 committed by GitHub
commit e9073dfe43
82 changed files with 937 additions and 420 deletions

View File

@ -159,22 +159,22 @@ inline const char * find_first_symbols_sse42(const char * const begin, const cha
#endif
for (; pos < end; ++pos)
if ( (num_chars >= 1 && maybe_negate<positive>(*pos == c01))
|| (num_chars >= 2 && maybe_negate<positive>(*pos == c02))
|| (num_chars >= 3 && maybe_negate<positive>(*pos == c03))
|| (num_chars >= 4 && maybe_negate<positive>(*pos == c04))
|| (num_chars >= 5 && maybe_negate<positive>(*pos == c05))
|| (num_chars >= 6 && maybe_negate<positive>(*pos == c06))
|| (num_chars >= 7 && maybe_negate<positive>(*pos == c07))
|| (num_chars >= 8 && maybe_negate<positive>(*pos == c08))
|| (num_chars >= 9 && maybe_negate<positive>(*pos == c09))
|| (num_chars >= 10 && maybe_negate<positive>(*pos == c10))
|| (num_chars >= 11 && maybe_negate<positive>(*pos == c11))
|| (num_chars >= 12 && maybe_negate<positive>(*pos == c12))
|| (num_chars >= 13 && maybe_negate<positive>(*pos == c13))
|| (num_chars >= 14 && maybe_negate<positive>(*pos == c14))
|| (num_chars >= 15 && maybe_negate<positive>(*pos == c15))
|| (num_chars >= 16 && maybe_negate<positive>(*pos == c16)))
if ( (num_chars == 1 && maybe_negate<positive>(is_in<c01>(*pos)))
|| (num_chars == 2 && maybe_negate<positive>(is_in<c01, c02>(*pos)))
|| (num_chars == 3 && maybe_negate<positive>(is_in<c01, c02, c03>(*pos)))
|| (num_chars == 4 && maybe_negate<positive>(is_in<c01, c02, c03, c04>(*pos)))
|| (num_chars == 5 && maybe_negate<positive>(is_in<c01, c02, c03, c04, c05>(*pos)))
|| (num_chars == 6 && maybe_negate<positive>(is_in<c01, c02, c03, c04, c05, c06>(*pos)))
|| (num_chars == 7 && maybe_negate<positive>(is_in<c01, c02, c03, c04, c05, c06, c07>(*pos)))
|| (num_chars == 8 && maybe_negate<positive>(is_in<c01, c02, c03, c04, c05, c06, c07, c08>(*pos)))
|| (num_chars == 9 && maybe_negate<positive>(is_in<c01, c02, c03, c04, c05, c06, c07, c08, c09>(*pos)))
|| (num_chars == 10 && maybe_negate<positive>(is_in<c01, c02, c03, c04, c05, c06, c07, c08, c09, c10>(*pos)))
|| (num_chars == 11 && maybe_negate<positive>(is_in<c01, c02, c03, c04, c05, c06, c07, c08, c09, c10, c11>(*pos)))
|| (num_chars == 12 && maybe_negate<positive>(is_in<c01, c02, c03, c04, c05, c06, c07, c08, c09, c10, c11, c12>(*pos)))
|| (num_chars == 13 && maybe_negate<positive>(is_in<c01, c02, c03, c04, c05, c06, c07, c08, c09, c10, c11, c12, c13>(*pos)))
|| (num_chars == 14 && maybe_negate<positive>(is_in<c01, c02, c03, c04, c05, c06, c07, c08, c09, c10, c11, c12, c13, c14>(*pos)))
|| (num_chars == 15 && maybe_negate<positive>(is_in<c01, c02, c03, c04, c05, c06, c07, c08, c09, c10, c11, c12, c13, c14, c15>(*pos)))
|| (num_chars == 16 && maybe_negate<positive>(is_in<c01, c02, c03, c04, c05, c06, c07, c08, c09, c10, c11, c12, c13, c14, c15, c16>(*pos))))
return pos;
return return_mode == ReturnMode::End ? end : nullptr;
}

View File

@ -105,6 +105,8 @@ public:
const std::string & getText() const;
/// Returns the text of the message.
void appendText(const std::string & text);
/// Appends the given text to the text of the message.
void setPriority(Priority prio);
/// Sets the priority of the message.

View File

@ -157,6 +157,12 @@ void Message::setText(const std::string& text)
}
void Message::appendText(const std::string & text)
{
_text.append(text);
}
void Message::setPriority(Priority prio)
{
_prio = prio;

View File

@ -1,7 +1,76 @@
#include <Common/LoggingFormatStringHelpers.h>
#include <Common/SipHash.h>
#include <Common/thread_local_rng.h>
[[noreturn]] void functionThatFailsCompilationOfConstevalFunctions(const char * error)
{
throw std::runtime_error(error);
}
std::unordered_map<UInt64, std::pair<time_t, size_t>> LogFrequencyLimiterIml::logged_messages;
time_t LogFrequencyLimiterIml::last_cleanup = 0;
std::mutex LogFrequencyLimiterIml::mutex;
void LogFrequencyLimiterIml::log(Poco::Message & message)
{
std::string_view pattern = message.getFormatString();
if (pattern.empty())
{
/// Do not filter messages without a format string
if (auto * channel = logger->getChannel())
channel->log(message);
return;
}
SipHash hash;
hash.update(logger->name());
/// Format strings are compile-time constants, so they are uniquely identified by pointer and size
hash.update(pattern.data());
hash.update(pattern.size());
time_t now = time(nullptr);
size_t skipped_similar_messages = 0;
bool need_cleanup;
bool need_log;
{
std::lock_guard lock(mutex);
need_cleanup = last_cleanup + 300 <= now;
auto & info = logged_messages[hash.get64()];
need_log = info.first + min_interval_s <= now;
if (need_log)
{
skipped_similar_messages = info.second;
info.first = now;
info.second = 0;
}
else
{
++info.second;
}
}
/// We don't need all threads to do cleanup, just randomize
if (need_cleanup && thread_local_rng() % 100 == 0)
cleanup();
/// The message is too frequent, skip it for now
/// NOTE It's not optimal because we format the message first and only then check if we need to actually write it, see LOG_IMPL macro
if (!need_log)
return;
if (skipped_similar_messages)
message.appendText(fmt::format(" (skipped {} similar messages)", skipped_similar_messages));
if (auto * channel = logger->getChannel())
channel->log(message);
}
void LogFrequencyLimiterIml::cleanup(time_t too_old_threshold_s)
{
time_t now = time(nullptr);
time_t old = now - too_old_threshold_s;
std::lock_guard lock(mutex);
std::erase_if(logged_messages, [old](const auto & elem) { return elem.second.first < old; });
last_cleanup = now;
}

View File

@ -1,6 +1,11 @@
#pragma once
#include <base/defines.h>
#include <base/types.h>
#include <fmt/format.h>
#include <mutex>
#include <unordered_map>
#include <Poco/Logger.h>
#include <Poco/Message.h>
struct PreformattedMessage;
consteval void formatStringCheckArgsNumImpl(std::string_view str, size_t nargs);
@ -156,3 +161,59 @@ struct CheckArgsNumHelperImpl
template <typename... Args> using CheckArgsNumHelper = CheckArgsNumHelperImpl<std::type_identity_t<Args>...>;
template <typename... Args> void formatStringCheckArgsNum(CheckArgsNumHelper<Args...>, Args &&...) {}
/// This wrapper helps to avoid too frequent and noisy log messages.
/// For each pair (logger_name, format_string) it remembers when such a message was logged the last time.
/// The message will not be logged again if less than min_interval_s seconds have passed since the previously logged message.
class LogFrequencyLimiterIml
{
/// Hash(logger_name, format_string) -> (last_logged_time_s, skipped_messages_count)
static std::unordered_map<UInt64, std::pair<time_t, size_t>> logged_messages;
static time_t last_cleanup;
static std::mutex mutex;
Poco::Logger * logger;
time_t min_interval_s;
public:
LogFrequencyLimiterIml(Poco::Logger * logger_, time_t min_interval_s_) : logger(logger_), min_interval_s(min_interval_s_) {}
LogFrequencyLimiterIml & operator -> () { return *this; }
bool is(Poco::Message::Priority priority) { return logger->is(priority); }
LogFrequencyLimiterIml * getChannel() {return this; }
const String & name() const { return logger->name(); }
void log(Poco::Message & message);
/// Clears messages that were logged last time more than too_old_threshold_s seconds ago
static void cleanup(time_t too_old_threshold_s = 600);
Poco::Logger * getLogger() { return logger; }
};
/// This wrapper is useful to save a formatted message into a String before sending it to a logger
class LogToStrImpl
{
String & out_str;
Poco::Logger * logger;
std::unique_ptr<LogFrequencyLimiterIml> maybe_nested;
bool propagate_to_actual_log = true;
public:
LogToStrImpl(String & out_str_, Poco::Logger * logger_) : out_str(out_str_), logger(logger_) {}
LogToStrImpl(String & out_str_, std::unique_ptr<LogFrequencyLimiterIml> && maybe_nested_)
: out_str(out_str_), logger(maybe_nested_->getLogger()), maybe_nested(std::move(maybe_nested_)) {}
LogToStrImpl & operator -> () { return *this; }
bool is(Poco::Message::Priority priority) { propagate_to_actual_log &= logger->is(priority); return true; }
LogToStrImpl * getChannel() {return this; }
const String & name() const { return logger->name(); }
void log(Poco::Message & message)
{
out_str = message.getText();
if (!propagate_to_actual_log)
return;
if (maybe_nested)
maybe_nested->log(message);
else if (auto * channel = logger->getChannel())
channel->log(message);
}
};
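
A minimal usage sketch (not part of the patch) for the limiter above, mirroring the call sites added later in this diff: the wrapper is constructed inline around an existing Poco::Logger and throttles messages per (logger name, format string) pair.

    /// Illustrative only: emit this message at most once every 10 seconds for this logger and format string.
    LOG_INFO(LogFrequencyLimiter(log, 10), "{} is in use (consider increasing temporary_directories_lifetime setting)", full_path);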

View File

@ -166,6 +166,8 @@
\
M(WaitMarksLoadMicroseconds, "Time spent loading marks") \
M(BackgroundLoadingMarksTasks, "Number of background tasks for loading marks") \
M(LoadedMarksCount, "Number of marks loaded (total across columns).") \
M(LoadedMarksMemoryBytes, "Size of in-memory representations of loaded marks.") \
\
M(Merge, "Number of launched background merges.") \
M(MergedRows, "Rows read for background merges. This is the number of rows before merge.") \

View File

@ -10,35 +10,16 @@
namespace Poco { class Logger; }
/// This wrapper is useful to save a formatted message into a String before sending it to a logger
class LogToStrImpl
{
String & out_str;
Poco::Logger * logger;
bool propagate_to_actual_log = true;
public:
LogToStrImpl(String & out_str_, Poco::Logger * logger_) : out_str(out_str_) , logger(logger_) {}
LogToStrImpl & operator -> () { return *this; }
bool is(Poco::Message::Priority priority) { propagate_to_actual_log &= logger->is(priority); return true; }
LogToStrImpl * getChannel() {return this; }
const String & name() const { return logger->name(); }
void log(const Poco::Message & message)
{
out_str = message.getText();
if (!propagate_to_actual_log)
return;
if (auto * channel = logger->getChannel())
channel->log(message);
}
};
#define LogToStr(x, y) std::make_unique<LogToStrImpl>(x, y)
#define LogFrequencyLimiter(x, y) std::make_unique<LogFrequencyLimiterIml>(x, y)
namespace
{
[[maybe_unused]] const ::Poco::Logger * getLogger(const ::Poco::Logger * logger) { return logger; };
[[maybe_unused]] const ::Poco::Logger * getLogger(const std::atomic<::Poco::Logger *> & logger) { return logger.load(); };
[[maybe_unused]] std::unique_ptr<LogToStrImpl> getLogger(std::unique_ptr<LogToStrImpl> && logger) { return logger; };
[[maybe_unused]] std::unique_ptr<LogFrequencyLimiterIml> getLogger(std::unique_ptr<LogFrequencyLimiterIml> && logger) { return logger; };
}
#define LOG_IMPL_FIRST_ARG(X, ...) X

View File

@ -4,6 +4,15 @@
#include <gtest/gtest.h>
template <char ... symbols>
void test_find_first_not(const std::string & haystack, std::size_t expected_pos)
{
const char * begin = haystack.data();
const char * end = haystack.data() + haystack.size();
ASSERT_EQ(begin + expected_pos, find_first_not_symbols<symbols...>(begin, end));
}
TEST(FindSymbols, SimpleTest)
{
std::string s = "Hello, world! Goodbye...";
@ -36,3 +45,58 @@ TEST(FindSymbols, SimpleTest)
ASSERT_EQ(vals, (std::vector<std::string>{"s", "String"}));
}
}
TEST(FindNotSymbols, AllSymbolsPresent)
{
std::string str_with_17_bytes = "hello world hello";
std::string str_with_16_bytes = {str_with_17_bytes.begin(), str_with_17_bytes.end() - 1u};
std::string str_with_15_bytes = {str_with_16_bytes.begin(), str_with_16_bytes.end() - 1u};
/*
* The variations below choose different implementation strategies:
* 1. Loop method only, because the string does not contain enough bytes for SSE 4.2
* 2. SSE 4.2 only, since the string contains exactly 16 bytes
* 3. SSE 4.2 + loop method, because only the first 16 bytes are handled by SSE 4.2 and the remaining bytes are handled by the loop
*
* The code below asserts that all calls return the ::end of the input string. This was not true prior to this fix, as mentioned in PR #47304
* */
test_find_first_not<'h', 'e', 'l', 'o', 'w', 'r', 'd', ' '>(str_with_15_bytes, str_with_15_bytes.size());
test_find_first_not<'h', 'e', 'l', 'o', 'w', 'r', 'd', ' '>(str_with_16_bytes, str_with_16_bytes.size());
test_find_first_not<'h', 'e', 'l', 'o', 'w', 'r', 'd', ' '>(str_with_17_bytes, str_with_17_bytes.size());
}
TEST(FindNotSymbols, NoSymbolsMatch)
{
std::string s = "abcdefg";
// begin should be returned since the first character of the string does not match any of the below symbols
test_find_first_not<'h', 'i', 'j'>(s, 0u);
}
TEST(FindNotSymbols, ExtraSymbols)
{
std::string s = "hello_world_hello";
test_find_first_not<'h', 'e', 'l', 'o', ' '>(s, 5u);
}
TEST(FindNotSymbols, EmptyString)
{
std::string s;
test_find_first_not<'h', 'e', 'l', 'o', 'w', 'r', 'd', ' '>(s, s.size());
}
TEST(FindNotSymbols, SingleChar)
{
std::string s = "a";
test_find_first_not<'a'>(s, s.size());
}
TEST(FindNotSymbols, NullCharacter)
{
// special test to ensure only the passed template arguments are used as needles
// since current find_first_symbols implementation takes in 16 characters and defaults
// to \0.
std::string s("abcdefg\0x", 9u);
test_find_first_not<'a', 'b', 'c', 'd', 'e', 'f', 'g'>(s, 7u);
}

View File

@ -800,6 +800,7 @@ class IColumn;
M(Bool, input_format_tsv_detect_header, true, "Automatically detect header with names and types in TSV format", 0) \
M(Bool, input_format_custom_detect_header, true, "Automatically detect header with names and types in CustomSeparated format", 0) \
M(Bool, input_format_parquet_skip_columns_with_unsupported_types_in_schema_inference, false, "Skip columns with unsupported types while schema inference for format Parquet", 0) \
M(UInt64, input_format_parquet_max_block_size, 8192, "Max block size for parquet reader.", 0) \
M(Bool, input_format_protobuf_skip_fields_with_unsupported_types_in_schema_inference, false, "Skip fields with unsupported types while schema inference for format Protobuf", 0) \
M(Bool, input_format_capn_proto_skip_fields_with_unsupported_types_in_schema_inference, false, "Skip columns with unsupported types while schema inference for format CapnProto", 0) \
M(Bool, input_format_orc_skip_columns_with_unsupported_types_in_schema_inference, false, "Skip columns with unsupported types while schema inference for format ORC", 0) \

View File

@ -117,6 +117,7 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
format_settings.parquet.skip_columns_with_unsupported_types_in_schema_inference = settings.input_format_parquet_skip_columns_with_unsupported_types_in_schema_inference;
format_settings.parquet.output_string_as_string = settings.output_format_parquet_string_as_string;
format_settings.parquet.output_fixed_string_as_fixed_byte_array = settings.output_format_parquet_fixed_string_as_fixed_byte_array;
format_settings.parquet.max_block_size = settings.input_format_parquet_max_block_size;
format_settings.parquet.output_compression_method = settings.output_format_parquet_compression_method;
format_settings.pretty.charset = settings.output_format_pretty_grid_charset.toString() == "ASCII" ? FormatSettings::Pretty::Charset::ASCII : FormatSettings::Pretty::Charset::UTF8;
format_settings.pretty.color = settings.output_format_pretty_color;

View File

@ -211,6 +211,7 @@ struct FormatSettings
std::unordered_set<int> skip_row_groups = {};
bool output_string_as_string = false;
bool output_fixed_string_as_fixed_byte_array = true;
UInt64 max_block_size = 8192;
ParquetVersion output_version;
ParquetCompression output_compression_method = ParquetCompression::SNAPPY;
} parquet;

View File

@ -0,0 +1,103 @@
#include <Formats/MarkInCompressedFile.h>
#include <Common/BitHelpers.h>
namespace DB
{
// Write a range of bits in a bit-packed array.
// The array must be overallocated by one element.
// The bit range must be pre-filled with zeros.
void writeBits(UInt64 * dest, size_t bit_offset, UInt64 value)
{
size_t mod = bit_offset % 64;
dest[bit_offset / 64] |= value << mod;
if (mod)
dest[bit_offset / 64 + 1] |= value >> (64 - mod);
}
// The array must be overallocated by one element.
UInt64 readBits(const UInt64 * src, size_t bit_offset, size_t num_bits)
{
size_t mod = bit_offset % 64;
UInt64 value = src[bit_offset / 64] >> mod;
if (mod)
value |= src[bit_offset / 64 + 1] << (64 - mod);
return value & maskLowBits<UInt64>(num_bits);
}
MarksInCompressedFile::MarksInCompressedFile(const PlainArray & marks)
: num_marks(marks.size()), blocks((marks.size() + MARKS_PER_BLOCK - 1) / MARKS_PER_BLOCK, BlockInfo{})
{
if (num_marks == 0)
{
return;
}
// First pass: calculate layout of all blocks and total memory required.
size_t packed_bits = 0;
for (size_t block_idx = 0; block_idx < blocks.size(); ++block_idx)
{
BlockInfo & block = blocks[block_idx];
block.bit_offset_in_packed_array = packed_bits;
size_t max_x = 0;
size_t max_y = 0;
size_t num_marks_in_this_block = std::min(MARKS_PER_BLOCK, num_marks - block_idx * MARKS_PER_BLOCK);
for (size_t i = 0; i < num_marks_in_this_block; ++i)
{
const auto & mark = marks[block_idx * MARKS_PER_BLOCK + i];
block.min_x = std::min(block.min_x, mark.offset_in_compressed_file);
max_x = std::max(max_x, mark.offset_in_compressed_file);
block.min_y = std::min(block.min_y, mark.offset_in_decompressed_block);
max_y = std::max(max_y, mark.offset_in_decompressed_block);
block.trailing_zero_bits_in_y
= std::min(block.trailing_zero_bits_in_y, static_cast<UInt8>(getTrailingZeroBits(mark.offset_in_decompressed_block)));
}
block.bits_for_x = sizeof(size_t) * 8 - getLeadingZeroBits(max_x - block.min_x);
block.bits_for_y = sizeof(size_t) * 8 - getLeadingZeroBits((max_y - block.min_y) >> block.trailing_zero_bits_in_y);
packed_bits += num_marks_in_this_block * (block.bits_for_x + block.bits_for_y);
}
// Overallocate by +1 element to let the bit packing/unpacking do less bounds checking.
size_t packed_length = (packed_bits + 63) / 64 + 1;
packed.reserve_exact(packed_length);
packed.resize_fill(packed_length);
// Second pass: write out the packed marks.
for (size_t idx = 0; idx < num_marks; ++idx)
{
const auto & mark = marks[idx];
auto [block, offset] = lookUpMark(idx);
writeBits(packed.data(), offset, mark.offset_in_compressed_file - block->min_x);
writeBits(
packed.data(),
offset + block->bits_for_x,
(mark.offset_in_decompressed_block - block->min_y) >> block->trailing_zero_bits_in_y);
}
}
MarkInCompressedFile MarksInCompressedFile::get(size_t idx) const
{
auto [block, offset] = lookUpMark(idx);
size_t x = block->min_x + readBits(packed.data(), offset, block->bits_for_x);
size_t y = block->min_y + (readBits(packed.data(), offset + block->bits_for_x, block->bits_for_y) << block->trailing_zero_bits_in_y);
return MarkInCompressedFile{.offset_in_compressed_file = x, .offset_in_decompressed_block = y};
}
std::tuple<const MarksInCompressedFile::BlockInfo *, size_t> MarksInCompressedFile::lookUpMark(size_t idx) const
{
size_t block_idx = idx / MARKS_PER_BLOCK;
const BlockInfo & block = blocks[block_idx];
size_t offset = block.bit_offset_in_packed_array + (idx - block_idx * MARKS_PER_BLOCK) * (block.bits_for_x + block.bits_for_y);
return {&block, offset};
}
size_t MarksInCompressedFile::approximateMemoryUsage() const
{
return sizeof(*this) + blocks.size() * sizeof(blocks[0]) + packed.size() * sizeof(packed[0]);
}
}
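
An illustrative round trip (not part of the patch) through the two helpers above, assuming a zero-initialized buffer overallocated by one element, as their comments require:

    UInt64 buf[2] = {0, 0};
    writeBits(buf, 62, 0b1011);       // the low 2 bits of the value go into bits 62-63 of buf[0], the high 2 bits into bits 0-1 of buf[1]
    UInt64 v = readBits(buf, 62, 4);  // reassembles both halves and masks to 4 bits, yielding 0b1011 again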

View File

@ -2,8 +2,8 @@
#include <tuple>
#include <base/types.h>
#include <IO/WriteHelpers.h>
#include <base/types.h>
#include <Common/PODArray.h>
@ -23,15 +23,9 @@ struct MarkInCompressedFile
return std::tie(offset_in_compressed_file, offset_in_decompressed_block)
== std::tie(rhs.offset_in_compressed_file, rhs.offset_in_decompressed_block);
}
bool operator!=(const MarkInCompressedFile & rhs) const
{
return !(*this == rhs);
}
bool operator!=(const MarkInCompressedFile & rhs) const { return !(*this == rhs); }
auto asTuple() const
{
return std::make_tuple(offset_in_compressed_file, offset_in_decompressed_block);
}
auto asTuple() const { return std::make_tuple(offset_in_compressed_file, offset_in_decompressed_block); }
String toString() const
{
@ -40,20 +34,87 @@ struct MarkInCompressedFile
String toStringWithRows(size_t rows_num) const
{
return "(" + DB::toString(offset_in_compressed_file) + "," + DB::toString(offset_in_decompressed_block) + "," + DB::toString(rows_num) + ")";
return "(" + DB::toString(offset_in_compressed_file) + "," + DB::toString(offset_in_decompressed_block) + ","
+ DB::toString(rows_num) + ")";
}
};
class MarksInCompressedFile : public PODArray<MarkInCompressedFile>
/**
* In-memory representation of an array of marks.
*
* Uses an ad-hoc compression scheme that decreases memory usage while allowing
* random access in O(1) time.
* This is independent from the marks *file* format, which may be uncompressed
* or use a different compression method.
*
* Typical memory usage:
* * ~3 bytes/mark for integer columns
* * ~5 bytes/mark for string columns
* * ~0.3 bytes/mark for trivial marks in auxiliary dict files of LowCardinality columns
*/
class MarksInCompressedFile
{
public:
explicit MarksInCompressedFile(size_t n) : PODArray(n) {}
using PlainArray = PODArray<MarkInCompressedFile>;
void read(ReadBuffer & buffer, size_t from, size_t count)
MarksInCompressedFile(const PlainArray & marks);
MarkInCompressedFile get(size_t idx) const;
size_t approximateMemoryUsage() const;
private:
/** Throughout this class:
* * "x" stands for offset_in_compressed_file,
* * "y" stands for offset_in_decompressed_block.
*/
/** We need to store a sequence of marks, each consisting of two 64-bit integers:
* offset_in_compressed_file and offset_in_decompressed_block. We'll call them x and y for
* convenience, since compression doesn't care what they mean. The compression exploits the
* following regularities:
* * y is usually zero.
* * x usually increases steadily.
* * Differences between x values in nearby marks usually fit in much fewer than 64 bits.
*
* We split the sequence of marks into blocks, each containing MARKS_PER_BLOCK marks.
* (Not to be confused with data blocks.)
* For each mark, we store the difference [value] - [min value in the block], for each of the
* two values in the mark. Each block specifies the number of bits to use for these differences
* for all marks in this block.
* The smaller the blocks the fewer bits are required, but the bigger the relative overhead of
* block headers.
*
* Packed marks and block headers all live in one contiguous array.
*/
struct BlockInfo
{
buffer.readStrict(reinterpret_cast<char *>(data() + from), count * sizeof(MarkInCompressedFile));
}
// Min offset_in_compressed_file and offset_in_decompressed_block, respectively.
size_t min_x = UINT64_MAX;
size_t min_y = UINT64_MAX;
// Place in `packed` where this block starts.
size_t bit_offset_in_packed_array;
// How many bits each mark takes. These numbers are bit-packed in the `packed` array.
// Can be zero. (Especially for y, which is typically all zeroes.)
UInt8 bits_for_x;
UInt8 bits_for_y;
// The `y` values should be <<'ed by this amount.
// Useful for integer columns when marks granularity is a power of 2; in this case all
// offset_in_decompressed_block values are divisible by 2^15 or so.
UInt8 trailing_zero_bits_in_y = 63;
};
static constexpr size_t MARKS_PER_BLOCK = 256;
size_t num_marks;
PODArray<BlockInfo> blocks;
PODArray<UInt64> packed;
// Mark idx -> {block info, bit offset in `packed`}.
std::tuple<const BlockInfo *, size_t> lookUpMark(size_t idx) const;
};
}
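
A rough worked example of the savings described above, under assumed (typical) inputs: with MARKS_PER_BLOCK = 256, if the x deltas within a block fit in about 22 bits and all y values are zero (so bits_for_y is 0), each mark occupies roughly 22 packed bits, i.e. about 2.75 bytes, plus the BlockInfo header amortized over 256 marks — on the order of 3 bytes per mark, versus 16 bytes per mark (two 64-bit integers) in the plain representation. This is consistent with the estimates in the class comment.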

View File

@ -0,0 +1,52 @@
#include <random>
#include <gtest/gtest.h>
#include <Formats/MarkInCompressedFile.h>
using namespace DB;
TEST(Marks, Compression)
{
std::random_device dev;
std::mt19937 rng(dev());
auto gen = [&](size_t count, size_t max_x_increment, size_t max_y_increment)
{
size_t x = 0, y = 0;
PODArray<MarkInCompressedFile> plain(count);
for (int i = 0; i < count; ++i)
{
x += rng() % (max_x_increment + 1);
y += rng() % (max_y_increment + 1);
plain[i] = MarkInCompressedFile{.offset_in_compressed_file = x, .offset_in_decompressed_block = y};
}
return plain;
};
auto test = [](const PODArray<MarkInCompressedFile> & plain, size_t max_bits_per_mark)
{
PODArray<MarkInCompressedFile> copy;
copy.assign(plain); // paranoid in case next line mutates it
MarksInCompressedFile marks(copy);
for (size_t i = 0; i < plain.size(); ++i)
ASSERT_EQ(marks.get(i), plain[i]);
EXPECT_LE((marks.approximateMemoryUsage() - sizeof(MarksInCompressedFile)) * 8, plain.size() * max_bits_per_mark);
};
// Typical.
test(gen(10000, 1'000'000, 0), 30);
// Completely random 64-bit values.
test(gen(10000, UINT64_MAX - 1, UINT64_MAX - 1), 130);
// All zeros.
test(gen(10000, 0, 0), 2);
// Short.
test(gen(10, 1000, 1000), 65);
// Empty.
test(gen(0, 0, 0), 0);
}

View File

@ -53,6 +53,10 @@ struct ToDateImpl
{
static constexpr auto name = "toDate";
static inline UInt16 execute(const DecimalUtils::DecimalComponents<DateTime64> & t, const DateLUTImpl & time_zone)
{
return static_cast<UInt16>(time_zone.toDayNum(t.whole));
}
static inline UInt16 execute(Int64 t, const DateLUTImpl & time_zone)
{
return UInt16(time_zone.toDayNum(t));
@ -69,6 +73,10 @@ struct ToDateImpl
{
return d;
}
static inline DecimalUtils::DecimalComponents<DateTime64> executeExtendedResult(const DecimalUtils::DecimalComponents<DateTime64> & t, const DateLUTImpl & time_zone)
{
return {time_zone.toDayNum(t.whole), 0};
}
using FactorTransform = ZeroTransform;
};

View File

@ -46,36 +46,57 @@ public:
{
if constexpr (std::is_same_v<typename Transform::FactorTransform, ZeroTransform>)
return { .is_monotonic = true, .is_always_monotonic = true };
const IFunction::Monotonicity is_monotonic = { .is_monotonic = true };
const IFunction::Monotonicity is_not_monotonic;
const DateLUTImpl * date_lut = &DateLUT::instance();
if (const auto * timezone = dynamic_cast<const TimezoneMixin *>(&type))
date_lut = &timezone->getTimeZone();
if (left.isNull() || right.isNull())
return is_not_monotonic;
/// The function is monotonic on the [left, right] segment, if the factor transformation returns the same values for them.
if (checkAndGetDataType<DataTypeDate>(&type))
{
return Transform::FactorTransform::execute(UInt16(left.get<UInt64>()), *date_lut)
== Transform::FactorTransform::execute(UInt16(right.get<UInt64>()), *date_lut)
? is_monotonic : is_not_monotonic;
}
else if (checkAndGetDataType<DataTypeDate32>(&type))
{
return Transform::FactorTransform::execute(Int32(left.get<UInt64>()), *date_lut)
== Transform::FactorTransform::execute(Int32(right.get<UInt64>()), *date_lut)
? is_monotonic : is_not_monotonic;
}
else
{
return Transform::FactorTransform::execute(UInt32(left.get<UInt64>()), *date_lut)
== Transform::FactorTransform::execute(UInt32(right.get<UInt64>()), *date_lut)
? is_monotonic : is_not_monotonic;
const IFunction::Monotonicity is_monotonic = { .is_monotonic = true };
const IFunction::Monotonicity is_not_monotonic;
const DateLUTImpl * date_lut = &DateLUT::instance();
if (const auto * timezone = dynamic_cast<const TimezoneMixin *>(&type))
date_lut = &timezone->getTimeZone();
if (left.isNull() || right.isNull())
return is_not_monotonic;
const auto * type_ptr = &type;
if (const auto * nullable_type = checkAndGetDataType<DataTypeNullable>(type_ptr))
type_ptr = nullable_type->getNestedType().get();
/// The function is monotonic on the [left, right] segment, if the factor transformation returns the same values for them.
if (checkAndGetDataType<DataTypeDate>(type_ptr))
{
return Transform::FactorTransform::execute(UInt16(left.get<UInt64>()), *date_lut)
== Transform::FactorTransform::execute(UInt16(right.get<UInt64>()), *date_lut)
? is_monotonic : is_not_monotonic;
}
else if (checkAndGetDataType<DataTypeDate32>(type_ptr))
{
return Transform::FactorTransform::execute(Int32(left.get<UInt64>()), *date_lut)
== Transform::FactorTransform::execute(Int32(right.get<UInt64>()), *date_lut)
? is_monotonic : is_not_monotonic;
}
else if (checkAndGetDataType<DataTypeDateTime>(type_ptr))
{
return Transform::FactorTransform::execute(UInt32(left.get<UInt64>()), *date_lut)
== Transform::FactorTransform::execute(UInt32(right.get<UInt64>()), *date_lut)
? is_monotonic : is_not_monotonic;
}
else
{
assert(checkAndGetDataType<DataTypeDateTime64>(type_ptr));
const auto & left_date_time = left.get<DateTime64>();
TransformDateTime64<typename Transform::FactorTransform> transformer_left(left_date_time.getScale());
const auto & right_date_time = right.get<DateTime64>();
TransformDateTime64<typename Transform::FactorTransform> transformer_right(right_date_time.getScale());
return transformer_left.execute(left_date_time.getValue(), *date_lut)
== transformer_right.execute(right_date_time.getValue(), *date_lut)
? is_monotonic : is_not_monotonic;
}
}
}

View File

@ -189,7 +189,7 @@ std::vector<JoinedElement> getTables(const ASTSelectQuery & select)
if (t.hasUsing())
{
if (has_using)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Multuple USING statements are not supported");
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Multiple USING statements are not supported");
has_using = true;
}

View File

@ -562,7 +562,6 @@ InterpreterSelectQuery::InterpreterSelectQuery(
{
/// Allow push down and other optimizations for VIEW: replace with subquery and rewrite it.
ASTPtr view_table;
NameToNameMap parameter_values;
NameToNameMap parameter_types;
if (view)
{
@ -575,14 +574,13 @@ InterpreterSelectQuery::InterpreterSelectQuery(
/// and after query is replaced, we use these parameters to substitute in the parameterized view query
if (query_info.is_parameterized_view)
{
parameter_values = analyzeFunctionParamValues(query_ptr);
view->setParameterValues(parameter_values);
parameter_types = view->getParameterValues();
query_info.parameterized_view_values = analyzeFunctionParamValues(query_ptr);
parameter_types = view->getParameterTypes();
}
view->replaceWithSubquery(getSelectQuery(), view_table, metadata_snapshot, view->isParameterizedView());
if (query_info.is_parameterized_view)
{
view->replaceQueryParametersIfParametrizedView(query_ptr);
view->replaceQueryParametersIfParametrizedView(query_ptr, query_info.parameterized_view_values);
}
}
@ -595,7 +593,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
required_result_column_names,
table_join,
query_info.is_parameterized_view,
parameter_values,
query_info.parameterized_view_values,
parameter_types);
@ -747,7 +745,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
query_info.filter_asts.push_back(parallel_replicas_custom_filter_ast);
}
source_header = storage_snapshot->getSampleBlockForColumns(required_columns, parameter_values);
source_header = storage_snapshot->getSampleBlockForColumns(required_columns, query_info.parameterized_view_values);
}
/// Calculate structure of the result.

View File

@ -1,6 +1,11 @@
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterSetQuery.h>
#include <Parsers/ASTSetQuery.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTExplainQuery.h>
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTQueryWithOutput.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
namespace DB
{
@ -26,4 +31,59 @@ void InterpreterSetQuery::executeForCurrentContext()
getContext()->resetSettingsToDefaultValue(ast.default_settings);
}
static void applySettingsFromSelectWithUnion(const ASTSelectWithUnionQuery & select_with_union, ContextMutablePtr context)
{
const ASTs & children = select_with_union.list_of_selects->children;
if (children.empty())
return;
// We might have an arbitrarily complex UNION tree, so just give
// up if the last first-order child is not a plain SELECT.
// It is flattened later, when we process UNION ALL/DISTINCT.
const auto * last_select = children.back()->as<ASTSelectQuery>();
if (last_select && last_select->settings())
{
InterpreterSetQuery(last_select->settings(), context).executeForCurrentContext();
}
}
void InterpreterSetQuery::applySettingsFromQuery(const ASTPtr & ast, ContextMutablePtr context_)
{
if (!ast)
return;
if (const auto * select_query = ast->as<ASTSelectQuery>())
{
if (auto new_settings = select_query->settings())
InterpreterSetQuery(new_settings, context_).executeForCurrentContext();
}
else if (const auto * select_with_union_query = ast->as<ASTSelectWithUnionQuery>())
{
applySettingsFromSelectWithUnion(*select_with_union_query, context_);
}
else if (const auto * explain_query = ast->as<ASTExplainQuery>())
{
applySettingsFromQuery(explain_query->getExplainedQuery(), context_);
}
else if (const auto * query_with_output = dynamic_cast<const ASTQueryWithOutput *>(ast.get()))
{
if (query_with_output->settings_ast)
InterpreterSetQuery(query_with_output->settings_ast, context_).executeForCurrentContext();
if (const auto * create_query = ast->as<ASTCreateQuery>())
{
if (create_query->select)
{
applySettingsFromSelectWithUnion(create_query->select->as<ASTSelectWithUnionQuery &>(), context_);
}
}
}
else if (auto * insert_query = ast->as<ASTInsertQuery>())
{
context_->setInsertFormat(insert_query->format);
if (insert_query->settings_ast)
InterpreterSetQuery(insert_query->settings_ast, context_).executeForCurrentContext();
}
}
}
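
For illustration (hypothetical query, not from the patch): in SELECT 1 UNION ALL SELECT 2 SETTINGS max_threads = 1, the SETTINGS clause is attached to the last SELECT child, so applySettingsFromSelectWithUnion picks it up and applies it early; if the last first-order child were itself a nested UNION rather than a plain SELECT, the helper gives up, as its comment notes, since the tree is only flattened later.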

View File

@ -27,6 +27,9 @@ public:
bool supportsTransactions() const override { return true; }
/// To apply SETTINGS clauses from query as early as possible
static void applySettingsFromQuery(const ASTPtr & ast, ContextMutablePtr context_);
private:
ASTPtr query_ptr;
};

View File

@ -119,7 +119,9 @@ void LogicalExpressionsOptimizer::collectDisjunctiveEqualityChains()
bool found_chain = false;
auto * function = to_node->as<ASTFunction>();
if (function && function->name == "or" && function->children.size() == 1)
/// Optimization does not respect aliases properly, which can lead to MULTIPLE_EXPRESSION_FOR_ALIAS error.
/// Disable it if an expression has an alias. Proper implementation is done with the new analyzer.
if (function && function->alias.empty() && function->name == "or" && function->children.size() == 1)
{
const auto * expression_list = function->children[0]->as<ASTExpressionList>();
if (expression_list)
@ -128,14 +130,14 @@ void LogicalExpressionsOptimizer::collectDisjunctiveEqualityChains()
for (const auto & child : expression_list->children)
{
auto * equals = child->as<ASTFunction>();
if (equals && equals->name == "equals" && equals->children.size() == 1)
if (equals && equals->alias.empty() && equals->name == "equals" && equals->children.size() == 1)
{
const auto * equals_expression_list = equals->children[0]->as<ASTExpressionList>();
if (equals_expression_list && equals_expression_list->children.size() == 2)
{
/// Equality expr = xN.
const auto * literal = equals_expression_list->children[1]->as<ASTLiteral>();
if (literal)
if (literal && literal->alias.empty())
{
auto expr_lhs = equals_expression_list->children[0]->getTreeHash();
OrWithExpression or_with_expression{function, expr_lhs, function->tryGetAlias()};
@ -230,6 +232,9 @@ bool LogicalExpressionsOptimizer::mayOptimizeDisjunctiveEqualityChain(const Disj
const auto & equalities = chain.second;
const auto & equality_functions = equalities.functions;
if (settings.optimize_min_equality_disjunction_chain_length == 0)
return false;
/// For a LowCardinality column, the dictionary is usually smaller and the index is relatively large.
/// In most cases, merging an OR-chain into IN is better than converting each LowCardinality column into a full column individually.
/// For non-LowCardinality columns, we need to eliminate chains that are too short.
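
For illustration (hypothetical example, not from the patch): a disjunctive chain such as x = 1 OR x = 2 OR x = 3 is what this pass merges into x IN (1, 2, 3); the chain-length threshold (optimize_min_equality_disjunction_chain_length) guards against merging chains that are too short, and the comment above explains why the merge tends to pay off for LowCardinality columns even when the chain is short.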

View File

@ -115,7 +115,7 @@ struct TemporaryFileStream::OutputWriter
, out_compressed_buf(*out_buf)
, out_writer(out_compressed_buf, DBMS_TCP_PROTOCOL_VERSION, header_)
{
LOG_TEST(&Poco::Logger::get("TemporaryFileStream"), "Writing to {}", path);
LOG_TEST(&Poco::Logger::get("TemporaryFileStream"), "Writing to temporary file {}", path);
}
OutputWriter(std::unique_ptr<WriteBufferToFileSegment> out_buf_, const Block & header_)
@ -124,7 +124,7 @@ struct TemporaryFileStream::OutputWriter
, out_writer(out_compressed_buf, DBMS_TCP_PROTOCOL_VERSION, header_)
{
LOG_TEST(&Poco::Logger::get("TemporaryFileStream"),
"Writing to {}",
"Writing to temporary file {}",
static_cast<const WriteBufferToFileSegment *>(out_buf.get())->getFileName());
}

View File

@ -306,22 +306,6 @@ static void setQuerySpecificSettings(ASTPtr & ast, ContextMutablePtr context)
}
}
static void applySettingsFromSelectWithUnion(const ASTSelectWithUnionQuery & select_with_union, ContextMutablePtr context)
{
const ASTs & children = select_with_union.list_of_selects->children;
if (children.empty())
return;
// We might have an arbitrarily complex UNION tree, so just give
// up if the last first-order child is not a plain SELECT.
// It is flattened later, when we process UNION ALL/DISTINCT.
const auto * last_select = children.back()->as<ASTSelectQuery>();
if (last_select && last_select->settings())
{
InterpreterSetQuery(last_select->settings(), context).executeForCurrentContext();
}
}
static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
const char * begin,
const char * end,
@ -483,35 +467,10 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
/// Interpret SETTINGS clauses as early as possible (before invoking the corresponding interpreter),
/// to allow settings to take effect.
if (const auto * select_query = ast->as<ASTSelectQuery>())
{
if (auto new_settings = select_query->settings())
InterpreterSetQuery(new_settings, context).executeForCurrentContext();
}
else if (const auto * select_with_union_query = ast->as<ASTSelectWithUnionQuery>())
{
applySettingsFromSelectWithUnion(*select_with_union_query, context);
}
else if (const auto * query_with_output = dynamic_cast<const ASTQueryWithOutput *>(ast.get()))
{
if (query_with_output->settings_ast)
InterpreterSetQuery(query_with_output->settings_ast, context).executeForCurrentContext();
InterpreterSetQuery::applySettingsFromQuery(ast, context);
if (const auto * create_query = ast->as<ASTCreateQuery>())
{
if (create_query->select)
{
applySettingsFromSelectWithUnion(create_query->select->as<ASTSelectWithUnionQuery &>(), context);
}
}
}
else if (auto * insert_query = ast->as<ASTInsertQuery>())
{
context->setInsertFormat(insert_query->format);
if (insert_query->settings_ast)
InterpreterSetQuery(insert_query->settings_ast, context).executeForCurrentContext();
if (auto * insert_query = ast->as<ASTInsertQuery>())
insert_query->tail = istr;
}
setQuerySpecificSettings(ast, context);
@ -678,6 +637,10 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
}
}
bool can_use_query_cache =
settings.allow_experimental_query_cache && settings.use_query_cache
&& !ast->as<ASTExplainQuery>();
if (!async_insert)
{
/// We need to start the (implicit) transaction before getting the interpreter as this will get links to the latest snapshots
@ -757,7 +720,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
auto query_cache = context->getQueryCache();
bool read_result_from_query_cache = false; /// a query must not read from *and* write to the query cache at the same time
if (query_cache != nullptr
&& (settings.allow_experimental_query_cache && settings.use_query_cache && settings.enable_reads_from_query_cache)
&& (can_use_query_cache && settings.enable_reads_from_query_cache)
&& res.pipeline.pulling())
{
QueryCache::Key key(
@ -778,7 +741,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
/// then add a processor on top of the pipeline which stores the result in the query cache.
if (!read_result_from_query_cache
&& query_cache != nullptr
&& settings.allow_experimental_query_cache && settings.use_query_cache && settings.enable_writes_to_query_cache
&& can_use_query_cache && settings.enable_writes_to_query_cache
&& res.pipeline.pulling()
&& (!astContainsNonDeterministicFunctions(ast, context) || settings.query_cache_store_results_of_queries_with_nondeterministic_functions))
{
@ -946,8 +909,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
auto finish_callback = [elem,
context,
ast,
allow_experimental_query_cache = settings.allow_experimental_query_cache,
use_query_cache = settings.use_query_cache,
can_use_query_cache = can_use_query_cache,
enable_writes_to_query_cache = settings.enable_writes_to_query_cache,
query_cache_store_results_of_queries_with_nondeterministic_functions = settings.query_cache_store_results_of_queries_with_nondeterministic_functions,
log_queries,
@ -965,7 +927,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
auto query_cache = context->getQueryCache();
if (query_cache != nullptr
&& pulling_pipeline
&& allow_experimental_query_cache && use_query_cache && enable_writes_to_query_cache
&& can_use_query_cache && enable_writes_to_query_cache
&& (!astContainsNonDeterministicFunctions(ast, context) || query_cache_store_results_of_queries_with_nondeterministic_functions))
{
query_pipeline.finalizeWriteInQueryCache();

View File

@ -61,11 +61,17 @@ void addDefaultRequiredExpressionsRecursively(
RequiredSourceColumnsVisitor::Data columns_context;
RequiredSourceColumnsVisitor(columns_context).visit(column_default_expr);
NameSet required_columns_names = columns_context.requiredColumns();
auto required_type = std::make_shared<ASTLiteral>(columns.get(required_column_name).type->getName());
auto expr = makeASTFunction("_CAST", column_default_expr, std::make_shared<ASTLiteral>(columns.get(required_column_name).type->getName()));
auto expr = makeASTFunction("_CAST", column_default_expr, required_type);
if (is_column_in_query && convert_null_to_default)
{
expr = makeASTFunction("ifNull", std::make_shared<ASTIdentifier>(required_column_name), std::move(expr));
/// ifNull does not respect LowCardinality.
/// It may be fixed later or re-implemented properly for identical types.
expr = makeASTFunction("_CAST", std::move(expr), required_type);
}
default_expr_list_accum->children.emplace_back(setAlias(expr, required_column_name));
added_columns.emplace(required_column_name);

View File

@ -93,7 +93,7 @@ static ColumnWithTypeAndName readColumnWithNumericData(std::shared_ptr<arrow::Ch
/// buffers[0] is a null bitmap and buffers[1] are actual values
std::shared_ptr<arrow::Buffer> buffer = chunk->data()->buffers[1];
const auto * raw_data = reinterpret_cast<const NumericType *>(buffer->data());
const auto * raw_data = reinterpret_cast<const NumericType *>(buffer->data()) + chunk->offset();
column_data.insert_assume_reserved(raw_data, raw_data + chunk->length());
}
return {std::move(internal_column), std::move(internal_type), column_name};
@ -159,8 +159,8 @@ static ColumnWithTypeAndName readColumnWithFixedStringData(std::shared_ptr<arrow
for (int chunk_i = 0, num_chunks = arrow_column->num_chunks(); chunk_i < num_chunks; ++chunk_i)
{
arrow::FixedSizeBinaryArray & chunk = dynamic_cast<arrow::FixedSizeBinaryArray &>(*(arrow_column->chunk(chunk_i)));
std::shared_ptr<arrow::Buffer> buffer = chunk.values();
column_chars_t.insert_assume_reserved(buffer->data(), buffer->data() + buffer->size());
const uint8_t * raw_data = chunk.raw_values();
column_chars_t.insert_assume_reserved(raw_data, raw_data + fixed_len * chunk.length());
}
return {std::move(internal_column), std::move(internal_type), column_name};
}
@ -178,9 +178,6 @@ static ColumnWithTypeAndName readColumnWithBooleanData(std::shared_ptr<arrow::Ch
if (chunk.length() == 0)
continue;
/// buffers[0] is a null bitmap and buffers[1] are actual values
std::shared_ptr<arrow::Buffer> buffer = chunk.data()->buffers[1];
for (size_t bool_i = 0; bool_i != static_cast<size_t>(chunk.length()); ++bool_i)
column_data.emplace_back(chunk.Value(bool_i));
}
@ -402,7 +399,7 @@ static ColumnWithTypeAndName readColumnWithIndexesDataImpl(std::shared_ptr<arrow
/// buffers[0] is a null bitmap and buffers[1] are actual values
std::shared_ptr<arrow::Buffer> buffer = chunk->data()->buffers[1];
const auto * data = reinterpret_cast<const NumericType *>(buffer->data());
const auto * data = reinterpret_cast<const NumericType *>(buffer->data()) + chunk->offset();
/// Check that indexes are correct (protection against corrupted files)
/// Note that on null values index can be arbitrary value.
@ -554,8 +551,7 @@ static ColumnWithTypeAndName readIPv6ColumnFromBinaryData(std::shared_ptr<arrow:
for (int chunk_i = 0, num_chunks = arrow_column->num_chunks(); chunk_i < num_chunks; ++chunk_i)
{
auto & chunk = dynamic_cast<arrow::BinaryArray &>(*(arrow_column->chunk(chunk_i)));
std::shared_ptr<arrow::Buffer> buffer = chunk.value_data();
const auto * raw_data = reinterpret_cast<const IPv6 *>(buffer->data());
const auto * raw_data = reinterpret_cast<const IPv6 *>(chunk.raw_data() + chunk.raw_value_offsets()[0]);
data.insert_assume_reserved(raw_data, raw_data + chunk.length());
}
return {std::move(internal_column), std::move(internal_type), column_name};

View File

@ -45,38 +45,42 @@ Chunk ParquetBlockInputFormat::generate()
block_missing_values.clear();
if (!file_reader)
{
prepareReader();
file_reader->set_batch_size(format_settings.parquet.max_block_size);
std::vector<int> row_group_indices;
for (int i = 0; i < row_group_total; ++i)
{
if (!skip_row_groups.contains(i))
row_group_indices.emplace_back(i);
}
auto read_status = file_reader->GetRecordBatchReader(row_group_indices, column_indices, &current_record_batch_reader);
if (!read_status.ok())
throw DB::ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, "Error while reading Parquet data: {}", read_status.ToString());
}
if (is_stopped)
return {};
while (row_group_current < row_group_total && skip_row_groups.contains(row_group_current))
++row_group_current;
if (row_group_current >= row_group_total)
return res;
std::shared_ptr<arrow::Table> table;
std::unique_ptr<::arrow::RecordBatchReader> rbr;
std::vector<int> row_group_indices { row_group_current };
arrow::Status get_batch_reader_status = file_reader->GetRecordBatchReader(row_group_indices, column_indices, &rbr);
if (!get_batch_reader_status.ok())
auto batch = current_record_batch_reader->Next();
if (!batch.ok())
{
throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, "Error while reading Parquet data: {}",
get_batch_reader_status.ToString());
batch.status().ToString());
}
if (*batch)
{
auto tmp_table = arrow::Table::FromRecordBatches({*batch});
/// If defaults_for_omitted_fields is true, calculate the default values from default expression for omitted fields.
/// Otherwise fill the missing columns with zero values of its type.
BlockMissingValues * block_missing_values_ptr = format_settings.defaults_for_omitted_fields ? &block_missing_values : nullptr;
arrow_column_to_ch_column->arrowTableToCHChunk(res, *tmp_table, (*tmp_table)->num_rows(), block_missing_values_ptr);
}
else
{
return {};
}
arrow::Status read_status = rbr->ReadAll(&table);
if (!read_status.ok())
throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, "Error while reading Parquet data: {}", read_status.ToString());
++row_group_current;
/// If defaults_for_omitted_fields is true, calculate the default values from default expression for omitted fields.
/// Otherwise fill the missing columns with zero values of its type.
BlockMissingValues * block_missing_values_ptr = format_settings.defaults_for_omitted_fields ? &block_missing_values : nullptr;
arrow_column_to_ch_column->arrowTableToCHChunk(res, table, table->num_rows(), block_missing_values_ptr);
return res;
}
@ -85,6 +89,7 @@ void ParquetBlockInputFormat::resetParser()
IInputFormat::resetParser();
file_reader.reset();
current_record_batch_reader.reset();
column_indices.clear();
row_group_current = 0;
block_missing_values.clear();

View File

@ -40,8 +40,10 @@ void FillingStep::transformPipeline(QueryPipelineBuilder & pipeline, const Build
{
pipeline.addSimpleTransform([&](const Block & header, QueryPipelineBuilder::StreamType stream_type) -> ProcessorPtr
{
bool on_totals = stream_type == QueryPipelineBuilder::StreamType::Totals;
return std::make_shared<FillingTransform>(header, sort_description, std::move(interpolate_description), on_totals);
if (stream_type == QueryPipelineBuilder::StreamType::Totals)
return std::make_shared<FillingNoopTransform>(header, sort_description);
return std::make_shared<FillingTransform>(header, sort_description, std::move(interpolate_description));
});
}

View File

@ -169,17 +169,13 @@ static bool tryConvertFields(FillColumnDescription & descr, const DataTypePtr &
}
FillingTransform::FillingTransform(
const Block & header_, const SortDescription & sort_description_, InterpolateDescriptionPtr interpolate_description_, bool on_totals_)
const Block & header_, const SortDescription & sort_description_, InterpolateDescriptionPtr interpolate_description_)
: ISimpleTransform(header_, transformHeader(header_, sort_description_), true)
, sort_description(sort_description_)
, interpolate_description(interpolate_description_)
, on_totals(on_totals_)
, filling_row(sort_description_)
, next_row(sort_description_)
{
if (on_totals)
return;
if (interpolate_description)
interpolate_actions = std::make_shared<ExpressionActions>(interpolate_description->actions);
@ -239,7 +235,7 @@ FillingTransform::FillingTransform(
IProcessor::Status FillingTransform::prepare()
{
if (!on_totals && input.isFinished() && !output.isFinished() && !has_input && !generate_suffix)
if (input.isFinished() && !output.isFinished() && !has_input && !generate_suffix)
{
should_insert_first = next_row < filling_row || first;
@ -266,9 +262,6 @@ IProcessor::Status FillingTransform::prepare()
void FillingTransform::transform(Chunk & chunk)
{
if (on_totals)
return;
if (!chunk.hasRows() && !generate_suffix)
return;

View File

@ -16,7 +16,7 @@ namespace DB
class FillingTransform : public ISimpleTransform
{
public:
FillingTransform(const Block & header_, const SortDescription & sort_description_, InterpolateDescriptionPtr interpolate_description_, bool on_totals_);
FillingTransform(const Block & header_, const SortDescription & sort_description_, InterpolateDescriptionPtr interpolate_description_);
String getName() const override { return "FillingTransform"; }
@ -33,7 +33,6 @@ private:
const SortDescription sort_description; /// Contains only columns with WITH FILL.
const InterpolateDescriptionPtr interpolate_description; /// Contains INTERPOLATE columns
const bool on_totals; /// FillingTransform does nothing on totals.
FillingRow filling_row; /// Current row, which is used to fill gaps.
FillingRow next_row; /// Row to which we need to generate filling rows.
@ -53,4 +52,16 @@ private:
bool should_insert_first = false;
};
class FillingNoopTransform : public ISimpleTransform
{
public:
FillingNoopTransform(const Block & header, const SortDescription & sort_description_)
: ISimpleTransform(header, FillingTransform::transformHeader(header, sort_description_), true)
{
}
void transform(Chunk &) override {}
String getName() const override { return "FillingNoopTransform"; }
};
}

View File

@ -205,16 +205,10 @@ void DistributedAsyncInsertDirectoryQueue::run()
/// No errors while processing existing files.
/// Let's see maybe there are more files to process.
do_sleep = false;
std::lock_guard status_lock(status_mutex);
status.last_exception = std::exception_ptr{};
}
catch (...)
{
std::lock_guard status_lock(status_mutex);
do_sleep = true;
++status.error_count;
tryLogCurrentException(getLoggerName().data());
UInt64 q = doubleToUInt64(std::exp2(status.error_count));
std::chrono::milliseconds new_sleep_time(default_sleep_time.count() * q);
@ -223,9 +217,7 @@ void DistributedAsyncInsertDirectoryQueue::run()
else
sleep_time = std::min(new_sleep_time, max_sleep_time);
tryLogCurrentException(getLoggerName().data());
status.last_exception = std::current_exception();
status.last_exception_time = std::chrono::system_clock::now();
do_sleep = true;
}
}
else
@ -393,6 +385,7 @@ void DistributedAsyncInsertDirectoryQueue::initializeFilesFromDisk()
}
}
void DistributedAsyncInsertDirectoryQueue::processFiles()
try
{
if (should_batch_inserts)
processFilesWithBatching();
@ -405,6 +398,19 @@ void DistributedAsyncInsertDirectoryQueue::processFiles()
while (pending_files.tryPop(current_file))
processFile(current_file);
}
std::lock_guard status_lock(status_mutex);
status.last_exception = std::exception_ptr{};
}
catch (...)
{
std::lock_guard status_lock(status_mutex);
++status.error_count;
status.last_exception = std::current_exception();
status.last_exception_time = std::chrono::system_clock::now();
throw;
}
void DistributedAsyncInsertDirectoryQueue::processFile(const std::string & file_path)

View File

@ -26,7 +26,7 @@ struct MarksWeightFunction
size_t operator()(const MarksInCompressedFile & marks) const
{
return marks.size() * sizeof(MarkInCompressedFile) + MARK_CACHE_OVERHEAD;
return marks.approximateMemoryUsage() + MARK_CACHE_OVERHEAD;
}
};

View File

@ -1947,9 +1947,9 @@ size_t MergeTreeData::clearOldTemporaryDirectories(size_t custom_directories_lif
{
if (temporary_parts.contains(basename))
{
/// Actually we don't rely on temporary_directories_lifetime when removing old temporaries directoties,
/// Actually we don't rely on temporary_directories_lifetime when removing old temporaries directories,
/// it's just an extra level of protection just in case we have a bug.
LOG_INFO(log, "{} is in use (by merge/mutation/INSERT) (consider increasing temporary_directories_lifetime setting)", full_path);
LOG_INFO(LogFrequencyLimiter(log, 10), "{} is in use (by merge/mutation/INSERT) (consider increasing temporary_directories_lifetime setting)", full_path);
continue;
}
else

View File

@ -1,13 +1,13 @@
#include <Storages/MergeTree/MergeTreeMarksLoader.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Common/MemoryTrackerBlockerInThread.h>
#include <Compression/CompressedReadBufferFromFile.h>
#include <IO/ReadBufferFromFile.h>
#include <Interpreters/threadPoolCallbackRunner.h>
#include <Compression/CompressedReadBufferFromFile.h>
#include <Common/setThreadName.h>
#include <Common/scope_guard_safe.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeMarksLoader.h>
#include <Common/CurrentMetrics.h>
#include <Common/MemoryTrackerBlockerInThread.h>
#include <Common/ThreadPool.h>
#include <Common/scope_guard_safe.h>
#include <Common/setThreadName.h>
#include <utility>
@ -15,6 +15,8 @@ namespace ProfileEvents
{
extern const Event WaitMarksLoadMicroseconds;
extern const Event BackgroundLoadingMarksTasks;
extern const Event LoadedMarksCount;
extern const Event LoadedMarksMemoryBytes;
}
namespace DB
@ -62,7 +64,7 @@ MergeTreeMarksLoader::~MergeTreeMarksLoader()
}
const MarkInCompressedFile & MergeTreeMarksLoader::getMark(size_t row_index, size_t column_index)
MarkInCompressedFile MergeTreeMarksLoader::getMark(size_t row_index, size_t column_index)
{
if (!marks)
{
@ -87,7 +89,7 @@ const MarkInCompressedFile & MergeTreeMarksLoader::getMark(size_t row_index, siz
throw Exception(ErrorCodes::LOGICAL_ERROR, "Column index: {} is out of range [0, {})", column_index, columns_in_mark);
#endif
return (*marks)[row_index * columns_in_mark + column_index];
return marks->get(row_index * columns_in_mark + column_index);
}
@ -100,14 +102,17 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl()
size_t mark_size = index_granularity_info.getMarkSizeInBytes(columns_in_mark);
size_t expected_uncompressed_size = mark_size * marks_count;
auto res = std::make_shared<MarksInCompressedFile>(marks_count * columns_in_mark);
// We first read the marks into a temporary simple array, then compress them into a more compact
// representation.
PODArray<MarkInCompressedFile> plain_marks(marks_count * columns_in_mark); // temporary
if (!index_granularity_info.mark_type.compressed && expected_uncompressed_size != file_size)
throw Exception(
ErrorCodes::CORRUPTED_DATA,
"Bad size of marks file '{}': {}, must be: {}",
std::string(fs::path(data_part_storage->getFullPath()) / mrk_path),
file_size, expected_uncompressed_size);
file_size,
expected_uncompressed_size);
auto buffer = data_part_storage->readFile(mrk_path, read_settings.adjustBufferSize(file_size), file_size, std::nullopt);
std::unique_ptr<ReadBuffer> reader;
@ -119,12 +124,16 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl()
if (!index_granularity_info.mark_type.adaptive)
{
/// Read directly to marks.
reader->readStrict(reinterpret_cast<char *>(res->data()), expected_uncompressed_size);
reader->readStrict(reinterpret_cast<char *>(plain_marks.data()), expected_uncompressed_size);
if (!reader->eof())
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA,
throw Exception(
ErrorCodes::CANNOT_READ_ALL_DATA,
"Cannot read all marks from file {}, is eof: {}, buffer size: {}, file size: {}",
mrk_path, reader->eof(), reader->buffer().size(), file_size);
mrk_path,
reader->eof(),
reader->buffer().size(),
file_size);
}
else
{
@ -132,7 +141,8 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl()
size_t granularity;
while (!reader->eof())
{
res->read(*reader, i * columns_in_mark, columns_in_mark);
reader->readStrict(
reinterpret_cast<char *>(plain_marks.data() + i * columns_in_mark), columns_in_mark * sizeof(MarkInCompressedFile));
readIntBinary(granularity, *reader);
++i;
}
@ -141,7 +151,11 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl()
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Cannot read all marks from file {}", mrk_path);
}
res->protect();
auto res = std::make_shared<MarksInCompressedFile>(plain_marks);
ProfileEvents::increment(ProfileEvents::LoadedMarksCount, marks_count * columns_in_mark);
ProfileEvents::increment(ProfileEvents::LoadedMarksMemoryBytes, res->approximateMemoryUsage());
return res;
}
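
The loadMarksImpl change above switches to a two-phase load: marks are first read into a temporary plain array and only then packed into the compact MarksInCompressedFile, whose memory usage is reported to the profile counters. A minimal standalone sketch of that pattern, using hypothetical PlainMark/PackedMarks types rather than the ClickHouse classes:

#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <utility>
#include <vector>

/// Hypothetical stand-ins for MarkInCompressedFile / MarksInCompressedFile.
struct PlainMark
{
    uint64_t offset_in_compressed_file;
    uint64_t offset_in_decompressed_block;
};

/// A compact container built once from the temporary plain array; callers
/// receive marks by value, never a reference into the cached storage.
class PackedMarks
{
public:
    explicit PackedMarks(std::vector<PlainMark> plain) : marks(std::move(plain)) {}

    PlainMark get(size_t i) const { return marks[i]; }
    size_t approximateMemoryUsage() const { return marks.size() * sizeof(PlainMark); }

private:
    std::vector<PlainMark> marks;
};

int main()
{
    /// Phase 1: read marks into a temporary plain array (synthesized here).
    std::vector<PlainMark> plain_marks;
    for (uint64_t i = 0; i < 4; ++i)
        plain_marks.push_back({i * 1024, i * 16});

    /// Phase 2: build the compact representation and query it by value.
    PackedMarks packed(std::move(plain_marks));
    const PlainMark mark = packed.get(2);
    std::printf("offsets: %llu/%llu, ~%zu bytes\n",
        static_cast<unsigned long long>(mark.offset_in_compressed_file),
        static_cast<unsigned long long>(mark.offset_in_decompressed_block),
        packed.approximateMemoryUsage());
}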
@ -154,7 +168,7 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarks()
auto key = mark_cache->hash(fs::path(data_part_storage->getFullPath()) / mrk_path);
if (save_marks_in_cache)
{
auto callback = [this]{ return loadMarksImpl(); };
auto callback = [this] { return loadMarksImpl(); };
loaded_marks = mark_cache->getOrSet(key, callback);
}
else
@ -170,8 +184,7 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarks()
if (!loaded_marks)
{
throw Exception(
ErrorCodes::LOGICAL_ERROR, "Failed to load marks: {}",
(fs::path(data_part_storage->getFullPath()) / mrk_path).string());
ErrorCodes::LOGICAL_ERROR, "Failed to load marks: {}", (fs::path(data_part_storage->getFullPath()) / mrk_path).string());
}
return loaded_marks;
@ -179,11 +192,14 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarks()
std::future<MarkCache::MappedPtr> MergeTreeMarksLoader::loadMarksAsync()
{
return scheduleFromThreadPool<MarkCache::MappedPtr>([this]() -> MarkCache::MappedPtr
{
ProfileEvents::increment(ProfileEvents::BackgroundLoadingMarksTasks);
return loadMarks();
}, *load_marks_threadpool, "LoadMarksThread");
return scheduleFromThreadPool<MarkCache::MappedPtr>(
[this]() -> MarkCache::MappedPtr
{
ProfileEvents::increment(ProfileEvents::BackgroundLoadingMarksTasks);
return loadMarks();
},
*load_marks_threadpool,
"LoadMarksThread");
}
}
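
loadMarksAsync above schedules loadMarks on the load_marks_threadpool and hands back a std::future. A rough standalone sketch of the same idea, with std::async standing in for scheduleFromThreadPool (an assumption for illustration, not the ClickHouse API):

#include <future>
#include <iostream>
#include <vector>

/// Sketch: run the loading step on a background thread and hand back a
/// std::future, similar in spirit to scheduleFromThreadPool.
std::future<std::vector<int>> loadMarksAsyncSketch()
{
    return std::async(std::launch::async, []
    {
        /// Pretend this is the expensive mark-loading work.
        return std::vector<int>{1, 2, 3};
    });
}

int main()
{
    auto marks_future = loadMarksAsyncSketch();
    std::cout << "marks loaded: " << marks_future.get().size() << '\n';
}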

View File

@ -30,7 +30,7 @@ public:
~MergeTreeMarksLoader();
const MarkInCompressedFile & getMark(size_t row_index, size_t column_index = 0);
MarkInCompressedFile getMark(size_t row_index, size_t column_index = 0);
private:
DataPartStoragePtr data_part_storage;

View File

@ -1231,8 +1231,7 @@ bool ReplicatedMergeTreeQueue::isCoveredByFuturePartsImpl(const LogEntry & entry
"because it is not disjoint with part {} that is currently executing.";
/// This message can be too noisy, do not print it more than once per second
if (!(entry.last_postpone_time == time(nullptr) && entry.postpone_reason.ends_with("that is currently executing.")))
LOG_TEST(LogToStr(out_reason, log), fmt_string, entry.znode_name, new_part_name, future_part_elem.first);
LOG_TEST(LogToStr(out_reason, LogFrequencyLimiter(log, 5)), fmt_string, entry.znode_name, new_part_name, future_part_elem.first);
return true;
}
@ -1423,7 +1422,7 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
{
constexpr auto fmt_string = "Not executing log entry {} of type {} for part {}"
" because source parts size ({}) is greater than the current maximum ({}).";
LOG_DEBUG(LogToStr(out_postpone_reason, log), fmt_string, entry.znode_name, entry.typeToString(), entry.new_part_name,
LOG_DEBUG(LogToStr(out_postpone_reason, LogFrequencyLimiter(log, 5)), fmt_string, entry.znode_name, entry.typeToString(), entry.new_part_name,
ReadableSize(sum_parts_size_in_bytes), ReadableSize(max_source_parts_size));
return false;
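
Both log sites above now wrap the logger in LogFrequencyLimiter(log, 5) so that repeated postpone messages are throttled. A small sketch of one way such a limiter could behave; this is a hypothetical class, and the semantics of the real LogFrequencyLimiter (including the meaning of its second argument) may differ:

#include <chrono>
#include <iostream>
#include <string>

/// A message is forwarded only if at least `period` has elapsed since the
/// previously forwarded one; otherwise it is dropped.
class FrequencyLimitedLogger
{
public:
    explicit FrequencyLimitedLogger(std::chrono::seconds period_) : period(period_) {}

    void log(const std::string & message)
    {
        const auto now = std::chrono::steady_clock::now();
        if (logged_once && now - last_logged < period)
            return; /// suppressed: too soon after the previous message
        logged_once = true;
        last_logged = now;
        std::cout << message << '\n';
    }

private:
    std::chrono::seconds period;
    bool logged_once = false;
    std::chrono::steady_clock::time_point last_logged;
};

int main()
{
    FrequencyLimitedLogger logger(std::chrono::seconds(5));
    logger.log("Not executing log entry because source parts size is too big"); /// printed
    logger.log("Not executing log entry because source parts size is too big"); /// suppressed
}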

View File

@ -254,6 +254,7 @@ struct SelectQueryInfo
MergeTreeDataSelectAnalysisResultPtr merge_tree_select_result_ptr;
bool is_parameterized_view = false;
NameToNameMap parameterized_view_values;
// A non-zero value means this is a trivial LIMIT query.
UInt64 limit = 0;

View File

@ -2951,7 +2951,8 @@ void StorageReplicatedMergeTree::cloneReplicaIfNeeded(zkutil::ZooKeeperPtr zooke
}
if (source_replica.empty())
throw Exception(ErrorCodes::ALL_REPLICAS_LOST, "All replicas are lost");
throw Exception(ErrorCodes::ALL_REPLICAS_LOST, "All replicas are lost. "
"See SYSTEM DROP REPLICA and SYSTEM RESTORE REPLICA queries, they may help");
if (is_new_replica)
LOG_INFO(log, "Will mimic {}", source_replica);

View File

@ -37,6 +37,7 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
namespace
{
@ -123,7 +124,7 @@ StorageView::StorageView(
NormalizeSelectWithUnionQueryVisitor{data}.visit(description.inner_query);
is_parameterized_view = query.isParameterizedView();
parameter_types = analyzeReceiveQueryParamsWithType(description.inner_query);
view_parameter_types = analyzeReceiveQueryParamsWithType(description.inner_query);
storage_metadata.setSelectQuery(description);
setInMemoryMetadata(storage_metadata);
}
@ -172,7 +173,7 @@ void StorageView::read(
query_plan.addStep(std::move(materializing));
/// And also convert to expected structure.
const auto & expected_header = storage_snapshot->getSampleBlockForColumns(column_names, parameter_values);
const auto & expected_header = storage_snapshot->getSampleBlockForColumns(column_names, query_info.parameterized_view_values);
const auto & header = query_plan.getCurrentDataStream().header;
const auto * select_with_union = current_inner_query->as<ASTSelectWithUnionQuery>();
@ -208,7 +209,7 @@ static ASTTableExpression * getFirstTableExpression(ASTSelectQuery & select_quer
return select_element->table_expression->as<ASTTableExpression>();
}
void StorageView::replaceQueryParametersIfParametrizedView(ASTPtr & outer_query)
void StorageView::replaceQueryParametersIfParametrizedView(ASTPtr & outer_query, const NameToNameMap & parameter_values)
{
ReplaceQueryParameterVisitor visitor(parameter_values);
visitor.visit(outer_query);
@ -266,7 +267,8 @@ String StorageView::replaceQueryParameterWithValue(const String & column_name, c
if ((pos = name.find(parameter.first)) != std::string::npos)
{
auto parameter_datatype_iterator = parameter_types.find(parameter.first);
if (parameter_datatype_iterator != parameter_types.end())
size_t parameter_end = pos + parameter.first.size();
if (parameter_datatype_iterator != parameter_types.end() && name.size() >= parameter_end && (name[parameter_end] == ',' || name[parameter_end] == ')'))
{
String parameter_name("_CAST(" + parameter.second + ", '" + parameter_datatype_iterator->second + "')");
name.replace(pos, parameter.first.size(), parameter_name);
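
The extra check above substitutes a parameter only when the matched name is followed by ',' or ')', which prevents a parameter from clobbering a longer name that merely shares its prefix (compare the new test view using {Pri:UInt64} later in this commit). A self-contained sketch of that delimiter check, with hypothetical names rather than the StorageView member function:

#include <cstddef>
#include <iostream>
#include <string>

/// Substitute a parameter inside a generated column name such as
/// "equals(Price, 10)" only when the match is followed by ',' or ')',
/// so a parameter named "Pri" does not corrupt a column that uses "Price".
static std::string replaceParam(std::string name, const std::string & param, const std::string & value)
{
    const size_t pos = name.find(param);
    if (pos == std::string::npos)
        return name;
    const size_t end = pos + param.size();
    if (end < name.size() && (name[end] == ',' || name[end] == ')'))
        name.replace(pos, param.size(), value);
    return name;
}

int main()
{
    /// "Pri" is only a prefix of "Price" here, so the name is left intact.
    std::cout << replaceParam("equals(Price, 10)", "Pri", "_CAST(10, 'UInt64')") << '\n';
    /// Exact parameter followed by ',': the placeholder is substituted.
    std::cout << replaceParam("equals(Pri, 10)", "Pri", "_CAST(10, 'UInt64')") << '\n';
}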

View File

@ -35,7 +35,7 @@ public:
size_t max_block_size,
size_t num_streams) override;
void replaceQueryParametersIfParametrizedView(ASTPtr & outer_query);
static void replaceQueryParametersIfParametrizedView(ASTPtr & outer_query, const NameToNameMap & parameter_values);
static void replaceWithSubquery(ASTSelectQuery & select_query, ASTPtr & view_name, const StorageMetadataPtr & metadata_snapshot, const bool parameterized_view)
{
@ -47,20 +47,14 @@ public:
static String replaceQueryParameterWithValue (const String & column_name, const NameToNameMap & parameter_values, const NameToNameMap & parameter_types);
static String replaceValueWithQueryParameter (const String & column_name, const NameToNameMap & parameter_values);
void setParameterValues (NameToNameMap parameter_values_)
const NameToNameMap & getParameterTypes() const
{
parameter_values = parameter_values_;
}
NameToNameMap getParameterValues() const
{
return parameter_types;
return view_parameter_types;
}
protected:
bool is_parameterized_view;
NameToNameMap parameter_values;
NameToNameMap parameter_types;
NameToNameMap view_parameter_types;
};
}

View File

@ -11,6 +11,8 @@
#include <TableFunctions/registerTableFunctions.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Analyzer/TableFunctionNode.h>
#include <Interpreters/InterpreterSetQuery.h>
#include <Interpreters/Context.h>
namespace DB
{
@ -123,7 +125,10 @@ static Block executeMonoBlock(QueryPipeline & pipeline)
StoragePtr TableFunctionExplain::executeImpl(
const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
{
BlockIO blockio = getInterpreter(context).execute();
/// To support settings specified inside the EXPLAIN subquery.
auto mutable_context = Context::createCopy(context);
InterpreterSetQuery::applySettingsFromQuery(query, mutable_context);
BlockIO blockio = getInterpreter(mutable_context).execute();
Block block = executeMonoBlock(blockio.pipeline);
StorageID storage_id(getDatabaseName(), table_name);
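
The new code clones the context and applies any settings found in the EXPLAIN subquery to that copy, leaving the caller's context untouched. A minimal sketch of the copy-then-override pattern with a plain settings map; the types are hypothetical and only mirror the spirit of Context::createCopy plus InterpreterSetQuery::applySettingsFromQuery, not the real Context API:

#include <iostream>
#include <map>
#include <string>

using Settings = std::map<std::string, std::string>;

/// Copy the inherited settings and apply per-query overrides to the copy,
/// so the outer settings remain unchanged.
Settings withQueryOverrides(const Settings & inherited, const Settings & overrides)
{
    Settings copy = inherited;
    for (const auto & [name, value] : overrides)
        copy[name] = value;
    return copy;
}

int main()
{
    const Settings session{{"max_threads", "8"}};
    const Settings from_query{{"allow_experimental_analyzer", "1"}};
    Settings effective = withQueryOverrides(session, from_query);
    std::cout << effective["max_threads"] << ' ' << effective["allow_experimental_analyzer"] << '\n';
}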

View File

@ -1946,6 +1946,8 @@ def reportCoverage(args):
def reportLogStats(args):
clickhouse_execute(args, "SYSTEM FLUSH LOGS")
query = """
WITH
120 AS mins,
@ -2014,7 +2016,13 @@ def reportLogStats(args):
'Cyclic aliases', 'Detaching {}', 'Executing {}', 'Fire events: {}', 'Found part {}', 'Loaded queue',
'No sharding key', 'No tables', 'Query: {}', 'Removed', 'Removed part {}', 'Removing parts.',
'Request URI: {}', 'Sending part {}', 'Sent handshake', 'Starting {}', 'Will mimic {}', 'Writing to {}',
'dropIfEmpty', 'loadAll {}', '{} ({}:{})', '{} -> {}', '{} {}', '{}: {}'
'dropIfEmpty', 'loadAll {}', '{} ({}:{})', '{} -> {}', '{} {}', '{}: {}', 'Query was cancelled',
'Table {} already exists', '{}%', 'Cancelled merging parts', 'All replicas are lost',
'Cancelled mutating parts', 'Read object: {}', 'New segment: {}', 'Unknown geometry type {}',
'Table {} is not replicated', '{} {}.{} already exists', 'Attempt to read after eof',
'Replica {} already exists', 'Convert overflow', 'key must be a tuple', 'Division by zero',
'No part {} in committed state', 'Files set to {}', 'Bytes set to {}', 'Sharding key {} is not used',
'Cannot parse datetime', 'Bad get: has {}, requested {}', 'There is no {} in {}', 'Numeric overflow'
) AS known_short_messages
SELECT count() AS c, message_format_string, substr(any(message), 1, 120)
FROM system.text_log

View File

@ -2063,7 +2063,7 @@ def materialized_database_support_all_kinds_of_mysql_datatype(
# increment synchronization check
check_query(
clickhouse_node,
"SELECT v1, v2, v3, v4, v5, v6, v7, v8, v9, v10, v11, v12, v13, v14, v15, v16, v17, v18, v19, v20, v21, v22, v23, v24, hex(v25), v26, v28, v29, v30, v32 FROM test_database_datatype.t1 FORMAT TSV",
"SELECT v1, v2, v3, v4, v5, v6, v7, v8, v9, v10, v11, v12, v13, v14, v15, v16, v17, v18, v19, v20, v21, v22, v23, v24, hex(v25), v26, v28, v29, v30, v32 FROM test_database_datatype.t1 ORDER BY v1 FORMAT TSV",
"1\t1\t11\t9223372036854775807\t-1\t1\t11\t18446744073709551615\t-1.1\t1.1\t-1.111\t1.111\t1.1111\t2021-10-06\ttext\tvarchar\tBLOB\t2021-10-06 18:32:57\t2021-10-06 18:32:57.482786\t2021-10-06 18:32:57\t2021-10-06 18:32:57.482786"
+ "\t2021\t3020399000000\t3020399000000\t00000000010100000000000000000000000000000000000000\t10\t1\t11\tvarbinary\tRED\n"
+ "2\t2\t22\t9223372036854775807\t-2\t2\t22\t18446744073709551615\t-2.2\t2.2\t-2.22\t2.222\t2.2222\t2021-10-07\ttext\tvarchar\tBLOB\t2021-10-07 18:32:57\t2021-10-07 18:32:57.482786\t2021-10-07 18:32:57\t2021-10-07 18:32:57.482786"

View File

@ -194,10 +194,12 @@ def test_drop_table(cluster):
)
node.query_with_retry(
"system sync replica test_drop_table",
settings={"receive_timeout": 10},
retry_count=5,
settings={"receive_timeout": 5},
sleep_time=5,
retry_count=10,
)
node2.query("drop table test_drop_table")
node2.query("drop table test_drop_table sync")
assert "1000\t499500\n" == node.query(
"select count(n), sum(n) from test_drop_table"
)
node.query("drop table test_drop_table sync")

View File

@ -58,6 +58,7 @@ select 'number of noisy messages', max2(count(), 10) from (select count() / (sel
select 'incorrect patterns', max2(countDistinct(message_format_string), 15) from (
select message_format_string, any(message) as any_message from logs
where message not like (replaceRegexpAll(message_format_string, '{[:.0-9dfx]*}', '%') as s)
and message not like (s || ' (skipped % similar messages)')
and message not like ('%Exception: '||s||'%') group by message_format_string
) where any_message not like '%Poco::Exception%';

View File

@ -1,5 +1,3 @@
SET allow_experimental_analyzer = 1;
DROP TABLE IF EXISTS t2;
CREATE TABLE t2 (k UInt64, s String) ENGINE = Join(ANY, LEFT, k);
@ -18,6 +16,6 @@ SELECT k, js1.s, t2.s FROM (SELECT toUInt64(number / 3) AS k, sum(number) as s F
SELECT k, js1.s, t2.s FROM (SELECT number AS k, number AS s FROM system.numbers LIMIT 10) js1 ANY LEFT JOIN t2 ON js1.k == t2.k ORDER BY k;
SELECT k, t2.k, js1.s, t2.s FROM (SELECT number AS k, number AS s FROM system.numbers LIMIT 10) js1 ANY LEFT JOIN t2 ON js1.k == t2.k ORDER BY k;
SELECT k, js1.s, t2.s FROM (SELECT number AS k, number AS s FROM system.numbers LIMIT 10) js1 ANY LEFT JOIN t2 ON js1.k == t2.k OR js1.s == t2.k ORDER BY k; -- { serverError 264 }
SELECT k, js1.s, t2.s FROM (SELECT number AS k, number AS s FROM system.numbers LIMIT 10) js1 ANY LEFT JOIN t2 ON js1.k == t2.k OR js1.s == t2.k ORDER BY k; -- { serverError 48, 264 }
DROP TABLE t2;

View File

@ -1,5 +1,3 @@
SET allow_experimental_analyzer = 1;
SELECT [2, 1, 3] AS arr, arraySort(arr), arrayReverseSort(arr), arraySort(x -> -x, arr);
SELECT materialize([2, 1, 3]) AS arr, arraySort(arr), arrayReverseSort(arr), arraySort(x -> -x, arr);
@ -53,4 +51,4 @@ SELECT arrayPartialSort(2, [1,2,3], [1]); -- { serverError ILLEGAL_TYPE_OF_ARGUM
SELECT arrayPartialSort(2, [1,2,3], 3); -- { serverError ILLEGAL_TYPE_OF_ARGUMENT }
SELECT arrayPartialSort(arraySort([1,2,3]), [1,2,3]); -- { serverError ILLEGAL_TYPE_OF_ARGUMENT }
SELECT arrayMap(x -> range(x), [4, 1, 2, 3]) AS arr, 100 AS lim, arrayResize(arrayPartialSort(arrayPartialSort(lim, arr), arr), lim), arrayResize(arrayPartialReverseSort(lim, arr), lim), arrayResize(arrayPartialSort(x -> (-length(x)), lim, arr), lim); -- { serverError ILLEGAL_TYPE_OF_ARGUMENT }
SELECT arrayPartialReverseSort(arraySort((x, y) -> y, [NULL, NULL], [NULL, NULL]), arr), arrayMap(x -> toString(x), [257, -9223372036854775807, 2, -2147483648, 2147483648, NULL, 65536, -2147483648, 2, 65535]) AS arr, NULL, 100 AS lim, 65536, arrayResize(arrayPartialSort(x -> reverse(x), lim, arr), lim) GROUP BY [NULL, 1023, -2, NULL, 255, '0', NULL, 9223372036854775806] WITH ROLLUP; -- { serverError NO_COMMON_TYPE }
SELECT arrayPartialReverseSort(arraySort((x, y) -> y, [NULL, NULL], [NULL, NULL]), arr), arrayMap(x -> toString(x), [257, -9223372036854775807, 2, -2147483648, 2147483648, NULL, 65536, -2147483648, 2, 65535]) AS arr, NULL, 100 AS lim, 65536, arrayResize(arrayPartialSort(x -> reverse(x), lim, arr), lim) GROUP BY [NULL, 1023, -2, NULL, 255, '0', NULL, 9223372036854775806] WITH ROLLUP; -- { serverError ILLEGAL_TYPE_OF_ARGUMENT, NO_COMMON_TYPE }

View File

@ -1,5 +1,3 @@
SET allow_experimental_analyzer = 1;
drop table IF EXISTS joinbug;
set allow_deprecated_syntax_for_merge_tree=1;
@ -38,7 +36,7 @@ SEMI LEFT JOIN joinbug_join using id2;
SELECT * FROM ( SELECT toUInt32(11) AS id2 ) AS js1 SEMI LEFT JOIN joinbug_join USING (id2);
-- can't convert right side in case of storage join
SELECT * FROM ( SELECT toInt64(11) AS id2 ) AS js1 SEMI LEFT JOIN joinbug_join USING (id2); -- { serverError 386 }
SELECT * FROM ( SELECT toInt64(11) AS id2 ) AS js1 SEMI LEFT JOIN joinbug_join USING (id2); -- { serverError 53, 386 }
DROP TABLE joinbug;
DROP TABLE joinbug_join;

View File

@ -1,5 +1,3 @@
SET allow_experimental_analyzer = 1;
SELECT number FROM numbers(10) LIMIT 0 + 1;
SELECT number FROM numbers(10) LIMIT 1 - 1;
SELECT number FROM numbers(10) LIMIT 2 - 1;
@ -11,7 +9,7 @@ SELECT number FROM numbers(10) LIMIT now(); -- { serverError 440 }
SELECT number FROM numbers(10) LIMIT today(); -- { serverError 440 }
SELECT number FROM numbers(10) LIMIT toUInt8('1');
SELECT number FROM numbers(10) LIMIT toFloat32('1');
SELECT number FROM numbers(10) LIMIT rand(); -- { serverError 440 }
SELECT number FROM numbers(10) LIMIT rand(); -- { serverError 36, 440 }
SELECT count() <= 1 FROM (SELECT number FROM numbers(10) LIMIT randConstant() % 2);

View File

@ -1,4 +1,3 @@
SET allow_experimental_analyzer = 1;
SET force_primary_key = 1;
DROP TABLE IF EXISTS samples;
@ -23,8 +22,8 @@ SELECT 'a' IN splitByChar('c', 'abcdef');
SELECT 'errors:';
-- non-constant expressions in the right side of IN
SELECT count() FROM samples WHERE 1 IN range(samples.value); -- { serverError 1 }
SELECT count() FROM samples WHERE 1 IN range(rand() % 1000); -- { serverError 1 }
SELECT count() FROM samples WHERE 1 IN range(samples.value); -- { serverError 1, 47 }
SELECT count() FROM samples WHERE 1 IN range(rand() % 1000); -- { serverError 1, 36 }
-- index is not used
SELECT count() FROM samples WHERE value IN range(3); -- { serverError 277 }
@ -32,4 +31,4 @@ SELECT count() FROM samples WHERE value IN range(3); -- { serverError 277 }
-- wrong type
SELECT 123 IN splitByChar('c', 'abcdef'); -- { serverError 53 }
DROP TABLE samples;
DROP TABLE samples;

View File

@ -1,3 +1 @@
SET allow_experimental_analyzer = 1;
SELECT 1 AS a, a + a AS b, b + b AS c, c + c AS d, d + d AS e, e + e AS f, f + f AS g, g + g AS h, h + h AS i, i + i AS j, j + j AS k, k + k AS l, l + l AS m, m + m AS n, n + n AS o, o + o AS p, p + p AS q, q + q AS r, r + r AS s, s + s AS t, t + t AS u, u + u AS v, v + v AS w, w + w AS x, x + x AS y, y + y AS z; -- { serverError 36 }
SELECT 1 AS a, a + a AS b, b + b AS c, c + c AS d, d + d AS e, e + e AS f, f + f AS g, g + g AS h, h + h AS i, i + i AS j, j + j AS k, k + k AS l, l + l AS m, m + m AS n, n + n AS o, o + o AS p, p + p AS q, q + q AS r, r + r AS s, s + s AS t, t + t AS u, u + u AS v, v + v AS w, w + w AS x, x + x AS y, y + y AS z; -- { serverError 36, 168 }

View File

@ -1,5 +1,3 @@
SET allow_experimental_analyzer = 1;
DROP TABLE IF EXISTS ints;
CREATE TABLE ints (i64 Int64, i32 Int32) ENGINE = Memory;
@ -12,6 +10,6 @@ SELECT '-';
SELECT * FROM ints l INNER JOIN ints r USING i64 ORDER BY l.i32, r.i32;
SELECT '-';
SELECT count() FROM ( SELECT [1], count(1) ) AS t1 ALL RIGHT JOIN ( SELECT number AS s FROM numbers(2) ) AS t2 USING (s); -- { serverError UNKNOWN_IDENTIFIER }
SELECT count() FROM ( SELECT [1], count(1) ) AS t1 ALL RIGHT JOIN ( SELECT number AS s FROM numbers(2) ) AS t2 USING (s); -- { serverError NOT_FOUND_COLUMN_IN_BLOCK, UNKNOWN_IDENTIFIER }
DROP TABLE ints;

View File

@ -85,7 +85,7 @@ SELECT a._shard_num, a.key, b.host_name, b.host_address IN ('::1', '127.0.0.1'),
FROM dist_1 a
JOIN system.clusters b
ON a._shard_num = b.shard_num
WHERE b.cluster = 'test_cluster_two_shards_localhost'; -- { serverError 403 }
WHERE b.cluster = 'test_cluster_two_shards_localhost'; -- { serverError 47, 403 }
SELECT 'dist_3';
dist_3
SELECT * FROM dist_3;

View File

@ -2,7 +2,6 @@
-- make the order static
SET max_threads = 1;
SET allow_experimental_analyzer = 1;
DROP TABLE IF EXISTS mem1;
DROP TABLE IF EXISTS mem2;
@ -80,7 +79,7 @@ SELECT a._shard_num, a.key, b.host_name, b.host_address IN ('::1', '127.0.0.1'),
FROM dist_1 a
JOIN system.clusters b
ON a._shard_num = b.shard_num
WHERE b.cluster = 'test_cluster_two_shards_localhost'; -- { serverError 403 }
WHERE b.cluster = 'test_cluster_two_shards_localhost'; -- { serverError 47, 403 }
SELECT 'dist_3';
SELECT * FROM dist_3;

View File

@ -1,5 +1,3 @@
SET allow_experimental_analyzer = 1;
DROP TABLE IF EXISTS part;
DROP TABLE IF EXISTS supplier;
DROP TABLE IF EXISTS partsupp;
@ -182,7 +180,7 @@ order by
n_name,
s_name,
p_partkey
limit 100; -- { serverError 1 }
limit 100; -- { serverError 1, 47 }
select 3;
select
@ -600,7 +598,7 @@ where
lineitem
where
l_partkey = p_partkey
); -- { serverError 1 }
); -- { serverError 1, 47 }
select 18;
select
@ -711,7 +709,7 @@ where
and s_nationkey = n_nationkey
and n_name = 'CANADA'
order by
s_name; -- { serverError 1 }
s_name; -- { serverError 1, 47 }
select 21, 'fail: exists, not exists'; -- TODO
-- select

View File

@ -1,13 +1,11 @@
-- Tags: zookeeper
SET allow_experimental_analyzer = 1;
create table rmt1 (d DateTime, n int) engine=ReplicatedMergeTree('/test/01165/{database}/rmt', '1') order by n partition by toYYYYMMDD(d);
create table rmt2 (d DateTime, n int) engine=ReplicatedMergeTree('/test/01165/{database}/rmt', '2') order by n partition by toYYYYMMDD(d);
system stop replicated sends rmt1;
insert into rmt1 values (now(), arrayJoin([1, 2])); -- { clientError 36 }
insert into rmt1(n) select * from system.numbers limit arrayJoin([1, 2]); -- { serverError 440 }
insert into rmt1(n) select * from system.numbers limit arrayJoin([1, 2]); -- { serverError 36, 440 }
insert into rmt1 values (now(), rand());
drop table rmt1;

View File

@ -1,10 +1,8 @@
SET allow_experimental_analyzer = 1;
SELECT 1 FROM (select 1 a) A JOIN (select 1 b) B ON (arrayJoin([1]) = B.b); -- { serverError 403 }
SELECT 1 FROM (select 1 a) A JOIN (select 1 b) B ON (A.a = arrayJoin([1])); -- { serverError 403 }
SELECT 1 FROM (select 1 a) A JOIN (select 1 b) B ON equals(a); -- { serverError 42 }
SELECT 1 FROM (select 1 a) A JOIN (select 1 b) B ON less(a); -- { serverError 42 }
SELECT 1 FROM (select 1 a) A JOIN (select 1 b) B ON equals(a); -- { serverError 42, 62 }
SELECT 1 FROM (select 1 a) A JOIN (select 1 b) B ON less(a); -- { serverError 42, 62 }
SELECT 1 FROM (select 1 a) A JOIN (select 1 b) B ON a = b AND a > b; -- { serverError 403 }
SELECT 1 FROM (select 1 a) A JOIN (select 1 b) B ON a = b AND a < b; -- { serverError 403 }

View File

@ -1,7 +1,5 @@
-- Tags: global
SET allow_experimental_analyzer = 1;
DROP TABLE IF EXISTS local_table;
DROP TABLE IF EXISTS dist_table;
@ -12,7 +10,7 @@ INSERT INTO local_table SELECT number AS id, toString(number) AS val FROM number
CREATE TABLE dist_table AS local_table
ENGINE = Distributed('test_cluster_two_shards_localhost', currentDatabase(), local_table);
SELECT uniq(d.val) FROM dist_table AS d GLOBAL LEFT JOIN numbers(100) AS t USING id; -- { serverError 47 }
SELECT uniq(d.val) FROM dist_table AS d GLOBAL LEFT JOIN numbers(100) AS t USING id; -- { serverError 47, 284 }
SELECT uniq(d.val) FROM dist_table AS d GLOBAL LEFT JOIN local_table AS t USING id;
DROP TABLE local_table;

View File

@ -1,3 +1 @@
SET allow_experimental_analyzer = 1;
SELECT arrayMap(x -> x * sum(x), range(10)); -- { serverError 10 }
SELECT arrayMap(x -> x * sum(x), range(10)); -- { serverError 10, 47 }

View File

@ -1,3 +1,5 @@
masked flush only
3,"default:*@127%2E0%2E0%2E1:9000,default:*@127%2E0%2E0%2E2:9000","AUTHENTICATION_FAILED",1
masked
3,"default:*@127%2E0%2E0%2E1:9000,default:*@127%2E0%2E0%2E2:9000","AUTHENTICATION_FAILED",1
no masking

View File

@ -9,6 +9,20 @@ drop table if exists dist_01555;
drop table if exists data_01555;
create table data_01555 (key Int) Engine=Null();
--
-- masked flush only
--
SELECT 'masked flush only';
create table dist_01555 (key Int) Engine=Distributed(test_cluster_with_incorrect_pw, currentDatabase(), data_01555, key);
system stop distributed sends dist_01555;
insert into dist_01555 values (1)(2);
-- since test_cluster_with_incorrect_pw contains an incorrect password, ignore the error
system flush distributed dist_01555; -- { serverError 516 }
select length(splitByChar('*', data_path)), replaceRegexpOne(data_path, '^.*/([^/]*)/' , '\\1'), extract(last_exception, 'AUTHENTICATION_FAILED'), dateDiff('s', last_exception_time, now()) < 5 from system.distribution_queue where database = currentDatabase() and table = 'dist_01555' format CSV;
drop table dist_01555;
--
-- masked
--
@ -29,7 +43,6 @@ SELECT 'no masking';
create table dist_01555 (key Int) Engine=Distributed(test_shard_localhost, currentDatabase(), data_01555, key);
insert into dist_01555 values (1)(2);
-- since test_cluster_with_incorrect_pw contains incorrect password ignore error
system flush distributed dist_01555;
select length(splitByChar('*', data_path)), replaceRegexpOne(data_path, '^.*/([^/]*)/' , '\\1') from system.distribution_queue where database = currentDatabase() and table = 'dist_01555' format CSV;

View File

@ -1,7 +1,5 @@
SET allow_experimental_analyzer = 1;
drop table if exists tab;
create table tab (x UInt64, `arr.a` Array(UInt64), `arr.b` Array(UInt64)) engine = MergeTree order by x;
select x from tab array join arr prewhere x != 0 where arr; -- { serverError 43 }
select x from tab array join arr prewhere arr where x != 0; -- { serverError 43 }
select x from tab array join arr prewhere x != 0 where arr; -- { serverError 43, 47 }
select x from tab array join arr prewhere arr where x != 0; -- { serverError 43, 47 }
drop table if exists tab;

View File

@ -1,5 +1,3 @@
SET allow_experimental_analyzer = 1;
DROP TABLE IF EXISTS t1;
DROP TABLE IF EXISTS t2;
DROP TABLE IF EXISTS t2_nullable;
@ -72,8 +70,8 @@ SELECT * FROM t1 INNER ALL JOIN t2 ON t1.id == t2.id AND t1.id; -- { serverError
SELECT * FROM t1 INNER ALL JOIN t2 ON t1.id == t2.id AND t2.id; -- { serverError 403 }
SELECT * FROM t1 INNER ALL JOIN t2 ON t1.id == t2.id AND t1.id + 2; -- { serverError 403 }
SELECT * FROM t1 INNER ALL JOIN t2 ON t1.id == t2.id AND t2.id + 2; -- { serverError 403 }
SELECT * FROM t1 INNER ALL JOIN t2 ON t1.id == t2.id AND t1.key; -- { serverError 43 }
SELECT * FROM t1 INNER ALL JOIN t2 ON t1.id == t2.id AND t2.key; -- { serverError 43 }
SELECT * FROM t1 INNER ALL JOIN t2 ON t1.id == t2.id AND t1.key; -- { serverError 43, 403 }
SELECT * FROM t1 INNER ALL JOIN t2 ON t1.id == t2.id AND t2.key; -- { serverError 43, 403 }
SELECT * FROM t1 JOIN t2 ON t2.key == t2.key2 AND (t1.id == t2.id OR isNull(t2.key2)); -- { serverError 403 }
SELECT * FROM t1 JOIN t2 ON t2.key == t2.key2 OR t1.id == t2.id; -- { serverError 403 }
SELECT * FROM t1 JOIN t2 ON (t2.key == t2.key2 AND (t1.key == t1.key2 AND t1.key != 'XXX' OR t1.id == t2.id)) AND t1.id == t2.id; -- { serverError 403 }

View File

@ -1,5 +1,3 @@
SET allow_experimental_analyzer = 1;
DROP TABLE IF EXISTS t1;
DROP TABLE IF EXISTS t2;
DROP TABLE IF EXISTS t2_nullable;
@ -70,8 +68,8 @@ SELECT * FROM t1 INNER ALL JOIN t2 ON t1.id == t2.id AND t1.id; -- { serverError
SELECT * FROM t1 INNER ALL JOIN t2 ON t1.id == t2.id AND t2.id; -- { serverError 403 }
SELECT * FROM t1 INNER ALL JOIN t2 ON t1.id == t2.id AND t1.id + 2; -- { serverError 403 }
SELECT * FROM t1 INNER ALL JOIN t2 ON t1.id == t2.id AND t2.id + 2; -- { serverError 403 }
SELECT * FROM t1 INNER ALL JOIN t2 ON t1.id == t2.id AND t1.key; -- { serverError 43 }
SELECT * FROM t1 INNER ALL JOIN t2 ON t1.id == t2.id AND t2.key; -- { serverError 43 }
SELECT * FROM t1 INNER ALL JOIN t2 ON t1.id == t2.id AND t1.key; -- { serverError 43, 403 }
SELECT * FROM t1 INNER ALL JOIN t2 ON t1.id == t2.id AND t2.key; -- { serverError 43, 403 }
SELECT * FROM t1 JOIN t2 ON t2.key == t2.key2 AND (t1.id == t2.id OR isNull(t2.key2)); -- { serverError 403 }
SELECT * FROM t1 JOIN t2 ON t2.key == t2.key2 OR t1.id == t2.id; -- { serverError 403 }
SELECT * FROM t1 JOIN t2 ON (t2.key == t2.key2 AND (t1.key == t1.key2 AND t1.key != 'XXX' OR t1.id == t2.id)) AND t1.id == t2.id; -- { serverError 403 }

View File

@ -1,12 +1,10 @@
SET allow_experimental_analyzer = 1;
SELECT groupArray(2 + 3)(number) FROM numbers(10);
SELECT groupArray('5'::UInt8)(number) FROM numbers(10);
SELECT groupArray(NULL)(number) FROM numbers(10); -- { serverError 36 }
SELECT groupArray(NULL + NULL)(number) FROM numbers(10); -- { serverError 36 }
SELECT groupArray([])(number) FROM numbers(10); -- { serverError 36 }
SELECT groupArray(throwIf(1))(number) FROM numbers(10); -- { serverError 36 }
SELECT groupArray(throwIf(1))(number) FROM numbers(10); -- { serverError 36, 134 }
-- Not the best error message, can be improved.
SELECT groupArray(number)(number) FROM numbers(10); -- { serverError 36 }
SELECT groupArray(number)(number) FROM numbers(10); -- { serverError 36, 47 }

View File

@ -1,9 +1,7 @@
-- Tags: global
SET allow_experimental_analyzer = 1;
SELECT
cityHash64(number GLOBAL IN (NULL, -2147483648, -9223372036854775808), nan, 1024, NULL, NULL, 1.000100016593933, NULL),
(NULL, cityHash64(inf, -2147483648, NULL, NULL, 10.000100135803223), cityHash64(1.1754943508222875e-38, NULL, NULL, NULL), 2147483647)
FROM cluster(test_cluster_two_shards_localhost, numbers((NULL, cityHash64(0., 65536, NULL, NULL, 10000000000., NULL), 0) GLOBAL IN (some_identifier), 65536))
WHERE number GLOBAL IN [1025] --{serverError 36}
WHERE number GLOBAL IN [1025] --{serverError 36, 284}

View File

@ -1,5 +1,3 @@
SET allow_experimental_analyzer = 1;
DROP TABLE IF EXISTS t_tuple_element;
CREATE TABLE t_tuple_element(t1 Tuple(a UInt32, s String), t2 Tuple(UInt32, String)) ENGINE = Memory;
@ -18,7 +16,7 @@ EXPLAIN SYNTAX SELECT tupleElement(t1, 'a') FROM t_tuple_element;
SELECT tupleElement(number, 1) FROM numbers(1); -- { serverError 43 }
SELECT tupleElement(t1) FROM t_tuple_element; -- { serverError 42 }
SELECT tupleElement(t1, 'b') FROM t_tuple_element; -- { serverError 10 }
SELECT tupleElement(t1, 'b') FROM t_tuple_element; -- { serverError 10, 47 }
SELECT tupleElement(t1, 0) FROM t_tuple_element; -- { serverError 127 }
SELECT tupleElement(t1, 3) FROM t_tuple_element; -- { serverError 127 }
SELECT tupleElement(t1, materialize('a')) FROM t_tuple_element; -- { serverError 43 }
@ -30,7 +28,7 @@ SELECT tupleElement(t2, 1) FROM t_tuple_element;
EXPLAIN SYNTAX SELECT tupleElement(t2, 1) FROM t_tuple_element;
SELECT tupleElement(t2) FROM t_tuple_element; -- { serverError 42 }
SELECT tupleElement(t2, 'a') FROM t_tuple_element; -- { serverError 10 }
SELECT tupleElement(t2, 'a') FROM t_tuple_element; -- { serverError 10, 47 }
SELECT tupleElement(t2, 0) FROM t_tuple_element; -- { serverError 127 }
SELECT tupleElement(t2, 3) FROM t_tuple_element; -- { serverError 127 }
SELECT tupleElement(t2, materialize(1)) FROM t_tuple_element; -- { serverError 43 }

View File

@ -15,4 +15,4 @@ SELECT _shard_num AS shard_num, sum(1) as rows FROM remote('127.{1,2}', system,
SELECT a._shard_num AS shard_num, sum(1) as rows FROM remote('127.{1,2}', system, one) a GROUP BY shard_num;
2 1
1 1
SELECT _shard_num FROM remote('127.1', system.one) AS a INNER JOIN (SELECT _shard_num FROM system.one) AS b USING (dummy); -- { serverError UNSUPPORTED_METHOD }
SELECT _shard_num FROM remote('127.1', system.one) AS a INNER JOIN (SELECT _shard_num FROM system.one) AS b USING (dummy); -- { serverError UNSUPPORTED_METHOD, UNKNOWN_IDENTIFIER }

View File

@ -1,5 +1,3 @@
SET allow_experimental_analyzer = 1;
-- { echoOn }
SELECT shardNum() AS shard_num, sum(1) as rows FROM remote('127.{1,2}', system, one) GROUP BY _shard_num;
@ -7,6 +5,6 @@ SELECT shardNum() AS shard_num, sum(1) as rows FROM remote('127.{1,2}', system,
SELECT _shard_num AS shard_num, sum(1) as rows FROM remote('127.{1,2}', system, one) GROUP BY _shard_num;
SELECT _shard_num AS shard_num, sum(1) as rows FROM remote('127.{1,2}', system, one) GROUP BY shard_num;
SELECT a._shard_num AS shard_num, sum(1) as rows FROM remote('127.{1,2}', system, one) a GROUP BY shard_num;
SELECT _shard_num FROM remote('127.1', system.one) AS a INNER JOIN (SELECT _shard_num FROM system.one) AS b USING (dummy); -- { serverError UNSUPPORTED_METHOD }
SELECT _shard_num FROM remote('127.1', system.one) AS a INNER JOIN (SELECT _shard_num FROM system.one) AS b USING (dummy); -- { serverError UNSUPPORTED_METHOD, UNKNOWN_IDENTIFIER }
-- { echoOff }

View File

@ -78,17 +78,17 @@ SELECT
materialize(\'Привет, World\') AS s1,
materialize(\'Привет, World\') AS s2
WHERE (s1 LIKE \'hell%\') OR (s2 ILIKE \'%привет%\') OR (s1 ILIKE \'world%\')
SETTINGS optimize_or_like_chain = 1
SETTINGS optimize_or_like_chain = 1, allow_hyperscan = 0
SELECT
materialize(\'Привет, World\') AS s1,
materialize(\'Привет, World\') AS s2
WHERE (s1 LIKE \'hell%\') OR (s2 ILIKE \'%привет%\') OR (s1 ILIKE \'world%\')
SETTINGS optimize_or_like_chain = 1
SETTINGS optimize_or_like_chain = 1, max_hyperscan_regexp_length = 10
SELECT
materialize(\'Привет, World\') AS s1,
materialize(\'Привет, World\') AS s2
WHERE (s1 LIKE \'hell%\') OR (s2 ILIKE \'%привет%\') OR (s1 ILIKE \'world%\')
SETTINGS optimize_or_like_chain = 1
SETTINGS optimize_or_like_chain = 1, max_hyperscan_regexp_total_length = 10
SELECT
materialize(\'Привет, World\') AS s1,
materialize(\'Привет, World\') AS s2

View File

@ -4,9 +4,9 @@ EXPLAIN SYNTAX SELECT materialize('Привет, World') AS s WHERE (s LIKE 'hel
EXPLAIN QUERY TREE run_passes=1 SELECT materialize('Привет, World') AS s WHERE (s LIKE 'hell%') OR (s ILIKE '%привет%') OR (s ILIKE 'world%') SETTINGS optimize_or_like_chain = 1, allow_experimental_analyzer = 1;
EXPLAIN SYNTAX SELECT materialize('Привет, World') AS s1, materialize('Привет, World') AS s2 WHERE (s1 LIKE 'hell%') OR (s2 ILIKE '%привет%') OR (s1 ILIKE 'world%') SETTINGS optimize_or_like_chain = 1;
EXPLAIN SYNTAX SELECT materialize('Привет, World') AS s1, materialize('Привет, World') AS s2 WHERE (s1 LIKE 'hell%') OR (s2 ILIKE '%привет%') OR (s1 ILIKE 'world%') SETTINGS optimize_or_like_chain = 1 SETTINGS allow_hyperscan = 0;
EXPLAIN SYNTAX SELECT materialize('Привет, World') AS s1, materialize('Привет, World') AS s2 WHERE (s1 LIKE 'hell%') OR (s2 ILIKE '%привет%') OR (s1 ILIKE 'world%') SETTINGS optimize_or_like_chain = 1 SETTINGS max_hyperscan_regexp_length = 10;
EXPLAIN SYNTAX SELECT materialize('Привет, World') AS s1, materialize('Привет, World') AS s2 WHERE (s1 LIKE 'hell%') OR (s2 ILIKE '%привет%') OR (s1 ILIKE 'world%') SETTINGS optimize_or_like_chain = 1 SETTINGS max_hyperscan_regexp_total_length = 10;
EXPLAIN SYNTAX SELECT materialize('Привет, World') AS s1, materialize('Привет, World') AS s2 WHERE (s1 LIKE 'hell%') OR (s2 ILIKE '%привет%') OR (s1 ILIKE 'world%') SETTINGS optimize_or_like_chain = 1, allow_hyperscan = 0;
EXPLAIN SYNTAX SELECT materialize('Привет, World') AS s1, materialize('Привет, World') AS s2 WHERE (s1 LIKE 'hell%') OR (s2 ILIKE '%привет%') OR (s1 ILIKE 'world%') SETTINGS optimize_or_like_chain = 1, max_hyperscan_regexp_length = 10;
EXPLAIN SYNTAX SELECT materialize('Привет, World') AS s1, materialize('Привет, World') AS s2 WHERE (s1 LIKE 'hell%') OR (s2 ILIKE '%привет%') OR (s1 ILIKE 'world%') SETTINGS optimize_or_like_chain = 1, max_hyperscan_regexp_total_length = 10;
EXPLAIN SYNTAX SELECT materialize('Привет, World') AS s1, materialize('Привет, World') AS s2 WHERE (s1 LIKE 'hell%') OR (s2 ILIKE '%привет%') OR (s1 ILIKE 'world%') OR s1 == 'Привет' SETTINGS optimize_or_like_chain = 1;

View File

@ -1,5 +1,3 @@
SET allow_experimental_analyzer = 1;
# Test WITH FILL without INTERPOLATE
SELECT n, source, inter FROM (
SELECT toFloat32(number % 10) AS n, 'original' AS source, number as inter FROM numbers(10) WHERE number % 3 = 1
@ -38,7 +36,7 @@ SELECT n, source, inter FROM (
# Test INTERPOLATE with inconsistent column - should produce error
SELECT n, source, inter FROM (
SELECT toFloat32(number % 10) AS n, 'original' AS source, number as inter FROM numbers(10) WHERE number % 3 = 1
) ORDER BY n WITH FILL FROM 0 TO 11.51 STEP 0.5 INTERPOLATE (inter AS source); -- { serverError 32 }
) ORDER BY n WITH FILL FROM 0 TO 11.51 STEP 0.5 INTERPOLATE (inter AS source); -- { serverError 6, 32 }
# Test INTERPOLATE with aliased column
SELECT n, source, inter + 1 AS inter_p FROM (

View File

@ -35,3 +35,4 @@ ERROR
10
20
10
10

View File

@ -15,6 +15,7 @@ $CLICKHOUSE_CLIENT -q "DROP VIEW IF EXISTS test_02428_pv6"
$CLICKHOUSE_CLIENT -q "DROP VIEW IF EXISTS test_02428_pv7"
$CLICKHOUSE_CLIENT -q "DROP VIEW IF EXISTS test_02428_pv8"
$CLICKHOUSE_CLIENT -q "DROP VIEW IF EXISTS test_02428_pv9"
$CLICKHOUSE_CLIENT -q "DROP VIEW IF EXISTS test_02428_pv10"
$CLICKHOUSE_CLIENT -q "DROP VIEW IF EXISTS test_02428_v1"
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS test_02428_Catalog"
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS ${CLICKHOUSE_TEST_UNIQUE_NAME}.pv1"
@ -83,6 +84,9 @@ $CLICKHOUSE_CLIENT -q "SELECT * FROM test_02428_pv8(prices=[10,20])"
$CLICKHOUSE_CLIENT -q "CREATE VIEW test_02428_pv9 AS SELECT Price FROM test_02428_Catalog WHERE Price IN (10,20) AND Quantity={quantity:UInt64} ORDER BY Price"
$CLICKHOUSE_CLIENT -q "SELECT * FROM test_02428_pv9(quantity=3)"
$CLICKHOUSE_CLIENT -q "CREATE VIEW test_02428_pv10 AS SELECT Price FROM test_02428_Catalog WHERE Price={Pri:UInt64} ORDER BY Price"
$CLICKHOUSE_CLIENT -q "SELECT * FROM test_02428_pv10(Pri=10)"
$CLICKHOUSE_CLIENT -q "DROP VIEW test_02428_pv1"
$CLICKHOUSE_CLIENT -q "DROP VIEW test_02428_pv2"
$CLICKHOUSE_CLIENT -q "DROP VIEW test_02428_pv3"
@ -91,6 +95,7 @@ $CLICKHOUSE_CLIENT -q "DROP VIEW test_02428_pv6"
$CLICKHOUSE_CLIENT -q "DROP VIEW test_02428_pv7"
$CLICKHOUSE_CLIENT -q "DROP VIEW test_02428_pv8"
$CLICKHOUSE_CLIENT -q "DROP VIEW test_02428_pv9"
$CLICKHOUSE_CLIENT -q "DROP VIEW test_02428_pv10"
$CLICKHOUSE_CLIENT -q "DROP VIEW test_02428_v1"
$CLICKHOUSE_CLIENT -q "DROP TABLE test_02428_Catalog"
$CLICKHOUSE_CLIENT -q "DROP TABLE ${CLICKHOUSE_TEST_UNIQUE_NAME}.pv1"

View File

@ -8,5 +8,3 @@ With merge replicated any part range
1
With merge replicated partition only
1
With merge partition only and new parts
3

View File

@ -0,0 +1,91 @@
#!/usr/bin/env bash
# Tags: long
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
# Wait for number of parts in table $1 to become $2.
# Print the changed value. If there are no changes for $3 seconds, print the initial value.
wait_for_number_of_parts() {
for _ in `seq $3`
do
sleep 1
res=`$CLICKHOUSE_CLIENT -q "SELECT count(*) FROM system.parts WHERE database = currentDatabase() AND table='$1' AND active"`
if [ "$res" -eq "$2" ]
then
echo "$res"
return
fi
done
echo "$res"
}
$CLICKHOUSE_CLIENT -nmq "
DROP TABLE IF EXISTS test_without_merge;
DROP TABLE IF EXISTS test_with_merge;
DROP TABLE IF EXISTS test_replicated;
SELECT 'Without merge';
CREATE TABLE test_without_merge (i Int64) ENGINE = MergeTree ORDER BY i SETTINGS merge_selecting_sleep_ms=1000;
INSERT INTO test_without_merge SELECT 1;
INSERT INTO test_without_merge SELECT 2;
INSERT INTO test_without_merge SELECT 3;"
wait_for_number_of_parts 'test_without_merge' 1 10
$CLICKHOUSE_CLIENT -nmq "
DROP TABLE test_without_merge;
SELECT 'With merge any part range';
CREATE TABLE test_with_merge (i Int64) ENGINE = MergeTree ORDER BY i
SETTINGS min_age_to_force_merge_seconds=1, merge_selecting_sleep_ms=1000, min_age_to_force_merge_on_partition_only=false;
INSERT INTO test_with_merge SELECT 1;
INSERT INTO test_with_merge SELECT 2;
INSERT INTO test_with_merge SELECT 3;"
wait_for_number_of_parts 'test_with_merge' 1 100
$CLICKHOUSE_CLIENT -nmq "
DROP TABLE test_with_merge;
SELECT 'With merge partition only';
CREATE TABLE test_with_merge (i Int64) ENGINE = MergeTree ORDER BY i
SETTINGS min_age_to_force_merge_seconds=1, merge_selecting_sleep_ms=1000, min_age_to_force_merge_on_partition_only=true;
INSERT INTO test_with_merge SELECT 1;
INSERT INTO test_with_merge SELECT 2;
INSERT INTO test_with_merge SELECT 3;"
wait_for_number_of_parts 'test_with_merge' 1 100
$CLICKHOUSE_CLIENT -nmq "
DROP TABLE test_with_merge;
SELECT 'With merge replicated any part range';
CREATE TABLE test_replicated (i Int64) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/test02473', 'node') ORDER BY i
SETTINGS min_age_to_force_merge_seconds=1, merge_selecting_sleep_ms=1000, min_age_to_force_merge_on_partition_only=false;
INSERT INTO test_replicated SELECT 1;
INSERT INTO test_replicated SELECT 2;
INSERT INTO test_replicated SELECT 3;"
wait_for_number_of_parts 'test_replicated' 1 100
$CLICKHOUSE_CLIENT -nmq "
DROP TABLE test_replicated;
SELECT 'With merge replicated partition only';
CREATE TABLE test_replicated (i Int64) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/test02473_partition_only', 'node') ORDER BY i
SETTINGS min_age_to_force_merge_seconds=1, merge_selecting_sleep_ms=1000, min_age_to_force_merge_on_partition_only=true;
INSERT INTO test_replicated SELECT 1;
INSERT INTO test_replicated SELECT 2;
INSERT INTO test_replicated SELECT 3;"
wait_for_number_of_parts 'test_replicated' 1 100
$CLICKHOUSE_CLIENT -nmq "
DROP TABLE test_replicated;"

View File

@ -1,87 +0,0 @@
-- Tags: long
DROP TABLE IF EXISTS test_without_merge;
DROP TABLE IF EXISTS test_with_merge;
DROP TABLE IF EXISTS test_replicated;
SELECT 'Without merge';
CREATE TABLE test_without_merge (i Int64) ENGINE = MergeTree ORDER BY i;
INSERT INTO test_without_merge SELECT 1;
INSERT INTO test_without_merge SELECT 2;
INSERT INTO test_without_merge SELECT 3;
SELECT sleepEachRow(1) FROM numbers(9) FORMAT Null;
SELECT count(*) FROM system.parts WHERE database = currentDatabase() AND table='test_without_merge' AND active;
DROP TABLE test_without_merge;
SELECT 'With merge any part range';
CREATE TABLE test_with_merge (i Int64) ENGINE = MergeTree ORDER BY i
SETTINGS min_age_to_force_merge_seconds=3, min_age_to_force_merge_on_partition_only=false;
INSERT INTO test_with_merge SELECT 1;
INSERT INTO test_with_merge SELECT 2;
INSERT INTO test_with_merge SELECT 3;
SELECT sleepEachRow(1) FROM numbers(9) FORMAT Null;
SELECT count(*) FROM system.parts WHERE database = currentDatabase() AND table='test_with_merge' AND active;
DROP TABLE test_with_merge;
SELECT 'With merge partition only';
CREATE TABLE test_with_merge (i Int64) ENGINE = MergeTree ORDER BY i
SETTINGS min_age_to_force_merge_seconds=3, min_age_to_force_merge_on_partition_only=true;
INSERT INTO test_with_merge SELECT 1;
INSERT INTO test_with_merge SELECT 2;
INSERT INTO test_with_merge SELECT 3;
SELECT sleepEachRow(1) FROM numbers(9) FORMAT Null;
SELECT count(*) FROM system.parts WHERE database = currentDatabase() AND table='test_with_merge' AND active;
DROP TABLE test_with_merge;
SELECT 'With merge replicated any part range';
CREATE TABLE test_replicated (i Int64) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/test02473', 'node') ORDER BY i
SETTINGS min_age_to_force_merge_seconds=3, min_age_to_force_merge_on_partition_only=false;
INSERT INTO test_replicated SELECT 1;
INSERT INTO test_replicated SELECT 2;
INSERT INTO test_replicated SELECT 3;
SELECT sleepEachRow(1) FROM numbers(9) FORMAT Null;
SELECT count(*) FROM system.parts WHERE database = currentDatabase() AND table='test_replicated' AND active;
DROP TABLE test_replicated;
SELECT 'With merge replicated partition only';
CREATE TABLE test_replicated (i Int64) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/test02473_partition_only', 'node') ORDER BY i
SETTINGS min_age_to_force_merge_seconds=3, min_age_to_force_merge_on_partition_only=true;
INSERT INTO test_replicated SELECT 1;
INSERT INTO test_replicated SELECT 2;
INSERT INTO test_replicated SELECT 3;
SELECT sleepEachRow(1) FROM numbers(9) FORMAT Null;
SELECT count(*) FROM system.parts WHERE database = currentDatabase() AND table='test_replicated' AND active;
DROP TABLE test_replicated;
SELECT 'With merge partition only and new parts';
CREATE TABLE test_with_merge (i Int64) ENGINE = MergeTree ORDER BY i
SETTINGS min_age_to_force_merge_seconds=3, min_age_to_force_merge_on_partition_only=true;
SYSTEM STOP MERGES test_with_merge;
-- These two parts will have min_age=6 at the time of merge
INSERT INTO test_with_merge SELECT 1;
INSERT INTO test_with_merge SELECT 2;
SELECT sleepEachRow(1) FROM numbers(9) FORMAT Null;
-- This part will have min_age=0 at the time of merge
-- and so, nothing will be merged.
INSERT INTO test_with_merge SELECT 3;
SYSTEM START MERGES test_with_merge;
SELECT count(*) FROM system.parts WHERE database = currentDatabase() AND table='test_with_merge' AND active;
DROP TABLE test_with_merge;

View File

@ -0,0 +1 @@
22 0 1

View File

@ -0,0 +1,15 @@
DROP TABLE IF EXISTS 02680_datetime64_monotonic_check;
CREATE TABLE 02680_datetime64_monotonic_check (`t` DateTime64(3), `x` Nullable(Decimal(18, 14)))
ENGINE = MergeTree
PARTITION BY toYYYYMMDD(t)
ORDER BY x SETTINGS allow_nullable_key = 1;
INSERT INTO 02680_datetime64_monotonic_check VALUES (toDateTime64('2023-03-13 00:00:00', 3, 'Asia/Jerusalem'), 123);
SELECT toHour(toTimeZone(t, 'UTC')) AS toHour_UTC, toHour(toTimeZone(t, 'Asia/Jerusalem')) AS toHour_Israel, count()
FROM 02680_datetime64_monotonic_check
WHERE toHour_Israel = 0
GROUP BY toHour_UTC, toHour_Israel;
DROP TABLE 02680_datetime64_monotonic_check;

View File

@ -0,0 +1,6 @@
drop table if exists test_null_as_default__fuzz_46;
SET allow_suspicious_low_cardinality_types = 1;
CREATE TABLE test_null_as_default__fuzz_46 (a Nullable(DateTime64(3)), b LowCardinality(Float32) DEFAULT a + 1000) ENGINE = Memory;
INSERT INTO test_null_as_default__fuzz_46 SELECT 1, NULL UNION ALL SELECT 2, NULL;
drop table test_null_as_default__fuzz_46;

View File

@ -0,0 +1,2 @@
create table test_local (id UInt32, path LowCardinality(String)) engine = MergeTree order by id;
WITH ((position(path, '/a') > 0) AND (NOT (position(path, 'a') > 0))) OR (path = '/b') OR (path = '/b/') as alias1 SELECT max(alias1) FROM remote('127.0.0.{1,2}', currentDatabase(), test_local) WHERE (id = 299386662);