Merge pull request #62259 from divanik/divanik/fix_archieve_support_in_s3
Add support of reading from archives in S3
Commit: 9b09550176
@@ -83,7 +83,7 @@ setup_minio() {
 ./mc alias set clickminio http://localhost:11111 clickhouse clickhouse
 ./mc admin user add clickminio test testtest
 ./mc admin policy set clickminio readwrite user=test
-./mc mb clickminio/test
+./mc mb --ignore-existing clickminio/test
 if [ "$test_type" = "stateless" ]; then
   ./mc policy set public clickminio/test
 fi
@@ -248,6 +248,25 @@ FROM s3(
 LIMIT 5;
 ```
+
+## Working with archives
+
+Suppose that we have several archive files with the following URIs on S3:
+
+- 'https://s3-us-west-1.amazonaws.com/umbrella-static/top-1m-2018-01-10.csv.zip'
+- 'https://s3-us-west-1.amazonaws.com/umbrella-static/top-1m-2018-01-11.csv.zip'
+- 'https://s3-us-west-1.amazonaws.com/umbrella-static/top-1m-2018-01-12.csv.zip'
+
+Extracting data from these archives is possible using `::`. Globs can be used both in the URL part and in the part after `::` (responsible for the name of the file inside the archive).
+
+``` sql
+SELECT *
+FROM s3(
+   'https://s3-us-west-1.amazonaws.com/umbrella-static/top-1m-2018-01-1{0..2}.csv.zip :: *.csv'
+);
+```
+
 ## Virtual Columns {#virtual-columns}
 
 - `_path` — Path to the file. Type: `LowCardinality(String)`.
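Note that the `_path` and `_file` virtual columns cooperate with the archive syntax: for a file read from inside an archive, `_path` takes the form `bucket/key::filename`, as can be seen in the test reference output later in this diff. A hypothetical query over the buckets above that surfaces both columns might look like:

``` sql
SELECT _path, _file, count()
FROM s3(
   'https://s3-us-west-1.amazonaws.com/umbrella-static/top-1m-2018-01-1{0..2}.csv.zip :: *.csv'
)
GROUP BY _path, _file;
```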
@@ -1,8 +1,7 @@
 #include <IO/S3/URI.h>
 #include <Poco/URI.h>
+#include "Common/Macros.h"
 #include <Interpreters/Context.h>
 #include <Storages/NamedCollectionsHelpers.h>
-#include "Common/Macros.h"
 #if USE_AWS_S3
 #include <Common/Exception.h>
 #include <Common/quoteString.h>
@@ -55,7 +54,11 @@ URI::URI(const std::string & uri_)
     static constexpr auto OSS = "OSS";
     static constexpr auto EOS = "EOS";
 
-    uri = Poco::URI(uri_);
+    if (containsArchive(uri_))
+        std::tie(uri_str, archive_pattern) = getPathToArchiveAndArchivePattern(uri_);
+    else
+        uri_str = uri_;
+    uri = Poco::URI(uri_str);
 
     std::unordered_map<std::string, std::string> mapper;
     auto context = Context::getGlobalContextInstance();
@@ -126,9 +129,10 @@ URI::URI(const std::string & uri_)
         boost::to_upper(name);
         /// For S3Express it will look like s3express-eun1-az1, i.e. contain region and AZ info
         if (name != S3 && !name.starts_with(S3EXPRESS) && name != COS && name != OBS && name != OSS && name != EOS)
-            throw Exception(ErrorCodes::BAD_ARGUMENTS,
-                "Object storage system name is unrecognized in virtual hosted style S3 URI: {}",
-                quoteString(name));
+            throw Exception(
+                ErrorCodes::BAD_ARGUMENTS,
+                "Object storage system name is unrecognized in virtual hosted style S3 URI: {}",
+                quoteString(name));
 
         if (name == COS)
             storage_name = COSN;
@@ -156,10 +160,40 @@ void URI::validateBucket(const String & bucket, const Poco::URI & uri)
     /// S3 specification requires at least 3 and at most 63 characters in bucket name.
     /// https://docs.aws.amazon.com/awscloudtrail/latest/userguide/cloudtrail-s3-bucket-naming-requirements.html
     if (bucket.length() < 3 || bucket.length() > 63)
-        throw Exception(ErrorCodes::BAD_ARGUMENTS, "Bucket name length is out of bounds in virtual hosted style S3 URI: {}{}",
-                        quoteString(bucket), !uri.empty() ? " (" + uri.toString() + ")" : "");
+        throw Exception(
+            ErrorCodes::BAD_ARGUMENTS,
+            "Bucket name length is out of bounds in virtual hosted style S3 URI: {}{}",
+            quoteString(bucket),
+            !uri.empty() ? " (" + uri.toString() + ")" : "");
 }
+
+bool URI::containsArchive(const std::string & source)
+{
+    size_t pos = source.find("::");
+    return (pos != std::string::npos);
+}
+
+std::pair<std::string, std::string> URI::getPathToArchiveAndArchivePattern(const std::string & source)
+{
+    size_t pos = source.find("::");
+    assert(pos != std::string::npos);
+
+    std::string path_to_archive = source.substr(0, pos);
+    while ((!path_to_archive.empty()) && path_to_archive.ends_with(' '))
+        path_to_archive.pop_back();
+
+    if (path_to_archive.empty())
+        throw Exception(ErrorCodes::BAD_ARGUMENTS, "Path to archive is empty");
+
+    std::string_view path_in_archive_view = std::string_view{source}.substr(pos + 2);
+    while (path_in_archive_view.front() == ' ')
+        path_in_archive_view.remove_prefix(1);
+
+    if (path_in_archive_view.empty())
+        throw Exception(ErrorCodes::BAD_ARGUMENTS, "Filename is empty");
+
+    return {path_to_archive, std::string{path_in_archive_view}};
+}
 }
 
 }
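Per the parsing logic above, a URI is split at the first `::`: the left part (trailing spaces trimmed) is the path to the archive, and the right part (leading spaces trimmed) is the filename or glob inside it; an empty side raises `BAD_ARGUMENTS`. A sketch of the equivalent spellings this allows, using a hypothetical bucket and file:

``` sql
-- Whitespace around '::' is trimmed, so both queries address the same file inside the archive.
SELECT * FROM s3('https://example-bucket.s3.amazonaws.com/data.zip::file.csv');
SELECT * FROM s3('https://example-bucket.s3.amazonaws.com/data.zip :: file.csv');
```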
@@ -1,5 +1,6 @@
 #pragma once
 
+#include <optional>
 #include <string>
 
 #include "config.h"
@@ -28,6 +29,8 @@ struct URI
     std::string key;
     std::string version_id;
     std::string storage_name;
+    std::optional<std::string> archive_pattern;
+    std::string uri_str;
 
     bool is_virtual_hosted_style;
@@ -36,6 +39,10 @@ struct URI
     void addRegionToURI(const std::string & region);
 
     static void validateBucket(const std::string & bucket, const Poco::URI & uri);
 
+private:
+    bool containsArchive(const std::string & source);
+    std::pair<std::string, std::string> getPathToArchiveAndArchivePattern(const std::string & source);
 };
 
 }
@@ -17,6 +17,7 @@
 #include <boost/algorithm/string/case_conv.hpp>
 #include <parquet/arrow/reader.h>
 #include <ranges>
+#include <filesystem>
 
 namespace fs = std::filesystem;
 
@@ -54,7 +54,7 @@ public:
     {
         std::lock_guard lock(configuration_update_mutex);
         updateConfigurationImpl(local_context);
-        return Storage::getConfiguration();
+        return Storage::getConfigurationCopy();
     }
 
     void updateConfiguration(const ContextPtr & local_context) override
@@ -106,7 +106,7 @@ private:
         const bool updated = base_configuration.update(local_context);
         auto new_keys = getDataFiles(base_configuration, local_context);
 
-        if (!updated && new_keys == Storage::getConfiguration().keys)
+        if (!updated && new_keys == Storage::getConfigurationCopy().keys)
             return;
 
         Storage::useConfiguration(getConfigurationForDataRead(base_configuration, local_context, new_keys));
@@ -31,16 +31,17 @@
 #include <Poco/JSON/Object.h>
 #include <Poco/JSON/Parser.h>
 
+#include <filesystem>
 
 namespace DB
 {
 
 namespace ErrorCodes
 {
-    extern const int FILE_DOESNT_EXIST;
-    extern const int ILLEGAL_COLUMN;
-    extern const int BAD_ARGUMENTS;
-    extern const int UNSUPPORTED_METHOD;
+extern const int FILE_DOESNT_EXIST;
+extern const int ILLEGAL_COLUMN;
+extern const int BAD_ARGUMENTS;
+extern const int UNSUPPORTED_METHOD;
 }
 
 IcebergMetadata::IcebergMetadata(
@@ -63,7 +63,7 @@ public:
     {
         std::lock_guard lock(configuration_update_mutex);
         updateConfigurationImpl(local_context);
-        return StorageS3::getConfiguration();
+        return StorageS3::getConfigurationCopy();
     }
 
     void updateConfiguration(const ContextPtr & local_context) override
@@ -10,6 +10,7 @@
 #include <aws/s3/S3Client.h>
 #include <aws/s3/model/ListObjectsV2Request.h>
 
+#include <filesystem>
 
 namespace DB
 {
@@ -1,3 +1,4 @@
+#include <optional>
 #include "config.h"
 
 #if USE_AWS_S3
@@ -365,7 +366,11 @@ std::shared_ptr<StorageS3QueueSource> StorageS3Queue::createSource(
     auto configuration_snapshot = updateConfigurationAndGetCopy(local_context);
 
     auto internal_source = std::make_unique<StorageS3Source>(
-        info, configuration.format, getName(), local_context, format_settings,
+        info,
+        configuration.format,
+        getName(),
+        local_context,
+        format_settings,
         max_block_size,
         configuration_snapshot.request_settings,
         configuration_snapshot.compression_method,
@@ -373,7 +378,9 @@ std::shared_ptr<StorageS3QueueSource> StorageS3Queue::createSource(
         configuration_snapshot.url.bucket,
         configuration_snapshot.url.version_id,
         configuration_snapshot.url.uri.getHost() + std::to_string(configuration_snapshot.url.uri.getPort()),
-        file_iterator, local_context->getSettingsRef().max_download_threads, false);
+        file_iterator,
+        local_context->getSettingsRef().max_download_threads,
+        false);
 
     auto file_deleter = [this, bucket = configuration_snapshot.url.bucket, client = configuration_snapshot.client, blob_storage_log = BlobStorageLogWriter::create()](const std::string & path) mutable
     {
@@ -608,8 +615,13 @@ void StorageS3Queue::checkTableStructure(const String & zookeeper_prefix, const
 std::shared_ptr<StorageS3Queue::FileIterator> StorageS3Queue::createFileIterator(ContextPtr local_context, const ActionsDAG::Node * predicate)
 {
     auto glob_iterator = std::make_unique<StorageS3QueueSource::GlobIterator>(
-        *configuration.client, configuration.url, predicate, getVirtualsList(), local_context,
-        /* read_keys */nullptr, configuration.request_settings);
+        *configuration.client,
+        configuration.url,
+        predicate,
+        getVirtualsList(),
+        local_context,
+        /* read_keys */ nullptr,
+        configuration.request_settings);
 
     return std::make_shared<FileIterator>(
         files_metadata, std::move(glob_iterator), s3queue_settings->s3queue_current_shard_num, shutdown_called, log);
File diff suppressed because it is too large.
@@ -1,7 +1,10 @@
 #pragma once
 
 #include "config.h"
 
 #include <memory>
 #include <IO/ReadBufferFromS3.h>
+#include "IO/Archives/IArchiveReader.h"
+#include "IO/Archives/createArchiveReader.h"
+#include "IO/ReadBuffer.h"
 #if USE_AWS_S3
 
 #include <Compression/CompressionInfo.h>
@@ -23,36 +26,52 @@
 #include <Poco/URI.h>
 #include <Common/threadPoolCallbackRunner.h>
 
-#include <filesystem>
-
-namespace fs = std::filesystem;
-
 namespace DB
 {
 
+namespace ErrorCodes
+{
+extern const int LOGICAL_ERROR;
+}
+
 class PullingPipelineExecutor;
 class NamedCollection;
 
 class StorageS3Source : public SourceWithKeyCondition, WithContext
 {
 public:
     struct KeyWithInfo
     {
         KeyWithInfo() = default;
 
-        explicit KeyWithInfo(String key_, std::optional<S3::ObjectInfo> info_ = std::nullopt)
-            : key(std::move(key_)), info(std::move(info_)) {}
+        explicit KeyWithInfo(
+            String key_,
+            std::optional<S3::ObjectInfo> info_ = std::nullopt,
+            std::optional<String> path_in_archive_ = std::nullopt,
+            std::shared_ptr<IArchiveReader> archive_reader_ = nullptr)
+            : key(std::move(key_))
+            , info(std::move(info_))
+            , path_in_archive(std::move(path_in_archive_))
+            , archive_reader(std::move(archive_reader_))
+        {
+            if (path_in_archive.has_value() != (archive_reader != nullptr))
+                throw Exception(ErrorCodes::LOGICAL_ERROR, "Archive reader and path in archive must exist simultaneously");
+        }
 
         virtual ~KeyWithInfo() = default;
 
         String key;
         std::optional<S3::ObjectInfo> info;
+        std::optional<String> path_in_archive;
+        std::shared_ptr<IArchiveReader> archive_reader;
 
+        String getPath() const { return path_in_archive.has_value() ? (key + "::" + path_in_archive.value()) : key; }
+        String getFileName() const { return path_in_archive.has_value() ? path_in_archive.value() : key; }
     };
 
     using KeyWithInfoPtr = std::shared_ptr<KeyWithInfo>;
 
     using KeysWithInfo = std::vector<KeyWithInfoPtr>;
 
     class IIterator
     {
     public:
@@ -65,7 +84,7 @@ public:
         /// fixme: May underestimate if the glob has a strong filter, so there are few matches among the first 1000 ListObjects results.
         virtual size_t estimatedKeysCount() = 0;
 
-        KeyWithInfoPtr operator ()() { return next(); }
+        KeyWithInfoPtr operator()() { return next(); }
     };
 
     class DisclosedGlobIterator : public IIterator
@@ -126,6 +145,41 @@ public:
         ReadTaskCallback callback;
     };
 
+    class ArchiveIterator : public IIterator, public WithContext
+    {
+    public:
+        explicit ArchiveIterator(
+            std::unique_ptr<IIterator> basic_iterator_,
+            const std::string & archive_pattern_,
+            std::shared_ptr<const S3::Client> client_,
+            const String & bucket_,
+            const String & version_id_,
+            const S3Settings::RequestSettings & request_settings,
+            ContextPtr context_,
+            KeysWithInfo * read_keys_);
+
+        KeyWithInfoPtr next(size_t) override; /// NOLINT
+        size_t estimatedKeysCount() override;
+        void refreshArchiveReader();
+
+    private:
+        std::unique_ptr<IIterator> basic_iterator;
+        KeyWithInfoPtr basic_key_with_info_ptr;
+        std::unique_ptr<ReadBufferFromFileBase> basic_read_buffer;
+        std::shared_ptr<IArchiveReader> archive_reader{nullptr};
+        std::unique_ptr<IArchiveReader::FileEnumerator> file_enumerator = nullptr;
+        std::string path_in_archive = {}; // used when reading a single file from archive
+        IArchiveReader::NameFilter filter = {}; // used when files inside archive are defined with a glob
+        std::shared_ptr<const S3::Client> client;
+        const String bucket;
+        const String version_id;
+        S3Settings::RequestSettings request_settings;
+        std::mutex take_next_mutex;
+        KeysWithInfo * read_keys;
+    };
+
+    friend StorageS3Source::ArchiveIterator;
+
     StorageS3Source(
         const ReadFromFormatInfo & info,
         const String & format,
@@ -194,10 +248,7 @@ private:
         ReaderHolder(const ReaderHolder & other) = delete;
         ReaderHolder & operator=(const ReaderHolder & other) = delete;
 
-        ReaderHolder(ReaderHolder && other) noexcept
-        {
-            *this = std::move(other);
-        }
+        ReaderHolder(ReaderHolder && other) noexcept { *this = std::move(other); }
 
         ReaderHolder & operator=(ReaderHolder && other) noexcept
         {
@@ -215,8 +266,9 @@ private:
         explicit operator bool() const { return reader != nullptr; }
         PullingPipelineExecutor * operator->() { return reader.get(); }
         const PullingPipelineExecutor * operator->() const { return reader.get(); }
-        String getPath() const { return fs::path(bucket) / key_with_info->key; }
-        const String & getFile() const { return key_with_info->key; }
+        String getPath() const { return bucket + "/" + key_with_info->getPath(); }
+        String getFile() const { return key_with_info->getFileName(); }
+        bool isArchive() { return key_with_info->path_in_archive.has_value(); }
         const KeyWithInfo & getKeyWithInfo() const { return *key_with_info; }
         std::optional<size_t> getFileSize() const { return key_with_info->info ? std::optional(key_with_info->info->size) : std::nullopt; }
@@ -255,10 +307,7 @@ private:
     ReaderHolder createReader(size_t idx = 0);
     std::future<ReaderHolder> createReaderAsync(size_t idx = 0);
 
-    std::unique_ptr<ReadBuffer> createS3ReadBuffer(const String & key, size_t object_size);
-    std::unique_ptr<ReadBuffer> createAsyncS3ReadBuffer(const String & key, const ReadSettings & read_settings, size_t object_size);
-
-    void addNumRowsToCache(const String & key, size_t num_rows);
+    void addNumRowsToCache(const String & bucket_with_key, size_t num_rows);
     std::optional<size_t> tryGetNumRowsFromCache(const KeyWithInfo & key_with_info);
 };
@@ -285,8 +334,7 @@ public:
     bool withPartitionWildcard() const
     {
         static const String PARTITION_ID_WILDCARD = "{_partition_id}";
-        return url.bucket.find(PARTITION_ID_WILDCARD) != String::npos
-            || keys.back().find(PARTITION_ID_WILDCARD) != String::npos;
+        return url.bucket.find(PARTITION_ID_WILDCARD) != String::npos || keys.back().find(PARTITION_ID_WILDCARD) != String::npos;
     }
 
     bool withGlobsIgnorePartitionWildcard() const;
@@ -315,10 +363,7 @@ public:
         bool distributed_processing_ = false,
         ASTPtr partition_by_ = nullptr);
 
-    String getName() const override
-    {
-        return name;
-    }
+    String getName() const override { return name; }
 
     void read(
         QueryPlan & query_plan,
@@ -330,27 +375,25 @@ public:
         size_t max_block_size,
         size_t num_streams) override;
 
-    SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context, bool async_insert) override;
+    SinkToStoragePtr
+    write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context, bool async_insert) override;
 
-    void truncate(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context, TableExclusiveLockHolder &) override;
+    void truncate(
+        const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context, TableExclusiveLockHolder &) override;
 
     bool supportsPartitionBy() const override;
 
-    static void processNamedCollectionResult(StorageS3::Configuration & configuration, const NamedCollection & collection);
+    static void processNamedCollectionResult(Configuration & configuration, const NamedCollection & collection);
 
     static SchemaCache & getSchemaCache(const ContextPtr & ctx);
 
-    static StorageS3::Configuration getConfiguration(ASTs & engine_args, const ContextPtr & local_context, bool get_format_from_file = true);
+    static Configuration getConfiguration(ASTs & engine_args, const ContextPtr & local_context, bool get_format_from_file = true);
 
     static ColumnsDescription getTableStructureFromData(
-        const StorageS3::Configuration & configuration,
-        const std::optional<FormatSettings> & format_settings,
-        const ContextPtr & ctx);
+        const Configuration & configuration_, const std::optional<FormatSettings> & format_settings_, const ContextPtr & ctx);
 
     static std::pair<ColumnsDescription, String> getTableStructureAndFormatFromData(
-        const StorageS3::Configuration & configuration,
-        const std::optional<FormatSettings> & format_settings,
-        const ContextPtr & ctx);
+        const Configuration & configuration, const std::optional<FormatSettings> & format_settings, const ContextPtr & ctx);
 
     using KeysWithInfo = StorageS3Source::KeysWithInfo;
@@ -363,7 +406,9 @@ protected:
 
     void useConfiguration(const Configuration & new_configuration);
 
-    const Configuration & getConfiguration();
+    Configuration getConfigurationCopy() const;
+
+    String getFormatCopy() const;
 
 private:
     friend class StorageS3Cluster;
@@ -372,7 +417,7 @@ private:
     friend class ReadFromStorageS3Step;
 
     Configuration configuration;
-    std::mutex configuration_update_mutex;
+    mutable std::mutex configuration_update_mutex;
 
     String name;
     const bool distributed_processing;
@@ -394,6 +439,24 @@ private:
     bool parallelizeOutputAfterReading(ContextPtr context) const override;
 };
 
+std::unique_ptr<ReadBufferFromFileBase> createS3ReadBuffer(
+    const String & key,
+    size_t object_size,
+    std::shared_ptr<const Context> context,
+    std::shared_ptr<const S3::Client> client_ptr,
+    const String & bucket,
+    const String & version_id,
+    const S3Settings::RequestSettings & request_settings);
+
+std::unique_ptr<ReadBufferFromFileBase> createAsyncS3ReadBuffer(
+    const String & key,
+    const ReadSettings & read_settings,
+    size_t object_size,
+    std::shared_ptr<const Context> context,
+    std::shared_ptr<const S3::Client> client_ptr,
+    const String & bucket,
+    const String & version_id,
+    const S3Settings::RequestSettings & request_settings);
 }
 
 #endif
@@ -91,7 +91,14 @@ void StorageS3Cluster::updateConfigurationIfChanged(ContextPtr local_context)
 RemoteQueryExecutor::Extension StorageS3Cluster::getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context) const
 {
     auto iterator = std::make_shared<StorageS3Source::DisclosedGlobIterator>(
-        *s3_configuration.client, s3_configuration.url, predicate, getVirtualsList(), context, nullptr, s3_configuration.request_settings, context->getFileProgressCallback());
+        *s3_configuration.client,
+        s3_configuration.url,
+        predicate,
+        getVirtualsList(),
+        context,
+        nullptr,
+        s3_configuration.request_settings,
+        context->getFileProgressCallback());
 
     auto callback = std::make_shared<std::function<String()>>([iterator]() mutable -> String
     {
@@ -216,7 +216,19 @@ void TableFunctionS3::parseArgumentsImpl(ASTs & args, const ContextPtr & context
         configuration.auth_settings.no_sign_request = no_sign_request;
 
         if (configuration.format == "auto")
-            configuration.format = FormatFactory::instance().tryGetFormatFromFileName(Poco::URI(url).getPath()).value_or("auto");
+        {
+            if (configuration.url.archive_pattern.has_value())
+            {
+                configuration.format = FormatFactory::instance()
+                                           .tryGetFormatFromFileName(Poco::URI(configuration.url.archive_pattern.value()).getPath())
+                                           .value_or("auto");
+            }
+            else
+            {
+                configuration.format
+                    = FormatFactory::instance().tryGetFormatFromFileName(Poco::URI(configuration.url.uri_str).getPath()).value_or("auto");
+            }
+        }
     }
 
     configuration.keys = {configuration.url.key};
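A consequence of this branch: with `format = 'auto'`, the format is now inferred from the filename inside the archive (the part after `::`) rather than from the archive name itself. For example, with a hypothetical bucket and archive, the inner `.csv` extension drives the inference:

``` sql
-- Inferred as CSV from 'data.csv', not from the '.zip' of the enclosing archive.
SELECT * FROM s3('https://example-bucket.s3.amazonaws.com/backup.zip :: data.csv');
```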
@@ -0,0 +1,52 @@
+1 Str1 example1.csv test/03036_archive1.zip::example1.csv
+2 Str2 example1.csv test/03036_archive1.zip::example1.csv
+3 Str3 example2.csv test/03036_archive2.zip::example2.csv
+4 Str4 example2.csv test/03036_archive2.zip::example2.csv
+5 Str5 example3.csv test/03036_archive2.zip::example3.csv
+6 Str6 example3.csv test/03036_archive2.zip::example3.csv
+3 Str3 example2.csv test/03036_archive1.zip::example2.csv
+3 Str3 example2.csv test/03036_archive2.zip::example2.csv
+4 Str4 example2.csv test/03036_archive1.zip::example2.csv
+4 Str4 example2.csv test/03036_archive2.zip::example2.csv
+1 Str1 example1.csv test/03036_archive1.zip::example1.csv
+2 Str2 example1.csv test/03036_archive1.zip::example1.csv
+3 Str3 example2.csv test/03036_archive1.zip::example2.csv
+3 Str3 example2.csv test/03036_archive2.zip::example2.csv
+4 Str4 example2.csv test/03036_archive1.zip::example2.csv
+4 Str4 example2.csv test/03036_archive2.zip::example2.csv
+5 Str5 example3.csv test/03036_archive2.zip::example3.csv
+6 Str6 example3.csv test/03036_archive2.zip::example3.csv
+1 Str1 example1.csv test/03036_archive1.tar::example1.csv
+2 Str2 example1.csv test/03036_archive1.tar::example1.csv
+7 Str7 example4.csv test/03036_archive1.tar::example4.csv
+7 Str7 example4.csv test/03036_archive2.tar::example4.csv
+8 Str8 example4.csv test/03036_archive1.tar::example4.csv
+8 Str8 example4.csv test/03036_archive2.tar::example4.csv
+5 Str5 example3.csv test/03036_archive2.tar::example3.csv
+6 Str6 example3.csv test/03036_archive2.tar::example3.csv
+7 Str7 example4.csv test/03036_archive2.tar::example4.csv
+8 Str8 example4.csv test/03036_archive2.tar::example4.csv
+9 Str9 example5.csv test/03036_archive2.tar::example5.csv
+10 Str10 example5.csv test/03036_archive2.tar::example5.csv
+3 Str3 example2.csv test/03036_archive3.tar.gz::example2.csv
+4 Str4 example2.csv test/03036_archive3.tar.gz::example2.csv
+11 Str11 example6.csv test/03036_archive3.tar.gz::example6.csv
+12 Str12 example6.csv test/03036_archive3.tar.gz::example6.csv
+3 Str3 example2.csv test/03036_archive3.tar.gz::example2.csv
+4 Str4 example2.csv test/03036_archive3.tar.gz::example2.csv
+5 Str5 example3.csv test/03036_archive2.tar::example3.csv
+6 Str6 example3.csv test/03036_archive2.tar::example3.csv
+3 Str3 example2.csv test/03036_archive2.zip::example2.csv
+4 Str4 example2.csv test/03036_archive2.zip::example2.csv
+5 Str5 example3.csv test/03036_archive2.tar::example3.csv
+6 Str6 example3.csv test/03036_archive2.tar::example3.csv
+7 Str7 example4.csv test/03036_archive2.tar::example4.csv
+8 Str8 example4.csv test/03036_archive2.tar::example4.csv
+9 Str9 example5.csv test/03036_archive2.tar::example5.csv
+10 Str10 example5.csv test/03036_archive2.tar::example5.csv
+3 Str3 example2.csv test/03036_archive3.tar.gz::example2.csv
+4 Str4 example2.csv test/03036_archive3.tar.gz::example2.csv
+5 Str5 example3.csv test/03036_archive2.tar::example3.csv
+6 Str6 example3.csv test/03036_archive2.tar::example3.csv
+13 Str13 example7.csv test/03036_compressed_file_archive.zip::example7.csv
+14 Str14 example7.csv test/03036_compressed_file_archive.zip::example7.csv
tests/queries/0_stateless/03036_reading_s3_archives.sql (new file, 22 lines)
@@ -0,0 +1,22 @@
+-- Tags: no-fasttest
+-- Tag no-fasttest: Depends on AWS
+
+SELECT id, data, _file, _path FROM s3(s3_conn, filename='03036_archive1.zip :: example1.csv') ORDER BY (id, _file, _path);
+SELECT id, data, _file, _path FROM s3(s3_conn, filename='03036_archive2.zip :: example*.csv') ORDER BY (id, _file, _path);
+SELECT id, data, _file, _path FROM s3(s3_conn, filename='03036_archive*.zip :: example2.csv') ORDER BY (id, _file, _path);
+SELECT id, data, _file, _path FROM s3(s3_conn, filename='03036_archive*.zip :: example*') ORDER BY (id, _file, _path);
+SELECT id, data, _file, _path FROM s3(s3_conn, filename='03036_archive1.tar :: example1.csv') ORDER BY (id, _file, _path);
+SELECT id, data, _file, _path FROM s3(s3_conn, filename='03036_archive*.tar :: example4.csv') ORDER BY (id, _file, _path);
+SELECT id, data, _file, _path FROM s3(s3_conn, filename='03036_archive2.tar :: example*.csv') ORDER BY (id, _file, _path);
+SELECT id, data, _file, _path FROM s3(s3_conn, filename='03036_archive*.tar.gz :: example*.csv') ORDER BY (id, _file, _path);
+SELECT id, data, _file, _path FROM s3(s3_conn, filename='03036_archive*.tar* :: example{2..3}.csv') ORDER BY (id, _file, _path);
+select id, data, _file, _path from s3(s3_conn, filename='03036_archive2.zip :: nonexistent.csv'); -- { serverError CANNOT_EXTRACT_TABLE_STRUCTURE }
+select id, data, _file, _path from s3(s3_conn, filename='03036_archive2.zip :: nonexistent{2..3}.csv'); -- { serverError CANNOT_EXTRACT_TABLE_STRUCTURE }
+CREATE TABLE table_zip22 Engine S3(s3_conn, filename='03036_archive2.zip :: example2.csv');
+select id, data, _file, _path from table_zip22 ORDER BY (id, _file, _path);
+CREATE table table_tar2star Engine S3(s3_conn, filename='03036_archive2.tar :: example*.csv');
+SELECT id, data, _file, _path FROM table_tar2star ORDER BY (id, _file, _path);
+CREATE table table_tarstarglobs Engine S3(s3_conn, filename='03036_archive*.tar* :: example{2..3}.csv');
+SELECT id, data, _file, _path FROM table_tarstarglobs ORDER BY (id, _file, _path);
+CREATE table table_noexist Engine s3(s3_conn, filename='03036_archive2.zip :: nonexistent.csv'); -- { serverError INCORRECT_QUERY }
+SELECT id, data, _file, _path FROM s3(s3_conn, filename='03036_compressed_file_archive.zip :: example7.csv', format='CSV', structure='auto', compression_method='gz') ORDER BY (id, _file, _path)
@@ -0,0 +1,11 @@
+1 Str1
+2 Str2
+3 Str3
+4 Str4
+DEFAULT 03036_archive1.zip::example1.csv id Nullable(Int64), data Nullable(String)
+21 Str21
+22 Str22
+23 Str23
+24 Str24
+UNION 03036_json_archive.zip::example11.jsonl id Nullable(Int64), data Nullable(String)
+UNION 03036_json_archive.zip::example12.jsonl id Nullable(Int64), data Nullable(String)
@@ -0,0 +1,9 @@
+-- Tags: no-fasttest
+-- Tag no-fasttest: Depends on AWS
+
+SELECT * FROM s3(s3_conn, filename='03036_archive1.zip :: example{1,2}.csv') ORDER BY tuple(*);
+SELECT schema_inference_mode, splitByChar('/', source)[-1] as file, schema FROM system.schema_inference_cache WHERE file = '03036_archive1.zip::example1.csv' ORDER BY file;
+
+SET schema_inference_mode = 'union';
+SELECT * FROM s3(s3_conn, filename='03036_json_archive.zip :: example{11,12}.jsonl') ORDER BY tuple(*);
+SELECT schema_inference_mode, splitByChar('/', source)[-1] as file, schema FROM system.schema_inference_cache WHERE startsWith(file, '03036_json_archive.zip') ORDER BY file;
BIN tests/queries/0_stateless/data_minio/03036_archive1.tar (new binary file, not shown)
BIN tests/queries/0_stateless/data_minio/03036_archive1.zip (new binary file, not shown)
BIN tests/queries/0_stateless/data_minio/03036_archive2.tar (new binary file, not shown)
BIN tests/queries/0_stateless/data_minio/03036_archive2.zip (new binary file, not shown)
BIN tests/queries/0_stateless/data_minio/03036_archive3.tar.gz (new binary file, not shown)
BIN tests/queries/0_stateless/data_minio/03036_json_archive.zip (new binary file, not shown)