Partition pruning for S3

Amos Bird 2022-05-19 19:18:58 +08:00
parent 8d876653e8
commit 093d315756
7 changed files with 264 additions and 49 deletions
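
In short: StorageS3 now prunes object keys before reading any data by evaluating the query's conditions on the _path and _file virtual columns. Both the glob iterator (DisclosedGlobIterator) and the explicit-key iterator (KeysIterator) receive the query AST, a header of virtual columns and the query context, prepare a filter once, and drop keys that cannot satisfy it; StorageS3Cluster builds its virtual columns the same way, so pruning also happens on the initiator, and a fix in IExecutableFunction preserves the constness of results computed over LowCardinality arguments, which the filter evaluation over the LowCardinality(String) virtual columns relies on. Short illustrative sketches follow the corresponding hunks below.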

src/Functions/IExecutableFunction.cpp

@ -250,7 +250,10 @@ ColumnPtr IExecutableFunction::executeWithoutSparseColumns(const ColumnsWithType
: columns_without_low_cardinality.front().column->size();
auto res = executeWithoutLowCardinalityColumns(columns_without_low_cardinality, dictionary_type, new_input_rows_count, dry_run);
auto keys = res->convertToFullColumnIfConst();
bool res_is_constant = isColumnConst(*res);
auto keys = res_is_constant
? res->cloneResized(std::min(static_cast<size_t>(1), input_rows_count))->convertToFullColumnIfConst()
: res;
auto res_mut_dictionary = DataTypeLowCardinality::createColumnUnique(*res_low_cardinality_type->getDictionaryType());
ColumnPtr res_indexes = res_mut_dictionary->uniqueInsertRangeFrom(*keys, 0, keys->size());
@ -260,6 +263,9 @@ ColumnPtr IExecutableFunction::executeWithoutSparseColumns(const ColumnsWithType
result = ColumnLowCardinality::create(res_dictionary, res_indexes->index(*indexes, 0));
else
result = ColumnLowCardinality::create(res_dictionary, res_indexes);
if (res_is_constant)
result = ColumnConst::create(std::move(result), input_rows_count);
}
else
{
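
The hunk above fixes constness handling when a function runs over LowCardinality arguments: if the inner result is constant, it is shrunk to at most one row before being inserted into the result dictionary, and the final LowCardinality column is wrapped back into a ColumnConst of input_rows_count rows. This matters for this commit because _path and _file are LowCardinality(String) and the pruning filter is evaluated over small probe blocks, where a constant result must stay constant. A minimal sketch of the shrink-then-re-expand idea, using made-up stand-in types rather than ClickHouse's IColumn interfaces:

#include <cstddef>
#include <string>
#include <vector>

// Stand-in for a materialized column; purely illustrative, not the IColumn API.
struct Column { std::vector<std::string> values; };

// When the argument is known to be constant, the function body only needs to run
// on a single representative row...
Column evaluateOnOneRow(const std::string & constant_value)
{
    return Column{{constant_value + "_transformed"}};   // placeholder for the real function
}

// ...and the one-row result is then re-expanded to the original row count while
// keeping its constant nature, which is what ColumnConst::create(result, input_rows_count)
// achieves in the hunk above.
Column expandAsConstant(const Column & one_row_result, size_t input_rows_count)
{
    return Column{std::vector<std::string>(input_rows_count, one_row_result.values.at(0))};
}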

src/Storages/StorageS3.cpp

@ -25,6 +25,7 @@
#include <Storages/StorageS3Settings.h>
#include <Storages/StorageSnapshot.h>
#include <Storages/PartitionedSink.h>
#include <Storages/VirtualColumnUtils.h>
#include <Storages/getVirtualsForStorage.h>
#include <IO/ReadBufferFromS3.h>
@ -75,17 +76,18 @@ namespace ErrorCodes
extern const int UNEXPECTED_EXPRESSION;
extern const int DATABASE_ACCESS_DENIED;
extern const int CANNOT_EXTRACT_TABLE_STRUCTURE;
extern const int NOT_IMPLEMENTED;
}
class IOutputFormat;
using OutputFormatPtr = std::shared_ptr<IOutputFormat>;
class StorageS3Source::DisclosedGlobIterator::Impl
class StorageS3Source::DisclosedGlobIterator::Impl : WithContext
{
public:
Impl(Aws::S3::S3Client & client_, const S3::URI & globbed_uri_)
: client(client_), globbed_uri(globbed_uri_)
Impl(Aws::S3::S3Client & client_, const S3::URI & globbed_uri_, ASTPtr & query_, const Block & virtual_header_, ContextPtr context_)
: WithContext(context_), client(client_), globbed_uri(globbed_uri_), query(query_), virtual_header(virtual_header_)
{
if (globbed_uri.bucket.find_first_of("*?{") != globbed_uri.bucket.npos)
throw Exception("Expression can not have wildcards inside bucket name", ErrorCodes::UNEXPECTED_EXPRESSION);
@ -101,6 +103,20 @@ public:
return;
}
/// Create a virtual block with one row to construct filter
if (query && virtual_header)
{
/// Append "key" column as the filter result
virtual_header.insert({ColumnString::create(), std::make_shared<DataTypeString>(), "_key"});
auto block = virtual_header.cloneEmpty();
MutableColumns columns = block.mutateColumns();
for (auto & column : columns)
column->insertDefault();
block.setColumns(std::move(columns));
VirtualColumnUtils::prepareFilterBlockWithQuery(query, getContext(), block, filter_ast);
}
request.SetBucket(globbed_uri.bucket);
request.SetPrefix(key_prefix);
matcher = std::make_unique<re2::RE2>(makeRegexpPatternFromGlobs(globbed_uri.key));
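
The constructor above builds a one-row probe block holding default values of the virtual columns plus an empty "_key" result column and passes it to VirtualColumnUtils::prepareFilterBlockWithQuery; a filter_ast is produced only when some part of the query's WHERE/PREWHERE clause can be computed from those columns alone, and that AST is reused for every listed batch. Roughly, the decision amounts to the following sketch, assuming the WHERE clause has already been split into AND-ed conjuncts and that the columns each conjunct references are known (the Conjunct type is hypothetical):

#include <set>
#include <string>
#include <vector>

// Hypothetical representation of one AND-ed condition from the WHERE clause.
struct Conjunct
{
    std::string text;                  // e.g. "_file LIKE '%5'"
    std::set<std::string> referenced;  // column names this condition reads
};

// Keep only the conditions computable from the virtual columns (_path, _file);
// together they form the pruning filter. An empty result means the query cannot
// restrict the S3 listing, so no filtering is attempted.
std::vector<Conjunct> extractPruningFilter(
    const std::vector<Conjunct> & where_conjuncts,
    const std::set<std::string> & virtual_columns)
{
    std::vector<Conjunct> usable;
    for (const auto & conjunct : where_conjuncts)
    {
        bool computable = true;
        for (const auto & column : conjunct.referenced)
            if (virtual_columns.count(column) == 0)
                computable = false;
        if (computable)
            usable.push_back(conjunct);
    }
    return usable;
}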
@ -144,13 +160,52 @@ private:
const auto & result_batch = outcome.GetResult().GetContents();
if (filter_ast)
{
auto block = virtual_header.cloneEmpty();
MutableColumnPtr path_column;
MutableColumnPtr file_column;
MutableColumnPtr key_column = block.getByName("_key").column->assumeMutable();
if (block.has("_path"))
path_column = block.getByName("_path").column->assumeMutable();
if (block.has("_file"))
file_column = block.getByName("_file").column->assumeMutable();
for (const auto & row : result_batch)
{
const String & key = row.GetKey();
if (re2::RE2::FullMatch(key, *matcher))
{
String path = fs::path(globbed_uri.bucket) / key;
String file = path.substr(path.find_last_of('/') + 1);
if (path_column)
path_column->insert(path);
if (file_column)
file_column->insert(file);
key_column->insert(key);
}
}
VirtualColumnUtils::filterBlockWithQuery(query, block, getContext(), filter_ast);
const ColumnString & keys = typeid_cast<const ColumnString &>(*block.getByName("_key").column);
size_t rows = block.rows();
buffer.reserve(rows);
for (size_t i = 0; i < rows; ++i)
buffer.emplace_back(keys.getDataAt(i).toString());
}
else
{
buffer.reserve(result_batch.size());
for (const auto & row : result_batch)
{
String key = row.GetKey();
if (re2::RE2::FullMatch(key, *matcher))
buffer.emplace_back(std::move(key));
}
}
/// Set iterator only after the whole batch is processed
buffer_iter = buffer.begin();
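
Per ListObjectsV2 page, the keys that match the glob are expanded into rows of (_path, _file, _key); VirtualColumnUtils::filterBlockWithQuery then executes the prepared filter over that block, and only the surviving "_key" values reach the buffer, so objects that cannot match the WHERE clause are never opened. A standalone sketch of the same flow, with std::regex standing in for re2 and a per-row predicate standing in for the filter expression (both substitutions are illustrative; the real code filters all rows of the block in one pass):

#include <functional>
#include <regex>
#include <string>
#include <vector>

// One listing page in, pruned keys out. `predicate` plays the role of the
// prepared filter_ast evaluated over the (_path, _file) virtual columns.
std::vector<std::string> pruneBatch(
    const std::vector<std::string> & listed_keys,
    const std::string & bucket,
    const std::regex & glob_matcher,
    const std::function<bool(const std::string & path, const std::string & file)> & predicate)
{
    std::vector<std::string> kept;
    for (const auto & key : listed_keys)
    {
        if (!std::regex_match(key, glob_matcher))
            continue;
        std::string path = bucket + "/" + key;                        // _path
        std::string file = path.substr(path.find_last_of('/') + 1);   // _file
        if (predicate(path, file))
            kept.push_back(key);   // only surviving keys are ever downloaded
    }
    return kept;
}

With the glob test_02302_* and the condition _file LIKE '%5', for example, only the key test_02302_5 survives, which is exactly what the test at the end of this commit checks under max_rows_to_read = 1.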
@ -165,25 +220,83 @@ private:
Strings::iterator buffer_iter;
Aws::S3::S3Client client;
S3::URI globbed_uri;
ASTPtr query;
Block virtual_header;
ASTPtr filter_ast;
Aws::S3::Model::ListObjectsV2Request request;
Aws::S3::Model::ListObjectsV2Outcome outcome;
std::unique_ptr<re2::RE2> matcher;
bool is_finished{false};
};
StorageS3Source::DisclosedGlobIterator::DisclosedGlobIterator(Aws::S3::S3Client & client_, const S3::URI & globbed_uri_)
: pimpl(std::make_shared<StorageS3Source::DisclosedGlobIterator::Impl>(client_, globbed_uri_)) {}
StorageS3Source::DisclosedGlobIterator::DisclosedGlobIterator(
Aws::S3::S3Client & client_, const S3::URI & globbed_uri_, ASTPtr query, const Block & virtual_header, ContextPtr context)
: pimpl(std::make_shared<StorageS3Source::DisclosedGlobIterator::Impl>(client_, globbed_uri_, query, virtual_header, context))
{
}
String StorageS3Source::DisclosedGlobIterator::next()
{
return pimpl->next();
}
class StorageS3Source::KeysIterator::Impl
class StorageS3Source::KeysIterator::Impl : WithContext
{
public:
explicit Impl(const std::vector<String> & keys_) : keys(keys_)
explicit Impl(
const std::vector<String> & keys_, const String & bucket_, ASTPtr query_, const Block & virtual_header_, ContextPtr context_)
: WithContext(context_), keys(keys_), bucket(bucket_), query(query_), virtual_header(virtual_header_)
{
/// Create a virtual block with one row to construct filter
if (query && virtual_header)
{
/// Append "key" column as the filter result
virtual_header.insert({ColumnString::create(), std::make_shared<DataTypeString>(), "_key"});
auto block = virtual_header.cloneEmpty();
MutableColumns columns = block.mutateColumns();
for (auto & column : columns)
column->insertDefault();
block.setColumns(std::move(columns));
ASTPtr filter_ast;
VirtualColumnUtils::prepareFilterBlockWithQuery(query, getContext(), block, filter_ast);
if (filter_ast)
{
block = virtual_header.cloneEmpty();
MutableColumnPtr path_column;
MutableColumnPtr file_column;
MutableColumnPtr key_column = block.getByName("_key").column->assumeMutable();
if (block.has("_path"))
path_column = block.getByName("_path").column->assumeMutable();
if (block.has("_file"))
file_column = block.getByName("_file").column->assumeMutable();
for (const auto & key : keys)
{
String path = fs::path(bucket) / key;
String file = path.substr(path.find_last_of('/') + 1);
if (path_column)
path_column->insert(path);
if (file_column)
file_column->insert(file);
key_column->insert(key);
}
VirtualColumnUtils::filterBlockWithQuery(query, block, getContext(), filter_ast);
const ColumnString & keys_col = typeid_cast<const ColumnString &>(*block.getByName("_key").column);
size_t rows = block.rows();
Strings filtered_keys;
filtered_keys.reserve(rows);
for (size_t i = 0; i < rows; ++i)
filtered_keys.emplace_back(keys_col.getDataAt(i).toString());
keys = std::move(filtered_keys);
}
}
}
String next()
@ -197,9 +310,15 @@ public:
private:
Strings keys;
std::atomic_size_t index = 0;
String bucket;
ASTPtr query;
Block virtual_header;
};
StorageS3Source::KeysIterator::KeysIterator(const std::vector<String> & keys_) : pimpl(std::make_shared<StorageS3Source::KeysIterator::Impl>(keys_))
StorageS3Source::KeysIterator::KeysIterator(
const std::vector<String> & keys_, const String & bucket_, ASTPtr query, const Block & virtual_header, ContextPtr context)
: pimpl(std::make_shared<StorageS3Source::KeysIterator::Impl>(keys_, bucket_, query, virtual_header, context))
{
}
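
KeysIterator applies the same filtering once, in the constructor: with an explicit key list there is no paging, so the whole list is turned into a virtual block, filtered, and replaced by the surviving keys before iteration starts. Reusing the pruneBatch sketch from the glob iterator above for the non-glob case (keys, bucket and predicate here are made up):

// Illustrative values only; pruneBatch is the sketch shown after the glob iterator.
std::vector<std::string> keys = {"test_02302", "test_02302.1", "test_02302.2"};
auto pruned = pruneBatch(
    keys, "my-bucket", std::regex(".*"),                  // explicit keys: no real glob to apply
    [](const std::string &, const std::string & file)     // stands in for: WHERE _file LIKE '%1'
    { return !file.empty() && file.back() == '1'; });
// pruned == {"test_02302.1"}: only this object would be opened.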
@ -639,6 +758,8 @@ StorageS3::StorageS3(
auto columns = storage_metadata.getSampleBlock().getNamesAndTypesList();
virtual_columns = getVirtualsForStorage(columns, default_virtuals);
for (const auto & column : virtual_columns)
virtual_block.insert({column.type->createColumn(), column.type, column.name});
}
std::shared_ptr<StorageS3Source::IteratorWrapper> StorageS3::createFileIterator(
@ -647,6 +768,8 @@ std::shared_ptr<StorageS3Source::IteratorWrapper> StorageS3::createFileIterator(
bool is_key_with_globs,
bool distributed_processing,
ContextPtr local_context,
ASTPtr query,
const Block & virtual_block,
const std::vector<String> & read_tasks)
{
if (distributed_processing)
@ -660,19 +783,15 @@ std::shared_ptr<StorageS3Source::IteratorWrapper> StorageS3::createFileIterator(
else if (is_key_with_globs)
{
/// Iterate through disclosed globs and make a source for each file
auto glob_iterator = std::make_shared<StorageS3Source::DisclosedGlobIterator>(*s3_configuration.client, s3_configuration.uri);
return std::make_shared<StorageS3Source::IteratorWrapper>([glob_iterator]()
{
return glob_iterator->next();
});
auto glob_iterator = std::make_shared<StorageS3Source::DisclosedGlobIterator>(
*s3_configuration.client, s3_configuration.uri, query, virtual_block, local_context);
return std::make_shared<StorageS3Source::IteratorWrapper>([glob_iterator]() { return glob_iterator->next(); });
}
else
{
auto keys_iterator = std::make_shared<StorageS3Source::KeysIterator>(keys);
return std::make_shared<StorageS3Source::IteratorWrapper>([keys_iterator]()
{
return keys_iterator->next();
});
auto keys_iterator
= std::make_shared<StorageS3Source::KeysIterator>(keys, s3_configuration.uri.bucket, query, virtual_block, local_context);
return std::make_shared<StorageS3Source::IteratorWrapper>([keys_iterator]() { return keys_iterator->next(); });
}
}
@ -684,12 +803,17 @@ bool StorageS3::isColumnOriented() const
Pipe StorageS3::read(
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & /*query_info*/,
SelectQueryInfo & query_info,
ContextPtr local_context,
QueryProcessingStage::Enum /*processed_stage*/,
size_t max_block_size,
unsigned num_streams)
{
bool has_wildcards = s3_configuration.uri.bucket.find(PARTITION_ID_WILDCARD) != String::npos
|| keys.back().find(PARTITION_ID_WILDCARD) != String::npos;
if (partition_by && has_wildcards)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Reading from a partitioned S3 storage is not implemented yet");
updateS3Configuration(local_context, s3_configuration);
Pipes pipes;
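
The new guard at the top of read() rejects tables created with PARTITION BY: their key template still contains the {_partition_id} wildcard, so there is no concrete object name to read back, and the query fails with NOT_IMPLEMENTED (the "-- { serverError 48 }" expectation in the new test). query_info.query and virtual_block are then threaded into createFileIterator so that both iterator kinds can prune.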
@ -703,7 +827,15 @@ Pipe StorageS3::read(
requested_virtual_columns.push_back(virtual_column);
}
std::shared_ptr<StorageS3Source::IteratorWrapper> iterator_wrapper = createFileIterator(s3_configuration, keys, is_key_with_globs, distributed_processing, local_context, read_tasks_used_in_schema_inference);
std::shared_ptr<StorageS3Source::IteratorWrapper> iterator_wrapper = createFileIterator(
s3_configuration,
keys,
is_key_with_globs,
distributed_processing,
local_context,
query_info.query,
virtual_block,
read_tasks_used_in_schema_inference);
ColumnsDescription columns_description;
Block block_for_format;
@ -999,7 +1131,8 @@ ColumnsDescription StorageS3::getTableStructureFromDataImpl(
ContextPtr ctx,
std::vector<String> * read_keys_in_distributed_processing)
{
auto file_iterator = createFileIterator(s3_configuration, {s3_configuration.uri.key}, is_key_with_globs, distributed_processing, ctx);
auto file_iterator
= createFileIterator(s3_configuration, {s3_configuration.uri.key}, is_key_with_globs, distributed_processing, ctx, nullptr, {});
ReadBufferIterator read_buffer_iterator = [&, first = false]() mutable -> std::unique_ptr<ReadBuffer>
{

src/Storages/StorageS3.h

@ -34,25 +34,28 @@ class StorageS3Source : public SourceWithProgress, WithContext
public:
class DisclosedGlobIterator
{
public:
DisclosedGlobIterator(Aws::S3::S3Client &, const S3::URI &);
String next();
public:
DisclosedGlobIterator(
Aws::S3::S3Client & client_, const S3::URI & globbed_uri_, ASTPtr query, const Block & virtual_header, ContextPtr context);
String next();
private:
class Impl;
/// shared_ptr to have copy constructor
std::shared_ptr<Impl> pimpl;
};
class KeysIterator
{
public:
explicit KeysIterator(const std::vector<String> & keys_);
String next();
public:
explicit KeysIterator(
const std::vector<String> & keys_, const String & bucket_, ASTPtr query, const Block & virtual_header, ContextPtr context);
String next();
private:
class Impl;
/// shared_ptr to have copy constructor
std::shared_ptr<Impl> pimpl;
};
class ReadTasksIterator
@ -203,6 +206,7 @@ private:
S3Configuration s3_configuration;
std::vector<String> keys;
NamesAndTypesList virtual_columns;
Block virtual_block;
String format_name;
String compression_method;
@ -222,6 +226,8 @@ private:
bool is_key_with_globs,
bool distributed_processing,
ContextPtr local_context,
ASTPtr query,
const Block & virtual_block,
const std::vector<String> & read_tasks = {});
static ColumnsDescription getTableStructureFromDataImpl(

src/Storages/StorageS3Cluster.cpp

@ -30,6 +30,7 @@
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Storages/IStorage.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/getVirtualsForStorage.h>
#include <Common/logger_useful.h>
#include <aws/core/auth/AWSCredentials.h>
@ -82,6 +83,15 @@ StorageS3Cluster::StorageS3Cluster(
storage_metadata.setConstraints(constraints_);
setInMemoryMetadata(storage_metadata);
auto default_virtuals = NamesAndTypesList{
{"_path", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
{"_file", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())}};
auto columns = storage_metadata.getSampleBlock().getNamesAndTypesList();
virtual_columns = getVirtualsForStorage(columns, default_virtuals);
for (const auto & column : virtual_columns)
virtual_block.insert({column.type->createColumn(), column.type, column.name});
}
/// The code executes on initiator
@ -98,11 +108,9 @@ Pipe StorageS3Cluster::read(
auto cluster = context->getCluster(cluster_name)->getClusterWithReplicasAsShards(context->getSettingsRef());
auto iterator = std::make_shared<StorageS3Source::DisclosedGlobIterator>(*s3_configuration.client, s3_configuration.uri);
auto callback = std::make_shared<StorageS3Source::IteratorWrapper>([iterator]() mutable -> String
{
return iterator->next();
});
auto iterator = std::make_shared<StorageS3Source::DisclosedGlobIterator>(
*s3_configuration.client, s3_configuration.uri, query_info.query, virtual_block, context);
auto callback = std::make_shared<StorageS3Source::IteratorWrapper>([iterator]() mutable -> String { return iterator->next(); });
/// Calculate the header. This is significant, because some columns could be thrown away in some cases like query with count(*)
Block header =
@ -164,9 +172,7 @@ QueryProcessingStage::Enum StorageS3Cluster::getQueryProcessingStage(
NamesAndTypesList StorageS3Cluster::getVirtuals() const
{
return NamesAndTypesList{
{"_path", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
{"_file", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())}};
return virtual_columns;
}

src/Storages/StorageS3Cluster.h

@ -49,6 +49,8 @@ private:
String cluster_name;
String format_name;
String compression_method;
NamesAndTypesList virtual_columns;
Block virtual_block;
};

tests/queries/0_stateless/02302_s3_file_pruning.reference

@ -0,0 +1,27 @@
-- { echo }
drop table if exists test_02302;
create table test_02302 (a UInt64) engine = S3(s3_conn, filename='test_02302_{_partition_id}', format=Parquet) partition by a;
insert into test_02302 select number from numbers(10) settings s3_truncate_on_insert=1;
select * from test_02302; -- { serverError 48 }
drop table test_02302;
set max_rows_to_read = 1;
-- Test s3 table function with glob
select * from s3(s3_conn, filename='test_02302_*', format=Parquet) where _file like '%5';
5
-- Test s3 table with explicit keys (no glob)
-- TODO support truncate table function
drop table if exists test_02302;
create table test_02302 (a UInt64) engine = S3(s3_conn, filename='test_02302.2', format=Parquet);
truncate table test_02302;
drop table if exists test_02302;
create table test_02302 (a UInt64) engine = S3(s3_conn, filename='test_02302.1', format=Parquet);
truncate table test_02302;
drop table if exists test_02302;
create table test_02302 (a UInt64) engine = S3(s3_conn, filename='test_02302', format=Parquet);
truncate table test_02302;
insert into test_02302 select 0 settings s3_create_new_file_on_insert = true;
insert into test_02302 select 1 settings s3_create_new_file_on_insert = true;
insert into test_02302 select 2 settings s3_create_new_file_on_insert = true;
select * from test_02302 where _file like '%1';
1
drop table test_02302;

tests/queries/0_stateless/02302_s3_file_pruning.sql

@ -0,0 +1,35 @@
-- Tags: no-fasttest
-- Tag no-fasttest: Depends on AWS
-- { echo }
drop table if exists test_02302;
create table test_02302 (a UInt64) engine = S3(s3_conn, filename='test_02302_{_partition_id}', format=Parquet) partition by a;
insert into test_02302 select number from numbers(10) settings s3_truncate_on_insert=1;
select * from test_02302; -- { serverError 48 }
drop table test_02302;
set max_rows_to_read = 1;
-- Test s3 table function with glob
select * from s3(s3_conn, filename='test_02302_*', format=Parquet) where _file like '%5';
-- Test s3 table with explicit keys (no glob)
-- TODO support truncate table function
drop table if exists test_02302;
create table test_02302 (a UInt64) engine = S3(s3_conn, filename='test_02302.2', format=Parquet);
truncate table test_02302;
drop table if exists test_02302;
create table test_02302 (a UInt64) engine = S3(s3_conn, filename='test_02302.1', format=Parquet);
truncate table test_02302;
drop table if exists test_02302;
create table test_02302 (a UInt64) engine = S3(s3_conn, filename='test_02302', format=Parquet);
truncate table test_02302;
insert into test_02302 select 0 settings s3_create_new_file_on_insert = true;
insert into test_02302 select 1 settings s3_create_new_file_on_insert = true;
insert into test_02302 select 2 settings s3_create_new_file_on_insert = true;
select * from test_02302 where _file like '%1';
drop table test_02302;
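
Both test variants lean on set max_rows_to_read = 1: every object written above holds a single row, so a SELECT with a _file LIKE condition can only succeed if pruning narrows the read to the one matching object; touching any other file would exceed the row limit. The first half exercises the glob path through the s3 table function, while the second half covers the explicit-key path by producing test_02302, test_02302.1 and test_02302.2 with s3_create_new_file_on_insert and filtering on _file again.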