Restored old logic for filling _part_offset

This commit is contained in:
Alexander Gololobov 2022-12-23 15:21:57 +01:00
parent 4cebc6f3a4
commit ada6422985
13 changed files with 59 additions and 171 deletions

View File

@ -8,6 +8,7 @@
#include <Storages/MergeTree/IMergeTreeReader.h>
#include <Common/typeid_cast.h>
namespace DB
{
@ -42,103 +43,17 @@ IMergeTreeReader::IMergeTreeReader(
/// to allow to use shared offset column from cache.
, requested_columns(data_part_info_for_read->isWidePart() ? Nested::convertToSubcolumns(columns_) : columns_)
, part_columns(data_part_info_for_read->isWidePart() ? Nested::collect(data_part_info_for_read->getColumns()) : data_part_info_for_read->getColumns())
, last_read_end_offset(0)
, part_offset_column_index(-1)
{
columns_to_read.reserve(requested_columns.size());
serializations.reserve(requested_columns.size());
ssize_t i = 0;
for (const auto & column : requested_columns)
{
if (column.name == "_part_offset")
{
assert(part_offset_column_index == -1);
part_offset_column_index = i;
}
else
{
columns_to_read.emplace_back(getColumnInPart(column));
serializations.emplace_back(getSerializationInPart(column));
}
++i;
columns_to_read.emplace_back(getColumnInPart(column));
serializations.emplace_back(getSerializationInPart(column));
}
}
size_t IMergeTreeReader::readRows(size_t from_mark, size_t current_task_last_mark,
bool continue_reading, size_t max_rows_to_read, Columns & res_columns)
{
// Columns * physical_columns_to_fill = &res_columns;
// Columns tmp_columns;
// if (part_offset_column_index != -1)
// {
// tmp_columns.reserve(res_columns.size()-1);
// tmp_columns.assign(res_columns.begin(), res_columns.begin() + part_offset_column_index);
// tmp_columns.insert(tmp_columns.end(), res_columns.begin() + part_offset_column_index + 1, res_columns.end());
// physical_columns_to_fill = &tmp_columns;
// }
ColumnPtr part_offset_column_before;
if (part_offset_column_index != -1)
{
part_offset_column_before = res_columns[part_offset_column_index];
res_columns.erase(res_columns.begin() + part_offset_column_index);
}
size_t rows_read = readPhysicalRows(from_mark, current_task_last_mark, continue_reading, max_rows_to_read, res_columns/* *physical_columns_to_fill*/);
const size_t start_row = continue_reading ? last_read_end_offset : data_part_info_for_read->getIndexGranularity().getMarkStartingRow(from_mark);
// std::cerr << "READ from " << start_row << " to " << end_row << "\n\n\n";
if (part_offset_column_index != -1)
{
/// In case when all requested physical columns are not present in part rows_read will be zero.
/// But we still need to fill offset column with values.
if (rows_read == 0)
{
const size_t total_rows = data_part_info_for_read->getIndexGranularity().getTotalRows();
rows_read = start_row + max_rows_to_read < total_rows ? max_rows_to_read : total_rows - start_row;
}
const size_t end_row = start_row + rows_read;
MutableColumnPtr part_offset_column = part_offset_column_before ?
part_offset_column_before->assumeMutable()
: ColumnUInt64::create()->getPtr();
part_offset_column->reserve(part_offset_column->size() + rows_read);
// TODO: fill the column with int values efficiently
for (size_t row_offset = start_row; row_offset < end_row; ++ row_offset)
part_offset_column->insert(row_offset);
res_columns.insert(res_columns.begin() + part_offset_column_index, part_offset_column->getPtr());
}
last_read_end_offset = start_row + rows_read;
// if (part_offset_column_index != -1)
// {
// MutableColumnPtr part_offset_column = res_columns[part_offset_column_index] ?
// res_columns[part_offset_column_index]->assumeMutable()
// : MutableColumnPtr(ColumnUInt64::create());
// part_offset_column->reserve(part_offset_column->size() + rows_read);
//
// // TODO: fill the column with int values efficiently
// for (size_t row_offset = start_row; row_offset < end_row; ++ row_offset)
// part_offset_column->insert(row_offset);
//
// res_columns.clear();
// res_columns.reserve(tmp_columns.size() + 1);
// res_columns.assign(tmp_columns.begin(), tmp_columns.begin() + part_offset_column_index);
// res_columns.push_back(part_offset_column->getPtr());
// res_columns.insert(res_columns.end(), tmp_columns.begin() + part_offset_column_index, tmp_columns.end());
// }
return rows_read;
}
const IMergeTreeReader::ValueSizeMap & IMergeTreeReader::getAvgValueSizeHints() const
{
return avg_value_size_hints;
@ -346,7 +261,7 @@ IMergeTreeReader::ColumnPosition IMergeTreeReader::findColumnForOffsets(const Na
void IMergeTreeReader::checkNumberOfColumns(size_t num_columns_to_read) const
{
if (num_columns_to_read != columns_to_read.size())//requested_columns.size())
if (num_columns_to_read != requested_columns.size())
throw Exception("invalid number of columns passed to MergeTreeReader::readRows. "
"Expected " + toString(requested_columns.size()) + ", "
"got " + toString(num_columns_to_read), ErrorCodes::LOGICAL_ERROR);

View File

@ -34,8 +34,8 @@ public:
/// Return the number of rows has been read or zero if there is no columns to read.
/// If continue_reading is true, continue reading from last state, otherwise seek to from_mark.
/// current_task_last mark is needed for asynchronous reading (mainly from remote fs).
size_t readRows(size_t from_mark, size_t current_task_last_mark,
bool continue_reading, size_t max_rows_to_read, Columns & res_columns);
virtual size_t readRows(size_t from_mark, size_t current_task_last_mark,
bool continue_reading, size_t max_rows_to_read, Columns & res_columns) = 0;
virtual bool canReadIncompleteGranules() const = 0;
@ -62,9 +62,6 @@ public:
MergeTreeDataPartInfoForReaderPtr data_part_info_for_read;
protected:
virtual size_t readPhysicalRows(size_t from_mark, size_t current_task_last_mark,
bool continue_reading, size_t max_rows_to_read, Columns & res_columns) = 0;
/// Returns actual column name in part, which can differ from table metadata.
String getColumnNameInPart(const NameAndTypePair & required_column) const;
@ -108,11 +105,6 @@ private:
/// Actual columns description in part.
ColumnsDescription part_columns;
/// Rows offset where the previous read ended
size_t last_read_end_offset;
ssize_t part_offset_column_index;
};
}

View File

@ -292,7 +292,7 @@ MergeTreeReadTaskColumns getReadTaskColumns(
/// Read system columns such as lightweight delete mask "_row_exists" if it is persisted in the part
for (const auto & name : system_columns)
{
if (data_part_info_for_reader.getColumns().contains(name) || name == "_part_offset")
if (data_part_info_for_reader.getColumns().contains(name))
column_names.push_back(name);
}

View File

@ -1261,7 +1261,6 @@ static void selectColumnNames(
}
else if (name == "_part_offset")
{
//
virt_column_names.push_back(name);
}
else if (name == LightweightDeleteDescription::FILTER_COLUMN.name)

View File

@ -3,7 +3,6 @@
#include <Columns/ColumnConst.h>
#include <Columns/ColumnsCommon.h>
#include <Common/TargetSpecific.h>
#include "Core/NamesAndTypes.h"
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <base/range.h>
@ -86,7 +85,6 @@ MergeTreeRangeReader::DelayedStream::DelayedStream(
, merge_tree_reader(merge_tree_reader_)
, index_granularity(&(merge_tree_reader->data_part_info_for_read->getIndexGranularity()))
, continue_reading(false), is_finished(false)
// , last_read_end(0) // track the offset where the last read from merge tree ended
{
}
@ -102,13 +100,7 @@ size_t MergeTreeRangeReader::DelayedStream::readRows(Columns & columns, size_t n
{
size_t rows_read = merge_tree_reader->readRows(
current_mark, current_task_last_mark, continue_reading, num_rows, columns);
//const size_t start_row = continue_reading ? last_read_end : index_granularity->getMarkStartingRow(current_mark);
//const size_t end_row = start_row + rows_read;
//std::cerr << "READ from " << start_row << " to " << end_row << "\n\n\n";
continue_reading = true;
// last_read_end = end_row;
/// Zero rows_read maybe either because reading has finished
/// or because there is no columns we can read in current part (for example, all columns are default).
@ -736,30 +728,37 @@ MergeTreeRangeReader::MergeTreeRangeReader(
, is_initialized(true)
{
if (prev_reader)
sample_block = prev_reader->getSampleBlock();
result_sample_block = prev_reader->getSampleBlock();
for (const auto & name_and_type : merge_tree_reader->getColumns())
sample_block.insert({name_and_type.type->createColumn(), name_and_type.type, name_and_type.name});
{
read_sample_block.insert({name_and_type.type->createColumn(), name_and_type.type, name_and_type.name});
result_sample_block.insert({name_and_type.type->createColumn(), name_and_type.type, name_and_type.name});
}
for (const auto & column_name : non_const_virtual_column_names_)
{
if (sample_block.has(column_name))
if (result_sample_block.has(column_name))
continue;
non_const_virtual_column_names.push_back(column_name);
// if (column_name == "_part_offset")
// sample_block.insert(ColumnWithTypeAndName(ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), column_name));
if (column_name == "_part_offset" && !prev_reader)
{
/// _part_offset column is filled by the first reader.
read_sample_block.insert(ColumnWithTypeAndName(ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), column_name));
result_sample_block.insert(ColumnWithTypeAndName(ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), column_name));
}
}
if (prewhere_info)
{
const auto & step = *prewhere_info;
if (step.actions)
step.actions->execute(sample_block, true);
step.actions->execute(result_sample_block, true);
if (step.remove_column)
sample_block.erase(step.column_name);
result_sample_block.erase(step.column_name);
}
}
@ -846,7 +845,7 @@ MergeTreeRangeReader::ReadResult MergeTreeRangeReader::read(size_t max_rows, Mar
SCOPE_EXIT({
LOG_TEST(log, "read() returned {}, sample block {}",
read_result.dumpInfo(), this->getSampleBlock().dumpNames());
read_result.dumpInfo(), this->result_sample_block.dumpNames());
});
if (prev_reader)
@ -925,16 +924,23 @@ MergeTreeRangeReader::ReadResult MergeTreeRangeReader::read(size_t max_rows, Mar
return read_result;
{
/// Physical columns go first and then some virtual columns follow
size_t physical_columns_count = merge_tree_reader->getColumns().size();
Columns physical_columns(read_result.columns.begin(), read_result.columns.begin() + physical_columns_count);
bool should_evaluate_missing_defaults;
merge_tree_reader->fillMissingColumns(read_result.columns, should_evaluate_missing_defaults,
read_result.num_rows);
merge_tree_reader->fillMissingColumns(physical_columns, should_evaluate_missing_defaults,
read_result.num_rows);
/// If some columns absent in part, then evaluate default values
if (should_evaluate_missing_defaults)
merge_tree_reader->evaluateMissingDefaults({}, read_result.columns);
merge_tree_reader->evaluateMissingDefaults({}, physical_columns);
/// If result not empty, then apply on-fly alter conversions if any required
merge_tree_reader->performRequiredConversions(read_result.columns);
merge_tree_reader->performRequiredConversions(physical_columns);
for (size_t i = 0; i < physical_columns.size(); ++i)
read_result.columns[i] = std::move(physical_columns[i]);
}
size_t total_bytes = 0;
@ -963,13 +969,13 @@ MergeTreeRangeReader::ReadResult MergeTreeRangeReader::startReadingChain(size_t
/// The stream could be unfinished by the previous read request because of max_rows limit.
/// In this case it will have some rows from the previously started range. We need to save their begin and
/// end offsets to properly fill _part_offset column.
// UInt64 leading_begin_part_offset = 0;
// UInt64 leading_end_part_offset = 0;
// if (!stream.isFinished())
// {
// leading_begin_part_offset = stream.currentPartOffset();
// leading_end_part_offset = stream.lastPartOffset();
// }
UInt64 leading_begin_part_offset = 0;
UInt64 leading_end_part_offset = 0;
if (!stream.isFinished())
{
leading_begin_part_offset = stream.currentPartOffset();
leading_end_part_offset = stream.lastPartOffset();
}
/// Stream is lazy. result.num_added_rows is the number of rows added to block which is not equal to
/// result.num_rows_read until call to stream.finalize(). Also result.num_added_rows may be less than
@ -1008,15 +1014,12 @@ MergeTreeRangeReader::ReadResult MergeTreeRangeReader::startReadingChain(size_t
if (!result.rows_per_granule.empty())
result.adjustLastGranule();
// for (const auto & column_name : non_const_virtual_column_names)
// {
// if (column_name == "_part_offset")
// fillPartOffsetColumn(result, leading_begin_part_offset, leading_end_part_offset);
// }
if (read_sample_block.has("_part_offset"))
fillPartOffsetColumn(result, leading_begin_part_offset, leading_end_part_offset);
return result;
}
#if 0
void MergeTreeRangeReader::fillPartOffsetColumn(ReadResult & result, UInt64 leading_begin_part_offset, UInt64 leading_end_part_offset)
{
size_t num_rows = result.numReadRows();
@ -1042,9 +1045,7 @@ void MergeTreeRangeReader::fillPartOffsetColumn(ReadResult & result, UInt64 lead
}
result.columns.emplace_back(std::move(column));
result.extra_columns_filled.push_back("_part_offset");
}
#endif
Columns MergeTreeRangeReader::continueReadingChain(const ReadResult & result, size_t & num_rows)
{
@ -1167,11 +1168,11 @@ void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & r
if (!prewhere_info)
return;
const auto & header = merge_tree_reader->getColumns();
size_t num_columns = header.size();
const auto & header = read_sample_block;
size_t num_columns = header.columns();
/// Check that we have columns from previous steps and newly read required columns
if (result.columns.size() < num_columns + result.extra_columns_filled.size())
if (result.columns.size() < num_columns)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Invalid number of columns passed to MergeTreeRangeReader. Expected {}, got {}",
num_columns, result.columns.size());
@ -1197,15 +1198,6 @@ void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & r
for (auto name_and_type = header.begin(); name_and_type != header.end() && pos < result.columns.size(); ++pos, ++name_and_type)
block.insert({result.columns[pos], name_and_type->type, name_and_type->name});
/*// HACK!! fix it
if (getSampleBlock().has("_part_offset"))
{
const auto & col = getSampleBlock().getByName("_part_offset");
block.insert({result.columns.back(), col.type, col.name});
}
/////////////*/
{
/// Columns might be projected out. We need to store them here so that default columns can be evaluated later.
Block block_before_prewhere = block;

View File

@ -137,8 +137,6 @@ private:
bool continue_reading = false;
bool is_finished = true;
// size_t last_read_end;
/// Current position from the begging of file in rows
size_t position() const;
size_t readRows(Columns & columns, size_t num_rows);
@ -283,20 +281,18 @@ public:
size_t countZeroTails(const IColumn::Filter & filter, NumRows & zero_tails, bool can_read_incomplete_granules) const;
static size_t numZerosInTail(const UInt8 * begin, const UInt8 * end);
Names extra_columns_filled;
Poco::Logger * log;
};
ReadResult read(size_t max_rows, MarkRanges & ranges);
const Block & getSampleBlock() const { return sample_block; }
const Block & getSampleBlock() const { return result_sample_block; }
private:
ReadResult startReadingChain(size_t max_rows, MarkRanges & ranges);
Columns continueReadingChain(const ReadResult & result, size_t & num_rows);
void executePrewhereActionsAndFilterColumns(ReadResult & result) const;
// void fillPartOffsetColumn(ReadResult & result, UInt64 leading_begin_part_offset, UInt64 leading_end_part_offset);
void fillPartOffsetColumn(ReadResult & result, UInt64 leading_begin_part_offset, UInt64 leading_end_part_offset);
IMergeTreeReader * merge_tree_reader = nullptr;
const MergeTreeIndexGranularity * index_granularity = nullptr;
@ -305,7 +301,8 @@ private:
Stream stream;
Block sample_block;
Block read_sample_block; /// Block with columns that are actually read from disk + non-const virtual columns that are filled at this step.
Block result_sample_block; /// Block with columns that are returned by this step.
bool last_reader_in_chain = false;
bool is_initialized = false;

View File

@ -151,7 +151,7 @@ void MergeTreeReaderCompact::fillColumnPositions()
}
}
size_t MergeTreeReaderCompact::readPhysicalRows(
size_t MergeTreeReaderCompact::readRows(
size_t from_mark, size_t current_task_last_mark, bool continue_reading, size_t max_rows_to_read, Columns & res_columns)
{
if (continue_reading)

View File

@ -31,12 +31,11 @@ public:
const ReadBufferFromFileBase::ProfileCallback & profile_callback_ = {},
clockid_t clock_type_ = CLOCK_MONOTONIC_COARSE);
protected:
/// Return the number of rows has been read or zero if there is no columns to read.
/// If continue_reading is true, continue reading from last state, otherwise seek to from_mark
size_t readPhysicalRows(size_t from_mark, size_t current_task_last_mark,
size_t readRows(size_t from_mark, size_t current_task_last_mark,
bool continue_reading, size_t max_rows_to_read, Columns & res_columns) override;
public:
bool canReadIncompleteGranules() const override { return false; }
private:

View File

@ -49,7 +49,7 @@ MergeTreeReaderInMemory::MergeTreeReaderInMemory(
}
}
size_t MergeTreeReaderInMemory::readPhysicalRows(
size_t MergeTreeReaderInMemory::readRows(
size_t from_mark, size_t /* current_task_last_mark */, bool continue_reading, size_t max_rows_to_read, Columns & res_columns)
{
if (!continue_reading)

View File

@ -22,12 +22,11 @@ public:
MarkRanges mark_ranges_,
MergeTreeReaderSettings settings_);
protected:
/// Return the number of rows has been read or zero if there is no columns to read.
/// If continue_reading is true, continue reading from last state, otherwise seek to from_mark
size_t readPhysicalRows(size_t from_mark, size_t current_task_last_mark,
size_t readRows(size_t from_mark, size_t current_tasl_last_mark,
bool continue_reading, size_t max_rows_to_read, Columns & res_columns) override;
public:
bool canReadIncompleteGranules() const override { return true; }
private:

View File

@ -58,7 +58,7 @@ MergeTreeReaderWide::MergeTreeReaderWide(
}
}
size_t MergeTreeReaderWide::readPhysicalRows(
size_t MergeTreeReaderWide::readRows(
size_t from_mark, size_t current_task_last_mark, bool continue_reading, size_t max_rows_to_read, Columns & res_columns)
{
size_t read_rows = 0;

View File

@ -26,12 +26,11 @@ public:
const ReadBufferFromFileBase::ProfileCallback & profile_callback_ = {},
clockid_t clock_type_ = CLOCK_MONOTONIC_COARSE);
protected:
/// Return the number of rows has been read or zero if there is no columns to read.
/// If continue_reading is true, continue reading from last state, otherwise seek to from_mark
size_t readPhysicalRows(size_t from_mark, size_t current_task_last_mark,
size_t readRows(size_t from_mark, size_t current_task_last_mark,
bool continue_reading, size_t max_rows_to_read, Columns & res_columns) override;
public:
bool canReadIncompleteGranules() const override { return true; }
using FileStreams = std::map<std::string, std::unique_ptr<MergeTreeReaderStream>>;

View File

@ -19,11 +19,7 @@ namespace ErrorCodes
void StorageSnapshot::init()
{
for (const auto & [name, type] : storage.getVirtuals())
{
virtual_columns[name] = type;
if (name == "_part_offset") // TODO: properly make _part_offset a system column
system_columns[name] = type;
}
if (storage.hasLightweightDeletedMask())
system_columns[LightweightDeleteDescription::FILTER_COLUMN.name] = LightweightDeleteDescription::FILTER_COLUMN.type;