diff --git a/src/Processors/Formats/Impl/MsgPackRowInputFormat.cpp b/src/Processors/Formats/Impl/MsgPackRowInputFormat.cpp index b7da335f0c5..3e112fb1ce6 100644 --- a/src/Processors/Formats/Impl/MsgPackRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/MsgPackRowInputFormat.cpp @@ -24,14 +24,124 @@ namespace ErrorCodes } MsgPackRowInputFormat::MsgPackRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_) - : IRowInputFormat(header_, in_, std::move(params_)), buf(in), ctx(&reference_func, nullptr, msgpack::unpack_limit()), data_types(header_.getDataTypes()) {} + : IRowInputFormat(header_, in_, std::move(params_)), buf(in), parser(visitor), data_types(header_.getDataTypes()) {} -int MsgPackRowInputFormat::unpack(msgpack::zone & zone, size_t & offset) +void MsgPackVisitor::set_info(IColumn & column, DataTypePtr type) // NOLINT { - offset = 0; - ctx.init(); - ctx.user().set_zone(zone); - return ctx.execute(buf.position(), buf.buffer().end() - buf.position(), offset); + while (!info_stack.empty()) + { + info_stack.pop(); + } + info_stack.push(Info{column, type}); +} + +void MsgPackVisitor::insert_integer(UInt64 value) // NOLINT +{ + Info & info = info_stack.top(); + switch (info.type->getTypeId()) + { + case TypeIndex::UInt8: + { + assert_cast(info.column).insertValue(value); + break; + } + case TypeIndex::Date: [[fallthrough]]; + case TypeIndex::UInt16: + { + assert_cast(info.column).insertValue(value); + break; + } + case TypeIndex::DateTime: [[fallthrough]]; + case TypeIndex::UInt32: + { + assert_cast(info.column).insertValue(value); + break; + } + case TypeIndex::UInt64: + { + assert_cast(info.column).insertValue(value); + break; + } + case TypeIndex::Int8: + { + assert_cast(info.column).insertValue(value); + break; + } + case TypeIndex::Int16: + { + assert_cast(info.column).insertValue(value); + break; + } + case TypeIndex::Int32: + { + assert_cast(info.column).insertValue(value); + break; + } + case TypeIndex::Int64: + { + assert_cast(info.column).insertValue(value); + break; + } + case TypeIndex::DateTime64: + { + assert_cast(info.column).insertValue(value); + break; + } + default: + throw Exception("Type " + info.type->getName() + " is not supported for MsgPack input format", ErrorCodes::ILLEGAL_COLUMN); + } +} + +bool MsgPackVisitor::visit_positive_integer(UInt64 value) // NOLINT +{ + insert_integer(value); + return true; +} + +bool MsgPackVisitor::visit_negative_integer(Int64 value) // NOLINT +{ + insert_integer(value); + return true; +} + +bool MsgPackVisitor::visit_str(const char* value, size_t size) // NOLINT +{ + info_stack.top().column.insertData(value, size); + return true; +} + +bool MsgPackVisitor::visit_float32(Float32 value) // NOLINT +{ + assert_cast(info_stack.top().column).insertValue(value); + return true; +} + +bool MsgPackVisitor::visit_float64(Float64 value) // NOLINT +{ + assert_cast(info_stack.top().column).insertValue(value); + return true; +} + +bool MsgPackVisitor::start_array(size_t size) // NOLINT +{ + auto nested_type = assert_cast(*info_stack.top().type).getNestedType(); + ColumnArray & column_array = assert_cast(info_stack.top().column); + ColumnArray::Offsets & offsets = column_array.getOffsets(); + IColumn & nested_column = column_array.getData(); + offsets.push_back(offsets.back() + size); + info_stack.push(Info{nested_column, nested_type}); + return true; +} + +bool MsgPackVisitor::end_array() // NOLINT +{ + info_stack.pop(); + return true; +} + +void MsgPackVisitor::parse_error(size_t, size_t) // NOLINT +{ + throw Exception("Error occurred while parsing msgpack data.", ErrorCodes::INCORRECT_DATA); } bool MsgPackRowInputFormat::readObject() @@ -40,9 +150,8 @@ bool MsgPackRowInputFormat::readObject() return false; PeekableReadBufferCheckpoint checkpoint{buf}; - std::unique_ptr zone(new msgpack::zone); - size_t offset; - while (!unpack(*zone, offset)) + size_t offset = 0; + while (!parser.execute(buf.position(), buf.available(), offset)) { buf.position() = buf.buffer().end(); if (buf.eof()) @@ -52,123 +161,19 @@ bool MsgPackRowInputFormat::readObject() buf.rollbackToCheckpoint(); } buf.position() += offset; - object_handle = msgpack::object_handle(ctx.data(), std::move(zone)); return true; } -void MsgPackRowInputFormat::insertObject(IColumn & column, DataTypePtr data_type, const msgpack::object & object) -{ - switch (data_type->getTypeId()) - { - case TypeIndex::UInt8: - { - assert_cast(column).insertValue(object.as()); - return; - } - case TypeIndex::Date: [[fallthrough]]; - case TypeIndex::UInt16: - { - assert_cast(column).insertValue(object.as()); - return; - } - case TypeIndex::DateTime: [[fallthrough]]; - case TypeIndex::UInt32: - { - assert_cast(column).insertValue(object.as()); - return; - } - case TypeIndex::UInt64: - { - assert_cast(column).insertValue(object.as()); - return; - } - case TypeIndex::Int8: - { - assert_cast(column).insertValue(object.as()); - return; - } - case TypeIndex::Int16: - { - assert_cast(column).insertValue(object.as()); - return; - } - case TypeIndex::Int32: - { - assert_cast(column).insertValue(object.as()); - return; - } - case TypeIndex::Int64: - { - assert_cast(column).insertValue(object.as()); - return; - } - case TypeIndex::Float32: - { - assert_cast(column).insertValue(object.as()); - return; - } - case TypeIndex::Float64: - { - assert_cast(column).insertValue(object.as()); - return; - } - case TypeIndex::DateTime64: - { - assert_cast(column).insertValue(object.as()); - return; - } - case TypeIndex::FixedString: [[fallthrough]]; - case TypeIndex::String: - { - msgpack::object_str obj_str = object.via.str; - column.insertData(obj_str.ptr, obj_str.size); - return; - } - case TypeIndex::Array: - { - msgpack::object_array object_array = object.via.array; - auto nested_type = assert_cast(*data_type).getNestedType(); - ColumnArray & column_array = assert_cast(column); - ColumnArray::Offsets & offsets = column_array.getOffsets(); - IColumn & nested_column = column_array.getData(); - for (size_t i = 0; i != object_array.size; ++i) - { - insertObject(nested_column, nested_type, object_array.ptr[i]); - } - offsets.push_back(offsets.back() + object_array.size); - return; - } - case TypeIndex::Nullable: - { - auto nested_type = removeNullable(data_type); - ColumnNullable & column_nullable = assert_cast(column); - if (object.type == msgpack::type::NIL) - column_nullable.insertDefault(); - else - insertObject(column_nullable.getNestedColumn(), nested_type, object); - return; - } - case TypeIndex::Nothing: - { - // Nothing to insert, MsgPack object is nil. - return; - } - default: - break; - } - throw Exception("Type " + data_type->getName() + " is not supported for MsgPack input format", ErrorCodes::ILLEGAL_COLUMN); -} - bool MsgPackRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &) { size_t column_index = 0; bool has_more_data = true; for (; column_index != columns.size(); ++column_index) { + visitor.set_info(*columns[column_index], data_types[column_index]); has_more_data = readObject(); if (!has_more_data) break; - insertObject(*columns[column_index], data_types[column_index], object_handle.get()); } if (!has_more_data) { diff --git a/src/Processors/Formats/Impl/MsgPackRowInputFormat.h b/src/Processors/Formats/Impl/MsgPackRowInputFormat.h index a426dc4950c..92e4f5d0bd7 100644 --- a/src/Processors/Formats/Impl/MsgPackRowInputFormat.h +++ b/src/Processors/Formats/Impl/MsgPackRowInputFormat.h @@ -4,12 +4,44 @@ #include #include #include +#include namespace DB { class ReadBuffer; +class MsgPackVisitor : public msgpack::null_visitor +{ +public: + struct Info + { + IColumn & column; + DataTypePtr type; + }; + + /// These functions are called when parser meets corresponding object in parsed data + bool visit_positive_integer(UInt64 value); + bool visit_negative_integer(Int64 value); + bool visit_float32(Float32 value); + bool visit_float64(Float64 value); + bool visit_str(const char* value, size_t size); + bool start_array(size_t size); + bool end_array(); + + /// This function will be called if error occurs in parsing + [[noreturn]] void parse_error(size_t parsed_offset, size_t error_offset); + + /// Update info_stack + void set_info(IColumn & column, DataTypePtr type); + + void insert_integer(UInt64 value); + +private: + /// Stack is needed to process nested arrays + std::stack info_stack; +}; + class MsgPackRowInputFormat : public IRowInputFormat { public: @@ -19,15 +51,10 @@ public: String getName() const override { return "MagPackRowInputFormat"; } private: bool readObject(); - void insertObject(IColumn & column, DataTypePtr type, const msgpack::object & object); - int unpack(msgpack::zone & zone, size_t & offset); - - // msgpack makes a copy of object by default, this function tells unpacker not to copy. - static bool reference_func(msgpack::type::object_type, size_t, void *) { return true; } PeekableReadBuffer buf; - msgpack::object_handle object_handle; - msgpack::v1::detail::context ctx; + MsgPackVisitor visitor; + msgpack::detail::parse_helper parser; DataTypes data_types; }; diff --git a/tests/performance/select_format.xml b/tests/performance/select_format.xml index 2bdbde83c2d..e47d981c4d7 100644 --- a/tests/performance/select_format.xml +++ b/tests/performance/select_format.xml @@ -44,7 +44,7 @@ ODBCDriver2 MySQLWire Avro - + MsgPack