diff --git a/dbms/include/DB/Storages/StorageTinyLog.h b/dbms/include/DB/Storages/StorageTinyLog.h index b85a7e3a1fe..b7424ce43b7 100644 --- a/dbms/include/DB/Storages/StorageTinyLog.h +++ b/dbms/include/DB/Storages/StorageTinyLog.h @@ -75,6 +75,7 @@ private: size_t max_compress_block_size; Files_t files; + Files_t null_files; FileChecker file_checker; @@ -91,6 +92,7 @@ private: size_t max_compress_block_size_); void addFile(const String & column_name, const IDataType & type, size_t level = 0); + void addNullFile(const String & column_name); }; } diff --git a/dbms/src/DataStreams/NativeBlockInputStream.cpp b/dbms/src/DataStreams/NativeBlockInputStream.cpp index f5305755863..b767e3022c1 100644 --- a/dbms/src/DataStreams/NativeBlockInputStream.cpp +++ b/dbms/src/DataStreams/NativeBlockInputStream.cpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include @@ -24,21 +25,6 @@ namespace ErrorCodes extern const int CANNOT_READ_ALL_DATA; } -namespace -{ - -void deserializeNullValuesByteMap(ColumnNullable & nullable_col, ReadBuffer & istr, size_t limit) -{ - ColumnUInt8 & null_map = static_cast(*(nullable_col.getNullValuesByteMap().get())); - auto & x = null_map.getData(); - - x.resize(limit); - size_t read_count = istr.readBig(reinterpret_cast(&x[0]), limit); - x.resize(read_count); -} - -} - NativeBlockInputStream::NativeBlockInputStream( ReadBuffer & istr_, UInt64 server_revision_, bool use_index_, @@ -68,7 +54,9 @@ void NativeBlockInputStream::readData(const IDataType & type, IColumn & column, ColumnNullable & nullable_col = static_cast(column); IColumn & nested_col = *(nullable_col.getNestedColumn().get()); - deserializeNullValuesByteMap(nullable_col, istr, rows); + ColumnUInt8 & null_map = static_cast(*(nullable_col.getNullValuesByteMap().get())); + DataTypeUInt8{}.deserializeBinary(null_map, istr, rows, 0); + readData(nested_type, nested_col, istr, rows); return; diff --git a/dbms/src/DataStreams/NativeBlockOutputStream.cpp b/dbms/src/DataStreams/NativeBlockOutputStream.cpp index 64531a59694..a4524901cfc 100644 --- a/dbms/src/DataStreams/NativeBlockOutputStream.cpp +++ b/dbms/src/DataStreams/NativeBlockOutputStream.cpp @@ -11,6 +11,7 @@ #include #include +#include #include #include @@ -19,24 +20,6 @@ namespace DB { -namespace -{ - -void serializeNullValuesByteMap(const ColumnNullable & nullable_col, WriteBuffer & ostr, size_t offset, size_t limit) -{ - const IColumn & nested_col = *(nullable_col.getNestedColumn().get()); - const ColumnUInt8 & content = static_cast(*(nullable_col.getNullValuesByteMap().get())); - const auto & x = content.getData(); - - size_t size = nested_col.size(); - if ((limit == 0) || (offset + limit) > size) - limit = size - offset; - - ostr.write(reinterpret_cast(&x[offset]), limit); -} - -} - NativeBlockOutputStream::NativeBlockOutputStream( WriteBuffer & ostr_, UInt64 client_revision_, WriteBuffer * index_ostr_, size_t initial_size_of_file_) @@ -72,7 +55,9 @@ void NativeBlockOutputStream::writeData(const IDataType & type, const ColumnPtr const ColumnNullable & nullable_col = static_cast(*full_column.get()); const ColumnPtr & nested_col = nullable_col.getNestedColumn(); - serializeNullValuesByteMap(nullable_col, ostr, offset, limit); + const ColumnUInt8 & content = static_cast(*(nullable_col.getNullValuesByteMap().get())); + DataTypeUInt8{}.serializeBinary(content, ostr, offset, limit); + writeData(nested_type, nested_col, ostr, offset, limit); } else if (const DataTypeArray * type_arr = typeid_cast(&type)) diff --git a/dbms/src/Storages/StorageTinyLog.cpp b/dbms/src/Storages/StorageTinyLog.cpp index 7722fbef4f6..cde457a9832 100644 --- a/dbms/src/Storages/StorageTinyLog.cpp +++ b/dbms/src/Storages/StorageTinyLog.cpp @@ -16,16 +16,20 @@ #include #include +#include +#include #include #include #include +#include #include #include #define DBMS_STORAGE_LOG_DATA_FILE_EXTENSION ".bin" +#define DBMS_STORAGE_LOG_DATA_BINARY_NULL_MAP_EXTENSION ".null" namespace DB @@ -73,8 +77,10 @@ private: using FileStreams = std::map >; FileStreams streams; + FileStreams null_streams; void addStream(const String & name, const IDataType & type, size_t level = 0); + void addNullStream(const String & name); void readData(const String & name, const IDataType & type, IColumn & column, size_t limit, size_t level = 0, bool read_offsets = true); }; @@ -128,10 +134,12 @@ private: using FileStreams = std::map >; FileStreams streams; + FileStreams null_streams; using OffsetColumns = std::set; void addStream(const String & name, const IDataType & type, size_t level = 0); + void addNullStream(const String & name); void writeData(const String & name, const IDataType & type, const IColumn & column, OffsetColumns & offset_columns, size_t level = 0); }; @@ -161,6 +169,7 @@ Block TinyLogBlockInputStream::readImpl() */ finished = true; streams.clear(); + null_streams.clear(); return res; } @@ -220,6 +229,7 @@ Block TinyLogBlockInputStream::readImpl() { finished = true; streams.clear(); + null_streams.clear(); } return res; @@ -228,9 +238,16 @@ Block TinyLogBlockInputStream::readImpl() void TinyLogBlockInputStream::addStream(const String & name, const IDataType & type, size_t level) { - /// Для массивов используются отдельные потоки для размеров. - if (const DataTypeArray * type_arr = typeid_cast(&type)) + if (type.isNullable()) { + const DataTypeNullable & nullable_type = static_cast(type); + const IDataType & nested_type = *(nullable_type.getNestedType().get()); + addNullStream(name); + addStream(name, nested_type, level); + } + else if (const DataTypeArray * type_arr = typeid_cast(&type)) + { + /// Для массивов используются отдельные потоки для размеров. String size_name = DataTypeNested::extractNestedTableName(name) + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level); if (!streams.count(size_name)) streams.emplace(size_name, std::unique_ptr(new Stream(storage.files[size_name].data_file.path(), max_read_buffer_size))); @@ -242,11 +259,31 @@ void TinyLogBlockInputStream::addStream(const String & name, const IDataType & t } +void TinyLogBlockInputStream::addNullStream(const String & name) +{ + null_streams[name].reset(new Stream{storage.null_files[name].data_file.path(), max_read_buffer_size}); +} + + void TinyLogBlockInputStream::readData(const String & name, const IDataType & type, IColumn & column, size_t limit, size_t level, bool read_offsets) { - /// Для массивов требуется сначала десериализовать размеры, а потом значения. - if (const DataTypeArray * type_arr = typeid_cast(&type)) + if (type.isNullable()) { + const DataTypeNullable & nullable_type = static_cast(type); + const IDataType & nested_type = *(nullable_type.getNestedType().get()); + + if (!column.isNullable()) + throw Exception{"Column not nullable!!!", ErrorCodes::LOGICAL_ERROR}; + + ColumnNullable & nullable_col = static_cast(column); + IColumn & nested_col = *(nullable_col.getNestedColumn().get()); + + DataTypeUInt8{}.deserializeBinary(*(nullable_col.getNullValuesByteMap().get()), null_streams[name]->compressed, limit, 0); + readData(name, nested_type, nested_col, limit, level, read_offsets); + } + else if (const DataTypeArray * type_arr = typeid_cast(&type)) + { + /// Для массивов требуется сначала десериализовать размеры, а потом значения. if (read_offsets) { type_arr->deserializeOffsets( @@ -272,9 +309,16 @@ void TinyLogBlockInputStream::readData(const String & name, const IDataType & ty void TinyLogBlockOutputStream::addStream(const String & name, const IDataType & type, size_t level) { - /// Для массивов используются отдельные потоки для размеров. - if (const DataTypeArray * type_arr = typeid_cast(&type)) + if (type.isNullable()) { + const DataTypeNullable & nullable_type = static_cast(type); + const IDataType & nested_type = *(nullable_type.getNestedType().get()); + addNullStream(name); + addStream(name, nested_type, level); + } + else if (const DataTypeArray * type_arr = typeid_cast(&type)) + { + /// Для массивов используются отдельные потоки для размеров. String size_name = DataTypeNested::extractNestedTableName(name) + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level); if (!streams.count(size_name)) streams.emplace(size_name, std::unique_ptr(new Stream(storage.files[size_name].data_file.path(), storage.max_compress_block_size))); @@ -286,12 +330,29 @@ void TinyLogBlockOutputStream::addStream(const String & name, const IDataType & } +void TinyLogBlockOutputStream::addNullStream(const String & name) +{ + null_streams[name].reset(new Stream{storage.null_files[name].data_file.path(), storage.max_compress_block_size}); +} + + void TinyLogBlockOutputStream::writeData(const String & name, const IDataType & type, const IColumn & column, OffsetColumns & offset_columns, size_t level) { - /// Для массивов требуется сначала сериализовать размеры, а потом значения. - if (const DataTypeArray * type_arr = typeid_cast(&type)) + if (type.isNullable()) { + const DataTypeNullable & nullable_type = static_cast(type); + const IDataType & nested_type = *(nullable_type.getNestedType().get()); + + const ColumnNullable & nullable_col = static_cast(column); + const IColumn & nested_col = *(nullable_col.getNestedColumn().get()); + + DataTypeUInt8{}.serializeBinary(*(nullable_col.getNullValuesByteMap().get()), null_streams[name]->compressed); + writeData(name, nested_type, nested_col, offset_columns, level); + } + else if (const DataTypeArray * type_arr = typeid_cast(&type)) + { + /// Для массивов требуется сначала сериализовать размеры, а потом значения. String size_name = DataTypeNested::extractNestedTableName(name) + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level); if (offset_columns.count(size_name) == 0) @@ -319,6 +380,9 @@ void TinyLogBlockOutputStream::writeSuffix() for (FileStreams::iterator it = streams.begin(); it != streams.end(); ++it) it->second->finalize(); + for (auto & it : null_streams) + it.second->finalize(); + std::vector column_files; for (auto & pair : streams) column_files.push_back(storage.files[pair.first].data_file); @@ -326,6 +390,7 @@ void TinyLogBlockOutputStream::writeSuffix() storage.file_checker.update(column_files.begin(), column_files.end()); streams.clear(); + null_streams.clear(); } @@ -398,7 +463,15 @@ void StorageTinyLog::addFile(const String & column_name, const IDataType & type, throw Exception("Duplicate column with name " + column_name + " in constructor of StorageTinyLog.", ErrorCodes::DUPLICATE_COLUMN); - if (const DataTypeArray * type_arr = typeid_cast(&type)) + if (type.isNullable()) + { + const DataTypeNullable & nullable_type = static_cast(type); + const IDataType & actual_type = *(nullable_type.getNestedType().get()); + + addNullFile(column_name); + addFile(column_name, actual_type, level); + } + else if (const DataTypeArray * type_arr = typeid_cast(&type)) { String size_column_suffix = ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level); String size_name = DataTypeNested::extractNestedTableName(column_name) + size_column_suffix; @@ -423,6 +496,14 @@ void StorageTinyLog::addFile(const String & column_name, const IDataType & type, } +void StorageTinyLog::addNullFile(const String & column_name) +{ + ColumnData & column_data = null_files.emplace(column_name, ColumnData{}).first->second; + column_data.data_file = Poco::File{ + path + escapeForFileName(name) + '/' + escapeForFileName(column_name) + DBMS_STORAGE_LOG_DATA_BINARY_NULL_MAP_EXTENSION}; +} + + void StorageTinyLog::rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) { /// Переименовываем директорию с данными. @@ -434,6 +515,9 @@ void StorageTinyLog::rename(const String & new_path_to_db, const String & new_da for (Files_t::iterator it = files.begin(); it != files.end(); ++it) it->second.data_file = Poco::File(path + escapeForFileName(name) + '/' + Poco::Path(it->second.data_file.path()).getFileName()); + + for (Files_t::iterator it = null_files.begin(); it != null_files.end(); ++it) + it->second.data_file = Poco::File(path + escapeForFileName(name) + '/' + Poco::Path(it->second.data_file.path()).getFileName()); } @@ -462,8 +546,16 @@ BlockOutputStreamPtr StorageTinyLog::write( void StorageTinyLog::drop() { for (Files_t::iterator it = files.begin(); it != files.end(); ++it) + { if (it->second.data_file.exists()) it->second.data_file.remove(); + } + + for (Files_t::iterator it = null_files.begin(); it != null_files.end(); ++it) + { + if (it->second.data_file.exists()) + it->second.data_file.remove(); + } } bool StorageTinyLog::checkData() const