Fix build for S3Queue

This commit is contained in:
avogar 2023-08-03 14:20:19 +00:00
parent 89bb3f3410
commit f0eb22ac5f
3 changed files with 16 additions and 98 deletions

View File

@ -148,22 +148,12 @@ StorageS3QueueSource::KeyWithInfo StorageS3QueueSource::QueueGlobIterator::next(
return KeyWithInfo();
}
/// Builds the header for the source: takes `sample_block` (received by value, so the
/// caller's block is left untouched) and appends one column for every entry of
/// `requested_virtual_columns`. Each appended column is produced by the virtual
/// column's type via `createColumn()` and carries that virtual column's type and name.
/// Returns the extended block.
Block StorageS3QueueSource::getHeader(Block sample_block, const std::vector<NameAndTypePair> & requested_virtual_columns)
{
    for (const auto & requested : requested_virtual_columns)
    {
        const auto & column_type = requested.type;
        sample_block.insert({column_type->createColumn(), column_type, requested.name});
    }

    return sample_block;
}
StorageS3QueueSource::StorageS3QueueSource(
const std::vector<NameAndTypePair> & requested_virtual_columns_,
const ReadFromFormatInfo & info,
const String & format_,
String name_,
const Block & sample_block_,
ContextPtr context_,
std::optional<FormatSettings> format_settings_,
const ColumnsDescription & columns_,
UInt64 max_block_size_,
const S3Settings::RequestSettings & request_settings_,
String compression_hint_,
@ -174,28 +164,27 @@ StorageS3QueueSource::StorageS3QueueSource(
std::shared_ptr<S3QueueFilesMetadata> files_metadata_,
const S3QueueAction & action_,
const size_t download_thread_num_)
: ISource(getHeader(sample_block_, requested_virtual_columns_))
: ISource(info.source_header)
, WithContext(context_)
, name(std::move(name_))
, bucket(bucket_)
, version_id(version_id_)
, format(format_)
, columns_desc(columns_)
, columns_desc(info.columns_description)
, request_settings(request_settings_)
, client(client_)
, files_metadata(files_metadata_)
, requested_virtual_columns(requested_virtual_columns_)
, requested_virtual_columns(info.requested_virtual_columns)
, requested_columns(info.requested_columns)
, file_iterator(file_iterator_)
, action(action_)
{
internal_source = std::make_shared<StorageS3Source>(
requested_virtual_columns_,
info,
format_,
name_,
sample_block_,
context_,
format_settings_,
columns_,
max_block_size_,
request_settings_,
compression_hint_,

View File

@ -11,6 +11,7 @@
# include <Storages/S3Queue/S3QueueFilesMetadata.h>
# include <Storages/StorageS3.h>
# include <Storages/StorageS3Settings.h>
# include <Storages/prepareReadingFromFormat.h>
# include <IO/CompressionMethod.h>
# include <IO/S3/getObjectInfo.h>
@ -67,13 +68,11 @@ public:
static Block getHeader(Block sample_block, const std::vector<NameAndTypePair> & requested_virtual_columns);
StorageS3QueueSource(
const std::vector<NameAndTypePair> & requested_virtual_columns_,
const ReadFromFormatInfo & info,
const String & format,
String name_,
const Block & sample_block,
ContextPtr context_,
std::optional<FormatSettings> format_settings_,
const ColumnsDescription & columns_,
UInt64 max_block_size_,
const S3Settings::RequestSettings & request_settings_,
String compression_hint_,
@ -105,7 +104,8 @@ private:
using ReaderHolder = StorageS3Source::ReaderHolder;
ReaderHolder reader;
std::vector<NameAndTypePair> requested_virtual_columns;
NamesAndTypesList requested_virtual_columns;
NamesAndTypesList requested_columns;
std::shared_ptr<IIterator> file_iterator;
const S3QueueAction action;

View File

@ -33,6 +33,7 @@
# include <Storages/StorageSnapshot.h>
# include <Storages/VirtualColumnUtils.h>
# include <Storages/getVirtualsForStorage.h>
# include <Storages/prepareReadingFromFormat.h>
# include <Common/NamedCollections/NamedCollections.h>
@ -187,7 +188,7 @@ StorageS3Queue::StorageS3Queue(
/// Reports whether this storage supports reading subcolumns.
bool StorageS3Queue::supportsSubcolumns() const
{
/// As written, the answer is delegated to FormatFactory for the configured format.
return FormatFactory::instance().checkIfFormatSupportsSubcolumns(configuration.format);
/// NOTE(review): the statement below is unreachable because of the return above.
/// This looks like the old and the new line of a diff shown side by side —
/// confirm which of the two returns is the intended one.
return true;
}
bool StorageS3Queue::supportsSubsetOfColumns() const
@ -213,55 +214,18 @@ Pipe StorageS3Queue::read(
auto query_configuration = updateConfigurationAndGetCopy(local_context);
Pipes pipes;
std::unordered_set<String> column_names_set(column_names.begin(), column_names.end());
std::vector<NameAndTypePair> requested_virtual_columns;
for (const auto & virtual_column : getVirtuals())
{
if (column_names_set.contains(virtual_column.name))
requested_virtual_columns.push_back(virtual_column);
}
std::shared_ptr<StorageS3Source::IIterator> iterator_wrapper = createFileIterator(local_context, query_info.query);
ColumnsDescription columns_description;
Block block_for_format;
if (supportsSubsetOfColumns())
{
auto fetch_columns = column_names;
const auto & virtuals = getVirtuals();
std::erase_if(
fetch_columns,
[&](const String & col)
{
return std::any_of(
virtuals.begin(), virtuals.end(), [&](const NameAndTypePair & virtual_col) { return col == virtual_col.name; });
});
if (fetch_columns.empty())
fetch_columns.push_back(ExpressionActions::getSmallestColumn(storage_snapshot->metadata->getColumns().getAllPhysical()).name);
columns_description = storage_snapshot->getDescriptionForColumns(fetch_columns);
block_for_format = storage_snapshot->getSampleBlockForColumns(columns_description.getNamesOfPhysical());
}
else
{
columns_description = storage_snapshot->metadata->getColumns();
block_for_format = storage_snapshot->metadata->getSampleBlock();
}
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(), getVirtuals());
const size_t max_download_threads = local_context->getSettingsRef().max_download_threads;
return Pipe(std::make_shared<StorageS3QueueSource>(
requested_virtual_columns,
read_from_format_info,
configuration.format,
getName(),
block_for_format,
local_context,
format_settings,
columns_description,
max_block_size,
query_configuration.request_settings,
configuration.compression_method,
@ -425,52 +389,17 @@ void StorageS3Queue::streamToViews()
auto column_names = block_io.pipeline.getHeader().getNames();
// Create a stream for each consumer and join them in a union stream
std::vector<NameAndTypePair> requested_virtual_columns;
for (const auto & virtual_column : getVirtuals())
{
requested_virtual_columns.push_back(virtual_column);
}
std::shared_ptr<StorageS3Source::IIterator> iterator_wrapper = createFileIterator(s3queue_context, nullptr);
ColumnsDescription columns_description;
Block block_for_format;
if (supportsSubsetOfColumns())
{
auto fetch_columns = column_names;
const auto & virtuals = getVirtuals();
std::erase_if(
fetch_columns,
[&](const String & col)
{
return std::any_of(
virtuals.begin(), virtuals.end(), [&](const NameAndTypePair & virtual_col) { return col == virtual_col.name; });
});
if (fetch_columns.empty())
fetch_columns.push_back(ExpressionActions::getSmallestColumn(storage_snapshot->metadata->getColumns().getAllPhysical()).name);
columns_description = storage_snapshot->getDescriptionForColumns(fetch_columns);
block_for_format = storage_snapshot->getSampleBlockForColumns(columns_description.getNamesOfPhysical());
}
else
{
columns_description = storage_snapshot->metadata->getColumns();
block_for_format = storage_snapshot->metadata->getSampleBlock();
}
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(), getVirtuals());
const size_t max_download_threads = s3queue_context->getSettingsRef().max_download_threads;
Pipes pipes;
auto pipe = Pipe(std::make_shared<StorageS3QueueSource>(
requested_virtual_columns,
read_from_format_info,
configuration.format,
getName(),
block_for_format,
s3queue_context,
format_settings,
columns_description,
block_size,
query_configuration.request_settings,
configuration.compression_method,