Merge pull request #50275 from CheSema/d-tor-free

buffers d-tor finalize free
This commit is contained in:
Sema Checherinda 2023-05-31 12:26:19 +02:00 committed by GitHub
commit ab027ca1ce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 60 additions and 57 deletions

View File

@ -138,7 +138,7 @@ void FileChecker::save() const
std::string tmp_files_info_path = parentPath(files_info_path) + "tmp_" + fileName(files_info_path);
{
std::unique_ptr<WriteBuffer> out = disk ? disk->writeFile(tmp_files_info_path) : std::make_unique<WriteBufferFromFile>(tmp_files_info_path);
std::unique_ptr<WriteBufferFromFileBase> out = disk ? disk->writeFile(tmp_files_info_path) : std::make_unique<WriteBufferFromFile>(tmp_files_info_path);
/// So complex JSON structure - for compatibility with the old format.
writeCString("{\"clickhouse\":{", *out);
@ -157,7 +157,9 @@ void FileChecker::save() const
}
writeCString("}}", *out);
out->next();
out->sync();
out->finalize();
}
if (disk)

View File

@ -13,19 +13,6 @@ WriteBufferWithFinalizeCallback::WriteBufferWithFinalizeCallback(
{
}
/// Destructor: makes a best-effort finalize() call.
/// Any exception raised by finalize() is caught here and reported through
/// tryLogCurrentException, so nothing propagates out of the destructor.
WriteBufferWithFinalizeCallback::~WriteBufferWithFinalizeCallback()
{
try
{
finalize();
}
catch (...)
{
/// Report the failure under this function's signature instead of rethrowing.
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
void WriteBufferWithFinalizeCallback::finalizeImpl()
{
WriteBufferFromFileDecorator::finalizeImpl();

View File

@ -19,8 +19,6 @@ public:
FinalizeCallback && create_callback_,
const String & remote_path_);
~WriteBufferWithFinalizeCallback() override;
String getFileName() const override { return remote_path; }
private:

View File

@ -191,7 +191,10 @@ namespace
explicit StreamFromWriteBuffer(std::unique_ptr<WriteBuffer> write_buffer_)
: write_buffer(std::move(write_buffer_)), start_offset(write_buffer->count()) {}
~StreamFromWriteBuffer() { write_buffer->finalize(); }
/// Finalizes the wrapped write buffer when the stream object is destroyed.
/// NOTE(review): there is no try/catch around the call, so if finalize() can
/// throw, the exception would leave this destructor — confirm this is intended.
~StreamFromWriteBuffer()
{
write_buffer->finalize();
}
static int closeFileFunc(void *, void * stream)
{

View File

@ -30,15 +30,6 @@ void WriteBufferFromFileDecorator::finalizeImpl()
WriteBufferFromFileDecorator::~WriteBufferFromFileDecorator()
{
try
{
finalize();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
/// It is not a mistake that swap is called here
/// Swap has been called at constructor, it should be called at destructor
/// In order to provide a valid buffer for impl's d-tor call

View File

@ -109,8 +109,8 @@ void WriteBufferFromS3::nextImpl()
if (is_prefinalized)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot write to prefinalized buffer for S3, the file could have been created with PutObjectRequest");
ErrorCodes::LOGICAL_ERROR,
"Cannot write to prefinalized buffer for S3, the file could have been created with PutObjectRequest");
/// Make sense to call waitIfAny before adding new async task to check if there is an exception
/// The faster the exception is propagated the lesser time is spent for cancellation
@ -238,7 +238,13 @@ WriteBufferFromS3::~WriteBufferFromS3()
// This destructor can be called with finalized=false in case of exceptions
if (!finalized)
{
LOG_ERROR(log, "WriteBufferFromS3 is not finalized in destructor. It could be if an exception occurs. File is not written to S3. {}.", getLogDetails());
LOG_INFO(log,
"WriteBufferFromS3 is not finalized in destructor. "
"It could be if an exception occurs. File is not written to S3. "
"{}. "
"Stack trace: {}",
getLogDetails(),
StackTrace().toString());
}
task_tracker->safeWaitAll();

View File

@ -153,15 +153,8 @@ void WriteBufferFromS3::TaskTracker::waitTilInflightShrink()
for (auto & it : finished_futures)
{
SCOPE_EXIT({
/// According to basic exception safety TaskTracker has to be destroyed after exception
/// If that were true, then this SCOPE_EXIT would be superfluous
/// However WriteBufferWithFinalizeCallback, WriteBufferFromFileDecorator do call finalize in d-tor
/// TaskTracker has to cope with this until the issue with finalizing in d-tor is addressed in #50274
futures.erase(it);
});
it->get();
futures.erase(it);
}
finished_futures.clear();

View File

@ -49,6 +49,8 @@ private:
/// waitTilInflightShrink waits until the number of in-flight tasks no longer exceeds the limit `max_tasks_inflight`.
void waitTilInflightShrink() TSA_NO_THREAD_SAFETY_ANALYSIS;
void collectFinishedFutures(bool propagate_exceptions) TSA_REQUIRES(mutex);
const bool is_async;
ThreadPoolCallbackRunner<void> scheduler;
const size_t max_tasks_inflight;

View File

@ -166,6 +166,7 @@ UInt32 GinIndexStore::getNextSegmentIDRange(const String & file_name, size_t n)
/// Write segment ID 1
writeVarUInt(1, *ostr);
ostr->sync();
ostr->finalize();
}
/// Read id in file
@ -188,6 +189,7 @@ UInt32 GinIndexStore::getNextSegmentIDRange(const String & file_name, size_t n)
writeVarUInt(result + n, *ostr);
ostr->sync();
ostr->finalize();
}
return result;
}
@ -317,8 +319,13 @@ void GinIndexStore::writeSegment()
current_segment.segment_id = getNextSegmentID();
metadata_file_stream->sync();
metadata_file_stream->finalize();
dict_file_stream->sync();
dict_file_stream->finalize();
postings_file_stream->sync();
postings_file_stream->finalize();
}
GinIndexStoreDeserializer::GinIndexStoreDeserializer(const GinIndexStorePtr & store_)

View File

@ -119,22 +119,12 @@ void MergedBlockOutputStream::Finalizer::Impl::finish()
part->getDataPartStorage().removeFile(file_name);
}
/// Destructor: calls finish() on a best-effort basis.
/// Exceptions thrown by finish() are caught and logged with the
/// "MergedBlockOutputStream" label rather than escaping the destructor.
MergedBlockOutputStream::Finalizer::~Finalizer()
{
try
{
finish();
}
catch (...)
{
/// Log and swallow: nothing is rethrown from here.
tryLogCurrentException("MergedBlockOutputStream");
}
}
MergedBlockOutputStream::Finalizer::Finalizer(Finalizer &&) noexcept = default;
MergedBlockOutputStream::Finalizer & MergedBlockOutputStream::Finalizer::operator=(Finalizer &&) noexcept = default;
MergedBlockOutputStream::Finalizer::Finalizer(std::unique_ptr<Impl> impl_) : impl(std::move(impl_)) {}
MergedBlockOutputStream::Finalizer::~Finalizer() = default;
void MergedBlockOutputStream::finalizePart(
const MergeTreeMutableDataPartPtr & new_part,
bool sync,

View File

@ -44,9 +44,10 @@ public:
std::unique_ptr<Impl> impl;
explicit Finalizer(std::unique_ptr<Impl> impl_);
~Finalizer();
Finalizer(Finalizer &&) noexcept;
Finalizer & operator=(Finalizer &&) noexcept;
~Finalizer();
void finish();
};

View File

@ -341,7 +341,10 @@ private:
/// Flushes and finalizes both output buffers.
/// The `compressed` buffer is handled first (next(), then finalize()),
/// followed by the `plain` buffer in the same way.
void finalize()
{
compressed.next();
compressed.finalize();
plain->next();
plain->finalize();
}
};

View File

@ -33,6 +33,18 @@
<request_timeout_ms>20000</request_timeout_ms>
<s3_max_inflight_parts_for_one_file>1</s3_max_inflight_parts_for_one_file>
</broken_s3>
<broken_s3_always_multi_part>
<type>s3</type>
<endpoint>http://resolver:8083/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
<skip_access_check>true</skip_access_check>
<retry_attempts>0</retry_attempts>
<connect_timeout_ms>20000</connect_timeout_ms>
<request_timeout_ms>20000</request_timeout_ms>
<s3_max_single_part_upload_size>0</s3_max_single_part_upload_size>
<s3_max_inflight_parts_for_one_file>1</s3_max_inflight_parts_for_one_file>
</broken_s3_always_multi_part>
<hdd>
<type>local</type>
<path>/</path>
@ -128,6 +140,13 @@
</main>
</volumes>
</broken_s3>
<broken_s3_always_multi_part>
<volumes>
<main>
<disk>broken_s3_always_multi_part</disk>
</main>
</volumes>
</broken_s3_always_multi_part>
</policies>
</storage_configuration>

View File

@ -930,8 +930,9 @@ def test_merge_canceled_by_drop(cluster, node_name):
)
@pytest.mark.parametrize("storage_policy", ["broken_s3_always_multi_part", "broken_s3"])
@pytest.mark.parametrize("node_name", ["node"])
def test_merge_canceled_by_s3_errors(cluster, node_name):
def test_merge_canceled_by_s3_errors(cluster, node_name, storage_policy):
node = cluster.instances[node_name]
node.query("DROP TABLE IF EXISTS test_merge_canceled_by_s3_errors NO DELAY")
node.query(
@ -939,7 +940,7 @@ def test_merge_canceled_by_s3_errors(cluster, node_name):
" (key UInt32, value String)"
" Engine=MergeTree() "
" ORDER BY value "
" SETTINGS storage_policy='broken_s3'"
f" SETTINGS storage_policy='{storage_policy}'"
)
node.query("SYSTEM STOP MERGES test_merge_canceled_by_s3_errors")
node.query(
@ -1048,8 +1049,8 @@ def test_s3_engine_heavy_write_check_mem(cluster, node_name, in_flight_memory):
" AND type!='QueryStart'"
).split()
assert int(memory_usage) < 1.1 * memory
assert int(memory_usage) > 0.9 * memory
assert int(memory_usage) < 1.2 * memory
assert int(memory_usage) > 0.8 * memory
assert int(wait_inflight) > 10 * 1000 * 1000
@ -1096,7 +1097,7 @@ def test_s3_disk_heavy_write_check_mem(cluster, node_name):
" AND type!='QueryStart'"
)
assert int(result) < 1.1 * memory
assert int(result) > 0.9 * memory
assert int(result) < 1.2 * memory
assert int(result) > 0.8 * memory
check_no_objects_after_drop(cluster, node_name=node_name)