dbms: development.

This commit is contained in:
Alexey Milovidov 2010-03-18 19:32:14 +00:00
parent 90a9820e79
commit 6ef3ffc267
14 changed files with 368 additions and 36 deletions

View File

@ -31,7 +31,7 @@ public:
Field operator[](size_t n) const
{
return data[n];
return typename NearestFieldType<T>::Type(data[n]);
}
void cut(size_t start, size_t length)

View File

@ -30,14 +30,9 @@ protected:
int writeToDevice(const char * buffer, std::streamsize length);
private:
size_t pos_in_buffer;
std::ostream * p_ostr;
std::vector<char> uncompressed_buffer;
std::vector<char> compressed_buffer;
std::vector<char> scratch;
/** Сжимает данные, находящиеся в буфере и записывает их. */
void writeCompressedChunk();
};

View File

@ -0,0 +1,16 @@
#ifndef DBMS_CORE_COLUMN_WITH_NAME_AND_TYPE_H
#define DBMS_CORE_COLUMN_WITH_NAME_AND_TYPE_H
#include <vector>
#include <DB/Core/ColumnWithNameAndType.h>
namespace DB
{
typedef std::vector<ColumnWithNameAndType> ColumnsWithNameAndType;
}
#endif

View File

@ -23,6 +23,8 @@ namespace ErrorCodes
PARAMETER_OUT_OF_BOUND,
SIZES_OF_COLUMNS_IN_TUPLE_DOESNT_MATCH,
EMPTY_TUPLE,
DUPLICATE_COLUMN,
NO_SUCH_COLUMN_IN_TABLE,
};
}

View File

@ -65,6 +65,22 @@ public:
};
template <typename T> struct NearestFieldType;
template <> struct NearestFieldType<UInt8> { typedef UInt64 Type; };
template <> struct NearestFieldType<UInt16> { typedef UInt64 Type; };
template <> struct NearestFieldType<UInt32> { typedef UInt64 Type; };
template <> struct NearestFieldType<UInt64> { typedef UInt64 Type; };
template <> struct NearestFieldType<Int8> { typedef Int64 Type; };
template <> struct NearestFieldType<Int16> { typedef Int64 Type; };
template <> struct NearestFieldType<Int32> { typedef Int64 Type; };
template <> struct NearestFieldType<Int64> { typedef Int64 Type; };
template <> struct NearestFieldType<Float32> { typedef Float64 Type; };
template <> struct NearestFieldType<Float64> { typedef Float64 Type; };
}
#endif

View File

@ -0,0 +1,21 @@
#ifndef DBMS_CORE_NAMES_AND_TYPES_H
#define DBMS_CORE_NAMES_AND_TYPES_H
#include <map>
#include <string>
#include <Poco/SharedPtr.h>
#include <DB/DataTypes/IDataType.h>
namespace DB
{
using Poco::SharedPtr;
typedef std::map<std::string, SharedPtr<IDataType> > NamesAndTypes;
}
#endif

View File

@ -3,6 +3,8 @@
#include <ostream>
#include <Poco/SharedPtr.h>
#include <DB/Core/Field.h>
#include <DB/Columns/IColumn.h>
@ -10,6 +12,7 @@
namespace DB
{
using Poco::SharedPtr;
/** Метаданные типа для хранения (столбца).
* Содержит методы для сериализации/десериализации.
@ -52,6 +55,9 @@ public:
virtual void serializeTextQuoted(const Field & field, std::ostream & ostr, bool compatible = false) const = 0;
virtual void deserializeTextQuoted(Field & field, std::istream & istr, bool compatible = false) const = 0;
/** Создать пустой столбец соответствующего типа.
*/
virtual SharedPtr<IColumn> createColumn() const = 0;
virtual ~IDataType() {}
};

View File

@ -17,14 +17,14 @@ class IDataTypeNumber : public IDataType
public:
void serializeText(const Field & field, std::ostream & ostr) const
{
ostr << boost::get<FieldType>(field);
ostr << boost::get<typename NearestFieldType<FieldType>::Type>(field);
}
void deserializeText(Field & field, std::istream & istr) const
{
FieldType x;
istr >> x;
field = x;
field = typename NearestFieldType<FieldType>::Type(x);
}
void serializeTextEscaped(const Field & field, std::ostream & ostr) const

View File

@ -36,7 +36,7 @@ public:
{
typename ColumnType::value_type x;
istr.read(reinterpret_cast<char *>(&x), sizeof(x));
field = x;
field = typename NearestFieldType<FieldType>::Type(x);
}
void serializeBinary(const IColumn & column, std::ostream & ostr) const
@ -52,6 +52,11 @@ public:
istr.read(reinterpret_cast<char*>(&x[0]), sizeof(typename ColumnType::value_type) * limit);
x.resize(istr.gcount());
}
SharedPtr<IColumn> createColumn() const
{
return new ColumnType;
}
};
}

View File

@ -8,6 +8,7 @@
#include <DB/Core/ColumnNames.h>
#include <DB/Core/Exception.h>
#include <DB/DataStreams/IBlockInputStream.h>
#include <DB/DataStreams/IBlockOutputStream.h>
#define DEFAULT_BLOCK_SIZE 1048576
@ -46,6 +47,16 @@ public:
throw Exception("Method read() is not supported by storage " + getName());
}
/** Пишет данные в таблицу.
* Принимает описание запроса, в котором может содержаться информация о методе записи данных.
* Возвращает объект, с помощью которого можно последовательно писать данные.
*/
virtual SharedPtr<IBlockOutputStream> write(
const ptree & query)
{
throw Exception("Method write() is not supported by storage " + getName());
}
virtual ~IStorage() {}
};

View File

@ -0,0 +1,87 @@
#ifndef DBMS_STORAGES_STORAGE_LOG_H
#define DBMS_STORAGES_STORAGE_LOG_H
#include <map>
#include <Poco/SharedPtr.h>
#include <Poco/File.h>
#include <Poco/FileStream.h>
#include <DB/Core/NamesAndTypes.h>
#include <DB/Storages/IStorage.h>
namespace DB
{
using Poco::SharedPtr;
class StorageLog;
class LogBlockInputStream : public IBlockInputStream
{
public:
LogBlockInputStream(size_t block_size_, const ColumnNames & column_names_, StorageLog & storage_);
Block read();
private:
size_t block_size;
const ColumnNames & column_names;
StorageLog & storage;
typedef std::map<std::string, SharedPtr<Poco::FileInputStream> > FileStreams;
FileStreams streams;
};
class LogBlockOutputStream : public IBlockOutputStream
{
public:
LogBlockOutputStream(StorageLog & storage_);
void write(const Block & block);
private:
StorageLog & storage;
typedef std::map<std::string, SharedPtr<Poco::FileOutputStream> > FileStreams;
FileStreams streams;
};
/** Реализует хранилище, подходящее для логов.
* В нём не поддерживаются ключи; запись блокирует всю таблицу.
* Данные хранятся в сжатом виде.
*/
class StorageLog : public IStorage
{
friend class LogBlockInputStream;
friend class LogBlockOutputStream;
public:
/** Подцепить таблицу с соответствующим именем, по соответствующему пути (с / на конце),
* (корректность имён и путей не проверяется)
* состоящую из указанных столбцов; создать файлы, если их нет.
*/
StorageLog(const std::string & path_, const std::string & name_, SharedPtr<NamesAndTypes> columns_,
const std::string & extension_ = ".bin");
std::string getName() const { return "Log"; }
SharedPtr<IBlockInputStream> read(
const ColumnNames & column_names,
const ptree & query,
size_t max_block_size = DEFAULT_BLOCK_SIZE);
SharedPtr<IBlockOutputStream> write(
const ptree & query);
private:
const std::string path;
const std::string name;
SharedPtr<NamesAndTypes> columns;
const std::string extension;
typedef std::map<std::string, Poco::File> Files_t;
Files_t files;
};
}
#endif

View File

@ -8,10 +8,8 @@ namespace DB
CompressingStreamBuf::CompressingStreamBuf(std::ostream & ostr)
: Poco::BufferedStreamBuf(DBMS_STREAM_BUFFER_SIZE, std::ios::out),
pos_in_buffer(0),
: Poco::BufferedStreamBuf(DBMS_COMPRESSING_STREAM_BUFFER_SIZE, std::ios::out),
p_ostr(&ostr),
uncompressed_buffer(DBMS_COMPRESSING_STREAM_BUFFER_SIZE),
compressed_buffer(DBMS_COMPRESSING_STREAM_BUFFER_SIZE + QUICKLZ_ADDITIONAL_SPACE),
scratch(QLZ_SCRATCH_COMPRESS)
{
@ -40,10 +38,6 @@ void CompressingStreamBuf::writeCompressedChunk()
int CompressingStreamBuf::close()
{
sync();
if (pos_in_buffer != 0)
writeCompressedChunk();
return 0;
}
@ -53,27 +47,13 @@ int CompressingStreamBuf::writeToDevice(const char * buffer, std::streamsize len
if (length == 0 || !p_ostr)
return 0;
size_t bytes_processed = 0;
while (bytes_processed < static_cast<size_t>(length))
{
size_t bytes_to_copy = std::min(
uncompressed_buffer.size() - pos_in_buffer,
static_cast<size_t>(length) - bytes_processed);
memcpy(&uncompressed_buffer[pos_in_buffer], buffer + bytes_processed, bytes_to_copy);
pos_in_buffer += bytes_to_copy;
bytes_processed += bytes_to_copy;
if (pos_in_buffer == uncompressed_buffer.size())
writeCompressedChunk();
if (!p_ostr->good())
{
p_ostr = 0;
return bytes_processed;
}
}
size_t compressed_size = qlz_compress(
buffer,
&compressed_buffer[0],
length,
&scratch[0]);
p_ostr->write(&compressed_buffer[0], compressed_size);
return static_cast<int>(length);
}

View File

@ -0,0 +1,109 @@
#include <map>
#include <DB/Core/Exception.h>
#include <DB/Core/ErrorCodes.h>
#include <DB/Storages/StorageLog.h>
namespace DB
{
using Poco::SharedPtr;
LogBlockInputStream::LogBlockInputStream(size_t block_size_, const ColumnNames & column_names_, StorageLog & storage_)
: block_size(block_size_), column_names(column_names_), storage(storage_)
{
}
Block LogBlockInputStream::read()
{
Block res;
for (ColumnNames::const_iterator it = column_names.begin(); it != column_names.end(); ++it)
{
if (storage.columns->end() == storage.columns->find(*it))
throw Exception("There is no column with name " + *it + " in table.",
ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
streams.insert(std::make_pair(*it, new Poco::FileInputStream(storage.files[*it].path())));
}
for (ColumnNames::const_iterator it = column_names.begin(); it != column_names.end(); ++it)
{
ColumnWithNameAndType column;
column.name = *it;
column.type = (*storage.columns)[*it];
column.column = column.type->createColumn();
column.type->deserializeBinary(*column.column, *streams[column.name], block_size);
res.insert(column);
}
return res;
}
LogBlockOutputStream::LogBlockOutputStream(StorageLog & storage_)
: storage(storage_)
{
}
void LogBlockOutputStream::write(const Block & block)
{
for (size_t i = 0; i < block.columns(); ++i)
{
const std::string & name = block.getByPosition(i).name;
if (storage.columns->end() == storage.columns->find(name))
throw Exception("There is no column with name " + name + " in table.",
ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
streams.insert(std::make_pair(name, new Poco::FileOutputStream(storage.files[name].path())));
}
for (size_t i = 0; i < block.columns(); ++i)
{
const ColumnWithNameAndType & column = block.getByPosition(i);
column.type->serializeBinary(*column.column, *streams[column.name]);
}
}
StorageLog::StorageLog(const std::string & path_, const std::string & name_, SharedPtr<NamesAndTypes> columns_,
const std::string & extension_)
: path(path_), name(name_), columns(columns_), extension(extension_)
{
/// создаём файлы, если их нет
Poco::File dir(path + name + '/');
dir.createDirectories();
for (NamesAndTypes::const_iterator it = columns->begin(); it != columns->end(); ++it)
{
if (files.end() != files.find(it->first))
throw Exception("Duplicate column with name " + it->first + " in constructor of StorageLog.",
ErrorCodes::DUPLICATE_COLUMN);
files.insert(std::make_pair(it->first, Poco::File(path + name + '/' + it->first + extension)));
}
}
SharedPtr<IBlockInputStream> StorageLog::read(
const ColumnNames & column_names,
const ptree & query,
size_t max_block_size)
{
return new LogBlockInputStream(max_block_size, column_names, *this);
}
SharedPtr<IBlockOutputStream> StorageLog::write(
const ptree & query)
{
return new LogBlockOutputStream(*this);
}
}

View File

@ -0,0 +1,84 @@
#include <iostream>
#include <Poco/SharedPtr.h>
#include <DB/Storages/StorageLog.h>
#include <DB/DataStreams/TabSeparatedRowOutputStream.h>
#include <DB/DataStreams/LimitBlockInputStream.h>
#include <DB/DataStreams/copyData.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/Columns/ColumnsNumber.h>
using Poco::SharedPtr;
int main(int argc, char ** argv)
{
try
{
const size_t rows = 1000000;
/// создаём таблицу с парой столбцов
SharedPtr<DB::NamesAndTypes> names_and_types = new DB::NamesAndTypes;
(*names_and_types)["a"] = new DB::DataTypeUInt64;
(*names_and_types)["b"] = new DB::DataTypeUInt8;
DB::StorageLog table("./", "test", names_and_types, ".bin");
/// пишем в неё
{
DB::Block block;
DB::ColumnWithNameAndType column1;
column1.name = "a";
column1.type = (*names_and_types)["a"];
column1.column = column1.type->createColumn();
DB::ColumnUInt64::Container_t vec1 = dynamic_cast<DB::ColumnUInt64&>(*column1.column).getData();
for (size_t i = 0; i < rows; ++i)
vec1[i] = i;
block.insert(column1);
DB::ColumnWithNameAndType column2;
column2.name = "b";
column2.type = (*names_and_types)["b"];
column2.column = column2.type->createColumn();
DB::ColumnUInt8::Container_t vec2 = dynamic_cast<DB::ColumnUInt8&>(*column2.column).getData();
for (size_t i = 0; i < rows; ++i)
vec2[i] = i;
block.insert(column2);
SharedPtr<DB::IBlockOutputStream> out = table.write(0);
out->write(block);
}
/// читаем из неё
{
DB::ColumnNames column_names;
column_names.push_back("a");
column_names.push_back("b");
SharedPtr<DB::IBlockInputStream> in = table.read(column_names, 0);
Poco::SharedPtr<DB::DataTypes> data_types = new DB::DataTypes;
data_types->push_back(new DB::DataTypeUInt64);
data_types->push_back(new DB::DataTypeUInt8);
DB::LimitBlockInputStream in_limit(in, 10);
DB::TabSeparatedRowOutputStream output(std::cout, data_types);
DB::copyData(in_limit, output);
}
}
catch (const DB::Exception & e)
{
std::cerr << e.what() << ", " << e.message() << std::endl;
return 1;
}
return 0;
}