dbms: development [#CONV-2944].

This commit is contained in:
Alexey Milovidov 2011-08-28 08:02:11 +00:00
parent 420e68e7c7
commit 2d530f9b84
9 changed files with 131 additions and 74 deletions

View File

@ -34,6 +34,8 @@ public:
/** Number of values in the column. */
virtual size_t size() const = 0;
/** True when the column holds no values, i.e. size() is zero. */
bool empty() const
{
	return 0 == size();
}
/** Получить значение n-го элемента.
* Используется для преобразования из блоков в строки (например, при выводе значений в текстовый дамп)
*/

View File

@ -65,7 +65,9 @@ namespace ErrorCodes
TABLE_ALREADY_EXISTS,
TABLE_METADATA_ALREADY_EXISTS,
ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER,
UNKNOWN_TABLE
UNKNOWN_TABLE,
ONLY_FILTER_COLUMN_IN_BLOCK,
SYNTAX_ERROR,
};
}

View File

@ -6,42 +6,70 @@
namespace DB
{
/** Stores the source stream and the position of the filter column.
  * The position is kept exactly as passed in; read() rewrites a negative value
  * by adding the number of columns of the block it has just read.
  */
FilterBlockInputStream::FilterBlockInputStream(
	BlockInputStreamPtr input_,
	ssize_t filter_column_)
	: input(input_),
	filter_column(filter_column_)
{
}
/** Reads a block from `input` and filters its columns by the mask stored in the column at position `filter_column`.
  *
  * - Returns the block from `input` unchanged when it is empty (`!res`).
  * - Throws ONLY_FILTER_COLUMN_IN_BLOCK when the block has at most one column.
  * - Throws ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER when the filter column is neither ColumnUInt8 nor ColumnConstUInt8.
  *
  * NOTE(review): this span appears to contain both the pre-change and the post-change body of read()
  * with the diff markers lost (several statements occur twice) - confirm against the real source file
  * before relying on the statement order shown here.
  */
Block FilterBlockInputStream::read()
{
Block res = input->read();
if (!res)
return res;
/// A negative filter_column is rewritten by adding the number of columns in the block.
if (filter_column < 0)
filter_column = static_cast<ssize_t>(res.columns()) + filter_column;
size_t columns = res.columns();
ColumnPtr column = res.getByPosition(filter_column).column;
ColumnConstUInt8 * column_const = dynamic_cast<ColumnConstUInt8 *>(&*column);
if (column_const)
/// Until we meet a block that still has something left after filtering, or the stream ends.
while (1)
{
/// Constant filter: a non-zero value keeps the block as is, zero yields an empty Block.
return column_const->getData()
? res
: Block();
Block res = input->read();
if (!res)
return res;
/// If there is nothing in the block besides the filter column.
if (res.columns() <= 1)
throw Exception("There is only filter column in block.", ErrorCodes::ONLY_FILTER_COLUMN_IN_BLOCK);
/// A negative filter_column is rewritten by adding the number of columns in the block.
if (filter_column < 0)
filter_column = static_cast<ssize_t>(res.columns()) + filter_column;
/// Any column that is not the filter: position 1 if the filter is at position 0, otherwise position 0.
IColumn & any_not_filter_column = *res.getByPosition(filter_column == 0 ? 1 : 0).column;
size_t columns = res.columns();
ColumnPtr column = res.getByPosition(filter_column).column;
ColumnConstUInt8 * column_const = dynamic_cast<ColumnConstUInt8 *>(&*column);
if (column_const)
{
/// Constant filter: a non-zero value keeps the block as is, zero yields an empty Block.
return column_const->getData()
? res
: Block();
}
ColumnUInt8 * column_vec = dynamic_cast<ColumnUInt8 *>(&*column);
if (!column_vec)
throw Exception("Illegal type " + column->getName() + " of column for filter. Must be ColumnUInt8 or ColumnConstUInt8.", ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER);
IColumn::Filter & filter = column_vec->getData();
/// Apply the filter to every column except the filter column itself.
for (size_t i = 0; i < columns; ++i)
{
if (i != static_cast<size_t>(filter_column))
{
IColumn & current_column = *res.getByPosition(i).column;
current_column.filter(filter);
/// Stop filtering the remaining columns as soon as one column has become empty.
if (current_column.empty())
break;
}
}
/// If the current block has been filtered out completely, move on to the next one.
if (any_not_filter_column.empty())
continue;
/// Replace the filter column itself with a constant-1 column, since nothing else can remain in it after filtering.
res.getByPosition(filter_column).column = new ColumnConstUInt8(any_not_filter_column.size(), 1);
return res;
}
ColumnUInt8 * column_vec = dynamic_cast<ColumnUInt8 *>(&*column);
if (!column_vec)
throw Exception("Illegal type " + column->getName() + " of column for filter. Must be ColumnUInt8 or ColumnConstUInt8.", ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER);
IColumn::Filter & filter = column_vec->getData();
/// Apply the filter to every column except the filter column itself.
for (size_t i = 0; i < columns; ++i)
if (i != static_cast<size_t>(filter_column))
res.getByPosition(i).column->filter(filter);
return res;
}
}

View File

@ -27,6 +27,8 @@ Block LimitBlockInputStream::read()
do
{
res = input->read();
if (!res)
return res;
rows = res.rows();
pos += rows;
} while (pos <= offset);

View File

@ -21,9 +21,9 @@
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/Functions/FunctionsArithmetic.h>
//#include <DB/Functions/FunctionsArithmetic.h>
#include <DB/Functions/FunctionsComparison.h>
#include <DB/Functions/FunctionsLogical.h>
//#include <DB/Functions/FunctionsLogical.h>
#include <DB/Parsers/ParserSelectQuery.h>
#include <DB/Parsers/formatAST.h>
@ -106,15 +106,15 @@ int main(int argc, char ** argv)
DB::Context context;
(*context.functions)["plus"] = new DB::FunctionPlus;
/* (*context.functions)["plus"] = new DB::FunctionPlus;
(*context.functions)["minus"] = new DB::FunctionMinus;
(*context.functions)["multiply"] = new DB::FunctionMultiply;
(*context.functions)["divide"] = new DB::FunctionDivideFloating;
(*context.functions)["intDiv"] = new DB::FunctionDivideIntegral;
(*context.functions)["modulo"] = new DB::FunctionModulo;
*/
(*context.functions)["equals"] = new DB::FunctionEquals;
(*context.functions)["notEquals"] = new DB::FunctionNotEquals;
/* (*context.functions)["notEquals"] = new DB::FunctionNotEquals;
(*context.functions)["less"] = new DB::FunctionLess;
(*context.functions)["greater"] = new DB::FunctionGreater;
(*context.functions)["lessOrEquals"] = new DB::FunctionLessOrEquals;
@ -124,7 +124,7 @@ int main(int argc, char ** argv)
(*context.functions)["or"] = new DB::FunctionOr;
(*context.functions)["xor"] = new DB::FunctionXor;
(*context.functions)["not"] = new DB::FunctionNot;
*/
for (NamesAndTypesList::const_iterator it = names_and_types_list.begin(); it != names_and_types_list.end(); ++it)
{
names_and_types_map->insert(*it);
@ -172,7 +172,7 @@ int main(int argc, char ** argv)
in = new DB::ExpressionBlockInputStream(profiling, expression);
in = new DB::ProjectionBlockInputStream(in, expression);
in = new DB::FilterBlockInputStream(in, 4);
//in = new DB::LimitBlockInputStream(in, 10, std::max(static_cast<Int64>(0), static_cast<Int64>(n) - 10));
//in = new DB::LimitBlockInputStream(in, 10);
DB::WriteBufferFromOStream ob(std::cout);
DB::TabSeparatedRowOutputStream out(ob, new DB::DataTypes(expression->getReturnTypes()));

View File

@ -147,11 +147,8 @@ void Expression::executeImpl(ASTPtr ast, Block & block, unsigned part_id)
/** Столбцы из таблицы уже загружены в блок.
* Вычисление состоит в добавлении в блок новых столбцов - констант и результатов вычислений функций.
*/
if (ASTFunction * node = dynamic_cast<ASTFunction *>(&*ast))
{
//std::cerr << node->getTreeID() << std::endl;
/// Вставляем в блок столбцы - результаты вычисления функции
ColumnNumbers argument_numbers;
ColumnNumbers & result_numbers = node->return_column_numbers;
@ -184,10 +181,6 @@ void Expression::executeImpl(ASTPtr ast, Block & block, unsigned part_id)
}
else if (ASTLiteral * node = dynamic_cast<ASTLiteral *>(&*ast))
{
//std::cerr << node->getTreeID() << std::endl;
/// Вставляем в блок столбец - константу
ColumnWithNameAndType column;
column.column = node->type->createConstColumn(block.rows(), node->value);
column.type = node->type;
@ -210,6 +203,7 @@ Block Expression::projectResult(Block & block, unsigned part_id)
void Expression::collectFinalColumns(ASTPtr ast, Block & src, Block & dst, unsigned part_id)
{
/// Обход в глубину, который не заходит внутрь функций.
if (ast->part_id != part_id)
{
if (!dynamic_cast<ASTFunction *>(&*ast))
@ -244,21 +238,19 @@ DataTypes Expression::getReturnTypes()
/** Appends to `res` the data types found while walking the tree rooted at `ast`:
  * `type` of identifiers and literals, and `return_types` of functions.
  *
  * NOTE(review): this span appears to contain both the pre-change and the post-change body
  * with the diff markers lost (the ASTExpressionList loop and the duplicated recursive call
  * look like leftovers of the old version) - confirm against the real source file.
  */
void Expression::getReturnTypesImpl(ASTPtr ast, DataTypes & res)
{
if (ASTExpressionList * node = dynamic_cast<ASTExpressionList *>(&*ast))
/// Depth-first traversal that does not descend into functions.
if (ASTIdentifier * ident = dynamic_cast<ASTIdentifier *>(&*ast))
{
for (ASTs::iterator it = node->children.begin(); it != node->children.end(); ++it)
{
if (ASTIdentifier * ident = dynamic_cast<ASTIdentifier *>(&**it))
res.push_back(ident->type);
else if (ASTFunction * func = dynamic_cast<ASTFunction *>(&**it))
res.insert(res.end(), func->return_types.begin(), func->return_types.end());
else if (ASTLiteral * lit = dynamic_cast<ASTLiteral *>(&**it))
res.push_back(lit->type);
}
/// Only identifiers whose kind is ASTIdentifier::Column contribute a type.
if (ident->kind == ASTIdentifier::Column)
res.push_back(ident->type);
}
else if (ASTLiteral * lit = dynamic_cast<ASTLiteral *>(&*ast))
res.push_back(lit->type);
else if (ASTFunction * func = dynamic_cast<ASTFunction *>(&*ast))
res.insert(res.end(), func->return_types.begin(), func->return_types.end());
else
/// Any other node: recurse into its children.
for (ASTs::iterator it = ast->children.begin(); it != ast->children.end(); ++it)
getReturnTypesImpl(*it, res);
getReturnTypesImpl(*it, res);
}

View File

@ -5,6 +5,7 @@
#include <DB/Parsers/ASTSelectQuery.h>
#include <DB/Parsers/ASTIdentifier.h>
#include <DB/Parsers/ASTLiteral.h>
#include <DB/Interpreters/Expression.h>
#include <DB/Interpreters/InterpreterSelectQuery.h>
@ -47,7 +48,7 @@ StoragePtr InterpreterSelectQuery::getTable()
if (context.databases->end() == context.databases->find(database_name)
|| (*context.databases)[database_name].end() == (*context.databases)[database_name].find(table_name))
throw Exception("Unknown table " + table_name + " in database " + database_name, ErrorCodes::UNKNOWN_TABLE);
throw Exception("Unknown table '" + table_name + "' in database '" + database_name + "'", ErrorCodes::UNKNOWN_TABLE);
return (*context.databases)[database_name][table_name];
}
@ -71,7 +72,13 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
context.columns = table->getColumns();
Poco::SharedPtr<Expression> expression = new Expression(query_ptr, context);
BlockInputStreamPtr stream = table->read(expression->getRequiredColumns(), query_ptr, max_block_size);
Names required_columns = expression->getRequiredColumns();
/// Если не указан ни один столбец из таблицы, то будем читать первый попавшийся (чтобы хотя бы знать число строк).
if (required_columns.empty())
required_columns.push_back(table->getColumns().begin()->first);
BlockInputStreamPtr stream = table->read(required_columns, query_ptr, max_block_size);
/// Если есть условие WHERE - сначала выполним часть выражения, необходимую для его вычисления
if (query.where_expression)
@ -82,11 +89,21 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
}
/// Выполним оставшуюся часть выражения
stream = new ExpressionBlockInputStream(stream, expression);
setPartID(query.select_expression_list, PART_SELECT);
stream = new ExpressionBlockInputStream(stream, expression, PART_SELECT);
stream = new ProjectionBlockInputStream(stream, expression, PART_SELECT);
/// Если есть LIMIT
if (query.limit_length)
{
size_t limit_length = boost::get<UInt64>(dynamic_cast<ASTLiteral &>(*query.limit_length).value);
size_t limit_offset = 0;
if (query.limit_offset)
limit_offset = boost::get<UInt64>(dynamic_cast<ASTLiteral &>(*query.limit_offset).value);
stream = new LimitBlockInputStream(stream, limit_length, limit_offset);
}
return stream;
}

View File

@ -10,15 +10,17 @@
#include <DB/IO/WriteBufferFromOStream.h>
#include <DB/Storages/StorageLog.h>
#include <DB/Storages/StorageSystemNumbers.h>
#include <DB/Storages/StorageSystemOne.h>
#include <DB/DataStreams/TabSeparatedRowOutputStream.h>
#include <DB/DataStreams/copyData.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
//#include <DB/Functions/FunctionsArithmetic.h>
#include <DB/Functions/FunctionsArithmetic.h>
#include <DB/Functions/FunctionsComparison.h>
//#include <DB/Functions/FunctionsLogical.h>
#include <DB/Functions/FunctionsLogical.h>
#include <DB/Parsers/ParserSelectQuery.h>
#include <DB/Parsers/formatAST.h>
@ -108,15 +110,15 @@ int main(int argc, char ** argv)
DB::Context context;
/* (*context.functions)["plus"] = new DB::FunctionPlus;
(*context.functions)["plus"] = new DB::FunctionPlus;
(*context.functions)["minus"] = new DB::FunctionMinus;
(*context.functions)["multiply"] = new DB::FunctionMultiply;
(*context.functions)["divide"] = new DB::FunctionDivideFloating;
(*context.functions)["intDiv"] = new DB::FunctionDivideIntegral;
(*context.functions)["modulo"] = new DB::FunctionModulo;
*/
(*context.functions)["equals"] = new DB::FunctionEquals;
/* (*context.functions)["notEquals"] = new DB::FunctionNotEquals;
(*context.functions)["notEquals"] = new DB::FunctionNotEquals;
(*context.functions)["less"] = new DB::FunctionLess;
(*context.functions)["greater"] = new DB::FunctionGreater;
(*context.functions)["lessOrEquals"] = new DB::FunctionLessOrEquals;
@ -126,29 +128,41 @@ int main(int argc, char ** argv)
(*context.functions)["or"] = new DB::FunctionOr;
(*context.functions)["xor"] = new DB::FunctionXor;
(*context.functions)["not"] = new DB::FunctionNot;
*/
(*context.databases)["default"]["hits"] = new DB::StorageLog("./", "hits", names_and_types_map, ".bin");
(*context.databases)["default"]["hits"] = new DB::StorageLog("./", "hits", names_and_types_map, ".bin");
(*context.databases)["system"]["one"] = new DB::StorageSystemOne("one");
(*context.databases)["system"]["numbers"] = new DB::StorageSystemNumbers("numbers");
context.current_database = "default";
DB::ParserSelectQuery parser;
DB::ASTPtr ast;
std::string input = "SELECT UniqID, URL, CounterID, IsLink FROM hits WHERE URL = 'http://mail.yandex.ru/neo2/#inbox'";
std::string input;/* =
"SELECT UniqID, URL, CounterID, IsLink "
"FROM hits "
"WHERE URL = 'http://mail.yandex.ru/neo2/#inbox' "
"LIMIT 10";*/
std::stringstream str;
str << std::cin.rdbuf();
input = str.str();
std::string expected;
const char * begin = input.data();
const char * end = begin + input.size();
const char * pos = begin;
if (!parser.parse(pos, end, ast, expected))
{
std::cout << "Failed at position " << (pos - begin) << ": "
<< mysqlxx::quote << input.substr(pos - begin, 10)
<< ", expected " << expected << "." << std::endl;
}
bool parse_res = parser.parse(pos, end, ast, expected);
/* DB::formatAST(*ast, std::cerr);
if (!parse_res || pos != end)
throw DB::Exception("Syntax error: failed at position "
+ Poco::NumberFormatter::format(pos - begin) + ": "
+ input.substr(pos - begin, 10)
+ ", expected " + (parse_res ? "end of data" : expected) + ".",
DB::ErrorCodes::SYNTAX_ERROR);
DB::formatAST(*ast, std::cerr);
std::cerr << std::endl;
std::cerr << ast->getTreeID() << std::endl;
/* std::cerr << ast->getTreeID() << std::endl;
*/
DB::InterpreterSelectQuery interpreter(ast, context);
DB::BlockInputStreamPtr in = interpreter.execute();

View File

@ -27,7 +27,7 @@ Block OneValueBlockInputStream::read()
ColumnWithNameAndType col;
col.name = "dummy";
col.type = new DataTypeUInt8;
col.column = new ColumnConstUInt8(0, 1);
col.column = new ColumnConstUInt8(1, 0);
res.insert(col);
return res;
}