dbms: development.

This commit is contained in:
Alexey Milovidov 2010-03-12 18:25:35 +00:00
parent c7a73d68ca
commit f25307c662
22 changed files with 430 additions and 161 deletions

View File

@ -0,0 +1,40 @@
#ifndef DBMS_CORE_COLUMN_CONST_H
#define DBMS_CORE_COLUMN_CONST_H
#include <Poco/SharedPtr.h>
#include <DB/Columns/IColumn.h>
namespace DB
{
using Poco::SharedPtr;
/** шаблон для столбцов-констант (столбцов одинаковых значений).
*/
template <typename T>
class ColumnConst : public IColumn
{
public:
ColumnConst(size_t s_, T & data_) : s(s_), data(data_) {}
size_t size() const { return s; }
Field operator[](size_t n) const { return data; }
void cut(size_t start, size_t length) { s = length; }
/** Более эффективные методы манипуляции */
T & getData() { return data; }
const T & getData() const { return data; }
/** Преобразование из константы в полноценный столбец */
virtual SharedPtr<IColumn> convertToFullColumn() const = 0;
private:
T data;
};
}
#endif

View File

@ -0,0 +1,17 @@
#ifndef DBMS_CORE_COLUMN_STRING_H
#define DBMS_CORE_COLUMN_STRING_H
#include <DB/Core/Types.h>
#include <DB/Columns/ColumnVector.h>
namespace DB
{
/** Столбец строк. */
typedef ColumnVector<String> ColumnString;
}
#endif

View File

@ -0,0 +1,93 @@
#ifndef DBMS_CORE_COLUMN_TUPLE_H
#define DBMS_CORE_COLUMN_TUPLE_H
#include <vector>
#include <Poco/SharedPtr.h>
#include <DB/Core/Exception.h>
#include <DB/Core/ErrorCodes.h>
#include <DB/Column/IColumn.h>
namespace DB
{
using Poco::SharedPtr;
/** Столбец со значениями-кортежами.
*/
class ColumnTuple
{
private:
typedef std::vector<SharedPtr<IColumn> > Container_t;
Container_t data;
/// убедиться в том, что размеры столбцов-элементов совпадают.
void checkSizes() const
{
if (data.empty())
throw Exception("Empty tuple", ErrorCodes::EMPTY_TUPLE);
size_t size = data[0]->size();
for (size_t i = 1; i < data.size(); ++i)
if (data[i]->size() != size)
throw Exception("Sizes of columns (elements of tuple column) doesn't match",
ErrorCodes::SIZES_OF_COLUMNS_IN_TUPLE_DOESNT_MATCH);
}
public:
ColumnTuple(const Container_t & data_)
: data(data_)
{
checkSizes();
}
size_t size() const
{
return data[0]->size();
}
Field operator[](size_t n) const
{
Array res = Array(data.size());
for (size_t i = 0; i < data.size(); ++i)
res[i] = (*data[i])[n];
return res;
}
void cut(size_t start, size_t length)
{
for (size_t i = 0; i < data.size(); ++i)
data[i]->cut(start, length);
}
/// манипуляция с Tuple
void insertColumn(size_t pos, SharedPtr<IColumn> & column)
{
if (pos > data.size())
throw Exception("Position out of bound in ColumnTuple::insertColumn().",
ErrorCodes::POSITION_OUT_OF_BOUND);
data.insert(data.begin() + pos, column);
checkSizes();
}
void eraseColumn(size_t pos)
{
if (data.size() == 1)
throw Exception("Empty tuple", ErrorCodes::EMPTY_TUPLE);
if (pos >= data.size())
throw Exception("Position out of bound in ColumnTuple::eraseColumn().",
ErrorCodes::POSITION_OUT_OF_BOUND);
data.erase(data.begin() + pos);
}
};
}
#endif

View File

@ -0,0 +1,17 @@
#ifndef DBMS_CORE_COLUMN_VARIANT_H
#define DBMS_CORE_COLUMN_VARIANT_H
#include <DB/Core/Field.h>
#include <DB/Columns/ColumnVector.h>
namespace DB
{
/** Столбец значений произвольного типа. */
typedef ColumnVector<Field> ColumnVariant;
}
#endif

View File

@ -0,0 +1,71 @@
#ifndef DBMS_CORE_COLUMN_VECTOR_H
#define DBMS_CORE_COLUMN_VECTOR_H
#include <string.h>
#include <DB/Core/Exception.h>
#include <DB/Core/ErrorCodes.h>
#include <DB/Columns/IColumn.h>
namespace DB
{
/** Шаблон столбцов, которые используют для хранения std::vector.
*/
template <typename T>
class ColumnVector : public IColumn
{
public:
typedef T value_type;
typedef std::vector<value_type> Container_t;
ColumnVector() {}
ColumnVector(size_t n) : data(n) {}
size_t size() const
{
return data.size();
}
Field operator[](size_t n) const
{
return data[n];
}
void cut(size_t start, size_t length)
{
if (start + length > data.size())
throw Exception("Parameter out of bound in IColumnVector<T>::cut() method.",
ErrorCodes::PARAMETER_OUT_OF_BOUND);
if (start == 0)
data.resize(length);
else
{
Container_t tmp(length);
memcpy(&tmp[0], &data[start], length * sizeof(data[0]));
tmp.swap(data);
}
}
/** Более эффективные методы манипуляции */
Container_t & getData()
{
return data;
}
const Container_t & getData() const
{
return data;
}
private:
Container_t data;
};
}
#endif

View File

@ -0,0 +1,28 @@
#ifndef DBMS_CORE_COLUMNS_NUMBER_H
#define DBMS_CORE_COLUMNS_NUMBER_H
#include <DB/Core/Types.h>
#include <DB/Columns/ColumnVector.h>
namespace DB
{
/** Столбцы чисел. */
typedef ColumnVector<UInt8> ColumnUInt8;
typedef ColumnVector<UInt16> ColumnUInt16;
typedef ColumnVector<UInt32> ColumnUInt32;
typedef ColumnVector<UInt64> ColumnUInt64;
typedef ColumnVector<Int8> ColumnInt8;
typedef ColumnVector<Int16> ColumnInt16;
typedef ColumnVector<Int32> ColumnInt32;
typedef ColumnVector<Int64> ColumnInt64;
typedef ColumnVector<Float32> ColumnFloat32;
typedef ColumnVector<Float64> ColumnFloat64;
}
#endif

View File

@ -0,0 +1,29 @@
#ifndef DBMS_CORE_ICOLUMN_H
#define DBMS_CORE_ICOLUMN_H
#include <DB/Core/Field.h>
namespace DB
{
/** Интерфейс для хранения столбцов значений в оперативке.
*/
class IColumn
{
public:
/** Количество значений в столбце */
virtual size_t size() const = 0;
/** Получить значение n-го элемента */
virtual Field operator[](size_t n) const = 0;
/** Удалить всё кроме диапазона элементов */
virtual void cut(size_t start, size_t length) = 0;
virtual ~IColumn() {}
};
}
#endif

View File

@ -1,51 +0,0 @@
#ifndef DBMS_CORE_COLUMN_H
#define DBMS_CORE_COLUMN_H
#include <vector>
#include <boost/variant.hpp>
#include <boost/variant/recursive_variant.hpp>
#include <DB/Core/Types.h>
#include <DB/Core/Field.h>
namespace DB
{
/** Типы данных для представления столбцов значений в оперативке.
* Столбец значений может быть представлен массивом значений или единичным значением, если столбец - константа.
*/
typedef std::vector<UInt8> UInt8Column;
typedef std::vector<UInt16> UInt16Column;
typedef std::vector<UInt32> UInt32Column;
typedef std::vector<UInt64> UInt64Column;
typedef std::vector<Int8> Int8Column;
typedef std::vector<Int16> Int16Column;
typedef std::vector<Int32> Int32Column;
typedef std::vector<Int64> Int64Column;
typedef std::vector<Float32> Float32Column;
typedef std::vector<Float64> Float64Column;
typedef std::vector<String> StringColumn;
typedef std::vector<Field> VariantColumn; /// Столбец произвольных значений, а также nullable значений
typedef boost::make_recursive_variant<
UInt8Column, UInt16Column, UInt32Column, UInt64Column,
Int8Column, Int16Column, Int32Column, Int64Column,
Float32Column, Float64Column,
StringColumn,
VariantColumn, /// Variant, Nullable
std::vector<boost::recursive_variant_> /// Tuple, Array
>::type Column;
typedef std::vector<Column> TupleColumn; /// Столбец значений типа "кортеж" - несколько столбцов произвольного типа
typedef std::vector<Column> ArrayColumn; /// Столбец значений типа "массив" - столбец, значения в котором - массивы
}
#endif

View File

@ -1,33 +0,0 @@
#ifndef DBMS_CORE_COLUMN_TYPE_TO_FIELD_TYPE_H
#define DBMS_CORE_COLUMN_TYPE_TO_FIELD_TYPE_H
#include <DB/Core/Field.h>
#include <DB/Core/Column.h>
namespace DB
{
/** Переводит типы, использующиеся в Column в типы, использующиеся в Field
*/
template <typename T> struct ColumnTypeToFieldType;
template <> struct ColumnTypeToFieldType<UInt8Column> { typedef UInt64 Type; };
template <> struct ColumnTypeToFieldType<UInt16Column> { typedef UInt64 Type; };
template <> struct ColumnTypeToFieldType<UInt32Column> { typedef UInt64 Type; };
template <> struct ColumnTypeToFieldType<UInt64Column> { typedef UInt64 Type; };
template <> struct ColumnTypeToFieldType<Int8Column> { typedef Int64 Type; };
template <> struct ColumnTypeToFieldType<Int16Column> { typedef Int64 Type; };
template <> struct ColumnTypeToFieldType<Int32Column> { typedef Int64 Type; };
template <> struct ColumnTypeToFieldType<Int64Column> { typedef Int64 Type; };
template <> struct ColumnTypeToFieldType<Float32Column> { typedef Float64 Type; };
template <> struct ColumnTypeToFieldType<Float64Column> { typedef Float64 Type; };
template <> struct ColumnTypeToFieldType<StringColumn> { typedef String Type; };
template <> struct ColumnTypeToFieldType<VariantColumn> { typedef Field Type; };
}
#endif

View File

@ -1,53 +0,0 @@
#ifndef DBMS_CORE_COLUMN_VISITORS_H
#define DBMS_CORE_COLUMN_VISITORS_H
#include <boost/variant/static_visitor.hpp>
#include <DB/Core/Field.h>
#include <DB/Core/Column.h>
#include <DB/Core/ColumnTypeToFieldType.h>
namespace DB
{
/** Возвращает количество значений в столбце
* TODO: поправить для tuple.
*/
class ColumnVisitorSize : public boost::static_visitor<size_t>
{
public:
template <typename T> size_t operator() (const T & x) const { return x.size(); }
};
/** Возвращает n-ый элемент столбца.
* TODO: поправить для tuple.
*/
class ColumnVisitorNthElement : public boost::static_visitor<Field>
{
public:
ColumnVisitorNthElement(size_t n_) : n(n_) {}
template <typename T> Field operator() (const T & x) const
{
return typename ColumnTypeToFieldType<T>::Type(
x.size() == 1
? x[0] /// столбец - константа
: x[n]);
}
Field operator() (const TupleColumn & x) const
{
return UInt64(0); /// заглушка
}
private:
size_t n;
};
}
#endif

View File

@ -3,7 +3,7 @@
#include <Poco/SharedPtr.h>
#include <DB/Core/Column.h>
#include <DB/Columns/IColumn.h>
#include <DB/DataTypes/IDataType.h>
@ -17,7 +17,7 @@ using Poco::SharedPtr;
struct ColumnWithNameAndType
{
SharedPtr<Column> column;
SharedPtr<IColumn> column;
SharedPtr<IDataType> type;
String name;
};

View File

@ -20,6 +20,9 @@ namespace ErrorCodes
EMPTY_COLUMN_IN_BLOCK,
NOT_FOUND_COLUMN_IN_BLOCK,
POSITION_OUT_OF_BOUND,
PARAMETER_OUT_OF_BOUND,
SIZES_OF_COLUMNS_IN_TUPLE_DOESNT_MATCH,
EMPTY_TUPLE,
};
}

View File

@ -0,0 +1,32 @@
#ifndef DBMS_DATA_STREAMS_LIMITBLOCKINPUTSTREAM_H
#define DBMS_DATA_STREAMS_LIMITBLOCKINPUTSTREAM_H
#include <Poco/SharedPtr.h>
#include <DB/DataStreams/IBlockInputStream.h>
namespace DB
{
using Poco::SharedPtr;
/** Реализует реляционную операцию LIMIT.
*/
class LimitBlockInputStream : public IBlockInputStream
{
public:
LimitBlockInputStream(SharedPtr<IBlockInputStream> input_, size_t limit_, size_t offset_ = 0);
Block read();
private:
SharedPtr<IBlockInputStream> input;
size_t limit;
size_t offset;
size_t pos;
};
}
#endif

View File

@ -1,7 +1,7 @@
#ifndef DBMS_DATA_TYPES_NUMBER_FIXED_H
#define DBMS_DATA_TYPES_NUMBER_FIXED_H
#include <DB/Core/Column.h>
#include <DB/Columns/ColumnsNumber.h>
#include <DB/DataTypes/IDataTypeNumberFixed.h>
@ -11,7 +11,7 @@ namespace DB
/** Типы столбцов для чисел фиксированной ширины. */
#define DEFINE_DATA_TYPE_NUMBER_FIXED(TYPE) \
class DataType ## TYPE : public IDataTypeNumberFixed<TYPE, TYPE ## Column> \
class DataType ## TYPE : public IDataTypeNumberFixed<TYPE, Column ## TYPE> \
{ \
public: \
std::string getName() const { return #TYPE; } \

View File

@ -4,7 +4,7 @@
#include <ostream>
#include <DB/Core/Field.h>
#include <DB/Core/Column.h>
#include <DB/Columns/IColumn.h>
namespace DB
@ -29,9 +29,9 @@ public:
*/
virtual void serializeBinary(const Field & field, std::ostream & ostr) const = 0;
virtual void deserializeBinary(Field & field, std::istream & istr) const = 0;
virtual void serializeBinary(const Column & column, std::ostream & ostr) const = 0;
virtual void serializeBinary(const IColumn & column, std::ostream & ostr) const = 0;
/** Считать не более limit значений. */
virtual void deserializeBinary(Column & column, std::istream & istr, size_t limit) const = 0;
virtual void deserializeBinary(IColumn & column, std::istream & istr, size_t limit) const = 0;
/** Текстовая сериализация - для вывода на экран / сохранения в текстовый файл и т. п.
* Без эскейпинга и квотирования.

View File

@ -16,7 +16,7 @@ namespace DB
* Остаётся лишь чисто виртуальный метод getName().
*
* Параметры: FieldType - тип единичного значения, ColumnType - тип столбца со значениями.
* (см. Field.h, Column.h)
* (см. Core/Field.h, Columns/IColumn.h)
*/
template <typename FieldType, typename ColumnType>
class IDataTypeNumberFixed : public IDataTypeNumber<FieldType>
@ -39,15 +39,15 @@ public:
field = x;
}
void serializeBinary(const Column & column, std::ostream & ostr) const
void serializeBinary(const IColumn & column, std::ostream & ostr) const
{
const ColumnType & x = boost::get<ColumnType>(column);
const typename ColumnType::Container_t & x = dynamic_cast<const ColumnType &>(column).getData();
ostr.write(reinterpret_cast<const char *>(&x[0]), sizeof(typename ColumnType::value_type) * x.size());
}
void deserializeBinary(Column & column, std::istream & istr, size_t limit) const
void deserializeBinary(IColumn & column, std::istream & istr, size_t limit) const
{
ColumnType & x = boost::get<ColumnType>(column);
typename ColumnType::Container_t & x = dynamic_cast<ColumnType &>(column).getData();
x.resize(limit);
istr.read(reinterpret_cast<char*>(&x[0]), sizeof(typename ColumnType::value_type) * limit);
x.resize(istr.gcount());

View File

@ -1,6 +1,5 @@
#include <DB/Core/Exception.h>
#include <DB/Core/ErrorCodes.h>
#include <DB/Core/ColumnVisitors.h>
#include <DB/Core/Block.h>
@ -79,10 +78,9 @@ const ColumnWithNameAndType & Block::getByName(const std::string & name) const
size_t Block::rows() const
{
size_t res = 0;
ColumnVisitorSize visitor;
for (Container_t::const_iterator it = data.begin(); it != data.end(); ++it)
{
size_t size = boost::apply_visitor(visitor, *it->column);
size_t size = it->column->size();
if (size == 0)
throw Exception("Empty column in block.", ErrorCodes::EMPTY_COLUMN_IN_BLOCK);

View File

@ -0,0 +1,46 @@
#include <algorithm>
#include <DB/DataStreams/LimitBlockInputStream.h>
namespace DB
{
using Poco::SharedPtr;
LimitBlockInputStream::LimitBlockInputStream(SharedPtr<IBlockInputStream> input_, size_t limit_, size_t offset_)
: input(input_), limit(limit_), offset(offset_), pos(0)
{
}
Block LimitBlockInputStream::read()
{
Block res;
size_t rows = 0;
if (pos >= offset + limit)
return res;
while (pos + rows <= offset)
{
res = input->read();
rows = res.rows();
pos += rows;
}
if (pos >= offset && pos + rows <= offset + limit)
{
pos += rows;
return res;
}
/// блок, от которого надо выбрать кусок
for (size_t i = 0; i < res.columns(); ++i)
res.getByPosition(i).column->cut(std::max(0, static_cast<int>(offset) - static_cast<int>(pos)), limit);
pos += rows;
return res;
}
}

View File

@ -1,5 +1,3 @@
#include <DB/Core/ColumnVisitors.h>
#include <DB/DataStreams/RowInputStreamFromBlockInputStream.h>
namespace DB
@ -23,12 +21,11 @@ Row RowInputStreamFromBlockInputStream::read()
pos = 0;
}
ColumnVisitorNthElement visitor(pos);
size_t columns = current_block.columns();
Row row(columns);
for (size_t i = 0; i < columns; ++i)
row[i] = boost::apply_visitor(visitor, *current_block.getByPosition(i).column);
row[i] = (*current_block.getByPosition(i).column)[pos];
++pos;
return row;

View File

@ -0,0 +1,33 @@
#include <iostream>
#include <fstream>
#include <Poco/Stopwatch.h>
#include <Poco/SharedPtr.h>
#include <DB/Columns/ColumnsNumber.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
int main(int argc, char ** argv)
{
Poco::SharedPtr<DB::ColumnUInt64> column = new DB::ColumnUInt64();
DB::ColumnUInt64::Container_t & vec = column->getData();
DB::DataTypeUInt64 data_type;
Poco::Stopwatch stopwatch;
size_t n = 10000000;
vec.resize(n);
for (size_t i = 0; i < n; ++i)
vec[i] = i;
std::ofstream ostr("/dev/null");
stopwatch.restart();
data_type.serializeBinary(*column, ostr);
stopwatch.stop();
std::cout << "Elapsed: " << static_cast<double>(stopwatch.elapsed()) / 1000000 << std::endl;
return 0;
}

View File

@ -2,6 +2,7 @@
#include <DB/Core/Exception.h>
#include <DB/Core/ErrorCodes.h>
#include <DB/Columns/ColumnsNumber.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/Storages/StorageSystemNumbers.h>
@ -24,9 +25,9 @@ Block NumbersBlockInputStream::read()
ColumnWithNameAndType & column_with_name_and_type = res.getByPosition(0);
column_with_name_and_type.name = "number";
column_with_name_and_type.type = new DataTypeUInt64();
column_with_name_and_type.column = new Column;
*column_with_name_and_type.column = UInt64Column(block_size);
UInt64Column & vec = boost::get<UInt64Column>(*column_with_name_and_type.column);
ColumnUInt64 * column = new ColumnUInt64(block_size);
ColumnUInt64::Container_t & vec = column->getData();
column_with_name_and_type.column = column;
for (size_t i = 0; i < block_size; ++i)
vec[i] = next++;

View File

@ -3,6 +3,7 @@
#include <Poco/SharedPtr.h>
#include <DB/Storages/StorageSystemNumbers.h>
#include <DB/DataStreams/LimitBlockInputStream.h>
#include <DB/DataStreams/TabSeparatedRowOutputStream.h>
#include <DB/DataStreams/copyData.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
@ -22,10 +23,10 @@ int main(int argc, char ** argv)
Poco::SharedPtr<DB::DataTypes> column_types = new DB::DataTypes;
column_types->push_back(new DB::DataTypeUInt64);
SharedPtr<DB::IBlockInputStream> input = table.read(column_names, 0);
DB::LimitBlockInputStream input(table.read(column_names, 0, 10), 100, 100);
DB::TabSeparatedRowOutputStream output(std::cout, column_types);
DB::copyData(*input, output);
DB::copyData(input, output);
}
catch (const DB::Exception & e)
{