ClickHouse/dbms/include/DB/Client/Connection.h

212 lines
7.0 KiB
C
Raw Normal View History

2012-05-21 20:38:34 +00:00
#pragma once
2012-10-16 19:20:58 +00:00
#include <Yandex/logger_useful.h>
2012-05-16 18:03:00 +00:00
#include <Poco/Net/StreamSocket.h>
#include <DB/Core/Block.h>
#include <DB/Core/Defines.h>
2012-05-16 18:03:00 +00:00
#include <DB/Core/Progress.h>
#include <DB/Core/Protocol.h>
#include <DB/Core/QueryProcessingStage.h>
#include <DB/DataTypes/DataTypeFactory.h>
#include <DB/DataStreams/IBlockInputStream.h>
#include <DB/DataStreams/IBlockOutputStream.h>
2015-01-18 08:25:56 +00:00
#include <DB/DataStreams/BlockStreamProfileInfo.h>
2012-05-16 18:03:00 +00:00
#include <DB/Interpreters/Settings.h>
#include <atomic>
2012-05-16 18:03:00 +00:00
namespace DB
{
using Poco::SharedPtr;
/// Поток блоков читающих из таблицы и ее имя
typedef std::pair<BlockInputStreamPtr, std::string> ExternalTableData;
/// Вектор пар, описывающих таблицы
typedef std::vector<ExternalTableData> ExternalTablesData;
2012-05-16 18:03:00 +00:00
/** Соединение с сервером БД для использования в клиенте.
* Как использовать - см. Core/Protocol.h
* (Реализацию на стороне сервера - см. Server/TCPHandler.h)
2012-05-30 06:46:57 +00:00
*
* В качестве default_database может быть указана пустая строка
* - в этом случае сервер использует свою БД по-умолчанию.
2012-05-16 18:03:00 +00:00
*/
2012-10-22 19:03:01 +00:00
class Connection : private boost::noncopyable
2012-05-16 18:03:00 +00:00
{
public:
2012-05-30 06:46:57 +00:00
Connection(const String & host_, UInt16 port_, const String & default_database_,
2013-08-10 09:04:45 +00:00
const String & user_, const String & password_,
2012-08-02 17:33:31 +00:00
const DataTypeFactory & data_type_factory_,
2012-05-23 19:51:30 +00:00
const String & client_name_ = "client",
2012-07-26 19:42:20 +00:00
Protocol::Compression::Enum compression_ = Protocol::Compression::Enable,
Poco::Timespan connect_timeout_ = Poco::Timespan(DBMS_DEFAULT_CONNECT_TIMEOUT_SEC, 0),
Poco::Timespan receive_timeout_ = Poco::Timespan(DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC, 0),
Poco::Timespan send_timeout_ = Poco::Timespan(DBMS_DEFAULT_SEND_TIMEOUT_SEC, 0))
2013-08-10 09:04:45 +00:00
:
host(host_), port(port_), default_database(default_database_),
user(user_), password(password_),
client_name(client_name_), connected(false),
2012-05-16 18:20:45 +00:00
server_version_major(0), server_version_minor(0), server_revision(0),
query_id(""), compression(compression_), data_type_factory(data_type_factory_),
2012-10-16 19:20:58 +00:00
connect_timeout(connect_timeout_), receive_timeout(receive_timeout_), send_timeout(send_timeout_),
log_wrapper(host, port)
2012-05-16 18:03:00 +00:00
{
2012-05-21 20:38:34 +00:00
/// Соединеняемся не сразу, а при первой необходимости.
2013-08-10 09:04:45 +00:00
if (user.empty())
user = "default";
2012-05-16 18:03:00 +00:00
}
virtual ~Connection() {};
/// Пакет, который может быть получен от сервера.
struct Packet
{
UInt64 type;
Block block;
SharedPtr<Exception> exception;
Progress progress;
2013-05-22 14:57:43 +00:00
BlockStreamProfileInfo profile_info;
2012-05-16 18:03:00 +00:00
Packet() : type(Protocol::Server::Hello) {}
};
/// Изменить базу данных по умолчанию. Изменения начинают использоваться только при следующем переподключении.
void setDefaultDatabase(const String & database);
2012-05-16 18:20:45 +00:00
void getServerVersion(String & name, UInt64 & version_major, UInt64 & version_minor, UInt64 & revision);
2012-05-16 18:03:00 +00:00
/// Адрес сервера - для сообщений в логе и в эксепшенах.
String getServerAddress() const;
/// Если последний флаг true, то затем необходимо вызвать sendExternalTablesData
void sendQuery(const String & query, const String & query_id_ = "", UInt64 stage = QueryProcessingStage::Complete,
2014-04-08 07:31:51 +00:00
const Settings * settings = nullptr, bool with_pending_data = false);
2012-05-16 18:03:00 +00:00
void sendCancel();
/// Отправить блок данных, на сервере сохранить во временную таблицу name
void sendData(const Block & block, const String & name = "");
/// Отправить все содержимое внешних таблиц
void sendExternalTablesData(ExternalTablesData & data);
2012-05-16 18:03:00 +00:00
/// Отправить блок данных, который уже был заранее сериализован (и, если надо, сжат), который следует прочитать из input-а.
/// можно передать размер сериализованного/сжатого блока.
void sendPreparedData(ReadBuffer & input, size_t size, const String & name = "");
/// Проверить, есть ли данные, которые можно прочитать.
2012-05-16 18:03:00 +00:00
bool poll(size_t timeout_microseconds = 0);
/// Получить пакет от сервера.
Packet receivePacket();
/// Если ещё не соединено, или соединение разорвано - соединиться. Если не получилось - кинуть исключение.
void forceConnected();
/** Разорвать соединение.
* Это может быть нужно, например, чтобы соединение не осталось висеть в
* рассинхронизированном состоянии (когда кто-то чего-то продолжает ждать) после эксепшена.
*/
void disconnect();
const std::string & getHost() const
{
return host;
}
UInt16 getPort() const
{
return port;
}
size_t outBytesCount() const { return !out.isNull() ? out->count() : 0; }
size_t inBytesCount() const { return !in.isNull() ? in->count() : 0; }
2012-05-16 18:03:00 +00:00
private:
String host;
UInt16 port;
2012-05-30 06:46:57 +00:00
String default_database;
2013-08-10 09:04:45 +00:00
String user;
String password;
2012-05-16 18:20:45 +00:00
2012-05-23 19:51:30 +00:00
String client_name;
2012-05-21 20:38:34 +00:00
bool connected;
2012-05-16 18:20:45 +00:00
String server_name;
UInt64 server_version_major;
UInt64 server_version_minor;
UInt64 server_revision;
2012-05-16 18:03:00 +00:00
Poco::Net::StreamSocket socket;
SharedPtr<ReadBuffer> in;
SharedPtr<WriteBuffer> out;
2012-05-16 18:03:00 +00:00
String query_id;
2012-05-16 18:03:00 +00:00
UInt64 compression; /// Сжимать ли данные при взаимодействии с сервером.
2012-08-02 17:33:31 +00:00
const DataTypeFactory & data_type_factory;
2012-05-16 18:03:00 +00:00
2012-07-26 19:42:20 +00:00
Poco::Timespan connect_timeout;
Poco::Timespan receive_timeout;
Poco::Timespan send_timeout;
2012-05-16 18:03:00 +00:00
/// Откуда читать результат выполнения запроса.
SharedPtr<ReadBuffer> maybe_compressed_in;
BlockInputStreamPtr block_in;
/// Куда писать данные INSERT-а.
SharedPtr<WriteBuffer> maybe_compressed_out;
BlockOutputStreamPtr block_out;
/// логгер, создаваемый лениво, чтобы не обращаться к DNS в конструкторе
class LoggerWrapper
{
public:
LoggerWrapper(std::string & host_, size_t port_) : log(nullptr), host(host_), port(port_)
{
}
Logger * get()
{
if (!log)
log = &Logger::get("Connection (" + Poco::Net::SocketAddress(host, port).toString() + ")");
return log;
}
private:
std::atomic<Logger *> log;
std::string host;
size_t port;
};
LoggerWrapper log_wrapper;
2012-10-16 19:20:58 +00:00
2012-05-16 18:03:00 +00:00
void connect();
void sendHello();
void receiveHello();
bool ping();
Block receiveData();
SharedPtr<Exception> receiveException();
Progress receiveProgress();
2013-05-22 14:57:43 +00:00
BlockStreamProfileInfo receiveProfileInfo();
void initBlockInput();
2012-05-16 18:03:00 +00:00
};
2012-05-21 20:38:34 +00:00
typedef SharedPtr<Connection> ConnectionPtr;
typedef std::vector<ConnectionPtr> Connections;
2012-05-16 18:03:00 +00:00
}