#include #include #include #include #include #include #include #include #include #include #include #include #include namespace DB { namespace ErrorCodes { extern const int ILLEGAL_COLUMN; extern const int INCORRECT_DATA; } MsgPackRowInputFormat::MsgPackRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_) : IRowInputFormat(header_, in_, std::move(params_)), buf(in_), data_types(header_.getDataTypes()) {} bool MsgPackRowInputFormat::readObject() { if (buf.eof()) return false; PeekableReadBufferCheckpoint checkpoint{buf}; size_t offset = 0; bool need_more_data = true; while (need_more_data) { offset = 0; try { object_handle = msgpack::unpack(buf.position(), buf.buffer().end() - buf.position(), offset); need_more_data = false; } catch (msgpack::insufficient_bytes &) { buf.position() = buf.buffer().end(); if (buf.eof()) throw Exception("Unexpected end of file while parsing msgpack object.", ErrorCodes::INCORRECT_DATA); buf.position() = buf.buffer().end(); buf.makeContinuousMemoryFromCheckpointToPos(); buf.rollbackToCheckpoint(); } } buf.position() += offset; return true; } void MsgPackRowInputFormat::insertObject(IColumn & column, DataTypePtr data_type, const msgpack::object & object) { switch (data_type->getTypeId()) { case TypeIndex::UInt8: { assert_cast(column).insertValue(object.as()); return; } case TypeIndex::Date: [[fallthrough]]; case TypeIndex::UInt16: { assert_cast(column).insertValue(object.as()); return; } case TypeIndex::DateTime: [[fallthrough]]; case TypeIndex::UInt32: { assert_cast(column).insertValue(object.as()); return; } case TypeIndex::UInt64: { assert_cast(column).insertValue(object.as()); return; } case TypeIndex::Int8: { assert_cast(column).insertValue(object.as()); return; } case TypeIndex::Int16: { assert_cast(column).insertValue(object.as()); return; } case TypeIndex::Int32: { assert_cast(column).insertValue(object.as()); return; } case TypeIndex::Int64: { assert_cast(column).insertValue(object.as()); return; } case TypeIndex::Float32: { assert_cast(column).insertValue(object.as()); return; } case TypeIndex::Float64: { assert_cast(column).insertValue(object.as()); return; } case TypeIndex::DateTime64: { assert_cast(column).insertValue(object.as()); return; } case TypeIndex::FixedString: [[fallthrough]]; case TypeIndex::String: { String str = object.as(); column.insertData(str.data(), str.size()); return; } case TypeIndex::Array: { msgpack::object_array object_array = object.via.array; auto nested_type = assert_cast(*data_type).getNestedType(); ColumnArray & column_array = assert_cast(column); ColumnArray::Offsets & offsets = column_array.getOffsets(); IColumn & nested_column = column_array.getData(); for (size_t i = 0; i != object_array.size; ++i) { insertObject(nested_column, nested_type, object_array.ptr[i]); } offsets.push_back(offsets.back() + object_array.size); return; } case TypeIndex::Nullable: { auto nested_type = removeNullable(data_type); ColumnNullable & column_nullable = assert_cast(column); if (object.type == msgpack::type::NIL) column_nullable.insertDefault(); else insertObject(column_nullable.getNestedColumn(), nested_type, object); return; } case TypeIndex::Nothing: { // Nothing to insert, MsgPack object is nil. return; } default: break; } throw Exception("Type " + data_type->getName() + " is not supported for MsgPack input format", ErrorCodes::ILLEGAL_COLUMN); } bool MsgPackRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &) { size_t column_index = 0; bool has_more_data = true; for (; column_index != columns.size(); ++column_index) { has_more_data = readObject(); if (!has_more_data) break; insertObject(*columns[column_index], data_types[column_index], object_handle.get()); } if (!has_more_data) { if (column_index != 0) throw Exception("Not enough values to complete the row.", ErrorCodes::INCORRECT_DATA); return false; } return true; } void registerInputFormatProcessorMsgPack(FormatFactory & factory) { factory.registerInputFormatProcessor("MsgPack", []( ReadBuffer & buf, const Block & sample, const RowInputFormatParams & params, const FormatSettings &) { return std::make_shared(sample, buf, params); }); } }