ClickHouse/src/IO/WriteBufferFromPocoSocketChunked.h

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

236 lines
8.2 KiB
C++
Raw Normal View History

2024-05-14 15:37:20 +00:00
#pragma once
#include "base/defines.h"
2024-05-14 15:37:20 +00:00
#include <Common/logger_useful.h>
#include <IO/WriteBufferFromPocoSocket.h>
2024-05-14 18:09:11 +00:00
#include <IO/NetUtils.h>
2024-05-14 15:37:20 +00:00
2024-06-14 15:56:14 +00:00
namespace
{
/// Store `val` at the address `typed_ptr` by copying its bytes with memcpy instead of
/// assigning through the pointer, then return a reference to the destination object.
/// `std::type_identity_t` makes `val` a non-deduced parameter, so T is taken from the
/// pointer type only and the value argument is converted to T.
template <typename T>
const T & setValue(T * typed_ptr, std::type_identity_t<T> val)
{
    void * destination = typed_ptr;
    const void * source = &val;
    memcpy(destination, source, sizeof(T));
    return *typed_ptr;
}
}
2024-05-14 15:37:20 +00:00
namespace DB
{
class WriteBufferFromPocoSocketChunked: public WriteBufferFromPocoSocket
{
public:
explicit WriteBufferFromPocoSocketChunked(Poco::Net::Socket & socket_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE) : WriteBufferFromPocoSocket(socket_, buf_size), log(getLogger("Protocol")) {}
explicit WriteBufferFromPocoSocketChunked(Poco::Net::Socket & socket_, const ProfileEvents::Event & write_event_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE) : WriteBufferFromPocoSocket(socket_, write_event_, buf_size), log(getLogger("Protocol")) {}
void enableChunked()
{
chunked = true;
/// Initialize next chunk
chunk_size_ptr = reinterpret_cast<decltype(chunk_size_ptr)>(pos);
pos += std::min(available(), sizeof(*chunk_size_ptr));
}
void finishChunk()
2024-05-14 15:37:20 +00:00
{
if (!chunked)
return;
if (pos <= reinterpret_cast<Position>(chunk_size_ptr) + sizeof(*chunk_size_ptr))
{
if (chunk_size_ptr == last_finish_chunk) // prevent duplicate finish chunk
return;
2024-06-07 12:25:46 +00:00
/// If current chunk is empty it means we are finishing a chunk previously sent by next(),
/// we want to convert current chunk header into end-of-chunk marker and initialize next chunk.
2024-06-11 16:47:05 +00:00
/// We don't need to worry about if it's the end of the buffer because next() always sends the whole buffer
/// so it should be a beginning of the buffer.
chassert(reinterpret_cast<Position>(chunk_size_ptr) == working_buffer.begin());
2024-06-14 15:56:14 +00:00
setValue(chunk_size_ptr, 0);
/// Initialize next chunk
chunk_size_ptr = reinterpret_cast<decltype(chunk_size_ptr)>(pos);
pos += std::min(available(), sizeof(*chunk_size_ptr));
last_finish_chunk = chunk_size_ptr;
return;
}
2024-05-14 15:37:20 +00:00
2024-06-11 16:47:05 +00:00
/// Previously finished chunk wasn't sent yet
if (last_finish_chunk == chunk_size_ptr)
{
chunk_started = false;
LOG_TEST(log, "{} -> {} Chunk send ended.", ourAddress().toString(), peerAddress().toString());
}
/// Fill up current chunk size
2024-06-14 15:56:14 +00:00
setValue(chunk_size_ptr, toLittleEndian(static_cast<UInt32>(pos - reinterpret_cast<Position>(chunk_size_ptr) - sizeof(*chunk_size_ptr))));
if (!chunk_started)
LOG_TEST(log, "{} -> {} Chunk send started. Message {}, size {}",
ourAddress().toString(), peerAddress().toString(),
static_cast<unsigned int>(*(reinterpret_cast<char *>(chunk_size_ptr) + sizeof(*chunk_size_ptr))),
*chunk_size_ptr);
else
2024-06-11 16:47:05 +00:00
{
chunk_started = false;
2024-06-11 16:47:05 +00:00
LOG_TEST(log, "{} -> {} Chunk send continued. Size {}", ourAddress().toString(), peerAddress().toString(), *chunk_size_ptr);
}
LOG_TEST(log, "{} -> {} Chunk send ended.", ourAddress().toString(), peerAddress().toString());
2024-05-14 15:37:20 +00:00
if (available() < sizeof(*chunk_size_ptr))
{
finishing = available();
pos += available();
chunk_size_ptr = reinterpret_cast<decltype(chunk_size_ptr)>(pos);
return;
}
/// Buffer end-of-chunk
*reinterpret_cast<decltype(chunk_size_ptr)>(pos) = 0;
pos += sizeof(*chunk_size_ptr);
/// Initialize next chunk
chunk_size_ptr = reinterpret_cast<decltype(chunk_size_ptr)>(pos);
pos += std::min(available(), sizeof(*chunk_size_ptr));
last_finish_chunk = chunk_size_ptr;
2024-05-14 15:37:20 +00:00
}
2024-06-11 16:47:05 +00:00
~WriteBufferFromPocoSocketChunked() override
{
try
{
finalize();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
2024-05-14 15:37:20 +00:00
protected:
void nextImpl() override
{
if (!chunked)
2024-06-09 19:31:20 +00:00
{
WriteBufferFromPocoSocket::nextImpl();
return;
}
2024-05-14 15:37:20 +00:00
/// next() after finishChunk ar the end of the buffer
if (finishing < sizeof(*chunk_size_ptr))
{
pos -= finishing;
/// Send current chunk
WriteBufferFromPocoSocket::nextImpl();
/// Send end-of-chunk directly
UInt32 s = 0;
2024-05-14 15:37:20 +00:00
socketSendBytes(reinterpret_cast<const char *>(&s), sizeof(s));
finishing = sizeof(*chunk_size_ptr);
/// Initialize next chunk
chunk_size_ptr = reinterpret_cast<decltype(chunk_size_ptr)>(working_buffer.begin());
nextimpl_working_buffer_offset = sizeof(*chunk_size_ptr);
last_finish_chunk = chunk_size_ptr;
return;
}
/// Send end-of-chunk buffered by finishChunk
2024-06-07 18:23:37 +00:00
if (offset() == 2 * sizeof(*chunk_size_ptr) && last_finish_chunk == chunk_size_ptr)
{
pos -= sizeof(*chunk_size_ptr);
/// Send end-of-chunk
WriteBufferFromPocoSocket::nextImpl();
/// Initialize next chunk
chunk_size_ptr = reinterpret_cast<decltype(chunk_size_ptr)>(working_buffer.begin());
nextimpl_working_buffer_offset = sizeof(*chunk_size_ptr);
last_finish_chunk = chunk_size_ptr;
return;
}
/// Prevent sending empty chunk
if (offset() == sizeof(*chunk_size_ptr))
{
nextimpl_working_buffer_offset = sizeof(*chunk_size_ptr);
return;
2024-05-14 15:37:20 +00:00
}
/// Finish chunk at the end of the buffer
if (working_buffer.end() - reinterpret_cast<Position>(chunk_size_ptr) <= static_cast<std::ptrdiff_t>(sizeof(*chunk_size_ptr)))
{
pos = reinterpret_cast<Position>(chunk_size_ptr);
/// Send current chunk
WriteBufferFromPocoSocket::nextImpl();
/// Initialize next chunk
chunk_size_ptr = reinterpret_cast<decltype(chunk_size_ptr)>(working_buffer.begin());
nextimpl_working_buffer_offset = sizeof(*chunk_size_ptr);
2024-06-07 18:23:37 +00:00
last_finish_chunk = nullptr;
return;
}
if (pos - reinterpret_cast<Position>(chunk_size_ptr) == sizeof(*chunk_size_ptr)) // next() after finishChunk
pos -= sizeof(*chunk_size_ptr);
else // fill up current chunk size
{
2024-06-14 15:56:14 +00:00
setValue(chunk_size_ptr, toLittleEndian(static_cast<UInt32>(pos - reinterpret_cast<Position>(chunk_size_ptr) - sizeof(*chunk_size_ptr))));
if (!chunk_started)
{
chunk_started = true;
LOG_TEST(log, "{} -> {} Chunk send started. Message {}, size {}",
ourAddress().toString(), peerAddress().toString(),
static_cast<unsigned int>(*(reinterpret_cast<char *>(chunk_size_ptr) + sizeof(*chunk_size_ptr))),
*chunk_size_ptr);
}
else
LOG_TEST(log, "{} -> {} Chunk send continued. Size {}", ourAddress().toString(), peerAddress().toString(), *chunk_size_ptr);
}
/// Send current chunk
2024-05-14 15:37:20 +00:00
WriteBufferFromPocoSocket::nextImpl();
/// Initialize next chunk
chunk_size_ptr = reinterpret_cast<decltype(chunk_size_ptr)>(working_buffer.begin());
nextimpl_working_buffer_offset = sizeof(*chunk_size_ptr);
2024-06-07 18:23:37 +00:00
last_finish_chunk = nullptr;
2024-05-14 15:37:20 +00:00
}
2024-06-11 16:47:05 +00:00
void finalizeImpl() override
{
if (offset() == sizeof(*chunk_size_ptr))
pos -= sizeof(*chunk_size_ptr);
WriteBufferFromPocoSocket::finalizeImpl();
}
Poco::Net::SocketAddress peerAddress()
{
return peer_address;
}
Poco::Net::SocketAddress ourAddress()
{
return our_address;
}
2024-05-14 15:37:20 +00:00
private:
LoggerPtr log;
bool chunked = false;
UInt32 * last_finish_chunk = nullptr; // pointer to the last chunk header created by finishChunk
bool chunk_started = false; // chunk started flag
2024-06-07 02:06:26 +00:00
UInt32 * chunk_size_ptr = nullptr; // pointer to the chunk size holder in the buffer
size_t finishing = sizeof(*chunk_size_ptr); // indicates not enough buffer for end-of-chunk marker
2024-05-14 15:37:20 +00:00
};
}