diff --git a/src/Storages/MergeTree/IMergeTreeReader.cpp b/src/Storages/MergeTree/IMergeTreeReader.cpp index 5744330ea43..8711664d531 100644 --- a/src/Storages/MergeTree/IMergeTreeReader.cpp +++ b/src/Storages/MergeTree/IMergeTreeReader.cpp @@ -8,6 +8,7 @@ #include #include + namespace DB { @@ -42,103 +43,17 @@ IMergeTreeReader::IMergeTreeReader( /// to allow to use shared offset column from cache. , requested_columns(data_part_info_for_read->isWidePart() ? Nested::convertToSubcolumns(columns_) : columns_) , part_columns(data_part_info_for_read->isWidePart() ? Nested::collect(data_part_info_for_read->getColumns()) : data_part_info_for_read->getColumns()) - , last_read_end_offset(0) - , part_offset_column_index(-1) { columns_to_read.reserve(requested_columns.size()); serializations.reserve(requested_columns.size()); - ssize_t i = 0; for (const auto & column : requested_columns) { - if (column.name == "_part_offset") - { - assert(part_offset_column_index == -1); - part_offset_column_index = i; - } - else - { - columns_to_read.emplace_back(getColumnInPart(column)); - serializations.emplace_back(getSerializationInPart(column)); - } - ++i; + columns_to_read.emplace_back(getColumnInPart(column)); + serializations.emplace_back(getSerializationInPart(column)); } } - -size_t IMergeTreeReader::readRows(size_t from_mark, size_t current_task_last_mark, - bool continue_reading, size_t max_rows_to_read, Columns & res_columns) -{ -// Columns * physical_columns_to_fill = &res_columns; -// Columns tmp_columns; -// if (part_offset_column_index != -1) -// { -// tmp_columns.reserve(res_columns.size()-1); -// tmp_columns.assign(res_columns.begin(), res_columns.begin() + part_offset_column_index); -// tmp_columns.insert(tmp_columns.end(), res_columns.begin() + part_offset_column_index + 1, res_columns.end()); -// physical_columns_to_fill = &tmp_columns; -// } - - ColumnPtr part_offset_column_before; - if (part_offset_column_index != -1) - { - part_offset_column_before = res_columns[part_offset_column_index]; - res_columns.erase(res_columns.begin() + part_offset_column_index); - } - - size_t rows_read = readPhysicalRows(from_mark, current_task_last_mark, continue_reading, max_rows_to_read, res_columns/* *physical_columns_to_fill*/); - const size_t start_row = continue_reading ? last_read_end_offset : data_part_info_for_read->getIndexGranularity().getMarkStartingRow(from_mark); - -// std::cerr << "READ from " << start_row << " to " << end_row << "\n\n\n"; - - - if (part_offset_column_index != -1) - { - /// In case when all requested physical columns are not present in part rows_read will be zero. - /// But we still need to fill offset column with values. - if (rows_read == 0) - { - const size_t total_rows = data_part_info_for_read->getIndexGranularity().getTotalRows(); - rows_read = start_row + max_rows_to_read < total_rows ? max_rows_to_read : total_rows - start_row; - } - - const size_t end_row = start_row + rows_read; - - MutableColumnPtr part_offset_column = part_offset_column_before ? - part_offset_column_before->assumeMutable() - : ColumnUInt64::create()->getPtr(); - part_offset_column->reserve(part_offset_column->size() + rows_read); - - // TODO: fill the column with int values efficiently - for (size_t row_offset = start_row; row_offset < end_row; ++ row_offset) - part_offset_column->insert(row_offset); - - res_columns.insert(res_columns.begin() + part_offset_column_index, part_offset_column->getPtr()); - } - - last_read_end_offset = start_row + rows_read; - -// if (part_offset_column_index != -1) -// { -// MutableColumnPtr part_offset_column = res_columns[part_offset_column_index] ? -// res_columns[part_offset_column_index]->assumeMutable() -// : MutableColumnPtr(ColumnUInt64::create()); -// part_offset_column->reserve(part_offset_column->size() + rows_read); -// -// // TODO: fill the column with int values efficiently -// for (size_t row_offset = start_row; row_offset < end_row; ++ row_offset) -// part_offset_column->insert(row_offset); -// -// res_columns.clear(); -// res_columns.reserve(tmp_columns.size() + 1); -// res_columns.assign(tmp_columns.begin(), tmp_columns.begin() + part_offset_column_index); -// res_columns.push_back(part_offset_column->getPtr()); -// res_columns.insert(res_columns.end(), tmp_columns.begin() + part_offset_column_index, tmp_columns.end()); -// } - - return rows_read; -} - const IMergeTreeReader::ValueSizeMap & IMergeTreeReader::getAvgValueSizeHints() const { return avg_value_size_hints; @@ -346,7 +261,7 @@ IMergeTreeReader::ColumnPosition IMergeTreeReader::findColumnForOffsets(const Na void IMergeTreeReader::checkNumberOfColumns(size_t num_columns_to_read) const { - if (num_columns_to_read != columns_to_read.size())//requested_columns.size()) + if (num_columns_to_read != requested_columns.size()) throw Exception("invalid number of columns passed to MergeTreeReader::readRows. " "Expected " + toString(requested_columns.size()) + ", " "got " + toString(num_columns_to_read), ErrorCodes::LOGICAL_ERROR); diff --git a/src/Storages/MergeTree/IMergeTreeReader.h b/src/Storages/MergeTree/IMergeTreeReader.h index 64ab953e50f..16db13692aa 100644 --- a/src/Storages/MergeTree/IMergeTreeReader.h +++ b/src/Storages/MergeTree/IMergeTreeReader.h @@ -34,8 +34,8 @@ public: /// Return the number of rows has been read or zero if there is no columns to read. /// If continue_reading is true, continue reading from last state, otherwise seek to from_mark. /// current_task_last mark is needed for asynchronous reading (mainly from remote fs). - size_t readRows(size_t from_mark, size_t current_task_last_mark, - bool continue_reading, size_t max_rows_to_read, Columns & res_columns); + virtual size_t readRows(size_t from_mark, size_t current_task_last_mark, + bool continue_reading, size_t max_rows_to_read, Columns & res_columns) = 0; virtual bool canReadIncompleteGranules() const = 0; @@ -62,9 +62,6 @@ public: MergeTreeDataPartInfoForReaderPtr data_part_info_for_read; protected: - virtual size_t readPhysicalRows(size_t from_mark, size_t current_task_last_mark, - bool continue_reading, size_t max_rows_to_read, Columns & res_columns) = 0; - /// Returns actual column name in part, which can differ from table metadata. String getColumnNameInPart(const NameAndTypePair & required_column) const; @@ -108,11 +105,6 @@ private: /// Actual columns description in part. ColumnsDescription part_columns; - - /// Rows offset where the previous read ended - size_t last_read_end_offset; - - ssize_t part_offset_column_index; }; } diff --git a/src/Storages/MergeTree/MergeTreeBlockReadUtils.cpp b/src/Storages/MergeTree/MergeTreeBlockReadUtils.cpp index 042aec54067..525d76d0f0f 100644 --- a/src/Storages/MergeTree/MergeTreeBlockReadUtils.cpp +++ b/src/Storages/MergeTree/MergeTreeBlockReadUtils.cpp @@ -292,7 +292,7 @@ MergeTreeReadTaskColumns getReadTaskColumns( /// Read system columns such as lightweight delete mask "_row_exists" if it is persisted in the part for (const auto & name : system_columns) { - if (data_part_info_for_reader.getColumns().contains(name) || name == "_part_offset") + if (data_part_info_for_reader.getColumns().contains(name)) column_names.push_back(name); } diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index 627d6139836..65f54495b3c 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -1261,7 +1261,6 @@ static void selectColumnNames( } else if (name == "_part_offset") { -// virt_column_names.push_back(name); } else if (name == LightweightDeleteDescription::FILTER_COLUMN.name) diff --git a/src/Storages/MergeTree/MergeTreeRangeReader.cpp b/src/Storages/MergeTree/MergeTreeRangeReader.cpp index 4490f1b8f4d..3cf0a7c343a 100644 --- a/src/Storages/MergeTree/MergeTreeRangeReader.cpp +++ b/src/Storages/MergeTree/MergeTreeRangeReader.cpp @@ -3,7 +3,6 @@ #include #include #include -#include "Core/NamesAndTypes.h" #include #include #include @@ -86,7 +85,6 @@ MergeTreeRangeReader::DelayedStream::DelayedStream( , merge_tree_reader(merge_tree_reader_) , index_granularity(&(merge_tree_reader->data_part_info_for_read->getIndexGranularity())) , continue_reading(false), is_finished(false) -// , last_read_end(0) // track the offset where the last read from merge tree ended { } @@ -102,13 +100,7 @@ size_t MergeTreeRangeReader::DelayedStream::readRows(Columns & columns, size_t n { size_t rows_read = merge_tree_reader->readRows( current_mark, current_task_last_mark, continue_reading, num_rows, columns); - -//const size_t start_row = continue_reading ? last_read_end : index_granularity->getMarkStartingRow(current_mark); -//const size_t end_row = start_row + rows_read; -//std::cerr << "READ from " << start_row << " to " << end_row << "\n\n\n"; - continue_reading = true; -// last_read_end = end_row; /// Zero rows_read maybe either because reading has finished /// or because there is no columns we can read in current part (for example, all columns are default). @@ -736,30 +728,37 @@ MergeTreeRangeReader::MergeTreeRangeReader( , is_initialized(true) { if (prev_reader) - sample_block = prev_reader->getSampleBlock(); + result_sample_block = prev_reader->getSampleBlock(); for (const auto & name_and_type : merge_tree_reader->getColumns()) - sample_block.insert({name_and_type.type->createColumn(), name_and_type.type, name_and_type.name}); + { + read_sample_block.insert({name_and_type.type->createColumn(), name_and_type.type, name_and_type.name}); + result_sample_block.insert({name_and_type.type->createColumn(), name_and_type.type, name_and_type.name}); + } for (const auto & column_name : non_const_virtual_column_names_) { - if (sample_block.has(column_name)) + if (result_sample_block.has(column_name)) continue; non_const_virtual_column_names.push_back(column_name); -// if (column_name == "_part_offset") -// sample_block.insert(ColumnWithTypeAndName(ColumnUInt64::create(), std::make_shared(), column_name)); + if (column_name == "_part_offset" && !prev_reader) + { + /// _part_offset column is filled by the first reader. + read_sample_block.insert(ColumnWithTypeAndName(ColumnUInt64::create(), std::make_shared(), column_name)); + result_sample_block.insert(ColumnWithTypeAndName(ColumnUInt64::create(), std::make_shared(), column_name)); + } } if (prewhere_info) { const auto & step = *prewhere_info; if (step.actions) - step.actions->execute(sample_block, true); + step.actions->execute(result_sample_block, true); if (step.remove_column) - sample_block.erase(step.column_name); + result_sample_block.erase(step.column_name); } } @@ -846,7 +845,7 @@ MergeTreeRangeReader::ReadResult MergeTreeRangeReader::read(size_t max_rows, Mar SCOPE_EXIT({ LOG_TEST(log, "read() returned {}, sample block {}", - read_result.dumpInfo(), this->getSampleBlock().dumpNames()); + read_result.dumpInfo(), this->result_sample_block.dumpNames()); }); if (prev_reader) @@ -925,16 +924,23 @@ MergeTreeRangeReader::ReadResult MergeTreeRangeReader::read(size_t max_rows, Mar return read_result; { + /// Physical columns go first and then some virtual columns follow + size_t physical_columns_count = merge_tree_reader->getColumns().size(); + Columns physical_columns(read_result.columns.begin(), read_result.columns.begin() + physical_columns_count); + bool should_evaluate_missing_defaults; - merge_tree_reader->fillMissingColumns(read_result.columns, should_evaluate_missing_defaults, - read_result.num_rows); + merge_tree_reader->fillMissingColumns(physical_columns, should_evaluate_missing_defaults, + read_result.num_rows); /// If some columns absent in part, then evaluate default values if (should_evaluate_missing_defaults) - merge_tree_reader->evaluateMissingDefaults({}, read_result.columns); + merge_tree_reader->evaluateMissingDefaults({}, physical_columns); /// If result not empty, then apply on-fly alter conversions if any required - merge_tree_reader->performRequiredConversions(read_result.columns); + merge_tree_reader->performRequiredConversions(physical_columns); + + for (size_t i = 0; i < physical_columns.size(); ++i) + read_result.columns[i] = std::move(physical_columns[i]); } size_t total_bytes = 0; @@ -963,13 +969,13 @@ MergeTreeRangeReader::ReadResult MergeTreeRangeReader::startReadingChain(size_t /// The stream could be unfinished by the previous read request because of max_rows limit. /// In this case it will have some rows from the previously started range. We need to save their begin and /// end offsets to properly fill _part_offset column. -// UInt64 leading_begin_part_offset = 0; -// UInt64 leading_end_part_offset = 0; -// if (!stream.isFinished()) -// { -// leading_begin_part_offset = stream.currentPartOffset(); -// leading_end_part_offset = stream.lastPartOffset(); -// } + UInt64 leading_begin_part_offset = 0; + UInt64 leading_end_part_offset = 0; + if (!stream.isFinished()) + { + leading_begin_part_offset = stream.currentPartOffset(); + leading_end_part_offset = stream.lastPartOffset(); + } /// Stream is lazy. result.num_added_rows is the number of rows added to block which is not equal to /// result.num_rows_read until call to stream.finalize(). Also result.num_added_rows may be less than @@ -1008,15 +1014,12 @@ MergeTreeRangeReader::ReadResult MergeTreeRangeReader::startReadingChain(size_t if (!result.rows_per_granule.empty()) result.adjustLastGranule(); -// for (const auto & column_name : non_const_virtual_column_names) -// { -// if (column_name == "_part_offset") -// fillPartOffsetColumn(result, leading_begin_part_offset, leading_end_part_offset); -// } + if (read_sample_block.has("_part_offset")) + fillPartOffsetColumn(result, leading_begin_part_offset, leading_end_part_offset); return result; } -#if 0 + void MergeTreeRangeReader::fillPartOffsetColumn(ReadResult & result, UInt64 leading_begin_part_offset, UInt64 leading_end_part_offset) { size_t num_rows = result.numReadRows(); @@ -1042,9 +1045,7 @@ void MergeTreeRangeReader::fillPartOffsetColumn(ReadResult & result, UInt64 lead } result.columns.emplace_back(std::move(column)); - result.extra_columns_filled.push_back("_part_offset"); } -#endif Columns MergeTreeRangeReader::continueReadingChain(const ReadResult & result, size_t & num_rows) { @@ -1167,11 +1168,11 @@ void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & r if (!prewhere_info) return; - const auto & header = merge_tree_reader->getColumns(); - size_t num_columns = header.size(); + const auto & header = read_sample_block; + size_t num_columns = header.columns(); /// Check that we have columns from previous steps and newly read required columns - if (result.columns.size() < num_columns + result.extra_columns_filled.size()) + if (result.columns.size() < num_columns) throw Exception(ErrorCodes::LOGICAL_ERROR, "Invalid number of columns passed to MergeTreeRangeReader. Expected {}, got {}", num_columns, result.columns.size()); @@ -1197,15 +1198,6 @@ void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & r for (auto name_and_type = header.begin(); name_and_type != header.end() && pos < result.columns.size(); ++pos, ++name_and_type) block.insert({result.columns[pos], name_and_type->type, name_and_type->name}); - - /*// HACK!! fix it - if (getSampleBlock().has("_part_offset")) - { - const auto & col = getSampleBlock().getByName("_part_offset"); - block.insert({result.columns.back(), col.type, col.name}); - } -/////////////*/ - { /// Columns might be projected out. We need to store them here so that default columns can be evaluated later. Block block_before_prewhere = block; diff --git a/src/Storages/MergeTree/MergeTreeRangeReader.h b/src/Storages/MergeTree/MergeTreeRangeReader.h index b6d7dec5b89..1fc66feb9f5 100644 --- a/src/Storages/MergeTree/MergeTreeRangeReader.h +++ b/src/Storages/MergeTree/MergeTreeRangeReader.h @@ -137,8 +137,6 @@ private: bool continue_reading = false; bool is_finished = true; -// size_t last_read_end; - /// Current position from the begging of file in rows size_t position() const; size_t readRows(Columns & columns, size_t num_rows); @@ -283,20 +281,18 @@ public: size_t countZeroTails(const IColumn::Filter & filter, NumRows & zero_tails, bool can_read_incomplete_granules) const; static size_t numZerosInTail(const UInt8 * begin, const UInt8 * end); - Names extra_columns_filled; - Poco::Logger * log; }; ReadResult read(size_t max_rows, MarkRanges & ranges); - const Block & getSampleBlock() const { return sample_block; } + const Block & getSampleBlock() const { return result_sample_block; } private: ReadResult startReadingChain(size_t max_rows, MarkRanges & ranges); Columns continueReadingChain(const ReadResult & result, size_t & num_rows); void executePrewhereActionsAndFilterColumns(ReadResult & result) const; -// void fillPartOffsetColumn(ReadResult & result, UInt64 leading_begin_part_offset, UInt64 leading_end_part_offset); + void fillPartOffsetColumn(ReadResult & result, UInt64 leading_begin_part_offset, UInt64 leading_end_part_offset); IMergeTreeReader * merge_tree_reader = nullptr; const MergeTreeIndexGranularity * index_granularity = nullptr; @@ -305,7 +301,8 @@ private: Stream stream; - Block sample_block; + Block read_sample_block; /// Block with columns that are actually read from disk + non-const virtual columns that are filled at this step. + Block result_sample_block; /// Block with columns that are returned by this step. bool last_reader_in_chain = false; bool is_initialized = false; diff --git a/src/Storages/MergeTree/MergeTreeReaderCompact.cpp b/src/Storages/MergeTree/MergeTreeReaderCompact.cpp index 5cd13f891ad..b0488d29f8e 100644 --- a/src/Storages/MergeTree/MergeTreeReaderCompact.cpp +++ b/src/Storages/MergeTree/MergeTreeReaderCompact.cpp @@ -151,7 +151,7 @@ void MergeTreeReaderCompact::fillColumnPositions() } } -size_t MergeTreeReaderCompact::readPhysicalRows( +size_t MergeTreeReaderCompact::readRows( size_t from_mark, size_t current_task_last_mark, bool continue_reading, size_t max_rows_to_read, Columns & res_columns) { if (continue_reading) diff --git a/src/Storages/MergeTree/MergeTreeReaderCompact.h b/src/Storages/MergeTree/MergeTreeReaderCompact.h index e086abfdb78..ee099755a8e 100644 --- a/src/Storages/MergeTree/MergeTreeReaderCompact.h +++ b/src/Storages/MergeTree/MergeTreeReaderCompact.h @@ -31,12 +31,11 @@ public: const ReadBufferFromFileBase::ProfileCallback & profile_callback_ = {}, clockid_t clock_type_ = CLOCK_MONOTONIC_COARSE); -protected: /// Return the number of rows has been read or zero if there is no columns to read. /// If continue_reading is true, continue reading from last state, otherwise seek to from_mark - size_t readPhysicalRows(size_t from_mark, size_t current_task_last_mark, + size_t readRows(size_t from_mark, size_t current_task_last_mark, bool continue_reading, size_t max_rows_to_read, Columns & res_columns) override; -public: + bool canReadIncompleteGranules() const override { return false; } private: diff --git a/src/Storages/MergeTree/MergeTreeReaderInMemory.cpp b/src/Storages/MergeTree/MergeTreeReaderInMemory.cpp index 9e50763f3f5..3b3a6b95cff 100644 --- a/src/Storages/MergeTree/MergeTreeReaderInMemory.cpp +++ b/src/Storages/MergeTree/MergeTreeReaderInMemory.cpp @@ -49,7 +49,7 @@ MergeTreeReaderInMemory::MergeTreeReaderInMemory( } } -size_t MergeTreeReaderInMemory::readPhysicalRows( +size_t MergeTreeReaderInMemory::readRows( size_t from_mark, size_t /* current_task_last_mark */, bool continue_reading, size_t max_rows_to_read, Columns & res_columns) { if (!continue_reading) diff --git a/src/Storages/MergeTree/MergeTreeReaderInMemory.h b/src/Storages/MergeTree/MergeTreeReaderInMemory.h index 3e1a936044e..cb67bc46eae 100644 --- a/src/Storages/MergeTree/MergeTreeReaderInMemory.h +++ b/src/Storages/MergeTree/MergeTreeReaderInMemory.h @@ -22,12 +22,11 @@ public: MarkRanges mark_ranges_, MergeTreeReaderSettings settings_); -protected: /// Return the number of rows has been read or zero if there is no columns to read. /// If continue_reading is true, continue reading from last state, otherwise seek to from_mark - size_t readPhysicalRows(size_t from_mark, size_t current_task_last_mark, + size_t readRows(size_t from_mark, size_t current_tasl_last_mark, bool continue_reading, size_t max_rows_to_read, Columns & res_columns) override; -public: + bool canReadIncompleteGranules() const override { return true; } private: diff --git a/src/Storages/MergeTree/MergeTreeReaderWide.cpp b/src/Storages/MergeTree/MergeTreeReaderWide.cpp index 0db14127e24..ea367a9502e 100644 --- a/src/Storages/MergeTree/MergeTreeReaderWide.cpp +++ b/src/Storages/MergeTree/MergeTreeReaderWide.cpp @@ -58,7 +58,7 @@ MergeTreeReaderWide::MergeTreeReaderWide( } } -size_t MergeTreeReaderWide::readPhysicalRows( +size_t MergeTreeReaderWide::readRows( size_t from_mark, size_t current_task_last_mark, bool continue_reading, size_t max_rows_to_read, Columns & res_columns) { size_t read_rows = 0; diff --git a/src/Storages/MergeTree/MergeTreeReaderWide.h b/src/Storages/MergeTree/MergeTreeReaderWide.h index 8aaccdddb94..dbfc0310242 100644 --- a/src/Storages/MergeTree/MergeTreeReaderWide.h +++ b/src/Storages/MergeTree/MergeTreeReaderWide.h @@ -26,12 +26,11 @@ public: const ReadBufferFromFileBase::ProfileCallback & profile_callback_ = {}, clockid_t clock_type_ = CLOCK_MONOTONIC_COARSE); -protected: /// Return the number of rows has been read or zero if there is no columns to read. /// If continue_reading is true, continue reading from last state, otherwise seek to from_mark - size_t readPhysicalRows(size_t from_mark, size_t current_task_last_mark, + size_t readRows(size_t from_mark, size_t current_task_last_mark, bool continue_reading, size_t max_rows_to_read, Columns & res_columns) override; -public: + bool canReadIncompleteGranules() const override { return true; } using FileStreams = std::map>; diff --git a/src/Storages/StorageSnapshot.cpp b/src/Storages/StorageSnapshot.cpp index 2faf17828cf..48851f0974d 100644 --- a/src/Storages/StorageSnapshot.cpp +++ b/src/Storages/StorageSnapshot.cpp @@ -19,11 +19,7 @@ namespace ErrorCodes void StorageSnapshot::init() { for (const auto & [name, type] : storage.getVirtuals()) - { virtual_columns[name] = type; - if (name == "_part_offset") // TODO: properly make _part_offset a system column - system_columns[name] = type; - } if (storage.hasLightweightDeletedMask()) system_columns[LightweightDeleteDescription::FILTER_COLUMN.name] = LightweightDeleteDescription::FILTER_COLUMN.type;