DB: development.

This commit is contained in:
Alexey Milovidov 2010-03-01 16:59:51 +00:00
parent 301e578860
commit d1016346bf
29 changed files with 1536 additions and 0 deletions

View File

@ -0,0 +1,61 @@
#ifndef DBMS_COLUMN_TYPE_UINT64_H
#define DBMS_COLUMN_TYPE_UINT64_H
#include <DB/ColumnTypes/IColumnType.h>
namespace DB
{
/** Аналог BIGINT UNSIGNED, сериализуется в набор байт фиксированной длины */
class ColumnTypeUInt64 : public IColumnType
{
public:
std::string getName() const { return "UInt64"; }
void serializeBinary(const Field & field, std::ostream & ostr) const
{
Poco::BinaryWriter w(ostr);
w << boost::get<UInt64>(field);
}
void deserializeBinary(Field & field, std::istream & istr) const
{
Poco::BinaryReader r(istr);
UInt64 x;
r >> x;
field = x;
}
void serializeBinary(const Column & column, std::ostream & ostr) const
{
Poco::BinaryWriter w(ostr);
const UInt64Column & column = boost::get<UInt64Column>(column);
for (size_t i = 0, size = column.size(); i < size; ++i)
w << column[i];
}
void deserializeBinary(Column & column, std::istream & istr) const
{
Poco::BinaryReader r(istr);
UInt64Column & column = boost::get<UInt64Column>(column);
for (size_t i = 0, size = column.size(); i < size; ++i)
r >> column[i];
}
void serializeText(const Field & field, std::ostream & ostr) const
{
ostr << boost::get<UInt64>(field);
}
void deserializeText(Field & field, std::istream & istr) const
{
UInt64 x;
istr >> x;
field = x;
}
};
}
#endif

View File

@ -0,0 +1,75 @@
#ifndef DBMS_COLUMN_TYPES_ICOLUMNTYPE_H
#ifndef DBMS_COLUMN_TYPES_ICOLUMNTYPE_H
#include <ostream>
#include <DB/Core/Field.h>
#include <DB/Core/Column.h>
namespace DB
{
/** Стиль квотирования значения.
*/
namespace QuoteStyle
{
enum Enum
{
None = 0, /// Без квотирования и эскейпинга. Hапример, для чисел.
String, /// В '', с эскейпингом с помощью \.
Array, /// В [], список значений через запятую, каждое из которых квотируется по своему.
Tuple, /// В (), список значений через запятую, каждое из которых квотируется по своему.
};
}
/** Метаданные типа для хранения (столбца).
* Содержит методы для сериализации/десериализации.
*/
class IColumnType
{
public:
/// Основное имя типа (например, BIGINT UNSIGNED).
virtual std::string getName() const = 0;
/** Бинарная сериализация - для сохранения на диск / в сеть и т. п.
* Обратите внимание, что присутствует по два вида методов
* - для работы с единичными значениями и целыми столбцами.
*/
virtual void serializeBinary(const Field & field, std::ostream & ostr) const = 0;
virtual void deserializeBinary(Field & field, std::istream & istr) const = 0;
virtual void serializeBinary(const Column & column, std::ostream & ostr) const = 0;
virtual void deserializeBinary(Column & column, std::istream & istr) const = 0;
/** Текстовая сериализация - для вывода на экран / сохранения в текстовый файл и т. п.
* Без эскейпинга и квотирования.
*/
virtual void serializeText(const Field & field, std::ostream & ostr) const = 0;
virtual void deserializeText(Field & field, std::istream & istr) const = 0;
/** Получить стиль квотирования значений этого типа.
*/
virtual QuoteStyle::Enum getQuoteStyle() const = 0;
/** Текстовая сериализация с эскейпингом, но без квотирования.
* Есть реализация по умолчанию, которая подходит почти для всех случаев.
*/
virtual void serializeTextEscaped(const Field & field, std::ostream & ostr) const;
virtual void deserializeTextEscaped(Field & field, std::istream & istr) const;
/** Текстовая сериализация в виде литерала, который может быть вставлен в запрос.
* Если compatible = true, то значение типа "массив" и "кортеж" ещё дополнительно записывается в кавычки,
* чтобы текстовый дамп можно было загрузить в другую СУБД с этими значениями в виде строки.
* Есть реализация по умолчанию, которая подходит почти для всех случаев.
*/
virtual void serializeTextQuoted(const Field & field, std::ostream & ostr, bool compatible = false) const;
virtual void deserializeTextQuoted(Field & field, std::istream & istr, bool compatible = false) const;
virtual ~IColumnType() {}
};
}
#endif

View File

@ -0,0 +1,76 @@
#ifndef DBMS_COMMON_COMPRESSED_INPUT_STREAM_H
#define DBMS_COMMON_COMPRESSED_INPUT_STREAM_H
#include <istream>
#include <ostream>
#include <vector>
#include <Poco/BufferedStreamBuf.h>
#include <quicklz/quicklz_level1.h>
#include <DB/Common/CompressedStream.h>
namespace DB
{
/** Аналогично Poco::InflatingStreamBuf, но используется библиотека QuickLZ,
* а также поддерживается только istream.
*/
class DecompressingStreamBuf : public Poco::BufferedStreamBuf
{
public:
DecompressingStreamBuf(std::istream & istr);
/** прочитать целиком один сжатый блок данных;
*/
void getChunk(std::vector<char> & res);
protected:
int readFromDevice(char * buffer, std::streamsize length);
private:
size_t pos_in_buffer;
std::istream * p_istr;
std::vector<char> uncompressed_buffer;
std::vector<char> compressed_buffer;
std::vector<char> scratch;
/** Читает и разжимает следующий кусок сжатых данных. */
void readCompressedChunk();
};
/** Базовый класс для CompressedInputStream; содержит DecompressingStreamBuf
*/
class DecompressingIOS : public virtual std::ios
{
public:
DecompressingIOS(std::istream & istr);
DecompressingStreamBuf * rdbuf();
protected:
DecompressingStreamBuf buf;
};
/** Разжимает данные, сжатые с помощью алгоритма QuickLZ.
*/
class CompressedInputStream : public DecompressingIOS, public std::istream
{
public:
CompressedInputStream(std::istream & istr);
int close();
/** прочитать целиком один сжатый блок данных
*/
void getChunk(std::vector<char> & res);
};
}
#endif

View File

@ -0,0 +1,71 @@
#ifndef DBMS_COMMON_COMPRESSED_OUTPUT_STREAM_H
#define DBMS_COMMON_COMPRESSED_OUTPUT_STREAM_H
#include <istream>
#include <ostream>
#include <vector>
#include <Poco/BufferedStreamBuf.h>
#include <quicklz/quicklz_level1.h>
#include <DB/Common/CompressedStream.h>
namespace DB
{
/** Аналогично Poco::DeflatingStreamBuf, но используется библиотека QuickLZ,
* а также поддерживается только ostream.
*/
class CompressingStreamBuf : public Poco::BufferedStreamBuf
{
public:
CompressingStreamBuf(std::ostream & ostr);
~CompressingStreamBuf();
int close();
protected:
int writeToDevice(const char * buffer, std::streamsize length);
private:
size_t pos_in_buffer;
std::ostream * p_ostr;
std::vector<char> uncompressed_buffer;
std::vector<char> compressed_buffer;
std::vector<char> scratch;
/** Сжимает данные, находящиеся в буфере и записывает их. */
void writeCompressedChunk();
};
/** Базовый класс для CompressedOutputStream; содержит CompressingStreamBuf
*/
class CompressingIOS : public virtual std::ios
{
public:
CompressingIOS(std::ostream & ostr);
CompressingStreamBuf * rdbuf();
protected:
CompressingStreamBuf buf;
};
/** Сжимает всё с помощью алгоритма QuickLZ блоками не более DBMS_COMPRESSING_STREAM_BUFFER_SIZE.
* Для записи последнего блока, следует вызвать метод close().
*/
class CompressedOutputStream : public CompressingIOS, public std::ostream
{
public:
CompressedOutputStream(std::ostream & ostr);
int close();
};
}
#endif

View File

@ -0,0 +1,11 @@
#ifndef DBMS_COMMON_COMPRESSING_STREAM_DEFINES_H
#define DBMS_COMMON_COMPRESSING_STREAM_DEFINES_H
/** Общие для CompressingStream.h и DecompressingStream.h дефайны */
#define DBMS_STREAM_BUFFER_SIZE 4096
#define DBMS_COMPRESSING_STREAM_BUFFER_SIZE 1048576
#define QUICKLZ_ADDITIONAL_SPACE 400
#define QUICKLZ_HEADER_SIZE 9
#endif

View File

@ -0,0 +1,17 @@
#ifndef DBMS_COMMON_ESCAPE_MANIPULATORS_H
#define DBMS_COMMON_ESCAPE_MANIPULATORS_H
#include <strconvert/escape_manip.h>
#include <strconvert/unescape_manip.h>
namespace DB
{
typedef strconvert::escape_file escape;
typedef strconvert::unescape_file unescape;
}
#endif

View File

@ -0,0 +1,49 @@
#ifndef DBMS_COMMON_ESCAPING_OUTPUT_STREAM_H
#define DBMS_COMMON_ESCAPING_OUTPUT_STREAM_H
#include <ostream>
#include <Poco/UnbufferedStreamBuf.h>
namespace DB
{
/** Поток, который эскейпит всё, что в него пишут.
*/
class EscapingStreamBuf : public Poco::UnbufferedStreamBuf
{
public:
EscapingStreamBuf(std::ostream & ostr);
protected:
int writeToDevice(const char * buffer, std::streamsize length);
private:
std::ostream * p_ostr;
};
class EscapingIOS : public virtual std::ios
{
public:
EscapingIOS(std::ostream & ostr);
EscapingStreamBuf * rdbuf();
protected:
EscapingStreamBuf buf;
};
class EscapingOutputStream : public EscapingIOS, public std::ostream
{
public:
EscapingOutputStream(std::ostream & ostr);
};
}
#endif

View File

@ -0,0 +1,17 @@
#ifndef DBMS_COMMON_QUOTE_MANIPULATORS_H
#define DBMS_COMMON_QUOTE_MANIPULATORS_H
#include <strconvert/escape_manip.h>
#include <strconvert/unescape_manip.h>
namespace DB
{
typedef strconvert::quote_fast quote;
typedef strconvert::unquote_fast unquote;
}
#endif

View File

@ -0,0 +1,56 @@
#ifndef DBMS_COMMON_UNESCAPING_INPUT_STREAM_H
#define DBMS_COMMON_UNESCAPING_INPUT_STREAM_H
#include <istream>
#include <Poco/UnbufferedStreamBuf.h>
namespace DB
{
/** Поток, который unescape-ит всё, что из него читают.
*/
class UnescapingStreamBuf : public Poco::UnbufferedStreamBuf
{
public:
UnescapingStreamBuf(std::istream & istr);
protected:
int readFromDevice(char * buffer, std::streamsize length);
private:
std::istream * p_istr;
enum State
{
Normal = 0,
EscapeSequence
};
State state;
};
class UnescapingIOS : public virtual std::ios
{
public:
UnescapingIOS(std::istream & istr);
UnescapingStreamBuf * rdbuf();
protected:
UnescapingStreamBuf buf;
};
class UnescapingInputStream : public UnescapingIOS, public std::istream
{
public:
UnescapingInputStream(std::istream & istr);
};
}
#endif

View File

@ -0,0 +1,40 @@
#ifndef DB_VARINT_H
#define DB_VARINT_H
#include <DB/Core/Types.h>
namespace DB
{
/** Записать UInt64 в формате переменной длины (base128) */
void writeVarUInt(UInt64 x, std::ostream & ostr);
/** Прочитать UInt64, записанный в формате переменной длины (base128) */
void readVarUInt(UInt64 & x, std::istream & istr);
/** Получить длину UInt64 в формате VarUInt */
size_t getLengthOfVarUInt(UInt64 x);
/** Записать Int64 в формате переменной длины (base128) */
inline void writeVarInt(Int64 x, std::ostream & ostr)
{
writeVarUInt(static_cast<UInt64>((x << 1) ^ (x >> 63)), ostr);
}
/** Прочитать Int64, записанный в формате переменной длины (base128) */
inline void readVarInt(Int64 & x, std::istream & istr)
{
readVarUInt(*reinterpret_cast<UInt64*>(&x), istr);
x = (static_cast<UInt64>(x) >> 1) ^ -(x & 1);
}
}
#endif

View File

@ -0,0 +1,49 @@
#ifndef DBMS_CORE_BLOCK_H
#define DBMS_CORE_BLOCK_H
#include <vector>
#include <map>
#include <list>
#include <DB/Core/ColumnWithMetadata.h>
namespace DB
{
/** Тип данных для представления подмножества строк и столбцов в оперативке.
* Содержит также метаданные (типы) столбцов и их имена.
* Позволяет вставлять, удалять столбцы в любом порядке, менять порядок столбцов.
*/
class Block
{
public:
typedef std::list<ColumnWithNameAndType> Container_t;
typedef std::vector<Container_t::iterator> IndexByPosition_t;
typedef std::map<String, Container_t::iterator> IndexByName_t;
private:
Container_t columns;
IndexByPosition_t index_by_position;
IndexByName_t index_by_name;
void rebuildIndexByPosition();
public:
void insert(size_t position, const ColumnWithNameAndType & elem);
void erase(size_t position);
ColumnWithNameAndType & getByPosition(size_t position);
const ColumnWithNameAndType & getByPosition(size_t position) const;
ColumnWithNameAndType & getByName(const std::string & name);
const ColumnWithNameAndType & getByName(const std::string & name) const;
operator bool() { return !columns.empty(); }
operator!() { return columns.empty(); }
};
}
#endif

View File

@ -0,0 +1,51 @@
#ifndef DBMS_CORE_COLUMN_H
#define DBMS_CORE_COLUMN_H
#include <vector>
#include <boost/variant.hpp>
#include <boost/variant/recursive_variant.hpp>
#include <boost/variant/static_visitor.hpp>
#include <DB/Core/Types.h>
#include <DB/Core/Field.h>
namespace DB
{
/** Типы данных для представления столбцов значений в оперативке.
*/
typedef std::vector<UInt8> UInt8Column;
typedef std::vector<UInt16> UInt16Column;
typedef std::vector<UInt32> UInt32Column;
typedef std::vector<UInt64> UInt64Column;
typedef std::vector<Int8> Int8Column;
typedef std::vector<Int16> Int16Column;
typedef std::vector<Int32> Int32Column;
typedef std::vector<Int64> Int64Column;
typedef std::vector<Float32> Float32Column;
typedef std::vector<Float64> Float64Column;
typedef std::vector<String> StringColumn;
typedef std::vector<Field> VariantColumn; /// Столбец произвольных значений, а также nullable значений
typedef boost::make_recursive_variant<
UInt8Column, UInt16Column, UInt32Column, UInt64Column,
Int8Column, Int16Column, Int32Column, Int64Column,
Float32Column, Float64Column,
StringColumn,
VariantColumn, /// Variant, Nullable
std::vector<boost::recursive_variant_> /// Tuple, Array
>::type Column;
typedef std::vector<Column> TupleColumn; /// Столбец значений типа "кортеж" - несколько столбцов произвольного типа
typedef std::vector<Column> ArrayColumn; /// Столбец значений типа "массив" - столбец, значения в котором - массивы
}
#endif

View File

@ -0,0 +1,27 @@
#ifndef DBMS_CORE_COLUMN_WITH_NAME_AND_TYPE_H
#define DBMS_CORE_COLUMN_WITH_NAME_AND_TYPE_H
#include <Poco/SharedPtr.h>
#include <DB/Core/Column.h>
#include <DB/ColumnTypes/IColumnType.h>
namespace DB
{
using Poco::SharedPtr;
/** Тип данных для представления столбца вместе с его типом и именем в оперативке.
*/
struct ColumnWithNameAndType
{
SharedPtr<Column> column;
SharedPtr<IColumnType> type;
String name;
};
}
#endif

View File

@ -0,0 +1,18 @@
#ifndef DBMS_CORE_ERROR_CODES_H
#define DBMS_CORE_ERROR_CODES_H
namespace DB
{
namespace ErrorCodes
{
enum ErrorCodes
{
};
}
}
#endif

View File

@ -0,0 +1,16 @@
#ifndef DBMS_CORE_EXCEPTION_H
#define DBMS_CORE_EXCEPTION_H
#include <Poco/Exception.h>
namespace DB
{
/** Тип исключения, чтобы отличать его от других.
*/
POCO_DECLARE_EXCEPTION(Foundation_API, Exception, Poco::Exception);
}
#endif

View File

@ -0,0 +1,77 @@
#ifndef DBMS_CORE_FIELD_H
#define DBMS_CORE_FIELD_H
#include <vector>
#include <boost/variant.hpp>
#include <boost/variant/recursive_variant.hpp>
#include <boost/variant/static_visitor.hpp>
#include <DB/Core/Types.h>
namespace DB
{
/** Типы данных для представления единичного значения произвольного типа в оперативке.
* Внимание! Предпочтительно вместо единичных значений хранить кусочки столбцов. См. Column.h
*/
typedef boost::make_recursive_variant<
Null,
UInt64,
Int64,
Float64,
String,
std::vector<boost::recursive_variant_> /// Array, Tuple
>::type Field;
typedef std::vector<Field> Array; /// Значение типа "массив"
/// почему-то у boost::variant определены операторы < и ==, но не остальные операторы сравнения
inline bool operator!= (const Field & lhs, const Field & rhs) { return !(lhs == rhs); }
inline bool operator<= (const Field & lhs, const Field & rhs) { return lhs < rhs || lhs == rhs; }
inline bool operator>= (const Field & lhs, const Field & rhs) { return !(lhs < rhs); }
inline bool operator> (const Field & lhs, const Field & rhs) { return !(lhs < rhs) && !(lhs == rhs); }
/** Числовое значение конкретного типа Field */
namespace FieldType
{
enum Enum
{
Null = 0,
UInt64,
Int64,
Float64,
String,
Array
};
}
/** Возвращает true, если вариант - Null */
class FieldVisitorIsNull : public boost::static_visitor<bool>
{
public:
template <typename T> bool operator() (const T & x) const { return false; }
bool operator() (const Null & x) const { return true; }
};
/** Возвращает числовое значение типа */
class FieldVisitorGetType : public boost::static_visitor<FieldType::Enum>
{
public:
FieldType::Enum operator() (const Null & x) const { return FieldType::Null; }
FieldType::Enum operator() (const UInt64 & x) const { return FieldType::UInt64; }
FieldType::Enum operator() (const Int64 & x) const { return FieldType::Int64; }
FieldType::Enum operator() (const Float64 & x) const { return FieldType::Float64; }
FieldType::Enum operator() (const String & x) const { return FieldType::String; }
FieldType::Enum operator() (const Array & x) const { return FieldType::Array; }
};
}
#endif

View File

@ -0,0 +1,26 @@
#ifndef DBMS_CORE_NAME_AND_TYPE_H
#define DBMS_CORE_NAME_AND_TYPE_H
#include <Poco/SharedPtr.h>
#include <DB/Core/Column.h>
#include <DB/ColumnTypes/IColumnType.h>
namespace DB
{
using Poco::SharedPtr;
/** Имя столбца и тип столбца.
*/
struct NameAndType
{
SharedPtr<IColumnType> type;
String name;
};
}
#endif

View File

@ -0,0 +1,20 @@
#ifndef DBMS_CORE_ROW_H
#define DBMS_CORE_ROW_H
#include <vector>
#include <DB/Core/Field.h>
namespace DB
{
/** Тип данных для представления одной строки таблицы в оперативке.
* Внимание! Предпочтительно вместо единичных строк хранить блоки столбцов. См. Block.h
*/
typedef std::vector<Field> Row;
}
#endif

View File

@ -0,0 +1,35 @@
#ifndef DBMS_CORE_TYPES_H
#define DBMS_CORE_TYPES_H
#include <string>
#include <boost/none.hpp>
#include <Poco/Types.h>
#include <Poco/SharedPtr.h>
namespace DB
{
/** Типы данных для представления значений из БД в оперативке.
*/
typedef boost::none_t Null;
typedef Poco::UInt8 UInt8;
typedef Poco::UInt16 UInt16;
typedef Poco::UInt32 UInt32;
typedef Poco::UInt64 UInt64;
typedef Poco::Int8 Int8;
typedef Poco::Int16 Int16;
typedef Poco::Int32 Int32;
typedef Poco::Int64 Int64;
typedef float Float32;
typedef double Float64;
typedef std::string String;
}
#endif

View File

@ -0,0 +1,33 @@
#ifndef DBMS_DATA_STREAMS_IBLOCKINPUTSTREAM_H
#ifndef DBMS_DATA_STREAMS_IBLOCKINPUTSTREAM_H
#include <ostream>
#include <Poco/SharedPtr.h>
#include <DB/Core/Block.h>
namespace DB
{
using Poco::SharedPtr;
/** Интерфейс потока для чтения данных по блокам из БД.
* Реляционные операции предполагается делать также реализациями этого интерфейса.
*/
class IBlockInputStream
{
public:
/** Прочитать следующий блок.
* Если блоков больше нет - вернуть пустой блок (для которого operator bool возвращает false).
*/
virtual Block read() = 0;
virtual ~IBlockInputStream() {}
};
}
#endif

View File

@ -0,0 +1,48 @@
#ifndef DBMS_STORAGES_ISTORAGE_H
#define DBMS_STORAGES_ISTORAGE_H
#include <Poco/SharedPtr.h>
namespace DB
{
using Poco::SharedPtr;
/** Хранилище. Отвечает за:
* - хранение данных таблицы;
* - определение, в каком файле (или не файле) хранятся данные;
* - поиск данных и обновление данных;
* - структура хранения данных (сжатие, etc.)
* - конкуррентный доступ к данным (блокировки, etc.)
*/
class IStorage
{
private:
/** Установить указатель на таблицу и кол-группу.
* - часть инициализации, которая выполняется при инициализации таблицы.
* (инициализация хранилища выполняется в два шага:
* 1 - конструктор,
* 2 - добавление к таблице (выполняется в конструкторе Table))
*/
virtual void addToTable(Table * table_, ColumnGroup * column_group_) = 0;
public:
/** Прочитать данные, соответствующие точному значению ключа или префиксу.
* Возвращает объект, с помощью которого можно последовательно читать данные.
*/
virtual Poco::SharedPtr<ITablePartReader> read(const Row & key) = 0;
/** Записать пачку данных в таблицу, обновляя существующие данные, если они есть.
* @param data - набор данных вида ключ (набор столбцов) -> значение (набор столбцов)
* @param mask - битовая маска - какие столбцы входят в кол-группу,
* которую хранит это хранилище
*/
virtual void merge(const AggregatedRowSet & data, const ColumnMask & mask) = 0;
virtual ~IStorage() {}
};
}
#endif

View File

@ -0,0 +1,30 @@
#include <boost/variant/apply_visitor.hpp>
#include <DB/Common/EscapeManipulators.h>
#include <DB/Common/QuoteManipulators.h>
#include <DB/ColumnTypes/IColumnType.h>
namespace DB
{
virtual void serializeTextEscaped(const Field & field, std::ostream & ostr) const
{
FieldVisitorIsNull visitor;
if (boost::apply_visitor(visitor, field))
ostr << "\\N";
else
ostr <<
}
virtual void deserializeTextEscaped(Field & field, std::istream & istr) const;
virtual void serializeTextQuoted(const Field & field, std::ostream & ostr, bool compatible = false) const;
virtual void deserializeTextQuoted(Field & field, std::istream & istr, bool compatible = false) const;
};
}
#endif

View File

@ -0,0 +1,115 @@
#include <algorithm>
#include <DB/Common/CompressedInputStream.h>
namespace DB
{
DecompressingStreamBuf::DecompressingStreamBuf(std::istream & istr)
: Poco::BufferedStreamBuf(DBMS_STREAM_BUFFER_SIZE, std::ios::in),
pos_in_buffer(0),
p_istr(&istr),
compressed_buffer(QUICKLZ_HEADER_SIZE),
scratch(QLZ_SCRATCH_DECOMPRESS)
{
}
void DecompressingStreamBuf::getChunk(std::vector<char> & res)
{
readCompressedChunk();
pos_in_buffer = uncompressed_buffer.size();
res.resize(pos_in_buffer);
memcpy(&res[0], &uncompressed_buffer[0], pos_in_buffer);
}
void DecompressingStreamBuf::readCompressedChunk()
{
/// прочитаем заголовок
p_istr->read(&compressed_buffer[0], QUICKLZ_HEADER_SIZE);
if (!p_istr->good())
return;
size_t size_compressed = qlz_size_compressed(&compressed_buffer[0]);
size_t size_decompressed = qlz_size_decompressed(&compressed_buffer[0]);
compressed_buffer.resize(size_compressed);
uncompressed_buffer.resize(size_decompressed);
/// считаем остаток сжатого блока
p_istr->read(&compressed_buffer[QUICKLZ_HEADER_SIZE], size_compressed - QUICKLZ_HEADER_SIZE);
if (!p_istr->good())
return;
/// разжимаем блок
qlz_decompress(&compressed_buffer[0], &uncompressed_buffer[0], &scratch[0]);
}
int DecompressingStreamBuf::readFromDevice(char * buffer, std::streamsize length)
{
if (length == 0 || !p_istr)
return 0;
size_t bytes_processed = 0;
while (bytes_processed < static_cast<size_t>(length))
{
if (pos_in_buffer == uncompressed_buffer.size())
{
readCompressedChunk();
pos_in_buffer = 0;
if (!p_istr->good())
{
p_istr = 0;
return bytes_processed;
}
}
size_t bytes_to_copy = std::min(
uncompressed_buffer.size() - pos_in_buffer,
static_cast<size_t>(length) - bytes_processed);
memcpy(buffer + bytes_processed, &uncompressed_buffer[pos_in_buffer], bytes_to_copy);
pos_in_buffer += bytes_to_copy;
bytes_processed += bytes_to_copy;
}
return static_cast<int>(length);
}
DecompressingIOS::DecompressingIOS(std::istream & istr)
: buf(istr)
{
poco_ios_init(&buf);
}
DecompressingStreamBuf * DecompressingIOS::rdbuf()
{
return &buf;
}
CompressedInputStream::CompressedInputStream(std::istream & istr)
: DecompressingIOS(istr),
std::istream(&buf)
{
}
void CompressedInputStream::getChunk(std::vector<char> & res)
{
buf.getChunk(res);
}
}

View File

@ -0,0 +1,107 @@
#include <algorithm>
#include <DB/Common/CompressedOutputStream.h>
namespace DB
{
CompressingStreamBuf::CompressingStreamBuf(std::ostream & ostr)
: Poco::BufferedStreamBuf(DBMS_STREAM_BUFFER_SIZE, std::ios::out),
pos_in_buffer(0),
p_ostr(&ostr),
uncompressed_buffer(DBMS_COMPRESSING_STREAM_BUFFER_SIZE),
compressed_buffer(DBMS_COMPRESSING_STREAM_BUFFER_SIZE + QUICKLZ_ADDITIONAL_SPACE),
scratch(QLZ_SCRATCH_COMPRESS)
{
}
CompressingStreamBuf::~CompressingStreamBuf()
{
close();
}
void CompressingStreamBuf::writeCompressedChunk()
{
size_t compressed_size = qlz_compress(
&uncompressed_buffer[0],
&compressed_buffer[0],
pos_in_buffer,
&scratch[0]);
p_ostr->write(&compressed_buffer[0], compressed_size);
pos_in_buffer = 0;
}
int CompressingStreamBuf::close()
{
sync();
if (pos_in_buffer != 0)
writeCompressedChunk();
return 0;
}
int CompressingStreamBuf::writeToDevice(const char * buffer, std::streamsize length)
{
if (length == 0 || !p_ostr)
return 0;
size_t bytes_processed = 0;
while (bytes_processed < static_cast<size_t>(length))
{
size_t bytes_to_copy = std::min(
uncompressed_buffer.size() - pos_in_buffer,
static_cast<size_t>(length) - bytes_processed);
memcpy(&uncompressed_buffer[pos_in_buffer], buffer + bytes_processed, bytes_to_copy);
pos_in_buffer += bytes_to_copy;
bytes_processed += bytes_to_copy;
if (pos_in_buffer == uncompressed_buffer.size())
writeCompressedChunk();
if (!p_ostr->good())
{
p_ostr = 0;
return bytes_processed;
}
}
return static_cast<int>(length);
}
CompressingIOS::CompressingIOS(std::ostream & ostr)
: buf(ostr)
{
poco_ios_init(&buf);
}
CompressingStreamBuf * CompressingIOS::rdbuf()
{
return &buf;
}
CompressedOutputStream::CompressedOutputStream(std::ostream & ostr)
: CompressingIOS(ostr),
std::ostream(&buf)
{
}
int CompressedOutputStream::close()
{
return buf.close();
}
}

View File

@ -0,0 +1,84 @@
#include <DB/Common/EscapingOutputStream.h>
#define ESCAPING_STREAM_BUFFER_SIZE 4096
namespace DB
{
EscapingStreamBuf::EscapingStreamBuf(std::ostream & ostr)
: p_ostr(&ostr)
{
}
int EscapingStreamBuf::writeToDevice(const char * buffer, std::streamsize length)
{
if (length == 0 || !p_ostr)
return 0;
for (std::streamsize pos = 0; pos < length; ++pos)
{
char c = buffer[pos];
switch(c)
{
case '\\':
p_ostr->write("\\\\", 2);
break;
case '\b':
p_ostr->write("\\b", 2);
break;
case '\f':
p_ostr->write("\\f", 2);
break;
case '\n':
p_ostr->write("\\n", 2);
break;
case '\r':
p_ostr->write("\\r", 2);
break;
case '\t':
p_ostr->write("\\t", 2);
break;
case '\'':
p_ostr->write("\\'", 2);
break;
case '"':
p_ostr->write("\\\"", 2);
break;
default:
p_ostr->put(c);
}
}
if (!p_ostr->good())
{
p_ostr = 0;
return -1;
}
return static_cast<int>(length);
}
EscapingIOS::EscapingIOS(std::ostream & ostr)
: buf(ostr)
{
poco_ios_init(&buf);
}
EscapingStreamBuf * EscapingIOS::rdbuf()
{
return &buf;
}
EscapingOutputStream::EscapingOutputStream(std::ostream & ostr)
: EscapingIOS(ostr),
std::ostream(&buf)
{
}
}

View File

@ -0,0 +1,102 @@
#include <DB/Common/UnescapingInputStream.h>
namespace DB
{
UnescapingStreamBuf::UnescapingStreamBuf(std::istream & istr)
: p_istr(&istr), state(Normal)
{
}
int UnescapingStreamBuf::readFromDevice(char * buffer, std::streamsize length)
{
if (length == 0 || !p_istr)
return 0;
for (std::streamsize pos = 0; pos < length;)
{
char c = p_istr->get();
if (!p_istr->good())
{
p_istr = 0;
return pos;
}
switch (state)
{
case EscapeSequence:
switch (c)
{
case '\\':
buffer[pos] = '\\';
break;
case 'b':
buffer[pos] = '\b';
break;
case 'f':
buffer[pos] = '\f';
break;
case 'n':
buffer[pos] = '\n';
break;
case 'r':
buffer[pos] = '\r';
break;
case 't':
buffer[pos] = '\t';
break;
case '\'':
buffer[pos] = '\'';
break;
case '"':
buffer[pos] = '"';
break;
default:
buffer[pos] = c;
}
++pos;
state = Normal;
break;
case Normal:
default:
switch (c)
{
case '\\':
state = EscapeSequence;
break;
default:
buffer[pos] = c;
++pos;
}
}
}
return static_cast<int>(length);
}
UnescapingIOS::UnescapingIOS(std::istream & istr)
: buf(istr)
{
poco_ios_init(&buf);
}
UnescapingStreamBuf * UnescapingIOS::rdbuf()
{
return &buf;
}
UnescapingInputStream::UnescapingInputStream(std::istream & istr)
: UnescapingIOS(istr),
std::istream(&buf)
{
}
}

157
dbms/src/Common/VarInt.cpp Normal file
View File

@ -0,0 +1,157 @@
#include <istream>
#include <ostream>
#include <DB/Common/VarInt.h>
#include <Poco/Types.h>
namespace DB
{
void writeVarUInt(UInt64 x, std::ostream & ostr)
{
char buf[9];
buf[0] = static_cast<Poco::UInt8>(x | 0x80);
if (x >= (1ULL << 7))
{
buf[1] = static_cast<Poco::UInt8>((x >> 7) | 0x80);
if (x >= (1ULL << 14))
{
buf[2] = static_cast<Poco::UInt8>((x >> 14) | 0x80);
if (x >= (1ULL << 21))
{
buf[3] = static_cast<Poco::UInt8>((x >> 21) | 0x80);
if (x >= (1ULL << 28))
{
buf[4] = static_cast<Poco::UInt8>((x >> 28) | 0x80);
if (x >= (1ULL << 35))
{
buf[5] = static_cast<Poco::UInt8>((x >> 35) | 0x80);
if (x >= (1ULL << 42))
{
buf[6] = static_cast<Poco::UInt8>((x >> 42) | 0x80);
if (x >= (1ULL << 49))
{
buf[7] = static_cast<Poco::UInt8>((x >> 49) | 0x80);
if (x >= (1ULL << 56))
{
buf[8] = static_cast<Poco::UInt8>((x >> 56) | 0x80);
ostr.write(buf, 9);
}
else
{
buf[7] &= 0x7F;
ostr.write(buf, 8);
}
}
else
{
buf[6] &= 0x7F;
ostr.write(buf, 7);
}
}
else
{
buf[5] &= 0x7F;
ostr.write(buf, 6);
}
}
else
{
buf[4] &= 0x7F;
ostr.write(buf, 5);
}
}
else
{
buf[3] &= 0x7F;
ostr.write(buf, 4);
}
}
else
{
buf[2] &= 0x7F;
ostr.write(buf, 3);
}
}
else
{
buf[1] &= 0x7F;
ostr.write(buf, 2);
}
}
else
{
buf[0] &= 0x7F;
ostr.write(buf, 1);
}
}
void readVarUInt(UInt64 & x, std::istream & istr)
{
int byte;
byte = istr.get();
x = static_cast<Poco::UInt64>(byte) & 0x7F;
if (byte & 0x80)
{
byte = istr.get();
x |= (static_cast<Poco::UInt64>(byte) & 0x7F) << 7;
if (byte & 0x80)
{
byte = istr.get();
x |= (static_cast<Poco::UInt64>(byte) & 0x7F) << 14;
if (byte & 0x80)
{
byte = istr.get();
x |= (static_cast<Poco::UInt64>(byte) & 0x7F) << 21;
if (byte & 0x80)
{
byte = istr.get();
x |= (static_cast<Poco::UInt64>(byte) & 0x7F) << 28;
if (byte & 0x80)
{
byte = istr.get();
x |= (static_cast<Poco::UInt64>(byte) & 0x7F) << 35;
if (byte & 0x80)
{
byte = istr.get();
x |= (static_cast<Poco::UInt64>(byte) & 0x7F) << 42;
if (byte & 0x80)
{
byte = istr.get();
x |= (static_cast<Poco::UInt64>(byte) & 0x7F) << 49;
if (byte & 0x80)
{
byte = istr.get();
x |= (static_cast<Poco::UInt64>(byte) & 0x7F) << 56;
}
}
}
}
}
}
}
}
}
size_t getLengthOfVarUInt(UInt64 x)
{
return x < (1ULL << 7) ? 1
: (x < (1ULL << 14) ? 2
: (x < (1ULL << 21) ? 3
: (x < (1ULL << 28) ? 4
: (x < (1ULL << 35) ? 5
: (x < (1ULL << 42) ? 6
: (x < (1ULL << 49) ? 7
: (x < (1ULL << 56) ? 8
: 9)))))));
}
}

58
dbms/src/Core/Block.cpp Normal file
View File

@ -0,0 +1,58 @@
#include <DB/Core/Block.h>
namespace DB
{
void Block::rebuildIndexByPosition()
{
index_by_position.resize(columns.size());
size_t pos = 0;
for (Container_t::iterator it = columns.begin(); it != columns.end(); ++it, ++pos)
index_by_position[pos] = it;
}
void Block::insert(size_t position, const ColumnWithNameAndType & elem)
{
Container_t::iterator it = columns.insert(index_by_position[position], elem);
rebuildIndexByPosition();
index_by_name[elem.name] = it;
}
void Block::erase(size_t position)
{
Container_t::iterator it = index_by_position[position];
index_by_name.erase(index_by_name.find(it->name));
columns.erase(it);
rebuildIndexByPosition();
}
Block::ColumnWithNameAndType & Block::getByPosition(size_t position)
{
return *index_by_position[position];
}
const Block::ColumnWithNameAndType & Block::getByPosition(size_t position) const
{
return *index_by_position[position];
}
Block::ColumnWithNameAndType & Block::getByName(const std::string & name)
{
return *index_by_name[name];
}
const Block::ColumnWithNameAndType & Block::getByName(const std::string & name) const
{
return *index_by_name[name];
}
}
#endif

View File

@ -0,0 +1,10 @@
#include <typeinfo>
#include <DB/Core/Exception.h>
namespace DB
{
POCO_IMPLEMENT_EXCEPTION(Exception, Poco::Exception, "DB::Exception");
}