Improved serialization for data types combined of Arrays and Tuples.

Improved matching enum data types to protobuf enum type.
Fixed serialization of the Map data type.
Omitted values are now set by default.
This commit is contained in:
Vitaly Baranov 2021-01-11 04:50:30 +03:00
parent 1795d45609
commit 18e036d19b
73 changed files with 3990 additions and 3079 deletions

View File

@ -13,6 +13,7 @@ RUN apt-get update -y \
ncdu \
netcat-openbsd \
openssl \
protobuf-compiler \
python3 \
python3-lxml \
python3-requests \

View File

@ -446,4 +446,18 @@ void ColumnFixedString::getExtremes(Field & min, Field & max) const
get(max_idx, max);
}
void ColumnFixedString::alignStringLength(ColumnFixedString::Chars & data, size_t n, size_t old_size)
{
size_t length = data.size() - old_size;
if (length < n)
{
data.resize_fill(old_size + n);
}
else if (length > n)
{
data.resize_assume_reserved(old_size);
throw Exception("Too large value for FixedString(" + std::to_string(n) + ")", ErrorCodes::TOO_LARGE_STRING_SIZE);
}
}
}

View File

@ -182,7 +182,8 @@ public:
const Chars & getChars() const { return chars; }
size_t getN() const { return n; }
static void alignStringLength(ColumnFixedString::Chars & data, size_t n, size_t old_size);
};
}

View File

@ -404,7 +404,7 @@
M(432, UNKNOWN_CODEC) \
M(433, ILLEGAL_CODEC_PARAMETER) \
M(434, CANNOT_PARSE_PROTOBUF_SCHEMA) \
M(435, NO_DATA_FOR_REQUIRED_PROTOBUF_FIELD) \
M(435, NO_COLUMN_SERIALIZED_TO_REQUIRED_PROTOBUF_FIELD) \
M(436, PROTOBUF_BAD_CAST) \
M(437, PROTOBUF_FIELD_NOT_REPEATED) \
M(438, DATA_TYPE_CANNOT_BE_PROMOTED) \
@ -412,7 +412,7 @@
M(440, INVALID_LIMIT_EXPRESSION) \
M(441, CANNOT_PARSE_DOMAIN_VALUE_FROM_STRING) \
M(442, BAD_DATABASE_FOR_TEMPORARY_TABLE) \
M(443, NO_COMMON_COLUMNS_WITH_PROTOBUF_SCHEMA) \
M(443, NO_COLUMNS_SERIALIZED_TO_PROTOBUF_FIELDS) \
M(444, UNKNOWN_PROTOBUF_FORMAT) \
M(445, CANNOT_MPROTECT) \
M(446, FUNCTION_NOT_ALLOWED) \
@ -535,6 +535,8 @@
M(566, CANNOT_RMDIR) \
M(567, DUPLICATED_PART_UUIDS) \
M(568, RAFT_ERROR) \
M(569, MULTIPLE_COLUMNS_SERIALIZED_TO_SAME_PROTOBUF_FIELD) \
M(570, DATA_TYPE_INCOMPATIBLE_WITH_PROTOBUF_FIELD) \
\
M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \

View File

@ -10,8 +10,6 @@
#include <Common/AlignedBuffer.h>
#include <Formats/FormatSettings.h>
#include <Formats/ProtobufReader.h>
#include <Formats/ProtobufWriter.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <DataTypes/DataTypeFactory.h>
#include <IO/WriteBufferFromString.h>
@ -261,45 +259,6 @@ void DataTypeAggregateFunction::deserializeTextCSV(IColumn & column, ReadBuffer
}
void DataTypeAggregateFunction::serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const
{
if (value_index)
return;
value_index = static_cast<bool>(
protobuf.writeAggregateFunction(function, assert_cast<const ColumnAggregateFunction &>(column).getData()[row_num]));
}
void DataTypeAggregateFunction::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const
{
row_added = false;
ColumnAggregateFunction & column_concrete = assert_cast<ColumnAggregateFunction &>(column);
Arena & arena = column_concrete.createOrGetArena();
size_t size_of_state = function->sizeOfData();
AggregateDataPtr place = arena.alignedAlloc(size_of_state, function->alignOfData());
function->create(place);
try
{
if (!protobuf.readAggregateFunction(function, place, arena))
{
function->destroy(place);
return;
}
auto & container = column_concrete.getData();
if (allow_add_row)
{
container.emplace_back(place);
row_added = true;
}
else
container.back() = place;
}
catch (...)
{
function->destroy(place);
throw;
}
}
MutableColumnPtr DataTypeAggregateFunction::createColumn() const
{
return ColumnAggregateFunction::create(function);

View File

@ -59,8 +59,6 @@ public:
void serializeTextXML(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
void serializeTextCSV(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
void deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const override;
void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const override;
void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override;
MutableColumnPtr createColumn() const override;

View File

@ -6,7 +6,6 @@
#include <IO/WriteBufferFromString.h>
#include <Formats/FormatSettings.h>
#include <Formats/ProtobufReader.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeFactory.h>
@ -522,55 +521,6 @@ void DataTypeArray::deserializeTextCSV(IColumn & column, ReadBuffer & istr, cons
}
void DataTypeArray::serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const
{
const ColumnArray & column_array = assert_cast<const ColumnArray &>(column);
const ColumnArray::Offsets & offsets = column_array.getOffsets();
size_t offset = offsets[row_num - 1] + value_index;
size_t next_offset = offsets[row_num];
const IColumn & nested_column = column_array.getData();
size_t i;
for (i = offset; i < next_offset; ++i)
{
size_t element_stored = 0;
nested->serializeProtobuf(nested_column, i, protobuf, element_stored);
if (!element_stored)
break;
}
value_index += i - offset;
}
void DataTypeArray::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const
{
row_added = false;
ColumnArray & column_array = assert_cast<ColumnArray &>(column);
IColumn & nested_column = column_array.getData();
ColumnArray::Offsets & offsets = column_array.getOffsets();
size_t old_size = offsets.size();
try
{
bool nested_row_added;
do
nested->deserializeProtobuf(nested_column, protobuf, true, nested_row_added);
while (nested_row_added && protobuf.canReadMoreValues());
if (allow_add_row)
{
offsets.emplace_back(nested_column.size());
row_added = true;
}
else
offsets.back() = nested_column.size();
}
catch (...)
{
offsets.resize_assume_reserved(old_size);
nested_column.popBack(nested_column.size() - offsets.back());
throw;
}
}
MutableColumnPtr DataTypeArray::createColumn() const
{
return ColumnArray::create(nested->createColumn(), ColumnArray::ColumnOffsets::create());

View File

@ -85,15 +85,6 @@ public:
DeserializeBinaryBulkStatePtr & state,
SubstreamsCache * cache) const override;
void serializeProtobuf(const IColumn & column,
size_t row_num,
ProtobufWriter & protobuf,
size_t & value_index) const override;
void deserializeProtobuf(IColumn & column,
ProtobufReader & protobuf,
bool allow_add_row,
bool & row_added) const override;
MutableColumnPtr createColumn() const override;
Field getDefault() const override;

View File

@ -4,8 +4,6 @@
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeFactory.h>
#include <Formats/ProtobufReader.h>
#include <Formats/ProtobufWriter.h>
#include <Common/assert_cast.h>
@ -81,30 +79,6 @@ void DataTypeDate::deserializeTextCSV(IColumn & column, ReadBuffer & istr, const
assert_cast<ColumnUInt16 &>(column).getData().push_back(value.getDayNum());
}
void DataTypeDate::serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const
{
if (value_index)
return;
value_index = static_cast<bool>(protobuf.writeDate(DayNum(assert_cast<const ColumnUInt16 &>(column).getData()[row_num])));
}
void DataTypeDate::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const
{
row_added = false;
DayNum d;
if (!protobuf.readDate(d))
return;
auto & container = assert_cast<ColumnUInt16 &>(column).getData();
if (allow_add_row)
{
container.emplace_back(d);
row_added = true;
}
else
container.back() = d;
}
bool DataTypeDate::equals(const IDataType & rhs) const
{
return typeid(rhs) == typeid(*this);

View File

@ -24,8 +24,6 @@ public:
void deserializeTextJSON(IColumn & column, ReadBuffer & istr, const FormatSettings &) const override;
void serializeTextCSV(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
void deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const override;
void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const override;
void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override;
bool canBeUsedAsVersion() const override { return true; }
bool canBeInsideNullable() const override { return true; }

View File

@ -5,8 +5,6 @@
#include <common/DateLUT.h>
#include <DataTypes/DataTypeFactory.h>
#include <Formats/FormatSettings.h>
#include <Formats/ProtobufReader.h>
#include <Formats/ProtobufWriter.h>
#include <IO/Operators.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromString.h>
@ -164,32 +162,6 @@ void DataTypeDateTime::deserializeTextCSV(IColumn & column, ReadBuffer & istr, c
assert_cast<ColumnType &>(column).getData().push_back(x);
}
void DataTypeDateTime::serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const
{
if (value_index)
return;
// On some platforms `time_t` is `long` but not `unsigned int` (UInt32 that we store in column), hence static_cast.
value_index = static_cast<bool>(protobuf.writeDateTime(static_cast<time_t>(assert_cast<const ColumnType &>(column).getData()[row_num])));
}
void DataTypeDateTime::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const
{
row_added = false;
time_t t;
if (!protobuf.readDateTime(t))
return;
auto & container = assert_cast<ColumnType &>(column).getData();
if (allow_add_row)
{
container.emplace_back(t);
row_added = true;
}
else
container.back() = t;
}
bool DataTypeDateTime::equals(const IDataType & rhs) const
{
/// DateTime with different timezones are equal, because:

View File

@ -68,8 +68,6 @@ public:
void deserializeTextJSON(IColumn & column, ReadBuffer & istr, const FormatSettings &) const override;
void serializeTextCSV(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
void deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const override;
void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const override;
void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override;
bool canBeUsedAsVersion() const override { return true; }
bool canBeInsideNullable() const override { return true; }

View File

@ -6,8 +6,6 @@
#include <common/DateLUT.h>
#include <DataTypes/DataTypeFactory.h>
#include <Formats/FormatSettings.h>
#include <Formats/ProtobufReader.h>
#include <Formats/ProtobufWriter.h>
#include <IO/Operators.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromString.h>
@ -182,30 +180,6 @@ void DataTypeDateTime64::deserializeTextCSV(IColumn & column, ReadBuffer & istr,
assert_cast<ColumnType &>(column).getData().push_back(x);
}
void DataTypeDateTime64::serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const
{
if (value_index)
return;
value_index = static_cast<bool>(protobuf.writeDateTime64(assert_cast<const ColumnType &>(column).getData()[row_num], scale));
}
void DataTypeDateTime64::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const
{
row_added = false;
DateTime64 t = 0;
if (!protobuf.readDateTime64(t, scale))
return;
auto & container = assert_cast<ColumnType &>(column).getData();
if (allow_add_row)
{
container.emplace_back(t);
row_added = true;
}
else
container.back() = t;
}
bool DataTypeDateTime64::equals(const IDataType & rhs) const
{
if (const auto * ptype = typeid_cast<const DataTypeDateTime64 *>(&rhs))

View File

@ -42,8 +42,6 @@ public:
void deserializeTextJSON(IColumn & column, ReadBuffer & istr, const FormatSettings &) const override;
void serializeTextCSV(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
void deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const override;
void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const override;
void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override;
bool equals(const IDataType & rhs) const override;

View File

@ -4,8 +4,6 @@
#include <Common/typeid_cast.h>
#include <Core/DecimalFunctions.h>
#include <DataTypes/DataTypeFactory.h>
#include <Formats/ProtobufReader.h>
#include <Formats/ProtobufWriter.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>

View File

@ -1,7 +1,5 @@
#include <IO/WriteBufferFromString.h>
#include <Formats/FormatSettings.h>
#include <Formats/ProtobufReader.h>
#include <Formats/ProtobufWriter.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeFactory.h>
#include <Parsers/IAST.h>
@ -254,34 +252,6 @@ void DataTypeEnum<Type>::deserializeBinaryBulk(
x.resize(initial_size + size / sizeof(FieldType));
}
template <typename Type>
void DataTypeEnum<Type>::serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const
{
if (value_index)
return;
protobuf.prepareEnumMapping(values);
value_index = static_cast<bool>(protobuf.writeEnum(assert_cast<const ColumnType &>(column).getData()[row_num]));
}
template<typename Type>
void DataTypeEnum<Type>::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const
{
protobuf.prepareEnumMapping(values);
row_added = false;
Type value;
if (!protobuf.readEnum(value))
return;
auto & container = assert_cast<ColumnType &>(column).getData();
if (allow_add_row)
{
container.emplace_back(value);
row_added = true;
}
else
container.back() = value;
}
template <typename Type>
Field DataTypeEnum<Type>::getDefault() const
{

View File

@ -132,9 +132,6 @@ public:
void serializeBinaryBulk(const IColumn & column, WriteBuffer & ostr, const size_t offset, size_t limit) const override;
void deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, const size_t limit, const double avg_value_size_hint) const override;
void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const override;
void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override;
MutableColumnPtr createColumn() const override { return ColumnType::create(); }
Field getDefault() const override;

View File

@ -2,8 +2,6 @@
#include <Columns/ColumnConst.h>
#include <Formats/FormatSettings.h>
#include <Formats/ProtobufReader.h>
#include <Formats/ProtobufWriter.h>
#include <DataTypes/DataTypeFixedString.h>
#include <DataTypes/DataTypeFactory.h>
@ -25,7 +23,6 @@ namespace DB
namespace ErrorCodes
{
extern const int CANNOT_READ_ALL_DATA;
extern const int TOO_LARGE_STRING_SIZE;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int UNEXPECTED_AST_STRUCTURE;
}
@ -127,16 +124,7 @@ static inline void alignStringLength(const DataTypeFixedString & type,
ColumnFixedString::Chars & data,
size_t string_start)
{
size_t length = data.size() - string_start;
if (length < type.getN())
{
data.resize_fill(string_start + type.getN());
}
else if (length > type.getN())
{
data.resize_assume_reserved(string_start);
throw Exception("Too large value for " + type.getName(), ErrorCodes::TOO_LARGE_STRING_SIZE);
}
ColumnFixedString::alignStringLength(data, type.getN(), string_start);
}
template <typename Reader>
@ -215,53 +203,6 @@ void DataTypeFixedString::deserializeTextCSV(IColumn & column, ReadBuffer & istr
}
void DataTypeFixedString::serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const
{
if (value_index)
return;
const char * pos = reinterpret_cast<const char *>(&assert_cast<const ColumnFixedString &>(column).getChars()[n * row_num]);
value_index = static_cast<bool>(protobuf.writeString(StringRef(pos, n)));
}
void DataTypeFixedString::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const
{
row_added = false;
auto & column_string = assert_cast<ColumnFixedString &>(column);
ColumnFixedString::Chars & data = column_string.getChars();
size_t old_size = data.size();
try
{
if (allow_add_row)
{
if (protobuf.readStringInto(data))
{
alignStringLength(*this, data, old_size);
row_added = true;
}
else
data.resize_assume_reserved(old_size);
}
else
{
ColumnFixedString::Chars temp_data;
if (protobuf.readStringInto(temp_data))
{
alignStringLength(*this, temp_data, 0);
column_string.popBack(1);
old_size = data.size();
data.insertSmallAllowReadWriteOverflow15(temp_data.begin(), temp_data.end());
}
}
}
catch (...)
{
data.resize_assume_reserved(old_size);
throw;
}
}
MutableColumnPtr DataTypeFixedString::createColumn() const
{
return ColumnFixedString::create(n);

View File

@ -66,9 +66,6 @@ public:
void serializeTextCSV(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
void deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings &) const override;
void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const override;
void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override;
MutableColumnPtr createColumn() const override;
Field getDefault() const override;

View File

@ -808,31 +808,6 @@ void DataTypeLowCardinality::serializeTextXML(const IColumn & column, size_t row
serializeImpl(column, row_num, &IDataType::serializeAsTextXML, ostr, settings);
}
void DataTypeLowCardinality::serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const
{
serializeImpl(column, row_num, &IDataType::serializeProtobuf, protobuf, value_index);
}
void DataTypeLowCardinality::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const
{
if (allow_add_row)
{
deserializeImpl(column, &IDataType::deserializeProtobuf, protobuf, true, row_added);
return;
}
row_added = false;
auto & low_cardinality_column= getColumnLowCardinality(column);
auto nested_column = low_cardinality_column.getDictionary().getNestedColumn();
auto temp_column = nested_column->cloneEmpty();
size_t unique_row_number = low_cardinality_column.getIndexes().getUInt(low_cardinality_column.size() - 1);
temp_column->insertFrom(*nested_column, unique_row_number);
bool dummy;
dictionary_type.get()->deserializeProtobuf(*temp_column, protobuf, false, dummy);
low_cardinality_column.popBack(1);
low_cardinality_column.insertFromFullColumn(*temp_column, 0);
}
template <typename... Params, typename... Args>
void DataTypeLowCardinality::serializeImpl(
const IColumn & column, size_t row_num, DataTypeLowCardinality::SerializeFunctionPtr<Params...> func, Args &&... args) const

View File

@ -65,8 +65,6 @@ public:
void serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const override;
void deserializeTextJSON(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const override;
void serializeTextXML(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const override;
void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const override;
void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override;
MutableColumnPtr createColumn() const override;

View File

@ -336,16 +336,6 @@ void DataTypeMap::deserializeBinaryBulkWithMultipleStreamsImpl(
nested->deserializeBinaryBulkWithMultipleStreams(column_map.getNestedColumnPtr(), limit, settings, state, cache);
}
void DataTypeMap::serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const
{
nested->serializeProtobuf(extractNestedColumn(column), row_num, protobuf, value_index);
}
void DataTypeMap::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const
{
nested->deserializeProtobuf(extractNestedColumn(column), protobuf, allow_add_row, row_added);
}
MutableColumnPtr DataTypeMap::createColumn() const
{
return ColumnMap::create(nested->createColumn());

View File

@ -76,9 +76,6 @@ public:
DeserializeBinaryBulkStatePtr & state,
SubstreamsCache * cache) const override;
void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const override;
void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override;
MutableColumnPtr createColumn() const override;
Field getDefault() const override;
@ -92,6 +89,8 @@ public:
const DataTypePtr & getValueType() const { return value_type; }
DataTypes getKeyValueTypes() const { return {key_type, value_type}; }
const DataTypePtr & getNestedType() const { return nested; }
private:
template <typename Writer>
void serializeTextImpl(const IColumn & column, size_t row_num, WriteBuffer & ostr, Writer && writer) const;

View File

@ -486,33 +486,6 @@ void DataTypeNullable::serializeTextXML(const IColumn & column, size_t row_num,
nested_data_type->serializeAsTextXML(col.getNestedColumn(), row_num, ostr, settings);
}
void DataTypeNullable::serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const
{
const ColumnNullable & col = assert_cast<const ColumnNullable &>(column);
if (!col.isNullAt(row_num))
nested_data_type->serializeProtobuf(col.getNestedColumn(), row_num, protobuf, value_index);
}
void DataTypeNullable::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const
{
ColumnNullable & col = assert_cast<ColumnNullable &>(column);
IColumn & nested_column = col.getNestedColumn();
size_t old_size = nested_column.size();
try
{
nested_data_type->deserializeProtobuf(nested_column, protobuf, allow_add_row, row_added);
if (row_added)
col.getNullMapData().push_back(0);
}
catch (...)
{
nested_column.popBack(nested_column.size() - old_size);
col.getNullMapData().resize_assume_reserved(old_size);
row_added = false;
throw;
}
}
MutableColumnPtr DataTypeNullable::createColumn() const
{
return ColumnNullable::create(nested_data_type->createColumn(), ColumnUInt8::create());

View File

@ -73,9 +73,6 @@ public:
void serializeText(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
void serializeTextXML(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const override;
void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override;
MutableColumnPtr createColumn() const override;
Field getDefault() const override;

View File

@ -8,8 +8,6 @@
#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>
#include <Formats/FormatSettings.h>
#include <Formats/ProtobufReader.h>
#include <Formats/ProtobufWriter.h>
namespace DB
@ -205,34 +203,6 @@ void DataTypeNumberBase<T>::deserializeBinaryBulk(IColumn & column, ReadBuffer &
}
template <typename T>
void DataTypeNumberBase<T>::serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const
{
if (value_index)
return;
value_index = static_cast<bool>(protobuf.writeNumber(assert_cast<const ColumnVector<T> &>(column).getData()[row_num]));
}
template <typename T>
void DataTypeNumberBase<T>::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const
{
row_added = false;
T value;
if (!protobuf.readNumber(value))
return;
auto & container = typeid_cast<ColumnVector<T> &>(column).getData();
if (allow_add_row)
{
container.emplace_back(value);
row_added = true;
}
else
container.back() = value;
}
template <typename T>
MutableColumnPtr DataTypeNumberBase<T>::createColumn() const
{

View File

@ -45,9 +45,6 @@ public:
void serializeBinaryBulk(const IColumn & column, WriteBuffer & ostr, size_t offset, size_t limit) const override;
void deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint) const override;
void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const override;
void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override;
MutableColumnPtr createColumn() const override;
bool isParametric() const override { return false; }

View File

@ -9,8 +9,6 @@
#include <Core/Field.h>
#include <Formats/FormatSettings.h>
#include <Formats/ProtobufReader.h>
#include <Formats/ProtobufWriter.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeFactory.h>
@ -311,55 +309,6 @@ void DataTypeString::deserializeTextCSV(IColumn & column, ReadBuffer & istr, con
}
void DataTypeString::serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const
{
if (value_index)
return;
value_index = static_cast<bool>(protobuf.writeString(assert_cast<const ColumnString &>(column).getDataAt(row_num)));
}
void DataTypeString::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const
{
row_added = false;
auto & column_string = assert_cast<ColumnString &>(column);
ColumnString::Chars & data = column_string.getChars();
ColumnString::Offsets & offsets = column_string.getOffsets();
size_t old_size = offsets.size();
try
{
if (allow_add_row)
{
if (protobuf.readStringInto(data))
{
data.emplace_back(0);
offsets.emplace_back(data.size());
row_added = true;
}
else
data.resize_assume_reserved(offsets.back());
}
else
{
ColumnString::Chars temp_data;
if (protobuf.readStringInto(temp_data))
{
temp_data.emplace_back(0);
column_string.popBack(1);
old_size = offsets.size();
data.insertSmallAllowReadWriteOverflow15(temp_data.begin(), temp_data.end());
offsets.emplace_back(data.size());
}
}
}
catch (...)
{
offsets.resize_assume_reserved(old_size);
data.resize_assume_reserved(offsets.back());
throw;
}
}
Field DataTypeString::getDefault() const
{
return String();

View File

@ -47,9 +47,6 @@ public:
void serializeTextCSV(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
void deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings &) const override;
void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const override;
void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override;
MutableColumnPtr createColumn() const override;
Field getDefault() const override;

View File

@ -504,33 +504,6 @@ void DataTypeTuple::deserializeBinaryBulkWithMultipleStreamsImpl(
settings.path.pop_back();
}
void DataTypeTuple::serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const
{
for (; value_index < elems.size(); ++value_index)
{
size_t stored = 0;
elems[value_index]->serializeProtobuf(extractElementColumn(column, value_index), row_num, protobuf, stored);
if (!stored)
break;
}
}
void DataTypeTuple::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const
{
row_added = false;
bool all_elements_get_row = true;
addElementSafe(elems, column, [&]
{
for (const auto & i : ext::range(0, ext::size(elems)))
{
bool element_row_added;
elems[i]->deserializeProtobuf(extractElementColumn(column, i), protobuf, allow_add_row, element_row_added);
all_elements_get_row &= element_row_added;
}
});
row_added = all_elements_get_row;
}
MutableColumnPtr DataTypeTuple::createColumn() const
{
size_t size = elems.size();

View File

@ -81,9 +81,6 @@ public:
DeserializeBinaryBulkStatePtr & state,
SubstreamsCache * cache) const override;
void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const override;
void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override;
MutableColumnPtr createColumn() const override;
Field getDefault() const override;

View File

@ -1,8 +1,6 @@
#include <DataTypes/DataTypeUUID.h>
#include <DataTypes/DataTypeFactory.h>
#include <Columns/ColumnsNumber.h>
#include <Formats/ProtobufReader.h>
#include <Formats/ProtobufWriter.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <Common/assert_cast.h>
@ -79,30 +77,6 @@ void DataTypeUUID::deserializeTextCSV(IColumn & column, ReadBuffer & istr, const
assert_cast<ColumnUInt128 &>(column).getData().push_back(value);
}
void DataTypeUUID::serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const
{
if (value_index)
return;
value_index = static_cast<bool>(protobuf.writeUUID(UUID(assert_cast<const ColumnUInt128 &>(column).getData()[row_num])));
}
void DataTypeUUID::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const
{
row_added = false;
UUID uuid;
if (!protobuf.readUUID(uuid))
return;
auto & container = assert_cast<ColumnUInt128 &>(column).getData();
if (allow_add_row)
{
container.emplace_back(uuid);
row_added = true;
}
else
container.back() = uuid;
}
bool DataTypeUUID::equals(const IDataType & rhs) const
{
return typeid(rhs) == typeid(*this);

View File

@ -26,8 +26,6 @@ public:
void deserializeTextJSON(IColumn & column, ReadBuffer & istr, const FormatSettings &) const override;
void serializeTextCSV(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
void deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const override;
void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const override;
void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override;
bool canBeUsedInBitOperations() const override { return true; }
bool canBeInsideNullable() const override { return true; }

View File

@ -4,8 +4,6 @@
#include <Common/typeid_cast.h>
#include <Core/DecimalFunctions.h>
#include <DataTypes/DataTypeFactory.h>
#include <Formats/ProtobufReader.h>
#include <Formats/ProtobufWriter.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <IO/readDecimalText.h>
@ -111,33 +109,6 @@ T DataTypeDecimal<T>::parseFromString(const String & str) const
return x;
}
template <typename T>
void DataTypeDecimal<T>::serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const
{
if (value_index)
return;
value_index = static_cast<bool>(protobuf.writeDecimal(assert_cast<const ColumnType &>(column).getData()[row_num], this->scale));
}
template <typename T>
void DataTypeDecimal<T>::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const
{
row_added = false;
T decimal;
if (!protobuf.readDecimal(decimal, this->precision, this->scale))
return;
auto & container = assert_cast<ColumnType &>(column).getData();
if (allow_add_row)
{
container.emplace_back(decimal);
row_added = true;
}
else
container.back() = decimal;
}
static DataTypePtr create(const ASTPtr & arguments)
{

View File

@ -46,9 +46,6 @@ public:
void deserializeText(IColumn & column, ReadBuffer & istr, const FormatSettings &) const override;
void deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings &) const override;
void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const override;
void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override;
bool equals(const IDataType & rhs) const override;
T parseFromString(const String & str) const;

View File

@ -26,9 +26,6 @@ class Field;
using DataTypePtr = std::shared_ptr<const IDataType>;
using DataTypes = std::vector<DataTypePtr>;
class ProtobufReader;
class ProtobufWriter;
struct NameAndTypePair;
@ -235,10 +232,6 @@ public:
/// If method will throw an exception, then column will be in same state as before call to method.
virtual void deserializeBinary(IColumn & column, ReadBuffer & istr) const = 0;
/** Serialize to a protobuf. */
virtual void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const = 0;
virtual void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const = 0;
/** Text serialization with escaping but without quoting.
*/
void serializeAsTextEscaped(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const;

View File

@ -34,8 +34,6 @@ public:
void deserializeBinaryBulk(IColumn &, ReadBuffer &, size_t, double) const override { throwNoSerialization(); }
void serializeText(const IColumn &, size_t, WriteBuffer &, const FormatSettings &) const override { throwNoSerialization(); }
void deserializeText(IColumn &, ReadBuffer &, const FormatSettings &) const override { throwNoSerialization(); }
void serializeProtobuf(const IColumn &, size_t, ProtobufWriter &, size_t &) const override { throwNoSerialization(); }
void deserializeProtobuf(IColumn &, ProtobufReader &, bool, bool &) const override { throwNoSerialization(); }
MutableColumnPtr createColumn() const override
{

View File

@ -120,7 +120,6 @@ struct FormatSettings
struct
{
bool write_row_delimiters = true;
/**
* Some buffers (kafka / rabbit) split the rows internally using callback,
* and always send one row per message, so we can push there formats
@ -128,7 +127,7 @@ struct FormatSettings
* we have to enforce exporting at most one row in the format output,
* because Protobuf without delimiters is not generally useful.
*/
bool allow_many_rows_no_delimiters = false;
bool allow_multiple_rows_without_delimiter = false;
} protobuf;
struct

View File

@ -1,55 +0,0 @@
#include "ProtobufColumnMatcher.h"
#if USE_PROTOBUF
#include <Common/Exception.h>
#include <google/protobuf/descriptor.pb.h>
#include <Poco/String.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NO_COMMON_COLUMNS_WITH_PROTOBUF_SCHEMA;
}
namespace
{
String columnNameToSearchableForm(const String & str)
{
return Poco::replace(Poco::toUpper(str), ".", "_");
}
}
namespace ProtobufColumnMatcher
{
namespace details
{
ColumnNameMatcher::ColumnNameMatcher(const std::vector<String> & column_names) : column_usage(column_names.size())
{
column_usage.resize(column_names.size(), false);
for (size_t i = 0; i != column_names.size(); ++i)
column_name_to_index_map.emplace(columnNameToSearchableForm(column_names[i]), i);
}
size_t ColumnNameMatcher::findColumn(const String & field_name)
{
auto it = column_name_to_index_map.find(columnNameToSearchableForm(field_name));
if (it == column_name_to_index_map.end())
return -1;
size_t column_index = it->second;
if (column_usage[column_index])
return -1;
column_usage[column_index] = true;
return column_index;
}
void throwNoCommonColumns()
{
throw Exception("No common columns with provided protobuf schema", ErrorCodes::NO_COMMON_COLUMNS_WITH_PROTOBUF_SCHEMA);
}
}
}
}
#endif

View File

@ -1,196 +0,0 @@
#pragma once
#if !defined(ARCADIA_BUILD)
# include "config_formats.h"
#endif
#if USE_PROTOBUF
# include <memory>
# include <unordered_map>
# include <vector>
# include <common/types.h>
# include <boost/blank.hpp>
# include <google/protobuf/descriptor.h>
# include <google/protobuf/descriptor.pb.h>
namespace google
{
namespace protobuf
{
class Descriptor;
class FieldDescriptor;
}
}
namespace DB
{
namespace ProtobufColumnMatcher
{
struct DefaultTraits
{
using MessageData = boost::blank;
using FieldData = boost::blank;
};
template <typename Traits = DefaultTraits>
struct Message;
/// Represents a field in a protobuf message.
template <typename Traits = DefaultTraits>
struct Field
{
const google::protobuf::FieldDescriptor * field_descriptor = nullptr;
/// Same as field_descriptor->number().
UInt32 field_number = 0;
/// Index of a column; either 'column_index' or 'nested_message' is set.
size_t column_index = -1;
std::unique_ptr<Message<Traits>> nested_message;
typename Traits::FieldData data;
};
/// Represents a protobuf message.
template <typename Traits>
struct Message
{
std::vector<Field<Traits>> fields;
/// Points to the parent message if this is a nested message.
Message * parent = nullptr;
size_t index_in_parent = -1;
typename Traits::MessageData data;
};
/// Utility function finding matching columns for each protobuf field.
template <typename Traits = DefaultTraits>
static std::unique_ptr<Message<Traits>> matchColumns(
const std::vector<String> & column_names,
const google::protobuf::Descriptor * message_type);
template <typename Traits = DefaultTraits>
static std::unique_ptr<Message<Traits>> matchColumns(
const std::vector<String> & column_names,
const google::protobuf::Descriptor * message_type,
std::vector<const google::protobuf::FieldDescriptor *> & field_descriptors_without_match);
namespace details
{
[[noreturn]] void throwNoCommonColumns();
class ColumnNameMatcher
{
public:
ColumnNameMatcher(const std::vector<String> & column_names);
size_t findColumn(const String & field_name);
private:
std::unordered_map<String, size_t> column_name_to_index_map;
std::vector<bool> column_usage;
};
template <typename Traits>
std::unique_ptr<Message<Traits>> matchColumnsRecursive(
ColumnNameMatcher & name_matcher,
const google::protobuf::Descriptor * message_type,
const String & field_name_prefix,
std::vector<const google::protobuf::FieldDescriptor *> * field_descriptors_without_match)
{
auto message = std::make_unique<Message<Traits>>();
for (int i = 0; i != message_type->field_count(); ++i)
{
const google::protobuf::FieldDescriptor * field_descriptor = message_type->field(i);
if ((field_descriptor->type() == google::protobuf::FieldDescriptor::TYPE_MESSAGE)
|| (field_descriptor->type() == google::protobuf::FieldDescriptor::TYPE_GROUP))
{
auto nested_message = matchColumnsRecursive<Traits>(
name_matcher,
field_descriptor->message_type(),
field_name_prefix + field_descriptor->name() + ".",
field_descriptors_without_match);
if (nested_message)
{
message->fields.emplace_back();
auto & current_field = message->fields.back();
current_field.field_number = field_descriptor->number();
current_field.field_descriptor = field_descriptor;
current_field.nested_message = std::move(nested_message);
current_field.nested_message->parent = message.get();
}
}
else
{
size_t column_index = name_matcher.findColumn(field_name_prefix + field_descriptor->name());
if (column_index == static_cast<size_t>(-1))
{
if (field_descriptors_without_match)
field_descriptors_without_match->emplace_back(field_descriptor);
}
else
{
message->fields.emplace_back();
auto & current_field = message->fields.back();
current_field.field_number = field_descriptor->number();
current_field.field_descriptor = field_descriptor;
current_field.column_index = column_index;
}
}
}
if (message->fields.empty())
return nullptr;
// Columns should be sorted by field_number, it's necessary for writing protobufs and useful reading protobufs.
std::sort(message->fields.begin(), message->fields.end(), [](const Field<Traits> & left, const Field<Traits> & right)
{
return left.field_number < right.field_number;
});
for (size_t i = 0; i != message->fields.size(); ++i)
{
auto & field = message->fields[i];
if (field.nested_message)
field.nested_message->index_in_parent = i;
}
return message;
}
}
template <typename Data>
static std::unique_ptr<Message<Data>> matchColumnsImpl(
const std::vector<String> & column_names,
const google::protobuf::Descriptor * message_type,
std::vector<const google::protobuf::FieldDescriptor *> * field_descriptors_without_match)
{
details::ColumnNameMatcher name_matcher(column_names);
auto message = details::matchColumnsRecursive<Data>(name_matcher, message_type, "", field_descriptors_without_match);
if (!message)
details::throwNoCommonColumns();
return message;
}
template <typename Data>
static std::unique_ptr<Message<Data>> matchColumns(
const std::vector<String> & column_names,
const google::protobuf::Descriptor * message_type)
{
return matchColumnsImpl<Data>(column_names, message_type, nullptr);
}
template <typename Data>
static std::unique_ptr<Message<Data>> matchColumns(
const std::vector<String> & column_names,
const google::protobuf::Descriptor * message_type,
std::vector<const google::protobuf::FieldDescriptor *> & field_descriptors_without_match)
{
return matchColumnsImpl<Data>(column_names, message_type, &field_descriptors_without_match);
}
}
}
#endif

File diff suppressed because it is too large Load Diff

View File

@ -1,258 +1,72 @@
#pragma once
#include <common/DayNum.h>
#include <Common/PODArray.h>
#include <Common/UInt128.h>
#include <Core/UUID.h>
#if !defined(ARCADIA_BUILD)
# include "config_formats.h"
# include "config_formats.h"
#endif
#if USE_PROTOBUF
# include <memory>
# include <IO/ReadBuffer.h>
# include <boost/noncopyable.hpp>
# include "ProtobufColumnMatcher.h"
# include <Common/PODArray.h>
# include <IO/ReadBuffer.h>
namespace google
{
namespace protobuf
{
class Descriptor;
}
}
namespace DB
{
class Arena;
class IAggregateFunction;
class ReadBuffer;
using AggregateDataPtr = char *;
using AggregateFunctionPtr = std::shared_ptr<IAggregateFunction>;
/** Deserializes a protobuf, tries to cast data types if necessarily.
*/
class ProtobufReader : private boost::noncopyable
{
public:
ProtobufReader(ReadBuffer & in_, const google::protobuf::Descriptor * message_type, const std::vector<String> & column_names, const bool use_length_delimiters_);
~ProtobufReader();
/// Should be called when we start reading a new message.
bool startMessage();
/// Ends reading a message.
void endMessage(bool ignore_errors = false);
/// Reads the column index.
/// The function returns false if there are no more columns to read (call endMessage() in this case).
bool readColumnIndex(size_t & column_index);
/// Reads a value which should be put to column at index received with readColumnIndex().
/// The function returns false if there are no more values to read now (call readColumnIndex() in this case).
bool readNumber(Int8 & value) { return current_converter->readInt8(value); }
bool readNumber(UInt8 & value) { return current_converter->readUInt8(value); }
bool readNumber(Int16 & value) { return current_converter->readInt16(value); }
bool readNumber(UInt16 & value) { return current_converter->readUInt16(value); }
bool readNumber(Int32 & value) { return current_converter->readInt32(value); }
bool readNumber(UInt32 & value) { return current_converter->readUInt32(value); }
bool readNumber(Int64 & value) { return current_converter->readInt64(value); }
bool readNumber(UInt64 & value) { return current_converter->readUInt64(value); }
bool readNumber(Int128 & value) { return current_converter->readInt128(value); }
bool readNumber(UInt128 & value) { return current_converter->readUInt128(value); }
bool readNumber(Int256 & value) { return current_converter->readInt256(value); }
bool readNumber(UInt256 & value) { return current_converter->readUInt256(value); }
bool readNumber(Float32 & value) { return current_converter->readFloat32(value); }
bool readNumber(Float64 & value) { return current_converter->readFloat64(value); }
bool readStringInto(PaddedPODArray<UInt8> & str) { return current_converter->readStringInto(str); }
void prepareEnumMapping(const std::vector<std::pair<std::string, Int8>> & name_value_pairs) { current_converter->prepareEnumMapping8(name_value_pairs); }
void prepareEnumMapping(const std::vector<std::pair<std::string, Int16>> & name_value_pairs) { current_converter->prepareEnumMapping16(name_value_pairs); }
bool readEnum(Int8 & value) { return current_converter->readEnum8(value); }
bool readEnum(Int16 & value) { return current_converter->readEnum16(value); }
bool readUUID(UUID & uuid) { return current_converter->readUUID(uuid); }
bool readDate(DayNum & date) { return current_converter->readDate(date); }
bool readDateTime(time_t & tm) { return current_converter->readDateTime(tm); }
bool readDateTime64(DateTime64 & tm, UInt32 scale) { return current_converter->readDateTime64(tm, scale); }
bool readDecimal(Decimal32 & decimal, UInt32 precision, UInt32 scale) { return current_converter->readDecimal32(decimal, precision, scale); }
bool readDecimal(Decimal64 & decimal, UInt32 precision, UInt32 scale) { return current_converter->readDecimal64(decimal, precision, scale); }
bool readDecimal(Decimal128 & decimal, UInt32 precision, UInt32 scale) { return current_converter->readDecimal128(decimal, precision, scale); }
bool readDecimal(Decimal256 & decimal, UInt32 precision, UInt32 scale) { return current_converter->readDecimal256(decimal, precision, scale); }
bool readAggregateFunction(const AggregateFunctionPtr & function, AggregateDataPtr place, Arena & arena) { return current_converter->readAggregateFunction(function, place, arena); }
/// Call it after calling one of the read*() function to determine if there are more values available for reading.
bool ALWAYS_INLINE canReadMoreValues() const { return simple_reader.canReadMoreValues(); }
private:
class SimpleReader
{
public:
SimpleReader(ReadBuffer & in_, const bool use_length_delimiters_);
bool startMessage();
void endMessage(bool ignore_errors);
void startNestedMessage();
void endNestedMessage();
bool readFieldNumber(UInt32 & field_number);
bool readInt(Int64 & value);
bool readSInt(Int64 & value);
bool readUInt(UInt64 & value);
template<typename T> bool readFixed(T & value);
bool readStringInto(PaddedPODArray<UInt8> & str);
bool ALWAYS_INLINE canReadMoreValues() const { return cursor < field_end; }
private:
void readBinary(void * data, size_t size);
void ignore(UInt64 num_bytes);
void moveCursorBackward(UInt64 num_bytes);
UInt64 ALWAYS_INLINE readVarint()
{
char c;
in.readStrict(c);
UInt64 first_byte = static_cast<UInt8>(c);
++cursor;
if (likely(!(c & 0x80)))
return first_byte;
return continueReadingVarint(first_byte);
}
UInt64 continueReadingVarint(UInt64 first_byte);
void ignoreVarint();
void ignoreGroup();
[[noreturn]] void throwUnknownFormat() const;
ReadBuffer & in;
Int64 cursor;
size_t current_message_level;
Int64 current_message_end;
std::vector<Int64> parent_message_ends;
Int64 field_end;
Int64 last_string_pos;
const bool use_length_delimiters;
};
class IConverter
{
public:
virtual ~IConverter() = default;
virtual bool readStringInto(PaddedPODArray<UInt8> &) = 0;
virtual bool readInt8(Int8&) = 0;
virtual bool readUInt8(UInt8 &) = 0;
virtual bool readInt16(Int16 &) = 0;
virtual bool readUInt16(UInt16 &) = 0;
virtual bool readInt32(Int32 &) = 0;
virtual bool readUInt32(UInt32 &) = 0;
virtual bool readInt64(Int64 &) = 0;
virtual bool readUInt64(UInt64 &) = 0;
virtual bool readInt128(Int128 &) = 0;
virtual bool readUInt128(UInt128 &) = 0;
virtual bool readInt256(Int256 &) = 0;
virtual bool readUInt256(UInt256 &) = 0;
virtual bool readFloat32(Float32 &) = 0;
virtual bool readFloat64(Float64 &) = 0;
virtual void prepareEnumMapping8(const std::vector<std::pair<std::string, Int8>> &) = 0;
virtual void prepareEnumMapping16(const std::vector<std::pair<std::string, Int16>> &) = 0;
virtual bool readEnum8(Int8 &) = 0;
virtual bool readEnum16(Int16 &) = 0;
virtual bool readUUID(UUID &) = 0;
virtual bool readDate(DayNum &) = 0;
virtual bool readDateTime(time_t &) = 0;
virtual bool readDateTime64(DateTime64 &, UInt32) = 0;
virtual bool readDecimal32(Decimal32 &, UInt32, UInt32) = 0;
virtual bool readDecimal64(Decimal64 &, UInt32, UInt32) = 0;
virtual bool readDecimal128(Decimal128 &, UInt32, UInt32) = 0;
virtual bool readDecimal256(Decimal256 &, UInt32, UInt32) = 0;
virtual bool readAggregateFunction(const AggregateFunctionPtr &, AggregateDataPtr, Arena &) = 0;
};
class ConverterBaseImpl;
class ConverterFromString;
template<int field_type_id, typename FromType> class ConverterFromNumber;
class ConverterFromBool;
class ConverterFromEnum;
struct ColumnMatcherTraits
{
struct FieldData
{
std::unique_ptr<IConverter> converter;
};
struct MessageData
{
std::unordered_map<UInt32, const ProtobufColumnMatcher::Field<ColumnMatcherTraits>*> field_number_to_field_map;
};
};
using Message = ProtobufColumnMatcher::Message<ColumnMatcherTraits>;
using Field = ProtobufColumnMatcher::Field<ColumnMatcherTraits>;
void setTraitsDataAfterMatchingColumns(Message * message);
template <int field_type_id>
std::unique_ptr<IConverter> createConverter(const google::protobuf::FieldDescriptor * field);
SimpleReader simple_reader;
std::unique_ptr<Message> root_message;
Message* current_message = nullptr;
size_t current_field_index = 0;
IConverter* current_converter = nullptr;
};
}
#else
namespace DB
{
class Arena;
class IAggregateFunction;
class ReadBuffer;
using AggregateDataPtr = char *;
using AggregateFunctionPtr = std::shared_ptr<IAggregateFunction>;
/// Utility class for reading in the Protobuf format.
/// Knows nothing about protobuf schemas, just provides useful functions to serialize data.
class ProtobufReader
{
public:
bool startMessage() { return false; }
void endMessage() {}
bool readColumnIndex(size_t &) { return false; }
bool readNumber(Int8 &) { return false; }
bool readNumber(UInt8 &) { return false; }
bool readNumber(Int16 &) { return false; }
bool readNumber(UInt16 &) { return false; }
bool readNumber(Int32 &) { return false; }
bool readNumber(UInt32 &) { return false; }
bool readNumber(Int64 &) { return false; }
bool readNumber(UInt64 &) { return false; }
bool readNumber(Int128 &) { return false; }
bool readNumber(UInt128 &) { return false; }
bool readNumber(Int256 &) { return false; }
bool readNumber(UInt256 &) { return false; }
bool readNumber(Float32 &) { return false; }
bool readNumber(Float64 &) { return false; }
bool readStringInto(PaddedPODArray<UInt8> &) { return false; }
void prepareEnumMapping(const std::vector<std::pair<std::string, Int8>> &) {}
void prepareEnumMapping(const std::vector<std::pair<std::string, Int16>> &) {}
bool readEnum(Int8 &) { return false; }
bool readEnum(Int16 &) { return false; }
bool readUUID(UUID &) { return false; }
bool readDate(DayNum &) { return false; }
bool readDateTime(time_t &) { return false; }
bool readDateTime64(DateTime64 & /*tm*/, UInt32 /*scale*/) { return false; }
bool readDecimal(Decimal32 &, UInt32, UInt32) { return false; }
bool readDecimal(Decimal64 &, UInt32, UInt32) { return false; }
bool readDecimal(Decimal128 &, UInt32, UInt32) { return false; }
bool readDecimal(Decimal256 &, UInt32, UInt32) { return false; }
bool readAggregateFunction(const AggregateFunctionPtr &, AggregateDataPtr, Arena &) { return false; }
bool canReadMoreValues() const { return false; }
ProtobufReader(ReadBuffer & in_);
void startMessage(bool with_length_delimiter_);
void endMessage(bool ignore_errors);
void startNestedMessage();
void endNestedMessage();
bool readFieldNumber(int & field_number);
Int64 readInt();
Int64 readSInt();
UInt64 readUInt();
template<typename T> T readFixed();
void readString(String & str);
void readStringAndAppend(PaddedPODArray<UInt8> & str);
bool eof() const { return in.eof(); }
private:
void readBinary(void * data, size_t size);
void ignore(UInt64 num_bytes);
void ignoreAll();
void moveCursorBackward(UInt64 num_bytes);
UInt64 ALWAYS_INLINE readVarint()
{
char c;
in.readStrict(c);
UInt64 first_byte = static_cast<UInt8>(c);
++cursor;
if (likely(!(c & 0x80)))
return first_byte;
return continueReadingVarint(first_byte);
}
UInt64 continueReadingVarint(UInt64 first_byte);
void ignoreVarint();
void ignoreGroup();
[[noreturn]] void throwUnknownFormat() const;
ReadBuffer & in;
Int64 cursor = 0;
bool root_message_has_length_delimiter = false;
size_t current_message_level = 0;
Int64 current_message_end = 0;
std::vector<Int64> parent_message_ends;
int field_number = 0;
int next_field_number = 0;
Int64 field_end = 0;
};
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,52 @@
#pragma once
#if !defined(ARCADIA_BUILD)
# include "config_formats.h"
#endif
#if USE_PROTOBUF
# include <Columns/IColumn.h>
namespace google::protobuf { class Descriptor; }
namespace DB
{
class ProtobufReader;
class ProtobufWriter;
class IDataType;
using DataTypePtr = std::shared_ptr<const IDataType>;
using DataTypes = std::vector<DataTypePtr>;
/// Utility class, does all the work for serialization in the Protobuf format.
class ProtobufSerializer
{
public:
virtual ~ProtobufSerializer() = default;
virtual void setColumns(const ColumnPtr * columns, size_t num_columns) = 0;
virtual void writeRow(size_t row_num) = 0;
virtual void setColumns(const MutableColumnPtr * columns, size_t num_columns) = 0;
virtual void readRow(size_t row_num) = 0;
virtual void insertDefaults(size_t row_num) = 0;
static std::unique_ptr<ProtobufSerializer> create(
const Strings & column_names,
const DataTypes & data_types,
std::vector<size_t> & missing_column_indices,
const google::protobuf::Descriptor & message_descriptor,
bool with_length_delimiter,
ProtobufReader & reader);
static std::unique_ptr<ProtobufSerializer> create(
const Strings & column_names,
const DataTypes & data_types,
const google::protobuf::Descriptor & message_descriptor,
bool with_length_delimiter,
ProtobufWriter & writer);
};
}
#endif

View File

@ -1,29 +1,11 @@
#include "ProtobufWriter.h"
#if USE_PROTOBUF
# include <cassert>
# include <optional>
# include <math.h>
# include <AggregateFunctions/IAggregateFunction.h>
# include <DataTypes/DataTypesDecimal.h>
# include <IO/ReadHelpers.h>
# include <IO/WriteHelpers.h>
# include <boost/numeric/conversion/cast.hpp>
# include <google/protobuf/descriptor.h>
# include <google/protobuf/descriptor.pb.h>
# include <IO/WriteHelpers.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int NO_DATA_FOR_REQUIRED_PROTOBUF_FIELD;
extern const int PROTOBUF_BAD_CAST;
extern const int PROTOBUF_FIELD_NOT_REPEATED;
}
namespace
{
constexpr size_t MAX_VARINT_SIZE = 10;
@ -81,66 +63,24 @@ namespace
}
void writeFieldNumber(UInt32 field_number, WireType wire_type, PODArray<UInt8> & buf) { writeVarint((field_number << 3) | wire_type, buf); }
// Should we pack repeated values while storing them.
// It depends on type of the field in the protobuf schema and the syntax of that schema.
bool shouldPackRepeated(const google::protobuf::FieldDescriptor * field)
{
if (!field->is_repeated())
return false;
switch (field->type())
{
case google::protobuf::FieldDescriptor::TYPE_INT32:
case google::protobuf::FieldDescriptor::TYPE_UINT32:
case google::protobuf::FieldDescriptor::TYPE_SINT32:
case google::protobuf::FieldDescriptor::TYPE_INT64:
case google::protobuf::FieldDescriptor::TYPE_UINT64:
case google::protobuf::FieldDescriptor::TYPE_SINT64:
case google::protobuf::FieldDescriptor::TYPE_FIXED32:
case google::protobuf::FieldDescriptor::TYPE_SFIXED32:
case google::protobuf::FieldDescriptor::TYPE_FIXED64:
case google::protobuf::FieldDescriptor::TYPE_SFIXED64:
case google::protobuf::FieldDescriptor::TYPE_FLOAT:
case google::protobuf::FieldDescriptor::TYPE_DOUBLE:
case google::protobuf::FieldDescriptor::TYPE_BOOL:
case google::protobuf::FieldDescriptor::TYPE_ENUM:
break;
default:
return false;
}
if (field->options().has_packed())
return field->options().packed();
return field->file()->syntax() == google::protobuf::FileDescriptor::SYNTAX_PROTO3;
}
// Should we omit null values (zero for numbers / empty string for strings) while storing them.
bool shouldSkipNullValue(const google::protobuf::FieldDescriptor * field)
{
return field->is_optional() && (field->file()->syntax() == google::protobuf::FileDescriptor::SYNTAX_PROTO3);
}
}
// SimpleWriter is an utility class to serialize protobufs.
// Knows nothing about protobuf schemas, just provides useful functions to serialize data.
ProtobufWriter::SimpleWriter::SimpleWriter(WriteBuffer & out_, const bool use_length_delimiters_)
ProtobufWriter::ProtobufWriter(WriteBuffer & out_)
: out(out_)
, current_piece_start(0)
, num_bytes_skipped(0)
, use_length_delimiters(use_length_delimiters_)
{
}
ProtobufWriter::SimpleWriter::~SimpleWriter() = default;
ProtobufWriter::~ProtobufWriter() = default;
void ProtobufWriter::SimpleWriter::startMessage()
void ProtobufWriter::startMessage()
{
}
void ProtobufWriter::SimpleWriter::endMessage()
void ProtobufWriter::endMessage(bool with_length_delimiter)
{
pieces.emplace_back(current_piece_start, buffer.size());
if (use_length_delimiters)
if (with_length_delimiter)
{
size_t size_of_message = buffer.size() - num_bytes_skipped;
writeVarint(size_of_message, out);
@ -154,7 +94,7 @@ void ProtobufWriter::SimpleWriter::endMessage()
current_piece_start = 0;
}
void ProtobufWriter::SimpleWriter::startNestedMessage()
void ProtobufWriter::startNestedMessage()
{
nested_infos.emplace_back(pieces.size(), num_bytes_skipped);
pieces.emplace_back(current_piece_start, buffer.size());
@ -167,7 +107,7 @@ void ProtobufWriter::SimpleWriter::startNestedMessage()
num_bytes_skipped = NESTED_MESSAGE_PADDING;
}
void ProtobufWriter::SimpleWriter::endNestedMessage(UInt32 field_number, bool is_group, bool skip_if_empty)
void ProtobufWriter::endNestedMessage(int field_number, bool is_group, bool skip_if_empty)
{
const auto & nested_info = nested_infos.back();
size_t num_pieces_at_start = nested_info.num_pieces_at_start;
@ -203,8 +143,13 @@ void ProtobufWriter::SimpleWriter::endNestedMessage(UInt32 field_number, bool is
num_bytes_skipped += num_bytes_skipped_at_start - num_bytes_inserted;
}
void ProtobufWriter::SimpleWriter::writeUInt(UInt32 field_number, UInt64 value)
void ProtobufWriter::writeUInt(int field_number, UInt64 value)
{
if (in_repeated_pack)
{
writeVarint(value, buffer);
return;
}
size_t old_size = buffer.size();
buffer.reserve(old_size + 2 * MAX_VARINT_SIZE);
UInt8 * ptr = buffer.data() + old_size;
@ -213,20 +158,27 @@ void ProtobufWriter::SimpleWriter::writeUInt(UInt32 field_number, UInt64 value)
buffer.resize_assume_reserved(ptr - buffer.data());
}
void ProtobufWriter::SimpleWriter::writeInt(UInt32 field_number, Int64 value)
void ProtobufWriter::writeInt(int field_number, Int64 value)
{
writeUInt(field_number, static_cast<UInt64>(value));
}
void ProtobufWriter::SimpleWriter::writeSInt(UInt32 field_number, Int64 value)
void ProtobufWriter::writeSInt(int field_number, Int64 value)
{
writeUInt(field_number, encodeZigZag(value));
}
template <typename T>
void ProtobufWriter::SimpleWriter::writeFixed(UInt32 field_number, T value)
void ProtobufWriter::writeFixed(int field_number, T value)
{
static_assert((sizeof(T) == 4) || (sizeof(T) == 8));
if (in_repeated_pack)
{
size_t old_size = buffer.size();
buffer.resize(old_size + sizeof(T));
memcpy(buffer.data() + old_size, &value, sizeof(T));
return;
}
constexpr WireType wire_type = (sizeof(T) == 4) ? BITS32 : BITS64;
size_t old_size = buffer.size();
buffer.reserve(old_size + MAX_VARINT_SIZE + sizeof(T));
@ -237,19 +189,27 @@ void ProtobufWriter::SimpleWriter::writeFixed(UInt32 field_number, T value)
buffer.resize_assume_reserved(ptr - buffer.data());
}
void ProtobufWriter::SimpleWriter::writeString(UInt32 field_number, const StringRef & str)
template void ProtobufWriter::writeFixed<Int32>(int field_number, Int32 value);
template void ProtobufWriter::writeFixed<UInt32>(int field_number, UInt32 value);
template void ProtobufWriter::writeFixed<Int64>(int field_number, Int64 value);
template void ProtobufWriter::writeFixed<UInt64>(int field_number, UInt64 value);
template void ProtobufWriter::writeFixed<Float32>(int field_number, Float32 value);
template void ProtobufWriter::writeFixed<Float64>(int field_number, Float64 value);
void ProtobufWriter::writeString(int field_number, const std::string_view & str)
{
size_t length = str.length();
size_t old_size = buffer.size();
buffer.reserve(old_size + 2 * MAX_VARINT_SIZE + str.size);
buffer.reserve(old_size + 2 * MAX_VARINT_SIZE + length);
UInt8 * ptr = buffer.data() + old_size;
ptr = writeFieldNumber(field_number, LENGTH_DELIMITED, ptr);
ptr = writeVarint(str.size, ptr);
memcpy(ptr, str.data, str.size);
ptr += str.size;
ptr = writeVarint(length, ptr);
memcpy(ptr, str.data(), length);
ptr += length;
buffer.resize_assume_reserved(ptr - buffer.data());
}
void ProtobufWriter::SimpleWriter::startRepeatedPack()
void ProtobufWriter::startRepeatedPack()
{
pieces.emplace_back(current_piece_start, buffer.size());
@ -259,17 +219,19 @@ void ProtobufWriter::SimpleWriter::startRepeatedPack()
current_piece_start = buffer.size() + REPEATED_PACK_PADDING;
buffer.resize(current_piece_start);
num_bytes_skipped += REPEATED_PACK_PADDING;
in_repeated_pack = true;
}
void ProtobufWriter::SimpleWriter::endRepeatedPack(UInt32 field_number)
void ProtobufWriter::endRepeatedPack(int field_number, bool skip_if_empty)
{
size_t size = buffer.size() - current_piece_start;
if (!size)
if (!size && skip_if_empty)
{
current_piece_start = pieces.back().start;
buffer.resize(pieces.back().end);
pieces.pop_back();
num_bytes_skipped -= REPEATED_PACK_PADDING;
in_repeated_pack = false;
return;
}
UInt8 * ptr = &buffer[pieces.back().end];
@ -278,726 +240,7 @@ void ProtobufWriter::SimpleWriter::endRepeatedPack(UInt32 field_number)
size_t num_bytes_inserted = endptr - ptr;
pieces.back().end += num_bytes_inserted;
num_bytes_skipped -= num_bytes_inserted;
}
void ProtobufWriter::SimpleWriter::addUIntToRepeatedPack(UInt64 value)
{
writeVarint(value, buffer);
}
void ProtobufWriter::SimpleWriter::addIntToRepeatedPack(Int64 value)
{
writeVarint(static_cast<UInt64>(value), buffer);
}
void ProtobufWriter::SimpleWriter::addSIntToRepeatedPack(Int64 value)
{
writeVarint(encodeZigZag(value), buffer);
}
template <typename T>
void ProtobufWriter::SimpleWriter::addFixedToRepeatedPack(T value)
{
static_assert((sizeof(T) == 4) || (sizeof(T) == 8));
size_t old_size = buffer.size();
buffer.resize(old_size + sizeof(T));
memcpy(buffer.data() + old_size, &value, sizeof(T));
}
// Implementation for a converter from any DB data type to any protobuf field type.
class ProtobufWriter::ConverterBaseImpl : public IConverter
{
public:
ConverterBaseImpl(SimpleWriter & simple_writer_, const google::protobuf::FieldDescriptor * field_)
: simple_writer(simple_writer_), field(field_)
{
field_number = field->number();
}
virtual void writeString(const StringRef &) override { cannotConvertType("String"); }
virtual void writeInt8(Int8) override { cannotConvertType("Int8"); }
virtual void writeUInt8(UInt8) override { cannotConvertType("UInt8"); }
virtual void writeInt16(Int16) override { cannotConvertType("Int16"); }
virtual void writeUInt16(UInt16) override { cannotConvertType("UInt16"); }
virtual void writeInt32(Int32) override { cannotConvertType("Int32"); }
virtual void writeUInt32(UInt32) override { cannotConvertType("UInt32"); }
virtual void writeInt64(Int64) override { cannotConvertType("Int64"); }
virtual void writeUInt64(UInt64) override { cannotConvertType("UInt64"); }
virtual void writeInt128(Int128) override { cannotConvertType("Int128"); }
virtual void writeUInt128(const UInt128 &) override { cannotConvertType("UInt128"); }
virtual void writeInt256(const Int256 &) override { cannotConvertType("Int256"); }
virtual void writeUInt256(const UInt256 &) override { cannotConvertType("UInt256"); }
virtual void writeFloat32(Float32) override { cannotConvertType("Float32"); }
virtual void writeFloat64(Float64) override { cannotConvertType("Float64"); }
virtual void prepareEnumMapping8(const std::vector<std::pair<std::string, Int8>> &) override {}
virtual void prepareEnumMapping16(const std::vector<std::pair<std::string, Int16>> &) override {}
virtual void writeEnum8(Int8) override { cannotConvertType("Enum"); }
virtual void writeEnum16(Int16) override { cannotConvertType("Enum"); }
virtual void writeUUID(const UUID &) override { cannotConvertType("UUID"); }
virtual void writeDate(DayNum) override { cannotConvertType("Date"); }
virtual void writeDateTime(time_t) override { cannotConvertType("DateTime"); }
virtual void writeDateTime64(DateTime64, UInt32) override { cannotConvertType("DateTime64"); }
virtual void writeDecimal32(Decimal32, UInt32) override { cannotConvertType("Decimal32"); }
virtual void writeDecimal64(Decimal64, UInt32) override { cannotConvertType("Decimal64"); }
virtual void writeDecimal128(const Decimal128 &, UInt32) override { cannotConvertType("Decimal128"); }
virtual void writeDecimal256(const Decimal256 &, UInt32) override { cannotConvertType("Decimal256"); }
virtual void writeAggregateFunction(const AggregateFunctionPtr &, ConstAggregateDataPtr) override { cannotConvertType("AggregateFunction"); }
protected:
[[noreturn]] void cannotConvertType(const String & type_name)
{
throw Exception(
"Could not convert data type '" + type_name + "' to protobuf type '" + field->type_name() + "' (field: " + field->name() + ")",
ErrorCodes::PROTOBUF_BAD_CAST);
}
[[noreturn]] void cannotConvertValue(const String & value)
{
throw Exception(
"Could not convert value '" + value + "' to protobuf type '" + field->type_name() + "' (field: " + field->name() + ")",
ErrorCodes::PROTOBUF_BAD_CAST);
}
template <typename To, typename From>
To numericCast(From value)
{
if constexpr (std::is_same_v<To, From>)
return value;
To result;
try
{
result = boost::numeric_cast<To>(value);
}
catch (boost::numeric::bad_numeric_cast &)
{
cannotConvertValue(toString(value));
}
return result;
}
template <typename To>
To parseFromString(const StringRef & str)
{
To result;
try
{
result = ::DB::parse<To>(str.data, str.size);
}
catch (...)
{
cannotConvertValue(str.toString());
}
return result;
}
SimpleWriter & simple_writer;
const google::protobuf::FieldDescriptor * field;
UInt32 field_number;
};
template <bool skip_null_value>
class ProtobufWriter::ConverterToString : public ConverterBaseImpl
{
public:
using ConverterBaseImpl::ConverterBaseImpl;
void writeString(const StringRef & str) override { writeField(str); }
void writeInt8(Int8 value) override { convertToStringAndWriteField(value); }
void writeUInt8(UInt8 value) override { convertToStringAndWriteField(value); }
void writeInt16(Int16 value) override { convertToStringAndWriteField(value); }
void writeUInt16(UInt16 value) override { convertToStringAndWriteField(value); }
void writeInt32(Int32 value) override { convertToStringAndWriteField(value); }
void writeUInt32(UInt32 value) override { convertToStringAndWriteField(value); }
void writeInt64(Int64 value) override { convertToStringAndWriteField(value); }
void writeUInt64(UInt64 value) override { convertToStringAndWriteField(value); }
void writeFloat32(Float32 value) override { convertToStringAndWriteField(value); }
void writeFloat64(Float64 value) override { convertToStringAndWriteField(value); }
void prepareEnumMapping8(const std::vector<std::pair<String, Int8>> & name_value_pairs) override
{
prepareEnumValueToNameMap(name_value_pairs);
}
void prepareEnumMapping16(const std::vector<std::pair<String, Int16>> & name_value_pairs) override
{
prepareEnumValueToNameMap(name_value_pairs);
}
void writeEnum8(Int8 value) override { writeEnum16(value); }
void writeEnum16(Int16 value) override
{
auto it = enum_value_to_name_map->find(value);
if (it == enum_value_to_name_map->end())
cannotConvertValue(toString(value));
writeField(it->second);
}
void writeUUID(const UUID & uuid) override { convertToStringAndWriteField(uuid); }
void writeDate(DayNum date) override { convertToStringAndWriteField(date); }
void writeDateTime(time_t tm) override
{
writeDateTimeText(tm, text_buffer);
writeField(text_buffer.stringRef());
text_buffer.restart();
}
void writeDateTime64(DateTime64 date_time, UInt32 scale) override
{
writeDateTimeText(date_time, scale, text_buffer);
writeField(text_buffer.stringRef());
text_buffer.restart();
}
void writeDecimal32(Decimal32 decimal, UInt32 scale) override { writeDecimal(decimal, scale); }
void writeDecimal64(Decimal64 decimal, UInt32 scale) override { writeDecimal(decimal, scale); }
void writeDecimal128(const Decimal128 & decimal, UInt32 scale) override { writeDecimal(decimal, scale); }
void writeAggregateFunction(const AggregateFunctionPtr & function, ConstAggregateDataPtr place) override
{
function->serialize(place, text_buffer);
writeField(text_buffer.stringRef());
text_buffer.restart();
}
private:
template <typename T>
void convertToStringAndWriteField(T value)
{
writeText(value, text_buffer);
writeField(text_buffer.stringRef());
text_buffer.restart();
}
template <typename T>
void writeDecimal(const Decimal<T> & decimal, UInt32 scale)
{
writeText(decimal, scale, text_buffer);
writeField(text_buffer.stringRef());
text_buffer.restart();
}
template <typename T>
void prepareEnumValueToNameMap(const std::vector<std::pair<String, T>> & name_value_pairs)
{
if (enum_value_to_name_map.has_value())
return;
enum_value_to_name_map.emplace();
for (const auto & name_value_pair : name_value_pairs)
enum_value_to_name_map->emplace(name_value_pair.second, name_value_pair.first);
}
void writeField(const StringRef & str)
{
if constexpr (skip_null_value)
{
if (!str.size)
return;
}
simple_writer.writeString(field_number, str);
}
WriteBufferFromOwnString text_buffer;
std::optional<std::unordered_map<Int16, String>> enum_value_to_name_map;
};
# define PROTOBUF_WRITER_CREATE_CONVERTER_SPECIALIZATION_FOR_STRINGS(field_type_id) \
template <> \
std::unique_ptr<ProtobufWriter::IConverter> ProtobufWriter::createConverter<field_type_id>( \
const google::protobuf::FieldDescriptor * field) \
{ \
if (shouldSkipNullValue(field)) \
return std::make_unique<ConverterToString<true>>(simple_writer, field); \
else \
return std::make_unique<ConverterToString<false>>(simple_writer, field); \
}
PROTOBUF_WRITER_CREATE_CONVERTER_SPECIALIZATION_FOR_STRINGS(google::protobuf::FieldDescriptor::TYPE_STRING)
PROTOBUF_WRITER_CREATE_CONVERTER_SPECIALIZATION_FOR_STRINGS(google::protobuf::FieldDescriptor::TYPE_BYTES)
# undef PROTOBUF_WRITER_CREATE_CONVERTER_SPECIALIZATION_FOR_STRINGS
template <int field_type_id, typename ToType, bool skip_null_value, bool pack_repeated>
class ProtobufWriter::ConverterToNumber : public ConverterBaseImpl
{
public:
using ConverterBaseImpl::ConverterBaseImpl;
void writeString(const StringRef & str) override { writeField(parseFromString<ToType>(str)); }
void writeInt8(Int8 value) override { castNumericAndWriteField(value); }
void writeUInt8(UInt8 value) override { castNumericAndWriteField(value); }
void writeInt16(Int16 value) override { castNumericAndWriteField(value); }
void writeUInt16(UInt16 value) override { castNumericAndWriteField(value); }
void writeInt32(Int32 value) override { castNumericAndWriteField(value); }
void writeUInt32(UInt32 value) override { castNumericAndWriteField(value); }
void writeInt64(Int64 value) override { castNumericAndWriteField(value); }
void writeUInt64(UInt64 value) override { castNumericAndWriteField(value); }
void writeFloat32(Float32 value) override { castNumericAndWriteField(value); }
void writeFloat64(Float64 value) override { castNumericAndWriteField(value); }
void writeEnum8(Int8 value) override { writeEnum16(value); }
void writeEnum16(Int16 value) override
{
if constexpr (!is_integer_v<ToType>)
cannotConvertType("Enum"); // It's not correct to convert enum to floating point.
castNumericAndWriteField(value);
}
void writeDate(DayNum date) override { castNumericAndWriteField(static_cast<UInt16>(date)); }
void writeDateTime(time_t tm) override { castNumericAndWriteField(tm); }
void writeDateTime64(DateTime64 date_time, UInt32 scale) override { writeDecimal(date_time, scale); }
void writeDecimal32(Decimal32 decimal, UInt32 scale) override { writeDecimal(decimal, scale); }
void writeDecimal64(Decimal64 decimal, UInt32 scale) override { writeDecimal(decimal, scale); }
void writeDecimal128(const Decimal128 & decimal, UInt32 scale) override { writeDecimal(decimal, scale); }
private:
template <typename FromType>
void castNumericAndWriteField(FromType value)
{
writeField(numericCast<ToType>(value));
}
template <typename S>
void writeDecimal(const Decimal<S> & decimal, UInt32 scale)
{
castNumericAndWriteField(DecimalUtils::convertTo<ToType>(decimal, scale));
}
void writeField(ToType value)
{
if constexpr (skip_null_value)
{
if (value == 0)
return;
}
if constexpr (((field_type_id == google::protobuf::FieldDescriptor::TYPE_INT32) && std::is_same_v<ToType, Int32>)
|| ((field_type_id == google::protobuf::FieldDescriptor::TYPE_INT64) && std::is_same_v<ToType, Int64>))
{
if constexpr (pack_repeated)
simple_writer.addIntToRepeatedPack(value);
else
simple_writer.writeInt(field_number, value);
}
else if constexpr (((field_type_id == google::protobuf::FieldDescriptor::TYPE_SINT32) && std::is_same_v<ToType, Int32>)
|| ((field_type_id == google::protobuf::FieldDescriptor::TYPE_SINT64) && std::is_same_v<ToType, Int64>))
{
if constexpr (pack_repeated)
simple_writer.addSIntToRepeatedPack(value);
else
simple_writer.writeSInt(field_number, value);
}
else if constexpr (((field_type_id == google::protobuf::FieldDescriptor::TYPE_UINT32) && std::is_same_v<ToType, UInt32>)
|| ((field_type_id == google::protobuf::FieldDescriptor::TYPE_UINT64) && std::is_same_v<ToType, UInt64>))
{
if constexpr (pack_repeated)
simple_writer.addUIntToRepeatedPack(value);
else
simple_writer.writeUInt(field_number, value);
}
else
{
static_assert(((field_type_id == google::protobuf::FieldDescriptor::TYPE_FIXED32) && std::is_same_v<ToType, UInt32>)
|| ((field_type_id == google::protobuf::FieldDescriptor::TYPE_SFIXED32) && std::is_same_v<ToType, Int32>)
|| ((field_type_id == google::protobuf::FieldDescriptor::TYPE_FIXED64) && std::is_same_v<ToType, UInt64>)
|| ((field_type_id == google::protobuf::FieldDescriptor::TYPE_SFIXED64) && std::is_same_v<ToType, Int64>)
|| ((field_type_id == google::protobuf::FieldDescriptor::TYPE_FLOAT) && std::is_same_v<ToType, float>)
|| ((field_type_id == google::protobuf::FieldDescriptor::TYPE_DOUBLE) && std::is_same_v<ToType, double>));
if constexpr (pack_repeated)
simple_writer.addFixedToRepeatedPack(value);
else
simple_writer.writeFixed(field_number, value);
}
}
};
# define PROTOBUF_WRITER_CREATE_CONVERTER_SPECIALIZATION_FOR_NUMBERS(field_type_id, field_type) \
template <> \
std::unique_ptr<ProtobufWriter::IConverter> ProtobufWriter::createConverter<field_type_id>( \
const google::protobuf::FieldDescriptor * field) \
{ \
if (shouldSkipNullValue(field)) \
return std::make_unique<ConverterToNumber<field_type_id, field_type, true, false>>(simple_writer, field); \
else if (shouldPackRepeated(field)) \
return std::make_unique<ConverterToNumber<field_type_id, field_type, false, true>>(simple_writer, field); \
else \
return std::make_unique<ConverterToNumber<field_type_id, field_type, false, false>>(simple_writer, field); \
}
PROTOBUF_WRITER_CREATE_CONVERTER_SPECIALIZATION_FOR_NUMBERS(google::protobuf::FieldDescriptor::TYPE_INT32, Int32);
PROTOBUF_WRITER_CREATE_CONVERTER_SPECIALIZATION_FOR_NUMBERS(google::protobuf::FieldDescriptor::TYPE_SINT32, Int32);
PROTOBUF_WRITER_CREATE_CONVERTER_SPECIALIZATION_FOR_NUMBERS(google::protobuf::FieldDescriptor::TYPE_UINT32, UInt32);
PROTOBUF_WRITER_CREATE_CONVERTER_SPECIALIZATION_FOR_NUMBERS(google::protobuf::FieldDescriptor::TYPE_INT64, Int64);
PROTOBUF_WRITER_CREATE_CONVERTER_SPECIALIZATION_FOR_NUMBERS(google::protobuf::FieldDescriptor::TYPE_SINT64, Int64);
PROTOBUF_WRITER_CREATE_CONVERTER_SPECIALIZATION_FOR_NUMBERS(google::protobuf::FieldDescriptor::TYPE_UINT64, UInt64);
PROTOBUF_WRITER_CREATE_CONVERTER_SPECIALIZATION_FOR_NUMBERS(google::protobuf::FieldDescriptor::TYPE_FIXED32, UInt32);
PROTOBUF_WRITER_CREATE_CONVERTER_SPECIALIZATION_FOR_NUMBERS(google::protobuf::FieldDescriptor::TYPE_SFIXED32, Int32);
PROTOBUF_WRITER_CREATE_CONVERTER_SPECIALIZATION_FOR_NUMBERS(google::protobuf::FieldDescriptor::TYPE_FIXED64, UInt64);
PROTOBUF_WRITER_CREATE_CONVERTER_SPECIALIZATION_FOR_NUMBERS(google::protobuf::FieldDescriptor::TYPE_SFIXED64, Int64);
PROTOBUF_WRITER_CREATE_CONVERTER_SPECIALIZATION_FOR_NUMBERS(google::protobuf::FieldDescriptor::TYPE_FLOAT, float);
PROTOBUF_WRITER_CREATE_CONVERTER_SPECIALIZATION_FOR_NUMBERS(google::protobuf::FieldDescriptor::TYPE_DOUBLE, double);
# undef PROTOBUF_WRITER_CREATE_CONVERTER_SPECIALIZATION_FOR_NUMBERS
template <bool skip_null_value, bool pack_repeated>
class ProtobufWriter::ConverterToBool : public ConverterBaseImpl
{
public:
using ConverterBaseImpl::ConverterBaseImpl;
void writeString(const StringRef & str) override
{
if (str == "true")
writeField(true);
else if (str == "false")
writeField(false);
else
cannotConvertValue(str.toString());
}
void writeInt8(Int8 value) override { convertToBoolAndWriteField(value); }
void writeUInt8(UInt8 value) override { convertToBoolAndWriteField(value); }
void writeInt16(Int16 value) override { convertToBoolAndWriteField(value); }
void writeUInt16(UInt16 value) override { convertToBoolAndWriteField(value); }
void writeInt32(Int32 value) override { convertToBoolAndWriteField(value); }
void writeUInt32(UInt32 value) override { convertToBoolAndWriteField(value); }
void writeInt64(Int64 value) override { convertToBoolAndWriteField(value); }
void writeUInt64(UInt64 value) override { convertToBoolAndWriteField(value); }
void writeFloat32(Float32 value) override { convertToBoolAndWriteField(value); }
void writeFloat64(Float64 value) override { convertToBoolAndWriteField(value); }
void writeDecimal32(Decimal32 decimal, UInt32) override { convertToBoolAndWriteField(decimal.value); }
void writeDecimal64(Decimal64 decimal, UInt32) override { convertToBoolAndWriteField(decimal.value); }
void writeDecimal128(const Decimal128 & decimal, UInt32) override { convertToBoolAndWriteField(decimal.value); }
private:
template <typename T>
void convertToBoolAndWriteField(T value)
{
writeField(static_cast<bool>(value));
}
void writeField(bool b)
{
if constexpr (skip_null_value)
{
if (!b)
return;
}
if constexpr (pack_repeated)
simple_writer.addUIntToRepeatedPack(b);
else
simple_writer.writeUInt(field_number, b);
}
};
template <>
std::unique_ptr<ProtobufWriter::IConverter> ProtobufWriter::createConverter<google::protobuf::FieldDescriptor::TYPE_BOOL>(
const google::protobuf::FieldDescriptor * field)
{
if (shouldSkipNullValue(field))
return std::make_unique<ConverterToBool<true, false>>(simple_writer, field);
else if (shouldPackRepeated(field))
return std::make_unique<ConverterToBool<false, true>>(simple_writer, field);
else
return std::make_unique<ConverterToBool<false, false>>(simple_writer, field);
}
template <bool skip_null_value, bool pack_repeated>
class ProtobufWriter::ConverterToEnum : public ConverterBaseImpl
{
public:
using ConverterBaseImpl::ConverterBaseImpl;
void writeString(const StringRef & str) override
{
prepareEnumNameToPbNumberMap();
auto it = enum_name_to_pbnumber_map->find(str);
if (it == enum_name_to_pbnumber_map->end())
cannotConvertValue(str.toString());
writeField(it->second);
}
void writeInt8(Int8 value) override { convertToEnumAndWriteField(value); }
void writeUInt8(UInt8 value) override { convertToEnumAndWriteField(value); }
void writeInt16(Int16 value) override { convertToEnumAndWriteField(value); }
void writeUInt16(UInt16 value) override { convertToEnumAndWriteField(value); }
void writeInt32(Int32 value) override { convertToEnumAndWriteField(value); }
void writeUInt32(UInt32 value) override { convertToEnumAndWriteField(value); }
void writeInt64(Int64 value) override { convertToEnumAndWriteField(value); }
void writeUInt64(UInt64 value) override { convertToEnumAndWriteField(value); }
void prepareEnumMapping8(const std::vector<std::pair<String, Int8>> & name_value_pairs) override
{
prepareEnumValueToPbNumberMap(name_value_pairs);
}
void prepareEnumMapping16(const std::vector<std::pair<String, Int16>> & name_value_pairs) override
{
prepareEnumValueToPbNumberMap(name_value_pairs);
}
void writeEnum8(Int8 value) override { writeEnum16(value); }
void writeEnum16(Int16 value) override
{
int pbnumber;
if (enum_value_always_equals_pbnumber)
pbnumber = value;
else
{
auto it = enum_value_to_pbnumber_map->find(value);
if (it == enum_value_to_pbnumber_map->end())
cannotConvertValue(toString(value));
pbnumber = it->second;
}
writeField(pbnumber);
}
private:
template <typename T>
void convertToEnumAndWriteField(T value)
{
const auto * enum_descriptor = field->enum_type()->FindValueByNumber(numericCast<int>(value));
if (!enum_descriptor)
cannotConvertValue(toString(value));
writeField(enum_descriptor->number());
}
void prepareEnumNameToPbNumberMap()
{
if (enum_name_to_pbnumber_map.has_value())
return;
enum_name_to_pbnumber_map.emplace();
const auto * enum_type = field->enum_type();
for (int i = 0; i != enum_type->value_count(); ++i)
{
const auto * enum_value = enum_type->value(i);
enum_name_to_pbnumber_map->emplace(enum_value->name(), enum_value->number());
}
}
template <typename T>
void prepareEnumValueToPbNumberMap(const std::vector<std::pair<String, T>> & name_value_pairs)
{
if (enum_value_to_pbnumber_map.has_value())
return;
enum_value_to_pbnumber_map.emplace();
enum_value_always_equals_pbnumber = true;
for (const auto & name_value_pair : name_value_pairs)
{
Int16 value = name_value_pair.second; // NOLINT
const auto * enum_descriptor = field->enum_type()->FindValueByName(name_value_pair.first);
if (enum_descriptor)
{
enum_value_to_pbnumber_map->emplace(value, enum_descriptor->number());
if (value != enum_descriptor->number())
enum_value_always_equals_pbnumber = false;
}
else
enum_value_always_equals_pbnumber = false;
}
}
void writeField(int enum_pbnumber)
{
if constexpr (skip_null_value)
{
if (!enum_pbnumber)
return;
}
if constexpr (pack_repeated)
simple_writer.addUIntToRepeatedPack(enum_pbnumber);
else
simple_writer.writeUInt(field_number, enum_pbnumber);
}
std::optional<std::unordered_map<StringRef, int>> enum_name_to_pbnumber_map;
std::optional<std::unordered_map<Int16, int>> enum_value_to_pbnumber_map;
bool enum_value_always_equals_pbnumber;
};
template <>
std::unique_ptr<ProtobufWriter::IConverter> ProtobufWriter::createConverter<google::protobuf::FieldDescriptor::TYPE_ENUM>(
const google::protobuf::FieldDescriptor * field)
{
if (shouldSkipNullValue(field))
return std::make_unique<ConverterToEnum<true, false>>(simple_writer, field);
else if (shouldPackRepeated(field))
return std::make_unique<ConverterToEnum<false, true>>(simple_writer, field);
else
return std::make_unique<ConverterToEnum<false, false>>(simple_writer, field);
}
ProtobufWriter::ProtobufWriter(
WriteBuffer & out, const google::protobuf::Descriptor * message_type, const std::vector<String> & column_names, const bool use_length_delimiters_)
: simple_writer(out, use_length_delimiters_)
{
std::vector<const google::protobuf::FieldDescriptor *> field_descriptors_without_match;
root_message = ProtobufColumnMatcher::matchColumns<ColumnMatcherTraits>(column_names, message_type, field_descriptors_without_match);
for (const auto * field_descriptor_without_match : field_descriptors_without_match)
{
if (field_descriptor_without_match->is_required())
throw Exception(
"Output doesn't have a column named '" + field_descriptor_without_match->name()
+ "' which is required to write the output in the protobuf format.",
ErrorCodes::NO_DATA_FOR_REQUIRED_PROTOBUF_FIELD);
}
setTraitsDataAfterMatchingColumns(root_message.get());
}
ProtobufWriter::~ProtobufWriter() = default;
void ProtobufWriter::setTraitsDataAfterMatchingColumns(Message * message)
{
Field * parent_field = message->parent ? &message->parent->fields[message->index_in_parent] : nullptr;
message->data.parent_field_number = parent_field ? parent_field->field_number : 0;
message->data.is_required = parent_field && parent_field->data.is_required;
if (parent_field && parent_field->data.is_repeatable)
message->data.repeatable_container_message = message;
else if (message->parent)
message->data.repeatable_container_message = message->parent->data.repeatable_container_message;
else
message->data.repeatable_container_message = nullptr;
message->data.is_group = parent_field && (parent_field->field_descriptor->type() == google::protobuf::FieldDescriptor::TYPE_GROUP);
for (auto & field : message->fields)
{
field.data.is_repeatable = field.field_descriptor->is_repeated();
field.data.is_required = field.field_descriptor->is_required();
field.data.repeatable_container_message = message->data.repeatable_container_message;
field.data.should_pack_repeated = shouldPackRepeated(field.field_descriptor);
if (field.nested_message)
{
setTraitsDataAfterMatchingColumns(field.nested_message.get());
continue;
}
switch (field.field_descriptor->type())
{
# define PROTOBUF_WRITER_CONVERTER_CREATING_CASE(field_type_id) \
case field_type_id: \
field.data.converter = createConverter<field_type_id>(field.field_descriptor); \
break
PROTOBUF_WRITER_CONVERTER_CREATING_CASE(google::protobuf::FieldDescriptor::TYPE_STRING);
PROTOBUF_WRITER_CONVERTER_CREATING_CASE(google::protobuf::FieldDescriptor::TYPE_BYTES);
PROTOBUF_WRITER_CONVERTER_CREATING_CASE(google::protobuf::FieldDescriptor::TYPE_INT32);
PROTOBUF_WRITER_CONVERTER_CREATING_CASE(google::protobuf::FieldDescriptor::TYPE_SINT32);
PROTOBUF_WRITER_CONVERTER_CREATING_CASE(google::protobuf::FieldDescriptor::TYPE_UINT32);
PROTOBUF_WRITER_CONVERTER_CREATING_CASE(google::protobuf::FieldDescriptor::TYPE_FIXED32);
PROTOBUF_WRITER_CONVERTER_CREATING_CASE(google::protobuf::FieldDescriptor::TYPE_SFIXED32);
PROTOBUF_WRITER_CONVERTER_CREATING_CASE(google::protobuf::FieldDescriptor::TYPE_INT64);
PROTOBUF_WRITER_CONVERTER_CREATING_CASE(google::protobuf::FieldDescriptor::TYPE_SINT64);
PROTOBUF_WRITER_CONVERTER_CREATING_CASE(google::protobuf::FieldDescriptor::TYPE_UINT64);
PROTOBUF_WRITER_CONVERTER_CREATING_CASE(google::protobuf::FieldDescriptor::TYPE_FIXED64);
PROTOBUF_WRITER_CONVERTER_CREATING_CASE(google::protobuf::FieldDescriptor::TYPE_SFIXED64);
PROTOBUF_WRITER_CONVERTER_CREATING_CASE(google::protobuf::FieldDescriptor::TYPE_FLOAT);
PROTOBUF_WRITER_CONVERTER_CREATING_CASE(google::protobuf::FieldDescriptor::TYPE_DOUBLE);
PROTOBUF_WRITER_CONVERTER_CREATING_CASE(google::protobuf::FieldDescriptor::TYPE_BOOL);
PROTOBUF_WRITER_CONVERTER_CREATING_CASE(google::protobuf::FieldDescriptor::TYPE_ENUM);
# undef PROTOBUF_WRITER_CONVERTER_CREATING_CASE
default:
throw Exception(
String("Protobuf type '") + field.field_descriptor->type_name() + "' isn't supported", ErrorCodes::NOT_IMPLEMENTED);
}
}
}
void ProtobufWriter::startMessage()
{
current_message = root_message.get();
current_field_index = 0;
simple_writer.startMessage();
}
void ProtobufWriter::endMessage()
{
if (!current_message)
return;
endWritingField();
while (current_message->parent)
{
simple_writer.endNestedMessage(
current_message->data.parent_field_number, current_message->data.is_group, !current_message->data.is_required);
current_message = current_message->parent;
}
simple_writer.endMessage();
current_message = nullptr;
}
bool ProtobufWriter::writeField(size_t & column_index)
{
endWritingField();
while (true)
{
if (current_field_index < current_message->fields.size())
{
Field & field = current_message->fields[current_field_index];
if (!field.nested_message)
{
current_field = &current_message->fields[current_field_index];
current_converter = current_field->data.converter.get();
column_index = current_field->column_index;
if (current_field->data.should_pack_repeated)
simple_writer.startRepeatedPack();
return true;
}
simple_writer.startNestedMessage();
current_message = field.nested_message.get();
current_message->data.need_repeat = false;
current_field_index = 0;
continue;
}
if (current_message->parent)
{
simple_writer.endNestedMessage(
current_message->data.parent_field_number, current_message->data.is_group, !current_message->data.is_required);
if (current_message->data.need_repeat)
{
simple_writer.startNestedMessage();
current_message->data.need_repeat = false;
current_field_index = 0;
continue;
}
current_field_index = current_message->index_in_parent + 1;
current_message = current_message->parent;
continue;
}
return false;
}
}
void ProtobufWriter::endWritingField()
{
if (!current_field)
return;
if (current_field->data.should_pack_repeated)
simple_writer.endRepeatedPack(current_field->field_number);
else if ((num_values == 0) && current_field->data.is_required)
throw Exception(
"No data for the required field '" + current_field->field_descriptor->name() + "'",
ErrorCodes::NO_DATA_FOR_REQUIRED_PROTOBUF_FIELD);
current_field = nullptr;
current_converter = nullptr;
num_values = 0;
++current_field_index;
}
void ProtobufWriter::setNestedMessageNeedsRepeat()
{
if (current_field->data.repeatable_container_message)
current_field->data.repeatable_container_message->data.need_repeat = true;
else
throw Exception(
"Cannot write more than single value to the non-repeated field '" + current_field->field_descriptor->name() + "'",
ErrorCodes::PROTOBUF_FIELD_NOT_REPEATED);
in_repeated_pack = false;
}
}

View File

@ -1,290 +1,68 @@
#pragma once
#include <Core/UUID.h>
#include <common/DayNum.h>
#include <memory>
#if !defined(ARCADIA_BUILD)
# include "config_formats.h"
#endif
#if USE_PROTOBUF
# include <IO/WriteBufferFromString.h>
# include <boost/noncopyable.hpp>
# include <Common/PODArray.h>
# include "ProtobufColumnMatcher.h"
namespace google
{
namespace protobuf
{
class Descriptor;
class FieldDescriptor;
}
}
namespace DB
{
class IAggregateFunction;
using AggregateFunctionPtr = std::shared_ptr<IAggregateFunction>;
using ConstAggregateDataPtr = const char *;
/** Serializes a protobuf, tries to cast types if necessarily.
*/
class ProtobufWriter : private boost::noncopyable
{
public:
ProtobufWriter(WriteBuffer & out, const google::protobuf::Descriptor * message_type, const std::vector<String> & column_names, const bool use_length_delimiters_);
~ProtobufWriter();
/// Should be called at the beginning of writing a message.
void startMessage();
/// Should be called at the end of writing a message.
void endMessage();
/// Prepares for writing values of a field.
/// Returns true and sets 'column_index' to the corresponding column's index.
/// Returns false if there are no more fields to write in the message type (call endMessage() in this case).
bool writeField(size_t & column_index);
/// Writes a value. This function should be called one or multiple times after writeField().
/// Returns false if there are no more place for the values in the protobuf's field.
/// This can happen if the protobuf's field is not declared as repeated in the protobuf schema.
bool writeNumber(Int8 value) { return writeValueIfPossible(&IConverter::writeInt8, value); }
bool writeNumber(UInt8 value) { return writeValueIfPossible(&IConverter::writeUInt8, value); }
bool writeNumber(Int16 value) { return writeValueIfPossible(&IConverter::writeInt16, value); }
bool writeNumber(UInt16 value) { return writeValueIfPossible(&IConverter::writeUInt16, value); }
bool writeNumber(Int32 value) { return writeValueIfPossible(&IConverter::writeInt32, value); }
bool writeNumber(UInt32 value) { return writeValueIfPossible(&IConverter::writeUInt32, value); }
bool writeNumber(Int64 value) { return writeValueIfPossible(&IConverter::writeInt64, value); }
bool writeNumber(UInt64 value) { return writeValueIfPossible(&IConverter::writeUInt64, value); }
bool writeNumber(Int128 value) { return writeValueIfPossible(&IConverter::writeInt128, value); }
bool writeNumber(UInt128 value) { return writeValueIfPossible(&IConverter::writeUInt128, value); }
bool writeNumber(Int256 value) { return writeValueIfPossible(&IConverter::writeInt256, value); }
bool writeNumber(UInt256 value) { return writeValueIfPossible(&IConverter::writeUInt256, value); }
bool writeNumber(Float32 value) { return writeValueIfPossible(&IConverter::writeFloat32, value); }
bool writeNumber(Float64 value) { return writeValueIfPossible(&IConverter::writeFloat64, value); }
bool writeString(const StringRef & str) { return writeValueIfPossible(&IConverter::writeString, str); }
void prepareEnumMapping(const std::vector<std::pair<std::string, Int8>> & enum_values) { current_converter->prepareEnumMapping8(enum_values); }
void prepareEnumMapping(const std::vector<std::pair<std::string, Int16>> & enum_values) { current_converter->prepareEnumMapping16(enum_values); }
bool writeEnum(Int8 value) { return writeValueIfPossible(&IConverter::writeEnum8, value); }
bool writeEnum(Int16 value) { return writeValueIfPossible(&IConverter::writeEnum16, value); }
bool writeUUID(const UUID & uuid) { return writeValueIfPossible(&IConverter::writeUUID, uuid); }
bool writeDate(DayNum date) { return writeValueIfPossible(&IConverter::writeDate, date); }
bool writeDateTime(time_t tm) { return writeValueIfPossible(&IConverter::writeDateTime, tm); }
bool writeDateTime64(DateTime64 tm, UInt32 scale) { return writeValueIfPossible(&IConverter::writeDateTime64, tm, scale); }
bool writeDecimal(Decimal32 decimal, UInt32 scale) { return writeValueIfPossible(&IConverter::writeDecimal32, decimal, scale); }
bool writeDecimal(Decimal64 decimal, UInt32 scale) { return writeValueIfPossible(&IConverter::writeDecimal64, decimal, scale); }
bool writeDecimal(const Decimal128 & decimal, UInt32 scale) { return writeValueIfPossible(&IConverter::writeDecimal128, decimal, scale); }
bool writeDecimal(const Decimal256 & decimal, UInt32 scale) { return writeValueIfPossible(&IConverter::writeDecimal256, decimal, scale); }
bool writeAggregateFunction(const AggregateFunctionPtr & function, ConstAggregateDataPtr place) { return writeValueIfPossible(&IConverter::writeAggregateFunction, function, place); }
private:
class SimpleWriter
{
public:
SimpleWriter(WriteBuffer & out_, const bool use_length_delimiters_);
~SimpleWriter();
void startMessage();
void endMessage();
void startNestedMessage();
void endNestedMessage(UInt32 field_number, bool is_group, bool skip_if_empty);
void writeInt(UInt32 field_number, Int64 value);
void writeUInt(UInt32 field_number, UInt64 value);
void writeSInt(UInt32 field_number, Int64 value);
template <typename T>
void writeFixed(UInt32 field_number, T value);
void writeString(UInt32 field_number, const StringRef & str);
void startRepeatedPack();
void addIntToRepeatedPack(Int64 value);
void addUIntToRepeatedPack(UInt64 value);
void addSIntToRepeatedPack(Int64 value);
template <typename T>
void addFixedToRepeatedPack(T value);
void endRepeatedPack(UInt32 field_number);
private:
struct Piece
{
size_t start;
size_t end;
Piece(size_t start_, size_t end_) : start(start_), end(end_) {}
Piece() = default;
};
struct NestedInfo
{
size_t num_pieces_at_start;
size_t num_bytes_skipped_at_start;
NestedInfo(size_t num_pieces_at_start_, size_t num_bytes_skipped_at_start_)
: num_pieces_at_start(num_pieces_at_start_), num_bytes_skipped_at_start(num_bytes_skipped_at_start_)
{
}
};
WriteBuffer & out;
PODArray<UInt8> buffer;
std::vector<Piece> pieces;
size_t current_piece_start;
size_t num_bytes_skipped;
std::vector<NestedInfo> nested_infos;
const bool use_length_delimiters;
};
class IConverter
{
public:
virtual ~IConverter() = default;
virtual void writeString(const StringRef &) = 0;
virtual void writeInt8(Int8) = 0;
virtual void writeUInt8(UInt8) = 0;
virtual void writeInt16(Int16) = 0;
virtual void writeUInt16(UInt16) = 0;
virtual void writeInt32(Int32) = 0;
virtual void writeUInt32(UInt32) = 0;
virtual void writeInt64(Int64) = 0;
virtual void writeUInt64(UInt64) = 0;
virtual void writeInt128(Int128) = 0;
virtual void writeUInt128(const UInt128 &) = 0;
virtual void writeInt256(const Int256 &) = 0;
virtual void writeUInt256(const UInt256 &) = 0;
virtual void writeFloat32(Float32) = 0;
virtual void writeFloat64(Float64) = 0;
virtual void prepareEnumMapping8(const std::vector<std::pair<std::string, Int8>> &) = 0;
virtual void prepareEnumMapping16(const std::vector<std::pair<std::string, Int16>> &) = 0;
virtual void writeEnum8(Int8) = 0;
virtual void writeEnum16(Int16) = 0;
virtual void writeUUID(const UUID &) = 0;
virtual void writeDate(DayNum) = 0;
virtual void writeDateTime(time_t) = 0;
virtual void writeDateTime64(DateTime64, UInt32 scale) = 0;
virtual void writeDecimal32(Decimal32, UInt32) = 0;
virtual void writeDecimal64(Decimal64, UInt32) = 0;
virtual void writeDecimal128(const Decimal128 &, UInt32) = 0;
virtual void writeDecimal256(const Decimal256 &, UInt32) = 0;
virtual void writeAggregateFunction(const AggregateFunctionPtr &, ConstAggregateDataPtr) = 0;
};
class ConverterBaseImpl;
template <bool skip_null_value>
class ConverterToString;
template <int field_type_id, typename ToType, bool skip_null_value, bool pack_repeated>
class ConverterToNumber;
template <bool skip_null_value, bool pack_repeated>
class ConverterToBool;
template <bool skip_null_value, bool pack_repeated>
class ConverterToEnum;
struct ColumnMatcherTraits
{
struct FieldData
{
std::unique_ptr<IConverter> converter;
bool is_required;
bool is_repeatable;
bool should_pack_repeated;
ProtobufColumnMatcher::Message<ColumnMatcherTraits> * repeatable_container_message;
};
struct MessageData
{
UInt32 parent_field_number;
bool is_group;
bool is_required;
ProtobufColumnMatcher::Message<ColumnMatcherTraits> * repeatable_container_message;
bool need_repeat;
};
};
using Message = ProtobufColumnMatcher::Message<ColumnMatcherTraits>;
using Field = ProtobufColumnMatcher::Field<ColumnMatcherTraits>;
void setTraitsDataAfterMatchingColumns(Message * message);
template <int field_type_id>
std::unique_ptr<IConverter> createConverter(const google::protobuf::FieldDescriptor * field);
template <typename... Params>
using WriteValueFunctionPtr = void (IConverter::*)(Params...);
template <typename... Params, typename... Args>
bool writeValueIfPossible(WriteValueFunctionPtr<Params...> func, Args &&... args)
{
if (num_values && !current_field->data.is_repeatable)
{
setNestedMessageNeedsRepeat();
return false;
}
(current_converter->*func)(std::forward<Args>(args)...);
++num_values;
return true;
}
void setNestedMessageNeedsRepeat();
void endWritingField();
SimpleWriter simple_writer;
std::unique_ptr<Message> root_message;
Message * current_message;
size_t current_field_index = 0;
const Field * current_field = nullptr;
IConverter * current_converter = nullptr;
size_t num_values = 0;
};
}
#else
# include <common/StringRef.h>
# include <Core/Types.h>
# include <Common/PODArray.h>
namespace DB
{
class IAggregateFunction;
using AggregateFunctionPtr = std::shared_ptr<IAggregateFunction>;
using ConstAggregateDataPtr = const char *;
class WriteBuffer;
/// Utility class for writing in the Protobuf format.
/// Knows nothing about protobuf schemas, just provides useful functions to serialize data.
class ProtobufWriter
{
public:
bool writeNumber(Int8 /* value */) { return false; }
bool writeNumber(UInt8 /* value */) { return false; }
bool writeNumber(Int16 /* value */) { return false; }
bool writeNumber(UInt16 /* value */) { return false; }
bool writeNumber(Int32 /* value */) { return false; }
bool writeNumber(UInt32 /* value */) { return false; }
bool writeNumber(Int64 /* value */) { return false; }
bool writeNumber(UInt64 /* value */) { return false; }
bool writeNumber(Int128 /* value */) { return false; }
bool writeNumber(UInt128 /* value */) { return false; }
bool writeNumber(Int256 /* value */) { return false; }
bool writeNumber(UInt256 /* value */) { return false; }
bool writeNumber(Float32 /* value */) { return false; }
bool writeNumber(Float64 /* value */) { return false; }
bool writeString(const StringRef & /* value */) { return false; }
void prepareEnumMapping(const std::vector<std::pair<std::string, Int8>> & /* name_value_pairs */) {}
void prepareEnumMapping(const std::vector<std::pair<std::string, Int16>> & /* name_value_pairs */) {}
bool writeEnum(Int8 /* value */) { return false; }
bool writeEnum(Int16 /* value */) { return false; }
bool writeUUID(const UUID & /* value */) { return false; }
bool writeDate(DayNum /* date */) { return false; }
bool writeDateTime(time_t /* tm */) { return false; }
bool writeDateTime64(DateTime64 /*tm*/, UInt32 /*scale*/) { return false; }
bool writeDecimal(Decimal32 /* decimal */, UInt32 /* scale */) { return false; }
bool writeDecimal(Decimal64 /* decimal */, UInt32 /* scale */) { return false; }
bool writeDecimal(const Decimal128 & /* decimal */, UInt32 /* scale */) { return false; }
bool writeDecimal(const Decimal256 & /* decimal */, UInt32 /* scale */) { return false; }
bool writeAggregateFunction(const AggregateFunctionPtr & /* function */, ConstAggregateDataPtr /* place */) { return false; }
ProtobufWriter(WriteBuffer & out_);
~ProtobufWriter();
void startMessage();
void endMessage(bool with_length_delimiter);
void startNestedMessage();
void endNestedMessage(int field_number, bool is_group, bool skip_if_empty);
void writeInt(int field_number, Int64 value);
void writeUInt(int field_number, UInt64 value);
void writeSInt(int field_number, Int64 value);
template <typename T>
void writeFixed(int field_number, T value);
void writeString(int field_number, const std::string_view & str);
void startRepeatedPack();
void endRepeatedPack(int field_number, bool skip_if_empty);
private:
struct Piece
{
size_t start;
size_t end;
Piece(size_t start_, size_t end_) : start(start_), end(end_) {}
Piece() = default;
};
struct NestedInfo
{
size_t num_pieces_at_start;
size_t num_bytes_skipped_at_start;
NestedInfo(size_t num_pieces_at_start_, size_t num_bytes_skipped_at_start_)
: num_pieces_at_start(num_pieces_at_start_), num_bytes_skipped_at_start(num_bytes_skipped_at_start_)
{
}
};
WriteBuffer & out;
PODArray<UInt8> buffer;
std::vector<Piece> pieces;
size_t current_piece_start = 0;
size_t num_bytes_skipped = 0;
std::vector<NestedInfo> nested_infos;
bool in_repeated_pack = false;
};
}

View File

@ -20,9 +20,9 @@ SRCS(
NativeFormat.cpp
NullFormat.cpp
ParsedTemplateFormatString.cpp
ProtobufColumnMatcher.cpp
ProtobufReader.cpp
ProtobufSchemas.cpp
ProtobufSerializer.cpp
ProtobufWriter.cpp
registerFormats.cpp
verbosePrintString.cpp

View File

@ -1,57 +1,48 @@
#include "ProtobufRowInputFormat.h"
#if USE_PROTOBUF
#include <Core/Block.h>
#include <Formats/FormatFactory.h>
#include <Formats/FormatSchemaInfo.h>
#include <Formats/ProtobufSchemas.h>
#include <Interpreters/Context.h>
# include <Core/Block.h>
# include <Formats/FormatFactory.h>
# include <Formats/FormatSchemaInfo.h>
# include <Formats/ProtobufReader.h>
# include <Formats/ProtobufSchemas.h>
# include <Formats/ProtobufSerializer.h>
# include <Interpreters/Context.h>
# include <ext/range.h>
namespace DB
{
ProtobufRowInputFormat::ProtobufRowInputFormat(ReadBuffer & in_, const Block & header_, Params params_, const FormatSchemaInfo & info_, const bool use_length_delimiters_)
ProtobufRowInputFormat::ProtobufRowInputFormat(ReadBuffer & in_, const Block & header_, const Params & params_, const FormatSchemaInfo & schema_info_, bool with_length_delimiter_)
: IRowInputFormat(header_, in_, params_)
, data_types(header_.getDataTypes())
, reader(in, ProtobufSchemas::instance().getMessageTypeForFormatSchema(info_), header_.getNames(), use_length_delimiters_)
, reader(std::make_unique<ProtobufReader>(in_))
, serializer(ProtobufSerializer::create(
header_.getNames(),
header_.getDataTypes(),
missing_column_indices,
*ProtobufSchemas::instance().getMessageTypeForFormatSchema(schema_info_),
with_length_delimiter_,
*reader))
{
}
ProtobufRowInputFormat::~ProtobufRowInputFormat() = default;
bool ProtobufRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & extra)
bool ProtobufRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & row_read_extension)
{
if (!reader.startMessage())
return false; // EOF reached, no more messages.
if (reader->eof())
return false;
// Set of columns for which the values were read. The rest will be filled with default values.
auto & read_columns = extra.read_columns;
read_columns.assign(columns.size(), false);
size_t row_num = columns.empty() ? 0 : columns[0]->size();
if (!row_num)
serializer->setColumns(columns.data(), columns.size());
// Read values from this message and put them to the columns while it's possible.
size_t column_index;
while (reader.readColumnIndex(column_index))
{
bool allow_add_row = !static_cast<bool>(read_columns[column_index]);
do
{
bool row_added;
data_types[column_index]->deserializeProtobuf(*columns[column_index], reader, allow_add_row, row_added);
if (row_added)
{
read_columns[column_index] = true;
allow_add_row = false;
}
} while (reader.canReadMoreValues());
}
serializer->readRow(row_num);
// Fill non-visited columns with the default values.
for (column_index = 0; column_index < read_columns.size(); ++column_index)
if (!read_columns[column_index])
data_types[column_index]->insertDefaultInto(*columns[column_index]);
reader.endMessage();
row_read_extension.read_columns.clear();
row_read_extension.read_columns.resize(columns.size(), true);
for (size_t column_idx : missing_column_indices)
row_read_extension.read_columns[column_idx] = false;
return true;
}
@ -62,14 +53,14 @@ bool ProtobufRowInputFormat::allowSyncAfterError() const
void ProtobufRowInputFormat::syncAfterError()
{
reader.endMessage(true);
reader->endMessage(true);
}
void registerInputFormatProcessorProtobuf(FormatFactory & factory)
{
for (bool use_length_delimiters : {false, true})
for (bool with_length_delimiter : {false, true})
{
factory.registerInputFormatProcessor(use_length_delimiters ? "Protobuf" : "ProtobufSingle", [use_length_delimiters](
factory.registerInputFormatProcessor(with_length_delimiter ? "Protobuf" : "ProtobufSingle", [with_length_delimiter](
ReadBuffer & buf,
const Block & sample,
IRowInputFormat::Params params,
@ -78,7 +69,7 @@ void registerInputFormatProcessorProtobuf(FormatFactory & factory)
return std::make_shared<ProtobufRowInputFormat>(buf, sample, std::move(params),
FormatSchemaInfo(settings.schema.format_schema, "Protobuf", true,
settings.schema.is_server, settings.schema.format_schema_path),
use_length_delimiters);
with_length_delimiter);
});
}
}

View File

@ -5,14 +5,14 @@
#endif
#if USE_PROTOBUF
# include <DataTypes/IDataType.h>
# include <Formats/ProtobufReader.h>
# include <Processors/Formats/IRowInputFormat.h>
namespace DB
{
class Block;
class FormatSchemaInfo;
class ProtobufReader;
class ProtobufSerializer;
/** Stream designed to deserialize data from the google protobuf format.
@ -29,18 +29,19 @@ class FormatSchemaInfo;
class ProtobufRowInputFormat : public IRowInputFormat
{
public:
ProtobufRowInputFormat(ReadBuffer & in_, const Block & header_, Params params_, const FormatSchemaInfo & info_, const bool use_length_delimiters_);
ProtobufRowInputFormat(ReadBuffer & in_, const Block & header_, const Params & params_, const FormatSchemaInfo & schema_info_, bool with_length_delimiter_);
~ProtobufRowInputFormat() override;
String getName() const override { return "ProtobufRowInputFormat"; }
bool readRow(MutableColumns & columns, RowReadExtension & extra) override;
bool readRow(MutableColumns & columns, RowReadExtension &) override;
bool allowSyncAfterError() const override;
void syncAfterError() override;
private:
DataTypes data_types;
ProtobufReader reader;
std::unique_ptr<ProtobufReader> reader;
std::vector<size_t> missing_column_indices;
std::unique_ptr<ProtobufSerializer> serializer;
};
}

View File

@ -1,13 +1,13 @@
#include <Formats/FormatFactory.h>
#include "ProtobufRowOutputFormat.h"
#if USE_PROTOBUF
#include <Core/Block.h>
#include <Formats/FormatSchemaInfo.h>
#include <Formats/ProtobufSchemas.h>
#include <Interpreters/Context.h>
#include <google/protobuf/descriptor.h>
# include <Formats/FormatFactory.h>
# include <Core/Block.h>
# include <Formats/FormatSchemaInfo.h>
# include <Formats/ProtobufSchemas.h>
# include <Formats/ProtobufSerializer.h>
# include <Formats/ProtobufWriter.h>
# include <google/protobuf/descriptor.h>
namespace DB
@ -20,58 +20,55 @@ namespace ErrorCodes
ProtobufRowOutputFormat::ProtobufRowOutputFormat(
WriteBuffer & out_,
const Block & header,
const Block & header_,
const RowOutputFormatParams & params_,
const FormatSchemaInfo & format_schema,
const FormatSettings & settings)
: IRowOutputFormat(header, out_, params_)
, data_types(header.getDataTypes())
, writer(out,
ProtobufSchemas::instance().getMessageTypeForFormatSchema(format_schema),
header.getNames(), settings.protobuf.write_row_delimiters)
, allow_only_one_row(
!settings.protobuf.write_row_delimiters
&& !settings.protobuf.allow_many_rows_no_delimiters)
const FormatSchemaInfo & schema_info_,
const FormatSettings & settings_,
bool with_length_delimiter_)
: IRowOutputFormat(header_, out_, params_)
, writer(std::make_unique<ProtobufWriter>(out))
, serializer(ProtobufSerializer::create(
header_.getNames(),
header_.getDataTypes(),
*ProtobufSchemas::instance().getMessageTypeForFormatSchema(schema_info_),
with_length_delimiter_,
*writer))
, allow_multiple_rows(with_length_delimiter_ || settings_.protobuf.allow_multiple_rows_without_delimiter)
{
value_indices.resize(header.columns());
}
void ProtobufRowOutputFormat::write(const Columns & columns, size_t row_num)
{
if (allow_only_one_row && !first_row)
{
throw Exception("The ProtobufSingle format can't be used to write multiple rows because this format doesn't have any row delimiter.", ErrorCodes::NO_ROW_DELIMITER);
}
if (!allow_multiple_rows && !first_row)
throw Exception(
"The ProtobufSingle format can't be used to write multiple rows because this format doesn't have any row delimiter.",
ErrorCodes::NO_ROW_DELIMITER);
writer.startMessage();
std::fill(value_indices.begin(), value_indices.end(), 0);
size_t column_index;
while (writer.writeField(column_index))
data_types[column_index]->serializeProtobuf(
*columns[column_index], row_num, writer, value_indices[column_index]);
writer.endMessage();
if (!row_num)
serializer->setColumns(columns.data(), columns.size());
serializer->writeRow(row_num);
}
void registerOutputFormatProcessorProtobuf(FormatFactory & factory)
{
for (bool write_row_delimiters : {false, true})
for (bool with_length_delimiter : {false, true})
{
factory.registerOutputFormatProcessor(
write_row_delimiters ? "Protobuf" : "ProtobufSingle",
[write_row_delimiters](WriteBuffer & buf,
with_length_delimiter ? "Protobuf" : "ProtobufSingle",
[with_length_delimiter](WriteBuffer & buf,
const Block & header,
const RowOutputFormatParams & params,
const FormatSettings & _settings)
const FormatSettings & settings)
{
FormatSettings settings = _settings;
settings.protobuf.write_row_delimiters = write_row_delimiters;
return std::make_shared<ProtobufRowOutputFormat>(
buf, header, params,
FormatSchemaInfo(settings.schema.format_schema, "Protobuf",
true, settings.schema.is_server,
settings.schema.format_schema_path),
settings);
settings,
with_length_delimiter);
});
}
}

View File

@ -8,21 +8,16 @@
# include <Core/Block.h>
# include <Formats/FormatSchemaInfo.h>
# include <Formats/FormatSettings.h>
# include <Formats/ProtobufWriter.h>
# include <Processors/Formats/IRowOutputFormat.h>
namespace google
{
namespace protobuf
{
class Message;
}
}
namespace DB
{
class ProtobufWriter;
class ProtobufSerializer;
class FormatSchemaInfo;
struct FormatSettings;
/** Stream designed to serialize data in the google protobuf format.
* Each row is written as a separated message.
*
@ -38,10 +33,11 @@ class ProtobufRowOutputFormat : public IRowOutputFormat
public:
ProtobufRowOutputFormat(
WriteBuffer & out_,
const Block & header,
const Block & header_,
const RowOutputFormatParams & params_,
const FormatSchemaInfo & format_schema,
const FormatSettings & settings);
const FormatSchemaInfo & schema_info_,
const FormatSettings & settings_,
bool with_length_delimiter_);
String getName() const override { return "ProtobufRowOutputFormat"; }
@ -50,10 +46,9 @@ public:
std::string getContentType() const override { return "application/octet-stream"; }
private:
DataTypes data_types;
ProtobufWriter writer;
std::vector<size_t> value_indices;
const bool allow_only_one_row;
std::unique_ptr<ProtobufWriter> writer;
std::unique_ptr<ProtobufSerializer> serializer;
const bool allow_multiple_rows;
};
}

View File

@ -26,7 +26,7 @@ void KafkaBlockOutputStream::writePrefix()
buffer = storage.createWriteBuffer(getHeader());
auto format_settings = getFormatSettings(*context);
format_settings.protobuf.allow_many_rows_no_delimiters = true;
format_settings.protobuf.allow_multiple_rows_without_delimiter = true;
child = FormatFactory::instance().getOutputStream(storage.getFormatName(), *buffer,
getHeader(), *context,

View File

@ -34,7 +34,7 @@ void RabbitMQBlockOutputStream::writePrefix()
buffer->activateWriting();
auto format_settings = getFormatSettings(context);
format_settings.protobuf.allow_many_rows_no_delimiters = true;
format_settings.protobuf.allow_multiple_rows_without_delimiter = true;
child = FormatFactory::instance().getOutputStream(storage.getFormatName(), *buffer,
getHeader(), context,

View File

@ -0,0 +1,14 @@
syntax = "proto3";
message ABC
{
message nested
{
message nested
{
repeated int32 c = 1;
}
repeated nested b = 1;
}
repeated nested a = 1;
}

View File

@ -0,0 +1,52 @@
[[],[[]],[[1]],[[2,3],[4]]]
[[[5,6,7]],[[8,9,10]]]
Binary representation:
00000000 1a 0a 00 0a 02 0a 00 0a 05 0a 03 0a 01 01 0a 0b |................|
00000010 0a 04 0a 02 02 03 0a 03 0a 01 04 12 0a 07 0a 05 |................|
00000020 0a 03 05 06 07 0a 07 0a 05 0a 03 08 09 0a |..............|
0000002e
MESSAGE #1 AT 0x00000001
a {
}
a {
b {
}
}
a {
b {
c: 1
}
}
a {
b {
c: 2
c: 3
}
b {
c: 4
}
}
MESSAGE #2 AT 0x0000001C
a {
b {
c: 5
c: 6
c: 7
}
}
a {
b {
c: 8
c: 9
c: 10
}
}
Binary representation is as expected
[[],[[]],[[1]],[[2,3],[4]]]
[[[5,6,7]],[[8,9,10]]]
[[],[[]],[[1]],[[2,3],[4]]]
[[[5,6,7]],[[8,9,10]]]

View File

@ -0,0 +1,35 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
set -eo pipefail
# Run the client.
$CLICKHOUSE_CLIENT --multiquery <<'EOF'
DROP TABLE IF EXISTS array_3dim_protobuf_00825;
CREATE TABLE array_3dim_protobuf_00825
(
`a_b_c` Array(Array(Array(Int32)))
) ENGINE = MergeTree ORDER BY tuple();
INSERT INTO array_3dim_protobuf_00825 VALUES ([[], [[]], [[1]], [[2,3],[4]]]), ([[[5, 6, 7]], [[8, 9, 10]]]);
SELECT * FROM array_3dim_protobuf_00825;
EOF
BINARY_FILE_PATH=$(mktemp "$CURDIR/00825_protobuf_format_array_3dim.XXXXXX.binary")
$CLICKHOUSE_CLIENT --query "SELECT * FROM array_3dim_protobuf_00825 FORMAT Protobuf SETTINGS format_schema = '$CURDIR/00825_protobuf_format_array_3dim:ABC'" > "$BINARY_FILE_PATH"
# Check the output in the protobuf format
echo
$CURDIR/helpers/protobuf_length_delimited_encoder.py --decode_and_check --format_schema "$CURDIR/00825_protobuf_format_array_3dim:ABC" --input "$BINARY_FILE_PATH"
# Check the input in the protobuf format (now the table contains the same data twice).
echo
$CLICKHOUSE_CLIENT --query "INSERT INTO array_3dim_protobuf_00825 FORMAT Protobuf SETTINGS format_schema='$CURDIR/00825_protobuf_format_array_3dim:ABC'" < "$BINARY_FILE_PATH"
$CLICKHOUSE_CLIENT --query "SELECT * FROM array_3dim_protobuf_00825"
rm "$BINARY_FILE_PATH"

View File

@ -0,0 +1,9 @@
syntax = "proto3";
message AA {
message nested_array {
repeated double c = 2;
}
string a = 1;
repeated nested_array b = 2;
}

View File

@ -0,0 +1,41 @@
one [[1,2,3],[0.5,0.25],[],[4,5],[0.125,0.0625],[6]]
Binary representation:
00000000 6b 0a 03 6f 6e 65 12 1a 12 18 00 00 00 00 00 00 |k..one..........|
00000010 f0 3f 00 00 00 00 00 00 00 40 00 00 00 00 00 00 |.?.......@......|
00000020 08 40 12 12 12 10 00 00 00 00 00 00 e0 3f 00 00 |.@...........?..|
00000030 00 00 00 00 d0 3f 12 00 12 12 12 10 00 00 00 00 |.....?..........|
00000040 00 00 10 40 00 00 00 00 00 00 14 40 12 12 12 10 |...@.......@....|
00000050 00 00 00 00 00 00 c0 3f 00 00 00 00 00 00 b0 3f |.......?.......?|
00000060 12 0a 12 08 00 00 00 00 00 00 18 40 |...........@|
0000006c
MESSAGE #1 AT 0x00000001
a: "one"
b {
c: 1
c: 2
c: 3
}
b {
c: 0.5
c: 0.25
}
b {
}
b {
c: 4
c: 5
}
b {
c: 0.125
c: 0.0625
}
b {
c: 6
}
Binary representation is as expected
one [[1,2,3],[0.5,0.25],[],[4,5],[0.125,0.0625],[6]]
one [[1,2,3],[0.5,0.25],[],[4,5],[0.125,0.0625],[6]]

View File

@ -0,0 +1,38 @@
#!/usr/bin/env bash
# https://github.com/ClickHouse/ClickHouse/issues/9069
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
set -eo pipefail
# Run the client.
$CLICKHOUSE_CLIENT --multiquery <<'EOF'
CREATE TABLE array_of_arrays_protobuf_00825
(
`a` String,
`b` Nested (
`c` Array(Float64)
)
) ENGINE = MergeTree ORDER BY tuple();
INSERT INTO array_of_arrays_protobuf_00825 VALUES ('one', [[1,2,3],[0.5,0.25],[],[4,5],[0.125,0.0625],[6]]);
SELECT * FROM array_of_arrays_protobuf_00825;
EOF
BINARY_FILE_PATH=$(mktemp "$CURDIR/00825_protobuf_format_array_of_arrays.XXXXXX.binary")
$CLICKHOUSE_CLIENT --query "SELECT * FROM array_of_arrays_protobuf_00825 FORMAT Protobuf SETTINGS format_schema = '$CURDIR/00825_protobuf_format_array_of_arrays:AA'" > "$BINARY_FILE_PATH"
# Check the output in the protobuf format
echo
$CURDIR/helpers/protobuf_length_delimited_encoder.py --decode_and_check --format_schema "$CURDIR/00825_protobuf_format_array_of_arrays:AA" --input "$BINARY_FILE_PATH"
# Check the input in the protobuf format (now the table contains the same data twice).
echo
$CLICKHOUSE_CLIENT --query "INSERT INTO array_of_arrays_protobuf_00825 FORMAT Protobuf SETTINGS format_schema='$CURDIR/00825_protobuf_format_array_of_arrays:AA'" < "$BINARY_FILE_PATH"
$CLICKHOUSE_CLIENT --query "SELECT * FROM array_of_arrays_protobuf_00825"
rm "$BINARY_FILE_PATH"

View File

@ -0,0 +1,13 @@
syntax = "proto3";
message Message
{
enum Enum
{
FIRST = 0;
SECOND = 1;
TEN = 10;
HUNDRED = 100;
};
Enum x = 1;
};

View File

@ -0,0 +1,31 @@
Second
Third
First
First
Second
Binary representation:
00000000 02 08 01 02 08 64 00 00 02 08 01 |.....d.....|
0000000b
MESSAGE #1 AT 0x00000001
x: SECOND
MESSAGE #2 AT 0x00000004
x: HUNDRED
MESSAGE #3 AT 0x00000007
MESSAGE #4 AT 0x00000008
MESSAGE #5 AT 0x00000009
x: SECOND
Binary representation is as expected
Second
Third
First
First
Second
Second
Third
First
First
Second

View File

@ -0,0 +1,37 @@
#!/usr/bin/env bash
# https://github.com/ClickHouse/ClickHouse/issues/7438
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
set -eo pipefail
# Run the client.
$CLICKHOUSE_CLIENT --multiquery <<'EOF'
DROP TABLE IF EXISTS enum_mapping_protobuf_00825;
CREATE TABLE enum_mapping_protobuf_00825
(
x Enum16('First'=-100, 'Second'=0, 'Third'=100)
) ENGINE = MergeTree ORDER BY tuple();
INSERT INTO enum_mapping_protobuf_00825 VALUES ('Second'), ('Third'), ('First'), ('First'), ('Second');
SELECT * FROM enum_mapping_protobuf_00825;
EOF
BINARY_FILE_PATH=$(mktemp "$CURDIR/00825_protobuf_format_enum_mapping.XXXXXX.binary")
$CLICKHOUSE_CLIENT --query "SELECT * FROM enum_mapping_protobuf_00825 FORMAT Protobuf SETTINGS format_schema = '$CURDIR/00825_protobuf_format_enum_mapping:Message'" > "$BINARY_FILE_PATH"
# Check the output in the protobuf format
echo
$CURDIR/helpers/protobuf_length_delimited_encoder.py --decode_and_check --format_schema "$CURDIR/00825_protobuf_format_enum_mapping:Message" --input "$BINARY_FILE_PATH"
# Check the input in the protobuf format (now the table contains the same data twice).
echo
$CLICKHOUSE_CLIENT --query "INSERT INTO enum_mapping_protobuf_00825 FORMAT Protobuf SETTINGS format_schema='$CURDIR/00825_protobuf_format_enum_mapping:Message'" < "$BINARY_FILE_PATH"
$CLICKHOUSE_CLIENT --query "SELECT * FROM enum_mapping_protobuf_00825"
rm "$BINARY_FILE_PATH"

View File

@ -0,0 +1,5 @@
syntax = "proto3";
message Message {
map<string, uint32> a = 1;
};

View File

@ -0,0 +1,19 @@
{'x':5,'y':7}
{'z':11}
{'temp':0}
{'':0}
Binary representation:
00000000 0e 0a 05 0a 01 78 10 05 0a 05 0a 01 79 10 07 07 |.....x......y...|
00000010 0a 05 0a 01 7a 10 0b 0a 0a 08 0a 04 74 65 6d 70 |....z.......temp|
00000020 10 00 06 0a 04 0a 00 10 00 |.........|
00000029
{'x':5,'y':7}
{'z':11}
{'temp':0}
{'':0}
{'x':5,'y':7}
{'z':11}
{'temp':0}
{'':0}

View File

@ -0,0 +1,40 @@
#!/usr/bin/env bash
# https://github.com/ClickHouse/ClickHouse/issues/6497
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
set -eo pipefail
# Run the client.
$CLICKHOUSE_CLIENT --multiquery <<'EOF'
SET allow_experimental_map_type = 1;
DROP TABLE IF EXISTS map_00825;
CREATE TABLE map_00825
(
a Map(String, UInt32)
) ENGINE = MergeTree ORDER BY tuple();
INSERT INTO map_00825 VALUES ({'x':5, 'y':7}), ({'z':11}), ({'temp':0}), ({'':0});
SELECT * FROM map_00825;
EOF
BINARY_FILE_PATH=$(mktemp "$CURDIR/00825_protobuf_format_map.XXXXXX.binary")
$CLICKHOUSE_CLIENT --query "SELECT * FROM map_00825 FORMAT Protobuf SETTINGS format_schema = '$CURDIR/00825_protobuf_format_map:Message'" > "$BINARY_FILE_PATH"
# Check the output in the protobuf format
echo
echo "Binary representation:"
hexdump -C $BINARY_FILE_PATH
# Check the input in the protobuf format (now the table contains the same data twice).
echo
$CLICKHOUSE_CLIENT --query "INSERT INTO map_00825 FORMAT Protobuf SETTINGS format_schema='$CURDIR/00825_protobuf_format_map:Message'" < "$BINARY_FILE_PATH"
$CLICKHOUSE_CLIENT --query "SELECT * FROM map_00825"
rm "$BINARY_FILE_PATH"

View File

@ -0,0 +1,10 @@
syntax = "proto3";
message Repeated {
string foo = 1;
int64 bar = 2;
}
message Message {
repeated Repeated messages = 1;
};

View File

@ -0,0 +1,25 @@
['1'] [0]
['1',''] [0,1]
Binary representation:
00000000 05 0a 03 0a 01 31 09 0a 03 0a 01 31 0a 02 10 01 |.....1.....1....|
00000010
MESSAGE #1 AT 0x00000001
messages {
foo: "1"
}
MESSAGE #2 AT 0x00000007
messages {
foo: "1"
}
messages {
bar: 1
}
Binary representation is as expected
['1'] [0]
['1',''] [0,1]
['1'] [0]
['1',''] [0,1]

View File

@ -0,0 +1,41 @@
#!/usr/bin/env bash
# https://github.com/ClickHouse/ClickHouse/issues/6497
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
set -eo pipefail
# Run the client.
$CLICKHOUSE_CLIENT --multiquery <<'EOF'
DROP TABLE IF EXISTS nested_optional_protobuf_00825;
CREATE TABLE nested_optional_protobuf_00825
(
messages Nested
(
foo String,
bar Int64
)
) ENGINE = MergeTree ORDER BY tuple();
INSERT INTO nested_optional_protobuf_00825 VALUES (['1'], [0]), (['1', ''], [0, 1]);
SELECT * FROM nested_optional_protobuf_00825;
EOF
BINARY_FILE_PATH=$(mktemp "$CURDIR/00825_protobuf_format_nested_optional.XXXXXX.binary")
$CLICKHOUSE_CLIENT --query "SELECT * FROM nested_optional_protobuf_00825 FORMAT Protobuf SETTINGS format_schema = '$CURDIR/00825_protobuf_format_nested_optional:Message'" > "$BINARY_FILE_PATH"
# Check the output in the protobuf format
echo
$CURDIR/helpers/protobuf_length_delimited_encoder.py --decode_and_check --format_schema "$CURDIR/00825_protobuf_format_nested_optional:Message" --input "$BINARY_FILE_PATH"
# Check the input in the protobuf format (now the table contains the same data twice).
echo
$CLICKHOUSE_CLIENT --query "INSERT INTO nested_optional_protobuf_00825 FORMAT Protobuf SETTINGS format_schema='$CURDIR/00825_protobuf_format_nested_optional:Message'" < "$BINARY_FILE_PATH"
$CLICKHOUSE_CLIENT --query "SELECT * FROM nested_optional_protobuf_00825"
rm "$BINARY_FILE_PATH"

View File

@ -0,0 +1,6 @@
syntax = "proto3";
message Message {
sint32 x = 1;
sint32 z = 2;
};

View File

@ -0,0 +1,37 @@
0 0 0
2 4 8
3 9 27
5 25 125
101 102 103
Binary representation:
00000000 00 04 08 04 10 10 04 08 06 10 36 05 08 0a 10 fa |..........6.....|
00000010 01 06 08 ca 01 10 ce 01 |........|
00000018
MESSAGE #1 AT 0x00000001
MESSAGE #2 AT 0x00000002
x: 2
z: 8
MESSAGE #3 AT 0x00000007
x: 3
z: 27
MESSAGE #4 AT 0x0000000C
x: 5
z: 125
MESSAGE #5 AT 0x00000012
x: 101
z: 103
Binary representation is as expected
0 0 0
0 0 0
2 4 8
2 4 8
3 9 27
3 9 27
5 25 125
5 25 125
101 102 103
101 10201 103

View File

@ -0,0 +1,38 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
set -eo pipefail
# Run the client.
$CLICKHOUSE_CLIENT --multiquery <<'EOF'
DROP TABLE IF EXISTS table_default_protobuf_00825;
CREATE TABLE table_default_protobuf_00825
(
x Int64,
y Int64 DEFAULT x * x,
z Int64 DEFAULT x * x * x
) ENGINE = MergeTree ORDER BY tuple();
INSERT INTO table_default_protobuf_00825 (x) VALUES (0), (2), (3), (5);
INSERT INTO table_default_protobuf_00825 VALUES (101, 102, 103);
SELECT * FROM table_default_protobuf_00825 ORDER BY x,y,z;
EOF
BINARY_FILE_PATH=$(mktemp "$CURDIR/00825_protobuf_format_table_default.XXXXXX.binary")
$CLICKHOUSE_CLIENT --query "SELECT * FROM table_default_protobuf_00825 ORDER BY x,y,z FORMAT Protobuf SETTINGS format_schema = '$CURDIR/00825_protobuf_format_table_default:Message'" > "$BINARY_FILE_PATH"
# Check the output in the protobuf format
echo
$CURDIR/helpers/protobuf_length_delimited_encoder.py --decode_and_check --format_schema "$CURDIR/00825_protobuf_format_table_default:Message" --input "$BINARY_FILE_PATH"
# Check the input in the protobuf format (now the table contains the same data twice).
echo
$CLICKHOUSE_CLIENT --query "INSERT INTO table_default_protobuf_00825 FORMAT Protobuf SETTINGS format_schema='$CURDIR/00825_protobuf_format_table_default:Message'" < "$BINARY_FILE_PATH"
$CLICKHOUSE_CLIENT --query "SELECT * FROM table_default_protobuf_00825 ORDER BY x,y,z"
rm "$BINARY_FILE_PATH"

View File

@ -0,0 +1,180 @@
#!/usr/bin/env python3
# The protobuf compiler protoc doesn't support encoding or decoding length-delimited protobuf message.
# To do that this script has been written.
import argparse
import os.path
import struct
import subprocess
import sys
import tempfile
def read_varint(input):
res = 0
shift = 0
while True:
c = input.read(1)
if len(c) == 0:
return None
b = c[0]
if b < 0x80:
res += b << shift
break
b -= 0x80
res += b << shift
shift = shift << 7
return res
def write_varint(output, value):
while True:
if value < 0x80:
b = value
output.write(b.to_bytes(1, byteorder='little'))
break
b = (value & 0x7F) + 0x80
output.write(b.to_bytes(1, byteorder='little'))
value = value >> 7
def write_hexdump(output, data):
with subprocess.Popen(["hexdump", "-C"], stdin=subprocess.PIPE, stdout=output, shell=False) as proc:
proc.communicate(data)
if proc.returncode != 0:
raise RuntimeError("hexdump returned code " + str(proc.returncode))
output.flush()
class FormatSchemaSplitted:
def __init__(self, format_schema):
self.format_schema = format_schema
splitted = self.format_schema.split(':')
if len(splitted) < 2:
raise RuntimeError('The format schema must have the format "schemafile:MessageType"')
path = splitted[0]
self.schemadir = os.path.dirname(path)
self.schemaname = os.path.basename(path)
if not self.schemaname.endswith(".proto"):
self.schemaname = self.schemaname + ".proto"
self.message_type = splitted[1]
def decode(input, output, format_schema):
if not type(format_schema) is FormatSchemaSplitted:
format_schema = FormatSchemaSplitted(format_schema)
msgindex = 1
while True:
sz = read_varint(input)
if sz is None:
break
output.write("MESSAGE #{msgindex} AT 0x{msgoffset:08X}\n".format(msgindex=msgindex, msgoffset=input.tell()).encode())
output.flush()
msg = input.read(sz)
if len(msg) < sz:
raise EOFError('Unexpected end of file')
with subprocess.Popen(["protoc",
"--decode", format_schema.message_type, format_schema.schemaname],
cwd=format_schema.schemadir,
stdin=subprocess.PIPE,
stdout=output,
shell=False) as proc:
proc.communicate(msg)
if proc.returncode != 0:
raise RuntimeError("protoc returned code " + str(proc.returncode))
output.flush()
msgindex = msgindex + 1
def encode(input, output, format_schema):
if not type(format_schema) is FormatSchemaSplitted:
format_schema = FormatSchemaSplitted(format_schema)
line_offset = input.tell()
line = input.readline()
while True:
if len(line) == 0:
break
if not line.startswith(b"MESSAGE #"):
raise RuntimeError("The line at 0x{line_offset:08X} must start with the text 'MESSAGE #'".format(line_offset=line_offset))
msg = b""
while True:
line_offset = input.tell()
line = input.readline()
if line.startswith(b"MESSAGE #") or len(line) == 0:
break
msg += line
with subprocess.Popen(["protoc",
"--encode", format_schema.message_type, format_schema.schemaname],
cwd=format_schema.schemadir,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
shell=False) as proc:
msgbin = proc.communicate(msg)[0]
if proc.returncode != 0:
raise RuntimeError("protoc returned code " + str(proc.returncode))
write_varint(output, len(msgbin))
output.write(msgbin)
output.flush()
def decode_and_check(input, output, format_schema):
input_data = input.read()
output.write(b"Binary representation:\n")
output.flush()
write_hexdump(output, input_data)
output.write(b"\n")
output.flush()
with tempfile.TemporaryFile() as tmp_input, tempfile.TemporaryFile() as tmp_decoded, tempfile.TemporaryFile() as tmp_encoded:
tmp_input.write(input_data)
tmp_input.flush()
tmp_input.seek(0)
decode(tmp_input, tmp_decoded, format_schema)
tmp_decoded.seek(0)
decoded_text = tmp_decoded.read()
output.write(decoded_text)
output.flush()
tmp_decoded.seek(0)
encode(tmp_decoded, tmp_encoded, format_schema)
tmp_encoded.seek(0)
encoded_data = tmp_encoded.read()
if encoded_data == input_data:
output.write(b"\nBinary representation is as expected\n")
output.flush()
else:
output.write(b"\nBinary representation differs from the expected one (listed below):\n")
output.flush()
write_hexdump(output, encoded_data)
sys.exit(1)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Encodes or decodes length-delimited protobuf messages.')
parser.add_argument('--input', help='The input file, the standard input will be used if not specified.')
parser.add_argument('--output', help='The output file, the standard output will be used if not specified')
parser.add_argument('--format_schema', required=True, help='Format schema in the format "schemafile:MessageType"')
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument('--encode', action='store_true', help='Specify to encode length-delimited messages.'
'The utility will read text-format messages of the given type from the input and write it in binary to the output.')
group.add_argument('--decode', action='store_true', help='Specify to decode length-delimited messages.'
'The utility will read messages in binary from the input and write text-format messages to the output.')
group.add_argument('--decode_and_check', action='store_true', help='The same as --decode, and the utility will then encode '
' the decoded data back to the binary form to check that the result of that encoding is the same as the input was.')
args = parser.parse_args()
custom_input_file = None
custom_output_file = None
try:
if args.input:
custom_input_file = open(args.input, "rb")
if args.output:
custom_output_file = open(args.output, "wb")
input = custom_input_file if custom_input_file else sys.stdin.buffer
output = custom_output_file if custom_output_file else sys.stdout.buffer
if args.encode:
encode(input, output, args.format_schema)
elif args.decode:
decode(input, output, args.format_schema)
elif args.decode_and_check:
decode_and_check(input, output, args.format_schema)
finally:
if custom_input_file:
custom_input_file.close()
if custom_output_file:
custom_output_file.close()

View File

@ -131,6 +131,12 @@
"00763_create_query_as_table_engine_bug",
"00765_sql_compatibility_aliases",
"00825_protobuf_format_input",
"00825_protobuf_format_nested_optional",
"00825_protobuf_format_array_3dim",
"00825_protobuf_format_map",
"00825_protobuf_format_array_of_arrays",
"00825_protobuf_format_table_default",
"00825_protobuf_format_enum_mapping",
"00826_cross_to_inner_join",
"00834_not_between",
"00909_kill_not_initialized_query",