From fe3939287b476b7e0f2cec669dd9606f2fd0438f Mon Sep 17 00:00:00 2001 From: Sema Checherinda Date: Sat, 27 May 2023 01:02:48 +0200 Subject: [PATCH] add test, fix .gin_sid files --- src/IO/WriteBufferFromS3.cpp | 12 ++++++++++-- src/IO/WriteBufferFromS3TaskTracker.cpp | 15 ++++++++------- src/Storages/MergeTree/GinIndexStore.cpp | 7 +++++++ .../configs/config.d/storage_conf.xml | 19 +++++++++++++++++++ tests/integration/test_merge_tree_s3/test.py | 5 +++-- 5 files changed, 47 insertions(+), 11 deletions(-) diff --git a/src/IO/WriteBufferFromS3.cpp b/src/IO/WriteBufferFromS3.cpp index 954c996d929..4657a65f931 100644 --- a/src/IO/WriteBufferFromS3.cpp +++ b/src/IO/WriteBufferFromS3.cpp @@ -182,6 +182,14 @@ void WriteBufferFromS3::finalizeImpl() if (!is_prefinalized) preFinalize(); + if (std::uncaught_exception()) + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Detected buffer finalization when an exception is unwinding the stack." + " Do not call finalize buffer in destructors or when exception thrown." + " Details {}", + getLogDetails()); + chassert(offset() == 0); chassert(hidden_size == 0); @@ -521,7 +529,7 @@ void WriteBufferFromS3::completeMultipartUpload() if (multipart_tags.empty()) throw Exception( - ErrorCodes::LOGICAL_ERROR, + ErrorCodes::S3_ERROR, "Failed to complete multipart upload. No parts have uploaded"); for (size_t i = 0; i < multipart_tags.size(); ++i) @@ -529,7 +537,7 @@ void WriteBufferFromS3::completeMultipartUpload() const auto tag = multipart_tags.at(i); if (tag.empty()) throw Exception( - ErrorCodes::LOGICAL_ERROR, + ErrorCodes::S3_ERROR, "Failed to complete multipart upload. 
Part {} haven't been uploaded.", i); } diff --git a/src/IO/WriteBufferFromS3TaskTracker.cpp b/src/IO/WriteBufferFromS3TaskTracker.cpp index 4abae90eeac..7cf5a89df86 100644 --- a/src/IO/WriteBufferFromS3TaskTracker.cpp +++ b/src/IO/WriteBufferFromS3TaskTracker.cpp @@ -162,15 +162,16 @@ void WriteBufferFromS3::TaskTracker::waitTilInflightShrink() for (auto & it : finished_futures) { - SCOPE_EXIT({ - /// According to basic exception safety TaskTracker has to be destroyed after exception - /// If it would be true than this SCOPE_EXIT is superfluous - /// However WriteBufferWithFinalizeCallback, WriteBufferFromFileDecorator do call finalize in d-tor - /// TaskTracker has to cope this until the issue with finalizing in d-tor is addressed in #50274 - futures.erase(it); - }); +// SCOPE_EXIT({ +// /// According to basic exception safety TaskTracker has to be destroyed after exception +// /// If it would be true then this SCOPE_EXIT is superfluous +// /// However WriteBufferWithFinalizeCallback, WriteBufferFromFileDecorator do call finalize in d-tor +// /// TaskTracker has to cope with this until the issue with finalizing in d-tor is addressed in #50274 +// futures.erase(it); +// }); it->get(); + futures.erase(it); } finished_futures.clear(); diff --git a/src/Storages/MergeTree/GinIndexStore.cpp b/src/Storages/MergeTree/GinIndexStore.cpp index 0904855755c..aa0c1fccbc3 100644 --- a/src/Storages/MergeTree/GinIndexStore.cpp +++ b/src/Storages/MergeTree/GinIndexStore.cpp @@ -166,6 +166,7 @@ UInt32 GinIndexStore::getNextSegmentIDRange(const String & file_name, size_t n) /// Write segment ID 1 writeVarUInt(1, *ostr); ostr->sync(); + ostr->finalize(); } /// Read id in file @@ -188,6 +189,7 @@ UInt32 GinIndexStore::getNextSegmentIDRange(const String & file_name, size_t n) writeVarUInt(result + n, *ostr); ostr->sync(); + ostr->finalize(); } return result; } @@ -317,8 +319,13 @@ void GinIndexStore::writeSegment() current_segment.segment_id = getNextSegmentID(); 
metadata_file_stream->sync(); + metadata_file_stream->finalize(); + dict_file_stream->sync(); + dict_file_stream->finalize(); + postings_file_stream->sync(); + postings_file_stream->finalize(); } GinIndexStoreDeserializer::GinIndexStoreDeserializer(const GinIndexStorePtr & store_) diff --git a/tests/integration/test_merge_tree_s3/configs/config.d/storage_conf.xml b/tests/integration/test_merge_tree_s3/configs/config.d/storage_conf.xml index cca80143548..504280e4bed 100644 --- a/tests/integration/test_merge_tree_s3/configs/config.d/storage_conf.xml +++ b/tests/integration/test_merge_tree_s3/configs/config.d/storage_conf.xml @@ -33,6 +33,18 @@ 20000 1 + + s3 + http://resolver:8083/root/data/ + minio + minio123 + true + 0 + 20000 + 20000 + 0 + 1 + local / @@ -128,6 +140,13 @@ + + +
+ broken_s3_always_multi_part +
+
+
diff --git a/tests/integration/test_merge_tree_s3/test.py b/tests/integration/test_merge_tree_s3/test.py index f87644a6876..4dcc5805294 100644 --- a/tests/integration/test_merge_tree_s3/test.py +++ b/tests/integration/test_merge_tree_s3/test.py @@ -930,8 +930,9 @@ def test_merge_canceled_by_drop(cluster, node_name): ) +@pytest.mark.parametrize("storage_policy", ["broken_s3_always_multi_part", "broken_s3"]) @pytest.mark.parametrize("node_name", ["node"]) -def test_merge_canceled_by_s3_errors(cluster, node_name): +def test_merge_canceled_by_s3_errors(cluster, node_name, storage_policy): node = cluster.instances[node_name] node.query("DROP TABLE IF EXISTS test_merge_canceled_by_s3_errors NO DELAY") node.query( @@ -939,7 +940,7 @@ def test_merge_canceled_by_s3_errors(cluster, node_name): " (key UInt32, value String)" " Engine=MergeTree() " " ORDER BY value " - " SETTINGS storage_policy='broken_s3'" + f" SETTINGS storage_policy='{storage_policy}'" ) node.query("SYSTEM STOP MERGES test_merge_canceled_by_s3_errors") node.query(