diff --git a/src/Disks/tests/gtest_cascade_and_memory_write_buffer.cpp b/src/Disks/tests/gtest_cascade_and_memory_write_buffer.cpp index 23b783173c8..f5d34f7f70c 100644 --- a/src/Disks/tests/gtest_cascade_and_memory_write_buffer.cpp +++ b/src/Disks/tests/gtest_cascade_and_memory_write_buffer.cpp @@ -222,7 +222,7 @@ TEST(MemoryWriteBuffer, WriteAndReread) if (s > 1) { MemoryWriteBuffer buf(s - 1); - EXPECT_THROW(buf.write(data.data(), data.size()), MemoryWriteBuffer::CurrentBufferExhausted); + EXPECT_THROW(buf.write(data.data(), data.size()), WriteBuffer::CurrentBufferExhausted); buf.finalize(); } } diff --git a/src/IO/CascadeWriteBuffer.cpp b/src/IO/CascadeWriteBuffer.cpp index 91a42e77fdb..4542ffc88f7 100644 --- a/src/IO/CascadeWriteBuffer.cpp +++ b/src/IO/CascadeWriteBuffer.cpp @@ -36,7 +36,7 @@ void CascadeWriteBuffer::nextImpl() curr_buffer->position() = position(); curr_buffer->next(); } - catch (const MemoryWriteBuffer::CurrentBufferExhausted &) + catch (const WriteBuffer::CurrentBufferExhausted &) { if (curr_buffer_num < num_sources) { @@ -83,6 +83,20 @@ void CascadeWriteBuffer::finalizeImpl() } } +void CascadeWriteBuffer::cancelImpl() noexcept +{ + if (curr_buffer) + curr_buffer->position() = position(); + + for (auto & buf : prepared_sources) + { + if (buf) + { + buf->cancel(); + } + } +} + WriteBuffer * CascadeWriteBuffer::setNextBuffer() { if (first_lazy_source_num <= curr_buffer_num && curr_buffer_num < num_sources) diff --git a/src/IO/CascadeWriteBuffer.h b/src/IO/CascadeWriteBuffer.h index a003d11bd8a..7a8b11c6a87 100644 --- a/src/IO/CascadeWriteBuffer.h +++ b/src/IO/CascadeWriteBuffer.h @@ -16,7 +16,7 @@ namespace ErrorCodes * (lazy_sources contains not pointers themself, but their delayed constructors) * * Firtly, CascadeWriteBuffer redirects data to first buffer of the sequence - * If current WriteBuffer cannot receive data anymore, it throws special exception MemoryWriteBuffer::CurrentBufferExhausted in nextImpl() body, + * If current WriteBuffer cannot receive data anymore, it throws special exception WriteBuffer::CurrentBufferExhausted in nextImpl() body, * CascadeWriteBuffer prepare next buffer and continuously redirects data to it. * If there are no buffers anymore CascadeWriteBuffer throws an exception. * @@ -48,6 +48,7 @@ public: private: void finalizeImpl() override; + void cancelImpl() noexcept override; WriteBuffer * setNextBuffer(); diff --git a/src/IO/MemoryReadWriteBuffer.cpp b/src/IO/MemoryReadWriteBuffer.cpp index 1f4d350f083..c79ee1d6f58 100644 --- a/src/IO/MemoryReadWriteBuffer.cpp +++ b/src/IO/MemoryReadWriteBuffer.cpp @@ -112,7 +112,7 @@ void MemoryWriteBuffer::addChunk() if (0 == next_chunk_size) { set(position(), 0); - throw MemoryWriteBuffer::CurrentBufferExhausted(); + throw WriteBuffer::CurrentBufferExhausted(); } } diff --git a/src/IO/MemoryReadWriteBuffer.h b/src/IO/MemoryReadWriteBuffer.h index d7ca992aa44..feb1499d12f 100644 --- a/src/IO/MemoryReadWriteBuffer.h +++ b/src/IO/MemoryReadWriteBuffer.h @@ -16,13 +16,6 @@ namespace DB class MemoryWriteBuffer : public WriteBuffer, public IReadableWriteBuffer, boost::noncopyable, private Allocator { public: - /// Special exception to throw when the current WriteBuffer cannot receive data - class CurrentBufferExhausted : public std::exception - { - public: - const char * what() const noexcept override { return "MemoryWriteBuffer limit is exhausted"; } - }; - /// Use max_total_size_ = 0 for unlimited storage explicit MemoryWriteBuffer( size_t max_total_size_ = 0, diff --git a/src/IO/WriteBuffer.h b/src/IO/WriteBuffer.h index 8b429891567..5b9381334a8 100644 --- a/src/IO/WriteBuffer.h +++ b/src/IO/WriteBuffer.h @@ -20,6 +20,7 @@ namespace ErrorCodes } + /** A simple abstract class for buffered data writing (char sequences) somewhere. * Unlike std::ostream, it provides access to the internal buffer, * and also allows you to manually manage the position inside the buffer. @@ -29,6 +30,14 @@ namespace ErrorCodes class WriteBuffer : public BufferBase { public: + /// Special exception to throw when the current WriteBuffer cannot receive data + /// It is used in MemoryWriteBuffer and CascadeWriteBuffer + class CurrentBufferExhausted : public std::exception + { + public: + const char * what() const noexcept override { return "WriteBuffer limit is exhausted"; } + }; + using BufferBase::set; using BufferBase::position; void set(Position ptr, size_t size) { BufferBase::set(ptr, size, 0); } @@ -52,6 +61,13 @@ public: { nextImpl(); } + catch (const CurrentBufferExhausted &) + { + pos = working_buffer.begin(); + bytes += bytes_in_buffer; + + throw; + } catch (...) { /** If the nextImpl() call was unsuccessful, move the cursor to the beginning, diff --git a/src/IO/WriteBufferFromVector.h b/src/IO/WriteBufferFromVector.h index 1ea32af2968..17a329d401d 100644 --- a/src/IO/WriteBufferFromVector.h +++ b/src/IO/WriteBufferFromVector.h @@ -63,7 +63,8 @@ public: ~WriteBufferFromVector() override { - finalize(); + if (!canceled) + finalize(); } private: