fixing HTTPHandler

This commit is contained in:
Sema Checherinda 2024-06-25 23:04:47 +02:00
parent 4b5b149167
commit 072eb14493
9 changed files with 51 additions and 39 deletions

View File

@ -222,7 +222,7 @@ TEST(MemoryWriteBuffer, WriteAndReread)
if (s > 1)
{
MemoryWriteBuffer buf(s - 1);
EXPECT_THROW(buf.write(data.data(), data.size()), WriteBuffer::CurrentBufferExhausted);
EXPECT_THROW(buf.write(data.data(), data.size()), MemoryWriteBuffer::CurrentBufferExhausted);
buf.finalize();
}
}

View File

@ -36,7 +36,7 @@ void CascadeWriteBuffer::nextImpl()
curr_buffer->position() = position();
curr_buffer->next();
}
catch (const WriteBuffer::CurrentBufferExhausted &)
catch (const MemoryWriteBuffer::CurrentBufferExhausted &)
{
if (curr_buffer_num < num_sources)
{

View File

@ -112,7 +112,7 @@ void MemoryWriteBuffer::addChunk()
if (0 == next_chunk_size)
{
set(position(), 0);
throw WriteBuffer::CurrentBufferExhausted();
throw MemoryWriteBuffer::CurrentBufferExhausted();
}
}

View File

@ -16,6 +16,13 @@ namespace DB
class MemoryWriteBuffer : public WriteBuffer, public IReadableWriteBuffer, boost::noncopyable, private Allocator<false>
{
public:
/// Special exception to throw when the current MemoryWriteBuffer cannot receive data
class CurrentBufferExhausted : public std::exception
{
public:
const char * what() const noexcept override { return "WriteBuffer limit is exhausted"; }
};
/// Use max_total_size_ = 0 for unlimited storage
explicit MemoryWriteBuffer(
size_t max_total_size_ = 0,

View File

@ -30,4 +30,19 @@ WriteBuffer::~WriteBuffer()
}
}
void WriteBuffer::cancel() noexcept
{
if (canceled || finalized)
return;
LoggerPtr log = getLogger("WriteBuffer");
LOG_INFO(
log,
"Cancel has been called. Stack trace: {}",
StackTrace().toString());
LockMemoryExceptionInThread lock(VariableContext::Global);
cancelImpl();
canceled = true;
}
}

View File

@ -29,14 +29,6 @@ namespace ErrorCodes
class WriteBuffer : public BufferBase
{
public:
/// Special exception to throw when the current WriteBuffer cannot receive data
/// It is used in MemoryWriteBuffer and CascadeWriteBuffer
class CurrentBufferExhausted : public std::exception
{
public:
const char * what() const noexcept override { return "WriteBuffer limit is exhausted"; }
};
using BufferBase::set;
using BufferBase::position;
void set(Position ptr, size_t size) { BufferBase::set(ptr, size, 0); }
@ -60,13 +52,6 @@ public:
{
nextImpl();
}
catch (const CurrentBufferExhausted &)
{
pos = working_buffer.begin();
bytes += bytes_in_buffer;
throw;
}
catch (...)
{
/** If the nextImpl() call was unsuccessful, move the cursor to the beginning,
@ -75,8 +60,6 @@ public:
pos = working_buffer.begin();
bytes += bytes_in_buffer;
cancel();
throw;
}
@ -157,15 +140,7 @@ public:
}
}
void cancel() noexcept
{
if (canceled || finalized)
return;
LockMemoryExceptionInThread lock(VariableContext::Global);
cancelImpl();
canceled = true;
}
void cancel() noexcept;
/// Wait for data to be reliably written. Mainly, call fsync for fd.
/// May be called after finalize() if needed.

View File

@ -162,7 +162,8 @@ WriteBufferFromHTTPServerResponse::~WriteBufferFromHTTPServerResponse()
{
try
{
finalize();
if (!canceled)
finalize();
}
catch (...)
{

View File

@ -1027,14 +1027,7 @@ catch (...)
{
tryLogCurrentException(log, "Cannot send exception to client");
try
{
used_output.finalize();
}
catch (...)
{
tryLogCurrentException(log, "Cannot flush data to client (after sending exception)");
}
used_output.cancel();
}
void HTTPHandler::formatExceptionForClient(int exception_code, HTTPServerRequest & request, HTTPServerResponse & response, Output & used_output)
@ -1172,7 +1165,7 @@ void HTTPHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse
/// Check if exception was thrown in used_output.finalize().
/// In this case used_output can be in invalid state and we
/// cannot write in it anymore. So, just log this exception.
if (used_output.isFinalized())
if (used_output.isFinalized() || used_output.isCanceled())
{
if (thread_trace_context)
thread_trace_context->root_span.addAttribute("clickhouse.exception", "Cannot flush data to client");
@ -1191,6 +1184,8 @@ void HTTPHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse
if (thread_trace_context)
thread_trace_context->root_span.addAttribute(status);
return;
}
used_output.finalize();

View File

@ -78,6 +78,7 @@ private:
WriteBuffer * out_maybe_delayed_and_compressed = nullptr;
bool finalized = false;
bool canceled = false;
bool exception_is_written = false;
std::function<void(WriteBuffer &, const String &)> exception_writer;
@ -99,6 +100,24 @@ private:
out->finalize();
}
void cancel()
{
if (canceled)
return;
canceled = true;
if (out_compressed_holder)
out_compressed_holder->cancel();
if (out)
out->cancel();
}
bool isCanceled() const
{
return canceled;
}
bool isFinalized() const
{
return finalized;