From de131379582ee13a3a8faf397b15fadf7bf3b37a Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Tue, 11 Dec 2012 20:31:39 +0000 Subject: [PATCH] dbms: changed signature of IDataType::serializeBinary [#CONV-2944]. --- .../DB/DataTypes/DataTypeAggregateFunction.h | 2 +- dbms/include/DB/DataTypes/DataTypeArray.h | 4 +- .../DB/DataTypes/DataTypeFixedString.h | 2 +- dbms/include/DB/DataTypes/DataTypeString.h | 2 +- dbms/include/DB/DataTypes/IDataType.h | 13 +-- dbms/include/DB/DataTypes/IDataTypeDummy.h | 2 +- .../DB/DataTypes/IDataTypeNumberFixed.h | 18 +-- .../DB/DataTypes/IDataTypeNumberVariable.h | 15 +-- .../DataTypes/DataTypeAggregateFunction.cpp | 17 ++- dbms/src/DataTypes/DataTypeArray.cpp | 39 ++----- dbms/src/DataTypes/DataTypeFixedString.cpp | 17 +-- dbms/src/DataTypes/DataTypeString.cpp | 14 +-- dbms/src/Storages/StorageMergeTree.cpp | 110 ++++++++++-------- 13 files changed, 111 insertions(+), 144 deletions(-) diff --git a/dbms/include/DB/DataTypes/DataTypeAggregateFunction.h b/dbms/include/DB/DataTypes/DataTypeAggregateFunction.h index 30533193bb8..7642ace0a5e 100644 --- a/dbms/include/DB/DataTypes/DataTypeAggregateFunction.h +++ b/dbms/include/DB/DataTypes/DataTypeAggregateFunction.h @@ -42,7 +42,7 @@ public: void serializeBinary(const Field & field, WriteBuffer & ostr) const; void deserializeBinary(Field & field, ReadBuffer & istr) const; - void serializeBinary(const IColumn & column, WriteBuffer & ostr, WriteCallback callback = WriteCallback()) const; + void serializeBinary(const IColumn & column, WriteBuffer & ostr, size_t offset = 0, size_t limit = 0) const; void deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit) const; void serializeText(const Field & field, WriteBuffer & ostr) const; void deserializeText(Field & field, ReadBuffer & istr) const; diff --git a/dbms/include/DB/DataTypes/DataTypeArray.h b/dbms/include/DB/DataTypes/DataTypeArray.h index 796177d2515..5b87d851f89 100644 --- a/dbms/include/DB/DataTypes/DataTypeArray.h +++ b/dbms/include/DB/DataTypes/DataTypeArray.h @@ -50,7 +50,7 @@ public: */ /** Записать только значения, без размеров. Вызывающая сторона также должна куда-нибудь записать смещения. */ - void serializeBinary(const IColumn & column, WriteBuffer & ostr, WriteCallback callback = WriteCallback()) const; + void serializeBinary(const IColumn & column, WriteBuffer & ostr, size_t offset = 0, size_t limit = 0) const; /** Прочитать только значения, без размеров. * При этом, в column уже заранее должны быть считаны все размеры. @@ -58,7 +58,7 @@ public: void deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit) const; /** Записать размеры. */ - void serializeOffsets(const IColumn & column, WriteBuffer & ostr, WriteCallback callback = WriteCallback()) const; + void serializeOffsets(const IColumn & column, WriteBuffer & ostr, size_t offset = 0, size_t limit = 0) const; /** Прочитать размеры. Вызывайте этот метод перед чтением значений. */ void deserializeOffsets(IColumn & column, ReadBuffer & istr, size_t limit) const; diff --git a/dbms/include/DB/DataTypes/DataTypeFixedString.h b/dbms/include/DB/DataTypes/DataTypeFixedString.h index 68734bf6b71..d22e6110526 100644 --- a/dbms/include/DB/DataTypes/DataTypeFixedString.h +++ b/dbms/include/DB/DataTypes/DataTypeFixedString.h @@ -34,7 +34,7 @@ public: void serializeBinary(const Field & field, WriteBuffer & ostr) const; void deserializeBinary(Field & field, ReadBuffer & istr) const; - void serializeBinary(const IColumn & column, WriteBuffer & ostr, WriteCallback callback = WriteCallback()) const; + void serializeBinary(const IColumn & column, WriteBuffer & ostr, size_t offset = 0, size_t limit = 0) const; void deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit) const; void serializeText(const Field & field, WriteBuffer & ostr) const; diff --git a/dbms/include/DB/DataTypes/DataTypeString.h b/dbms/include/DB/DataTypes/DataTypeString.h index 1778539aff6..b0849fbfaff 100644 --- a/dbms/include/DB/DataTypes/DataTypeString.h +++ b/dbms/include/DB/DataTypes/DataTypeString.h @@ -28,7 +28,7 @@ public: void serializeBinary(const Field & field, WriteBuffer & ostr) const; void deserializeBinary(Field & field, ReadBuffer & istr) const; - void serializeBinary(const IColumn & column, WriteBuffer & ostr, WriteCallback callback = WriteCallback()) const; + void serializeBinary(const IColumn & column, WriteBuffer & ostr, size_t offset = 0, size_t limit = 0) const; void deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit) const; void serializeText(const Field & field, WriteBuffer & ostr) const; diff --git a/dbms/include/DB/DataTypes/IDataType.h b/dbms/include/DB/DataTypes/IDataType.h index 65dea527271..0eda1fe4297 100644 --- a/dbms/include/DB/DataTypes/IDataType.h +++ b/dbms/include/DB/DataTypes/IDataType.h @@ -38,14 +38,13 @@ public: virtual void deserializeBinary(Field & field, ReadBuffer & istr) const = 0; /** Сериализация столбца. - * Можно передать callback, который будет вызван для некоторых значений. - * callback вызывается для 0-го значения и возвращает индекс следующего значения, - * для которого его следует вызвать. - * Это может быть использовано для одновременной записи индексного файла. + * offset и limit используются, чтобы сериализовать часть столбца. + * limit = 0 - означает - не ограничено. + * offset не должен быть больше размера столбца. + * offset + limit может быть больше размера столбца + * - в этом случае, столбец сериализуется до конца. */ - typedef boost::function WriteCallback; - virtual void serializeBinary(const IColumn & column, WriteBuffer & ostr, - WriteCallback callback = WriteCallback()) const = 0; + virtual void serializeBinary(const IColumn & column, WriteBuffer & ostr, size_t offset = 0, size_t limit = 0) const = 0; /** Считать не более limit значений. */ virtual void deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit) const = 0; diff --git a/dbms/include/DB/DataTypes/IDataTypeDummy.h b/dbms/include/DB/DataTypes/IDataTypeDummy.h index a6fb79cac3a..dc16ba25fc2 100644 --- a/dbms/include/DB/DataTypes/IDataTypeDummy.h +++ b/dbms/include/DB/DataTypes/IDataTypeDummy.h @@ -24,7 +24,7 @@ public: void deserializeBinary(Field & field, ReadBuffer & istr) const { throwNoSerialization(); } void serializeBinary(const IColumn & column, WriteBuffer & ostr, - WriteCallback callback = WriteCallback()) const { throwNoSerialization(); } + size_t offset = 0, size_t limit = 0) const { throwNoSerialization(); } void deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit) const { throwNoSerialization(); } diff --git a/dbms/include/DB/DataTypes/IDataTypeNumberFixed.h b/dbms/include/DB/DataTypes/IDataTypeNumberFixed.h index 88216bc46d8..b97421d5f9a 100644 --- a/dbms/include/DB/DataTypes/IDataTypeNumberFixed.h +++ b/dbms/include/DB/DataTypes/IDataTypeNumberFixed.h @@ -21,7 +21,6 @@ namespace DB template class IDataTypeNumberFixed : public IDataTypeNumber { - typedef IDataType::WriteCallback WriteCallback; public: /** Формат платформозависимый (зависит от представления данных в памяти). */ @@ -40,25 +39,16 @@ public: field = typename NearestFieldType::Type(x); } - void serializeBinary(const IColumn & column, WriteBuffer & ostr, WriteCallback callback = WriteCallback()) const + void serializeBinary(const IColumn & column, WriteBuffer & ostr, size_t offset = 0, size_t limit = 0) const { const typename ColumnType::Container_t & x = dynamic_cast(column).getData(); - size_t prev_callback_point = 0; - size_t next_callback_point = 0; size_t size = x.size(); - while (next_callback_point < size) - { - next_callback_point = callback ? callback() : size; - if (next_callback_point > size) - next_callback_point = size; - - ostr.write(reinterpret_cast(&x[prev_callback_point]), - sizeof(typename ColumnType::value_type) * (next_callback_point - prev_callback_point)); + if (limit == 0 || offset + limit > size) + limit = size - offset; - prev_callback_point = next_callback_point; - } + ostr.write(reinterpret_cast(&x[offset]), sizeof(typename ColumnType::value_type) * limit); } void deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit) const diff --git a/dbms/include/DB/DataTypes/IDataTypeNumberVariable.h b/dbms/include/DB/DataTypes/IDataTypeNumberVariable.h index 608f71586df..628ac7e735f 100644 --- a/dbms/include/DB/DataTypes/IDataTypeNumberVariable.h +++ b/dbms/include/DB/DataTypes/IDataTypeNumberVariable.h @@ -24,7 +24,6 @@ namespace DB template class IDataTypeNumberVariable : public IDataTypeNumber { - typedef IDataType::WriteCallback WriteCallback; public: void serializeBinary(const Field & field, WriteBuffer & ostr) const { @@ -36,19 +35,17 @@ public: readVarT(static_cast(boost::get(field)), istr); } - void serializeBinary(const IColumn & column, WriteBuffer & ostr, WriteCallback callback = WriteCallback()) const + void serializeBinary(const IColumn & column, WriteBuffer & ostr, size_t offset = 0, size_t limit = 0) const { const typename ColumnType::Container_t & x = dynamic_cast(column).getData(); size_t size = x.size(); + + size_t end = limit && offset + limit < size + ? offset + limit + : size; - size_t next_callback_point = callback ? callback() : 0; - for (size_t i = 0; i < size; ++i) - { - if (next_callback_point && i == next_callback_point) - next_callback_point = callback(); - + for (size_t i = offset; i < end; ++i) writeVarT(x[i], ostr); - } } void deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit) const diff --git a/dbms/src/DataTypes/DataTypeAggregateFunction.cpp b/dbms/src/DataTypes/DataTypeAggregateFunction.cpp index 84093272a4c..341c7f713b3 100644 --- a/dbms/src/DataTypes/DataTypeAggregateFunction.cpp +++ b/dbms/src/DataTypes/DataTypeAggregateFunction.cpp @@ -26,20 +26,19 @@ void DataTypeAggregateFunction::deserializeBinary(Field & field, ReadBuffer & is field = value; } -void DataTypeAggregateFunction::serializeBinary(const IColumn & column, WriteBuffer & ostr, WriteCallback callback) const +void DataTypeAggregateFunction::serializeBinary(const IColumn & column, WriteBuffer & ostr, size_t offset, size_t limit) const { const ColumnAggregateFunction & real_column = dynamic_cast(column); const ColumnAggregateFunction::Container_t & vec = real_column.getData(); - size_t next_callback_point = callback ? callback() : 0; - size_t i = 0; - for (ColumnAggregateFunction::Container_t::const_iterator it = vec.begin(); it != vec.end(); ++it, ++i) - { - if (next_callback_point && i == next_callback_point) - next_callback_point = callback(); - + ColumnAggregateFunction::Container_t::const_iterator it = vec.begin() + offset; + ColumnAggregateFunction::Container_t::const_iterator end = limit ? it + limit : vec.end(); + + if (end > vec.end()) + end = vec.end(); + + for (; it != end; ++it) (*it)->serialize(ostr); - } } void DataTypeAggregateFunction::deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit) const diff --git a/dbms/src/DataTypes/DataTypeArray.cpp b/dbms/src/DataTypes/DataTypeArray.cpp index 0bf112336f1..ada91a35a59 100644 --- a/dbms/src/DataTypes/DataTypeArray.cpp +++ b/dbms/src/DataTypes/DataTypeArray.cpp @@ -32,28 +32,17 @@ void DataTypeArray::deserializeBinary(Field & field, ReadBuffer & istr) const } -static size_t adjustedWriteCallback(IDataType::WriteCallback & original_callback, const ColumnArray::Offsets_t & offsets) -{ - size_t original_res = original_callback(); - - if (unlikely(original_res == 0)) - return 0; - else if (unlikely(original_res >= offsets.size())) - return offsets.back(); - else - return offsets[original_res - 1]; -} - - -void DataTypeArray::serializeBinary(const IColumn & column, WriteBuffer & ostr, WriteCallback callback) const +void DataTypeArray::serializeBinary(const IColumn & column, WriteBuffer & ostr, size_t offset, size_t limit) const { const ColumnArray & column_array = dynamic_cast(column); const ColumnArray::Offsets_t & offsets = column_array.getOffsets(); - nested->serializeBinary(column_array.getData(), ostr, - callback - ? boost::bind(adjustedWriteCallback, boost::ref(callback), boost::cref(offsets)) - : WriteCallback()); + size_t nested_offset = offset ? offsets[offset] : 0; + size_t nested_limit = limit && offset + limit < offsets.size() + ? offsets[offset + limit] - offset + : 0; + + nested->serializeBinary(column_array.getData(), ostr, nested_offset, nested_limit); } @@ -71,7 +60,7 @@ void DataTypeArray::deserializeBinary(IColumn & column, ReadBuffer & istr, size_ } -void DataTypeArray::serializeOffsets(const IColumn & column, WriteBuffer & ostr, WriteCallback callback) const +void DataTypeArray::serializeOffsets(const IColumn & column, WriteBuffer & ostr, size_t offset, size_t limit) const { const ColumnArray & column_array = dynamic_cast(column); const ColumnArray::Offsets_t & offsets = column_array.getOffsets(); @@ -80,16 +69,12 @@ void DataTypeArray::serializeOffsets(const IColumn & column, WriteBuffer & ostr, if (!size) return; - size_t next_callback_point = callback ? callback() : 0; - - writeIntBinary(offsets[0], ostr); - for (size_t i = 1; i < size; ++i) - { - if (next_callback_point && i == next_callback_point) - next_callback_point = callback(); + size_t end = limit && offset + limit < size + ? offset + limit + : size; + for (size_t i = offset; i < end; ++i) writeIntBinary(offsets[i] - offsets[i - 1], ostr); - } } diff --git a/dbms/src/DataTypes/DataTypeFixedString.cpp b/dbms/src/DataTypes/DataTypeFixedString.cpp index d4981e18df6..369ce950c34 100644 --- a/dbms/src/DataTypes/DataTypeFixedString.cpp +++ b/dbms/src/DataTypes/DataTypeFixedString.cpp @@ -38,26 +38,17 @@ void DataTypeFixedString::deserializeBinary(Field & field, ReadBuffer & istr) co } -void DataTypeFixedString::serializeBinary(const IColumn & column, WriteBuffer & ostr, WriteCallback callback) const +void DataTypeFixedString::serializeBinary(const IColumn & column, WriteBuffer & ostr, size_t offset, size_t limit) const { const ColumnFixedArray & column_array = dynamic_cast(column); const ColumnUInt8::Container_t & data = dynamic_cast(column_array.getData()).getData(); - size_t prev_callback_point = 0; - size_t next_callback_point = 0; size_t size = data.size() / n; - while (next_callback_point < size) - { - next_callback_point = callback ? callback() : size; - if (next_callback_point > size) - next_callback_point = size; + if (limit == 0 || offset + limit > size) + limit = size - offset; - ostr.write(reinterpret_cast(&data[prev_callback_point * n]), - n * (next_callback_point - prev_callback_point)); - - prev_callback_point = next_callback_point; - } + ostr.write(reinterpret_cast(&data[offset]), n * limit); } diff --git a/dbms/src/DataTypes/DataTypeString.cpp b/dbms/src/DataTypes/DataTypeString.cpp index 05acdd7d563..f87859dd12d 100644 --- a/dbms/src/DataTypes/DataTypeString.cpp +++ b/dbms/src/DataTypes/DataTypeString.cpp @@ -40,7 +40,7 @@ void DataTypeString::deserializeBinary(Field & field, ReadBuffer & istr) const } -void DataTypeString::serializeBinary(const IColumn & column, WriteBuffer & ostr, WriteCallback callback) const +void DataTypeString::serializeBinary(const IColumn & column, WriteBuffer & ostr, size_t offset, size_t limit) const { const ColumnArray & column_array = dynamic_cast(column); const ColumnUInt8::Container_t & data = dynamic_cast(column_array.getData()).getData(); @@ -50,16 +50,12 @@ void DataTypeString::serializeBinary(const IColumn & column, WriteBuffer & ostr, if (!size) return; - size_t next_callback_point = callback ? callback() : 0; - - writeVarUInt(offsets[0] - 1, ostr); - ostr.write(reinterpret_cast(&data[0]), offsets[0] - 1); + size_t end = limit && offset + limit < size + ? offset + limit + : size; - for (size_t i = 1; i < size; ++i) + for (size_t i = offset; i < end; ++i) { - if (next_callback_point && i == next_callback_point) - next_callback_point = callback(); - UInt64 str_size = offsets[i] - offsets[i - 1] - 1; writeVarUInt(str_size, ostr); ostr.write(reinterpret_cast(&data[offsets[i - 1]]), str_size); diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index e6f1edbacf8..480df09d38f 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -221,6 +221,7 @@ private: void writeData(const String & path, const String & name, const IDataType & type, const IColumn & column, size_t level = 0) { String escaped_column_name = escapeForFileName(name); + size_t size = column.size(); /// Для массивов требуется сначала сериализовать размеры, а потом значения. if (const DataTypeArray * type_arr = dynamic_cast(&type)) @@ -232,9 +233,15 @@ private: CompressedWriteBuffer compressed(plain); size_t prev_mark = 0; - type_arr->serializeOffsets(column, compressed, - boost::bind(&MergeTreeBlockOutputStream::writeCallback, this, - boost::ref(prev_mark), boost::ref(plain), boost::ref(compressed), boost::ref(marks))); + while (prev_mark < size) + { + /// Каждая засечка - это: (смещение в файле до начала сжатого блока, смещение внутри блока) + writeIntBinary(plain.count(), marks); + writeIntBinary(compressed.offset(), marks); + + type_arr->serializeOffsets(column, compressed, prev_mark, storage.index_granularity); + prev_mark += storage.index_granularity; + } } { @@ -242,27 +249,18 @@ private: WriteBufferFromFile marks(path + escaped_column_name + ".mrk", 4096, flags); CompressedWriteBuffer compressed(plain); + // TODO Для массивов здесь баг - засечки сериализуются неправильно. size_t prev_mark = 0; - type.serializeBinary(column, compressed, - boost::bind(&MergeTreeBlockOutputStream::writeCallback, this, - boost::ref(prev_mark), boost::ref(plain), boost::ref(compressed), boost::ref(marks))); + while (prev_mark < size) + { + writeIntBinary(plain.count(), marks); + writeIntBinary(compressed.offset(), marks); + + type.serializeBinary(column, compressed, prev_mark, storage.index_granularity); + prev_mark += storage.index_granularity; + } } } - - /// Вызывается каждые index_granularity строк и пишет в файл с засечками (.mrk). - size_t writeCallback(size_t & prev_mark, - WriteBufferFromFile & plain, - CompressedWriteBuffer & compressed, - WriteBufferFromFile & marks) - { - /// Каждая засечка - это: (смещение в файле до начала сжатого блока, смещение внутри блока) - - writeIntBinary(plain.count(), marks); - writeIntBinary(compressed.offset(), marks); - - prev_mark += storage.index_granularity; - return prev_mark; - } }; @@ -404,6 +402,8 @@ private: /// Записать данные одного столбца. void writeData(const String & name, const IDataType & type, const IColumn & column, size_t level = 0) { + size_t size = column.size(); + /// Для массивов требуется сначала сериализовать размеры, а потом значения. if (const DataTypeArray * type_arr = dynamic_cast(&type)) { @@ -412,43 +412,53 @@ private: ColumnStream & stream = *column_streams[size_name]; size_t prev_mark = 0; - type_arr->serializeOffsets(column, stream.compressed, - boost::bind(&MergedBlockOutputStream::writeCallback, this, - boost::ref(prev_mark), boost::ref(stream.plain), boost::ref(stream.compressed), boost::ref(stream.marks))); + while (prev_mark < size) + { + size_t limit = 0; + + /// Если есть index_offset, то первая засечка идёт не сразу, а после этого количества строк. + if (prev_mark == 0 && index_offset != 0) + { + limit = index_offset; + } + else + { + limit = storage.index_granularity; + writeIntBinary(stream.plain.count(), stream.marks); + writeIntBinary(stream.compressed.offset(), stream.marks); + } + + type_arr->serializeOffsets(column, stream.compressed, prev_mark, limit); + prev_mark += limit; + } } { ColumnStream & stream = *column_streams[name]; + // TODO Для массивов здесь баг - засечки сериализуются неправильно. size_t prev_mark = 0; - type.serializeBinary(column, stream.compressed, - boost::bind(&MergedBlockOutputStream::writeCallback, this, - boost::ref(prev_mark), boost::ref(stream.plain), boost::ref(stream.compressed), boost::ref(stream.marks))); + while (prev_mark < size) + { + size_t limit = 0; + + /// Если есть index_offset, то первая засечка идёт не сразу, а после этого количества строк. + if (prev_mark == 0 && index_offset != 0) + { + limit = index_offset; + } + else + { + limit = storage.index_granularity; + writeIntBinary(stream.plain.count(), stream.marks); + writeIntBinary(stream.compressed.offset(), stream.marks); + } + + type.serializeBinary(column, stream.compressed, prev_mark, limit); + prev_mark += limit; + } } } - - - /// Вызывается каждые index_granularity строк и пишет в файл с засечками (.mrk). - size_t writeCallback(size_t & prev_mark, - WriteBufferFromFile & plain, - CompressedWriteBuffer & compressed, - WriteBufferFromFile & marks) - { - /// Если есть index_offset, то первая засечка идёт не сразу, а после этого количества строк. - if (prev_mark == 0 && index_offset != 0) - { - prev_mark = index_offset; - return prev_mark; - } - - /// Каждая засечка - это: (смещение в файле до начала сжатого блока, смещение внутри блока) - - writeIntBinary(plain.count(), marks); - writeIntBinary(compressed.offset(), marks); - - prev_mark += storage.index_granularity; - return prev_mark; - } }; typedef Poco::SharedPtr MergedBlockOutputStreamPtr;