Add back missing projection QueryAccessInfo.

This commit is contained in:
Amos Bird 2023-07-19 21:28:17 +08:00
parent 234b5047b5
commit 53d77e6b13
No known key found for this signature in database
GPG Key ID: 80D430DCBECFEDB4
9 changed files with 114 additions and 9 deletions

View File

@ -1461,15 +1461,20 @@ void Context::addQueryAccessInfo(
void Context::addQueryAccessInfo(const Names & partition_names) void Context::addQueryAccessInfo(const Names & partition_names)
{ {
if (isGlobalContext()) if (isGlobalContext())
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context cannot have query access info"); throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context cannot have query access info");
}
std::lock_guard<std::mutex> lock(query_access_info.mutex); std::lock_guard<std::mutex> lock(query_access_info.mutex);
for (const auto & partition_name : partition_names) for (const auto & partition_name : partition_names)
{
query_access_info.partitions.emplace(partition_name); query_access_info.partitions.emplace(partition_name);
} }
void Context::addQueryAccessInfo(const String & qualified_projection_name)
{
if (isGlobalContext())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context cannot have query access info");
std::lock_guard<std::mutex> lock(query_access_info.mutex);
query_access_info.projections.emplace(qualified_projection_name);
} }
void Context::addQueryFactoriesInfo(QueryLogFactories factory_type, const String & created_object) const void Context::addQueryFactoriesInfo(QueryLogFactories factory_type, const String & created_object) const

View File

@ -657,6 +657,7 @@ public:
const String & projection_name = {}, const String & projection_name = {},
const String & view_name = {}); const String & view_name = {});
void addQueryAccessInfo(const Names & partition_names); void addQueryAccessInfo(const Names & partition_names);
void addQueryAccessInfo(const String & qualified_projection_name);
/// Supported factories for records in query_log /// Supported factories for records in query_log

View File

@ -625,7 +625,14 @@ bool optimizeUseAggregateProjections(QueryPlan::Node & node, QueryPlan::Nodes &
// candidates.minmax_projection->block.dumpStructure()); // candidates.minmax_projection->block.dumpStructure());
Pipe pipe(std::make_shared<SourceFromSingleChunk>(std::move(candidates.minmax_projection->block))); Pipe pipe(std::make_shared<SourceFromSingleChunk>(std::move(candidates.minmax_projection->block)));
projection_reading = std::make_unique<ReadFromPreparedSource>(std::move(pipe)); projection_reading = std::make_unique<ReadFromPreparedSource>(
std::move(pipe),
context,
query_info.is_internal ? ""
: fmt::format(
"{}.{}",
reading->getMergeTreeData().getStorageID().getFullTableName(),
backQuoteIfNeed(candidates.minmax_projection->candidate.projection->name)));
has_ordinary_parts = !candidates.minmax_projection->normal_parts.empty(); has_ordinary_parts = !candidates.minmax_projection->normal_parts.empty();
if (has_ordinary_parts) if (has_ordinary_parts)
@ -658,7 +665,14 @@ bool optimizeUseAggregateProjections(QueryPlan::Node & node, QueryPlan::Nodes &
{ {
auto header = proj_snapshot->getSampleBlockForColumns(best_candidate->dag->getRequiredColumnsNames()); auto header = proj_snapshot->getSampleBlockForColumns(best_candidate->dag->getRequiredColumnsNames());
Pipe pipe(std::make_shared<NullSource>(std::move(header))); Pipe pipe(std::make_shared<NullSource>(std::move(header)));
projection_reading = std::make_unique<ReadFromPreparedSource>(std::move(pipe)); projection_reading = std::make_unique<ReadFromPreparedSource>(
std::move(pipe),
context,
query_info.is_internal ? ""
: fmt::format(
"{}.{}",
reading->getMergeTreeData().getStorageID().getFullTableName(),
backQuoteIfNeed(best_candidate->projection->name)));
} }
has_ordinary_parts = best_candidate->merge_tree_ordinary_select_result_ptr != nullptr; has_ordinary_parts = best_candidate->merge_tree_ordinary_select_result_ptr != nullptr;

View File

@ -183,7 +183,14 @@ bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes)
if (!projection_reading) if (!projection_reading)
{ {
Pipe pipe(std::make_shared<NullSource>(proj_snapshot->getSampleBlockForColumns(required_columns))); Pipe pipe(std::make_shared<NullSource>(proj_snapshot->getSampleBlockForColumns(required_columns)));
projection_reading = std::make_unique<ReadFromPreparedSource>(std::move(pipe)); projection_reading = std::make_unique<ReadFromPreparedSource>(
std::move(pipe),
context,
query_info.is_internal ? ""
: fmt::format(
"{}.{}",
reading->getMergeTreeData().getStorageID().getFullTableName(),
backQuoteIfNeed(best_candidate->projection->name)));
} }
bool has_ordinary_parts = best_candidate->merge_tree_ordinary_select_result_ptr != nullptr; bool has_ordinary_parts = best_candidate->merge_tree_ordinary_select_result_ptr != nullptr;

View File

@ -1761,6 +1761,10 @@ void ReadFromMergeTree::initializePipeline(QueryPipelineBuilder & pipeline, cons
fmt::format("{}.{}", data.getStorageID().getFullNameNotQuoted(), part.data_part->info.partition_id)); fmt::format("{}.{}", data.getStorageID().getFullNameNotQuoted(), part.data_part->info.partition_id));
} }
context->getQueryContext()->addQueryAccessInfo(partition_names); context->getQueryContext()->addQueryAccessInfo(partition_names);
if (storage_snapshot->projection)
context->getQueryContext()->addQueryAccessInfo(
fmt::format("{}.{}", data.getStorageID().getFullTableName(), backQuoteIfNeed(storage_snapshot->projection->name)));
} }
ProfileEvents::increment(ProfileEvents::SelectedParts, result.selected_parts); ProfileEvents::increment(ProfileEvents::SelectedParts, result.selected_parts);

View File

@ -1,17 +1,23 @@
#include <Interpreters/Context.h>
#include <Processors/QueryPlan/ReadFromPreparedSource.h> #include <Processors/QueryPlan/ReadFromPreparedSource.h>
#include <QueryPipeline/QueryPipelineBuilder.h> #include <QueryPipeline/QueryPipelineBuilder.h>
namespace DB namespace DB
{ {
ReadFromPreparedSource::ReadFromPreparedSource(Pipe pipe_) ReadFromPreparedSource::ReadFromPreparedSource(Pipe pipe_, ContextPtr context_, const String & qualified_projection_name_)
: ISourceStep(DataStream{.header = pipe_.getHeader()}) : ISourceStep(DataStream{.header = pipe_.getHeader()})
, pipe(std::move(pipe_)) , pipe(std::move(pipe_))
, context(context_)
, qualified_projection_name(qualified_projection_name_)
{ {
} }
void ReadFromPreparedSource::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) void ReadFromPreparedSource::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{ {
if (context && context->hasQueryContext() && !qualified_projection_name.empty())
context->getQueryContext()->addQueryAccessInfo(qualified_projection_name);
for (const auto & processor : pipe.getProcessors()) for (const auto & processor : pipe.getProcessors())
processors.emplace_back(processor); processors.emplace_back(processor);

View File

@ -9,7 +9,7 @@ namespace DB
class ReadFromPreparedSource : public ISourceStep class ReadFromPreparedSource : public ISourceStep
{ {
public: public:
explicit ReadFromPreparedSource(Pipe pipe_); explicit ReadFromPreparedSource(Pipe pipe_, ContextPtr context_ = nullptr, const String & qualified_projection_name_ = "");
String getName() const override { return "ReadFromPreparedSource"; } String getName() const override { return "ReadFromPreparedSource"; }
@ -18,6 +18,7 @@ public:
protected: protected:
Pipe pipe; Pipe pipe;
ContextPtr context; ContextPtr context;
String qualified_projection_name;
}; };
class ReadFromStorageStep : public ReadFromPreparedSource class ReadFromStorageStep : public ReadFromPreparedSource

View File

@ -0,0 +1,3 @@
t.t_normal
t.t_agg
t._minmax_count_projection

View File

@ -0,0 +1,64 @@
set log_queries=1;
set log_queries_min_type='QUERY_FINISH';
set optimize_use_implicit_projections=1;
DROP TABLE IF EXISTS t;
CREATE TABLE t
(
`id` UInt64,
`id2` UInt64,
`id3` UInt64,
PROJECTION t_normal
(
SELECT
id,
id2,
id3
ORDER BY
id2,
id,
id3
),
PROJECTION t_agg
(
SELECT
sum(id3)
GROUP BY id2
)
)
ENGINE = MergeTree
ORDER BY id
SETTINGS index_granularity = 8;
insert into t SELECT number, -number, number FROM numbers(10000);
SELECT * FROM t WHERE id2 = 3 FORMAT Null;
SELECT sum(id3) FROM t GROUP BY id2 FORMAT Null;
SELECT min(id) FROM t FORMAT Null;
SYSTEM FLUSH LOGS;
SELECT
--Remove the prefix string which is a mutable database name.
arrayStringConcat(arrayPopFront(splitByString('.', projections[1])), '.')
FROM
system.query_log
WHERE
current_database=currentDatabase() and query = 'SELECT * FROM t WHERE id2 = 3 FORMAT Null;';
SELECT
--Remove the prefix string which is a mutable database name.
arrayStringConcat(arrayPopFront(splitByString('.', projections[1])), '.')
FROM
system.query_log
WHERE
current_database=currentDatabase() and query = 'SELECT sum(id3) FROM t GROUP BY id2 FORMAT Null;';
SELECT
--Remove the prefix string which is a mutable database name.
arrayStringConcat(arrayPopFront(splitByString('.', projections[1])), '.')
FROM
system.query_log
WHERE
current_database=currentDatabase() and query = 'SELECT min(id) FROM t FORMAT Null;';