Remove ProtobufRowInputStream.

This commit is contained in:
Nikolai Kochetov 2019-08-02 19:23:44 +03:00
parent bd4f0182e4
commit 8c66d106e4
5 changed files with 8 additions and 140 deletions

View File

@ -208,7 +208,6 @@ void registerOutputFormatCSV(FormatFactory & factory);
void registerInputFormatTSKV(FormatFactory & factory);
void registerOutputFormatTSKV(FormatFactory & factory);
void registerOutputFormatTSKV(FormatFactory & factory);
void registerInputFormatProtobuf(FormatFactory & factory);
void registerOutputFormatProtobuf(FormatFactory & factory);
void registerInputFormatProcessorNative(FormatFactory & factory);
@ -263,7 +262,6 @@ FormatFactory::FormatFactory()
registerOutputFormatCSV(*this);
registerInputFormatTSKV(*this);
registerOutputFormatTSKV(*this);
registerInputFormatProtobuf(*this);
registerOutputFormatProtobuf(*this);
registerInputFormatProcessorNative(*this);

View File

@ -1,96 +0,0 @@
#include "config_formats.h"
#if USE_PROTOBUF
#include "ProtobufRowInputStream.h"
#include <Core/Block.h>
#include <Formats/BlockInputStreamFromRowInputStream.h>
#include <Formats/FormatFactory.h>
#include <Formats/FormatSchemaInfo.h>
#include <Formats/ProtobufSchemas.h>
namespace DB
{
// Builds a row stream that reads protobuf messages from `in_`.
// `data_types` is taken from the header's column types. `reader` is constructed over `in_`
// with the message type that ProtobufSchemas resolves for `format_schema`, plus the header's
// column names.
ProtobufRowInputStream::ProtobufRowInputStream(ReadBuffer & in_, const Block & header, const FormatSchemaInfo & format_schema)
: data_types(header.getDataTypes()), reader(in_, ProtobufSchemas::instance().getMessageTypeForFormatSchema(format_schema), header.getNames())
{
}
// The destructor is declared in the class and defaulted here, out of line.
ProtobufRowInputStream::~ProtobufRowInputStream() = default;
// Reads the next protobuf message through `reader` and stores its values into `columns`.
// Returns false when no further message can be started (end of input), true otherwise.
// On return `extra.read_columns` has one flag per column telling whether a value read
// from this message added a row to it; columns without such a value get a default.
bool ProtobufRowInputStream::read(MutableColumns & columns, RowReadExtension & extra)
{
    if (!reader.startMessage())
        return false; // EOF reached, no more messages.

    // One flag per column; everything starts out as "not read".
    auto & filled = extra.read_columns;
    filled.assign(columns.size(), false);

    // Keep consuming values for as long as the reader reports a target column.
    size_t idx;
    while (reader.readColumnIndex(idx))
    {
        // Adding a row is allowed only while this column has not received one yet.
        bool may_add_row = !static_cast<bool>(filled[idx]);
        do
        {
            bool added;
            data_types[idx]->deserializeProtobuf(*columns[idx], reader, may_add_row, added);
            if (added)
            {
                filled[idx] = true;
                may_add_row = false;
            }
        } while (reader.canReadMoreValues());
    }

    // Columns that were not visited are filled with default values.
    for (size_t i = 0; i < filled.size(); ++i)
    {
        if (!filled[i])
            data_types[i]->insertDefaultInto(*columns[i]);
    }

    reader.endMessage();
    return true;
}
// Always reports that this stream supports syncAfterError().
bool ProtobufRowInputStream::allowSyncAfterError() const
{
return true;
}
// Resynchronizes after a failed read by ending the current message on the reader.
void ProtobufRowInputStream::syncAfterError()
{
// NOTE(review): the meaning of the `true` argument is defined by ProtobufReader::endMessage;
// presumably it tolerates a partially read message - confirm against ProtobufReader.
reader.endMessage(true);
}
// Registers the "Protobuf" input format in `factory`.
// The registered creator wraps a ProtobufRowInputStream (constructed with a FormatSchemaInfo
// built from the context for the "Protobuf" format) in a BlockInputStreamFromRowInputStream.
void registerInputFormatProtobuf(FormatFactory & factory)
{
    const auto create_stream = [](
        ReadBuffer & buf,
        const Block & sample,
        const Context & context,
        UInt64 max_block_size,
        UInt64 rows_portion_size,
        FormatFactory::ReadCallback callback,
        const FormatSettings & settings)
    {
        return std::make_shared<BlockInputStreamFromRowInputStream>(
            std::make_shared<ProtobufRowInputStream>(buf, sample, FormatSchemaInfo(context, "Protobuf")),
            sample,
            max_block_size,
            rows_portion_size,
            callback,
            settings);
    };

    factory.registerInputFormat("Protobuf", create_stream);
}
}
#else
namespace DB
{
// A forward declaration is enough here: the stub below only names the type in its signature.
class FormatFactory;
// Variant compiled when USE_PROTOBUF is not enabled: the function still exists,
// but it registers nothing.
void registerInputFormatProtobuf(FormatFactory &) {}
}
#endif

View File

@ -1,40 +0,0 @@
#pragma once
#include "config_formats.h"
#if USE_PROTOBUF
#include <DataTypes/IDataType.h>
#include <Formats/IRowInputStream.h>
#include <Formats/ProtobufReader.h>
namespace DB
{
class Block;
class FormatSchemaInfo;
/** Stream designed to deserialize data from the google protobuf format.
 * Each row is read as a separate message.
 * These messages are delimited according to documentation
 * https://github.com/protocolbuffers/protobuf/blob/master/src/google/protobuf/util/delimited_message_util.h
 * Reading data in the protobuf format requires the 'format_schema' setting to be set, e.g.
 * INSERT INTO table FORMAT Protobuf SETTINGS format_schema = 'schema:Message'
 * where schema is the name of "schema.proto" file specifying protobuf schema.
 */
class ProtobufRowInputStream : public IRowInputStream
{
public:
// Reads messages from `in_`; `header` supplies the column names and data types,
// `format_schema` identifies the protobuf message type to use.
ProtobufRowInputStream(ReadBuffer & in_, const Block & header, const FormatSchemaInfo & format_schema);
~ProtobufRowInputStream() override;
// Reads one message as one row into `columns`; returns false when there are no more messages.
// `extra.read_columns` is filled with per-column flags telling which columns got a value.
bool read(MutableColumns & columns, RowReadExtension & extra) override;
// Tells whether syncAfterError() may be used to continue after a failed read.
bool allowSyncAfterError() const override;
// Ends the current message on the reader so that reading can continue after an error.
void syncAfterError() override;
private:
// Data types of the header's columns, captured in the constructor.
DataTypes data_types;
// Reader over the input buffer that parses the delimited protobuf messages.
ProtobufReader reader;
};
}
#endif

View File

@ -75,7 +75,7 @@ void registerInputFormatProcessorProtobuf(FormatFactory & factory)
IRowInputFormat::Params params,
const FormatSettings &)
{
return std::make_shared<ProtobufRowInputFormat>(buf, sample, params, FormatSchemaInfo(context, "Protobuf"));
return std::make_shared<ProtobufRowInputFormat>(buf, sample, std::move(params), FormatSchemaInfo(context, "Protobuf"));
});
}

View File

@ -13,7 +13,13 @@ class Block;
class FormatSchemaInfo;
/** Interface of stream, that allows to read data by rows.
/** Stream designed to deserialize data from the google protobuf format.
* Each row is read as a separate message.
* These messages are delimited according to documentation
* https://github.com/protocolbuffers/protobuf/blob/master/src/google/protobuf/util/delimited_message_util.h
* Reading data in the protobuf format requires the 'format_schema' setting to be set, e.g.
* INSERT INTO table FORMAT Protobuf SETTINGS format_schema = 'schema:Message'
* where schema is the name of "schema.proto" file specifying protobuf schema.
*/
class ProtobufRowInputFormat : public IRowInputFormat
{