Fix bugs in Poco, use true non-blocking IO

This commit is contained in:
avogar 2023-05-26 23:11:57 +00:00
parent 67c8c5c561
commit ef9bae50b9
12 changed files with 154 additions and 65 deletions

View File

@ -274,7 +274,9 @@ void SocketImpl::shutdown()
int SocketImpl::sendBytes(const void* buffer, int length, int flags) int SocketImpl::sendBytes(const void* buffer, int length, int flags)
{ {
if (_isBrokenTimeout) bool blocking = _blocking && (flags & MSG_DONTWAIT) == 0;
if (_isBrokenTimeout && blocking)
{ {
if (_sndTimeout.totalMicroseconds() != 0) if (_sndTimeout.totalMicroseconds() != 0)
{ {
@ -289,11 +291,13 @@ int SocketImpl::sendBytes(const void* buffer, int length, int flags)
if (_sockfd == POCO_INVALID_SOCKET) throw InvalidSocketException(); if (_sockfd == POCO_INVALID_SOCKET) throw InvalidSocketException();
rc = ::send(_sockfd, reinterpret_cast<const char*>(buffer), length, flags); rc = ::send(_sockfd, reinterpret_cast<const char*>(buffer), length, flags);
} }
while (_blocking && rc < 0 && lastError() == POCO_EINTR); while (blocking && rc < 0 && lastError() == POCO_EINTR);
if (rc < 0) if (rc < 0)
{ {
int err = lastError(); int err = lastError();
if (err == POCO_EAGAIN || err == POCO_ETIMEDOUT) if ((err == POCO_EAGAIN || err == POCO_EWOULDBLOCK) && !blocking)
;
else if (err == POCO_EAGAIN || err == POCO_ETIMEDOUT)
throw TimeoutException(); throw TimeoutException();
else else
error(err); error(err);

View File

@ -183,6 +183,16 @@ namespace Net
/// Returns true iff a reused session was negotiated during /// Returns true iff a reused session was negotiated during
/// the handshake. /// the handshake.
virtual void setBlocking(bool flag);
/// Sets the socket in blocking mode if flag is true,
/// disables blocking mode if flag is false.
virtual bool getBlocking() const;
/// Returns the blocking mode of the socket.
/// This method will only work if the blocking modes of
/// the socket are changed via the setBlocking method!
protected: protected:
void acceptSSL(); void acceptSSL();
/// Assume per-object mutex is locked. /// Assume per-object mutex is locked.

View File

@ -201,6 +201,16 @@ namespace Net
/// Returns true iff a reused session was negotiated during /// Returns true iff a reused session was negotiated during
/// the handshake. /// the handshake.
virtual void setBlocking(bool flag);
/// Sets the socket in blocking mode if flag is true,
/// disables blocking mode if flag is false.
virtual bool getBlocking() const;
/// Returns the blocking mode of the socket.
/// This method will only work if the blocking modes of
/// the socket are changed via the setBlocking method!
protected: protected:
void acceptSSL(); void acceptSSL();
/// Performs a SSL server-side handshake. /// Performs a SSL server-side handshake.

View File

@ -629,5 +629,15 @@ bool SecureSocketImpl::sessionWasReused()
return false; return false;
} }
void SecureSocketImpl::setBlocking(bool flag)
{
_pSocket->setBlocking(flag);
}
bool SecureSocketImpl::getBlocking() const
{
return _pSocket->getBlocking();
}
} } // namespace Poco::Net } } // namespace Poco::Net

View File

@ -237,5 +237,15 @@ int SecureStreamSocketImpl::completeHandshake()
return _impl.completeHandshake(); return _impl.completeHandshake();
} }
bool SecureStreamSocketImpl::getBlocking() const
{
return _impl.getBlocking();
}
void SecureStreamSocketImpl::setBlocking(bool flag)
{
_impl.setBlocking(flag);
}
} } // namespace Poco::Net } } // namespace Poco::Net

View File

@ -0,0 +1,8 @@
//
// Created by Павел Круглов on 27/05/2023.
//
#ifndef CLICKHOUSE_CHECKSSLERROR_H
#define CLICKHOUSE_CHECKSSLERROR_H
#endif //CLICKHOUSE_CHECKSSLERROR_H

View File

@ -0,0 +1,29 @@
#include <Common/checkSSLReturnCode.h>
#include "config.h"
#if USE_SSL
#include <Poco/Net/SecureStreamSocket.h>
#endif
namespace DB
{
bool checkSSLWantRead(ssize_t res)
{
#if USE_SSL
return res == Poco::Net::SecureStreamSocket::ERR_SSL_WANT_READ;
#else
return false;
#endif
}
bool checkSSLWantWrite(ssize_t res)
{
#if USE_SSL
return res == Poco::Net::SecureStreamSocket::ERR_SSL_WANT_WRITE;
#else
return false;
#endif
}
}

View File

@ -0,0 +1,12 @@
#pragma once
namespace DB
{
/// Check if ret is ERR_SSL_WANT_READ.
bool checkSSLWantRead(ssize_t ret);
/// CHeck if ret is ERR_SSL_WANT_WRITE.
bool checkSSLWantWrite(ssize_t ret);
}

View File

@ -9,12 +9,7 @@
#include <Common/ProfileEvents.h> #include <Common/ProfileEvents.h>
#include <Common/CurrentMetrics.h> #include <Common/CurrentMetrics.h>
#include <Common/AsyncTaskExecutor.h> #include <Common/AsyncTaskExecutor.h>
#include <Common/checkSSLReturnCode.h>
#include "config.h"
#if USE_SSL
#include <Poco/Net/SecureStreamSocket.h>
#endif
namespace ProfileEvents namespace ProfileEvents
{ {
@ -27,7 +22,6 @@ namespace CurrentMetrics
extern const Metric NetworkReceive; extern const Metric NetworkReceive;
} }
namespace DB namespace DB
{ {
namespace ErrorCodes namespace ErrorCodes
@ -38,14 +32,13 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR; extern const int LOGICAL_ERROR;
} }
bool ReadBufferFromPocoSocket::nextImpl() bool ReadBufferFromPocoSocket::nextImpl()
{ {
ssize_t bytes_read = 0; ssize_t bytes_read = 0;
Stopwatch watch; Stopwatch watch;
SCOPE_EXIT({ SCOPE_EXIT({
// / NOTE: it is quite inaccurate on high loads since the thread could be replaced by another one /// NOTE: it is quite inaccurate on high loads since the thread could be replaced by another one
ProfileEvents::increment(ProfileEvents::NetworkReceiveElapsedMicroseconds, watch.elapsedMicroseconds()); ProfileEvents::increment(ProfileEvents::NetworkReceiveElapsedMicroseconds, watch.elapsedMicroseconds());
ProfileEvents::increment(ProfileEvents::NetworkReceiveBytes, bytes_read); ProfileEvents::increment(ProfileEvents::NetworkReceiveBytes, bytes_read);
}); });
@ -58,17 +51,35 @@ bool ReadBufferFromPocoSocket::nextImpl()
if (internal_buffer.size() > INT_MAX) if (internal_buffer.size() > INT_MAX)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Buffer overflow"); throw Exception(ErrorCodes::LOGICAL_ERROR, "Buffer overflow");
bytes_read = readFromSocket(); /// If async_callback is specified, set socket to non-blocking mode
/// and try to read data from it, if socket is not ready for reading,
#if USE_SSL /// run async_callback and try again later.
/// In case of non-blocking connect for secure socket receiveBytes can return ERR_SSL_WANT_READ, /// It is expected that file descriptor may be polled externally.
/// in this case we should call receiveBytes again when socket is ready. /// Note that send timeout is not checked here. External code should check it while polling.
if (socket.secure()) if (async_callback)
{ {
while (bytes_read == Poco::Net::SecureStreamSocket::ERR_SSL_WANT_READ) socket.setBlocking(false);
bytes_read = readFromSocket(); SCOPE_EXIT(socket.setBlocking(true));
bool secure = socket.secure();
bytes_read = socket.impl()->receiveBytes(internal_buffer.begin(), static_cast<int>(internal_buffer.size()));
/// Check EAGAIN and ERR_SSL_WANT_READ/ERR_SSL_WANT_WRITE for secure socket (reading from secure socket can write too).
while (bytes_read < 0 && (errno == EAGAIN || (secure && (checkSSLWantRead(bytes_read) || checkSSLWantWrite(bytes_read)))))
{
/// In case of ERR_SSL_WANT_WRITE we should wait for socket to be ready for writing, otherwise - for reading.
if (secure && checkSSLWantWrite(bytes_read))
async_callback(socket.impl()->sockfd(), socket.getSendTimeout(), AsyncEventTimeoutType::SEND, socket_description, AsyncTaskExecutor::Event::WRITE | AsyncTaskExecutor::Event::ERROR);
else
async_callback(socket.impl()->sockfd(), socket.getReceiveTimeout(), AsyncEventTimeoutType::RECEIVE, socket_description, AsyncTaskExecutor::Event::READ | AsyncTaskExecutor::Event::ERROR);
/// Try to read again.
bytes_read = socket.impl()->receiveBytes(internal_buffer.begin(), static_cast<int>(internal_buffer.size()));
}
}
else
{
bytes_read = socket.impl()->receiveBytes(internal_buffer.begin(), static_cast<int>(internal_buffer.size()));
} }
#endif
} }
catch (const Poco::Net::NetException & e) catch (const Poco::Net::NetException & e)
{ {
@ -96,17 +107,6 @@ bool ReadBufferFromPocoSocket::nextImpl()
return true; return true;
} }
ssize_t ReadBufferFromPocoSocket::readFromSocket()
{
/// If async_callback is specified, and read will block, run async_callback and try again later.
/// It is expected that file descriptor may be polled externally.
/// Note that receive timeout is not checked here. External code should check it while polling.
while (async_callback && !socket.poll(0, Poco::Net::Socket::SELECT_READ | Poco::Net::Socket::SELECT_ERROR))
async_callback(socket.impl()->sockfd(), socket.getReceiveTimeout(), AsyncEventTimeoutType::RECEIVE, socket_description, AsyncTaskExecutor::Event::READ | AsyncTaskExecutor::Event::ERROR);
return socket.impl()->receiveBytes(internal_buffer.begin(), static_cast<int>(internal_buffer.size()));
}
ReadBufferFromPocoSocket::ReadBufferFromPocoSocket(Poco::Net::Socket & socket_, size_t buf_size) ReadBufferFromPocoSocket::ReadBufferFromPocoSocket(Poco::Net::Socket & socket_, size_t buf_size)
: BufferWithOwnMemory<ReadBuffer>(buf_size) : BufferWithOwnMemory<ReadBuffer>(buf_size)
, socket(socket_) , socket(socket_)

View File

@ -30,8 +30,6 @@ public:
void setAsyncCallback(AsyncCallback async_callback_) { async_callback = std::move(async_callback_); } void setAsyncCallback(AsyncCallback async_callback_) { async_callback = std::move(async_callback_); }
private: private:
ssize_t readFromSocket();
AsyncCallback async_callback; AsyncCallback async_callback;
std::string socket_description; std::string socket_description;
}; };

View File

@ -10,12 +10,7 @@
#include <Common/ProfileEvents.h> #include <Common/ProfileEvents.h>
#include <Common/CurrentMetrics.h> #include <Common/CurrentMetrics.h>
#include <Common/AsyncTaskExecutor.h> #include <Common/AsyncTaskExecutor.h>
#include <Common/checkSSLReturnCode.h>
#include "config.h"
#if USE_SSL
#include <Poco/Net/SecureStreamSocket.h>
#endif
namespace ProfileEvents namespace ProfileEvents
{ {
@ -28,7 +23,6 @@ namespace CurrentMetrics
extern const Metric NetworkSend; extern const Metric NetworkSend;
} }
namespace DB namespace DB
{ {
@ -40,7 +34,6 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR; extern const int LOGICAL_ERROR;
} }
void WriteBufferFromPocoSocket::nextImpl() void WriteBufferFromPocoSocket::nextImpl()
{ {
if (!offset()) if (!offset())
@ -67,17 +60,36 @@ void WriteBufferFromPocoSocket::nextImpl()
if (size > INT_MAX) if (size > INT_MAX)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Buffer overflow"); throw Exception(ErrorCodes::LOGICAL_ERROR, "Buffer overflow");
res = writeToSocket(pos, size); /// If async_callback is specified, set socket to non-blocking mode
/// and try to write data to it, if socket is not ready for writing,
#if USE_SSL /// run async_callback and try again later.
/// In case of non-blocking connect for secure socket sendBytes can return ERR_SSL_WANT_WRITE, /// It is expected that file descriptor may be polled externally.
/// in this case we should call sendBytes again when socket is ready. /// Note that send timeout is not checked here. External code should check it while polling.
if (socket.secure()) if (async_callback)
{ {
while (res == Poco::Net::SecureStreamSocket::ERR_SSL_WANT_WRITE) socket.setBlocking(false);
res = writeToSocket(pos, size); /// Set socket to blocking mode at the end.
SCOPE_EXIT(socket.setBlocking(true));
bool secure = socket.secure();
res = socket.impl()->sendBytes(pos, static_cast<int>(size));
/// Check EAGAIN and ERR_SSL_WANT_WRITE/ERR_SSL_WANT_READ for secure socket (writing to secure socket can read too).
while (res < 0 && (errno == EAGAIN || (secure && (checkSSLWantRead(res) || checkSSLWantWrite(res)))))
{
/// In case of ERR_SSL_WANT_READ we should wait for socket to be ready for reading, otherwise - for writing.
if (secure && checkSSLWantRead(res))
async_callback(socket.impl()->sockfd(), socket.getReceiveTimeout(), AsyncEventTimeoutType::RECEIVE, socket_description, AsyncTaskExecutor::Event::READ | AsyncTaskExecutor::Event::ERROR);
else
async_callback(socket.impl()->sockfd(), socket.getSendTimeout(), AsyncEventTimeoutType::SEND, socket_description, AsyncTaskExecutor::Event::WRITE | AsyncTaskExecutor::Event::ERROR);
/// Try to write again.
res = socket.impl()->sendBytes(pos, static_cast<int>(size));
}
}
else
{
res = socket.impl()->sendBytes(pos, static_cast<int>(size));
} }
#endif
} }
catch (const Poco::Net::NetException & e) catch (const Poco::Net::NetException & e)
{ {
@ -104,18 +116,6 @@ void WriteBufferFromPocoSocket::nextImpl()
} }
} }
ssize_t WriteBufferFromPocoSocket::writeToSocket(char * data, size_t size)
{
/// If async_callback is specified, and write will block, run async_callback and try again later.
/// It is expected that file descriptor may be polled externally.
/// Note that send timeout is not checked here. External code should check it while polling.
while (async_callback && !socket.poll(0, Poco::Net::Socket::SELECT_WRITE | Poco::Net::Socket::SELECT_ERROR))
async_callback(socket.impl()->sockfd(), socket.getSendTimeout(), AsyncEventTimeoutType::SEND, socket_description, AsyncTaskExecutor::Event::WRITE | AsyncTaskExecutor::Event::ERROR);
return socket.impl()->sendBytes(data, static_cast<int>(size));
}
WriteBufferFromPocoSocket::WriteBufferFromPocoSocket(Poco::Net::Socket & socket_, size_t buf_size) WriteBufferFromPocoSocket::WriteBufferFromPocoSocket(Poco::Net::Socket & socket_, size_t buf_size)
: BufferWithOwnMemory<WriteBuffer>(buf_size) : BufferWithOwnMemory<WriteBuffer>(buf_size)
, socket(socket_) , socket(socket_)

View File

@ -35,8 +35,6 @@ protected:
Poco::Net::SocketAddress our_address; Poco::Net::SocketAddress our_address;
private: private:
ssize_t writeToSocket(char * data, size_t size);
AsyncCallback async_callback; AsyncCallback async_callback;
std::string socket_description; std::string socket_description;
}; };