Merge pull request #4174 from vitlibar/add-protobuf-input-format

Add protobuf input format.
This commit is contained in:
Vitaly Baranov 2019-02-21 00:59:13 +03:00 committed by GitHub
commit 31039056f5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
54 changed files with 2684 additions and 89 deletions

View File

@ -966,8 +966,12 @@ private:
/// Data format can be specified in the INSERT query.
if (ASTInsertQuery * insert = typeid_cast<ASTInsertQuery *>(&*parsed_query))
{
if (!insert->format.empty())
current_format = insert->format;
if (insert->settings_ast)
InterpreterSetQuery(insert->settings_ast, context).executeForCurrentContext();
}
BlockInputStreamPtr block_input = context.getInputFormat(
current_format, buf, sample, insert_format_max_block_size);

View File

@ -410,13 +410,15 @@ namespace ErrorCodes
extern const int ILLEGAL_CODEC_PARAMETER = 433;
extern const int CANNOT_PARSE_PROTOBUF_SCHEMA = 434;
extern const int NO_DATA_FOR_REQUIRED_PROTOBUF_FIELD = 435;
extern const int CANNOT_CONVERT_TO_PROTOBUF_TYPE = 436;
extern const int PROTOBUF_BAD_CAST = 436;
extern const int PROTOBUF_FIELD_NOT_REPEATED = 437;
extern const int DATA_TYPE_CANNOT_BE_PROMOTED = 438;
extern const int CANNOT_SCHEDULE_TASK = 439;
extern const int INVALID_LIMIT_EXPRESSION = 440;
extern const int CANNOT_PARSE_DOMAIN_VALUE_FROM_STRING = 441;
extern const int BAD_DATABASE_FOR_TEMPORARY_TABLE = 442;
extern const int NO_COMMON_COLUMNS_WITH_PROTOBUF_SCHEMA = 443;
extern const int UNKNOWN_PROTOBUF_FORMAT = 444;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;

View File

@ -1,5 +1,6 @@
#include <Parsers/ASTInsertQuery.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterSetQuery.h>
#include <IO/ConcatReadBuffer.h>
#include <IO/ReadBufferFromMemory.h>
#include <DataStreams/BlockIO.h>
@ -17,7 +18,7 @@ namespace ErrorCodes
InputStreamFromASTInsertQuery::InputStreamFromASTInsertQuery(
const ASTPtr & ast, ReadBuffer * input_buffer_tail_part, const Block & header, const Context & context)
const ASTPtr & ast, ReadBuffer * input_buffer_tail_part, const Block & header, Context & context)
{
const ASTInsertQuery * ast_insert_query = dynamic_cast<const ASTInsertQuery *>(ast.get());
@ -27,6 +28,8 @@ InputStreamFromASTInsertQuery::InputStreamFromASTInsertQuery(
String format = ast_insert_query->format;
if (format.empty())
format = "Values";
if (ast_insert_query->settings_ast)
InterpreterSetQuery(ast_insert_query->settings_ast, context).executeForCurrentContext();
/// Data could be in parsed (ast_insert_query.data) and in not parsed yet (input_buffer_tail_part) part of query.

View File

@ -19,7 +19,7 @@ class Context;
class InputStreamFromASTInsertQuery : public IBlockInputStream
{
public:
InputStreamFromASTInsertQuery(const ASTPtr & ast, ReadBuffer * input_buffer_tail_part, const Block & header, const Context & context);
InputStreamFromASTInsertQuery(const ASTPtr & ast, ReadBuffer * input_buffer_tail_part, const Block & header, Context & context);
Block readImpl() override { return res_stream->read(); }
void readPrefixImpl() override { return res_stream->readPrefix(); }

View File

@ -9,6 +9,7 @@
#include <Common/AlignedBuffer.h>
#include <Formats/FormatSettings.h>
#include <Formats/ProtobufReader.h>
#include <Formats/ProtobufWriter.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <DataTypes/DataTypeFactory.h>
@ -254,6 +255,36 @@ void DataTypeAggregateFunction::serializeProtobuf(const IColumn & column, size_t
protobuf.writeAggregateFunction(function, static_cast<const ColumnAggregateFunction &>(column).getData()[row_num]);
}
/// Reads one aggregate function state from `protobuf` into `column`.
/// A fresh state is allocated in the column's arena and created before reading.
/// If the reader reports that nothing was read, the state is destroyed, the column is left
/// unchanged and `row_added` stays false.
/// With `allow_add_row` the new state is appended as a new row and `row_added` is set to true;
/// otherwise the pointer stored in the last row is replaced by the new state.
void DataTypeAggregateFunction::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const
{
row_added = false;
ColumnAggregateFunction & column_concrete = static_cast<ColumnAggregateFunction &>(column);
Arena & arena = column_concrete.createOrGetArena();
size_t size_of_state = function->sizeOfData();
AggregateDataPtr place = arena.alignedAlloc(size_of_state, function->alignOfData());
function->create(place);
try
{
if (!protobuf.readAggregateFunction(function, place, arena))
{
/// Nothing to store: release the state created above.
function->destroy(place);
return;
}
auto & container = column_concrete.getData();
if (allow_add_row)
{
container.emplace_back(place);
row_added = true;
}
else
/// NOTE(review): the state previously referenced by the last row is overwritten without
/// being destroyed here - confirm that this cannot leak resources held by that state.
container.back() = place;
}
catch (...)
{
/// Destroy the newly created state before propagating the error.
function->destroy(place);
throw;
}
}
MutableColumnPtr DataTypeAggregateFunction::createColumn() const
{

View File

@ -57,6 +57,7 @@ public:
void serializeTextCSV(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
void deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const override;
void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const override;
void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override;
MutableColumnPtr createColumn() const override;

View File

@ -6,6 +6,7 @@
#include <IO/WriteBufferFromString.h>
#include <Formats/FormatSettings.h>
#include <Formats/ProtobufReader.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeFactory.h>
@ -442,6 +443,36 @@ void DataTypeArray::serializeProtobuf(const IColumn & column, size_t row_num, Pr
}
/// Reads array elements from `protobuf` and appends them to the nested column.
/// Elements are read one by one while the nested type keeps adding rows and the reader
/// reports that another value may follow. Afterwards either a new offset is appended
/// (`allow_add_row`, `row_added` becomes true) or the last offset is moved to the new end
/// of the nested column. On error the offsets and the nested column are rolled back.
void DataTypeArray::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const
{
    row_added = false;

    auto & array_column = static_cast<ColumnArray &>(column);
    IColumn & elements = array_column.getData();
    ColumnArray::Offsets & array_offsets = array_column.getOffsets();
    const size_t initial_offsets_count = array_offsets.size();

    try
    {
        for (;;)
        {
            bool element_added;
            nested->deserializeProtobuf(elements, protobuf, true, element_added);
            if (!element_added || !protobuf.maybeCanReadValue())
                break;
        }

        if (!allow_add_row)
        {
            /// Extend the last existing array up to the current end of the nested column.
            array_offsets.back() = elements.size();
        }
        else
        {
            array_offsets.emplace_back(elements.size());
            row_added = true;
        }
    }
    catch (...)
    {
        /// Restore the offsets first, then drop every element beyond the last kept offset.
        array_offsets.resize_assume_reserved(initial_offsets_count);
        elements.popBack(elements.size() - array_offsets.back());
        throw;
    }
}
MutableColumnPtr DataTypeArray::createColumn() const
{
return ColumnArray::create(nested->createColumn(), ColumnArray::ColumnOffsets::create());

View File

@ -87,6 +87,10 @@ public:
void serializeProtobuf(const IColumn & column,
size_t row_num,
ProtobufWriter & protobuf) const override;
void deserializeProtobuf(IColumn & column,
ProtobufReader & protobuf,
bool allow_add_row,
bool & row_added) const override;
MutableColumnPtr createColumn() const override;

View File

@ -4,6 +4,7 @@
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeFactory.h>
#include <Formats/ProtobufReader.h>
#include <Formats/ProtobufWriter.h>
@ -78,6 +79,23 @@ void DataTypeDate::serializeProtobuf(const IColumn & column, size_t row_num, Pro
protobuf.writeDate(DayNum(static_cast<const ColumnUInt16 &>(column).getData()[row_num]));
}
/// Reads a date from `protobuf`. If a value was read it is either appended as a new row
/// (`allow_add_row`, `row_added` becomes true) or written over the last row.
/// If nothing was read the column is left unchanged and `row_added` is false.
void DataTypeDate::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const
{
    row_added = false;

    DayNum day;
    if (protobuf.readDate(day))
    {
        auto & days = static_cast<ColumnUInt16 &>(column).getData();
        if (!allow_add_row)
        {
            days.back() = day;
            return;
        }
        days.emplace_back(day);
        row_added = true;
    }
}
bool DataTypeDate::equals(const IDataType & rhs) const
{
return typeid(rhs) == typeid(*this);

View File

@ -22,6 +22,7 @@ public:
void serializeTextCSV(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
void deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const override;
void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const override;
void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override;
bool canBeUsedAsVersion() const override { return true; }
bool canBeInsideNullable() const override { return true; }

View File

@ -6,6 +6,7 @@
#include <Common/typeid_cast.h>
#include <Columns/ColumnsNumber.h>
#include <Formats/FormatSettings.h>
#include <Formats/ProtobufReader.h>
#include <Formats/ProtobufWriter.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeFactory.h>
@ -144,6 +145,23 @@ void DataTypeDateTime::serializeProtobuf(const IColumn & column, size_t row_num,
protobuf.writeDateTime(static_cast<const ColumnUInt32 &>(column).getData()[row_num]);
}
/// Reads a date-time value from `protobuf`. A successfully read value is appended as a new row
/// when `allow_add_row` is set (and `row_added` becomes true), otherwise it replaces the last row.
/// When the reader yields nothing, the column is untouched and `row_added` is false.
void DataTypeDateTime::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const
{
    row_added = false;

    time_t timestamp;
    const bool has_value = protobuf.readDateTime(timestamp);
    if (!has_value)
        return;

    auto & timestamps = static_cast<ColumnUInt32 &>(column).getData();
    if (!allow_add_row)
    {
        timestamps.back() = timestamp;
        return;
    }

    timestamps.emplace_back(timestamp);
    row_added = true;
}
bool DataTypeDateTime::equals(const IDataType & rhs) const
{
/// DateTime with different timezones are equal, because:

View File

@ -47,6 +47,7 @@ public:
void serializeTextCSV(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
void deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const override;
void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const override;
void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override;
bool canBeUsedAsVersion() const override { return true; }
bool canBeInsideNullable() const override { return true; }

View File

@ -1,5 +1,6 @@
#include <IO/WriteBufferFromString.h>
#include <Formats/FormatSettings.h>
#include <Formats/ProtobufReader.h>
#include <Formats/ProtobufWriter.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeFactory.h>
@ -230,6 +231,25 @@ void DataTypeEnum<Type>::serializeProtobuf(const IColumn & column, size_t row_nu
protobuf.writeEnum(static_cast<const ColumnType &>(column).getData()[row_num]);
}
/// Reads an enum value from `protobuf`. The reader is first given this type's `values`
/// via prepareEnumMapping(). A value that was read is appended as a new row when
/// `allow_add_row` is set (`row_added` becomes true), otherwise it overwrites the last row.
template <typename Type>
void DataTypeEnum<Type>::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const
{
    protobuf.prepareEnumMapping(values);
    row_added = false;

    Type enum_value;
    if (protobuf.readEnum(enum_value))
    {
        auto & stored_values = static_cast<ColumnType &>(column).getData();
        if (allow_add_row)
        {
            stored_values.emplace_back(enum_value);
            row_added = true;
            return;
        }
        stored_values.back() = enum_value;
    }
}
template <typename Type>
Field DataTypeEnum<Type>::getDefault() const
{

View File

@ -106,6 +106,7 @@ public:
void deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, const size_t limit, const double avg_value_size_hint) const override;
void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const override;
void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override;
MutableColumnPtr createColumn() const override { return ColumnType::create(); }

View File

@ -3,6 +3,7 @@
#include <Columns/ColumnConst.h>
#include <Formats/FormatSettings.h>
#include <Formats/ProtobufReader.h>
#include <Formats/ProtobufWriter.h>
#include <DataTypes/DataTypeFixedString.h>
#include <DataTypes/DataTypeFactory.h>
@ -122,30 +123,37 @@ void DataTypeFixedString::serializeTextEscaped(const IColumn & column, size_t ro
}
static inline void alignStringLength(const DataTypeFixedString & type,
ColumnFixedString::Chars & data,
size_t string_start)
{
size_t length = data.size() - string_start;
if (length < type.getN())
{
data.resize_fill(string_start + type.getN());
}
else if (length > type.getN())
{
data.resize_assume_reserved(string_start);
throw Exception("Too large value for " + type.getName(), ErrorCodes::TOO_LARGE_STRING_SIZE);
}
}
template <typename Reader>
static inline void read(const DataTypeFixedString & self, IColumn & column, Reader && reader)
{
ColumnFixedString::Chars & data = typeid_cast<ColumnFixedString &>(column).getChars();
size_t prev_size = data.size();
try
{
reader(data);
alignStringLength(self, data, prev_size);
}
catch (...)
{
data.resize_assume_reserved(prev_size);
throw;
}
if (data.size() < prev_size + self.getN())
data.resize_fill(prev_size + self.getN());
if (data.size() > prev_size + self.getN())
{
data.resize_assume_reserved(prev_size);
throw Exception("Too large value for " + self.getName(), ErrorCodes::TOO_LARGE_STRING_SIZE);
}
}
@ -208,6 +216,44 @@ void DataTypeFixedString::serializeProtobuf(const IColumn & column, size_t row_n
}
/// Reads a string from `protobuf` and stores it as a fixed-length value in `column`.
/// With `allow_add_row` the bytes are read directly at the end of the column's character
/// buffer, aligned to the fixed length, and `row_added` is set to true.
/// Without it, the value is read into a temporary buffer and replaces the last row only
/// if something was actually read; `row_added` stays false.
/// On any exception the character buffer is truncated back to `old_size`.
void DataTypeFixedString::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const
{
row_added = false;
auto & column_string = static_cast<ColumnFixedString &>(column);
ColumnFixedString::Chars & data = column_string.getChars();
size_t old_size = data.size();
try
{
if (allow_add_row)
{
if (protobuf.readStringInto(data))
{
alignStringLength(*this, data, old_size);
row_added = true;
}
else
/// Nothing was read: restore the buffer to its size before the call.
data.resize_assume_reserved(old_size);
}
else
{
/// Read into a scratch buffer so the existing last row is untouched unless a value arrives.
ColumnFixedString::Chars temp_data;
if (protobuf.readStringInto(temp_data))
{
alignStringLength(*this, temp_data, 0);
/// Remove the last row and remember the new size, so that the catch block below
/// truncates to the state after the removal.
column_string.popBack(1);
old_size = data.size();
data.insertSmallAllowReadWriteOverflow15(temp_data.begin(), temp_data.end());
}
}
}
catch (...)
{
data.resize_assume_reserved(old_size);
throw;
}
}
MutableColumnPtr DataTypeFixedString::createColumn() const
{
return ColumnFixedString::create(n);

View File

@ -65,6 +65,7 @@ public:
void deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings &) const override;
void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const override;
void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override;
MutableColumnPtr createColumn() const override;

View File

@ -729,27 +729,43 @@ void DataTypeLowCardinality::deserializeBinary(Field & field, ReadBuffer & istr)
dictionary_type->deserializeBinary(field, istr);
}
template <typename OutputStream, typename ... Args>
/// Reads a value from `protobuf` into a low cardinality column.
/// With `allow_add_row` the work is delegated to deserializeImpl() with the dictionary type's
/// deserializeProtobuf, which also sets `row_added`.
/// Without it, the value of the last row is copied into a temporary column of the dictionary's
/// nested type, the dictionary type is asked to update that single value in place, and the
/// last row of the column is then replaced by the result; `row_added` stays false.
void DataTypeLowCardinality::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const
{
if (allow_add_row)
{
deserializeImpl(column, &IDataType::deserializeProtobuf, protobuf, true, row_added);
return;
}
row_added = false;
auto & low_cardinality_column= getColumnLowCardinality(column);
auto nested_column = low_cardinality_column.getDictionary().getNestedColumn();
auto temp_column = nested_column->cloneEmpty();
/// Position in the dictionary of the value currently held by the last row.
size_t unique_row_number = low_cardinality_column.getIndexes().getUInt(low_cardinality_column.size() - 1);
temp_column->insertFrom(*nested_column, unique_row_number);
/// The nested call is made with allow_add_row = false, so its row_added result is not needed.
bool dummy;
dictionary_type.get()->deserializeProtobuf(*temp_column, protobuf, false, dummy);
/// Replace the last row with the (possibly updated) value from the temporary column.
low_cardinality_column.popBack(1);
low_cardinality_column.insertFromFullColumn(*temp_column, 0);
}
template <typename... Params, typename... Args>
void DataTypeLowCardinality::serializeImpl(
const IColumn & column, size_t row_num,
DataTypeLowCardinality::SerializeFunctionPtr<OutputStream, Args ...> func,
OutputStream & ostr, Args & ... args) const
const IColumn & column, size_t row_num, DataTypeLowCardinality::SerializeFunctionPtr<Params...> func, Args &&... args) const
{
auto & low_cardinality_column = getColumnLowCardinality(column);
size_t unique_row_number = low_cardinality_column.getIndexes().getUInt(row_num);
(dictionary_type.get()->*func)(*low_cardinality_column.getDictionary().getNestedColumn(), unique_row_number, ostr, std::forward<Args>(args)...);
(dictionary_type.get()->*func)(*low_cardinality_column.getDictionary().getNestedColumn(), unique_row_number, std::forward<Args>(args)...);
}
template <typename ... Args>
template <typename... Params, typename... Args>
void DataTypeLowCardinality::deserializeImpl(
IColumn & column,
DataTypeLowCardinality::DeserializeFunctionPtr<Args ...> func,
ReadBuffer & istr, Args & ... args) const
IColumn & column, DataTypeLowCardinality::DeserializeFunctionPtr<Params...> func, Args &&... args) const
{
auto & low_cardinality_column= getColumnLowCardinality(column);
auto temp_column = low_cardinality_column.getDictionary().getNestedColumn()->cloneEmpty();
(dictionary_type.get()->*func)(*temp_column, istr, std::forward<Args>(args)...);
(dictionary_type.get()->*func)(*temp_column, std::forward<Args>(args)...);
low_cardinality_column.insertFromFullColumn(*temp_column, 0);
}

View File

@ -115,6 +115,8 @@ public:
serializeImpl(column, row_num, &IDataType::serializeProtobuf, protobuf);
}
void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override;
MutableColumnPtr createColumn() const override;
Field getDefault() const override { return dictionary_type->getDefault(); }
@ -148,19 +150,17 @@ public:
private:
template <typename OutputStream, typename ... Args>
using SerializeFunctionPtr = void (IDataType::*)(const IColumn &, size_t, OutputStream &, Args & ...) const;
template <typename ... Params>
using SerializeFunctionPtr = void (IDataType::*)(const IColumn &, size_t, Params ...) const;
template <typename OutputStream, typename ... Args>
void serializeImpl(const IColumn & column, size_t row_num, SerializeFunctionPtr<OutputStream, Args ...> func,
OutputStream & ostr, Args & ... args) const;
template <typename... Params, typename... Args>
void serializeImpl(const IColumn & column, size_t row_num, SerializeFunctionPtr<Params...> func, Args &&... args) const;
template <typename ... Args>
using DeserializeFunctionPtr = void (IDataType::*)(IColumn &, ReadBuffer &, Args & ...) const;
template <typename ... Params>
using DeserializeFunctionPtr = void (IDataType::*)(IColumn &, Params ...) const;
template <typename ... Args>
void deserializeImpl(IColumn & column, DeserializeFunctionPtr<Args ...> func,
ReadBuffer & istr, Args & ... args) const;
template <typename ... Params, typename... Args>
void deserializeImpl(IColumn & column, DeserializeFunctionPtr<Params...> func, Args &&... args) const;
template <typename Creator>
static MutableColumnUniquePtr createColumnUniqueImpl(const IDataType & keys_type, const Creator & creator);

View File

@ -318,6 +318,26 @@ void DataTypeNullable::serializeProtobuf(const IColumn & column, size_t row_num,
nested_data_type->serializeProtobuf(col.getNestedColumn(), row_num, protobuf);
}
/// Reads a value from `protobuf` into the nested column of a nullable column.
/// When the nested type reports an added row, a 0 entry is appended to the null map.
/// On error both the nested column and the null map are cut back to the original
/// nested size and `row_added` is reset to false.
void DataTypeNullable::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const
{
    auto & nullable_column = static_cast<ColumnNullable &>(column);
    IColumn & values = nullable_column.getNestedColumn();
    const size_t initial_size = values.size();

    try
    {
        nested_data_type->deserializeProtobuf(values, protobuf, allow_add_row, row_added);
        if (!row_added)
            return;

        nullable_column.getNullMapData().push_back(0);
    }
    catch (...)
    {
        values.popBack(values.size() - initial_size);
        nullable_column.getNullMapData().resize_assume_reserved(initial_size);
        row_added = false;
        throw;
    }
}
MutableColumnPtr DataTypeNullable::createColumn() const
{
return ColumnNullable::create(nested_data_type->createColumn(), ColumnUInt8::create());

View File

@ -71,6 +71,7 @@ public:
void serializeTextXML(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const override;
void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override;
MutableColumnPtr createColumn() const override;

View File

@ -7,6 +7,7 @@
#include <Common/NaNUtils.h>
#include <Common/typeid_cast.h>
#include <Formats/FormatSettings.h>
#include <Formats/ProtobufReader.h>
#include <Formats/ProtobufWriter.h>
@ -210,6 +211,25 @@ void DataTypeNumberBase<T>::serializeProtobuf(const IColumn & column, size_t row
}
/// Reads a number of type T from `protobuf`. A value that was read is appended as a new row
/// when `allow_add_row` is set (`row_added` becomes true); otherwise it replaces the last row.
/// If the reader yields nothing, the column is unchanged and `row_added` is false.
template <typename T>
void DataTypeNumberBase<T>::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const
{
    row_added = false;

    T number;
    if (protobuf.readNumber(number))
    {
        auto & numbers = typeid_cast<ColumnVector<T> &>(column).getData();
        if (!allow_add_row)
        {
            numbers.back() = number;
            return;
        }
        numbers.emplace_back(number);
        row_added = true;
    }
}
template <typename T>
MutableColumnPtr DataTypeNumberBase<T>::createColumn() const
{

View File

@ -37,6 +37,7 @@ public:
void deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint) const override;
void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const override;
void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override;
MutableColumnPtr createColumn() const override;

View File

@ -7,6 +7,7 @@
#include <Common/typeid_cast.h>
#include <Formats/FormatSettings.h>
#include <Formats/ProtobufReader.h>
#include <Formats/ProtobufWriter.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeFactory.h>
@ -237,10 +238,8 @@ static inline void read(IColumn & column, Reader && reader)
ColumnString & column_string = static_cast<ColumnString &>(column);
ColumnString::Chars & data = column_string.getChars();
ColumnString::Offsets & offsets = column_string.getOffsets();
size_t old_chars_size = data.size();
size_t old_offsets_size = offsets.size();
try
{
reader(data);
@ -310,6 +309,48 @@ void DataTypeString::serializeProtobuf(const IColumn & column, size_t row_num, P
}
/// Reads a string from `protobuf` into a string column.
/// With `allow_add_row` the bytes are read directly at the end of the character buffer,
/// followed by a zero byte and a new offset; `row_added` is set to true.
/// Without it, the value is read into a temporary buffer and replaces the last row only
/// if something was actually read; `row_added` stays false.
/// On any exception the offsets are truncated to `old_size` and the character buffer to the
/// last remaining offset.
void DataTypeString::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const
{
row_added = false;
auto & column_string = static_cast<ColumnString &>(column);
ColumnString::Chars & data = column_string.getChars();
ColumnString::Offsets & offsets = column_string.getOffsets();
size_t old_size = offsets.size();
try
{
if (allow_add_row)
{
if (protobuf.readStringInto(data))
{
/// Append the zero byte that follows every string, then record the end of the new row.
data.emplace_back(0);
offsets.emplace_back(data.size());
row_added = true;
}
else
/// Nothing was read: cut the buffer back to the end of the last stored row.
data.resize_assume_reserved(offsets.back());
}
else
{
/// Read into a scratch buffer so the existing last row is untouched unless a value arrives.
ColumnString::Chars temp_data;
if (protobuf.readStringInto(temp_data))
{
temp_data.emplace_back(0);
/// Remove the last row and remember the new row count, so that the catch block below
/// rolls back to the state after the removal.
column_string.popBack(1);
old_size = offsets.size();
data.insertSmallAllowReadWriteOverflow15(temp_data.begin(), temp_data.end());
offsets.emplace_back(data.size());
}
}
}
catch (...)
{
offsets.resize_assume_reserved(old_size);
data.resize_assume_reserved(offsets.back());
throw;
}
}
MutableColumnPtr DataTypeString::createColumn() const
{
return ColumnString::create();

View File

@ -46,6 +46,7 @@ public:
void deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings &) const override;
void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const override;
void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override;
MutableColumnPtr createColumn() const override;

View File

@ -413,6 +413,22 @@ void DataTypeTuple::serializeProtobuf(const IColumn & column, size_t row_num, Pr
elems[i]->serializeProtobuf(extractElementColumn(column, i), row_num, protobuf);
}
/// Reads every tuple element from `protobuf` in order, each into its own element column.
/// The reads run inside addElementSafe(); `row_added` is true only when every element
/// reported that it added a row.
void DataTypeTuple::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const
{
    row_added = false;

    bool every_element_added = true;
    addElementSafe(elems, column, [&]
    {
        for (size_t i = 0, count = elems.size(); i != count; ++i)
        {
            bool element_added;
            elems[i]->deserializeProtobuf(extractElementColumn(column, i), protobuf, allow_add_row, element_added);
            if (!element_added)
                every_element_added = false;
        }
    });

    row_added = every_element_added;
}
MutableColumnPtr DataTypeTuple::createColumn() const
{
size_t size = elems.size();

View File

@ -78,6 +78,7 @@ public:
DeserializeBinaryBulkStatePtr & state) const override;
void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const override;
void deserializeProtobuf(IColumn & column, ProtobufReader & reader, bool allow_add_row, bool & row_added) const override;
MutableColumnPtr createColumn() const override;

View File

@ -1,6 +1,7 @@
#include <DataTypes/DataTypeUUID.h>
#include <DataTypes/DataTypeFactory.h>
#include <Columns/ColumnsNumber.h>
#include <Formats/ProtobufReader.h>
#include <Formats/ProtobufWriter.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
@ -77,6 +78,22 @@ void DataTypeUUID::serializeProtobuf(const IColumn & column, size_t row_num, Pro
protobuf.writeUUID(UUID(static_cast<const ColumnUInt128 &>(column).getData()[row_num]));
}
/// Reads a UUID from `protobuf`. A value that was read is appended as a new row when
/// `allow_add_row` is set (`row_added` becomes true); otherwise it replaces the last row.
/// If nothing was read, the column is unchanged and `row_added` is false.
void DataTypeUUID::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const
{
    row_added = false;

    UUID parsed_uuid;
    const bool has_value = protobuf.readUUID(parsed_uuid);
    if (!has_value)
        return;

    auto & uuids = static_cast<ColumnUInt128 &>(column).getData();
    if (allow_add_row)
    {
        uuids.emplace_back(parsed_uuid);
        row_added = true;
        return;
    }

    uuids.back() = parsed_uuid;
}
bool DataTypeUUID::equals(const IDataType & rhs) const
{

View File

@ -25,6 +25,7 @@ public:
void serializeTextCSV(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
void deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const override;
void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const override;
void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override;
bool canBeUsedInBitOperations() const override { return true; }
bool canBeInsideNullable() const override { return true; }

View File

@ -2,6 +2,7 @@
#include <Common/typeid_cast.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypeFactory.h>
#include <Formats/ProtobufReader.h>
#include <Formats/ProtobufWriter.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
@ -141,6 +142,25 @@ void DataTypeDecimal<T>::serializeProtobuf(const IColumn & column, size_t row_nu
}
/// Reads a decimal from `protobuf` using this type's precision and scale. A value that was
/// read is appended as a new row when `allow_add_row` is set (`row_added` becomes true);
/// otherwise it replaces the last row. If nothing was read, the column is unchanged.
template <typename T>
void DataTypeDecimal<T>::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const
{
    row_added = false;

    T parsed;
    if (protobuf.readDecimal(parsed, precision, scale))
    {
        auto & decimals = static_cast<ColumnType &>(column).getData();
        if (!allow_add_row)
        {
            decimals.back() = parsed;
            return;
        }
        decimals.emplace_back(parsed);
        row_added = true;
    }
}
template <typename T>
Field DataTypeDecimal<T>::getDefault() const
{

View File

@ -101,6 +101,7 @@ public:
void deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint) const override;
void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const override;
void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override;
Field getDefault() const override;
bool canBePromoted() const override { return true; }

View File

@ -23,6 +23,7 @@ using MutableColumnPtr = COWPtr<IColumn>::MutablePtr;
using DataTypePtr = std::shared_ptr<const IDataType>;
using DataTypes = std::vector<DataTypePtr>;
class ProtobufReader;
class ProtobufWriter;
@ -254,6 +255,7 @@ public:
/** Serialize to a protobuf. */
virtual void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const = 0;
virtual void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const = 0;
protected:
virtual String doGetName() const;

View File

@ -27,8 +27,9 @@ public:
void serializeBinaryBulk(const IColumn &, WriteBuffer &, size_t, size_t) const override { throwNoSerialization(); }
void deserializeBinaryBulk(IColumn &, ReadBuffer &, size_t, double) const override { throwNoSerialization(); }
void serializeText(const IColumn &, size_t, WriteBuffer &, const FormatSettings &) const override { throwNoSerialization(); }
void deserializeText(IColumn &, ReadBuffer &, const FormatSettings &) const override { throwNoSerialization(); }
void serializeProtobuf(const IColumn &, size_t, ProtobufWriter &) const override { throwNoSerialization(); }
void deserializeText(IColumn &, ReadBuffer &, const FormatSettings &) const override { throwNoSerialization(); }
void serializeProtobuf(const IColumn &, size_t, ProtobufWriter &) const override { throwNoSerialization(); }
void deserializeProtobuf(IColumn &, ProtobufReader &, bool, bool &) const override { throwNoSerialization(); }
MutableColumnPtr createColumn() const override
{
@ -50,4 +51,3 @@ public:
};
}

View File

@ -114,6 +114,7 @@ void registerInputFormatJSONEachRow(FormatFactory & factory);
void registerOutputFormatJSONEachRow(FormatFactory & factory);
void registerInputFormatParquet(FormatFactory & factory);
void registerOutputFormatParquet(FormatFactory & factory);
void registerInputFormatProtobuf(FormatFactory & factory);
void registerOutputFormatProtobuf(FormatFactory & factory);
/// Output only (presentational) formats.
@ -150,6 +151,7 @@ FormatFactory::FormatFactory()
registerOutputFormatTSKV(*this);
registerInputFormatJSONEachRow(*this);
registerOutputFormatJSONEachRow(*this);
registerInputFormatProtobuf(*this);
registerOutputFormatProtobuf(*this);
registerInputFormatCapnProto(*this);
registerInputFormatParquet(*this);

View File

@ -20,7 +20,7 @@ FormatSchemaInfo::FormatSchemaInfo(const Context & context, const String & schem
{
throw Exception(
"Format schema requires the 'format_schema' setting to have the 'schema_file:message_name' format"
+ (schema_file_extension.empty() ? "" : "e.g. 'schema." + schema_file_extension + ":Message'"),
+ (schema_file_extension.empty() ? "" : ", e.g. 'schema." + schema_file_extension + ":Message'"),
ErrorCodes::BAD_ARGUMENTS);
}
return;
@ -29,11 +29,11 @@ FormatSchemaInfo::FormatSchemaInfo(const Context & context, const String & schem
size_t colon_pos = format_schema.find(':');
Poco::Path path;
if ((colon_pos == String::npos) || (colon_pos == 0) || (colon_pos == format_schema.length() - 1)
|| path.assign(format_schema.substr(0, colon_pos)).getFileName().empty())
|| path.assign(format_schema.substr(0, colon_pos)).makeFile().getFileName().empty())
{
throw Exception(
"Format schema requires the 'format_schema' setting to have the 'schema_file:message_name' format"
+ (schema_file_extension.empty() ? "" : "e.g. 'schema." + schema_file_extension + ":Message'") + ". Got '" + format_schema
+ (schema_file_extension.empty() ? "" : ", e.g. 'schema." + schema_file_extension + ":Message'") + ". Got '" + format_schema
+ "'",
ErrorCodes::BAD_ARGUMENTS);
}

View File

@ -0,0 +1,58 @@
#include <Common/config.h>
#if USE_PROTOBUF
#include <Common/Exception.h>
#include <Formats/ProtobufColumnMatcher.h>
#include <google/protobuf/descriptor.h>
#include <google/protobuf/descriptor.pb.h>
#include <Poco/String.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NO_COMMON_COLUMNS_WITH_PROTOBUF_SCHEMA;
}
namespace
{
/// Converts a name to the form used for lookups: upper-cased, with every "." replaced by "_".
String columnNameToSearchableForm(const String & str)
{
    const String upper_cased = Poco::toUpper(str);
    return Poco::replace(upper_cased, ".", "_");
}
}
namespace ProtobufColumnMatcher
{
namespace details
{
/// Builds the lookup table from the searchable form of each column name to its index
/// and marks every column as not used yet. If two names share the same searchable form,
/// the first one keeps its entry.
ColumnNameMatcher::ColumnNameMatcher(const std::vector<String> & column_names)
    : column_usage(column_names.size(), false)
{
    size_t index = 0;
    for (const String & name : column_names)
    {
        column_name_to_index_map.emplace(columnNameToSearchableForm(name), index);
        ++index;
    }
}
/// Returns the index of the column matching `field_name` and marks that column as used.
/// Returns size_t(-1) when no column matches or the matching column was already used.
size_t ColumnNameMatcher::findColumn(const String & field_name)
{
    constexpr size_t not_found = static_cast<size_t>(-1);

    const auto found = column_name_to_index_map.find(columnNameToSearchableForm(field_name));
    if (found == column_name_to_index_map.end())
        return not_found;

    const size_t column_index = found->second;
    if (column_usage[column_index])
        return not_found;

    column_usage[column_index] = true;
    return column_index;
}
void throwNoCommonColumns()
{
throw Exception("No common columns with provided protobuf schema", ErrorCodes::NO_COMMON_COLUMNS_WITH_PROTOBUF_SCHEMA);
}
}
}
}
#endif

View File

@ -0,0 +1,160 @@
#pragma once
#include <Common/config.h>
#if USE_PROTOBUF
#include <memory>
#include <unordered_map>
#include <vector>
#include <Core/Types.h>
#include <boost/blank.hpp>
#include <google/protobuf/descriptor.h>
#include <google/protobuf/descriptor.pb.h>
namespace google
{
namespace protobuf
{
class Descriptor;
class FieldDescriptor;
}
}
namespace DB
{
namespace ProtobufColumnMatcher
{
/// Default traits for Message and Field: both per-message and per-field
/// user data are the empty type boost::blank, i.e. nothing extra is attached.
struct DefaultTraits
{
using MessageData = boost::blank;
using FieldData = boost::blank;
};
template <typename Traits = DefaultTraits>
struct Message;
/// Represents a field in a protobuf message.
/// `Traits::FieldData` selects the type of the user data stored in `data`.
template <typename Traits = DefaultTraits>
struct Field
{
const google::protobuf::FieldDescriptor * field_descriptor = nullptr;
/// Same as field_descriptor->number().
UInt32 field_number = 0;
/// Index of a column; either 'column_index' or 'nested_message' is set.
/// The initializer -1 converts to the maximum size_t value, which serves as "no column".
size_t column_index = -1;
/// Sub-message description for message/group fields; null for fields mapped to a column.
std::unique_ptr<Message<Traits>> nested_message;
/// User data attached to this field.
typename Traits::FieldData data;
};
/// Represents a protobuf message.
/// `Traits::MessageData` selects the type of the user data stored in `data`.
template <typename Traits>
struct Message
{
/// Fields of this message that were kept by the matching.
std::vector<Field<Traits>> fields;
/// Points to the parent message if this is a nested message.
Message * parent = nullptr;
/// Position of the owning field inside parent->fields; stays at the
/// maximum size_t value (from -1) unless it is assigned.
size_t index_in_parent = -1;
/// User data attached to this message.
typename Traits::MessageData data;
};
/// Utility function finding matching columns for each protobuf field.
template <typename Traits = DefaultTraits>
static std::unique_ptr<Message<Traits>> matchColumns(
const std::vector<String> & column_names,
const google::protobuf::Descriptor * message_type);
namespace details
{
void throwNoCommonColumns();
/// Helper that maps protobuf field names to column indices and remembers
/// which columns have already been handed out.
class ColumnNameMatcher
{
public:
/// `column_names` are the names of the columns, in column order.
ColumnNameMatcher(const std::vector<String> & column_names);
/// Returns the index of the column matching `field_name`, or
/// static_cast<size_t>(-1) if there is none or it was already returned before.
size_t findColumn(const String & field_name);
private:
/// Normalized column name -> index in the vector passed to the constructor.
std::unordered_map<String, size_t> column_name_to_index_map;
/// column_usage[i] is true once column i has been returned by findColumn().
std::vector<bool> column_usage;
};
/// Walks the fields of `message_type` and builds the tree of fields that could
/// be matched with a column.
///  - Message and group fields are descended into, with the name prefix
///    extended by "<field name>."; they are kept only if the nested message
///    produced at least one field.
///  - All other fields are looked up by `field_name_prefix` + field name.
/// Returns nullptr when nothing was matched at this level or below.
/// NOTE(review): nothing here stops the descent for a message type that
/// contains itself (directly or indirectly) — confirm such schemas cannot reach
/// this function.
template <typename Traits>
std::unique_ptr<Message<Traits>> matchColumnsRecursive(
    ColumnNameMatcher & name_matcher,
    const google::protobuf::Descriptor * message_type,
    const String & field_name_prefix)
{
    using FieldDescriptor = google::protobuf::FieldDescriptor;

    auto result = std::make_unique<Message<Traits>>();
    auto & matched = result->fields;

    const int total_fields = message_type->field_count();
    for (int pos = 0; pos < total_fields; ++pos)
    {
        const FieldDescriptor * descriptor = message_type->field(pos);
        const auto kind = descriptor->type();
        const bool has_subfields = (kind == FieldDescriptor::TYPE_MESSAGE) || (kind == FieldDescriptor::TYPE_GROUP);

        if (has_subfields)
        {
            auto child = matchColumnsRecursive<Traits>(
                name_matcher, descriptor->message_type(), field_name_prefix + descriptor->name() + ".");
            if (!child)
                continue;

            child->parent = result.get();
            matched.emplace_back();
            auto & added = matched.back();
            added.field_number = descriptor->number();
            added.field_descriptor = descriptor;
            added.nested_message = std::move(child);
            continue;
        }

        const size_t column_index = name_matcher.findColumn(field_name_prefix + descriptor->name());
        if (column_index == static_cast<size_t>(-1))
            continue;

        matched.emplace_back();
        auto & added = matched.back();
        added.field_number = descriptor->number();
        added.field_descriptor = descriptor;
        added.column_index = column_index;
    }

    if (matched.empty())
        return nullptr;

    /// Fields must be ordered by field number: this is required for writing
    /// protobufs and is useful for reading them.
    std::sort(matched.begin(), matched.end(), [](const Field<Traits> & lhs, const Field<Traits> & rhs)
    {
        return lhs.field_number < rhs.field_number;
    });

    /// Positions are only final after sorting, so record them now.
    size_t position = 0;
    for (auto & field : matched)
    {
        if (field.nested_message)
            field.nested_message->index_in_parent = position;
        ++position;
    }

    return result;
}
}
/// Matches `column_names` against the fields of `message_type` and returns the
/// resulting message tree. Calls details::throwNoCommonColumns() when not a
/// single field could be matched.
template <typename Traits>
static std::unique_ptr<Message<Traits>> matchColumns(
    const std::vector<String> & column_names,
    const google::protobuf::Descriptor * message_type)
{
    details::ColumnNameMatcher matcher(column_names);
    auto root = details::matchColumnsRecursive<Traits>(matcher, message_type, "");
    if (root == nullptr)
        details::throwNoCommonColumns();
    return root;
}
}
}
#endif

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,220 @@
#pragma once
#include <common/DayNum.h>
#include <Common/PODArray.h>
#include <Common/UInt128.h>
#include <Core/UUID.h>
#include <Common/config.h>
#if USE_PROTOBUF
#include <boost/noncopyable.hpp>
#include <Formats/ProtobufColumnMatcher.h>
#include <memory>
namespace google
{
namespace protobuf
{
class Descriptor;
}
}
namespace DB
{
class Arena;
class IAggregateFunction;
class ReadBuffer;
using AggregateDataPtr = char *;
using AggregateFunctionPtr = std::shared_ptr<IAggregateFunction>;
/** Deserializes a protobuf, tries to cast data types if necessary.
* Every public read*() / prepareEnumMapping() method forwards to the converter
* that is current at the time of the call (`current_converter`).
*/
class ProtobufReader : private boost::noncopyable
{
public:
/// `in_` is the buffer to read from, `message_type` describes the protobuf message,
/// `column_names` are the names of the columns to be filled.
ProtobufReader(ReadBuffer & in_, const google::protobuf::Descriptor * message_type, const std::vector<String> & column_names);
~ProtobufReader();
/// Should be called when we start reading a new message.
bool startMessage();
/// Ends reading a message.
void endMessage();
/// Reads the column index.
/// The function returns false if there are no more columns to read (call endMessage() in this case).
bool readColumnIndex(size_t & column_index);
/// Reads a value which should be put to column at index received with readColumnIndex().
/// The function returns false if there are no more values to read now (call readColumnIndex() in this case).
bool readNumber(Int8 & value) { return current_converter->readInt8(value); }
bool readNumber(UInt8 & value) { return current_converter->readUInt8(value); }
bool readNumber(Int16 & value) { return current_converter->readInt16(value); }
bool readNumber(UInt16 & value) { return current_converter->readUInt16(value); }
bool readNumber(Int32 & value) { return current_converter->readInt32(value); }
bool readNumber(UInt32 & value) { return current_converter->readUInt32(value); }
bool readNumber(Int64 & value) { return current_converter->readInt64(value); }
bool readNumber(UInt64 & value) { return current_converter->readUInt64(value); }
bool readNumber(UInt128 & value) { return current_converter->readUInt128(value); }
bool readNumber(Float32 & value) { return current_converter->readFloat32(value); }
bool readNumber(Float64 & value) { return current_converter->readFloat64(value); }
bool readStringInto(PaddedPODArray<UInt8> & str) { return current_converter->readStringInto(str); }
/// Passes the (name, value) pairs of an enum to the current converter;
/// one overload per underlying enum value type (Int8 / Int16).
void prepareEnumMapping(const std::vector<std::pair<std::string, Int8>> & name_value_pairs) { current_converter->prepareEnumMapping8(name_value_pairs); }
void prepareEnumMapping(const std::vector<std::pair<std::string, Int16>> & name_value_pairs) { current_converter->prepareEnumMapping16(name_value_pairs); }
bool readEnum(Int8 & value) { return current_converter->readEnum8(value); }
bool readEnum(Int16 & value) { return current_converter->readEnum16(value); }
bool readUUID(UUID & uuid) { return current_converter->readUUID(uuid); }
bool readDate(DayNum & date) { return current_converter->readDate(date); }
bool readDateTime(time_t & tm) { return current_converter->readDateTime(tm); }
bool readDecimal(Decimal32 & decimal, UInt32 precision, UInt32 scale) { return current_converter->readDecimal32(decimal, precision, scale); }
bool readDecimal(Decimal64 & decimal, UInt32 precision, UInt32 scale) { return current_converter->readDecimal64(decimal, precision, scale); }
bool readDecimal(Decimal128 & decimal, UInt32 precision, UInt32 scale) { return current_converter->readDecimal128(decimal, precision, scale); }
bool readAggregateFunction(const AggregateFunctionPtr & function, AggregateDataPtr place, Arena & arena) { return current_converter->readAggregateFunction(function, place, arena); }
/// When it returns false there are no more values left and from now on all the read<Type>() functions will return false
/// until readColumnIndex() is called. When it returns true it's unclear.
bool maybeCanReadValue() const { return simple_reader.maybeCanReadValue(); }
private:
/// Reader working directly on the input buffer (field numbers, integers,
/// fixed-size values, strings); it knows nothing about columns.
class SimpleReader
{
public:
SimpleReader(ReadBuffer & in_);
bool startMessage();
void endMessage();
void endRootMessage();
bool readFieldNumber(UInt32 & field_number);
bool readInt(Int64 & value);
bool readSInt(Int64 & value);
bool readUInt(UInt64 & value);
template<typename T> bool readFixed(T & value);
bool readStringInto(PaddedPODArray<UInt8> & str);
/// False exactly when field_end holds the REACHED_END sentinel.
bool maybeCanReadValue() const { return field_end != REACHED_END; }
private:
void readBinary(void* data, size_t size);
void ignore(UInt64 num_bytes);
void moveCursorBackward(UInt64 num_bytes);
UInt64 readVarint();
void ignoreVarint();
void ignoreGroup();
/// Sentinel value of field_end, see maybeCanReadValue().
static constexpr UInt64 REACHED_END = 0;
ReadBuffer & in;
UInt64 cursor;
std::vector<UInt64> parent_message_ends;
UInt64 current_message_end;
UInt64 field_end;
};
/// Interface the public read*() methods are dispatched to; one virtual
/// method per target type.
class IConverter
{
public:
virtual ~IConverter() = default;
virtual bool readStringInto(PaddedPODArray<UInt8> &) = 0;
virtual bool readInt8(Int8&) = 0;
virtual bool readUInt8(UInt8 &) = 0;
virtual bool readInt16(Int16 &) = 0;
virtual bool readUInt16(UInt16 &) = 0;
virtual bool readInt32(Int32 &) = 0;
virtual bool readUInt32(UInt32 &) = 0;
virtual bool readInt64(Int64 &) = 0;
virtual bool readUInt64(UInt64 &) = 0;
virtual bool readUInt128(UInt128 &) = 0;
virtual bool readFloat32(Float32 &) = 0;
virtual bool readFloat64(Float64 &) = 0;
virtual void prepareEnumMapping8(const std::vector<std::pair<std::string, Int8>> &) = 0;
virtual void prepareEnumMapping16(const std::vector<std::pair<std::string, Int16>> &) = 0;
virtual bool readEnum8(Int8 &) = 0;
virtual bool readEnum16(Int16 &) = 0;
virtual bool readUUID(UUID &) = 0;
virtual bool readDate(DayNum &) = 0;
virtual bool readDateTime(time_t &) = 0;
virtual bool readDecimal32(Decimal32 &, UInt32, UInt32) = 0;
virtual bool readDecimal64(Decimal64 &, UInt32, UInt32) = 0;
virtual bool readDecimal128(Decimal128 &, UInt32, UInt32) = 0;
virtual bool readAggregateFunction(const AggregateFunctionPtr &, AggregateDataPtr, Arena &) = 0;
};
/// Converter implementations are only declared here; their definitions are not in this header.
class ConverterBaseImpl;
template <int type_id> class ConverterImpl;
class ConverterFromString;
template<int field_type_id, typename FromType> class ConverterFromNumber;
/// Traits plugged into ProtobufColumnMatcher: each matched field owns a converter,
/// each message owns a map from protobuf field number to its matched field.
struct ColumnMatcherTraits
{
struct FieldData
{
std::unique_ptr<IConverter> converter;
};
struct MessageData
{
std::unordered_map<UInt32, const ProtobufColumnMatcher::Field<ColumnMatcherTraits>*> field_number_to_field_map;
};
};
using Message = ProtobufColumnMatcher::Message<ColumnMatcherTraits>;
using Field = ProtobufColumnMatcher::Field<ColumnMatcherTraits>;
void setTraitsDataAfterMatchingColumns(Message * message);
SimpleReader simple_reader;
std::unique_ptr<Message> root_message;
Message* current_message = nullptr;
size_t current_field_index = 0;
/// Target of all public read*() calls; null until it is assigned.
IConverter* current_converter = nullptr;
};
}
#else
namespace DB
{
class Arena;
class IAggregateFunction;
class ReadBuffer;
using AggregateDataPtr = char *;
using AggregateFunctionPtr = std::shared_ptr<IAggregateFunction>;
/// Stand-in for ProtobufReader used when USE_PROTOBUF is off.
/// It offers the same reading methods as the real class so that callers
/// compile unchanged, but every read reports "nothing to read" (returns false)
/// and the void methods do nothing.
/// Parameters are intentionally left unnamed: none of them is used, and named
/// unused parameters trigger -Wunused-parameter.
class ProtobufReader
{
public:
    bool startMessage() { return false; }
    void endMessage() {}
    bool readColumnIndex(size_t &) { return false; }
    bool readNumber(Int8 &) { return false; }
    bool readNumber(UInt8 &) { return false; }
    bool readNumber(Int16 &) { return false; }
    bool readNumber(UInt16 &) { return false; }
    bool readNumber(Int32 &) { return false; }
    bool readNumber(UInt32 &) { return false; }
    bool readNumber(Int64 &) { return false; }
    bool readNumber(UInt64 &) { return false; }
    bool readNumber(UInt128 &) { return false; }
    bool readNumber(Float32 &) { return false; }
    bool readNumber(Float64 &) { return false; }
    bool readStringInto(PaddedPODArray<UInt8> &) { return false; }
    void prepareEnumMapping(const std::vector<std::pair<std::string, Int8>> &) {}
    void prepareEnumMapping(const std::vector<std::pair<std::string, Int16>> &) {}
    bool readEnum(Int8 &) { return false; }
    bool readEnum(Int16 &) { return false; }
    bool readUUID(UUID &) { return false; }
    bool readDate(DayNum &) { return false; }
    bool readDateTime(time_t &) { return false; }
    bool readDecimal(Decimal32 &, UInt32, UInt32) { return false; }
    bool readDecimal(Decimal64 &, UInt32, UInt32) { return false; }
    bool readDecimal(Decimal128 &, UInt32, UInt32) { return false; }
    bool readAggregateFunction(const AggregateFunctionPtr &, AggregateDataPtr, Arena &) { return false; }
    bool maybeCanReadValue() const { return false; }
};
}
#endif

View File

@ -0,0 +1,93 @@
#include <Common/config.h>
#if USE_PROTOBUF
#include <Core/Block.h>
#include <Formats/BlockInputStreamFromRowInputStream.h>
#include <Formats/FormatFactory.h>
#include <Formats/FormatSchemaInfo.h>
#include <Formats/ProtobufRowInputStream.h>
#include <Formats/ProtobufSchemas.h>
namespace DB
{
/// Stores the data types of the header's columns and constructs the protobuf
/// reader from the input buffer, the message type obtained from
/// ProtobufSchemas for the format schema `info`, and the header's column names.
ProtobufRowInputStream::ProtobufRowInputStream(ReadBuffer & in_, const Block & header, const FormatSchemaInfo & info)
    : data_types(header.getDataTypes())
    , reader(in_, ProtobufSchemas::instance().getMessageTypeForFormatSchema(info), header.getNames())
{
}
/// Defaulted destructor, defined here in the source file rather than in the header.
ProtobufRowInputStream::~ProtobufRowInputStream() = default;
/// Reads one protobuf message and appends one row to `columns`.
/// Returns false if no further message could be started, true otherwise.
/// On return `extra.read_columns[i]` tells whether column i received a value
/// from the message; columns that did not are filled with their default value.
bool ProtobufRowInputStream::read(MutableColumns & columns, RowReadExtension & extra)
{
    if (!reader.startMessage())
        return false; // EOF reached, no more messages.

    // Set of columns for which the values were read. The rest will be filled with default values.
    auto & read_columns = extra.read_columns;
    read_columns.assign(columns.size(), false);

    // Read values from this message and put them to the columns while it's possible.
    size_t column_index;
    while (reader.readColumnIndex(column_index))
    {
        // A row may be added to a column only once per message; later values
        // for the same column are passed with allow_add_row == false.
        bool allow_add_row = !static_cast<bool>(read_columns[column_index]);
        do
        {
            // Initialized explicitly: it is handed to deserializeProtobuf() as an
            // out-parameter and read right after, so it must never be indeterminate.
            bool row_added = false;
            data_types[column_index]->deserializeProtobuf(*columns[column_index], reader, allow_add_row, row_added);
            if (row_added)
            {
                read_columns[column_index] = true;
                allow_add_row = false;
            }
        } while (reader.maybeCanReadValue());
    }

    // Fill non-visited columns with the default values.
    for (column_index = 0; column_index < read_columns.size(); ++column_index)
        if (!read_columns[column_index])
            data_types[column_index]->insertDefaultInto(*columns[column_index]);

    reader.endMessage();
    return true;
}
/// Always returns true; the matching recovery step is syncAfterError().
bool ProtobufRowInputStream::allowSyncAfterError() const
{
return true;
}
/// Recovers after a failed row by ending the current message in the reader.
void ProtobufRowInputStream::syncAfterError()
{
reader.endMessage();
}
/// Registers the "Protobuf" input format in `factory`.
/// The registered creator wraps a ProtobufRowInputStream into a
/// BlockInputStreamFromRowInputStream; the format schema is taken from the
/// context, with "proto" passed as the schema file extension.
void registerInputFormatProtobuf(FormatFactory & factory)
{
    auto creator = [](
        ReadBuffer & buf,
        const Block & sample,
        const Context & context,
        UInt64 max_block_size,
        const FormatSettings & settings)
    {
        auto row_stream = std::make_shared<ProtobufRowInputStream>(buf, sample, FormatSchemaInfo(context, "proto"));
        return std::make_shared<BlockInputStreamFromRowInputStream>(row_stream, sample, max_block_size, settings);
    };

    factory.registerInputFormat("Protobuf", creator);
}
}
#else
namespace DB
{
class FormatFactory;

/// Built without protobuf support: there is nothing to register.
/// The parameter is left unnamed because it is unused (avoids -Wunused-parameter).
void registerInputFormatProtobuf(FormatFactory &) {}
}
#endif

View File

@ -0,0 +1,34 @@
#pragma once
#include <Common/config.h>
#if USE_PROTOBUF
#include <DataTypes/IDataType.h>
#include <Formats/IRowInputStream.h>
#include <Formats/ProtobufReader.h>
namespace DB
{
class Block;
class FormatSchemaInfo;
/** Row input stream for the Protobuf format: an IRowInputStream implementation
* that reads rows through a ProtobufReader.
*/
class ProtobufRowInputStream : public IRowInputStream
{
public:
/// `in_` is the buffer to read from, `header` provides the column names and types,
/// `info` identifies the format schema to use.
ProtobufRowInputStream(ReadBuffer & in_, const Block & header, const FormatSchemaInfo & info);
~ProtobufRowInputStream() override;
bool read(MutableColumns & columns, RowReadExtension & extra) override;
bool allowSyncAfterError() const override;
void syncAfterError() override;
private:
/// Data types of the header's columns, in column order.
DataTypes data_types;
ProtobufReader reader;
};
}
#endif

View File

@ -18,7 +18,7 @@ namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int NO_DATA_FOR_REQUIRED_PROTOBUF_FIELD;
extern const int CANNOT_CONVERT_TO_PROTOBUF_TYPE;
extern const int PROTOBUF_BAD_CAST;
extern const int PROTOBUF_FIELD_NOT_REPEATED;
}
@ -67,14 +67,14 @@ protected:
{
throw Exception(
"Could not convert data type '" + type_name + "' to protobuf type '" + field->type_name() + "' (field: " + field->name() + ")",
ErrorCodes::CANNOT_CONVERT_TO_PROTOBUF_TYPE);
ErrorCodes::PROTOBUF_BAD_CAST);
}
void cannotConvertValue(const String & value)
{
throw Exception(
"Could not convert value '" + value + "' to protobuf type '" + field->type_name() + "' (field: " + field->name() + ")",
ErrorCodes::CANNOT_CONVERT_TO_PROTOBUF_TYPE);
ErrorCodes::PROTOBUF_BAD_CAST);
}
template <typename To, typename From>

View File

@ -36,7 +36,7 @@ namespace ErrorCodes
InterpreterInsertQuery::InterpreterInsertQuery(
const ASTPtr & query_ptr_, const Context & context_, bool allow_materialized_)
const ASTPtr & query_ptr_, Context & context_, bool allow_materialized_)
: query_ptr(query_ptr_), context(context_), allow_materialized(allow_materialized_)
{
}

View File

@ -15,7 +15,7 @@ namespace DB
class InterpreterInsertQuery : public IInterpreter
{
public:
InterpreterInsertQuery(const ASTPtr & query_ptr_, const Context & context_, bool allow_materialized_ = false);
InterpreterInsertQuery(const ASTPtr & query_ptr_, Context & context_, bool allow_materialized_ = false);
/** Prepare a request for execution. Return block streams
* - the stream into which you can write data to execute the query, if INSERT;
@ -32,7 +32,7 @@ private:
void checkAccess(const ASTInsertQuery & query);
ASTPtr query_ptr;
const Context & context;
Context & context;
bool allow_materialized;
};

View File

@ -42,6 +42,12 @@ void ASTInsertQuery::formatImpl(const FormatSettings & settings, FormatState & s
settings.ostr << (settings.hilite ? hilite_keyword : "") << " VALUES" << (settings.hilite ? hilite_none : "");
}
}
if (settings_ast)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << "SETTINGS " << (settings.hilite ? hilite_none : "");
settings_ast->formatImpl(settings, state, frame);
}
}
}

View File

@ -18,6 +18,7 @@ public:
String format;
ASTPtr select;
ASTPtr table_function;
ASTPtr settings_ast;
// Set to true if the data should only be inserted into attached views
bool no_destination = false;
@ -39,10 +40,8 @@ public:
if (columns) { res->columns = columns->clone(); res->children.push_back(res->columns); }
if (select) { res->select = select->clone(); res->children.push_back(res->select); }
if (table_function)
{
res->table_function = table_function->clone(); res->children.push_back(res->table_function);
}
if (table_function) { res->table_function = table_function->clone(); res->children.push_back(res->table_function); }
if (settings_ast) { res->settings_ast = settings_ast->clone(); res->children.push_back(res->settings_ast); }
return res;
}

View File

@ -7,6 +7,7 @@
#include <Parsers/ExpressionListParsers.h>
#include <Parsers/ParserSelectWithUnionQuery.h>
#include <Parsers/ParserInsertQuery.h>
#include <Parsers/ParserSetQuery.h>
#include <Parsers/ASTFunction.h>
@ -27,6 +28,7 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ParserToken s_dot(TokenType::Dot);
ParserKeyword s_values("VALUES");
ParserKeyword s_format("FORMAT");
ParserKeyword s_settings("SETTINGS");
ParserKeyword s_select("SELECT");
ParserKeyword s_with("WITH");
ParserToken s_lparen(TokenType::OpeningRoundBracket);
@ -41,6 +43,7 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ASTPtr format;
ASTPtr select;
ASTPtr table_function;
ASTPtr settings_ast;
/// Insertion data
const char * data = nullptr;
@ -86,12 +89,32 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
}
else if (s_format.ignore(pos, expected))
{
auto name_pos = pos;
if (!name_p.parse(pos, format, expected))
return false;
}
else if (s_select.ignore(pos, expected) || s_with.ignore(pos,expected))
{
pos = before_select;
ParserSelectWithUnionQuery select_p;
select_p.parse(pos, select, expected);
}
else
{
return false;
}
data = name_pos->end;
if (s_settings.ignore(pos, expected))
{
ParserSetQuery parser_settings(true);
if (!parser_settings.parse(pos, settings_ast, expected))
return false;
}
if (format)
{
Pos last_token = pos;
--last_token;
data = last_token->end;
if (data < end && *data == ';')
throw Exception("You have excessive ';' symbol before data for INSERT.\n"
@ -114,16 +137,6 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
if (data < end && *data == '\n')
++data;
}
else if (s_select.ignore(pos, expected) || s_with.ignore(pos,expected))
{
pos = before_select;
ParserSelectWithUnionQuery select_p;
select_p.parse(pos, select, expected);
}
else
{
return false;
}
auto query = std::make_shared<ASTInsertQuery>();
node = query;
@ -142,6 +155,7 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
query->columns = columns;
query->select = select;
query->settings_ast = settings_ast;
query->data = data != end ? data : nullptr;
query->end = end;
@ -149,6 +163,8 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
query->children.push_back(columns);
if (select)
query->children.push_back(select);
if (settings_ast)
query->children.push_back(settings_ast);
return true;
}

View File

@ -0,0 +1,4 @@
echo -ne '\xe0\x01\x0a\x24\x61\x37\x35\x32\x32\x31\x35\x38\x2d\x33\x64\x34\x31\x2d\x34\x62\x37\x37\x2d\x61\x64\x36\x39\x2d\x36\x63\x35\x39\x38\x65\x65\x35\x35\x63\x34\x39\x12\x04\x49\x76\x61\x6e\x1a\x06\x50\x65\x74\x72\x6f\x76\x20\x01\x28\xaf\x1f\x32\x03\x70\x6e\x67\x3a\x0c\x2b\x37\x34\x39\x35\x31\x32\x33\x34\x35\x36\x37\x40\x01\x4d\xfc\xd0\x30\x5c\x50\x26\x58\x09\x62\x09\x59\x65\x73\x74\x65\x72\x64\x61\x79\x62\x07\x46\x6c\x6f\x77\x65\x72\x73\x6a\x04\xff\x01\x00\x00\x72\x06\x4d\x6f\x73\x63\x6f\x77\x7a\x08\x4b\x03\x5f\x42\x72\x7d\x16\x42\x81\x01\x1f\x85\xeb\x51\xb8\x1e\x09\x40\x89\x01\x33\x33\x33\x33\x33\xc3\x6a\x40\x95\x01\xcd\xcc\xcc\x3d\x9d\x01\x9a\x99\xb9\x40\xa0\x01\x80\xc4\xd7\x8d\x7f\xaa\x01\x0c\x0a\x05\x6d\x65\x74\x65\x72\x15\x00\x00\x80\x3f\xaa\x01\x11\x0a\x0a\x63\x65\x6e\x74\x69\x6d\x65\x74\x65\x72\x15\x0a\xd7\x23\x3c\xaa\x01\x10\x0a\x09\x6b\x69\x6c\x6f\x6d\x65\x74\x65\x72\x15\x00\x00\x7a\x44\x7e\x0a\x24\x63\x36\x39\x34\x61\x64\x38\x61\x2d\x66\x37\x31\x34\x2d\x34\x65\x61\x33\x2d\x39\x30\x37\x64\x2d\x66\x64\x35\x34\x66\x62\x32\x35\x64\x39\x62\x35\x12\x07\x4e\x61\x74\x61\x6c\x69\x61\x1a\x08\x53\x6f\x6b\x6f\x6c\x6f\x76\x61\x28\xa6\x3f\x32\x03\x6a\x70\x67\x50\x1a\x58\x0b\x6a\x04\x64\xc8\x01\x32\x72\x08\x50\x6c\x79\x6d\x6f\x75\x74\x68\x7a\x08\x6a\x9d\x49\x42\x46\x8c\x84\xc0\x81\x01\x6e\x86\x1b\xf0\xf9\x21\x09\x40\x95\x01\x42\x60\xe5\x3b\x9d\x01\xcd\xcc\xac\x40\xa0\x01\xff\xff\xa9\xce\x93\x8c\x09\xb3\x01\x0a\x24\x61\x37\x64\x61\x31\x61\x61\x36\x2d\x66\x34\x32\x35\x2d\x34\x37\x38\x39\x2d\x38\x39\x34\x37\x2d\x62\x30\x33\x34\x37\x38\x36\x65\x64\x33\x37\x34\x12\x06\x56\x61\x73\x69\x6c\x79\x1a\x07\x53\x69\x64\x6f\x72\x6f\x76\x20\x01\x28\xfb\x48\x32\x03\x62\x6d\x70\x3a\x0d\x2b\x34\x34\x32\x30\x31\x32\x33\x34\x35\x36\x37\x38\x40\x01\x4d\x50\xe0\x27\x5c\x50\x17\x58\x04\x62\x05\x53\x75\x6e\x6e\x79\x6a\x05\xfa\x01\xf4\x01\x0a\x72\x08\x4d\x75\x72\x6d\x61\x6e\x73\x6b\x7a\x08\xfd\xf0\x89\x42\xc8\x4c\x04\x42\x81\x01\x11\x2d\x44\x54\xfb\x21\x09\x40\x89\x01\x00\x00\x00\xe8\x76\x
48\x37\x42\x95\x01\x00\x00\x48\x44\x9d\x01\xcd\xcc\x4c\xc0\xa0\x01\x80\xd4\x9f\x93\x01\xaa\x01\x0c\x0a\x05\x70\x6f\x75\x6e\x64\x15\x00\x00\x80\x41' | $CLICKHOUSE_CLIENT --query="INSERT INTO test.table FORMAT Protobuf SETTINGS format_schema = '$CURDIR/00825_protobuf_format_with_nested:Person'"
echo -ne '\xaa\x01\x12\x05\x46\x72\x69\x64\x61\x28\x99\xe1\xf3\xd1\x0b\x52\x08\x45\x72\x6d\x61\x6b\x6f\x76\x61\x72\x0c\x00\x00\xdc\x42\x00\x00\x52\x43\x00\x00\x94\x42\x79\x48\xce\x3d\x51\x00\x00\x00\x00\xc8\x02\x14\xc2\x05\x08\x00\x00\x80\x44\x00\x00\x80\x49\x9a\x06\x02\x4b\x42\x9a\x06\x02\x4d\x42\xa1\x06\x00\x00\x00\x00\x00\x00\xe0\x3f\xa8\x06\x2a\xa8\x06\xa8\xff\xff\xff\xff\xff\xff\xff\xff\x01\xb0\x06\x01\xbd\x06\x25\x06\x49\x40\xfa\x06\x02\x34\x30\x90\x08\xe2\x08\xe1\x08\x89\xe6\x6e\xdd\x01\x00\x00\x00\xb0\x09\xc3\x19\xd0\x0c\xb7\x02\xe2\x12\x24\x32\x30\x66\x63\x64\x39\x35\x61\x2d\x33\x33\x32\x64\x2d\x34\x31\x64\x62\x2d\x61\x39\x65\x63\x2d\x31\x36\x31\x66\x36\x34\x34\x64\x30\x35\x39\x63\xb0\x01\x08\x01\x12\x06\x49\x73\x6f\x6c\x64\x65\x52\x07\x4c\x61\x76\x72\x6f\x76\x61\x72\x0c\x00\x00\x7f\x43\x00\x00\x00\x00\x00\x00\x7f\x43\xaa\x01\x03\x61\x62\x63\xc8\x02\x32\xc2\x05\x08\x00\x00\x00\x41\x00\x00\x80\x3f\x9a\x06\x04\x42\x79\x74\x65\x9a\x06\x03\x42\x69\x74\xa1\x06\x00\x00\x00\x00\x00\x00\x12\x40\xa8\x06\x1a\xa8\x06\xb0\xff\xff\xff\xff\xff\xff\xff\xff\x01\xb0\x06\x01\xbd\x06\xf9\x0f\x49\x40\xc2\x06\x01\x2c\xfa\x06\x02\x33\x32\x90\x08\x78\xe1\x08\x39\x4e\x2b\xfe\xe4\xf5\xff\xff\xb0\x09\xe8\x30\xd8\x12\x01\xe2\x12\x24\x37\x63\x66\x61\x36\x38\x35\x36\x2d\x61\x35\x34\x61\x2d\x34\x37\x38\x36\x2d\x62\x38\x65\x35\x2d\x37\x34\x35\x31\x35\x39\x64\x35\x32\x32\x37\x38\xc2\x3e\x05\x15\x00\x00\xb6\x42' | $CLICKHOUSE_CLIENT --query="INSERT INTO test.table FORMAT Protobuf SETTINGS format_schema = '$CURDIR/00825_protobuf_format_with_nested:AltPerson'"
echo -ne '\x9a\x02\x0a\x24\x61\x61\x30\x65\x35\x61\x30\x36\x2d\x63\x61\x62\x32\x2d\x34\x30\x33\x34\x2d\x61\x36\x61\x32\x2d\x34\x38\x65\x38\x32\x62\x39\x31\x36\x36\x34\x65\x12\x06\x4c\x65\x6f\x6e\x69\x64\x1a\x08\x4b\x69\x72\x69\x6c\x6c\x6f\x76\x22\x04\x6d\x61\x6c\x65\x2a\x0a\x31\x39\x38\x33\x2d\x30\x36\x2d\x32\x34\x3a\x0c\x2b\x37\x34\x39\x35\x30\x32\x37\x35\x38\x36\x34\x42\x01\x31\x4a\x13\x32\x30\x31\x39\x2d\x30\x32\x2d\x30\x34\x20\x30\x39\x3a\x34\x35\x3a\x30\x30\x52\x02\x33\x35\x5a\x06\x63\x61\x6e\x63\x65\x72\x62\x07\x37\x20\x72\x69\x6e\x67\x73\x62\x08\x45\x61\x73\x74\x73\x69\x64\x65\x62\x0b\x4c\x61\x73\x74\x20\x48\x75\x72\x72\x61\x68\x6a\x01\x30\x6a\x01\x30\x6a\x03\x32\x35\x35\x72\x09\x53\x61\x6e\x20\x44\x69\x65\x67\x6f\x7a\x09\x33\x32\x2e\x38\x32\x33\x39\x34\x33\x7a\x0b\x2d\x31\x31\x37\x2e\x30\x38\x31\x33\x32\x37\x82\x01\x09\x33\x2e\x31\x34\x31\x35\x39\x32\x37\x8a\x01\x08\x31\x35\x30\x30\x30\x30\x30\x30\x92\x01\x06\x31\x38\x36\x2e\x37\x35\x9a\x01\x04\x2d\x32\x2e\x31\xa2\x01\x0b\x32\x30\x36\x35\x39\x38\x32\x39\x33\x33\x31\xaa\x01\x18\x0a\x06\x6d\x69\x6e\x75\x74\x65\x0a\x04\x68\x6f\x75\x72\x12\x02\x36\x30\x12\x04\x33\x36\x30\x30' | $CLICKHOUSE_CLIENT --query="INSERT INTO test.table FORMAT Protobuf SETTINGS format_schema = '$CURDIR/00825_protobuf_format_with_nested:StrPerson'"
echo -ne '\xcf\x01\x0a\x24\x33\x66\x61\x65\x65\x30\x36\x34\x2d\x63\x34\x66\x37\x2d\x34\x64\x33\x34\x2d\x62\x36\x66\x33\x2d\x38\x64\x38\x31\x63\x32\x62\x36\x61\x31\x35\x64\x12\x04\x4e\x69\x63\x6b\x1a\x0a\x4b\x6f\x6c\x65\x73\x6e\x69\x6b\x6f\x76\x20\x01\x28\xda\x52\x32\x03\x62\x6d\x70\x3a\x0c\x34\x31\x32\x2d\x36\x38\x37\x2d\x35\x30\x30\x37\x40\x01\x4d\x2f\x27\xf2\x5b\x50\x14\x58\x09\x62\x06\x48\x61\x76\x61\x6e\x61\x68\x80\x01\x68\x00\x68\x80\x01\x72\x0a\x50\x69\x74\x74\x73\x62\x75\x72\x67\x68\x7a\x08\x9b\x11\x22\x42\x1f\xe6\x9f\xc2\x81\x01\x28\x2d\x44\x54\xfb\x21\x09\x40\x89\x01\x00\x00\x00\xe8\x76\x48\x27\x42\x95\x01\x00\x00\x43\x44\x9d\x01\x66\x66\x92\x41\xa0\x01\xce\xdf\xb8\xba\x01\xab\x01\x0d\xcd\xcc\xe2\x41\x0d\xcd\xcc\x4c\x3e\x0d\x00\x00\x80\x3f\x12\x05\x6f\x75\x6e\x63\x65\x12\x05\x63\x61\x72\x61\x74\x12\x04\x67\x72\x61\x6d\xac\x01' | $CLICKHOUSE_CLIENT --query="INSERT INTO test.table FORMAT Protobuf SETTINGS format_schema = '$CURDIR/00825_protobuf_format_syntax2_with_nested:Syntax2Person'"

View File

@ -0,0 +1,7 @@
a7da1aa6-f425-4789-8947-b034786ed374 Vasily Sidorov male 1995-07-28 bmp +442012345678 1 2018-12-30 00:00:00 23 leo ['Sunny'] [250,244,10] Murmansk [68.970680,33.074982] 3.14159265358979 100000000000.00 800 -3.2 154400000 ['pound'] [16]
c694ad8a-f714-4ea3-907d-fd54fb25d9b5 Natalia Sokolova female 1992-03-08 jpg \N 0 \N 26 pisces [] [100,200,50] Plymouth [50.403724,-4.142123] 3.14159 \N 0.007 5.4 -20000000000000 [] []
aa0e5a06-cab2-4034-a6a2-48e82b91664e Leonid Kirillov male 1983-06-24 \N +74950275864\0 1 2019-02-04 09:45:00 35 cancer ['7 rings','Eastside','Last Hurrah'] [0,0,255] San Diego [32.823943,-117.081327] 3.1415927 15000000.00 186.75 -2.1 20659829331 ['minute','hour'] [60,3600]
20fcd95a-332d-41db-a9ec-161f644d059c Frida Ermakova female 1978-12-12 \N 3124555929\0\0\0 0 2013-03-11 16:30:00 40 sagittarius [] [110,210,74] [42.000000,-88.000000] 3.1410000324249268 311.00 0.5 10.0 8010000009 ['KB','MB'] [1024,1048576]
a7522158-3d41-4b77-ad69-6c598ee55c49 Ivan Petrov male 1980-12-29 png +74951234567\0 1 2019-01-05 18:45:00 38 capricorn ['Yesterday','Flowers'] [255,0,0] Moscow [55.753216,37.622504] 3.14 214.10 0.1 5.8 17060000000 ['meter','centimeter','kilometer'] [1,0.01,1000]
3faee064-c4f7-4d34-b6f3-8d81c2b6a15d Nick Kolesnikov male 1998-12-26 bmp 412-687-5007\0 1 2018-11-19 05:59:59 20 capricorn ['Havana'] [128,0,128] Pittsburgh [40.517192,-79.949456] 3.1415926535898 50000000000.00 780 18.3 195500007 ['ounce','carat','gram'] [28.35,0.2,1]
7cfa6856-a54a-4786-b8e5-745159d52278 Isolde Lavrova female 1987-02-09 \N \N 1 \N 32 aquarius [] [255,0,255] [26.000000,-80.000000] 3.1415998935699463 \N 4.5 25.0 -11111111111111 ['Byte','Bit'] [8,1]

View File

@ -0,0 +1,44 @@
#!/usr/bin/env bash

# Test of the Protobuf input format: creates test.table, inserts
# protobuf-encoded rows from the generated .insh file and prints the table.

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# CURDIR is quoted so that a checkout path containing spaces still works.
. "$CURDIR"/../shell_config.sh

set -e -o pipefail

# Run the client.
# CLICKHOUSE_CLIENT is intentionally left unquoted: it may hold a command
# together with its arguments and has to undergo word splitting.
$CLICKHOUSE_CLIENT --multiquery <<EOF
CREATE DATABASE IF NOT EXISTS test;
DROP TABLE IF EXISTS test.table;
CREATE TABLE test.table (uuid UUID,
name String,
surname String,
gender Enum8('male'=1, 'female'=0),
birthDate Date,
photo Nullable(String),
phoneNumber Nullable(FixedString(13)),
isOnline UInt8,
visitTime Nullable(DateTime),
age UInt8,
zodiacSign Enum16('aries'=321, 'taurus'=420, 'gemini'=521, 'cancer'=621, 'leo'=723, 'virgo'=823,
'libra'=923, 'scorpius'=1023, 'sagittarius'=1122, 'capricorn'=1222, 'aquarius'=120,
'pisces'=219),
songs Array(String),
color Array(UInt8),
hometown LowCardinality(String),
location Array(Decimal32(6)),
pi Nullable(Float64),
lotteryWin Nullable(Decimal64(2)),
someRatio Float32,
temperature Decimal32(1),
randomBigNumber Int64,
measureUnits Nested (unit String, coef Float32)
) ENGINE = MergeTree ORDER BY tuple();
EOF

# To generate the file 00825_protobuf_format_input.insh use the following commands:
# ninja ProtobufDelimitedMessagesSerializer
# build/utils/test-data-generator/ProtobufDelimitedMessagesSerializer
source "$CURDIR"/00825_protobuf_format_input.insh

$CLICKHOUSE_CLIENT --query "SELECT * FROM test.table ORDER BY uuid;"

View File

@ -2,7 +2,7 @@
# To generate reference file for this test use the following commands:
# ninja ProtobufDelimitedMessagesSerializer
# build/utils/test-data-generator/ProtobufDelimitedMessagesSerializer > dbms/tests/queries/0_stateless/00825_protobuf_format_output.reference
# build/utils/test-data-generator/ProtobufDelimitedMessagesSerializer
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh

View File

@ -0,0 +1,51 @@
syntax = "proto2";
// proto2 variant of the person message: uses required/optional labels,
// explicit default values, a packed repeated field and a group.
message Syntax2Person {
// Nested enums; the values are referenced by the 'gender' and 'zodiacSign' fields below.
enum Gender {
female = 0;
male = 1;
};
enum ZodiacSign {
aries = 0;
taurus = 1;
gemini = 2;
cancer = 3;
leo = 4;
virgo = 5;
libra = 6;
scorpius = 7;
sagittarius = 8;
capricorn = 9;
aquarius = 10;
pisces = 11;
};
// Fields 1-5 are required, everything else is optional or repeated.
required string uuid = 1;
required string name = 2;
required string surname = 3;
required Gender gender = 4;
required uint32 birthDate = 5;
optional bytes photo = 6;
optional string phoneNumber = 7;
optional bool isOnline = 8;
optional fixed32 visitTime = 9;
optional uint32 age = 10;
optional ZodiacSign zodiacSign = 11;
repeated string songs = 12;
repeated uint32 color = 13;
optional string hometown = 14 [default='Moscow'];
repeated float location = 15 [packed=true];
optional double pi = 16;
optional double lotteryWin = 17;
optional float someRatio = 18;
optional float temperature = 19;
optional sint64 randomBigNumber = 20;
// A single group holding two parallel repeated lists (coefficients and unit names).
optional group MeasureUnits = 21 {
repeated float coef = 1;
repeated string unit = 2;
};
// Additional optional fields, each with an explicit default value.
optional string newFieldStr = 22 [default='abc'];
optional int32 newFieldInt = 23 [default=-11];
optional bool newBool = 24 [default=true];
};

View File

@ -0,0 +1,118 @@
syntax = "proto3";
// File-level gender enum; note that AltPerson declares its own Gender with the values swapped.
enum Gender {
female = 0;
male = 1;
};
// The twelve zodiac signs, numbered 0..11; used as the type of Person.zodiacSign.
enum ZodiacSign {
aries = 0;
taurus = 1;
gemini = 2;
cancer = 3;
leo = 4;
virgo = 5;
libra = 6;
scorpius = 7;
sagittarius = 8;
capricorn = 9;
aquarius = 10;
pisces = 11;
};
// Main proto3 person record: scalar fields use their natural protobuf types and
// the measure units are stored as a repeated nested message.
message Person {
// One measure unit: its name and its coefficient.
message MeasureUnit
{
string unit = 1;
float coef = 2;
};
string uuid = 1;
string name = 2;
string surname = 3;
Gender gender = 4;
// NOTE(review): presumably a day number (the generator annotates 4015 as 1980-12-29) - confirm.
uint32 birthDate = 5;
bytes photo = 6;
string phoneNumber = 7;
bool isOnline = 8;
// NOTE(review): presumably a Unix timestamp (the generator annotates 1546703100 as 2019-01-05 18:45:00) - confirm.
fixed32 visitTime = 9;
uint32 age = 10;
ZodiacSign zodiacSign = 11;
repeated string songs = 12;
repeated uint32 color = 13;
string hometown = 14;
repeated float location = 15;
double pi = 16;
double lotteryWin = 17;
float someRatio = 18;
float temperature = 19;
sint64 randomBigNumber = 20;
repeated MeasureUnit measureUnits = 21;
};
// Two-state online flag; AltPerson.isOnline uses it where Person.isOnline is a plain bool.
enum OnlineStatus {
offline = 0;
online = 1;
};
// Alternative layout of the person record: most field names match Person, but the
// field numbers differ and many fields use a different protobuf type (for example
// bytes for uuid/surname/age, int32 for zodiacSign, fixed64 for visitTime).
// It has no photo, songs or hometown fields and adds several fields Person lacks.
message AltPerson {
// Nested enum that takes precedence over the file-level Gender inside this message;
// note that male/female are numbered the other way round here.
enum Gender {
male = 0;
female = 1;
};
// Message type of the extra newMessage field; it contains a repeated empty sub-message and a float.
message Dummy {
message Empty {};
repeated Empty empty = 1;
float z = 2;
};
// Explicitly non-packed repeated field.
repeated int32 location = 101 [packed=false];
float pi = 103;
bytes uuid = 300;
bool newFieldBool = 299;
string name = 2;
Gender gender = 102;
int32 zodiacSign = 130;
int64 birthDate = 150;
bytes age = 111;
OnlineStatus isOnline = 1;
double someRatio = 100;
fixed64 visitTime = 15;
Dummy newMessage = 1000;
sfixed64 randomBigNumber = 140;
repeated int32 newFieldInt = 104;
repeated float color = 14;
uint64 lotteryWin = 202;
bytes surname = 10;
uint64 phoneNumber = 5;
sint32 temperature = 41;
string newFieldStr = 21;
// The measure units are flattened into two parallel repeated fields instead of a nested message.
repeated string measureUnits_unit = 99;
repeated float measureUnits_coef = 88;
};
// Person record in which every value is carried as a string (or repeated string),
// keeping Person's field numbers. There is no photo field, so number 6 is unused.
message StrPerson {
// Measure units as two parallel repeated string lists (names and coefficients).
message MeasureUnits
{
repeated string unit = 1;
repeated string coef = 2;
};
string uuid = 1;
string name = 2;
string surname = 3;
string gender = 4;
string birthDate = 5;
string phoneNumber = 7;
string isOnline = 8;
string visitTime = 9;
string age = 10;
string zodiacSign = 11;
repeated string songs = 12;
repeated string color = 13;
string hometown = 14;
repeated string location = 15;
string pi = 16;
string lotteryWin = 17;
string someRatio = 18;
string temperature = 19;
string randomBigNumber = 20;
// A single (non-repeated) MeasureUnits message holds all units.
MeasureUnits measureUnits = 21;
};

View File

@ -5,9 +5,11 @@ add_executable (markov-model markov-model.cpp)
target_link_libraries(markov-model PRIVATE clickhouse_common_io ${Boost_PROGRAM_OPTIONS_LIBRARY})
if(USE_PROTOBUF)
    # Generate C++ sources/headers from the two test schemas. Each output variable is
    # assigned exactly once: a second protobuf_generate_cpp() call with the same variable
    # names would overwrite the first result and leave its generated files unused.
    protobuf_generate_cpp(ProtobufDelimitedMessagesSerializer_Srcs ProtobufDelimitedMessagesSerializer_Hdrs ${CMAKE_CURRENT_SOURCE_DIR}/../../dbms/tests/queries/0_stateless/00825_protobuf_format_with_nested.proto)
    protobuf_generate_cpp(ProtobufDelimitedMessagesSerializer_Srcs2 ProtobufDelimitedMessagesSerializer_Hdrs2 ${CMAKE_CURRENT_SOURCE_DIR}/../../dbms/tests/queries/0_stateless/00825_protobuf_format_syntax2_with_nested.proto)
    add_executable (ProtobufDelimitedMessagesSerializer ProtobufDelimitedMessagesSerializer.cpp ${ProtobufDelimitedMessagesSerializer_Srcs} ${ProtobufDelimitedMessagesSerializer_Hdrs} ${ProtobufDelimitedMessagesSerializer_Srcs2} ${ProtobufDelimitedMessagesSerializer_Hdrs2})
    # The generated *.pb.h headers are written to the current binary directory.
    target_include_directories (ProtobufDelimitedMessagesSerializer SYSTEM BEFORE PRIVATE ${Protobuf_INCLUDE_DIR} ${CMAKE_CURRENT_BINARY_DIR})
    # Link protobuf and Boost.Program_options (used for command line parsing) in a single call.
    target_link_libraries (ProtobufDelimitedMessagesSerializer PRIVATE ${Protobuf_LIBRARY} ${Boost_PROGRAM_OPTIONS_LIBRARY})
    # Pass the absolute path of the stateless tests directory to the program as its default output directory.
    get_filename_component(ProtobufDelimitedMessagesSerializer_OutputDir "${CMAKE_CURRENT_LIST_DIR}/../../dbms/tests/queries/0_stateless" REALPATH)
    target_compile_definitions(ProtobufDelimitedMessagesSerializer PRIVATE OUTPUT_DIR="${ProtobufDelimitedMessagesSerializer_OutputDir}")
endif()

View File

@ -1,15 +1,33 @@
// Reference file generator for the test dbms/tests/queries/0_stateless/00825_protobuf_format_output.sh
// Generator of protobuf delimited messages used in the protobuf IO tests
// dbms/tests/queries/0_stateless/00825_protobuf_format*
#include <boost/program_options.hpp>
#include <fstream>
#include <iostream>
#include <google/protobuf/util/delimited_message_util.h>
#include "00825_protobuf_format.pb.h"
#include "00825_protobuf_format_syntax2.pb.h"
#include "00825_protobuf_format_with_nested.pb.h"
#include "00825_protobuf_format_syntax2_with_nested.pb.h"
int main(int, char **)
/// Writes to `out` one shell command line that feeds the accumulated delimited messages
/// to clickhouse-client as an INSERT in the Protobuf format.
///
/// Every byte of `delimited_messages` is emitted as a `\xHH` escape inside an
/// `echo -ne '...'` argument, so the generated script stays plain text.
///
/// @param out                 Stream receiving the generated shell command.
/// @param format_schema       Value appended after `$CURDIR/` in the `format_schema` setting
///                            (e.g. "file_without_extension:MessageName").
/// @param delimited_messages  Buffer with the serialized messages; it is emptied by this call
///                            so the caller can reuse it for the next batch.
void writeInsertQueryCommand(std::ostream & out, const std::string & format_schema, std::stringstream & delimited_messages)
{
    out << "echo -ne '";
    const std::string bytes = delimited_messages.str();
    delimited_messages.str("");
    for (const char c : bytes)
    {
        // "\xHH" is four characters plus the terminating zero.
        char buf[5];
        // Go through unsigned char so that bytes >= 0x80 are not sign-extended by %x.
        snprintf(buf, sizeof(buf), "\\x%02x", static_cast<unsigned char>(c));
        out << buf;
    }
    out << "' | $CLICKHOUSE_CLIENT --query=\"INSERT INTO test.table FORMAT Protobuf"
           " SETTINGS format_schema = '$CURDIR/"
        << format_schema << "'\"" << std::endl;
}
void writeInputInsertQueries(std::ostream & out)
{
std::stringstream ss;
{
Person person;
person.set_uuid("a7522158-3d41-4b77-ad69-6c598ee55c49");
@ -18,7 +36,7 @@ int main(int, char **)
person.set_gender(Gender::male);
person.set_birthdate(4015); // 1980-12-29
person.set_photo("png");
person.set_phonenumber(std::string("+74951234567\0", 13)); // Converted from FixedString(13)
person.set_phonenumber("+74951234567");
person.set_isonline(true);
person.set_visittime(1546703100); // 2019-01-05 18:45:00
person.set_age(38);
@ -36,7 +54,16 @@ int main(int, char **)
person.set_someratio(0.1);
person.set_temperature(5.8);
person.set_randombignumber(17060000000);
google::protobuf::util::SerializeDelimitedToOstream(person, out);
auto* mu = person.add_measureunits();
mu->set_unit("meter");
mu->set_coef(1);
mu = person.add_measureunits();
mu->set_unit("centimeter");
mu->set_coef(0.01);
mu = person.add_measureunits();
mu->set_unit("kilometer");
mu->set_coef(1000);
google::protobuf::util::SerializeDelimitedToOstream(person, &ss);
}
{
@ -60,7 +87,7 @@ int main(int, char **)
person.set_someratio(0.007);
person.set_temperature(5.4);
person.set_randombignumber(-20000000000000);
google::protobuf::util::SerializeDelimitedToOstream(person, out);
google::protobuf::util::SerializeDelimitedToOstream(person, &ss);
}
{
@ -88,10 +115,232 @@ int main(int, char **)
person.set_someratio(800);
person.set_temperature(-3.2);
person.set_randombignumber(154400000);
google::protobuf::util::SerializeDelimitedToOstream(person, out);
auto* mu = person.add_measureunits();
mu->set_unit("pound");
mu->set_coef(16);
google::protobuf::util::SerializeDelimitedToOstream(person, &ss);
}
*out << "ALTERNATIVE->" << std::endl;
writeInsertQueryCommand(out, "00825_protobuf_format_with_nested:Person", ss);
{
AltPerson person;
person.add_location(42);
person.add_location(-88);
person.set_pi(3.141);
person.set_uuid("20fcd95a-332d-41db-a9ec-161f644d059c");
person.set_name("Frida");
person.set_gender(AltPerson::female);
person.set_zodiacsign(1122); // sagittarius
person.set_birthdate(3267); // 1978-12-12
person.set_age("40");
person.set_isonline(OnlineStatus::offline);
person.set_someratio(0.5);
person.set_visittime(1363005000); // 2013-03-11 16:30:00
person.set_randombignumber(8010000009);
person.add_color(110);
person.add_color(210);
person.add_color(74);
person.set_lotterywin(311);
person.set_surname("Ermakova");
person.set_phonenumber(3124555929);
person.set_temperature(10);
person.add_measureunits_unit("KB");
person.add_measureunits_coef(1024);
person.add_measureunits_unit("MB");
person.add_measureunits_coef(1048576);
google::protobuf::util::SerializeDelimitedToOstream(person, &ss);
}
{
AltPerson person;
person.add_location(26);
person.add_location(-80);
person.set_pi(3.1416);
person.set_uuid("7cfa6856-a54a-4786-b8e5-745159d52278");
person.set_name("Isolde");
person.set_gender(AltPerson::female);
person.set_zodiacsign(120); // aquarius
person.set_birthdate(6248); // 1987-02-09
person.set_age("32");
person.set_isonline(OnlineStatus::online);
person.set_someratio(4.5);
person.set_randombignumber(-11111111111111);
person.add_color(255);
person.add_color(0);
person.add_color(255);
person.set_surname("Lavrova");
person.set_temperature(25);
person.set_newfieldstr("abc");
person.set_newfieldbool(true);
person.add_newfieldint(44);
person.add_measureunits_unit("Byte");
person.add_measureunits_coef(8);
person.add_measureunits_unit("Bit");
person.add_measureunits_coef(1);
person.mutable_newmessage()->set_z(91);
google::protobuf::util::SerializeDelimitedToOstream(person, &ss);
}
writeInsertQueryCommand(out, "00825_protobuf_format_with_nested:AltPerson", ss);
{
StrPerson person;
person.set_uuid("aa0e5a06-cab2-4034-a6a2-48e82b91664e");
person.set_name("Leonid");
person.set_surname("Kirillov");
person.set_gender("male");
person.set_birthdate("1983-06-24");
person.set_phonenumber("+74950275864");
person.set_isonline("1");
person.set_visittime("2019-02-04 09:45:00");
person.set_age("35");
person.set_zodiacsign("cancer");
person.add_songs("7 rings");
person.add_songs("Eastside");
person.add_songs("Last Hurrah");
person.add_color("0");
person.add_color("0");
person.add_color("255");
person.set_hometown("San Diego");
person.add_location("32.823943");
person.add_location("-117.081327");
person.set_pi("3.1415927");
person.set_lotterywin("15000000");
person.set_someratio("186.75");
person.set_temperature("-2.1");
person.set_randombignumber("20659829331");
person.mutable_measureunits()->add_unit("minute");
person.mutable_measureunits()->add_coef("60");
person.mutable_measureunits()->add_unit("hour");
person.mutable_measureunits()->add_coef("3600");
google::protobuf::util::SerializeDelimitedToOstream(person, &ss);
}
writeInsertQueryCommand(out, "00825_protobuf_format_with_nested:StrPerson", ss);
{
Syntax2Person person;
person.set_uuid("3faee064-c4f7-4d34-b6f3-8d81c2b6a15d");
person.set_name("Nick");
person.set_surname("Kolesnikov");
person.set_gender(Syntax2Person::male);
person.set_birthdate(10586); // 1998-12-26
person.set_photo("bmp");
person.set_phonenumber("412-687-5007");
person.set_isonline(true);
person.set_visittime(1542596399); // 2018-11-19 05:59:59
person.set_age(20);
person.set_zodiacsign(Syntax2Person::capricorn);
person.add_songs("Havana");
person.add_color(128);
person.add_color(0);
person.add_color(128);
person.set_hometown("Pittsburgh");
person.add_location(40.517193);
person.add_location(-79.949452);
person.set_pi(3.1415926535898);
person.set_lotterywin(50000000000);
person.set_someratio(780);
person.set_temperature(18.3);
person.set_randombignumber(195500007);
person.mutable_measureunits()->add_unit("ounce");
person.mutable_measureunits()->add_coef(28.35);
person.mutable_measureunits()->add_unit("carat");
person.mutable_measureunits()->add_coef(0.2);
person.mutable_measureunits()->add_unit("gram");
person.mutable_measureunits()->add_coef(1);
google::protobuf::util::SerializeDelimitedToOstream(person, &ss);
}
writeInsertQueryCommand(out, "00825_protobuf_format_syntax2_with_nested:Syntax2Person", ss);
}
void writeOutputReference(std::ostream & out)
{
{
Person person;
person.set_uuid("a7522158-3d41-4b77-ad69-6c598ee55c49");
person.set_name("Ivan");
person.set_surname("Petrov");
person.set_gender(Gender::male);
person.set_birthdate(4015); // 1980-12-29
person.set_photo("png");
person.set_phonenumber(std::string("+74951234567\0", 13)); // Converted from FixedString(13)
person.set_isonline(true);
person.set_visittime(1546703100); // 2019-01-05 18:45:00
person.set_age(38);
person.set_zodiacsign(ZodiacSign::capricorn);
person.add_songs("Yesterday");
person.add_songs("Flowers");
person.add_color(255);
person.add_color(0);
person.add_color(0);
person.set_hometown("Moscow");
person.add_location(55.753215);
person.add_location(37.622504);
person.set_pi(3.14);
person.set_lotterywin(214.10);
person.set_someratio(0.1);
person.set_temperature(5.8);
person.set_randombignumber(17060000000);
google::protobuf::util::SerializeDelimitedToOstream(person, &out);
}
{
Person person;
person.set_uuid("c694ad8a-f714-4ea3-907d-fd54fb25d9b5");
person.set_name("Natalia");
person.set_surname("Sokolova");
person.set_gender(Gender::female);
person.set_birthdate(8102); // 1992-03-08
person.set_photo("jpg");
person.set_isonline(false);
person.set_age(26);
person.set_zodiacsign(ZodiacSign::pisces);
person.add_color(100);
person.add_color(200);
person.add_color(50);
person.set_hometown("Plymouth");
person.add_location(50.403724);
person.add_location(-4.142123);
person.set_pi(3.14159);
person.set_someratio(0.007);
person.set_temperature(5.4);
person.set_randombignumber(-20000000000000);
google::protobuf::util::SerializeDelimitedToOstream(person, &out);
}
{
Person person;
person.set_uuid("a7da1aa6-f425-4789-8947-b034786ed374");
person.set_name("Vasily");
person.set_surname("Sidorov");
person.set_gender(Gender::male);
person.set_birthdate(9339); // 1995-07-28
person.set_photo("bmp");
person.set_phonenumber("+442012345678");
person.set_isonline(true);
person.set_visittime(1546117200); // 2018-12-30 00:00:00
person.set_age(23);
person.set_zodiacsign(ZodiacSign::leo);
person.add_songs("Sunny");
person.add_color(250);
person.add_color(244);
person.add_color(10);
person.set_hometown("Murmansk");
person.add_location(68.970682);
person.add_location(33.074981);
person.set_pi(3.14159265358979);
person.set_lotterywin(100000000000);
person.set_someratio(800);
person.set_temperature(-3.2);
person.set_randombignumber(154400000);
google::protobuf::util::SerializeDelimitedToOstream(person, &out);
}
out << "ALTERNATIVE->" << std::endl;
{
AltPerson person;
@ -115,7 +364,7 @@ int main(int, char **)
person.set_surname("Petrov");
person.set_phonenumber(+74951234567);
person.set_temperature(5);
google::protobuf::util::SerializeDelimitedToOstream(person, out);
google::protobuf::util::SerializeDelimitedToOstream(person, &out);
}
{
@ -137,7 +386,7 @@ int main(int, char **)
person.add_color(50);
person.set_surname("Sokolova");
person.set_temperature(5);
google::protobuf::util::SerializeDelimitedToOstream(person, out);
google::protobuf::util::SerializeDelimitedToOstream(person, &out);
}
{
@ -162,10 +411,10 @@ int main(int, char **)
person.set_surname("Sidorov");
person.set_phonenumber(+442012345678);
person.set_temperature(-3);
google::protobuf::util::SerializeDelimitedToOstream(person, out);
google::protobuf::util::SerializeDelimitedToOstream(person, &out);
}
*out << "STRINGS->" << std::endl;
out << "STRINGS->" << std::endl;
{
StrPerson person;
@ -192,7 +441,7 @@ int main(int, char **)
person.set_someratio("0.1");
person.set_temperature("5.8");
person.set_randombignumber("17060000000");
google::protobuf::util::SerializeDelimitedToOstream(person, out);
google::protobuf::util::SerializeDelimitedToOstream(person, &out);
}
{
@ -215,7 +464,7 @@ int main(int, char **)
person.set_someratio("0.007");
person.set_temperature("5.4");
person.set_randombignumber("-20000000000000");
google::protobuf::util::SerializeDelimitedToOstream(person, out);
google::protobuf::util::SerializeDelimitedToOstream(person, &out);
}
{
@ -242,10 +491,10 @@ int main(int, char **)
person.set_someratio("800");
person.set_temperature("-3.2");
person.set_randombignumber("154400000");
google::protobuf::util::SerializeDelimitedToOstream(person, out);
google::protobuf::util::SerializeDelimitedToOstream(person, &out);
}
*out << "SYNTAX2->" << std::endl;
out << "SYNTAX2->" << std::endl;
{
Syntax2Person person;
@ -273,7 +522,7 @@ int main(int, char **)
person.set_someratio(0.1);
person.set_temperature(5.8);
person.set_randombignumber(17060000000);
google::protobuf::util::SerializeDelimitedToOstream(person, out);
google::protobuf::util::SerializeDelimitedToOstream(person, &out);
}
{
@ -297,7 +546,7 @@ int main(int, char **)
person.set_someratio(0.007);
person.set_temperature(5.4);
person.set_randombignumber(-20000000000000);
google::protobuf::util::SerializeDelimitedToOstream(person, out);
google::protobuf::util::SerializeDelimitedToOstream(person, &out);
}
{
@ -325,8 +574,48 @@ int main(int, char **)
person.set_someratio(800);
person.set_temperature(-3.2);
person.set_randombignumber(154400000);
google::protobuf::util::SerializeDelimitedToOstream(person, out);
google::protobuf::util::SerializeDelimitedToOstream(person, &out);
}
}
void parseCommandLine(int argc, char ** argv, std::string & output_dir)
{
namespace po = boost::program_options;
po::options_description desc;
output_dir = OUTPUT_DIR;
desc.add_options()
("help,h", "Show help")
("directory,d", po::value<std::string>(&output_dir),
"Set the output directory. By default it's " OUTPUT_DIR);
po::parsed_options parsed = po::command_line_parser(argc, argv).options(desc).run();
po::variables_map vm;
po::store(parsed, vm);
po::notify(vm);
if (!output_dir.empty())
return;
// Show help.
std::cout << "This utility generates delimited messages for tests checking protobuf IO support." << std::endl;
std::cout << desc;
std::cout << "Example:" << std::endl;
std::cout << argv[0] << " -g OUTPUT_REFERENCE" << std::endl;
std::exit(0);
}
/// Creates (or truncates) the file `filepath`, lets `fn` produce its contents and
/// reports the progress on stdout.
///
/// The file is opened in binary mode because the generators write serialized protobuf
/// messages, which must reach the disk byte for byte.
///
/// @param filepath  Path of the file to (re)create.
/// @param fn        Callback that writes the whole file contents to the given stream.
///
/// If the file cannot be opened or the data cannot be written, an error is printed to
/// stderr and the process exits with status 1 instead of claiming success.
void writeFile(const std::string & filepath, void (*fn)(std::ostream &))
{
    std::cout << "Writing '" << filepath << "' ... ";
    std::fstream out(filepath, std::fstream::out | std::fstream::trunc | std::fstream::binary);
    if (!out.is_open())
    {
        std::cout << "failed." << std::endl;
        std::cerr << "Cannot open '" << filepath << "' for writing" << std::endl;
        std::exit(1);
    }

    fn(out);

    // Flush explicitly so that a write error is detected here rather than lost in the destructor.
    out.flush();
    if (!out)
    {
        std::cout << "failed." << std::endl;
        std::cerr << "Cannot write to '" << filepath << "'" << std::endl;
        std::exit(1);
    }
    std::cout << "done." << std::endl;
}
/// Entry point: resolves the output directory from the command line, then regenerates
/// the input script and the output reference file of the protobuf format tests.
int main(int argc, char ** argv)
{
    std::string target_dir;
    parseCommandLine(argc, argv, target_dir);

    const std::string input_script_path = target_dir + "/00825_protobuf_format_input.insh";
    const std::string output_reference_path = target_dir + "/00825_protobuf_format_output.reference";

    writeFile(input_script_path, writeInputInsertQueries);
    writeFile(output_reference_path, writeOutputReference);
    return 0;
}