Added InterpreterSelectQuery::getSampleBlockImpl. Disable dry_run.

This commit is contained in:
Nikolai Kochetov 2019-08-08 18:18:28 +03:00
parent 42c2833932
commit 9ac401573f
6 changed files with 72 additions and 91 deletions

View File

@ -270,7 +270,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
String database_name;
String table_name;
getDatabaseAndTableNames(database_name, table_name);
getDatabaseAndTableNames(query, database_name, table_name, context);
if (auto view_source = context.getViewSource())
{
@ -344,17 +344,16 @@ InterpreterSelectQuery::InterpreterSelectQuery(
source_header = storage->getSampleBlockForColumns(required_columns);
/// Calculate structure of the result.
{
Pipeline pipeline;
executeImpl(pipeline, nullptr, true);
result_header = pipeline.firstStream()->getHeader();
}
result_header = getSampleBlockImpl();
for (auto & col : result_header)
if (!col.column)
col.column = col.type->createColumn();
}
void InterpreterSelectQuery::getDatabaseAndTableNames(String & database_name, String & table_name)
void InterpreterSelectQuery::getDatabaseAndTableNames(const ASTSelectQuery & query, String & database_name, String & table_name, const Context & context)
{
if (auto db_and_table = getDatabaseAndTable(getSelectQuery(), 0))
if (auto db_and_table = getDatabaseAndTable(query, 0))
{
table_name = db_and_table->table;
database_name = db_and_table->database;
@ -403,60 +402,35 @@ QueryPipeline InterpreterSelectQuery::executeWithProcessors()
}
Block InterpreterSelectQuery::getHeaderForExecutionStep(
const ASTPtr & query_ptr,
const StoragePtr & storage,
QueryProcessingStage::Enum stage,
size_t subquery_depth,
const Context & context,
const PrewhereInfoPtr & prewhere_info)
Block InterpreterSelectQuery::getSampleBlockImpl()
{
SelectQueryOptions options(stage, subquery_depth);
options.only_analyze = true;
Names required_result_column_names;
/// TODO: remove it.
auto query = query_ptr->clone();
auto syntax_analyzer_result = SyntaxAnalyzer(context, options).analyze(
query, {}, required_result_column_names, storage);
auto query_analyzer = ExpressionAnalyzer(
query, syntax_analyzer_result, context, NamesAndTypesList(),
NameSet(required_result_column_names.begin(), required_result_column_names.end()),
options.subquery_depth, !options.only_analyze);
if (stage == QueryProcessingStage::Enum::FetchColumns)
{
auto required_columns = query_analyzer.getRequiredSourceColumns();
auto header = storage->getSampleBlockForColumns(required_columns);
if (prewhere_info)
{
prewhere_info->prewhere_actions->execute(header);
header = materializeBlock(header);
if (prewhere_info->remove_prewhere_column)
header.erase(prewhere_info->prewhere_column_name);
}
return header;
}
FilterInfoPtr filter_info;
auto & select_query = query->as<const ASTSelectQuery &>();
auto analysis_result = analyzeExpressions(
select_query,
query_analyzer,
getSelectQuery(),
*query_analyzer,
QueryProcessingStage::Enum::FetchColumns,
stage,
options.to_stage,
context,
storage,
true,
filter_info);
if (stage == QueryProcessingStage::Enum::WithMergeableState)
if (options.to_stage == QueryProcessingStage::Enum::FetchColumns)
{
auto header = source_header;
if (analysis_result.prewhere_info)
{
analysis_result.prewhere_info->prewhere_actions->execute(header);
header = materializeBlock(header);
if (analysis_result.prewhere_info->remove_prewhere_column)
header.erase(analysis_result.prewhere_info->prewhere_column_name);
}
return header;
}
if (options.to_stage == QueryProcessingStage::Enum::WithMergeableState)
{
if (!analysis_result.need_aggregate)
return analysis_result.before_order_and_select->getSampleBlock();
@ -465,7 +439,7 @@ Block InterpreterSelectQuery::getHeaderForExecutionStep(
Names key_names;
AggregateDescriptions aggregates;
query_analyzer.getAggregateInfo(key_names, aggregates);
query_analyzer->getAggregateInfo(key_names, aggregates);
Block res;
@ -524,6 +498,24 @@ InterpreterSelectQuery::analyzeExpressions(
{
chain.finalize();
/// Check that actions on current step are valid.
/// Currently this is needed for mutations, to check that a mutation is valid before executing it in the background.
/// This is because some functions check the correctness of constant arguments only during execution,
/// and not in the getReturnType method (e.g. comparing a date with a constant string).
if (dry_run)
{
for (auto & step : chain.steps)
{
auto step_required_columns = step.actions->getRequiredColumnsWithTypes();
Block sample;
for (auto & col : step_required_columns)
sample.insert({col.type->createColumn(), col.type, col.name});
step.actions->execute(sample);
}
}
if (has_prewhere)
{
const ExpressionActionsChain::Step & step = chain.steps.at(0);

View File

@ -79,14 +79,6 @@ public:
ASTPtr getQuery() const { return query_ptr; }
static Block getHeaderForExecutionStep(
const ASTPtr & query,
const StoragePtr & storage,
QueryProcessingStage::Enum stage,
size_t subquery_depth,
const Context & context,
const PrewhereInfoPtr & prewhere_info);
private:
InterpreterSelectQuery(
const ASTPtr & query_ptr_,
@ -98,6 +90,7 @@ private:
ASTSelectQuery & getSelectQuery() { return query_ptr->as<ASTSelectQuery &>(); }
Block getSampleBlockImpl();
struct Pipeline
{
@ -192,7 +185,7 @@ private:
/** From which table to read. With JOIN, the "left" table is returned.
*/
void getDatabaseAndTableNames(String & database_name, String & table_name);
static void getDatabaseAndTableNames(const ASTSelectQuery & query, String & database_name, String & table_name, const Context & context);
/// Different stages of query execution.

View File

@ -50,6 +50,7 @@ private:
Context context;
std::vector<std::unique_ptr<InterpreterSelectQuery>> nested_interpreters;
Blocks nested_headers;
Block result_header;

View File

@ -301,22 +301,8 @@ BlockInputStreams StorageDistributed::read(
const auto & modified_query_ast = rewriteSelectQuery(
query_info.query, remote_database, remote_table, remote_table_function_ptr);
StoragePtr tmp_storage;
if (remote_table_function_ptr)
tmp_storage = context.getQueryContext().executeTableFunction(remote_table_function_ptr);
else
tmp_storage = context.getTable(remote_database, remote_table);
Block header =
//InterpreterSelectQuery(query_info.query, context, SelectQueryOptions(processed_stage)).getSampleBlock());
InterpreterSelectQuery::getHeaderForExecutionStep(query_info.query, tmp_storage, processed_stage, 0, context, query_info.prewhere_info);
/// Create empty columns for header.
/// All columns must be empty, because otherwise (by some reason) remote query can return one excessive row.
/// So, all columns are recreated.
for (auto & col : header)
col.column = col.type->createColumn();
Block header = materializeBlock(
InterpreterSelectQuery(query_info.query, context, SelectQueryOptions(processed_stage)).getSampleBlock());
ClusterProxy::SelectStreamFactory select_stream_factory = remote_table_function_ptr
? ClusterProxy::SelectStreamFactory(

View File

@ -190,7 +190,7 @@ BlockInputStreams StorageMerge::read(
modified_context.getSettingsRef().optimize_move_to_prewhere = false;
/// What will the result structure be, depending on the query processing stage in the source tables?
Block header = getQueryHeader(query_info, context, processed_stage);
Block header = getQueryHeader(column_names, query_info, context, processed_stage);
/** First we make a list of the selected tables to find out its size.
* This is necessary to correctly pass the recommended number of threads to each table.
@ -407,20 +407,29 @@ void StorageMerge::alter(
}
Block StorageMerge::getQueryHeader(
const SelectQueryInfo & query_info, const Context & context, QueryProcessingStage::Enum processed_stage)
const Names & column_names, const SelectQueryInfo & query_info, const Context & context, QueryProcessingStage::Enum processed_stage)
{
auto storage = shared_from_this();
auto header = InterpreterSelectQuery::getHeaderForExecutionStep(query_info.query, storage, processed_stage, 0, context, query_info.prewhere_info);
for (auto & col : header)
switch (processed_stage)
{
if (!col.column)
col.column = col.type->createColumn();
else
col.column = col.column->convertToFullColumnIfConst();
case QueryProcessingStage::FetchColumns:
{
Block header = getSampleBlockForColumns(column_names);
if (query_info.prewhere_info)
{
query_info.prewhere_info->prewhere_actions->execute(header);
header = materializeBlock(header);
if (query_info.prewhere_info->remove_prewhere_column)
header.erase(query_info.prewhere_info->prewhere_column_name);
}
return header;
}
case QueryProcessingStage::WithMergeableState:
case QueryProcessingStage::Complete:
return materializeBlock(InterpreterSelectQuery(
query_info.query, context, std::make_shared<OneBlockInputStream>(getSampleBlockForColumns(column_names)),
SelectQueryOptions(processed_stage).analyze()).getSampleBlock());
}
return header;
throw Exception("Logical Error: unknown processed stage.", ErrorCodes::LOGICAL_ERROR);
}
void StorageMerge::convertingSourceStream(const Block & header, const Context & context, ASTPtr & query,

View File

@ -79,7 +79,7 @@ protected:
const String & table_name_regexp_,
const Context & context_);
Block getQueryHeader(const SelectQueryInfo & query_info,
Block getQueryHeader(const Names & column_names, const SelectQueryInfo & query_info,
const Context & context, QueryProcessingStage::Enum processed_stage);
BlockInputStreams createSourceStreams(const SelectQueryInfo & query_info, const QueryProcessingStage::Enum & processed_stage,