handle failed files

This commit is contained in:
Sergey Katkovskiy 2023-05-10 18:17:26 +03:00
parent d37622434f
commit 446cf3c847
8 changed files with 425 additions and 163 deletions

View File

@ -94,6 +94,7 @@ class IColumn;
M(Bool, s3_allow_parallel_part_upload, true, "Use multiple threads for s3 multipart upload. It may lead to slightly higher memory usage", 0) \
M(Bool, s3_throw_on_zero_files_match, false, "Throw an error, when ListObjects request cannot match any files", 0) \
M(Bool, enable_s3_requests_logging, false, "Enable very explicit logging of S3 requests. Makes sense for debug only.", 0) \
M(String, s3queue_default_zookeeper_path, "", "Default zookeeper path prefix for S3Queue engine", 0) \
M(UInt64, hdfs_replication, 0, "The actual number of replications can be specified when the hdfs file is created.", 0) \
M(Bool, hdfs_truncate_on_insert, false, "Enables or disables truncate before insert in hdfs engine tables", 0) \
M(Bool, hdfs_create_new_file_on_insert, false, "Enables or disables creating a new file on each insert in hdfs engine tables", 0) \

View File

@ -16,20 +16,14 @@ namespace DB
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int NO_ZOOKEEPER;
extern const int TIMEOUT_EXCEEDED;
}
S3QueueHolder::ProcessedCollection::ProcessedCollection(const UInt64 & max_size_, const UInt64 & max_age_)
: max_size(max_size_), max_age(max_age_)
{
}
void S3QueueHolder::ProcessedCollection::read(ReadBuffer & in)
void S3QueueHolder::S3QueueCollection::read(ReadBuffer & in)
{
files = {};
in >> "processed_files\n";
in >> "collection:\n";
while (!in.eof())
{
String file_name;
@ -41,9 +35,9 @@ void S3QueueHolder::ProcessedCollection::read(ReadBuffer & in)
}
}
void S3QueueHolder::ProcessedCollection::write(WriteBuffer & out) const
void S3QueueHolder::S3QueueCollection::write(WriteBuffer & out) const
{
out << "processed_files\n";
out << "collection:\n";
for (const auto & processed_file : files)
{
out << processed_file.first << "\n";
@ -51,7 +45,30 @@ void S3QueueHolder::ProcessedCollection::write(WriteBuffer & out) const
}
}
void S3QueueHolder::ProcessedCollection::parse(const String & s)
String S3QueueHolder::S3QueueCollection::toString() const
{
WriteBufferFromOwnString out;
write(out);
return out.str();
}
S3QueueHolder::S3FilesCollection S3QueueHolder::S3QueueCollection::getFileNames()
{
S3FilesCollection keys = {};
for (const auto & pair : files)
{
keys.insert(pair.first);
}
return keys;
}
S3QueueHolder::S3QueueProcessedCollection::S3QueueProcessedCollection(const UInt64 & max_size_, const UInt64 & max_age_)
: max_size(max_size_), max_age(max_age_)
{
}
void S3QueueHolder::S3QueueProcessedCollection::parse(const String & s)
{
ReadBufferFromString buf(s);
read(buf);
@ -69,14 +86,8 @@ void S3QueueHolder::ProcessedCollection::parse(const String & s)
}
}
String S3QueueHolder::ProcessedCollection::toString() const
{
WriteBufferFromOwnString out;
write(out);
return out.str();
}
void S3QueueHolder::ProcessedCollection::add(const String & file_name)
void S3QueueHolder::S3QueueProcessedCollection::add(const String & file_name)
{
Int64 timestamp = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()).count();
auto pair = std::make_pair(file_name, timestamp);
@ -89,22 +100,62 @@ void S3QueueHolder::ProcessedCollection::add(const String & file_name)
}
}
S3QueueHolder::S3FilesCollection S3QueueHolder::ProcessedCollection::getFileNames()
S3QueueHolder::S3QueueFailedCollection::S3QueueFailedCollection(const UInt64 & max_retries_count_) : max_retries_count(max_retries_count_)
{
S3FilesCollection keys = {};
for (auto & pair : files)
{
keys.insert(pair.first);
}
return keys;
}
void S3QueueHolder::S3QueueFailedCollection::parse(const String & s)
{
ReadBufferFromString buf(s);
read(buf);
}
bool S3QueueHolder::S3QueueFailedCollection::add(const String & file_name)
{
auto failed_it
= std::find_if(files.begin(), files.end(), [&file_name](const std::pair<String, Int64> & s) { return s.first == file_name; });
if (failed_it != files.end())
{
failed_it->second--;
if (failed_it->second == 0)
{
return false;
}
}
else
{
auto pair = std::make_pair(file_name, max_retries_count);
files.push_back(pair);
}
return true;
}
S3QueueHolder::S3FilesCollection S3QueueHolder::S3QueueFailedCollection::getFilesWithoutRetries()
{
S3FilesCollection failed_keys;
for (const auto & pair : files)
{
if (pair.second <= 0)
{
failed_keys.insert(pair.first);
}
}
return failed_keys;
}
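
The failed-file bookkeeping above is the core of this commit: each file gets a retry budget of max_loading_retries, add() decrements the counter on every subsequent failure and reports whether another attempt is allowed, and files whose counter reached zero are returned by getFilesWithoutRetries(). A minimal Python sketch of the same bookkeeping, with illustrative names rather than the actual ClickHouse API:

class FailedCollection:
    # Illustrative model of S3QueueFailedCollection's retry counting.
    def __init__(self, max_retries_count):
        self.max_retries_count = max_retries_count
        self.files = []  # ordered list of [file_name, retries_left] pairs

    def add(self, file_name):
        # Register one failure; return True if the file may be retried later.
        for entry in self.files:
            if entry[0] == file_name:
                entry[1] -= 1
                if entry[1] == 0:
                    return False  # retry budget exhausted
                return True
        self.files.append([file_name, self.max_retries_count])  # first failure
        return True

    def files_without_retries(self):
        return {name for name, retries_left in self.files if retries_left <= 0}

With a retry budget of 3, a file that keeps failing is attempted once plus three retries; the counter reaches zero on the fourth failure, add() returns False, and the file then appears in files_without_retries(), which feeds the exclusion set built in getExcludedFiles().
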
S3QueueHolder::S3QueueHolder(
const String & zookeeper_path_, const S3QueueMode & mode_, ContextPtr context_, UInt64 & max_set_size_, UInt64 & max_set_age_s_)
const String & zookeeper_path_,
const S3QueueMode & mode_,
ContextPtr context_,
UInt64 & max_set_size_,
UInt64 & max_set_age_s_,
UInt64 & max_loading_retries_)
: WithContext(context_)
, max_set_size(max_set_size_)
, max_set_age_s(max_set_age_s_)
, max_loading_retries(max_loading_retries_)
, zookeeper_path(zookeeper_path_)
, zookeeper_failed_path(zookeeper_path_ + "/failed")
, zookeeper_processing_path(zookeeper_path_ + "/processing")
@ -140,7 +191,7 @@ void S3QueueHolder::setFileProcessed(const String & file_path)
if (mode == S3QueueMode::UNORDERED)
{
String processed_files = zookeeper->get(zookeeper_processed_path);
auto processed = ProcessedCollection(max_set_size, max_set_age_s);
auto processed = S3QueueProcessedCollection(max_set_size, max_set_age_s);
processed.parse(processed_files);
processed.add(file_path);
zookeeper->set(zookeeper_processed_path, processed.toString());
@ -153,144 +204,111 @@ void S3QueueHolder::setFileProcessed(const String & file_path)
zookeeper->set(zookeeper_processed_path, file_path);
}
}
String node_data;
Strings file_paths;
if (zookeeper->tryGet(fs::path(zookeeper_processing_path), node_data))
{
S3FilesCollection processing_files = parseCollection(node_data);
for (const auto & x : processing_files)
{
if (x != file_path)
{
file_paths.push_back(x);
}
}
}
zookeeper->set(fs::path(zookeeper_processing_path), toString(file_paths));
removeProcessingFile(file_path);
}
void S3QueueHolder::setFileFailed(const String & file_path)
bool S3QueueHolder::markFailedAndCheckRetry(const String & file_path)
{
auto zookeeper = getZooKeeper();
auto lock = AcquireLock();
String failed_files = zookeeper->get(zookeeper_failed_path);
S3FilesCollection failed = parseCollection(failed_files);
auto failed_collection = S3QueueFailedCollection(max_loading_retries);
failed_collection.parse(failed_files);
bool retry_later = failed_collection.add(file_path);
failed.insert(file_path);
Strings set_failed;
set_failed.insert(set_failed.end(), failed.begin(), failed.end());
zookeeper->set(zookeeper_failed_path, failed_collection.toString());
removeProcessingFile(file_path);
zookeeper->set(zookeeper_failed_path, toString(set_failed));
return retry_later;
}
S3QueueHolder::S3FilesCollection S3QueueHolder::parseCollection(String & files)
S3QueueHolder::S3FilesCollection S3QueueHolder::getFailedFiles()
{
ReadBuffer rb(const_cast<char *>(reinterpret_cast<const char *>(files.data())), files.length(), 0);
Strings deserialized;
try
{
readQuoted(deserialized, rb);
}
catch (...)
{
deserialized = {};
}
std::unordered_set<String> processed(deserialized.begin(), deserialized.end());
return processed;
}
S3QueueHolder::S3FilesCollection S3QueueHolder::getExcludedFiles()
{
std::unordered_set<String> exclude_files;
auto zookeeper = getZooKeeper();
String failed_files = zookeeper->get(zookeeper_failed_path);
String failed = zookeeper->get(zookeeper_failed_path);
S3FilesCollection failed_files = parseCollection(failed);
exclude_files.merge(failed_files);
auto failed_collection = S3QueueFailedCollection(max_loading_retries);
failed_collection.parse(failed_files);
String processed = zookeeper->get(zookeeper_processed_path);
if (mode != S3QueueMode::ORDERED)
{
auto collection = ProcessedCollection(max_set_size, max_set_age_s);
collection.parse(processed);
S3FilesCollection processed_files = collection.getFileNames();
exclude_files.merge(processed_files);
}
else
{
exclude_files.insert(processed);
}
String processing = zookeeper->get(fs::path(zookeeper_processing_path));
S3FilesCollection processing_files = parseCollection(processing);
exclude_files.merge(processing_files);
return exclude_files;
return failed_collection.getFilesWithoutRetries();
}
String S3QueueHolder::getMaxProcessedFile()
{
if (mode != S3QueueMode::ORDERED)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "getMaxProcessedFile not implemented for unordered mode");
auto zookeeper = getZooKeeper();
String processed = zookeeper->get(zookeeper_path + "/processed");
String processed = zookeeper->get(zookeeper_processed_path);
return processed;
}
S3QueueHolder::S3FilesCollection S3QueueHolder::getProcessingFiles()
{
auto zookeeper = getZooKeeper();
String processing = zookeeper->get(fs::path(zookeeper_processing_path));
return parseCollection(processing);
}
void S3QueueHolder::setFilesProcessing(Strings & file_paths)
{
auto zookeeper = getZooKeeper();
String node_data;
if (zookeeper->tryGet(fs::path(zookeeper_processing_path), node_data))
for (const auto & x : getProcessingFiles())
{
S3FilesCollection processing_files = parseCollection(node_data);
for (const auto & x : processing_files)
if (!std::count(file_paths.begin(), file_paths.end(), x))
{
if (!std::count(file_paths.begin(), file_paths.end(), x))
{
file_paths.push_back(x);
}
file_paths.push_back(x);
}
}
zookeeper->set(fs::path(zookeeper_processing_path), toString(file_paths));
}
S3QueueHolder::S3FilesCollection S3QueueHolder::getFailedFiles()
S3QueueHolder::S3FilesCollection S3QueueHolder::getUnorderedProcessedFiles()
{
auto zookeeper = getZooKeeper();
auto lock = AcquireLock();
String failed = zookeeper->get(zookeeper_failed_path);
return parseCollection(failed);
}
S3QueueHolder::S3FilesCollection S3QueueHolder::getProcessedFiles()
{
auto zookeeper = getZooKeeper();
auto lock = AcquireLock();
String processed = zookeeper->get(zookeeper_processed_path);
auto collection = ProcessedCollection(max_set_size, max_set_age_s);
auto collection = S3QueueProcessedCollection(max_set_size, max_set_age_s);
collection.parse(processed);
return collection.getFileNames();
}
S3QueueHolder::S3FilesCollection S3QueueHolder::getProcessingFiles()
S3QueueHolder::S3FilesCollection S3QueueHolder::getExcludedFiles()
{
auto zookeeper = getZooKeeper();
auto lock = AcquireLock();
String processing = zookeeper->get(fs::path(zookeeper_processing_path));
return parseCollection(processing);
S3FilesCollection exclude_files = getFailedFiles();
if (mode == S3QueueMode::UNORDERED)
{
S3FilesCollection processed_files = getUnorderedProcessedFiles();
exclude_files.merge(processed_files);
}
else
{
String processed = getMaxProcessedFile();
exclude_files.insert(processed);
}
S3FilesCollection processing_files = getProcessingFiles();
exclude_files.merge(processing_files);
return exclude_files;
}
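
getExcludedFiles() now composes the exclusion set from three sources: failed files that ran out of retries, processed files (the whole tracked set in unordered mode, or just the single max processed name in ordered mode), and files currently in the processing node. A compact sketch of that composition using plain sets and illustrative names:

def excluded_files(failed_no_retries, processed, max_processed, processing, unordered):
    # Mirror of S3QueueHolder::getExcludedFiles(): failed files that exhausted
    # their retries, plus processed files (whole set in unordered mode, only the
    # max processed name in ordered mode), plus files currently being processed.
    exclude = set(failed_no_retries)
    if unordered:
        exclude |= set(processed)
    else:
        exclude.add(max_processed)
    exclude |= set(processing)
    return exclude

# Example (unordered mode): one file failed out, one already processed, one in flight.
# excluded_files({"bad.csv"}, {"a.csv"}, "", {"b.csv"}, unordered=True)
# -> {"bad.csv", "a.csv", "b.csv"}
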
void S3QueueHolder::removeProcessingFile(const String & file_path)
{
auto zookeeper = getZooKeeper();
String node_data;
Strings file_paths;
String processing = zookeeper->get(zookeeper_processing_path);
S3FilesCollection processing_files = parseCollection(processing);
file_paths.insert(file_paths.end(), processing_files.begin(), processing_files.end());
file_paths.erase(std::remove(file_paths.begin(), file_paths.end(), file_path), file_paths.end());
zookeeper->set(fs::path(zookeeper_processing_path), toString(file_paths));
}
std::shared_ptr<zkutil::EphemeralNodeHolder> S3QueueHolder::AcquireLock()
{
@ -322,6 +340,23 @@ std::shared_ptr<zkutil::EphemeralNodeHolder> S3QueueHolder::AcquireLock()
}
}
S3QueueHolder::S3FilesCollection S3QueueHolder::parseCollection(String & files)
{
ReadBuffer rb(const_cast<char *>(reinterpret_cast<const char *>(files.data())), files.length(), 0);
Strings deserialized;
try
{
readQuoted(deserialized, rb);
}
catch (...)
{
deserialized = {};
}
std::unordered_set<String> processed(deserialized.begin(), deserialized.end());
return processed;
}
}

View File

@ -14,44 +14,73 @@ class S3QueueHolder : public WithContext
public:
using S3FilesCollection = std::unordered_set<String>;
using ProcessedFiles = std::vector<std::pair<String, Int64>>;
using FailedFiles = std::vector<std::pair<String, Int64>>;
S3QueueHolder(
const String & zookeeper_path_, const S3QueueMode & mode_, ContextPtr context_, UInt64 & max_set_size_, UInt64 & max_set_age_s_);
const String & zookeeper_path_,
const S3QueueMode & mode_,
ContextPtr context_,
UInt64 & max_set_size_,
UInt64 & max_set_age_s_,
UInt64 & max_loading_retries_);
void setFileProcessed(const String & file_path);
void setFileFailed(const String & file_path);
bool markFailedAndCheckRetry(const String & file_path);
void setFilesProcessing(Strings & file_paths);
static S3FilesCollection parseCollection(String & files);
S3FilesCollection getExcludedFiles();
String getMaxProcessedFile();
S3FilesCollection getFailedFiles();
S3FilesCollection getProcessedFiles();
S3FilesCollection getProcessingFiles();
std::shared_ptr<zkutil::EphemeralNodeHolder> AcquireLock();
struct ProcessedCollection
struct S3QueueCollection
{
ProcessedCollection(const UInt64 & max_size_, const UInt64 & max_age_);
void parse(const String & s);
public:
virtual ~S3QueueCollection() = default;
String toString() const;
void add(const String & file_name);
S3FilesCollection getFileNames();
const UInt64 max_size;
const UInt64 max_age;
virtual void parse(const String & s) = 0;
protected:
ProcessedFiles files;
void read(ReadBuffer & in);
void write(WriteBuffer & out) const;
ProcessedFiles files;
};
const UInt64 max_set_size;
const UInt64 max_set_age_s;
struct S3QueueProcessedCollection : public S3QueueCollection
{
public:
S3QueueProcessedCollection(const UInt64 & max_size_, const UInt64 & max_age_);
void parse(const String & s) override;
void add(const String & file_name);
private:
const UInt64 max_size;
const UInt64 max_age;
};
struct S3QueueFailedCollection : S3QueueCollection
{
public:
S3QueueFailedCollection(const UInt64 & max_retries_count_);
void parse(const String & s) override;
bool add(const String & file_name);
S3FilesCollection getFilesWithoutRetries();
private:
const UInt64 max_retries_count;
};
private:
const UInt64 max_set_size;
const UInt64 max_set_age_s;
const UInt64 max_loading_retries;
zkutil::ZooKeeperPtr current_zookeeper;
mutable std::mutex current_zookeeper_mutex;
mutable std::mutex mutex;
@ -66,6 +95,13 @@ private:
zkutil::ZooKeeperPtr tryGetZooKeeper() const;
zkutil::ZooKeeperPtr getZooKeeper() const;
S3FilesCollection getFailedFiles();
S3FilesCollection getProcessingFiles();
S3FilesCollection getUnorderedProcessedFiles();
void removeProcessingFile(const String & file_path);
static S3FilesCollection parseCollection(String & files);
};

View File

@ -14,16 +14,21 @@ class ASTStorage;
M(S3QueueMode, \
mode, \
S3QueueMode::ORDERED, \
"With unordered mode, the set of all already processed files is tracked with persistent nodes in ZooKepeer", \
"With unordered mode, the set of all already processed files is tracked with persistent nodes in ZooKepeer." \
"With ordered mode, only the max name of the successfully consumed file stored.", \
0) \
M(S3QueueAction, after_processing, S3QueueAction::KEEP, "Delete, keep or move file in S3 after processing", 0) \
M(S3QueueAction, after_processing, S3QueueAction::KEEP, "Delete or keep file in S3 after successful processing", 0) \
M(String, keeper_path, "", "Zookeeper node path", 0) \
M(UInt64, s3queue_max_retries, 0, "Retry loading up to specified number of times", 0) \
M(UInt64, s3queue_loading_retries, 0, "Retry loading up to specified number of times", 0) \
M(UInt64, s3queue_polling_min_timeout_ms, 1000, "Minimal timeout before next polling", 0) \
M(UInt64, s3queue_polling_max_timeout_ms, 10000, "Maximum timeout before next polling", 0) \
M(UInt64, s3queue_polling_backoff_ms, 0, "Retry loading up to specified number of times", 0) \
M(UInt64, s3queue_polling_backoff_ms, 0, "Polling backoff", 0) \
M(UInt64, s3queue_max_set_size, 1000, "Max set size for tracking processed files in unordered mode in ZooKeeper", 0) \
M(UInt64, s3queue_max_set_age_s, 0, "Maximum number of seconds to store processed files in ZooKeeper node (forever default)", 0) \
M(UInt64, \
s3queue_max_set_age_s, \
0, \
"Maximum number of seconds to store processed files in ZooKeeper node (store forever by default)", \
0) \
M(UInt64, s3queue_polling_size, 50, "Maximum files to fetch from S3 with SELECT", 0)
#define LIST_OF_S3QUEUE_SETTINGS(M, ALIAS) \

View File

@ -69,6 +69,11 @@ extern const Event S3ListObjects;
namespace DB
{
namespace ErrorCodes
{
extern const int S3_ERROR;
}
StorageS3QueueSource::QueueGlobIterator::QueueGlobIterator(
const S3::Client & client_,
@ -244,13 +249,15 @@ Chunk StorageS3QueueSource::generate()
}
Chunk chunk;
bool success_in_pulling = false;
String file_path;
try
{
if (reader->pull(chunk))
{
UInt64 num_rows = chunk.getNumRows();
const auto & file_path = reader.getPath();
file_path = reader.getPath();
size_t total_size = file_iterator->getTotalSize();
if (num_rows && total_size)
{
@ -271,16 +278,21 @@ Chunk StorageS3QueueSource::generate()
chunk.addColumn(column->convertToFullColumnIfConst());
}
}
queue_holder->setFileProcessed(file_path);
applyActionAfterProcessing(file_path);
return chunk;
success_in_pulling = true;
}
}
catch (const Exception & e)
{
LOG_ERROR(log, "Exception in chunk pulling: {} ", e.displayText());
const auto & failed_file_path = reader.getPath();
queue_holder->setFileFailed(failed_file_path);
queue_holder->markFailedAndCheckRetry(failed_file_path);
success_in_pulling = false;
}
if (success_in_pulling)
{
applyActionAfterProcessing(file_path);
queue_holder->setFileProcessed(file_path);
return chunk;
}
@ -298,14 +310,28 @@ Chunk StorageS3QueueSource::generate()
return {};
}
void StorageS3QueueSource::applyActionAfterProcessing(const String & file_path)
{
LOG_WARNING(log, "Delete {} Bucket {}", file_path, bucket);
switch (action)
{
case S3QueueAction::DELETE:
deleteProcessedObject(file_path);
break;
case S3QueueAction::KEEP:
break;
}
}
void StorageS3QueueSource::deleteProcessedObject(const String & file_path)
{
LOG_WARNING(log, "Delete processed file {} from bucket {}", file_path, bucket);
S3::DeleteObjectRequest request;
request.SetBucket(bucket);
request.SetKey(file_path);
String delete_key = file_path.substr(bucket.length() + 1);
request.WithKey(delete_key).WithBucket(bucket);
auto outcome = client->DeleteObject(request);
if (!outcome.IsSuccess() && !S3::isNotFoundError(outcome.GetError().GetErrorType()))
if (!outcome.IsSuccess())
{
const auto & err = outcome.GetError();
LOG_ERROR(log, "{} (Code: {})", err.GetMessage(), static_cast<size_t>(err.GetErrorType()));
@ -316,7 +342,6 @@ void StorageS3QueueSource::applyActionAfterProcessing(const String & file_path)
}
}
}
#endif
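
deleteProcessedObject() builds the S3 key by stripping the leading "<bucket>/" prefix from the file path before issuing the DeleteObject request, since the path returned by reader.getPath() is bucket-qualified. A small sketch of that key derivation; the example path layout is hypothetical:

def s3_delete_key(file_path, bucket):
    # Mirror of file_path.substr(bucket.length() + 1): drop the "<bucket>/" prefix.
    # The startswith check is an extra sanity guard, not present in the C++ code.
    assert file_path.startswith(bucket + "/")
    return file_path[len(bucket) + 1:]

# s3_delete_key("root/delete/test_1.csv", "root") == "delete/test_1.csv"
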

View File

@ -123,6 +123,7 @@ private:
mutable std::mutex mutex;
std::shared_ptr<StorageS3Source> internal_source;
void deleteProcessedObject(const String & file_path);
void applyActionAfterProcessing(const String & file_path);
};

View File

@ -59,11 +59,6 @@
namespace fs = std::filesystem;
//namespace CurrentMetrics
//{
//extern const Metric S3QueueBackgroundReads;
//}
namespace ProfileEvents
{
extern const Event S3DeleteObjects;
@ -131,7 +126,14 @@ StorageS3Queue::StorageS3Queue(
auto table_id = getStorageID();
auto database = DatabaseCatalog::instance().getDatabase(table_id.database_name);
bool is_in_replicated_database = database->getEngineName() == "Replicated";
if (is_in_replicated_database)
auto default_path = getContext()->getSettingsRef().s3queue_default_zookeeper_path.value;
if (default_path != "")
{
zookeeper_path
= zkutil::extractZooKeeperPath(fs::path(default_path) / toString(table_id.uuid), /* check_starts_with_slash */ true, log);
}
else if (is_in_replicated_database)
{
LOG_INFO(log, "S3Queue engine keeper_path not specified. Use replicated database zookeeper path");
String base_zookeeper_path = assert_cast<const DatabaseReplicated *>(database.get())->getZooKeeperPath();
@ -140,7 +142,10 @@ StorageS3Queue::StorageS3Queue(
}
else
{
throw Exception(ErrorCodes::NO_ZOOKEEPER, "S3Queue zookeeper path not specified and table not in replicated database.");
throw Exception(
ErrorCodes::NO_ZOOKEEPER,
"S3Queue keeper_path engine setting is not specified, s3queue_default_zookeeper_path is not specified and the table is not in a "
"replicated database.");
}
}
else
@ -176,7 +181,12 @@ StorageS3Queue::StorageS3Queue(
}
queue_holder = std::make_unique<S3QueueHolder>(
zookeeper_path, mode, getContext(), s3queue_settings->s3queue_max_set_size.value, s3queue_settings->s3queue_max_set_age_s.value);
zookeeper_path,
mode,
getContext(),
s3queue_settings->s3queue_max_set_size.value,
s3queue_settings->s3queue_max_set_age_s.value,
s3queue_settings->s3queue_loading_retries.value);
auto default_virtuals = NamesAndTypesList{
{"_path", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
@ -187,7 +197,7 @@ StorageS3Queue::StorageS3Queue(
for (const auto & column : virtual_columns)
virtual_block.insert({column.type->createColumn(), column.type, column.name});
auto poll_thread = context_->getSchedulePool().createTask("S3QueueStreamingTask", [this] { threadFunc(); });
auto poll_thread = getContext()->getSchedulePool().createTask("S3QueueStreamingTask", [this] { threadFunc(); });
task = std::make_shared<TaskContext>(std::move(poll_thread));
}
@ -551,8 +561,8 @@ bool StorageS3Queue::createTableIfNotExists(const StorageMetadataPtr & metadata_
{
String metadata_str = S3QueueTableMetadata(s3_configuration, *s3queue_settings).toString();
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path, "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/processed", "processed_files\n", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/failed", "[]", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/processed", "collection:\n", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/failed", "collection:\n", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/processing", "", zkutil::CreateMode::Ephemeral));
ops.emplace_back(zkutil::makeCreateRequest(
zookeeper_path + "/columns", metadata_snapshot->getColumns().toString(), zkutil::CreateMode::Persistent));

View File

@ -7,7 +7,72 @@ import time
import pytest
from helpers.client import QueryRuntimeException
from helpers.cluster import ClickHouseCluster, ClickHouseInstance
from helpers.s3_tools import prepare_s3_bucket
import json
"""
export CLICKHOUSE_TESTS_SERVER_BIN_PATH=/home/sergey/vkr/ClickHouse/build/programs/clickhouse-server
export CLICKHOUSE_TESTS_CLIENT_BIN_PATH=/home/sergey/vkr/ClickHouse/build/programs/clickhouse-client
export CLICKHOUSE_TESTS_ODBC_BRIDGE_BIN_PATH=/home/sergey/vkr/ClickHouse/build/programs/clickhouse-odbc-bridge
export CLICKHOUSE_TESTS_BASE_CONFIG_DIR=/home/sergey/vkr/ClickHouse/programs/server
"""
def prepare_s3_bucket(started_cluster):
# Allows read-write access for bucket without authorization.
bucket_read_write_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Sid": "",
"Effect": "Allow",
"Principal": {"AWS": "*"},
"Action": "s3:GetBucketLocation",
"Resource": "arn:aws:s3:::root",
},
{
"Sid": "",
"Effect": "Allow",
"Principal": {"AWS": "*"},
"Action": "s3:ListBucket",
"Resource": "arn:aws:s3:::root",
},
{
"Sid": "",
"Effect": "Allow",
"Principal": {"AWS": "*"},
"Action": "s3:GetObject",
"Resource": "arn:aws:s3:::root/*",
},
{
"Sid": "",
"Effect": "Allow",
"Principal": {"AWS": "*"},
"Action": "s3:PutObject",
"Resource": "arn:aws:s3:::root/*",
},
{
"Sid": "",
"Effect": "Allow",
"Principal": {"AWS": "*"},
"Action": "s3:DeleteObject",
"Resource": "arn:aws:s3:::root/*",
},
],
}
minio_client = started_cluster.minio_client
minio_client.set_bucket_policy(
started_cluster.minio_bucket, json.dumps(bucket_read_write_policy)
)
started_cluster.minio_restricted_bucket = "{}-with-auth".format(
started_cluster.minio_bucket
)
if minio_client.bucket_exists(started_cluster.minio_restricted_bucket):
minio_client.remove_bucket(started_cluster.minio_restricted_bucket)
minio_client.make_bucket(started_cluster.minio_restricted_bucket)
@pytest.fixture(autouse=True)
@ -112,6 +177,90 @@ def run_query(instance, query, stdin=None, settings=None):
return result
@pytest.mark.parametrize("mode", AVAILABLE_MODES)
def test_delete_after_processing(started_cluster, mode):
prefix = "delete"
bucket = started_cluster.minio_bucket
instance = started_cluster.instances["instance"]
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
total_values = generate_random_files(5, prefix, started_cluster, bucket)
instance.query(
f"""
DROP TABLE IF EXISTS test.s3_queue;
CREATE TABLE test.s3_queue ({table_format})
ENGINE = S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
SETTINGS
mode = '{mode}',
keeper_path = '/clickhouse/test_delete_{mode}',
s3queue_loading_retries = 3,
after_processing='delete';
"""
)
get_query = f"SELECT * FROM test.s3_queue"
assert [
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
] == total_values
minio = started_cluster.minio_client
objects = list(minio.list_objects(started_cluster.minio_bucket, recursive=True))
assert len(objects) == 0
@pytest.mark.parametrize("mode", AVAILABLE_MODES)
def test_failed_retry(started_cluster, mode):
bucket = started_cluster.minio_restricted_bucket
instance = started_cluster.instances["instance"]
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
values = [
["failed", 1, 1],
]
values_csv = (
"\n".join((",".join(map(str, row)) for row in values)) + "\n"
).encode()
filename = f"test.csv"
put_s3_file_content(started_cluster, bucket, filename, values_csv)
instance.query(
f"""
DROP TABLE IF EXISTS test.s3_queue;
CREATE TABLE test.s3_queue ({table_format})
ENGINE = S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/*', {AUTH}'CSV')
SETTINGS
mode = '{mode}',
keeper_path = '/clickhouse/select_failed_retry_{mode}',
s3queue_loading_retries = 3;
"""
)
# first try
get_query = f"SELECT * FROM test.s3_queue"
assert [
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
] == []
# second try
assert [
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
] == []
# upload correct file
values = [
[1, 1, 1],
]
values_csv = (
"\n".join((",".join(map(str, row)) for row in values)) + "\n"
).encode()
put_s3_file_content(started_cluster, bucket, filename, values_csv)
assert [
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
] == values
assert [
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
] == []
@pytest.mark.parametrize("mode", AVAILABLE_MODES)
def test_direct_select_file(started_cluster, mode):
auth = "'minio','minio123',"