mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 23:52:03 +00:00
Implement a new row input stream for reading protobuf messages.
This commit is contained in:
parent
670968af97
commit
eef02034b2
@ -114,6 +114,7 @@ void registerInputFormatJSONEachRow(FormatFactory & factory);
|
||||
void registerOutputFormatJSONEachRow(FormatFactory & factory);
|
||||
void registerInputFormatParquet(FormatFactory & factory);
|
||||
void registerOutputFormatParquet(FormatFactory & factory);
|
||||
void registerInputFormatProtobuf(FormatFactory & factory);
|
||||
void registerOutputFormatProtobuf(FormatFactory & factory);
|
||||
|
||||
/// Output only (presentational) formats.
|
||||
@ -150,6 +151,7 @@ FormatFactory::FormatFactory()
|
||||
registerOutputFormatTSKV(*this);
|
||||
registerInputFormatJSONEachRow(*this);
|
||||
registerOutputFormatJSONEachRow(*this);
|
||||
registerInputFormatProtobuf(*this);
|
||||
registerOutputFormatProtobuf(*this);
|
||||
registerInputFormatCapnProto(*this);
|
||||
registerInputFormatParquet(*this);
|
||||
|
93
dbms/src/Formats/ProtobufRowInputStream.cpp
Normal file
93
dbms/src/Formats/ProtobufRowInputStream.cpp
Normal file
@ -0,0 +1,93 @@
|
||||
#include <Common/config.h>
|
||||
#if USE_PROTOBUF
|
||||
|
||||
#include <Core/Block.h>
|
||||
#include <Formats/BlockInputStreamFromRowInputStream.h>
|
||||
#include <Formats/FormatFactory.h>
|
||||
#include <Formats/FormatSchemaInfo.h>
|
||||
#include <Formats/ProtobufRowInputStream.h>
|
||||
#include <Formats/ProtobufSchemas.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
ProtobufRowInputStream::ProtobufRowInputStream(ReadBuffer & in_, const Block & header, const FormatSchemaInfo & info)
|
||||
: data_types(header.getDataTypes()), reader(in_, ProtobufSchemas::instance().getMessageTypeForFormatSchema(info), header.getNames())
|
||||
{
|
||||
}
|
||||
|
||||
ProtobufRowInputStream::~ProtobufRowInputStream() = default;
|
||||
|
||||
bool ProtobufRowInputStream::read(MutableColumns & columns, RowReadExtension & extra)
|
||||
{
|
||||
if (!reader.startMessage())
|
||||
return false; // EOF reached, no more messages.
|
||||
|
||||
// Set of columns for which the values were read. The rest will be filled with default values.
|
||||
auto & read_columns = extra.read_columns;
|
||||
read_columns.assign(columns.size(), false);
|
||||
|
||||
// Read values from this message and put them to the columns while it's possible.
|
||||
size_t column_index;
|
||||
while (reader.readColumnIndex(column_index))
|
||||
{
|
||||
bool allow_add_row = !static_cast<bool>(read_columns[column_index]);
|
||||
do
|
||||
{
|
||||
bool row_added;
|
||||
data_types[column_index]->deserializeProtobuf(*columns[column_index], reader, allow_add_row, row_added);
|
||||
if (row_added)
|
||||
{
|
||||
read_columns[column_index] = true;
|
||||
allow_add_row = false;
|
||||
}
|
||||
} while (reader.maybeCanReadValue());
|
||||
}
|
||||
|
||||
// Fill non-visited columns with the default values.
|
||||
for (column_index = 0; column_index < read_columns.size(); ++column_index)
|
||||
if (!read_columns[column_index])
|
||||
data_types[column_index]->insertDefaultInto(*columns[column_index]);
|
||||
|
||||
reader.endMessage();
|
||||
return true;
|
||||
}
|
||||
|
||||
bool ProtobufRowInputStream::allowSyncAfterError() const
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
void ProtobufRowInputStream::syncAfterError()
|
||||
{
|
||||
reader.endMessage();
|
||||
}
|
||||
|
||||
|
||||
void registerInputFormatProtobuf(FormatFactory & factory)
|
||||
{
|
||||
factory.registerInputFormat("Protobuf", [](
|
||||
ReadBuffer & buf,
|
||||
const Block & sample,
|
||||
const Context & context,
|
||||
UInt64 max_block_size,
|
||||
const FormatSettings & settings)
|
||||
{
|
||||
return std::make_shared<BlockInputStreamFromRowInputStream>(
|
||||
std::make_shared<ProtobufRowInputStream>(buf, sample, FormatSchemaInfo(context, "proto")),
|
||||
sample, max_block_size, settings);
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
#else
|
||||
|
||||
namespace DB
|
||||
{
|
||||
class FormatFactory;
|
||||
void registerInputFormatProtobuf(FormatFactory & factory) {}
|
||||
}
|
||||
|
||||
#endif
|
34
dbms/src/Formats/ProtobufRowInputStream.h
Normal file
34
dbms/src/Formats/ProtobufRowInputStream.h
Normal file
@ -0,0 +1,34 @@
|
||||
#pragma once
|
||||
|
||||
#include <Common/config.h>
|
||||
#if USE_PROTOBUF
|
||||
|
||||
#include <DataTypes/IDataType.h>
|
||||
#include <Formats/IRowInputStream.h>
|
||||
#include <Formats/ProtobufReader.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
class Block;
|
||||
class FormatSchemaInfo;
|
||||
|
||||
|
||||
/** Interface of stream, that allows to read data by rows.
|
||||
*/
|
||||
class ProtobufRowInputStream : public IRowInputStream
|
||||
{
|
||||
public:
|
||||
ProtobufRowInputStream(ReadBuffer & in_, const Block & header, const FormatSchemaInfo & info);
|
||||
~ProtobufRowInputStream() override;
|
||||
|
||||
bool read(MutableColumns & columns, RowReadExtension & extra) override;
|
||||
bool allowSyncAfterError() const override;
|
||||
void syncAfterError() override;
|
||||
|
||||
private:
|
||||
DataTypes data_types;
|
||||
ProtobufReader reader;
|
||||
};
|
||||
|
||||
}
|
||||
#endif
|
Loading…
Reference in New Issue
Block a user