From 28e983781991559bd28e6c753ef298040e5a05e6 Mon Sep 17 00:00:00 2001 From: Vitaly Baranov Date: Wed, 23 Jan 2019 22:34:38 +0300 Subject: [PATCH] Implemented utility class for writing protobufs. --- dbms/src/Formats/ProtobufSimpleWriter.cpp | 412 ++++++++++++++++++++++ dbms/src/Formats/ProtobufSimpleWriter.h | 88 +++++ 2 files changed, 500 insertions(+) create mode 100644 dbms/src/Formats/ProtobufSimpleWriter.cpp create mode 100644 dbms/src/Formats/ProtobufSimpleWriter.h diff --git a/dbms/src/Formats/ProtobufSimpleWriter.cpp b/dbms/src/Formats/ProtobufSimpleWriter.cpp new file mode 100644 index 00000000000..dce58f889f1 --- /dev/null +++ b/dbms/src/Formats/ProtobufSimpleWriter.cpp @@ -0,0 +1,412 @@ +#include +#include +#include + + +namespace DB +{ +namespace +{ + void writeBytes(WriteBuffer & buf, const void * data, size_t size) { buf.write(reinterpret_cast(data), size); } + + void writeVariant(WriteBuffer & buf, UInt32 value) + { + while (value >= 0x80) + { + buf.write(static_cast(value | 0x80)); + value >>= 7; + } + buf.write(static_cast(value)); + } + + void writeVariant(WriteBuffer & buf, Int32 value) { writeVariant(buf, static_cast(value)); } + + void writeVariant(WriteBuffer & buf, UInt64 value) + { + while (value >= 0x80) + { + buf.write(static_cast(value | 0x80)); + value >>= 7; + } + buf.write(static_cast(value)); + } + + void writeVariant(WriteBuffer & buf, Int64 value) { writeVariant(buf, static_cast(value)); } + + void writeLittleEndian(WriteBuffer & buf, UInt32 value) + { + value = Poco::ByteOrder::toLittleEndian(value); + writeBytes(buf, &value, sizeof(value)); + } + + void writeLittleEndian(WriteBuffer & buf, Int32 value) { writeLittleEndian(buf, static_cast(value)); } + + void writeLittleEndian(WriteBuffer & buf, float value) + { + union + { + Float32 f; + UInt32 i; + }; + f = value; + writeLittleEndian(buf, i); + } + + void writeLittleEndian(WriteBuffer & buf, UInt64 value) + { + value = Poco::ByteOrder::toLittleEndian(value); + writeBytes(buf, &value, sizeof(value)); + } + + void writeLittleEndian(WriteBuffer & buf, Int64 value) { writeLittleEndian(buf, static_cast(value)); } + + void writeLittleEndian(WriteBuffer & buf, double value) + { + union + { + Float64 f; + UInt64 i; + }; + f = value; + writeLittleEndian(buf, i); + } + + UInt32 zigZag(Int32 value) { return (static_cast(value) << 1) ^ static_cast(value >> 31); } + UInt64 zigZag(Int64 value) { return (static_cast(value) << 1) ^ static_cast(value >> 63); } + +} + + +enum ProtobufSimpleWriter::WireType : UInt32 +{ + VARIANT = 0, + BITS64 = 1, + LENGTH_DELIMITED = 2, + BITS32 = 5 +}; + +ProtobufSimpleWriter::ProtobufSimpleWriter(WriteBuffer & out_) : out(out_) +{ +} + +ProtobufSimpleWriter::~ProtobufSimpleWriter() +{ + finishCurrentMessage(); +} + +void ProtobufSimpleWriter::newMessage() +{ + finishCurrentMessage(); + were_messages = true; +} + +void ProtobufSimpleWriter::finishCurrentMessage() +{ + if (!were_messages) + return; + finishCurrentField(); + current_field_number = 0; + StringRef str = message_buffer.stringRef(); + writeVariant(out, str.size); + writeBytes(out, str.data, str.size); + message_buffer.restart(); +} + +void ProtobufSimpleWriter::setCurrentField(UInt32 field_number) +{ + finishCurrentField(); + assert(current_field_number < field_number); + current_field_number = field_number; + num_normal_values = 0; + num_packed_values = 0; +} + +void ProtobufSimpleWriter::finishCurrentField() +{ + if (num_packed_values) + { + assert(!num_normal_values); + StringRef str = repeated_packing_buffer.stringRef(); + if (str.size) + { + writeKey(message_buffer, LENGTH_DELIMITED); + writeVariant(message_buffer, str.size); + writeBytes(message_buffer, str.data, str.size); + repeated_packing_buffer.restart(); + } + } +} + +void ProtobufSimpleWriter::writeKey(WriteBuffer & buf, WireType wire_type) +{ + writeVariant(buf, (current_field_number << 3) | wire_type); +} + +void ProtobufSimpleWriter::writeInt32(Int32 value) +{ + assert(current_field_number); + writeKey(message_buffer, VARIANT); + writeVariant(message_buffer, value); + ++num_normal_values; +} + +void ProtobufSimpleWriter::writeUInt32(UInt32 value) +{ + assert(current_field_number); + writeKey(message_buffer, VARIANT); + writeVariant(message_buffer, value); + ++num_normal_values; +} + +void ProtobufSimpleWriter::writeSInt32(Int32 value) +{ + assert(current_field_number); + writeKey(message_buffer, VARIANT); + writeVariant(message_buffer, zigZag(value)); + ++num_normal_values; +} + +void ProtobufSimpleWriter::writeInt64(Int64 value) +{ + assert(current_field_number); + writeKey(message_buffer, VARIANT); + writeVariant(message_buffer, value); + ++num_normal_values; +} + +void ProtobufSimpleWriter::writeUInt64(UInt64 value) +{ + assert(current_field_number); + writeKey(message_buffer, VARIANT); + writeVariant(message_buffer, value); + ++num_normal_values; +} + +void ProtobufSimpleWriter::writeSInt64(Int64 value) +{ + assert(current_field_number); + writeKey(message_buffer, VARIANT); + writeVariant(message_buffer, zigZag(value)); + ++num_normal_values; +} + +void ProtobufSimpleWriter::writeFixed32(UInt32 value) +{ + assert(current_field_number); + writeKey(message_buffer, BITS32); + writeLittleEndian(message_buffer, value); + ++num_normal_values; +} + +void ProtobufSimpleWriter::writeSFixed32(Int32 value) +{ + assert(current_field_number); + writeKey(message_buffer, BITS32); + writeLittleEndian(message_buffer, value); + ++num_normal_values; +} + +void ProtobufSimpleWriter::writeFloat(float value) +{ + assert(current_field_number); + writeKey(message_buffer, BITS32); + writeLittleEndian(message_buffer, value); + ++num_normal_values; +} + +void ProtobufSimpleWriter::writeFixed64(UInt64 value) +{ + assert(current_field_number); + writeKey(message_buffer, BITS64); + writeLittleEndian(message_buffer, value); + ++num_normal_values; +} + +void ProtobufSimpleWriter::writeSFixed64(Int64 value) +{ + assert(current_field_number); + writeKey(message_buffer, BITS64); + writeLittleEndian(message_buffer, value); + ++num_normal_values; +} + +void ProtobufSimpleWriter::writeDouble(double value) +{ + assert(current_field_number); + writeKey(message_buffer, BITS64); + writeLittleEndian(message_buffer, value); + ++num_normal_values; +} + +void ProtobufSimpleWriter::writeString(const StringRef & str) +{ + assert(current_field_number); + ++num_normal_values; + writeKey(message_buffer, LENGTH_DELIMITED); + writeVariant(message_buffer, str.size); + writeBytes(message_buffer, str.data, str.size); +} + +void ProtobufSimpleWriter::writeInt32IfNonZero(Int32 value) +{ + if (value) + writeInt32(value); +} + +void ProtobufSimpleWriter::writeUInt32IfNonZero(UInt32 value) +{ + if (value) + writeUInt32(value); +} + +void ProtobufSimpleWriter::writeSInt32IfNonZero(Int32 value) +{ + if (value) + writeSInt32(value); +} + +void ProtobufSimpleWriter::writeInt64IfNonZero(Int64 value) +{ + if (value) + writeInt64(value); +} + +void ProtobufSimpleWriter::writeUInt64IfNonZero(UInt64 value) +{ + if (value) + writeUInt64(value); +} + +void ProtobufSimpleWriter::writeSInt64IfNonZero(Int64 value) +{ + if (value) + writeSInt64(value); +} + +void ProtobufSimpleWriter::writeFixed32IfNonZero(UInt32 value) +{ + if (value) + writeFixed32(value); +} + +void ProtobufSimpleWriter::writeSFixed32IfNonZero(Int32 value) +{ + if (value) + writeSFixed32(value); +} + +void ProtobufSimpleWriter::writeFloatIfNonZero(float value) +{ + if (value != 0) + writeFloat(value); +} + +void ProtobufSimpleWriter::writeFixed64IfNonZero(UInt64 value) +{ + if (value) + writeFixed64(value); +} + +void ProtobufSimpleWriter::writeSFixed64IfNonZero(Int64 value) +{ + if (value) + writeSFixed64(value); +} + +void ProtobufSimpleWriter::writeDoubleIfNonZero(double value) +{ + if (value != 0) + writeDouble(value); +} + +void ProtobufSimpleWriter::writeStringIfNotEmpty(const StringRef & str) +{ + if (str.size) + writeString(str); +} + +void ProtobufSimpleWriter::packRepeatedInt32(Int32 value) +{ + assert(current_field_number); + writeVariant(repeated_packing_buffer, value); + ++num_packed_values; +} + +void ProtobufSimpleWriter::packRepeatedUInt32(UInt32 value) +{ + assert(current_field_number); + writeVariant(repeated_packing_buffer, value); + ++num_packed_values; +} + +void ProtobufSimpleWriter::packRepeatedSInt32(Int32 value) +{ + assert(current_field_number); + writeVariant(repeated_packing_buffer, zigZag(value)); + ++num_packed_values; +} + +void ProtobufSimpleWriter::packRepeatedInt64(Int64 value) +{ + assert(current_field_number); + writeVariant(repeated_packing_buffer, value); + ++num_packed_values; +} + +void ProtobufSimpleWriter::packRepeatedUInt64(UInt64 value) +{ + assert(current_field_number); + writeVariant(repeated_packing_buffer, value); + ++num_packed_values; +} + +void ProtobufSimpleWriter::packRepeatedSInt64(Int64 value) +{ + assert(current_field_number); + writeVariant(repeated_packing_buffer, zigZag(value)); + ++num_packed_values; +} + +void ProtobufSimpleWriter::packRepeatedFixed32(UInt32 value) +{ + assert(current_field_number); + writeLittleEndian(repeated_packing_buffer, value); + ++num_packed_values; +} + +void ProtobufSimpleWriter::packRepeatedSFixed32(Int32 value) +{ + assert(current_field_number); + writeLittleEndian(repeated_packing_buffer, value); + ++num_packed_values; +} + +void ProtobufSimpleWriter::packRepeatedFloat(float value) +{ + assert(current_field_number); + writeLittleEndian(repeated_packing_buffer, value); + ++num_packed_values; +} + +void ProtobufSimpleWriter::packRepeatedFixed64(UInt64 value) +{ + assert(current_field_number); + writeLittleEndian(repeated_packing_buffer, value); + ++num_packed_values; +} + +void ProtobufSimpleWriter::packRepeatedSFixed64(Int64 value) +{ + assert(current_field_number); + writeLittleEndian(repeated_packing_buffer, value); + ++num_packed_values; +} + +void ProtobufSimpleWriter::packRepeatedDouble(double value) +{ + assert(current_field_number); + writeLittleEndian(repeated_packing_buffer, value); + ++num_packed_values; +} + +} diff --git a/dbms/src/Formats/ProtobufSimpleWriter.h b/dbms/src/Formats/ProtobufSimpleWriter.h new file mode 100644 index 00000000000..4880cfb4e21 --- /dev/null +++ b/dbms/src/Formats/ProtobufSimpleWriter.h @@ -0,0 +1,88 @@ +#pragma once + +#include +#include +#include "IO/WriteBufferFromString.h" + + +namespace DB +{ +/** Utility class to serialize protobufs. + * Knows nothing about protobuf schemas, just provides useful functions to serialize data. + * This class is written following the documentation: https://developers.google.com/protocol-buffers/docs/encoding + */ +class ProtobufSimpleWriter : private boost::noncopyable +{ +public: + ProtobufSimpleWriter(WriteBuffer & out_); + ~ProtobufSimpleWriter(); + + /// Should be called when we start writing a new message. + void newMessage(); + + /// Should be called when we start writing a new field. + /// A passed 'field_number' should be positive and greater than any previous 'field_number'. + void setCurrentField(UInt32 field_number); + UInt32 currentFieldNumber() const { return current_field_number; } + + /// Returns number of values added to the current field. + size_t numValues() const { return num_normal_values + num_packed_values; } + + void writeInt32(Int32 value); + void writeUInt32(UInt32 value); + void writeSInt32(Int32 value); + void writeInt64(Int64 value); + void writeUInt64(UInt64 value); + void writeSInt64(Int64 value); + void writeFixed32(UInt32 value); + void writeSFixed32(Int32 value); + void writeFloat(float value); + void writeFixed64(UInt64 value); + void writeSFixed64(Int64 value); + void writeDouble(double value); + void writeString(const StringRef & str); + + void writeInt32IfNonZero(Int32 value); + void writeUInt32IfNonZero(UInt32 value); + void writeSInt32IfNonZero(Int32 value); + void writeInt64IfNonZero(Int64 value); + void writeUInt64IfNonZero(UInt64 value); + void writeSInt64IfNonZero(Int64 value); + void writeFixed32IfNonZero(UInt32 value); + void writeSFixed32IfNonZero(Int32 value); + void writeFloatIfNonZero(float value); + void writeFixed64IfNonZero(UInt64 value); + void writeSFixed64IfNonZero(Int64 value); + void writeDoubleIfNonZero(double value); + void writeStringIfNotEmpty(const StringRef & str); + + void packRepeatedInt32(Int32 value); + void packRepeatedUInt32(UInt32 value); + void packRepeatedSInt32(Int32 value); + void packRepeatedInt64(Int64 value); + void packRepeatedUInt64(UInt64 value); + void packRepeatedSInt64(Int64 value); + void packRepeatedFixed32(UInt32 value); + void packRepeatedSFixed32(Int32 value); + void packRepeatedFloat(float value); + void packRepeatedFixed64(UInt64 value); + void packRepeatedSFixed64(Int64 value); + void packRepeatedDouble(double value); + +private: + void finishCurrentMessage(); + void finishCurrentField(); + + enum WireType : UInt32; + void writeKey(WriteBuffer & buf, WireType wire_type); + + WriteBuffer & out; + bool were_messages = false; + WriteBufferFromOwnString message_buffer; + UInt32 current_field_number = 0; + size_t num_normal_values = 0; + size_t num_packed_values = 0; + WriteBufferFromOwnString repeated_packing_buffer; +}; + +}