Merge branch 'master' into background-schedule-pool-refactoring

mergify[bot] 2022-06-15 13:43:31 +00:00 committed by GitHub
commit c2afc2f6c6
58 changed files with 1728 additions and 463 deletions

View File

@ -6,7 +6,7 @@ sidebar_position: 33
Calculates the amount `Σ((x - x̅)^2) / (n - 1)`, where `n` is the sample size and `x̅` is the average value of `x`.
It represents an unbiased estimate of the variance of a random variable if passed values form its sample.
Returns `Float64`. When `n <= 1`, returns `+∞`.
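
As a quick check of the formula: for the sample `{1, 2, 3}`, `x̅ = 2`, `Σ((x - x̅)^2) = 1 + 0 + 1 = 2`, and the estimate is `2 / (3 - 1) = 1`.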

View File

@ -22,9 +22,9 @@ ClickHouse allows automatically deleting data
ClickHouse does not delete data in real time, the way [OLTP](https://en.wikipedia.org/wiki/Online_transaction_processing) databases do. The closest thing to such deletion is mutations, which are executed with `ALTER ... DELETE` or `ALTER ... UPDATE` queries. Unlike regular `DELETE` and `UPDATE` queries, mutations run asynchronously, in batches, not in real time. Otherwise, after the words `ALTER TABLE`, the syntax of regular queries and mutations is the same.
`ALTER DELETE` can be used to flexibly remove outdated data. If you need to do this regularly, the main drawback of this approach is that it requires an external system to trigger the query. There can also be performance issues, because mutations rewrite whole data parts even if they contain only a single row that has to be deleted.
This is the most common approach to making your ClickHouse-based system compliant with the [GDPR](https://gdpr-info.eu) principles.
See the [Mutations](../../sql-reference/statements/alter/index.md#alter-mutations) section for details.

View File

@ -950,19 +950,18 @@ void ClientBase::onProfileEvents(Block & block)
progress_indication.addThreadIdToList(host_name, thread_id); progress_indication.addThreadIdToList(host_name, thread_id);
auto event_name = names.getDataAt(i); auto event_name = names.getDataAt(i);
auto value = array_values[i]; auto value = array_values[i];
/// Ignore negative time delta or memory usage just in case.
if (value < 0)
continue;
if (event_name == user_time_name) if (event_name == user_time_name)
{
thread_times[host_name][thread_id].user_ms = value; thread_times[host_name][thread_id].user_ms = value;
}
else if (event_name == system_time_name) else if (event_name == system_time_name)
{
thread_times[host_name][thread_id].system_ms = value; thread_times[host_name][thread_id].system_ms = value;
}
else if (event_name == MemoryTracker::USAGE_EVENT_NAME) else if (event_name == MemoryTracker::USAGE_EVENT_NAME)
{
thread_times[host_name][thread_id].memory_usage = value; thread_times[host_name][thread_id].memory_usage = value;
} }
}
auto elapsed_time = profile_events.watch.elapsedMicroseconds(); auto elapsed_time = profile_events.watch.elapsedMicroseconds();
progress_indication.updateThreadEventData(thread_times, elapsed_time); progress_indication.updateThreadEventData(thread_times, elapsed_time);

View File

@ -318,7 +318,59 @@ template <typename T>
void ColumnVector<T>::updatePermutation(IColumn::PermutationSortDirection direction, IColumn::PermutationSortStability stability, void ColumnVector<T>::updatePermutation(IColumn::PermutationSortDirection direction, IColumn::PermutationSortStability stability,
size_t limit, int nan_direction_hint, IColumn::Permutation & res, EqualRanges & equal_ranges) const size_t limit, int nan_direction_hint, IColumn::Permutation & res, EqualRanges & equal_ranges) const
{ {
auto sort = [](auto begin, auto end, auto pred) { ::sort(begin, end, pred); }; bool reverse = direction == IColumn::PermutationSortDirection::Descending;
bool ascending = direction == IColumn::PermutationSortDirection::Ascending;
bool sort_is_stable = stability == IColumn::PermutationSortStability::Stable;
auto sort = [&](auto begin, auto end, auto pred)
{
/// A case for radix sort
if constexpr (is_arithmetic_v<T> && !is_big_int_v<T>)
{
/// TODO: LSD RadixSort is currently not stable if direction is descending, or value is floating point
bool use_radix_sort = (sort_is_stable && ascending && !std::is_floating_point_v<T>) || !sort_is_stable;
size_t size = end - begin;
/// Thresholds on size. Lower threshold is arbitrary. Upper threshold is chosen by the type for histogram counters.
if (size >= 256 && size <= std::numeric_limits<UInt32>::max() && use_radix_sort)
{
PaddedPODArray<ValueWithIndex<T>> pairs(size);
size_t index = 0;
for (auto it = begin; it != end; ++it)
{
pairs[index] = {data[*it], static_cast<UInt32>(*it)};
++index;
}
RadixSort<RadixSortTraits<T>>::executeLSD(pairs.data(), size, reverse, begin);
/// Radix sort treats all NaNs to be greater than all numbers.
/// If the user needs the opposite, we must move them accordingly.
if (std::is_floating_point_v<T> && nan_direction_hint < 0)
{
size_t nans_to_move = 0;
for (size_t i = 0; i < size; ++i)
{
if (isNaN(data[begin[reverse ? i : size - 1 - i]]))
++nans_to_move;
else
break;
}
if (nans_to_move)
{
std::rotate(begin, begin + (reverse ? nans_to_move : size - nans_to_move), end);
}
}
return;
}
}
::sort(begin, end, pred);
};
auto partial_sort = [](auto begin, auto mid, auto end, auto pred) { ::partial_sort(begin, mid, end, pred); }; auto partial_sort = [](auto begin, auto mid, auto end, auto pred) { ::partial_sort(begin, mid, end, pred); };
if (direction == IColumn::PermutationSortDirection::Ascending && stability == IColumn::PermutationSortStability::Unstable) if (direction == IColumn::PermutationSortDirection::Ascending && stability == IColumn::PermutationSortStability::Unstable)
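
For reference, a minimal standalone sketch (not ClickHouse code; plain `std::sort` stands in for the radix path) of the NaN handling above: the sort leaves NaNs at the end, and the same `std::rotate` trick moves them to the front when the caller wants NaNs to compare as smallest (`nan_direction_hint < 0`):

#include <algorithm>
#include <cmath>
#include <cstdio>
#include <vector>

int main()
{
    std::vector<double> values{1.0, NAN, 3.0, 2.0, NAN};

    /// Sort ascending, treating every NaN as greater than any number (like the radix path above).
    std::sort(values.begin(), values.end(), [](double a, double b)
    {
        if (std::isnan(a))
            return false;
        if (std::isnan(b))
            return true;
        return a < b;
    });

    /// Count trailing NaNs and rotate them to the front.
    size_t nans_to_move = 0;
    while (nans_to_move < values.size() && std::isnan(values[values.size() - 1 - nans_to_move]))
        ++nans_to_move;
    std::rotate(values.begin(), values.end() - nans_to_move, values.end());

    for (double x : values)
        std::printf("%g ", x);   /// nan nan 1 2 3
}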

View File

@ -90,7 +90,7 @@ public:
/// Creates column with the same type and specified size. /// Creates column with the same type and specified size.
/// If size is less current size, then data is cut. /// If size is less current size, then data is cut.
/// If size is greater, than default values are appended. /// If size is greater, than default values are appended.
[[nodiscard]] virtual MutablePtr cloneResized(size_t /*size*/) const { throw Exception("Cannot cloneResized() column " + getName(), ErrorCodes::NOT_IMPLEMENTED); } [[nodiscard]] virtual MutablePtr cloneResized(size_t /*size*/) const { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Cannot cloneResized() column {}", getName()); }
/// Returns number of values in column. /// Returns number of values in column.
[[nodiscard]] virtual size_t size() const = 0; [[nodiscard]] virtual size_t size() const = 0;

View File

@ -19,7 +19,7 @@ namespace
double calculateCPUUsage(DB::ThreadIdToTimeMap times, UInt64 elapsed) double calculateCPUUsage(DB::ThreadIdToTimeMap times, UInt64 elapsed)
{ {
auto accumulated = std::accumulate(times.begin(), times.end(), 0, auto accumulated = std::accumulate(times.begin(), times.end(), 0,
[](Int64 acc, const auto & elem) [](UInt64 acc, const auto & elem)
{ {
if (elem.first == ALL_THREADS) if (elem.first == ALL_THREADS)
return acc; return acc;
@ -191,6 +191,10 @@ void ProgressIndication::writeProgress()
{ {
WriteBufferFromOwnString profiling_msg_builder; WriteBufferFromOwnString profiling_msg_builder;
/// We don't want -0. that can appear due to rounding errors.
if (cpu_usage <= 0)
cpu_usage = 0;
profiling_msg_builder << "(" << fmt::format("{:.1f}", cpu_usage) << " CPU"; profiling_msg_builder << "(" << fmt::format("{:.1f}", cpu_usage) << " CPU";
if (memory_usage > 0) if (memory_usage > 0)
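
A tiny sketch (assumed values, not ClickHouse code) of why the clamp above is needed: a small negative number produced by rounding error prints as `-0.0` with one decimal place:

#include <cstdio>

int main()
{
    double cpu_usage = -0.004;                  /// can appear due to accumulated rounding error
    std::printf("(%.1f CPU)\n", cpu_usage);     /// prints "(-0.0 CPU)"
    if (cpu_usage <= 0)                         /// the clamp from the change above
        cpu_usage = 0;
    std::printf("(%.1f CPU)\n", cpu_usage);     /// prints "(0.0 CPU)"
}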

View File

@ -16,11 +16,11 @@ namespace DB
struct ThreadEventData struct ThreadEventData
{ {
Int64 time() const noexcept { return user_ms + system_ms; } UInt64 time() const noexcept { return user_ms + system_ms; }
Int64 user_ms = 0; UInt64 user_ms = 0;
Int64 system_ms = 0; UInt64 system_ms = 0;
Int64 memory_usage = 0; UInt64 memory_usage = 0;
}; };
using ThreadIdToTimeMap = std::unordered_map<UInt64, ThreadEventData>; using ThreadIdToTimeMap = std::unordered_map<UInt64, ThreadEventData>;

View File

@ -0,0 +1,511 @@
#include <Compression/ICompressionCodec.h>
#include <Compression/CompressionInfo.h>
#include <Compression/CompressionFactory.h>
#include <Parsers/IAST.h>
#include <Parsers/ASTLiteral.h>
#include <Common/typeid_cast.h>
#include <IO/WriteHelpers.h>
#include <span>
#include <bit>
#include <concepts>
namespace DB
{
class CompressionCodecFPC : public ICompressionCodec
{
public:
CompressionCodecFPC(UInt8 float_size, UInt8 compression_level);
uint8_t getMethodByte() const override;
void updateHash(SipHash & hash) const override;
static constexpr UInt8 MAX_COMPRESSION_LEVEL{28};
static constexpr UInt8 DEFAULT_COMPRESSION_LEVEL{12};
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;
UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override;
bool isCompression() const override { return true; }
bool isGenericCompression() const override { return false; }
private:
static constexpr UInt32 HEADER_SIZE{3};
UInt8 float_width; // size of uncompressed float in bytes
UInt8 level; // compression level, 2^level * float_width is the size of predictors table in bytes
};
namespace ErrorCodes
{
extern const int CANNOT_COMPRESS;
extern const int CANNOT_DECOMPRESS;
extern const int ILLEGAL_CODEC_PARAMETER;
extern const int ILLEGAL_SYNTAX_FOR_CODEC_TYPE;
extern const int BAD_ARGUMENTS;
}
uint8_t CompressionCodecFPC::getMethodByte() const
{
return static_cast<uint8_t>(CompressionMethodByte::FPC);
}
void CompressionCodecFPC::updateHash(SipHash & hash) const
{
getCodecDesc()->updateTreeHash(hash);
}
CompressionCodecFPC::CompressionCodecFPC(UInt8 float_size, UInt8 compression_level)
: float_width{float_size}, level{compression_level}
{
setCodecDescription("FPC", {std::make_shared<ASTLiteral>(static_cast<UInt64>(level))});
}
UInt32 CompressionCodecFPC::getMaxCompressedDataSize(UInt32 uncompressed_size) const
{
auto float_count = (uncompressed_size + float_width - 1) / float_width;
if (float_count % 2 != 0)
++float_count;
return HEADER_SIZE + float_count * float_width + float_count / 2;
}
namespace
{
UInt8 getFloatBytesSize(const IDataType & column_type)
{
if (!WhichDataType(column_type).isFloat())
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "FPC codec is not applicable for {} because the data type is not float",
column_type.getName());
}
if (auto float_size = column_type.getSizeOfValueInMemory(); float_size >= 4)
{
return static_cast<UInt8>(float_size);
}
throw Exception(ErrorCodes::BAD_ARGUMENTS, "FPC codec is not applicable for floats of size less than 4 bytes. Given type {}",
column_type.getName());
}
std::byte encodeEndianness(std::endian endian)
{
switch (endian)
{
case std::endian::little:
return std::byte{0};
case std::endian::big:
return std::byte{1};
}
throw Exception("Unsupported endianness", ErrorCodes::BAD_ARGUMENTS);
}
std::endian decodeEndianness(std::byte endian)
{
switch (std::to_integer<unsigned char>(endian))
{
case 0:
return std::endian::little;
case 1:
return std::endian::big;
}
throw Exception("Unsupported endianness", ErrorCodes::BAD_ARGUMENTS);
}
}
void registerCodecFPC(CompressionCodecFactory & factory)
{
auto method_code = static_cast<UInt8>(CompressionMethodByte::FPC);
auto codec_builder = [&](const ASTPtr & arguments, const IDataType * column_type) -> CompressionCodecPtr
{
UInt8 float_width{0};
if (column_type != nullptr)
float_width = getFloatBytesSize(*column_type);
UInt8 level = CompressionCodecFPC::DEFAULT_COMPRESSION_LEVEL;
if (arguments && !arguments->children.empty())
{
if (arguments->children.size() > 1)
{
throw Exception(ErrorCodes::ILLEGAL_SYNTAX_FOR_CODEC_TYPE,
"FPC codec must have 1 parameter, given {}", arguments->children.size());
}
const auto * literal = arguments->children.front()->as<ASTLiteral>();
if (!literal)
throw Exception("FPC codec argument must be integer", ErrorCodes::ILLEGAL_CODEC_PARAMETER);
level = literal->value.safeGet<UInt8>();
if (level == 0)
throw Exception("FPC codec level must be at least 1", ErrorCodes::ILLEGAL_CODEC_PARAMETER);
if (level > CompressionCodecFPC::MAX_COMPRESSION_LEVEL)
throw Exception("FPC codec level must be at most 28", ErrorCodes::ILLEGAL_CODEC_PARAMETER);
}
return std::make_shared<CompressionCodecFPC>(float_width, level);
};
factory.registerCompressionCodecWithType("FPC", method_code, codec_builder);
}
namespace
{
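/// Differential finite context method (DFCM) predictor: predicts the next value as the
/// previous value plus a delta looked up in a table indexed by a rolling hash of recent deltas.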
template <std::unsigned_integral TUint>
requires (sizeof(TUint) >= 4)
class DfcmPredictor
{
public:
explicit DfcmPredictor(std::size_t table_size): table(table_size, 0), prev_value{0}, hash{0}
{
}
[[nodiscard]]
TUint predict() const noexcept
{
return table[hash] + prev_value;
}
void add(TUint value) noexcept
{
table[hash] = value - prev_value;
recalculateHash();
prev_value = value;
}
private:
void recalculateHash() noexcept
{
auto value = table[hash];
if constexpr (sizeof(TUint) >= 8)
{
hash = ((hash << 2) ^ static_cast<std::size_t>(value >> 40)) & (table.size() - 1);
}
else
{
hash = ((hash << 4) ^ static_cast<std::size_t>(value >> 23)) & (table.size() - 1);
}
}
std::vector<TUint> table;
TUint prev_value;
std::size_t hash;
};
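/// Finite context method (FCM) predictor: predicts the next value by looking it up in a
/// table indexed by a rolling hash of the values seen so far.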
template <std::unsigned_integral TUint>
requires (sizeof(TUint) >= 4)
class FcmPredictor
{
public:
explicit FcmPredictor(std::size_t table_size): table(table_size, 0), hash{0}
{
}
[[nodiscard]]
TUint predict() const noexcept
{
return table[hash];
}
void add(TUint value) noexcept
{
table[hash] = value;
recalculateHash();
}
private:
void recalculateHash() noexcept
{
auto value = table[hash];
if constexpr (sizeof(TUint) >= 8)
{
hash = ((hash << 6) ^ static_cast<std::size_t>(value >> 48)) & (table.size() - 1);
}
else
{
hash = ((hash << 1) ^ static_cast<std::size_t>(value >> 22)) & (table.size() - 1);
}
}
std::vector<TUint> table;
std::size_t hash;
};
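/// Encodes values in pairs: each value is XOR-ed with the better of the two predictions, and a
/// shared header byte stores, per value, one bit for the chosen predictor and three bits for the
/// number of leading zero bytes dropped from the residual; only the remaining tail bytes are written.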
template <std::unsigned_integral TUint, std::endian Endian = std::endian::native>
requires (Endian == std::endian::little || Endian == std::endian::big)
class FPCOperation
{
static constexpr std::size_t CHUNK_SIZE{64};
static constexpr auto VALUE_SIZE = sizeof(TUint);
static constexpr std::byte FCM_BIT{0};
static constexpr std::byte DFCM_BIT{1u << 3};
static constexpr auto DFCM_BIT_1 = DFCM_BIT << 4;
static constexpr auto DFCM_BIT_2 = DFCM_BIT;
static constexpr unsigned MAX_ZERO_BYTE_COUNT{0b111u};
public:
FPCOperation(std::span<std::byte> destination, UInt8 compression_level)
: dfcm_predictor(1u << compression_level), fcm_predictor(1u << compression_level), chunk{}, result{destination}
{
}
std::size_t encode(std::span<const std::byte> data) &&
{
auto initial_size = result.size();
std::span chunk_view(chunk);
for (std::size_t i = 0; i < data.size(); i += chunk_view.size_bytes())
{
auto written_values = importChunk(data.subspan(i), chunk_view);
encodeChunk(chunk_view.subspan(0, written_values));
}
return initial_size - result.size();
}
void decode(std::span<const std::byte> values, std::size_t decoded_size) &&
{
std::size_t read_bytes{0};
std::span<TUint> chunk_view(chunk);
for (std::size_t i = 0; i < decoded_size; i += chunk_view.size_bytes())
{
if (i + chunk_view.size_bytes() > decoded_size)
chunk_view = chunk_view.first(ceilBytesToEvenValues(decoded_size - i));
read_bytes += decodeChunk(values.subspan(read_bytes), chunk_view);
exportChunk(chunk_view);
}
}
private:
static std::size_t ceilBytesToEvenValues(std::size_t bytes_count)
{
auto values_count = (bytes_count + VALUE_SIZE - 1) / VALUE_SIZE;
return values_count % 2 == 0 ? values_count : values_count + 1;
}
std::size_t importChunk(std::span<const std::byte> values, std::span<TUint> chnk)
{
if (auto chunk_view = std::as_writable_bytes(chnk); chunk_view.size() <= values.size())
{
std::memcpy(chunk_view.data(), values.data(), chunk_view.size());
return chunk_view.size() / VALUE_SIZE;
}
else
{
std::memset(chunk_view.data(), 0, chunk_view.size());
std::memcpy(chunk_view.data(), values.data(), values.size());
return ceilBytesToEvenValues(values.size());
}
}
void exportChunk(std::span<const TUint> chnk)
{
auto chunk_view = std::as_bytes(chnk).first(std::min(result.size(), chnk.size_bytes()));
std::memcpy(result.data(), chunk_view.data(), chunk_view.size());
result = result.subspan(chunk_view.size());
}
void encodeChunk(std::span<const TUint> seq)
{
for (std::size_t i = 0; i < seq.size(); i += 2)
{
encodePair(seq[i], seq[i + 1]);
}
}
struct CompressedValue
{
TUint value;
unsigned compressed_size;
std::byte predictor;
};
unsigned encodeCompressedZeroByteCount(int compressed)
{
if constexpr (VALUE_SIZE == MAX_ZERO_BYTE_COUNT + 1)
{
if (compressed >= 4)
--compressed;
}
return std::min(static_cast<unsigned>(compressed), MAX_ZERO_BYTE_COUNT);
}
unsigned decodeCompressedZeroByteCount(unsigned encoded_size)
{
if constexpr (VALUE_SIZE == MAX_ZERO_BYTE_COUNT + 1)
{
if (encoded_size > 3)
++encoded_size;
}
return encoded_size;
}
CompressedValue compressValue(TUint value) noexcept
{
static constexpr auto BITS_PER_BYTE = std::numeric_limits<unsigned char>::digits;
TUint compressed_dfcm = dfcm_predictor.predict() ^ value;
TUint compressed_fcm = fcm_predictor.predict() ^ value;
dfcm_predictor.add(value);
fcm_predictor.add(value);
auto zeroes_dfcm = std::countl_zero(compressed_dfcm);
auto zeroes_fcm = std::countl_zero(compressed_fcm);
if (zeroes_dfcm > zeroes_fcm)
return {compressed_dfcm, encodeCompressedZeroByteCount(zeroes_dfcm / BITS_PER_BYTE), DFCM_BIT};
return {compressed_fcm, encodeCompressedZeroByteCount(zeroes_fcm / BITS_PER_BYTE), FCM_BIT};
}
void encodePair(TUint first, TUint second)
{
auto [value1, zero_byte_count1, predictor1] = compressValue(first);
auto [value2, zero_byte_count2, predictor2] = compressValue(second);
std::byte header{0x0};
header |= (predictor1 << 4) | predictor2;
header |= static_cast<std::byte>((zero_byte_count1 << 4) | zero_byte_count2);
result.front() = header;
zero_byte_count1 = decodeCompressedZeroByteCount(zero_byte_count1);
zero_byte_count2 = decodeCompressedZeroByteCount(zero_byte_count2);
auto tail_size1 = VALUE_SIZE - zero_byte_count1;
auto tail_size2 = VALUE_SIZE - zero_byte_count2;
std::memcpy(result.data() + 1, valueTail(value1, zero_byte_count1), tail_size1);
std::memcpy(result.data() + 1 + tail_size1, valueTail(value2, zero_byte_count2), tail_size2);
result = result.subspan(1 + tail_size1 + tail_size2);
}
std::size_t decodeChunk(std::span<const std::byte> values, std::span<TUint> seq)
{
std::size_t read_bytes{0};
for (std::size_t i = 0; i < seq.size(); i += 2)
{
read_bytes += decodePair(values.subspan(read_bytes), seq[i], seq[i + 1]);
}
return read_bytes;
}
TUint decompressValue(TUint value, bool isDfcmPredictor)
{
TUint decompressed;
if (isDfcmPredictor)
{
decompressed = dfcm_predictor.predict() ^ value;
}
else
{
decompressed = fcm_predictor.predict() ^ value;
}
dfcm_predictor.add(decompressed);
fcm_predictor.add(decompressed);
return decompressed;
}
std::size_t decodePair(std::span<const std::byte> bytes, TUint& first, TUint& second)
{
if (bytes.empty())
throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Unexpected end of encoded sequence");
auto zero_byte_count1 = decodeCompressedZeroByteCount(
std::to_integer<unsigned>(bytes.front() >> 4) & MAX_ZERO_BYTE_COUNT);
auto zero_byte_count2 = decodeCompressedZeroByteCount(
std::to_integer<unsigned>(bytes.front()) & MAX_ZERO_BYTE_COUNT);
auto tail_size1 = VALUE_SIZE - zero_byte_count1;
auto tail_size2 = VALUE_SIZE - zero_byte_count2;
if (bytes.size() < 1 + tail_size1 + tail_size2)
throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Unexpected end of encoded sequence");
TUint value1{0};
TUint value2{0};
std::memcpy(valueTail(value1, zero_byte_count1), bytes.data() + 1, tail_size1);
std::memcpy(valueTail(value2, zero_byte_count2), bytes.data() + 1 + tail_size1, tail_size2);
auto is_dfcm_predictor1 = std::to_integer<unsigned char>(bytes.front() & DFCM_BIT_1) != 0;
auto is_dfcm_predictor2 = std::to_integer<unsigned char>(bytes.front() & DFCM_BIT_2) != 0;
first = decompressValue(value1, is_dfcm_predictor1);
second = decompressValue(value2, is_dfcm_predictor2);
return 1 + tail_size1 + tail_size2;
}
static void* valueTail(TUint& value, unsigned compressed_size)
{
if constexpr (Endian == std::endian::little)
{
return &value;
}
else
{
return reinterpret_cast<std::byte*>(&value) + compressed_size;
}
}
DfcmPredictor<TUint> dfcm_predictor;
FcmPredictor<TUint> fcm_predictor;
std::array<TUint, CHUNK_SIZE> chunk{};
std::span<std::byte> result{};
};
}
UInt32 CompressionCodecFPC::doCompressData(const char * source, UInt32 source_size, char * dest) const
{
dest[0] = static_cast<char>(float_width);
dest[1] = static_cast<char>(level);
dest[2] = std::to_integer<char>(encodeEndianness(std::endian::native));
auto dest_size = getMaxCompressedDataSize(source_size);
auto destination = std::as_writable_bytes(std::span(dest, dest_size).subspan(HEADER_SIZE));
auto src = std::as_bytes(std::span(source, source_size));
switch (float_width)
{
case sizeof(Float64):
return HEADER_SIZE + FPCOperation<UInt64>(destination, level).encode(src);
case sizeof(Float32):
return HEADER_SIZE + FPCOperation<UInt32>(destination, level).encode(src);
default:
break;
}
throw Exception("Cannot compress. File has incorrect float width", ErrorCodes::CANNOT_COMPRESS);
}
void CompressionCodecFPC::doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const
{
if (source_size < HEADER_SIZE)
throw Exception("Cannot decompress. File has wrong header", ErrorCodes::CANNOT_DECOMPRESS);
auto compressed_data = std::as_bytes(std::span(source, source_size));
auto compressed_float_width = std::to_integer<UInt8>(compressed_data[0]);
auto compressed_level = std::to_integer<UInt8>(compressed_data[1]);
if (compressed_level == 0 || compressed_level > MAX_COMPRESSION_LEVEL)
throw Exception("Cannot decompress. File has incorrect level", ErrorCodes::CANNOT_DECOMPRESS);
if (decodeEndianness(compressed_data[2]) != std::endian::native)
throw Exception("Cannot decompress. File has incorrect endianness", ErrorCodes::CANNOT_DECOMPRESS);
auto destination = std::as_writable_bytes(std::span(dest, uncompressed_size));
auto src = compressed_data.subspan(HEADER_SIZE);
switch (compressed_float_width)
{
case sizeof(Float64):
FPCOperation<UInt64>(destination, compressed_level).decode(src, uncompressed_size);
break;
case sizeof(Float32):
FPCOperation<UInt32>(destination, compressed_level).decode(src, uncompressed_size);
break;
default:
throw Exception("Cannot decompress. File has incorrect float width", ErrorCodes::CANNOT_DECOMPRESS);
}
}
}
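
The central trick the codec relies on, shown as a minimal standalone sketch (example values assumed, not part of the codec): XOR the actual value with a prediction; the closer the prediction, the more leading zero bytes of the residual can be dropped.

#include <bit>
#include <cstdint>
#include <cstdio>
#include <cstring>

int main()
{
    double predicted = 3.14159;   /// what one of the predictor tables would return
    double actual = 3.14160;      /// the value being encoded

    uint64_t p;
    uint64_t a;
    std::memcpy(&p, &predicted, sizeof(p));
    std::memcpy(&a, &actual, sizeof(a));

    uint64_t residual = p ^ a;                          /// close prediction => many leading zero bits
    int zero_bytes = std::countl_zero(residual) / 8;    /// whole zero bytes can be dropped
    std::printf("store %d of 8 bytes\n", 8 - zero_bytes);
}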

View File

@ -177,6 +177,7 @@ void registerCodecT64(CompressionCodecFactory & factory);
void registerCodecDoubleDelta(CompressionCodecFactory & factory); void registerCodecDoubleDelta(CompressionCodecFactory & factory);
void registerCodecGorilla(CompressionCodecFactory & factory); void registerCodecGorilla(CompressionCodecFactory & factory);
void registerCodecEncrypted(CompressionCodecFactory & factory); void registerCodecEncrypted(CompressionCodecFactory & factory);
void registerCodecFPC(CompressionCodecFactory & factory);
#endif #endif
@ -194,6 +195,7 @@ CompressionCodecFactory::CompressionCodecFactory()
registerCodecDoubleDelta(*this); registerCodecDoubleDelta(*this);
registerCodecGorilla(*this); registerCodecGorilla(*this);
registerCodecEncrypted(*this); registerCodecEncrypted(*this);
registerCodecFPC(*this);
#endif #endif
default_codec = get("LZ4", {}); default_codec = get("LZ4", {});

View File

@ -44,7 +44,8 @@ enum class CompressionMethodByte : uint8_t
DoubleDelta = 0x94, DoubleDelta = 0x94,
Gorilla = 0x95, Gorilla = 0x95,
AES_128_GCM_SIV = 0x96, AES_128_GCM_SIV = 0x96,
AES_256_GCM_SIV = 0x97 AES_256_GCM_SIV = 0x97,
FPC = 0x98
}; };
} }

View File

@ -693,6 +693,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(Bool, input_format_values_deduce_templates_of_expressions, true, "For Values format: if the field could not be parsed by streaming parser, run SQL parser, deduce template of the SQL expression, try to parse all rows using template and then interpret expression for all rows.", 0) \ M(Bool, input_format_values_deduce_templates_of_expressions, true, "For Values format: if the field could not be parsed by streaming parser, run SQL parser, deduce template of the SQL expression, try to parse all rows using template and then interpret expression for all rows.", 0) \
M(Bool, input_format_values_accurate_types_of_literals, true, "For Values format: when parsing and interpreting expressions using template, check actual type of literal to avoid possible overflow and precision issues.", 0) \ M(Bool, input_format_values_accurate_types_of_literals, true, "For Values format: when parsing and interpreting expressions using template, check actual type of literal to avoid possible overflow and precision issues.", 0) \
M(Bool, input_format_avro_allow_missing_fields, false, "For Avro/AvroConfluent format: when field is not found in schema use default value instead of error", 0) \ M(Bool, input_format_avro_allow_missing_fields, false, "For Avro/AvroConfluent format: when field is not found in schema use default value instead of error", 0) \
M(Bool, input_format_avro_null_as_default, false, "For Avro/AvroConfluent format: insert default in case of null and non Nullable column", 0) \
M(URI, format_avro_schema_registry_url, "", "For AvroConfluent format: Confluent Schema Registry URL.", 0) \ M(URI, format_avro_schema_registry_url, "", "For AvroConfluent format: Confluent Schema Registry URL.", 0) \
\ \
M(Bool, output_format_json_quote_64bit_integers, true, "Controls quoting of 64-bit integers in JSON output format.", 0) \ M(Bool, output_format_json_quote_64bit_integers, true, "Controls quoting of 64-bit integers in JSON output format.", 0) \

View File

@ -1,3 +1,4 @@
#include <cstddef>
#include <Columns/IColumn.h> #include <Columns/IColumn.h>
#include <Columns/ColumnConst.h> #include <Columns/ColumnConst.h>
#include <Columns/ColumnSparse.h> #include <Columns/ColumnSparse.h>
@ -162,6 +163,12 @@ void IDataType::insertDefaultInto(IColumn & column) const
column.insertDefault(); column.insertDefault();
} }
void IDataType::insertManyDefaultsInto(IColumn & column, size_t n) const
{
for (size_t i = 0; i < n; ++i)
insertDefaultInto(column);
}
void IDataType::setCustomization(DataTypeCustomDescPtr custom_desc_) const void IDataType::setCustomization(DataTypeCustomDescPtr custom_desc_) const
{ {
/// replace only if not null /// replace only if not null

View File

@ -159,6 +159,8 @@ public:
*/ */
virtual void insertDefaultInto(IColumn & column) const; virtual void insertDefaultInto(IColumn & column) const;
void insertManyDefaultsInto(IColumn & column, size_t n) const;
/// Checks that two instances belong to the same type /// Checks that two instances belong to the same type
virtual bool equals(const IDataType & rhs) const = 0; virtual bool equals(const IDataType & rhs) const = 0;

View File

@ -56,6 +56,7 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
format_settings.avro.schema_registry_url = settings.format_avro_schema_registry_url.toString(); format_settings.avro.schema_registry_url = settings.format_avro_schema_registry_url.toString();
format_settings.avro.string_column_pattern = settings.output_format_avro_string_column_pattern.toString(); format_settings.avro.string_column_pattern = settings.output_format_avro_string_column_pattern.toString();
format_settings.avro.output_rows_in_file = settings.output_format_avro_rows_in_file; format_settings.avro.output_rows_in_file = settings.output_format_avro_rows_in_file;
format_settings.avro.null_as_default = settings.input_format_avro_null_as_default;
format_settings.csv.allow_double_quotes = settings.format_csv_allow_double_quotes; format_settings.csv.allow_double_quotes = settings.format_csv_allow_double_quotes;
format_settings.csv.allow_single_quotes = settings.format_csv_allow_single_quotes; format_settings.csv.allow_single_quotes = settings.format_csv_allow_single_quotes;
format_settings.csv.crlf_end_of_line = settings.output_format_csv_crlf_end_of_line; format_settings.csv.crlf_end_of_line = settings.output_format_csv_crlf_end_of_line;

View File

@ -92,6 +92,7 @@ struct FormatSettings
bool allow_missing_fields = false; bool allow_missing_fields = false;
String string_column_pattern; String string_column_pattern;
UInt64 output_rows_in_file = 1; UInt64 output_rows_in_file = 1;
bool null_as_default = false;
} avro; } avro;
String bool_true_representation = "true"; String bool_true_representation = "true";

View File

@ -13,6 +13,7 @@
#include <DataTypes/DataTypesNumber.h> #include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeFactory.h> #include <DataTypes/DataTypeFactory.h>
#include <Functions/FunctionHelpers.h> #include <Functions/FunctionHelpers.h>
#include <Functions/FunctionUnaryArithmetic.h>
#include <Common/FieldVisitors.h> #include <Common/FieldVisitors.h>
#include <algorithm> #include <algorithm>
@ -50,17 +51,15 @@ MutableColumnPtr buildColumnFromTernaryData(const UInt8Container & ternary_data,
const size_t rows_count = ternary_data.size(); const size_t rows_count = ternary_data.size();
auto new_column = ColumnUInt8::create(rows_count); auto new_column = ColumnUInt8::create(rows_count);
std::transform( for (size_t i = 0; i < rows_count; ++i)
ternary_data.cbegin(), ternary_data.cend(), new_column->getData().begin(), new_column->getData()[i] = (ternary_data[i] == Ternary::True);
[](const auto x) { return x == Ternary::True; });
if (!make_nullable) if (!make_nullable)
return new_column; return new_column;
auto null_column = ColumnUInt8::create(rows_count); auto null_column = ColumnUInt8::create(rows_count);
std::transform( for (size_t i = 0; i < rows_count; ++i)
ternary_data.cbegin(), ternary_data.cend(), null_column->getData().begin(), null_column->getData()[i] = (ternary_data[i] == Ternary::Null);
[](const auto x) { return x == Ternary::Null; });
return ColumnNullable::create(std::move(new_column), std::move(null_column)); return ColumnNullable::create(std::move(new_column), std::move(null_column));
} }
@ -68,13 +67,14 @@ MutableColumnPtr buildColumnFromTernaryData(const UInt8Container & ternary_data,
template <typename T> template <typename T>
bool tryConvertColumnToBool(const IColumn * column, UInt8Container & res) bool tryConvertColumnToBool(const IColumn * column, UInt8Container & res)
{ {
const auto col = checkAndGetColumn<ColumnVector<T>>(column); const auto column_typed = checkAndGetColumn<ColumnVector<T>>(column);
if (!col) if (!column_typed)
return false; return false;
std::transform( auto & data = column_typed->getData();
col->getData().cbegin(), col->getData().cend(), res.begin(), size_t data_size = data.size();
[](const auto x) { return !!x; }); for (size_t i = 0; i < data_size; ++i)
res[i] = static_cast<bool>(data[i]);
return true; return true;
} }
@ -99,7 +99,7 @@ bool extractConstColumns(ColumnRawPtrs & in, UInt8 & res, Func && func)
{ {
bool has_res = false; bool has_res = false;
for (int i = static_cast<int>(in.size()) - 1; i >= 0; --i) for (Int64 i = static_cast<Int64>(in.size()) - 1; i >= 0; --i)
{ {
UInt8 x; UInt8 x;
@ -458,7 +458,9 @@ ColumnPtr basicExecuteImpl(ColumnRawPtrs arguments, size_t input_rows_count)
for (const IColumn * column : arguments) for (const IColumn * column : arguments)
{ {
if (const auto * uint8_column = checkAndGetColumn<ColumnUInt8>(column)) if (const auto * uint8_column = checkAndGetColumn<ColumnUInt8>(column))
{
uint8_args.push_back(uint8_column); uint8_args.push_back(uint8_column);
}
else else
{ {
auto converted_column = ColumnUInt8::create(input_rows_count); auto converted_column = ColumnUInt8::create(input_rows_count);
@ -596,14 +598,14 @@ ColumnPtr FunctionAnyArityLogical<Impl, Name>::executeShortCircuit(ColumnsWithTy
if (nulls) if (nulls)
applyTernaryLogic<Name>(mask, *nulls); applyTernaryLogic<Name>(mask, *nulls);
MutableColumnPtr res = ColumnUInt8::create(); auto res = ColumnUInt8::create();
typeid_cast<ColumnUInt8 *>(res.get())->getData() = std::move(mask); res->getData() = std::move(mask);
if (!nulls) if (!nulls)
return res; return res;
MutableColumnPtr bytemap = ColumnUInt8::create(); auto bytemap = ColumnUInt8::create();
typeid_cast<ColumnUInt8 *>(bytemap.get())->getData() = std::move(*nulls); bytemap->getData() = std::move(*nulls);
return ColumnNullable::create(std::move(res), std::move(bytemap)); return ColumnNullable::create(std::move(res), std::move(bytemap));
} }
@ -692,29 +694,14 @@ ColumnPtr FunctionAnyArityLogical<Impl, Name>::getConstantResultForNonConstArgum
return result_column; return result_column;
} }
template <typename A, typename Op>
struct UnaryOperationImpl
{
using ResultType = typename Op::ResultType;
using ArrayA = typename ColumnVector<A>::Container;
using ArrayC = typename ColumnVector<ResultType>::Container;
static void NO_INLINE vector(const ArrayA & a, ArrayC & c)
{
std::transform(
a.cbegin(), a.cend(), c.begin(),
[](const auto x) { return Op::apply(x); });
}
};
template <template <typename> class Impl, typename Name> template <template <typename> class Impl, typename Name>
DataTypePtr FunctionUnaryLogical<Impl, Name>::getReturnTypeImpl(const DataTypes & arguments) const DataTypePtr FunctionUnaryLogical<Impl, Name>::getReturnTypeImpl(const DataTypes & arguments) const
{ {
if (!isNativeNumber(arguments[0])) if (!isNativeNumber(arguments[0]))
throw Exception("Illegal type (" throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
+ arguments[0]->getName() "Illegal type ({}) of argument of function {}",
+ ") of argument of function " + getName(), arguments[0]->getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); getName());
return isBool(arguments[0]) ? DataTypeFactory::instance().get("Bool") : std::make_shared<DataTypeUInt8>(); return isBool(arguments[0]) ? DataTypeFactory::instance().get("Bool") : std::make_shared<DataTypeUInt8>();
} }
@ -724,10 +711,9 @@ ColumnPtr functionUnaryExecuteType(const ColumnsWithTypeAndName & arguments)
{ {
if (auto col = checkAndGetColumn<ColumnVector<T>>(arguments[0].column.get())) if (auto col = checkAndGetColumn<ColumnVector<T>>(arguments[0].column.get()))
{ {
auto col_res = ColumnUInt8::create(); auto col_res = ColumnUInt8::create(col->getData().size());
auto & vec_res = col_res->getData();
typename ColumnUInt8::Container & vec_res = col_res->getData();
vec_res.resize(col->getData().size());
UnaryOperationImpl<T, Impl<T>>::vector(col->getData(), vec_res); UnaryOperationImpl<T, Impl<T>>::vector(col->getData(), vec_res);
return col_res; return col_res;
@ -750,9 +736,10 @@ ColumnPtr FunctionUnaryLogical<Impl, Name>::executeImpl(const ColumnsWithTypeAnd
|| (res = functionUnaryExecuteType<Impl, Int64>(arguments)) || (res = functionUnaryExecuteType<Impl, Int64>(arguments))
|| (res = functionUnaryExecuteType<Impl, Float32>(arguments)) || (res = functionUnaryExecuteType<Impl, Float32>(arguments))
|| (res = functionUnaryExecuteType<Impl, Float64>(arguments)))) || (res = functionUnaryExecuteType<Impl, Float64>(arguments))))
throw Exception("Illegal column " + arguments[0].column->getName() throw Exception(ErrorCodes::ILLEGAL_COLUMN,
+ " of argument of function " + getName(), "Illegal column {} of argument of function {}",
ErrorCodes::ILLEGAL_COLUMN); arguments[0].column->getName(),
getName());
return res; return res;
} }

View File

@ -76,7 +76,7 @@ bool FillingRow::next(const FillingRow & to_row)
auto next_value = row[pos]; auto next_value = row[pos];
getFillDescription(pos).step_func(next_value); getFillDescription(pos).step_func(next_value);
if (less(to_row.row[pos], next_value, getDirection(pos))) if (less(to_row.row[pos], next_value, getDirection(pos)) || equals(next_value, getFillDescription(pos).fill_to))
return false; return false;
row[pos] = next_value; row[pos] = next_value;

View File

@ -4,7 +4,6 @@
#include <numeric> #include <numeric>
#include <Core/Defines.h>
#include <Core/Field.h> #include <Core/Field.h>
#include <Common/LRUCache.h> #include <Common/LRUCache.h>
@ -13,12 +12,9 @@
#include <IO/ReadHelpers.h> #include <IO/ReadHelpers.h>
#include <IO/HTTPCommon.h> #include <IO/HTTPCommon.h>
#include <Formats/verbosePrintString.h>
#include <Formats/FormatFactory.h> #include <Formats/FormatFactory.h>
#include <DataTypes/DataTypeArray.h> #include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDateTime64.h> #include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeEnum.h> #include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeFixedString.h> #include <DataTypes/DataTypeFixedString.h>
@ -28,7 +24,7 @@
#include <DataTypes/DataTypeTuple.h> #include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeUUID.h> #include <DataTypes/DataTypeUUID.h>
#include <DataTypes/IDataType.h> #include <DataTypes/IDataType.h>
#include <DataTypes/getLeastSupertype.h> #include <DataTypes/DataTypeMap.h>
#include <Columns/ColumnArray.h> #include <Columns/ColumnArray.h>
#include <Columns/ColumnFixedString.h> #include <Columns/ColumnFixedString.h>
@ -36,33 +32,24 @@
#include <Columns/ColumnString.h> #include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h> #include <Columns/ColumnsNumber.h>
#include <Columns/ColumnLowCardinality.h> #include <Columns/ColumnLowCardinality.h>
#include <Columns/ColumnTuple.h>
#include <Columns/ColumnMap.h>
#include <avro/Compiler.hh> #include <Compiler.hh>
#include <avro/DataFile.hh> #include <DataFile.hh>
#include <avro/Decoder.hh> #include <Decoder.hh>
#include <avro/Encoder.hh> #include <Node.hh>
#include <avro/Generic.hh> #include <NodeConcepts.hh>
#include <avro/GenericDatum.hh> #include <NodeImpl.hh>
#include <avro/Node.hh> #include <Types.hh>
#include <avro/NodeConcepts.hh> #include <ValidSchema.hh>
#include <avro/NodeImpl.hh>
#include <avro/Reader.hh>
#include <avro/Schema.hh>
#include <avro/Specific.hh>
#include <avro/Types.hh>
#include <avro/ValidSchema.hh>
#include <avro/Writer.hh>
#include <Poco/BinaryReader.h>
#include <Poco/Buffer.h> #include <Poco/Buffer.h>
#include <Poco/JSON/JSON.h> #include <Poco/JSON/JSON.h>
#include <Poco/JSON/Object.h> #include <Poco/JSON/Object.h>
#include <Poco/JSON/Parser.h> #include <Poco/JSON/Parser.h>
#include <Poco/MemoryStream.h>
#include <Poco/Net/HTTPClientSession.h>
#include <Poco/Net/HTTPRequest.h> #include <Poco/Net/HTTPRequest.h>
#include <Poco/Net/HTTPResponse.h> #include <Poco/Net/HTTPResponse.h>
#include <Poco/Poco.h>
#include <Poco/URI.h> #include <Poco/URI.h>
@ -292,9 +279,13 @@ AvroDeserializer::DeserializeFn AvroDeserializer::createDeserializeFn(avro::Node
break; break;
case avro::AVRO_UNION: case avro::AVRO_UNION:
{ {
auto nullable_deserializer = [root_node, target_type](size_t non_null_union_index) if (root_node->leaves() == 2
&& (root_node->leafAt(0)->type() == avro::AVRO_NULL || root_node->leafAt(1)->type() == avro::AVRO_NULL))
{ {
auto nested_deserialize = createDeserializeFn(root_node->leafAt(non_null_union_index), removeNullable(target_type)); size_t non_null_union_index = root_node->leafAt(0)->type() == avro::AVRO_NULL ? 1 : 0;
if (target.isNullable())
{
auto nested_deserialize = this->createDeserializeFn(root_node->leafAt(non_null_union_index), removeNullable(target_type));
return [non_null_union_index, nested_deserialize](IColumn & column, avro::Decoder & decoder) return [non_null_union_index, nested_deserialize](IColumn & column, avro::Decoder & decoder)
{ {
ColumnNullable & col = assert_cast<ColumnNullable &>(column); ColumnNullable & col = assert_cast<ColumnNullable &>(column);
@ -309,13 +300,21 @@ AvroDeserializer::DeserializeFn AvroDeserializer::createDeserializeFn(avro::Node
col.insertDefault(); col.insertDefault();
} }
}; };
}; }
if (root_node->leaves() == 2 && target.isNullable())
if (null_as_default)
{ {
if (root_node->leafAt(0)->type() == avro::AVRO_NULL) auto nested_deserialize = this->createDeserializeFn(root_node->leafAt(non_null_union_index), target_type);
return nullable_deserializer(1); return [non_null_union_index, nested_deserialize](IColumn & column, avro::Decoder & decoder)
if (root_node->leafAt(1)->type() == avro::AVRO_NULL) {
return nullable_deserializer(0); size_t union_index = decoder.decodeUnionIndex();
if (union_index == non_null_union_index)
nested_deserialize(column, decoder);
else
column.insertDefault();
};
}
} }
break; break;
} }
@ -386,8 +385,69 @@ AvroDeserializer::DeserializeFn AvroDeserializer::createDeserializeFn(avro::Node
} }
case avro::AVRO_SYMBOLIC: case avro::AVRO_SYMBOLIC:
return createDeserializeFn(avro::resolveSymbol(root_node), target_type); return createDeserializeFn(avro::resolveSymbol(root_node), target_type);
case avro::AVRO_MAP:
case avro::AVRO_RECORD: case avro::AVRO_RECORD:
{
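/// An Avro record is read into a Tuple column: record fields are matched to tuple elements by name.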
if (target.isTuple())
{
const DataTypeTuple & tuple_type = assert_cast<const DataTypeTuple &>(*target_type);
const auto & nested_types = tuple_type.getElements();
std::vector<std::pair<DeserializeFn, size_t>> nested_deserializers;
nested_deserializers.reserve(root_node->leaves());
if (root_node->leaves() != nested_types.size())
throw Exception(ErrorCodes::INCORRECT_DATA, "The number of leaves in record doesn't match the number of elements in tuple");
for (size_t i = 0; i != root_node->leaves(); ++i)
{
const auto & name = root_node->nameAt(i);
size_t pos = tuple_type.getPositionByName(name);
auto nested_deserializer = createDeserializeFn(root_node->leafAt(i), nested_types[pos]);
nested_deserializers.emplace_back(nested_deserializer, pos);
}
return [nested_deserializers](IColumn & column, avro::Decoder & decoder)
{
ColumnTuple & column_tuple = assert_cast<ColumnTuple &>(column);
auto nested_columns = column_tuple.getColumns();
for (const auto & [nested_deserializer, pos] : nested_deserializers)
nested_deserializer(*nested_columns[pos], decoder);
};
}
break;
}
case avro::AVRO_MAP:
{
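/// An Avro map is read into a Map column. Avro encodes a map as a sequence of blocks, each prefixed
/// with its item count, hence the mapStart()/mapNext() loop below.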
if (target.isMap())
{
const auto & map_type = assert_cast<const DataTypeMap &>(*target_type);
const auto & keys_type = map_type.getKeyType();
const auto & values_type = map_type.getValueType();
auto keys_source_type = root_node->leafAt(0);
auto values_source_type = root_node->leafAt(1);
auto keys_deserializer = createDeserializeFn(keys_source_type, keys_type);
auto values_deserializer = createDeserializeFn(values_source_type, values_type);
return [keys_deserializer, values_deserializer](IColumn & column, avro::Decoder & decoder)
{
ColumnMap & column_map = assert_cast<ColumnMap &>(column);
ColumnArray & column_array = column_map.getNestedColumn();
ColumnArray::Offsets & offsets = column_array.getOffsets();
ColumnTuple & nested_columns = column_map.getNestedData();
IColumn & keys_column = nested_columns.getColumn(0);
IColumn & values_column = nested_columns.getColumn(1);
size_t total = 0;
for (size_t n = decoder.mapStart(); n != 0; n = decoder.mapNext())
{
total += n;
for (size_t i = 0; i < n; ++i)
{
keys_deserializer(keys_column, decoder);
values_deserializer(values_column, decoder);
}
}
offsets.push_back(offsets.back() + total);
};
}
break;
}
default: default:
break; break;
} }
@ -577,7 +637,8 @@ AvroDeserializer::Action AvroDeserializer::createAction(const Block & header, co
} }
} }
AvroDeserializer::AvroDeserializer(const Block & header, avro::ValidSchema schema, bool allow_missing_fields) AvroDeserializer::AvroDeserializer(const Block & header, avro::ValidSchema schema, bool allow_missing_fields, bool null_as_default_)
: null_as_default(null_as_default_)
{ {
const auto & schema_root = schema.root(); const auto & schema_root = schema.root();
if (schema_root->type() != avro::AVRO_RECORD) if (schema_root->type() != avro::AVRO_RECORD)
@ -615,15 +676,15 @@ void AvroDeserializer::deserializeRow(MutableColumns & columns, avro::Decoder &
AvroRowInputFormat::AvroRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_, const FormatSettings & format_settings_) AvroRowInputFormat::AvroRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_, const FormatSettings & format_settings_)
: IRowInputFormat(header_, in_, params_), : IRowInputFormat(header_, in_, params_), format_settings(format_settings_)
allow_missing_fields(format_settings_.avro.allow_missing_fields)
{ {
} }
void AvroRowInputFormat::readPrefix() void AvroRowInputFormat::readPrefix()
{ {
file_reader_ptr = std::make_unique<avro::DataFileReaderBase>(std::make_unique<InputStreamReadBufferAdapter>(*in)); file_reader_ptr = std::make_unique<avro::DataFileReaderBase>(std::make_unique<InputStreamReadBufferAdapter>(*in));
deserializer_ptr = std::make_unique<AvroDeserializer>(output.getHeader(), file_reader_ptr->dataSchema(), allow_missing_fields); deserializer_ptr = std::make_unique<AvroDeserializer>(
output.getHeader(), file_reader_ptr->dataSchema(), format_settings.avro.allow_missing_fields, format_settings.avro.null_as_default);
file_reader_ptr->init(); file_reader_ptr->init();
} }
@ -809,7 +870,8 @@ const AvroDeserializer & AvroConfluentRowInputFormat::getOrCreateDeserializer(Sc
if (it == deserializer_cache.end()) if (it == deserializer_cache.end())
{ {
auto schema = schema_registry->getSchema(schema_id); auto schema = schema_registry->getSchema(schema_id);
AvroDeserializer deserializer(output.getHeader(), schema, format_settings.avro.allow_missing_fields); AvroDeserializer deserializer(
output.getHeader(), schema, format_settings.avro.allow_missing_fields, format_settings.avro.null_as_default);
it = deserializer_cache.emplace(schema_id, deserializer).first; it = deserializer_cache.emplace(schema_id, deserializer).first;
} }
return it->second; return it->second;
@ -891,11 +953,27 @@ DataTypePtr AvroSchemaReader::avroNodeToDataType(avro::NodePtr node)
if (node->leaves() == 2 && (node->leafAt(0)->type() == avro::Type::AVRO_NULL || node->leafAt(1)->type() == avro::Type::AVRO_NULL)) if (node->leaves() == 2 && (node->leafAt(0)->type() == avro::Type::AVRO_NULL || node->leafAt(1)->type() == avro::Type::AVRO_NULL))
{ {
size_t nested_leaf_index = node->leafAt(0)->type() == avro::Type::AVRO_NULL ? 1 : 0; size_t nested_leaf_index = node->leafAt(0)->type() == avro::Type::AVRO_NULL ? 1 : 0;
return makeNullable(avroNodeToDataType(node->leafAt(nested_leaf_index))); auto nested_type = avroNodeToDataType(node->leafAt(nested_leaf_index));
return nested_type->canBeInsideNullable() ? makeNullable(nested_type) : nested_type;
} }
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Avro type UNION is not supported for inserting."); throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Avro type UNION is not supported for inserting.");
case avro::Type::AVRO_SYMBOLIC: case avro::Type::AVRO_SYMBOLIC:
return avroNodeToDataType(avro::resolveSymbol(node)); return avroNodeToDataType(avro::resolveSymbol(node));
case avro::Type::AVRO_RECORD:
{
DataTypes nested_types;
nested_types.reserve(node->leaves());
Names nested_names;
nested_names.reserve(node->leaves());
for (size_t i = 0; i != node->leaves(); ++i)
{
nested_types.push_back(avroNodeToDataType(node->leafAt(i)));
nested_names.push_back(node->nameAt(i));
}
return std::make_shared<DataTypeTuple>(nested_types, nested_names);
}
case avro::Type::AVRO_MAP:
return std::make_shared<DataTypeMap>(avroNodeToDataType(node->leafAt(0)), avroNodeToDataType(node->leafAt(1)));
default: default:
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Avro column {} is not supported for inserting."); throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Avro column {} is not supported for inserting.");
} }

View File

@ -15,10 +15,10 @@
#include <Processors/Formats/IRowInputFormat.h> #include <Processors/Formats/IRowInputFormat.h>
#include <Processors/Formats/ISchemaReader.h> #include <Processors/Formats/ISchemaReader.h>
#include <avro/DataFile.hh> #include <DataFile.hh>
#include <avro/Decoder.hh> #include <Decoder.hh>
#include <avro/Schema.hh> #include <Schema.hh>
#include <avro/ValidSchema.hh> #include <ValidSchema.hh>
namespace DB namespace DB
@ -32,13 +32,13 @@ namespace ErrorCodes
class AvroDeserializer class AvroDeserializer
{ {
public: public:
AvroDeserializer(const Block & header, avro::ValidSchema schema, bool allow_missing_fields); AvroDeserializer(const Block & header, avro::ValidSchema schema, bool allow_missing_fields, bool null_as_default_);
void deserializeRow(MutableColumns & columns, avro::Decoder & decoder, RowReadExtension & ext) const; void deserializeRow(MutableColumns & columns, avro::Decoder & decoder, RowReadExtension & ext) const;
private: private:
using DeserializeFn = std::function<void(IColumn & column, avro::Decoder & decoder)>; using DeserializeFn = std::function<void(IColumn & column, avro::Decoder & decoder)>;
using SkipFn = std::function<void(avro::Decoder & decoder)>; using SkipFn = std::function<void(avro::Decoder & decoder)>;
static DeserializeFn createDeserializeFn(avro::NodePtr root_node, DataTypePtr target_type); DeserializeFn createDeserializeFn(avro::NodePtr root_node, DataTypePtr target_type);
SkipFn createSkipFn(avro::NodePtr root_node); SkipFn createSkipFn(avro::NodePtr root_node);
struct Action struct Action
@ -113,6 +113,8 @@ private:
/// Map from name of named Avro type (record, enum, fixed) to SkipFn. /// Map from name of named Avro type (record, enum, fixed) to SkipFn.
/// This is to avoid infinite recursion when Avro schema contains self-references. e.g. LinkedList /// This is to avoid infinite recursion when Avro schema contains self-references. e.g. LinkedList
std::map<avro::Name, SkipFn> symbolic_skip_fn_map; std::map<avro::Name, SkipFn> symbolic_skip_fn_map;
bool null_as_default = false;
}; };
class AvroRowInputFormat final : public IRowInputFormat class AvroRowInputFormat final : public IRowInputFormat
@ -128,7 +130,7 @@ private:
std::unique_ptr<avro::DataFileReaderBase> file_reader_ptr; std::unique_ptr<avro::DataFileReaderBase> file_reader_ptr;
std::unique_ptr<AvroDeserializer> deserializer_ptr; std::unique_ptr<AvroDeserializer> deserializer_ptr;
bool allow_missing_fields; FormatSettings format_settings;
}; };
/// Confluent framing + Avro binary datum encoding. Mainly used for Kafka. /// Confluent framing + Avro binary datum encoding. Mainly used for Kafka.

View File

@ -1,23 +1,21 @@
#include "AvroRowOutputFormat.h" #include "AvroRowOutputFormat.h"
#if USE_AVRO #if USE_AVRO
#include <Core/Defines.h>
#include <Core/Field.h> #include <Core/Field.h>
#include <IO/Operators.h>
#include <IO/WriteBuffer.h> #include <IO/WriteBuffer.h>
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
#include <Formats/verbosePrintString.h>
#include <Formats/FormatFactory.h> #include <Formats/FormatFactory.h>
#include <DataTypes/DataTypeArray.h> #include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeDate.h> #include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDateTime64.h> #include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeEnum.h> #include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeLowCardinality.h> #include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeNullable.h> #include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeUUID.h> #include <DataTypes/DataTypeUUID.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeMap.h>
#include <Columns/ColumnArray.h> #include <Columns/ColumnArray.h>
#include <Columns/ColumnFixedString.h> #include <Columns/ColumnFixedString.h>
@ -25,21 +23,13 @@
#include <Columns/ColumnNullable.h> #include <Columns/ColumnNullable.h>
#include <Columns/ColumnString.h> #include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h> #include <Columns/ColumnsNumber.h>
#include <Columns/ColumnTuple.h>
#include <Columns/ColumnMap.h>
#include <avro/Compiler.hh> #include <DataFile.hh>
#include <avro/DataFile.hh> #include <Encoder.hh>
#include <avro/Decoder.hh> #include <Node.hh>
#include <avro/Encoder.hh> #include <Schema.hh>
#include <avro/Generic.hh>
#include <avro/GenericDatum.hh>
#include <avro/Node.hh>
#include <avro/NodeConcepts.hh>
#include <avro/NodeImpl.hh>
#include <avro/Reader.hh>
#include <avro/Schema.hh>
#include <avro/Specific.hh>
#include <avro/ValidSchema.hh>
#include <avro/Writer.hh>
#include <re2/re2.h> #include <re2/re2.h>
@ -321,6 +311,70 @@ AvroSerializer::SchemaWithSerializeFn AvroSerializer::createSchemaWithSerializeF
} }
case TypeIndex::Nothing: case TypeIndex::Nothing:
return {avro::NullSchema(), [](const IColumn &, size_t, avro::Encoder & encoder) { encoder.encodeNull(); }}; return {avro::NullSchema(), [](const IColumn &, size_t, avro::Encoder & encoder) { encoder.encodeNull(); }};
case TypeIndex::Tuple:
{
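/// A Tuple column is written as an Avro record: each tuple element becomes a record field with the same name.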
const auto & tuple_type = assert_cast<const DataTypeTuple &>(*data_type);
const auto & nested_types = tuple_type.getElements();
const auto & nested_names = tuple_type.getElementNames();
std::vector<SerializeFn> nested_serializers;
nested_serializers.reserve(nested_types.size());
auto schema = avro::RecordSchema(column_name);
for (size_t i = 0; i != nested_types.size(); ++i)
{
auto nested_mapping = createSchemaWithSerializeFn(nested_types[i], type_name_increment, nested_names[i]);
schema.addField(nested_names[i], nested_mapping.schema);
nested_serializers.push_back(nested_mapping.serialize);
}
return {schema, [nested_serializers](const IColumn & column, size_t row_num, avro::Encoder & encoder)
{
const ColumnTuple & column_tuple = assert_cast<const ColumnTuple &>(column);
const auto & nested_columns = column_tuple.getColumns();
for (size_t i = 0; i != nested_serializers.size(); ++i)
nested_serializers[i](*nested_columns[i], row_num, encoder);
}};
}
case TypeIndex::Map:
{
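/// A Map column is written as an Avro map; Avro map keys must be strings, hence the check below.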
const auto & map_type = assert_cast<const DataTypeMap &>(*data_type);
const auto & keys_type = map_type.getKeyType();
if (!isStringOrFixedString(keys_type))
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Avro Maps support only keys with type String, got {}", keys_type->getName());
auto keys_serializer = [](const IColumn & column, size_t row_num, avro::Encoder & encoder)
{
const StringRef & s = column.getDataAt(row_num);
encoder.encodeString(s.toString());
};
const auto & values_type = map_type.getValueType();
auto values_mapping = createSchemaWithSerializeFn(values_type, type_name_increment, column_name + ".value");
auto schema = avro::MapSchema(values_mapping.schema);
return {schema, [keys_serializer, values_mapping](const IColumn & column, size_t row_num, avro::Encoder & encoder)
{
const ColumnMap & column_map = assert_cast<const ColumnMap &>(column);
const ColumnArray & column_array = column_map.getNestedColumn();
const ColumnArray::Offsets & offsets = column_array.getOffsets();
size_t offset = offsets[row_num - 1];
size_t next_offset = offsets[row_num];
size_t row_count = next_offset - offset;
const ColumnTuple & nested_columns = column_map.getNestedData();
const IColumn & keys_column = nested_columns.getColumn(0);
const IColumn & values_column = nested_columns.getColumn(1);
encoder.mapStart();
if (row_count > 0)
encoder.setItemCount(row_count);
for (size_t i = offset; i < next_offset; ++i)
{
keys_serializer(keys_column, i, encoder);
values_mapping.serialize(values_column, i, encoder);
}
encoder.mapEnd();
}};
}
default: default:
break; break;
} }

View File

@ -9,9 +9,9 @@
#include <IO/WriteBuffer.h> #include <IO/WriteBuffer.h>
#include <Processors/Formats/IRowOutputFormat.h> #include <Processors/Formats/IRowOutputFormat.h>
#include <avro/DataFile.hh> #include <DataFile.hh>
#include <avro/Schema.hh> #include <Schema.hh>
#include <avro/ValidSchema.hh> #include <ValidSchema.hh>
namespace DB namespace DB

View File

@ -1,5 +1,4 @@
#include <Processors/Merges/IMergingTransform.h> #include <Processors/Merges/IMergingTransform.h>
#include <Processors/Transforms/SelectorInfo.h>
namespace DB namespace DB
{ {
@ -181,68 +180,4 @@ IProcessor::Status IMergingTransformBase::prepare()
return Status::Ready; return Status::Ready;
} }
static void filterChunk(IMergingAlgorithm::Input & input, size_t selector_position)
{
if (!input.chunk.getChunkInfo())
throw Exception("IMergingTransformBase expected ChunkInfo for input chunk", ErrorCodes::LOGICAL_ERROR);
const auto * chunk_info = typeid_cast<const SelectorInfo *>(input.chunk.getChunkInfo().get());
if (!chunk_info)
throw Exception("IMergingTransformBase expected SelectorInfo for input chunk", ErrorCodes::LOGICAL_ERROR);
const auto & selector = chunk_info->selector;
IColumn::Filter filter;
filter.resize_fill(selector.size());
size_t num_rows = input.chunk.getNumRows();
auto columns = input.chunk.detachColumns();
size_t num_result_rows = 0;
for (size_t row = 0; row < num_rows; ++row)
{
if (selector[row] == selector_position)
{
++num_result_rows;
filter[row] = 1;
}
}
if (!filter.empty() && filter.back() == 0)
{
filter.back() = 1;
++num_result_rows;
input.skip_last_row = true;
}
for (auto & column : columns)
column = column->filter(filter, num_result_rows);
input.chunk.clear();
input.chunk.setColumns(std::move(columns), num_result_rows);
}
void IMergingTransformBase::filterChunks()
{
if (state.selector_position < 0)
return;
if (!state.init_chunks.empty())
{
for (size_t i = 0; i < input_states.size(); ++i)
{
auto & input = state.init_chunks[i];
if (!input.chunk)
continue;
filterChunk(input, state.selector_position);
}
}
if (state.has_input)
filterChunk(state.input_chunk, state.selector_position);
}
} }

View File

@ -28,17 +28,10 @@ public:
Status prepare() override; Status prepare() override;
/// Set position which will be used in selector if input chunk has attached SelectorInfo (see SelectorInfo.h).
/// Columns will be filtered, keep only rows labeled with this position.
/// It is used in parallel final.
void setSelectorPosition(size_t position) { state.selector_position = position; }
protected: protected:
virtual void onNewInput(); /// Is called when new input is added. Only if have_all_inputs = false. virtual void onNewInput(); /// Is called when new input is added. Only if have_all_inputs = false.
virtual void onFinish() {} /// Is called when all data is processed. virtual void onFinish() {} /// Is called when all data is processed.
void filterChunks(); /// Filter chunks if selector position was set. For parallel final.
/// Processor state. /// Processor state.
struct State struct State
{ {
@ -50,7 +43,6 @@ protected:
size_t next_input_to_read = 0; size_t next_input_to_read = 0;
IMergingAlgorithm::Inputs init_chunks; IMergingAlgorithm::Inputs init_chunks;
ssize_t selector_position = -1;
}; };
State state; State state;
@ -92,8 +84,6 @@ public:
void work() override void work() override
{ {
filterChunks();
if (!state.init_chunks.empty()) if (!state.init_chunks.empty())
algorithm.initialize(std::move(state.init_chunks)); algorithm.initialize(std::move(state.init_chunks));

View File

@ -241,7 +241,9 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
if (missign_column_index < missing_columns.size() && missing_columns[missign_column_index] == i) if (missign_column_index < missing_columns.size() && missing_columns[missign_column_index] == i)
{ {
++missign_column_index; ++missign_column_index;
auto column = ColumnConst::create(col.column->cloneResized(1), 0); auto column_with_default = col.column->cloneEmpty();
col.type->insertDefaultInto(*column_with_default);
auto column = ColumnConst::create(std::move(column_with_default), 0);
const auto * node = &dag->addColumn({ColumnPtr(std::move(column)), col.type, col.name}); const auto * node = &dag->addColumn({ColumnPtr(std::move(column)), col.type, col.name});
node = &dag->materializeNode(*node); node = &dag->materializeNode(*node);
index.push_back(node); index.push_back(node);

View File

@ -86,6 +86,9 @@ static void doDescribeProcessor(const IProcessor & processor, size_t count, IQue
doDescribeHeader(*last_header, num_equal_headers, settings); doDescribeHeader(*last_header, num_equal_headers, settings);
} }
if (!processor.getDescription().empty())
settings.out << String(settings.offset, settings.indent_char) << "Description: " << processor.getDescription() << '\n';
settings.offset += settings.indent; settings.offset += settings.indent;
} }

View File

@ -0,0 +1,274 @@
#include <algorithm>
#include <memory>
#include <numeric>
#include <queue>
#include <unordered_map>
#include <vector>
#include <Core/Field.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/TreeRewriter.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Processors/QueryPlan/PartsSplitter.h>
#include <Processors/Transforms/FilterSortedStreamByRange.h>
#include <Storages/MergeTree/RangesInDataPart.h>
using namespace DB;
namespace
{
using Value = std::vector<Field>;
std::string toString(const Value & value)
{
return fmt::format("({})", fmt::join(value, ", "));
}
/// Adaptor to access PK values from index.
class IndexAccess
{
public:
explicit IndexAccess(const RangesInDataParts & parts_) : parts(parts_) { }
Value getValue(size_t part_idx, size_t mark) const
{
const auto & index = parts[part_idx].data_part->index;
Value value(index.size());
for (size_t i = 0; i < value.size(); ++i)
index[i]->get(mark, value[i]);
return value;
}
size_t getMarkRows(size_t part_idx, size_t mark) const { return parts[part_idx].data_part->index_granularity.getMarkRows(mark); }
size_t getTotalRowCount() const
{
size_t total = 0;
for (const auto & part : parts)
total += part.getRowsCount();
return total;
}
private:
const RangesInDataParts & parts;
};
/// Splits parts into layers; each layer will contain subranges of parts with PK values from its own range.
/// Will try to produce exactly max_layers layers but may return fewer if the data distribution is not very parallelizable.
std::pair<std::vector<Value>, std::vector<RangesInDataParts>> split(RangesInDataParts parts, size_t max_layers)
{
// We will advance the iterator pointing to the mark with the smallest PK value until the current layer has at least rows_per_layer rows (roughly speaking).
// Then we choose the last observed value as the new border, so the current layer consists of granules with values greater than the previous border
// and less than or equal to the new one.
struct PartsRangesIterator
{
struct RangeInDataPart : MarkRange
{
size_t part_idx;
};
enum class EventType
{
RangeBeginning,
RangeEnding,
};
bool operator<(const PartsRangesIterator & other) const { return std::tie(value, event) > std::tie(other.value, other.event); }
Value value;
RangeInDataPart range;
EventType event;
};
const auto index_access = std::make_unique<IndexAccess>(parts);
std::priority_queue<PartsRangesIterator> parts_ranges_queue;
for (size_t part_idx = 0; part_idx < parts.size(); ++part_idx)
{
for (const auto & range : parts[part_idx].ranges)
{
parts_ranges_queue.push(
{index_access->getValue(part_idx, range.begin), {range, part_idx}, PartsRangesIterator::EventType::RangeBeginning});
const auto & index_granularity = parts[part_idx].data_part->index_granularity;
if (index_granularity.hasFinalMark() && range.end + 1 == index_granularity.getMarksCount())
parts_ranges_queue.push(
{index_access->getValue(part_idx, range.end), {range, part_idx}, PartsRangesIterator::EventType::RangeEnding});
}
}
/// The beginning of currently started (but not yet finished) range of marks of a part in the current layer.
std::unordered_map<size_t, size_t> current_part_range_begin;
/// The current ending of a range of marks of a part in the current layer.
std::unordered_map<size_t, size_t> current_part_range_end;
/// Determine borders between layers.
std::vector<Value> borders;
std::vector<RangesInDataParts> result_layers;
const size_t rows_per_layer = std::max<size_t>(index_access->getTotalRowCount() / max_layers, 1);
while (!parts_ranges_queue.empty())
{
// A new layer should include the last granules of still-open ranges from the previous layer,
// because they may already contain values greater than the last border.
size_t rows_in_current_layer = 0;
size_t marks_in_current_layer = 0;
// The intersection between the current and the next layer is just the last observed marks of each still-open part range. The ratio is empirical.
auto layers_intersection_is_too_big = [&]()
{
const auto intersected_parts = current_part_range_end.size();
return marks_in_current_layer < intersected_parts * 2;
};
result_layers.emplace_back();
while (rows_in_current_layer < rows_per_layer || layers_intersection_is_too_big() || result_layers.size() == max_layers)
{
// We're advancing iterators until a new value shows up.
Value last_value;
while (!parts_ranges_queue.empty() && (last_value.empty() || last_value == parts_ranges_queue.top().value))
{
auto current = parts_ranges_queue.top();
parts_ranges_queue.pop();
const auto part_idx = current.range.part_idx;
if (current.event == PartsRangesIterator::EventType::RangeEnding)
{
result_layers.back().emplace_back(
parts[part_idx].data_part,
parts[part_idx].part_index_in_query,
MarkRanges{{current_part_range_begin[part_idx], current.range.end}});
current_part_range_begin.erase(part_idx);
current_part_range_end.erase(part_idx);
continue;
}
last_value = std::move(current.value);
rows_in_current_layer += index_access->getMarkRows(part_idx, current.range.begin);
marks_in_current_layer++;
current_part_range_begin.try_emplace(part_idx, current.range.begin);
current_part_range_end[part_idx] = current.range.begin;
if (current.range.begin + 1 < current.range.end)
{
current.range.begin++;
current.value = index_access->getValue(part_idx, current.range.begin);
parts_ranges_queue.push(std::move(current));
}
}
if (parts_ranges_queue.empty())
break;
if (rows_in_current_layer >= rows_per_layer && !layers_intersection_is_too_big() && result_layers.size() < max_layers)
borders.push_back(last_value);
}
for (const auto & [part_idx, last_mark] : current_part_range_end)
{
result_layers.back().emplace_back(
parts[part_idx].data_part,
parts[part_idx].part_index_in_query,
MarkRanges{{current_part_range_begin[part_idx], last_mark + 1}});
current_part_range_begin[part_idx] = current_part_range_end[part_idx];
}
}
for (auto & layer : result_layers)
{
std::stable_sort(
layer.begin(),
layer.end(),
[](const auto & lhs, const auto & rhs) { return lhs.part_index_in_query < rhs.part_index_in_query; });
}
return std::make_pair(std::move(borders), std::move(result_layers));
}
/// Returns borders.size()+1 filters in total; the i-th filter accepts rows with PK values within the range [borders[i-1], borders[i]).
std::vector<ASTPtr> buildFilters(const KeyDescription & primary_key, const std::vector<Value> & borders)
{
auto add_and_condition = [&](ASTPtr & result, const ASTPtr & foo) { result = !result ? foo : makeASTFunction("and", result, foo); };
/// Produces ASTPtr to predicate (pk_col0, pk_col1, ... , pk_colN) > (value[0], value[1], ... , value[N])
auto lexicographically_greater = [&](const Value & value)
{
// PK may contain functions of the table columns, so we need the actual PK AST with all expressions it contains.
ASTPtr pk_columns_as_tuple = makeASTFunction("tuple", primary_key.expression_list_ast->children);
ASTPtr value_ast = std::make_shared<ASTExpressionList>();
for (size_t i = 0; i < value.size(); ++i)
{
const auto & types = primary_key.data_types;
ASTPtr component_ast = std::make_shared<ASTLiteral>(value[i]);
// Values of some types (e.g. Date, DateTime) are stored in columns as numbers and we get them as just numbers from the index.
// So we need an explicit Cast for them.
if (isColumnedAsNumber(types.at(i)->getTypeId()) && !isNumber(types.at(i)->getTypeId()))
component_ast = makeASTFunction("cast", std::move(component_ast), std::make_shared<ASTLiteral>(types.at(i)->getName()));
value_ast->children.push_back(std::move(component_ast));
}
ASTPtr value_as_tuple = makeASTFunction("tuple", value_ast->children);
return makeASTFunction("greater", pk_columns_as_tuple, value_as_tuple);
};
std::vector<ASTPtr> filters(borders.size() + 1);
for (size_t layer = 0; layer <= borders.size(); ++layer)
{
if (layer > 0)
add_and_condition(filters[layer], lexicographically_greater(borders[layer - 1]));
if (layer < borders.size())
add_and_condition(filters[layer], makeASTFunction("not", lexicographically_greater(borders[layer])));
}
return filters;
}
}
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
Pipes buildPipesForReadingByPKRanges(
const KeyDescription & primary_key,
RangesInDataParts parts,
size_t max_layers,
ContextPtr context,
ReadingInOrderStepGetter && reading_step_getter)
{
if (max_layers <= 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "max_layer should be greater than 1.");
auto && [borders, result_layers] = split(std::move(parts), max_layers);
auto filters = buildFilters(primary_key, borders);
Pipes pipes(result_layers.size());
for (size_t i = 0; i < result_layers.size(); ++i)
{
pipes[i] = reading_step_getter(std::move(result_layers[i]));
auto & filter_function = filters[i];
if (!filter_function)
continue;
auto syntax_result = TreeRewriter(context).analyze(filter_function, primary_key.expression->getRequiredColumnsWithTypes());
auto actions = ExpressionAnalyzer(filter_function, syntax_result, context).getActionsDAG(false);
ExpressionActionsPtr expression_actions = std::make_shared<ExpressionActions>(std::move(actions));
auto description = fmt::format(
"filter values in [{}, {})", i ? ::toString(borders[i - 1]) : "-inf", i < borders.size() ? ::toString(borders[i]) : "+inf");
auto pk_expression = std::make_shared<ExpressionActions>(primary_key.expression->getActionsDAG().clone());
pipes[i].addSimpleTransform([pk_expression](const Block & header)
{ return std::make_shared<ExpressionTransform>(header, pk_expression); });
pipes[i].addSimpleTransform(
[&](const Block & header)
{
auto step = std::make_shared<FilterSortedStreamByRange>(header, expression_actions, filter_function->getColumnName(), true);
step->setDescription(description);
return step;
});
}
return pipes;
}
}
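
A self-contained toy sketch of the border idea implemented by split() and buildFilters() above, under simplifying assumptions: one PK column, one value per row, no per-part mark ranges, no priority queue and no layer-overlap handling. It only shows how border values carve the sorted key space into roughly equal, half-open ranges, matching the "filter values in [...]" descriptions that EXPLAIN PIPELINE prints for FilterSortedStreamByRange. It is not ClickHouse code.

#include <algorithm>
#include <cstddef>
#include <iostream>
#include <string>
#include <vector>

int main()
{
    std::vector<int> pk_values{1, 1, 2, 3, 5, 5, 6, 8, 9, 9, 10, 12};  // already sorted, one value per row
    const size_t max_layers = 3;
    const size_t rows_per_layer = std::max<size_t>(pk_values.size() / max_layers, 1);

    // Pick a border roughly every rows_per_layer rows, producing at most max_layers layers.
    std::vector<int> borders;
    for (size_t i = rows_per_layer; i < pk_values.size() && borders.size() + 1 < max_layers; i += rows_per_layer)
        borders.push_back(pk_values[i]);

    // Describe each layer by a half-open range, like the descriptions attached to the filter transforms.
    for (size_t layer = 0; layer <= borders.size(); ++layer)
    {
        std::string lo = layer == 0 ? "-inf" : std::to_string(borders[layer - 1]);
        std::string hi = layer == borders.size() ? "+inf" : std::to_string(borders[layer]);
        std::cout << "layer " << layer << ": filter values in [" << lo << ", " << hi << ")\n";
    }
}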

View File

@ -0,0 +1,25 @@
#pragma once
#include <functional>
#include <Interpreters/Context_fwd.h>
#include <QueryPipeline/Pipe.h>
#include <Storages/KeyDescription.h>
#include <Storages/MergeTree/RangesInDataPart.h>
namespace DB
{
using ReadingInOrderStepGetter = std::function<Pipe(RangesInDataParts)>;
/// Splits parts into layers; each layer will contain subranges of parts with PK values from its own range.
/// A separate pipe will be constructed for each layer, with a reading step (provided by the reading_step_getter) and a filter for this layer's range of PK values.
/// Will try to produce exactly max_layers pipes but may return fewer if the data distribution is not very parallelizable.
Pipes buildPipesForReadingByPKRanges(
const KeyDescription & primary_key,
RangesInDataParts parts,
size_t max_layers,
ContextPtr context,
ReadingInOrderStepGetter && reading_step_getter);
}
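
A toy sketch of the calling convention declared above: the caller hands in a reading step getter that turns one layer of part ranges into a source, and the splitter wraps each source with a filter for that layer. ToyPipe, Layer and buildToyPipes are invented stand-ins for illustration only, not ClickHouse types.

#include <functional>
#include <iostream>
#include <string>
#include <vector>

struct ToyPipe { std::string description; };
using Layer = std::vector<std::string>;                      // part ranges belonging to one layer
using ToyReadingStepGetter = std::function<ToyPipe(Layer)>;

std::vector<ToyPipe> buildToyPipes(const std::vector<Layer> & layers, ToyReadingStepGetter getter)
{
    std::vector<ToyPipe> pipes;
    for (size_t i = 0; i < layers.size(); ++i)
    {
        ToyPipe pipe = getter(layers[i]);                                        // reading step, provided by the caller
        pipe.description += " -> filter for layer " + std::to_string(i);         // range filter, added per layer
        pipes.push_back(pipe);
    }
    return pipes;
}

int main()
{
    std::vector<Layer> layers{{"part_0 [0..3]", "part_1 [0..1]"}, {"part_1 [1..4]"}};
    auto pipes = buildToyPipes(layers, [](Layer layer)
    {
        return ToyPipe{"read " + std::to_string(layer.size()) + " ranges in order"};
    });
    for (const auto & pipe : pipes)
        std::cout << pipe.description << '\n';
}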

View File

@ -1,14 +1,16 @@
#include <algorithm>
#include <functional>
#include <memory>
#include <numeric>
#include <queue>
#include <stdexcept>
#include <IO/Operators.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/TreeRewriter.h>
#include <Parsers/ASTFunction.h> #include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTSelectQuery.h> #include <Parsers/ASTSelectQuery.h>
#include <Processors/QueryPlan/ReadFromMergeTree.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Processors/ConcatProcessor.h> #include <Processors/ConcatProcessor.h>
#include <Processors/Transforms/ReverseTransform.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Transforms/FilterTransform.h>
#include <Processors/Transforms/AddingSelectorTransform.h>
#include <Processors/Transforms/CopyTransform.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/Merges/AggregatingSortedTransform.h> #include <Processors/Merges/AggregatingSortedTransform.h>
#include <Processors/Merges/CollapsingSortedTransform.h> #include <Processors/Merges/CollapsingSortedTransform.h>
#include <Processors/Merges/GraphiteRollupSortedTransform.h> #include <Processors/Merges/GraphiteRollupSortedTransform.h>
@ -16,17 +18,22 @@
#include <Processors/Merges/ReplacingSortedTransform.h> #include <Processors/Merges/ReplacingSortedTransform.h>
#include <Processors/Merges/SummingSortedTransform.h> #include <Processors/Merges/SummingSortedTransform.h>
#include <Processors/Merges/VersionedCollapsingTransform.h> #include <Processors/Merges/VersionedCollapsingTransform.h>
#include <Processors/QueryPlan/PartsSplitter.h>
#include <Processors/QueryPlan/ReadFromMergeTree.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Transforms/FilterTransform.h>
#include <Processors/Transforms/ReverseTransform.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
#include <Storages/MergeTree/MergeTreeInOrderSelectProcessor.h> #include <Storages/MergeTree/MergeTreeInOrderSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeReadPool.h>
#include <Storages/MergeTree/MergeTreeReverseSelectProcessor.h> #include <Storages/MergeTree/MergeTreeReverseSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeThreadSelectProcessor.h> #include <Storages/MergeTree/MergeTreeThreadSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
#include <Storages/MergeTree/MergeTreeReadPool.h>
#include <Storages/VirtualColumnUtils.h> #include <Storages/VirtualColumnUtils.h>
#include <IO/Operators.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/TreeRewriter.h>
#include <Common/logger_useful.h> #include <Common/logger_useful.h>
#include <base/sort.h> #include <base/sort.h>
#include <Poco/Logger.h>
#include <Common/JSONBuilder.h> #include <Common/JSONBuilder.h>
namespace ProfileEvents namespace ProfileEvents
@ -560,7 +567,6 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
static void addMergingFinal( static void addMergingFinal(
Pipe & pipe, Pipe & pipe,
size_t num_output_streams,
const SortDescription & sort_description, const SortDescription & sort_description,
MergeTreeData::MergingParams merging_params, MergeTreeData::MergingParams merging_params,
Names partition_key_columns, Names partition_key_columns,
@ -607,56 +613,7 @@ static void addMergingFinal(
__builtin_unreachable(); __builtin_unreachable();
}; };
if (num_output_streams <= 1 || sort_description.empty())
{
pipe.addTransform(get_merging_processor()); pipe.addTransform(get_merging_processor());
return;
}
ColumnNumbers key_columns;
key_columns.reserve(sort_description.size());
for (const auto & desc : sort_description)
key_columns.push_back(header.getPositionByName(desc.column_name));
pipe.addSimpleTransform([&](const Block & stream_header)
{
return std::make_shared<AddingSelectorTransform>(stream_header, num_output_streams, key_columns);
});
pipe.transform([&](OutputPortRawPtrs ports)
{
Processors transforms;
std::vector<OutputPorts::iterator> output_ports;
transforms.reserve(ports.size() + num_output_streams);
output_ports.reserve(ports.size());
for (auto & port : ports)
{
auto copier = std::make_shared<CopyTransform>(header, num_output_streams);
connect(*port, copier->getInputPort());
output_ports.emplace_back(copier->getOutputs().begin());
transforms.emplace_back(std::move(copier));
}
for (size_t i = 0; i < num_output_streams; ++i)
{
auto merge = get_merging_processor();
merge->setSelectorPosition(i);
auto input = merge->getInputs().begin();
/// Connect i-th merge with i-th input port of every copier.
for (size_t j = 0; j < ports.size(); ++j)
{
connect(*output_ports[j], *input);
++output_ports[j];
++input;
}
transforms.emplace_back(std::move(merge));
}
return transforms;
});
} }
@ -710,8 +667,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
for (size_t range_index = 0; range_index < parts_to_merge_ranges.size() - 1; ++range_index) for (size_t range_index = 0; range_index < parts_to_merge_ranges.size() - 1; ++range_index)
{ {
Pipe pipe; Pipes pipes;
{ {
RangesInDataParts new_parts; RangesInDataParts new_parts;
@ -738,21 +694,39 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
if (new_parts.empty()) if (new_parts.empty())
continue; continue;
pipe = read(std::move(new_parts), column_names, ReadFromMergeTree::ReadType::InOrder, if (num_streams > 1 && metadata_for_reading->hasPrimaryKey())
num_streams, 0, info.use_uncompressed_cache); {
// Let's split parts into layers to ensure data parallelism of final.
auto reading_step_getter = [this, &column_names, &info](auto parts)
{
return read(
std::move(parts),
column_names,
ReadFromMergeTree::ReadType::InOrder,
1 /* num_streams */,
0 /* min_marks_for_concurrent_read */,
info.use_uncompressed_cache);
};
pipes = buildPipesForReadingByPKRanges(
metadata_for_reading->getPrimaryKey(), std::move(new_parts), num_streams, context, std::move(reading_step_getter));
}
else
{
pipes.emplace_back(read(
std::move(new_parts), column_names, ReadFromMergeTree::ReadType::InOrder, num_streams, 0, info.use_uncompressed_cache));
}
/// Drop temporary columns, added by 'sorting_key_expr' /// Drop temporary columns, added by 'sorting_key_expr'
if (!out_projection) if (!out_projection)
out_projection = createProjection(pipe.getHeader()); out_projection = createProjection(pipes.front().getHeader());
} }
auto sorting_expr = std::make_shared<ExpressionActions>( auto sorting_expr = std::make_shared<ExpressionActions>(
metadata_for_reading->getSortingKey().expression->getActionsDAG().clone()); metadata_for_reading->getSortingKey().expression->getActionsDAG().clone());
for (auto & pipe : pipes)
pipe.addSimpleTransform([sorting_expr](const Block & header) pipe.addSimpleTransform([sorting_expr](const Block & header)
{ { return std::make_shared<ExpressionTransform>(header, sorting_expr); });
return std::make_shared<ExpressionTransform>(header, sorting_expr);
});
/// If do_not_merge_across_partitions_select_final is true and there is only one part in partition /// If do_not_merge_across_partitions_select_final is true and there is only one part in partition
/// with level > 0 then we won't postprocess this part /// with level > 0 then we won't postprocess this part
@ -760,7 +734,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
std::distance(parts_to_merge_ranges[range_index], parts_to_merge_ranges[range_index + 1]) == 1 && std::distance(parts_to_merge_ranges[range_index], parts_to_merge_ranges[range_index + 1]) == 1 &&
parts_to_merge_ranges[range_index]->data_part->info.level > 0) parts_to_merge_ranges[range_index]->data_part->info.level > 0)
{ {
partition_pipes.emplace_back(std::move(pipe)); partition_pipes.emplace_back(Pipe::unitePipes(std::move(pipes)));
continue; continue;
} }
@ -777,21 +751,21 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
for (size_t i = 0; i < sort_columns_size; ++i) for (size_t i = 0; i < sort_columns_size; ++i)
sort_description.emplace_back(sort_columns[i], 1, 1); sort_description.emplace_back(sort_columns[i], 1, 1);
for (auto & pipe : pipes)
addMergingFinal( addMergingFinal(
pipe, pipe,
std::min<size_t>(num_streams, settings.max_final_threads), sort_description,
sort_description, data.merging_params, partition_key_columns, max_block_size); data.merging_params,
partition_key_columns,
max_block_size);
partition_pipes.emplace_back(std::move(pipe)); partition_pipes.emplace_back(Pipe::unitePipes(std::move(pipes)));
} }
if (!lonely_parts.empty()) if (!lonely_parts.empty())
{ {
RangesInDataParts new_parts;
size_t num_streams_for_lonely_parts = num_streams * lonely_parts.size(); size_t num_streams_for_lonely_parts = num_streams * lonely_parts.size();
const size_t min_marks_for_concurrent_read = MergeTreeDataSelectExecutor::minMarksForConcurrentRead( const size_t min_marks_for_concurrent_read = MergeTreeDataSelectExecutor::minMarksForConcurrentRead(
settings.merge_tree_min_rows_for_concurrent_read, settings.merge_tree_min_rows_for_concurrent_read,
settings.merge_tree_min_bytes_for_concurrent_read, settings.merge_tree_min_bytes_for_concurrent_read,

View File

@ -1,76 +0,0 @@
#include <Processors/Transforms/AddingSelectorTransform.h>
#include <Processors/Transforms/SelectorInfo.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
AddingSelectorTransform::AddingSelectorTransform(
const Block & header, size_t num_outputs_, ColumnNumbers key_columns_)
: ISimpleTransform(header, header, false)
, num_outputs(num_outputs_)
, key_columns(std::move(key_columns_))
, hash(0)
{
setInputNotNeededAfterRead(false);
if (num_outputs <= 1)
throw Exception("SplittingByHashTransform expects more than 1 outputs, got " + std::to_string(num_outputs),
ErrorCodes::LOGICAL_ERROR);
if (key_columns.empty())
throw Exception("SplittingByHashTransform cannot split by empty set of key columns",
ErrorCodes::LOGICAL_ERROR);
for (auto & column : key_columns)
if (column >= header.columns())
throw Exception("Invalid column number: " + std::to_string(column) +
". There is only " + std::to_string(header.columns()) + " columns in header",
ErrorCodes::LOGICAL_ERROR);
}
static void calculateWeakHash32(const Chunk & chunk, const ColumnNumbers & key_columns, WeakHash32 & hash)
{
auto num_rows = chunk.getNumRows();
const auto & columns = chunk.getColumns();
hash.reset(num_rows);
for (const auto & column_number : key_columns)
columns[column_number]->updateWeakHash32(hash);
}
static IColumn::Selector fillSelector(const WeakHash32 & hash, size_t num_outputs)
{
/// Row from interval [(2^32 / num_outputs) * i, (2^32 / num_outputs) * (i + 1)) goes to bucket with number i.
const auto & hash_data = hash.getData();
size_t num_rows = hash_data.size();
IColumn::Selector selector(num_rows);
for (size_t row = 0; row < num_rows; ++row)
{
selector[row] = hash_data[row]; /// [0, 2^32)
selector[row] *= num_outputs; /// [0, num_outputs * 2^32), selector stores 64 bit values.
selector[row] >>= 32u; /// [0, num_outputs)
}
return selector;
}
void AddingSelectorTransform::transform(Chunk & input_chunk, Chunk & output_chunk)
{
auto chunk_info = std::make_shared<SelectorInfo>();
calculateWeakHash32(input_chunk, key_columns, hash);
chunk_info->selector = fillSelector(hash, num_outputs);
input_chunk.swap(output_chunk);
output_chunk.setChunkInfo(std::move(chunk_info));
}
}
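
For reference, the selector formula in the transform removed above maps a 32-bit weak hash h to a bucket as (h * num_outputs) >> 32, so bucket i receives hashes from [i * 2^32 / num_outputs, (i + 1) * 2^32 / num_outputs). A tiny standalone illustration of that arithmetic (not ClickHouse code):

#include <cstdint>
#include <iostream>

int main()
{
    const uint64_t num_outputs = 4;
    // A few sample 32-bit hash values and the buckets they land in.
    for (uint64_t h : {0u, 0x3FFFFFFFu, 0x40000000u, 0xFFFFFFFFu})
        std::cout << "hash " << h << " -> bucket " << ((h * num_outputs) >> 32) << '\n';
}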

View File

@ -1,26 +0,0 @@
#pragma once
#include <Processors/IProcessor.h>
#include <Processors/ISimpleTransform.h>
#include <Core/ColumnNumbers.h>
#include <Common/WeakHash.h>
namespace DB
{
/// Add IColumn::Selector to chunk (see SelectorInfo.h).
/// Selector is filled by formula (WeakHash(key_columns) * num_outputs / MAX_INT).
class AddingSelectorTransform : public ISimpleTransform
{
public:
AddingSelectorTransform(const Block & header, size_t num_outputs_, ColumnNumbers key_columns_);
String getName() const override { return "AddingSelector"; }
void transform(Chunk & input_chunk, Chunk & output_chunk) override;
private:
size_t num_outputs;
ColumnNumbers key_columns;
WeakHash32 hash;
};
}

View File

@ -35,6 +35,8 @@ void CubeTransform::consume(Chunk chunk)
consumed_chunks.emplace_back(std::move(chunk)); consumed_chunks.emplace_back(std::move(chunk));
} }
MutableColumnPtr getColumnWithDefaults(Block const & header, size_t key, size_t n);
Chunk CubeTransform::generate() Chunk CubeTransform::generate()
{ {
if (!consumed_chunks.empty()) if (!consumed_chunks.empty())
@ -53,8 +55,9 @@ Chunk CubeTransform::generate()
current_zero_columns.clear(); current_zero_columns.clear();
current_zero_columns.reserve(keys.size()); current_zero_columns.reserve(keys.size());
auto const & input_header = getInputPort().getHeader();
for (auto key : keys) for (auto key : keys)
current_zero_columns.emplace_back(current_columns[key]->cloneEmpty()->cloneResized(num_rows)); current_zero_columns.emplace_back(getColumnWithDefaults(input_header, key, num_rows));
} }
auto gen_chunk = std::move(cube_chunk); auto gen_chunk = std::move(cube_chunk);

View File

@ -0,0 +1,66 @@
#pragma once
#include <Interpreters/ExpressionActions.h>
#include <Processors/ISimpleTransform.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Transforms/FilterTransform.h>
namespace DB
{
/// Could be used when the predicate given by expression_ is true only on one continuous range of values and the input is monotonic by that value.
/// The following optimization applies: when a new chunk of data comes in, we first execute the expression_ only on the first and the last row.
/// If it evaluates to true on both rows then the whole chunk is immediately passed to further steps.
/// Otherwise, we apply the expression_ to all rows.
class FilterSortedStreamByRange : public ISimpleTransform
{
public:
FilterSortedStreamByRange(
const Block & header_,
ExpressionActionsPtr expression_,
String filter_column_name_,
bool remove_filter_column_,
bool on_totals_ = false)
: ISimpleTransform(
header_,
FilterTransform::transformHeader(header_, expression_->getActionsDAG(), filter_column_name_, remove_filter_column_),
true)
, filter_transform(header_, expression_, filter_column_name_, remove_filter_column_, on_totals_)
{
}
String getName() const override { return "FilterSortedStreamByRange"; }
void transform(Chunk & chunk) override
{
int rows_before_filtration = chunk.getNumRows();
if (rows_before_filtration < 2)
{
filter_transform.transform(chunk);
return;
}
// Evaluate the expression on just the first and the last row.
// If both of them satisfy the condition, then skip the calculation for all the rows in between.
auto quick_check_columns = chunk.cloneEmptyColumns();
auto src_columns = chunk.detachColumns();
for (auto row : {0, rows_before_filtration - 1})
for (size_t col = 0; col < quick_check_columns.size(); ++col)
quick_check_columns[col]->insertFrom(*src_columns[col].get(), row);
chunk.setColumns(std::move(quick_check_columns), 2);
filter_transform.transform(chunk);
const bool all_rows_will_pass_filter = chunk.getNumRows() == 2;
chunk.setColumns(std::move(src_columns), rows_before_filtration);
// Not all rows satisfy conditions.
if (!all_rows_will_pass_filter)
filter_transform.transform(chunk);
}
private:
FilterTransform filter_transform;
};
}
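
Below is a standalone toy version of the shortcut described in the header comment: when the stream is sorted by the filtered value and the predicate is true on a single contiguous range, checking only a chunk's first and last rows decides whether the whole chunk can pass untouched. The predicate and chunks are illustrative, not ClickHouse code.

#include <iostream>
#include <vector>

int main()
{
    auto predicate = [](int x) { return x >= 5 && x < 9; };   // true on a single contiguous range of values

    auto filter_chunk = [&](const std::vector<int> & chunk)
    {
        if (chunk.size() >= 2 && predicate(chunk.front()) && predicate(chunk.back()))
            return chunk;                                      // fast path: the whole chunk passes as-is
        std::vector<int> result;                               // slow path: row-by-row filtering
        for (int x : chunk)
            if (predicate(x))
                result.push_back(x);
        return result;
    };

    for (const auto & chunk : {std::vector<int>{3, 4, 5}, std::vector<int>{5, 6, 7}, std::vector<int>{8, 9, 10}})
    {
        auto filtered = filter_chunk(chunk);
        std::cout << filtered.size() << " of " << chunk.size() << " rows kept\n";
    }
}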

View File

@ -32,7 +32,6 @@ public:
Status prepare() override; Status prepare() override;
protected:
void transform(Chunk & chunk) override; void transform(Chunk & chunk) override;
private: private:

View File

@ -29,6 +29,14 @@ Chunk RollupTransform::merge(Chunks && chunks, bool final)
return Chunk(rollup_block.getColumns(), num_rows); return Chunk(rollup_block.getColumns(), num_rows);
} }
MutableColumnPtr getColumnWithDefaults(Block const & header, size_t key, size_t n)
{
auto const & col = header.getByPosition(key);
auto result_column = col.column->cloneEmpty();
col.type->insertManyDefaultsInto(*result_column, n);
return result_column;
}
Chunk RollupTransform::generate() Chunk RollupTransform::generate()
{ {
if (!consumed_chunks.empty()) if (!consumed_chunks.empty())
@ -51,7 +59,7 @@ Chunk RollupTransform::generate()
auto num_rows = gen_chunk.getNumRows(); auto num_rows = gen_chunk.getNumRows();
auto columns = gen_chunk.getColumns(); auto columns = gen_chunk.getColumns();
columns[key] = columns[key]->cloneEmpty()->cloneResized(num_rows); columns[key] = getColumnWithDefaults(getInputPort().getHeader(), key, num_rows);
Chunks chunks; Chunks chunks;
chunks.emplace_back(std::move(columns), num_rows); chunks.emplace_back(std::move(columns), num_rows);
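
The helper above matters for types whose default value is not an all-zero byte pattern. The old cloneEmpty()->cloneResized(n) produced zero-filled rows, while insertManyDefaultsInto uses the type's own default; for the Enum('one' = 1, 'two' = 2) column in the 02313 test further down, the subtotal rows therefore read 'one' instead of an undeclared 0. A toy, non-ClickHouse illustration of the difference, assuming the type default is the first declared enum value:

#include <cstdint>
#include <iostream>
#include <map>
#include <string>
#include <vector>

int main()
{
    // Hypothetical enum declared as ('one' = 1, 'two' = 2): the value 0 is not declared at all.
    const std::map<int8_t, std::string> enum_names{{1, "one"}, {2, "two"}};
    const int8_t type_default = enum_names.begin()->first;   // assumed default: first declared value, 'one' = 1

    std::vector<int8_t> zero_filled(3, 0);                    // what zero-filling (cloneResized-style) produces
    std::vector<int8_t> type_defaults(3, type_default);       // what inserting the type's default produces

    auto print = [&](const std::vector<int8_t> & column)
    {
        for (int8_t v : column)
            std::cout << (enum_names.count(v) ? enum_names.at(v) : std::string("<undeclared 0>")) << ' ';
        std::cout << '\n';
    };
    print(zero_filled);    // <undeclared 0> <undeclared 0> <undeclared 0>
    print(type_defaults);  // one one one
}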

View File

@ -1,14 +0,0 @@
#pragma once
#include <Processors/Chunk.h>
#include <Common/PODArray.h>
namespace DB
{
/// ChunkInfo with IColumn::Selector. It is added by AddingSelectorTransform.
struct SelectorInfo : public ChunkInfo
{
IColumn::Selector selector;
};
}

View File

@ -28,7 +28,10 @@ void printPipeline(const Processors & processors, const Statuses & statuses, Wri
/// Nodes // TODO quoting and escaping /// Nodes // TODO quoting and escaping
for (const auto & processor : processors) for (const auto & processor : processors)
{ {
out << " n" << get_proc_id(*processor) << "[label=\"" << processor->getName() << processor->getDescription(); auto description = processor->getDescription();
if (!description.empty())
description = ": " + description;
out << " n" << get_proc_id(*processor) << "[label=\"" << processor->getName() << description;
if (statuses_iter != statuses.end()) if (statuses_iter != statuses.end())
{ {

View File

@ -1256,27 +1256,6 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
{ {
ignore_max_size = max_source_parts_size == data_settings->max_bytes_to_merge_at_max_space_in_pool; ignore_max_size = max_source_parts_size == data_settings->max_bytes_to_merge_at_max_space_in_pool;
if (data_settings->always_fetch_merged_part && entry.num_tries > 0)
{
static constexpr auto MAX_SECONDS_TO_WAIT = 300L;
static constexpr auto BACKOFF_SECONDS = 3;
auto time_to_wait_seconds = std::min<int64_t>(MAX_SECONDS_TO_WAIT, entry.num_tries * BACKOFF_SECONDS);
auto time_since_last_try_seconds = std::time(nullptr) - entry.last_attempt_time;
/// Otherwise we will constantly look for part on other replicas
/// and load zookeeper too much.
if (time_to_wait_seconds > time_since_last_try_seconds)
{
out_postpone_reason = fmt::format(
"Not executing log entry ({}) to merge parts for part {} because `always_fetch_merged_part` enabled and "
" not enough time had been passed since last try, have to wait {} seconds",
entry.znode_name, entry.new_part_name, time_to_wait_seconds - time_since_last_try_seconds);
LOG_DEBUG(log, fmt::runtime(out_postpone_reason));
return false;
}
}
if (isTTLMergeType(entry.merge_type)) if (isTTLMergeType(entry.merge_type))
{ {
if (merger_mutator.ttl_merges_blocker.isCancelled()) if (merger_mutator.ttl_merges_blocker.isCancelled())

View File

@ -93,7 +93,7 @@ def get_failed_report(
elapsed_seconds=0, elapsed_seconds=0,
with_coverage=False, with_coverage=False,
) )
return [build_result], [[""]], [""] return [build_result], [[""]], [GITHUB_RUN_URL]
def process_report( def process_report(

View File

@ -12,6 +12,7 @@
<value>ZSTD</value> <value>ZSTD</value>
<value>DoubleDelta</value> <value>DoubleDelta</value>
<value>Gorilla</value> <value>Gorilla</value>
<value>FPC</value>
</values> </values>
</substitution> </substitution>
<substitution> <substitution>

View File

@ -12,6 +12,7 @@
<value>ZSTD</value> <value>ZSTD</value>
<value>DoubleDelta</value> <value>DoubleDelta</value>
<value>Gorilla</value> <value>Gorilla</value>
<value>FPC</value>
</values> </values>
</substitution> </substitution>
<substitution> <substitution>

View File

@ -18,6 +18,7 @@
<value>collapsing_final_16p_str_keys_rnd</value> <value>collapsing_final_16p_str_keys_rnd</value>
<value>collapsing_final_1024p_ord</value> <value>collapsing_final_1024p_ord</value>
<value>collapsing_final_1024p_rnd</value> <value>collapsing_final_1024p_rnd</value>
<value>collapsing_final_1p_ord</value>
</values> </values>
</substitution> </substitution>
</substitutions> </substitutions>
@ -30,6 +31,7 @@
<create_query>create table collapsing_final_16p_str_keys_rnd (key1 UInt32, key2 String, key3 String, key4 String, key5 String, key6 String, key7 String, key8 String, sign Int8, s UInt64) engine = CollapsingMergeTree(sign) order by (key1, key2, key3, key4, key5, key6, key7, key8) partition by key1 % 16 </create_query> <create_query>create table collapsing_final_16p_str_keys_rnd (key1 UInt32, key2 String, key3 String, key4 String, key5 String, key6 String, key7 String, key8 String, sign Int8, s UInt64) engine = CollapsingMergeTree(sign) order by (key1, key2, key3, key4, key5, key6, key7, key8) partition by key1 % 16 </create_query>
<create_query>create table collapsing_final_1024p_ord (key1 UInt32, sign Int8, s UInt64) engine = CollapsingMergeTree(sign) order by (key1) partition by intDiv(key1, 8192 * 2) </create_query> <create_query>create table collapsing_final_1024p_ord (key1 UInt32, sign Int8, s UInt64) engine = CollapsingMergeTree(sign) order by (key1) partition by intDiv(key1, 8192 * 2) </create_query>
<create_query>create table collapsing_final_1024p_rnd (key1 UInt32, sign Int8, s UInt64) engine = CollapsingMergeTree(sign) order by (key1) partition by key1 % 1024 </create_query> <create_query>create table collapsing_final_1024p_rnd (key1 UInt32, sign Int8, s UInt64) engine = CollapsingMergeTree(sign) order by (key1) partition by key1 % 1024 </create_query>
<create_query>create table collapsing_final_1p_ord (key1 UInt64, key2 UInt64, sign Int8, s UInt64) engine = CollapsingMergeTree(sign) order by (key1, key2)</create_query>
<!-- 16 parts, 8192 * 1024 rows each --> <!-- 16 parts, 8192 * 1024 rows each -->
<fill_query>insert into collapsing_final_16p_ord select number, number, 1, number from numbers_mt(8388608) </fill_query> <fill_query>insert into collapsing_final_16p_ord select number, number, 1, number from numbers_mt(8388608) </fill_query>
@ -43,6 +45,9 @@
<fill_query>insert into collapsing_final_1024p_ord select number, 1, number from numbers_mt(16777216) </fill_query> <fill_query>insert into collapsing_final_1024p_ord select number, 1, number from numbers_mt(16777216) </fill_query>
<fill_query>insert into collapsing_final_1024p_rnd select number, 1, number from numbers_mt(16777216) </fill_query> <fill_query>insert into collapsing_final_1024p_rnd select number, 1, number from numbers_mt(16777216) </fill_query>
<!-- 1 big part of 5e7 rows -->
<fill_query>insert into collapsing_final_1p_ord select number, number + 1, 1, number from numbers_mt(5e7)</fill_query>
<fill_query>optimize table {collapsing} final</fill_query> <fill_query>optimize table {collapsing} final</fill_query>
<query>SELECT count() FROM {collapsing} final</query> <query>SELECT count() FROM {collapsing} final</query>

View File

@ -0,0 +1,31 @@
<test>
<substitutions>
<substitution>
<name>func</name>
<values>
<value>not</value>
</values>
</substitution>
<substitution>
<name>expr</name>
<values>
<value>number</value>
<value>toUInt32(number)</value>
<value>toUInt16(number)</value>
<value>toUInt8(number)</value>
<value>toInt64(number)</value>
<value>toInt32(number)</value>
<value>toInt16(number)</value>
<value>toInt8(number)</value>
<value>toFloat64(number)</value>
<value>toFloat32(number)</value>
</values>
</substitution>
</substitutions>
<query>SELECT {func}({expr}) FROM numbers(1000000000) FORMAT Null</query>
</test>

View File

@ -376,11 +376,6 @@
2019-05-03 4 2019-05-03 4
2019-05-03 1 2019-05-03 1
2019-05-03 -2 2019-05-03 -2
2019-05-01 10
2019-05-01 7
2019-05-01 4
2019-05-01 1
2019-05-01 -2
*** date WITH FILL TO 2019-06-23 STEP 3, val WITH FILL FROM -10 STEP 2 *** date WITH FILL TO 2019-06-23 STEP 3, val WITH FILL FROM -10 STEP 2
2019-05-07 -10 2019-05-07 -10
2019-05-07 -8 2019-05-07 -8

View File

@ -1,5 +1,5 @@
#!/usr/bin/env bash #!/usr/bin/env bash
# Tags: no-replicated-database, no-parallel # Tags: no-replicated-database, no-parallel, no-ordinary-database
# Tag no-replicated-database: Unsupported type of ALTER query # Tag no-replicated-database: Unsupported type of ALTER query
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)

View File

@ -16,8 +16,15 @@ ExpressionTransform
ExpressionTransform × 2 ExpressionTransform × 2
(ReadFromMergeTree) (ReadFromMergeTree)
ExpressionTransform × 2 ExpressionTransform × 2
ReplacingSorted × 2 2 → 1 ReplacingSorted
Copy × 2 1 → 2 ExpressionTransform
AddingSelector × 2 FilterSortedStreamByRange
Description: filter values in [(5), +inf)
ExpressionTransform
MergeTreeInOrder 0 → 1
ReplacingSorted 2 → 1
ExpressionTransform × 2
FilterSortedStreamByRange × 2
Description: filter values in [-inf, (5))
ExpressionTransform × 2 ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1 MergeTreeInOrder × 2 0 → 1

View File

@ -34,12 +34,3 @@
6 -4 6 -4
6 -3 6 -3
6 -2 6 -2
7 -10
7 -9
7 -8
7 -7
7 -6
7 -5
7 -4
7 -3
7 -2

View File

@ -103,10 +103,6 @@
2020-04-01 2 0 2020-04-01 2 0
2020-04-01 3 0 2020-04-01 3 0
2020-04-01 4 0 2020-04-01 4 0
2020-05-01 1 0
2020-05-01 2 0
2020-05-01 3 0
2020-05-01 4 0
1970-01-04 1970-01-04
1970-01-03 1970-01-03
1970-01-02 1970-01-02

View File

@ -0,0 +1,9 @@
2
2
3
5
8
8
8
8
8

View File

@ -0,0 +1,31 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
test_random_values() {
layers=$1
$CLICKHOUSE_CLIENT -n -q "
create table tbl_8parts_${layers}granules_rnd (key1 UInt32, sign Int8) engine = CollapsingMergeTree(sign) order by (key1) partition by (key1 % 8);
insert into tbl_8parts_${layers}granules_rnd select number, 1 from numbers_mt($((layers * 8 * 8192)));
explain pipeline select * from tbl_8parts_${layers}granules_rnd final settings max_threads = 16;" 2>&1 |
grep -c "CollapsingSortedTransform"
}
for layers in 2 3 5 8; do
test_random_values $layers
done;
test_sequential_values() {
layers=$1
$CLICKHOUSE_CLIENT -n -q "
create table tbl_8parts_${layers}granules_seq (key1 UInt32, sign Int8) engine = CollapsingMergeTree(sign) order by (key1) partition by (key1 / $((layers * 8192)))::UInt64;
insert into tbl_8parts_${layers}granules_seq select number, 1 from numbers_mt($((layers * 8 * 8192)));
explain pipeline select * from tbl_8parts_${layers}granules_seq final settings max_threads = 8;" 2>&1 |
grep -c "CollapsingSortedTransform"
}
for layers in 2 3 5 8 16; do
test_sequential_values $layers
done;

View File

@ -0,0 +1,25 @@
t Tuple(a Int32, b String)
(0,'String')
(1,'String')
(2,'String')
t Tuple(a Int32, b Tuple(c Int32, d Int32), e Array(Int32))
(0,(1,2),[])
(1,(2,3),[0])
(2,(3,4),[0,1])
a.b Array(Int32)
a.c Array(Int32)
[0,1] [2,3]
[1,2] [3,4]
[2,3] [4,5]
a.b Array(Array(Tuple(c Int32, d Int32)))
[[(0,1),(2,3)]]
[[(1,2),(3,4)]]
[[(2,3),(4,5)]]
m Map(String, Int64)
{'key_0':0}
{'key_1':1}
{'key_2':2}
m Map(String, Tuple(`1` Int64, `2` Array(Int64)))
{'key_0':(0,[])}
{'key_1':(1,[0])}
{'key_2':(2,[0,1])}

View File

@ -0,0 +1,26 @@
-- Tags: no-parallel, no-fasttest
insert into function file(data_02313.avro) select tuple(number, 'String')::Tuple(a UInt32, b String) as t from numbers(3) settings engine_file_truncate_on_insert=1;
desc file(data_02313.avro);
select * from file(data_02313.avro);
insert into function file(data_02313.avro) select tuple(number, tuple(number + 1, number + 2), range(number))::Tuple(a UInt32, b Tuple(c UInt32, d UInt32), e Array(UInt32)) as t from numbers(3) settings engine_file_truncate_on_insert=1;
desc file(data_02313.avro);
select * from file(data_02313.avro);
insert into function file(data_02313.avro, auto, 'a Nested(b UInt32, c UInt32)') select [number, number + 1], [number + 2, number + 3] from numbers(3) settings engine_file_truncate_on_insert=1;
desc file(data_02313.avro);
select * from file(data_02313.avro);
insert into function file(data_02313.avro, auto, 'a Nested(b Nested(c UInt32, d UInt32))') select [[(number, number + 1), (number + 2, number + 3)]] from numbers(3) settings engine_file_truncate_on_insert=1;
desc file(data_02313.avro);
select * from file(data_02313.avro);
insert into function file(data_02313.avro) select map(concat('key_', toString(number)), number) as m from numbers(3) settings engine_file_truncate_on_insert=1;
desc file(data_02313.avro);
select * from file(data_02313.avro);
insert into function file(data_02313.avro) select map(concat('key_', toString(number)), tuple(number, range(number))) as m from numbers(3) settings engine_file_truncate_on_insert=1;
desc file(data_02313.avro);
select * from file(data_02313.avro);

View File

@ -0,0 +1,113 @@
-- { echoOn }
SELECT
count() as d, a, b, c
FROM test02313
GROUP BY ROLLUP(a, b, c)
ORDER BY d, a, b, c;
1 one default 0
1 one default 2
1 one default 4
1 one default 6
1 one default 8
1 two non-default 1
1 two non-default 3
1 two non-default 5
1 two non-default 7
1 two non-default 9
5 one default 0
5 one default 0
5 two default 0
5 two non-default 0
10 one default 0
SELECT
count() as d, a, b, c
FROM test02313
GROUP BY CUBE(a, b, c)
ORDER BY d, a, b, c;
1 one default 0
1 one default 0
1 one default 0
1 one default 0
1 one default 1
1 one default 2
1 one default 2
1 one default 2
1 one default 2
1 one default 3
1 one default 4
1 one default 4
1 one default 4
1 one default 4
1 one default 5
1 one default 6
1 one default 6
1 one default 6
1 one default 6
1 one default 7
1 one default 8
1 one default 8
1 one default 8
1 one default 8
1 one default 9
1 one non-default 1
1 one non-default 3
1 one non-default 5
1 one non-default 7
1 one non-default 9
1 two default 1
1 two default 3
1 two default 5
1 two default 7
1 two default 9
1 two non-default 1
1 two non-default 3
1 two non-default 5
1 two non-default 7
1 two non-default 9
5 one default 0
5 one default 0
5 one default 0
5 one non-default 0
5 two default 0
5 two non-default 0
10 one default 0
SELECT
count() as d, a, b, c
FROM test02313
GROUP BY GROUPING SETS
(
(c),
(a, c),
(b, c)
)
ORDER BY d, a, b, c;
1 one default 0
1 one default 0
1 one default 0
1 one default 1
1 one default 2
1 one default 2
1 one default 2
1 one default 3
1 one default 4
1 one default 4
1 one default 4
1 one default 5
1 one default 6
1 one default 6
1 one default 6
1 one default 7
1 one default 8
1 one default 8
1 one default 8
1 one default 9
1 one non-default 1
1 one non-default 3
1 one non-default 5
1 one non-default 7
1 one non-default 9
1 two default 1
1 two default 3
1 two default 5
1 two default 7
1 two default 9

View File

@ -0,0 +1,39 @@
DROP TABLE IF EXISTS test02313;
CREATE TABLE test02313
(
a Enum('one' = 1, 'two' = 2),
b Enum('default' = 0, 'non-default' = 1),
c UInt8
)
ENGINE = MergeTree()
ORDER BY (a, b, c);
INSERT INTO test02313 SELECT number % 2 + 1 AS a, number % 2 AS b, number FROM numbers(10);
-- { echoOn }
SELECT
count() as d, a, b, c
FROM test02313
GROUP BY ROLLUP(a, b, c)
ORDER BY d, a, b, c;
SELECT
count() as d, a, b, c
FROM test02313
GROUP BY CUBE(a, b, c)
ORDER BY d, a, b, c;
SELECT
count() as d, a, b, c
FROM test02313
GROUP BY GROUPING SETS
(
(c),
(a, c),
(b, c)
)
ORDER BY d, a, b, c;
-- { echoOff }
DROP TABLE test02313;

View File

@ -0,0 +1,4 @@
F64
F32
F64
F32

View File

@ -0,0 +1,121 @@
DROP TABLE IF EXISTS codecTest;
CREATE TABLE codecTest (
key UInt64,
name String,
ref_valueF64 Float64,
ref_valueF32 Float32,
valueF64 Float64 CODEC(FPC),
valueF32 Float32 CODEC(FPC)
) Engine = MergeTree ORDER BY key;
-- best case - same value
INSERT INTO codecTest (key, name, ref_valueF64, valueF64, ref_valueF32, valueF32)
SELECT number AS n, 'e()', e() AS v, v, v, v FROM system.numbers LIMIT 1, 100;
-- good case - values that grow insignificantly
INSERT INTO codecTest (key, name, ref_valueF64, valueF64, ref_valueF32, valueF32)
SELECT number AS n, 'log2(n)', log2(n) AS v, v, v, v FROM system.numbers LIMIT 101, 100;
-- bad case - values differ significantly
INSERT INTO codecTest (key, name, ref_valueF64, valueF64, ref_valueF32, valueF32)
SELECT number AS n, 'n*sqrt(n)', n*sqrt(n) AS v, v, v, v FROM system.numbers LIMIT 201, 100;
-- worst case - almost like random values
INSERT INTO codecTest (key, name, ref_valueF64, valueF64, ref_valueF32, valueF32)
SELECT number AS n, 'sin(n*n*n)*n', sin(n * n * n * n* n) AS v, v, v, v FROM system.numbers LIMIT 301, 100;
-- These floating-point values are expected to be BINARY equal, so comparing by-value is Ok here.
-- referencing previous row key, value, and case name to simplify debugging.
SELECT 'F64';
SELECT
c1.key, c1.name,
c1.ref_valueF64, c1.valueF64, c1.ref_valueF64 - c1.valueF64 AS dF64,
'prev:',
c2.key, c2.ref_valueF64
FROM
codecTest as c1, codecTest as c2
WHERE
dF64 != 0
AND
c2.key = c1.key - 1
LIMIT 10;
SELECT 'F32';
SELECT
c1.key, c1.name,
c1.ref_valueF32, c1.valueF32, c1.ref_valueF32 - c1.valueF32 AS dF32,
'prev:',
c2.key, c2.ref_valueF32
FROM
codecTest as c1, codecTest as c2
WHERE
dF32 != 0
AND
c2.key = c1.key - 1
LIMIT 10;
DROP TABLE IF EXISTS codecTest;
CREATE TABLE codecTest (
key UInt64,
name String,
ref_valueF64 Float64,
ref_valueF32 Float32,
valueF64 Float64 CODEC(FPC(4)),
valueF32 Float32 CODEC(FPC(4))
) Engine = MergeTree ORDER BY key;
-- best case - same value
INSERT INTO codecTest (key, name, ref_valueF64, valueF64, ref_valueF32, valueF32)
SELECT number AS n, 'e()', e() AS v, v, v, v FROM system.numbers LIMIT 1, 100;
-- good case - values that grow insignificantly
INSERT INTO codecTest (key, name, ref_valueF64, valueF64, ref_valueF32, valueF32)
SELECT number AS n, 'log2(n)', log2(n) AS v, v, v, v FROM system.numbers LIMIT 101, 100;
-- bad case - values differ significantly
INSERT INTO codecTest (key, name, ref_valueF64, valueF64, ref_valueF32, valueF32)
SELECT number AS n, 'n*sqrt(n)', n*sqrt(n) AS v, v, v, v FROM system.numbers LIMIT 201, 100;
-- worst case - almost like random values
INSERT INTO codecTest (key, name, ref_valueF64, valueF64, ref_valueF32, valueF32)
SELECT number AS n, 'sin(n*n*n)*n', sin(n * n * n * n* n) AS v, v, v, v FROM system.numbers LIMIT 301, 100;
-- These floating-point values are expected to be BINARY equal, so comparing by-value is Ok here.
-- referencing previous row key, value, and case name to simplify debugging.
SELECT 'F64';
SELECT
c1.key, c1.name,
c1.ref_valueF64, c1.valueF64, c1.ref_valueF64 - c1.valueF64 AS dF64,
'prev:',
c2.key, c2.ref_valueF64
FROM
codecTest as c1, codecTest as c2
WHERE
dF64 != 0
AND
c2.key = c1.key - 1
LIMIT 10;
SELECT 'F32';
SELECT
c1.key, c1.name,
c1.ref_valueF32, c1.valueF32, c1.ref_valueF32 - c1.valueF32 AS dF32,
'prev:',
c2.key, c2.ref_valueF32
FROM
codecTest as c1, codecTest as c2
WHERE
dF32 != 0
AND
c2.key = c1.key - 1
LIMIT 10;
DROP TABLE IF EXISTS codecTest;

View File

@ -0,0 +1,6 @@
a Nullable(Int64)
b Array(Tuple(c Nullable(Int64), d Nullable(String)))
1 [(100,'Q'),(200,'W')]
0
0
0

View File

@ -0,0 +1,17 @@
#!/usr/bin/env bash
# Tags: no-parallel, no-fasttest
set -e
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
DATA_DIR=$CUR_DIR/data_avro
$CLICKHOUSE_LOCAL -q "desc file('$DATA_DIR/nullable_array.avro') settings input_format_avro_null_as_default=1"
$CLICKHOUSE_LOCAL -q "select * from file('$DATA_DIR/nullable_array.avro') settings input_format_avro_null_as_default=1"
$CLICKHOUSE_CLIENT -q "insert into function file(data_02314.avro) select NULL::Nullable(UInt32) as x from numbers(3) settings engine_file_truncate_on_insert=1"
$CLICKHOUSE_CLIENT -q "select * from file(data_02314.avro, auto, 'x UInt32') settings input_format_avro_null_as_default=1"