From 670968af973d80eb3670cc6f89b1f30bdc3f2d96 Mon Sep 17 00:00:00 2001 From: Vitaly Baranov Date: Tue, 19 Feb 2019 23:01:31 +0300 Subject: [PATCH] Added function deserializeProtobuf() to each data type. --- .../DataTypes/DataTypeAggregateFunction.cpp | 31 +++++++++ .../src/DataTypes/DataTypeAggregateFunction.h | 1 + dbms/src/DataTypes/DataTypeArray.cpp | 31 +++++++++ dbms/src/DataTypes/DataTypeArray.h | 4 ++ dbms/src/DataTypes/DataTypeDate.cpp | 18 +++++ dbms/src/DataTypes/DataTypeDate.h | 1 + dbms/src/DataTypes/DataTypeDateTime.cpp | 18 +++++ dbms/src/DataTypes/DataTypeDateTime.h | 1 + dbms/src/DataTypes/DataTypeEnum.cpp | 20 ++++++ dbms/src/DataTypes/DataTypeEnum.h | 1 + dbms/src/DataTypes/DataTypeFixedString.cpp | 66 ++++++++++++++++--- dbms/src/DataTypes/DataTypeFixedString.h | 1 + dbms/src/DataTypes/DataTypeLowCardinality.cpp | 36 +++++++--- dbms/src/DataTypes/DataTypeLowCardinality.h | 20 +++--- dbms/src/DataTypes/DataTypeNullable.cpp | 20 ++++++ dbms/src/DataTypes/DataTypeNullable.h | 1 + dbms/src/DataTypes/DataTypeNumberBase.cpp | 20 ++++++ dbms/src/DataTypes/DataTypeNumberBase.h | 1 + dbms/src/DataTypes/DataTypeString.cpp | 45 ++++++++++++- dbms/src/DataTypes/DataTypeString.h | 1 + dbms/src/DataTypes/DataTypeTuple.cpp | 16 +++++ dbms/src/DataTypes/DataTypeTuple.h | 1 + dbms/src/DataTypes/DataTypeUUID.cpp | 17 +++++ dbms/src/DataTypes/DataTypeUUID.h | 1 + dbms/src/DataTypes/DataTypesDecimal.cpp | 20 ++++++ dbms/src/DataTypes/DataTypesDecimal.h | 1 + dbms/src/DataTypes/IDataType.h | 2 + dbms/src/DataTypes/IDataTypeDummy.h | 6 +- 28 files changed, 366 insertions(+), 35 deletions(-) diff --git a/dbms/src/DataTypes/DataTypeAggregateFunction.cpp b/dbms/src/DataTypes/DataTypeAggregateFunction.cpp index 38f58268dbd..c3277bdd90a 100644 --- a/dbms/src/DataTypes/DataTypeAggregateFunction.cpp +++ b/dbms/src/DataTypes/DataTypeAggregateFunction.cpp @@ -9,6 +9,7 @@ #include #include +#include #include #include #include @@ -254,6 +255,36 @@ void DataTypeAggregateFunction::serializeProtobuf(const IColumn & column, size_t protobuf.writeAggregateFunction(function, static_cast(column).getData()[row_num]); } +void DataTypeAggregateFunction::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const +{ + row_added = false; + ColumnAggregateFunction & column_concrete = static_cast(column); + Arena & arena = column_concrete.createOrGetArena(); + size_t size_of_state = function->sizeOfData(); + AggregateDataPtr place = arena.alignedAlloc(size_of_state, function->alignOfData()); + function->create(place); + try + { + if (!protobuf.readAggregateFunction(function, place, arena)) + { + function->destroy(place); + return; + } + auto & container = column_concrete.getData(); + if (allow_add_row) + { + container.emplace_back(place); + row_added = true; + } + else + container.back() = place; + } + catch (...) + { + function->destroy(place); + throw; + } +} MutableColumnPtr DataTypeAggregateFunction::createColumn() const { diff --git a/dbms/src/DataTypes/DataTypeAggregateFunction.h b/dbms/src/DataTypes/DataTypeAggregateFunction.h index 1bd5269ffe0..d156e982681 100644 --- a/dbms/src/DataTypes/DataTypeAggregateFunction.h +++ b/dbms/src/DataTypes/DataTypeAggregateFunction.h @@ -57,6 +57,7 @@ public: void serializeTextCSV(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override; void deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const override; void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const override; + void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override; MutableColumnPtr createColumn() const override; diff --git a/dbms/src/DataTypes/DataTypeArray.cpp b/dbms/src/DataTypes/DataTypeArray.cpp index 54d2668ad2b..580ec888197 100644 --- a/dbms/src/DataTypes/DataTypeArray.cpp +++ b/dbms/src/DataTypes/DataTypeArray.cpp @@ -6,6 +6,7 @@ #include #include +#include #include #include #include @@ -442,6 +443,36 @@ void DataTypeArray::serializeProtobuf(const IColumn & column, size_t row_num, Pr } +void DataTypeArray::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const +{ + row_added = false; + ColumnArray & column_array = static_cast(column); + IColumn & nested_column = column_array.getData(); + ColumnArray::Offsets & offsets = column_array.getOffsets(); + size_t old_size = offsets.size(); + try + { + bool nested_row_added; + do + nested->deserializeProtobuf(nested_column, protobuf, true, nested_row_added); + while (nested_row_added && protobuf.maybeCanReadValue()); + if (allow_add_row) + { + offsets.emplace_back(nested_column.size()); + row_added = true; + } + else + offsets.back() = nested_column.size(); + } + catch (...) + { + offsets.resize_assume_reserved(old_size); + nested_column.popBack(nested_column.size() - offsets.back()); + throw; + } +} + + MutableColumnPtr DataTypeArray::createColumn() const { return ColumnArray::create(nested->createColumn(), ColumnArray::ColumnOffsets::create()); diff --git a/dbms/src/DataTypes/DataTypeArray.h b/dbms/src/DataTypes/DataTypeArray.h index 802f5922342..0b985a63ad1 100644 --- a/dbms/src/DataTypes/DataTypeArray.h +++ b/dbms/src/DataTypes/DataTypeArray.h @@ -87,6 +87,10 @@ public: void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const override; + void deserializeProtobuf(IColumn & column, + ProtobufReader & protobuf, + bool allow_add_row, + bool & row_added) const override; MutableColumnPtr createColumn() const override; diff --git a/dbms/src/DataTypes/DataTypeDate.cpp b/dbms/src/DataTypes/DataTypeDate.cpp index d5556cb3378..df69473c7b4 100644 --- a/dbms/src/DataTypes/DataTypeDate.cpp +++ b/dbms/src/DataTypes/DataTypeDate.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #include @@ -78,6 +79,23 @@ void DataTypeDate::serializeProtobuf(const IColumn & column, size_t row_num, Pro protobuf.writeDate(DayNum(static_cast(column).getData()[row_num])); } +void DataTypeDate::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const +{ + row_added = false; + DayNum d; + if (!protobuf.readDate(d)) + return; + + auto & container = static_cast(column).getData(); + if (allow_add_row) + { + container.emplace_back(d); + row_added = true; + } + else + container.back() = d; +} + bool DataTypeDate::equals(const IDataType & rhs) const { return typeid(rhs) == typeid(*this); diff --git a/dbms/src/DataTypes/DataTypeDate.h b/dbms/src/DataTypes/DataTypeDate.h index 9bc56cc3762..43118467d90 100644 --- a/dbms/src/DataTypes/DataTypeDate.h +++ b/dbms/src/DataTypes/DataTypeDate.h @@ -22,6 +22,7 @@ public: void serializeTextCSV(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override; void deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const override; void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const override; + void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override; bool canBeUsedAsVersion() const override { return true; } bool canBeInsideNullable() const override { return true; } diff --git a/dbms/src/DataTypes/DataTypeDateTime.cpp b/dbms/src/DataTypes/DataTypeDateTime.cpp index 5081b68e9f0..ad328c49a6c 100644 --- a/dbms/src/DataTypes/DataTypeDateTime.cpp +++ b/dbms/src/DataTypes/DataTypeDateTime.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -144,6 +145,23 @@ void DataTypeDateTime::serializeProtobuf(const IColumn & column, size_t row_num, protobuf.writeDateTime(static_cast(column).getData()[row_num]); } +void DataTypeDateTime::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const +{ + row_added = false; + time_t t; + if (!protobuf.readDateTime(t)) + return; + + auto & container = static_cast(column).getData(); + if (allow_add_row) + { + container.emplace_back(t); + row_added = true; + } + else + container.back() = t; +} + bool DataTypeDateTime::equals(const IDataType & rhs) const { /// DateTime with different timezones are equal, because: diff --git a/dbms/src/DataTypes/DataTypeDateTime.h b/dbms/src/DataTypes/DataTypeDateTime.h index eba05814550..b13767d8bf8 100644 --- a/dbms/src/DataTypes/DataTypeDateTime.h +++ b/dbms/src/DataTypes/DataTypeDateTime.h @@ -47,6 +47,7 @@ public: void serializeTextCSV(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override; void deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const override; void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const override; + void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override; bool canBeUsedAsVersion() const override { return true; } bool canBeInsideNullable() const override { return true; } diff --git a/dbms/src/DataTypes/DataTypeEnum.cpp b/dbms/src/DataTypes/DataTypeEnum.cpp index f6566ad9040..f655e679883 100644 --- a/dbms/src/DataTypes/DataTypeEnum.cpp +++ b/dbms/src/DataTypes/DataTypeEnum.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include #include @@ -230,6 +231,25 @@ void DataTypeEnum::serializeProtobuf(const IColumn & column, size_t row_nu protobuf.writeEnum(static_cast(column).getData()[row_num]); } +template +void DataTypeEnum::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const +{ + protobuf.prepareEnumMapping(values); + row_added = false; + Type value; + if (!protobuf.readEnum(value)) + return; + + auto & container = static_cast(column).getData(); + if (allow_add_row) + { + container.emplace_back(value); + row_added = true; + } + else + container.back() = value; +} + template Field DataTypeEnum::getDefault() const { diff --git a/dbms/src/DataTypes/DataTypeEnum.h b/dbms/src/DataTypes/DataTypeEnum.h index 61104e70505..a1dae0130b0 100644 --- a/dbms/src/DataTypes/DataTypeEnum.h +++ b/dbms/src/DataTypes/DataTypeEnum.h @@ -106,6 +106,7 @@ public: void deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, const size_t limit, const double avg_value_size_hint) const override; void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const override; + void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override; MutableColumnPtr createColumn() const override { return ColumnType::create(); } diff --git a/dbms/src/DataTypes/DataTypeFixedString.cpp b/dbms/src/DataTypes/DataTypeFixedString.cpp index 17cb82b7c3d..fdc4d52c6fb 100644 --- a/dbms/src/DataTypes/DataTypeFixedString.cpp +++ b/dbms/src/DataTypes/DataTypeFixedString.cpp @@ -3,6 +3,7 @@ #include #include +#include #include #include #include @@ -122,30 +123,37 @@ void DataTypeFixedString::serializeTextEscaped(const IColumn & column, size_t ro } +static inline void alignStringLength(const DataTypeFixedString & type, + ColumnFixedString::Chars & data, + size_t string_start) +{ + size_t length = data.size() - string_start; + if (length < type.getN()) + { + data.resize_fill(string_start + type.getN()); + } + else if (length > type.getN()) + { + data.resize_assume_reserved(string_start); + throw Exception("Too large value for " + type.getName(), ErrorCodes::TOO_LARGE_STRING_SIZE); + } +} + template static inline void read(const DataTypeFixedString & self, IColumn & column, Reader && reader) { ColumnFixedString::Chars & data = typeid_cast(column).getChars(); size_t prev_size = data.size(); - try { reader(data); + alignStringLength(self, data, prev_size); } catch (...) { data.resize_assume_reserved(prev_size); throw; } - - if (data.size() < prev_size + self.getN()) - data.resize_fill(prev_size + self.getN()); - - if (data.size() > prev_size + self.getN()) - { - data.resize_assume_reserved(prev_size); - throw Exception("Too large value for " + self.getName(), ErrorCodes::TOO_LARGE_STRING_SIZE); - } } @@ -208,6 +216,44 @@ void DataTypeFixedString::serializeProtobuf(const IColumn & column, size_t row_n } +void DataTypeFixedString::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const +{ + row_added = false; + auto & column_string = static_cast(column); + ColumnFixedString::Chars & data = column_string.getChars(); + size_t old_size = data.size(); + try + { + if (allow_add_row) + { + if (protobuf.readStringInto(data)) + { + alignStringLength(*this, data, old_size); + row_added = true; + } + else + data.resize_assume_reserved(old_size); + } + else + { + ColumnFixedString::Chars temp_data; + if (protobuf.readStringInto(temp_data)) + { + alignStringLength(*this, temp_data, 0); + column_string.popBack(1); + old_size = data.size(); + data.insertSmallAllowReadWriteOverflow15(temp_data.begin(), temp_data.end()); + } + } + } + catch (...) + { + data.resize_assume_reserved(old_size); + throw; + } +} + + MutableColumnPtr DataTypeFixedString::createColumn() const { return ColumnFixedString::create(n); diff --git a/dbms/src/DataTypes/DataTypeFixedString.h b/dbms/src/DataTypes/DataTypeFixedString.h index f929e09a7ee..75823342320 100644 --- a/dbms/src/DataTypes/DataTypeFixedString.h +++ b/dbms/src/DataTypes/DataTypeFixedString.h @@ -65,6 +65,7 @@ public: void deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings &) const override; void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const override; + void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override; MutableColumnPtr createColumn() const override; diff --git a/dbms/src/DataTypes/DataTypeLowCardinality.cpp b/dbms/src/DataTypes/DataTypeLowCardinality.cpp index 8d6c457646d..105e989f69a 100644 --- a/dbms/src/DataTypes/DataTypeLowCardinality.cpp +++ b/dbms/src/DataTypes/DataTypeLowCardinality.cpp @@ -729,27 +729,43 @@ void DataTypeLowCardinality::deserializeBinary(Field & field, ReadBuffer & istr) dictionary_type->deserializeBinary(field, istr); } -template +void DataTypeLowCardinality::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const +{ + if (allow_add_row) + { + deserializeImpl(column, &IDataType::deserializeProtobuf, protobuf, true, row_added); + return; + } + + row_added = false; + auto & low_cardinality_column= getColumnLowCardinality(column); + auto nested_column = low_cardinality_column.getDictionary().getNestedColumn(); + auto temp_column = nested_column->cloneEmpty(); + size_t unique_row_number = low_cardinality_column.getIndexes().getUInt(low_cardinality_column.size() - 1); + temp_column->insertFrom(*nested_column, unique_row_number); + bool dummy; + dictionary_type.get()->deserializeProtobuf(*temp_column, protobuf, false, dummy); + low_cardinality_column.popBack(1); + low_cardinality_column.insertFromFullColumn(*temp_column, 0); +} + +template void DataTypeLowCardinality::serializeImpl( - const IColumn & column, size_t row_num, - DataTypeLowCardinality::SerializeFunctionPtr func, - OutputStream & ostr, Args & ... args) const + const IColumn & column, size_t row_num, DataTypeLowCardinality::SerializeFunctionPtr func, Args &&... args) const { auto & low_cardinality_column = getColumnLowCardinality(column); size_t unique_row_number = low_cardinality_column.getIndexes().getUInt(row_num); - (dictionary_type.get()->*func)(*low_cardinality_column.getDictionary().getNestedColumn(), unique_row_number, ostr, std::forward(args)...); + (dictionary_type.get()->*func)(*low_cardinality_column.getDictionary().getNestedColumn(), unique_row_number, std::forward(args)...); } -template +template void DataTypeLowCardinality::deserializeImpl( - IColumn & column, - DataTypeLowCardinality::DeserializeFunctionPtr func, - ReadBuffer & istr, Args & ... args) const + IColumn & column, DataTypeLowCardinality::DeserializeFunctionPtr func, Args &&... args) const { auto & low_cardinality_column= getColumnLowCardinality(column); auto temp_column = low_cardinality_column.getDictionary().getNestedColumn()->cloneEmpty(); - (dictionary_type.get()->*func)(*temp_column, istr, std::forward(args)...); + (dictionary_type.get()->*func)(*temp_column, std::forward(args)...); low_cardinality_column.insertFromFullColumn(*temp_column, 0); } diff --git a/dbms/src/DataTypes/DataTypeLowCardinality.h b/dbms/src/DataTypes/DataTypeLowCardinality.h index 8ba2a4d1c4d..cab13d5ec69 100644 --- a/dbms/src/DataTypes/DataTypeLowCardinality.h +++ b/dbms/src/DataTypes/DataTypeLowCardinality.h @@ -115,6 +115,8 @@ public: serializeImpl(column, row_num, &IDataType::serializeProtobuf, protobuf); } + void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override; + MutableColumnPtr createColumn() const override; Field getDefault() const override { return dictionary_type->getDefault(); } @@ -148,19 +150,17 @@ public: private: - template - using SerializeFunctionPtr = void (IDataType::*)(const IColumn &, size_t, OutputStream &, Args & ...) const; + template + using SerializeFunctionPtr = void (IDataType::*)(const IColumn &, size_t, Params ...) const; - template - void serializeImpl(const IColumn & column, size_t row_num, SerializeFunctionPtr func, - OutputStream & ostr, Args & ... args) const; + template + void serializeImpl(const IColumn & column, size_t row_num, SerializeFunctionPtr func, Args &&... args) const; - template - using DeserializeFunctionPtr = void (IDataType::*)(IColumn &, ReadBuffer &, Args & ...) const; + template + using DeserializeFunctionPtr = void (IDataType::*)(IColumn &, Params ...) const; - template - void deserializeImpl(IColumn & column, DeserializeFunctionPtr func, - ReadBuffer & istr, Args & ... args) const; + template + void deserializeImpl(IColumn & column, DeserializeFunctionPtr func, Args &&... args) const; template static MutableColumnUniquePtr createColumnUniqueImpl(const IDataType & keys_type, const Creator & creator); diff --git a/dbms/src/DataTypes/DataTypeNullable.cpp b/dbms/src/DataTypes/DataTypeNullable.cpp index 3cf798724ac..d12ec1ffef4 100644 --- a/dbms/src/DataTypes/DataTypeNullable.cpp +++ b/dbms/src/DataTypes/DataTypeNullable.cpp @@ -318,6 +318,26 @@ void DataTypeNullable::serializeProtobuf(const IColumn & column, size_t row_num, nested_data_type->serializeProtobuf(col.getNestedColumn(), row_num, protobuf); } +void DataTypeNullable::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const +{ + ColumnNullable & col = static_cast(column); + IColumn & nested_column = col.getNestedColumn(); + size_t old_size = nested_column.size(); + try + { + nested_data_type->deserializeProtobuf(nested_column, protobuf, allow_add_row, row_added); + if (row_added) + col.getNullMapData().push_back(0); + } + catch (...) + { + nested_column.popBack(nested_column.size() - old_size); + col.getNullMapData().resize_assume_reserved(old_size); + row_added = false; + throw; + } +} + MutableColumnPtr DataTypeNullable::createColumn() const { return ColumnNullable::create(nested_data_type->createColumn(), ColumnUInt8::create()); diff --git a/dbms/src/DataTypes/DataTypeNullable.h b/dbms/src/DataTypes/DataTypeNullable.h index 20a6b195c84..f83aa3a71cf 100644 --- a/dbms/src/DataTypes/DataTypeNullable.h +++ b/dbms/src/DataTypes/DataTypeNullable.h @@ -71,6 +71,7 @@ public: void serializeTextXML(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override; void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const override; + void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override; MutableColumnPtr createColumn() const override; diff --git a/dbms/src/DataTypes/DataTypeNumberBase.cpp b/dbms/src/DataTypes/DataTypeNumberBase.cpp index b93ef034c67..0245788b987 100644 --- a/dbms/src/DataTypes/DataTypeNumberBase.cpp +++ b/dbms/src/DataTypes/DataTypeNumberBase.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include @@ -210,6 +211,25 @@ void DataTypeNumberBase::serializeProtobuf(const IColumn & column, size_t row } +template +void DataTypeNumberBase::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const +{ + row_added = false; + T value; + if (!protobuf.readNumber(value)) + return; + + auto & container = typeid_cast &>(column).getData(); + if (allow_add_row) + { + container.emplace_back(value); + row_added = true; + } + else + container.back() = value; +} + + template MutableColumnPtr DataTypeNumberBase::createColumn() const { diff --git a/dbms/src/DataTypes/DataTypeNumberBase.h b/dbms/src/DataTypes/DataTypeNumberBase.h index 2728d32a6a9..d88cd221625 100644 --- a/dbms/src/DataTypes/DataTypeNumberBase.h +++ b/dbms/src/DataTypes/DataTypeNumberBase.h @@ -37,6 +37,7 @@ public: void deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint) const override; void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const override; + void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override; MutableColumnPtr createColumn() const override; diff --git a/dbms/src/DataTypes/DataTypeString.cpp b/dbms/src/DataTypes/DataTypeString.cpp index 34077e4bbe7..11649e845d5 100644 --- a/dbms/src/DataTypes/DataTypeString.cpp +++ b/dbms/src/DataTypes/DataTypeString.cpp @@ -7,6 +7,7 @@ #include #include +#include #include #include #include @@ -237,10 +238,8 @@ static inline void read(IColumn & column, Reader && reader) ColumnString & column_string = static_cast(column); ColumnString::Chars & data = column_string.getChars(); ColumnString::Offsets & offsets = column_string.getOffsets(); - size_t old_chars_size = data.size(); size_t old_offsets_size = offsets.size(); - try { reader(data); @@ -310,6 +309,48 @@ void DataTypeString::serializeProtobuf(const IColumn & column, size_t row_num, P } +void DataTypeString::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const +{ + row_added = false; + auto & column_string = static_cast(column); + ColumnString::Chars & data = column_string.getChars(); + ColumnString::Offsets & offsets = column_string.getOffsets(); + size_t old_size = offsets.size(); + try + { + if (allow_add_row) + { + if (protobuf.readStringInto(data)) + { + data.emplace_back(0); + offsets.emplace_back(data.size()); + row_added = true; + } + else + data.resize_assume_reserved(offsets.back()); + } + else + { + ColumnString::Chars temp_data; + if (protobuf.readStringInto(temp_data)) + { + temp_data.emplace_back(0); + column_string.popBack(1); + old_size = offsets.size(); + data.insertSmallAllowReadWriteOverflow15(temp_data.begin(), temp_data.end()); + offsets.emplace_back(data.size()); + } + } + } + catch (...) + { + offsets.resize_assume_reserved(old_size); + data.resize_assume_reserved(offsets.back()); + throw; + } +} + + MutableColumnPtr DataTypeString::createColumn() const { return ColumnString::create(); diff --git a/dbms/src/DataTypes/DataTypeString.h b/dbms/src/DataTypes/DataTypeString.h index 202c8374c27..7f69f46f290 100644 --- a/dbms/src/DataTypes/DataTypeString.h +++ b/dbms/src/DataTypes/DataTypeString.h @@ -46,6 +46,7 @@ public: void deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings &) const override; void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const override; + void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override; MutableColumnPtr createColumn() const override; diff --git a/dbms/src/DataTypes/DataTypeTuple.cpp b/dbms/src/DataTypes/DataTypeTuple.cpp index 1940c44134f..8e7ce5f7c94 100644 --- a/dbms/src/DataTypes/DataTypeTuple.cpp +++ b/dbms/src/DataTypes/DataTypeTuple.cpp @@ -413,6 +413,22 @@ void DataTypeTuple::serializeProtobuf(const IColumn & column, size_t row_num, Pr elems[i]->serializeProtobuf(extractElementColumn(column, i), row_num, protobuf); } +void DataTypeTuple::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const +{ + row_added = false; + bool all_elements_get_row = true; + addElementSafe(elems, column, [&] + { + for (const auto & i : ext::range(0, ext::size(elems))) + { + bool element_row_added; + elems[i]->deserializeProtobuf(extractElementColumn(column, i), protobuf, allow_add_row, element_row_added); + all_elements_get_row &= element_row_added; + } + }); + row_added = all_elements_get_row; +} + MutableColumnPtr DataTypeTuple::createColumn() const { size_t size = elems.size(); diff --git a/dbms/src/DataTypes/DataTypeTuple.h b/dbms/src/DataTypes/DataTypeTuple.h index d489ae4ba7f..5bef9f00566 100644 --- a/dbms/src/DataTypes/DataTypeTuple.h +++ b/dbms/src/DataTypes/DataTypeTuple.h @@ -78,6 +78,7 @@ public: DeserializeBinaryBulkStatePtr & state) const override; void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const override; + void deserializeProtobuf(IColumn & column, ProtobufReader & reader, bool allow_add_row, bool & row_added) const override; MutableColumnPtr createColumn() const override; diff --git a/dbms/src/DataTypes/DataTypeUUID.cpp b/dbms/src/DataTypes/DataTypeUUID.cpp index 9f913b5bf80..9243b80721a 100644 --- a/dbms/src/DataTypes/DataTypeUUID.cpp +++ b/dbms/src/DataTypes/DataTypeUUID.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include #include @@ -77,6 +78,22 @@ void DataTypeUUID::serializeProtobuf(const IColumn & column, size_t row_num, Pro protobuf.writeUUID(UUID(static_cast(column).getData()[row_num])); } +void DataTypeUUID::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const +{ + row_added = false; + UUID uuid; + if (!protobuf.readUUID(uuid)) + return; + + auto & container = static_cast(column).getData(); + if (allow_add_row) + { + container.emplace_back(uuid); + row_added = true; + } + else + container.back() = uuid; +} bool DataTypeUUID::equals(const IDataType & rhs) const { diff --git a/dbms/src/DataTypes/DataTypeUUID.h b/dbms/src/DataTypes/DataTypeUUID.h index 0a0ce6ad035..80b313aeb41 100644 --- a/dbms/src/DataTypes/DataTypeUUID.h +++ b/dbms/src/DataTypes/DataTypeUUID.h @@ -25,6 +25,7 @@ public: void serializeTextCSV(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override; void deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const override; void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const override; + void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override; bool canBeUsedInBitOperations() const override { return true; } bool canBeInsideNullable() const override { return true; } diff --git a/dbms/src/DataTypes/DataTypesDecimal.cpp b/dbms/src/DataTypes/DataTypesDecimal.cpp index b9ccb41af3d..f115118d67c 100644 --- a/dbms/src/DataTypes/DataTypesDecimal.cpp +++ b/dbms/src/DataTypes/DataTypesDecimal.cpp @@ -2,6 +2,7 @@ #include #include #include +#include #include #include #include @@ -141,6 +142,25 @@ void DataTypeDecimal::serializeProtobuf(const IColumn & column, size_t row_nu } +template +void DataTypeDecimal::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const +{ + row_added = false; + T decimal; + if (!protobuf.readDecimal(decimal, precision, scale)) + return; + + auto & container = static_cast(column).getData(); + if (allow_add_row) + { + container.emplace_back(decimal); + row_added = true; + } + else + container.back() = decimal; +} + + template Field DataTypeDecimal::getDefault() const { diff --git a/dbms/src/DataTypes/DataTypesDecimal.h b/dbms/src/DataTypes/DataTypesDecimal.h index 7159750c36b..9918e8c90fd 100644 --- a/dbms/src/DataTypes/DataTypesDecimal.h +++ b/dbms/src/DataTypes/DataTypesDecimal.h @@ -101,6 +101,7 @@ public: void deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint) const override; void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const override; + void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override; Field getDefault() const override; bool canBePromoted() const override { return true; } diff --git a/dbms/src/DataTypes/IDataType.h b/dbms/src/DataTypes/IDataType.h index 6bb59106d10..681f81a1b00 100644 --- a/dbms/src/DataTypes/IDataType.h +++ b/dbms/src/DataTypes/IDataType.h @@ -23,6 +23,7 @@ using MutableColumnPtr = COWPtr::MutablePtr; using DataTypePtr = std::shared_ptr; using DataTypes = std::vector; +class ProtobufReader; class ProtobufWriter; @@ -254,6 +255,7 @@ public: /** Serialize to a protobuf. */ virtual void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const = 0; + virtual void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const = 0; protected: virtual String doGetName() const; diff --git a/dbms/src/DataTypes/IDataTypeDummy.h b/dbms/src/DataTypes/IDataTypeDummy.h index bb122126577..1aa5135c53e 100644 --- a/dbms/src/DataTypes/IDataTypeDummy.h +++ b/dbms/src/DataTypes/IDataTypeDummy.h @@ -27,8 +27,9 @@ public: void serializeBinaryBulk(const IColumn &, WriteBuffer &, size_t, size_t) const override { throwNoSerialization(); } void deserializeBinaryBulk(IColumn &, ReadBuffer &, size_t, double) const override { throwNoSerialization(); } void serializeText(const IColumn &, size_t, WriteBuffer &, const FormatSettings &) const override { throwNoSerialization(); } - void deserializeText(IColumn &, ReadBuffer &, const FormatSettings &) const override { throwNoSerialization(); } - void serializeProtobuf(const IColumn &, size_t, ProtobufWriter &) const override { throwNoSerialization(); } + void deserializeText(IColumn &, ReadBuffer &, const FormatSettings &) const override { throwNoSerialization(); } + void serializeProtobuf(const IColumn &, size_t, ProtobufWriter &) const override { throwNoSerialization(); } + void deserializeProtobuf(IColumn &, ProtobufReader &, bool, bool &) const override { throwNoSerialization(); } MutableColumnPtr createColumn() const override { @@ -50,4 +51,3 @@ public: }; } -