Support reading subcolumns from file/s3/hdfs/url/azureBlobStorage table functions

avogar 2023-07-04 16:50:31 +00:00
parent 305e3d1f66
commit 98aa6b317f
35 changed files with 541 additions and 399 deletions

View File

@ -825,8 +825,11 @@ class IColumn;
MAKE_OBSOLETE(M, Seconds, drain_timeout, 3) \
MAKE_OBSOLETE(M, UInt64, backup_threads, 16) \
MAKE_OBSOLETE(M, UInt64, restore_threads, 16) \
MAKE_OBSOLETE(M, Bool, input_format_arrow_import_nested, false) \
MAKE_OBSOLETE(M, Bool, input_format_parquet_import_nested, false) \
MAKE_OBSOLETE(M, Bool, input_format_orc_import_nested, false) \
/** The section above is for obsolete settings. Do not add anything there. */
/** The section above is for obsolete settings. Do not add anything there. */
#define FORMAT_FACTORY_SETTINGS(M, ALIAS) \
@ -845,12 +848,9 @@ class IColumn;
M(Bool, input_format_tsv_empty_as_default, false, "Treat empty fields in TSV input as default values.", 0) \
M(Bool, input_format_tsv_enum_as_number, false, "Treat inserted enum values in TSV formats as enum indices.", 0) \
M(Bool, input_format_null_as_default, true, "Initialize null fields with default values if the data type of this field is not nullable and it is supported by the input format", 0) \
M(Bool, input_format_arrow_import_nested, false, "Allow to insert array of structs into Nested table in Arrow input format.", 0) \
M(Bool, input_format_arrow_case_insensitive_column_matching, false, "Ignore case when matching Arrow columns with CH columns.", 0) \
M(Bool, input_format_orc_import_nested, false, "Allow to insert array of structs into Nested table in ORC input format.", 0) \
M(Int64, input_format_orc_row_batch_size, 100'000, "Batch size when reading ORC stripes.", 0) \
M(Bool, input_format_orc_case_insensitive_column_matching, false, "Ignore case when matching ORC columns with CH columns.", 0) \
M(Bool, input_format_parquet_import_nested, false, "Allow to insert array of structs into Nested table in Parquet input format.", 0) \
M(Bool, input_format_parquet_case_insensitive_column_matching, false, "Ignore case when matching Parquet columns with CH columns.", 0) \
M(Bool, input_format_parquet_preserve_order, false, "Avoid reordering rows when reading from Parquet files. Usually makes it much slower.", 0) \
M(Bool, input_format_allow_seeks, true, "Allow seeks while reading in ORC/Parquet/Arrow input formats", 0) \

View File

@ -71,6 +71,12 @@ void applySettingsQuirks(Settings & settings, Poco::Logger * log)
}
}
//#if defined(THREAD_SANITIZER)
settings.use_hedged_requests.value = false;
if (log)
LOG_WARNING(log, "use_hedged_requests has been disabled for the build with Thread Sanitizer, because they are using fibers, leading to a failed assertion inside TSan");
//#endif
if (!queryProfilerWorks())
{
if (settings.query_profiler_real_time_period_ns)

View File

@ -117,7 +117,6 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
format_settings.parquet.row_group_rows = settings.output_format_parquet_row_group_size;
format_settings.parquet.row_group_bytes = settings.output_format_parquet_row_group_size_bytes;
format_settings.parquet.output_version = settings.output_format_parquet_version;
format_settings.parquet.import_nested = settings.input_format_parquet_import_nested;
format_settings.parquet.case_insensitive_column_matching = settings.input_format_parquet_case_insensitive_column_matching;
format_settings.parquet.preserve_order = settings.input_format_parquet_preserve_order;
format_settings.parquet.allow_missing_columns = settings.input_format_parquet_allow_missing_columns;
@ -161,7 +160,6 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
format_settings.with_types_use_header = settings.input_format_with_types_use_header;
format_settings.write_statistics = settings.output_format_write_statistics;
format_settings.arrow.low_cardinality_as_dictionary = settings.output_format_arrow_low_cardinality_as_dictionary;
format_settings.arrow.import_nested = settings.input_format_arrow_import_nested;
format_settings.arrow.allow_missing_columns = settings.input_format_arrow_allow_missing_columns;
format_settings.arrow.skip_columns_with_unsupported_types_in_schema_inference = settings.input_format_arrow_skip_columns_with_unsupported_types_in_schema_inference;
format_settings.arrow.skip_columns_with_unsupported_types_in_schema_inference = settings.input_format_arrow_skip_columns_with_unsupported_types_in_schema_inference;
@ -169,11 +167,9 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
format_settings.arrow.output_string_as_string = settings.output_format_arrow_string_as_string;
format_settings.arrow.output_fixed_string_as_fixed_byte_array = settings.output_format_arrow_fixed_string_as_fixed_byte_array;
format_settings.arrow.output_compression_method = settings.output_format_arrow_compression_method;
format_settings.orc.import_nested = settings.input_format_orc_import_nested;
format_settings.orc.allow_missing_columns = settings.input_format_orc_allow_missing_columns;
format_settings.orc.row_batch_size = settings.input_format_orc_row_batch_size;
format_settings.orc.skip_columns_with_unsupported_types_in_schema_inference = settings.input_format_orc_skip_columns_with_unsupported_types_in_schema_inference;
format_settings.orc.import_nested = settings.input_format_orc_import_nested;
format_settings.orc.allow_missing_columns = settings.input_format_orc_allow_missing_columns;
format_settings.orc.row_batch_size = settings.input_format_orc_row_batch_size;
format_settings.orc.skip_columns_with_unsupported_types_in_schema_inference = settings.input_format_orc_skip_columns_with_unsupported_types_in_schema_inference;
@ -676,14 +672,6 @@ void FormatFactory::markFormatSupportsSubsetOfColumns(const String & name)
target = true;
}
void FormatFactory::markFormatSupportsSubcolumns(const String & name)
{
auto & target = dict[name].supports_subcolumns;
if (target)
throw Exception(ErrorCodes::LOGICAL_ERROR, "FormatFactory: Format {} is already marked as supporting subcolumns", name);
target = true;
}
void FormatFactory::markOutputFormatPrefersLargeBlocks(const String & name)
{
auto & target = dict[name].prefers_large_blocks;
@ -692,12 +680,6 @@ void FormatFactory::markOutputFormatPrefersLargeBlocks(const String & name)
target = true;
}
bool FormatFactory::checkIfFormatSupportsSubcolumns(const String & name) const
{
const auto & target = getCreators(name);
return target.supports_subcolumns;
}
bool FormatFactory::checkIfFormatSupportsSubsetOfColumns(const String & name) const
{
const auto & target = getCreators(name);

View File

@ -228,10 +228,8 @@ public:
void markOutputFormatSupportsParallelFormatting(const String & name);
void markOutputFormatPrefersLargeBlocks(const String & name);
void markFormatSupportsSubcolumns(const String & name);
void markFormatSupportsSubsetOfColumns(const String & name);
bool checkIfFormatSupportsSubcolumns(const String & name) const;
bool checkIfFormatSupportsSubsetOfColumns(const String & name) const;
bool checkIfFormatHasSchemaReader(const String & name) const;

View File

@ -100,7 +100,6 @@ struct FormatSettings
{
UInt64 row_group_size = 1000000;
bool low_cardinality_as_dictionary = false;
bool import_nested = false;
bool allow_missing_columns = false;
bool skip_columns_with_unsupported_types_in_schema_inference = false;
bool case_insensitive_column_matching = false;
@ -212,7 +211,6 @@ struct FormatSettings
{
UInt64 row_group_rows = 1000000;
UInt64 row_group_bytes = 512 * 1024 * 1024;
bool import_nested = false;
bool allow_missing_columns = false;
bool skip_columns_with_unsupported_types_in_schema_inference = false;
bool case_insensitive_column_matching = false;
@ -317,7 +315,6 @@ struct FormatSettings
struct
{
bool import_nested = false;
bool allow_missing_columns = false;
int64_t row_batch_size = 100'000;
bool skip_columns_with_unsupported_types_in_schema_inference = false;

View File

@ -143,7 +143,6 @@ void ArrowBlockInputFormat::prepareReader()
arrow_column_to_ch_column = std::make_unique<ArrowColumnToCHColumn>(
getPort().getHeader(),
"Arrow",
format_settings.arrow.import_nested,
format_settings.arrow.allow_missing_columns,
format_settings.null_as_default,
format_settings.arrow.case_insensitive_column_matching);
@ -190,7 +189,6 @@ void registerInputFormatArrow(FormatFactory & factory)
{
return std::make_shared<ArrowBlockInputFormat>(buf, sample, false, format_settings);
});
factory.markFormatSupportsSubcolumns("Arrow");
factory.markFormatSupportsSubsetOfColumns("Arrow");
factory.registerInputFormat(
"ArrowStream",

View File

@ -1032,13 +1032,11 @@ Block ArrowColumnToCHColumn::arrowSchemaToCHHeader(
ArrowColumnToCHColumn::ArrowColumnToCHColumn(
const Block & header_,
const std::string & format_name_,
bool import_nested_,
bool allow_missing_columns_,
bool null_as_default_,
bool case_insensitive_matching_)
: header(header_)
, format_name(format_name_)
, import_nested(import_nested_)
, allow_missing_columns(allow_missing_columns_)
, null_as_default(null_as_default_)
, case_insensitive_matching(case_insensitive_matching_)
@ -1080,42 +1078,40 @@ void ArrowColumnToCHColumn::arrowColumnsToCHChunk(Chunk & res, NameToColumnPtr &
if (!name_to_column_ptr.contains(search_column_name))
{
bool read_from_nested = false;
/// Check if it's a column from nested table.
if (import_nested)
/// Check if it's a subcolumn from some struct.
String nested_table_name = Nested::extractTableName(header_column.name);
String search_nested_table_name = nested_table_name;
if (case_insensitive_matching)
boost::to_lower(search_nested_table_name);
if (name_to_column_ptr.contains(search_nested_table_name))
{
String nested_table_name = Nested::extractTableName(header_column.name);
String search_nested_table_name = nested_table_name;
if (case_insensitive_matching)
boost::to_lower(search_nested_table_name);
if (name_to_column_ptr.contains(search_nested_table_name))
if (!nested_tables.contains(search_nested_table_name))
{
if (!nested_tables.contains(search_nested_table_name))
NamesAndTypesList nested_columns;
for (const auto & name_and_type : header.getNamesAndTypesList())
{
NamesAndTypesList nested_columns;
for (const auto & name_and_type : header.getNamesAndTypesList())
{
if (name_and_type.name.starts_with(nested_table_name + "."))
nested_columns.push_back(name_and_type);
}
auto nested_table_type = Nested::collect(nested_columns).front().type;
if (name_and_type.name.starts_with(nested_table_name + "."))
nested_columns.push_back(name_and_type);
}
auto nested_table_type = Nested::collect(nested_columns).front().type;
std::shared_ptr<arrow::ChunkedArray> arrow_column = name_to_column_ptr[search_nested_table_name];
ColumnsWithTypeAndName cols = {readColumnFromArrowColumn(
arrow_column, nested_table_name, format_name, false, dictionary_infos, true, false, skipped, nested_table_type)};
BlockPtr block_ptr = std::make_shared<Block>(cols);
auto column_extractor = std::make_shared<NestedColumnExtractHelper>(*block_ptr, case_insensitive_matching);
nested_tables[search_nested_table_name] = {block_ptr, column_extractor};
}
auto nested_column = nested_tables[search_nested_table_name].second->extractColumn(search_column_name);
if (nested_column)
{
column = *nested_column;
if (case_insensitive_matching)
column.name = header_column.name;
read_from_nested = true;
}
std::shared_ptr<arrow::ChunkedArray> arrow_column = name_to_column_ptr[search_nested_table_name];
ColumnsWithTypeAndName cols = {readColumnFromArrowColumn(
arrow_column, nested_table_name, format_name, false, dictionary_infos, true, false, skipped, nested_table_type)};
BlockPtr block_ptr = std::make_shared<Block>(cols);
auto column_extractor = std::make_shared<NestedColumnExtractHelper>(*block_ptr, case_insensitive_matching);
nested_tables[search_nested_table_name] = {block_ptr, column_extractor};
}
auto nested_column = nested_tables[search_nested_table_name].second->extractColumn(search_column_name);
if (nested_column)
{
column = *nested_column;
if (case_insensitive_matching)
column.name = header_column.name;
read_from_nested = true;
}
}
if (!read_from_nested)
{
if (!allow_missing_columns)

View File

@ -24,7 +24,6 @@ public:
ArrowColumnToCHColumn(
const Block & header_,
const std::string & format_name_,
bool import_nested_,
bool allow_missing_columns_,
bool null_as_default_,
bool case_insensitive_matching_ = false);
@ -53,7 +52,6 @@ public:
private:
const Block & header;
const std::string format_name;
bool import_nested;
/// If false, throw exception if some columns in header not exists in arrow table.
bool allow_missing_columns;
bool null_as_default;

View File

@ -1223,6 +1223,8 @@ void registerInputFormatAvro(FormatFactory & factory)
{
return std::make_shared<AvroConfluentRowInputFormat>(sample, buf, params, settings);
});
factory.markFormatSupportsSubsetOfColumns("AvroConfluent");
}
void registerAvroSchemaReader(FormatFactory & factory)

View File

@ -125,16 +125,12 @@ void ORCBlockInputFormat::prepareReader()
arrow_column_to_ch_column = std::make_unique<ArrowColumnToCHColumn>(
getPort().getHeader(),
"ORC",
format_settings.orc.import_nested,
format_settings.orc.allow_missing_columns,
format_settings.null_as_default,
format_settings.orc.case_insensitive_column_matching);
const bool ignore_case = format_settings.orc.case_insensitive_column_matching;
std::unordered_set<String> nested_table_names;
if (format_settings.orc.import_nested)
nested_table_names = Nested::getAllTableNames(getPort().getHeader(), ignore_case);
std::unordered_set<String> nested_table_names = Nested::getAllTableNames(getPort().getHeader(), ignore_case);
for (int i = 0; i < schema->num_fields(); ++i)
{
const auto & name = schema->field(i)->name();
@ -171,7 +167,6 @@ void registerInputFormatORC(FormatFactory & factory)
{
return std::make_shared<ORCBlockInputFormat>(buf, sample, settings);
});
factory.markFormatSupportsSubcolumns("ORC");
factory.markFormatSupportsSubsetOfColumns("ORC");
}

View File

@ -143,7 +143,6 @@ void ParquetBlockInputFormat::initializeRowGroupReader(size_t row_group_idx)
row_group.arrow_column_to_ch_column = std::make_unique<ArrowColumnToCHColumn>(
getPort().getHeader(),
"Parquet",
format_settings.parquet.import_nested,
format_settings.parquet.allow_missing_columns,
format_settings.null_as_default,
format_settings.parquet.case_insensitive_column_matching);
@ -415,7 +414,6 @@ void registerInputFormatParquet(FormatFactory & factory)
max_parsing_threads,
min_bytes_for_seek);
});
factory.markFormatSupportsSubcolumns("Parquet");
factory.markFormatSupportsSubsetOfColumns("Parquet");
}

View File

@ -504,7 +504,6 @@ void registerInputFormatParquetMetadata(FormatFactory & factory)
{
return std::make_shared<ParquetMetadataInputFormat>(buf, sample, settings);
});
factory.markFormatSupportsSubcolumns("ParquetMetadata");
factory.markFormatSupportsSubsetOfColumns("ParquetMetadata");
}

View File

@ -0,0 +1,35 @@
#include <Processors/Transforms/ExtractColumnsTransform.h>
#include <Interpreters/getColumnFromBlock.h>
namespace DB
{
ExtractColumnsTransform::ExtractColumnsTransform(const Block & header_, const NamesAndTypesList & requested_columns_)
: ISimpleTransform(header_, transformHeader(header_, requested_columns_), false), requested_columns(requested_columns_)
{
}
Block ExtractColumnsTransform::transformHeader(Block header, const NamesAndTypesList & requested_columns_)
{
ColumnsWithTypeAndName columns;
columns.reserve(requested_columns_.size());
for (const auto & required_column : requested_columns_)
columns.emplace_back(getColumnFromBlock(header, required_column), required_column.type, required_column.name);
return Block(std::move(columns));
}
void ExtractColumnsTransform::transform(Chunk & chunk)
{
size_t num_rows = chunk.getNumRows();
auto block = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());
Columns columns;
columns.reserve(requested_columns.size());
for (const auto & required_column : requested_columns)
columns.emplace_back(getColumnFromBlock(block, required_column));
chunk.setColumns(std::move(columns), num_rows);
}
}

View File

@ -0,0 +1,26 @@
#pragma once
#include <Processors/ISimpleTransform.h>
namespace DB
{
/// Extracts required columns and subcolumns from the block.
class ExtractColumnsTransform final : public ISimpleTransform
{
public:
ExtractColumnsTransform(
const Block & header_,
const NamesAndTypesList & requested_columns_);
String getName() const override { return "ExtractColumnsTransform"; }
static Block transformHeader(Block header, const NamesAndTypesList & requested_columns_);
protected:
void transform(Chunk & chunk) override;
private:
const NamesAndTypesList requested_columns;
};
}
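
A minimal usage sketch of the new transform (illustrative only, not part of the commit): the input format reads the whole parent column, and ExtractColumnsTransform::transformHeader narrows the block down to the requested subcolumn; transform() does the same per chunk at runtime. The tuple type and column names below are assumptions chosen for the example.

#include <Processors/Transforms/ExtractColumnsTransform.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypesNumber.h>

using namespace DB;

void exampleNarrowHeader()
{
    /// Type of the column as it is stored in the file: a Tuple(b UInt32, c UInt32).
    auto tuple_type = std::make_shared<DataTypeTuple>(
        DataTypes{std::make_shared<DataTypeUInt32>(), std::make_shared<DataTypeUInt32>()},
        Names{"b", "c"});

    /// Header produced by the input format: the full parent column `a`.
    Block format_header{{tuple_type->createColumn(), tuple_type, "a"}};

    /// The query only asked for the subcolumn `a.b` of type UInt32.
    NamesAndTypesList requested_columns{
        NameAndTypePair("a", "b", tuple_type, std::make_shared<DataTypeUInt32>())};

    /// The narrowed header contains a single column named `a.b` of type UInt32.
    Block narrowed = ExtractColumnsTransform::transformHeader(format_header, requested_columns);
}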

View File

@ -281,7 +281,6 @@ struct DeltaLakeMetadataParser<Configuration, MetadataReadHelper>::Impl
ArrowColumnToCHColumn column_reader(
header, "Parquet",
format_settings.parquet.import_nested,
format_settings.parquet.allow_missing_columns,
/* null_as_default */true,
/* case_insensitive_column_matching */false);

View File

@ -13,6 +13,7 @@
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Formats/IInputFormat.h>
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <Processors/Transforms/ExtractColumnsTransform.h>
#include <IO/WriteHelpers.h>
#include <IO/CompressionMethod.h>
@ -332,30 +333,21 @@ StorageHDFS::PathWithInfo HDFSSource::URISIterator::next()
return pimpl->next();
}
Block HDFSSource::getHeader(Block sample_block, const std::vector<NameAndTypePair> & requested_virtual_columns)
{
for (const auto & virtual_column : requested_virtual_columns)
sample_block.insert({virtual_column.type->createColumn(), virtual_column.type, virtual_column.name});
return sample_block;
}
HDFSSource::HDFSSource(
const ReadFromFormatInfo & info,
StorageHDFSPtr storage_,
const Block & block_for_format_,
const std::vector<NameAndTypePair> & requested_virtual_columns_,
ContextPtr context_,
UInt64 max_block_size_,
std::shared_ptr<IteratorWrapper> file_iterator_,
ColumnsDescription columns_description_)
: ISource(getHeader(block_for_format_, requested_virtual_columns_))
std::shared_ptr<IteratorWrapper> file_iterator_)
: ISource(info.source_header)
, WithContext(context_)
, storage(std::move(storage_))
, block_for_format(block_for_format_)
, requested_virtual_columns(requested_virtual_columns_)
, block_for_format(info.format_header)
, requested_columns(info.requested_columns)
, requested_virtual_columns(info.requested_virtual_columns)
, max_block_size(max_block_size_)
, file_iterator(file_iterator_)
, columns_description(std::move(columns_description_))
, columns_description(info.columns_description)
{
initialize();
}
@ -408,6 +400,14 @@ bool HDFSSource::initialize()
return std::make_shared<AddingDefaultsTransform>(header, columns_description, *input_format, getContext());
});
}
/// Add ExtractColumnsTransform to extract requested columns/subcolumns
/// from chunk read by IInputFormat.
builder.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ExtractColumnsTransform>(header, requested_columns);
});
pipeline = std::make_unique<QueryPipeline>(QueryPipelineBuilder::getPipeline(std::move(builder)));
reader = std::make_unique<PullingPipelineExecutor>(*pipeline);
return true;
@ -640,50 +640,17 @@ Pipe StorageHDFS::read(
});
}
std::unordered_set<String> column_names_set(column_names.begin(), column_names.end());
std::vector<NameAndTypePair> requested_virtual_columns;
for (const auto & virtual_column : getVirtuals())
{
if (column_names_set.contains(virtual_column.name))
requested_virtual_columns.push_back(virtual_column);
}
ColumnsDescription columns_description;
Block block_for_format;
if (supportsSubsetOfColumns())
{
auto fetch_columns = column_names;
const auto & virtuals = getVirtuals();
std::erase_if(
fetch_columns,
[&](const String & col)
{ return std::any_of(virtuals.begin(), virtuals.end(), [&](const NameAndTypePair & virtual_col){ return col == virtual_col.name; }); });
if (fetch_columns.empty())
fetch_columns.push_back(ExpressionActions::getSmallestColumn(storage_snapshot->metadata->getColumns().getAllPhysical()).name);
columns_description = storage_snapshot->getDescriptionForColumns(fetch_columns);
block_for_format = storage_snapshot->getSampleBlockForColumns(columns_description.getNamesOfPhysical());
}
else
{
columns_description = storage_snapshot->metadata->getColumns();
block_for_format = storage_snapshot->metadata->getSampleBlock();
}
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, format_name, getVirtuals());
Pipes pipes;
auto this_ptr = std::static_pointer_cast<StorageHDFS>(shared_from_this());
for (size_t i = 0; i < num_streams; ++i)
{
pipes.emplace_back(std::make_shared<HDFSSource>(
read_from_format_info,
this_ptr,
block_for_format,
requested_virtual_columns,
context_,
max_block_size,
iterator_wrapper,
columns_description));
iterator_wrapper));
}
return Pipe::unitePipes(std::move(pipes));
}

View File

@ -7,6 +7,7 @@
#include <Processors/ISource.h>
#include <Storages/IStorage.h>
#include <Storages/Cache/SchemaCache.h>
#include <Storages/prepareReadingFromFormat.h>
#include <Poco/URI.h>
namespace DB
@ -74,6 +75,8 @@ public:
/// format to read only them. Note: this hack cannot be done with ordinary formats like TSV.
bool supportsSubsetOfColumns() const override;
bool supportsSubcolumns() const override { return true; }
static ColumnsDescription getTableStructureFromData(
const String & format,
const String & uri,
@ -140,16 +143,12 @@ public:
using IteratorWrapper = std::function<StorageHDFS::PathWithInfo()>;
using StorageHDFSPtr = std::shared_ptr<StorageHDFS>;
static Block getHeader(Block sample_block, const std::vector<NameAndTypePair> & requested_virtual_columns);
HDFSSource(
const ReadFromFormatInfo & info,
StorageHDFSPtr storage_,
const Block & block_for_format_,
const std::vector<NameAndTypePair> & requested_virtual_columns_,
ContextPtr context_,
UInt64 max_block_size_,
std::shared_ptr<IteratorWrapper> file_iterator_,
ColumnsDescription columns_description_);
std::shared_ptr<IteratorWrapper> file_iterator_);
String getName() const override;
@ -158,7 +157,8 @@ public:
private:
StorageHDFSPtr storage;
Block block_for_format;
std::vector<NameAndTypePair> requested_virtual_columns;
NamesAndTypesList requested_columns;
NamesAndTypesList requested_virtual_columns;
UInt64 max_block_size;
std::shared_ptr<IteratorWrapper> file_iterator;
ColumnsDescription columns_description;

View File

@ -37,6 +37,8 @@ public:
RemoteQueryExecutor::Extension getTaskIteratorExtension(ASTPtr query, const ContextPtr & context) const override;
bool supportsSubcolumns() const override { return true; }
private:
void addColumnsStructureToQuery(ASTPtr & query, const String & structure, const ContextPtr & context) override;

View File

@ -20,6 +20,7 @@
#include <azure/identity/managed_identity_credential.hpp>
#include <azure/storage/common/storage_credential.hpp>
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <Processors/Transforms/ExtractColumnsTransform.h>
#include <Processors/Formats/IOutputFormat.h>
#include <Processors/Formats/IInputFormat.h>
@ -603,7 +604,7 @@ private:
Pipe StorageAzureBlob::read(
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr local_context,
QueryProcessingStage::Enum /*processed_stage*/,
@ -615,15 +616,6 @@ Pipe StorageAzureBlob::read(
Pipes pipes;
std::unordered_set<String> column_names_set(column_names.begin(), column_names.end());
std::vector<NameAndTypePair> requested_virtual_columns;
for (const auto & virtual_column : getVirtuals())
{
if (column_names_set.contains(virtual_column.name))
requested_virtual_columns.push_back(virtual_column);
}
std::shared_ptr<StorageAzureBlobSource::Iterator> iterator_wrapper;
if (configuration.withGlobs())
{
@ -639,39 +631,15 @@ Pipe StorageAzureBlob::read(
std::nullopt, query_info.query, virtual_block, local_context, nullptr);
}
ColumnsDescription columns_description;
Block block_for_format;
if (supportsSubsetOfColumns())
{
auto fetch_columns = column_names;
const auto & virtuals = getVirtuals();
std::erase_if(
fetch_columns,
[&](const String & col)
{ return std::any_of(virtuals.begin(), virtuals.end(), [&](const NameAndTypePair & virtual_col){ return col == virtual_col.name; }); });
if (fetch_columns.empty())
fetch_columns.push_back(ExpressionActions::getSmallestColumn(storage_snapshot->metadata->getColumns().getAllPhysical()).name);
columns_description = storage_snapshot->getDescriptionForColumns(fetch_columns);
block_for_format = storage_snapshot->getSampleBlockForColumns(columns_description.getNamesOfPhysical());
}
else
{
columns_description = storage_snapshot->metadata->getColumns();
block_for_format = storage_snapshot->metadata->getSampleBlock();
}
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, configuration.format, getVirtuals());
for (size_t i = 0; i < num_streams; ++i)
{
pipes.emplace_back(std::make_shared<StorageAzureBlobSource>(
requested_virtual_columns,
read_from_format_info,
configuration.format,
getName(),
block_for_format,
local_context,
format_settings,
columns_description,
max_block_size,
configuration.compression_method,
object_storage.get(),
@ -762,11 +730,6 @@ bool StorageAzureBlob::supportsPartitionBy() const
return true;
}
bool StorageAzureBlob::supportsSubcolumns() const
{
return FormatFactory::instance().checkIfFormatSupportsSubcolumns(configuration.format);
}
bool StorageAzureBlob::supportsSubsetOfColumns() const
{
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration.format);
@ -1075,35 +1038,26 @@ Chunk StorageAzureBlobSource::generate()
return {};
}
Block StorageAzureBlobSource::getHeader(Block sample_block, const std::vector<NameAndTypePair> & requested_virtual_columns)
{
for (const auto & virtual_column : requested_virtual_columns)
sample_block.insert({virtual_column.type->createColumn(), virtual_column.type, virtual_column.name});
return sample_block;
}
StorageAzureBlobSource::StorageAzureBlobSource(
const std::vector<NameAndTypePair> & requested_virtual_columns_,
const ReadFromFormatInfo & info,
const String & format_,
String name_,
const Block & sample_block_,
ContextPtr context_,
std::optional<FormatSettings> format_settings_,
const ColumnsDescription & columns_,
UInt64 max_block_size_,
String compression_hint_,
AzureObjectStorage * object_storage_,
const String & container_,
std::shared_ptr<Iterator> file_iterator_)
:ISource(getHeader(sample_block_, requested_virtual_columns_))
:ISource(info.source_header)
, WithContext(context_)
, requested_virtual_columns(requested_virtual_columns_)
, requested_columns(info.requested_columns)
, requested_virtual_columns(info.requested_virtual_columns)
, format(format_)
, name(std::move(name_))
, sample_block(sample_block_)
, sample_block(info.format_header)
, format_settings(format_settings_)
, columns_desc(columns_)
, columns_desc(info.columns_description)
, max_block_size(max_block_size_)
, compression_hint(compression_hint_)
, object_storage(std::move(object_storage_))
@ -1159,6 +1113,13 @@ StorageAzureBlobSource::ReaderHolder StorageAzureBlobSource::createReader()
{ return std::make_shared<AddingDefaultsTransform>(header, columns_desc, *input_format, getContext()); });
}
/// Add ExtractColumnsTransform to extract requested columns/subcolumns
/// from chunk read by IInputFormat.
builder.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ExtractColumnsTransform>(header, requested_columns);
});
auto pipeline = std::make_unique<QueryPipeline>(QueryPipelineBuilder::getPipeline(std::move(builder)));
auto current_reader = std::make_unique<PullingPipelineExecutor>(*pipeline);

View File

@ -11,6 +11,7 @@
#include <Storages/StorageConfiguration.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Storages/NamedCollectionsHelpers.h>
#include <Storages/prepareReadingFromFormat.h>
namespace DB
{
@ -93,7 +94,7 @@ public:
bool supportsPartitionBy() const override;
bool supportsSubcolumns() const override;
bool supportsSubcolumns() const override { return true; }
bool supportsSubsetOfColumns() const override;
@ -185,13 +186,11 @@ public:
};
StorageAzureBlobSource(
const std::vector<NameAndTypePair> & requested_virtual_columns_,
const ReadFromFormatInfo & info,
const String & format_,
String name_,
const Block & sample_block_,
ContextPtr context_,
std::optional<FormatSettings> format_settings_,
const ColumnsDescription & columns_,
UInt64 max_block_size_,
String compression_hint_,
AzureObjectStorage * object_storage_,
@ -204,10 +203,9 @@ public:
String getName() const override;
static Block getHeader(Block sample_block, const std::vector<NameAndTypePair> & requested_virtual_columns);
private:
std::vector<NameAndTypePair> requested_virtual_columns;
NamesAndTypesList requested_columns;
NamesAndTypesList requested_virtual_columns;
String format;
String name;
Block sample_block;

View File

@ -6,6 +6,7 @@
#include <Storages/Distributed/DistributedAsyncInsertSource.h>
#include <Storages/checkAndGetLiteralArgument.h>
#include <Storages/ReadFromStorageProgress.h>
#include <Storages/prepareReadingFromFormat.h>
#include <Interpreters/Context.h>
#include <Interpreters/evaluateConstantExpression.h>
@ -28,6 +29,7 @@
#include <Formats/ReadSchemaUtils.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <Processors/Transforms/ExtractColumnsTransform.h>
#include <Processors/ISource.h>
#include <Processors/Formats/IOutputFormat.h>
#include <Processors/Formats/IInputFormat.h>
@ -564,56 +566,46 @@ using StorageFilePtr = std::shared_ptr<StorageFile>;
class StorageFileSource : public ISource
{
public:
struct FilesInfo
class FilesIterator
{
public:
explicit FilesIterator(const Strings & files_) : files(files_)
{
}
String next()
{
auto current_index = index.fetch_add(1, std::memory_order_relaxed);
if (current_index >= files.size())
return "";
return files[current_index];
}
private:
std::vector<std::string> files;
std::atomic<size_t> next_file_to_read = 0;
bool need_path_column = false;
bool need_file_column = false;
size_t total_bytes_to_read = 0;
std::atomic<size_t> index = 0;
};
using FilesInfoPtr = std::shared_ptr<FilesInfo>;
static Block getBlockForSource(const Block & block_for_format, const FilesInfoPtr & files_info)
{
auto res = block_for_format;
if (files_info->need_path_column)
{
res.insert(
{DataTypeLowCardinality{std::make_shared<DataTypeString>()}.createColumn(),
std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()),
"_path"});
}
if (files_info->need_file_column)
{
res.insert(
{DataTypeLowCardinality{std::make_shared<DataTypeString>()}.createColumn(),
std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()),
"_file"});
}
return res;
}
using FilesIteratorPtr = std::shared_ptr<FilesIterator>;
StorageFileSource(
const ReadFromFormatInfo & info,
std::shared_ptr<StorageFile> storage_,
const StorageSnapshotPtr & storage_snapshot_,
ContextPtr context_,
UInt64 max_block_size_,
FilesInfoPtr files_info_,
ColumnsDescription columns_description_,
const Block & block_for_format_,
FilesIteratorPtr files_iterator_,
std::unique_ptr<ReadBuffer> read_buf_)
: ISource(getBlockForSource(block_for_format_, files_info_))
: ISource(info.source_header)
, storage(std::move(storage_))
, storage_snapshot(storage_snapshot_)
, files_info(std::move(files_info_))
, files_iterator(std::move(files_iterator_))
, read_buf(std::move(read_buf_))
, columns_description(std::move(columns_description_))
, block_for_format(block_for_format_)
, columns_description(info.columns_description)
, requested_columns(info.requested_columns)
, requested_virtual_columns(info.requested_virtual_columns)
, block_for_format(info.format_header)
, context(context_)
, max_block_size(max_block_size_)
{
@ -699,12 +691,10 @@ public:
{
if (!storage->use_table_fd)
{
auto current_file = files_info->next_file_to_read.fetch_add(1);
if (current_file >= files_info->files.size())
current_path = files_iterator->next();
if (current_path.empty())
return {};
current_path = files_info->files[current_file];
/// Special case for distributed format. Defaults are not needed here.
if (storage->format_name == "Distributed")
{
@ -744,6 +734,13 @@ public:
});
}
/// Add ExtractColumnsTransform to extract requested columns/subcolumns
/// from chunk read by IInputFormat.
builder.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ExtractColumnsTransform>(header, requested_columns);
});
pipeline = std::make_unique<QueryPipeline>(QueryPipelineBuilder::getPipeline(std::move(builder)));
reader = std::make_unique<PullingPipelineExecutor>(*pipeline);
@ -755,19 +752,19 @@ public:
UInt64 num_rows = chunk.getNumRows();
/// Enrich with virtual columns.
if (files_info->need_path_column)
{
auto column = DataTypeLowCardinality{std::make_shared<DataTypeString>()}.createColumnConst(num_rows, current_path);
chunk.addColumn(column->convertToFullColumnIfConst());
}
if (files_info->need_file_column)
for (const auto & virtual_column : requested_virtual_columns)
{
size_t last_slash_pos = current_path.find_last_of('/');
auto file_name = current_path.substr(last_slash_pos + 1);
auto column = DataTypeLowCardinality{std::make_shared<DataTypeString>()}.createColumnConst(num_rows, std::move(file_name));
chunk.addColumn(column->convertToFullColumnIfConst());
if (virtual_column.name == "_path")
{
chunk.addColumn(virtual_column.type->createColumnConst(num_rows, current_path)->convertToFullColumnIfConst());
}
else if (virtual_column.name == "_file")
{
size_t last_slash_pos = current_path.find_last_of('/');
auto file_name = current_path.substr(last_slash_pos + 1);
chunk.addColumn(virtual_column.type->createColumnConst(num_rows, file_name)->convertToFullColumnIfConst());
}
}
if (num_rows && total_files_size)
@ -799,7 +796,7 @@ public:
private:
std::shared_ptr<StorageFile> storage;
StorageSnapshotPtr storage_snapshot;
FilesInfoPtr files_info;
FilesIteratorPtr files_iterator;
String current_path;
Block sample_block;
std::unique_ptr<ReadBuffer> read_buf;
@ -808,6 +805,8 @@ private:
std::unique_ptr<PullingPipelineExecutor> reader;
ColumnsDescription columns_description;
NamesAndTypesList requested_columns;
NamesAndTypesList requested_virtual_columns;
Block block_for_format;
ContextPtr context; /// TODO Untangle potential issues with context lifetime.
@ -849,18 +848,7 @@ Pipe StorageFile::read(
}
}
auto files_info = std::make_shared<StorageFileSource::FilesInfo>();
files_info->files = paths;
files_info->total_bytes_to_read = total_bytes_to_read;
for (const auto & column : column_names)
{
if (column == "_path")
files_info->need_path_column = true;
if (column == "_file")
files_info->need_file_column = true;
}
auto files_iterator = std::make_shared<StorageFileSource::FilesIterator>(paths);
auto this_ptr = std::static_pointer_cast<StorageFile>(shared_from_this());
size_t num_streams = max_num_streams;
@ -876,33 +864,10 @@ Pipe StorageFile::read(
if (progress_callback)
progress_callback(FileProgress(0, total_bytes_to_read));
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, format_name, getVirtuals());
for (size_t i = 0; i < num_streams; ++i)
{
ColumnsDescription columns_description;
Block block_for_format;
if (supportsSubsetOfColumns())
{
auto fetch_columns = column_names;
const auto & virtuals = getVirtuals();
std::erase_if(
fetch_columns,
[&](const String & col)
{
return std::any_of(
virtuals.begin(), virtuals.end(), [&](const NameAndTypePair & virtual_col) { return col == virtual_col.name; });
});
if (fetch_columns.empty())
fetch_columns.push_back(ExpressionActions::getSmallestColumn(storage_snapshot->metadata->getColumns().getAllPhysical()).name);
columns_description = storage_snapshot->getDescriptionForColumns(fetch_columns);
}
else
{
columns_description = storage_snapshot->metadata->getColumns();
}
block_for_format = storage_snapshot->getSampleBlockForColumns(columns_description.getNamesOfPhysical());
/// In case of reading from fd we have to check whether we have already created
/// the read buffer from it in Storage constructor (for schema inference) or not.
/// If yes, then we should use it in StorageFileSource. Atomic bool flag is needed
@ -912,13 +877,12 @@ Pipe StorageFile::read(
read_buffer = std::move(peekable_read_buffer_from_fd);
pipes.emplace_back(std::make_shared<StorageFileSource>(
read_from_format_info,
this_ptr,
storage_snapshot,
context,
max_block_size,
files_info,
columns_description,
block_for_format,
files_iterator,
std::move(read_buffer)));
}
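
A brief sketch (illustrative, not from the commit) of the sharing pattern the new FilesIterator enables: read() above creates a single iterator for all streams, and every StorageFileSource pulls paths from it in generate(); the atomic index hands out each file exactly once even when sources run concurrently, and an empty string marks exhaustion. FilesIterator is defined inside StorageFile.cpp, so the snippet below only compiles where that class is visible.

/// Sketch: drain a shared FilesIterator the same way the sources above do.
void consumeAllFiles(const Strings & paths)
{
    auto files_iterator = std::make_shared<StorageFileSource::FilesIterator>(paths);
    while (true)
    {
        String current_path = files_iterator->next();
        if (current_path.empty())
            break;          /// no more files to read
        /// ... open current_path and run the chosen input format over it ...
    }
}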

View File

@ -75,6 +75,8 @@ public:
/// format to read only them. Note: this hack cannot be done with ordinary formats like TSV.
bool supportsSubsetOfColumns() const override;
bool supportsSubcolumns() const override { return true; }
bool prefersLargeBlocks() const override;
bool parallelizeOutputAfterReading(ContextPtr context) const override;

View File

@ -42,6 +42,7 @@
#include <Formats/ReadSchemaUtils.h>
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <Processors/Transforms/ExtractColumnsTransform.h>
#include <Processors/Formats/IOutputFormat.h>
#include <Processors/Formats/IInputFormat.h>
@ -528,22 +529,12 @@ size_t StorageS3Source::KeysIterator::getTotalSize() const
return pimpl->getTotalSize();
}
Block StorageS3Source::getHeader(Block sample_block, const std::vector<NameAndTypePair> & requested_virtual_columns)
{
for (const auto & virtual_column : requested_virtual_columns)
sample_block.insert({virtual_column.type->createColumn(), virtual_column.type, virtual_column.name});
return sample_block;
}
StorageS3Source::StorageS3Source(
const std::vector<NameAndTypePair> & requested_virtual_columns_,
const ReadFromFormatInfo & info,
const String & format_,
String name_,
const Block & sample_block_,
ContextPtr context_,
std::optional<FormatSettings> format_settings_,
const ColumnsDescription & columns_,
UInt64 max_block_size_,
const S3Settings::RequestSettings & request_settings_,
String compression_hint_,
@ -552,20 +543,21 @@ StorageS3Source::StorageS3Source(
const String & version_id_,
std::shared_ptr<IIterator> file_iterator_,
const size_t download_thread_num_)
: ISource(getHeader(sample_block_, requested_virtual_columns_))
: ISource(info.source_header)
, WithContext(context_)
, name(std::move(name_))
, bucket(bucket_)
, version_id(version_id_)
, format(format_)
, columns_desc(columns_)
, columns_desc(info.columns_description)
, requested_columns(info.requested_columns)
, max_block_size(max_block_size_)
, request_settings(request_settings_)
, compression_hint(std::move(compression_hint_))
, client(client_)
, sample_block(sample_block_)
, sample_block(info.format_header)
, format_settings(format_settings_)
, requested_virtual_columns(requested_virtual_columns_)
, requested_virtual_columns(info.requested_virtual_columns)
, file_iterator(file_iterator_)
, download_thread_num(download_thread_num_)
, create_reader_pool(CurrentMetrics::StorageS3Threads, CurrentMetrics::StorageS3ThreadsActive, 1)
@ -611,6 +603,13 @@ StorageS3Source::ReaderHolder StorageS3Source::createReader()
{ return std::make_shared<AddingDefaultsTransform>(header, columns_desc, *input_format, getContext()); });
}
/// Add ExtractColumnsTransform to extract requested columns/subcolumns
/// from chunk read by IInputFormat.
builder.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ExtractColumnsTransform>(header, requested_columns);
});
auto pipeline = std::make_unique<QueryPipeline>(QueryPipelineBuilder::getPipeline(std::move(builder)));
auto current_reader = std::make_unique<PullingPipelineExecutor>(*pipeline);
@ -1026,11 +1025,6 @@ std::shared_ptr<StorageS3Source::IIterator> StorageS3::createFileIterator(
}
}
bool StorageS3::supportsSubcolumns() const
{
return FormatFactory::instance().checkIfFormatSupportsSubcolumns(configuration.format);
}
bool StorageS3::supportsSubsetOfColumns() const
{
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration.format);
@ -1062,52 +1056,20 @@ Pipe StorageS3::read(
Pipes pipes;
std::unordered_set<String> column_names_set(column_names.begin(), column_names.end());
std::vector<NameAndTypePair> requested_virtual_columns;
for (const auto & virtual_column : getVirtuals())
{
if (column_names_set.contains(virtual_column.name))
requested_virtual_columns.push_back(virtual_column);
}
std::shared_ptr<StorageS3Source::IIterator> iterator_wrapper = createFileIterator(
query_configuration, distributed_processing, local_context, query_info.query, virtual_block);
ColumnsDescription columns_description;
Block block_for_format;
if (supportsSubsetOfColumns())
{
auto fetch_columns = column_names;
const auto & virtuals = getVirtuals();
std::erase_if(
fetch_columns,
[&](const String & col)
{ return std::any_of(virtuals.begin(), virtuals.end(), [&](const NameAndTypePair & virtual_col){ return col == virtual_col.name; }); });
if (fetch_columns.empty())
fetch_columns.push_back(ExpressionActions::getSmallestColumn(storage_snapshot->metadata->getColumns().getAllPhysical()).name);
columns_description = storage_snapshot->getDescriptionForColumns(fetch_columns);
block_for_format = storage_snapshot->getSampleBlockForColumns(columns_description.getNamesOfPhysical());
}
else
{
columns_description = storage_snapshot->metadata->getColumns();
block_for_format = storage_snapshot->metadata->getSampleBlock();
}
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, query_configuration.format, getVirtuals());
const size_t max_download_threads = local_context->getSettingsRef().max_download_threads;
for (size_t i = 0; i < num_streams; ++i)
{
pipes.emplace_back(std::make_shared<StorageS3Source>(
requested_virtual_columns,
read_from_format_info,
query_configuration.format,
getName(),
block_for_format,
local_context,
format_settings,
columns_description,
max_block_size,
query_configuration.request_settings,
query_configuration.compression_method,

View File

@ -21,6 +21,7 @@
#include <Interpreters/threadPoolCallbackRunner.h>
#include <Storages/Cache/SchemaCache.h>
#include <Storages/StorageConfiguration.h>
#include <Storages/prepareReadingFromFormat.h>
namespace Aws::S3
{
@ -119,16 +120,12 @@ public:
ReadTaskCallback callback;
};
static Block getHeader(Block sample_block, const std::vector<NameAndTypePair> & requested_virtual_columns);
StorageS3Source(
const std::vector<NameAndTypePair> & requested_virtual_columns_,
const ReadFromFormatInfo & info,
const String & format,
String name_,
const Block & sample_block,
ContextPtr context_,
std::optional<FormatSettings> format_settings_,
const ColumnsDescription & columns_,
UInt64 max_block_size_,
const S3Settings::RequestSettings & request_settings_,
String compression_hint_,
@ -150,6 +147,7 @@ private:
String version_id;
String format;
ColumnsDescription columns_desc;
NamesAndTypesList requested_columns;
UInt64 max_block_size;
S3Settings::RequestSettings request_settings;
String compression_hint;
@ -214,7 +212,7 @@ private:
ReaderHolder reader;
std::vector<NameAndTypePair> requested_virtual_columns;
NamesAndTypesList requested_virtual_columns;
std::shared_ptr<IIterator> file_iterator;
size_t download_thread_num = 1;
@ -360,7 +358,7 @@ private:
const std::optional<FormatSettings> & format_settings,
ContextPtr ctx);
bool supportsSubcolumns() const override;
bool supportsSubcolumns() const override { return true; }
bool supportsSubsetOfColumns() const override;

View File

@ -36,6 +36,8 @@ public:
RemoteQueryExecutor::Extension getTaskIteratorExtension(ASTPtr query, const ContextPtr & context) const override;
bool supportsSubcolumns() const override { return true; }
protected:
void updateConfigurationIfChanged(ContextPtr local_context);

View File

@ -1,5 +1,4 @@
#include <Storages/StorageURL.h>
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <Storages/PartitionedSink.h>
#include <Storages/checkAndGetLiteralArgument.h>
#include <Storages/NamedCollectionsHelpers.h>
@ -23,6 +22,8 @@
#include <Processors/Formats/IOutputFormat.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/ISource.h>
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <Processors/Transforms/ExtractColumnsTransform.h>
#include <Common/ThreadStatus.h>
#include <Common/parseRemoteDescription.h>
@ -208,25 +209,15 @@ void StorageURLSource::setCredentials(Poco::Net::HTTPBasicCredentials & credenti
}
}
Block StorageURLSource::getHeader(Block sample_block, const std::vector<NameAndTypePair> & requested_virtual_columns)
{
for (const auto & virtual_column : requested_virtual_columns)
sample_block.insert({virtual_column.type->createColumn(), virtual_column.type, virtual_column.name});
return sample_block;
}
StorageURLSource::StorageURLSource(
const std::vector<NameAndTypePair> & requested_virtual_columns_,
const ReadFromFormatInfo & info,
std::shared_ptr<IteratorWrapper> uri_iterator_,
const std::string & http_method,
std::function<void(std::ostream &)> callback,
const String & format,
const std::optional<FormatSettings> & format_settings,
String name_,
const Block & sample_block,
ContextPtr context,
const ColumnsDescription & columns,
UInt64 max_block_size,
const ConnectionTimeouts & timeouts,
CompressionMethod compression_method,
@ -234,7 +225,13 @@ StorageURLSource::StorageURLSource(
const HTTPHeaderEntries & headers_,
const URIParams & params,
bool glob_url)
: ISource(getHeader(sample_block, requested_virtual_columns_)), name(std::move(name_)), requested_virtual_columns(requested_virtual_columns_), uri_iterator(uri_iterator_)
: ISource(info.source_header)
, name(std::move(name_))
, columns_description(info.columns_description)
, requested_columns(info.requested_columns)
, requested_virtual_columns(info.requested_virtual_columns)
, block_for_format(info.format_header)
, uri_iterator(uri_iterator_)
{
auto headers = getHeaders(headers_);
@ -292,7 +289,7 @@ StorageURLSource::StorageURLSource(
input_format = FormatFactory::instance().getInput(
format,
*read_buf,
sample_block,
info.format_header,
context,
max_block_size,
format_settings,
@ -304,8 +301,20 @@ StorageURLSource::StorageURLSource(
QueryPipelineBuilder builder;
builder.init(Pipe(input_format));
builder.addSimpleTransform([&](const Block & cur_header)
{ return std::make_shared<AddingDefaultsTransform>(cur_header, columns, *input_format, context); });
if (columns_description.hasDefaults())
{
builder.addSimpleTransform([&](const Block & cur_header)
{
return std::make_shared<AddingDefaultsTransform>(cur_header, columns_description, *input_format, context);
});
}
/// Add ExtractColumnsTransform to extract requested columns/subcolumns
/// from chunk read by IInputFormat.
builder.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ExtractColumnsTransform>(header, requested_columns);
});
pipeline = std::make_unique<QueryPipeline>(QueryPipelineBuilder::getPipeline(std::move(builder)));
reader = std::make_unique<PullingPipelineExecutor>(*pipeline);
@ -712,27 +721,6 @@ Pipe IStorageURLBase::read(
{
auto params = getReadURIParams(column_names, storage_snapshot, query_info, local_context, processed_stage, max_block_size);
ColumnsDescription columns_description;
Block block_for_format;
if (supportsSubsetOfColumns())
{
columns_description = storage_snapshot->getDescriptionForColumns(column_names);
block_for_format = storage_snapshot->getSampleBlockForColumns(columns_description.getNamesOfPhysical());
}
else
{
columns_description = storage_snapshot->metadata->getColumns();
block_for_format = storage_snapshot->metadata->getSampleBlock();
}
std::unordered_set<String> column_names_set(column_names.begin(), column_names.end());
std::vector<NameAndTypePair> requested_virtual_columns;
for (const auto & virtual_column : getVirtuals())
{
if (column_names_set.contains(virtual_column.name))
requested_virtual_columns.push_back(virtual_column);
}
size_t max_download_threads = local_context->getSettingsRef().max_download_threads;
std::shared_ptr<StorageURLSource::IteratorWrapper> iterator_wrapper{nullptr};
@ -776,6 +764,8 @@ Pipe IStorageURLBase::read(
num_streams = 1;
}
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, format_name, getVirtuals());
Pipes pipes;
pipes.reserve(num_streams);
@ -783,16 +773,14 @@ Pipe IStorageURLBase::read(
for (size_t i = 0; i < num_streams; ++i)
{
pipes.emplace_back(std::make_shared<StorageURLSource>(
requested_virtual_columns,
read_from_format_info,
iterator_wrapper,
getReadMethod(),
getReadPOSTDataCallback(column_names, columns_description, query_info, local_context, processed_stage, max_block_size),
getReadPOSTDataCallback(column_names, read_from_format_info.columns_description, query_info, local_context, processed_stage, max_block_size),
format_name,
format_settings,
getName(),
block_for_format,
local_context,
columns_description,
max_block_size,
getHTTPTimeouts(local_context),
compression_method,
@ -838,17 +826,17 @@ Pipe StorageURLWithFailover::read(
return uri_options;
});
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, format_name, getVirtuals());
auto pipe = Pipe(std::make_shared<StorageURLSource>(
std::vector<NameAndTypePair>{},
read_from_format_info,
iterator_wrapper,
getReadMethod(),
getReadPOSTDataCallback(column_names, columns_description, query_info, local_context, processed_stage, max_block_size),
getReadPOSTDataCallback(column_names, read_from_format_info.columns_description, query_info, local_context, processed_stage, max_block_size),
format_name,
format_settings,
getName(),
block_for_format,
local_context,
columns_description,
max_block_size,
getHTTPTimeouts(local_context),
compression_method,

View File

@ -11,6 +11,7 @@
#include <Storages/StorageFactory.h>
#include <Storages/Cache/SchemaCache.h>
#include <Storages/StorageConfiguration.h>
#include <Storages/prepareReadingFromFormat.h>
namespace DB
@ -46,6 +47,8 @@ public:
bool supportsPartitionBy() const override { return true; }
bool supportsSubcolumns() const override { return true; }
NamesAndTypesList getVirtuals() const override;
static ColumnsDescription getTableStructureFromData(
@ -158,16 +161,14 @@ public:
using IteratorWrapper = std::function<FailoverOptions()>;
StorageURLSource(
const std::vector<NameAndTypePair> & requested_virtual_columns_,
const ReadFromFormatInfo & info,
std::shared_ptr<IteratorWrapper> uri_iterator_,
const std::string & http_method,
std::function<void(std::ostream &)> callback,
const String & format,
const std::optional<FormatSettings> & format_settings,
String name_,
const Block & sample_block,
ContextPtr context,
const ColumnsDescription & columns,
UInt64 max_block_size,
const ConnectionTimeouts & timeouts,
CompressionMethod compression_method,
@ -182,8 +183,6 @@ public:
static void setCredentials(Poco::Net::HTTPBasicCredentials & credentials, const Poco::URI & request_uri);
static Block getHeader(Block sample_block, const std::vector<NameAndTypePair> & requested_virtual_columns);
static std::pair<Poco::URI, std::unique_ptr<ReadWriteBufferFromHTTP>> getFirstAvailableURIAndReadBuffer(
std::vector<String>::const_iterator & option,
const std::vector<String>::const_iterator & end,
@ -202,7 +201,10 @@ private:
InitializeFunc initialize;
String name;
std::vector<NameAndTypePair> requested_virtual_columns;
ColumnsDescription columns_description;
NamesAndTypesList requested_columns;
NamesAndTypesList requested_virtual_columns;
Block block_for_format;
std::shared_ptr<IteratorWrapper> uri_iterator;
Poco::URI curr_uri;

View File

@ -36,6 +36,8 @@ public:
RemoteQueryExecutor::Extension getTaskIteratorExtension(ASTPtr query, const ContextPtr & context) const override;
bool supportsSubcolumns() const override { return true; }
private:
void addColumnsStructureToQuery(ASTPtr & query, const String & structure, const ContextPtr & context) override;

View File

@ -0,0 +1,69 @@
#include <Storages/prepareReadingFromFormat.h>
#include <Formats/FormatFactory.h>
namespace DB
{
ReadFromFormatInfo prepareReadingFromFormat(const Strings & requested_columns, const StorageSnapshotPtr & storage_snapshot, const String & format_name, const NamesAndTypesList & virtuals)
{
ReadFromFormatInfo info;
/// Collect requested virtual columns and remove them from requested columns.
Strings columns_to_read;
for (const auto & column_name : requested_columns)
{
bool is_virtual = false;
for (const auto & virtual_column : virtuals)
{
if (column_name == virtual_column.name)
{
info.requested_virtual_columns.push_back(virtual_column);
is_virtual = true;
break;
}
}
if (!is_virtual)
columns_to_read.push_back(column_name);
}
/// Create header for Source that will contain all requested columns including virtual columns at the end
/// (because they will be added to the chunk after reading regular columns).
info.source_header = storage_snapshot->getSampleBlockForColumns(columns_to_read);
for (const auto & requested_virtual_column : info.requested_virtual_columns)
info.source_header.insert({requested_virtual_column.type->createColumn(), requested_virtual_column.type, requested_virtual_column.name});
/// Set requested columns that should be read from data.
info.requested_columns = storage_snapshot->getColumnsByNames(GetColumnsOptions(GetColumnsOptions::All).withSubcolumns().withExtendedObjects(), columns_to_read);
if (format_name != "Distributed" && FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(format_name))
{
/// If only virtual columns were requested, just read the smallest column.
if (columns_to_read.empty())
{
columns_to_read.push_back(ExpressionActions::getSmallestColumn(storage_snapshot->metadata->getColumns().getAllPhysical()).name);
}
/// We need to replace all subcolumns with their nested columns (e.g `a.b`, `a.b.c`, `x.y` -> `a`, `x`),
/// because most formats cannot extract subcolumns on their own.
/// All requested subcolumns will be extracted after reading.
else
{
std::unordered_set<String> columns_to_read_set;
for (const auto & column_to_read : info.requested_columns)
columns_to_read_set.insert(column_to_read.getNameInStorage());
columns_to_read = Strings(columns_to_read_set.begin(), columns_to_read_set.end());
}
info.columns_description = storage_snapshot->getDescriptionForColumns(columns_to_read);
}
/// If format doesn't support reading subset of columns, read all columns.
/// Requested columns/subcolumns will be extracted after reading.
else
{
info.columns_description = storage_snapshot->metadata->getColumns();
}
/// Create header for InputFormat with columns that will be read from the data.
info.format_header = storage_snapshot->getSampleBlockForColumns(info.columns_description.getNamesOfPhysical());
return info;
}
}
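
A condensed sketch of the calling pattern this helper establishes, mirroring the StorageHDFS, StorageS3, StorageFile and StorageURL changes above; MyStorage, MySource and file_iterator are placeholder names, not identifiers introduced by the commit.

Pipe MyStorage::read(
    const Names & column_names,
    const StorageSnapshotPtr & storage_snapshot,
    SelectQueryInfo & /*query_info*/,
    ContextPtr context,
    QueryProcessingStage::Enum /*processed_stage*/,
    size_t max_block_size,
    size_t num_streams)
{
    /// Split the requested names into real and virtual columns, decide what the input
    /// format has to read, and prebuild the source and format headers.
    auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, format_name, getVirtuals());

    Pipes pipes;
    for (size_t i = 0; i < num_streams; ++i)
        pipes.emplace_back(std::make_shared<MySource>(read_from_format_info, context, max_block_size, file_iterator));

    return Pipe::unitePipes(std::move(pipes));
}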

View File

@ -0,0 +1,26 @@
#pragma once
#include <Core/Block.h>
#include <Storages/StorageSnapshot.h>
namespace DB
{
struct ReadFromFormatInfo
{
/// Header that will return Source from storage.
/// It contains all requested columns including virtual columns;
Block source_header;
/// Header that will be passed to IInputFormat to read data from file.
/// It can contain more columns than were requested if format doesn't support
/// reading subset of columns.
Block format_header;
/// Description of columns for format_header. Used for inserting defaults.
ColumnsDescription columns_description;
/// The list of requested columns without virtual columns.
NamesAndTypesList requested_columns;
/// The list of requested virtual columns.
NamesAndTypesList requested_virtual_columns;
};
/// Get all needed information for reading from data in some input format.
ReadFromFormatInfo prepareReadingFromFormat(const Strings & requested_columns, const StorageSnapshotPtr & storage_snapshot, const String & format_name, const NamesAndTypesList & virtuals);
}
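
A worked example of how these fields relate (illustrative values, assuming a table with the single column `a Tuple(b UInt32, c UInt32)` and a format that supports reading a subset of columns):

/// For:  SELECT a.b, _file FROM s3(..., 'a Tuple(b UInt32, c UInt32)')
/// prepareReadingFromFormat would produce roughly:
///     requested_virtual_columns = { _file LowCardinality(String) }
///     requested_columns         = { a.b UInt32 }                        -- the subcolumn itself
///     columns_description       = { a Tuple(b UInt32, c UInt32) }       -- subcolumn replaced by its parent
///     format_header             = Block(a Tuple(b UInt32, c UInt32))    -- what the input format reads
///     source_header             = Block(a.b UInt32, _file LowCardinality(String))
/// ExtractColumnsTransform then converts chunks shaped like format_header into chunks with
/// the requested columns, and the source appends the virtual columns afterwards.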

View File

@ -592,3 +592,40 @@ def test_partition_by_tf(cluster):
assert "1,2,3\n" == get_azure_file_content("test_tf_3.csv")
assert "3,2,1\n" == get_azure_file_content("test_tf_1.csv")
assert "78,43,45\n" == get_azure_file_content("test_tf_45.csv")
def test_read_subcolumns(cluster):
node = cluster.instances["node"]
azure_query(
node,
"INSERT INTO TABLE FUNCTION azureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', 'test_subcolumns.tsv', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', 'auto', 'a Tuple(b Tuple(c UInt32, d UInt32), e UInt32)') select ((1, 2), 3)",
)
azure_query(
node,
"INSERT INTO TABLE FUNCTION azureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', 'test_subcolumns.jsonl', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', 'auto', 'a Tuple(b Tuple(c UInt32, d UInt32), e UInt32)') select ((1, 2), 3)",
)
res = node.query(
f"select a.b.d, _path, a.b, _file, a.e from azureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', 'test_subcolumns.tsv', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', 'auto', 'a Tuple(b Tuple(c UInt32, d UInt32), e UInt32)')"
)
assert res == "2\tcont/test_subcolumns.tsv\t(1,2)\ttest_subcolumns.tsv\t3\n"
res = node.query(
f"select a.b.d, _path, a.b, _file, a.e from azureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', 'test_subcolumns.jsonl', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', 'auto', 'a Tuple(b Tuple(c UInt32, d UInt32), e UInt32)')"
)
assert res == "2\tcont/test_subcolumns.jsonl\t(1,2)\ttest_subcolumns.jsonl\t3\n"
res = node.query(
f"select x.b.d, _path, x.b, _file, x.e from azureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', 'test_subcolumns.jsonl', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', 'auto', 'x Tuple(b Tuple(c UInt32, d UInt32), e UInt32)')"
)
assert res == "0\tcont/test_subcolumns.jsonl\t(0,0)\ttest_subcolumns.jsonl\t0\n"
res = node.query(
f"select x.b.d, _path, x.b, _file, x.e from azureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', 'test_subcolumns.jsonl', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', 'auto', 'x Tuple(b Tuple(c UInt32, d UInt32), e UInt32) default ((42, 42), 42)')"
)
assert res == "42\tcont/test_subcolumns.jsonl\t(42,42)\ttest_subcolumns.jsonl\t42\n"

View File

@ -866,6 +866,54 @@ def test_skip_empty_files(started_cluster):
assert int(res) == 0
def test_read_subcolumns(started_cluster):
node = started_cluster.instances["node1"]
node.query(
f"insert into function hdfs('hdfs://hdfs1:9000/test_subcolumns.tsv', auto, 'a Tuple(b Tuple(c UInt32, d UInt32), e UInt32)') select ((1, 2), 3)"
)
node.query(
f"insert into function hdfs('hdfs://hdfs1:9000/test_subcolumns.jsonl', auto, 'a Tuple(b Tuple(c UInt32, d UInt32), e UInt32)') select ((1, 2), 3)"
)
res = node.query(
f"select a.b.d, _path, a.b, _file, a.e from hdfs('hdfs://hdfs1:9000/test_subcolumns.tsv', auto, 'a Tuple(b Tuple(c UInt32, d UInt32), e UInt32)')"
)
assert (
res
== "2\thdfs://hdfs1:9000/test_subcolumns.tsv\t(1,2)\ttest_subcolumns.tsv\t3\n"
)
res = node.query(
f"select a.b.d, _path, a.b, _file, a.e from hdfs('hdfs://hdfs1:9000/test_subcolumns.jsonl', auto, 'a Tuple(b Tuple(c UInt32, d UInt32), e UInt32)')"
)
assert (
res
== "2\thdfs://hdfs1:9000/test_subcolumns.jsonl\t(1,2)\ttest_subcolumns.jsonl\t3\n"
)
res = node.query(
f"select x.b.d, _path, x.b, _file, x.e from hdfs('hdfs://hdfs1:9000/test_subcolumns.jsonl', auto, 'x Tuple(b Tuple(c UInt32, d UInt32), e UInt32)')"
)
assert (
res
== "0\thdfs://hdfs1:9000/test_subcolumns.jsonl\t(0,0)\ttest_subcolumns.jsonl\t0\n"
)
res = node.query(
f"select x.b.d, _path, x.b, _file, x.e from hdfs('hdfs://hdfs1:9000/test_subcolumns.jsonl', auto, 'x Tuple(b Tuple(c UInt32, d UInt32), e UInt32) default ((42, 42), 42)')"
)
assert (
res
== "42\thdfs://hdfs1:9000/test_subcolumns.jsonl\t(42,42)\ttest_subcolumns.jsonl\t42\n"
)
if __name__ == "__main__":
cluster.start()
input("Cluster created, press any key to destroy...")

View File

@ -1780,3 +1780,66 @@ def test_skip_empty_files(started_cluster):
)
assert len(res.strip()) == 0
def test_read_subcolumns(started_cluster):
bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"]
instance.query(
f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_subcolumns.tsv', auto, 'a Tuple(b Tuple(c UInt32, d UInt32), e UInt32)') select ((1, 2), 3)"
)
instance.query(
f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_subcolumns.jsonl', auto, 'a Tuple(b Tuple(c UInt32, d UInt32), e UInt32)') select ((1, 2), 3)"
)
res = instance.query(
f"select a.b.d, _path, a.b, _file, a.e from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_subcolumns.tsv', auto, 'a Tuple(b Tuple(c UInt32, d UInt32), e UInt32)')"
)
assert res == "2\troot/test_subcolumns.tsv\t(1,2)\ttest_subcolumns.tsv\t3\n"
res = instance.query(
f"select a.b.d, _path, a.b, _file, a.e from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_subcolumns.jsonl', auto, 'a Tuple(b Tuple(c UInt32, d UInt32), e UInt32)')"
)
assert res == "2\troot/test_subcolumns.jsonl\t(1,2)\ttest_subcolumns.jsonl\t3\n"
res = instance.query(
f"select x.b.d, _path, x.b, _file, x.e from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_subcolumns.jsonl', auto, 'x Tuple(b Tuple(c UInt32, d UInt32), e UInt32)')"
)
assert res == "0\troot/test_subcolumns.jsonl\t(0,0)\ttest_subcolumns.jsonl\t0\n"
res = instance.query(
f"select x.b.d, _path, x.b, _file, x.e from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_subcolumns.jsonl', auto, 'x Tuple(b Tuple(c UInt32, d UInt32), e UInt32) default ((42, 42), 42)')"
)
assert res == "42\troot/test_subcolumns.jsonl\t(42,42)\ttest_subcolumns.jsonl\t42\n"
res = instance.query(
f"select a.b.d, _path, a.b, _file, a.e from url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_subcolumns.tsv', auto, 'a Tuple(b Tuple(c UInt32, d UInt32), e UInt32)')"
)
assert res == "2\t/root/test_subcolumns.tsv\t(1,2)\ttest_subcolumns.tsv\t3\n"
res = instance.query(
f"select a.b.d, _path, a.b, _file, a.e from url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_subcolumns.jsonl', auto, 'a Tuple(b Tuple(c UInt32, d UInt32), e UInt32)')"
)
assert res == "2\t/root/test_subcolumns.jsonl\t(1,2)\ttest_subcolumns.jsonl\t3\n"
res = instance.query(
f"select x.b.d, _path, x.b, _file, x.e from url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_subcolumns.jsonl', auto, 'x Tuple(b Tuple(c UInt32, d UInt32), e UInt32)')"
)
assert res == "0\t/root/test_subcolumns.jsonl\t(0,0)\ttest_subcolumns.jsonl\t0\n"
res = instance.query(
f"select x.b.d, _path, x.b, _file, x.e from url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_subcolumns.jsonl', auto, 'x Tuple(b Tuple(c UInt32, d UInt32), e UInt32) default ((42, 42), 42)')"
)
assert (
res == "42\t/root/test_subcolumns.jsonl\t(42,42)\ttest_subcolumns.jsonl\t42\n"
)

View File

@ -0,0 +1,4 @@
2 (1,2) 3
2 (1,2) 3
0 (0,0) 0
42 (42,42) 42

View File

@ -0,0 +1,18 @@
#!/usr/bin/env bash
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
DATA_FILE=$CLICKHOUSE_TEST_UNIQUE_NAME
$CLICKHOUSE_LOCAL -q "select ((1, 2), 3)::Tuple(b Tuple(c UInt32, d UInt32), e UInt32) as a format TSV" > "$DATA_FILE"
$CLICKHOUSE_LOCAL -q "select a.b.d, a.b, a.e from file('$DATA_FILE', TSV, 'a Tuple(b Tuple(c UInt32, d UInt32), e UInt32)')"
$CLICKHOUSE_LOCAL -q "select ((1, 2), 3)::Tuple(b Tuple(c UInt32, d UInt32), e UInt32) as a format JSONEachRow" > "$DATA_FILE"
$CLICKHOUSE_LOCAL -q "select a.b.d, a.b, a.e from file('$DATA_FILE', JSONEachRow, 'a Tuple(b Tuple(c UInt32, d UInt32), e UInt32)')"
$CLICKHOUSE_LOCAL -q "select x.b.d, x.b, x.e from file('$DATA_FILE', JSONEachRow, 'x Tuple(b Tuple(c UInt32, d UInt32), e UInt32)')"
$CLICKHOUSE_LOCAL -q "select x.b.d, x.b, x.e from file('$DATA_FILE', JSONEachRow, 'x Tuple(b Tuple(c UInt32, d UInt32), e UInt32) default ((42, 42), 42)')"
rm "$DATA_FILE"