From 1f3aeb066a29b79c53c9a754c03aed1b67a544da Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Tue, 3 Jan 2017 02:08:09 +0300 Subject: [PATCH] Renamed methods for bulk binary serialization (continued) [#METR-2944]. --- dbms/include/DB/DataTypes/IDataType.h | 11 ++++++++--- dbms/include/DB/Interpreters/Aggregator.h | 4 ++-- dbms/src/DataStreams/NativeBlockInputStream.cpp | 2 +- dbms/src/DataStreams/NativeBlockOutputStream.cpp | 2 +- dbms/src/DataTypes/tests/data_type_string.cpp | 2 +- dbms/src/DataTypes/tests/data_types_number_fixed.cpp | 2 +- dbms/src/Storages/MergeTree/MergeTreeReader.cpp | 2 +- .../Storages/MergeTree/MergedBlockOutputStream.cpp | 2 +- dbms/src/Storages/StorageLog.cpp | 6 +++--- dbms/src/Storages/StorageTinyLog.cpp | 8 ++++---- 10 files changed, 23 insertions(+), 18 deletions(-) diff --git a/dbms/include/DB/DataTypes/IDataType.h b/dbms/include/DB/DataTypes/IDataType.h index dd678ab0f83..d5170b5cb01 100644 --- a/dbms/include/DB/DataTypes/IDataType.h +++ b/dbms/include/DB/DataTypes/IDataType.h @@ -61,9 +61,14 @@ public: virtual void deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint) const = 0; /** Serialization/deserialization of individual values. - * For complex data types (like arrays) it may differ from bulk serde. - * For example, if you serialize single array, it will be represented as its size and values in single contiguous stream, - * but if you serialize column with arrays as bulk, then sizes and values will be written to separate streams. + * + * These are helper methods for implementation of various formats to input/output for user (like CSV, JSON, etc.). + * There is no one-to-one correspondence between formats and these methods. + * For example, TabSeparated and Pretty formats could use same helper method serializeTextEscaped. + * + * For complex data types (like arrays) binary serde for individual values may differ from bulk serde. + * For example, if you serialize single array, it will be represented as its size and elements in single contiguous stream, + * but if you bulk serialize column with arrays, then sizes and elements will be written to separate streams. */ /// Для бинарной сериализации есть два варианта. Один вариант работает с Field. diff --git a/dbms/include/DB/Interpreters/Aggregator.h b/dbms/include/DB/Interpreters/Aggregator.h index 4318a693889..2fb72ed6f34 100644 --- a/dbms/include/DB/Interpreters/Aggregator.h +++ b/dbms/include/DB/Interpreters/Aggregator.h @@ -597,7 +597,7 @@ private: { ColumnNullable & nullable_col = static_cast(*key_columns[i]); observed_column = nullable_col.getNestedColumn().get(); - ColumnUInt8 & null_map = static_cast(*nullable_col.getNullMapColumn()); + ColumnUInt8 & null_map = nullable_col.getNullMapConcreteColumn(); size_t bucket = i / 8; size_t offset = i % 8; @@ -625,7 +625,7 @@ private: { ColumnNullable & nullable_col = static_cast(*key_columns[i]); observed_column = nullable_col.getNestedColumn().get(); - null_map = static_cast(nullable_col.getNullMapColumn().get()); + null_map = &nullable_col.getNullMapConcreteColumn(); } else { diff --git a/dbms/src/DataStreams/NativeBlockInputStream.cpp b/dbms/src/DataStreams/NativeBlockInputStream.cpp index 644bbde8980..05ee93bf1e6 100644 --- a/dbms/src/DataStreams/NativeBlockInputStream.cpp +++ b/dbms/src/DataStreams/NativeBlockInputStream.cpp @@ -54,7 +54,7 @@ void NativeBlockInputStream::readData(const IDataType & type, IColumn & column, ColumnNullable & nullable_col = static_cast(column); IColumn & nested_col = *nullable_col.getNestedColumn(); - IColumn & null_map = *nullable_col.getNullMapColumn(); + IColumn & null_map = nullable_col.getNullMapConcreteColumn(); DataTypeUInt8{}.deserializeBinaryBulk(null_map, istr, rows, 0); readData(nested_type, nested_col, istr, rows); diff --git a/dbms/src/DataStreams/NativeBlockOutputStream.cpp b/dbms/src/DataStreams/NativeBlockOutputStream.cpp index bc3b60859d6..1a659d28f5e 100644 --- a/dbms/src/DataStreams/NativeBlockOutputStream.cpp +++ b/dbms/src/DataStreams/NativeBlockOutputStream.cpp @@ -61,7 +61,7 @@ void NativeBlockOutputStream::writeData(const IDataType & type, const ColumnPtr const ColumnNullable & nullable_col = static_cast(*full_column.get()); const ColumnPtr & nested_col = nullable_col.getNestedColumn(); - const IColumn & null_map = *nullable_col.getNullMapColumn(); + const IColumn & null_map = nullable_col.getNullMapConcreteColumn(); DataTypeUInt8{}.serializeBinaryBulk(null_map, ostr, offset, limit); writeData(nested_type, nested_col, ostr, offset, limit); diff --git a/dbms/src/DataTypes/tests/data_type_string.cpp b/dbms/src/DataTypes/tests/data_type_string.cpp index e852b3e2968..eb10edf7478 100644 --- a/dbms/src/DataTypes/tests/data_type_string.cpp +++ b/dbms/src/DataTypes/tests/data_type_string.cpp @@ -39,7 +39,7 @@ try WriteBufferFromOStream out_buf(ostr); stopwatch.restart(); - data_type.serializeBinaryBulk(*column, out_buf); + data_type.serializeBinaryBulk(*column, out_buf, 0, 0); stopwatch.stop(); std::cout << "Writing, elapsed: " << stopwatch.elapsedSeconds() << std::endl; diff --git a/dbms/src/DataTypes/tests/data_types_number_fixed.cpp b/dbms/src/DataTypes/tests/data_types_number_fixed.cpp index e7e7d6132e9..fb5b73e9a8e 100644 --- a/dbms/src/DataTypes/tests/data_types_number_fixed.cpp +++ b/dbms/src/DataTypes/tests/data_types_number_fixed.cpp @@ -27,7 +27,7 @@ int main(int argc, char ** argv) WriteBufferFromOStream out_buf(ostr); stopwatch.restart(); - data_type.serializeBinaryBulk(*column, out_buf); + data_type.serializeBinaryBulk(*column, out_buf, 0, 0); stopwatch.stop(); std::cout << "Elapsed: " << stopwatch.elapsedSeconds() << std::endl; diff --git a/dbms/src/Storages/MergeTree/MergeTreeReader.cpp b/dbms/src/Storages/MergeTree/MergeTreeReader.cpp index 2b7f7dc2370..1f5bd7fb53f 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeReader.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeReader.cpp @@ -416,7 +416,7 @@ void MergeTreeReader::readData(const String & name, const IDataType & type, ICol Stream & stream = *(streams.at(filename)); stream.seekToMark(from_mark); - IColumn & col8 = *(nullable_col.getNullMapColumn()); + IColumn & col8 = nullable_col.getNullMapConcreteColumn(); DataTypeUInt8{}.deserializeBinaryBulk(col8, *stream.data_buffer, max_rows_to_read, 0); /// Then read data. diff --git a/dbms/src/Storages/MergeTree/MergedBlockOutputStream.cpp b/dbms/src/Storages/MergeTree/MergedBlockOutputStream.cpp index 52b121eb158..9073e5a31d1 100644 --- a/dbms/src/Storages/MergeTree/MergedBlockOutputStream.cpp +++ b/dbms/src/Storages/MergeTree/MergedBlockOutputStream.cpp @@ -170,7 +170,7 @@ void IMergedBlockOutputStream::writeDataImpl( writeIntBinary(stream.compressed.offset(), stream.marks); } - DataTypeUInt8{}.serializeBinaryBulk(*(nullable_col.getNullMapColumn()), stream.compressed); + DataTypeUInt8{}.serializeBinaryBulk(nullable_col.getNullMapConcreteColumn(), stream.compressed, 0, 0); /// Чтобы вместо засечек, указывающих на конец сжатого блока, были засечки, указывающие на начало следующего. stream.compressed.nextIfAtEnd(); diff --git a/dbms/src/Storages/StorageLog.cpp b/dbms/src/Storages/StorageLog.cpp index abd7f88cdab..f904b100282 100644 --- a/dbms/src/Storages/StorageLog.cpp +++ b/dbms/src/Storages/StorageLog.cpp @@ -359,7 +359,7 @@ void LogBlockInputStream::readData(const String & name, const IDataType & type, ColumnNullable & nullable_col = static_cast(column); IColumn & nested_col = *nullable_col.getNestedColumn(); - DataTypeUInt8{}.deserializeBinaryBulk(*nullable_col.getNullMapColumn(), + DataTypeUInt8{}.deserializeBinaryBulk(nullable_col.getNullMapConcreteColumn(), streams[name + DBMS_STORAGE_LOG_DATA_BINARY_NULL_MAP_EXTENSION]->compressed, max_rows_to_read, 0); /// Then read data. readData(name, nested_type, nested_col, max_rows_to_read, level, read_offsets); @@ -490,7 +490,7 @@ void LogBlockOutputStream::writeData(const String & name, const IDataType & type out_null_marks.emplace_back(storage.files[filename].column_index, mark); - DataTypeUInt8{}.serializeBinaryBulk(*nullable_col.getNullMapColumn(), streams[filename]->compressed); + DataTypeUInt8{}.serializeBinaryBulk(nullable_col.getNullMapConcreteColumn(), streams[filename]->compressed, 0, 0); streams[filename]->compressed.next(); /// Then write data. @@ -526,7 +526,7 @@ void LogBlockOutputStream::writeData(const String & name, const IDataType & type out_marks.push_back(std::make_pair(storage.files[name].column_index, mark)); - type.serializeBinaryBulk(column, streams[name]->compressed); + type.serializeBinaryBulk(column, streams[name]->compressed, 0, 0); streams[name]->compressed.next(); } } diff --git a/dbms/src/Storages/StorageTinyLog.cpp b/dbms/src/Storages/StorageTinyLog.cpp index 8501a83074d..214dc8de326 100644 --- a/dbms/src/Storages/StorageTinyLog.cpp +++ b/dbms/src/Storages/StorageTinyLog.cpp @@ -300,7 +300,7 @@ void TinyLogBlockInputStream::readData(const String & name, const IDataType & ty IColumn & nested_col = *nullable_col.getNestedColumn(); /// First read from the null map. - DataTypeUInt8{}.deserializeBinaryBulk(*nullable_col.getNullMapColumn(), + DataTypeUInt8{}.deserializeBinaryBulk(nullable_col.getNullMapConcreteColumn(), streams[name + DBMS_STORAGE_LOG_DATA_BINARY_NULL_MAP_EXTENSION]->compressed, limit, 0); /// Then read data. @@ -372,8 +372,8 @@ void TinyLogBlockOutputStream::writeData(const String & name, const IDataType & const ColumnNullable & nullable_col = static_cast(column); const IColumn & nested_col = *nullable_col.getNestedColumn(); - DataTypeUInt8{}.serializeBinaryBulk(*nullable_col.getNullMapColumn(), - streams[name + DBMS_STORAGE_LOG_DATA_BINARY_NULL_MAP_EXTENSION]->compressed); + DataTypeUInt8{}.serializeBinaryBulk(nullable_col.getNullMapConcreteColumn(), + streams[name + DBMS_STORAGE_LOG_DATA_BINARY_NULL_MAP_EXTENSION]->compressed, 0, 0); /// Then write data. writeData(name, nested_type, nested_col, offset_columns, level); @@ -394,7 +394,7 @@ void TinyLogBlockOutputStream::writeData(const String & name, const IDataType & writeData(name, *type_arr->getNestedType(), typeid_cast(column).getData(), offset_columns, level + 1); } else - type.serializeBinaryBulk(column, streams[name]->compressed); + type.serializeBinaryBulk(column, streams[name]->compressed, 0, 0); }