More test fixes

kssenii 2024-03-28 14:15:14 +01:00
parent ca1c119115
commit 5c63d09c5b
11 changed files with 98 additions and 54 deletions

View File

@ -113,6 +113,9 @@ class IColumn;
M(Bool, s3_check_objects_after_upload, false, "Check each uploaded object to s3 with head request to be sure that upload was successful", 0) \
M(Bool, s3_allow_parallel_part_upload, true, "Use multiple threads for s3 multipart upload. It may lead to slightly higher memory usage", 0) \
M(Bool, s3_throw_on_zero_files_match, false, "Throw an error when a ListObjects request cannot match any files", 0) \
M(Bool, s3_ignore_file_doesnt_exist, false, "Ignore if file does not exist and return 0 rows for StorageS3", 0) \
M(Bool, hdfs_ignore_file_doesnt_exist, false, "Ignore if file does not exist and return 0 rows for StorageHDFS", 0) \
M(Bool, azure_ignore_file_doesnt_exist, false, "Ignore if file does not exist and return 0 rows for StorageAzure", 0) \
M(Bool, s3_disable_checksum, false, "Do not calculate a checksum when sending a file to S3. This speeds up writes by avoiding excessive processing passes on a file. It is mostly safe as the data of MergeTree tables is checksummed by ClickHouse anyway, and when S3 is accessed with HTTPS, the TLS layer already provides integrity while transferring through the network. Still, additional checksums on S3 provide defense in depth.", 0) \
M(UInt64, s3_retry_attempts, 100, "Setting for Aws::Client::RetryStrategy, Aws::Client does retries itself, 0 means no retries", 0) \
M(UInt64, s3_request_timeout_ms, 30000, "Idleness timeout for sending and receiving data to/from S3. Fail if a single TCP read or write call blocks for this long.", 0) \
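
For context, a minimal usage sketch of the new *_ignore_file_doesnt_exist settings (the table name and HDFS path below are hypothetical; the integration tests further down use the same pattern). By default, reading a key that does not exist fails with an error; with the setting enabled, the missing file is skipped and the query returns an empty result. The s3_ and azure_ variants behave the same way for StorageS3 and StorageAzure.

-- Hypothetical table over an HDFS path that may be absent
CREATE TABLE hdfs_maybe_missing (id UInt32, name String, weight Float64)
    ENGINE = HDFS('hdfs://hdfs1:9000/data/maybe_missing', 'TSV');

-- Returns an empty result instead of throwing if the path does not exist
SELECT * FROM hdfs_maybe_missing SETTINGS hdfs_ignore_file_doesnt_exist = 1;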

View File

@ -103,10 +103,10 @@ std::unique_ptr<WriteBufferFromFileBase> HDFSObjectStorage::writeObject( /// NOL
void HDFSObjectStorage::removeObject(const StoredObject & object)
{
const auto & path = object.remote_path;
const size_t begin_of_path = path.find('/', path.find("//") + 2);
// const size_t begin_of_path = path.find('/', path.find("//") + 2);
/// Add path from root to file name
int res = hdfsDelete(hdfs_fs.get(), path.substr(begin_of_path).c_str(), 0);
int res = hdfsDelete(hdfs_fs.get(), path.c_str(), 0);
if (res == -1)
throw Exception(ErrorCodes::HDFS_ERROR, "HDFSDelete failed with path: {}", path);

View File

@ -432,7 +432,9 @@ void S3ObjectStorage::removeObjectsIfExist(const StoredObjects & objects)
std::optional<ObjectMetadata> S3ObjectStorage::tryGetObjectMetadata(const std::string & path) const
{
auto settings_ptr = s3_settings.get();
auto object_info = S3::getObjectInfo(*client.get(), uri.bucket, path, {}, settings_ptr->request_settings, /* with_metadata= */ true, /* for_disk_s3= */ true, /* throw_on_error= */ false);
auto object_info = S3::getObjectInfo(
*client.get(), uri.bucket, path, {}, settings_ptr->request_settings,
/* with_metadata= */ true, /* for_disk_s3= */ true, /* throw_on_error= */ false);
if (object_info.size == 0 && object_info.last_modification_time == 0 && object_info.metadata.empty())
return {};
@ -448,7 +450,9 @@ std::optional<ObjectMetadata> S3ObjectStorage::tryGetObjectMetadata(const std::s
ObjectMetadata S3ObjectStorage::getObjectMetadata(const std::string & path) const
{
auto settings_ptr = s3_settings.get();
auto object_info = S3::getObjectInfo(*client.get(), uri.bucket, path, {}, settings_ptr->request_settings, /* with_metadata= */ true, /* for_disk_s3= */ true);
auto object_info = S3::getObjectInfo(
*client.get(), uri.bucket, path, {}, settings_ptr->request_settings,
/* with_metadata= */ true, /* for_disk_s3= */ true);
ObjectMetadata result;
result.size_bytes = object_info.size;

View File

@ -16,6 +16,7 @@ namespace DB
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
StorageHDFSConfiguration::StorageHDFSConfiguration(const StorageHDFSConfiguration & other)
@ -62,6 +63,13 @@ void StorageHDFSConfiguration::fromAST(ASTs & args, ContextPtr context, bool wit
std::string url_str;
url_str = checkAndGetLiteralArgument<String>(args[0], "url");
const size_t max_args_num = with_structure ? 4 : 3;
if (args.size() > max_args_num)
{
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Expected not more than {} arguments", max_args_num);
}
if (args.size() > 1)
{
args[1] = evaluateConstantExpressionOrIdentifierAsLiteral(args[1], context);
@ -72,6 +80,7 @@ void StorageHDFSConfiguration::fromAST(ASTs & args, ContextPtr context, bool wit
{
if (args.size() > 2)
{
args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(args[2], context);
structure = checkAndGetLiteralArgument<String>(args[2], "structure");
}
if (args.size() > 3)
@ -100,13 +109,14 @@ void StorageHDFSConfiguration::fromNamedCollection(const NamedCollection & colle
url_str = collection.get<String>("url");
format = collection.getOrDefault<String>("format", "auto");
compression_method = collection.getOrDefault<String>("compression_method", collection.getOrDefault<String>("compression", "auto"));
compression_method = collection.getOrDefault<String>("compression_method",
collection.getOrDefault<String>("compression", "auto"));
structure = collection.getOrDefault<String>("structure", "auto");
setURL(url_str);
}
void StorageHDFSConfiguration::setURL(const std::string url_)
void StorageHDFSConfiguration::setURL(const std::string & url_)
{
auto pos = url_.find("//");
if (pos == std::string::npos)
@ -117,8 +127,10 @@ void StorageHDFSConfiguration::setURL(const std::string url_)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Bad hdfs url: {}", url_);
path = url_.substr(pos + 1);
if (!path.starts_with('/'))
path = '/' + path;
url = url_.substr(0, pos);
path = '/' + path;
paths = {path};
LOG_TRACE(getLogger("StorageHDFSConfiguration"), "Using url: {}, path: {}", url, path);

View File

@ -36,7 +36,7 @@ public:
private:
void fromNamedCollection(const NamedCollection &) override;
void fromAST(ASTs & args, ContextPtr, bool /* with_structure */) override;
void setURL(const std::string url_);
void setURL(const std::string & url_);
String url;
String path;

View File

@ -26,6 +26,7 @@ struct StorageObjectStorageSettings
bool skip_empty_files;
size_t list_object_keys_size;
bool throw_on_zero_files_match;
bool ignore_non_existent_file;
};
struct S3StorageSettings
@ -40,6 +41,7 @@ struct S3StorageSettings
.skip_empty_files = settings.s3_skip_empty_files,
.list_object_keys_size = settings.s3_list_object_keys_size,
.throw_on_zero_files_match = settings.s3_throw_on_zero_files_match,
.ignore_non_existent_file = settings.s3_ignore_file_doesnt_exist,
};
}
@ -62,6 +64,7 @@ struct AzureStorageSettings
.skip_empty_files = settings.s3_skip_empty_files, /// TODO: add setting for azure
.list_object_keys_size = settings.azure_list_object_keys_size,
.throw_on_zero_files_match = settings.s3_throw_on_zero_files_match,
.ignore_non_existent_file = settings.azure_ignore_file_doesnt_exist,
};
}
@ -84,6 +87,7 @@ struct HDFSStorageSettings
.skip_empty_files = settings.s3_skip_empty_files, /// TODO: add setting for hdfs
.list_object_keys_size = settings.s3_list_object_keys_size, /// TODO: add a setting for hdfs
.throw_on_zero_files_match = settings.s3_throw_on_zero_files_match,
.ignore_non_existent_file = settings.hdfs_ignore_file_doesnt_exist,
};
}

View File

@ -100,14 +100,15 @@ std::shared_ptr<StorageObjectStorageSource::IIterator> StorageObjectStorageSourc
{
/// Iterate through disclosed globs and make a source for each file
return std::make_shared<GlobIterator>(
object_storage, configuration, predicate, virtual_columns, local_context,
read_keys, settings.list_object_keys_size, settings.throw_on_zero_files_match, file_progress_callback);
object_storage, configuration, predicate, virtual_columns,
local_context, read_keys, settings.list_object_keys_size,
settings.throw_on_zero_files_match, file_progress_callback);
}
else
{
return std::make_shared<KeysIterator>(
object_storage, configuration, virtual_columns, read_keys,
settings.throw_on_zero_files_match, file_progress_callback);
settings.ignore_non_existent_file, file_progress_callback);
}
}
@ -331,9 +332,8 @@ std::unique_ptr<ReadBuffer> StorageObjectStorageSource::createReadBuffer(const S
}
}
StorageObjectStorageSource::IIterator::IIterator(bool throw_on_zero_files_match_, const std::string & logger_name_)
: throw_on_zero_files_match(throw_on_zero_files_match_)
, logger(getLogger(logger_name_))
StorageObjectStorageSource::IIterator::IIterator(const std::string & logger_name_)
: logger(getLogger(logger_name_))
{
}
@ -343,13 +343,8 @@ ObjectInfoPtr StorageObjectStorageSource::IIterator::next(size_t processor)
if (object_info)
{
first_iteration = false;
LOG_TEST(&Poco::Logger::get("KeysIterator"), "Next key: {}", object_info->relative_path);
}
else if (first_iteration && throw_on_zero_files_match)
{
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "Can not match any files");
}
return object_info;
}
@ -364,11 +359,12 @@ StorageObjectStorageSource::GlobIterator::GlobIterator(
size_t list_object_keys_size,
bool throw_on_zero_files_match_,
std::function<void(FileProgress)> file_progress_callback_)
: IIterator(throw_on_zero_files_match_, "GlobIterator")
: IIterator("GlobIterator")
, WithContext(context_)
, object_storage(object_storage_)
, configuration(configuration_)
, virtual_columns(virtual_columns_)
, throw_on_zero_files_match(throw_on_zero_files_match_)
, read_keys(read_keys_)
, file_progress_callback(file_progress_callback_)
{
@ -412,10 +408,24 @@ StorageObjectStorageSource::GlobIterator::GlobIterator(
}
}
ObjectInfoPtr StorageObjectStorageSource::GlobIterator::nextImpl(size_t /* processor */)
ObjectInfoPtr StorageObjectStorageSource::GlobIterator::nextImpl(size_t processor)
{
std::lock_guard lock(next_mutex);
auto object_info = nextImplUnlocked(processor);
if (object_info)
{
if (first_iteration)
first_iteration = false;
}
else if (first_iteration && throw_on_zero_files_match)
{
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "Can not match any files");
}
return object_info;
}
ObjectInfoPtr StorageObjectStorageSource::GlobIterator::nextImplUnlocked(size_t /* processor */)
{
bool current_batch_processed = object_infos.empty() || index >= object_infos.size();
if (is_finished && current_batch_processed)
return {};
@ -485,14 +495,15 @@ StorageObjectStorageSource::KeysIterator::KeysIterator(
ConfigurationPtr configuration_,
const NamesAndTypesList & virtual_columns_,
ObjectInfos * read_keys_,
bool throw_on_zero_files_match_,
bool ignore_non_existent_files_,
std::function<void(FileProgress)> file_progress_callback_)
: IIterator(throw_on_zero_files_match_, "KeysIterator")
: IIterator("KeysIterator")
, object_storage(object_storage_)
, configuration(configuration_)
, virtual_columns(virtual_columns_)
, file_progress_callback(file_progress_callback_)
, keys(configuration->getPaths())
, ignore_non_existent_files(ignore_non_existent_files_)
{
if (read_keys_)
{
@ -507,20 +518,29 @@ StorageObjectStorageSource::KeysIterator::KeysIterator(
ObjectInfoPtr StorageObjectStorageSource::KeysIterator::nextImpl(size_t /* processor */)
{
size_t current_index = index.fetch_add(1, std::memory_order_relaxed);
if (current_index >= keys.size())
return {};
auto key = keys[current_index];
ObjectMetadata metadata{};
if (file_progress_callback)
while (true)
{
metadata = object_storage->getObjectMetadata(key);
file_progress_callback(FileProgress(0, metadata.size_bytes));
}
size_t current_index = index.fetch_add(1, std::memory_order_relaxed);
if (current_index >= keys.size())
return {};
return std::make_shared<ObjectInfo>(key, metadata);
auto key = keys[current_index];
ObjectMetadata object_metadata{};
if (ignore_non_existent_files)
{
auto metadata = object_storage->tryGetObjectMetadata(key);
if (!metadata)
continue;
}
else
object_metadata = object_storage->getObjectMetadata(key);
if (file_progress_callback)
file_progress_callback(FileProgress(0, object_metadata.size_bytes));
return std::make_shared<ObjectInfo>(key, object_metadata);
}
}
StorageObjectStorageSource::ReaderHolder::ReaderHolder(
@ -555,7 +575,7 @@ StorageObjectStorageSource::ReadTaskIterator::ReadTaskIterator(
CurrentMetrics::Metric metric_threads_,
CurrentMetrics::Metric metric_threads_active_,
CurrentMetrics::Metric metric_threads_scheduled_)
: IIterator(false, "ReadTaskIterator")
: IIterator("ReadTaskIterator")
, callback(callback_)
{
ThreadPool pool(metric_threads_, metric_threads_active_, metric_threads_scheduled_, max_threads_count);

View File

@ -133,7 +133,7 @@ protected:
class StorageObjectStorageSource::IIterator
{
public:
IIterator(bool throw_on_zero_files_match_, const std::string & logger_name_);
explicit IIterator(const std::string & logger_name_);
virtual ~IIterator() = default;
@ -143,10 +143,6 @@ public:
protected:
virtual ObjectInfoPtr nextImpl(size_t processor) = 0;
protected:
const bool throw_on_zero_files_match;
bool first_iteration = true;
LoggerPtr logger;
};
@ -190,23 +186,26 @@ public:
private:
ObjectInfoPtr nextImpl(size_t processor) override;
ObjectInfoPtr nextImplUnlocked(size_t processor);
void createFilterAST(const String & any_key);
ObjectStoragePtr object_storage;
ConfigurationPtr configuration;
ActionsDAGPtr filter_dag;
NamesAndTypesList virtual_columns;
const ObjectStoragePtr object_storage;
const ConfigurationPtr configuration;
const NamesAndTypesList virtual_columns;
const bool throw_on_zero_files_match;
size_t index = 0;
ObjectInfos object_infos;
ObjectInfos * read_keys;
ActionsDAGPtr filter_dag;
ObjectStorageIteratorPtr object_storage_iterator;
bool recursive{false};
std::unique_ptr<re2::RE2> matcher;
bool is_finished = false;
bool first_iteration = true;
std::mutex next_mutex;
std::function<void(FileProgress)> file_progress_callback;
@ -220,7 +219,7 @@ public:
ConfigurationPtr configuration_,
const NamesAndTypesList & virtual_columns_,
ObjectInfos * read_keys_,
bool throw_on_zero_files_match_,
bool ignore_non_existent_files_,
std::function<void(FileProgress)> file_progress_callback = {});
~KeysIterator() override = default;
@ -236,5 +235,6 @@ private:
const std::function<void(FileProgress)> file_progress_callback;
const std::vector<String> keys;
std::atomic<size_t> index = 0;
bool ignore_non_existent_files;
};
}

View File

@ -45,7 +45,7 @@ StorageS3QueueSource::FileIterator::FileIterator(
std::unique_ptr<GlobIterator> glob_iterator_,
size_t current_shard_,
std::atomic<bool> & shutdown_called_)
: StorageObjectStorageSource::IIterator(false, "S3QueueIterator")
: StorageObjectStorageSource::IIterator("S3QueueIterator")
, metadata(metadata_)
, glob_iterator(std::move(glob_iterator_))
, shutdown_called(shutdown_called_)

View File

@ -326,7 +326,7 @@ def test_virtual_columns(started_cluster):
hdfs_api.write_data("/file1", "1\n")
hdfs_api.write_data("/file2", "2\n")
hdfs_api.write_data("/file3", "3\n")
expected = "1\tfile1\thdfs://hdfs1:9000/file1\n2\tfile2\thdfs://hdfs1:9000/file2\n3\tfile3\thdfs://hdfs1:9000/file3\n"
expected = "1\tfile1\t/file1\n2\tfile2\t/file2\n3\tfile3\t/file3\n"
assert (
node1.query(
"select id, _file as file_name, _path as file_path from virtual_cols order by id"
@ -365,7 +365,7 @@ def test_truncate_table(started_cluster):
assert hdfs_api.read_data("/tr") == "1\tMark\t72.53\n"
assert node1.query("select * from test_truncate") == "1\tMark\t72.53\n"
node1.query("truncate table test_truncate")
assert node1.query("select * from test_truncate") == ""
assert node1.query("select * from test_truncate settings hdfs_ignore_file_doesnt_exist=1") == ""
node1.query("drop table test_truncate")
@ -488,13 +488,13 @@ def test_hdfsCluster(started_cluster):
actual = node1.query(
"select id, _file as file_name, _path as file_path from hdfs('hdfs://hdfs1:9000/test_hdfsCluster/file*', 'TSV', 'id UInt32') order by id"
)
expected = "1\tfile1\thdfs://hdfs1:9000/test_hdfsCluster/file1\n2\tfile2\thdfs://hdfs1:9000/test_hdfsCluster/file2\n3\tfile3\thdfs://hdfs1:9000/test_hdfsCluster/file3\n"
expected = "1\tfile1\t/test_hdfsCluster/file1\n2\tfile2\t/test_hdfsCluster/file2\n3\tfile3\t/test_hdfsCluster/file3\n"
assert actual == expected
actual = node1.query(
"select id, _file as file_name, _path as file_path from hdfsCluster('test_cluster_two_shards', 'hdfs://hdfs1:9000/test_hdfsCluster/file*', 'TSV', 'id UInt32') order by id"
)
expected = "1\tfile1\thdfs://hdfs1:9000/test_hdfsCluster/file1\n2\tfile2\thdfs://hdfs1:9000/test_hdfsCluster/file2\n3\tfile3\thdfs://hdfs1:9000/test_hdfsCluster/file3\n"
expected = "1\tfile1\t/test_hdfsCluster/file1\n2\tfile2\t/test_hdfsCluster/file2\n3\tfile3\t/test_hdfsCluster/file3\n"
assert actual == expected
fs.delete(dir, recursive=True)
@ -502,7 +502,7 @@ def test_hdfsCluster(started_cluster):
def test_hdfs_directory_not_exist(started_cluster):
ddl = "create table HDFSStorageWithNotExistDir (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/data/not_eixst', 'TSV')"
node1.query(ddl)
assert "" == node1.query("select * from HDFSStorageWithNotExistDir")
assert "" == node1.query("select * from HDFSStorageWithNotExistDir settings hdfs_ignore_file_doesnt_exist=1")
def test_overwrite(started_cluster):
@ -658,7 +658,7 @@ def test_virtual_columns_2(started_cluster):
node1.query(f"insert into table function {table_function} SELECT 1, 'kek'")
result = node1.query(f"SELECT _path FROM {table_function}")
assert result.strip() == "hdfs://hdfs1:9000/parquet_2"
assert result.strip() == "/parquet_2"
table_function = (
f"hdfs('hdfs://hdfs1:9000/parquet_3', 'Parquet', 'a Int32, _path String')"

View File

@ -60,7 +60,8 @@ SELECT * FROM \"abacaba/file.tsv\"
${CLICKHOUSE_CLIENT} -q "SELECT * FROM test_hdfs_4.\`http://localhost:11111/test/a.tsv\`" 2>&1 | tr '\n' ' ' | grep -oF -e "UNKNOWN_TABLE" -e "BAD_ARGUMENTS" > /dev/null && echo "OK" || echo 'FAIL' ||:
${CLICKHOUSE_CLIENT} --query "SELECT * FROM test_hdfs_4.\`hdfs://localhost:12222/file.myext\`" 2>&1 | tr '\n' ' ' | grep -oF -e "UNKNOWN_TABLE" -e "CANNOT_EXTRACT_TABLE_STRUCTURE" > /dev/null && echo "OK" || echo 'FAIL' ||:
${CLICKHOUSE_CLIENT} --query "SELECT * FROM test_hdfs_4.\`hdfs://localhost:12222/test_02725_3.tsv\`" 2>&1 | tr '\n' ' ' | grep -oF -e "UNKNOWN_TABLE" -e "CANNOT_EXTRACT_TABLE_STRUCTURE" > /dev/null && echo "OK" || echo 'FAIL' ||:
${CLICKHOUSE_CLIENT} --query "SELECT * FROM test_hdfs_4.\`hdfs://localhost:12222/file.myext\`"
${CLICKHOUSE_CLIENT} --query "SELECT * FROM test_hdfs_4.\`hdfs://localhost:12222/test_02725_3.tsv\`"
${CLICKHOUSE_CLIENT} --query "SELECT * FROM test_hdfs_4.\`hdfs://localhost:12222\`" 2>&1 | tr '\n' ' ' | grep -oF -e "UNKNOWN_TABLE" -e "BAD_ARGUMENTS" > /dev/null && echo "OK" || echo 'FAIL' ||: