add test, fix .gin_sid files

This commit is contained in:
Sema Checherinda 2023-05-27 01:02:48 +02:00
parent 0e019c8e83
commit fe3939287b
5 changed files with 47 additions and 11 deletions

View File

@ -182,6 +182,14 @@ void WriteBufferFromS3::finalizeImpl()
if (!is_prefinalized) if (!is_prefinalized)
preFinalize(); preFinalize();
if (std::uncaught_exception())
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Detected buffer finalization when an exception is unwinding the stack."
" Do not call finalize buffer in destructors or when exception thrown."
" Details {}",
getLogDetails());
chassert(offset() == 0); chassert(offset() == 0);
chassert(hidden_size == 0); chassert(hidden_size == 0);
@ -521,7 +529,7 @@ void WriteBufferFromS3::completeMultipartUpload()
if (multipart_tags.empty()) if (multipart_tags.empty())
throw Exception( throw Exception(
ErrorCodes::LOGICAL_ERROR, ErrorCodes::S3_ERROR,
"Failed to complete multipart upload. No parts have uploaded"); "Failed to complete multipart upload. No parts have uploaded");
for (size_t i = 0; i < multipart_tags.size(); ++i) for (size_t i = 0; i < multipart_tags.size(); ++i)
@ -529,7 +537,7 @@ void WriteBufferFromS3::completeMultipartUpload()
const auto tag = multipart_tags.at(i); const auto tag = multipart_tags.at(i);
if (tag.empty()) if (tag.empty())
throw Exception( throw Exception(
ErrorCodes::LOGICAL_ERROR, ErrorCodes::S3_ERROR,
"Failed to complete multipart upload. Part {} haven't been uploaded.", i); "Failed to complete multipart upload. Part {} haven't been uploaded.", i);
} }

View File

@ -162,15 +162,16 @@ void WriteBufferFromS3::TaskTracker::waitTilInflightShrink()
for (auto & it : finished_futures) for (auto & it : finished_futures)
{ {
SCOPE_EXIT({ // SCOPE_EXIT({
/// According to basic exception safety TaskTracker has to be destroyed after exception // /// According to basic exception safety TaskTracker has to be destroyed after exception
/// If it would be true than this SCOPE_EXIT is superfluous // /// If it would be true than this SCOPE_EXIT is superfluous
/// However WriteBufferWithFinalizeCallback, WriteBufferFromFileDecorator do call finalize in d-tor // /// However WriteBufferWithFinalizeCallback, WriteBufferFromFileDecorator do call finalize in d-tor
/// TaskTracker has to cope this until the issue with finalizing in d-tor is addressed in #50274 // /// TaskTracker has to cope this until the issue with finalizing in d-tor is addressed in #50274
futures.erase(it); // futures.erase(it);
}); // });
it->get(); it->get();
futures.erase(it);
} }
finished_futures.clear(); finished_futures.clear();

View File

@ -166,6 +166,7 @@ UInt32 GinIndexStore::getNextSegmentIDRange(const String & file_name, size_t n)
/// Write segment ID 1 /// Write segment ID 1
writeVarUInt(1, *ostr); writeVarUInt(1, *ostr);
ostr->sync(); ostr->sync();
ostr->finalize();
} }
/// Read id in file /// Read id in file
@ -188,6 +189,7 @@ UInt32 GinIndexStore::getNextSegmentIDRange(const String & file_name, size_t n)
writeVarUInt(result + n, *ostr); writeVarUInt(result + n, *ostr);
ostr->sync(); ostr->sync();
ostr->finalize();
} }
return result; return result;
} }
@ -317,8 +319,13 @@ void GinIndexStore::writeSegment()
current_segment.segment_id = getNextSegmentID(); current_segment.segment_id = getNextSegmentID();
metadata_file_stream->sync(); metadata_file_stream->sync();
metadata_file_stream->finalize();
dict_file_stream->sync(); dict_file_stream->sync();
dict_file_stream->finalize();
postings_file_stream->sync(); postings_file_stream->sync();
postings_file_stream->finalize();
} }
GinIndexStoreDeserializer::GinIndexStoreDeserializer(const GinIndexStorePtr & store_) GinIndexStoreDeserializer::GinIndexStoreDeserializer(const GinIndexStorePtr & store_)

View File

@ -33,6 +33,18 @@
<request_timeout_ms>20000</request_timeout_ms> <request_timeout_ms>20000</request_timeout_ms>
<s3_max_inflight_parts_for_one_file>1</s3_max_inflight_parts_for_one_file> <s3_max_inflight_parts_for_one_file>1</s3_max_inflight_parts_for_one_file>
</broken_s3> </broken_s3>
<broken_s3_always_multi_part>
<type>s3</type>
<endpoint>http://resolver:8083/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
<skip_access_check>true</skip_access_check>
<retry_attempts>0</retry_attempts>
<connect_timeout_ms>20000</connect_timeout_ms>
<request_timeout_ms>20000</request_timeout_ms>
<s3_max_single_part_upload_size>0</s3_max_single_part_upload_size>
<s3_max_inflight_parts_for_one_file>1</s3_max_inflight_parts_for_one_file>
</broken_s3_always_multi_part>
<hdd> <hdd>
<type>local</type> <type>local</type>
<path>/</path> <path>/</path>
@ -128,6 +140,13 @@
</main> </main>
</volumes> </volumes>
</broken_s3> </broken_s3>
<broken_s3_always_multi_part>
<volumes>
<main>
<disk>broken_s3_always_multi_part</disk>
</main>
</volumes>
</broken_s3_always_multi_part>
</policies> </policies>
</storage_configuration> </storage_configuration>

View File

@ -930,8 +930,9 @@ def test_merge_canceled_by_drop(cluster, node_name):
) )
@pytest.mark.parametrize("storage_policy", ["broken_s3_always_multi_part", "broken_s3"])
@pytest.mark.parametrize("node_name", ["node"]) @pytest.mark.parametrize("node_name", ["node"])
def test_merge_canceled_by_s3_errors(cluster, node_name): def test_merge_canceled_by_s3_errors(cluster, node_name, storage_policy):
node = cluster.instances[node_name] node = cluster.instances[node_name]
node.query("DROP TABLE IF EXISTS test_merge_canceled_by_s3_errors NO DELAY") node.query("DROP TABLE IF EXISTS test_merge_canceled_by_s3_errors NO DELAY")
node.query( node.query(
@ -939,7 +940,7 @@ def test_merge_canceled_by_s3_errors(cluster, node_name):
" (key UInt32, value String)" " (key UInt32, value String)"
" Engine=MergeTree() " " Engine=MergeTree() "
" ORDER BY value " " ORDER BY value "
" SETTINGS storage_policy='broken_s3'" f" SETTINGS storage_policy='{storage_policy}'"
) )
node.query("SYSTEM STOP MERGES test_merge_canceled_by_s3_errors") node.query("SYSTEM STOP MERGES test_merge_canceled_by_s3_errors")
node.query( node.query(