Add MsgPackRowInputFormat, msgpack-c contrib and tests.

This commit is contained in:
Avogar 2020-03-26 19:33:00 +03:00
parent 783a898b9d
commit b02636f916
15 changed files with 269 additions and 24 deletions

3
.gitmodules vendored
View File

@ -148,3 +148,6 @@
path = contrib/avro
url = https://github.com/ClickHouse-Extras/avro.git
ignore = untracked
[submodule "contrib/msgpack-c"]
path = contrib/msgpack-c
url = https://github.com/msgpack/msgpack-c

View File

@ -345,6 +345,7 @@ include (cmake/find/rapidjson.cmake)
include (cmake/find/fastops.cmake)
include (cmake/find/orc.cmake)
include (cmake/find/avro.cmake)
include (cmake/find/msgpack.cmake)
find_contrib_lib(cityhash)
find_contrib_lib(farmhash)

2
cmake/find/msgpack.cmake Normal file
View File

@ -0,0 +1,2 @@
set(MSGPACK_INCLUDE_DIR ${ClickHouse_SOURCE_DIR}/contrib/msgpack-c/include)
message(STATUS "Using msgpack: ${MSGPACK_INCLUDE_DIR}")

1
contrib/msgpack-c vendored Submodule

@ -0,0 +1 @@
Subproject commit 46684265d50b5d1b062d4c5c428ba08462844b1d

View File

@ -574,6 +574,8 @@ target_include_directories (clickhouse_common_io PUBLIC ${DBMS_INCLUDE_DIR})
target_include_directories (clickhouse_common_io SYSTEM BEFORE PUBLIC ${DOUBLE_CONVERSION_INCLUDE_DIR})
target_include_directories (clickhouse_common_io SYSTEM BEFORE PUBLIC ${MSGPACK_INCLUDE_DIR})
add_subdirectory (programs)
add_subdirectory (tests)

View File

@ -352,6 +352,7 @@ FormatFactory::FormatFactory()
registerOutputFormatProcessorAvro(*this);
registerInputFormatProcessorTemplate(*this);
registerOutputFormatProcessorTemplate(*this);
registerInputFormatProcessorMsgPack(*this);
registerOutputFormatProcessorMsgPack(*this);
registerFileSegmentationEngineTabSeparated(*this);

View File

@ -172,7 +172,8 @@ void registerInputFormatProcessorAvro(FormatFactory & factory);
void registerOutputFormatProcessorAvro(FormatFactory & factory);
void registerInputFormatProcessorTemplate(FormatFactory & factory);
void registerOutputFormatProcessorTemplate(FormatFactory & factory);
void registerOutputFormatProcessorMsgPack(FormatFactory &factory);
void registerInputFormatProcessorMsgPack(FormatFactory & factory);
void registerOutputFormatProcessorMsgPack(FormatFactory & factory);
/// File Segmentation Engines for parallel reading

View File

@ -0,0 +1,178 @@
#include <cstdlib>
#include <Processors/Formats/Impl/MsgPackRowInputFormat.h>
#include <Common/assert_cast.h>
#include <IO/ReadHelpers.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeNullable.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_COLUMN;
extern const int INCORRECT_DATA;
}
MsgPackRowInputFormat::MsgPackRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_)
: IRowInputFormat(header_, in_, std::move(params_)), data_types(header_.getDataTypes()) {}
bool MsgPackRowInputFormat::readObject()
{
if (in.eof() && unpacker.nonparsed_size() == 0)
return false;
while (!unpacker.next(object_handle))
{
if (in.eof())
throw Exception("Unexpected end of file while parsing MsgPack object.", ErrorCodes::INCORRECT_DATA);
unpacker.reserve_buffer(in.available());
memcpy(unpacker.buffer(), in.position(), in.available());
unpacker.buffer_consumed(in.available());
in.position() += in.available();
}
return true;
}
void MsgPackRowInputFormat::insertObject(IColumn & column, DataTypePtr data_type, const msgpack::object & object)
{
switch (data_type->getTypeId())
{
case TypeIndex::UInt8:
{
assert_cast<ColumnUInt8 &>(column).insertValue(object.as<uint8_t>());
return;
}
case TypeIndex::Date: [[fallthrough]];
case TypeIndex::UInt16:
{
assert_cast<ColumnUInt16 &>(column).insertValue(object.as<UInt16>());
return;
}
case TypeIndex::DateTime: [[fallthrough]];
case TypeIndex::UInt32:
{
assert_cast<ColumnUInt32 &>(column).insertValue(object.as<UInt32>());
return;
}
case TypeIndex::UInt64:
{
assert_cast<ColumnUInt64 &>(column).insertValue(object.as<UInt64>());
return;
}
case TypeIndex::Int8:
{
assert_cast<ColumnInt8 &>(column).insertValue(object.as<Int8>());
return;
}
case TypeIndex::Int16:
{
assert_cast<ColumnInt16 &>(column).insertValue(object.as<Int16>());
return;
}
case TypeIndex::Int32:
{
assert_cast<ColumnInt32 &>(column).insertValue(object.as<Int32>());
return;
}
case TypeIndex::Int64:
{
assert_cast<ColumnInt64 &>(column).insertValue(object.as<Int64>());
return;
}
case TypeIndex::Float32:
{
assert_cast<ColumnFloat32 &>(column).insertValue(object.as<Float32>());
return;
}
case TypeIndex::Float64:
{
assert_cast<ColumnFloat64 &>(column).insertValue(object.as<Float64>());
return;
}
case TypeIndex::DateTime64:
{
assert_cast<DataTypeDateTime64::ColumnType &>(column).insertValue(object.as<UInt64>());
return;
}
case TypeIndex::FixedString: [[fallthrough]];
case TypeIndex::String:
{
String str = object.as<String>();
column.insertData(str.data(), str.size());
return;
}
case TypeIndex::Array:
{
msgpack::object_array object_array = object.via.array;
auto nested_type = assert_cast<const DataTypeArray &>(*data_type).getNestedType();
ColumnArray & column_array = assert_cast<ColumnArray &>(column);
ColumnArray::Offsets & offsets = column_array.getOffsets();
IColumn & nested_column = column_array.getData();
for (size_t i = 0; i != object_array.size; ++i)
{
insertObject(nested_column, nested_type, object_array.ptr[i]);
}
offsets.push_back(offsets.back() + object_array.size);
return;
}
case TypeIndex::Nullable:
{
auto nested_type = removeNullable(data_type);
ColumnNullable & column_nullable = assert_cast<ColumnNullable &>(column);
if (object.type == msgpack::type::NIL)
column_nullable.insertDefault();
else
insertObject(column_nullable.getNestedColumn(), nested_type, object);
return;
}
case TypeIndex::Nothing:
{
// Nothing to insert, MsgPack object is nil.
return;
}
default:
break;
}
throw Exception("Type " + data_type->getName() + " is not supported for MsgPack input format", ErrorCodes::ILLEGAL_COLUMN);
}
bool MsgPackRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &)
{
size_t column_index = 0;
bool has_more_data = true;
for (; column_index != columns.size(); ++column_index)
{
has_more_data = readObject();
if (!has_more_data)
break;
insertObject(*columns[column_index], data_types[column_index], object_handle.get());
}
if (!has_more_data)
{
if (column_index != 0)
throw Exception("Not enough values to complete the row.", ErrorCodes::INCORRECT_DATA);
return false;
}
return true;
}
void registerInputFormatProcessorMsgPack(FormatFactory & factory) {
factory.registerInputFormatProcessor("MsgPack", [](
ReadBuffer &buf,
const Block &sample,
const RowInputFormatParams &params,
const FormatSettings &) {
return std::make_shared<MsgPackRowInputFormat>(sample, buf, params);
});
}
}

View File

@ -0,0 +1,28 @@
#pragma once
#include <Processors/Formats/IRowInputFormat.h>
#include <Formats/FormatFactory.h>
#include <msgpack.hpp>
namespace DB
{
class ReadBuffer;
class MsgPackRowInputFormat : public IRowInputFormat
{
public:
MsgPackRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_);
bool readRow(MutableColumns & columns, RowReadExtension & ext) override;
String getName() const override { return "MagPackRowInputFormat"; }
private:
bool readObject();
void insertObject(IColumn & column, DataTypePtr type, const msgpack::object & object);
DataTypes data_types;
msgpack::unpacker unpacker;
msgpack::object_handle object_handle;
};
}

View File

@ -4,6 +4,7 @@
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeNullable.h>
#include <Columns/ColumnArray.h>
@ -20,8 +21,8 @@ namespace ErrorCodes
extern const int ILLEGAL_COLUMN;
}
MsgPackRowOutputFormat::MsgPackRowOutputFormat(WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback, const FormatSettings & settings_)
: IRowOutputFormat(header_, out_, callback), settings(settings_), packer(out_) {}
MsgPackRowOutputFormat::MsgPackRowOutputFormat(WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback)
: IRowOutputFormat(header_, out_, callback), packer(out_) {}
void MsgPackRowOutputFormat::serializeField(const IColumn & column, DataTypePtr data_type, size_t row_num)
{
@ -32,11 +33,13 @@ void MsgPackRowOutputFormat::serializeField(const IColumn & column, DataTypePtr
packer.pack_uint8(assert_cast<const ColumnUInt8 &>(column).getElement(row_num));
return;
}
case TypeIndex::Date: [[fallthrough]];
case TypeIndex::UInt16:
{
packer.pack_uint16(assert_cast<const ColumnUInt16 &>(column).getElement(row_num));
return;
}
case TypeIndex::DateTime: [[fallthrough]];
case TypeIndex::UInt32:
{
packer.pack_uint32(assert_cast<const ColumnUInt32 &>(column).getElement(row_num));
@ -77,19 +80,12 @@ void MsgPackRowOutputFormat::serializeField(const IColumn & column, DataTypePtr
packer.pack_double(assert_cast<const ColumnFloat64 &>(column).getElement(row_num));
return;
}
case TypeIndex::Date:
case TypeIndex::DateTime64:
{
packer.pack_uint16(assert_cast<const ColumnUInt16 &>(column).getElement(row_num));
return;
}
case TypeIndex::DateTime:
{
UInt32 datetime = assert_cast<const ColumnUInt32 &>(column).getElement(row_num);
// Timestamp extension type in MsgPack is -1.
packer.pack_ext(sizeof(datetime), -1);
packer.pack_ext_body(reinterpret_cast<const char *>(&datetime), sizeof(datetime));
packer.pack_uint64(assert_cast<const DataTypeDateTime64::ColumnType &>(column).getElement(row_num));
return;
}
case TypeIndex::FixedString: [[fallthrough]];
case TypeIndex::String:
{
const StringRef & string = assert_cast<const ColumnString &>(column).getDataAt(row_num);
@ -97,13 +93,6 @@ void MsgPackRowOutputFormat::serializeField(const IColumn & column, DataTypePtr
packer.pack_str_body(string.data, string.size);
return;
}
case TypeIndex::FixedString:
{
const StringRef & string = assert_cast<const ColumnString &>(column).getDataAt(row_num);
packer.pack_str(string.size);
packer.pack_str_body(string.data, string.size);
return;
}
case TypeIndex::Array:
{
auto nested_type = assert_cast<const DataTypeArray &>(*data_type).getNestedType();
@ -155,9 +144,9 @@ void registerOutputFormatProcessorMsgPack(FormatFactory & factory)
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback callback,
const FormatSettings & settings)
const FormatSettings &)
{
return std::make_shared<MsgPackRowOutputFormat>(buf, sample, callback, settings);
return std::make_shared<MsgPackRowOutputFormat>(buf, sample, callback);
});
}

View File

@ -13,7 +13,7 @@ namespace DB
class MsgPackRowOutputFormat : public IRowOutputFormat
{
public:
MsgPackRowOutputFormat(WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback, const FormatSettings & settings_);
MsgPackRowOutputFormat(WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback);
String getName() const override { return "MsgPackRowOutputFormat"; }
@ -22,7 +22,6 @@ public:
void serializeField(const IColumn & column, DataTypePtr data_type, size_t row_num);
private:
FormatSettings settings;
msgpack::packer<DB::WriteBuffer> packer;
};

View File

@ -0,0 +1,8 @@
255 65535 4294967295 100000000000 -128 -32768 -2147483648 -100000000000 2.02 10000.0000001 String 2021-12-19 2021-12-19 03:00:00 2021-12-19 03:00:00.000 [1,2,3,4,5]
4 1234 3244467295 500000000000 -1 -256 -14741221 -7000000000 100.1 14321.032141201 Another string 2024-10-04 2028-04-21 01:20:00 2021-12-19 03:14:51.123 [5,4,3,2,1]
42 42 42 42 42 42 42 42 42.42 42.42 42 1970-02-12 1970-01-01 03:00:42 1970-01-01 03:00:00.042 [42]
255 65535 4294967295 100000000000 -128 -32768 -2147483648 -100000000000 2.02 10000.0000001 String 2021-12-19 2021-12-19 03:00:00 2021-12-19 03:00:00.000 [1,2,3,4,5]
4 1234 3244467295 500000000000 -1 -256 -14741221 -7000000000 100.1 14321.032141201 Another string 2024-10-04 2028-04-21 01:20:00 2021-12-19 03:14:51.123 [5,4,3,2,1]
42 42 42 42 42 42 42 42 42.42 42.42 42 1970-02-12 1970-01-01 03:00:42 1970-01-01 03:00:00.042 [42]
[[1,2,3],[1001,2002],[3167]] [[['one'],['two']],[['three']],[['four'],['five']]]
[[1,2,3],[1001,2002],[3167]] [[['one'],['two']],[['three']],[['four'],['five']]]

View File

@ -0,0 +1,31 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS msgpack";
$CLICKHOUSE_CLIENT --query="CREATE TABLE msgpack (uint8 UInt8, uint16 UInt16, uint32 UInt32, uint64 UInt64, int8 Int8, int16 Int16, int32 Int32, int64 Int64, float Float32, double Float64, string String, date Date, datetime DateTime, datetime64 DateTime64, array Array(UInt32)) ENGINE = Memory";
$CLICKHOUSE_CLIENT --query="INSERT INTO msgpack VALUES (255, 65535, 4294967295, 100000000000, -128, -32768, -2147483648, -100000000000, 2.02, 10000.0000001, 'String', 18980, 1639872000, 1639872000000, [1,2,3,4,5]), (4, 1234, 3244467295, 500000000000, -1, -256, -14741221, -7000000000, 100.1, 14321.032141201, 'Another string', 20000, 1839882000, 1639872891123, [5,4,3,2,1]),(42, 42, 42, 42, 42, 42, 42, 42, 42.42, 42.42, '42', 42, 42, 42, [42])";
$CLICKHOUSE_CLIENT --query="SELECT * FROM msgpack FORMAT MsgPack" > $CURDIR/data_msgpack/all_types.msgpk;
cat $CURDIR/data_msgpack/all_types.msgpk | $CLICKHOUSE_CLIENT --query="INSERT INTO msgpack FORMAT MsgPack";
$CLICKHOUSE_CLIENT --query="SELECT * FROM msgpack";
$CLICKHOUSE_CLIENT --query="DROP TABLE msgpack";
$CLICKHOUSE_CLIENT --query="CREATE TABLE msgpack (array1 Array(Array(UInt32)), array2 Array(Array(Array(String)))) ENGINE = Memory";
$CLICKHOUSE_CLIENT --query="INSERT INTO msgpack VALUES ([[1,2,3], [1001, 2002], [3167]], [[['one'], ['two']], [['three']],[['four'], ['five']]])";
$CLICKHOUSE_CLIENT --query="SELECT * FROM msgpack FORMAT MsgPack" > $CURDIR/data_msgpack/nested_arrays.msgpk;
cat $CURDIR/data_msgpack/nested_arrays.msgpk | $CLICKHOUSE_CLIENT --query="INSERT INTO msgpack FORMAT MsgPack";
$CLICKHOUSE_CLIENT --query="SELECT * FROM msgpack";
$CLICKHOUSE_CLIENT --query="DROP TABLE msgpack";

View File

@ -0,0 +1 @@
““’ÍéÍÒ‘Í _“£one£two¥three¤four¤five