Less changes

alesapin 2020-09-22 15:16:15 +03:00
parent b57708c3dd
commit 96d06c6ae6
4 changed files with 24 additions and 14 deletions

View File

@@ -59,7 +59,7 @@ void MergeTreeDataPartWriterCompact::addStreams(const String & name, const IData
UInt64 codec_id = compression_codec->getHash();
auto & stream = streams_by_codec[codec_id];
if (!stream)
stream = std::make_shared<CompressedStream>(codec_id, plain_hashing, compression_codec);
stream = std::make_shared<CompressedStream>(plain_hashing, compression_codec);
compressed_streams.emplace(stream_name, stream);
};
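The hunk above relies on streams being deduplicated by codec: every column whose declared codec hashes to the same value is routed into one shared CompressedStream, so a compact part keeps one compressed stream per distinct codec rather than per column. A minimal sketch of that sharing, with a hypothetical Stream struct and plain maps standing in for the real WriteBuffer machinery (assumptions, not the actual ClickHouse code):

    #include <cstdint>
    #include <iostream>
    #include <memory>
    #include <string>
    #include <unordered_map>

    struct Stream { uint64_t codec_hash; };          // stands in for CompressedStream
    using StreamPtr = std::shared_ptr<Stream>;

    int main()
    {
        std::unordered_map<uint64_t, StreamPtr> streams_by_codec;       // one stream per distinct codec
        std::unordered_map<std::string, StreamPtr> compressed_streams;  // substream name -> shared stream

        auto add_stream = [&](const std::string & stream_name, uint64_t codec_hash)
        {
            auto & stream = streams_by_codec[codec_hash];
            if (!stream)                                       // first column with this codec creates the stream
                stream = std::make_shared<Stream>(Stream{codec_hash});
            compressed_streams.emplace(stream_name, stream);   // later columns with the same codec reuse it
        };

        add_stream("field0", /*LZ4*/ 1);
        add_stream("field1", /*LZ4*/ 1);
        add_stream("field2", /*ZSTD*/ 2);

        // Two distinct codecs back three mapped substreams.
        std::cout << streams_by_codec.size() << " streams back "
                  << compressed_streams.size() << " substreams\n";
    }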
@@ -138,27 +138,32 @@ void MergeTreeDataPartWriterCompact::writeBlock(const Block & block)
auto name_and_type = columns_list.begin();
for (size_t i = 0; i < columns_list.size(); ++i, ++name_and_type)
{
std::unordered_map<UInt64, CompressedStreamPtr> used_streams;
CompressedStreamPtr prev_stream;
auto stream_getter = [&, this](const IDataType::SubstreamPath & substream_path) -> WriteBuffer *
{
String stream_name = IDataType::getFileNameForStream(name_and_type->name, substream_path);
auto & result_stream = compressed_streams[stream_name];
/// Offset should be 0, because compressed block is written for every granule.
if (used_streams.try_emplace(result_stream->codec_id, result_stream).second)
auto & result_stream = compressed_streams[stream_name];
/// Write one compressed block per column in granule for more optimal reading.
if (prev_stream && prev_stream != result_stream)
{
/// Offset should be 0, because compressed block is written for every granule.
assert(result_stream->hashing_buf.offset() == 0);
prev_stream->hashing_buf.next();
}
prev_stream = result_stream;
return &result_stream->hashing_buf;
};
writeIntBinary(plain_hashing.count(), marks);
writeIntBinary(UInt64(0), marks);
writeColumnSingleGranule(block.getByName(name_and_type->name), stream_getter, current_row, rows_to_write);
/// Write one compressed block per column in granule for more optimal reading.
for (auto & [_, stream] : used_streams)
stream->hashing_buf.next();
prev_stream->hashing_buf.next();
}
++from_mark;

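The writeBlock() hunk replaces the used_streams map with a single prev_stream pointer: while the substreams of one column are serialized for a granule, the previous stream is flushed only when the getter switches to a different stream, and the last stream used is flushed after the column. The effect is one compressed block per stream per granule. A rough sketch of that bookkeeping, with a stand-in Stream type instead of HashingWriteBuffer (an assumption, not the real classes):

    #include <iostream>
    #include <memory>
    #include <string>
    #include <vector>

    struct Stream
    {
        std::string name;
        int blocks_flushed = 0;
        void next() { ++blocks_flushed; }   // stands in for hashing_buf.next()
    };
    using StreamPtr = std::shared_ptr<Stream>;

    int main()
    {
        auto lz4 = std::make_shared<Stream>();
        lz4->name = "lz4";
        auto zstd = std::make_shared<Stream>();
        zstd->name = "zstd";

        // Substreams of one column in write order, each already mapped to its shared stream.
        std::vector<StreamPtr> substream_order = {lz4, lz4, zstd};

        StreamPtr prev_stream;
        for (const auto & result_stream : substream_order)
        {
            if (prev_stream && prev_stream != result_stream)
                prev_stream->next();        // finish the block of the stream we are leaving
            prev_stream = result_stream;
            // ... the substream's data would be written into result_stream here ...
        }
        prev_stream->next();                // finish the block of the last stream used

        // Each stream flushed exactly once for this granule: "lz4: 1, zstd: 1".
        std::cout << lz4->name << ": " << lz4->blocks_flushed << ", "
                  << zstd->name << ": " << zstd->blocks_flushed << "\n";
    }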
View File

@@ -56,13 +56,11 @@ private:
struct CompressedStream
{
UInt64 codec_id;
CompressedWriteBuffer compressed_buf;
HashingWriteBuffer hashing_buf;
CompressedStream(UInt64 codec_id_, WriteBuffer & buf, const CompressionCodecPtr & codec)
: codec_id(codec_id_)
, compressed_buf(buf, codec)
CompressedStream(WriteBuffer & buf, const CompressionCodecPtr & codec)
: compressed_buf(buf, codec)
, hashing_buf(compressed_buf) {}
};

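With codec_id gone, CompressedStream is reduced to the two chained buffers: column data is written into hashing_buf, which checksums the uncompressed bytes and feeds compressed_buf, which applies the codec and forwards to the shared plain file buffer; the codec hash now lives only as the key of streams_by_codec in the writer. A condensed sketch of that chain with illustrative stand-in classes (not the real CompressedWriteBuffer / HashingWriteBuffer):

    #include <iostream>
    #include <string>

    struct Writer { virtual void write(const std::string &) = 0; virtual ~Writer() = default; };

    struct FileWriter : Writer              // stands in for the shared plain file buffer
    {
        void write(const std::string & s) override { std::cout << "to file: " << s << "\n"; }
    };

    struct CompressedWriter : Writer        // stands in for CompressedWriteBuffer
    {
        Writer & out;
        explicit CompressedWriter(Writer & out_) : out(out_) {}
        void write(const std::string & s) override { out.write("<compressed " + s + ">"); }
    };

    struct HashingWriter : Writer           // stands in for HashingWriteBuffer
    {
        Writer & out;
        explicit HashingWriter(Writer & out_) : out(out_) {}
        void write(const std::string & s) override { /* checksum of s updated here */ out.write(s); }
    };

    struct CompressedStream                 // same shape as the trimmed-down struct above
    {
        CompressedWriter compressed_buf;    // compresses, forwards to the shared file buffer
        HashingWriter hashing_buf;          // checksums uncompressed data, feeds compressed_buf

        explicit CompressedStream(Writer & buf) : compressed_buf(buf), hashing_buf(compressed_buf) {}
    };

    int main()
    {
        FileWriter file;
        CompressedStream stream(file);
        stream.hashing_buf.write("granule data");   // column writers always enter through hashing_buf
    }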
View File

@@ -9,7 +9,12 @@ CREATE TABLE default.columns_with_multiple_streams\n(\n `field0` Nullable(Int
3 3 [[3]] (3,[3])
1 1 [[1]] (1,[1])
1 1 [[1]] (1,[1])
CREATE TABLE default.columns_with_multiple_streams_compact\n(\n `field0` Nullable(Int64) CODEC(Delta(2), LZ4),\n `field1` Nullable(UInt8) CODEC(Delta(8), LZ4),\n `field2` Array(Array(Int64)) CODEC(Delta(8), LZ4),\n `field3` Tuple(UInt32, Array(UInt64)) CODEC(T64, Default)\n)\nENGINE = MergeTree\nORDER BY tuple()\nSETTINGS min_rows_for_wide_part = 100000, min_bytes_for_wide_part = 100000, index_granularity = 8192
CREATE TABLE default.columns_with_multiple_streams_compact\n(\n `field0` Nullable(Int64) CODEC(Delta(2), LZ4),\n `field1` Nullable(UInt8) CODEC(Delta(8), LZ4),\n `field2` Array(Array(Int64)) CODEC(Delta(8), LZ4),\n `field3` Tuple(UInt32, Array(UInt64)) CODEC(Delta(8), Default)\n)\nENGINE = MergeTree\nORDER BY tuple()\nSETTINGS min_rows_for_wide_part = 100000, min_bytes_for_wide_part = 100000, index_granularity = 8192
1 1 [[1]] (1,[1])
2 2 [[2]] (2,[2])
1 1 [[1]] (1,[1])
2 2 [[2]] (2,[2])
CREATE TABLE default.columns_with_multiple_streams_compact\n(\n `field0` Nullable(Int64) CODEC(Delta(2), LZ4),\n `field1` Nullable(UInt8) CODEC(Delta(8), LZ4),\n `field2` Array(Array(Int64)) CODEC(Delta(8), LZ4),\n `field3` Tuple(UInt32, Array(UInt64)) CODEC(Delta(8), Default)\n)\nENGINE = MergeTree\nORDER BY tuple()\nSETTINGS min_rows_for_wide_part = 100000, min_bytes_for_wide_part = 100000, index_granularity = 8192
1 1 [[1]] (1,[1])
2 2 [[2]] (2,[2])
3 3 [[3]] (3,[3])

View File

@@ -41,6 +41,8 @@ SELECT * FROM columns_with_multiple_streams ORDER BY field0;
DROP TABLE IF EXISTS columns_with_multiple_streams;
DROP TABLE IF EXISTS columns_with_multiple_streams_compact;
CREATE TABLE columns_with_multiple_streams_compact (
field0 Nullable(Int64) CODEC(Delta(2), LZ4),
field1 Nullable(Int64) CODEC(Delta, LZ4),