Merge pull request #12183 from CurtizJ/polymorphic-parts-2

Support codecs in compact parts
Anton Popov 2020-09-09 12:34:13 +03:00 committed by GitHub
commit 32135d96f9
34 changed files with 353 additions and 73 deletions


@@ -72,9 +72,10 @@ bool CachedCompressedReadBuffer::nextImpl()
}
CachedCompressedReadBuffer::CachedCompressedReadBuffer(
const std::string & path_, std::function<std::unique_ptr<ReadBufferFromFileBase>()> file_in_creator_, UncompressedCache * cache_)
const std::string & path_, std::function<std::unique_ptr<ReadBufferFromFileBase>()> file_in_creator_, UncompressedCache * cache_, bool allow_different_codecs_)
: ReadBuffer(nullptr, 0), file_in_creator(std::move(file_in_creator_)), cache(cache_), path(path_), file_pos(0)
{
allow_different_codecs = allow_different_codecs_;
}
void CachedCompressedReadBuffer::seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block)


@@ -38,7 +38,7 @@ private:
clockid_t clock_type {};
public:
CachedCompressedReadBuffer(const std::string & path, std::function<std::unique_ptr<ReadBufferFromFileBase>()> file_in_creator, UncompressedCache * cache_);
CachedCompressedReadBuffer(const std::string & path, std::function<std::unique_ptr<ReadBufferFromFileBase>()> file_in_creator, UncompressedCache * cache_, bool allow_different_codecs_ = false);
void seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block);


@@ -105,13 +105,24 @@ size_t CompressedReadBufferBase::readCompressedData(size_t & size_decompressed,
uint8_t method = ICompressionCodec::readMethod(own_compressed_buffer.data());
if (!codec)
{
codec = CompressionCodecFactory::instance().get(method);
}
else if (method != codec->getMethodByte())
throw Exception("Data compressed with different methods, given method byte 0x"
+ getHexUIntLowercase(method)
+ ", previous method byte 0x"
+ getHexUIntLowercase(codec->getMethodByte()),
ErrorCodes::CANNOT_DECOMPRESS);
{
if (allow_different_codecs)
{
codec = CompressionCodecFactory::instance().get(method);
}
else
{
throw Exception("Data compressed with different methods, given method byte 0x"
+ getHexUIntLowercase(method)
+ ", previous method byte 0x"
+ getHexUIntLowercase(codec->getMethodByte()),
ErrorCodes::CANNOT_DECOMPRESS);
}
}
size_compressed_without_checksum = ICompressionCodec::readCompressedBlockSize(own_compressed_buffer.data());
size_decompressed = ICompressionCodec::readDecompressedBlockSize(own_compressed_buffer.data());
@@ -163,21 +174,32 @@ void CompressedReadBufferBase::decompress(char * to, size_t size_decompressed, s
uint8_t method = ICompressionCodec::readMethod(compressed_buffer);
if (!codec)
{
codec = CompressionCodecFactory::instance().get(method);
}
else if (codec->getMethodByte() != method)
throw Exception("Data compressed with different methods, given method byte "
+ getHexUIntLowercase(method)
+ ", previous method byte "
+ getHexUIntLowercase(codec->getMethodByte()),
ErrorCodes::CANNOT_DECOMPRESS);
{
if (allow_different_codecs)
{
codec = CompressionCodecFactory::instance().get(method);
}
else
{
throw Exception("Data compressed with different methods, given method byte "
+ getHexUIntLowercase(method)
+ ", previous method byte "
+ getHexUIntLowercase(codec->getMethodByte()),
ErrorCodes::CANNOT_DECOMPRESS);
}
}
codec->decompress(compressed_buffer, size_compressed_without_checksum, to);
}
/// 'compressed_in' could be initialized lazily, but before first call of 'readCompressedData'.
CompressedReadBufferBase::CompressedReadBufferBase(ReadBuffer * in)
: compressed_in(in), own_compressed_buffer(0)
CompressedReadBufferBase::CompressedReadBufferBase(ReadBuffer * in, bool allow_different_codecs_)
: compressed_in(in), own_compressed_buffer(0), allow_different_codecs(allow_different_codecs_)
{
}


@@ -26,6 +26,9 @@ protected:
/// Don't checksum on decompressing.
bool disable_checksum = false;
/// Allow reading data, compressed by different codecs from one file.
bool allow_different_codecs;
/// Read compressed data into compressed_buffer. Get size of decompressed data from block header. Checksum if need.
/// Returns number of compressed bytes read.
size_t readCompressedData(size_t & size_decompressed, size_t & size_compressed_without_checksum);
@@ -34,7 +37,7 @@ protected:
public:
/// 'compressed_in' could be initialized lazily, but before first call of 'readCompressedData'.
CompressedReadBufferBase(ReadBuffer * in = nullptr);
CompressedReadBufferBase(ReadBuffer * in = nullptr, bool allow_different_codecs_ = false);
~CompressedReadBufferBase();
/** Disable checksums.


@@ -36,20 +36,22 @@ bool CompressedReadBufferFromFile::nextImpl()
return true;
}
CompressedReadBufferFromFile::CompressedReadBufferFromFile(std::unique_ptr<ReadBufferFromFileBase> buf)
CompressedReadBufferFromFile::CompressedReadBufferFromFile(std::unique_ptr<ReadBufferFromFileBase> buf, bool allow_different_codecs_)
: BufferWithOwnMemory<ReadBuffer>(0), p_file_in(std::move(buf)), file_in(*p_file_in)
{
compressed_in = &file_in;
allow_different_codecs = allow_different_codecs_;
}
CompressedReadBufferFromFile::CompressedReadBufferFromFile(
const std::string & path, size_t estimated_size, size_t aio_threshold, size_t mmap_threshold, size_t buf_size)
const std::string & path, size_t estimated_size, size_t aio_threshold, size_t mmap_threshold, size_t buf_size, bool allow_different_codecs_)
: BufferWithOwnMemory<ReadBuffer>(0)
, p_file_in(createReadBufferFromFileBase(path, estimated_size, aio_threshold, mmap_threshold, buf_size))
, file_in(*p_file_in)
{
compressed_in = &file_in;
allow_different_codecs = allow_different_codecs_;
}


@@ -28,10 +28,11 @@ private:
bool nextImpl() override;
public:
CompressedReadBufferFromFile(std::unique_ptr<ReadBufferFromFileBase> buf);
CompressedReadBufferFromFile(std::unique_ptr<ReadBufferFromFileBase> buf, bool allow_different_codecs_ = false);
CompressedReadBufferFromFile(
const std::string & path, size_t estimated_size, size_t aio_threshold, size_t mmap_threshold, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE);
const std::string & path, size_t estimated_size, size_t aio_threshold, size_t mmap_threshold,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, bool allow_different_codecs_ = false);
void seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block);
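A minimal usage sketch of the new flag declared above (not code from this commit; the path is illustrative): a caller that knows one file may mix codec method bytes between blocks, such as the single data file of a compact part, opts in through the last constructor argument.

#include <Compression/CompressedReadBufferFromFile.h>
#include <Core/Defines.h>
#include <string>

/// Sketch: decompress an entire file whose blocks may have been written by different codecs.
/// With allow_different_codecs_ = false (the default) a codec change between blocks
/// throws CANNOT_DECOMPRESS; with true the reader re-resolves the codec per block.
void readMixedCodecFile(const std::string & path)
{
    DB::CompressedReadBufferFromFile in(
        path,
        /* estimated_size */ 0,
        /* aio_threshold */ 0,
        /* mmap_threshold */ 0,
        DBMS_DEFAULT_BUFFER_SIZE,
        /* allow_different_codecs_ = */ true);

    in.ignoreAll();   /// ignoreAll() is the helper added to ReadBuffer later in this diff
}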


@@ -36,6 +36,11 @@ ASTPtr CompressionCodecDelta::getCodecDesc() const
return makeASTFunction("Delta", literal);
}
void CompressionCodecDelta::updateHash(SipHash & hash) const
{
getCodecDesc()->updateTreeHash(hash);
}
namespace
{


@@ -14,7 +14,10 @@ public:
ASTPtr getCodecDesc() const override;
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;


@@ -339,6 +339,12 @@ ASTPtr CompressionCodecDoubleDelta::getCodecDesc() const
return std::make_shared<ASTIdentifier>("DoubleDelta");
}
void CompressionCodecDoubleDelta::updateHash(SipHash & hash) const
{
getCodecDesc()->updateTreeHash(hash);
hash.update(data_bytes_size);
}
UInt32 CompressionCodecDoubleDelta::getMaxCompressedDataSize(UInt32 uncompressed_size) const
{
const auto result = 2 // common header


@@ -100,7 +100,10 @@ public:
ASTPtr getCodecDesc() const override;
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;


@@ -254,6 +254,12 @@ ASTPtr CompressionCodecGorilla::getCodecDesc() const
return std::make_shared<ASTIdentifier>("Gorilla");
}
void CompressionCodecGorilla::updateHash(SipHash & hash) const
{
getCodecDesc()->updateTreeHash(hash);
hash.update(data_bytes_size);
}
UInt32 CompressionCodecGorilla::getMaxCompressedDataSize(UInt32 uncompressed_size) const
{
const auto result = 2 // common header


@@ -97,7 +97,10 @@ public:
ASTPtr getCodecDesc() const override;
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;


@@ -35,6 +35,11 @@ ASTPtr CompressionCodecLZ4::getCodecDesc() const
return std::make_shared<ASTIdentifier>("LZ4");
}
void CompressionCodecLZ4::updateHash(SipHash & hash) const
{
getCodecDesc()->updateTreeHash(hash);
}
UInt32 CompressionCodecLZ4::getMaxCompressedDataSize(UInt32 uncompressed_size) const
{
return LZ4_COMPRESSBOUND(uncompressed_size);


@@ -18,6 +18,8 @@ public:
UInt32 getAdditionalSizeAtTheEndOfBuffer() const override { return LZ4::ADDITIONAL_BYTES_AT_END_OF_BUFFER; }
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;


@@ -37,6 +37,12 @@ ASTPtr CompressionCodecMultiple::getCodecDesc() const
return result;
}
void CompressionCodecMultiple::updateHash(SipHash & hash) const
{
for (const auto & codec : codecs)
codec->updateHash(hash);
}
UInt32 CompressionCodecMultiple::getMaxCompressedDataSize(UInt32 uncompressed_size) const
{
UInt32 compressed_size = uncompressed_size;


@@ -19,7 +19,10 @@ public:
static std::vector<uint8_t> getCodecsBytesFromData(const char * source);
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 decompressed_size) const override;


@@ -17,6 +17,11 @@ ASTPtr CompressionCodecNone::getCodecDesc() const
return std::make_shared<ASTIdentifier>("NONE");
}
void CompressionCodecNone::updateHash(SipHash & hash) const
{
getCodecDesc()->updateTreeHash(hash);
}
UInt32 CompressionCodecNone::doCompressData(const char * source, UInt32 source_size, char * dest) const
{
memcpy(dest, source, source_size);


@@ -15,7 +15,10 @@ public:
ASTPtr getCodecDesc() const override;
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;


@@ -646,6 +646,13 @@ ASTPtr CompressionCodecT64::getCodecDesc() const
return makeASTFunction("T64", literal);
}
void CompressionCodecT64::updateHash(SipHash & hash) const
{
getCodecDesc()->updateTreeHash(hash);
hash.update(type_idx);
hash.update(variant);
}
void registerCodecT64(CompressionCodecFactory & factory)
{
auto reg_func = [&](const ASTPtr & arguments, DataTypePtr type) -> CompressionCodecPtr


@@ -35,6 +35,8 @@ public:
ASTPtr getCodecDesc() const override;
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * src, UInt32 src_size, char * dst) const override;
void doDecompressData(const char * src, UInt32 src_size, char * dst, UInt32 uncompressed_size) const override;


@@ -32,6 +32,11 @@ ASTPtr CompressionCodecZSTD::getCodecDesc() const
return makeASTFunction("ZSTD", literal);
}
void CompressionCodecZSTD::updateHash(SipHash & hash) const
{
getCodecDesc()->updateTreeHash(hash);
}
UInt32 CompressionCodecZSTD::getMaxCompressedDataSize(UInt32 uncompressed_size) const
{
return ZSTD_compressBound(uncompressed_size);


@@ -21,7 +21,10 @@ public:
UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override;
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;


@@ -35,6 +35,13 @@ ASTPtr ICompressionCodec::getFullCodecDesc() const
return result;
}
UInt64 ICompressionCodec::getHash() const
{
SipHash hash;
updateHash(hash);
return hash.get64();
}
UInt32 ICompressionCodec::compress(const char * source, UInt32 source_size, char * dest) const
{
assert(source != nullptr && dest != nullptr);
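The new getHash() above is what lets MergeTreeDataPartWriterCompact (further down in this diff) keep exactly one compressed stream per distinct codec: columns whose effective codec descriptions are equal hash to the same key. A small standalone sketch of that property, assuming the factory's get(family_name, level) overload (not code from this commit):

#include <Compression/CompressionFactory.h>
#include <iostream>

int main()
{
    using namespace DB;

    /// Equal codec descriptions hash equally, different ones (almost surely) do not,
    /// so codec->getHash() works as a map key for deduplicating per-codec streams.
    auto lz4_a = CompressionCodecFactory::instance().get("LZ4", {});
    auto lz4_b = CompressionCodecFactory::instance().get("LZ4", {});
    auto zstd = CompressionCodecFactory::instance().get("ZSTD", {});

    std::cout << (lz4_a->getHash() == lz4_b->getHash()) << '\n';  /// expected: 1
    std::cout << (lz4_a->getHash() == zstd->getHash()) << '\n';   /// expected: 0
}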


@@ -5,6 +5,7 @@
#include <Compression/CompressionInfo.h>
#include <Core/Types.h>
#include <Parsers/IAST.h>
#include <Common/SipHash.h>
namespace DB
@@ -36,6 +37,10 @@ public:
/// "CODEC(LZ4,LZ4HC(5))"
ASTPtr getFullCodecDesc() const;
/// Hash, that depends on codec ast and optional parameters like data type
virtual void updateHash(SipHash & hash) const = 0;
UInt64 getHash() const;
/// Compressed bytes from uncompressed source to dest. Dest should preallocate memory
UInt32 compress(const char * source, UInt32 source_size, char * dest) const;


@@ -123,6 +123,11 @@ public:
return bytes_ignored;
}
void ignoreAll()
{
tryIgnore(std::numeric_limits<size_t>::max());
}
/** Reads a single byte. */
bool ALWAYS_INLINE read(char & c)
{


@@ -15,19 +15,29 @@ MergeTreeDataPartWriterCompact::MergeTreeDataPartWriterCompact(
: MergeTreeDataPartWriterOnDisk(data_part_, columns_list_, metadata_snapshot_,
indices_to_recalc_, marks_file_extension_,
default_codec_, settings_, index_granularity_)
, plain_file(data_part->volume->getDisk()->writeFile(
part_path + MergeTreeDataPartCompact::DATA_FILE_NAME_WITH_EXTENSION,
settings.max_compress_block_size,
WriteMode::Rewrite,
settings.estimated_size,
settings.aio_threshold))
, plain_hashing(*plain_file)
, marks_file(data_part->volume->getDisk()->writeFile(
part_path + MergeTreeDataPartCompact::DATA_FILE_NAME + marks_file_extension_,
4096,
WriteMode::Rewrite))
, marks(*marks_file)
{
using DataPart = MergeTreeDataPartCompact;
String data_file_name = DataPart::DATA_FILE_NAME;
const auto & storage_columns = metadata_snapshot->getColumns();
for (const auto & column : columns_list)
{
auto codec = storage_columns.getCodecOrDefault(column.name, default_codec);
auto & stream = streams_by_codec[codec->getHash()];
if (!stream)
stream = std::make_shared<CompressedStream>(plain_hashing, codec);
stream = std::make_unique<Stream>(
data_file_name,
data_part->volume->getDisk(),
part_path + data_file_name, DataPart::DATA_FILE_EXTENSION,
part_path + data_file_name, marks_file_extension,
default_codec,
settings.max_compress_block_size,
settings.estimated_size,
settings.aio_threshold);
compressed_streams.push_back(stream);
}
}
void MergeTreeDataPartWriterCompact::write(
@@ -97,15 +107,21 @@ void MergeTreeDataPartWriterCompact::writeBlock(const Block & block)
if (rows_to_write)
data_written = true;
for (const auto & column : columns_list)
auto name_and_type = columns_list.begin();
for (size_t i = 0; i < columns_list.size(); ++i, ++name_and_type)
{
writeIntBinary(stream->plain_hashing.count(), stream->marks);
writeIntBinary(stream->compressed.offset(), stream->marks);
auto & stream = compressed_streams[i];
writeColumnSingleGranule(block.getByName(column.name), current_row, rows_to_write);
/// Offset should be 0, because compressed block is written for every granule.
assert(stream->hashing_buf.offset() == 0);
writeIntBinary(plain_hashing.count(), marks);
writeIntBinary(UInt64(0), marks);
writeColumnSingleGranule(block.getByName(name_and_type->name), stream, current_row, rows_to_write);
/// Write one compressed block per column in granule for more optimal reading.
stream->compressed.next();
stream->hashing_buf.next();
}
++from_mark;
@@ -120,19 +136,22 @@ void MergeTreeDataPartWriterCompact::writeBlock(const Block & block)
index_granularity.appendMark(rows_written);
}
writeIntBinary(rows_to_write, stream->marks);
writeIntBinary(rows_to_write, marks);
}
next_index_offset = 0;
next_mark = from_mark;
}
void MergeTreeDataPartWriterCompact::writeColumnSingleGranule(const ColumnWithTypeAndName & column, size_t from_row, size_t number_of_rows) const
void MergeTreeDataPartWriterCompact::writeColumnSingleGranule(
const ColumnWithTypeAndName & column,
const CompressedStreamPtr & stream,
size_t from_row, size_t number_of_rows)
{
IDataType::SerializeBinaryBulkStatePtr state;
IDataType::SerializeBinaryBulkSettings serialize_settings;
serialize_settings.getter = [this](IDataType::SubstreamPath) -> WriteBuffer * { return &stream->compressed; };
serialize_settings.getter = [&stream](IDataType::SubstreamPath) -> WriteBuffer * { return &stream->hashing_buf; };
serialize_settings.position_independent_encoding = true;
serialize_settings.low_cardinality_max_dictionary_size = 0;
@@ -146,19 +165,25 @@ void MergeTreeDataPartWriterCompact::finishDataSerialization(IMergeTreeDataPart:
if (columns_buffer.size() != 0)
writeBlock(header.cloneWithColumns(columns_buffer.releaseColumns()));
#ifndef NDEBUG
/// Offsets should be 0, because compressed block is written for every granule.
for (const auto & [_, stream] : streams_by_codec)
assert(stream->hashing_buf.offset() == 0);
#endif
if (with_final_mark && data_written)
{
for (size_t i = 0; i < columns_list.size(); ++i)
{
writeIntBinary(stream->plain_hashing.count(), stream->marks);
writeIntBinary(stream->compressed.offset(), stream->marks);
writeIntBinary(plain_hashing.count(), marks);
writeIntBinary(UInt64(0), marks);
}
writeIntBinary(0ULL, stream->marks);
writeIntBinary(UInt64(0), marks);
}
stream->finalize();
stream->addToChecksums(checksums);
stream.reset();
plain_file->next();
marks.next();
addToChecksums(checksums);
}
static void fillIndexGranularityImpl(
@@ -199,6 +224,32 @@ void MergeTreeDataPartWriterCompact::fillIndexGranularity(size_t index_granulari
rows_in_block);
}
void MergeTreeDataPartWriterCompact::addToChecksums(MergeTreeDataPartChecksums & checksums)
{
String data_file_name = MergeTreeDataPartCompact::DATA_FILE_NAME_WITH_EXTENSION;
String marks_file_name = MergeTreeDataPartCompact::DATA_FILE_NAME + marks_file_extension;
size_t uncompressed_size = 0;
CityHash_v1_0_2::uint128 uncompressed_hash{0, 0};
for (const auto & [_, stream] : streams_by_codec)
{
uncompressed_size += stream->hashing_buf.count();
auto stream_hash = stream->hashing_buf.getHash();
uncompressed_hash = CityHash_v1_0_2::CityHash128WithSeed(
reinterpret_cast<char *>(&stream_hash), sizeof(stream_hash), uncompressed_hash);
}
checksums.files[data_file_name].is_compressed = true;
checksums.files[data_file_name].uncompressed_size = uncompressed_size;
checksums.files[data_file_name].uncompressed_hash = uncompressed_hash;
checksums.files[data_file_name].file_size = plain_hashing.count();
checksums.files[data_file_name].file_hash = plain_hashing.getHash();
checksums.files[marks_file_name].file_size = marks.count();
checksums.files[marks_file_name].file_hash = marks.getHash();
}
void MergeTreeDataPartWriterCompact::ColumnsBuffer::add(MutableColumns && columns)
{
if (accumulated_columns.empty())


@@ -26,15 +26,9 @@ protected:
void fillIndexGranularity(size_t index_granularity_for_block, size_t rows_in_block) override;
private:
/// Write single granule of one column (rows between 2 marks)
void writeColumnSingleGranule(
const ColumnWithTypeAndName & column,
size_t from_row,
size_t number_of_rows) const;
void writeBlock(const Block & block);
StreamPtr stream;
void addToChecksums(MergeTreeDataPartChecksums & checksums);
Block header;
@@ -53,6 +47,38 @@ private:
};
ColumnsBuffer columns_buffer;
/// hashing_buf -> compressed_buf -> plain_hashing -> plain_file
std::unique_ptr<WriteBufferFromFileBase> plain_file;
HashingWriteBuffer plain_hashing;
struct CompressedStream
{
CompressedWriteBuffer compressed_buf;
HashingWriteBuffer hashing_buf;
CompressedStream(WriteBuffer & buf, const CompressionCodecPtr & codec)
: compressed_buf(buf, codec), hashing_buf(compressed_buf) {}
};
using CompressedStreamPtr = std::shared_ptr<CompressedStream>;
/// Create compressed stream for every different codec.
std::unordered_map<UInt64, CompressedStreamPtr> streams_by_codec;
/// For better performance save pointer to stream by every column.
std::vector<CompressedStreamPtr> compressed_streams;
/// marks -> marks_file
std::unique_ptr<WriteBufferFromFileBase> marks_file;
HashingWriteBuffer marks;
/// Write single granule of one column (rows between 2 marks)
static void writeColumnSingleGranule(
const ColumnWithTypeAndName & column,
const CompressedStreamPtr & stream,
size_t from_row,
size_t number_of_rows);
};
}
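For orientation, the chain described by the "hashing_buf -> compressed_buf -> plain_hashing -> plain_file" comment above can be sketched standalone. This is not code from the commit; the file name is illustrative and the buffer constructors are used with the same signatures that appear in the hunks above.

#include <Compression/CompressedWriteBuffer.h>
#include <Compression/CompressionFactory.h>
#include <IO/HashingWriteBuffer.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteHelpers.h>

int main()
{
    using namespace DB;

    WriteBufferFromFile plain_file("data.bin");     /// the part's single data file
    HashingWriteBuffer plain_hashing(plain_file);   /// hashes the compressed stream (file_hash in checksums)
    CompressedWriteBuffer compressed_buf(           /// one such buffer exists per distinct codec
        plain_hashing, CompressionCodecFactory::instance().getDefaultCodec());
    HashingWriteBuffer hashing_buf(compressed_buf); /// hashes the uncompressed bytes

    writeIntBinary(UInt64(42), hashing_buf);  /// serialized column data enters at the top of the chain
    hashing_buf.next();                       /// flush: one compressed block per granule
    plain_hashing.next();
    plain_file.next();
}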


@@ -76,17 +76,18 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
if (uncompressed_cache)
{
auto buffer = std::make_unique<CachedCompressedReadBuffer>(
fullPath(data_part->volume->getDisk(), full_data_path),
[this, full_data_path, buffer_size]()
{
return data_part->volume->getDisk()->readFile(
full_data_path,
buffer_size,
0,
settings.min_bytes_to_use_direct_io,
settings.min_bytes_to_use_mmap_io);
},
uncompressed_cache);
fullPath(data_part->volume->getDisk(), full_data_path),
[this, full_data_path, buffer_size]()
{
return data_part->volume->getDisk()->readFile(
full_data_path,
buffer_size,
0,
settings.min_bytes_to_use_direct_io,
settings.min_bytes_to_use_mmap_io);
},
uncompressed_cache,
/* allow_different_codecs = */ true);
if (profile_callback_)
buffer->setProfileCallback(profile_callback_, clock_type_);
@@ -97,9 +98,10 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
else
{
auto buffer =
std::make_unique<CompressedReadBufferFromFile>(
data_part->volume->getDisk()->readFile(
full_data_path, buffer_size, 0, settings.min_bytes_to_use_direct_io, settings.min_bytes_to_use_mmap_io));
std::make_unique<CompressedReadBufferFromFile>(
data_part->volume->getDisk()->readFile(
full_data_path, buffer_size, 0, settings.min_bytes_to_use_direct_io, settings.min_bytes_to_use_mmap_io),
/* allow_different_codecs = */ true);
if (profile_callback_)
buffer->setProfileCallback(profile_callback_, clock_type_);
@@ -248,7 +250,6 @@ void MergeTreeReaderCompact::seekToMark(size_t row_index, size_t column_index)
}
}
bool MergeTreeReaderCompact::isContinuousReading(size_t mark, size_t column_position)
{
if (!last_read_granule)


@@ -2,6 +2,7 @@
#include <Core/NamesAndTypes.h>
#include <Storages/MergeTree/IMergeTreeReader.h>
#include <IO/ReadBufferFromFileBase.h>
namespace DB


@@ -89,7 +89,7 @@ IMergeTreeDataPart::Checksums checkDataPart(
CompressedReadBuffer uncompressing_buf(compressed_hashing_buf);
HashingReadBuffer uncompressed_hashing_buf(uncompressing_buf);
uncompressed_hashing_buf.tryIgnore(std::numeric_limits<size_t>::max());
uncompressed_hashing_buf.ignoreAll();
return IMergeTreeDataPart::Checksums::Checksum
{
compressed_hashing_buf.count(), compressed_hashing_buf.getHash(),
@@ -97,11 +97,24 @@ IMergeTreeDataPart::Checksums checkDataPart(
};
};
/// This function calculates only checksum of file content (compressed or uncompressed).
auto checksum_file = [](const DiskPtr & disk_, const String & file_path)
{
auto file_buf = disk_->readFile(file_path);
HashingReadBuffer hashing_buf(*file_buf);
hashing_buf.ignoreAll();
return IMergeTreeDataPart::Checksums::Checksum{hashing_buf.count(), hashing_buf.getHash()};
};
bool check_uncompressed = true;
/// First calculate checksums for columns data
if (part_type == MergeTreeDataPartType::COMPACT)
{
const auto & file_name = MergeTreeDataPartCompact::DATA_FILE_NAME_WITH_EXTENSION;
checksums_data.files[file_name] = checksum_compressed_file(disk, path + file_name);
checksums_data.files[file_name] = checksum_file(disk, path + file_name);
/// Uncompressed checksums in compact parts are computed in a complex way.
/// We check only checksum of compressed file.
check_uncompressed = false;
}
else if (part_type == MergeTreeDataPartType::WIDE)
{
@@ -142,10 +155,7 @@ IMergeTreeDataPart::Checksums checkDataPart(
if (txt_checksum_it == checksum_files_txt.end() || txt_checksum_it->second.uncompressed_size == 0)
{
/// The file is not compressed.
auto file_buf = disk->readFile(it->path());
HashingReadBuffer hashing_buf(*file_buf);
hashing_buf.tryIgnore(std::numeric_limits<size_t>::max());
checksums_data.files[file_name] = IMergeTreeDataPart::Checksums::Checksum(hashing_buf.count(), hashing_buf.getHash());
checksums_data.files[file_name] = checksum_file(disk, it->path());
}
else /// If we have both compressed and uncompressed in txt, than calculate them
{
@@ -158,7 +168,7 @@ IMergeTreeDataPart::Checksums checkDataPart(
return {};
if (require_checksums || !checksums_txt.files.empty())
checksums_txt.checkEqual(checksums_data, true);
checksums_txt.checkEqual(checksums_data, check_uncompressed);
return checksums_data;
}


@@ -0,0 +1,9 @@
12000 11890
499500 499500 999
499500 499500 999
11965 11890
499500 499500 999
499500 499500 999
5858 11890
499500 499500 999
499500 499500 999


@@ -0,0 +1,52 @@
DROP TABLE IF EXISTS codecs;
CREATE TABLE codecs (id UInt32, val UInt32, s String)
ENGINE = MergeTree ORDER BY id
SETTINGS min_rows_for_wide_part = 10000;
INSERT INTO codecs SELECT number, number, toString(number) FROM numbers(1000);
SELECT sum(data_compressed_bytes), sum(data_uncompressed_bytes)
FROM system.parts
WHERE table = 'codecs' AND database = currentDatabase();
SELECT sum(id), sum(val), max(s) FROM codecs;
DETACH TABLE codecs;
ATTACH table codecs;
SELECT sum(id), sum(val), max(s) FROM codecs;
DROP TABLE codecs;
CREATE TABLE codecs (id UInt32 CODEC(NONE), val UInt32 CODEC(NONE), s String CODEC(NONE))
ENGINE = MergeTree ORDER BY id
SETTINGS min_rows_for_wide_part = 10000;
INSERT INTO codecs SELECT number, number, toString(number) FROM numbers(1000);
SELECT sum(data_compressed_bytes), sum(data_uncompressed_bytes)
FROM system.parts
WHERE table = 'codecs' AND database = currentDatabase();
SELECT sum(id), sum(val), max(s) FROM codecs;
DETACH TABLE codecs;
ATTACH table codecs;
SELECT sum(id), sum(val), max(s) FROM codecs;
DROP TABLE codecs;
CREATE TABLE codecs (id UInt32, val UInt32 CODEC(Delta, ZSTD), s String CODEC(ZSTD))
ENGINE = MergeTree ORDER BY id
SETTINGS min_rows_for_wide_part = 10000;
INSERT INTO codecs SELECT number, number, toString(number) FROM numbers(1000);
SELECT sum(data_compressed_bytes), sum(data_uncompressed_bytes)
FROM system.parts
WHERE table = 'codecs' AND database = currentDatabase();
SELECT sum(id), sum(val), max(s) FROM codecs;
DETACH TABLE codecs;
ATTACH table codecs;
SELECT sum(id), sum(val), max(s) FROM codecs;
DROP TABLE codecs;


@@ -0,0 +1,2 @@
all_1_1_0 1
all_1_1_0 1


@@ -0,0 +1,15 @@
SET check_query_single_value_result = 0;
DROP TABLE IF EXISTS check_codec;
CREATE TABLE check_codec(a Int, b Int CODEC(Delta, ZSTD)) ENGINE = MergeTree ORDER BY a SETTINGS min_bytes_for_wide_part = 0;
INSERT INTO check_codec SELECT number, number * 2 FROM numbers(1000);
CHECK TABLE check_codec;
DROP TABLE check_codec;
CREATE TABLE check_codec(a Int, b Int CODEC(Delta, ZSTD)) ENGINE = MergeTree ORDER BY a SETTINGS min_bytes_for_wide_part = '10M';
INSERT INTO check_codec SELECT number, number * 2 FROM numbers(1000);
CHECK TABLE check_codec;
DROP TABLE check_codec;