Globs for url table function

This commit is contained in:
kssenii 2021-04-21 12:32:57 +00:00
parent 44112587d4
commit 4c4f765aba
6 changed files with 218 additions and 75 deletions

View File

@ -1,6 +1,5 @@
#include "StorageExternalDistributed.h"
#if USE_MYSQL || USE_LIBPQXX
#include <Storages/StorageFactory.h>
#include <Interpreters/evaluateConstantExpression.h>
@ -14,6 +13,7 @@
#include <Common/parseRemoteDescription.h>
#include <Storages/StorageMySQL.h>
#include <Storages/StoragePostgreSQL.h>
#include <Storages/StorageURL.h>
#include <common/logger_useful.h>
@ -101,7 +101,7 @@ StorageExternalDistributed::StorageExternalDistributed(
#endif
default:
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Unsupported table engine. Supported engines are: MySQL, PostgreSQL");
"Unsupported table engine. Supported engines are: MySQL, PostgreSQL, URL");
}
shards.emplace(std::move(shard));
@ -109,6 +109,43 @@ StorageExternalDistributed::StorageExternalDistributed(
}
/// Constructs an ExternalDistributed storage whose shards are StorageURL tables.
///
/// `addresses_description` is handed to parseRemoteDescription with '|' as the
/// separator and the `glob_expansion_max_elements` setting as the upper bound on
/// the number of produced addresses. One StorageURL is created per returned
/// address; all of them share the same table id, format, format settings,
/// columns, constraints, context and compression method.
StorageExternalDistributed::StorageExternalDistributed(
    const String & addresses_description,
    const StorageID & table_id,
    const String & format_name,
    const std::optional<FormatSettings> & format_settings,
    const String & compression_method,
    const ColumnsDescription & columns,
    const ConstraintsDescription & constraints,
    ContextPtr context)
    : IStorage(table_id)
{
    StorageInMemoryMetadata storage_metadata;
    storage_metadata.setColumns(columns);
    storage_metadata.setConstraints(constraints);
    setInMemoryMetadata(storage_metadata);

    /// Limit on how many addresses a single description may produce.
    size_t max_addresses = context->getSettingsRef().glob_expansion_max_elements;
    std::vector<String> shards_descriptions = parseRemoteDescription(addresses_description, 0, addresses_description.size(), '|', max_addresses);

    for (const auto & shard_description : shards_descriptions)
    {
        Poco::URI uri(shard_description);
        auto shard = StorageURL::create(
            uri,
            table_id,
            format_name,
            format_settings,
            columns, constraints, context,
            compression_method);

        shards.emplace(std::move(shard));
        LOG_DEBUG(&Poco::Logger::get("StorageURLDistributed"), "Adding URL: {}", shard_description);
    }
}
Pipe StorageExternalDistributed::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
@ -151,39 +188,62 @@ void registerStorageExternalDistributed(StorageFactory & factory)
engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, args.getLocalContext());
const String & engine_name = engine_args[0]->as<ASTLiteral &>().value.safeGet<String>();
const String & cluster_description = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
const String & remote_database = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>();
const String & remote_table = engine_args[3]->as<ASTLiteral &>().value.safeGet<String>();
const String & username = engine_args[4]->as<ASTLiteral &>().value.safeGet<String>();
const String & password = engine_args[5]->as<ASTLiteral &>().value.safeGet<String>();
const String & addresses_description = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
StorageExternalDistributed::ExternalStorageEngine table_engine;
if (engine_name == "MySQL")
table_engine = StorageExternalDistributed::ExternalStorageEngine::MySQL;
else if (engine_name == "PostgreSQL")
table_engine = StorageExternalDistributed::ExternalStorageEngine::PostgreSQL;
else
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"External storage engine {} is not supported for StorageExternalDistributed. Supported engines are: MySQL, PostgreSQL",
engine_name);
if (engine_name == "URL")
{
table_engine = StorageExternalDistributed::ExternalStorageEngine::URL;
return StorageExternalDistributed::create(
args.table_id,
table_engine,
cluster_description,
remote_database,
remote_table,
username,
password,
args.columns,
args.constraints,
args.getContext());
const String & format_name = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>();
String compression_method = "auto";
if (engine_args.size() == 4)
compression_method = engine_args[3]->as<ASTLiteral &>().value.safeGet<String>();
auto format_settings = StorageURL::getFormatSettingsFromArgs(args);
return StorageExternalDistributed::create(
addresses_description,
args.table_id,
format_name,
format_settings,
compression_method,
args.columns,
args.constraints,
args.getContext());
}
else
{
if (engine_name == "MySQL")
table_engine = StorageExternalDistributed::ExternalStorageEngine::MySQL;
else if (engine_name == "PostgreSQL")
table_engine = StorageExternalDistributed::ExternalStorageEngine::PostgreSQL;
else
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"External storage engine {} is not supported for StorageExternalDistributed. Supported engines are: MySQL, PostgreSQL, URL",
engine_name);
const String & remote_database = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>();
const String & remote_table = engine_args[3]->as<ASTLiteral &>().value.safeGet<String>();
const String & username = engine_args[4]->as<ASTLiteral &>().value.safeGet<String>();
const String & password = engine_args[5]->as<ASTLiteral &>().value.safeGet<String>();
return StorageExternalDistributed::create(
args.table_id,
table_engine,
addresses_description,
remote_database,
remote_table,
username,
password,
args.columns,
args.constraints,
args.getContext());
}
},
{
.source_access_type = AccessType::MYSQL,
.source_access_type = AccessType::SOURCES,
});
}
}
#endif

View File

@ -4,11 +4,8 @@
#include "config_core.h"
#endif
#if USE_MYSQL || USE_LIBPQXX
#include <ext/shared_ptr_helper.h>
#include <Storages/IStorage.h>
#include <mysqlxx/PoolWithFailover.h>
namespace DB
@ -28,7 +25,7 @@ public:
{
MySQL,
PostgreSQL,
Default
URL
};
std::string getName() const override { return "ExternalDistributed"; }
@ -55,11 +52,19 @@ protected:
const ConstraintsDescription & constraints_,
ContextPtr context_);
StorageExternalDistributed(
const String & uri,
const StorageID & table_id,
const String & format_name,
const std::optional<FormatSettings> & format_settings,
const String & compression_method,
const ColumnsDescription & columns,
const ConstraintsDescription & constraints,
ContextPtr context);
private:
using Shards = std::unordered_set<StoragePtr>;
Shards shards;
};
}
#endif

View File

@ -1,4 +1,3 @@
#include <Storages/StorageFactory.h>
#include <Storages/StorageURL.h>
#include <Interpreters/Context.h>
@ -22,6 +21,8 @@
#include <Poco/Net/HTTPRequest.h>
#include <Processors/Sources/SourceWithProgress.h>
#include <Processors/Pipe.h>
#include <common/logger_useful.h>
#include <Common/parseRemoteDescription.h>
namespace DB
@ -256,6 +257,43 @@ StorageURL::StorageURL(const Poco::URI & uri_,
context_->getRemoteHostFilter().checkURL(uri);
}
// Builds the FormatSettings for a storage from its factory arguments.
//
// The result combines format settings taken from the context returned by
// args.getContext() with the ones listed in the SETTINGS clause of the
// create query. Settings from current session and user are ignored.
FormatSettings StorageURL::getFormatSettingsFromArgs(const StorageFactory::Arguments & args)
{
    // No SETTINGS clause: the context alone determines the format settings.
    if (!args.storage_def->settings)
        return getFormatSettings(args.getContext());

    FormatFactorySettings format_overrides;

    // Start from the settings changed in the context. Names that are not
    // format settings are skipped, because only format settings exist here.
    const auto & context_changes = args.getContext()->getSettingsRef().changes();
    for (const auto & changed : context_changes)
    {
        if (!format_overrides.has(changed.name))
            continue;
        format_overrides.set(changed.name, changed.value);
    }

    // Then layer the SETTINGS clause on top, with validation.
    format_overrides.applyChanges(args.storage_def->settings->changes);

    return getFormatSettings(args.getContext(), format_overrides);
}
void registerStorageURL(StorageFactory & factory)
{
factory.registerStorage("URL", [](const StorageFactory::Arguments & args)
@ -268,53 +306,21 @@ void registerStorageURL(StorageFactory & factory)
engine_args[0] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[0], args.getLocalContext());
String url = engine_args[0]->as<ASTLiteral &>().value.safeGet<String>();
const String & url = engine_args[0]->as<ASTLiteral &>().value.safeGet<String>();
Poco::URI uri(url);
engine_args[1] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[1], args.getLocalContext());
String format_name = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
const String & format_name = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
String compression_method;
String compression_method = "auto";
if (engine_args.size() == 3)
{
engine_args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[2], args.getLocalContext());
compression_method = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>();
}
else
{
compression_method = "auto";
}
// Use format settings from global server context + settings from
// the SETTINGS clause of the create query. Settings from current
// session and user are ignored.
FormatSettings format_settings;
if (args.storage_def->settings)
{
FormatFactorySettings user_format_settings;
// Apply changed settings from global context, but ignore the
// unknown ones, because we only have the format settings here.
const auto & changes = args.getContext()->getSettingsRef().changes();
for (const auto & change : changes)
{
if (user_format_settings.has(change.name))
{
user_format_settings.set(change.name, change.value);
}
}
// Apply changes from SETTINGS clause, with validation.
user_format_settings.applyChanges(args.storage_def->settings->changes);
format_settings = getFormatSettings(args.getContext(),
user_format_settings);
}
else
{
format_settings = getFormatSettings(args.getContext());
}
auto format_settings = StorageURL::getFormatSettingsFromArgs(args);
return StorageURL::create(
uri,

View File

@ -6,6 +6,7 @@
#include <DataStreams/IBlockOutputStream.h>
#include <Formats/FormatSettings.h>
#include <IO/CompressionMethod.h>
#include <Storages/StorageFactory.h>
namespace DB
@ -124,5 +125,7 @@ public:
{
return metadata_snapshot->getSampleBlock();
}
static FormatSettings getFormatSettingsFromArgs(const StorageFactory::Arguments & args);
};
}

View File

@ -7,6 +7,9 @@
#include <Storages/ColumnsDescription.h>
#include <Storages/StorageURL.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <common/logger_useful.h>
#include <Common/parseRemoteDescription.h>
#include <Storages/StorageExternalDistributed.h>
namespace DB
@ -16,9 +19,25 @@ StoragePtr TableFunctionURL::getStorage(
const std::string & table_name, const String & compression_method_) const
{
Poco::URI uri(source);
return StorageURL::create(uri, StorageID(getDatabaseName(), table_name),
format_, std::nullopt /*format settings*/, columns,
ConstraintsDescription{}, global_context, compression_method_);
if (source.find("{") == std::string::npos)
{
return StorageURL::create(uri, StorageID(getDatabaseName(), table_name),
format_, std::nullopt /*format settings*/, columns,
ConstraintsDescription{}, global_context, compression_method_);
}
else
{
return StorageExternalDistributed::create(
source,
StorageID(getDatabaseName(), table_name),
format_,
std::nullopt,
compression_method_,
columns,
ConstraintsDescription{},
global_context);
}
}
void registerTableFunctionURL(TableFunctionFactory & factory)

View File

@ -26,6 +26,56 @@ def test_url_without_redirect(started_cluster):
assert node1.query("select * from WebHDFSStorage") == "1\tMark\t72.53\n"
def test_url_with_globs_1(started_cluster):
    """Check that a `{10..20}` glob in url() matches an explicit UNION ALL.

    The reference result queries the archives numbered 10 through 20 with one
    url() call each and combines them with UNION ALL; the single glob query
    must produce exactly the same output.
    """
    # One UNION ALL member per archive; `{}` receives the archive number.
    union_member = """
SELECT DISTINCT(users) FROM
(
SELECT JSONExtractString(data, 'actor', 'login') AS users,
JSONExtractString(data, 'payload', 'comment', 'body') AS body
FROM url('https://data.gharchive.org/2020-11-11-{}.json.gz', TSV, 'data String')
)
UNION ALL
"""
    # Last member (archive 20): no trailing UNION ALL, closes the outer subquery.
    closing_member = """
SELECT DISTINCT(users) FROM
(
SELECT JSONExtractString(data, 'actor', 'login') AS users,
JSONExtractString(data, 'payload', 'comment', 'body') AS body
FROM url('https://data.gharchive.org/2020-11-11-20.json.gz', TSV, 'data String')
)) ORDER BY users"""

    members = [union_member.format(number) for number in range(10, 20)]
    reference_query = "SELECT DISTINCT(users) FROM (" + "".join(members) + closing_member
    expected = node1.query(reference_query)

    glob_query = """
SELECT DISTINCT(users) FROM
(
SELECT JSONExtractString(data, 'actor', 'login') AS users,
JSONExtractString(data, 'payload', 'comment', 'body') AS body
FROM url('https://data.gharchive.org/2020-11-11-{10..20}.json.gz', TSV, 'data String')
)
ORDER BY users;
"""
    result = node1.query(glob_query)
    assert result == expected
def test_url_with_globs_2(started_cluster):
    """Check url() with two numeric globs in one address.

    Writes six single-line files named /simple_storage_<a>_<b> (a in 1..2,
    b in 1..3) containing 1..6, then reads them all through one url() call
    using `{1..2}_{1..3}` and expects the six values in sorted order.
    """
    # 6 addresses
    value = 1
    for first in range(1, 3):
        for second in range(1, 4):
            path = "/simple_storage_{}_{}".format(first, second)
            started_cluster.hdfs_api.write_data(path, "{}\n".format(value))
            value += 1

    glob_query = "select * from url('http://hdfs1:50075/webhdfs/v1/simple_storage_{1..2}_{1..3}?op=OPEN&namenoderpcaddress=hdfs1:9000&offset=0', 'TSV', 'data String') as data order by data"
    result = node1.query(glob_query)
    assert result == "1\n2\n3\n4\n5\n6\n"
def test_url_with_redirect_not_allowed(started_cluster):
started_cluster.hdfs_api.write_data("/simple_storage", "1\tMark\t72.53\n")
assert started_cluster.hdfs_api.read_data("/simple_storage") == "1\tMark\t72.53\n"