// ClickHouse/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp (117 lines, 3.3 KiB, C++)

#include "ProtobufRowInputFormat.h"
2019-08-04 12:43:11 +00:00
#if USE_PROTOBUF
# include <Core/Block.h>
# include <Formats/FormatFactory.h>
# include <Formats/FormatSchemaInfo.h>
# include <Formats/ProtobufReader.h>
# include <Formats/ProtobufSchemas.h>
# include <Formats/ProtobufSerializer.h>
# include <Interpreters/Context.h>
2021-10-02 07:13:14 +00:00
# include <base/range.h>
namespace DB
{
/// Constructs a row input format that reads Protobuf messages from `in_`.
///
/// - `header_` supplies the column names and data types handed to the serializer.
/// - `params_` is forwarded to the IRowInputFormat base together with the header and the buffer.
/// - `schema_info_` is resolved through ProtobufSchemas into the message type used by the serializer.
/// - `with_length_delimiter_` is passed to ProtobufSerializer::create unchanged.
///
/// `missing_column_indices` is passed to ProtobufSerializer::create here and is later used by readRow()
/// to mark the listed columns as not read.
ProtobufRowInputFormat::ProtobufRowInputFormat(
    ReadBuffer & in_,
    const Block & header_,
    const Params & params_,
    const FormatSchemaInfo & schema_info_,
    bool with_length_delimiter_)
    : IRowInputFormat(header_, in_, params_)
    , reader(std::make_unique<ProtobufReader>(in_))
    /// NOTE(review): this initializer uses both `*reader` and `missing_column_indices`; it presumably relies on
    /// those members being declared before `serializer` in the class definition - confirm against the header.
    , serializer(ProtobufSerializer::create(
          header_.getNames(),
          header_.getDataTypes(),
          missing_column_indices,
          *ProtobufSchemas::instance().getMessageTypeForFormatSchema(schema_info_),
          with_length_delimiter_,
          *reader))
{
}
/// Defaulted destructor, defined out of line in this translation unit.
/// NOTE(review): presumably kept here so that the smart-pointer members can be destroyed where
/// ProtobufReader / ProtobufSerializer are complete types - confirm against the header.
ProtobufRowInputFormat::~ProtobufRowInputFormat() = default;
/// Reads one row from the Protobuf reader into `columns`.
///
/// Returns false without touching anything when the reader reports end of input; otherwise reads a row
/// through the serializer, fills `row_read_extension.read_columns` and returns true.
bool ProtobufRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & row_read_extension)
{
    if (reader->eof())
        return false;

    /// The current row index is the size of the first column (0 when there are no columns at all).
    const size_t current_row = columns.empty() ? 0 : columns[0]->size();

    /// The serializer is pointed at the column set only when the first column is still empty.
    if (current_row == 0)
        serializer->setColumns(columns.data(), columns.size());

    serializer->readRow(current_row);

    /// Start with every column flagged as read, then clear the flag for the columns listed as missing.
    auto & read_flags = row_read_extension.read_columns;
    read_flags.clear();
    read_flags.resize(columns.size(), true);
    for (size_t missing_idx : missing_column_indices)
        read_flags[missing_idx] = false;

    return true;
}
/// Always reports that resynchronisation after an error is allowed; see syncAfterError().
bool ProtobufRowInputFormat::allowSyncAfterError() const
{
    return true;
}
/// Recovers after a failed row by asking the reader to end the current message.
/// NOTE(review): the `true` argument presumably tells the reader to ignore errors while skipping
/// the rest of the message - confirm against ProtobufReader::endMessage.
void ProtobufRowInputFormat::syncAfterError()
{
    reader->endMessage(true);
}
2021-10-11 16:11:50 +00:00
/// Registers the two Protobuf input formats in `factory`:
///   - "ProtobufSingle" creates ProtobufRowInputFormat with with_length_delimiter = false;
///   - "Protobuf" creates ProtobufRowInputFormat with with_length_delimiter = true.
/// Both variants build their FormatSchemaInfo from the format settings under the format name "Protobuf".
void registerInputFormatProtobuf(FormatFactory & factory)
{
    for (bool with_length_delimiter : {false, true})
    {
        factory.registerInputFormat(
            with_length_delimiter ? "Protobuf" : "ProtobufSingle",
            [with_length_delimiter](
                ReadBuffer & buf,
                const Block & sample,
                IRowInputFormat::Params params,
                const FormatSettings & settings)
            {
                return std::make_shared<ProtobufRowInputFormat>(
                    buf,
                    sample,
                    std::move(params),
                    FormatSchemaInfo(settings, "Protobuf", true),
                    with_length_delimiter);
            });
    }
}
/// Builds the schema description used by readSchema() from the `schema` section of `format_settings`
/// (format_schema, is_server, format_schema_path), with the format name fixed to "Protobuf".
ProtobufSchemaReader::ProtobufSchemaReader(const FormatSettings & format_settings)
    : schema_info(
          format_settings.schema.format_schema,
          "Protobuf",
          true,
          format_settings.schema.is_server,
          format_settings.schema.format_schema_path)
{
}
/// Resolves the message type for the stored schema info through ProtobufSchemas and converts it
/// into a list of column names and types via protobufSchemaToCHSchema().
NamesAndTypesList ProtobufSchemaReader::readSchema()
{
    return protobufSchemaToCHSchema(ProtobufSchemas::instance().getMessageTypeForFormatSchema(schema_info));
}
/// Registers an external schema reader for "Protobuf" and then for "ProtobufSingle".
/// Both names get the same creator, which builds a ProtobufSchemaReader from the format settings.
void registerProtobufSchemaReader(FormatFactory & factory)
{
    for (const char * format_name : {"Protobuf", "ProtobufSingle"})
    {
        factory.registerExternalSchemaReader(format_name, [](const FormatSettings & settings)
        {
            return std::make_shared<ProtobufSchemaReader>(settings);
        });
    }
}
}
#else
/// Fallback used when USE_PROTOBUF is not set: the registration entry points still exist,
/// but they do nothing, so no Protobuf formats or schema readers get registered.
namespace DB
{

class FormatFactory;

/// No-op stub for the Protobuf input format registration.
void registerInputFormatProtobuf(FormatFactory &) {}

/// No-op stub for the Protobuf schema reader registration.
void registerProtobufSchemaReader(FormatFactory &) {}

}
#endif