kssenii 2023-08-01 14:31:05 +02:00
parent 08f5ebf3e8
commit a14a6b56b3
7 changed files with 489 additions and 493 deletions

src/Storages/S3Queue/S3QueueFilesMetadata.cpp

@@ -0,0 +1,351 @@
#include "IO/VarInt.h"
#include "config.h"
#if USE_AWS_S3
# include <algorithm>
# include <IO/Operators.h>
# include <IO/ReadBufferFromString.h>
# include <IO/ReadHelpers.h>
# include <Storages/S3Queue/S3QueueFilesMetadata.h>
# include <Storages/S3Queue/StorageS3Queue.h>
# include <Storages/StorageS3Settings.h>
# include <Storages/StorageSnapshot.h>
# include <base/sleep.h>
# include <Common/ZooKeeper/ZooKeeper.h>
namespace DB
{
namespace ErrorCodes
{
extern const int TIMEOUT_EXCEEDED;
}
namespace
{
UInt64 getCurrentTime()
{
return std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()).count();
}
}
void S3QueueFilesMetadata::S3QueueCollection::read(ReadBuffer & in)
{
files = {};
if (in.eof())
return;
size_t files_num;
in >> files_num >> "\n";
while (files_num--)
{
TrackedCollectionItem item;
in >> item.file_path >> "\n";
in >> item.timestamp >> "\n";
in >> item.retries_count >> "\n";
in >> item.last_exception >> "\n";
files.push_back(item);
}
}
void S3QueueFilesMetadata::S3QueueCollection::write(WriteBuffer & out) const
{
out << files.size() << "\n";
for (const auto & processed_file : files)
{
out << processed_file.file_path << "\n";
out << processed_file.timestamp << "\n";
out << processed_file.retries_count << "\n";
out << processed_file.last_exception << "\n";
}
}
String S3QueueFilesMetadata::S3QueueCollection::toString() const
{
WriteBufferFromOwnString out;
write(out);
return out.str();
}
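For reference, the write()/read() pair above defines a plain line-oriented format: the number of tracked files, then four lines per file (path, timestamp, retries count, last exception). A hypothetical /processed node value with two successfully processed files could look like this (the blank lines are the empty last_exception fields):

2
data/file-0001.csv
1690900000
0

data/file-0002.csv
1690900100
0
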
S3QueueFilesMetadata::S3FilesCollection S3QueueFilesMetadata::S3QueueCollection::getFileNames()
{
S3FilesCollection keys = {};
for (const auto & pair : files)
keys.insert(pair.file_path);
return keys;
}
S3QueueFilesMetadata::S3QueueProcessedCollection::S3QueueProcessedCollection(const UInt64 & max_size_, const UInt64 & max_age_)
: max_size(max_size_), max_age(max_age_)
{
}
void S3QueueFilesMetadata::S3QueueProcessedCollection::parse(const String & collection_str)
{
ReadBufferFromString buf(collection_str);
read(buf);
if (max_age > 0) // Remove old items
{
std::erase_if(
files,
[timestamp = getCurrentTime(), this](const TrackedCollectionItem & processed_file)
{ return (timestamp - processed_file.timestamp) > max_age; });
}
}
void S3QueueFilesMetadata::S3QueueProcessedCollection::add(const String & file_name)
{
TrackedCollectionItem processed_file = {.file_path = file_name, .timestamp = getCurrentTime()};
files.push_back(processed_file);
/// TODO: it is strange that parse() enforces only max_age while add() enforces only max_size.
while (files.size() > max_size)
{
files.pop_front();
}
}
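A minimal standalone sketch of the eviction policy this class implements, with ClickHouse types swapped for standard containers and all names hypothetical: age-based pruning happens when the state is parsed, size-based pruning when an entry is added.

#include <cstddef>
#include <cstdint>
#include <deque>
#include <string>

struct Entry { std::string path; std::uint64_t timestamp = 0; };

// As in parse(): drop entries older than max_age seconds.
void evictByAge(std::deque<Entry> & entries, std::uint64_t now, std::uint64_t max_age)
{
    std::erase_if(entries, [&](const Entry & e) { return now - e.timestamp > max_age; });
}

// As in add(): append, then trim from the front until the size bound holds.
void addBounded(std::deque<Entry> & entries, Entry entry, std::size_t max_size)
{
    entries.push_back(std::move(entry));
    while (entries.size() > max_size)
        entries.pop_front();
}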
S3QueueFilesMetadata::S3QueueFailedCollection::S3QueueFailedCollection(const UInt64 & max_retries_count_)
: max_retries_count(max_retries_count_)
{
}
void S3QueueFilesMetadata::S3QueueFailedCollection::parse(const String & collection_str)
{
ReadBufferFromString buf(collection_str);
read(buf);
}
bool S3QueueFilesMetadata::S3QueueFailedCollection::add(const String & file_name, const String & exception_message)
{
auto failed_it = std::find_if(
files.begin(), files.end(),
[&file_name](const TrackedCollectionItem & s) { return s.file_path == file_name; });
if (failed_it == files.end())
{
files.emplace_back(file_name, 0, max_retries_count, exception_message);
}
else if (failed_it->retries_count == 0 || --failed_it->retries_count == 0)
{
return false;
}
return true;
}
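To make the retry accounting concrete: with max_retries_count = 3, the first setFileFailed() call for a file stores it with retries_count = 3 and returns true (the file may be retried); the next two calls decrement the counter to 2 and then 1, still returning true; the fourth call decrements it to 0 and returns false, and from then on getFileNames() reports the file as permanently failed.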
S3QueueFilesMetadata::S3FilesCollection S3QueueFilesMetadata::S3QueueFailedCollection::getFileNames()
{
S3FilesCollection failed_keys;
for (const auto & pair : files)
{
if (pair.retries_count == 0)
failed_keys.insert(pair.file_path);
}
return failed_keys;
}
void S3QueueFilesMetadata::S3QueueProcessingCollection::parse(const String & collection_str)
{
ReadBufferFromString rb(collection_str);
Strings result;
readQuoted(result, rb);
files = S3FilesCollection(result.begin(), result.end());
}
void S3QueueFilesMetadata::S3QueueProcessingCollection::add(const Strings & file_names)
{
files.insert(file_names.begin(), file_names.end());
}
void S3QueueFilesMetadata::S3QueueProcessingCollection::remove(const String & file_name)
{
files.erase(file_name);
}
String S3QueueFilesMetadata::S3QueueProcessingCollection::toString() const
{
return DB::toString(Strings(files.begin(), files.end()));
}
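Note that this collection uses a different wire format from the processed and failed ones: it round-trips a Strings value through readQuoted() and DB::toString(), so the /processing node presumably holds a bracketed list of quoted paths such as ['a.csv','b.csv'].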
S3QueueFilesMetadata::S3QueueFilesMetadata(
const StorageS3Queue * storage_,
const S3QueueSettings & settings_)
: storage(storage_)
, mode(settings_.mode)
, max_set_size(settings_.s3queue_tracked_files_limit.value)
, max_set_age_sec(settings_.s3queue_tracked_file_ttl_sec.value)
, max_loading_retries(settings_.s3queue_loading_retries.value)
, zookeeper_processing_path(fs::path(storage->getZooKeeperPath()) / "processing")
, zookeeper_processed_path(fs::path(storage->getZooKeeperPath()) / "processed")
, zookeeper_failed_path(fs::path(storage->getZooKeeperPath()) / "failed")
, zookeeper_lock_path(fs::path(storage->getZooKeeperPath()) / "lock")
, log(&Poco::Logger::get("S3QueueFilesMetadata"))
{
}
void S3QueueFilesMetadata::setFileProcessed(const String & file_path)
{
auto zookeeper = storage->getZooKeeper();
auto lock = acquireLock(zookeeper);
switch (mode)
{
case S3QueueMode::UNORDERED:
{
S3QueueProcessedCollection processed_files(max_set_size, max_set_age_sec);
processed_files.parse(zookeeper->get(zookeeper_processed_path));
processed_files.add(file_path);
zookeeper->set(zookeeper_processed_path, processed_files.toString());
break;
}
case S3QueueMode::ORDERED:
{
// Only store the maximum processed file path in the ZooKeeper node.
// This matters when multiple table engines consume in ordered mode.
String max_file = getMaxProcessedFile();
if (max_file.compare(file_path) <= 0)
zookeeper->set(zookeeper_processed_path, file_path);
break;
}
}
removeProcessingFile(file_path);
}
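In ORDERED mode the /processed node is a high-water mark rather than a list: it stores a single path, and the compare(file_path) <= 0 guard ensures a concurrent consumer can only advance the mark, never move it backwards.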
bool S3QueueFilesMetadata::setFileFailed(const String & file_path, const String & exception_message)
{
auto zookeeper = storage->getZooKeeper();
auto lock = acquireLock(zookeeper);
S3QueueFailedCollection failed_collection(max_loading_retries);
failed_collection.parse(zookeeper->get(zookeeper_failed_path));
const bool can_be_retried = failed_collection.add(file_path, exception_message);
zookeeper->set(zookeeper_failed_path, failed_collection.toString());
removeProcessingFile(file_path);
return can_be_retried;
}
S3QueueFilesMetadata::S3FilesCollection S3QueueFilesMetadata::getFailedFiles()
{
auto zookeeper = storage->getZooKeeper();
String failed_files = zookeeper->get(zookeeper_failed_path);
S3QueueFailedCollection failed_collection(max_loading_retries);
failed_collection.parse(failed_files);
return failed_collection.getFileNames();
}
String S3QueueFilesMetadata::getMaxProcessedFile()
{
auto zookeeper = storage->getZooKeeper();
return zookeeper->get(zookeeper_processed_path);
}
S3QueueFilesMetadata::S3FilesCollection S3QueueFilesMetadata::getProcessingFiles()
{
auto zookeeper = storage->getZooKeeper();
String processing_files;
if (!zookeeper->tryGet(zookeeper_processing_path, processing_files))
return {};
S3QueueProcessingCollection processing_collection;
if (!processing_files.empty())
processing_collection.parse(processing_files);
return processing_collection.getFileNames();
}
void S3QueueFilesMetadata::setFilesProcessing(const Strings & file_paths)
{
auto zookeeper = storage->getZooKeeper();
String processing_files;
zookeeper->tryGet(zookeeper_processing_path, processing_files);
S3QueueProcessingCollection processing_collection;
if (!processing_files.empty())
processing_collection.parse(processing_files);
processing_collection.add(file_paths);
if (zookeeper->exists(zookeeper_processing_path))
zookeeper->set(zookeeper_processing_path, processing_collection.toString());
else
zookeeper->create(zookeeper_processing_path, processing_collection.toString(), zkutil::CreateMode::Ephemeral);
}
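The /processing node is deliberately created with CreateMode::Ephemeral: if the server holding it dies, its ZooKeeper session expires and the in-flight set disappears with it, so crashed consumers do not leak files stuck in the processing state.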
void S3QueueFilesMetadata::removeProcessingFile(const String & file_path)
{
auto zookeeper = storage->getZooKeeper();
String processing_files;
zookeeper->tryGet(zookeeper_processing_path, processing_files);
S3QueueProcessingCollection processing_collection;
processing_collection.parse(processing_files);
processing_collection.remove(file_path);
zookeeper->set(zookeeper_processing_path, processing_collection.toString());
}
S3QueueFilesMetadata::S3FilesCollection S3QueueFilesMetadata::getUnorderedProcessedFiles()
{
auto zookeeper = storage->getZooKeeper();
S3QueueProcessedCollection processed_collection(max_set_size, max_set_age_sec);
processed_collection.parse(zookeeper->get(zookeeper_processed_path));
return processed_collection.getFileNames();
}
S3QueueFilesMetadata::S3FilesCollection S3QueueFilesMetadata::getProcessedFailedAndProcessingFiles()
{
S3FilesCollection processed_and_failed_files = getFailedFiles();
switch (mode)
{
case S3QueueMode::UNORDERED:
{
processed_and_failed_files.merge(getUnorderedProcessedFiles());
break;
}
case S3QueueMode::ORDERED:
{
processed_and_failed_files.insert(getMaxProcessedFile());
break;
}
}
processed_and_failed_files.merge(getProcessingFiles());
return processed_and_failed_files;
}
std::shared_ptr<zkutil::EphemeralNodeHolder> S3QueueFilesMetadata::acquireLock(zkutil::ZooKeeperPtr zookeeper)
{
UInt32 retry_count = 200;
UInt32 sleep_ms = 100;
UInt32 retries = 0;
while (true)
{
Coordination::Error code = zookeeper->tryCreate(zookeeper_lock_path, "", zkutil::CreateMode::Ephemeral);
if (code == Coordination::Error::ZNONODE || code == Coordination::Error::ZNODEEXISTS)
{
retries++;
if (retries > retry_count)
{
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Can't acquire zookeeper lock");
}
sleepForMilliseconds(sleep_ms);
}
else if (code != Coordination::Error::ZOK)
{
throw Coordination::Exception(code, zookeeper_lock_path);
}
else
{
return zkutil::EphemeralNodeHolder::existing(zookeeper_lock_path, *zookeeper);
}
}
}
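With retry_count = 200 and sleep_ms = 100, the loop gives up after roughly 200 × 100 ms = 20 seconds of contention before throwing TIMEOUT_EXCEEDED. On success, the ephemeral /lock node is owned by the returned EphemeralNodeHolder, so the lock is released when the holder is destroyed or the session expires.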
}
#endif

src/Storages/S3Queue/S3QueueHolder.h → src/Storages/S3Queue/S3QueueFilesMetadata.h

@@ -9,7 +9,10 @@
namespace DB
{
class S3QueueHolder : public WithContext
class StorageS3Queue;
struct S3QueueSettings;
class S3QueueFilesMetadata
{
public:
struct TrackedCollectionItem
@@ -23,27 +26,21 @@ public:
using S3FilesCollection = std::unordered_set<String>;
using TrackedFiles = std::deque<TrackedCollectionItem>;
S3QueueHolder(
const String & zookeeper_path_,
const S3QueueMode & mode_,
ContextPtr context_,
UInt64 & max_set_size_,
UInt64 & max_set_age_sec_,
UInt64 & max_loading_retries_);
S3QueueFilesMetadata(const StorageS3Queue * storage_, const S3QueueSettings & settings_);
void setFilesProcessing(const Strings & file_paths);
void setFileProcessed(const String & file_path);
bool setFileFailed(const String & file_path, const String & exception_message);
void setFilesProcessing(Strings & file_paths);
S3FilesCollection getProcessedAndFailedFiles();
String getMaxProcessedFile();
std::shared_ptr<zkutil::EphemeralNodeHolder> acquireLock();
S3FilesCollection getProcessedFailedAndProcessingFiles();
String getMaxProcessedFile();
std::shared_ptr<zkutil::EphemeralNodeHolder> acquireLock(zkutil::ZooKeeperPtr zookeeper);
struct S3QueueCollection
{
public:
virtual ~S3QueueCollection() = default;
String toString() const;
virtual String toString() const;
S3FilesCollection getFileNames();
virtual void parse(const String & collection_str) = 0;
@@ -82,30 +79,42 @@ public:
UInt64 max_retries_count;
};
struct S3QueueProcessingCollection
{
public:
S3QueueProcessingCollection() = default;
void parse(const String & collection_str);
void add(const Strings & file_names);
void remove(const String & file_name);
String toString() const;
const S3FilesCollection & getFileNames() const { return files; }
private:
S3FilesCollection files;
};
private:
const StorageS3Queue * storage;
const S3QueueMode mode;
const UInt64 max_set_size;
const UInt64 max_set_age_sec;
const UInt64 max_loading_retries;
zkutil::ZooKeeperPtr zk_client;
mutable std::mutex current_zookeeper_mutex;
mutable std::mutex mutex;
const String zookeeper_path;
const String zookeeper_failed_path;
const String zookeeper_processing_path;
const String zookeeper_processed_path;
const String zookeeper_failed_path;
const String zookeeper_lock_path;
const S3QueueMode mode;
const UUID table_uuid;
mutable std::mutex mutex;
Poco::Logger * log;
S3FilesCollection getFailedFiles();
S3FilesCollection getProcessingFiles();
S3FilesCollection getUnorderedProcessedFiles();
void removeProcessingFile(const String & file_path);
S3FilesCollection parseCollection(const String & collection_str);
void removeProcessingFile(const String & file_path);
};

src/Storages/S3Queue/S3QueueHolder.cpp

@@ -1,341 +0,0 @@
#include "IO/VarInt.h"
#include "config.h"
#if USE_AWS_S3
# include <algorithm>
# include <IO/Operators.h>
# include <IO/ReadBufferFromString.h>
# include <IO/ReadHelpers.h>
# include <Storages/S3Queue/S3QueueHolder.h>
# include <Storages/StorageS3Settings.h>
# include <Storages/StorageSnapshot.h>
# include <base/sleep.h>
# include <Common/ZooKeeper/ZooKeeper.h>
namespace DB
{
namespace ErrorCodes
{
extern const int TIMEOUT_EXCEEDED;
}
/// TODO: update zk session if expired
void S3QueueHolder::S3QueueCollection::read(ReadBuffer & in)
{
files = {};
if (in.eof())
return;
size_t files_num;
in >> files_num >> "\n";
while (files_num--)
{
TrackedCollectionItem item;
in >> item.file_path >> "\n";
in >> item.timestamp >> "\n";
in >> item.retries_count >> "\n";
in >> item.last_exception >> "\n";
files.push_back(item);
}
}
void S3QueueHolder::S3QueueCollection::write(WriteBuffer & out) const
{
out << files.size() << "\n";
for (const auto & processed_file : files)
{
out << processed_file.file_path << "\n";
out << processed_file.timestamp << "\n";
out << processed_file.retries_count << "\n";
out << processed_file.last_exception << "\n";
}
}
String S3QueueHolder::S3QueueCollection::toString() const
{
WriteBufferFromOwnString out;
write(out);
return out.str();
}
S3QueueHolder::S3FilesCollection S3QueueHolder::S3QueueCollection::getFileNames()
{
S3FilesCollection keys = {};
for (const auto & pair : files)
{
keys.insert(pair.file_path);
}
return keys;
}
S3QueueHolder::S3QueueProcessedCollection::S3QueueProcessedCollection(const UInt64 & max_size_, const UInt64 & max_age_)
: max_size(max_size_), max_age(max_age_)
{
}
void S3QueueHolder::S3QueueProcessedCollection::parse(const String & collection_str)
{
ReadBufferFromString buf(collection_str);
read(buf);
if (max_age > 0) // Remove old items
{
UInt64 timestamp = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()).count();
UInt64 max_seconds_diff = max_age;
std::erase_if(
files,
[&timestamp, &max_seconds_diff](const TrackedCollectionItem & processed_file)
{ return (timestamp - processed_file.timestamp) > max_seconds_diff; });
}
}
void S3QueueHolder::S3QueueProcessedCollection::add(const String & file_name)
{
UInt64 timestamp = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()).count();
TrackedCollectionItem processed_file = {.file_path=file_name, .timestamp=timestamp};
files.push_back(processed_file);
while (files.size() > max_size)
{
files.pop_front();
}
}
S3QueueHolder::S3QueueFailedCollection::S3QueueFailedCollection(const UInt64 & max_retries_count_) : max_retries_count(max_retries_count_)
{
}
void S3QueueHolder::S3QueueFailedCollection::parse(const String & collection_str)
{
ReadBufferFromString buf(collection_str);
read(buf);
}
bool S3QueueHolder::S3QueueFailedCollection::add(const String & file_name, const String & exception_message)
{
auto failed_it
= std::find_if(files.begin(), files.end(), [&file_name](const TrackedCollectionItem & s) { return s.file_path == file_name; });
if (failed_it != files.end())
{
if (failed_it->retries_count == 0 || --failed_it->retries_count == 0)
{
return false;
}
}
else
{
TrackedCollectionItem failed_file = { .file_path=file_name, .retries_count=max_retries_count, .last_exception = exception_message };
files.push_back(failed_file);
}
return true;
}
S3QueueHolder::S3FilesCollection S3QueueHolder::S3QueueFailedCollection::getFileNames()
{
S3FilesCollection failed_keys;
for (const auto & pair : files)
{
if (pair.retries_count <= 0)
{
failed_keys.insert(pair.file_path);
}
}
return failed_keys;
}
S3QueueHolder::S3QueueHolder(
const String & zookeeper_path_,
const S3QueueMode & mode_,
ContextPtr context_,
UInt64 & max_set_size_,
UInt64 & max_set_age_sec_,
UInt64 & max_loading_retries_)
: WithContext(context_)
, max_set_size(max_set_size_)
, max_set_age_sec(max_set_age_sec_)
, max_loading_retries(max_loading_retries_)
, zk_client(getContext()->getZooKeeper())
, zookeeper_path(zookeeper_path_)
, zookeeper_failed_path(fs::path(zookeeper_path_) / "failed")
, zookeeper_processing_path(fs::path(zookeeper_path_) / "processing")
, zookeeper_processed_path(fs::path(zookeeper_path_) / "processed")
, zookeeper_lock_path(fs::path(zookeeper_path_) / "lock")
, mode(mode_)
, log(&Poco::Logger::get("S3QueueHolder"))
{
}
void S3QueueHolder::setFileProcessed(const String & file_path)
{
auto lock = acquireLock();
if (mode == S3QueueMode::UNORDERED)
{
String processed_files = zk_client->get(zookeeper_processed_path);
auto processed = S3QueueProcessedCollection(max_set_size, max_set_age_sec);
processed.parse(processed_files);
processed.add(file_path);
zk_client->set(zookeeper_processed_path, processed.toString());
}
else if (mode == S3QueueMode::ORDERED)
{
String max_file = getMaxProcessedFile();
// Only store the maximum processed file path in the ZooKeeper node.
// This matters when multiple table engines consume in ordered mode.
if (max_file.compare(file_path) <= 0)
{
zk_client->set(zookeeper_processed_path, file_path);
}
}
removeProcessingFile(file_path);
}
bool S3QueueHolder::setFileFailed(const String & file_path, const String & exception_message)
{
auto lock = acquireLock();
auto failed_collection = S3QueueFailedCollection(max_loading_retries);
failed_collection.parse(zk_client->get(zookeeper_failed_path));
bool retry_later = failed_collection.add(file_path, exception_message);
zk_client->set(zookeeper_failed_path, failed_collection.toString());
removeProcessingFile(file_path);
return retry_later;
}
S3QueueHolder::S3FilesCollection S3QueueHolder::getFailedFiles()
{
String failed_files = zk_client->get(zookeeper_failed_path);
auto failed_collection = S3QueueFailedCollection(max_loading_retries);
failed_collection.parse(failed_files);
return failed_collection.getFileNames();
}
String S3QueueHolder::getMaxProcessedFile()
{
String processed = zk_client->get(zookeeper_processed_path);
return processed;
}
S3QueueHolder::S3FilesCollection S3QueueHolder::getProcessingFiles()
{
String processing_files;
if (!zk_client->tryGet(zookeeper_processing_path, processing_files))
return {};
return parseCollection(processing_files);
}
void S3QueueHolder::setFilesProcessing(Strings & file_paths)
{
std::unordered_set<String> processing_files(file_paths.begin(), file_paths.end());
processing_files.merge(getProcessingFiles());
String processing_files_str = toString(Strings(processing_files.begin(), processing_files.end()));
if (zk_client->exists(zookeeper_processing_path))
zk_client->set(fs::path(zookeeper_processing_path), processing_files_str);
else
zk_client->create(fs::path(zookeeper_processing_path), processing_files_str, zkutil::CreateMode::Ephemeral);
}
S3QueueHolder::S3FilesCollection S3QueueHolder::getUnorderedProcessedFiles()
{
String processed = zk_client->get(zookeeper_processed_path);
auto collection = S3QueueProcessedCollection(max_set_size, max_set_age_sec);
collection.parse(processed);
return collection.getFileNames();
}
S3QueueHolder::S3FilesCollection S3QueueHolder::getProcessedAndFailedFiles()
{
S3FilesCollection processed_and_failed_files = getFailedFiles();
if (mode == S3QueueMode::UNORDERED)
{
processed_and_failed_files.merge(getUnorderedProcessedFiles());
}
else
{
String processed = getMaxProcessedFile();
processed_and_failed_files.insert(processed);
}
S3FilesCollection processing_files = getProcessingFiles();
processed_and_failed_files.merge(processing_files);
return processed_and_failed_files;
}
void S3QueueHolder::removeProcessingFile(const String & file_path)
{
String node_data;
String processing = zk_client->get(zookeeper_processing_path);
S3FilesCollection processing_files = parseCollection(processing);
processing_files.erase(file_path);
Strings file_paths(processing_files.begin(), processing_files.end());
zk_client->set(fs::path(zookeeper_processing_path), toString(file_paths));
}
std::shared_ptr<zkutil::EphemeralNodeHolder> S3QueueHolder::acquireLock()
{
UInt32 retry_count = 200;
UInt32 sleep_ms = 100;
UInt32 retries = 0;
while (true)
{
Coordination::Error code = zk_client->tryCreate(zookeeper_lock_path, "", zkutil::CreateMode::Ephemeral);
if (code == Coordination::Error::ZNONODE || code == Coordination::Error::ZNODEEXISTS)
{
retries++;
if (retries > retry_count)
{
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Can't acquire zookeeper lock");
}
sleepForMilliseconds(sleep_ms);
}
else if (code != Coordination::Error::ZOK)
{
throw Coordination::Exception(code, zookeeper_lock_path);
}
else
{
return zkutil::EphemeralNodeHolder::existing(zookeeper_lock_path, *zk_client);
}
}
}
S3QueueHolder::S3FilesCollection S3QueueHolder::parseCollection(const String & collection_str)
{
ReadBufferFromString rb(collection_str);
Strings deserialized;
try
{
readQuoted(deserialized, rb);
}
catch (const Exception & e)
{
LOG_WARNING(log, "Can't parse collection from ZooKeeper node: {}", e.displayText());
deserialized = {};
}
return std::unordered_set<String>(deserialized.begin(), deserialized.end());
}
}
#endif

src/Storages/S3Queue/S3QueueSource.cpp

@@ -171,7 +171,7 @@ StorageS3QueueSource::StorageS3QueueSource(
const String & bucket_,
const String & version_id_,
std::shared_ptr<IIterator> file_iterator_,
std::shared_ptr<S3QueueHolder> queue_holder_,
std::shared_ptr<S3QueueFilesMetadata> files_metadata_,
const S3QueueAction & action_,
const size_t download_thread_num_)
: ISource(getHeader(sample_block_, requested_virtual_columns_))
@@ -183,7 +183,7 @@ StorageS3QueueSource::StorageS3QueueSource(
, columns_desc(columns_)
, request_settings(request_settings_)
, client(client_)
, queue_holder(queue_holder_)
, files_metadata(files_metadata_)
, requested_virtual_columns(requested_virtual_columns_)
, file_iterator(file_iterator_)
, action(action_)
@@ -259,13 +259,13 @@ Chunk StorageS3QueueSource::generate()
catch (const Exception & e)
{
LOG_ERROR(log, "Exception in chunk pulling: {} ", e.displayText());
queue_holder->setFileFailed(reader.getFile(), e.message());
files_metadata->setFileFailed(reader.getFile(), e.message());
success_in_pulling = false;
}
if (success_in_pulling)
{
applyActionAfterProcessing(reader.getFile());
queue_holder->setFileProcessed(reader.getFile());
files_metadata->setFileProcessed(reader.getFile());
return chunk;
}

src/Storages/S3Queue/S3QueueSource.h

@@ -8,7 +8,7 @@
# include <Compression/CompressionInfo.h>
# include <Storages/IStorage.h>
# include <Storages/S3Queue/S3QueueHolder.h>
# include <Storages/S3Queue/S3QueueFilesMetadata.h>
# include <Storages/StorageS3.h>
# include <Storages/StorageS3Settings.h>
@@ -81,7 +81,7 @@ public:
const String & bucket,
const String & version_id,
std::shared_ptr<IIterator> file_iterator_,
std::shared_ptr<S3QueueHolder> queue_holder_,
std::shared_ptr<S3QueueFilesMetadata> files_metadata_,
const S3QueueAction & action_,
size_t download_thread_num);
@@ -101,7 +101,7 @@ private:
S3Settings::RequestSettings request_settings;
std::shared_ptr<const S3::Client> client;
std::shared_ptr<S3QueueHolder> queue_holder;
std::shared_ptr<S3QueueFilesMetadata> files_metadata;
using ReaderHolder = StorageS3Source::ReaderHolder;
ReaderHolder reader;

src/Storages/S3Queue/StorageS3Queue.cpp

@@ -93,23 +93,21 @@ StorageS3Queue::StorageS3Queue(
: IStorage(table_id_)
, WithContext(context_)
, s3queue_settings(std::move(s3queue_settings_))
, s3_configuration{configuration_}
, keys({s3_configuration.url.key})
, mode(s3queue_settings->mode)
, after_processing(s3queue_settings->after_processing)
, milliseconds_to_wait(s3queue_settings->s3queue_polling_min_timeout_ms)
, format_name(configuration_.format)
, compression_method(configuration_.compression_method)
, name(s3_configuration.url.storage_name)
, configuration{configuration_}
, reschedule_processing_interval_ms(s3queue_settings->s3queue_polling_min_timeout_ms)
, format_settings(format_settings_)
, partition_by(partition_by_)
, log(&Poco::Logger::get("StorageS3Queue (" + table_id_.table_name + ")"))
{
if (!withGlobs())
throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "S3Queue engine can read only from url with globs");
if (configuration.url.key.ends_with('/'))
configuration.url.key += '*';
String setting_zookeeper_path = s3queue_settings->keeper_path;
if (setting_zookeeper_path.empty())
if (!withGlobs())
throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "S3Queue url must either end with '/' or contain globs");
String setting_zk_path = s3queue_settings->keeper_path;
if (setting_zk_path.empty())
{
auto database = DatabaseCatalog::instance().getDatabase(table_id_.database_name);
bool is_in_replicated_database = database->getEngineName() == "Replicated";
@@ -135,25 +133,25 @@ StorageS3Queue::StorageS3Queue(
"s3queue_default_zookeeper_path_prefix not specified");
}
zookeeper_path = zkutil::extractZooKeeperPath(
zk_path = zkutil::extractZooKeeperPath(
fs::path(zk_path_prefix) / toString(table_id_.uuid), /* check_starts_with_slash */ true, log);
}
else
{
/// We do not add table uuid here on purpose.
zookeeper_path = zkutil::extractZooKeeperPath(s3queue_settings->keeper_path.value, /* check_starts_with_slash */ true, log);
zk_path = zkutil::extractZooKeeperPath(s3queue_settings->keeper_path.value, /* check_starts_with_slash */ true, log);
}
LOG_INFO(log, "Using zookeeper path: {}", zookeeper_path);
LOG_INFO(log, "Using zookeeper path: {}", zk_path);
FormatFactory::instance().checkFormatName(format_name);
context_->getGlobalContext()->getRemoteHostFilter().checkURL(s3_configuration.url.uri);
FormatFactory::instance().checkFormatName(configuration.format);
context_->getGlobalContext()->getRemoteHostFilter().checkURL(configuration.url.uri);
StorageInMemoryMetadata storage_metadata;
s3_configuration.update(context_);
configuration.update(context_);
if (columns_.empty())
{
auto columns = StorageS3::getTableStructureFromDataImpl(s3_configuration, format_settings, context_);
auto columns = StorageS3::getTableStructureFromDataImpl(configuration, format_settings, context_);
storage_metadata.setColumns(columns);
}
else
@@ -163,22 +161,15 @@ StorageS3Queue::StorageS3Queue(
storage_metadata.setComment(comment);
setInMemoryMetadata(storage_metadata);
setZooKeeper();
auto metadata_snapshot = getInMemoryMetadataPtr();
const bool is_first_replica = createTableIfNotExists(metadata_snapshot);
if (!is_first_replica)
{
checkTableStructure(zookeeper_path, metadata_snapshot);
checkTableStructure(zk_path, metadata_snapshot);
}
queue_holder = std::make_unique<S3QueueHolder>(
zookeeper_path,
mode,
getContext(),
s3queue_settings->s3queue_tracked_files_limit.value,
s3queue_settings->s3queue_tracked_file_ttl_sec.value,
s3queue_settings->s3queue_loading_retries.value);
files_metadata = std::make_shared<S3QueueFilesMetadata>(this, *s3queue_settings);
auto default_virtuals = NamesAndTypesList{
{"_path", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
@@ -196,12 +187,12 @@ StorageS3Queue::StorageS3Queue(
bool StorageS3Queue::supportsSubcolumns() const
{
return FormatFactory::instance().checkIfFormatSupportsSubcolumns(format_name);
return FormatFactory::instance().checkIfFormatSupportsSubcolumns(configuration.format);
}
bool StorageS3Queue::supportsSubsetOfColumns() const
{
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(format_name);
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration.format);
}
Pipe StorageS3Queue::read(
@@ -220,7 +211,7 @@ Pipe StorageS3Queue::read(
if (mv_attached)
throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "Cannot read from StorageS3Queue with attached materialized views");
auto query_s3_configuration = updateConfigurationAndGetCopy(local_context);
auto query_configuration = updateConfigurationAndGetCopy(local_context);
Pipes pipes;
@@ -262,24 +253,23 @@ Pipe StorageS3Queue::read(
}
const size_t max_download_threads = local_context->getSettingsRef().max_download_threads;
auto zookeeper = getZooKeeper();
return Pipe(std::make_shared<StorageS3QueueSource>(
requested_virtual_columns,
format_name,
configuration.format,
getName(),
block_for_format,
local_context,
format_settings,
columns_description,
max_block_size,
query_s3_configuration.request_settings,
compression_method,
query_s3_configuration.client,
query_s3_configuration.url.bucket,
query_s3_configuration.url.version_id,
query_configuration.request_settings,
configuration.compression_method,
query_configuration.client,
query_configuration.url.bucket,
query_configuration.url.version_id,
iterator_wrapper,
queue_holder,
files_metadata,
after_processing,
max_download_threads));
}
@@ -387,7 +377,7 @@ void StorageS3Queue::threadFunc()
break;
}
milliseconds_to_wait = s3queue_settings->s3queue_polling_min_timeout_ms;
reschedule_processing_interval_ms = s3queue_settings->s3queue_polling_min_timeout_ms;
}
}
}
@@ -402,9 +392,9 @@ void StorageS3Queue::threadFunc()
{
LOG_TRACE(log, "Reschedule S3 Queue thread func.");
/// Reschedule with backoff.
if (milliseconds_to_wait < s3queue_settings->s3queue_polling_max_timeout_ms)
milliseconds_to_wait += s3queue_settings->s3queue_polling_backoff_ms;
task->holder->scheduleAfter(milliseconds_to_wait);
if (reschedule_processing_interval_ms < s3queue_settings->s3queue_polling_max_timeout_ms)
reschedule_processing_interval_ms += s3queue_settings->s3queue_polling_backoff_ms;
task->holder->scheduleAfter(reschedule_processing_interval_ms);
}
}
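The backoff is linear. With hypothetical settings s3queue_polling_min_timeout_ms = 1000, s3queue_polling_backoff_ms = 500 and s3queue_polling_max_timeout_ms = 10000, consecutive idle passes reschedule the task after 1000, 1500, 2000, ... ms up to the 10000 ms cap, while a pass that finds attached views to stream to resets the interval back to the minimum.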
@@ -426,7 +416,7 @@ void StorageS3Queue::streamToViews()
auto s3queue_context = Context::createCopy(getContext());
s3queue_context->makeQueryContext();
auto query_s3_configuration = updateConfigurationAndGetCopy(s3queue_context);
auto query_configuration = updateConfigurationAndGetCopy(s3queue_context);
// Create a stream for each consumer and join them in a union stream
// Only insert into dependent views and expect that input blocks contain virtual columns
@@ -473,23 +463,22 @@ void StorageS3Queue::streamToViews()
Pipes pipes;
auto zookeeper = getZooKeeper();
auto pipe = Pipe(std::make_shared<StorageS3QueueSource>(
requested_virtual_columns,
format_name,
configuration.format,
getName(),
block_for_format,
s3queue_context,
format_settings,
columns_description,
block_size,
query_s3_configuration.request_settings,
compression_method,
query_s3_configuration.client,
query_s3_configuration.url.bucket,
query_s3_configuration.url.version_id,
query_configuration.request_settings,
configuration.compression_method,
query_configuration.client,
query_configuration.url.bucket,
query_configuration.url.version_id,
iterator_wrapper,
queue_holder,
files_metadata,
after_processing,
max_download_threads));
@@ -505,65 +494,56 @@ void StorageS3Queue::streamToViews()
StorageS3Queue::Configuration StorageS3Queue::updateConfigurationAndGetCopy(ContextPtr local_context)
{
s3_configuration.update(local_context);
return s3_configuration;
}
void StorageS3Queue::setZooKeeper()
{
std::lock_guard lock(current_zookeeper_mutex);
current_zookeeper = getContext()->getZooKeeper();
}
zkutil::ZooKeeperPtr StorageS3Queue::tryGetZooKeeper() const
{
std::lock_guard lock(current_zookeeper_mutex);
return current_zookeeper;
configuration.update(local_context);
return configuration;
}
zkutil::ZooKeeperPtr StorageS3Queue::getZooKeeper() const
{
auto res = tryGetZooKeeper();
if (!res)
throw Exception(ErrorCodes::NO_ZOOKEEPER, "Cannot get ZooKeeper");
return res;
std::lock_guard lock{zk_mutex};
if (!zk_client || zk_client->expired())
{
zk_client = getContext()->getZooKeeper();
zk_client->sync(zk_path);
}
return zk_client;
}
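getZooKeeper() now lazily recreates the session under zk_mutex whenever the cached client has expired, and calls sync(zk_path) so that reads through the fresh session observe the latest committed state of the queue's subtree. This replaces the old setZooKeeper()/tryGetZooKeeper() pair, which cached one client at construction time and could keep handing out an expired session.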
bool StorageS3Queue::createTableIfNotExists(const StorageMetadataPtr & metadata_snapshot)
{
auto zookeeper = getZooKeeper();
zookeeper->createAncestors(zookeeper_path);
zookeeper->createAncestors(zk_path);
for (size_t i = 0; i < zk_create_table_retries; ++i)
{
Coordination::Requests ops;
bool is_first_replica = true;
if (zookeeper->exists(zookeeper_path + "/metadata"))
if (zookeeper->exists(zk_path + "/metadata"))
{
if (!zookeeper->exists(zookeeper_path + "/processing"))
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/processing", "", zkutil::CreateMode::Ephemeral));
LOG_DEBUG(log, "This table {} is already created, will use existing metadata for checking engine settings", zookeeper_path);
if (!zookeeper->exists(zk_path + "/processing"))
ops.emplace_back(zkutil::makeCreateRequest(zk_path + "/processing", "", zkutil::CreateMode::Ephemeral));
LOG_DEBUG(log, "This table {} is already created, will use existing metadata for checking engine settings", zk_path);
is_first_replica = false;
}
else
{
String metadata_str = S3QueueTableMetadata(s3_configuration, *s3queue_settings).toString();
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path, "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/processed", "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/failed", "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/processing", "", zkutil::CreateMode::Ephemeral));
String metadata_str = S3QueueTableMetadata(configuration, *s3queue_settings).toString();
ops.emplace_back(zkutil::makeCreateRequest(zk_path, "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zk_path + "/processed", "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zk_path + "/failed", "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zk_path + "/processing", "", zkutil::CreateMode::Ephemeral));
ops.emplace_back(zkutil::makeCreateRequest(
zookeeper_path + "/columns", metadata_snapshot->getColumns().toString(), zkutil::CreateMode::Persistent));
zk_path + "/columns", metadata_snapshot->getColumns().toString(), zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/metadata", metadata_str, zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zk_path + "/metadata", metadata_str, zkutil::CreateMode::Persistent));
}
Coordination::Responses responses;
auto code = zookeeper->tryMulti(ops, responses);
if (code == Coordination::Error::ZNODEEXISTS)
{
LOG_INFO(log, "It looks like the table {} was created by another server at the same moment, will retry", zookeeper_path);
LOG_INFO(log, "It looks like the table {} was created by another server at the same moment, will retry", zk_path);
continue;
}
else if (code != Coordination::Error::ZOK)
@@ -577,7 +557,7 @@ bool StorageS3Queue::createTableIfNotExists(const StorageMetadataPtr & metadata_
throw Exception(
ErrorCodes::REPLICA_ALREADY_EXISTS,
"Cannot create table, because it is created concurrently every time or because "
"of wrong zookeeper_path or because of logical error");
"of wrong zk_path or because of logical error");
}
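Since all node creations for one attempt are submitted as a single tryMulti() transaction, a replica either creates the entire layout (the root plus /processed, /failed, /processing, /columns and /metadata) or fails atomically with ZNODEEXISTS and retries, up to zk_create_table_retries = 1000 attempts.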
@@ -588,7 +568,7 @@ void StorageS3Queue::checkTableStructure(const String & zookeeper_prefix, const
{
auto zookeeper = getZooKeeper();
S3QueueTableMetadata old_metadata(s3_configuration, *s3queue_settings);
S3QueueTableMetadata old_metadata(configuration, *s3queue_settings);
Coordination::Stat metadata_stat;
String metadata_str = zookeeper->get(fs::path(zookeeper_prefix) / "metadata", &metadata_stat);
@@ -615,39 +595,40 @@ std::shared_ptr<StorageS3QueueSource::IIterator>
StorageS3Queue::createFileIterator(ContextPtr local_context, ASTPtr query)
{
auto it = std::make_shared<StorageS3QueueSource::QueueGlobIterator>(
*s3_configuration.client,
s3_configuration.url,
*configuration.client,
configuration.url,
query,
virtual_block,
local_context,
s3queue_settings->s3queue_polling_size.value,
s3_configuration.request_settings);
configuration.request_settings);
auto lock = queue_holder->acquireLock();
S3QueueHolder::S3FilesCollection files_to_skip = queue_holder->getProcessedAndFailedFiles();
auto zookeeper = getZooKeeper();
auto lock = files_metadata->acquireLock(zookeeper);
S3QueueFilesMetadata::S3FilesCollection files_to_skip = files_metadata->getProcessedFailedAndProcessingFiles();
Strings files_to_process;
if (mode == S3QueueMode::UNORDERED)
if (s3queue_settings->mode == S3QueueMode::UNORDERED)
{
files_to_process = it->filterProcessingFiles(mode, files_to_skip);
files_to_process = it->filterProcessingFiles(s3queue_settings->mode, files_to_skip);
}
else
{
String max_processed_file = queue_holder->getMaxProcessedFile();
files_to_process = it->filterProcessingFiles(mode, files_to_skip, max_processed_file);
String max_processed_file = files_metadata->getMaxProcessedFile();
files_to_process = it->filterProcessingFiles(s3queue_settings->mode, files_to_skip, max_processed_file);
}
LOG_TEST(log, "Found files to process: {}", fmt::join(files_to_process, ", "));
queue_holder->setFilesProcessing(files_to_process);
files_metadata->setFilesProcessing(files_to_process);
return it;
}
void StorageS3Queue::drop()
{
auto zk_client = getZooKeeper();
if (zk_client->exists(zookeeper_path))
zk_client->removeRecursive(zookeeper_path);
auto zookeeper = getZooKeeper();
if (zookeeper->exists(zk_path))
zookeeper->removeRecursive(zk_path);
}
void registerStorageS3QueueImpl(const String & name, StorageFactory & factory)

src/Storages/S3Queue/StorageS3Queue.h

@@ -11,7 +11,7 @@
# include <Core/BackgroundSchedulePool.h>
# include <Storages/IStorage.h>
# include <Storages/S3Queue/S3QueueHolder.h>
# include <Storages/S3Queue/S3QueueFilesMetadata.h>
# include <Storages/S3Queue/S3QueueSettings.h>
# include <Storages/S3Queue/S3QueueSource.h>
# include <Storages/StorageS3Settings.h>
@@ -41,6 +41,7 @@ class StorageS3Queue : public IStorage, WithContext
{
public:
using Configuration = typename StorageS3::Configuration;
StorageS3Queue(
std::unique_ptr<S3QueueSettings> s3queue_settings_,
const Configuration & configuration_,
@@ -79,35 +80,39 @@ public:
bool supportsPartitionBy() const override;
const auto & getFormatName() const { return format_name; }
const auto & getFormatName() const { return configuration.format; }
const String & getZooKeeperPath() const { return zk_path; }
zkutil::ZooKeeperPtr getZooKeeper() const;
private:
std::unique_ptr<S3QueueSettings> s3queue_settings;
std::shared_ptr<S3QueueHolder> queue_holder;
Configuration s3_configuration;
std::vector<String> keys;
const std::unique_ptr<S3QueueSettings> s3queue_settings;
const S3QueueAction after_processing;
std::shared_ptr<S3QueueFilesMetadata> files_metadata;
Configuration configuration;
NamesAndTypesList virtual_columns;
Block virtual_block;
S3QueueMode mode;
S3QueueAction after_processing;
uint64_t milliseconds_to_wait = 10000;
String format_name;
String compression_method;
String name;
UInt64 reschedule_processing_interval_ms;
std::optional<FormatSettings> format_settings;
ASTPtr partition_by;
String zk_path;
mutable zkutil::ZooKeeperPtr zk_client;
mutable std::mutex zk_mutex;
std::atomic<bool> mv_attached = false;
std::atomic<bool> shutdown_called{false};
Poco::Logger * log;
bool supportsSubcolumns() const override;
bool withGlobs() const { return s3_configuration.url.key.find_first_of("*?{") != std::string::npos; }
bool withGlobs() const { return configuration.url.key.find_first_of("*?{") != std::string::npos; }
void threadFunc();
size_t getTableDependentCount() const;
std::atomic<bool> mv_attached = false;
bool hasDependencies(const StorageID & table_id);
std::atomic<bool> shutdown_called{false};
Poco::Logger * log;
void startup() override;
void shutdown() override;
@@ -122,19 +127,10 @@ private:
std::shared_ptr<TaskContext> task;
bool supportsSubsetOfColumns() const override;
String zookeeper_path;
zkutil::ZooKeeperPtr current_zookeeper;
mutable std::mutex current_zookeeper_mutex;
void setZooKeeper();
zkutil::ZooKeeperPtr tryGetZooKeeper() const;
zkutil::ZooKeeperPtr getZooKeeper() const;
const UInt32 zk_create_table_retries = 1000;
bool createTableIfNotExists(const StorageMetadataPtr & metadata_snapshot);
void checkTableStructure(const String & zookeeper_prefix, const StorageMetadataPtr & metadata_snapshot);
const String & getZooKeeperPath() const { return zookeeper_path; }
using KeysWithInfo = StorageS3QueueSource::KeysWithInfo;