#pragma once

#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include


namespace DB
{

/// A stream of blocks read from a table, and the name of that table.
using ExternalTableData = std::pair<BlockInputStreamPtr, String>;

/// Vector of pairs describing tables.
using ExternalTablesData = std::vector<ExternalTableData>;

class Connection;

using ConnectionPtr = std::shared_ptr<Connection>;
using Connections = std::vector<ConnectionPtr>;


/** Connection with database server, to be used by a client.
  * How to use - see Core/Protocol.h
  * (Implementation of server end - see Server/TCPHandler.h)
  *
  * An empty string could be passed as 'default_database'
  * - in that case, the server will use its own default database.
  */
class Connection : private boost::noncopyable
{
    friend class ParallelReplicas;
    friend class MultiplexedConnections;

public:
    Connection(const String & host_, UInt16 port_, const String & default_database_,
        const String & user_, const String & password_,
        const String & client_name_ = "client",
        Protocol::Compression::Enum compression_ = Protocol::Compression::Enable,
        Poco::Timespan connect_timeout_ = Poco::Timespan(DBMS_DEFAULT_CONNECT_TIMEOUT_SEC, 0),
        Poco::Timespan receive_timeout_ = Poco::Timespan(DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC, 0),
        Poco::Timespan send_timeout_ = Poco::Timespan(DBMS_DEFAULT_SEND_TIMEOUT_SEC, 0),
        Poco::Timespan ping_timeout_ = Poco::Timespan(DBMS_DEFAULT_PING_TIMEOUT_SEC, 0))
        :
        host(host_), port(port_), default_database(default_database_),
        user(user_), password(password_), resolved_address(host, port),
        client_name(client_name_),
        compression(compression_),
        connect_timeout(connect_timeout_), receive_timeout(receive_timeout_), send_timeout(send_timeout_),
        ping_timeout(ping_timeout_),
        log_wrapper(*this)
    {
        /// Don't connect immediately, only on first need.
        if (user.empty())
            user = "default";

        setDescription();
    }

    Connection(const String & host_, UInt16 port_, const Poco::Net::SocketAddress & resolved_address_,
        const String & default_database_,
        const String & user_, const String & password_,
        const String & client_name_ = "client",
        Protocol::Compression::Enum compression_ = Protocol::Compression::Enable,
        Poco::Timespan connect_timeout_ = Poco::Timespan(DBMS_DEFAULT_CONNECT_TIMEOUT_SEC, 0),
        Poco::Timespan receive_timeout_ = Poco::Timespan(DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC, 0),
        Poco::Timespan send_timeout_ = Poco::Timespan(DBMS_DEFAULT_SEND_TIMEOUT_SEC, 0),
        Poco::Timespan ping_timeout_ = Poco::Timespan(DBMS_DEFAULT_PING_TIMEOUT_SEC, 0))
        :
        host(host_), port(port_),
        default_database(default_database_),
        user(user_), password(password_),
        resolved_address(resolved_address_),
        client_name(client_name_),
        compression(compression_),
        connect_timeout(connect_timeout_), receive_timeout(receive_timeout_), send_timeout(send_timeout_),
        ping_timeout(ping_timeout_),
        log_wrapper(*this)
    {
        /// Don't connect immediately, only on first need.

        if (user.empty())
            user = "default";

        setDescription();
    }

    virtual ~Connection() {}

    /// Set throttler of network traffic. One throttler could be used for multiple connections to limit total traffic.
    void setThrottler(const ThrottlerPtr & throttler_)
    {
        throttler = throttler_;
    }


    /// Packet that could be received from server.
    struct Packet
    {
        UInt64 type;

        Block block;
        std::unique_ptr<Exception> exception;
        Progress progress;
        BlockStreamProfileInfo profile_info;

        Packet() : type(Protocol::Server::Hello) {}
    };

    /// Change default database. Changes will take effect on next reconnect.
    void setDefaultDatabase(const String & database);

    void getServerVersion(String & name, UInt64 & version_major, UInt64 & version_minor, UInt64 & revision);

    /// For log and exception messages.
    const String & getDescription() const;
    const String & getHost() const;
    UInt16 getPort() const;
    const String & getDefaultDatabase() const;

    /// If the last flag (with_pending_data) is true, you need to call sendExternalTablesData afterwards.
    void sendQuery(const String & query, const String & query_id_ = "", UInt64 stage = QueryProcessingStage::Complete,
        const Settings * settings = nullptr, bool with_pending_data = false);

    void sendCancel();

    /// Send block of data; if name is specified, server will write it to external (temporary) table of that name.
    void sendData(const Block & block, const String & name = "");

    /// Send all contents of external (temporary) tables.
    void sendExternalTablesData(ExternalTablesData & data);

    /// Send a prepared block of data (serialized and, if needed, compressed), that will be read from 'input'.
    /// You could pass the size of the serialized/compressed block.
    void sendPreparedData(ReadBuffer & input, size_t size, const String & name = "");

    /// Check whether there is data to read.
    bool poll(size_t timeout_microseconds = 0);

    /// Check whether there is data in the read buffer.
    bool hasReadBufferPendingData() const;

    /// Receive packet from server.
    Packet receivePacket();

    /// If not connected yet, or if connection is broken - then connect. If cannot connect - throw an exception.
    void forceConnected();

    /** Disconnect.
      * This may be used if the connection is left in an unsynchronised state
      * (when someone continues to wait for something) after an exception.
      */
    void disconnect();

    /** Fill in the information that is required when receiving a block for certain tasks
      * (currently only for the DESCRIBE TABLE query with Distributed tables).
      */
    void fillBlockExtraInfo(BlockExtraInfo & info) const;

    size_t outBytesCount() const { return out ? out->count() : 0; }
    size_t inBytesCount() const { return in ? in->count() : 0; }

private:
    String host;
    UInt16 port;
    String default_database;
    String user;
    String password;

    /** Address could be resolved beforehand and passed to constructor.
      * Then 'host' and 'port' fields are used just for logging.
      * Otherwise address is resolved in constructor. Thus, DNS based load balancing is not supported.
      */
    Poco::Net::SocketAddress resolved_address;

    /// For messages in the log and in exceptions.
    String description;
    void setDescription();

    String client_name;

    bool connected = false;

    String server_name;
    UInt64 server_version_major = 0;
    UInt64 server_version_minor = 0;
    UInt64 server_revision = 0;

    Poco::Net::StreamSocket socket;
    std::shared_ptr<ReadBuffer> in;
    std::shared_ptr<WriteBuffer> out;

    String query_id;
    UInt64 compression;        /// Enable data compression for communication.
    /// What compression algorithm to use while sending data for INSERT queries and external tables.
    CompressionMethod network_compression_method = CompressionMethod::LZ4;

    /** If not nullptr, used to limit network traffic.
      * Only traffic for transferring blocks is accounted. Other packets are not.
      */
    ThrottlerPtr throttler;

    Poco::Timespan connect_timeout;
    Poco::Timespan receive_timeout;
    Poco::Timespan send_timeout;
    Poco::Timespan ping_timeout;

    /// From where to read query execution result.
    std::shared_ptr<ReadBuffer> maybe_compressed_in;
    BlockInputStreamPtr block_in;

    /// Where to write data for INSERT.
    std::shared_ptr<WriteBuffer> maybe_compressed_out;
    BlockOutputStreamPtr block_out;

    /// Logger is created lazily, to avoid running a DNS request in the constructor.
    class LoggerWrapper
    {
    public:
        LoggerWrapper(Connection & parent_)
            : log(nullptr), parent(parent_)
        {
        }

        Logger * get()
        {
            if (!log)
                log = &Logger::get("Connection (" + parent.getDescription() + ")");

            return log;
        }

    private:
        std::atomic<Logger *> log;
        Connection & parent;
    };

    LoggerWrapper log_wrapper;

    void connect();
    void sendHello();
    void receiveHello();
    bool ping();

    Block receiveData();
    std::unique_ptr<Exception> receiveException();
    Progress receiveProgress();
    BlockStreamProfileInfo receiveProfileInfo();

    void initBlockInput();
};

}