2012-03-19 12:57:56 +00:00
|
|
|
|
#include <iomanip>
|
|
|
|
|
|
2012-05-09 08:16:09 +00:00
|
|
|
|
#include <boost/bind.hpp>
|
|
|
|
|
|
2012-03-09 15:46:52 +00:00
|
|
|
|
#include <Yandex/Revision.h>
|
|
|
|
|
|
|
|
|
|
#include <statdaemons/Stopwatch.h>
|
|
|
|
|
|
|
|
|
|
#include <DB/Core/ErrorCodes.h>
|
2012-05-16 18:32:32 +00:00
|
|
|
|
#include <DB/Core/Progress.h>
|
2012-03-09 15:46:52 +00:00
|
|
|
|
|
2012-03-11 08:52:56 +00:00
|
|
|
|
#include <DB/IO/CompressedReadBuffer.h>
|
|
|
|
|
#include <DB/IO/CompressedWriteBuffer.h>
|
2012-03-09 15:46:52 +00:00
|
|
|
|
#include <DB/IO/copyData.h>
|
|
|
|
|
|
2012-05-09 08:16:09 +00:00
|
|
|
|
#include <DB/DataStreams/IProfilingBlockInputStream.h>
|
2012-03-09 15:46:52 +00:00
|
|
|
|
#include <DB/Interpreters/executeQuery.h>
|
|
|
|
|
|
|
|
|
|
#include "TCPHandler.h"
|
2012-05-30 01:38:02 +00:00
|
|
|
|
#include <Poco/Ext/ThreadNumber.h>
|
2012-03-09 15:46:52 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
|
{
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
void TCPHandler::runImpl()
|
|
|
|
|
{
|
2012-05-17 19:15:53 +00:00
|
|
|
|
connection_context = server.global_context;
|
2012-07-12 19:49:22 +00:00
|
|
|
|
connection_context.session_context = &connection_context;
|
2012-07-26 20:16:57 +00:00
|
|
|
|
|
|
|
|
|
socket().setReceiveTimeout(server.global_context.settings.receive_timeout);
|
|
|
|
|
socket().setSendTimeout(server.global_context.settings.send_timeout);
|
2012-05-17 19:15:53 +00:00
|
|
|
|
|
2012-05-21 06:49:05 +00:00
|
|
|
|
in = new ReadBufferFromPocoSocket(socket());
|
|
|
|
|
out = new WriteBufferFromPocoSocket(socket());
|
2012-03-09 15:46:52 +00:00
|
|
|
|
|
2012-05-21 06:49:05 +00:00
|
|
|
|
receiveHello();
|
2012-05-30 06:46:57 +00:00
|
|
|
|
|
|
|
|
|
/// При соединении может быть указана БД по-умолчанию.
|
|
|
|
|
if (!default_database.empty())
|
|
|
|
|
{
|
|
|
|
|
Poco::ScopedLock<Poco::Mutex> lock(*connection_context.mutex);
|
|
|
|
|
|
|
|
|
|
if (connection_context.databases->end() == connection_context.databases->find(default_database))
|
|
|
|
|
{
|
|
|
|
|
Exception e("Database " + default_database + " doesn't exist", ErrorCodes::UNKNOWN_DATABASE);
|
|
|
|
|
LOG_ERROR(log, "DB::Exception. Code: " << e.code() << ", e.displayText() = " << e.displayText()
|
|
|
|
|
<< ", Stack trace:\n\n" << e.getStackTrace().toString());
|
|
|
|
|
sendException(e);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
connection_context.current_database = default_database;
|
|
|
|
|
}
|
|
|
|
|
|
2012-05-21 06:49:05 +00:00
|
|
|
|
sendHello();
|
2012-03-09 15:46:52 +00:00
|
|
|
|
|
2012-05-21 06:49:05 +00:00
|
|
|
|
while (!in->eof())
|
2012-03-09 15:46:52 +00:00
|
|
|
|
{
|
2012-03-19 12:57:56 +00:00
|
|
|
|
Stopwatch watch;
|
2012-05-09 08:16:09 +00:00
|
|
|
|
state.reset();
|
2012-05-08 05:42:05 +00:00
|
|
|
|
|
|
|
|
|
try
|
2012-05-09 15:15:45 +00:00
|
|
|
|
{
|
2012-05-21 06:49:05 +00:00
|
|
|
|
/// Пакет Query. (Также, если пришёл пакет Ping - обрабатываем его и продолжаем ждать Query.)
|
|
|
|
|
receivePacket();
|
2012-05-08 05:42:05 +00:00
|
|
|
|
|
2012-05-17 19:15:53 +00:00
|
|
|
|
/// Обрабатываем Query
|
|
|
|
|
|
2012-05-09 15:15:45 +00:00
|
|
|
|
after_check_cancelled.restart();
|
|
|
|
|
after_send_progress.restart();
|
|
|
|
|
|
2012-05-08 05:42:05 +00:00
|
|
|
|
LOG_DEBUG(log, "Query ID: " << state.query_id);
|
|
|
|
|
LOG_DEBUG(log, "Query: " << state.query);
|
2012-05-23 19:51:30 +00:00
|
|
|
|
LOG_DEBUG(log, "Requested stage: " << QueryProcessingStage::toString(state.stage));
|
2012-05-08 05:42:05 +00:00
|
|
|
|
|
2012-05-21 06:49:05 +00:00
|
|
|
|
/// Запрос требует приёма данных от клиента?
|
2012-05-08 05:42:05 +00:00
|
|
|
|
if (state.io.out)
|
2012-05-21 06:49:05 +00:00
|
|
|
|
processInsertQuery();
|
|
|
|
|
else
|
|
|
|
|
processOrdinaryQuery();
|
|
|
|
|
|
|
|
|
|
sendEndOfStream();
|
2012-03-19 12:57:56 +00:00
|
|
|
|
}
|
2012-05-08 05:42:05 +00:00
|
|
|
|
catch (DB::Exception & e)
|
2012-03-19 12:57:56 +00:00
|
|
|
|
{
|
2012-05-08 05:42:05 +00:00
|
|
|
|
LOG_ERROR(log, "DB::Exception. Code: " << e.code() << ", e.displayText() = " << e.displayText()
|
|
|
|
|
<< ", Stack trace:\n\n" << e.getStackTrace().toString());
|
|
|
|
|
state.exception = e.clone();
|
2012-05-28 19:34:55 +00:00
|
|
|
|
|
|
|
|
|
if (e.code() == ErrorCodes::UNKNOWN_PACKET_FROM_CLIENT)
|
|
|
|
|
throw;
|
2012-05-08 05:42:05 +00:00
|
|
|
|
}
|
|
|
|
|
catch (Poco::Exception & e)
|
|
|
|
|
{
|
|
|
|
|
LOG_ERROR(log, "Poco::Exception. Code: " << ErrorCodes::POCO_EXCEPTION << ", e.code() = " << e.code() << ", e.displayText() = " << e.displayText());
|
|
|
|
|
state.exception = new Exception(e.message(), e.code());
|
|
|
|
|
}
|
|
|
|
|
catch (std::exception & e)
|
|
|
|
|
{
|
|
|
|
|
LOG_ERROR(log, "std::exception. Code: " << ErrorCodes::STD_EXCEPTION << ", e.what() = " << e.what());
|
|
|
|
|
state.exception = new Exception(e.what(), ErrorCodes::STD_EXCEPTION);
|
|
|
|
|
}
|
|
|
|
|
catch (...)
|
|
|
|
|
{
|
|
|
|
|
LOG_ERROR(log, "Unknown exception. Code: " << ErrorCodes::UNKNOWN_EXCEPTION);
|
|
|
|
|
state.exception = new Exception("Unknown exception", ErrorCodes::UNKNOWN_EXCEPTION);
|
2012-03-19 12:57:56 +00:00
|
|
|
|
}
|
2012-03-09 15:46:52 +00:00
|
|
|
|
|
2012-05-08 05:42:05 +00:00
|
|
|
|
if (state.exception)
|
2012-05-30 06:46:57 +00:00
|
|
|
|
sendException(*state.exception);
|
2012-05-08 05:42:05 +00:00
|
|
|
|
|
2012-05-31 05:54:15 +00:00
|
|
|
|
state.reset();
|
2012-03-09 15:46:52 +00:00
|
|
|
|
watch.stop();
|
2012-03-19 12:57:56 +00:00
|
|
|
|
|
|
|
|
|
LOG_INFO(log, std::fixed << std::setprecision(3)
|
|
|
|
|
<< "Processed in " << watch.elapsedSeconds() << " sec.");
|
|
|
|
|
}
|
2012-03-11 08:52:56 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
2012-05-21 06:49:05 +00:00
|
|
|
|
void TCPHandler::processInsertQuery()
|
|
|
|
|
{
|
|
|
|
|
/// Отправляем клиенту блок - структура таблицы.
|
|
|
|
|
Block block = state.io.out_sample;
|
|
|
|
|
sendData(block);
|
|
|
|
|
|
|
|
|
|
while (receivePacket())
|
|
|
|
|
;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
void TCPHandler::processOrdinaryQuery()
|
|
|
|
|
{
|
|
|
|
|
/// Вынимаем результат выполнения запроса, если есть, и пишем его в сеть.
|
|
|
|
|
if (state.io.in)
|
|
|
|
|
{
|
|
|
|
|
if (IProfilingBlockInputStream * profiling_in = dynamic_cast<IProfilingBlockInputStream *>(&*state.io.in))
|
|
|
|
|
{
|
|
|
|
|
profiling_in->setIsCancelledCallback(boost::bind(&TCPHandler::isQueryCancelled, this));
|
|
|
|
|
profiling_in->setProgressCallback(boost::bind(&TCPHandler::sendProgress, this, _1, _2));
|
2012-05-30 03:30:29 +00:00
|
|
|
|
|
2012-06-25 03:56:45 +00:00
|
|
|
|
std::stringstream query_pipeline;
|
|
|
|
|
profiling_in->dumpTree(query_pipeline);
|
2012-06-25 05:07:34 +00:00
|
|
|
|
LOG_DEBUG(log, "Query pipeline:\n" << query_pipeline.rdbuf());
|
2012-05-21 06:49:05 +00:00
|
|
|
|
}
|
|
|
|
|
|
2012-07-21 07:02:55 +00:00
|
|
|
|
Stopwatch watch;
|
2012-05-21 06:49:05 +00:00
|
|
|
|
while (true)
|
|
|
|
|
{
|
|
|
|
|
Block block = state.io.in->read();
|
|
|
|
|
sendData(block);
|
|
|
|
|
if (!block)
|
|
|
|
|
break;
|
|
|
|
|
}
|
2012-07-21 07:02:55 +00:00
|
|
|
|
|
|
|
|
|
watch.stop();
|
|
|
|
|
logProfileInfo(watch, *state.io.in);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** Logs how many rows and bytes were read by the leaf streams of `in`, along with the
  * elapsed time of `watch` and the resulting read rates.
  *
  * Only leaves that are IProfilingBlockInputStream contribute to the totals.
  * Nothing is logged when no rows were read.
  */
void TCPHandler::logProfileInfo(Stopwatch & watch, IBlockInputStream & in)
{
    /// Sum up the number of rows and bytes read by the leaf streams.
    BlockInputStreams leaves = in.getLeaves();
    size_t rows = 0;
    size_t bytes = 0;

    for (BlockInputStreams::const_iterator it = leaves.begin(); it != leaves.end(); ++it)
    {
        if (const IProfilingBlockInputStream * profiling = dynamic_cast<const IProfilingBlockInputStream *>(&**it))
        {
            const BlockStreamProfileInfo & info = profiling->getInfo();
            rows += info.rows;
            bytes += info.bytes;
        }
    }

    if (rows == 0)
        return;

    /// Sample the stopwatch once so that every figure in the message refers to the same interval.
    const double elapsed_seconds = watch.elapsedSeconds();
    const double mebibytes = bytes / 1048576.0;

    if (elapsed_seconds > 0)
    {
        LOG_INFO(log, std::fixed << std::setprecision(3)
            << "Read " << rows << " rows, " << mebibytes << " MiB in " << elapsed_seconds << " sec., "
            << static_cast<size_t>(rows / elapsed_seconds) << " rows/sec., " << mebibytes / elapsed_seconds << " MiB/sec.");
    }
    else
    {
        /// Rates are undefined for a zero interval; converting the resulting infinity
        /// to size_t would be undefined behaviour, so the rates are left out.
        LOG_INFO(log, std::fixed << std::setprecision(3)
            << "Read " << rows << " rows, " << mebibytes << " MiB in " << elapsed_seconds << " sec.");
    }
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
void TCPHandler::receiveHello()
|
2012-05-16 18:03:00 +00:00
|
|
|
|
{
|
|
|
|
|
/// Получить hello пакет.
|
|
|
|
|
UInt64 packet_type = 0;
|
|
|
|
|
String client_name;
|
|
|
|
|
UInt64 client_version_major = 0;
|
|
|
|
|
UInt64 client_version_minor = 0;
|
|
|
|
|
UInt64 client_revision = 0;
|
|
|
|
|
|
2012-05-21 06:49:05 +00:00
|
|
|
|
readVarUInt(packet_type, *in);
|
2012-05-16 18:03:00 +00:00
|
|
|
|
if (packet_type != Protocol::Client::Hello)
|
|
|
|
|
throw Exception("Unexpected packet from client", ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
|
|
|
|
|
|
2012-05-21 06:49:05 +00:00
|
|
|
|
readStringBinary(client_name, *in);
|
|
|
|
|
readVarUInt(client_version_major, *in);
|
|
|
|
|
readVarUInt(client_version_minor, *in);
|
|
|
|
|
readVarUInt(client_revision, *in);
|
2012-05-30 06:46:57 +00:00
|
|
|
|
readStringBinary(default_database, *in);
|
2012-05-16 18:03:00 +00:00
|
|
|
|
|
2012-05-16 18:20:45 +00:00
|
|
|
|
LOG_DEBUG(log, "Connected " << client_name
|
|
|
|
|
<< " version " << client_version_major
|
2012-05-16 18:03:00 +00:00
|
|
|
|
<< "." << client_version_minor
|
|
|
|
|
<< "." << client_revision
|
2012-05-30 06:46:57 +00:00
|
|
|
|
<< (!default_database.empty() ? ", database: " + default_database : "")
|
2012-05-16 18:03:00 +00:00
|
|
|
|
<< ".")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
2012-05-21 06:49:05 +00:00
|
|
|
|
void TCPHandler::sendHello()
|
2012-03-11 08:52:56 +00:00
|
|
|
|
{
|
2012-05-21 06:49:05 +00:00
|
|
|
|
writeVarUInt(Protocol::Server::Hello, *out);
|
|
|
|
|
writeStringBinary(DBMS_NAME, *out);
|
|
|
|
|
writeVarUInt(DBMS_VERSION_MAJOR, *out);
|
|
|
|
|
writeVarUInt(DBMS_VERSION_MINOR, *out);
|
|
|
|
|
writeVarUInt(Revision::get(), *out);
|
|
|
|
|
out->next();
|
2012-03-11 08:52:56 +00:00
|
|
|
|
}
|
|
|
|
|
|
2012-05-17 19:15:53 +00:00
|
|
|
|
|
2012-05-21 06:49:05 +00:00
|
|
|
|
bool TCPHandler::receivePacket()
|
2012-03-11 08:52:56 +00:00
|
|
|
|
{
|
2012-05-21 06:49:05 +00:00
|
|
|
|
while (true) /// Если пришёл пакет типа Ping, то обработаем его и получаем следующий пакет.
|
2012-05-09 13:12:38 +00:00
|
|
|
|
{
|
|
|
|
|
UInt64 packet_type = 0;
|
2012-05-21 06:49:05 +00:00
|
|
|
|
readVarUInt(packet_type, *in);
|
2012-03-11 08:52:56 +00:00
|
|
|
|
|
2012-06-25 03:56:45 +00:00
|
|
|
|
// std::cerr << "Packet: " << packet_type << std::endl;
|
2012-03-19 12:57:56 +00:00
|
|
|
|
|
2012-05-09 13:12:38 +00:00
|
|
|
|
switch (packet_type)
|
|
|
|
|
{
|
|
|
|
|
case Protocol::Client::Query:
|
|
|
|
|
if (!state.empty())
|
|
|
|
|
throw Exception("Unexpected packet Query received from client", ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
|
2012-05-21 06:49:05 +00:00
|
|
|
|
receiveQuery();
|
2012-05-09 13:12:38 +00:00
|
|
|
|
return true;
|
|
|
|
|
|
|
|
|
|
case Protocol::Client::Data:
|
|
|
|
|
if (state.empty())
|
|
|
|
|
throw Exception("Unexpected packet Data received from client", ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
|
2012-05-21 06:49:05 +00:00
|
|
|
|
return receiveData();
|
2012-05-09 13:12:38 +00:00
|
|
|
|
|
|
|
|
|
case Protocol::Client::Ping:
|
2012-05-21 06:49:05 +00:00
|
|
|
|
writeVarUInt(Protocol::Server::Pong, *out);
|
|
|
|
|
out->next();
|
2012-05-17 19:15:53 +00:00
|
|
|
|
break;
|
|
|
|
|
|
2012-05-09 13:12:38 +00:00
|
|
|
|
default:
|
|
|
|
|
throw Exception("Unknown packet from client", ErrorCodes::UNKNOWN_PACKET_FROM_CLIENT);
|
|
|
|
|
}
|
2012-03-09 15:46:52 +00:00
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2012-05-17 19:15:53 +00:00
|
|
|
|
|
2012-05-21 06:49:05 +00:00
|
|
|
|
void TCPHandler::receiveQuery()
|
2012-03-11 08:52:56 +00:00
|
|
|
|
{
|
|
|
|
|
UInt64 stage = 0;
|
|
|
|
|
UInt64 compression = 0;
|
|
|
|
|
|
2012-05-21 06:49:05 +00:00
|
|
|
|
readIntBinary(state.query_id, *in);
|
2012-03-11 08:52:56 +00:00
|
|
|
|
|
2012-05-21 06:49:05 +00:00
|
|
|
|
readVarUInt(stage, *in);
|
2012-05-09 13:12:38 +00:00
|
|
|
|
state.stage = QueryProcessingStage::Enum(stage);
|
2012-03-11 08:52:56 +00:00
|
|
|
|
|
2012-05-21 06:49:05 +00:00
|
|
|
|
readVarUInt(compression, *in);
|
2012-03-11 08:52:56 +00:00
|
|
|
|
state.compression = Protocol::Compression::Enum(compression);
|
|
|
|
|
|
2012-05-21 06:49:05 +00:00
|
|
|
|
readStringBinary(state.query, *in);
|
2012-03-19 12:57:56 +00:00
|
|
|
|
|
2012-05-30 06:46:57 +00:00
|
|
|
|
state.context = connection_context;
|
2012-05-30 01:38:02 +00:00
|
|
|
|
state.io = executeQuery(state.query, state.context, state.stage);
|
2012-03-11 08:52:56 +00:00
|
|
|
|
}
|
|
|
|
|
|
2012-05-21 06:49:05 +00:00
|
|
|
|
|
|
|
|
|
bool TCPHandler::receiveData()
|
2012-03-11 08:52:56 +00:00
|
|
|
|
{
|
2012-03-19 12:57:56 +00:00
|
|
|
|
if (!state.block_in)
|
|
|
|
|
{
|
2012-05-21 06:49:05 +00:00
|
|
|
|
if (state.compression == Protocol::Compression::Enable)
|
|
|
|
|
state.maybe_compressed_in = new CompressedReadBuffer(*in);
|
|
|
|
|
else
|
|
|
|
|
state.maybe_compressed_in = in;
|
2012-03-19 12:57:56 +00:00
|
|
|
|
|
|
|
|
|
state.block_in = state.context.format_factory->getInput(
|
2012-05-21 06:49:05 +00:00
|
|
|
|
"Native",
|
2012-03-19 12:57:56 +00:00
|
|
|
|
*state.maybe_compressed_in,
|
|
|
|
|
state.io.out_sample,
|
|
|
|
|
state.context.settings.max_block_size,
|
|
|
|
|
*state.context.data_type_factory);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Прочитать из сети один блок и засунуть его в state.io.out (данные для INSERT-а)
|
|
|
|
|
Block block = state.block_in->read();
|
|
|
|
|
if (block)
|
|
|
|
|
{
|
|
|
|
|
state.io.out->write(block);
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
return false;
|
|
|
|
|
}
|
2012-03-11 08:52:56 +00:00
|
|
|
|
|
2012-05-21 06:49:05 +00:00
|
|
|
|
|
|
|
|
|
bool TCPHandler::isQueryCancelled()
|
2012-05-09 08:16:09 +00:00
|
|
|
|
{
|
2012-05-09 15:15:45 +00:00
|
|
|
|
Poco::ScopedLock<Poco::FastMutex> lock(is_cancelled_mutex);
|
2012-05-09 15:50:42 +00:00
|
|
|
|
|
2012-05-09 16:55:56 +00:00
|
|
|
|
if (state.is_cancelled || state.sent_all_data)
|
2012-05-09 15:50:42 +00:00
|
|
|
|
return true;
|
2012-05-09 15:15:45 +00:00
|
|
|
|
|
|
|
|
|
if (after_check_cancelled.elapsed() / 1000 < state.context.settings.interactive_delay)
|
|
|
|
|
return false;
|
|
|
|
|
|
|
|
|
|
after_check_cancelled.restart();
|
|
|
|
|
|
2012-05-09 08:16:09 +00:00
|
|
|
|
/// Во время выполнения запроса, единственный пакет, который может прийти от клиента - это остановка выполнения запроса.
|
2012-05-21 06:49:05 +00:00
|
|
|
|
if (in->poll(0))
|
2012-05-09 08:16:09 +00:00
|
|
|
|
{
|
2012-06-25 03:56:45 +00:00
|
|
|
|
// std::cerr << "checking cancelled; socket has data" << std::endl;
|
2012-05-09 08:16:09 +00:00
|
|
|
|
|
|
|
|
|
UInt64 packet_type = 0;
|
2012-05-21 06:49:05 +00:00
|
|
|
|
readVarUInt(packet_type, *in);
|
2012-05-09 08:16:09 +00:00
|
|
|
|
|
|
|
|
|
switch (packet_type)
|
|
|
|
|
{
|
|
|
|
|
case Protocol::Client::Cancel:
|
|
|
|
|
if (state.empty())
|
|
|
|
|
throw Exception("Unexpected packet Cancel received from client", ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
|
|
|
|
|
LOG_INFO(log, "Query was cancelled.");
|
2012-05-09 15:50:42 +00:00
|
|
|
|
state.is_cancelled = true;
|
2012-05-09 08:16:09 +00:00
|
|
|
|
return true;
|
|
|
|
|
|
|
|
|
|
default:
|
|
|
|
|
throw Exception("Unknown packet from client", ErrorCodes::UNKNOWN_PACKET_FROM_CLIENT);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
2012-05-21 06:49:05 +00:00
|
|
|
|
|
|
|
|
|
/** Sends a Data packet containing `block` to the client and flushes the output.
  *
  * The block output stream is created on the first call for the current query;
  * it writes the "Native" format, through a compressing buffer if the query enabled compression.
  * Sending is serialized with the other send* methods by send_mutex.
  */
void TCPHandler::sendData(Block & block)
{
    Poco::ScopedLock<Poco::FastMutex> lock(send_mutex);

    if (!state.block_out)
    {
        /// Choose the buffer to write to depending on whether compression was requested.
        if (state.compression == Protocol::Compression::Enable)
            state.maybe_compressed_out = new CompressedWriteBuffer(*out);
        else
            state.maybe_compressed_out = out;

        state.block_out = state.context.format_factory->getOutput(
            "Native",
            *state.maybe_compressed_out,
            state.io.in_sample);
    }

    /// The packet type is written directly to the socket buffer, not through the compressing buffer.
    writeVarUInt(Protocol::Server::Data, *out);

    state.block_out->write(block);

    /// Flush the (possibly compressing) buffer first, then the socket buffer.
    state.maybe_compressed_out->next();
    out->next();
}
|
|
|
|
|
|
2012-05-21 06:49:05 +00:00
|
|
|
|
|
2012-05-30 06:46:57 +00:00
|
|
|
|
void TCPHandler::sendException(const Exception & e)
|
2012-03-19 12:57:56 +00:00
|
|
|
|
{
|
2012-05-09 15:15:45 +00:00
|
|
|
|
Poco::ScopedLock<Poco::FastMutex> lock(send_mutex);
|
|
|
|
|
|
2012-05-21 06:49:05 +00:00
|
|
|
|
writeVarUInt(Protocol::Server::Exception, *out);
|
2012-05-30 06:46:57 +00:00
|
|
|
|
writeException(e, *out);
|
2012-05-21 06:49:05 +00:00
|
|
|
|
out->next();
|
2012-03-19 12:57:56 +00:00
|
|
|
|
}
|
|
|
|
|
|
2012-05-21 06:49:05 +00:00
|
|
|
|
|
|
|
|
|
void TCPHandler::sendEndOfStream()
|
2012-05-08 11:19:00 +00:00
|
|
|
|
{
|
2012-05-09 15:15:45 +00:00
|
|
|
|
Poco::ScopedLock<Poco::FastMutex> lock(send_mutex);
|
|
|
|
|
|
2012-07-15 21:43:04 +00:00
|
|
|
|
state.sent_all_data = true;
|
2012-05-21 06:49:05 +00:00
|
|
|
|
writeVarUInt(Protocol::Server::EndOfStream, *out);
|
|
|
|
|
out->next();
|
2012-05-08 11:19:00 +00:00
|
|
|
|
}
|
|
|
|
|
|
2012-05-21 06:49:05 +00:00
|
|
|
|
|
|
|
|
|
void TCPHandler::sendProgress(size_t rows, size_t bytes)
|
2012-03-19 12:57:56 +00:00
|
|
|
|
{
|
2012-05-09 15:15:45 +00:00
|
|
|
|
Poco::ScopedLock<Poco::FastMutex> lock(send_mutex);
|
|
|
|
|
|
2012-05-09 15:50:42 +00:00
|
|
|
|
state.rows_processed += rows;
|
|
|
|
|
state.bytes_processed += bytes;
|
|
|
|
|
|
2012-05-09 15:15:45 +00:00
|
|
|
|
/// Не будем отправлять прогресс после того, как отправлены все данные.
|
|
|
|
|
if (state.sent_all_data)
|
|
|
|
|
return;
|
|
|
|
|
|
|
|
|
|
if (after_send_progress.elapsed() / 1000 < state.context.settings.interactive_delay)
|
|
|
|
|
return;
|
|
|
|
|
|
|
|
|
|
after_send_progress.restart();
|
|
|
|
|
|
2012-05-21 06:49:05 +00:00
|
|
|
|
writeVarUInt(Protocol::Server::Progress, *out);
|
2012-05-16 18:37:08 +00:00
|
|
|
|
Progress progress(state.rows_processed, state.bytes_processed);
|
2012-05-21 06:49:05 +00:00
|
|
|
|
progress.write(*out);
|
|
|
|
|
out->next();
|
2012-05-09 16:34:41 +00:00
|
|
|
|
|
|
|
|
|
state.rows_processed = 0;
|
|
|
|
|
state.bytes_processed = 0;
|
2012-03-11 08:52:56 +00:00
|
|
|
|
}
|
|
|
|
|
|
2012-03-09 15:46:52 +00:00
|
|
|
|
|
|
|
|
|
void TCPHandler::run()
|
|
|
|
|
{
|
|
|
|
|
try
|
|
|
|
|
{
|
|
|
|
|
runImpl();
|
|
|
|
|
|
|
|
|
|
LOG_INFO(log, "Done processing connection.");
|
|
|
|
|
}
|
|
|
|
|
catch (Poco::Exception & e)
|
|
|
|
|
{
|
|
|
|
|
std::stringstream s;
|
|
|
|
|
s << "Code: " << ErrorCodes::POCO_EXCEPTION << ", e.code() = " << e.code()
|
|
|
|
|
<< ", e.message() = " << e.message() << ", e.what() = " << e.what();
|
|
|
|
|
LOG_ERROR(log, s.str());
|
|
|
|
|
}
|
|
|
|
|
catch (std::exception & e)
|
|
|
|
|
{
|
|
|
|
|
std::stringstream s;
|
|
|
|
|
s << "Code: " << ErrorCodes::STD_EXCEPTION << ". " << e.what();
|
|
|
|
|
LOG_ERROR(log, s.str());
|
|
|
|
|
}
|
|
|
|
|
catch (...)
|
|
|
|
|
{
|
|
|
|
|
std::stringstream s;
|
|
|
|
|
s << "Code: " << ErrorCodes::UNKNOWN_EXCEPTION << ". Unknown exception.";
|
|
|
|
|
LOG_ERROR(log, s.str());
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
}
|