polymorphic parts (development) fix adjust last granule

This commit is contained in:
CurtizJ 2019-12-02 20:10:22 +03:00
parent a3875a6ca2
commit 511ae82e27
6 changed files with 29 additions and 4 deletions

View File

@ -24,8 +24,8 @@ namespace DB
namespace ErrorCodes
{
extern const int UNKNOWN_AGGREGATE_FUNCTION;
}
extern const int LOGICAL_ERROR;
}
void AggregateFunctionFactory::registerFunction(const String & name, Creator creator, CaseSensitiveness case_sensitiveness)

View File

@ -31,6 +31,8 @@ public:
/// If continue_reading is true, continue reading from last state, otherwise seek to from_mark
virtual size_t readRows(size_t from_mark, bool continue_reading, size_t max_rows_to_read, Block & res) = 0;
virtual bool canReadIncompleteGranules() const = 0;
virtual ~IMergeTreeReader();
const ValueSizeMap & getAvgValueSizeHints() const;

View File

@ -472,6 +472,19 @@ size_t MergeTreeRangeReader::Stream::numPendingRows() const
return rows_between_marks - offset_after_current_mark;
}
size_t MergeTreeRangeReader::Stream::ceilRowsToCompleteGranules(size_t rows_num) const
{
/// FIXME suboptimal
size_t result = 0;
size_t from_mark = current_mark;
while (result < rows_num && from_mark < last_mark)
result += index_granularity->getMarkRows(from_mark++);
return result;
}
bool MergeTreeRangeReader::isCurrentRangeFinished() const
{
return prev_reader ? prev_reader->isCurrentRangeFinished() : stream.isFinished();
@ -595,14 +608,19 @@ MergeTreeRangeReader::ReadResult MergeTreeRangeReader::startReadingChain(size_t
ranges.pop_back();
}
auto rows_to_read = std::min(space_left, stream.numPendingRowsInCurrentGranule());
size_t current_space = space_left;
std::cerr << "(startReadingChain) rows_to_read: " << rows_to_read << "\n";
/// If reader can't read part of granule, we have to increase number of reading rows
/// to read complete granules and exceed max_rows a bit.
if (!merge_tree_reader->canReadIncompleteGranules())
current_space = stream.ceilRowsToCompleteGranules(space_left);
auto rows_to_read = std::min(current_space, stream.numPendingRowsInCurrentGranule());
bool last = rows_to_read == space_left;
result.addRows(stream.read(result.block, rows_to_read, !last));
result.addGranule(rows_to_read);
space_left -= rows_to_read;
space_left = (rows_to_read > space_left ? 0 : space_left - rows_to_read);
}
}

View File

@ -114,6 +114,7 @@ public:
void checkEnoughSpaceInCurrentGranule(size_t num_rows) const;
size_t readRows(Block & block, size_t num_rows);
void toNextMark();
size_t ceilRowsToCompleteGranules(size_t rows_num) const;
};
/// Statistics after next reading step.

View File

@ -26,6 +26,8 @@ public:
/// If continue_reading is true, continue reading from last state, otherwise seek to from_mark
size_t readRows(size_t from_mark, bool continue_reading, size_t max_rows_to_read, Block & res) override;
bool canReadIncompleteGranules() const override { return false; }
private:
ReadBuffer * data_buffer;
std::unique_ptr<CachedCompressedReadBuffer> cached_buffer;

View File

@ -28,6 +28,8 @@ public:
/// If continue_reading is true, continue reading from last state, otherwise seek to from_mark
size_t readRows(size_t from_mark, bool continue_reading, size_t max_rows_to_read, Block & res) override;
bool canReadIncompleteGranules() const override { return true; }
private:
using FileStreams = std::map<std::string, std::unique_ptr<MergeTreeReaderStream>>;