Remove AST-based code from filterKeysForPartitionPruning

This commit is contained in:
vdimir 2023-11-20 17:59:07 +00:00
parent a915eeded8
commit 95e9a27417
No known key found for this signature in database
GPG Key ID: 6EE4CE2BEDC51862

View File

@ -229,30 +229,14 @@ static Block getBlockWithVirtuals(const NamesAndTypesList & virtual_columns, con
static void filterKeysForPartitionPruning(std::vector<String> & keys,
const String & bucket,
const NamesAndTypesList & virtual_columns,
const SelectQueryInfo & query_info,
const std::vector<ActionsDAGPtr> & filter_dags,
ContextPtr context)
{
if (keys.empty())
return;
ASTPtr filter_ast = VirtualColumnUtils::createPathAndFileFilterAst(query_info.query, virtual_columns, fs::path(bucket) / keys[0], context);
if (filter_ast)
{
std::vector<String> paths;
paths.reserve(keys.size());
for (const auto & key : keys)
paths.push_back(fs::path(bucket) / key);
VirtualColumnUtils::filterByPathOrFile(keys, paths, query_info.query, virtual_columns, context, filter_ast);
LOG_DEBUG(&Poco::Logger::get("StorageS3"), "Applied AST partition pruning {} from {} keys left", keys.size(), paths.size());
return;
}
NameToNameMap column_rename;
for (const auto & filter_dag : filter_dags)
{
if (keys.empty())
break;
auto block = getBlockWithVirtuals(virtual_columns, bucket, keys);
auto filter_actions = VirtualColumnUtils::splitFilterDagForAllowedInputs(block, filter_dag, context);
@ -260,10 +244,7 @@ static void filterKeysForPartitionPruning(std::vector<String> & keys,
continue;
VirtualColumnUtils::filterBlockWithQuery(filter_actions, block, context);
String key_column_name = "_key";
if (auto it = column_rename.find("_key"); it != column_rename.end())
key_column_name = it->second;
auto filtered_keys = VirtualColumnUtils::extractSingleValueFromBlock<String>(block, key_column_name);
std::unordered_set<String> filtered_keys = VirtualColumnUtils::extractSingleValueFromBlock<String>(block, "_key");
LOG_DEBUG(&Poco::Logger::get("StorageS3"), "Applied partition pruning {} from {} keys left", filtered_keys.size(), keys.size());
keys.clear();
keys.reserve(filtered_keys.size());
@ -1161,14 +1142,12 @@ static std::shared_ptr<StorageS3Source::IIterator> createFileIterator(
const StorageS3::Configuration & configuration,
bool distributed_processing,
ContextPtr local_context,
const SelectQueryInfo & query_info,
ASTPtr query,
const std::vector<ActionsDAGPtr> & filter_dags,
const NamesAndTypesList & virtual_columns,
StorageS3::KeysWithInfo * read_keys = nullptr,
std::function<void(FileProgress)> file_progress_callback = {})
{
ASTPtr query = query_info.query;
if (distributed_processing)
{
return std::make_shared<StorageS3Source::ReadTaskIterator>(local_context->getReadTaskCallback(), local_context->getSettingsRef().max_threads);
@ -1183,7 +1162,7 @@ static std::shared_ptr<StorageS3Source::IIterator> createFileIterator(
else
{
Strings keys = configuration.keys;
filterKeysForPartitionPruning(keys, configuration.url.bucket, virtual_columns, query_info, filter_dags, local_context);
filterKeysForPartitionPruning(keys, configuration.url.bucket, virtual_columns, filter_dags, local_context);
return std::make_shared<StorageS3Source::KeysIterator>(
*configuration.client, configuration.url.version_id, keys,
configuration.url.bucket, configuration.request_settings, read_keys, file_progress_callback);
@ -1241,7 +1220,7 @@ void ReadFromStorageS3Step::initializePipeline(QueryPipelineBuilder & pipeline,
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, storage.supportsSubsetOfColumns(local_context), virtual_columns);
std::shared_ptr<StorageS3Source::IIterator> iterator_wrapper = createFileIterator(
query_configuration, storage.distributed_processing, local_context, query_info, filter_dags,
query_configuration, storage.distributed_processing, local_context, query_info.query, filter_dags,
virtual_columns, nullptr, local_context->getFileProgressCallback());
size_t estimated_keys_count = iterator_wrapper->estimatedKeysCount();