2012-05-21 20:38:34 +00:00
|
|
|
#pragma once
|
|
|
|
|
2015-09-29 19:19:54 +00:00
|
|
|
#include <common/logger_useful.h>
|
2012-10-16 19:20:58 +00:00
|
|
|
|
2012-05-16 18:03:00 +00:00
|
|
|
#include <Poco/Net/StreamSocket.h>
|
|
|
|
|
2017-04-01 09:19:00 +00:00
|
|
|
#include <Common/Throttler.h>
|
2015-02-10 20:48:17 +00:00
|
|
|
|
2017-04-01 09:19:00 +00:00
|
|
|
#include <Core/Block.h>
|
|
|
|
#include <Core/Defines.h>
|
2017-11-24 13:55:31 +00:00
|
|
|
#include <IO/Progress.h>
|
2017-04-01 09:19:00 +00:00
|
|
|
#include <Core/Protocol.h>
|
|
|
|
#include <Core/QueryProcessingStage.h>
|
2012-05-16 18:03:00 +00:00
|
|
|
|
2017-04-01 09:19:00 +00:00
|
|
|
#include <DataStreams/IBlockInputStream.h>
|
|
|
|
#include <DataStreams/IBlockOutputStream.h>
|
|
|
|
#include <DataStreams/BlockStreamProfileInfo.h>
|
2012-05-16 18:03:00 +00:00
|
|
|
|
2017-10-12 23:56:28 +00:00
|
|
|
#include <IO/CompressionSettings.h>
|
2017-12-27 17:58:52 +00:00
|
|
|
#include <IO/ConnectionTimeouts.h>
|
2017-10-12 23:56:28 +00:00
|
|
|
|
2017-04-01 09:19:00 +00:00
|
|
|
#include <Interpreters/Settings.h>
|
2017-04-17 16:02:48 +00:00
|
|
|
#include <Interpreters/TablesStatus.h>
|
2013-02-01 19:02:04 +00:00
|
|
|
|
2014-10-08 19:00:25 +00:00
|
|
|
#include <atomic>
|
2018-06-08 19:50:15 +00:00
|
|
|
#include <optional>
|
2014-10-08 19:00:25 +00:00
|
|
|
|
2012-05-16 18:03:00 +00:00
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
2016-10-24 21:40:39 +00:00
|
|
|
class ClientInfo;
|
|
|
|
|
2017-03-09 00:56:38 +00:00
|
|
|
/// The stream of blocks reading from the table and its name
|
2016-05-28 10:15:36 +00:00
|
|
|
using ExternalTableData = std::pair<BlockInputStreamPtr, std::string>;
|
2017-03-09 00:56:38 +00:00
|
|
|
/// Vector of pairs describing tables
|
2016-05-28 10:15:36 +00:00
|
|
|
using ExternalTablesData = std::vector<ExternalTableData>;
|
|
|
|
|
|
|
|
class Connection;
|
|
|
|
|
|
|
|
using ConnectionPtr = std::shared_ptr<Connection>;
|
|
|
|
using Connections = std::vector<ConnectionPtr>;
|
2012-05-16 18:03:00 +00:00
|
|
|
|
2015-02-10 20:48:17 +00:00
|
|
|
|
2016-07-31 03:53:16 +00:00
|
|
|
/** Connection with database server, to use by client.
|
|
|
|
* How to use - see Core/Protocol.h
|
|
|
|
* (Implementation of server end - see Server/TCPHandler.h)
|
2012-05-30 06:46:57 +00:00
|
|
|
*
|
2016-07-31 03:53:16 +00:00
|
|
|
* As 'default_database' empty string could be passed
|
|
|
|
* - in that case, server will use it's own default database.
|
2012-05-16 18:03:00 +00:00
|
|
|
*/
|
2012-10-22 19:03:01 +00:00
|
|
|
class Connection : private boost::noncopyable
|
2012-05-16 18:03:00 +00:00
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
friend class MultiplexedConnections;
|
2015-01-14 10:06:30 +00:00
|
|
|
|
2012-05-16 18:03:00 +00:00
|
|
|
public:
|
2018-03-29 20:21:01 +00:00
|
|
|
Connection(const String & host_, UInt16 port_,
|
2017-04-01 07:20:54 +00:00
|
|
|
const String & default_database_,
|
|
|
|
const String & user_, const String & password_,
|
2017-12-27 17:58:52 +00:00
|
|
|
const ConnectionTimeouts & timeouts_,
|
2017-04-01 07:20:54 +00:00
|
|
|
const String & client_name_ = "client",
|
2017-10-03 14:52:08 +00:00
|
|
|
Protocol::Compression compression_ = Protocol::Compression::Enable,
|
2018-03-29 01:41:06 +00:00
|
|
|
Protocol::Secure secure_ = Protocol::Secure::Disable,
|
2017-04-17 16:02:48 +00:00
|
|
|
Poco::Timespan sync_request_timeout_ = Poco::Timespan(DBMS_DEFAULT_SYNC_REQUEST_TIMEOUT_SEC, 0))
|
2017-04-01 07:20:54 +00:00
|
|
|
:
|
2018-03-29 20:21:01 +00:00
|
|
|
host(host_), port(port_), default_database(default_database_),
|
|
|
|
user(user_), password(password_), current_resolved_address(host, port),
|
2017-04-01 07:20:54 +00:00
|
|
|
client_name(client_name_),
|
|
|
|
compression(compression_),
|
2018-03-29 01:41:06 +00:00
|
|
|
secure(secure_),
|
2017-12-27 17:58:52 +00:00
|
|
|
timeouts(timeouts_),
|
2017-04-17 16:02:48 +00:00
|
|
|
sync_request_timeout(sync_request_timeout_),
|
2017-04-01 07:20:54 +00:00
|
|
|
log_wrapper(*this)
|
|
|
|
{
|
|
|
|
/// Don't connect immediately, only on first need.
|
|
|
|
|
|
|
|
if (user.empty())
|
|
|
|
user = "default";
|
|
|
|
|
|
|
|
setDescription();
|
|
|
|
}
|
|
|
|
|
2018-06-03 17:43:56 +00:00
|
|
|
virtual ~Connection() {}
|
2017-04-01 07:20:54 +00:00
|
|
|
|
|
|
|
/// Set throttler of network traffic. One throttler could be used for multiple connections to limit total traffic.
|
|
|
|
void setThrottler(const ThrottlerPtr & throttler_)
|
|
|
|
{
|
|
|
|
throttler = throttler_;
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/// Packet that could be received from server.
|
|
|
|
struct Packet
|
|
|
|
{
|
|
|
|
UInt64 type;
|
|
|
|
|
|
|
|
Block block;
|
|
|
|
std::unique_ptr<Exception> exception;
|
|
|
|
Progress progress;
|
|
|
|
BlockStreamProfileInfo profile_info;
|
|
|
|
|
|
|
|
Packet() : type(Protocol::Server::Hello) {}
|
|
|
|
};
|
|
|
|
|
|
|
|
/// Change default database. Changes will take effect on next reconnect.
|
|
|
|
void setDefaultDatabase(const String & database);
|
|
|
|
|
2018-07-31 21:36:18 +00:00
|
|
|
void getServerVersion(String & name, UInt64 & version_major, UInt64 & version_minor, UInt64 & version_patch, UInt64 & revision);
|
2018-09-28 20:17:38 +00:00
|
|
|
UInt64 getServerRevision();
|
2017-04-01 07:20:54 +00:00
|
|
|
|
|
|
|
const String & getServerTimezone();
|
2018-02-26 06:49:17 +00:00
|
|
|
const String & getServerDisplayName();
|
2017-04-01 07:20:54 +00:00
|
|
|
|
|
|
|
/// For log and exception messages.
|
|
|
|
const String & getDescription() const;
|
|
|
|
const String & getHost() const;
|
|
|
|
UInt16 getPort() const;
|
|
|
|
const String & getDefaultDatabase() const;
|
|
|
|
|
|
|
|
/// If last flag is true, you need to call sendExternalTablesData after.
|
|
|
|
void sendQuery(
|
|
|
|
const String & query,
|
|
|
|
const String & query_id_ = "",
|
|
|
|
UInt64 stage = QueryProcessingStage::Complete,
|
|
|
|
const Settings * settings = nullptr,
|
|
|
|
const ClientInfo * client_info = nullptr,
|
|
|
|
bool with_pending_data = false);
|
|
|
|
|
|
|
|
void sendCancel();
|
|
|
|
/// Send block of data; if name is specified, server will write it to external (temporary) table of that name.
|
|
|
|
void sendData(const Block & block, const String & name = "");
|
|
|
|
/// Send all contents of external (temporary) tables.
|
|
|
|
void sendExternalTablesData(ExternalTablesData & data);
|
|
|
|
|
|
|
|
/// Send prepared block of data (serialized and, if need, compressed), that will be read from 'input'.
|
|
|
|
/// You could pass size of serialized/compressed block.
|
|
|
|
void sendPreparedData(ReadBuffer & input, size_t size, const String & name = "");
|
|
|
|
|
|
|
|
/// Check, if has data to read.
|
|
|
|
bool poll(size_t timeout_microseconds = 0);
|
|
|
|
|
|
|
|
/// Check, if has data in read buffer.
|
2018-06-14 14:29:42 +00:00
|
|
|
bool hasReadPendingData() const;
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2018-06-08 19:50:15 +00:00
|
|
|
/// Checks if there is input data in connection and reads packet ID.
|
|
|
|
std::optional<UInt64> checkPacket(size_t timeout_microseconds = 0);
|
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
/// Receive packet from server.
|
|
|
|
Packet receivePacket();
|
|
|
|
|
|
|
|
/// If not connected yet, or if connection is broken - then connect. If cannot connect - throw an exception.
|
|
|
|
void forceConnected();
|
|
|
|
|
2017-04-17 16:02:48 +00:00
|
|
|
TablesStatusResponse getTablesStatus(const TablesStatusRequest & request);
|
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
/** Disconnect.
|
|
|
|
* This may be used, if connection is left in unsynchronised state
|
|
|
|
* (when someone continues to wait for something) after an exception.
|
|
|
|
*/
|
|
|
|
void disconnect();
|
|
|
|
|
|
|
|
size_t outBytesCount() const { return out ? out->count() : 0; }
|
|
|
|
size_t inBytesCount() const { return in ? in->count() : 0; }
|
2014-11-27 19:35:31 +00:00
|
|
|
|
2018-03-29 20:21:01 +00:00
|
|
|
/// Returns initially resolved address
|
|
|
|
Poco::Net::SocketAddress getResolvedAddress() const;
|
|
|
|
|
2012-05-16 18:03:00 +00:00
|
|
|
private:
|
2017-04-01 07:20:54 +00:00
|
|
|
String host;
|
|
|
|
UInt16 port;
|
|
|
|
String default_database;
|
|
|
|
String user;
|
|
|
|
String password;
|
|
|
|
|
2018-03-29 20:21:01 +00:00
|
|
|
/// Address is resolved during the first connection (or the following reconnects)
|
|
|
|
/// Use it only for logging purposes
|
|
|
|
Poco::Net::SocketAddress current_resolved_address;
|
2017-04-01 07:20:54 +00:00
|
|
|
|
|
|
|
/// For messages in log and in exceptions.
|
|
|
|
String description;
|
|
|
|
void setDescription();
|
|
|
|
|
|
|
|
String client_name;
|
|
|
|
|
|
|
|
bool connected = false;
|
|
|
|
|
|
|
|
String server_name;
|
|
|
|
UInt64 server_version_major = 0;
|
|
|
|
UInt64 server_version_minor = 0;
|
2018-07-31 21:36:18 +00:00
|
|
|
UInt64 server_version_patch = 0;
|
2017-04-01 07:20:54 +00:00
|
|
|
UInt64 server_revision = 0;
|
|
|
|
String server_timezone;
|
2018-02-26 06:49:17 +00:00
|
|
|
String server_display_name;
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2017-09-28 19:43:31 +00:00
|
|
|
std::unique_ptr<Poco::Net::StreamSocket> socket;
|
2017-04-01 07:20:54 +00:00
|
|
|
std::shared_ptr<ReadBuffer> in;
|
|
|
|
std::shared_ptr<WriteBuffer> out;
|
2018-06-08 19:50:15 +00:00
|
|
|
std::optional<UInt64> last_input_packet_type;
|
2017-04-01 07:20:54 +00:00
|
|
|
|
|
|
|
String query_id;
|
2017-10-03 14:52:08 +00:00
|
|
|
Protocol::Compression compression; /// Enable data compression for communication.
|
2018-03-29 01:41:06 +00:00
|
|
|
Protocol::Secure secure; /// Enable data encryption for communication.
|
2017-10-12 23:56:28 +00:00
|
|
|
|
|
|
|
/// What compression settings to use while sending data for INSERT queries and external tables.
|
|
|
|
CompressionSettings compression_settings;
|
2017-04-01 07:20:54 +00:00
|
|
|
|
|
|
|
/** If not nullptr, used to limit network traffic.
|
|
|
|
* Only traffic for transferring blocks is accounted. Other packets don't.
|
|
|
|
*/
|
|
|
|
ThrottlerPtr throttler;
|
|
|
|
|
2017-12-27 17:58:52 +00:00
|
|
|
ConnectionTimeouts timeouts;
|
2017-04-17 16:02:48 +00:00
|
|
|
Poco::Timespan sync_request_timeout;
|
2017-04-01 07:20:54 +00:00
|
|
|
|
|
|
|
/// From where to read query execution result.
|
|
|
|
std::shared_ptr<ReadBuffer> maybe_compressed_in;
|
|
|
|
BlockInputStreamPtr block_in;
|
2018-06-06 20:57:07 +00:00
|
|
|
BlockInputStreamPtr block_logs_in;
|
2017-04-01 07:20:54 +00:00
|
|
|
|
|
|
|
/// Where to write data for INSERT.
|
|
|
|
std::shared_ptr<WriteBuffer> maybe_compressed_out;
|
|
|
|
BlockOutputStreamPtr block_out;
|
|
|
|
|
|
|
|
/// Logger is created lazily, for avoid to run DNS request in constructor.
|
|
|
|
class LoggerWrapper
|
|
|
|
{
|
|
|
|
public:
|
|
|
|
LoggerWrapper(Connection & parent_)
|
|
|
|
: log(nullptr), parent(parent_)
|
|
|
|
{
|
|
|
|
}
|
|
|
|
|
|
|
|
Logger * get()
|
|
|
|
{
|
|
|
|
if (!log)
|
|
|
|
log = &Logger::get("Connection (" + parent.getDescription() + ")");
|
|
|
|
|
|
|
|
return log;
|
|
|
|
}
|
|
|
|
|
|
|
|
private:
|
|
|
|
std::atomic<Logger *> log;
|
|
|
|
Connection & parent;
|
|
|
|
};
|
|
|
|
|
|
|
|
LoggerWrapper log_wrapper;
|
|
|
|
|
|
|
|
void connect();
|
|
|
|
void sendHello();
|
|
|
|
void receiveHello();
|
|
|
|
bool ping();
|
|
|
|
|
|
|
|
Block receiveData();
|
2018-06-06 20:57:07 +00:00
|
|
|
Block receiveLogData();
|
|
|
|
Block receiveDataImpl(BlockInputStreamPtr & stream);
|
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
std::unique_ptr<Exception> receiveException();
|
|
|
|
Progress receiveProgress();
|
|
|
|
BlockStreamProfileInfo receiveProfileInfo();
|
|
|
|
|
2018-06-06 20:57:07 +00:00
|
|
|
void initInputBuffers();
|
2017-04-01 07:20:54 +00:00
|
|
|
void initBlockInput();
|
2018-06-06 20:57:07 +00:00
|
|
|
void initBlockLogsInput();
|
2017-04-17 16:02:48 +00:00
|
|
|
|
|
|
|
void throwUnexpectedPacket(UInt64 packet_type, const char * expected) const;
|
2012-05-16 18:03:00 +00:00
|
|
|
};
|
|
|
|
|
|
|
|
}
|