diff --git a/dbms/include/DB/Core/ErrorCodes.h b/dbms/include/DB/Core/ErrorCodes.h index 11bf7f6576a..82fa14c4cde 100644 --- a/dbms/include/DB/Core/ErrorCodes.h +++ b/dbms/include/DB/Core/ErrorCodes.h @@ -1,5 +1,4 @@ -#ifndef DBMS_CORE_ERROR_CODES_H -#define DBMS_CORE_ERROR_CODES_H +#pragma once namespace DB @@ -66,9 +65,8 @@ namespace ErrorCodes TABLE_ALREADY_EXISTS, TABLE_METADATA_ALREADY_EXISTS, ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER, + UNKNOWN_TABLE }; } } - -#endif diff --git a/dbms/include/DB/DataStreams/ExpressionBlockInputStream.h b/dbms/include/DB/DataStreams/ExpressionBlockInputStream.h index b3069fc42b2..db152000289 100644 --- a/dbms/include/DB/DataStreams/ExpressionBlockInputStream.h +++ b/dbms/include/DB/DataStreams/ExpressionBlockInputStream.h @@ -20,7 +20,7 @@ using Poco::SharedPtr; class ExpressionBlockInputStream : public IBlockInputStream { public: - ExpressionBlockInputStream(SharedPtr input_, SharedPtr expression_) + ExpressionBlockInputStream(BlockInputStreamPtr input_, SharedPtr expression_) : input(input_), expression(expression_) {} Block read() @@ -30,11 +30,11 @@ public: return res; expression->execute(res); - return res; + return expression->projectResult(res); } private: - SharedPtr input; + BlockInputStreamPtr input; SharedPtr expression; }; diff --git a/dbms/include/DB/DataStreams/FilterBlockInputStream.h b/dbms/include/DB/DataStreams/FilterBlockInputStream.h index f5b86bc52f2..1dcdd544071 100644 --- a/dbms/include/DB/DataStreams/FilterBlockInputStream.h +++ b/dbms/include/DB/DataStreams/FilterBlockInputStream.h @@ -19,11 +19,11 @@ class FilterBlockInputStream : public IBlockInputStream { public: /// filter_column_ - номер столбца с условиями фильтрации - FilterBlockInputStream(SharedPtr input_, size_t filter_column_); + FilterBlockInputStream(BlockInputStreamPtr input_, size_t filter_column_); Block read(); private: - SharedPtr input; + BlockInputStreamPtr input; size_t filter_column; }; diff --git a/dbms/include/DB/DataStreams/IBlockInputStream.h b/dbms/include/DB/DataStreams/IBlockInputStream.h index f5b3c44bfc6..27f61537e40 100644 --- a/dbms/include/DB/DataStreams/IBlockInputStream.h +++ b/dbms/include/DB/DataStreams/IBlockInputStream.h @@ -1,5 +1,6 @@ -#ifndef DBMS_DATA_STREAMS_IBLOCKINPUTSTREAM_H -#define DBMS_DATA_STREAMS_IBLOCKINPUTSTREAM_H +#pragma once + +#include #include @@ -7,6 +8,9 @@ namespace DB { +using Poco::SharedPtr; + + /** Интерфейс потока для чтения данных по блокам из БД. * Реляционные операции предполагается делать также реализациями этого интерфейса. */ @@ -22,6 +26,8 @@ public: virtual ~IBlockInputStream() {} }; + +typedef SharedPtr BlockInputStreamPtr; + } -#endif diff --git a/dbms/include/DB/DataStreams/IBlockOutputStream.h b/dbms/include/DB/DataStreams/IBlockOutputStream.h index 8f1d2b45910..ba61bc72edb 100644 --- a/dbms/include/DB/DataStreams/IBlockOutputStream.h +++ b/dbms/include/DB/DataStreams/IBlockOutputStream.h @@ -1,5 +1,6 @@ -#ifndef DBMS_DATA_STREAMS_IBLOCKOUTPUTSTREAM_H -#define DBMS_DATA_STREAMS_IBLOCKOUTPUTSTREAM_H +#pragma once + +#include #include @@ -7,6 +8,9 @@ namespace DB { +using Poco::SharedPtr; + + /** Интерфейс потока для записи данных в БД или в сеть, или в консоль и т. п. */ class IBlockOutputStream @@ -20,6 +24,6 @@ public: virtual ~IBlockOutputStream() {} }; -} +typedef SharedPtr BlockOutputStreamPtr; -#endif +} diff --git a/dbms/include/DB/DataStreams/LimitBlockInputStream.h b/dbms/include/DB/DataStreams/LimitBlockInputStream.h index 9479d27e06e..84f97077cc8 100644 --- a/dbms/include/DB/DataStreams/LimitBlockInputStream.h +++ b/dbms/include/DB/DataStreams/LimitBlockInputStream.h @@ -17,11 +17,11 @@ using Poco::SharedPtr; class LimitBlockInputStream : public IBlockInputStream { public: - LimitBlockInputStream(SharedPtr input_, size_t limit_, size_t offset_ = 0); + LimitBlockInputStream(BlockInputStreamPtr input_, size_t limit_, size_t offset_ = 0); Block read(); private: - SharedPtr input; + BlockInputStreamPtr input; size_t limit; size_t offset; size_t pos; diff --git a/dbms/include/DB/DataStreams/ProfilingBlockInputStream.h b/dbms/include/DB/DataStreams/ProfilingBlockInputStream.h index b3024b5c8a6..edd6b9159d4 100644 --- a/dbms/include/DB/DataStreams/ProfilingBlockInputStream.h +++ b/dbms/include/DB/DataStreams/ProfilingBlockInputStream.h @@ -37,7 +37,7 @@ struct BlockStreamProfileInfo class ProfilingBlockInputStream : public IBlockInputStream { public: - ProfilingBlockInputStream(SharedPtr in_) + ProfilingBlockInputStream(BlockInputStreamPtr in_) : in(in_) {} Block read(); @@ -45,7 +45,7 @@ public: const BlockStreamProfileInfo & getInfo() const; private: - SharedPtr in; + BlockInputStreamPtr in; BlockStreamProfileInfo info; }; diff --git a/dbms/include/DB/DataStreams/RowInputStreamFromBlockInputStream.h b/dbms/include/DB/DataStreams/RowInputStreamFromBlockInputStream.h index cf43c16753b..27059a0b0d6 100644 --- a/dbms/include/DB/DataStreams/RowInputStreamFromBlockInputStream.h +++ b/dbms/include/DB/DataStreams/RowInputStreamFromBlockInputStream.h @@ -1,7 +1,4 @@ -#ifndef DBMS_DATA_STREAMS_ROWINPUTSTREAMFROMBLOCKINPUTSTREAM_H -#define DBMS_DATA_STREAMS_ROWINPUTSTREAMFROMBLOCKINPUTSTREAM_H - -#include +#pragma once #include #include @@ -10,8 +7,6 @@ namespace DB { -using Poco::SharedPtr; - /** Преобразует поток для чтения данных по блокам в поток для чтения данных по строкам. */ @@ -30,5 +25,3 @@ private: }; } - -#endif diff --git a/dbms/include/DB/Interpreters/Expression.h b/dbms/include/DB/Interpreters/Expression.h index a5f16d0e145..359a25d60e9 100644 --- a/dbms/include/DB/Interpreters/Expression.h +++ b/dbms/include/DB/Interpreters/Expression.h @@ -26,8 +26,14 @@ public: /** Выполнить выражение над блоком. Блок должен содержать все столбцы - идентификаторы. * Функция добавляет в блок новые столбцы - результаты вычислений. + * part_id - какую часть выражения вычислять. */ - void execute(Block & block); + void execute(Block & block, unsigned part_id = 0); + + /** Взять из блока с промежуточными результатами вычислений только столбцы, представляющие собой конечный результат. + * Вернуть новый блок, в котором эти столбцы расположены в правильном порядке. + */ + Block projectResult(Block & block); /** Получить список типов столбцов результата. */ @@ -61,14 +67,9 @@ private: /** Прописать во всех узлах, что они ещё не вычислены. */ - void setNotCalculated(ASTPtr ast); + void setNotCalculated(ASTPtr ast, unsigned part_id); - void executeImpl(ASTPtr ast, Block & block); - - /** Взять из блока с промежуточными результатами вычислений только столбцы, представляющие собой конечный результат. - * Вернуть новый блок, в котором эти столбцы расположены в правильном порядке. - */ - Block projectResult(ASTPtr ast, Block & block); + void executeImpl(ASTPtr ast, Block & block, unsigned part_id); void collectFinalColumns(ASTPtr ast, Block & src, Block & dst); diff --git a/dbms/include/DB/Parsers/ASTIdentifier.h b/dbms/include/DB/Parsers/ASTIdentifier.h index f56adc357fb..08b9f065572 100644 --- a/dbms/include/DB/Parsers/ASTIdentifier.h +++ b/dbms/include/DB/Parsers/ASTIdentifier.h @@ -1,5 +1,4 @@ -#ifndef DBMS_PARSERS_ASTIDENTIFIER_H -#define DBMS_PARSERS_ASTIDENTIFIER_H +#pragma once #include #include @@ -13,18 +12,27 @@ namespace DB class ASTIdentifier : public IAST { public: + enum Kind + { + Column, + Database, + Table, + }; + /// имя String name; - /// тип + + /// чего идентифицирует этот идентификатор + Kind kind; + + /// тип (только для столбцов) DataTypePtr type; ASTIdentifier() {} - ASTIdentifier(StringRange range_, const String & name_) : IAST(range_), name(name_) {} + ASTIdentifier(StringRange range_, const String & name_, Kind kind_ = Column) : IAST(range_), name(name_), kind(kind_) {} /** Получить текст, который идентифицирует этот элемент. */ String getID() { return "Identifier_" + name; } }; } - -#endif diff --git a/dbms/include/DB/Parsers/IAST.h b/dbms/include/DB/Parsers/IAST.h index 0771d541f88..23288af8bb2 100644 --- a/dbms/include/DB/Parsers/IAST.h +++ b/dbms/include/DB/Parsers/IAST.h @@ -23,12 +23,20 @@ public: typedef std::list > ASTs; ASTs children; StringRange range; - /// Было ли соответствующее выражение вычислено. + + /** Было ли соответствующее выражение вычислено. + * Используется, чтобы при обходе графа выражения, не вычислять несколько раз одни и те же узлы. + */ bool calculated; + /** Идентификатор части выражения. Используется при интерпретации, чтобы вычислять не всё выражение сразу, + * а по частям (например, сначала WHERE, потом фильтрация, потом всё остальное). + */ + unsigned part_id; + - IAST() : range(NULL, NULL), calculated(false) {} - IAST(StringRange range_) : range(range_), calculated(false) {} + IAST() : range(NULL, NULL), calculated(false), part_id(0) {} + IAST(StringRange range_) : range(range_), calculated(false), part_id(0) {} virtual ~IAST() {} /** Получить текст, который идентифицирует этот элемент. */ diff --git a/dbms/include/DB/Storages/IStorage.h b/dbms/include/DB/Storages/IStorage.h index 45cad9d4e87..e25c60b2f29 100644 --- a/dbms/include/DB/Storages/IStorage.h +++ b/dbms/include/DB/Storages/IStorage.h @@ -44,7 +44,7 @@ public: * (индексы, блокировки и т. п.) * Возвращает объект, с помощью которого можно последовательно читать данные. */ - virtual SharedPtr read( + virtual BlockInputStreamPtr read( const Names & column_names, ASTPtr query, size_t max_block_size = DEFAULT_BLOCK_SIZE) @@ -56,7 +56,7 @@ public: * Принимает описание запроса, в котором может содержаться информация о методе записи данных. * Возвращает объект, с помощью которого можно последовательно писать данные. */ - virtual SharedPtr write( + virtual BlockOutputStreamPtr write( ASTPtr query) { throw Exception("Method write() is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED); diff --git a/dbms/include/DB/Storages/StorageLog.h b/dbms/include/DB/Storages/StorageLog.h index 7e11165d891..4d254738167 100644 --- a/dbms/include/DB/Storages/StorageLog.h +++ b/dbms/include/DB/Storages/StorageLog.h @@ -91,12 +91,12 @@ public: const NamesAndTypes & getColumns() const { return *columns; } - SharedPtr read( + BlockInputStreamPtr read( const Names & column_names, ASTPtr query, size_t max_block_size = DEFAULT_BLOCK_SIZE); - SharedPtr write( + BlockOutputStreamPtr write( ASTPtr query); private: diff --git a/dbms/include/DB/Storages/StorageSystemNumbers.h b/dbms/include/DB/Storages/StorageSystemNumbers.h index 1797a7a0f86..2386de059c1 100644 --- a/dbms/include/DB/Storages/StorageSystemNumbers.h +++ b/dbms/include/DB/Storages/StorageSystemNumbers.h @@ -36,7 +36,7 @@ public: const NamesAndTypes & getColumns() const { return columns; } - SharedPtr read( + BlockInputStreamPtr read( const Names & column_names, ASTPtr query, size_t max_block_size = DEFAULT_BLOCK_SIZE); diff --git a/dbms/include/DB/Storages/StorageSystemOne.h b/dbms/include/DB/Storages/StorageSystemOne.h new file mode 100644 index 00000000000..6ecc421b479 --- /dev/null +++ b/dbms/include/DB/Storages/StorageSystemOne.h @@ -0,0 +1,45 @@ +#pragma once + +#include + + +namespace DB +{ + + +class OneValueBlockInputStream : public IBlockInputStream +{ +public: + OneValueBlockInputStream(); + Block read(); +private: + bool has_been_read; +}; + + +/** Реализует хранилище для системной таблицы One. + * Таблица содержит единственный столбец dummy UInt8 и единственную строку со значением 0. + * Используется, если в запросе не указана таблица. + * Аналог таблицы DUAL в Oracle и MySQL. + */ +class StorageSystemOne : public IStorage +{ +public: + StorageSystemOne(const std::string & name_); + + std::string getName() const { return "SystemOne"; } + std::string getTableName() const { return "One"; } + + const NamesAndTypes & getColumns() const { return columns; } + + BlockInputStreamPtr read( + const Names & column_names, + ASTPtr query, + size_t max_block_size = DEFAULT_BLOCK_SIZE); + +private: + const std::string name; + NamesAndTypes columns; +}; + +} diff --git a/dbms/src/DataStreams/FilterBlockInputStream.cpp b/dbms/src/DataStreams/FilterBlockInputStream.cpp index 059e70b0508..4ecb0feab61 100644 --- a/dbms/src/DataStreams/FilterBlockInputStream.cpp +++ b/dbms/src/DataStreams/FilterBlockInputStream.cpp @@ -6,7 +6,7 @@ namespace DB { -FilterBlockInputStream::FilterBlockInputStream(SharedPtr input_, size_t filter_column_) +FilterBlockInputStream::FilterBlockInputStream(BlockInputStreamPtr input_, size_t filter_column_) : input(input_), filter_column(filter_column_) { } diff --git a/dbms/src/DataStreams/LimitBlockInputStream.cpp b/dbms/src/DataStreams/LimitBlockInputStream.cpp index 2d653be7517..e174a1f91ac 100644 --- a/dbms/src/DataStreams/LimitBlockInputStream.cpp +++ b/dbms/src/DataStreams/LimitBlockInputStream.cpp @@ -8,7 +8,7 @@ namespace DB using Poco::SharedPtr; -LimitBlockInputStream::LimitBlockInputStream(SharedPtr input_, size_t limit_, size_t offset_) +LimitBlockInputStream::LimitBlockInputStream(BlockInputStreamPtr input_, size_t limit_, size_t offset_) : input(input_), limit(limit_), offset(offset_), pos(0) { } diff --git a/dbms/src/Interpreters/Expression.cpp b/dbms/src/Interpreters/Expression.cpp index 00fdb0980a6..54109c5c945 100644 --- a/dbms/src/Interpreters/Expression.cpp +++ b/dbms/src/Interpreters/Expression.cpp @@ -23,11 +23,14 @@ void Expression::addSemantic(ASTPtr ast) } else if (ASTIdentifier * node = dynamic_cast(&*ast)) { - NamesAndTypes::const_iterator it = context.columns.find(node->name); - if (it == context.columns.end()) - throw Exception("Unknown identifier " + node->name, ErrorCodes::UNKNOWN_IDENTIFIER); + if (node->kind == ASTIdentifier::Column) + { + NamesAndTypes::const_iterator it = context.columns.find(node->name); + if (it == context.columns.end()) + throw Exception("Unknown identifier " + node->name, ErrorCodes::UNKNOWN_IDENTIFIER); - node->type = it->second; + node->type = it->second; + } } else if (ASTLiteral * node = dynamic_cast(&*ast)) { @@ -104,30 +107,30 @@ void Expression::glueTreeImpl(ASTPtr ast, Subtrees & subtrees) } -void Expression::setNotCalculated(ASTPtr ast) +void Expression::setNotCalculated(ASTPtr ast, unsigned part_id) { - ast->calculated = false; + if (ast->part_id == part_id) + ast->calculated = false; for (ASTs::iterator it = ast->children.begin(); it != ast->children.end(); ++it) - setNotCalculated(*it); + setNotCalculated(*it, part_id); } -void Expression::execute(Block & block) +void Expression::execute(Block & block, unsigned part_id) { - setNotCalculated(ast); - executeImpl(ast, block); - block = projectResult(ast, block); + setNotCalculated(ast, part_id); + executeImpl(ast, block, part_id); } -void Expression::executeImpl(ASTPtr ast, Block & block) +void Expression::executeImpl(ASTPtr ast, Block & block, unsigned part_id) { /// Обход в глубину for (ASTs::iterator it = ast->children.begin(); it != ast->children.end(); ++it) - executeImpl(*it, block); + executeImpl(*it, block, part_id); - if (ast->calculated) + if (ast->calculated || ast->part_id != part_id) return; /** Столбцы из таблицы уже загружены в блок. @@ -186,7 +189,7 @@ void Expression::executeImpl(ASTPtr ast, Block & block) } -Block Expression::projectResult(ASTPtr ast, Block & block) +Block Expression::projectResult(Block & block) { Block res; collectFinalColumns(ast, block, res); @@ -197,7 +200,10 @@ Block Expression::projectResult(ASTPtr ast, Block & block) void Expression::collectFinalColumns(ASTPtr ast, Block & src, Block & dst) { if (ASTIdentifier * ident = dynamic_cast(&*ast)) - dst.insert(src.getByName(ident->name)); + { + if (ident->kind == ASTIdentifier::Column) + dst.insert(src.getByName(ident->name)); + } else if (dynamic_cast(&*ast)) dst.insert(src.getByName(ast->getTreeID())); else if (ASTFunction * func = dynamic_cast(&*ast)) diff --git a/dbms/src/Interpreters/tests/expression.cpp b/dbms/src/Interpreters/tests/expression.cpp index f543adc4508..f19ac001f29 100644 --- a/dbms/src/Interpreters/tests/expression.cpp +++ b/dbms/src/Interpreters/tests/expression.cpp @@ -179,6 +179,7 @@ int main(int argc, char ** argv) stopwatch.start(); expression.execute(block); + block = expression.projectResult(block); stopwatch.stop(); std::cout << std::fixed << std::setprecision(2) diff --git a/dbms/src/Parsers/ParserSelectQuery.cpp b/dbms/src/Parsers/ParserSelectQuery.cpp index 3a5deeaec6f..28c1e8c2074 100644 --- a/dbms/src/Parsers/ParserSelectQuery.cpp +++ b/dbms/src/Parsers/ParserSelectQuery.cpp @@ -1,4 +1,5 @@ #include +#include #include #include #include @@ -65,6 +66,9 @@ bool ParserSelectQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, String & ex ws.ignore(pos, end); } + + dynamic_cast(*select_query->database).kind = ASTIdentifier::Database; + dynamic_cast(*select_query->table).kind = ASTIdentifier::Table; } /// WHERE expr diff --git a/dbms/src/Storages/StorageLog.cpp b/dbms/src/Storages/StorageLog.cpp index 60356368d0e..878cd24c58f 100644 --- a/dbms/src/Storages/StorageLog.cpp +++ b/dbms/src/Storages/StorageLog.cpp @@ -84,7 +84,7 @@ StorageLog::StorageLog(const std::string & path_, const std::string & name_, Sha } -SharedPtr StorageLog::read( +BlockInputStreamPtr StorageLog::read( const Names & column_names, ASTPtr query, size_t max_block_size) @@ -94,7 +94,7 @@ SharedPtr StorageLog::read( } -SharedPtr StorageLog::write( +BlockOutputStreamPtr StorageLog::write( ASTPtr query) { return new LogBlockOutputStream(*this); diff --git a/dbms/src/Storages/StorageSystemNumbers.cpp b/dbms/src/Storages/StorageSystemNumbers.cpp index 351e6533e32..0847b1d5195 100644 --- a/dbms/src/Storages/StorageSystemNumbers.cpp +++ b/dbms/src/Storages/StorageSystemNumbers.cpp @@ -46,7 +46,7 @@ StorageSystemNumbers::StorageSystemNumbers(const std::string & name_) } -SharedPtr StorageSystemNumbers::read( +BlockInputStreamPtr StorageSystemNumbers::read( const Names & column_names, ASTPtr query, size_t max_block_size) { check(column_names); diff --git a/dbms/src/Storages/StorageSystemOne.cpp b/dbms/src/Storages/StorageSystemOne.cpp new file mode 100644 index 00000000000..9e081768446 --- /dev/null +++ b/dbms/src/Storages/StorageSystemOne.cpp @@ -0,0 +1,51 @@ +#include +#include + +#include + +#include + +#include + + +namespace DB +{ + + +OneValueBlockInputStream::OneValueBlockInputStream() : has_been_read(false) +{ +} + + +Block OneValueBlockInputStream::read() +{ + Block res; + if (has_been_read) + return res; + + has_been_read = true; + ColumnWithNameAndType col; + col.name = "dummy"; + col.type = new DataTypeUInt8; + col.column = new ColumnConstUInt8(0, 1); + res.insert(col); + return res; +} + + +StorageSystemOne::StorageSystemOne(const std::string & name_) + : name(name_) +{ + columns["dummy"] = new DataTypeUInt8; +} + + +BlockInputStreamPtr StorageSystemOne::read( + const Names & column_names, ASTPtr query, size_t max_block_size) +{ + check(column_names); + return new OneValueBlockInputStream(); +} + + +}