Reduce the number of seeks in MergeTreeReaderCompact

This commit is contained in:
CurtizJ 2019-12-25 20:34:23 +03:00
parent aadb948c09
commit c298616eac
2 changed files with 16 additions and 11 deletions

View File

@ -83,7 +83,7 @@ size_t MergeTreeReaderCompact::readRows(size_t from_mark, bool continue_reading,
try
{
size_t column_size_before_reading = column->size();
readData(name, *type, *column, from_mark, *column_positions[pos], rows_to_read);
readData(*column, *type, from_mark, *column_positions[pos], rows_to_read);
size_t read_rows_in_column = column->size() - column_size_before_reading;
if (read_rows_in_column < rows_to_read)
@ -120,11 +120,11 @@ size_t MergeTreeReaderCompact::readRows(size_t from_mark, bool continue_reading,
void MergeTreeReaderCompact::readData(
const String & /* name */, const IDataType & type, IColumn & column,
IColumn & column, const IDataType & type,
size_t from_mark, size_t column_position, size_t rows_to_read)
{
/// FIXME seek only if needed
seekToMark(from_mark, column_position);
if (!isContinuousReading(from_mark, column_position))
seekToMark(from_mark, column_position);
IDataType::DeserializeBinaryBulkSettings deserialize_settings;
deserialize_settings.getter = [&](IDataType::SubstreamPath) -> ReadBuffer * { return data_buffer; };
@ -134,6 +134,8 @@ void MergeTreeReaderCompact::readData(
IDataType::DeserializeBinaryBulkStatePtr state;
type.deserializeBinaryBulkStatePrefix(deserialize_settings, state);
type.deserializeBinaryBulkWithMultipleStreams(column, rows_to_read, deserialize_settings, state);
last_read_granule.emplace(from_mark, column_position);
}
@ -203,12 +205,13 @@ void MergeTreeReaderCompact::seekToMark(size_t row_index, size_t column_index)
}
void MergeTreeReaderCompact::seekToStart()
/// Tells whether reading (mark, column_position) would directly follow the granule
/// stored in last_read_granule. That is the case when the request is either
///  - the next column of the same mark, or
///  - column 0 of the next mark, given that the last read column was the final one
///    (index data_part->columns.size() - 1).
/// Returns false when last_read_granule is empty.
/// readData() calls seekToMark() only when this returns false.
bool MergeTreeReaderCompact::isContinuousReading(size_t mark, size_t column_position)
{
/// NOTE(review): this text is a rendered diff without +/- markers. The four lines
/// below appear to be the body of the removed seekToStart(), not part of this
/// function -- confirm against the real source file.
if (cached_buffer)
cached_buffer->seek(0, 0);
if (non_cached_buffer)
non_cached_buffer->seek(0, 0);
/// Nothing recorded yet, so the caller has to seek explicitly.
if (!last_read_granule)
return false;
const auto & [last_mark, last_column] = *last_read_granule;
/// NOTE(review): assuming columns.size() is unsigned, an empty column list would make
/// `size() - 1` wrap around -- presumably never empty here; verify.
return (mark == last_mark && column_position == last_column + 1)
|| (mark == last_mark + 1 && column_position == 0 && last_column == data_part->columns.size() - 1);
}
}

View File

@ -29,6 +29,8 @@ public:
bool canReadIncompleteGranules() const override { return false; }
private:
/// Returns true if (mark, column_position) directly follows last_read_granule, in which case readData() skips seekToMark().
bool isContinuousReading(size_t mark, size_t column_position);
ReadBuffer * data_buffer;
std::unique_ptr<CachedCompressedReadBuffer> cached_buffer;
std::unique_ptr<CompressedReadBufferFromFile> non_cached_buffer;
@ -38,12 +40,12 @@ private:
std::vector<std::optional<size_t>> column_positions;
size_t next_mark = 0;
/// (mark, column position) recorded by readData() after its latest read; consulted by isContinuousReading().
std::optional<std::pair<size_t, size_t>> last_read_granule;
void initMarksLoader();
void seekToStart();
void seekToMark(size_t row_index, size_t column_index);
void readData(const String & name, const IDataType & type, IColumn & column,
void readData(IColumn & column, const IDataType & type,
size_t from_mark, size_t column_position, size_t rows_to_read);
/// Columns that are read.