ClickHouse/dbms/src/Server/TCPHandler.h

139 lines
3.7 KiB
C
Raw Normal View History

2012-03-09 15:46:52 +00:00
#pragma once
2012-03-11 08:52:56 +00:00
#include <DB/Core/Protocol.h>
2012-05-09 13:12:38 +00:00
#include <DB/Core/QueryProcessingStage.h>
2012-03-11 08:52:56 +00:00
2012-03-19 12:57:56 +00:00
#include <DB/IO/ReadHelpers.h>
#include <DB/IO/WriteHelpers.h>
2012-05-09 08:16:09 +00:00
#include <DB/IO/ReadBufferFromPocoSocket.h>
2012-05-09 15:15:45 +00:00
#include <DB/IO/WriteBufferFromPocoSocket.h>
2012-03-19 12:57:56 +00:00
2012-03-11 08:52:56 +00:00
#include <DB/DataStreams/BlockIO.h>
2012-05-09 15:15:45 +00:00
#include <statdaemons/Stopwatch.h>
2012-03-09 15:46:52 +00:00
#include "Server.h"
namespace DB
{
2012-03-11 08:52:56 +00:00
/// Состояние обработки запроса.
struct QueryState
{
/// Идентификатор запроса.
String query_id;
2012-03-11 08:52:56 +00:00
2012-05-09 13:12:38 +00:00
QueryProcessingStage::Enum stage;
2012-03-11 08:52:56 +00:00
Protocol::Compression::Enum compression;
2012-03-19 12:57:56 +00:00
/// Откуда читать данные для INSERT-а.
SharedPtr<ReadBuffer> maybe_compressed_in;
BlockInputStreamPtr block_in;
/// Куда писать возвращаемые данные.
SharedPtr<WriteBuffer> maybe_compressed_out;
BlockOutputStreamPtr block_out;
2012-03-11 08:52:56 +00:00
/// Текст запроса.
String query;
/// Потоки блоков, с помощью которых выполнять запрос.
BlockIO io;
2012-05-09 15:50:42 +00:00
bool is_cancelled;
2012-05-09 15:15:45 +00:00
/// Данные были отправлены.
bool sent_all_data;
2012-05-09 16:34:41 +00:00
/// Для вывода прогресса - разница после предыдущей отправки прогресса.
volatile size_t rows_processed;
volatile size_t bytes_processed;
2012-05-09 15:50:42 +00:00
QueryState() : query_id(""), stage(QueryProcessingStage::Complete), compression(Protocol::Compression::Disable),
2012-05-09 15:50:42 +00:00
is_cancelled(false), sent_all_data(false), rows_processed(0), bytes_processed(0) {}
2012-03-11 08:52:56 +00:00
void reset()
{
2012-05-09 08:16:09 +00:00
*this = QueryState();
2012-03-11 08:52:56 +00:00
}
bool empty()
{
return query_id == "";
2012-03-11 08:52:56 +00:00
}
};
2012-03-09 15:46:52 +00:00
class TCPHandler : public Poco::Net::TCPServerConnection
{
public:
TCPHandler(Server & server_, const Poco::Net::StreamSocket & socket_)
: Poco::Net::TCPServerConnection(socket_), server(server_),
log(&Logger::get("TCPHandler")), client_revision(0),
2013-09-14 05:14:22 +00:00
connection_context(*server.global_context), query_context(connection_context)
2012-03-09 15:46:52 +00:00
{
}
void run();
private:
Server & server;
Logger * log;
UInt64 client_revision;
2012-05-17 19:15:53 +00:00
Context connection_context;
Context query_context;
2012-05-17 19:15:53 +00:00
2012-05-21 06:49:05 +00:00
SharedPtr<ReadBufferFromPocoSocket> in;
SharedPtr<WriteBufferFromPocoSocket> out;
2012-05-09 15:15:45 +00:00
/// Время после последней проверки остановки запроса и отправки прогресса.
Stopwatch after_check_cancelled;
Stopwatch after_send_progress;
2012-05-30 06:46:57 +00:00
String default_database;
2012-12-06 17:32:48 +00:00
/// На данный момент, поддерживается одновременное выполнение только одного запроса в соединении.
QueryState state;
2012-05-09 08:16:09 +00:00
2012-03-09 15:46:52 +00:00
void runImpl();
2012-03-11 08:52:56 +00:00
2012-05-21 06:49:05 +00:00
void receiveHello();
bool receivePacket();
void receiveQuery();
bool receiveData();
2012-05-17 19:15:53 +00:00
2012-05-21 06:49:05 +00:00
/// Обработать запрос INSERT
void processInsertQuery(const Settings & global_settings);
2012-05-17 19:15:53 +00:00
2012-05-21 06:49:05 +00:00
/// Обработать запрос, который не требует приёма блоков данных от клиента
void processOrdinaryQuery();
2012-03-11 08:52:56 +00:00
2012-05-21 06:49:05 +00:00
void sendHello();
void sendData(Block & block); /// Записать в сеть блок.
2012-05-30 06:46:57 +00:00
void sendException(const Exception & e);
void sendProgress();
2012-05-21 06:49:05 +00:00
void sendEndOfStream();
2013-05-22 14:57:43 +00:00
void sendProfileInfo();
void sendTotals();
void sendExtremes();
/// Создаёт state.block_in/block_out для чтения/записи блоков, в зависимости от того, включено ли сжатие.
void initBlockInput();
void initBlockOutput();
2012-05-09 08:16:09 +00:00
2012-05-21 06:49:05 +00:00
bool isQueryCancelled();
2012-07-21 07:02:55 +00:00
/// Эта функция вызывается из разных потоков.
void updateProgress(size_t rows, size_t bytes);
2012-07-21 07:02:55 +00:00
/// Вывести информацию о скорости выполнения SELECT запроса.
void logProfileInfo(Stopwatch & watch, IBlockInputStream & in);
2012-03-09 15:46:52 +00:00
};
2012-03-11 08:52:56 +00:00
2012-03-09 15:46:52 +00:00
}