#include #include #include #include #include #include #include #include #include #include namespace DB { static const size_t max_block_size = 8192; HTTPDictionarySource::HTTPDictionarySource(const DictionaryStructure & dict_struct_, const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, Block & sample_block, const Context & context) : log(&Logger::get("HTTPDictionarySource")), dict_struct{dict_struct_}, url{config.getString(config_prefix + ".url", "")}, format{config.getString(config_prefix + ".format")}, sample_block{sample_block}, context(context), timeouts(ConnectionTimeouts::getHTTPTimeouts(context.getSettingsRef())) { } HTTPDictionarySource::HTTPDictionarySource(const HTTPDictionarySource & other) : log(&Logger::get("HTTPDictionarySource")), dict_struct{other.dict_struct}, url{other.url}, format{other.format}, sample_block{other.sample_block}, context(other.context), timeouts(ConnectionTimeouts::getHTTPTimeouts(context.getSettingsRef())) { } BlockInputStreamPtr HTTPDictionarySource::loadAll() { LOG_TRACE(log, "loadAll " + toString()); Poco::URI uri(url); auto in_ptr = std::make_unique(uri, Poco::Net::HTTPRequest::HTTP_GET, ReadWriteBufferFromHTTP::OutStreamCallback(), timeouts); auto input_stream = context.getInputFormat(format, *in_ptr, sample_block, max_block_size); return std::make_shared>(input_stream, std::move(in_ptr)); } BlockInputStreamPtr HTTPDictionarySource::loadIds(const std::vector & ids) { LOG_TRACE(log, "loadIds " << toString() << " size = " << ids.size()); ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = [&](std::ostream & ostr) { WriteBufferFromOStream out_buffer(ostr); auto output_stream = context.getOutputFormat(format, out_buffer, sample_block); formatIDs(output_stream, ids); }; Poco::URI uri(url); auto in_ptr = std::make_unique(uri, Poco::Net::HTTPRequest::HTTP_POST, out_stream_callback, timeouts); auto input_stream = context.getInputFormat(format, *in_ptr, sample_block, max_block_size); return std::make_shared>(input_stream, std::move(in_ptr)); } BlockInputStreamPtr HTTPDictionarySource::loadKeys( const Columns & key_columns, const std::vector & requested_rows) { LOG_TRACE(log, "loadKeys " << toString() << " size = " << requested_rows.size()); ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = [&](std::ostream & ostr) { WriteBufferFromOStream out_buffer(ostr); auto output_stream = context.getOutputFormat(format, out_buffer, sample_block); formatKeys(dict_struct, output_stream, key_columns, requested_rows); }; Poco::URI uri(url); auto in_ptr = std::make_unique(uri, Poco::Net::HTTPRequest::HTTP_POST, out_stream_callback, timeouts); auto input_stream = context.getInputFormat(format, *in_ptr, sample_block, max_block_size); return std::make_shared>(input_stream, std::move(in_ptr)); } bool HTTPDictionarySource::isModified() const { return true; } bool HTTPDictionarySource::supportsSelectiveLoad() const { return true; } DictionarySourcePtr HTTPDictionarySource::clone() const { return std::make_unique(*this); } std::string HTTPDictionarySource::toString() const { Poco::URI uri(url); return uri.toString(); } }