dbms: development [#CONV-2944].

This commit is contained in:
Alexey Milovidov 2012-05-09 13:12:38 +00:00
parent 5056b0cfa5
commit 4217baebdf
19 changed files with 357 additions and 166 deletions

View File

@ -107,4 +107,9 @@ template <typename T> ColumnPtr ColumnConst<T>::convertToFullColumn() const
return res; return res;
} }
template <> ColumnPtr ColumnConst<String>::convertToFullColumn() const;
// TODO: convertToFullColumn для остальных типов столбцов.
} }

View File

@ -18,6 +18,7 @@ namespace Protocol
Exception = 2, /// Исключение во время обработки запроса. Exception = 2, /// Исключение во время обработки запроса.
Progress = 3, /// Прогресс выполнения запроса: строк считано, всего строк, байт считано, всего байт. Progress = 3, /// Прогресс выполнения запроса: строк считано, всего строк, байт считано, всего байт.
Ok = 4, /// Запрос без возвращаемого результата успешно выполнен. Ok = 4, /// Запрос без возвращаемого результата успешно выполнен.
Pong = 5, /// Ответ на Ping.
}; };
} }
@ -31,16 +32,7 @@ namespace Protocol
*/ */
Data = 1, /// Идентификатор запроса, признак последнего чанка, размер чанка, часть данных со сжатием или без. Data = 1, /// Идентификатор запроса, признак последнего чанка, размер чанка, часть данных со сжатием или без.
Cancel = 2, /// Отменить выполнение запроса. Cancel = 2, /// Отменить выполнение запроса.
}; Ping = 3, /// Проверка живости соединения с сервером.
}
/// До какой стадии выполнять запрос.
namespace QueryProcessingStage
{
enum Enum
{
Complete = 0, /// Полностью.
WithMergeableState = 1, /// До стадии, когда результаты обработки на разных серверах можно объединить.
}; };
} }

View File

@ -0,0 +1,17 @@
#pragma once
namespace DB
{
/// До какой стадии выполнен или нужно выполнить SELECT запрос.
namespace QueryProcessingStage
{
enum Enum
{
Complete = 0, /// Полностью.
WithMergeableState = 1, /// До стадии, когда результаты обработки на разных серверах можно объединить.
FetchColumns = 2, /// Только прочитать/прочитаны указанные в запросе столбцы.
};
}
}

View File

@ -29,7 +29,7 @@ public:
* Агрегатные функции ищутся везде в выражении. * Агрегатные функции ищутся везде в выражении.
* Столбцы, соответствующие keys и аргументам агрегатных функций, уже должны быть вычислены. * Столбцы, соответствующие keys и аргументам агрегатных функций, уже должны быть вычислены.
*/ */
AggregatingBlockInputStream(BlockInputStreamPtr input_, SharedPtr<Expression> expression); AggregatingBlockInputStream(BlockInputStreamPtr input_, ExpressionPtr expression);
Block readImpl(); Block readImpl();

View File

@ -22,7 +22,7 @@ using Poco::SharedPtr;
class ExpressionBlockInputStream : public IProfilingBlockInputStream class ExpressionBlockInputStream : public IProfilingBlockInputStream
{ {
public: public:
ExpressionBlockInputStream(BlockInputStreamPtr input_, SharedPtr<Expression> expression_, unsigned part_id_ = 0) ExpressionBlockInputStream(BlockInputStreamPtr input_, ExpressionPtr expression_, unsigned part_id_ = 0)
: input(input_), expression(expression_), part_id(part_id_) : input(input_), expression(expression_), part_id(part_id_)
{ {
children.push_back(input); children.push_back(input);
@ -44,7 +44,7 @@ public:
private: private:
BlockInputStreamPtr input; BlockInputStreamPtr input;
SharedPtr<Expression> expression; ExpressionPtr expression;
unsigned part_id; unsigned part_id;
}; };

View File

@ -53,7 +53,7 @@ public:
const BlockStreamProfileInfo & getInfo() const; const BlockStreamProfileInfo & getInfo() const;
/** Установить колбэк, который вызывается, чтобы проверить, не был ли запрос остановлен. /** Установить колбэк, который вызывается, чтобы проверить, не был ли запрос остановлен.
* Колбэк пробрасывается во все дочерние источники и вызывается там перед чтением данных. * Колбэк пробрасывается во все листовые источники и вызывается там перед чтением данных.
* Следует иметь ввиду, что колбэк может вызываться из разных потоков. * Следует иметь ввиду, что колбэк может вызываться из разных потоков.
*/ */
typedef boost::function<bool()> IsCancelledCallback; typedef boost::function<bool()> IsCancelledCallback;

View File

@ -29,7 +29,7 @@ public:
* Агрегатные функции ищутся везде в выражении. * Агрегатные функции ищутся везде в выражении.
* Столбцы, соответствующие keys и аргументам агрегатных функций, уже должны быть вычислены. * Столбцы, соответствующие keys и аргументам агрегатных функций, уже должны быть вычислены.
*/ */
AggregatingBlockInputStream(BlockInputStreamPtr input_, SharedPtr<Expression> expression); AggregatingBlockInputStream(BlockInputStreamPtr input_, ExpressionPtr expression);
Block readImpl(); Block readImpl();

View File

@ -31,7 +31,7 @@ public:
* Агрегатные функции ищутся везде в выражении. * Агрегатные функции ищутся везде в выражении.
* Столбцы, соответствующие keys и аргументам агрегатных функций, уже должны быть вычислены. * Столбцы, соответствующие keys и аргументам агрегатных функций, уже должны быть вычислены.
*/ */
ParallelAggregatingBlockInputStream(BlockInputStreams inputs_, SharedPtr<Expression> expression, unsigned max_threads_ = 1) ParallelAggregatingBlockInputStream(BlockInputStreams inputs_, ExpressionPtr expression, unsigned max_threads_ = 1)
: inputs(inputs_), has_been_read(false), max_threads(max_threads_), pool(max_threads) : inputs(inputs_), has_been_read(false), max_threads(max_threads_), pool(max_threads)
{ {
children.insert(children.end(), inputs_.begin(), inputs_.end()); children.insert(children.end(), inputs_.begin(), inputs_.end());

View File

@ -20,7 +20,7 @@ class ProjectionBlockInputStream : public IProfilingBlockInputStream
public: public:
ProjectionBlockInputStream( ProjectionBlockInputStream(
BlockInputStreamPtr input_, BlockInputStreamPtr input_,
SharedPtr<Expression> expression_, ExpressionPtr expression_,
bool without_duplicates_and_aliases_ = false, bool without_duplicates_and_aliases_ = false,
unsigned part_id_ = 0, unsigned part_id_ = 0,
ASTPtr subtree_ = NULL) ASTPtr subtree_ = NULL)
@ -44,7 +44,7 @@ public:
private: private:
BlockInputStreamPtr input; BlockInputStreamPtr input;
SharedPtr<Expression> expression; ExpressionPtr expression;
bool without_duplicates_and_aliases; bool without_duplicates_and_aliases;
unsigned part_id; unsigned part_id;
ASTPtr subtree; ASTPtr subtree;

View File

@ -64,6 +64,12 @@ public:
*/ */
virtual SharedPtr<IColumn> createConstColumn(size_t size, const Field & field) const = 0; virtual SharedPtr<IColumn> createConstColumn(size_t size, const Field & field) const = 0;
/// Вернуть приблизительный (оценочный) размер значения.
virtual size_t getSizeOfField() const
{
throw Exception("getSizeOfField() method is not implemented for data type " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
virtual ~IDataType() {} virtual ~IDataType() {}
}; };

View File

@ -1,5 +1,4 @@
#ifndef DBMS_DATA_TYPES_IDATATYPE_NUMBER_H #pragma once
#define DBMS_DATA_TYPES_IDATATYPE_NUMBER_H
#include <DB/DataTypes/IDataType.h> #include <DB/DataTypes/IDataType.h>
@ -53,8 +52,8 @@ public:
{ {
deserializeText(field, istr); deserializeText(field, istr);
} }
size_t getSizeOfField() const { return sizeof(FieldType); }
}; };
} }
#endif

View File

@ -113,5 +113,7 @@ private:
void markBeforeAndAfterAggregationImpl(ASTPtr ast, unsigned before_part_id, unsigned after_part_id, bool below = false); void markBeforeAndAfterAggregationImpl(ASTPtr ast, unsigned before_part_id, unsigned after_part_id, bool below = false);
}; };
typedef SharedPtr<Expression> ExpressionPtr;
} }

View File

@ -1,19 +1,22 @@
#pragma once #pragma once
#include <DB/Core/QueryProcessingStage.h>
#include <DB/Interpreters/Expression.h>
#include <DB/Interpreters/Context.h> #include <DB/Interpreters/Context.h>
#include <DB/DataStreams/IBlockInputStream.h> #include <DB/DataStreams/IBlockInputStream.h>
#include <DB/Parsers/ASTSelectQuery.h>
namespace DB namespace DB
{ {
/** Интерпретирует запрос SELECT. Возвращает поток блоков с результатами выполнения запроса. /** Интерпретирует запрос SELECT. Возвращает поток блоков с результатами выполнения запроса до стадии to_stage.
*/ */
class InterpreterSelectQuery class InterpreterSelectQuery
{ {
public: public:
InterpreterSelectQuery(ASTPtr query_ptr_, Context & context_); InterpreterSelectQuery(ASTPtr query_ptr_, Context & context_, QueryProcessingStage::Enum to_stage_ = QueryProcessingStage::Complete);
/// Выполнить запрос, получить поток блоков для чтения /// Выполнить запрос, получить поток блоков для чтения
BlockInputStreamPtr execute(); BlockInputStreamPtr execute();
@ -36,6 +39,10 @@ private:
*/ */
void setPartID(ASTPtr ast, unsigned part_id); void setPartID(ASTPtr ast, unsigned part_id);
/** Выбрать из списка столбцов какой-нибудь, лучше - минимального размера.
*/
String getAnyColumn();
enum PartID enum PartID
{ {
PART_OTHER = 1, PART_OTHER = 1,
@ -49,8 +56,22 @@ private:
}; };
/// Разные стадии выполнения запроса.
void executeFetchColumns( BlockInputStreams & streams, ExpressionPtr & expression);
void executeWhere( BlockInputStreams & streams, ExpressionPtr & expression);
void executeAggregation( BlockInputStreams & streams, ExpressionPtr & expression);
void executeFinalizeAggregates( BlockInputStreams & streams, ExpressionPtr & expression);
void executeHaving( BlockInputStreams & streams, ExpressionPtr & expression);
void executeOuterExpression( BlockInputStreams & streams, ExpressionPtr & expression);
void executeOrder( BlockInputStreams & streams, ExpressionPtr & expression);
void executeUnion( BlockInputStreams & streams, ExpressionPtr & expression);
void executeLimit( BlockInputStreams & streams, ExpressionPtr & expression);
ASTPtr query_ptr; ASTPtr query_ptr;
ASTSelectQuery & query;
Context context; Context context;
QueryProcessingStage::Enum to_stage;
}; };

View File

@ -19,6 +19,7 @@
#include <Poco/SharedPtr.h> #include <Poco/SharedPtr.h>
#include <Poco/Util/Application.h> #include <Poco/Util/Application.h>
#include <Poco/Net/StreamSocket.h> #include <Poco/Net/StreamSocket.h>
#include <Poco/Net/NetException.h>
#include <Yandex/Revision.h> #include <Yandex/Revision.h>
@ -27,6 +28,7 @@
#include <DB/Core/Exception.h> #include <DB/Core/Exception.h>
#include <DB/Core/Types.h> #include <DB/Core/Types.h>
#include <DB/Core/Protocol.h> #include <DB/Core/Protocol.h>
#include <DB/Core/QueryProcessingStage.h>
#include <DB/IO/ReadBufferFromPocoSocket.h> #include <DB/IO/ReadBufferFromPocoSocket.h>
#include <DB/IO/WriteBufferFromPocoSocket.h> #include <DB/IO/WriteBufferFromPocoSocket.h>
@ -259,36 +261,7 @@ private:
format = config().getString("format", is_interactive ? "PrettyCompact" : "TabSeparated"); format = config().getString("format", is_interactive ? "PrettyCompact" : "TabSeparated");
format_max_block_size = config().getInt("format_max_block_size", DEFAULT_BLOCK_SIZE); format_max_block_size = config().getInt("format_max_block_size", DEFAULT_BLOCK_SIZE);
String host = config().getString("host", "localhost"); connect();
UInt16 port = config().getInt("port", 9000);
if (is_interactive)
std::cout << "Connecting to " << host << ":" << port << "." << std::endl;
socket.connect(Poco::Net::SocketAddress(host, port));
/// Получить hello пакет.
UInt64 packet_type = 0;
String server_name;
UInt64 server_version_major = 0;
UInt64 server_version_minor = 0;
UInt64 server_revision = 0;
readVarUInt(packet_type, in);
if (packet_type != Protocol::Server::Hello)
throw Exception("Unexpected packet from server", ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER);
readStringBinary(server_name, in);
readVarUInt(server_version_major, in);
readVarUInt(server_version_minor, in);
readVarUInt(server_revision, in);
if (is_interactive)
std::cout << "Connected to " << server_name
<< " server version " << server_version_major
<< "." << server_version_minor
<< "." << server_revision
<< "." << std::endl << std::endl;
context.format_factory = new FormatFactory(); context.format_factory = new FormatFactory();
context.data_type_factory = new DataTypeFactory(); context.data_type_factory = new DataTypeFactory();
@ -320,6 +293,41 @@ private:
} }
void connect()
{
String host = config().getString("host", "localhost");
UInt16 port = config().getInt("port", 9000);
if (is_interactive)
std::cout << "Connecting to " << host << ":" << port << "." << std::endl;
socket.connect(Poco::Net::SocketAddress(host, port));
/// Получить hello пакет.
UInt64 packet_type = 0;
String server_name;
UInt64 server_version_major = 0;
UInt64 server_version_minor = 0;
UInt64 server_revision = 0;
readVarUInt(packet_type, in);
if (packet_type != Protocol::Server::Hello)
throw Exception("Unexpected packet from server", ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER);
readStringBinary(server_name, in);
readVarUInt(server_version_major, in);
readVarUInt(server_version_minor, in);
readVarUInt(server_revision, in);
if (is_interactive)
std::cout << "Connected to " << server_name
<< " server version " << server_version_major
<< "." << server_version_minor
<< "." << server_revision
<< "." << std::endl << std::endl;
}
void loop() void loop()
{ {
while (char * line_ = readline(":) ")) while (char * line_ = readline(":) "))
@ -367,8 +375,10 @@ private:
return true; return true;
++query_id; ++query_id;
sendQuery(); forceConnected();
sendQuery();
sendData(); sendData();
receiveResult(); receiveResult();
@ -385,6 +395,26 @@ private:
} }
void forceConnected()
{
try
{
if (!ping())
{
socket.close();
connect();
}
}
catch (const Poco::Net::NetException & e)
{
if (is_interactive)
std::cout << e.displayText() << std::endl;
connect();
}
}
bool parseQuery() bool parseQuery()
{ {
ParserQuery parser; ParserQuery parser;
@ -419,9 +449,27 @@ private:
} }
bool ping()
{
UInt64 pong = 0;
writeVarUInt(Protocol::Client::Ping, out);
out.next();
if (in.eof())
return false;
readVarUInt(pong, in);
if (pong != Protocol::Server::Pong)
throw Exception("Unknown packet from server (expected Pong)", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER);
return true;
}
void sendQuery() void sendQuery()
{ {
UInt64 stage = Protocol::QueryProcessingStage::Complete; UInt64 stage = QueryProcessingStage::Complete;
writeVarUInt(Protocol::Client::Query, out); writeVarUInt(Protocol::Client::Query, out);
writeIntBinary(query_id, out); writeIntBinary(query_id, out);
@ -431,6 +479,7 @@ private:
writeStringBinary(out_format, out); writeStringBinary(out_format, out);
writeStringBinary(query, out); writeStringBinary(query, out);
out.next(); out.next();
} }
@ -643,7 +692,7 @@ private:
Poco::Util::Application::defineOptions(options); Poco::Util::Application::defineOptions(options);
options.addOption( options.addOption(
Poco::Util::Option("config-file", "C") Poco::Util::Option("config-file", "c")
.required(false) .required(false)
.repeatable(false) .repeatable(false)
.argument("<file>") .argument("<file>")

View File

@ -7,7 +7,7 @@ namespace DB
{ {
AggregatingBlockInputStream::AggregatingBlockInputStream(BlockInputStreamPtr input_, SharedPtr<Expression> expression) AggregatingBlockInputStream::AggregatingBlockInputStream(BlockInputStreamPtr input_, ExpressionPtr expression)
: input(input_), has_been_read(false) : input(input_), has_been_read(false)
{ {
children.push_back(input); children.push_back(input);

View File

@ -127,9 +127,10 @@ void IProfilingBlockInputStream::setIsCancelledCallback(IsCancelledCallback call
{ {
is_cancelled_callback = callback; is_cancelled_callback = callback;
for (BlockInputStreams::iterator it = children.begin(); it != children.end(); ++it) BlockInputStreams leaves = getLeaves();
if (IProfilingBlockInputStream * child = dynamic_cast<IProfilingBlockInputStream *>(&**it)) for (BlockInputStreams::iterator it = leaves.begin(); it != leaves.end(); ++it)
child->setIsCancelledCallback(callback); if (IProfilingBlockInputStream * leaf = dynamic_cast<IProfilingBlockInputStream *>(&**it))
leaf->setIsCancelledCallback(callback);
} }

View File

@ -25,16 +25,14 @@ namespace DB
{ {
InterpreterSelectQuery::InterpreterSelectQuery(ASTPtr query_ptr_, Context & context_) InterpreterSelectQuery::InterpreterSelectQuery(ASTPtr query_ptr_, Context & context_, QueryProcessingStage::Enum to_stage_)
: query_ptr(query_ptr_), context(context_) : query_ptr(query_ptr_), query(dynamic_cast<ASTSelectQuery &>(*query_ptr)), context(context_), to_stage(to_stage_)
{ {
} }
StoragePtr InterpreterSelectQuery::getTable() StoragePtr InterpreterSelectQuery::getTable()
{ {
ASTSelectQuery & query = dynamic_cast<ASTSelectQuery &>(*query_ptr);
/// Из какой таблицы читать данные. JOIN-ы не поддерживаются. /// Из какой таблицы читать данные. JOIN-ы не поддерживаются.
String database_name; String database_name;
@ -66,8 +64,6 @@ StoragePtr InterpreterSelectQuery::getTable()
void InterpreterSelectQuery::setColumns() void InterpreterSelectQuery::setColumns()
{ {
ASTSelectQuery & query = dynamic_cast<ASTSelectQuery &>(*query_ptr);
context.columns = !query.table || !dynamic_cast<ASTSelectQuery *>(&*query.table) context.columns = !query.table || !dynamic_cast<ASTSelectQuery *>(&*query.table)
? getTable()->getColumnsList() ? getTable()->getColumnsList()
: InterpreterSelectQuery(query.table, context).getSampleBlock().getColumnsList(); : InterpreterSelectQuery(query.table, context).getSampleBlock().getColumnsList();
@ -104,51 +100,11 @@ static inline BlockInputStreamPtr maybeAsynchronous(BlockInputStreamPtr in, bool
BlockInputStreamPtr InterpreterSelectQuery::execute() BlockInputStreamPtr InterpreterSelectQuery::execute()
{ {
ASTSelectQuery & query = dynamic_cast<ASTSelectQuery &>(*query_ptr);
/// Таблица, откуда читать данные, если не подзапрос.
StoragePtr table;
/// Интерпретатор подзапроса, если подзапрос
SharedPtr<InterpreterSelectQuery> interpreter_subquery;
/// Добавляем в контекст список доступных столбцов. /// Добавляем в контекст список доступных столбцов.
setColumns(); setColumns();
if (!query.table || !dynamic_cast<ASTSelectQuery *>(&*query.table))
table = getTable();
else
interpreter_subquery = new InterpreterSelectQuery(query.table, context);
/// Объект, с помощью которого анализируется запрос. /// Объект, с помощью которого анализируется запрос.
Poco::SharedPtr<Expression> expression = new Expression(query_ptr, context); ExpressionPtr expression = new Expression(query_ptr, context);
/// Список столбцов, которых нужно прочитать, чтобы выполнить запрос.
Names required_columns = expression->getRequiredColumns();
/// Если не указан ни один столбец из таблицы, то будем читать первый попавшийся (чтобы хотя бы знать число строк).
if (required_columns.empty())
required_columns.push_back(context.columns.front().first);
/// Нужно ли агрегировать.
bool need_aggregate = expression->hasAggregates() || query.group_expression_list;
size_t limit_length = 0;
size_t limit_offset = 0;
if (query.limit_length)
{
limit_length = boost::get<UInt64>(dynamic_cast<ASTLiteral &>(*query.limit_length).value);
if (query.limit_offset)
limit_offset = boost::get<UInt64>(dynamic_cast<ASTLiteral &>(*query.limit_offset).value);
}
/** Оптимизация - если не указаны WHERE, GROUP, HAVING, ORDER, но указан LIMIT, и limit + offset < max_block_size,
* то в качестве размера блока будем использовать limit + offset (чтобы не читать из таблицы больше, чем запрошено).
*/
size_t block_size = context.settings.max_block_size;
if (!query.where_expression && !query.group_expression_list && !query.having_expression && !query.order_expression_list
&& query.limit_length && !need_aggregate && limit_length + limit_offset < block_size)
{
block_size = limit_length + limit_offset;
}
/** Потоки данных. При параллельном выполнении запроса, имеем несколько потоков данных. /** Потоки данных. При параллельном выполнении запроса, имеем несколько потоков данных.
* Если нет GROUP BY, то выполним все операции до ORDER BY и LIMIT параллельно, затем * Если нет GROUP BY, то выполним все операции до ORDER BY и LIMIT параллельно, затем
@ -161,6 +117,89 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
*/ */
BlockInputStreams streams; BlockInputStreams streams;
executeFetchColumns(streams, expression);
if (to_stage == QueryProcessingStage::FetchColumns)
{
executeUnion(streams, expression);
}
else
{
executeWhere(streams, expression);
/// Нужно ли агрегировать.
bool need_aggregate = expression->hasAggregates() || query.group_expression_list;
/// Если есть GROUP BY - сначала выполним часть выражения, необходимую для его вычисления
if (need_aggregate)
executeAggregation(streams, expression);
if (to_stage != QueryProcessingStage::WithMergeableState)
{
if (need_aggregate)
executeFinalizeAggregates(streams, expression);
executeHaving(streams, expression);
executeOuterExpression(streams, expression);
executeOrder(streams, expression);
executeUnion(streams, expression);
executeLimit(streams, expression);
}
}
return streams[0];
}
static void getLimitLengthAndOffset(ASTSelectQuery & query, size_t & length, size_t & offset)
{
length = 0;
offset = 0;
if (query.limit_length)
{
length = boost::get<UInt64>(dynamic_cast<ASTLiteral &>(*query.limit_length).value);
if (query.limit_offset)
offset = boost::get<UInt64>(dynamic_cast<ASTLiteral &>(*query.limit_offset).value);
}
}
void InterpreterSelectQuery::executeFetchColumns(BlockInputStreams & streams, ExpressionPtr & expression)
{
/// Таблица, откуда читать данные, если не подзапрос.
StoragePtr table;
/// Интерпретатор подзапроса, если подзапрос
SharedPtr<InterpreterSelectQuery> interpreter_subquery;
/// Добавляем в контекст список доступных столбцов.
setColumns();
if (!query.table || !dynamic_cast<ASTSelectQuery *>(&*query.table))
table = getTable();
else
interpreter_subquery = new InterpreterSelectQuery(query.table, context);
/// Список столбцов, которых нужно прочитать, чтобы выполнить запрос.
Names required_columns = expression->getRequiredColumns();
/// Если не указан ни один столбец из таблицы, то будем читать первый попавшийся (чтобы хотя бы знать число строк).
if (required_columns.empty())
required_columns.push_back(getAnyColumn());
size_t limit_length = 0;
size_t limit_offset = 0;
getLimitLengthAndOffset(query, limit_length, limit_offset);
/** Оптимизация - если не указаны WHERE, GROUP, HAVING, ORDER, но указан LIMIT, и limit + offset < max_block_size,
* то в качестве размера блока будем использовать limit + offset (чтобы не читать из таблицы больше, чем запрошено).
*/
size_t block_size = context.settings.max_block_size;
if (!query.where_expression && !query.group_expression_list && !query.having_expression && !query.order_expression_list
&& query.limit_length && !expression->hasAggregates() && limit_length + limit_offset < block_size)
{
block_size = limit_length + limit_offset;
}
/// Инициализируем изначальные потоки данных, на которые накладываются преобразования запроса. Таблица или подзапрос? /// Инициализируем изначальные потоки данных, на которые накладываются преобразования запроса. Таблица или подзапрос?
if (!query.table || !dynamic_cast<ASTSelectQuery *>(&*query.table)) if (!query.table || !dynamic_cast<ASTSelectQuery *>(&*query.table))
streams = table->read(required_columns, query_ptr, block_size, context.settings.max_threads); streams = table->read(required_columns, query_ptr, block_size, context.settings.max_threads);
@ -169,7 +208,11 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
if (streams.empty()) if (streams.empty())
throw Exception("No streams returned from table.", ErrorCodes::NO_STREAMS_RETURNED_FROM_TABLE); throw Exception("No streams returned from table.", ErrorCodes::NO_STREAMS_RETURNED_FROM_TABLE);
}
void InterpreterSelectQuery::executeWhere(BlockInputStreams & streams, ExpressionPtr & expression)
{
/// Если есть условие WHERE - сначала выполним часть выражения, необходимую для его вычисления /// Если есть условие WHERE - сначала выполним часть выражения, необходимую для его вычисления
if (query.where_expression) if (query.where_expression)
{ {
@ -182,36 +225,45 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
stream = maybeAsynchronous(new FilterBlockInputStream(stream), context.settings.asynchronous); stream = maybeAsynchronous(new FilterBlockInputStream(stream), context.settings.asynchronous);
} }
} }
}
/// Если есть GROUP BY - сначала выполним часть выражения, необходимую для его вычисления
if (need_aggregate) void InterpreterSelectQuery::executeAggregation(BlockInputStreams & streams, ExpressionPtr & expression)
{
expression->markBeforeAndAfterAggregation(PART_BEFORE_AGGREGATING, PART_AFTER_AGGREGATING);
if (query.group_expression_list)
setPartID(query.group_expression_list, PART_GROUP);
for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it)
{ {
expression->markBeforeAndAfterAggregation(PART_BEFORE_AGGREGATING, PART_AFTER_AGGREGATING); BlockInputStreamPtr & stream = *it;
stream = maybeAsynchronous(new ExpressionBlockInputStream(stream, expression, PART_GROUP | PART_BEFORE_AGGREGATING), context.settings.asynchronous);
if (query.group_expression_list)
setPartID(query.group_expression_list, PART_GROUP);
for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it)
{
BlockInputStreamPtr & stream = *it;
stream = maybeAsynchronous(new ExpressionBlockInputStream(stream, expression, PART_GROUP | PART_BEFORE_AGGREGATING), context.settings.asynchronous);
}
BlockInputStreamPtr & stream = streams[0];
/// Если потоков несколько, то выполняем параллельную агрегацию
if (streams.size() > 1)
{
stream = maybeAsynchronous(new ParallelAggregatingBlockInputStream(streams, expression, context.settings.max_threads), context.settings.asynchronous);
streams.resize(1);
}
else
stream = maybeAsynchronous(new AggregatingBlockInputStream(stream, expression), context.settings.asynchronous);
/// Финализируем агрегатные функции - заменяем их состояния вычислений на готовые значения
stream = maybeAsynchronous(new FinalizingAggregatedBlockInputStream(stream), context.settings.asynchronous);
} }
BlockInputStreamPtr & stream = streams[0];
/// Если потоков несколько, то выполняем параллельную агрегацию
if (streams.size() > 1)
{
stream = maybeAsynchronous(new ParallelAggregatingBlockInputStream(streams, expression, context.settings.max_threads), context.settings.asynchronous);
streams.resize(1);
}
else
stream = maybeAsynchronous(new AggregatingBlockInputStream(stream, expression), context.settings.asynchronous);
}
void InterpreterSelectQuery::executeFinalizeAggregates(BlockInputStreams & streams, ExpressionPtr & expression)
{
/// Финализируем агрегатные функции - заменяем их состояния вычислений на готовые значения
BlockInputStreamPtr & stream = streams[0];
stream = maybeAsynchronous(new FinalizingAggregatedBlockInputStream(stream), context.settings.asynchronous);
}
void InterpreterSelectQuery::executeHaving(BlockInputStreams & streams, ExpressionPtr & expression)
{
/// Если есть условие HAVING - сначала выполним часть выражения, необходимую для его вычисления /// Если есть условие HAVING - сначала выполним часть выражения, необходимую для его вычисления
if (query.having_expression) if (query.having_expression)
{ {
@ -224,7 +276,11 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
stream = maybeAsynchronous(new FilterBlockInputStream(stream), context.settings.asynchronous); stream = maybeAsynchronous(new FilterBlockInputStream(stream), context.settings.asynchronous);
} }
} }
}
void InterpreterSelectQuery::executeOuterExpression(BlockInputStreams & streams, ExpressionPtr & expression)
{
/// Выполним оставшуюся часть выражения /// Выполним оставшуюся часть выражения
setPartID(query.select_expression_list, PART_SELECT); setPartID(query.select_expression_list, PART_SELECT);
if (query.order_expression_list) if (query.order_expression_list)
@ -243,7 +299,11 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
PART_SELECT | PART_ORDER, PART_SELECT | PART_ORDER,
query.order_expression_list ? NULL : query.select_expression_list); query.order_expression_list ? NULL : query.select_expression_list);
} }
}
void InterpreterSelectQuery::executeOrder(BlockInputStreams & streams, ExpressionPtr & expression)
{
/// Если есть ORDER BY /// Если есть ORDER BY
if (query.order_expression_list) if (query.order_expression_list)
{ {
@ -278,29 +338,38 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
/// Оставим только столбцы, нужные для SELECT части /// Оставим только столбцы, нужные для SELECT части
stream = new ProjectionBlockInputStream(stream, expression, false, PART_SELECT, query.select_expression_list); stream = new ProjectionBlockInputStream(stream, expression, false, PART_SELECT, query.select_expression_list);
} }
}
void InterpreterSelectQuery::executeUnion(BlockInputStreams & streams, ExpressionPtr & expression)
{
/// Если до сих пор есть несколько потоков, то объединяем их в один /// Если до сих пор есть несколько потоков, то объединяем их в один
if (streams.size() > 1) if (streams.size() > 1)
{ {
streams[0] = new UnionBlockInputStream(streams, context.settings.max_threads); streams[0] = new UnionBlockInputStream(streams, context.settings.max_threads);
streams.resize(1); streams.resize(1);
} }
}
void InterpreterSelectQuery::executeLimit(BlockInputStreams & streams, ExpressionPtr & expression)
{
size_t limit_length = 0;
size_t limit_offset = 0;
getLimitLengthAndOffset(query, limit_length, limit_offset);
BlockInputStreamPtr & stream = streams[0]; BlockInputStreamPtr & stream = streams[0];
/// Если есть LIMIT /// Если есть LIMIT
if (query.limit_length) if (query.limit_length)
{ {
stream = new LimitBlockInputStream(stream, limit_length, limit_offset); stream = new LimitBlockInputStream(stream, limit_length, limit_offset);
} }
return stream;
} }
BlockInputStreamPtr InterpreterSelectQuery::executeAndFormat(WriteBuffer & buf) BlockInputStreamPtr InterpreterSelectQuery::executeAndFormat(WriteBuffer & buf)
{ {
ASTSelectQuery & query = dynamic_cast<ASTSelectQuery &>(*query_ptr);
Block sample = getSampleBlock(); Block sample = getSampleBlock();
String format_name = query.format ? dynamic_cast<ASTIdentifier &>(*query.format).name : "TabSeparated"; String format_name = query.format ? dynamic_cast<ASTIdentifier &>(*query.format).name : "TabSeparated";
@ -321,4 +390,25 @@ void InterpreterSelectQuery::setPartID(ASTPtr ast, unsigned part_id)
setPartID(*it, part_id); setPartID(*it, part_id);
} }
String InterpreterSelectQuery::getAnyColumn()
{
NamesAndTypesList::const_iterator it = context.columns.begin();
size_t min_size = it->second->isNumeric() ? it->second->getSizeOfField() : 100;
String res = it->first;
for (; it != context.columns.end(); ++it)
{
size_t current_size = it->second->isNumeric() ? it->second->getSizeOfField() : 100;
if (current_size < min_size)
{
min_size = current_size;
res = it->first;
}
}
return res;
}
} }

View File

@ -41,7 +41,7 @@ void TCPHandler::runImpl()
try try
{ {
/// Пакет с запросом. /// Пакет с запросом.
receivePacket(in); receivePacket(in, out);
LOG_DEBUG(log, "Query ID: " << state.query_id); LOG_DEBUG(log, "Query ID: " << state.query_id);
LOG_DEBUG(log, "Query: " << state.query); LOG_DEBUG(log, "Query: " << state.query);
@ -53,7 +53,7 @@ void TCPHandler::runImpl()
/// Читаем из сети данные для INSERT-а, если надо, и вставляем их. /// Читаем из сети данные для INSERT-а, если надо, и вставляем их.
if (state.io.out) if (state.io.out)
{ {
while (receivePacket(in)) while (receivePacket(in, out))
; ;
} }
@ -112,28 +112,36 @@ void TCPHandler::sendHello(WriteBuffer & out)
out.next(); out.next();
} }
bool TCPHandler::receivePacket(ReadBuffer & in) bool TCPHandler::receivePacket(ReadBuffer & in, WriteBuffer & out)
{ {
UInt64 packet_type = 0; while (true) /// Если пришёл пакет типа Ping, то игнорируем его и получаем следующий пакет.
readVarUInt(packet_type, in);
std::cerr << "Packet: " << packet_type << std::endl;
switch (packet_type)
{ {
case Protocol::Client::Query: UInt64 packet_type = 0;
if (!state.empty()) readVarUInt(packet_type, in);
throw Exception("Unexpected packet Query received from client", ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
receiveQuery(in); std::cerr << "Packet: " << packet_type << std::endl;
return true;
switch (packet_type)
case Protocol::Client::Data: {
if (state.empty()) case Protocol::Client::Query:
throw Exception("Unexpected packet Data received from client", ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT); if (!state.empty())
return receiveData(in); throw Exception("Unexpected packet Query received from client", ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
receiveQuery(in);
default: return true;
throw Exception("Unknown packet from client", ErrorCodes::UNKNOWN_PACKET_FROM_CLIENT);
case Protocol::Client::Data:
if (state.empty())
throw Exception("Unexpected packet Data received from client", ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
return receiveData(in);
case Protocol::Client::Ping:
writeVarUInt(Protocol::Server::Pong, out);
out.next();
break;
default:
throw Exception("Unknown packet from client", ErrorCodes::UNKNOWN_PACKET_FROM_CLIENT);
}
} }
} }
@ -145,7 +153,7 @@ void TCPHandler::receiveQuery(ReadBuffer & in)
readIntBinary(state.query_id, in); readIntBinary(state.query_id, in);
readVarUInt(stage, in); readVarUInt(stage, in);
state.stage = Protocol::QueryProcessingStage::Enum(stage); state.stage = QueryProcessingStage::Enum(stage);
readVarUInt(compression, in); readVarUInt(compression, in);
state.compression = Protocol::Compression::Enum(compression); state.compression = Protocol::Compression::Enum(compression);

View File

@ -1,6 +1,7 @@
#pragma once #pragma once
#include <DB/Core/Protocol.h> #include <DB/Core/Protocol.h>
#include <DB/Core/QueryProcessingStage.h>
#include <DB/IO/ChunkedReadBuffer.h> #include <DB/IO/ChunkedReadBuffer.h>
#include <DB/IO/ChunkedWriteBuffer.h> #include <DB/IO/ChunkedWriteBuffer.h>
@ -23,7 +24,7 @@ struct QueryState
/// Идентификатор запроса. /// Идентификатор запроса.
UInt64 query_id; UInt64 query_id;
Protocol::QueryProcessingStage::Enum stage; QueryProcessingStage::Enum stage;
Protocol::Compression::Enum compression; Protocol::Compression::Enum compression;
String in_format; String in_format;
String out_format; String out_format;
@ -51,7 +52,7 @@ struct QueryState
SharedPtr<Exception> exception; SharedPtr<Exception> exception;
QueryState() : query_id(0), stage(Protocol::QueryProcessingStage::Complete), compression(Protocol::Compression::Disable) {} QueryState() : query_id(0), stage(QueryProcessingStage::Complete), compression(Protocol::Compression::Disable) {}
void reset() void reset()
{ {
@ -95,7 +96,7 @@ private:
void sendProgress(WriteBuffer & out); void sendProgress(WriteBuffer & out);
void sendOk(WriteBuffer & out); void sendOk(WriteBuffer & out);
bool receivePacket(ReadBuffer & in); bool receivePacket(ReadBuffer & in, WriteBuffer & out);
void receiveQuery(ReadBuffer & in); void receiveQuery(ReadBuffer & in);
bool receiveData(ReadBuffer & in); bool receiveData(ReadBuffer & in);