ClickHouse/src/Client/Connection.h

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

316 lines
9.8 KiB
C++
Raw Normal View History

2012-05-21 20:38:34 +00:00
#pragma once
2012-10-16 19:20:58 +00:00
2012-05-16 18:03:00 +00:00
#include <Poco/Net/StreamSocket.h>
#include <Common/SSH/Wrappers.h>
2021-08-17 19:59:51 +00:00
#include <Client/IServerConnection.h>
#include <Core/Defines.h>
2012-05-16 18:03:00 +00:00
2020-12-02 17:02:14 +00:00
#include <IO/ReadBufferFromPocoSocket.h>
2023-03-03 19:30:43 +00:00
#include <IO/WriteBufferFromPocoSocket.h>
#include <Interpreters/TablesStatus.h>
#include <Interpreters/Context_fwd.h>
2018-12-21 12:17:30 +00:00
#include <Compression/ICompressionCodec.h>
#include <Storages/MergeTree/RequestResponse.h>
#include <atomic>
#include <optional>
#include "config.h"
2012-05-16 18:03:00 +00:00
namespace DB
{
struct Settings;
class Connection;
2021-09-11 11:34:22 +00:00
struct ConnectionParameters;
using ConnectionPtr = std::shared_ptr<Connection>;
using Connections = std::vector<ConnectionPtr>;
2012-05-16 18:03:00 +00:00
2021-10-08 17:21:19 +00:00
class NativeReader;
class NativeWriter;
2016-07-31 03:53:16 +00:00
/** Connection with database server, to use by client.
* How to use - see Core/Protocol.h
* (Implementation of server end - see Server/TCPHandler.h)
2012-05-30 06:46:57 +00:00
*
2016-07-31 03:53:16 +00:00
* As 'default_database' empty string could be passed
* - in that case, server will use it's own default database.
2012-05-16 18:03:00 +00:00
*/
2021-08-17 19:59:51 +00:00
class Connection : public IServerConnection
2012-05-16 18:03:00 +00:00
{
2015-11-06 17:44:01 +00:00
friend class MultiplexedConnections;
2012-05-16 18:03:00 +00:00
public:
Connection(const String & host_, UInt16 port_,
2015-05-28 21:41:28 +00:00
const String & default_database_,
const String & user_, const String & password_,
const ssh::SSHKey & ssh_private_key_,
2022-08-03 19:44:08 +00:00
const String & quota_key_,
const String & cluster_,
const String & cluster_secret_,
2021-03-29 00:35:09 +00:00
const String & client_name_,
Protocol::Compression compression_,
Protocol::Secure secure_);
2021-10-08 17:21:19 +00:00
~Connection() override;
2021-10-16 19:48:51 +00:00
IServerConnection::Type getConnectionType() const override { return IServerConnection::Type::SERVER; }
2021-09-19 18:24:06 +00:00
static ServerConnectionPtr createConnection(const ConnectionParameters & parameters, ContextPtr context);
2021-09-11 11:34:22 +00:00
2016-07-31 03:53:16 +00:00
/// Set throttler of network traffic. One throttler could be used for multiple connections to limit total traffic.
2021-08-17 19:59:51 +00:00
void setThrottler(const ThrottlerPtr & throttler_) override
{
throttler = throttler_;
}
2016-07-31 03:53:16 +00:00
/// Change default database. Changes will take effect on next reconnect.
2021-08-17 19:59:51 +00:00
void setDefaultDatabase(const String & database) override;
void getServerVersion(const ConnectionTimeouts & timeouts,
String & name,
UInt64 & version_major,
UInt64 & version_minor,
UInt64 & version_patch,
2021-08-17 19:59:51 +00:00
UInt64 & revision) override;
UInt64 getServerRevision(const ConnectionTimeouts & timeouts) override;
2021-08-17 19:59:51 +00:00
const String & getServerTimezone(const ConnectionTimeouts & timeouts) override;
const String & getServerDisplayName(const ConnectionTimeouts & timeouts) override;
2016-07-31 03:53:16 +00:00
/// For log and exception messages.
2021-08-17 19:59:51 +00:00
const String & getDescription() const override;
2016-06-08 14:39:49 +00:00
const String & getHost() const;
UInt16 getPort() const;
const String & getDefaultDatabase() const;
Protocol::Compression getCompression() const { return compression; }
std::vector<std::pair<String, String>> getPasswordComplexityRules() const override { return password_complexity_rules; }
void sendQuery(
2021-08-27 08:46:31 +00:00
const ConnectionTimeouts & timeouts,
const String & query,
const NameToNameMap& query_parameters,
const String & query_id_/* = "" */,
UInt64 stage/* = QueryProcessingStage::Complete */,
const Settings * settings/* = nullptr */,
const ClientInfo * client_info/* = nullptr */,
2022-05-06 15:04:03 +00:00
bool with_pending_data/* = false */,
std::function<void(const Progress &)> process_progress_callback) override;
2021-08-17 19:59:51 +00:00
void sendCancel() override;
void sendData(const Block & block, const String & name/* = "" */, bool scalar/* = false */) override;
2021-08-17 19:59:51 +00:00
2023-02-03 13:34:18 +00:00
void sendMergeTreeReadTaskResponse(const ParallelReadResponse & response) override;
2021-08-17 19:59:51 +00:00
void sendExternalTablesData(ExternalTablesData & data) override;
bool poll(size_t timeout_microseconds/* = 0 */) override;
2021-08-17 19:59:51 +00:00
bool hasReadPendingData() const override;
std::optional<UInt64> checkPacket(size_t timeout_microseconds/* = 0*/) override;
2021-08-17 19:59:51 +00:00
Packet receivePacket() override;
2021-08-17 19:59:51 +00:00
void forceConnected(const ConnectionTimeouts & timeouts) override;
2021-08-17 19:59:51 +00:00
bool isConnected() const override { return connected; }
bool checkConnected(const ConnectionTimeouts & timeouts) override { return connected && ping(timeouts); }
2021-08-17 19:59:51 +00:00
void disconnect() override;
2020-06-30 09:25:23 +00:00
2021-08-17 19:59:51 +00:00
/// Send prepared block of data (serialized and, if need, compressed), that will be read from 'input'.
/// You could pass size of serialized/compressed block.
void sendPreparedData(ReadBuffer & input, size_t size, const String & name = "");
void sendReadTaskResponse(const String &);
/// Send all scalars.
void sendScalarsData(Scalars & data);
/// Send parts' uuids to excluded them from query processing
void sendIgnoredPartUUIDs(const std::vector<UUID> & uuids);
TablesStatusResponse getTablesStatus(const ConnectionTimeouts & timeouts,
const TablesStatusRequest & request);
size_t outBytesCount() const { return out ? out->count() : 0; }
size_t inBytesCount() const { return in ? in->count() : 0; }
2021-01-19 19:21:06 +00:00
Poco::Net::Socket * getSocket() { return socket.get(); }
2021-02-17 17:44:11 +00:00
/// Each time read from socket blocks and async_callback is set, it will be called. You can poll socket inside it.
2021-02-17 17:34:52 +00:00
void setAsyncCallback(AsyncCallback async_callback_)
{
2021-02-23 23:22:07 +00:00
async_callback = std::move(async_callback_);
2021-02-17 17:34:52 +00:00
if (in)
2023-03-03 19:30:43 +00:00
in->setAsyncCallback(async_callback);
if (out)
out->setAsyncCallback(async_callback);
2021-02-17 17:34:52 +00:00
}
2023-03-03 19:30:43 +00:00
bool haveMoreAddressesToConnect() const { return have_more_addresses_to_connect; }
2012-05-16 18:03:00 +00:00
private:
String host;
UInt16 port;
2012-05-30 06:46:57 +00:00
String default_database;
2013-08-10 09:04:45 +00:00
String user;
String password;
ssh::SSHKey ssh_private_key;
2022-08-03 19:44:08 +00:00
String quota_key;
/// For inter-server authorization
String cluster;
String cluster_secret;
/// For DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET
String salt;
/// For DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET_V2
std::optional<UInt64> nonce;
/// Address is resolved during the first connection (or the following reconnects)
/// Use it only for logging purposes
std::optional<Poco::Net::SocketAddress> current_resolved_address;
2017-03-09 04:26:17 +00:00
/// For messages in log and in exceptions.
String description;
void setDescription();
/// Returns resolved address if it was resolved.
std::optional<Poco::Net::SocketAddress> getResolvedAddress() const;
2012-05-23 19:51:30 +00:00
String client_name;
2015-02-01 07:21:19 +00:00
bool connected = false;
2012-05-16 18:20:45 +00:00
String server_name;
2015-02-01 07:21:19 +00:00
UInt64 server_version_major = 0;
UInt64 server_version_minor = 0;
UInt64 server_version_patch = 0;
2015-02-01 07:21:19 +00:00
UInt64 server_revision = 0;
String server_timezone;
2018-02-26 06:49:17 +00:00
String server_display_name;
2017-09-28 19:43:31 +00:00
std::unique_ptr<Poco::Net::StreamSocket> socket;
2020-12-02 17:02:14 +00:00
std::shared_ptr<ReadBufferFromPocoSocket> in;
2023-03-03 19:30:43 +00:00
std::shared_ptr<WriteBufferFromPocoSocket> out;
std::optional<UInt64> last_input_packet_type;
String query_id;
2017-10-03 14:52:08 +00:00
Protocol::Compression compression; /// Enable data compression for communication.
Protocol::Secure secure; /// Enable data encryption for communication.
/// What compression settings to use while sending data for INSERT queries and external tables.
2018-12-21 12:17:30 +00:00
CompressionCodecPtr compression_codec;
2016-07-31 03:53:16 +00:00
/** If not nullptr, used to limit network traffic.
* Only traffic for transferring blocks is accounted. Other packets don't.
*/
ThrottlerPtr throttler;
std::vector<std::pair<String, String>> password_complexity_rules;
2016-07-31 03:53:16 +00:00
/// From where to read query execution result.
std::shared_ptr<ReadBuffer> maybe_compressed_in;
2021-10-08 17:21:19 +00:00
std::unique_ptr<NativeReader> block_in;
std::unique_ptr<NativeReader> block_logs_in;
2021-08-30 11:04:59 +00:00
std::unique_ptr<NativeReader> block_profile_events_in;
2016-07-31 03:53:16 +00:00
/// Where to write data for INSERT.
std::shared_ptr<WriteBuffer> maybe_compressed_out;
2021-10-08 17:21:19 +00:00
std::unique_ptr<NativeWriter> block_out;
bool have_more_addresses_to_connect = false;
2016-07-31 03:53:16 +00:00
/// Logger is created lazily, for avoid to run DNS request in constructor.
class LoggerWrapper
{
public:
2021-03-19 21:49:18 +00:00
explicit LoggerWrapper(Connection & parent_)
2015-05-28 21:41:28 +00:00
: log(nullptr), parent(parent_)
{
}
2020-05-30 21:57:37 +00:00
Poco::Logger * get()
{
if (!log)
2020-05-30 21:57:37 +00:00
log = &Poco::Logger::get("Connection (" + parent.getDescription() + ")");
return log;
}
private:
2020-05-30 21:57:37 +00:00
std::atomic<Poco::Logger *> log;
2015-05-28 21:41:28 +00:00
Connection & parent;
};
LoggerWrapper log_wrapper;
2021-02-19 21:45:58 +00:00
AsyncCallback async_callback = {};
2021-02-17 17:34:52 +00:00
void connect(const ConnectionTimeouts & timeouts);
2021-02-17 17:34:52 +00:00
void sendHello();
String packStringForSshSign(String challenge);
void performHandshakeForSSHAuth();
2022-08-03 19:44:08 +00:00
void sendAddendum();
void receiveHello(const Poco::Timespan & handshake_timeout);
#if USE_SSL
void sendClusterNameAndSalt();
#endif
bool ping(const ConnectionTimeouts & timeouts);
2012-05-16 18:03:00 +00:00
Block receiveData();
Block receiveLogData();
2021-10-08 17:21:19 +00:00
Block receiveDataImpl(NativeReader & reader);
2021-08-30 11:04:59 +00:00
Block receiveProfileEvents();
2021-03-22 17:12:31 +00:00
std::vector<String> receiveMultistringMessage(UInt64 msg_type) const;
std::unique_ptr<Exception> receiveException() const;
Progress receiveProgress() const;
2023-02-03 13:34:18 +00:00
ParallelReadRequest receiveParallelReadRequest() const;
InitialAllRangesAnnouncement receiveInitialParallelReadAnnouncement() const;
2021-10-15 20:18:20 +00:00
ProfileInfo receiveProfileInfo() const;
void initInputBuffers();
void initBlockInput();
void initBlockLogsInput();
2021-08-30 11:04:59 +00:00
void initBlockProfileEventsInput();
[[noreturn]] void throwUnexpectedPacket(UInt64 packet_type, const char * expected) const;
2012-05-16 18:03:00 +00:00
};
2023-03-03 19:30:43 +00:00
template <typename Conn>
2021-02-17 17:34:52 +00:00
class AsyncCallbackSetter
{
public:
2023-03-03 19:30:43 +00:00
AsyncCallbackSetter(Conn * connection_, AsyncCallback async_callback) : connection(connection_)
2021-02-17 17:34:52 +00:00
{
connection->setAsyncCallback(std::move(async_callback));
}
~AsyncCallbackSetter()
{
connection->setAsyncCallback({});
}
private:
2023-03-03 19:30:43 +00:00
Conn * connection;
2021-02-17 17:34:52 +00:00
};
2012-05-16 18:03:00 +00:00
}