Merge pull request #35586 from kssenii/fix-virtual-columns-s3
Fix s3 engine getting virtual columns
Commit: 5c6b84c398
@@ -36,6 +36,7 @@ Example of configuration:
             <access_key_id>AKIAIOSFODNN7EXAMPLE</access_key_id>
             <secret_access_key>wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY</secret_access_key>
             <format>CSV</format>
+            <url>https://s3.us-east-1.amazonaws.com/yourbucket/mydata/</url>
         </s3_mydata>
     </named_collections>
 </clickhouse>
@@ -44,12 +45,12 @@ Example of configuration:
 
 ### Example of using named connections with the s3 function
 
 ```sql
-INSERT INTO FUNCTION s3(s3_mydata, url = 'https://s3.us-east-1.amazonaws.com/yourbucket/mydata/test_file.tsv.gz',
+INSERT INTO FUNCTION s3(s3_mydata, filename = 'test_file.tsv.gz',
     format = 'TSV', structure = 'number UInt64', compression_method = 'gzip')
 SELECT * FROM numbers(10000);
 
 SELECT count()
-FROM s3(s3_mydata, url = 'https://s3.us-east-1.amazonaws.com/yourbucket/mydata/test_file.tsv.gz')
+FROM s3(s3_mydata, filename = 'test_file.tsv.gz')
 
 ┌─count()─┐
 │   10000 │
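The same named collection can also back an S3 table engine, which is how the stateless test added later in this commit uses it. A minimal sketch, assuming the `s3_mydata` collection defined above (the table name is illustrative):

```sql
-- 'filename' is appended to the collection's 'url', so this table reads and writes
-- https://s3.us-east-1.amazonaws.com/yourbucket/mydata/test_file.tsv.gz.
CREATE TABLE mydata (number UInt64)
    ENGINE = S3(s3_mydata, filename = 'test_file.tsv.gz', format = 'TSV', compression_method = 'gzip');

SELECT count() FROM mydata;
```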
@@ -26,6 +26,7 @@
 #include <Storages/HDFS/ReadBufferFromHDFS.h>
 #include <Storages/HDFS/WriteBufferFromHDFS.h>
 #include <Storages/PartitionedSink.h>
+#include <Storages/getVirtualsForStorage.h>
 
 #include <Formats/ReadSchemaUtils.h>
 #include <Formats/FormatFactory.h>
@@ -164,6 +165,13 @@ StorageHDFS::StorageHDFS(
     storage_metadata.setConstraints(constraints_);
     storage_metadata.setComment(comment);
     setInMemoryMetadata(storage_metadata);
+
+    auto default_virtuals = NamesAndTypesList{
+        {"_path", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
+        {"_file", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())}};
+
+    auto columns = storage_metadata.getSampleBlock().getNamesAndTypesList();
+    virtual_columns = getVirtualsForStorage(columns, default_virtuals);
 }
 
 ColumnsDescription StorageHDFS::getTableStructureFromData(
@@ -273,36 +281,6 @@ private:
     Strings::iterator uris_iter;
 };
 
-Block HDFSSource::getHeader(const StorageMetadataPtr & metadata_snapshot, bool need_path_column, bool need_file_column)
-{
-    auto header = metadata_snapshot->getSampleBlock();
-    /// Note: AddingDefaultsBlockInputStream doesn't change header.
-    if (need_path_column)
-        header.insert(
-            {DataTypeLowCardinality{std::make_shared<DataTypeString>()}.createColumn(),
-             std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()),
-             "_path"});
-    if (need_file_column)
-        header.insert(
-            {DataTypeLowCardinality{std::make_shared<DataTypeString>()}.createColumn(),
-             std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()),
-             "_file"});
-    return header;
-}
-
-Block HDFSSource::getBlockForSource(
-    const StorageHDFSPtr & storage,
-    const StorageSnapshotPtr & storage_snapshot,
-    const ColumnsDescription & columns_description,
-    bool need_path_column,
-    bool need_file_column)
-{
-    if (storage->isColumnOriented())
-        return storage_snapshot->getSampleBlockForColumns(columns_description.getNamesOfPhysical());
-    else
-        return getHeader(storage_snapshot->metadata, need_path_column, need_file_column);
-}
-
 HDFSSource::DisclosedGlobIterator::DisclosedGlobIterator(ContextPtr context_, const String & uri)
     : pimpl(std::make_shared<HDFSSource::DisclosedGlobIterator::Impl>(context_, uri)) {}
@@ -321,22 +299,28 @@ String HDFSSource::URISIterator::next()
     return pimpl->next();
 }
 
+Block HDFSSource::getHeader(Block sample_block, const std::vector<NameAndTypePair> & requested_virtual_columns)
+{
+    for (const auto & virtual_column : requested_virtual_columns)
+        sample_block.insert({virtual_column.type->createColumn(), virtual_column.type, virtual_column.name});
+
+    return sample_block;
+}
+
 HDFSSource::HDFSSource(
     StorageHDFSPtr storage_,
     const StorageSnapshotPtr & storage_snapshot_,
+    const Block & block_for_format_,
+    const std::vector<NameAndTypePair> & requested_virtual_columns_,
     ContextPtr context_,
     UInt64 max_block_size_,
-    bool need_path_column_,
-    bool need_file_column_,
    std::shared_ptr<IteratorWrapper> file_iterator_,
     ColumnsDescription columns_description_)
-    : SourceWithProgress(getBlockForSource(storage_, storage_snapshot_, columns_description_, need_path_column_, need_file_column_))
+    : SourceWithProgress(getHeader(block_for_format_, requested_virtual_columns_))
     , WithContext(context_)
     , storage(std::move(storage_))
     , storage_snapshot(storage_snapshot_)
+    , block_for_format(block_for_format_)
+    , requested_virtual_columns(requested_virtual_columns_)
     , max_block_size(max_block_size_)
-    , need_path_column(need_path_column_)
-    , need_file_column(need_file_column_)
     , file_iterator(file_iterator_)
     , columns_description(std::move(columns_description_))
 {
@@ -361,14 +345,7 @@ bool HDFSSource::initialize()
     auto compression = chooseCompressionMethod(path_from_uri, storage->compression_method);
     read_buf = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromHDFS>(uri_without_path, path_from_uri, getContext()->getGlobalContext()->getConfigRef()), compression);
 
-    auto get_block_for_format = [&]() -> Block
-    {
-        if (storage->isColumnOriented())
-            return storage_snapshot->getSampleBlockForColumns(columns_description.getNamesOfPhysical());
-        return storage_snapshot->metadata->getSampleBlock();
-    };
-
-    auto input_format = getContext()->getInputFormat(storage->format_name, *read_buf, get_block_for_format(), max_block_size);
+    auto input_format = getContext()->getInputFormat(storage->format_name, *read_buf, block_for_format, max_block_size);
 
     QueryPipelineBuilder builder;
     builder.init(Pipe(input_format));
@@ -402,20 +379,21 @@ Chunk HDFSSource::generate()
         Columns columns = chunk.getColumns();
         UInt64 num_rows = chunk.getNumRows();
 
-        /// Enrich with virtual columns.
-        if (need_path_column)
+        for (const auto & virtual_column : requested_virtual_columns)
         {
-            auto column = DataTypeLowCardinality{std::make_shared<DataTypeString>()}.createColumnConst(num_rows, current_path);
-            columns.push_back(column->convertToFullColumnIfConst());
-        }
-
-        if (need_file_column)
-        {
-            size_t last_slash_pos = current_path.find_last_of('/');
-            auto file_name = current_path.substr(last_slash_pos + 1);
-
-            auto column = DataTypeLowCardinality{std::make_shared<DataTypeString>()}.createColumnConst(num_rows, std::move(file_name));
-            columns.push_back(column->convertToFullColumnIfConst());
+            if (virtual_column.name == "_path")
+            {
+                auto column = DataTypeLowCardinality{std::make_shared<DataTypeString>()}.createColumnConst(num_rows, current_path);
+                columns.push_back(column->convertToFullColumnIfConst());
+            }
+            else if (virtual_column.name == "_file")
+            {
+                size_t last_slash_pos = current_path.find_last_of('/');
+                auto file_name = current_path.substr(last_slash_pos + 1);
+
+                auto column = DataTypeLowCardinality{std::make_shared<DataTypeString>()}.createColumnConst(num_rows, std::move(file_name));
+                columns.push_back(column->convertToFullColumnIfConst());
+            }
         }
 
         return Chunk(std::move(columns), num_rows);
@@ -526,17 +504,6 @@ Pipe StorageHDFS::read(
     size_t max_block_size,
     unsigned num_streams)
 {
-    bool need_path_column = false;
-    bool need_file_column = false;
-
-    for (const auto & column : column_names)
-    {
-        if (column == "_path")
-            need_path_column = true;
-        if (column == "_file")
-            need_file_column = true;
-    }
-
     std::shared_ptr<HDFSSource::IteratorWrapper> iterator_wrapper{nullptr};
     if (distributed_processing)
     {
@@ -563,27 +530,51 @@ Pipe StorageHDFS::read(
             });
     }
 
+    std::unordered_set<String> column_names_set(column_names.begin(), column_names.end());
+    std::vector<NameAndTypePair> requested_virtual_columns;
+
+    for (const auto & virtual_column : getVirtuals())
+    {
+        if (column_names_set.contains(virtual_column.name))
+            requested_virtual_columns.push_back(virtual_column);
+    }
+
+    ColumnsDescription columns_description;
+    Block block_for_format;
+    if (isColumnOriented())
+    {
+        auto fetch_columns = column_names;
+        const auto & virtuals = getVirtuals();
+        std::erase_if(
+            fetch_columns,
+            [&](const String & col)
+            { return std::any_of(virtuals.begin(), virtuals.end(), [&](const NameAndTypePair & virtual_col){ return col == virtual_col.name; }); });
+
+        if (fetch_columns.empty())
+            fetch_columns.push_back(ExpressionActions::getSmallestColumn(storage_snapshot->metadata->getColumns().getAllPhysical()));
+
+        columns_description = ColumnsDescription{
+            storage_snapshot->getSampleBlockForColumns(fetch_columns).getNamesAndTypesList()};
+        block_for_format = storage_snapshot->getSampleBlockForColumns(columns_description.getNamesOfPhysical());
+    }
+    else
+    {
+        columns_description = storage_snapshot->metadata->getColumns();
+        block_for_format = storage_snapshot->metadata->getSampleBlock();
+    }
+
     Pipes pipes;
     auto this_ptr = std::static_pointer_cast<StorageHDFS>(shared_from_this());
     for (size_t i = 0; i < num_streams; ++i)
     {
-        const auto get_columns_for_format = [&]() -> ColumnsDescription
-        {
-            if (isColumnOriented())
-                return ColumnsDescription{storage_snapshot->getSampleBlockForColumns(column_names).getNamesAndTypesList()};
-            else
-                return storage_snapshot->metadata->getColumns();
-        };
-
         pipes.emplace_back(std::make_shared<HDFSSource>(
             this_ptr,
             storage_snapshot,
+            block_for_format,
+            requested_virtual_columns,
             context_,
             max_block_size,
-            need_path_column,
-            need_file_column,
             iterator_wrapper,
-            get_columns_for_format()));
+            columns_description));
     }
     return Pipe::unitePipes(std::move(pipes));
 }
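For column-oriented formats, the block above filters virtual column names out of the requested columns and, when nothing physical is left, falls back to the smallest physical column so the format reader still produces correctly sized chunks. A query that would hit that fallback, sketched against the Parquet file written by the integration test further down (the URI and structure are the test's, not new API):

```sql
-- Only virtual columns are requested, so fetch_columns would be empty without the
-- fallback; the reader then pulls a small physical column (likely the Int32 one)
-- just to learn how many rows each chunk contains.
SELECT _path, _file
FROM hdfs('hdfs://hdfs1:9000/parquet_2', 'Parquet', 'a Int32, b String');
```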
@@ -715,9 +706,7 @@ void registerStorageHDFS(StorageFactory & factory)
 
 NamesAndTypesList StorageHDFS::getVirtuals() const
 {
-    return NamesAndTypesList{
-        {"_path", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
-        {"_file", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())}};
+    return virtual_columns;
 }
 
 }
@@ -76,6 +76,7 @@ private:
     const bool distributed_processing;
     ASTPtr partition_by;
     bool is_path_with_globs;
+    NamesAndTypesList virtual_columns;
 
     Poco::Logger * log = &Poco::Logger::get("StorageHDFS");
 };
@@ -110,25 +111,14 @@ public:
     using IteratorWrapper = std::function<String()>;
     using StorageHDFSPtr = std::shared_ptr<StorageHDFS>;
 
-    static Block getHeader(
-        const StorageMetadataPtr & metadata_snapshot,
-        bool need_path_column,
-        bool need_file_column);
-
-    static Block getBlockForSource(
-        const StorageHDFSPtr & storage,
-        const StorageSnapshotPtr & storage_snapshot_,
-        const ColumnsDescription & columns_description,
-        bool need_path_column,
-        bool need_file_column);
+    static Block getHeader(Block sample_block, const std::vector<NameAndTypePair> & requested_virtual_columns);
 
     HDFSSource(
         StorageHDFSPtr storage_,
         const StorageSnapshotPtr & storage_snapshot_,
+        const Block & block_for_format_,
+        const std::vector<NameAndTypePair> & requested_virtual_columns_,
         ContextPtr context_,
         UInt64 max_block_size_,
-        bool need_path_column_,
-        bool need_file_column_,
         std::shared_ptr<IteratorWrapper> file_iterator_,
         ColumnsDescription columns_description_);
@@ -140,7 +130,8 @@ public:
 
 private:
     StorageHDFSPtr storage;
     StorageSnapshotPtr storage_snapshot;
+    Block block_for_format;
+    std::vector<NameAndTypePair> requested_virtual_columns;
     UInt64 max_block_size;
-    bool need_path_column;
-    bool need_file_column;
@@ -25,6 +25,7 @@
 #include <Storages/StorageS3Settings.h>
 #include <Storages/StorageSnapshot.h>
 #include <Storages/PartitionedSink.h>
+#include <Storages/getVirtualsForStorage.h>
 
 #include <IO/ReadBufferFromS3.h>
 #include <IO/WriteBufferFromS3.h>
@@ -210,25 +211,16 @@ String StorageS3Source::KeysIterator::next()
     return pimpl->next();
 }
 
-Block StorageS3Source::getHeader(Block sample_block, bool with_path_column, bool with_file_column)
+Block StorageS3Source::getHeader(Block sample_block, const std::vector<NameAndTypePair> & requested_virtual_columns)
 {
-    if (with_path_column)
-        sample_block.insert(
-            {DataTypeLowCardinality{std::make_shared<DataTypeString>()}.createColumn(),
-             std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()),
-             "_path"});
-    if (with_file_column)
-        sample_block.insert(
-            {DataTypeLowCardinality{std::make_shared<DataTypeString>()}.createColumn(),
-             std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()),
-             "_file"});
+    for (const auto & virtual_column : requested_virtual_columns)
+        sample_block.insert({virtual_column.type->createColumn(), virtual_column.type, virtual_column.name});
 
     return sample_block;
 }
 
 StorageS3Source::StorageS3Source(
-    bool need_path,
-    bool need_file,
+    const std::vector<NameAndTypePair> & requested_virtual_columns_,
     const String & format_,
     String name_,
     const Block & sample_block_,
@@ -242,7 +234,7 @@ StorageS3Source::StorageS3Source(
     const String & bucket_,
     std::shared_ptr<IteratorWrapper> file_iterator_,
     const size_t download_thread_num_)
-    : SourceWithProgress(getHeader(sample_block_, need_path, need_file))
+    : SourceWithProgress(getHeader(sample_block_, requested_virtual_columns_))
     , WithContext(context_)
     , name(std::move(name_))
     , bucket(bucket_)
@@ -254,8 +246,7 @@ StorageS3Source::StorageS3Source(
     , client(client_)
     , sample_block(sample_block_)
     , format_settings(format_settings_)
-    , with_file_column(need_file)
-    , with_path_column(need_path)
+    , requested_virtual_columns(requested_virtual_columns_)
     , file_iterator(file_iterator_)
     , download_thread_num(download_thread_num_)
 {
@@ -344,16 +335,18 @@ Chunk StorageS3Source::generate()
     {
         UInt64 num_rows = chunk.getNumRows();
 
-        if (with_path_column)
-            chunk.addColumn(DataTypeLowCardinality{std::make_shared<DataTypeString>()}
-                                .createColumnConst(num_rows, file_path)
-                                ->convertToFullColumnIfConst());
-        if (with_file_column)
+        for (const auto & virtual_column : requested_virtual_columns)
         {
-            size_t last_slash_pos = file_path.find_last_of('/');
-            chunk.addColumn(DataTypeLowCardinality{std::make_shared<DataTypeString>()}
-                                .createColumnConst(num_rows, file_path.substr(last_slash_pos + 1))
-                                ->convertToFullColumnIfConst());
+            if (virtual_column.name == "_path")
+            {
+                chunk.addColumn(virtual_column.type->createColumnConst(num_rows, file_path)->convertToFullColumnIfConst());
+            }
+            else if (virtual_column.name == "_file")
+            {
+                size_t last_slash_pos = file_path.find_last_of('/');
+                auto column = virtual_column.type->createColumnConst(num_rows, file_path.substr(last_slash_pos + 1));
+                chunk.addColumn(column->convertToFullColumnIfConst());
+            }
         }
 
         return chunk;
@@ -627,6 +620,13 @@ StorageS3::StorageS3(
     storage_metadata.setConstraints(constraints_);
     storage_metadata.setComment(comment);
     setInMemoryMetadata(storage_metadata);
+
+    auto default_virtuals = NamesAndTypesList{
+        {"_path", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
+        {"_file", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())}};
+
+    auto columns = storage_metadata.getSampleBlock().getNamesAndTypesList();
+    virtual_columns = getVirtualsForStorage(columns, default_virtuals);
 }
 
 std::shared_ptr<StorageS3Source::IteratorWrapper> StorageS3::createFileIterator(const ClientAuthentication & client_auth, const std::vector<String> & keys, bool is_key_with_globs, bool distributed_processing, ContextPtr local_context)
@@ -674,14 +674,14 @@ Pipe StorageS3::read(
     updateClientAndAuthSettings(local_context, client_auth);
 
     Pipes pipes;
-    bool need_path_column = false;
-    bool need_file_column = false;
-    for (const auto & column : column_names)
+
+    std::unordered_set<String> column_names_set(column_names.begin(), column_names.end());
+    std::vector<NameAndTypePair> requested_virtual_columns;
+
+    for (const auto & virtual_column : getVirtuals())
     {
-        if (column == "_path")
-            need_path_column = true;
-        if (column == "_file")
-            need_file_column = true;
+        if (column_names_set.contains(virtual_column.name))
+            requested_virtual_columns.push_back(virtual_column);
     }
 
     std::shared_ptr<StorageS3Source::IteratorWrapper> iterator_wrapper = createFileIterator(client_auth, keys, is_key_with_globs, distributed_processing, local_context);
@@ -690,8 +690,18 @@ Pipe StorageS3::read(
     Block block_for_format;
     if (isColumnOriented())
     {
+        auto fetch_columns = column_names;
+        const auto & virtuals = getVirtuals();
+        std::erase_if(
+            fetch_columns,
+            [&](const String & col)
+            { return std::any_of(virtuals.begin(), virtuals.end(), [&](const NameAndTypePair & virtual_col){ return col == virtual_col.name; }); });
+
+        if (fetch_columns.empty())
+            fetch_columns.push_back(ExpressionActions::getSmallestColumn(storage_snapshot->metadata->getColumns().getAllPhysical()));
+
         columns_description = ColumnsDescription{
-            storage_snapshot->getSampleBlockForColumns(column_names).getNamesAndTypesList()};
+            storage_snapshot->getSampleBlockForColumns(fetch_columns).getNamesAndTypesList()};
         block_for_format = storage_snapshot->getSampleBlockForColumns(columns_description.getNamesOfPhysical());
     }
     else
@@ -704,8 +714,7 @@ Pipe StorageS3::read(
     for (size_t i = 0; i < num_streams; ++i)
     {
         pipes.emplace_back(std::make_shared<StorageS3Source>(
-            need_path_column,
-            need_file_column,
+            requested_virtual_columns,
             format_name,
             getName(),
             block_for_format,
@@ -882,6 +891,8 @@ StorageS3Configuration StorageS3::getConfiguration(ASTs & engine_args, ContextPt
             configuration.access_key_id = arg_value->as<ASTLiteral>()->value.safeGet<String>();
         else if (arg_name == "secret_access_key")
             configuration.secret_access_key = arg_value->as<ASTLiteral>()->value.safeGet<String>();
+        else if (arg_name == "filename")
+            configuration.url = std::filesystem::path(configuration.url) / arg_value->as<ASTLiteral>()->value.safeGet<String>();
         else
             throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
                 "Unknown key-value argument `{}` for StorageS3, expected: url, [access_key_id, secret_access_key], name of used format and [compression_method].",
@@ -1081,9 +1092,7 @@ void registerStorageCOS(StorageFactory & factory)
 
 NamesAndTypesList StorageS3::getVirtuals() const
 {
-    return NamesAndTypesList{
-        {"_path", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
-        {"_file", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())}};
+    return virtual_columns;
 }
 
 bool StorageS3::supportsPartitionBy() const
@@ -58,11 +58,10 @@ public:
 
     using IteratorWrapper = std::function<String()>;
 
-    static Block getHeader(Block sample_block, bool with_path_column, bool with_file_column);
+    static Block getHeader(Block sample_block, const std::vector<NameAndTypePair> & requested_virtual_columns);
 
     StorageS3Source(
-        bool need_path,
-        bool need_file,
+        const std::vector<NameAndTypePair> & requested_virtual_columns_,
         const String & format,
         String name_,
         const Block & sample_block,
@@ -102,8 +101,7 @@ private:
     std::unique_ptr<PullingPipelineExecutor> reader;
     /// onCancel and generate can be called concurrently
     std::mutex reader_mutex;
-    bool with_file_column = false;
-    bool with_path_column = false;
+    std::vector<NameAndTypePair> requested_virtual_columns;
     std::shared_ptr<IteratorWrapper> file_iterator;
     size_t download_thread_num = 1;
@@ -196,6 +194,7 @@ private:
 
     ClientAuthentication client_auth;
     std::vector<String> keys;
+    NamesAndTypesList virtual_columns;
 
     String format_name;
     UInt64 max_single_read_retries;
src/Storages/getVirtualsForStorage.cpp (new file, 22 lines)
@@ -0,0 +1,22 @@
#include "getVirtualsForStorage.h"

namespace DB
{

NamesAndTypesList getVirtualsForStorage(const NamesAndTypesList & storage_columns_, const NamesAndTypesList & default_virtuals_)
{
    auto default_virtuals = default_virtuals_;
    auto storage_columns = storage_columns_;
    default_virtuals.sort();
    storage_columns.sort();

    NamesAndTypesList result_virtuals;
    std::set_difference(
        default_virtuals.begin(), default_virtuals.end(), storage_columns.begin(), storage_columns.end(),
        std::back_inserter(result_virtuals),
        [](const NameAndTypePair & lhs, const NameAndTypePair & rhs){ return lhs.name < rhs.name; });

    return result_virtuals;
}

}
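The observable effect of this helper, exercised by the tests added below, is that a column declared in the table schema shadows the default virtual column of the same name. A hedged SQL sketch (table names are illustrative; `s3_conn` is the named collection from the test configuration below):

```sql
-- No physical _path column: _path stays virtual and returns the object key,
-- e.g. 'test/test_02245'.
CREATE TABLE t_virtual (a UInt64)
    ENGINE = S3(s3_conn, filename = 'test_02245', format = Parquet);
SELECT _path FROM t_virtual;

-- A physical _path column shadows the virtual one, so _path returns the
-- stored values instead of the object key.
CREATE TABLE t_physical (a UInt64, _path Int32)
    ENGINE = S3(s3_conn, filename = 'test_02245_2', format = Parquet);
SELECT _path FROM t_physical;
```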
src/Storages/getVirtualsForStorage.h (new file, 9 lines)
@@ -0,0 +1,9 @@
#pragma once
#include <Core/NamesAndTypes.h>

namespace DB
{

NamesAndTypesList getVirtualsForStorage(const NamesAndTypesList & storage_columns_, const NamesAndTypesList & default_virtuals_);

}
@@ -12,6 +12,7 @@
 #include <Storages/StorageS3.h>
 #include <Formats/FormatFactory.h>
 #include "registerTableFunctions.h"
+#include <filesystem>
 
 
 namespace DB
@@ -37,6 +38,8 @@ void TableFunctionS3::parseArgumentsImpl(const String & error_message, ASTs & ar
             s3_configuration.access_key_id = arg_value->as<ASTLiteral>()->value.safeGet<String>();
         else if (arg_name == "secret_access_key")
             s3_configuration.secret_access_key = arg_value->as<ASTLiteral>()->value.safeGet<String>();
+        else if (arg_name == "filename")
+            s3_configuration.url = std::filesystem::path(s3_configuration.url) / arg_value->as<ASTLiteral>()->value.safeGet<String>();
         else
             throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, error_message);
 }
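The table function accepts the same `filename` key as the engine: it is joined onto the `url` taken from the named collection, which is what the documentation example above relies on. A small sketch, assuming the `s3_conn` collection from the test configuration below:

```sql
-- Reads http://localhost:11111/test/test_02245 (collection url + filename).
SELECT count() FROM s3(s3_conn, filename = 'test_02245', format = 'Parquet');
```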
@@ -22,5 +22,11 @@
             </header>
         </headers>
     </url_with_headers>
+    <s3_conn>
+        <url>http://localhost:11111/test/</url>
+        <access_key_id>test</access_key_id>
+        <secret_access_key>testtest</secret_access_key>
+        <structure>auto</structure>
+    </s3_conn>
 </named_collections>
 </clickhouse>
@@ -554,6 +554,26 @@ def test_insert_select_schema_inference(started_cluster):
     assert int(result) == 1
 
 
+def test_virtual_columns_2(started_cluster):
+    hdfs_api = started_cluster.hdfs_api
+
+    table_function = (
+        f"hdfs('hdfs://hdfs1:9000/parquet_2', 'Parquet', 'a Int32, b String')"
+    )
+    node1.query(f"insert into table function {table_function} SELECT 1, 'kek'")
+
+    result = node1.query(f"SELECT _path FROM {table_function}")
+    assert result.strip() == "hdfs://hdfs1:9000/parquet_2"
+
+    table_function = (
+        f"hdfs('hdfs://hdfs1:9000/parquet_3', 'Parquet', 'a Int32, _path String')"
+    )
+    node1.query(f"insert into table function {table_function} SELECT 1, 'kek'")
+
+    result = node1.query(f"SELECT _path FROM {table_function}")
+    assert result.strip() == "kek"
+
+
 if __name__ == "__main__":
     cluster.start()
     input("Cluster created, press any key to destroy...")
tests/queries/0_stateless/02245_s3_virtual_columns.reference (new file, 15 lines)
@@ -0,0 +1,15 @@
-- { echo }
drop table if exists test_02245;
create table test_02245 (a UInt64) engine = S3(s3_conn, filename='test_02245', format=Parquet);
insert into test_02245 select 1 settings s3_truncate_on_insert=1;
select * from test_02245;
1
select _path from test_02245;
test/test_02245
drop table if exists test_02245_2;
create table test_02245_2 (a UInt64, _path Int32) engine = S3(s3_conn, filename='test_02245_2', format=Parquet);
insert into test_02245_2 select 1, 2 settings s3_truncate_on_insert=1;
select * from test_02245_2;
1 2
select _path from test_02245_2;
2
tests/queries/0_stateless/02245_s3_virtual_columns.sql (new file, 15 lines)
@@ -0,0 +1,15 @@
-- Tags: no-fasttest
-- Tag no-fasttest: Depends on AWS

-- { echo }
drop table if exists test_02245;
create table test_02245 (a UInt64) engine = S3(s3_conn, filename='test_02245', format=Parquet);
insert into test_02245 select 1 settings s3_truncate_on_insert=1;
select * from test_02245;
select _path from test_02245;

drop table if exists test_02245_2;
create table test_02245_2 (a UInt64, _path Int32) engine = S3(s3_conn, filename='test_02245_2', format=Parquet);
insert into test_02245_2 select 1, 2 settings s3_truncate_on_insert=1;
select * from test_02245_2;
select _path from test_02245_2;