dbms: development [#CONV-2944].

This commit is contained in:
Alexey Milovidov 2012-03-11 08:52:56 +00:00
parent d2c7aafe14
commit 70b8947933
15 changed files with 383 additions and 41 deletions

View File

@ -103,7 +103,12 @@ namespace ErrorCodes
NO_STREAMS_RETURNED_FROM_TABLE,
CANNOT_READ_FROM_SOCKET,
CANNOT_WRITE_TO_SOCKET,
CANNOT_READ_ALL_QUERY,
CANNOT_READ_ALL_DATA_FROM_CHUNKED_INPUT,
CANNOT_WRITE_TO_EMPTY_BLOCK_OUTPUT_STREAM,
UNKNOWN_PACKET_FROM_CLIENT,
UNEXPECTED_PACKET_FROM_CLIENT,
RECEIVED_DATA_FOR_WRONG_QUERY_ID,
TOO_SMALL_BUFFER_SIZE,
POCO_EXCEPTION = 1000,
STD_EXCEPTION,

View File

@ -14,7 +14,7 @@ namespace Protocol
enum Enum
{
Hello, /// Имя, версия, ревизия.
ResultBlock, /// Идентификатор запроса, признак последнего блока, блок результата в формате Native со сжатием.
Data, /// Идентификатор запроса, признак последнего чанка, размер чанка, часть данных со сжатием или без.
Exception, /// Исключение во время обработки запроса.
Progress, /// Прогресс выполнения запроса: строк считано, всего строк, байт считано, всего байт.
};
@ -25,8 +25,10 @@ namespace Protocol
{
enum Enum
{
QueryHeader, /// Идентификатор запроса, информация, до какой стадии исполнять запрос.
QueryPart, /// Кусок запроса - длина, признак последнего куска, байты.
Query, /** Идентификатор запроса, информация, до какой стадии исполнять запрос,
* использовать ли сжатие, формат входных данных, формат выходных данных, текст запроса (без данных для INSERT-а).
*/
Data, /// Идентификатор запроса, признак последнего чанка, размер чанка, часть данных со сжатием или без.
};
}
@ -39,6 +41,16 @@ namespace Protocol
WithMergeableState, /// До стадии, когда результаты обработки на разных серверах можно объединить.
};
}
/// Использовать ли сжатие.
namespace Compression
{
enum Enum
{
Disable,
Enable,
};
}
}
}

View File

@ -0,0 +1,19 @@
#pragma once
#include <DB/DataStreams/IBlockInputStream.h>
namespace DB
{
/** При чтении сразу возвращает пустой блок, что обозначает конец потока блоков.
*/
class EmptyBlockInputStream : public IBlockInputStream
{
public:
Block read() { return Block(); }
String getName() const { return "EmptyBlockInputStream"; }
BlockInputStreamPtr clone() { return new EmptyBlockInputStream(); }
};
}

View File

@ -0,0 +1,24 @@
#pragma once
#include <DB/DataStreams/IBlockOutputStream.h>
namespace DB
{
/** При попытке записать в этот поток блоков, кидает исключение.
* Используется там, где, в общем случае, нужно передать поток блоков, но в некоторых случаях, он не должен быть использован.
*/
class EmptyBlockOutputStream : public IBlockOutputStream
{
public:
void write(const Block & block)
{
throw Exception("Cannot write to EmptyBlockOutputStream", ErrorCodes::CANNOT_WRITE_TO_EMPTY_BLOCK_OUTPUT_STREAM);
}
BlockOutputStreamPtr clone() { return new EmptyBlockOutputStream(); }
};
}

View File

@ -22,4 +22,6 @@ public:
Block & sample) const;
};
typedef SharedPtr<FormatFactory> FormatFactoryPtr;
}

View File

@ -7,6 +7,7 @@
#include <Poco/Mutex.h>
#include <DB/Core/NamesAndTypes.h>
#include <DB/DataStreams/FormatFactory.h>
#include <DB/Storages/IStorage.h>
#include <DB/Functions/FunctionsLibrary.h>
#include <DB/AggregateFunctions/AggregateFunctionFactory.h>
@ -38,6 +39,7 @@ struct Context
AggregateFunctionFactoryPtr aggregate_function_factory; /// Агрегатные функции.
DataTypeFactoryPtr data_type_factory; /// Типы данных.
StorageFactoryPtr storage_factory; /// Движки таблиц.
FormatFactoryPtr format_factory; /// Форматы.
NamesAndTypesList columns; /// Столбцы текущей обрабатываемой таблицы.
Settings settings; /// Настройки выполнения запроса.
Logger * log; /// Логгер.

View File

@ -1,5 +1,6 @@
#pragma once
#include <DB/DataStreams/IBlockOutputStream.h>
#include <DB/Interpreters/Context.h>
@ -20,6 +21,11 @@ public:
*/
void execute(ReadBuffer * remaining_data_istr);
/** Подготовить запрос к выполнению. Вернуть поток блоков, в который можно писать данные для выполнения запроса.
* Или вернуть NULL, если запрос INSERT SELECT (самодостаточный запрос - не принимает входные данные).
*/
BlockOutputStreamPtr execute();
private:
StoragePtr getTable();

View File

@ -1,5 +1,6 @@
#pragma once
#include <DB/DataStreams/BlockIO.h>
#include <DB/Interpreters/Context.h>
@ -28,6 +29,10 @@ public:
*/
void execute(WriteBuffer & ostr, ReadBuffer * remaining_data_istr, BlockInputStreamPtr & query_plan);
/** Подготовить запрос к выполнению. Вернуть потоки блоков, используя которые можно выполнить запрос.
*/
BlockIO execute();
private:
ASTPtr query_ptr;
Context context;

View File

@ -16,4 +16,23 @@ void executeQuery(
Context & context, /// БД, таблицы, типы данных, движки таблиц, функции, агрегатные функции...
BlockInputStreamPtr & query_plan); /// Сюда может быть записано описание, как выполнялся запрос
/** Более низкоуровневая функция для межсерверного взаимодействия.
* Подготавливает запрос к выполнению, но не выполняет его.
* Возвращает потоки блоков, при использовании которых, запрос будет выполняться.
* То есть, вы можете, в некоторой степени, управлять циклом выполнения запроса.
*
* Для выполнения запроса:
* - сначала передайте данные INSERT-а, если есть, в BlockIO::out;
* - затем читайте результат из BlockIO::in;
*
* Если запрос не предполагает записи данных или возврата результата, то out и in,
* соответственно, будут равны NULL.
*
* Часть запроса по парсингу и форматированию (секция FORMAT) необходимо выполнить отдельно.
*/
BlockIO executeQuery(
const String & query, /// Текст запроса, без данных INSERT-а (если есть). Данные INSERT-а следует писать в BlockIO::out.
Context & context);
}

View File

@ -1,7 +1,6 @@
#include <DB/IO/ConcatReadBuffer.h>
#include <DB/DataStreams/MaterializingBlockInputStream.h>
#include <DB/DataStreams/FormatFactory.h>
#include <DB/DataStreams/copyData.h>
#include <DB/Parsers/ASTInsertQuery.h>
@ -57,7 +56,6 @@ void InterpreterInsertQuery::execute(ReadBuffer * remaining_data_istr)
/// Какой тип запроса: INSERT VALUES | INSERT FORMAT | INSERT SELECT?
if (!query.select)
{
FormatFactory format_factory;
String format = query.format;
if (format.empty())
format = "Values";
@ -73,7 +71,7 @@ void InterpreterInsertQuery::execute(ReadBuffer * remaining_data_istr)
ConcatReadBuffer istr(buffers);
Block sample = table->getSampleBlock();
in = format_factory.getInput(format, istr, sample, context.settings.max_block_size, *context.data_type_factory);
in = context.format_factory->getInput(format, istr, sample, context.settings.max_block_size, *context.data_type_factory);
copyData(*in, *out);
}
else
@ -86,4 +84,27 @@ void InterpreterInsertQuery::execute(ReadBuffer * remaining_data_istr)
}
BlockOutputStreamPtr InterpreterInsertQuery::execute()
{
ASTInsertQuery & query = dynamic_cast<ASTInsertQuery &>(*query_ptr);
StoragePtr table = getTable();
/// TODO - если указаны не все столбцы, то дополнить поток недостающими столбцами со значениями по-умолчанию.
BlockOutputStreamPtr out = table->write(query_ptr);
/// Какой тип запроса: INSERT или INSERT SELECT?
if (!query.select)
return out;
else
{
InterpreterSelectQuery interpreter_select(query.select, context);
BlockInputStreamPtr in = interpreter_select.execute();
in = new MaterializingBlockInputStream(in);
copyData(*in, *out);
return NULL;
}
}
}

View File

@ -47,4 +47,35 @@ void InterpreterQuery::execute(WriteBuffer & ostr, ReadBuffer * remaining_data_i
}
BlockIO InterpreterQuery::execute()
{
BlockIO res;
if (dynamic_cast<ASTSelectQuery *>(&*query_ptr))
{
InterpreterSelectQuery interpreter(query_ptr, context);
res.in = interpreter.execute();
}
else if (dynamic_cast<ASTInsertQuery *>(&*query_ptr))
{
InterpreterInsertQuery interpreter(query_ptr, context);
res.out = interpreter.execute();
}
else if (dynamic_cast<ASTCreateQuery *>(&*query_ptr))
{
InterpreterCreateQuery interpreter(query_ptr, context);
interpreter.execute();
}
else if (dynamic_cast<ASTDropQuery *>(&*query_ptr))
{
InterpreterDropQuery interpreter(query_ptr, context);
interpreter.execute();
}
else
throw Exception("Unknown type of query: " + query_ptr->getID(), ErrorCodes::UNKNOWN_TYPE_OF_QUERY);
return res;
}
}

View File

@ -9,7 +9,6 @@
#include <DB/DataStreams/AsynchronousBlockInputStream.h>
#include <DB/DataStreams/UnionBlockInputStream.h>
#include <DB/DataStreams/ParallelAggregatingBlockInputStream.h>
#include <DB/DataStreams/FormatFactory.h>
#include <DB/DataStreams/copyData.h>
#include <DB/Parsers/ASTSelectQuery.h>
@ -301,13 +300,12 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
BlockInputStreamPtr InterpreterSelectQuery::executeAndFormat(WriteBuffer & buf)
{
FormatFactory format_factory;
ASTSelectQuery & query = dynamic_cast<ASTSelectQuery &>(*query_ptr);
Block sample = getSampleBlock();
String format_name = query.format ? dynamic_cast<ASTIdentifier &>(*query.format).name : "TabSeparated";
BlockInputStreamPtr in = execute();
BlockOutputStreamPtr out = format_factory.getOutput(format_name, buf, sample);
BlockOutputStreamPtr out = context.format_factory->getOutput(format_name, buf, sample);
copyData(*in, *out);

View File

@ -1,5 +1,6 @@
#include <DB/Parsers/formatAST.h>
#include <DB/DataStreams/BlockIO.h>
#include <DB/Interpreters/executeQuery.h>
@ -61,4 +62,34 @@ void executeQuery(
}
BlockIO executeQuery(
const String & query,
Context & context)
{
DB::ParserQuery parser;
DB::ASTPtr ast;
std::string expected;
const char * begin = query.data();
const char * end = begin + query.size();
const char * pos = begin;
bool parse_res = parser.parse(pos, end, ast, expected);
/// Распарсенный запрос должен заканчиваться на конец входных данных или на точку с запятой.
if (!parse_res || (pos != end && *pos != ';'))
throw DB::Exception("Syntax error: failed at position "
+ Poco::NumberFormatter::format(pos - begin) + ": "
+ std::string(pos, std::min(SHOW_CHARS_ON_SYNTAX_ERROR, end - pos))
+ ", expected " + (parse_res ? "end of query" : expected) + ".",
DB::ErrorCodes::SYNTAX_ERROR);
formatAST(*ast, std::cerr);
std::cerr << std::endl;
InterpreterQuery interpreter(ast, context);
return interpreter.execute();
}
}

View File

@ -3,10 +3,11 @@
#include <statdaemons/Stopwatch.h>
#include <DB/Core/ErrorCodes.h>
#include <DB/Core/Protocol.h>
#include <DB/IO/ReadBufferFromPocoSocket.h>
#include <DB/IO/WriteBufferFromPocoSocket.h>
#include <DB/IO/CompressedReadBuffer.h>
#include <DB/IO/CompressedWriteBuffer.h>
#include <DB/IO/ReadHelpers.h>
#include <DB/IO/WriteHelpers.h>
#include <DB/IO/copyData.h>
@ -20,63 +21,107 @@ namespace DB
{
static void sendHello(WriteBuffer & out)
{
writeVarUInt(Protocol::Server::Hello, out);
writeStringBinary(DBMS_NAME, out);
writeVarUInt(DBMS_VERSION_MAJOR, out);
writeVarUInt(DBMS_VERSION_MINOR, out);
writeVarUInt(Revision::get(), out);
out.next();
}
/// Считывает запрос из данных, имеющих вид последовательности блоков (размер, флаг конца, кусок данных).
class QueryReader : public DB::ReadBuffer
/** Считывает данные, из формата, состоящего из чанков
* (идентификатор запроса, признак последнего чанка, размер чанка, часть данных со сжатием или без).
*/
class ChunkedReadBuffer : public ReadBuffer
{
protected:
DB::ReadBuffer & in;
ReadBuffer & in;
bool all_read;
size_t read_in_block;
size_t block_size;
size_t read_in_chunk;
size_t chunk_size;
UInt64 assert_query_id;
bool nextImpl()
{
/// Если прочитали ещё не весь блок - получим следующие данные. Если следующих данных нет - ошибка.
if (read_in_block < block_size)
if (read_in_chunk < chunk_size)
{
if (!in.next())
throw Exception("Cannot read all query", ErrorCodes::CANNOT_READ_ALL_QUERY);
throw Exception("Cannot read all query", ErrorCodes::CANNOT_READ_ALL_DATA_FROM_CHUNKED_INPUT);
working_buffer = in.buffer();
if (block_size - read_in_block < working_buffer.size())
if (chunk_size - read_in_chunk < working_buffer.size())
{
working_buffer.resize(block_size - read_in_block);
read_in_block = block_size;
working_buffer.resize(chunk_size - read_in_chunk);
read_in_chunk = chunk_size;
}
else
read_in_block += working_buffer.size();
read_in_chunk += working_buffer.size();
}
else
{
if (all_read)
return false;
/// Размер блока.
readVarUInt(block_size, in);
UInt64 query_id = 0;
readIntBinary(query_id, in);
if (query_id != assert_query_id)
throw Exception("Received data for wrong query id", ErrorCodes::RECEIVED_DATA_FOR_WRONG_QUERY_ID);
/// Флаг конца.
readIntBinary(all_read, in);
/// Размер блока.
readIntBinary(chunk_size, in);
read_in_block = std::min(block_size, in.buffer().size() - in.offset());
working_buffer = Buffer(in.position(), in.position() + read_in_block);
in.position() += read_in_block;
read_in_chunk = std::min(chunk_size, in.buffer().size() - in.offset());
working_buffer = Buffer(in.position(), in.position() + read_in_chunk);
in.position() += read_in_chunk;
}
return true;
}
public:
QueryReader(ReadBuffer & in_) : ReadBuffer(NULL, 0), in(in_), all_read(false), read_in_block(0), block_size(0) {}
ChunkedReadBuffer(ReadBuffer & in_, UInt64 assert_query_id_)
: ReadBuffer(NULL, 0), in(in_), all_read(false), read_in_chunk(0), chunk_size(0), assert_query_id(assert_query_id_) {}
};
/** Записывает данные в формат, состоящий из чанков
* (идентификатор запроса, признак последнего чанка, размер чанка, часть данных со сжатием или без).
*/
class ChunkedWriteBuffer : public WriteBuffer
{
protected:
WriteBuffer & out;
UInt64 query_id;
inline size_t headerSize() { return sizeof(query_id) + sizeof(bool) + sizeof(size_t); }
void checkBufferSize()
{
if (out.buffer().end() - out.position() < 2 * static_cast<int>(headerSize()))
throw Exception("Too small remaining buffer size to write chunked data", ErrorCodes::TOO_SMALL_BUFFER_SIZE);
}
void nextImpl()
{
checkBufferSize();
writeIntBinary(query_id, out);
writeIntBinary(false, out);
writeIntBinary(offset(), out);
out.position() = position();
out.next();
working_buffer = Buffer(out.buffer().begin() + headerSize(), out.buffer().end());
}
public:
ChunkedWriteBuffer(WriteBuffer & out_, UInt64 query_id_)
: WriteBuffer(out.buffer().begin() + headerSize(), out.buffer().size() - headerSize()), out(out_), query_id(query_id_)
{
checkBufferSize();
}
void writeEnd()
{
writeIntBinary(query_id, out);
writeIntBinary(true, out);
writeIntBinary(static_cast<size_t>(0), out);
}
};
@ -88,7 +133,7 @@ void TCPHandler::runImpl()
/// Сразу после соединения, отправляем hello-пакет.
sendHello(out);
//while (1)
/*while (1)
{
/// Считываем заголовок запроса: идентификатор запроса и информацию, до какой стадии выполнять запрос.
UInt64 query_id = 0;
@ -106,9 +151,75 @@ void TCPHandler::runImpl()
Stopwatch watch;
executeQuery(query_reader, out, context, query_plan);
watch.stop();
}*/
}
void TCPHandler::sendHello(WriteBuffer & out)
{
writeVarUInt(Protocol::Server::Hello, out);
writeStringBinary(DBMS_NAME, out);
writeVarUInt(DBMS_VERSION_MAJOR, out);
writeVarUInt(DBMS_VERSION_MINOR, out);
writeVarUInt(Revision::get(), out);
out.next();
}
void TCPHandler::receivePacket(ReadBuffer & in)
{
UInt64 packet_type = 0;
readVarUInt(packet_type, in);
switch (packet_type)
{
case Protocol::Client::Query:
if (!state.empty())
throw Exception("Unexpected packet Query received from client", ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
receiveQuery(in);
break;
case Protocol::Client::Data:
if (state.empty())
throw Exception("Unexpected packet Data received from client", ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
receiveData(in);
break;
default:
throw Exception("Unknown packet from client", ErrorCodes::UNKNOWN_PACKET_FROM_CLIENT);
}
}
void TCPHandler::receiveQuery(ReadBuffer & in)
{
UInt64 stage = 0;
UInt64 compression = 0;
readIntBinary(state.query_id, in);
readVarUInt(stage, in);
state.stage = Protocol::QueryProcessingStage::Enum(stage);
readVarUInt(compression, in);
state.compression = Protocol::Compression::Enum(compression);
readStringBinary(state.in_format, in);
readStringBinary(state.out_format, in);
readStringBinary(state.query, in);
}
void TCPHandler::receiveData(ReadBuffer & in)
{
SharedPtr<ReadBuffer> chunked_in = new ChunkedReadBuffer(in, state.query_id);
SharedPtr<ReadBuffer> maybe_compressed_in = state.compression == Protocol::Compression::Enable
? new CompressedReadBuffer(*chunked_in)
: chunked_in;
//BlockInputStreamPtr block_in = state.context.format_factory->getInput();
// TODO
}
void TCPHandler::run()
{

View File

@ -1,11 +1,54 @@
#pragma once
#include <DB/Core/Protocol.h>
#include <DB/DataStreams/BlockIO.h>
#include "Server.h"
namespace DB
{
/// Состояние обработки запроса.
struct QueryState
{
/// Идентификатор запроса.
UInt64 query_id;
Protocol::QueryProcessingStage::Enum stage;
Protocol::Compression::Enum compression;
String in_format;
String out_format;
/// Текст запроса.
String query;
/// Потоки блоков, с помощью которых выполнять запрос.
BlockIO io;
Context context;
QueryState() : query_id(0), stage(Protocol::QueryProcessingStage::Complete), compression(Protocol::Compression::Disable) {}
void reset()
{
query_id = 0;
stage = Protocol::QueryProcessingStage::Complete;
compression = Protocol::Compression::Disable;
in_format.clear();
out_format.clear();
query.clear();
io = BlockIO();
}
bool empty()
{
return query_id == 0;
}
};
class TCPHandler : public Poco::Net::TCPServerConnection
{
public:
@ -22,7 +65,20 @@ private:
Server & server;
Logger * log;
/// На данный момент, поддерживается одновременное выполнение только одного запроса в соединении.
QueryState state;
void runImpl();
void sendHello(WriteBuffer & out);
void sendData(WriteBuffer & out);
void sendException(WriteBuffer & out);
void sendProgress(WriteBuffer & out);
void receivePacket(ReadBuffer & in);
void receiveQuery(ReadBuffer & in);
void receiveData(ReadBuffer & in);
};
}