Fix build

This commit is contained in:
Daniil Rubin 2022-11-03 17:28:41 +00:00
parent 97dd75194a
commit a9f8948c8d
2 changed files with 49 additions and 42 deletions

View File

@ -12,6 +12,9 @@
# include <Storages/StorageFactory.h>
# include <Storages/checkAndGetLiteralArgument.h>
#include <Storages/ExternalDataSourceConfiguration.h>
#include <Formats/FormatFactory.h>
# include <aws/core/auth/AWSCredentials.h>
# include <aws/s3/S3Client.h>
@ -168,19 +171,18 @@ void JsonMetadataGetter::handleJSON(const JSON & json)
}
StorageDelta::StorageDelta(
const S3::URI & uri_,
const String & access_key_,
const String & secret_access_key_,
const StorageS3Configuration & configuration_,
const StorageID & table_id_,
const String & format_name_,
ColumnsDescription columns_,
const ConstraintsDescription & constraints_,
const String & comment,
ContextPtr context_)
ContextPtr context_,
std::optional<FormatSettings> format_settings_)
: IStorage(table_id_)
, base_configuration({uri_, access_key_, secret_access_key_, {}, {}, {}})
, base_configuration{configuration_.url, configuration_.auth_settings, configuration_.rw_settings, configuration_.headers}
, log(&Poco::Logger::get("StorageDeltaLake (" + table_id_.table_name + ")"))
, table_path(uri_.key)
, table_path(base_configuration.uri.key)
, format_name(configuration_.format)
{
StorageInMemoryMetadata storage_metadata;
StorageS3::updateS3Configuration(context_, base_configuration);
@ -193,32 +195,38 @@ StorageDelta::StorageDelta(
LOG_DEBUG(log, "New uri: {}", new_uri);
LOG_DEBUG(log, "Table path: {}", table_path);
auto s3_uri = S3::URI(Poco::URI(new_uri));
// set new url in configuration
StorageS3Configuration new_configuration;
new_configuration.url = new_uri;
new_configuration.auth_settings.access_key_id = configuration_.auth_settings.access_key_id;
new_configuration.auth_settings.secret_access_key= configuration_.auth_settings.secret_access_key;
new_configuration.format = configuration_.format;
if (columns_.empty())
{
columns_ = StorageS3::getTableStructureFromData(format_name_, s3_uri, access_key_, secret_access_key_, "", false, {}, context_);
columns_ = StorageS3::getTableStructureFromData(new_configuration, /*distributed processing*/ false, format_settings_, context_, nullptr);
storage_metadata.setColumns(columns_);
}
else
storage_metadata.setColumns(columns_);
storage_metadata.setConstraints(constraints_);
storage_metadata.setComment(comment);
setInMemoryMetadata(storage_metadata);
s3engine = std::make_shared<StorageS3>(
s3_uri,
access_key_,
secret_access_key_,
table_id_,
format_name_,
base_configuration.rw_settings,
columns_,
constraints_,
comment,
context_,
std::nullopt);
new_configuration,
table_id_,
columns_,
constraints_,
comment,
context_,
format_settings_,
/* distributed_processing_ */false,
nullptr);
}
Pipe StorageDelta::read(
@ -253,29 +261,29 @@ void registerStorageDelta(StorageFactory & factory)
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Storage DeltaLake requires 3 to 4 arguments: table_url, access_key, secret_access_key, [format]");
String table_url = checkAndGetLiteralArgument<String>(engine_args[0], "url");
String access_key_id = checkAndGetLiteralArgument<String>(engine_args[1], "access_key_id");
String secret_access_key = checkAndGetLiteralArgument<String>(engine_args[2], "secret_access_key");
String format = "Parquet";
if (engine_args.size() == 4)
{
format = checkAndGetLiteralArgument<String>(engine_args[3], "format");
}
auto s3_uri = S3::URI(Poco::URI(table_url));
StorageS3Configuration configuration;
configuration.url = checkAndGetLiteralArgument<String>(engine_args[0], "url");
configuration.auth_settings.access_key_id = checkAndGetLiteralArgument<String>(engine_args[1], "access_key_id");
configuration.auth_settings.secret_access_key = checkAndGetLiteralArgument<String>(engine_args[2], "secret_access_key");
if (engine_args.size() == 4)
configuration.format = checkAndGetLiteralArgument<String>(engine_args[3], "format");
if (configuration.format == "auto")
configuration.format = "Parquet";
//auto format_settings = getFormatSettings(args.getContext());
return std::make_shared<StorageDelta>(
s3_uri,
access_key_id,
secret_access_key,
configuration,
args.table_id,
format,
args.columns,
args.constraints,
args.comment,
args.getContext());
args.getContext(),
std::nullopt);
},
{
.supports_settings = true,

View File

@ -64,15 +64,13 @@ class StorageDelta : public IStorage
{
public:
StorageDelta(
const S3::URI & uri_,
const String & access_key_,
const String & secret_access_key_,
const StorageS3Configuration & configuration_,
const StorageID & table_id_,
const String & format_name_,
ColumnsDescription columns_,
const ConstraintsDescription & constraints_,
const String & comment,
ContextPtr context_);
ContextPtr context_,
std::optional<FormatSettings> format_settings_);
String getName() const override { return "DeltaLake"; }
@ -93,6 +91,7 @@ private:
std::shared_ptr<StorageS3> s3engine;
Poco::Logger * log;
String table_path;
String format_name;
};
}