Merge pull request #56668 from ClickHouse/vdimir/analyzer_s3_partition_pruning

Analyzer: filtering by virtual columns for StorageS3
vdimir 2023-11-22 16:44:44 +01:00 committed by GitHub
commit ffbe85d3a0
8 changed files with 221 additions and 76 deletions
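
In user terms: with the analyzer enabled, a WHERE condition on the _path/_file virtual columns of an S3 table defined over explicitly listed keys now prunes which objects are read at all, instead of only filtering rows after the fact. A minimal sketch of the effect, reusing the test_02302 table from the 02302_s3_file_pruning test updated at the end of this diff:

    set allow_experimental_analyzer = 1;                    -- analyzer setting name as of this release cycle
    select * from test_02302 where _file like '%1';         -- 1
    select _file, * from test_02302 where _file like '%1';  -- test_02302.1  1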


@@ -778,23 +778,11 @@ std::optional<std::unordered_set<String>> MergeTreeDataSelectExecutor::filterPar
{
if (!filter_dag)
return {};
auto sample = data.getSampleBlockWithVirtualColumns();
std::unordered_set<const ActionsDAG::Node *> allowed_inputs;
for (const auto * input : filter_dag->getInputs())
if (sample.has(input->result_name))
allowed_inputs.insert(input);
if (allowed_inputs.empty())
auto dag = VirtualColumnUtils::splitFilterDagForAllowedInputs(sample, filter_dag, context);
if (!dag)
return {};
auto atoms = filter_dag->extractConjunctionAtoms(filter_dag->getOutputs().at(0));
atoms = ActionsDAG::filterNodesByAllowedInputs(std::move(atoms), allowed_inputs);
if (atoms.empty())
return {};
auto dag = ActionsDAG::buildFilterActionsDAG(atoms, {}, context);
auto virtual_columns_block = data.getBlockWithVirtualPartColumns(parts, false /* one_part */);
VirtualColumnUtils::filterBlockWithQuery(dag, virtual_columns_block, context);
return VirtualColumnUtils::extractSingleValueFromBlock<String>(virtual_columns_block, "_part");
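
MergeTree already did this kind of pruning for its _part virtual column; the hunk above merely rebases that code onto the new shared helper, VirtualColumnUtils::splitFilterDagForAllowedInputs. For reference, the query shape this path serves (table and part names are illustrative):

    -- filterPartsByVirtualColumns evaluates the _part predicate against a block of
    -- part names and returns the surviving parts before any data is read
    select count() from some_merge_tree_table where _part = 'all_1_1_0';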


@@ -42,8 +42,12 @@
#include <Processors/Formats/IOutputFormat.h>
#include <Processors/Formats/IInputFormat.h>
#include <Processors/Sources/ConstChunkGenerator.h>
#include <Processors/QueryPlan/SourceStepWithFilter.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Planner/Utils.h>
#include <Analyzer/QueryNode.h>
#include <DataTypes/DataTypeString.h>
@@ -126,6 +130,129 @@ namespace ErrorCodes
extern const int FILE_DOESNT_EXIST;
}
class ReadFromStorageS3Step : public SourceStepWithFilter
{
public:
std::string getName() const override { return "ReadFromStorageS3Step"; }
void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
void applyFilters() override;
ReadFromStorageS3Step(
Block sample_block,
const Names & column_names_,
StorageSnapshotPtr storage_snapshot_,
StorageS3 & storage_,
SelectQueryInfo query_info_,
ContextPtr context_,
size_t max_block_size_,
size_t num_streams_)
: SourceStepWithFilter(DataStream{.header = std::move(sample_block)})
, column_names(column_names_)
, storage_snapshot(std::move(storage_snapshot_))
, storage(storage_)
, query_info(std::move(query_info_))
, local_context(std::move(context_))
, max_block_size(max_block_size_)
, num_streams(num_streams_)
{
}
private:
Names column_names;
StorageSnapshotPtr storage_snapshot;
StorageS3 & storage;
SelectQueryInfo query_info;
ContextPtr local_context;
size_t max_block_size;
size_t num_streams;
};
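
ReadFromStorageS3Step follows the SourceStepWithFilter pattern: StorageS3::read() now adds this step to the query plan instead of returning a Pipe, the planner pushes filter ActionsDAGs into it through applyFilters(), and initializePipeline() builds the source pipes only after pruning has run. A hedged way to observe the step from SQL (the exact EXPLAIN rendering may differ between versions):

    set allow_experimental_analyzer = 1;
    explain plan select * from test_02302 where _file like '%1';
    -- the S3 source is expected to appear under the name ReadFromStorageS3Step (see getName() above)
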
static Block getBlockWithVirtuals(const NamesAndTypesList & virtual_columns, const String & bucket, const std::unordered_set<String> & keys)
{
Block virtual_columns_block;
fs::path bucket_path(bucket);
for (const auto & [column_name, column_type] : virtual_columns)
{
if (column_name == "_path")
{
auto column = column_type->createColumn();
for (const auto & key : keys)
column->insert((bucket_path / key).string());
virtual_columns_block.insert({std::move(column), column_type, column_name});
}
else if (column_name == "_file")
{
auto column = column_type->createColumn();
for (const auto & key : keys)
{
auto pos = key.find_last_of('/');
if (pos != std::string::npos)
column->insert(key.substr(pos + 1));
else
column->insert(key);
}
virtual_columns_block.insert({std::move(column), column_type, column_name});
}
else if (column_name == "_key")
{
auto column = column_type->createColumn();
for (const auto & key : keys)
column->insert(key);
virtual_columns_block.insert({std::move(column), column_type, column_name});
}
else
{
auto column = column_type->createColumn();
column->insertManyDefaults(keys.size());
virtual_columns_block.insert({std::move(column), column_type, column_name});
}
}
/// The _key column is mandatory here but may be absent from the virtual_columns list
if (!virtual_columns_block.has("_key"))
{
auto column_type = std::make_shared<DataTypeString>();
auto column = column_type->createColumn();
for (const auto & key : keys)
column->insert(key);
virtual_columns_block.insert({std::move(column), column_type, "_key"});
}
return virtual_columns_block;
}
static std::vector<String> filterKeysForPartitionPruning(
const std::vector<String> & keys,
const String & bucket,
const NamesAndTypesList & virtual_columns,
const std::vector<ActionsDAGPtr> & filter_dags,
ContextPtr context)
{
std::unordered_set<String> result_keys(keys.begin(), keys.end());
for (const auto & filter_dag : filter_dags)
{
if (result_keys.empty())
break;
auto block = getBlockWithVirtuals(virtual_columns, bucket, result_keys);
auto filter_actions = VirtualColumnUtils::splitFilterDagForAllowedInputs(block, filter_dag, context);
if (!filter_actions)
continue;
VirtualColumnUtils::filterBlockWithQuery(filter_actions, block, context);
result_keys = VirtualColumnUtils::extractSingleValueFromBlock<String>(block, "_key");
}
LOG_DEBUG(&Poco::Logger::get("StorageS3"), "Applied partition pruning, {} of {} keys left", result_keys.size(), keys.size());
return std::vector<String>(result_keys.begin(), result_keys.end());
}
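
filterKeysForPartitionPruning loops over the pushed-down filter DAGs: for the current candidate keys it builds a block of virtual-column values with getBlockWithVirtuals (_path is bucket/key, _file is the part of the key after the last '/', and _key, internal to this code rather than a user-facing column, carries the raw key), keeps only the parts of each DAG that reference those columns, filters the block, and collects the surviving _key values. The updated test at the end of this diff exercises exactly this, using max_rows_to_read as proof that pruning happened:

    set max_rows_to_read = 2;  -- three one-row files exist; reading all of them would trip this limit
    select * from test_02302 where (_file like '%.1' OR _file like '%.2') AND a > 1;
    -- returns 2: the _file disjunction prunes the key list to two objects,
    -- and the ordinary a > 1 predicate then filters the rows that are actually read
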
class IOutputFormat;
using OutputFormatPtr = std::shared_ptr<IOutputFormat>;
@@ -390,7 +517,7 @@ size_t StorageS3Source::DisclosedGlobIterator::estimatedKeysCount()
return pimpl->objectsCount();
}
class StorageS3Source::KeysIterator::Impl : WithContext
class StorageS3Source::KeysIterator::Impl
{
public:
explicit Impl(
@@ -399,35 +526,15 @@ public:
const std::vector<String> & keys_,
const String & bucket_,
const S3Settings::RequestSettings & request_settings_,
ASTPtr query_,
const NamesAndTypesList & virtual_columns_,
ContextPtr context_,
KeysWithInfo * read_keys_,
std::function<void(FileProgress)> file_progress_callback_)
: WithContext(context_)
, keys(keys_)
: keys(keys_)
, client(client_.clone())
, version_id(version_id_)
, bucket(bucket_)
, request_settings(request_settings_)
, query(query_)
, virtual_columns(virtual_columns_)
, file_progress_callback(file_progress_callback_)
{
ASTPtr filter_ast;
if (!keys.empty())
filter_ast = VirtualColumnUtils::createPathAndFileFilterAst(query, virtual_columns, fs::path(bucket) / keys[0], getContext());
if (filter_ast)
{
std::vector<String> paths;
paths.reserve(keys.size());
for (const auto & key : keys)
paths.push_back(fs::path(bucket) / key);
VirtualColumnUtils::filterByPathOrFile(keys, paths, query, virtual_columns, getContext(), filter_ast);
}
if (read_keys_)
{
for (const auto & key : keys)
@@ -463,8 +570,6 @@ private:
String version_id;
String bucket;
S3Settings::RequestSettings request_settings;
ASTPtr query;
NamesAndTypesList virtual_columns;
std::function<void(FileProgress)> file_progress_callback;
};
@@ -474,14 +579,11 @@ StorageS3Source::KeysIterator::KeysIterator(
const std::vector<String> & keys_,
const String & bucket_,
const S3Settings::RequestSettings & request_settings_,
ASTPtr query,
const NamesAndTypesList & virtual_columns_,
ContextPtr context,
KeysWithInfo * read_keys,
std::function<void(FileProgress)> file_progress_callback_)
: pimpl(std::make_shared<StorageS3Source::KeysIterator::Impl>(
client_, version_id_, keys_, bucket_, request_settings_,
query, virtual_columns_, context, read_keys, file_progress_callback_))
read_keys, file_progress_callback_))
{
}
@@ -973,8 +1075,6 @@ private:
const String key;
const std::optional<FormatSettings> format_settings;
ExpressionActionsPtr partition_by_expr;
static void validateBucket(const String & str)
{
S3::URI::validateBucket(str, {});
@@ -1046,14 +1146,15 @@ StorageS3::StorageS3(
virtual_columns = VirtualColumnUtils::getPathAndFileVirtualsForStorage(storage_metadata.getSampleBlock().getNamesAndTypesList());
}
std::shared_ptr<StorageS3Source::IIterator> StorageS3::createFileIterator(
const Configuration & configuration,
static std::shared_ptr<StorageS3Source::IIterator> createFileIterator(
const StorageS3::Configuration & configuration,
bool distributed_processing,
ContextPtr local_context,
ASTPtr query,
const std::vector<ActionsDAGPtr> & filter_dags,
const NamesAndTypesList & virtual_columns,
KeysWithInfo * read_keys,
std::function<void(FileProgress)> file_progress_callback)
StorageS3::KeysWithInfo * read_keys = nullptr,
std::function<void(FileProgress)> file_progress_callback = {})
{
if (distributed_processing)
{
@@ -1068,10 +1169,10 @@ std::shared_ptr<StorageS3Source::IIterator> StorageS3::createFileIterator(
}
else
{
Strings keys = filterKeysForPartitionPruning(configuration.keys, configuration.url.bucket, virtual_columns, filter_dags, local_context);
return std::make_shared<StorageS3Source::KeysIterator>(
*configuration.client, configuration.url.version_id, configuration.keys,
configuration.url.bucket, configuration.request_settings, query,
virtual_columns, local_context, read_keys, file_progress_callback);
*configuration.client, configuration.url.version_id, keys,
configuration.url.bucket, configuration.request_settings, read_keys, file_progress_callback);
}
}
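
Note that filterKeysForPartitionPruning only runs on this explicit-keys branch; tables defined over a glob URL still go through DisclosedGlobIterator, which discovers objects by listing. Illustrative table definitions (URLs are placeholders; real definitions also need credentials or NOSIGN):

    -- explicit key: the candidate object set is known up front and can be pruned here
    create table s3_one_key (a Int32) engine = S3('https://bucket.s3.amazonaws.com/data/part1.csv', 'CSV');
    -- glob: objects are discovered at read time by DisclosedGlobIterator
    create table s3_glob (a Int32) engine = S3('https://bucket.s3.amazonaws.com/data/*.csv', 'CSV');
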
@@ -1090,7 +1191,8 @@ bool StorageS3::parallelizeOutputAfterReading(ContextPtr context) const
return FormatFactory::instance().checkParallelizeOutputAfterReading(configuration.format, context);
}
Pipe StorageS3::read(
void StorageS3::read(
QueryPlan & query_plan,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
@@ -1099,15 +1201,34 @@ Pipe StorageS3::read(
size_t max_block_size,
size_t num_streams)
{
auto query_configuration = updateConfigurationAndGetCopy(local_context);
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(local_context), virtual_columns);
if (partition_by && query_configuration.withWildcard())
auto reading = std::make_unique<ReadFromStorageS3Step>(
read_from_format_info.source_header,
column_names,
storage_snapshot,
*this,
query_info,
local_context,
max_block_size,
num_streams);
query_plan.addStep(std::move(reading));
}
void ReadFromStorageS3Step::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
auto query_configuration = storage.updateConfigurationAndGetCopy(local_context);
if (storage.partition_by && query_configuration.withWildcard())
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Reading from a partitioned S3 storage is not implemented yet");
Pipes pipes;
auto virtual_columns = storage.getVirtuals();
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, storage.supportsSubsetOfColumns(local_context), virtual_columns);
std::shared_ptr<StorageS3Source::IIterator> iterator_wrapper = createFileIterator(
query_configuration, distributed_processing, local_context, query_info.query, virtual_columns, nullptr, local_context->getFileProgressCallback());
query_configuration, storage.distributed_processing, local_context, query_info.query, filter_dags,
virtual_columns, nullptr, local_context->getFileProgressCallback());
size_t estimated_keys_count = iterator_wrapper->estimatedKeysCount();
if (estimated_keys_count > 1)
@@ -1116,7 +1237,6 @@ Pipe StorageS3::read(
/// The disclosed glob iterator can underestimate the number of keys in some cases. We will keep one stream for this particular case.
num_streams = 1;
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(local_context), getVirtuals());
bool need_only_count = (query_info.optimize_trivial_count || read_from_format_info.requested_columns.empty())
&& local_context->getSettingsRef().optimize_count_from_files;
@@ -1124,15 +1244,16 @@ Pipe StorageS3::read(
const size_t max_parsing_threads = num_streams >= max_threads ? 1 : (max_threads / std::max(num_streams, 1ul));
LOG_DEBUG(&Poco::Logger::get("StorageS3"), "Reading in {} streams, {} threads per stream", num_streams, max_parsing_threads);
Pipes pipes;
pipes.reserve(num_streams);
for (size_t i = 0; i < num_streams; ++i)
{
pipes.emplace_back(std::make_shared<StorageS3Source>(
read_from_format_info,
query_configuration.format,
getName(),
storage.getName(),
local_context,
format_settings,
storage.format_settings,
max_block_size,
query_configuration.request_settings,
query_configuration.compression_method,
@@ -1146,7 +1267,13 @@ Pipe StorageS3::read(
query_info));
}
return Pipe::unitePipes(std::move(pipes));
pipeline.init(Pipe::unitePipes(std::move(pipes)));
}
void ReadFromStorageS3Step::applyFilters()
{
/// filter_dags are used in filterKeysForPartitionPruning, which is called from initializePipeline; nothing to do here
}
SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context, bool /*async_insert*/)
@@ -1597,7 +1724,7 @@ ColumnsDescription StorageS3::getTableStructureFromDataImpl(
{
KeysWithInfo read_keys;
auto file_iterator = createFileIterator(configuration, false, ctx, nullptr, {}, &read_keys);
auto file_iterator = createFileIterator(configuration, false, ctx, {}, {}, {}, &read_keys);
std::optional<ColumnsDescription> columns_from_cache;
if (ctx->getSettingsRef().schema_inference_use_cache_for_s3)


@@ -103,9 +103,6 @@ public:
const std::vector<String> & keys_,
const String & bucket_,
const S3Settings::RequestSettings & request_settings_,
ASTPtr query,
const NamesAndTypesList & virtual_columns,
ContextPtr context,
KeysWithInfo * read_keys = nullptr,
std::function<void(FileProgress)> progress_callback_ = {});
@@ -331,7 +328,8 @@ public:
return name;
}
Pipe read(
void read(
QueryPlan & query_plan,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
@@ -391,6 +389,7 @@ private:
friend class StorageS3Cluster;
friend class TableFunctionS3Cluster;
friend class StorageS3Queue;
friend class ReadFromStorageS3Step;
Configuration configuration;
std::mutex configuration_update_mutex;
@@ -401,15 +400,6 @@ private:
std::optional<FormatSettings> format_settings;
ASTPtr partition_by;
static std::shared_ptr<StorageS3Source::IIterator> createFileIterator(
const Configuration & configuration,
bool distributed_processing,
ContextPtr local_context,
ASTPtr query,
const NamesAndTypesList & virtual_columns,
KeysWithInfo * read_keys = nullptr,
std::function<void(FileProgress)> progress_callback = {});
static ColumnsDescription getTableStructureFromDataImpl(
const Configuration & configuration,
const std::optional<FormatSettings> & format_settings,


@@ -445,6 +445,24 @@ void addRequestedPathAndFileVirtualsToChunk(
}
}
ActionsDAGPtr splitFilterDagForAllowedInputs(const Block & header, const ActionsDAGPtr & filter_dag, ContextPtr context)
{
std::unordered_set<const ActionsDAG::Node *> allowed_inputs;
for (const auto * input : filter_dag->getInputs())
if (header.has(input->result_name))
allowed_inputs.insert(input);
if (allowed_inputs.empty())
return {};
auto atoms = filter_dag->extractConjunctionAtoms(filter_dag->getOutputs().at(0));
atoms = ActionsDAG::filterNodesByAllowedInputs(std::move(atoms), allowed_inputs);
if (atoms.empty())
return {};
return ActionsDAG::buildFilterActionsDAG(atoms, {}, context);
}
}
}
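
splitFilterDagForAllowedInputs is the shared piece behind both the MergeTree _part path and the new S3 pruning: it takes the whole WHERE clause as an ActionsDAG, keeps only the conjunction atoms whose inputs all exist in the given header (here, the virtual columns), and rebuilds a filter DAG from them, returning nothing when no atom qualifies. Sketched against the test table:

    -- from (_file like '%.1' or _file like '%.2') and a > 1 the split keeps the _file
    -- disjunction (all of its inputs are virtual columns) and drops a > 1 (needs a data column);
    -- a filter with no qualifying atom yields no DAG at all, so this query cannot prune anything:
    select * from test_02302 where a > 1;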


@@ -35,6 +35,9 @@ bool prepareFilterBlockWithQuery(const ASTPtr & query, ContextPtr context, Block
void filterBlockWithQuery(const ASTPtr & query, Block & block, ContextPtr context, ASTPtr expression_ast = {});
void filterBlockWithQuery(ActionsDAGPtr dag, Block & block, ContextPtr context);
/// Extract the subset of filter_dag that can be evaluated using only columns from the header
ActionsDAGPtr splitFilterDagForAllowedInputs(const Block & header, const ActionsDAGPtr & filter_dag, ContextPtr context);
/// Extract from the input stream a set of `name` column values
template <typename T>
auto extractSingleValueFromBlock(const Block & block, const String & name)


@@ -24,7 +24,6 @@
01952_optimize_distributed_group_by_sharding_key
02139_MV_with_scalar_subquery
02174_cte_scalar_cache_mv
02302_s3_file_pruning
02352_grouby_shadows_arg
02354_annoy
02428_parameterized_view


@@ -24,4 +24,14 @@ insert into test_02302 select 1 settings s3_create_new_file_on_insert = true;
insert into test_02302 select 2 settings s3_create_new_file_on_insert = true;
select * from test_02302 where _file like '%1';
1
select _file, * from test_02302 where _file like '%1';
test_02302.1 1
set max_rows_to_read = 2;
select * from test_02302 where (_file like '%.1' OR _file like '%.2') AND a > 1;
2
set max_rows_to_read = 999;
select 'a1' as _file, * from test_02302 where _file like '%1' ORDER BY a;
a1 0
a1 1
a1 2
drop table test_02302;


@@ -1,5 +1,5 @@
-- Tags: no-parallel, no-fasttest
-- Tag no-fasttest: Depends on AWS
-- Tag no-fasttest: Depends on S3
-- { echo }
drop table if exists test_02302;
@@ -32,4 +32,14 @@ insert into test_02302 select 1 settings s3_create_new_file_on_insert = true;
insert into test_02302 select 2 settings s3_create_new_file_on_insert = true;
select * from test_02302 where _file like '%1';
select _file, * from test_02302 where _file like '%1';
set max_rows_to_read = 2;
select * from test_02302 where (_file like '%.1' OR _file like '%.2') AND a > 1;
set max_rows_to_read = 999;
select 'a1' as _file, * from test_02302 where _file like '%1' ORDER BY a;
drop table test_02302;
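
The final addition to the test is a regression guard rather than a pruning check: selecting 'a1' as _file shadows the virtual column with an ordinary alias, so pruning must not kick in and all three rows must come back, which is why max_rows_to_read is first relaxed to 999. It is presumably also why 02302_s3_file_pruning no longer needs to appear in the test list edited earlier in this diff.

    set max_rows_to_read = 999;  -- all three files are read: _file below is a plain alias, not the virtual column
    select 'a1' as _file, * from test_02302 where _file like '%1' ORDER BY a;
    -- expected (from the .reference update above): a1 0 / a1 1 / a1 2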