support codecs in compact parts

Anton Popov 2020-07-07 03:15:02 +03:00
parent 8d65deaf85
commit d6434f61dc
4 changed files with 109 additions and 26 deletions


@@ -14,19 +14,23 @@ MergeTreeDataPartWriterCompact::MergeTreeDataPartWriterCompact(
     const MergeTreeIndexGranularity & index_granularity_)
     : IMergeTreeDataPartWriter(
         data_part_, columns_list_, metadata_snapshot_, indices_to_recalc_, marks_file_extension_, default_codec_, settings_, index_granularity_)
+    , plain_file(data_part->volume->getDisk()->writeFile(
+        part_path + MergeTreeDataPartCompact::DATA_FILE_NAME_WITH_EXTENSION,
+        settings.max_compress_block_size,
+        WriteMode::Rewrite,
+        settings.estimated_size,
+        settings.aio_threshold))
+    , plain_hashing(*plain_file)
+    , marks_file(data_part->volume->getDisk()->writeFile(
+        part_path + MergeTreeDataPartCompact::DATA_FILE_NAME + marks_file_extension_,
+        4096,
+        WriteMode::Rewrite))
+    , marks(*marks_file)
 {
-    using DataPart = MergeTreeDataPartCompact;
-    String data_file_name = DataPart::DATA_FILE_NAME;
-
-    stream = std::make_unique<Stream>(
-        data_file_name,
-        data_part->volume->getDisk(),
-        part_path + data_file_name, DataPart::DATA_FILE_EXTENSION,
-        part_path + data_file_name, marks_file_extension,
-        default_codec,
-        settings.max_compress_block_size,
-        settings.estimated_size,
-        settings.aio_threshold);
+    const auto & storage_columns = metadata_snapshot->getColumns();
+    for (const auto & column : columns_list)
+        compressed_streams[column.name] = std::make_unique<CompressedStream>(
+            plain_hashing, storage_columns.getCodecOrDefault(column.name, default_codec));
 }
 
 void MergeTreeDataPartWriterCompact::write(
@@ -98,14 +102,13 @@ void MergeTreeDataPartWriterCompact::writeBlock(const Block & block)
 
         for (const auto & column : columns_list)
         {
-            /// There could already be enough data to compress into the new block.
-            if (stream->compressed.offset() >= settings.min_compress_block_size)
-                stream->compressed.next();
+            auto & stream = compressed_streams[column.name];
 
-            writeIntBinary(stream->plain_hashing.count(), stream->marks);
-            writeIntBinary(stream->compressed.offset(), stream->marks);
+            writeIntBinary(plain_hashing.count(), marks);
+            writeIntBinary(UInt64(0), marks);
 
             writeColumnSingleGranule(block.getByName(column.name), current_row, rows_to_write);
+            stream->hashing_buf.next();
         }
 
         ++from_mark;
@@ -120,7 +123,7 @@ void MergeTreeDataPartWriterCompact::writeBlock(const Block & block)
             index_granularity.appendMark(rows_written);
         }
 
-        writeIntBinary(rows_to_write, stream->marks);
+        writeIntBinary(rows_to_write, marks);
     }
 
     next_index_offset = 0;
@@ -132,7 +135,7 @@ void MergeTreeDataPartWriterCompact::writeColumnSingleGranule(const ColumnWithTy
     IDataType::SerializeBinaryBulkStatePtr state;
     IDataType::SerializeBinaryBulkSettings serialize_settings;
 
-    serialize_settings.getter = [this](IDataType::SubstreamPath) -> WriteBuffer * { return &stream->compressed; };
+    serialize_settings.getter = [this, &column](IDataType::SubstreamPath) -> WriteBuffer * { return &compressed_streams.at(column.name)->hashing_buf; };
     serialize_settings.position_independent_encoding = true;
     serialize_settings.low_cardinality_max_dictionary_size = 0;
@@ -150,15 +153,15 @@ void MergeTreeDataPartWriterCompact::finishDataSerialization(IMergeTreeDataPart:
     {
         for (size_t i = 0; i < columns_list.size(); ++i)
         {
-            writeIntBinary(stream->plain_hashing.count(), stream->marks);
-            writeIntBinary(stream->compressed.offset(), stream->marks);
+            writeIntBinary(plain_hashing.count(), marks);
+            writeIntBinary(UInt64(0), marks);
         }
 
-        writeIntBinary(0ULL, stream->marks);
+        writeIntBinary(UInt64(0), marks);
     }
 
-    stream->finalize();
-    stream->addToChecksums(checksums);
-    stream.reset();
+    plain_file->next();
+    marks.next();
+    addToChecksums(checksums);
 }
 
 static void fillIndexGranularityImpl(
@@ -199,6 +202,33 @@ void MergeTreeDataPartWriterCompact::fillIndexGranularity(size_t index_granulari
         rows_in_block);
 }
 
+void MergeTreeDataPartWriterCompact::addToChecksums(MergeTreeDataPartChecksums & checksums)
+{
+    using uint128 = CityHash_v1_0_2::uint128;
+
+    String data_file_name = MergeTreeDataPartCompact::DATA_FILE_NAME_WITH_EXTENSION;
+    String marks_file_name = MergeTreeDataPartCompact::DATA_FILE_NAME + marks_file_extension;
+
+    checksums.files[data_file_name].is_compressed = true;
+    size_t uncompressed_size = 0;
+    uint128 uncompressed_hash{0, 0};
+
+    for (const auto & [_, stream] : compressed_streams)
+    {
+        uncompressed_size += stream->hashing_buf.count();
+        uncompressed_hash = CityHash_v1_0_2::CityHash128WithSeed(
+            reinterpret_cast<char *>(&uncompressed_hash), sizeof(uncompressed_hash), uncompressed_hash);
+    }
+
+    checksums.files[data_file_name].uncompressed_size = uncompressed_size;
+    checksums.files[data_file_name].uncompressed_hash = uncompressed_hash;
+    checksums.files[data_file_name].file_size = plain_hashing.count();
+    checksums.files[data_file_name].file_hash = plain_hashing.getHash();
+
+    checksums.files[marks_file_name].file_size = marks.count();
+    checksums.files[marks_file_name].file_hash = marks.getHash();
+}
+
 void MergeTreeDataPartWriterCompact::ColumnsBuffer::add(MutableColumns && columns)
 {
     if (accumulated_columns.empty())


@@ -34,7 +34,7 @@ private:
 
     void writeBlock(const Block & block);
 
-    StreamPtr stream;
+    void addToChecksums(MergeTreeDataPartChecksums & checksumns);
 
     Block header;
 
@@ -53,6 +53,25 @@ private:
     };
 
     ColumnsBuffer columns_buffer;
 
+    /// compressed -> compressed_buf -> plain_hashing -> plain_file
+    std::unique_ptr<WriteBufferFromFileBase> plain_file;
+    HashingWriteBuffer plain_hashing;
+
+    struct CompressedStream
+    {
+        CompressedWriteBuffer compressed_buf;
+        HashingWriteBuffer hashing_buf;
+
+        CompressedStream(WriteBuffer & buf, const CompressionCodecPtr & codec)
+            : compressed_buf(buf, codec), hashing_buf(compressed_buf) {}
+    };
+
+    std::unordered_map<String, std::unique_ptr<CompressedStream>> compressed_streams;
+
+    /// marks -> marks_file
+    std::unique_ptr<WriteBufferFromFileBase> marks_file;
+    HashingWriteBuffer marks;
 };
 
 }
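For readers unfamiliar with these classes, here is a minimal standalone sketch of the write chain that the comment above describes: data for one column goes through its own codec-specific CompressedWriteBuffer and lands in the shared, hashed data file. The file name, the ZSTD codec choice and the CompressionCodecFactory call are illustrative assumptions, not part of this commit.

#include <Compression/CompressedWriteBuffer.h>
#include <Compression/CompressionFactory.h>
#include <IO/HashingWriteBuffer.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteHelpers.h>

using namespace DB;

int main()
{
    /// Shared sink: one data file per compact part, hashed for the part checksum.
    WriteBufferFromFile plain_file("data.bin");     /// illustrative path
    HashingWriteBuffer plain_hashing(plain_file);

    /// Per-column chain, as in CompressedStream above:
    /// writes -> hashing_buf -> compressed_buf (column codec) -> plain_hashing -> plain_file.
    auto codec = CompressionCodecFactory::instance().get("ZSTD", std::nullopt);    /// assumed factory call
    CompressedWriteBuffer compressed_buf(plain_hashing, codec);
    HashingWriteBuffer hashing_buf(compressed_buf);

    /// Serialize one value through the chain. hashing_buf.count()/getHash() would give the
    /// uncompressed size and hash, plain_hashing those of the compressed file.
    writeIntBinary(UInt64(42), hashing_buf);
    hashing_buf.next();     /// flush this column's compressed block into the data file
    plain_hashing.next();
    plain_file.next();
    return 0;
}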


@@ -0,0 +1,3 @@
12000 11890
11965 11890
5858 11890


@@ -0,0 +1,31 @@
DROP TABLE IF EXISTS codecs;
CREATE TABLE codecs (id UInt32, val UInt32, s String)
ENGINE = MergeTree ORDER BY id
SETTINGS min_rows_for_wide_part = 10000;
INSERT INTO codecs SELECT number, number, toString(number) FROM numbers(1000);
SELECT sum(data_compressed_bytes), sum(data_uncompressed_bytes)
FROM system.parts
WHERE table = 'codecs' AND database = currentDatabase();
DROP TABLE codecs;
CREATE TABLE codecs (id UInt32 CODEC(NONE), val UInt32 CODEC(NONE), s String CODEC(NONE))
ENGINE = MergeTree ORDER BY id
SETTINGS min_rows_for_wide_part = 10000;
INSERT INTO codecs SELECT number, number, toString(number) FROM numbers(1000);
SELECT sum(data_compressed_bytes), sum(data_uncompressed_bytes)
FROM system.parts
WHERE table = 'codecs' AND database = currentDatabase();
DROP TABLE codecs;
CREATE TABLE codecs (id UInt32, val UInt32 CODEC(Delta, ZSTD), s String CODEC(ZSTD))
ENGINE = MergeTree ORDER BY id
SETTINGS min_rows_for_wide_part = 10000;
INSERT INTO codecs SELECT number, number, toString(number) FROM numbers(1000);
SELECT sum(data_compressed_bytes), sum(data_uncompressed_bytes)
FROM system.parts
WHERE table = 'codecs' AND database = currentDatabase();
DROP TABLE codecs;
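As a side note, one way to double-check which codec each column actually got and that the inserted parts are compact would be queries like the following, run while one of the tables above still exists. They are not part of the test and assume the compression_codec column of system.columns and the part_type/active columns of system.parts are available in this ClickHouse version.

-- Declared (or defaulted) codec per column.
SELECT name, compression_codec
FROM system.columns
WHERE table = 'codecs' AND database = currentDatabase();

-- Part format: expected to be 'Compact' here, since the tables set
-- min_rows_for_wide_part = 10000 and only 1000 rows are inserted.
SELECT name, part_type
FROM system.parts
WHERE table = 'codecs' AND database = currentDatabase() AND active;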