dbms: development [#CONV-2944].

This commit is contained in:
Alexey Milovidov 2012-03-09 15:46:52 +00:00
parent 1039ea55d9
commit d2c7aafe14
11 changed files with 390 additions and 22 deletions

View File

@ -1,6 +1,10 @@
#pragma once
#define DEFAULT_BLOCK_SIZE 1048576
#define DEFAULT_MAX_QUERY_SIZE 1048576
#define SHOW_CHARS_ON_SYNTAX_ERROR 160L
#define DEFAULT_MAX_THREADS 8
#define DBMS_NAME "ClickHouse"
#define DBMS_VERSION_MAJOR 0
#define DBMS_VERSION_MINOR 0
#define DEFAULT_BLOCK_SIZE 1048576
#define DEFAULT_MAX_QUERY_SIZE 1048576
#define SHOW_CHARS_ON_SYNTAX_ERROR 160L
#define DEFAULT_MAX_THREADS 8

View File

@ -101,6 +101,9 @@ namespace ErrorCodes
UNKNOWN_AGGREGATED_DATA_VARIANT,
CANNOT_MERGE_DIFFERENT_AGGREGATED_DATA_VARIANTS,
NO_STREAMS_RETURNED_FROM_TABLE,
CANNOT_READ_FROM_SOCKET,
CANNOT_WRITE_TO_SOCKET,
CANNOT_READ_ALL_QUERY,
POCO_EXCEPTION = 1000,
STD_EXCEPTION,

View File

@ -0,0 +1,44 @@
#pragma once
namespace DB
{
/** Протокол взаимодействия с сервером.
*/
namespace Protocol
{
/// То, что передаёт сервер.
namespace Server
{
enum Enum
{
Hello, /// Имя, версия, ревизия.
ResultBlock, /// Идентификатор запроса, признак последнего блока, блок результата в формате Native со сжатием.
Exception, /// Исключение во время обработки запроса.
Progress, /// Прогресс выполнения запроса: строк считано, всего строк, байт считано, всего байт.
};
}
/// То, что передаёт клиент.
namespace Client
{
enum Enum
{
QueryHeader, /// Идентификатор запроса, информация, до какой стадии исполнять запрос.
QueryPart, /// Кусок запроса - длина, признак последнего куска, байты.
};
}
/// До какой стадии выполнять запрос.
namespace QueryProcessingStage
{
enum Enum
{
Complete, /// Полностью.
WithMergeableState, /// До стадии, когда результаты обработки на разных серверах можно объединить.
};
}
}
}

View File

@ -0,0 +1,41 @@
#pragma once
#include <Poco/Net/Socket.h>
#include <DB/Core/Exception.h>
#include <DB/Core/ErrorCodes.h>
#include <DB/IO/ReadBuffer.h>
#include <DB/IO/BufferWithOwnMemory.h>
namespace DB
{
/** Работает с готовым Poco::Net::Socket. Операции блокирующие.
*/
class ReadBufferFromPocoSocket : public BufferWithOwnMemory<ReadBuffer>
{
protected:
Poco::Net::Socket & socket;
bool nextImpl()
{
ssize_t bytes_read = socket.impl()->receiveBytes(internal_buffer.begin(), internal_buffer.size());
if (bytes_read < 0)
throw Exception("Cannot read from socket", ErrorCodes::CANNOT_READ_FROM_SOCKET);
if (bytes_read)
working_buffer.resize(bytes_read);
else
return false;
return true;
}
public:
ReadBufferFromPocoSocket(Poco::Net::Socket & socket_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE)
: BufferWithOwnMemory<ReadBuffer>(buf_size), socket(socket_) {}
};
}

View File

@ -0,0 +1,48 @@
#pragma once
#include <Poco/Net/Socket.h>
#include <DB/Core/Exception.h>
#include <DB/Core/ErrorCodes.h>
#include <DB/IO/WriteBuffer.h>
#include <DB/IO/BufferWithOwnMemory.h>
namespace DB
{
/** Работает с готовым Poco::Net::Socket. Операции блокирующие.
*/
class WriteBufferFromPocoSocket : public BufferWithOwnMemory<WriteBuffer>
{
protected:
Poco::Net::Socket & socket;
void nextImpl()
{
if (!offset())
return;
size_t bytes_written = 0;
while (bytes_written != offset())
{
ssize_t res = socket.impl()->sendBytes(working_buffer.begin() + bytes_written, offset() - bytes_written);
if (res < 0)
throw Exception("Cannot write to socket", ErrorCodes::CANNOT_WRITE_TO_SOCKET);
bytes_written += res;
}
}
public:
WriteBufferFromPocoSocket(Poco::Net::Socket & socket_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE)
: BufferWithOwnMemory<WriteBuffer>(buf_size), socket(socket_) {}
~WriteBufferFromPocoSocket()
{
if (!std::uncaught_exception())
next();
}
};
}

View File

@ -10,6 +10,8 @@
#include <DB/IO/ReadBufferFromIStream.h>
#include <DB/IO/ReadBufferFromString.h>
#include <DB/IO/ConcatReadBuffer.h>
#include <DB/IO/CompressedReadBuffer.h>
#include <DB/IO/CompressedWriteBuffer.h>
#include <DB/IO/WriteBufferFromOStream.h>
#include <DB/IO/WriteBufferFromString.h>
#include <DB/IO/WriteHelpers.h>
@ -18,7 +20,8 @@
#include <DB/Interpreters/executeQuery.h>
#include "Handler.h"
#include "HTTPHandler.h"
namespace DB
@ -37,7 +40,7 @@ struct HTMLForm : public Poco::Net::HTMLForm
};
void HTTPRequestHandler::processQuery(Poco::Net::NameValueCollection & params, std::ostream & ostr, std::istream & istr)
void HTTPHandler::processQuery(Poco::Net::NameValueCollection & params, std::ostream & ostr, std::istream & istr)
{
BlockInputStreamPtr query_plan;
@ -49,10 +52,26 @@ void HTTPRequestHandler::processQuery(Poco::Net::NameValueCollection & params, s
query_param += '\n';
ReadBufferFromString in_param(query_param);
ReadBufferFromIStream in_post(istr);
SharedPtr<ReadBuffer> in_post = new ReadBufferFromIStream(istr);
SharedPtr<ReadBuffer> in_post_maybe_compressed;
ConcatReadBuffer in(in_param, in_post);
WriteBufferFromOStream out(ostr);
/// Если указано decompress, то будем разжимать то, что передано POST-ом.
if (0 != Poco::NumberParser::parseUnsigned(params.get("decompress", "0")))
in_post_maybe_compressed = new CompressedReadBuffer(*in_post);
else
in_post_maybe_compressed = in_post;
ConcatReadBuffer in(in_param, *in_post_maybe_compressed);
/// Если указано compress, то будем сжимать результат.
SharedPtr<WriteBuffer> out = new WriteBufferFromOStream(ostr);
SharedPtr<WriteBuffer> out_maybe_compressed;
if (0 != Poco::NumberParser::parseUnsigned(params.get("compress", "0")))
out_maybe_compressed = new CompressedWriteBuffer(*out);
else
out_maybe_compressed = out;
Context context = server.global_context;
/// Некоторые настройки могут быть переопределены в запросе.
@ -66,7 +85,7 @@ void HTTPRequestHandler::processQuery(Poco::Net::NameValueCollection & params, s
context.settings.max_threads = Poco::NumberParser::parseUnsigned(params.get("max_threads"));
Stopwatch watch;
executeQuery(in, out, context, query_plan);
executeQuery(in, *out_maybe_compressed, context, query_plan);
watch.stop();
if (query_plan)
@ -101,7 +120,7 @@ void HTTPRequestHandler::processQuery(Poco::Net::NameValueCollection & params, s
}
void HTTPRequestHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response)
void HTTPHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response)
{
std::ostream & ostr = response.send();
try

View File

@ -8,12 +8,12 @@
namespace DB
{
class HTTPRequestHandler : public Poco::Net::HTTPRequestHandler
class HTTPHandler : public Poco::Net::HTTPRequestHandler
{
public:
HTTPRequestHandler(Server & server_)
HTTPHandler(Server & server_)
: server(server_)
, log(&Logger::get("HTTPRequestHandler"))
, log(&Logger::get("HTTPHandler"))
{
LOG_TRACE(log, "In constructor.");
}

View File

@ -8,7 +8,8 @@
#include <DB/Storages/StorageSystemOne.h>
#include "Server.h"
#include "Handler.h"
#include "HTTPHandler.h"
#include "TCPHandler.h"
namespace DB
@ -39,7 +40,7 @@ Poco::Net::HTTPRequestHandler * HTTPRequestHandlerFactory::createRequestHandler(
<< ", User-Agent: " << (request.has("User-Agent") ? request.get("User-Agent") : "none"));
if (request.getURI().find('?') != std::string::npos)
return new HTTPRequestHandler(server);
return new HTTPHandler(server);
else if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_GET)
return new PingRequestHandler();
else
@ -47,6 +48,14 @@ Poco::Net::HTTPRequestHandler * HTTPRequestHandlerFactory::createRequestHandler(
}
Poco::Net::TCPServerConnection * TCPConnectionFactory::createConnection(const Poco::Net::StreamSocket & socket)
{
LOG_TRACE(log, "TCP Request. " << "Address: " << socket.address().toString());
return new TCPHandler(server, socket);
}
int Server::main(const std::vector<std::string> & args)
{
/// Заранее инициализируем DateLUT, чтобы первая инициализация потом не влияла на измеряемую скорость выполнения.
@ -73,19 +82,31 @@ int Server::main(const std::vector<std::string> & args)
global_context.settings.max_query_size = config.getInt("max_query_size", global_context.settings.max_query_size);
global_context.settings.max_threads = config.getInt("max_threads", global_context.settings.max_threads);
Poco::Net::ServerSocket socket(Poco::Net::SocketAddress("[::]:" + config.getString("http_port")));
Poco::Net::ServerSocket http_socket(Poco::Net::SocketAddress("[::]:" + config.getString("http_port")));
Poco::Net::ServerSocket tcp_socket(Poco::Net::SocketAddress("[::]:" + config.getString("tcp_port")));
Poco::ThreadPool server_pool(2, config.getInt("max_threads", 128));
Poco::Net::HTTPServer server(
Poco::Net::HTTPServer http_server(
new HTTPRequestHandlerFactory(*this),
server_pool,
socket,
http_socket,
new Poco::Net::HTTPServerParams);
server.start();
Poco::Net::TCPServer tcp_server(
new TCPConnectionFactory(*this),
server_pool,
tcp_socket,
new Poco::Net::TCPServerParams);
http_server.start();
tcp_server.start();
waitForTerminationRequest();
server.stop();
http_server.stop();
tcp_server.stop();
return Application::EXIT_OK;
}

View File

@ -10,6 +10,10 @@
#include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Net/HTTPServerResponse.h>
#include <Poco/Net/TCPServer.h>
#include <Poco/Net/TCPServerConnectionFactory.h>
#include <Poco/Net/TCPServerConnection.h>
#include <Yandex/logger_useful.h>
#include <Yandex/daemon.h>
@ -32,6 +36,7 @@ namespace DB
class Server;
class HTTPRequestHandlerFactory : public Poco::Net::HTTPRequestHandlerFactory
{
private:
@ -39,11 +44,23 @@ private:
Logger * log;
public:
HTTPRequestHandlerFactory(Server & server_) : server(server_), log(&Logger::get("RequestHandlerFactory")) {}
HTTPRequestHandlerFactory(Server & server_) : server(server_), log(&Logger::get("HTTPRequestHandlerFactory")) {}
Poco::Net::HTTPRequestHandler * createRequestHandler(const Poco::Net::HTTPServerRequest & request);
};
class TCPConnectionFactory : public Poco::Net::TCPServerConnectionFactory
{
private:
Server & server;
Logger * log;
public:
TCPConnectionFactory(Server & server_) : server(server_), log(&Logger::get("TCPConnectionFactory")) {}
Poco::Net::TCPServerConnection * createConnection(const Poco::Net::StreamSocket & socket);
};
class Server : public Daemon
{
private:

View File

@ -0,0 +1,143 @@
#include <Yandex/Revision.h>
#include <statdaemons/Stopwatch.h>
#include <DB/Core/ErrorCodes.h>
#include <DB/Core/Protocol.h>
#include <DB/IO/ReadBufferFromPocoSocket.h>
#include <DB/IO/WriteBufferFromPocoSocket.h>
#include <DB/IO/ReadHelpers.h>
#include <DB/IO/WriteHelpers.h>
#include <DB/IO/copyData.h>
#include <DB/Interpreters/executeQuery.h>
#include "TCPHandler.h"
namespace DB
{
static void sendHello(WriteBuffer & out)
{
writeVarUInt(Protocol::Server::Hello, out);
writeStringBinary(DBMS_NAME, out);
writeVarUInt(DBMS_VERSION_MAJOR, out);
writeVarUInt(DBMS_VERSION_MINOR, out);
writeVarUInt(Revision::get(), out);
out.next();
}
/// Считывает запрос из данных, имеющих вид последовательности блоков (размер, флаг конца, кусок данных).
class QueryReader : public DB::ReadBuffer
{
protected:
DB::ReadBuffer & in;
bool all_read;
size_t read_in_block;
size_t block_size;
bool nextImpl()
{
/// Если прочитали ещё не весь блок - получим следующие данные. Если следующих данных нет - ошибка.
if (read_in_block < block_size)
{
if (!in.next())
throw Exception("Cannot read all query", ErrorCodes::CANNOT_READ_ALL_QUERY);
working_buffer = in.buffer();
if (block_size - read_in_block < working_buffer.size())
{
working_buffer.resize(block_size - read_in_block);
read_in_block = block_size;
}
else
read_in_block += working_buffer.size();
}
else
{
if (all_read)
return false;
/// Размер блока.
readVarUInt(block_size, in);
/// Флаг конца.
readIntBinary(all_read, in);
read_in_block = std::min(block_size, in.buffer().size() - in.offset());
working_buffer = Buffer(in.position(), in.position() + read_in_block);
in.position() += read_in_block;
}
return true;
}
public:
QueryReader(ReadBuffer & in_) : ReadBuffer(NULL, 0), in(in_), all_read(false), read_in_block(0), block_size(0) {}
};
void TCPHandler::runImpl()
{
ReadBufferFromPocoSocket in(socket());
WriteBufferFromPocoSocket out(socket());
/// Сразу после соединения, отправляем hello-пакет.
sendHello(out);
//while (1)
{
/// Считываем заголовок запроса: идентификатор запроса и информацию, до какой стадии выполнять запрос.
UInt64 query_id = 0;
UInt64 query_processing_stage = Protocol::QueryProcessingStage::Complete;
readIntBinary(query_id, in);
readVarUInt(query_processing_stage, in);
LOG_DEBUG(log, "Query ID: " << query_id);
QueryReader query_reader(in);
BlockInputStreamPtr query_plan;
Context context = server.global_context;
Stopwatch watch;
executeQuery(query_reader, out, context, query_plan);
watch.stop();
}
}
void TCPHandler::run()
{
try
{
runImpl();
LOG_INFO(log, "Done processing connection.");
}
catch (Poco::Exception & e)
{
std::stringstream s;
s << "Code: " << ErrorCodes::POCO_EXCEPTION << ", e.code() = " << e.code()
<< ", e.message() = " << e.message() << ", e.what() = " << e.what();
LOG_ERROR(log, s.str());
}
catch (std::exception & e)
{
std::stringstream s;
s << "Code: " << ErrorCodes::STD_EXCEPTION << ". " << e.what();
LOG_ERROR(log, s.str());
}
catch (...)
{
std::stringstream s;
s << "Code: " << ErrorCodes::UNKNOWN_EXCEPTION << ". Unknown exception.";
LOG_ERROR(log, s.str());
}
}
}

View File

@ -0,0 +1,28 @@
#pragma once
#include "Server.h"
namespace DB
{
class TCPHandler : public Poco::Net::TCPServerConnection
{
public:
TCPHandler(Server & server_, const Poco::Net::StreamSocket & socket_)
: Poco::Net::TCPServerConnection(socket_), server(server_)
, log(&Logger::get("TCPHandler"))
{
LOG_TRACE(log, "In constructor.");
}
void run();
private:
Server & server;
Logger * log;
void runImpl();
};
}