mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 07:31:57 +00:00
Added function deserializeProtobuf() to each data type.
This commit is contained in:
parent
630012dfb7
commit
670968af97
@ -9,6 +9,7 @@
|
||||
#include <Common/AlignedBuffer.h>
|
||||
|
||||
#include <Formats/FormatSettings.h>
|
||||
#include <Formats/ProtobufReader.h>
|
||||
#include <Formats/ProtobufWriter.h>
|
||||
#include <DataTypes/DataTypeAggregateFunction.h>
|
||||
#include <DataTypes/DataTypeFactory.h>
|
||||
@ -254,6 +255,36 @@ void DataTypeAggregateFunction::serializeProtobuf(const IColumn & column, size_t
|
||||
protobuf.writeAggregateFunction(function, static_cast<const ColumnAggregateFunction &>(column).getData()[row_num]);
|
||||
}
|
||||
|
||||
void DataTypeAggregateFunction::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const
|
||||
{
|
||||
row_added = false;
|
||||
ColumnAggregateFunction & column_concrete = static_cast<ColumnAggregateFunction &>(column);
|
||||
Arena & arena = column_concrete.createOrGetArena();
|
||||
size_t size_of_state = function->sizeOfData();
|
||||
AggregateDataPtr place = arena.alignedAlloc(size_of_state, function->alignOfData());
|
||||
function->create(place);
|
||||
try
|
||||
{
|
||||
if (!protobuf.readAggregateFunction(function, place, arena))
|
||||
{
|
||||
function->destroy(place);
|
||||
return;
|
||||
}
|
||||
auto & container = column_concrete.getData();
|
||||
if (allow_add_row)
|
||||
{
|
||||
container.emplace_back(place);
|
||||
row_added = true;
|
||||
}
|
||||
else
|
||||
container.back() = place;
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
function->destroy(place);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
MutableColumnPtr DataTypeAggregateFunction::createColumn() const
|
||||
{
|
||||
|
@ -57,6 +57,7 @@ public:
|
||||
void serializeTextCSV(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
|
||||
void deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const override;
|
||||
void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const override;
|
||||
void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override;
|
||||
|
||||
MutableColumnPtr createColumn() const override;
|
||||
|
||||
|
@ -6,6 +6,7 @@
|
||||
#include <IO/WriteBufferFromString.h>
|
||||
|
||||
#include <Formats/FormatSettings.h>
|
||||
#include <Formats/ProtobufReader.h>
|
||||
#include <DataTypes/DataTypesNumber.h>
|
||||
#include <DataTypes/DataTypeArray.h>
|
||||
#include <DataTypes/DataTypeFactory.h>
|
||||
@ -442,6 +443,36 @@ void DataTypeArray::serializeProtobuf(const IColumn & column, size_t row_num, Pr
|
||||
}
|
||||
|
||||
|
||||
void DataTypeArray::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const
|
||||
{
|
||||
row_added = false;
|
||||
ColumnArray & column_array = static_cast<ColumnArray &>(column);
|
||||
IColumn & nested_column = column_array.getData();
|
||||
ColumnArray::Offsets & offsets = column_array.getOffsets();
|
||||
size_t old_size = offsets.size();
|
||||
try
|
||||
{
|
||||
bool nested_row_added;
|
||||
do
|
||||
nested->deserializeProtobuf(nested_column, protobuf, true, nested_row_added);
|
||||
while (nested_row_added && protobuf.maybeCanReadValue());
|
||||
if (allow_add_row)
|
||||
{
|
||||
offsets.emplace_back(nested_column.size());
|
||||
row_added = true;
|
||||
}
|
||||
else
|
||||
offsets.back() = nested_column.size();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
offsets.resize_assume_reserved(old_size);
|
||||
nested_column.popBack(nested_column.size() - offsets.back());
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
MutableColumnPtr DataTypeArray::createColumn() const
|
||||
{
|
||||
return ColumnArray::create(nested->createColumn(), ColumnArray::ColumnOffsets::create());
|
||||
|
@ -87,6 +87,10 @@ public:
|
||||
void serializeProtobuf(const IColumn & column,
|
||||
size_t row_num,
|
||||
ProtobufWriter & protobuf) const override;
|
||||
void deserializeProtobuf(IColumn & column,
|
||||
ProtobufReader & protobuf,
|
||||
bool allow_add_row,
|
||||
bool & row_added) const override;
|
||||
|
||||
MutableColumnPtr createColumn() const override;
|
||||
|
||||
|
@ -4,6 +4,7 @@
|
||||
#include <Columns/ColumnsNumber.h>
|
||||
#include <DataTypes/DataTypeDate.h>
|
||||
#include <DataTypes/DataTypeFactory.h>
|
||||
#include <Formats/ProtobufReader.h>
|
||||
#include <Formats/ProtobufWriter.h>
|
||||
|
||||
|
||||
@ -78,6 +79,23 @@ void DataTypeDate::serializeProtobuf(const IColumn & column, size_t row_num, Pro
|
||||
protobuf.writeDate(DayNum(static_cast<const ColumnUInt16 &>(column).getData()[row_num]));
|
||||
}
|
||||
|
||||
void DataTypeDate::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const
|
||||
{
|
||||
row_added = false;
|
||||
DayNum d;
|
||||
if (!protobuf.readDate(d))
|
||||
return;
|
||||
|
||||
auto & container = static_cast<ColumnUInt16 &>(column).getData();
|
||||
if (allow_add_row)
|
||||
{
|
||||
container.emplace_back(d);
|
||||
row_added = true;
|
||||
}
|
||||
else
|
||||
container.back() = d;
|
||||
}
|
||||
|
||||
bool DataTypeDate::equals(const IDataType & rhs) const
|
||||
{
|
||||
return typeid(rhs) == typeid(*this);
|
||||
|
@ -22,6 +22,7 @@ public:
|
||||
void serializeTextCSV(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
|
||||
void deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const override;
|
||||
void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const override;
|
||||
void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override;
|
||||
|
||||
bool canBeUsedAsVersion() const override { return true; }
|
||||
bool canBeInsideNullable() const override { return true; }
|
||||
|
@ -6,6 +6,7 @@
|
||||
#include <Common/typeid_cast.h>
|
||||
#include <Columns/ColumnsNumber.h>
|
||||
#include <Formats/FormatSettings.h>
|
||||
#include <Formats/ProtobufReader.h>
|
||||
#include <Formats/ProtobufWriter.h>
|
||||
#include <DataTypes/DataTypeDateTime.h>
|
||||
#include <DataTypes/DataTypeFactory.h>
|
||||
@ -144,6 +145,23 @@ void DataTypeDateTime::serializeProtobuf(const IColumn & column, size_t row_num,
|
||||
protobuf.writeDateTime(static_cast<const ColumnUInt32 &>(column).getData()[row_num]);
|
||||
}
|
||||
|
||||
void DataTypeDateTime::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const
|
||||
{
|
||||
row_added = false;
|
||||
time_t t;
|
||||
if (!protobuf.readDateTime(t))
|
||||
return;
|
||||
|
||||
auto & container = static_cast<ColumnUInt32 &>(column).getData();
|
||||
if (allow_add_row)
|
||||
{
|
||||
container.emplace_back(t);
|
||||
row_added = true;
|
||||
}
|
||||
else
|
||||
container.back() = t;
|
||||
}
|
||||
|
||||
bool DataTypeDateTime::equals(const IDataType & rhs) const
|
||||
{
|
||||
/// DateTime with different timezones are equal, because:
|
||||
|
@ -47,6 +47,7 @@ public:
|
||||
void serializeTextCSV(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
|
||||
void deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const override;
|
||||
void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const override;
|
||||
void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override;
|
||||
|
||||
bool canBeUsedAsVersion() const override { return true; }
|
||||
bool canBeInsideNullable() const override { return true; }
|
||||
|
@ -1,5 +1,6 @@
|
||||
#include <IO/WriteBufferFromString.h>
|
||||
#include <Formats/FormatSettings.h>
|
||||
#include <Formats/ProtobufReader.h>
|
||||
#include <Formats/ProtobufWriter.h>
|
||||
#include <DataTypes/DataTypeEnum.h>
|
||||
#include <DataTypes/DataTypeFactory.h>
|
||||
@ -230,6 +231,25 @@ void DataTypeEnum<Type>::serializeProtobuf(const IColumn & column, size_t row_nu
|
||||
protobuf.writeEnum(static_cast<const ColumnType &>(column).getData()[row_num]);
|
||||
}
|
||||
|
||||
template<typename Type>
|
||||
void DataTypeEnum<Type>::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const
|
||||
{
|
||||
protobuf.prepareEnumMapping(values);
|
||||
row_added = false;
|
||||
Type value;
|
||||
if (!protobuf.readEnum(value))
|
||||
return;
|
||||
|
||||
auto & container = static_cast<ColumnType &>(column).getData();
|
||||
if (allow_add_row)
|
||||
{
|
||||
container.emplace_back(value);
|
||||
row_added = true;
|
||||
}
|
||||
else
|
||||
container.back() = value;
|
||||
}
|
||||
|
||||
template <typename Type>
|
||||
Field DataTypeEnum<Type>::getDefault() const
|
||||
{
|
||||
|
@ -106,6 +106,7 @@ public:
|
||||
void deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, const size_t limit, const double avg_value_size_hint) const override;
|
||||
|
||||
void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const override;
|
||||
void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override;
|
||||
|
||||
MutableColumnPtr createColumn() const override { return ColumnType::create(); }
|
||||
|
||||
|
@ -3,6 +3,7 @@
|
||||
#include <Columns/ColumnConst.h>
|
||||
|
||||
#include <Formats/FormatSettings.h>
|
||||
#include <Formats/ProtobufReader.h>
|
||||
#include <Formats/ProtobufWriter.h>
|
||||
#include <DataTypes/DataTypeFixedString.h>
|
||||
#include <DataTypes/DataTypeFactory.h>
|
||||
@ -122,30 +123,37 @@ void DataTypeFixedString::serializeTextEscaped(const IColumn & column, size_t ro
|
||||
}
|
||||
|
||||
|
||||
static inline void alignStringLength(const DataTypeFixedString & type,
|
||||
ColumnFixedString::Chars & data,
|
||||
size_t string_start)
|
||||
{
|
||||
size_t length = data.size() - string_start;
|
||||
if (length < type.getN())
|
||||
{
|
||||
data.resize_fill(string_start + type.getN());
|
||||
}
|
||||
else if (length > type.getN())
|
||||
{
|
||||
data.resize_assume_reserved(string_start);
|
||||
throw Exception("Too large value for " + type.getName(), ErrorCodes::TOO_LARGE_STRING_SIZE);
|
||||
}
|
||||
}
|
||||
|
||||
template <typename Reader>
|
||||
static inline void read(const DataTypeFixedString & self, IColumn & column, Reader && reader)
|
||||
{
|
||||
ColumnFixedString::Chars & data = typeid_cast<ColumnFixedString &>(column).getChars();
|
||||
size_t prev_size = data.size();
|
||||
|
||||
try
|
||||
{
|
||||
reader(data);
|
||||
alignStringLength(self, data, prev_size);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
data.resize_assume_reserved(prev_size);
|
||||
throw;
|
||||
}
|
||||
|
||||
if (data.size() < prev_size + self.getN())
|
||||
data.resize_fill(prev_size + self.getN());
|
||||
|
||||
if (data.size() > prev_size + self.getN())
|
||||
{
|
||||
data.resize_assume_reserved(prev_size);
|
||||
throw Exception("Too large value for " + self.getName(), ErrorCodes::TOO_LARGE_STRING_SIZE);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -208,6 +216,44 @@ void DataTypeFixedString::serializeProtobuf(const IColumn & column, size_t row_n
|
||||
}
|
||||
|
||||
|
||||
void DataTypeFixedString::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const
|
||||
{
|
||||
row_added = false;
|
||||
auto & column_string = static_cast<ColumnFixedString &>(column);
|
||||
ColumnFixedString::Chars & data = column_string.getChars();
|
||||
size_t old_size = data.size();
|
||||
try
|
||||
{
|
||||
if (allow_add_row)
|
||||
{
|
||||
if (protobuf.readStringInto(data))
|
||||
{
|
||||
alignStringLength(*this, data, old_size);
|
||||
row_added = true;
|
||||
}
|
||||
else
|
||||
data.resize_assume_reserved(old_size);
|
||||
}
|
||||
else
|
||||
{
|
||||
ColumnFixedString::Chars temp_data;
|
||||
if (protobuf.readStringInto(temp_data))
|
||||
{
|
||||
alignStringLength(*this, temp_data, 0);
|
||||
column_string.popBack(1);
|
||||
old_size = data.size();
|
||||
data.insertSmallAllowReadWriteOverflow15(temp_data.begin(), temp_data.end());
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
data.resize_assume_reserved(old_size);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
MutableColumnPtr DataTypeFixedString::createColumn() const
|
||||
{
|
||||
return ColumnFixedString::create(n);
|
||||
|
@ -65,6 +65,7 @@ public:
|
||||
void deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings &) const override;
|
||||
|
||||
void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const override;
|
||||
void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override;
|
||||
|
||||
MutableColumnPtr createColumn() const override;
|
||||
|
||||
|
@ -729,27 +729,43 @@ void DataTypeLowCardinality::deserializeBinary(Field & field, ReadBuffer & istr)
|
||||
dictionary_type->deserializeBinary(field, istr);
|
||||
}
|
||||
|
||||
template <typename OutputStream, typename ... Args>
|
||||
void DataTypeLowCardinality::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const
|
||||
{
|
||||
if (allow_add_row)
|
||||
{
|
||||
deserializeImpl(column, &IDataType::deserializeProtobuf, protobuf, true, row_added);
|
||||
return;
|
||||
}
|
||||
|
||||
row_added = false;
|
||||
auto & low_cardinality_column= getColumnLowCardinality(column);
|
||||
auto nested_column = low_cardinality_column.getDictionary().getNestedColumn();
|
||||
auto temp_column = nested_column->cloneEmpty();
|
||||
size_t unique_row_number = low_cardinality_column.getIndexes().getUInt(low_cardinality_column.size() - 1);
|
||||
temp_column->insertFrom(*nested_column, unique_row_number);
|
||||
bool dummy;
|
||||
dictionary_type.get()->deserializeProtobuf(*temp_column, protobuf, false, dummy);
|
||||
low_cardinality_column.popBack(1);
|
||||
low_cardinality_column.insertFromFullColumn(*temp_column, 0);
|
||||
}
|
||||
|
||||
template <typename... Params, typename... Args>
|
||||
void DataTypeLowCardinality::serializeImpl(
|
||||
const IColumn & column, size_t row_num,
|
||||
DataTypeLowCardinality::SerializeFunctionPtr<OutputStream, Args ...> func,
|
||||
OutputStream & ostr, Args & ... args) const
|
||||
const IColumn & column, size_t row_num, DataTypeLowCardinality::SerializeFunctionPtr<Params...> func, Args &&... args) const
|
||||
{
|
||||
auto & low_cardinality_column = getColumnLowCardinality(column);
|
||||
size_t unique_row_number = low_cardinality_column.getIndexes().getUInt(row_num);
|
||||
(dictionary_type.get()->*func)(*low_cardinality_column.getDictionary().getNestedColumn(), unique_row_number, ostr, std::forward<Args>(args)...);
|
||||
(dictionary_type.get()->*func)(*low_cardinality_column.getDictionary().getNestedColumn(), unique_row_number, std::forward<Args>(args)...);
|
||||
}
|
||||
|
||||
template <typename ... Args>
|
||||
template <typename... Params, typename... Args>
|
||||
void DataTypeLowCardinality::deserializeImpl(
|
||||
IColumn & column,
|
||||
DataTypeLowCardinality::DeserializeFunctionPtr<Args ...> func,
|
||||
ReadBuffer & istr, Args & ... args) const
|
||||
IColumn & column, DataTypeLowCardinality::DeserializeFunctionPtr<Params...> func, Args &&... args) const
|
||||
{
|
||||
auto & low_cardinality_column= getColumnLowCardinality(column);
|
||||
auto temp_column = low_cardinality_column.getDictionary().getNestedColumn()->cloneEmpty();
|
||||
|
||||
(dictionary_type.get()->*func)(*temp_column, istr, std::forward<Args>(args)...);
|
||||
(dictionary_type.get()->*func)(*temp_column, std::forward<Args>(args)...);
|
||||
|
||||
low_cardinality_column.insertFromFullColumn(*temp_column, 0);
|
||||
}
|
||||
|
@ -115,6 +115,8 @@ public:
|
||||
serializeImpl(column, row_num, &IDataType::serializeProtobuf, protobuf);
|
||||
}
|
||||
|
||||
void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override;
|
||||
|
||||
MutableColumnPtr createColumn() const override;
|
||||
|
||||
Field getDefault() const override { return dictionary_type->getDefault(); }
|
||||
@ -148,19 +150,17 @@ public:
|
||||
|
||||
private:
|
||||
|
||||
template <typename OutputStream, typename ... Args>
|
||||
using SerializeFunctionPtr = void (IDataType::*)(const IColumn &, size_t, OutputStream &, Args & ...) const;
|
||||
template <typename ... Params>
|
||||
using SerializeFunctionPtr = void (IDataType::*)(const IColumn &, size_t, Params ...) const;
|
||||
|
||||
template <typename OutputStream, typename ... Args>
|
||||
void serializeImpl(const IColumn & column, size_t row_num, SerializeFunctionPtr<OutputStream, Args ...> func,
|
||||
OutputStream & ostr, Args & ... args) const;
|
||||
template <typename... Params, typename... Args>
|
||||
void serializeImpl(const IColumn & column, size_t row_num, SerializeFunctionPtr<Params...> func, Args &&... args) const;
|
||||
|
||||
template <typename ... Args>
|
||||
using DeserializeFunctionPtr = void (IDataType::*)(IColumn &, ReadBuffer &, Args & ...) const;
|
||||
template <typename ... Params>
|
||||
using DeserializeFunctionPtr = void (IDataType::*)(IColumn &, Params ...) const;
|
||||
|
||||
template <typename ... Args>
|
||||
void deserializeImpl(IColumn & column, DeserializeFunctionPtr<Args ...> func,
|
||||
ReadBuffer & istr, Args & ... args) const;
|
||||
template <typename ... Params, typename... Args>
|
||||
void deserializeImpl(IColumn & column, DeserializeFunctionPtr<Params...> func, Args &&... args) const;
|
||||
|
||||
template <typename Creator>
|
||||
static MutableColumnUniquePtr createColumnUniqueImpl(const IDataType & keys_type, const Creator & creator);
|
||||
|
@ -318,6 +318,26 @@ void DataTypeNullable::serializeProtobuf(const IColumn & column, size_t row_num,
|
||||
nested_data_type->serializeProtobuf(col.getNestedColumn(), row_num, protobuf);
|
||||
}
|
||||
|
||||
void DataTypeNullable::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const
|
||||
{
|
||||
ColumnNullable & col = static_cast<ColumnNullable &>(column);
|
||||
IColumn & nested_column = col.getNestedColumn();
|
||||
size_t old_size = nested_column.size();
|
||||
try
|
||||
{
|
||||
nested_data_type->deserializeProtobuf(nested_column, protobuf, allow_add_row, row_added);
|
||||
if (row_added)
|
||||
col.getNullMapData().push_back(0);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
nested_column.popBack(nested_column.size() - old_size);
|
||||
col.getNullMapData().resize_assume_reserved(old_size);
|
||||
row_added = false;
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
MutableColumnPtr DataTypeNullable::createColumn() const
|
||||
{
|
||||
return ColumnNullable::create(nested_data_type->createColumn(), ColumnUInt8::create());
|
||||
|
@ -71,6 +71,7 @@ public:
|
||||
void serializeTextXML(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
|
||||
|
||||
void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const override;
|
||||
void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override;
|
||||
|
||||
MutableColumnPtr createColumn() const override;
|
||||
|
||||
|
@ -7,6 +7,7 @@
|
||||
#include <Common/NaNUtils.h>
|
||||
#include <Common/typeid_cast.h>
|
||||
#include <Formats/FormatSettings.h>
|
||||
#include <Formats/ProtobufReader.h>
|
||||
#include <Formats/ProtobufWriter.h>
|
||||
|
||||
|
||||
@ -210,6 +211,25 @@ void DataTypeNumberBase<T>::serializeProtobuf(const IColumn & column, size_t row
|
||||
}
|
||||
|
||||
|
||||
template <typename T>
|
||||
void DataTypeNumberBase<T>::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const
|
||||
{
|
||||
row_added = false;
|
||||
T value;
|
||||
if (!protobuf.readNumber(value))
|
||||
return;
|
||||
|
||||
auto & container = typeid_cast<ColumnVector<T> &>(column).getData();
|
||||
if (allow_add_row)
|
||||
{
|
||||
container.emplace_back(value);
|
||||
row_added = true;
|
||||
}
|
||||
else
|
||||
container.back() = value;
|
||||
}
|
||||
|
||||
|
||||
template <typename T>
|
||||
MutableColumnPtr DataTypeNumberBase<T>::createColumn() const
|
||||
{
|
||||
|
@ -37,6 +37,7 @@ public:
|
||||
void deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint) const override;
|
||||
|
||||
void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const override;
|
||||
void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override;
|
||||
|
||||
MutableColumnPtr createColumn() const override;
|
||||
|
||||
|
@ -7,6 +7,7 @@
|
||||
#include <Common/typeid_cast.h>
|
||||
|
||||
#include <Formats/FormatSettings.h>
|
||||
#include <Formats/ProtobufReader.h>
|
||||
#include <Formats/ProtobufWriter.h>
|
||||
#include <DataTypes/DataTypeString.h>
|
||||
#include <DataTypes/DataTypeFactory.h>
|
||||
@ -237,10 +238,8 @@ static inline void read(IColumn & column, Reader && reader)
|
||||
ColumnString & column_string = static_cast<ColumnString &>(column);
|
||||
ColumnString::Chars & data = column_string.getChars();
|
||||
ColumnString::Offsets & offsets = column_string.getOffsets();
|
||||
|
||||
size_t old_chars_size = data.size();
|
||||
size_t old_offsets_size = offsets.size();
|
||||
|
||||
try
|
||||
{
|
||||
reader(data);
|
||||
@ -310,6 +309,48 @@ void DataTypeString::serializeProtobuf(const IColumn & column, size_t row_num, P
|
||||
}
|
||||
|
||||
|
||||
void DataTypeString::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const
|
||||
{
|
||||
row_added = false;
|
||||
auto & column_string = static_cast<ColumnString &>(column);
|
||||
ColumnString::Chars & data = column_string.getChars();
|
||||
ColumnString::Offsets & offsets = column_string.getOffsets();
|
||||
size_t old_size = offsets.size();
|
||||
try
|
||||
{
|
||||
if (allow_add_row)
|
||||
{
|
||||
if (protobuf.readStringInto(data))
|
||||
{
|
||||
data.emplace_back(0);
|
||||
offsets.emplace_back(data.size());
|
||||
row_added = true;
|
||||
}
|
||||
else
|
||||
data.resize_assume_reserved(offsets.back());
|
||||
}
|
||||
else
|
||||
{
|
||||
ColumnString::Chars temp_data;
|
||||
if (protobuf.readStringInto(temp_data))
|
||||
{
|
||||
temp_data.emplace_back(0);
|
||||
column_string.popBack(1);
|
||||
old_size = offsets.size();
|
||||
data.insertSmallAllowReadWriteOverflow15(temp_data.begin(), temp_data.end());
|
||||
offsets.emplace_back(data.size());
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
offsets.resize_assume_reserved(old_size);
|
||||
data.resize_assume_reserved(offsets.back());
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
MutableColumnPtr DataTypeString::createColumn() const
|
||||
{
|
||||
return ColumnString::create();
|
||||
|
@ -46,6 +46,7 @@ public:
|
||||
void deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings &) const override;
|
||||
|
||||
void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const override;
|
||||
void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override;
|
||||
|
||||
MutableColumnPtr createColumn() const override;
|
||||
|
||||
|
@ -413,6 +413,22 @@ void DataTypeTuple::serializeProtobuf(const IColumn & column, size_t row_num, Pr
|
||||
elems[i]->serializeProtobuf(extractElementColumn(column, i), row_num, protobuf);
|
||||
}
|
||||
|
||||
void DataTypeTuple::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const
|
||||
{
|
||||
row_added = false;
|
||||
bool all_elements_get_row = true;
|
||||
addElementSafe(elems, column, [&]
|
||||
{
|
||||
for (const auto & i : ext::range(0, ext::size(elems)))
|
||||
{
|
||||
bool element_row_added;
|
||||
elems[i]->deserializeProtobuf(extractElementColumn(column, i), protobuf, allow_add_row, element_row_added);
|
||||
all_elements_get_row &= element_row_added;
|
||||
}
|
||||
});
|
||||
row_added = all_elements_get_row;
|
||||
}
|
||||
|
||||
MutableColumnPtr DataTypeTuple::createColumn() const
|
||||
{
|
||||
size_t size = elems.size();
|
||||
|
@ -78,6 +78,7 @@ public:
|
||||
DeserializeBinaryBulkStatePtr & state) const override;
|
||||
|
||||
void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const override;
|
||||
void deserializeProtobuf(IColumn & column, ProtobufReader & reader, bool allow_add_row, bool & row_added) const override;
|
||||
|
||||
MutableColumnPtr createColumn() const override;
|
||||
|
||||
|
@ -1,6 +1,7 @@
|
||||
#include <DataTypes/DataTypeUUID.h>
|
||||
#include <DataTypes/DataTypeFactory.h>
|
||||
#include <Columns/ColumnsNumber.h>
|
||||
#include <Formats/ProtobufReader.h>
|
||||
#include <Formats/ProtobufWriter.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
@ -77,6 +78,22 @@ void DataTypeUUID::serializeProtobuf(const IColumn & column, size_t row_num, Pro
|
||||
protobuf.writeUUID(UUID(static_cast<const ColumnUInt128 &>(column).getData()[row_num]));
|
||||
}
|
||||
|
||||
void DataTypeUUID::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const
|
||||
{
|
||||
row_added = false;
|
||||
UUID uuid;
|
||||
if (!protobuf.readUUID(uuid))
|
||||
return;
|
||||
|
||||
auto & container = static_cast<ColumnUInt128 &>(column).getData();
|
||||
if (allow_add_row)
|
||||
{
|
||||
container.emplace_back(uuid);
|
||||
row_added = true;
|
||||
}
|
||||
else
|
||||
container.back() = uuid;
|
||||
}
|
||||
|
||||
bool DataTypeUUID::equals(const IDataType & rhs) const
|
||||
{
|
||||
|
@ -25,6 +25,7 @@ public:
|
||||
void serializeTextCSV(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
|
||||
void deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const override;
|
||||
void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const override;
|
||||
void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override;
|
||||
|
||||
bool canBeUsedInBitOperations() const override { return true; }
|
||||
bool canBeInsideNullable() const override { return true; }
|
||||
|
@ -2,6 +2,7 @@
|
||||
#include <Common/typeid_cast.h>
|
||||
#include <DataTypes/DataTypesDecimal.h>
|
||||
#include <DataTypes/DataTypeFactory.h>
|
||||
#include <Formats/ProtobufReader.h>
|
||||
#include <Formats/ProtobufWriter.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
@ -141,6 +142,25 @@ void DataTypeDecimal<T>::serializeProtobuf(const IColumn & column, size_t row_nu
|
||||
}
|
||||
|
||||
|
||||
template <typename T>
|
||||
void DataTypeDecimal<T>::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const
|
||||
{
|
||||
row_added = false;
|
||||
T decimal;
|
||||
if (!protobuf.readDecimal(decimal, precision, scale))
|
||||
return;
|
||||
|
||||
auto & container = static_cast<ColumnType &>(column).getData();
|
||||
if (allow_add_row)
|
||||
{
|
||||
container.emplace_back(decimal);
|
||||
row_added = true;
|
||||
}
|
||||
else
|
||||
container.back() = decimal;
|
||||
}
|
||||
|
||||
|
||||
template <typename T>
|
||||
Field DataTypeDecimal<T>::getDefault() const
|
||||
{
|
||||
|
@ -101,6 +101,7 @@ public:
|
||||
void deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint) const override;
|
||||
|
||||
void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const override;
|
||||
void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override;
|
||||
|
||||
Field getDefault() const override;
|
||||
bool canBePromoted() const override { return true; }
|
||||
|
@ -23,6 +23,7 @@ using MutableColumnPtr = COWPtr<IColumn>::MutablePtr;
|
||||
using DataTypePtr = std::shared_ptr<const IDataType>;
|
||||
using DataTypes = std::vector<DataTypePtr>;
|
||||
|
||||
class ProtobufReader;
|
||||
class ProtobufWriter;
|
||||
|
||||
|
||||
@ -254,6 +255,7 @@ public:
|
||||
|
||||
/** Serialize to a protobuf. */
|
||||
virtual void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const = 0;
|
||||
virtual void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const = 0;
|
||||
|
||||
protected:
|
||||
virtual String doGetName() const;
|
||||
|
@ -27,8 +27,9 @@ public:
|
||||
void serializeBinaryBulk(const IColumn &, WriteBuffer &, size_t, size_t) const override { throwNoSerialization(); }
|
||||
void deserializeBinaryBulk(IColumn &, ReadBuffer &, size_t, double) const override { throwNoSerialization(); }
|
||||
void serializeText(const IColumn &, size_t, WriteBuffer &, const FormatSettings &) const override { throwNoSerialization(); }
|
||||
void deserializeText(IColumn &, ReadBuffer &, const FormatSettings &) const override { throwNoSerialization(); }
|
||||
void serializeProtobuf(const IColumn &, size_t, ProtobufWriter &) const override { throwNoSerialization(); }
|
||||
void deserializeText(IColumn &, ReadBuffer &, const FormatSettings &) const override { throwNoSerialization(); }
|
||||
void serializeProtobuf(const IColumn &, size_t, ProtobufWriter &) const override { throwNoSerialization(); }
|
||||
void deserializeProtobuf(IColumn &, ProtobufReader &, bool, bool &) const override { throwNoSerialization(); }
|
||||
|
||||
MutableColumnPtr createColumn() const override
|
||||
{
|
||||
@ -50,4 +51,3 @@ public:
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user