#include #include #include #include #include #include #include #include #include #include #include #include namespace DB { namespace ErrorCodes { extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; } IStorageURLBase::IStorageURLBase(const Poco::URI & uri_, const Context & context_, const std::string & table_name_, const String & format_name_, const ColumnsDescription & columns_) : IStorage(columns_), uri(uri_), context_global(context_), format_name(format_name_), table_name(table_name_) { } namespace { class StorageURLBlockInputStream : public IBlockInputStream { public: StorageURLBlockInputStream(const Poco::URI & uri, const std::string & method, std::function callback, const String & format, const String & name_, const Block & sample_block, const Context & context, UInt64 max_block_size, const ConnectionTimeouts & timeouts) : name(name_) { read_buf = std::make_unique(uri, method, callback, timeouts); reader = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size); } String getName() const override { return name; } Block readImpl() override { return reader->read(); } Block getHeader() const override { return reader->getHeader(); } void readPrefixImpl() override { reader->readPrefix(); } void readSuffixImpl() override { reader->readSuffix(); } private: String name; std::unique_ptr read_buf; BlockInputStreamPtr reader; }; class StorageURLBlockOutputStream : public IBlockOutputStream { public: StorageURLBlockOutputStream(const Poco::URI & uri, const String & format, const Block & sample_block_, const Context & context, const ConnectionTimeouts & timeouts) : sample_block(sample_block_) { write_buf = std::make_unique(uri, Poco::Net::HTTPRequest::HTTP_POST, timeouts); writer = FormatFactory::instance().getOutput(format, *write_buf, sample_block, context); } Block getHeader() const override { return sample_block; } void write(const Block & block) override { writer->write(block); } void writePrefix() override { writer->writePrefix(); } void writeSuffix() override { writer->writeSuffix(); writer->flush(); write_buf->finalize(); } private: Block sample_block; std::unique_ptr write_buf; BlockOutputStreamPtr writer; }; } std::string IStorageURLBase::getReadMethod() const { return Poco::Net::HTTPRequest::HTTP_GET; } std::vector> IStorageURLBase::getReadURIParams(const Names & /*column_names*/, const SelectQueryInfo & /*query_info*/, const Context & /*context*/, QueryProcessingStage::Enum & /*processed_stage*/, size_t /*max_block_size*/) const { return {}; } std::function IStorageURLBase::getReadPOSTDataCallback(const Names & /*column_names*/, const SelectQueryInfo & /*query_info*/, const Context & /*context*/, QueryProcessingStage::Enum & /*processed_stage*/, size_t /*max_block_size*/) const { return nullptr; } BlockInputStreams IStorageURLBase::read(const Names & column_names, const SelectQueryInfo & query_info, const Context & context, QueryProcessingStage::Enum processed_stage, size_t max_block_size, unsigned /*num_streams*/) { auto request_uri = uri; auto params = getReadURIParams(column_names, query_info, context, processed_stage, max_block_size); for (const auto & [param, value] : params) request_uri.addQueryParameter(param, value); BlockInputStreamPtr block_input = std::make_shared(request_uri, getReadMethod(), getReadPOSTDataCallback(column_names, query_info, context, processed_stage, max_block_size), format_name, getName(), getHeaderBlock(column_names), context, max_block_size, ConnectionTimeouts::getHTTPTimeouts(context.getSettingsRef())); auto column_defaults = getColumns().getDefaults(); if (column_defaults.empty()) return {block_input}; return {std::make_shared(block_input, column_defaults, context)}; } void IStorageURLBase::rename(const String & /*new_path_to_db*/, const String & /*new_database_name*/, const String & /*new_table_name*/) {} BlockOutputStreamPtr IStorageURLBase::write(const ASTPtr & /*query*/, const Context & /*context*/) { return std::make_shared( uri, format_name, getSampleBlock(), context_global, ConnectionTimeouts::getHTTPTimeouts(context_global.getSettingsRef())); } void registerStorageURL(StorageFactory & factory) { factory.registerStorage("URL", [](const StorageFactory::Arguments & args) { ASTs & engine_args = args.engine_args; if (!(engine_args.size() == 1 || engine_args.size() == 2)) throw Exception( "Storage URL requires exactly 2 arguments: url and name of used format.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); engine_args[0] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[0], args.local_context); String url = static_cast(*engine_args[0]).value.safeGet(); Poco::URI uri(url); engine_args[1] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[1], args.local_context); String format_name = static_cast(*engine_args[1]).value.safeGet(); return StorageURL::create(uri, args.table_name, format_name, args.columns, args.context); }); } }