Improve MsgPackRowInputFormat

This commit is contained in:
Avogar 2020-04-17 12:35:38 +03:00
parent 23ae3f4bff
commit b056dbce1c
3 changed files with 153 additions and 122 deletions

View File

@ -24,14 +24,124 @@ namespace ErrorCodes
}
MsgPackRowInputFormat::MsgPackRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_)
: IRowInputFormat(header_, in_, std::move(params_)), buf(in), ctx(&reference_func, nullptr, msgpack::unpack_limit()), data_types(header_.getDataTypes()) {}
: IRowInputFormat(header_, in_, std::move(params_)), buf(in), parser(visitor), data_types(header_.getDataTypes()) {}
int MsgPackRowInputFormat::unpack(msgpack::zone & zone, size_t & offset)
void MsgPackVisitor::set_info(IColumn & column, DataTypePtr type)
{
offset = 0;
ctx.init();
ctx.user().set_zone(zone);
return ctx.execute(buf.position(), buf.buffer().end() - buf.position(), offset);
while (!info_stack.empty())
{
info_stack.pop();
}
info_stack.push(Info{column, type});
}
void MsgPackVisitor::insert_integer(UInt64 value)
{
Info & info = info_stack.top();
switch (info.type->getTypeId())
{
case TypeIndex::UInt8:
{
assert_cast<ColumnUInt8 &>(info.column).insertValue(value);
break;
}
case TypeIndex::Date: [[fallthrough]];
case TypeIndex::UInt16:
{
assert_cast<ColumnUInt16 &>(info.column).insertValue(value);
break;
}
case TypeIndex::DateTime: [[fallthrough]];
case TypeIndex::UInt32:
{
assert_cast<ColumnUInt32 &>(info.column).insertValue(value);
break;
}
case TypeIndex::UInt64:
{
assert_cast<ColumnUInt64 &>(info.column).insertValue(value);
break;
}
case TypeIndex::Int8:
{
assert_cast<ColumnInt8 &>(info.column).insertValue(value);
break;
}
case TypeIndex::Int16:
{
assert_cast<ColumnInt16 &>(info.column).insertValue(value);
break;
}
case TypeIndex::Int32:
{
assert_cast<ColumnInt32 &>(info.column).insertValue(value);
break;
}
case TypeIndex::Int64:
{
assert_cast<ColumnInt64 &>(info.column).insertValue(value);
break;
}
case TypeIndex::DateTime64:
{
assert_cast<DataTypeDateTime64::ColumnType &>(info.column).insertValue(value);
break;
}
default:
throw Exception("Type " + info.type->getName() + " is not supported for MsgPack input format", ErrorCodes::ILLEGAL_COLUMN);
}
}
bool MsgPackVisitor::visit_positive_integer(UInt64 value)
{
insert_integer(value);
return true;
}
bool MsgPackVisitor::visit_negative_integer(Int64 value)
{
insert_integer(value);
return true;
}
bool MsgPackVisitor::visit_str(const char* value, size_t size)
{
info_stack.top().column.insertData(value, size);
return true;
}
bool MsgPackVisitor::visit_float32(Float32 value)
{
assert_cast<ColumnFloat32 &>(info_stack.top().column).insertValue(value);
return true;
}
bool MsgPackVisitor::visit_float64(Float64 value)
{
assert_cast<ColumnFloat64 &>(info_stack.top().column).insertValue(value);
return true;
}
bool MsgPackVisitor::start_array(size_t size)
{
auto nested_type = assert_cast<const DataTypeArray &>(*info_stack.top().type).getNestedType();
ColumnArray & column_array = assert_cast<ColumnArray &>(info_stack.top().column);
ColumnArray::Offsets & offsets = column_array.getOffsets();
IColumn & nested_column = column_array.getData();
offsets.push_back(offsets.back() + size);
info_stack.push(Info{nested_column, nested_type});
return true;
}
bool MsgPackVisitor::end_array()
{
info_stack.pop();
return true;
}
void MsgPackVisitor::parse_error(size_t, size_t)
{
throw Exception("Error occurred while parsing msgpack data.", ErrorCodes::INCORRECT_DATA);
}
bool MsgPackRowInputFormat::readObject()
@ -40,9 +150,8 @@ bool MsgPackRowInputFormat::readObject()
return false;
PeekableReadBufferCheckpoint checkpoint{buf};
std::unique_ptr<msgpack::zone> zone(new msgpack::zone);
size_t offset;
while (!unpack(*zone, offset))
size_t offset = 0;
while (!parser.execute(buf.position(), buf.available(), offset))
{
buf.position() = buf.buffer().end();
if (buf.eof())
@ -52,123 +161,19 @@ bool MsgPackRowInputFormat::readObject()
buf.rollbackToCheckpoint();
}
buf.position() += offset;
object_handle = msgpack::object_handle(ctx.data(), std::move(zone));
return true;
}
void MsgPackRowInputFormat::insertObject(IColumn & column, DataTypePtr data_type, const msgpack::object & object)
{
switch (data_type->getTypeId())
{
case TypeIndex::UInt8:
{
assert_cast<ColumnUInt8 &>(column).insertValue(object.as<uint8_t>());
return;
}
case TypeIndex::Date: [[fallthrough]];
case TypeIndex::UInt16:
{
assert_cast<ColumnUInt16 &>(column).insertValue(object.as<UInt16>());
return;
}
case TypeIndex::DateTime: [[fallthrough]];
case TypeIndex::UInt32:
{
assert_cast<ColumnUInt32 &>(column).insertValue(object.as<UInt32>());
return;
}
case TypeIndex::UInt64:
{
assert_cast<ColumnUInt64 &>(column).insertValue(object.as<UInt64>());
return;
}
case TypeIndex::Int8:
{
assert_cast<ColumnInt8 &>(column).insertValue(object.as<Int8>());
return;
}
case TypeIndex::Int16:
{
assert_cast<ColumnInt16 &>(column).insertValue(object.as<Int16>());
return;
}
case TypeIndex::Int32:
{
assert_cast<ColumnInt32 &>(column).insertValue(object.as<Int32>());
return;
}
case TypeIndex::Int64:
{
assert_cast<ColumnInt64 &>(column).insertValue(object.as<Int64>());
return;
}
case TypeIndex::Float32:
{
assert_cast<ColumnFloat32 &>(column).insertValue(object.as<Float32>());
return;
}
case TypeIndex::Float64:
{
assert_cast<ColumnFloat64 &>(column).insertValue(object.as<Float64>());
return;
}
case TypeIndex::DateTime64:
{
assert_cast<DataTypeDateTime64::ColumnType &>(column).insertValue(object.as<UInt64>());
return;
}
case TypeIndex::FixedString: [[fallthrough]];
case TypeIndex::String:
{
msgpack::object_str obj_str = object.via.str;
column.insertData(obj_str.ptr, obj_str.size);
return;
}
case TypeIndex::Array:
{
msgpack::object_array object_array = object.via.array;
auto nested_type = assert_cast<const DataTypeArray &>(*data_type).getNestedType();
ColumnArray & column_array = assert_cast<ColumnArray &>(column);
ColumnArray::Offsets & offsets = column_array.getOffsets();
IColumn & nested_column = column_array.getData();
for (size_t i = 0; i != object_array.size; ++i)
{
insertObject(nested_column, nested_type, object_array.ptr[i]);
}
offsets.push_back(offsets.back() + object_array.size);
return;
}
case TypeIndex::Nullable:
{
auto nested_type = removeNullable(data_type);
ColumnNullable & column_nullable = assert_cast<ColumnNullable &>(column);
if (object.type == msgpack::type::NIL)
column_nullable.insertDefault();
else
insertObject(column_nullable.getNestedColumn(), nested_type, object);
return;
}
case TypeIndex::Nothing:
{
// Nothing to insert, MsgPack object is nil.
return;
}
default:
break;
}
throw Exception("Type " + data_type->getName() + " is not supported for MsgPack input format", ErrorCodes::ILLEGAL_COLUMN);
}
bool MsgPackRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &)
{
size_t column_index = 0;
bool has_more_data = true;
for (; column_index != columns.size(); ++column_index)
{
visitor.set_info(*columns[column_index], data_types[column_index]);
has_more_data = readObject();
if (!has_more_data)
break;
insertObject(*columns[column_index], data_types[column_index], object_handle.get());
}
if (!has_more_data)
{

View File

@ -4,12 +4,43 @@
#include <Formats/FormatFactory.h>
#include <IO/PeekableReadBuffer.h>
#include <msgpack.hpp>
#include <stack>
namespace DB
{
class ReadBuffer;
class MsgPackVisitor : public msgpack::null_visitor {
public:
struct Info
{
IColumn & column;
DataTypePtr type;
};
/// These functions are called when parser meets corresponding object in parsed data
bool visit_positive_integer(UInt64 value);
bool visit_negative_integer(Int64 value);
bool visit_float32(Float32 value);
bool visit_float64(Float64 value);
bool visit_str(const char* value, size_t size);
bool start_array(size_t size);
bool end_array();
/// This function will be called if error occurs in parsing
void parse_error(size_t parsed_offset, size_t error_offset);
/// Update info_stack
void set_info(IColumn & column, DataTypePtr type);
void insert_integer(UInt64 value);
private:
/// Stack is needed to process nested arrays
std::stack<Info> info_stack;
};
class MsgPackRowInputFormat : public IRowInputFormat
{
public:
@ -19,15 +50,10 @@ public:
String getName() const override { return "MagPackRowInputFormat"; }
private:
bool readObject();
void insertObject(IColumn & column, DataTypePtr type, const msgpack::object & object);
int unpack(msgpack::zone & zone, size_t & offset);
// msgpack makes a copy of object by default, this function tells unpacker not to copy.
static bool reference_func(msgpack::type::object_type, size_t, void *) { return true; }
PeekableReadBuffer buf;
msgpack::object_handle object_handle;
msgpack::v1::detail::context ctx;
MsgPackVisitor visitor;
msgpack::detail::parse_helper<MsgPackVisitor> parser;
DataTypes data_types;
};

View File

@ -44,7 +44,7 @@
<value>ODBCDriver2</value>
<value>MySQLWire</value>
<value>Avro</value>
<!-- <value>MsgPack</value> Does not work in performance test for unknown reason. -->
<value>MsgPack</value>
</values>
</substitution>
</substitutions>