Trying to add same logic for compact parts

alesapin committed 2020-09-21 20:35:09 +03:00
commit e87b961095 · parent fbd6f10ff7

4 changed files with 111 additions and 19 deletions

src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp

@@ -3,6 +3,7 @@
 namespace DB
 {

 MergeTreeDataPartWriterCompact::MergeTreeDataPartWriterCompact(
     const MergeTreeData::DataPartPtr & data_part_,
     const NamesAndTypesList & columns_list_,
@@ -30,14 +31,41 @@ MergeTreeDataPartWriterCompact::MergeTreeDataPartWriterCompact(
 {
     const auto & storage_columns = metadata_snapshot->getColumns();
     for (const auto & column : columns_list)
-    {
-        auto codec = storage_columns.getCodecOrDefault(column.name, default_codec);
-        auto & stream = streams_by_codec[codec->getHash()];
-        if (!stream)
-            stream = std::make_shared<CompressedStream>(plain_hashing, codec);
-
-        compressed_streams.push_back(stream);
-    }
+        addStreams(column.name, *column.type, storage_columns.getCodecDescOrDefault(column.name, default_codec));
 }

+void MergeTreeDataPartWriterCompact::addStreams(const String & name, const IDataType & type, const ASTPtr & effective_codec_desc)
+{
+    IDataType::StreamCallback callback = [&] (const IDataType::SubstreamPath & substream_path, const IDataType & substream_type)
+    {
+        String stream_name = IDataType::getFileNameForStream(name, substream_path);
+
+        /// Shared offsets for Nested type.
+        if (compressed_streams.count(stream_name))
+            return;
+
+        CompressionCodecPtr compression_codec;
+        if (IDataType::isSpecialCompressionAllowed(substream_path))
+        {
+            compression_codec = CompressionCodecFactory::instance().get(effective_codec_desc, &substream_type, default_codec);
+        }
+        else
+        {
+            compression_codec = CompressionCodecFactory::instance().get(effective_codec_desc, nullptr, default_codec, true);
+        }
+
+        if (compression_codec == nullptr)
+            compression_codec = CompressionCodecFactory::instance().getDefaultCodec();
+
+        UInt64 codec_id = compression_codec->getHash();
+        auto & stream = streams_by_codec[codec_id];
+        if (!stream)
+            stream = std::make_shared<CompressedStream>(codec_id, plain_hashing, compression_codec);
+
+        compressed_streams.emplace(stream_name, stream);
+    };
+
+    IDataType::SubstreamPath stream_path;
+    type.enumerateStreams(callback, stream_path);
+}

 void MergeTreeDataPartWriterCompact::write(
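What addStreams does, in isolation: each column is expanded into its substreams (null map, array offsets, the values themselves), a codec is resolved per substream (special codecs only where isSpecialCompressionAllowed permits), and physical streams are deduplicated by codec hash, so every substream that ends up with the same codec shares one CompressedStream while compressed_streams lets each substream name find it again. Below is a minimal, self-contained sketch of that sharing scheme; ToyStream and the hard-coded codec ids are stand-ins for illustration, not ClickHouse types:

#include <cstdint>
#include <iostream>
#include <map>
#include <memory>
#include <string>
#include <vector>

/// Toy stand-in for CompressedStream: only remembers which codec it uses.
struct ToyStream
{
    uint64_t codec_id;
    explicit ToyStream(uint64_t id) : codec_id(id) {}
};
using ToyStreamPtr = std::shared_ptr<ToyStream>;

int main()
{
    /// streams_by_codec: at most one physical stream per distinct codec hash.
    std::map<uint64_t, ToyStreamPtr> streams_by_codec;
    /// compressed_streams: every substream name resolves to a (shared) stream.
    std::map<std::string, ToyStreamPtr> compressed_streams;

    /// Two Nullable columns: the null-map substream only gets a generic codec
    /// (id 1, say LZ4), the data substream gets the special chain (id 2, say Delta+LZ4).
    struct Sub { std::string name; uint64_t codec_id; };
    std::vector<Sub> substreams = {
        {"field0.null", 1}, {"field0", 2},
        {"field1.null", 1}, {"field1", 2},
    };

    for (const auto & s : substreams)
    {
        auto & stream = streams_by_codec[s.codec_id];   /// dedup by codec hash
        if (!stream)
            stream = std::make_shared<ToyStream>(s.codec_id);
        compressed_streams.emplace(s.name, stream);     /// lookup by name later
    }

    /// 4 substreams, but only 2 physical streams: both null maps share one,
    /// both data substreams share the other.
    std::cout << compressed_streams.size() << " substreams, "
              << streams_by_codec.size() << " physical streams\n";
}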
@@ -110,18 +138,27 @@ void MergeTreeDataPartWriterCompact::writeBlock(const Block & block)
     auto name_and_type = columns_list.begin();
     for (size_t i = 0; i < columns_list.size(); ++i, ++name_and_type)
     {
-        auto & stream = compressed_streams[i];
-
-        /// Offset should be 0, because compressed block is written for every granule.
-        assert(stream->hashing_buf.offset() == 0);
+        std::unordered_map<UInt64, CompressedStreamPtr> used_streams;
+
+        auto stream_getter = [&, this](const IDataType::SubstreamPath & substream_path) -> WriteBuffer *
+        {
+            String stream_name = IDataType::getFileNameForStream(name_and_type->name, substream_path);
+
+            auto & result_stream = compressed_streams[stream_name];
+            /// Offset should be 0, because compressed block is written for every granule.
+            if (used_streams.try_emplace(result_stream->codec_id, result_stream).second)
+                assert(result_stream->hashing_buf.offset() == 0);
+
+            return &result_stream->hashing_buf;
+        };

         writeIntBinary(plain_hashing.count(), marks);
         writeIntBinary(UInt64(0), marks);

-        writeColumnSingleGranule(block.getByName(name_and_type->name), stream, current_row, rows_to_write);
+        writeColumnSingleGranule(block.getByName(name_and_type->name), stream_getter, current_row, rows_to_write);

         /// Write one compressed block per column in granule for more optimal reading.
-        stream->hashing_buf.next();
+        for (auto & [_, stream] : used_streams)
+            stream->hashing_buf.next();
     }

     ++from_mark;
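The writeBlock change, in isolation: while a granule is being serialized, the getter records every distinct physical stream it hands out (keyed by codec_id, since streams are shared per codec), and afterwards each recorded stream is flushed exactly once, so a granule still yields one compressed block per codec rather than one per substream. A runnable sketch of that bookkeeping with toy types (names invented for illustration):

#include <cstdint>
#include <iostream>
#include <memory>
#include <unordered_map>
#include <vector>

/// Toy stream: flush() models hashing_buf.next(), which seals the current
/// compressed block.
struct ToyStream
{
    uint64_t codec_id;
    int flushes = 0;
    void flush() { ++flushes; }
};
using ToyStreamPtr = std::shared_ptr<ToyStream>;

int main()
{
    auto generic = std::make_shared<ToyStream>(ToyStream{1});
    auto special = std::make_shared<ToyStream>(ToyStream{2});

    /// Three substreams of one column map onto two physical streams.
    std::vector<ToyStreamPtr> per_substream = {generic, special, generic};

    /// The stream getter records each distinct stream it hands out, exactly
    /// like used_streams.try_emplace(result_stream->codec_id, result_stream).
    std::unordered_map<uint64_t, ToyStreamPtr> used_streams;
    for (const auto & s : per_substream)
        used_streams.try_emplace(s->codec_id, s);

    /// After the granule is serialized, flush each physical stream once:
    /// one compressed block per codec per granule, not per substream.
    for (auto & [_, stream] : used_streams)
        stream->flush();

    std::cout << "streams flushed: " << used_streams.size() << "\n";  /// prints 2
}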
@@ -145,13 +182,14 @@ void MergeTreeDataPartWriterCompact::writeBlock(const Block & block)
 void MergeTreeDataPartWriterCompact::writeColumnSingleGranule(
     const ColumnWithTypeAndName & column,
-    const CompressedStreamPtr & stream,
-    size_t from_row, size_t number_of_rows)
+    IDataType::OutputStreamGetter stream_getter,
+    size_t from_row,
+    size_t number_of_rows)
 {
     IDataType::SerializeBinaryBulkStatePtr state;
     IDataType::SerializeBinaryBulkSettings serialize_settings;

-    serialize_settings.getter = [&stream](IDataType::SubstreamPath) -> WriteBuffer * { return &stream->hashing_buf; };
+    serialize_settings.getter = stream_getter;
     serialize_settings.position_independent_encoding = true;
     serialize_settings.low_cardinality_max_dictionary_size = 0;
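The signature change swaps a single concrete stream for IDataType::OutputStreamGetter: instead of pushing one buffer at the serializer, the serializer pulls the right destination per substream. A toy version of that callback pattern; StreamGetter, serializeGranule, and the substream names here are invented for illustration:

#include <functional>
#include <iostream>
#include <sstream>
#include <string>

/// Toy analogue of IDataType::OutputStreamGetter: the serializer asks for a
/// destination buffer per substream instead of writing everything into one.
using StreamGetter = std::function<std::ostream * (const std::string &)>;

/// A pretend "serialize one granule" that emits the null map and the values
/// into whatever buffers the getter routes them to.
void serializeGranule(const StreamGetter & getter)
{
    if (std::ostream * out = getter("null"))
        *out << "<nullmap>";
    if (std::ostream * out = getter("data"))
        *out << "<values>";
}

int main()
{
    std::ostringstream null_buf, data_buf;

    serializeGranule([&](const std::string & substream) -> std::ostream *
    {
        /// Route by substream name, as the real getter does via
        /// IDataType::getFileNameForStream(...).
        return substream == "null" ? &null_buf : &data_buf;
    });

    std::cout << null_buf.str() << ' ' << data_buf.str() << '\n';
}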

src/Storages/MergeTree/MergeTreeDataPartWriterCompact.h

@@ -30,6 +30,8 @@ private:
     void addToChecksums(MergeTreeDataPartChecksums & checksums);

+    void addStreams(const String & name, const IDataType & type, const ASTPtr & effective_codec_desc);
+
     Block header;

     /** Simplified SquashingTransform. The original one isn't suitable in this case
@@ -54,11 +56,14 @@ private:
     struct CompressedStream
     {
+        UInt64 codec_id;
         CompressedWriteBuffer compressed_buf;
         HashingWriteBuffer hashing_buf;

-        CompressedStream(WriteBuffer & buf, const CompressionCodecPtr & codec)
-            : compressed_buf(buf, codec), hashing_buf(compressed_buf) {}
+        CompressedStream(UInt64 codec_id_, WriteBuffer & buf, const CompressionCodecPtr & codec)
+            : codec_id(codec_id_)
+            , compressed_buf(buf, codec)
+            , hashing_buf(compressed_buf) {}
     };

     using CompressedStreamPtr = std::shared_ptr<CompressedStream>;
@@ -67,7 +72,7 @@ private:
     std::unordered_map<UInt64, CompressedStreamPtr> streams_by_codec;

     /// For better performance save pointer to stream by every column.
-    std::vector<CompressedStreamPtr> compressed_streams;
+    std::unordered_map<String, CompressedStreamPtr> compressed_streams;

     /// marks -> marks_file
     std::unique_ptr<WriteBufferFromFileBase> marks_file;
@@ -76,7 +81,7 @@ private:
     /// Write single granule of one column (rows between 2 marks)
     static void writeColumnSingleGranule(
         const ColumnWithTypeAndName & column,
-        const CompressedStreamPtr & stream,
+        IDataType::OutputStreamGetter stream_getter,
         size_t from_row,
         size_t number_of_rows);
 };
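Two details of the header change are easy to miss: codec_id is stored on the stream precisely so writeBlock can key used_streams by it, and compressed_buf must stay declared before hashing_buf, because hashing_buf is constructed from compressed_buf and C++ initializes members in declaration order, not initializer-list order. A self-contained toy illustration; PlainBuf, CompressBuf, and HashBuf are stand-ins for the real buffer classes:

#include <iostream>
#include <string>

struct PlainBuf { std::string data; };

struct CompressBuf
{
    PlainBuf & out;
    explicit CompressBuf(PlainBuf & o) : out(o) {}
    void write(char c) { out.data += c; }  /// pretend-compress and pass through
};

struct HashBuf
{
    CompressBuf & out;
    unsigned hash = 0;
    explicit HashBuf(CompressBuf & o) : out(o) {}
    void write(char c) { hash = 31 * hash + static_cast<unsigned char>(c); out.write(c); }
};

struct Stream
{
    unsigned codec_id;           /// lets writers dedup streams by codec
    CompressBuf compressed_buf;  /// must be declared before hashing_buf:
    HashBuf hashing_buf;         /// members initialize in declaration order
    Stream(unsigned id, PlainBuf & buf)
        : codec_id(id), compressed_buf(buf), hashing_buf(compressed_buf) {}
};

int main()
{
    PlainBuf plain;
    Stream s(1, plain);
    for (char c : std::string("granule"))
        s.hashing_buf.write(c);  /// hash, then compress, then hit the plain buffer
    std::cout << plain.data << ' ' << s.hashing_buf.hash << '\n';
}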

tests/queries/0_stateless/01504_compression_multiple_streams.reference

@@ -7,3 +7,9 @@ CREATE TABLE default.columns_with_multiple_streams\n(\n `field0` Nullable(Int
 1 1 [[1]] (1,[1])
 2 2 [[2]] (2,[2])
 3 3 [[3]] (3,[3])
+1 1 [[1]] (1,[1])
+1 1 [[1]] (1,[1])
+CREATE TABLE default.columns_with_multiple_streams_compact\n(\n `field0` Nullable(Int64) CODEC(Delta(2), LZ4),\n `field1` Nullable(UInt8) CODEC(Delta(8), LZ4),\n `field2` Array(Array(Int64)) CODEC(Delta(8), LZ4),\n `field3` Tuple(UInt32, Array(UInt64)) CODEC(T64, Default)\n)\nENGINE = MergeTree\nORDER BY tuple()\nSETTINGS min_rows_for_wide_part = 100000, min_bytes_for_wide_part = 100000, index_granularity = 8192
+1 1 [[1]] (1,[1])
+2 2 [[2]] (2,[2])
+CREATE TABLE default.columns_with_multiple_streams_compact\n(\n `field0` Nullable(Int64) CODEC(Delta(2), LZ4),\n `field1` Nullable(UInt8) CODEC(Delta(8), LZ4),\n `field2` Array(Array(Int64)) CODEC(Delta(8), LZ4),\n `field3` Tuple(UInt32, Array(UInt64)) CODEC(Delta(8), Default)\n)\nENGINE = MergeTree\nORDER BY tuple()\nSETTINGS min_rows_for_wide_part = 100000, min_bytes_for_wide_part = 100000, index_granularity = 8192

tests/queries/0_stateless/01504_compression_multiple_streams.sql

@@ -1,5 +1,7 @@
 DROP TABLE IF EXISTS columns_with_multiple_streams;
+SET mutations_sync = 2;
+
 CREATE TABLE columns_with_multiple_streams (
     field0 Nullable(Int64) CODEC(Delta(2), LZ4),
     field1 Nullable(Int64) CODEC(Delta, LZ4),
@@ -33,7 +35,48 @@ SHOW CREATE TABLE columns_with_multiple_streams;
 INSERT INTO columns_with_multiple_streams VALUES(3, 3, [[3]], tuple(3, [3]));

 OPTIMIZE TABLE columns_with_multiple_streams FINAL;

 SELECT * FROM columns_with_multiple_streams ORDER BY field0;

 DROP TABLE IF EXISTS columns_with_multiple_streams;
+
+CREATE TABLE columns_with_multiple_streams_compact (
+    field0 Nullable(Int64) CODEC(Delta(2), LZ4),
+    field1 Nullable(Int64) CODEC(Delta, LZ4),
+    field2 Array(Array(Int64)) CODEC(Delta, LZ4),
+    field3 Tuple(UInt32, Array(UInt64)) CODEC(T64, Default)
+)
+ENGINE = MergeTree
+ORDER BY tuple()
+SETTINGS min_rows_for_wide_part = 100000, min_bytes_for_wide_part = 100000;
+
+INSERT INTO columns_with_multiple_streams_compact VALUES(1, 1, [[1]], tuple(1, [1]));
+
+SELECT * FROM columns_with_multiple_streams_compact;
+
+DETACH TABLE columns_with_multiple_streams_compact;
+ATTACH TABLE columns_with_multiple_streams_compact;
+
+SELECT * FROM columns_with_multiple_streams_compact;
+
+ALTER TABLE columns_with_multiple_streams_compact MODIFY COLUMN field1 Nullable(UInt8);
+
+INSERT INTO columns_with_multiple_streams_compact VALUES(2, 2, [[2]], tuple(2, [2]));
+
+SHOW CREATE TABLE columns_with_multiple_streams_compact;
+
+SELECT * FROM columns_with_multiple_streams_compact ORDER BY field0;
+
+ALTER TABLE columns_with_multiple_streams_compact MODIFY COLUMN field3 CODEC(Delta, Default);
+
+SELECT * FROM columns_with_multiple_streams_compact ORDER BY field0;
+
+SHOW CREATE TABLE columns_with_multiple_streams_compact;
+
+INSERT INTO columns_with_multiple_streams_compact VALUES(3, 3, [[3]], tuple(3, [3]));
+
+SELECT * FROM columns_with_multiple_streams_compact ORDER BY field0;
+
+DROP TABLE IF EXISTS columns_with_multiple_streams_compact;