diff --git a/dbms/src/IO/CachedCompressedReadBuffer.cpp b/dbms/src/IO/CachedCompressedReadBuffer.cpp index bbbf82267a5..f65780fa7f6 100644 --- a/dbms/src/IO/CachedCompressedReadBuffer.cpp +++ b/dbms/src/IO/CachedCompressedReadBuffer.cpp @@ -103,11 +103,4 @@ void CachedCompressedReadBuffer::seek(size_t offset_in_compressed_file, size_t o } } - -void CachedCompressedReadBuffer::position(size_t & offset_in_compressed_file, size_t & offset_in_decompressed_block) const -{ - offset_in_compressed_file = file_pos - owned_cell->compressed_size; - offset_in_decompressed_block = pos - working_buffer.begin(); -} - } diff --git a/dbms/src/IO/CachedCompressedReadBuffer.h b/dbms/src/IO/CachedCompressedReadBuffer.h index eb8e43713a8..11c72a569cd 100644 --- a/dbms/src/IO/CachedCompressedReadBuffer.h +++ b/dbms/src/IO/CachedCompressedReadBuffer.h @@ -47,7 +47,6 @@ public: void seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block); - void position(size_t & offset_in_compressed_file, size_t & offset_in_decompressed_block) const; void setProfileCallback(const ReadBufferFromFileBase::ProfileCallback & profile_callback_, clockid_t clock_type_ = CLOCK_MONOTONIC_COARSE) { diff --git a/dbms/src/Storages/MergeTree/MergeTreeBaseBlockInputStream.cpp b/dbms/src/Storages/MergeTree/MergeTreeBaseBlockInputStream.cpp index b29f253baf1..b715d4ab024 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeBaseBlockInputStream.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeBaseBlockInputStream.cpp @@ -70,12 +70,29 @@ Block MergeTreeBaseBlockInputStream::readFromPart() if (task->size_predictor) task->size_predictor->startBlock(); + const auto preferred_block_size_bytes = this->preferred_block_size_bytes; + + // read rows from reader and clean columns + auto skipRows = [& preferred_block_size_bytes](Block & block, MergeTreeRangeReader & reader, MergeTreeReadTask & task, size_t rows) { + size_t recommended_rows = std::max(1, task.size_predictor->estimateNumRows(preferred_block_size_bytes)); + while (rows) + { + size_t rows_to_skip = std::min(rows, recommended_rows); + rows -= rows_to_skip; + reader.read(block, rows_to_skip); + for (const auto i : ext::range(0, block.columns())) + { + auto & col = block.safeGetByPosition(i); + if (task.column_name_set.count(col.name)) + col.column = col.column->cloneEmpty(); + } + } + }; + if (prewhere_actions) { - // LOG_TRACE(log, "readFromPart prewhere"); do { - // LOG_TRACE(log, "readFromPart prewhe do"); /// Let's read the full block of columns needed to calculate the expression in PREWHERE. size_t space_left = std::max(1LU, max_block_size_rows); MarkRanges ranges_to_read; @@ -92,35 +109,36 @@ Block MergeTreeBaseBlockInputStream::readFromPart() space_left = std::min(space_left, std::max(1LU, recommended_rows)); } - // LOG_TRACE(log, "readFromPart prewhere key read " << space_left << "rows" ); - - std::experimental::optional range_reader; + //LOG_TRACE(log, "******************* prewhere"); + std::experimental::optional pre_range_reader; if (task->current_range_reader) { - range_reader = task->current_range_reader->copyForReader(*pre_reader); + pre_range_reader = task->current_range_reader->copyForReader(*pre_reader); + if (task->unread_rows_in_current_granule) + { + // LOG_TRACE(log, "skipping " << task->unread_rows_in_current_granule << " rows"); + pre_range_reader = pre_range_reader->skipRows(task->unread_rows_in_current_granule); + pre_range_reader->disableNextSeek(); + } rows_was_read.push_back(0); } - while ((range_reader || !task->mark_ranges.empty()) && space_left && !isCancelled()) + while ((pre_range_reader || !task->mark_ranges.empty()) && space_left && !isCancelled()) { - if (!range_reader) + if (!pre_range_reader) { auto & range = task->mark_ranges.back(); - range_reader = pre_reader->readRange(range.begin, range.end); + pre_range_reader = pre_reader->readRange(range.begin, range.end); ranges_to_read.push_back(range); rows_was_read.push_back(0); task->mark_ranges.pop_back(); } - size_t rows_to_read = std::min(range_reader->unreadRows(), space_left); - size_t read_rows = range_reader->read(res, rows_to_read); + size_t rows_to_read = std::min(pre_range_reader->unreadRows(), space_left); + size_t read_rows = pre_range_reader->read(res, rows_to_read); rows_was_read.back() += read_rows; - if (range_reader->isReadingFinished()) - { - if (range_reader->unreadRows()) - LOG_TRACE(log, "??????????????? was read " << read_rows << " expected " << rows_to_read); - range_reader = std::experimental::nullopt; - } + if (pre_range_reader->isReadingFinished()) + pre_range_reader = std::experimental::nullopt; space_left -= read_rows; } @@ -128,13 +146,11 @@ Block MergeTreeBaseBlockInputStream::readFromPart() /// In case of isCancelled. if (!res) { - if (!range_reader) + if (!pre_range_reader) task->current_range_reader = std::experimental::nullopt; return res; } - // LOG_TRACE(log, "readFromPart prewhe actions " ); - progressImpl({ res.rows(), res.bytes() }); pre_reader->fillMissingColumns(res, task->ordered_names, task->should_reorder); @@ -161,15 +177,14 @@ Block MergeTreeBaseBlockInputStream::readFromPart() */ if (const auto column_const = typeid_cast(observed_column.get())) { - // LOG_TRACE(log, "readFromPart prewhe column_const " ); if (!column_const->getData()) { - if (range_reader) + if (pre_range_reader) { /// have to read rows from last partly read granula auto & range = ranges_to_read.back(); task->current_range_reader = reader->readRange(range.begin, range.end); - task->current_range_reader->read(res, rows_was_read.back()); + task->unread_rows_in_current_granule = rows_was_read.back(); } else task->current_range_reader = std::experimental::nullopt; @@ -182,6 +197,11 @@ Block MergeTreeBaseBlockInputStream::readFromPart() if (task->current_range_reader) { + if (task->unread_rows_in_current_granule) + { + skipRows(res, *task->current_range_reader, *task, task->unread_rows_in_current_granule); + task->current_range_reader->read(res, task->unread_rows_in_current_granule); + } task->current_range_reader->read(res, rows_was_read[rows_was_read_idx++]); } @@ -191,16 +211,19 @@ Block MergeTreeBaseBlockInputStream::readFromPart() task->current_range_reader->read(res, rows_was_read[rows_was_read_idx++]); } - if (!range_reader) + if (!pre_range_reader) task->current_range_reader = std::experimental::nullopt; + task->unread_rows_in_current_granule = 0; progressImpl({ 0, res.bytes() - pre_bytes }); } else if (const auto column_vec = typeid_cast(observed_column.get())) { - - // LOG_TRACE(log, "readFromPart prewhe column_vec " ); + //LOG_TRACE(log, ">>>>>>>>>>>>>>>>>>>>>>>>>>>>> where"); const auto & pre_filter = column_vec->getData(); + auto & additional_rows_to_read = task->unread_rows_in_current_granule; + if (!task->current_range_reader) + additional_rows_to_read = 0; IColumn::Filter post_filter(pre_filter.size()); /// Let's read the rest of the columns in the required segments and compose our own filter for them. @@ -225,17 +248,17 @@ Block MergeTreeBaseBlockInputStream::readFromPart() while (current_range_rows_read + (pre_filter_pos - pre_filter_begin_pos) < rows_was_read[rows_was_read_idx]) { auto rows_should_be_copied = pre_filter_pos - pre_filter_begin_pos; - auto range_reader_with_skipped_rows = range_reader.skipRows(rows_should_be_copied); - auto unread_rows_in_current_part = range_reader_with_skipped_rows.unreadRowsInCurrentPart(); + auto range_reader_with_skipped_rows = range_reader.skipRows(additional_rows_to_read + rows_should_be_copied); + auto unread_rows_in_current_granule = range_reader_with_skipped_rows.unreadRowsInCurrentGranule(); + + const size_t limit = std::min(pre_filter.size(), pre_filter_pos + unread_rows_in_current_granule); + bool will_read_until_mark = unread_rows_in_current_granule == limit - pre_filter_pos; - const size_t limit = std::min(pre_filter.size(), pre_filter_pos + unread_rows_in_current_part); UInt8 nonzero = 0; for (size_t row = pre_filter_pos; row < limit; ++row) nonzero |= pre_filter[row]; - bool will_read_until_mark = unread_rows_in_current_part == limit - pre_filter_pos; - /// can't skip empty rows if won't read until mark - if (!nonzero && will_read_until_mark) + if (!nonzero) { if (pre_filter_pos != pre_filter_begin_pos) { @@ -243,11 +266,31 @@ Block MergeTreeBaseBlockInputStream::readFromPart() memcpy(&post_filter[post_filter_pos], &pre_filter[pre_filter_begin_pos], rows); post_filter_pos += rows; current_range_rows_read += rows; + if (additional_rows_to_read) + { + skipRows(res, range_reader, *task, additional_rows_to_read); + // LOG_TRACE(log, "additional " << additional_rows_to_read); + } range_reader.read(res, rows); + additional_rows_to_read = 0; + } + + if (will_read_until_mark) + { + //if (additional_rows_to_read) + // LOG_TRACE(log, "additional skiped " << additional_rows_to_read); + current_range_rows_read += range_reader.skipToNextMark() - additional_rows_to_read; + additional_rows_to_read = 0; + } + else + { + //LOG_TRACE(log, "additional " << limit - pre_filter_pos << " rows to " << additional_rows_to_read << ", current_range_rows_read = " << current_range_rows_read); + //LOG_TRACE(log, rows_was_read_idx << ' ' << rows_was_read.size() << ' ' << pre_filter_begin_pos << ' ' << pre_filter_pos << ' ' << limit); + additional_rows_to_read += limit - pre_filter_pos; + current_range_rows_read += limit - pre_filter_pos; } pre_filter_begin_pos = pre_filter_pos = limit; - current_range_rows_read += range_reader.skipToNextMark(); } else pre_filter_pos = limit; @@ -259,7 +302,14 @@ Block MergeTreeBaseBlockInputStream::readFromPart() memcpy(&post_filter[post_filter_pos], &pre_filter[pre_filter_begin_pos], rows); post_filter_pos += rows; current_range_rows_read += rows; + if (additional_rows_to_read) + { + skipRows(res, range_reader, *task, additional_rows_to_read); + //LOG_TRACE(log, "additional " << additional_rows_to_read); + } range_reader.read(res, rows); + + additional_rows_to_read = 0; } if (rows_was_read_idx + 1 < rows_was_read.size()) @@ -267,11 +317,9 @@ Block MergeTreeBaseBlockInputStream::readFromPart() ++rows_was_read_idx; } - if (!range_reader) + if (!pre_range_reader) task->current_range_reader = std::experimental::nullopt; - // LOG_TRACE(log, "readFromPart prewhe column_vec " << post_filter_pos << "rows" ); - if (!post_filter_pos) { res.clear(); @@ -333,7 +381,7 @@ Block MergeTreeBaseBlockInputStream::readFromPart() { size_t recommended_rows = task->size_predictor->estimateNumRows(preferred_block_size_bytes); - /// TODO: stop reading if recommended_rows small enough + /// TODO: stop reading if recommended_rows small enough, same in prewhere if (res && recommended_rows < 1) break; rows_to_read = std::min(rows_to_read, std::max(1LU, recommended_rows)); diff --git a/dbms/src/Storages/MergeTree/MergeTreeBlockReadUtils.h b/dbms/src/Storages/MergeTree/MergeTreeBlockReadUtils.h index 40fe2d2bd8c..d48c61b1392 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeBlockReadUtils.h +++ b/dbms/src/Storages/MergeTree/MergeTreeBlockReadUtils.h @@ -48,6 +48,9 @@ struct MergeTreeReadTask MergeTreeBlockSizePredictorPtr size_predictor; /// used to save current range processing status std::experimental::optional current_range_reader; + /// the number of rows wasn't read by range_reader if condition in prewhere was false + /// halps to skip graunule if all conditions will be aslo false + std::size_t unread_rows_in_current_granule; bool isFinished() const { return mark_ranges.empty() && !current_range_reader; } diff --git a/dbms/src/Storages/MergeTree/MergeTreeRangeReader.cpp b/dbms/src/Storages/MergeTree/MergeTreeRangeReader.cpp index 1d2ad3d2e2f..28917fb4901 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeRangeReader.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeRangeReader.cpp @@ -13,14 +13,14 @@ MergeTreeRangeReader::MergeTreeRangeReader( size_t MergeTreeRangeReader::skipToNextMark() { - auto unread_rows_in_current_part = unreadRowsInCurrentPart(); + auto unread_rows_in_current_part = unreadRowsInCurrentGranule(); seek_to_from_mark = true; ++current_mark; read_rows_after_current_mark = 0; return unread_rows_in_current_part; } -const MergeTreeRangeReader MergeTreeRangeReader::skipRows(size_t rows) const +MergeTreeRangeReader MergeTreeRangeReader::skipRows(size_t rows) const { MergeTreeRangeReader copy = *this; copy.read_rows_after_current_mark += rows; @@ -38,6 +38,7 @@ size_t MergeTreeRangeReader::read(Block & res, size_t max_rows_to_read) return false; auto read_rows = merge_tree_reader.get().readRange(current_mark, seek_to_from_mark, rows_to_read, res); + //LOG_TRACE(logger, "to read: " << rows_to_read << " read: " << read_rows << " max: " << max_rows_to_read); /// if no columns to read, consider all rows was read if (!read_rows) read_rows = rows_to_read; diff --git a/dbms/src/Storages/MergeTree/MergeTreeRangeReader.h b/dbms/src/Storages/MergeTree/MergeTreeRangeReader.h index 36b72f48927..6abfa58d2c9 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeRangeReader.h +++ b/dbms/src/Storages/MergeTree/MergeTreeRangeReader.h @@ -14,15 +14,19 @@ public: size_t unreadRows() const { return (last_mark - current_mark) * index_granularity - read_rows_after_current_mark; } - size_t unreadRowsInCurrentPart() const { + size_t unreadRowsInCurrentGranule() const { return index_granularity - read_rows_after_current_mark; } + size_t readRowsInCurrentGranule() const { return read_rows_after_current_mark; } + size_t skipToNextMark(); - const MergeTreeRangeReader skipRows(size_t rows) const; + MergeTreeRangeReader skipRows(size_t rows) const; size_t read(Block & res, size_t max_rows_to_read); bool isReadingFinished() const { return is_reading_finished; } + void disableNextSeek() { seek_to_from_mark = false; } + ~MergeTreeRangeReader() { //if (last_mark != current_mark) // LOG_ERROR(logger, "last_mark = " << last_mark << " current_mark = " << current_mark << " read_rows_after_current_mark = " << read_rows_after_current_mark); diff --git a/dbms/src/Storages/MergeTree/MergeTreeReader.h b/dbms/src/Storages/MergeTree/MergeTreeReader.h index 5708ec054d2..983c344f77d 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeReader.h +++ b/dbms/src/Storages/MergeTree/MergeTreeReader.h @@ -41,8 +41,6 @@ public: /// If columns are not present in the block, adds them. If they are present - appends the values that have been read. /// Do not adds columns, if the files are not present for them (to add them, call fillMissingColumns). /// Block should contain either no columns from the columns field, or all columns for which files are present. - size_t readRange(size_t from_mark, size_t to_mark, Block & res) - { return readRange(from_mark, true, (to_mark - from_mark) * storage.index_granularity, res); } MergeTreeRangeReader readRange(size_t from_mark, size_t to_mark); /// Add columns from ordered_names that are not present in the block. @@ -105,7 +103,6 @@ private: MergeTreeData::DataPartPtr data_part; FileStreams streams; - // size_t cur_mark_idx = 0; /// Mark index corresponding to the current position for all streams. /// Columns that are read. NamesAndTypesList columns; @@ -131,6 +128,7 @@ private: void fillMissingColumnsImpl(Block & res, const Names & ordered_names, bool always_reorder); + /// return the number of rows has been read or zero if ther is no columns to read size_t readRange(size_t from_mark, bool seek_to_from_mark, size_t max_rows_to_read, Block & res); friend class MergeTreeRangeReader;