From ff09934219ba99af077b4d5665fa664f290769a8 Mon Sep 17 00:00:00 2001 From: Vladimir Chebotarev Date: Fri, 31 May 2019 07:27:14 +0000 Subject: [PATCH] Table function and storage. --- dbms/src/Storages/StorageS3.cpp | 292 ++++++++---------- dbms/src/Storages/StorageS3.h | 102 +++--- dbms/src/Storages/registerStorages.cpp | 2 + dbms/src/TableFunctions/TableFunctionS3.cpp | 19 ++ dbms/src/TableFunctions/TableFunctionS3.h | 25 ++ .../TableFunctions/registerTableFunctions.cpp | 2 + 6 files changed, 232 insertions(+), 210 deletions(-) create mode 100644 dbms/src/TableFunctions/TableFunctionS3.cpp create mode 100644 dbms/src/TableFunctions/TableFunctionS3.h diff --git a/dbms/src/Storages/StorageS3.cpp b/dbms/src/Storages/StorageS3.cpp index 080c1c7069e..f49cd9e7a9e 100644 --- a/dbms/src/Storages/StorageS3.cpp +++ b/dbms/src/Storages/StorageS3.cpp @@ -1,126 +1,176 @@ -#include #include +#include #include #include - #include -#include #include #include #include -#include + #include +#include #include -#include -#include - -#include -#include #include + namespace DB { - namespace ErrorCodes { - extern const int CANNOT_WRITE_TO_FILE_DESCRIPTOR; - extern const int CANNOT_SEEK_THROUGH_FILE; - extern const int DATABASE_ACCESS_DENIED; extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; - extern const int UNKNOWN_IDENTIFIER; - extern const int INCORRECT_FILE_NAME; - extern const int FILE_DOESNT_EXIST; - extern const int EMPTY_LIST_OF_COLUMNS_PASSED; } - -StorageS3::StorageS3( - const std::string & table_uri_, - const std::string & table_name_, - const std::string & format_name_, - const ColumnsDescription & columns_, - Context & context_) - : IStorage(columns_) - , table_name(table_name_) - , format_name(format_name_) - , context_global(context_) - , uri(table_uri_) +IStorageS3Base::IStorageS3Base(const Poco::URI & uri_, + const Context & context_, + const std::string & table_name_, + const String & format_name_, + const ColumnsDescription & columns_) + : IStorage(columns_), 
uri(uri_), context_global(context_), format_name(format_name_), table_name(table_name_) { } - -class StorageS3BlockInputStream : public IBlockInputStream +namespace { -public: - StorageS3BlockInputStream(const Poco::URI & uri, - const std::string & method, - std::function callback, - const String & format, - const String & name_, - const Block & sample_block, - const Context & context, - UInt64 max_block_size, - const ConnectionTimeouts & timeouts) - : name(name_) + class StorageS3BlockInputStream : public IBlockInputStream { - read_buf = std::make_unique(uri, method, callback, timeouts); + public: + StorageS3BlockInputStream(const Poco::URI & uri, + const std::string & method, + std::function callback, + const String & format, + const String & name_, + const Block & sample_block, + const Context & context, + UInt64 max_block_size, + const ConnectionTimeouts & timeouts) + : name(name_) + { + read_buf = std::make_unique(uri, method, callback, timeouts); - reader = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size); - } + reader = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size); + } - String getName() const override + String getName() const override + { + return name; + } + + Block readImpl() override + { + return reader->read(); + } + + Block getHeader() const override + { + return reader->getHeader(); + } + + void readPrefixImpl() override + { + reader->readPrefix(); + } + + void readSuffixImpl() override + { + reader->readSuffix(); + } + + private: + String name; + std::unique_ptr read_buf; + BlockInputStreamPtr reader; + }; + + class StorageS3BlockOutputStream : public IBlockOutputStream { - return name; - } + public: + StorageS3BlockOutputStream(const Poco::URI & uri, + const String & format, + const Block & sample_block_, + const Context & context, + const ConnectionTimeouts & timeouts) + : sample_block(sample_block_) + { + write_buf = std::make_unique(uri, 
Poco::Net::HTTPRequest::HTTP_POST, timeouts); + writer = FormatFactory::instance().getOutput(format, *write_buf, sample_block, context); + } - Block readImpl() override - { - return reader->read(); - } + Block getHeader() const override + { + return sample_block; + } - Block getHeader() const override - { - return reader->getHeader(); - } + void write(const Block & block) override + { + writer->write(block); + } - void readPrefixImpl() override - { - reader->readPrefix(); - } + void writePrefix() override + { + writer->writePrefix(); + } - void readSuffixImpl() override - { - reader->readSuffix(); - } + void writeSuffix() override + { + writer->writeSuffix(); + writer->flush(); + write_buf->finalize(); + } -private: - String name; - std::unique_ptr read_buf; - BlockInputStreamPtr reader; -}; + private: + Block sample_block; + std::unique_ptr write_buf; + BlockOutputStreamPtr writer; + }; +} -BlockInputStreams StorageS3::read( - const Names & column_names, +std::string IStorageS3Base::getReadMethod() const +{ + return Poco::Net::HTTPRequest::HTTP_GET; +} + +std::vector> IStorageS3Base::getReadURIParams(const Names & /*column_names*/, const SelectQueryInfo & /*query_info*/, + const Context & /*context*/, + QueryProcessingStage::Enum & /*processed_stage*/, + size_t /*max_block_size*/) const +{ + return {}; +} + +std::function IStorageS3Base::getReadPOSTDataCallback(const Names & /*column_names*/, + const SelectQueryInfo & /*query_info*/, + const Context & /*context*/, + QueryProcessingStage::Enum & /*processed_stage*/, + size_t /*max_block_size*/) const +{ + return nullptr; +} + + +BlockInputStreams IStorageS3Base::read(const Names & column_names, + const SelectQueryInfo & query_info, const Context & context, - QueryProcessingStage::Enum /*processed_stage*/, + QueryProcessingStage::Enum processed_stage, size_t max_block_size, unsigned /*num_streams*/) { auto request_uri = uri; + auto params = getReadURIParams(column_names, query_info, context, processed_stage, 
max_block_size); + for (const auto & [param, value] : params) + request_uri.addQueryParameter(param, value); BlockInputStreamPtr block_input = std::make_shared(request_uri, - Poco::Net::HTTPRequest::HTTP_GET, - nullptr, - //getReadPOSTDataCallback(column_names, query_info, context, processed_stage, max_block_size), + getReadMethod(), + getReadPOSTDataCallback(column_names, query_info, context, processed_stage, max_block_size), format_name, getName(), - getSampleBlockForColumns(column_names), + getHeaderBlock(column_names), context, max_block_size, ConnectionTimeouts::getHTTPTimeouts(context)); @@ -132,98 +182,16 @@ BlockInputStreams StorageS3::read( return {std::make_shared(block_input, column_defaults, context)}; } +void IStorageS3Base::rename(const String & /*new_path_to_db*/, const String & /*new_database_name*/, const String & /*new_table_name*/) {} -class StorageS3BlockOutputStream : public IBlockOutputStream -{ -public: - StorageS3BlockOutputStream(const Poco::URI & uri, - const String & format, - const Block & sample_block_, - const Context & context, - const ConnectionTimeouts & timeouts) - : sample_block(sample_block_) - { - write_buf = std::make_unique(uri, Poco::Net::HTTPRequest::HTTP_POST, timeouts); - writer = FormatFactory::instance().getOutput(format, *write_buf, sample_block, context); - } - - Block getHeader() const override - { - return sample_block; - } - - void write(const Block & block) override - { - writer->write(block); - } - - void writePrefix() override - { - writer->writePrefix(); - } - - void writeSuffix() override - { - writer->writeSuffix(); - writer->flush(); - write_buf->finalize(); - } - -private: - Block sample_block; - std::unique_ptr write_buf; - BlockOutputStreamPtr writer; -}; - -BlockOutputStreamPtr StorageS3::write( - const ASTPtr & /*query*/, - const Context & /*context*/) +BlockOutputStreamPtr IStorageS3Base::write(const ASTPtr & /*query*/, const Context & /*context*/) { return std::make_shared( uri, format_name, 
getSampleBlock(), context_global, ConnectionTimeouts::getHTTPTimeouts(context_global)); } - -void StorageS3::drop() +void registerStorageS3(StorageFactory & /*factory*/) { - /// Extra actions are not required. + // TODO. See #1394. } - - -void StorageS3::rename(const String & /*new_path_to_db*/, const String & /*new_database_name*/, const String & /*new_table_name*/) {} - - -void registerStorageS3(StorageFactory & factory) -{ - factory.registerStorage("S3", [](const StorageFactory::Arguments & args) - { - ASTs & engine_args = args.engine_args; - - if (!(engine_args.size() == 2)) - throw Exception( - "Storage S3 requires 2 arguments: name of used format and source.", - ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); - - engine_args[0] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[0], args.local_context); - String format_name = engine_args[0]->as().value.safeGet(); - - String source_path; - if (const auto * literal = engine_args[1]->as()) - { - auto type = literal->value.getType(); - if (type == Field::Types::String) - { - source_path = literal->value.get(); - return StorageS3::create( - source_path, - args.table_name, format_name, args.columns, - args.context); - } - } - - throw Exception("Unknown entity in first arg of S3 storage constructor, String expected.", - ErrorCodes::UNKNOWN_IDENTIFIER); - }); -} - } diff --git a/dbms/src/Storages/StorageS3.h b/dbms/src/Storages/StorageS3.h index 263e5033962..2615563b57c 100644 --- a/dbms/src/Storages/StorageS3.h +++ b/dbms/src/Storages/StorageS3.h @@ -1,82 +1,88 @@ #pragma once #include - -#include -#include #include - #include - -#include -#include #include - namespace DB { - -class StorageS3BlockInputStream; -class StorageS3BlockOutputStream; - -class StorageS3 : public ext::shared_ptr_helper, public IStorage +/** + * This class represents table engine for external urls. + * It sends HTTP GET to server when select is called and + * HTTP POST when insert is called. 
In a POST request the data is sent + * using Chunked transfer encoding, so the server has to support it. + */ +class IStorageS3Base : public IStorage { public: - std::string getName() const override - { - return "S3"; - } - - std::string getTableName() const override + String getTableName() const override { return table_name; } - BlockInputStreams read( - const Names & column_names, + BlockInputStreams read(const Names & column_names, const SelectQueryInfo & query_info, const Context & context, QueryProcessingStage::Enum processed_stage, size_t max_block_size, unsigned num_streams) override; - BlockOutputStreamPtr write( - const ASTPtr & query, - const Context & context) override; - - void drop() override; + BlockOutputStreamPtr write(const ASTPtr & query, const Context & context) override; void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) override; protected: - friend class StorageS3BlockInputStream; - friend class StorageS3BlockOutputStream; - - /** there are three options (ordered by priority): - - use specified file descriptor if (fd >= 0) - - use specified table_path if it isn't empty - - create own table inside data/db/table/ - */ - StorageS3( - const std::string & table_uri_, + IStorageS3Base(const Poco::URI & uri_, + const Context & context_, const std::string & table_name_, - const std::string & format_name_, - const ColumnsDescription & columns_, - Context & context_); - -private: - - std::string table_name; - std::string format_name; - Context & context_global; + const String & format_name_, + const ColumnsDescription & columns_); Poco::URI uri; + const Context & context_global; - bool is_db_table = true; /// Table is stored in real database, not user's file +private: + String format_name; + String table_name; - mutable std::shared_mutex rwlock; + virtual std::string getReadMethod() const; - Logger * log = &Logger::get("StorageS3"); + virtual std::vector> getReadURIParams(const Names & column_names, 
const SelectQueryInfo & query_info, + const Context & context, + QueryProcessingStage::Enum & processed_stage, + size_t max_block_size) const; + + virtual std::function getReadPOSTDataCallback(const Names & column_names, + const SelectQueryInfo & query_info, + const Context & context, + QueryProcessingStage::Enum & processed_stage, + size_t max_block_size) const; + + virtual Block getHeaderBlock(const Names & column_names) const = 0; }; +class StorageS3 : public ext::shared_ptr_helper, public IStorageS3Base +{ +public: + StorageS3(const Poco::URI & uri_, + const std::string & table_name_, + const String & format_name_, + const ColumnsDescription & columns_, + Context & context_) + : IStorageS3Base(uri_, context_, table_name_, format_name_, columns_) + { + } + + String getName() const override + { + return "S3"; + } + + Block getHeaderBlock(const Names & /*column_names*/) const override + { + return getSampleBlock(); + } +}; } diff --git a/dbms/src/Storages/registerStorages.cpp b/dbms/src/Storages/registerStorages.cpp index c21156ea44d..4c29884dfcf 100644 --- a/dbms/src/Storages/registerStorages.cpp +++ b/dbms/src/Storages/registerStorages.cpp @@ -19,6 +19,7 @@ void registerStorageDistributed(StorageFactory & factory); void registerStorageMemory(StorageFactory & factory); void registerStorageFile(StorageFactory & factory); void registerStorageURL(StorageFactory & factory); +void registerStorageS3(StorageFactory & factory); void registerStorageDictionary(StorageFactory & factory); void registerStorageSet(StorageFactory & factory); void registerStorageJoin(StorageFactory & factory); @@ -60,6 +61,7 @@ void registerStorages() registerStorageMemory(factory); registerStorageFile(factory); registerStorageURL(factory); + registerStorageS3(factory); registerStorageDictionary(factory); registerStorageSet(factory); registerStorageJoin(factory); diff --git a/dbms/src/TableFunctions/TableFunctionS3.cpp b/dbms/src/TableFunctions/TableFunctionS3.cpp new file mode 100644 index 
00000000000..5c2c6215765 --- /dev/null +++ b/dbms/src/TableFunctions/TableFunctionS3.cpp @@ -0,0 +1,19 @@ +#include +#include +#include +#include + +namespace DB +{ +StoragePtr TableFunctionS3::getStorage( + const String & source, const String & format, const Block & sample_block, Context & global_context) const +{ + Poco::URI uri(source); + return StorageS3::create(uri, getName(), format, ColumnsDescription{sample_block.getNamesAndTypesList()}, global_context); +} + +void registerTableFunctionS3(TableFunctionFactory & factory) +{ + factory.registerFunction(); +} +} diff --git a/dbms/src/TableFunctions/TableFunctionS3.h b/dbms/src/TableFunctions/TableFunctionS3.h new file mode 100644 index 00000000000..83c49e0b8d1 --- /dev/null +++ b/dbms/src/TableFunctions/TableFunctionS3.h @@ -0,0 +1,25 @@ +#pragma once + +#include +#include +#include + + +namespace DB +{ +/* s3(source, format, structure) - creates a temporary S3 storage from the source URI + */ +class TableFunctionS3 : public ITableFunctionFileLike +{ +public: + static constexpr auto name = "s3"; + std::string getName() const override + { + return name; + } + +private: + StoragePtr getStorage( + const String & source, const String & format, const Block & sample_block, Context & global_context) const override; +}; +} diff --git a/dbms/src/TableFunctions/registerTableFunctions.cpp b/dbms/src/TableFunctions/registerTableFunctions.cpp index 61d0ec23f7d..aad5eebe935 100644 --- a/dbms/src/TableFunctions/registerTableFunctions.cpp +++ b/dbms/src/TableFunctions/registerTableFunctions.cpp @@ -11,6 +11,7 @@ void registerTableFunctionMerge(TableFunctionFactory & factory); void registerTableFunctionRemote(TableFunctionFactory & factory); void registerTableFunctionNumbers(TableFunctionFactory & factory); void registerTableFunctionFile(TableFunctionFactory & factory); +void registerTableFunctionS3(TableFunctionFactory & factory); void registerTableFunctionURL(TableFunctionFactory & factory); void 
registerTableFunctionValues(TableFunctionFactory & factory); @@ -37,6 +38,7 @@ void registerTableFunctions() registerTableFunctionRemote(factory); registerTableFunctionNumbers(factory); registerTableFunctionFile(factory); + registerTableFunctionS3(factory); registerTableFunctionURL(factory); registerTableFunctionValues(factory);