propagated [de]serialization to storages and (in|out)put streams [#CONV-7967]

This commit is contained in:
Vyacheslav Alipov 2013-07-12 13:35:05 +00:00
parent 73bc183ffc
commit 090aae8e7b
10 changed files with 308 additions and 8 deletions

View File

@ -70,7 +70,7 @@ public:
throw Exception("Method getDefault is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
const NamesAndTypesListPtr & getNestedTypes() const { return nested; }
const NamesAndTypesListPtr & getNestedTypesList() const { return nested; }
const DataTypePtr & getOffsetsType() const { return offsets; }
};

View File

@ -302,6 +302,19 @@ private:
addStream(name, *type_arr->getNestedType(), mark_number, level + 1);
}
else if (const DataTypeNested * type_nested = dynamic_cast<const DataTypeNested *>(&type))
{
String size_name = name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
String escaped_size_name = escaped_column_name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
streams.insert(std::make_pair(size_name, new Stream(
path + escaped_size_name,
mark_number)));
const NamesAndTypesList & columns = *type_nested->getNestedTypesList();
for (NamesAndTypesList::const_iterator it = columns.begin(); it != columns.end(); ++it)
addStream(name + "." + it->first, *it->second, mark_number, level + 1);
}
else
streams.insert(std::make_pair(name, new Stream(
path + escaped_column_name,
@ -326,6 +339,29 @@ private:
dynamic_cast<const ColumnArray &>(column).getOffsets()[column.size() - 1],
level + 1);
}
else if (const DataTypeNested * type_nested = dynamic_cast<const DataTypeNested *>(&type))
{
type_nested->deserializeOffsets(
column,
streams[name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level)]->compressed,
max_rows_to_read);
if (column.size())
{
ColumnNested & column_nested = dynamic_cast<ColumnNested &>(column);
NamesAndTypesList::const_iterator it = type_nested->getNestedTypesList()->begin();
for (size_t i = 0; i < column_nested.getData().size(); ++i, ++it)
{
readData(
name + "." + it->first,
*it->second,
*column_nested.getData()[i],
column_nested.getOffsets()[column.size() - 1],
level + 1);
}
}
}
else
type.deserializeBinary(column, streams[name]->compressed, max_rows_to_read);
}

View File

@ -199,6 +199,25 @@ private:
prev_mark += storage.index_granularity;
}
}
if (const DataTypeNested * type_nested = dynamic_cast<const DataTypeNested *>(&type))
{
String size_name = escaped_column_name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
WriteBufferFromFile plain(path + size_name + ".bin", DBMS_DEFAULT_BUFFER_SIZE, flags);
WriteBufferFromFile marks(path + size_name + ".mrk", 4096, flags);
CompressedWriteBuffer compressed(plain);
size_t prev_mark = 0;
while (prev_mark < size)
{
/// Каждая засечка - это: (смещение в файле до начала сжатого блока, смещение внутри блока)
writeIntBinary(plain.count(), marks);
writeIntBinary(compressed.offset(), marks);
type_nested->serializeOffsets(column, compressed, prev_mark, storage.index_granularity);
prev_mark += storage.index_granularity;
}
}
{
WriteBufferFromFile plain(path + escaped_column_name + ".bin", DBMS_DEFAULT_BUFFER_SIZE, flags);

View File

@ -131,6 +131,19 @@ private:
addStream(name, *type_arr->getNestedType(), level + 1);
}
else if (const DataTypeNested * type_nested = dynamic_cast<const DataTypeNested *>(&type))
{
String size_name = name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
String escaped_size_name = escaped_column_name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
column_streams[size_name] = new ColumnStream(
part_tmp_path + escaped_size_name + ".bin",
part_tmp_path + escaped_size_name + ".mrk");
const NamesAndTypesList & columns = *type_nested->getNestedTypesList();
for (NamesAndTypesList::const_iterator it = columns.begin(); it != columns.end(); ++it)
addStream(name + "." + it->first, *it->second, level + 1);
}
else
column_streams[name] = new ColumnStream(
part_tmp_path + escaped_column_name + ".bin",
@ -171,6 +184,33 @@ private:
prev_mark += limit;
}
}
if (const DataTypeNested * type_nested = dynamic_cast<const DataTypeNested *>(&type))
{
String size_name = name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
ColumnStream & stream = *column_streams[size_name];
size_t prev_mark = 0;
while (prev_mark < size)
{
size_t limit = 0;
/// Если есть index_offset, то первая засечка идёт не сразу, а после этого количества строк.
if (prev_mark == 0 && index_offset != 0)
{
limit = index_offset;
}
else
{
limit = storage.index_granularity;
writeIntBinary(stream.plain.count(), stream.marks);
writeIntBinary(stream.compressed.offset(), stream.marks);
}
type_nested->serializeOffsets(column, stream.compressed, prev_mark, limit);
prev_mark += limit;
}
}
{
ColumnStream & stream = *column_streams[name];

View File

@ -4,8 +4,10 @@
#include <DB/IO/VarInt.h>
#include <DB/Columns/ColumnArray.h>
#include <DB/Columns/ColumnNested.h>
#include <DB/DataTypes/DataTypeArray.h>
#include <DB/DataTypes/DataTypeNested.h>
#include <DB/DataStreams/NativeBlockInputStream.h>
@ -33,6 +35,28 @@ static void readData(const IDataType & type, IColumn & column, ReadBuffer & istr
istr,
dynamic_cast<const ColumnArray &>(column).getOffsets()[rows - 1]);
}
else if (const DataTypeNested * type_nested = dynamic_cast<const DataTypeNested *>(&type))
{
ColumnNested & column_nested = dynamic_cast<ColumnNested &>(column);
IColumn & offsets_column = *column_nested.getOffsetsColumn();
type_nested->getOffsetsType()->deserializeBinary(offsets_column, istr, rows);
if (offsets_column.size() != rows)
throw Exception("Cannot read all data in NativeBlockInputStream.", ErrorCodes::CANNOT_READ_ALL_DATA);
if (rows)
{
NamesAndTypesList::const_iterator it = type_nested->getNestedTypesList()->begin();
for (size_t i = 0; i < column_nested.getData().size(); ++i, ++it)
{
readData(
*it->second,
*column_nested.getData()[i],
istr,
column_nested.getOffsets()[rows - 1]);
}
}
}
else
type.deserializeBinary(column, istr, rows);

View File

@ -5,8 +5,10 @@
#include <DB/Columns/ColumnConst.h>
#include <DB/Columns/ColumnArray.h>
#include <DB/Columns/ColumnNested.h>
#include <DB/DataTypes/DataTypeArray.h>
#include <DB/DataTypes/DataTypeNested.h>
#include <DB/DataStreams/NativeBlockOutputStream.h>
@ -26,6 +28,20 @@ static void writeData(const IDataType & type, const IColumn & column, WriteBuffe
if (!dynamic_cast<const ColumnArray &>(column).getData().empty())
writeData(*type_arr->getNestedType(), dynamic_cast<const ColumnArray &>(column).getData(), ostr);
}
else if (const DataTypeNested * type_nested = dynamic_cast<const DataTypeNested *>(&type))
{
const ColumnNested & column_nested = dynamic_cast<const ColumnNested &>(column);
type_nested->getOffsetsType()->serializeBinary(*column_nested.getOffsetsColumn(), ostr);
NamesAndTypesList::const_iterator it = type_nested->getNestedTypesList()->begin();
for (size_t i = 0; i < column_nested.getData().size(); ++i, ++it)
{
if (column_nested.getData()[i]->empty())
break;
writeData(*it->second, *column_nested.getData()[i], ostr);
}
}
else
type.serializeBinary(column, ostr);
}

View File

@ -47,13 +47,13 @@ std::string DataTypeNested::getName() const
void DataTypeNested::serializeBinary(const Field & field, WriteBuffer & ostr) const
{
throw Exception("Method serializeBinary(const Field & field, WriteBuffer & ostr) is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED);
throw Exception("Method serializeBinary(const Field &, WriteBuffer &) is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
void DataTypeNested::deserializeBinary(Field & field, ReadBuffer & istr) const
{
throw Exception("Method deserializeBinary(Field & field, ReadBuffer & istr) is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED);
throw Exception("Method deserializeBinary(Field &, ReadBuffer &) is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}

View File

@ -10,8 +10,10 @@
#include <DB/IO/WriteHelpers.h>
#include <DB/DataTypes/DataTypeArray.h>
#include <DB/DataTypes/DataTypeNested.h>
#include <DB/Columns/ColumnArray.h>
#include <DB/Columns/ColumnNested.h>
#include <DB/Storages/StorageLog.h>
@ -94,6 +96,19 @@ void LogBlockInputStream::addStream(const String & name, const IDataType & type,
addStream(name, *type_arr->getNestedType(), level + 1);
}
else if (const DataTypeNested * type_nested = dynamic_cast<const DataTypeNested *>(&type))
{
String size_name = name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
streams.insert(std::make_pair(size_name, new Stream(
storage.files[size_name].data_file.path(),
mark_number
? storage.files[size_name].marks[mark_number].offset
: 0)));
const NamesAndTypesList & columns = *type_nested->getNestedTypesList();
for (NamesAndTypesList::const_iterator it = columns.begin(); it != columns.end(); ++it)
addStream(name + "." + it->first, *it->second, level + 1);
}
else
streams.insert(std::make_pair(name, new Stream(
storage.files[name].data_file.path(),
@ -121,6 +136,29 @@ void LogBlockInputStream::readData(const String & name, const IDataType & type,
dynamic_cast<const ColumnArray &>(column).getOffsets()[column.size() - 1],
level + 1);
}
else if (const DataTypeNested * type_nested = dynamic_cast<const DataTypeNested *>(&type))
{
type_nested->deserializeOffsets(
column,
streams[name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level)]->compressed,
max_rows_to_read);
if (column.size())
{
ColumnNested & column_nested = dynamic_cast<ColumnNested &>(column);
NamesAndTypesList::const_iterator it = type_nested->getNestedTypesList()->begin();
for (size_t i = 0; i < column_nested.getData().size(); ++i, ++it)
{
readData(
name + "." + it->first,
*it->second,
*column_nested.getData()[i],
column_nested.getOffsets()[column.size() - 1],
level + 1);
}
}
}
else
type.deserializeBinary(column, streams[name]->compressed, max_rows_to_read);
}
@ -161,6 +199,16 @@ void LogBlockOutputStream::addStream(const String & name, const IDataType & type
addStream(name, *type_arr->getNestedType(), level + 1);
}
else if (const DataTypeNested * type_nested = dynamic_cast<const DataTypeNested *>(&type))
{
String size_name = name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
streams.insert(std::make_pair(size_name, new Stream(
storage.files[size_name].data_file.path())));
const NamesAndTypesList & columns = *type_nested->getNestedTypesList();
for (NamesAndTypesList::const_iterator it = columns.begin(); it != columns.end(); ++it)
addStream(name + "." + it->first, *it->second, level + 1);
}
else
streams.insert(std::make_pair(name, new Stream(
storage.files[name].data_file.path())));
@ -185,6 +233,32 @@ void LogBlockOutputStream::writeData(const String & name, const IDataType & type
writeData(name, *type_arr->getNestedType(), dynamic_cast<const ColumnArray &>(column).getData(), out_marks, level + 1);
}
else if (const DataTypeNested * type_nested = dynamic_cast<const DataTypeNested *>(&type))
{
String size_name = name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
Mark mark;
mark.rows = (storage.files[size_name].marks.empty() ? 0 : storage.files[size_name].marks.back().rows) + column.size();
mark.offset = streams[size_name]->plain_offset + streams[size_name]->plain.count();
out_marks.push_back(std::make_pair(storage.files[size_name].column_index, mark));
type_nested->serializeOffsets(column, streams[size_name]->compressed);
streams[size_name]->compressed.next();
const ColumnNested & column_nested = dynamic_cast<const ColumnNested &>(column);
NamesAndTypesList::const_iterator it = type_nested->getNestedTypesList()->begin();
for (size_t i = 0; i < column_nested.getData().size(); ++i, ++it)
{
writeData(
name + "." + it->first,
*it->second,
*column_nested.getData()[i],
out_marks,
level + 1);
}
}
else
{
Mark mark;
@ -249,7 +323,7 @@ StoragePtr StorageLog::create(const std::string & path_, const std::string & nam
void StorageLog::addFile(const String & column_name, const IDataType & type, size_t level)
{
if (files.end() != files.find(column_name))
throw Exception("Duplicate column with name " + column_name + " in constructor of StorageTinyLog.",
throw Exception("Duplicate column with name " + column_name + " in constructor of StorageLog.",
ErrorCodes::DUPLICATE_COLUMN);
if (const DataTypeArray * type_arr = dynamic_cast<const DataTypeArray *>(&type))
@ -265,6 +339,21 @@ void StorageLog::addFile(const String & column_name, const IDataType & type, siz
addFile(column_name, *type_arr->getNestedType(), level + 1);
}
else if (const DataTypeNested * type_nested = dynamic_cast<const DataTypeNested *>(&type))
{
String size_column_suffix = ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
ColumnData & column_data = files.insert(std::make_pair(column_name + size_column_suffix, ColumnData())).first->second;
column_data.column_index = column_names.size();
column_data.data_file = Poco::File(
path + escapeForFileName(name) + '/' + escapeForFileName(column_name) + size_column_suffix + DBMS_STORAGE_LOG_DATA_FILE_EXTENSION);
column_names.push_back(column_name + size_column_suffix);
const NamesAndTypesList & columns = *type_nested->getNestedTypesList();
for (NamesAndTypesList::const_iterator it = columns.begin(); it != columns.end(); ++it)
addFile(column_name + "." + it->first, *it->second, level + 1);
}
else
{
ColumnData & column_data = files.insert(std::make_pair(column_name, ColumnData())).first->second;

View File

@ -17,9 +17,11 @@
#include <DB/Columns/ColumnsNumber.h>
#include <DB/Columns/ColumnArray.h>
#include <DB/Columns/ColumnNested.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/DataTypes/DataTypeArray.h>
#include <DB/DataTypes/DataTypeNested.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/DataStreams/MergingSortedBlockInputStream.h>

View File

@ -10,8 +10,10 @@
#include <DB/IO/WriteHelpers.h>
#include <DB/DataTypes/DataTypeArray.h>
#include <DB/DataTypes/DataTypeNested.h>
#include <DB/Columns/ColumnArray.h>
#include <DB/Columns/ColumnNested.h>
#include <DB/Storages/StorageTinyLog.h>
@ -83,6 +85,15 @@ void TinyLogBlockInputStream::addStream(const String & name, const IDataType & t
addStream(name, *type_arr->getNestedType(), level + 1);
}
else if (const DataTypeNested * type_nested = dynamic_cast<const DataTypeNested *>(&type))
{
String size_name = name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
streams.insert(std::make_pair(size_name, new Stream(storage.files[size_name].data_file.path())));
const NamesAndTypesList & columns = *type_nested->getNestedTypesList();
for (NamesAndTypesList::const_iterator it = columns.begin(); it != columns.end(); ++it)
addStream(name + "." + it->first, *it->second, level + 1);
}
else
streams.insert(std::make_pair(name, new Stream(storage.files[name].data_file.path())));
}
@ -108,6 +119,29 @@ void TinyLogBlockInputStream::readData(const String & name, const IDataType & ty
throw Exception("Cannot read array data for all offsets", ErrorCodes::CANNOT_READ_ALL_DATA);
}
}
else if (const DataTypeNested * type_nested = dynamic_cast<const DataTypeNested *>(&type))
{
type_nested->deserializeOffsets(
column,
streams[name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level)]->compressed,
limit);
if (column.size())
{
ColumnNested & column_nested = dynamic_cast<ColumnNested &>(column);
NamesAndTypesList::const_iterator it = type_nested->getNestedTypesList()->begin();
for (size_t i = 0; i < column_nested.getData().size(); ++i, ++it)
{
readData(
name + "." + it->first,
*it->second,
*column_nested.getData()[i],
column_nested.getOffsets()[column.size() - 1],
level + 1);
}
}
}
else
type.deserializeBinary(column, streams[name]->compressed, limit);
}
@ -131,6 +165,15 @@ void TinyLogBlockOutputStream::addStream(const String & name, const IDataType &
addStream(name, *type_arr->getNestedType(), level + 1);
}
else if (const DataTypeNested * type_nested = dynamic_cast<const DataTypeNested *>(&type))
{
String size_name = name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
streams.insert(std::make_pair(size_name, new Stream(storage.files[size_name].data_file.path())));
const NamesAndTypesList & columns = *type_nested->getNestedTypesList();
for (NamesAndTypesList::const_iterator it = columns.begin(); it != columns.end(); ++it)
addStream(name + "." + it->first, *it->second, level + 1);
}
else
streams.insert(std::make_pair(name, new Stream(storage.files[name].data_file.path())));
}
@ -147,6 +190,24 @@ void TinyLogBlockOutputStream::writeData(const String & name, const IDataType &
writeData(name, *type_arr->getNestedType(), dynamic_cast<const ColumnArray &>(column).getData(), level + 1);
}
else if (const DataTypeNested * type_nested = dynamic_cast<const DataTypeNested *>(&type))
{
String size_name = name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
type_nested->serializeOffsets(column, streams[size_name]->compressed);
const ColumnNested & column_nested = dynamic_cast<const ColumnNested &>(column);
NamesAndTypesList::const_iterator it = type_nested->getNestedTypesList()->begin();
for (size_t i = 0; i < column_nested.getData().size(); ++i, ++it)
{
writeData(
name + "." + it->first,
*it->second,
*column_nested.getData()[i],
level + 1);
}
}
else
type.serializeBinary(column, streams[name]->compressed);
}
@ -205,6 +266,19 @@ void StorageTinyLog::addFile(const String & column_name, const IDataType & type,
addFile(column_name, *type_arr->getNestedType(), level + 1);
}
else if (const DataTypeNested * type_nested = dynamic_cast<const DataTypeNested *>(&type))
{
String size_column_suffix = ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
ColumnData column_data;
files.insert(std::make_pair(column_name + size_column_suffix, column_data));
files[column_name + size_column_suffix].data_file = Poco::File(
path + escapeForFileName(name) + '/' + escapeForFileName(column_name) + size_column_suffix + DBMS_STORAGE_LOG_DATA_FILE_EXTENSION);
const NamesAndTypesList & columns = *type_nested->getNestedTypesList();
for (NamesAndTypesList::const_iterator it = columns.begin(); it != columns.end(); ++it)
addFile(column_name + "." + it->first, *it->second, level + 1);
}
else
{
ColumnData column_data;