Merge pull request #60282 from ClickHouse/s3queue-fix-bug-and-flaky-test

s3queue: fix bug (also fixes flaky test_storage_s3_queue/test.py::test_shards_distributed)
This commit is contained in:
Kseniia Sumarokova 2024-02-23 11:57:34 +01:00 committed by GitHub
commit 6a7fef13ba
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 6 additions and 2 deletions

View File

@ -699,7 +699,10 @@ void S3QueueFilesMetadata::setFileProcessedForOrderedModeImpl(
{ {
auto code = zk_client->tryMulti(requests, responses); auto code = zk_client->tryMulti(requests, responses);
if (code == Coordination::Error::ZOK) if (code == Coordination::Error::ZOK)
{
LOG_TEST(log, "Moved file `{}` to processed", path);
return; return;
}
} }
/// Failed to update max processed node, retry. /// Failed to update max processed node, retry.

View File

@ -80,6 +80,7 @@ StorageS3QueueSource::KeyWithInfoPtr StorageS3QueueSource::FileIterator::next(si
{ {
val = keys.front(); val = keys.front();
keys.pop_front(); keys.pop_front();
chassert(idx == metadata->getProcessingIdForPath(val->key));
} }
} }
else else
@ -103,7 +104,7 @@ StorageS3QueueSource::KeyWithInfoPtr StorageS3QueueSource::FileIterator::next(si
LOG_TEST(log, "Putting key {} into queue of processor {} (total: {})", LOG_TEST(log, "Putting key {} into queue of processor {} (total: {})",
val->key, processing_id_for_key, sharded_keys.size()); val->key, processing_id_for_key, sharded_keys.size());
if (auto it = sharded_keys.find(idx); it != sharded_keys.end()) if (auto it = sharded_keys.find(processing_id_for_key); it != sharded_keys.end())
{ {
it->second.push_back(val); it->second.push_back(val);
} }
@ -111,7 +112,7 @@ StorageS3QueueSource::KeyWithInfoPtr StorageS3QueueSource::FileIterator::next(si
{ {
throw Exception(ErrorCodes::LOGICAL_ERROR, throw Exception(ErrorCodes::LOGICAL_ERROR,
"Processing id {} does not exist (Expected ids: {})", "Processing id {} does not exist (Expected ids: {})",
idx, fmt::join(metadata->getProcessingIdsForShard(current_shard), ", ")); processing_id_for_key, fmt::join(metadata->getProcessingIdsForShard(current_shard), ", "));
} }
} }
continue; continue;