Merge branch 'virtual-column-prewhere' of https://github.com/amosbird/ClickHouse into refactor-virtual-columns

This commit is contained in:
Anton Popov 2024-02-15 21:45:58 +00:00
commit ef6b8275b1
43 changed files with 353 additions and 295 deletions

View File

@ -381,6 +381,7 @@ Pipe ReadFromMergeTree::readFromPoolParallelReplicas(
reader_settings,
required_columns,
virt_column_names,
data.getPartitionValueType(),
pool_settings,
context);
@ -462,6 +463,7 @@ Pipe ReadFromMergeTree::readFromPool(
reader_settings,
required_columns,
virt_column_names,
data.getPartitionValueType(),
pool_settings,
context);
}
@ -475,6 +477,7 @@ Pipe ReadFromMergeTree::readFromPool(
reader_settings,
required_columns,
virt_column_names,
data.getPartitionValueType(),
pool_settings,
context);
}
@ -551,6 +554,7 @@ Pipe ReadFromMergeTree::readInOrder(
reader_settings,
required_columns,
virt_column_names,
data.getPartitionValueType(),
pool_settings,
context);
}
@ -566,6 +570,7 @@ Pipe ReadFromMergeTree::readInOrder(
reader_settings,
required_columns,
virt_column_names,
data.getPartitionValueType(),
pool_settings,
context);
}

View File

@ -482,6 +482,10 @@ NamesAndTypesList ColumnsDescription::get(const GetColumnsOptions & options) con
NamesAndTypesList res;
switch (options.kind)
{
case GetColumnsOptions::None:
{
break;
}
case GetColumnsOptions::All:
{
res = getAll();
@ -572,7 +576,8 @@ static GetColumnsOptions::Kind defaultKindToGetKind(ColumnDefaultKind kind)
case ColumnDefaultKind::Ephemeral:
return GetColumnsOptions::Ephemeral;
}
UNREACHABLE();
return GetColumnsOptions::None;
}
NamesAndTypesList ColumnsDescription::getByNames(const GetColumnsOptions & options, const Names & names) const

View File

@ -33,6 +33,7 @@ struct GetColumnsOptions
{
enum Kind : UInt8
{
None = 0,
Ordinary = 1,
Materialized = 2,
Aliases = 4,

View File

@ -47,6 +47,8 @@ class MarkCache;
class UncompressedCache;
class MergeTreeTransaction;
struct MergeTreeReadTaskInfo;
using MergeTreeReadTaskInfoPtr = std::shared_ptr<const MergeTreeReadTaskInfo>;
enum class DataPartRemovalState
{
@ -93,6 +95,7 @@ public:
const NamesAndTypesList & columns_,
const StorageSnapshotPtr & storage_snapshot,
const MarkRanges & mark_ranges,
const MergeTreeReadTaskInfoPtr & read_task_info_,
UncompressedCache * uncompressed_cache,
MarkCache * mark_cache,
const AlterConversionsPtr & alter_conversions,

View File

@ -1,7 +1,10 @@
#include <Storages/MergeTree/IMergeTreeReader.h>
#include <Storages/MergeTree/MergeTreeReadTask.h>
#include <Storages/BlockNumberColumn.h>
#include <DataTypes/NestedUtils.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeNested.h>
#include <DataTypes/DataTypeUUID.h>
#include <Common/escapeForFileName.h>
#include <Compression/CachedCompressedReadBuffer.h>
#include <Columns/ColumnArray.h>
@ -25,6 +28,7 @@ namespace ErrorCodes
IMergeTreeReader::IMergeTreeReader(
MergeTreeDataPartInfoForReaderPtr data_part_info_for_read_,
const NamesAndTypesList & columns_,
const MergeTreeReadTaskInfoPtr & read_task_info_,
const StorageSnapshotPtr & storage_snapshot_,
UncompressedCache * uncompressed_cache_,
MarkCache * mark_cache_,
@ -47,14 +51,21 @@ IMergeTreeReader::IMergeTreeReader(
, part_columns(data_part_info_for_read->isWidePart()
? data_part_info_for_read->getColumnsDescriptionWithCollectedNested()
: data_part_info_for_read->getColumnsDescription())
, read_task_info(read_task_info_)
{
columns_to_read.reserve(requested_columns.size());
serializations.reserve(requested_columns.size());
size_t pos = 0;
for (const auto & column : requested_columns)
{
columns_to_read.emplace_back(getColumnInPart(column));
serializations.emplace_back(getSerializationInPart(column));
if (read_task_info && read_task_info->virt_column_names.contains(column.name))
virt_column_pos_to_name.emplace(pos, column.name);
++pos;
}
}
@ -63,6 +74,73 @@ const IMergeTreeReader::ValueSizeMap & IMergeTreeReader::getAvgValueSizeHints()
return avg_value_size_hints;
}
void IMergeTreeReader::fillVirtualColumns(Columns & columns, size_t rows) const
{
if (std::all_of(
virt_column_pos_to_name.begin(),
virt_column_pos_to_name.end(),
[&columns](auto & elem)
{
chassert(elem.first < columns.size());
return columns[elem.first] != nullptr;
}))
return;
chassert(read_task_info != nullptr);
const IMergeTreeDataPart * part = read_task_info->data_part.get();
if (part->isProjectionPart())
part = part->getParentPart();
for (auto [pos, name] : virt_column_pos_to_name)
{
auto & column = columns[pos];
if (column != nullptr)
continue;
if (name == "_part_offset")
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Column {} must have been filled by part reader", name);
}
else if (name == LightweightDeleteDescription::FILTER_COLUMN.name)
{
/// If _row_exists column isn't present in the part then fill it here with 1s
column = LightweightDeleteDescription::FILTER_COLUMN.type->createColumnConst(rows, 1)->convertToFullColumnIfConst();
}
else if (name == BlockNumberColumn::name)
{
column = BlockNumberColumn::type->createColumnConst(rows, part->info.min_block)->convertToFullColumnIfConst();
}
else if (name == "_part")
{
column = DataTypeLowCardinality{std::make_shared<DataTypeString>()}
.createColumnConst(rows, part->name)
->convertToFullColumnIfConst();
}
else if (name == "_part_index")
{
column = DataTypeUInt64().createColumnConst(rows, read_task_info->part_index_in_query)->convertToFullColumnIfConst();
}
else if (name == "_part_uuid")
{
column = DataTypeUUID().createColumnConst(rows, part->uuid)->convertToFullColumnIfConst();
}
else if (name == "_partition_id")
{
column = DataTypeLowCardinality{std::make_shared<DataTypeString>()}
.createColumnConst(rows, part->info.partition_id)
->convertToFullColumnIfConst();
}
else if (name == "_partition_value")
{
column = read_task_info->partition_value_type
->createColumnConst(rows, Tuple(part->partition.value.begin(), part->partition.value.end()))
->convertToFullColumnIfConst();
}
}
}
void IMergeTreeReader::fillMissingColumns(Columns & res_columns, bool & should_evaluate_missing_defaults, size_t num_rows, size_t block_number) const
{
try

View File

@ -23,6 +23,7 @@ public:
IMergeTreeReader(
MergeTreeDataPartInfoForReaderPtr data_part_info_for_read_,
const NamesAndTypesList & columns_,
const MergeTreeReadTaskInfoPtr & read_task_info_,
const StorageSnapshotPtr & storage_snapshot_,
UncompressedCache * uncompressed_cache_,
MarkCache * mark_cache_,
@ -42,6 +43,9 @@ public:
const ValueSizeMap & getAvgValueSizeHints() const;
/// Add virtual columns that are not present in the block.
void fillVirtualColumns(Columns & columns, size_t rows) const;
/// Add columns from ordered_names that are not present in the block.
/// Missing columns are added in the order specified by ordered_names.
/// num_rows is needed in case if all res_columns are nullptr.
@ -113,6 +117,12 @@ private:
/// Actual columns description in part.
const ColumnsDescription & part_columns;
/// Shared information required for reading.
MergeTreeReadTaskInfoPtr read_task_info;
/// Map of positions in requested_columns which are virtual columns to their names.
std::map<size_t, String> virt_column_pos_to_name;
};
}

View File

@ -111,11 +111,18 @@ NameSet injectRequiredColumns(
if (with_subcolumns)
options.withSubcolumns();
auto virtuals_options = GetColumnsOptions(GetColumnsOptions::None).withVirtuals();
for (size_t i = 0; i < columns.size(); ++i)
{
/// We are going to fetch only physical columns and system columns
/// We are going to fetch physical columns and system columns first
if (!storage_snapshot->tryGetColumn(options, columns[i]))
throw Exception(ErrorCodes::NO_SUCH_COLUMN_IN_TABLE, "There is no physical column or subcolumn {} in table", columns[i]);
{
if (storage_snapshot->tryGetColumn(virtuals_options, columns[i]))
continue;
else
throw Exception(ErrorCodes::NO_SUCH_COLUMN_IN_TABLE, "There is no column or subcolumn {} in table", columns[i]);
}
have_at_least_one_physical_column |= injectRequiredColumnsRecursively(
columns[i], storage_snapshot, alter_conversions,
@ -258,11 +265,10 @@ void MergeTreeBlockSizePredictor::update(const Block & sample_block, const Colum
}
MergeTreeReadTask::Columns getReadTaskColumns(
MergeTreeReadTaskColumns getReadTaskColumns(
const IMergeTreeDataPartInfoForReader & data_part_info_for_reader,
const StorageSnapshotPtr & storage_snapshot,
const Names & required_columns,
const Names & system_columns,
const PrewhereInfoPtr & prewhere_info,
const ExpressionActionsSettings & actions_settings,
const MergeTreeReaderSettings & reader_settings,
@ -270,16 +276,11 @@ MergeTreeReadTask::Columns getReadTaskColumns(
{
Names column_to_read_after_prewhere = required_columns;
/// Read system columns such as lightweight delete mask "_row_exists" if it is persisted in the part
for (const auto & name : system_columns)
if (data_part_info_for_reader.getColumns().contains(name))
column_to_read_after_prewhere.push_back(name);
/// Inject columns required for defaults evaluation
injectRequiredColumns(
data_part_info_for_reader, storage_snapshot, with_subcolumns, column_to_read_after_prewhere);
MergeTreeReadTask::Columns result;
MergeTreeReadTaskColumns result;
auto options = GetColumnsOptions(GetColumnsOptions::All)
.withExtendedObjects()
.withSystemColumns();
@ -287,6 +288,9 @@ MergeTreeReadTask::Columns getReadTaskColumns(
if (with_subcolumns)
options.withSubcolumns();
options.withVirtuals();
bool has_part_offset = std::find(required_columns.begin(), required_columns.end(), "_part_offset") != required_columns.end();
NameSet columns_from_previous_steps;
auto add_step = [&](const PrewhereExprStep & step)
{
@ -302,6 +306,13 @@ MergeTreeReadTask::Columns getReadTaskColumns(
if (!columns_from_previous_steps.contains(name))
step_column_names.push_back(name);
/// Make sure _part_offset is read in STEP 0
if (columns_from_previous_steps.empty() && has_part_offset)
{
if (std::find(step_column_names.begin(), step_column_names.end(), "_part_offset") == step_column_names.end())
step_column_names.push_back("_part_offset");
}
if (!step_column_names.empty())
injectRequiredColumns(
data_part_info_for_reader, storage_snapshot,

View File

@ -1,6 +1,5 @@
#pragma once
#include <optional>
#include <Core/NamesAndTypes.h>
#include <Storages/MergeTree/MergeTreeReadTask.h>
@ -22,11 +21,10 @@ NameSet injectRequiredColumns(
bool with_subcolumns,
Names & columns);
MergeTreeReadTask::Columns getReadTaskColumns(
MergeTreeReadTaskColumns getReadTaskColumns(
const IMergeTreeDataPartInfoForReader & data_part_info_for_reader,
const StorageSnapshotPtr & storage_snapshot,
const Names & required_columns,
const Names & system_columns,
const PrewhereInfoPtr & prewhere_info,
const ExpressionActionsSettings & actions_settings,
const MergeTreeReaderSettings & reader_settings,

View File

@ -33,6 +33,7 @@ IMergeTreeDataPart::MergeTreeReaderPtr MergeTreeDataPartCompact::getReader(
const NamesAndTypesList & columns_to_read,
const StorageSnapshotPtr & storage_snapshot,
const MarkRanges & mark_ranges,
const MergeTreeReadTaskInfoPtr & read_task_info_,
UncompressedCache * uncompressed_cache,
MarkCache * mark_cache,
const AlterConversionsPtr & alter_conversions,
@ -41,12 +42,21 @@ IMergeTreeDataPart::MergeTreeReaderPtr MergeTreeDataPartCompact::getReader(
const ReadBufferFromFileBase::ProfileCallback & profile_callback) const
{
auto read_info = std::make_shared<LoadedMergeTreeDataPartInfoForReader>(shared_from_this(), alter_conversions);
auto * load_marks_threadpool = reader_settings.read_settings.load_marks_asynchronously ? &read_info->getContext()->getLoadMarksThreadpool() : nullptr;
auto * load_marks_threadpool
= reader_settings.read_settings.load_marks_asynchronously ? &read_info->getContext()->getLoadMarksThreadpool() : nullptr;
return std::make_unique<MergeTreeReaderCompact>(
read_info, columns_to_read, storage_snapshot, uncompressed_cache,
mark_cache, mark_ranges, reader_settings, load_marks_threadpool,
avg_value_size_hints, profile_callback);
read_info,
columns_to_read,
read_task_info_,
storage_snapshot,
uncompressed_cache,
mark_cache,
mark_ranges,
reader_settings,
load_marks_threadpool,
avg_value_size_hints,
profile_callback);
}
IMergeTreeDataPart::MergeTreeWriterPtr MergeTreeDataPartCompact::getWriter(

View File

@ -32,6 +32,7 @@ public:
const NamesAndTypesList & columns,
const StorageSnapshotPtr & storage_snapshot,
const MarkRanges & mark_ranges,
const MergeTreeReadTaskInfoPtr & read_task_info_,
UncompressedCache * uncompressed_cache,
MarkCache * mark_cache,
const AlterConversionsPtr & alter_conversions,

View File

@ -33,6 +33,7 @@ IMergeTreeDataPart::MergeTreeReaderPtr MergeTreeDataPartInMemory::getReader(
const NamesAndTypesList & columns_to_read,
const StorageSnapshotPtr & storage_snapshot,
const MarkRanges & mark_ranges,
const MergeTreeReadTaskInfoPtr & read_task_info_,
UncompressedCache * /* uncompressed_cache */,
MarkCache * /* mark_cache */,
const AlterConversionsPtr & alter_conversions,
@ -44,7 +45,13 @@ IMergeTreeDataPart::MergeTreeReaderPtr MergeTreeDataPartInMemory::getReader(
auto ptr = std::static_pointer_cast<const MergeTreeDataPartInMemory>(shared_from_this());
return std::make_unique<MergeTreeReaderInMemory>(
read_info, ptr, columns_to_read, storage_snapshot, mark_ranges, reader_settings);
read_info,
ptr,
columns_to_read,
read_task_info_,
storage_snapshot,
mark_ranges,
reader_settings);
}
IMergeTreeDataPart::MergeTreeWriterPtr MergeTreeDataPartInMemory::getWriter(

View File

@ -21,6 +21,7 @@ public:
const NamesAndTypesList & columns,
const StorageSnapshotPtr & storage_snapshot,
const MarkRanges & mark_ranges,
const MergeTreeReadTaskInfoPtr & read_task_info_,
UncompressedCache * uncompressed_cache,
MarkCache * mark_cache,
const AlterConversionsPtr & alter_conversions,

View File

@ -31,6 +31,7 @@ IMergeTreeDataPart::MergeTreeReaderPtr MergeTreeDataPartWide::getReader(
const NamesAndTypesList & columns_to_read,
const StorageSnapshotPtr & storage_snapshot,
const MarkRanges & mark_ranges,
const MergeTreeReadTaskInfoPtr & read_task_info_,
UncompressedCache * uncompressed_cache,
MarkCache * mark_cache,
const AlterConversionsPtr & alter_conversions,
@ -40,10 +41,16 @@ IMergeTreeDataPart::MergeTreeReaderPtr MergeTreeDataPartWide::getReader(
{
auto read_info = std::make_shared<LoadedMergeTreeDataPartInfoForReader>(shared_from_this(), alter_conversions);
return std::make_unique<MergeTreeReaderWide>(
read_info, columns_to_read,
storage_snapshot, uncompressed_cache,
mark_cache, mark_ranges, reader_settings,
avg_value_size_hints, profile_callback);
read_info,
columns_to_read,
read_task_info_,
storage_snapshot,
uncompressed_cache,
mark_cache,
mark_ranges,
reader_settings,
avg_value_size_hints,
profile_callback);
}
IMergeTreeDataPart::MergeTreeWriterPtr MergeTreeDataPartWide::getWriter(

View File

@ -27,6 +27,7 @@ public:
const NamesAndTypesList & columns,
const StorageSnapshotPtr & storage_snapshot,
const MarkRanges & mark_ranges,
const MergeTreeReadTaskInfoPtr & read_task_info_,
UncompressedCache * uncompressed_cache,
MarkCache * mark_cache,
const AlterConversionsPtr & alter_conversions,

View File

@ -114,6 +114,7 @@ MergeTreePrefetchedReadPool::MergeTreePrefetchedReadPool(
const MergeTreeReaderSettings & reader_settings_,
const Names & column_names_,
const Names & virtual_column_names_,
const DataTypePtr & partition_value_type_,
const PoolSettings & settings_,
const ContextPtr & context_)
: MergeTreeReadPoolBase(
@ -124,6 +125,7 @@ MergeTreePrefetchedReadPool::MergeTreePrefetchedReadPool(
reader_settings_,
column_names_,
virtual_column_names_,
partition_value_type_,
settings_,
context_)
, WithContext(context_)

View File

@ -24,6 +24,7 @@ public:
const MergeTreeReaderSettings & reader_settings_,
const Names & column_names_,
const Names & virtual_column_names_,
const DataTypePtr & partition_value_type_,
const PoolSettings & settings_,
const ContextPtr & context_);
@ -67,7 +68,7 @@ private:
struct ThreadTask
{
using InfoPtr = MergeTreeReadTask::InfoPtr;
using InfoPtr = MergeTreeReadTaskInfoPtr;
ThreadTask(InfoPtr read_info_, MarkRanges ranges_, Priority priority_)
: read_info(std::move(read_info_)), ranges(std::move(ranges_)), priority(priority_)

View File

@ -362,7 +362,7 @@ void MergeTreeRangeReader::ReadResult::shrink(Columns & old_columns, const NumRo
}
}
/// The main invariant of the data in the read result is that he number of rows is
/// The main invariant of the data in the read result is that the number of rows is
/// either equal to total_rows_per_granule (if filter has not been applied) or to the number of
/// 1s in the filter (if filter has been applied).
void MergeTreeRangeReader::ReadResult::checkInternalConsistency() const
@ -808,8 +808,7 @@ MergeTreeRangeReader::MergeTreeRangeReader(
IMergeTreeReader * merge_tree_reader_,
MergeTreeRangeReader * prev_reader_,
const PrewhereExprStep * prewhere_info_,
bool last_reader_in_chain_,
const Names & non_const_virtual_column_names_)
bool last_reader_in_chain_)
: merge_tree_reader(merge_tree_reader_)
, index_granularity(&(merge_tree_reader->data_part_info_for_read->getIndexGranularity()))
, prev_reader(prev_reader_)
@ -826,21 +825,6 @@ MergeTreeRangeReader::MergeTreeRangeReader(
result_sample_block.insert({name_and_type.type->createColumn(), name_and_type.type, name_and_type.name});
}
for (const auto & column_name : non_const_virtual_column_names_)
{
if (result_sample_block.has(column_name))
continue;
non_const_virtual_column_names.push_back(column_name);
if (column_name == "_part_offset" && !prev_reader)
{
/// _part_offset column is filled by the first reader.
read_sample_block.insert(ColumnWithTypeAndName(ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), column_name));
result_sample_block.insert(ColumnWithTypeAndName(ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), column_name));
}
}
if (prewhere_info)
{
const auto & step = *prewhere_info;
@ -1006,6 +990,8 @@ MergeTreeRangeReader::ReadResult MergeTreeRangeReader::read(size_t max_rows, Mar
if (num_read_rows == 0)
num_read_rows = read_result.num_rows;
merge_tree_reader->fillVirtualColumns(columns, num_read_rows);
/// fillMissingColumns() must be called after reading but befoe any filterings because
/// some columns (e.g. arrays) might be only partially filled and thus not be valid and
/// fillMissingColumns() fixes this.
@ -1055,23 +1041,23 @@ MergeTreeRangeReader::ReadResult MergeTreeRangeReader::read(size_t max_rows, Mar
return read_result;
{
/// Physical columns go first and then some virtual columns follow
size_t physical_columns_count = merge_tree_reader->getColumns().size();
Columns physical_columns(read_result.columns.begin(), read_result.columns.begin() + physical_columns_count);
size_t columns_count = merge_tree_reader->getColumns().size();
Columns columns(read_result.columns.begin(), read_result.columns.begin() + columns_count);
merge_tree_reader->fillVirtualColumns(columns, read_result.num_rows);
bool should_evaluate_missing_defaults;
merge_tree_reader->fillMissingColumns(physical_columns, should_evaluate_missing_defaults, read_result.num_rows);
merge_tree_reader->fillMissingColumns(columns, should_evaluate_missing_defaults, read_result.num_rows);
/// If some columns absent in part, then evaluate default values
if (should_evaluate_missing_defaults)
merge_tree_reader->evaluateMissingDefaults({}, physical_columns);
merge_tree_reader->evaluateMissingDefaults({}, columns);
/// If result not empty, then apply on-fly alter conversions if any required
if (!prewhere_info || prewhere_info->perform_alter_conversions)
merge_tree_reader->performRequiredConversions(physical_columns);
merge_tree_reader->performRequiredConversions(columns);
for (size_t i = 0; i < physical_columns.size(); ++i)
read_result.columns[i] = std::move(physical_columns[i]);
for (size_t i = 0; i < columns.size(); ++i)
read_result.columns[i] = std::move(columns[i]);
}
size_t total_bytes = 0;
@ -1163,12 +1149,17 @@ MergeTreeRangeReader::ReadResult MergeTreeRangeReader::startReadingChain(size_t
result.adjustLastGranule();
if (read_sample_block.has("_part_offset"))
fillPartOffsetColumn(result, leading_begin_part_offset, leading_end_part_offset);
{
size_t pos = read_sample_block.getPositionByName("_part_offset");
chassert(pos < result.columns.size());
chassert(result.columns[pos] == nullptr);
result.columns[pos] = fillPartOffsetColumn(result, leading_begin_part_offset, leading_end_part_offset);
}
return result;
}
void MergeTreeRangeReader::fillPartOffsetColumn(ReadResult & result, UInt64 leading_begin_part_offset, UInt64 leading_end_part_offset)
ColumnPtr MergeTreeRangeReader::fillPartOffsetColumn(ReadResult & result, UInt64 leading_begin_part_offset, UInt64 leading_end_part_offset)
{
size_t num_rows = result.numReadRows();
@ -1194,7 +1185,7 @@ void MergeTreeRangeReader::fillPartOffsetColumn(ReadResult & result, UInt64 lead
*pos++ = start_part_offset++;
}
result.columns.emplace_back(std::move(column));
return column;
}
Columns MergeTreeRangeReader::continueReadingChain(const ReadResult & result, size_t & num_rows)
@ -1208,7 +1199,7 @@ Columns MergeTreeRangeReader::continueReadingChain(const ReadResult & result, si
if (result.rows_per_granule.empty())
{
/// If zero rows were read on prev step, than there is no more rows to read.
/// If zero rows were read on prev step, there is no more rows to read.
/// Last granule may have less rows than index_granularity, so finish reading manually.
stream.finish();
return columns;

View File

@ -101,8 +101,7 @@ public:
IMergeTreeReader * merge_tree_reader_,
MergeTreeRangeReader * prev_reader_,
const PrewhereExprStep * prewhere_info_,
bool last_reader_in_chain_,
const Names & non_const_virtual_column_names);
bool last_reader_in_chain_);
MergeTreeRangeReader() = default;
@ -309,7 +308,7 @@ private:
ReadResult startReadingChain(size_t max_rows, MarkRanges & ranges);
Columns continueReadingChain(const ReadResult & result, size_t & num_rows);
void executePrewhereActionsAndFilterColumns(ReadResult & result) const;
void fillPartOffsetColumn(ReadResult & result, UInt64 leading_begin_part_offset, UInt64 leading_end_part_offset);
ColumnPtr fillPartOffsetColumn(ReadResult & result, UInt64 leading_begin_part_offset, UInt64 leading_end_part_offset);
IMergeTreeReader * merge_tree_reader = nullptr;
const MergeTreeIndexGranularity * index_granularity = nullptr;
@ -323,7 +322,6 @@ private:
bool last_reader_in_chain = false;
bool is_initialized = false;
Names non_const_virtual_column_names;
LoggerPtr log = getLogger("MergeTreeRangeReader");
};

View File

@ -40,6 +40,7 @@ MergeTreeReadPool::MergeTreeReadPool(
const MergeTreeReaderSettings & reader_settings_,
const Names & column_names_,
const Names & virtual_column_names_,
const DataTypePtr & partition_value_type_,
const PoolSettings & settings_,
const ContextPtr & context_)
: MergeTreeReadPoolBase(
@ -50,6 +51,7 @@ MergeTreeReadPool::MergeTreeReadPool(
reader_settings_,
column_names_,
virtual_column_names_,
partition_value_type_,
settings_,
context_)
, min_marks_for_concurrent_read(pool_settings.min_marks_for_concurrent_read)

View File

@ -32,6 +32,7 @@ public:
const MergeTreeReaderSettings & reader_settings_,
const Names & column_names_,
const Names & virtual_column_names_,
const DataTypePtr & partition_value_type_,
const PoolSettings & settings_,
const ContextPtr & context_);

View File

@ -13,6 +13,7 @@ MergeTreeReadPoolBase::MergeTreeReadPoolBase(
const MergeTreeReaderSettings & reader_settings_,
const Names & column_names_,
const Names & virtual_column_names_,
const DataTypePtr & partition_value_type_,
const PoolSettings & pool_settings_,
const ContextPtr & context_)
: parts_ranges(std::move(parts_))
@ -22,6 +23,7 @@ MergeTreeReadPoolBase::MergeTreeReadPoolBase(
, reader_settings(reader_settings_)
, column_names(column_names_)
, virtual_column_names(virtual_column_names_)
, partition_value_type(partition_value_type_)
, pool_settings(pool_settings_)
, owned_mark_cache(context_->getGlobalContext()->getMarkCache())
, owned_uncompressed_cache(pool_settings_.use_uncompressed_cache ? context_->getGlobalContext()->getUncompressedCache() : nullptr)
@ -44,7 +46,7 @@ void MergeTreeReadPoolBase::fillPerPartInfos()
assertSortedAndNonIntersecting(part_with_ranges.ranges);
#endif
MergeTreeReadTask::Info read_task_info;
MergeTreeReadTaskInfo read_task_info;
read_task_info.data_part = part_with_ranges.data_part;
read_task_info.part_index_in_query = part_with_ranges.part_index_in_query;
@ -52,10 +54,22 @@ void MergeTreeReadPoolBase::fillPerPartInfos()
LoadedMergeTreeDataPartInfoForReader part_info(part_with_ranges.data_part, part_with_ranges.alter_conversions);
Names column_and_virtual_column_names;
column_and_virtual_column_names.reserve(column_names.size() + virtual_column_names.size());
column_and_virtual_column_names.insert(column_and_virtual_column_names.end(), column_names.begin(), column_names.end());
column_and_virtual_column_names.insert(
column_and_virtual_column_names.end(), virtual_column_names.begin(), virtual_column_names.end());
read_task_info.task_columns = getReadTaskColumns(
part_info, storage_snapshot, column_names, virtual_column_names,
prewhere_info, actions_settings,
reader_settings, /*with_subcolumns=*/ true);
part_info,
storage_snapshot,
column_and_virtual_column_names,
prewhere_info,
actions_settings,
reader_settings,
/*with_subcolumns=*/true);
read_task_info.virt_column_names = {virtual_column_names.begin(), virtual_column_names.end()};
read_task_info.partition_value_type = partition_value_type;
if (pool_settings.preferred_block_size_bytes > 0)
{
@ -75,7 +89,7 @@ void MergeTreeReadPoolBase::fillPerPartInfos()
}
is_part_on_remote_disk.push_back(part_with_ranges.data_part->isStoredOnRemoteDisk());
per_part_infos.push_back(std::make_shared<MergeTreeReadTask::Info>(std::move(read_task_info)));
per_part_infos.push_back(std::make_shared<MergeTreeReadTaskInfo>(std::move(read_task_info)));
}
}
@ -97,7 +111,7 @@ std::vector<size_t> MergeTreeReadPoolBase::getPerPartSumMarks() const
}
MergeTreeReadTaskPtr MergeTreeReadPoolBase::createTask(
MergeTreeReadTask::InfoPtr read_info,
MergeTreeReadTaskInfoPtr read_info,
MarkRanges ranges,
MergeTreeReadTask * previous_task) const
{

View File

@ -29,6 +29,7 @@ public:
const MergeTreeReaderSettings & reader_settings_,
const Names & column_names_,
const Names & virtual_column_names_,
const DataTypePtr & partition_value_type_,
const PoolSettings & settings_,
const ContextPtr & context_);
@ -43,6 +44,7 @@ protected:
const MergeTreeReaderSettings reader_settings;
const Names column_names;
const Names virtual_column_names;
const DataTypePtr partition_value_type;
const PoolSettings pool_settings;
const MarkCachePtr owned_mark_cache;
const UncompressedCachePtr owned_uncompressed_cache;
@ -52,13 +54,13 @@ protected:
std::vector<size_t> getPerPartSumMarks() const;
MergeTreeReadTaskPtr createTask(
MergeTreeReadTask::InfoPtr read_info,
MergeTreeReadTaskInfoPtr read_info,
MarkRanges ranges,
MergeTreeReadTask * previous_task) const;
MergeTreeReadTask::Extras getExtras() const;
std::vector<MergeTreeReadTask::InfoPtr> per_part_infos;
std::vector<MergeTreeReadTaskInfoPtr> per_part_infos;
std::vector<bool> is_part_on_remote_disk;
ReadBufferFromFileBase::ProfileCallback profile_callback;

View File

@ -18,6 +18,7 @@ MergeTreeReadPoolInOrder::MergeTreeReadPoolInOrder(
const MergeTreeReaderSettings & reader_settings_,
const Names & column_names_,
const Names & virtual_column_names_,
const DataTypePtr & partition_value_type_,
const PoolSettings & settings_,
const ContextPtr & context_)
: MergeTreeReadPoolBase(
@ -28,6 +29,7 @@ MergeTreeReadPoolInOrder::MergeTreeReadPoolInOrder(
reader_settings_,
column_names_,
virtual_column_names_,
partition_value_type_,
settings_,
context_)
, has_limit_below_one_block(has_limit_below_one_block_)

View File

@ -17,6 +17,7 @@ public:
const MergeTreeReaderSettings & reader_settings_,
const Names & column_names_,
const Names & virtual_column_names_,
const DataTypePtr & partition_value_type_,
const PoolSettings & settings_,
const ContextPtr & context_);

View File

@ -19,6 +19,7 @@ MergeTreeReadPoolParallelReplicas::MergeTreeReadPoolParallelReplicas(
const MergeTreeReaderSettings & reader_settings_,
const Names & column_names_,
const Names & virtual_column_names_,
const DataTypePtr & partition_value_type_,
const PoolSettings & settings_,
const ContextPtr & context_)
: MergeTreeReadPoolBase(
@ -29,6 +30,7 @@ MergeTreeReadPoolParallelReplicas::MergeTreeReadPoolParallelReplicas(
reader_settings_,
column_names_,
virtual_column_names_,
partition_value_type_,
settings_,
context_)
, extension(std::move(extension_))

View File

@ -17,6 +17,7 @@ public:
const MergeTreeReaderSettings & reader_settings_,
const Names & column_names_,
const Names & virtual_column_names_,
const DataTypePtr & partition_value_type_,
const PoolSettings & settings_,
const ContextPtr & context_);

View File

@ -18,6 +18,7 @@ MergeTreeReadPoolParallelReplicasInOrder::MergeTreeReadPoolParallelReplicasInOrd
const MergeTreeReaderSettings & reader_settings_,
const Names & column_names_,
const Names & virtual_column_names_,
const DataTypePtr & partition_value_type_,
const PoolSettings & settings_,
const ContextPtr & context_)
: MergeTreeReadPoolBase(
@ -28,6 +29,7 @@ MergeTreeReadPoolParallelReplicasInOrder::MergeTreeReadPoolParallelReplicasInOrd
reader_settings_,
column_names_,
virtual_column_names_,
partition_value_type_,
settings_,
context_)
, extension(std::move(extension_))

View File

@ -18,6 +18,7 @@ public:
const MergeTreeReaderSettings & reader_settings_,
const Names & column_names_,
const Names & virtual_column_names_,
const DataTypePtr & partition_value_type_,
const PoolSettings & settings_,
const ContextPtr & context_);

View File

@ -10,7 +10,7 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
String MergeTreeReadTask::Columns::dump() const
String MergeTreeReadTaskColumns::dump() const
{
WriteBufferFromOwnString s;
for (size_t i = 0; i < pre_columns.size(); ++i)
@ -22,7 +22,7 @@ String MergeTreeReadTask::Columns::dump() const
}
MergeTreeReadTask::MergeTreeReadTask(
InfoPtr info_,
MergeTreeReadTaskInfoPtr info_,
Readers readers_,
MarkRanges mark_ranges_,
MergeTreeBlockSizePredictorPtr size_predictor_)
@ -34,16 +34,23 @@ MergeTreeReadTask::MergeTreeReadTask(
}
MergeTreeReadTask::Readers MergeTreeReadTask::createReaders(
const InfoPtr & read_info, const Extras & extras, const MarkRanges & ranges)
const MergeTreeReadTaskInfoPtr & read_info, const Extras & extras, const MarkRanges & ranges)
{
Readers new_readers;
auto create_reader = [&](const NamesAndTypesList & columns_to_read)
{
return read_info->data_part->getReader(
columns_to_read, extras.storage_snapshot, ranges,
extras.uncompressed_cache, extras.mark_cache,
read_info->alter_conversions, extras.reader_settings, extras.value_size_map, extras.profile_callback);
columns_to_read,
extras.storage_snapshot,
ranges,
read_info,
extras.uncompressed_cache,
extras.mark_cache,
read_info->alter_conversions,
extras.reader_settings,
extras.value_size_map,
extras.profile_callback);
};
new_readers.main = create_reader(read_info->task_columns.columns);
@ -58,10 +65,8 @@ MergeTreeReadTask::Readers MergeTreeReadTask::createReaders(
return new_readers;
}
MergeTreeReadTask::RangeReaders MergeTreeReadTask::createRangeReaders(
const Readers & task_readers,
const PrewhereExprInfo & prewhere_actions,
const Names & non_const_virtual_column_names)
MergeTreeReadTask::RangeReaders
MergeTreeReadTask::createRangeReaders(const Readers & task_readers, const PrewhereExprInfo & prewhere_actions)
{
MergeTreeReadTask::RangeReaders new_range_readers;
if (prewhere_actions.steps.size() != task_readers.prewhere.size())
@ -77,10 +82,7 @@ MergeTreeReadTask::RangeReaders MergeTreeReadTask::createRangeReaders(
{
last_reader = task_readers.main->getColumns().empty() && (i + 1 == prewhere_actions.steps.size());
MergeTreeRangeReader current_reader(
task_readers.prewhere[i].get(),
prev_reader, prewhere_actions.steps[i].get(),
last_reader, non_const_virtual_column_names);
MergeTreeRangeReader current_reader(task_readers.prewhere[i].get(), prev_reader, prewhere_actions.steps[i].get(), last_reader);
new_range_readers.prewhere.push_back(std::move(current_reader));
prev_reader = &new_range_readers.prewhere.back();
@ -88,11 +90,11 @@ MergeTreeReadTask::RangeReaders MergeTreeReadTask::createRangeReaders(
if (!last_reader)
{
new_range_readers.main = MergeTreeRangeReader(task_readers.main.get(), prev_reader, nullptr, true, non_const_virtual_column_names);
new_range_readers.main = MergeTreeRangeReader(task_readers.main.get(), prev_reader, nullptr, true);
}
else
{
/// If all columns are read by prewhere range readers than move last prewhere range reader to main.
/// If all columns are read by prewhere range readers, move last prewhere range reader to main.
new_range_readers.main = std::move(new_range_readers.prewhere.back());
new_range_readers.prewhere.pop_back();
}
@ -100,14 +102,12 @@ MergeTreeReadTask::RangeReaders MergeTreeReadTask::createRangeReaders(
return new_range_readers;
}
void MergeTreeReadTask::initializeRangeReaders(
const PrewhereExprInfo & prewhere_actions,
const Names & non_const_virtual_column_names)
void MergeTreeReadTask::initializeRangeReaders(const PrewhereExprInfo & prewhere_actions)
{
if (range_readers.main.isInitialized())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Range reader is already initialized");
range_readers = createRangeReaders(readers, prewhere_actions, non_const_virtual_column_names);
range_readers = createRangeReaders(readers, prewhere_actions);
}
UInt64 MergeTreeReadTask::estimateNumRows(const BlockSizeParams & params) const

View File

@ -40,36 +40,40 @@ enum class MergeTreeReadType
ParallelReplicas,
};
struct MergeTreeReadTaskColumns
{
/// Column names to read during WHERE
NamesAndTypesList columns;
/// Column names to read during each PREWHERE step
std::vector<NamesAndTypesList> pre_columns;
String dump() const;
};
struct MergeTreeReadTaskInfo
{
/// Data part which should be read while performing this task
DataPartPtr data_part;
/// For `part_index` virtual column
size_t part_index_in_query;
/// Alter converversionss that should be applied on-fly for part.
AlterConversionsPtr alter_conversions;
/// Column names to read during PREWHERE and WHERE
MergeTreeReadTaskColumns task_columns;
/// Virtual column names to read
NameSet virt_column_names;
/// For `partition_value` virtual column
DataTypePtr partition_value_type;
/// Shared initialized size predictor. It is copied for each new task.
MergeTreeBlockSizePredictorPtr shared_size_predictor;
};
using MergeTreeReadTaskInfoPtr = std::shared_ptr<const MergeTreeReadTaskInfo>;
/// A batch of work for MergeTreeSelectProcessor
struct MergeTreeReadTask : private boost::noncopyable
{
public:
struct Columns
{
/// Column names to read during WHERE
NamesAndTypesList columns;
/// Column names to read during each PREWHERE step
std::vector<NamesAndTypesList> pre_columns;
String dump() const;
};
struct Info
{
/// Data part which should be read while performing this task
DataPartPtr data_part;
/// For virtual `part_index` virtual column
size_t part_index_in_query;
/// Alter converversionss that should be applied on-fly for part.
AlterConversionsPtr alter_conversions;
/// Column names to read during PREWHERE and WHERE
Columns task_columns;
/// Shared initialized size predictor. It is copied for each new task.
MergeTreeBlockSizePredictorPtr shared_size_predictor;
};
using InfoPtr = std::shared_ptr<const Info>;
/// Extra params that required for creation of reader.
struct Extras
{
@ -115,27 +119,28 @@ public:
size_t num_read_bytes = 0;
};
MergeTreeReadTask(InfoPtr info_, Readers readers_, MarkRanges mark_ranges_, MergeTreeBlockSizePredictorPtr size_predictor_);
MergeTreeReadTask(
MergeTreeReadTaskInfoPtr info_, Readers readers_, MarkRanges mark_ranges_, MergeTreeBlockSizePredictorPtr size_predictor_);
void initializeRangeReaders(const PrewhereExprInfo & prewhere_actions, const Names & non_const_virtual_column_names);
void initializeRangeReaders(const PrewhereExprInfo & prewhere_actions);
BlockAndProgress read(const BlockSizeParams & params);
bool isFinished() const { return mark_ranges.empty() && range_readers.main.isCurrentRangeFinished(); }
const Info & getInfo() const { return *info; }
const MergeTreeReadTaskInfo & getInfo() const { return *info; }
const MergeTreeRangeReader & getMainRangeReader() const { return range_readers.main; }
const IMergeTreeReader & getMainReader() const { return *readers.main; }
Readers releaseReaders() { return std::move(readers); }
static Readers createReaders(const InfoPtr & read_info, const Extras & extras, const MarkRanges & ranges);
static RangeReaders createRangeReaders(const Readers & readers, const PrewhereExprInfo & prewhere_actions, const Names & non_const_virtual_column_names);
static Readers createReaders(const MergeTreeReadTaskInfoPtr & read_info, const Extras & extras, const MarkRanges & ranges);
static RangeReaders createRangeReaders(const Readers & readers, const PrewhereExprInfo & prewhere_actions);
private:
UInt64 estimateNumRows(const BlockSizeParams & params) const;
/// Shared information required for reading.
InfoPtr info;
MergeTreeReadTaskInfoPtr info;
/// Readers for data_part of this task.
/// May be reused and released to the next task.

View File

@ -17,6 +17,7 @@ namespace ErrorCodes
MergeTreeReaderCompact::MergeTreeReaderCompact(
MergeTreeDataPartInfoForReaderPtr data_part_info_for_read_,
NamesAndTypesList columns_,
const MergeTreeReadTaskInfoPtr & read_task_info_,
const StorageSnapshotPtr & storage_snapshot_,
UncompressedCache * uncompressed_cache_,
MarkCache * mark_cache_,
@ -29,6 +30,7 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
: IMergeTreeReader(
data_part_info_for_read_,
columns_,
read_task_info_,
storage_snapshot_,
uncompressed_cache_,
mark_cache_,

View File

@ -21,6 +21,7 @@ public:
MergeTreeReaderCompact(
MergeTreeDataPartInfoForReaderPtr data_part_info_for_read_,
NamesAndTypesList columns_,
const MergeTreeReadTaskInfoPtr & read_task_info_,
const StorageSnapshotPtr & storage_snapshot_,
UncompressedCache * uncompressed_cache_,
MarkCache * mark_cache_,

View File

@ -19,12 +19,14 @@ MergeTreeReaderInMemory::MergeTreeReaderInMemory(
MergeTreeDataPartInfoForReaderPtr data_part_info_for_read_,
DataPartInMemoryPtr data_part_,
NamesAndTypesList columns_,
const MergeTreeReadTaskInfoPtr & read_task_info_,
const StorageSnapshotPtr & storage_snapshot_,
MarkRanges mark_ranges_,
MergeTreeReaderSettings settings_)
: IMergeTreeReader(
data_part_info_for_read_,
columns_,
read_task_info_,
storage_snapshot_,
nullptr,
nullptr,

View File

@ -18,6 +18,7 @@ public:
MergeTreeDataPartInfoForReaderPtr data_part_info_for_read_,
DataPartInMemoryPtr data_part_,
NamesAndTypesList columns_,
const MergeTreeReadTaskInfoPtr & read_task_info_,
const StorageSnapshotPtr & storage_snapshot_,
MarkRanges mark_ranges_,
MergeTreeReaderSettings settings_);

View File

@ -24,6 +24,7 @@ namespace
MergeTreeReaderWide::MergeTreeReaderWide(
MergeTreeDataPartInfoForReaderPtr data_part_info_,
NamesAndTypesList columns_,
const MergeTreeReadTaskInfoPtr & read_task_info_,
const StorageSnapshotPtr & storage_snapshot_,
UncompressedCache * uncompressed_cache_,
MarkCache * mark_cache_,
@ -35,6 +36,7 @@ MergeTreeReaderWide::MergeTreeReaderWide(
: IMergeTreeReader(
data_part_info_,
columns_,
read_task_info_,
storage_snapshot_,
uncompressed_cache_,
mark_cache_,

View File

@ -17,6 +17,7 @@ public:
MergeTreeReaderWide(
MergeTreeDataPartInfoForReaderPtr data_part_info_for_read_,
NamesAndTypesList columns_,
const MergeTreeReadTaskInfoPtr & read_task_info_,
const StorageSnapshotPtr & storage_snapshot_,
UncompressedCache * uncompressed_cache_,
MarkCache * mark_cache_,

View File

@ -20,23 +20,9 @@ namespace DB
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER;
extern const int LOGICAL_ERROR;
extern const int QUERY_WAS_CANCELLED;
}
static void injectNonConstVirtualColumns(
size_t rows,
Block & block,
const Names & virtual_columns,
MergeTreeReadTask * task = nullptr);
static void injectPartConstVirtualColumns(
size_t rows,
Block & block,
MergeTreeReadTask * task,
const DataTypePtr & partition_value_type,
const Names & virtual_columns);
MergeTreeSelectProcessor::MergeTreeSelectProcessor(
MergeTreeReadPoolPtr pool_,
MergeTreeSelectAlgorithmPtr algorithm_,
@ -71,15 +57,9 @@ MergeTreeSelectProcessor::MergeTreeSelectProcessor(
lightweight_delete_filter_step = std::make_shared<PrewhereExprStep>(std::move(step));
}
header_without_const_virtual_columns = applyPrewhereActions(pool->getHeader(), prewhere_info);
size_t non_const_columns_offset = header_without_const_virtual_columns.columns();
injectNonConstVirtualColumns(0, header_without_const_virtual_columns, virt_column_names);
for (size_t col_num = non_const_columns_offset; col_num < header_without_const_virtual_columns.columns(); ++col_num)
non_const_virtual_column_names.emplace_back(header_without_const_virtual_columns.getByPosition(col_num).name);
result_header = header_without_const_virtual_columns;
injectPartConstVirtualColumns(0, result_header, nullptr, partition_value_type, virt_column_names);
result_header = pool->getHeader();
injectVirtualColumns(result_header, partition_value_type, virt_column_names);
result_header = applyPrewhereActions(result_header, prewhere_info);
if (!prewhere_actions.steps.empty())
LOG_TRACE(log, "PREWHERE condition was split into {} steps: {}", prewhere_actions.steps.size(), prewhere_actions.dumpConditions());
@ -163,8 +143,6 @@ ChunkAndProgress MergeTreeSelectProcessor::read()
if (res.row_count)
{
injectVirtualColumns(res.block, res.row_count, task.get(), partition_value_type, virt_column_names);
/// Reorder the columns according to result_header
Columns ordered_columns;
ordered_columns.reserve(result_header.columns());
@ -198,7 +176,7 @@ void MergeTreeSelectProcessor::initializeRangeReaders()
for (const auto & step : prewhere_actions.steps)
all_prewhere_actions.steps.push_back(step);
task->initializeRangeReaders(all_prewhere_actions, non_const_virtual_column_names);
task->initializeRangeReaders(all_prewhere_actions);
}
@ -208,8 +186,6 @@ namespace
{
explicit VirtualColumnsInserter(Block & block_) : block(block_) {}
bool columnExists(const String & name) const { return block.has(name); }
void insertUInt8Column(const ColumnPtr & column, const String & name)
{
block.insert({column, std::make_shared<DataTypeUInt8>(), name});
@ -230,16 +206,9 @@ namespace
block.insert({column, std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()), name});
}
void insertPartitionValueColumn(
size_t rows, const Row & partition_value, const DataTypePtr & partition_value_type, const String & name)
void insertPartitionValueColumn(const DataTypePtr & partition_value_type, const String & name)
{
ColumnPtr column;
if (rows)
column = partition_value_type->createColumnConst(rows, Tuple(partition_value.begin(), partition_value.end()))
->convertToFullColumnIfConst();
else
column = partition_value_type->createColumn();
ColumnPtr column = partition_value_type->createColumn();
block.insert({column, partition_value_type, name});
}
@ -247,154 +216,55 @@ namespace
};
}
/// Adds virtual columns that are not const for all rows
static void injectNonConstVirtualColumns(
size_t rows,
Block & block,
const Names & virtual_columns,
MergeTreeReadTask * task)
void MergeTreeSelectProcessor::injectVirtualColumns(Block & block, const DataTypePtr & partition_value_type, const Names & virtual_columns)
{
VirtualColumnsInserter inserter(block);
/// add virtual columns
/// Except _sample_factor, which is added from the outside.
for (const auto & virtual_column_name : virtual_columns)
{
if (virtual_column_name == "_part_offset")
{
if (!rows)
{
inserter.insertUInt64Column(DataTypeUInt64().createColumn(), virtual_column_name);
}
else
{
if (!inserter.columnExists(virtual_column_name))
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Column {} must have been filled part reader",
virtual_column_name);
}
inserter.insertUInt64Column(DataTypeUInt64().createColumn(), virtual_column_name);
}
if (virtual_column_name == LightweightDeleteDescription::FILTER_COLUMN.name)
else if (virtual_column_name == LightweightDeleteDescription::FILTER_COLUMN.name)
{
/// If _row_exists column isn't present in the part then fill it here with 1s
ColumnPtr column;
if (rows)
column = LightweightDeleteDescription::FILTER_COLUMN.type->createColumnConst(rows, 1)->convertToFullColumnIfConst();
else
column = LightweightDeleteDescription::FILTER_COLUMN.type->createColumn();
inserter.insertUInt8Column(column, virtual_column_name);
ColumnPtr column = LightweightDeleteDescription::FILTER_COLUMN.type->createColumn();
inserter.insertUInt8Column(column, virtual_column_name);
}
if (virtual_column_name == BlockNumberColumn::name)
else if (virtual_column_name == BlockNumberColumn::name)
{
ColumnPtr column;
if (rows)
{
size_t value = 0;
if (task)
{
value = task->getInfo().data_part ? task->getInfo().data_part->info.min_block : 0;
}
column = BlockNumberColumn::type->createColumnConst(rows, value)->convertToFullColumnIfConst();
}
else
column = BlockNumberColumn::type->createColumn();
ColumnPtr column = BlockNumberColumn::type->createColumn();
inserter.insertUInt64Column(column, virtual_column_name);
}
}
}
/// Adds virtual columns that are const for the whole part
static void injectPartConstVirtualColumns(
size_t rows,
Block & block,
MergeTreeReadTask * task,
const DataTypePtr & partition_value_type,
const Names & virtual_columns)
{
VirtualColumnsInserter inserter(block);
/// add virtual columns
/// Except _sample_factor, which is added from the outside.
if (!virtual_columns.empty())
{
if (unlikely(rows && !task))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot insert virtual columns to non-empty chunk without specified task.");
const IMergeTreeDataPart * part = nullptr;
if (rows)
else if (virtual_column_name == "_part")
{
part = task->getInfo().data_part.get();
if (part->isProjectionPart())
part = part->getParentPart();
ColumnPtr column = DataTypeLowCardinality{std::make_shared<DataTypeString>()}.createColumn();
inserter.insertLowCardinalityColumn(column, virtual_column_name);
}
for (const auto & virtual_column_name : virtual_columns)
else if (virtual_column_name == "_part_index")
{
if (virtual_column_name == "_part")
{
ColumnPtr column;
if (rows)
column = DataTypeLowCardinality{std::make_shared<DataTypeString>()}
.createColumnConst(rows, part->name)
->convertToFullColumnIfConst();
else
column = DataTypeLowCardinality{std::make_shared<DataTypeString>()}.createColumn();
inserter.insertLowCardinalityColumn(column, virtual_column_name);
}
else if (virtual_column_name == "_part_index")
{
ColumnPtr column;
if (rows)
column = DataTypeUInt64().createColumnConst(rows, task->getInfo().part_index_in_query)->convertToFullColumnIfConst();
else
column = DataTypeUInt64().createColumn();
inserter.insertUInt64Column(column, virtual_column_name);
}
else if (virtual_column_name == "_part_uuid")
{
ColumnPtr column;
if (rows)
column = DataTypeUUID().createColumnConst(rows, part->uuid)->convertToFullColumnIfConst();
else
column = DataTypeUUID().createColumn();
inserter.insertUUIDColumn(column, virtual_column_name);
}
else if (virtual_column_name == "_partition_id")
{
ColumnPtr column;
if (rows)
column = DataTypeLowCardinality{std::make_shared<DataTypeString>()}
.createColumnConst(rows, part->info.partition_id)
->convertToFullColumnIfConst();
else
column = DataTypeLowCardinality{std::make_shared<DataTypeString>()}.createColumn();
inserter.insertLowCardinalityColumn(column, virtual_column_name);
}
else if (virtual_column_name == "_partition_value")
{
if (rows)
inserter.insertPartitionValueColumn(rows, part->partition.value, partition_value_type, virtual_column_name);
else
inserter.insertPartitionValueColumn(rows, {}, partition_value_type, virtual_column_name);
}
ColumnPtr column = DataTypeUInt64().createColumn();
inserter.insertUInt64Column(column, virtual_column_name);
}
else if (virtual_column_name == "_part_uuid")
{
ColumnPtr column = DataTypeUUID().createColumn();
inserter.insertUUIDColumn(column, virtual_column_name);
}
else if (virtual_column_name == "_partition_id")
{
ColumnPtr column = DataTypeLowCardinality{std::make_shared<DataTypeString>()}.createColumn();
inserter.insertLowCardinalityColumn(column, virtual_column_name);
}
else if (virtual_column_name == "_partition_value")
{
inserter.insertPartitionValueColumn(partition_value_type, virtual_column_name);
}
}
}
void MergeTreeSelectProcessor::injectVirtualColumns(
Block & block, size_t row_count, MergeTreeReadTask * task, const DataTypePtr & partition_value_type, const Names & virtual_columns)
{
/// First add non-const columns that are filled by the range reader and then const columns that we will fill ourselves.
/// Note that the order is important: virtual columns filled by the range reader must go first
injectNonConstVirtualColumns(row_count, block, virtual_columns,task);
injectPartConstVirtualColumns(row_count, block, task, partition_value_type, virtual_columns);
}
Block MergeTreeSelectProcessor::applyPrewhereActions(Block block, const PrewhereInfoPtr & prewhere_info)
{
if (prewhere_info)
@ -449,7 +319,7 @@ Block MergeTreeSelectProcessor::applyPrewhereActions(Block block, const Prewhere
Block MergeTreeSelectProcessor::transformHeader(
Block block, const PrewhereInfoPtr & prewhere_info, const DataTypePtr & partition_value_type, const Names & virtual_columns)
{
injectVirtualColumns(block, 0, nullptr, partition_value_type, virtual_columns);
injectVirtualColumns(block, partition_value_type, virtual_columns);
auto transformed = applyPrewhereActions(std::move(block), prewhere_info);
return transformed;
}

View File

@ -82,7 +82,7 @@ private:
};
/// Used for filling header with no rows as well as block with data
static void injectVirtualColumns(Block & block, size_t row_count, MergeTreeReadTask * task, const DataTypePtr & partition_value_type, const Names & virtual_columns);
static void injectVirtualColumns(Block & block, const DataTypePtr & partition_value_type, const Names & virtual_columns);
static Block applyPrewhereActions(Block block, const PrewhereInfoPtr & prewhere_info);
/// Sets up range readers corresponding to data readers
@ -104,10 +104,6 @@ private:
MergeTreeReadTaskPtr task;
/// This step is added when the part has lightweight delete mask
PrewhereExprStepPtr lightweight_delete_filter_step;
/// These columns will be filled by the merge tree range reader
Names non_const_virtual_column_names;
/// This header is used for chunks from readFromPart().
Block header_without_const_virtual_columns;
/// A result of getHeader(). A chunk which this header is returned from read().
Block result_header;

View File

@ -181,9 +181,16 @@ MergeTreeSequentialSource::MergeTreeSequentialSource(
mark_ranges.emplace(MarkRanges{MarkRange(0, data_part->getMarksCount())});
reader = data_part->getReader(
columns_for_reader, storage_snapshot,
*mark_ranges, /* uncompressed_cache = */ nullptr,
mark_cache.get(), alter_conversions, reader_settings, {}, {});
columns_for_reader,
storage_snapshot,
*mark_ranges,
/* read_task_info = */ nullptr,
/* uncompressed_cache = */ nullptr,
mark_cache.get(),
alter_conversions,
reader_settings,
{},
{});
}
Chunk MergeTreeSequentialSource::generate()

View File

@ -50,6 +50,8 @@ SOME GRANULES FILTERED OUT
100002 foo
PREWHERE
301408 164953047376 164953047376
335872 166463369216 166463369216
301407 164952947376 164952947376
42
10042
20042

View File

@ -52,7 +52,7 @@ SELECT _part_offset, foo FROM t_1 where granule == 0 AND _part_offset >= 100000
SELECT 'PREWHERE';
SELECT count(*), sum(_part_offset), sum(order_0) from t_1 prewhere granule == 0 where _part_offset >= 100000;
SELECT count(*), sum(_part_offset), sum(order_0) from t_1 prewhere _part != '' where granule == 0; -- { serverError 10, 16 }
SELECT count(*), sum(_part_offset), sum(order_0) from t_1 prewhere _part_offset > 100000 where granule == 0; -- { serverError 10, 16 }
SELECT count(*), sum(_part_offset), sum(order_0) from t_1 prewhere _part != '' where granule == 0;
SELECT count(*), sum(_part_offset), sum(order_0) from t_1 prewhere _part_offset > 100000 where granule == 0;
SELECT _part_offset FROM t_1 PREWHERE order_0 % 10000 == 42 ORDER BY order_0 LIMIT 3;
SELECT _part_offset, foo FROM t_1 PREWHERE order_0 % 10000 == 42 ORDER BY order_0 LIMIT 3;

View File

@ -0,0 +1 @@
0 0 0

View File

@ -0,0 +1,10 @@
drop table if exists x;
create table x (i int, j int, k int) engine MergeTree order by tuple() settings index_granularity=8192, index_granularity_bytes = '10Mi', min_bytes_for_wide_part=0, min_rows_for_wide_part=0, ratio_of_defaults_for_sparse_serialization=1;
insert into x select number, number * 2, number * 3 from numbers(100000);
-- One granule, (_part_offset (8 bytes) + <one minimal physical column> (4 bytes)) * 8192 + <other two physical columns>(8 bytes) * 1 = 98312
select * from x prewhere _part_offset = 0 settings max_bytes_to_read = 98312;
drop table x;