mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-14 02:12:21 +00:00
Addressed review comments
- Removed first_buffer, finalizeImpl and logic to different buffer size from ForkWriteBuffer
This commit is contained in:
parent
5a8d6ba0fc
commit
d7fc7a4290
@ -16,46 +16,23 @@ ForkWriteBuffer::ForkWriteBuffer(WriteBufferPtrs && sources_)
|
|||||||
{
|
{
|
||||||
if (sources.empty())
|
if (sources.empty())
|
||||||
{
|
{
|
||||||
first_buffer = nullptr;
|
|
||||||
throw Exception("ForkWriteBuffer required WriteBuffer is not provided", ErrorCodes::CANNOT_CREATE_IO_BUFFER);
|
throw Exception("ForkWriteBuffer required WriteBuffer is not provided", ErrorCodes::CANNOT_CREATE_IO_BUFFER);
|
||||||
}
|
}
|
||||||
else
|
set(sources.front()->buffer().begin(), sources.front()->buffer().size());
|
||||||
{
|
|
||||||
first_buffer = sources.begin()->get();
|
|
||||||
set(first_buffer->buffer().begin(), first_buffer->buffer().size());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void ForkWriteBuffer::nextImpl()
|
void ForkWriteBuffer::nextImpl()
|
||||||
{
|
{
|
||||||
if (!first_buffer)
|
sources.front()->position() = position();
|
||||||
return;
|
|
||||||
|
|
||||||
first_buffer->position() = position();
|
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
for (const WriteBufferPtr & write_buffer : sources | std::views::reverse)
|
for (const WriteBufferPtr & write_buffer : sources | std::views::reverse)
|
||||||
{
|
{
|
||||||
if (write_buffer.get() != first_buffer)
|
if (write_buffer != sources.front())
|
||||||
{
|
{
|
||||||
//if buffer size if not enough to write, then split the message with buffer length
|
write_buffer->write(sources.front()->buffer().begin(), sources.front()->offset());
|
||||||
if (write_buffer->available() < first_buffer->offset())
|
|
||||||
{
|
|
||||||
size_t bytes_written = 0;
|
|
||||||
auto to_be_written = first_buffer->offset();
|
|
||||||
while (to_be_written != 0)
|
|
||||||
{
|
|
||||||
int bytes_to_copy = std::min(to_be_written, write_buffer->available());
|
|
||||||
write_buffer->write(first_buffer->buffer().begin()+bytes_written, bytes_to_copy);
|
|
||||||
write_buffer->next();
|
|
||||||
bytes_written += bytes_to_copy;
|
|
||||||
to_be_written -= bytes_to_copy;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
write_buffer->write(first_buffer->buffer().begin(), first_buffer->offset());
|
|
||||||
}
|
}
|
||||||
write_buffer->next();
|
write_buffer->next();
|
||||||
}
|
}
|
||||||
@ -68,12 +45,6 @@ void ForkWriteBuffer::nextImpl()
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void ForkWriteBuffer::finalizeImpl()
|
|
||||||
{
|
|
||||||
next();
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
ForkWriteBuffer::~ForkWriteBuffer()
|
ForkWriteBuffer::~ForkWriteBuffer()
|
||||||
{
|
{
|
||||||
finalize();
|
finalize();
|
||||||
|
@ -25,11 +25,9 @@ public:
|
|||||||
|
|
||||||
protected:
|
protected:
|
||||||
void nextImpl() override;
|
void nextImpl() override;
|
||||||
void finalizeImpl() override;
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
WriteBufferPtrs sources;
|
WriteBufferPtrs sources;
|
||||||
WriteBuffer *first_buffer;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user