ClickHouse/dbms/src/Client/Connection.cpp

535 lines
14 KiB
C++
Raw Normal View History

#include <iomanip>
2012-05-16 18:03:00 +00:00
#include <Poco/Net/NetException.h>
#include <Yandex/Revision.h>
#include <DB/Core/Defines.h>
#include <DB/Core/Exception.h>
#include <DB/Core/ErrorCodes.h>
2012-05-16 18:03:00 +00:00
#include <DB/IO/CompressedReadBuffer.h>
#include <DB/IO/CompressedWriteBuffer.h>
#include <DB/IO/ReadBufferFromPocoSocket.h>
#include <DB/IO/WriteBufferFromPocoSocket.h>
2012-05-16 18:03:00 +00:00
#include <DB/IO/ReadHelpers.h>
#include <DB/IO/WriteHelpers.h>
#include <DB/IO/copyData.h>
2012-05-16 18:03:00 +00:00
#include <DB/DataStreams/NativeBlockInputStream.h>
#include <DB/DataStreams/NativeBlockOutputStream.h>
#include <DB/Client/Connection.h>
#include <statdaemons/NetException.h>
2012-05-16 18:03:00 +00:00
namespace DB
{
void Connection::connect()
{
2012-10-15 19:38:33 +00:00
try
{
2012-10-20 06:40:55 +00:00
if (connected)
disconnect();
LOG_TRACE(log_wrapper.get(), "Connecting to " << default_database << "@" << host << ":" << port);
2012-10-18 23:38:16 +00:00
2012-10-15 19:38:33 +00:00
socket.connect(Poco::Net::SocketAddress(host, port), connect_timeout);
socket.setReceiveTimeout(receive_timeout);
socket.setSendTimeout(send_timeout);
socket.setNoDelay(true);
2012-07-26 19:42:20 +00:00
in = new ReadBufferFromPocoSocket(socket);
out = new WriteBufferFromPocoSocket(socket);
2012-10-15 19:38:33 +00:00
connected = true;
2012-05-16 18:03:00 +00:00
2012-10-15 19:38:33 +00:00
sendHello();
receiveHello();
2012-10-16 19:20:58 +00:00
LOG_TRACE(log_wrapper.get(), "Connected to " << server_name
<< " server version " << server_version_major
<< "." << server_version_minor
<< "." << server_revision
<< ".");
2012-10-15 19:38:33 +00:00
}
catch (Poco::Net::NetException & e)
{
2012-10-20 22:49:30 +00:00
disconnect();
/// Добавляем в сообщение адрес сервера. Также объект Exception запомнит stack trace. Жаль, что более точный тип исключения теряется.
throw NetException(e.displayText(), "(" + getServerAddress() + ")", ErrorCodes::NETWORK_ERROR);
2012-10-15 19:38:33 +00:00
}
catch (Poco::TimeoutException & e)
{
disconnect();
/// Добавляем в сообщение адрес сервера. Также объект Exception запомнит stack trace. Жаль, что более точный тип исключения теряется.
throw NetException(e.displayText(), "(" + getServerAddress() + ")", ErrorCodes::SOCKET_TIMEOUT);
}
2012-05-16 18:03:00 +00:00
}
2012-10-18 23:38:16 +00:00
void Connection::disconnect()
{
//LOG_TRACE(log_wrapper.get(), "Disconnecting (" << getServerAddress() << ")");
2012-10-18 23:38:16 +00:00
socket.close();
2014-04-08 07:31:51 +00:00
in = nullptr;
out = nullptr;
2012-10-18 23:38:16 +00:00
connected = false;
}
2012-05-16 18:03:00 +00:00
void Connection::sendHello()
{
//LOG_TRACE(log_wrapper.get(), "Sending hello (" << getServerAddress() << ")");
2013-08-10 09:04:45 +00:00
2012-05-21 06:49:05 +00:00
writeVarUInt(Protocol::Client::Hello, *out);
2012-05-23 19:51:30 +00:00
writeStringBinary((DBMS_NAME " ") + client_name, *out);
2012-05-21 06:49:05 +00:00
writeVarUInt(DBMS_VERSION_MAJOR, *out);
writeVarUInt(DBMS_VERSION_MINOR, *out);
writeVarUInt(Revision::get(), *out);
2012-05-30 06:46:57 +00:00
writeStringBinary(default_database, *out);
2013-08-10 09:04:45 +00:00
writeStringBinary(user, *out);
writeStringBinary(password, *out);
2012-05-21 06:49:05 +00:00
out->next();
2012-05-16 18:03:00 +00:00
}
void Connection::receiveHello()
{
//LOG_TRACE(log_wrapper.get(), "Receiving hello (" << getServerAddress() << ")");
2012-05-16 18:03:00 +00:00
/// Получить hello пакет.
UInt64 packet_type = 0;
2012-05-21 06:49:05 +00:00
readVarUInt(packet_type, *in);
2012-05-30 06:46:57 +00:00
if (packet_type == Protocol::Server::Hello)
{
readStringBinary(server_name, *in);
readVarUInt(server_version_major, *in);
readVarUInt(server_version_minor, *in);
readVarUInt(server_revision, *in);
2013-08-10 09:04:45 +00:00
/// Старые ревизии сервера не поддерживают имя пользователя и пароль, которые были отправлены в пакете hello.
if (server_revision < DBMS_MIN_REVISION_WITH_USER_PASSWORD)
throw Exception("Server revision is too old for this client. You must update server to at least " + toString(DBMS_MIN_REVISION_WITH_USER_PASSWORD) + ".",
ErrorCodes::SERVER_REVISION_IS_TOO_OLD);
2012-05-30 06:46:57 +00:00
}
else if (packet_type == Protocol::Server::Exception)
receiveException()->rethrow();
else
{
/// Закроем соединение, чтобы не было рассинхронизации.
2012-10-18 23:38:16 +00:00
disconnect();
throw NetException("Unexpected packet from server " + getServerAddress() + " (expected Hello or Exception, got "
+ String(Protocol::Server::toString(packet_type)) + ")", ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER);
}
2012-05-16 18:03:00 +00:00
}
void Connection::setDefaultDatabase(const String & database)
{
default_database = database;
}
2012-05-16 18:20:45 +00:00
void Connection::getServerVersion(String & name, UInt64 & version_major, UInt64 & version_minor, UInt64 & revision)
{
2012-05-21 20:38:34 +00:00
if (!connected)
connect();
2012-05-16 18:20:45 +00:00
name = server_name;
version_major = server_version_major;
version_minor = server_version_minor;
revision = server_revision;
}
2012-05-16 18:03:00 +00:00
void Connection::forceConnected()
{
2012-05-21 20:38:34 +00:00
if (!connected)
2012-10-18 23:38:16 +00:00
{
2012-05-21 20:38:34 +00:00
connect();
2012-10-18 23:38:16 +00:00
}
else if (!ping())
2012-05-16 18:03:00 +00:00
{
LOG_TRACE(log_wrapper.get(), "Connection was closed, will reconnect.");
2012-10-18 23:38:16 +00:00
connect();
2012-05-16 18:03:00 +00:00
}
}
struct PingTimeoutSetter
{
PingTimeoutSetter(Poco::Net::StreamSocket & socket_, const Poco::Timespan & ping_timeout_)
: socket(socket_), ping_timeout(ping_timeout_)
{
old_send_timeout = socket.getSendTimeout();
old_receive_timeout = socket.getReceiveTimeout();
if (old_send_timeout > ping_timeout)
socket.setSendTimeout(ping_timeout);
if (old_receive_timeout > ping_timeout)
socket.setReceiveTimeout(ping_timeout);
}
~PingTimeoutSetter()
{
socket.setSendTimeout(old_send_timeout);
socket.setReceiveTimeout(old_receive_timeout);
}
Poco::Net::StreamSocket & socket;
Poco::Timespan ping_timeout;
Poco::Timespan old_send_timeout;
Poco::Timespan old_receive_timeout;
};
2012-05-16 18:03:00 +00:00
bool Connection::ping()
{
// LOG_TRACE(log_wrapper.get(), "Ping (" << getServerAddress() << ")");
PingTimeoutSetter timeout_setter(socket, ping_timeout);
2012-10-18 23:38:16 +00:00
try
2012-06-01 10:45:29 +00:00
{
2012-10-18 23:38:16 +00:00
UInt64 pong = 0;
writeVarUInt(Protocol::Client::Ping, *out);
out->next();
2012-06-24 23:17:06 +00:00
if (in->eof())
return false;
2012-06-01 10:45:29 +00:00
readVarUInt(pong, *in);
2012-10-18 23:38:16 +00:00
/// Можем получить запоздалые пакеты прогресса. TODO: может быть, это можно исправить.
while (pong == Protocol::Server::Progress)
{
receiveProgress();
if (in->eof())
return false;
readVarUInt(pong, *in);
}
if (pong != Protocol::Server::Pong)
{
throw Exception("Unexpected packet from server " + getServerAddress() + " (expected Pong, got "
+ String(Protocol::Server::toString(pong)) + ")",
2012-10-18 23:38:16 +00:00
ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER);
}
}
catch (const Poco::Exception & e)
{
LOG_TRACE(log_wrapper.get(), e.displayText());
2012-10-18 23:38:16 +00:00
return false;
}
2012-05-16 18:03:00 +00:00
return true;
}
void Connection::sendQuery(const String & query, const String & query_id_, UInt64 stage, const Settings * settings, bool with_pending_data)
2012-05-16 18:03:00 +00:00
{
forceConnected();
query_id = query_id_;
2012-05-23 19:51:30 +00:00
//LOG_TRACE(log_wrapper.get(), "Sending query (" << getServerAddress() << ")");
2012-05-21 06:49:05 +00:00
writeVarUInt(Protocol::Client::Query, *out);
if (server_revision >= DBMS_MIN_REVISION_WITH_STRING_QUERY_ID)
writeStringBinary(query_id, *out);
else
writeIntBinary<UInt64>(1, *out);
/// Настройки на отдельный запрос.
if (server_revision >= DBMS_MIN_REVISION_WITH_PER_QUERY_SETTINGS)
{
if (settings)
settings->serialize(*out);
else
writeStringBinary("", *out);
}
2012-05-21 06:49:05 +00:00
writeVarUInt(stage, *out);
writeVarUInt(compression, *out);
2012-05-16 18:03:00 +00:00
2012-05-21 06:49:05 +00:00
writeStringBinary(query, *out);
2012-05-16 18:03:00 +00:00
2014-04-08 07:31:51 +00:00
maybe_compressed_in = nullptr;
maybe_compressed_out = nullptr;
block_in = nullptr;
block_out = nullptr;
/// Если версия сервера достаточно новая и стоит флаг, отправляем пустой блок, символизируя конец передачи данных.
if (server_revision >= DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES && !with_pending_data)
{
sendData(Block());
out->next();
}
2012-05-16 18:03:00 +00:00
}
void Connection::sendCancel()
{
//LOG_TRACE(log_wrapper.get(), "Sending cancel (" << getServerAddress() << ")");
2012-05-21 06:49:05 +00:00
writeVarUInt(Protocol::Client::Cancel, *out);
out->next();
2012-05-16 18:03:00 +00:00
}
void Connection::sendData(const Block & block, const String & name)
2012-05-16 18:03:00 +00:00
{
//LOG_TRACE(log_wrapper.get(), "Sending data (" << getServerAddress() << ")");
2012-05-16 18:03:00 +00:00
if (!block_out)
{
2012-05-21 06:49:05 +00:00
if (compression == Protocol::Compression::Enable)
maybe_compressed_out = new CompressedWriteBuffer(*out);
else
maybe_compressed_out = out;
2012-05-16 18:03:00 +00:00
block_out = new NativeBlockOutputStream(*maybe_compressed_out, server_revision);
2012-05-16 18:03:00 +00:00
}
2012-05-21 06:49:05 +00:00
writeVarUInt(Protocol::Client::Data, *out);
if (server_revision >= DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES)
writeStringBinary(name, *out);
size_t prev_bytes = out->count();
block.checkNestedArraysOffsets();
2012-05-16 18:03:00 +00:00
block_out->write(block);
2012-05-28 19:57:44 +00:00
maybe_compressed_out->next();
2012-05-21 06:49:05 +00:00
out->next();
if (throttler)
throttler->add(out->count() - prev_bytes);
2012-05-16 18:03:00 +00:00
}
void Connection::sendPreparedData(ReadBuffer & input, size_t size, const String & name)
{
/// NOTE В этом методе не используется throttler (хотя можно использовать, но это пока не важно).
writeVarUInt(Protocol::Client::Data, *out);
if (server_revision >= DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES)
writeStringBinary(name, *out);
if (0 == size)
copyData(input, *out);
else
copyData(input, *out, size);
out->next();
}
void Connection::sendExternalTablesData(ExternalTablesData & data)
{
/// Если работаем со старым сервером, то никакой информации не отправляем
if (server_revision < DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES)
{
out->next();
return;
}
if (data.empty())
{
/// Отправляем пустой блок, символизируя конец передачи данных
sendData(Block());
return;
}
Stopwatch watch;
size_t out_bytes = out ? out->count() : 0;
size_t maybe_compressed_out_bytes = maybe_compressed_out ? maybe_compressed_out->count() : 0;
size_t rows = 0;
for (auto & elem : data)
{
elem.first->readPrefix();
while (Block block = elem.first->read())
{
rows += block.rowsInFirstColumn();
sendData(block, elem.second);
}
elem.first->readSuffix();
}
/// Отправляем пустой блок, символизируя конец передачи данных
sendData(Block());
out_bytes = out->count() - out_bytes;
maybe_compressed_out_bytes = maybe_compressed_out->count() - maybe_compressed_out_bytes;
double elapsed = watch.elapsedSeconds();
std::stringstream msg;
msg << std::fixed << std::setprecision(3);
msg << "Sent data for " << data.size() << " external tables, total " << rows << " rows in " << elapsed << " sec., "
<< static_cast<size_t>(rows / watch.elapsedSeconds()) << " rows/sec., "
<< maybe_compressed_out_bytes / 1048576.0 << " MiB (" << maybe_compressed_out_bytes / 1048576.0 / watch.elapsedSeconds() << " MiB/sec.)";
if (compression == Protocol::Compression::Enable)
msg << ", compressed " << static_cast<double>(maybe_compressed_out_bytes) / out_bytes << " times to "
<< out_bytes / 1048576.0 << " MiB (" << out_bytes / 1048576.0 / watch.elapsedSeconds() << " MiB/sec.)";
else
msg << ", no compression.";
LOG_DEBUG(log_wrapper.get(), msg.rdbuf());
}
2012-05-16 18:03:00 +00:00
bool Connection::poll(size_t timeout_microseconds)
{
return static_cast<ReadBufferFromPocoSocket &>(*in).poll(timeout_microseconds);
2012-05-16 18:03:00 +00:00
}
bool Connection::hasReadBufferPendingData() const
{
return static_cast<const ReadBufferFromPocoSocket &>(*in).hasPendingData();
}
2012-05-16 18:03:00 +00:00
Connection::Packet Connection::receivePacket()
{
//LOG_TRACE(log_wrapper.get(), "Receiving packet (" << getServerAddress() << ")");
2012-05-16 18:03:00 +00:00
try
2012-05-16 18:03:00 +00:00
{
Packet res;
readVarUInt(res.type, *in);
switch (res.type)
{
case Protocol::Server::Data:
res.block = receiveData();
return res;
case Protocol::Server::Exception:
res.exception = receiveException();
return res;
case Protocol::Server::Progress:
res.progress = receiveProgress();
return res;
case Protocol::Server::ProfileInfo:
res.profile_info = receiveProfileInfo();
return res;
case Protocol::Server::Totals:
/// Блок с тотальными значениями передаётся так же, как обычный блок данных. Разница только в идентификаторе пакета.
res.block = receiveData();
return res;
case Protocol::Server::Extremes:
/// Аналогично.
res.block = receiveData();
return res;
case Protocol::Server::EndOfStream:
return res;
default:
/// Закроем соединение, чтобы не было рассинхронизации.
disconnect();
throw Exception("Unknown packet "
+ toString(res.type)
+ " from server " + getServerAddress(), ErrorCodes::UNKNOWN_PACKET_FROM_SERVER);
}
}
catch (Exception & e)
{
/// Дописываем в текст исключения адрес сервера, если надо.
if (e.code() != ErrorCodes::UNKNOWN_PACKET_FROM_SERVER)
e.addMessage("while receiving packet from " + getServerAddress());
throw;
2012-05-16 18:03:00 +00:00
}
}
Block Connection::receiveData()
{
//LOG_TRACE(log_wrapper.get(), "Receiving data (" << getServerAddress() << ")");
initBlockInput();
String external_table_name;
if (server_revision >= DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES)
readStringBinary(external_table_name, *in);
size_t prev_bytes = in->count();
/// Прочитать из сети один блок
Block res = block_in->read();
if (throttler)
throttler->add(in->count() - prev_bytes);
return res;
}
void Connection::initBlockInput()
{
2012-05-16 18:03:00 +00:00
if (!block_in)
{
2012-05-21 06:49:05 +00:00
if (compression == Protocol::Compression::Enable)
maybe_compressed_in = new CompressedReadBuffer(*in);
else
maybe_compressed_in = in;
block_in = new NativeBlockInputStream(*maybe_compressed_in, data_type_factory, server_revision);
2012-05-16 18:03:00 +00:00
}
}
String Connection::getServerAddress() const
{
return Poco::Net::SocketAddress(host, port).toString();
}
2012-05-16 18:03:00 +00:00
SharedPtr<Exception> Connection::receiveException()
{
//LOG_TRACE(log_wrapper.get(), "Receiving exception (" << getServerAddress() << ")");
2012-05-16 18:03:00 +00:00
Exception e;
readException(e, *in, "Received from " + getServerAddress());
2012-05-16 18:03:00 +00:00
return e.clone();
}
Progress Connection::receiveProgress()
{
//LOG_TRACE(log_wrapper.get(), "Receiving progress (" << getServerAddress() << ")");
2012-05-16 18:03:00 +00:00
Progress progress;
progress.read(*in, server_revision);
2012-05-16 18:03:00 +00:00
return progress;
}
2013-05-22 14:57:43 +00:00
BlockStreamProfileInfo Connection::receiveProfileInfo()
{
BlockStreamProfileInfo profile_info;
profile_info.read(*in);
return profile_info;
}
2012-05-16 18:03:00 +00:00
}