diff --git a/dbms/src/Core/Settings.h b/dbms/src/Core/Settings.h index 92b9618af77..e7cee827ca7 100644 --- a/dbms/src/Core/Settings.h +++ b/dbms/src/Core/Settings.h @@ -189,7 +189,7 @@ struct Settings : public SettingsCollection M(SettingBool, input_format_values_interpret_expressions, true, "For Values format: if the field could not be parsed by streaming parser, run SQL parser and try to interpret it as SQL expression.", 0) \ M(SettingBool, input_format_values_deduce_templates_of_expressions, true, "For Values format: if the field could not be parsed by streaming parser, run SQL parser, deduce template of the SQL expression, try to parse all rows using template and then interpret expression for all rows.", 0) \ M(SettingBool, input_format_values_accurate_types_of_literals, true, "For Values format: when parsing and interpreting expressions using template, check actual type of literal to avoid possible overflow and precision issues.", 0) \ - M(SettingString, input_format_avro_schema_registry_url, "", "For AvroConfluent format: Confluent Schema Registry URL.", 0) \ + M(SettingURI, format_avro_schema_registry_url, {}, "For AvroConfluent format: Confluent Schema Registry URL.", 0) \ \ M(SettingBool, output_format_json_quote_64bit_integers, true, "Controls quoting of 64-bit integers in JSON output format.", 0) \ \ diff --git a/dbms/src/Core/SettingsCollection.cpp b/dbms/src/Core/SettingsCollection.cpp index 9e44e327fcc..6e1040a23fc 100644 --- a/dbms/src/Core/SettingsCollection.cpp +++ b/dbms/src/Core/SettingsCollection.cpp @@ -396,6 +396,54 @@ void SettingEnum::set(const Field & x) } + +String SettingURI::toString() const +{ + return value.toString(); +} + +Field SettingURI::toField() const +{ + return value.toString(); +} + +void SettingURI::set(const Poco::URI & x) +{ + value = x; + changed = true; +} + +void SettingURI::set(const Field & x) +{ + const String & s = safeGet(x); + set(s); +} + +void SettingURI::set(const String & x) +{ + try { + Poco::URI uri(x); + set(uri); + } + catch (const Poco::Exception& e) + { + throw Exception{Exception::CreateFromPoco, e}; + } +} + +void SettingURI::serialize(WriteBuffer & buf, SettingsBinaryFormat) const +{ + writeStringBinary(toString(), buf); +} + +void SettingURI::deserialize(ReadBuffer & buf, SettingsBinaryFormat) +{ + String s; + readStringBinary(s, buf); + set(s); +} + + #define IMPLEMENT_SETTING_ENUM(ENUM_NAME, LIST_OF_NAMES_MACRO, ERROR_CODE_FOR_UNEXPECTED_NAME) \ IMPLEMENT_SETTING_ENUM_WITH_TAG(ENUM_NAME, void, LIST_OF_NAMES_MACRO, ERROR_CODE_FOR_UNEXPECTED_NAME) diff --git a/dbms/src/Core/SettingsCollection.h b/dbms/src/Core/SettingsCollection.h index a7a28fef847..4f44612c36f 100644 --- a/dbms/src/Core/SettingsCollection.h +++ b/dbms/src/Core/SettingsCollection.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include #include @@ -196,6 +197,26 @@ struct SettingEnum void deserialize(ReadBuffer & buf, SettingsBinaryFormat format); }; +struct SettingURI +{ + Poco::URI value; + bool changed = false; + + SettingURI(const Poco::URI & x = Poco::URI{}) : value(x) {} + + operator Poco::URI() const { return value; } + SettingURI & operator= (const Poco::URI & x) { set(x); return *this; } + + String toString() const; + Field toField() const; + + void set(const Poco::URI & x); + void set(const Field & x); + void set(const String & x); + + void serialize(WriteBuffer & buf, SettingsBinaryFormat format) const; + void deserialize(ReadBuffer & buf, SettingsBinaryFormat format); +}; enum class LoadBalancing { diff --git a/dbms/src/Formats/FormatFactory.cpp b/dbms/src/Formats/FormatFactory.cpp index f812b56aa5d..13203075918 100644 --- a/dbms/src/Formats/FormatFactory.cpp +++ b/dbms/src/Formats/FormatFactory.cpp @@ -14,7 +14,7 @@ #include #include #include - +#include namespace DB { @@ -68,7 +68,15 @@ static FormatSettings getInputFormatSetting(const Settings & settings, const Con format_settings.custom.row_before_delimiter = settings.format_custom_row_before_delimiter; format_settings.custom.row_after_delimiter = settings.format_custom_row_after_delimiter; format_settings.custom.row_between_delimiter = settings.format_custom_row_between_delimiter; - format_settings.avro.schema_registry_url = settings.input_format_avro_schema_registry_url; + + /// Validate avro_schema_registry_url with RemoteHostFilter when non-empty and in Server context + if (context.hasGlobalContext() && (context.getGlobalContext().getApplicationType() == Context::ApplicationType::SERVER)) + { + const Poco::URI & avro_schema_registry_url = settings.format_avro_schema_registry_url; + if (!avro_schema_registry_url.empty()) + context.getRemoteHostFilter().checkURL(avro_schema_registry_url); + } + format_settings.avro.schema_registry_url = settings.format_avro_schema_registry_url.toString(); return format_settings; } diff --git a/dbms/src/Processors/Formats/Impl/AvroRowInputFormat.cpp b/dbms/src/Processors/Formats/Impl/AvroRowInputFormat.cpp index d4e963d49e7..a6eed90d57a 100644 --- a/dbms/src/Processors/Formats/Impl/AvroRowInputFormat.cpp +++ b/dbms/src/Processors/Formats/Impl/AvroRowInputFormat.cpp @@ -480,7 +480,7 @@ AvroDeserializer::AvroDeserializer(const ColumnsWithTypeAndName & columns, avro: } } -void AvroDeserializer::deserializeRow(MutableColumns & columns, avro::Decoder & decoder) +void AvroDeserializer::deserializeRow(MutableColumns & columns, avro::Decoder & decoder) const { for (size_t i = 0; i < field_mapping.size(); i++) { @@ -519,51 +519,49 @@ bool AvroRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &) class AvroConfluentRowInputFormat::SchemaRegistry { public: - SchemaRegistry(const std::string & base_url_) + SchemaRegistry(const std::string & base_url_, size_t schema_cache_max_size = 1000) + : base_url(base_url_), schema_cache(schema_cache_max_size) { - if (base_url_.empty()) - { + if (base_url.empty()) throw Exception("Empty Schema Registry URL", ErrorCodes::BAD_ARGUMENTS); - } - try - { - base_url = base_url_; - } - catch (const Poco::SyntaxException & e) - { - throw Exception("Invalid Schema Registry URL: " + e.displayText(), ErrorCodes::BAD_ARGUMENTS); - } } - avro::ValidSchema getSchema(uint32_t id) const + avro::ValidSchema getSchema(uint32_t id) + { + auto [schema, loaded] = schema_cache.getOrSet( + id, + [this, id](){ return std::make_shared(fetchSchema(id)); } + ); + return *schema; + } + +private: + avro::ValidSchema fetchSchema(uint32_t id) { try { try { - /// TODO Host checking to prevent SSRF - Poco::URI url(base_url, "/schemas/ids/" + std::to_string(id)); + LOG_TRACE((&Logger::get("AvroConfluentRowInputFormat")), "Fetching schema id = " << id); /// One second for connect/send/receive. Just in case. ConnectionTimeouts timeouts({1, 0}, {1, 0}, {1, 0}); - Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_GET, url.getPathAndQuery()); + Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_GET, url.getPathAndQuery(), Poco::Net::HTTPRequest::HTTP_1_1); + request.setHost(url.getHost()); auto session = makePooledHTTPSession(url, timeouts, 1); session->sendRequest(request); Poco::Net::HTTPResponse response; - auto & response_body = session->receiveResponse(response); - - if (response.getStatus() != Poco::Net::HTTPResponse::HTTP_OK) - { - throw Exception("HTTP code " + std::to_string(response.getStatus()), ErrorCodes::INCORRECT_DATA); - } + auto response_body = receiveResponse(*session, request, response, false); Poco::JSON::Parser parser; - auto json_body = parser.parse(response_body).extract(); + auto json_body = parser.parse(*response_body).extract(); auto schema = json_body->getValue("schema"); + LOG_TRACE((&Logger::get("AvroConfluentRowInputFormat")), + "Succesfully fetched schema id = " << id << "\n" << schema); return avro::compileJsonSchemaFromString(schema); } catch (const Exception &) @@ -588,8 +586,27 @@ public: private: Poco::URI base_url; + LRUCache schema_cache; }; +using ConfluentSchemaRegistry = AvroConfluentRowInputFormat::SchemaRegistry; +#define SCHEMA_REGISTRY_CACHE_MAX_SIZE 1000 +/// Cache of Schema Registry URL -> SchemaRegistry +static LRUCache schema_registry_cache(SCHEMA_REGISTRY_CACHE_MAX_SIZE); + +static std::shared_ptr getConfluentSchemaRegistry(const FormatSettings & format_settings) +{ + const auto & base_url = format_settings.avro.schema_registry_url; + auto [schema_registry, loaded] = schema_registry_cache.getOrSet( + base_url, + [base_url]() + { + return std::make_shared(base_url); + } + ); + return schema_registry; +} + static uint32_t readConfluentSchemaId(ReadBuffer & in) { uint8_t magic; @@ -611,7 +628,7 @@ AvroConfluentRowInputFormat::AvroConfluentRowInputFormat( const Block & header_, ReadBuffer & in_, Params params_, const FormatSettings & format_settings_) : IRowInputFormat(header_.cloneEmpty(), in_, params_) , header_columns(header_.getColumnsWithTypeAndName()) - , schema_registry(std::make_unique(format_settings_.avro.schema_registry_url)) + , schema_registry(getConfluentSchemaRegistry(format_settings_)) , input_stream(std::make_unique(in)) , decoder(avro::binaryDecoder()) @@ -632,7 +649,7 @@ bool AvroConfluentRowInputFormat::readRow(MutableColumns & columns, RowReadExten return true; } -AvroDeserializer & AvroConfluentRowInputFormat::getOrCreateDeserializer(SchemaId schema_id) +const AvroDeserializer & AvroConfluentRowInputFormat::getOrCreateDeserializer(SchemaId schema_id) { auto it = deserializer_cache.find(schema_id); if (it == deserializer_cache.end()) @@ -657,12 +674,6 @@ void registerInputFormatProcessorAvro(FormatFactory & factory) }); #if USE_POCO_JSON - - /// AvroConfluent format is disabled for the following reasons: - /// 1. There is no test for it. - /// 2. RemoteHostFilter is not used to prevent CSRF attacks. - -#if 0 factory.registerInputFormatProcessor("AvroConfluent",[]( ReadBuffer & buf, const Block & sample, @@ -673,8 +684,6 @@ void registerInputFormatProcessorAvro(FormatFactory & factory) }); #endif -#endif - } } diff --git a/dbms/src/Processors/Formats/Impl/AvroRowInputFormat.h b/dbms/src/Processors/Formats/Impl/AvroRowInputFormat.h index cdf7a8c58c1..b54c8ecede5 100644 --- a/dbms/src/Processors/Formats/Impl/AvroRowInputFormat.h +++ b/dbms/src/Processors/Formats/Impl/AvroRowInputFormat.h @@ -23,7 +23,7 @@ class AvroDeserializer { public: AvroDeserializer(const ColumnsWithTypeAndName & columns, avro::ValidSchema schema); - void deserializeRow(MutableColumns & columns, avro::Decoder & decoder); + void deserializeRow(MutableColumns & columns, avro::Decoder & decoder) const; private: using DeserializeFn = std::function; @@ -58,6 +58,12 @@ private: }; #if USE_POCO_JSON +/// Confluent framing + Avro binary datum encoding. Mainly used for Kafka. +/// Uses 3 caches: +/// 1. global: schema registry cache (base_url -> SchemaRegistry) +/// 2. SchemaRegistry: schema cache (schema_id -> schema) +/// 3. AvroConfluentRowInputFormat: deserializer cache (schema_id -> AvroDeserializer) +/// This is needed because KafkaStorage creates a new instance of InputFormat per a batch of messages class AvroConfluentRowInputFormat : public IRowInputFormat { public: @@ -65,15 +71,13 @@ public: virtual bool readRow(MutableColumns & columns, RowReadExtension & ext) override; String getName() const override { return "AvroConfluentRowInputFormat"; } + class SchemaRegistry; private: const ColumnsWithTypeAndName header_columns; - - class SchemaRegistry; - std::unique_ptr schema_registry; - + std::shared_ptr schema_registry; using SchemaId = uint32_t; std::unordered_map deserializer_cache; - AvroDeserializer & getOrCreateDeserializer(SchemaId schema_id); + const AvroDeserializer & getOrCreateDeserializer(SchemaId schema_id); avro::InputStreamPtr input_stream; avro::DecoderPtr decoder;