mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-25 03:00:49 +00:00
refactor MergeTree select streams
This commit is contained in:
parent
2797c16930
commit
1bec5a8241
@ -36,8 +36,6 @@ public:
|
||||
protected:
|
||||
Block readImpl() final;
|
||||
|
||||
Block readFromPartImpl();
|
||||
|
||||
/// Creates new this->task, and initilizes readers
|
||||
virtual bool getNewTask() = 0;
|
||||
|
||||
@ -46,6 +44,8 @@ protected:
|
||||
|
||||
virtual Block readFromPart();
|
||||
|
||||
Block readFromPartImpl();
|
||||
|
||||
void injectVirtualColumns(Block & block) const;
|
||||
|
||||
void initializeRangeReaders(MergeTreeReadTask & task);
|
||||
|
@ -193,4 +193,64 @@ void MergeTreeBlockSizePredictor::update(const Block & block, double decay)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
MergeTreeReadTaskColumns getReadTaskColumns(const MergeTreeData & storage, const MergeTreeData::DataPartPtr & data_part,
|
||||
const Names & required_columns, const PrewhereInfoPtr & prewhere_info, bool check_columns)
|
||||
{
|
||||
Names column_names = required_columns;
|
||||
Names pre_column_names;
|
||||
|
||||
/// inject columns required for defaults evaluation
|
||||
bool should_reorder = !injectRequiredColumns(storage, data_part, column_names).empty();
|
||||
|
||||
if (prewhere_info)
|
||||
{
|
||||
if (prewhere_info->alias_actions)
|
||||
pre_column_names = prewhere_info->alias_actions->getRequiredColumns();
|
||||
else
|
||||
pre_column_names = prewhere_info->prewhere_actions->getRequiredColumns();
|
||||
|
||||
if (pre_column_names.empty())
|
||||
pre_column_names.push_back(column_names[0]);
|
||||
|
||||
const auto injected_pre_columns = injectRequiredColumns(storage, data_part, pre_column_names);
|
||||
if (!injected_pre_columns.empty())
|
||||
should_reorder = true;
|
||||
|
||||
const NameSet pre_name_set(pre_column_names.begin(), pre_column_names.end());
|
||||
|
||||
Names post_column_names;
|
||||
for (const auto & name : column_names)
|
||||
if (!pre_name_set.count(name))
|
||||
post_column_names.push_back(name);
|
||||
|
||||
column_names = post_column_names;
|
||||
}
|
||||
|
||||
MergeTreeReadTaskColumns result;
|
||||
|
||||
if (check_columns)
|
||||
{
|
||||
/// Under owned_data_part->columns_lock we check that all requested columns are of the same type as in the table.
|
||||
/// This may be not true in case of ALTER MODIFY.
|
||||
if (!pre_column_names.empty())
|
||||
storage.check(data_part->columns, pre_column_names);
|
||||
if (!column_names.empty())
|
||||
storage.check(data_part->columns, column_names);
|
||||
|
||||
const NamesAndTypesList & physical_columns = storage.getColumns().getAllPhysical();
|
||||
result.pre_columns = physical_columns.addTypes(pre_column_names);
|
||||
result.columns = physical_columns.addTypes(column_names);
|
||||
}
|
||||
else
|
||||
{
|
||||
result.pre_columns = data_part->columns.addTypes(pre_column_names);
|
||||
result.columns = data_part->columns.addTypes(column_names);
|
||||
}
|
||||
|
||||
result.should_reorder = should_reorder;
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -64,6 +64,18 @@ struct MergeTreeReadTask
|
||||
virtual ~MergeTreeReadTask();
|
||||
};
|
||||
|
||||
struct MergeTreeReadTaskColumns
|
||||
{
|
||||
/// column names to read during WHERE
|
||||
NamesAndTypesList columns;
|
||||
/// column names to read during PREWHERE
|
||||
NamesAndTypesList pre_columns;
|
||||
/// resulting block may require reordering in accordance with `ordered_names`
|
||||
bool should_reorder;
|
||||
};
|
||||
|
||||
MergeTreeReadTaskColumns getReadTaskColumns(const MergeTreeData & storage, const MergeTreeData::DataPartPtr & data_part,
|
||||
const Names & required_columns, const PrewhereInfoPtr & prewhere_info, bool check_columns);
|
||||
|
||||
struct MergeTreeBlockSizePredictor
|
||||
{
|
||||
|
@ -219,65 +219,16 @@ std::vector<size_t> MergeTreeReadPool::fillPerPartInfo(
|
||||
|
||||
per_part_columns_lock.emplace_back(part.data_part->columns_lock);
|
||||
|
||||
/// inject column names required for DEFAULT evaluation in current part
|
||||
auto required_column_names = column_names;
|
||||
auto [columns, pre_columns, should_reorder] =
|
||||
getReadTaskColumns(data, part.data_part, column_names, prewhere_info, check_columns);
|
||||
|
||||
const auto injected_columns = injectRequiredColumns(data, part.data_part, required_column_names);
|
||||
auto should_reoder = !injected_columns.empty();
|
||||
/// will be used to distinguish between PREWHERE and WHERE columns when applying filter
|
||||
const auto & column_names = columns.getNames();
|
||||
per_part_column_name_set.emplace_back(column_names.begin(), column_names.end());
|
||||
|
||||
Names required_pre_column_names;
|
||||
|
||||
if (prewhere_info)
|
||||
{
|
||||
/// collect columns required for PREWHERE evaluation
|
||||
if (prewhere_info->alias_actions)
|
||||
required_pre_column_names = prewhere_info->alias_actions->getRequiredColumns();
|
||||
else
|
||||
required_pre_column_names = prewhere_info->prewhere_actions->getRequiredColumns();
|
||||
|
||||
/// there must be at least one column required for PREWHERE
|
||||
if (required_pre_column_names.empty())
|
||||
required_pre_column_names.push_back(required_column_names[0]);
|
||||
|
||||
/// PREWHERE columns may require some additional columns for DEFAULT evaluation
|
||||
const auto injected_pre_columns = injectRequiredColumns(data, part.data_part, required_pre_column_names);
|
||||
if (!injected_pre_columns.empty())
|
||||
should_reoder = true;
|
||||
|
||||
/// will be used to distinguish between PREWHERE and WHERE columns when applying filter
|
||||
const NameSet pre_name_set(required_pre_column_names.begin(), required_pre_column_names.end());
|
||||
|
||||
Names post_column_names;
|
||||
for (const auto & name : required_column_names)
|
||||
if (!pre_name_set.count(name))
|
||||
post_column_names.push_back(name);
|
||||
|
||||
required_column_names = post_column_names;
|
||||
}
|
||||
|
||||
per_part_column_name_set.emplace_back(std::begin(required_column_names), std::end(required_column_names));
|
||||
|
||||
if (check_columns)
|
||||
{
|
||||
/** Under part->columns_lock check that all requested columns in part are of same type that in table.
|
||||
* This could be violated during ALTER MODIFY.
|
||||
*/
|
||||
if (!required_pre_column_names.empty())
|
||||
data.check(part.data_part->columns, required_pre_column_names);
|
||||
if (!required_column_names.empty())
|
||||
data.check(part.data_part->columns, required_column_names);
|
||||
|
||||
const NamesAndTypesList & physical_columns = data.getColumns().getAllPhysical();
|
||||
per_part_pre_columns.push_back(physical_columns.addTypes(required_pre_column_names));
|
||||
per_part_columns.push_back(physical_columns.addTypes(required_column_names));
|
||||
}
|
||||
else
|
||||
{
|
||||
per_part_pre_columns.push_back(part.data_part->columns.addTypes(required_pre_column_names));
|
||||
per_part_columns.push_back(part.data_part->columns.addTypes(required_column_names));
|
||||
}
|
||||
|
||||
per_part_should_reorder.push_back(should_reoder);
|
||||
per_part_pre_columns.push_back(std::move(pre_columns));
|
||||
per_part_columns.push_back(std::move(columns));
|
||||
per_part_should_reorder.push_back(should_reorder);
|
||||
|
||||
parts_with_idx.push_back({ part.data_part, part.part_index_in_query });
|
||||
|
||||
|
@ -19,7 +19,7 @@ MergeTreeReverseSelectBlockInputStream::MergeTreeReverseSelectBlockInputStream(
|
||||
UInt64 max_block_size_rows_,
|
||||
size_t preferred_block_size_bytes_,
|
||||
size_t preferred_max_column_in_block_size_bytes_,
|
||||
Names column_names,
|
||||
Names required_columns_,
|
||||
const MarkRanges & mark_ranges_,
|
||||
bool use_uncompressed_cache_,
|
||||
const PrewhereInfoPtr & prewhere_info_,
|
||||
@ -34,7 +34,7 @@ MergeTreeReverseSelectBlockInputStream::MergeTreeReverseSelectBlockInputStream(
|
||||
MergeTreeBaseSelectBlockInputStream{storage_, prewhere_info_, max_block_size_rows_,
|
||||
preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_, min_bytes_to_use_direct_io_,
|
||||
max_read_buffer_size_, use_uncompressed_cache_, save_marks_in_cache_, virt_column_names_},
|
||||
required_columns{column_names},
|
||||
required_columns{required_columns_},
|
||||
data_part{owned_data_part_},
|
||||
part_columns_lock(data_part->columns_lock),
|
||||
all_mark_ranges(mark_ranges_),
|
||||
@ -78,56 +78,11 @@ MergeTreeReverseSelectBlockInputStream::MergeTreeReverseSelectBlockInputStream(
|
||||
|
||||
ordered_names = getHeader().getNames();
|
||||
|
||||
Names pre_column_names;
|
||||
|
||||
/// inject columns required for defaults evaluation
|
||||
should_reorder = !injectRequiredColumns(storage, data_part, required_columns).empty();
|
||||
|
||||
if (prewhere_info)
|
||||
{
|
||||
if (prewhere_info->alias_actions)
|
||||
pre_column_names = prewhere_info->alias_actions->getRequiredColumns();
|
||||
else
|
||||
pre_column_names = prewhere_info->prewhere_actions->getRequiredColumns();
|
||||
|
||||
if (pre_column_names.empty())
|
||||
pre_column_names.push_back(required_columns[0]);
|
||||
|
||||
const auto injected_pre_columns = injectRequiredColumns(storage, data_part, pre_column_names);
|
||||
if (!injected_pre_columns.empty())
|
||||
should_reorder = true;
|
||||
|
||||
const NameSet pre_name_set(pre_column_names.begin(), pre_column_names.end());
|
||||
|
||||
Names post_column_names;
|
||||
for (const auto & name : required_columns)
|
||||
if (!pre_name_set.count(name))
|
||||
post_column_names.push_back(name);
|
||||
|
||||
required_columns = post_column_names;
|
||||
}
|
||||
task_columns = getReadTaskColumns(storage, data_part, required_columns, prewhere_info, check_columns);
|
||||
|
||||
/// will be used to distinguish between PREWHERE and WHERE columns when applying filter
|
||||
column_name_set = NameSet{required_columns.begin(), required_columns.end()};
|
||||
|
||||
if (check_columns)
|
||||
{
|
||||
/// Under owned_data_part->columns_lock we check that all requested columns are of the same type as in the table.
|
||||
/// This may be not true in case of ALTER MODIFY.
|
||||
if (!pre_column_names.empty())
|
||||
storage.check(data_part->columns, pre_column_names);
|
||||
if (!required_columns.empty())
|
||||
storage.check(data_part->columns, required_columns);
|
||||
|
||||
const NamesAndTypesList & physical_columns = storage.getColumns().getAllPhysical();
|
||||
pre_columns = physical_columns.addTypes(pre_column_names);
|
||||
columns = physical_columns.addTypes(column_names);
|
||||
}
|
||||
else
|
||||
{
|
||||
pre_columns = data_part->columns.addTypes(pre_column_names);
|
||||
columns = data_part->columns.addTypes(required_columns);
|
||||
}
|
||||
const auto & column_names = task_columns.columns.getNames();
|
||||
column_name_set = NameSet{column_names.begin(), column_names.end()};
|
||||
|
||||
if (use_uncompressed_cache)
|
||||
owned_uncompressed_cache = storage.global_context.getUncompressedCache();
|
||||
@ -135,13 +90,13 @@ MergeTreeReverseSelectBlockInputStream::MergeTreeReverseSelectBlockInputStream(
|
||||
owned_mark_cache = storage.global_context.getMarkCache();
|
||||
|
||||
reader = std::make_unique<MergeTreeReader>(
|
||||
path, data_part, columns, owned_uncompressed_cache.get(),
|
||||
path, data_part, task_columns.columns, owned_uncompressed_cache.get(),
|
||||
owned_mark_cache.get(), save_marks_in_cache, storage,
|
||||
all_mark_ranges, min_bytes_to_use_direct_io, max_read_buffer_size);
|
||||
|
||||
if (prewhere_info)
|
||||
pre_reader = std::make_unique<MergeTreeReader>(
|
||||
path, data_part, pre_columns, owned_uncompressed_cache.get(),
|
||||
path, data_part, task_columns.pre_columns, owned_uncompressed_cache.get(),
|
||||
owned_mark_cache.get(), save_marks_in_cache, storage,
|
||||
all_mark_ranges, min_bytes_to_use_direct_io, max_read_buffer_size);
|
||||
}
|
||||
@ -176,8 +131,9 @@ try
|
||||
: std::make_unique<MergeTreeBlockSizePredictor>(data_part, ordered_names, data_part->storage.getSampleBlock());
|
||||
|
||||
task = std::make_unique<MergeTreeReadTask>(
|
||||
data_part, mark_ranges_for_task, part_index_in_query, ordered_names, column_name_set, columns, pre_columns,
|
||||
prewhere_info && prewhere_info->remove_prewhere_column, should_reorder, std::move(size_predictor));
|
||||
data_part, mark_ranges_for_task, part_index_in_query, ordered_names, column_name_set,
|
||||
task_columns.columns, task_columns.pre_columns, prewhere_info && prewhere_info->remove_prewhere_column,
|
||||
task_columns.should_reorder, std::move(size_predictor));
|
||||
|
||||
return true;
|
||||
}
|
||||
@ -230,8 +186,6 @@ void MergeTreeReverseSelectBlockInputStream::finish()
|
||||
data_part.reset();
|
||||
}
|
||||
|
||||
|
||||
MergeTreeReverseSelectBlockInputStream::~MergeTreeReverseSelectBlockInputStream() = default;
|
||||
|
||||
|
||||
}
|
||||
|
@ -56,8 +56,8 @@ private:
|
||||
/// Names from header. Used in order to order columns in read blocks.
|
||||
Names ordered_names;
|
||||
NameSet column_name_set;
|
||||
NamesAndTypesList columns;
|
||||
NamesAndTypesList pre_columns;
|
||||
|
||||
MergeTreeReadTaskColumns task_columns;
|
||||
|
||||
/// Data part will not be removed if the pointer owns it
|
||||
MergeTreeData::DataPartPtr data_part;
|
||||
@ -73,8 +73,6 @@ private:
|
||||
|
||||
String path;
|
||||
|
||||
bool should_reorder = false;
|
||||
|
||||
Blocks blocks;
|
||||
|
||||
Logger * log = &Logger::get("MergeTreeReverseSelectBlockInputStream");
|
||||
|
@ -19,7 +19,7 @@ MergeTreeSelectBlockInputStream::MergeTreeSelectBlockInputStream(
|
||||
UInt64 max_block_size_rows_,
|
||||
size_t preferred_block_size_bytes_,
|
||||
size_t preferred_max_column_in_block_size_bytes_,
|
||||
Names column_names,
|
||||
Names required_columns_,
|
||||
const MarkRanges & mark_ranges_,
|
||||
bool use_uncompressed_cache_,
|
||||
const PrewhereInfoPtr & prewhere_info_,
|
||||
@ -34,7 +34,7 @@ MergeTreeSelectBlockInputStream::MergeTreeSelectBlockInputStream(
|
||||
MergeTreeBaseSelectBlockInputStream{storage_, prewhere_info_, max_block_size_rows_,
|
||||
preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_, min_bytes_to_use_direct_io_,
|
||||
max_read_buffer_size_, use_uncompressed_cache_, save_marks_in_cache_, virt_column_names_},
|
||||
required_columns{column_names},
|
||||
required_columns{required_columns_},
|
||||
data_part{owned_data_part_},
|
||||
part_columns_lock(data_part->columns_lock),
|
||||
all_mark_ranges(mark_ranges_),
|
||||
@ -99,57 +99,7 @@ try
|
||||
}
|
||||
is_first_task = false;
|
||||
|
||||
Names pre_column_names;
|
||||
Names column_names = required_columns;
|
||||
|
||||
/// inject columns required for defaults evaluation
|
||||
bool should_reorder = !injectRequiredColumns(storage, data_part, column_names).empty();
|
||||
|
||||
if (prewhere_info)
|
||||
{
|
||||
if (prewhere_info->alias_actions)
|
||||
pre_column_names = prewhere_info->alias_actions->getRequiredColumns();
|
||||
else
|
||||
pre_column_names = prewhere_info->prewhere_actions->getRequiredColumns();
|
||||
|
||||
if (pre_column_names.empty())
|
||||
pre_column_names.push_back(column_names[0]);
|
||||
|
||||
const auto injected_pre_columns = injectRequiredColumns(storage, data_part, pre_column_names);
|
||||
if (!injected_pre_columns.empty())
|
||||
should_reorder = true;
|
||||
|
||||
const NameSet pre_name_set(pre_column_names.begin(), pre_column_names.end());
|
||||
|
||||
Names post_column_names;
|
||||
for (const auto & name : column_names)
|
||||
if (!pre_name_set.count(name))
|
||||
post_column_names.push_back(name);
|
||||
|
||||
column_names = post_column_names;
|
||||
}
|
||||
|
||||
/// will be used to distinguish between PREWHERE and WHERE columns when applying filter
|
||||
column_name_set = NameSet{column_names.begin(), column_names.end()};
|
||||
|
||||
if (check_columns)
|
||||
{
|
||||
/// Under owned_data_part->columns_lock we check that all requested columns are of the same type as in the table.
|
||||
/// This may be not true in case of ALTER MODIFY.
|
||||
if (!pre_column_names.empty())
|
||||
storage.check(data_part->columns, pre_column_names);
|
||||
if (!column_names.empty())
|
||||
storage.check(data_part->columns, column_names);
|
||||
|
||||
const NamesAndTypesList & physical_columns = storage.getColumns().getAllPhysical();
|
||||
pre_columns = physical_columns.addTypes(pre_column_names);
|
||||
columns = physical_columns.addTypes(column_names);
|
||||
}
|
||||
else
|
||||
{
|
||||
pre_columns = data_part->columns.addTypes(pre_column_names);
|
||||
columns = data_part->columns.addTypes(column_names);
|
||||
}
|
||||
task_columns = getReadTaskColumns(storage, data_part, required_columns, prewhere_info, check_columns);
|
||||
|
||||
/** @note you could simply swap `reverse` in if and else branches of MergeTreeDataSelectExecutor,
|
||||
* and remove this reverse. */
|
||||
@ -160,9 +110,14 @@ try
|
||||
? nullptr
|
||||
: std::make_unique<MergeTreeBlockSizePredictor>(data_part, ordered_names, data_part->storage.getSampleBlock());
|
||||
|
||||
/// will be used to distinguish between PREWHERE and WHERE columns when applying filter
|
||||
const auto & column_names = task_columns.columns.getNames();
|
||||
column_name_set = NameSet{column_names.begin(), column_names.end()};
|
||||
|
||||
task = std::make_unique<MergeTreeReadTask>(
|
||||
data_part, remaining_mark_ranges, part_index_in_query, ordered_names, column_name_set, columns, pre_columns,
|
||||
prewhere_info && prewhere_info->remove_prewhere_column, should_reorder, std::move(size_predictor));
|
||||
data_part, remaining_mark_ranges, part_index_in_query, ordered_names, column_name_set, task_columns.columns,
|
||||
task_columns.pre_columns, prewhere_info && prewhere_info->remove_prewhere_column,
|
||||
task_columns.should_reorder, std::move(size_predictor));
|
||||
|
||||
if (!reader)
|
||||
{
|
||||
@ -172,13 +127,13 @@ try
|
||||
owned_mark_cache = storage.global_context.getMarkCache();
|
||||
|
||||
reader = std::make_unique<MergeTreeReader>(
|
||||
path, data_part, columns, owned_uncompressed_cache.get(),
|
||||
path, data_part, task_columns.columns, owned_uncompressed_cache.get(),
|
||||
owned_mark_cache.get(), save_marks_in_cache, storage,
|
||||
all_mark_ranges, min_bytes_to_use_direct_io, max_read_buffer_size);
|
||||
|
||||
if (prewhere_info)
|
||||
pre_reader = std::make_unique<MergeTreeReader>(
|
||||
path, data_part, pre_columns, owned_uncompressed_cache.get(),
|
||||
path, data_part, task_columns.pre_columns, owned_uncompressed_cache.get(),
|
||||
owned_mark_cache.get(), save_marks_in_cache, storage,
|
||||
all_mark_ranges, min_bytes_to_use_direct_io, max_read_buffer_size);
|
||||
}
|
||||
|
@ -55,8 +55,8 @@ private:
|
||||
/// Names from header. Used in order to order columns in read blocks.
|
||||
Names ordered_names;
|
||||
NameSet column_name_set;
|
||||
NamesAndTypesList columns;
|
||||
NamesAndTypesList pre_columns;
|
||||
|
||||
MergeTreeReadTaskColumns task_columns;
|
||||
|
||||
/// Data part will not be removed if the pointer owns it
|
||||
MergeTreeData::DataPartPtr data_part;
|
||||
|
Loading…
Reference in New Issue
Block a user