Registering StorageS3 and TableFunctionS3 under two different names (S3 and COSN).

This commit is contained in:
Peng Jian 2020-07-13 22:13:30 +08:00
parent 958bb1f3aa
commit 7579d65ebc
9 changed files with 42 additions and 126 deletions

View File

@ -212,7 +212,11 @@ namespace S3
/// https://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html#path-style-access
static const RE2 path_style_pattern("^/([^/]*)/(.*)");
static constexpr auto S3 = "S3";
static constexpr auto COSN = "COSN", COS = "COS";
uri = uri_;
storage_name = S3;
if (uri.getHost().empty())
throw Exception("Host is empty in S3 URI: " + uri.toString(), ErrorCodes::BAD_ARGUMENTS);
@ -234,6 +238,15 @@ namespace S3
key = uri.getPath().substr(1);
if (key.empty() || key == "/")
throw Exception("Key name is empty in virtual hosted style S3 URI: " + key + " (" + uri.toString() + ")", ErrorCodes::BAD_ARGUMENTS);
boost::to_upper(name);
if (name != S3 || name != COS) {
throw Exception("Object storage system name is unrecognized in virtual hosted style S3 URI: " + name + " (" + uri.toString() + ")", ErrorCodes::BAD_ARGUMENTS);
}
if (name == S3) {
storage_name = name;
} else {
storage_name = COSN;
}
}
else if (re2::RE2::PartialMatch(uri.getPath(), path_style_pattern, &bucket, &key))
{

View File

@ -69,6 +69,7 @@ struct URI
String endpoint;
String bucket;
String key;
String storage_name;
bool is_virtual_hosted_style;

View File

@ -1,59 +0,0 @@
#pragma once
#include <Common/config.h>
#if USE_AWS_S3
#include <Storages/IStorage.h>
#include <Poco/URI.h>
#include <common/logger_useful.h>
#include <ext/shared_ptr_helper.h>
namespace Aws::S3
{
class S3Client;
}
namespace DB
{
/**
* This class represents table engine for external COS urls.
* It sends HTTP GET to server when select is called and
* HTTP PUT when insert is called.
*/
class StorageCOS final : public StorageS3
{
public:
StorageCOS(const S3::URI & uri,
const String & access_key_id,
const String & secret_access_key,
const StorageID & table_id_,
const String & format_name_,
UInt64 min_upload_part_size_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
Context & context_,
const String & compression_method_)
: StorageS3(uri,
access_key_id,
secret_access_key,
table_id_,
format_name_,
min_upload_part_size_,
columns_,
constraints_,
context_,
compression_method_)
{
}
String getName() const override
{
return "COSN";
}
};
}
#endif

View File

@ -5,7 +5,6 @@
#include <IO/S3Common.h>
#include <Storages/StorageFactory.h>
#include <Storages/StorageS3.h>
#include <Storages/StorageCOS.h>
#include <Storages/StorageS3Settings.h>
#include <Interpreters/Context.h>
@ -205,6 +204,7 @@ StorageS3::StorageS3(
, format_name(format_name_)
, min_upload_part_size(min_upload_part_size_)
, compression_method(compression_method_)
, name(uri_.storage_name)
{
context_global.getRemoteHostFilter().checkURL(uri_.uri);
StorageInMemoryMetadata storage_metadata;
@ -330,9 +330,9 @@ BlockOutputStreamPtr StorageS3::write(const ASTPtr & /*query*/, const StorageMet
client, uri.bucket, uri.key);
}
void registerStorageS3Impl(const String & name, bool is_s3, StorageFactory & factory)
void registerStorageS3Impl(const String & name, StorageFactory & factory)
{
factory.registerStorage(name, [is_s3](const StorageFactory::Arguments & args)
factory.registerStorage(name, [](const StorageFactory::Arguments & args)
{
ASTs & engine_args = args.engine_args;
@ -365,10 +365,7 @@ void registerStorageS3Impl(const String & name, bool is_s3, StorageFactory & fac
else
compression_method = "auto";
if (is_s3)
return StorageS3::create(s3_uri, access_key_id, secret_access_key, args.table_id, format_name, min_upload_part_size, args.columns, args.constraints, args.context);
else
return StorageCOS::create(s3_uri, access_key_id, secret_access_key, args.table_id, format_name, min_upload_part_size, args.columns, args.constraints, args.context);
return StorageS3::create(s3_uri, access_key_id, secret_access_key, args.table_id, format_name, min_upload_part_size, args.columns, args.constraints, args.context);
},
{
.source_access_type = AccessType::S3,
@ -377,12 +374,12 @@ void registerStorageS3Impl(const String & name, bool is_s3, StorageFactory & fac
void registerStorageS3(StorageFactory & factory)
{
return registerStorageS3Impl("S3", true, factory);
return registerStorageS3Impl("S3", factory);
}
void registerStorageCOS(StorageFactory & factory)
{
return registerStorageS3Impl("COSN", false, factory);
return registerStorageS3Impl("COSN", factory);
}
NamesAndTypesList StorageS3::getVirtuals() const

View File

@ -38,7 +38,7 @@ public:
String getName() const override
{
return "S3";
return name;
}
Pipes read(
@ -62,6 +62,7 @@ private:
UInt64 min_upload_part_size;
String compression_method;
std::shared_ptr<Aws::S3::S3Client> client;
String name;
};
}

View File

@ -1,20 +0,0 @@
#include <Common/config.h>
#if USE_AWS_S3
#include <IO/S3Common.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <TableFunctions/TableFunctionCOS.h>
#include "registerTableFunctions.h"
namespace DB
{
void registerTableFunctionCOS(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionCOS>();
}
}
#endif

View File

@ -1,31 +0,0 @@
#pragma once
#include <Common/config.h>
#if USE_AWS_S3
#include <TableFunctions/ITableFunction.h>
#include <TableFunctions/TableFunctionS3.h>
namespace DB
{
/* cosn(source, [access_key_id, secret_access_key,] format, structure) - creates a temporary storage for a file in COS
*/
class TableFunctionCOS : public TableFunctionS3
{
public:
static constexpr auto name = "cosn";
std::string getName() const override
{
return name;
}
private:
const char * getStorageTypeName() const override { return "COSN"; }
};
}
#endif

View File

@ -65,8 +65,18 @@ StoragePtr TableFunctionS3::executeImpl(const ASTPtr & ast_function, const Conte
ColumnsDescription columns = parseColumnsListFromString(structure, context);
static constexpr auto S3 = "S3", s3 = "s3", COSN = "COSN", cons = "cosn";
Poco::URI uri (filename);
S3::URI s3_uri (uri);
if (s3_uri.storage_name == S3) {
name = s3;
storage_type_name = S3;
} else {
name = cosn;
storage_type_name = COSN;
}
/// Create table
StoragePtr storage = getStorage(filename, access_key_id, secret_access_key, format, columns, const_cast<Context &>(context), table_name, compression_method);
StoragePtr storage = getStorage(s3_uri, access_key_id, secret_access_key, format, columns, const_cast<Context &>(context), table_name, compression_method);
storage->startup();
@ -74,7 +84,7 @@ StoragePtr TableFunctionS3::executeImpl(const ASTPtr & ast_function, const Conte
}
StoragePtr TableFunctionS3::getStorage(
const String & source,
const URI & uri,
const String & access_key_id,
const String & secret_access_key,
const String & format,
@ -83,8 +93,6 @@ StoragePtr TableFunctionS3::getStorage(
const std::string & table_name,
const String & compression_method)
{
Poco::URI uri (source);
S3::URI s3_uri (uri);
UInt64 min_upload_part_size = global_context.getSettingsRef().s3_min_upload_part_size;
return StorageS3::create(
@ -105,6 +113,11 @@ void registerTableFunctionS3(TableFunctionFactory & factory)
factory.registerFunction<TableFunctionS3>();
}
void registerTableFunctionCOS(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionS3>();
}
}
#endif

View File

@ -17,7 +17,6 @@ class Context;
class TableFunctionS3 : public ITableFunction
{
public:
static constexpr auto name = "s3";
std::string getName() const override
{
return name;
@ -39,7 +38,9 @@ private:
const std::string & table_name,
const String & compression_method);
const char * getStorageTypeName() const override { return "S3"; }
const char * getStorageTypeName() const override { return storage_type_name; }
String name;
String storage_type_name;
};
}