ClickHouse/dbms/include/DB/Client/Connection.h

274 lines
8.3 KiB
C++
Raw Normal View History

2012-05-21 20:38:34 +00:00
#pragma once
2015-09-29 19:19:54 +00:00
#include <common/logger_useful.h>
2012-10-16 19:20:58 +00:00
2012-05-16 18:03:00 +00:00
#include <Poco/Net/StreamSocket.h>
#include <DB/Common/Throttler.h>
2012-05-16 18:03:00 +00:00
#include <DB/Core/Block.h>
#include <DB/Core/Defines.h>
2012-05-16 18:03:00 +00:00
#include <DB/Core/Progress.h>
#include <DB/Core/Protocol.h>
#include <DB/Core/QueryProcessingStage.h>
#include <DB/DataStreams/IBlockInputStream.h>
#include <DB/DataStreams/IBlockOutputStream.h>
2015-01-18 08:25:56 +00:00
#include <DB/DataStreams/BlockStreamProfileInfo.h>
2012-05-16 18:03:00 +00:00
#include <DB/Interpreters/Settings.h>
#include <atomic>
2012-05-16 18:03:00 +00:00
namespace DB
{
/// Forward declaration; only pointers to ClientInfo are used in this header.
class ClientInfo;
/// A stream of blocks read from a table, together with the table's name.
using ExternalTableData = std::pair<BlockInputStreamPtr, std::string>;
/// Vector of pairs describing tables.
using ExternalTablesData = std::vector<ExternalTableData>;
/// Forward declaration, so that the pointer aliases below can be declared before the class itself.
class Connection;
using ConnectionPtr = std::shared_ptr<Connection>;
using Connections = std::vector<ConnectionPtr>;
2012-05-16 18:03:00 +00:00
2016-07-31 03:53:16 +00:00
/** Connection to a database server, for use by a client.
  * How to use - see Core/Protocol.h
  * (Implementation of the server end - see Server/TCPHandler.h)
  *
  * An empty string could be passed as 'default_database'
  *  - in that case, the server will use its own default database.
  */
2012-10-22 19:03:01 +00:00
class Connection : private boost::noncopyable
2012-05-16 18:03:00 +00:00
{
friend class ParallelReplicas;
2015-11-06 17:44:01 +00:00
friend class MultiplexedConnections;
2012-05-16 18:03:00 +00:00
public:
2012-05-30 06:46:57 +00:00
Connection(const String & host_, UInt16 port_, const String & default_database_,
2013-08-10 09:04:45 +00:00
const String & user_, const String & password_,
2012-05-23 19:51:30 +00:00
const String & client_name_ = "client",
2012-07-26 19:42:20 +00:00
Protocol::Compression::Enum compression_ = Protocol::Compression::Enable,
Poco::Timespan connect_timeout_ = Poco::Timespan(DBMS_DEFAULT_CONNECT_TIMEOUT_SEC, 0),
Poco::Timespan receive_timeout_ = Poco::Timespan(DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC, 0),
Poco::Timespan send_timeout_ = Poco::Timespan(DBMS_DEFAULT_SEND_TIMEOUT_SEC, 0),
Poco::Timespan ping_timeout_ = Poco::Timespan(DBMS_DEFAULT_PING_TIMEOUT_SEC, 0))
2013-08-10 09:04:45 +00:00
:
host(host_), port(port_), default_database(default_database_),
2015-05-28 21:41:28 +00:00
user(user_), password(password_), resolved_address(host, port),
client_name(client_name_),
compression(compression_),
connect_timeout(connect_timeout_), receive_timeout(receive_timeout_), send_timeout(send_timeout_),
ping_timeout(ping_timeout_),
log_wrapper(*this)
{
2016-07-31 03:53:16 +00:00
/// Don't connect immediately, only on first need.
2015-05-28 21:41:28 +00:00
if (user.empty())
user = "default";
setDescription();
2015-05-28 21:41:28 +00:00
}
Connection(const String & host_, UInt16 port_, const Poco::Net::SocketAddress & resolved_address_,
const String & default_database_,
const String & user_, const String & password_,
const String & client_name_ = "client",
Protocol::Compression::Enum compression_ = Protocol::Compression::Enable,
Poco::Timespan connect_timeout_ = Poco::Timespan(DBMS_DEFAULT_CONNECT_TIMEOUT_SEC, 0),
Poco::Timespan receive_timeout_ = Poco::Timespan(DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC, 0),
Poco::Timespan send_timeout_ = Poco::Timespan(DBMS_DEFAULT_SEND_TIMEOUT_SEC, 0),
Poco::Timespan ping_timeout_ = Poco::Timespan(DBMS_DEFAULT_PING_TIMEOUT_SEC, 0))
:
host(host_), port(port_),
default_database(default_database_),
2013-08-10 09:04:45 +00:00
user(user_), password(password_),
2015-05-28 21:41:28 +00:00
resolved_address(resolved_address_),
2015-02-01 07:21:19 +00:00
client_name(client_name_),
compression(compression_),
2012-10-16 19:20:58 +00:00
connect_timeout(connect_timeout_), receive_timeout(receive_timeout_), send_timeout(send_timeout_),
ping_timeout(ping_timeout_),
2015-05-28 21:41:28 +00:00
log_wrapper(*this)
2012-05-16 18:03:00 +00:00
{
2016-07-31 03:53:16 +00:00
/// Don't connect immediately, only on first need.
2013-08-10 09:04:45 +00:00
if (user.empty())
user = "default";
setDescription();
2012-05-16 18:03:00 +00:00
}
virtual ~Connection() {};
2016-07-31 03:53:16 +00:00
/// Set throttler of network traffic. One throttler could be used for multiple connections to limit total traffic.
void setThrottler(const ThrottlerPtr & throttler_)
{
throttler = throttler_;
}
2012-05-16 18:03:00 +00:00
2016-07-31 03:53:16 +00:00
/// Packet that could be received from server.
2012-05-16 18:03:00 +00:00
struct Packet
{
UInt64 type;
Block block;
std::unique_ptr<Exception> exception;
2012-05-16 18:03:00 +00:00
Progress progress;
2013-05-22 14:57:43 +00:00
BlockStreamProfileInfo profile_info;
2012-05-16 18:03:00 +00:00
Packet() : type(Protocol::Server::Hello) {}
};
2016-07-31 03:53:16 +00:00
/// Change default database. Changes will take effect on next reconnect.
void setDefaultDatabase(const String & database);
2012-05-16 18:20:45 +00:00
void getServerVersion(String & name, UInt64 & version_major, UInt64 & version_minor, UInt64 & revision);
2012-05-16 18:03:00 +00:00
2016-07-31 03:53:16 +00:00
/// For log and exception messages.
2016-06-08 14:39:49 +00:00
const String & getDescription() const;
const String & getHost() const;
UInt16 getPort() const;
const String & getDefaultDatabase() const;
2015-05-28 21:41:28 +00:00
2016-07-31 03:53:16 +00:00
/// If last flag is true, you need to call sendExternalTablesData after.
void sendQuery(
const String & query,
const String & query_id_ = "",
UInt64 stage = QueryProcessingStage::Complete,
const Settings * settings = nullptr,
const ClientInfo * client_info = nullptr,
bool with_pending_data = false);
2012-05-16 18:03:00 +00:00
void sendCancel();
2016-07-31 03:53:16 +00:00
/// Send block of data; if name is specified, server will write it to external (temporary) table of that name.
void sendData(const Block & block, const String & name = "");
2016-07-31 03:53:16 +00:00
/// Send all contents of external (temporary) tables.
void sendExternalTablesData(ExternalTablesData & data);
2012-05-16 18:03:00 +00:00
2016-07-31 03:53:16 +00:00
/// Send prepared block of data (serialized and, if need, compressed), that will be read from 'input'.
/// You could pass size of serialized/compressed block.
void sendPreparedData(ReadBuffer & input, size_t size, const String & name = "");
2016-07-31 03:53:16 +00:00
/// Check, if has data to read.
2012-05-16 18:03:00 +00:00
bool poll(size_t timeout_microseconds = 0);
2016-07-31 03:53:16 +00:00
/// Check, if has data in read buffer.
bool hasReadBufferPendingData() const;
2016-07-31 03:53:16 +00:00
/// Receive packet from server.
2012-05-16 18:03:00 +00:00
Packet receivePacket();
2016-07-31 03:53:16 +00:00
/// If not connected yet, or if connection is broken - then connect. If cannot connect - throw an exception.
void forceConnected();
2016-07-31 03:53:16 +00:00
/** Disconnect.
* This may be used, if connection is left in unsynchronised state
* (when someone continues to wait for something) after an exception.
*/
void disconnect();
2015-10-12 14:53:16 +00:00
/** Заполнить информацию, которая необходима при получении блока для некоторых задач
* (пока только для запроса DESCRIBE TABLE с Distributed-таблицами).
*/
void fillBlockExtraInfo(BlockExtraInfo & info) const;
size_t outBytesCount() const { return out ? out->count() : 0; }
size_t inBytesCount() const { return in ? in->count() : 0; }
2012-05-16 18:03:00 +00:00
private:
String host;
UInt16 port;
2012-05-30 06:46:57 +00:00
String default_database;
2013-08-10 09:04:45 +00:00
String user;
String password;
2012-05-16 18:20:45 +00:00
2016-07-31 03:53:16 +00:00
/** Address could be resolved beforehand and passed to constructor. Then 'host' and 'port' fields are used just for logging.
* Otherwise address is resolved in constructor. Thus, DNS based load balancing is not supported.
*/
2015-05-28 21:41:28 +00:00
Poco::Net::SocketAddress resolved_address;
/// Для сообщений в логе и в эксепшенах.
String description;
void setDescription();
2012-05-23 19:51:30 +00:00
String client_name;
2015-02-01 07:21:19 +00:00
bool connected = false;
2012-05-21 20:38:34 +00:00
2012-05-16 18:20:45 +00:00
String server_name;
2015-02-01 07:21:19 +00:00
UInt64 server_version_major = 0;
UInt64 server_version_minor = 0;
UInt64 server_revision = 0;
2012-05-16 18:03:00 +00:00
Poco::Net::StreamSocket socket;
std::shared_ptr<ReadBuffer> in;
std::shared_ptr<WriteBuffer> out;
2012-05-16 18:03:00 +00:00
String query_id;
2016-07-31 03:53:16 +00:00
UInt64 compression; /// Enable data compression for communication.
/// What compression algorithm to use while sending data for INSERT queries and external tables.
CompressionMethod network_compression_method = CompressionMethod::LZ4;
2012-05-16 18:03:00 +00:00
2016-07-31 03:53:16 +00:00
/** If not nullptr, used to limit network traffic.
* Only traffic for transferring blocks is accounted. Other packets don't.
*/
ThrottlerPtr throttler;
2012-07-26 19:42:20 +00:00
Poco::Timespan connect_timeout;
Poco::Timespan receive_timeout;
Poco::Timespan send_timeout;
Poco::Timespan ping_timeout;
2012-07-26 19:42:20 +00:00
2016-07-31 03:53:16 +00:00
/// From where to read query execution result.
std::shared_ptr<ReadBuffer> maybe_compressed_in;
2012-05-16 18:03:00 +00:00
BlockInputStreamPtr block_in;
2016-07-31 03:53:16 +00:00
/// Where to write data for INSERT.
std::shared_ptr<WriteBuffer> maybe_compressed_out;
2012-05-16 18:03:00 +00:00
BlockOutputStreamPtr block_out;
2016-07-31 03:53:16 +00:00
/// Logger is created lazily, for avoid to run DNS request in constructor.
class LoggerWrapper
{
public:
2015-05-28 21:41:28 +00:00
LoggerWrapper(Connection & parent_)
: log(nullptr), parent(parent_)
{
}
Logger * get()
{
if (!log)
log = &Logger::get("Connection (" + parent.getDescription() + ")");
return log;
}
private:
std::atomic<Logger *> log;
2015-05-28 21:41:28 +00:00
Connection & parent;
};
LoggerWrapper log_wrapper;
2012-10-16 19:20:58 +00:00
2012-05-16 18:03:00 +00:00
void connect();
void sendHello();
void receiveHello();
bool ping();
Block receiveData();
std::unique_ptr<Exception> receiveException();
2012-05-16 18:03:00 +00:00
Progress receiveProgress();
2013-05-22 14:57:43 +00:00
BlockStreamProfileInfo receiveProfileInfo();
void initBlockInput();
2012-05-16 18:03:00 +00:00
};
}