diff --git a/base/poco/Net/src/SocketImpl.cpp b/base/poco/Net/src/SocketImpl.cpp index 2aba413b322..484b8cfeec3 100644 --- a/base/poco/Net/src/SocketImpl.cpp +++ b/base/poco/Net/src/SocketImpl.cpp @@ -274,7 +274,9 @@ void SocketImpl::shutdown() int SocketImpl::sendBytes(const void* buffer, int length, int flags) { - if (_isBrokenTimeout) + bool blocking = _blocking && (flags & MSG_DONTWAIT) == 0; + + if (_isBrokenTimeout && blocking) { if (_sndTimeout.totalMicroseconds() != 0) { @@ -289,11 +291,13 @@ int SocketImpl::sendBytes(const void* buffer, int length, int flags) if (_sockfd == POCO_INVALID_SOCKET) throw InvalidSocketException(); rc = ::send(_sockfd, reinterpret_cast(buffer), length, flags); } - while (_blocking && rc < 0 && lastError() == POCO_EINTR); + while (blocking && rc < 0 && lastError() == POCO_EINTR); if (rc < 0) { int err = lastError(); - if (err == POCO_EAGAIN || err == POCO_ETIMEDOUT) + if ((err == POCO_EAGAIN || err == POCO_EWOULDBLOCK) && !blocking) + ; + else if (err == POCO_EAGAIN || err == POCO_ETIMEDOUT) throw TimeoutException(); else error(err); diff --git a/base/poco/NetSSL_OpenSSL/include/Poco/Net/SecureSocketImpl.h b/base/poco/NetSSL_OpenSSL/include/Poco/Net/SecureSocketImpl.h index 56c550decfe..49c12b6b45f 100644 --- a/base/poco/NetSSL_OpenSSL/include/Poco/Net/SecureSocketImpl.h +++ b/base/poco/NetSSL_OpenSSL/include/Poco/Net/SecureSocketImpl.h @@ -183,6 +183,16 @@ namespace Net /// Returns true iff a reused session was negotiated during /// the handshake. + virtual void setBlocking(bool flag); + /// Sets the socket in blocking mode if flag is true, + /// disables blocking mode if flag is false. + + virtual bool getBlocking() const; + /// Returns the blocking mode of the socket. + /// This method will only work if the blocking modes of + /// the socket are changed via the setBlocking method! + + protected: void acceptSSL(); /// Assume per-object mutex is locked. diff --git a/base/poco/NetSSL_OpenSSL/include/Poco/Net/SecureStreamSocketImpl.h b/base/poco/NetSSL_OpenSSL/include/Poco/Net/SecureStreamSocketImpl.h index b41043769fe..99e2130d673 100644 --- a/base/poco/NetSSL_OpenSSL/include/Poco/Net/SecureStreamSocketImpl.h +++ b/base/poco/NetSSL_OpenSSL/include/Poco/Net/SecureStreamSocketImpl.h @@ -201,6 +201,16 @@ namespace Net /// Returns true iff a reused session was negotiated during /// the handshake. + virtual void setBlocking(bool flag); + /// Sets the socket in blocking mode if flag is true, + /// disables blocking mode if flag is false. + + virtual bool getBlocking() const; + /// Returns the blocking mode of the socket. + /// This method will only work if the blocking modes of + /// the socket are changed via the setBlocking method! + + protected: void acceptSSL(); /// Performs a SSL server-side handshake. diff --git a/base/poco/NetSSL_OpenSSL/src/SecureSocketImpl.cpp b/base/poco/NetSSL_OpenSSL/src/SecureSocketImpl.cpp index 9631c7a401a..efe25f65909 100644 --- a/base/poco/NetSSL_OpenSSL/src/SecureSocketImpl.cpp +++ b/base/poco/NetSSL_OpenSSL/src/SecureSocketImpl.cpp @@ -629,5 +629,15 @@ bool SecureSocketImpl::sessionWasReused() return false; } +void SecureSocketImpl::setBlocking(bool flag) +{ + _pSocket->setBlocking(flag); +} + +bool SecureSocketImpl::getBlocking() const +{ + return _pSocket->getBlocking(); +} + } } // namespace Poco::Net diff --git a/base/poco/NetSSL_OpenSSL/src/SecureStreamSocketImpl.cpp b/base/poco/NetSSL_OpenSSL/src/SecureStreamSocketImpl.cpp index aa1a96e1585..c00dd43b2ed 100644 --- a/base/poco/NetSSL_OpenSSL/src/SecureStreamSocketImpl.cpp +++ b/base/poco/NetSSL_OpenSSL/src/SecureStreamSocketImpl.cpp @@ -237,5 +237,15 @@ int SecureStreamSocketImpl::completeHandshake() return _impl.completeHandshake(); } +bool SecureStreamSocketImpl::getBlocking() const +{ + return _impl.getBlocking(); +} + +void SecureStreamSocketImpl::setBlocking(bool flag) +{ + _impl.setBlocking(flag); +} + } } // namespace Poco::Net diff --git a/src/Common/checkSSLError.h b/src/Common/checkSSLError.h new file mode 100644 index 00000000000..05bca9f8b5f --- /dev/null +++ b/src/Common/checkSSLError.h @@ -0,0 +1,8 @@ +// +// Created by Павел Круглов on 27/05/2023. +// + +#ifndef CLICKHOUSE_CHECKSSLERROR_H +#define CLICKHOUSE_CHECKSSLERROR_H + +#endif //CLICKHOUSE_CHECKSSLERROR_H diff --git a/src/Common/checkSSLReturnCode.cpp b/src/Common/checkSSLReturnCode.cpp new file mode 100644 index 00000000000..8916a25e19c --- /dev/null +++ b/src/Common/checkSSLReturnCode.cpp @@ -0,0 +1,29 @@ +#include +#include "config.h" + +#if USE_SSL +#include +#endif + +namespace DB +{ + +bool checkSSLWantRead(ssize_t res) +{ +#if USE_SSL + return res == Poco::Net::SecureStreamSocket::ERR_SSL_WANT_READ; +#else + return false; +#endif +} + +bool checkSSLWantWrite(ssize_t res) +{ +#if USE_SSL + return res == Poco::Net::SecureStreamSocket::ERR_SSL_WANT_WRITE; +#else + return false; +#endif +} + +} diff --git a/src/Common/checkSSLReturnCode.h b/src/Common/checkSSLReturnCode.h new file mode 100644 index 00000000000..f30564137aa --- /dev/null +++ b/src/Common/checkSSLReturnCode.h @@ -0,0 +1,12 @@ +#pragma once + +namespace DB +{ + +/// Check if ret is ERR_SSL_WANT_READ. +bool checkSSLWantRead(ssize_t ret); + +/// CHeck if ret is ERR_SSL_WANT_WRITE. +bool checkSSLWantWrite(ssize_t ret); + +} diff --git a/src/IO/ReadBufferFromPocoSocket.cpp b/src/IO/ReadBufferFromPocoSocket.cpp index d6790439683..ff72dc5386c 100644 --- a/src/IO/ReadBufferFromPocoSocket.cpp +++ b/src/IO/ReadBufferFromPocoSocket.cpp @@ -9,12 +9,7 @@ #include #include #include - -#include "config.h" - -#if USE_SSL -#include -#endif +#include namespace ProfileEvents { @@ -27,7 +22,6 @@ namespace CurrentMetrics extern const Metric NetworkReceive; } - namespace DB { namespace ErrorCodes @@ -38,14 +32,13 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } - bool ReadBufferFromPocoSocket::nextImpl() { ssize_t bytes_read = 0; Stopwatch watch; SCOPE_EXIT({ - // / NOTE: it is quite inaccurate on high loads since the thread could be replaced by another one + /// NOTE: it is quite inaccurate on high loads since the thread could be replaced by another one ProfileEvents::increment(ProfileEvents::NetworkReceiveElapsedMicroseconds, watch.elapsedMicroseconds()); ProfileEvents::increment(ProfileEvents::NetworkReceiveBytes, bytes_read); }); @@ -58,17 +51,35 @@ bool ReadBufferFromPocoSocket::nextImpl() if (internal_buffer.size() > INT_MAX) throw Exception(ErrorCodes::LOGICAL_ERROR, "Buffer overflow"); - bytes_read = readFromSocket(); - -#if USE_SSL - /// In case of non-blocking connect for secure socket receiveBytes can return ERR_SSL_WANT_READ, - /// in this case we should call receiveBytes again when socket is ready. - if (socket.secure()) + /// If async_callback is specified, set socket to non-blocking mode + /// and try to read data from it, if socket is not ready for reading, + /// run async_callback and try again later. + /// It is expected that file descriptor may be polled externally. + /// Note that send timeout is not checked here. External code should check it while polling. + if (async_callback) { - while (bytes_read == Poco::Net::SecureStreamSocket::ERR_SSL_WANT_READ) - bytes_read = readFromSocket(); + socket.setBlocking(false); + SCOPE_EXIT(socket.setBlocking(true)); + bool secure = socket.secure(); + bytes_read = socket.impl()->receiveBytes(internal_buffer.begin(), static_cast(internal_buffer.size())); + + /// Check EAGAIN and ERR_SSL_WANT_READ/ERR_SSL_WANT_WRITE for secure socket (reading from secure socket can write too). + while (bytes_read < 0 && (errno == EAGAIN || (secure && (checkSSLWantRead(bytes_read) || checkSSLWantWrite(bytes_read))))) + { + /// In case of ERR_SSL_WANT_WRITE we should wait for socket to be ready for writing, otherwise - for reading. + if (secure && checkSSLWantWrite(bytes_read)) + async_callback(socket.impl()->sockfd(), socket.getSendTimeout(), AsyncEventTimeoutType::SEND, socket_description, AsyncTaskExecutor::Event::WRITE | AsyncTaskExecutor::Event::ERROR); + else + async_callback(socket.impl()->sockfd(), socket.getReceiveTimeout(), AsyncEventTimeoutType::RECEIVE, socket_description, AsyncTaskExecutor::Event::READ | AsyncTaskExecutor::Event::ERROR); + + /// Try to read again. + bytes_read = socket.impl()->receiveBytes(internal_buffer.begin(), static_cast(internal_buffer.size())); + } + } + else + { + bytes_read = socket.impl()->receiveBytes(internal_buffer.begin(), static_cast(internal_buffer.size())); } -#endif } catch (const Poco::Net::NetException & e) { @@ -96,17 +107,6 @@ bool ReadBufferFromPocoSocket::nextImpl() return true; } -ssize_t ReadBufferFromPocoSocket::readFromSocket() -{ - /// If async_callback is specified, and read will block, run async_callback and try again later. - /// It is expected that file descriptor may be polled externally. - /// Note that receive timeout is not checked here. External code should check it while polling. - while (async_callback && !socket.poll(0, Poco::Net::Socket::SELECT_READ | Poco::Net::Socket::SELECT_ERROR)) - async_callback(socket.impl()->sockfd(), socket.getReceiveTimeout(), AsyncEventTimeoutType::RECEIVE, socket_description, AsyncTaskExecutor::Event::READ | AsyncTaskExecutor::Event::ERROR); - - return socket.impl()->receiveBytes(internal_buffer.begin(), static_cast(internal_buffer.size())); -} - ReadBufferFromPocoSocket::ReadBufferFromPocoSocket(Poco::Net::Socket & socket_, size_t buf_size) : BufferWithOwnMemory(buf_size) , socket(socket_) diff --git a/src/IO/ReadBufferFromPocoSocket.h b/src/IO/ReadBufferFromPocoSocket.h index 3c4bc424334..dab4ac86295 100644 --- a/src/IO/ReadBufferFromPocoSocket.h +++ b/src/IO/ReadBufferFromPocoSocket.h @@ -30,8 +30,6 @@ public: void setAsyncCallback(AsyncCallback async_callback_) { async_callback = std::move(async_callback_); } private: - ssize_t readFromSocket(); - AsyncCallback async_callback; std::string socket_description; }; diff --git a/src/IO/WriteBufferFromPocoSocket.cpp b/src/IO/WriteBufferFromPocoSocket.cpp index 6e7c67cc054..df1041f0056 100644 --- a/src/IO/WriteBufferFromPocoSocket.cpp +++ b/src/IO/WriteBufferFromPocoSocket.cpp @@ -10,12 +10,7 @@ #include #include #include - -#include "config.h" - -#if USE_SSL -#include -#endif +#include namespace ProfileEvents { @@ -28,7 +23,6 @@ namespace CurrentMetrics extern const Metric NetworkSend; } - namespace DB { @@ -40,7 +34,6 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } - void WriteBufferFromPocoSocket::nextImpl() { if (!offset()) @@ -67,17 +60,36 @@ void WriteBufferFromPocoSocket::nextImpl() if (size > INT_MAX) throw Exception(ErrorCodes::LOGICAL_ERROR, "Buffer overflow"); - res = writeToSocket(pos, size); - -#if USE_SSL - /// In case of non-blocking connect for secure socket sendBytes can return ERR_SSL_WANT_WRITE, - /// in this case we should call sendBytes again when socket is ready. - if (socket.secure()) + /// If async_callback is specified, set socket to non-blocking mode + /// and try to write data to it, if socket is not ready for writing, + /// run async_callback and try again later. + /// It is expected that file descriptor may be polled externally. + /// Note that send timeout is not checked here. External code should check it while polling. + if (async_callback) { - while (res == Poco::Net::SecureStreamSocket::ERR_SSL_WANT_WRITE) - res = writeToSocket(pos, size); + socket.setBlocking(false); + /// Set socket to blocking mode at the end. + SCOPE_EXIT(socket.setBlocking(true)); + bool secure = socket.secure(); + res = socket.impl()->sendBytes(pos, static_cast(size)); + + /// Check EAGAIN and ERR_SSL_WANT_WRITE/ERR_SSL_WANT_READ for secure socket (writing to secure socket can read too). + while (res < 0 && (errno == EAGAIN || (secure && (checkSSLWantRead(res) || checkSSLWantWrite(res))))) + { + /// In case of ERR_SSL_WANT_READ we should wait for socket to be ready for reading, otherwise - for writing. + if (secure && checkSSLWantRead(res)) + async_callback(socket.impl()->sockfd(), socket.getReceiveTimeout(), AsyncEventTimeoutType::RECEIVE, socket_description, AsyncTaskExecutor::Event::READ | AsyncTaskExecutor::Event::ERROR); + else + async_callback(socket.impl()->sockfd(), socket.getSendTimeout(), AsyncEventTimeoutType::SEND, socket_description, AsyncTaskExecutor::Event::WRITE | AsyncTaskExecutor::Event::ERROR); + + /// Try to write again. + res = socket.impl()->sendBytes(pos, static_cast(size)); + } + } + else + { + res = socket.impl()->sendBytes(pos, static_cast(size)); } -#endif } catch (const Poco::Net::NetException & e) { @@ -104,18 +116,6 @@ void WriteBufferFromPocoSocket::nextImpl() } } -ssize_t WriteBufferFromPocoSocket::writeToSocket(char * data, size_t size) -{ - /// If async_callback is specified, and write will block, run async_callback and try again later. - /// It is expected that file descriptor may be polled externally. - /// Note that send timeout is not checked here. External code should check it while polling. - while (async_callback && !socket.poll(0, Poco::Net::Socket::SELECT_WRITE | Poco::Net::Socket::SELECT_ERROR)) - async_callback(socket.impl()->sockfd(), socket.getSendTimeout(), AsyncEventTimeoutType::SEND, socket_description, AsyncTaskExecutor::Event::WRITE | AsyncTaskExecutor::Event::ERROR); - - return socket.impl()->sendBytes(data, static_cast(size)); - -} - WriteBufferFromPocoSocket::WriteBufferFromPocoSocket(Poco::Net::Socket & socket_, size_t buf_size) : BufferWithOwnMemory(buf_size) , socket(socket_) diff --git a/src/IO/WriteBufferFromPocoSocket.h b/src/IO/WriteBufferFromPocoSocket.h index 0f03e816af5..ecb61020357 100644 --- a/src/IO/WriteBufferFromPocoSocket.h +++ b/src/IO/WriteBufferFromPocoSocket.h @@ -35,8 +35,6 @@ protected: Poco::Net::SocketAddress our_address; private: - ssize_t writeToSocket(char * data, size_t size); - AsyncCallback async_callback; std::string socket_description; };