Merge pull request #65716 from ClickHouse/backport/24.6/65046

Backport #65046 to 24.6: S3Queue improvements
Author: Max K
Date: 2024-06-27 12:17:14 +02:00 (committed by GitHub)
commit 9344f445f9
16 changed files with 739 additions and 165 deletions

View File

@ -193,21 +193,21 @@ Chunk StorageObjectStorageSource::generate()
progress(num_rows, chunk_size ? chunk_size : chunk.bytes()); progress(num_rows, chunk_size ? chunk_size : chunk.bytes());
const auto & object_info = reader.getObjectInfo(); const auto & object_info = reader.getObjectInfo();
const auto & filename = object_info.getFileName(); const auto & filename = object_info->getFileName();
chassert(object_info.metadata); chassert(object_info->metadata);
VirtualColumnUtils::addRequestedFileLikeStorageVirtualsToChunk( VirtualColumnUtils::addRequestedFileLikeStorageVirtualsToChunk(
chunk, read_from_format_info.requested_virtual_columns, chunk, read_from_format_info.requested_virtual_columns,
{ {
.path = getUniqueStoragePathIdentifier(*configuration, reader.getObjectInfo(), false), .path = getUniqueStoragePathIdentifier(*configuration, *object_info, false),
.size = object_info.metadata->size_bytes, .size = object_info->metadata->size_bytes,
.filename = &filename, .filename = &filename,
.last_modified = object_info.metadata->last_modified .last_modified = object_info->metadata->last_modified
}); });
return chunk; return chunk;
} }
if (reader.getInputFormat() && getContext()->getSettingsRef().use_cache_for_count_from_files) if (reader.getInputFormat() && getContext()->getSettingsRef().use_cache_for_count_from_files)
addNumRowsToCache(reader.getObjectInfo(), total_rows_in_file); addNumRowsToCache(*reader.getObjectInfo(), total_rows_in_file);
total_rows_in_file = 0; total_rows_in_file = 0;
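Editor's note: the hunk above switches the reader's getObjectInfo() accessor from returning a const reference to returning the shared ObjectInfoPtr itself, so callers can keep the object metadata alive after the reader moves on to the next file. A minimal standalone sketch of why that matters; the types and names below are illustrative, not the ClickHouse ones.

```cpp
#include <iostream>
#include <memory>
#include <string>
#include <utility>

struct ObjectInfoSketch
{
    std::string relative_path;
};

// Simplified reader that owns the current object's metadata via shared_ptr.
class ReaderSketch
{
public:
    explicit ReaderSketch(std::string path)
        : object_info(std::make_shared<ObjectInfoSketch>(ObjectInfoSketch{std::move(path)})) {}

    // Returning the shared_ptr (instead of a const reference) lets the caller
    // extend the lifetime of the object info beyond the reader's current state.
    std::shared_ptr<ObjectInfoSketch> getObjectInfo() const { return object_info; }

    void advance(std::string next_path)
    {
        object_info = std::make_shared<ObjectInfoSketch>(ObjectInfoSketch{std::move(next_path)});
    }

private:
    std::shared_ptr<ObjectInfoSketch> object_info;
};

int main()
{
    ReaderSketch reader("data/file-0.csv");
    auto info = reader.getObjectInfo();   // caller keeps its own reference
    reader.advance("data/file-1.csv");    // the previously returned info stays valid
    std::cout << info->relative_path << '\n';   // prints data/file-0.csv
}
```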

View File

@ -100,7 +100,7 @@ protected:
PullingPipelineExecutor * operator->() { return reader.get(); } PullingPipelineExecutor * operator->() { return reader.get(); }
const PullingPipelineExecutor * operator->() const { return reader.get(); } const PullingPipelineExecutor * operator->() const { return reader.get(); }
const ObjectInfo & getObjectInfo() const { return *object_info; } ObjectInfoPtr getObjectInfo() const { return object_info; }
const IInputFormat * getInputFormat() const { return dynamic_cast<const IInputFormat *>(source.get()); } const IInputFormat * getInputFormat() const { return dynamic_cast<const IInputFormat *>(source.get()); }
private: private:

View File

@ -35,6 +35,11 @@ namespace
} }
} }
void S3QueueIFileMetadata::FileStatus::setProcessingEndTime()
{
processing_end_time = now();
}
void S3QueueIFileMetadata::FileStatus::onProcessing() void S3QueueIFileMetadata::FileStatus::onProcessing()
{ {
state = FileStatus::State::Processing; state = FileStatus::State::Processing;
@ -44,13 +49,15 @@ void S3QueueIFileMetadata::FileStatus::onProcessing()
void S3QueueIFileMetadata::FileStatus::onProcessed() void S3QueueIFileMetadata::FileStatus::onProcessed()
{ {
state = FileStatus::State::Processed; state = FileStatus::State::Processed;
processing_end_time = now(); if (!processing_end_time)
setProcessingEndTime();
} }
void S3QueueIFileMetadata::FileStatus::onFailed(const std::string & exception) void S3QueueIFileMetadata::FileStatus::onFailed(const std::string & exception)
{ {
state = FileStatus::State::Failed; state = FileStatus::State::Failed;
processing_end_time = now(); if (!processing_end_time)
setProcessingEndTime();
std::lock_guard lock(last_exception_mutex); std::lock_guard lock(last_exception_mutex);
last_exception = exception; last_exception = exception;
} }
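Editor's note: the intent of the FileStatus changes above is that the first terminal transition wins, i.e. processing_end_time is recorded once and not moved by a later onProcessed()/onFailed() call. A simplified standalone sketch of that behaviour; FileStatusSketch and its members are illustrative names, not the real ClickHouse class.

```cpp
#include <chrono>
#include <ctime>
#include <iostream>
#include <mutex>
#include <optional>
#include <string>

// Simplified stand-in for S3QueueIFileMetadata::FileStatus (not the real class).
struct FileStatusSketch
{
    enum class State { None, Processing, Processed, Failed };

    State state = State::None;
    std::optional<std::time_t> processing_end_time;
    std::string last_exception;
    std::mutex last_exception_mutex;

    static std::time_t now()
    {
        return std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
    }

    void setProcessingEndTime() { processing_end_time = now(); }

    void onProcessed()
    {
        state = State::Processed;
        if (!processing_end_time)            // keep the first recorded end time
            setProcessingEndTime();
    }

    void onFailed(const std::string & exception)
    {
        state = State::Failed;
        if (!processing_end_time)            // keep the first recorded end time
            setProcessingEndTime();
        std::lock_guard lock(last_exception_mutex);
        last_exception = exception;
    }
};

int main()
{
    FileStatusSketch status;
    status.onFailed("read error");
    const auto first_end_time = status.processing_end_time;
    status.onProcessed();                    // a later transition does not move the end time
    std::cout << (first_end_time == status.processing_end_time) << '\n';   // prints 1
}
```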
@ -120,7 +127,14 @@ S3QueueIFileMetadata::~S3QueueIFileMetadata()
{ {
if (processing_id_version.has_value()) if (processing_id_version.has_value())
{ {
file_status->onFailed("Uncaught exception"); if (file_status->getException().empty())
{
if (std::current_exception())
file_status->onFailed(getCurrentExceptionMessage(true));
else
file_status->onFailed("Unprocessed exception");
}
LOG_TEST(log, "Removing processing node in destructor for file: {}", path); LOG_TEST(log, "Removing processing node in destructor for file: {}", path);
try try
{ {
@ -227,7 +241,16 @@ void S3QueueIFileMetadata::setProcessed()
ProfileEvents::increment(ProfileEvents::S3QueueProcessedFiles); ProfileEvents::increment(ProfileEvents::S3QueueProcessedFiles);
file_status->onProcessed(); file_status->onProcessed();
setProcessedImpl();
try
{
setProcessedImpl();
}
catch (...)
{
file_status->onFailed(getCurrentExceptionMessage(true));
throw;
}
processing_id.reset(); processing_id.reset();
processing_id_version.reset(); processing_id_version.reset();
@ -235,18 +258,36 @@ void S3QueueIFileMetadata::setProcessed()
LOG_TRACE(log, "Set file {} as processed (rows: {})", path, file_status->processed_rows); LOG_TRACE(log, "Set file {} as processed (rows: {})", path, file_status->processed_rows);
} }
void S3QueueIFileMetadata::setFailed(const std::string & exception) void S3QueueIFileMetadata::setFailed(const std::string & exception_message, bool reduce_retry_count, bool overwrite_status)
{ {
LOG_TRACE(log, "Setting file {} as failed (exception: {}, path: {})", path, exception, failed_node_path); LOG_TRACE(log, "Setting file {} as failed (path: {}, reduce retry count: {}, exception: {})",
path, failed_node_path, reduce_retry_count, exception_message);
ProfileEvents::increment(ProfileEvents::S3QueueFailedFiles); ProfileEvents::increment(ProfileEvents::S3QueueFailedFiles);
file_status->onFailed(exception); if (overwrite_status || file_status->state != FileStatus::State::Failed)
node_metadata.last_exception = exception; file_status->onFailed(exception_message);
if (max_loading_retries == 0) node_metadata.last_exception = exception_message;
setFailedNonRetriable();
else if (reduce_retry_count)
setFailedRetriable(); {
try
{
if (max_loading_retries == 0)
setFailedNonRetriable();
else
setFailedRetriable();
}
catch (...)
{
auto full_exception = fmt::format(
"First exception: {}, exception while setting file as failed: {}",
exception_message, getCurrentExceptionMessage(true));
file_status->onFailed(full_exception);
throw;
}
}
processing_id.reset(); processing_id.reset();
processing_id_version.reset(); processing_id_version.reset();
@ -296,19 +337,20 @@ void S3QueueIFileMetadata::setFailedRetriable()
auto zk_client = getZooKeeper(); auto zk_client = getZooKeeper();
/// Extract the number of already done retries from node_hash.retriable node if it exists. /// Extract the number of already done retries from node_hash.retriable node if it exists.
Coordination::Requests requests;
Coordination::Stat stat; Coordination::Stat stat;
std::string res; std::string res;
if (zk_client->tryGet(retrieable_failed_node_path, res, &stat)) bool has_failed_before = zk_client->tryGet(retrieable_failed_node_path, res, &stat);
if (has_failed_before)
{ {
auto failed_node_metadata = NodeMetadata::fromString(res); auto failed_node_metadata = NodeMetadata::fromString(res);
node_metadata.retries = failed_node_metadata.retries + 1; node_metadata.retries = failed_node_metadata.retries + 1;
file_status->retries = node_metadata.retries; file_status->retries = node_metadata.retries;
} }
LOG_TRACE(log, "File `{}` failed to process, try {}/{}", LOG_TRACE(log, "File `{}` failed to process, try {}/{}, retries node exists: {} (failed node path: {})",
path, node_metadata.retries, max_loading_retries); path, node_metadata.retries, max_loading_retries, has_failed_before, failed_node_path);
Coordination::Requests requests;
if (node_metadata.retries >= max_loading_retries) if (node_metadata.retries >= max_loading_retries)
{ {
/// File is no longer retriable. /// File is no longer retriable.
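Editor's note: the retriable path above keeps the retry counter next to the failed node in Keeper, reading the previous count, incrementing it, and giving up only once max_loading_retries is reached. A rough standalone sketch of that bookkeeping, with an in-memory map standing in for the ZooKeeper node; the function and map names are invented for illustration.

```cpp
#include <cstdint>
#include <iostream>
#include <string>
#include <unordered_map>

// In-memory stand-in for the "<failed_node_path>.retriable" node in Keeper.
std::unordered_map<std::string, uint64_t> retriable_nodes;

// Returns true if the file is still retriable after recording this failure.
bool recordRetriableFailure(const std::string & failed_node_path, uint64_t max_loading_retries)
{
    const std::string retriable_path = failed_node_path + ".retriable";

    uint64_t retries = 0;
    if (auto it = retriable_nodes.find(retriable_path); it != retriable_nodes.end())
        retries = it->second + 1;   // previous attempts already recorded

    if (retries >= max_loading_retries)
    {
        // No longer retriable: drop the counter and keep a permanent failed record instead.
        retriable_nodes.erase(retriable_path);
        return false;
    }

    retriable_nodes[retriable_path] = retries;
    return true;
}

int main()
{
    for (int attempt = 0; attempt < 4; ++attempt)
        std::cout << "attempt " << attempt << " retriable: "
                  << recordRetriableFailure("/s3queue/failed/file1", /* max_loading_retries */ 3) << '\n';
}
```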

View File

@ -19,6 +19,7 @@ public:
None None
}; };
void setProcessingEndTime();
void onProcessing(); void onProcessing();
void onProcessed(); void onProcessed();
void onFailed(const std::string & exception); void onFailed(const std::string & exception);
@ -54,13 +55,15 @@ public:
bool setProcessing(); bool setProcessing();
void setProcessed(); void setProcessed();
void setFailed(const std::string & exception); void setFailed(const std::string & exception_message, bool reduce_retry_count, bool overwrite_status);
virtual void setProcessedAtStartRequests( virtual void setProcessedAtStartRequests(
Coordination::Requests & requests, Coordination::Requests & requests,
const zkutil::ZooKeeperPtr & zk_client) = 0; const zkutil::ZooKeeperPtr & zk_client) = 0;
FileStatusPtr getFileStatus() { return file_status; } FileStatusPtr getFileStatus() { return file_status; }
const std::string & getPath() const { return path; }
size_t getMaxTries() const { return max_loading_retries; }
struct NodeMetadata struct NodeMetadata
{ {

View File

@ -133,6 +133,9 @@ S3QueueMetadata::S3QueueMetadata(const fs::path & zookeeper_path_, const S3Queue
generateRescheduleInterval( generateRescheduleInterval(
settings.s3queue_cleanup_interval_min_ms, settings.s3queue_cleanup_interval_max_ms)); settings.s3queue_cleanup_interval_min_ms, settings.s3queue_cleanup_interval_max_ms));
} }
LOG_TRACE(log, "Mode: {}, buckets: {}, processing threads: {}, result buckets num: {}",
settings.mode.toString(), settings.s3queue_buckets, settings.s3queue_processing_threads_num, buckets_num);
} }
S3QueueMetadata::~S3QueueMetadata() S3QueueMetadata::~S3QueueMetadata()
@ -219,7 +222,7 @@ S3QueueMetadata::Bucket S3QueueMetadata::getBucketForPath(const std::string & pa
S3QueueOrderedFileMetadata::BucketHolderPtr S3QueueOrderedFileMetadata::BucketHolderPtr
S3QueueMetadata::tryAcquireBucket(const Bucket & bucket, const Processor & processor) S3QueueMetadata::tryAcquireBucket(const Bucket & bucket, const Processor & processor)
{ {
return S3QueueOrderedFileMetadata::tryAcquireBucket(zookeeper_path, bucket, processor); return S3QueueOrderedFileMetadata::tryAcquireBucket(zookeeper_path, bucket, processor, log);
} }
void S3QueueMetadata::initialize( void S3QueueMetadata::initialize(

View File

@ -45,13 +45,15 @@ S3QueueOrderedFileMetadata::BucketHolder::BucketHolder(
int bucket_version_, int bucket_version_,
const std::string & bucket_lock_path_, const std::string & bucket_lock_path_,
const std::string & bucket_lock_id_path_, const std::string & bucket_lock_id_path_,
zkutil::ZooKeeperPtr zk_client_) zkutil::ZooKeeperPtr zk_client_,
LoggerPtr log_)
: bucket_info(std::make_shared<BucketInfo>(BucketInfo{ : bucket_info(std::make_shared<BucketInfo>(BucketInfo{
.bucket = bucket_, .bucket = bucket_,
.bucket_version = bucket_version_, .bucket_version = bucket_version_,
.bucket_lock_path = bucket_lock_path_, .bucket_lock_path = bucket_lock_path_,
.bucket_lock_id_path = bucket_lock_id_path_})) .bucket_lock_id_path = bucket_lock_id_path_}))
, zk_client(zk_client_) , zk_client(zk_client_)
, log(log_)
{ {
} }
@ -61,7 +63,9 @@ void S3QueueOrderedFileMetadata::BucketHolder::release()
return; return;
released = true; released = true;
LOG_TEST(getLogger("S3QueueBucketHolder"), "Releasing bucket {}", bucket_info->bucket);
LOG_TEST(log, "Releasing bucket {}, version {}",
bucket_info->bucket, bucket_info->bucket_version);
Coordination::Requests requests; Coordination::Requests requests;
/// Check that bucket lock version has not changed /// Check that bucket lock version has not changed
@ -72,11 +76,24 @@ void S3QueueOrderedFileMetadata::BucketHolder::release()
Coordination::Responses responses; Coordination::Responses responses;
const auto code = zk_client->tryMulti(requests, responses); const auto code = zk_client->tryMulti(requests, responses);
if (code == Coordination::Error::ZOK)
LOG_TEST(log, "Released bucket {}, version {}",
bucket_info->bucket, bucket_info->bucket_version);
else
LOG_TRACE(log,
"Failed to release bucket {}, version {}: {}. "
"This is normal if keeper session expired.",
bucket_info->bucket, bucket_info->bucket_version, code);
zkutil::KeeperMultiException::check(code, requests, responses); zkutil::KeeperMultiException::check(code, requests, responses);
} }
S3QueueOrderedFileMetadata::BucketHolder::~BucketHolder() S3QueueOrderedFileMetadata::BucketHolder::~BucketHolder()
{ {
if (!released)
LOG_TEST(log, "Releasing bucket ({}) holder in destructor", bucket_info->bucket);
try try
{ {
release(); release();
@ -154,7 +171,8 @@ S3QueueOrderedFileMetadata::Bucket S3QueueOrderedFileMetadata::getBucketForPath(
S3QueueOrderedFileMetadata::BucketHolderPtr S3QueueOrderedFileMetadata::tryAcquireBucket( S3QueueOrderedFileMetadata::BucketHolderPtr S3QueueOrderedFileMetadata::tryAcquireBucket(
const std::filesystem::path & zk_path, const std::filesystem::path & zk_path,
const Bucket & bucket, const Bucket & bucket,
const Processor & processor) const Processor & processor,
LoggerPtr log_)
{ {
const auto zk_client = getZooKeeper(); const auto zk_client = getZooKeeper();
const auto bucket_lock_path = zk_path / "buckets" / toString(bucket) / "lock"; const auto bucket_lock_path = zk_path / "buckets" / toString(bucket) / "lock";
@ -183,7 +201,7 @@ S3QueueOrderedFileMetadata::BucketHolderPtr S3QueueOrderedFileMetadata::tryAcqui
const auto bucket_lock_version = set_response->stat.version; const auto bucket_lock_version = set_response->stat.version;
LOG_TEST( LOG_TEST(
getLogger("S3QueueOrderedFileMetadata"), log_,
"Processor {} acquired bucket {} for processing (bucket lock version: {})", "Processor {} acquired bucket {} for processing (bucket lock version: {})",
processor, bucket, bucket_lock_version); processor, bucket, bucket_lock_version);
@ -192,7 +210,8 @@ S3QueueOrderedFileMetadata::BucketHolderPtr S3QueueOrderedFileMetadata::tryAcqui
bucket_lock_version, bucket_lock_version,
bucket_lock_path, bucket_lock_path,
bucket_lock_id_path, bucket_lock_id_path,
zk_client); zk_client,
log_);
} }
if (code == Coordination::Error::ZNODEEXISTS) if (code == Coordination::Error::ZNODEEXISTS)
@ -384,8 +403,11 @@ void S3QueueOrderedFileMetadata::setProcessedImpl()
auto code = zk_client->tryMulti(requests, responses); auto code = zk_client->tryMulti(requests, responses);
if (code == Coordination::Error::ZOK) if (code == Coordination::Error::ZOK)
{ {
if (max_loading_retries) if (max_loading_retries
zk_client->tryRemove(failed_node_path + ".retriable", -1); && zk_client->tryRemove(failed_node_path + ".retriable", -1) == Coordination::Error::ZOK)
{
LOG_TEST(log, "Removed node {}.retriable", failed_node_path);
}
return; return;
} }
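Editor's note: the BucketHolder changes above make the holder explicitly releasable, track whether its bucket is finished, and keep the destructor only as a safety net that never throws. A simplified RAII sketch of that pattern, with a callable standing in for the Keeper unlock; class and member names here are illustrative.

```cpp
#include <functional>
#include <iostream>
#include <string>
#include <utility>

// Simplified RAII holder for an exclusively acquired bucket.
class BucketHolderSketch
{
public:
    BucketHolderSketch(std::string bucket_, std::function<void()> unlock_)
        : bucket(std::move(bucket_)), unlock(std::move(unlock_)) {}

    // Mark the bucket as fully listed; the caller releases it after committing files.
    void setFinished() { finished = true; }
    bool isFinished() const { return finished; }

    // Explicit release so exceptions from the unlock can be observed and rethrown.
    void release()
    {
        if (released)
            return;
        released = true;
        std::cout << "Releasing bucket " << bucket << '\n';
        unlock();
    }

    // Destructor is only a fallback: never let an exception escape from it.
    ~BucketHolderSketch()
    {
        try
        {
            release();
        }
        catch (...)
        {
            std::cout << "Failed to release bucket " << bucket << " in destructor\n";
        }
    }

private:
    std::string bucket;
    std::function<void()> unlock;
    bool released = false;
    bool finished = false;
};

int main()
{
    BucketHolderSketch holder("bucket-0", [] { std::cout << "lock node removed\n"; });
    holder.setFinished();
    holder.release();   // explicit release after commit; the destructor becomes a no-op
}
```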

View File

@ -36,7 +36,8 @@ public:
static BucketHolderPtr tryAcquireBucket( static BucketHolderPtr tryAcquireBucket(
const std::filesystem::path & zk_path, const std::filesystem::path & zk_path,
const Bucket & bucket, const Bucket & bucket,
const Processor & processor); const Processor & processor,
LoggerPtr log_);
static S3QueueOrderedFileMetadata::Bucket getBucketForPath(const std::string & path, size_t buckets_num); static S3QueueOrderedFileMetadata::Bucket getBucketForPath(const std::string & path, size_t buckets_num);
@ -72,26 +73,32 @@ private:
bool ignore_if_exists); bool ignore_if_exists);
}; };
struct S3QueueOrderedFileMetadata::BucketHolder struct S3QueueOrderedFileMetadata::BucketHolder : private boost::noncopyable
{ {
BucketHolder( BucketHolder(
const Bucket & bucket_, const Bucket & bucket_,
int bucket_version_, int bucket_version_,
const std::string & bucket_lock_path_, const std::string & bucket_lock_path_,
const std::string & bucket_lock_id_path_, const std::string & bucket_lock_id_path_,
zkutil::ZooKeeperPtr zk_client_); zkutil::ZooKeeperPtr zk_client_,
LoggerPtr log_);
~BucketHolder(); ~BucketHolder();
Bucket getBucket() const { return bucket_info->bucket; } Bucket getBucket() const { return bucket_info->bucket; }
BucketInfoPtr getBucketInfo() const { return bucket_info; } BucketInfoPtr getBucketInfo() const { return bucket_info; }
void setFinished() { finished = true; }
bool isFinished() const { return finished; }
void release(); void release();
private: private:
BucketInfoPtr bucket_info; BucketInfoPtr bucket_info;
const zkutil::ZooKeeperPtr zk_client; const zkutil::ZooKeeperPtr zk_client;
bool released = false; bool released = false;
bool finished = false;
LoggerPtr log;
}; };
} }

View File

@ -19,7 +19,7 @@ class ASTStorage;
0) \ 0) \
M(S3QueueAction, after_processing, S3QueueAction::KEEP, "Delete or keep file in S3 after successful processing", 0) \ M(S3QueueAction, after_processing, S3QueueAction::KEEP, "Delete or keep file in S3 after successful processing", 0) \
M(String, keeper_path, "", "Zookeeper node path", 0) \ M(String, keeper_path, "", "Zookeeper node path", 0) \
M(UInt32, s3queue_loading_retries, 0, "Retry loading up to specified number of times", 0) \ M(UInt32, s3queue_loading_retries, 10, "Retry loading up to specified number of times", 0) \
M(UInt32, s3queue_processing_threads_num, 1, "Number of processing threads", 0) \ M(UInt32, s3queue_processing_threads_num, 1, "Number of processing threads", 0) \
M(UInt32, s3queue_enable_logging_to_s3queue_log, 1, "Enable logging to system table system.s3queue_log", 0) \ M(UInt32, s3queue_enable_logging_to_s3queue_log, 1, "Enable logging to system table system.s3queue_log", 0) \
M(String, s3queue_last_processed_path, "", "For Ordered mode. Files that have lexicographically smaller file name are considered already processed", 0) \ M(String, s3queue_last_processed_path, "", "For Ordered mode. Files that have lexicographically smaller file name are considered already processed", 0) \
@ -31,6 +31,10 @@ class ASTStorage;
M(UInt32, s3queue_cleanup_interval_min_ms, 60000, "For unordered mode. Polling backoff min for cleanup", 0) \ M(UInt32, s3queue_cleanup_interval_min_ms, 60000, "For unordered mode. Polling backoff min for cleanup", 0) \
M(UInt32, s3queue_cleanup_interval_max_ms, 60000, "For unordered mode. Polling backoff max for cleanup", 0) \ M(UInt32, s3queue_cleanup_interval_max_ms, 60000, "For unordered mode. Polling backoff max for cleanup", 0) \
M(UInt32, s3queue_buckets, 0, "Number of buckets for Ordered mode parallel processing", 0) \ M(UInt32, s3queue_buckets, 0, "Number of buckets for Ordered mode parallel processing", 0) \
M(UInt32, s3queue_max_processed_files_before_commit, 100, "Number of files which can be processed before being committed to keeper", 0) \
M(UInt32, s3queue_max_processed_rows_before_commit, 0, "Number of rows which can be processed before being committed to keeper", 0) \
M(UInt32, s3queue_max_processed_bytes_before_commit, 0, "Number of bytes which can be processed before being committed to keeper", 0) \
M(UInt32, s3queue_max_processing_time_sec_before_commit, 0, "Timeout in seconds after which processed files are committed to keeper", 0) \
#define LIST_OF_S3QUEUE_SETTINGS(M, ALIAS) \ #define LIST_OF_S3QUEUE_SETTINGS(M, ALIAS) \
S3QUEUE_RELATED_SETTINGS(M, ALIAS) \ S3QUEUE_RELATED_SETTINGS(M, ALIAS) \
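Editor's note: the four new `*_before_commit` settings above bound how much work a source accumulates before committing to Keeper: a file count, optional row and byte counts, and an optional wall-clock timeout, where 0 means unlimited. A hedged sketch of how such thresholds can be combined into a single predicate; it uses >= comparisons for robustness, whereas the diff checks the row and byte counters for exact equality, and all names below are invented for illustration.

```cpp
#include <cstddef>
#include <iostream>

struct CommitLimits
{
    size_t max_files = 100;   // s3queue_max_processed_files_before_commit
    size_t max_rows = 0;      // s3queue_max_processed_rows_before_commit, 0 = unlimited
    size_t max_bytes = 0;     // s3queue_max_processed_bytes_before_commit, 0 = unlimited
    size_t max_seconds = 0;   // s3queue_max_processing_time_sec_before_commit, 0 = unlimited
};

struct Progress
{
    size_t files = 0;
    size_t rows = 0;
    size_t bytes = 0;
    size_t elapsed_seconds = 0;
};

// True once any configured limit has been reached.
bool shouldCommit(const CommitLimits & limits, const Progress & progress)
{
    if (limits.max_files && progress.files >= limits.max_files)
        return true;
    if (limits.max_rows && progress.rows >= limits.max_rows)
        return true;
    if (limits.max_bytes && progress.bytes >= limits.max_bytes)
        return true;
    if (limits.max_seconds && progress.elapsed_seconds >= limits.max_seconds)
        return true;
    return false;
}

int main()
{
    CommitLimits limits;
    limits.max_rows = 1'000'000;
    std::cout << shouldCommit(limits, {.files = 10, .rows = 999'999}) << '\n';   // 0
    std::cout << shouldCommit(limits, {.files = 10, .rows = 1'000'000}) << '\n'; // 1
}
```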

View File

@ -32,16 +32,16 @@ namespace ErrorCodes
} }
StorageS3QueueSource::S3QueueObjectInfo::S3QueueObjectInfo( StorageS3QueueSource::S3QueueObjectInfo::S3QueueObjectInfo(
const ObjectInfo & object_info, const Source::ObjectInfo & object_info,
Metadata::FileMetadataPtr processing_holder_) S3QueueMetadata::FileMetadataPtr file_metadata_)
: ObjectInfo(object_info.relative_path, object_info.metadata) : Source::ObjectInfo(object_info.relative_path, object_info.metadata)
, processing_holder(processing_holder_) , file_metadata(file_metadata_)
{ {
} }
StorageS3QueueSource::FileIterator::FileIterator( StorageS3QueueSource::FileIterator::FileIterator(
std::shared_ptr<S3QueueMetadata> metadata_, std::shared_ptr<S3QueueMetadata> metadata_,
std::unique_ptr<GlobIterator> glob_iterator_, std::unique_ptr<Source::GlobIterator> glob_iterator_,
std::atomic<bool> & shutdown_called_, std::atomic<bool> & shutdown_called_,
LoggerPtr logger_) LoggerPtr logger_)
: StorageObjectStorageSource::IIterator("S3QueueIterator") : StorageObjectStorageSource::IIterator("S3QueueIterator")
@ -52,25 +52,52 @@ StorageS3QueueSource::FileIterator::FileIterator(
{ {
} }
bool StorageS3QueueSource::FileIterator::isFinished() const
{
LOG_TEST(log, "Iterator finished: {}, objects to retry: {}", iterator_finished, objects_to_retry.size());
return iterator_finished
&& std::all_of(listed_keys_cache.begin(), listed_keys_cache.end(), [](const auto & v) { return v.second.keys.empty(); })
&& objects_to_retry.empty();
}
size_t StorageS3QueueSource::FileIterator::estimatedKeysCount() size_t StorageS3QueueSource::FileIterator::estimatedKeysCount()
{ {
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method estimateKeysCount is not implemented"); throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method estimateKeysCount is not implemented");
} }
StorageS3QueueSource::ObjectInfoPtr StorageS3QueueSource::FileIterator::nextImpl(size_t processor) StorageS3QueueSource::Source::ObjectInfoPtr StorageS3QueueSource::FileIterator::nextImpl(size_t processor)
{ {
ObjectInfoPtr object_info; Source::ObjectInfoPtr object_info;
S3QueueOrderedFileMetadata::BucketInfoPtr bucket_info; S3QueueOrderedFileMetadata::BucketInfoPtr bucket_info;
while (!shutdown_called) while (!shutdown_called)
{ {
if (metadata->useBucketsForProcessing()) if (metadata->useBucketsForProcessing())
{
std::lock_guard lock(mutex);
std::tie(object_info, bucket_info) = getNextKeyFromAcquiredBucket(processor); std::tie(object_info, bucket_info) = getNextKeyFromAcquiredBucket(processor);
}
else else
object_info = glob_iterator->next(processor); {
std::lock_guard lock(mutex);
if (objects_to_retry.empty())
{
object_info = glob_iterator->next(processor);
if (!object_info)
iterator_finished = true;
}
else
{
object_info = objects_to_retry.front();
objects_to_retry.pop_front();
}
}
if (!object_info) if (!object_info)
{
LOG_TEST(log, "No object left");
return {}; return {};
}
if (shutdown_called) if (shutdown_called)
{ {
@ -85,19 +112,64 @@ StorageS3QueueSource::ObjectInfoPtr StorageS3QueueSource::FileIterator::nextImpl
return {}; return {};
} }
std::pair<StorageS3QueueSource::ObjectInfoPtr, S3QueueOrderedFileMetadata::BucketInfoPtr> void StorageS3QueueSource::FileIterator::returnForRetry(Source::ObjectInfoPtr object_info)
{
chassert(object_info);
if (metadata->useBucketsForProcessing())
{
const auto bucket = metadata->getBucketForPath(object_info->relative_path);
listed_keys_cache[bucket].keys.emplace_front(object_info);
}
else
{
objects_to_retry.push_back(object_info);
}
}
void StorageS3QueueSource::FileIterator::releaseFinishedBuckets()
{
for (const auto & [processor, holders] : bucket_holders)
{
LOG_TEST(log, "Releasing {} bucket holders for processor {}", holders.size(), processor);
for (auto it = holders.begin(); it != holders.end(); ++it)
{
const auto & holder = *it;
const auto bucket = holder->getBucketInfo()->bucket;
if (!holder->isFinished())
{
/// Only the last holder in the list of holders can be non-finished.
chassert(std::next(it) == holders.end());
/// Do not release non-finished bucket holder. We will continue processing it.
LOG_TEST(log, "Bucket {} is not finished yet, will not release it", bucket);
break;
}
/// Release bucket lock.
holder->release();
/// Reset bucket processor in cached state.
auto cached_info = listed_keys_cache.find(bucket);
if (cached_info != listed_keys_cache.end())
cached_info->second.processor.reset();
}
}
}
std::pair<StorageS3QueueSource::Source::ObjectInfoPtr, S3QueueOrderedFileMetadata::BucketInfoPtr>
StorageS3QueueSource::FileIterator::getNextKeyFromAcquiredBucket(size_t processor) StorageS3QueueSource::FileIterator::getNextKeyFromAcquiredBucket(size_t processor)
{ {
/// We need this lock to maintain consistency between listing s3 directory auto bucket_holder_it = bucket_holders.emplace(processor, std::vector<BucketHolderPtr>{}).first;
/// and getting/putting result into listed_keys_cache. BucketHolder * current_bucket_holder = bucket_holder_it->second.empty() || bucket_holder_it->second.back()->isFinished()
std::lock_guard lock(buckets_mutex); ? nullptr
: bucket_holder_it->second.back().get();
auto bucket_holder_it = bucket_holders.emplace(processor, nullptr).first;
auto current_processor = toString(processor); auto current_processor = toString(processor);
LOG_TEST( LOG_TEST(
log, "Current processor: {}, acquired bucket: {}", log, "Current processor: {}, acquired bucket: {}",
processor, bucket_holder_it->second ? toString(bucket_holder_it->second->getBucket()) : "None"); processor, current_bucket_holder ? toString(current_bucket_holder->getBucket()) : "None");
while (true) while (true)
{ {
@ -106,9 +178,9 @@ StorageS3QueueSource::FileIterator::getNextKeyFromAcquiredBucket(size_t processo
/// In case it is already acquired, they put the key into listed_keys_cache, /// In case it is already acquired, they put the key into listed_keys_cache,
/// so that the thread who acquired the bucket will be able to see /// so that the thread who acquired the bucket will be able to see
/// those keys without the need to list s3 directory once again. /// those keys without the need to list s3 directory once again.
if (bucket_holder_it->second) if (current_bucket_holder)
{ {
const auto bucket = bucket_holder_it->second->getBucket(); const auto bucket = current_bucket_holder->getBucket();
auto it = listed_keys_cache.find(bucket); auto it = listed_keys_cache.find(bucket);
if (it != listed_keys_cache.end()) if (it != listed_keys_cache.end())
{ {
@ -141,7 +213,7 @@ StorageS3QueueSource::FileIterator::getNextKeyFromAcquiredBucket(size_t processo
LOG_TEST(log, "Current bucket: {}, will process file: {}", LOG_TEST(log, "Current bucket: {}, will process file: {}",
bucket, object_info->getFileName()); bucket, object_info->getFileName());
return std::pair{object_info, bucket_holder_it->second->getBucketInfo()}; return std::pair{object_info, current_bucket_holder->getBucketInfo()};
} }
LOG_TEST(log, "Cache of bucket {} is empty", bucket); LOG_TEST(log, "Cache of bucket {} is empty", bucket);
@ -156,9 +228,9 @@ StorageS3QueueSource::FileIterator::getNextKeyFromAcquiredBucket(size_t processo
if (iterator_finished) if (iterator_finished)
{ {
/// Bucket is fully processed - release the bucket. /// Bucket is fully processed, but we will release it later
bucket_holder_it->second->release(); /// - once we write and commit files via commit() method.
bucket_holder_it->second.reset(); current_bucket_holder->setFinished();
} }
} }
/// If processing thread has already acquired some bucket /// If processing thread has already acquired some bucket
@ -167,8 +239,10 @@ StorageS3QueueSource::FileIterator::getNextKeyFromAcquiredBucket(size_t processo
/// because one processing thread can acquire only one bucket at a time. /// because one processing thread can acquire only one bucket at a time.
/// Once a thread is finished with its acquired bucket, it checks listed_keys_cache /// Once a thread is finished with its acquired bucket, it checks listed_keys_cache
/// to see if there are keys from buckets not acquired by anyone. /// to see if there are keys from buckets not acquired by anyone.
if (!bucket_holder_it->second) if (!current_bucket_holder)
{ {
LOG_TEST(log, "Checking caches keys: {}", listed_keys_cache.size());
for (auto it = listed_keys_cache.begin(); it != listed_keys_cache.end();) for (auto it = listed_keys_cache.begin(); it != listed_keys_cache.end();)
{ {
auto & [bucket, bucket_info] = *it; auto & [bucket, bucket_info] = *it;
@ -193,8 +267,8 @@ StorageS3QueueSource::FileIterator::getNextKeyFromAcquiredBucket(size_t processo
continue; continue;
} }
bucket_holder_it->second = metadata->tryAcquireBucket(bucket, current_processor); auto acquired_bucket = metadata->tryAcquireBucket(bucket, current_processor);
if (!bucket_holder_it->second) if (!acquired_bucket)
{ {
LOG_TEST(log, "Bucket {} is already locked for processing (keys: {})", LOG_TEST(log, "Bucket {} is already locked for processing (keys: {})",
bucket, bucket_keys.size()); bucket, bucket_keys.size());
@ -202,6 +276,9 @@ StorageS3QueueSource::FileIterator::getNextKeyFromAcquiredBucket(size_t processo
continue; continue;
} }
bucket_holder_it->second.push_back(acquired_bucket);
current_bucket_holder = bucket_holder_it->second.back().get();
bucket_processor = current_processor; bucket_processor = current_processor;
/// Take the key from the front, the order is important. /// Take the key from the front, the order is important.
@ -211,7 +288,7 @@ StorageS3QueueSource::FileIterator::getNextKeyFromAcquiredBucket(size_t processo
LOG_TEST(log, "Acquired bucket: {}, will process file: {}", LOG_TEST(log, "Acquired bucket: {}, will process file: {}",
bucket, object_info->getFileName()); bucket, object_info->getFileName());
return std::pair{object_info, bucket_holder_it->second->getBucketInfo()}; return std::pair{object_info, current_bucket_holder->getBucketInfo()};
} }
} }
@ -229,12 +306,12 @@ StorageS3QueueSource::FileIterator::getNextKeyFromAcquiredBucket(size_t processo
LOG_TEST(log, "Found next file: {}, bucket: {}, current bucket: {}, cached_keys: {}", LOG_TEST(log, "Found next file: {}, bucket: {}, current bucket: {}, cached_keys: {}",
object_info->getFileName(), bucket, object_info->getFileName(), bucket,
bucket_holder_it->second ? toString(bucket_holder_it->second->getBucket()) : "None", current_bucket_holder ? toString(current_bucket_holder->getBucket()) : "None",
bucket_cache.keys.size()); bucket_cache.keys.size());
if (bucket_holder_it->second) if (current_bucket_holder)
{ {
if (bucket_holder_it->second->getBucket() != bucket) if (current_bucket_holder->getBucket() != bucket)
{ {
/// Acquired bucket differs from object's bucket, /// Acquired bucket differs from object's bucket,
/// put it into bucket's cache and continue. /// put it into bucket's cache and continue.
@ -242,13 +319,16 @@ StorageS3QueueSource::FileIterator::getNextKeyFromAcquiredBucket(size_t processo
continue; continue;
} }
/// Bucket is already acquired, process the file. /// Bucket is already acquired, process the file.
return std::pair{object_info, bucket_holder_it->second->getBucketInfo()}; return std::pair{object_info, current_bucket_holder->getBucketInfo()};
} }
else else
{ {
bucket_holder_it->second = metadata->tryAcquireBucket(bucket, current_processor); auto acquired_bucket = metadata->tryAcquireBucket(bucket, current_processor);
if (bucket_holder_it->second) if (acquired_bucket)
{ {
bucket_holder_it->second.push_back(acquired_bucket);
current_bucket_holder = bucket_holder_it->second.back().get();
bucket_cache.processor = current_processor; bucket_cache.processor = current_processor;
if (!bucket_cache.keys.empty()) if (!bucket_cache.keys.empty())
{ {
@ -258,7 +338,7 @@ StorageS3QueueSource::FileIterator::getNextKeyFromAcquiredBucket(size_t processo
object_info = bucket_cache.keys.front(); object_info = bucket_cache.keys.front();
bucket_cache.keys.pop_front(); bucket_cache.keys.pop_front();
} }
return std::pair{object_info, bucket_holder_it->second->getBucketInfo()}; return std::pair{object_info, current_bucket_holder->getBucketInfo()};
} }
else else
{ {
@ -270,12 +350,6 @@ StorageS3QueueSource::FileIterator::getNextKeyFromAcquiredBucket(size_t processo
} }
else else
{ {
if (bucket_holder_it->second)
{
bucket_holder_it->second->release();
bucket_holder_it->second.reset();
}
LOG_TEST(log, "Reached the end of file iterator"); LOG_TEST(log, "Reached the end of file iterator");
iterator_finished = true; iterator_finished = true;
@ -301,7 +375,12 @@ StorageS3QueueSource::StorageS3QueueSource(
const std::atomic<bool> & table_is_being_dropped_, const std::atomic<bool> & table_is_being_dropped_,
std::shared_ptr<S3QueueLog> s3_queue_log_, std::shared_ptr<S3QueueLog> s3_queue_log_,
const StorageID & storage_id_, const StorageID & storage_id_,
LoggerPtr log_) LoggerPtr log_,
size_t max_processed_files_before_commit_,
size_t max_processed_rows_before_commit_,
size_t max_processed_bytes_before_commit_,
size_t max_processing_time_sec_before_commit_,
bool commit_once_processed_)
: ISource(header_) : ISource(header_)
, WithContext(context_) , WithContext(context_)
, name(std::move(name_)) , name(std::move(name_))
@ -314,6 +393,11 @@ StorageS3QueueSource::StorageS3QueueSource(
, table_is_being_dropped(table_is_being_dropped_) , table_is_being_dropped(table_is_being_dropped_)
, s3_queue_log(s3_queue_log_) , s3_queue_log(s3_queue_log_)
, storage_id(storage_id_) , storage_id(storage_id_)
, max_processed_files_before_commit(max_processed_files_before_commit_)
, max_processed_rows_before_commit(max_processed_rows_before_commit_)
, max_processed_bytes_before_commit(max_processed_bytes_before_commit_)
, max_processing_time_sec_before_commit(max_processing_time_sec_before_commit_)
, commit_once_processed(commit_once_processed_)
, remove_file_func(remove_file_func_) , remove_file_func(remove_file_func_)
, log(log_) , log(log_)
{ {
@ -329,24 +413,52 @@ void StorageS3QueueSource::lazyInitialize(size_t processor)
if (initialized) if (initialized)
return; return;
LOG_TEST(log, "Initializing a new reader");
internal_source->lazyInitialize(processor); internal_source->lazyInitialize(processor);
reader = std::move(internal_source->reader); reader = std::move(internal_source->reader);
if (reader) if (reader)
reader_future = std::move(internal_source->reader_future); reader_future = std::move(internal_source->reader_future);
initialized = true; initialized = true;
} }
Chunk StorageS3QueueSource::generate() Chunk StorageS3QueueSource::generate()
{
Chunk chunk;
try
{
chunk = generateImpl();
}
catch (...)
{
if (commit_once_processed)
commit(false, getCurrentExceptionMessage(true));
throw;
}
if (!chunk && commit_once_processed)
{
commit(true);
}
return chunk;
}
Chunk StorageS3QueueSource::generateImpl()
{ {
lazyInitialize(processor_id); lazyInitialize(processor_id);
while (true) while (true)
{ {
if (!reader) if (!reader)
{
LOG_TEST(log, "No reader");
break; break;
}
const auto * object_info = dynamic_cast<const S3QueueObjectInfo *>(&reader.getObjectInfo()); const auto * object_info = dynamic_cast<const S3QueueObjectInfo *>(reader.getObjectInfo().get());
auto file_metadata = object_info->processing_holder; auto file_metadata = object_info->file_metadata;
auto file_status = file_metadata->getFileStatus(); auto file_status = file_metadata->getFileStatus();
if (isCancelled()) if (isCancelled())
@ -357,7 +469,7 @@ Chunk StorageS3QueueSource::generate()
{ {
try try
{ {
file_metadata->setFailed("Cancelled"); file_metadata->setFailed("Cancelled", /* reduce_retry_count */true, /* overwrite_status */false);
} }
catch (...) catch (...)
{ {
@ -365,16 +477,19 @@ Chunk StorageS3QueueSource::generate()
object_info->relative_path, getCurrentExceptionMessage(true)); object_info->relative_path, getCurrentExceptionMessage(true));
} }
appendLogElement(reader.getObjectInfo().getPath(), *file_status, processed_rows_from_file, false); appendLogElement(reader.getObjectInfo()->getPath(), *file_status, processed_rows_from_file, false);
} }
LOG_TEST(log, "Query is cancelled");
break; break;
} }
const auto & path = reader.getObjectInfo().getPath(); const auto & path = reader.getObjectInfo()->getPath();
if (shutdown_called) if (shutdown_called)
{ {
LOG_TEST(log, "Shutdown called");
if (processed_rows_from_file == 0) if (processed_rows_from_file == 0)
break; break;
@ -386,7 +501,7 @@ Chunk StorageS3QueueSource::generate()
try try
{ {
file_metadata->setFailed("Table is dropped"); file_metadata->setFailed("Table is dropped", /* reduce_retry_count */true, /* overwrite_status */false);
} }
catch (...) catch (...)
{ {
@ -420,15 +535,16 @@ Chunk StorageS3QueueSource::generate()
file_status->processed_rows += chunk.getNumRows(); file_status->processed_rows += chunk.getNumRows();
processed_rows_from_file += chunk.getNumRows(); processed_rows_from_file += chunk.getNumRows();
total_processed_rows += chunk.getNumRows();
total_processed_bytes += chunk.bytes();
VirtualColumnUtils::addRequestedFileLikeStorageVirtualsToChunk( VirtualColumnUtils::addRequestedFileLikeStorageVirtualsToChunk(
chunk, requested_virtual_columns, chunk, requested_virtual_columns,
{ {
.path = path, .path = path,
.size = reader.getObjectInfo().metadata->size_bytes .size = reader.getObjectInfo()->metadata->size_bytes
}); });
return chunk; return chunk;
} }
} }
@ -437,22 +553,84 @@ Chunk StorageS3QueueSource::generate()
const auto message = getCurrentExceptionMessage(true); const auto message = getCurrentExceptionMessage(true);
LOG_ERROR(log, "Got an error while pulling chunk. Will set file {} as failed. Error: {} ", path, message); LOG_ERROR(log, "Got an error while pulling chunk. Will set file {} as failed. Error: {} ", path, message);
file_metadata->setFailed(message); failed_during_read_files.push_back(file_metadata);
file_status->onFailed(getCurrentExceptionMessage(true));
appendLogElement(path, *file_status, processed_rows_from_file, false); appendLogElement(path, *file_status, processed_rows_from_file, false);
if (processed_rows_from_file == 0)
{
auto * file_iterator = dynamic_cast<FileIterator *>(internal_source->file_iterator.get());
chassert(file_iterator);
if (file_status->retries < file_metadata->getMaxTries())
file_iterator->returnForRetry(reader.getObjectInfo());
/// If we did not process any rows from the failed file,
/// commit all previously processed files,
/// not to lose the work already done.
return {};
}
throw; throw;
} }
file_metadata->setProcessed();
applyActionAfterProcessing(reader.getObjectInfo().relative_path);
appendLogElement(path, *file_status, processed_rows_from_file, true); appendLogElement(path, *file_status, processed_rows_from_file, true);
file_status->setProcessingEndTime();
file_status.reset(); file_status.reset();
processed_rows_from_file = 0; processed_rows_from_file = 0;
processed_files.push_back(file_metadata);
if (processed_files.size() == max_processed_files_before_commit)
{
LOG_TRACE(log, "Number of max processed files before commit reached "
"(rows: {}, bytes: {}, files: {})",
total_processed_rows, total_processed_bytes, processed_files.size());
break;
}
bool rows_or_bytes_or_time_limit_reached = false;
if (max_processed_rows_before_commit
&& total_processed_rows == max_processed_rows_before_commit)
{
LOG_TRACE(log, "Number of max processed rows before commit reached "
"(rows: {}, bytes: {}, files: {})",
total_processed_rows, total_processed_bytes, processed_files.size());
rows_or_bytes_or_time_limit_reached = true;
}
else if (max_processed_bytes_before_commit
&& total_processed_bytes == max_processed_bytes_before_commit)
{
LOG_TRACE(log, "Number of max processed bytes before commit reached "
"(rows: {}, bytes: {}, files: {})",
total_processed_rows, total_processed_bytes, processed_files.size());
rows_or_bytes_or_time_limit_reached = true;
}
else if (max_processing_time_sec_before_commit
&& total_stopwatch.elapsedSeconds() >= max_processing_time_sec_before_commit)
{
LOG_TRACE(log, "Max processing time before commit reached "
"(rows: {}, bytes: {}, files: {})",
total_processed_rows, total_processed_bytes, processed_files.size());
rows_or_bytes_or_time_limit_reached = true;
}
if (rows_or_bytes_or_time_limit_reached)
{
if (!reader_future.valid())
break;
LOG_TRACE(log, "Rows or bytes limit reached, but we have one more file scheduled already, "
"will process it despite the limit");
}
if (shutdown_called) if (shutdown_called)
{ {
LOG_INFO(log, "Shutdown was called, stopping sync"); LOG_TRACE(log, "Shutdown was called, stopping sync");
break; break;
} }
@ -460,19 +638,55 @@ Chunk StorageS3QueueSource::generate()
reader = reader_future.get(); reader = reader_future.get();
if (!reader) if (!reader)
{
LOG_TEST(log, "Reader finished");
break; break;
}
file_status = files_metadata->getFileStatus(reader.getObjectInfo().getPath()); file_status = files_metadata->getFileStatus(reader.getObjectInfo()->getPath());
/// Even if task is finished the thread may be not freed in pool. if (!rows_or_bytes_or_time_limit_reached && processed_files.size() + 1 < max_processed_files_before_commit)
/// So wait until it will be freed before scheduling a new task. {
internal_source->create_reader_pool->wait(); /// Even if task is finished the thread may be not freed in pool.
reader_future = internal_source->createReaderAsync(processor_id); /// So wait until it will be freed before scheduling a new task.
internal_source->create_reader_pool->wait();
reader_future = internal_source->createReaderAsync(processor_id);
}
} }
return {}; return {};
} }
void StorageS3QueueSource::commit(bool success, const std::string & exception_message)
{
LOG_TEST(log, "Having {} files to set as {}, failed files: {}",
processed_files.size(), success ? "Processed" : "Failed", failed_during_read_files.size());
for (const auto & file_metadata : processed_files)
{
if (success)
{
file_metadata->setProcessed();
applyActionAfterProcessing(file_metadata->getPath());
}
else
file_metadata->setFailed(
exception_message,
/* reduce_retry_count */false,
/* overwrite_status */true);
}
for (const auto & file_metadata : failed_during_read_files)
{
/// `exception` from commit args is from insertion to storage.
/// Here we do not use it, as failed_during_read_files were not inserted into storage, but skipped.
file_metadata->setFailed(
file_metadata->getFileStatus()->getException(),
/* reduce_retry_count */true,
/* overwrite_status */false);
}
}
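Editor's note: the new commit() above finalises two separate lists: files that were fully read are marked processed on success, or failed with the insertion error without consuming a retry on failure, while files that failed during reading keep their own stored exception and do consume a retry. A simplified standalone sketch of that bookkeeping; the struct and function names are illustrative, not the actual source class.

```cpp
#include <iostream>
#include <string>
#include <vector>

struct FileSketch
{
    std::string path;
    std::string read_exception;   // set if reading the file itself failed
};

// Simplified analogue of the commit step: decide the final state of each tracked file.
void commitSketch(
    bool success,
    const std::string & insert_exception,
    const std::vector<FileSketch> & processed_files,
    const std::vector<FileSketch> & failed_during_read_files)
{
    for (const auto & file : processed_files)
    {
        if (success)
            std::cout << file.path << ": processed\n";
        else
            // Insertion failed downstream: fail the file with the insertion error,
            // but do not spend one of its retries on it.
            std::cout << file.path << ": failed (" << insert_exception << "), retry kept\n";
    }

    for (const auto & file : failed_during_read_files)
        // Reading the file failed earlier: keep its own exception and consume a retry.
        std::cout << file.path << ": failed (" << file.read_exception << "), retry consumed\n";
}

int main()
{
    std::vector<FileSketch> fully_read = {{"a.csv", ""}, {"b.csv", ""}};
    std::vector<FileSketch> failed_reads = {{"c.csv", "parse error"}};
    commitSketch(/* success */ false, "insert into MV failed", fully_read, failed_reads);
}
```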
void StorageS3QueueSource::applyActionAfterProcessing(const String & path) void StorageS3QueueSource::applyActionAfterProcessing(const String & path)
{ {
switch (action) switch (action)

View File

@ -20,24 +20,18 @@ class StorageS3QueueSource : public ISource, WithContext
{ {
public: public:
using Storage = StorageObjectStorage; using Storage = StorageObjectStorage;
using ConfigurationPtr = Storage::ConfigurationPtr; using Source = StorageObjectStorageSource;
using GlobIterator = StorageObjectStorageSource::GlobIterator;
using ZooKeeperGetter = std::function<zkutil::ZooKeeperPtr()>;
using RemoveFileFunc = std::function<void(std::string)>; using RemoveFileFunc = std::function<void(std::string)>;
using FileStatusPtr = S3QueueMetadata::FileStatusPtr; using BucketHolderPtr = S3QueueOrderedFileMetadata::BucketHolderPtr;
using ReaderHolder = StorageObjectStorageSource::ReaderHolder; using BucketHolder = S3QueueOrderedFileMetadata::BucketHolder;
using Metadata = S3QueueMetadata;
using ObjectInfo = StorageObjectStorageSource::ObjectInfo;
using ObjectInfoPtr = std::shared_ptr<ObjectInfo>;
using ObjectInfos = std::vector<ObjectInfoPtr>;
struct S3QueueObjectInfo : public ObjectInfo struct S3QueueObjectInfo : public Source::ObjectInfo
{ {
S3QueueObjectInfo( S3QueueObjectInfo(
const ObjectInfo & object_info, const Source::ObjectInfo & object_info,
Metadata::FileMetadataPtr processing_holder_); S3QueueMetadata::FileMetadataPtr file_metadata_);
Metadata::FileMetadataPtr processing_holder; S3QueueMetadata::FileMetadataPtr file_metadata;
}; };
class FileIterator : public StorageObjectStorageSource::IIterator class FileIterator : public StorageObjectStorageSource::IIterator
@ -45,39 +39,59 @@ public:
public: public:
FileIterator( FileIterator(
std::shared_ptr<S3QueueMetadata> metadata_, std::shared_ptr<S3QueueMetadata> metadata_,
std::unique_ptr<GlobIterator> glob_iterator_, std::unique_ptr<Source::GlobIterator> glob_iterator_,
std::atomic<bool> & shutdown_called_, std::atomic<bool> & shutdown_called_,
LoggerPtr logger_); LoggerPtr logger_);
bool isFinished() const;
/// Note: /// Note:
/// List results in s3 are always returned in UTF-8 binary order. /// List results in s3 are always returned in UTF-8 binary order.
/// (https://docs.aws.amazon.com/AmazonS3/latest/userguide/ListingKeysUsingAPIs.html) /// (https://docs.aws.amazon.com/AmazonS3/latest/userguide/ListingKeysUsingAPIs.html)
ObjectInfoPtr nextImpl(size_t processor) override; Source::ObjectInfoPtr nextImpl(size_t processor) override;
size_t estimatedKeysCount() override; size_t estimatedKeysCount() override;
/// If the key was taken from iterator via next() call,
/// we might later want to return it back for retrying.
void returnForRetry(Source::ObjectInfoPtr object_info);
/// Release hold buckets.
/// In fact, they could be released in destructors of BucketHolder,
/// but we anyway try to release them explicitly,
/// because we want to be able to rethrow exceptions if they might happen.
void releaseFinishedBuckets();
private: private:
using Bucket = S3QueueMetadata::Bucket; using Bucket = S3QueueMetadata::Bucket;
using Processor = S3QueueMetadata::Processor; using Processor = S3QueueMetadata::Processor;
const std::shared_ptr<S3QueueMetadata> metadata; const std::shared_ptr<S3QueueMetadata> metadata;
const std::unique_ptr<GlobIterator> glob_iterator; const std::unique_ptr<Source::GlobIterator> glob_iterator;
std::atomic<bool> & shutdown_called; std::atomic<bool> & shutdown_called;
std::mutex mutex; std::mutex mutex;
LoggerPtr log; LoggerPtr log;
std::mutex buckets_mutex;
struct ListedKeys struct ListedKeys
{ {
std::deque<ObjectInfoPtr> keys; std::deque<Source::ObjectInfoPtr> keys;
std::optional<Processor> processor; std::optional<Processor> processor;
}; };
/// A cache of keys which were iterated via glob_iterator, but not taken for processing.
std::unordered_map<Bucket, ListedKeys> listed_keys_cache; std::unordered_map<Bucket, ListedKeys> listed_keys_cache;
bool iterator_finished = false;
std::unordered_map<size_t, S3QueueOrderedFileMetadata::BucketHolderPtr> bucket_holders;
std::pair<ObjectInfoPtr, S3QueueOrderedFileMetadata::BucketInfoPtr> getNextKeyFromAcquiredBucket(size_t processor); /// We store a vector of holders, because we cannot release them until processed files are committed.
std::unordered_map<size_t, std::vector<BucketHolderPtr>> bucket_holders;
/// Is glob_iterator finished?
std::atomic_bool iterator_finished = false;
/// Only for processing without buckets.
std::deque<Source::ObjectInfoPtr> objects_to_retry;
std::pair<Source::ObjectInfoPtr, S3QueueOrderedFileMetadata::BucketInfoPtr> getNextKeyFromAcquiredBucket(size_t processor);
bool hasKeysForProcessor(const Processor & processor) const;
}; };
StorageS3QueueSource( StorageS3QueueSource(
@ -94,7 +108,12 @@ public:
const std::atomic<bool> & table_is_being_dropped_, const std::atomic<bool> & table_is_being_dropped_,
std::shared_ptr<S3QueueLog> s3_queue_log_, std::shared_ptr<S3QueueLog> s3_queue_log_,
const StorageID & storage_id_, const StorageID & storage_id_,
LoggerPtr log_); LoggerPtr log_,
size_t max_processed_files_before_commit_,
size_t max_processed_rows_before_commit_,
size_t max_processed_bytes_before_commit_,
size_t max_processing_time_sec_before_commit_,
bool commit_once_processed_);
static Block getHeader(Block sample_block, const std::vector<NameAndTypePair> & requested_virtual_columns); static Block getHeader(Block sample_block, const std::vector<NameAndTypePair> & requested_virtual_columns);
@ -102,6 +121,10 @@ public:
Chunk generate() override; Chunk generate() override;
/// Commit files after insertion into storage finished.
/// `success` defines whether insertion was successful or not.
void commit(bool success, const std::string & exception_message = {});
private: private:
const String name; const String name;
const size_t processor_id; const size_t processor_id;
@ -113,17 +136,29 @@ private:
const std::atomic<bool> & table_is_being_dropped; const std::atomic<bool> & table_is_being_dropped;
const std::shared_ptr<S3QueueLog> s3_queue_log; const std::shared_ptr<S3QueueLog> s3_queue_log;
const StorageID storage_id; const StorageID storage_id;
const size_t max_processed_files_before_commit;
const size_t max_processed_rows_before_commit;
const size_t max_processed_bytes_before_commit;
const size_t max_processing_time_sec_before_commit;
const bool commit_once_processed;
RemoveFileFunc remove_file_func; RemoveFileFunc remove_file_func;
LoggerPtr log; LoggerPtr log;
ReaderHolder reader; std::vector<S3QueueMetadata::FileMetadataPtr> processed_files;
std::future<ReaderHolder> reader_future; std::vector<S3QueueMetadata::FileMetadataPtr> failed_during_read_files;
Source::ReaderHolder reader;
std::future<Source::ReaderHolder> reader_future;
std::atomic<bool> initialized{false}; std::atomic<bool> initialized{false};
size_t processed_rows_from_file = 0; size_t processed_rows_from_file = 0;
size_t total_processed_rows = 0;
size_t total_processed_bytes = 0;
S3QueueOrderedFileMetadata::BucketHolderPtr current_bucket_holder; Stopwatch total_stopwatch {CLOCK_MONOTONIC_COARSE};
Chunk generateImpl();
void applyActionAfterProcessing(const String & path); void applyActionAfterProcessing(const String & path);
void appendLogElement(const std::string & filename, S3QueueMetadata::FileStatus & file_status_, size_t processed_rows, bool processed); void appendLogElement(const std::string & filename, S3QueueMetadata::FileStatus & file_status_, size_t processed_rows, bool processed);
void lazyInitialize(size_t processor); void lazyInitialize(size_t processor);

View File

@ -130,8 +130,11 @@ void S3QueueUnorderedFileMetadata::setProcessedImpl()
const auto code = zk_client->tryMulti(requests, responses); const auto code = zk_client->tryMulti(requests, responses);
if (code == Coordination::Error::ZOK) if (code == Coordination::Error::ZOK)
{ {
if (max_loading_retries) if (max_loading_retries
zk_client->tryRemove(failed_node_path + ".retriable", -1); && zk_client->tryRemove(failed_node_path + ".retriable", -1) == Coordination::Error::ZOK)
{
LOG_TEST(log, "Removed node {}.retriable", failed_node_path);
}
LOG_TRACE(log, "Moved file `{}` to processed (node path: {})", path, processed_node_path); LOG_TRACE(log, "Moved file `{}` to processed (node path: {})", path, processed_node_path);
return; return;

View File

@ -26,6 +26,7 @@
#include <Storages/prepareReadingFromFormat.h> #include <Storages/prepareReadingFromFormat.h>
#include <Storages/ObjectStorage/S3/Configuration.h> #include <Storages/ObjectStorage/S3/Configuration.h>
#include <Storages/ObjectStorage/Utils.h> #include <Storages/ObjectStorage/Utils.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <QueryPipeline/QueryPipelineBuilder.h> #include <QueryPipeline/QueryPipelineBuilder.h>
#include <filesystem> #include <filesystem>
@ -71,7 +72,12 @@ namespace
return zkutil::extractZooKeeperPath(result_zk_path, true); return zkutil::extractZooKeeperPath(result_zk_path, true);
} }
void checkAndAdjustSettings(S3QueueSettings & s3queue_settings, const Settings & settings, bool is_attach) void checkAndAdjustSettings(
S3QueueSettings & s3queue_settings,
const Settings & settings,
bool is_attach,
const LoggerPtr & log,
ASTStorage * engine_args)
{ {
if (!is_attach && !s3queue_settings.mode.changed) if (!is_attach && !s3queue_settings.mode.changed)
{ {
@ -79,11 +85,6 @@ namespace
} }
/// In case !is_attach, we leave Ordered mode as default for compatibility. /// In case !is_attach, we leave Ordered mode as default for compatibility.
if (!s3queue_settings.s3queue_processing_threads_num)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Setting `s3queue_processing_threads_num` cannot be set to zero");
}
if (!s3queue_settings.s3queue_enable_logging_to_s3queue_log.changed) if (!s3queue_settings.s3queue_enable_logging_to_s3queue_log.changed)
{ {
s3queue_settings.s3queue_enable_logging_to_s3queue_log = settings.s3queue_enable_logging_to_s3queue_log; s3queue_settings.s3queue_enable_logging_to_s3queue_log = settings.s3queue_enable_logging_to_s3queue_log;
@ -95,6 +96,21 @@ namespace
"Setting `s3queue_cleanup_interval_min_ms` ({}) must be less or equal to `s3queue_cleanup_interval_max_ms` ({})", "Setting `s3queue_cleanup_interval_min_ms` ({}) must be less or equal to `s3queue_cleanup_interval_max_ms` ({})",
s3queue_settings.s3queue_cleanup_interval_min_ms, s3queue_settings.s3queue_cleanup_interval_max_ms); s3queue_settings.s3queue_cleanup_interval_min_ms, s3queue_settings.s3queue_cleanup_interval_max_ms);
} }
if (!s3queue_settings.s3queue_processing_threads_num)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Setting `s3queue_processing_threads_num` cannot be set to zero");
}
if (!is_attach && !s3queue_settings.s3queue_processing_threads_num.changed)
{
s3queue_settings.s3queue_processing_threads_num = std::max<uint32_t>(getNumberOfPhysicalCPUCores(), 16);
engine_args->settings->as<ASTSetQuery>()->changes.insertSetting(
"s3queue_processing_threads_num",
s3queue_settings.s3queue_processing_threads_num.value);
LOG_TRACE(log, "Set `processing_threads_num` to {}", s3queue_settings.s3queue_processing_threads_num);
}
} }
} }
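Editor's note: when `s3queue_processing_threads_num` is not set explicitly, the code above now defaults it to the larger of the physical core count and 16, and writes the resolved value back into the table's settings. The snippet below only illustrates the defaulting rule with the standard library; getNumberOfPhysicalCPUCores() is ClickHouse's own helper, and std::thread::hardware_concurrency() is just an approximation of it.

```cpp
#include <algorithm>
#include <iostream>
#include <thread>

int main()
{
    // Default the number of processing threads to max(detected cores, 16).
    unsigned threads = std::max(std::thread::hardware_concurrency(), 16u);
    std::cout << "s3queue_processing_threads_num defaults to " << threads << '\n';
}
```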
@ -107,7 +123,7 @@ StorageS3Queue::StorageS3Queue(
const String & comment, const String & comment,
ContextPtr context_, ContextPtr context_,
std::optional<FormatSettings> format_settings_, std::optional<FormatSettings> format_settings_,
ASTStorage * /* engine_args */, ASTStorage * engine_args,
LoadingStrictnessLevel mode) LoadingStrictnessLevel mode)
: IStorage(table_id_) : IStorage(table_id_)
, WithContext(context_) , WithContext(context_)
@ -131,7 +147,7 @@ StorageS3Queue::StorageS3Queue(
throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "S3Queue url must either end with '/' or contain globs"); throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "S3Queue url must either end with '/' or contain globs");
} }
checkAndAdjustSettings(*s3queue_settings, context_->getSettingsRef(), mode > LoadingStrictnessLevel::CREATE); checkAndAdjustSettings(*s3queue_settings, context_->getSettingsRef(), mode > LoadingStrictnessLevel::CREATE, log, engine_args);
object_storage = configuration->createObjectStorage(context_, /* is_readonly */true); object_storage = configuration->createObjectStorage(context_, /* is_readonly */true);
FormatFactory::instance().checkFormatName(configuration->format); FormatFactory::instance().checkFormatName(configuration->format);
@ -305,10 +321,12 @@ void ReadFromS3Queue::initializePipeline(QueryPipelineBuilder & pipeline, const
createIterator(nullptr); createIterator(nullptr);
for (size_t i = 0; i < adjusted_num_streams; ++i) for (size_t i = 0; i < adjusted_num_streams; ++i)
pipes.emplace_back(storage->createSource( pipes.emplace_back(storage->createSource(
i, i/* processor_id */,
info, info,
iterator, iterator,
max_block_size, context)); max_block_size,
context,
true/* commit_once_processed */));
auto pipe = Pipe::unitePipes(std::move(pipes)); auto pipe = Pipe::unitePipes(std::move(pipes));
if (pipe.empty()) if (pipe.empty())
@ -325,7 +343,8 @@ std::shared_ptr<StorageS3QueueSource> StorageS3Queue::createSource(
const ReadFromFormatInfo & info, const ReadFromFormatInfo & info,
std::shared_ptr<StorageS3Queue::FileIterator> file_iterator, std::shared_ptr<StorageS3Queue::FileIterator> file_iterator,
size_t max_block_size, size_t max_block_size,
ContextPtr local_context) ContextPtr local_context,
bool commit_once_processed)
{ {
auto internal_source = std::make_unique<StorageObjectStorageSource>( auto internal_source = std::make_unique<StorageObjectStorageSource>(
getName(), getName(),
@ -358,7 +377,12 @@ std::shared_ptr<StorageS3QueueSource> StorageS3Queue::createSource(
table_is_being_dropped, table_is_being_dropped,
s3_queue_log, s3_queue_log,
getStorageID(), getStorageID(),
log); log,
s3queue_settings->s3queue_max_processed_files_before_commit,
s3queue_settings->s3queue_max_processed_rows_before_commit,
s3queue_settings->s3queue_max_processed_bytes_before_commit,
s3queue_settings->s3queue_max_processing_time_sec_before_commit,
commit_once_processed);
} }
bool StorageS3Queue::hasDependencies(const StorageID & table_id) bool StorageS3Queue::hasDependencies(const StorageID & table_id)
@ -433,48 +457,83 @@ void StorageS3Queue::threadFunc()
bool StorageS3Queue::streamToViews() bool StorageS3Queue::streamToViews()
{ {
// Create a stream for each consumer and join them in a union stream
// Only insert into dependent views and expect that input blocks contain virtual columns
auto table_id = getStorageID(); auto table_id = getStorageID();
auto table = DatabaseCatalog::instance().getTable(table_id, getContext()); auto table = DatabaseCatalog::instance().getTable(table_id, getContext());
if (!table) if (!table)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Engine table {} doesn't exist.", table_id.getNameForLogs()); throw Exception(ErrorCodes::LOGICAL_ERROR, "Engine table {} doesn't exist.", table_id.getNameForLogs());
auto storage_snapshot = getStorageSnapshot(getInMemoryMetadataPtr(), getContext());
// Create an INSERT query for streaming data
auto insert = std::make_shared<ASTInsertQuery>(); auto insert = std::make_shared<ASTInsertQuery>();
insert->table_id = table_id; insert->table_id = table_id;
auto storage_snapshot = getStorageSnapshot(getInMemoryMetadataPtr(), getContext());
auto s3queue_context = Context::createCopy(getContext()); auto s3queue_context = Context::createCopy(getContext());
s3queue_context->makeQueryContext(); s3queue_context->makeQueryContext();
// Create a stream for each consumer and join them in a union stream
// Only insert into dependent views and expect that input blocks contain virtual columns
InterpreterInsertQuery interpreter(insert, s3queue_context, false, true, true);
auto block_io = interpreter.execute();
auto file_iterator = createFileIterator(s3queue_context, nullptr); auto file_iterator = createFileIterator(s3queue_context, nullptr);
size_t total_rows = 0;
auto read_from_format_info = prepareReadingFromFormat(block_io.pipeline.getHeader().getNames(), storage_snapshot, supportsSubsetOfColumns(s3queue_context)); while (!shutdown_called && !file_iterator->isFinished())
Pipes pipes;
pipes.reserve(s3queue_settings->s3queue_processing_threads_num);
for (size_t i = 0; i < s3queue_settings->s3queue_processing_threads_num; ++i)
{ {
auto source = createSource(i, read_from_format_info, file_iterator, DBMS_DEFAULT_BUFFER_SIZE, s3queue_context); InterpreterInsertQuery interpreter(insert, s3queue_context, false, true, true);
pipes.emplace_back(std::move(source)); auto block_io = interpreter.execute();
auto read_from_format_info = prepareReadingFromFormat(
block_io.pipeline.getHeader().getNames(),
storage_snapshot,
supportsSubsetOfColumns(s3queue_context));
Pipes pipes;
std::vector<std::shared_ptr<StorageS3QueueSource>> sources;
pipes.reserve(s3queue_settings->s3queue_processing_threads_num);
sources.reserve(s3queue_settings->s3queue_processing_threads_num);
for (size_t i = 0; i < s3queue_settings->s3queue_processing_threads_num; ++i)
{
auto source = createSource(
i/* processor_id */,
read_from_format_info,
file_iterator,
DBMS_DEFAULT_BUFFER_SIZE,
s3queue_context,
false/* commit_once_processed */);
pipes.emplace_back(source);
sources.emplace_back(source);
}
auto pipe = Pipe::unitePipes(std::move(pipes));
block_io.pipeline.complete(std::move(pipe));
block_io.pipeline.setNumThreads(s3queue_settings->s3queue_processing_threads_num);
block_io.pipeline.setConcurrencyControl(s3queue_context->getSettingsRef().use_concurrency_control);
std::atomic_size_t rows = 0;
block_io.pipeline.setProgressCallback([&](const Progress & progress) { rows += progress.read_rows.load(); });
try
{
CompletedPipelineExecutor executor(block_io.pipeline);
executor.execute();
}
catch (...)
{
for (auto & source : sources)
source->commit(/* success */false, getCurrentExceptionMessage(true));
file_iterator->releaseFinishedBuckets();
throw;
}
for (auto & source : sources)
source->commit(/* success */true);
file_iterator->releaseFinishedBuckets();
total_rows += rows;
} }
auto pipe = Pipe::unitePipes(std::move(pipes));
block_io.pipeline.complete(std::move(pipe)); return total_rows > 0;
block_io.pipeline.setNumThreads(s3queue_settings->s3queue_processing_threads_num);
block_io.pipeline.setConcurrencyControl(s3queue_context->getSettingsRef().use_concurrency_control);
std::atomic_size_t rows = 0;
block_io.pipeline.setProgressCallback([&](const Progress & progress) { rows += progress.read_rows.load(); });
CompletedPipelineExecutor executor(block_io.pipeline);
executor.execute();
return rows > 0;
} }
zkutil::ZooKeeperPtr StorageS3Queue::getZooKeeper() const zkutil::ZooKeeperPtr StorageS3Queue::getZooKeeper() const
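For readability, here is a rough Python sketch of the new streamToViews() control flow: process one batch of files per iteration, commit the batch's sources on success, mark them failed and rethrow on error. This is only a conceptual outline; every name below is a stand-in for the C++ members above, not an actual API.

from typing import Callable, List

def stream_to_views_sketch(
    is_shutdown: Callable[[], bool],
    file_iterator,                      # stand-in: exposes is_finished() and release_finished_buckets()
    make_sources: Callable[[], List],   # stand-in: each source exposes commit(success, exception="")
    run_insert_pipeline: Callable[[List], int],  # stand-in: runs one INSERT pipeline, returns rows read
) -> bool:
    total_rows = 0
    while not is_shutdown() and not file_iterator.is_finished():
        sources = make_sources()        # created with commit_once_processed = false in the C++ code
        try:
            rows = run_insert_pipeline(sources)
        except Exception as exc:
            for source in sources:
                source.commit(False, str(exc))   # files of this batch are marked Failed
            file_iterator.release_finished_buckets()
            raise
        for source in sources:
            source.commit(True)                  # persist the processed batch in Keeper
        file_iterator.release_finished_buckets()
        total_rows += rows
    return total_rows > 0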
View File
@ -88,7 +88,8 @@ private:
const ReadFromFormatInfo & info, const ReadFromFormatInfo & info,
std::shared_ptr<StorageS3Queue::FileIterator> file_iterator, std::shared_ptr<StorageS3Queue::FileIterator> file_iterator,
size_t max_block_size, size_t max_block_size,
ContextPtr local_context); ContextPtr local_context,
bool commit_once_processed);
bool hasDependencies(const StorageID & table_id); bool hasDependencies(const StorageID & table_id);
bool streamToViews(); bool streamToViews();
View File
@ -26,6 +26,7 @@ ColumnsDescription StorageSystemS3Queue::getColumnsDescription()
return ColumnsDescription return ColumnsDescription
{ {
{"zookeeper_path", std::make_shared<DataTypeString>(), "Path in zookeeper to S3Queue metadata"}, {"zookeeper_path", std::make_shared<DataTypeString>(), "Path in zookeeper to S3Queue metadata"},
{"file_path", std::make_shared<DataTypeString>(), "File path of a file which is being processed by S3Queue"},
{"file_name", std::make_shared<DataTypeString>(), "File name of a file which is being processed by S3Queue"}, {"file_name", std::make_shared<DataTypeString>(), "File name of a file which is being processed by S3Queue"},
{"rows_processed", std::make_shared<DataTypeUInt64>(), "Currently processed number of rows"}, {"rows_processed", std::make_shared<DataTypeUInt64>(), "Currently processed number of rows"},
{"status", std::make_shared<DataTypeString>(), "Status of processing: Processed, Processing, Failed"}, {"status", std::make_shared<DataTypeString>(), "Status of processing: Processed, Processing, Failed"},
@ -45,11 +46,12 @@ void StorageSystemS3Queue::fillData(MutableColumns & res_columns, ContextPtr, co
{ {
for (const auto & [zookeeper_path, metadata] : S3QueueMetadataFactory::instance().getAll()) for (const auto & [zookeeper_path, metadata] : S3QueueMetadataFactory::instance().getAll())
{ {
for (const auto & [file_name, file_status] : metadata->getFileStatuses()) for (const auto & [file_path, file_status] : metadata->getFileStatuses())
{ {
size_t i = 0; size_t i = 0;
res_columns[i++]->insert(zookeeper_path); res_columns[i++]->insert(zookeeper_path);
res_columns[i++]->insert(file_name); res_columns[i++]->insert(file_path);
res_columns[i++]->insert(std::filesystem::path(file_path).filename().string());
res_columns[i++]->insert(file_status->processed_rows.load()); res_columns[i++]->insert(file_status->processed_rows.load());
res_columns[i++]->insert(magic_enum::enum_name(file_status->state.load())); res_columns[i++]->insert(magic_enum::enum_name(file_status->state.load()));
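With the new file_path column, system.s3queue now exposes both the full object key and the bare file name. A usage sketch in the style of the tests below (the path filter is illustrative):

# Illustrative query; "node" is a test instance as in the integration tests below.
node.query(
    "SELECT zookeeper_path, file_path, file_name, rows_processed, status "
    "FROM system.s3queue WHERE zookeeper_path ilike '%my_s3queue_table%'"
)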
View File
@ -0,0 +1,5 @@
<clickhouse>
<merge_tree>
<parts_to_throw_insert>0</parts_to_throw_insert>
</merge_tree>
</clickhouse>
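Note: parts_to_throw_insert = 0 effectively makes any insert into a MergeTree destination fail with a "Too many parts" error; the instance_too_many_parts instance added below loads this override so that test_exception_during_insert can reproduce a failing insert and then verify recovery after the limit is raised.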
View File
@ -110,6 +110,17 @@ def started_cluster():
with_installed_binary=True, with_installed_binary=True,
use_old_analyzer=True, use_old_analyzer=True,
) )
cluster.add_instance(
"instance_too_many_parts",
user_configs=["configs/users.xml"],
with_minio=True,
with_zookeeper=True,
main_configs=[
"configs/s3queue_log.xml",
"configs/merge_tree.xml",
],
stay_alive=True,
)
logging.info("Starting cluster...") logging.info("Starting cluster...")
cluster.start() cluster.start()
@ -352,6 +363,7 @@ def test_direct_select_file(started_cluster, mode):
files_path, files_path,
additional_settings={ additional_settings={
"keeper_path": keeper_path, "keeper_path": keeper_path,
"s3queue_processing_threads_num": 1,
}, },
) )
@ -379,6 +391,7 @@ def test_direct_select_file(started_cluster, mode):
files_path, files_path,
additional_settings={ additional_settings={
"keeper_path": keeper_path, "keeper_path": keeper_path,
"s3queue_processing_threads_num": 1,
}, },
) )
@ -397,6 +410,7 @@ def test_direct_select_file(started_cluster, mode):
files_path, files_path,
additional_settings={ additional_settings={
"keeper_path": keeper_path, "keeper_path": keeper_path,
"s3queue_processing_threads_num": 1,
}, },
) )
@ -779,8 +793,10 @@ def test_max_set_age(started_cluster):
additional_settings={ additional_settings={
"keeper_path": keeper_path, "keeper_path": keeper_path,
"s3queue_tracked_file_ttl_sec": max_age, "s3queue_tracked_file_ttl_sec": max_age,
"s3queue_cleanup_interval_min_ms": 0, "s3queue_cleanup_interval_min_ms": max_age / 3,
"s3queue_cleanup_interval_max_ms": 0, "s3queue_cleanup_interval_max_ms": max_age / 3,
"s3queue_loading_retries": 0,
"s3queue_processing_threads_num": 1,
"s3queue_loading_retries": 0, "s3queue_loading_retries": 0,
}, },
) )
@ -806,7 +822,7 @@ def test_max_set_age(started_cluster):
assert expected_rows == get_count() assert expected_rows == get_count()
assert 10 == int(node.query(f"SELECT uniq(_path) from {dst_table_name}")) assert 10 == int(node.query(f"SELECT uniq(_path) from {dst_table_name}"))
time.sleep(max_age + 1) time.sleep(max_age + 5)
expected_rows = 20 expected_rows = 20
@ -861,6 +877,11 @@ def test_max_set_age(started_cluster):
assert "Cannot parse input" in node.query( assert "Cannot parse input" in node.query(
"SELECT exception FROM system.s3queue WHERE file_name ilike '%fff.csv'" "SELECT exception FROM system.s3queue WHERE file_name ilike '%fff.csv'"
) )
assert 1 == int(
node.query(
"SELECT count() FROM system.s3queue_log WHERE file_name ilike '%fff.csv'"
)
)
assert 1 == int( assert 1 == int(
node.query( node.query(
"SELECT count() FROM system.s3queue_log WHERE file_name ilike '%fff.csv' AND notEmpty(exception)" "SELECT count() FROM system.s3queue_log WHERE file_name ilike '%fff.csv' AND notEmpty(exception)"
@ -1284,7 +1305,7 @@ def test_shards_distributed(started_cluster, mode, processing_threads):
def get_count(node, table_name): def get_count(node, table_name):
return int(run_query(node, f"SELECT count() FROM {table_name}")) return int(run_query(node, f"SELECT count() FROM {table_name}"))
for _ in range(10): for _ in range(30):
if ( if (
get_count(node, dst_table_name) + get_count(node_2, dst_table_name) get_count(node, dst_table_name) + get_count(node_2, dst_table_name)
) == total_rows: ) == total_rows:
@ -1577,3 +1598,156 @@ def test_upgrade(started_cluster):
node.restart_with_latest_version() node.restart_with_latest_version()
assert expected_rows == get_count() assert expected_rows == get_count()
def test_exception_during_insert(started_cluster):
node = started_cluster.instances["instance_too_many_parts"]
table_name = f"test_exception_during_insert"
dst_table_name = f"{table_name}_dst"
keeper_path = f"/clickhouse/test_{table_name}"
files_path = f"{table_name}_data"
files_to_generate = 10
create_table(
started_cluster,
node,
table_name,
"unordered",
files_path,
additional_settings={
"keeper_path": keeper_path,
},
)
total_values = generate_random_files(
started_cluster, files_path, files_to_generate, start_ind=0, row_num=1
)
create_mv(node, table_name, dst_table_name)
node.wait_for_log_line(
"Failed to process data: Code: 252. DB::Exception: Too many parts"
)
time.sleep(2)
exception = node.query(
f"SELECT exception FROM system.s3queue WHERE zookeeper_path ilike '%{table_name}%' and notEmpty(exception)"
)
assert "Too many parts" in exception
node.replace_in_config(
"/etc/clickhouse-server/config.d/merge_tree.xml",
"parts_to_throw_insert>0",
"parts_to_throw_insert>10",
)
node.restart_clickhouse()
def get_count():
return int(node.query(f"SELECT count() FROM {dst_table_name}"))
expected_rows = 10
for _ in range(20):
if expected_rows == get_count():
break
time.sleep(1)
assert expected_rows == get_count()
def test_commit_on_limit(started_cluster):
node = started_cluster.instances["instance"]
table_name = f"test_commit_on_limit"
dst_table_name = f"{table_name}_dst"
keeper_path = f"/clickhouse/test_{table_name}"
files_path = f"{table_name}_data"
files_to_generate = 10
create_table(
started_cluster,
node,
table_name,
"ordered",
files_path,
additional_settings={
"keeper_path": keeper_path,
"s3queue_processing_threads_num": 1,
"s3queue_loading_retries": 0,
"s3queue_max_processed_files_before_commit": 10,
},
)
total_values = generate_random_files(
started_cluster, files_path, files_to_generate, start_ind=0, row_num=1
)
incorrect_values = [
["failed", 1, 1],
]
incorrect_values_csv = (
"\n".join((",".join(map(str, row)) for row in incorrect_values)) + "\n"
).encode()
correct_values = [
[1, 1, 1],
]
correct_values_csv = (
"\n".join((",".join(map(str, row)) for row in correct_values)) + "\n"
).encode()
put_s3_file_content(
started_cluster, f"{files_path}/test_99.csv", correct_values_csv
)
put_s3_file_content(
started_cluster, f"{files_path}/test_999.csv", correct_values_csv
)
put_s3_file_content(
started_cluster, f"{files_path}/test_9999.csv", incorrect_values_csv
)
put_s3_file_content(
started_cluster, f"{files_path}/test_99999.csv", correct_values_csv
)
put_s3_file_content(
started_cluster, f"{files_path}/test_999999.csv", correct_values_csv
)
create_mv(node, table_name, dst_table_name)
def get_processed_files():
return (
node.query(
f"SELECT file_name FROM system.s3queue WHERE zookeeper_path ilike '%{table_name}%' and status = 'Processed' and rows_processed > 0 "
)
.strip()
.split("\n")
)
def get_failed_files():
return (
node.query(
f"SELECT file_name FROM system.s3queue WHERE zookeeper_path ilike '%{table_name}%' and status = 'Failed'"
)
.strip()
.split("\n")
)
for _ in range(30):
if "test_999999.csv" in get_processed_files():
break
time.sleep(1)
assert "test_999999.csv" in get_processed_files()
assert 1 == int(
node.query(
"SELECT value FROM system.events WHERE name = 'S3QueueFailedFiles' SETTINGS system_events_show_zero_values=1"
)
)
expected_processed = ["test_" + str(i) + ".csv" for i in range(files_to_generate)]
processed = get_processed_files()
for value in expected_processed:
assert value in processed
expected_failed = ["test_9999.csv"]
failed = get_failed_files()
for value in expected_failed:
assert value not in processed
assert value in failed
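Taken together, the intent of this test appears to be that with s3queue_max_processed_files_before_commit = 10, a failing file (test_9999.csv) does not prevent the successfully processed files of the same run from being committed: the generated files end up Processed while only the malformed file is reported as Failed.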