Analyzer: partition pruning for S3

vdimir 2023-11-13 11:41:40 +00:00
parent 2a59a688de
commit cbb2e02c03
8 changed files with 173 additions and 70 deletions
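In short: StorageS3 previously pruned its list of keys only through the legacy AST-based virtual-column filter inside KeysIterator; with this change the pruning is done up front in createFileIterator and also works when the query was built by the new analyzer, reusing the conjunct-splitting logic that MergeTree already applies to the _part virtual column. A minimal standalone model of the idea, with illustrative names rather than the ClickHouse API:

#include <filesystem>
#include <functional>
#include <iostream>
#include <string>
#include <vector>

/// Keep only the keys whose derived "_file" value (the basename) satisfies the predicate.
/// In the real code the predicate is the filter DAG extracted from the WHERE clause and the
/// derived values are the _path/_file virtual columns.
std::vector<std::string> pruneKeys(
    const std::vector<std::string> & keys,
    const std::function<bool(const std::string & file)> & predicate)
{
    std::vector<std::string> survivors;
    for (const auto & key : keys)
    {
        std::string file = std::filesystem::path(key).filename().string();
        if (predicate(file))
            survivors.push_back(key);
    }
    return survivors;
}

int main()
{
    std::vector<std::string> keys = {"data/test_02302", "data/test_02302.1", "data/test_02302.2"};
    /// Corresponds to: WHERE _file LIKE '%.1'
    auto survivors = pruneKeys(keys, [](const std::string & file)
    {
        return file.size() >= 2 && file.substr(file.size() - 2) == ".1";
    });
    for (const auto & key : survivors)
        std::cout << key << '\n';   /// prints only data/test_02302.1
}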

View File

@@ -778,23 +778,11 @@ std::optional<std::unordered_set<String>> MergeTreeDataSelectExecutor::filterPar
{
if (!filter_dag)
return {};
auto sample = data.getSampleBlockWithVirtualColumns();
std::unordered_set<const ActionsDAG::Node *> allowed_inputs;
for (const auto * input : filter_dag->getInputs())
if (sample.has(input->result_name))
allowed_inputs.insert(input);
if (allowed_inputs.empty())
auto dag = VirtualColumnUtils::splitFilterDagForAllowedInputs(sample, filter_dag, context);
if (!dag)
return {};
auto atoms = filter_dag->extractConjunctionAtoms(filter_dag->getOutputs().at(0));
atoms = ActionsDAG::filterNodesByAllowedInputs(std::move(atoms), allowed_inputs);
if (atoms.empty())
return {};
auto dag = ActionsDAG::buildFilterActionsDAG(atoms, {}, context);
auto virtual_columns_block = data.getBlockWithVirtualPartColumns(parts, false /* one_part */);
VirtualColumnUtils::filterBlockWithQuery(dag, virtual_columns_block, context);
return VirtualColumnUtils::extractSingleValueFromBlock<String>(virtual_columns_block, "_part");

View File

@@ -44,6 +44,8 @@
#include <Processors/Sources/ConstChunkGenerator.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Planner/Utils.h>
#include <Analyzer/QueryNode.h>
#include <DataTypes/DataTypeString.h>
@@ -126,6 +128,117 @@ namespace ErrorCodes
extern const int FILE_DOESNT_EXIST;
}
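/// Build a block with one row per key, holding the values of the _path/_file virtual columns
/// (plus the helper _idx/_key columns) for that key. Called with no keys it produces just the
/// header used to build the filter DAG.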
static Block getBlockWithVirtuals(const NamesAndTypesList & virtual_columns, const String & bucket, const Strings & keys)
{
Block virtual_columns_block;
{
for (const auto & column : virtual_columns)
virtual_columns_block.insert({column.type->createColumn(), column.type, column.name});
virtual_columns_block.insert({ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "_idx"});
/// "_key" keeps the original object key so the surviving keys can be extracted after filtering.
virtual_columns_block.insert({ColumnString::create(), std::make_shared<DataTypeString>(), "_key"});
for (size_t i = 0; i != keys.size(); ++i)
{
const auto & key = keys[i];
const String path = fs::path(bucket) / key;
if (virtual_columns_block.has("_path"))
virtual_columns_block.getByName("_path").column->assumeMutableRef().insert(path);
if (virtual_columns_block.has("_file"))
{
auto pos = path.find_last_of('/');
String file;
if (pos != std::string::npos)
file = path.substr(pos + 1);
else
file = path;
virtual_columns_block.getByName("_file").column->assumeMutableRef().insert(file);
}
virtual_columns_block.getByName("_idx").column->assumeMutableRef().insert(i);
virtual_columns_block.getByName("_key").column->assumeMutableRef().insert(key);
}
}
return virtual_columns_block;
}
static Block renameColumnsInBlock(const Block & source_block, const std::unordered_map<String, String> & rename_map)
{
auto columns = source_block.getColumnsWithTypeAndName();
for (auto & col : columns)
{
auto it = rename_map.find(col.name);
if (it != rename_map.end())
col.name = it->second;
}
return Block(std::move(columns));
}
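/// Build a filter over the virtual columns from the WHERE clause of the analyzer query tree.
/// The planner refers to columns by its own identifiers, so the rename map filled here must
/// also be applied to the block the filter is later evaluated on.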
static ActionsDAGPtr getFilterForPartitionPruning(const SelectQueryInfo & query_info,
const NamesAndTypesList & virtual_columns,
NameToNameMap & column_rename,
ContextPtr context)
{
if (!query_info.query_tree || !query_info.planner_context)
return nullptr;
const auto * query_node = query_info.query_tree->as<QueryNode>();
if (!query_node || !query_node->getWhere())
return nullptr;
Block header = getBlockWithVirtuals(virtual_columns, "", {});
const auto & table_expression_data = query_info.planner_context->getTableExpressionDataOrThrow(query_info.table_expression);
for (const auto & [column_name, column_identifier] : table_expression_data.getColumnNameToIdentifier())
column_rename.emplace(column_name, column_identifier);
header = renameColumnsInBlock(header, column_rename);
auto filter_dag = buildActionsDAGFromExpressionNode(
query_node->getWhere(), header.getColumnsWithTypeAndName(), query_info.planner_context);
if (filter_dag)
return VirtualColumnUtils::splitFilterDagForAllowedInputs(header, filter_dag, context);
return {};
}
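/// Drop keys that cannot match the query filter. The legacy AST-based path (filterByPathOrFile)
/// is tried first; if the query comes from the analyzer, the filter is built from the query tree
/// and evaluated over a block of virtual column values instead.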
static void filterKeysForPartitionPruning(std::vector<String> & keys, const String & bucket, const NamesAndTypesList & virtual_columns, const SelectQueryInfo & query_info, ContextPtr context)
{
ASTPtr filter_ast;
if (!keys.empty())
filter_ast = VirtualColumnUtils::createPathAndFileFilterAst(query_info.query, virtual_columns, fs::path(bucket) / keys[0], context);
if (filter_ast)
{
std::vector<String> paths;
paths.reserve(keys.size());
for (const auto & key : keys)
paths.push_back(fs::path(bucket) / key);
VirtualColumnUtils::filterByPathOrFile(keys, paths, query_info.query, virtual_columns, context, filter_ast);
LOG_DEBUG(&Poco::Logger::get("StorageS3"), "Applied partition pruning (AST filter): {} out of {} keys left", keys.size(), paths.size());
return;
}
NameToNameMap column_rename;
auto filter_actions = getFilterForPartitionPruning(query_info, virtual_columns, column_rename, context);
if (filter_actions)
{
auto block = getBlockWithVirtuals(virtual_columns, bucket, keys);
block = renameColumnsInBlock(block, column_rename);
VirtualColumnUtils::filterBlockWithQuery(filter_actions, block, context);
String key_column_name = "_key";
if (auto it = column_rename.find("_key"); it != column_rename.end())
key_column_name = it->second;
auto filtered_keys = VirtualColumnUtils::extractSingleValueFromBlock<String>(block, key_column_name);
LOG_DEBUG(&Poco::Logger::get("StorageS3"), "Applied partition pruning (analyzer filter): {} out of {} keys left", filtered_keys.size(), keys.size());
keys.clear();
keys.reserve(filtered_keys.size());
for (auto && key : filtered_keys)
keys.emplace_back(key);
}
}
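One subtle point worth spelling out: under the analyzer, the WHERE expression from the query tree refers to columns through the planner's internal identifiers, which is why getFilterForPartitionPruning fills column_rename and why the block is renamed before filtering. A rough sketch of what that amounts to (the "__table1." spelling below is only illustrative of the shape such identifiers take, not a guaranteed format; virtual_columns is assumed to be the storage's _path/_file list):

/// Illustrative sketch, not actual planner output.
NameToNameMap column_rename;
column_rename["_path"] = "__table1._path";   /// hypothetical identifier
column_rename["_file"] = "__table1._file";   /// hypothetical identifier

Block header = getBlockWithVirtuals(virtual_columns, /*bucket*/ "", /*keys*/ {});
header = renameColumnsInBlock(header, column_rename);
/// A condition like `_file LIKE '%.1'` from the query tree now resolves against "__table1._file".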
class IOutputFormat;
using OutputFormatPtr = std::shared_ptr<IOutputFormat>;
@@ -390,7 +503,7 @@ size_t StorageS3Source::DisclosedGlobIterator::estimatedKeysCount()
return pimpl->objectsCount();
}
class StorageS3Source::KeysIterator::Impl : WithContext
class StorageS3Source::KeysIterator::Impl
{
public:
explicit Impl(
@@ -399,35 +512,15 @@ public:
const std::vector<String> & keys_,
const String & bucket_,
const S3Settings::RequestSettings & request_settings_,
ASTPtr query_,
const NamesAndTypesList & virtual_columns_,
ContextPtr context_,
KeysWithInfo * read_keys_,
std::function<void(FileProgress)> file_progress_callback_)
: WithContext(context_)
, keys(keys_)
: keys(keys_)
, client(client_.clone())
, version_id(version_id_)
, bucket(bucket_)
, request_settings(request_settings_)
, query(query_)
, virtual_columns(virtual_columns_)
, file_progress_callback(file_progress_callback_)
{
ASTPtr filter_ast;
if (!keys.empty())
filter_ast = VirtualColumnUtils::createPathAndFileFilterAst(query, virtual_columns, fs::path(bucket) / keys[0], getContext());
if (filter_ast)
{
std::vector<String> paths;
paths.reserve(keys.size());
for (const auto & key : keys)
paths.push_back(fs::path(bucket) / key);
VirtualColumnUtils::filterByPathOrFile(keys, paths, query, virtual_columns, getContext(), filter_ast);
}
if (read_keys_)
{
for (const auto & key : keys)
@@ -463,8 +556,6 @@ private:
String version_id;
String bucket;
S3Settings::RequestSettings request_settings;
ASTPtr query;
NamesAndTypesList virtual_columns;
std::function<void(FileProgress)> file_progress_callback;
};
@@ -474,14 +565,11 @@ StorageS3Source::KeysIterator::KeysIterator(
const std::vector<String> & keys_,
const String & bucket_,
const S3Settings::RequestSettings & request_settings_,
ASTPtr query,
const NamesAndTypesList & virtual_columns_,
ContextPtr context,
KeysWithInfo * read_keys,
std::function<void(FileProgress)> file_progress_callback_)
: pimpl(std::make_shared<StorageS3Source::KeysIterator::Impl>(
client_, version_id_, keys_, bucket_, request_settings_,
query, virtual_columns_, context, read_keys, file_progress_callback_))
read_keys, file_progress_callback_))
{
}
@@ -965,8 +1053,6 @@ private:
const String key;
const std::optional<FormatSettings> format_settings;
ExpressionActionsPtr partition_by_expr;
static void validateBucket(const String & str)
{
S3::URI::validateBucket(str, {});
@@ -1038,15 +1124,17 @@ StorageS3::StorageS3(
virtual_columns = VirtualColumnUtils::getPathAndFileVirtualsForStorage(storage_metadata.getSampleBlock().getNamesAndTypesList());
}
std::shared_ptr<StorageS3Source::IIterator> StorageS3::createFileIterator(
const Configuration & configuration,
static std::shared_ptr<StorageS3Source::IIterator> createFileIterator(
const StorageS3::Configuration & configuration,
bool distributed_processing,
ContextPtr local_context,
ASTPtr query,
const SelectQueryInfo & query_info,
const NamesAndTypesList & virtual_columns,
KeysWithInfo * read_keys,
std::function<void(FileProgress)> file_progress_callback)
StorageS3::KeysWithInfo * read_keys = nullptr,
std::function<void(FileProgress)> file_progress_callback = {})
{
ASTPtr query = query_info.query;
if (distributed_processing)
{
return std::make_shared<StorageS3Source::ReadTaskIterator>(local_context->getReadTaskCallback(), local_context->getSettingsRef().max_threads);
@@ -1060,10 +1148,11 @@ std::shared_ptr<StorageS3Source::IIterator> StorageS3::createFileIterator(
}
else
{
Strings keys = configuration.keys;
filterKeysForPartitionPruning(keys, configuration.url.bucket, virtual_columns, query_info, local_context);
return std::make_shared<StorageS3Source::KeysIterator>(
*configuration.client, configuration.url.version_id, configuration.keys,
configuration.url.bucket, configuration.request_settings, query,
virtual_columns, local_context, read_keys, file_progress_callback);
*configuration.client, configuration.url.version_id, keys,
configuration.url.bucket, configuration.request_settings, read_keys, file_progress_callback);
}
}
@@ -1096,10 +1185,10 @@ Pipe StorageS3::read(
if (partition_by && query_configuration.withWildcard())
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Reading from a partitioned S3 storage is not implemented yet");
Pipes pipes;
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(local_context), getVirtuals());
std::shared_ptr<StorageS3Source::IIterator> iterator_wrapper = createFileIterator(
query_configuration, distributed_processing, local_context, query_info.query, virtual_columns, nullptr, local_context->getFileProgressCallback());
query_configuration, distributed_processing, local_context, query_info, virtual_columns, nullptr, local_context->getFileProgressCallback());
size_t estimated_keys_count = iterator_wrapper->estimatedKeysCount();
if (estimated_keys_count > 1)
@@ -1108,7 +1197,6 @@ Pipe StorageS3::read(
/// Disclosed glob iterator can underestimate the amount of keys in some cases. We will keep one stream for this particular case.
num_streams = 1;
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(local_context), getVirtuals());
bool need_only_count = (query_info.optimize_trivial_count || read_from_format_info.requested_columns.empty())
&& local_context->getSettingsRef().optimize_count_from_files;
@@ -1116,6 +1204,7 @@
const size_t max_parsing_threads = num_streams >= max_threads ? 1 : (max_threads / std::max(num_streams, 1ul));
LOG_DEBUG(&Poco::Logger::get("StorageS3"), "Reading in {} streams, {} threads per stream", num_streams, max_parsing_threads);
Pipes pipes;
pipes.reserve(num_streams);
for (size_t i = 0; i < num_streams; ++i)
{
@@ -1580,7 +1669,7 @@ ColumnsDescription StorageS3::getTableStructureFromDataImpl(
{
KeysWithInfo read_keys;
auto file_iterator = createFileIterator(configuration, false, ctx, nullptr, {}, &read_keys);
auto file_iterator = createFileIterator(configuration, false, ctx, {}, {}, &read_keys);
std::optional<ColumnsDescription> columns_from_cache;
if (ctx->getSettingsRef().schema_inference_use_cache_for_s3)

View File

@@ -102,9 +102,6 @@ public:
const std::vector<String> & keys_,
const String & bucket_,
const S3Settings::RequestSettings & request_settings_,
ASTPtr query,
const NamesAndTypesList & virtual_columns,
ContextPtr context,
KeysWithInfo * read_keys = nullptr,
std::function<void(FileProgress)> progress_callback_ = {});
@@ -400,15 +397,6 @@ private:
std::optional<FormatSettings> format_settings;
ASTPtr partition_by;
static std::shared_ptr<StorageS3Source::IIterator> createFileIterator(
const Configuration & configuration,
bool distributed_processing,
ContextPtr local_context,
ASTPtr query,
const NamesAndTypesList & virtual_columns,
KeysWithInfo * read_keys = nullptr,
std::function<void(FileProgress)> progress_callback = {});
static ColumnsDescription getTableStructureFromDataImpl(
const Configuration & configuration,
const std::optional<FormatSettings> & format_settings,

View File

@@ -445,6 +445,24 @@ void addRequestedPathAndFileVirtualsToChunk(
}
}
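/// Keep only the conjuncts of filter_dag whose inputs are all present in header;
/// returns nullptr if none qualify.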
ActionsDAGPtr splitFilterDagForAllowedInputs(const Block & header, const ActionsDAGPtr & filter_dag, ContextPtr context)
{
std::unordered_set<const ActionsDAG::Node *> allowed_inputs;
for (const auto * input : filter_dag->getInputs())
if (header.has(input->result_name))
allowed_inputs.insert(input);
if (allowed_inputs.empty())
return {};
auto atoms = filter_dag->extractConjunctionAtoms(filter_dag->getOutputs().at(0));
atoms = ActionsDAG::filterNodesByAllowedInputs(std::move(atoms), allowed_inputs);
if (atoms.empty())
return {};
return ActionsDAG::buildFilterActionsDAG(atoms, {}, context);
}
}
}
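The helper above generalises the conjunct selection that MergeTree used for _part pruning so StorageS3 can reuse it. A simplified standalone model of the same idea, treating the WHERE clause as a list of conjuncts, each annotated with the columns it reads (plain C++, standard library only; no ActionsDAG involved):

#include <functional>
#include <iostream>
#include <map>
#include <set>
#include <string>
#include <vector>

/// A WHERE clause is treated as a conjunction of atoms; an atom may be pushed into
/// virtual-column pruning only if every column it reads exists in the pruning header.
struct Atom
{
    std::set<std::string> inputs;                                          /// columns the atom reads
    std::function<bool(const std::map<std::string, std::string> &)> eval;  /// atom over one row
};

std::vector<Atom> splitForAllowedInputs(const std::vector<Atom> & conjunction, const std::set<std::string> & allowed)
{
    std::vector<Atom> result;
    for (const auto & atom : conjunction)
    {
        bool usable = true;
        for (const auto & column : atom.inputs)
            if (!allowed.count(column))
                usable = false;
        if (usable)
            result.push_back(atom);
    }
    return result;
}

int main()
{
    /// WHERE _file LIKE '%.1' AND a > 1  -- only the first atom reads columns from the header
    std::vector<Atom> where = {
        {{"_file"}, [](const auto & row) { return row.at("_file").ends_with(".1"); }},  /// C++20 ends_with
        {{"a"},     [](const auto & row) { return std::stoi(row.at("a")) > 1; }},
    };
    auto usable = splitForAllowedInputs(where, {"_path", "_file"});

    std::map<std::string, std::string> row = {{"_file", "test_02302.1"}};
    bool keep = true;
    for (const auto & atom : usable)
        keep = keep && atom.eval(row);
    std::cout << "usable atoms: " << usable.size() << ", keep: " << keep << '\n';  /// usable atoms: 1, keep: 1
}

The real implementation performs the same selection on ActionsDAG nodes and then rebuilds a filter DAG from the surviving atoms with buildFilterActionsDAG.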

View File

@@ -34,6 +34,7 @@ bool prepareFilterBlockWithQuery(const ASTPtr & query, ContextPtr context, Block
/// If `expression_ast` is passed, use it to filter block.
void filterBlockWithQuery(const ASTPtr & query, Block & block, ContextPtr context, ASTPtr expression_ast = {});
void filterBlockWithQuery(ActionsDAGPtr dag, Block & block, ContextPtr context);
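/// Return the part of filter_dag (a conjunction of its atoms) that can be evaluated using only
/// the columns present in header, or nullptr if there is no such part.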
ActionsDAGPtr splitFilterDagForAllowedInputs(const Block & header, const ActionsDAGPtr & filter_dag, ContextPtr context);
/// Extract from the input stream a set of `name` column values
template <typename T>

View File

@@ -24,7 +24,6 @@
01952_optimize_distributed_group_by_sharding_key
02139_MV_with_scalar_subquery
02174_cte_scalar_cache_mv
02302_s3_file_pruning
02352_grouby_shadows_arg
02354_annoy
02428_parameterized_view

View File

@@ -24,4 +24,14 @@ insert into test_02302 select 1 settings s3_create_new_file_on_insert = true;
insert into test_02302 select 2 settings s3_create_new_file_on_insert = true;
select * from test_02302 where _file like '%1';
1
select _file, * from test_02302 where _file like '%1';
test_02302.1 1
set max_rows_to_read = 2;
select * from test_02302 where (_file like '%.1' OR _file like '%.2') AND a > 1;
2
set max_rows_to_read = 999;
select 'a1' as _file, * from test_02302 where _file like '%1' ORDER BY a;
a1 0
a1 1
a1 2
drop table test_02302;

View File

@@ -1,5 +1,5 @@
-- Tags: no-parallel, no-fasttest
-- Tag no-fasttest: Depends on AWS
-- Tag no-fasttest: Depends on S3
-- { echo }
drop table if exists test_02302;
@@ -32,4 +32,14 @@ insert into test_02302 select 1 settings s3_create_new_file_on_insert = true;
insert into test_02302 select 2 settings s3_create_new_file_on_insert = true;
select * from test_02302 where _file like '%1';
select _file, * from test_02302 where _file like '%1';
set max_rows_to_read = 2;
select * from test_02302 where (_file like '%.1' OR _file like '%.2') AND a > 1;
set max_rows_to_read = 999;
select 'a1' as _file, * from test_02302 where _file like '%1' ORDER BY a;
drop table test_02302;