mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 15:12:02 +00:00
DoubleDelta column encoding.
Added DoubleDelta column encoding + test cases; Added BitWriter and BitReader that allow to read data from ReadBuffer and write data to WriteBuffer bit by bit, up to 64 bits at once; Added test for BitReader and BitWriter.
This commit is contained in:
parent
36ff3f14c2
commit
4ae63072d0
320
dbms/src/Compression/CompressionCodecDoubleDelta.cpp
Normal file
320
dbms/src/Compression/CompressionCodecDoubleDelta.cpp
Normal file
@ -0,0 +1,320 @@
|
||||
#include <Compression/CompressionCodecDoubleDelta.h>
|
||||
#include <Compression/CompressionInfo.h>
|
||||
#include <Compression/CompressionFactory.h>
|
||||
#include <common/unaligned.h>
|
||||
#include <Parsers/IAST_fwd.h>
|
||||
|
||||
#include <IO/ReadBufferFromMemory.h>
|
||||
#include <IO/BitHelpers.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
|
||||
#include <string.h>
|
||||
#include <algorithm>
|
||||
#include <cstdlib>
|
||||
#include <type_traits>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int CANNOT_COMPRESS;
|
||||
extern const int CANNOT_DECOMPRESS;
|
||||
}
|
||||
|
||||
namespace
|
||||
{
|
||||
/// Size in bytes reserved for a delta (and a double delta) of values that are
/// `data_bytes_size` bytes wide: twice the value width, clamped to the range [4, 8].
UInt32 getDeltaTypeByteSize(UInt8 data_bytes_size)
{
    const int min_delta_bytes = 32 / 8;
    const int max_delta_bytes = 64 / 8;
    const int doubled_width = data_bytes_size * 2;

    if (doubled_width < min_delta_bytes)
        return min_delta_bytes;
    if (doubled_width > max_delta_bytes)
        return max_delta_bytes;
    return doubled_width;
}
|
||||
|
||||
/// Upper bound, in bytes, for the per-block header: the 4-byte items count,
/// one value of `data_bytes_size` bytes and one delta.
UInt32 getCompressedHeaderSize(UInt8 data_bytes_size)
{
    const UInt8 items_count_size = 4;

    UInt32 header_size = items_count_size;
    header_size += data_bytes_size;
    header_size += getDeltaTypeByteSize(data_bytes_size);
    return header_size;
}
|
||||
|
||||
/// Upper bound, in bytes, for the bit stream that encodes `uncompressed_size` bytes
/// of values that are `data_bytes_size` bytes wide.
UInt32 getCompressedDataSize(UInt8 data_bytes_size, UInt32 uncompressed_size)
{
    // The product below is computed in 64 bits: with 32-bit arithmetic it wraps around
    // for large inputs and yields a bound that is far too small.
    const UInt64 items_count = uncompressed_size / data_bytes_size;

    // 11111 + max 64 bits of double delta.
    const UInt64 max_item_size_bits = 5 + getDeltaTypeByteSize(data_bytes_size) * 8;

    // + 8 is to round up to next byte.
    return static_cast<UInt32>((items_count * max_item_size_bits + 8) / 8);
}
|
||||
|
||||
template <typename T, typename DeltaType>
|
||||
UInt32 compressDataForType(const char * source, UInt32 source_size, char * dest)
|
||||
{
|
||||
if (source_size % sizeof(T) != 0)
|
||||
throw Exception("Cannot compress, data size " + toString(source_size) + " is not aligned to " + toString(sizeof(T)), ErrorCodes::CANNOT_COMPRESS);
|
||||
const char * source_end = source + source_size;
|
||||
|
||||
const UInt32 items_count = source_size / sizeof(T);
|
||||
unalignedStore(dest, items_count);
|
||||
dest += sizeof(items_count);
|
||||
|
||||
T prev_value{};
|
||||
DeltaType prev_delta{};
|
||||
|
||||
if (source < source_end)
|
||||
{
|
||||
prev_value = unalignedLoad<T>(source);
|
||||
unalignedStore(dest, prev_value);
|
||||
|
||||
source += sizeof(prev_value);
|
||||
dest += sizeof(prev_value);
|
||||
}
|
||||
|
||||
if (source < source_end)
|
||||
{
|
||||
const T curr_value = unalignedLoad<T>(source);
|
||||
prev_delta = static_cast<DeltaType>(curr_value - prev_value);
|
||||
unalignedStore(dest, prev_delta);
|
||||
|
||||
source += sizeof(curr_value);
|
||||
dest += sizeof(prev_delta);
|
||||
prev_value = curr_value;
|
||||
}
|
||||
|
||||
WriteBuffer buffer(dest, getCompressedDataSize(sizeof(T), source_size - sizeof(T)*2));
|
||||
BitWriter writer(buffer);
|
||||
|
||||
while (source < source_end)
|
||||
{
|
||||
const T curr_value = unalignedLoad<T>(source);
|
||||
source += sizeof(curr_value);
|
||||
|
||||
const auto delta = curr_value - prev_value;
|
||||
const DeltaType double_delta = static_cast<DeltaType>(delta - static_cast<T>(prev_delta));
|
||||
|
||||
prev_delta = delta;
|
||||
prev_value = curr_value;
|
||||
|
||||
if (double_delta == 0)
|
||||
{
|
||||
writer.writeBits(1, 0);
|
||||
}
|
||||
else
|
||||
{
|
||||
const auto sign = std::signbit(double_delta);
|
||||
const auto abs_value = static_cast<typename std::make_unsigned<DeltaType>::type>(std::abs(double_delta));
|
||||
if (double_delta > -63 && double_delta < 64)
|
||||
{
|
||||
writer.writeBits(2, 0b10);
|
||||
writer.writeBits(1, sign);
|
||||
writer.writeBits(6, abs_value);
|
||||
}
|
||||
else if (double_delta > -255 && double_delta < 256)
|
||||
{
|
||||
writer.writeBits(3, 0b110);
|
||||
writer.writeBits(1, sign);
|
||||
writer.writeBits(8, abs_value);
|
||||
}
|
||||
else if (double_delta > -2047 && double_delta < 2048)
|
||||
{
|
||||
writer.writeBits(4, 0b1110);
|
||||
writer.writeBits(1, sign);
|
||||
writer.writeBits(11, abs_value);
|
||||
}
|
||||
else if (double_delta > std::numeric_limits<Int32>::min() && double_delta < std::numeric_limits<Int32>::max())
|
||||
{
|
||||
writer.writeBits(5, 0b11110);
|
||||
writer.writeBits(1, sign);
|
||||
writer.writeBits(31, abs_value);
|
||||
}
|
||||
else
|
||||
{
|
||||
writer.writeBits(5, 0b11111);
|
||||
writer.writeBits(1, sign);
|
||||
writer.writeBits(63, abs_value);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
writer.flush();
|
||||
|
||||
return sizeof(items_count) + sizeof(prev_value) + sizeof(prev_delta) + buffer.count();
|
||||
}
|
||||
|
||||
template <typename T, typename DeltaType>
|
||||
void decompressDataForType(const char * source, UInt32 source_size, char * dest)
|
||||
{
|
||||
const char * source_end = source + source_size;
|
||||
|
||||
const UInt32 items_count = unalignedLoad<UInt32>(source);
|
||||
source += sizeof(items_count);
|
||||
|
||||
T prev_value{};
|
||||
DeltaType prev_delta{};
|
||||
|
||||
if (source < source_end)
|
||||
{
|
||||
prev_value = unalignedLoad<T>(source);
|
||||
unalignedStore(dest, prev_value);
|
||||
|
||||
source += sizeof(prev_value);
|
||||
dest += sizeof(prev_value);
|
||||
}
|
||||
|
||||
if (source < source_end)
|
||||
{
|
||||
prev_delta = unalignedLoad<DeltaType>(source);
|
||||
prev_value = static_cast<DeltaType>(prev_value) + prev_delta;
|
||||
unalignedStore(dest, prev_value);
|
||||
|
||||
source += sizeof(prev_delta);
|
||||
dest += sizeof(prev_value);
|
||||
}
|
||||
|
||||
ReadBufferFromMemory buffer(source, source_size - sizeof(prev_value) - sizeof(prev_delta) - sizeof(items_count));
|
||||
BitReader reader(buffer);
|
||||
|
||||
// since data is tightly packed, up to 1 bit per value, and last byte is padded with zeroes,
|
||||
// we have to keep track of items to avoid reading more that there is.
|
||||
for (UInt32 items_read = 2; items_read < items_count && !reader.eof(); ++items_read)
|
||||
{
|
||||
DeltaType double_delta = 0;
|
||||
if (reader.readBit() == 0)
|
||||
{
|
||||
double_delta = 0;
|
||||
}
|
||||
else
|
||||
{
|
||||
// first bit is 1
|
||||
const UInt8 data_sizes[] = {6, 8, 11, 31, 63};
|
||||
UInt8 i = 0;
|
||||
for (; i < sizeof(data_sizes) - 1; ++i)
|
||||
{
|
||||
const auto next_bit = reader.readBit();
|
||||
if (next_bit == 0)
|
||||
break;
|
||||
}
|
||||
|
||||
const UInt8 sign = reader.readBit();
|
||||
double_delta = static_cast<DeltaType>(reader.readBits(data_sizes[i]));
|
||||
if (sign)
|
||||
{
|
||||
double_delta *= -1;
|
||||
}
|
||||
}
|
||||
const T curr_value = static_cast<T>(prev_value + prev_delta + double_delta);
|
||||
unalignedStore(dest, curr_value);
|
||||
dest += sizeof(curr_value);
|
||||
|
||||
prev_delta = curr_value - prev_value;
|
||||
prev_value = curr_value;
|
||||
}
|
||||
}
|
||||
|
||||
/// Width in bytes of the values of `column_type` when it is 1, 2, 4 or 8; otherwise 1
/// (also when no type is given or the type has no maximum value size).
UInt8 getDataBytesSize(DataTypePtr column_type)
{
    if (!column_type || !column_type->haveMaximumSizeOfValue())
        return 1;

    const size_t max_size = column_type->getSizeOfValueInMemory();
    const bool is_supported_width = (max_size == 1 || max_size == 2 || max_size == 4 || max_size == 8);

    return is_supported_width ? static_cast<UInt8>(max_size) : 1;
}
|
||||
|
||||
} // namespace
|
||||
|
||||
|
||||
CompressionCodecDoubleDelta::CompressionCodecDoubleDelta(UInt8 data_bytes_size_)
|
||||
: data_bytes_size(data_bytes_size_)
|
||||
{
|
||||
}
|
||||
|
||||
/// Method byte identifying this codec: CompressionMethodByte::DoubleDelta.
UInt8 CompressionCodecDoubleDelta::getMethodByte() const
{
    const auto method = CompressionMethodByte::DoubleDelta;
    return static_cast<UInt8>(method);
}
|
||||
|
||||
/// Textual name of the codec.
String CompressionCodecDoubleDelta::getCodecDesc() const
{
    static const char codec_name[] = "DoubleDelta";
    return codec_name;
}
|
||||
|
||||
/// Upper bound for the size of the compressed representation of `uncompressed_size` bytes.
UInt32 CompressionCodecDoubleDelta::getMaxCompressedDataSize(UInt32 uncompressed_size) const
{
    UInt32 max_size = 2;                                                      // common header
    max_size += data_bytes_size;                                              // max bytes skipped if source is not properly aligned.
    max_size += getCompressedHeaderSize(data_bytes_size);                     // data-specific header
    max_size += getCompressedDataSize(data_bytes_size, uncompressed_size);    // bit stream
    return max_size;
}
|
||||
|
||||
/** Compresses `source_size` bytes from `source` into `dest`.
  *
  * Output layout:
  *   byte 0  - width of a single value in bytes
  *   byte 1  - number of leading source bytes copied verbatim (source_size % width)
  *   those verbatim bytes
  *   the double-delta encoded remainder
  *
  * Returns the total number of bytes written to `dest`.
  * Throws CANNOT_COMPRESS when the configured value width is not 1, 2, 4 or 8.
  */
UInt32 CompressionCodecDoubleDelta::doCompressData(const char * source, UInt32 source_size, char * dest) const
{
    // Checked up front: a zero width would divide by zero below and any other unsupported
    // width would silently drop the data.
    if (data_bytes_size != 1 && data_bytes_size != 2 && data_bytes_size != 4 && data_bytes_size != 8)
        throw Exception("Cannot compress, unsupported data size " + toString(static_cast<UInt32>(data_bytes_size)), ErrorCodes::CANNOT_COMPRESS);

    const UInt8 bytes_to_skip = source_size % data_bytes_size;
    dest[0] = data_bytes_size;
    dest[1] = bytes_to_skip;
    memcpy(&dest[2], source, bytes_to_skip);

    const size_t start_pos = 2 + bytes_to_skip;
    const char * const aligned_source = &source[bytes_to_skip];
    const UInt32 aligned_size = source_size - bytes_to_skip;
    char * const payload = &dest[start_pos];

    UInt32 compressed_size = 0;
    switch (data_bytes_size)
    {
        case 1:
            compressed_size = compressDataForType<UInt8, Int16>(aligned_source, aligned_size, payload);
            break;
        case 2:
            compressed_size = compressDataForType<UInt16, Int32>(aligned_source, aligned_size, payload);
            break;
        case 4:
            compressed_size = compressDataForType<UInt32, Int64>(aligned_source, aligned_size, payload);
            break;
        case 8:
            compressed_size = compressDataForType<UInt64, Int64>(aligned_source, aligned_size, payload);
            break;
    }

    // The verbatim bytes are part of the output too; leaving them out of the reported size
    // cuts off the tail of the encoded data whenever the input is not aligned.
    return 1 + 1 + bytes_to_skip + compressed_size;
}
|
||||
|
||||
void CompressionCodecDoubleDelta::doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 /* uncompressed_size */) const
|
||||
{
|
||||
UInt8 bytes_size = source[0];
|
||||
UInt8 bytes_to_skip = source[1];
|
||||
|
||||
memcpy(dest, &source[2], bytes_to_skip);
|
||||
UInt32 source_size_no_header = source_size - bytes_to_skip - 2;
|
||||
switch (bytes_size)
|
||||
{
|
||||
case 1:
|
||||
decompressDataForType<UInt8, Int16>(&source[2 + bytes_to_skip], source_size_no_header, &dest[bytes_to_skip]);
|
||||
break;
|
||||
case 2:
|
||||
decompressDataForType<UInt16, Int32>(&source[2 + bytes_to_skip], source_size_no_header, &dest[bytes_to_skip]);
|
||||
break;
|
||||
case 4:
|
||||
decompressDataForType<UInt32, Int64>(&source[2 + bytes_to_skip], source_size_no_header, &dest[bytes_to_skip]);
|
||||
break;
|
||||
case 8:
|
||||
decompressDataForType<UInt64, Int64>(&source[2 + bytes_to_skip], source_size_no_header, &dest[bytes_to_skip]);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/// Re-derives the value width used by this codec from `data_type`.
void CompressionCodecDoubleDelta::useInfoAboutType(DataTypePtr data_type)
{
    const UInt8 detected_size = getDataBytesSize(data_type);
    data_bytes_size = detected_size;
}
|
||||
|
||||
/// Registers the "DoubleDelta" codec in `factory`; the value width of every created codec
/// is derived from the type of the column it is created for.
void registerCodecDoubleDelta(CompressionCodecFactory & factory)
{
    const UInt8 method_code = UInt8(CompressionMethodByte::DoubleDelta);

    // The callback is handed to the factory, so it must not default-capture by reference:
    // it needs nothing from this scope and captures nothing.
    factory.registerCompressionCodecWithType("DoubleDelta", method_code, [](const ASTPtr &, DataTypePtr column_type) -> CompressionCodecPtr
    {
        const UInt8 delta_bytes_size = getDataBytesSize(column_type);
        return std::make_shared<CompressionCodecDoubleDelta>(delta_bytes_size);
    });
}
|
||||
}
|
30
dbms/src/Compression/CompressionCodecDoubleDelta.h
Normal file
30
dbms/src/Compression/CompressionCodecDoubleDelta.h
Normal file
@ -0,0 +1,30 @@
|
||||
#pragma once
|
||||
|
||||
#include <Compression/ICompressionCodec.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class CompressionCodecDoubleDelta : public ICompressionCodec
|
||||
{
|
||||
public:
|
||||
CompressionCodecDoubleDelta(UInt8 data_bytes_size_);
|
||||
|
||||
UInt8 getMethodByte() const override;
|
||||
|
||||
String getCodecDesc() const override;
|
||||
|
||||
void useInfoAboutType(DataTypePtr data_type) override;
|
||||
|
||||
protected:
|
||||
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
|
||||
|
||||
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;
|
||||
|
||||
UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override;
|
||||
|
||||
private:
|
||||
UInt8 data_bytes_size;
|
||||
};
|
||||
|
||||
}
|
@ -138,6 +138,7 @@ void registerCodecMultiple(CompressionCodecFactory & factory);
|
||||
void registerCodecLZ4HC(CompressionCodecFactory & factory);
|
||||
void registerCodecDelta(CompressionCodecFactory & factory);
|
||||
void registerCodecT64(CompressionCodecFactory & factory);
|
||||
void registerCodecDoubleDelta(CompressionCodecFactory & factory);
|
||||
|
||||
CompressionCodecFactory::CompressionCodecFactory()
|
||||
{
|
||||
@ -149,6 +150,7 @@ CompressionCodecFactory::CompressionCodecFactory()
|
||||
registerCodecLZ4HC(*this);
|
||||
registerCodecDelta(*this);
|
||||
registerCodecT64(*this);
|
||||
registerCodecDoubleDelta(*this);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -41,6 +41,7 @@ enum class CompressionMethodByte : uint8_t
|
||||
Multiple = 0x91,
|
||||
Delta = 0x92,
|
||||
T64 = 0x93,
|
||||
DoubleDelta = 0x94,
|
||||
};
|
||||
|
||||
}
|
||||
|
135
dbms/src/IO/BitHelpers.cpp
Normal file
135
dbms/src/IO/BitHelpers.cpp
Normal file
@ -0,0 +1,135 @@
|
||||
#include "BitHelpers.h"
|
||||
|
||||
#include <cassert>
|
||||
|
||||
namespace
|
||||
{
|
||||
const DB::UInt8 MAX_BUFFER_SIZE_BITS = 8;
|
||||
}
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
BitReader::BitReader(ReadBuffer & buf_)
|
||||
: buf(buf_),
|
||||
bits_buffer(0),
|
||||
bits_count(0)
|
||||
{}
|
||||
|
||||
/// Nothing to release: the underlying buffer is only referenced.
BitReader::~BitReader() = default;
|
||||
|
||||
/// Reads up to `bits` bits (at most 64), most significant bit first, and returns them in the
/// low bits of the result. Stops early when the underlying buffer runs out of data.
UInt64 BitReader::readBits(UInt8 bits)
{
    const UInt8 max_bits = static_cast<UInt8>(sizeof(UInt64) * 8);
    const UInt8 buffer_bits = static_cast<UInt8>(sizeof(bits_buffer) * 8);

    UInt8 remaining = bits < max_bits ? bits : max_bits;
    UInt64 result = 0;

    while (remaining != 0)
    {
        if (bits_count == 0)
        {
            fillBuffer();
            if (bits_count == 0)
                break; // EOF.
        }

        // Take the top `chunk` bits that are still buffered.
        const UInt8 chunk = remaining < bits_count ? remaining : bits_count;
        const UInt8 shift = bits_count - chunk;
        const UInt8 mask = static_cast<UInt8>((1U << chunk) - 1);
        const UInt8 piece = static_cast<UInt8>(bits_buffer >> shift) & mask;
        result |= piece;

        // Drop the bits that were just consumed.
        bits_buffer &= ~(mask << shift);
        bits_count -= chunk;
        remaining -= chunk;

        // Make room for the bits of the next round.
        result <<= (remaining < buffer_bits ? remaining : buffer_bits);
    }

    return result;
}
|
||||
|
||||
/// Reads a single bit.
UInt8 BitReader::readBit()
{
    const UInt64 single_bit = readBits(1);
    return static_cast<UInt8>(single_bit);
}
|
||||
|
||||
bool BitReader::eof() const
|
||||
{
|
||||
return bits_count == 0 && buf.eof();
|
||||
}
|
||||
|
||||
void BitReader::fillBuffer()
|
||||
{
|
||||
auto read = buf.read(reinterpret_cast<char *>(&bits_buffer), MAX_BUFFER_SIZE_BITS/8);
|
||||
bits_count = static_cast<UInt8>(read) * 8;
|
||||
}
|
||||
|
||||
BitWriter::BitWriter(WriteBuffer & buf_)
|
||||
: buf(buf_),
|
||||
bits_buffer(0),
|
||||
bits_count(0)
|
||||
{}
|
||||
|
||||
/// Writes out whatever bits are still pending.
BitWriter::~BitWriter()
{
    this->flush();
}
|
||||
|
||||
/// Appends the `bits` low bits of `value` (at most 64), most significant bit first.
/// The staging byte is written to the underlying buffer every time it fills up.
void BitWriter::writeBits(UInt8 bits, UInt64 value)
{
    const UInt8 max_bits = static_cast<UInt8>(sizeof(value) * 8);
    UInt8 remaining = bits < max_bits ? bits : max_bits;

    while (remaining > 0)
    {
        const UInt8 capacity = MAX_BUFFER_SIZE_BITS - bits_count;
        const bool fits = capacity >= remaining;

        // When not everything fits, take the most significant part first.
        const UInt8 chunk = fits ? remaining : capacity;
        UInt64 piece = fits ? value : (value >> (remaining - capacity));

        piece &= (static_cast<UInt64>(1) << chunk) - 1;
        assert(piece <= 255);

        bits_buffer <<= chunk;
        bits_buffer |= piece;
        bits_count += chunk;

        if (bits_count < MAX_BUFFER_SIZE_BITS)
            break;

        doFlush();
        remaining -= chunk;
    }
}
|
||||
|
||||
void BitWriter::flush()
|
||||
{
|
||||
if (bits_count != 0)
|
||||
{
|
||||
bits_buffer <<= (MAX_BUFFER_SIZE_BITS - bits_count);
|
||||
doFlush();
|
||||
}
|
||||
}
|
||||
|
||||
void BitWriter::doFlush()
|
||||
{
|
||||
buf.write(reinterpret_cast<const char *>(&bits_buffer), MAX_BUFFER_SIZE_BITS/8);
|
||||
|
||||
bits_count = 0;
|
||||
bits_buffer = 0;
|
||||
}
|
||||
|
||||
} // namespace DB
|
70
dbms/src/IO/BitHelpers.h
Normal file
70
dbms/src/IO/BitHelpers.h
Normal file
@ -0,0 +1,70 @@
|
||||
#pragma once
|
||||
|
||||
#include <IO/ReadBuffer.h>
|
||||
#include <IO/WriteBuffer.h>
|
||||
#include <Core/Types.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/** Reads data from underlying ReadBuffer in bit by bit, max 64 bits at once.
|
||||
*
|
||||
* reads MSB bits first, imagine that you have a data:
|
||||
* 11110000 10101010 00100100 11111111
|
||||
*
|
||||
* Given that r is BitReader created with a ReadBuffer that reads from data above:
|
||||
* r.readBits(3) => 0b111
|
||||
* r.readBit() => 0b1
|
||||
* r.readBits(8) => 0b1010 // 4 leading zero-bits are not shown
|
||||
* r.readBit() => 0b1
|
||||
* r.readBit() => 0b0
|
||||
* r.readBits(16) => 0b100010010011111111
|
||||
**/
|
||||
|
||||
class BitReader
|
||||
{
|
||||
ReadBuffer & buf;
|
||||
|
||||
UInt8 bits_buffer;
|
||||
UInt8 bits_count;
|
||||
|
||||
public:
|
||||
BitReader(ReadBuffer & buf_);
|
||||
~BitReader();
|
||||
|
||||
BitReader(BitReader &&) = default;
|
||||
|
||||
// bits is at most 64
|
||||
UInt64 readBits(UInt8 bits);
|
||||
UInt8 readBit();
|
||||
|
||||
// true when both bit-buffer and underlying byte-buffer are empty.
|
||||
bool eof() const;
|
||||
|
||||
private:
|
||||
void fillBuffer();
|
||||
};
|
||||
|
||||
class BitWriter
|
||||
{
|
||||
WriteBuffer & buf;
|
||||
|
||||
UInt8 bits_buffer;
|
||||
UInt8 bits_count;
|
||||
|
||||
public:
|
||||
BitWriter(WriteBuffer & buf_);
|
||||
~BitWriter();
|
||||
|
||||
BitWriter(BitWriter &&) = default;
|
||||
|
||||
// write `size` low bits of the `value`.
|
||||
void writeBits(UInt8 size, UInt64 value);
|
||||
|
||||
void flush();
|
||||
|
||||
private:
|
||||
void doFlush();
|
||||
};
|
||||
|
||||
} // namespace DB
|
@ -82,3 +82,6 @@ target_link_libraries (zlib_ng_bug PRIVATE ${Poco_Foundation_LIBRARY})
|
||||
if(NOT USE_INTERNAL_POCO_LIBRARY)
|
||||
target_include_directories(zlib_ng_bug SYSTEM BEFORE PRIVATE ${Poco_INCLUDE_DIRS})
|
||||
endif()
|
||||
|
||||
add_executable(bit_io bit_io.cpp)
|
||||
target_link_libraries (bit_io PRIVATE clickhouse_common_io)
|
||||
|
188
dbms/src/IO/tests/bit_io.cpp
Normal file
188
dbms/src/IO/tests/bit_io.cpp
Normal file
@ -0,0 +1,188 @@
|
||||
|
||||
#include <IO/BitHelpers.h>
|
||||
|
||||
#include <Core/Types.h>
|
||||
#include <IO/MemoryReadWriteBuffer.h>
|
||||
#include <IO/ReadBufferFromMemory.h>
|
||||
|
||||
#include <memory>
|
||||
#include <iostream>
|
||||
#include <iomanip>
|
||||
#include <bitset>
|
||||
|
||||
#pragma GCC diagnostic push
|
||||
#pragma GCC diagnostic ignored "-Wunused-const-variable"
|
||||
#pragma GCC diagnostic ignored "-Wunused-variable"
|
||||
|
||||
namespace
|
||||
{
|
||||
using namespace DB;
|
||||
|
||||
// Intentionally asymmetric at both byte and word granularity, to detect read and write inconsistencies:
// each prime bit is set to 0.
|
||||
// v-61 v-53 v-47 v-41 v-37 v-31 v-23 v-17 v-11 v-5
|
||||
const UInt64 BIT_PATTERN = 0b11101011'11101111'10111010'11101111'10101111'10111010'11101011'10101001;
|
||||
const UInt8 PRIMES[] = {2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61};
|
||||
const UInt8 REPEAT_TIMES = 11;
|
||||
|
||||
template <typename T>
|
||||
std::string bin(const T & value, size_t bits = sizeof(T)*8)
|
||||
{
|
||||
static const UInt8 MAX_BITS = sizeof(T)*8;
|
||||
assert(bits <= MAX_BITS);
|
||||
|
||||
return std::bitset<sizeof(T) * 8>(static_cast<unsigned long long>(value))
|
||||
.to_string().substr(MAX_BITS - bits, bits);
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
T getBits(UInt8 bits, const T & value)
|
||||
{
|
||||
const T mask = ((static_cast<T>(1) << static_cast<T>(bits)) - 1);
|
||||
return value & mask;
|
||||
}
|
||||
|
||||
std::ostream & dumpBuffer(const char * begin, const char * end, std::ostream * destination, const char* col_sep = " ", const char* row_sep = "\n", const size_t cols_in_row = 8, UInt32 max_bytes = 0xFFFFFFFF)
|
||||
{
|
||||
size_t col = 0;
|
||||
for (auto p = begin; p < end && p - begin < max_bytes; ++p)
|
||||
{
|
||||
*destination << bin(*p);
|
||||
if (++col % cols_in_row == 0)
|
||||
{
|
||||
if (row_sep)
|
||||
*destination << row_sep;
|
||||
}
|
||||
else if (col_sep)
|
||||
{
|
||||
*destination << col_sep;
|
||||
}
|
||||
}
|
||||
|
||||
return *destination;
|
||||
}
|
||||
|
||||
/// Prints the whole internal memory of `buffer` in binary to `destination`.
std::ostream & dumpBufferContents(BufferBase & buffer, std::ostream * destination, const char* col_sep = " ", const char* row_sep = "\n", const size_t cols_in_row = 8, UInt32 max_bytes = 0xFFFFFFFF)
{
    const auto & memory = buffer.buffer();
    const char * const first = memory.begin();
    const char * const last = memory.end();

    return dumpBuffer(first, last, destination, col_sep, row_sep, cols_in_row, max_bytes);
}
|
||||
|
||||
std::string dumpBufferContents(BufferBase & buffer, const char* col_sep = " ", const char* row_sep = "\n", const size_t cols_in_row = 8)
|
||||
{
|
||||
std::stringstream sstr;
|
||||
dumpBufferContents(buffer, &sstr, col_sep, row_sep, cols_in_row);
|
||||
|
||||
return sstr.str();
|
||||
}
|
||||
|
||||
|
||||
bool test(const std::vector<std::pair<UInt8, UInt64>> & bits_and_vals, const char * expected_buffer_binary = nullptr)
|
||||
{
|
||||
MemoryWriteBuffer memory_write_buffer(1024, 1024, 1.5, 20*1024);
|
||||
|
||||
{
|
||||
BitWriter writer(memory_write_buffer);
|
||||
for (const auto & bv : bits_and_vals)
|
||||
{
|
||||
writer.writeBits(bv.first, bv.second);
|
||||
}
|
||||
writer.flush();
|
||||
}
|
||||
|
||||
{
|
||||
auto memory_read_buffer = memory_write_buffer.tryGetReadBuffer();
|
||||
|
||||
if (expected_buffer_binary != nullptr)
|
||||
{
|
||||
const auto actual_buffer_binary = dumpBufferContents(*memory_read_buffer, " ", " ");
|
||||
if (actual_buffer_binary != expected_buffer_binary)
|
||||
{
|
||||
std::cerr << "Invalid buffer memory after writing\n"
|
||||
<< "expected: " << strlen(expected_buffer_binary) << "\n" << expected_buffer_binary
|
||||
<< "\ngot: " << actual_buffer_binary.size() << "\n" << actual_buffer_binary
|
||||
<< std::endl;
|
||||
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
BitReader reader(*memory_read_buffer);
|
||||
|
||||
int item = 0;
|
||||
for (const auto & bv : bits_and_vals)
|
||||
{
|
||||
const auto expected_value = getBits(bv.first, bv.second);
|
||||
|
||||
const auto actual_value = reader.readBits(bv.first);
|
||||
|
||||
if (expected_value != actual_value)
|
||||
{
|
||||
std::cerr << "Invalid value #" << item << " with " << static_cast<UInt32>(bv.first) << ", " << bin(bv.second) << "\n"
|
||||
<< "\texpected: " << bin(expected_value) << "\n"
|
||||
<< "\tgot : " << bin(actual_value) << ".\n\n\nBuffer memory:\n";
|
||||
dumpBufferContents(*memory_read_buffer, &std::cerr) << std::endl << std::endl;
|
||||
|
||||
return false;
|
||||
}
|
||||
++item;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool primes_test()
|
||||
{
|
||||
std::vector<std::pair<UInt8, UInt64>> test_data;
|
||||
MemoryWriteBuffer memory_write_buffer;
|
||||
|
||||
{
|
||||
for (UInt8 r = 0; r < REPEAT_TIMES; ++r)
|
||||
{
|
||||
for (const auto p : PRIMES)
|
||||
{
|
||||
test_data.emplace_back(p, BIT_PATTERN);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return test(test_data);
|
||||
}
|
||||
|
||||
/// Round-trips a single value of `bits` bits; a mismatch is only reported to stderr by test().
void simple_test(UInt8 bits, UInt64 value)
{
    const std::vector<std::pair<UInt8, UInt64>> single_item{{bits, value}};
    test(single_item);
}
|
||||
|
||||
} // namespace
|
||||
|
||||
int main()
|
||||
{
|
||||
UInt32 test_case = 0;
|
||||
for (const auto p : PRIMES)
|
||||
{
|
||||
simple_test(p, 0xFFFFFFFFFFFFFFFF);
|
||||
std::cout << ++test_case << " with all-ones and " << static_cast<UInt32>(p) << std::endl;
|
||||
}
|
||||
|
||||
for (const auto p : PRIMES)
|
||||
{
|
||||
simple_test(p, BIT_PATTERN);
|
||||
std::cout << ++test_case << " with fancy bit pattern and " << static_cast<UInt32>(p) << std::endl;
|
||||
}
|
||||
|
||||
test({{9, 0xFFFFFFFF}, {9, 0x00}, {9, 0xFFFFFFFF}, {9, 0x00}, {9, 0xFFFFFFFF}},
|
||||
"11111111 10000000 00111111 11100000 00001111 11111000 ");
|
||||
|
||||
test({{7, 0x3f}, {7, 0x3f}, {7, 0x3f}, {7, 0x3f}, {7, 0x3f}, {7, 0x3f}, {7, 0x3f}, {7, 0x3f}, {7, 0x3f}, {3, 0xFFFF}},
|
||||
"01111110 11111101 11111011 11110111 11101111 11011111 10111111 01111111 11000000 ");
|
||||
|
||||
test({{33, 0xFF110d0b07050300}, {33, 0xAAEE29251f1d1713}, });
|
||||
test({{33, BIT_PATTERN}, {33, BIT_PATTERN}});
|
||||
|
||||
std::cout << ++test_case << " primes " << std::endl;
|
||||
primes_test();
|
||||
|
||||
return 0;
|
||||
}
|
@ -0,0 +1,11 @@
|
||||
0
|
||||
U64
|
||||
U32
|
||||
U16
|
||||
U8
|
||||
I64
|
||||
I32
|
||||
I16
|
||||
I8
|
||||
DT
|
||||
D
|
@ -0,0 +1,211 @@
|
||||
DROP TABLE IF EXISTS reference;
|
||||
DROP TABLE IF EXISTS doubleDelta;
|
||||
|
||||
CREATE TABLE reference (
|
||||
key UInt64,
|
||||
valueU64 UInt64,
|
||||
valueU32 UInt32,
|
||||
valueU16 UInt16,
|
||||
valueU8 UInt8,
|
||||
valueI64 Int64,
|
||||
valueI32 Int32,
|
||||
valueI16 Int16,
|
||||
valueI8 Int8,
|
||||
valueDT DateTime,
|
||||
valueD Date
|
||||
) Engine = MergeTree ORDER BY key;
|
||||
|
||||
|
||||
CREATE TABLE doubleDelta (
|
||||
key UInt64 CODEC(DoubleDelta),
|
||||
valueU64 UInt64 CODEC(DoubleDelta),
|
||||
valueU32 UInt32 CODEC(DoubleDelta),
|
||||
valueU16 UInt16 CODEC(DoubleDelta),
|
||||
valueU8 UInt8 CODEC(DoubleDelta),
|
||||
valueI64 Int64 CODEC(DoubleDelta),
|
||||
valueI32 Int32 CODEC(DoubleDelta),
|
||||
valueI16 Int16 CODEC(DoubleDelta),
|
||||
valueI8 Int8 CODEC(DoubleDelta),
|
||||
valueDT DateTime CODEC(DoubleDelta),
|
||||
valueD Date CODEC(DoubleDelta)
|
||||
) Engine = MergeTree ORDER BY key;
|
||||
|
||||
|
||||
-- n^3 covers all double delta storage cases, from small difference between neighbour values (stride) to big.
|
||||
INSERT INTO reference (key, valueU64, valueU32, valueU16, valueU8, valueI64, valueI32, valueI16, valueI8, valueDT, valueD)
|
||||
SELECT number as n, n * n * n as v, v, v, v, v, v, v, v, toDateTime(v), toDate(v) FROM system.numbers LIMIT 1, 100;
|
||||
|
||||
-- best case - constant stride
|
||||
INSERT INTO reference (key, valueU64, valueU32, valueU16, valueU8, valueI64, valueI32, valueI16, valueI8, valueDT, valueD)
|
||||
SELECT number as n, n as v, v, v, v, v, v, v, v, toDateTime(v), toDate(v) FROM system.numbers LIMIT 101, 100;
|
||||
|
||||
-- checking for overflow
|
||||
INSERT INTO reference (key, valueU64, valueI64)
|
||||
VALUES (201, 18446744073709551616, 9223372036854775808), (202, 0, -9223372036854775808), (203, 18446744073709551616, 9223372036854775808);
|
||||
|
||||
-- worst case - random stride
|
||||
INSERT INTO reference (key, valueU64, valueU32, valueU16, valueU8, valueI64, valueI32, valueI16, valueI8, valueDT, valueD)
|
||||
SELECT number as n, n + (rand64() - 9223372036854775808)/1000 as v, v, v, v, v, v, v, v, toDateTime(v), toDate(v) FROM system.numbers LIMIT 301, 100;
|
||||
|
||||
|
||||
INSERT INTO doubleDelta SELECT * FROM reference;
|
||||
|
||||
-- same number of rows
-- The row counts themselves have to be aggregated: groupArray(1) collects the constant 1
-- for each subquery, which makes the difference 0 no matter how many rows the tables have.
SELECT a[1] - a[2] FROM (
    SELECT groupArray(c) AS a FROM (
        SELECT count() AS c FROM reference
        UNION ALL
        SELECT count() AS c FROM doubleDelta
    )
);
|
||||
|
||||
SELECT 'U64';
|
||||
SELECT
|
||||
key,
|
||||
r.valueU64, d.valueU64, r.valueU64 - d.valueU64 as dU64
|
||||
FROM reference as r, doubleDelta as d
|
||||
WHERE
|
||||
r.key == d.key
|
||||
AND
|
||||
dU64 != 0
|
||||
ORDER BY r.key
|
||||
LIMIT 10;
|
||||
|
||||
|
||||
SELECT 'U32';
|
||||
SELECT
|
||||
key,
|
||||
r.valueU32, d.valueU32, r.valueU32 - d.valueU32 as dU32
|
||||
FROM reference as r, doubleDelta as d
|
||||
WHERE
|
||||
r.key == d.key
|
||||
AND
|
||||
dU32 != 0
|
||||
ORDER BY r.key
|
||||
LIMIT 10;
|
||||
|
||||
|
||||
SELECT 'U16';
|
||||
SELECT
|
||||
key,
|
||||
r.valueU16, d.valueU16, r.valueU16 - d.valueU16 as dU16
|
||||
FROM reference as r, doubleDelta as d
|
||||
WHERE
|
||||
r.key == d.key
|
||||
AND
|
||||
dU16 != 0
|
||||
ORDER BY r.key
|
||||
LIMIT 10;
|
||||
|
||||
|
||||
SELECT 'U8';
|
||||
SELECT
|
||||
key,
|
||||
r.valueU8, d.valueU8, r.valueU8 - d.valueU8 as dU8
|
||||
FROM reference as r, doubleDelta as d
|
||||
WHERE
|
||||
r.key == d.key
|
||||
AND
|
||||
dU8 != 0
|
||||
ORDER BY r.key
|
||||
LIMIT 10;
|
||||
|
||||
|
||||
SELECT 'I64';
|
||||
SELECT
|
||||
key,
|
||||
r.valueI64, d.valueI64, r.valueI64 - d.valueI64 as dI64
|
||||
FROM reference as r, doubleDelta as d
|
||||
WHERE
|
||||
r.key == d.key
|
||||
AND
|
||||
dI64 != 0
|
||||
ORDER BY r.key
|
||||
LIMIT 10;
|
||||
|
||||
|
||||
SELECT 'I32';
|
||||
SELECT
|
||||
key,
|
||||
r.valueI32, d.valueI32, r.valueI32 - d.valueI32 as dI32
|
||||
FROM reference as r, doubleDelta as d
|
||||
WHERE
|
||||
r.key == d.key
|
||||
AND
|
||||
dI32 != 0
|
||||
ORDER BY r.key
|
||||
LIMIT 10;
|
||||
|
||||
|
||||
SELECT 'I16';
|
||||
SELECT
|
||||
key,
|
||||
r.valueI16, d.valueI16, r.valueI16 - d.valueI16 as dI16
|
||||
FROM reference as r, doubleDelta as d
|
||||
WHERE
|
||||
r.key == d.key
|
||||
AND
|
||||
dI16 != 0
|
||||
ORDER BY r.key
|
||||
LIMIT 10;
|
||||
|
||||
|
||||
SELECT 'I8';
|
||||
SELECT
|
||||
key,
|
||||
r.valueI8, d.valueI8, r.valueI8 - d.valueI8 as dI8
|
||||
FROM reference as r, doubleDelta as d
|
||||
WHERE
|
||||
r.key == d.key
|
||||
AND
|
||||
dI8 != 0
|
||||
ORDER BY r.key
|
||||
LIMIT 10;
|
||||
|
||||
|
||||
SELECT 'DT';
|
||||
SELECT
|
||||
key,
|
||||
r.valueDT, d.valueDT, r.valueDT - d.valueDT as dDT
|
||||
FROM reference as r, doubleDelta as d
|
||||
WHERE
|
||||
r.key == d.key
|
||||
AND
|
||||
dDT != 0
|
||||
ORDER BY r.key
|
||||
LIMIT 10;
|
||||
|
||||
|
||||
SELECT 'D';
|
||||
SELECT
|
||||
key,
|
||||
r.valueD, d.valueD, r.valueD - d.valueD as dD
|
||||
FROM reference as r, doubleDelta as d
|
||||
WHERE
|
||||
r.key == d.key
|
||||
AND
|
||||
dD != 0
|
||||
ORDER BY r.key
|
||||
LIMIT 10;
|
||||
|
||||
-- Compatibility with other codecs
|
||||
DROP TABLE IF EXISTS dd_lz4_codec;
|
||||
CREATE TABLE dd_lz4_codec (
|
||||
key UInt64 CODEC(DoubleDelta, LZ4),
|
||||
valueU64 UInt64 CODEC(DoubleDelta, LZ4),
|
||||
valueU32 UInt32 CODEC(DoubleDelta, LZ4),
|
||||
valueU16 UInt16 CODEC(DoubleDelta, LZ4),
|
||||
valueU8 UInt8 CODEC(DoubleDelta, LZ4),
|
||||
valueI64 Int64 CODEC(DoubleDelta, LZ4),
|
||||
valueI32 Int32 CODEC(DoubleDelta, LZ4),
|
||||
valueI16 Int16 CODEC(DoubleDelta, LZ4),
|
||||
valueI8 Int8 CODEC(DoubleDelta, LZ4),
|
||||
valueDT DateTime CODEC(DoubleDelta, LZ4),
|
||||
valueD Date CODEC(DoubleDelta, LZ4)
|
||||
) Engine = MergeTree ORDER BY key;
|
||||
|
||||
INSERT INTO dd_lz4_codec SELECT * FROM reference;
|
||||
|
||||
DROP TABLE IF EXISTS reference;
|
||||
DROP TABLE IF EXISTS doubleDelta;
|
||||
DROP TABLE IF EXISTS dd_lz4_codec;
|
Loading…
Reference in New Issue
Block a user