mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 08:40:50 +00:00
Merge pull request #4158 from vitlibar/add-support-for-absolute-format-schema-paths
Add support for absolute format schema paths.
This commit is contained in:
commit
ba8fcfd32b
@ -222,7 +222,7 @@ private:
|
||||
|
||||
/// Set path for format schema files
|
||||
if (config().has("format_schema_path"))
|
||||
context.setFormatSchemaPath(Poco::Path(config().getString("format_schema_path")).toString() + "/");
|
||||
context.setFormatSchemaPath(Poco::Path(config().getString("format_schema_path")).toString());
|
||||
}
|
||||
|
||||
|
||||
|
@ -418,7 +418,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
|
||||
|
||||
/// Set path for format schema files
|
||||
auto format_schema_path = Poco::File(config().getString("format_schema_path", path + "format_schemas/"));
|
||||
global_context->setFormatSchemaPath(format_schema_path.path() + "/");
|
||||
global_context->setFormatSchemaPath(format_schema_path.path());
|
||||
format_schema_path.createDirectories();
|
||||
|
||||
LOG_INFO(log, "Loading metadata.");
|
||||
|
@ -27,7 +27,9 @@ FormatSchemaInfo::FormatSchemaInfo(const Context & context, const String & schem
|
||||
}
|
||||
|
||||
size_t colon_pos = format_schema.find(':');
|
||||
if ((colon_pos == String::npos) || (colon_pos == 0) || (colon_pos == format_schema.length() - 1))
|
||||
Poco::Path path;
|
||||
if ((colon_pos == String::npos) || (colon_pos == 0) || (colon_pos == format_schema.length() - 1)
|
||||
|| path.assign(format_schema.substr(0, colon_pos)).getFileName().empty())
|
||||
{
|
||||
throw Exception(
|
||||
"Format schema requires the 'format_schema' setting to have the 'schema_file:message_name' format"
|
||||
@ -36,25 +38,44 @@ FormatSchemaInfo::FormatSchemaInfo(const Context & context, const String & schem
|
||||
ErrorCodes::BAD_ARGUMENTS);
|
||||
}
|
||||
|
||||
Poco::Path path(format_schema.substr(0, colon_pos));
|
||||
if (context.hasGlobalContext() && (context.getGlobalContext().getApplicationType() == Context::ApplicationType::SERVER))
|
||||
{
|
||||
if (path.isAbsolute())
|
||||
throw Exception("Absolute path in the 'format_schema' setting is prohibited: " + path.toString(), ErrorCodes::BAD_ARGUMENTS);
|
||||
is_null = false;
|
||||
message_name = format_schema.substr(colon_pos + 1);
|
||||
|
||||
if (path.depth() >= 1 && path.directory(0) == "..")
|
||||
throw Exception(
|
||||
"Path in the 'format_schema' setting shouldn't go outside the 'format_schema_path' directory: " + path.toString(),
|
||||
ErrorCodes::BAD_ARGUMENTS);
|
||||
}
|
||||
auto default_schema_directory = [&context]()
|
||||
{
|
||||
static const String str = Poco::Path(context.getFormatSchemaPath()).makeAbsolute().makeDirectory().toString();
|
||||
return str;
|
||||
};
|
||||
auto is_server = [&context]()
|
||||
{
|
||||
return context.hasGlobalContext() && (context.getGlobalContext().getApplicationType() == Context::ApplicationType::SERVER);
|
||||
};
|
||||
|
||||
if (path.getExtension().empty() && !schema_file_extension.empty())
|
||||
path.setExtension(schema_file_extension);
|
||||
|
||||
schema_path = path.toString();
|
||||
schema_directory = context.getFormatSchemaPath();
|
||||
message_name = format_schema.substr(colon_pos + 1);
|
||||
is_null = false;
|
||||
if (path.isAbsolute())
|
||||
{
|
||||
if (is_server())
|
||||
throw Exception("Absolute path in the 'format_schema' setting is prohibited: " + path.toString(), ErrorCodes::BAD_ARGUMENTS);
|
||||
schema_path = path.getFileName();
|
||||
schema_directory = path.makeParent().toString();
|
||||
}
|
||||
else if (path.depth() >= 1 && path.directory(0) == "..")
|
||||
{
|
||||
if (is_server())
|
||||
throw Exception(
|
||||
"Path in the 'format_schema' setting shouldn't go outside the 'format_schema_path' directory: " + path.toString(),
|
||||
ErrorCodes::BAD_ARGUMENTS);
|
||||
path = Poco::Path(default_schema_directory()).resolve(path).toString();
|
||||
schema_path = path.getFileName();
|
||||
schema_directory = path.makeParent().toString();
|
||||
}
|
||||
else
|
||||
{
|
||||
schema_path = path.toString();
|
||||
schema_directory = default_schema_directory();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -9,6 +9,7 @@
|
||||
#include <Formats/FormatSchemaInfo.h>
|
||||
#include <Formats/ProtobufSchemas.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <google/protobuf/descriptor.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
|
@ -1,11 +1,10 @@
|
||||
#include <Common/config.h>
|
||||
#if USE_PROTOBUF
|
||||
|
||||
#include <Common/Exception.h>
|
||||
#include <Core/Block.h>
|
||||
#include <Formats/FormatSchemaInfo.h>
|
||||
#include <Formats/ProtobufSchemas.h>
|
||||
#include <Poco/Path.h>
|
||||
#include <google/protobuf/compiler/importer.h>
|
||||
#include <Common/Exception.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -13,48 +12,66 @@ namespace DB
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int BAD_ARGUMENTS;
|
||||
extern const int NOT_IMPLEMENTED;
|
||||
extern const int CANNOT_PARSE_PROTOBUF_SCHEMA;
|
||||
}
|
||||
|
||||
ProtobufSchemas::ProtobufSchemas()
|
||||
: disk_source_tree(new google::protobuf::compiler::DiskSourceTree())
|
||||
, importer(new google::protobuf::compiler::Importer(disk_source_tree.get(), this))
|
||||
{
|
||||
}
|
||||
|
||||
class ProtobufSchemas::ImporterWithSourceTree : public google::protobuf::compiler::MultiFileErrorCollector
|
||||
{
|
||||
public:
|
||||
ImporterWithSourceTree(const String & schema_directory) : importer(&disk_source_tree, this)
|
||||
{
|
||||
disk_source_tree.MapPath("", schema_directory);
|
||||
}
|
||||
|
||||
~ImporterWithSourceTree() override = default;
|
||||
|
||||
const google::protobuf::Descriptor * import(const String & schema_path, const String & message_name)
|
||||
{
|
||||
// Search the message type among already imported ones.
|
||||
const auto * descriptor = importer.pool()->FindMessageTypeByName(message_name);
|
||||
if (descriptor)
|
||||
return descriptor;
|
||||
|
||||
const auto * file_descriptor = importer.Import(schema_path);
|
||||
// If there are parsing errors AddError() throws an exception and in this case the following line
|
||||
// isn't executed.
|
||||
assert(file_descriptor);
|
||||
|
||||
descriptor = file_descriptor->FindMessageTypeByName(message_name);
|
||||
if (!descriptor)
|
||||
throw Exception(
|
||||
"Not found a message named '" + message_name + "' in the schema file '" + schema_path + "'", ErrorCodes::BAD_ARGUMENTS);
|
||||
|
||||
return descriptor;
|
||||
}
|
||||
|
||||
private:
|
||||
// Overrides google::protobuf::compiler::MultiFileErrorCollector:
|
||||
void AddError(const String & filename, int line, int column, const String & message) override
|
||||
{
|
||||
throw Exception(
|
||||
"Cannot parse '" + filename + "' file, found an error at line " + std::to_string(line) + ", column " + std::to_string(column)
|
||||
+ ", " + message,
|
||||
ErrorCodes::CANNOT_PARSE_PROTOBUF_SCHEMA);
|
||||
}
|
||||
|
||||
google::protobuf::compiler::DiskSourceTree disk_source_tree;
|
||||
google::protobuf::compiler::Importer importer;
|
||||
};
|
||||
|
||||
|
||||
ProtobufSchemas::ProtobufSchemas() = default;
|
||||
ProtobufSchemas::~ProtobufSchemas() = default;
|
||||
|
||||
const google::protobuf::Descriptor *
|
||||
ProtobufSchemas::getMessageTypeForFormatSchema(const FormatSchemaInfo& info)
|
||||
const google::protobuf::Descriptor * ProtobufSchemas::getMessageTypeForFormatSchema(const FormatSchemaInfo & info)
|
||||
{
|
||||
// Search the message type among already imported ones.
|
||||
const auto * descriptor = importer->pool()->FindMessageTypeByName(info.messageName());
|
||||
if (descriptor)
|
||||
return descriptor;
|
||||
|
||||
// Initialize mapping in protobuf's DiskSourceTree.
|
||||
if (proto_directory.has_value())
|
||||
{
|
||||
assert(*proto_directory == info.schemaDirectory()); // format_schema_path should not be changed!
|
||||
}
|
||||
else
|
||||
{
|
||||
proto_directory = info.schemaDirectory();
|
||||
disk_source_tree->MapPath("", *proto_directory);
|
||||
}
|
||||
|
||||
const auto * file_descriptor = importer->Import(info.schemaPath());
|
||||
|
||||
// If there parsing errors AddError() throws an exception and in this case the following line
|
||||
// isn't executed.
|
||||
assert(file_descriptor);
|
||||
|
||||
descriptor = file_descriptor->FindMessageTypeByName(info.messageName());
|
||||
if (!descriptor)
|
||||
throw Exception(
|
||||
"Not found a message named '" + info.messageName() + "' in the schema file '" + info.schemaPath() + "'",
|
||||
ErrorCodes::BAD_ARGUMENTS);
|
||||
return descriptor;
|
||||
auto it = importers.find(info.schemaDirectory());
|
||||
if (it == importers.end())
|
||||
it = importers.emplace(info.schemaDirectory(), std::make_unique<ImporterWithSourceTree>(info.schemaDirectory())).first;
|
||||
auto * importer = it->second.get();
|
||||
return importer->import(info.schemaPath(), info.messageName());
|
||||
}
|
||||
|
||||
const google::protobuf::Descriptor * ProtobufSchemas::getMessageTypeForColumns(const std::vector<ColumnWithTypeAndName> & /*columns*/)
|
||||
@ -62,14 +79,6 @@ const google::protobuf::Descriptor * ProtobufSchemas::getMessageTypeForColumns(c
|
||||
throw Exception("Using the 'Protobuf' format without schema is not implemented", ErrorCodes::NOT_IMPLEMENTED);
|
||||
}
|
||||
|
||||
void ProtobufSchemas::AddError(const String & filename, int line, int column, const String & message)
|
||||
{
|
||||
throw Exception(
|
||||
"Cannot parse '" + filename + "' file, found an error at line " + std::to_string(line) + ", column " + std::to_string(column) + ", "
|
||||
+ message,
|
||||
ErrorCodes::CANNOT_PARSE_PROTOBUF_SCHEMA);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
#endif
|
||||
|
@ -3,9 +3,10 @@
|
||||
#include <Common/config.h>
|
||||
#if USE_PROTOBUF
|
||||
|
||||
#include <optional>
|
||||
#include <memory>
|
||||
#include <unordered_map>
|
||||
#include <vector>
|
||||
#include <Core/Types.h>
|
||||
#include <google/protobuf/compiler/importer.h>
|
||||
#include <ext/singleton.h>
|
||||
|
||||
|
||||
@ -19,34 +20,29 @@ namespace protobuf
|
||||
|
||||
namespace DB
|
||||
{
|
||||
class Block;
|
||||
class FormatSchemaInfo;
|
||||
struct ColumnWithTypeAndName;
|
||||
|
||||
/** Keeps parsed google protobuf schemas either parsed from files or generated from DB columns.
|
||||
* This class is used to handle the "Protobuf" input/output formats.
|
||||
*/
|
||||
class ProtobufSchemas : public ext::singleton<ProtobufSchemas>, public google::protobuf::compiler::MultiFileErrorCollector
|
||||
class ProtobufSchemas : public ext::singleton<ProtobufSchemas>
|
||||
{
|
||||
public:
|
||||
ProtobufSchemas();
|
||||
~ProtobufSchemas() override;
|
||||
~ProtobufSchemas();
|
||||
|
||||
/// Parses the format schema, then parses the corresponding proto file, and returns the descriptor of the message type.
|
||||
/// The function never returns nullptr, it throws an exception if it cannot load or parse the file.
|
||||
const google::protobuf::Descriptor * getMessageTypeForFormatSchema(const FormatSchemaInfo& info);
|
||||
const google::protobuf::Descriptor * getMessageTypeForFormatSchema(const FormatSchemaInfo & info);
|
||||
|
||||
/// Generates a message type with suitable types of fields to store a block with |header|, then returns the descriptor
|
||||
/// of the generated message type.
|
||||
const google::protobuf::Descriptor * getMessageTypeForColumns(const std::vector<ColumnWithTypeAndName> & columns);
|
||||
|
||||
private:
|
||||
// Overrides google::protobuf::compiler::MultiFileErrorCollector:
|
||||
void AddError(const String & filename, int line, int column, const String & message) override;
|
||||
|
||||
std::optional<String> proto_directory;
|
||||
std::unique_ptr<google::protobuf::compiler::DiskSourceTree> disk_source_tree;
|
||||
std::unique_ptr<google::protobuf::compiler::Importer> importer;
|
||||
class ImporterWithSourceTree;
|
||||
std::unordered_map<String, std::unique_ptr<ImporterWithSourceTree>> importers;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -9,12 +9,8 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
|
||||
set -e -o pipefail
|
||||
|
||||
# Copy schema files to the current directory because the client can open schemas from the current directory only.
|
||||
cp "$CURDIR/00825_protobuf_format.proto" 00825_protobuf_format_copy.proto
|
||||
cp "$CURDIR/00825_protobuf_format_syntax2.proto" 00825_protobuf_format_syntax2_copy.proto
|
||||
|
||||
# Run the client.
|
||||
$CLICKHOUSE_CLIENT --multiquery <<'EOF'
|
||||
$CLICKHOUSE_CLIENT --multiquery <<EOF
|
||||
SET allow_experimental_low_cardinality_type = 1;
|
||||
CREATE DATABASE IF NOT EXISTS test;
|
||||
DROP TABLE IF EXISTS test.table;
|
||||
@ -47,14 +43,11 @@ INSERT INTO test.table VALUES (toUUID('a7522158-3d41-4b77-ad69-6c598ee55c49'), '
|
||||
INSERT INTO test.table VALUES (toUUID('c694ad8a-f714-4ea3-907d-fd54fb25d9b5'), 'Natalia', 'Sokolova', 'female', toDate('1992-03-08'), 'jpg', NULL, 0, NULL, 26, 'pisces', [], [100, 200, 50], 'Plymouth', [50.403724, -4.142123], 3.14159, NULL, 0.007, 5.4, -20000000000000);
|
||||
INSERT INTO test.table VALUES (toUUID('a7da1aa6-f425-4789-8947-b034786ed374'), 'Vasily', 'Sidorov', 'male', toDate('1995-07-28'), 'bmp', '+442012345678', 1, toDateTime('2018-12-30 00:00:00'), 23, 'leo', ['Sunny'], [250, 244, 10], 'Murmansk', [68.970682, 33.074981], 3.14159265358979, 100000000000, 800, -3.2, 154400000);
|
||||
|
||||
SELECT * FROM test.table ORDER BY name FORMAT Protobuf SETTINGS format_schema = '00825_protobuf_format_copy:Person';
|
||||
SELECT * FROM test.table ORDER BY name FORMAT Protobuf SETTINGS format_schema = '$CURDIR/00825_protobuf_format:Person';
|
||||
SELECT 'ALTERNATIVE->';
|
||||
SELECT * FROM test.table ORDER BY name FORMAT Protobuf SETTINGS format_schema = '00825_protobuf_format_copy:AltPerson';
|
||||
SELECT * FROM test.table ORDER BY name FORMAT Protobuf SETTINGS format_schema = '$CURDIR/00825_protobuf_format:AltPerson';
|
||||
SELECT 'STRINGS->';
|
||||
SELECT * FROM test.table ORDER BY name FORMAT Protobuf SETTINGS format_schema = '00825_protobuf_format_copy:StrPerson';
|
||||
SELECT * FROM test.table ORDER BY name FORMAT Protobuf SETTINGS format_schema = '$CURDIR/00825_protobuf_format:StrPerson';
|
||||
SELECT 'SYNTAX2->';
|
||||
SELECT * FROM test.table ORDER BY name FORMAT Protobuf SETTINGS format_schema = '00825_protobuf_format_syntax2_copy:Syntax2Person';
|
||||
SELECT * FROM test.table ORDER BY name FORMAT Protobuf SETTINGS format_schema = '$CURDIR/00825_protobuf_format_syntax2:Syntax2Person';
|
||||
EOF
|
||||
|
||||
# Remove copies of the schema files.
|
||||
rm "00825_protobuf_format_copy.proto" "00825_protobuf_format_syntax2_copy.proto"
|
||||
|
Loading…
Reference in New Issue
Block a user