fix CascadeWriteBuffer

This commit is contained in:
Sema Checherinda 2024-06-24 21:12:06 +02:00
parent 8f81dc49d3
commit 96a6462059
7 changed files with 37 additions and 12 deletions

View File

@ -222,7 +222,7 @@ TEST(MemoryWriteBuffer, WriteAndReread)
if (s > 1)
{
MemoryWriteBuffer buf(s - 1);
EXPECT_THROW(buf.write(data.data(), data.size()), MemoryWriteBuffer::CurrentBufferExhausted);
EXPECT_THROW(buf.write(data.data(), data.size()), WriteBuffer::CurrentBufferExhausted);
buf.finalize();
}
}

View File

@ -36,7 +36,7 @@ void CascadeWriteBuffer::nextImpl()
curr_buffer->position() = position();
curr_buffer->next();
}
catch (const MemoryWriteBuffer::CurrentBufferExhausted &)
catch (const WriteBuffer::CurrentBufferExhausted &)
{
if (curr_buffer_num < num_sources)
{
@ -83,6 +83,20 @@ void CascadeWriteBuffer::finalizeImpl()
}
}
void CascadeWriteBuffer::cancelImpl() noexcept
{
if (curr_buffer)
curr_buffer->position() = position();
for (auto & buf : prepared_sources)
{
if (buf)
{
buf->cancel();
}
}
}
WriteBuffer * CascadeWriteBuffer::setNextBuffer()
{
if (first_lazy_source_num <= curr_buffer_num && curr_buffer_num < num_sources)

View File

@ -16,7 +16,7 @@ namespace ErrorCodes
* (lazy_sources contains not pointers themself, but their delayed constructors)
*
* Firtly, CascadeWriteBuffer redirects data to first buffer of the sequence
* If current WriteBuffer cannot receive data anymore, it throws special exception MemoryWriteBuffer::CurrentBufferExhausted in nextImpl() body,
* If current WriteBuffer cannot receive data anymore, it throws special exception WriteBuffer::CurrentBufferExhausted in nextImpl() body,
* CascadeWriteBuffer prepare next buffer and continuously redirects data to it.
* If there are no buffers anymore CascadeWriteBuffer throws an exception.
*
@ -48,6 +48,7 @@ public:
private:
void finalizeImpl() override;
void cancelImpl() noexcept override;
WriteBuffer * setNextBuffer();

View File

@ -112,7 +112,7 @@ void MemoryWriteBuffer::addChunk()
if (0 == next_chunk_size)
{
set(position(), 0);
throw MemoryWriteBuffer::CurrentBufferExhausted();
throw WriteBuffer::CurrentBufferExhausted();
}
}

View File

@ -16,13 +16,6 @@ namespace DB
class MemoryWriteBuffer : public WriteBuffer, public IReadableWriteBuffer, boost::noncopyable, private Allocator<false>
{
public:
/// Special exception to throw when the current WriteBuffer cannot receive data
class CurrentBufferExhausted : public std::exception
{
public:
const char * what() const noexcept override { return "MemoryWriteBuffer limit is exhausted"; }
};
/// Use max_total_size_ = 0 for unlimited storage
explicit MemoryWriteBuffer(
size_t max_total_size_ = 0,

View File

@ -20,6 +20,7 @@ namespace ErrorCodes
}
/** A simple abstract class for buffered data writing (char sequences) somewhere.
* Unlike std::ostream, it provides access to the internal buffer,
* and also allows you to manually manage the position inside the buffer.
@ -29,6 +30,14 @@ namespace ErrorCodes
class WriteBuffer : public BufferBase
{
public:
/// Special exception to throw when the current WriteBuffer cannot receive data
/// It is used in MemoryWriteBuffer and CascadeWriteBuffer
class CurrentBufferExhausted : public std::exception
{
public:
const char * what() const noexcept override { return "WriteBuffer limit is exhausted"; }
};
using BufferBase::set;
using BufferBase::position;
void set(Position ptr, size_t size) { BufferBase::set(ptr, size, 0); }
@ -52,6 +61,13 @@ public:
{
nextImpl();
}
catch (const CurrentBufferExhausted &)
{
pos = working_buffer.begin();
bytes += bytes_in_buffer;
throw;
}
catch (...)
{
/** If the nextImpl() call was unsuccessful, move the cursor to the beginning,

View File

@ -63,7 +63,8 @@ public:
~WriteBufferFromVector() override
{
finalize();
if (!canceled)
finalize();
}
private: