Fix tests

kssenii 2024-06-11 19:01:24 +02:00
parent bc294ef51f
commit 1e435eb353
6 changed files with 75 additions and 17 deletions

@@ -133,6 +133,9 @@ S3QueueMetadata::S3QueueMetadata(const fs::path & zookeeper_path_, const S3Queue
generateRescheduleInterval(
settings.s3queue_cleanup_interval_min_ms, settings.s3queue_cleanup_interval_max_ms));
}
LOG_TRACE(log, "Mode: {}, buckets: {}, processing threads: {}, result buckets num: {}",
settings.mode.toString(), settings.s3queue_buckets, settings.s3queue_processing_threads_num, buckets_num);
}
S3QueueMetadata::~S3QueueMetadata()

@@ -301,7 +301,8 @@ StorageS3QueueSource::StorageS3QueueSource(
const std::atomic<bool> & table_is_being_dropped_,
std::shared_ptr<S3QueueLog> s3_queue_log_,
const StorageID & storage_id_,
LoggerPtr log_)
LoggerPtr log_,
bool commit_once_processed_)
: ISource(header_)
, WithContext(context_)
, name(std::move(name_))
@@ -314,6 +315,7 @@ StorageS3QueueSource::StorageS3QueueSource(
, table_is_being_dropped(table_is_being_dropped_)
, s3_queue_log(s3_queue_log_)
, storage_id(storage_id_)
, commit_once_processed(commit_once_processed_)
, remove_file_func(remove_file_func_)
, log(log_)
{
@@ -337,6 +339,28 @@ void StorageS3QueueSource::lazyInitialize(size_t processor)
}

Chunk StorageS3QueueSource::generate()
{
    Chunk chunk;
    try
    {
        chunk = generateImpl();
    }
    catch (...)
    {
        if (commit_once_processed)
            setFailed(getCurrentExceptionMessage(true), true);

        throw;
    }

    if (!chunk && commit_once_processed)
    {
        setProcessed();
    }
    return chunk;
}

Chunk StorageS3QueueSource::generateImpl()
{
    lazyInitialize(processor_id);
@@ -409,8 +433,6 @@ Chunk StorageS3QueueSource::generate()
SCOPE_EXIT({ CurrentThread::get().attachProfileCountersScope(prev_scope); });
/// FIXME: if files are compressed, profile counters update does not work fully (s3 related counters are not saved). Why?
started_files.push_back(file_metadata);
try
{
auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::S3QueuePullMicroseconds);
@@ -432,13 +454,16 @@ Chunk StorageS3QueueSource::generate()
{
const auto message = getCurrentExceptionMessage(true);
LOG_ERROR(log, "Got an error while pulling chunk. Will set file {} as failed. Error: {} ", path, message);
file_status->onFailed(getCurrentExceptionMessage(true));
appendLogElement(path, *file_status, processed_rows_from_file, false);
started_files.push_back(file_metadata);
throw;
}
appendLogElement(path, *file_status, processed_rows_from_file, true);
file_status.reset();
processed_rows_from_file = 0;
started_files.push_back(file_metadata);
if (shutdown_called)
{
@@ -465,6 +490,8 @@ Chunk StorageS3QueueSource::generate()
void StorageS3QueueSource::setProcessed()
{
    LOG_TEST(log, "Having {} files to set as processed", started_files.size());

    for (const auto & file_metadata : started_files)
    {
        file_metadata->setProcessed();
@@ -474,6 +501,8 @@ void StorageS3QueueSource::setProcessed()
void StorageS3QueueSource::setFailed(const std::string & exception, bool reduce_retry_count)
{
    LOG_TEST(log, "Having {} files to set as failed", started_files.size());

    for (const auto & file_metadata : started_files)
    {
        file_metadata->setFailed(exception, reduce_retry_count);

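Taken together, the changes in this source file make the commit point configurable: generate() is now a thin wrapper around generateImpl(), and when commit_once_processed is set the source finalizes the files it started itself, marking them as failed if an exception escapes and as processed once the stream is exhausted. Below is a condensed, self-contained sketch of that control flow; the Source and FileMetadata types and the main() driver are simplified stand-ins invented for illustration, not the actual ClickHouse classes.

// Simplified sketch of the commit-once-processed flow (illustrative only).
#include <exception>
#include <iostream>
#include <string>
#include <vector>

struct FileMetadata
{
    std::string path;
    void setProcessed() { std::cout << "processed: " << path << "\n"; }
    void setFailed(const std::string & error) { std::cout << "failed: " << path << " (" << error << ")\n"; }
};

class Source
{
public:
    explicit Source(bool commit_once_processed_) : commit_once_processed(commit_once_processed_) {}

    /// Returns false when there is no more data to read.
    bool generate()
    {
        bool has_data = false;
        try
        {
            has_data = generateImpl();
        }
        catch (const std::exception & e)
        {
            /// An error finalizes every file this source already started.
            if (commit_once_processed)
                setFailed(e.what());
            throw;
        }

        /// End of data: commit everything that was read successfully.
        if (!has_data && commit_once_processed)
            setProcessed();
        return has_data;
    }

private:
    bool generateImpl()
    {
        /// The real implementation pulls the next object from the queue,
        /// remembers it in started_files and returns a chunk of rows.
        if (!started_files.empty())
            return false; /// pretend the queue is exhausted after one file
        started_files.push_back({"data/example.csv"});
        return true;
    }

    void setProcessed()
    {
        for (auto & file : started_files)
            file.setProcessed();
        started_files.clear();
    }

    void setFailed(const std::string & error)
    {
        for (auto & file : started_files)
            file.setFailed(error);
        started_files.clear();
    }

    const bool commit_once_processed;
    std::vector<FileMetadata> started_files;
};

int main()
{
    Source source(/* commit_once_processed */ true);
    while (source.generate())
        ;
}

In the actual change the flag is true for the pipeline built in ReadFromS3Queue::initializePipeline (the direct SELECT path) and false in StorageS3Queue::streamToViews, presumably so the background materialized-view path can drive the commit itself once the whole insert has finished.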
@@ -94,7 +94,8 @@ public:
const std::atomic<bool> & table_is_being_dropped_,
std::shared_ptr<S3QueueLog> s3_queue_log_,
const StorageID & storage_id_,
LoggerPtr log_);
LoggerPtr log_,
bool commit_once_processed_);
static Block getHeader(Block sample_block, const std::vector<NameAndTypePair> & requested_virtual_columns);
@@ -117,6 +118,7 @@ private:
const std::atomic<bool> & table_is_being_dropped;
const std::shared_ptr<S3QueueLog> s3_queue_log;
const StorageID storage_id;
const bool commit_once_processed;
RemoveFileFunc remove_file_func;
LoggerPtr log;
@@ -130,6 +132,7 @@ private:
S3QueueOrderedFileMetadata::BucketHolderPtr current_bucket_holder;
Chunk generateImpl();
void applyActionAfterProcessing(const String & path);
void appendLogElement(const std::string & filename, S3QueueMetadata::FileStatus & file_status_, size_t processed_rows, bool processed);
void lazyInitialize(size_t processor);

@@ -74,7 +74,7 @@ namespace
return zkutil::extractZooKeeperPath(result_zk_path, true);
}
void checkAndAdjustSettings(S3QueueSettings & s3queue_settings, const Settings & settings, bool is_attach)
void checkAndAdjustSettings(S3QueueSettings & s3queue_settings, const Settings & settings, bool is_attach, const LoggerPtr & log)
{
if (!is_attach && !s3queue_settings.mode.changed)
{
@@ -82,11 +82,6 @@ namespace
}
/// In case !is_attach, we leave Ordered mode as default for compatibility.
if (!s3queue_settings.s3queue_processing_threads_num)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Setting `s3queue_processing_threads_num` cannot be set to zero");
}
if (!s3queue_settings.s3queue_enable_logging_to_s3queue_log.changed)
{
s3queue_settings.s3queue_enable_logging_to_s3queue_log = settings.s3queue_enable_logging_to_s3queue_log;
@@ -99,9 +94,15 @@ namespace
s3queue_settings.s3queue_cleanup_interval_min_ms, s3queue_settings.s3queue_cleanup_interval_max_ms);
}
if (!s3queue_settings.s3queue_processing_threads_num)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Setting `s3queue_processing_threads_num` cannot be set to zero");
}
if (!s3queue_settings.s3queue_processing_threads_num.changed)
{
s3queue_settings.s3queue_processing_threads_num = std::max<uint32_t>(getNumberOfPhysicalCPUCores(), 16);
LOG_TRACE(log, "Set `processing_threads_num` to {}", s3queue_settings.s3queue_processing_threads_num);
}
}
}
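The hunks above also reorder the checks in checkAndAdjustSettings: an explicit zero for s3queue_processing_threads_num is still rejected, but when the user has not set the value at all it now defaults to the larger of the physical core count and 16, and the chosen value is logged. A minimal standalone sketch of that decision follows; ThreadsSetting and resolveProcessingThreads are invented for the example, and std::thread::hardware_concurrency() stands in for getNumberOfPhysicalCPUCores().

// Standalone illustration of the adjusted threads-num handling (not the real S3QueueSettings type).
#include <algorithm>
#include <cstdint>
#include <iostream>
#include <stdexcept>
#include <thread>

/// Simplified stand-in for the setting object: `changed` is true only when the user set the value explicitly.
struct ThreadsSetting
{
    uint32_t value = 1;
    bool changed = false;
};

uint32_t resolveProcessingThreads(const ThreadsSetting & setting)
{
    /// An explicit zero is always rejected.
    if (setting.value == 0)
        throw std::invalid_argument("Setting `s3queue_processing_threads_num` cannot be set to zero");

    /// No explicit value: scale with the machine, but never below 16.
    if (!setting.changed)
        return std::max<uint32_t>(std::thread::hardware_concurrency(), 16);

    return setting.value;
}

int main()
{
    std::cout << resolveProcessingThreads({}) << "\n";          // default -> max(cores, 16)
    std::cout << resolveProcessingThreads({8, true}) << "\n";   // explicit value is kept
}

This also appears to be why the integration tests further down pin s3queue_processing_threads_num to 1: with the new default, a freshly created table would otherwise process files with as many threads as the host has cores.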
@@ -139,7 +140,7 @@ StorageS3Queue::StorageS3Queue(
throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "S3Queue url must either end with '/' or contain globs");
}
checkAndAdjustSettings(*s3queue_settings, context_->getSettingsRef(), mode > LoadingStrictnessLevel::CREATE);
checkAndAdjustSettings(*s3queue_settings, context_->getSettingsRef(), mode > LoadingStrictnessLevel::CREATE, log);
object_storage = configuration->createObjectStorage(context_, /* is_readonly */true);
FormatFactory::instance().checkFormatName(configuration->format);
@@ -313,10 +314,12 @@ void ReadFromS3Queue::initializePipeline(QueryPipelineBuilder & pipeline, const
createIterator(nullptr);
for (size_t i = 0; i < adjusted_num_streams; ++i)
pipes.emplace_back(storage->createSource(
i,
i/* processor_id */,
info,
iterator,
max_block_size, context));
max_block_size,
context,
true/* commit_once_processed */));
auto pipe = Pipe::unitePipes(std::move(pipes));
if (pipe.empty())
@@ -333,7 +336,8 @@ std::shared_ptr<StorageS3QueueSource> StorageS3Queue::createSource(
const ReadFromFormatInfo & info,
std::shared_ptr<StorageS3Queue::FileIterator> file_iterator,
size_t max_block_size,
ContextPtr local_context)
ContextPtr local_context,
bool commit_once_processed)
{
auto internal_source = std::make_unique<StorageObjectStorageSource>(
getName(),
@@ -366,7 +370,8 @@ std::shared_ptr<StorageS3QueueSource> StorageS3Queue::createSource(
table_is_being_dropped,
s3_queue_log,
getStorageID(),
log);
log,
commit_once_processed);
}
bool StorageS3Queue::hasDependencies(const StorageID & table_id)
@@ -471,7 +476,14 @@ bool StorageS3Queue::streamToViews()
for (size_t i = 0; i < s3queue_settings->s3queue_processing_threads_num; ++i)
{
auto source = createSource(i, read_from_format_info, file_iterator, DBMS_DEFAULT_BUFFER_SIZE, s3queue_context);
auto source = createSource(
i/* processor_id */,
read_from_format_info,
file_iterator,
DBMS_DEFAULT_BUFFER_SIZE,
s3queue_context,
false/* commit_once_processed */);
pipes.emplace_back(source);
sources.emplace_back(source);
}

@@ -88,7 +88,8 @@ private:
const ReadFromFormatInfo & info,
std::shared_ptr<StorageS3Queue::FileIterator> file_iterator,
size_t max_block_size,
ContextPtr local_context);
ContextPtr local_context,
bool commit_once_processed);
bool hasDependencies(const StorageID & table_id);
bool streamToViews();

@@ -363,6 +363,7 @@ def test_direct_select_file(started_cluster, mode):
files_path,
additional_settings={
"keeper_path": keeper_path,
"s3queue_processing_threads_num": 1,
},
)
@@ -390,6 +391,7 @@ def test_direct_select_file(started_cluster, mode):
files_path,
additional_settings={
"keeper_path": keeper_path,
"s3queue_processing_threads_num": 1,
},
)
@@ -408,6 +410,7 @@ def test_direct_select_file(started_cluster, mode):
files_path,
additional_settings={
"keeper_path": keeper_path,
"s3queue_processing_threads_num": 1,
},
)
@@ -793,6 +796,8 @@ def test_max_set_age(started_cluster):
"s3queue_cleanup_interval_min_ms": 0,
"s3queue_cleanup_interval_max_ms": 0,
"s3queue_loading_retries": 0,
"s3queue_processing_threads_num": 1,
"s3queue_loading_retries": 0,
},
)
create_mv(node, table_name, dst_table_name)
@@ -872,6 +877,11 @@ def test_max_set_age(started_cluster):
assert "Cannot parse input" in node.query(
"SELECT exception FROM system.s3queue WHERE file_name ilike '%fff.csv'"
)
assert 1 == int(
node.query(
"SELECT count() FROM system.s3queue_log WHERE file_name ilike '%fff.csv'"
)
)
assert 1 == int(
node.query(
"SELECT count() FROM system.s3queue_log WHERE file_name ilike '%fff.csv' AND notEmpty(exception)"