added MergeTreeRangeReader

Nikolai Kochetov 2017-06-14 13:50:22 +03:00 committed by alexey-milovidov
parent b818a116ad
commit c6d5ef6d30
8 changed files with 115 additions and 28 deletions

CachedCompressedReadBuffer.cpp

@@ -103,4 +103,11 @@ void CachedCompressedReadBuffer::seek(size_t offset_in_compressed_file, size_t o
     }
 }
 
+void CachedCompressedReadBuffer::position(size_t & offset_in_compressed_file, size_t & offset_in_decompressed_block) const
+{
+    offset_in_compressed_file = file_pos - owned_cell->compressed_size;
+    offset_in_decompressed_block = pos - working_buffer.begin();
+}
+
 }

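The new position() method is the inverse of seek(): it reports the buffer's current location in the same (offset in compressed file, offset in decompressed block) coordinates that seek() accepts. A minimal save/restore sketch, assuming a CachedCompressedReadBuffer named buf (hypothetical caller code, not part of this commit):

    size_t offset_in_compressed_file = 0;
    size_t offset_in_decompressed_block = 0;

    buf.position(offset_in_compressed_file, offset_in_decompressed_block);  // remember where we are
    // ... other code reads from buf ...
    buf.seek(offset_in_compressed_file, offset_in_decompressed_block);      // return to the saved spot

This pairing presumably exists so a suspended read can later resume exactly where it stopped, which is the access pattern MergeTreeRangeReader introduces below.
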
CachedCompressedReadBuffer.h

@@ -47,6 +47,7 @@ public:
     void seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block);
+    void position(size_t & offset_in_compressed_file, size_t & offset_in_decompressed_block) const;
 
     void setProfileCallback(const ReadBufferFromFileBase::ProfileCallback & profile_callback_, clockid_t clock_type_ = CLOCK_MONOTONIC_COARSE)
     {

MergeTreeBaseBlockInputStream.cpp

@@ -236,31 +236,34 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
     }
     else
     {
-        size_t space_left = std::max(1LU, max_block_size_marks);
+        size_t space_left = std::max(1LU, max_block_size_rows);
 
         while (!task->mark_ranges.empty() && space_left && !isCancelled())
         {
-            auto & range = task->mark_ranges.back();
-
-            size_t marks_to_read = std::min(range.end - range.begin, space_left);
-            if (task->size_predictor)
+            if (!task->current_range_reader)
             {
-                size_t recommended_marks = task->size_predictor->estimateNumMarks(preferred_block_size_bytes, storage.index_granularity);
-                if (res && recommended_marks < 1)
-                    break;
-                marks_to_read = std::min(marks_to_read, std::max(1LU, recommended_marks));
+                auto & range = task->mark_ranges.back();
+                task->current_range_reader = reader->readRange(range.begin, range.end);
+                task->mark_ranges.pop_back();
             }
 
-            reader->readRange(range.begin, range.begin + marks_to_read, res);
+            size_t rows_to_read = max_block_size_rows;
+            // size_t marks_to_read = std::min(range.end - range.begin, space_left);
+            if (task->size_predictor)
+            {
+                size_t recommended_rows = task->size_predictor->estimateNumRows(preferred_block_size_bytes);
+                if (res && recommended_rows < 1)
+                    break;
+                rows_to_read = std::min(rows_to_read, std::max(1LU, recommended_rows));
+            }
+
+            if (!task->current_range_reader->read(res, rows_to_read))
+                task->current_range_reader = std::experimental::nullopt;
 
             if (task->size_predictor)
                 task->size_predictor->update(res);
 
-            space_left -= marks_to_read;
-            range.begin += marks_to_read;
-            if (range.begin == range.end)
-                task->mark_ranges.pop_back();
+            space_left -= rows_to_read;
         }
 
         /// In the case of isCancelled.

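The loop now budgets in rows instead of marks: a MergeTreeRangeReader is created lazily for the range at the back of the queue, persists across iterations in task->current_range_reader, and is reset to nullopt once read() reports the range exhausted. A self-contained model of that control flow, using the same std::experimental::optional the commit uses, with a stand-in FakeRangeReader in place of the real reader (all names hypothetical; a sketch, not the actual stream classes):

    #include <experimental/optional>
    #include <algorithm>
    #include <iostream>
    #include <vector>

    // Stand-in for MergeTreeRangeReader: hands out rows until its range is empty.
    struct FakeRangeReader
    {
        size_t rows_left;

        bool read(size_t & res_rows, size_t max_rows)
        {
            size_t n = std::min(rows_left, max_rows);
            res_rows += n;
            rows_left -= n;
            return rows_left != 0;  // false once the range is exhausted
        }
    };

    int main()
    {
        std::vector<size_t> mark_ranges{10000};  // one range of 10000 rows
        std::experimental::optional<FakeRangeReader> current_range_reader;
        size_t res_rows = 0;
        size_t space_left = 8192;                // max_block_size_rows

        while ((!mark_ranges.empty() || current_range_reader) && space_left)
        {
            if (!current_range_reader)           // lazily open the next range
            {
                current_range_reader = FakeRangeReader{mark_ranges.back()};
                mark_ranges.pop_back();
            }

            size_t rows_to_read = std::min(size_t(8192), space_left);
            if (!current_range_reader->read(res_rows, rows_to_read))
                current_range_reader = std::experimental::nullopt;

            space_left -= rows_to_read;
        }

        std::cout << res_rows << '\n';  // 8192: the block is capped at max_block_size_rows
    }

As in the real loop, a half-finished range survives in the optional and is resumed by a later call rather than being re-read from the start.
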
MergeTreeBlockReadUtils.h

@@ -1,6 +1,7 @@
 #pragma once
 #include <Core/NamesAndTypes.h>
 #include <Storages/MergeTree/RangesInDataPart.h>
+#include <Storages/MergeTree/MergeTreeRangeReader.h>
 
 namespace DB
 {
@@ -45,6 +46,8 @@ struct MergeTreeReadTask
     const bool should_reorder;
     /// Used to satisfy preferred_block_size_bytes limitation
     MergeTreeBlockSizePredictorPtr size_predictor;
+    /// Used to save the current range processing status
+    std::experimental::optional<MergeTreeRangeReader> current_range_reader;
 
     MergeTreeReadTask(
         const MergeTreeData::DataPartPtr & data_part, const MarkRanges & mark_ranges, const std::size_t part_index_in_query,

MergeTreeRangeReader.cpp (new file)

@@ -0,0 +1,31 @@
+#include <Storages/MergeTree/MergeTreeReader.h>
+
+namespace DB
+{
+
+MergeTreeRangeReader::MergeTreeRangeReader(
+    MergeTreeReader & merge_tree_reader, size_t from_mark, size_t to_mark, size_t index_granularity)
+    : merge_tree_reader(merge_tree_reader), current_mark(from_mark), last_mark(to_mark)
+    , read_rows_after_current_mark(0), index_granularity(index_granularity), is_reading_started(false)
+{
+}
+
+bool MergeTreeRangeReader::read(Block & res, size_t max_rows_to_read)
+{
+    size_t rows_to_read = (last_mark - current_mark) * index_granularity - read_rows_after_current_mark;
+    rows_to_read = std::min(rows_to_read, max_rows_to_read);
+    if (rows_to_read == 0)
+        return false;
+
+    merge_tree_reader.get().readRange(current_mark, !is_reading_started, rows_to_read, res);
+    is_reading_started = true;
+
+    read_rows_after_current_mark += rows_to_read;
+    size_t read_parts = read_rows_after_current_mark / index_granularity;
+    current_mark += read_parts;
+    read_rows_after_current_mark -= index_granularity * read_parts;
+
+    return current_mark != last_mark;
+}
+
+}

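read() keeps its position as a pair (current_mark, rows already read past that mark) and converts between rows and marks through index_granularity. A self-contained model of just that cursor arithmetic, with a worked example (RangeCursor is a hypothetical name; the real class additionally performs the column reads via MergeTreeReader):

    #include <algorithm>
    #include <cstddef>
    #include <iostream>

    // Models MergeTreeRangeReader's position bookkeeping, nothing else.
    struct RangeCursor
    {
        size_t current_mark;
        size_t last_mark;
        size_t rows_after_mark;  // rows already consumed past current_mark
        size_t granularity;      // rows per mark (storage.index_granularity)

        // Consume up to max_rows; returns false when the range is exhausted.
        bool advance(size_t max_rows)
        {
            size_t rows_left = (last_mark - current_mark) * granularity - rows_after_mark;
            size_t rows_read = std::min(rows_left, max_rows);
            rows_after_mark += rows_read;
            current_mark += rows_after_mark / granularity;  // whole marks consumed
            rows_after_mark %= granularity;                 // remainder within the new mark
            return current_mark != last_mark;
        }
    };

    int main()
    {
        RangeCursor cursor{10, 20, 0, 8192};  // marks [10, 20), 8192 rows per mark
        cursor.advance(10000);
        // 10000 rows = 1 whole mark + 1808 rows, so the cursor sits 1808 rows into mark 11.
        std::cout << cursor.current_mark << ' ' << cursor.rows_after_mark << '\n';  // prints: 11 1808
    }
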
MergeTreeRangeReader.h (new file)

@@ -0,0 +1,30 @@
+#pragma once
+
+#include <Core/Block.h>
+
+namespace DB
+{
+
+class MergeTreeReader;
+// void MergeTreeReader::readRange(size_t from_mark, bool is_first_mark_in_range, size_t max_rows_to_read, Block & res);
+class MergeTreeRangeReader
+{
+public:
+    bool read(Block & res, size_t max_rows_to_read);
+
+    // MergeTreeRangeReader & operator=(MergeTreeRangeReader && other) = default;
+
+private:
+    MergeTreeRangeReader(MergeTreeReader & merge_tree_reader, size_t from_mark, size_t to_mark, size_t index_granularity);
+
+    std::reference_wrapper<MergeTreeReader> merge_tree_reader;
+    size_t current_mark;
+    size_t last_mark;
+    size_t read_rows_after_current_mark;
+    size_t index_granularity;
+    bool is_reading_started;
+
+    friend class MergeTreeReader;
+};
+
+}

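The constructor is private and MergeTreeReader is a friend, so the only way to obtain an instance is MergeTreeReader::readRange(from_mark, to_mark), added in the next file. A usage sketch, with the variable names assumed:

    MergeTreeRangeReader range_reader = reader.readRange(range.begin, range.end);

    Block res;
    /// Each call appends up to the requested number of rows to res and
    /// returns false once the whole mark range has been read.
    while (range_reader.read(res, max_block_size_rows))
    {
    }

Holding the reader in std::experimental::optional<MergeTreeRangeReader>, as MergeTreeReadTask does above, sidesteps the inaccessible constructor: the slot can sit empty and be assigned later.
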
MergeTreeReader.cpp

@@ -73,11 +73,16 @@ const MergeTreeReader::ValueSizeMap & MergeTreeReader::getAvgValueSizeHints() co
 }
 
-void MergeTreeReader::readRange(size_t from_mark, size_t to_mark, Block & res)
+MergeTreeRangeReader MergeTreeReader::readRange(size_t from_mark, size_t to_mark)
+{
+    return MergeTreeRangeReader(*this, from_mark, to_mark, storage.index_granularity);
+}
+
+void MergeTreeReader::readRange(size_t from_mark, bool is_first_mark_in_range, size_t max_rows_to_read, Block & res)
 {
     try
     {
-        size_t max_rows_to_read = (to_mark - from_mark) * storage.index_granularity;
-
         /// Pointers to offset columns that are common to the nested data structure columns.
         /// If append is true, then the value will be equal to nullptr and will be used only to
@@ -137,7 +142,7 @@ void MergeTreeReader::readRange(size_t from_mark, size_t to_mark, Block & res)
         try
         {
-            readData(column.name, *column.type, *column.column, from_mark, max_rows_to_read, 0, read_offsets);
+            readData(column.name, *column.type, *column.column, from_mark, is_first_mark_in_range, max_rows_to_read, 0, read_offsets);
         }
         catch (Exception & e)
         {
@@ -152,7 +157,7 @@ void MergeTreeReader::readRange(size_t from_mark, size_t to_mark, Block & res)
 
         /// NOTE: positions for all streams must be kept in sync. In particular, even if for some streams there are no rows to be read,
         /// you must ensure that no seeks are skipped and at this point they all point to to_mark.
-        cur_mark_idx = to_mark;
+        // cur_mark_idx = to_mark;
     }
     catch (Exception & e)
     {
@@ -160,7 +165,7 @@ void MergeTreeReader::readRange(size_t from_mark, size_t to_mark, Block & res)
         storage.reportBrokenPart(data_part->name);
 
         /// Better diagnostics.
-        e.addMessage("(while reading from part " + path + " from mark " + toString(from_mark) + " to " + toString(to_mark) + ")");
+        e.addMessage("(while reading from part " + path + " from mark " + toString(from_mark) + " with max_rows_to_read = " + toString(max_rows_to_read) + ")");
         throw;
     }
     catch (...)
@@ -434,7 +439,7 @@ void MergeTreeReader::addStream(const String & name, const IDataType & type, con
 
 void MergeTreeReader::readData(
     const String & name, const IDataType & type, IColumn & column,
-    size_t from_mark, size_t max_rows_to_read,
+    size_t from_mark, bool is_first_mark_in_range, size_t max_rows_to_read,
     size_t level, bool read_offsets)
 {
     if (type.isNullable())
@@ -449,7 +454,7 @@ void MergeTreeReader::readData(
         std::string filename = name + NULL_MAP_EXTENSION;
         Stream & stream = *(streams.at(filename));
 
-        if (from_mark != cur_mark_idx)
+        if (is_first_mark_in_range)
             stream.seekToMark(from_mark);
 
         IColumn & col8 = nullable_col.getNullMapConcreteColumn();
         DataTypeUInt8{}.deserializeBinaryBulk(col8, *stream.data_buffer, max_rows_to_read, 0);
@@ -463,7 +468,7 @@ void MergeTreeReader::readData(
         if (read_offsets)
         {
             Stream & stream = *streams[DataTypeNested::extractNestedTableName(name) + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level)];
 
-            if (from_mark != cur_mark_idx)
+            if (is_first_mark_in_range)
                 stream.seekToMark(from_mark);
 
             type_arr->deserializeOffsets(
                 column,
@@ -509,7 +514,7 @@ void MergeTreeReader::readData(
             return;
 
         double & avg_value_size_hint = avg_value_size_hints[name];
 
-        if (from_mark != cur_mark_idx)
+        if (is_first_mark_in_range)
            stream.seekToMark(from_mark);
 
         type.deserializeBinaryBulk(column, *stream.data_buffer, max_rows_to_read, avg_value_size_hint);

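The recurring change across readData() is the seek policy. Previously MergeTreeReader tracked stream positions itself in cur_mark_idx and seeked whenever from_mark differed from it; now the caller states the fact explicitly, and every stream seeks only on the first read of a range:

    if (is_first_mark_in_range)
        stream.seekToMark(from_mark);

Subsequent read() calls continue sequentially from wherever the previous call stopped, which can be in the middle of a mark, a position seekToMark() could not express. That is why cur_mark_idx is commented out rather than updated.
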
MergeTreeReader.h

@@ -3,6 +3,7 @@
 #include <Storages/MarkCache.h>
 #include <Storages/MergeTree/MarkRange.h>
 #include <Storages/MergeTree/MergeTreeData.h>
+#include <Storages/MergeTree/MergeTreeRangeReader.h>
 #include <Core/NamesAndTypes.h>
@@ -40,7 +41,9 @@ public:
     /// If columns are not present in the block, adds them. If they are present, appends the values that have been read.
     /// Does not add columns if the files for them are not present (to add them, call fillMissingColumns).
     /// The block should contain either no columns from the columns field, or all columns for which files are present.
-    void readRange(size_t from_mark, size_t to_mark, Block & res);
+    void readRange(size_t from_mark, size_t to_mark, Block & res)
+    { return readRange(from_mark, true, (to_mark - from_mark) * storage.index_granularity, res); }
+    MergeTreeRangeReader readRange(size_t from_mark, size_t to_mark);
 
     /// Add columns from ordered_names that are not present in the block.
     /// Missing columns are added in the order specified by ordered_names.
@@ -102,7 +105,7 @@ private:
     MergeTreeData::DataPartPtr data_part;
 
     FileStreams streams;
 
-    size_t cur_mark_idx = 0; /// Mark index corresponding to the current position for all streams.
+    // size_t cur_mark_idx = 0; /// Mark index corresponding to the current position for all streams.
 
     /// Columns that are read.
     NamesAndTypesList columns;
@@ -123,10 +126,14 @@ private:
     void readData(
         const String & name, const IDataType & type, IColumn & column,
-        size_t from_mark, size_t max_rows_to_read,
+        size_t from_mark, bool is_first_mark_in_range, size_t max_rows_to_read,
         size_t level = 0, bool read_offsets = true);
 
     void fillMissingColumnsImpl(Block & res, const Names & ordered_names, bool always_reorder);
 
+    void readRange(size_t from_mark, bool is_first_mark_in_range, size_t max_rows_to_read, Block & res);
+
+    friend class MergeTreeRangeReader;
 };
 
 }
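
The old three-argument readRange() survives as an inline shim, so existing call sites compile unchanged while the underlying unit switches from marks to rows. For instance, assuming the typical index_granularity of 8192, a call covering marks [5, 8) now forwards as (illustrative expansion):

    reader.readRange(5, 8, res);
    // is equivalent to
    reader.readRange(5, /* is_first_mark_in_range = */ true,
                     /* max_rows_to_read = */ (8 - 5) * 8192, res);  // 24576 rows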