dbms: development [#CONV-2944].

This commit is contained in:
Alexey Milovidov 2011-09-04 21:23:19 +00:00
parent 2215bc9626
commit c65e913754
28 changed files with 142 additions and 77 deletions

View File

@ -1,10 +1,9 @@
#ifndef DBMS_DATA_STREAMS_BLOCKINPUTSTREAMFROMROWINPUTSTREAM_H
#define DBMS_DATA_STREAMS_BLOCKINPUTSTREAMFROMROWINPUTSTREAM_H
#pragma once
#include <Poco/SharedPtr.h>
#include <DB/Core/Defines.h>
#include <DB/DataStreams/IBlockInputStream.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/DataStreams/IRowInputStream.h>
@ -17,7 +16,7 @@ using Poco::SharedPtr;
/** Преобразует поток для чтения данных по строкам в поток для чтения данных по блокам.
* Например, для чтения текстового дампа.
*/
class BlockInputStreamFromRowInputStream : public IBlockInputStream
class BlockInputStreamFromRowInputStream : public IProfilingBlockInputStream
{
public:
/** sample_ - пустой блок, который описывает, как интерпретировать значения */
@ -26,7 +25,9 @@ public:
const Block & sample_,
size_t max_block_size_ = DEFAULT_BLOCK_SIZE);
Block read();
Block readImpl();
String getName() const { return "BlockInputStreamFromRowInputStream"; }
private:
@ -38,5 +39,3 @@ private:
};
}
#endif

View File

@ -3,7 +3,7 @@
#include <Poco/SharedPtr.h>
#include <DB/Interpreters/Expression.h>
#include <DB/DataStreams/IBlockInputStream.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
namespace DB
@ -19,13 +19,16 @@ using Poco::SharedPtr;
* part_id - идентификатор части выражения, которую надо вычислять.
* Например, может потребоваться вычислить только часть выражения в секции WHERE.
*/
class ExpressionBlockInputStream : public IBlockInputStream
class ExpressionBlockInputStream : public IProfilingBlockInputStream
{
public:
ExpressionBlockInputStream(BlockInputStreamPtr input_, SharedPtr<Expression> expression_, unsigned part_id_ = 0)
: input(input_), expression(expression_), part_id(part_id_) {}
: input(input_), expression(expression_), part_id(part_id_)
{
children.push_back(input);
}
Block read()
Block readImpl()
{
Block res = input->read();
if (!res)
@ -35,6 +38,8 @@ public:
return res;
}
String getName() const { return "ExpressionBlockInputStream"; }
private:
BlockInputStreamPtr input;
SharedPtr<Expression> expression;

View File

@ -2,7 +2,7 @@
#include <Poco/SharedPtr.h>
#include <DB/DataStreams/IBlockInputStream.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
namespace DB
@ -15,12 +15,14 @@ using Poco::SharedPtr;
* На вход подаётся поток блоков, в котором в одном из столбцов типа ColumnUInt8 содержатся условия фильтрации.
* Возвращается поток блоков, в котором содержатся только отфильтрованные строки.
*/
class FilterBlockInputStream : public IBlockInputStream
class FilterBlockInputStream : public IProfilingBlockInputStream
{
public:
/// filter_column_ - номер столбца с условиями фильтрации. -1 - последний столбец
FilterBlockInputStream(BlockInputStreamPtr input_, ssize_t filter_column_ = -1);
Block read();
Block readImpl();
String getName() const { return "FilterBlockInputStream"; }
private:
BlockInputStreamPtr input;

View File

@ -17,17 +17,30 @@ using Poco::SharedPtr;
/** Interface of a stream that is read block by block.
* A stream also keeps a list of child streams (`children`); dumpTree() walks that list
* recursively to print the tree of streams.
*/
class IBlockInputStream
{
public:
/// Shared-ownership pointer to a stream, and a list of such pointers.
typedef SharedPtr<IBlockInputStream> BlockInputStreamPtr;
typedef std::vector<BlockInputStreamPtr> BlockInputStreams;
/** Read the next block.
* If there are no more blocks, return an empty block (one for which operator bool returns false).
*/
virtual Block read() = 0;
virtual ~IBlockInputStream() {}
/** Name of the stream.
* Used when printing the tree of data stream transformations (the query execution plan).
*/
virtual String getName() const = 0;
/// Mutable access to the list of child streams.
BlockInputStreams & getChildren() { return children; }
/** Write the name of this stream to `ostr`, then do the same for every child with `indent + 1`.
* `indent` is the nesting depth of this stream in the printed tree.
*/
void dumpTree(std::ostream & ostr, size_t indent = 0);
protected:
/// Child streams; derived classes that wrap another stream add it here in their constructors.
BlockInputStreams children;
};
typedef SharedPtr<IBlockInputStream> BlockInputStreamPtr;
typedef IBlockInputStream::BlockInputStreamPtr BlockInputStreamPtr;
typedef IBlockInputStream::BlockInputStreams BlockInputStreams;
}

View File

@ -30,22 +30,21 @@ struct BlockStreamProfileInfo
};
/** Смотрит за тем, как работает другой поток блоков.
/** Смотрит за тем, как работает поток блоков.
* Позволяет получить информацию для профайлинга:
* строк в секунду, блоков в секунду, мегабайт в секунду и т. п.
*/
class ProfilingBlockInputStream : public IBlockInputStream
class IProfilingBlockInputStream : public IBlockInputStream
{
public:
ProfilingBlockInputStream(BlockInputStreamPtr in_)
: in(in_) {}
Block read();
/// Наследники должны реализовать эту функцию.
virtual Block readImpl() = 0;
const BlockStreamProfileInfo & getInfo() const;
private:
BlockInputStreamPtr in;
BlockStreamProfileInfo info;
};

View File

@ -1,9 +1,8 @@
#ifndef DBMS_DATA_STREAMS_LIMITBLOCKINPUTSTREAM_H
#define DBMS_DATA_STREAMS_LIMITBLOCKINPUTSTREAM_H
#pragma once
#include <Poco/SharedPtr.h>
#include <DB/DataStreams/IBlockInputStream.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
namespace DB
@ -14,11 +13,13 @@ using Poco::SharedPtr;
/** Реализует реляционную операцию LIMIT.
*/
class LimitBlockInputStream : public IBlockInputStream
class LimitBlockInputStream : public IProfilingBlockInputStream
{
public:
LimitBlockInputStream(BlockInputStreamPtr input_, size_t limit_, size_t offset_ = 0);
Block read();
Block readImpl();
String getName() const { return "LimitBlockInputStream"; }
private:
BlockInputStreamPtr input;
@ -28,5 +29,3 @@ private:
};
}
#endif

View File

@ -2,7 +2,7 @@
#include <DB/Core/SortDescription.h>
#include <DB/DataStreams/IBlockInputStream.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
namespace DB
@ -10,13 +10,18 @@ namespace DB
/** Соединяет поток сортированных по отдельности блоков в сортированный целиком поток.
*/
class MergeSortingBlockInputStream : public IBlockInputStream
class MergeSortingBlockInputStream : public IProfilingBlockInputStream
{
public:
MergeSortingBlockInputStream(BlockInputStreamPtr input_, SortDescription & description_)
: input(input_), description(description_) {}
: input(input_), description(description_)
{
children.push_back(input);
}
Block read();
Block readImpl();
String getName() const { return "MergeSortingBlockInputStream"; }
private:
BlockInputStreamPtr input;

View File

@ -1,7 +1,7 @@
#pragma once
#include <DB/DataTypes/DataTypeFactory.h>
#include <DB/DataStreams/IBlockInputStream.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
namespace DB
@ -10,7 +10,7 @@ namespace DB
/** Десериализует поток блоков из родного бинарного формата (с именами и типами столбцов).
* Предназначено для взаимодействия между серверами.
*/
class NativeBlockInputStream : public IBlockInputStream
class NativeBlockInputStream : public IProfilingBlockInputStream
{
public:
NativeBlockInputStream(ReadBuffer & istr_, DataTypeFactory & data_type_factory_)
@ -19,7 +19,9 @@ public:
/** Прочитать следующий блок.
* Если блоков больше нет - вернуть пустой блок (для которого operator bool возвращает false).
*/
Block read();
Block readImpl();
String getName() const { return "NativeBlockInputStream"; }
private:
ReadBuffer & istr;

View File

@ -2,7 +2,7 @@
#include <DB/Core/SortDescription.h>
#include <DB/DataStreams/IBlockInputStream.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
namespace DB
@ -11,13 +11,18 @@ namespace DB
/** Сортирует каждый блок по отдельности по значениям указанных столбцов.
* На данный момент, используется не очень оптимальный алгоритм.
*/
class PartialSortingBlockInputStream : public IBlockInputStream
class PartialSortingBlockInputStream : public IProfilingBlockInputStream
{
public:
PartialSortingBlockInputStream(BlockInputStreamPtr input_, SortDescription & description_)
: input(input_), description(description_) {}
: input(input_), description(description_)
{
children.push_back(input);
}
Block read();
Block readImpl();
String getName() const { return "PartialSortingBlockInputStream"; }
private:
BlockInputStreamPtr input;

View File

@ -3,7 +3,7 @@
#include <Poco/SharedPtr.h>
#include <DB/Interpreters/Expression.h>
#include <DB/DataStreams/IBlockInputStream.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
namespace DB
@ -15,7 +15,7 @@ using Poco::SharedPtr;
/** Выбирает из блока только столбцы, являющиеся результатом вычисления выражения.
* Следует применять после ExpressionBlockInputStream.
*/
class ProjectionBlockInputStream : public IBlockInputStream
class ProjectionBlockInputStream : public IProfilingBlockInputStream
{
public:
ProjectionBlockInputStream(
@ -23,9 +23,12 @@ public:
SharedPtr<Expression> expression_,
bool without_duplicates_ = false,
unsigned part_id_ = 0)
: input(input_), expression(expression_), without_duplicates(without_duplicates_), part_id(part_id_) {}
: input(input_), expression(expression_), without_duplicates(without_duplicates_), part_id(part_id_)
{
children.push_back(input);
}
Block read()
Block readImpl()
{
Block res = input->read();
if (!res)
@ -34,6 +37,8 @@ public:
return expression->projectResult(res, without_duplicates, part_id);
}
String getName() const { return "ProjectionBlockInputStream"; }
private:
BlockInputStreamPtr input;
SharedPtr<Expression> expression;

View File

@ -13,6 +13,7 @@
#include <DB/IO/CompressedReadBuffer.h>
#include <DB/IO/CompressedWriteBuffer.h>
#include <DB/Storages/IStorage.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
namespace DB
@ -21,11 +22,12 @@ namespace DB
using Poco::SharedPtr;
class StorageLog;
class LogBlockInputStream : public IBlockInputStream
class LogBlockInputStream : public IProfilingBlockInputStream
{
public:
LogBlockInputStream(size_t block_size_, const Names & column_names_, StorageLog & storage_);
Block read();
Block readImpl();
String getName() const { return "LogBlockInputStream"; }
private:
size_t block_size;
Names column_names;

View File

@ -3,6 +3,7 @@
#include <Poco/SharedPtr.h>
#include <DB/Storages/IStorage.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
namespace DB
@ -11,11 +12,12 @@ namespace DB
using Poco::SharedPtr;
class NumbersBlockInputStream : public IBlockInputStream
class NumbersBlockInputStream : public IProfilingBlockInputStream
{
public:
NumbersBlockInputStream(size_t block_size_);
Block read();
Block readImpl();
String getName() const { return "NumbersBlockInputStream"; }
private:
size_t block_size;
UInt64 next;

View File

@ -1,17 +1,19 @@
#pragma once
#include <DB/Storages/IStorage.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
namespace DB
{
class OneValueBlockInputStream : public IBlockInputStream
class OneValueBlockInputStream : public IProfilingBlockInputStream
{
public:
OneValueBlockInputStream();
Block read();
Block readImpl();
String getName() const { return "OneValueBlockInputStream"; }
private:
bool has_been_read;
};

View File

@ -35,7 +35,7 @@ void BlockInputStreamFromRowInputStream::initBlock(Block & res)
}
Block BlockInputStreamFromRowInputStream::read()
Block BlockInputStreamFromRowInputStream::readImpl()
{
Block res;

View File

@ -10,10 +10,11 @@ namespace DB
/** Wraps the stream `input_`.
  * `filter_column_` is the number of the column that holds the filter conditions.
  */
FilterBlockInputStream::FilterBlockInputStream(BlockInputStreamPtr input_, ssize_t filter_column_)
    : input(input_),
      filter_column(filter_column_)
{
    /// Register the source stream as a child, so that it shows up in dumpTree().
    children.push_back(input_);
}
Block FilterBlockInputStream::read()
Block FilterBlockInputStream::readImpl()
{
/// Пока не встретится блок, после фильтрации которого что-нибудь останется, или поток не закончится.
while (1)

View File

@ -0,0 +1,27 @@
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/DataStreams/IBlockInputStream.h>
namespace DB
{
void IBlockInputStream::dumpTree(std::ostream & ostr, size_t indent)
{
String indent_str(indent, '-');
ostr << indent_str << getName() << std::endl;
if (IProfilingBlockInputStream * profiling = dynamic_cast<IProfilingBlockInputStream *>(this))
{
if (profiling->getInfo().blocks != 0)
{
profiling->getInfo().print(ostr);
ostr << std::endl;
}
}
for (BlockInputStreams::iterator it = children.begin(); it != children.end(); ++it)
(*it)->dumpTree(ostr, indent + 1);
}
}

View File

@ -1,6 +1,6 @@
#include <iomanip>
#include <DB/DataStreams/ProfilingBlockInputStream.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
namespace DB
@ -28,13 +28,13 @@ void BlockStreamProfileInfo::print(std::ostream & ostr) const
}
Block ProfilingBlockInputStream::read()
Block IProfilingBlockInputStream::read()
{
if (!info.started)
info.total_stopwatch.start();
info.work_stopwatch.start();
Block res = in->read();
Block res = readImpl();
info.work_stopwatch.stop();
if (res)
@ -44,7 +44,7 @@ Block ProfilingBlockInputStream::read()
}
const BlockStreamProfileInfo & ProfilingBlockInputStream::getInfo() const
const BlockStreamProfileInfo & IProfilingBlockInputStream::getInfo() const
{
return info;
}

View File

@ -11,10 +11,11 @@ using Poco::SharedPtr;
/** Wraps the stream `input_` and stores `limit_` and `offset_`.
  * The internal position counter `pos` starts at zero.
  */
LimitBlockInputStream::LimitBlockInputStream(BlockInputStreamPtr input_, size_t limit_, size_t offset_)
    : input(input_),
      limit(limit_),
      offset(offset_),
      pos(0)
{
    /// Register the source stream as a child, so that it shows up in dumpTree().
    children.push_back(input_);
}
Block LimitBlockInputStream::read()
Block LimitBlockInputStream::readImpl()
{
Block res;
size_t rows = 0;

View File

@ -4,7 +4,7 @@
namespace DB
{
Block MergeSortingBlockInputStream::read()
Block MergeSortingBlockInputStream::readImpl()
{
/** На данный момент - очень простой алгоритм:
* - прочитать в оперативку все блоки;

View File

@ -7,7 +7,7 @@
namespace DB
{
Block NativeBlockInputStream::read()
Block NativeBlockInputStream::readImpl()
{
Block res;

View File

@ -34,7 +34,7 @@ struct PartialSortingLess
};
Block PartialSortingBlockInputStream::read()
Block PartialSortingBlockInputStream::readImpl()
{
Block res = input->read();
if (!res)

View File

@ -15,7 +15,6 @@
#include <DB/DataStreams/ExpressionBlockInputStream.h>
#include <DB/DataStreams/ProjectionBlockInputStream.h>
#include <DB/DataStreams/FilterBlockInputStream.h>
#include <DB/DataStreams/ProfilingBlockInputStream.h>
#include <DB/DataStreams/TabSeparatedRowOutputStream.h>
#include <DB/DataStreams/copyData.h>
@ -168,8 +167,7 @@ int main(int argc, char ** argv)
;
Poco::SharedPtr<DB::IBlockInputStream> in = table.read(column_names, 0);
Poco::SharedPtr<DB::ProfilingBlockInputStream> profiling = new DB::ProfilingBlockInputStream(in);
in = new DB::ExpressionBlockInputStream(profiling, expression);
in = new DB::ExpressionBlockInputStream(in, expression);
in = new DB::ProjectionBlockInputStream(in, expression);
in = new DB::FilterBlockInputStream(in, 4);
//in = new DB::LimitBlockInputStream(in, 10);
@ -179,7 +177,7 @@ int main(int argc, char ** argv)
DB::copyData(*in, out);
profiling->getInfo().print(std::cerr);
//profiling->getInfo().print(std::cerr);
}
catch (const DB::Exception & e)
{

View File

@ -19,7 +19,6 @@
#include <DB/DataStreams/LimitBlockInputStream.h>
#include <DB/DataStreams/PartialSortingBlockInputStream.h>
#include <DB/DataStreams/MergeSortingBlockInputStream.h>
#include <DB/DataStreams/ProfilingBlockInputStream.h>
#include <DB/DataStreams/TabSeparatedRowOutputStream.h>
#include <DB/DataStreams/copyData.h>
@ -156,14 +155,8 @@ int main(int argc, char ** argv)
sort_columns.push_back(DB::SortColumnDescription(3, 1));
Poco::SharedPtr<DB::IBlockInputStream> in = table.read(column_names, 0, argc == 2 ? atoi(argv[1]) : 1048576);
Poco::SharedPtr<DB::ProfilingBlockInputStream> profiling1 = new DB::ProfilingBlockInputStream(in);
in = profiling1;
in = new DB::PartialSortingBlockInputStream(in, sort_columns);
Poco::SharedPtr<DB::ProfilingBlockInputStream> profiling2 = new DB::ProfilingBlockInputStream(in);
in = profiling2;
in = new DB::MergeSortingBlockInputStream(in, sort_columns);
Poco::SharedPtr<DB::ProfilingBlockInputStream> profiling3 = new DB::ProfilingBlockInputStream(in);
in = profiling3;
//in = new DB::LimitBlockInputStream(in, 10);
DB::WriteBufferFromOStream ob(std::cout);
@ -171,12 +164,12 @@ int main(int argc, char ** argv)
DB::copyData(*in, out);
std::cerr << std::endl << "Reading: " << std::endl;
/* std::cerr << std::endl << "Reading: " << std::endl;
profiling1->getInfo().print(std::cerr);
std::cerr << std::endl << "Sorting: " << std::endl;
profiling2->getInfo().print(std::cerr);
std::cerr << std::endl << "Merging: " << std::endl;
profiling3->getInfo().print(std::cerr);
profiling3->getInfo().print(std::cerr);*/
}
catch (const DB::Exception & e)
{

View File

@ -71,6 +71,8 @@ public:
else
return DB::Block();
}
DB::String getName() const { return "OneBlockInputStream"; }
};

View File

@ -166,11 +166,14 @@ int main(int argc, char ** argv)
*/
DB::InterpreterSelectQuery interpreter(ast, context);
DB::BlockInputStreamPtr in = interpreter.execute();
DB::WriteBufferFromOStream ob(std::cout);
DB::TabSeparatedRowOutputStream out(ob, new DB::DataTypes(interpreter.getReturnTypes()));
DB::copyData(*in, out);
std::cerr << std::endl;
in->dumpTree(std::cerr);
}
catch (const DB::Exception & e)
{

View File

@ -18,7 +18,7 @@ LogBlockInputStream::LogBlockInputStream(size_t block_size_, const Names & colum
}
Block LogBlockInputStream::read()
Block LogBlockInputStream::readImpl()
{
Block res;

View File

@ -18,7 +18,7 @@ NumbersBlockInputStream::NumbersBlockInputStream(size_t block_size_) : block_siz
}
Block NumbersBlockInputStream::read()
Block NumbersBlockInputStream::readImpl()
{
Block res;

View File

@ -17,7 +17,7 @@ OneValueBlockInputStream::OneValueBlockInputStream() : has_been_read(false)
}
Block OneValueBlockInputStream::read()
Block OneValueBlockInputStream::readImpl()
{
Block res;
if (has_been_read)