dbms: development [#CONV-2944].

This commit is contained in:
Alexey Milovidov 2011-09-25 03:37:09 +00:00
parent 5c8b01da3a
commit b979162b00
17 changed files with 178 additions and 67 deletions

View File

@ -71,6 +71,7 @@ namespace ErrorCodes
UNKNOWN_AGGREGATE_FUNCTION,
CANNOT_READ_AGGREGATE_FUNCTION_FROM_TEXT,
CANNOT_WRITE_AGGREGATE_FUNCTION_AS_TEXT,
NOT_A_COLUMN,
};
}

View File

@ -20,16 +20,16 @@ class AggregatingBlockInputStream : public IProfilingBlockInputStream
{
public:
AggregatingBlockInputStream(BlockInputStreamPtr input_, const ColumnNumbers & keys_, AggregateDescriptions & aggregates_)
: input(input_), aggregator(keys_, aggregates_), has_been_read(false)
: input(input_), aggregator(new Aggregator(keys_, aggregates_)), has_been_read(false)
{
children.push_back(input);
}
/** keys берутся из Expression::PART_GROUP
/** keys берутся из GROUP BY части запроса
* Агрегатные функции ищутся везде в выражении.
* Столбцы, соответствующие keys и аргументам агрегатных функций, уже должны быть вычислены.
*/
// AggregatingBlockInputStream(BlockInputStreamPtr input_, SharedPtr<Expression> expression);
AggregatingBlockInputStream(BlockInputStreamPtr input_, SharedPtr<Expression> expression);
Block readImpl();
@ -37,7 +37,7 @@ public:
private:
BlockInputStreamPtr input;
Aggregator aggregator;
SharedPtr<Aggregator> aggregator;
bool has_been_read;
};

View File

@ -16,14 +16,18 @@ using Poco::SharedPtr;
* Выражение состоит из идентификаторов столбцов из блока, констант, обычных функций.
* Например: hits * 2 + 3, instr("yandex", url)
* Выражение не меняет количество строк в потоке, и обрабатывает каждую строку независимо от других.
* is_first - если часть вычислений является первой для блока.
* В этом случае, перед обработкой каждого блока, сбрасываются флаги, что элемент в дереве запроса уже был вычислен.
* При вложенных применениях нескольких ExpressionBlockInputStream, нужно указать is_first только у первого,
* чтобы у последующих не дублировались вычисления.
* part_id - идентификатор части выражения, которую надо вычислять.
* Например, может потребоваться вычислить только часть выражения в секции WHERE.
*/
class ExpressionBlockInputStream : public IProfilingBlockInputStream
{
public:
ExpressionBlockInputStream(BlockInputStreamPtr input_, SharedPtr<Expression> expression_, unsigned part_id_ = 0)
: input(input_), expression(expression_), part_id(part_id_)
ExpressionBlockInputStream(BlockInputStreamPtr input_, SharedPtr<Expression> expression_, bool is_first_ = true, unsigned part_id_ = 0)
: input(input_), expression(expression_), is_first(is_first_), part_id(part_id_)
{
children.push_back(input);
}
@ -34,6 +38,9 @@ public:
if (!res)
return res;
if (is_first)
expression->setNotCalculated(part_id);
expression->execute(res, part_id);
return res;
}
@ -43,6 +50,7 @@ public:
private:
BlockInputStreamPtr input;
SharedPtr<Expression> expression;
bool is_first;
unsigned part_id;
};

View File

@ -22,8 +22,9 @@ public:
BlockInputStreamPtr input_,
SharedPtr<Expression> expression_,
bool without_duplicates_ = false,
unsigned part_id_ = 0)
: input(input_), expression(expression_), without_duplicates(without_duplicates_), part_id(part_id_)
unsigned part_id_ = 0,
ASTPtr subtree_ = NULL)
: input(input_), expression(expression_), without_duplicates(without_duplicates_), part_id(part_id_), subtree(subtree_)
{
children.push_back(input);
}
@ -34,7 +35,7 @@ public:
if (!res)
return res;
return expression->projectResult(res, without_duplicates, part_id);
return expression->projectResult(res, without_duplicates, part_id, subtree);
}
String getName() const { return "ProjectionBlockInputStream"; }
@ -44,6 +45,7 @@ private:
SharedPtr<Expression> expression;
bool without_duplicates;
unsigned part_id;
ASTPtr subtree;
};
}

View File

@ -15,6 +15,7 @@ struct AggregateDescription
AggregateFunctionPtr function;
ColumnNumbers arguments;
Names argument_names; /// Используются, если arguments не заданы.
String column_name; /// Какое имя использовать для столбца со значениями агрегатной функции
};
typedef std::vector<AggregateDescription> AggregateDescriptions;

View File

@ -29,6 +29,11 @@ public:
*/
Names getRequiredColumns();
/** Прописать во всех узлах, что они ещё не вычислены.
* Вызывайте в начале серии вычислений, для каждого блока.
*/
void setNotCalculated(unsigned part_id = 0, ASTPtr subtree = NULL);
/** Выполнить выражение над блоком. Блок должен содержать все столбцы - идентификаторы.
* Функция добавляет в блок новые столбцы - результаты вычислений.
* part_id - какую часть выражения вычислять.
@ -38,7 +43,7 @@ public:
/** Взять из блока с промежуточными результатами вычислений только столбцы, представляющие собой конечный результат.
* Вернуть новый блок, в котором эти столбцы расположены в правильном порядке.
*/
Block projectResult(Block & block, bool without_duplicates = false, unsigned part_id = 0);
Block projectResult(Block & block, bool without_duplicates = false, unsigned part_id = 0, ASTPtr subtree = NULL);
/** Получить список типов столбцов результата.
*/
@ -48,6 +53,15 @@ public:
*/
void getAggregateInfo(Names & key_names, AggregateDescriptions & aggregates);
/** Есть ли в выражении агрегатные функции.
*/
bool hasAggregates();
/** Пометить то, что должно быть вычислено до агрегирования одним part_id,
* а то, что должно быть вычислено после агрегирования, а также сами агрегатные функции - другим part_id.
*/
void markBeforeAndAfterAggregation(unsigned before_part_id, unsigned after_part_id);
private:
ASTPtr ast;
const Context & context;
@ -78,17 +92,17 @@ private:
void glueTreeImpl(ASTPtr ast, Subtrees & Subtrees);
/** Прописать во всех узлах, что они ещё не вычислены.
*/
void setNotCalculated(ASTPtr ast, unsigned part_id);
void executeImpl(ASTPtr ast, Block & block, unsigned part_id);
void collectFinalColumns(ASTPtr ast, Block & src, Block & dst, bool without_duplicates, unsigned part_id);
void getReturnTypesImpl(ASTPtr ast, DataTypes & res);
void getAggregateInfoImpl(ASTPtr ast, Names & key_names, AggregateDescriptions & aggregates);
void getAggregateInfoImpl(ASTPtr ast, Names & key_names, AggregateDescriptions & aggregates, NamesSet & processed);
bool hasAggregatesImpl(ASTPtr ast);
void markBeforeAndAfterAggregationImpl(ASTPtr ast, unsigned before_part_id, unsigned after_part_id, bool below = false);
};

View File

@ -35,8 +35,8 @@ private:
PART_GROUP = 8,
PART_HAVING = 16,
PART_ORDER = 32,
PART_BELOW_AGGREGATE_FUNCTIONS = 64,
PART_ABOVE_AGGREGATE_FUNCTIONS = 128,
PART_BEFORE_AGGREGATING = 64, /// Под агрегатной функцией, или в ветке, не содержащей агрегатных функций
PART_AFTER_AGGREGATING = 128,
};

View File

@ -30,6 +30,8 @@ public:
ASTFunction() {}
ASTFunction(StringRange range_) : IAST(range_) {}
String getColumnName() { return getTreeID(); }
/** Получить текст, который идентифицирует этот элемент. */
String getID() { return "Function_" + name; }
};

View File

@ -30,6 +30,8 @@ public:
ASTIdentifier() {}
ASTIdentifier(StringRange range_, const String & name_, Kind kind_ = Column) : IAST(range_), name(name_), kind(kind_) {}
String getColumnName() { return name; }
/** Получить текст, который идентифицирует этот элемент. */
String getID() { return "Identifier_" + name; }

View File

@ -20,6 +20,8 @@ public:
ASTLiteral() {}
ASTLiteral(StringRange range_, const Field & value_) : IAST(range_), value(value_) {}
String getColumnName() { return getTreeID(); }
/** Получить текст, который идентифицирует этот элемент. */
String getID() { return "Literal_" + boost::apply_visitor(FieldVisitorDump(), value); }

View File

@ -6,6 +6,8 @@
#include <Poco/SharedPtr.h>
#include <DB/Core/Types.h>
#include <DB/Core/Exception.h>
#include <DB/Core/ErrorCodes.h>
#include <DB/Parsers/StringRange.h>
@ -38,6 +40,9 @@ public:
IAST() : range(NULL, NULL), calculated(false), part_id(0) {}
IAST(StringRange range_) : range(range_), calculated(false), part_id(0) {}
virtual ~IAST() {}
/** Получить каноническое имя столбца, если элемент является столбцом */
virtual String getColumnName() { throw Exception("Trying to get name of not a column", ErrorCodes::NOT_A_COLUMN); }
/** Получить текст, который идентифицирует этот элемент. */
virtual String getID() = 0;
@ -64,6 +69,14 @@ public:
return s.str();
}
void dumpTree(std::ostream & ostr, size_t indent = 0)
{
String indent_str(indent, '-');
ostr << indent_str << getID() << ", " << this << ", part_id = " << part_id << ", calculated = " << calculated << std::endl;
for (ASTs::iterator it = children.begin(); it != children.end(); ++it)
(*it)->dumpTree(ostr, indent + 1);
}
};

View File

@ -5,13 +5,16 @@ namespace DB
{
/*AggregatingBlockInputStream::AggregatingBlockInputStream(BlockInputStreamPtr input_, SharedPtr<Expression> expression)
AggregatingBlockInputStream::AggregatingBlockInputStream(BlockInputStreamPtr input_, SharedPtr<Expression> expression)
: input(input_), has_been_read(false)
{
children.push_back(input);
}*/
Names key_names;
AggregateDescriptions aggregates;
expression->getAggregateInfo(key_names, aggregates);
aggregator = new Aggregator(key_names, aggregates);
}
@ -22,8 +25,8 @@ Block AggregatingBlockInputStream::readImpl()
has_been_read = true;
AggregatedData data = aggregator.execute(input);
Block res = aggregator.getSampleBlock();
AggregatedData data = aggregator->execute(input);
Block res = aggregator->getSampleBlock();
for (AggregatedData::const_iterator it = data.begin(); it != data.end(); ++it)
{

View File

@ -43,7 +43,14 @@ Block IProfilingBlockInputStream::read()
if (res)
info.update(res);
if (res)
{
std::cerr << std::endl;
std::cerr << getName() << std::endl;
getInfo().print(std::cerr);
}
return res;
}

View File

@ -17,7 +17,7 @@ AggregatedData Aggregator::execute(BlockInputStreamPtr stream)
{
AggregatedData res;
size_t keys_size = keys.size();
size_t keys_size = keys.empty() ? key_names.size() : keys.size();
size_t aggregates_size = aggregates.size();
Row key(keys_size);
Columns key_columns(keys_size);
@ -64,15 +64,7 @@ AggregatedData Aggregator::execute(BlockInputStreamPtr stream)
for (size_t i = 0; i < aggregates_size; ++i)
{
ColumnWithNameAndType col;
col.name = aggregates[i].function->getName() + "(";
for (size_t j = 0; j < aggregate_columns[i].size(); ++j)
{
if (j != 0)
col.name += ",";
col.name += block.getByPosition(aggregates[i].arguments[j]).name;
}
col.name += ")";
col.name = aggregates[i].column_name;
col.type = new DataTypeAggregateFunction;
col.column = new ColumnAggregateFunction;

View File

@ -14,15 +14,6 @@ namespace DB
{
static std::string getName(ASTPtr & ast)
{
if (ASTIdentifier * ident = dynamic_cast<ASTIdentifier *>(&*ast))
return ident->name;
else
return ast->getTreeID();
}
void Expression::addSemantic(ASTPtr & ast)
{
if (dynamic_cast<ASTAsterisk *>(&*ast))
@ -144,18 +135,18 @@ Names Expression::getRequiredColumns()
}
void Expression::setNotCalculated(ASTPtr ast, unsigned part_id)
void Expression::setNotCalculated(unsigned part_id, ASTPtr subtree)
{
if ((ast->part_id & part_id) || (ast->part_id == 0 && part_id == 0))
ast->calculated = false;
for (ASTs::iterator it = ast->children.begin(); it != ast->children.end(); ++it)
setNotCalculated(*it, part_id);
if (!subtree)
subtree = ast;
for (ASTs::iterator it = subtree->children.begin(); it != subtree->children.end(); ++it)
setNotCalculated(part_id, *it);
}
void Expression::execute(Block & block, unsigned part_id)
{
setNotCalculated(ast, part_id);
executeImpl(ast, block, part_id);
}
@ -182,14 +173,14 @@ void Expression::executeImpl(ASTPtr ast, Block & block, unsigned part_id)
ColumnWithNameAndType column;
column.type = node->return_type;
column.name = getName(ast);
column.name = node->getColumnName();
size_t result_number = block.columns();
block.insert(column);
ASTs arguments = node->arguments->children;
for (ASTs::iterator it = arguments.begin(); it != arguments.end(); ++it)
argument_numbers.push_back(block.getPositionByName(getName(*it)));
argument_numbers.push_back(block.getPositionByName((*it)->getColumnName()));
node->function->execute(block, argument_numbers, result_number);
}
@ -199,7 +190,7 @@ void Expression::executeImpl(ASTPtr ast, Block & block, unsigned part_id)
ColumnWithNameAndType column;
column.column = node->type->createConstColumn(block.rows(), node->value);
column.type = node->type;
column.name = getName(ast);
column.name = node->getColumnName();
block.insert(column);
}
@ -208,10 +199,10 @@ void Expression::executeImpl(ASTPtr ast, Block & block, unsigned part_id)
}
Block Expression::projectResult(Block & block, bool without_duplicates, unsigned part_id)
Block Expression::projectResult(Block & block, bool without_duplicates, unsigned part_id, ASTPtr subtree)
{
Block res;
collectFinalColumns(ast, block, res, without_duplicates, part_id);
collectFinalColumns(subtree ? subtree : ast, block, res, without_duplicates, part_id);
return res;
}
@ -230,10 +221,10 @@ void Expression::collectFinalColumns(ASTPtr ast, Block & src, Block & dst, bool
if (ASTIdentifier * ident = dynamic_cast<ASTIdentifier *>(&*ast))
{
if (ident->kind == ASTIdentifier::Column)
without_duplicates ? dst.insertUnique(src.getByName(getName(ast))) : dst.insert(src.getByName(getName(ast)));
without_duplicates ? dst.insertUnique(src.getByName(ast->getColumnName())) : dst.insert(src.getByName(ast->getColumnName()));
}
else if (dynamic_cast<ASTLiteral *>(&*ast) || dynamic_cast<ASTFunction *>(&*ast))
without_duplicates ? dst.insertUnique(src.getByName(getName(ast))) : dst.insert(src.getByName(getName(ast)));
without_duplicates ? dst.insertUnique(src.getByName(ast->getColumnName())) : dst.insert(src.getByName(ast->getColumnName()));
else
for (ASTs::iterator it = ast->children.begin(); it != ast->children.end(); ++it)
collectFinalColumns(*it, src, dst, without_duplicates, part_id);
@ -266,7 +257,7 @@ void Expression::getReturnTypesImpl(ASTPtr ast, DataTypes & res)
}
void Expression::getAggregateInfoImpl(ASTPtr ast, Names & key_names, AggregateDescriptions & aggregates)
void Expression::getAggregateInfoImpl(ASTPtr ast, Names & key_names, AggregateDescriptions & aggregates, NamesSet & processed)
{
/// Обход в глубину
if (ASTSelectQuery * select = dynamic_cast<ASTSelectQuery *>(&*ast))
@ -278,37 +269,81 @@ void Expression::getAggregateInfoImpl(ASTPtr ast, Names & key_names, AggregateDe
if (ASTIdentifier * ident = dynamic_cast<ASTIdentifier *>(&**it))
{
if (ident->kind == ASTIdentifier::Column)
key_names.push_back(getName(*it));
key_names.push_back((*it)->getColumnName());
}
else if (dynamic_cast<ASTLiteral *>(&**it) || dynamic_cast<ASTFunction *>(&**it))
key_names.push_back(getName(*it));
key_names.push_back((*it)->getColumnName());
}
}
}
if (ASTFunction * func = dynamic_cast<ASTFunction *>(&*ast))
{
if (func->aggregate_function)
if (func->aggregate_function && processed.end() == processed.find(ast->getColumnName()))
{
AggregateDescription desc;
desc.function = func->aggregate_function;
desc.column_name = ast->getColumnName();
for (ASTs::iterator it = func->arguments->children.begin(); it != func->arguments->children.end(); ++it)
desc.argument_names.push_back(getName(*it));
desc.argument_names.push_back((*it)->getColumnName());
aggregates.push_back(desc);
processed.insert(ast->getColumnName());
}
}
for (ASTs::iterator it = ast->children.begin(); it != ast->children.end(); ++it)
getAggregateInfoImpl(*it, key_names, aggregates);
getAggregateInfoImpl(*it, key_names, aggregates, processed);
}
void Expression::getAggregateInfo(Names & key_names, AggregateDescriptions & aggregates)
{
getAggregateInfoImpl(ast, key_names, aggregates);
NamesSet processed;
getAggregateInfoImpl(ast, key_names, aggregates, processed);
}
bool Expression::hasAggregatesImpl(ASTPtr ast)
{
if (ASTFunction * func = dynamic_cast<ASTFunction *>(&*ast))
if (func->aggregate_function)
return true;
for (ASTs::iterator it = ast->children.begin(); it != ast->children.end(); ++it)
if (hasAggregatesImpl(*it))
return true;
return false;
}
bool Expression::hasAggregates()
{
return hasAggregatesImpl(ast);
}
void Expression::markBeforeAndAfterAggregationImpl(ASTPtr ast, unsigned before_part_id, unsigned after_part_id, bool below)
{
if (ASTFunction * func = dynamic_cast<ASTFunction *>(&*ast))
if (func->aggregate_function)
below = true;
if (below)
ast->part_id |= before_part_id;
else
ast->part_id |= after_part_id;
for (ASTs::iterator it = ast->children.begin(); it != ast->children.end(); ++it)
markBeforeAndAfterAggregationImpl(*it, before_part_id, after_part_id, below);
}
void Expression::markBeforeAndAfterAggregation(unsigned before_part_id, unsigned after_part_id)
{
markBeforeAndAfterAggregationImpl(ast, before_part_id, after_part_id);
}
}

View File

@ -4,6 +4,8 @@
#include <DB/DataStreams/LimitBlockInputStream.h>
#include <DB/DataStreams/PartialSortingBlockInputStream.h>
#include <DB/DataStreams/MergeSortingBlockInputStream.h>
#include <DB/DataStreams/AggregatingBlockInputStream.h>
#include <DB/DataStreams/FinalizingAggregatedBlockInputStream.h>
#include <DB/Parsers/ASTSelectQuery.h>
#include <DB/Parsers/ASTIdentifier.h>
@ -91,31 +93,56 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
limit_offset = boost::get<UInt64>(dynamic_cast<ASTLiteral &>(*query.limit_offset).value);
}
bool has_aggregates = expression->hasAggregates();
/** Оптимизация - если не указаны WHERE, GROUP, HAVING, ORDER, но указан LIMIT, и limit + offset < max_block_size,
* то в качестве размера блока будем использовать limit + offset (чтобы не читать из таблицы больше, чем запрошено).
*/
size_t block_size = max_block_size;
if (!query.where_expression && !query.group_expression_list && !query.having_expression && !query.order_expression_list
&& query.limit_length && limit_length + limit_offset < block_size)
&& query.limit_length && !has_aggregates && limit_length + limit_offset < block_size)
{
block_size = limit_length + limit_offset;
}
BlockInputStreamPtr stream = table->read(required_columns, query_ptr, block_size);
bool is_first_expression = true;
/// Если есть условие WHERE - сначала выполним часть выражения, необходимую для его вычисления
if (query.where_expression)
{
setPartID(query.where_expression, PART_WHERE);
stream = new ExpressionBlockInputStream(stream, expression, PART_WHERE);
stream = new ExpressionBlockInputStream(stream, expression, is_first_expression, PART_WHERE);
is_first_expression = false;
}
if (query.where_expression)
{
stream = new FilterBlockInputStream(stream);
}
/// Если есть GROUP BY - сначала выполним часть выражения, необходимую для его вычисления
if (has_aggregates)
{
expression->markBeforeAndAfterAggregation(PART_BEFORE_AGGREGATING, PART_AFTER_AGGREGATING);
if (query.group_expression_list)
setPartID(query.group_expression_list, PART_GROUP);
stream = new ExpressionBlockInputStream(stream, expression, is_first_expression, PART_GROUP | PART_BEFORE_AGGREGATING);
stream = new AggregatingBlockInputStream(stream, expression);
stream = new FinalizingAggregatedBlockInputStream(stream);
is_first_expression = false;
}
/// Выполним оставшуюся часть выражения
setPartID(query.select_expression_list, PART_SELECT);
if (query.order_expression_list)
setPartID(query.order_expression_list, PART_ORDER);
stream = new ExpressionBlockInputStream(stream, expression, PART_SELECT | PART_ORDER);
stream = new ExpressionBlockInputStream(stream, expression, is_first_expression, PART_SELECT | PART_ORDER);
is_first_expression = false;
/// Оставим только столбцы, нужные для SELECT и ORDER BY части
stream = new ProjectionBlockInputStream(stream, expression, true, PART_SELECT | PART_ORDER);
@ -140,7 +167,7 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
stream = new MergeSortingBlockInputStream(stream, order_descr);
/// Оставим только столбцы, нужные для SELECT части
stream = new ProjectionBlockInputStream(stream, expression, false, PART_SELECT);
stream = new ProjectionBlockInputStream(stream, expression, false, PART_SELECT, query.select_expression_list);
}
/// Если есть LIMIT

View File

@ -130,6 +130,8 @@ int main(int argc, char ** argv)
(*context.functions)["xor"] = new DB::FunctionXor;
(*context.functions)["not"] = new DB::FunctionNot;
context.aggregate_function_factory = new DB::AggregateFunctionFactory;
(*context.databases)["default"]["hits"] = new DB::StorageLog("./", "hits", names_and_types_map, ".bin");
(*context.databases)["system"]["one"] = new DB::StorageSystemOne("one");
(*context.databases)["system"]["numbers"] = new DB::StorageSystemNumbers("numbers");