From b3bd306a5dfd98d00bfef646ae136354ea6cd9d1 Mon Sep 17 00:00:00 2001
From: CurtizJ
Date: Wed, 15 Jan 2020 19:39:29 +0300
Subject: [PATCH] improve performance of compact parts

---
 .../MergeTreeDataPartWriterCompact.cpp       |  7 ++--
 .../MergeTree/MergeTreeReaderCompact.cpp     | 40 ++++++++++++-------
 2 files changed, 30 insertions(+), 17 deletions(-)

diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp
index d157d19baa4..b4d9ec9dd92 100644
--- a/dbms/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp
+++ b/dbms/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp
@@ -98,12 +98,13 @@ void MergeTreeDataPartWriterCompact::writeBlock(const Block & block)
     if (rows_to_write)
         data_written = true;
 
-    /// There could already be enough data to compress into the new block.
-    if (stream->compressed.offset() >= settings.min_compress_block_size)
-        stream->compressed.next();
 
     for (const auto & column : columns_list)
     {
+        /// There could already be enough data to compress into the new block.
+        if (stream->compressed.offset() >= settings.min_compress_block_size)
+            stream->compressed.next();
+
         size_t old_uncompressed_size = stream->compressed.count();
         writeIntBinary(stream->plain_hashing.count(), stream->marks);
         writeIntBinary(stream->compressed.offset(), stream->marks);
diff --git a/dbms/src/Storages/MergeTree/MergeTreeReaderCompact.cpp b/dbms/src/Storages/MergeTree/MergeTreeReaderCompact.cpp
index 973bef49d70..4b3e87f113c 100644
--- a/dbms/src/Storages/MergeTree/MergeTreeReaderCompact.cpp
+++ b/dbms/src/Storages/MergeTree/MergeTreeReaderCompact.cpp
@@ -82,6 +82,19 @@ size_t MergeTreeReaderCompact::readRows(size_t from_mark, bool continue_reading,
     size_t read_rows = 0;
     size_t num_columns = columns.size();
 
+    MutableColumns mutable_columns(num_columns);
+    auto column_it = columns.begin();
+    for (size_t i = 0; i < num_columns; ++i, ++column_it)
+    {
+        if (!column_positions[i])
+            continue;
+
+        bool append = res_columns[i] != nullptr;
+        if (!append)
+            res_columns[i] = column_it->type->createColumn();
+        mutable_columns[i] = res_columns[i]->assumeMutable();
+    }
+
     while (read_rows < max_rows_to_read)
     {
         size_t rows_to_read = data_part->index_granularity.getMarkRows(from_mark);
@@ -89,17 +102,11 @@ size_t MergeTreeReaderCompact::readRows(size_t from_mark, bool continue_reading,
         auto name_and_type = columns.begin();
         for (size_t pos = 0; pos < num_columns; ++pos, ++name_and_type)
         {
-            auto & [name, type] = *name_and_type;
-
-            if (!column_positions[pos])
+            if (!res_columns[pos])
                 continue;
 
-            bool append = res_columns[pos] != nullptr;
-            if (!append)
-                res_columns[pos] = name_and_type->type->createColumn();
-
-            /// To keep offsets shared. TODO Very dangerous. Get rid of this.
-            MutableColumnPtr column = res_columns[pos]->assumeMutable();
+            const auto & [name, type] = *name_and_type;
+            auto & column = mutable_columns[pos];
 
             try
             {
@@ -108,6 +115,7 @@ size_t MergeTreeReaderCompact::readRows(size_t from_mark, bool continue_reading,
                 readData(*column, *type, from_mark, *column_positions[pos], rows_to_read, read_only_offsets[pos]);
 
                 size_t read_rows_in_column = column->size() - column_size_before_reading;
+
                 if (read_rows_in_column < rows_to_read)
                     throw Exception("Cannot read all data in MergeTreeReaderCompact. Rows read: " + toString(read_rows_in_column) + ". "
                         "Rows expected: " + toString(rows_to_read) + ".", ErrorCodes::CANNOT_READ_ALL_DATA);
@@ -124,17 +132,21 @@ size_t MergeTreeReaderCompact::readRows(size_t from_mark, bool continue_reading,
                 e.addMessage("(while reading column " + name + ")");
                 throw;
             }
-
-            if (column->size())
-                res_columns[pos] = std::move(column);
-            else
-                res_columns[pos] = nullptr;
         }
 
         ++from_mark;
         read_rows += rows_to_read;
     }
 
+    for (size_t i = 0; i < num_columns; ++i)
+    {
+        auto & column = mutable_columns[i];
+        if (column && column->size())
+            res_columns[i] = std::move(column);
+        else
+            res_columns[i] = nullptr;
+    }
+
     next_mark = from_mark;
 
     return read_rows;
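The reader-side hunks hoist the per-column setup (createColumn / assumeMutable) out of the per-mark loop and defer moving the results back into res_columns until all marks have been read, so both steps now run once per readRows call instead of once per granule. Below is a minimal sketch of that restructuring; ToyColumn and readRowsSketch are hypothetical stand-ins for illustration only, not the ClickHouse interfaces.

#include <cstddef>
#include <iostream>
#include <memory>
#include <utility>
#include <vector>

/// Toy stand-in for IColumn / MutableColumnPtr.
struct ToyColumn
{
    std::vector<int> data;
    size_t size() const { return data.size(); }
};
using ToyColumnPtr = std::shared_ptr<ToyColumn>;

/// Reads `marks` granules of `rows_per_mark` rows into `res_columns`.
/// Mirrors the patch: columns are created / made mutable once before the
/// per-mark loop, and moved back once after it, instead of repeating both
/// steps for every mark.
size_t readRowsSketch(std::vector<ToyColumnPtr> & res_columns, size_t marks, size_t rows_per_mark)
{
    size_t num_columns = res_columns.size();

    /// Set-up phase, executed once per call (previously once per mark).
    std::vector<ToyColumnPtr> mutable_columns(num_columns);
    for (size_t i = 0; i < num_columns; ++i)
    {
        if (!res_columns[i])
            res_columns[i] = std::make_shared<ToyColumn>();
        mutable_columns[i] = res_columns[i];
    }

    size_t read_rows = 0;
    for (size_t mark = 0; mark < marks; ++mark)
    {
        /// Inner loop only appends rows; no allocation or pointer juggling here.
        for (size_t i = 0; i < num_columns; ++i)
            for (size_t row = 0; row < rows_per_mark; ++row)
                mutable_columns[i]->data.push_back(static_cast<int>(read_rows + row));
        read_rows += rows_per_mark;
    }

    /// Tear-down phase, also executed once: keep only columns that received data.
    for (size_t i = 0; i < num_columns; ++i)
        res_columns[i] = (mutable_columns[i] && mutable_columns[i]->size())
            ? std::move(mutable_columns[i])
            : nullptr;

    return read_rows;
}

int main()
{
    std::vector<ToyColumnPtr> columns(3);
    size_t rows = readRowsSketch(columns, /*marks=*/4, /*rows_per_mark=*/8192);
    std::cout << "read " << rows << " rows into " << columns.size() << " columns\n";
}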