ClickHouse/src/Interpreters/IInterpreterUnionOrSelectQuery.cpp

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

139 lines
5.2 KiB
C++
Raw Normal View History

2020-12-14 03:30:39 +00:00
#include <Interpreters/IInterpreterUnionOrSelectQuery.h>
#include <Interpreters/QueryLog.h>
2021-09-15 19:35:48 +00:00
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
2022-07-06 12:37:37 +00:00
#include <Parsers/ExpressionListParsers.h>
#include <Parsers/parseQuery.h>
#include <Interpreters/ActionsDAG.h>
#include <Interpreters/ExpressionAnalyzer.h>
2022-08-13 13:03:16 +00:00
#include <Interpreters/TreeRewriter.h>
2022-07-06 12:37:37 +00:00
#include <Processors/QueryPlan/FilterStep.h>
2020-12-14 03:30:39 +00:00
2022-08-13 13:03:16 +00:00
2020-12-14 03:30:39 +00:00
namespace DB
{
2022-05-23 13:46:57 +00:00
/// Builds the query plan for this interpreter and turns it into a pipeline builder.
/// Plan optimization and pipeline-build settings are both derived from the interpreter context.
QueryPipelineBuilder IInterpreterUnionOrSelectQuery::buildQueryPipeline()
{
    QueryPlan plan;
    buildQueryPlan(plan);

    auto optimization_settings = QueryPlanOptimizationSettings::fromContext(context);
    auto build_settings = BuildQueryPipelineSettings::fromContext(context);
    auto builder = plan.buildQueryPipeline(optimization_settings, build_settings);

    /// buildQueryPipeline hands back a pointer; the builder itself is moved out to the caller.
    return std::move(*builder);
}
/// Builds the stream limits used when reading from storage, taken from the query settings.
///
/// Read size limits, max execution time, the timeout overflow mode and maximum-speed limits
/// are always filled in. Minimum-speed limits are filled in only when the query is processed
/// up to QueryProcessingStage::Complete.
static StreamLocalLimits getLimitsForStorage(const Settings & settings, const SelectQueryOptions & options)
{
    StreamLocalLimits result;
    result.mode = LimitsMode::LIMITS_TOTAL;
    result.size_limits = SizeLimits(settings.max_rows_to_read, settings.max_bytes_to_read, settings.read_overflow_mode);

    auto & speed = result.speed_limits;
    speed.max_execution_time = settings.max_execution_time;
    result.timeout_overflow_mode = settings.timeout_overflow_mode;

    /** Quota and minimal-speed restrictions belong on the server that initiated the query, not on
      * remote servers: only the initiator sees a summary of the execution across all servers.
      *
      * Limits on the amount of data read and on maximum execution time make sense both on the initiator
      * and, additionally, on every remote server. They are checked per processed block, and a remote
      * server may process many more blocks than the initiator ends up receiving.
      *
      * Limits that throttle the maximum execution speed are likewise checked on all servers.
      */
    const bool processed_to_completion = (options.to_stage == QueryProcessingStage::Complete);
    if (processed_to_completion)
    {
        speed.min_execution_rps = settings.min_execution_speed;
        speed.min_execution_bps = settings.min_execution_speed_bytes;
    }

    speed.max_execution_rps = settings.max_execution_speed;
    speed.max_execution_bps = settings.max_execution_speed_bytes;
    speed.timeout_before_checking_execution_speed = settings.timeout_before_checking_execution_speed;

    return result;
}
2022-05-31 14:43:38 +00:00
/// Computes the per-stream and leaf read limits for storages read by this query.
///
/// When options.ignore_limits is set, both limits are returned default-constructed.
/// Otherwise the stream limits come from getLimitsForStorage(), and the leaf limits come from
/// max_rows_to_read_leaf / max_bytes_to_read_leaf / read_overflow_mode_leaf.
StorageLimits IInterpreterUnionOrSelectQuery::getStorageLimits(const Context & context, const SelectQueryOptions & options)
{
    const auto & settings = context.getSettingsRef();

    StreamLocalLimits stream_limits;
    SizeLimits leaf_size_limits;

    if (options.ignore_limits)
        return {stream_limits, leaf_size_limits};

    /// Set the limits and quota for reading data, the speed and time of the query.
    stream_limits = getLimitsForStorage(settings, options);
    leaf_size_limits = SizeLimits(settings.max_rows_to_read_leaf, settings.max_bytes_to_read_leaf, settings.read_overflow_mode_leaf);

    return {stream_limits, leaf_size_limits};
}
2022-06-02 10:34:40 +00:00
/// Attaches the context's quota to the pipeline.
///
/// The quota is taken from the context only when options.ignore_quota is false and the query
/// runs up to QueryProcessingStage::Complete. In every other case the pipeline receives an
/// empty quota pointer.
void IInterpreterUnionOrSelectQuery::setQuota(QueryPipeline & pipeline) const
{
    const bool quota_applies = !options.ignore_quota && options.to_stage == QueryProcessingStage::Complete;

    std::shared_ptr<const EnabledQuota> enabled_quota;
    if (quota_applies)
        enabled_quota = context->getQuota();

    pipeline.setQuota(enabled_quota);
}
2022-07-06 12:37:37 +00:00
/// Parses the `additional_result_filter` setting as a single expression.
///
/// Returns nullptr when the setting is empty. Otherwise returns the parsed AST. Parsing is
/// bounded by the max_query_size and max_parser_depth settings.
static ASTPtr parseAdditionalPostFilter(const Context & context)
{
    const auto & settings = context.getSettingsRef();
    const String & filter_text = settings.additional_result_filter;

    if (filter_text.empty())
        return nullptr;

    const char * text_begin = filter_text.data();
    const char * text_end = text_begin + filter_text.size();

    ParserExpression expression_parser;
    return parseQuery(
        expression_parser, text_begin, text_end,
        "additional filter", settings.max_query_size, settings.max_parser_depth);
}
/// Builds an ActionsDAG that evaluates `ast` over the columns of `header`.
///
/// The DAG's outputs are rewritten to be every input column, in input order, followed by the
/// node that computes the filter expression. addAdditionalPostFilter relies on the filter
/// being the last output.
static ActionsDAGPtr makeAdditionalPostFilter(ASTPtr & ast, ContextPtr context, const Block & header)
{
    auto analyzed = TreeRewriter(context).analyze(ast, header.getNamesAndTypesList());
    String filter_column = ast->getColumnName();
    auto dag = ExpressionAnalyzer(ast, analyzed, context).getActionsDAG(false, false);

    /// The filter node must be located before the outputs are cleared below.
    const ActionsDAG::Node * filter_node = &dag->findInOutputs(filter_column);

    const auto & inputs = dag->getInputs();
    auto & outputs = dag->getOutputs();
    outputs.clear();
    outputs.reserve(inputs.size() + 1);

    /// Pass every input column through unchanged, then expose the filter result last.
    outputs.insert(outputs.end(), inputs.begin(), inputs.end());
    outputs.push_back(filter_node);

    return dag;
}
/// Appends a "Additional result filter" FilterStep to `plan` when the
/// `additional_result_filter` setting is non-empty.
///
/// Only the outermost query (options.subquery_depth == 0) is filtered. The filter column is the
/// last output of the DAG built by makeAdditionalPostFilter.
void IInterpreterUnionOrSelectQuery::addAdditionalPostFilter(QueryPlan & plan) const
{
    const bool is_outermost_query = (options.subquery_depth == 0);
    if (!is_outermost_query)
        return;

    auto filter_ast = parseAdditionalPostFilter(*context);
    if (!filter_ast)
        return;

    const auto & input_header = plan.getCurrentDataStream().header;
    auto filter_dag = makeAdditionalPostFilter(filter_ast, context, input_header);
    std::string filter_column = filter_dag->getOutputs().back()->result_name;

    /// NOTE(review): the trailing `true` presumably asks FilterStep to remove the filter column
    /// from its output; confirm against FilterStep's constructor.
    auto step = std::make_unique<FilterStep>(
        plan.getCurrentDataStream(), std::move(filter_dag), std::move(filter_column), true);
    step->setStepDescription("Additional result filter");
    plan.addStep(std::move(step));
}
2022-05-31 14:43:38 +00:00
void IInterpreterUnionOrSelectQuery::addStorageLimits(const StorageLimitsList & limits)
{
for (const auto & val : limits)
storage_limits.push_back(val);
2022-05-27 20:47:35 +00:00
}
2020-12-14 03:30:39 +00:00
}