Merge pull request #44493 from xiedeyantu/s3_optimize

If the user only needs virtual columns, we don't need to initialize ReadBufferFromS3
This commit is contained in:
Kruglov Pavel 2022-12-30 15:44:20 +01:00 committed by GitHub
commit 0844fe7089
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 65 additions and 33 deletions

View File

@ -529,7 +529,8 @@ StorageS3Source::StorageS3Source(
const String & bucket_,
const String & version_id_,
std::shared_ptr<IIterator> file_iterator_,
const size_t download_thread_num_)
const size_t download_thread_num_,
bool only_need_virtual_columns_)
: ISource(getHeader(sample_block_, requested_virtual_columns_))
, WithContext(context_)
, name(std::move(name_))
@ -543,12 +544,17 @@ StorageS3Source::StorageS3Source(
, client(client_)
, sample_block(sample_block_)
, format_settings(format_settings_)
, only_need_virtual_columns(only_need_virtual_columns_)
, requested_virtual_columns(requested_virtual_columns_)
, file_iterator(file_iterator_)
, download_thread_num(download_thread_num_)
, create_reader_pool(1)
, create_reader_scheduler(threadPoolCallbackRunner<ReaderHolder>(create_reader_pool, "CreateS3Reader"))
{
/// If the user only needs virtual columns, StorageS3Source does not use ReaderHolder and does not initialize ReadBufferFromS3.
if (only_need_virtual_columns)
return;
reader = createReader();
if (reader)
reader_future = createReaderAsync();
@ -683,6 +689,35 @@ String StorageS3Source::getName() const
Chunk StorageS3Source::generate()
{
/// Appends the requested virtual columns to `chunk`, one column per entry of `requested_virtual_columns`,
/// in the order they appear there. Each column holds `num_rows` copies of a value derived from `file_path`.
/// Only `_path` and `_file` are handled; any other requested name adds nothing.
auto add_virtual_columns = [&](Chunk & chunk, const String & file_path, UInt64 num_rows)
{
for (const auto & virtual_column : requested_virtual_columns)
{
if (virtual_column.name == "_path")
{
/// `_path` carries `file_path` unchanged; the const column is converted to a full column before being added.
chunk.addColumn(virtual_column.type->createColumnConst(num_rows, file_path)->convertToFullColumnIfConst());
}
else if (virtual_column.name == "_file")
{
/// `_file` is the part of `file_path` after the last '/'.
/// If there is no '/', find_last_of returns npos and `npos + 1` wraps to 0, so the whole path is used.
size_t last_slash_pos = file_path.find_last_of('/');
auto column = virtual_column.type->createColumnConst(num_rows, file_path.substr(last_slash_pos + 1));
chunk.addColumn(column->convertToFullColumnIfConst());
}
}
};
if (only_need_virtual_columns)
{
Chunk chunk;
auto current_key = (*file_iterator)().key;
if (!current_key.empty())
{
const auto & file_path = fs::path(bucket) / current_key;
add_virtual_columns(chunk, file_path, 1);
}
return chunk;
}
while (true)
{
if (!reader || isCancelled())
@ -701,20 +736,7 @@ Chunk StorageS3Source::generate()
*this, chunk, total_size, total_rows_approx_accumulated, total_rows_count_times, total_rows_approx_max);
}
for (const auto & virtual_column : requested_virtual_columns)
{
if (virtual_column.name == "_path")
{
chunk.addColumn(virtual_column.type->createColumnConst(num_rows, file_path)->convertToFullColumnIfConst());
}
else if (virtual_column.name == "_file")
{
size_t last_slash_pos = file_path.find_last_of('/');
auto column = virtual_column.type->createColumnConst(num_rows, file_path.substr(last_slash_pos + 1));
chunk.addColumn(column->convertToFullColumnIfConst());
}
}
add_virtual_columns(chunk, file_path, num_rows);
return chunk;
}
@ -1035,6 +1057,10 @@ Pipe StorageS3::read(
requested_virtual_columns.push_back(virtual_column);
}
bool only_need_virtual_columns = true;
if (column_names_set.size() > requested_virtual_columns.size())
only_need_virtual_columns = false;
std::shared_ptr<StorageS3Source::IIterator> iterator_wrapper = createFileIterator(
s3_configuration,
keys,
@ -1047,25 +1073,28 @@ Pipe StorageS3::read(
ColumnsDescription columns_description;
Block block_for_format;
if (supportsSubsetOfColumns())
if (!only_need_virtual_columns)
{
auto fetch_columns = column_names;
const auto & virtuals = getVirtuals();
std::erase_if(
fetch_columns,
[&](const String & col)
{ return std::any_of(virtuals.begin(), virtuals.end(), [&](const NameAndTypePair & virtual_col){ return col == virtual_col.name; }); });
if (supportsSubsetOfColumns())
{
auto fetch_columns = column_names;
const auto & virtuals = getVirtuals();
std::erase_if(
fetch_columns,
[&](const String & col)
{ return std::any_of(virtuals.begin(), virtuals.end(), [&](const NameAndTypePair & virtual_col){ return col == virtual_col.name; }); });
if (fetch_columns.empty())
fetch_columns.push_back(ExpressionActions::getSmallestColumn(storage_snapshot->metadata->getColumns().getAllPhysical()));
if (fetch_columns.empty())
fetch_columns.push_back(ExpressionActions::getSmallestColumn(storage_snapshot->metadata->getColumns().getAllPhysical()));
columns_description = storage_snapshot->getDescriptionForColumns(fetch_columns);
block_for_format = storage_snapshot->getSampleBlockForColumns(columns_description.getNamesOfPhysical());
}
else
{
columns_description = storage_snapshot->metadata->getColumns();
block_for_format = storage_snapshot->metadata->getSampleBlock();
columns_description = storage_snapshot->getDescriptionForColumns(fetch_columns);
block_for_format = storage_snapshot->getSampleBlockForColumns(columns_description.getNamesOfPhysical());
}
else
{
columns_description = storage_snapshot->metadata->getColumns();
block_for_format = storage_snapshot->metadata->getSampleBlock();
}
}
const size_t max_download_threads = local_context->getSettingsRef().max_download_threads;
@ -1086,7 +1115,8 @@ Pipe StorageS3::read(
s3_configuration.uri.bucket,
s3_configuration.uri.version_id,
iterator_wrapper,
max_download_threads));
max_download_threads,
only_need_virtual_columns));
}
auto pipe = Pipe::unitePipes(std::move(pipes));

View File

@ -137,7 +137,8 @@ public:
const String & bucket,
const String & version_id,
std::shared_ptr<IIterator> file_iterator_,
size_t download_thread_num);
size_t download_thread_num,
bool only_need_virtual_columns_ = false);
~StorageS3Source() override;
@ -159,6 +160,7 @@ private:
std::shared_ptr<const Aws::S3::S3Client> client;
Block sample_block;
std::optional<FormatSettings> format_settings;
bool only_need_virtual_columns{false};
struct ReaderHolder
{