Merge pull request #9889 from Avogar/msgpack_format

Msgpack format
This commit is contained in:
alexey-milovidov 2020-04-10 00:07:10 +03:00 committed by GitHub
commit 4a73fe7477
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 481 additions and 1 deletions

3
.gitmodules vendored
View File

@ -151,3 +151,6 @@
[submodule "website/images/feathericons"]
path = website/images/feathericons
url = https://github.com/feathericons/feather
[submodule "contrib/msgpack-c"]
path = contrib/msgpack-c
url = https://github.com/msgpack/msgpack-c

View File

@ -343,6 +343,7 @@ include (cmake/find/rapidjson.cmake)
include (cmake/find/fastops.cmake)
include (cmake/find/orc.cmake)
include (cmake/find/avro.cmake)
include (cmake/find/msgpack.cmake)
find_contrib_lib(cityhash)
find_contrib_lib(farmhash)

2
cmake/find/msgpack.cmake Normal file
View File

@ -0,0 +1,2 @@
set(MSGPACK_INCLUDE_DIR ${ClickHouse_SOURCE_DIR}/contrib/msgpack-c/include)
message(STATUS "Using msgpack: ${MSGPACK_INCLUDE_DIR}")

1
contrib/msgpack-c vendored Submodule

@ -0,0 +1 @@
Subproject commit 46684265d50b5d1b062d4c5c428ba08462844b1d

View File

@ -574,6 +574,8 @@ target_include_directories (clickhouse_common_io PUBLIC ${DBMS_INCLUDE_DIR})
target_include_directories (clickhouse_common_io SYSTEM BEFORE PUBLIC ${DOUBLE_CONVERSION_INCLUDE_DIR})
target_include_directories (clickhouse_common_io SYSTEM BEFORE PUBLIC ${MSGPACK_INCLUDE_DIR})
if (ENABLE_TESTS AND USE_GTEST)
macro (grep_gtest_sources BASE_DIR DST_VAR)
# Cold match files that are not in tests/ directories

View File

@ -356,6 +356,8 @@ FormatFactory::FormatFactory()
registerInputFormatProcessorTemplate(*this);
registerOutputFormatProcessorTemplate(*this);
registerInputFormatProcessorRegexp(*this);
registerInputFormatProcessorMsgPack(*this);
registerOutputFormatProcessorMsgPack(*this);
registerFileSegmentationEngineTabSeparated(*this);
registerFileSegmentationEngineCSV(*this);

View File

@ -171,7 +171,9 @@ void registerOutputFormatProcessorProtobuf(FormatFactory & factory);
void registerInputFormatProcessorAvro(FormatFactory & factory);
void registerOutputFormatProcessorAvro(FormatFactory & factory);
void registerInputFormatProcessorTemplate(FormatFactory & factory);
void registerOutputFormatProcessorTemplate(FormatFactory &factory);
void registerOutputFormatProcessorTemplate(FormatFactory & factory);
void registerInputFormatProcessorMsgPack(FormatFactory & factory);
void registerOutputFormatProcessorMsgPack(FormatFactory & factory);
/// File Segmentation Engines for parallel reading

View File

@ -0,0 +1,193 @@
#include <cstdlib>
#include <Processors/Formats/Impl/MsgPackRowInputFormat.h>
#include <Common/assert_cast.h>
#include <IO/ReadHelpers.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeNullable.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_COLUMN;
extern const int INCORRECT_DATA;
}
MsgPackRowInputFormat::MsgPackRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_)
: IRowInputFormat(header_, in_, std::move(params_)), buf(in_), data_types(header_.getDataTypes()) {}
bool MsgPackRowInputFormat::readObject()
{
if (buf.eof())
return false;
PeekableReadBufferCheckpoint checkpoint{buf};
size_t offset = 0;
bool need_more_data = true;
while (need_more_data)
{
offset = 0;
try
{
object_handle = msgpack::unpack(buf.position(), buf.buffer().end() - buf.position(), offset);
need_more_data = false;
}
catch (msgpack::insufficient_bytes &)
{
buf.position() = buf.buffer().end();
if (buf.eof())
throw Exception("Unexpected end of file while parsing msgpack object.", ErrorCodes::INCORRECT_DATA);
buf.position() = buf.buffer().end();
buf.makeContinuousMemoryFromCheckpointToPos();
buf.rollbackToCheckpoint();
}
}
buf.position() += offset;
return true;
}
void MsgPackRowInputFormat::insertObject(IColumn & column, DataTypePtr data_type, const msgpack::object & object)
{
switch (data_type->getTypeId())
{
case TypeIndex::UInt8:
{
assert_cast<ColumnUInt8 &>(column).insertValue(object.as<uint8_t>());
return;
}
case TypeIndex::Date: [[fallthrough]];
case TypeIndex::UInt16:
{
assert_cast<ColumnUInt16 &>(column).insertValue(object.as<UInt16>());
return;
}
case TypeIndex::DateTime: [[fallthrough]];
case TypeIndex::UInt32:
{
assert_cast<ColumnUInt32 &>(column).insertValue(object.as<UInt32>());
return;
}
case TypeIndex::UInt64:
{
assert_cast<ColumnUInt64 &>(column).insertValue(object.as<UInt64>());
return;
}
case TypeIndex::Int8:
{
assert_cast<ColumnInt8 &>(column).insertValue(object.as<Int8>());
return;
}
case TypeIndex::Int16:
{
assert_cast<ColumnInt16 &>(column).insertValue(object.as<Int16>());
return;
}
case TypeIndex::Int32:
{
assert_cast<ColumnInt32 &>(column).insertValue(object.as<Int32>());
return;
}
case TypeIndex::Int64:
{
assert_cast<ColumnInt64 &>(column).insertValue(object.as<Int64>());
return;
}
case TypeIndex::Float32:
{
assert_cast<ColumnFloat32 &>(column).insertValue(object.as<Float32>());
return;
}
case TypeIndex::Float64:
{
assert_cast<ColumnFloat64 &>(column).insertValue(object.as<Float64>());
return;
}
case TypeIndex::DateTime64:
{
assert_cast<DataTypeDateTime64::ColumnType &>(column).insertValue(object.as<UInt64>());
return;
}
case TypeIndex::FixedString: [[fallthrough]];
case TypeIndex::String:
{
String str = object.as<String>();
column.insertData(str.data(), str.size());
return;
}
case TypeIndex::Array:
{
msgpack::object_array object_array = object.via.array;
auto nested_type = assert_cast<const DataTypeArray &>(*data_type).getNestedType();
ColumnArray & column_array = assert_cast<ColumnArray &>(column);
ColumnArray::Offsets & offsets = column_array.getOffsets();
IColumn & nested_column = column_array.getData();
for (size_t i = 0; i != object_array.size; ++i)
{
insertObject(nested_column, nested_type, object_array.ptr[i]);
}
offsets.push_back(offsets.back() + object_array.size);
return;
}
case TypeIndex::Nullable:
{
auto nested_type = removeNullable(data_type);
ColumnNullable & column_nullable = assert_cast<ColumnNullable &>(column);
if (object.type == msgpack::type::NIL)
column_nullable.insertDefault();
else
insertObject(column_nullable.getNestedColumn(), nested_type, object);
return;
}
case TypeIndex::Nothing:
{
// Nothing to insert, MsgPack object is nil.
return;
}
default:
break;
}
throw Exception("Type " + data_type->getName() + " is not supported for MsgPack input format", ErrorCodes::ILLEGAL_COLUMN);
}
bool MsgPackRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &)
{
size_t column_index = 0;
bool has_more_data = true;
for (; column_index != columns.size(); ++column_index)
{
has_more_data = readObject();
if (!has_more_data)
break;
insertObject(*columns[column_index], data_types[column_index], object_handle.get());
}
if (!has_more_data)
{
if (column_index != 0)
throw Exception("Not enough values to complete the row.", ErrorCodes::INCORRECT_DATA);
return false;
}
return true;
}
void registerInputFormatProcessorMsgPack(FormatFactory & factory)
{
factory.registerInputFormatProcessor("MsgPack", [](
ReadBuffer & buf,
const Block & sample,
const RowInputFormatParams & params,
const FormatSettings &)
{
return std::make_shared<MsgPackRowInputFormat>(sample, buf, params);
});
}
}

View File

@ -0,0 +1,29 @@
#pragma once
#include <Processors/Formats/IRowInputFormat.h>
#include <Formats/FormatFactory.h>
#include <IO/PeekableReadBuffer.h>
#include <msgpack.hpp>
namespace DB
{
class ReadBuffer;
class MsgPackRowInputFormat : public IRowInputFormat
{
public:
MsgPackRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_);
bool readRow(MutableColumns & columns, RowReadExtension & ext) override;
String getName() const override { return "MagPackRowInputFormat"; }
private:
bool readObject();
void insertObject(IColumn & column, DataTypePtr type, const msgpack::object & object);
PeekableReadBuffer buf;
DataTypes data_types;
msgpack::object_handle object_handle;
};
}

View File

@ -0,0 +1,153 @@
#include <Processors/Formats/Impl/MsgPackRowOutputFormat.h>
#include <Formats/FormatFactory.h>
#include <Common/assert_cast.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeNullable.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_COLUMN;
}
MsgPackRowOutputFormat::MsgPackRowOutputFormat(WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback)
: IRowOutputFormat(header_, out_, callback), packer(out_) {}
void MsgPackRowOutputFormat::serializeField(const IColumn & column, DataTypePtr data_type, size_t row_num)
{
switch (data_type->getTypeId())
{
case TypeIndex::UInt8:
{
packer.pack_uint8(assert_cast<const ColumnUInt8 &>(column).getElement(row_num));
return;
}
case TypeIndex::Date: [[fallthrough]];
case TypeIndex::UInt16:
{
packer.pack_uint16(assert_cast<const ColumnUInt16 &>(column).getElement(row_num));
return;
}
case TypeIndex::DateTime: [[fallthrough]];
case TypeIndex::UInt32:
{
packer.pack_uint32(assert_cast<const ColumnUInt32 &>(column).getElement(row_num));
return;
}
case TypeIndex::UInt64:
{
packer.pack_uint64(assert_cast<const ColumnUInt64 &>(column).getElement(row_num));
return;
}
case TypeIndex::Int8:
{
packer.pack_int8(assert_cast<const ColumnInt8 &>(column).getElement(row_num));
return;
}
case TypeIndex::Int16:
{
packer.pack_int16(assert_cast<const ColumnInt16 &>(column).getElement(row_num));
return;
}
case TypeIndex::Int32:
{
packer.pack_int32(assert_cast<const ColumnInt32 &>(column).getElement(row_num));
return;
}
case TypeIndex::Int64:
{
packer.pack_int64(assert_cast<const ColumnInt64 &>(column).getElement(row_num));
return;
}
case TypeIndex::Float32:
{
packer.pack_float(assert_cast<const ColumnFloat32 &>(column).getElement(row_num));
return;
}
case TypeIndex::Float64:
{
packer.pack_double(assert_cast<const ColumnFloat64 &>(column).getElement(row_num));
return;
}
case TypeIndex::DateTime64:
{
packer.pack_uint64(assert_cast<const DataTypeDateTime64::ColumnType &>(column).getElement(row_num));
return;
}
case TypeIndex::FixedString: [[fallthrough]];
case TypeIndex::String:
{
const StringRef & string = assert_cast<const ColumnString &>(column).getDataAt(row_num);
packer.pack_str(string.size);
packer.pack_str_body(string.data, string.size);
return;
}
case TypeIndex::Array:
{
auto nested_type = assert_cast<const DataTypeArray &>(*data_type).getNestedType();
const ColumnArray & column_array = assert_cast<const ColumnArray &>(column);
const IColumn & nested_column = column_array.getData();
const ColumnArray::Offsets & offsets = column_array.getOffsets();
size_t offset = offsets[row_num - 1];
size_t size = offsets[row_num] - offset;
packer.pack_array(size);
for (size_t i = 0; i < size; ++i)
{
serializeField(nested_column, nested_type, offset + i);
}
return;
}
case TypeIndex::Nullable:
{
auto nested_type = removeNullable(data_type);
const ColumnNullable & column_nullable = assert_cast<const ColumnNullable &>(column);
if (!column_nullable.isNullAt(row_num))
serializeField(column_nullable.getNestedColumn(), nested_type, row_num);
else
packer.pack_nil();
return;
}
case TypeIndex::Nothing:
{
packer.pack_nil();
return;
}
default:
break;
}
throw Exception("Type " + data_type->getName() + " is not supported for MsgPack output format", ErrorCodes::ILLEGAL_COLUMN);
}
void MsgPackRowOutputFormat::write(const Columns & columns, size_t row_num)
{
size_t num_columns = columns.size();
for (size_t i = 0; i < num_columns; ++i)
{
serializeField(*columns[i], types[i], row_num);
}
}
void registerOutputFormatProcessorMsgPack(FormatFactory & factory)
{
factory.registerOutputFormatProcessor("MsgPack", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback callback,
const FormatSettings &)
{
return std::make_shared<MsgPackRowOutputFormat>(buf, sample, callback);
});
}
}

View File

@ -0,0 +1,28 @@
#pragma once
#include <Core/Block.h>
#include <IO/WriteBuffer.h>
#include <Processors/Formats/IRowOutputFormat.h>
#include <Formats/FormatSettings.h>
#include <msgpack.hpp>
namespace DB
{
class MsgPackRowOutputFormat : public IRowOutputFormat
{
public:
MsgPackRowOutputFormat(WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback);
String getName() const override { return "MsgPackRowOutputFormat"; }
void write(const Columns & columns, size_t row_num) override;
void writeField(const IColumn &, const IDataType &, size_t) override {}
void serializeField(const IColumn & column, DataTypePtr data_type, size_t row_num);
private:
msgpack::packer<DB::WriteBuffer> packer;
};
}

View File

@ -0,0 +1,10 @@
255 65535 4294967295 100000000000 -128 -32768 -2147483648 -100000000000 2.02 10000.0000001 String 2021-12-19 2021-12-19 03:00:00 2021-12-19 03:00:00.000 [1,2,3,4,5]
4 1234 3244467295 500000000000 -1 -256 -14741221 -7000000000 100.1 14321.032141201 Another string 2024-10-04 2028-04-21 01:20:00 2021-12-19 03:14:51.123 [5,4,3,2,1]
42 42 42 42 42 42 42 42 42.42 42.42 42 1970-02-12 1970-01-01 03:00:42 1970-01-01 03:00:00.042 [42]
255 65535 4294967295 100000000000 -128 -32768 -2147483648 -100000000000 2.02 10000.0000001 String 2021-12-19 2021-12-19 03:00:00 2021-12-19 03:00:00.000 [1,2,3,4,5]
4 1234 3244467295 500000000000 -1 -256 -14741221 -7000000000 100.1 14321.032141201 Another string 2024-10-04 2028-04-21 01:20:00 2021-12-19 03:14:51.123 [5,4,3,2,1]
42 42 42 42 42 42 42 42 42.42 42.42 42 1970-02-12 1970-01-01 03:00:42 1970-01-01 03:00:00.042 [42]
[[1,2,3],[1001,2002],[3167]] [[['one'],['two']],[['three']],[['four'],['five']]]
[[1,2,3],[1001,2002],[3167]] [[['one'],['two']],[['three']],[['four'],['five']]]
[0,1,2,3,42,253,254,255]
[255,254,253,42,3,2,1,0]

View File

@ -0,0 +1,54 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS msgpack";
$CLICKHOUSE_CLIENT --query="CREATE TABLE msgpack (uint8 UInt8, uint16 UInt16, uint32 UInt32, uint64 UInt64, int8 Int8, int16 Int16, int32 Int32, int64 Int64, float Float32, double Float64, string String, date Date, datetime DateTime, datetime64 DateTime64, array Array(UInt32)) ENGINE = Memory";
$CLICKHOUSE_CLIENT --query="INSERT INTO msgpack VALUES (255, 65535, 4294967295, 100000000000, -128, -32768, -2147483648, -100000000000, 2.02, 10000.0000001, 'String', 18980, 1639872000, 1639872000000, [1,2,3,4,5]), (4, 1234, 3244467295, 500000000000, -1, -256, -14741221, -7000000000, 100.1, 14321.032141201, 'Another string', 20000, 1839882000, 1639872891123, [5,4,3,2,1]),(42, 42, 42, 42, 42, 42, 42, 42, 42.42, 42.42, '42', 42, 42, 42, [42])";
$CLICKHOUSE_CLIENT --query="SELECT * FROM msgpack FORMAT MsgPack" > $CURDIR/tmp_msgpac_test_all_types.msgpk;
cat $CURDIR/tmp_msgpac_test_all_types.msgpk | $CLICKHOUSE_CLIENT --query="INSERT INTO msgpack FORMAT MsgPack";
rm $CURDIR/tmp_msgpac_test_all_types.msgpk
$CLICKHOUSE_CLIENT --query="SELECT * FROM msgpack";
$CLICKHOUSE_CLIENT --query="DROP TABLE msgpack";
$CLICKHOUSE_CLIENT --query="CREATE TABLE msgpack (array1 Array(Array(UInt32)), array2 Array(Array(Array(String)))) ENGINE = Memory";
$CLICKHOUSE_CLIENT --query="INSERT INTO msgpack VALUES ([[1,2,3], [1001, 2002], [3167]], [[['one'], ['two']], [['three']],[['four'], ['five']]])";
$CLICKHOUSE_CLIENT --query="SELECT * FROM msgpack FORMAT MsgPack" > $CURDIR/tmp_msgpack_test_nested_arrays.msgpk;
cat $CURDIR/tmp_msgpack_test_nested_arrays.msgpk | $CLICKHOUSE_CLIENT --query="INSERT INTO msgpack FORMAT MsgPack";
rm $CURDIR/tmp_msgpack_test_nested_arrays.msgpk;
$CLICKHOUSE_CLIENT --query="SELECT * FROM msgpack";
$CLICKHOUSE_CLIENT --query="DROP TABLE msgpack";
$CLICKHOUSE_CLIENT --query="CREATE TABLE msgpack (array Array(UInt8)) ENGINE = Memory";
$CLICKHOUSE_CLIENT --query="INSERT INTO msgpack VALUES ([0, 1, 2, 3, 42, 253, 254, 255]), ([255, 254, 253, 42, 3, 2, 1, 0])";
$CLICKHOUSE_CLIENT --query="SELECT * FROM msgpack FORMAT MsgPack" > $CURDIR/tmp_msgpack_type_conversion.msgpk;
$CLICKHOUSE_CLIENT --query="DROP TABLE msgpack";
$CLICKHOUSE_CLIENT --query="CREATE TABLE msgpack (array Array(Int64)) ENGINE = Memory";
cat $CURDIR/tmp_msgpack_type_conversion.msgpk | $CLICKHOUSE_CLIENT --query="INSERT INTO msgpack FORMAT MsgPack";
rm $CURDIR/tmp_msgpack_type_conversion.msgpk;
$CLICKHOUSE_CLIENT --query="SELECT * FROM msgpack";
$CLICKHOUSE_CLIENT --query="DROP TABLE msgpack";