Implemented storage for parsed protobuf schemas.

This commit is contained in:
Vitaly Baranov 2019-01-23 22:36:57 +03:00
parent 28e9837819
commit 42d9d4e81d
3 changed files with 120 additions and 0 deletions

View File

@ -408,6 +408,7 @@ namespace ErrorCodes
extern const int ILLEGAL_SYNTAX_FOR_CODEC_TYPE = 431; extern const int ILLEGAL_SYNTAX_FOR_CODEC_TYPE = 431;
extern const int UNKNOWN_CODEC = 432; extern const int UNKNOWN_CODEC = 432;
extern const int ILLEGAL_CODEC_PARAMETER = 433; extern const int ILLEGAL_CODEC_PARAMETER = 433;
extern const int CANNOT_PARSE_PROTOBUF_SCHEMA = 434;
extern const int KEEPER_EXCEPTION = 999; extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000; extern const int POCO_EXCEPTION = 1000;

View File

@ -0,0 +1,70 @@
#include <Common/Exception.h>
#include <Core/Block.h>
#include <Formats/FormatSchemaInfo.h>
#include <Formats/ProtobufSchemas.h>
#include <Poco/Path.h>
namespace DB
{
/// Error codes used in this file. Each code referenced below is declared here explicitly,
/// so the file does not depend on another header happening to declare it.
namespace ErrorCodes
{
    extern const int BAD_ARGUMENTS;
    extern const int NOT_IMPLEMENTED; /// thrown by getMessageTypeForColumns()
    extern const int CANNOT_PARSE_PROTOBUF_SCHEMA;
}
/// Creates an empty disk source tree and an importer that reads through it.
/// The importer is given `this` as its error collector, so parse errors arrive in AddError().
ProtobufSchemas::ProtobufSchemas()
    : disk_source_tree(std::make_unique<google::protobuf::compiler::DiskSourceTree>())
    , importer(std::make_unique<google::protobuf::compiler::Importer>(disk_source_tree.get(), this))
{
}
/// Defaulted destructor, defined here rather than in the header; the unique_ptr members
/// (importer, disk_source_tree) are destroyed automatically.
ProtobufSchemas::~ProtobufSchemas() = default;
/// Returns the descriptor of the message type named by `info`, importing the schema file on demand.
///
/// The importer's pool of already imported files is searched first by message name; only if
/// nothing is found there is the file info.schemaPath() imported, resolved relative to
/// info.schemaDirectory().
///
/// Never returns nullptr. Throws Exception with code
///  - CANNOT_PARSE_PROTOBUF_SCHEMA if the schema file cannot be imported
///    (normally raised from AddError() while Import() is running);
///  - BAD_ARGUMENTS if the imported file has no message with the requested name.
const google::protobuf::Descriptor *
ProtobufSchemas::getMessageTypeForFormatSchema(const FormatSchemaInfo& info)
{
    // Search the message type among already imported ones.
    const auto * descriptor = importer->pool()->FindMessageTypeByName(info.messageName());
    if (descriptor)
        return descriptor;

    // Initialize mapping in protobuf's DiskSourceTree. This is done once, on the first import.
    if (proto_directory.has_value())
    {
        assert(*proto_directory == info.schemaDirectory()); // format_schema_path should not be changed!
    }
    else
    {
        proto_directory = info.schemaDirectory();
        disk_source_tree->MapPath("", *proto_directory);
    }

    const auto * file_descriptor = importer->Import(info.schemaPath());

    // If there are parsing errors, AddError() throws an exception and this point is not reached.
    // Import() can still return null on failure, and assert() is compiled out under NDEBUG,
    // so check explicitly instead of dereferencing a null pointer below.
    if (!file_descriptor)
        throw Exception(
            "Cannot import the schema file '" + info.schemaPath() + "'",
            ErrorCodes::CANNOT_PARSE_PROTOBUF_SCHEMA);

    descriptor = file_descriptor->FindMessageTypeByName(info.messageName());
    if (!descriptor)
        throw Exception(
            "Not found a message named '" + info.messageName() + "' in the schema file '" + info.schemaPath() + "'",
            ErrorCodes::BAD_ARGUMENTS);

    return descriptor;
}
/// Placeholder for generating a message type from a list of columns.
/// Not implemented: every call throws Exception with code NOT_IMPLEMENTED; the argument is ignored.
const google::protobuf::Descriptor *
ProtobufSchemas::getMessageTypeForColumns(const std::vector<ColumnWithTypeAndName> & /*columns*/)
{
    const char * const reason = "Using the 'Protobuf' format without schema is not implemented";
    throw Exception(reason, ErrorCodes::NOT_IMPLEMENTED);
}
void ProtobufSchemas::AddError(const String & filename, int line, int column, const String & message)
{
throw Exception(
"Cannot parse '" + filename + "' file, found an error at line " + std::to_string(line) + ", column " + std::to_string(column) + ", "
+ message,
ErrorCodes::CANNOT_PARSE_PROTOBUF_SCHEMA);
}
}

View File

@ -0,0 +1,49 @@
#pragma once
#include <optional>
#include <Core/Types.h>
#include <google/protobuf/compiler/importer.h>
#include <ext/singleton.h>
/// Forward declaration of the protobuf descriptor type that the functions below return by pointer.
namespace google::protobuf
{
    class Descriptor;
}
namespace DB
{
/// Forward declarations. FormatSchemaInfo and ColumnWithTypeAndName appear below only behind
/// references, so their full definitions are not needed in this header.
class Block;
class FormatSchemaInfo;
struct ColumnWithTypeAndName;
/** Keeps parsed google protobuf schemas either parsed from files or generated from DB columns.
* This class is used to handle the "Protobuf" input/output formats.
* It derives from ext::singleton and also acts as the error collector of its own importer:
* errors reported by protobuf while importing a file arrive in AddError(), which throws.
*/
class ProtobufSchemas : public ext::singleton<ProtobufSchemas>, public google::protobuf::compiler::MultiFileErrorCollector
{
public:
ProtobufSchemas();
~ProtobufSchemas() override;
/// Returns the descriptor of the message type described by |info|, importing the corresponding proto file
/// if the message type has not been imported yet.
/// The function never returns nullptr, it throws an exception if it cannot load or parse the file,
/// or if the file does not contain a message with the requested name.
const google::protobuf::Descriptor * getMessageTypeForFormatSchema(const FormatSchemaInfo& info);
/// Generates a message type with suitable types of fields to store the given |columns|, then returns the descriptor
/// of the generated message type.
/// Not implemented yet: the current implementation always throws an exception.
const google::protobuf::Descriptor * getMessageTypeForColumns(const std::vector<ColumnWithTypeAndName> & columns);
private:
// Overrides google::protobuf::compiler::MultiFileErrorCollector:
// throws an exception describing the error, so it never returns normally.
void AddError(const String & filename, int line, int column, const String & message) override;
// Directory mapped into the source tree; empty until the first schema file is imported.
std::optional<String> proto_directory;
// Declared before the importer on purpose: the importer is constructed from a pointer to this tree.
std::unique_ptr<google::protobuf::compiler::DiskSourceTree> disk_source_tree;
std::unique_ptr<google::protobuf::compiler::Importer> importer;
};
}