S3Queue fixes

kssenii 2023-09-07 14:37:24 +02:00
parent e192d4c624
commit 7338b560a8
10 changed files with 619 additions and 1053 deletions


@ -1,18 +1,18 @@
#include "IO/VarInt.h"
#include "config.h"
#if USE_AWS_S3
# include <algorithm>
# include <IO/Operators.h>
# include <IO/ReadBufferFromString.h>
# include <IO/ReadHelpers.h>
# include <Storages/S3Queue/S3QueueFilesMetadata.h>
# include <Storages/S3Queue/StorageS3Queue.h>
# include <Storages/StorageS3Settings.h>
# include <Storages/StorageSnapshot.h>
# include <base/sleep.h>
# include <Common/ZooKeeper/ZooKeeper.h>
#include <base/sleep.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <IO/Operators.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <Storages/S3Queue/S3QueueFilesMetadata.h>
#include <Storages/S3Queue/StorageS3Queue.h>
#include <Storages/StorageS3Settings.h>
#include <Storages/StorageSnapshot.h>
#include <Poco/JSON/JSON.h>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Parser.h>
namespace DB
{
@ -30,151 +30,6 @@ namespace
}
}
void S3QueueFilesMetadata::S3QueueCollection::read(ReadBuffer & in)
{
files = {};
if (in.eof())
return;
size_t files_num;
in >> files_num >> "\n";
while (files_num--)
{
TrackedCollectionItem item;
in >> item.file_path >> "\n";
in >> item.timestamp >> "\n";
in >> item.retries_count >> "\n";
in >> item.last_exception >> "\n";
files.push_back(item);
}
}
void S3QueueFilesMetadata::S3QueueCollection::write(WriteBuffer & out) const
{
out << files.size() << "\n";
for (const auto & processed_file : files)
{
out << processed_file.file_path << "\n";
out << processed_file.timestamp << "\n";
out << processed_file.retries_count << "\n";
out << processed_file.last_exception << "\n";
}
}
String S3QueueFilesMetadata::S3QueueCollection::toString() const
{
WriteBufferFromOwnString out;
write(out);
return out.str();
}
S3QueueFilesMetadata::S3FilesCollection S3QueueFilesMetadata::S3QueueCollection::getFileNames()
{
S3FilesCollection keys = {};
for (const auto & pair : files)
keys.insert(pair.file_path);
return keys;
}
S3QueueFilesMetadata::S3QueueProcessedCollection::S3QueueProcessedCollection(const UInt64 & max_size_, const UInt64 & max_age_)
: max_size(max_size_), max_age(max_age_)
{
}
void S3QueueFilesMetadata::S3QueueProcessedCollection::parse(const String & collection_str)
{
ReadBufferFromString buf(collection_str);
read(buf);
if (max_age > 0) // Remove old items
{
std::erase_if(
files,
[timestamp = getCurrentTime(), this](const TrackedCollectionItem & processed_file)
{ return (timestamp - processed_file.timestamp) > max_age; });
}
}
void S3QueueFilesMetadata::S3QueueProcessedCollection::add(const String & file_name)
{
TrackedCollectionItem processed_file;
processed_file.file_path = file_name;
processed_file.timestamp = getCurrentTime();
files.push_back(processed_file);
/// TODO: it is strange that in parse() we take into account only max_age, but here only max_size.
while (files.size() > max_size)
{
files.pop_front();
}
}
S3QueueFilesMetadata::S3QueueFailedCollection::S3QueueFailedCollection(const UInt64 & max_retries_count_)
: max_retries_count(max_retries_count_)
{
}
void S3QueueFilesMetadata::S3QueueFailedCollection::parse(const String & collection_str)
{
ReadBufferFromString buf(collection_str);
read(buf);
}
bool S3QueueFilesMetadata::S3QueueFailedCollection::add(const String & file_name, const String & exception_message)
{
auto failed_it = std::find_if(
files.begin(), files.end(),
[&file_name](const TrackedCollectionItem & s) { return s.file_path == file_name; });
if (failed_it == files.end())
{
files.emplace_back(file_name, 0, max_retries_count, exception_message);
}
else if (failed_it->retries_count == 0 || --failed_it->retries_count == 0)
{
return false;
}
return true;
}
S3QueueFilesMetadata::S3FilesCollection S3QueueFilesMetadata::S3QueueFailedCollection::getFileNames()
{
S3FilesCollection failed_keys;
for (const auto & pair : files)
{
if (pair.retries_count == 0)
failed_keys.insert(pair.file_path);
}
return failed_keys;
}
void S3QueueFilesMetadata::S3QueueProcessingCollection::parse(const String & collection_str)
{
ReadBufferFromString rb(collection_str);
Strings result;
readQuoted(result, rb);
files = S3FilesCollection(result.begin(), result.end());
}
void S3QueueFilesMetadata::S3QueueProcessingCollection::add(const Strings & file_names)
{
files.insert(file_names.begin(), file_names.end());
}
void S3QueueFilesMetadata::S3QueueProcessingCollection::remove(const String & file_name)
{
files.erase(file_name);
}
String S3QueueFilesMetadata::S3QueueProcessingCollection::toString() const
{
return DB::toString(Strings(files.begin(), files.end()));
}
S3QueueFilesMetadata::S3QueueFilesMetadata(
const StorageS3Queue * storage_,
const S3QueueSettings & settings_)
@ -183,171 +38,273 @@ S3QueueFilesMetadata::S3QueueFilesMetadata(
, max_set_size(settings_.s3queue_tracked_files_limit.value)
, max_set_age_sec(settings_.s3queue_tracked_file_ttl_sec.value)
, max_loading_retries(settings_.s3queue_loading_retries.value)
, zookeeper_processing_path(fs::path(storage->getZooKeeperPath()) / "processing")
, zookeeper_processed_path(fs::path(storage->getZooKeeperPath()) / "processed")
, zookeeper_failed_path(fs::path(storage->getZooKeeperPath()) / "failed")
, zookeeper_lock_path(fs::path(storage->getZooKeeperPath()) / "lock")
, zookeeper_processing_path(storage->getZooKeeperPath() / "processing")
, zookeeper_processed_path(storage->getZooKeeperPath() / "processed")
, zookeeper_failed_path(storage->getZooKeeperPath() / "failed")
, log(&Poco::Logger::get("S3QueueFilesMetadata"))
{
}
void S3QueueFilesMetadata::setFileProcessed(const String & file_path)
std::string S3QueueFilesMetadata::NodeMetadata::toString() const
{
auto zookeeper = storage->getZooKeeper();
auto lock = acquireLock(zookeeper);
Poco::JSON::Object json;
json.set("file_path", file_path);
json.set("last_processed_timestamp", getCurrentTime());
json.set("last_exception", last_exception);
json.set("retries", retries);
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
oss.exceptions(std::ios::failbit);
Poco::JSON::Stringifier::stringify(json, oss);
return oss.str();
}
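/// Illustrative example of the JSON produced above (hypothetical values):
///   {"file_path":"data/file1.csv","last_processed_timestamp":1694090244,"last_exception":"","retries":0}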
S3QueueFilesMetadata::NodeMetadata S3QueueFilesMetadata::NodeMetadata::fromString(const std::string & metadata_str)
{
Poco::JSON::Parser parser;
auto json = parser.parse(metadata_str).extract<Poco::JSON::Object::Ptr>();
NodeMetadata metadata;
metadata.file_path = json->getValue<String>("file_path");
metadata.last_processed_timestamp = json->getValue<UInt64>("last_processed_timestamp");
metadata.last_exception = json->getValue<String>("last_exception");
metadata.retries = json->getValue<UInt64>("retries");
return metadata;
}
std::string S3QueueFilesMetadata::getNodeName(const std::string & path)
{
SipHash path_hash;
path_hash.update(path);
return toString(path_hash.get64());
}
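/// Note: the node name is the decimal form of the 64-bit SipHash of the file path, so every file
/// maps to a stable child node name under the processed/, failed/ and processing/ paths.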
S3QueueFilesMetadata::NodeMetadata S3QueueFilesMetadata::createNodeMetadata(
const std::string & path,
const std::string & exception,
size_t retries)
{
NodeMetadata metadata;
metadata.file_path = path;
metadata.last_processed_timestamp = getCurrentTime();
metadata.last_exception = exception;
metadata.retries = retries;
return metadata;
}
bool S3QueueFilesMetadata::trySetFileAsProcessing(const std::string & path)
{
switch (mode)
{
case S3QueueMode::UNORDERED:
{
S3QueueProcessedCollection processed_files(max_set_size, max_set_age_sec);
processed_files.parse(zookeeper->get(zookeeper_processed_path));
processed_files.add(file_path);
zookeeper->set(zookeeper_processed_path, processed_files.toString());
break;
}
case S3QueueMode::ORDERED:
{
// Make sure the ZooKeeper node only ever stores the maximum processed file path.
// This check is useful when multiple table engines consume in ordered mode.
String max_file = getMaxProcessedFile();
if (max_file.compare(file_path) <= 0)
zookeeper->set(zookeeper_processed_path, file_path);
break;
return trySetFileAsProcessingForOrderedMode(path);
}
}
removeProcessingFile(file_path);
}
bool S3QueueFilesMetadata::setFileFailed(const String & file_path, const String & exception_message)
{
auto zookeeper = storage->getZooKeeper();
auto lock = acquireLock(zookeeper);
S3QueueFailedCollection failed_collection(max_loading_retries);
failed_collection.parse(zookeeper->get(zookeeper_failed_path));
const bool can_be_retried = failed_collection.add(file_path, exception_message);
zookeeper->set(zookeeper_failed_path, failed_collection.toString());
removeProcessingFile(file_path);
return can_be_retried;
}
S3QueueFilesMetadata::S3FilesCollection S3QueueFilesMetadata::getFailedFiles()
{
auto zookeeper = storage->getZooKeeper();
String failed_files = zookeeper->get(zookeeper_failed_path);
S3QueueFailedCollection failed_collection(max_loading_retries);
failed_collection.parse(failed_files);
return failed_collection.getFileNames();
}
String S3QueueFilesMetadata::getMaxProcessedFile()
{
auto zookeeper = storage->getZooKeeper();
return zookeeper->get(zookeeper_processed_path);
}
S3QueueFilesMetadata::S3FilesCollection S3QueueFilesMetadata::getProcessingFiles()
{
auto zookeeper = storage->getZooKeeper();
String processing_files;
if (!zookeeper->tryGet(zookeeper_processing_path, processing_files))
return {};
S3QueueProcessingCollection processing_collection;
if (!processing_files.empty())
processing_collection.parse(processing_files);
return processing_collection.getFileNames();
}
void S3QueueFilesMetadata::setFilesProcessing(const Strings & file_paths)
{
auto zookeeper = storage->getZooKeeper();
String processing_files;
zookeeper->tryGet(zookeeper_processing_path, processing_files);
S3QueueProcessingCollection processing_collection;
if (!processing_files.empty())
processing_collection.parse(processing_files);
processing_collection.add(file_paths);
if (zookeeper->exists(zookeeper_processing_path))
zookeeper->set(zookeeper_processing_path, processing_collection.toString());
else
zookeeper->create(zookeeper_processing_path, processing_collection.toString(), zkutil::CreateMode::Ephemeral);
}
void S3QueueFilesMetadata::removeProcessingFile(const String & file_path)
{
auto zookeeper = storage->getZooKeeper();
String processing_files;
zookeeper->tryGet(zookeeper_processing_path, processing_files);
S3QueueProcessingCollection processing_collection;
processing_collection.parse(processing_files);
processing_collection.remove(file_path);
zookeeper->set(zookeeper_processing_path, processing_collection.toString());
}
S3QueueFilesMetadata::S3FilesCollection S3QueueFilesMetadata::getUnorderedProcessedFiles()
{
auto zookeeper = storage->getZooKeeper();
S3QueueProcessedCollection processed_collection(max_set_size, max_set_age_sec);
processed_collection.parse(zookeeper->get(zookeeper_processed_path));
return processed_collection.getFileNames();
}
S3QueueFilesMetadata::S3FilesCollection S3QueueFilesMetadata::getProcessedFailedAndProcessingFiles()
{
S3FilesCollection processed_and_failed_files = getFailedFiles();
switch (mode)
{
case S3QueueMode::UNORDERED:
{
processed_and_failed_files.merge(getUnorderedProcessedFiles());
break;
}
case S3QueueMode::ORDERED:
{
processed_and_failed_files.insert(getMaxProcessedFile());
break;
return trySetFileAsProcessingForUnorderedMode(path);
}
}
processed_and_failed_files.merge(getProcessingFiles());
return processed_and_failed_files;
}
std::shared_ptr<zkutil::EphemeralNodeHolder> S3QueueFilesMetadata::acquireLock(zkutil::ZooKeeperPtr zookeeper)
bool S3QueueFilesMetadata::trySetFileAsProcessingForUnorderedMode(const std::string & path)
{
UInt32 retry_count = 200;
UInt32 sleep_ms = 100;
UInt32 retries = 0;
const auto node_name = getNodeName(path);
const auto node_metadata = createNodeMetadata(path).toString();
const auto zk_client = storage->getZooKeeper();
/// The following requests do the following:
/// If !exists(processed_node) && !exists(failed_node) && !exists(processing_node) => create(processing_node)
Coordination::Requests requests;
/// Check that processed node does not appear.
requests.push_back(zkutil::makeCreateRequest(zookeeper_processed_path / node_name, "", zkutil::CreateMode::Persistent));
requests.push_back(zkutil::makeRemoveRequest(zookeeper_processed_path / node_name, -1));
/// Check that failed node does not appear.
requests.push_back(zkutil::makeCreateRequest(zookeeper_failed_path / node_name, "", zkutil::CreateMode::Persistent));
requests.push_back(zkutil::makeRemoveRequest(zookeeper_failed_path / node_name, -1));
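/// (A create immediately followed by a remove succeeds only if the node did not already exist,
/// so the multi-request below atomically verifies that the file is neither processed nor failed
/// before the processing node is created.)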
/// Check that processing node does not exist and create if not.
requests.push_back(zkutil::makeCreateRequest(zookeeper_processing_path / node_name, node_metadata, zkutil::CreateMode::Ephemeral));
Coordination::Responses responses;
auto code = zk_client->tryMulti(requests, responses);
return code == Coordination::Error::ZOK;
}
bool S3QueueFilesMetadata::trySetFileAsProcessingForOrderedMode(const std::string & path)
{
const auto node_name = getNodeName(path);
const auto node_metadata = createNodeMetadata(path).toString();
const auto zk_client = storage->getZooKeeper();
while (true)
{
Coordination::Error code = zookeeper->tryCreate(zookeeper_lock_path, "", zkutil::CreateMode::Ephemeral);
if (code == Coordination::Error::ZNONODE || code == Coordination::Error::ZNODEEXISTS)
Coordination::Requests requests;
zkutil::addCheckNotExistsRequest(requests, zk_client, zookeeper_failed_path / node_name);
zkutil::addCheckNotExistsRequest(requests, zk_client, zookeeper_processing_path / node_name);
requests.push_back(zkutil::makeGetRequest(zookeeper_processed_path));
Coordination::Responses responses;
auto code = zk_client->tryMulti(requests, responses);
if (code != Coordination::Error::ZOK)
{
retries++;
if (retries > retry_count)
if (responses[0]->error != Coordination::Error::ZOK
|| responses[1]->error != Coordination::Error::ZOK)
{
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Can't acquire zookeeper lock");
/// Path is already in Failed or Processing.
return false;
}
sleepForMilliseconds(sleep_ms);
/// GetRequest for zookeeper_processed_path should never fail,
/// because it is a persistent node created when the S3Queue storage is created.
throw zkutil::KeeperException::fromPath(code, requests.back()->getPath());
}
else if (code != Coordination::Error::ZOK)
Coordination::Stat processed_node_stat;
NodeMetadata processed_node_metadata;
if (const auto * get_response = dynamic_cast<const Coordination::GetResponse *>(responses.back().get()))
{
throw Coordination::Exception::fromPath(code, zookeeper_lock_path);
processed_node_stat = get_response->stat;
if (!get_response->data.empty())
processed_node_metadata = NodeMetadata::fromString(get_response->data);
}
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected response type with error: {}", responses.back()->error);
auto max_processed_file_path = processed_node_metadata.file_path;
if (!max_processed_file_path.empty() && path <= max_processed_file_path)
return false;
requests.clear();
zkutil::addCheckNotExistsRequest(requests, *zk_client, zookeeper_failed_path / node_name);
requests.push_back(zkutil::makeCreateRequest(zookeeper_processing_path / node_name, node_metadata, zkutil::CreateMode::Ephemeral));
requests.push_back(zkutil::makeCheckRequest(zookeeper_processed_path, processed_node_stat.version));
code = zk_client->tryMulti(requests, responses);
if (code == Coordination::Error::ZOK)
return true;
if (responses[0]->error != Coordination::Error::ZOK
|| responses[1]->error != Coordination::Error::ZOK)
{
/// Path is already in Failed or Processing.
return false;
}
/// Max processed path changed. Retry.
}
}
void S3QueueFilesMetadata::setFileProcessed(const String & path)
{
switch (mode)
{
case S3QueueMode::ORDERED:
{
return setFileProcessedForOrderedMode(path);
}
case S3QueueMode::UNORDERED:
{
return setFileProcessedForUnorderedMode(path);
}
}
}
void S3QueueFilesMetadata::setFileProcessedForUnorderedMode(const String & path)
{
/// List results in S3 are always returned in UTF-8 binary order.
/// (https://docs.aws.amazon.com/AmazonS3/latest/userguide/ListingKeysUsingAPIs.html)
const auto node_name = getNodeName(path);
const auto node_metadata = createNodeMetadata(path).toString();
const auto zk_client = storage->getZooKeeper();
Coordination::Requests requests;
requests.push_back(zkutil::makeRemoveRequest(zookeeper_processing_path / node_name, -1));
requests.push_back(zkutil::makeCreateRequest(zookeeper_processed_path / node_name, node_metadata, zkutil::CreateMode::Persistent));
Coordination::Responses responses;
auto code = zk_client->tryMulti(requests, responses);
if (code == Coordination::Error::ZOK)
return;
/// TODO: this could be because of an expired session.
if (responses[0]->error != Coordination::Error::ZOK)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attemp to set file as processed but it is not processing");
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attemp to set file as processed but it is already processed");
}
void S3QueueFilesMetadata::setFileProcessedForOrderedMode(const String & path)
{
const auto node_name = getNodeName(path);
const auto node_metadata = createNodeMetadata(path).toString();
const auto zk_client = storage->getZooKeeper();
while (true)
{
std::string res;
Coordination::Stat stat;
bool exists = zk_client->tryGet(zookeeper_processed_path, res, &stat);
Coordination::Requests requests;
if (exists)
{
if (!res.empty())
{
auto metadata = NodeMetadata::fromString(res);
if (metadata.file_path >= path)
return;
}
requests.push_back(zkutil::makeSetRequest(zookeeper_processed_path, node_metadata, stat.version));
}
else
{
return zkutil::EphemeralNodeHolder::existing(zookeeper_lock_path, *zookeeper);
requests.push_back(zkutil::makeCreateRequest(zookeeper_processed_path, node_metadata, zkutil::CreateMode::Persistent));
}
Coordination::Responses responses;
auto code = zk_client->tryMulti(requests, responses);
if (code == Coordination::Error::ZOK)
return;
}
}
void S3QueueFilesMetadata::setFileFailed(const String & path, const String & exception_message)
{
const auto node_name = getNodeName(path);
auto node_metadata = createNodeMetadata(path, exception_message);
const auto zk_client = storage->getZooKeeper();
Coordination::Requests requests;
requests.push_back(zkutil::makeRemoveRequest(zookeeper_processing_path / node_name, -1));
requests.push_back(zkutil::makeCreateRequest(zookeeper_failed_path / node_name, node_metadata.toString(), zkutil::CreateMode::Persistent));
Coordination::Responses responses;
auto code = zk_client->tryMulti(requests, responses);
if (code == Coordination::Error::ZOK)
return;
if (responses[0]->error != Coordination::Error::ZOK)
{
/// TODO: this could be because of an expired session.
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to set file as failed but it is not processing");
}
Coordination::Stat stat;
auto failed_node_metadata = NodeMetadata::fromString(zk_client->get(zookeeper_failed_path / node_name, &stat));
node_metadata.retries = failed_node_metadata.retries + 1;
/// Failed node already exists, update it.
requests.clear();
requests.push_back(zkutil::makeRemoveRequest(zookeeper_processing_path / node_name, -1));
requests.push_back(zkutil::makeSetRequest(zookeeper_failed_path / node_name, node_metadata.toString(), stat.version));
responses.clear();
code = zk_client->tryMulti(requests, responses);
if (code == Coordination::Error::ZOK)
return;
throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to set file as failed");
}
}
#endif


@ -1,102 +1,29 @@
#pragma once
#include "config.h"
#if USE_AWS_S3
#include <filesystem>
#include <Core/Types.h>
#include <Core/SettingsEnums.h>
# include <Core/UUID.h>
# include <Interpreters/Context.h>
# include <Storages/StorageS3Settings.h>
# include <Common/ZooKeeper/ZooKeeper.h>
namespace fs = std::filesystem;
namespace Poco { class Logger; }
namespace DB
{
class StorageS3Queue;
struct S3QueueSettings;
class StorageS3Queue;
class S3QueueFilesMetadata
{
public:
struct TrackedCollectionItem
{
TrackedCollectionItem() = default;
TrackedCollectionItem(const String & file_path_, UInt64 timestamp_, UInt64 retries_count_, const String & last_exception_)
: file_path(file_path_), timestamp(timestamp_), retries_count(retries_count_), last_exception(last_exception_) {}
String file_path;
UInt64 timestamp = 0;
UInt64 retries_count = 0;
String last_exception;
};
using S3FilesCollection = std::unordered_set<String>;
using TrackedFiles = std::deque<TrackedCollectionItem>;
S3QueueFilesMetadata(const StorageS3Queue * storage_, const S3QueueSettings & settings_);
void setFilesProcessing(const Strings & file_paths);
void setFileProcessed(const String & file_path);
bool setFileFailed(const String & file_path, const String & exception_message);
bool trySetFileAsProcessing(const std::string & path);
S3FilesCollection getProcessedFailedAndProcessingFiles();
String getMaxProcessedFile();
std::shared_ptr<zkutil::EphemeralNodeHolder> acquireLock(zkutil::ZooKeeperPtr zookeeper);
void setFileProcessed(const std::string & path);
struct S3QueueCollection
{
public:
virtual ~S3QueueCollection() = default;
virtual String toString() const;
S3FilesCollection getFileNames();
virtual void parse(const String & collection_str) = 0;
protected:
TrackedFiles files;
void read(ReadBuffer & in);
void write(WriteBuffer & out) const;
};
struct S3QueueProcessedCollection : public S3QueueCollection
{
public:
S3QueueProcessedCollection(const UInt64 & max_size_, const UInt64 & max_age_);
void parse(const String & collection_str) override;
void add(const String & file_name);
private:
const UInt64 max_size;
const UInt64 max_age;
};
struct S3QueueFailedCollection : S3QueueCollection
{
public:
S3QueueFailedCollection(const UInt64 & max_retries_count_);
void parse(const String & collection_str) override;
bool add(const String & file_name, const String & exception_message);
S3FilesCollection getFileNames();
private:
UInt64 max_retries_count;
};
struct S3QueueProcessingCollection
{
public:
S3QueueProcessingCollection() = default;
void parse(const String & collection_str);
void add(const Strings & file_names);
void remove(const String & file_name);
String toString() const;
const S3FilesCollection & getFileNames() const { return files; }
private:
S3FilesCollection files;
};
void setFileFailed(const std::string & path, const std::string & exception_message);
private:
const StorageS3Queue * storage;
@ -105,23 +32,35 @@ private:
const UInt64 max_set_age_sec;
const UInt64 max_loading_retries;
const String zookeeper_processing_path;
const String zookeeper_processed_path;
const String zookeeper_failed_path;
const String zookeeper_lock_path;
const fs::path zookeeper_processing_path;
const fs::path zookeeper_processed_path;
const fs::path zookeeper_failed_path;
mutable std::mutex mutex;
Poco::Logger * log;
S3FilesCollection getFailedFiles();
S3FilesCollection getProcessingFiles();
S3FilesCollection getUnorderedProcessedFiles();
bool trySetFileAsProcessingForOrderedMode(const std::string & path);
bool trySetFileAsProcessingForUnorderedMode(const std::string & path);
void removeProcessingFile(const String & file_path);
void setFileProcessedForOrderedMode(const std::string & path);
void setFileProcessedForUnorderedMode(const std::string & path);
std::string getNodeName(const std::string & path);
struct NodeMetadata
{
std::string file_path;
UInt64 last_processed_timestamp = 0;
std::string last_exception;
UInt64 retries = 0;
std::string toString() const;
static NodeMetadata fromString(const std::string & metadata_str);
};
NodeMetadata createNodeMetadata(const std::string & path, const std::string & exception = "", size_t retries = 0);
};
}
#endif


@ -1,8 +1,8 @@
#include <Storages/S3Queue/S3QueueSettings.h>
#include <Common/Exception.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTSetQuery.h>
#include <Storages/S3Queue/S3QueueSettings.h>
#include <Common/Exception.h>
namespace DB


@ -1,47 +1,12 @@
#include <algorithm>
#include <Common/ProfileEvents.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include "IO/ParallelReadBuffer.h"
#include "Parsers/ASTCreateQuery.h"
#include "config.h"
#if USE_AWS_S3
# include <Common/isValidUTF8.h>
# include <Functions/FunctionsConversion.h>
# include <IO/S3/Requests.h>
# include <IO/S3Common.h>
# include <Interpreters/TreeRewriter.h>
# include <Parsers/ASTFunction.h>
# include <Parsers/ASTInsertQuery.h>
# include <Storages/NamedCollectionsHelpers.h>
# include <Storages/PartitionedSink.h>
# include <Storages/S3Queue/S3QueueSource.h>
# include <Storages/StorageS3.h>
# include <Storages/StorageS3Settings.h>
# include <Storages/VirtualColumnUtils.h>
# include <Formats/FormatFactory.h>
# include <Processors/Formats/IInputFormat.h>
# include <Processors/Formats/IOutputFormat.h>
# include <Processors/Transforms/AddingDefaultsTransform.h>
# include <QueryPipeline/QueryPipelineBuilder.h>
# include <DataTypes/DataTypeString.h>
# include <Common/CurrentMetrics.h>
# include <Common/NamedCollections/NamedCollections.h>
# include <Common/parseGlobs.h>
# include <Processors/ISource.h>
# include <Processors/Sinks/SinkToStorage.h>
#include <Common/ProfileEvents.h>
#include <Common/CurrentMetrics.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/logger_useful.h>
#include <Storages/S3Queue/S3QueueSource.h>
#include <Storages/VirtualColumnUtils.h>
namespace CurrentMetrics
@ -64,138 +29,43 @@ namespace ErrorCodes
extern const int S3_ERROR;
}
StorageS3QueueSource::QueueGlobIterator::QueueGlobIterator(
const S3::Client & client_,
const S3::URI & globbed_uri_,
ASTPtr query,
const NamesAndTypesList & virtual_columns,
ContextPtr context,
UInt64 & max_poll_size_,
const S3Settings::RequestSettings & request_settings_)
: max_poll_size(max_poll_size_)
, glob_iterator(std::make_unique<StorageS3QueueSource::DisclosedGlobIterator>(
client_, globbed_uri_, query, virtual_columns, context, nullptr, request_settings_))
StorageS3QueueSource::FileIterator::FileIterator(
std::shared_ptr<S3QueueFilesMetadata> metadata_, std::unique_ptr<GlobIterator> glob_iterator_)
: metadata(metadata_) , glob_iterator(std::move(glob_iterator_))
{
}
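/// Pull keys from the underlying glob iterator until one can be claimed for processing via
/// S3QueueFilesMetadata::trySetFileAsProcessing(); an empty key from the glob iterator means
/// there is nothing left, and an empty KeyWithInfo is returned.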
StorageS3QueueSource::KeyWithInfo StorageS3QueueSource::FileIterator::next()
{
/// todo(kssenii): remove this loop, it should not be here
while (true)
{
KeyWithInfo val = glob_iterator->next();
if (val.key.empty())
break;
keys_buf.push_back(val);
return {};
if (metadata->trySetFileAsProcessing(val.key))
return val;
}
}
Strings StorageS3QueueSource::QueueGlobIterator::filterProcessingFiles(
const S3QueueMode & engine_mode, std::unordered_set<String> & exclude_keys, const String & max_file)
{
for (const KeyWithInfo & val : keys_buf)
{
auto full_path = val.key;
if (exclude_keys.find(full_path) != exclude_keys.end())
{
LOG_TEST(log, "File {} will be skipped, because it was found in exclude files list "
"(either already processed or failed to be processed)", val.key);
continue;
}
if ((engine_mode == S3QueueMode::ORDERED) && (full_path.compare(max_file) <= 0))
continue;
if ((processing_keys.size() < max_poll_size) || (engine_mode == S3QueueMode::ORDERED))
{
processing_keys.push_back(val);
}
else
{
break;
}
}
if (engine_mode == S3QueueMode::ORDERED)
{
std::sort(
processing_keys.begin(),
processing_keys.end(),
[](const KeyWithInfo & lhs, const KeyWithInfo & rhs) { return lhs.key.compare(rhs.key) < 0; });
if (processing_keys.size() > max_poll_size)
{
processing_keys.erase(processing_keys.begin() + max_poll_size, processing_keys.end());
}
}
Strings keys;
for (const auto & key_info : processing_keys)
keys.push_back(key_info.key);
processing_keys.push_back(KeyWithInfo());
processing_iterator = processing_keys.begin();
return keys;
}
StorageS3QueueSource::KeyWithInfo StorageS3QueueSource::QueueGlobIterator::next()
{
std::lock_guard lock(mutex);
if (processing_iterator != processing_keys.end())
{
return *processing_iterator++;
}
return KeyWithInfo();
}
StorageS3QueueSource::StorageS3QueueSource(
const ReadFromFormatInfo & info,
const String & format_,
String name_,
ContextPtr context_,
std::optional<FormatSettings> format_settings_,
UInt64 max_block_size_,
const S3Settings::RequestSettings & request_settings_,
String compression_hint_,
const std::shared_ptr<const S3::Client> & client_,
const String & bucket_,
const String & version_id_,
const String & url_host_and_port,
std::shared_ptr<IIterator> file_iterator_,
const Block & header_,
std::unique_ptr<StorageS3Source> internal_source_,
std::shared_ptr<S3QueueFilesMetadata> files_metadata_,
const S3QueueAction & action_,
const size_t download_thread_num_)
: ISource(info.source_header)
RemoveFileFunc remove_file_func_,
const NamesAndTypesList & requested_virtual_columns_,
ContextPtr context_)
: ISource(header_)
, WithContext(context_)
, name(std::move(name_))
, bucket(bucket_)
, version_id(version_id_)
, format(format_)
, columns_desc(info.columns_description)
, request_settings(request_settings_)
, client(client_)
, files_metadata(files_metadata_)
, requested_virtual_columns(info.requested_virtual_columns)
, requested_columns(info.requested_columns)
, file_iterator(file_iterator_)
, action(action_)
, files_metadata(files_metadata_)
, internal_source(std::move(internal_source_))
, requested_virtual_columns(requested_virtual_columns_)
, remove_file_func(remove_file_func_)
, log(&Poco::Logger::get("StorageS3QueueSource"))
{
internal_source = std::make_shared<StorageS3Source>(
info,
format_,
name_,
context_,
format_settings_,
max_block_size_,
request_settings_,
compression_hint_,
client_,
bucket_,
version_id_,
url_host_and_port,
file_iterator,
download_thread_num_,
false,
/* query_info */ std::nullopt);
reader = std::move(internal_source->reader);
if (reader)
reader_future = std::move(internal_source->reader_future);
@ -213,7 +83,6 @@ String StorageS3QueueSource::getName() const
Chunk StorageS3QueueSource::generate()
{
auto file_progress = getContext()->getFileProgressCallback();
while (true)
{
if (isCancelled() || !reader)
@ -223,46 +92,27 @@ Chunk StorageS3QueueSource::generate()
break;
}
Chunk chunk;
bool success_in_pulling = false;
try
{
Chunk chunk;
if (reader->pull(chunk))
{
UInt64 num_rows = chunk.getNumRows();
auto file_path = reader.getPath();
for (const auto & virtual_column : requested_virtual_columns)
{
if (virtual_column.name == "_path")
{
chunk.addColumn(virtual_column.type->createColumnConst(num_rows, file_path)->convertToFullColumnIfConst());
}
else if (virtual_column.name == "_file")
{
size_t last_slash_pos = file_path.find_last_of('/');
auto column = virtual_column.type->createColumnConst(num_rows, file_path.substr(last_slash_pos + 1));
chunk.addColumn(column->convertToFullColumnIfConst());
}
}
success_in_pulling = true;
LOG_TEST(log, "Read {} rows from file: {}", chunk.getNumRows(), reader.getPath());
VirtualColumnUtils::addRequestedPathAndFileVirtualsToChunk(chunk, requested_virtual_columns, reader.getPath());
return chunk;
}
}
catch (const Exception & e)
{
LOG_ERROR(log, "Exception in chunk pulling: {} ", e.displayText());
files_metadata->setFileFailed(reader.getFile(), e.message());
success_in_pulling = false;
}
if (success_in_pulling)
{
applyActionAfterProcessing(reader.getFile());
files_metadata->setFileProcessed(reader.getFile());
return chunk;
throw;
}
files_metadata->setFileProcessed(reader.getFile());
applyActionAfterProcessing(reader.getFile());
assert(reader_future.valid());
chassert(reader_future.valid());
reader = reader_future.get();
if (!reader)
@ -277,37 +127,21 @@ Chunk StorageS3QueueSource::generate()
return {};
}
void StorageS3QueueSource::applyActionAfterProcessing(const String & file_path)
void StorageS3QueueSource::applyActionAfterProcessing(const String & path)
{
switch (action)
{
case S3QueueAction::DELETE:
deleteProcessedObject(file_path);
{
assert(remove_file_func);
remove_file_func(path);
break;
}
case S3QueueAction::KEEP:
break;
}
}
void StorageS3QueueSource::deleteProcessedObject(const String & file_path)
{
LOG_INFO(log, "Delete processed file {} from bucket {}", file_path, bucket);
S3::DeleteObjectRequest request;
request.WithKey(file_path).WithBucket(bucket);
auto outcome = client->DeleteObject(request);
if (!outcome.IsSuccess())
{
const auto & err = outcome.GetError();
LOG_ERROR(log, "{} (Code: {})", err.GetMessage(), static_cast<size_t>(err.GetErrorType()));
}
else
{
LOG_TRACE(log, "Object with path {} was removed from S3", file_path);
}
}
}
#endif

View File

@ -2,29 +2,13 @@
#include "config.h"
#if USE_AWS_S3
#include <Processors/ISource.h>
#include <Storages/S3Queue/S3QueueFilesMetadata.h>
#include <Storages/StorageS3.h>
#include <Common/ZooKeeper/ZooKeeper.h>
# include <Core/Types.h>
# include <Compression/CompressionInfo.h>
# include <Storages/IStorage.h>
# include <Storages/S3Queue/S3QueueFilesMetadata.h>
# include <Storages/StorageS3.h>
# include <Storages/StorageS3Settings.h>
# include <Storages/prepareReadingFromFormat.h>
# include <IO/CompressionMethod.h>
# include <IO/S3/getObjectInfo.h>
# include <Interpreters/Context.h>
# include <Interpreters/threadPoolCallbackRunner.h>
# include <Processors/Executors/PullingPipelineExecutor.h>
# include <Processors/ISource.h>
# include <Storages/Cache/SchemaCache.h>
# include <Storages/StorageConfiguration.h>
# include <Poco/URI.h>
# include <Common/ZooKeeper/ZooKeeper.h>
# include <Common/logger_useful.h>
namespace Poco { class Logger; }
namespace DB
{
@ -34,56 +18,37 @@ class StorageS3QueueSource : public ISource, WithContext
{
public:
using IIterator = StorageS3Source::IIterator;
using DisclosedGlobIterator = StorageS3Source::DisclosedGlobIterator;
using KeysWithInfo = StorageS3Source::KeysWithInfo;
using GlobIterator = StorageS3Source::DisclosedGlobIterator;
using KeyWithInfo = StorageS3Source::KeyWithInfo;
class QueueGlobIterator : public IIterator
using ZooKeeperGetter = std::function<zkutil::ZooKeeperPtr()>;
using RemoveFileFunc = std::function<void(std::string)>;
class FileIterator : public IIterator
{
public:
QueueGlobIterator(
const S3::Client & client_,
const S3::URI & globbed_uri_,
ASTPtr query,
const NamesAndTypesList & virtual_columns,
ContextPtr context,
UInt64 & max_poll_size_,
const S3Settings::RequestSettings & request_settings_ = {});
FileIterator(
std::shared_ptr<S3QueueFilesMetadata> metadata_,
std::unique_ptr<GlobIterator> glob_iterator_);
KeyWithInfo next() override;
Strings
filterProcessingFiles(const S3QueueMode & engine_mode, std::unordered_set<String> & exclude_keys, const String & max_file = "");
private:
UInt64 max_poll_size;
KeysWithInfo keys_buf;
KeysWithInfo processing_keys;
mutable std::mutex mutex;
std::unique_ptr<DisclosedGlobIterator> glob_iterator;
std::vector<KeyWithInfo>::iterator processing_iterator;
Poco::Logger * log = &Poco::Logger::get("StorageS3QueueSourceIterator");
const std::shared_ptr<S3QueueFilesMetadata> metadata;
const std::unique_ptr<GlobIterator> glob_iterator;
std::mutex mutex;
};
static Block getHeader(Block sample_block, const std::vector<NameAndTypePair> & requested_virtual_columns);
StorageS3QueueSource(
const ReadFromFormatInfo & info,
const String & format,
String name_,
ContextPtr context_,
std::optional<FormatSettings> format_settings_,
UInt64 max_block_size_,
const S3Settings::RequestSettings & request_settings_,
String compression_hint_,
const std::shared_ptr<const S3::Client> & client_,
const String & bucket,
const String & version_id,
const String & url_host_and_port,
std::shared_ptr<IIterator> file_iterator_,
const Block & header_,
std::unique_ptr<StorageS3Source> internal_source_,
std::shared_ptr<S3QueueFilesMetadata> files_metadata_,
const S3QueueAction & action_,
size_t download_thread_num);
RemoveFileFunc remove_file_func_,
const NamesAndTypesList & requested_virtual_columns_,
ContextPtr context_);
~StorageS3QueueSource() override;
@ -91,34 +56,21 @@ public:
Chunk generate() override;
private:
String name;
String bucket;
String version_id;
String format;
ColumnsDescription columns_desc;
S3Settings::RequestSettings request_settings;
std::shared_ptr<const S3::Client> client;
const String name;
const S3QueueAction action;
const std::shared_ptr<S3QueueFilesMetadata> files_metadata;
const std::shared_ptr<StorageS3Source> internal_source;
const NamesAndTypesList requested_virtual_columns;
RemoveFileFunc remove_file_func;
Poco::Logger * log;
std::shared_ptr<S3QueueFilesMetadata> files_metadata;
using ReaderHolder = StorageS3Source::ReaderHolder;
ReaderHolder reader;
NamesAndTypesList requested_virtual_columns;
NamesAndTypesList requested_columns;
std::shared_ptr<IIterator> file_iterator;
const S3QueueAction action;
Poco::Logger * log = &Poco::Logger::get("StorageS3QueueSource");
std::future<ReaderHolder> reader_future;
mutable std::mutex mutex;
std::shared_ptr<StorageS3Source> internal_source;
void deleteProcessedObject(const String & file_path);
void applyActionAfterProcessing(const String & file_path);
void applyActionAfterProcessing(const String & path);
};
}


@ -2,12 +2,12 @@
#if USE_AWS_S3
# include <Poco/JSON/JSON.h>
# include <Poco/JSON/Object.h>
# include <Poco/JSON/Parser.h>
# include <Storages/S3Queue/S3QueueSettings.h>
# include <Storages/S3Queue/S3QueueTableMetadata.h>
# include <Storages/StorageS3.h>
#include <Poco/JSON/JSON.h>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Parser.h>
#include <Storages/S3Queue/S3QueueSettings.h>
#include <Storages/S3Queue/S3QueueTableMetadata.h>
#include <Storages/StorageS3.h>
namespace DB
@ -18,13 +18,17 @@ namespace ErrorCodes
extern const int METADATA_MISMATCH;
}
S3QueueTableMetadata::S3QueueTableMetadata(const StorageS3::Configuration & configuration, const S3QueueSettings & engine_settings)
S3QueueTableMetadata::S3QueueTableMetadata(
const StorageS3::Configuration & configuration,
const S3QueueSettings & engine_settings,
const StorageInMemoryMetadata & storage_metadata)
{
format_name = configuration.format;
after_processing = engine_settings.after_processing.toString();
mode = engine_settings.mode.toString();
s3queue_tracked_files_limit = engine_settings.s3queue_tracked_files_limit;
s3queue_tracked_file_ttl_sec = engine_settings.s3queue_tracked_file_ttl_sec;
columns = storage_metadata.getColumns().toString();
}
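/// Illustrative example of the serialized table metadata (hypothetical values):
///   {"after_processing":"keep","mode":"unordered","s3queue_tracked_files_limit":1000,
///    "s3queue_tracked_file_ttl_sec":0,"format_name":"CSV","columns":"<serialized ColumnsDescription>"}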
@ -36,6 +40,7 @@ String S3QueueTableMetadata::toString() const
json.set("s3queue_tracked_files_limit", s3queue_tracked_files_limit);
json.set("s3queue_tracked_file_ttl_sec", s3queue_tracked_file_ttl_sec);
json.set("format_name", format_name);
json.set("columns", columns);
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
oss.exceptions(std::ios::failbit);
@ -52,6 +57,7 @@ void S3QueueTableMetadata::read(const String & metadata_str)
s3queue_tracked_files_limit = json->getValue<UInt64>("s3queue_tracked_files_limit");
s3queue_tracked_file_ttl_sec = json->getValue<UInt64>("s3queue_tracked_file_ttl_sec");
format_name = json->getValue<String>("format_name");
columns = json->getValue<String>("columns");
}
S3QueueTableMetadata S3QueueTableMetadata::parse(const String & metadata_str)


@ -2,9 +2,9 @@
#if USE_AWS_S3
# include <Storages/S3Queue/S3QueueSettings.h>
# include <Storages/StorageS3.h>
# include <base/types.h>
#include <Storages/S3Queue/S3QueueSettings.h>
#include <Storages/StorageS3.h>
#include <base/types.h>
namespace DB
{
@ -18,13 +18,14 @@ class ReadBuffer;
struct S3QueueTableMetadata
{
String format_name;
String columns;
String after_processing;
String mode;
UInt64 s3queue_tracked_files_limit;
UInt64 s3queue_tracked_file_ttl_sec;
S3QueueTableMetadata() = default;
S3QueueTableMetadata(const StorageS3::Configuration & configuration, const S3QueueSettings & engine_settings);
S3QueueTableMetadata(const StorageS3::Configuration & configuration, const S3QueueSettings & engine_settings, const StorageInMemoryMetadata & storage_metadata);
void read(const String & metadata_str);
static S3QueueTableMetadata parse(const String & metadata_str);


@ -2,69 +2,38 @@
#if USE_AWS_S3
#include <Common/ProfileEvents.h>
#include <IO/S3Common.h>
#include <IO/CompressionMethod.h>
#include <Formats/FormatFactory.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/ISource.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTInsertQuery.h>
#include <Storages/S3Queue/S3QueueTableMetadata.h>
#include <Storages/S3Queue/StorageS3Queue.h>
#include <Storages/S3Queue/S3QueueFilesMetadata.h>
#include <Storages/StorageFactory.h>
#include <Storages/StorageMaterializedView.h>
#include <Storages/StorageSnapshot.h>
#include <Storages/VirtualColumnUtils.h>
#include <Storages/prepareReadingFromFormat.h>
#include <filesystem>
# include <Databases/DatabaseReplicated.h>
# include <IO/WriteBuffer.h>
# include <IO/WriteHelpers.h>
# include <Interpreters/InterpreterInsertQuery.h>
# include <Processors/Executors/CompletedPipelineExecutor.h>
# include <Common/ProfileEvents.h>
# include <Common/ZooKeeper/ZooKeeper.h>
# include <Common/isValidUTF8.h>
# include "IO/ParallelReadBuffer.h"
# include <Functions/FunctionsConversion.h>
# include <IO/S3Common.h>
# include <Interpreters/TreeRewriter.h>
# include <Parsers/ASTFunction.h>
# include <Parsers/ASTInsertQuery.h>
# include <Storages/NamedCollectionsHelpers.h>
# include <Storages/PartitionedSink.h>
# include <Storages/S3Queue/S3QueueSource.h>
# include <Storages/S3Queue/S3QueueTableMetadata.h>
# include <Storages/S3Queue/StorageS3Queue.h>
# include <Storages/StorageFactory.h>
# include <Storages/StorageMaterializedView.h>
# include <Storages/StorageS3.h>
# include <Storages/StorageSnapshot.h>
# include <Storages/VirtualColumnUtils.h>
# include <Storages/prepareReadingFromFormat.h>
# include <Common/NamedCollections/NamedCollections.h>
# include <Formats/FormatFactory.h>
# include <Processors/Formats/IInputFormat.h>
# include <Processors/Formats/IOutputFormat.h>
# include <Processors/Transforms/AddingDefaultsTransform.h>
# include <QueryPipeline/QueryPipelineBuilder.h>
# include <DataTypes/DataTypeString.h>
# include <Common/parseGlobs.h>
# include <filesystem>
# include <Processors/ISource.h>
# include <Processors/Sinks/SinkToStorage.h>
# include <QueryPipeline/Pipe.h>
namespace fs = std::filesystem;
namespace ProfileEvents
{
extern const Event S3DeleteObjects;
extern const Event S3ListObjects;
extern const Event S3DeleteObjects;
extern const Event S3ListObjects;
}
namespace DB
{
static const String PARTITION_ID_WILDCARD = "{_partition_id}";
static const auto MAX_THREAD_WORK_DURATION_MS = 60000;
namespace ErrorCodes
@ -78,6 +47,33 @@ namespace ErrorCodes
extern const int INCOMPATIBLE_COLUMNS;
}
namespace
{
bool containsGlobs(const S3::URI & url)
{
return url.key.find_first_of("*?{") != std::string::npos;
}
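/// With the default prefix "/" and keeper_path left unchanged, the resulting path has the form
/// "/<database_uuid>/<table_uuid>"; an explicitly configured keeper_path is used under the prefix
/// as-is, without the table uuid.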
std::string chooseZooKeeperPath(const StorageID & table_id, const Settings & settings, const S3QueueSettings & s3queue_settings)
{
std::string zk_path_prefix = settings.s3queue_default_zookeeper_path.value;
if (zk_path_prefix.empty())
zk_path_prefix = "/";
std::string result_zk_path;
if (s3queue_settings.keeper_path.changed)
{
/// We do not add table uuid here on purpose.
result_zk_path = fs::path(zk_path_prefix) / s3queue_settings.keeper_path.value;
}
else
{
auto database_uuid = DatabaseCatalog::instance().getDatabase(table_id.database_name)->getUUID();
result_zk_path = fs::path(zk_path_prefix) / toString(database_uuid) / toString(table_id.uuid);
}
return zkutil::extractZooKeeperPath(result_zk_path, true);
}
}
StorageS3Queue::StorageS3Queue(
std::unique_ptr<S3QueueSettings> s3queue_settings_,
@ -87,79 +83,64 @@ StorageS3Queue::StorageS3Queue(
const ConstraintsDescription & constraints_,
const String & comment,
ContextPtr context_,
std::optional<FormatSettings> format_settings_,
ASTPtr partition_by_)
std::optional<FormatSettings> format_settings_)
: IStorage(table_id_)
, WithContext(context_)
, s3queue_settings(std::move(s3queue_settings_))
, zk_path(chooseZooKeeperPath(table_id_, context_->getSettingsRef(), *s3queue_settings))
, after_processing(s3queue_settings->after_processing)
, files_metadata(std::make_shared<S3QueueFilesMetadata>(this, *s3queue_settings))
, configuration{configuration_}
, reschedule_processing_interval_ms(s3queue_settings->s3queue_polling_min_timeout_ms)
, format_settings(format_settings_)
, partition_by(partition_by_)
, reschedule_processing_interval_ms(s3queue_settings->s3queue_polling_min_timeout_ms)
, log(&Poco::Logger::get("StorageS3Queue (" + table_id_.table_name + ")"))
{
if (configuration.url.key.ends_with('/'))
{
configuration.url.key += '*';
if (!withGlobs())
}
else if (!containsGlobs(configuration.url))
{
throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "S3Queue url must either end with '/' or contain globs");
std::string zk_path_prefix = getContext()->getSettingsRef().s3queue_default_zookeeper_path.value;
if (zk_path_prefix.empty())
zk_path_prefix = "/";
std::string result_zk_path;
if (s3queue_settings->keeper_path.changed)
{
/// We do not add table uuid here on purpose.
result_zk_path = fs::path(zk_path_prefix) / s3queue_settings->keeper_path.value;
}
else
{
auto database_uuid = DatabaseCatalog::instance().getDatabase(table_id_.database_name)->getUUID();
result_zk_path = fs::path(zk_path_prefix) / toString(database_uuid) / toString(table_id_.uuid);
}
zk_path = zkutil::extractZooKeeperPath(result_zk_path, true/* check_starts_with_slash */, log);
LOG_INFO(log, "Using zookeeper path: {}", zk_path);
FormatFactory::instance().checkFormatName(configuration.format);
context_->getGlobalContext()->getRemoteHostFilter().checkURL(configuration.url.uri);
StorageInMemoryMetadata storage_metadata;
configuration.update(context_);
FormatFactory::instance().checkFormatName(configuration.format);
context_->getRemoteHostFilter().checkURL(configuration.url.uri);
StorageInMemoryMetadata storage_metadata;
if (columns_.empty())
{
auto columns = StorageS3::getTableStructureFromDataImpl(configuration, format_settings, context_);
storage_metadata.setColumns(columns);
}
else
{
storage_metadata.setColumns(columns_);
}
storage_metadata.setConstraints(constraints_);
storage_metadata.setComment(comment);
createOrCheckMetadata(storage_metadata);
setInMemoryMetadata(storage_metadata);
auto metadata_snapshot = getInMemoryMetadataPtr();
const bool is_first_replica = createTableIfNotExists(metadata_snapshot);
if (!is_first_replica)
{
checkTableStructure(zk_path, metadata_snapshot);
}
files_metadata = std::make_shared<S3QueueFilesMetadata>(this, *s3queue_settings);
virtual_columns = VirtualColumnUtils::getPathAndFileVirtualsForStorage(storage_metadata.getSampleBlock().getNamesAndTypesList());
task = getContext()->getSchedulePool().createTask("S3QueueStreamingTask", [this] { threadFunc(); });
auto poll_thread = getContext()->getSchedulePool().createTask("S3QueueStreamingTask", [this] { threadFunc(); });
task = std::make_shared<TaskContext>(std::move(poll_thread));
LOG_INFO(log, "Using zookeeper path: {}", zk_path.string());
}
bool StorageS3Queue::supportsSubcolumns() const
void StorageS3Queue::startup()
{
return true;
if (task)
task->activateAndSchedule();
}
void StorageS3Queue::shutdown()
{
shutdown_called = true;
if (task)
task->deactivate();
}
bool StorageS3Queue::supportsSubsetOfColumns() const
@ -177,80 +158,62 @@ Pipe StorageS3Queue::read(
size_t /* num_streams */)
{
if (!local_context->getSettingsRef().stream_like_engine_allow_direct_select)
throw Exception(
ErrorCodes::QUERY_NOT_ALLOWED, "Direct select is not allowed. To enable use setting `stream_like_engine_allow_direct_select`");
{
throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "Direct select is not allowed. "
"To enable use setting `stream_like_engine_allow_direct_select`");
}
if (mv_attached)
throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "Cannot read from StorageS3Queue with attached materialized views");
{
throw Exception(ErrorCodes::QUERY_NOT_ALLOWED,
"Cannot read from {} with attached materialized views", getName());
}
auto query_configuration = updateConfigurationAndGetCopy(local_context);
std::shared_ptr<StorageS3Source::IIterator> iterator_wrapper = createFileIterator(local_context, query_info.query);
Pipes pipes;
pipes.emplace_back(createSource(column_names, storage_snapshot, query_info.query, max_block_size, local_context));
return Pipe::unitePipes(std::move(pipes));
}
std::shared_ptr<StorageS3QueueSource> StorageS3Queue::createSource(
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
ASTPtr query,
size_t max_block_size,
ContextPtr local_context)
{
auto configuration_snapshot = updateConfigurationAndGetCopy(local_context);
auto file_iterator = createFileIterator(local_context, query);
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(), getVirtuals());
const size_t max_download_threads = local_context->getSettingsRef().max_download_threads;
return Pipe(std::make_shared<StorageS3QueueSource>(
read_from_format_info,
configuration.format,
getName(),
local_context,
format_settings,
auto internal_source = std::make_unique<StorageS3Source>(
read_from_format_info, configuration.format, getName(), local_context, format_settings,
max_block_size,
query_configuration.request_settings,
configuration.compression_method,
query_configuration.client,
query_configuration.url.bucket,
query_configuration.url.version_id,
query_configuration.url.uri.getHost() + std::to_string(query_configuration.url.uri.getPort()),
iterator_wrapper,
files_metadata,
after_processing,
max_download_threads));
}
configuration_snapshot.request_settings,
configuration_snapshot.compression_method,
configuration_snapshot.client,
configuration_snapshot.url.bucket,
configuration_snapshot.url.version_id,
configuration_snapshot.url.uri.getHost() + std::to_string(configuration_snapshot.url.uri.getPort()),
file_iterator, local_context->getSettingsRef().max_download_threads, false, /* query_info */ std::nullopt);
SinkToStoragePtr StorageS3Queue::write(const ASTPtr &, const StorageMetadataPtr &, ContextPtr, bool)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Write is not supported by storage {}", getName());
}
void StorageS3Queue::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr, TableExclusiveLockHolder &)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Truncate is not supported by storage {}", getName());
}
NamesAndTypesList StorageS3Queue::getVirtuals() const
{
return virtual_columns;
}
bool StorageS3Queue::supportsPartitionBy() const
{
return true;
}
void StorageS3Queue::startup()
{
if (task)
task->holder->activateAndSchedule();
}
void StorageS3Queue::shutdown()
{
shutdown_called = true;
if (task)
auto file_deleter = [this, bucket = configuration_snapshot.url.bucket, client = configuration_snapshot.client](const std::string & path)
{
task->stream_cancelled = true;
task->holder->deactivate();
}
}
size_t StorageS3Queue::getTableDependentCount() const
{
auto table_id = getStorageID();
// Check if at least one direct dependency is attached
return DatabaseCatalog::instance().getDependentViews(table_id).size();
S3::DeleteObjectRequest request;
request.WithKey(path).WithBucket(bucket);
auto outcome = client->DeleteObject(request);
if (!outcome.IsSuccess())
{
const auto & err = outcome.GetError();
LOG_ERROR(log, "{} (Code: {})", err.GetMessage(), static_cast<size_t>(err.GetErrorType()));
}
else
{
LOG_TRACE(log, "Object with path {} was removed from S3", path);
}
};
return std::make_shared<StorageS3QueueSource>(
getName(), read_from_format_info.source_header, std::move(internal_source),
files_metadata, after_processing, file_deleter, read_from_format_info.requested_virtual_columns, local_context);
}
bool StorageS3Queue::hasDependencies(const StorageID & table_id)
@ -280,40 +243,33 @@ bool StorageS3Queue::hasDependencies(const StorageID & table_id)
void StorageS3Queue::threadFunc()
{
bool reschedule = true;
SCOPE_EXIT({ mv_attached.store(false); });
try
{
auto table_id = getStorageID();
auto dependencies_count = getTableDependentCount();
size_t dependencies_count = DatabaseCatalog::instance().getDependentViews(table_id).size();
if (dependencies_count)
{
auto start_time = std::chrono::steady_clock::now();
/// Reset reschedule interval.
reschedule_processing_interval_ms = s3queue_settings->s3queue_polling_min_timeout_ms;
/// Disallow parallel selects while streaming to mv.
mv_attached.store(true);
// Keep streaming as long as there are attached views and streaming is not cancelled
while (!task->stream_cancelled)
{
if (!hasDependencies(table_id))
{
/// For this case, we can not wait for watch thread to wake up
reschedule = true;
break;
}
/// Keep streaming as long as there are attached views and streaming is not cancelled
while (!shutdown_called && hasDependencies(table_id))
{
LOG_DEBUG(log, "Started streaming to {} attached views", dependencies_count);
streamToViews();
auto ts = std::chrono::steady_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(ts - start_time);
auto now = std::chrono::steady_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(now - start_time);
if (duration.count() > MAX_THREAD_WORK_DURATION_MS)
{
LOG_TRACE(log, "Thread work duration limit exceeded. Reschedule.");
reschedule = true;
break;
}
reschedule_processing_interval_ms = s3queue_settings->s3queue_polling_min_timeout_ms;
}
}
}
@ -322,19 +278,16 @@ void StorageS3Queue::threadFunc()
tryLogCurrentException(__PRETTY_FUNCTION__);
}
mv_attached.store(false);
if (reschedule && !shutdown_called)
if (!shutdown_called)
{
LOG_TRACE(log, "Reschedule S3 Queue thread func.");
/// Reschedule with backoff.
if (reschedule_processing_interval_ms < s3queue_settings->s3queue_polling_max_timeout_ms)
reschedule_processing_interval_ms += s3queue_settings->s3queue_polling_backoff_ms;
task->holder->scheduleAfter(reschedule_processing_interval_ms);
LOG_TRACE(log, "Reschedule S3 Queue processing thread in {} ms", reschedule_processing_interval_ms);
task->scheduleAfter(reschedule_processing_interval_ms);
}
}
void StorageS3Queue::streamToViews()
{
auto table_id = getStorageID();
@ -348,8 +301,6 @@ void StorageS3Queue::streamToViews()
auto insert = std::make_shared<ASTInsertQuery>();
insert->table_id = table_id;
size_t block_size = 100;
auto s3queue_context = Context::createCopy(getContext());
s3queue_context->makeQueryContext();
auto query_configuration = updateConfigurationAndGetCopy(s3queue_context);
@ -358,40 +309,14 @@ void StorageS3Queue::streamToViews()
// Only insert into dependent views and expect that input blocks contain virtual columns
InterpreterInsertQuery interpreter(insert, s3queue_context, false, true, true);
auto block_io = interpreter.execute();
auto column_names = block_io.pipeline.getHeader().getNames();
// Create a stream for each consumer and join them in a union stream
std::shared_ptr<StorageS3Source::IIterator> iterator_wrapper = createFileIterator(s3queue_context, nullptr);
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(), getVirtuals());
const size_t max_download_threads = s3queue_context->getSettingsRef().max_download_threads;
auto pipe = Pipe(std::make_shared<StorageS3QueueSource>(
read_from_format_info,
configuration.format,
getName(),
s3queue_context,
format_settings,
block_size,
query_configuration.request_settings,
configuration.compression_method,
query_configuration.client,
query_configuration.url.bucket,
query_configuration.url.version_id,
query_configuration.url.uri.getHost() + std::to_string(query_configuration.url.uri.getPort()),
iterator_wrapper,
files_metadata,
after_processing,
max_download_threads));
auto pipe = Pipe(createSource(block_io.pipeline.getHeader().getNames(), storage_snapshot, nullptr, DBMS_DEFAULT_BUFFER_SIZE, s3queue_context));
std::atomic_size_t rows = 0;
{
block_io.pipeline.complete(std::move(pipe));
block_io.pipeline.setProgressCallback([&](const Progress & progress) { rows += progress.read_rows.load(); });
CompletedPipelineExecutor executor(block_io.pipeline);
executor.execute();
}
block_io.pipeline.complete(std::move(pipe));
block_io.pipeline.setProgressCallback([&](const Progress & progress) { rows += progress.read_rows.load(); });
CompletedPipelineExecutor executor(block_io.pipeline);
executor.execute();
}
StorageS3Queue::Configuration StorageS3Queue::updateConfigurationAndGetCopy(ContextPtr local_context)
@ -411,49 +336,40 @@ zkutil::ZooKeeperPtr StorageS3Queue::getZooKeeper() const
return zk_client;
}
bool StorageS3Queue::createTableIfNotExists(const StorageMetadataPtr & metadata_snapshot)
void StorageS3Queue::createOrCheckMetadata(const StorageInMemoryMetadata & storage_metadata)
{
auto zookeeper = getZooKeeper();
zookeeper->createAncestors(zk_path);
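/// Retry a limited number of times: another replica may create the same nodes concurrently,
/// in which case the ZNODEEXISTS handling below re-checks the existing metadata on the next iteration.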
for (size_t i = 0; i < zk_create_table_retries; ++i)
for (size_t i = 0; i < 1000; ++i)
{
Coordination::Requests ops;
bool is_first_replica = true;
if (zookeeper->exists(zk_path + "/metadata"))
Coordination::Requests requests;
if (zookeeper->exists(zk_path / "metadata"))
{
if (!zookeeper->exists(zk_path + "/processing"))
ops.emplace_back(zkutil::makeCreateRequest(zk_path + "/processing", "", zkutil::CreateMode::Ephemeral));
LOG_DEBUG(log, "This table {} is already created, will use existing metadata for checking engine settings", zk_path);
is_first_replica = false;
checkTableStructure(zk_path, storage_metadata);
}
else
{
String metadata_str = S3QueueTableMetadata(configuration, *s3queue_settings).toString();
ops.emplace_back(zkutil::makeCreateRequest(zk_path, "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zk_path + "/processed", "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zk_path + "/failed", "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zk_path + "/processing", "", zkutil::CreateMode::Ephemeral));
ops.emplace_back(zkutil::makeCreateRequest(
zk_path + "/columns", metadata_snapshot->getColumns().toString(), zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zk_path + "/metadata", metadata_str, zkutil::CreateMode::Persistent));
std::string metadata = S3QueueTableMetadata(configuration, *s3queue_settings, storage_metadata).toString();
requests.emplace_back(zkutil::makeCreateRequest(zk_path, "", zkutil::CreateMode::Persistent));
requests.emplace_back(zkutil::makeCreateRequest(zk_path / "processed", "", zkutil::CreateMode::Persistent));
requests.emplace_back(zkutil::makeCreateRequest(zk_path / "failed", "", zkutil::CreateMode::Persistent));
requests.emplace_back(zkutil::makeCreateRequest(zk_path / "processing", "", zkutil::CreateMode::Persistent));
requests.emplace_back(zkutil::makeCreateRequest(zk_path / "metadata", metadata, zkutil::CreateMode::Persistent));
}
Coordination::Responses responses;
auto code = zookeeper->tryMulti(ops, responses);
auto code = zookeeper->tryMulti(requests, responses);
if (code == Coordination::Error::ZNODEEXISTS)
{
LOG_INFO(log, "It looks like the table {} was created by another server at the same moment, will retry", zk_path);
LOG_INFO(log, "It looks like the table {} was created by another server at the same moment, will retry", zk_path.string());
continue;
}
else if (code != Coordination::Error::ZOK)
{
zkutil::KeeperMultiException::check(code, ops, responses);
zkutil::KeeperMultiException::check(code, requests, responses);
}
return is_first_replica;
return;
}
throw Exception(
@ -463,24 +379,20 @@ bool StorageS3Queue::createTableIfNotExists(const StorageMetadataPtr & metadata_
}
/** Verify that list of columns and table settings match those specified in ZK (/metadata).
* If not, throw an exception.
*/
void StorageS3Queue::checkTableStructure(const String & zookeeper_prefix, const StorageMetadataPtr & metadata_snapshot)
void StorageS3Queue::checkTableStructure(const String & zookeeper_prefix, const StorageInMemoryMetadata & storage_metadata)
{
// Verify that list of columns and table settings match those specified in ZK (/metadata).
// If not, throw an exception.
auto zookeeper = getZooKeeper();
S3QueueTableMetadata old_metadata(configuration, *s3queue_settings);
Coordination::Stat metadata_stat;
String metadata_str = zookeeper->get(fs::path(zookeeper_prefix) / "metadata", &metadata_stat);
String metadata_str = zookeeper->get(fs::path(zookeeper_prefix) / "metadata");
auto metadata_from_zk = S3QueueTableMetadata::parse(metadata_str);
S3QueueTableMetadata old_metadata(configuration, *s3queue_settings, storage_metadata);
old_metadata.checkEquals(metadata_from_zk);
Coordination::Stat columns_stat;
auto columns_from_zk = ColumnsDescription::parse(zookeeper->get(fs::path(zookeeper_prefix) / "columns", &columns_stat));
const ColumnsDescription & old_columns = metadata_snapshot->getColumns();
auto columns_from_zk = ColumnsDescription::parse(zookeeper->get(fs::path(zookeeper_prefix) / "columns"));
const ColumnsDescription & old_columns = storage_metadata.getColumns();
if (columns_from_zk != old_columns)
{
throw Exception(
@ -492,38 +404,12 @@ void StorageS3Queue::checkTableStructure(const String & zookeeper_prefix, const
}
}
std::shared_ptr<StorageS3QueueSource::IIterator>
StorageS3Queue::createFileIterator(ContextPtr local_context, ASTPtr query)
std::shared_ptr<StorageS3Queue::FileIterator> StorageS3Queue::createFileIterator(ContextPtr local_context, ASTPtr query)
{
auto it = std::make_shared<StorageS3QueueSource::QueueGlobIterator>(
*configuration.client,
configuration.url,
query,
virtual_columns,
local_context,
s3queue_settings->s3queue_polling_size.value,
configuration.request_settings);
auto zookeeper = getZooKeeper();
auto lock = files_metadata->acquireLock(zookeeper);
S3QueueFilesMetadata::S3FilesCollection files_to_skip = files_metadata->getProcessedFailedAndProcessingFiles();
Strings files_to_process;
if (s3queue_settings->mode == S3QueueMode::UNORDERED)
{
files_to_process = it->filterProcessingFiles(s3queue_settings->mode, files_to_skip);
}
else
{
String max_processed_file = files_metadata->getMaxProcessedFile();
files_to_process = it->filterProcessingFiles(s3queue_settings->mode, files_to_skip, max_processed_file);
}
LOG_TEST(log, "Found files to process: {}", fmt::join(files_to_process, ", "));
files_metadata->setFilesProcessing(files_to_process);
return it;
auto glob_iterator = std::make_unique<StorageS3QueueSource::GlobIterator>(
*configuration.client, configuration.url, query, virtual_columns, local_context,
/* read_keys */nullptr, configuration.request_settings);
return std::make_shared<FileIterator>(files_metadata, std::move(glob_iterator));
}
void StorageS3Queue::drop()
@ -540,11 +426,15 @@ void registerStorageS3QueueImpl(const String & name, StorageFactory & factory)
[](const StorageFactory::Arguments & args)
{
if (!args.attach && !args.getLocalContext()->getSettingsRef().allow_experimental_s3queue)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "S3Queue is experimental. You can enable it with the `allow_experimental_s3queue` setting.");
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "S3Queue is experimental. "
"You can enable it with the `allow_experimental_s3queue` setting.");
}
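/// A minimal illustrative way to enable the engine from a client session
/// (the table definition below is hypothetical, not part of this commit):
///     SET allow_experimental_s3queue = 1;
///     CREATE TABLE queue (data String)
///     ENGINE = S3Queue('http://<endpoint>/<bucket>/*', 'CSV')
///     SETTINGS mode = 'unordered', keeper_path = '/clickhouse/s3queue/queue';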
auto & engine_args = args.engine_args;
if (engine_args.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "External data source must have arguments");
auto configuration = StorageS3::getConfiguration(engine_args, args.getLocalContext());
// Use format settings from global server context + settings from
@ -582,10 +472,6 @@ void registerStorageS3QueueImpl(const String & name, StorageFactory & factory)
format_settings = getFormatSettings(args.getContext());
}
ASTPtr partition_by;
if (args.storage_def->partition_by)
partition_by = args.storage_def->partition_by->clone();
return std::make_shared<StorageS3Queue>(
std::move(s3queue_settings),
std::move(configuration),
@ -594,12 +480,10 @@ void registerStorageS3QueueImpl(const String & name, StorageFactory & factory)
args.constraints,
args.comment,
args.getContext(),
format_settings,
partition_by);
format_settings);
},
{
.supports_settings = true,
.supports_sort_order = true, // for partition by
.supports_schema_inference = true,
.source_access_type = AccessType::S3,
});

View File

@ -4,29 +4,14 @@
#if USE_AWS_S3
# include <Core/Types.h>
# include <Compression/CompressionInfo.h>
# include <Common/ZooKeeper/ZooKeeper.h>
# include <Core/BackgroundSchedulePool.h>
# include <Storages/IStorage.h>
# include <Storages/S3Queue/S3QueueFilesMetadata.h>
# include <Storages/S3Queue/S3QueueSettings.h>
# include <Storages/S3Queue/S3QueueSource.h>
# include <Storages/StorageS3Settings.h>
# include <IO/CompressionMethod.h>
# include <IO/S3/getObjectInfo.h>
# include <Interpreters/Context.h>
# include <Interpreters/threadPoolCallbackRunner.h>
# include <Processors/Executors/PullingPipelineExecutor.h>
# include <Processors/ISource.h>
# include <Storages/Cache/SchemaCache.h>
# include <Storages/StorageConfiguration.h>
# include <Storages/StorageS3.h>
# include <Poco/URI.h>
# include <Common/logger_useful.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/logger_useful.h>
#include <Core/BackgroundSchedulePool.h>
#include <Storages/IStorage.h>
#include <Storages/S3Queue/S3QueueSettings.h>
#include <Storages/S3Queue/S3QueueSource.h>
#include <Storages/StorageS3.h>
#include <Interpreters/Context.h>
namespace Aws::S3
{
@ -35,7 +20,7 @@ class Client;
namespace DB
{
class S3QueueFilesMetadata;
class StorageS3Queue : public IStorage, WithContext
{
@ -50,8 +35,7 @@ public:
const ConstraintsDescription & constraints_,
const String & comment,
ContextPtr context_,
std::optional<FormatSettings> format_settings_,
ASTPtr partition_by_ = nullptr);
std::optional<FormatSettings> format_settings_);
String getName() const override { return "S3Queue"; }
@ -64,79 +48,58 @@ public:
size_t max_block_size,
size_t num_streams) override;
SinkToStoragePtr write(
const ASTPtr & query,
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context,
bool async_insert) override;
void truncate(
const ASTPtr & /*query*/,
const StorageMetadataPtr & /*metadata_snapshot*/,
ContextPtr /*local_context*/,
TableExclusiveLockHolder &) override;
NamesAndTypesList getVirtuals() const override;
bool supportsPartitionBy() const override;
NamesAndTypesList getVirtuals() const override { return virtual_columns; }
const auto & getFormatName() const { return configuration.format; }
const String & getZooKeeperPath() const { return zk_path; }
const fs::path & getZooKeeperPath() const { return zk_path; }
zkutil::ZooKeeperPtr getZooKeeper() const;
private:
using FileIterator = StorageS3QueueSource::FileIterator;
const std::unique_ptr<S3QueueSettings> s3queue_settings;
const fs::path zk_path;
const S3QueueAction after_processing;
std::shared_ptr<S3QueueFilesMetadata> files_metadata;
Configuration configuration;
const std::optional<FormatSettings> format_settings;
NamesAndTypesList virtual_columns;
UInt64 reschedule_processing_interval_ms;
std::optional<FormatSettings> format_settings;
ASTPtr partition_by;
String zk_path;
mutable zkutil::ZooKeeperPtr zk_client;
mutable std::mutex zk_mutex;
BackgroundSchedulePool::TaskHolder task;
std::atomic<bool> stream_cancelled{false};
UInt64 reschedule_processing_interval_ms;
std::atomic<bool> mv_attached = false;
std::atomic<bool> shutdown_called{false};
std::atomic<bool> shutdown_called = false;
Poco::Logger * log;
bool supportsSubcolumns() const override;
bool withGlobs() const { return configuration.url.key.find_first_of("*?{") != std::string::npos; }
void threadFunc();
size_t getTableDependentCount() const;
bool hasDependencies(const StorageID & table_id);
void startup() override;
void shutdown() override;
void drop() override;
struct TaskContext
{
BackgroundSchedulePool::TaskHolder holder;
std::atomic<bool> stream_cancelled{false};
explicit TaskContext(BackgroundSchedulePool::TaskHolder && task_) : holder(std::move(task_)) { }
};
std::shared_ptr<TaskContext> task;
bool supportsSubsetOfColumns() const override;
bool supportsSubcolumns() const override { return true; }
const UInt32 zk_create_table_retries = 1000;
bool createTableIfNotExists(const StorageMetadataPtr & metadata_snapshot);
void checkTableStructure(const String & zookeeper_prefix, const StorageMetadataPtr & metadata_snapshot);
using KeysWithInfo = StorageS3QueueSource::KeysWithInfo;
std::shared_ptr<StorageS3QueueSource::IIterator>
createFileIterator(ContextPtr local_context, ASTPtr query);
std::shared_ptr<FileIterator> createFileIterator(ContextPtr local_context, ASTPtr query);
std::shared_ptr<StorageS3QueueSource> createSource(
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
ASTPtr query,
size_t max_block_size,
ContextPtr local_context);
bool hasDependencies(const StorageID & table_id);
void streamToViews();
void threadFunc();
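/// On first use, create the ZooKeeper nodes (metadata, processed, failed, processing);
/// otherwise validate the stored metadata and columns against the local table definition.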
void createOrCheckMetadata(const StorageInMemoryMetadata & storage_metadata);
void checkTableStructure(const String & zookeeper_prefix, const StorageInMemoryMetadata & storage_metadata);
Configuration updateConfigurationAndGetCopy(ContextPtr local_context);
};

View File

@ -111,6 +111,7 @@ def generate_random_files(
to_generate = [
(f"{prefix}/test_{i}.csv", i) for i in range(start_ind, start_ind + count)
]
print(f"Generating files: {to_generate}")
to_generate.sort(key=lambda x: x[0])
for filename, i in to_generate:
@ -179,29 +180,58 @@ def run_query(instance, query, stdin=None, settings=None):
@pytest.mark.parametrize("mode", AVAILABLE_MODES)
def test_delete_after_processing(started_cluster, mode):
prefix = "delete"
bucket = started_cluster.minio_bucket
instance = started_cluster.instances["instance"]
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
node = started_cluster.instances["instance"]
total_values = generate_random_files(5, prefix, started_cluster, bucket)
instance.query(
table_name = "test.delete_after_processing"
dst_table_name = "test.delete_after_processing_dst"
mv_name = "test.delete_after_processing_mv"
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
files_num = 5
row_num = 10
prefix = "delete"
total_values = generate_random_files(
files_num, prefix, started_cluster, bucket, row_num=row_num
)
node.query(
f"""
DROP TABLE IF EXISTS test.s3_queue;
CREATE TABLE test.s3_queue ({table_format})
DROP TABLE IF EXISTS {table_name};
DROP TABLE IF EXISTS {dst_table_name};
DROP TABLE IF EXISTS {mv_name};
CREATE TABLE {table_name} ({table_format})
ENGINE = S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
SETTINGS
mode = '{mode}',
keeper_path = '/clickhouse/test_delete_{mode}',
s3queue_loading_retries = 3,
after_processing='delete';
CREATE TABLE {dst_table_name} ({table_format}, _path String)
ENGINE = MergeTree()
ORDER BY column1;
CREATE MATERIALIZED VIEW {mv_name} TO {dst_table_name} AS SELECT *, _path FROM {table_name};
"""
)
get_query = f"SELECT * FROM test.s3_queue ORDER BY column1, column2, column3"
expected_count = files_num * row_num
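# Wait (up to ~100 seconds) for the materialized view to push all generated rows
# into the destination table.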
for _ in range(100):
count = int(node.query(f"SELECT count() FROM {dst_table_name}"))
print(f"{count}/{expected_count}")
if count == expected_count:
break
time.sleep(1)
assert int(node.query(f"SELECT count() FROM {dst_table_name}")) == expected_count
assert int(node.query(f"SELECT uniq(_path) FROM {dst_table_name}")) == files_num
assert [
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
list(map(int, l.split()))
for l in node.query(
f"SELECT column1, column2, column3 FROM {dst_table_name} ORDER BY column1, column2, column3"
).splitlines()
] == sorted(total_values, key=lambda x: (x[0], x[1], x[2]))
minio = started_cluster.minio_client
objects = list(minio.list_objects(started_cluster.minio_bucket, recursive=True))
assert len(objects) == 0