dbms: development.

This commit is contained in:
Alexey Milovidov 2010-03-04 19:20:28 +00:00
parent dbd0f68738
commit 148b08ff1a
20 changed files with 566 additions and 40 deletions

View File

@ -0,0 +1,20 @@
#ifndef DBMS_COLUMN_TYPES_COLUMNTYPES_H
#define DBMS_COLUMN_TYPES_COLUMNTYPES_H
#include <vector>
#include <Poco/SharedPtr.h>
#include <DB/ColumnTypes/IColumnType.h>
namespace DB
{
using Poco::SharedPtr;
typedef std::vector<SharedPtr<IColumnType> > ColumnTypes;
}
#endif

View File

@ -5,7 +5,7 @@
#include <map>
#include <list>
#include <DB/Core/ColumnWithMetadata.h>
#include <DB/Core/ColumnWithNameAndType.h>
namespace DB
@ -24,7 +24,7 @@ public:
typedef std::map<String, Container_t::iterator> IndexByName_t;
private:
Container_t columns;
Container_t data;
IndexByPosition_t index_by_position;
IndexByName_t index_by_name;
@ -40,8 +40,15 @@ public:
ColumnWithNameAndType & getByName(const std::string & name);
const ColumnWithNameAndType & getByName(const std::string & name) const;
operator bool() { return !columns.empty(); }
operator!() { return columns.empty(); }
/** Возвращает количество строк в блоке.
* Заодно проверяет, что все столбцы кроме констант (которые содержат единственное значение),
* содержат одинаковое число значений.
*/
size_t rows() const;
size_t columns() const;
operator bool() const { return !data.empty(); }
bool operator!() const { return data.empty(); }
};
}

View File

@ -14,6 +14,7 @@ namespace DB
{
/** Типы данных для представления столбцов значений в оперативке.
* Столбец значений может быть представлен массивом значений или единичным значением, если столбец - константа.
*/
typedef std::vector<UInt8> UInt8Column;
@ -46,6 +47,36 @@ typedef boost::make_recursive_variant<
typedef std::vector<Column> TupleColumn; /// Столбец значений типа "кортеж" - несколько столбцов произвольного типа
typedef std::vector<Column> ArrayColumn; /// Столбец значений типа "массив" - столбец, значения в котором - массивы
/** Возвращает количество значений в столбце
* TODO: поправить для tuple.
*/
class ColumnVisitorSize : public boost::static_visitor<size_t>
{
public:
template <typename T> size_t operator() (const T & x) const { return x.size(); }
};
/** Возвращает n-ый элемент столбца.
* TODO: поправить для tuple.
*/
class ColumnVisitorNthElement : public boost::static_visitor<Field>
{
public:
ColumnVisitorNthElement(size_t n_) : n(n_) {}
template <typename T> Field operator() (const T & x) const
{
return x.size() == 1
? x[0] /// столбец - константа
: x[n];
}
private:
size_t n;
};
}
#endif

View File

@ -0,0 +1,15 @@
#ifndef DBMS_CORE_COLUMN_NAMES_H
#define DBMS_CORE_COLUMN_NAMES_H
#include <vector>
#include <string>
namespace DB
{
typedef std::vector<std::string> ColumnNames;
}
#endif

View File

@ -14,6 +14,11 @@ namespace ErrorCodes
UNEXPECTED_END_OF_FILE,
CANNOT_READ_DATA_FROM_ISTREAM,
CANNOT_PARSE_TEXT,
INCORRECT_NUMBER_OF_COLUMNS,
THERE_IS_NO_COLUMN,
SIZES_OF_COLUMNS_DOESNT_MATCH,
EMPTY_COLUMN_IN_BLOCK,
NOT_FOUND_COLUMN_IN_BLOCK,
};
}

View File

@ -1,9 +1,5 @@
#ifndef DBMS_DATA_STREAMS_IBLOCKINPUTSTREAM_H
#ifndef DBMS_DATA_STREAMS_IBLOCKINPUTSTREAM_H
#include <ostream>
#include <Poco/SharedPtr.h>
#define DBMS_DATA_STREAMS_IBLOCKINPUTSTREAM_H
#include <DB/Core/Block.h>
@ -11,8 +7,6 @@
namespace DB
{
using Poco::SharedPtr;
/** Интерфейс потока для чтения данных по блокам из БД.
* Реляционные операции предполагается делать также реализациями этого интерфейса.
*/

View File

@ -0,0 +1,25 @@
#ifndef DBMS_DATA_STREAMS_IBLOCKOUTPUTSTREAM_H
#define DBMS_DATA_STREAMS_IBLOCKOUTPUTSTREAM_H
#include <DB/Core/Block.h>
namespace DB
{
/** Интерфейс потока для записи данных в БД или в сеть, или в консоль и т. п.
*/
class IBlockOutputStream
{
public:
/** Записать блок.
*/
virtual void write(const Block & block) = 0;
virtual ~IBlockOutputStream() {}
};
}
#endif

View File

@ -0,0 +1,26 @@
#ifndef DBMS_DATA_STREAMS_IROWINPUTSTREAM_H
#define DBMS_DATA_STREAMS_IROWINPUTSTREAM_H
#include <DB/Core/Row.h>
namespace DB
{
/** Интерфейс потока для чтения данных по строкам.
*/
class IRowInputStream
{
public:
/** Прочитать следующую строку.
* Если строк больше нет - вернуть пустую строку.
*/
virtual Row read() = 0;
virtual ~IRowInputStream() {}
};
}
#endif

View File

@ -0,0 +1,34 @@
#ifndef DBMS_DATA_STREAMS_IROWOUTPUTSTREAM_H
#define DBMS_DATA_STREAMS_IROWOUTPUTSTREAM_H
#include <DB/Core/Row.h>
namespace DB
{
/** Интерфейс потока для записи данных по строкам (например, для вывода в консоль).
*/
class IRowOutputStream
{
public:
/** Записать строку.
* Есть реализация по умолчанию, которая использует методы для записи одиночных значений и разделителей.
*/
virtual void write(const Row & row);
/** Записать значение. */
virtual void writeField(const Field & field) = 0;
/** Записать разделитель. */
virtual void writeFieldDelimiter() {};
virtual void writeRowStartDelimiter() {};
virtual void writeRowEndDelimiter() {};
virtual ~IRowOutputStream() {}
};
}
#endif

View File

@ -0,0 +1,34 @@
#ifndef DBMS_DATA_STREAMS_ROWINPUTSTREAMFROMBLOCKINPUTSTREAM_H
#define DBMS_DATA_STREAMS_ROWINPUTSTREAMFROMBLOCKINPUTSTREAM_H
#include <Poco/SharedPtr.h>
#include <DB/DataStreams/IBlockInputStream.h>
#include <DB/DataStreams/IRowInputStream.h>
namespace DB
{
using Poco::SharedPtr;
/** Преобразует поток для чтения данных по блокам в поток для чтения данных по строкам.
*/
class RowInputStreamFromBlockInputStream : public IRowInputStream
{
public:
explicit RowInputStreamFromBlockInputStream(IBlockInputStream & block_input_);
Row read();
private:
IBlockInputStream & block_input;
size_t pos;
size_t current_rows;
Block current_block;
};
}
#endif

View File

@ -0,0 +1,37 @@
#ifndef DBMS_DATA_STREAMS_TABSEPARATEDROWOUTPUTSTREAM_H
#define DBMS_DATA_STREAMS_TABSEPARATEDROWOUTPUTSTREAM_H
#include <ostream>
#include <Poco/SharedPtr.h>
#include <DB/ColumnTypes/ColumnTypes.h>
#include <DB/DataStreams/IRowOutputStream.h>
namespace DB
{
using Poco::SharedPtr;
/** Интерфейс потока для вывода данных в формате tsv.
*/
class TabSeparatedRowOutputStream : public IRowOutputStream
{
public:
TabSeparatedRowOutputStream(std::ostream & ostr_, SharedPtr<ColumnTypes> column_types_);
void writeField(const Field & field);
void writeFieldDelimiter();
void writeRowEndDelimiter();
private:
std::ostream & ostr;
SharedPtr<ColumnTypes> column_types;
size_t field_number;
};
}
#endif

View File

@ -0,0 +1,22 @@
#ifndef DBMS_DATA_STREAMS_COPY_DATA_H
#define DBMS_DATA_STREAMS_COPY_DATA_H
#include <DB/DataStreams/IBlockInputStream.h>
#include <DB/DataStreams/IBlockOutputStream.h>
#include <DB/DataStreams/IRowInputStream.h>
#include <DB/DataStreams/IRowOutputStream.h>
namespace DB
{
/** Копирует данные из InputStream в OutputStream
* (например, из БД в консоль и т. п.)
*/
void copyData(IBlockInputStream & from, IBlockOutputStream & to);
void copyData(IRowInputStream & from, IRowOutputStream & to);
void copyData(IBlockInputStream & from, IRowOutputStream & to);
}
#endif

View File

@ -1,12 +1,22 @@
#ifndef DBMS_STORAGES_ISTORAGE_H
#define DBMS_STORAGES_ISTORAGE_H
//#include <boost/property_tree/ptree.hpp>
#include <Poco/SharedPtr.h>
#include <DB/Core/ColumnNames.h>
#include <DB/Core/Exception.h>
#include <DB/DataStreams/IBlockInputStream.h>
#define DEFAULT_BLOCK_SIZE 1048576
namespace DB
{
typedef char ptree; /// временная заглушка, вместо boost::property_tree::ptree
//using boost::property_tree::ptree;
using Poco::SharedPtr;
/** Хранилище. Отвечает за:
@ -18,27 +28,23 @@ using Poco::SharedPtr;
*/
class IStorage
{
private:
/** Установить указатель на таблицу и кол-группу.
* - часть инициализации, которая выполняется при инициализации таблицы.
* (инициализация хранилища выполняется в два шага:
* 1 - конструктор,
* 2 - добавление к таблице (выполняется в конструкторе Table))
*/
virtual void addToTable(Table * table_, ColumnGroup * column_group_) = 0;
public:
/** Прочитать данные, соответствующие точному значению ключа или префиксу.
/// Основное имя типа таблицы (например, StorageWithoutKey).
virtual std::string getName() const = 0;
/** Читать набор столбцов из таблицы.
* Принимает список столбцов, которых нужно прочитать, а также описание запроса,
* из которого может быть извлечена информация о том, каким способом извлекать данные
* (индексы, блокировки и т. п.)
* Возвращает объект, с помощью которого можно последовательно читать данные.
*/
virtual Poco::SharedPtr<ITablePartReader> read(const Row & key) = 0;
/** Записать пачку данных в таблицу, обновляя существующие данные, если они есть.
* @param data - набор данных вида ключ (набор столбцов) -> значение (набор столбцов)
* @param mask - битовая маска - какие столбцы входят в кол-группу,
* которую хранит это хранилище
*/
virtual void merge(const AggregatedRowSet & data, const ColumnMask & mask) = 0;
virtual SharedPtr<IBlockInputStream> read(
const ColumnNames & column_names,
const ptree & query,
size_t max_block_size = DEFAULT_BLOCK_SIZE)
{
throw Exception("Method read() is not supported by storage " + getName());
}
virtual ~IStorage() {}
};

View File

@ -0,0 +1,43 @@
#ifndef DBMS_STORAGES_STORAGE_SYSTEM_NUMBERS_H
#define DBMS_STORAGES_STORAGE_SYSTEM_NUMBERS_H
#include <Poco/SharedPtr.h>
#include <DB/Storages/IStorage.h>
namespace DB
{
using Poco::SharedPtr;
class NumbersBlockInputStream : public IBlockInputStream
{
public:
NumbersBlockInputStream(size_t block_size_);
Block read();
private:
size_t block_size;
UInt64 next;
};
/** Реализует хранилище для системной таблицы Numbers.
* Таблица содержит единственный столбец number UInt64.
* Из этой таблицы можно прочитать все натуральные числа, начиная с 0 (до 2^64 - 1, а потом заново).
*/
class StorageSystemNumbers : public IStorage
{
public:
std::string getName() const { return "SystemNumbers"; }
SharedPtr<IBlockInputStream> read(
const ColumnNames & column_names,
const ptree & query,
size_t max_block_size = DEFAULT_BLOCK_SIZE);
};
}
#endif

View File

@ -1,3 +1,6 @@
#include <DB/Core/Exception.h>
#include <DB/Core/ErrorCodes.h>
#include <DB/Core/Block.h>
@ -6,16 +9,16 @@ namespace DB
void Block::rebuildIndexByPosition()
{
index_by_position.resize(columns.size());
index_by_position.resize(data.size());
size_t pos = 0;
for (Container_t::iterator it = columns.begin(); it != columns.end(); ++it, ++pos)
for (Container_t::iterator it = data.begin(); it != data.end(); ++it, ++pos)
index_by_position[pos] = it;
}
void Block::insert(size_t position, const ColumnWithNameAndType & elem)
{
Container_t::iterator it = columns.insert(index_by_position[position], elem);
Container_t::iterator it = data.insert(index_by_position[position], elem);
rebuildIndexByPosition();
index_by_name[elem.name] = it;
}
@ -25,34 +28,63 @@ void Block::erase(size_t position)
{
Container_t::iterator it = index_by_position[position];
index_by_name.erase(index_by_name.find(it->name));
columns.erase(it);
data.erase(it);
rebuildIndexByPosition();
}
Block::ColumnWithNameAndType & Block::getByPosition(size_t position)
ColumnWithNameAndType & Block::getByPosition(size_t position)
{
return *index_by_position[position];
}
const Block::ColumnWithNameAndType & Block::getByPosition(size_t position) const
const ColumnWithNameAndType & Block::getByPosition(size_t position) const
{
return *index_by_position[position];
}
Block::ColumnWithNameAndType & Block::getByName(const std::string & name)
ColumnWithNameAndType & Block::getByName(const std::string & name)
{
return *index_by_name[name];
}
const Block::ColumnWithNameAndType & Block::getByName(const std::string & name) const
const ColumnWithNameAndType & Block::getByName(const std::string & name) const
{
return *index_by_name[name];
IndexByName_t::const_iterator it = index_by_name.find(name);
if (index_by_name.end() == it)
throw Exception("Not found column " + name + " in block.", ErrorCodes::NOT_FOUND_COLUMN_IN_BLOCK);
return *it->second;
}
size_t Block::rows() const
{
size_t res = 0;
ColumnVisitorSize visitor;
for (Container_t::const_iterator it = data.begin(); it != data.end(); ++it)
{
size_t size = boost::apply_visitor(visitor, *it->column);
if (size == 0)
throw Exception("Empty column in block.", ErrorCodes::EMPTY_COLUMN_IN_BLOCK);
if (size != 1 && res != 0 && size != res)
throw Exception("Sizes of columns doesn't match.", ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
res = size;
}
return res;
}
size_t Block::columns() const
{
return data.size();
}
}
#endif

View File

@ -0,0 +1,24 @@
#include <DB/DataStreams/IRowOutputStream.h>
namespace DB
{
void IRowOutputStream::write(const Row & row)
{
writeRowStartDelimiter();
if (!row.empty())
{
writeField(row[0]);
for (size_t i = 1; i < row.size(); ++i)
{
writeFieldDelimiter();
writeField(row[i]);
}
}
writeRowEndDelimiter();
}
}

View File

@ -0,0 +1,51 @@
#include <DB/DataStreams/RowInputStreamFromBlockInputStream.h>
namespace DB
{
using Poco::SharedPtr;
RowInputStreamFromBlockInputStream::RowInputStreamFromBlockInputStream(IBlockInputStream & block_input_)
: block_input(block_input_), pos(0), current_rows(0)
{
}
class TestVisitor : public boost::static_visitor<UInt64>
{
public:
template <typename T> UInt64 operator() (const T & x) const
{
return 0;
}
};
Row RowInputStreamFromBlockInputStream::read()
{
/* if (pos >= current_rows)
{
current_block = block_input.read();
current_rows = current_block.rows();
pos = 0;
}
ColumnVisitorNthElement visitor(pos);
size_t columns = current_block.columns();
Row row(columns);
for (size_t i = 0; i < columns; ++i)
row[i] = boost::apply_visitor(visitor, *current_block.getByPosition(i).column);
return row;
*/
Column column = UInt64Column(0);
Field field = boost::apply_visitor(TestVisitor(), column);
Row row;
return row;
}
}

View File

@ -0,0 +1,35 @@
#include <DB/DataStreams/TabSeparatedRowOutputStream.h>
namespace DB
{
using Poco::SharedPtr;
TabSeparatedRowOutputStream::TabSeparatedRowOutputStream(std::ostream & ostr_, SharedPtr<ColumnTypes> column_types_)
: ostr(ostr_), column_types(column_types_), field_number(0)
{
}
void TabSeparatedRowOutputStream::writeField(const Field & field)
{
column_types->at(field_number)->serializeTextEscaped(field, ostr);
++field_number;
}
void TabSeparatedRowOutputStream::writeFieldDelimiter()
{
ostr.put('\t');
}
void TabSeparatedRowOutputStream::writeRowEndDelimiter()
{
ostr.put('\n');
field_number = 0;
}
}

View File

@ -0,0 +1,34 @@
#include <DB/DataStreams/RowInputStreamFromBlockInputStream.h>
#include <DB/DataStreams/copyData.h>
namespace DB
{
void copyData(IBlockInputStream & from, IBlockOutputStream & to)
{
while (Block block = from.read())
to.write(block);
}
void copyData(IRowInputStream & from, IRowOutputStream & to)
{
while (1)
{
Row row = from.read();
if (row.empty())
break;
to.write(row);
}
}
void copyData(IBlockInputStream & from, IRowOutputStream & to)
{
RowInputStreamFromBlockInputStream row_input(from);
copyData(row_input, to);
}
}

View File

@ -0,0 +1,51 @@
#include <Poco/SharedPtr.h>
#include <DB/Core/Exception.h>
#include <DB/Core/ErrorCodes.h>
#include <DB/ColumnTypes/ColumnTypesNumberFixed.h>
#include <DB/Storages/StorageSystemNumbers.h>
namespace DB
{
using Poco::SharedPtr;
NumbersBlockInputStream::NumbersBlockInputStream(size_t block_size_) : block_size(block_size_), next(0)
{
}
Block NumbersBlockInputStream::read()
{
Block res;
res.insert(0, ColumnWithNameAndType());
ColumnWithNameAndType & column_with_name_and_type = res.getByPosition(0);
column_with_name_and_type.name = "number";
column_with_name_and_type.type = new ColumnTypeUInt64();
column_with_name_and_type.column = new Column;
*column_with_name_and_type.column = UInt64Column(block_size);
UInt64Column & vec = boost::get<UInt64Column>(*column_with_name_and_type.column);
for (size_t i = 0; i < block_size; ++i)
vec[i] = next++;
return res;
}
SharedPtr<IBlockInputStream> StorageSystemNumbers::read(
const ColumnNames & column_names, const ptree & query, size_t max_block_size)
{
if (column_names.size() != 1)
throw Exception("Incorrect number of columns.", ErrorCodes::INCORRECT_NUMBER_OF_COLUMNS);
if (column_names[0] != "number")
throw Exception("There is no column " + column_names[0] + " in table System.Numbers.",
ErrorCodes::THERE_IS_NO_COLUMN);
return new NumbersBlockInputStream(max_block_size);
}
}