Some fixes

kssenii 2023-07-31 20:07:23 +02:00
parent c13fdca23e
commit 870a506a0b
9 changed files with 131 additions and 176 deletions

View File

@@ -104,7 +104,7 @@ class IColumn;
M(UInt64, s3_retry_attempts, 10, "Setting for Aws::Client::RetryStrategy, Aws::Client does retries itself, 0 means no retries", 0) \
M(UInt64, s3_request_timeout_ms, 3000, "Idleness timeout for sending and receiving data to/from S3. Fail if a single TCP read or write call blocks for this long.", 0) \
M(Bool, enable_s3_requests_logging, false, "Enable very explicit logging of S3 requests. Makes sense for debug only.", 0) \
M(String, s3queue_default_zookeeper_path, "/", "Default zookeeper path prefix for S3Queue engine", 0) \
M(String, s3queue_default_zookeeper_path, "/s3queue/", "Default zookeeper path prefix for S3Queue engine", 0) \
M(UInt64, hdfs_replication, 0, "The actual number of replications can be specified when the hdfs file is created.", 0) \
M(Bool, hdfs_truncate_on_insert, false, "Enables or disables truncate before insert in hdfs engine tables", 0) \
M(Bool, hdfs_create_new_file_on_insert, false, "Enables or disables creating a new file on each insert in hdfs engine tables", 0) \

View File

@@ -1,3 +1,4 @@
#include "IO/VarInt.h"
#include "config.h"
#if USE_AWS_S3
@@ -23,33 +24,34 @@ namespace ErrorCodes
void S3QueueHolder::S3QueueCollection::read(ReadBuffer & in)
{
assertString("collection:\n", in);
files = {};
while (!in.eof())
{
String file_name;
UInt64 timestamp;
UInt64 retries_count;
in >> file_name >> "\n";
in >> timestamp >> "\n";
in >> retries_count >> "\n";
TrackedCollectionItem item = {.file_path=file_name, .timestamp=timestamp, .retries_count=retries_count};
if (in.eof())
return;
size_t files_num;
in >> files_num >> "\n";
while (files_num--)
{
TrackedCollectionItem item;
in >> item.file_path >> "\n";
in >> item.timestamp >> "\n";
in >> item.retries_count >> "\n";
in >> item.last_exception >> "\n";
files.push_back(item);
}
}
void S3QueueHolder::S3QueueCollection::write(WriteBuffer & out) const
{
out << "collection:\n";
out << files.size() << "\n";
for (const auto & processed_file : files)
{
out << processed_file.file_path << "\n";
out << processed_file.timestamp << "\n";
out << processed_file.retries_count << "\n";
out << processed_file.last_exception << "\n";
}
/// todo(kssenii): use a more flexible format?
}
String S3QueueHolder::S3QueueCollection::toString() const
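For reference, a minimal standalone sketch of the count-prefixed format introduced here. std::stringstream stands in for ClickHouse's ReadBuffer/WriteBuffer and the Item struct mirrors TrackedCollectionItem; this is an illustration, not the committed code:

#include <cstdint>
#include <deque>
#include <sstream>
#include <string>

struct Item
{
    std::string file_path;
    uint64_t timestamp = 0;
    uint64_t retries_count = 0;
    std::string last_exception;
};

// Header, then the element count, then four lines per item.
std::string serialize(const std::deque<Item> & files)
{
    std::ostringstream out;
    out << "collection:\n" << files.size() << "\n";
    for (const auto & f : files)
        out << f.file_path << "\n" << f.timestamp << "\n"
            << f.retries_count << "\n" << f.last_exception << "\n";
    return out.str();
}

// The leading count makes truncated input detectable and removes the
// eof-probing of the old record-at-a-time loop.
std::deque<Item> deserialize(const std::string & str)
{
    std::istringstream in(str);
    std::string header;
    std::getline(in, header); // "collection:"
    size_t files_num = 0;
    if (!(in >> files_num))
        return {};
    in.ignore(); // newline after the count
    std::deque<Item> files;
    while (files_num-- && in)
    {
        Item item;
        std::getline(in, item.file_path);
        in >> item.timestamp >> item.retries_count;
        in.ignore(); // newline after retries_count
        std::getline(in, item.last_exception);
        files.push_back(item);
    }
    return files;
}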
@@ -79,16 +81,14 @@ void S3QueueHolder::S3QueueProcessedCollection::parse(const String & collection_
{
ReadBufferFromString buf(collection_str);
read(buf);
// Remove old items
if (max_age > 0)
if (max_age > 0) // Remove old items
{
UInt64 timestamp = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()).count();
UInt64 max_seconds_diff = max_age;
auto new_end = std::remove_if(
files.begin(), files.end(),
[&timestamp, &max_seconds_diff](TrackedCollectionItem processed_file)
std::erase_if(
files,
[&timestamp, &max_seconds_diff](const TrackedCollectionItem & processed_file)
{ return (timestamp - processed_file.timestamp) > max_seconds_diff; });
files.erase(new_end, files.end());
}
}
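The std::erase_if rewrite keeps behavior but tightens the code: one call replaces the remove_if/erase pair, and the predicate now takes the item by const reference instead of by value. A self-contained C++20 sketch of the pruning:

#include <chrono>
#include <cstdint>
#include <deque>

struct Tracked
{
    uint64_t timestamp = 0; // seconds since epoch
};

// Drop every entry older than max_age seconds.
void pruneOld(std::deque<Tracked> & files, uint64_t max_age)
{
    const uint64_t now = std::chrono::duration_cast<std::chrono::seconds>(
        std::chrono::system_clock::now().time_since_epoch()).count();
    std::erase_if(files, [&](const Tracked & f) { return now - f.timestamp > max_age; });
}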
@@ -100,12 +100,10 @@ void S3QueueHolder::S3QueueProcessedCollection::add(const String & file_name)
TrackedCollectionItem processed_file = {.file_path=file_name, .timestamp=timestamp};
files.push_back(processed_file);
// Check set size
if (files.size() > max_size)
while (files.size() > max_size)
{
files.erase(files.begin(), files.begin() + (files.size() - max_size));
files.pop_front();
}
/// todo(kssenii): use deque here
}
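With the backing container switched to std::deque (see the TrackedFiles typedef change in the header further down), eviction becomes O(1) pop_front() calls instead of a bulk vector erase. A sketch:

#include <cstddef>
#include <deque>
#include <string>

// Append a file and evict the oldest entries until the bound holds.
void addBounded(std::deque<std::string> & files, std::string file, size_t max_size)
{
    files.push_back(std::move(file));
    while (files.size() > max_size)
        files.pop_front(); // O(1) on deque; the old vector erase shifted everything
}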
@@ -120,7 +118,7 @@ void S3QueueHolder::S3QueueFailedCollection::parse(const String & collection_str
}
bool S3QueueHolder::S3QueueFailedCollection::add(const String & file_name)
bool S3QueueHolder::S3QueueFailedCollection::add(const String & file_name, const String & exception_message)
{
auto failed_it
= std::find_if(files.begin(), files.end(), [&file_name](const TrackedCollectionItem & s) { return s.file_path == file_name; });
@@ -133,7 +131,7 @@ bool S3QueueHolder::S3QueueFailedCollection::add(const String & file_name)
}
else
{
TrackedCollectionItem failed_file = {.file_path=file_name, .retries_count=max_retries_count};
TrackedCollectionItem failed_file = { .file_path=file_name, .retries_count=max_retries_count, .last_exception = exception_message };
files.push_back(failed_file);
}
return true;
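The found-branch of add() falls outside this hunk, so the following sketch is partly a guess at the overall retry bookkeeping: the first failure records the retry budget plus the exception text, later failures spend the budget, and the return value says whether the file should be retried.

#include <algorithm>
#include <cstdint>
#include <deque>
#include <string>

struct FailedItem
{
    std::string file_path;
    uint64_t retries_count = 0;
    std::string last_exception;
};

// Hypothetical sketch, not the committed code.
bool markFailed(std::deque<FailedItem> & files, const std::string & file,
                const std::string & exception_message, uint64_t max_retries)
{
    auto it = std::find_if(files.begin(), files.end(),
                           [&](const FailedItem & f) { return f.file_path == file; });
    if (it == files.end())
    {
        files.push_back({file, max_retries, exception_message});
        return true; // first failure: retry later
    }
    if (it->retries_count == 0)
        return false; // budget exhausted: give up on this file
    --it->retries_count;
    return true;
}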
@@ -163,6 +161,7 @@ S3QueueHolder::S3QueueHolder(
, max_set_size(max_set_size_)
, max_set_age_sec(max_set_age_sec_)
, max_loading_retries(max_loading_retries_)
, zk_client(getContext()->getZooKeeper())
, zookeeper_path(zookeeper_path_)
, zookeeper_failed_path(fs::path(zookeeper_path_) / "failed")
, zookeeper_processing_path(fs::path(zookeeper_path_) / "processing")
@@ -171,31 +170,20 @@ S3QueueHolder::S3QueueHolder(
, mode(mode_)
, log(&Poco::Logger::get("S3QueueHolder"))
{
current_zookeeper = getContext()->getZooKeeper();
if (!current_zookeeper)
throw Exception(ErrorCodes::NO_ZOOKEEPER, "Cannot get ZooKeeper");
}
zkutil::ZooKeeperPtr S3QueueHolder::getZooKeeper() const
{
/// todo(kssenii): current_zookeeper is not updated at all apart from in constructor, remove the lock?
std::lock_guard lock(current_zookeeper_mutex);
return current_zookeeper;
}
void S3QueueHolder::setFileProcessed(const String & file_path)
{
auto zookeeper = getZooKeeper();
auto lock = acquireLock();
if (mode == S3QueueMode::UNORDERED)
{
String processed_files = zookeeper->get(zookeeper_processed_path);
String processed_files = zk_client->get(zookeeper_processed_path);
auto processed = S3QueueProcessedCollection(max_set_size, max_set_age_sec);
processed.parse(processed_files);
processed.add(file_path);
zookeeper->set(zookeeper_processed_path, processed.toString());
zk_client->set(zookeeper_processed_path, processed.toString());
}
else if (mode == S3QueueMode::ORDERED)
{
@@ -204,24 +192,22 @@ void S3QueueHolder::setFileProcessed(const String & file_path)
// This check can be useful when multiple table engines consume in ordered mode.
if (max_file.compare(file_path) <= 0)
{
zookeeper->set(zookeeper_processed_path, file_path);
zk_client->set(zookeeper_processed_path, file_path);
}
}
removeProcessingFile(file_path);
}
bool S3QueueHolder::markFailedAndCheckRetry(const String & file_path)
bool S3QueueHolder::setFileFailed(const String & file_path, const String & exception_message)
{
auto zookeeper = getZooKeeper();
auto lock = acquireLock();
String failed_files = zookeeper->get(zookeeper_failed_path);
auto failed_collection = S3QueueFailedCollection(max_loading_retries);
failed_collection.parse(failed_files);
bool retry_later = failed_collection.add(file_path);
failed_collection.parse(zk_client->get(zookeeper_failed_path));
bool retry_later = failed_collection.add(file_path, exception_message);
zookeeper->set(zookeeper_failed_path, failed_collection.toString());
zk_client->set(zookeeper_failed_path, failed_collection.toString());
removeProcessingFile(file_path);
return retry_later;
@@ -229,8 +215,7 @@ bool S3QueueHolder::markFailedAndCheckRetry(const String & file_path)
S3QueueHolder::S3FilesCollection S3QueueHolder::getFailedFiles()
{
auto zookeeper = getZooKeeper();
String failed_files = zookeeper->get(zookeeper_failed_path);
String failed_files = zk_client->get(zookeeper_failed_path);
auto failed_collection = S3QueueFailedCollection(max_loading_retries);
failed_collection.parse(failed_files);
@@ -240,85 +225,79 @@ S3QueueHolder::S3FilesCollection S3QueueHolder::getFailedFiles()
String S3QueueHolder::getMaxProcessedFile()
{
auto zookeeper = getZooKeeper();
String processed = zookeeper->get(zookeeper_processed_path);
String processed = zk_client->get(zookeeper_processed_path);
return processed;
}
S3QueueHolder::S3FilesCollection S3QueueHolder::getProcessingFiles()
{
auto zookeeper = getZooKeeper();
String processing = zookeeper->get(zookeeper_processing_path);
return parseCollection(processing);
String processing_files;
if (!zk_client->tryGet(zookeeper_processing_path, processing_files))
return {};
return parseCollection(processing_files);
}
void S3QueueHolder::setFilesProcessing(Strings & file_paths)
{
auto zookeeper = getZooKeeper();
std::unordered_set<String> processing_files(file_paths.begin(), file_paths.end());
processing_files.merge(getProcessingFiles());
String processing_files_str = toString(Strings(processing_files.begin(), processing_files.end()));
Strings processing_file_paths(processing_files.begin(), processing_files.end());
zookeeper->set(fs::path(zookeeper_processing_path), toString(processing_file_paths));
if (zk_client->exists(zookeeper_processing_path))
zk_client->set(fs::path(zookeeper_processing_path), processing_files_str);
else
zk_client->create(fs::path(zookeeper_processing_path), processing_files_str, zkutil::CreateMode::Ephemeral);
}
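setFilesProcessing now tolerates a missing processing node by creating it on demand (ephemeral, so it vanishes with the session). A sketch of the set-or-create pattern against an invented in-memory stand-in; MockKeeper is not the zkutil API:

#include <map>
#include <stdexcept>
#include <string>

struct MockKeeper // in-memory stand-in, just enough for the pattern
{
    std::map<std::string, std::string> nodes;
    bool exists(const std::string & path) const { return nodes.count(path) != 0; }
    void set(const std::string & path, const std::string & data)
    {
        auto it = nodes.find(path);
        if (it == nodes.end())
            throw std::runtime_error("no node: " + path);
        it->second = data;
    }
    void create(const std::string & path, const std::string & data) { nodes.emplace(path, data); }
};

void writeProcessingList(MockKeeper & zk, const std::string & path, const std::string & data)
{
    if (zk.exists(path))
        zk.set(path, data); // node left over from an earlier batch in this session
    else
        zk.create(path, data); // first batch: create it (ephemeral in the real code)
}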
S3QueueHolder::S3FilesCollection S3QueueHolder::getUnorderedProcessedFiles()
{
auto zookeeper = getZooKeeper();
String processed = zookeeper->get(zookeeper_processed_path);
String processed = zk_client->get(zookeeper_processed_path);
auto collection = S3QueueProcessedCollection(max_set_size, max_set_age_sec);
collection.parse(processed);
return collection.getFileNames();
}
S3QueueHolder::S3FilesCollection S3QueueHolder::getExcludedFiles()
S3QueueHolder::S3FilesCollection S3QueueHolder::getProcessedAndFailedFiles()
{
auto zookeeper = getZooKeeper();
S3FilesCollection exclude_files = getFailedFiles();
S3FilesCollection processed_and_failed_files = getFailedFiles();
if (mode == S3QueueMode::UNORDERED)
{
S3FilesCollection processed_files = getUnorderedProcessedFiles();
exclude_files.merge(processed_files);
processed_and_failed_files.merge(getUnorderedProcessedFiles());
}
else
{
String processed = getMaxProcessedFile();
exclude_files.insert(processed);
processed_and_failed_files.insert(processed);
}
S3FilesCollection processing_files = getProcessingFiles();
exclude_files.merge(processing_files);
processed_and_failed_files.merge(processing_files);
return exclude_files;
return processed_and_failed_files;
}
void S3QueueHolder::removeProcessingFile(const String & file_path)
{
auto zookeeper = getZooKeeper();
String node_data;
String processing = zookeeper->get(zookeeper_processing_path);
String processing = zk_client->get(zookeeper_processing_path);
S3FilesCollection processing_files = parseCollection(processing);
processing_files.erase(file_path);
Strings file_paths(processing_files.begin(), processing_files.end());
zookeeper->set(fs::path(zookeeper_processing_path), toString(file_paths));
zk_client->set(fs::path(zookeeper_processing_path), toString(file_paths));
}
std::shared_ptr<zkutil::EphemeralNodeHolder> S3QueueHolder::acquireLock()
{
auto zookeeper = getZooKeeper();
UInt32 retry_count = 200;
UInt32 sleep_ms = 100;
UInt32 retries = 0;
while (true)
{
Coordination::Error code = zookeeper->tryCreate(zookeeper_lock_path, "", zkutil::CreateMode::Ephemeral);
Coordination::Error code = zk_client->tryCreate(zookeeper_lock_path, "", zkutil::CreateMode::Ephemeral);
if (code == Coordination::Error::ZNONODE || code == Coordination::Error::ZNODEEXISTS)
{
retries++;
@@ -334,14 +313,14 @@ std::shared_ptr<zkutil::EphemeralNodeHolder> S3QueueHolder::acquireLock()
}
else
{
return zkutil::EphemeralNodeHolder::existing(zookeeper_lock_path, *zookeeper);
return zkutil::EphemeralNodeHolder::existing(zookeeper_lock_path, *zk_client);
}
}
}
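acquireLock spins on tryCreate of an ephemeral node: holding the node is the lock, and losing the session releases it. A self-contained sketch of the loop; tryCreateEphemeral is a mock, and the throw after exhausting retries is an assumption since that branch is elided from the hunk:

#include <chrono>
#include <cstdint>
#include <stdexcept>
#include <string>
#include <thread>

enum class KeeperError { Ok, NodeExists, NoNode };

// Mock: stands in for ZooKeeper's tryCreate of an ephemeral node.
KeeperError tryCreateEphemeral(const std::string &) { return KeeperError::Ok; }

void acquireLock(const std::string & lock_path)
{
    const uint32_t retry_count = 200; // numbers taken from the code above
    const auto sleep_ms = std::chrono::milliseconds(100);
    for (uint32_t retries = 0;; ++retries)
    {
        KeeperError code = tryCreateEphemeral(lock_path);
        if (code == KeeperError::NoNode || code == KeeperError::NodeExists)
        {
            if (retries >= retry_count)
                throw std::runtime_error("Cannot acquire lock: " + lock_path);
            std::this_thread::sleep_for(sleep_ms);
            continue;
        }
        return; // node created: we hold the lock until release or session loss
    }
}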
S3QueueHolder::S3FilesCollection S3QueueHolder::parseCollection(String & files)
S3QueueHolder::S3FilesCollection S3QueueHolder::parseCollection(const String & collection_str)
{
ReadBufferFromString rb(files);
ReadBufferFromString rb(collection_str);
Strings deserialized;
try
{
@@ -353,9 +332,7 @@ S3QueueHolder::S3FilesCollection S3QueueHolder::parseCollection(String & files)
deserialized = {};
}
std::unordered_set<String> processed(deserialized.begin(), deserialized.end());
return processed;
return std::unordered_set<String>(deserialized.begin(), deserialized.end());
}
}

View File

@@ -17,10 +17,11 @@ public:
String file_path;
UInt64 timestamp = 0;
UInt64 retries_count = 0;
String last_exception;
};
using S3FilesCollection = std::unordered_set<String>;
using TrackedFiles = std::vector<TrackedCollectionItem>;
using TrackedFiles = std::deque<TrackedCollectionItem>;
S3QueueHolder(
const String & zookeeper_path_,
@@ -31,9 +32,9 @@ public:
UInt64 & max_loading_retries_);
void setFileProcessed(const String & file_path);
bool markFailedAndCheckRetry(const String & file_path);
bool setFileFailed(const String & file_path, const String & exception_message);
void setFilesProcessing(Strings & file_paths);
S3FilesCollection getExcludedFiles();
S3FilesCollection getProcessedAndFailedFiles();
String getMaxProcessedFile();
std::shared_ptr<zkutil::EphemeralNodeHolder> acquireLock();
@@ -73,7 +74,7 @@ public:
S3QueueFailedCollection(const UInt64 & max_retries_count_);
void parse(const String & collection_str) override;
bool add(const String & file_name);
bool add(const String & file_name, const String & exception_message);
S3FilesCollection getFileNames();
@@ -87,7 +88,7 @@ private:
const UInt64 max_set_age_sec;
const UInt64 max_loading_retries;
zkutil::ZooKeeperPtr current_zookeeper;
zkutil::ZooKeeperPtr zk_client;
mutable std::mutex current_zookeeper_mutex;
mutable std::mutex mutex;
const String zookeeper_path;
@@ -99,14 +100,12 @@ private:
const UUID table_uuid;
Poco::Logger * log;
zkutil::ZooKeeperPtr getZooKeeper() const;
S3FilesCollection getFailedFiles();
S3FilesCollection getProcessingFiles();
S3FilesCollection getUnorderedProcessedFiles();
void removeProcessingFile(const String & file_path);
S3FilesCollection parseCollection(String & files);
S3FilesCollection parseCollection(const String & collection_str);
};

View File

@@ -21,7 +21,6 @@
# include <Storages/NamedCollectionsHelpers.h>
# include <Storages/PartitionedSink.h>
# include <Storages/ReadFromStorageProgress.h>
# include <Storages/S3Queue/S3QueueSource.h>
# include <Storages/StorageS3.h>
# include <Storages/StorageS3Settings.h>
@@ -74,21 +73,17 @@ StorageS3QueueSource::QueueGlobIterator::QueueGlobIterator(
const Block & virtual_header,
ContextPtr context,
UInt64 & max_poll_size_,
StorageS3QueueSource::KeysWithInfo * read_keys_,
const S3Settings::RequestSettings & request_settings_)
: max_poll_size(max_poll_size_)
, bucket(globbed_uri_.bucket)
, glob_iterator(std::make_unique<StorageS3QueueSource::DisclosedGlobIterator>(
client_, globbed_uri_, query, virtual_header, context, read_keys_, request_settings_))
client_, globbed_uri_, query, virtual_header, context, nullptr, request_settings_))
{
/// todo(kssenii): remove this loop, it should not be here
while (true)
{
KeyWithInfo val = glob_iterator->next();
if (val.key.empty())
{
break;
}
keys_buf.push_back(val);
}
}
@@ -98,16 +93,17 @@ Strings StorageS3QueueSource::QueueGlobIterator::filterProcessingFiles(
{
for (const KeyWithInfo & val : keys_buf)
{
auto full_path = bucket + '/' + val.key;
auto full_path = val.key;
if (exclude_keys.find(full_path) != exclude_keys.end())
{
LOG_TRACE(log, "Found in exclude keys {}", val.key);
LOG_TEST(log, "File {} will be skipped, because it was found in exclude files list "
"(either already processed or failed to be processed)", val.key);
continue;
}
if ((engine_mode == S3QueueMode::ORDERED) && (full_path.compare(max_file) <= 0))
{
continue;
}
if ((processing_keys.size() < max_poll_size) || (engine_mode == S3QueueMode::ORDERED))
{
processing_keys.push_back(val);
@@ -124,6 +120,7 @@ Strings StorageS3QueueSource::QueueGlobIterator::filterProcessingFiles(
processing_keys.begin(),
processing_keys.end(),
[](const KeyWithInfo & lhs, const KeyWithInfo & rhs) { return lhs.key.compare(rhs.key) < 0; });
if (processing_keys.size() > max_poll_size)
{
processing_keys.erase(processing_keys.begin() + max_poll_size, processing_keys.end());
@@ -132,11 +129,9 @@ Strings StorageS3QueueSource::QueueGlobIterator::filterProcessingFiles(
Strings keys;
for (const auto & key_info : processing_keys)
{
keys.push_back(bucket + '/' + key_info.key);
}
processing_keys.push_back(KeyWithInfo());
keys.push_back(key_info.key);
processing_keys.push_back(KeyWithInfo());
processing_iterator = processing_keys.begin();
return keys;
}
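Taken together, filterProcessingFiles skips keys in the exclude set, skips keys at or below the ordered-mode watermark, and caps the batch at max_poll_size. A simplified sketch over plain strings; whether the final sort/cap applies only in ordered mode is not visible in the hunk, so that guard is an assumption:

#include <algorithm>
#include <cstddef>
#include <string>
#include <unordered_set>
#include <vector>

enum class Mode { Ordered, Unordered };

std::vector<std::string> filterKeys(std::vector<std::string> keys, Mode mode,
                                    const std::unordered_set<std::string> & exclude,
                                    const std::string & max_file, size_t max_poll_size)
{
    std::vector<std::string> out;
    for (auto & key : keys)
    {
        if (exclude.count(key)) // already processed, failed, or in flight
            continue;
        if (mode == Mode::Ordered && key <= max_file)
            continue; // at or below the high-water mark
        if (out.size() < max_poll_size || mode == Mode::Ordered)
            out.push_back(std::move(key));
    }
    if (mode == Mode::Ordered) // assumption: sort and cap only apply here
    {
        std::sort(out.begin(), out.end());
        if (out.size() > max_poll_size)
            out.resize(max_poll_size);
    }
    return out;
}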
@@ -153,12 +148,6 @@ StorageS3QueueSource::KeyWithInfo StorageS3QueueSource::QueueGlobIterator::next(
return KeyWithInfo();
}
size_t StorageS3QueueSource::QueueGlobIterator::getTotalSize() const
{
return glob_iterator->getTotalSize();
}
Block StorageS3QueueSource::getHeader(Block sample_block, const std::vector<NameAndTypePair> & requested_virtual_columns)
{
for (const auto & virtual_column : requested_virtual_columns)
@@ -232,6 +221,7 @@ String StorageS3QueueSource::getName() const
Chunk StorageS3QueueSource::generate()
{
auto file_progress = getContext()->getFileProgressCallback();
while (true)
{
if (isCancelled() || !reader)
@@ -243,22 +233,12 @@ Chunk StorageS3QueueSource::generate()
Chunk chunk;
bool success_in_pulling = false;
String file_path;
try
{
if (reader->pull(chunk))
{
UInt64 num_rows = chunk.getNumRows();
file_path = reader.getPath();
size_t total_size = file_iterator->getTotalSize();
if (num_rows && total_size)
{
size_t chunk_size = reader.getFormat()->getApproxBytesReadForChunk();
if (!chunk_size)
chunk_size = chunk.bytes();
updateRowsProgressApprox(*this, num_rows, chunk_size, total_size, total_rows_approx_accumulated, total_rows_count_times, total_rows_approx_max);
}
auto file_path = reader.getPath();
for (const auto & virtual_column : requested_virtual_columns)
{
@@ -279,14 +259,13 @@ Chunk StorageS3QueueSource::generate()
catch (const Exception & e)
{
LOG_ERROR(log, "Exception in chunk pulling: {} ", e.displayText());
const auto & failed_file_path = reader.getPath();
queue_holder->markFailedAndCheckRetry(failed_file_path);
queue_holder->setFileFailed(reader.getFile(), e.message());
success_in_pulling = false;
}
if (success_in_pulling)
{
applyActionAfterProcessing(file_path);
queue_holder->setFileProcessed(file_path);
applyActionAfterProcessing(reader.getFile());
queue_holder->setFileProcessed(reader.getFile());
return chunk;
}
@@ -296,6 +275,7 @@ Chunk StorageS3QueueSource::generate()
if (!reader)
break;
/// Even if the task is finished, the thread may not be freed in the pool.
/// So wait until it is freed before scheduling a new task.
internal_source->create_reader_pool.wait();
@@ -320,12 +300,10 @@ void StorageS3QueueSource::applyActionAfterProcessing(const String & file_path)
void StorageS3QueueSource::deleteProcessedObject(const String & file_path)
{
LOG_WARNING(log, "Delete processed file {} from bucket {}", file_path, bucket);
S3::DeleteObjectRequest request;
/// todo(kssenii): looks incorrect
String delete_key = file_path.substr(bucket.length() + 1);
LOG_INFO(log, "Delete processed file {} from bucket {}", file_path, bucket);
request.WithKey(delete_key).WithBucket(bucket);
S3::DeleteObjectRequest request;
request.WithKey(file_path).WithBucket(bucket);
auto outcome = client->DeleteObject(request);
if (!outcome.IsSuccess())
{

View File

@@ -46,18 +46,15 @@ public:
const Block & virtual_header,
ContextPtr context,
UInt64 & max_poll_size_,
KeysWithInfo * read_keys_ = nullptr,
const S3Settings::RequestSettings & request_settings_ = {});
KeyWithInfo next() override;
size_t getTotalSize() const override;
Strings
filterProcessingFiles(const S3QueueMode & engine_mode, std::unordered_set<String> & exclude_keys, const String & max_file = "");
private:
UInt64 max_poll_size;
const String bucket;
KeysWithInfo keys_buf;
KeysWithInfo processing_keys;
mutable std::mutex mutex;
@@ -116,10 +113,6 @@ private:
std::future<ReaderHolder> reader_future;
UInt64 total_rows_approx_max = 0;
size_t total_rows_count_times = 0;
UInt64 total_rows_approx_accumulated = 0;
mutable std::mutex mutex;
std::shared_ptr<StorageS3Source> internal_source;

View File

@@ -24,7 +24,6 @@
# include <Storages/NamedCollectionsHelpers.h>
# include <Storages/PartitionedSink.h>
# include <Storages/ReadFromStorageProgress.h>
# include <Storages/S3Queue/S3QueueSource.h>
# include <Storages/S3Queue/S3QueueTableMetadata.h>
# include <Storages/S3Queue/StorageS3Queue.h>
@@ -107,44 +106,43 @@ StorageS3Queue::StorageS3Queue(
, log(&Poco::Logger::get("StorageS3Queue (" + table_id_.table_name + ")"))
{
if (!withGlobs())
{
throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "S3Queue engine can read only from url with globs");
}
String setting_zookeeper_path = s3queue_settings->keeper_path;
LOG_INFO(log, "Settings zookeeper_path={}", setting_zookeeper_path);
std::string setting_zookeeper_path = s3queue_settings->keeper_path;
std::string zk_path_prefix;
if (setting_zookeeper_path.empty())
{
auto table_id = getStorageID();
auto database = DatabaseCatalog::instance().getDatabase(table_id.database_name);
auto database = DatabaseCatalog::instance().getDatabase(table_id_.database_name);
bool is_in_replicated_database = database->getEngineName() == "Replicated";
auto default_path = getContext()->getSettingsRef().s3queue_default_zookeeper_path.value;
if (!default_path.empty())
{
zookeeper_path
= zkutil::extractZooKeeperPath(fs::path(default_path) / toString(table_id.uuid), /* check_starts_with_slash */ true, log);
zk_path_prefix = default_path;
}
else if (is_in_replicated_database)
{
LOG_INFO(log, "S3Queue engine keeper_path not specified. Use replicated database zookeeper path");
String base_zookeeper_path = assert_cast<const DatabaseReplicated *>(database.get())->getZooKeeperPath();
zookeeper_path = zkutil::extractZooKeeperPath(
fs::path(base_zookeeper_path) / "s3queue" / toString(table_id.uuid), /* check_starts_with_slash */ true, log);
LOG_INFO(log, "S3Queue engine zookeeper path is not specified. "
"Using replicated database zookeeper path");
zk_path_prefix = fs::path(assert_cast<const DatabaseReplicated *>(database.get())->getZooKeeperPath()) / "s3queue";
}
else
{
throw Exception(
ErrorCodes::NO_ZOOKEEPER,
"S3Queue keeper_path engine setting not specified, s3queue_default_zookeeper_path_prefix not specified and table not in "
"replicated database.");
throw Exception(ErrorCodes::NO_ZOOKEEPER,
"S3Queue keeper_path engine setting not specified, "
"s3queue_default_zookeeper_path_prefix not specified");
}
}
else
{
zookeeper_path = zkutil::extractZooKeeperPath(s3queue_settings->keeper_path, /* check_starts_with_slash */ true, log);
zk_path_prefix = s3queue_settings->keeper_path.value;
}
LOG_INFO(log, "Set zookeeper_path={}", zookeeper_path);
zookeeper_path = zkutil::extractZooKeeperPath(
fs::path(zk_path_prefix) / toString(table_id_.uuid), /* check_starts_with_slash */ true, log);
LOG_INFO(log, "Using zookeeper path: {}", zookeeper_path);
FormatFactory::instance().checkFormatName(format_name);
context_->getGlobalContext()->getRemoteHostFilter().checkURL(s3_configuration.url.uri);
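The restructured constructor boils down to one precedence rule for the ZooKeeper prefix, with the table UUID appended in every case. A sketch as a pure function (the function and parameter names are invented for illustration):

#include <filesystem>
#include <optional>
#include <stdexcept>
#include <string>

// Precedence: the explicit keeper_path setting, then the server-wide
// s3queue_default_zookeeper_path, then the Replicated database's own
// path plus "/s3queue"; otherwise it is an error.
std::string resolveKeeperPath(const std::string & keeper_path,
                              const std::string & default_path,
                              const std::optional<std::string> & replicated_db_path,
                              const std::string & table_uuid)
{
    std::filesystem::path prefix;
    if (!keeper_path.empty())
        prefix = keeper_path;
    else if (!default_path.empty())
        prefix = default_path;
    else if (replicated_db_path)
        prefix = std::filesystem::path(*replicated_db_path) / "s3queue";
    else
        throw std::runtime_error("keeper_path not specified and no usable default");
    return (prefix / table_uuid).string();
}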
@@ -550,8 +548,8 @@ bool StorageS3Queue::createTableIfNotExists(const StorageMetadataPtr & metadata_
{
String metadata_str = S3QueueTableMetadata(s3_configuration, *s3queue_settings).toString();
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path, "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/processed", "collection:\n", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/failed", "collection:\n", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/processed", "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/failed", "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/processing", "", zkutil::CreateMode::Ephemeral));
ops.emplace_back(zkutil::makeCreateRequest(
zookeeper_path + "/columns", metadata_snapshot->getColumns().toString(), zkutil::CreateMode::Persistent));
@@ -612,11 +610,8 @@ void StorageS3Queue::checkTableStructure(const String & zookeeper_prefix, const
std::shared_ptr<StorageS3QueueSource::IIterator>
StorageS3Queue::createFileIterator(ContextPtr local_context, ASTPtr query, KeysWithInfo * read_keys)
StorageS3Queue::createFileIterator(ContextPtr local_context, ASTPtr query)
{
/// Iterate through disclosed globs and make a source for each file
std::lock_guard lock{sync_mutex};
auto it = std::make_shared<StorageS3QueueSource::QueueGlobIterator>(
*s3_configuration.client,
s3_configuration.url,
@@ -624,26 +619,35 @@ StorageS3Queue::createFileIterator(ContextPtr local_context, ASTPtr query, KeysW
virtual_block,
local_context,
s3queue_settings->s3queue_polling_size.value,
read_keys,
s3_configuration.request_settings);
auto zookeeper_lock = queue_holder->acquireLock();
S3QueueHolder::S3FilesCollection exclude = queue_holder->getExcludedFiles();
auto lock = queue_holder->acquireLock();
S3QueueHolder::S3FilesCollection files_to_skip = queue_holder->getProcessedAndFailedFiles();
Strings processing_files;
Strings files_to_process;
if (mode == S3QueueMode::UNORDERED)
{
processing_files = it->filterProcessingFiles(mode, exclude);
files_to_process = it->filterProcessingFiles(mode, files_to_skip);
}
else
{
String max_processed_file = queue_holder->getMaxProcessedFile();
processing_files = it->filterProcessingFiles(mode, exclude, max_processed_file);
files_to_process = it->filterProcessingFiles(mode, files_to_skip, max_processed_file);
}
queue_holder->setFilesProcessing(processing_files);
LOG_TEST(log, "Found files to process: {}", fmt::join(files_to_process, ", "));
queue_holder->setFilesProcessing(files_to_process);
return it;
}
void StorageS3Queue::drop()
{
auto zk_client = getZooKeeper();
if (zk_client->exists(zookeeper_path))
zk_client->removeRecursive(zookeeper_path);
}
void registerStorageS3QueueImpl(const String & name, StorageFactory & factory)
{
factory.registerStorage(

View File

@@ -109,9 +109,9 @@ private:
std::atomic<bool> shutdown_called{false};
Poco::Logger * log;
void startup() override;
void shutdown() override;
void drop() override;
struct TaskContext
{
@@ -126,7 +126,6 @@ private:
zkutil::ZooKeeperPtr current_zookeeper;
mutable std::mutex current_zookeeper_mutex;
mutable std::mutex sync_mutex;
void setZooKeeper();
zkutil::ZooKeeperPtr tryGetZooKeeper() const;
@@ -140,7 +139,7 @@ private:
using KeysWithInfo = StorageS3QueueSource::KeysWithInfo;
std::shared_ptr<StorageS3QueueSource::IIterator>
createFileIterator(ContextPtr local_context, ASTPtr query, KeysWithInfo * read_keys = nullptr);
createFileIterator(ContextPtr local_context, ASTPtr query);
void streamToViews();
Configuration updateConfigurationAndGetCopy(ContextPtr local_context);

View File

@@ -596,7 +596,7 @@ StorageS3Source::ReaderHolder StorageS3Source::createReader()
auto pipeline = std::make_unique<QueryPipeline>(QueryPipelineBuilder::getPipeline(std::move(builder)));
auto current_reader = std::make_unique<PullingPipelineExecutor>(*pipeline);
return ReaderHolder{fs::path(bucket) / key_with_info.key, std::move(read_buf), std::move(input_format), std::move(pipeline), std::move(current_reader)};
return ReaderHolder{key_with_info.key, bucket, std::move(read_buf), std::move(input_format), std::move(pipeline), std::move(current_reader)};
}
std::future<StorageS3Source::ReaderHolder> StorageS3Source::createReaderAsync()

View File

@@ -159,12 +159,14 @@ private:
{
public:
ReaderHolder(
String path_,
String key_,
String bucket_,
std::unique_ptr<ReadBuffer> read_buf_,
std::shared_ptr<IInputFormat> input_format_,
std::unique_ptr<QueryPipeline> pipeline_,
std::unique_ptr<PullingPipelineExecutor> reader_)
: path(std::move(path_))
: key(std::move(key_))
, bucket(std::move(bucket_))
, read_buf(std::move(read_buf_))
, input_format(std::move(input_format_))
, pipeline(std::move(pipeline_))
@@ -189,19 +191,22 @@ private:
pipeline = std::move(other.pipeline);
input_format = std::move(other.input_format);
read_buf = std::move(other.read_buf);
path = std::move(other.path);
key = std::move(other.key);
bucket = std::move(other.bucket);
return *this;
}
explicit operator bool() const { return reader != nullptr; }
PullingPipelineExecutor * operator->() { return reader.get(); }
const PullingPipelineExecutor * operator->() const { return reader.get(); }
const String & getPath() const { return path; }
String getPath() const { return fs::path(bucket) / key; }
const String & getFile() const { return key; }
const IInputFormat * getInputFormat() const { return input_format.get(); }
private:
String path;
String key;
String bucket;
std::unique_ptr<ReadBuffer> read_buf;
std::shared_ptr<IInputFormat> input_format;
std::unique_ptr<QueryPipeline> pipeline;
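The reworked holder stores bucket and key separately and derives the full path on demand, so S3Queue callers can take the bare key via getFile() instead of the substring surgery the old deleteProcessedObject did. A sketch of just that part (illustration only, not the full ReaderHolder):

#include <filesystem>
#include <string>

class ReaderPathSketch
{
public:
    ReaderPathSketch(std::string key_, std::string bucket_)
        : key(std::move(key_)), bucket(std::move(bucket_)) {}

    // Full "bucket/key" path for logging and progress reporting.
    std::string getPath() const { return (std::filesystem::path(bucket) / key).string(); }

    // Bare object key, usable directly in e.g. a DeleteObjectRequest.
    const std::string & getFile() const { return key; }

private:
    std::string key;
    std::string bucket;
};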