dbms: development.

This commit is contained in:
Alexey Milovidov 2010-05-21 19:52:50 +00:00
parent b2db328d0a
commit ec705eddef
22 changed files with 270 additions and 8 deletions

View File

@ -33,6 +33,11 @@ public:
data->clear();
}
SharedPtr<IColumn> cloneEmpty() const
{
return new ColumnArray(data->cloneEmpty());
}
size_t size() const
{
return offsets.size();

View File

@ -21,6 +21,7 @@ class ColumnConst : public IColumn
public:
ColumnConst(size_t s_, T & data_) : s(s_), data(data_) {}
SharedPtr<IColumn> cloneEmpty() const { return new ColumnConst(0, data); }
size_t size() const { return s; }
Field operator[](size_t n) const { return data; }
void cut(size_t start, size_t length) { s = length; }

View File

@ -20,12 +20,17 @@ class ColumnFixedArray : public IColumn
{
public:
/** Создать пустой столбец массивов фиксированного размера n, со типом значений, как в столбце nested_column */
ColumnVector(SharedPtr<IColumn> nested_column, size_t n_)
ColumnFixedArray(SharedPtr<IColumn> nested_column, size_t n_)
: data(nested_column), n(n_)
{
data.clear();
}
SharedPtr<IColumn> cloneEmpty() const
{
return new ColumnFixedArray(data->cloneEmpty(), n);
}
size_t size() const
{
return data->size() / n;

View File

@ -25,12 +25,17 @@ public:
typedef std::vector<Flag_t> Nulls_t;
/** Создать пустой столбец, с типом значений, как в столбце nested_column */
ColumnVector(SharedPtr<IColumn> nested_column)
ColumnNullable(SharedPtr<IColumn> nested_column)
: data(nested_column)
{
data.clear();
}
SharedPtr<IColumn> cloneEmpty() const
{
return new ColumnNullable(data->cloneEmpty());
}
size_t size() const
{
return data->size();

View File

@ -26,6 +26,11 @@ public:
{
}
SharedPtr<IColumn> cloneEmpty() const
{
return new ColumnString;
}
Field operator[](size_t n) const
{
size_t offset = n == 0 ? 0 : offsets[n - 1];

View File

@ -43,6 +43,15 @@ public:
checkSizes();
}
SharedPtr<IColumn> cloneEmpty() const
{
Container_t new_data(data.size());
for (size_t i = 0; i < data.size(); ++i)
new_data[i] = data[i]->cloneEmpty();
return new ColumnTuple(new_data);
}
size_t size() const
{
return data[0]->size();

View File

@ -24,6 +24,11 @@ public:
ColumnVector() {}
ColumnVector(size_t n) : data(n) {}
SharedPtr<IColumn> cloneEmpty() const
{
return new ColumnVector<T>;
}
size_t size() const
{
return data.size();

View File

@ -1,16 +1,23 @@
#ifndef DBMS_CORE_ICOLUMN_H
#define DBMS_CORE_ICOLUMN_H
#include <Poco/SharedPtr.h>
#include <DB/Core/Field.h>
namespace DB
{
using Poco::SharedPtr;
/** Интерфейс для хранения столбцов значений в оперативке.
*/
class IColumn
{
public:
/** Создать пустой столбец такого же типа */
virtual SharedPtr<IColumn> cloneEmpty() const = 0;
/** Количество значений в столбце. */
virtual size_t size() const = 0;

View File

@ -0,0 +1,6 @@
#ifndef DBMS_CORE_DEFINES_H
#define DBMS_CORE_DEFINES_H
#define DEFAULT_BLOCK_SIZE 1048576
#endif

View File

@ -28,6 +28,9 @@ namespace ErrorCodes
DELIMITER_IN_STRING_LITERAL_DOESNT_MATCH,
CANNOT_INSERT_ELEMENT_INTO_CONSTANT_COLUMN,
SIZE_OF_ARRAY_DOESNT_MATCH_SIZE_OF_FIXEDARRAY_COLUMN,
NUMBER_OF_COLUMNS_DOESNT_MATCH,
CANNOT_READ_ALL_DATA_FROM_TAB_SEPARATED_INPUT,
CANNOT_PARSE_ALL_VALUE_FROM_TAB_SEPARATED_INPUT,
};
}

View File

@ -0,0 +1,40 @@
#ifndef DBMS_DATA_STREAMS_BLOCKINPUTSTREAMFROMROWINPUTSTREAM_H
#define DBMS_DATA_STREAMS_BLOCKINPUTSTREAMFROMROWINPUTSTREAM_H
#include <Poco/SharedPtr.h>
#include <DB/Core/Defines.h>
#include <DB/DataStreams/IBlockInputStream.h>
#include <DB/DataStreams/IRowInputStream.h>
namespace DB
{
using Poco::SharedPtr;
/** Преобразует поток для чтения данных по строкам в поток для чтения данных по блокам.
* Наример, для чтения текстового дампа.
*/
class BlockInputStreamFromRowInputStream : public IBlockInputStream
{
public:
/** sample_ - пустой блок, который описывает, как интерпретировать значения */
BlockInputStreamFromRowInputStream(
IRowInputStream & row_input_,
const Block & sample_,
size_t max_block_size_ = DEFAULT_BLOCK_SIZE);
Block read();
private:
IRowInputStream & row_input;
const Block & sample;
size_t max_block_size;
};
}
#endif

View File

@ -21,6 +21,7 @@ public:
DataTypeDate() : date_lut(DateLUTSingleton::instance()) {}
std::string getName() const { return "Date"; }
SharedPtr<IDataType> clone() const { return new DataTypeDate; }
void serializeText(const Field & field, std::ostream & ostr) const
{

View File

@ -22,6 +22,11 @@ public:
return "String";
}
SharedPtr<IDataType> clone() const
{
return new DataTypeString;
}
void serializeBinary(const Field & field, std::ostream & ostr) const;
void deserializeBinary(Field & field, std::istream & istr) const;
void serializeBinary(const IColumn & column, std::ostream & ostr) const;

View File

@ -15,6 +15,7 @@ namespace DB
{ \
public: \
std::string getName() const { return #TYPE; } \
SharedPtr<IDataType> clone() const { return new DataType ## TYPE; } \
};
DEFINE_DATA_TYPE_NUMBER_FIXED(UInt8);

View File

@ -14,12 +14,14 @@ class DataTypeVarUInt : public IDataTypeNumberVariable<UInt64, ColumnUInt64>
{
public:
std::string getName() const { return "VarUInt"; }
SharedPtr<IDataType> clone() const { return new DataTypeVarUInt; }
};
class DataTypeVarInt : public IDataTypeNumberVariable<Int64, ColumnInt64>
{
public:
std::string getName() const { return "VarInt"; }
SharedPtr<IDataType> clone() const { return new DataTypeVarInt; }
};
}

View File

@ -23,6 +23,9 @@ public:
/// Основное имя типа (например, BIGINT UNSIGNED).
virtual std::string getName() const = 0;
/// Клонировать
virtual SharedPtr<IDataType> clone() const = 0;
/** Предполагается, что проверка статуса stream-ов производится вызывающей стороной.
*/

View File

@ -12,7 +12,7 @@ namespace DB
/** Реализует часть интерфейса IDataType, общую для всяких чисел фиксированной ширины
* - ввод и вывод в текстовом и бинарном виде.
* Остаётся лишь чисто виртуальный метод getName().
* Остаётся лишь чисто виртуальный метод getName() и clone().
*
* Параметры: FieldType - тип единичного значения, ColumnType - тип столбца со значениями.
* (см. Core/Field.h, Columns/IColumn.h)

View File

@ -5,13 +5,12 @@
#include <Poco/SharedPtr.h>
#include <DB/Core/Defines.h>
#include <DB/Core/ColumnNames.h>
#include <DB/Core/Exception.h>
#include <DB/DataStreams/IBlockInputStream.h>
#include <DB/DataStreams/IBlockOutputStream.h>
#define DEFAULT_BLOCK_SIZE 1048576
namespace DB
{

View File

@ -0,0 +1,55 @@
#include <DB/Core/Exception.h>
#include <DB/Core/ErrorCodes.h>
#include <DB/DataStreams/BlockInputStreamFromRowInputStream.h>
namespace DB
{
using Poco::SharedPtr;
BlockInputStreamFromRowInputStream::BlockInputStreamFromRowInputStream(
IRowInputStream & row_input_,
const Block & sample_,
size_t max_block_size_)
: row_input(row_input_), sample(sample_), max_block_size(max_block_size_)
{
}
Block BlockInputStreamFromRowInputStream::read()
{
Block res;
for (size_t i = 0; i < sample.columns(); ++i)
{
const ColumnWithNameAndType & sample_elem = sample.getByPosition(i);
ColumnWithNameAndType res_elem;
res_elem.column = sample_elem.column->cloneEmpty();
res_elem.type = sample_elem.type->clone();
res_elem.name = sample_elem.name;
res.insert(res_elem);
}
for (size_t rows = 0; rows < max_block_size; ++rows)
{
Row row = row_input.read();
if (row.empty())
return sample;
if (row.size() != sample.columns())
throw Exception("Number of columns doesn't match", ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH);
for (size_t i = 0; i < row.size(); ++i)
res.getByPosition(i).column->insert(row[i]);
}
return res;
}
}

View File

@ -0,0 +1,60 @@
#include <DB/Core/Exception.h>
#include <DB/Core/ErrorCodes.h>
#include <DB/DataStreams/TabSeparatedRowInputStream.h>
#include <iostream>
namespace DB
{
using Poco::SharedPtr;
TabSeparatedRowInputStream::TabSeparatedRowInputStream(std::istream & istr_, SharedPtr<DataTypes> data_types_)
: istr(istr_), data_types(data_types_)
{
}
Row TabSeparatedRowInputStream::read()
{
Row res;
size_t size = data_types->size();
res.resize(size);
for (size_t i = 0; i < size; ++i)
{
(*data_types)[i]->deserializeTextEscaped(res[i], istr);
if (i == 0 && istr.eof())
{
res.clear();
return res;
}
else if (!istr.good())
throw Exception("Cannot read all data from tab separated input",
ErrorCodes::CANNOT_READ_ALL_DATA_FROM_TAB_SEPARATED_INPUT);
/// пропускаем разделители
if (i + 1 == size)
{
if (istr.peek() == '\n')
istr.ignore();
else if (!istr.eof())
throw Exception("Cannot parse all value in tab separated input",
ErrorCodes::CANNOT_PARSE_ALL_VALUE_FROM_TAB_SEPARATED_INPUT);
}
else
{
if (istr.peek() == '\t')
istr.ignore();
else
throw Exception("Cannot parse all value in tab separated input",
ErrorCodes::CANNOT_PARSE_ALL_VALUE_FROM_TAB_SEPARATED_INPUT);
}
}
return res;
}
}

View File

@ -0,0 +1,39 @@
#include <string>
#include <iostream>
#include <fstream>
#include <Poco/Stopwatch.h>
#include <Poco/SharedPtr.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/DataTypes/DataTypeString.h>
#include <DB/DataStreams/TabSeparatedRowInputStream.h>
#include <DB/DataStreams/TabSeparatedRowOutputStream.h>
#include <DB/DataStreams/copyData.h>
int main(int argc, char ** argv)
{
try
{
Poco::SharedPtr<DB::DataTypes> data_types = new DB::DataTypes;
data_types->push_back(new DB::DataTypeUInt64);
data_types->push_back(new DB::DataTypeString);
std::ifstream istr("test_in");
std::ofstream ostr("test_out");
DB::TabSeparatedRowInputStream row_input(istr, data_types);
DB::TabSeparatedRowOutputStream row_output(ostr, data_types);
DB::copyData(row_input, row_output);
}
catch (const DB::Exception & e)
{
std::cerr << e.what() << ", " << e.message() << std::endl;
return 1;
}
return 0;
}

View File

@ -101,7 +101,9 @@ void DataTypeString::serializeText(const Field & field, std::ostream & ostr) con
void DataTypeString::deserializeText(Field & field, std::istream & istr) const
{
istr >> boost::get<String &>(field);
String s;
istr >> s;
field = s;
}
@ -113,7 +115,9 @@ void DataTypeString::serializeTextEscaped(const Field & field, std::ostream & os
void DataTypeString::deserializeTextEscaped(Field & field, std::istream & istr) const
{
istr >> strconvert::unescape_file >> boost::get<String &>(field);
String s;
istr >> strconvert::unescape_file >> s;
field = s;
}
@ -125,7 +129,9 @@ void DataTypeString::serializeTextQuoted(const Field & field, std::ostream & ost
void DataTypeString::deserializeTextQuoted(Field & field, std::istream & istr, bool compatible) const
{
istr >> strconvert::unquote_fast >> boost::get<String &>(field);
String s;
istr >> strconvert::unquote_fast >> s;
field = s;
}