remove unused columns from prewhere actions

Nikolai Kochetov 2018-04-06 16:58:06 +03:00
parent e203457d55
commit 791a03e776
10 changed files with 66 additions and 29 deletions

View File

@ -2429,6 +2429,21 @@ bool ExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain, bool only_ty
return true;
}
bool ExpressionAnalyzer::appendPrewhere(ExpressionActionsChain & chain, bool only_types)
{
assertSelect();
if (!select_query->prewhere_expression)
return false;
initChain(chain, source_columns);
ExpressionActionsChain::Step & step = chain.steps.back();
step.required_output.push_back(select_query->prewhere_expression->getColumnName());
getRootActions(select_query->prewhere_expression, only_types, false, step.actions);
return true;
}
bool ExpressionAnalyzer::appendWhere(ExpressionActionsChain & chain, bool only_types)
{

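appendPrewhere mirrors the existing appendWhere: it registers the PREWHERE expression as a step of the ExpressionActionsChain whose only required output is the filter column, presumably so that the usual chain machinery can then drop source columns that are used solely inside the PREWHERE expression (the "unused columns" of the commit title). Below is a minimal, self-contained sketch of that pruning idea; ToyStep and pruneStep are hypothetical names for illustration, not the real ExpressionActionsChain API.

#include <algorithm>
#include <iostream>
#include <set>
#include <string>
#include <vector>

/// Hypothetical stand-ins, used only to illustrate required_output-driven pruning.
struct ToyStep
{
    std::vector<std::string> produced;         /// columns this step computes or forwards
    std::vector<std::string> required_output;  /// columns that later steps (or the result) still need
};

/// Keep only the produced columns that some later consumer asked for.
void pruneStep(ToyStep & step)
{
    std::set<std::string> required(step.required_output.begin(), step.required_output.end());
    step.produced.erase(
        std::remove_if(step.produced.begin(), step.produced.end(),
                       [&](const std::string & name) { return !required.count(name); }),
        step.produced.end());
}

int main()
{
    /// The PREWHERE step touches x and y, but only the filter column and x are needed afterwards.
    ToyStep prewhere{{"x", "y", "greater(x, y)"}, {"greater(x, y)", "x"}};
    pruneStep(prewhere);
    for (const auto & name : prewhere.produced)
        std::cout << name << '\n';   /// prints "x" and "greater(x, y)"; y has been dropped
}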
View File

@ -103,6 +103,7 @@ public:
/// Before aggregation:
bool appendArrayJoin(ExpressionActionsChain & chain, bool only_types);
bool appendJoin(ExpressionActionsChain & chain, bool only_types);
bool appendPrewhere(ExpressionActionsChain & chain, bool only_types);
bool appendWhere(ExpressionActionsChain & chain, bool only_types);
bool appendGroupBy(ExpressionActionsChain & chain, bool only_types);
void appendAggregateFunctionsArguments(ExpressionActionsChain & chain, bool only_types);

View File

@ -234,7 +234,8 @@ BlockInputStreams InterpreterSelectQuery::executeWithMultipleStreams()
}
InterpreterSelectQuery::AnalysisResult InterpreterSelectQuery::analyzeExpressions(QueryProcessingStage::Enum from_stage)
InterpreterSelectQuery::AnalysisResult InterpreterSelectQuery::analyzeExpressions(QueryProcessingStage::Enum from_stage,
ExpressionActionsChain & chain)
{
AnalysisResult res;
@ -251,8 +252,6 @@ InterpreterSelectQuery::AnalysisResult InterpreterSelectQuery::analyzeExpression
*/
{
ExpressionActionsChain chain;
res.need_aggregate = query_analyzer->hasAggregation();
query_analyzer->appendArrayJoin(chain, !res.first_stage);
@ -336,16 +335,22 @@ void InterpreterSelectQuery::executeImpl(Pipeline & pipeline, const BlockInputSt
* then perform the remaining operations with one resulting stream.
*/
/** Read the data from Storage. from_stage - to what stage the request was completed in Storage. */
QueryProcessingStage::Enum from_stage = executeFetchColumns(pipeline, dry_run);
AnalysisResult expressions;
{
ExpressionActionsChain chain;
/** Read the data from Storage. from_stage - to what stage the request was completed in Storage. */
QueryProcessingStage::Enum from_stage = executeFetchColumns(pipeline, dry_run, chain);
if (from_stage == QueryProcessingStage::WithMergeableState && to_stage == QueryProcessingStage::WithMergeableState)
throw Exception("Distributed on Distributed is not supported", ErrorCodes::NOT_IMPLEMENTED);
if (from_stage == QueryProcessingStage::WithMergeableState &&
to_stage == QueryProcessingStage::WithMergeableState)
throw Exception("Distributed on Distributed is not supported", ErrorCodes::NOT_IMPLEMENTED);
if (!dry_run)
LOG_TRACE(log, QueryProcessingStage::toString(from_stage) << " -> " << QueryProcessingStage::toString(to_stage));
if (!dry_run)
LOG_TRACE(log,
QueryProcessingStage::toString(from_stage) << " -> " << QueryProcessingStage::toString(to_stage));
AnalysisResult expressions = analyzeExpressions(from_stage);
expressions = analyzeExpressions(from_stage, chain);
}
const Settings & settings = context.getSettingsRef();
@ -510,7 +515,8 @@ static void getLimitLengthAndOffset(ASTSelectQuery & query, size_t & length, siz
}
}
QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(Pipeline & pipeline, bool dry_run)
QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(Pipeline & pipeline, bool dry_run,
ExpressionActionsChain & chain)
{
/// List of columns to read to execute the query.
Names required_columns = query_analyzer->getRequiredSourceColumns();
@ -676,6 +682,12 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(Pipeline
optimize_prewhere(*merge_tree);
}
if (query_analyzer->appendPrewhere(chain, false))
{
query_info.prewhere_actions = chain.getLastActions();
chain.addStep();
}
if (!dry_run)
pipeline.streams = storage->read(required_columns, query_info, context, from_stage, max_block_size, max_streams);

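The interpreter now builds a single ExpressionActionsChain in executeImpl, hands it to executeFetchColumns (where appendPrewhere fills the first step and query_info.prewhere_actions is captured with chain.getLastActions() before chain.addStep()), and then continues on the same chain in analyzeExpressions. A hedged toy analog of that getLastActions()/addStep() split follows; ToyChain and ToyActions are invented names used only to show why a fresh step is opened once the PREWHERE actions have been handed out.

#include <iostream>
#include <memory>
#include <string>
#include <vector>

using ToyActions = std::vector<std::string>;             /// a list of operations, for illustration
using ToyActionsPtr = std::shared_ptr<ToyActions>;

struct ToyChain
{
    std::vector<ToyActionsPtr> steps{std::make_shared<ToyActions>()};

    ToyActionsPtr getLastActions() { return steps.back(); }
    void addStep() { steps.push_back(std::make_shared<ToyActions>()); }
    void append(const std::string & op) { steps.back()->push_back(op); }
};

int main()
{
    ToyChain chain;
    chain.append("compute greater(x, y)");                    /// what appendPrewhere would add

    ToyActionsPtr prewhere_actions = chain.getLastActions();  /// what goes into query_info
    chain.addStep();                                          /// later appends land in a new step

    chain.append("compute length(s)");                        /// e.g. WHERE / SELECT analysis

    std::cout << prewhere_actions->size() << " prewhere action(s), "
              << chain.steps.size() << " step(s) in total\n"; /// 1 prewhere action, 2 steps
}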
View File

@ -124,6 +124,7 @@ private:
bool has_limit_by = false;
ExpressionActionsPtr before_join; /// including JOIN
ExpressionActionsPtr before_prewhere;
ExpressionActionsPtr before_where;
ExpressionActionsPtr before_aggregation;
ExpressionActionsPtr before_having;
@ -142,7 +143,7 @@ private:
SubqueriesForSets subqueries_for_sets;
};
AnalysisResult analyzeExpressions(QueryProcessingStage::Enum from_stage);
AnalysisResult analyzeExpressions(QueryProcessingStage::Enum from_stage, ExpressionActionsChain & chain);
/** From which table to read. With JOIN, the "left" table is returned.
@ -155,7 +156,7 @@ private:
void executeWithMultipleStreamsImpl(Pipeline & pipeline, const BlockInputStreamPtr & input, bool dry_run);
/// Fetch data from the table. Returns the stage to which the query was processed in Storage.
QueryProcessingStage::Enum executeFetchColumns(Pipeline & pipeline, bool dry_run);
QueryProcessingStage::Enum executeFetchColumns(Pipeline & pipeline, bool dry_run, ExpressionActionsChain & chain);
void executeWhere(Pipeline & pipeline, const ExpressionActionsPtr & expression);
void executeAggregation(Pipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final);

View File

@ -216,6 +216,18 @@ void MergeTreeBaseBlockInputStream::injectVirtualColumns(Block & block) const
}
void MergeTreeBaseBlockInputStream::executePrewhereActions(Block & block) const
{
if (prewhere_actions)
{
bool had_prewhere_column = block.has(prewhere_column_name);
prewhere_actions->execute(block);
if (!had_prewhere_column)
block.erase(prewhere_column_name);
}
}
MergeTreeBaseBlockInputStream::~MergeTreeBaseBlockInputStream() = default;
}

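executePrewhereActions follows a simple contract: run the actions, but keep the filter column only if the block already contained it (that is, the query itself asked for it); otherwise it was a purely internal column and is erased. A minimal sketch of the same pattern with a toy block type (not the DB's Block):

#include <iostream>
#include <map>
#include <string>
#include <vector>

using ToyBlock = std::map<std::string, std::vector<int>>;    /// column name -> column data

/// Stand-in for prewhere_actions->execute(block): computes the filter column from x.
void executeToyActions(ToyBlock & block, const std::string & filter_name)
{
    std::vector<int> filter;
    for (int x : block.at("x"))
        filter.push_back(x > 10 ? 1 : 0);
    block[filter_name] = filter;
}

void applyPrewhere(ToyBlock & block, const std::string & filter_name)
{
    bool had_prewhere_column = block.count(filter_name) > 0;  /// was it requested by the query itself?
    executeToyActions(block, filter_name);
    if (!had_prewhere_column)
        block.erase(filter_name);                             /// drop the purely internal column
}

int main()
{
    ToyBlock block{{"x", {5, 20, 7}}};
    applyPrewhere(block, "greater(x, 10)");
    std::cout << "columns left: " << block.size() << '\n';    /// 1: only "x" survives
}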
View File

@ -44,6 +44,8 @@ protected:
void injectVirtualColumns(Block & block) const;
void executePrewhereActions(Block & block) const;
protected:
MergeTreeData & storage;

View File

@ -62,6 +62,7 @@ MergeTreeBlockInputStream::MergeTreeBlockInputStream(
addTotalRowsApprox(total_rows);
header = storage.getSampleBlockForColumns(ordered_names);
executePrewhereActions(header);
/// Types may be different during ALTER (when this stream is used to perform an ALTER).
/// NOTE: We may use similar code to implement non blocking ALTERs.

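The header is now pushed through the same executePrewhereActions that every data block will go through, so the structure the stream advertises matches the blocks it actually produces once PREWHERE has added or erased the filter column. A tiny illustration of that invariant, using plain column-name sets instead of real blocks:

#include <cassert>
#include <set>
#include <string>

using ToyColumns = std::set<std::string>;    /// just column names, no data

/// The single transformation applied to the header once and to every block afterwards.
void addFilterColumn(ToyColumns & columns, const std::string & filter_name)
{
    columns.insert(filter_name);
}

int main()
{
    ToyColumns header{"x", "y"};
    addFilterColumn(header, "greater(x, 10)");        /// done once, when the stream is constructed

    ToyColumns data_block{"x", "y"};
    addFilterColumn(data_block, "greater(x, 10)");    /// done for every block that is read

    assert(header == data_block);                     /// declared and produced structure stay in sync
    return 0;
}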
View File

@ -494,23 +494,9 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
LOG_DEBUG(log, "MinMax index condition: " << minmax_idx_condition->toString());
/// PREWHERE
ExpressionActionsPtr prewhere_actions;
String prewhere_column;
if (select.prewhere_expression)
{
ExpressionAnalyzer analyzer(select.prewhere_expression, context, nullptr, available_real_columns);
prewhere_actions = analyzer.getActions(false);
prewhere_column = select.prewhere_expression->getColumnName();
SubqueriesForSets prewhere_subqueries = analyzer.getSubqueriesForSets();
/** Compute the subqueries right now.
* NOTE Disadvantage - these calculations do not fit into the query execution pipeline.
* They are done before the execution of the pipeline; they can not be interrupted; during the computation, packets of progress are not sent.
*/
if (!prewhere_subqueries.empty())
CreatingSetsBlockInputStream(std::make_shared<NullBlockInputStream>(Block()), prewhere_subqueries,
SizeLimits(settings.max_rows_to_transfer, settings.max_bytes_to_transfer, settings.transfer_overflow_mode)).read();
}
RangesInDataParts parts_with_ranges;
@ -567,7 +553,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
column_names_to_read,
max_block_size,
settings.use_uncompressed_cache,
prewhere_actions,
query_info.prewhere_actions,
prewhere_column,
virt_column_names,
settings,
@ -581,7 +567,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
column_names_to_read,
max_block_size,
settings.use_uncompressed_cache,
prewhere_actions,
query_info.prewhere_actions,
prewhere_column,
virt_column_names,
settings);

View File

@ -42,6 +42,7 @@ Block MergeTreeThreadBlockInputStream::getHeader() const
{
auto res = pool->getHeader();
injectVirtualColumns(res);
executePrewhereActions(res);
return res;
};

View File

@ -10,6 +10,9 @@ namespace DB
class IAST;
using ASTPtr = std::shared_ptr<IAST>;
class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
class Set;
using SetPtr = std::shared_ptr<Set>;
@ -25,6 +28,9 @@ struct SelectQueryInfo
{
ASTPtr query;
/// Actions which are executed on block in order to get filter column for prewhere step.
ExpressionActionsPtr prewhere_actions;
/// Prepared sets are used for indices by storage engine.
/// Example: x IN (1, 2, 3)
PreparedSets sets;
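SelectQueryInfo only needs a forward declaration of ExpressionActions plus a shared_ptr alias to carry the new prewhere_actions member, presumably to avoid pulling the full ExpressionActions definition into this header. A generic sketch of that forward-declaration pattern (HeavyActions and QueryInfoLike are hypothetical names, not the ClickHouse classes):

#include <memory>

class HeavyActions;                                    /// forward declaration is enough here
using HeavyActionsPtr = std::shared_ptr<HeavyActions>;

struct QueryInfoLike
{
    HeavyActionsPtr prewhere_actions;                  /// shared_ptr to an incomplete type is fine
};

/// Only the code that actually runs the actions needs the full definition.
class HeavyActions
{
public:
    void execute() const {}
};

int main()
{
    QueryInfoLike info;
    info.prewhere_actions = std::make_shared<HeavyActions>();
    info.prewhere_actions->execute();
}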