Fix tests

kssenii 2024-06-17 17:11:17 +02:00
parent 060c61d88d
commit 0f2c2cc9bf
6 changed files with 48 additions and 21 deletions

S3QueueIFileMetadata.cpp

@@ -251,13 +251,15 @@ void S3QueueIFileMetadata::setProcessed()
     LOG_TRACE(log, "Set file {} as processed (rows: {})", path, file_status->processed_rows);
 }
 
-void S3QueueIFileMetadata::setFailed(const std::string & exception, bool reduce_retry_count)
+void S3QueueIFileMetadata::setFailed(const std::string & exception, bool reduce_retry_count, bool overwrite_status)
 {
     LOG_TRACE(log, "Setting file {} as failed (path: {}, reduce retry count: {}, exception: {})",
               path, failed_node_path, reduce_retry_count, exception);
 
     ProfileEvents::increment(ProfileEvents::S3QueueFailedFiles);
-    chassert(file_status->state == FileStatus::State::Failed);
+    if (overwrite_status || file_status->state != FileStatus::State::Failed)
+        file_status->onFailed(exception);
 
     node_metadata.last_exception = exception;
     if (reduce_retry_count)
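
The guard introduced here is the core of the change: with overwrite_status = false, a file already in the Failed state keeps its first exception; only an explicit overwrite replaces it. A minimal sketch of that transition logic, with FileStatus and setFailed as simplified stand-ins for the real S3Queue classes:

    #include <iostream>
    #include <string>

    struct FileStatus
    {
        enum class State { Processing, Failed };
        State state = State::Processing;
        std::string exception;

        void onFailed(const std::string & e) { state = State::Failed; exception = e; }
    };

    /// Same guard as the patched setFailed(): skip if already Failed, unless overwriting.
    void setFailed(FileStatus & status, const std::string & exception, bool overwrite_status)
    {
        if (overwrite_status || status.state != FileStatus::State::Failed)
            status.onFailed(exception);
    }

    int main()
    {
        FileStatus status;
        setFailed(status, "read error", /* overwrite_status */ false);
        setFailed(status, "commit error", /* overwrite_status */ false); /// ignored: already Failed
        std::cout << status.exception << '\n';                           /// prints "read error"
        setFailed(status, "commit error", /* overwrite_status */ true);  /// explicit overwrite wins
        std::cout << status.exception << '\n';                           /// prints "commit error"
    }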

S3QueueIFileMetadata.h

@@ -54,7 +54,7 @@ public:
     bool setProcessing();
     void setProcessed();
-    void setFailed(const std::string & exception, bool reduce_retry_count = true);
+    void setFailed(const std::string & exception, bool reduce_retry_count, bool overwrite_status);
 
     virtual void setProcessedAtStartRequests(
         Coordination::Requests & requests,

S3QueueSource.cpp

@@ -54,6 +54,7 @@ StorageS3QueueSource::FileIterator::FileIterator(
 bool StorageS3QueueSource::FileIterator::isFinished() const
 {
+    LOG_TEST(log, "Iterator finished: {}, objects to retry: {}", iterator_finished, objects_to_retry.size());
     return iterator_finished
         && listed_keys_cache.end() == std::find_if(
             listed_keys_cache.begin(), listed_keys_cache.end(),
@@ -82,6 +83,8 @@ StorageS3QueueSource::ObjectInfoPtr StorageS3QueueSource::FileIterator::nextImpl
         if (objects_to_retry.empty())
         {
             object_info = glob_iterator->next(processor);
+            if (!object_info)
+                iterator_finished = true;
         }
         else
         {
@@ -91,7 +94,10 @@ StorageS3QueueSource::ObjectInfoPtr StorageS3QueueSource::FileIterator::nextImpl
     }
 
     if (!object_info)
+    {
+        LOG_TEST(log, "No object left");
         return {};
+    }
 
     if (shutdown_called)
     {
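
Taken together, the hunks above give the iterator a small contract: objects that failed mid-read are parked in objects_to_retry and drained before the underlying glob iterator is consulted again, and exhaustion of the listing is now remembered in iterator_finished. A rough model of that contract, with plain strings standing in for ObjectInfoPtr and a vector standing in for the glob listing:

    #include <cstddef>
    #include <deque>
    #include <optional>
    #include <string>
    #include <vector>

    struct RetryingIterator
    {
        std::vector<std::string> listing;         /// stand-in for glob_iterator
        std::size_t pos = 0;
        std::deque<std::string> objects_to_retry;
        bool iterator_finished = false;

        std::optional<std::string> next()
        {
            if (objects_to_retry.empty())
            {
                if (pos == listing.size())
                {
                    iterator_finished = true;     /// exhaustion is recorded, as in nextImpl
                    return std::nullopt;          /// "No object left"
                }
                return listing[pos++];
            }
            std::string object = objects_to_retry.front();
            objects_to_retry.pop_front();
            return object;
        }

        /// Counterpart of returnForRetry(): a failed object is queued to be served again.
        void returnForRetry(std::string object) { objects_to_retry.push_back(std::move(object)); }
    };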
@@ -108,6 +114,7 @@ StorageS3QueueSource::ObjectInfoPtr StorageS3QueueSource::FileIterator::nextImpl
 void StorageS3QueueSource::FileIterator::returnForRetry(ObjectInfoPtr object_info)
 {
+    chassert(object_info);
     if (metadata->useBucketsForProcessing())
     {
         const auto bucket = metadata->getBucketForPath(object_info->relative_path);
@@ -374,10 +381,13 @@ void StorageS3QueueSource::lazyInitialize(size_t processor)
     if (initialized)
         return;
 
+    LOG_TEST(log, "Initializing a new reader");
+
     internal_source->lazyInitialize(processor);
     reader = std::move(internal_source->reader);
     if (reader)
         reader_future = std::move(internal_source->reader_future);
     initialized = true;
 }
@@ -427,7 +437,7 @@ Chunk StorageS3QueueSource::generateImpl()
             {
                 try
                 {
-                    file_metadata->setFailed("Cancelled");
+                    file_metadata->setFailed("Cancelled", /* reduce_retry_count */true, /* overwrite_status */false);
                 }
                 catch (...)
                 {
@@ -459,7 +469,7 @@ Chunk StorageS3QueueSource::generateImpl()
                 try
                 {
-                    file_metadata->setFailed("Table is dropped");
+                    file_metadata->setFailed("Table is dropped", /* reduce_retry_count */true, /* overwrite_status */false);
                 }
                 catch (...)
                 {
@@ -511,10 +521,9 @@ Chunk StorageS3QueueSource::generateImpl()
                 const auto message = getCurrentExceptionMessage(true);
                 LOG_ERROR(log, "Got an error while pulling chunk. Will set file {} as failed. Error: {} ", path, message);
 
-                appendLogElement(path, *file_status, processed_rows_from_file, false);
-                failed_files.push_back(file_metadata);
+                failed_during_read_files.push_back(file_metadata);
+                file_status->onFailed(getCurrentExceptionMessage(true));
+                appendLogElement(path, *file_status, processed_rows_from_file, false);
 
                 if (processed_rows_from_file == 0)
                 {
@@ -619,7 +628,7 @@ Chunk StorageS3QueueSource::generateImpl()
 void StorageS3QueueSource::commit(bool success, const std::string & exception)
 {
     LOG_TEST(log, "Having {} files to set as {}, failed files: {}",
-             processed_files.size(), success ? "Processed" : "Failed", failed_files.size());
+             processed_files.size(), success ? "Processed" : "Failed", failed_during_read_files.size());
 
     for (const auto & file_metadata : processed_files)
     {
@@ -629,14 +638,20 @@ void StorageS3QueueSource::commit(bool success, const std::string & exception)
             applyActionAfterProcessing(file_metadata->getPath());
         }
         else
-            file_metadata->setFailed(exception, /* reduce_retry_count */false);
+            file_metadata->setFailed(
+                exception,
+                /* reduce_retry_count */false,
+                /* overwrite_status */true);
     }
 
-    for (const auto & file_metadata : failed_files)
+    for (const auto & file_metadata : failed_during_read_files)
     {
         /// `exception` from commit args is from insertion to storage.
-        /// Here we do not use it, as failed_files were not inserted into storage, but skipped.
-        file_metadata->setFailed(file_metadata->getFileStatus()->getException(), /* reduce_retry_count */true);
+        /// Here we do not use it, as failed_during_read_files were not inserted into storage, but skipped.
+        file_metadata->setFailed(
+            file_metadata->getFileStatus()->getException(),
+            /* reduce_retry_count */true,
+            /* overwrite_status */false);
     }
 }
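
The rename makes the two commit paths explicit: files that were fully read share the fate of the insert, and on an insert failure the insert's exception overwrites whatever state they had, while files that failed during read keep the exception captured at read time. A compact sketch of that split, with a toy FileMetadata in place of the real class:

    #include <string>
    #include <vector>

    struct FileMetadata
    {
        std::string read_exception;   /// captured via onFailed() while reading
        std::string final_state;

        void setProcessed() { final_state = "Processed"; }
        void setFailed(const std::string & e, bool /*reduce_retry_count*/, bool /*overwrite_status*/)
        {
            final_state = "Failed: " + e;
        }
    };

    /// Same shape as StorageS3QueueSource::commit(): one insert outcome for all
    /// fully read files, per-file exceptions for the read-time failures.
    void commit(std::vector<FileMetadata> & processed_files,
                std::vector<FileMetadata> & failed_during_read_files,
                bool success, const std::string & insert_exception)
    {
        for (auto & file : processed_files)
        {
            if (success)
                file.setProcessed();
            else
                file.setFailed(insert_exception, /* reduce_retry_count */ false, /* overwrite_status */ true);
        }

        for (auto & file : failed_during_read_files)
            file.setFailed(file.read_exception, /* reduce_retry_count */ true, /* overwrite_status */ false);
    }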

S3QueueSource.h

@@ -85,6 +85,7 @@ public:
         std::deque<ObjectInfoPtr> objects_to_retry;
 
         std::pair<ObjectInfoPtr, S3QueueOrderedFileMetadata::BucketInfoPtr> getNextKeyFromAcquiredBucket(size_t processor);
+        bool hasKeysForProcessor(const Processor & processor) const;
     };
 
     StorageS3QueueSource(
@@ -137,7 +138,7 @@ private:
     LoggerPtr log;
 
     std::vector<Metadata::FileMetadataPtr> processed_files;
-    std::vector<Metadata::FileMetadataPtr> failed_files;
+    std::vector<Metadata::FileMetadataPtr> failed_during_read_files;
 
     ReaderHolder reader;
     std::future<ReaderHolder> reader_future;

StorageS3Queue.cpp

@@ -72,7 +72,12 @@ namespace
         return zkutil::extractZooKeeperPath(result_zk_path, true);
     }
 
-    void checkAndAdjustSettings(S3QueueSettings & s3queue_settings, const Settings & settings, bool is_attach, const LoggerPtr & log)
+    void checkAndAdjustSettings(
+        S3QueueSettings & s3queue_settings,
+        const Settings & settings,
+        bool is_attach,
+        const LoggerPtr & log,
+        ASTStorage * engine_args)
     {
         if (!is_attach && !s3queue_settings.mode.changed)
         {
@@ -100,6 +105,10 @@ namespace
         if (!is_attach && !s3queue_settings.s3queue_processing_threads_num.changed)
         {
             s3queue_settings.s3queue_processing_threads_num = std::max<uint32_t>(getNumberOfPhysicalCPUCores(), 16);
+            engine_args->settings->as<ASTSetQuery>()->changes.insertSetting(
+                "s3queue_processing_threads_num",
+                s3queue_settings.s3queue_processing_threads_num.value);
+
             LOG_TRACE(log, "Set `processing_threads_num` to {}", s3queue_settings.s3queue_processing_threads_num);
         }
     }
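
Passing engine_args through lets the implicitly derived thread count be written back into the table's stored CREATE query, so a later DETACH/ATTACH (possibly on hardware with a different core count) resolves to the same value instead of re-deriving it. The idea in miniature, with a plain map standing in for the settings AST that the insertSetting(...) call above mutates:

    #include <algorithm>
    #include <cstdint>
    #include <map>
    #include <string>

    /// Toy stand-in for the engine's stored settings AST (the ASTSetQuery changes).
    using StoredSettings = std::map<std::string, uint64_t>;

    /// If the user did not set the value explicitly, derive it and persist it,
    /// so re-attaching the table sees the same effective setting.
    void adjustProcessingThreads(StoredSettings & stored, bool user_changed, uint64_t physical_cores)
    {
        if (!user_changed)
            stored["s3queue_processing_threads_num"] = std::max<uint64_t>(physical_cores, 16);
    }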
@@ -114,7 +123,7 @@ StorageS3Queue::StorageS3Queue(
     const String & comment,
     ContextPtr context_,
     std::optional<FormatSettings> format_settings_,
-    ASTStorage * /* engine_args */,
+    ASTStorage * engine_args,
     LoadingStrictnessLevel mode)
     : IStorage(table_id_)
     , WithContext(context_)
@@ -138,7 +147,7 @@ StorageS3Queue::StorageS3Queue(
         throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "S3Queue url must either end with '/' or contain globs");
     }
 
-    checkAndAdjustSettings(*s3queue_settings, context_->getSettingsRef(), mode > LoadingStrictnessLevel::CREATE, log);
+    checkAndAdjustSettings(*s3queue_settings, context_->getSettingsRef(), mode > LoadingStrictnessLevel::CREATE, log, engine_args);
 
     object_storage = configuration->createObjectStorage(context_, /* is_readonly */true);
     FormatFactory::instance().checkFormatName(configuration->format);

tests/integration/test_storage_s3_queue/test.py

@@ -793,8 +793,8 @@ def test_max_set_age(started_cluster):
         additional_settings={
             "keeper_path": keeper_path,
             "s3queue_tracked_file_ttl_sec": max_age,
-            "s3queue_cleanup_interval_min_ms": 0,
-            "s3queue_cleanup_interval_max_ms": 0,
+            "s3queue_cleanup_interval_min_ms": max_age / 3,
+            "s3queue_cleanup_interval_max_ms": max_age / 3,
             "s3queue_loading_retries": 0,
             "s3queue_processing_threads_num": 1,
             "s3queue_loading_retries": 0,
@@ -822,7 +822,7 @@ def test_max_set_age(started_cluster):
     assert expected_rows == get_count()
     assert 10 == int(node.query(f"SELECT uniq(_path) from {dst_table_name}"))
 
-    time.sleep(max_age + 1)
+    time.sleep(max_age + 5)
 
     expected_rows = 20
@@ -1671,7 +1671,7 @@ def test_commit_on_limit(started_cluster):
         additional_settings={
             "keeper_path": keeper_path,
             "s3queue_processing_threads_num": 1,
-            "s3queue_loading_retries": 1,
+            "s3queue_loading_retries": 0,
             "s3queue_max_processed_files_before_commit": 10,
         },
     )