From 090aae8e7b0a52609f4d173b8441c037fef370b3 Mon Sep 17 00:00:00 2001 From: Vyacheslav Alipov Date: Fri, 12 Jul 2013 13:35:05 +0000 Subject: [PATCH] propagated [de]serialization to storages and (in|out)put streams [#CONV-7967] --- dbms/include/DB/DataTypes/DataTypeNested.h | 2 +- .../MergeTree/MergeTreeBlockInputStream.h | 44 ++++++++- .../MergeTree/MergeTreeBlockOutputStream.h | 19 ++++ .../MergeTree/MergedBlockOutputStream.h | 40 ++++++++ .../DataStreams/NativeBlockInputStream.cpp | 24 +++++ .../DataStreams/NativeBlockOutputStream.cpp | 16 ++++ dbms/src/DataTypes/DataTypeNested.cpp | 4 +- dbms/src/Storages/StorageLog.cpp | 91 ++++++++++++++++++- dbms/src/Storages/StorageMergeTree.cpp | 2 + dbms/src/Storages/StorageTinyLog.cpp | 74 +++++++++++++++ 10 files changed, 308 insertions(+), 8 deletions(-) diff --git a/dbms/include/DB/DataTypes/DataTypeNested.h b/dbms/include/DB/DataTypes/DataTypeNested.h index 9470c04a394..e23925da41c 100644 --- a/dbms/include/DB/DataTypes/DataTypeNested.h +++ b/dbms/include/DB/DataTypes/DataTypeNested.h @@ -70,7 +70,7 @@ public: throw Exception("Method getDefault is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED); } - const NamesAndTypesListPtr & getNestedTypes() const { return nested; } + const NamesAndTypesListPtr & getNestedTypesList() const { return nested; } const DataTypePtr & getOffsetsType() const { return offsets; } }; diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeBlockInputStream.h b/dbms/include/DB/Storages/MergeTree/MergeTreeBlockInputStream.h index 3d565529afc..973a7e6dad7 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeBlockInputStream.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeBlockInputStream.h @@ -302,6 +302,19 @@ private: addStream(name, *type_arr->getNestedType(), mark_number, level + 1); } + else if (const DataTypeNested * type_nested = dynamic_cast(&type)) + { + String size_name = name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level); + String escaped_size_name = escaped_column_name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level); + + streams.insert(std::make_pair(size_name, new Stream( + path + escaped_size_name, + mark_number))); + + const NamesAndTypesList & columns = *type_nested->getNestedTypesList(); + for (NamesAndTypesList::const_iterator it = columns.begin(); it != columns.end(); ++it) + addStream(name + "." + it->first, *it->second, mark_number, level + 1); + } else streams.insert(std::make_pair(name, new Stream( path + escaped_column_name, @@ -316,15 +329,38 @@ private: type_arr->deserializeOffsets( column, streams[name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level)]->compressed, - max_rows_to_read); + max_rows_to_read); if (column.size()) readData( name, *type_arr->getNestedType(), - dynamic_cast(column).getData(), - dynamic_cast(column).getOffsets()[column.size() - 1], - level + 1); + dynamic_cast(column).getData(), + dynamic_cast(column).getOffsets()[column.size() - 1], + level + 1); + } + else if (const DataTypeNested * type_nested = dynamic_cast(&type)) + { + type_nested->deserializeOffsets( + column, + streams[name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level)]->compressed, + max_rows_to_read); + + if (column.size()) + { + ColumnNested & column_nested = dynamic_cast(column); + + NamesAndTypesList::const_iterator it = type_nested->getNestedTypesList()->begin(); + for (size_t i = 0; i < column_nested.getData().size(); ++i, ++it) + { + readData( + name + "." + it->first, + *it->second, + *column_nested.getData()[i], + column_nested.getOffsets()[column.size() - 1], + level + 1); + } + } } else type.deserializeBinary(column, streams[name]->compressed, max_rows_to_read); diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeBlockOutputStream.h b/dbms/include/DB/Storages/MergeTree/MergeTreeBlockOutputStream.h index 756e2346e1a..799823f9c65 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeBlockOutputStream.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeBlockOutputStream.h @@ -199,6 +199,25 @@ private: prev_mark += storage.index_granularity; } } + if (const DataTypeNested * type_nested = dynamic_cast(&type)) + { + String size_name = escaped_column_name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level); + + WriteBufferFromFile plain(path + size_name + ".bin", DBMS_DEFAULT_BUFFER_SIZE, flags); + WriteBufferFromFile marks(path + size_name + ".mrk", 4096, flags); + CompressedWriteBuffer compressed(plain); + + size_t prev_mark = 0; + while (prev_mark < size) + { + /// Каждая засечка - это: (смещение в файле до начала сжатого блока, смещение внутри блока) + writeIntBinary(plain.count(), marks); + writeIntBinary(compressed.offset(), marks); + + type_nested->serializeOffsets(column, compressed, prev_mark, storage.index_granularity); + prev_mark += storage.index_granularity; + } + } { WriteBufferFromFile plain(path + escaped_column_name + ".bin", DBMS_DEFAULT_BUFFER_SIZE, flags); diff --git a/dbms/include/DB/Storages/MergeTree/MergedBlockOutputStream.h b/dbms/include/DB/Storages/MergeTree/MergedBlockOutputStream.h index 1cb1ae75f72..f87f165de8a 100644 --- a/dbms/include/DB/Storages/MergeTree/MergedBlockOutputStream.h +++ b/dbms/include/DB/Storages/MergeTree/MergedBlockOutputStream.h @@ -131,6 +131,19 @@ private: addStream(name, *type_arr->getNestedType(), level + 1); } + else if (const DataTypeNested * type_nested = dynamic_cast(&type)) + { + String size_name = name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level); + String escaped_size_name = escaped_column_name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level); + + column_streams[size_name] = new ColumnStream( + part_tmp_path + escaped_size_name + ".bin", + part_tmp_path + escaped_size_name + ".mrk"); + + const NamesAndTypesList & columns = *type_nested->getNestedTypesList(); + for (NamesAndTypesList::const_iterator it = columns.begin(); it != columns.end(); ++it) + addStream(name + "." + it->first, *it->second, level + 1); + } else column_streams[name] = new ColumnStream( part_tmp_path + escaped_column_name + ".bin", @@ -171,6 +184,33 @@ private: prev_mark += limit; } } + if (const DataTypeNested * type_nested = dynamic_cast(&type)) + { + String size_name = name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level); + + ColumnStream & stream = *column_streams[size_name]; + + size_t prev_mark = 0; + while (prev_mark < size) + { + size_t limit = 0; + + /// Если есть index_offset, то первая засечка идёт не сразу, а после этого количества строк. + if (prev_mark == 0 && index_offset != 0) + { + limit = index_offset; + } + else + { + limit = storage.index_granularity; + writeIntBinary(stream.plain.count(), stream.marks); + writeIntBinary(stream.compressed.offset(), stream.marks); + } + + type_nested->serializeOffsets(column, stream.compressed, prev_mark, limit); + prev_mark += limit; + } + } { ColumnStream & stream = *column_streams[name]; diff --git a/dbms/src/DataStreams/NativeBlockInputStream.cpp b/dbms/src/DataStreams/NativeBlockInputStream.cpp index 5956be11de4..c079f0c36b4 100644 --- a/dbms/src/DataStreams/NativeBlockInputStream.cpp +++ b/dbms/src/DataStreams/NativeBlockInputStream.cpp @@ -4,8 +4,10 @@ #include #include +#include #include +#include #include @@ -33,6 +35,28 @@ static void readData(const IDataType & type, IColumn & column, ReadBuffer & istr istr, dynamic_cast(column).getOffsets()[rows - 1]); } + else if (const DataTypeNested * type_nested = dynamic_cast(&type)) + { + ColumnNested & column_nested = dynamic_cast(column); + IColumn & offsets_column = *column_nested.getOffsetsColumn(); + type_nested->getOffsetsType()->deserializeBinary(offsets_column, istr, rows); + + if (offsets_column.size() != rows) + throw Exception("Cannot read all data in NativeBlockInputStream.", ErrorCodes::CANNOT_READ_ALL_DATA); + + if (rows) + { + NamesAndTypesList::const_iterator it = type_nested->getNestedTypesList()->begin(); + for (size_t i = 0; i < column_nested.getData().size(); ++i, ++it) + { + readData( + *it->second, + *column_nested.getData()[i], + istr, + column_nested.getOffsets()[rows - 1]); + } + } + } else type.deserializeBinary(column, istr, rows); diff --git a/dbms/src/DataStreams/NativeBlockOutputStream.cpp b/dbms/src/DataStreams/NativeBlockOutputStream.cpp index 5485a1a2c0d..c1d8658657e 100644 --- a/dbms/src/DataStreams/NativeBlockOutputStream.cpp +++ b/dbms/src/DataStreams/NativeBlockOutputStream.cpp @@ -5,8 +5,10 @@ #include #include +#include #include +#include #include @@ -26,6 +28,20 @@ static void writeData(const IDataType & type, const IColumn & column, WriteBuffe if (!dynamic_cast(column).getData().empty()) writeData(*type_arr->getNestedType(), dynamic_cast(column).getData(), ostr); } + else if (const DataTypeNested * type_nested = dynamic_cast(&type)) + { + const ColumnNested & column_nested = dynamic_cast(column); + + type_nested->getOffsetsType()->serializeBinary(*column_nested.getOffsetsColumn(), ostr); + + NamesAndTypesList::const_iterator it = type_nested->getNestedTypesList()->begin(); + for (size_t i = 0; i < column_nested.getData().size(); ++i, ++it) + { + if (column_nested.getData()[i]->empty()) + break; + writeData(*it->second, *column_nested.getData()[i], ostr); + } + } else type.serializeBinary(column, ostr); } diff --git a/dbms/src/DataTypes/DataTypeNested.cpp b/dbms/src/DataTypes/DataTypeNested.cpp index d432691c535..2b77b8f6c67 100644 --- a/dbms/src/DataTypes/DataTypeNested.cpp +++ b/dbms/src/DataTypes/DataTypeNested.cpp @@ -47,13 +47,13 @@ std::string DataTypeNested::getName() const void DataTypeNested::serializeBinary(const Field & field, WriteBuffer & ostr) const { - throw Exception("Method serializeBinary(const Field & field, WriteBuffer & ostr) is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED); + throw Exception("Method serializeBinary(const Field &, WriteBuffer &) is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED); } void DataTypeNested::deserializeBinary(Field & field, ReadBuffer & istr) const { - throw Exception("Method deserializeBinary(Field & field, ReadBuffer & istr) is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED); + throw Exception("Method deserializeBinary(Field &, ReadBuffer &) is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED); } diff --git a/dbms/src/Storages/StorageLog.cpp b/dbms/src/Storages/StorageLog.cpp index a930085ede4..f134a31b65b 100644 --- a/dbms/src/Storages/StorageLog.cpp +++ b/dbms/src/Storages/StorageLog.cpp @@ -10,8 +10,10 @@ #include #include +#include #include +#include #include @@ -94,6 +96,19 @@ void LogBlockInputStream::addStream(const String & name, const IDataType & type, addStream(name, *type_arr->getNestedType(), level + 1); } + else if (const DataTypeNested * type_nested = dynamic_cast(&type)) + { + String size_name = name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level); + streams.insert(std::make_pair(size_name, new Stream( + storage.files[size_name].data_file.path(), + mark_number + ? storage.files[size_name].marks[mark_number].offset + : 0))); + + const NamesAndTypesList & columns = *type_nested->getNestedTypesList(); + for (NamesAndTypesList::const_iterator it = columns.begin(); it != columns.end(); ++it) + addStream(name + "." + it->first, *it->second, level + 1); + } else streams.insert(std::make_pair(name, new Stream( storage.files[name].data_file.path(), @@ -121,6 +136,29 @@ void LogBlockInputStream::readData(const String & name, const IDataType & type, dynamic_cast(column).getOffsets()[column.size() - 1], level + 1); } + else if (const DataTypeNested * type_nested = dynamic_cast(&type)) + { + type_nested->deserializeOffsets( + column, + streams[name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level)]->compressed, + max_rows_to_read); + + if (column.size()) + { + ColumnNested & column_nested = dynamic_cast(column); + + NamesAndTypesList::const_iterator it = type_nested->getNestedTypesList()->begin(); + for (size_t i = 0; i < column_nested.getData().size(); ++i, ++it) + { + readData( + name + "." + it->first, + *it->second, + *column_nested.getData()[i], + column_nested.getOffsets()[column.size() - 1], + level + 1); + } + } + } else type.deserializeBinary(column, streams[name]->compressed, max_rows_to_read); } @@ -161,6 +199,16 @@ void LogBlockOutputStream::addStream(const String & name, const IDataType & type addStream(name, *type_arr->getNestedType(), level + 1); } + else if (const DataTypeNested * type_nested = dynamic_cast(&type)) + { + String size_name = name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level); + streams.insert(std::make_pair(size_name, new Stream( + storage.files[size_name].data_file.path()))); + + const NamesAndTypesList & columns = *type_nested->getNestedTypesList(); + for (NamesAndTypesList::const_iterator it = columns.begin(); it != columns.end(); ++it) + addStream(name + "." + it->first, *it->second, level + 1); + } else streams.insert(std::make_pair(name, new Stream( storage.files[name].data_file.path()))); @@ -185,6 +233,32 @@ void LogBlockOutputStream::writeData(const String & name, const IDataType & type writeData(name, *type_arr->getNestedType(), dynamic_cast(column).getData(), out_marks, level + 1); } + else if (const DataTypeNested * type_nested = dynamic_cast(&type)) + { + String size_name = name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level); + + Mark mark; + mark.rows = (storage.files[size_name].marks.empty() ? 0 : storage.files[size_name].marks.back().rows) + column.size(); + mark.offset = streams[size_name]->plain_offset + streams[size_name]->plain.count(); + + out_marks.push_back(std::make_pair(storage.files[size_name].column_index, mark)); + + type_nested->serializeOffsets(column, streams[size_name]->compressed); + streams[size_name]->compressed.next(); + + const ColumnNested & column_nested = dynamic_cast(column); + + NamesAndTypesList::const_iterator it = type_nested->getNestedTypesList()->begin(); + for (size_t i = 0; i < column_nested.getData().size(); ++i, ++it) + { + writeData( + name + "." + it->first, + *it->second, + *column_nested.getData()[i], + out_marks, + level + 1); + } + } else { Mark mark; @@ -249,7 +323,7 @@ StoragePtr StorageLog::create(const std::string & path_, const std::string & nam void StorageLog::addFile(const String & column_name, const IDataType & type, size_t level) { if (files.end() != files.find(column_name)) - throw Exception("Duplicate column with name " + column_name + " in constructor of StorageTinyLog.", + throw Exception("Duplicate column with name " + column_name + " in constructor of StorageLog.", ErrorCodes::DUPLICATE_COLUMN); if (const DataTypeArray * type_arr = dynamic_cast(&type)) @@ -265,6 +339,21 @@ void StorageLog::addFile(const String & column_name, const IDataType & type, siz addFile(column_name, *type_arr->getNestedType(), level + 1); } + else if (const DataTypeNested * type_nested = dynamic_cast(&type)) + { + String size_column_suffix = ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level); + + ColumnData & column_data = files.insert(std::make_pair(column_name + size_column_suffix, ColumnData())).first->second; + column_data.column_index = column_names.size(); + column_data.data_file = Poco::File( + path + escapeForFileName(name) + '/' + escapeForFileName(column_name) + size_column_suffix + DBMS_STORAGE_LOG_DATA_FILE_EXTENSION); + + column_names.push_back(column_name + size_column_suffix); + + const NamesAndTypesList & columns = *type_nested->getNestedTypesList(); + for (NamesAndTypesList::const_iterator it = columns.begin(); it != columns.end(); ++it) + addFile(column_name + "." + it->first, *it->second, level + 1); + } else { ColumnData & column_data = files.insert(std::make_pair(column_name, ColumnData())).first->second; diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index b26117e39ed..33634f6806e 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -17,9 +17,11 @@ #include #include +#include #include #include +#include #include #include diff --git a/dbms/src/Storages/StorageTinyLog.cpp b/dbms/src/Storages/StorageTinyLog.cpp index c96a2effb6f..02950520733 100644 --- a/dbms/src/Storages/StorageTinyLog.cpp +++ b/dbms/src/Storages/StorageTinyLog.cpp @@ -10,8 +10,10 @@ #include #include +#include #include +#include #include @@ -83,6 +85,15 @@ void TinyLogBlockInputStream::addStream(const String & name, const IDataType & t addStream(name, *type_arr->getNestedType(), level + 1); } + else if (const DataTypeNested * type_nested = dynamic_cast(&type)) + { + String size_name = name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level); + streams.insert(std::make_pair(size_name, new Stream(storage.files[size_name].data_file.path()))); + + const NamesAndTypesList & columns = *type_nested->getNestedTypesList(); + for (NamesAndTypesList::const_iterator it = columns.begin(); it != columns.end(); ++it) + addStream(name + "." + it->first, *it->second, level + 1); + } else streams.insert(std::make_pair(name, new Stream(storage.files[name].data_file.path()))); } @@ -108,6 +119,29 @@ void TinyLogBlockInputStream::readData(const String & name, const IDataType & ty throw Exception("Cannot read array data for all offsets", ErrorCodes::CANNOT_READ_ALL_DATA); } } + else if (const DataTypeNested * type_nested = dynamic_cast(&type)) + { + type_nested->deserializeOffsets( + column, + streams[name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level)]->compressed, + limit); + + if (column.size()) + { + ColumnNested & column_nested = dynamic_cast(column); + + NamesAndTypesList::const_iterator it = type_nested->getNestedTypesList()->begin(); + for (size_t i = 0; i < column_nested.getData().size(); ++i, ++it) + { + readData( + name + "." + it->first, + *it->second, + *column_nested.getData()[i], + column_nested.getOffsets()[column.size() - 1], + level + 1); + } + } + } else type.deserializeBinary(column, streams[name]->compressed, limit); } @@ -131,6 +165,15 @@ void TinyLogBlockOutputStream::addStream(const String & name, const IDataType & addStream(name, *type_arr->getNestedType(), level + 1); } + else if (const DataTypeNested * type_nested = dynamic_cast(&type)) + { + String size_name = name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level); + streams.insert(std::make_pair(size_name, new Stream(storage.files[size_name].data_file.path()))); + + const NamesAndTypesList & columns = *type_nested->getNestedTypesList(); + for (NamesAndTypesList::const_iterator it = columns.begin(); it != columns.end(); ++it) + addStream(name + "." + it->first, *it->second, level + 1); + } else streams.insert(std::make_pair(name, new Stream(storage.files[name].data_file.path()))); } @@ -147,6 +190,24 @@ void TinyLogBlockOutputStream::writeData(const String & name, const IDataType & writeData(name, *type_arr->getNestedType(), dynamic_cast(column).getData(), level + 1); } + else if (const DataTypeNested * type_nested = dynamic_cast(&type)) + { + String size_name = name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level); + + type_nested->serializeOffsets(column, streams[size_name]->compressed); + + const ColumnNested & column_nested = dynamic_cast(column); + + NamesAndTypesList::const_iterator it = type_nested->getNestedTypesList()->begin(); + for (size_t i = 0; i < column_nested.getData().size(); ++i, ++it) + { + writeData( + name + "." + it->first, + *it->second, + *column_nested.getData()[i], + level + 1); + } + } else type.serializeBinary(column, streams[name]->compressed); } @@ -205,6 +266,19 @@ void StorageTinyLog::addFile(const String & column_name, const IDataType & type, addFile(column_name, *type_arr->getNestedType(), level + 1); } + else if (const DataTypeNested * type_nested = dynamic_cast(&type)) + { + String size_column_suffix = ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level); + + ColumnData column_data; + files.insert(std::make_pair(column_name + size_column_suffix, column_data)); + files[column_name + size_column_suffix].data_file = Poco::File( + path + escapeForFileName(name) + '/' + escapeForFileName(column_name) + size_column_suffix + DBMS_STORAGE_LOG_DATA_FILE_EXTENSION); + + const NamesAndTypesList & columns = *type_nested->getNestedTypesList(); + for (NamesAndTypesList::const_iterator it = columns.begin(); it != columns.end(); ++it) + addFile(column_name + "." + it->first, *it->second, level + 1); + } else { ColumnData column_data;