Merge pull request #52327 from amosbird/fix_50183

Add back missing projection QueryAccessInfo.
This commit is contained in:
Alexey Milovidov 2023-07-22 17:50:03 +03:00 committed by GitHub
commit 04462333d2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 135 additions and 10 deletions

View File

@ -1461,15 +1461,24 @@ void Context::addQueryAccessInfo(
void Context::addQueryAccessInfo(const Names & partition_names)
{
if (isGlobalContext())
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context cannot have query access info");
}
std::lock_guard<std::mutex> lock(query_access_info.mutex);
for (const auto & partition_name : partition_names)
{
query_access_info.partitions.emplace(partition_name);
}
}
void Context::addQueryAccessInfo(const QualifiedProjectionName & qualified_projection_name)
{
if (!qualified_projection_name)
return;
if (isGlobalContext())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context cannot have query access info");
std::lock_guard<std::mutex> lock(query_access_info.mutex);
query_access_info.projections.emplace(fmt::format(
"{}.{}", qualified_projection_name.storage_id.getFullTableName(), backQuoteIfNeed(qualified_projection_name.projection_name)));
}
void Context::addQueryFactoriesInfo(QueryLogFactories factory_type, const String & created_object) const

View File

@ -658,6 +658,14 @@ public:
const String & view_name = {});
void addQueryAccessInfo(const Names & partition_names);
struct QualifiedProjectionName
{
StorageID storage_id = StorageID::createEmpty();
String projection_name;
explicit operator bool() const { return !projection_name.empty(); }
};
void addQueryAccessInfo(const QualifiedProjectionName & qualified_projection_name);
/// Supported factories for records in query_log
enum class QueryLogFactories

View File

@ -628,8 +628,16 @@ bool optimizeUseAggregateProjections(QueryPlan::Node & node, QueryPlan::Nodes &
// candidates.minmax_projection->block.dumpStructure());
Pipe pipe(std::make_shared<SourceFromSingleChunk>(std::move(candidates.minmax_projection->block)));
projection_reading = std::make_unique<ReadFromPreparedSource>(std::move(pipe));
projection_reading = std::make_unique<ReadFromPreparedSource>(
std::move(pipe),
context,
query_info.is_internal
? Context::QualifiedProjectionName{}
: Context::QualifiedProjectionName
{
.storage_id = reading->getMergeTreeData().getStorageID(),
.projection_name = candidates.minmax_projection->candidate.projection->name,
});
has_ordinary_parts = !candidates.minmax_projection->normal_parts.empty();
if (has_ordinary_parts)
reading->resetParts(std::move(candidates.minmax_projection->normal_parts));
@ -661,7 +669,16 @@ bool optimizeUseAggregateProjections(QueryPlan::Node & node, QueryPlan::Nodes &
{
auto header = proj_snapshot->getSampleBlockForColumns(best_candidate->dag->getRequiredColumnsNames());
Pipe pipe(std::make_shared<NullSource>(std::move(header)));
projection_reading = std::make_unique<ReadFromPreparedSource>(std::move(pipe));
projection_reading = std::make_unique<ReadFromPreparedSource>(
std::move(pipe),
context,
query_info.is_internal
? Context::QualifiedProjectionName{}
: Context::QualifiedProjectionName
{
.storage_id = reading->getMergeTreeData().getStorageID(),
.projection_name = best_candidate->projection->name,
});
}
has_ordinary_parts = best_candidate->merge_tree_ordinary_select_result_ptr != nullptr;

View File

@ -183,7 +183,16 @@ bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes)
if (!projection_reading)
{
Pipe pipe(std::make_shared<NullSource>(proj_snapshot->getSampleBlockForColumns(required_columns)));
projection_reading = std::make_unique<ReadFromPreparedSource>(std::move(pipe));
projection_reading = std::make_unique<ReadFromPreparedSource>(
std::move(pipe),
context,
query_info.is_internal
? Context::QualifiedProjectionName{}
: Context::QualifiedProjectionName
{
.storage_id = reading->getMergeTreeData().getStorageID(),
.projection_name = best_candidate->projection->name,
});
}
bool has_ordinary_parts = best_candidate->merge_tree_ordinary_select_result_ptr != nullptr;

View File

@ -1761,6 +1761,10 @@ void ReadFromMergeTree::initializePipeline(QueryPipelineBuilder & pipeline, cons
fmt::format("{}.{}", data.getStorageID().getFullNameNotQuoted(), part.data_part->info.partition_id));
}
context->getQueryContext()->addQueryAccessInfo(partition_names);
if (storage_snapshot->projection)
context->getQueryContext()->addQueryAccessInfo(
Context::QualifiedProjectionName{.storage_id = data.getStorageID(), .projection_name = storage_snapshot->projection->name});
}
ProfileEvents::increment(ProfileEvents::SelectedParts, result.selected_parts);

View File

@ -4,14 +4,19 @@
namespace DB
{
ReadFromPreparedSource::ReadFromPreparedSource(Pipe pipe_)
ReadFromPreparedSource::ReadFromPreparedSource(Pipe pipe_, ContextPtr context_, Context::QualifiedProjectionName qualified_projection_name_)
: ISourceStep(DataStream{.header = pipe_.getHeader()})
, pipe(std::move(pipe_))
, context(std::move(context_))
, qualified_projection_name(std::move(qualified_projection_name_))
{
}
void ReadFromPreparedSource::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
if (context && context->hasQueryContext())
context->getQueryContext()->addQueryAccessInfo(qualified_projection_name);
for (const auto & processor : pipe.getProcessors())
processors.emplace_back(processor);

View File

@ -1,4 +1,6 @@
#pragma once
#include <Interpreters/Context.h>
#include <Processors/QueryPlan/ISourceStep.h>
#include <QueryPipeline/Pipe.h>
@ -9,7 +11,8 @@ namespace DB
class ReadFromPreparedSource : public ISourceStep
{
public:
explicit ReadFromPreparedSource(Pipe pipe_);
explicit ReadFromPreparedSource(
Pipe pipe_, ContextPtr context_ = nullptr, Context::QualifiedProjectionName qualified_projection_name_ = {});
String getName() const override { return "ReadFromPreparedSource"; }
@ -18,6 +21,7 @@ public:
protected:
Pipe pipe;
ContextPtr context;
Context::QualifiedProjectionName qualified_projection_name;
};
class ReadFromStorageStep : public ReadFromPreparedSource

View File

@ -0,0 +1,3 @@
t.t_normal
t.t_agg
t._minmax_count_projection

View File

@ -0,0 +1,66 @@
set log_queries=1;
set log_queries_min_type='QUERY_FINISH';
set optimize_use_implicit_projections=1;
DROP TABLE IF EXISTS t;
CREATE TABLE t
(
`id` UInt64,
`id2` UInt64,
`id3` UInt64,
PROJECTION t_normal
(
SELECT
id,
id2,
id3
ORDER BY
id2,
id,
id3
),
PROJECTION t_agg
(
SELECT
sum(id3)
GROUP BY id2
)
)
ENGINE = MergeTree
ORDER BY id
SETTINGS index_granularity = 8;
insert into t SELECT number, -number, number FROM numbers(10000);
SELECT * FROM t WHERE id2 = 3 FORMAT Null;
SELECT sum(id3) FROM t GROUP BY id2 FORMAT Null;
SELECT min(id) FROM t FORMAT Null;
SYSTEM FLUSH LOGS;
SELECT
--Remove the prefix string which is a mutable database name.
arrayStringConcat(arrayPopFront(splitByString('.', projections[1])), '.')
FROM
system.query_log
WHERE
current_database=currentDatabase() and query = 'SELECT * FROM t WHERE id2 = 3 FORMAT Null;';
SELECT
--Remove the prefix string which is a mutable database name.
arrayStringConcat(arrayPopFront(splitByString('.', projections[1])), '.')
FROM
system.query_log
WHERE
current_database=currentDatabase() and query = 'SELECT sum(id3) FROM t GROUP BY id2 FORMAT Null;';
SELECT
--Remove the prefix string which is a mutable database name.
arrayStringConcat(arrayPopFront(splitByString('.', projections[1])), '.')
FROM
system.query_log
WHERE
current_database=currentDatabase() and query = 'SELECT min(id) FROM t FORMAT Null;';
DROP TABLE t;