Fix tests

kssenii 2024-03-27 19:06:19 +01:00
parent cb97f8dab5
commit 7a991de488
21 changed files with 279 additions and 86 deletions

View File

@ -7,6 +7,7 @@
#include <Storages/ObjectStorage/HDFS/ReadBufferFromHDFS.h>
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Common/getRandomASCIIString.h>
#include <Common/logger_useful.h>
#if USE_HDFS
@ -18,6 +19,7 @@ namespace ErrorCodes
{
extern const int UNSUPPORTED_METHOD;
extern const int HDFS_ERROR;
extern const int ACCESS_DENIED;
}
void HDFSObjectStorage::shutdown()
@ -48,7 +50,7 @@ std::unique_ptr<ReadBufferFromFileBase> HDFSObjectStorage::readObject( /// NOLIN
std::optional<size_t>,
std::optional<size_t>) const
{
return std::make_unique<ReadBufferFromHDFS>(object.remote_path, object.remote_path, config, patchSettings(read_settings));
return std::make_unique<ReadBufferFromHDFS>(hdfs_root_path, object.remote_path, config, patchSettings(read_settings));
}
std::unique_ptr<ReadBufferFromFileBase> HDFSObjectStorage::readObjects( /// NOLINT
@ -62,12 +64,12 @@ std::unique_ptr<ReadBufferFromFileBase> HDFSObjectStorage::readObjects( /// NOLI
[this, disk_read_settings]
(bool /* restricted_seek */, const std::string & path) -> std::unique_ptr<ReadBufferFromFileBase>
{
size_t begin_of_path = path.find('/', path.find("//") + 2);
auto hdfs_path = path.substr(begin_of_path);
auto hdfs_uri = path.substr(0, begin_of_path);
// size_t begin_of_path = path.find('/', path.find("//") + 2);
// auto hdfs_path = path.substr(begin_of_path);
// auto hdfs_uri = path.substr(0, begin_of_path);
return std::make_unique<ReadBufferFromHDFS>(
hdfs_uri, hdfs_path, config, disk_read_settings, /* read_until_position */0, /* use_external_buffer */true);
hdfs_root_path, path, config, disk_read_settings, /* read_until_position */0, /* use_external_buffer */true);
};
return std::make_unique<ReadBufferFromRemoteFSGather>(
@ -131,7 +133,8 @@ ObjectMetadata HDFSObjectStorage::getObjectMetadata(const std::string & path) co
{
auto * file_info = hdfsGetPathInfo(hdfs_fs.get(), path.data());
if (!file_info)
throw Exception(ErrorCodes::HDFS_ERROR, "Cannot get file info for: {}. Error: {}", path, hdfsGetLastError());
throw Exception(ErrorCodes::HDFS_ERROR,
"Cannot get file info for: {}. Error: {}", path, hdfsGetLastError());
ObjectMetadata metadata;
metadata.size_bytes = static_cast<size_t>(file_info->mSize);
@ -141,6 +144,54 @@ ObjectMetadata HDFSObjectStorage::getObjectMetadata(const std::string & path) co
return metadata;
}
void HDFSObjectStorage::listObjects(const std::string & path, RelativePathsWithMetadata & children, size_t max_keys) const
{
auto * log = &Poco::Logger::get("HDFSObjectStorage");
LOG_TRACE(log, "Trying to list files for {}", path);
HDFSFileInfo ls;
ls.file_info = hdfsListDirectory(hdfs_fs.get(), path.data(), &ls.length);
if (ls.file_info == nullptr && errno != ENOENT) // NOLINT
{
// Ignore the file-not-found case and rethrow any other exception.
// libhdfs3 has no function to get the exception type, so errno is used instead.
throw Exception(ErrorCodes::ACCESS_DENIED, "Cannot list directory {}: {}",
path, String(hdfsGetLastError()));
}
if (!ls.file_info && ls.length > 0)
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "file_info shouldn't be null");
}
LOG_TRACE(log, "Listed {} files for {}", ls.length, path);
for (int i = 0; i < ls.length; ++i)
{
const String file_path = fs::path(ls.file_info[i].mName).lexically_normal();
const size_t last_slash = file_path.rfind('/');
const String file_name = file_path.substr(last_slash);
const bool is_directory = ls.file_info[i].mKind == 'D';
if (is_directory)
{
listObjects(fs::path(file_path) / "", children, max_keys);
}
else
{
LOG_TEST(log, "Found file: {}", file_path);
children.emplace_back(std::make_shared<RelativePathWithMetadata>(
String(file_path),
ObjectMetadata{
static_cast<uint64_t>(ls.file_info[i].mSize),
Poco::Timestamp::fromEpochTime(ls.file_info[i].mLastMod),
{}}));
}
}
}
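
The new listObjects above flattens the HDFS directory tree by recursing into subdirectories and emitting regular files with their size and modification time. Below is a minimal standalone sketch of the same flattening pattern, written against std::filesystem instead of libhdfs3; the Entry struct and the listRecursive name are illustrative only, not part of this commit.

#include <cstdint>
#include <filesystem>
#include <string>
#include <vector>

namespace fs = std::filesystem;

struct Entry { std::string path; uint64_t size; };   /// illustrative stand-in for RelativePathWithMetadata

void listRecursive(const fs::path & dir, std::vector<Entry> & out)
{
    for (const auto & item : fs::directory_iterator(dir))
    {
        if (item.is_directory())
            listRecursive(item.path(), out);    /// descend, same as listObjects(fs::path(file_path) / "", ...)
        else
            out.push_back({item.path().string(), static_cast<uint64_t>(item.file_size())});
    }
}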
void HDFSObjectStorage::copyObject( /// NOLINT
const StoredObject & object_from,
const StoredObject & object_to,
@ -160,7 +211,10 @@ void HDFSObjectStorage::copyObject( /// NOLINT
}
std::unique_ptr<IObjectStorage> HDFSObjectStorage::cloneObjectStorage(const std::string &, const Poco::Util::AbstractConfiguration &, const std::string &, ContextPtr)
std::unique_ptr<IObjectStorage> HDFSObjectStorage::cloneObjectStorage(
const std::string &,
const Poco::Util::AbstractConfiguration &,
const std::string &, ContextPtr)
{
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "HDFS object storage doesn't support cloning");
}

View File

@ -92,6 +92,8 @@ public:
const WriteSettings & write_settings,
std::optional<ObjectAttributes> object_to_attributes = {}) override;
void listObjects(const std::string & path, RelativePathsWithMetadata & children, size_t max_keys) const override;
void shutdown() override;
void startup() override;

View File

@ -27,9 +27,7 @@ class ObjectStorageIteratorFromList : public IObjectStorageIterator
public:
explicit ObjectStorageIteratorFromList(RelativePathsWithMetadata && batch_)
: batch(std::move(batch_))
, batch_iterator(batch.begin())
{
}
, batch_iterator(batch.begin()) {}
void next() override
{
@ -37,21 +35,23 @@ public:
++batch_iterator;
}
void nextBatch() override
{
batch_iterator = batch.end();
}
void nextBatch() override { batch_iterator = batch.end(); }
bool isValid() override
{
return batch_iterator != batch.end();
}
bool isValid() override { return batch_iterator != batch.end(); }
RelativePathWithMetadataPtr current() override;
RelativePathsWithMetadata currentBatch() override { return batch; }
std::optional<RelativePathsWithMetadata> getCurrentBatchAndScheduleNext() override { return std::nullopt; }
std::optional<RelativePathsWithMetadata> getCurrentBatchAndScheduleNext() override
{
if (batch.empty())
return {};
auto current_batch = std::move(batch);
batch = {};
return current_batch;
}
size_t getAccumulatedSize() const override { return batch.size(); }
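
The rewritten getCurrentBatchAndScheduleNext above hands the accumulated batch to the caller exactly once: the vector is moved out and then reassigned to an empty state, so a second call sees an empty batch and returns an empty optional. A minimal sketch of this move-out-once pattern, with illustrative names:

#include <optional>
#include <utility>
#include <vector>

struct OneShotBatch
{
    std::vector<int> batch{1, 2, 3};

    std::optional<std::vector<int>> takeOnce()
    {
        if (batch.empty())
            return {};                          /// already handed out (or never filled)
        auto current_batch = std::move(batch);
        batch = {};                             /// make the moved-from vector deterministically empty
        return current_batch;
    }
};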

View File

@ -21,6 +21,18 @@ IObjectStorageIteratorAsync::IObjectStorageIteratorAsync(
{
}
IObjectStorageIteratorAsync::~IObjectStorageIteratorAsync()
{
if (!deactivated)
deactivate();
}
void IObjectStorageIteratorAsync::deactivate()
{
list_objects_pool.wait();
deactivated = true;
}
void IObjectStorageIteratorAsync::nextBatch()
{
std::lock_guard lock(mutex);

View File

@ -19,18 +19,20 @@ public:
CurrentMetrics::Metric threads_scheduled_metric,
const std::string & thread_name);
void next() override;
void nextBatch() override;
~IObjectStorageIteratorAsync() override;
bool isValid() override;
RelativePathWithMetadataPtr current() override;
RelativePathsWithMetadata currentBatch() override;
void next() override;
void nextBatch() override;
size_t getAccumulatedSize() const override;
std::optional<RelativePathsWithMetadata> getCurrentBatchAndScheduleNext() override;
~IObjectStorageIteratorAsync() override
{
list_objects_pool.wait();
}
void deactivate();
protected:
@ -46,6 +48,7 @@ protected:
bool is_initialized{false};
bool is_finished{false};
bool deactivated{false};
mutable std::recursive_mutex mutex;
ThreadPool list_objects_pool;

View File

@ -110,10 +110,19 @@ public:
CurrentMetrics::ObjectStorageS3ThreadsScheduled,
"ListObjectS3")
, client(client_)
, request(std::make_unique<S3::ListObjectsV2Request>())
{
request.SetBucket(bucket_);
request.SetPrefix(path_prefix);
request.SetMaxKeys(static_cast<int>(max_list_size));
request->SetBucket(bucket_);
request->SetPrefix(path_prefix);
request->SetMaxKeys(static_cast<int>(max_list_size));
}
~S3IteratorAsync() override
{
/// Deactivate background threads before resetting the request to avoid data race.
deactivate();
request.reset();
client.reset();
}
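
Draining the pool in the derived destructor matters because of C++ destruction order: the members of S3IteratorAsync (request, client) are destroyed before the base-class destructor body runs, so a wait placed only in ~IObjectStorageIteratorAsync would come too late for background tasks still using the request. A minimal sketch of that ordering, with illustrative class names:

#include <iostream>

struct Base
{
    ~Base() { std::cout << "3. Base dtor: a wait here runs after Derived's members are gone\n"; }
};

struct Member
{
    ~Member() { std::cout << "2. Derived's member destroyed\n"; }
};

struct Derived : Base
{
    Member request;
    ~Derived() { std::cout << "1. Derived dtor: last point where request is still alive\n"; }
};

int main()
{
    Derived d;      /// destruction prints 1, 2, 3 in that order
}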
private:
@ -121,12 +130,12 @@ private:
{
ProfileEvents::increment(ProfileEvents::S3ListObjects);
auto outcome = client->ListObjectsV2(request);
auto outcome = client->ListObjectsV2(*request);
/// Outcome failure will be handled on the caller side.
if (outcome.IsSuccess())
{
request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken());
request->SetContinuationToken(outcome.GetResult().GetNextContinuationToken());
auto objects = outcome.GetResult().GetContents();
for (const auto & object : objects)
@ -141,12 +150,12 @@ private:
throw S3Exception(outcome.GetError().GetErrorType(),
"Could not list objects in bucket {} with prefix {}, S3 exception: {}, message: {}",
quoteString(request.GetBucket()), quoteString(request.GetPrefix()),
quoteString(request->GetBucket()), quoteString(request->GetPrefix()),
backQuote(outcome.GetError().GetExceptionName()), quoteString(outcome.GetError().GetMessage()));
}
std::shared_ptr<const S3::Client> client;
S3::ListObjectsV2Request request;
std::unique_ptr<S3::ListObjectsV2Request> request;
};
}

View File

@ -8,6 +8,8 @@
#include <Disks/ObjectStorages/HDFS/HDFSObjectStorage.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Formats/FormatFactory.h>
#include <Common/logger_useful.h>
namespace DB
{
@ -28,7 +30,7 @@ StorageHDFSConfiguration::StorageHDFSConfiguration(const StorageHDFSConfiguratio
void StorageHDFSConfiguration::check(ContextPtr context) const
{
context->getRemoteHostFilter().checkURL(Poco::URI(url));
checkHDFSURL(fs::path(url) / path);
checkHDFSURL(fs::path(url) / path.substr(1));
}
ObjectStoragePtr StorageHDFSConfiguration::createObjectStorage( /// NOLINT
@ -44,9 +46,22 @@ ObjectStoragePtr StorageHDFSConfiguration::createObjectStorage( /// NOLINT
return std::make_shared<HDFSObjectStorage>(url, std::move(hdfs_settings), context->getConfigRef());
}
void StorageHDFSConfiguration::fromAST(ASTs & args, ContextPtr context, bool /* with_structure */)
std::string StorageHDFSConfiguration::getPathWithoutGlob() const
{
url = checkAndGetLiteralArgument<String>(args[0], "url");
/// Unlike s3 and azure, which are object storages,
/// hdfs is a filesystem, so it cannot list files by partial prefix,
/// only by directory.
auto first_glob_pos = path.find_first_of("*?{");
auto end_of_path_without_globs = path.substr(0, first_glob_pos).rfind('/');
if (end_of_path_without_globs == std::string::npos || end_of_path_without_globs == 0)
return "/";
return path.substr(0, end_of_path_without_globs);
}
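
getPathWithoutGlob trims the path back to the last complete directory before the first glob character, because HDFS can only list whole directories (unlike S3/Azure prefix listing, per the comment above). A standalone restatement of the same string logic with a few expected results; the free-function name is illustrative:

#include <cassert>
#include <string>

std::string pathWithoutGlob(const std::string & path)
{
    auto first_glob_pos = path.find_first_of("*?{");
    auto end_of_path_without_globs = path.substr(0, first_glob_pos).rfind('/');
    if (end_of_path_without_globs == std::string::npos || end_of_path_without_globs == 0)
        return "/";
    return path.substr(0, end_of_path_without_globs);
}

int main()
{
    assert(pathWithoutGlob("/data/2024/*.parquet") == "/data/2024");
    assert(pathWithoutGlob("/*.csv") == "/");
    assert(pathWithoutGlob("/a/b/part{1,2}.tsv") == "/a/b");
}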
void StorageHDFSConfiguration::fromAST(ASTs & args, ContextPtr context, bool with_structure)
{
std::string url_str;
url_str = checkAndGetLiteralArgument<String>(args[0], "url");
if (args.size() > 1)
{
@ -54,28 +69,60 @@ void StorageHDFSConfiguration::fromAST(ASTs & args, ContextPtr context, bool /*
format = checkAndGetLiteralArgument<String>(args[1], "format_name");
}
if (args.size() == 3)
if (with_structure)
{
if (args.size() > 2)
{
structure = checkAndGetLiteralArgument<String>(args[2], "structure");
}
if (args.size() > 3)
{
args[3] = evaluateConstantExpressionOrIdentifierAsLiteral(args[3], context);
compression_method = checkAndGetLiteralArgument<String>(args[3], "compression_method");
}
}
else if (args.size() > 2)
{
args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(args[2], context);
compression_method = checkAndGetLiteralArgument<String>(args[2], "compression_method");
}
auto pos = url.find("//");
if (pos == std::string::npos)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid url: {}", url);
pos = url.find('/', pos + 2);
if (pos == std::string::npos)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid url: {}", url);
path = url.substr(pos + 1);
url = url.substr(0, pos);
paths = {path};
setURL(url_str);
}
void StorageHDFSConfiguration::fromNamedCollection(const NamedCollection &)
void StorageHDFSConfiguration::fromNamedCollection(const NamedCollection & collection)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method fromNamedColection() is not implemented");
std::string url_str;
auto filename = collection.getOrDefault<String>("filename", "");
if (!filename.empty())
url_str = std::filesystem::path(collection.get<String>("url")) / filename;
else
url_str = collection.get<String>("url");
format = collection.getOrDefault<String>("format", "auto");
compression_method = collection.getOrDefault<String>("compression_method", collection.getOrDefault<String>("compression", "auto"));
structure = collection.getOrDefault<String>("structure", "auto");
setURL(url_str);
}
void StorageHDFSConfiguration::setURL(const std::string url_)
{
auto pos = url_.find("//");
if (pos == std::string::npos)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Bad hdfs url: {}", url_);
pos = url_.find('/', pos + 2);
if (pos == std::string::npos)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Bad hdfs url: {}", url_);
path = url_.substr(pos + 1);
url = url_.substr(0, pos);
path = '/' + path;
paths = {path};
LOG_TRACE(getLogger("StorageHDFSConfiguration"), "Using url: {}, path: {}", url, path);
}
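
setURL splits the full location into the HDFS endpoint (everything up to the first '/' after the scheme's "//") and a path that is normalized to start with '/'. A self-contained sketch of the same split on an example URL; the function name and sample address are illustrative:

#include <cassert>
#include <stdexcept>
#include <string>
#include <utility>

std::pair<std::string, std::string> splitHDFSURL(const std::string & url_)
{
    auto pos = url_.find("//");
    if (pos == std::string::npos)
        throw std::invalid_argument("Bad hdfs url: " + url_);
    pos = url_.find('/', pos + 2);
    if (pos == std::string::npos)
        throw std::invalid_argument("Bad hdfs url: " + url_);
    return {url_.substr(0, pos), '/' + url_.substr(pos + 1)};    /// {endpoint, path}
}

int main()
{
    auto [endpoint, path] = splitHDFSURL("hdfs://namenode:9000/some/dir/*.tsv");
    assert(endpoint == "hdfs://namenode:9000");
    assert(path == "/some/dir/*.tsv");
}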
}

View File

@ -31,9 +31,12 @@ public:
static void addStructureToArgs(ASTs &, const String &, ContextPtr) {}
std::string getPathWithoutGlob() const override;
private:
void fromNamedCollection(const NamedCollection &) override;
void fromAST(ASTs & args, ContextPtr, bool /* with_structure */) override;
void setURL(const std::string url_);
String url;
String path;

View File

@ -6,6 +6,7 @@
#include <IO/Progress.h>
#include <Common/Throttler.h>
#include <Common/safe_cast.h>
#include <Common/logger_useful.h>
#include <hdfs/hdfs.h>
#include <mutex>
@ -55,10 +56,10 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
: BufferWithOwnMemory<SeekableReadBuffer>(use_external_buffer_ ? 0 : read_settings_.remote_fs_buffer_size)
, hdfs_uri(hdfs_uri_)
, hdfs_file_path(hdfs_file_path_)
, builder(createHDFSBuilder(hdfs_uri_, config_))
, read_settings(read_settings_)
, read_until_position(read_until_position_)
{
builder = createHDFSBuilder(hdfs_uri_, config_);
fs = createHDFSFS(builder.get());
fin = hdfsOpenFile(fs.get(), hdfs_file_path.c_str(), O_RDONLY, 0, 0, 0);
@ -96,11 +97,14 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
bool nextImpl() override
{
auto log = &Poco::Logger::get("kssenii");
size_t num_bytes_to_read;
if (read_until_position)
{
if (read_until_position == file_offset)
{
return false;
}
if (read_until_position < file_offset)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to read beyond right offset ({} > {})", file_offset, read_until_position - 1);
@ -111,10 +115,11 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
{
num_bytes_to_read = internal_buffer.size();
}
if (file_size != 0 && file_offset >= file_size)
{
return false;
}
// if (file_size != 0 && file_offset >= file_size)
// {
// LOG_TEST(log, "KSSENII 1 2");
// return false;
// }
ResourceGuard rlock(read_settings.resource_link, num_bytes_to_read);
int bytes_read;
@ -145,6 +150,8 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
file_offset += bytes_read;
if (read_settings.remote_throttler)
read_settings.remote_throttler->add(bytes_read, ProfileEvents::RemoteReadThrottlerBytes, ProfileEvents::RemoteReadThrottlerSleepMicroseconds);
LOG_TEST(log, "KSSENII SIZE: {}", bytes_read);
return true;
}

View File

@ -49,8 +49,8 @@ void ReadFromStorageObejctStorage::createIterator(const ActionsDAG::Node * predi
{
auto context = getContext();
iterator_wrapper = StorageObjectStorageSource::createFileIterator(
configuration, object_storage, distributed_processing, context, predicate,
virtual_columns, nullptr, query_settings.list_object_keys_size, metric_threads_count,
configuration, object_storage, query_settings, distributed_processing,
context, predicate, virtual_columns, nullptr, metric_threads_count,
metric_threads_active, metric_threads_scheduled, context->getFileProgressCallback());
}
}

View File

@ -288,8 +288,8 @@ std::unique_ptr<ReadBufferIterator> StorageObjectStorage<StorageSettings>::creat
{
const auto settings = StorageSettings::create(context->getSettingsRef());
auto file_iterator = StorageObjectStorageSource::createFileIterator(
configuration, object_storage, /* distributed_processing */false,
context, /* predicate */{}, /* virtual_columns */{}, &read_keys, settings.list_object_keys_size,
configuration, object_storage, settings, /* distributed_processing */false,
context, /* predicate */{}, /* virtual_columns */{}, &read_keys,
StorageSettings::ObjectStorageThreads(), StorageSettings::ObjectStorageThreadsActive(), StorageSettings::ObjectStorageThreadsScheduled());
return std::make_unique<ReadBufferIterator>(

View File

@ -92,7 +92,8 @@ StorageObjectStorageCluster<Definition, StorageSettings, Configuration>::getTask
const auto settings = StorageSettings::create(local_context->getSettingsRef());
auto iterator = std::make_shared<StorageObjectStorageSource::GlobIterator>(
object_storage, configuration, predicate, virtual_columns, local_context,
nullptr, settings.list_object_keys_size, local_context->getFileProgressCallback());
nullptr, settings.list_object_keys_size, settings.throw_on_zero_files_match,
local_context->getFileProgressCallback());
auto callback = std::make_shared<std::function<String()>>([iterator]() mutable -> String
{

View File

@ -40,7 +40,8 @@ StorageObjectStorageConfiguration::StorageObjectStorageConfiguration(const Stora
bool StorageObjectStorageConfiguration::withWildcard() const
{
static const String PARTITION_ID_WILDCARD = "{_partition_id}";
return getPath().find(PARTITION_ID_WILDCARD) != String::npos;
return getPath().find(PARTITION_ID_WILDCARD) != String::npos
|| getNamespace().find(PARTITION_ID_WILDCARD) != String::npos;
}
bool StorageObjectStorageConfiguration::isPathWithGlobs() const

View File

@ -40,7 +40,7 @@ public:
bool withGlobs() const { return isPathWithGlobs() || isNamespaceWithGlobs(); }
bool isPathWithGlobs() const;
bool isNamespaceWithGlobs() const;
std::string getPathWithoutGlob() const;
virtual std::string getPathWithoutGlob() const;
virtual void check(ContextPtr context) const = 0;
virtual void validateNamespace(const String & /* name */) const {}

View File

@ -25,6 +25,7 @@ struct StorageObjectStorageSettings
SchemaInferenceMode schema_inference_mode;
bool skip_empty_files;
size_t list_object_keys_size;
bool throw_on_zero_files_match;
};
struct S3StorageSettings
@ -38,6 +39,7 @@ struct S3StorageSettings
.schema_inference_mode = settings.schema_inference_mode,
.skip_empty_files = settings.s3_skip_empty_files,
.list_object_keys_size = settings.s3_list_object_keys_size,
.throw_on_zero_files_match = settings.s3_throw_on_zero_files_match,
};
}
@ -59,6 +61,7 @@ struct AzureStorageSettings
.schema_inference_mode = settings.schema_inference_mode,
.skip_empty_files = settings.s3_skip_empty_files, /// TODO: add setting for azure
.list_object_keys_size = settings.azure_list_object_keys_size,
.throw_on_zero_files_match = settings.s3_throw_on_zero_files_match,
};
}
@ -80,6 +83,7 @@ struct HDFSStorageSettings
.schema_inference_mode = settings.schema_inference_mode,
.skip_empty_files = settings.s3_skip_empty_files, /// TODO: add setting for hdfs
.list_object_keys_size = settings.s3_list_object_keys_size, /// TODO: add a setting for hdfs
.throw_on_zero_files_match = settings.s3_throw_on_zero_files_match,
};
}

View File

@ -28,6 +28,7 @@ namespace ErrorCodes
extern const int CANNOT_COMPILE_REGEXP;
extern const int BAD_ARGUMENTS;
extern const int LOGICAL_ERROR;
extern const int FILE_DOESNT_EXIST;
}
StorageObjectStorageSource::StorageObjectStorageSource(
@ -75,12 +76,12 @@ StorageObjectStorageSource::~StorageObjectStorageSource()
std::shared_ptr<StorageObjectStorageSource::IIterator> StorageObjectStorageSource::createFileIterator(
ConfigurationPtr configuration,
ObjectStoragePtr object_storage,
const StorageObjectStorageSettings & settings,
bool distributed_processing,
const ContextPtr & local_context,
const ActionsDAG::Node * predicate,
const NamesAndTypesList & virtual_columns,
ObjectInfos * read_keys,
size_t list_object_keys_size,
CurrentMetrics::Metric metric_threads_,
CurrentMetrics::Metric metric_threads_active_,
CurrentMetrics::Metric metric_threads_scheduled_,
@ -99,12 +100,14 @@ std::shared_ptr<StorageObjectStorageSource::IIterator> StorageObjectStorageSourc
{
/// Iterate through disclosed globs and make a source for each file
return std::make_shared<GlobIterator>(
object_storage, configuration, predicate, virtual_columns, local_context, read_keys, list_object_keys_size, file_progress_callback);
object_storage, configuration, predicate, virtual_columns, local_context,
read_keys, settings.list_object_keys_size, settings.throw_on_zero_files_match, file_progress_callback);
}
else
{
return std::make_shared<KeysIterator>(
object_storage, configuration, virtual_columns, read_keys, file_progress_callback);
object_storage, configuration, virtual_columns, read_keys,
settings.throw_on_zero_files_match, file_progress_callback);
}
}
@ -209,6 +212,7 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
do
{
object_info = file_iterator->next(processor);
if (!object_info || object_info->relative_path.empty())
return {};
@ -226,8 +230,11 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
? tryGetNumRowsFromCache(object_info)
: std::nullopt;
LOG_TRACE(&Poco::Logger::get("kssenii"), "HAS NUM ROWS FROM CACHE: {}", num_rows_from_cache.has_value());
if (num_rows_from_cache)
{
LOG_TRACE(&Poco::Logger::get("kssenii"), "NUM ROWS FROM CACHE: {}", num_rows_from_cache.value());
/// We should not return single chunk with all number of rows,
/// because there is a chance that this chunk will be materialized later
/// (it can cause memory problems even with default values in columns or when virtual columns are requested).
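
The comment above concerns how a cached row count is surfaced: it should be split into chunks no larger than max_block_size rather than returned as one chunk holding the full count. A hedged sketch of that splitting, independent of ClickHouse's Chunk class; names are illustrative:

#include <algorithm>
#include <cstddef>
#include <vector>

std::vector<size_t> splitRowCount(size_t total_rows, size_t max_block_size)
{
    std::vector<size_t> chunk_sizes;
    for (size_t produced = 0; produced < total_rows; )
    {
        size_t rows = std::min(total_rows - produced, max_block_size);    /// cap every chunk
        chunk_sizes.push_back(rows);
        produced += rows;
    }
    return chunk_sizes;
}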
@ -324,6 +331,29 @@ std::unique_ptr<ReadBuffer> StorageObjectStorageSource::createReadBuffer(const S
}
}
StorageObjectStorageSource::IIterator::IIterator(bool throw_on_zero_files_match_, const std::string & logger_name_)
: throw_on_zero_files_match(throw_on_zero_files_match_)
, logger(getLogger(logger_name_))
{
}
ObjectInfoPtr StorageObjectStorageSource::IIterator::next(size_t processor)
{
auto object_info = nextImpl(processor);
if (object_info)
{
first_iteration = false;
LOG_TEST(&Poco::Logger::get("KeysIterator"), "Next key: {}", object_info->relative_path);
}
else if (first_iteration && throw_on_zero_files_match)
{
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "Can not match any files");
}
return object_info;
}
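
IIterator::next above is a thin wrapper around the virtual nextImpl(): if the very first call yields nothing and throw_on_zero_files_match is enabled, the iterator raises an error instead of silently returning an empty result. A minimal sketch of this wrap-the-virtual pattern with simplified types (std::runtime_error stands in for the ClickHouse exception):

#include <optional>
#include <stdexcept>
#include <string>

class IIterator
{
public:
    explicit IIterator(bool throw_on_zero_files_match_)
        : throw_on_zero_files_match(throw_on_zero_files_match_) {}
    virtual ~IIterator() = default;

    std::optional<std::string> next()
    {
        auto object = nextImpl();    /// delegate to the concrete iterator
        if (object)
            first_iteration = false;
        else if (first_iteration && throw_on_zero_files_match)
            throw std::runtime_error("Can not match any files");
        return object;
    }

protected:
    virtual std::optional<std::string> nextImpl() = 0;

private:
    const bool throw_on_zero_files_match;
    bool first_iteration = true;
};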
StorageObjectStorageSource::GlobIterator::GlobIterator(
ObjectStoragePtr object_storage_,
ConfigurationPtr configuration_,
@ -332,8 +362,10 @@ StorageObjectStorageSource::GlobIterator::GlobIterator(
ContextPtr context_,
ObjectInfos * read_keys_,
size_t list_object_keys_size,
bool throw_on_zero_files_match_,
std::function<void(FileProgress)> file_progress_callback_)
: WithContext(context_)
: IIterator(throw_on_zero_files_match_, "GlobIterator")
, WithContext(context_)
, object_storage(object_storage_)
, configuration(configuration_)
, virtual_columns(virtual_columns_)
@ -380,7 +412,7 @@ StorageObjectStorageSource::GlobIterator::GlobIterator(
}
}
ObjectInfoPtr StorageObjectStorageSource::GlobIterator::next(size_t /* processor */)
ObjectInfoPtr StorageObjectStorageSource::GlobIterator::nextImpl(size_t /* processor */)
{
std::lock_guard lock(next_mutex);
@ -401,9 +433,10 @@ ObjectInfoPtr StorageObjectStorageSource::GlobIterator::next(size_t /* processor
}
new_batch = std::move(result.value());
LOG_TEST(logger, "Batch size: {}", new_batch.size());
for (auto it = new_batch.begin(); it != new_batch.end();)
{
chassert(*it);
if (!recursive && !re2::RE2::FullMatch((*it)->relative_path, *matcher))
it = new_batch.erase(it);
else
@ -452,8 +485,10 @@ StorageObjectStorageSource::KeysIterator::KeysIterator(
ConfigurationPtr configuration_,
const NamesAndTypesList & virtual_columns_,
ObjectInfos * read_keys_,
bool throw_on_zero_files_match_,
std::function<void(FileProgress)> file_progress_callback_)
: object_storage(object_storage_)
: IIterator(throw_on_zero_files_match_, "KeysIterator")
, object_storage(object_storage_)
, configuration(configuration_)
, virtual_columns(virtual_columns_)
, file_progress_callback(file_progress_callback_)
@ -470,7 +505,7 @@ StorageObjectStorageSource::KeysIterator::KeysIterator(
}
}
ObjectInfoPtr StorageObjectStorageSource::KeysIterator::next(size_t /* processor */)
ObjectInfoPtr StorageObjectStorageSource::KeysIterator::nextImpl(size_t /* processor */)
{
size_t current_index = index.fetch_add(1, std::memory_order_relaxed);
if (current_index >= keys.size())
@ -520,7 +555,8 @@ StorageObjectStorageSource::ReadTaskIterator::ReadTaskIterator(
CurrentMetrics::Metric metric_threads_,
CurrentMetrics::Metric metric_threads_active_,
CurrentMetrics::Metric metric_threads_scheduled_)
: callback(callback_)
: IIterator(false, "ReadTaskIterator")
, callback(callback_)
{
ThreadPool pool(metric_threads_, metric_threads_active_, metric_threads_scheduled_, max_threads_count);
auto pool_scheduler = threadPoolCallbackRunner<String>(pool, "ReadTaskIter");
@ -540,7 +576,7 @@ StorageObjectStorageSource::ReadTaskIterator::ReadTaskIterator(
}
}
ObjectInfoPtr StorageObjectStorageSource::ReadTaskIterator::next(size_t)
ObjectInfoPtr StorageObjectStorageSource::ReadTaskIterator::nextImpl(size_t)
{
size_t current_index = index.fetch_add(1, std::memory_order_relaxed);
if (current_index >= buffer.size())

View File

@ -53,12 +53,12 @@ public:
static std::shared_ptr<IIterator> createFileIterator(
ConfigurationPtr configuration,
ObjectStoragePtr object_storage,
const StorageObjectStorageSettings & settings,
bool distributed_processing,
const ContextPtr & local_context,
const ActionsDAG::Node * predicate,
const NamesAndTypesList & virtual_columns,
ObjectInfos * read_keys,
size_t list_object_keys_size,
CurrentMetrics::Metric metric_threads_,
CurrentMetrics::Metric metric_threads_active_,
CurrentMetrics::Metric metric_threads_scheduled_,
@ -133,10 +133,21 @@ protected:
class StorageObjectStorageSource::IIterator
{
public:
IIterator(bool throw_on_zero_files_match_, const std::string & logger_name_);
virtual ~IIterator() = default;
virtual size_t estimatedKeysCount() = 0;
virtual ObjectInfoPtr next(size_t processor) = 0;
ObjectInfoPtr next(size_t processor);
protected:
virtual ObjectInfoPtr nextImpl(size_t processor) = 0;
protected:
const bool throw_on_zero_files_match;
bool first_iteration = true;
LoggerPtr logger;
};
class StorageObjectStorageSource::ReadTaskIterator : public IIterator
@ -151,9 +162,9 @@ public:
size_t estimatedKeysCount() override { return buffer.size(); }
ObjectInfoPtr next(size_t) override;
private:
ObjectInfoPtr nextImpl(size_t) override;
ReadTaskCallback callback;
ObjectInfos buffer;
std::atomic_size_t index = 0;
@ -170,15 +181,17 @@ public:
ContextPtr context_,
ObjectInfos * read_keys_,
size_t list_object_keys_size,
bool throw_on_zero_files_match_,
std::function<void(FileProgress)> file_progress_callback_ = {});
~GlobIterator() override = default;
size_t estimatedKeysCount() override { return object_infos.size(); }
ObjectInfoPtr next(size_t processor) override;
private:
ObjectInfoPtr nextImpl(size_t processor) override;
void createFilterAST(const String & any_key);
ObjectStoragePtr object_storage;
ConfigurationPtr configuration;
ActionsDAGPtr filter_dag;
@ -193,7 +206,6 @@ private:
std::unique_ptr<re2::RE2> matcher;
void createFilterAST(const String & any_key);
bool is_finished = false;
std::mutex next_mutex;
@ -208,15 +220,16 @@ public:
ConfigurationPtr configuration_,
const NamesAndTypesList & virtual_columns_,
ObjectInfos * read_keys_,
bool throw_on_zero_files_match_,
std::function<void(FileProgress)> file_progress_callback = {});
~KeysIterator() override = default;
size_t estimatedKeysCount() override { return keys.size(); }
ObjectInfoPtr next(size_t processor) override;
private:
ObjectInfoPtr nextImpl(size_t processor) override;
const ObjectStoragePtr object_storage;
const ConfigurationPtr configuration;
const NamesAndTypesList virtual_columns;

View File

@ -45,7 +45,8 @@ StorageS3QueueSource::FileIterator::FileIterator(
std::unique_ptr<GlobIterator> glob_iterator_,
size_t current_shard_,
std::atomic<bool> & shutdown_called_)
: metadata(metadata_)
: StorageObjectStorageSource::IIterator(false, "S3QueueIterator")
, metadata(metadata_)
, glob_iterator(std::move(glob_iterator_))
, shutdown_called(shutdown_called_)
, log(&Poco::Logger::get("StorageS3QueueSource"))
@ -59,7 +60,7 @@ StorageS3QueueSource::FileIterator::FileIterator(
}
}
StorageS3QueueSource::ObjectInfoPtr StorageS3QueueSource::FileIterator::next(size_t processor)
StorageS3QueueSource::ObjectInfoPtr StorageS3QueueSource::FileIterator::nextImpl(size_t processor)
{
while (!shutdown_called)
{

View File

@ -56,7 +56,7 @@ public:
/// Note:
/// List results in s3 are always returned in UTF-8 binary order.
/// (https://docs.aws.amazon.com/AmazonS3/latest/userguide/ListingKeysUsingAPIs.html)
ObjectInfoPtr next(size_t processor) override;
ObjectInfoPtr nextImpl(size_t processor) override;
size_t estimatedKeysCount() override;

View File

@ -598,7 +598,7 @@ std::shared_ptr<StorageS3Queue::FileIterator> StorageS3Queue::createFileIterator
{
auto settings = S3StorageSettings::create(local_context->getSettingsRef());
auto glob_iterator = std::make_unique<StorageObjectStorageSource::GlobIterator>(
object_storage, configuration, predicate, getVirtualsList(), local_context, nullptr, settings.list_object_keys_size);
object_storage, configuration, predicate, getVirtualsList(), local_context, nullptr, settings.list_object_keys_size, settings.throw_on_zero_files_match);
return std::make_shared<FileIterator>(files_metadata, std::move(glob_iterator), s3queue_settings->s3queue_current_shard_num, shutdown_called);
}

View File

@ -61,7 +61,7 @@ def test_read_write_storage_with_globs(started_cluster):
hdfs_api.write_data("/storage" + i, i + "\tMark\t72.53\n")
assert hdfs_api.read_data("/storage" + i) == i + "\tMark\t72.53\n"
assert node1.query("select count(*) from HDFSStorageWithRange") == "3\n"
assert node1.query("select count(*) from HDFSStorageWithRange settings s3_throw_on_zero_files_match=1") == "3\n"
assert node1.query("select count(*) from HDFSStorageWithEnum") == "3\n"
assert node1.query("select count(*) from HDFSStorageWithQuestionMark") == "3\n"
assert node1.query("select count(*) from HDFSStorageWithAsterisk") == "3\n"
@ -159,7 +159,7 @@ def test_bad_hdfs_uri(started_cluster):
)
except Exception as ex:
print(ex)
assert "Unable to create builder to connect to HDFS" in str(ex)
assert "Unable to connect to HDFS" in str(ex)
try:
node1.query(