dbms: development [#CONV-2944].

This commit is contained in:
Alexey Milovidov 2011-08-28 02:22:23 +00:00
parent a9228f798a
commit 9b153918e7
23 changed files with 196 additions and 71 deletions

View File

@ -1,5 +1,4 @@
#ifndef DBMS_CORE_ERROR_CODES_H #pragma once
#define DBMS_CORE_ERROR_CODES_H
namespace DB namespace DB
@ -66,9 +65,8 @@ namespace ErrorCodes
TABLE_ALREADY_EXISTS, TABLE_ALREADY_EXISTS,
TABLE_METADATA_ALREADY_EXISTS, TABLE_METADATA_ALREADY_EXISTS,
ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER, ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER,
UNKNOWN_TABLE
}; };
} }
} }
#endif

View File

@ -20,7 +20,7 @@ using Poco::SharedPtr;
class ExpressionBlockInputStream : public IBlockInputStream class ExpressionBlockInputStream : public IBlockInputStream
{ {
public: public:
ExpressionBlockInputStream(SharedPtr<IBlockInputStream> input_, SharedPtr<Expression> expression_) ExpressionBlockInputStream(BlockInputStreamPtr input_, SharedPtr<Expression> expression_)
: input(input_), expression(expression_) {} : input(input_), expression(expression_) {}
Block read() Block read()
@ -30,11 +30,11 @@ public:
return res; return res;
expression->execute(res); expression->execute(res);
return res; return expression->projectResult(res);
} }
private: private:
SharedPtr<IBlockInputStream> input; BlockInputStreamPtr input;
SharedPtr<Expression> expression; SharedPtr<Expression> expression;
}; };

View File

@ -19,11 +19,11 @@ class FilterBlockInputStream : public IBlockInputStream
{ {
public: public:
/// filter_column_ - номер столбца с условиями фильтрации /// filter_column_ - номер столбца с условиями фильтрации
FilterBlockInputStream(SharedPtr<IBlockInputStream> input_, size_t filter_column_); FilterBlockInputStream(BlockInputStreamPtr input_, size_t filter_column_);
Block read(); Block read();
private: private:
SharedPtr<IBlockInputStream> input; BlockInputStreamPtr input;
size_t filter_column; size_t filter_column;
}; };

View File

@ -1,5 +1,6 @@
#ifndef DBMS_DATA_STREAMS_IBLOCKINPUTSTREAM_H #pragma once
#define DBMS_DATA_STREAMS_IBLOCKINPUTSTREAM_H
#include <Poco/SharedPtr.h>
#include <DB/Core/Block.h> #include <DB/Core/Block.h>
@ -7,6 +8,9 @@
namespace DB namespace DB
{ {
using Poco::SharedPtr;
/** Интерфейс потока для чтения данных по блокам из БД. /** Интерфейс потока для чтения данных по блокам из БД.
* Реляционные операции предполагается делать также реализациями этого интерфейса. * Реляционные операции предполагается делать также реализациями этого интерфейса.
*/ */
@ -22,6 +26,8 @@ public:
virtual ~IBlockInputStream() {} virtual ~IBlockInputStream() {}
}; };
typedef SharedPtr<IBlockInputStream> BlockInputStreamPtr;
} }
#endif

View File

@ -1,5 +1,6 @@
#ifndef DBMS_DATA_STREAMS_IBLOCKOUTPUTSTREAM_H #pragma once
#define DBMS_DATA_STREAMS_IBLOCKOUTPUTSTREAM_H
#include <Poco/SharedPtr.h>
#include <DB/Core/Block.h> #include <DB/Core/Block.h>
@ -7,6 +8,9 @@
namespace DB namespace DB
{ {
using Poco::SharedPtr;
/** Интерфейс потока для записи данных в БД или в сеть, или в консоль и т. п. /** Интерфейс потока для записи данных в БД или в сеть, или в консоль и т. п.
*/ */
class IBlockOutputStream class IBlockOutputStream
@ -20,6 +24,6 @@ public:
virtual ~IBlockOutputStream() {} virtual ~IBlockOutputStream() {}
}; };
} typedef SharedPtr<IBlockOutputStream> BlockOutputStreamPtr;
#endif }

View File

@ -17,11 +17,11 @@ using Poco::SharedPtr;
class LimitBlockInputStream : public IBlockInputStream class LimitBlockInputStream : public IBlockInputStream
{ {
public: public:
LimitBlockInputStream(SharedPtr<IBlockInputStream> input_, size_t limit_, size_t offset_ = 0); LimitBlockInputStream(BlockInputStreamPtr input_, size_t limit_, size_t offset_ = 0);
Block read(); Block read();
private: private:
SharedPtr<IBlockInputStream> input; BlockInputStreamPtr input;
size_t limit; size_t limit;
size_t offset; size_t offset;
size_t pos; size_t pos;

View File

@ -37,7 +37,7 @@ struct BlockStreamProfileInfo
class ProfilingBlockInputStream : public IBlockInputStream class ProfilingBlockInputStream : public IBlockInputStream
{ {
public: public:
ProfilingBlockInputStream(SharedPtr<IBlockInputStream> in_) ProfilingBlockInputStream(BlockInputStreamPtr in_)
: in(in_) {} : in(in_) {}
Block read(); Block read();
@ -45,7 +45,7 @@ public:
const BlockStreamProfileInfo & getInfo() const; const BlockStreamProfileInfo & getInfo() const;
private: private:
SharedPtr<IBlockInputStream> in; BlockInputStreamPtr in;
BlockStreamProfileInfo info; BlockStreamProfileInfo info;
}; };

View File

@ -1,7 +1,4 @@
#ifndef DBMS_DATA_STREAMS_ROWINPUTSTREAMFROMBLOCKINPUTSTREAM_H #pragma once
#define DBMS_DATA_STREAMS_ROWINPUTSTREAMFROMBLOCKINPUTSTREAM_H
#include <Poco/SharedPtr.h>
#include <DB/DataStreams/IBlockInputStream.h> #include <DB/DataStreams/IBlockInputStream.h>
#include <DB/DataStreams/IRowInputStream.h> #include <DB/DataStreams/IRowInputStream.h>
@ -10,8 +7,6 @@
namespace DB namespace DB
{ {
using Poco::SharedPtr;
/** Преобразует поток для чтения данных по блокам в поток для чтения данных по строкам. /** Преобразует поток для чтения данных по блокам в поток для чтения данных по строкам.
*/ */
@ -30,5 +25,3 @@ private:
}; };
} }
#endif

View File

@ -26,8 +26,14 @@ public:
/** Выполнить выражение над блоком. Блок должен содержать все столбцы - идентификаторы. /** Выполнить выражение над блоком. Блок должен содержать все столбцы - идентификаторы.
* Функция добавляет в блок новые столбцы - результаты вычислений. * Функция добавляет в блок новые столбцы - результаты вычислений.
* part_id - какую часть выражения вычислять.
*/ */
void execute(Block & block); void execute(Block & block, unsigned part_id = 0);
/** Взять из блока с промежуточными результатами вычислений только столбцы, представляющие собой конечный результат.
* Вернуть новый блок, в котором эти столбцы расположены в правильном порядке.
*/
Block projectResult(Block & block);
/** Получить список типов столбцов результата. /** Получить список типов столбцов результата.
*/ */
@ -61,14 +67,9 @@ private:
/** Прописать во всех узлах, что они ещё не вычислены. /** Прописать во всех узлах, что они ещё не вычислены.
*/ */
void setNotCalculated(ASTPtr ast); void setNotCalculated(ASTPtr ast, unsigned part_id);
void executeImpl(ASTPtr ast, Block & block); void executeImpl(ASTPtr ast, Block & block, unsigned part_id);
/** Взять из блока с промежуточными результатами вычислений только столбцы, представляющие собой конечный результат.
* Вернуть новый блок, в котором эти столбцы расположены в правильном порядке.
*/
Block projectResult(ASTPtr ast, Block & block);
void collectFinalColumns(ASTPtr ast, Block & src, Block & dst); void collectFinalColumns(ASTPtr ast, Block & src, Block & dst);

View File

@ -1,5 +1,4 @@
#ifndef DBMS_PARSERS_ASTIDENTIFIER_H #pragma once
#define DBMS_PARSERS_ASTIDENTIFIER_H
#include <DB/DataTypes/IDataType.h> #include <DB/DataTypes/IDataType.h>
#include <DB/Parsers/IAST.h> #include <DB/Parsers/IAST.h>
@ -13,18 +12,27 @@ namespace DB
class ASTIdentifier : public IAST class ASTIdentifier : public IAST
{ {
public: public:
enum Kind
{
Column,
Database,
Table,
};
/// имя /// имя
String name; String name;
/// тип
/// чего идентифицирует этот идентификатор
Kind kind;
/// тип (только для столбцов)
DataTypePtr type; DataTypePtr type;
ASTIdentifier() {} ASTIdentifier() {}
ASTIdentifier(StringRange range_, const String & name_) : IAST(range_), name(name_) {} ASTIdentifier(StringRange range_, const String & name_, Kind kind_ = Column) : IAST(range_), name(name_), kind(kind_) {}
/** Получить текст, который идентифицирует этот элемент. */ /** Получить текст, который идентифицирует этот элемент. */
String getID() { return "Identifier_" + name; } String getID() { return "Identifier_" + name; }
}; };
} }
#endif

View File

@ -23,12 +23,20 @@ public:
typedef std::list<SharedPtr<IAST> > ASTs; typedef std::list<SharedPtr<IAST> > ASTs;
ASTs children; ASTs children;
StringRange range; StringRange range;
/// Было ли соответствующее выражение вычислено.
/** Было ли соответствующее выражение вычислено.
* Используется, чтобы при обходе графа выражения, не вычислять несколько раз одни и те же узлы.
*/
bool calculated; bool calculated;
/** Идентификатор части выражения. Используется при интерпретации, чтобы вычислять не всё выражение сразу,
* а по частям (например, сначала WHERE, потом фильтрация, потом всё остальное).
*/
unsigned part_id;
IAST() : range(NULL, NULL), calculated(false) {} IAST() : range(NULL, NULL), calculated(false), part_id(0) {}
IAST(StringRange range_) : range(range_), calculated(false) {} IAST(StringRange range_) : range(range_), calculated(false), part_id(0) {}
virtual ~IAST() {} virtual ~IAST() {}
/** Получить текст, который идентифицирует этот элемент. */ /** Получить текст, который идентифицирует этот элемент. */

View File

@ -44,7 +44,7 @@ public:
* (индексы, блокировки и т. п.) * (индексы, блокировки и т. п.)
* Возвращает объект, с помощью которого можно последовательно читать данные. * Возвращает объект, с помощью которого можно последовательно читать данные.
*/ */
virtual SharedPtr<IBlockInputStream> read( virtual BlockInputStreamPtr read(
const Names & column_names, const Names & column_names,
ASTPtr query, ASTPtr query,
size_t max_block_size = DEFAULT_BLOCK_SIZE) size_t max_block_size = DEFAULT_BLOCK_SIZE)
@ -56,7 +56,7 @@ public:
* Принимает описание запроса, в котором может содержаться информация о методе записи данных. * Принимает описание запроса, в котором может содержаться информация о методе записи данных.
* Возвращает объект, с помощью которого можно последовательно писать данные. * Возвращает объект, с помощью которого можно последовательно писать данные.
*/ */
virtual SharedPtr<IBlockOutputStream> write( virtual BlockOutputStreamPtr write(
ASTPtr query) ASTPtr query)
{ {
throw Exception("Method write() is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED); throw Exception("Method write() is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);

View File

@ -91,12 +91,12 @@ public:
const NamesAndTypes & getColumns() const { return *columns; } const NamesAndTypes & getColumns() const { return *columns; }
SharedPtr<IBlockInputStream> read( BlockInputStreamPtr read(
const Names & column_names, const Names & column_names,
ASTPtr query, ASTPtr query,
size_t max_block_size = DEFAULT_BLOCK_SIZE); size_t max_block_size = DEFAULT_BLOCK_SIZE);
SharedPtr<IBlockOutputStream> write( BlockOutputStreamPtr write(
ASTPtr query); ASTPtr query);
private: private:

View File

@ -36,7 +36,7 @@ public:
const NamesAndTypes & getColumns() const { return columns; } const NamesAndTypes & getColumns() const { return columns; }
SharedPtr<IBlockInputStream> read( BlockInputStreamPtr read(
const Names & column_names, const Names & column_names,
ASTPtr query, ASTPtr query,
size_t max_block_size = DEFAULT_BLOCK_SIZE); size_t max_block_size = DEFAULT_BLOCK_SIZE);

View File

@ -0,0 +1,45 @@
#pragma once
#include <DB/Storages/IStorage.h>
namespace DB
{
class OneValueBlockInputStream : public IBlockInputStream
{
public:
OneValueBlockInputStream();
Block read();
private:
bool has_been_read;
};
/** Реализует хранилище для системной таблицы One.
* Таблица содержит единственный столбец dummy UInt8 и единственную строку со значением 0.
* Используется, если в запросе не указана таблица.
* Аналог таблицы DUAL в Oracle и MySQL.
*/
class StorageSystemOne : public IStorage
{
public:
StorageSystemOne(const std::string & name_);
std::string getName() const { return "SystemOne"; }
std::string getTableName() const { return "One"; }
const NamesAndTypes & getColumns() const { return columns; }
BlockInputStreamPtr read(
const Names & column_names,
ASTPtr query,
size_t max_block_size = DEFAULT_BLOCK_SIZE);
private:
const std::string name;
NamesAndTypes columns;
};
}

View File

@ -6,7 +6,7 @@
namespace DB namespace DB
{ {
FilterBlockInputStream::FilterBlockInputStream(SharedPtr<IBlockInputStream> input_, size_t filter_column_) FilterBlockInputStream::FilterBlockInputStream(BlockInputStreamPtr input_, size_t filter_column_)
: input(input_), filter_column(filter_column_) : input(input_), filter_column(filter_column_)
{ {
} }

View File

@ -8,7 +8,7 @@ namespace DB
using Poco::SharedPtr; using Poco::SharedPtr;
LimitBlockInputStream::LimitBlockInputStream(SharedPtr<IBlockInputStream> input_, size_t limit_, size_t offset_) LimitBlockInputStream::LimitBlockInputStream(BlockInputStreamPtr input_, size_t limit_, size_t offset_)
: input(input_), limit(limit_), offset(offset_), pos(0) : input(input_), limit(limit_), offset(offset_), pos(0)
{ {
} }

View File

@ -23,11 +23,14 @@ void Expression::addSemantic(ASTPtr ast)
} }
else if (ASTIdentifier * node = dynamic_cast<ASTIdentifier *>(&*ast)) else if (ASTIdentifier * node = dynamic_cast<ASTIdentifier *>(&*ast))
{ {
NamesAndTypes::const_iterator it = context.columns.find(node->name); if (node->kind == ASTIdentifier::Column)
if (it == context.columns.end()) {
throw Exception("Unknown identifier " + node->name, ErrorCodes::UNKNOWN_IDENTIFIER); NamesAndTypes::const_iterator it = context.columns.find(node->name);
if (it == context.columns.end())
throw Exception("Unknown identifier " + node->name, ErrorCodes::UNKNOWN_IDENTIFIER);
node->type = it->second; node->type = it->second;
}
} }
else if (ASTLiteral * node = dynamic_cast<ASTLiteral *>(&*ast)) else if (ASTLiteral * node = dynamic_cast<ASTLiteral *>(&*ast))
{ {
@ -104,30 +107,30 @@ void Expression::glueTreeImpl(ASTPtr ast, Subtrees & subtrees)
} }
void Expression::setNotCalculated(ASTPtr ast) void Expression::setNotCalculated(ASTPtr ast, unsigned part_id)
{ {
ast->calculated = false; if (ast->part_id == part_id)
ast->calculated = false;
for (ASTs::iterator it = ast->children.begin(); it != ast->children.end(); ++it) for (ASTs::iterator it = ast->children.begin(); it != ast->children.end(); ++it)
setNotCalculated(*it); setNotCalculated(*it, part_id);
} }
void Expression::execute(Block & block) void Expression::execute(Block & block, unsigned part_id)
{ {
setNotCalculated(ast); setNotCalculated(ast, part_id);
executeImpl(ast, block); executeImpl(ast, block, part_id);
block = projectResult(ast, block);
} }
void Expression::executeImpl(ASTPtr ast, Block & block) void Expression::executeImpl(ASTPtr ast, Block & block, unsigned part_id)
{ {
/// Обход в глубину /// Обход в глубину
for (ASTs::iterator it = ast->children.begin(); it != ast->children.end(); ++it) for (ASTs::iterator it = ast->children.begin(); it != ast->children.end(); ++it)
executeImpl(*it, block); executeImpl(*it, block, part_id);
if (ast->calculated) if (ast->calculated || ast->part_id != part_id)
return; return;
/** Столбцы из таблицы уже загружены в блок. /** Столбцы из таблицы уже загружены в блок.
@ -186,7 +189,7 @@ void Expression::executeImpl(ASTPtr ast, Block & block)
} }
Block Expression::projectResult(ASTPtr ast, Block & block) Block Expression::projectResult(Block & block)
{ {
Block res; Block res;
collectFinalColumns(ast, block, res); collectFinalColumns(ast, block, res);
@ -197,7 +200,10 @@ Block Expression::projectResult(ASTPtr ast, Block & block)
void Expression::collectFinalColumns(ASTPtr ast, Block & src, Block & dst) void Expression::collectFinalColumns(ASTPtr ast, Block & src, Block & dst)
{ {
if (ASTIdentifier * ident = dynamic_cast<ASTIdentifier *>(&*ast)) if (ASTIdentifier * ident = dynamic_cast<ASTIdentifier *>(&*ast))
dst.insert(src.getByName(ident->name)); {
if (ident->kind == ASTIdentifier::Column)
dst.insert(src.getByName(ident->name));
}
else if (dynamic_cast<ASTLiteral *>(&*ast)) else if (dynamic_cast<ASTLiteral *>(&*ast))
dst.insert(src.getByName(ast->getTreeID())); dst.insert(src.getByName(ast->getTreeID()));
else if (ASTFunction * func = dynamic_cast<ASTFunction *>(&*ast)) else if (ASTFunction * func = dynamic_cast<ASTFunction *>(&*ast))

View File

@ -179,6 +179,7 @@ int main(int argc, char ** argv)
stopwatch.start(); stopwatch.start();
expression.execute(block); expression.execute(block);
block = expression.projectResult(block);
stopwatch.stop(); stopwatch.stop();
std::cout << std::fixed << std::setprecision(2) std::cout << std::fixed << std::setprecision(2)

View File

@ -1,4 +1,5 @@
#include <DB/Parsers/ASTSelectQuery.h> #include <DB/Parsers/ASTSelectQuery.h>
#include <DB/Parsers/ASTIdentifier.h>
#include <DB/Parsers/IParserBase.h> #include <DB/Parsers/IParserBase.h>
#include <DB/Parsers/CommonParsers.h> #include <DB/Parsers/CommonParsers.h>
#include <DB/Parsers/ExpressionElementParsers.h> #include <DB/Parsers/ExpressionElementParsers.h>
@ -65,6 +66,9 @@ bool ParserSelectQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, String & ex
ws.ignore(pos, end); ws.ignore(pos, end);
} }
dynamic_cast<ASTIdentifier &>(*select_query->database).kind = ASTIdentifier::Database;
dynamic_cast<ASTIdentifier &>(*select_query->table).kind = ASTIdentifier::Table;
} }
/// WHERE expr /// WHERE expr

View File

@ -84,7 +84,7 @@ StorageLog::StorageLog(const std::string & path_, const std::string & name_, Sha
} }
SharedPtr<IBlockInputStream> StorageLog::read( BlockInputStreamPtr StorageLog::read(
const Names & column_names, const Names & column_names,
ASTPtr query, ASTPtr query,
size_t max_block_size) size_t max_block_size)
@ -94,7 +94,7 @@ SharedPtr<IBlockInputStream> StorageLog::read(
} }
SharedPtr<IBlockOutputStream> StorageLog::write( BlockOutputStreamPtr StorageLog::write(
ASTPtr query) ASTPtr query)
{ {
return new LogBlockOutputStream(*this); return new LogBlockOutputStream(*this);

View File

@ -46,7 +46,7 @@ StorageSystemNumbers::StorageSystemNumbers(const std::string & name_)
} }
SharedPtr<IBlockInputStream> StorageSystemNumbers::read( BlockInputStreamPtr StorageSystemNumbers::read(
const Names & column_names, ASTPtr query, size_t max_block_size) const Names & column_names, ASTPtr query, size_t max_block_size)
{ {
check(column_names); check(column_names);

View File

@ -0,0 +1,51 @@
#include <DB/Core/Exception.h>
#include <DB/Core/ErrorCodes.h>
#include <DB/Columns/ColumnsNumber.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/Storages/StorageSystemOne.h>
namespace DB
{
OneValueBlockInputStream::OneValueBlockInputStream() : has_been_read(false)
{
}
Block OneValueBlockInputStream::read()
{
Block res;
if (has_been_read)
return res;
has_been_read = true;
ColumnWithNameAndType col;
col.name = "dummy";
col.type = new DataTypeUInt8;
col.column = new ColumnConstUInt8(0, 1);
res.insert(col);
return res;
}
StorageSystemOne::StorageSystemOne(const std::string & name_)
: name(name_)
{
columns["dummy"] = new DataTypeUInt8;
}
BlockInputStreamPtr StorageSystemOne::read(
const Names & column_names, ASTPtr query, size_t max_block_size)
{
check(column_names);
return new OneValueBlockInputStream();
}
}