This commit is contained in:
Pavel Kruglov 2021-09-16 14:25:45 +03:00
parent 8d1bf1b675
commit 2b67d54d58
3 changed files with 108 additions and 7 deletions

View File

@ -7,15 +7,18 @@
#include <IO/ReadHelpers.h> #include <IO/ReadHelpers.h>
#include <DataTypes/DataTypeArray.h> #include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDateTime64.h> #include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeNullable.h> #include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <Columns/ColumnArray.h> #include <Columns/ColumnArray.h>
#include <Columns/ColumnFixedString.h> #include <Columns/ColumnFixedString.h>
#include <Columns/ColumnNullable.h> #include <Columns/ColumnNullable.h>
#include <Columns/ColumnString.h> #include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h> #include <Columns/ColumnsNumber.h>
#include <Columns/ColumnMap.h>
#include <Columns/ColumnLowCardinality.h>
namespace DB namespace DB
{ {
@ -50,6 +53,8 @@ void MsgPackVisitor::reset()
info_stack = {}; info_stack = {};
} }
void MsgPackVisitor::insert_integer(UInt64 value) // NOLINT void MsgPackVisitor::insert_integer(UInt64 value) // NOLINT
{ {
Info & info = info_stack.top(); Info & info = info_stack.top();
@ -102,8 +107,15 @@ void MsgPackVisitor::insert_integer(UInt64 value) // NOLINT
assert_cast<DataTypeDateTime64::ColumnType &>(info.column).insertValue(value); assert_cast<DataTypeDateTime64::ColumnType &>(info.column).insertValue(value);
break; break;
} }
case TypeIndex::LowCardinality:
{
WhichDataType which(info.type);
if (!which.isUInt() && !which.isInt() && !which.is)
assert_cast<ColumnLowCardinality &>(info.column).insert(Field(value));
break;
}
default: default:
throw Exception("Type " + info.type->getName() + " is not supported for MsgPack input format", ErrorCodes::ILLEGAL_COLUMN); throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Cannot insert MessagePack integer into column with type {}.", info_stack.top().type->getName());
} }
} }
@ -121,24 +133,34 @@ bool MsgPackVisitor::visit_negative_integer(Int64 value) // NOLINT
bool MsgPackVisitor::visit_str(const char* value, size_t size) // NOLINT bool MsgPackVisitor::visit_str(const char* value, size_t size) // NOLINT
{ {
if (!isStinfo_stack.top().type)
info_stack.top().column.insertData(value, size); info_stack.top().column.insertData(value, size);
return true; return true;
} }
bool MsgPackVisitor::visit_float32(Float32 value) // NOLINT bool MsgPackVisitor::visit_float32(Float32 value) // NOLINT
{ {
if (!WhichDataType(info_stack.top().type).isFloat32())
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Cannot insert MessagePack float32 into column with type {}.", info_stack.top().type->getName());
assert_cast<ColumnFloat32 &>(info_stack.top().column).insertValue(value); assert_cast<ColumnFloat32 &>(info_stack.top().column).insertValue(value);
return true; return true;
} }
bool MsgPackVisitor::visit_float64(Float64 value) // NOLINT bool MsgPackVisitor::visit_float64(Float64 value) // NOLINT
{ {
if (!WhichDataType(info_stack.top().type).isFloat64())
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Cannot insert MessagePack float32 into column with type {}.", info_stack.top().type->getName());
assert_cast<ColumnFloat64 &>(info_stack.top().column).insertValue(value); assert_cast<ColumnFloat64 &>(info_stack.top().column).insertValue(value);
return true; return true;
} }
bool MsgPackVisitor::start_array(size_t size) // NOLINT bool MsgPackVisitor::start_array(size_t size) // NOLINT
{ {
if (!isArray(info_stack.top().type))
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Cannot insert MessagePack array into column with type {}.", info_stack.top().type->getName());
auto nested_type = assert_cast<const DataTypeArray &>(*info_stack.top().type).getNestedType(); auto nested_type = assert_cast<const DataTypeArray &>(*info_stack.top().type).getNestedType();
ColumnArray & column_array = assert_cast<ColumnArray &>(info_stack.top().column); ColumnArray & column_array = assert_cast<ColumnArray &>(info_stack.top().column);
ColumnArray::Offsets & offsets = column_array.getOffsets(); ColumnArray::Offsets & offsets = column_array.getOffsets();
@ -154,6 +176,49 @@ bool MsgPackVisitor::end_array() // NOLINT
return true; return true;
} }
bool MsgPackVisitor::start_map(uint32_t size) // NOLINT
{
if (!isMap(info_stack.top().type))
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Cannot insert MessagePack map into column with type {}.", info_stack.top().type->getName());
ColumnArray & column_array = assert_cast<ColumnMap &>(info_stack.top().column).getNestedColumn();
ColumnArray::Offsets & offsets = column_array.getOffsets();
offsets.push_back(offsets.back() + size);
return true;
}
bool MsgPackVisitor::start_map_key() // NOLINT
{
auto key_column = assert_cast<ColumnMap &>(info_stack.top().column).getNestedData().getColumns()[0];
auto key_type = assert_cast<DataTypeMap &>(*info_stack.top().type).getKeyType();
info_stack.push(Info{*key_column, key_type});
return true;
}
bool MsgPackVisitor::end_map_key() // NOLINT
{
info_stack.pop();
return true;
}
bool MsgPackVisitor::start_map_value() // NOLINT
{
auto value_column = assert_cast<ColumnMap &>(info_stack.top().column).getNestedData().getColumns()[1];
auto value_type = assert_cast<DataTypeMap &>(*info_stack.top().type).getValueType();
info_stack.push(Info{*value_column, value_type});
return true;
}
bool MsgPackVisitor::end_map_value() // NOLINT
{
info_stack.pop();
return true;
}
bool MsgPackVisitor::visit_nil()
{
}
void MsgPackVisitor::parse_error(size_t, size_t) // NOLINT void MsgPackVisitor::parse_error(size_t, size_t) // NOLINT
{ {
throw Exception("Error occurred while parsing msgpack data.", ErrorCodes::INCORRECT_DATA); throw Exception("Error occurred while parsing msgpack data.", ErrorCodes::INCORRECT_DATA);

View File

@ -34,8 +34,15 @@ public:
bool visit_float32(Float32 value); bool visit_float32(Float32 value);
bool visit_float64(Float64 value); bool visit_float64(Float64 value);
bool visit_str(const char* value, size_t size); bool visit_str(const char* value, size_t size);
bool visit_bin(const char* value, size_t size);
bool start_array(size_t size); bool start_array(size_t size);
bool end_array(); bool end_array();
bool visit_nil();
bool start_map(uint32_t size);
bool start_map_key();
bool end_map_key();
bool start_map_value();
bool end_map_value();
/// This function will be called if error occurs in parsing /// This function will be called if error occurs in parsing
[[noreturn]] void parse_error(size_t parsed_offset, size_t error_offset); [[noreturn]] void parse_error(size_t parsed_offset, size_t error_offset);

View File

@ -6,15 +6,18 @@
#include <Common/assert_cast.h> #include <Common/assert_cast.h>
#include <DataTypes/DataTypeArray.h> #include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDateTime64.h> #include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeNullable.h> #include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <Columns/ColumnArray.h> #include <Columns/ColumnArray.h>
#include <Columns/ColumnFixedString.h> #include <Columns/ColumnFixedString.h>
#include <Columns/ColumnNullable.h> #include <Columns/ColumnNullable.h>
#include <Columns/ColumnString.h> #include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h> #include <Columns/ColumnsNumber.h>
#include <Columns/ColumnMap.h>
#include <Columns/ColumnLowCardinality.h>
namespace DB namespace DB
{ {
@ -91,15 +94,15 @@ void MsgPackRowOutputFormat::serializeField(const IColumn & column, DataTypePtr
case TypeIndex::String: case TypeIndex::String:
{ {
const StringRef & string = assert_cast<const ColumnString &>(column).getDataAt(row_num); const StringRef & string = assert_cast<const ColumnString &>(column).getDataAt(row_num);
packer.pack_str(string.size); packer.pack_bin(string.size);
packer.pack_str_body(string.data, string.size); packer.pack_bin_body(string.data, string.size);
return; return;
} }
case TypeIndex::FixedString: case TypeIndex::FixedString:
{ {
const StringRef & string = assert_cast<const ColumnFixedString &>(column).getDataAt(row_num); const StringRef & string = assert_cast<const ColumnFixedString &>(column).getDataAt(row_num);
packer.pack_str(string.size); packer.pack_bin(string.size);
packer.pack_str_body(string.data, string.size); packer.pack_bin_body(string.data, string.size);
return; return;
} }
case TypeIndex::Array: case TypeIndex::Array:
@ -132,6 +135,32 @@ void MsgPackRowOutputFormat::serializeField(const IColumn & column, DataTypePtr
packer.pack_nil(); packer.pack_nil();
return; return;
} }
case TypeIndex::Map:
{
const auto & map_column = assert_cast<const ColumnMap &>(column);
const auto & nested_column = map_column.getNestedColumn();
const auto & key_column = map_column.getNestedData().getColumns()[0];
const auto & value_column = map_column.getNestedData().getColumns()[1];
const auto & map_type = assert_cast<const DataTypeMap &>(data_type);
const auto & offsets = nested_column.getOffsets();
size_t offset = offsets[row_num - 1];
size_t size = offsets[row_num] - offset;
packer.pack_map(size);
for (size_t i = 0; i < size; ++i)
{
serializeField(*key_column, map_type.getKeyType(), row_num);
serializeField(*value_column, map_type.getValueType(), row_num);
}
}
case TypeIndex::LowCardinality:
{
const auto & lc_column = assert_cast<const ColumnLowCardinality &>(column);
auto dict_type = assert_cast<const DataTypeLowCardinality *>(data_type.get())->getDictionaryType();
auto dict_column = lc_column.getDictionaryPtr();
size_t index = lc_column.getIndexAt(row_num);
serializeField(*dict_column, dict_type, index);
}
default: default:
break; break;
} }