Fix some tests

This commit is contained in:
kssenii 2024-03-25 19:19:54 +01:00
parent e019b3a391
commit f5982fdb1f
18 changed files with 104 additions and 44 deletions

View File

@ -16,21 +16,13 @@ namespace DB
struct HDFSObjectStorageSettings
{
HDFSObjectStorageSettings() = default;
size_t min_bytes_for_seek;
int objects_chunk_size_to_delete;
int replication;
HDFSObjectStorageSettings(
int min_bytes_for_seek_,
int objects_chunk_size_to_delete_,
int replication_)
HDFSObjectStorageSettings(int min_bytes_for_seek_, int replication_)
: min_bytes_for_seek(min_bytes_for_seek_)
, objects_chunk_size_to_delete(objects_chunk_size_to_delete_)
, replication(replication_)
{}
size_t min_bytes_for_seek;
int replication;
};

View File

@ -227,9 +227,8 @@ void registerHDFSObjectStorage(ObjectStorageFactory & factory)
if (uri.back() != '/')
throw Exception(ErrorCodes::BAD_ARGUMENTS, "HDFS path must ends with '/', but '{}' doesn't.", uri);
std::unique_ptr<HDFSObjectStorageSettings> settings = std::make_unique<HDFSObjectStorageSettings>(
auto settings = std::make_unique<HDFSObjectStorageSettings>(
config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024),
config.getInt(config_prefix + ".objects_chunk_size_to_delete", 1000),
context->getSettingsRef().hdfs_replication
);

View File

@ -143,8 +143,6 @@ std::unique_ptr<S3::Client> getClient(
auth_settings.no_sign_request.value_or(context->getConfigRef().getBool("s3.no_sign_request", false)),
};
LOG_TEST(&Poco::Logger::get("kssenii"), "KSSENII: {} - {}", auth_settings.access_key_id, auth_settings.secret_access_key);
return S3::ClientFactory::instance().create(
client_configuration,
client_settings,

View File

@ -6,6 +6,7 @@
#include <Storages/checkAndGetLiteralArgument.h>
#include <Parsers/IAST.h>
#include <Disks/ObjectStorages/HDFS/HDFSObjectStorage.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Formats/FormatFactory.h>
namespace DB
@ -13,6 +14,7 @@ namespace DB
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int NOT_IMPLEMENTED;
}
StorageHDFSConfiguration::StorageHDFSConfiguration(const StorageHDFSConfiguration & other)
@ -29,37 +31,53 @@ void StorageHDFSConfiguration::check(ContextPtr context) const
checkHDFSURL(fs::path(url) / path);
}
ObjectStoragePtr StorageHDFSConfiguration::createObjectStorage(ContextPtr context, bool /* is_readonly */) /// NOLINT
ObjectStoragePtr StorageHDFSConfiguration::createObjectStorage( /// NOLINT
ContextPtr context,
bool /* is_readonly */)
{
assertInitialized();
if (!url.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "HDFS url is empty");
auto settings = std::make_unique<HDFSObjectStorageSettings>();
return std::make_shared<HDFSObjectStorage>(url, std::move(settings), context->getConfigRef());
const auto & settings = context->getSettingsRef();
auto hdfs_settings = std::make_unique<HDFSObjectStorageSettings>(
settings.remote_read_min_bytes_for_seek,
settings.hdfs_replication
);
return std::make_shared<HDFSObjectStorage>(url, std::move(hdfs_settings), context->getConfigRef());
}
void StorageHDFSConfiguration::fromAST(ASTs & args, ContextPtr, bool /* with_structure */)
void StorageHDFSConfiguration::fromAST(ASTs & args, ContextPtr context, bool /* with_structure */)
{
url = checkAndGetLiteralArgument<String>(args[0], "url");
if (args.size() > 1)
{
args[1] = evaluateConstantExpressionOrIdentifierAsLiteral(args[1], context);
format = checkAndGetLiteralArgument<String>(args[1], "format_name");
else
format = "auto";
}
if (args.size() == 3)
{
args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(args[2], context);
compression_method = checkAndGetLiteralArgument<String>(args[2], "compression_method");
else
compression_method = "auto";
}
const size_t begin_of_path = url.find('/', url.find("//") + 2);
path = url.substr(begin_of_path + 1);
url = url.substr(0, begin_of_path);
auto pos = url.find("//");
if (pos == std::string::npos)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid url: {}", url);
pos = url.find('/', pos + 2);
if (pos == std::string::npos)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid url: {}", url);
path = url.substr(pos + 1);
url = url.substr(0, pos);
paths = {path};
}
void StorageHDFSConfiguration::fromNamedCollection(const NamedCollection &)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method fromNamedColection() is not implemented");
}
}
#endif

View File

@ -29,12 +29,12 @@ public:
ObjectStoragePtr createObjectStorage(ContextPtr context, bool is_readonly = true) override; /// NOLINT
StorageObjectStorageConfigurationPtr clone() override { return std::make_shared<StorageHDFSConfiguration>(*this); }
void fromNamedCollection(const NamedCollection &) override {}
void fromAST(ASTs & args, ContextPtr, bool /* with_structure */) override;
static void addStructureToArgs(ASTs &, const String &, ContextPtr) {}
private:
void fromNamedCollection(const NamedCollection &) override;
void fromAST(ASTs & args, ContextPtr, bool /* with_structure */) override;
String url;
String path;
std::vector<String> paths;

View File

@ -31,7 +31,7 @@ ReadBufferIterator::ReadBufferIterator(
, query_settings(query_settings_)
, schema_cache(schema_cache_)
, read_keys(read_keys_)
, format(configuration->format.empty() || configuration->format == "auto" ? std::nullopt : std::optional<String>(configuration->format))
, format(configuration->format == "auto" ? std::nullopt : std::optional<String>(configuration->format))
, prev_read_keys_size(read_keys_.size())
{
}
@ -191,7 +191,7 @@ ReadBufferIterator::Data ReadBufferIterator::next()
{
if (first)
{
if (format)
if (format.has_value())
throw Exception(
ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
"The table structure cannot be extracted from a {} format file, because there are no files with provided path "

View File

@ -51,10 +51,16 @@ String StorageS3Configuration::getDataSourceDescription()
void StorageS3Configuration::check(ContextPtr context) const
{
validateNamespace(url.bucket);
context->getGlobalContext()->getRemoteHostFilter().checkURL(url.uri);
context->getGlobalContext()->getHTTPHeaderFilter().checkHeaders(headers_from_ast);
}
void StorageS3Configuration::validateNamespace(const String & name) const
{
S3::URI::validateBucket(name, {});
}
StorageS3Configuration::StorageS3Configuration(const StorageS3Configuration & other)
: StorageObjectStorageConfiguration(other)
{

View File

@ -27,6 +27,8 @@ public:
String getDataSourceDescription() override;
void check(ContextPtr context) const override;
void validateNamespace(const String & name) const override;
StorageObjectStorageConfigurationPtr clone() override { return std::make_shared<StorageS3Configuration>(*this); }
bool isStaticConfiguration() const override { return static_configuration; }

View File

@ -87,6 +87,7 @@ StorageObjectStorage<StorageSettings>::StorageObjectStorage(
, format_settings(format_settings_)
, partition_by(partition_by_)
, distributed_processing(distributed_processing_)
, log(getLogger("Storage" + engine_name_))
, object_storage(object_storage_)
, configuration(configuration_)
{
@ -204,6 +205,7 @@ SinkToStoragePtr StorageObjectStorage<StorageSettings>::write(
if (partition_by_ast)
{
LOG_TEST(log, "Using PartitionedSink for {}", configuration->getPath());
return std::make_shared<PartitionedStorageObjectStorageSink>(
object_storage, configuration, format_settings, sample_block, local_context, partition_by_ast);
}

View File

@ -3,6 +3,7 @@
#include <Common/re2.h>
#include <Disks/ObjectStorages/IObjectStorage.h>
#include <Common/threadPoolCallbackRunner.h>
#include <Common/logger_useful.h>
#include <Storages/IStorage.h>
#include <Storages/prepareReadingFromFormat.h>
#include <Processors/Formats/IInputFormat.h>
@ -113,6 +114,7 @@ protected:
const ASTPtr partition_by;
const bool distributed_processing;
LoggerPtr log;
ObjectStoragePtr object_storage;
ConfigurationPtr configuration;
std::mutex configuration_update_mutex;

View File

@ -1,5 +1,6 @@
#include <Storages/ObjectStorage/StorageObjectStorageConfiguration.h>
#include <Formats/FormatFactory.h>
#include <Common/logger_useful.h>
namespace DB
{
@ -18,7 +19,10 @@ void StorageObjectStorageConfiguration::initialize(
// FIXME: it should be - if (format == "auto" && get_format_from_file)
if (configuration.format == "auto")
configuration.format = FormatFactory::instance().tryGetFormatFromFileName(configuration.getPath()).value_or("auto");
else
FormatFactory::instance().checkFormatName(configuration.format);
configuration.check(local_context);
configuration.initialized = true;
}

View File

@ -43,6 +43,8 @@ public:
std::string getPathWithoutGlob() const;
virtual void check(ContextPtr context) const = 0;
virtual void validateNamespace(const String & /* name */) const {}
virtual ObjectStoragePtr createObjectStorage(ContextPtr context, bool is_readonly = true) = 0; /// NOLINT
virtual StorageObjectStorageConfigurationPtr clone() = 0;
virtual bool isStaticConfiguration() const { return true; }

View File

@ -1,9 +1,14 @@
#include "StorageObjectStorageSink.h"
#include <Formats/FormatFactory.h>
#include <Disks/ObjectStorages/IObjectStorage.h>
#include <Common/isValidUTF8.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_PARSE_TEXT;
}
StorageObjectStorageSink::StorageObjectStorageSink(
ObjectStoragePtr object_storage,
@ -93,6 +98,7 @@ void StorageObjectStorageSink::release()
write_buf->finalize();
}
PartitionedStorageObjectStorageSink::PartitionedStorageObjectStorageSink(
ObjectStoragePtr object_storage_,
StorageObjectStorageConfigurationPtr configuration_,
@ -111,9 +117,12 @@ PartitionedStorageObjectStorageSink::PartitionedStorageObjectStorageSink(
SinkPtr PartitionedStorageObjectStorageSink::createSinkForPartition(const String & partition_id)
{
auto blob = configuration->getPaths().back();
auto partition_key = replaceWildcards(blob, partition_id);
validatePartitionKey(partition_key, true);
auto partition_bucket = replaceWildcards(configuration->getNamespace(), partition_id);
validateNamespace(partition_bucket);
auto partition_key = replaceWildcards(configuration->getPath(), partition_id);
validateKey(partition_key);
return std::make_shared<StorageObjectStorageSink>(
object_storage,
configuration,
@ -124,4 +133,29 @@ SinkPtr PartitionedStorageObjectStorageSink::createSinkForPartition(const String
);
}
void PartitionedStorageObjectStorageSink::validateKey(const String & str)
{
/// See:
/// - https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-keys.html
/// - https://cloud.ibm.com/apidocs/cos/cos-compatibility#putobject
if (str.empty() || str.size() > 1024)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Incorrect key length (not empty, max 1023 characters), got: {}", str.size());
if (!UTF8::isValidUTF8(reinterpret_cast<const UInt8 *>(str.data()), str.size()))
throw Exception(ErrorCodes::CANNOT_PARSE_TEXT, "Incorrect non-UTF8 sequence in key");
validatePartitionKey(str, true);
}
void PartitionedStorageObjectStorageSink::validateNamespace(const String & str)
{
configuration->validateNamespace(str);
if (!UTF8::isValidUTF8(reinterpret_cast<const UInt8 *>(str.data()), str.size()))
throw Exception(ErrorCodes::CANNOT_PARSE_TEXT, "Incorrect non-UTF8 sequence in bucket name");
validatePartitionKey(str, false);
}
}

View File

@ -54,6 +54,9 @@ public:
SinkPtr createSinkForPartition(const String & partition_id) override;
private:
void validateKey(const String & str);
void validateNamespace(const String & str);
ObjectStoragePtr object_storage;
StorageObjectStorageConfigurationPtr configuration;
const std::optional<FormatSettings> format_settings;

View File

@ -108,7 +108,7 @@ void S3Settings::RequestSettings::PartUploadSettings::validate()
if (max_upload_part_size > max_upload_part_size_limit)
throw Exception(
ErrorCodes::INVALID_SETTING_VALUE,
"Setting max_upload_part_size has invalid value {} which is grater than the s3 API limit {}",
"Setting max_upload_part_size has invalid value {} which is greater than the s3 API limit {}",
ReadableSize(max_upload_part_size), ReadableSize(max_upload_part_size_limit));
if (max_single_part_upload_size > max_upload_part_size_limit)

View File

@ -23,4 +23,3 @@ $CLICKHOUSE_CLIENT -q "SELECT * FROM hdfs('hdfs1:9000/data', 'CSV', 'x UInt32')"
$CLICKHOUSE_CLIENT -q "SELECT * FROM hdfs('hdfs://hdfs1/data', 'CSV', 'x UInt32')" 2>&1 | grep -F -q "HDFS_ERROR" && echo 'OK' || echo 'FAIL';
$CLICKHOUSE_CLIENT -q "SELECT * FROM hdfs('http://hdfs1:9000/data', 'CSV', 'x UInt32')" 2>&1 | grep -F -q "BAD_ARGUMENTS" && echo 'OK' || echo 'FAIL';
$CLICKHOUSE_CLIENT -q "SELECT * FROM hdfs('hdfs://hdfs1@nameservice/abcd/data', 'CSV', 'x UInt32')" 2>&1 | grep -F -q "HDFS_ERROR" && echo 'OK' || echo 'FAIL';

View File

@ -13,7 +13,7 @@ CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
$CLICKHOUSE_CLIENT -nm -q "
INSERT INTO FUNCTION s3('http://localhost:11111/test/$CLICKHOUSE_DATABASE/test_INT_MAX.tsv', '', '', 'TSV')
SELECT repeat('a', 1024) FROM numbers((pow(2, 30) * 2) / 1024)
SETTINGS s3_max_single_part_upload_size = '10Gi';
SETTINGS s3_max_single_part_upload_size = '5Gi';
SELECT count() FROM s3('http://localhost:11111/test/$CLICKHOUSE_DATABASE/test_INT_MAX.tsv');
"

View File

@ -2,5 +2,4 @@
select * from s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/MyPrefix/BU%20-%20UNIT%20-%201/*.parquet'); -- { serverError CANNOT_EXTRACT_TABLE_STRUCTURE }
select * from s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/MyPrefix/*.parquet?some_tocken=ABCD'); -- { serverError CANNOT_EXTRACT_TABLE_STRUCTURE }
select * from s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/MyPrefix/*.parquet?some_tocken=ABCD'); -- { serverError CANNOT_DETECT_FORMAT }