diff --git a/docs/en/sql-reference/aggregate-functions/reference/varsamp.md b/docs/en/sql-reference/aggregate-functions/reference/varsamp.md index 844b2006c91..9c0636ad1b4 100644 --- a/docs/en/sql-reference/aggregate-functions/reference/varsamp.md +++ b/docs/en/sql-reference/aggregate-functions/reference/varsamp.md @@ -6,10 +6,10 @@ sidebar_position: 33 Calculates the amount `Σ((x - x̅)^2) / (n - 1)`, where `n` is the sample size and `x̅`is the average value of `x`. -It represents an unbiased estimate of the variance of a random variable if passed values form its sample. +It represents an unbiased estimate of the variance of a random variable if passed values from its sample. Returns `Float64`. When `n <= 1`, returns `+∞`. :::note This function uses a numerically unstable algorithm. If you need [numerical stability](https://en.wikipedia.org/wiki/Numerical_stability) in calculations, use the `varSampStable` function. It works slower but provides a lower computational error. -::: \ No newline at end of file +::: diff --git a/docs/ru/faq/operations/delete-old-data.md b/docs/ru/faq/operations/delete-old-data.md index 92736ef5205..ab221f8303b 100644 --- a/docs/ru/faq/operations/delete-old-data.md +++ b/docs/ru/faq/operations/delete-old-data.md @@ -22,9 +22,9 @@ ClickHouse позволяет автоматически удалять данн ClickHouse не удаляет данные в реальном времени, как СУБД [OLTP](https://en.wikipedia.org/wiki/Online_transaction_processing). Больше всего на такое удаление похожи мутации. Они выполняются с помощью запросов `ALTER ... DELETE` или `ALTER ... UPDATE`. В отличие от обычных запросов `DELETE` и `UPDATE`, мутации выполняются асинхронно, в пакетном режиме, не в реальном времени. В остальном после слов `ALTER TABLE` синтаксис обычных запросов и мутаций одинаковый. -`ALTER DELETE` можно использовать для гибкого удаления устаревших данных. Если вам нужно делать это регулярно, единственный недостаток такого подхода будет заключаться в том, что потребуется внешняя система для запуска запроса. Кроме того, могут возникнуть некоторые проблемы с производительностью, поскольку мутации перезаписывают целые куски данных если в них содержится хотя бы одна строка, которую нужно удалить. +`ALTER DELETE` можно использовать для гибкого удаления устаревших данных. Если вам нужно делать это регулярно, основной недостаток такого подхода будет заключаться в том, что потребуется внешняя система для запуска запроса. Кроме того, могут возникнуть некоторые проблемы с производительностью, поскольку мутации перезаписывают целые куски данных если в них содержится хотя бы одна строка, которую нужно удалить. -Это самый распространенный подход к тому, чтобы обеспечить соблюдение принципов [GDPR](https://gdpr-info.eu) в вашей системе на ClickHouse. +Это - самый распространенный подход к тому, чтобы обеспечить соблюдение принципов [GDPR](https://gdpr-info.eu) в вашей системе на ClickHouse. Подробнее смотрите в разделе [Мутации](../../sql-reference/statements/alter/index.md#alter-mutations). diff --git a/src/Client/ClientBase.cpp b/src/Client/ClientBase.cpp index b586979b546..733c2d6b4df 100644 --- a/src/Client/ClientBase.cpp +++ b/src/Client/ClientBase.cpp @@ -950,18 +950,17 @@ void ClientBase::onProfileEvents(Block & block) progress_indication.addThreadIdToList(host_name, thread_id); auto event_name = names.getDataAt(i); auto value = array_values[i]; + + /// Ignore negative time delta or memory usage just in case. + if (value < 0) + continue; + if (event_name == user_time_name) - { thread_times[host_name][thread_id].user_ms = value; - } else if (event_name == system_time_name) - { thread_times[host_name][thread_id].system_ms = value; - } else if (event_name == MemoryTracker::USAGE_EVENT_NAME) - { thread_times[host_name][thread_id].memory_usage = value; - } } auto elapsed_time = profile_events.watch.elapsedMicroseconds(); progress_indication.updateThreadEventData(thread_times, elapsed_time); diff --git a/src/Columns/ColumnVector.cpp b/src/Columns/ColumnVector.cpp index 04f2efab0d7..391baa188d6 100644 --- a/src/Columns/ColumnVector.cpp +++ b/src/Columns/ColumnVector.cpp @@ -318,7 +318,59 @@ template void ColumnVector::updatePermutation(IColumn::PermutationSortDirection direction, IColumn::PermutationSortStability stability, size_t limit, int nan_direction_hint, IColumn::Permutation & res, EqualRanges & equal_ranges) const { - auto sort = [](auto begin, auto end, auto pred) { ::sort(begin, end, pred); }; + bool reverse = direction == IColumn::PermutationSortDirection::Descending; + bool ascending = direction == IColumn::PermutationSortDirection::Ascending; + bool sort_is_stable = stability == IColumn::PermutationSortStability::Stable; + + auto sort = [&](auto begin, auto end, auto pred) + { + /// A case for radix sort + if constexpr (is_arithmetic_v && !is_big_int_v) + { + /// TODO: LSD RadixSort is currently not stable if direction is descending, or value is floating point + bool use_radix_sort = (sort_is_stable && ascending && !std::is_floating_point_v) || !sort_is_stable; + size_t size = end - begin; + + /// Thresholds on size. Lower threshold is arbitrary. Upper threshold is chosen by the type for histogram counters. + if (size >= 256 && size <= std::numeric_limits::max() && use_radix_sort) + { + PaddedPODArray> pairs(size); + size_t index = 0; + + for (auto it = begin; it != end; ++it) + { + pairs[index] = {data[*it], static_cast(*it)}; + ++index; + } + + RadixSort>::executeLSD(pairs.data(), size, reverse, begin); + + /// Radix sort treats all NaNs to be greater than all numbers. + /// If the user needs the opposite, we must move them accordingly. + if (std::is_floating_point_v && nan_direction_hint < 0) + { + size_t nans_to_move = 0; + + for (size_t i = 0; i < size; ++i) + { + if (isNaN(data[begin[reverse ? i : size - 1 - i]])) + ++nans_to_move; + else + break; + } + + if (nans_to_move) + { + std::rotate(begin, begin + (reverse ? nans_to_move : size - nans_to_move), end); + } + } + + return; + } + } + + ::sort(begin, end, pred); + }; auto partial_sort = [](auto begin, auto mid, auto end, auto pred) { ::partial_sort(begin, mid, end, pred); }; if (direction == IColumn::PermutationSortDirection::Ascending && stability == IColumn::PermutationSortStability::Unstable) diff --git a/src/Columns/IColumn.h b/src/Columns/IColumn.h index f62f6c444b3..a99d4172e5b 100644 --- a/src/Columns/IColumn.h +++ b/src/Columns/IColumn.h @@ -90,7 +90,7 @@ public: /// Creates column with the same type and specified size. /// If size is less current size, then data is cut. /// If size is greater, than default values are appended. - [[nodiscard]] virtual MutablePtr cloneResized(size_t /*size*/) const { throw Exception("Cannot cloneResized() column " + getName(), ErrorCodes::NOT_IMPLEMENTED); } + [[nodiscard]] virtual MutablePtr cloneResized(size_t /*size*/) const { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Cannot cloneResized() column {}", getName()); } /// Returns number of values in column. [[nodiscard]] virtual size_t size() const = 0; diff --git a/src/Common/ProgressIndication.cpp b/src/Common/ProgressIndication.cpp index ffc90807060..315080115a6 100644 --- a/src/Common/ProgressIndication.cpp +++ b/src/Common/ProgressIndication.cpp @@ -19,7 +19,7 @@ namespace double calculateCPUUsage(DB::ThreadIdToTimeMap times, UInt64 elapsed) { auto accumulated = std::accumulate(times.begin(), times.end(), 0, - [](Int64 acc, const auto & elem) + [](UInt64 acc, const auto & elem) { if (elem.first == ALL_THREADS) return acc; @@ -191,6 +191,10 @@ void ProgressIndication::writeProgress() { WriteBufferFromOwnString profiling_msg_builder; + /// We don't want -0. that can appear due to rounding errors. + if (cpu_usage <= 0) + cpu_usage = 0; + profiling_msg_builder << "(" << fmt::format("{:.1f}", cpu_usage) << " CPU"; if (memory_usage > 0) diff --git a/src/Common/ProgressIndication.h b/src/Common/ProgressIndication.h index 9db9af96b49..d44becc416a 100644 --- a/src/Common/ProgressIndication.h +++ b/src/Common/ProgressIndication.h @@ -16,11 +16,11 @@ namespace DB struct ThreadEventData { - Int64 time() const noexcept { return user_ms + system_ms; } + UInt64 time() const noexcept { return user_ms + system_ms; } - Int64 user_ms = 0; - Int64 system_ms = 0; - Int64 memory_usage = 0; + UInt64 user_ms = 0; + UInt64 system_ms = 0; + UInt64 memory_usage = 0; }; using ThreadIdToTimeMap = std::unordered_map; diff --git a/src/Compression/CompressionCodecFPC.cpp b/src/Compression/CompressionCodecFPC.cpp new file mode 100644 index 00000000000..3b66060b6fc --- /dev/null +++ b/src/Compression/CompressionCodecFPC.cpp @@ -0,0 +1,511 @@ +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + + +namespace DB +{ + +class CompressionCodecFPC : public ICompressionCodec +{ +public: + CompressionCodecFPC(UInt8 float_size, UInt8 compression_level); + + uint8_t getMethodByte() const override; + + void updateHash(SipHash & hash) const override; + + static constexpr UInt8 MAX_COMPRESSION_LEVEL{28}; + static constexpr UInt8 DEFAULT_COMPRESSION_LEVEL{12}; + +protected: + UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override; + + void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override; + + UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override; + + bool isCompression() const override { return true; } + bool isGenericCompression() const override { return false; } + +private: + static constexpr UInt32 HEADER_SIZE{3}; + + UInt8 float_width; // size of uncompressed float in bytes + UInt8 level; // compression level, 2^level * float_width is the size of predictors table in bytes +}; + + +namespace ErrorCodes +{ + extern const int CANNOT_COMPRESS; + extern const int CANNOT_DECOMPRESS; + extern const int ILLEGAL_CODEC_PARAMETER; + extern const int ILLEGAL_SYNTAX_FOR_CODEC_TYPE; + extern const int BAD_ARGUMENTS; +} + +uint8_t CompressionCodecFPC::getMethodByte() const +{ + return static_cast(CompressionMethodByte::FPC); +} + +void CompressionCodecFPC::updateHash(SipHash & hash) const +{ + getCodecDesc()->updateTreeHash(hash); +} + +CompressionCodecFPC::CompressionCodecFPC(UInt8 float_size, UInt8 compression_level) + : float_width{float_size}, level{compression_level} +{ + setCodecDescription("FPC", {std::make_shared(static_cast(level))}); +} + +UInt32 CompressionCodecFPC::getMaxCompressedDataSize(UInt32 uncompressed_size) const +{ + auto float_count = (uncompressed_size + float_width - 1) / float_width; + if (float_count % 2 != 0) + ++float_count; + return HEADER_SIZE + float_count * float_width + float_count / 2; +} + +namespace +{ + +UInt8 getFloatBytesSize(const IDataType & column_type) +{ + if (!WhichDataType(column_type).isFloat()) + { + throw Exception(ErrorCodes::BAD_ARGUMENTS, "FPC codec is not applicable for {} because the data type is not float", + column_type.getName()); + } + + if (auto float_size = column_type.getSizeOfValueInMemory(); float_size >= 4) + { + return static_cast(float_size); + } + throw Exception(ErrorCodes::BAD_ARGUMENTS, "FPC codec is not applicable for floats of size less than 4 bytes. Given type {}", + column_type.getName()); +} + +std::byte encodeEndianness(std::endian endian) +{ + switch (endian) + { + case std::endian::little: + return std::byte{0}; + case std::endian::big: + return std::byte{1}; + } + throw Exception("Unsupported endianness", ErrorCodes::BAD_ARGUMENTS); +} + +std::endian decodeEndianness(std::byte endian) +{ + switch (std::to_integer(endian)) + { + case 0: + return std::endian::little; + case 1: + return std::endian::big; + } + throw Exception("Unsupported endianness", ErrorCodes::BAD_ARGUMENTS); +} + +} + +void registerCodecFPC(CompressionCodecFactory & factory) +{ + auto method_code = static_cast(CompressionMethodByte::FPC); + auto codec_builder = [&](const ASTPtr & arguments, const IDataType * column_type) -> CompressionCodecPtr + { + UInt8 float_width{0}; + if (column_type != nullptr) + float_width = getFloatBytesSize(*column_type); + + UInt8 level = CompressionCodecFPC::DEFAULT_COMPRESSION_LEVEL; + if (arguments && !arguments->children.empty()) + { + if (arguments->children.size() > 1) + { + throw Exception(ErrorCodes::ILLEGAL_SYNTAX_FOR_CODEC_TYPE, + "FPC codec must have 1 parameter, given {}", arguments->children.size()); + } + + const auto * literal = arguments->children.front()->as(); + if (!literal) + throw Exception("FPC codec argument must be integer", ErrorCodes::ILLEGAL_CODEC_PARAMETER); + + level = literal->value.safeGet(); + if (level == 0) + throw Exception("FPC codec level must be at least 1", ErrorCodes::ILLEGAL_CODEC_PARAMETER); + if (level > CompressionCodecFPC::MAX_COMPRESSION_LEVEL) + throw Exception("FPC codec level must be at most 28", ErrorCodes::ILLEGAL_CODEC_PARAMETER); + } + return std::make_shared(float_width, level); + }; + factory.registerCompressionCodecWithType("FPC", method_code, codec_builder); +} + +namespace +{ + +template + requires (sizeof(TUint) >= 4) +class DfcmPredictor +{ +public: + explicit DfcmPredictor(std::size_t table_size): table(table_size, 0), prev_value{0}, hash{0} + { + } + + [[nodiscard]] + TUint predict() const noexcept + { + return table[hash] + prev_value; + } + + void add(TUint value) noexcept + { + table[hash] = value - prev_value; + recalculateHash(); + prev_value = value; + } + +private: + void recalculateHash() noexcept + { + auto value = table[hash]; + if constexpr (sizeof(TUint) >= 8) + { + hash = ((hash << 2) ^ static_cast(value >> 40)) & (table.size() - 1); + } + else + { + hash = ((hash << 4) ^ static_cast(value >> 23)) & (table.size() - 1); + } + } + + std::vector table; + TUint prev_value; + std::size_t hash; +}; + +template + requires (sizeof(TUint) >= 4) +class FcmPredictor +{ +public: + explicit FcmPredictor(std::size_t table_size): table(table_size, 0), hash{0} + { + } + + [[nodiscard]] + TUint predict() const noexcept + { + return table[hash]; + } + + void add(TUint value) noexcept + { + table[hash] = value; + recalculateHash(); + } + +private: + void recalculateHash() noexcept + { + auto value = table[hash]; + if constexpr (sizeof(TUint) >= 8) + { + hash = ((hash << 6) ^ static_cast(value >> 48)) & (table.size() - 1); + } + else + { + hash = ((hash << 1) ^ static_cast(value >> 22)) & (table.size() - 1); + } + } + + std::vector table; + std::size_t hash; +}; + +template + requires (Endian == std::endian::little || Endian == std::endian::big) +class FPCOperation +{ + static constexpr std::size_t CHUNK_SIZE{64}; + + static constexpr auto VALUE_SIZE = sizeof(TUint); + static constexpr std::byte FCM_BIT{0}; + static constexpr std::byte DFCM_BIT{1u << 3}; + static constexpr auto DFCM_BIT_1 = DFCM_BIT << 4; + static constexpr auto DFCM_BIT_2 = DFCM_BIT; + static constexpr unsigned MAX_ZERO_BYTE_COUNT{0b111u}; + +public: + FPCOperation(std::span destination, UInt8 compression_level) + : dfcm_predictor(1u << compression_level), fcm_predictor(1u << compression_level), chunk{}, result{destination} + { + } + + std::size_t encode(std::span data) && + { + auto initial_size = result.size(); + + std::span chunk_view(chunk); + for (std::size_t i = 0; i < data.size(); i += chunk_view.size_bytes()) + { + auto written_values = importChunk(data.subspan(i), chunk_view); + encodeChunk(chunk_view.subspan(0, written_values)); + } + + return initial_size - result.size(); + } + + void decode(std::span values, std::size_t decoded_size) && + { + std::size_t read_bytes{0}; + + std::span chunk_view(chunk); + for (std::size_t i = 0; i < decoded_size; i += chunk_view.size_bytes()) + { + if (i + chunk_view.size_bytes() > decoded_size) + chunk_view = chunk_view.first(ceilBytesToEvenValues(decoded_size - i)); + read_bytes += decodeChunk(values.subspan(read_bytes), chunk_view); + exportChunk(chunk_view); + } + } + +private: + static std::size_t ceilBytesToEvenValues(std::size_t bytes_count) + { + auto values_count = (bytes_count + VALUE_SIZE - 1) / VALUE_SIZE; + return values_count % 2 == 0 ? values_count : values_count + 1; + } + + std::size_t importChunk(std::span values, std::span chnk) + { + if (auto chunk_view = std::as_writable_bytes(chnk); chunk_view.size() <= values.size()) + { + std::memcpy(chunk_view.data(), values.data(), chunk_view.size()); + return chunk_view.size() / VALUE_SIZE; + } + else + { + std::memset(chunk_view.data(), 0, chunk_view.size()); + std::memcpy(chunk_view.data(), values.data(), values.size()); + return ceilBytesToEvenValues(values.size()); + } + } + + void exportChunk(std::span chnk) + { + auto chunk_view = std::as_bytes(chnk).first(std::min(result.size(), chnk.size_bytes())); + std::memcpy(result.data(), chunk_view.data(), chunk_view.size()); + result = result.subspan(chunk_view.size()); + } + + void encodeChunk(std::span seq) + { + for (std::size_t i = 0; i < seq.size(); i += 2) + { + encodePair(seq[i], seq[i + 1]); + } + } + + struct CompressedValue + { + TUint value; + unsigned compressed_size; + std::byte predictor; + }; + + unsigned encodeCompressedZeroByteCount(int compressed) + { + if constexpr (VALUE_SIZE == MAX_ZERO_BYTE_COUNT + 1) + { + if (compressed >= 4) + --compressed; + } + return std::min(static_cast(compressed), MAX_ZERO_BYTE_COUNT); + } + + unsigned decodeCompressedZeroByteCount(unsigned encoded_size) + { + if constexpr (VALUE_SIZE == MAX_ZERO_BYTE_COUNT + 1) + { + if (encoded_size > 3) + ++encoded_size; + } + return encoded_size; + } + + CompressedValue compressValue(TUint value) noexcept + { + static constexpr auto BITS_PER_BYTE = std::numeric_limits::digits; + + TUint compressed_dfcm = dfcm_predictor.predict() ^ value; + TUint compressed_fcm = fcm_predictor.predict() ^ value; + dfcm_predictor.add(value); + fcm_predictor.add(value); + auto zeroes_dfcm = std::countl_zero(compressed_dfcm); + auto zeroes_fcm = std::countl_zero(compressed_fcm); + if (zeroes_dfcm > zeroes_fcm) + return {compressed_dfcm, encodeCompressedZeroByteCount(zeroes_dfcm / BITS_PER_BYTE), DFCM_BIT}; + return {compressed_fcm, encodeCompressedZeroByteCount(zeroes_fcm / BITS_PER_BYTE), FCM_BIT}; + } + + void encodePair(TUint first, TUint second) + { + auto [value1, zero_byte_count1, predictor1] = compressValue(first); + auto [value2, zero_byte_count2, predictor2] = compressValue(second); + std::byte header{0x0}; + header |= (predictor1 << 4) | predictor2; + header |= static_cast((zero_byte_count1 << 4) | zero_byte_count2); + result.front() = header; + + zero_byte_count1 = decodeCompressedZeroByteCount(zero_byte_count1); + zero_byte_count2 = decodeCompressedZeroByteCount(zero_byte_count2); + auto tail_size1 = VALUE_SIZE - zero_byte_count1; + auto tail_size2 = VALUE_SIZE - zero_byte_count2; + + std::memcpy(result.data() + 1, valueTail(value1, zero_byte_count1), tail_size1); + std::memcpy(result.data() + 1 + tail_size1, valueTail(value2, zero_byte_count2), tail_size2); + result = result.subspan(1 + tail_size1 + tail_size2); + } + + std::size_t decodeChunk(std::span values, std::span seq) + { + std::size_t read_bytes{0}; + for (std::size_t i = 0; i < seq.size(); i += 2) + { + read_bytes += decodePair(values.subspan(read_bytes), seq[i], seq[i + 1]); + } + return read_bytes; + } + + TUint decompressValue(TUint value, bool isDfcmPredictor) + { + TUint decompressed; + if (isDfcmPredictor) + { + decompressed = dfcm_predictor.predict() ^ value; + } + else + { + decompressed = fcm_predictor.predict() ^ value; + } + dfcm_predictor.add(decompressed); + fcm_predictor.add(decompressed); + return decompressed; + } + + std::size_t decodePair(std::span bytes, TUint& first, TUint& second) + { + if (bytes.empty()) + throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Unexpected end of encoded sequence"); + + auto zero_byte_count1 = decodeCompressedZeroByteCount( + std::to_integer(bytes.front() >> 4) & MAX_ZERO_BYTE_COUNT); + auto zero_byte_count2 = decodeCompressedZeroByteCount( + std::to_integer(bytes.front()) & MAX_ZERO_BYTE_COUNT); + + auto tail_size1 = VALUE_SIZE - zero_byte_count1; + auto tail_size2 = VALUE_SIZE - zero_byte_count2; + + if (bytes.size() < 1 + tail_size1 + tail_size2) + throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Unexpected end of encoded sequence"); + + TUint value1{0}; + TUint value2{0}; + + std::memcpy(valueTail(value1, zero_byte_count1), bytes.data() + 1, tail_size1); + std::memcpy(valueTail(value2, zero_byte_count2), bytes.data() + 1 + tail_size1, tail_size2); + + auto is_dfcm_predictor1 = std::to_integer(bytes.front() & DFCM_BIT_1) != 0; + auto is_dfcm_predictor2 = std::to_integer(bytes.front() & DFCM_BIT_2) != 0; + first = decompressValue(value1, is_dfcm_predictor1); + second = decompressValue(value2, is_dfcm_predictor2); + + return 1 + tail_size1 + tail_size2; + } + + static void* valueTail(TUint& value, unsigned compressed_size) + { + if constexpr (Endian == std::endian::little) + { + return &value; + } + else + { + return reinterpret_cast(&value) + compressed_size; + } + } + + DfcmPredictor dfcm_predictor; + FcmPredictor fcm_predictor; + std::array chunk{}; + std::span result{}; +}; + +} + +UInt32 CompressionCodecFPC::doCompressData(const char * source, UInt32 source_size, char * dest) const +{ + dest[0] = static_cast(float_width); + dest[1] = static_cast(level); + dest[2] = std::to_integer(encodeEndianness(std::endian::native)); + + auto dest_size = getMaxCompressedDataSize(source_size); + auto destination = std::as_writable_bytes(std::span(dest, dest_size).subspan(HEADER_SIZE)); + auto src = std::as_bytes(std::span(source, source_size)); + switch (float_width) + { + case sizeof(Float64): + return HEADER_SIZE + FPCOperation(destination, level).encode(src); + case sizeof(Float32): + return HEADER_SIZE + FPCOperation(destination, level).encode(src); + default: + break; + } + throw Exception("Cannot compress. File has incorrect float width", ErrorCodes::CANNOT_COMPRESS); +} + +void CompressionCodecFPC::doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const +{ + if (source_size < HEADER_SIZE) + throw Exception("Cannot decompress. File has wrong header", ErrorCodes::CANNOT_DECOMPRESS); + + auto compressed_data = std::as_bytes(std::span(source, source_size)); + auto compressed_float_width = std::to_integer(compressed_data[0]); + auto compressed_level = std::to_integer(compressed_data[1]); + if (compressed_level == 0 || compressed_level > MAX_COMPRESSION_LEVEL) + throw Exception("Cannot decompress. File has incorrect level", ErrorCodes::CANNOT_DECOMPRESS); + if (decodeEndianness(compressed_data[2]) != std::endian::native) + throw Exception("Cannot decompress. File has incorrect endianness", ErrorCodes::CANNOT_DECOMPRESS); + + auto destination = std::as_writable_bytes(std::span(dest, uncompressed_size)); + auto src = compressed_data.subspan(HEADER_SIZE); + switch (compressed_float_width) + { + case sizeof(Float64): + FPCOperation(destination, compressed_level).decode(src, uncompressed_size); + break; + case sizeof(Float32): + FPCOperation(destination, compressed_level).decode(src, uncompressed_size); + break; + default: + throw Exception("Cannot decompress. File has incorrect float width", ErrorCodes::CANNOT_DECOMPRESS); + } +} + +} diff --git a/src/Compression/CompressionFactory.cpp b/src/Compression/CompressionFactory.cpp index abf5e38a8c3..b8a1c5877a4 100644 --- a/src/Compression/CompressionFactory.cpp +++ b/src/Compression/CompressionFactory.cpp @@ -177,6 +177,7 @@ void registerCodecT64(CompressionCodecFactory & factory); void registerCodecDoubleDelta(CompressionCodecFactory & factory); void registerCodecGorilla(CompressionCodecFactory & factory); void registerCodecEncrypted(CompressionCodecFactory & factory); +void registerCodecFPC(CompressionCodecFactory & factory); #endif @@ -194,6 +195,7 @@ CompressionCodecFactory::CompressionCodecFactory() registerCodecDoubleDelta(*this); registerCodecGorilla(*this); registerCodecEncrypted(*this); + registerCodecFPC(*this); #endif default_codec = get("LZ4", {}); diff --git a/src/Compression/CompressionInfo.h b/src/Compression/CompressionInfo.h index bbe8315f3ea..839fb68e8c3 100644 --- a/src/Compression/CompressionInfo.h +++ b/src/Compression/CompressionInfo.h @@ -44,7 +44,8 @@ enum class CompressionMethodByte : uint8_t DoubleDelta = 0x94, Gorilla = 0x95, AES_128_GCM_SIV = 0x96, - AES_256_GCM_SIV = 0x97 + AES_256_GCM_SIV = 0x97, + FPC = 0x98 }; } diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 9e3b60a8e54..e3f756c85f5 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -693,6 +693,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value) M(Bool, input_format_values_deduce_templates_of_expressions, true, "For Values format: if the field could not be parsed by streaming parser, run SQL parser, deduce template of the SQL expression, try to parse all rows using template and then interpret expression for all rows.", 0) \ M(Bool, input_format_values_accurate_types_of_literals, true, "For Values format: when parsing and interpreting expressions using template, check actual type of literal to avoid possible overflow and precision issues.", 0) \ M(Bool, input_format_avro_allow_missing_fields, false, "For Avro/AvroConfluent format: when field is not found in schema use default value instead of error", 0) \ + M(Bool, input_format_avro_null_as_default, false, "For Avro/AvroConfluent format: insert default in case of null and non Nullable column", 0) \ M(URI, format_avro_schema_registry_url, "", "For AvroConfluent format: Confluent Schema Registry URL.", 0) \ \ M(Bool, output_format_json_quote_64bit_integers, true, "Controls quoting of 64-bit integers in JSON output format.", 0) \ diff --git a/src/DataTypes/IDataType.cpp b/src/DataTypes/IDataType.cpp index 0976233c031..f2bb878a533 100644 --- a/src/DataTypes/IDataType.cpp +++ b/src/DataTypes/IDataType.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -162,6 +163,12 @@ void IDataType::insertDefaultInto(IColumn & column) const column.insertDefault(); } +void IDataType::insertManyDefaultsInto(IColumn & column, size_t n) const +{ + for (size_t i = 0; i < n; ++i) + insertDefaultInto(column); +} + void IDataType::setCustomization(DataTypeCustomDescPtr custom_desc_) const { /// replace only if not null diff --git a/src/DataTypes/IDataType.h b/src/DataTypes/IDataType.h index fc9e50dc55b..420ef61a13f 100644 --- a/src/DataTypes/IDataType.h +++ b/src/DataTypes/IDataType.h @@ -159,6 +159,8 @@ public: */ virtual void insertDefaultInto(IColumn & column) const; + void insertManyDefaultsInto(IColumn & column, size_t n) const; + /// Checks that two instances belong to the same type virtual bool equals(const IDataType & rhs) const = 0; diff --git a/src/Formats/FormatFactory.cpp b/src/Formats/FormatFactory.cpp index 4719ae014a4..dc6344137d2 100644 --- a/src/Formats/FormatFactory.cpp +++ b/src/Formats/FormatFactory.cpp @@ -56,6 +56,7 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings) format_settings.avro.schema_registry_url = settings.format_avro_schema_registry_url.toString(); format_settings.avro.string_column_pattern = settings.output_format_avro_string_column_pattern.toString(); format_settings.avro.output_rows_in_file = settings.output_format_avro_rows_in_file; + format_settings.avro.null_as_default = settings.input_format_avro_null_as_default; format_settings.csv.allow_double_quotes = settings.format_csv_allow_double_quotes; format_settings.csv.allow_single_quotes = settings.format_csv_allow_single_quotes; format_settings.csv.crlf_end_of_line = settings.output_format_csv_crlf_end_of_line; diff --git a/src/Formats/FormatSettings.h b/src/Formats/FormatSettings.h index eabfa2ad58b..7e0ce001405 100644 --- a/src/Formats/FormatSettings.h +++ b/src/Formats/FormatSettings.h @@ -92,6 +92,7 @@ struct FormatSettings bool allow_missing_fields = false; String string_column_pattern; UInt64 output_rows_in_file = 1; + bool null_as_default = false; } avro; String bool_true_representation = "true"; diff --git a/src/Functions/FunctionsLogical.cpp b/src/Functions/FunctionsLogical.cpp index b615f52652c..be295186943 100644 --- a/src/Functions/FunctionsLogical.cpp +++ b/src/Functions/FunctionsLogical.cpp @@ -13,6 +13,7 @@ #include #include #include +#include #include #include @@ -50,17 +51,15 @@ MutableColumnPtr buildColumnFromTernaryData(const UInt8Container & ternary_data, const size_t rows_count = ternary_data.size(); auto new_column = ColumnUInt8::create(rows_count); - std::transform( - ternary_data.cbegin(), ternary_data.cend(), new_column->getData().begin(), - [](const auto x) { return x == Ternary::True; }); + for (size_t i = 0; i < rows_count; ++i) + new_column->getData()[i] = (ternary_data[i] == Ternary::True); if (!make_nullable) return new_column; auto null_column = ColumnUInt8::create(rows_count); - std::transform( - ternary_data.cbegin(), ternary_data.cend(), null_column->getData().begin(), - [](const auto x) { return x == Ternary::Null; }); + for (size_t i = 0; i < rows_count; ++i) + null_column->getData()[i] = (ternary_data[i] == Ternary::Null); return ColumnNullable::create(std::move(new_column), std::move(null_column)); } @@ -68,13 +67,14 @@ MutableColumnPtr buildColumnFromTernaryData(const UInt8Container & ternary_data, template bool tryConvertColumnToBool(const IColumn * column, UInt8Container & res) { - const auto col = checkAndGetColumn>(column); - if (!col) + const auto column_typed = checkAndGetColumn>(column); + if (!column_typed) return false; - std::transform( - col->getData().cbegin(), col->getData().cend(), res.begin(), - [](const auto x) { return !!x; }); + auto & data = column_typed->getData(); + size_t data_size = data.size(); + for (size_t i = 0; i < data_size; ++i) + res[i] = static_cast(data[i]); return true; } @@ -99,7 +99,7 @@ bool extractConstColumns(ColumnRawPtrs & in, UInt8 & res, Func && func) { bool has_res = false; - for (int i = static_cast(in.size()) - 1; i >= 0; --i) + for (Int64 i = static_cast(in.size()) - 1; i >= 0; --i) { UInt8 x; @@ -458,7 +458,9 @@ ColumnPtr basicExecuteImpl(ColumnRawPtrs arguments, size_t input_rows_count) for (const IColumn * column : arguments) { if (const auto * uint8_column = checkAndGetColumn(column)) + { uint8_args.push_back(uint8_column); + } else { auto converted_column = ColumnUInt8::create(input_rows_count); @@ -596,14 +598,14 @@ ColumnPtr FunctionAnyArityLogical::executeShortCircuit(ColumnsWithTy if (nulls) applyTernaryLogic(mask, *nulls); - MutableColumnPtr res = ColumnUInt8::create(); - typeid_cast(res.get())->getData() = std::move(mask); + auto res = ColumnUInt8::create(); + res->getData() = std::move(mask); if (!nulls) return res; - MutableColumnPtr bytemap = ColumnUInt8::create(); - typeid_cast(bytemap.get())->getData() = std::move(*nulls); + auto bytemap = ColumnUInt8::create(); + bytemap->getData() = std::move(*nulls); return ColumnNullable::create(std::move(res), std::move(bytemap)); } @@ -692,29 +694,14 @@ ColumnPtr FunctionAnyArityLogical::getConstantResultForNonConstArgum return result_column; } -template -struct UnaryOperationImpl -{ - using ResultType = typename Op::ResultType; - using ArrayA = typename ColumnVector::Container; - using ArrayC = typename ColumnVector::Container; - - static void NO_INLINE vector(const ArrayA & a, ArrayC & c) - { - std::transform( - a.cbegin(), a.cend(), c.begin(), - [](const auto x) { return Op::apply(x); }); - } -}; - template