diff --git a/dbms/include/DB/Columns/IColumn.h b/dbms/include/DB/Columns/IColumn.h index ec7e6de647e..342d3678630 100644 --- a/dbms/include/DB/Columns/IColumn.h +++ b/dbms/include/DB/Columns/IColumn.h @@ -34,6 +34,8 @@ public: /** Количество значений в столбце. */ virtual size_t size() const = 0; + bool empty() const { return size() == 0; } + /** Получить значение n-го элемента. * Используется для преобразования из блоков в строки (например, при выводе значений в текстовый дамп) */ diff --git a/dbms/include/DB/Core/ErrorCodes.h b/dbms/include/DB/Core/ErrorCodes.h index 82fa14c4cde..d58f1a1ff16 100644 --- a/dbms/include/DB/Core/ErrorCodes.h +++ b/dbms/include/DB/Core/ErrorCodes.h @@ -65,7 +65,9 @@ namespace ErrorCodes TABLE_ALREADY_EXISTS, TABLE_METADATA_ALREADY_EXISTS, ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER, - UNKNOWN_TABLE + UNKNOWN_TABLE, + ONLY_FILTER_COLUMN_IN_BLOCK, + SYNTAX_ERROR, }; } diff --git a/dbms/src/DataStreams/FilterBlockInputStream.cpp b/dbms/src/DataStreams/FilterBlockInputStream.cpp index 731b4a0b03a..920746fb43b 100644 --- a/dbms/src/DataStreams/FilterBlockInputStream.cpp +++ b/dbms/src/DataStreams/FilterBlockInputStream.cpp @@ -6,42 +6,70 @@ namespace DB { + FilterBlockInputStream::FilterBlockInputStream(BlockInputStreamPtr input_, ssize_t filter_column_) : input(input_), filter_column(filter_column_) { } + Block FilterBlockInputStream::read() { - Block res = input->read(); - if (!res) - return res; - - if (filter_column < 0) - filter_column = static_cast(res.columns()) + filter_column; - - size_t columns = res.columns(); - ColumnPtr column = res.getByPosition(filter_column).column; - - ColumnConstUInt8 * column_const = dynamic_cast(&*column); - if (column_const) + /// Пока не встретится блок, после фильтрации которого что-нибудь останется, или поток не закончится. + while (1) { - return column_const->getData() - ? res - : Block(); + Block res = input->read(); + if (!res) + return res; + + /// Если кроме столбца с фильтром ничего нет. + if (res.columns() <= 1) + throw Exception("There is only filter column in block.", ErrorCodes::ONLY_FILTER_COLUMN_IN_BLOCK); + + if (filter_column < 0) + filter_column = static_cast(res.columns()) + filter_column; + + /// Любой столбец - не являющийся фильтром. + IColumn & any_not_filter_column = *res.getByPosition(filter_column == 0 ? 1 : 0).column; + + size_t columns = res.columns(); + ColumnPtr column = res.getByPosition(filter_column).column; + + ColumnConstUInt8 * column_const = dynamic_cast(&*column); + if (column_const) + { + return column_const->getData() + ? res + : Block(); + } + + ColumnUInt8 * column_vec = dynamic_cast(&*column); + if (!column_vec) + throw Exception("Illegal type " + column->getName() + " of column for filter. Must be ColumnUInt8 or ColumnConstUInt8.", ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER); + + IColumn::Filter & filter = column_vec->getData(); + + for (size_t i = 0; i < columns; ++i) + { + if (i != static_cast(filter_column)) + { + IColumn & current_column = *res.getByPosition(i).column; + current_column.filter(filter); + if (current_column.empty()) + break; + } + } + + /// Если текущий блок полностью отфильтровался - перейдём к следующему. + if (any_not_filter_column.empty()) + continue; + + /// Сам столбец с фильтром заменяем на столбец с константой 1, так как после фильтрации в нём ничего другого не останется. + res.getByPosition(filter_column).column = new ColumnConstUInt8(any_not_filter_column.size(), 1); + + return res; } - - ColumnUInt8 * column_vec = dynamic_cast(&*column); - if (!column_vec) - throw Exception("Illegal type " + column->getName() + " of column for filter. Must be ColumnUInt8 or ColumnConstUInt8.", ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER); - - IColumn::Filter & filter = column_vec->getData(); - - for (size_t i = 0; i < columns; ++i) - if (i != static_cast(filter_column)) - res.getByPosition(i).column->filter(filter); - - return res; } + } diff --git a/dbms/src/DataStreams/LimitBlockInputStream.cpp b/dbms/src/DataStreams/LimitBlockInputStream.cpp index e174a1f91ac..afb632f5c54 100644 --- a/dbms/src/DataStreams/LimitBlockInputStream.cpp +++ b/dbms/src/DataStreams/LimitBlockInputStream.cpp @@ -27,6 +27,8 @@ Block LimitBlockInputStream::read() do { res = input->read(); + if (!res) + return res; rows = res.rows(); pos += rows; } while (pos <= offset); diff --git a/dbms/src/DataStreams/tests/filter_stream_hitlog.cpp b/dbms/src/DataStreams/tests/filter_stream_hitlog.cpp index b81f286816a..73a9936b1b4 100644 --- a/dbms/src/DataStreams/tests/filter_stream_hitlog.cpp +++ b/dbms/src/DataStreams/tests/filter_stream_hitlog.cpp @@ -21,9 +21,9 @@ #include -#include +//#include #include -#include +//#include #include #include @@ -106,15 +106,15 @@ int main(int argc, char ** argv) DB::Context context; - (*context.functions)["plus"] = new DB::FunctionPlus; +/* (*context.functions)["plus"] = new DB::FunctionPlus; (*context.functions)["minus"] = new DB::FunctionMinus; (*context.functions)["multiply"] = new DB::FunctionMultiply; (*context.functions)["divide"] = new DB::FunctionDivideFloating; (*context.functions)["intDiv"] = new DB::FunctionDivideIntegral; (*context.functions)["modulo"] = new DB::FunctionModulo; - +*/ (*context.functions)["equals"] = new DB::FunctionEquals; - (*context.functions)["notEquals"] = new DB::FunctionNotEquals; +/* (*context.functions)["notEquals"] = new DB::FunctionNotEquals; (*context.functions)["less"] = new DB::FunctionLess; (*context.functions)["greater"] = new DB::FunctionGreater; (*context.functions)["lessOrEquals"] = new DB::FunctionLessOrEquals; @@ -124,7 +124,7 @@ int main(int argc, char ** argv) (*context.functions)["or"] = new DB::FunctionOr; (*context.functions)["xor"] = new DB::FunctionXor; (*context.functions)["not"] = new DB::FunctionNot; - +*/ for (NamesAndTypesList::const_iterator it = names_and_types_list.begin(); it != names_and_types_list.end(); ++it) { names_and_types_map->insert(*it); @@ -172,7 +172,7 @@ int main(int argc, char ** argv) in = new DB::ExpressionBlockInputStream(profiling, expression); in = new DB::ProjectionBlockInputStream(in, expression); in = new DB::FilterBlockInputStream(in, 4); - //in = new DB::LimitBlockInputStream(in, 10, std::max(static_cast(0), static_cast(n) - 10)); + //in = new DB::LimitBlockInputStream(in, 10); DB::WriteBufferFromOStream ob(std::cout); DB::TabSeparatedRowOutputStream out(ob, new DB::DataTypes(expression->getReturnTypes())); diff --git a/dbms/src/Interpreters/Expression.cpp b/dbms/src/Interpreters/Expression.cpp index 4f5ae267fac..637aa75ccae 100644 --- a/dbms/src/Interpreters/Expression.cpp +++ b/dbms/src/Interpreters/Expression.cpp @@ -147,11 +147,8 @@ void Expression::executeImpl(ASTPtr ast, Block & block, unsigned part_id) /** Столбцы из таблицы уже загружены в блок. * Вычисление состоит в добавлении в блок новых столбцов - констант и результатов вычислений функций. */ - if (ASTFunction * node = dynamic_cast(&*ast)) { - //std::cerr << node->getTreeID() << std::endl; - /// Вставляем в блок столбцы - результаты вычисления функции ColumnNumbers argument_numbers; ColumnNumbers & result_numbers = node->return_column_numbers; @@ -184,10 +181,6 @@ void Expression::executeImpl(ASTPtr ast, Block & block, unsigned part_id) } else if (ASTLiteral * node = dynamic_cast(&*ast)) { - //std::cerr << node->getTreeID() << std::endl; - - /// Вставляем в блок столбец - константу - ColumnWithNameAndType column; column.column = node->type->createConstColumn(block.rows(), node->value); column.type = node->type; @@ -210,6 +203,7 @@ Block Expression::projectResult(Block & block, unsigned part_id) void Expression::collectFinalColumns(ASTPtr ast, Block & src, Block & dst, unsigned part_id) { + /// Обход в глубину, который не заходит внутрь функций. if (ast->part_id != part_id) { if (!dynamic_cast(&*ast)) @@ -244,21 +238,19 @@ DataTypes Expression::getReturnTypes() void Expression::getReturnTypesImpl(ASTPtr ast, DataTypes & res) { - if (ASTExpressionList * node = dynamic_cast(&*ast)) + /// Обход в глубину, который не заходит внутрь функций. + if (ASTIdentifier * ident = dynamic_cast(&*ast)) { - for (ASTs::iterator it = node->children.begin(); it != node->children.end(); ++it) - { - if (ASTIdentifier * ident = dynamic_cast(&**it)) - res.push_back(ident->type); - else if (ASTFunction * func = dynamic_cast(&**it)) - res.insert(res.end(), func->return_types.begin(), func->return_types.end()); - else if (ASTLiteral * lit = dynamic_cast(&**it)) - res.push_back(lit->type); - } + if (ident->kind == ASTIdentifier::Column) + res.push_back(ident->type); } + else if (ASTLiteral * lit = dynamic_cast(&*ast)) + res.push_back(lit->type); + else if (ASTFunction * func = dynamic_cast(&*ast)) + res.insert(res.end(), func->return_types.begin(), func->return_types.end()); else for (ASTs::iterator it = ast->children.begin(); it != ast->children.end(); ++it) - getReturnTypesImpl(*it, res); + getReturnTypesImpl(*it, res); } diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index 8039872c344..b8ac5772b6d 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -5,6 +5,7 @@ #include #include +#include #include #include @@ -47,7 +48,7 @@ StoragePtr InterpreterSelectQuery::getTable() if (context.databases->end() == context.databases->find(database_name) || (*context.databases)[database_name].end() == (*context.databases)[database_name].find(table_name)) - throw Exception("Unknown table " + table_name + " in database " + database_name, ErrorCodes::UNKNOWN_TABLE); + throw Exception("Unknown table '" + table_name + "' in database '" + database_name + "'", ErrorCodes::UNKNOWN_TABLE); return (*context.databases)[database_name][table_name]; } @@ -71,7 +72,13 @@ BlockInputStreamPtr InterpreterSelectQuery::execute() context.columns = table->getColumns(); Poco::SharedPtr expression = new Expression(query_ptr, context); - BlockInputStreamPtr stream = table->read(expression->getRequiredColumns(), query_ptr, max_block_size); + Names required_columns = expression->getRequiredColumns(); + + /// Если не указан ни один столбец из таблицы, то будем читать первый попавшийся (чтобы хотя бы знать число строк). + if (required_columns.empty()) + required_columns.push_back(table->getColumns().begin()->first); + + BlockInputStreamPtr stream = table->read(required_columns, query_ptr, max_block_size); /// Если есть условие WHERE - сначала выполним часть выражения, необходимую для его вычисления if (query.where_expression) @@ -82,11 +89,21 @@ BlockInputStreamPtr InterpreterSelectQuery::execute() } /// Выполним оставшуюся часть выражения - stream = new ExpressionBlockInputStream(stream, expression); - setPartID(query.select_expression_list, PART_SELECT); + stream = new ExpressionBlockInputStream(stream, expression, PART_SELECT); stream = new ProjectionBlockInputStream(stream, expression, PART_SELECT); + /// Если есть LIMIT + if (query.limit_length) + { + size_t limit_length = boost::get(dynamic_cast(*query.limit_length).value); + size_t limit_offset = 0; + if (query.limit_offset) + limit_offset = boost::get(dynamic_cast(*query.limit_offset).value); + + stream = new LimitBlockInputStream(stream, limit_length, limit_offset); + } + return stream; } diff --git a/dbms/src/Interpreters/tests/select_query.cpp b/dbms/src/Interpreters/tests/select_query.cpp index 8f48bb97e41..bc34b971c1b 100644 --- a/dbms/src/Interpreters/tests/select_query.cpp +++ b/dbms/src/Interpreters/tests/select_query.cpp @@ -10,15 +10,17 @@ #include #include +#include +#include #include #include #include -//#include +#include #include -//#include +#include #include #include @@ -108,15 +110,15 @@ int main(int argc, char ** argv) DB::Context context; -/* (*context.functions)["plus"] = new DB::FunctionPlus; + (*context.functions)["plus"] = new DB::FunctionPlus; (*context.functions)["minus"] = new DB::FunctionMinus; (*context.functions)["multiply"] = new DB::FunctionMultiply; (*context.functions)["divide"] = new DB::FunctionDivideFloating; (*context.functions)["intDiv"] = new DB::FunctionDivideIntegral; (*context.functions)["modulo"] = new DB::FunctionModulo; -*/ + (*context.functions)["equals"] = new DB::FunctionEquals; -/* (*context.functions)["notEquals"] = new DB::FunctionNotEquals; + (*context.functions)["notEquals"] = new DB::FunctionNotEquals; (*context.functions)["less"] = new DB::FunctionLess; (*context.functions)["greater"] = new DB::FunctionGreater; (*context.functions)["lessOrEquals"] = new DB::FunctionLessOrEquals; @@ -126,29 +128,41 @@ int main(int argc, char ** argv) (*context.functions)["or"] = new DB::FunctionOr; (*context.functions)["xor"] = new DB::FunctionXor; (*context.functions)["not"] = new DB::FunctionNot; -*/ - (*context.databases)["default"]["hits"] = new DB::StorageLog("./", "hits", names_and_types_map, ".bin"); + + (*context.databases)["default"]["hits"] = new DB::StorageLog("./", "hits", names_and_types_map, ".bin"); + (*context.databases)["system"]["one"] = new DB::StorageSystemOne("one"); + (*context.databases)["system"]["numbers"] = new DB::StorageSystemNumbers("numbers"); context.current_database = "default"; DB::ParserSelectQuery parser; DB::ASTPtr ast; - std::string input = "SELECT UniqID, URL, CounterID, IsLink FROM hits WHERE URL = 'http://mail.yandex.ru/neo2/#inbox'"; + std::string input;/* = + "SELECT UniqID, URL, CounterID, IsLink " + "FROM hits " + "WHERE URL = 'http://mail.yandex.ru/neo2/#inbox' " + "LIMIT 10";*/ + std::stringstream str; + str << std::cin.rdbuf(); + input = str.str(); + std::string expected; const char * begin = input.data(); const char * end = begin + input.size(); const char * pos = begin; - if (!parser.parse(pos, end, ast, expected)) - { - std::cout << "Failed at position " << (pos - begin) << ": " - << mysqlxx::quote << input.substr(pos - begin, 10) - << ", expected " << expected << "." << std::endl; - } + bool parse_res = parser.parse(pos, end, ast, expected); -/* DB::formatAST(*ast, std::cerr); + if (!parse_res || pos != end) + throw DB::Exception("Syntax error: failed at position " + + Poco::NumberFormatter::format(pos - begin) + ": " + + input.substr(pos - begin, 10) + + ", expected " + (parse_res ? "end of data" : expected) + ".", + DB::ErrorCodes::SYNTAX_ERROR); + + DB::formatAST(*ast, std::cerr); std::cerr << std::endl; - std::cerr << ast->getTreeID() << std::endl; +/* std::cerr << ast->getTreeID() << std::endl; */ DB::InterpreterSelectQuery interpreter(ast, context); DB::BlockInputStreamPtr in = interpreter.execute(); diff --git a/dbms/src/Storages/StorageSystemOne.cpp b/dbms/src/Storages/StorageSystemOne.cpp index 9e081768446..8c3e6ff2414 100644 --- a/dbms/src/Storages/StorageSystemOne.cpp +++ b/dbms/src/Storages/StorageSystemOne.cpp @@ -27,7 +27,7 @@ Block OneValueBlockInputStream::read() ColumnWithNameAndType col; col.name = "dummy"; col.type = new DataTypeUInt8; - col.column = new ColumnConstUInt8(0, 1); + col.column = new ColumnConstUInt8(1, 0); res.insert(col); return res; }