add support of settings to URL, fix url and file table functions

This commit is contained in:
Alexander Kuzmenkov 2020-11-05 14:28:20 +03:00
parent 0f22a9dfcd
commit 14f31f5117
9 changed files with 117 additions and 39 deletions

View File

@ -452,7 +452,7 @@ public:
const StorageMetadataPtr & metadata_snapshot_, const StorageMetadataPtr & metadata_snapshot_,
const CompressionMethod compression_method, const CompressionMethod compression_method,
const Context & context, const Context & context,
const FormatSettings & format_settings) std::optional<FormatSettings> format_settings)
: storage(storage_) : storage(storage_)
, metadata_snapshot(metadata_snapshot_) , metadata_snapshot(metadata_snapshot_)
, lock(storage.rwlock) , lock(storage.rwlock)
@ -618,18 +618,21 @@ void registerStorageFile(StorageFactory & factory)
engine_args_ast[0] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args_ast[0], factory_args.local_context); engine_args_ast[0] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args_ast[0], factory_args.local_context);
storage_args.format_name = engine_args_ast[0]->as<ASTLiteral &>().value.safeGet<String>(); storage_args.format_name = engine_args_ast[0]->as<ASTLiteral &>().value.safeGet<String>();
// Use format settings from global server context + settings from
// the SETTINGS clause of the create query. Settings from current
// session and user are ignored.
if (factory_args.storage_def->settings) if (factory_args.storage_def->settings)
{ {
Context local_context_copy = factory_args.local_context; Context global_context_copy = factory_args.context;
local_context_copy.applySettingsChanges( global_context_copy.applySettingsChanges(
factory_args.storage_def->settings->changes); factory_args.storage_def->settings->changes);
storage_args.format_settings = getFormatSettings( storage_args.format_settings = getFormatSettings(
local_context_copy); global_context_copy);
} }
else else
{ {
storage_args.format_settings = getFormatSettings( storage_args.format_settings = getFormatSettings(
factory_args.local_context); factory_args.context);
} }
if (engine_args_ast.size() == 1) /// Table in database if (engine_args_ast.size() == 1) /// Table in database

View File

@ -52,7 +52,7 @@ public:
{ {
StorageID table_id; StorageID table_id;
std::string format_name; std::string format_name;
FormatSettings format_settings; std::optional<FormatSettings> format_settings;
std::string compression_method; std::string compression_method;
const ColumnsDescription & columns; const ColumnsDescription & columns;
const ConstraintsDescription & constraints; const ConstraintsDescription & constraints;
@ -80,7 +80,11 @@ private:
explicit StorageFile(CommonArguments args); explicit StorageFile(CommonArguments args);
std::string format_name; std::string format_name;
FormatSettings format_settings; // We use format settings from global context + CREATE query for File table
// function -- in this case, format_settings is set.
// For `file` table function, we use format settings from current user context,
// in this case, format_settings is not set.
std::optional<FormatSettings> format_settings;
int table_fd = -1; int table_fd = -1;
String compression_method; String compression_method;

View File

@ -3,6 +3,7 @@
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Interpreters/evaluateConstantExpression.h> #include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTLiteral.h> #include <Parsers/ASTLiteral.h>
#include <IO/ReadHelpers.h> #include <IO/ReadHelpers.h>
@ -32,6 +33,7 @@ IStorageURLBase::IStorageURLBase(
const Context & context_, const Context & context_,
const StorageID & table_id_, const StorageID & table_id_,
const String & format_name_, const String & format_name_,
std::optional<FormatSettings> format_settings_,
const ColumnsDescription & columns_, const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_, const ConstraintsDescription & constraints_,
const String & compression_method_) const String & compression_method_)
@ -40,6 +42,7 @@ IStorageURLBase::IStorageURLBase(
, context_global(context_) , context_global(context_)
, compression_method(compression_method_) , compression_method(compression_method_)
, format_name(format_name_) , format_name(format_name_)
, format_settings(format_settings_)
{ {
context_global.getRemoteHostFilter().checkURL(uri); context_global.getRemoteHostFilter().checkURL(uri);
@ -58,6 +61,7 @@ namespace
const std::string & method, const std::string & method,
std::function<void(std::ostream &)> callback, std::function<void(std::ostream &)> callback,
const String & format, const String & format,
std::optional<FormatSettings> format_settings,
String name_, String name_,
const Block & sample_block, const Block & sample_block,
const Context & context, const Context & context,
@ -96,8 +100,10 @@ namespace
context.getRemoteHostFilter()), context.getRemoteHostFilter()),
compression_method); compression_method);
reader = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size); reader = FormatFactory::instance().getInput(format, *read_buf,
reader = std::make_shared<AddingDefaultsBlockInputStream>(reader, columns, context); sample_block, context, max_block_size, format_settings);
reader = std::make_shared<AddingDefaultsBlockInputStream>(reader,
columns, context);
} }
String getName() const override String getName() const override
@ -134,6 +140,7 @@ namespace
StorageURLBlockOutputStream::StorageURLBlockOutputStream(const Poco::URI & uri, StorageURLBlockOutputStream::StorageURLBlockOutputStream(const Poco::URI & uri,
const String & format, const String & format,
std::optional<FormatSettings> format_settings,
const Block & sample_block_, const Block & sample_block_,
const Context & context, const Context & context,
const ConnectionTimeouts & timeouts, const ConnectionTimeouts & timeouts,
@ -143,7 +150,8 @@ StorageURLBlockOutputStream::StorageURLBlockOutputStream(const Poco::URI & uri,
write_buf = wrapWriteBufferWithCompressionMethod( write_buf = wrapWriteBufferWithCompressionMethod(
std::make_unique<WriteBufferFromHTTP>(uri, Poco::Net::HTTPRequest::HTTP_POST, timeouts), std::make_unique<WriteBufferFromHTTP>(uri, Poco::Net::HTTPRequest::HTTP_POST, timeouts),
compression_method, 3); compression_method, 3);
writer = FormatFactory::instance().getOutput(format, *write_buf, sample_block, context); writer = FormatFactory::instance().getOutput(format, *write_buf, sample_block,
context, {} /* write callback */, format_settings);
} }
@ -214,6 +222,7 @@ Pipe IStorageURLBase::read(
column_names, metadata_snapshot, query_info, column_names, metadata_snapshot, query_info,
context, processed_stage, max_block_size), context, processed_stage, max_block_size),
format_name, format_name,
format_settings,
getName(), getName(),
getHeaderBlock(column_names, metadata_snapshot), getHeaderBlock(column_names, metadata_snapshot),
context, context,
@ -225,8 +234,8 @@ Pipe IStorageURLBase::read(
BlockOutputStreamPtr IStorageURLBase::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & /*context*/) BlockOutputStreamPtr IStorageURLBase::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & /*context*/)
{ {
return std::make_shared<StorageURLBlockOutputStream>( return std::make_shared<StorageURLBlockOutputStream>(uri, format_name,
uri, format_name, metadata_snapshot->getSampleBlock(), context_global, format_settings, metadata_snapshot->getSampleBlock(), context_global,
ConnectionTimeouts::getHTTPTimeouts(context_global), ConnectionTimeouts::getHTTPTimeouts(context_global),
chooseCompressionMethod(uri.toString(), compression_method)); chooseCompressionMethod(uri.toString(), compression_method));
} }
@ -255,16 +264,38 @@ void registerStorageURL(StorageFactory & factory)
{ {
engine_args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[2], args.local_context); engine_args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[2], args.local_context);
compression_method = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>(); compression_method = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>();
} else compression_method = "auto"; }
else
{
compression_method = "auto";
}
// Use format settings from global server context + settings from
// the SETTINGS clause of the create query. Settings from current
// session and user are ignored.
FormatSettings format_settings;
if (args.storage_def->settings)
{
Context global_context_copy = args.context;
global_context_copy.applySettingsChanges(
args.storage_def->settings->changes);
format_settings = getFormatSettings(global_context_copy);
}
else
{
format_settings = getFormatSettings(args.context);
}
return StorageURL::create( return StorageURL::create(
uri, uri,
args.table_id, args.table_id,
format_name, format_name,
format_settings,
args.columns, args.constraints, args.context, args.columns, args.constraints, args.context,
compression_method); compression_method);
}, },
{ {
.supports_settings = true,
.source_access_type = AccessType::URL, .source_access_type = AccessType::URL,
}); });
} }

View File

@ -36,6 +36,7 @@ protected:
const Context & context_, const Context & context_,
const StorageID & id_, const StorageID & id_,
const String & format_name_, const String & format_name_,
std::optional<FormatSettings> format_settings_,
const ColumnsDescription & columns_, const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_, const ConstraintsDescription & constraints_,
const String & compression_method_); const String & compression_method_);
@ -44,6 +45,11 @@ protected:
const Context & context_global; const Context & context_global;
String compression_method; String compression_method;
String format_name; String format_name;
// For URL engine, we use format settings from server context + `SETTINGS`
// clause of the `CREATE` query. In this case, format_settings is set.
// For `url` table function, we use settings from current query context.
// In this case, format_settings is not set.
std::optional<FormatSettings> format_settings;
private: private:
virtual std::string getReadMethod() const; virtual std::string getReadMethod() const;
@ -73,6 +79,7 @@ public:
StorageURLBlockOutputStream( StorageURLBlockOutputStream(
const Poco::URI & uri, const Poco::URI & uri,
const String & format, const String & format,
std::optional<FormatSettings> format_settings,
const Block & sample_block_, const Block & sample_block_,
const Context & context, const Context & context,
const ConnectionTimeouts & timeouts, const ConnectionTimeouts & timeouts,
@ -97,15 +104,16 @@ class StorageURL final : public ext::shared_ptr_helper<StorageURL>, public IStor
{ {
friend struct ext::shared_ptr_helper<StorageURL>; friend struct ext::shared_ptr_helper<StorageURL>;
public: public:
StorageURL( StorageURL(const Poco::URI & uri_,
const Poco::URI & uri_, const StorageID & table_id_,
const StorageID & table_id_, const String & format_name_,
const String & format_name_, std::optional<FormatSettings> format_settings_,
const ColumnsDescription & columns_, const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_, const ConstraintsDescription & constraints_,
Context & context_, Context & context_,
const String & compression_method_) const String & compression_method_)
: IStorageURLBase(uri_, context_, table_id_, format_name_, columns_, constraints_, compression_method_) : IStorageURLBase(uri_, context_, table_id_, format_name_,
format_settings_, columns_, constraints_, compression_method_)
{ {
} }

View File

@ -1,17 +1,18 @@
#include "StorageXDBC.h" #include "StorageXDBC.h"
#include <DataStreams/IBlockOutputStream.h>
#include <Formats/FormatFactory.h>
#include <IO/ReadHelpers.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Interpreters/evaluateConstantExpression.h> #include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTLiteral.h> #include <Parsers/ASTLiteral.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/Path.h>
#include <Processors/Pipe.h>
#include <Storages/StorageFactory.h> #include <Storages/StorageFactory.h>
#include <Storages/StorageURL.h> #include <Storages/StorageURL.h>
#include <Storages/transformQueryForExternalDatabase.h> #include <Storages/transformQueryForExternalDatabase.h>
#include <common/logger_useful.h> #include <common/logger_useful.h>
#include <IO/ReadHelpers.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/Path.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Processors/Pipe.h>
namespace DB namespace DB
{ {
@ -33,6 +34,7 @@ StorageXDBC::StorageXDBC(
context_, context_,
table_id_, table_id_,
IXDBCBridgeHelper::DEFAULT_FORMAT, IXDBCBridgeHelper::DEFAULT_FORMAT,
getFormatSettings(context_),
columns_, columns_,
ConstraintsDescription{}, ConstraintsDescription{},
"" /* CompressionMethod */) "" /* CompressionMethod */)
@ -121,6 +123,7 @@ BlockOutputStreamPtr StorageXDBC::write(const ASTPtr & /*query*/, const StorageM
return std::make_shared<StorageURLBlockOutputStream>( return std::make_shared<StorageURLBlockOutputStream>(
request_uri, request_uri,
format_name, format_name,
getFormatSettings(context),
metadata_snapshot->getSampleBlock(), metadata_snapshot->getSampleBlock(),
context, context,
ConnectionTimeouts::getHTTPTimeouts(context), ConnectionTimeouts::getHTTPTimeouts(context),

View File

@ -10,13 +10,16 @@
namespace DB namespace DB
{ {
StoragePtr TableFunctionFile::getStorage( StoragePtr TableFunctionFile::getStorage(const String & source,
const String & source, const String & format_, const ColumnsDescription & columns, Context & global_context, const String & format_, const ColumnsDescription & columns,
const std::string & table_name, const std::string & compression_method_) const Context & global_context, const std::string & table_name,
const std::string & compression_method_) const
{ {
// For `file` table function, we are going to use format settings from the
// query context.
StorageFile::CommonArguments args{StorageID(getDatabaseName(), table_name), StorageFile::CommonArguments args{StorageID(getDatabaseName(), table_name),
format_, format_,
getFormatSettings(global_context), std::nullopt /*format settings*/,
compression_method_, compression_method_,
columns, columns,
ConstraintsDescription{}, ConstraintsDescription{},

View File

@ -1,10 +1,12 @@
#include <Storages/StorageURL.h>
#include <Storages/ColumnsDescription.h>
#include <Access/AccessFlags.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <TableFunctions/TableFunctionURL.h> #include <TableFunctions/TableFunctionURL.h>
#include <Poco/URI.h>
#include "registerTableFunctions.h" #include "registerTableFunctions.h"
#include <Access/AccessFlags.h>
#include <Formats/FormatFactory.h>
#include <Poco/URI.h>
#include <Storages/ColumnsDescription.h>
#include <Storages/StorageURL.h>
#include <TableFunctions/TableFunctionFactory.h>
namespace DB namespace DB
@ -14,8 +16,9 @@ StoragePtr TableFunctionURL::getStorage(
const std::string & table_name, const String & compression_method_) const const std::string & table_name, const String & compression_method_) const
{ {
Poco::URI uri(source); Poco::URI uri(source);
return StorageURL::create(uri, StorageID(getDatabaseName(), table_name), format_, columns, ConstraintsDescription{}, return StorageURL::create(uri, StorageID(getDatabaseName(), table_name),
global_context, compression_method_); format_, std::nullopt /*format settings*/, columns,
ConstraintsDescription{}, global_context, compression_method_);
} }
void registerTableFunctionURL(TableFunctionFactory & factory) void registerTableFunctionURL(TableFunctionFactory & factory)

View File

@ -0,0 +1,4 @@
1 2
1 2
1 2
1 2

View File

@ -0,0 +1,19 @@
create table file_delim(a int, b int) engine File(CSV, '01545_url_file_format_settings.csv') settings format_csv_delimiter = '|';
truncate table file_delim;
insert into file_delim select 1, 2;
-- select 1, 2 format CSV settings format_csv_delimiter='/';
create table url_delim(a int, b int) engine URL('http://127.0.0.1:8123/?query=select%201%2C%202%20format%20CSV%20settings%20format_csv_delimiter%3D%27/%27%3B%0A', CSV) settings format_csv_delimiter = '/';
select * from file_delim;
select * from url_delim;
select * from file('01545_url_file_format_settings.csv', CSV, 'a int, b int') settings format_csv_delimiter = '|';
select * from url('http://127.0.0.1:8123/?query=select%201%2C%202%20format%20CSV%20settings%20format_csv_delimiter%3D%27/%27%3B%0A', CSV, 'a int, b int') settings format_csv_delimiter = '/';