DataStreams: CapnProto uses <format_schema_path> config option

This addresses one of the remarks in the PR.

All format schemas are required to be in the <format_schema_path> directory.
This makes loading schema files less tedious, as the path can be omitted.
This commit is contained in:
Marek Vavruša 2017-11-09 22:48:28 -08:00 committed by alexey-milovidov
parent dbee90ba5b
commit 64a892c0e6
8 changed files with 56 additions and 11 deletions

View File

@ -1,6 +1,7 @@
#include <Common/config.h>
#if USE_CAPNP
#include <Core/Block.h>
#include <Common/escapeForFileName.h>
#include <IO/ReadBuffer.h>
#include <DataStreams/CapnProtoRowInputStream.h>
@ -14,6 +15,10 @@
namespace DB
{
static String getSchemaPath(const String & schema_dir, const String & schema_file)
{
return schema_dir + escapeForFileName(schema_file) + ".capnp";
}
CapnProtoRowInputStream::NestedField split(const Block & sample, size_t i)
{
@ -110,16 +115,17 @@ void CapnProtoRowInputStream::createActions(const NestedFieldList & sortedFields
}
}
CapnProtoRowInputStream::CapnProtoRowInputStream(ReadBuffer & istr_, const Block & sample_, const String & schema_file, const String & root_object)
CapnProtoRowInputStream::CapnProtoRowInputStream(ReadBuffer & istr_, const Block & sample_, const String & schema_dir, const String & schema_file, const String & root_object)
: istr(istr_), sample(sample_), parser(std::make_shared<SchemaParser>())
{
// Parse the schema and fetch the root object
auto schema = parser->impl.parseDiskFile(schema_file, schema_file, {});
auto schema = parser->impl.parseDiskFile(schema_file, getSchemaPath(schema_dir, schema_file), {});
root = schema.getNested(root_object).asStruct();
/**
* The schema typically consists of fields in various nested structures.
* Here we gather the list of fields and sort them in a way so that fields in the same structur are adjacent,
* Here we gather the list of fields and sort them in a way so that fields in the same structure are adjacent,
* and the nesting level doesn't decrease to make traversal easier.
*/
NestedFieldList list;
@ -194,4 +200,4 @@ bool CapnProtoRowInputStream::read(Block & block)
}
#endif
#endif // USE_CAPNP

View File

@ -1,4 +1,6 @@
#pragma once
#include <Common/config.h>
#if USE_CAPNP
#include <Core/Block.h>
#include <DataStreams/IRowInputStream.h>
@ -26,10 +28,11 @@ public:
};
using NestedFieldList = std::vector<NestedField>;
/** schema_file - location of the capnproto schema, e.g. "schema.canpn"
/** schema_dir - base path for schema files
* schema_file - location of the capnproto schema, e.g. "schema.canpn"
* root_object - name to the root object, e.g. "Message"
*/
CapnProtoRowInputStream(ReadBuffer & istr_, const Block & sample_, const String & schema_file, const String & root_object);
CapnProtoRowInputStream(ReadBuffer & istr_, const Block & sample_, const String & schema_dir, const String & schema_file, const String & root_object);
bool read(Block & block) override;
@ -66,3 +69,5 @@ private:
};
}
#endif // USE_CAPNP

View File

@ -105,9 +105,10 @@ BlockInputStreamPtr FormatFactory::getInput(const String & name, ReadBuffer & bu
auto schema_and_root = settings.format_schema.toString();
boost::split(tokens, schema_and_root, boost::is_any_of(":"));
if (tokens.size() != 2)
throw Exception("Format CapnProto requires 'format_schema' setting to have schema_file:root_object format, e.g. 'schema.capnp:Message'");
throw Exception("Format CapnProto requires 'format_schema' setting to have a schema_file:root_object format, e.g. 'schema.capnp:Message'");
return wrap_row_stream(std::make_shared<CapnProtoRowInputStream>(buf, sample, tokens[0], tokens[1]));
const String & schema_dir = context.getFormatSchemaPath();
return wrap_row_stream(std::make_shared<CapnProtoRowInputStream>(buf, sample, schema_dir, tokens[0], tokens[1]));
}
#endif
else if (name == "TabSeparatedRaw"

View File

@ -133,6 +133,7 @@ struct ContextShared
mutable std::unique_ptr<CompressionSettingsSelector> compression_settings_selector;
std::unique_ptr<MergeTreeSettings> merge_tree_settings; /// Settings of MergeTree* engines.
size_t max_table_size_to_drop = 50000000000lu; /// Protects MergeTree tables from accidental DROP (50GB by default)
String format_schema_path; /// Path to a directory that contains schema files used by input formats.
/// Named sessions. The user could specify session identifier to reuse settings and temporary tables in subsequent requests.
@ -1566,6 +1567,16 @@ void Context::setDefaultProfileName(const String & name)
shared->default_profile_name = name;
}
String Context::getFormatSchemaPath() const
{
return shared->format_schema_path;
}
void Context::setFormatSchemaPath(const String & path)
{
shared->format_schema_path = path;
}
SessionCleaner::~SessionCleaner()
{

View File

@ -354,6 +354,10 @@ public:
String getDefaultProfileName() const;
void setDefaultProfileName(const String & name);
/// Base path for format schemas
String getFormatSchemaPath() const;
void setFormatSchemaPath(const String & path);
/// User name and session identifier. Named sessions are local to users.
using SessionKey = std::pair<String, String>;

View File

@ -264,6 +264,11 @@ int Server::main(const std::vector<std::string> & args)
global_context->setDefaultProfileName(default_profile_name);
global_context->setSetting("profile", default_profile_name);
/// Set path for format schema files
auto format_schema_path = Poco::File(config().getString("format_schema_path", path + "format_schemas/"));
global_context->setFormatSchemaPath(format_schema_path.path() + "/");
format_schema_path.createDirectories();
LOG_INFO(log, "Loading metadata.");
loadMetadataSystem(*global_context);
/// After the system database is created, attach virtual system tables (in addition to query_log and part_log)

View File

@ -318,4 +318,9 @@
</retention>
</default>
</graphite_rollup_example>
<!-- Directory in <clickhouse-path> containing schema files for various input formats.
The directory will be created if it doesn't exist.
-->
<format_schema_path>format_schemas/</format_schema_path>
</yandex>

View File

@ -5,9 +5,10 @@ Cap'n Proto is a binary message format. Like Protocol Buffers and Thrift (but un
.. code-block:: sql
SELECT SearchPhrase, count() AS c FROM test.hits GROUP BY SearchPhrase FORMAT CapnProto SETTINGS schema = 'schema.capnp:Message'
SELECT SearchPhrase, count() AS c FROM test.hits
GROUP BY SearchPhrase FORMAT CapnProto SETTINGS schema = 'schema:Message'
When the schema file looks like:
When the `schema.capnp` schema file looks like:
.. code-block:: text
@ -16,6 +17,13 @@ When the schema file looks like:
c @1 :Uint64;
}
The schema files are located in the path specified in the configuration file:
.. code-block:: xml
<!-- Directory containing schema files for various input formats. -->
<format_schema_path>format_schemas/</format_schema_path>
Deserialization is almost as efficient as the binary rows format, with typically zero allocation overhead per message.
You can use this format as an efficient exchange message format in your data processing pipeline.