Fix a few more tests

This commit is contained in:
kssenii 2024-04-19 13:43:43 +01:00
parent c8915a16a5
commit e2e6071063
13 changed files with 204 additions and 69 deletions

View File

@ -31,8 +31,18 @@ void HDFSObjectStorage::startup()
{
}
void HDFSObjectStorage::initializeHDFS() const
{
if (hdfs_fs)
return;
hdfs_builder = createHDFSBuilder(url, config);
hdfs_fs = createHDFSFS(hdfs_builder.get());
}
ObjectStorageKey HDFSObjectStorage::generateObjectKeyForPath(const std::string & /* path */) const
{
initializeHDFS();
/// what ever data_source_description.description value is, consider that key as relative key
chassert(data_directory.starts_with("/"));
return ObjectStorageKey::createAsRelative(
@ -41,6 +51,7 @@ ObjectStorageKey HDFSObjectStorage::generateObjectKeyForPath(const std::string &
bool HDFSObjectStorage::exists(const StoredObject & object) const
{
initializeHDFS();
std::string path = object.remote_path;
if (path.starts_with(url_without_path))
path = path.substr(url_without_path.size());
@ -57,6 +68,7 @@ std::unique_ptr<ReadBufferFromFileBase> HDFSObjectStorage::readObject( /// NOLIN
std::optional<size_t>,
std::optional<size_t>) const
{
initializeHDFS();
std::string path = object.remote_path;
if (path.starts_with(url))
path = path.substr(url.size());
@ -73,6 +85,7 @@ std::unique_ptr<ReadBufferFromFileBase> HDFSObjectStorage::readObjects( /// NOLI
std::optional<size_t>,
std::optional<size_t>) const
{
initializeHDFS();
auto disk_read_settings = patchSettings(read_settings);
auto read_buffer_creator =
[this, disk_read_settings]
@ -102,6 +115,7 @@ std::unique_ptr<WriteBufferFromFileBase> HDFSObjectStorage::writeObject( /// NOL
size_t buf_size,
const WriteSettings & write_settings)
{
initializeHDFS();
if (attributes.has_value())
throw Exception(
ErrorCodes::UNSUPPORTED_METHOD,
@ -123,6 +137,7 @@ std::unique_ptr<WriteBufferFromFileBase> HDFSObjectStorage::writeObject( /// NOL
/// Remove file. Throws exception if file doesn't exists or it's a directory.
void HDFSObjectStorage::removeObject(const StoredObject & object)
{
initializeHDFS();
auto path = object.remote_path;
if (path.starts_with(url_without_path))
path = path.substr(url_without_path.size());
@ -136,24 +151,28 @@ void HDFSObjectStorage::removeObject(const StoredObject & object)
void HDFSObjectStorage::removeObjects(const StoredObjects & objects)
{
initializeHDFS();
for (const auto & object : objects)
removeObject(object);
}
void HDFSObjectStorage::removeObjectIfExists(const StoredObject & object)
{
initializeHDFS();
if (exists(object))
removeObject(object);
}
void HDFSObjectStorage::removeObjectsIfExist(const StoredObjects & objects)
{
initializeHDFS();
for (const auto & object : objects)
removeObjectIfExists(object);
}
ObjectMetadata HDFSObjectStorage::getObjectMetadata(const std::string & path) const
{
initializeHDFS();
auto * file_info = hdfsGetPathInfo(hdfs_fs.get(), path.data());
if (!file_info)
throw Exception(ErrorCodes::HDFS_ERROR,
@ -169,6 +188,7 @@ ObjectMetadata HDFSObjectStorage::getObjectMetadata(const std::string & path) co
void HDFSObjectStorage::listObjects(const std::string & path, RelativePathsWithMetadata & children, size_t max_keys) const
{
initializeHDFS();
auto * log = &Poco::Logger::get("HDFSObjectStorage");
LOG_TRACE(log, "Trying to list files for {}", path);
@ -222,6 +242,7 @@ void HDFSObjectStorage::copyObject( /// NOLINT
const WriteSettings & write_settings,
std::optional<ObjectAttributes> object_to_attributes)
{
initializeHDFS();
if (object_to_attributes.has_value())
throw Exception(
ErrorCodes::UNSUPPORTED_METHOD,

View File

@ -37,8 +37,6 @@ public:
SettingsPtr settings_,
const Poco::Util::AbstractConfiguration & config_)
: config(config_)
, hdfs_builder(createHDFSBuilder(hdfs_root_path_, config))
, hdfs_fs(createHDFSFS(hdfs_builder.get()))
, settings(std::move(settings_))
{
const size_t begin_of_path = hdfs_root_path_.find('/', hdfs_root_path_.find("//") + 2);
@ -117,10 +115,12 @@ public:
bool isRemote() const override { return true; }
private:
void initializeHDFS() const;
const Poco::Util::AbstractConfiguration & config;
HDFSBuilderWrapper hdfs_builder;
HDFSFSPtr hdfs_fs;
mutable HDFSBuilderWrapper hdfs_builder;
mutable HDFSFSPtr hdfs_fs;
SettingsPtr settings;
std::string url;
std::string url_without_path;

View File

@ -157,7 +157,8 @@ std::unique_ptr<S3::Client> getClient(
auth_settings.server_side_encryption_customer_key_base64,
std::move(sse_kms_config),
auth_settings.headers,
credentials_configuration);
credentials_configuration,
auth_settings.session_token);
}
}

View File

@ -381,7 +381,7 @@ void StorageAzureBlobConfiguration::fromAST(ASTs & engine_args, ContextPtr conte
}
void StorageAzureBlobConfiguration::addStructureAndFormatToArgs(
ASTs & args, const String & structure_, const String & /* format */, ContextPtr context)
ASTs & args, const String & structure_, const String & format_, ContextPtr context)
{
if (tryGetNamedCollectionWithOverrides(args, context))
{
@ -397,66 +397,129 @@ void StorageAzureBlobConfiguration::addStructureAndFormatToArgs(
{
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Storage Azure requires 3 to 7 arguments: "
"StorageObjectStorage(connection_string|storage_account_url, container_name, blobpath, [account_name, account_key, format, compression, structure])");
"StorageObjectStorage(connection_string|storage_account_url, container_name, "
"blobpath, [account_name, account_key, format, compression, structure])");
}
for (auto & arg : args)
arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, context);
auto structure_literal = std::make_shared<ASTLiteral>(structure_);
auto format_literal = std::make_shared<ASTLiteral>(format_);
auto is_format_arg
= [](const std::string & s) -> bool { return s == "auto" || FormatFactory::instance().getAllFormats().contains(s); };
/// (connection_string, container_name, blobpath)
if (args.size() == 3)
{
/// Add format=auto & compression=auto before structure argument.
args.push_back(std::make_shared<ASTLiteral>("auto"));
args.push_back(format_literal);
/// Add compression = "auto" before structure argument.
args.push_back(std::make_shared<ASTLiteral>("auto"));
args.push_back(structure_literal);
}
/// (connection_string, container_name, blobpath, structure) or
/// (connection_string, container_name, blobpath, format)
/// We can distinguish them by looking at the 4-th argument: check if it's format name or not.
else if (args.size() == 4)
{
auto fourth_arg = checkAndGetLiteralArgument<String>(args[3], "format/account_name/structure");
/// (..., format) -> (..., format, compression, structure)
if (is_format_arg(fourth_arg))
{
if (fourth_arg == "auto")
args[3] = format_literal;
/// Add compression=auto before structure argument.
args.push_back(std::make_shared<ASTLiteral>("auto"));
args.push_back(structure_literal);
}
/// (..., structure) -> (..., format, compression, structure)
else
{
args.back() = structure_literal;
auto structure_arg = args.back();
args[3] = format_literal;
/// Add compression=auto before structure argument.
args.push_back(std::make_shared<ASTLiteral>("auto"));
if (fourth_arg == "auto")
args.push_back(structure_literal);
else
args.push_back(structure_arg);
}
}
/// (connection_string, container_name, blobpath, format, compression) or
/// (storage_account_url, container_name, blobpath, account_name, account_key)
/// We can distinguish them by looking at the 4-th argument: check if it's format name or not.
else if (args.size() == 5)
{
auto fourth_arg = checkAndGetLiteralArgument<String>(args[3], "format/account_name");
if (!is_format_arg(fourth_arg))
/// (..., format, compression) -> (..., format, compression, structure)
if (is_format_arg(fourth_arg))
{
/// Add format=auto & compression=auto before structure argument.
args.push_back(std::make_shared<ASTLiteral>("auto"));
args.push_back(std::make_shared<ASTLiteral>("auto"));
}
if (fourth_arg == "auto")
args[3] = format_literal;
args.push_back(structure_literal);
}
else if (args.size() == 6)
{
auto fourth_arg = checkAndGetLiteralArgument<String>(args[3], "format/account_name");
if (!is_format_arg(fourth_arg))
/// (..., account_name, account_key) -> (..., account_name, account_key, format, compression, structure)
else
{
args.push_back(format_literal);
/// Add compression=auto before structure argument.
args.push_back(std::make_shared<ASTLiteral>("auto"));
args.push_back(structure_literal);
}
else
{
args.back() = structure_literal;
}
}
else if (args.size() == 7)
/// (connection_string, container_name, blobpath, format, compression, structure) or
/// (storage_account_url, container_name, blobpath, account_name, account_key, structure) or
/// (storage_account_url, container_name, blobpath, account_name, account_key, format)
else if (args.size() == 6)
{
auto fourth_arg = checkAndGetLiteralArgument<String>(args[3], "format/account_name");
auto sixth_arg = checkAndGetLiteralArgument<String>(args[5], "format/structure");
/// (..., format, compression, structure)
if (is_format_arg(fourth_arg))
{
if (fourth_arg == "auto")
args[3] = format_literal;
if (checkAndGetLiteralArgument<String>(args[5], "structure") == "auto")
args[5] = structure_literal;
}
/// (..., account_name, account_key, format) -> (..., account_name, account_key, format, compression, structure)
else if (is_format_arg(sixth_arg))
{
if (sixth_arg == "auto")
args[5] = format_literal;
/// Add compression=auto before structure argument.
args.push_back(std::make_shared<ASTLiteral>("auto"));
args.push_back(structure_literal);
}
/// (..., account_name, account_key, structure) -> (..., account_name, account_key, format, compression, structure)
else
{
auto structure_arg = args.back();
args[5] = format_literal;
/// Add compression=auto before structure argument.
args.push_back(std::make_shared<ASTLiteral>("auto"));
if (sixth_arg == "auto")
args.push_back(structure_literal);
else
args.push_back(structure_arg);
}
}
/// (storage_account_url, container_name, blobpath, account_name, account_key, format, compression)
else if (args.size() == 7)
{
/// (..., format, compression) -> (..., format, compression, structure)
if (checkAndGetLiteralArgument<String>(args[5], "format") == "auto")
args[5] = format_literal;
args.push_back(structure_literal);
}
/// (storage_account_url, container_name, blobpath, account_name, account_key, format, compression, structure)
else if (args.size() == 8)
{
args.back() = structure_literal;
if (checkAndGetLiteralArgument<String>(args[5], "format") == "auto")
args[5] = format_literal;
if (checkAndGetLiteralArgument<String>(args[7], "structure") == "auto")
args[7] = structure_literal;
}
}
}

View File

@ -73,9 +73,11 @@ void StorageHDFSConfiguration::fromAST(ASTs & args, ContextPtr context, bool wit
std::string url_str;
url_str = checkAndGetLiteralArgument<String>(args[0], "url");
for (auto & arg : args)
arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, context);
if (args.size() > 1)
{
args[1] = evaluateConstantExpressionOrIdentifierAsLiteral(args[1], context);
format = checkAndGetLiteralArgument<String>(args[1], "format_name");
}
@ -83,18 +85,15 @@ void StorageHDFSConfiguration::fromAST(ASTs & args, ContextPtr context, bool wit
{
if (args.size() > 2)
{
args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(args[2], context);
structure = checkAndGetLiteralArgument<String>(args[2], "structure");
}
if (args.size() > 3)
{
args[3] = evaluateConstantExpressionOrIdentifierAsLiteral(args[3], context);
compression_method = checkAndGetLiteralArgument<String>(args[3], "compression_method");
}
}
else if (args.size() > 2)
{
args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(args[2], context);
compression_method = checkAndGetLiteralArgument<String>(args[2], "compression_method");
}
@ -165,6 +164,9 @@ void StorageHDFSConfiguration::addStructureAndFormatToArgs(
auto format_literal = std::make_shared<ASTLiteral>(format_);
auto structure_literal = std::make_shared<ASTLiteral>(structure_);
for (auto & arg : args)
arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, context);
/// hdfs(url)
if (count == 1)
{

View File

@ -16,6 +16,7 @@
#include <Storages/ObjectStorage/StorageObjectStorageSource.h>
#include <Storages/ObjectStorage/ReadFromStorageObjectStorage.h>
#include <Storages/ObjectStorage/ReadBufferIterator.h>
#include <Storages/ObjectStorage/Utils.h>
#include <Storages/Cache/SchemaCache.h>
@ -193,6 +194,7 @@ SinkToStoragePtr StorageObjectStorage<StorageSettings>::write(
{
updateConfiguration(local_context);
const auto sample_block = metadata_snapshot->getSampleBlock();
const auto & query_settings = StorageSettings::create(local_context->getSettingsRef());
if (configuration->withWildcard())
{
@ -209,7 +211,8 @@ SinkToStoragePtr StorageObjectStorage<StorageSettings>::write(
{
LOG_TEST(log, "Using PartitionedSink for {}", configuration->getPath());
return std::make_shared<PartitionedStorageObjectStorageSink>(
object_storage, configuration, format_settings, sample_block, local_context, partition_by_ast);
object_storage, configuration, query_settings,
format_settings, sample_block, local_context, partition_by_ast);
}
}
@ -220,46 +223,19 @@ SinkToStoragePtr StorageObjectStorage<StorageSettings>::write(
getName(), configuration->getPath());
}
const auto storage_settings = StorageSettings::create(local_context->getSettingsRef());
auto configuration_copy = configuration->clone();
if (!storage_settings.truncate_on_insert
&& object_storage->exists(StoredObject(configuration->getPath())))
auto & paths = configuration->getPaths();
if (auto new_key = checkAndGetNewFileOnInsertIfNeeded(
*object_storage, *configuration, query_settings, paths.front(), paths.size()))
{
if (storage_settings.create_new_file_on_insert)
{
auto & paths = configuration_copy->getPaths();
size_t index = paths.size();
const auto & first_key = paths[0];
auto pos = first_key.find_first_of('.');
String new_key;
do
{
new_key = first_key.substr(0, pos)
+ "."
+ std::to_string(index)
+ (pos == std::string::npos ? "" : first_key.substr(pos));
++index;
}
while (object_storage->exists(StoredObject(new_key)));
paths.push_back(new_key);
configuration->getPaths().push_back(new_key);
}
else
{
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Object in bucket {} with key {} already exists. "
"If you want to overwrite it, enable setting [engine_name]_truncate_on_insert, if you "
"want to create a new file on each insert, enable setting [engine_name]_create_new_file_on_insert",
configuration_copy->getNamespace(), configuration_copy->getPaths().back());
}
paths.push_back(*new_key);
}
return std::make_shared<StorageObjectStorageSink>(
object_storage, configuration_copy, format_settings, sample_block, local_context);
object_storage,
configuration->clone(),
format_settings,
sample_block,
local_context);
}
template <typename StorageSettings>

View File

@ -84,7 +84,7 @@ struct HDFSStorageSettings
.create_new_file_on_insert = settings.hdfs_create_new_file_on_insert,
.schema_inference_use_cache = settings.schema_inference_use_cache_for_hdfs,
.schema_inference_mode = settings.schema_inference_mode,
.skip_empty_files = settings.s3_skip_empty_files, /// TODO: add setting for hdfs
.skip_empty_files = settings.hdfs_skip_empty_files, /// TODO: add setting for hdfs
.list_object_keys_size = settings.s3_list_object_keys_size, /// TODO: add a setting for hdfs
.throw_on_zero_files_match = settings.s3_throw_on_zero_files_match,
.ignore_non_existent_file = settings.hdfs_ignore_file_doesnt_exist,

View File

@ -2,6 +2,7 @@
#include <Formats/FormatFactory.h>
#include <Disks/ObjectStorages/IObjectStorage.h>
#include <Common/isValidUTF8.h>
#include <Storages/ObjectStorage/Utils.h>
namespace DB
{
@ -102,6 +103,7 @@ void StorageObjectStorageSink::release()
PartitionedStorageObjectStorageSink::PartitionedStorageObjectStorageSink(
ObjectStoragePtr object_storage_,
StorageObjectStorageConfigurationPtr configuration_,
const StorageObjectStorageSettings & query_settings_,
std::optional<FormatSettings> format_settings_,
const Block & sample_block_,
ContextPtr context_,
@ -109,6 +111,7 @@ PartitionedStorageObjectStorageSink::PartitionedStorageObjectStorageSink(
: PartitionedSink(partition_by, context_, sample_block_)
, object_storage(object_storage_)
, configuration(configuration_)
, query_settings(query_settings_)
, format_settings(format_settings_)
, sample_block(sample_block_)
, context(context_)
@ -123,6 +126,12 @@ SinkPtr PartitionedStorageObjectStorageSink::createSinkForPartition(const String
auto partition_key = replaceWildcards(configuration->getPath(), partition_id);
validateKey(partition_key);
if (auto new_key = checkAndGetNewFileOnInsertIfNeeded(
*object_storage, *configuration, query_settings, partition_key, /* sequence_number */1))
{
partition_key = *new_key;
}
return std::make_shared<StorageObjectStorageSink>(
object_storage,
configuration,

View File

@ -1,6 +1,7 @@
#pragma once
#include <Storages/PartitionedSink.h>
#include <Storages/ObjectStorage/StorageObjectStorageConfiguration.h>
#include <Storages/ObjectStorage/StorageObjectStorageQuerySettings.h>
#include <Processors/Formats/IOutputFormat.h>
#include <Disks/ObjectStorages/IObjectStorage_fwd.h>
@ -46,6 +47,7 @@ public:
PartitionedStorageObjectStorageSink(
ObjectStoragePtr object_storage_,
StorageObjectStorageConfigurationPtr configuration_,
const StorageObjectStorageSettings & query_settings_,
std::optional<FormatSettings> format_settings_,
const Block & sample_block_,
ContextPtr context_,
@ -59,6 +61,7 @@ private:
ObjectStoragePtr object_storage;
StorageObjectStorageConfigurationPtr configuration;
const StorageObjectStorageSettings query_settings;
const std::optional<FormatSettings> format_settings;
const Block sample_block;
const ContextPtr context;

View File

@ -0,0 +1,43 @@
#include <Storages/ObjectStorage/Utils.h>
#include <Disks/ObjectStorages/IObjectStorage.h>
#include <Storages/ObjectStorage/StorageObjectStorageConfiguration.h>
#include <Storages/ObjectStorage/StorageObjectStorageQuerySettings.h>
namespace DB
{
std::optional<String> checkAndGetNewFileOnInsertIfNeeded(
const IObjectStorage & object_storage,
const StorageObjectStorageConfiguration & configuration,
const StorageObjectStorageSettings & query_settings,
const String & key,
size_t sequence_number)
{
if (query_settings.truncate_on_insert
|| !object_storage.exists(StoredObject(key)))
return std::nullopt;
if (query_settings.create_new_file_on_insert)
{
auto pos = key.find_first_of('.');
String new_key;
do
{
new_key = key.substr(0, pos) + "." + std::to_string(sequence_number) + (pos == std::string::npos ? "" : key.substr(pos));
++sequence_number;
}
while (object_storage.exists(StoredObject(new_key)));
return new_key;
}
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Object in bucket {} with key {} already exists. "
"If you want to overwrite it, enable setting s3_truncate_on_insert, if you "
"want to create a new file on each insert, enable setting s3_create_new_file_on_insert",
configuration.getNamespace(), key);
}
}

View File

@ -0,0 +1,17 @@
#include <Core/Types.h>
namespace DB
{
class IObjectStorage;
class StorageObjectStorageConfiguration;
struct StorageObjectStorageSettings;
std::optional<std::string> checkAndGetNewFileOnInsertIfNeeded(
const IObjectStorage & object_storage,
const StorageObjectStorageConfiguration & configuration,
const StorageObjectStorageSettings & query_settings,
const std::string & key,
size_t sequence_number);
}

View File

@ -980,7 +980,7 @@ def test_read_subcolumns(started_cluster):
assert (
res
== "2\thdfs://hdfs1:9000/test_subcolumns.tsv\t(1,2)\ttest_subcolumns.tsv\t3\n"
== "2\t/test_subcolumns.tsv\t(1,2)\ttest_subcolumns.tsv\t3\n"
)
res = node.query(
@ -989,7 +989,7 @@ def test_read_subcolumns(started_cluster):
assert (
res
== "2\thdfs://hdfs1:9000/test_subcolumns.jsonl\t(1,2)\ttest_subcolumns.jsonl\t3\n"
== "2\t/test_subcolumns.jsonl\t(1,2)\ttest_subcolumns.jsonl\t3\n"
)
res = node.query(
@ -998,7 +998,7 @@ def test_read_subcolumns(started_cluster):
assert (
res
== "0\thdfs://hdfs1:9000/test_subcolumns.jsonl\t(0,0)\ttest_subcolumns.jsonl\t0\n"
== "0\t/test_subcolumns.jsonl\t(0,0)\ttest_subcolumns.jsonl\t0\n"
)
res = node.query(
@ -1007,7 +1007,7 @@ def test_read_subcolumns(started_cluster):
assert (
res
== "42\thdfs://hdfs1:9000/test_subcolumns.jsonl\t(42,42)\ttest_subcolumns.jsonl\t42\n"
== "42\t/test_subcolumns.jsonl\t(42,42)\ttest_subcolumns.jsonl\t42\n"
)

View File

@ -130,7 +130,7 @@ def test_prohibited(started_cluster):
assert False, "Exception have to be thrown"
except Exception as ex:
assert (
"Unable to open HDFS file: /storage_user_two_prohibited error: Permission denied: user=specuser, access=WRITE"
"Unable to open HDFS file: /storage_user_two_prohibited (hdfs://suser@kerberizedhdfs1:9010/storage_user_two_prohibited) error: Permission denied: user=specuser, access=WRITE"
in str(ex)
)