// ClickHouse/dbms/src/Storages/MergeTree/MergeTreeRangeReader.h
#pragma once
#include <Core/Block.h>
2017-06-15 17:01:13 +00:00
#include <common/logger_useful.h>
2018-04-02 12:31:04 +00:00
#include <Interpreters/ExpressionActions.h>
#include <Storages/MergeTree/MarkRange.h>
2017-06-14 10:50:22 +00:00
namespace DB
{
2018-03-05 14:41:43 +00:00
/// Forward declaration of ColumnVector: this header only refers to ColumnUInt8 through pointers,
/// so the alias can be introduced without the full column definition.
template <typename T>
class ColumnVector;
using ColumnUInt8 = ColumnVector<UInt8>;
2017-06-14 10:50:22 +00:00
class MergeTreeReader;
/// Iterator over a MergeTreeReader that allows reading an arbitrary number of rows sequentially between pairs of marks in the same part.
/// Stores the reading state, which may point inside a granule. Can skip the remaining rows of the current granule and continue reading from the next mark.
/// Generally used for reading fewer rows than the index granularity at a time, which decreases cache misses for fat blocks.
class MergeTreeRangeReader
2017-06-14 10:50:22 +00:00
{
public:
2018-11-15 14:06:54 +00:00
MergeTreeRangeReader(MergeTreeReader * merge_tree_reader, MergeTreeRangeReader * prev_reader,
ExpressionActionsPtr alias_actions, ExpressionActionsPtr prewhere_actions,
2018-03-05 14:41:43 +00:00
const String * prewhere_column_name, const Names * ordered_names,
bool always_reorder, bool remove_prewhere_column, bool last_reader_in_chain);
2017-06-15 17:01:13 +00:00
MergeTreeRangeReader() = default;
bool isReadingFinished() const;
size_t numReadRowsInCurrentGranule() const;
size_t numPendingRowsInCurrentGranule() const;
2019-03-28 15:03:49 +00:00
size_t numRowsInCurrentGranule() const;
size_t currentMark() const;
bool isCurrentRangeFinished() const;
bool isInitialized() const { return is_initialized; }
2017-06-15 17:01:13 +00:00
2018-02-13 19:34:15 +00:00
class DelayedStream
{
public:
DelayedStream() = default;
2018-11-14 11:26:44 +00:00
DelayedStream(size_t from_mark, MergeTreeReader * merge_tree_reader);
2018-02-13 19:34:15 +00:00
2018-11-14 11:26:44 +00:00
/// Read @num_rows rows from @from_mark starting from @offset row
2018-02-13 19:34:15 +00:00
/// Returns the number of rows added to block.
/// NOTE: have to return number of rows because block has broken invariant:
/// some columns may have different size (for example, default columns may be zero size).
size_t read(Block & block, size_t from_mark, size_t offset, size_t num_rows);
2018-11-14 11:26:44 +00:00
/// Skip extra rows to current_offset and perform actual reading
2018-02-13 19:34:15 +00:00
size_t finalize(Block & block);
bool isFinished() const { return is_finished; }
private:
size_t current_mark = 0;
2018-11-14 11:26:44 +00:00
/// Offset from current mark in rows
size_t current_offset = 0;
2018-11-14 11:26:44 +00:00
/// Num of rows we have to read
size_t num_delayed_rows = 0;
2018-02-13 19:34:15 +00:00
2018-11-14 11:26:44 +00:00
/// Actual reader of data from disk
MergeTreeReader * merge_tree_reader = nullptr;
const MergeTreeIndexGranularity * index_granularity = nullptr;
bool continue_reading = false;
bool is_finished = true;
2018-02-13 19:34:15 +00:00
2018-11-14 11:26:44 +00:00
/// Current position from the begging of file in rows
2018-02-13 19:34:15 +00:00
size_t position() const;
size_t readRows(Block & block, size_t num_rows);
};
2018-11-14 11:26:44 +00:00
/// Very thin wrapper for DelayedStream
/// Check bounds of read ranges and make steps between marks
2018-02-13 19:34:15 +00:00
class Stream
{
public:
Stream() = default;
2018-11-14 11:26:44 +00:00
Stream(size_t from_mark, size_t to_mark, MergeTreeReader * merge_tree_reader);
2018-02-13 19:34:15 +00:00
2018-10-09 14:30:05 +00:00
/// Returns the number of rows added to block.
2018-02-13 19:34:15 +00:00
size_t read(Block & block, size_t num_rows, bool skip_remaining_rows_in_current_granule);
size_t finalize(Block & block);
void skip(size_t num_rows);
void finish() { current_mark = last_mark; }
bool isFinished() const { return current_mark >= last_mark; }
size_t numReadRowsInCurrentGranule() const { return offset_after_current_mark; }
size_t numPendingRowsInCurrentGranule() const
{
2019-03-25 16:55:48 +00:00
return current_mark_index_granularity - numReadRowsInCurrentGranule();
}
2018-11-14 11:26:44 +00:00
size_t numPendingGranules() const { return last_mark - current_mark; }
size_t numPendingRows() const;
2019-03-28 15:03:49 +00:00
size_t currentMark() const { return current_mark; }
2018-02-13 19:34:15 +00:00
size_t current_mark = 0;
2018-02-13 19:34:15 +00:00
/// Invariant: offset_after_current_mark + skipped_rows_after_offset < index_granularity
size_t offset_after_current_mark = 0;
2018-02-13 19:34:15 +00:00
size_t last_mark = 0;
2018-02-13 19:34:15 +00:00
2018-11-14 11:26:44 +00:00
MergeTreeReader * merge_tree_reader = nullptr;
const MergeTreeIndexGranularity * index_granularity = nullptr;
2019-03-27 15:57:14 +00:00
size_t current_mark_index_granularity = 0;
2018-02-13 19:34:15 +00:00
2019-03-27 16:23:38 +00:00
DelayedStream stream;
2018-02-13 19:34:15 +00:00
void checkNotFinished() const;
void checkEnoughSpaceInCurrentGranule(size_t num_rows) const;
2018-02-13 19:34:15 +00:00
size_t readRows(Block & block, size_t num_rows);
2018-11-14 11:26:44 +00:00
void toNextMark();
2018-02-13 19:34:15 +00:00
};
/// Statistics after next reading step.
class ReadResult
{
public:
using NumRows = std::vector<size_t>;
struct RangeInfo
{
size_t num_granules_read_before_start;
MarkRange range;
};
using RangesInfo = std::vector<RangeInfo>;
const RangesInfo & startedRanges() const { return started_ranges; }
const NumRows & rowsPerGranule() const { return rows_per_granule; }
2018-02-13 19:34:15 +00:00
/// The number of rows were read at LAST iteration in chain. <= num_added_rows + num_filtered_rows.
2018-03-05 14:41:43 +00:00
size_t totalRowsPerGranule() const { return total_rows_per_granule; }
2018-02-13 19:34:15 +00:00
/// The number of rows were added to block as a result of reading chain.
2018-03-05 14:41:43 +00:00
size_t numReadRows() const { return num_read_rows; }
size_t numRowsToSkipInLastGranule() const { return num_rows_to_skip_in_last_granule; }
2018-02-20 14:26:22 +00:00
/// The number of bytes read from disk.
size_t numBytesRead() const { return num_bytes_read; }
/// Filter you need to apply to newly-read columns in order to add them to block.
2018-03-05 14:41:43 +00:00
const ColumnUInt8 * getFilter() const { return filter; }
2018-02-13 19:34:15 +00:00
void addGranule(size_t num_rows);
void adjustLastGranule();
2018-03-05 14:41:43 +00:00
void addRows(size_t rows) { num_read_rows += rows; }
void addRange(const MarkRange & range) { started_ranges.push_back({rows_per_granule.size(), range}); }
2018-02-13 19:34:15 +00:00
/// Set filter or replace old one. Filter must have more zeroes than previous.
2018-03-05 14:41:43 +00:00
void setFilter(const ColumnPtr & new_filter);
2018-02-13 19:34:15 +00:00
/// For each granule calculate the number of filtered rows at the end. Remove them and update filter.
void optimize();
/// Remove all rows from granules.
void clear();
2018-02-20 14:26:22 +00:00
void addNumBytesRead(size_t count) { num_bytes_read += count; }
Block block;
2018-02-13 19:34:15 +00:00
private:
RangesInfo started_ranges;
2018-02-13 19:34:15 +00:00
/// The number of rows read from each granule.
2018-11-14 11:26:44 +00:00
/// Granule here is not number of rows between two marks
/// It's amount of rows per single reading act
NumRows rows_per_granule;
2018-02-13 19:34:15 +00:00
/// Sum(rows_per_granule)
2018-03-05 14:41:43 +00:00
size_t total_rows_per_granule = 0;
/// The number of rows was read at first step. May be zero if no read columns present in part.
2018-02-13 19:34:15 +00:00
size_t num_read_rows = 0;
/// The number of rows was removed from last granule after clear or optimize.
size_t num_rows_to_skip_in_last_granule = 0;
2018-02-20 14:26:22 +00:00
/// Without any filtration.
size_t num_bytes_read = 0;
2018-03-05 14:41:43 +00:00
/// nullptr if prev reader hasn't prewhere_actions. Otherwise filter.size() >= total_rows_per_granule.
ColumnPtr filter_holder;
const ColumnUInt8 * filter = nullptr;
2018-02-13 19:34:15 +00:00
void collapseZeroTails(const IColumn::Filter & filter, IColumn::Filter & new_filter, const NumRows & zero_tails);
size_t countZeroTails(const IColumn::Filter & filter, NumRows & zero_tails) const;
2018-02-13 19:34:15 +00:00
static size_t numZerosInTail(const UInt8 * begin, const UInt8 * end);
};
ReadResult read(size_t max_rows, MarkRanges & ranges);
2017-06-14 10:50:22 +00:00
private:
ReadResult startReadingChain(size_t max_rows, MarkRanges & ranges);
Block continueReadingChain(ReadResult & result);
void executePrewhereActionsAndFilterColumns(ReadResult & result);
2018-03-05 14:41:43 +00:00
void filterBlock(Block & block, const IColumn::Filter & filter) const;
2018-02-13 19:34:15 +00:00
MergeTreeReader * merge_tree_reader = nullptr;
const MergeTreeIndexGranularity * index_granularity = nullptr;
MergeTreeRangeReader * prev_reader = nullptr; /// If not nullptr, read from prev_reader firstly.
2018-02-13 19:34:15 +00:00
const String * prewhere_column_name = nullptr;
const Names * ordered_names = nullptr;
ExpressionActionsPtr alias_actions = nullptr; /// If not nullptr, calculate aliases.
2018-03-05 14:41:43 +00:00
ExpressionActionsPtr prewhere_actions = nullptr; /// If not nullptr, calculate filter.
Stream stream;
2017-06-14 10:50:22 +00:00
bool always_reorder = true;
2018-03-05 14:41:43 +00:00
bool remove_prewhere_column = false;
bool last_reader_in_chain = false;
bool is_initialized = false;
2017-06-14 10:50:22 +00:00
};
}