This commit is contained in:
kssenii 2023-09-27 14:54:31 +02:00
parent d77452c561
commit 6b191a1afe
9 changed files with 197 additions and 141 deletions

View File

@ -391,7 +391,7 @@ zkutil::EphemeralNodeHolder::Ptr ClusterCopier::createTaskWorkerNodeAndWaitIfNee
auto code = zookeeper->tryMulti(ops, responses);
if (code == Coordination::Error::ZOK || code == Coordination::Error::ZNODEEXISTS)
return std::make_shared<zkutil::EphemeralNodeHolder>(current_worker_path, *zookeeper, false, false, description);
return zkutil::EphemeralNodeHolder::existing(current_worker_path, *zookeeper);
if (code == Coordination::Error::ZBADVERSION)
{

View File

@ -644,11 +644,18 @@ class EphemeralNodeHolder
public:
using Ptr = std::shared_ptr<EphemeralNodeHolder>;
EphemeralNodeHolder(const std::string & path_, ZooKeeper & zookeeper_, bool create, bool sequential, const std::string & data)
EphemeralNodeHolder(const std::string & path_, ZooKeeper & zookeeper_, bool create, bool try_create, bool sequential, const std::string & data)
: path(path_), zookeeper(zookeeper_)
{
if (create)
{
path = zookeeper.create(path, data, sequential ? CreateMode::EphemeralSequential : CreateMode::Ephemeral);
need_remove = created = true;
}
else if (try_create)
{
need_remove = created = Coordination::Error::ZOK == zookeeper.tryCreate(path, data, sequential ? CreateMode::EphemeralSequential : CreateMode::Ephemeral);
}
}
std::string getPath() const
@ -656,19 +663,32 @@ public:
return path;
}
bool isCreated() const
{
return created;
}
static Ptr create(const std::string & path, ZooKeeper & zookeeper, const std::string & data = "")
{
return std::make_shared<EphemeralNodeHolder>(path, zookeeper, true, false, data);
return std::make_shared<EphemeralNodeHolder>(path, zookeeper, true, false, false, data);
}
static Ptr tryCreate(const std::string & path, ZooKeeper & zookeeper, const std::string & data = "")
{
auto node = std::make_shared<EphemeralNodeHolder>(path, zookeeper, false, true, false, data);
if (node->isCreated())
return node;
return nullptr;
}
static Ptr createSequential(const std::string & path, ZooKeeper & zookeeper, const std::string & data = "")
{
return std::make_shared<EphemeralNodeHolder>(path, zookeeper, true, true, data);
return std::make_shared<EphemeralNodeHolder>(path, zookeeper, true, false, true, data);
}
static Ptr existing(const std::string & path, ZooKeeper & zookeeper)
{
return std::make_shared<EphemeralNodeHolder>(path, zookeeper, false, false, "");
return std::make_shared<EphemeralNodeHolder>(path, zookeeper, false, false, false, "");
}
void setAlreadyRemoved()
@ -702,6 +722,7 @@ private:
ZooKeeper & zookeeper;
CurrentMetrics::Increment metric_increment{CurrentMetrics::EphemeralNode};
bool need_remove = true;
bool created = false;
};
using EphemeralNodeHolderPtr = EphemeralNodeHolder::Ptr;

View File

@ -4,10 +4,13 @@
#include <base/sleep.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/randomSeed.h>
#include <Common/getRandomASCIIString.h>
#include <IO/Operators.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <Interpreters/Context.h>
#include <Storages/S3Queue/S3QueueFilesMetadata.h>
#include <Storages/S3Queue/S3QueueSettings.h>
#include <Storages/StorageS3Settings.h>
#include <Storages/StorageSnapshot.h>
#include <Poco/JSON/JSON.h>
@ -55,15 +58,6 @@ std::unique_lock<std::mutex> S3QueueFilesMetadata::LocalFileStatuses::lock() con
return std::unique_lock(mutex);
}
S3QueueFilesMetadata::FileStatus::State S3QueueFilesMetadata::LocalFileStatuses::state(const std::string & filename) const
{
auto lk = lock();
if (auto it = file_statuses.find(filename); it != file_statuses.end())
return it->second->state;
else
return FileStatus::State::None;
}
S3QueueFilesMetadata::FileStatuses S3QueueFilesMetadata::LocalFileStatuses::getAll() const
{
auto lk = lock();
@ -106,6 +100,7 @@ std::string S3QueueFilesMetadata::NodeMetadata::toString() const
json.set("last_processed_timestamp", getCurrentTime());
json.set("last_exception", last_exception);
json.set("retries", retries);
json.set("processing_id", processing_id);
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
oss.exceptions(std::ios::failbit);
@ -123,6 +118,7 @@ S3QueueFilesMetadata::NodeMetadata S3QueueFilesMetadata::NodeMetadata::fromStrin
metadata.last_processed_timestamp = json->getValue<UInt64>("last_processed_timestamp");
metadata.last_exception = json->getValue<String>("last_exception");
metadata.retries = json->getValue<UInt64>("retries");
metadata.processing_id = json->getValue<UInt64>("processing_id");
return metadata;
}
@ -189,28 +185,28 @@ S3QueueFilesMetadata::NodeMetadata S3QueueFilesMetadata::createNodeMetadata(
return metadata;
}
bool S3QueueFilesMetadata::trySetFileAsProcessing(const std::string & path)
S3QueueFilesMetadata::ProcessingHolderPtr S3QueueFilesMetadata::trySetFileAsProcessing(const std::string & path)
{
auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::S3QueueSetFileProcessingMicroseconds);
auto file_status = local_file_statuses.get(path, /* create */false);
/// Check locally cached file status.
switch (local_file_statuses.state(path))
switch (file_status->state)
{
case FileStatus::State::Processing: [[fallthrough]];
case FileStatus::State::Processed:
{
/// File is already processes or processing by current server.
return false;
return nullptr;
}
case FileStatus::State::Failed:
{
if (!max_loading_retries)
{
/// File was processes by current server and failed,
/// retries are disabled.
return false;
}
/// TODO save information if file is still retriable.
/// max_loading_retries == 0 => file is not retriable.
/// file_status->retries is a cached value, so in case file_status->retries >= max_loading retries
/// we can fully rely that it is true, but in other case the value might be outdated,
/// but this is ok, we will recheck with zookeeper.
if (!max_loading_retries || file_status->retries >= max_loading_retries)
return nullptr;
break;
}
case FileStatus::State::None:
@ -220,19 +216,25 @@ bool S3QueueFilesMetadata::trySetFileAsProcessing(const std::string & path)
break;
}
}
/// TODO lock file token not to go to keeper simultaneously from this server.
std::unique_lock lock(file_status->processing_lock, std::defer_lock);
if (!lock.try_lock())
{
/// Another thread is already trying to set file as processing.
return nullptr;
}
SetFileProcessingResult result;
ProcessingHolderPtr processing_holder;
switch (mode)
{
case S3QueueMode::ORDERED:
{
result = trySetFileAsProcessingForOrderedMode(path);
std::tie(result, processing_holder) = trySetFileAsProcessingForOrderedMode(path);
break;
}
case S3QueueMode::UNORDERED:
{
result = trySetFileAsProcessingForUnorderedMode(path);
std::tie(result, processing_holder) = trySetFileAsProcessingForUnorderedMode(path);
break;
}
}
@ -240,7 +242,6 @@ bool S3QueueFilesMetadata::trySetFileAsProcessing(const std::string & path)
{
case SetFileProcessingResult::Success:
{
auto file_status = local_file_statuses.get(path, /* create */true);
file_status->state = FileStatus::State::Processing;
file_status->profile_counters.increment(ProfileEvents::S3QueueSetFileProcessingMicroseconds, timer.get());
timer.cancel();
@ -251,14 +252,12 @@ bool S3QueueFilesMetadata::trySetFileAsProcessing(const std::string & path)
case SetFileProcessingResult::AlreadyProcessed:
{
/// Cache the state.
auto file_status = local_file_statuses.get(path, /* create */true);
file_status->state = FileStatus::State::Processed;
break;
}
case SetFileProcessingResult::AlreadyFailed:
{
/// Cache the state.
auto file_status = local_file_statuses.get(path, /* create */true);
file_status->state = FileStatus::State::Failed;
break;
}
@ -268,54 +267,60 @@ bool S3QueueFilesMetadata::trySetFileAsProcessing(const std::string & path)
break;
}
}
return result == SetFileProcessingResult::Success;
if (result != SetFileProcessingResult::Success)
return nullptr;
return processing_holder;
}
S3QueueFilesMetadata::SetFileProcessingResult S3QueueFilesMetadata::trySetFileAsProcessingForUnorderedMode(const std::string & path)
std::pair<S3QueueFilesMetadata::SetFileProcessingResult, S3QueueFilesMetadata::ProcessingHolderPtr>
S3QueueFilesMetadata::trySetFileAsProcessingForUnorderedMode(const std::string & path)
{
/// Create an ephemenral node in /processing
/// if corresponding node does not exist in failed/, processed/ and processing/.
/// Return false otherwise.
const auto node_name = getNodeName(path);
const auto node_metadata = createNodeMetadata(path).toString();
const auto zk_client = getZooKeeper();
auto node_metadata = createNodeMetadata(path);
node_metadata.processing_id = getRandomASCIIString(10);
Coordination::Requests requests;
zkutil::addCheckNotExistsRequest(requests, *zk_client, zookeeper_processed_path / node_name);
zkutil::addCheckNotExistsRequest(requests, *zk_client, zookeeper_failed_path / node_name);
requests.push_back(zkutil::makeCreateRequest(zookeeper_processing_path / node_name, node_metadata, zkutil::CreateMode::Ephemeral));
requests.push_back(zkutil::makeCreateRequest(zookeeper_processing_path / node_name, node_metadata.toString(), zkutil::CreateMode::Ephemeral));
Coordination::Responses responses;
auto code = zk_client->tryMulti(requests, responses);
if (code == Coordination::Error::ZOK)
{
return SetFileProcessingResult::Success;
auto holder = std::make_unique<ProcessingHolder>(node_metadata.processing_id, zookeeper_processing_path / node_name, zk_client);
return std::pair{SetFileProcessingResult::Success, std::move(holder)};
}
else if (responses[0]->error == Coordination::Error::ZOK)
{
if (responses[1]->error == Coordination::Error::ZOK)
{
chassert(responses[2]->error != Coordination::Error::ZOK);
return SetFileProcessingResult::ProcessingByOtherNode;
}
else
return SetFileProcessingResult::AlreadyFailed;
}
else
return SetFileProcessingResult::AlreadyProcessed;
if (responses[0]->error != Coordination::Error::ZOK)
return std::pair{SetFileProcessingResult::AlreadyProcessed, nullptr};
if (responses[1]->error != Coordination::Error::ZOK)
return std::pair{SetFileProcessingResult::AlreadyFailed, nullptr};
chassert(responses[2]->error != Coordination::Error::ZOK);
return std::pair{SetFileProcessingResult::ProcessingByOtherNode, nullptr};
}
S3QueueFilesMetadata::SetFileProcessingResult S3QueueFilesMetadata::trySetFileAsProcessingForOrderedMode(const std::string & path)
std::pair<S3QueueFilesMetadata::SetFileProcessingResult, S3QueueFilesMetadata::ProcessingHolderPtr>
S3QueueFilesMetadata::trySetFileAsProcessingForOrderedMode(const std::string & path)
{
/// Create an ephemenral node in /processing
/// if corresponding it does not exist in failed/, processing/ and satisfied max processed file check.
/// Return false otherwise.
const auto node_name = getNodeName(path);
const auto node_metadata = createNodeMetadata(path).toString();
const auto zk_client = getZooKeeper();
auto node_metadata = createNodeMetadata(path);
node_metadata.processing_id = getRandomASCIIString(10);
while (true)
{
@ -330,12 +335,12 @@ S3QueueFilesMetadata::SetFileProcessingResult S3QueueFilesMetadata::trySetFileAs
if (responses[0]->error == Coordination::Error::ZOK)
{
LOG_TEST(log, "Skipping file `{}`: already processing", path);
return SetFileProcessingResult::ProcessingByOtherNode;
return std::pair{SetFileProcessingResult::ProcessingByOtherNode, nullptr};
}
else
{
LOG_TEST(log, "Skipping file `{}`: failed", path);
return SetFileProcessingResult::AlreadyFailed;
return std::pair{SetFileProcessingResult::AlreadyFailed, nullptr};
}
}
@ -347,27 +352,30 @@ S3QueueFilesMetadata::SetFileProcessingResult S3QueueFilesMetadata::trySetFileAs
auto max_processed_file_path = processed_node_metadata.file_path;
if (!max_processed_file_path.empty() && path <= max_processed_file_path)
return SetFileProcessingResult::AlreadyProcessed;
return std::pair{SetFileProcessingResult::AlreadyProcessed, nullptr};
requests.clear();
responses.clear();
zkutil::addCheckNotExistsRequest(requests, *zk_client, zookeeper_failed_path / node_name);
requests.push_back(zkutil::makeCreateRequest(zookeeper_processing_path / node_name, node_metadata, zkutil::CreateMode::Ephemeral));
requests.push_back(zkutil::makeCreateRequest(zookeeper_processing_path / node_name, node_metadata.toString(), zkutil::CreateMode::Ephemeral));
requests.push_back(zkutil::makeCheckRequest(zookeeper_processed_path, processed_node_stat.version));
code = zk_client->tryMulti(requests, responses);
if (code == Coordination::Error::ZOK)
return SetFileProcessingResult::Success;
{
auto holder = std::make_unique<ProcessingHolder>(node_metadata.processing_id, zookeeper_processing_path / node_name, zk_client);
return std::pair{SetFileProcessingResult::Success, std::move(holder)};
}
if (responses[0]->error != Coordination::Error::ZOK)
{
LOG_TEST(log, "Skipping file `{}`: failed", path);
return SetFileProcessingResult::AlreadyFailed;
return std::pair{SetFileProcessingResult::AlreadyFailed, nullptr};
}
else if (responses[1]->error != Coordination::Error::ZOK)
{
LOG_TEST(log, "Skipping file `{}`: already processing", path);
return SetFileProcessingResult::ProcessingByOtherNode;
return std::pair{SetFileProcessingResult::ProcessingByOtherNode, nullptr};
}
else
{
@ -465,8 +473,8 @@ void S3QueueFilesMetadata::setFileFailed(const String & path, const String & exc
{
auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::S3QueueSetFileFailedMicroseconds);
SCOPE_EXIT_SAFE({
auto file_status = local_file_statuses.get(path, /* create */false);
auto file_status = local_file_statuses.get(path, /* create */false);
SCOPE_EXIT({
file_status->state = FileStatus::State::Failed;
file_status->profile_counters.increment(ProfileEvents::S3QueueSetFileFailedMicroseconds, timer.get());
timer.cancel();
@ -505,6 +513,7 @@ void S3QueueFilesMetadata::setFileFailed(const String & path, const String & exc
{
auto failed_node_metadata = NodeMetadata::fromString(res);
node_metadata.retries = failed_node_metadata.retries + 1;
file_status->retries = node_metadata.retries;
}
LOG_TEST(log, "File `{}` failed to process, try {}/{} (Error: {})",
@ -605,30 +614,12 @@ void S3QueueFilesMetadata::cleanupThreadFuncImpl()
/// Create a lock so that with distributed processing
/// multiple nodes do not execute cleanup in parallel.
Coordination::Error code = zk_client->tryCreate(zookeeper_cleanup_lock_path,
toString(getCurrentTime()),
zkutil::CreateMode::Ephemeral);
if (code == Coordination::Error::ZNODEEXISTS)
auto ephemeral_node = zkutil::EphemeralNodeHolder::create(zookeeper_cleanup_lock_path, *zk_client, toString(getCurrentTime()));
if (!ephemeral_node)
{
LOG_TEST(log, "Cleanup is already being executed by another node");
return;
}
else if (code != Coordination::Error::ZOK)
{
throw Coordination::Exception::fromPath(code, zookeeper_cleanup_lock_path);
}
SCOPE_EXIT_SAFE({
try
{
zk_client->remove(zookeeper_cleanup_lock_path);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
chassert(false);
}
});
struct Node
{
@ -687,7 +678,7 @@ void S3QueueFilesMetadata::cleanupThreadFuncImpl()
local_file_statuses.remove(node.metadata.file_path, /* if_exists */true);
code = zk_client->tryRemove(path);
auto code = zk_client->tryRemove(path);
if (code == Coordination::Error::ZOK)
--nodes_to_remove;
else
@ -704,7 +695,7 @@ void S3QueueFilesMetadata::cleanupThreadFuncImpl()
local_file_statuses.remove(node.metadata.file_path, /* if_exists */true);
code = zk_client->tryRemove(path);
auto code = zk_client->tryRemove(path);
if (code != Coordination::Error::ZOK)
LOG_ERROR(log, "Failed to remove a node `{}` (code: {})", path.string(), code);
}

View File

@ -22,7 +22,17 @@ public:
~S3QueueFilesMetadata();
bool trySetFileAsProcessing(const std::string & path);
struct ProcessingHolder
{
ProcessingHolder(const std::string & processing_id_, const std::string & zk_node_path_, zkutil::ZooKeeperPtr zk_client_)
: zk_client(zk_client_), zk_node_path(zk_node_path_), processing_id(processing_id_) {}
zkutil::ZooKeeperPtr zk_client;
std::string zk_node_path;
std::string processing_id;
};
using ProcessingHolderPtr = std::unique_ptr<ProcessingHolder>;
ProcessingHolderPtr trySetFileAsProcessing(const std::string & path);
void setFileProcessed(const std::string & path);
@ -47,6 +57,10 @@ public:
time_t processing_start_time = 0;
time_t processing_end_time = 0;
size_t retries = 0;
std::mutex processing_lock;
};
using FileStatuses = std::unordered_map<std::string, std::shared_ptr<FileStatus>>;
@ -88,8 +102,8 @@ private:
AlreadyProcessed,
AlreadyFailed,
};
SetFileProcessingResult trySetFileAsProcessingForOrderedMode(const std::string & path);
SetFileProcessingResult trySetFileAsProcessingForUnorderedMode(const std::string & path);
std::pair<SetFileProcessingResult, ProcessingHolderPtr> trySetFileAsProcessingForOrderedMode(const std::string & path);
std::pair<SetFileProcessingResult, ProcessingHolderPtr> trySetFileAsProcessingForUnorderedMode(const std::string & path);
struct NodeMetadata
{
@ -97,6 +111,7 @@ private:
UInt64 last_processed_timestamp = 0;
std::string last_exception;
UInt64 retries = 0;
std::string processing_id; /// For ephemeral processing node.
std::string toString() const;
static NodeMetadata fromString(const std::string & metadata_str);
@ -115,7 +130,6 @@ private:
FileStatuses getAll() const;
std::shared_ptr<FileStatus> get(const std::string & filename, bool create);
bool remove(const std::string & filename, bool if_exists);
FileStatus::State state(const std::string & filename) const;
std::unique_lock<std::mutex> lock() const;
};
LocalFileStatuses local_file_statuses;

View File

@ -5,6 +5,7 @@
#include <Common/CurrentMetrics.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/logger_useful.h>
#include <Common/getRandomASCIIString.h>
#include <Storages/S3Queue/S3QueueSource.h>
#include <Storages/VirtualColumnUtils.h>
@ -29,24 +30,39 @@ namespace ErrorCodes
extern const int NOT_IMPLEMENTED;
}
StorageS3QueueSource::S3QueueKeyWithInfo::S3QueueKeyWithInfo(
const std::string & key_,
std::optional<S3::ObjectInfo> info_,
std::unique_ptr<Metadata::ProcessingHolder> processing_holder_,
std::shared_ptr<Metadata::FileStatus> file_status_)
: StorageS3Source::KeyWithInfo(key_, info_)
, processing_holder(std::move(processing_holder_))
, file_status(file_status_)
{
}
StorageS3QueueSource::FileIterator::FileIterator(
std::shared_ptr<S3QueueFilesMetadata> metadata_, std::unique_ptr<GlobIterator> glob_iterator_)
: metadata(metadata_) , glob_iterator(std::move(glob_iterator_))
{
}
StorageS3QueueSource::KeyWithInfo StorageS3QueueSource::FileIterator::next()
StorageS3QueueSource::KeyWithInfoPtr StorageS3QueueSource::FileIterator::next()
{
/// List results in s3 are always returned in UTF-8 binary order.
/// (https://docs.aws.amazon.com/AmazonS3/latest/userguide/ListingKeysUsingAPIs.html)
while (true)
{
KeyWithInfo val = glob_iterator->next();
if (val.key.empty())
KeyWithInfoPtr val = glob_iterator->next();
if (!val)
return {};
if (metadata->trySetFileAsProcessing(val.key))
return val;
if (auto processing_holder = metadata->trySetFileAsProcessing(val->key); processing_holder)
{
return std::make_shared<S3QueueKeyWithInfo>(val->key, val->info, std::move(processing_holder), nullptr);
}
}
}
@ -77,6 +93,7 @@ StorageS3QueueSource::StorageS3QueueSource(
, shutdown_called(shutdown_called_)
, s3_queue_log(s3_queue_log_)
, storage_id(storage_id_)
, s3_queue_user_id(fmt::format("{}:{}", CurrentThread::getQueryId(), getRandomASCIIString(8)))
, remove_file_func(remove_file_func_)
, log(&Poco::Logger::get("StorageS3QueueSource"))
{

View File

@ -19,19 +19,30 @@ class StorageS3QueueSource : public ISource, WithContext
{
public:
using IIterator = StorageS3Source::IIterator;
using KeyWithInfoPtr = StorageS3Source::KeyWithInfoPtr;
using GlobIterator = StorageS3Source::DisclosedGlobIterator;
using KeyWithInfo = StorageS3Source::KeyWithInfo;
using ZooKeeperGetter = std::function<zkutil::ZooKeeperPtr()>;
using RemoveFileFunc = std::function<void(std::string)>;
using Metadata = S3QueueFilesMetadata;
struct S3QueueKeyWithInfo : public StorageS3Source::KeyWithInfo
{
S3QueueKeyWithInfo(
const std::string & key_,
std::optional<S3::ObjectInfo> info_,
std::unique_ptr<Metadata::ProcessingHolder> processing_holder_,
std::shared_ptr<Metadata::FileStatus> file_status_);
std::unique_ptr<Metadata::ProcessingHolder> processing_holder;
std::shared_ptr<Metadata::FileStatus> file_status;
};
class FileIterator : public IIterator
{
public:
FileIterator(
std::shared_ptr<S3QueueFilesMetadata> metadata_,
std::unique_ptr<GlobIterator> glob_iterator_);
FileIterator(std::shared_ptr<S3QueueFilesMetadata> metadata_, std::unique_ptr<GlobIterator> glob_iterator_);
KeyWithInfo next() override;
KeyWithInfoPtr next() override;
size_t estimatedKeysCount() override;
@ -71,6 +82,7 @@ private:
const std::atomic<bool> & shutdown_called;
const std::shared_ptr<S3QueueLog> s3_queue_log;
const StorageID storage_id;
const std::string s3_queue_user_id;
RemoveFileFunc remove_file_func;
Poco::Logger * log;

View File

@ -161,7 +161,7 @@ public:
/// We don't have to list bucket, because there is no asterisks.
if (key_prefix.size() == globbed_uri.key.size())
{
buffer.emplace_back(globbed_uri.key, std::nullopt);
buffer.emplace_back(std::make_shared<KeyWithInfo>(globbed_uri.key, std::nullopt));
buffer_iter = buffer.begin();
is_finished = true;
return;
@ -182,7 +182,7 @@ public:
fillInternalBufferAssumeLocked();
}
KeyWithInfo next()
KeyWithInfoPtr next()
{
std::lock_guard lock(mutex);
return nextAssumeLocked();
@ -201,7 +201,7 @@ public:
private:
using ListObjectsOutcome = Aws::S3::Model::ListObjectsV2Outcome;
KeyWithInfo nextAssumeLocked()
KeyWithInfoPtr nextAssumeLocked()
{
if (buffer_iter != buffer.end())
{
@ -277,7 +277,7 @@ private:
.last_modification_time = row.GetLastModified().Millis() / 1000,
};
temp_buffer.emplace_back(std::move(key), std::move(info));
temp_buffer.emplace_back(std::make_shared<KeyWithInfo>(std::move(key), std::move(info)));
}
}
@ -289,7 +289,7 @@ private:
if (!is_initialized)
{
filter_ast = VirtualColumnUtils::createPathAndFileFilterAst(query, virtual_columns, fs::path(globbed_uri.bucket) / temp_buffer.front().key, getContext());
filter_ast = VirtualColumnUtils::createPathAndFileFilterAst(query, virtual_columns, fs::path(globbed_uri.bucket) / temp_buffer.front()->key, getContext());
is_initialized = true;
}
@ -298,7 +298,7 @@ private:
std::vector<String> paths;
paths.reserve(temp_buffer.size());
for (const auto & key_with_info : temp_buffer)
paths.push_back(fs::path(globbed_uri.bucket) / key_with_info.key);
paths.push_back(fs::path(globbed_uri.bucket) / key_with_info->key);
VirtualColumnUtils::filterByPathOrFile(temp_buffer, paths, query, virtual_columns, getContext(), filter_ast);
}
@ -307,8 +307,8 @@ private:
if (file_progress_callback)
{
for (const auto & [_, info] : buffer)
file_progress_callback(FileProgress(0, info->size));
for (const auto & key_with_info : buffer)
file_progress_callback(FileProgress(0, key_with_info->info->size));
}
/// Set iterator only after the whole batch is processed
@ -371,7 +371,7 @@ StorageS3Source::DisclosedGlobIterator::DisclosedGlobIterator(
{
}
StorageS3Source::KeyWithInfo StorageS3Source::DisclosedGlobIterator::next()
StorageS3Source::KeyWithInfoPtr StorageS3Source::DisclosedGlobIterator::next()
{
return pimpl->next();
}
@ -422,11 +422,11 @@ public:
if (read_keys_)
{
for (const auto & key : keys)
read_keys_->push_back({key, {}});
read_keys_->push_back(std::make_shared<KeyWithInfo>(key));
}
}
KeyWithInfo next()
KeyWithInfoPtr next()
{
size_t current_index = index.fetch_add(1, std::memory_order_relaxed);
if (current_index >= keys.size())
@ -439,7 +439,7 @@ public:
file_progress_callback(FileProgress(0, info->size));
}
return {key, info};
return std::make_shared<KeyWithInfo>(key, info);
}
size_t objectsCount()
@ -476,7 +476,7 @@ StorageS3Source::KeysIterator::KeysIterator(
{
}
StorageS3Source::KeyWithInfo StorageS3Source::KeysIterator::next()
StorageS3Source::KeyWithInfoPtr StorageS3Source::KeysIterator::next()
{
return pimpl->next();
}
@ -502,14 +502,14 @@ StorageS3Source::ReadTaskIterator::ReadTaskIterator(
pool.wait();
buffer.reserve(max_threads_count);
for (auto & key_future : keys)
buffer.emplace_back(key_future.get(), std::nullopt);
buffer.emplace_back(std::make_shared<KeyWithInfo>(key_future.get(), std::nullopt));
}
StorageS3Source::KeyWithInfo StorageS3Source::ReadTaskIterator::next()
StorageS3Source::KeyWithInfoPtr StorageS3Source::ReadTaskIterator::next()
{
size_t current_index = index.fetch_add(1, std::memory_order_relaxed);
if (current_index >= buffer.size())
return {callback(), {}};
return std::make_shared<KeyWithInfo>(callback());
return buffer[current_index];
}
@ -566,22 +566,22 @@ StorageS3Source::StorageS3Source(
StorageS3Source::ReaderHolder StorageS3Source::createReader()
{
KeyWithInfo key_with_info;
KeyWithInfoPtr key_with_info;
do
{
key_with_info = (*file_iterator)();
if (key_with_info.key.empty())
if (!key_with_info)
return {};
if (!key_with_info.info)
key_with_info.info = S3::getObjectInfo(*client, bucket, key_with_info.key, version_id, request_settings);
if (!key_with_info->info)
key_with_info->info = S3::getObjectInfo(*client, bucket, key_with_info->key, version_id, request_settings);
}
while (getContext()->getSettingsRef().s3_skip_empty_files && key_with_info.info->size == 0);
while (getContext()->getSettingsRef().s3_skip_empty_files && key_with_info->info->size == 0);
QueryPipelineBuilder builder;
std::shared_ptr<ISource> source;
std::unique_ptr<ReadBuffer> read_buf;
std::optional<size_t> num_rows_from_cache = need_only_count && getContext()->getSettingsRef().use_cache_for_count_from_files ? tryGetNumRowsFromCache(key_with_info) : std::nullopt;
std::optional<size_t> num_rows_from_cache = need_only_count && getContext()->getSettingsRef().use_cache_for_count_from_files ? tryGetNumRowsFromCache(*key_with_info) : std::nullopt;
if (num_rows_from_cache)
{
/// We should not return single chunk with all number of rows,
@ -594,8 +594,8 @@ StorageS3Source::ReaderHolder StorageS3Source::createReader()
}
else
{
auto compression_method = chooseCompressionMethod(key_with_info.key, compression_hint);
read_buf = createS3ReadBuffer(key_with_info.key, key_with_info.info->size);
auto compression_method = chooseCompressionMethod(key_with_info->key, compression_hint);
read_buf = createS3ReadBuffer(key_with_info->key, key_with_info->info->size);
auto input_format = FormatFactory::instance().getInput(
format,
@ -639,7 +639,7 @@ StorageS3Source::ReaderHolder StorageS3Source::createReader()
ProfileEvents::increment(ProfileEvents::EngineFileLikeReadFiles);
return ReaderHolder{key_with_info, bucket, std::move(read_buf), std::move(source), std::move(pipeline), std::move(current_reader)};
return ReaderHolder{*key_with_info, bucket, std::move(read_buf), std::move(source), std::move(pipeline), std::move(current_reader)};
}
std::future<StorageS3Source::ReaderHolder> StorageS3Source::createReaderAsync()
@ -1494,7 +1494,7 @@ namespace
{
current_key_with_info = (*file_iterator)();
if (current_key_with_info.key.empty())
if (!current_key_with_info)
{
if (first)
throw Exception(
@ -1506,6 +1506,8 @@ namespace
return nullptr;
}
chassert(!current_key_with_info->key.empty());
/// S3 file iterator could get new keys after new iteration, check them in schema cache.
if (getContext()->getSettingsRef().schema_inference_use_cache_for_s3 && read_keys.size() > prev_read_keys_size)
{
@ -1515,15 +1517,15 @@ namespace
return nullptr;
}
if (getContext()->getSettingsRef().s3_skip_empty_files && current_key_with_info.info && current_key_with_info.info->size == 0)
if (getContext()->getSettingsRef().s3_skip_empty_files && current_key_with_info->info && current_key_with_info->info->size == 0)
continue;
int zstd_window_log_max = static_cast<int>(getContext()->getSettingsRef().zstd_window_log_max);
auto impl = std::make_unique<ReadBufferFromS3>(configuration.client, configuration.url.bucket, current_key_with_info.key, configuration.url.version_id, configuration.request_settings, getContext()->getReadSettings());
auto impl = std::make_unique<ReadBufferFromS3>(configuration.client, configuration.url.bucket, current_key_with_info->key, configuration.url.version_id, configuration.request_settings, getContext()->getReadSettings());
if (!getContext()->getSettingsRef().s3_skip_empty_files || !impl->eof())
{
first = false;
return wrapReadBufferWithCompressionMethod(std::move(impl), chooseCompressionMethod(current_key_with_info.key, configuration.compression_method), zstd_window_log_max);
return wrapReadBufferWithCompressionMethod(std::move(impl), chooseCompressionMethod(current_key_with_info->key, configuration.compression_method), zstd_window_log_max);
}
}
}
@ -1538,7 +1540,7 @@ namespace
if (!getContext()->getSettingsRef().schema_inference_use_cache_for_s3)
return;
String source = fs::path(configuration.url.uri.getHost() + std::to_string(configuration.url.uri.getPort())) / configuration.url.bucket / current_key_with_info.key;
String source = fs::path(configuration.url.uri.getHost() + std::to_string(configuration.url.uri.getPort())) / configuration.url.bucket / current_key_with_info->key;
auto key = getKeyForSchemaCache(source, configuration.format, format_settings, getContext());
StorageS3::getSchemaCache(getContext()).addNumRows(key, num_rows);
}
@ -1549,7 +1551,7 @@ namespace
const StorageS3::Configuration & configuration;
const std::optional<FormatSettings> & format_settings;
std::optional<ColumnsDescription> columns_from_cache;
StorageS3Source::KeyWithInfo current_key_with_info;
StorageS3Source::KeyWithInfoPtr current_key_with_info;
size_t prev_read_keys_size;
bool first = true;
};
@ -1689,9 +1691,9 @@ std::optional<ColumnsDescription> StorageS3::tryGetColumnsFromCache(
auto get_last_mod_time = [&]
{
time_t last_modification_time = 0;
if (it->info)
if ((*it)->info)
{
last_modification_time = it->info->last_modification_time;
last_modification_time = (*it)->info->last_modification_time;
}
else
{
@ -1701,7 +1703,7 @@ std::optional<ColumnsDescription> StorageS3::tryGetColumnsFromCache(
last_modification_time = S3::getObjectInfo(
*configuration.client,
configuration.url.bucket,
it->key,
(*it)->key,
configuration.url.version_id,
configuration.request_settings,
/*with_metadata=*/ false,
@ -1712,7 +1714,7 @@ std::optional<ColumnsDescription> StorageS3::tryGetColumnsFromCache(
return last_modification_time ? std::make_optional(last_modification_time) : std::nullopt;
};
String path = fs::path(configuration.url.bucket) / it->key;
String path = fs::path(configuration.url.bucket) / (*it)->key;
String source = fs::path(configuration.url.uri.getHost() + std::to_string(configuration.url.uri.getPort())) / path;
auto cache_key = getKeyForSchemaCache(source, configuration.format, format_settings, ctx);
auto columns = schema_cache.tryGetColumns(cache_key, get_last_mod_time);
@ -1734,7 +1736,7 @@ void StorageS3::addColumnsToCache(
auto host_and_bucket = fs::path(configuration.url.uri.getHost() + std::to_string(configuration.url.uri.getPort())) / configuration.url.bucket;
Strings sources;
sources.reserve(keys.size());
std::transform(keys.begin(), keys.end(), std::back_inserter(sources), [&](const auto & elem){ return host_and_bucket / elem.key; });
std::transform(keys.begin(), keys.end(), std::back_inserter(sources), [&](const auto & elem){ return host_and_bucket / elem->key; });
auto cache_keys = getKeysForSchemaCache(sources, format_name, format_settings, ctx);
auto & schema_cache = getSchemaCache(ctx);
schema_cache.addManyColumns(cache_keys, columns);

View File

@ -43,22 +43,21 @@ public:
struct KeyWithInfo
{
KeyWithInfo() = default;
KeyWithInfo(String key_, std::optional<S3::ObjectInfo> info_)
: key(std::move(key_)), info(std::move(info_))
{
}
explicit KeyWithInfo(String key_, std::optional<S3::ObjectInfo> info_ = std::nullopt)
: key(std::move(key_)), info(std::move(info_)) {}
String key;
std::optional<S3::ObjectInfo> info;
};
using KeyWithInfoPtr = std::shared_ptr<KeyWithInfo>;
using KeysWithInfo = std::vector<KeyWithInfo>;
using KeysWithInfo = std::vector<KeyWithInfoPtr>;
class IIterator
{
public:
virtual ~IIterator() = default;
virtual KeyWithInfo next() = 0;
virtual KeyWithInfoPtr next() = 0;
/// Estimates how many streams we need to process all files.
/// If keys count >= max_threads_count, the returned number may not represent the actual number of the keys.
@ -66,7 +65,7 @@ public:
/// fixme: May underestimate if the glob has a strong filter, so there are few matches among the first 1000 ListObjects results.
virtual size_t estimatedKeysCount() = 0;
KeyWithInfo operator ()() { return next(); }
KeyWithInfoPtr operator ()() { return next(); }
};
class DisclosedGlobIterator : public IIterator
@ -82,7 +81,7 @@ public:
const S3Settings::RequestSettings & request_settings_ = {},
std::function<void(FileProgress)> progress_callback_ = {});
KeyWithInfo next() override;
KeyWithInfoPtr next() override;
size_t estimatedKeysCount() override;
private:
@ -106,7 +105,7 @@ public:
KeysWithInfo * read_keys = nullptr,
std::function<void(FileProgress)> progress_callback_ = {});
KeyWithInfo next() override;
KeyWithInfoPtr next() override;
size_t estimatedKeysCount() override;
private:
@ -120,7 +119,7 @@ public:
public:
explicit ReadTaskIterator(const ReadTaskCallback & callback_, const size_t max_threads_count);
KeyWithInfo next() override;
KeyWithInfoPtr next() override;
size_t estimatedKeysCount() override;
private:

View File

@ -82,7 +82,7 @@ RemoteQueryExecutor::Extension StorageS3Cluster::getTaskIteratorExtension(ASTPtr
{
auto iterator = std::make_shared<StorageS3Source::DisclosedGlobIterator>(
*s3_configuration.client, s3_configuration.url, query, virtual_columns, context, nullptr, s3_configuration.request_settings, context->getFileProgressCallback());
auto callback = std::make_shared<std::function<String()>>([iterator]() mutable -> String { return iterator->next().key; });
auto callback = std::make_shared<std::function<String()>>([iterator]() mutable -> String { return iterator->next()->key; });
return RemoteQueryExecutor::Extension{ .task_iterator = std::move(callback) };
}