Pass and handle a chain of multiple prewhere infos

This commit is contained in:
Denis Glazachev 2021-01-25 18:31:59 +04:00
parent ab6343ff1f
commit 4f6c880232
19 changed files with 247 additions and 174 deletions

View File

@ -1186,36 +1186,40 @@ void InterpreterSelectQuery::addEmptySourceToQueryPlan(QueryPlan & query_plan, c
{
Pipe pipe(std::make_shared<NullSource>(source_header));
if (query_info.prewhere_info)
if (query_info.prewhere_info_list)
{
if (query_info.prewhere_info->alias_actions)
for (const auto & prewhere_info : *query_info.prewhere_info_list)
{
if (prewhere_info.alias_actions)
{
pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ExpressionTransform>(
header, prewhere_info.alias_actions);
});
}
pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ExpressionTransform>(header, query_info.prewhere_info->alias_actions);
return std::make_shared<FilterTransform>(
header,
prewhere_info.prewhere_actions,
prewhere_info.prewhere_column_name,
prewhere_info.remove_prewhere_column);
});
}
pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<FilterTransform>(
header,
query_info.prewhere_info->prewhere_actions,
query_info.prewhere_info->prewhere_column_name,
query_info.prewhere_info->remove_prewhere_column);
});
// To remove additional columns
// In some cases, we did not read any marks so that the pipeline.streams is empty
// Thus, some columns in prewhere are not removed as expected
// This leads to mismatched header in distributed table
if (query_info.prewhere_info->remove_columns_actions)
{
pipe.addSimpleTransform([&](const Block & header)
// To remove additional columns
// In some cases, we did not read any marks so that the pipeline.streams is empty
// Thus, some columns in prewhere are not removed as expected
// This leads to mismatched header in distributed table
if (prewhere_info.remove_columns_actions)
{
return std::make_shared<ExpressionTransform>(
header, query_info.prewhere_info->remove_columns_actions);
});
pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ExpressionTransform>(
header, prewhere_info.remove_columns_actions);
});
}
}
}
@ -1552,17 +1556,23 @@ void InterpreterSelectQuery::executeFetchColumns(
if (prewhere_info)
{
query_info.prewhere_info = std::make_shared<PrewhereInfo>(
std::make_shared<ExpressionActions>(prewhere_info->prewhere_actions),
prewhere_info->prewhere_column_name);
if (!query_info.prewhere_info_list)
query_info.prewhere_info_list = std::make_shared<PrewhereInfoList>();
query_info.prewhere_info_list->emplace_back(
std::make_shared<ExpressionActions>(prewhere_info->prewhere_actions),
prewhere_info->prewhere_column_name);
auto & new_prewhere_info = query_info.prewhere_info_list->back();
if (prewhere_info->alias_actions)
query_info.prewhere_info->alias_actions = std::make_shared<ExpressionActions>(prewhere_info->alias_actions);
if (prewhere_info->remove_columns_actions)
query_info.prewhere_info->remove_columns_actions = std::make_shared<ExpressionActions>(prewhere_info->remove_columns_actions);
new_prewhere_info.alias_actions = std::make_shared<ExpressionActions>(prewhere_info->alias_actions);
query_info.prewhere_info->remove_prewhere_column = prewhere_info->remove_prewhere_column;
query_info.prewhere_info->need_filter = prewhere_info->need_filter;
if (prewhere_info->remove_columns_actions)
new_prewhere_info.remove_columns_actions = std::make_shared<ExpressionActions>(prewhere_info->remove_columns_actions);
new_prewhere_info.remove_prewhere_column = prewhere_info->remove_prewhere_column;
new_prewhere_info.need_filter = prewhere_info->need_filter;
}
/// Create optimizer with prepared actions.

View File

@ -42,11 +42,14 @@ Block getHeaderForProcessingStage(
case QueryProcessingStage::FetchColumns:
{
Block header = metadata_snapshot->getSampleBlockForColumns(column_names, storage.getVirtuals(), storage.getStorageID());
if (query_info.prewhere_info)
if (query_info.prewhere_info_list)
{
query_info.prewhere_info->prewhere_actions->execute(header);
if (query_info.prewhere_info->remove_prewhere_column)
header.erase(query_info.prewhere_info->prewhere_column_name);
for (const auto & prewhere_info : *query_info.prewhere_info_list)
{
prewhere_info.prewhere_actions->execute(header);
if (prewhere_info.remove_prewhere_column)
header.erase(prewhere_info.prewhere_column_name);
}
}
return header;
}

View File

@ -22,17 +22,17 @@ MergeTreeBaseSelectProcessor::MergeTreeBaseSelectProcessor(
Block header,
const MergeTreeData & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const PrewhereInfoPtr & prewhere_info_,
const PrewhereInfoListPtr & prewhere_info_list_,
UInt64 max_block_size_rows_,
UInt64 preferred_block_size_bytes_,
UInt64 preferred_max_column_in_block_size_bytes_,
const MergeTreeReaderSettings & reader_settings_,
bool use_uncompressed_cache_,
const Names & virt_column_names_)
: SourceWithProgress(getHeader(std::move(header), prewhere_info_, virt_column_names_))
: SourceWithProgress(getHeader(std::move(header), prewhere_info_list_, virt_column_names_))
, storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, prewhere_info(prewhere_info_)
, prewhere_info_list(prewhere_info_list_)
, max_block_size_rows(max_block_size_rows_)
, preferred_block_size_bytes(preferred_block_size_bytes_)
, preferred_max_column_in_block_size_bytes(preferred_max_column_in_block_size_bytes_)
@ -70,18 +70,18 @@ Chunk MergeTreeBaseSelectProcessor::generate()
void MergeTreeBaseSelectProcessor::initializeRangeReaders(MergeTreeReadTask & current_task)
{
if (prewhere_info)
if (prewhere_info_list)
{
if (reader->getColumns().empty())
{
current_task.range_reader = MergeTreeRangeReader(pre_reader.get(), nullptr, prewhere_info, true);
current_task.range_reader = MergeTreeRangeReader(pre_reader.get(), nullptr, prewhere_info_list, true);
}
else
{
MergeTreeRangeReader * pre_reader_ptr = nullptr;
if (pre_reader != nullptr)
{
current_task.pre_range_reader = MergeTreeRangeReader(pre_reader.get(), nullptr, prewhere_info, false);
current_task.pre_range_reader = MergeTreeRangeReader(pre_reader.get(), nullptr, prewhere_info_list, false);
pre_reader_ptr = &current_task.pre_range_reader;
}
@ -309,34 +309,37 @@ void MergeTreeBaseSelectProcessor::injectVirtualColumns(Chunk & chunk, MergeTree
chunk.setColumns(columns, num_rows);
}
/// Runs every PREWHERE step from `prewhere_info_list` over `block`, in list order.
/// A null list leaves `block` untouched. For each entry the alias actions (when set) are executed,
/// then the prewhere actions; the resulting prewhere column must have a type that can be used in a
/// boolean context, otherwise an Exception with ErrorCodes::LOGICAL_ERROR is thrown.
/// NOTE(review): this listing appears to show the removed and the added lines of the hunk together
/// (no +/- markers); the `prewhere_info->...` lines look like the pre-change single-info variant.
void MergeTreeBaseSelectProcessor::executePrewhereActions(Block & block, const PrewhereInfoPtr & prewhere_info)
void MergeTreeBaseSelectProcessor::executePrewhereActions(Block & block, const PrewhereInfoListPtr & prewhere_info_list)
{
if (prewhere_info)
{
if (prewhere_info->alias_actions)
prewhere_info->alias_actions->execute(block);
// Nothing to apply when no list was passed.
if (!prewhere_info_list)
return;
prewhere_info->prewhere_actions->execute(block);
auto & prewhere_column = block.getByName(prewhere_info->prewhere_column_name);
for (const auto & prewhere_info : *prewhere_info_list)
{
// Alias actions are optional and run before the prewhere actions of the same entry.
if (prewhere_info.alias_actions)
prewhere_info.alias_actions->execute(block);
prewhere_info.prewhere_actions->execute(block);
auto & prewhere_column = block.getByName(prewhere_info.prewhere_column_name);
if (!prewhere_column.type->canBeUsedInBooleanContext())
throw Exception("Invalid type for filter in PREWHERE: " + prewhere_column.type->getName(),
ErrorCodes::LOGICAL_ERROR);
ErrorCodes::LOGICAL_ERROR);
if (prewhere_info->remove_prewhere_column)
block.erase(prewhere_info->prewhere_column_name);
// Either drop the prewhere column, or overwrite it with a full (non-const) column
// built from the constant 1 and sized to block.rows().
if (prewhere_info.remove_prewhere_column)
block.erase(prewhere_info.prewhere_column_name);
else
{
auto & ctn = block.getByName(prewhere_info->prewhere_column_name);
auto & ctn = block.getByName(prewhere_info.prewhere_column_name);
ctn.column = ctn.type->createColumnConst(block.rows(), 1u)->convertToFullColumnIfConst();
}
}
}
/// Builds the header block: applies the PREWHERE actions to `block` (received by value),
/// then injects the virtual columns with a null task pointer, and returns the modified block.
/// NOTE(review): the parameter line and the executePrewhereActions call each appear twice because
/// this listing seems to carry both the pre-change and the post-change line of the hunk.
Block MergeTreeBaseSelectProcessor::getHeader(
Block block, const PrewhereInfoPtr & prewhere_info, const Names & virtual_columns)
Block block, const PrewhereInfoListPtr & prewhere_info_list, const Names & virtual_columns)
{
executePrewhereActions(block, prewhere_info);
executePrewhereActions(block, prewhere_info_list);
// No read task exists at header-construction time, hence the nullptr.
injectVirtualColumns(block, nullptr, virtual_columns);
return block;
}

View File

@ -23,7 +23,7 @@ public:
Block header,
const MergeTreeData & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const PrewhereInfoPtr & prewhere_info_,
const PrewhereInfoListPtr & prewhere_info_list_,
UInt64 max_block_size_rows_,
UInt64 preferred_block_size_bytes_,
UInt64 preferred_max_column_in_block_size_bytes_,
@ -33,7 +33,7 @@ public:
~MergeTreeBaseSelectProcessor() override;
static void executePrewhereActions(Block & block, const PrewhereInfoPtr & prewhere_info);
static void executePrewhereActions(Block & block, const PrewhereInfoListPtr & prewhere_info_list);
protected:
Chunk generate() final;
@ -49,7 +49,7 @@ protected:
static void injectVirtualColumns(Block & block, MergeTreeReadTask * task, const Names & virtual_columns);
static void injectVirtualColumns(Chunk & chunk, MergeTreeReadTask * task, const Names & virtual_columns);
static Block getHeader(Block block, const PrewhereInfoPtr & prewhere_info, const Names & virtual_columns);
static Block getHeader(Block block, const PrewhereInfoListPtr & prewhere_info_list, const Names & virtual_columns);
void initializeRangeReaders(MergeTreeReadTask & task);
@ -57,7 +57,7 @@ protected:
const MergeTreeData & storage;
StorageMetadataPtr metadata_snapshot;
PrewhereInfoPtr prewhere_info;
PrewhereInfoListPtr prewhere_info_list;
UInt64 max_block_size_rows;
UInt64 preferred_block_size_bytes;

View File

@ -118,11 +118,10 @@ NameSet injectRequiredColumns(const MergeTreeData & storage, const StorageMetada
/// Constructor: copies each argument into the member of the same name (without the trailing
/// underscore); `size_predictor_` is moved into `size_predictor`. The body is empty.
/// NOTE(review): the lines mentioning `remove_prewhere_column_` appear to be the pre-change
/// variant of the signature and initializer list; the lines right after them omit that parameter.
MergeTreeReadTask::MergeTreeReadTask(
const MergeTreeData::DataPartPtr & data_part_, const MarkRanges & mark_ranges_, const size_t part_index_in_query_,
const Names & ordered_names_, const NameSet & column_name_set_, const NamesAndTypesList & columns_,
const NamesAndTypesList & pre_columns_, const bool remove_prewhere_column_, const bool should_reorder_,
MergeTreeBlockSizePredictorPtr && size_predictor_)
const NamesAndTypesList & pre_columns_, const bool should_reorder_, MergeTreeBlockSizePredictorPtr && size_predictor_)
: data_part{data_part_}, mark_ranges{mark_ranges_}, part_index_in_query{part_index_in_query_},
ordered_names{ordered_names_}, column_name_set{column_name_set_}, columns{columns_}, pre_columns{pre_columns_},
remove_prewhere_column{remove_prewhere_column_}, should_reorder{should_reorder_}, size_predictor{std::move(size_predictor_)}
should_reorder{should_reorder_}, size_predictor{std::move(size_predictor_)}
{
}
@ -258,7 +257,7 @@ MergeTreeReadTaskColumns getReadTaskColumns(
const StorageMetadataPtr & metadata_snapshot,
const MergeTreeData::DataPartPtr & data_part,
const Names & required_columns,
const PrewhereInfoPtr & prewhere_info,
const PrewhereInfoListPtr & prewhere_info_list,
bool check_columns)
{
Names column_names = required_columns;
@ -267,12 +266,14 @@ MergeTreeReadTaskColumns getReadTaskColumns(
/// inject columns required for defaults evaluation
bool should_reorder = !injectRequiredColumns(storage, metadata_snapshot, data_part, column_names).empty();
if (prewhere_info)
if (prewhere_info_list)
{
if (prewhere_info->alias_actions)
pre_column_names = prewhere_info->alias_actions->getRequiredColumns();
else
pre_column_names = prewhere_info->prewhere_actions->getRequiredColumns();
for (const auto & prewhere_info : *prewhere_info_list)
{
const auto required_column_names = (prewhere_info.alias_actions ?
prewhere_info.alias_actions->getRequiredColumns() : prewhere_info.prewhere_actions->getRequiredColumns());
pre_column_names.insert(pre_column_names.end(), required_column_names.begin(), required_column_names.end());
}
if (pre_column_names.empty())
pre_column_names.push_back(column_names[0]);

View File

@ -42,8 +42,6 @@ struct MergeTreeReadTask
const NamesAndTypesList & columns;
/// column names to read during PREWHERE
const NamesAndTypesList & pre_columns;
/// should PREWHERE column be returned to requesting side?
const bool remove_prewhere_column;
/// resulting block may require reordering in accordance with `ordered_names`
const bool should_reorder;
/// Used to satisfy preferred_block_size_bytes limitation
@ -57,8 +55,7 @@ struct MergeTreeReadTask
MergeTreeReadTask(
const MergeTreeData::DataPartPtr & data_part_, const MarkRanges & mark_ranges_, const size_t part_index_in_query_,
const Names & ordered_names_, const NameSet & column_name_set_, const NamesAndTypesList & columns_,
const NamesAndTypesList & pre_columns_, const bool remove_prewhere_column_, const bool should_reorder_,
MergeTreeBlockSizePredictorPtr && size_predictor_);
const NamesAndTypesList & pre_columns_, const bool should_reorder_, MergeTreeBlockSizePredictorPtr && size_predictor_);
virtual ~MergeTreeReadTask();
};
@ -78,7 +75,7 @@ MergeTreeReadTaskColumns getReadTaskColumns(
const StorageMetadataPtr & metadata_snapshot,
const MergeTreeData::DataPartPtr & data_part,
const Names & required_columns,
const PrewhereInfoPtr & prewhere_info,
const PrewhereInfoListPtr & prewhere_info_list,
bool check_columns);
struct MergeTreeBlockSizePredictor

View File

@ -833,14 +833,20 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
plan->addStep(std::move(adding_column));
}
if (query_info.prewhere_info && query_info.prewhere_info->remove_columns_actions)
if (query_info.prewhere_info_list)
{
auto expression_step = std::make_unique<ExpressionStep>(
plan->getCurrentDataStream(),
query_info.prewhere_info->remove_columns_actions->getActionsDAG().clone());
for (const auto & prewhere_info : *query_info.prewhere_info_list)
{
if (prewhere_info.remove_columns_actions)
{
auto expression_step = std::make_unique<ExpressionStep>(
plan->getCurrentDataStream(),
prewhere_info.remove_columns_actions->getActionsDAG().clone());
expression_step->setStepDescription("Remove unused columns after PREWHERE");
plan->addStep(std::move(expression_step));
expression_step->setStepDescription("Remove unused columns after PREWHERE");
plan->addStep(std::move(expression_step));
}
}
}
return plan;
@ -948,7 +954,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
std::move(parts),
data,
metadata_snapshot,
query_info.prewhere_info,
query_info.prewhere_info_list,
true,
column_names,
MergeTreeReadPool::BackoffSettings(settings),
@ -964,7 +970,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
i, pool, min_marks_for_concurrent_read, max_block_size,
settings.preferred_block_size_bytes, settings.preferred_max_column_in_block_size_bytes,
data, metadata_snapshot, use_uncompressed_cache,
query_info.prewhere_info, reader_settings, virt_columns);
query_info.prewhere_info_list, reader_settings, virt_columns);
if (i == 0)
{
@ -987,7 +993,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
auto source = std::make_shared<MergeTreeSelectProcessor>(
data, metadata_snapshot, part.data_part, max_block_size, settings.preferred_block_size_bytes,
settings.preferred_max_column_in_block_size_bytes, column_names, part.ranges, use_uncompressed_cache,
query_info.prewhere_info, true, reader_settings, virt_columns, part.part_index_in_query);
query_info.prewhere_info_list, true, reader_settings, virt_columns, part.part_index_in_query);
res.emplace_back(std::move(source));
}
@ -1187,7 +1193,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder(
column_names,
ranges_to_get_from_part,
use_uncompressed_cache,
query_info.prewhere_info,
query_info.prewhere_info_list,
true,
reader_settings,
virt_columns,
@ -1205,7 +1211,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder(
column_names,
ranges_to_get_from_part,
use_uncompressed_cache,
query_info.prewhere_info,
query_info.prewhere_info_list,
true,
reader_settings,
virt_columns,
@ -1359,7 +1365,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
column_names,
part_it->ranges,
use_uncompressed_cache,
query_info.prewhere_info,
query_info.prewhere_info_list,
true,
reader_settings,
virt_columns,

View File

@ -443,32 +443,79 @@ size_t MergeTreeRangeReader::ReadResult::numZerosInTail(const UInt8 * begin, con
return count;
}
/// Combines `new_filter` with the filter already stored in this ReadResult, or installs it when
/// none is stored yet.
///  - With a stored filter, `new_filter` must be non-null and its size must equal
///    `total_rows_per_granule`; otherwise an Exception with ErrorCodes::LOGICAL_ERROR is thrown.
///  - A constant always-true `new_filter` keeps the stored filter as is (setFilterConstTrue() is
///    called only when nothing is stored); a constant always-false one calls clear().
///  - Otherwise the filter data must be a ColumnUInt8; it is AND'ed element-wise with the stored
///    filter (if any) and the result becomes the stored filter.
/// NOTE(review): this listing appears to interleave the pre-change `setFilter` lines with the
/// post-change `addFilter` lines (no +/- markers); the comments describe the `addFilter` variant.
/// NOTE(review): `*new_filter` is dereferenced below without a null check when no filter is
/// stored yet — presumably callers never pass a null column in that case; confirm.
void MergeTreeRangeReader::ReadResult::setFilter(const ColumnPtr & new_filter)
void MergeTreeRangeReader::ReadResult::addFilter(const ColumnPtr & new_filter)
{
if (!new_filter && filter)
throw Exception("Can't replace existing filter with empty.", ErrorCodes::LOGICAL_ERROR);
// Validation is performed only when a filter is already stored.
if (filter)
{
size_t new_size = new_filter->size();
if (!new_filter)
throw Exception("Can't add an empty filter to the existing one.", ErrorCodes::LOGICAL_ERROR);
const auto new_size = new_filter->size();
if (new_size != total_rows_per_granule)
throw Exception("Can't set filter because it's size is " + toString(new_size) + " but "
throw Exception("Can't add the new filter because it's size is " + toString(new_size) + " but "
+ toString(total_rows_per_granule) + " rows was read.", ErrorCodes::LOGICAL_ERROR);
}
ConstantFilterDescription const_description(*new_filter);
if (const_description.always_true)
setFilterConstTrue();
{
// An always-true filter cannot narrow a stored filter, so the stored one is left alone.
if (!filter)
setFilterConstTrue();
}
else if (const_description.always_false)
{
clear();
}
else
{
FilterDescription filter_description(*new_filter);
filter_holder = filter_description.data_holder ? filter_description.data_holder : new_filter;
filter = typeid_cast<const ColumnUInt8 *>(filter_holder.get());
if (!filter)
throw Exception("setFilter function expected ColumnUInt8.", ErrorCodes::LOGICAL_ERROR);
FilterDescription description(*new_filter);
// Use the holder produced by FilterDescription when it has one, else the column itself.
auto new_holder = (description.data_holder ? description.data_holder : new_filter);
auto * new_holder_cast = typeid_cast<const ColumnUInt8 *>(new_holder.get());
if (!new_holder_cast)
throw Exception("addFilter function expected ColumnUInt8.", ErrorCodes::LOGICAL_ERROR);
if (filter)
{
// Obtain a mutable column via IColumn::mutate so the AND result can be written into it.
MutableColumnPtr new_mutable_holder = IColumn::mutate(std::move(new_holder));
auto * new_mutable_holder_cast = typeid_cast<ColumnUInt8 *>(new_mutable_holder.get());
if (!new_mutable_holder_cast)
throw Exception("addFilter function expected ColumnUInt8.", ErrorCodes::LOGICAL_ERROR);
const auto & data = filter->getData();
auto it = data.begin();
auto & new_data = new_mutable_holder_cast->getData();
auto n_it = new_data.begin();
// Element-wise logical AND of the stored filter into the new data; stops at the shorter of the two.
while (it != data.end() && n_it != new_data.end())
{
*n_it = (*n_it && *it);
++it;
++n_it;
}
// The combined filter may have become constant, so it is classified again.
ConstantFilterDescription new_const_description(*new_mutable_holder);
if (new_const_description.always_true)
{
setFilterConstTrue();
}
else if (new_const_description.always_false)
{
clear();
}
else
{
filter_holder = std::move(new_mutable_holder);
filter = new_mutable_holder_cast;
}
}
else
{
// No stored filter yet: adopt the new one directly.
filter_holder = std::move(new_holder);
filter = new_holder_cast;
}
}
}
@ -489,11 +536,14 @@ size_t MergeTreeRangeReader::ReadResult::countBytesInResultFilter(const IColumn:
MergeTreeRangeReader::MergeTreeRangeReader(
IMergeTreeReader * merge_tree_reader_,
MergeTreeRangeReader * prev_reader_,
const PrewhereInfoPtr & prewhere_,
const PrewhereInfoListPtr & prewhere_info_list_,
bool last_reader_in_chain_)
: merge_tree_reader(merge_tree_reader_)
, index_granularity(&(merge_tree_reader->data_part->index_granularity)), prev_reader(prev_reader_)
, prewhere(prewhere_), last_reader_in_chain(last_reader_in_chain_), is_initialized(true)
, index_granularity(&(merge_tree_reader->data_part->index_granularity))
, prev_reader(prev_reader_)
, prewhere_info_list(prewhere_info_list_)
, last_reader_in_chain(last_reader_in_chain_)
, is_initialized(true)
{
if (prev_reader)
sample_block = prev_reader->getSampleBlock();
@ -501,16 +551,19 @@ MergeTreeRangeReader::MergeTreeRangeReader(
for (const auto & name_and_type : merge_tree_reader->getColumns())
sample_block.insert({name_and_type.type->createColumn(), name_and_type.type, name_and_type.name});
if (prewhere)
if (prewhere_info_list)
{
if (prewhere->alias_actions)
prewhere->alias_actions->execute(sample_block, true);
for (const auto & prewhere_info : *prewhere_info_list)
{
if (prewhere_info.alias_actions)
prewhere_info.alias_actions->execute(sample_block, true);
if (prewhere->prewhere_actions)
prewhere->prewhere_actions->execute(sample_block, true);
if (prewhere_info.prewhere_actions)
prewhere_info.prewhere_actions->execute(sample_block, true);
if (prewhere->remove_prewhere_column)
sample_block.erase(prewhere->prewhere_column_name);
if (prewhere_info.remove_prewhere_column)
sample_block.erase(prewhere_info.prewhere_column_name);
}
}
}
@ -701,7 +754,13 @@ MergeTreeRangeReader::ReadResult MergeTreeRangeReader::read(size_t max_rows, Mar
if (read_result.num_rows == 0)
return read_result;
executePrewhereActionsAndFilterColumns(read_result);
if (prewhere_info_list)
{
for (const auto & prewhere_info : *prewhere_info_list)
{
executePrewhereActionsAndFilterColumns(read_result, prewhere_info);
}
}
return read_result;
}
@ -798,11 +857,8 @@ Columns MergeTreeRangeReader::continueReadingChain(ReadResult & result, size_t &
return columns;
}
void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & result)
void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & result, const PrewhereInfo & prewhere_info)
{
if (!prewhere)
return;
const auto & header = merge_tree_reader->getColumns();
size_t num_columns = header.size();
@ -831,14 +887,14 @@ void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & r
for (auto name_and_type = header.begin(); pos < num_columns; ++pos, ++name_and_type)
block.insert({result.columns[pos], name_and_type->type, name_and_type->name});
if (prewhere->alias_actions)
prewhere->alias_actions->execute(block);
if (prewhere_info.alias_actions)
prewhere_info.alias_actions->execute(block);
/// Columns might be projected out. We need to store them here so that default columns can be evaluated later.
result.block_before_prewhere = block;
prewhere->prewhere_actions->execute(block);
prewhere_info.prewhere_actions->execute(block);
prewhere_column_pos = block.getPositionByName(prewhere->prewhere_column_name);
prewhere_column_pos = block.getPositionByName(prewhere_info.prewhere_column_name);
result.columns.clear();
result.columns.reserve(block.columns());
@ -848,15 +904,7 @@ void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & r
filter.swap(result.columns[prewhere_column_pos]);
}
if (result.getFilter())
{
/// TODO: implement for prewhere chain.
/// In order to do it we need combine filter and result.filter, where filter filters only '1' in result.filter.
throw Exception("MergeTreeRangeReader chain with several prewhere actions in not implemented.",
ErrorCodes::LOGICAL_ERROR);
}
result.setFilter(filter);
result.addFilter(filter);
/// If there is a WHERE, we filter in there, and only optimize IO and shrink columns here
if (!last_reader_in_chain)
@ -866,7 +914,7 @@ void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & r
if (result.totalRowsPerGranule() == 0)
result.setFilterConstFalse();
/// If we need to filter in PREWHERE
else if (prewhere->need_filter || result.need_filter)
else if (prewhere_info.need_filter || result.need_filter)
{
/// If there is a filter and without optimized
if (result.getFilter() && last_reader_in_chain)
@ -907,11 +955,11 @@ void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & r
/// Check if the PREWHERE column is needed
if (!result.columns.empty())
{
if (prewhere->remove_prewhere_column)
if (prewhere_info.remove_prewhere_column)
result.columns.erase(result.columns.begin() + prewhere_column_pos);
else
result.columns[prewhere_column_pos] =
getSampleBlock().getByName(prewhere->prewhere_column_name).type->
getSampleBlock().getByName(prewhere_info.prewhere_column_name).type->
createColumnConst(result.num_rows, 1u)->convertToFullColumnIfConst();
}
}
@ -919,7 +967,7 @@ void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & r
else
{
result.columns[prewhere_column_pos] = result.getFilterHolder()->convertToFullColumnIfConst();
if (getSampleBlock().getByName(prewhere->prewhere_column_name).type->isNullable())
if (getSampleBlock().getByName(prewhere_info.prewhere_column_name).type->isNullable())
result.columns[prewhere_column_pos] = makeNullable(std::move(result.columns[prewhere_column_pos]));
result.clearFilter(); // Acting as a flag to not filter in PREWHERE
}

View File

@ -13,7 +13,8 @@ using ColumnUInt8 = ColumnVector<UInt8>;
class IMergeTreeReader;
class MergeTreeIndexGranularity;
struct PrewhereInfo;
using PrewhereInfoPtr = std::shared_ptr<PrewhereInfo>;
using PrewhereInfoList = std::vector<PrewhereInfo>;
using PrewhereInfoListPtr = std::shared_ptr<PrewhereInfoList>;
/// MergeTreeReader iterator which allows sequential reading for arbitrary number of rows between pairs of marks in the same part.
/// Stores reading state, which can be inside granule. Can skip rows in current granule and start reading from next mark.
@ -24,7 +25,7 @@ public:
MergeTreeRangeReader(
IMergeTreeReader * merge_tree_reader_,
MergeTreeRangeReader * prev_reader_,
const PrewhereInfoPtr & prewhere_,
const PrewhereInfoListPtr & prewhere_info_list,
bool last_reader_in_chain_);
MergeTreeRangeReader() = default;
@ -153,8 +154,8 @@ public:
void addRows(size_t rows) { num_read_rows += rows; }
void addRange(const MarkRange & range) { started_ranges.push_back({rows_per_granule.size(), range}); }
/// Set filter or replace old one. Filter must have more zeroes than previous.
void setFilter(const ColumnPtr & new_filter);
/// Apply a filter on top of the existing one (AND'ed) or set it if there isn't any.
void addFilter(const ColumnPtr & new_filter);
/// For each granule calculate the number of filtered rows at the end. Remove them and update filter.
void optimize(bool can_read_incomplete_granules);
/// Remove all rows from granules.
@ -212,12 +213,12 @@ private:
ReadResult startReadingChain(size_t max_rows, MarkRanges & ranges);
Columns continueReadingChain(ReadResult & result, size_t & num_rows);
void executePrewhereActionsAndFilterColumns(ReadResult & result);
void executePrewhereActionsAndFilterColumns(ReadResult & result, const PrewhereInfo & prewhere_info);
IMergeTreeReader * merge_tree_reader = nullptr;
const MergeTreeIndexGranularity * index_granularity = nullptr;
MergeTreeRangeReader * prev_reader = nullptr; /// If not nullptr, read from prev_reader firstly.
PrewhereInfoPtr prewhere;
PrewhereInfoListPtr prewhere_info_list;
Stream stream;

View File

@ -24,7 +24,7 @@ MergeTreeReadPool::MergeTreeReadPool(
RangesInDataParts && parts_,
const MergeTreeData & data_,
const StorageMetadataPtr & metadata_snapshot_,
const PrewhereInfoPtr & prewhere_info_,
const PrewhereInfoListPtr & prewhere_info_list_,
const bool check_columns_,
const Names & column_names_,
const BackoffSettings & backoff_settings_,
@ -37,7 +37,7 @@ MergeTreeReadPool::MergeTreeReadPool(
, column_names{column_names_}
, do_not_steal_tasks{do_not_steal_tasks_}
, predict_block_size_bytes{preferred_block_size_bytes_ > 0}
, prewhere_info{prewhere_info_}
, prewhere_info_list{prewhere_info_list_}
, parts_ranges{std::move(parts_)}
{
/// parts don't contain duplicate MergeTreeDataPart's.
@ -139,7 +139,7 @@ MergeTreeReadTaskPtr MergeTreeReadPool::getTask(const size_t min_marks_to_read,
return std::make_unique<MergeTreeReadTask>(
part.data_part, ranges_to_get_from_part, part.part_index_in_query, ordered_names,
per_part_column_name_set[part_idx], per_part_columns[part_idx], per_part_pre_columns[part_idx],
prewhere_info && prewhere_info->remove_prewhere_column, per_part_should_reorder[part_idx], std::move(curr_task_size_predictor));
per_part_should_reorder[part_idx], std::move(curr_task_size_predictor));
}
MarkRanges MergeTreeReadPool::getRestMarks(const IMergeTreeDataPart & part, const MarkRange & from) const
@ -229,7 +229,7 @@ std::vector<size_t> MergeTreeReadPool::fillPerPartInfo(
per_part_sum_marks.push_back(sum_marks);
auto [required_columns, required_pre_columns, should_reorder] =
getReadTaskColumns(data, metadata_snapshot, part.data_part, column_names, prewhere_info, check_columns);
getReadTaskColumns(data, metadata_snapshot, part.data_part, column_names, prewhere_info_list, check_columns);
/// will be used to distinguish between PREWHERE and WHERE columns when applying filter
const auto & required_column_names = required_columns.getNames();

View File

@ -71,10 +71,9 @@ private:
public:
MergeTreeReadPool(
const size_t threads_, const size_t sum_marks_, const size_t min_marks_for_concurrent_read_,
RangesInDataParts && parts_, const MergeTreeData & data_, const StorageMetadataPtr & metadata_snapshot_, const PrewhereInfoPtr & prewhere_info_,
const bool check_columns_, const Names & column_names_,
const BackoffSettings & backoff_settings_, size_t preferred_block_size_bytes_,
const bool do_not_steal_tasks_ = false);
RangesInDataParts && parts_, const MergeTreeData & data_, const StorageMetadataPtr & metadata_snapshot_,
const PrewhereInfoListPtr & prewhere_info_list, const bool check_columns_, const Names & column_names_,
const BackoffSettings & backoff_settings_, size_t preferred_block_size_bytes_, const bool do_not_steal_tasks_ = false);
MergeTreeReadTaskPtr getTask(const size_t min_marks_to_read, const size_t thread, const Names & ordered_names);
@ -107,7 +106,7 @@ private:
std::vector<NamesAndTypesList> per_part_pre_columns;
std::vector<char> per_part_should_reorder;
std::vector<MergeTreeBlockSizePredictorPtr> per_part_size_predictor;
PrewhereInfoPtr prewhere_info;
PrewhereInfoListPtr prewhere_info_list;
struct Part
{

View File

@ -22,7 +22,7 @@ MergeTreeReverseSelectProcessor::MergeTreeReverseSelectProcessor(
Names required_columns_,
MarkRanges mark_ranges_,
bool use_uncompressed_cache_,
const PrewhereInfoPtr & prewhere_info_,
const PrewhereInfoListPtr & prewhere_info_list_,
bool check_columns,
const MergeTreeReaderSettings & reader_settings_,
const Names & virt_column_names_,
@ -31,7 +31,7 @@ MergeTreeReverseSelectProcessor::MergeTreeReverseSelectProcessor(
:
MergeTreeBaseSelectProcessor{
metadata_snapshot_->getSampleBlockForColumns(required_columns_, storage_.getVirtuals(), storage_.getStorageID()),
storage_, metadata_snapshot_, prewhere_info_, max_block_size_rows_,
storage_, metadata_snapshot_, prewhere_info_list_, max_block_size_rows_,
preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_,
reader_settings_, use_uncompressed_cache_, virt_column_names_},
required_columns{std::move(required_columns_)},
@ -56,7 +56,7 @@ MergeTreeReverseSelectProcessor::MergeTreeReverseSelectProcessor(
ordered_names = header_without_virtual_columns.getNames();
task_columns = getReadTaskColumns(storage, metadata_snapshot, data_part, required_columns, prewhere_info, check_columns);
task_columns = getReadTaskColumns(storage, metadata_snapshot, data_part, required_columns, prewhere_info_list, check_columns);
/// will be used to distinguish between PREWHERE and WHERE columns when applying filter
const auto & column_names = task_columns.columns.getNames();
@ -71,7 +71,7 @@ MergeTreeReverseSelectProcessor::MergeTreeReverseSelectProcessor(
all_mark_ranges, owned_uncompressed_cache.get(),
owned_mark_cache.get(), reader_settings);
if (prewhere_info)
if (prewhere_info_list)
pre_reader = data_part->getReader(task_columns.pre_columns, metadata_snapshot, all_mark_ranges,
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings);
}
@ -100,8 +100,7 @@ try
task = std::make_unique<MergeTreeReadTask>(
data_part, mark_ranges_for_task, part_index_in_query, ordered_names, column_name_set,
task_columns.columns, task_columns.pre_columns, prewhere_info && prewhere_info->remove_prewhere_column,
task_columns.should_reorder, std::move(size_predictor));
task_columns.columns, task_columns.pre_columns, task_columns.should_reorder, std::move(size_predictor));
return true;
}

View File

@ -26,7 +26,7 @@ public:
Names required_columns_,
MarkRanges mark_ranges,
bool use_uncompressed_cache,
const PrewhereInfoPtr & prewhere_info,
const PrewhereInfoListPtr & prewhere_info_list,
bool check_columns,
const MergeTreeReaderSettings & reader_settings,
const Names & virt_column_names = {},

View File

@ -22,7 +22,7 @@ MergeTreeSelectProcessor::MergeTreeSelectProcessor(
Names required_columns_,
MarkRanges mark_ranges_,
bool use_uncompressed_cache_,
const PrewhereInfoPtr & prewhere_info_,
const PrewhereInfoListPtr & prewhere_info_list_,
bool check_columns_,
const MergeTreeReaderSettings & reader_settings_,
const Names & virt_column_names_,
@ -31,7 +31,7 @@ MergeTreeSelectProcessor::MergeTreeSelectProcessor(
:
MergeTreeBaseSelectProcessor{
metadata_snapshot_->getSampleBlockForColumns(required_columns_, storage_.getVirtuals(), storage_.getStorageID()),
storage_, metadata_snapshot_, prewhere_info_, max_block_size_rows_,
storage_, metadata_snapshot_, prewhere_info_list_, max_block_size_rows_,
preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_,
reader_settings_, use_uncompressed_cache_, virt_column_names_},
required_columns{std::move(required_columns_)},
@ -69,7 +69,7 @@ try
task_columns = getReadTaskColumns(
storage, metadata_snapshot, data_part,
required_columns, prewhere_info, check_columns);
required_columns, prewhere_info_list, check_columns);
auto size_predictor = (preferred_block_size_bytes == 0)
? nullptr
@ -81,8 +81,7 @@ try
task = std::make_unique<MergeTreeReadTask>(
data_part, all_mark_ranges, part_index_in_query, ordered_names, column_name_set, task_columns.columns,
task_columns.pre_columns, prewhere_info && prewhere_info->remove_prewhere_column,
task_columns.should_reorder, std::move(size_predictor));
task_columns.pre_columns, task_columns.should_reorder, std::move(size_predictor));
if (!reader)
{
@ -94,7 +93,7 @@ try
reader = data_part->getReader(task_columns.columns, metadata_snapshot, all_mark_ranges,
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings);
if (prewhere_info)
if (prewhere_info_list)
pre_reader = data_part->getReader(task_columns.pre_columns, metadata_snapshot, all_mark_ranges,
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings);
}

View File

@ -26,7 +26,7 @@ public:
Names required_columns_,
MarkRanges mark_ranges,
bool use_uncompressed_cache,
const PrewhereInfoPtr & prewhere_info,
const PrewhereInfoListPtr & prewhere_info_list,
bool check_columns,
const MergeTreeReaderSettings & reader_settings,
const Names & virt_column_names = {},

View File

@ -18,12 +18,12 @@ MergeTreeThreadSelectBlockInputProcessor::MergeTreeThreadSelectBlockInputProcess
const MergeTreeData & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const bool use_uncompressed_cache_,
const PrewhereInfoPtr & prewhere_info_,
const PrewhereInfoListPtr & prewhere_info_list_,
const MergeTreeReaderSettings & reader_settings_,
const Names & virt_column_names_)
:
MergeTreeBaseSelectProcessor{
pool_->getHeader(), storage_, metadata_snapshot_, prewhere_info_,
pool_->getHeader(), storage_, metadata_snapshot_, prewhere_info_list_,
max_block_size_rows_,
preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_,
reader_settings_, use_uncompressed_cache_, virt_column_names_},
@ -78,7 +78,7 @@ bool MergeTreeThreadSelectBlockInputProcessor::getNewTask()
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings,
IMergeTreeReader::ValueSizeMap{}, profile_callback);
if (prewhere_info)
if (prewhere_info_list)
pre_reader = task->data_part->getReader(task->pre_columns, metadata_snapshot, rest_mark_ranges,
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings,
IMergeTreeReader::ValueSizeMap{}, profile_callback);
@ -94,7 +94,7 @@ bool MergeTreeThreadSelectBlockInputProcessor::getNewTask()
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings,
reader->getAvgValueSizeHints(), profile_callback);
if (prewhere_info)
if (prewhere_info_list)
pre_reader = task->data_part->getReader(task->pre_columns, metadata_snapshot, rest_mark_ranges,
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings,
reader->getAvgValueSizeHints(), profile_callback);

View File

@ -24,7 +24,7 @@ public:
const MergeTreeData & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const bool use_uncompressed_cache_,
const PrewhereInfoPtr & prewhere_info_,
const PrewhereInfoListPtr & prewhere_info_list_,
const MergeTreeReaderSettings & reader_settings_,
const Names & virt_column_names_);

View File

@ -32,6 +32,8 @@ struct PrewhereInfo
: prewhere_actions(std::move(prewhere_actions_)), prewhere_column_name(std::move(prewhere_column_name_)) {}
};
/// A chain of PREWHERE stages. Consumers iterate over it front to back and apply
/// each entry's actions (filter, optional alias/remove-columns actions) in turn.
using PrewhereInfoList = std::vector<PrewhereInfo>;
/// Same as PrewhereInfo, but with ActionsDAG
struct PrewhereDAGInfo
{
@ -75,7 +77,7 @@ struct InputOrderInfo
bool operator !=(const InputOrderInfo & other) const { return !(*this == other); }
};
using PrewhereInfoPtr = std::shared_ptr<PrewhereInfo>;
/// Shared handle to a chain of PrewhereInfo entries. Readers test the pointer itself
/// (`if (prewhere_info_list)`) to decide whether any PREWHERE processing is set up,
/// so a null pointer is treated as "no PREWHERE stages".
/// NOTE(review): a non-null but empty list also passes that check — confirm producers never create one.
using PrewhereInfoListPtr = std::shared_ptr<PrewhereInfoList>;
using PrewhereDAGInfoPtr = std::shared_ptr<PrewhereDAGInfo>;
using FilterInfoPtr = std::shared_ptr<FilterInfo>;
using InputOrderInfoPtr = std::shared_ptr<const InputOrderInfo>;
@ -104,7 +106,7 @@ struct SelectQueryInfo
TreeRewriterResultPtr syntax_analyzer_result;
PrewhereInfoPtr prewhere_info;
PrewhereInfoListPtr prewhere_info_list;
ReadInOrderOptimizerPtr order_optimizer;
/// Can be modified while reading from storage

View File

@ -314,21 +314,26 @@ void StorageBuffer::read(
}
else
{
if (query_info.prewhere_info)
if (query_info.prewhere_info_list)
{
pipe_from_buffers.addSimpleTransform([&](const Block & header)
{
return std::make_shared<FilterTransform>(
header, query_info.prewhere_info->prewhere_actions,
query_info.prewhere_info->prewhere_column_name, query_info.prewhere_info->remove_prewhere_column);
});
if (query_info.prewhere_info->alias_actions)
for (const auto & prewhere_info : *query_info.prewhere_info_list)
{
pipe_from_buffers.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ExpressionTransform>(header, query_info.prewhere_info->alias_actions);
return std::make_shared<FilterTransform>(
header, prewhere_info.prewhere_actions,
prewhere_info.prewhere_column_name,
prewhere_info.remove_prewhere_column);
});
if (prewhere_info.alias_actions)
{
pipe_from_buffers.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ExpressionTransform>(
header, prewhere_info.alias_actions);
});
}
}
}