Adjust MergeTreeIndexReader

This commit is contained in:
kssenii 2021-10-19 10:51:18 +03:00
parent 49106f407f
commit df5dc4e991
3 changed files with 17 additions and 4 deletions

View File

@ -54,6 +54,17 @@ MergeTreeIndexReader::MergeTreeIndexReader(
std::move(settings));
version = index_format.version;
auto current_task_last_mark_range = std::max_element(all_mark_ranges_.begin(), all_mark_ranges_.end(),
[&](const MarkRange & range1, const MarkRange & range2)
{
return range1.end < range2.end;
});
size_t current_task_last_mark = 0;
if (current_task_last_mark_range != all_mark_ranges_.end())
current_task_last_mark = current_task_last_mark_range->end;
stream->adjustForRange(0, current_task_last_mark);
stream->seekToStart();
}

View File

@ -122,7 +122,7 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
}
size_t MergeTreeReaderCompact::readRows(
size_t from_mark, size_t /* current_task_last_mark */, bool continue_reading, size_t max_rows_to_read, Columns & res_columns)
size_t from_mark, size_t current_task_last_mark, bool continue_reading, size_t max_rows_to_read, Columns & res_columns)
{
if (continue_reading)
from_mark = next_mark;
@ -158,7 +158,7 @@ size_t MergeTreeReaderCompact::readRows(
auto & column = res_columns[pos];
size_t column_size_before_reading = column->size();
readData(column_from_part, column, from_mark, *column_positions[pos], rows_to_read, read_only_offsets[pos]);
readData(column_from_part, column, from_mark, current_task_last_mark, *column_positions[pos], rows_to_read, read_only_offsets[pos]);
size_t read_rows_in_column = column->size() - column_size_before_reading;
if (read_rows_in_column < rows_to_read)
@ -192,7 +192,7 @@ size_t MergeTreeReaderCompact::readRows(
void MergeTreeReaderCompact::readData(
const NameAndTypePair & name_and_type, ColumnPtr & column,
size_t from_mark, size_t column_position, size_t rows_to_read, bool only_offsets)
size_t from_mark, size_t current_task_last_mark, size_t column_position, size_t rows_to_read, bool only_offsets)
{
const auto & [name, type] = name_and_type;
@ -204,6 +204,8 @@ void MergeTreeReaderCompact::readData(
if (only_offsets && (substream_path.size() != 1 || substream_path[0].type != ISerialization::Substream::ArraySizes))
return nullptr;
/// For asynchronous reading from remote fs.
data_buffer->setReadUntilPosition(marks_loader.getMark(current_task_last_mark).offset_in_compressed_file);
return data_buffer;
};

View File

@ -58,7 +58,7 @@ private:
void seekToMark(size_t row_index, size_t column_index);
void readData(const NameAndTypePair & name_and_type, ColumnPtr & column, size_t from_mark,
size_t column_position, size_t rows_to_read, bool only_offsets);
size_t current_task_last_mark, size_t column_position, size_t rows_to_read, bool only_offsets);
/// Returns maximal value of granule size in compressed file from @mark_ranges.
/// This value is used as size of read buffer.