From 5372942aef0f71e4704ee91593587c8c4fb1bc8f Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Mon, 17 Feb 2020 18:01:03 +0300 Subject: [PATCH] Processors support for StorageURL reading. --- dbms/src/Storages/StorageURL.cpp | 51 ++++++++++++++++---------------- dbms/src/Storages/StorageURL.h | 4 ++- 2 files changed, 29 insertions(+), 26 deletions(-) diff --git a/dbms/src/Storages/StorageURL.cpp b/dbms/src/Storages/StorageURL.cpp index b7eb61c991a..04c8c3e8fd8 100644 --- a/dbms/src/Storages/StorageURL.cpp +++ b/dbms/src/Storages/StorageURL.cpp @@ -17,6 +17,8 @@ #include #include +#include +#include namespace DB @@ -48,26 +50,27 @@ IStorageURLBase::IStorageURLBase( namespace { - class StorageURLBlockInputStream : public IBlockInputStream + class StorageURLSource : public SourceWithProgress { public: - StorageURLBlockInputStream(const Poco::URI & uri, + StorageURLSource(const Poco::URI & uri, const std::string & method, std::function callback, const String & format, - const String & name_, + String name_, const Block & sample_block, const Context & context, + const ColumnDefaults & column_defaults, UInt64 max_block_size, const ConnectionTimeouts & timeouts, const CompressionMethod compression_method) - : name(name_) + : SourceWithProgress(sample_block), name(std::move(name_)) { read_buf = wrapReadBufferWithCompressionMethod( std::make_unique( uri, method, - callback, + std::move(callback), timeouts, context.getSettingsRef().max_http_get_redirects, Poco::Net::HTTPBasicCredentials{}, @@ -77,6 +80,7 @@ namespace compression_method); reader = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size); + reader = std::make_shared(reader, column_defaults, context); } String getName() const override @@ -84,30 +88,28 @@ namespace return name; } - Block readImpl() override + Chunk generate() override { - return reader->read(); - } + if (!reader) + return {}; - Block getHeader() const override - { - return reader->getHeader(); - } + if (!initialized) + reader->readPrefix(); - void readPrefixImpl() override - { - reader->readPrefix(); - } + if (auto block = reader->read()) + return Chunk(block.getColumns(), block.rows()); - void readSuffixImpl() override - { reader->readSuffix(); + reader.reset(); + + return {}; } private: String name; std::unique_ptr read_buf; BlockInputStreamPtr reader; + bool initialized = false; }; class StorageURLBlockOutputStream : public IBlockOutputStream @@ -181,7 +183,7 @@ std::function IStorageURLBase::getReadPOSTDataCallback(con } -BlockInputStreams IStorageURLBase::read(const Names & column_names, +Pipes IStorageURLBase::readWithProcessors(const Names & column_names, const SelectQueryInfo & query_info, const Context & context, QueryProcessingStage::Enum processed_stage, @@ -193,21 +195,20 @@ BlockInputStreams IStorageURLBase::read(const Names & column_names, for (const auto & [param, value] : params) request_uri.addQueryParameter(param, value); - BlockInputStreamPtr block_input = std::make_shared(request_uri, + Pipes pipes; + pipes.emplace_back(std::make_shared(request_uri, getReadMethod(), getReadPOSTDataCallback(column_names, query_info, context, processed_stage, max_block_size), format_name, getName(), getHeaderBlock(column_names), context, + getColumns().getDefaults(), max_block_size, ConnectionTimeouts::getHTTPTimeouts(context), - chooseCompressionMethod(request_uri.getPath(), compression_method)); + chooseCompressionMethod(request_uri.getPath(), compression_method))); - auto column_defaults = getColumns().getDefaults(); - if (column_defaults.empty()) - return {block_input}; - return {std::make_shared(block_input, column_defaults, context)}; + return pipes; } BlockOutputStreamPtr IStorageURLBase::write(const ASTPtr & /*query*/, const Context & /*context*/) diff --git a/dbms/src/Storages/StorageURL.h b/dbms/src/Storages/StorageURL.h index b0fec1527c1..2dbf5a1af03 100644 --- a/dbms/src/Storages/StorageURL.h +++ b/dbms/src/Storages/StorageURL.h @@ -16,7 +16,7 @@ namespace DB class IStorageURLBase : public IStorage { public: - BlockInputStreams read( + Pipes readWithProcessors( const Names & column_names, const SelectQueryInfo & query_info, const Context & context, @@ -24,6 +24,8 @@ public: size_t max_block_size, unsigned num_streams) override; + bool supportProcessorsPipeline() const override { return true; } + BlockOutputStreamPtr write(const ASTPtr & query, const Context & context) override; protected: