From 14f31f5117f3b36fd7aec104d87184f1c7bf339b Mon Sep 17 00:00:00 2001 From: Alexander Kuzmenkov Date: Thu, 5 Nov 2020 14:28:20 +0300 Subject: [PATCH] add support of settings to URL, fix url and file table functions --- src/Storages/StorageFile.cpp | 13 +++--- src/Storages/StorageFile.h | 8 +++- src/Storages/StorageURL.cpp | 43 ++++++++++++++++--- src/Storages/StorageURL.h | 26 +++++++---- src/Storages/StorageXDBC.cpp | 15 ++++--- src/TableFunctions/TableFunctionFile.cpp | 11 +++-- src/TableFunctions/TableFunctionURL.cpp | 17 +++++--- .../01545_url_file_format_settings.reference | 4 ++ .../01545_url_file_format_settings.sql | 19 ++++++++ 9 files changed, 117 insertions(+), 39 deletions(-) create mode 100644 tests/queries/0_stateless/01545_url_file_format_settings.reference create mode 100644 tests/queries/0_stateless/01545_url_file_format_settings.sql diff --git a/src/Storages/StorageFile.cpp b/src/Storages/StorageFile.cpp index aa2696493f7..b130b2b0935 100644 --- a/src/Storages/StorageFile.cpp +++ b/src/Storages/StorageFile.cpp @@ -452,7 +452,7 @@ public: const StorageMetadataPtr & metadata_snapshot_, const CompressionMethod compression_method, const Context & context, - const FormatSettings & format_settings) + std::optional format_settings) : storage(storage_) , metadata_snapshot(metadata_snapshot_) , lock(storage.rwlock) @@ -618,18 +618,21 @@ void registerStorageFile(StorageFactory & factory) engine_args_ast[0] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args_ast[0], factory_args.local_context); storage_args.format_name = engine_args_ast[0]->as().value.safeGet(); + // Use format settings from global server context + settings from + // the SETTINGS clause of the create query. Settings from current + // session and user are ignored. 
+ if (factory_args.storage_def->settings) { - Context local_context_copy = factory_args.local_context; - local_context_copy.applySettingsChanges( + Context global_context_copy = factory_args.context; + global_context_copy.applySettingsChanges( factory_args.storage_def->settings->changes); storage_args.format_settings = getFormatSettings( - local_context_copy); + global_context_copy); } else { storage_args.format_settings = getFormatSettings( - factory_args.local_context); + factory_args.context); } if (engine_args_ast.size() == 1) /// Table in database diff --git a/src/Storages/StorageFile.h b/src/Storages/StorageFile.h index 695cd0d3912..3d678ee0ded 100644 --- a/src/Storages/StorageFile.h +++ b/src/Storages/StorageFile.h @@ -52,7 +52,7 @@ public: { StorageID table_id; std::string format_name; - FormatSettings format_settings; + std::optional format_settings; std::string compression_method; const ColumnsDescription & columns; const ConstraintsDescription & constraints; @@ -80,7 +80,11 @@ private: explicit StorageFile(CommonArguments args); std::string format_name; - FormatSettings format_settings; + // We use format settings from global context + CREATE query for File table + // engine -- in this case, format_settings is set. + // For `file` table function, we use format settings from current user context, + // in this case, format_settings is not set. 
+ std::optional format_settings; int table_fd = -1; String compression_method; diff --git a/src/Storages/StorageURL.cpp b/src/Storages/StorageURL.cpp index 55c16496ba5..4483b012d69 100644 --- a/src/Storages/StorageURL.cpp +++ b/src/Storages/StorageURL.cpp @@ -3,6 +3,7 @@ #include #include +#include #include #include @@ -32,6 +33,7 @@ IStorageURLBase::IStorageURLBase( const Context & context_, const StorageID & table_id_, const String & format_name_, + std::optional format_settings_, const ColumnsDescription & columns_, const ConstraintsDescription & constraints_, const String & compression_method_) @@ -40,6 +42,7 @@ IStorageURLBase::IStorageURLBase( , context_global(context_) , compression_method(compression_method_) , format_name(format_name_) + , format_settings(format_settings_) { context_global.getRemoteHostFilter().checkURL(uri); @@ -58,6 +61,7 @@ namespace const std::string & method, std::function callback, const String & format, + std::optional format_settings, String name_, const Block & sample_block, const Context & context, @@ -96,8 +100,10 @@ namespace context.getRemoteHostFilter()), compression_method); - reader = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size); - reader = std::make_shared(reader, columns, context); + reader = FormatFactory::instance().getInput(format, *read_buf, + sample_block, context, max_block_size, format_settings); + reader = std::make_shared(reader, + columns, context); } String getName() const override @@ -134,6 +140,7 @@ namespace StorageURLBlockOutputStream::StorageURLBlockOutputStream(const Poco::URI & uri, const String & format, + std::optional format_settings, const Block & sample_block_, const Context & context, const ConnectionTimeouts & timeouts, @@ -143,7 +150,8 @@ StorageURLBlockOutputStream::StorageURLBlockOutputStream(const Poco::URI & uri, write_buf = wrapWriteBufferWithCompressionMethod( std::make_unique(uri, Poco::Net::HTTPRequest::HTTP_POST, timeouts), 
compression_method, 3); - writer = FormatFactory::instance().getOutput(format, *write_buf, sample_block, context); + writer = FormatFactory::instance().getOutput(format, *write_buf, sample_block, + context, {} /* write callback */, format_settings); } @@ -214,6 +222,7 @@ Pipe IStorageURLBase::read( column_names, metadata_snapshot, query_info, context, processed_stage, max_block_size), format_name, + format_settings, getName(), getHeaderBlock(column_names, metadata_snapshot), context, @@ -225,8 +234,8 @@ Pipe IStorageURLBase::read( BlockOutputStreamPtr IStorageURLBase::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & /*context*/) { - return std::make_shared( - uri, format_name, metadata_snapshot->getSampleBlock(), context_global, + return std::make_shared(uri, format_name, + format_settings, metadata_snapshot->getSampleBlock(), context_global, ConnectionTimeouts::getHTTPTimeouts(context_global), chooseCompressionMethod(uri.toString(), compression_method)); } @@ -255,16 +264,38 @@ void registerStorageURL(StorageFactory & factory) { engine_args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[2], args.local_context); compression_method = engine_args[2]->as().value.safeGet(); - } else compression_method = "auto"; + } + else + { + compression_method = "auto"; + } + + // Use format settings from global server context + settings from + // the SETTINGS clause of the create query. Settings from current + // session and user are ignored. 
+ FormatSettings format_settings; + if (args.storage_def->settings) + { + Context global_context_copy = args.context; + global_context_copy.applySettingsChanges( + args.storage_def->settings->changes); + format_settings = getFormatSettings(global_context_copy); + } + else + { + format_settings = getFormatSettings(args.context); + } return StorageURL::create( uri, args.table_id, format_name, + format_settings, args.columns, args.constraints, args.context, compression_method); }, { + .supports_settings = true, .source_access_type = AccessType::URL, }); } diff --git a/src/Storages/StorageURL.h b/src/Storages/StorageURL.h index 7983ad71520..4d6fdd1435c 100644 --- a/src/Storages/StorageURL.h +++ b/src/Storages/StorageURL.h @@ -36,6 +36,7 @@ protected: const Context & context_, const StorageID & id_, const String & format_name_, + std::optional format_settings_, const ColumnsDescription & columns_, const ConstraintsDescription & constraints_, const String & compression_method_); @@ -44,6 +45,11 @@ protected: const Context & context_global; String compression_method; String format_name; + // For URL engine, we use format settings from server context + `SETTINGS` + // clause of the `CREATE` query. In this case, format_settings is set. + // For `url` table function, we use settings from current query context. + // In this case, format_settings is not set. 
+ std::optional format_settings; private: virtual std::string getReadMethod() const; @@ -73,6 +79,7 @@ public: StorageURLBlockOutputStream( const Poco::URI & uri, const String & format, + std::optional format_settings, const Block & sample_block_, const Context & context, const ConnectionTimeouts & timeouts, @@ -97,15 +104,16 @@ class StorageURL final : public ext::shared_ptr_helper, public IStor { friend struct ext::shared_ptr_helper; public: - StorageURL( - const Poco::URI & uri_, - const StorageID & table_id_, - const String & format_name_, - const ColumnsDescription & columns_, - const ConstraintsDescription & constraints_, - Context & context_, - const String & compression_method_) - : IStorageURLBase(uri_, context_, table_id_, format_name_, columns_, constraints_, compression_method_) + StorageURL(const Poco::URI & uri_, + const StorageID & table_id_, + const String & format_name_, + std::optional format_settings_, + const ColumnsDescription & columns_, + const ConstraintsDescription & constraints_, + Context & context_, + const String & compression_method_) + : IStorageURLBase(uri_, context_, table_id_, format_name_, + format_settings_, columns_, constraints_, compression_method_) { } diff --git a/src/Storages/StorageXDBC.cpp b/src/Storages/StorageXDBC.cpp index 3350a4352db..590339c73cb 100644 --- a/src/Storages/StorageXDBC.cpp +++ b/src/Storages/StorageXDBC.cpp @@ -1,17 +1,18 @@ #include "StorageXDBC.h" + +#include +#include +#include #include #include #include +#include +#include +#include #include #include #include #include -#include -#include -#include -#include - -#include namespace DB { @@ -33,6 +34,7 @@ StorageXDBC::StorageXDBC( context_, table_id_, IXDBCBridgeHelper::DEFAULT_FORMAT, + getFormatSettings(context_), columns_, ConstraintsDescription{}, "" /* CompressionMethod */) @@ -121,6 +123,7 @@ BlockOutputStreamPtr StorageXDBC::write(const ASTPtr & /*query*/, const StorageM return std::make_shared( request_uri, format_name, + 
getFormatSettings(context), metadata_snapshot->getSampleBlock(), context, ConnectionTimeouts::getHTTPTimeouts(context), diff --git a/src/TableFunctions/TableFunctionFile.cpp b/src/TableFunctions/TableFunctionFile.cpp index f7d76309a7f..13ac6dc2145 100644 --- a/src/TableFunctions/TableFunctionFile.cpp +++ b/src/TableFunctions/TableFunctionFile.cpp @@ -10,13 +10,16 @@ namespace DB { -StoragePtr TableFunctionFile::getStorage( - const String & source, const String & format_, const ColumnsDescription & columns, Context & global_context, - const std::string & table_name, const std::string & compression_method_) const +StoragePtr TableFunctionFile::getStorage(const String & source, + const String & format_, const ColumnsDescription & columns, + Context & global_context, const std::string & table_name, + const std::string & compression_method_) const { + // For `file` table function, we are going to use format settings from the + // query context. StorageFile::CommonArguments args{StorageID(getDatabaseName(), table_name), format_, - getFormatSettings(global_context), + std::nullopt /*format settings*/, compression_method_, columns, ConstraintsDescription{}, diff --git a/src/TableFunctions/TableFunctionURL.cpp b/src/TableFunctions/TableFunctionURL.cpp index 6139e6ffecb..1c0109e892b 100644 --- a/src/TableFunctions/TableFunctionURL.cpp +++ b/src/TableFunctions/TableFunctionURL.cpp @@ -1,10 +1,12 @@ -#include -#include -#include -#include #include -#include + #include "registerTableFunctions.h" +#include +#include +#include +#include +#include +#include namespace DB @@ -14,8 +16,9 @@ StoragePtr TableFunctionURL::getStorage( const std::string & table_name, const String & compression_method_) const { Poco::URI uri(source); - return StorageURL::create(uri, StorageID(getDatabaseName(), table_name), format_, columns, ConstraintsDescription{}, - global_context, compression_method_); + return StorageURL::create(uri, StorageID(getDatabaseName(), table_name), + format_, std::nullopt 
/*format settings*/, columns, + ConstraintsDescription{}, global_context, compression_method_); } void registerTableFunctionURL(TableFunctionFactory & factory) diff --git a/tests/queries/0_stateless/01545_url_file_format_settings.reference b/tests/queries/0_stateless/01545_url_file_format_settings.reference new file mode 100644 index 00000000000..8a6795a16b8 --- /dev/null +++ b/tests/queries/0_stateless/01545_url_file_format_settings.reference @@ -0,0 +1,4 @@ +1 2 +1 2 +1 2 +1 2 diff --git a/tests/queries/0_stateless/01545_url_file_format_settings.sql b/tests/queries/0_stateless/01545_url_file_format_settings.sql new file mode 100644 index 00000000000..2ede999b1d0 --- /dev/null +++ b/tests/queries/0_stateless/01545_url_file_format_settings.sql @@ -0,0 +1,19 @@ +create table file_delim(a int, b int) engine File(CSV, '01545_url_file_format_settings.csv') settings format_csv_delimiter = '|'; + +truncate table file_delim; + +insert into file_delim select 1, 2; + +-- select 1, 2 format CSV settings format_csv_delimiter='/'; +create table url_delim(a int, b int) engine URL('http://127.0.0.1:8123/?query=select%201%2C%202%20format%20CSV%20settings%20format_csv_delimiter%3D%27/%27%3B%0A', CSV) settings format_csv_delimiter = '/'; + +select * from file_delim; + +select * from url_delim; + +select * from file('01545_url_file_format_settings.csv', CSV, 'a int, b int') settings format_csv_delimiter = '|'; + +select * from url('http://127.0.0.1:8123/?query=select%201%2C%202%20format%20CSV%20settings%20format_csv_delimiter%3D%27/%27%3B%0A', CSV, 'a int, b int') settings format_csv_delimiter = '/'; + + +