From 3d0b31dfede17296bab21df368ab0650c281d481 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Fri, 7 Mar 2014 08:04:02 +0400 Subject: [PATCH] dbms: probably better [#METR-10240]. --- dbms/include/DB/IO/ReadBufferFromPocoSocket.h | 48 +------------- .../include/DB/IO/WriteBufferFromPocoSocket.h | 55 +--------------- dbms/src/IO/ReadBufferFromPocoSocket.cpp | 55 ++++++++++++++++ dbms/src/IO/WriteBufferFromPocoSocket.cpp | 63 +++++++++++++++++++ 4 files changed, 124 insertions(+), 97 deletions(-) create mode 100644 dbms/src/IO/ReadBufferFromPocoSocket.cpp create mode 100644 dbms/src/IO/WriteBufferFromPocoSocket.cpp diff --git a/dbms/include/DB/IO/ReadBufferFromPocoSocket.h b/dbms/include/DB/IO/ReadBufferFromPocoSocket.h index 542ea8b5a8a..d67fef2bb80 100644 --- a/dbms/include/DB/IO/ReadBufferFromPocoSocket.h +++ b/dbms/include/DB/IO/ReadBufferFromPocoSocket.h @@ -1,10 +1,6 @@ #pragma once #include -#include - -#include -#include #include #include @@ -26,50 +22,12 @@ protected: */ Poco::Net::SocketAddress peer_address; - - bool nextImpl() - { - ssize_t bytes_read = 0; - - /// Добавляем в эксепшены более подробную информацию. - try - { - bytes_read = socket.impl()->receiveBytes(internal_buffer.begin(), internal_buffer.size()); - } - catch (const Poco::Net::NetException & e) - { - throw Exception(e.displayText(), "while reading from socket (" + peer_address.toString() + ")", ErrorCodes::NETWORK_ERROR); - } - catch (const Poco::TimeoutException & e) - { - throw Exception("Timeout exceeded while reading from socket (" + peer_address.toString() + ")", ErrorCodes::SOCKET_TIMEOUT); - } - catch (const Poco::IOException & e) - { - throw Exception(e.displayText(), "while reading from socket (" + peer_address.toString() + ")", ErrorCodes::NETWORK_ERROR); - } - - if (bytes_read < 0) - throw Exception("Cannot read from socket (" + peer_address.toString() + ")", ErrorCodes::CANNOT_READ_FROM_SOCKET); - - if (bytes_read) - working_buffer.resize(bytes_read); - else - return false; - - return true; - } + bool nextImpl(); public: - ReadBufferFromPocoSocket(Poco::Net::Socket & socket_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE) - : BufferWithOwnMemory(buf_size), socket(socket_), peer_address(socket.peerAddress()) - { - } + ReadBufferFromPocoSocket(Poco::Net::Socket & socket_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE); - bool poll(size_t timeout_microseconds) - { - return offset() != buffer().size() || socket.poll(timeout_microseconds, Poco::Net::Socket::SELECT_READ | Poco::Net::Socket::SELECT_ERROR); - } + bool poll(size_t timeout_microseconds); }; } diff --git a/dbms/include/DB/IO/WriteBufferFromPocoSocket.h b/dbms/include/DB/IO/WriteBufferFromPocoSocket.h index 973db1af04a..2c6b88fb3d9 100644 --- a/dbms/include/DB/IO/WriteBufferFromPocoSocket.h +++ b/dbms/include/DB/IO/WriteBufferFromPocoSocket.h @@ -1,10 +1,6 @@ #pragma once #include -#include - -#include -#include #include #include @@ -27,57 +23,12 @@ protected: Poco::Net::SocketAddress peer_address; - void nextImpl() - { - if (!offset()) - return; - - size_t bytes_written = 0; - while (bytes_written < offset()) - { - ssize_t res = 0; - - /// Добавляем в эксепшены более подробную информацию. - try - { - res = socket.impl()->sendBytes(working_buffer.begin() + bytes_written, offset() - bytes_written); - } - catch (const Poco::Net::NetException & e) - { - throw Exception(e.displayText() + "while writing to socket (" + peer_address.toString() + ")", ErrorCodes::NETWORK_ERROR); - } - catch (const Poco::TimeoutException & e) - { - throw Exception("Timeout exceeded while writing to socket (" + peer_address.toString() + ")", ErrorCodes::SOCKET_TIMEOUT); - } - catch (const Poco::IOException & e) - { - throw Exception(e.displayText(), "while reading from socket (" + peer_address.toString() + ")", ErrorCodes::NETWORK_ERROR); - } - - if (res < 0) - throw Exception("Cannot write to socket (" + peer_address.toString() + ")", ErrorCodes::CANNOT_WRITE_TO_SOCKET); - bytes_written += res; - } - } + void nextImpl(); public: - WriteBufferFromPocoSocket(Poco::Net::Socket & socket_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE) - : BufferWithOwnMemory(buf_size), socket(socket_), peer_address(socket.peerAddress()) - { - } + WriteBufferFromPocoSocket(Poco::Net::Socket & socket_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE); - ~WriteBufferFromPocoSocket() - { - try - { - next(); - } - catch (...) - { - tryLogCurrentException(__PRETTY_FUNCTION__); - } - } + ~WriteBufferFromPocoSocket(); }; } diff --git a/dbms/src/IO/ReadBufferFromPocoSocket.cpp b/dbms/src/IO/ReadBufferFromPocoSocket.cpp new file mode 100644 index 00000000000..cd77880b3cf --- /dev/null +++ b/dbms/src/IO/ReadBufferFromPocoSocket.cpp @@ -0,0 +1,55 @@ +#include + +#include +#include + +#include + + +namespace DB +{ + +bool ReadBufferFromPocoSocket::nextImpl() +{ + ssize_t bytes_read = 0; + + /// Добавляем в эксепшены более подробную информацию. + try + { + bytes_read = socket.impl()->receiveBytes(internal_buffer.begin(), internal_buffer.size()); + } + catch (const Poco::Net::NetException & e) + { + throw Exception(e.displayText(), "while reading from socket (" + peer_address.toString() + ")", ErrorCodes::NETWORK_ERROR); + } + catch (const Poco::TimeoutException & e) + { + throw Exception("Timeout exceeded while reading from socket (" + peer_address.toString() + ")", ErrorCodes::SOCKET_TIMEOUT); + } + catch (const Poco::IOException & e) + { + throw Exception(e.displayText(), "while reading from socket (" + peer_address.toString() + ")", ErrorCodes::NETWORK_ERROR); + } + + if (bytes_read < 0) + throw Exception("Cannot read from socket (" + peer_address.toString() + ")", ErrorCodes::CANNOT_READ_FROM_SOCKET); + + if (bytes_read) + working_buffer.resize(bytes_read); + else + return false; + + return true; +} + +ReadBufferFromPocoSocket::ReadBufferFromPocoSocket(Poco::Net::Socket & socket_, size_t buf_size) + : BufferWithOwnMemory(buf_size), socket(socket_), peer_address(socket.peerAddress()) +{ +} + +bool ReadBufferFromPocoSocket::poll(size_t timeout_microseconds) +{ + return offset() != buffer().size() || socket.poll(timeout_microseconds, Poco::Net::Socket::SELECT_READ | Poco::Net::Socket::SELECT_ERROR); +} + +} diff --git a/dbms/src/IO/WriteBufferFromPocoSocket.cpp b/dbms/src/IO/WriteBufferFromPocoSocket.cpp new file mode 100644 index 00000000000..874e9760693 --- /dev/null +++ b/dbms/src/IO/WriteBufferFromPocoSocket.cpp @@ -0,0 +1,63 @@ +#include + +#include +#include + +#include + + +namespace DB +{ + +void WriteBufferFromPocoSocket::nextImpl() +{ + if (!offset()) + return; + + size_t bytes_written = 0; + while (bytes_written < offset()) + { + ssize_t res = 0; + + /// Добавляем в эксепшены более подробную информацию. + try + { + res = socket.impl()->sendBytes(working_buffer.begin() + bytes_written, offset() - bytes_written); + } + catch (const Poco::Net::NetException & e) + { + throw Exception(e.displayText() + "while writing to socket (" + peer_address.toString() + ")", ErrorCodes::NETWORK_ERROR); + } + catch (const Poco::TimeoutException & e) + { + throw Exception("Timeout exceeded while writing to socket (" + peer_address.toString() + ")", ErrorCodes::SOCKET_TIMEOUT); + } + catch (const Poco::IOException & e) + { + throw Exception(e.displayText(), "while reading from socket (" + peer_address.toString() + ")", ErrorCodes::NETWORK_ERROR); + } + + if (res < 0) + throw Exception("Cannot write to socket (" + peer_address.toString() + ")", ErrorCodes::CANNOT_WRITE_TO_SOCKET); + bytes_written += res; + } +} + +WriteBufferFromPocoSocket::WriteBufferFromPocoSocket(Poco::Net::Socket & socket_, size_t buf_size) + : BufferWithOwnMemory(buf_size), socket(socket_), peer_address(socket.peerAddress()) +{ +} + +WriteBufferFromPocoSocket::~WriteBufferFromPocoSocket() +{ + try + { + next(); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + } +} + +}