diff --git a/dbms/src/Compression/CompressionCodecDoubleDelta.cpp b/dbms/src/Compression/CompressionCodecDoubleDelta.cpp new file mode 100644 index 00000000000..7a274119572 --- /dev/null +++ b/dbms/src/Compression/CompressionCodecDoubleDelta.cpp @@ -0,0 +1,320 @@ +#include +#include +#include +#include +#include + +#include +#include +#include + +#include +#include +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ +extern const int CANNOT_COMPRESS; +extern const int CANNOT_DECOMPRESS; +} + +namespace +{ +UInt32 getDeltaTypeByteSize(UInt8 data_bytes_size) +{ + // both delta and double delta can be twice the size of data item, but not less than 32 bits and not more that 64. + return std::min(64/8, std::max(32/8, data_bytes_size * 2)); +} + +UInt32 getCompressedHeaderSize(UInt8 data_bytes_size) +{ + const UInt8 items_count_size = 4; + + return items_count_size + data_bytes_size + getDeltaTypeByteSize(data_bytes_size); +} + +UInt32 getCompressedDataSize(UInt8 data_bytes_size, UInt32 uncompressed_size) +{ + const UInt32 items_count = uncompressed_size / data_bytes_size; + + // 11111 + max 64 bits of double delta. + const UInt32 max_item_size_bits = 5 + getDeltaTypeByteSize(data_bytes_size) * 8; + + // + 8 is to round up to next byte. + return (items_count * max_item_size_bits + 8) / 8; +} + +template +UInt32 compressDataForType(const char * source, UInt32 source_size, char * dest) +{ + if (source_size % sizeof(T) != 0) + throw Exception("Cannot compress, data size " + toString(source_size) + " is not aligned to " + toString(sizeof(T)), ErrorCodes::CANNOT_COMPRESS); + const char * source_end = source + source_size; + + const UInt32 items_count = source_size / sizeof(T); + unalignedStore(dest, items_count); + dest += sizeof(items_count); + + T prev_value{}; + DeltaType prev_delta{}; + + if (source < source_end) + { + prev_value = unalignedLoad(source); + unalignedStore(dest, prev_value); + + source += sizeof(prev_value); + dest += sizeof(prev_value); + } + + if (source < source_end) + { + const T curr_value = unalignedLoad(source); + prev_delta = static_cast(curr_value - prev_value); + unalignedStore(dest, prev_delta); + + source += sizeof(curr_value); + dest += sizeof(prev_delta); + prev_value = curr_value; + } + + WriteBuffer buffer(dest, getCompressedDataSize(sizeof(T), source_size - sizeof(T)*2)); + BitWriter writer(buffer); + + while (source < source_end) + { + const T curr_value = unalignedLoad(source); + source += sizeof(curr_value); + + const auto delta = curr_value - prev_value; + const DeltaType double_delta = static_cast(delta - static_cast(prev_delta)); + + prev_delta = delta; + prev_value = curr_value; + + if (double_delta == 0) + { + writer.writeBits(1, 0); + } + else + { + const auto sign = std::signbit(double_delta); + const auto abs_value = static_cast::type>(std::abs(double_delta)); + if (double_delta > -63 && double_delta < 64) + { + writer.writeBits(2, 0b10); + writer.writeBits(1, sign); + writer.writeBits(6, abs_value); + } + else if (double_delta > -255 && double_delta < 256) + { + writer.writeBits(3, 0b110); + writer.writeBits(1, sign); + writer.writeBits(8, abs_value); + } + else if (double_delta > -2047 && double_delta < 2048) + { + writer.writeBits(4, 0b1110); + writer.writeBits(1, sign); + writer.writeBits(11, abs_value); + } + else if (double_delta > std::numeric_limits::min() && double_delta < std::numeric_limits::max()) + { + writer.writeBits(5, 0b11110); + writer.writeBits(1, sign); + writer.writeBits(31, abs_value); + } + else + { + writer.writeBits(5, 0b11111); + writer.writeBits(1, sign); + writer.writeBits(63, abs_value); + } + } + } + + writer.flush(); + + return sizeof(items_count) + sizeof(prev_value) + sizeof(prev_delta) + buffer.count(); +} + +template +void decompressDataForType(const char * source, UInt32 source_size, char * dest) +{ + const char * source_end = source + source_size; + + const UInt32 items_count = unalignedLoad(source); + source += sizeof(items_count); + + T prev_value{}; + DeltaType prev_delta{}; + + if (source < source_end) + { + prev_value = unalignedLoad(source); + unalignedStore(dest, prev_value); + + source += sizeof(prev_value); + dest += sizeof(prev_value); + } + + if (source < source_end) + { + prev_delta = unalignedLoad(source); + prev_value = static_cast(prev_value) + prev_delta; + unalignedStore(dest, prev_value); + + source += sizeof(prev_delta); + dest += sizeof(prev_value); + } + + ReadBufferFromMemory buffer(source, source_size - sizeof(prev_value) - sizeof(prev_delta) - sizeof(items_count)); + BitReader reader(buffer); + + // since data is tightly packed, up to 1 bit per value, and last byte is padded with zeroes, + // we have to keep track of items to avoid reading more that there is. + for (UInt32 items_read = 2; items_read < items_count && !reader.eof(); ++items_read) + { + DeltaType double_delta = 0; + if (reader.readBit() == 0) + { + double_delta = 0; + } + else + { + // first bit is 1 + const UInt8 data_sizes[] = {6, 8, 11, 31, 63}; + UInt8 i = 0; + for (; i < sizeof(data_sizes) - 1; ++i) + { + const auto next_bit = reader.readBit(); + if (next_bit == 0) + break; + } + + const UInt8 sign = reader.readBit(); + double_delta = static_cast(reader.readBits(data_sizes[i])); + if (sign) + { + double_delta *= -1; + } + } + const T curr_value = static_cast(prev_value + prev_delta + double_delta); + unalignedStore(dest, curr_value); + dest += sizeof(curr_value); + + prev_delta = curr_value - prev_value; + prev_value = curr_value; + } +} + +UInt8 getDataBytesSize(DataTypePtr column_type) +{ + UInt8 data_bytes_size = 1; + if (column_type && column_type->haveMaximumSizeOfValue()) + { + size_t max_size = column_type->getSizeOfValueInMemory(); + if (max_size == 1 || max_size == 2 || max_size == 4 || max_size == 8) + data_bytes_size = static_cast(max_size); + } + return data_bytes_size; +} + +} // namespace + + +CompressionCodecDoubleDelta::CompressionCodecDoubleDelta(UInt8 data_bytes_size_) + : data_bytes_size(data_bytes_size_) +{ +} + +UInt8 CompressionCodecDoubleDelta::getMethodByte() const +{ + return static_cast(CompressionMethodByte::DoubleDelta); +} + +String CompressionCodecDoubleDelta::getCodecDesc() const +{ + return "DoubleDelta"; +} + +UInt32 CompressionCodecDoubleDelta::getMaxCompressedDataSize(UInt32 uncompressed_size) const +{ + const auto result = 2 // common header + + data_bytes_size // max bytes skipped if source is not properly aligned. + + getCompressedHeaderSize(data_bytes_size) // data-specific header + + getCompressedDataSize(data_bytes_size, uncompressed_size); + + return result; +} + +UInt32 CompressionCodecDoubleDelta::doCompressData(const char * source, UInt32 source_size, char * dest) const +{ + UInt8 bytes_to_skip = source_size % data_bytes_size; + dest[0] = data_bytes_size; + dest[1] = bytes_to_skip; + memcpy(&dest[2], source, bytes_to_skip); + size_t start_pos = 2 + bytes_to_skip; + UInt32 compressed_size = 0; + switch (data_bytes_size) + { + case 1: + compressed_size = compressDataForType(&source[bytes_to_skip], source_size - bytes_to_skip, &dest[start_pos]); + break; + case 2: + compressed_size = compressDataForType(&source[bytes_to_skip], source_size - bytes_to_skip, &dest[start_pos]); + break; + case 4: + compressed_size = compressDataForType(&source[bytes_to_skip], source_size - bytes_to_skip, &dest[start_pos]); + break; + case 8: + compressed_size = compressDataForType(&source[bytes_to_skip], source_size - bytes_to_skip, &dest[start_pos]); + break; + } + + return 1 + 1 + compressed_size; +} + +void CompressionCodecDoubleDelta::doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 /* uncompressed_size */) const +{ + UInt8 bytes_size = source[0]; + UInt8 bytes_to_skip = source[1]; + + memcpy(dest, &source[2], bytes_to_skip); + UInt32 source_size_no_header = source_size - bytes_to_skip - 2; + switch (bytes_size) + { + case 1: + decompressDataForType(&source[2 + bytes_to_skip], source_size_no_header, &dest[bytes_to_skip]); + break; + case 2: + decompressDataForType(&source[2 + bytes_to_skip], source_size_no_header, &dest[bytes_to_skip]); + break; + case 4: + decompressDataForType(&source[2 + bytes_to_skip], source_size_no_header, &dest[bytes_to_skip]); + break; + case 8: + decompressDataForType(&source[2 + bytes_to_skip], source_size_no_header, &dest[bytes_to_skip]); + break; + } +} + +void CompressionCodecDoubleDelta::useInfoAboutType(DataTypePtr data_type) +{ + data_bytes_size = getDataBytesSize(data_type); +} + +void registerCodecDoubleDelta(CompressionCodecFactory & factory) +{ + UInt8 method_code = UInt8(CompressionMethodByte::DoubleDelta); + factory.registerCompressionCodecWithType("DoubleDelta", method_code, [&](const ASTPtr &, DataTypePtr column_type) -> CompressionCodecPtr + { + UInt8 delta_bytes_size = getDataBytesSize(column_type); + return std::make_shared(delta_bytes_size); + }); +} +} diff --git a/dbms/src/Compression/CompressionCodecDoubleDelta.h b/dbms/src/Compression/CompressionCodecDoubleDelta.h new file mode 100644 index 00000000000..19c07214115 --- /dev/null +++ b/dbms/src/Compression/CompressionCodecDoubleDelta.h @@ -0,0 +1,30 @@ +#pragma once + +#include + +namespace DB +{ + +class CompressionCodecDoubleDelta : public ICompressionCodec +{ +public: + CompressionCodecDoubleDelta(UInt8 data_bytes_size_); + + UInt8 getMethodByte() const override; + + String getCodecDesc() const override; + + void useInfoAboutType(DataTypePtr data_type) override; + +protected: + UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override; + + void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override; + + UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override; + +private: + UInt8 data_bytes_size; +}; + +} diff --git a/dbms/src/Compression/CompressionFactory.cpp b/dbms/src/Compression/CompressionFactory.cpp index 7f9a7a24f09..bcde23e4d2f 100644 --- a/dbms/src/Compression/CompressionFactory.cpp +++ b/dbms/src/Compression/CompressionFactory.cpp @@ -138,6 +138,7 @@ void registerCodecMultiple(CompressionCodecFactory & factory); void registerCodecLZ4HC(CompressionCodecFactory & factory); void registerCodecDelta(CompressionCodecFactory & factory); void registerCodecT64(CompressionCodecFactory & factory); +void registerCodecDoubleDelta(CompressionCodecFactory & factory); CompressionCodecFactory::CompressionCodecFactory() { @@ -149,6 +150,7 @@ CompressionCodecFactory::CompressionCodecFactory() registerCodecLZ4HC(*this); registerCodecDelta(*this); registerCodecT64(*this); + registerCodecDoubleDelta(*this); } } diff --git a/dbms/src/Compression/CompressionInfo.h b/dbms/src/Compression/CompressionInfo.h index 430840686c9..50faa809622 100644 --- a/dbms/src/Compression/CompressionInfo.h +++ b/dbms/src/Compression/CompressionInfo.h @@ -41,6 +41,7 @@ enum class CompressionMethodByte : uint8_t Multiple = 0x91, Delta = 0x92, T64 = 0x93, + DoubleDelta = 0x94, }; } diff --git a/dbms/src/IO/BitHelpers.cpp b/dbms/src/IO/BitHelpers.cpp new file mode 100644 index 00000000000..e735ef2ebbe --- /dev/null +++ b/dbms/src/IO/BitHelpers.cpp @@ -0,0 +1,135 @@ +#include "BitHelpers.h" + +#include + +namespace +{ +const DB::UInt8 MAX_BUFFER_SIZE_BITS = 8; +} + +namespace DB +{ + +BitReader::BitReader(ReadBuffer & buf_) + : buf(buf_), + bits_buffer(0), + bits_count(0) +{} + +BitReader::~BitReader() +{} + +UInt64 BitReader::readBits(UInt8 bits) +{ + UInt64 result = 0; + bits = std::min(static_cast(sizeof(result) * 8), bits); + + while (bits != 0) + { + if (bits_count == 0) + { + fillBuffer(); + if (bits_count == 0) + { + // EOF. + break; + } + } + + const auto to_read = std::min(bits, bits_count); + // read MSB bits from bits_bufer + const UInt8 v = bits_buffer >> (bits_count - to_read); + const UInt8 mask = static_cast(~(~0U << to_read)); + const UInt8 value = v & mask; + result |= value; + + // unset MSB that were read + bits_buffer &= ~(mask << (bits_count - to_read)); + bits_count -= to_read; + bits -= to_read; + + result <<= std::min(bits, static_cast(sizeof(bits_buffer)*8)); + } + + return result; +} + +UInt8 BitReader::readBit() +{ + return static_cast(readBits(1)); +} + +bool BitReader::eof() const +{ + return bits_count == 0 && buf.eof(); +} + +void BitReader::fillBuffer() +{ + auto read = buf.read(reinterpret_cast(&bits_buffer), MAX_BUFFER_SIZE_BITS/8); + bits_count = static_cast(read) * 8; +} + +BitWriter::BitWriter(WriteBuffer & buf_) + : buf(buf_), + bits_buffer(0), + bits_count(0) +{} + +BitWriter::~BitWriter() +{ + flush(); +} + +void BitWriter::writeBits(UInt8 bits, UInt64 value) +{ + bits = std::min(static_cast(sizeof(value) * 8), bits); + + while (bits > 0) + { + auto v = value; + auto to_write = bits; + + const UInt8 capacity = MAX_BUFFER_SIZE_BITS - bits_count; + if (capacity < bits) + { + // write MSB: + v >>= bits - capacity; + to_write = capacity; + } + + + const UInt64 mask = (1 << to_write) - 1; + v &= mask; + assert(v <= 255); + + bits_buffer <<= to_write; + bits_buffer |= v; + bits_count += to_write; + + if (bits_count < MAX_BUFFER_SIZE_BITS) + break; + + doFlush(); + bits -= to_write; + } +} + +void BitWriter::flush() +{ + if (bits_count != 0) + { + bits_buffer <<= (MAX_BUFFER_SIZE_BITS - bits_count); + doFlush(); + } +} + +void BitWriter::doFlush() +{ + buf.write(reinterpret_cast(&bits_buffer), MAX_BUFFER_SIZE_BITS/8); + + bits_count = 0; + bits_buffer = 0; +} + +} // namespace DB diff --git a/dbms/src/IO/BitHelpers.h b/dbms/src/IO/BitHelpers.h new file mode 100644 index 00000000000..fc62642ecb9 --- /dev/null +++ b/dbms/src/IO/BitHelpers.h @@ -0,0 +1,70 @@ +#pragma once + +#include +#include +#include + +namespace DB +{ + +/** Reads data from underlying ReadBuffer in bit by bit, max 64 bits at once. + * + * reads MSB bits first, imagine that you have a data: + * 11110000 10101010 00100100 11111111 + * + * Given that r is BitReader created with a ReadBuffer that reads from data above: + * r.readBits(3) => 0b111 + * r.readBit() => 0b1 + * r.readBits(8) => 0b1010 // 4 leading zero-bits are not shown + * r.readBit() => 0b1 + * r.readBit() => 0b0 + * r.readBits(16) => 0b100010010011111111 +**/ + +class BitReader +{ + ReadBuffer & buf; + + UInt8 bits_buffer; + UInt8 bits_count; + +public: + BitReader(ReadBuffer & buf_); + ~BitReader(); + + BitReader(BitReader &&) = default; + + // bits is at most 64 + UInt64 readBits(UInt8 bits); + UInt8 readBit(); + + // true when both bit-buffer and underlying byte-buffer are empty. + bool eof() const; + +private: + void fillBuffer(); +}; + +class BitWriter +{ + WriteBuffer & buf; + + UInt8 bits_buffer; + UInt8 bits_count; + +public: + BitWriter(WriteBuffer & buf_); + ~BitWriter(); + + BitWriter(BitWriter &&) = default; + + // write `size` low bits of the `value`. + void writeBits(UInt8 size, UInt64 value); + + void flush(); + +private: + void doFlush(); +}; + +} // namespace DB diff --git a/dbms/src/IO/tests/CMakeLists.txt b/dbms/src/IO/tests/CMakeLists.txt index 127dc45d9bb..71190d11942 100644 --- a/dbms/src/IO/tests/CMakeLists.txt +++ b/dbms/src/IO/tests/CMakeLists.txt @@ -82,3 +82,6 @@ target_link_libraries (zlib_ng_bug PRIVATE ${Poco_Foundation_LIBRARY}) if(NOT USE_INTERNAL_POCO_LIBRARY) target_include_directories(zlib_ng_bug SYSTEM BEFORE PRIVATE ${Poco_INCLUDE_DIRS}) endif() + +add_executable(bit_io bit_io.cpp) +target_link_libraries (bit_io PRIVATE clickhouse_common_io) diff --git a/dbms/src/IO/tests/bit_io.cpp b/dbms/src/IO/tests/bit_io.cpp new file mode 100644 index 00000000000..4bb2d5012d0 --- /dev/null +++ b/dbms/src/IO/tests/bit_io.cpp @@ -0,0 +1,188 @@ + +#include + +#include +#include +#include + +#include +#include +#include +#include + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wunused-const-variable" +#pragma GCC diagnostic ignored "-Wunused-variable" + +namespace +{ +using namespace DB; + +// Intentionally asymetric both byte and word-size to detect read and write inconsistencies +// each prime bit is set to 0. +// v-61 v-53 v-47 v-41 v-37 v-31 v-23 v-17 v-11 v-5 +const UInt64 BIT_PATTERN = 0b11101011'11101111'10111010'11101111'10101111'10111010'11101011'10101001; +const UInt8 PRIMES[] = {2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61}; +const UInt8 REPEAT_TIMES = 11; + +template +std::string bin(const T & value, size_t bits = sizeof(T)*8) +{ + static const UInt8 MAX_BITS = sizeof(T)*8; + assert(bits <= MAX_BITS); + + return std::bitset(static_cast(value)) + .to_string().substr(MAX_BITS - bits, bits); +} + +template +T getBits(UInt8 bits, const T & value) +{ + const T mask = ((static_cast(1) << static_cast(bits)) - 1); + return value & mask; +} + +std::ostream & dumpBuffer(const char * begin, const char * end, std::ostream * destination, const char* col_sep = " ", const char* row_sep = "\n", const size_t cols_in_row = 8, UInt32 max_bytes = 0xFFFFFFFF) +{ + size_t col = 0; + for (auto p = begin; p < end && p - begin < max_bytes; ++p) + { + *destination << bin(*p); + if (++col % cols_in_row == 0) + { + if (row_sep) + *destination << row_sep; + } + else if (col_sep) + { + *destination << col_sep; + } + } + + return *destination; +} + +std::ostream & dumpBufferContents(BufferBase & buffer, std::ostream * destination, const char* col_sep = " ", const char* row_sep = "\n", const size_t cols_in_row = 8, UInt32 max_bytes = 0xFFFFFFFF) +{ + const auto & data = buffer.buffer(); + return dumpBuffer(data.begin(), data.end(), destination, col_sep, row_sep, cols_in_row, max_bytes); +} + +std::string dumpBufferContents(BufferBase & buffer, const char* col_sep = " ", const char* row_sep = "\n", const size_t cols_in_row = 8) +{ + std::stringstream sstr; + dumpBufferContents(buffer, &sstr, col_sep, row_sep, cols_in_row); + + return sstr.str(); +} + + +bool test(const std::vector> & bits_and_vals, const char * expected_buffer_binary = nullptr) +{ + MemoryWriteBuffer memory_write_buffer(1024, 1024, 1.5, 20*1024); + + { + BitWriter writer(memory_write_buffer); + for (const auto & bv : bits_and_vals) + { + writer.writeBits(bv.first, bv.second); + } + writer.flush(); + } + + { + auto memory_read_buffer = memory_write_buffer.tryGetReadBuffer(); + + if (expected_buffer_binary != nullptr) + { + const auto actual_buffer_binary = dumpBufferContents(*memory_read_buffer, " ", " "); + if (actual_buffer_binary != expected_buffer_binary) + { + std::cerr << "Invalid buffer memory after writing\n" + << "expected: " << strlen(expected_buffer_binary) << "\n" << expected_buffer_binary + << "\ngot: " << actual_buffer_binary.size() << "\n" << actual_buffer_binary + << std::endl; + + return false; + } + } + + BitReader reader(*memory_read_buffer); + + int item = 0; + for (const auto & bv : bits_and_vals) + { + const auto expected_value = getBits(bv.first, bv.second); + + const auto actual_value = reader.readBits(bv.first); + + if (expected_value != actual_value) + { + std::cerr << "Invalid value #" << item << " with " << static_cast(bv.first) << ", " << bin(bv.second) << "\n" + << "\texpected: " << bin(expected_value) << "\n" + << "\tgot : " << bin(actual_value) << ".\n\n\nBuffer memory:\n"; + dumpBufferContents(*memory_read_buffer, &std::cerr) << std::endl << std::endl; + + return false; + } + ++item; + } + } + + return true; +} + +bool primes_test() +{ + std::vector> test_data; + MemoryWriteBuffer memory_write_buffer; + + { + for (UInt8 r = 0; r < REPEAT_TIMES; ++r) + { + for (const auto p : PRIMES) + { + test_data.emplace_back(p, BIT_PATTERN); + } + } + } + + return test(test_data); +} + +void simple_test(UInt8 bits, UInt64 value) +{ + test({{bits, value}}); +} + +} // namespace + +int main() +{ + UInt32 test_case = 0; + for (const auto p : PRIMES) + { + simple_test(p, 0xFFFFFFFFFFFFFFFF); + std::cout << ++test_case << " with all-ones and " << static_cast(p) << std::endl; + } + + for (const auto p : PRIMES) + { + simple_test(p, BIT_PATTERN); + std::cout << ++test_case << " with fancy bit pattern and " << static_cast(p) << std::endl; + } + + test({{9, 0xFFFFFFFF}, {9, 0x00}, {9, 0xFFFFFFFF}, {9, 0x00}, {9, 0xFFFFFFFF}}, + "11111111 10000000 00111111 11100000 00001111 11111000 "); + + test({{7, 0x3f}, {7, 0x3f}, {7, 0x3f}, {7, 0x3f}, {7, 0x3f}, {7, 0x3f}, {7, 0x3f}, {7, 0x3f}, {7, 0x3f}, {3, 0xFFFF}}, + "01111110 11111101 11111011 11110111 11101111 11011111 10111111 01111111 11000000 "); + + test({{33, 0xFF110d0b07050300}, {33, 0xAAEE29251f1d1713}, }); + test({{33, BIT_PATTERN}, {33, BIT_PATTERN}}); + + std::cout << ++test_case << " primes " << std::endl; + primes_test(); + + return 0; +} diff --git a/dbms/tests/queries/0_stateless/00950_column_encoding_double_delta.reference b/dbms/tests/queries/0_stateless/00950_column_encoding_double_delta.reference new file mode 100644 index 00000000000..6b25688b4dd --- /dev/null +++ b/dbms/tests/queries/0_stateless/00950_column_encoding_double_delta.reference @@ -0,0 +1,11 @@ +0 +U64 +U32 +U16 +U8 +I64 +I32 +I16 +I8 +DT +D diff --git a/dbms/tests/queries/0_stateless/00950_column_encoding_double_delta.sql b/dbms/tests/queries/0_stateless/00950_column_encoding_double_delta.sql new file mode 100644 index 00000000000..335403c44ab --- /dev/null +++ b/dbms/tests/queries/0_stateless/00950_column_encoding_double_delta.sql @@ -0,0 +1,211 @@ +DROP TABLE IF EXISTS reference; +DROP TABLE IF EXISTS doubleDelta; + +CREATE TABLE reference ( + key UInt64, + valueU64 UInt64, + valueU32 UInt32, + valueU16 UInt16, + valueU8 UInt8, + valueI64 Int64, + valueI32 Int32, + valueI16 Int16, + valueI8 Int8, + valueDT DateTime, + valueD Date +) Engine = MergeTree ORDER BY key; + + +CREATE TABLE doubleDelta ( + key UInt64 CODEC(DoubleDelta), + valueU64 UInt64 CODEC(DoubleDelta), + valueU32 UInt32 CODEC(DoubleDelta), + valueU16 UInt16 CODEC(DoubleDelta), + valueU8 UInt8 CODEC(DoubleDelta), + valueI64 Int64 CODEC(DoubleDelta), + valueI32 Int32 CODEC(DoubleDelta), + valueI16 Int16 CODEC(DoubleDelta), + valueI8 Int8 CODEC(DoubleDelta), + valueDT DateTime CODEC(DoubleDelta), + valueD Date CODEC(DoubleDelta) +) Engine = MergeTree ORDER BY key; + + +-- n^3 covers all double delta storage cases, from small difference between neighbour values (stride) to big. +INSERT INTO reference (key, valueU64, valueU32, valueU16, valueU8, valueI64, valueI32, valueI16, valueI8, valueDT, valueD) + SELECT number as n, n * n * n as v, v, v, v, v, v, v, v, toDateTime(v), toDate(v) FROM system.numbers LIMIT 1, 100; + +-- best case - constant stride +INSERT INTO reference (key, valueU64, valueU32, valueU16, valueU8, valueI64, valueI32, valueI16, valueI8, valueDT, valueD) + SELECT number as n, n as v, v, v, v, v, v, v, v, toDateTime(v), toDate(v) FROM system.numbers LIMIT 101, 100; + +-- checking for overflow +INSERT INTO reference (key, valueU64, valueI64) +VALUES (201, 18446744073709551616, 9223372036854775808), (202, 0, -9223372036854775808), (203, 18446744073709551616, 9223372036854775808); + +-- worst case - random stride +INSERT INTO reference (key, valueU64, valueU32, valueU16, valueU8, valueI64, valueI32, valueI16, valueI8, valueDT, valueD) + SELECT number as n, n + (rand64() - 9223372036854775808)/1000 as v, v, v, v, v, v, v, v, toDateTime(v), toDate(v) FROM system.numbers LIMIT 301, 100; + + +INSERT INTO doubleDelta SELECT * FROM reference; + +-- same number of rows +SELECT a[1] - a[2] FROM ( + SELECT groupArray(1) AS a FROM ( + SELECT count() FROM reference + UNION ALL + SELECT count() FROM doubleDelta + ) +); + +SELECT 'U64'; +SELECT + key, + r.valueU64, d.valueU64, r.valueU64 - d.valueU64 as dU64 +FROM reference as r, doubleDelta as d +WHERE + r.key == d.key +AND + dU64 != 0 +ORDER BY r.key +LIMIT 10; + + +SELECT 'U32'; +SELECT + key, + r.valueU32, d.valueU32, r.valueU32 - d.valueU32 as dU32 +FROM reference as r, doubleDelta as d +WHERE + r.key == d.key +AND + dU32 != 0 +ORDER BY r.key +LIMIT 10; + + +SELECT 'U16'; +SELECT + key, + r.valueU16, d.valueU16, r.valueU16 - d.valueU16 as dU16 +FROM reference as r, doubleDelta as d +WHERE + r.key == d.key +AND + dU16 != 0 +ORDER BY r.key +LIMIT 10; + + +SELECT 'U8'; +SELECT + key, + r.valueU8, d.valueU8, r.valueU8 - d.valueU8 as dU8 +FROM reference as r, doubleDelta as d +WHERE + r.key == d.key +AND + dU8 != 0 +ORDER BY r.key +LIMIT 10; + + +SELECT 'I64'; +SELECT + key, + r.valueI64, d.valueI64, r.valueI64 - d.valueI64 as dI64 +FROM reference as r, doubleDelta as d +WHERE + r.key == d.key +AND + dI64 != 0 +ORDER BY r.key +LIMIT 10; + + +SELECT 'I32'; +SELECT + key, + r.valueI32, d.valueI32, r.valueI32 - d.valueI32 as dI32 +FROM reference as r, doubleDelta as d +WHERE + r.key == d.key +AND + dI32 != 0 +ORDER BY r.key +LIMIT 10; + + +SELECT 'I16'; +SELECT + key, + r.valueI16, d.valueI16, r.valueI16 - d.valueI16 as dI16 +FROM reference as r, doubleDelta as d +WHERE + r.key == d.key +AND + dI16 != 0 +ORDER BY r.key +LIMIT 10; + + +SELECT 'I8'; +SELECT + key, + r.valueI8, d.valueI8, r.valueI8 - d.valueI8 as dI8 +FROM reference as r, doubleDelta as d +WHERE + r.key == d.key +AND + dI8 != 0 +ORDER BY r.key +LIMIT 10; + + +SELECT 'DT'; +SELECT + key, + r.valueDT, d.valueDT, r.valueDT - d.valueDT as dDT +FROM reference as r, doubleDelta as d +WHERE + r.key == d.key +AND + dDT != 0 +ORDER BY r.key +LIMIT 10; + + +SELECT 'D'; +SELECT + key, + r.valueD, d.valueD, r.valueD - d.valueD as dD +FROM reference as r, doubleDelta as d +WHERE + r.key == d.key +AND + dD != 0 +ORDER BY r.key +LIMIT 10; + +-- Compatibity with other codecs +DROP TABLE IF EXISTS dd_lz4_codec; +CREATE TABLE dd_lz4_codec ( + key UInt64 CODEC(DoubleDelta, LZ4), + valueU64 UInt64 CODEC(DoubleDelta, LZ4), + valueU32 UInt32 CODEC(DoubleDelta, LZ4), + valueU16 UInt16 CODEC(DoubleDelta, LZ4), + valueU8 UInt8 CODEC(DoubleDelta, LZ4), + valueI64 Int64 CODEC(DoubleDelta, LZ4), + valueI32 Int32 CODEC(DoubleDelta, LZ4), + valueI16 Int16 CODEC(DoubleDelta, LZ4), + valueI8 Int8 CODEC(DoubleDelta, LZ4), + valueDT DateTime CODEC(DoubleDelta, LZ4), + valueD Date CODEC(DoubleDelta, LZ4) +) Engine = MergeTree ORDER BY key; + +INSERT INTO dd_lz4_codec SELECT * FROM reference; + +DROP TABLE IF EXISTS reference; +DROP TABLE IF EXISTS doubleDelta; +DROP TABLE IF EXISTS dd_lz4_codec; \ No newline at end of file