mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 23:52:03 +00:00
Merge pull request #44234 from kitaisreal/analyzer-expired-context-crash-fix
Analyzer expired Context crash fix
This commit is contained in:
commit
6594aa8f1e
@ -14,6 +14,7 @@
|
||||
#include <Interpreters/addMissingDefaults.h>
|
||||
#include <Interpreters/getTableExpressions.h>
|
||||
#include <Interpreters/processColumnTransformers.h>
|
||||
#include <Interpreters/InterpreterSelectQueryAnalyzer.h>
|
||||
#include <Parsers/ASTFunction.h>
|
||||
#include <Parsers/ASTInsertQuery.h>
|
||||
#include <Parsers/ASTSelectQuery.h>
|
||||
@ -63,39 +64,54 @@ InterpreterInsertQuery::InterpreterInsertQuery(
|
||||
|
||||
StoragePtr InterpreterInsertQuery::getTable(ASTInsertQuery & query)
|
||||
{
|
||||
auto current_context = getContext();
|
||||
|
||||
if (query.table_function)
|
||||
{
|
||||
const auto & factory = TableFunctionFactory::instance();
|
||||
TableFunctionPtr table_function_ptr = factory.get(query.table_function, getContext());
|
||||
TableFunctionPtr table_function_ptr = factory.get(query.table_function, current_context);
|
||||
|
||||
/// If table function needs structure hint from select query
|
||||
/// we can create a temporary pipeline and get the header.
|
||||
if (query.select && table_function_ptr->needStructureHint())
|
||||
{
|
||||
InterpreterSelectWithUnionQuery interpreter_select{
|
||||
query.select, getContext(), SelectQueryOptions(QueryProcessingStage::Complete, 1)};
|
||||
auto tmp_pipeline = interpreter_select.buildQueryPipeline();
|
||||
ColumnsDescription structure_hint{tmp_pipeline.getHeader().getNamesAndTypesList()};
|
||||
Block header_block;
|
||||
auto select_query_options = SelectQueryOptions(QueryProcessingStage::Complete, 1);
|
||||
|
||||
if (current_context->getSettingsRef().allow_experimental_analyzer)
|
||||
{
|
||||
InterpreterSelectQueryAnalyzer interpreter_select(query.select, current_context, select_query_options);
|
||||
header_block = interpreter_select.getSampleBlock();
|
||||
}
|
||||
else
|
||||
{
|
||||
InterpreterSelectWithUnionQuery interpreter_select{
|
||||
query.select, current_context, select_query_options};
|
||||
auto tmp_pipeline = interpreter_select.buildQueryPipeline();
|
||||
header_block = tmp_pipeline.getHeader();
|
||||
}
|
||||
|
||||
ColumnsDescription structure_hint{header_block.getNamesAndTypesList()};
|
||||
table_function_ptr->setStructureHint(structure_hint);
|
||||
}
|
||||
|
||||
return table_function_ptr->execute(query.table_function, getContext(), table_function_ptr->getName(),
|
||||
return table_function_ptr->execute(query.table_function, current_context, table_function_ptr->getName(),
|
||||
/* cached_columns */ {}, /* use_global_context */ false, /* is_insert_query */true);
|
||||
}
|
||||
|
||||
if (query.table_id)
|
||||
{
|
||||
query.table_id = getContext()->resolveStorageID(query.table_id);
|
||||
query.table_id = current_context->resolveStorageID(query.table_id);
|
||||
}
|
||||
else
|
||||
{
|
||||
/// Insert query parser does not fill table_id because table and
|
||||
/// database can be parameters and be filled after parsing.
|
||||
StorageID local_table_id(query.getDatabase(), query.getTable());
|
||||
query.table_id = getContext()->resolveStorageID(local_table_id);
|
||||
query.table_id = current_context->resolveStorageID(local_table_id);
|
||||
}
|
||||
|
||||
return DatabaseCatalog::instance().getTable(query.table_id, getContext());
|
||||
return DatabaseCatalog::instance().getTable(query.table_id, current_context);
|
||||
}
|
||||
|
||||
Block InterpreterInsertQuery::getSampleBlock(
|
||||
@ -384,16 +400,34 @@ BlockIO InterpreterInsertQuery::execute()
|
||||
new_context->setSettings(new_settings);
|
||||
new_context->setInsertionTable(getContext()->getInsertionTable());
|
||||
|
||||
InterpreterSelectWithUnionQuery interpreter_select{
|
||||
query.select, new_context, SelectQueryOptions(QueryProcessingStage::Complete, 1)};
|
||||
pipeline = interpreter_select.buildQueryPipeline();
|
||||
auto select_query_options = SelectQueryOptions(QueryProcessingStage::Complete, 1);
|
||||
|
||||
if (settings.allow_experimental_analyzer)
|
||||
{
|
||||
InterpreterSelectQueryAnalyzer interpreter_select_analyzer(query.select, new_context, select_query_options);
|
||||
pipeline = interpreter_select_analyzer.buildQueryPipeline();
|
||||
}
|
||||
else
|
||||
{
|
||||
InterpreterSelectWithUnionQuery interpreter_select(query.select, new_context, select_query_options);
|
||||
pipeline = interpreter_select.buildQueryPipeline();
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
/// Passing 1 as subquery_depth will disable limiting size of intermediate result.
|
||||
InterpreterSelectWithUnionQuery interpreter_select{
|
||||
query.select, getContext(), SelectQueryOptions(QueryProcessingStage::Complete, 1)};
|
||||
pipeline = interpreter_select.buildQueryPipeline();
|
||||
auto select_query_options = SelectQueryOptions(QueryProcessingStage::Complete, 1);
|
||||
|
||||
if (settings.allow_experimental_analyzer)
|
||||
{
|
||||
InterpreterSelectQueryAnalyzer interpreter_select_analyzer(query.select, getContext(), select_query_options);
|
||||
pipeline = interpreter_select_analyzer.buildQueryPipeline();
|
||||
}
|
||||
else
|
||||
{
|
||||
InterpreterSelectWithUnionQuery interpreter_select(query.select, getContext(), select_query_options);
|
||||
pipeline = interpreter_select.buildQueryPipeline();
|
||||
}
|
||||
}
|
||||
|
||||
pipeline.dropTotalsAndExtremes();
|
||||
|
@ -119,6 +119,17 @@ QueryPlan && InterpreterSelectQueryAnalyzer::extractQueryPlan() &&
|
||||
return std::move(planner).extractQueryPlan();
|
||||
}
|
||||
|
||||
QueryPipelineBuilder InterpreterSelectQueryAnalyzer::buildQueryPipeline()
|
||||
{
|
||||
planner.buildQueryPlanIfNeeded();
|
||||
auto & query_plan = planner.getQueryPlan();
|
||||
|
||||
QueryPlanOptimizationSettings optimization_settings;
|
||||
BuildQueryPipelineSettings build_pipeline_settings;
|
||||
|
||||
return std::move(*query_plan.buildQueryPipeline(optimization_settings, build_pipeline_settings));
|
||||
}
|
||||
|
||||
void InterpreterSelectQueryAnalyzer::addStorageLimits(const StorageLimitsList & storage_limits)
|
||||
{
|
||||
planner.addStorageLimits(storage_limits);
|
||||
|
@ -36,6 +36,8 @@ public:
|
||||
|
||||
QueryPlan && extractQueryPlan() &&;
|
||||
|
||||
QueryPipelineBuilder buildQueryPipeline();
|
||||
|
||||
void addStorageLimits(const StorageLimitsList & storage_limits);
|
||||
|
||||
bool supportsTransactions() const override { return true; }
|
||||
|
@ -0,0 +1 @@
|
||||
0 0 \N
|
@ -0,0 +1,15 @@
|
||||
SET allow_experimental_analyzer = 1;
|
||||
|
||||
DROP TABLE IF EXISTS test_table;
|
||||
CREATE TABLE test_table
|
||||
(
|
||||
b Int64,
|
||||
a Int64,
|
||||
grp_aggreg AggregateFunction(groupArrayArray, Array(UInt64))
|
||||
) ENGINE = MergeTree() ORDER BY a;
|
||||
|
||||
INSERT INTO test_table SELECT 0, 0, groupArrayArrayState([toUInt64(1)]);
|
||||
|
||||
SELECT b, a, JSONLength(grp_aggreg, 100, NULL) FROM test_table SETTINGS optimize_aggregation_in_order = 1;
|
||||
|
||||
DROP TABLE test_table;
|
@ -0,0 +1,3 @@
|
||||
0 Value_0
|
||||
1 Value_1
|
||||
2 Value_2
|
@ -0,0 +1,26 @@
|
||||
SET allow_experimental_analyzer = 1;
|
||||
|
||||
DROP TABLE IF EXISTS test_table;
|
||||
CREATE TABLE test_table
|
||||
(
|
||||
id UInt64,
|
||||
value String
|
||||
) ENGINE=MergeTree ORDER BY id;
|
||||
|
||||
INSERT INTO test_table SELECT 0, 'Value_0';
|
||||
|
||||
DROP TABLE IF EXISTS test_table_data;
|
||||
CREATE TABLE test_table_data
|
||||
(
|
||||
id UInt64,
|
||||
value String
|
||||
) ENGINE=MergeTree ORDER BY id;
|
||||
|
||||
INSERT INTO test_table_data VALUES (1, 'Value_1'), (2, 'Value_2');
|
||||
|
||||
INSERT INTO test_table SELECT id, value FROM test_table_data;
|
||||
|
||||
SELECT id, value FROM test_table ORDER BY id;
|
||||
|
||||
DROP TABLE test_table_data;
|
||||
DROP TABLE test_table;
|
Loading…
Reference in New Issue
Block a user