mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-30 19:42:00 +00:00
add chunked wrapper to native protocol
This commit is contained in:
parent
0a01a92bf8
commit
05e823a1e9
@ -4,8 +4,6 @@
|
||||
#include <Core/Settings.h>
|
||||
#include <Compression/CompressedReadBuffer.h>
|
||||
#include <Compression/CompressedWriteBuffer.h>
|
||||
#include <IO/ReadBufferFromPocoSocket.h>
|
||||
#include <IO/WriteBufferFromPocoSocket.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <IO/copyData.h>
|
||||
@ -191,10 +189,10 @@ void Connection::connect(const ConnectionTimeouts & timeouts)
|
||||
, tcp_keep_alive_timeout_in_sec);
|
||||
}
|
||||
|
||||
in = std::make_shared<ReadBufferFromPocoSocket>(*socket);
|
||||
in = std::make_shared<ReadBufferFromPocoSocketChunked>(*socket);
|
||||
in->setAsyncCallback(async_callback);
|
||||
|
||||
out = std::make_shared<WriteBufferFromPocoSocket>(*socket);
|
||||
out = std::make_shared<WriteBufferFromPocoSocketChunked>(*socket);
|
||||
out->setAsyncCallback(async_callback);
|
||||
connected = true;
|
||||
setDescription();
|
||||
@ -205,6 +203,12 @@ void Connection::connect(const ConnectionTimeouts & timeouts)
|
||||
if (server_revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_ADDENDUM)
|
||||
sendAddendum();
|
||||
|
||||
if (server_revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_CHUNKED_PACKETS)
|
||||
{
|
||||
in->enableChunked();
|
||||
out->enableChunked();
|
||||
}
|
||||
|
||||
LOG_TRACE(log_wrapper.get(), "Connected to {} server version {}.{}.{}.",
|
||||
server_name, server_version_major, server_version_minor, server_version_patch);
|
||||
}
|
||||
@ -567,6 +571,7 @@ bool Connection::ping(const ConnectionTimeouts & timeouts)
|
||||
|
||||
UInt64 pong = 0;
|
||||
writeVarUInt(Protocol::Client::Ping, *out);
|
||||
out->finishPacket();
|
||||
out->next();
|
||||
|
||||
if (in->eof())
|
||||
@ -611,6 +616,7 @@ TablesStatusResponse Connection::getTablesStatus(const ConnectionTimeouts & time
|
||||
|
||||
writeVarUInt(Protocol::Client::TablesStatusRequest, *out);
|
||||
request.write(*out, server_revision);
|
||||
out->finishPacket();
|
||||
out->next();
|
||||
|
||||
UInt64 response_type = 0;
|
||||
@ -762,6 +768,8 @@ void Connection::sendQuery(
|
||||
block_profile_events_in.reset();
|
||||
block_out.reset();
|
||||
|
||||
out->finishPacket();
|
||||
|
||||
/// Send empty block which means end of data.
|
||||
if (!with_pending_data)
|
||||
{
|
||||
@ -778,6 +786,7 @@ void Connection::sendCancel()
|
||||
return;
|
||||
|
||||
writeVarUInt(Protocol::Client::Cancel, *out);
|
||||
out->finishPacket();
|
||||
out->next();
|
||||
}
|
||||
|
||||
@ -804,6 +813,8 @@ void Connection::sendData(const Block & block, const String & name, bool scalar)
|
||||
|
||||
block_out->write(block);
|
||||
maybe_compressed_out->next();
|
||||
if (!block)
|
||||
out->finishPacket();
|
||||
out->next();
|
||||
|
||||
if (throttler)
|
||||
@ -814,6 +825,7 @@ void Connection::sendIgnoredPartUUIDs(const std::vector<UUID> & uuids)
|
||||
{
|
||||
writeVarUInt(Protocol::Client::IgnoredPartUUIDs, *out);
|
||||
writeVectorBinary(uuids, *out);
|
||||
out->finishPacket();
|
||||
out->next();
|
||||
}
|
||||
|
||||
@ -823,6 +835,7 @@ void Connection::sendReadTaskResponse(const String & response)
|
||||
writeVarUInt(Protocol::Client::ReadTaskResponse, *out);
|
||||
writeVarUInt(DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION, *out);
|
||||
writeStringBinary(response, *out);
|
||||
out->finishPacket();
|
||||
out->next();
|
||||
}
|
||||
|
||||
@ -831,6 +844,7 @@ void Connection::sendMergeTreeReadTaskResponse(const ParallelReadResponse & resp
|
||||
{
|
||||
writeVarUInt(Protocol::Client::MergeTreeReadTaskResponse, *out);
|
||||
response.serialize(*out);
|
||||
out->finishPacket();
|
||||
out->next();
|
||||
}
|
||||
|
||||
@ -848,6 +862,8 @@ void Connection::sendPreparedData(ReadBuffer & input, size_t size, const String
|
||||
copyData(input, *out);
|
||||
else
|
||||
copyData(input, *out, size);
|
||||
|
||||
out->finishPacket();
|
||||
out->next();
|
||||
}
|
||||
|
||||
@ -876,6 +892,8 @@ void Connection::sendScalarsData(Scalars & data)
|
||||
sendData(elem.second, elem.first, true /* scalar */);
|
||||
}
|
||||
|
||||
out->finishPacket();
|
||||
|
||||
out_bytes = out->count() - out_bytes;
|
||||
maybe_compressed_out_bytes = maybe_compressed_out->count() - maybe_compressed_out_bytes;
|
||||
double elapsed = watch.elapsedSeconds();
|
||||
@ -1018,13 +1036,13 @@ std::optional<Poco::Net::SocketAddress> Connection::getResolvedAddress() const
|
||||
|
||||
bool Connection::poll(size_t timeout_microseconds)
|
||||
{
|
||||
return static_cast<ReadBufferFromPocoSocket &>(*in).poll(timeout_microseconds);
|
||||
return in->poll(timeout_microseconds);
|
||||
}
|
||||
|
||||
|
||||
bool Connection::hasReadPendingData() const
|
||||
{
|
||||
return last_input_packet_type.has_value() || static_cast<const ReadBufferFromPocoSocket &>(*in).hasPendingData();
|
||||
return last_input_packet_type.has_value() || in->hasPendingData();
|
||||
}
|
||||
|
||||
|
||||
|
@ -8,8 +8,8 @@
|
||||
#include <Core/Defines.h>
|
||||
|
||||
|
||||
#include <IO/ReadBufferFromPocoSocket.h>
|
||||
#include <IO/WriteBufferFromPocoSocket.h>
|
||||
#include <IO/ReadBufferFromPocoSocketChunked.h>
|
||||
#include <IO/WriteBufferFromPocoSocketChunked.h>
|
||||
|
||||
#include <Interpreters/TablesStatus.h>
|
||||
#include <Interpreters/Context_fwd.h>
|
||||
@ -207,8 +207,8 @@ private:
|
||||
String server_display_name;
|
||||
|
||||
std::unique_ptr<Poco::Net::StreamSocket> socket;
|
||||
std::shared_ptr<ReadBufferFromPocoSocket> in;
|
||||
std::shared_ptr<WriteBufferFromPocoSocket> out;
|
||||
std::shared_ptr<ReadBufferFromPocoSocketChunked> in;
|
||||
std::shared_ptr<WriteBufferFromPocoSocketChunked> out;
|
||||
std::optional<UInt64> last_input_packet_type;
|
||||
|
||||
String query_id;
|
||||
|
@ -79,6 +79,9 @@ static constexpr auto DBMS_MIN_REVISION_WITH_SSH_AUTHENTICATION = 54466;
|
||||
/// Send read-only flag for Replicated tables as well
|
||||
static constexpr auto DBMS_MIN_REVISION_WITH_TABLE_READ_ONLY_CHECK = 54467;
|
||||
|
||||
/// Packets size header
|
||||
static constexpr auto DBMS_MIN_PROTOCOL_VERSION_WITH_CHUNKED_PACKETS = 54468;
|
||||
|
||||
/// Version of ClickHouse TCP protocol.
|
||||
///
|
||||
/// Should be incremented manually on protocol changes.
|
||||
@ -86,6 +89,6 @@ static constexpr auto DBMS_MIN_REVISION_WITH_TABLE_READ_ONLY_CHECK = 54467;
|
||||
/// NOTE: DBMS_TCP_PROTOCOL_VERSION has nothing common with VERSION_REVISION,
|
||||
/// later is just a number for server version (one number instead of commit SHA)
|
||||
/// for simplicity (sometimes it may be more convenient in some use cases).
|
||||
static constexpr auto DBMS_TCP_PROTOCOL_VERSION = 54467;
|
||||
static constexpr auto DBMS_TCP_PROTOCOL_VERSION = 54468;
|
||||
|
||||
}
|
||||
|
@ -32,25 +32,13 @@ namespace ErrorCodes
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
bool ReadBufferFromPocoSocket::nextImpl()
|
||||
size_t ReadBufferFromPocoSocket::readSocket(Position begin, size_t size)
|
||||
{
|
||||
ssize_t bytes_read = 0;
|
||||
Stopwatch watch;
|
||||
|
||||
SCOPE_EXIT({
|
||||
/// NOTE: it is quite inaccurate on high loads since the thread could be replaced by another one
|
||||
ProfileEvents::increment(ProfileEvents::NetworkReceiveElapsedMicroseconds, watch.elapsedMicroseconds());
|
||||
ProfileEvents::increment(ProfileEvents::NetworkReceiveBytes, bytes_read);
|
||||
});
|
||||
|
||||
/// Add more details to exceptions.
|
||||
try
|
||||
{
|
||||
CurrentMetrics::Increment metric_increment(CurrentMetrics::NetworkReceive);
|
||||
|
||||
if (internal_buffer.size() > INT_MAX)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Buffer overflow");
|
||||
|
||||
/// If async_callback is specified, set socket to non-blocking mode
|
||||
/// and try to read data from it, if socket is not ready for reading,
|
||||
/// run async_callback and try again later.
|
||||
@ -61,7 +49,7 @@ bool ReadBufferFromPocoSocket::nextImpl()
|
||||
socket.setBlocking(false);
|
||||
SCOPE_EXIT(socket.setBlocking(true));
|
||||
bool secure = socket.secure();
|
||||
bytes_read = socket.impl()->receiveBytes(internal_buffer.begin(), static_cast<int>(internal_buffer.size()));
|
||||
bytes_read = socket.impl()->receiveBytes(begin, static_cast<int>(size));
|
||||
|
||||
/// Check EAGAIN and ERR_SSL_WANT_READ/ERR_SSL_WANT_WRITE for secure socket (reading from secure socket can write too).
|
||||
while (bytes_read < 0 && (errno == EAGAIN || (secure && (checkSSLWantRead(bytes_read) || checkSSLWantWrite(bytes_read)))))
|
||||
@ -73,12 +61,12 @@ bool ReadBufferFromPocoSocket::nextImpl()
|
||||
async_callback(socket.impl()->sockfd(), socket.getReceiveTimeout(), AsyncEventTimeoutType::RECEIVE, socket_description, AsyncTaskExecutor::Event::READ | AsyncTaskExecutor::Event::ERROR);
|
||||
|
||||
/// Try to read again.
|
||||
bytes_read = socket.impl()->receiveBytes(internal_buffer.begin(), static_cast<int>(internal_buffer.size()));
|
||||
bytes_read = socket.impl()->receiveBytes(begin, static_cast<int>(size));
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
bytes_read = socket.impl()->receiveBytes(internal_buffer.begin(), static_cast<int>(internal_buffer.size()));
|
||||
bytes_read = socket.impl()->receiveBytes(begin, static_cast<int>(size));
|
||||
}
|
||||
}
|
||||
catch (const Poco::Net::NetException & e)
|
||||
@ -99,6 +87,40 @@ bool ReadBufferFromPocoSocket::nextImpl()
|
||||
if (bytes_read < 0)
|
||||
throw NetException(ErrorCodes::CANNOT_READ_FROM_SOCKET, "Cannot read from socket (peer: {}, local: {})", peer_address.toString(), socket.address().toString());
|
||||
|
||||
return bytes_read;
|
||||
}
|
||||
|
||||
bool ReadBufferFromPocoSocket::readSocketExact(Position begin, size_t size)
|
||||
{
|
||||
for (size_t bytes_left = size; bytes_left > 0;)
|
||||
{
|
||||
size_t ret = readSocket(begin + size - bytes_left, bytes_left);
|
||||
if (ret == 0)
|
||||
return false;
|
||||
bytes_left -= ret;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool ReadBufferFromPocoSocket::nextImpl()
|
||||
{
|
||||
ssize_t bytes_read = 0;
|
||||
Stopwatch watch;
|
||||
|
||||
SCOPE_EXIT({
|
||||
/// NOTE: it is quite inaccurate on high loads since the thread could be replaced by another one
|
||||
ProfileEvents::increment(ProfileEvents::NetworkReceiveElapsedMicroseconds, watch.elapsedMicroseconds());
|
||||
ProfileEvents::increment(ProfileEvents::NetworkReceiveBytes, bytes_read);
|
||||
});
|
||||
|
||||
CurrentMetrics::Increment metric_increment(CurrentMetrics::NetworkReceive);
|
||||
|
||||
if (internal_buffer.size() > INT_MAX)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Buffer overflow");
|
||||
|
||||
bytes_read = readSocket(internal_buffer.begin(), internal_buffer.size());
|
||||
|
||||
if (read_event != ProfileEvents::end())
|
||||
ProfileEvents::increment(read_event, bytes_read);
|
||||
|
||||
|
@ -32,6 +32,9 @@ public:
|
||||
|
||||
void setAsyncCallback(AsyncCallback async_callback_) { async_callback = std::move(async_callback_); }
|
||||
|
||||
size_t readSocket(Position begin, size_t size);
|
||||
bool readSocketExact(Position begin, size_t size);
|
||||
|
||||
private:
|
||||
AsyncCallback async_callback;
|
||||
std::string socket_description;
|
||||
|
114
src/IO/ReadBufferFromPocoSocketChunked.cpp
Normal file
114
src/IO/ReadBufferFromPocoSocketChunked.cpp
Normal file
@ -0,0 +1,114 @@
|
||||
#include <IO/ReadBufferFromPocoSocketChunked.h>
|
||||
#include <Common/logger_useful.h>
|
||||
|
||||
|
||||
namespace DB::ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
namespace DB
|
||||
{
|
||||
ReadBufferFromPocoSocketChunked::ReadBufferFromPocoSocketChunked(Poco::Net::Socket & socket_, size_t buf_size)
|
||||
: ReadBufferFromPocoSocketChunked(socket_, ProfileEvents::end(), buf_size)
|
||||
{}
|
||||
|
||||
ReadBufferFromPocoSocketChunked::ReadBufferFromPocoSocketChunked(Poco::Net::Socket & socket_, const ProfileEvents::Event & read_event_, size_t buf_size)
|
||||
: ReadBuffer(nullptr, 0), log(getLogger("Protocol")), buffer_socket(socket_, read_event_, buf_size)
|
||||
{
|
||||
chassert(buf_size <= std::numeric_limits<decltype(chunk_left)>::max());
|
||||
|
||||
working_buffer = buffer_socket.buffer();
|
||||
pos = buffer_socket.position();
|
||||
}
|
||||
|
||||
void ReadBufferFromPocoSocketChunked::enableChunked()
|
||||
{
|
||||
chunked = true;
|
||||
}
|
||||
|
||||
bool ReadBufferFromPocoSocketChunked::poll(size_t timeout_microseconds)
|
||||
{
|
||||
buffer_socket.position() = pos + skip_next;
|
||||
return buffer_socket.poll(timeout_microseconds);
|
||||
}
|
||||
|
||||
void ReadBufferFromPocoSocketChunked::setAsyncCallback(AsyncCallback async_callback_)
|
||||
{
|
||||
buffer_socket.setAsyncCallback(async_callback_);
|
||||
}
|
||||
|
||||
bool ReadBufferFromPocoSocketChunked::startChunk()
|
||||
{
|
||||
do {
|
||||
if (buffer_socket.read(reinterpret_cast<char *>(&chunk_left), sizeof(chunk_left)) == 0)
|
||||
return false;
|
||||
if (chunk_left == 0)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Native protocol: empty chunk received");
|
||||
} while (chunk_left == 0);
|
||||
|
||||
return nextChunk();
|
||||
}
|
||||
|
||||
bool ReadBufferFromPocoSocketChunked::nextChunk()
|
||||
{
|
||||
static bool start = false;
|
||||
|
||||
if (chunk_left == 0) {
|
||||
start = true;
|
||||
return startChunk();
|
||||
}
|
||||
|
||||
if (buffer_socket.available() == 0)
|
||||
if (!buffer_socket.next())
|
||||
return false;
|
||||
if (start)
|
||||
LOG_TEST(log, "Packet recieve started. Message {}, size {}", static_cast<unsigned int>(*buffer_socket.position()), chunk_left);
|
||||
else
|
||||
LOG_TEST(log, "Packet recieve continued. Size {}", chunk_left);
|
||||
|
||||
start = false;
|
||||
|
||||
nextimpl_working_buffer_offset = buffer_socket.offset();
|
||||
|
||||
if (buffer_socket.available() < chunk_left)
|
||||
{
|
||||
working_buffer.resize(buffer_socket.offset() + buffer_socket.available());
|
||||
chunk_left -= buffer_socket.available();
|
||||
return true;
|
||||
}
|
||||
|
||||
working_buffer.resize(buffer_socket.offset() + chunk_left);
|
||||
skip_next = std::min(static_cast<size_t>(4), buffer_socket.available() - chunk_left);
|
||||
|
||||
if (skip_next > 0)
|
||||
std::memcpy(&chunk_left, buffer_socket.position() + chunk_left, skip_next);
|
||||
if (4 > skip_next)
|
||||
if (!buffer_socket.readSocketExact(reinterpret_cast<Position>(&chunk_left) + skip_next, 4 - skip_next))
|
||||
return false;
|
||||
|
||||
if (chunk_left == 0)
|
||||
LOG_TEST(log, "Packet recieve ended.");
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
bool ReadBufferFromPocoSocketChunked::nextImpl()
|
||||
{
|
||||
buffer_socket.position() = pos + skip_next;
|
||||
skip_next = 0;
|
||||
|
||||
if (chunked)
|
||||
return nextChunk();
|
||||
|
||||
if (!buffer_socket.next())
|
||||
return false;
|
||||
|
||||
pos = buffer_socket.position();
|
||||
working_buffer.resize(offset() + buffer_socket.available());
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
32
src/IO/ReadBufferFromPocoSocketChunked.h
Normal file
32
src/IO/ReadBufferFromPocoSocketChunked.h
Normal file
@ -0,0 +1,32 @@
|
||||
#pragma once
|
||||
|
||||
#include <IO/ReadBuffer.h>
|
||||
#include <IO/ReadBufferFromPocoSocket.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class ReadBufferFromPocoSocketChunked: public ReadBuffer
|
||||
{
|
||||
public:
|
||||
explicit ReadBufferFromPocoSocketChunked(Poco::Net::Socket & socket_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE);
|
||||
explicit ReadBufferFromPocoSocketChunked(Poco::Net::Socket & socket_, const ProfileEvents::Event & read_event_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE);
|
||||
|
||||
void enableChunked();
|
||||
bool poll(size_t timeout_microseconds);
|
||||
void setAsyncCallback(AsyncCallback async_callback_);
|
||||
|
||||
protected:
|
||||
bool startChunk();
|
||||
bool nextChunk();
|
||||
bool nextImpl() override;
|
||||
|
||||
private:
|
||||
LoggerPtr log;
|
||||
ReadBufferFromPocoSocket buffer_socket;
|
||||
bool chunked = false;
|
||||
UInt32 chunk_left = 0; // chunk left to read from socket
|
||||
UInt8 skip_next = 0; // skip already processed bytes in buffer_socket
|
||||
};
|
||||
|
||||
}
|
56
src/IO/WriteBufferFromPocoSocketChunked.h
Normal file
56
src/IO/WriteBufferFromPocoSocketChunked.h
Normal file
@ -0,0 +1,56 @@
|
||||
#pragma once
|
||||
|
||||
#include <Common/logger_useful.h>
|
||||
#include <IO/WriteBufferFromPocoSocket.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class WriteBufferFromPocoSocketChunked: public WriteBufferFromPocoSocket
|
||||
{
|
||||
public:
|
||||
explicit WriteBufferFromPocoSocketChunked(Poco::Net::Socket & socket_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE) : WriteBufferFromPocoSocket(socket_, buf_size), log(getLogger("Protocol")) {}
|
||||
explicit WriteBufferFromPocoSocketChunked(Poco::Net::Socket & socket_, const ProfileEvents::Event & write_event_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE) : WriteBufferFromPocoSocket(socket_, write_event_, buf_size), log(getLogger("Protocol")) {}
|
||||
|
||||
void enableChunked() { chunked = true; }
|
||||
void finishPacket()
|
||||
{
|
||||
if (!chunked)
|
||||
return;
|
||||
|
||||
next();
|
||||
|
||||
if (finished)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Native protocol: attempt to send empty chunk");
|
||||
|
||||
LOG_TEST(log, "Packet send ended.");
|
||||
finished = true;
|
||||
|
||||
UInt32 s = 0;
|
||||
socketSendBytes(reinterpret_cast<const char *>(&s), sizeof(s));
|
||||
}
|
||||
protected:
|
||||
void nextImpl() override
|
||||
{
|
||||
if (chunked)
|
||||
{
|
||||
UInt32 s = static_cast<UInt32>(offset());
|
||||
if (finished)
|
||||
LOG_TEST(log, "Packet send started. Message {}, size {}", static_cast<unsigned int>(*buffer().begin()), s);
|
||||
else
|
||||
LOG_TEST(log, "Packet send continued. Size {}", s);
|
||||
|
||||
finished = false;
|
||||
socketSendBytes(reinterpret_cast<const char *>(&s), sizeof(s));
|
||||
}
|
||||
|
||||
WriteBufferFromPocoSocket::nextImpl();
|
||||
}
|
||||
private:
|
||||
LoggerPtr log;
|
||||
bool chunked = false;
|
||||
bool finished = true;
|
||||
};
|
||||
|
||||
}
|
@ -19,8 +19,6 @@
|
||||
#include <IO/Progress.h>
|
||||
#include <Compression/CompressedReadBuffer.h>
|
||||
#include <Compression/CompressedWriteBuffer.h>
|
||||
#include <IO/ReadBufferFromPocoSocket.h>
|
||||
#include <IO/WriteBufferFromPocoSocket.h>
|
||||
#include <IO/LimitReadBuffer.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
@ -253,8 +251,8 @@ void TCPHandler::runImpl()
|
||||
socket().setSendTimeout(send_timeout);
|
||||
socket().setNoDelay(true);
|
||||
|
||||
in = std::make_shared<ReadBufferFromPocoSocket>(socket(), read_event);
|
||||
out = std::make_shared<WriteBufferFromPocoSocket>(socket(), write_event);
|
||||
in = std::make_shared<ReadBufferFromPocoSocketChunked>(socket(), read_event);
|
||||
out = std::make_shared<WriteBufferFromPocoSocketChunked>(socket(), write_event);
|
||||
|
||||
/// Support for PROXY protocol
|
||||
if (parse_proxy_protocol && !receiveProxyHeader())
|
||||
@ -289,6 +287,12 @@ void TCPHandler::runImpl()
|
||||
if (!default_database.empty())
|
||||
session->sessionContext()->setCurrentDatabase(default_database);
|
||||
}
|
||||
|
||||
if (client_tcp_protocol_version >= DBMS_MIN_PROTOCOL_VERSION_WITH_CHUNKED_PACKETS)
|
||||
{
|
||||
in->enableChunked();
|
||||
out->enableChunked();
|
||||
}
|
||||
}
|
||||
catch (const Exception & e) /// Typical for an incorrect username, password, or address.
|
||||
{
|
||||
@ -320,7 +324,7 @@ void TCPHandler::runImpl()
|
||||
{
|
||||
Stopwatch idle_time;
|
||||
UInt64 timeout_ms = std::min(poll_interval, idle_connection_timeout) * 1000000;
|
||||
while (tcp_server.isOpen() && !server.isCancelled() && !static_cast<ReadBufferFromPocoSocket &>(*in).poll(timeout_ms))
|
||||
while (tcp_server.isOpen() && !server.isCancelled() && !in->poll(timeout_ms))
|
||||
{
|
||||
if (idle_time.elapsedSeconds() > idle_connection_timeout)
|
||||
{
|
||||
@ -788,7 +792,7 @@ bool TCPHandler::readDataNext()
|
||||
/// We are waiting for a packet from the client. Thus, every `POLL_INTERVAL` seconds check whether we need to shut down.
|
||||
while (true)
|
||||
{
|
||||
if (static_cast<ReadBufferFromPocoSocket &>(*in).poll(timeout_us))
|
||||
if (in->poll(timeout_us))
|
||||
{
|
||||
/// If client disconnected.
|
||||
if (in->eof())
|
||||
@ -1154,6 +1158,8 @@ void TCPHandler::processTablesStatusRequest()
|
||||
}
|
||||
|
||||
response.write(*out, client_tcp_protocol_version);
|
||||
|
||||
out->finishPacket();
|
||||
}
|
||||
|
||||
void TCPHandler::receiveUnexpectedTablesStatusRequest()
|
||||
@ -1174,6 +1180,8 @@ void TCPHandler::sendPartUUIDs()
|
||||
|
||||
writeVarUInt(Protocol::Server::PartUUIDs, *out);
|
||||
writeVectorBinary(uuids, *out);
|
||||
|
||||
out->finishPacket();
|
||||
out->next();
|
||||
}
|
||||
}
|
||||
@ -1182,6 +1190,8 @@ void TCPHandler::sendPartUUIDs()
|
||||
void TCPHandler::sendReadTaskRequestAssumeLocked()
|
||||
{
|
||||
writeVarUInt(Protocol::Server::ReadTaskRequest, *out);
|
||||
|
||||
out->finishPacket();
|
||||
out->next();
|
||||
}
|
||||
|
||||
@ -1190,6 +1200,8 @@ void TCPHandler::sendMergeTreeAllRangesAnnouncementAssumeLocked(InitialAllRanges
|
||||
{
|
||||
writeVarUInt(Protocol::Server::MergeTreeAllRangesAnnouncement, *out);
|
||||
announcement.serialize(*out);
|
||||
|
||||
out->finishPacket();
|
||||
out->next();
|
||||
}
|
||||
|
||||
@ -1198,6 +1210,8 @@ void TCPHandler::sendMergeTreeReadTaskRequestAssumeLocked(ParallelReadRequest re
|
||||
{
|
||||
writeVarUInt(Protocol::Server::MergeTreeReadTaskRequest, *out);
|
||||
request.serialize(*out);
|
||||
|
||||
out->finishPacket();
|
||||
out->next();
|
||||
}
|
||||
|
||||
@ -1206,6 +1220,8 @@ void TCPHandler::sendProfileInfo(const ProfileInfo & info)
|
||||
{
|
||||
writeVarUInt(Protocol::Server::ProfileInfo, *out);
|
||||
info.write(*out);
|
||||
|
||||
out->finishPacket();
|
||||
out->next();
|
||||
}
|
||||
|
||||
@ -1221,6 +1237,8 @@ void TCPHandler::sendTotals(const Block & totals)
|
||||
|
||||
state.block_out->write(totals);
|
||||
state.maybe_compressed_out->next();
|
||||
|
||||
out->finishPacket();
|
||||
out->next();
|
||||
}
|
||||
}
|
||||
@ -1237,6 +1255,8 @@ void TCPHandler::sendExtremes(const Block & extremes)
|
||||
|
||||
state.block_out->write(extremes);
|
||||
state.maybe_compressed_out->next();
|
||||
|
||||
out->finishPacket();
|
||||
out->next();
|
||||
}
|
||||
}
|
||||
@ -1254,6 +1274,8 @@ void TCPHandler::sendProfileEvents()
|
||||
writeStringBinary("", *out);
|
||||
|
||||
state.profile_events_block_out->write(block);
|
||||
|
||||
out->finishPacket();
|
||||
out->next();
|
||||
|
||||
auto elapsed_milliseconds = stopwatch.elapsedMilliseconds();
|
||||
@ -1291,6 +1313,8 @@ void TCPHandler::sendTimezone()
|
||||
LOG_DEBUG(log, "TCPHandler::sendTimezone(): {}", tz);
|
||||
writeVarUInt(Protocol::Server::TimezoneUpdate, *out);
|
||||
writeStringBinary(tz, *out);
|
||||
|
||||
out->finishPacket();
|
||||
out->next();
|
||||
}
|
||||
|
||||
@ -1636,6 +1660,7 @@ bool TCPHandler::receivePacket()
|
||||
|
||||
case Protocol::Client::Ping:
|
||||
writeVarUInt(Protocol::Server::Pong, *out);
|
||||
out->finishPacket();
|
||||
out->next();
|
||||
return false;
|
||||
|
||||
@ -2152,7 +2177,7 @@ QueryState::CancellationStatus TCPHandler::getQueryCancellationStatus()
|
||||
after_check_cancelled.restart();
|
||||
|
||||
/// During request execution the only packet that can come from the client is stopping the query.
|
||||
if (static_cast<ReadBufferFromPocoSocket &>(*in).poll(0))
|
||||
if (in->poll(0))
|
||||
{
|
||||
if (in->eof())
|
||||
{
|
||||
@ -2216,6 +2241,8 @@ void TCPHandler::sendData(const Block & block)
|
||||
|
||||
state.block_out->write(block);
|
||||
state.maybe_compressed_out->next();
|
||||
|
||||
out->finishPacket();
|
||||
out->next();
|
||||
}
|
||||
catch (...)
|
||||
@ -2251,6 +2278,8 @@ void TCPHandler::sendLogData(const Block & block)
|
||||
writeStringBinary("", *out);
|
||||
|
||||
state.logs_block_out->write(block);
|
||||
|
||||
out->finishPacket();
|
||||
out->next();
|
||||
}
|
||||
|
||||
@ -2262,6 +2291,7 @@ void TCPHandler::sendTableColumns(const ColumnsDescription & columns)
|
||||
writeStringBinary("", *out);
|
||||
writeStringBinary(columns.toString(), *out);
|
||||
|
||||
out->finishPacket();
|
||||
out->next();
|
||||
}
|
||||
|
||||
@ -2271,6 +2301,8 @@ void TCPHandler::sendException(const Exception & e, bool with_stack_trace)
|
||||
|
||||
writeVarUInt(Protocol::Server::Exception, *out);
|
||||
writeException(e, *out, with_stack_trace);
|
||||
|
||||
out->finishPacket();
|
||||
out->next();
|
||||
}
|
||||
|
||||
@ -2281,6 +2313,8 @@ void TCPHandler::sendEndOfStream()
|
||||
state.io.setAllDataSent();
|
||||
|
||||
writeVarUInt(Protocol::Server::EndOfStream, *out);
|
||||
|
||||
out->finishPacket();
|
||||
out->next();
|
||||
}
|
||||
|
||||
@ -2299,6 +2333,8 @@ void TCPHandler::sendProgress()
|
||||
increment.elapsed_ns = current_elapsed_ns - state.prev_elapsed_ns;
|
||||
state.prev_elapsed_ns = current_elapsed_ns;
|
||||
increment.write(*out, client_tcp_protocol_version);
|
||||
|
||||
out->finishPacket();
|
||||
out->next();
|
||||
}
|
||||
|
||||
|
@ -19,6 +19,8 @@
|
||||
#include <Interpreters/ProfileEventsExt.h>
|
||||
#include <Formats/NativeReader.h>
|
||||
#include <Formats/NativeWriter.h>
|
||||
#include <IO/ReadBufferFromPocoSocketChunked.h>
|
||||
#include <IO/WriteBufferFromPocoSocketChunked.h>
|
||||
|
||||
#include "IServer.h"
|
||||
#include "Interpreters/AsynchronousInsertQueue.h"
|
||||
@ -204,8 +206,8 @@ private:
|
||||
ClientInfo::QueryKind query_kind = ClientInfo::QueryKind::NO_QUERY;
|
||||
|
||||
/// Streams for reading/writing from/to client connection socket.
|
||||
std::shared_ptr<ReadBuffer> in;
|
||||
std::shared_ptr<WriteBuffer> out;
|
||||
std::shared_ptr<ReadBufferFromPocoSocketChunked> in;
|
||||
std::shared_ptr<WriteBufferFromPocoSocketChunked> out;
|
||||
|
||||
ProfileEvents::Event read_event;
|
||||
ProfileEvents::Event write_event;
|
||||
|
Loading…
Reference in New Issue
Block a user