do abort/cancel/deletion merge/mutate task with exception context

This commit is contained in:
Sema Checherinda 2023-06-16 16:34:50 +02:00
parent cf31fdb4d5
commit 47484faace
9 changed files with 117 additions and 89 deletions

View File

@ -85,7 +85,6 @@ public:
/// Returns the hash reported by the IHashingBuffer<WriteBuffer> base class.
/// next() is invoked first — presumably so that data still sitting in the working buffer
/// is taken into account; confirm against the next() implementation.
uint128 getHash()
{
next();
/// The buffer is expected to be finalized at this point.
chassert(finalized);
return IHashingBuffer<WriteBuffer>::getHash();
}
};

33
src/IO/WriteBuffer.cpp Normal file
View File

@ -0,0 +1,33 @@
#include "WriteBuffer.h"
#include <Common/logger_useful.h>
namespace DB
{
/// Calling finalize() in the destructor of derived classes is a bad practice.
/// This causes objects to be left on the remote FS when a write operation is rolled back.
/// Call finalize() explicitly: until it has been called, there is no guarantee that the file has been written.
WriteBuffer::~WriteBuffer()
{
    /// The destructor may be called with finalized == false when an exception occurs.
    /// There is nothing to report if no data was written or the buffer has been finalized.
    if (count() == 0 || finalized)
        return;

    /// It is totally OK to destroy a non-finalized instance while an exception
    /// is in flight or is currently being handled.
    const bool in_exception_context = std::uncaught_exceptions() != 0 || std::current_exception() != nullptr;
    if (in_exception_context)
        return;

    /// On the green path it is suspicious: log an error with the stack trace and assert.
    Poco::Logger * logger = &Poco::Logger::get("WriteBuffer");
    LOG_ERROR(
        logger,
        "WriteBuffer is not finalized when destructor is called. "
        "No exceptions in flight are detected. "
        "The file might not be written at all or might be truncated. "
        "Stack trace: {}",
        StackTrace().toString());
    chassert(false && "WriteBuffer is not finalized in destructor.");
}
}

View File

@ -8,7 +8,6 @@
#include <Common/Exception.h>
#include <Common/LockMemoryExceptionInThread.h>
#include <Common/logger_useful.h>
#include <IO/BufferBase.h>
@ -64,28 +63,7 @@ public:
}
/// Calling finalize() in the destructor of derived classes is a bad practice.
/// This causes objects to be left on the remote FS when a write operation is rolled back.
/// Call finalize() explicitly: until it has been called, there is no guarantee that the file has been written.
virtual ~WriteBuffer()
{
    /// Reaching the destructor with finalized == false is expected when an exception occurs.
    const bool has_unfinalized_data = count() > 0 && !finalized;

    /// Destroying a non-finalized instance while an exception is in flight is totally OK,
    /// whereas doing so on the green path is suspicious and is reported as an error.
    if (has_unfinalized_data && !std::uncaught_exceptions())
    {
        Poco::Logger * logger = &Poco::Logger::get("WriteBuffer");
        LOG_ERROR(
            logger,
            "WriteBuffer is not finalized in destructor. "
            "No exceptions in flight are detected. "
            "The file might not be written at all or might be truncated. "
            "Stack trace: {}",
            StackTrace().toString());
    }
}
virtual ~WriteBuffer();
inline void nextIfAtEnd()
{
@ -114,7 +92,6 @@ public:
}
}
inline void write(char x)
{
if (finalized)
@ -140,7 +117,6 @@ public:
if (finalized)
return;
/// finalize() is often called from destructors.
LockMemoryExceptionInThread lock(VariableContext::Global);
try
{

View File

@ -13,7 +13,6 @@ class WriteBufferFromFileBase : public BufferWithOwnMemory<WriteBuffer>
{
public:
WriteBufferFromFileBase(size_t buf_size, char * existing_memory, size_t alignment);
~WriteBufferFromFileBase() override = default;
void sync() override = 0;
virtual std::string getFileName() const = 0;

View File

@ -27,7 +27,6 @@ void MergePlainMergeTreeTask::onCompleted()
task_result_callback(delay);
}
bool MergePlainMergeTreeTask::executeStep()
{
/// All metrics will be saved in the thread_group, including all scheduled tasks.

View File

@ -177,47 +177,50 @@ void MergeTreeBackgroundExecutor<Queue>::routine(TaskRuntimeDataPtr item)
active.erase(std::remove(active.begin(), active.end(), item), active.end());
};
bool need_execute_again = false;
try
auto print_task_exception = [&]()
{
ALLOW_ALLOCATIONS_IN_SCOPE;
need_execute_again = item->task->executeStep();
}
catch (const Exception & e)
{
NOEXCEPT_SCOPE({
ALLOW_ALLOCATIONS_IN_SCOPE;
if (e.code() == ErrorCodes::ABORTED) /// Cancelled merging parts is not an error - log as info.
LOG_INFO(log, getExceptionMessageAndPattern(e, /* with_stacktrace */ false));
else
tryLogCurrentException(__PRETTY_FUNCTION__);
});
}
catch (...)
{
NOEXCEPT_SCOPE({
ALLOW_ALLOCATIONS_IN_SCOPE;
tryLogCurrentException(__PRETTY_FUNCTION__);
});
}
if (need_execute_again)
{
std::lock_guard guard(mutex);
erase_from_active();
if (item->is_currently_deleting)
std::exception_ptr ex = std::current_exception();
try
{
std::rethrow_exception(ex);
}
catch (const Exception & e)
{
NOEXCEPT_SCOPE({
ALLOW_ALLOCATIONS_IN_SCOPE;
item->task.reset();
/// Cancelled merging parts is not an error - log as info.
if (e.code() == ErrorCodes::ABORTED)
LOG_INFO(log, getExceptionMessageAndPattern(e, /* with_stacktrace */ false));
else
tryLogCurrentException(__PRETTY_FUNCTION__);
});
item->is_done.set();
item = nullptr;
return;
}
catch (...)
{
NOEXCEPT_SCOPE({
ALLOW_ALLOCATIONS_IN_SCOPE;
tryLogCurrentException(__PRETTY_FUNCTION__);
});
}
};
/// Final bookkeeping for a task that will not be scheduled again:
/// destroys the task object, signals `is_done` and drops the local `item` pointer.
/// Annotated with TSA_REQUIRES(mutex), i.e. the caller is expected to hold `mutex`.
auto on_task_done = [&]() TSA_REQUIRES(mutex)
{
/// We have to call reset() under a lock, otherwise a race is possible.
/// Imagine that the task is finally completed (the last execution returned false),
/// we removed the task from both queues, but still hold a pointer to it.
/// The thread that shuts down the storage will scan the queues in order to find tasks to wait for, but will find nothing.
/// So the destructor of the task and the destructor of the storage would be executed concurrently.
NOEXCEPT_SCOPE({
ALLOW_ALLOCATIONS_IN_SCOPE;
item->task.reset();
});
/// Signal completion only after the task itself has been destroyed.
item->is_done.set();
item = nullptr;
};
auto on_task_restart = [&]() TSA_REQUIRES(mutex)
{
/// After the `guard` is destroyed, `item` has to be in a moved-from state,
/// so that it no longer owns the object it points to.
/// Otherwise the destruction of the task won't be ordered with the destruction of the
@ -226,8 +229,9 @@ void MergeTreeBackgroundExecutor<Queue>::routine(TaskRuntimeDataPtr item)
has_tasks.notify_one();
item = nullptr;
return;
}
};
auto release_task = [&]()
{
std::lock_guard guard(mutex);
erase_from_active();
@ -241,39 +245,56 @@ void MergeTreeBackgroundExecutor<Queue>::routine(TaskRuntimeDataPtr item)
/// But it is rather safe, because we have try...catch block here, and another one in ThreadPool.
item->task->onCompleted();
}
catch (const Exception & e)
{
NOEXCEPT_SCOPE({
ALLOW_ALLOCATIONS_IN_SCOPE;
if (e.code() == ErrorCodes::ABORTED) /// Cancelled merging parts is not an error - log as info.
LOG_INFO(log, getExceptionMessageAndPattern(e, /* with_stacktrace */ false));
else
tryLogCurrentException(__PRETTY_FUNCTION__);
});
}
catch (...)
{
NOEXCEPT_SCOPE({
ALLOW_ALLOCATIONS_IN_SCOPE;
tryLogCurrentException(__PRETTY_FUNCTION__);
});
print_task_exception();
}
return on_task_done();
};
/// We have to call reset() under a lock, otherwise a race is possible.
/// Imagine, that task is finally completed (last execution returned false),
/// we removed the task from both queues, but still have pointer.
/// The thread that shutdowns storage will scan queues in order to find some tasks to wait for, but will find nothing.
/// So, the destructor of a task and the destructor of a storage will be executed concurrently.
bool need_execute_again = false;
try
{
ALLOW_ALLOCATIONS_IN_SCOPE;
need_execute_again = item->task->executeStep();
}
catch (...)
{
print_task_exception();
/// Release the task within the exception context.
/// An exception context is needed to properly delete write buffers without finalization.
release_task();
return;
}
if (!need_execute_again)
{
release_task();
return;
}
{
std::lock_guard guard(mutex);
erase_from_active();
if (item->is_currently_deleting)
{
NOEXCEPT_SCOPE({
try
{
ALLOW_ALLOCATIONS_IN_SCOPE;
item->task.reset();
});
/// An exception context is needed to properly delete write buffers without finalization.
throw Exception(ErrorCodes::ABORTED, "Storage is about to be deleted. Done task as if it was aborted.");
}
catch (...)
{
print_task_exception();
return on_task_done();
}
}
item->is_done.set();
item = nullptr;
on_task_restart();
}
}

View File

@ -347,7 +347,10 @@ void MergeTreeDataPartWriterOnDisk::fillPrimaryIndexChecksums(MergeTreeData::Dat
}
if (compress_primary_key)
{
index_source_hashing_stream->finalize();
index_compressor_stream->finalize();
}
index_file_hashing_stream->finalize();

View File

@ -48,7 +48,6 @@ public:
Finalizer & operator=(Finalizer &&) noexcept;
~Finalizer();
void finish();
};

View File

@ -24,7 +24,6 @@ void MutatePlainMergeTreeTask::onCompleted()
task_result_callback(delay);
}
void MutatePlainMergeTreeTask::prepare()
{
future_part = merge_mutate_entry->future_part;