From 64a892c0e6c3ce3293ecdf5bf10db3a5abe62499 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marek=20Vavrus=CC=8Ca?= Date: Thu, 9 Nov 2017 22:48:28 -0800 Subject: [PATCH] DataStreams: CapnProto uses config option This addresses one of the remarks in the PR. All format schemas are required to be in the directory. This makes loading schema files less tedious, as the path can be omitted. --- dbms/src/DataStreams/CapnProtoRowInputStream.cpp | 16 +++++++++++----- dbms/src/DataStreams/CapnProtoRowInputStream.h | 9 +++++++-- dbms/src/DataStreams/FormatFactory.cpp | 5 +++-- dbms/src/Interpreters/Context.cpp | 11 +++++++++++ dbms/src/Interpreters/Context.h | 4 ++++ dbms/src/Server/Server.cpp | 5 +++++ dbms/src/Server/config.xml | 5 +++++ docs/en/formats/capnproto.rst | 12 ++++++++++-- 8 files changed, 56 insertions(+), 11 deletions(-) diff --git a/dbms/src/DataStreams/CapnProtoRowInputStream.cpp b/dbms/src/DataStreams/CapnProtoRowInputStream.cpp index fa48e450ca8..c5dda02d19e 100644 --- a/dbms/src/DataStreams/CapnProtoRowInputStream.cpp +++ b/dbms/src/DataStreams/CapnProtoRowInputStream.cpp @@ -1,6 +1,7 @@ +#include #if USE_CAPNP -#include +#include #include #include @@ -14,6 +15,10 @@ namespace DB { +static String getSchemaPath(const String & schema_dir, const String & schema_file) +{ + return schema_dir + escapeForFileName(schema_file) + ".capnp"; +} CapnProtoRowInputStream::NestedField split(const Block & sample, size_t i) { @@ -110,16 +115,17 @@ void CapnProtoRowInputStream::createActions(const NestedFieldList & sortedFields } } -CapnProtoRowInputStream::CapnProtoRowInputStream(ReadBuffer & istr_, const Block & sample_, const String & schema_file, const String & root_object) +CapnProtoRowInputStream::CapnProtoRowInputStream(ReadBuffer & istr_, const Block & sample_, const String & schema_dir, const String & schema_file, const String & root_object) : istr(istr_), sample(sample_), parser(std::make_shared()) { + // Parse the schema and fetch the root object - auto schema = parser->impl.parseDiskFile(schema_file, schema_file, {}); + auto schema = parser->impl.parseDiskFile(schema_file, getSchemaPath(schema_dir, schema_file), {}); root = schema.getNested(root_object).asStruct(); /** * The schema typically consists of fields in various nested structures. - * Here we gather the list of fields and sort them in a way so that fields in the same structur are adjacent, + * Here we gather the list of fields and sort them in a way so that fields in the same structure are adjacent, * and the nesting level doesn't decrease to make traversal easier. */ NestedFieldList list; @@ -194,4 +200,4 @@ bool CapnProtoRowInputStream::read(Block & block) } -#endif +#endif // USE_CAPNP diff --git a/dbms/src/DataStreams/CapnProtoRowInputStream.h b/dbms/src/DataStreams/CapnProtoRowInputStream.h index f5712945b3d..7ccdb0d8381 100644 --- a/dbms/src/DataStreams/CapnProtoRowInputStream.h +++ b/dbms/src/DataStreams/CapnProtoRowInputStream.h @@ -1,4 +1,6 @@ #pragma once +#include +#if USE_CAPNP #include #include @@ -26,10 +28,11 @@ public: }; using NestedFieldList = std::vector; - /** schema_file - location of the capnproto schema, e.g. "schema.canpn" + /** schema_dir - base path for schema files + * schema_file - location of the capnproto schema, e.g. "schema.canpn" * root_object - name to the root object, e.g. "Message" */ - CapnProtoRowInputStream(ReadBuffer & istr_, const Block & sample_, const String & schema_file, const String & root_object); + CapnProtoRowInputStream(ReadBuffer & istr_, const Block & sample_, const String & schema_dir, const String & schema_file, const String & root_object); bool read(Block & block) override; @@ -66,3 +69,5 @@ private: }; } + +#endif // USE_CAPNP \ No newline at end of file diff --git a/dbms/src/DataStreams/FormatFactory.cpp b/dbms/src/DataStreams/FormatFactory.cpp index b75782d731e..e26f2ab6fa3 100644 --- a/dbms/src/DataStreams/FormatFactory.cpp +++ b/dbms/src/DataStreams/FormatFactory.cpp @@ -105,9 +105,10 @@ BlockInputStreamPtr FormatFactory::getInput(const String & name, ReadBuffer & bu auto schema_and_root = settings.format_schema.toString(); boost::split(tokens, schema_and_root, boost::is_any_of(":")); if (tokens.size() != 2) - throw Exception("Format CapnProto requires 'format_schema' setting to have schema_file:root_object format, e.g. 'schema.capnp:Message'"); + throw Exception("Format CapnProto requires 'format_schema' setting to have a schema_file:root_object format, e.g. 'schema.capnp:Message'"); - return wrap_row_stream(std::make_shared(buf, sample, tokens[0], tokens[1])); + const String & schema_dir = context.getFormatSchemaPath(); + return wrap_row_stream(std::make_shared(buf, sample, schema_dir, tokens[0], tokens[1])); } #endif else if (name == "TabSeparatedRaw" diff --git a/dbms/src/Interpreters/Context.cpp b/dbms/src/Interpreters/Context.cpp index e2e4c03e938..0b8f4319d5b 100644 --- a/dbms/src/Interpreters/Context.cpp +++ b/dbms/src/Interpreters/Context.cpp @@ -133,6 +133,7 @@ struct ContextShared mutable std::unique_ptr compression_settings_selector; std::unique_ptr merge_tree_settings; /// Settings of MergeTree* engines. size_t max_table_size_to_drop = 50000000000lu; /// Protects MergeTree tables from accidental DROP (50GB by default) + String format_schema_path; /// Path to a directory that contains schema files used by input formats. /// Named sessions. The user could specify session identifier to reuse settings and temporary tables in subsequent requests. @@ -1566,6 +1567,16 @@ void Context::setDefaultProfileName(const String & name) shared->default_profile_name = name; } +String Context::getFormatSchemaPath() const +{ + return shared->format_schema_path; +} + +void Context::setFormatSchemaPath(const String & path) +{ + shared->format_schema_path = path; +} + SessionCleaner::~SessionCleaner() { diff --git a/dbms/src/Interpreters/Context.h b/dbms/src/Interpreters/Context.h index 88c618222ab..6fb8b585d66 100644 --- a/dbms/src/Interpreters/Context.h +++ b/dbms/src/Interpreters/Context.h @@ -354,6 +354,10 @@ public: String getDefaultProfileName() const; void setDefaultProfileName(const String & name); + /// Base path for format schemas + String getFormatSchemaPath() const; + void setFormatSchemaPath(const String & path); + /// User name and session identifier. Named sessions are local to users. using SessionKey = std::pair; diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 09f3b6410d0..dc76b0b384c 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -264,6 +264,11 @@ int Server::main(const std::vector & args) global_context->setDefaultProfileName(default_profile_name); global_context->setSetting("profile", default_profile_name); + /// Set path for format schema files + auto format_schema_path = Poco::File(config().getString("format_schema_path", path + "format_schemas/")); + global_context->setFormatSchemaPath(format_schema_path.path() + "/"); + format_schema_path.createDirectories(); + LOG_INFO(log, "Loading metadata."); loadMetadataSystem(*global_context); /// After the system database is created, attach virtual system tables (in addition to query_log and part_log) diff --git a/dbms/src/Server/config.xml b/dbms/src/Server/config.xml index 9705d5ddaf8..2ee5d07251e 100644 --- a/dbms/src/Server/config.xml +++ b/dbms/src/Server/config.xml @@ -318,4 +318,9 @@ + + + format_schemas/ diff --git a/docs/en/formats/capnproto.rst b/docs/en/formats/capnproto.rst index d7ad7ba44db..5b6c4b79796 100644 --- a/docs/en/formats/capnproto.rst +++ b/docs/en/formats/capnproto.rst @@ -5,9 +5,10 @@ Cap'n Proto is a binary message format. Like Protocol Buffers and Thrift (but un .. code-block:: sql - SELECT SearchPhrase, count() AS c FROM test.hits GROUP BY SearchPhrase FORMAT CapnProto SETTINGS schema = 'schema.capnp:Message' + SELECT SearchPhrase, count() AS c FROM test.hits + GROUP BY SearchPhrase FORMAT CapnProto SETTINGS schema = 'schema:Message' -When the schema file looks like: +When the `schema.capnp` schema file looks like: .. code-block:: text @@ -16,6 +17,13 @@ When the schema file looks like: c @1 :Uint64; } +The schema files are located in the path specified in the configuration file: + +.. code-block:: xml + + + format_schemas/ + Deserialization is almost as efficient as the binary rows format, with typically zero allocation overhead per message. You can use this format as an efficient exchange message format in your data processing pipeline.