improve performance of compact parts

This commit is contained in:
CurtizJ 2020-01-15 19:39:29 +03:00
parent 27750f0cd4
commit b3bd306a5d
2 changed files with 30 additions and 17 deletions

View File

@ -98,12 +98,13 @@ void MergeTreeDataPartWriterCompact::writeBlock(const Block & block)
if (rows_to_write) if (rows_to_write)
data_written = true; data_written = true;
for (const auto & column : columns_list)
{
/// There could already be enough data to compress into the new block. /// There could already be enough data to compress into the new block.
if (stream->compressed.offset() >= settings.min_compress_block_size) if (stream->compressed.offset() >= settings.min_compress_block_size)
stream->compressed.next(); stream->compressed.next();
for (const auto & column : columns_list)
{
size_t old_uncompressed_size = stream->compressed.count(); size_t old_uncompressed_size = stream->compressed.count();
writeIntBinary(stream->plain_hashing.count(), stream->marks); writeIntBinary(stream->plain_hashing.count(), stream->marks);
writeIntBinary(stream->compressed.offset(), stream->marks); writeIntBinary(stream->compressed.offset(), stream->marks);

View File

@ -82,6 +82,19 @@ size_t MergeTreeReaderCompact::readRows(size_t from_mark, bool continue_reading,
size_t read_rows = 0; size_t read_rows = 0;
size_t num_columns = columns.size(); size_t num_columns = columns.size();
MutableColumns mutable_columns(num_columns);
auto column_it = columns.begin();
for (size_t i = 0; i < num_columns; ++i, ++column_it)
{
if (!column_positions[i])
continue;
bool append = res_columns[i] != nullptr;
if (!append)
res_columns[i] = column_it->type->createColumn();
mutable_columns[i] = res_columns[i]->assumeMutable();
}
while (read_rows < max_rows_to_read) while (read_rows < max_rows_to_read)
{ {
size_t rows_to_read = data_part->index_granularity.getMarkRows(from_mark); size_t rows_to_read = data_part->index_granularity.getMarkRows(from_mark);
@ -89,17 +102,11 @@ size_t MergeTreeReaderCompact::readRows(size_t from_mark, bool continue_reading,
auto name_and_type = columns.begin(); auto name_and_type = columns.begin();
for (size_t pos = 0; pos < num_columns; ++pos, ++name_and_type) for (size_t pos = 0; pos < num_columns; ++pos, ++name_and_type)
{ {
auto & [name, type] = *name_and_type; if (!res_columns[pos])
if (!column_positions[pos])
continue; continue;
bool append = res_columns[pos] != nullptr; const auto & [name, type] = *name_and_type;
if (!append) auto & column = mutable_columns[pos];
res_columns[pos] = name_and_type->type->createColumn();
/// To keep offsets shared. TODO Very dangerous. Get rid of this.
MutableColumnPtr column = res_columns[pos]->assumeMutable();
try try
{ {
@ -108,6 +115,7 @@ size_t MergeTreeReaderCompact::readRows(size_t from_mark, bool continue_reading,
readData(*column, *type, from_mark, *column_positions[pos], rows_to_read, read_only_offsets[pos]); readData(*column, *type, from_mark, *column_positions[pos], rows_to_read, read_only_offsets[pos]);
size_t read_rows_in_column = column->size() - column_size_before_reading; size_t read_rows_in_column = column->size() - column_size_before_reading;
if (read_rows_in_column < rows_to_read) if (read_rows_in_column < rows_to_read)
throw Exception("Cannot read all data in MergeTreeReaderCompact. Rows read: " + toString(read_rows_in_column) + throw Exception("Cannot read all data in MergeTreeReaderCompact. Rows read: " + toString(read_rows_in_column) +
". Rows expected: " + toString(rows_to_read) + ".", ErrorCodes::CANNOT_READ_ALL_DATA); ". Rows expected: " + toString(rows_to_read) + ".", ErrorCodes::CANNOT_READ_ALL_DATA);
@ -124,17 +132,21 @@ size_t MergeTreeReaderCompact::readRows(size_t from_mark, bool continue_reading,
e.addMessage("(while reading column " + name + ")"); e.addMessage("(while reading column " + name + ")");
throw; throw;
} }
if (column->size())
res_columns[pos] = std::move(column);
else
res_columns[pos] = nullptr;
} }
++from_mark; ++from_mark;
read_rows += rows_to_read; read_rows += rows_to_read;
} }
for (size_t i = 0; i < num_columns; ++i)
{
auto & column = mutable_columns[i];
if (column && column->size())
res_columns[i] = std::move(column);
else
res_columns[i] = nullptr;
}
next_mark = from_mark; next_mark = from_mark;
return read_rows; return read_rows;