diff --git a/dbms/programs/client/Client.cpp b/dbms/programs/client/Client.cpp index a51f7f9b6da..6b4a0c6eb58 100644 --- a/dbms/programs/client/Client.cpp +++ b/dbms/programs/client/Client.cpp @@ -966,8 +966,12 @@ private: /// Data format can be specified in the INSERT query. if (ASTInsertQuery * insert = typeid_cast(&*parsed_query)) + { if (!insert->format.empty()) current_format = insert->format; + if (insert->settings_ast) + InterpreterSetQuery(insert->settings_ast, context).executeForCurrentContext(); + } BlockInputStreamPtr block_input = context.getInputFormat( current_format, buf, sample, insert_format_max_block_size); diff --git a/dbms/src/Common/ErrorCodes.cpp b/dbms/src/Common/ErrorCodes.cpp index 610856136ef..7d04c6b25c2 100644 --- a/dbms/src/Common/ErrorCodes.cpp +++ b/dbms/src/Common/ErrorCodes.cpp @@ -410,13 +410,15 @@ namespace ErrorCodes extern const int ILLEGAL_CODEC_PARAMETER = 433; extern const int CANNOT_PARSE_PROTOBUF_SCHEMA = 434; extern const int NO_DATA_FOR_REQUIRED_PROTOBUF_FIELD = 435; - extern const int CANNOT_CONVERT_TO_PROTOBUF_TYPE = 436; + extern const int PROTOBUF_BAD_CAST = 436; extern const int PROTOBUF_FIELD_NOT_REPEATED = 437; extern const int DATA_TYPE_CANNOT_BE_PROMOTED = 438; extern const int CANNOT_SCHEDULE_TASK = 439; extern const int INVALID_LIMIT_EXPRESSION = 440; extern const int CANNOT_PARSE_DOMAIN_VALUE_FROM_STRING = 441; extern const int BAD_DATABASE_FOR_TEMPORARY_TABLE = 442; + extern const int NO_COMMON_COLUMNS_WITH_PROTOBUF_SCHEMA = 443; + extern const int UNKNOWN_PROTOBUF_FORMAT = 444; extern const int KEEPER_EXCEPTION = 999; extern const int POCO_EXCEPTION = 1000; diff --git a/dbms/src/DataStreams/InputStreamFromASTInsertQuery.cpp b/dbms/src/DataStreams/InputStreamFromASTInsertQuery.cpp index 5475b75b994..f9da736d4c4 100644 --- a/dbms/src/DataStreams/InputStreamFromASTInsertQuery.cpp +++ b/dbms/src/DataStreams/InputStreamFromASTInsertQuery.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include #include 
@@ -17,7 +18,7 @@ namespace ErrorCodes InputStreamFromASTInsertQuery::InputStreamFromASTInsertQuery( - const ASTPtr & ast, ReadBuffer * input_buffer_tail_part, const Block & header, const Context & context) + const ASTPtr & ast, ReadBuffer * input_buffer_tail_part, const Block & header, Context & context) { const ASTInsertQuery * ast_insert_query = dynamic_cast(ast.get()); @@ -27,6 +28,8 @@ InputStreamFromASTInsertQuery::InputStreamFromASTInsertQuery( String format = ast_insert_query->format; if (format.empty()) format = "Values"; + if (ast_insert_query->settings_ast) + InterpreterSetQuery(ast_insert_query->settings_ast, context).executeForCurrentContext(); /// Data could be in parsed (ast_insert_query.data) and in not parsed yet (input_buffer_tail_part) part of query. diff --git a/dbms/src/DataStreams/InputStreamFromASTInsertQuery.h b/dbms/src/DataStreams/InputStreamFromASTInsertQuery.h index 3ecda33289e..3c613a4a656 100644 --- a/dbms/src/DataStreams/InputStreamFromASTInsertQuery.h +++ b/dbms/src/DataStreams/InputStreamFromASTInsertQuery.h @@ -19,7 +19,7 @@ class Context; class InputStreamFromASTInsertQuery : public IBlockInputStream { public: - InputStreamFromASTInsertQuery(const ASTPtr & ast, ReadBuffer * input_buffer_tail_part, const Block & header, const Context & context); + InputStreamFromASTInsertQuery(const ASTPtr & ast, ReadBuffer * input_buffer_tail_part, const Block & header, Context & context); Block readImpl() override { return res_stream->read(); } void readPrefixImpl() override { return res_stream->readPrefix(); } diff --git a/dbms/src/DataTypes/DataTypeAggregateFunction.cpp b/dbms/src/DataTypes/DataTypeAggregateFunction.cpp index 38f58268dbd..c3277bdd90a 100644 --- a/dbms/src/DataTypes/DataTypeAggregateFunction.cpp +++ b/dbms/src/DataTypes/DataTypeAggregateFunction.cpp @@ -9,6 +9,7 @@ #include #include +#include #include #include #include @@ -254,6 +255,36 @@ void DataTypeAggregateFunction::serializeProtobuf(const IColumn & column, size_t 
protobuf.writeAggregateFunction(function, static_cast(column).getData()[row_num]); } +void DataTypeAggregateFunction::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const +{ + row_added = false; + ColumnAggregateFunction & column_concrete = static_cast(column); + Arena & arena = column_concrete.createOrGetArena(); + size_t size_of_state = function->sizeOfData(); + AggregateDataPtr place = arena.alignedAlloc(size_of_state, function->alignOfData()); + function->create(place); + try + { + if (!protobuf.readAggregateFunction(function, place, arena)) + { + function->destroy(place); + return; + } + auto & container = column_concrete.getData(); + if (allow_add_row) + { + container.emplace_back(place); + row_added = true; + } + else + container.back() = place; + } + catch (...) + { + function->destroy(place); + throw; + } +} MutableColumnPtr DataTypeAggregateFunction::createColumn() const { diff --git a/dbms/src/DataTypes/DataTypeAggregateFunction.h b/dbms/src/DataTypes/DataTypeAggregateFunction.h index 1bd5269ffe0..d156e982681 100644 --- a/dbms/src/DataTypes/DataTypeAggregateFunction.h +++ b/dbms/src/DataTypes/DataTypeAggregateFunction.h @@ -57,6 +57,7 @@ public: void serializeTextCSV(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override; void deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const override; void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const override; + void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override; MutableColumnPtr createColumn() const override; diff --git a/dbms/src/DataTypes/DataTypeArray.cpp b/dbms/src/DataTypes/DataTypeArray.cpp index 54d2668ad2b..580ec888197 100644 --- a/dbms/src/DataTypes/DataTypeArray.cpp +++ b/dbms/src/DataTypes/DataTypeArray.cpp @@ -6,6 +6,7 @@ #include #include +#include 
#include #include #include @@ -442,6 +443,36 @@ void DataTypeArray::serializeProtobuf(const IColumn & column, size_t row_num, Pr } +void DataTypeArray::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const +{ + row_added = false; + ColumnArray & column_array = static_cast(column); + IColumn & nested_column = column_array.getData(); + ColumnArray::Offsets & offsets = column_array.getOffsets(); + size_t old_size = offsets.size(); + try + { + bool nested_row_added; + do + nested->deserializeProtobuf(nested_column, protobuf, true, nested_row_added); + while (nested_row_added && protobuf.maybeCanReadValue()); + if (allow_add_row) + { + offsets.emplace_back(nested_column.size()); + row_added = true; + } + else + offsets.back() = nested_column.size(); + } + catch (...) + { + offsets.resize_assume_reserved(old_size); + nested_column.popBack(nested_column.size() - offsets.back()); + throw; + } +} + + MutableColumnPtr DataTypeArray::createColumn() const { return ColumnArray::create(nested->createColumn(), ColumnArray::ColumnOffsets::create()); diff --git a/dbms/src/DataTypes/DataTypeArray.h b/dbms/src/DataTypes/DataTypeArray.h index 802f5922342..0b985a63ad1 100644 --- a/dbms/src/DataTypes/DataTypeArray.h +++ b/dbms/src/DataTypes/DataTypeArray.h @@ -87,6 +87,10 @@ public: void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const override; + void deserializeProtobuf(IColumn & column, + ProtobufReader & protobuf, + bool allow_add_row, + bool & row_added) const override; MutableColumnPtr createColumn() const override; diff --git a/dbms/src/DataTypes/DataTypeDate.cpp b/dbms/src/DataTypes/DataTypeDate.cpp index d5556cb3378..df69473c7b4 100644 --- a/dbms/src/DataTypes/DataTypeDate.cpp +++ b/dbms/src/DataTypes/DataTypeDate.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #include @@ -78,6 +79,23 @@ void DataTypeDate::serializeProtobuf(const IColumn & column, size_t row_num, Pro 
protobuf.writeDate(DayNum(static_cast(column).getData()[row_num])); } +void DataTypeDate::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const +{ + row_added = false; + DayNum d; + if (!protobuf.readDate(d)) + return; + + auto & container = static_cast(column).getData(); + if (allow_add_row) + { + container.emplace_back(d); + row_added = true; + } + else + container.back() = d; +} + bool DataTypeDate::equals(const IDataType & rhs) const { return typeid(rhs) == typeid(*this); diff --git a/dbms/src/DataTypes/DataTypeDate.h b/dbms/src/DataTypes/DataTypeDate.h index 9bc56cc3762..43118467d90 100644 --- a/dbms/src/DataTypes/DataTypeDate.h +++ b/dbms/src/DataTypes/DataTypeDate.h @@ -22,6 +22,7 @@ public: void serializeTextCSV(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override; void deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const override; void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const override; + void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override; bool canBeUsedAsVersion() const override { return true; } bool canBeInsideNullable() const override { return true; } diff --git a/dbms/src/DataTypes/DataTypeDateTime.cpp b/dbms/src/DataTypes/DataTypeDateTime.cpp index 5081b68e9f0..ad328c49a6c 100644 --- a/dbms/src/DataTypes/DataTypeDateTime.cpp +++ b/dbms/src/DataTypes/DataTypeDateTime.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -144,6 +145,23 @@ void DataTypeDateTime::serializeProtobuf(const IColumn & column, size_t row_num, protobuf.writeDateTime(static_cast(column).getData()[row_num]); } +void DataTypeDateTime::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const +{ + row_added = false; + time_t t; + if 
(!protobuf.readDateTime(t)) + return; + + auto & container = static_cast(column).getData(); + if (allow_add_row) + { + container.emplace_back(t); + row_added = true; + } + else + container.back() = t; +} + bool DataTypeDateTime::equals(const IDataType & rhs) const { /// DateTime with different timezones are equal, because: diff --git a/dbms/src/DataTypes/DataTypeDateTime.h b/dbms/src/DataTypes/DataTypeDateTime.h index eba05814550..b13767d8bf8 100644 --- a/dbms/src/DataTypes/DataTypeDateTime.h +++ b/dbms/src/DataTypes/DataTypeDateTime.h @@ -47,6 +47,7 @@ public: void serializeTextCSV(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override; void deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const override; void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const override; + void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override; bool canBeUsedAsVersion() const override { return true; } bool canBeInsideNullable() const override { return true; } diff --git a/dbms/src/DataTypes/DataTypeEnum.cpp b/dbms/src/DataTypes/DataTypeEnum.cpp index f6566ad9040..f655e679883 100644 --- a/dbms/src/DataTypes/DataTypeEnum.cpp +++ b/dbms/src/DataTypes/DataTypeEnum.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include #include @@ -230,6 +231,25 @@ void DataTypeEnum::serializeProtobuf(const IColumn & column, size_t row_nu protobuf.writeEnum(static_cast(column).getData()[row_num]); } +template +void DataTypeEnum::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const +{ + protobuf.prepareEnumMapping(values); + row_added = false; + Type value; + if (!protobuf.readEnum(value)) + return; + + auto & container = static_cast(column).getData(); + if (allow_add_row) + { + container.emplace_back(value); + row_added = true; + } + else + 
container.back() = value; +} + template Field DataTypeEnum::getDefault() const { diff --git a/dbms/src/DataTypes/DataTypeEnum.h b/dbms/src/DataTypes/DataTypeEnum.h index 61104e70505..a1dae0130b0 100644 --- a/dbms/src/DataTypes/DataTypeEnum.h +++ b/dbms/src/DataTypes/DataTypeEnum.h @@ -106,6 +106,7 @@ public: void deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, const size_t limit, const double avg_value_size_hint) const override; void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const override; + void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override; MutableColumnPtr createColumn() const override { return ColumnType::create(); } diff --git a/dbms/src/DataTypes/DataTypeFixedString.cpp b/dbms/src/DataTypes/DataTypeFixedString.cpp index 17cb82b7c3d..fdc4d52c6fb 100644 --- a/dbms/src/DataTypes/DataTypeFixedString.cpp +++ b/dbms/src/DataTypes/DataTypeFixedString.cpp @@ -3,6 +3,7 @@ #include #include +#include #include #include #include @@ -122,30 +123,37 @@ void DataTypeFixedString::serializeTextEscaped(const IColumn & column, size_t ro } +static inline void alignStringLength(const DataTypeFixedString & type, + ColumnFixedString::Chars & data, + size_t string_start) +{ + size_t length = data.size() - string_start; + if (length < type.getN()) + { + data.resize_fill(string_start + type.getN()); + } + else if (length > type.getN()) + { + data.resize_assume_reserved(string_start); + throw Exception("Too large value for " + type.getName(), ErrorCodes::TOO_LARGE_STRING_SIZE); + } +} + template static inline void read(const DataTypeFixedString & self, IColumn & column, Reader && reader) { ColumnFixedString::Chars & data = typeid_cast(column).getChars(); size_t prev_size = data.size(); - try { reader(data); + alignStringLength(self, data, prev_size); } catch (...) 
{ data.resize_assume_reserved(prev_size); throw; } - - if (data.size() < prev_size + self.getN()) - data.resize_fill(prev_size + self.getN()); - - if (data.size() > prev_size + self.getN()) - { - data.resize_assume_reserved(prev_size); - throw Exception("Too large value for " + self.getName(), ErrorCodes::TOO_LARGE_STRING_SIZE); - } } @@ -208,6 +216,44 @@ void DataTypeFixedString::serializeProtobuf(const IColumn & column, size_t row_n } +void DataTypeFixedString::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const +{ + row_added = false; + auto & column_string = static_cast(column); + ColumnFixedString::Chars & data = column_string.getChars(); + size_t old_size = data.size(); + try + { + if (allow_add_row) + { + if (protobuf.readStringInto(data)) + { + alignStringLength(*this, data, old_size); + row_added = true; + } + else + data.resize_assume_reserved(old_size); + } + else + { + ColumnFixedString::Chars temp_data; + if (protobuf.readStringInto(temp_data)) + { + alignStringLength(*this, temp_data, 0); + column_string.popBack(1); + old_size = data.size(); + data.insertSmallAllowReadWriteOverflow15(temp_data.begin(), temp_data.end()); + } + } + } + catch (...) 
+ { + data.resize_assume_reserved(old_size); + throw; + } +} + + MutableColumnPtr DataTypeFixedString::createColumn() const { return ColumnFixedString::create(n); diff --git a/dbms/src/DataTypes/DataTypeFixedString.h b/dbms/src/DataTypes/DataTypeFixedString.h index f929e09a7ee..75823342320 100644 --- a/dbms/src/DataTypes/DataTypeFixedString.h +++ b/dbms/src/DataTypes/DataTypeFixedString.h @@ -65,6 +65,7 @@ public: void deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings &) const override; void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const override; + void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override; MutableColumnPtr createColumn() const override; diff --git a/dbms/src/DataTypes/DataTypeLowCardinality.cpp b/dbms/src/DataTypes/DataTypeLowCardinality.cpp index 8d6c457646d..105e989f69a 100644 --- a/dbms/src/DataTypes/DataTypeLowCardinality.cpp +++ b/dbms/src/DataTypes/DataTypeLowCardinality.cpp @@ -729,27 +729,43 @@ void DataTypeLowCardinality::deserializeBinary(Field & field, ReadBuffer & istr) dictionary_type->deserializeBinary(field, istr); } -template +void DataTypeLowCardinality::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const +{ + if (allow_add_row) + { + deserializeImpl(column, &IDataType::deserializeProtobuf, protobuf, true, row_added); + return; + } + + row_added = false; + auto & low_cardinality_column= getColumnLowCardinality(column); + auto nested_column = low_cardinality_column.getDictionary().getNestedColumn(); + auto temp_column = nested_column->cloneEmpty(); + size_t unique_row_number = low_cardinality_column.getIndexes().getUInt(low_cardinality_column.size() - 1); + temp_column->insertFrom(*nested_column, unique_row_number); + bool dummy; + dictionary_type.get()->deserializeProtobuf(*temp_column, protobuf, false, dummy); + 
low_cardinality_column.popBack(1); + low_cardinality_column.insertFromFullColumn(*temp_column, 0); +} + +template void DataTypeLowCardinality::serializeImpl( - const IColumn & column, size_t row_num, - DataTypeLowCardinality::SerializeFunctionPtr func, - OutputStream & ostr, Args & ... args) const + const IColumn & column, size_t row_num, DataTypeLowCardinality::SerializeFunctionPtr func, Args &&... args) const { auto & low_cardinality_column = getColumnLowCardinality(column); size_t unique_row_number = low_cardinality_column.getIndexes().getUInt(row_num); - (dictionary_type.get()->*func)(*low_cardinality_column.getDictionary().getNestedColumn(), unique_row_number, ostr, std::forward(args)...); + (dictionary_type.get()->*func)(*low_cardinality_column.getDictionary().getNestedColumn(), unique_row_number, std::forward(args)...); } -template +template void DataTypeLowCardinality::deserializeImpl( - IColumn & column, - DataTypeLowCardinality::DeserializeFunctionPtr func, - ReadBuffer & istr, Args & ... args) const + IColumn & column, DataTypeLowCardinality::DeserializeFunctionPtr func, Args &&... 
args) const { auto & low_cardinality_column= getColumnLowCardinality(column); auto temp_column = low_cardinality_column.getDictionary().getNestedColumn()->cloneEmpty(); - (dictionary_type.get()->*func)(*temp_column, istr, std::forward(args)...); + (dictionary_type.get()->*func)(*temp_column, std::forward(args)...); low_cardinality_column.insertFromFullColumn(*temp_column, 0); } diff --git a/dbms/src/DataTypes/DataTypeLowCardinality.h b/dbms/src/DataTypes/DataTypeLowCardinality.h index 8ba2a4d1c4d..cab13d5ec69 100644 --- a/dbms/src/DataTypes/DataTypeLowCardinality.h +++ b/dbms/src/DataTypes/DataTypeLowCardinality.h @@ -115,6 +115,8 @@ public: serializeImpl(column, row_num, &IDataType::serializeProtobuf, protobuf); } + void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override; + MutableColumnPtr createColumn() const override; Field getDefault() const override { return dictionary_type->getDefault(); } @@ -148,19 +150,17 @@ public: private: - template - using SerializeFunctionPtr = void (IDataType::*)(const IColumn &, size_t, OutputStream &, Args & ...) const; + template + using SerializeFunctionPtr = void (IDataType::*)(const IColumn &, size_t, Params ...) const; - template - void serializeImpl(const IColumn & column, size_t row_num, SerializeFunctionPtr func, - OutputStream & ostr, Args & ... args) const; + template + void serializeImpl(const IColumn & column, size_t row_num, SerializeFunctionPtr func, Args &&... args) const; - template - using DeserializeFunctionPtr = void (IDataType::*)(IColumn &, ReadBuffer &, Args & ...) const; + template + using DeserializeFunctionPtr = void (IDataType::*)(IColumn &, Params ...) const; - template - void deserializeImpl(IColumn & column, DeserializeFunctionPtr func, - ReadBuffer & istr, Args & ... args) const; + template + void deserializeImpl(IColumn & column, DeserializeFunctionPtr func, Args &&... 
args) const; template static MutableColumnUniquePtr createColumnUniqueImpl(const IDataType & keys_type, const Creator & creator); diff --git a/dbms/src/DataTypes/DataTypeNullable.cpp b/dbms/src/DataTypes/DataTypeNullable.cpp index 3cf798724ac..d12ec1ffef4 100644 --- a/dbms/src/DataTypes/DataTypeNullable.cpp +++ b/dbms/src/DataTypes/DataTypeNullable.cpp @@ -318,6 +318,26 @@ void DataTypeNullable::serializeProtobuf(const IColumn & column, size_t row_num, nested_data_type->serializeProtobuf(col.getNestedColumn(), row_num, protobuf); } +void DataTypeNullable::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const +{ + ColumnNullable & col = static_cast(column); + IColumn & nested_column = col.getNestedColumn(); + size_t old_size = nested_column.size(); + try + { + nested_data_type->deserializeProtobuf(nested_column, protobuf, allow_add_row, row_added); + if (row_added) + col.getNullMapData().push_back(0); + } + catch (...) + { + nested_column.popBack(nested_column.size() - old_size); + col.getNullMapData().resize_assume_reserved(old_size); + row_added = false; + throw; + } +} + MutableColumnPtr DataTypeNullable::createColumn() const { return ColumnNullable::create(nested_data_type->createColumn(), ColumnUInt8::create()); diff --git a/dbms/src/DataTypes/DataTypeNullable.h b/dbms/src/DataTypes/DataTypeNullable.h index 20a6b195c84..f83aa3a71cf 100644 --- a/dbms/src/DataTypes/DataTypeNullable.h +++ b/dbms/src/DataTypes/DataTypeNullable.h @@ -71,6 +71,7 @@ public: void serializeTextXML(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override; void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const override; + void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override; MutableColumnPtr createColumn() const override; diff --git a/dbms/src/DataTypes/DataTypeNumberBase.cpp 
b/dbms/src/DataTypes/DataTypeNumberBase.cpp index b93ef034c67..0245788b987 100644 --- a/dbms/src/DataTypes/DataTypeNumberBase.cpp +++ b/dbms/src/DataTypes/DataTypeNumberBase.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include @@ -210,6 +211,25 @@ void DataTypeNumberBase::serializeProtobuf(const IColumn & column, size_t row } +template +void DataTypeNumberBase::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const +{ + row_added = false; + T value; + if (!protobuf.readNumber(value)) + return; + + auto & container = typeid_cast &>(column).getData(); + if (allow_add_row) + { + container.emplace_back(value); + row_added = true; + } + else + container.back() = value; +} + + template MutableColumnPtr DataTypeNumberBase::createColumn() const { diff --git a/dbms/src/DataTypes/DataTypeNumberBase.h b/dbms/src/DataTypes/DataTypeNumberBase.h index 2728d32a6a9..d88cd221625 100644 --- a/dbms/src/DataTypes/DataTypeNumberBase.h +++ b/dbms/src/DataTypes/DataTypeNumberBase.h @@ -37,6 +37,7 @@ public: void deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint) const override; void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const override; + void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override; MutableColumnPtr createColumn() const override; diff --git a/dbms/src/DataTypes/DataTypeString.cpp b/dbms/src/DataTypes/DataTypeString.cpp index 34077e4bbe7..11649e845d5 100644 --- a/dbms/src/DataTypes/DataTypeString.cpp +++ b/dbms/src/DataTypes/DataTypeString.cpp @@ -7,6 +7,7 @@ #include #include +#include #include #include #include @@ -237,10 +238,8 @@ static inline void read(IColumn & column, Reader && reader) ColumnString & column_string = static_cast(column); ColumnString::Chars & data = column_string.getChars(); ColumnString::Offsets & offsets = 
column_string.getOffsets(); - size_t old_chars_size = data.size(); size_t old_offsets_size = offsets.size(); - try { reader(data); @@ -310,6 +309,48 @@ void DataTypeString::serializeProtobuf(const IColumn & column, size_t row_num, P } +void DataTypeString::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const +{ + row_added = false; + auto & column_string = static_cast(column); + ColumnString::Chars & data = column_string.getChars(); + ColumnString::Offsets & offsets = column_string.getOffsets(); + size_t old_size = offsets.size(); + try + { + if (allow_add_row) + { + if (protobuf.readStringInto(data)) + { + data.emplace_back(0); + offsets.emplace_back(data.size()); + row_added = true; + } + else + data.resize_assume_reserved(offsets.back()); + } + else + { + ColumnString::Chars temp_data; + if (protobuf.readStringInto(temp_data)) + { + temp_data.emplace_back(0); + column_string.popBack(1); + old_size = offsets.size(); + data.insertSmallAllowReadWriteOverflow15(temp_data.begin(), temp_data.end()); + offsets.emplace_back(data.size()); + } + } + } + catch (...) 
+ { + offsets.resize_assume_reserved(old_size); + data.resize_assume_reserved(offsets.back()); + throw; + } +} + + MutableColumnPtr DataTypeString::createColumn() const { return ColumnString::create(); diff --git a/dbms/src/DataTypes/DataTypeString.h b/dbms/src/DataTypes/DataTypeString.h index 202c8374c27..7f69f46f290 100644 --- a/dbms/src/DataTypes/DataTypeString.h +++ b/dbms/src/DataTypes/DataTypeString.h @@ -46,6 +46,7 @@ public: void deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings &) const override; void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const override; + void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override; MutableColumnPtr createColumn() const override; diff --git a/dbms/src/DataTypes/DataTypeTuple.cpp b/dbms/src/DataTypes/DataTypeTuple.cpp index 1940c44134f..8e7ce5f7c94 100644 --- a/dbms/src/DataTypes/DataTypeTuple.cpp +++ b/dbms/src/DataTypes/DataTypeTuple.cpp @@ -413,6 +413,22 @@ void DataTypeTuple::serializeProtobuf(const IColumn & column, size_t row_num, Pr elems[i]->serializeProtobuf(extractElementColumn(column, i), row_num, protobuf); } +void DataTypeTuple::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const +{ + row_added = false; + bool all_elements_get_row = true; + addElementSafe(elems, column, [&] + { + for (const auto & i : ext::range(0, ext::size(elems))) + { + bool element_row_added; + elems[i]->deserializeProtobuf(extractElementColumn(column, i), protobuf, allow_add_row, element_row_added); + all_elements_get_row &= element_row_added; + } + }); + row_added = all_elements_get_row; +} + MutableColumnPtr DataTypeTuple::createColumn() const { size_t size = elems.size(); diff --git a/dbms/src/DataTypes/DataTypeTuple.h b/dbms/src/DataTypes/DataTypeTuple.h index d489ae4ba7f..5bef9f00566 100644 --- a/dbms/src/DataTypes/DataTypeTuple.h +++ 
b/dbms/src/DataTypes/DataTypeTuple.h @@ -78,6 +78,7 @@ public: DeserializeBinaryBulkStatePtr & state) const override; void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const override; + void deserializeProtobuf(IColumn & column, ProtobufReader & reader, bool allow_add_row, bool & row_added) const override; MutableColumnPtr createColumn() const override; diff --git a/dbms/src/DataTypes/DataTypeUUID.cpp b/dbms/src/DataTypes/DataTypeUUID.cpp index 9f913b5bf80..9243b80721a 100644 --- a/dbms/src/DataTypes/DataTypeUUID.cpp +++ b/dbms/src/DataTypes/DataTypeUUID.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include #include @@ -77,6 +78,22 @@ void DataTypeUUID::serializeProtobuf(const IColumn & column, size_t row_num, Pro protobuf.writeUUID(UUID(static_cast(column).getData()[row_num])); } +void DataTypeUUID::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const +{ + row_added = false; + UUID uuid; + if (!protobuf.readUUID(uuid)) + return; + + auto & container = static_cast(column).getData(); + if (allow_add_row) + { + container.emplace_back(uuid); + row_added = true; + } + else + container.back() = uuid; +} bool DataTypeUUID::equals(const IDataType & rhs) const { diff --git a/dbms/src/DataTypes/DataTypeUUID.h b/dbms/src/DataTypes/DataTypeUUID.h index 0a0ce6ad035..80b313aeb41 100644 --- a/dbms/src/DataTypes/DataTypeUUID.h +++ b/dbms/src/DataTypes/DataTypeUUID.h @@ -25,6 +25,7 @@ public: void serializeTextCSV(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override; void deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const override; void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const override; + void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override; bool 
canBeUsedInBitOperations() const override { return true; } bool canBeInsideNullable() const override { return true; } diff --git a/dbms/src/DataTypes/DataTypesDecimal.cpp b/dbms/src/DataTypes/DataTypesDecimal.cpp index b9ccb41af3d..f115118d67c 100644 --- a/dbms/src/DataTypes/DataTypesDecimal.cpp +++ b/dbms/src/DataTypes/DataTypesDecimal.cpp @@ -2,6 +2,7 @@ #include #include #include +#include #include #include #include @@ -141,6 +142,25 @@ void DataTypeDecimal::serializeProtobuf(const IColumn & column, size_t row_nu } +template +void DataTypeDecimal::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const +{ + row_added = false; + T decimal; + if (!protobuf.readDecimal(decimal, precision, scale)) + return; + + auto & container = static_cast(column).getData(); + if (allow_add_row) + { + container.emplace_back(decimal); + row_added = true; + } + else + container.back() = decimal; +} + + template Field DataTypeDecimal::getDefault() const { diff --git a/dbms/src/DataTypes/DataTypesDecimal.h b/dbms/src/DataTypes/DataTypesDecimal.h index 7159750c36b..9918e8c90fd 100644 --- a/dbms/src/DataTypes/DataTypesDecimal.h +++ b/dbms/src/DataTypes/DataTypesDecimal.h @@ -101,6 +101,7 @@ public: void deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint) const override; void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const override; + void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override; Field getDefault() const override; bool canBePromoted() const override { return true; } diff --git a/dbms/src/DataTypes/IDataType.h b/dbms/src/DataTypes/IDataType.h index 6bb59106d10..681f81a1b00 100644 --- a/dbms/src/DataTypes/IDataType.h +++ b/dbms/src/DataTypes/IDataType.h @@ -23,6 +23,7 @@ using MutableColumnPtr = COWPtr::MutablePtr; using DataTypePtr = std::shared_ptr; using DataTypes = 
std::vector; +class ProtobufReader; class ProtobufWriter; @@ -254,6 +255,7 @@ public: /** Serialize to a protobuf. */ virtual void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const = 0; + virtual void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const = 0; protected: virtual String doGetName() const; diff --git a/dbms/src/DataTypes/IDataTypeDummy.h b/dbms/src/DataTypes/IDataTypeDummy.h index bb122126577..1aa5135c53e 100644 --- a/dbms/src/DataTypes/IDataTypeDummy.h +++ b/dbms/src/DataTypes/IDataTypeDummy.h @@ -27,8 +27,9 @@ public: void serializeBinaryBulk(const IColumn &, WriteBuffer &, size_t, size_t) const override { throwNoSerialization(); } void deserializeBinaryBulk(IColumn &, ReadBuffer &, size_t, double) const override { throwNoSerialization(); } void serializeText(const IColumn &, size_t, WriteBuffer &, const FormatSettings &) const override { throwNoSerialization(); } - void deserializeText(IColumn &, ReadBuffer &, const FormatSettings &) const override { throwNoSerialization(); } - void serializeProtobuf(const IColumn &, size_t, ProtobufWriter &) const override { throwNoSerialization(); } + void deserializeText(IColumn &, ReadBuffer &, const FormatSettings &) const override { throwNoSerialization(); } + void serializeProtobuf(const IColumn &, size_t, ProtobufWriter &) const override { throwNoSerialization(); } + void deserializeProtobuf(IColumn &, ProtobufReader &, bool, bool &) const override { throwNoSerialization(); } MutableColumnPtr createColumn() const override { @@ -50,4 +51,3 @@ public: }; } - diff --git a/dbms/src/Formats/FormatFactory.cpp b/dbms/src/Formats/FormatFactory.cpp index c17b9422f2d..9b54720bf45 100644 --- a/dbms/src/Formats/FormatFactory.cpp +++ b/dbms/src/Formats/FormatFactory.cpp @@ -114,6 +114,7 @@ void registerInputFormatJSONEachRow(FormatFactory & factory); void registerOutputFormatJSONEachRow(FormatFactory & factory); void 
registerInputFormatParquet(FormatFactory & factory); void registerOutputFormatParquet(FormatFactory & factory); +void registerInputFormatProtobuf(FormatFactory & factory); void registerOutputFormatProtobuf(FormatFactory & factory); /// Output only (presentational) formats. @@ -150,6 +151,7 @@ FormatFactory::FormatFactory() registerOutputFormatTSKV(*this); registerInputFormatJSONEachRow(*this); registerOutputFormatJSONEachRow(*this); + registerInputFormatProtobuf(*this); registerOutputFormatProtobuf(*this); registerInputFormatCapnProto(*this); registerInputFormatParquet(*this); diff --git a/dbms/src/Formats/FormatSchemaInfo.cpp b/dbms/src/Formats/FormatSchemaInfo.cpp index b7a5da7bf04..5fcf1f981eb 100644 --- a/dbms/src/Formats/FormatSchemaInfo.cpp +++ b/dbms/src/Formats/FormatSchemaInfo.cpp @@ -20,7 +20,7 @@ FormatSchemaInfo::FormatSchemaInfo(const Context & context, const String & schem { throw Exception( "Format schema requires the 'format_schema' setting to have the 'schema_file:message_name' format" - + (schema_file_extension.empty() ? "" : "e.g. 'schema." + schema_file_extension + ":Message'"), + + (schema_file_extension.empty() ? "" : ", e.g. 'schema." + schema_file_extension + ":Message'"), ErrorCodes::BAD_ARGUMENTS); } return; @@ -29,11 +29,11 @@ FormatSchemaInfo::FormatSchemaInfo(const Context & context, const String & schem size_t colon_pos = format_schema.find(':'); Poco::Path path; if ((colon_pos == String::npos) || (colon_pos == 0) || (colon_pos == format_schema.length() - 1) - || path.assign(format_schema.substr(0, colon_pos)).getFileName().empty()) + || path.assign(format_schema.substr(0, colon_pos)).makeFile().getFileName().empty()) { throw Exception( "Format schema requires the 'format_schema' setting to have the 'schema_file:message_name' format" - + (schema_file_extension.empty() ? "" : "e.g. 'schema." + schema_file_extension + ":Message'") + ". Got '" + format_schema + + (schema_file_extension.empty() ? "" : ", e.g. 'schema." 
+ schema_file_extension + ":Message'") + ". Got '" + format_schema + "'", ErrorCodes::BAD_ARGUMENTS); } diff --git a/dbms/src/Formats/ProtobufColumnMatcher.cpp b/dbms/src/Formats/ProtobufColumnMatcher.cpp new file mode 100644 index 00000000000..af092fe3db9 --- /dev/null +++ b/dbms/src/Formats/ProtobufColumnMatcher.cpp @@ -0,0 +1,58 @@ +#include +#if USE_PROTOBUF + +#include +#include +#include +#include +#include + + +namespace DB +{ +namespace ErrorCodes +{ + extern const int NO_COMMON_COLUMNS_WITH_PROTOBUF_SCHEMA; +} + + +namespace +{ + String columnNameToSearchableForm(const String & str) + { + return Poco::replace(Poco::toUpper(str), ".", "_"); + } +} + +namespace ProtobufColumnMatcher +{ + namespace details + { + ColumnNameMatcher::ColumnNameMatcher(const std::vector & column_names) : column_usage(column_names.size()) + { + column_usage.resize(column_names.size(), false); + for (size_t i = 0; i != column_names.size(); ++i) + column_name_to_index_map.emplace(columnNameToSearchableForm(column_names[i]), i); + } + + size_t ColumnNameMatcher::findColumn(const String & field_name) + { + auto it = column_name_to_index_map.find(columnNameToSearchableForm(field_name)); + if (it == column_name_to_index_map.end()) + return -1; + size_t column_index = it->second; + if (column_usage[column_index]) + return -1; + column_usage[column_index] = true; + return column_index; + } + + void throwNoCommonColumns() + { + throw Exception("No common columns with provided protobuf schema", ErrorCodes::NO_COMMON_COLUMNS_WITH_PROTOBUF_SCHEMA); + } + } +} + +} +#endif diff --git a/dbms/src/Formats/ProtobufColumnMatcher.h b/dbms/src/Formats/ProtobufColumnMatcher.h new file mode 100644 index 00000000000..2d689276627 --- /dev/null +++ b/dbms/src/Formats/ProtobufColumnMatcher.h @@ -0,0 +1,160 @@ +#pragma once + +#include +#if USE_PROTOBUF + +#include +#include +#include +#include +#include +#include +#include + +namespace google +{ +namespace protobuf +{ + class Descriptor; + class 
FieldDescriptor; +} +} + + +namespace DB +{ +namespace ProtobufColumnMatcher +{ + struct DefaultTraits + { + using MessageData = boost::blank; + using FieldData = boost::blank; + }; + + template + struct Message; + + /// Represents a field in a protobuf message. + template + struct Field + { + const google::protobuf::FieldDescriptor * field_descriptor = nullptr; + + /// Same as field_descriptor->number(). + UInt32 field_number = 0; + + /// Index of a column; either 'column_index' or 'nested_message' is set. + size_t column_index = -1; + std::unique_ptr> nested_message; + + typename Traits::FieldData data; + }; + + /// Represents a protobuf message. + template + struct Message + { + std::vector> fields; + + /// Points to the parent message if this is a nested message. + Message * parent = nullptr; + size_t index_in_parent = -1; + + typename Traits::MessageData data; + }; + + /// Utility function finding matching columns for each protobuf field. + template + static std::unique_ptr> matchColumns( + const std::vector & column_names, + const google::protobuf::Descriptor * message_type); + + namespace details + { + void throwNoCommonColumns(); + + class ColumnNameMatcher + { + public: + ColumnNameMatcher(const std::vector & column_names); + size_t findColumn(const String & field_name); + + private: + std::unordered_map column_name_to_index_map; + std::vector column_usage; + }; + + template + std::unique_ptr> matchColumnsRecursive( + ColumnNameMatcher & name_matcher, + const google::protobuf::Descriptor * message_type, + const String & field_name_prefix) + { + auto message = std::make_unique>(); + for (int i = 0; i != message_type->field_count(); ++i) + { + const google::protobuf::FieldDescriptor * field_descriptor = message_type->field(i); + if ((field_descriptor->type() == google::protobuf::FieldDescriptor::TYPE_MESSAGE) + || (field_descriptor->type() == google::protobuf::FieldDescriptor::TYPE_GROUP)) + { + auto nested_message = matchColumnsRecursive( + name_matcher, 
field_descriptor->message_type(), field_name_prefix + field_descriptor->name() + "."); + if (nested_message) + { + message->fields.emplace_back(); + auto & current_field = message->fields.back(); + current_field.field_number = field_descriptor->number(); + current_field.field_descriptor = field_descriptor; + current_field.nested_message = std::move(nested_message); + current_field.nested_message->parent = message.get(); + } + } + else + { + size_t column_index = name_matcher.findColumn(field_name_prefix + field_descriptor->name()); + if (column_index != static_cast(-1)) + { + message->fields.emplace_back(); + auto & current_field = message->fields.back(); + current_field.field_number = field_descriptor->number(); + current_field.field_descriptor = field_descriptor; + current_field.column_index = column_index; + } + } + } + + if (message->fields.empty()) + return nullptr; + + // Columns should be sorted by field_number, it's necessary for writing protobufs and useful reading protobufs. + std::sort(message->fields.begin(), message->fields.end(), [](const Field & left, const Field & right) + { + return left.field_number < right.field_number; + }); + + for (size_t i = 0; i != message->fields.size(); ++i) + { + auto & field = message->fields[i]; + if (field.nested_message) + field.nested_message->index_in_parent = i; + } + + return message; + } + } + + template + static std::unique_ptr> matchColumns( + const std::vector & column_names, + const google::protobuf::Descriptor * message_type) + { + details::ColumnNameMatcher name_matcher(column_names); + auto message = details::matchColumnsRecursive(name_matcher, message_type, ""); + if (!message) + details::throwNoCommonColumns(); + return message; + } +} + +} +#endif diff --git a/dbms/src/Formats/ProtobufReader.cpp b/dbms/src/Formats/ProtobufReader.cpp new file mode 100644 index 00000000000..2676133c9be --- /dev/null +++ b/dbms/src/Formats/ProtobufReader.cpp @@ -0,0 +1,1152 @@ +#include +#if USE_PROTOBUF + +#include 
+#include +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace DB +{ +namespace ErrorCodes +{ + extern const int UNKNOWN_PROTOBUF_FORMAT; + extern const int PROTOBUF_BAD_CAST; +} + + +namespace +{ + enum WireType + { + VARINT = 0, + BITS64 = 1, + LENGTH_DELIMITED = 2, + GROUP_START = 3, + GROUP_END = 4, + BITS32 = 5, + }; + + // The following inequation should be always true to simplify conditions: + // REACHED_END < any cursor position < min(END_OF_VARINT, END_OF_GROUP) + constexpr UInt64 END_OF_VARINT = static_cast(-1); + constexpr UInt64 END_OF_GROUP = static_cast(-2); + + Int64 decodeZigZag(UInt64 n) { return static_cast((n >> 1) ^ (~(n & 1) + 1)); } + + void unknownFormat() + { + throw Exception("Protobuf messages are corrupted or doesn't match the provided schema", ErrorCodes::UNKNOWN_PROTOBUF_FORMAT); + } +} + + + +ProtobufReader::SimpleReader::SimpleReader(ReadBuffer & in_) + : in(in_) + , cursor(1 /* Should be greater than REACHED_END to simplify conditions */) + , current_message_end(REACHED_END) + , field_end(REACHED_END) +{ +} + +bool ProtobufReader::SimpleReader::startMessage() +{ + if ((current_message_end == REACHED_END) && parent_message_ends.empty()) + { + // Start reading a root message. 
+ if (in.eof()) + return false; + size_t size_of_message = readVarint(); + current_message_end = cursor + size_of_message; + } + else + { + // Start reading a nested message which is located inside a length-delimited field + // of another message.s + parent_message_ends.emplace_back(current_message_end); + current_message_end = field_end; + } + field_end = REACHED_END; + return true; +} + +void ProtobufReader::SimpleReader::endMessage() +{ + if (current_message_end != REACHED_END) + { + if (current_message_end == END_OF_GROUP) + ignoreGroup(); + else if (cursor < current_message_end) + ignore(current_message_end - cursor); + else if (cursor > current_message_end) + { + if (!parent_message_ends.empty()) + unknownFormat(); + moveCursorBackward(cursor - current_message_end); + } + current_message_end = REACHED_END; + } + + field_end = REACHED_END; + if (!parent_message_ends.empty()) + { + current_message_end = parent_message_ends.back(); + parent_message_ends.pop_back(); + } +} + +void ProtobufReader::SimpleReader::endRootMessage() +{ + UInt64 message_end = parent_message_ends.empty() ? 
current_message_end : parent_message_ends.front(); + if (message_end != REACHED_END) + { + if (cursor < message_end) + ignore(message_end - cursor); + else if (cursor > message_end) + moveCursorBackward(cursor - message_end); + } + parent_message_ends.clear(); + current_message_end = REACHED_END; + field_end = REACHED_END; +} + +bool ProtobufReader::SimpleReader::readFieldNumber(UInt32 & field_number) +{ + if (field_end != REACHED_END) + { + if (field_end == END_OF_VARINT) + ignoreVarint(); + else if (field_end == END_OF_GROUP) + ignoreGroup(); + else if (cursor < field_end) + ignore(field_end - cursor); + field_end = REACHED_END; + } + + if (cursor >= current_message_end) + { + current_message_end = REACHED_END; + return false; + } + + UInt64 varint = readVarint(); + if (varint & (static_cast(0xFFFFFFFF) << 32)) + unknownFormat(); + UInt32 key = static_cast(varint); + field_number = (key >> 3); + WireType wire_type = static_cast(key & 0x07); + switch (wire_type) + { + case BITS64: + { + field_end = cursor + 8; + return true; + } + case LENGTH_DELIMITED: + { + size_t length = readVarint(); + field_end = cursor + length; + return true; + } + case VARINT: + { + field_end = END_OF_VARINT; + return true; + } + case GROUP_START: + { + field_end = END_OF_GROUP; + return true; + } + case GROUP_END: + { + if (current_message_end != END_OF_GROUP) + unknownFormat(); + current_message_end = REACHED_END; + return false; + } + case BITS32: + { + field_end = cursor + 4; + return true; + } + } + unknownFormat(); + __builtin_unreachable(); +} + +bool ProtobufReader::SimpleReader::readUInt(UInt64 & value) +{ + if (cursor >= field_end) + { + field_end = REACHED_END; + return false; + } + value = readVarint(); + if ((field_end == END_OF_VARINT) || (cursor >= field_end)) + field_end = REACHED_END; + return true; +} + +bool ProtobufReader::SimpleReader::readInt(Int64 & value) +{ + UInt64 varint; + if (!readUInt(varint)) + return false; + value = static_cast(varint); + return true; +} + 
+bool ProtobufReader::SimpleReader::readSInt(Int64 & value) +{ + UInt64 varint; + if (!readUInt(varint)) + return false; + value = decodeZigZag(varint); + return true; +} + +template +bool ProtobufReader::SimpleReader::readFixed(T & value) +{ + if (cursor >= field_end) + { + field_end = REACHED_END; + return false; + } + readBinary(&value, sizeof(T)); + if (cursor >= field_end) + field_end = REACHED_END; + return true; +} + +bool ProtobufReader::SimpleReader::readStringInto(PaddedPODArray & str) +{ + if (cursor > field_end) + return false; + size_t length = field_end - cursor; + size_t old_size = str.size(); + str.resize(old_size + length); + readBinary(reinterpret_cast(str.data() + old_size), length); + field_end = REACHED_END; + return true; +} + +void ProtobufReader::SimpleReader::readBinary(void* data, size_t size) +{ + in.readStrict(reinterpret_cast(data), size); + cursor += size; +} + +void ProtobufReader::SimpleReader::ignore(UInt64 num_bytes) +{ + in.ignore(num_bytes); + cursor += num_bytes; +} + +void ProtobufReader::SimpleReader::moveCursorBackward(UInt64 num_bytes) +{ + if (in.offset() < num_bytes) + unknownFormat(); + in.position() -= num_bytes; + cursor -= num_bytes; +} + +UInt64 ProtobufReader::SimpleReader::readVarint() +{ + char c; + UInt64 result = 0; + +#define PROTOBUF_READER_VARINT_READ_HELPER(i) \ + in.readStrict(&c, 1); \ + result |= static_cast(c) << (7 * i); \ + if constexpr (i < 9) \ + { \ + if (!(c & 0x80)) \ + { \ + cursor += i + 1; \ + return result; \ + } \ + if constexpr (i < 8) \ + result &= ((static_cast(0x80) << (7 * i)) - 1); \ + } \ + else \ + { \ + if (c == 1) \ + { \ + cursor += i + 1; \ + return result; \ + } \ + } + PROTOBUF_READER_VARINT_READ_HELPER(0); + PROTOBUF_READER_VARINT_READ_HELPER(1); + PROTOBUF_READER_VARINT_READ_HELPER(2); + PROTOBUF_READER_VARINT_READ_HELPER(3); + PROTOBUF_READER_VARINT_READ_HELPER(4); + PROTOBUF_READER_VARINT_READ_HELPER(5); + PROTOBUF_READER_VARINT_READ_HELPER(6); + 
PROTOBUF_READER_VARINT_READ_HELPER(7); + PROTOBUF_READER_VARINT_READ_HELPER(8); + PROTOBUF_READER_VARINT_READ_HELPER(9); +#undef PROTOBUF_READER_VARINT_READ_HELPER + + unknownFormat(); + return 0; +} + +void ProtobufReader::SimpleReader::ignoreVarint() +{ + char c; + +#define PROTOBUF_READER_VARINT_IGNORE_HELPER(i) \ + in.readStrict(&c, 1); \ + if constexpr (i < 9) \ + { \ + if (!(c & 0x80)) \ + { \ + cursor += i + 1; \ + return; \ + } \ + } \ + else \ + { \ + if (c == 1) \ + { \ + cursor += i + 1; \ + return; \ + } \ + } + PROTOBUF_READER_VARINT_IGNORE_HELPER(0); + PROTOBUF_READER_VARINT_IGNORE_HELPER(1); + PROTOBUF_READER_VARINT_IGNORE_HELPER(2); + PROTOBUF_READER_VARINT_IGNORE_HELPER(3); + PROTOBUF_READER_VARINT_IGNORE_HELPER(4); + PROTOBUF_READER_VARINT_IGNORE_HELPER(5); + PROTOBUF_READER_VARINT_IGNORE_HELPER(6); + PROTOBUF_READER_VARINT_IGNORE_HELPER(7); + PROTOBUF_READER_VARINT_IGNORE_HELPER(8); + PROTOBUF_READER_VARINT_IGNORE_HELPER(9); +#undef PROTOBUF_READER_VARINT_IGNORE_HELPER + + unknownFormat(); +} + +void ProtobufReader::SimpleReader::ignoreGroup() +{ + size_t level = 1; + while (true) + { + UInt64 varint = readVarint(); + WireType wire_type = static_cast(varint & 0x07); + switch (wire_type) + { + case VARINT: + { + ignoreVarint(); + break; + } + case BITS64: + { + ignore(8); + break; + } + case LENGTH_DELIMITED: + { + ignore(readVarint()); + break; + } + case GROUP_START: + { + ++level; + break; + } + case GROUP_END: + { + if (!--level) + return; + break; + } + case BITS32: + { + ignore(4); + break; + } + } + unknownFormat(); + } +} + + +class ProtobufReader::ConverterBaseImpl : public ProtobufReader::IConverter +{ +public: + ConverterBaseImpl(SimpleReader & simple_reader_, const google::protobuf::FieldDescriptor * field_) + : simple_reader(simple_reader_), field(field_) {} + + bool readStringInto(PaddedPODArray &) override + { + cannotConvertType("String"); + return false; + } + + bool readInt8(Int8 &) override + { + cannotConvertType("Int8"); + 
return false; + } + + bool readUInt8(UInt8 &) override + { + cannotConvertType("UInt8"); + return false; + } + + bool readInt16(Int16 &) override + { + cannotConvertType("Int16"); + return false; + } + + bool readUInt16(UInt16 &) override + { + cannotConvertType("UInt16"); + return false; + } + + bool readInt32(Int32 &) override + { + cannotConvertType("Int32"); + return false; + } + + bool readUInt32(UInt32 &) override + { + cannotConvertType("UInt32"); + return false; + } + + bool readInt64(Int64 &) override + { + cannotConvertType("Int64"); + return false; + } + + bool readUInt64(UInt64 &) override + { + cannotConvertType("UInt64"); + return false; + } + + bool readUInt128(UInt128 &) override + { + cannotConvertType("UInt128"); + return false; + } + + bool readFloat32(Float32 &) override + { + cannotConvertType("Float32"); + return false; + } + + bool readFloat64(Float64 &) override + { + cannotConvertType("Float64"); + return false; + } + + void prepareEnumMapping8(const std::vector> &) override {} + void prepareEnumMapping16(const std::vector> &) override {} + + bool readEnum8(Int8 &) override + { + cannotConvertType("Enum"); + return false; + } + + bool readEnum16(Int16 &) override + { + cannotConvertType("Enum"); + return false; + } + + bool readUUID(UUID &) override + { + cannotConvertType("UUID"); + return false; + } + + bool readDate(DayNum &) override + { + cannotConvertType("Date"); + return false; + } + + bool readDateTime(time_t &) override + { + cannotConvertType("DateTime"); + return false; + } + + bool readDecimal32(Decimal32 &, UInt32, UInt32) override + { + cannotConvertType("Decimal32"); + return false; + } + + bool readDecimal64(Decimal64 &, UInt32, UInt32) override + { + cannotConvertType("Decimal64"); + return false; + } + + bool readDecimal128(Decimal128 &, UInt32, UInt32) override + { + cannotConvertType("Decimal128"); + return false; + } + + bool readAggregateFunction(const AggregateFunctionPtr &, AggregateDataPtr, Arena &) override + { + 
cannotConvertType("AggregateFunction"); + return false; + } + +protected: + void cannotConvertType(const String & type_name) + { + throw Exception( + String("Could not convert type '") + field->type_name() + "' from protobuf field '" + field->name() + "' to data type '" + + type_name + "'", + ErrorCodes::PROTOBUF_BAD_CAST); + } + + void cannotConvertValue(const String & value, const String & type_name) + { + throw Exception( + "Could not convert value '" + value + "' from protobuf field '" + field->name() + "' to data type '" + type_name + "'", + ErrorCodes::PROTOBUF_BAD_CAST); + } + + template + To numericCast(From value) + { + if constexpr (std::is_same_v) + return value; + To result; + try + { + result = boost::numeric_cast(value); + } + catch (boost::numeric::bad_numeric_cast &) + { + cannotConvertValue(toString(value), TypeName::get()); + } + return result; + } + + template + To parseFromString(const PaddedPODArray & str) + { + try + { + To result; + ReadBufferFromString buf(str); + readText(result, buf); + return result; + } + catch (...) 
+ { + cannotConvertValue(StringRef(str.data(), str.size()).toString(), TypeName::get()); + __builtin_unreachable(); + } + } + + SimpleReader & simple_reader; + const google::protobuf::FieldDescriptor * field; +}; + + + +class ProtobufReader::ConverterFromString : public ConverterBaseImpl +{ +public: + using ConverterBaseImpl::ConverterBaseImpl; + + bool readStringInto(PaddedPODArray & str) override { return simple_reader.readStringInto(str); } + + bool readInt8(Int8 & value) override { return readNumeric(value); } + bool readUInt8(UInt8 & value) override { return readNumeric(value); } + bool readInt16(Int16 & value) override { return readNumeric(value); } + bool readUInt16(UInt16 & value) override { return readNumeric(value); } + bool readInt32(Int32 & value) override { return readNumeric(value); } + bool readUInt32(UInt32 & value) override { return readNumeric(value); } + bool readInt64(Int64 & value) override { return readNumeric(value); } + bool readUInt64(UInt64 & value) override { return readNumeric(value); } + bool readFloat32(Float32 & value) override { return readNumeric(value); } + bool readFloat64(Float64 & value) override { return readNumeric(value); } + + void prepareEnumMapping8(const std::vector> & name_value_pairs) override + { + prepareEnumNameToValueMap(name_value_pairs); + } + void prepareEnumMapping16(const std::vector> & name_value_pairs) override + { + prepareEnumNameToValueMap(name_value_pairs); + } + + bool readEnum8(Int8 & value) override { return readEnum(value); } + bool readEnum16(Int16 & value) override { return readEnum(value); } + + bool readUUID(UUID & uuid) override + { + if (!readTempString()) + return false; + ReadBufferFromString buf(temp_string); + readUUIDText(uuid, buf); + return true; + } + + bool readDate(DayNum & date) override + { + if (!readTempString()) + return false; + ReadBufferFromString buf(temp_string); + readDateText(date, buf); + return true; + } + + bool readDateTime(time_t & tm) override + { + if 
(!readTempString()) + return false; + ReadBufferFromString buf(temp_string); + readDateTimeText(tm, buf); + return true; + } + + bool readDecimal32(Decimal32 & decimal, UInt32 precision, UInt32 scale) override { return readDecimal(decimal, precision, scale); } + bool readDecimal64(Decimal64 & decimal, UInt32 precision, UInt32 scale) override { return readDecimal(decimal, precision, scale); } + bool readDecimal128(Decimal128 & decimal, UInt32 precision, UInt32 scale) override { return readDecimal(decimal, precision, scale); } + + bool readAggregateFunction(const AggregateFunctionPtr & function, AggregateDataPtr place, Arena & arena) override + { + if (!readTempString()) + return false; + ReadBufferFromString buf(temp_string); + function->deserialize(place, buf, &arena); + return true; + } + +private: + bool readTempString() + { + temp_string.clear(); + return simple_reader.readStringInto(temp_string); + } + + template + bool readNumeric(T & value) + { + if (!readTempString()) + return false; + value = parseFromString(temp_string); + return true; + } + + template + bool readEnum(T & value) + { + if (!readTempString()) + return false; + StringRef strref = StringRef(temp_string.data(), temp_string.size()); + auto it = enum_name_to_value_map->find(strref); + if (it == enum_name_to_value_map->end()) + cannotConvertValue(strref.toString(), "Enum"); + value = static_cast(it->second); + return true; + } + + template + bool readDecimal(Decimal & decimal, UInt32 precision, UInt32 scale) + { + if (!readTempString()) + return false; + ReadBufferFromString buf(temp_string); + DataTypeDecimal>::readText(decimal, buf, precision, scale); + return true; + } + + template + void prepareEnumNameToValueMap(const std::vector> & name_value_pairs) + { + if (enum_name_to_value_map.has_value()) + return; + enum_name_to_value_map.emplace(); + for (const auto & name_value_pair : name_value_pairs) + enum_name_to_value_map->emplace(name_value_pair.first, name_value_pair.second); + } + + 
PaddedPODArray temp_string; + std::optional> enum_name_to_value_map; +}; + +#define PROTOBUF_READER_CONVERTER_IMPL_SPECIALIZATION_FOR_STRINGS(field_type_id) \ + template<> \ + class ProtobufReader::ConverterImpl : public ConverterFromString \ + { \ + using ConverterFromString::ConverterFromString; \ + } +PROTOBUF_READER_CONVERTER_IMPL_SPECIALIZATION_FOR_STRINGS(google::protobuf::FieldDescriptor::TYPE_STRING); +PROTOBUF_READER_CONVERTER_IMPL_SPECIALIZATION_FOR_STRINGS(google::protobuf::FieldDescriptor::TYPE_BYTES); +#undef PROTOBUF_READER_CONVERTER_IMPL_SPECIALIZATION_FOR_STRINGS + + + +template +class ProtobufReader::ConverterFromNumber : public ConverterBaseImpl +{ +public: + using ConverterBaseImpl::ConverterBaseImpl; + + bool readStringInto(PaddedPODArray & str) override + { + T number; + if (!readField(number)) + return false; + WriteBufferFromVector> buf(str); + writeText(number, buf); + return true; + } + + bool readInt8(Int8 & value) override { return readNumeric(value); } + bool readUInt8(UInt8 & value) override { return readNumeric(value); } + bool readInt16(Int16 & value) override { return readNumeric(value); } + bool readUInt16(UInt16 & value) override { return readNumeric(value); } + bool readInt32(Int32 & value) override { return readNumeric(value); } + bool readUInt32(UInt32 & value) override { return readNumeric(value); } + bool readInt64(Int64 & value) override { return readNumeric(value); } + bool readUInt64(UInt64 & value) override { return readNumeric(value); } + bool readFloat32(Float32 & value) override { return readNumeric(value); } + bool readFloat64(Float64 & value) override { return readNumeric(value); } + + bool readEnum8(Int8 & value) override { return readEnum(value); } + bool readEnum16(Int16 & value) override { return readEnum(value); } + + void prepareEnumMapping8(const std::vector> & name_value_pairs) override + { + prepareSetOfEnumValues(name_value_pairs); + } + void prepareEnumMapping16(const std::vector> & name_value_pairs) 
override + { + prepareSetOfEnumValues(name_value_pairs); + } + + bool readDate(DayNum & date) override + { + UInt16 number; + if (!readNumeric(number)) + return false; + date = DayNum(number); + return true; + } + + bool readDateTime(time_t & tm) override + { + UInt32 number; + if (!readNumeric(number)) + return false; + tm = number; + return true; + } + + bool readDecimal32(Decimal32 & decimal, UInt32, UInt32 scale) override { return readDecimal(decimal, scale); } + bool readDecimal64(Decimal64 & decimal, UInt32, UInt32 scale) override { return readDecimal(decimal, scale); } + bool readDecimal128(Decimal128 & decimal, UInt32, UInt32 scale) override { return readDecimal(decimal, scale); } + +private: + template + bool readNumeric(To & value) + { + T number; + if (!readField(number)) + return false; + value = numericCast(number); + return true; + } + + template + bool readEnum(EnumType & value) + { + if constexpr (!std::is_integral_v) + cannotConvertType("Enum"); // It's not correct to convert floating point to enum. 
+ T number; + if (!readField(number)) + return false; + value = numericCast(number); + if (set_of_enum_values->find(value) == set_of_enum_values->end()) + cannotConvertValue(toString(value), "Enum"); + return true; + } + + template + void prepareSetOfEnumValues(const std::vector> & name_value_pairs) + { + if (set_of_enum_values.has_value()) + return; + set_of_enum_values.emplace(); + for (const auto & name_value_pair : name_value_pairs) + set_of_enum_values->emplace(name_value_pair.second); + } + + template + bool readDecimal(Decimal & decimal, UInt32 scale) + { + T number; + if (!readField(number)) + return false; + decimal.value = convertToDecimal, DataTypeDecimal>>(number, scale); + return true; + } + + bool readField(T & value) + { + if constexpr ((field_type_id == google::protobuf::FieldDescriptor::TYPE_INT32) && std::is_same_v) + return simple_reader.readInt(value); + else if constexpr ((field_type_id == google::protobuf::FieldDescriptor::TYPE_SINT32) && std::is_same_v) + return simple_reader.readSInt(value); + else if constexpr ((field_type_id == google::protobuf::FieldDescriptor::TYPE_UINT32) && std::is_same_v) + return simple_reader.readUInt(value); + else if constexpr ((field_type_id == google::protobuf::FieldDescriptor::TYPE_INT64) && std::is_same_v) + return simple_reader.readInt(value); + else if constexpr ((field_type_id == google::protobuf::FieldDescriptor::TYPE_SINT64) && std::is_same_v) + return simple_reader.readSInt(value); + else if constexpr ((field_type_id == google::protobuf::FieldDescriptor::TYPE_UINT64) && std::is_same_v) + return simple_reader.readUInt(value); + else if constexpr ((field_type_id == google::protobuf::FieldDescriptor::TYPE_FIXED32) && std::is_same_v) + return simple_reader.readFixed(value); + else if constexpr ((field_type_id == google::protobuf::FieldDescriptor::TYPE_SFIXED32) && std::is_same_v) + return simple_reader.readFixed(value); + else if constexpr ((field_type_id == google::protobuf::FieldDescriptor::TYPE_FIXED64) 
&& std::is_same_v) + return simple_reader.readFixed(value); + else if constexpr ((field_type_id == google::protobuf::FieldDescriptor::TYPE_SFIXED64) && std::is_same_v) + return simple_reader.readFixed(value); + else if constexpr ((field_type_id == google::protobuf::FieldDescriptor::TYPE_FLOAT) && std::is_same_v) + return simple_reader.readFixed(value); + else + { + static_assert((field_type_id == google::protobuf::FieldDescriptor::TYPE_DOUBLE) && std::is_same_v); + return simple_reader.readFixed(value); + } + } + + std::optional> set_of_enum_values; +}; + +#define PROTOBUF_READER_CONVERTER_IMPL_SPECIALIZATION_FOR_NUMBERS(field_type_id, field_type) \ + template<> \ + class ProtobufReader::ConverterImpl : public ConverterFromNumber \ + { \ + using ConverterFromNumber::ConverterFromNumber; \ + } +PROTOBUF_READER_CONVERTER_IMPL_SPECIALIZATION_FOR_NUMBERS(google::protobuf::FieldDescriptor::TYPE_INT32, Int64); +PROTOBUF_READER_CONVERTER_IMPL_SPECIALIZATION_FOR_NUMBERS(google::protobuf::FieldDescriptor::TYPE_SINT32, Int64); +PROTOBUF_READER_CONVERTER_IMPL_SPECIALIZATION_FOR_NUMBERS(google::protobuf::FieldDescriptor::TYPE_UINT32, UInt64); +PROTOBUF_READER_CONVERTER_IMPL_SPECIALIZATION_FOR_NUMBERS(google::protobuf::FieldDescriptor::TYPE_INT64, Int64); +PROTOBUF_READER_CONVERTER_IMPL_SPECIALIZATION_FOR_NUMBERS(google::protobuf::FieldDescriptor::TYPE_SINT64, Int64); +PROTOBUF_READER_CONVERTER_IMPL_SPECIALIZATION_FOR_NUMBERS(google::protobuf::FieldDescriptor::TYPE_UINT64, UInt64); +PROTOBUF_READER_CONVERTER_IMPL_SPECIALIZATION_FOR_NUMBERS(google::protobuf::FieldDescriptor::TYPE_FIXED32, UInt32); +PROTOBUF_READER_CONVERTER_IMPL_SPECIALIZATION_FOR_NUMBERS(google::protobuf::FieldDescriptor::TYPE_SFIXED32, Int32); +PROTOBUF_READER_CONVERTER_IMPL_SPECIALIZATION_FOR_NUMBERS(google::protobuf::FieldDescriptor::TYPE_FIXED64, UInt64); +PROTOBUF_READER_CONVERTER_IMPL_SPECIALIZATION_FOR_NUMBERS(google::protobuf::FieldDescriptor::TYPE_SFIXED64, Int64); 
+PROTOBUF_READER_CONVERTER_IMPL_SPECIALIZATION_FOR_NUMBERS(google::protobuf::FieldDescriptor::TYPE_FLOAT, float); +PROTOBUF_READER_CONVERTER_IMPL_SPECIALIZATION_FOR_NUMBERS(google::protobuf::FieldDescriptor::TYPE_DOUBLE, double); +#undef PROTOBUF_READER_CONVERTER_IMPL_SPECIALIZATION_FOR_NUMBERS + + + +template<> +class ProtobufReader::ConverterImpl : public ConverterBaseImpl +{ +public: + using ConverterBaseImpl::ConverterBaseImpl; + + bool readStringInto(PaddedPODArray & str) override + { + bool b; + if (!readField(b)) + return false; + WriteBufferFromVector> buf(str); + writeString(StringRef(b ? "true" : "false"), buf); + return true; + } + + bool readInt8(Int8 & value) override { return readNumeric(value); } + bool readUInt8(UInt8 & value) override { return readNumeric(value); } + bool readInt16(Int16 & value) override { return readNumeric(value); } + bool readUInt16(UInt16 & value) override { return readNumeric(value); } + bool readInt32(Int32 & value) override { return readNumeric(value); } + bool readUInt32(UInt32 & value) override { return readNumeric(value); } + bool readInt64(Int64 & value) override { return readNumeric(value); } + bool readUInt64(UInt64 & value) override { return readNumeric(value); } + bool readFloat32(Float32 & value) override { return readNumeric(value); } + bool readFloat64(Float64 & value) override { return readNumeric(value); } + bool readDecimal32(Decimal32 & decimal, UInt32, UInt32) override { return readNumeric(decimal.value); } + bool readDecimal64(Decimal64 & decimal, UInt32, UInt32) override { return readNumeric(decimal.value); } + bool readDecimal128(Decimal128 & decimal, UInt32, UInt32) override { return readNumeric(decimal.value); } + +private: + template + bool readNumeric(T & value) + { + bool b; + if (!readField(b)) + return false; + value = b ? 
1 : 0; + return true; + } + + bool readField(bool & b) + { + UInt64 number; + if (!simple_reader.readUInt(number)) + return false; + b = static_cast(number); + return true; + } +}; + + + +template<> +class ProtobufReader::ConverterImpl : public ConverterBaseImpl +{ +public: + using ConverterBaseImpl::ConverterBaseImpl; + + bool readStringInto(PaddedPODArray & str) override + { + prepareEnumPbNumberToNameMap(); + Int64 pbnumber; + if (!readField(pbnumber)) + return false; + auto it = enum_pbnumber_to_name_map->find(pbnumber); + if (it == enum_pbnumber_to_name_map->end()) + cannotConvertValue(toString(pbnumber), "Enum"); + + WriteBufferFromVector> buf(str); + writeString(it->second, buf); + return true; + } + + bool readInt8(Int8 & value) override { return readNumeric(value); } + bool readUInt8(UInt8 & value) override { return readNumeric(value); } + bool readInt16(Int16 & value) override { return readNumeric(value); } + bool readUInt16(UInt16 & value) override { return readNumeric(value); } + bool readInt32(Int32 & value) override { return readNumeric(value); } + bool readUInt32(UInt32 & value) override { return readNumeric(value); } + bool readInt64(Int64 & value) override { return readNumeric(value); } + bool readUInt64(UInt64 & value) override { return readNumeric(value); } + + void prepareEnumMapping8(const std::vector> & name_value_pairs) override + { + prepareEnumPbNumberToValueMap(name_value_pairs); + } + void prepareEnumMapping16(const std::vector> & name_value_pairs) override + { + prepareEnumPbNumberToValueMap(name_value_pairs); + } + + bool readEnum8(Int8 & value) override { return readEnum(value); } + bool readEnum16(Int16 & value) override { return readEnum(value); } + +private: + template + bool readNumeric(T & value) + { + Int64 pbnumber; + if (!readField(pbnumber)) + return false; + value = numericCast(pbnumber); + return true; + } + + template + bool readEnum(T & value) + { + Int64 pbnumber; + if (!readField(pbnumber)) + return false; + auto it = 
enum_pbnumber_to_value_map->find(pbnumber); + if (it == enum_pbnumber_to_value_map->end()) + cannotConvertValue(toString(pbnumber), "Enum"); + value = static_cast(it->second); + return true; + } + + void prepareEnumPbNumberToNameMap() + { + if (enum_pbnumber_to_name_map.has_value()) + return; + enum_pbnumber_to_name_map.emplace(); + const auto * enum_type = field->enum_type(); + for (int i = 0; i != enum_type->value_count(); ++i) + { + const auto * enum_value = enum_type->value(i); + enum_pbnumber_to_name_map->emplace(enum_value->number(), enum_value->name()); + } + } + + template + void prepareEnumPbNumberToValueMap(const std::vector> & name_value_pairs) + { + if (enum_pbnumber_to_value_map.has_value()) + return; + enum_pbnumber_to_value_map.emplace(); + for (const auto & name_value_pair : name_value_pairs) + { + Int16 value = name_value_pair.second; + const auto * enum_descriptor = field->enum_type()->FindValueByName(name_value_pair.first); + if (enum_descriptor) + enum_pbnumber_to_value_map->emplace(enum_descriptor->number(), value); + } + } + + bool readField(Int64 & enum_pbnumber) + { + return simple_reader.readInt(enum_pbnumber); + } + + std::optional> enum_pbnumber_to_name_map; + std::optional> enum_pbnumber_to_value_map; +}; + + +ProtobufReader::ProtobufReader( + ReadBuffer & in_, const google::protobuf::Descriptor * message_type, const std::vector & column_names) + : simple_reader(in_) +{ + root_message = ProtobufColumnMatcher::matchColumns(column_names, message_type); + setTraitsDataAfterMatchingColumns(root_message.get()); +} + +ProtobufReader::~ProtobufReader() = default; + +void ProtobufReader::setTraitsDataAfterMatchingColumns(Message * message) +{ + for (Field & field : message->fields) + { + if (field.nested_message) + { + setTraitsDataAfterMatchingColumns(field.nested_message.get()); + continue; + } + switch (field.field_descriptor->type()) + { +#define PROTOBUF_READER_CONVERTER_CREATING_CASE(field_type_id) \ + case field_type_id: \ + 
field.data.converter = std::make_unique>(simple_reader, field.field_descriptor); \ + break + PROTOBUF_READER_CONVERTER_CREATING_CASE(google::protobuf::FieldDescriptor::TYPE_STRING); + PROTOBUF_READER_CONVERTER_CREATING_CASE(google::protobuf::FieldDescriptor::TYPE_BYTES); + PROTOBUF_READER_CONVERTER_CREATING_CASE(google::protobuf::FieldDescriptor::TYPE_INT32); + PROTOBUF_READER_CONVERTER_CREATING_CASE(google::protobuf::FieldDescriptor::TYPE_SINT32); + PROTOBUF_READER_CONVERTER_CREATING_CASE(google::protobuf::FieldDescriptor::TYPE_UINT32); + PROTOBUF_READER_CONVERTER_CREATING_CASE(google::protobuf::FieldDescriptor::TYPE_FIXED32); + PROTOBUF_READER_CONVERTER_CREATING_CASE(google::protobuf::FieldDescriptor::TYPE_SFIXED32); + PROTOBUF_READER_CONVERTER_CREATING_CASE(google::protobuf::FieldDescriptor::TYPE_INT64); + PROTOBUF_READER_CONVERTER_CREATING_CASE(google::protobuf::FieldDescriptor::TYPE_SINT64); + PROTOBUF_READER_CONVERTER_CREATING_CASE(google::protobuf::FieldDescriptor::TYPE_UINT64); + PROTOBUF_READER_CONVERTER_CREATING_CASE(google::protobuf::FieldDescriptor::TYPE_FIXED64); + PROTOBUF_READER_CONVERTER_CREATING_CASE(google::protobuf::FieldDescriptor::TYPE_SFIXED64); + PROTOBUF_READER_CONVERTER_CREATING_CASE(google::protobuf::FieldDescriptor::TYPE_FLOAT); + PROTOBUF_READER_CONVERTER_CREATING_CASE(google::protobuf::FieldDescriptor::TYPE_DOUBLE); + PROTOBUF_READER_CONVERTER_CREATING_CASE(google::protobuf::FieldDescriptor::TYPE_BOOL); + PROTOBUF_READER_CONVERTER_CREATING_CASE(google::protobuf::FieldDescriptor::TYPE_ENUM); +#undef PROTOBUF_READER_CONVERTER_CREATING_CASE + default: __builtin_unreachable(); + } + message->data.field_number_to_field_map.emplace(field.field_number, &field); + } +} + +bool ProtobufReader::startMessage() +{ + if (!simple_reader.startMessage()) + return false; + current_message = root_message.get(); + current_field_index = 0; + return true; +} + +void ProtobufReader::endMessage() +{ + simple_reader.endRootMessage(); + current_message = 
nullptr; + current_converter = nullptr; +} + +bool ProtobufReader::readColumnIndex(size_t & column_index) +{ + while (true) + { + UInt32 field_number; + if (!simple_reader.readFieldNumber(field_number)) + { + if (!current_message->parent) + { + current_converter = nullptr; + return false; + } + simple_reader.endMessage(); + current_field_index = current_message->index_in_parent; + current_message = current_message->parent; + continue; + } + + const Field * field = nullptr; + for (; current_field_index < current_message->fields.size(); ++current_field_index) + { + const Field & f = current_message->fields[current_field_index]; + if (f.field_number == field_number) + { + field = &f; + break; + } + if (f.field_number > field_number) + break; + } + + if (!field) + { + const auto & field_number_to_field_map = current_message->data.field_number_to_field_map; + auto it = field_number_to_field_map.find(field_number); + if (it == field_number_to_field_map.end()) + continue; + field = it->second; + } + + if (field->nested_message) + { + simple_reader.startMessage(); + current_message = field->nested_message.get(); + current_field_index = 0; + continue; + } + + column_index = field->column_index; + current_converter = field->data.converter.get(); + return true; + } +} + +} +#endif diff --git a/dbms/src/Formats/ProtobufReader.h b/dbms/src/Formats/ProtobufReader.h new file mode 100644 index 00000000000..751daf92f57 --- /dev/null +++ b/dbms/src/Formats/ProtobufReader.h @@ -0,0 +1,220 @@ +#pragma once + +#include +#include +#include +#include + +#include +#if USE_PROTOBUF + +#include +#include +#include + +namespace google +{ +namespace protobuf +{ + class Descriptor; +} +} + +namespace DB +{ +class Arena; +class IAggregateFunction; +class ReadBuffer; +using AggregateDataPtr = char *; +using AggregateFunctionPtr = std::shared_ptr; + + +/** Deserializes a protobuf, tries to cast data types if necessary. 
+ */ +class ProtobufReader : private boost::noncopyable +{ +public: + ProtobufReader(ReadBuffer & in_, const google::protobuf::Descriptor * message_type, const std::vector & column_names); + ~ProtobufReader(); + + /// Should be called when we start reading a new message. + bool startMessage(); + + /// Ends reading a message. + void endMessage(); + + /// Reads the column index. + /// The function returns false if there are no more columns to read (call endMessage() in this case). + bool readColumnIndex(size_t & column_index); + + /// Reads a value which should be put to column at index received with readColumnIndex(). + /// The function returns false if there are no more values to read now (call readColumnIndex() in this case). + bool readNumber(Int8 & value) { return current_converter->readInt8(value); } + bool readNumber(UInt8 & value) { return current_converter->readUInt8(value); } + bool readNumber(Int16 & value) { return current_converter->readInt16(value); } + bool readNumber(UInt16 & value) { return current_converter->readUInt16(value); } + bool readNumber(Int32 & value) { return current_converter->readInt32(value); } + bool readNumber(UInt32 & value) { return current_converter->readUInt32(value); } + bool readNumber(Int64 & value) { return current_converter->readInt64(value); } + bool readNumber(UInt64 & value) { return current_converter->readUInt64(value); } + bool readNumber(UInt128 & value) { return current_converter->readUInt128(value); } + bool readNumber(Float32 & value) { return current_converter->readFloat32(value); } + bool readNumber(Float64 & value) { return current_converter->readFloat64(value); } + + bool readStringInto(PaddedPODArray & str) { return current_converter->readStringInto(str); } + + void prepareEnumMapping(const std::vector> & name_value_pairs) { current_converter->prepareEnumMapping8(name_value_pairs); } + void prepareEnumMapping(const std::vector> & name_value_pairs) { current_converter->prepareEnumMapping16(name_value_pairs); } + 
bool readEnum(Int8 & value) { return current_converter->readEnum8(value); } + bool readEnum(Int16 & value) { return current_converter->readEnum16(value); } + + bool readUUID(UUID & uuid) { return current_converter->readUUID(uuid); } + bool readDate(DayNum & date) { return current_converter->readDate(date); } + bool readDateTime(time_t & tm) { return current_converter->readDateTime(tm); } + + bool readDecimal(Decimal32 & decimal, UInt32 precision, UInt32 scale) { return current_converter->readDecimal32(decimal, precision, scale); } + bool readDecimal(Decimal64 & decimal, UInt32 precision, UInt32 scale) { return current_converter->readDecimal64(decimal, precision, scale); } + bool readDecimal(Decimal128 & decimal, UInt32 precision, UInt32 scale) { return current_converter->readDecimal128(decimal, precision, scale); } + + bool readAggregateFunction(const AggregateFunctionPtr & function, AggregateDataPtr place, Arena & arena) { return current_converter->readAggregateFunction(function, place, arena); } + + /// When it returns false there are no more values left and from now on all the read() functions will return false + /// until readColumnIndex() is called. When it returns true it's unclear whether more values are left. 
+ bool maybeCanReadValue() const { return simple_reader.maybeCanReadValue(); } + +private: + class SimpleReader + { + public: + SimpleReader(ReadBuffer & in_); + bool startMessage(); + void endMessage(); + void endRootMessage(); + bool readFieldNumber(UInt32 & field_number); + bool readInt(Int64 & value); + bool readSInt(Int64 & value); + bool readUInt(UInt64 & value); + template bool readFixed(T & value); + bool readStringInto(PaddedPODArray & str); + bool maybeCanReadValue() const { return field_end != REACHED_END; } + + private: + void readBinary(void* data, size_t size); + void ignore(UInt64 num_bytes); + void moveCursorBackward(UInt64 num_bytes); + UInt64 readVarint(); + void ignoreVarint(); + void ignoreGroup(); + + static constexpr UInt64 REACHED_END = 0; + + ReadBuffer & in; + UInt64 cursor; + std::vector parent_message_ends; + UInt64 current_message_end; + UInt64 field_end; + }; + + class IConverter + { + public: + virtual ~IConverter() = default; + virtual bool readStringInto(PaddedPODArray &) = 0; + virtual bool readInt8(Int8&) = 0; + virtual bool readUInt8(UInt8 &) = 0; + virtual bool readInt16(Int16 &) = 0; + virtual bool readUInt16(UInt16 &) = 0; + virtual bool readInt32(Int32 &) = 0; + virtual bool readUInt32(UInt32 &) = 0; + virtual bool readInt64(Int64 &) = 0; + virtual bool readUInt64(UInt64 &) = 0; + virtual bool readUInt128(UInt128 &) = 0; + virtual bool readFloat32(Float32 &) = 0; + virtual bool readFloat64(Float64 &) = 0; + virtual void prepareEnumMapping8(const std::vector> &) = 0; + virtual void prepareEnumMapping16(const std::vector> &) = 0; + virtual bool readEnum8(Int8 &) = 0; + virtual bool readEnum16(Int16 &) = 0; + virtual bool readUUID(UUID &) = 0; + virtual bool readDate(DayNum &) = 0; + virtual bool readDateTime(time_t &) = 0; + virtual bool readDecimal32(Decimal32 &, UInt32, UInt32) = 0; + virtual bool readDecimal64(Decimal64 &, UInt32, UInt32) = 0; + virtual bool readDecimal128(Decimal128 &, UInt32, UInt32) = 0; + virtual bool 
readAggregateFunction(const AggregateFunctionPtr &, AggregateDataPtr, Arena &) = 0; + }; + + class ConverterBaseImpl; + template class ConverterImpl; + class ConverterFromString; + template class ConverterFromNumber; + + struct ColumnMatcherTraits + { + struct FieldData + { + std::unique_ptr converter; + }; + struct MessageData + { + std::unordered_map*> field_number_to_field_map; + }; + }; + using Message = ProtobufColumnMatcher::Message; + using Field = ProtobufColumnMatcher::Field; + + void setTraitsDataAfterMatchingColumns(Message * message); + + SimpleReader simple_reader; + std::unique_ptr root_message; + Message* current_message = nullptr; + size_t current_field_index = 0; + IConverter* current_converter = nullptr; +}; + +} + +#else + +namespace DB +{ +class Arena; +class IAggregateFunction; +class ReadBuffer; +using AggregateDataPtr = char *; +using AggregateFunctionPtr = std::shared_ptr; + +class ProtobufReader +{ +public: + bool startMessage() { return false; } + void endMessage() {} + bool readColumnIndex(size_t & column_index) { return false; } + bool readNumber(Int8 & value) { return false; } + bool readNumber(UInt8 & value) { return false; } + bool readNumber(Int16 & value) { return false; } + bool readNumber(UInt16 & value) { return false; } + bool readNumber(Int32 & value) { return false; } + bool readNumber(UInt32 & value) { return false; } + bool readNumber(Int64 & value) { return false; } + bool readNumber(UInt64 & value) { return false; } + bool readNumber(UInt128 & value) { return false; } + bool readNumber(Float32 & value) { return false; } + bool readNumber(Float64 & value) { return false; } + bool readStringInto(PaddedPODArray & str) { return false; } + void prepareEnumMapping(const std::vector> & name_value_pairs) {} + void prepareEnumMapping(const std::vector> & name_value_pairs) {} + bool readEnum(Int8 & value) { return false; } + bool readEnum(Int16 & value) { return false; } + bool readUUID(UUID & uuid) { return false; } + bool 
readDate(DayNum & date) { return false; } + bool readDateTime(time_t & tm) { return false; } + bool readDecimal(Decimal32 & decimal, UInt32 precision, UInt32 scale) { return false; } + bool readDecimal(Decimal64 & decimal, UInt32 precision, UInt32 scale) { return false; } + bool readDecimal(Decimal128 & decimal, UInt32 precision, UInt32 scale) { return false; } + bool readAggregateFunction(const AggregateFunctionPtr & function, AggregateDataPtr place, Arena & arena) { return false; } + bool maybeCanReadValue() const { return false; } +}; + +} +#endif diff --git a/dbms/src/Formats/ProtobufRowInputStream.cpp b/dbms/src/Formats/ProtobufRowInputStream.cpp new file mode 100644 index 00000000000..4eccafe9896 --- /dev/null +++ b/dbms/src/Formats/ProtobufRowInputStream.cpp @@ -0,0 +1,93 @@ +#include +#if USE_PROTOBUF + +#include +#include +#include +#include +#include +#include + + +namespace DB +{ + +ProtobufRowInputStream::ProtobufRowInputStream(ReadBuffer & in_, const Block & header, const FormatSchemaInfo & info) + : data_types(header.getDataTypes()), reader(in_, ProtobufSchemas::instance().getMessageTypeForFormatSchema(info), header.getNames()) +{ +} + +ProtobufRowInputStream::~ProtobufRowInputStream() = default; + +bool ProtobufRowInputStream::read(MutableColumns & columns, RowReadExtension & extra) +{ + if (!reader.startMessage()) + return false; // EOF reached, no more messages. + + // Set of columns for which the values were read. The rest will be filled with default values. + auto & read_columns = extra.read_columns; + read_columns.assign(columns.size(), false); + + // Read values from this message and put them to the columns while it's possible. 
+ size_t column_index; + while (reader.readColumnIndex(column_index)) + { + bool allow_add_row = !static_cast(read_columns[column_index]); + do + { + bool row_added; + data_types[column_index]->deserializeProtobuf(*columns[column_index], reader, allow_add_row, row_added); + if (row_added) + { + read_columns[column_index] = true; + allow_add_row = false; + } + } while (reader.maybeCanReadValue()); + } + + // Fill non-visited columns with the default values. + for (column_index = 0; column_index < read_columns.size(); ++column_index) + if (!read_columns[column_index]) + data_types[column_index]->insertDefaultInto(*columns[column_index]); + + reader.endMessage(); + return true; +} + +bool ProtobufRowInputStream::allowSyncAfterError() const +{ + return true; +} + +void ProtobufRowInputStream::syncAfterError() +{ + reader.endMessage(); +} + + +void registerInputFormatProtobuf(FormatFactory & factory) +{ + factory.registerInputFormat("Protobuf", []( + ReadBuffer & buf, + const Block & sample, + const Context & context, + UInt64 max_block_size, + const FormatSettings & settings) + { + return std::make_shared( + std::make_shared(buf, sample, FormatSchemaInfo(context, "proto")), + sample, max_block_size, settings); + }); +} + +} + +#else + +namespace DB +{ +class FormatFactory; +void registerInputFormatProtobuf(FormatFactory & factory) {} +} + +#endif diff --git a/dbms/src/Formats/ProtobufRowInputStream.h b/dbms/src/Formats/ProtobufRowInputStream.h new file mode 100644 index 00000000000..535ac408b01 --- /dev/null +++ b/dbms/src/Formats/ProtobufRowInputStream.h @@ -0,0 +1,34 @@ +#pragma once + +#include +#if USE_PROTOBUF + +#include +#include +#include + +namespace DB +{ +class Block; +class FormatSchemaInfo; + + +/** Stream that reads rows from data in the Protobuf format. 
+ */ +class ProtobufRowInputStream : public IRowInputStream +{ +public: + ProtobufRowInputStream(ReadBuffer & in_, const Block & header, const FormatSchemaInfo & info); + ~ProtobufRowInputStream() override; + + bool read(MutableColumns & columns, RowReadExtension & extra) override; + bool allowSyncAfterError() const override; + void syncAfterError() override; + +private: + DataTypes data_types; + ProtobufReader reader; +}; + +} +#endif diff --git a/dbms/src/Formats/ProtobufWriter.cpp b/dbms/src/Formats/ProtobufWriter.cpp index 94c31a1c09c..dd36a7fa0ad 100644 --- a/dbms/src/Formats/ProtobufWriter.cpp +++ b/dbms/src/Formats/ProtobufWriter.cpp @@ -18,7 +18,7 @@ namespace ErrorCodes { extern const int NOT_IMPLEMENTED; extern const int NO_DATA_FOR_REQUIRED_PROTOBUF_FIELD; - extern const int CANNOT_CONVERT_TO_PROTOBUF_TYPE; + extern const int PROTOBUF_BAD_CAST; extern const int PROTOBUF_FIELD_NOT_REPEATED; } @@ -67,14 +67,14 @@ protected: { throw Exception( "Could not convert data type '" + type_name + "' to protobuf type '" + field->type_name() + "' (field: " + field->name() + ")", - ErrorCodes::CANNOT_CONVERT_TO_PROTOBUF_TYPE); + ErrorCodes::PROTOBUF_BAD_CAST); } void cannotConvertValue(const String & value) { throw Exception( "Could not convert value '" + value + "' to protobuf type '" + field->type_name() + "' (field: " + field->name() + ")", - ErrorCodes::CANNOT_CONVERT_TO_PROTOBUF_TYPE); + ErrorCodes::PROTOBUF_BAD_CAST); } template diff --git a/dbms/src/Interpreters/InterpreterInsertQuery.cpp b/dbms/src/Interpreters/InterpreterInsertQuery.cpp index 7f579b4f88e..8249ff88232 100644 --- a/dbms/src/Interpreters/InterpreterInsertQuery.cpp +++ b/dbms/src/Interpreters/InterpreterInsertQuery.cpp @@ -36,7 +36,7 @@ namespace ErrorCodes InterpreterInsertQuery::InterpreterInsertQuery( - const ASTPtr & query_ptr_, const Context & context_, bool allow_materialized_) + const ASTPtr & query_ptr_, Context & context_, bool allow_materialized_) : query_ptr(query_ptr_), 
context(context_), allow_materialized(allow_materialized_) { } diff --git a/dbms/src/Interpreters/InterpreterInsertQuery.h b/dbms/src/Interpreters/InterpreterInsertQuery.h index 9cde2c274fe..9c7f9e4babc 100644 --- a/dbms/src/Interpreters/InterpreterInsertQuery.h +++ b/dbms/src/Interpreters/InterpreterInsertQuery.h @@ -15,7 +15,7 @@ namespace DB class InterpreterInsertQuery : public IInterpreter { public: - InterpreterInsertQuery(const ASTPtr & query_ptr_, const Context & context_, bool allow_materialized_ = false); + InterpreterInsertQuery(const ASTPtr & query_ptr_, Context & context_, bool allow_materialized_ = false); /** Prepare a request for execution. Return block streams * - the stream into which you can write data to execute the query, if INSERT; @@ -32,7 +32,7 @@ private: void checkAccess(const ASTInsertQuery & query); ASTPtr query_ptr; - const Context & context; + Context & context; bool allow_materialized; }; diff --git a/dbms/src/Parsers/ASTInsertQuery.cpp b/dbms/src/Parsers/ASTInsertQuery.cpp index 1ecf4f9daef..f4f85ce38f3 100644 --- a/dbms/src/Parsers/ASTInsertQuery.cpp +++ b/dbms/src/Parsers/ASTInsertQuery.cpp @@ -42,6 +42,12 @@ void ASTInsertQuery::formatImpl(const FormatSettings & settings, FormatState & s settings.ostr << (settings.hilite ? hilite_keyword : "") << " VALUES" << (settings.hilite ? hilite_none : ""); } } + + if (settings_ast) + { + settings.ostr << (settings.hilite ? hilite_keyword : "") << "SETTINGS " << (settings.hilite ? 
hilite_none : ""); + settings_ast->formatImpl(settings, state, frame); + } } } diff --git a/dbms/src/Parsers/ASTInsertQuery.h b/dbms/src/Parsers/ASTInsertQuery.h index 17c8214c1ba..457e51beef2 100644 --- a/dbms/src/Parsers/ASTInsertQuery.h +++ b/dbms/src/Parsers/ASTInsertQuery.h @@ -18,6 +18,7 @@ public: String format; ASTPtr select; ASTPtr table_function; + ASTPtr settings_ast; // Set to true if the data should only be inserted into attached views bool no_destination = false; @@ -39,10 +40,8 @@ public: if (columns) { res->columns = columns->clone(); res->children.push_back(res->columns); } if (select) { res->select = select->clone(); res->children.push_back(res->select); } - if (table_function) - { - res->table_function = table_function->clone(); res->children.push_back(res->table_function); - } + if (table_function) { res->table_function = table_function->clone(); res->children.push_back(res->table_function); } + if (settings_ast) { res->settings_ast = settings_ast->clone(); res->children.push_back(res->settings_ast); } return res; } diff --git a/dbms/src/Parsers/ParserInsertQuery.cpp b/dbms/src/Parsers/ParserInsertQuery.cpp index 017c4ad67ab..e86535e8094 100644 --- a/dbms/src/Parsers/ParserInsertQuery.cpp +++ b/dbms/src/Parsers/ParserInsertQuery.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include @@ -27,6 +28,7 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) ParserToken s_dot(TokenType::Dot); ParserKeyword s_values("VALUES"); ParserKeyword s_format("FORMAT"); + ParserKeyword s_settings("SETTINGS"); ParserKeyword s_select("SELECT"); ParserKeyword s_with("WITH"); ParserToken s_lparen(TokenType::OpeningRoundBracket); @@ -41,6 +43,7 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) ASTPtr format; ASTPtr select; ASTPtr table_function; + ASTPtr settings_ast; /// Insertion data const char * data = nullptr; @@ -86,12 +89,32 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, 
Expected & expected) } else if (s_format.ignore(pos, expected)) { - auto name_pos = pos; - if (!name_p.parse(pos, format, expected)) return false; + } + else if (s_select.ignore(pos, expected) || s_with.ignore(pos,expected)) + { + pos = before_select; + ParserSelectWithUnionQuery select_p; + select_p.parse(pos, select, expected); + } + else + { + return false; + } - data = name_pos->end; + if (s_settings.ignore(pos, expected)) + { + ParserSetQuery parser_settings(true); + if (!parser_settings.parse(pos, settings_ast, expected)) + return false; + } + + if (format) + { + Pos last_token = pos; + --last_token; + data = last_token->end; if (data < end && *data == ';') throw Exception("You have excessive ';' symbol before data for INSERT.\n" @@ -114,16 +137,6 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) if (data < end && *data == '\n') ++data; } - else if (s_select.ignore(pos, expected) || s_with.ignore(pos,expected)) - { - pos = before_select; - ParserSelectWithUnionQuery select_p; - select_p.parse(pos, select, expected); - } - else - { - return false; - } auto query = std::make_shared(); node = query; @@ -142,6 +155,7 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) query->columns = columns; query->select = select; + query->settings_ast = settings_ast; query->data = data != end ? 
data : nullptr; query->end = end; @@ -149,6 +163,8 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) query->children.push_back(columns); if (select) query->children.push_back(select); + if (settings_ast) + query->children.push_back(settings_ast); return true; } diff --git a/dbms/tests/queries/0_stateless/00825_protobuf_format_input.insh b/dbms/tests/queries/0_stateless/00825_protobuf_format_input.insh new file mode 100644 index 00000000000..52d94a6272a --- /dev/null +++ b/dbms/tests/queries/0_stateless/00825_protobuf_format_input.insh @@ -0,0 +1,4 @@ +echo -ne '\xe0\x01\x0a\x24\x61\x37\x35\x32\x32\x31\x35\x38\x2d\x33\x64\x34\x31\x2d\x34\x62\x37\x37\x2d\x61\x64\x36\x39\x2d\x36\x63\x35\x39\x38\x65\x65\x35\x35\x63\x34\x39\x12\x04\x49\x76\x61\x6e\x1a\x06\x50\x65\x74\x72\x6f\x76\x20\x01\x28\xaf\x1f\x32\x03\x70\x6e\x67\x3a\x0c\x2b\x37\x34\x39\x35\x31\x32\x33\x34\x35\x36\x37\x40\x01\x4d\xfc\xd0\x30\x5c\x50\x26\x58\x09\x62\x09\x59\x65\x73\x74\x65\x72\x64\x61\x79\x62\x07\x46\x6c\x6f\x77\x65\x72\x73\x6a\x04\xff\x01\x00\x00\x72\x06\x4d\x6f\x73\x63\x6f\x77\x7a\x08\x4b\x03\x5f\x42\x72\x7d\x16\x42\x81\x01\x1f\x85\xeb\x51\xb8\x1e\x09\x40\x89\x01\x33\x33\x33\x33\x33\xc3\x6a\x40\x95\x01\xcd\xcc\xcc\x3d\x9d\x01\x9a\x99\xb9\x40\xa0\x01\x80\xc4\xd7\x8d\x7f\xaa\x01\x0c\x0a\x05\x6d\x65\x74\x65\x72\x15\x00\x00\x80\x3f\xaa\x01\x11\x0a\x0a\x63\x65\x6e\x74\x69\x6d\x65\x74\x65\x72\x15\x0a\xd7\x23\x3c\xaa\x01\x10\x0a\x09\x6b\x69\x6c\x6f\x6d\x65\x74\x65\x72\x15\x00\x00\x7a\x44\x7e\x0a\x24\x63\x36\x39\x34\x61\x64\x38\x61\x2d\x66\x37\x31\x34\x2d\x34\x65\x61\x33\x2d\x39\x30\x37\x64\x2d\x66\x64\x35\x34\x66\x62\x32\x35\x64\x39\x62\x35\x12\x07\x4e\x61\x74\x61\x6c\x69\x61\x1a\x08\x53\x6f\x6b\x6f\x6c\x6f\x76\x61\x28\xa6\x3f\x32\x03\x6a\x70\x67\x50\x1a\x58\x0b\x6a\x04\x64\xc8\x01\x32\x72\x08\x50\x6c\x79\x6d\x6f\x75\x74\x68\x7a\x08\x6a\x9d\x49\x42\x46\x8c\x84\xc0\x81\x01\x6e\x86\x1b\xf0\xf9\x21\x09\x40\x95\x01\x42\x60\xe5\x3b\x9d\x01\xcd\xcc\xac\x40\xa0\x01\xff\xff\xa9\x
ce\x93\x8c\x09\xb3\x01\x0a\x24\x61\x37\x64\x61\x31\x61\x61\x36\x2d\x66\x34\x32\x35\x2d\x34\x37\x38\x39\x2d\x38\x39\x34\x37\x2d\x62\x30\x33\x34\x37\x38\x36\x65\x64\x33\x37\x34\x12\x06\x56\x61\x73\x69\x6c\x79\x1a\x07\x53\x69\x64\x6f\x72\x6f\x76\x20\x01\x28\xfb\x48\x32\x03\x62\x6d\x70\x3a\x0d\x2b\x34\x34\x32\x30\x31\x32\x33\x34\x35\x36\x37\x38\x40\x01\x4d\x50\xe0\x27\x5c\x50\x17\x58\x04\x62\x05\x53\x75\x6e\x6e\x79\x6a\x05\xfa\x01\xf4\x01\x0a\x72\x08\x4d\x75\x72\x6d\x61\x6e\x73\x6b\x7a\x08\xfd\xf0\x89\x42\xc8\x4c\x04\x42\x81\x01\x11\x2d\x44\x54\xfb\x21\x09\x40\x89\x01\x00\x00\x00\xe8\x76\x48\x37\x42\x95\x01\x00\x00\x48\x44\x9d\x01\xcd\xcc\x4c\xc0\xa0\x01\x80\xd4\x9f\x93\x01\xaa\x01\x0c\x0a\x05\x70\x6f\x75\x6e\x64\x15\x00\x00\x80\x41' | $CLICKHOUSE_CLIENT --query="INSERT INTO test.table FORMAT Protobuf SETTINGS format_schema = '$CURDIR/00825_protobuf_format_with_nested:Person'" +echo -ne '\xaa\x01\x12\x05\x46\x72\x69\x64\x61\x28\x99\xe1\xf3\xd1\x0b\x52\x08\x45\x72\x6d\x61\x6b\x6f\x76\x61\x72\x0c\x00\x00\xdc\x42\x00\x00\x52\x43\x00\x00\x94\x42\x79\x48\xce\x3d\x51\x00\x00\x00\x00\xc8\x02\x14\xc2\x05\x08\x00\x00\x80\x44\x00\x00\x80\x49\x9a\x06\x02\x4b\x42\x9a\x06\x02\x4d\x42\xa1\x06\x00\x00\x00\x00\x00\x00\xe0\x3f\xa8\x06\x2a\xa8\x06\xa8\xff\xff\xff\xff\xff\xff\xff\xff\x01\xb0\x06\x01\xbd\x06\x25\x06\x49\x40\xfa\x06\x02\x34\x30\x90\x08\xe2\x08\xe1\x08\x89\xe6\x6e\xdd\x01\x00\x00\x00\xb0\x09\xc3\x19\xd0\x0c\xb7\x02\xe2\x12\x24\x32\x30\x66\x63\x64\x39\x35\x61\x2d\x33\x33\x32\x64\x2d\x34\x31\x64\x62\x2d\x61\x39\x65\x63\x2d\x31\x36\x31\x66\x36\x34\x34\x64\x30\x35\x39\x63\xb0\x01\x08\x01\x12\x06\x49\x73\x6f\x6c\x64\x65\x52\x07\x4c\x61\x76\x72\x6f\x76\x61\x72\x0c\x00\x00\x7f\x43\x00\x00\x00\x00\x00\x00\x7f\x43\xaa\x01\x03\x61\x62\x63\xc8\x02\x32\xc2\x05\x08\x00\x00\x00\x41\x00\x00\x80\x3f\x9a\x06\x04\x42\x79\x74\x65\x9a\x06\x03\x42\x69\x74\xa1\x06\x00\x00\x00\x00\x00\x00\x12\x40\xa8\x06\x1a\xa8\x06\xb0\xff\xff\xff\xff\xff\xff\xff\xff\x01\xb0\x06\x01\xbd\x06\xf9\x0f\x49\x40\xc2\x0
6\x01\x2c\xfa\x06\x02\x33\x32\x90\x08\x78\xe1\x08\x39\x4e\x2b\xfe\xe4\xf5\xff\xff\xb0\x09\xe8\x30\xd8\x12\x01\xe2\x12\x24\x37\x63\x66\x61\x36\x38\x35\x36\x2d\x61\x35\x34\x61\x2d\x34\x37\x38\x36\x2d\x62\x38\x65\x35\x2d\x37\x34\x35\x31\x35\x39\x64\x35\x32\x32\x37\x38\xc2\x3e\x05\x15\x00\x00\xb6\x42' | $CLICKHOUSE_CLIENT --query="INSERT INTO test.table FORMAT Protobuf SETTINGS format_schema = '$CURDIR/00825_protobuf_format_with_nested:AltPerson'" +echo -ne '\x9a\x02\x0a\x24\x61\x61\x30\x65\x35\x61\x30\x36\x2d\x63\x61\x62\x32\x2d\x34\x30\x33\x34\x2d\x61\x36\x61\x32\x2d\x34\x38\x65\x38\x32\x62\x39\x31\x36\x36\x34\x65\x12\x06\x4c\x65\x6f\x6e\x69\x64\x1a\x08\x4b\x69\x72\x69\x6c\x6c\x6f\x76\x22\x04\x6d\x61\x6c\x65\x2a\x0a\x31\x39\x38\x33\x2d\x30\x36\x2d\x32\x34\x3a\x0c\x2b\x37\x34\x39\x35\x30\x32\x37\x35\x38\x36\x34\x42\x01\x31\x4a\x13\x32\x30\x31\x39\x2d\x30\x32\x2d\x30\x34\x20\x30\x39\x3a\x34\x35\x3a\x30\x30\x52\x02\x33\x35\x5a\x06\x63\x61\x6e\x63\x65\x72\x62\x07\x37\x20\x72\x69\x6e\x67\x73\x62\x08\x45\x61\x73\x74\x73\x69\x64\x65\x62\x0b\x4c\x61\x73\x74\x20\x48\x75\x72\x72\x61\x68\x6a\x01\x30\x6a\x01\x30\x6a\x03\x32\x35\x35\x72\x09\x53\x61\x6e\x20\x44\x69\x65\x67\x6f\x7a\x09\x33\x32\x2e\x38\x32\x33\x39\x34\x33\x7a\x0b\x2d\x31\x31\x37\x2e\x30\x38\x31\x33\x32\x37\x82\x01\x09\x33\x2e\x31\x34\x31\x35\x39\x32\x37\x8a\x01\x08\x31\x35\x30\x30\x30\x30\x30\x30\x92\x01\x06\x31\x38\x36\x2e\x37\x35\x9a\x01\x04\x2d\x32\x2e\x31\xa2\x01\x0b\x32\x30\x36\x35\x39\x38\x32\x39\x33\x33\x31\xaa\x01\x18\x0a\x06\x6d\x69\x6e\x75\x74\x65\x0a\x04\x68\x6f\x75\x72\x12\x02\x36\x30\x12\x04\x33\x36\x30\x30' | $CLICKHOUSE_CLIENT --query="INSERT INTO test.table FORMAT Protobuf SETTINGS format_schema = '$CURDIR/00825_protobuf_format_with_nested:StrPerson'" +echo -ne 
'\xcf\x01\x0a\x24\x33\x66\x61\x65\x65\x30\x36\x34\x2d\x63\x34\x66\x37\x2d\x34\x64\x33\x34\x2d\x62\x36\x66\x33\x2d\x38\x64\x38\x31\x63\x32\x62\x36\x61\x31\x35\x64\x12\x04\x4e\x69\x63\x6b\x1a\x0a\x4b\x6f\x6c\x65\x73\x6e\x69\x6b\x6f\x76\x20\x01\x28\xda\x52\x32\x03\x62\x6d\x70\x3a\x0c\x34\x31\x32\x2d\x36\x38\x37\x2d\x35\x30\x30\x37\x40\x01\x4d\x2f\x27\xf2\x5b\x50\x14\x58\x09\x62\x06\x48\x61\x76\x61\x6e\x61\x68\x80\x01\x68\x00\x68\x80\x01\x72\x0a\x50\x69\x74\x74\x73\x62\x75\x72\x67\x68\x7a\x08\x9b\x11\x22\x42\x1f\xe6\x9f\xc2\x81\x01\x28\x2d\x44\x54\xfb\x21\x09\x40\x89\x01\x00\x00\x00\xe8\x76\x48\x27\x42\x95\x01\x00\x00\x43\x44\x9d\x01\x66\x66\x92\x41\xa0\x01\xce\xdf\xb8\xba\x01\xab\x01\x0d\xcd\xcc\xe2\x41\x0d\xcd\xcc\x4c\x3e\x0d\x00\x00\x80\x3f\x12\x05\x6f\x75\x6e\x63\x65\x12\x05\x63\x61\x72\x61\x74\x12\x04\x67\x72\x61\x6d\xac\x01' | $CLICKHOUSE_CLIENT --query="INSERT INTO test.table FORMAT Protobuf SETTINGS format_schema = '$CURDIR/00825_protobuf_format_syntax2_with_nested:Syntax2Person'" diff --git a/dbms/tests/queries/0_stateless/00825_protobuf_format_input.reference b/dbms/tests/queries/0_stateless/00825_protobuf_format_input.reference new file mode 100644 index 00000000000..8c1f096c24e --- /dev/null +++ b/dbms/tests/queries/0_stateless/00825_protobuf_format_input.reference @@ -0,0 +1,7 @@ +a7da1aa6-f425-4789-8947-b034786ed374 Vasily Sidorov male 1995-07-28 bmp +442012345678 1 2018-12-30 00:00:00 23 leo ['Sunny'] [250,244,10] Murmansk [68.970680,33.074982] 3.14159265358979 100000000000.00 800 -3.2 154400000 ['pound'] [16] +c694ad8a-f714-4ea3-907d-fd54fb25d9b5 Natalia Sokolova female 1992-03-08 jpg \N 0 \N 26 pisces [] [100,200,50] Plymouth [50.403724,-4.142123] 3.14159 \N 0.007 5.4 -20000000000000 [] [] +aa0e5a06-cab2-4034-a6a2-48e82b91664e Leonid Kirillov male 1983-06-24 \N +74950275864\0 1 2019-02-04 09:45:00 35 cancer ['7 rings','Eastside','Last Hurrah'] [0,0,255] San Diego [32.823943,-117.081327] 3.1415927 15000000.00 186.75 -2.1 20659829331 ['minute','hour'] 
[60,3600] +20fcd95a-332d-41db-a9ec-161f644d059c Frida Ermakova female 1978-12-12 \N 3124555929\0\0\0 0 2013-03-11 16:30:00 40 sagittarius [] [110,210,74] [42.000000,-88.000000] 3.1410000324249268 311.00 0.5 10.0 8010000009 ['KB','MB'] [1024,1048576] +a7522158-3d41-4b77-ad69-6c598ee55c49 Ivan Petrov male 1980-12-29 png +74951234567\0 1 2019-01-05 18:45:00 38 capricorn ['Yesterday','Flowers'] [255,0,0] Moscow [55.753216,37.622504] 3.14 214.10 0.1 5.8 17060000000 ['meter','centimeter','kilometer'] [1,0.01,1000] +3faee064-c4f7-4d34-b6f3-8d81c2b6a15d Nick Kolesnikov male 1998-12-26 bmp 412-687-5007\0 1 2018-11-19 05:59:59 20 capricorn ['Havana'] [128,0,128] Pittsburgh [40.517192,-79.949456] 3.1415926535898 50000000000.00 780 18.3 195500007 ['ounce','carat','gram'] [28.35,0.2,1] +7cfa6856-a54a-4786-b8e5-745159d52278 Isolde Lavrova female 1987-02-09 \N \N 1 \N 32 aquarius [] [255,0,255] [26.000000,-80.000000] 3.1415998935699463 \N 4.5 25.0 -11111111111111 ['Byte','Bit'] [8,1] diff --git a/dbms/tests/queries/0_stateless/00825_protobuf_format_input.sh b/dbms/tests/queries/0_stateless/00825_protobuf_format_input.sh new file mode 100755 index 00000000000..f277f1d1ebf --- /dev/null +++ b/dbms/tests/queries/0_stateless/00825_protobuf_format_input.sh @@ -0,0 +1,44 @@ +#!/usr/bin/env bash + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +. $CURDIR/../shell_config.sh + +set -e -o pipefail + +# Run the client. +$CLICKHOUSE_CLIENT --multiquery < dbms/tests/queries/0_stateless/00825_protobuf_format_output.reference +# build/utils/test-data-generator/ProtobufDelimitedMessagesSerializer CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) . 
$CURDIR/../shell_config.sh diff --git a/dbms/tests/queries/0_stateless/00825_protobuf_format_syntax2_with_nested.proto b/dbms/tests/queries/0_stateless/00825_protobuf_format_syntax2_with_nested.proto new file mode 100644 index 00000000000..76bafb3ddc2 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00825_protobuf_format_syntax2_with_nested.proto @@ -0,0 +1,51 @@ +syntax = "proto2"; + +message Syntax2Person { + enum Gender { + female = 0; + male = 1; + }; + + enum ZodiacSign { + aries = 0; + taurus = 1; + gemini = 2; + cancer = 3; + leo = 4; + virgo = 5; + libra = 6; + scorpius = 7; + sagittarius = 8; + capricorn = 9; + aquarius = 10; + pisces = 11; + }; + + required string uuid = 1; + required string name = 2; + required string surname = 3; + required Gender gender = 4; + required uint32 birthDate = 5; + optional bytes photo = 6; + optional string phoneNumber = 7; + optional bool isOnline = 8; + optional fixed32 visitTime = 9; + optional uint32 age = 10; + optional ZodiacSign zodiacSign = 11; + repeated string songs = 12; + repeated uint32 color = 13; + optional string hometown = 14 [default='Moscow']; + repeated float location = 15 [packed=true]; + optional double pi = 16; + optional double lotteryWin = 17; + optional float someRatio = 18; + optional float temperature = 19; + optional sint64 randomBigNumber = 20; + optional group MeasureUnits = 21 { + repeated float coef = 1; + repeated string unit = 2; + }; + optional string newFieldStr = 22 [default='abc']; + optional int32 newFieldInt = 23 [default=-11]; + optional bool newBool = 24 [default=true]; +}; diff --git a/dbms/tests/queries/0_stateless/00825_protobuf_format_with_nested.proto b/dbms/tests/queries/0_stateless/00825_protobuf_format_with_nested.proto new file mode 100644 index 00000000000..7574ffa0d10 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00825_protobuf_format_with_nested.proto @@ -0,0 +1,118 @@ +syntax = "proto3"; + +enum Gender { + female = 0; + male = 1; +}; + +enum ZodiacSign { + aries 
= 0; + taurus = 1; + gemini = 2; + cancer = 3; + leo = 4; + virgo = 5; + libra = 6; + scorpius = 7; + sagittarius = 8; + capricorn = 9; + aquarius = 10; + pisces = 11; +}; + +message Person { + message MeasureUnit + { + string unit = 1; + float coef = 2; + }; + string uuid = 1; + string name = 2; + string surname = 3; + Gender gender = 4; + uint32 birthDate = 5; + bytes photo = 6; + string phoneNumber = 7; + bool isOnline = 8; + fixed32 visitTime = 9; + uint32 age = 10; + ZodiacSign zodiacSign = 11; + repeated string songs = 12; + repeated uint32 color = 13; + string hometown = 14; + repeated float location = 15; + double pi = 16; + double lotteryWin = 17; + float someRatio = 18; + float temperature = 19; + sint64 randomBigNumber = 20; + repeated MeasureUnit measureUnits = 21; +}; + +enum OnlineStatus { + offline = 0; + online = 1; +}; + +message AltPerson { + enum Gender { + male = 0; + female = 1; + }; + message Dummy { + message Empty {}; + repeated Empty empty = 1; + float z = 2; + }; + repeated int32 location = 101 [packed=false]; + float pi = 103; + bytes uuid = 300; + bool newFieldBool = 299; + string name = 2; + Gender gender = 102; + int32 zodiacSign = 130; + int64 birthDate = 150; + bytes age = 111; + OnlineStatus isOnline = 1; + double someRatio = 100; + fixed64 visitTime = 15; + Dummy newMessage = 1000; + sfixed64 randomBigNumber = 140; + repeated int32 newFieldInt = 104; + repeated float color = 14; + uint64 lotteryWin = 202; + bytes surname = 10; + uint64 phoneNumber = 5; + sint32 temperature = 41; + string newFieldStr = 21; + repeated string measureUnits_unit = 99; + repeated float measureUnits_coef = 88; +}; + +message StrPerson { + message MeasureUnits + { + repeated string unit = 1; + repeated string coef = 2; + }; + string uuid = 1; + string name = 2; + string surname = 3; + string gender = 4; + string birthDate = 5; + string phoneNumber = 7; + string isOnline = 8; + string visitTime = 9; + string age = 10; + string zodiacSign = 11; + repeated 
string songs = 12; + repeated string color = 13; + string hometown = 14; + repeated string location = 15; + string pi = 16; + string lotteryWin = 17; + string someRatio = 18; + string temperature = 19; + string randomBigNumber = 20; + MeasureUnits measureUnits = 21; +}; diff --git a/utils/test-data-generator/CMakeLists.txt b/utils/test-data-generator/CMakeLists.txt index e9f2607c255..bc21287249f 100644 --- a/utils/test-data-generator/CMakeLists.txt +++ b/utils/test-data-generator/CMakeLists.txt @@ -5,9 +5,11 @@ add_executable (markov-model markov-model.cpp) target_link_libraries(markov-model PRIVATE clickhouse_common_io ${Boost_PROGRAM_OPTIONS_LIBRARY}) if(USE_PROTOBUF) - protobuf_generate_cpp(ProtobufDelimitedMessagesSerializer_Srcs ProtobufDelimitedMessagesSerializer_Hdrs ${CMAKE_CURRENT_SOURCE_DIR}/../../dbms/tests/queries/0_stateless/00825_protobuf_format.proto) - protobuf_generate_cpp(ProtobufDelimitedMessagesSerializer_Srcs2 ProtobufDelimitedMessagesSerializer_Hdrs2 ${CMAKE_CURRENT_SOURCE_DIR}/../../dbms/tests/queries/0_stateless/00825_protobuf_format_syntax2.proto) + protobuf_generate_cpp(ProtobufDelimitedMessagesSerializer_Srcs ProtobufDelimitedMessagesSerializer_Hdrs ${CMAKE_CURRENT_SOURCE_DIR}/../../dbms/tests/queries/0_stateless/00825_protobuf_format_with_nested.proto) + protobuf_generate_cpp(ProtobufDelimitedMessagesSerializer_Srcs2 ProtobufDelimitedMessagesSerializer_Hdrs2 ${CMAKE_CURRENT_SOURCE_DIR}/../../dbms/tests/queries/0_stateless/00825_protobuf_format_syntax2_with_nested.proto) add_executable (ProtobufDelimitedMessagesSerializer ProtobufDelimitedMessagesSerializer.cpp ${ProtobufDelimitedMessagesSerializer_Srcs} ${ProtobufDelimitedMessagesSerializer_Hdrs} ${ProtobufDelimitedMessagesSerializer_Srcs2} ${ProtobufDelimitedMessagesSerializer_Hdrs2}) target_include_directories (ProtobufDelimitedMessagesSerializer SYSTEM BEFORE PRIVATE ${Protobuf_INCLUDE_DIR} ${CMAKE_CURRENT_BINARY_DIR}) - target_link_libraries (ProtobufDelimitedMessagesSerializer 
PRIVATE ${Protobuf_LIBRARY}) + target_link_libraries (ProtobufDelimitedMessagesSerializer PRIVATE ${Protobuf_LIBRARY} ${Boost_PROGRAM_OPTIONS_LIBRARY}) + get_filename_component(ProtobufDelimitedMessagesSerializer_OutputDir "${CMAKE_CURRENT_LIST_DIR}/../../dbms/tests/queries/0_stateless" REALPATH) + target_compile_definitions(ProtobufDelimitedMessagesSerializer PRIVATE OUTPUT_DIR="${ProtobufDelimitedMessagesSerializer_OutputDir}") endif() diff --git a/utils/test-data-generator/ProtobufDelimitedMessagesSerializer.cpp b/utils/test-data-generator/ProtobufDelimitedMessagesSerializer.cpp index 39877956686..ebc5ef15e14 100644 --- a/utils/test-data-generator/ProtobufDelimitedMessagesSerializer.cpp +++ b/utils/test-data-generator/ProtobufDelimitedMessagesSerializer.cpp @@ -1,15 +1,33 @@ -// Reference file generator for the test dbms/tests/queries/0_stateless/00825_protobuf_format_output.sh +// Generator of protobuf delimited messages used in the protobuf IO tests +// dbms/tests/queries/0_stateless/00825_protobuf_format* +#include +#include #include #include -#include "00825_protobuf_format.pb.h" -#include "00825_protobuf_format_syntax2.pb.h" +#include "00825_protobuf_format_with_nested.pb.h" +#include "00825_protobuf_format_syntax2_with_nested.pb.h" -int main(int, char **) +void writeInsertQueryCommand(std::ostream & out, const std::string & format_schema, std::stringstream & delimited_messages) { - std::ostream* out = &std::cout; + out << "echo -ne '"; + std::string bytes = delimited_messages.str(); + delimited_messages.str(""); + for (const char c : bytes) + { + char buf[5]; + sprintf(buf, "\\x%02x", static_cast(c)); + out << buf; + } + out << "' | $CLICKHOUSE_CLIENT --query=\"INSERT INTO test.table FORMAT Protobuf" + " SETTINGS format_schema = '$CURDIR/" + << format_schema << "'\"" << std::endl; +} +void writeInputInsertQueries(std::ostream & out) +{ + std::stringstream ss; { Person person; person.set_uuid("a7522158-3d41-4b77-ad69-6c598ee55c49"); @@ -18,7 +36,7 @@ int 
main(int, char **) person.set_gender(Gender::male); person.set_birthdate(4015); // 1980-12-29 person.set_photo("png"); - person.set_phonenumber(std::string("+74951234567\0", 13)); // Converted from FixedString(13) + person.set_phonenumber("+74951234567"); person.set_isonline(true); person.set_visittime(1546703100); // 2019-01-05 18:45:00 person.set_age(38); @@ -36,7 +54,16 @@ int main(int, char **) person.set_someratio(0.1); person.set_temperature(5.8); person.set_randombignumber(17060000000); - google::protobuf::util::SerializeDelimitedToOstream(person, out); + auto* mu = person.add_measureunits(); + mu->set_unit("meter"); + mu->set_coef(1); + mu = person.add_measureunits(); + mu->set_unit("centimeter"); + mu->set_coef(0.01); + mu = person.add_measureunits(); + mu->set_unit("kilometer"); + mu->set_coef(1000); + google::protobuf::util::SerializeDelimitedToOstream(person, &ss); } { @@ -60,7 +87,7 @@ int main(int, char **) person.set_someratio(0.007); person.set_temperature(5.4); person.set_randombignumber(-20000000000000); - google::protobuf::util::SerializeDelimitedToOstream(person, out); + google::protobuf::util::SerializeDelimitedToOstream(person, &ss); } { @@ -88,10 +115,232 @@ int main(int, char **) person.set_someratio(800); person.set_temperature(-3.2); person.set_randombignumber(154400000); - google::protobuf::util::SerializeDelimitedToOstream(person, out); + auto* mu = person.add_measureunits(); + mu->set_unit("pound"); + mu->set_coef(16); + google::protobuf::util::SerializeDelimitedToOstream(person, &ss); } - *out << "ALTERNATIVE->" << std::endl; + writeInsertQueryCommand(out, "00825_protobuf_format_with_nested:Person", ss); + + { + AltPerson person; + person.add_location(42); + person.add_location(-88); + person.set_pi(3.141); + person.set_uuid("20fcd95a-332d-41db-a9ec-161f644d059c"); + person.set_name("Frida"); + person.set_gender(AltPerson::female); + person.set_zodiacsign(1122); // sagittarius + person.set_birthdate(3267); // 1978-12-12 + 
person.set_age("40"); + person.set_isonline(OnlineStatus::offline); + person.set_someratio(0.5); + person.set_visittime(1363005000); // 2013-03-11 16:30:00 + person.set_randombignumber(8010000009); + person.add_color(110); + person.add_color(210); + person.add_color(74); + person.set_lotterywin(311); + person.set_surname("Ermakova"); + person.set_phonenumber(3124555929); + person.set_temperature(10); + person.add_measureunits_unit("KB"); + person.add_measureunits_coef(1024); + person.add_measureunits_unit("MB"); + person.add_measureunits_coef(1048576); + google::protobuf::util::SerializeDelimitedToOstream(person, &ss); + } + + { + AltPerson person; + person.add_location(26); + person.add_location(-80); + person.set_pi(3.1416); + person.set_uuid("7cfa6856-a54a-4786-b8e5-745159d52278"); + person.set_name("Isolde"); + person.set_gender(AltPerson::female); + person.set_zodiacsign(120); // aquarius + person.set_birthdate(6248); // 1987-02-09 + person.set_age("32"); + person.set_isonline(OnlineStatus::online); + person.set_someratio(4.5); + person.set_randombignumber(-11111111111111); + person.add_color(255); + person.add_color(0); + person.add_color(255); + person.set_surname("Lavrova"); + person.set_temperature(25); + person.set_newfieldstr("abc"); + person.set_newfieldbool(true); + person.add_newfieldint(44); + person.add_measureunits_unit("Byte"); + person.add_measureunits_coef(8); + person.add_measureunits_unit("Bit"); + person.add_measureunits_coef(1); + person.mutable_newmessage()->set_z(91); + google::protobuf::util::SerializeDelimitedToOstream(person, &ss); + } + + writeInsertQueryCommand(out, "00825_protobuf_format_with_nested:AltPerson", ss); + + { + StrPerson person; + person.set_uuid("aa0e5a06-cab2-4034-a6a2-48e82b91664e"); + person.set_name("Leonid"); + person.set_surname("Kirillov"); + person.set_gender("male"); + person.set_birthdate("1983-06-24"); + person.set_phonenumber("+74950275864"); + person.set_isonline("1"); + person.set_visittime("2019-02-04 
09:45:00"); + person.set_age("35"); + person.set_zodiacsign("cancer"); + person.add_songs("7 rings"); + person.add_songs("Eastside"); + person.add_songs("Last Hurrah"); + person.add_color("0"); + person.add_color("0"); + person.add_color("255"); + person.set_hometown("San Diego"); + person.add_location("32.823943"); + person.add_location("-117.081327"); + person.set_pi("3.1415927"); + person.set_lotterywin("15000000"); + person.set_someratio("186.75"); + person.set_temperature("-2.1"); + person.set_randombignumber("20659829331"); + person.mutable_measureunits()->add_unit("minute"); + person.mutable_measureunits()->add_coef("60"); + person.mutable_measureunits()->add_unit("hour"); + person.mutable_measureunits()->add_coef("3600"); + google::protobuf::util::SerializeDelimitedToOstream(person, &ss); + } + + writeInsertQueryCommand(out, "00825_protobuf_format_with_nested:StrPerson", ss); + + { + Syntax2Person person; + person.set_uuid("3faee064-c4f7-4d34-b6f3-8d81c2b6a15d"); + person.set_name("Nick"); + person.set_surname("Kolesnikov"); + person.set_gender(Syntax2Person::male); + person.set_birthdate(10586); // 1998-12-26 + person.set_photo("bmp"); + person.set_phonenumber("412-687-5007"); + person.set_isonline(true); + person.set_visittime(1542596399); // 2018-11-19 05:59:59 + person.set_age(20); + person.set_zodiacsign(Syntax2Person::capricorn); + person.add_songs("Havana"); + person.add_color(128); + person.add_color(0); + person.add_color(128); + person.set_hometown("Pittsburgh"); + person.add_location(40.517193); + person.add_location(-79.949452); + person.set_pi(3.1415926535898); + person.set_lotterywin(50000000000); + person.set_someratio(780); + person.set_temperature(18.3); + person.set_randombignumber(195500007); + person.mutable_measureunits()->add_unit("ounce"); + person.mutable_measureunits()->add_coef(28.35); + person.mutable_measureunits()->add_unit("carat"); + person.mutable_measureunits()->add_coef(0.2); + 
person.mutable_measureunits()->add_unit("gram"); + person.mutable_measureunits()->add_coef(1); + google::protobuf::util::SerializeDelimitedToOstream(person, &ss); + } + + writeInsertQueryCommand(out, "00825_protobuf_format_syntax2_with_nested:Syntax2Person", ss); +} + + +void writeOutputReference(std::ostream & out) +{ + { + Person person; + person.set_uuid("a7522158-3d41-4b77-ad69-6c598ee55c49"); + person.set_name("Ivan"); + person.set_surname("Petrov"); + person.set_gender(Gender::male); + person.set_birthdate(4015); // 1980-12-29 + person.set_photo("png"); + person.set_phonenumber(std::string("+74951234567\0", 13)); // Converted from FixedString(13) + person.set_isonline(true); + person.set_visittime(1546703100); // 2019-01-05 18:45:00 + person.set_age(38); + person.set_zodiacsign(ZodiacSign::capricorn); + person.add_songs("Yesterday"); + person.add_songs("Flowers"); + person.add_color(255); + person.add_color(0); + person.add_color(0); + person.set_hometown("Moscow"); + person.add_location(55.753215); + person.add_location(37.622504); + person.set_pi(3.14); + person.set_lotterywin(214.10); + person.set_someratio(0.1); + person.set_temperature(5.8); + person.set_randombignumber(17060000000); + google::protobuf::util::SerializeDelimitedToOstream(person, &out); + } + + { + Person person; + person.set_uuid("c694ad8a-f714-4ea3-907d-fd54fb25d9b5"); + person.set_name("Natalia"); + person.set_surname("Sokolova"); + person.set_gender(Gender::female); + person.set_birthdate(8102); // 1992-03-08 + person.set_photo("jpg"); + person.set_isonline(false); + person.set_age(26); + person.set_zodiacsign(ZodiacSign::pisces); + person.add_color(100); + person.add_color(200); + person.add_color(50); + person.set_hometown("Plymouth"); + person.add_location(50.403724); + person.add_location(-4.142123); + person.set_pi(3.14159); + person.set_someratio(0.007); + person.set_temperature(5.4); + person.set_randombignumber(-20000000000000); + 
google::protobuf::util::SerializeDelimitedToOstream(person, &out); + } + + { + Person person; + person.set_uuid("a7da1aa6-f425-4789-8947-b034786ed374"); + person.set_name("Vasily"); + person.set_surname("Sidorov"); + person.set_gender(Gender::male); + person.set_birthdate(9339); // 1995-07-28 + person.set_photo("bmp"); + person.set_phonenumber("+442012345678"); + person.set_isonline(true); + person.set_visittime(1546117200); // 2018-12-30 00:00:00 + person.set_age(23); + person.set_zodiacsign(ZodiacSign::leo); + person.add_songs("Sunny"); + person.add_color(250); + person.add_color(244); + person.add_color(10); + person.set_hometown("Murmansk"); + person.add_location(68.970682); + person.add_location(33.074981); + person.set_pi(3.14159265358979); + person.set_lotterywin(100000000000); + person.set_someratio(800); + person.set_temperature(-3.2); + person.set_randombignumber(154400000); + google::protobuf::util::SerializeDelimitedToOstream(person, &out); + } + + out << "ALTERNATIVE->" << std::endl; { AltPerson person; @@ -115,7 +364,7 @@ int main(int, char **) person.set_surname("Petrov"); person.set_phonenumber(+74951234567); person.set_temperature(5); - google::protobuf::util::SerializeDelimitedToOstream(person, out); + google::protobuf::util::SerializeDelimitedToOstream(person, &out); } { @@ -137,7 +386,7 @@ int main(int, char **) person.add_color(50); person.set_surname("Sokolova"); person.set_temperature(5); - google::protobuf::util::SerializeDelimitedToOstream(person, out); + google::protobuf::util::SerializeDelimitedToOstream(person, &out); } { @@ -162,10 +411,10 @@ int main(int, char **) person.set_surname("Sidorov"); person.set_phonenumber(+442012345678); person.set_temperature(-3); - google::protobuf::util::SerializeDelimitedToOstream(person, out); + google::protobuf::util::SerializeDelimitedToOstream(person, &out); } - *out << "STRINGS->" << std::endl; + out << "STRINGS->" << std::endl; { StrPerson person; @@ -192,7 +441,7 @@ int main(int, char **) 
person.set_someratio("0.1"); person.set_temperature("5.8"); person.set_randombignumber("17060000000"); - google::protobuf::util::SerializeDelimitedToOstream(person, out); + google::protobuf::util::SerializeDelimitedToOstream(person, &out); } { @@ -215,7 +464,7 @@ int main(int, char **) person.set_someratio("0.007"); person.set_temperature("5.4"); person.set_randombignumber("-20000000000000"); - google::protobuf::util::SerializeDelimitedToOstream(person, out); + google::protobuf::util::SerializeDelimitedToOstream(person, &out); } { @@ -242,10 +491,10 @@ int main(int, char **) person.set_someratio("800"); person.set_temperature("-3.2"); person.set_randombignumber("154400000"); - google::protobuf::util::SerializeDelimitedToOstream(person, out); + google::protobuf::util::SerializeDelimitedToOstream(person, &out); } - *out << "SYNTAX2->" << std::endl; + out << "SYNTAX2->" << std::endl; { Syntax2Person person; @@ -273,7 +522,7 @@ int main(int, char **) person.set_someratio(0.1); person.set_temperature(5.8); person.set_randombignumber(17060000000); - google::protobuf::util::SerializeDelimitedToOstream(person, out); + google::protobuf::util::SerializeDelimitedToOstream(person, &out); } { @@ -297,7 +546,7 @@ int main(int, char **) person.set_someratio(0.007); person.set_temperature(5.4); person.set_randombignumber(-20000000000000); - google::protobuf::util::SerializeDelimitedToOstream(person, out); + google::protobuf::util::SerializeDelimitedToOstream(person, &out); } { @@ -325,8 +574,48 @@ int main(int, char **) person.set_someratio(800); person.set_temperature(-3.2); person.set_randombignumber(154400000); - google::protobuf::util::SerializeDelimitedToOstream(person, out); + google::protobuf::util::SerializeDelimitedToOstream(person, &out); } +} + +void parseCommandLine(int argc, char ** argv, std::string & output_dir) +{ + namespace po = boost::program_options; + po::options_description desc; + output_dir = OUTPUT_DIR; + desc.add_options() + ("help,h", "Show help") + 
("directory,d", po::value(&output_dir), + "Set the output directory. By default it's " OUTPUT_DIR); + po::parsed_options parsed = po::command_line_parser(argc, argv).options(desc).run(); + po::variables_map vm; + po::store(parsed, vm); + po::notify(vm); + if (!output_dir.empty()) + return; + + // Show help. + std::cout << "This utility generates delimited messages for tests checking protobuf IO support." << std::endl; + std::cout << desc; + std::cout << "Example:" << std::endl; + std::cout << argv[0] << " -g OUTPUT_REFERENCE" << std::endl; + std::exit(0); +} + +void writeFile(const std::string & filepath, void (*fn)(std::ostream &)) +{ + std::cout << "Writing '" << filepath << "' ... "; + std::fstream out(filepath, std::fstream::out | std::fstream::trunc); + fn(out); + std::cout << "done." << std::endl; +} + +int main(int argc, char ** argv) +{ + std::string output_dir; + parseCommandLine(argc, argv, output_dir); + writeFile(output_dir + "/00825_protobuf_format_input.insh", writeInputInsertQueries); + writeFile(output_dir + "/00825_protobuf_format_output.reference", writeOutputReference); return 0; }