#include #include #include #include #include #include #include #include namespace DB { HTTPDictionarySource::HTTPDictionarySource(const DictionaryStructure & dict_struct_, const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, Block & sample_block, const Context & context) : dict_struct{dict_struct_}, url{config.getString(config_prefix + ".url", "")}, format{config.getString(config_prefix + ".format")}, sample_block{sample_block}, context(context) { } HTTPDictionarySource::HTTPDictionarySource(const HTTPDictionarySource & other) : dict_struct{other.dict_struct}, url{other.url}, sample_block{other.sample_block}, context(other.context) { } BlockInputStreamPtr HTTPDictionarySource::loadAll() { LOG_TRACE(log, "loadAll " + toString()); Poco::URI uri(url); auto in_ptr = std::make_unique(uri.getHost(), uri.getPort(), uri.getPathAndQuery(), ReadBufferFromHTTP::Params(), Poco::Net::HTTPRequest::HTTP_GET); auto stream = context.getInputFormat(format, *in_ptr, sample_block, max_block_size); return std::make_shared>(stream, std::move(in_ptr)); } BlockInputStreamPtr HTTPDictionarySource::loadIds(const std::vector & ids) { LOG_TRACE(log, "loadIds " + toString()); Poco::URI uri(url); ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = [&](std::ostream & out_stream) { // copypaste from ExecutableDictionarySource.cpp, todo: make func ColumnWithTypeAndName column; column.type = std::make_shared(); column.column = column.type->createColumn(); for (auto & id : ids) { column.column->insert(id); //CHECKME maybe faster? } Block block; block.insert(std::move(column)); WriteBufferFromOStream out_buffer(out_stream); auto stream_out = context.getOutputFormat(format, out_buffer, sample_block); stream_out->write(block); }; auto in_ptr = std::make_unique(uri, Poco::Net::HTTPRequest::HTTP_POST, out_stream_callback); auto stream = context.getInputFormat(format, *in_ptr, sample_block, max_block_size); return std::make_shared>(stream, std::move(in_ptr)); } BlockInputStreamPtr HTTPDictionarySource::loadKeys( const ConstColumnPlainPtrs & key_columns, const std::vector & requested_rows) { LOG_TRACE(log, "loadKeys " + toString()); Poco::URI uri(url); ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = [&](std::ostream & out_stream) { // copypaste from ExecutableDictionarySource.cpp, todo: make func Block block; const auto keys_size = key_columns.size(); for (const auto i : ext::range(0, keys_size)) { const auto & key_description = (*dict_struct.key)[i]; const auto & key = key_columns[i]; ColumnWithTypeAndName column; column.type = key_description.type; column.column = key->clone(); // CHECKME !! block.insert(std::move(column)); } WriteBufferFromOStream out_buffer(out_stream); auto stream_out = context.getOutputFormat(format, out_buffer, sample_block); stream_out->write(block); }; auto in_ptr = std::make_unique(uri, Poco::Net::HTTPRequest::HTTP_POST, out_stream_callback); auto stream = context.getInputFormat(format, *in_ptr, sample_block, max_block_size); return std::make_shared>(stream, std::move(in_ptr)); } bool HTTPDictionarySource::isModified() const { return true; } bool HTTPDictionarySource::supportsSelectiveLoad() const { return true; } DictionarySourcePtr HTTPDictionarySource::clone() const { return std::make_unique(*this); } std::string HTTPDictionarySource::toString() const { Poco::URI uri(url); return uri.toString(); } }