Improve MsgPack input/output format

This commit is contained in:
Pavel Kruglov 2021-09-16 17:05:07 +03:00
parent 2b67d54d58
commit 96dba14213
5 changed files with 191 additions and 47 deletions

View File

@ -13,7 +13,6 @@
#include <DataTypes/DataTypeLowCardinality.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
@ -53,106 +52,199 @@ void MsgPackVisitor::reset()
info_stack = {};
}
void MsgPackVisitor::insert_integer(UInt64 value) // NOLINT
template <typename InsertFunc>
static bool checkAndInsertNullable(IColumn & column, DataTypePtr type, InsertFunc insert_func)
{
Info & info = info_stack.top();
switch (info.type->getTypeId())
if (type->isNullable())
{
auto & nullable_column = assert_cast<ColumnNullable &>(column);
auto & nested_column = nullable_column.getNestedColumn();
auto & nested_type = assert_cast<const DataTypeNullable *>(type.get())->getNestedType();
insert_func(nested_column, nested_type);
nullable_column.getNullMapColumn().insertValue(0);
return true;
}
return false;
}
template <typename InsertFunc>
static bool checkAndInsertLowCardinality(IColumn & column, DataTypePtr type, InsertFunc insert_func)
{
if (type->getTypeId() == TypeIndex::LowCardinality)
{
auto & lc_column = assert_cast<ColumnLowCardinality &>(column);
auto tmp_column = lc_column.getDictionary().getNestedColumn()->cloneEmpty();
auto dict_type = assert_cast<const DataTypeLowCardinality *>(type.get())->getDictionaryType();
insert_func(*tmp_column, dict_type);
lc_column.insertFromFullColumn(*tmp_column, 0);
return true;
}
return false;
}
static void insertInteger(IColumn & column, DataTypePtr type, UInt64 value)
{
auto insert_func = [&](IColumn & column_, DataTypePtr type_)
{
insertInteger(column_, type_, value);
};
if (checkAndInsertNullable(column, type, insert_func) || checkAndInsertLowCardinality(column, type, insert_func))
return;
switch (type->getTypeId())
{
case TypeIndex::UInt8:
{
assert_cast<ColumnUInt8 &>(info.column).insertValue(value);
assert_cast<ColumnUInt8 &>(column).insertValue(value);
break;
}
case TypeIndex::Date: [[fallthrough]];
case TypeIndex::UInt16:
{
assert_cast<ColumnUInt16 &>(info.column).insertValue(value);
assert_cast<ColumnUInt16 &>(column).insertValue(value);
break;
}
case TypeIndex::DateTime: [[fallthrough]];
case TypeIndex::UInt32:
{
assert_cast<ColumnUInt32 &>(info.column).insertValue(value);
assert_cast<ColumnUInt32 &>(column).insertValue(value);
break;
}
case TypeIndex::UInt64:
{
assert_cast<ColumnUInt64 &>(info.column).insertValue(value);
assert_cast<ColumnUInt64 &>(column).insertValue(value);
break;
}
case TypeIndex::Int8:
{
assert_cast<ColumnInt8 &>(info.column).insertValue(value);
assert_cast<ColumnInt8 &>(column).insertValue(value);
break;
}
case TypeIndex::Int16:
{
assert_cast<ColumnInt16 &>(info.column).insertValue(value);
assert_cast<ColumnInt16 &>(column).insertValue(value);
break;
}
case TypeIndex::Int32:
{
assert_cast<ColumnInt32 &>(info.column).insertValue(value);
assert_cast<ColumnInt32 &>(column).insertValue(value);
break;
}
case TypeIndex::Int64:
{
assert_cast<ColumnInt64 &>(info.column).insertValue(value);
assert_cast<ColumnInt64 &>(column).insertValue(value);
break;
}
case TypeIndex::DateTime64:
{
assert_cast<DataTypeDateTime64::ColumnType &>(info.column).insertValue(value);
break;
}
case TypeIndex::LowCardinality:
{
WhichDataType which(info.type);
if (!which.isUInt() && !which.isInt() && !which.is)
assert_cast<ColumnLowCardinality &>(info.column).insert(Field(value));
assert_cast<DataTypeDateTime64::ColumnType &>(column).insertValue(value);
break;
}
default:
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Cannot insert MessagePack integer into column with type {}.", info_stack.top().type->getName());
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Cannot insert MessagePack integer into column with type {}.", type->getName());
}
}
static void insertString(IColumn & column, DataTypePtr type, const char * value, size_t size)
{
auto insert_func = [&](IColumn & column_, DataTypePtr type_)
{
insertString(column_, type_, value, size);
};
if (checkAndInsertNullable(column, type, insert_func) || checkAndInsertLowCardinality(column, type, insert_func))
return;
if (!isStringOrFixedString(type))
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Cannot insert MessagePack string into column with type {}.", type->getName());
column.insertData(value, size);
}
static void insertFloat32(IColumn & column, DataTypePtr type, Float32 value) // NOLINT
{
auto insert_func = [&](IColumn & column_, DataTypePtr type_)
{
insertFloat32(column_, type_, value);
};
if (checkAndInsertNullable(column, type, insert_func) || checkAndInsertLowCardinality(column, type, insert_func))
return;
if (!WhichDataType(type).isFloat32())
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Cannot insert MessagePack float32 into column with type {}.", type->getName());
assert_cast<ColumnFloat32 &>(column).insertValue(value);
}
static void insertFloat64(IColumn & column, DataTypePtr type, Float64 value) // NOLINT
{
auto insert_func = [&](IColumn & column_, DataTypePtr type_)
{
insertFloat64(column_, type_, value);
};
if (checkAndInsertNullable(column, type, insert_func) || checkAndInsertLowCardinality(column, type, insert_func))
return;
if (!WhichDataType(type).isFloat64())
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Cannot insert MessagePack float64 into column with type {}.", type->getName());
assert_cast<ColumnFloat64 &>(column).insertValue(value);
}
static void insertNull(IColumn & column, DataTypePtr type)
{
auto insert_func = [&](IColumn & column_, DataTypePtr type_)
{
insertNull(column_, type_);
};
/// LowCardinalityNullable(...)
if (checkAndInsertLowCardinality(column, type, insert_func))
return;
if (!type->isNullable())
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Cannot insert MessagePack null into non-nullable column with type {}.", type->getName());
assert_cast<ColumnNullable &>(column).insertDefault();
}
bool MsgPackVisitor::visit_positive_integer(UInt64 value) // NOLINT
{
insert_integer(value);
insertInteger(info_stack.top().column, info_stack.top().type, value);
return true;
}
bool MsgPackVisitor::visit_negative_integer(Int64 value) // NOLINT
{
insert_integer(value);
insertInteger(info_stack.top().column, info_stack.top().type, value);
return true;
}
bool MsgPackVisitor::visit_str(const char* value, size_t size) // NOLINT
bool MsgPackVisitor::visit_str(const char * value, size_t size) // NOLINT
{
if (!isStinfo_stack.top().type)
info_stack.top().column.insertData(value, size);
insertString(info_stack.top().column, info_stack.top().type, value, size);
return true;
}
bool MsgPackVisitor::visit_bin(const char * value, size_t size)
{
insertString(info_stack.top().column, info_stack.top().type, value, size);
return true;
}
bool MsgPackVisitor::visit_float32(Float32 value) // NOLINT
{
if (!WhichDataType(info_stack.top().type).isFloat32())
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Cannot insert MessagePack float32 into column with type {}.", info_stack.top().type->getName());
assert_cast<ColumnFloat32 &>(info_stack.top().column).insertValue(value);
insertFloat32(info_stack.top().column, info_stack.top().type, value);
return true;
}
bool MsgPackVisitor::visit_float64(Float64 value) // NOLINT
{
if (!WhichDataType(info_stack.top().type).isFloat64())
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Cannot insert MessagePack float32 into column with type {}.", info_stack.top().type->getName());
assert_cast<ColumnFloat64 &>(info_stack.top().column).insertValue(value);
insertFloat64(info_stack.top().column, info_stack.top().type, value);
return true;
}
@ -189,7 +281,7 @@ bool MsgPackVisitor::start_map(uint32_t size) // NOLINT
bool MsgPackVisitor::start_map_key() // NOLINT
{
auto key_column = assert_cast<ColumnMap &>(info_stack.top().column).getNestedData().getColumns()[0];
auto key_type = assert_cast<DataTypeMap &>(*info_stack.top().type).getKeyType();
auto key_type = assert_cast<const DataTypeMap &>(*info_stack.top().type).getKeyType();
info_stack.push(Info{*key_column, key_type});
return true;
}
@ -203,7 +295,7 @@ bool MsgPackVisitor::end_map_key() // NOLINT
bool MsgPackVisitor::start_map_value() // NOLINT
{
auto value_column = assert_cast<ColumnMap &>(info_stack.top().column).getNestedData().getColumns()[1];
auto value_type = assert_cast<DataTypeMap &>(*info_stack.top().type).getValueType();
auto value_type = assert_cast<const DataTypeMap &>(*info_stack.top().type).getValueType();
info_stack.push(Info{*value_column, value_type});
return true;
}
@ -216,7 +308,8 @@ bool MsgPackVisitor::end_map_value() // NOLINT
bool MsgPackVisitor::visit_nil()
{
insertNull(info_stack.top().column, info_stack.top().type);
return true;
}
void MsgPackVisitor::parse_error(size_t, size_t) // NOLINT

View File

@ -33,8 +33,8 @@ public:
bool visit_negative_integer(Int64 value);
bool visit_float32(Float32 value);
bool visit_float64(Float64 value);
bool visit_str(const char* value, size_t size);
bool visit_bin(const char* value, size_t size);
bool visit_str(const char * value, size_t size);
bool visit_bin(const char * value, size_t size);
bool start_array(size_t size);
bool end_array();
bool visit_nil();
@ -49,13 +49,10 @@ public:
/// Update info_stack
void set_info(IColumn & column, DataTypePtr type);
void insert_integer(UInt64 value);
void reset();
private:
/// Stack is needed to process nested arrays
/// Stack is needed to process arrays and maps
std::stack<Info> info_stack;
};

View File

@ -142,7 +142,7 @@ void MsgPackRowOutputFormat::serializeField(const IColumn & column, DataTypePtr
const auto & key_column = map_column.getNestedData().getColumns()[0];
const auto & value_column = map_column.getNestedData().getColumns()[1];
const auto & map_type = assert_cast<const DataTypeMap &>(data_type);
const auto & map_type = assert_cast<const DataTypeMap &>(*data_type);
const auto & offsets = nested_column.getOffsets();
size_t offset = offsets[row_num - 1];
size_t size = offsets[row_num] - offset;
@ -152,14 +152,16 @@ void MsgPackRowOutputFormat::serializeField(const IColumn & column, DataTypePtr
serializeField(*key_column, map_type.getKeyType(), row_num);
serializeField(*value_column, map_type.getValueType(), row_num);
}
return;
}
case TypeIndex::LowCardinality:
{
const auto & lc_column = assert_cast<const ColumnLowCardinality &>(column);
auto dict_type = assert_cast<const DataTypeLowCardinality *>(data_type.get())->getDictionaryType();
auto dict_column = lc_column.getDictionaryPtr();
auto dict_column = lc_column.getDictionary().getNestedColumn();
size_t index = lc_column.getIndexAt(row_num);
serializeField(*dict_column, dict_type, index);
return;
}
default:
break;

View File

@ -11,3 +11,12 @@
2020-01-01
2020-01-02
2020-01-02
{1:2,2:3} [{1:[1,2],2:[3,4]},{3:[5,6],4:[7,8]}]
{1:2,1:2} [{1:[1,2],1:[1,2]},{2:[3,4],2:[3,4]}]
42 42 42 ['42','42'] ['42','42']
42 \N \N [NULL,'42',NULL] [NULL,'42',NULL]
42 42 42 ['42','42'] ['42','42']
42 \N \N [NULL,'42',NULL] [NULL,'42',NULL]
OK
OK
OK

View File

@ -5,6 +5,10 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
USER_FILES_PATH=$(clickhouse-client --query "select _path,_file from file('nonexist.txt', 'CSV', 'val1 char')" 2>&1 | grep Exception | awk '{gsub("/nonexist.txt","",$9); print $9}')
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS msgpack";
$CLICKHOUSE_CLIENT --query="CREATE TABLE msgpack (uint8 UInt8, uint16 UInt16, uint32 UInt32, uint64 UInt64, int8 Int8, int16 Int16, int32 Int32, int64 Int64, float Float32, double Float64, string String, date Date, datetime DateTime('Europe/Moscow'), datetime64 DateTime64(3, 'Europe/Moscow'), array Array(UInt32)) ENGINE = Memory";
@ -62,3 +66,42 @@ $CLICKHOUSE_CLIENT --query="SELECT * FROM msgpack";
$CLICKHOUSE_CLIENT --query="DROP TABLE msgpack";
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS msgpack_map";
$CLICKHOUSE_CLIENT --query="CREATE TABLE msgpack_map (m Map(UInt64, UInt64), a Array(Map(UInt64, Array(UInt64)))) ENGINE=Memory()";
$CLICKHOUSE_CLIENT --query="INSERT INTO msgpack_map VALUES ({1 : 2, 2 : 3}, [{1 : [1, 2], 2 : [3, 4]}, {3 : [5, 6], 4 : [7, 8]}])";
$CLICKHOUSE_CLIENT --query="SELECT * FROM msgpack_map FORMAT MsgPack" | $CLICKHOUSE_CLIENT --query="INSERT INTO msgpack_map FORMAT MsgPack";
$CLICKHOUSE_CLIENT --query="SELECT * FROM msgpack_map";
$CLICKHOUSE_CLIENT --query="DROP TABLE msgpack_map";
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS msgpack_lc_nullable";
$CLICKHOUSE_CLIENT --query="CREATE TABLE msgpack_lc_nullable (a LowCardinality(String), b Nullable(String), c LowCardinality(Nullable(String)), d Array(Nullable(String)), e Array(LowCardinality(Nullable(String)))) engine=Memory()";
$CLICKHOUSE_CLIENT --query="INSERT INTO msgpack_lc_nullable VALUES ('42', '42', '42', ['42', '42'], ['42', '42']), ('42', NULL, NULL, [NULL, '42', NULL], [NULL, '42', NULL])";
$CLICKHOUSE_CLIENT --query="SELECT * FROM msgpack_lc_nullable FORMAT MsgPack" | $CLICKHOUSE_CLIENT --query="INSERT INTO msgpack_lc_nullable FORMAT MsgPack";
$CLICKHOUSE_CLIENT --query="SELECT * FROM msgpack_lc_nullable";
$CLICKHOUSE_CLIENT --query="DROP TABLE msgpack_lc_nullable";
$CLICKHOUSE_CLIENT --query="SELECT toString(number) FROM numbers(10) FORMAT MsgPack" > $USER_FILES_PATH/data.msgpack
$CLICKHOUSE_CLIENT --query="SELECT * FROM file('data.msgpack', 'MsgPack', 'x UInt64')" 2>&1 | grep -F -q "ILLEGAL_COLUMN" && echo 'OK' || echo 'FAIL';
$CLICKHOUSE_CLIENT --query="SELECT * FROM file('data.msgpack', 'MsgPack', 'x Float32')" 2>&1 | grep -F -q "ILLEGAL_COLUMN" && echo 'OK' || echo 'FAIL';
$CLICKHOUSE_CLIENT --query="SELECT * FROM file('data.msgpack', 'MsgPack', 'x Array(UInt32)')" 2>&1 | grep -F -q "ILLEGAL_COLUMN" && echo 'OK' || echo 'FAIL';
rm $USER_FILES_PATH/data.msgpack