Merge branch 'master' into url-function-docs

Dan Roscigno 2023-05-24 09:03:01 -04:00 committed by GitHub
commit 9082e7feec
61 changed files with 2053 additions and 1163 deletions

2
.gitmodules vendored
View File

@ -267,7 +267,7 @@
url = https://github.com/ClickHouse/nats.c
[submodule "contrib/vectorscan"]
path = contrib/vectorscan
url = https://github.com/ClickHouse/vectorscan.git
url = https://github.com/VectorCamp/vectorscan.git
[submodule "contrib/c-ares"]
path = contrib/c-ares
url = https://github.com/ClickHouse/c-ares

2
contrib/vectorscan vendored

@ -1 +1 @@
Subproject commit 1f4d448314e581473103187765e4c949d01b4259
Subproject commit 38431d111781843741a781a57a6381a527d900a4

View File

@ -22,7 +22,7 @@ The minimum recommended Ubuntu version for development is 22.04 LTS.
### Install Prerequisites {#install-prerequisites}
``` bash
sudo apt-get install git cmake ccache python3 ninja-build nasm yasm gawk
sudo apt-get install git cmake ccache python3 ninja-build nasm yasm gawk lsb-release wget software-properties-common gnupg
```
### Install and Use the Clang compiler
@ -46,6 +46,11 @@ As of April 2023, any version of Clang >= 15 will work.
GCC as a compiler is not supported
To build with a specific Clang version:
:::tip
This is optional. If you are following along and have just installed Clang, check
which version you installed before setting this environment variable.
:::
``` bash
export CC=clang-16
export CXX=clang++-16

View File

@ -1,48 +0,0 @@
---
slug: /en/sql-reference/aggregate-functions/reference/greatest
title: greatest
---
Aggregate function that returns the greatest across a list of values. All of the list members must be of comparable types.
Examples:
```sql
SELECT
toTypeName(greatest(toUInt8(1), 2, toUInt8(3), 3.)),
greatest(1, 2, toUInt8(3), 3.)
```
```response
┌─toTypeName(greatest(toUInt8(1), 2, toUInt8(3), 3.))─┬─greatest(1, 2, toUInt8(3), 3.)─┐
│ Float64 │ 3 │
└─────────────────────────────────────────────────────┴────────────────────────────────┘
```
:::note
The type returned is a Float64 as the UInt8 must be promoted to 64 bit for the comparison.
:::
```sql
SELECT greatest(['hello'], ['there'], ['world'])
```
```response
┌─greatest(['hello'], ['there'], ['world'])─┐
│ ['world'] │
└───────────────────────────────────────────┘
```
```sql
SELECT greatest(toDateTime32(now() + toIntervalDay(1)), toDateTime64(now(), 3))
```
```response
┌─greatest(toDateTime32(plus(now(), toIntervalDay(1))), toDateTime64(now(), 3))─┐
│ 2023-05-12 01:16:59.000 │
└─────────────────────────────────────────────────────────────────────────────────┘
```
:::note
The type returned is a DateTime64 as the DateTime32 must be promoted to 64 bit for the comparison.
:::
Also see [least](/docs/en/sql-reference/aggregate-functions/reference/least.md).

View File

@ -1,48 +0,0 @@
---
slug: /en/sql-reference/aggregate-functions/reference/least
title: least
---
Aggregate function that returns the least across a list of values. All of the list members must be of comparable types.
Examples:
```sql
SELECT
toTypeName(least(toUInt8(1), 2, toUInt8(3), 3.)),
least(1, 2, toUInt8(3), 3.)
```
```response
┌─toTypeName(least(toUInt8(1), 2, toUInt8(3), 3.))─┬─least(1, 2, toUInt8(3), 3.)─┐
│ Float64 │ 1 │
└──────────────────────────────────────────────────┴─────────────────────────────┘
```
:::note
The type returned is a Float64 as the UInt8 must be promoted to 64 bit for the comparison.
:::
```sql
SELECT least(['hello'], ['there'], ['world'])
```
```response
┌─least(['hello'], ['there'], ['world'])─┐
│ ['hello'] │
└────────────────────────────────────────┘
```
```sql
SELECT least(toDateTime32(now() + toIntervalDay(1)), toDateTime64(now(), 3))
```
```response
┌─least(toDateTime32(plus(now(), toIntervalDay(1))), toDateTime64(now(), 3))─┐
│ 2023-05-12 01:16:59.000 │
└────────────────────────────────────────────────────────────────────────────┘
```
:::note
The type returned is a DateTime64 as the DateTime32 must be promoted to 64 bit for the comparison.
:::
Also see [greatest](/docs/en/sql-reference/aggregate-functions/reference/greatest.md).

View File

@ -20,7 +20,7 @@ Strings are compared byte-by-byte. Note that this may lead to unexpected results
A string S1 which has another string S2 as prefix is considered longer than S2.
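For example, because raw bytes are compared, `'Z' < 'a'` holds (byte 0x5A is less than 0x61) and a string is greater than any of its prefixes. A minimal illustration (the literal values are arbitrary):
```sql
-- both comparisons return 1
SELECT 'Z' < 'a', 'abc' > 'ab';
```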
## equals
## equals, `=`, `==` operators
**Syntax**
@ -32,7 +32,7 @@ Alias:
- `a = b` (operator)
- `a == b` (operator)
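The function form and its operator aliases are interchangeable; a small sketch:
```sql
-- all three expressions evaluate the same comparison and return 1
SELECT equals(1, 1), 1 = 1, 1 == 1;
```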
## notEquals
## notEquals, `!=`, `<>` operators
**Syntax**
@ -44,7 +44,7 @@ Alias:
- `a != b` (operator)
- `a <> b` (operator)
## less
## less, `<` operator
**Syntax**
@ -55,7 +55,7 @@ less(a, b)
Alias:
- `a < b` (operator)
## greater
## greater, `>` operator
**Syntax**
@ -66,7 +66,7 @@ greater(a, b)
Alias:
- `a > b` (operator)
## lessOrEquals
## lessOrEquals, `<=` operator
**Syntax**

View File

@ -152,3 +152,85 @@ FROM LEFT_RIGHT
│ 4 │ ᴺᵁᴸᴸ │ Both equal │
└──────┴───────┴──────────────────┘
```
## greatest
Returns the greatest across a list of values. All of the list members must be of comparable types.
Examples:
```sql
SELECT greatest(1, 2, toUInt8(3), 3.) result, toTypeName(result) type;
```
```response
┌─result─┬─type────┐
│ 3 │ Float64 │
└────────┴─────────┘
```
:::note
The type returned is a Float64 as the UInt8 must be promoted to 64 bit for the comparison.
:::
```sql
SELECT greatest(['hello'], ['there'], ['world'])
```
```response
┌─greatest(['hello'], ['there'], ['world'])─┐
│ ['world'] │
└───────────────────────────────────────────┘
```
```sql
SELECT greatest(toDateTime32(now() + toIntervalDay(1)), toDateTime64(now(), 3))
```
```response
┌─greatest(toDateTime32(plus(now(), toIntervalDay(1))), toDateTime64(now(), 3))─┐
│ 2023-05-12 01:16:59.000 │
└─────────────────────────────────────────────────────────────────────────────────┘
```
:::note
The type returned is a DateTime64 as the DateTime32 must be promoted to 64 bit for the comparison.
:::
## least
Returns the least across a list of values. All of the list members must be of comparable types.
Examples:
```sql
SELECT least(1, 2, toUInt8(3), 3.) result, toTypeName(result) type;
```
```response
┌─result─┬─type────┐
│ 1 │ Float64 │
└────────┴─────────┘
```
:::note
The type returned is a Float64 as the UInt8 must be promoted to 64 bit for the comparison.
:::
```sql
SELECT least(['hello'], ['there'], ['world'])
```
```response
┌─least(['hello'], ['there'], ['world'])─┐
│ ['hello'] │
└────────────────────────────────────────┘
```
```sql
SELECT least(toDateTime32(now() + toIntervalDay(1)), toDateTime64(now(), 3))
```
```response
┌─least(toDateTime32(plus(now(), toIntervalDay(1))), toDateTime64(now(), 3))─┐
│ 2023-05-12 01:16:59.000 │
└────────────────────────────────────────────────────────────────────────────┘
```
:::note
The type returned is a DateTime64 as the DateTime32 must be promoted to 64 bit for the comparison.
:::

View File

@ -0,0 +1,62 @@
---
slug: /en/sql-reference/table-functions/urlCluster
sidebar_position: 55
sidebar_label: urlCluster
---
# urlCluster Table Function
Allows processing files from a URL in parallel from many nodes in a specified cluster. On the initiator it creates a connection to all nodes in the cluster, expands the asterisks in the URL file path, and dispatches each file dynamically. On a worker node it asks the initiator for the next task to process and processes it. This is repeated until all tasks are finished.
**Syntax**
``` sql
urlCluster(cluster_name, URL, format, structure)
```
**Arguments**
- `cluster_name` — Name of a cluster that is used to build a set of addresses and connection parameters to remote and local servers.
- `URL` — HTTP or HTTPS server address, which can accept `GET` requests. Type: [String](../../sql-reference/data-types/string.md).
- `format` — [Format](../../interfaces/formats.md#formats) of the data. Type: [String](../../sql-reference/data-types/string.md).
- `structure` — Table structure in `'UserID UInt64, Name String'` format. Determines column names and types. Type: [String](../../sql-reference/data-types/string.md).
**Returned value**
A table with the specified format and structure and with data from the defined `URL`.
**Examples**
Getting the first 3 lines of a table that contains columns of `String` and [UInt32](../../sql-reference/data-types/int-uint.md) type from an HTTP server that responds in [CSV](../../interfaces/formats.md#csv) format.
1. Create a basic HTTP server using the standard Python 3 tools and start it:
```python
from http.server import BaseHTTPRequestHandler, HTTPServer
class CSVHTTPServer(BaseHTTPRequestHandler):
def do_GET(self):
self.send_response(200)
self.send_header('Content-type', 'text/csv')
self.end_headers()
self.wfile.write(bytes('Hello,1\nWorld,2\n', "utf-8"))
if __name__ == "__main__":
server_address = ('127.0.0.1', 12345)
HTTPServer(server_address, CSVHTTPServer).serve_forever()
```
2. Query the data exposed by the server:
``` sql
SELECT * FROM urlCluster('cluster_simple','http://127.0.0.1:12345', CSV, 'column1 String, column2 UInt32')
```
## Globs in URL
Patterns in curly brackets `{ }` are used to generate a set of shards or to specify failover addresses. For supported pattern types and examples, see the description of the [remote](remote.md#globs-in-addresses) function.
The `|` character inside patterns is used to specify failover addresses. They are iterated in the same order as listed in the pattern. The number of generated addresses is limited by the [glob_expansion_max_elements](../../operations/settings/settings.md#glob_expansion_max_elements) setting.
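A small sketch of both pattern types; the URLs and hosts below are hypothetical, and `cluster_simple` is the cluster from the example above:
``` sql
-- {1..3} expands to three URLs whose processing is distributed across the cluster nodes
SELECT * FROM urlCluster('cluster_simple', 'https://example.com/data_{1..3}.csv', CSV, 'c1 String, c2 UInt32');

-- {host1|host2} lists failover addresses that are tried in the order given
SELECT * FROM urlCluster('cluster_simple', 'https://{host1|host2}.example.com/data.csv', CSV, 'c1 String, c2 UInt32');
```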
**See Also**
- [URL engine](../../engines/table-engines/special/url.md)
- [URL table function](../../sql-reference/table-functions/url.md)

View File

@ -46,7 +46,7 @@ $ cd ..
To do this, create the following file:
/资源库/LaunchDaemons/limit.maxfiles.plist:
/Library/LaunchDaemons/limit.maxfiles.plist:
``` xml
<?xml version="1.0" encoding="UTF-8"?>

View File

@ -113,8 +113,8 @@ private:
void createFiber();
void destroyFiber();
Fiber fiber;
FiberStack fiber_stack;
Fiber fiber;
std::mutex fiber_lock;
std::exception_ptr exception;

View File

@ -571,7 +571,13 @@ IBlocksStreamPtr GraceHashJoin::getDelayedBlocks()
size_t bucket_idx = current_bucket->idx;
hash_join = makeInMemoryJoin();
size_t prev_keys_num = 0;
// If there is only one bucket, skip this check.
if (hash_join && buckets.size() > 1)
{
// It is reasonable to use the previous hash_join's key count to estimate the size of the next hash_join.
prev_keys_num = hash_join->getTotalRowCount();
}
for (bucket_idx = bucket_idx + 1; bucket_idx < buckets.size(); ++bucket_idx)
{
@ -585,6 +591,7 @@ IBlocksStreamPtr GraceHashJoin::getDelayedBlocks()
continue;
}
hash_join = makeInMemoryJoin(prev_keys_num);
auto right_reader = current_bucket->startJoining();
size_t num_rows = 0; /// count rows that were written and rehashed
while (Block block = right_reader.read())
@ -604,9 +611,9 @@ IBlocksStreamPtr GraceHashJoin::getDelayedBlocks()
return nullptr;
}
GraceHashJoin::InMemoryJoinPtr GraceHashJoin::makeInMemoryJoin()
GraceHashJoin::InMemoryJoinPtr GraceHashJoin::makeInMemoryJoin(size_t reserve_num)
{
return std::make_unique<InMemoryJoin>(table_join, right_sample_block, any_take_last_row);
return std::make_unique<InMemoryJoin>(table_join, right_sample_block, any_take_last_row, reserve_num);
}
Block GraceHashJoin::prepareRightBlock(const Block & block)
@ -646,6 +653,7 @@ void GraceHashJoin::addJoinedBlockImpl(Block block)
if (!current_block.rows())
return;
}
auto prev_keys_num = hash_join->getTotalRowCount();
hash_join->addJoinedBlock(current_block, /* check_limits = */ false);
if (!hasMemoryOverflow(hash_join))
@ -654,7 +662,6 @@ void GraceHashJoin::addJoinedBlockImpl(Block block)
current_block = {};
auto right_blocks = hash_join->releaseJoinedBlocks(/* restructure */ false);
hash_join = nullptr;
buckets_snapshot = rehashBuckets(buckets_snapshot.size() * 2);
@ -674,7 +681,7 @@ void GraceHashJoin::addJoinedBlockImpl(Block block)
current_block = concatenateBlocks(current_blocks);
}
hash_join = makeInMemoryJoin();
hash_join = makeInMemoryJoin(prev_keys_num);
if (current_block.rows() > 0)
hash_join->addJoinedBlock(current_block, /* check_limits = */ false);

View File

@ -90,7 +90,8 @@ public:
private:
void initBuckets();
/// Create empty join for in-memory processing.
InMemoryJoinPtr makeInMemoryJoin();
/// reserve_num is used to reserve space in the hash table.
InMemoryJoinPtr makeInMemoryJoin(size_t reserve_num = 0);
/// Add right table block to the @join. Calls @rehash on overflow.
void addJoinedBlockImpl(Block block);

View File

@ -217,7 +217,7 @@ static void correctNullabilityInplace(ColumnWithTypeAndName & column, bool nulla
JoinCommon::removeColumnNullability(column);
}
HashJoin::HashJoin(std::shared_ptr<TableJoin> table_join_, const Block & right_sample_block_, bool any_take_last_row_)
HashJoin::HashJoin(std::shared_ptr<TableJoin> table_join_, const Block & right_sample_block_, bool any_take_last_row_, size_t reserve_num)
: table_join(table_join_)
, kind(table_join->kind())
, strictness(table_join->strictness())
@ -302,7 +302,7 @@ HashJoin::HashJoin(std::shared_ptr<TableJoin> table_join_, const Block & right_s
}
for (auto & maps : data->maps)
dataMapInit(maps);
dataMapInit(maps, reserve_num);
}
HashJoin::Type HashJoin::chooseMethod(JoinKind kind, const ColumnRawPtrs & key_columns, Sizes & key_sizes)
@ -454,13 +454,15 @@ struct KeyGetterForType
using Type = typename KeyGetterForTypeImpl<type, Value, Mapped>::Type;
};
void HashJoin::dataMapInit(MapsVariant & map)
void HashJoin::dataMapInit(MapsVariant & map, size_t reserve_num)
{
if (kind == JoinKind::Cross)
return;
joinDispatchInit(kind, strictness, map);
joinDispatch(kind, strictness, map, [&](auto, auto, auto & map_) { map_.create(data->type); });
if (reserve_num)
joinDispatch(kind, strictness, map, [&](auto, auto, auto & map_) { map_.reserve(data->type, reserve_num); });
}
bool HashJoin::empty() const

View File

@ -146,7 +146,7 @@ public:
class HashJoin : public IJoin
{
public:
HashJoin(std::shared_ptr<TableJoin> table_join_, const Block & right_sample_block, bool any_take_last_row_ = false);
HashJoin(std::shared_ptr<TableJoin> table_join_, const Block & right_sample_block, bool any_take_last_row_ = false, size_t reserve_num = 0);
~HashJoin() override;
@ -217,6 +217,16 @@ public:
M(keys256) \
M(hashed)
/// Only for maps using hash table.
#define APPLY_FOR_HASH_JOIN_VARIANTS(M) \
M(key32) \
M(key64) \
M(key_string) \
M(key_fixed_string) \
M(keys128) \
M(keys256) \
M(hashed)
/// Used for reading from StorageJoin and applying joinGet function
#define APPLY_FOR_JOIN_VARIANTS_LIMITED(M) \
@ -266,6 +276,22 @@ public:
}
}
void reserve(Type which, size_t num)
{
switch (which)
{
case Type::EMPTY: break;
case Type::CROSS: break;
case Type::key8: break;
case Type::key16: break;
#define M(NAME) \
case Type::NAME: NAME->reserve(num); break;
APPLY_FOR_HASH_JOIN_VARIANTS(M)
#undef M
}
}
size_t getTotalRowCount(Type which) const
{
switch (which)
@ -409,7 +435,7 @@ private:
/// If set HashJoin instance is not available for modification (addJoinedBlock)
TableLockHolder storage_join_lock = nullptr;
void dataMapInit(MapsVariant &);
void dataMapInit(MapsVariant &, size_t);
void initRightBlockStructure(Block & saved_block_sample);

View File

@ -5,40 +5,38 @@
#include <Storages/HDFS/StorageHDFSCluster.h>
#include <Client/Connection.h>
#include <Core/QueryProcessingStage.h>
#include <DataTypes/DataTypeString.h>
#include <IO/ConnectionTimeouts.h>
#include <Interpreters/Context.h>
#include <Interpreters/getHeaderForProcessingStage.h>
#include <Interpreters/SelectQueryOptions.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/InterpreterSelectQueryAnalyzer.h>
#include <QueryPipeline/narrowPipe.h>
#include <QueryPipeline/Pipe.h>
#include <QueryPipeline/RemoteQueryExecutor.h>
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <Processors/Sources/RemoteSource.h>
#include <Parsers/queryToString.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Parsers/queryToString.h>
#include <Storages/HDFS/HDFSCommon.h>
#include <Storages/IStorage.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/HDFS/HDFSCommon.h>
#include <Storages/StorageDictionary.h>
#include <Storages/addColumnsStructureToQueryWithClusterEngine.h>
#include <Storages/extractTableFunctionArgumentsFromSelectQuery.h>
#include <TableFunctions/TableFunctionHDFSCluster.h>
#include <memory>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
StorageHDFSCluster::StorageHDFSCluster(
ContextPtr context_,
String cluster_name_,
const String & cluster_name_,
const String & uri_,
const StorageID & table_id_,
const String & format_name_,
@ -46,12 +44,10 @@ StorageHDFSCluster::StorageHDFSCluster(
const ConstraintsDescription & constraints_,
const String & compression_method_,
bool structure_argument_was_provided_)
: IStorageCluster(table_id_)
, cluster_name(cluster_name_)
: IStorageCluster(cluster_name_, table_id_, &Poco::Logger::get("StorageHDFSCluster (" + table_id_.table_name + ")"), structure_argument_was_provided_)
, uri(uri_)
, format_name(format_name_)
, compression_method(compression_method_)
, structure_argument_was_provided(structure_argument_was_provided_)
{
checkHDFSURL(uri_);
context_->getRemoteHostFilter().checkURL(Poco::URI(uri_));
@ -70,84 +66,17 @@ StorageHDFSCluster::StorageHDFSCluster(
setInMemoryMetadata(storage_metadata);
}
/// The code executes on initiator
Pipe StorageHDFSCluster::read(
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
size_t /*max_block_size*/,
size_t /*num_streams*/)
void StorageHDFSCluster::addColumnsStructureToQuery(ASTPtr & query, const String & structure, const ContextPtr & context)
{
auto cluster = getCluster(context);
auto extension = getTaskIteratorExtension(query_info.query, context);
ASTExpressionList * expression_list = extractTableFunctionArgumentsFromSelectQuery(query);
if (!expression_list)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected SELECT query from table function hdfsCluster, got '{}'", queryToString(query));
/// Calculate the header. This is significant, because some columns could be thrown away in some cases like query with count(*)
Block header;
if (context->getSettingsRef().allow_experimental_analyzer)
header = InterpreterSelectQueryAnalyzer::getSampleBlock(query_info.query, context, SelectQueryOptions(processed_stage).analyze());
else
header = InterpreterSelectQuery(query_info.query, context, SelectQueryOptions(processed_stage).analyze()).getSampleBlock();
const Scalars & scalars = context->hasQueryContext() ? context->getQueryContext()->getScalars() : Scalars{};
Pipes pipes;
const bool add_agg_info = processed_stage == QueryProcessingStage::WithMergeableState;
auto query_to_send = query_info.original_query->clone();
if (!structure_argument_was_provided)
addColumnsStructureToQueryWithClusterEngine(
query_to_send, StorageDictionary::generateNamesAndTypesDescription(storage_snapshot->metadata->getColumns().getAll()), 3, getName());
auto new_context = IStorageCluster::updateSettingsForTableFunctionCluster(context, context->getSettingsRef());
const auto & current_settings = new_context->getSettingsRef();
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings);
for (const auto & shard_info : cluster->getShardsInfo())
{
auto try_results = shard_info.pool->getMany(timeouts, &current_settings, PoolMode::GET_MANY);
for (auto & try_result : try_results)
{
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
std::vector<IConnectionPool::Entry>{try_result},
queryToString(query_to_send),
header,
new_context,
/*throttler=*/nullptr,
scalars,
Tables(),
processed_stage,
extension);
pipes.emplace_back(std::make_shared<RemoteSource>(remote_query_executor, add_agg_info, false, false));
}
}
storage_snapshot->check(column_names);
return Pipe::unitePipes(std::move(pipes));
}
QueryProcessingStage::Enum StorageHDFSCluster::getQueryProcessingStage(
ContextPtr context, QueryProcessingStage::Enum to_stage, const StorageSnapshotPtr &, SelectQueryInfo &) const
{
/// Initiator executes query on remote node.
if (context->getClientInfo().query_kind == ClientInfo::QueryKind::INITIAL_QUERY)
if (to_stage >= QueryProcessingStage::Enum::WithMergeableState)
return QueryProcessingStage::Enum::WithMergeableState;
/// Follower just reads the data.
return QueryProcessingStage::Enum::FetchColumns;
TableFunctionHDFSCluster::addColumnsStructureToArguments(expression_list->children, structure, context);
}
ClusterPtr StorageHDFSCluster::getCluster(ContextPtr context) const
{
return context->getCluster(cluster_name)->getClusterWithReplicasAsShards(context->getSettingsRef());
}
RemoteQueryExecutor::Extension StorageHDFSCluster::getTaskIteratorExtension(ASTPtr, ContextPtr context) const
RemoteQueryExecutor::Extension StorageHDFSCluster::getTaskIteratorExtension(ASTPtr, const ContextPtr & context) const
{
auto iterator = std::make_shared<HDFSSource::DisclosedGlobIterator>(context, uri);
auto callback = std::make_shared<HDFSSource::IteratorWrapper>([iter = std::move(iterator)]() mutable -> String { return iter->next(); });
@ -161,7 +90,6 @@ NamesAndTypesList StorageHDFSCluster::getVirtuals() const
{"_file", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())}};
}
}
#endif

View File

@ -22,7 +22,7 @@ class StorageHDFSCluster : public IStorageCluster
public:
StorageHDFSCluster(
ContextPtr context_,
String cluster_name_,
const String & cluster_name_,
const String & uri_,
const StorageID & table_id_,
const String & format_name_,
@ -33,23 +33,16 @@ public:
std::string getName() const override { return "HDFSCluster"; }
Pipe read(const Names &, const StorageSnapshotPtr &, SelectQueryInfo &,
ContextPtr, QueryProcessingStage::Enum, size_t /*max_block_size*/, size_t /*num_streams*/) override;
QueryProcessingStage::Enum
getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageSnapshotPtr &, SelectQueryInfo &) const override;
NamesAndTypesList getVirtuals() const override;
ClusterPtr getCluster(ContextPtr context) const override;
RemoteQueryExecutor::Extension getTaskIteratorExtension(ASTPtr query, ContextPtr context) const override;
RemoteQueryExecutor::Extension getTaskIteratorExtension(ASTPtr query, const ContextPtr & context) const override;
private:
String cluster_name;
void addColumnsStructureToQuery(ASTPtr & query, const String & structure, const ContextPtr & context) override;
String uri;
String format_name;
String compression_method;
bool structure_argument_was_provided;
};

View File

@ -0,0 +1,149 @@
#include "Storages/IStorageCluster.h"
#include "Common/Exception.h"
#include "Core/QueryProcessingStage.h"
#include <DataTypes/DataTypeString.h>
#include <IO/ConnectionTimeouts.h>
#include <Interpreters/Context.h>
#include <Interpreters/getHeaderForProcessingStage.h>
#include <Interpreters/SelectQueryOptions.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/AddDefaultDatabaseVisitor.h>
#include <Interpreters/TranslateQualifiedNamesVisitor.h>
#include <Interpreters/InterpreterSelectQueryAnalyzer.h>
#include <QueryPipeline/narrowPipe.h>
#include <QueryPipeline/Pipe.h>
#include <Processors/Sources/RemoteSource.h>
#include <QueryPipeline/RemoteQueryExecutor.h>
#include <Parsers/queryToString.h>
#include <Storages/IStorage.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/StorageDictionary.h>
#include <memory>
#include <string>
namespace DB
{
IStorageCluster::IStorageCluster(
const String & cluster_name_,
const StorageID & table_id_,
Poco::Logger * log_,
bool structure_argument_was_provided_)
: IStorage(table_id_)
, log(log_)
, cluster_name(cluster_name_)
, structure_argument_was_provided(structure_argument_was_provided_)
{
}
/// The code executes on initiator
Pipe IStorageCluster::read(
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
size_t /*max_block_size*/,
size_t /*num_streams*/)
{
updateBeforeRead(context);
auto cluster = getCluster(context);
auto extension = getTaskIteratorExtension(query_info.query, context);
/// Calculate the header. This is significant, because some columns could be thrown away in some cases like query with count(*)
Block sample_block;
ASTPtr query_to_send = query_info.query;
if (context->getSettingsRef().allow_experimental_analyzer)
{
sample_block = InterpreterSelectQueryAnalyzer::getSampleBlock(query_info.query, context, SelectQueryOptions(processed_stage));
}
else
{
auto interpreter = InterpreterSelectQuery(query_info.query, context, SelectQueryOptions(processed_stage).analyze());
sample_block = interpreter.getSampleBlock();
query_to_send = interpreter.getQueryInfo().query->clone();
}
const Scalars & scalars = context->hasQueryContext() ? context->getQueryContext()->getScalars() : Scalars{};
Pipes pipes;
const bool add_agg_info = processed_stage == QueryProcessingStage::WithMergeableState;
if (!structure_argument_was_provided)
addColumnsStructureToQuery(query_to_send, StorageDictionary::generateNamesAndTypesDescription(storage_snapshot->metadata->getColumns().getAll()), context);
RestoreQualifiedNamesVisitor::Data data;
data.distributed_table = DatabaseAndTableWithAlias(*getTableExpression(query_info.query->as<ASTSelectQuery &>(), 0));
data.remote_table.database = context->getCurrentDatabase();
data.remote_table.table = getName();
RestoreQualifiedNamesVisitor(data).visit(query_to_send);
AddDefaultDatabaseVisitor visitor(context, context->getCurrentDatabase(),
/* only_replace_current_database_function_= */false,
/* only_replace_in_join_= */true);
visitor.visit(query_to_send);
auto new_context = updateSettings(context, context->getSettingsRef());
const auto & current_settings = new_context->getSettingsRef();
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings);
for (const auto & shard_info : cluster->getShardsInfo())
{
auto try_results = shard_info.pool->getMany(timeouts, &current_settings, PoolMode::GET_MANY);
for (auto & try_result : try_results)
{
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
std::vector<IConnectionPool::Entry>{try_result},
queryToString(query_to_send),
sample_block,
new_context,
/*throttler=*/nullptr,
scalars,
Tables(),
processed_stage,
extension);
remote_query_executor->setLogger(log);
pipes.emplace_back(std::make_shared<RemoteSource>(remote_query_executor, add_agg_info, false, false));
}
}
storage_snapshot->check(column_names);
return Pipe::unitePipes(std::move(pipes));
}
QueryProcessingStage::Enum IStorageCluster::getQueryProcessingStage(
ContextPtr context, QueryProcessingStage::Enum to_stage, const StorageSnapshotPtr &, SelectQueryInfo &) const
{
/// Initiator executes query on remote node.
if (context->getClientInfo().query_kind == ClientInfo::QueryKind::INITIAL_QUERY)
if (to_stage >= QueryProcessingStage::Enum::WithMergeableState)
return QueryProcessingStage::Enum::WithMergeableState;
/// Follower just reads the data.
return QueryProcessingStage::Enum::FetchColumns;
}
ContextPtr IStorageCluster::updateSettings(ContextPtr context, const Settings & settings)
{
Settings new_settings = settings;
/// Cluster table functions should always skip unavailable shards.
new_settings.skip_unavailable_shards = true;
auto new_context = Context::createCopy(context);
new_context->setSettings(new_settings);
return new_context;
}
ClusterPtr IStorageCluster::getCluster(ContextPtr context) const
{
return context->getCluster(cluster_name)->getClusterWithReplicasAsShards(context->getSettingsRef());
}
}

View File

@ -3,6 +3,7 @@
#include <Storages/IStorage.h>
#include <Interpreters/Cluster.h>
#include <QueryPipeline/RemoteQueryExecutor.h>
#include <Parsers/ASTExpressionList.h>
namespace DB
{
@ -15,26 +16,40 @@ namespace DB
class IStorageCluster : public IStorage
{
public:
IStorageCluster(
const String & cluster_name_,
const StorageID & table_id_,
Poco::Logger * log_,
bool structure_argument_was_provided_);
explicit IStorageCluster(const StorageID & table_id_) : IStorage(table_id_) {}
Pipe read(
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
size_t /*max_block_size*/,
size_t /*num_streams*/) override;
virtual ClusterPtr getCluster(ContextPtr context) const = 0;
ClusterPtr getCluster(ContextPtr context) const;
/// Query is needed for pruning by virtual columns (_file, _path)
virtual RemoteQueryExecutor::Extension getTaskIteratorExtension(ASTPtr query, ContextPtr context) const = 0;
virtual RemoteQueryExecutor::Extension getTaskIteratorExtension(ASTPtr query, const ContextPtr & context) const = 0;
QueryProcessingStage::Enum getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageSnapshotPtr &, SelectQueryInfo &) const override;
bool isRemote() const override { return true; }
static ContextPtr updateSettingsForTableFunctionCluster(ContextPtr context, const Settings & settings)
{
Settings new_settings = settings;
protected:
virtual void updateBeforeRead(const ContextPtr &) {}
/// Cluster table functions should always skip unavailable shards.
new_settings.skip_unavailable_shards = true;
virtual void addColumnsStructureToQuery(ASTPtr & query, const String & structure, const ContextPtr & context) = 0;
auto new_context = Context::createCopy(context);
new_context->setSettings(new_settings);
return new_context;
}
private:
ContextPtr updateSettings(ContextPtr context, const Settings & settings);
Poco::Logger * log;
String cluster_name;
bool structure_argument_was_provided;
};

View File

@ -506,52 +506,48 @@ void MergeTreeData::checkProperties(
auto all_columns = new_metadata.columns.getAllPhysical();
/// Order by check AST
if (old_metadata.hasSortingKey())
/// This is ALTER, not CREATE/ATTACH TABLE. Let us check that all new columns used in the sorting key
/// expression have just been added (so that the sorting order is guaranteed to be valid with the new key).
Names new_primary_key_columns = new_primary_key.column_names;
Names new_sorting_key_columns = new_sorting_key.column_names;
ASTPtr added_key_column_expr_list = std::make_shared<ASTExpressionList>();
const auto & old_sorting_key_columns = old_metadata.getSortingKeyColumns();
for (size_t new_i = 0, old_i = 0; new_i < sorting_key_size; ++new_i)
{
/// This is ALTER, not CREATE/ATTACH TABLE. Let us check that all new columns used in the sorting key
/// expression have just been added (so that the sorting order is guaranteed to be valid with the new key).
Names new_primary_key_columns = new_primary_key.column_names;
Names new_sorting_key_columns = new_sorting_key.column_names;
ASTPtr added_key_column_expr_list = std::make_shared<ASTExpressionList>();
const auto & old_sorting_key_columns = old_metadata.getSortingKeyColumns();
for (size_t new_i = 0, old_i = 0; new_i < sorting_key_size; ++new_i)
if (old_i < old_sorting_key_columns.size())
{
if (old_i < old_sorting_key_columns.size())
{
if (new_sorting_key_columns[new_i] != old_sorting_key_columns[old_i])
added_key_column_expr_list->children.push_back(new_sorting_key.expression_list_ast->children[new_i]);
else
++old_i;
}
else
if (new_sorting_key_columns[new_i] != old_sorting_key_columns[old_i])
added_key_column_expr_list->children.push_back(new_sorting_key.expression_list_ast->children[new_i]);
else
++old_i;
}
else
added_key_column_expr_list->children.push_back(new_sorting_key.expression_list_ast->children[new_i]);
}
if (!added_key_column_expr_list->children.empty())
if (!added_key_column_expr_list->children.empty())
{
auto syntax = TreeRewriter(getContext()).analyze(added_key_column_expr_list, all_columns);
Names used_columns = syntax->requiredSourceColumns();
NamesAndTypesList deleted_columns;
NamesAndTypesList added_columns;
old_metadata.getColumns().getAllPhysical().getDifference(all_columns, deleted_columns, added_columns);
for (const String & col : used_columns)
{
auto syntax = TreeRewriter(getContext()).analyze(added_key_column_expr_list, all_columns);
Names used_columns = syntax->requiredSourceColumns();
if (!added_columns.contains(col) || deleted_columns.contains(col))
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Existing column {} is used in the expression that was added to the sorting key. "
"You can add expressions that use only the newly added columns",
backQuoteIfNeed(col));
NamesAndTypesList deleted_columns;
NamesAndTypesList added_columns;
old_metadata.getColumns().getAllPhysical().getDifference(all_columns, deleted_columns, added_columns);
for (const String & col : used_columns)
{
if (!added_columns.contains(col) || deleted_columns.contains(col))
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Existing column {} is used in the expression that was added to the sorting key. "
"You can add expressions that use only the newly added columns",
backQuoteIfNeed(col));
if (new_metadata.columns.getDefaults().contains(col))
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Newly added column {} has a default expression, so adding expressions that use "
"it to the sorting key is forbidden", backQuoteIfNeed(col));
}
if (new_metadata.columns.getDefaults().contains(col))
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Newly added column {} has a default expression, so adding expressions that use "
"it to the sorting key is forbidden", backQuoteIfNeed(col));
}
}
@ -1081,7 +1077,7 @@ void MergeTreeData::PartLoadingTree::add(const MergeTreePartInfo & info, const S
else if (!prev_info.isDisjoint(info))
{
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Part {} intersects previous part {}. It is a bug!",
"Part {} intersects previous part {}. It is a bug or a result of manual intervention in the server or ZooKeeper data",
name, prev->second->name);
}
}
@ -1098,7 +1094,7 @@ void MergeTreeData::PartLoadingTree::add(const MergeTreePartInfo & info, const S
else if (!next_info.isDisjoint(info))
{
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Part {} intersects next part {}. It is a bug!",
"Part {} intersects next part {}. It is a bug or a result of manual intervention in the server or ZooKeeper data",
name, it->second->name);
}
}

View File

@ -152,7 +152,8 @@ static void splitAndModifyMutationCommands(
/// But we don't know for sure what happened.
auto part_metadata_version = part->getMetadataVersion();
auto table_metadata_version = table_metadata_snapshot->getMetadataVersion();
if (table_metadata_version <= part_metadata_version)
/// StorageMergeTree does not have metadata version
if (table_metadata_version <= part_metadata_version && part->storage.supportsReplication())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Part {} with metadata version {} contains column {} that is absent "
"in table {} with metadata version {}",
part->name, part_metadata_version, column.name,

View File

@ -21,6 +21,30 @@
#include <Storages/MergeTree/IntersectionsIndexes.h>
#include <fmt/format.h>
namespace DB
{
struct Part
{
mutable RangesInDataPartDescription description;
// FIXME: This is needed to put this struct in set
// and modify through iterator
mutable std::set<size_t> replicas;
bool operator<(const Part & rhs) const { return description.info < rhs.description.info; }
};
}
template <>
struct fmt::formatter<DB::Part>
{
static constexpr auto parse(format_parse_context & ctx) { return ctx.begin(); }
template <typename FormatContext>
auto format(const DB::Part & part, FormatContext & ctx)
{
return format_to(ctx.out(), "{} in replicas [{}]", part.description.describe(), fmt::join(part.replicas, ", "));
}
};
namespace DB
{
@ -60,17 +84,6 @@ public:
virtual void handleInitialAllRangesAnnouncement(InitialAllRangesAnnouncement announcement) = 0;
};
struct Part
{
mutable RangesInDataPartDescription description;
// FIXME: This is needed to put this struct in set
// and modify through iterator
mutable std::set<size_t> replicas;
bool operator<(const Part & rhs) const { return description.info < rhs.description.info; }
};
using Parts = std::set<Part>;
using PartRefs = std::deque<Parts::iterator>;
@ -207,14 +220,7 @@ void DefaultCoordinator::finalizeReadingState()
delayed_parts.pop_front();
}
String description;
for (const auto & part : all_parts_to_read)
{
description += part.description.describe();
description += fmt::format("Replicas: ({}) --- ", fmt::join(part.replicas, ","));
}
LOG_DEBUG(log, "Reading state is fully initialized: {}", description);
LOG_DEBUG(log, "Reading state is fully initialized: {}", fmt::join(all_parts_to_read, "; "));
}

View File

@ -46,6 +46,16 @@ public:
retryLoop(f, []() {});
}
/// retryLoop() executes f() until it succeeds, max_retries is reached, or a non-retriable error is encountered
///
/// the callable f() can provide feedback in terms of errors in two ways:
/// 1. throw KeeperException exception:
/// in such case, retries are done only on hardware keeper errors
/// because non-hardware error codes are semantically not really errors, just a response
/// 2. set an error code in the ZooKeeperRetriesControl object (setUserError/setKeeperError)
/// The idea is that if the caller has some semantics on top of non-hardware keeper errors,
/// then it can provide feedback to the retries controller via user errors
///
void retryLoop(auto && f, auto && iteration_cleanup)
{
while (canTry())

View File

@ -1297,7 +1297,7 @@ void StorageS3::processNamedCollectionResult(StorageS3::Configuration & configur
configuration.auth_settings.no_sign_request = collection.getOrDefault<bool>("no_sign_request", false);
configuration.auth_settings.expiration_window_seconds = collection.getOrDefault<UInt64>("expiration_window_seconds", S3::DEFAULT_EXPIRATION_WINDOW_SECONDS);
configuration.format = collection.getOrDefault<String>("format", "auto");
configuration.format = collection.getOrDefault<String>("format", configuration.format);
configuration.compression_method = collection.getOrDefault<String>("compression_method", collection.getOrDefault<String>("compression", "auto"));
configuration.structure = collection.getOrDefault<String>("structure", "auto");

View File

@ -4,36 +4,22 @@
#if USE_AWS_S3
#include "Common/Exception.h"
#include "Client/Connection.h"
#include "Core/QueryProcessingStage.h"
#include <DataTypes/DataTypeString.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/WriteBufferFromS3.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>
#include <Interpreters/getHeaderForProcessingStage.h>
#include <Interpreters/SelectQueryOptions.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/AddDefaultDatabaseVisitor.h>
#include <Interpreters/TranslateQualifiedNamesVisitor.h>
#include <Interpreters/InterpreterSelectQueryAnalyzer.h>
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <QueryPipeline/narrowPipe.h>
#include <QueryPipeline/Pipe.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Processors/Sources/RemoteSource.h>
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <QueryPipeline/RemoteQueryExecutor.h>
#include <Parsers/queryToString.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Parsers/ASTLiteral.h>
#include <Storages/IStorage.h>
#include <Storages/StorageURL.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/getVirtualsForStorage.h>
#include <Storages/StorageDictionary.h>
#include <Storages/addColumnsStructureToQueryWithClusterEngine.h>
#include <Common/logger_useful.h>
#include <aws/core/auth/AWSCredentials.h>
#include <Storages/extractTableFunctionArgumentsFromSelectQuery.h>
#include <Storages/getVirtualsForStorage.h>
#include <Common/Exception.h>
#include <Parsers/queryToString.h>
#include <TableFunctions/TableFunctionS3Cluster.h>
#include <memory>
#include <string>
@ -47,21 +33,15 @@ namespace ErrorCodes
}
StorageS3Cluster::StorageS3Cluster(
const Configuration & configuration_,
const String & cluster_name_,
const StorageS3::Configuration & configuration_,
const StorageID & table_id_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
ContextPtr context_,
bool structure_argument_was_provided_,
bool format_argument_was_provided_)
: IStorageCluster(table_id_)
, log(&Poco::Logger::get("StorageS3Cluster (" + table_id_.table_name + ")"))
bool structure_argument_was_provided_)
: IStorageCluster(cluster_name_, table_id_, &Poco::Logger::get("StorageS3Cluster (" + table_id_.table_name + ")"), structure_argument_was_provided_)
, s3_configuration{configuration_}
, cluster_name(configuration_.cluster_name)
, format_name(configuration_.format)
, compression_method(configuration_.compression_method)
, structure_argument_was_provided(structure_argument_was_provided_)
, format_argument_was_provided(format_argument_was_provided_)
{
context_->getGlobalContext()->getRemoteHostFilter().checkURL(configuration_.url.uri);
StorageInMemoryMetadata storage_metadata;
@ -69,8 +49,6 @@ StorageS3Cluster::StorageS3Cluster(
if (columns_.empty())
{
/// `distributed_processing` is set to false, because this code is executed on the initiator, so there is no callback set
/// for asking for the next tasks.
/// `format_settings` is set to std::nullopt, because StorageS3Cluster is used only as table function
auto columns = StorageS3::getTableStructureFromDataImpl(s3_configuration, /*format_settings=*/std::nullopt, context_);
storage_metadata.setColumns(columns);
@ -91,131 +69,21 @@ StorageS3Cluster::StorageS3Cluster(
virtual_block.insert({column.type->createColumn(), column.type, column.name});
}
void StorageS3Cluster::addColumnsStructureToQuery(ASTPtr & query, const String & structure, const ContextPtr & context)
{
ASTExpressionList * expression_list = extractTableFunctionArgumentsFromSelectQuery(query);
if (!expression_list)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected SELECT query from table function s3Cluster, got '{}'", queryToString(query));
TableFunctionS3Cluster::addColumnsStructureToArguments(expression_list->children, structure, context);
}
void StorageS3Cluster::updateConfigurationIfChanged(ContextPtr local_context)
{
s3_configuration.update(local_context);
}
namespace
{
void addColumnsStructureToQueryWithS3ClusterEngine(ASTPtr & query, const String & structure, bool format_argument_was_provided, const String & function_name)
{
ASTExpressionList * expression_list = extractTableFunctionArgumentsFromSelectQuery(query);
if (!expression_list)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected SELECT query from table function {}, got '{}'", function_name, queryToString(query));
auto structure_literal = std::make_shared<ASTLiteral>(structure);
if (!format_argument_was_provided)
{
auto format_literal = std::make_shared<ASTLiteral>("auto");
expression_list->children.push_back(format_literal);
}
expression_list->children.push_back(structure_literal);
}
}
/// The code executes on initiator
Pipe StorageS3Cluster::read(
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
size_t /*max_block_size*/,
size_t /*num_streams*/)
{
updateConfigurationIfChanged(context);
auto cluster = getCluster(context);
auto extension = getTaskIteratorExtension(query_info.query, context);
/// Calculate the header. This is significant, because some columns could be thrown away in some cases like query with count(*)
Block sample_block;
ASTPtr query_to_send = query_info.query;
if (context->getSettingsRef().allow_experimental_analyzer)
{
sample_block = InterpreterSelectQueryAnalyzer::getSampleBlock(query_info.query, context, SelectQueryOptions(processed_stage));
}
else
{
auto interpreter = InterpreterSelectQuery(query_info.query, context, SelectQueryOptions(processed_stage).analyze());
sample_block = interpreter.getSampleBlock();
query_to_send = interpreter.getQueryInfo().query->clone();
}
const Scalars & scalars = context->hasQueryContext() ? context->getQueryContext()->getScalars() : Scalars{};
Pipes pipes;
const bool add_agg_info = processed_stage == QueryProcessingStage::WithMergeableState;
if (!structure_argument_was_provided)
addColumnsStructureToQueryWithS3ClusterEngine(
query_to_send, StorageDictionary::generateNamesAndTypesDescription(storage_snapshot->metadata->getColumns().getAll()), format_argument_was_provided, getName());
RestoreQualifiedNamesVisitor::Data data;
data.distributed_table = DatabaseAndTableWithAlias(*getTableExpression(query_info.query->as<ASTSelectQuery &>(), 0));
data.remote_table.database = context->getCurrentDatabase();
data.remote_table.table = getName();
RestoreQualifiedNamesVisitor(data).visit(query_to_send);
AddDefaultDatabaseVisitor visitor(context, context->getCurrentDatabase(),
/* only_replace_current_database_function_= */false,
/* only_replace_in_join_= */true);
visitor.visit(query_to_send);
auto new_context = IStorageCluster::updateSettingsForTableFunctionCluster(context, context->getSettingsRef());
const auto & current_settings = new_context->getSettingsRef();
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings);
for (const auto & shard_info : cluster->getShardsInfo())
{
auto try_results = shard_info.pool->getMany(timeouts, &current_settings, PoolMode::GET_MANY);
for (auto & try_result : try_results)
{
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
std::vector<IConnectionPool::Entry>{try_result},
queryToString(query_to_send),
sample_block,
new_context,
/*throttler=*/nullptr,
scalars,
Tables(),
processed_stage,
extension);
remote_query_executor->setLogger(log);
pipes.emplace_back(std::make_shared<RemoteSource>(remote_query_executor, add_agg_info, false, false));
}
}
storage_snapshot->check(column_names);
return Pipe::unitePipes(std::move(pipes));
}
QueryProcessingStage::Enum StorageS3Cluster::getQueryProcessingStage(
ContextPtr context, QueryProcessingStage::Enum to_stage, const StorageSnapshotPtr &, SelectQueryInfo &) const
{
/// Initiator executes query on remote node.
if (context->getClientInfo().query_kind == ClientInfo::QueryKind::INITIAL_QUERY)
if (to_stage >= QueryProcessingStage::Enum::WithMergeableState)
return QueryProcessingStage::Enum::WithMergeableState;
/// Follower just reads the data.
return QueryProcessingStage::Enum::FetchColumns;
}
ClusterPtr StorageS3Cluster::getCluster(ContextPtr context) const
{
return context->getCluster(cluster_name)->getClusterWithReplicasAsShards(context->getSettingsRef());
}
RemoteQueryExecutor::Extension StorageS3Cluster::getTaskIteratorExtension(ASTPtr query, ContextPtr context) const
RemoteQueryExecutor::Extension StorageS3Cluster::getTaskIteratorExtension(ASTPtr query, const ContextPtr & context) const
{
auto iterator = std::make_shared<StorageS3Source::DisclosedGlobIterator>(
*s3_configuration.client, s3_configuration.url, query, virtual_block, context);

View File

@ -21,46 +21,32 @@ class Context;
class StorageS3Cluster : public IStorageCluster
{
public:
struct Configuration : public StorageS3::Configuration
{
std::string cluster_name;
};
StorageS3Cluster(
const Configuration & configuration_,
const String & cluster_name_,
const StorageS3::Configuration & configuration_,
const StorageID & table_id_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
ContextPtr context_,
bool structure_argument_was_provided_,
bool format_argument_was_provided_);
bool structure_argument_was_provided_);
std::string getName() const override { return "S3Cluster"; }
Pipe read(const Names &, const StorageSnapshotPtr &, SelectQueryInfo &,
ContextPtr, QueryProcessingStage::Enum, size_t /*max_block_size*/, size_t /*num_streams*/) override;
QueryProcessingStage::Enum
getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageSnapshotPtr &, SelectQueryInfo &) const override;
NamesAndTypesList getVirtuals() const override;
RemoteQueryExecutor::Extension getTaskIteratorExtension(ASTPtr query, ContextPtr context) const override;
ClusterPtr getCluster(ContextPtr context) const override;
RemoteQueryExecutor::Extension getTaskIteratorExtension(ASTPtr query, const ContextPtr & context) const override;
protected:
void updateConfigurationIfChanged(ContextPtr local_context);
private:
Poco::Logger * log;
void updateBeforeRead(const ContextPtr & context) override { updateConfigurationIfChanged(context); }
void addColumnsStructureToQuery(ASTPtr & query, const String & structure, const ContextPtr & context) override;
StorageS3::Configuration s3_configuration;
String cluster_name;
String format_name;
String compression_method;
NamesAndTypesList virtual_columns;
Block virtual_block;
bool structure_argument_was_provided;
bool format_argument_was_provided;
};

View File

@ -52,8 +52,8 @@ namespace ErrorCodes
}
static constexpr auto bad_arguments_error_message = "Storage URL requires 1-4 arguments: "
"url, name of used format (taken from file extension by default), "
"optional compression method, optional headers (specified as `headers('name'='value', 'name2'='value2')`)";
"url, name of used format (taken from file extension by default), "
"optional compression method, optional headers (specified as `headers('name'='value', 'name2'='value2')`)";
static const std::unordered_set<std::string_view> required_configuration_keys = {
"url",
@ -101,7 +101,8 @@ IStorageURLBase::IStorageURLBase(
const String & compression_method_,
const HTTPHeaderEntries & headers_,
const String & http_method_,
ASTPtr partition_by_)
ASTPtr partition_by_,
bool distributed_processing_)
: IStorage(table_id_)
, uri(uri_)
, compression_method(chooseCompressionMethod(Poco::URI(uri_).getPath(), compression_method_))
@ -110,6 +111,7 @@ IStorageURLBase::IStorageURLBase(
, headers(headers_)
, http_method(http_method_)
, partition_by(partition_by_)
, distributed_processing(distributed_processing_)
{
FormatFactory::instance().checkFormatName(format_name);
StorageInMemoryMetadata storage_metadata;
@ -135,7 +137,7 @@ namespace
HTTPHeaderEntries headers(headers_.begin(), headers_.end());
// Propagate OpenTelemetry trace context, if any, downstream.
const auto &current_trace_context = OpenTelemetry::CurrentContext();
const auto & current_trace_context = OpenTelemetry::CurrentContext();
if (current_trace_context.isTraceEnabled())
{
headers.emplace_back("traceparent", current_trace_context.composeTraceparentHeader());
@ -149,277 +151,268 @@ namespace
return headers;
}
class StorageURLSource : public ISource
StorageURLSource::FailoverOptions getFailoverOptions(const String & uri, size_t max_addresses)
{
using URIParams = std::vector<std::pair<String, String>>;
return parseRemoteDescription(uri, 0, uri.size(), '|', max_addresses);
}
}
public:
struct URIInfo
{
using FailoverOptions = std::vector<String>;
std::vector<FailoverOptions> uri_list_to_read;
std::atomic<size_t> next_uri_to_read = 0;
class StorageURLSource::DisclosedGlobIterator::Impl
{
public:
Impl(const String & uri, size_t max_addresses)
{
uris = parseRemoteDescription(uri, 0, uri.size(), ',', max_addresses);
}
bool need_path_column = false;
bool need_file_column = false;
};
using URIInfoPtr = std::shared_ptr<URIInfo>;
static void setCredentials(Poco::Net::HTTPBasicCredentials & credentials, const Poco::URI & request_uri)
{
const auto & user_info = request_uri.getUserInfo();
if (!user_info.empty())
{
std::size_t n = user_info.find(':');
if (n != std::string::npos)
{
credentials.setUsername(user_info.substr(0, n));
credentials.setPassword(user_info.substr(n + 1));
}
}
}
static Block getBlockForSource(const Block & block_for_format, const URIInfoPtr & uri_info)
{
auto res = block_for_format;
if (uri_info->need_path_column)
{
res.insert(
{DataTypeLowCardinality{std::make_shared<DataTypeString>()}.createColumn(),
std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()),
"_path"});
}
if (uri_info->need_file_column)
{
res.insert(
{DataTypeLowCardinality{std::make_shared<DataTypeString>()}.createColumn(),
std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()),
"_file"});
}
return res;
}
StorageURLSource(
URIInfoPtr uri_info_,
const std::string & http_method,
std::function<void(std::ostream &)> callback,
const String & format,
const std::optional<FormatSettings> & format_settings,
String name_,
const Block & sample_block,
ContextPtr context,
const ColumnsDescription & columns,
UInt64 max_block_size,
const ConnectionTimeouts & timeouts,
CompressionMethod compression_method,
size_t download_threads,
const HTTPHeaderEntries & headers_ = {},
const URIParams & params = {},
bool glob_url = false)
: ISource(getBlockForSource(sample_block, uri_info_)), name(std::move(name_)), uri_info(uri_info_)
{
auto headers = getHeaders(headers_);
/// Lazy initialization. We should not perform requests in constructor, because we need to do it in query pipeline.
initialize = [=, this](const URIInfo::FailoverOptions & uri_options)
{
if (uri_options.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Got empty url list");
auto first_option = uri_options.begin();
auto [actual_uri, buf_factory] = getFirstAvailableURIAndReadBuffer(
first_option,
uri_options.end(),
context,
params,
http_method,
callback,
timeouts,
credentials,
headers,
glob_url,
uri_options.size() == 1);
curr_uri = actual_uri;
try
{
total_size += buf_factory->getFileSize();
}
catch (...)
{
// we simply continue without total_size
}
// TODO: Pass max_parsing_threads and max_download_threads adjusted for num_streams.
auto input_format = FormatFactory::instance().getInputRandomAccess(
format,
std::move(buf_factory),
sample_block,
context,
max_block_size,
/* is_remote_fs */ true,
compression_method,
format_settings,
download_threads);
QueryPipelineBuilder builder;
builder.init(Pipe(input_format));
builder.addSimpleTransform(
[&](const Block & cur_header)
{ return std::make_shared<AddingDefaultsTransform>(cur_header, columns, *input_format, context); });
pipeline = std::make_unique<QueryPipeline>(QueryPipelineBuilder::getPipeline(std::move(builder)));
reader = std::make_unique<PullingPipelineExecutor>(*pipeline);
};
}
String getName() const override { return name; }
Chunk generate() override
{
while (true)
{
if (isCancelled())
{
if (reader)
reader->cancel();
break;
}
if (!reader)
{
auto current_uri_pos = uri_info->next_uri_to_read.fetch_add(1);
if (current_uri_pos >= uri_info->uri_list_to_read.size())
return {};
auto current_uri_options = uri_info->uri_list_to_read[current_uri_pos];
initialize(current_uri_options);
}
Chunk chunk;
if (reader->pull(chunk))
{
UInt64 num_rows = chunk.getNumRows();
const String & path{curr_uri.getPath()};
if (uri_info->need_path_column)
{
auto column = DataTypeLowCardinality{std::make_shared<DataTypeString>()}.createColumnConst(num_rows, path);
chunk.addColumn(column->convertToFullColumnIfConst());
}
if (uri_info->need_file_column)
{
const size_t last_slash_pos = path.find_last_of('/');
auto file_name = path.substr(last_slash_pos + 1);
auto column
= DataTypeLowCardinality{std::make_shared<DataTypeString>()}.createColumnConst(num_rows, std::move(file_name));
chunk.addColumn(column->convertToFullColumnIfConst());
}
if (num_rows && total_size)
updateRowsProgressApprox(
*this, chunk, total_size, total_rows_approx_accumulated, total_rows_count_times, total_rows_approx_max);
return chunk;
}
pipeline->reset();
reader.reset();
}
String next()
{
size_t current_index = index.fetch_add(1, std::memory_order_relaxed);
if (current_index >= uris.size())
return {};
return uris[current_index];
}
size_t size()
{
return uris.size();
}
private:
Strings uris;
std::atomic_size_t index = 0;
};
StorageURLSource::DisclosedGlobIterator::DisclosedGlobIterator(const String & uri, size_t max_addresses)
: pimpl(std::make_shared<StorageURLSource::DisclosedGlobIterator::Impl>(uri, max_addresses)) {}
String StorageURLSource::DisclosedGlobIterator::next()
{
return pimpl->next();
}
size_t StorageURLSource::DisclosedGlobIterator::size()
{
return pimpl->size();
}
void StorageURLSource::setCredentials(Poco::Net::HTTPBasicCredentials & credentials, const Poco::URI & request_uri)
{
const auto & user_info = request_uri.getUserInfo();
if (!user_info.empty())
{
std::size_t n = user_info.find(':');
if (n != std::string::npos)
{
credentials.setUsername(user_info.substr(0, n));
credentials.setPassword(user_info.substr(n + 1));
}
}
}
Block StorageURLSource::getHeader(Block sample_block, const std::vector<NameAndTypePair> & requested_virtual_columns)
{
for (const auto & virtual_column : requested_virtual_columns)
sample_block.insert({virtual_column.type->createColumn(), virtual_column.type, virtual_column.name});
return sample_block;
}
StorageURLSource::StorageURLSource(
const std::vector<NameAndTypePair> & requested_virtual_columns_,
std::shared_ptr<IteratorWrapper> uri_iterator_,
const std::string & http_method,
std::function<void(std::ostream &)> callback,
const String & format,
const std::optional<FormatSettings> & format_settings,
String name_,
const Block & sample_block,
ContextPtr context,
const ColumnsDescription & columns,
UInt64 max_block_size,
const ConnectionTimeouts & timeouts,
CompressionMethod compression_method,
size_t download_threads,
const HTTPHeaderEntries & headers_,
const URIParams & params,
bool glob_url)
: ISource(getHeader(sample_block, requested_virtual_columns_)), name(std::move(name_)), requested_virtual_columns(requested_virtual_columns_), uri_iterator(uri_iterator_)
{
auto headers = getHeaders(headers_);
/// Lazy initialization. We should not perform requests in the constructor, because we need to do it in the query pipeline.
initialize = [=, this](const FailoverOptions & uri_options)
{
if (uri_options.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Got empty url list");
auto first_option = uri_options.begin();
auto [actual_uri, buf_factory] = getFirstAvailableURIAndReadBuffer(
first_option,
uri_options.end(),
context,
params,
http_method,
callback,
timeouts,
credentials,
headers,
glob_url,
uri_options.size() == 1);
curr_uri = actual_uri;
try
{
total_size += buf_factory->getFileSize();
}
catch (...)
{
// we simply continue without total_size
}
static std::tuple<Poco::URI, SeekableReadBufferFactoryPtr> getFirstAvailableURIAndReadBuffer(
std::vector<String>::const_iterator & option,
const std::vector<String>::const_iterator & end,
ContextPtr context,
const URIParams & params,
const String & http_method,
std::function<void(std::ostream &)> callback,
const ConnectionTimeouts & timeouts,
Poco::Net::HTTPBasicCredentials & credentials,
const HTTPHeaderEntries & headers,
bool glob_url,
bool delay_initialization)
// TODO: Pass max_parsing_threads and max_download_threads adjusted for num_streams.
auto input_format = FormatFactory::instance().getInputRandomAccess(
format,
std::move(buf_factory),
sample_block,
context,
max_block_size,
/* is_remote_fs */ true,
compression_method,
format_settings,
download_threads);
QueryPipelineBuilder builder;
builder.init(Pipe(input_format));
builder.addSimpleTransform([&](const Block & cur_header)
{ return std::make_shared<AddingDefaultsTransform>(cur_header, columns, *input_format, context); });
pipeline = std::make_unique<QueryPipeline>(QueryPipelineBuilder::getPipeline(std::move(builder)));
reader = std::make_unique<PullingPipelineExecutor>(*pipeline);
};
}
Chunk StorageURLSource::generate()
{
while (true)
{
if (isCancelled())
{
String first_exception_message;
ReadSettings read_settings = context->getReadSettings();
if (reader)
reader->cancel();
break;
}
size_t options = std::distance(option, end);
for (; option != end; ++option)
if (!reader)
{
auto current_uri = (*uri_iterator)();
if (current_uri.empty())
return {};
initialize(current_uri);
}
Chunk chunk;
if (reader->pull(chunk))
{
UInt64 num_rows = chunk.getNumRows();
if (num_rows && total_size)
updateRowsProgressApprox(
*this, chunk, total_size, total_rows_approx_accumulated, total_rows_count_times, total_rows_approx_max);
const String & path{curr_uri.getPath()};
for (const auto & virtual_column : requested_virtual_columns)
{
bool skip_url_not_found_error = glob_url && read_settings.http_skip_not_found_url_for_globs && option == std::prev(end);
auto request_uri = Poco::URI(*option);
for (const auto & [param, value] : params)
request_uri.addQueryParameter(param, value);
setCredentials(credentials, request_uri);
const auto settings = context->getSettings();
auto res = std::make_unique<RangedReadWriteBufferFromHTTPFactory>(
request_uri,
http_method,
callback,
timeouts,
credentials,
settings.max_http_get_redirects,
settings.max_read_buffer_size,
read_settings,
headers,
&context->getRemoteHostFilter(),
delay_initialization,
/* use_external_buffer */ false,
/* skip_url_not_found_error */ skip_url_not_found_error);
if (options > 1)
if (virtual_column.name == "_path")
{
// Send a HEAD request to check availability.
try
{
res->getFileInfo();
}
catch (...)
{
if (first_exception_message.empty())
first_exception_message = getCurrentExceptionMessage(false);
tryLogCurrentException(__PRETTY_FUNCTION__);
continue;
}
chunk.addColumn(virtual_column.type->createColumnConst(num_rows, path)->convertToFullColumnIfConst());
}
else if (virtual_column.name == "_file")
{
size_t last_slash_pos = path.find_last_of('/');
auto column = virtual_column.type->createColumnConst(num_rows, path.substr(last_slash_pos + 1));
chunk.addColumn(column->convertToFullColumnIfConst());
}
return std::make_tuple(request_uri, std::move(res));
}
throw Exception(ErrorCodes::NETWORK_ERROR, "All uri ({}) options are unreachable: {}", options, first_exception_message);
return chunk;
}
private:
using InitializeFunc = std::function<void(const URIInfo::FailoverOptions &)>;
InitializeFunc initialize;
pipeline->reset();
reader.reset();
}
return {};
}
String name;
URIInfoPtr uri_info;
Poco::URI curr_uri;
std::tuple<Poco::URI, SeekableReadBufferFactoryPtr> StorageURLSource::getFirstAvailableURIAndReadBuffer(
std::vector<String>::const_iterator & option,
const std::vector<String>::const_iterator & end,
ContextPtr context,
const URIParams & params,
const String & http_method,
std::function<void(std::ostream &)> callback,
const ConnectionTimeouts & timeouts,
Poco::Net::HTTPBasicCredentials & credentials,
const HTTPHeaderEntries & headers,
bool glob_url,
bool delay_initialization)
{
String first_exception_message;
ReadSettings read_settings = context->getReadSettings();
std::unique_ptr<QueryPipeline> pipeline;
std::unique_ptr<PullingPipelineExecutor> reader;
size_t options = std::distance(option, end);
for (; option != end; ++option)
{
bool skip_url_not_found_error = glob_url && read_settings.http_skip_not_found_url_for_globs && option == std::prev(end);
auto request_uri = Poco::URI(*option);
Poco::Net::HTTPBasicCredentials credentials;
for (const auto & [param, value] : params)
request_uri.addQueryParameter(param, value);
size_t total_size = 0;
UInt64 total_rows_approx_max = 0;
size_t total_rows_count_times = 0;
UInt64 total_rows_approx_accumulated = 0;
};
setCredentials(credentials, request_uri);
const auto settings = context->getSettings();
auto res = std::make_unique<RangedReadWriteBufferFromHTTPFactory>(
request_uri,
http_method,
callback,
timeouts,
credentials,
settings.max_http_get_redirects,
settings.max_read_buffer_size,
read_settings,
headers,
&context->getRemoteHostFilter(),
delay_initialization,
/* use_external_buffer */ false,
/* skip_url_not_found_error */ skip_url_not_found_error);
if (options > 1)
{
// Send a HEAD request to check availability.
try
{
res->getFileInfo();
}
catch (...)
{
if (first_exception_message.empty())
first_exception_message = getCurrentExceptionMessage(false);
tryLogCurrentException(__PRETTY_FUNCTION__);
continue;
}
}
return std::make_tuple(request_uri, std::move(res));
}
throw Exception(ErrorCodes::NETWORK_ERROR, "All uri ({}) options are unreachable: {}", options, first_exception_message);
}
StorageURLSink::StorageURLSink(
@ -674,61 +667,66 @@ Pipe IStorageURLBase::read(
block_for_format = storage_snapshot->metadata->getSampleBlock();
}
size_t max_download_threads = local_context->getSettingsRef().max_download_threads;
auto uri_info = std::make_shared<StorageURLSource::URIInfo>();
for (const auto & column : column_names)
std::unordered_set<String> column_names_set(column_names.begin(), column_names.end());
std::vector<NameAndTypePair> requested_virtual_columns;
for (const auto & virtual_column : getVirtuals())
{
if (column == "_path")
uri_info->need_path_column = true;
if (column == "_file")
uri_info->need_file_column = true;
if (column_names_set.contains(virtual_column.name))
requested_virtual_columns.push_back(virtual_column);
}
if (urlWithGlobs(uri))
size_t max_download_threads = local_context->getSettingsRef().max_download_threads;
std::shared_ptr<StorageURLSource::IteratorWrapper> iterator_wrapper{nullptr};
bool is_url_with_globs = urlWithGlobs(uri);
size_t max_addresses = local_context->getSettingsRef().glob_expansion_max_elements;
if (distributed_processing)
{
size_t max_addresses = local_context->getSettingsRef().glob_expansion_max_elements;
auto uri_descriptions = parseRemoteDescription(uri, 0, uri.size(), ',', max_addresses, "url");
if (num_streams > uri_descriptions.size())
num_streams = uri_descriptions.size();
/// For each uri (which acts like shard) check if it has failover options
uri_info->uri_list_to_read.reserve(uri_descriptions.size());
for (const auto & description : uri_descriptions)
uri_info->uri_list_to_read.emplace_back(parseRemoteDescription(description, 0, description.size(), '|', max_addresses, "url"));
Pipes pipes;
pipes.reserve(num_streams);
size_t download_threads = num_streams >= max_download_threads ? 1 : (max_download_threads / num_streams);
for (size_t i = 0; i < num_streams; ++i)
iterator_wrapper = std::make_shared<StorageURLSource::IteratorWrapper>(
[callback = local_context->getReadTaskCallback(), max_addresses]()
{
String next_uri = callback();
if (next_uri.empty())
return StorageURLSource::FailoverOptions{};
return getFailoverOptions(next_uri, max_addresses);
});
}
else if (is_url_with_globs)
{
/// Iterate through disclosed globs and make a source for each file
auto glob_iterator = std::make_shared<StorageURLSource::DisclosedGlobIterator>(uri, max_addresses);
iterator_wrapper = std::make_shared<StorageURLSource::IteratorWrapper>([glob_iterator, max_addresses]()
{
pipes.emplace_back(std::make_shared<StorageURLSource>(
uri_info,
getReadMethod(),
getReadPOSTDataCallback(column_names, columns_description, query_info, local_context, processed_stage, max_block_size),
format_name,
format_settings,
getName(),
block_for_format,
local_context,
columns_description,
max_block_size,
getHTTPTimeouts(local_context),
compression_method,
download_threads,
headers,
params,
/* glob_url */ true));
}
return Pipe::unitePipes(std::move(pipes));
String next_uri = glob_iterator->next();
if (next_uri.empty())
return StorageURLSource::FailoverOptions{};
return getFailoverOptions(next_uri, max_addresses);
});
if (num_streams > glob_iterator->size())
num_streams = glob_iterator->size();
}
else
{
uri_info->uri_list_to_read.emplace_back(std::vector<String>{uri});
return Pipe(std::make_shared<StorageURLSource>(
uri_info,
iterator_wrapper = std::make_shared<StorageURLSource::IteratorWrapper>([&, max_addresses, done = false]() mutable
{
if (done)
return StorageURLSource::FailoverOptions{};
done = true;
return getFailoverOptions(uri, max_addresses);
});
num_streams = 1;
}
Pipes pipes;
pipes.reserve(num_streams);
size_t download_threads = num_streams >= max_download_threads ? 1 : (max_download_threads / num_streams);
for (size_t i = 0; i < num_streams; ++i)
{
pipes.emplace_back(std::make_shared<StorageURLSource>(
requested_virtual_columns,
iterator_wrapper,
getReadMethod(),
getReadPOSTDataCallback(column_names, columns_description, query_info, local_context, processed_stage, max_block_size),
format_name,
@ -740,10 +738,13 @@ Pipe IStorageURLBase::read(
max_block_size,
getHTTPTimeouts(local_context),
compression_method,
max_download_threads,
download_threads,
headers,
params));
params,
is_url_with_globs));
}
return Pipe::unitePipes(std::move(pipes));
}
@ -771,11 +772,17 @@ Pipe StorageURLWithFailover::read(
auto params = getReadURIParams(column_names, storage_snapshot, query_info, local_context, processed_stage, max_block_size);
auto uri_info = std::make_shared<StorageURLSource::URIInfo>();
uri_info->uri_list_to_read.emplace_back(uri_options);
auto iterator_wrapper = std::make_shared<StorageURLSource::IteratorWrapper>([&, done = false]() mutable
{
if (done)
return StorageURLSource::FailoverOptions{};
done = true;
return uri_options;
});
auto pipe = Pipe(std::make_shared<StorageURLSource>(
uri_info,
std::vector<NameAndTypePair>{},
iterator_wrapper,
getReadMethod(),
getReadPOSTDataCallback(column_names, columns_description, query_info, local_context, processed_stage, max_block_size),
format_name,
@ -935,7 +942,8 @@ StorageURL::StorageURL(
const String & compression_method_,
const HTTPHeaderEntries & headers_,
const String & http_method_,
ASTPtr partition_by_)
ASTPtr partition_by_,
bool distributed_processing_)
: IStorageURLBase(
uri_,
context_,
@ -948,7 +956,8 @@ StorageURL::StorageURL(
compression_method_,
headers_,
http_method_,
partition_by_)
partition_by_,
distributed_processing_)
{
context_->getRemoteHostFilter().checkURL(Poco::URI(uri));
}

@ -2,6 +2,7 @@
#include <Poco/URI.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Processors/ISource.h>
#include <Formats/FormatSettings.h>
#include <IO/CompressionMethod.h>
#include <IO/ReadWriteBufferFromHTTP.h>
@ -20,6 +21,7 @@ using OutputFormatPtr = std::shared_ptr<IOutputFormat>;
struct ConnectionTimeouts;
class NamedCollection;
class PullingPipelineExecutor;
/**
* This class represents the table engine for external URLs.
@ -68,7 +70,8 @@ protected:
const String & compression_method_,
const HTTPHeaderEntries & headers_ = {},
const String & method_ = "",
ASTPtr partition_by = nullptr);
ASTPtr partition_by = nullptr,
bool distributed_processing_ = false);
String uri;
CompressionMethod compression_method;
@ -81,6 +84,7 @@ protected:
HTTPHeaderEntries headers;
String http_method; /// For insert can choose Put instead of default Post.
ASTPtr partition_by;
bool distributed_processing;
virtual std::string getReadMethod() const;
@ -131,6 +135,87 @@ private:
const ContextPtr & context);
};
class StorageURLSource : public ISource
{
using URIParams = std::vector<std::pair<String, String>>;
public:
class DisclosedGlobIterator
{
public:
DisclosedGlobIterator(const String & uri_, size_t max_addresses);
String next();
size_t size();
private:
class Impl;
/// shared_ptr to have copy constructor
std::shared_ptr<Impl> pimpl;
};
using FailoverOptions = std::vector<String>;
using IteratorWrapper = std::function<FailoverOptions()>;
StorageURLSource(
const std::vector<NameAndTypePair> & requested_virtual_columns_,
std::shared_ptr<IteratorWrapper> uri_iterator_,
const std::string & http_method,
std::function<void(std::ostream &)> callback,
const String & format,
const std::optional<FormatSettings> & format_settings,
String name_,
const Block & sample_block,
ContextPtr context,
const ColumnsDescription & columns,
UInt64 max_block_size,
const ConnectionTimeouts & timeouts,
CompressionMethod compression_method,
size_t download_threads,
const HTTPHeaderEntries & headers_ = {},
const URIParams & params = {},
bool glob_url = false);
String getName() const override { return name; }
Chunk generate() override;
static void setCredentials(Poco::Net::HTTPBasicCredentials & credentials, const Poco::URI & request_uri);
static Block getHeader(Block sample_block, const std::vector<NameAndTypePair> & requested_virtual_columns);
static std::tuple<Poco::URI, SeekableReadBufferFactoryPtr> getFirstAvailableURIAndReadBuffer(
std::vector<String>::const_iterator & option,
const std::vector<String>::const_iterator & end,
ContextPtr context,
const URIParams & params,
const String & http_method,
std::function<void(std::ostream &)> callback,
const ConnectionTimeouts & timeouts,
Poco::Net::HTTPBasicCredentials & credentials,
const HTTPHeaderEntries & headers,
bool glob_url,
bool delay_initialization);
private:
using InitializeFunc = std::function<void(const FailoverOptions &)>;
InitializeFunc initialize;
String name;
std::vector<NameAndTypePair> requested_virtual_columns;
std::shared_ptr<IteratorWrapper> uri_iterator;
Poco::URI curr_uri;
std::unique_ptr<QueryPipeline> pipeline;
std::unique_ptr<PullingPipelineExecutor> reader;
Poco::Net::HTTPBasicCredentials credentials;
size_t total_size = 0;
UInt64 total_rows_approx_max = 0;
size_t total_rows_count_times = 0;
UInt64 total_rows_approx_accumulated = 0;
};
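FailoverOptions and IteratorWrapper above define the contract the reading loop relies on: every call to the wrapper yields one batch of interchangeable URLs (replicas for the same piece of work), an empty batch means the work is exhausted, and within a batch the options are tried in order. A minimal standalone sketch of that contract, using only standard-library types and hypothetical URLs (not the ClickHouse classes), might look like this:

```cpp
#include <atomic>
#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <vector>

using FailoverOptions = std::vector<std::string>;
using IteratorWrapper = std::function<FailoverOptions()>;

int main()
{
    /// Hypothetical work items, each with two interchangeable replicas.
    std::vector<FailoverOptions> batches = {
        {"http://replica1/a.csv", "http://replica2/a.csv"},
        {"http://replica1/b.csv", "http://replica2/b.csv"},
    };

    /// The wrapper hands out one batch per call and an empty batch when exhausted.
    auto next = std::make_shared<std::atomic_size_t>(0);
    IteratorWrapper iterator = [batches, next]() -> FailoverOptions
    {
        size_t i = next->fetch_add(1);
        return i < batches.size() ? batches[i] : FailoverOptions{};
    };

    /// The consumer mirrors the reading loop: stop on an empty batch,
    /// otherwise try the options in order until one of them answers.
    for (auto options = iterator(); !options.empty(); options = iterator())
        for (const auto & url : options)
        {
            std::cout << "trying " << url << '\n';
            break; /// pretend the first replica answered
        }
    return 0;
}
```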
class StorageURLSink : public SinkToStorage
{
public:
@ -174,7 +259,8 @@ public:
const String & compression_method_,
const HTTPHeaderEntries & headers_ = {},
const String & method_ = "",
ASTPtr partition_by_ = nullptr);
ASTPtr partition_by_ = nullptr,
bool distributed_processing_ = false);
String getName() const override
{
@ -209,14 +295,14 @@ class StorageURLWithFailover final : public StorageURL
{
public:
StorageURLWithFailover(
const std::vector<String> & uri_options_,
const StorageID & table_id_,
const String & format_name_,
const std::optional<FormatSettings> & format_settings_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
ContextPtr context_,
const String & compression_method_);
const std::vector<String> & uri_options_,
const StorageID & table_id_,
const String & format_name_,
const std::optional<FormatSettings> & format_settings_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
ContextPtr context_,
const String & compression_method_);
Pipe read(
const Names & column_names,

@ -0,0 +1,94 @@
#include "Interpreters/Context_fwd.h"
#include <Storages/StorageURLCluster.h>
#include <Core/QueryProcessingStage.h>
#include <DataTypes/DataTypeString.h>
#include <Interpreters/getHeaderForProcessingStage.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/TranslateQualifiedNamesVisitor.h>
#include <Interpreters/AddDefaultDatabaseVisitor.h>
#include <QueryPipeline/RemoteQueryExecutor.h>
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <Processors/Sources/RemoteSource.h>
#include <Parsers/queryToString.h>
#include <Storages/IStorage.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/StorageURL.h>
#include <Storages/extractTableFunctionArgumentsFromSelectQuery.h>
#include <TableFunctions/TableFunctionURLCluster.h>
#include <memory>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
StorageURLCluster::StorageURLCluster(
ContextPtr context_,
const String & cluster_name_,
const String & uri_,
const String & format_,
const String & compression_method_,
const StorageID & table_id_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const StorageURL::Configuration & configuration_,
bool structure_argument_was_provided_)
: IStorageCluster(cluster_name_, table_id_, &Poco::Logger::get("StorageURLCluster (" + table_id_.table_name + ")"), structure_argument_was_provided_)
, uri(uri_)
{
context_->getRemoteHostFilter().checkURL(Poco::URI(uri));
StorageInMemoryMetadata storage_metadata;
if (columns_.empty())
{
auto columns = StorageURL::getTableStructureFromData(format_,
uri,
chooseCompressionMethod(Poco::URI(uri).getPath(), compression_method_),
configuration_.headers,
std::nullopt,
context_);
storage_metadata.setColumns(columns);
}
else
storage_metadata.setColumns(columns_);
storage_metadata.setConstraints(constraints_);
setInMemoryMetadata(storage_metadata);
}
void StorageURLCluster::addColumnsStructureToQuery(ASTPtr & query, const String & structure, const ContextPtr & context)
{
ASTExpressionList * expression_list = extractTableFunctionArgumentsFromSelectQuery(query);
if (!expression_list)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected SELECT query from table function urlCluster, got '{}'", queryToString(query));
TableFunctionURLCluster::addColumnsStructureToArguments(expression_list->children, structure, context);
}
RemoteQueryExecutor::Extension StorageURLCluster::getTaskIteratorExtension(ASTPtr, const ContextPtr & context) const
{
auto iterator = std::make_shared<StorageURLSource::DisclosedGlobIterator>(uri, context->getSettingsRef().glob_expansion_max_elements);
auto callback = std::make_shared<TaskIterator>([iter = std::move(iterator)]() mutable -> String { return iter->next(); });
return RemoteQueryExecutor::Extension{.task_iterator = std::move(callback)};
}
NamesAndTypesList StorageURLCluster::getVirtuals() const
{
return NamesAndTypesList{
{"_path", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
{"_file", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())}};
}
}

@ -0,0 +1,49 @@
#pragma once
#include "config.h"
#include <memory>
#include <optional>
#include <Client/Connection.h>
#include <Interpreters/Cluster.h>
#include <Storages/IStorageCluster.h>
#include <Storages/StorageURL.h>
namespace DB
{
class Context;
class StorageURLCluster : public IStorageCluster
{
public:
StorageURLCluster(
ContextPtr context_,
const String & cluster_name_,
const String & uri_,
const String & format_,
const String & compression_method_,
const StorageID & table_id_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const StorageURL::Configuration & configuration_,
bool structure_argument_was_provided_);
std::string getName() const override { return "URLCluster"; }
NamesAndTypesList getVirtuals() const override;
RemoteQueryExecutor::Extension getTaskIteratorExtension(ASTPtr query, const ContextPtr & context) const override;
private:
void addColumnsStructureToQuery(ASTPtr & query, const String & structure, const ContextPtr & context) override;
String uri;
String format_name;
String compression_method;
};
}

@ -1,52 +0,0 @@
#include <Storages/addColumnsStructureToQueryWithClusterEngine.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/queryToString.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
ASTExpressionList * extractTableFunctionArgumentsFromSelectQuery(ASTPtr & query)
{
auto * select_query = query->as<ASTSelectQuery>();
if (!select_query || !select_query->tables())
return nullptr;
auto * tables = select_query->tables()->as<ASTTablesInSelectQuery>();
auto * table_expression = tables->children[0]->as<ASTTablesInSelectQueryElement>()->table_expression->as<ASTTableExpression>();
if (!table_expression->table_function)
return nullptr;
auto * table_function = table_expression->table_function->as<ASTFunction>();
return table_function->arguments->as<ASTExpressionList>();
}
void addColumnsStructureToQueryWithClusterEngine(ASTPtr & query, const String & structure, size_t max_arguments, const String & function_name)
{
ASTExpressionList * expression_list = extractTableFunctionArgumentsFromSelectQuery(query);
if (!expression_list)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected SELECT query from table function {}, got '{}'", function_name, queryToString(query));
auto structure_literal = std::make_shared<ASTLiteral>(structure);
if (expression_list->children.size() < 2 || expression_list->children.size() > max_arguments)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected 2 to {} arguments in {} table functions, got {}",
function_name, max_arguments, expression_list->children.size());
if (expression_list->children.size() == 2 || expression_list->children.size() == max_arguments - 1)
{
auto format_literal = std::make_shared<ASTLiteral>("auto");
expression_list->children.push_back(format_literal);
}
expression_list->children.push_back(structure_literal);
}
}

@ -1,14 +0,0 @@
#pragma once
#include <Parsers/IAST.h>
#include <Parsers/ASTExpressionList.h>
namespace DB
{
ASTExpressionList * extractTableFunctionArgumentsFromSelectQuery(ASTPtr & query);
/// Add structure argument for queries with s3Cluster/hdfsCluster table function.
void addColumnsStructureToQueryWithClusterEngine(ASTPtr & query, const String & structure, size_t max_arguments, const String & function_name);
}

@ -0,0 +1,29 @@
#include <Storages/extractTableFunctionArgumentsFromSelectQuery.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Parsers/queryToString.h>
namespace DB
{
ASTExpressionList * extractTableFunctionArgumentsFromSelectQuery(ASTPtr & query)
{
auto * select_query = query->as<ASTSelectQuery>();
if (!select_query || !select_query->tables())
return nullptr;
auto * tables = select_query->tables()->as<ASTTablesInSelectQuery>();
auto * table_expression = tables->children[0]->as<ASTTablesInSelectQueryElement>()->table_expression->as<ASTTableExpression>();
if (!table_expression->table_function)
return nullptr;
auto * table_function = table_expression->table_function->as<ASTFunction>();
return table_function->arguments->as<ASTExpressionList>();
}
}

@ -0,0 +1,11 @@
#pragma once
#include <Parsers/IAST.h>
#include <Parsers/ASTExpressionList.h>
namespace DB
{
ASTExpressionList * extractTableFunctionArgumentsFromSelectQuery(ASTPtr & query);
}

@ -0,0 +1,72 @@
#pragma once
#include "config.h"
#include <TableFunctions/ITableFunction.h>
#include <TableFunctions/TableFunctionS3.h>
#include <Storages/StorageS3Cluster.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/Context.h>
#include <Storages/checkAndGetLiteralArgument.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int BAD_GET;
extern const int LOGICAL_ERROR;
}
/// Base class for *Cluster table functions that require cluster_name for the first argument.
template <typename Base>
class ITableFunctionCluster : public Base
{
public:
String getName() const override = 0;
String getSignature() const override = 0;
static void addColumnsStructureToArguments(ASTs & args, const String & desired_structure, const ContextPtr & context)
{
if (args.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected empty list of arguments for {}Cluster table function", Base::name);
ASTPtr cluster_name_arg = args.front();
args.erase(args.begin());
Base::addColumnsStructureToArguments(args, desired_structure, context);
args.insert(args.begin(), cluster_name_arg);
}
protected:
void parseArguments(const ASTPtr & ast, ContextPtr context) override
{
/// Clone ast function, because we can modify its arguments like removing cluster_name
Base::parseArguments(ast->clone(), context);
}
void parseArgumentsImpl(ASTs & args, const ContextPtr & context) override
{
if (args.empty())
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "The signature of table function {} shall be the following:\n{}", getName(), getSignature());
/// Evaluate only the first argument; everything else will be done by the Base class
args[0] = evaluateConstantExpressionOrIdentifierAsLiteral(args[0], context);
/// Cluster name is always the first
cluster_name = checkAndGetLiteralArgument<String>(args[0], "cluster_name");
if (!context->tryGetCluster(cluster_name))
throw Exception(ErrorCodes::BAD_GET, "Requested cluster '{}' not found", cluster_name);
/// Just cut the first arg (cluster_name) and try to parse other table function arguments as is
args.erase(args.begin());
Base::parseArgumentsImpl(args, context);
}
String cluster_name;
};
}
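To illustrate the pattern this base class encodes, here is a minimal standalone model, with plain strings and a hypothetical FooTableFunction standing in for the real ClickHouse types: the cluster wrapper peels off the leading cluster_name argument and forwards the remaining arguments to the base function's parser unchanged.

```cpp
#include <iostream>
#include <stdexcept>
#include <string>
#include <vector>

/// Hypothetical stand-in for a non-cluster table function such as s3/hdfs/url.
struct FooTableFunction
{
    std::string source;
    std::string format = "auto";

    void parseArgumentsImpl(std::vector<std::string> & args)
    {
        if (args.empty())
            throw std::runtime_error("the source argument is required");
        source = args[0];
        if (args.size() > 1)
            format = args[1];
    }
};

/// Simplified model of ITableFunctionCluster<Base>: cluster_name is always the first argument.
template <typename Base>
struct ClusterFunction : Base
{
    std::string cluster_name;

    void parseArgumentsImpl(std::vector<std::string> & args)
    {
        if (args.empty())
            throw std::runtime_error("cluster_name is required as the first argument");
        cluster_name = args.front();
        args.erase(args.begin());       /// cut cluster_name ...
        Base::parseArgumentsImpl(args); /// ... and parse the rest exactly as the base function would
    }
};

int main()
{
    ClusterFunction<FooTableFunction> fn;
    std::vector<std::string> args = {"my_cluster", "http://example.com/data.csv", "CSV"};
    fn.parseArgumentsImpl(args);
    std::cout << fn.cluster_name << ' ' << fn.source << ' ' << fn.format << '\n';
}
```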

@ -13,13 +13,9 @@
namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
template <typename Name, typename Storage, typename TableFunction>
class ITableFunctionDataLake : public ITableFunction
class ITableFunctionDataLake : public TableFunction
{
public:
static constexpr auto name = Name::name;
@ -33,11 +29,11 @@ protected:
ColumnsDescription /*cached_columns*/) const override
{
ColumnsDescription columns;
if (configuration.structure != "auto")
columns = parseColumnsListFromString(configuration.structure, context);
if (TableFunction::configuration.structure != "auto")
columns = parseColumnsListFromString(TableFunction::configuration.structure, context);
StoragePtr storage = std::make_shared<Storage>(
configuration, context, StorageID(getDatabaseName(), table_name),
TableFunction::configuration, context, StorageID(TableFunction::getDatabaseName(), table_name),
columns, ConstraintsDescription{}, String{}, std::nullopt);
storage->startup();
@ -48,34 +44,21 @@ protected:
ColumnsDescription getActualTableStructure(ContextPtr context) const override
{
if (configuration.structure == "auto")
if (TableFunction::configuration.structure == "auto")
{
context->checkAccess(getSourceAccessType());
return Storage::getTableStructureFromData(configuration, std::nullopt, context);
context->checkAccess(TableFunction::getSourceAccessType());
return Storage::getTableStructureFromData(TableFunction::configuration, std::nullopt, context);
}
return parseColumnsListFromString(configuration.structure, context);
return parseColumnsListFromString(TableFunction::configuration.structure, context);
}
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override
{
ASTs & args_func = ast_function->children;
const auto message = fmt::format(
"The signature of table function '{}' could be the following:\n{}", getName(), TableFunction::signature);
if (args_func.size() != 1)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Table function '{}' must have arguments", getName());
auto & args = args_func.at(0)->children;
TableFunction::parseArgumentsImpl(message, args, context, configuration, false);
if (configuration.format == "auto")
configuration.format = "Parquet";
/// Set default format to Parquet if it's not specified in arguments.
TableFunction::configuration.format = "Parquet";
TableFunction::parseArguments(ast_function, context);
}
mutable typename Storage::Configuration configuration;
};
}

@ -2,7 +2,6 @@
#include <Interpreters/parseColumnsListForTableFunction.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h>
#include <Common/Exception.h>
@ -19,8 +18,8 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int BAD_ARGUMENTS;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
void ITableFunctionFileLike::parseFirstArguments(const ASTPtr & arg, const ContextPtr &)
@ -47,9 +46,13 @@ void ITableFunctionFileLike::parseArguments(const ASTPtr & ast_function, Context
throw Exception(ErrorCodes::LOGICAL_ERROR, "Table function '{}' must have arguments.", getName());
ASTs & args = args_func.at(0)->children;
parseArgumentsImpl(args, context);
}
if (args.empty())
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Table function '{}' requires at least 1 argument", getName());
void ITableFunctionFileLike::parseArgumentsImpl(ASTs & args, const ContextPtr & context)
{
if (args.empty() || args.size() > 4)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "The signature of table function {} shall be the following:\n{}", getName(), getSignature());
for (auto & arg : args)
arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, context);
@ -62,26 +65,51 @@ void ITableFunctionFileLike::parseArguments(const ASTPtr & ast_function, Context
if (format == "auto")
format = getFormatFromFirstArgument();
if (args.size() <= 2)
return;
if (args.size() > 2)
{
structure = checkAndGetLiteralArgument<String>(args[2], "structure");
if (structure.empty())
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Table structure is empty for table function '{}'. If you want to use automatic schema inference, use 'auto'",
getName());
}
if (args.size() != 3 && args.size() != 4)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Table function '{}' requires 1, 2, 3 or 4 arguments: "
"filename, format (default auto), structure (default auto) and compression method (default auto)",
getName());
structure = checkAndGetLiteralArgument<String>(args[2], "structure");
if (structure.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Table structure is empty for table function '{}'. If you want to use automatic schema inference, use 'auto'",
ast_function->formatForErrorMessage());
if (args.size() == 4)
if (args.size() > 3)
compression_method = checkAndGetLiteralArgument<String>(args[3], "compression_method");
}
void ITableFunctionFileLike::addColumnsStructureToArguments(ASTs & args, const String & structure, const ContextPtr &)
{
if (args.empty() || args.size() > getMaxNumberOfArguments())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected 1 to {} arguments in table function, got {}", getMaxNumberOfArguments(), args.size());
auto structure_literal = std::make_shared<ASTLiteral>(structure);
/// f(filename)
if (args.size() == 1)
{
/// Add format=auto before structure argument.
args.push_back(std::make_shared<ASTLiteral>("auto"));
args.push_back(structure_literal);
}
/// f(filename, format)
else if (args.size() == 2)
{
args.push_back(structure_literal);
}
/// f(filename, format, 'auto')
else if (args.size() == 3)
{
args.back() = structure_literal;
}
/// f(filename, format, 'auto', compression)
else if (args.size() == 4)
{
args[args.size() - 2] = structure_literal;
}
}
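The per-argument-count branches above are easier to read as a before/after mapping. The following standalone sketch models the same placement rules with plain strings instead of ASTs; the file name, format and structure values are hypothetical:

```cpp
#include <iostream>
#include <string>
#include <vector>

/// Simplified model of ITableFunctionFileLike::addColumnsStructureToArguments:
/// args is one of f(filename), f(filename, format),
/// f(filename, format, structure), f(filename, format, structure, compression).
void addStructure(std::vector<std::string> & args, const std::string & structure)
{
    if (args.size() == 1)
    {
        args.push_back("auto");     /// insert format=auto before the structure
        args.push_back(structure);
    }
    else if (args.size() == 2)
    {
        args.push_back(structure);  /// append the structure after the format
    }
    else if (args.size() == 3)
    {
        args.back() = structure;    /// replace the existing structure argument
    }
    else if (args.size() == 4)
    {
        args[args.size() - 2] = structure; /// structure sits before the compression method
    }
}

int main()
{
    std::vector<std::string> args = {"data.csv", "CSV", "auto", "gzip"};
    addStructure(args, "x UInt64, s String");
    for (const auto & a : args)
        std::cout << a << '\n'; /// data.csv, CSV, "x UInt64, s String", gzip
}
```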
StoragePtr ITableFunctionFileLike::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
{
ColumnsDescription columns;

@ -9,19 +9,35 @@ class ColumnsDescription;
class Context;
/*
* function(source, format, structure[, compression_method]) - creates a temporary storage from formatted source
* function(source, [format, structure, compression_method]) - creates a temporary storage from formatted source
*/
class ITableFunctionFileLike : public ITableFunction
{
public:
static constexpr auto signature = " - filename\n"
" - filename, format\n"
" - filename, format, structure\n"
" - filename, format, structure, compression_method\n";
virtual String getSignature() const
{
return signature;
}
bool needStructureHint() const override { return structure == "auto"; }
void setStructureHint(const ColumnsDescription & structure_hint_) override { structure_hint = structure_hint_; }
bool supportsReadingSubsetOfColumns() override;
static size_t getMaxNumberOfArguments() { return 4; }
static void addColumnsStructureToArguments(ASTs & args, const String & structure, const ContextPtr &);
protected:
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;
virtual void parseArgumentsImpl(ASTs & args, const ContextPtr & context);
virtual void parseFirstArguments(const ASTPtr & arg, const ContextPtr & context);
virtual String getFormatFromFirstArgument();

@ -12,18 +12,28 @@ namespace DB
class Context;
/* hdfs(URI, format[, structure, compression]) - creates a temporary storage from hdfs files
/* hdfs(URI, [format, structure, compression]) - creates a temporary storage from hdfs files
*
*/
class TableFunctionHDFS : public ITableFunctionFileLike
{
public:
static constexpr auto name = "hdfs";
std::string getName() const override
static constexpr auto signature = " - uri\n"
" - uri, format\n"
" - uri, format, structure\n"
" - uri, format, structure, compression_method\n";
String getName() const override
{
return name;
}
String getSignature() const override
{
return signature;
}
ColumnsDescription getActualTableStructure(ContextPtr context) const override;
std::unordered_set<String> getVirtualsToCheckBeforeUsingStructureHint() const override

@ -2,86 +2,19 @@
#if USE_HDFS
#include <Storages/HDFS/StorageHDFSCluster.h>
#include <DataTypes/DataTypeString.h>
#include <Storages/HDFS/StorageHDFS.h>
#include <Storages/checkAndGetLiteralArgument.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/Context.h>
#include <Interpreters/ClientInfo.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <TableFunctions/TableFunctionHDFS.h>
#include <TableFunctions/TableFunctionHDFSCluster.h>
#include <Interpreters/parseColumnsListForTableFunction.h>
#include <Access/Common/AccessFlags.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/IAST_fwd.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Storages/HDFS/StorageHDFSCluster.h>
#include <Storages/HDFS/StorageHDFS.h>
#include "registerTableFunctions.h"
#include <memory>
#include <thread>
namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int BAD_GET;
}
void TableFunctionHDFSCluster::parseArguments(const ASTPtr & ast_function, ContextPtr context)
{
auto ast_copy = ast_function->clone();
/// Parse args
ASTs & args_func = ast_copy->children;
if (args_func.size() != 1)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Table function '{}' must have arguments.", getName());
ASTs & args = args_func.at(0)->children;
if (args.size() < 2 || args.size() > 5)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"The signature of table function {} shall be the following:\n"
" - cluster, uri\n"
" - cluster, uri, format\n"
" - cluster, uri, format, structure\n"
" - cluster, uri, format, structure, compression_method",
getName());
for (auto & arg : args)
arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, context);
/// This argument is always the first
cluster_name = checkAndGetLiteralArgument<String>(args[0], "cluster_name");
if (!context->tryGetCluster(cluster_name))
throw Exception(ErrorCodes::BAD_GET, "Requested cluster '{}' not found", cluster_name);
/// Just cut the first arg (cluster_name) and try to parse other table function arguments as is
args.erase(args.begin());
ITableFunctionFileLike::parseArguments(ast_copy, context);
}
ColumnsDescription TableFunctionHDFSCluster::getActualTableStructure(ContextPtr context) const
{
if (structure == "auto")
{
context->checkAccess(getSourceAccessType());
return StorageHDFS::getTableStructureFromData(format, filename, compression_method, context);
}
return parseColumnsListFromString(structure, context);
}
StoragePtr TableFunctionHDFSCluster::getStorage(
const String & /*source*/, const String & /*format_*/, const ColumnsDescription & columns, ContextPtr context,
const std::string & table_name, const String & /*compression_method_*/) const
@ -106,9 +39,14 @@ StoragePtr TableFunctionHDFSCluster::getStorage(
{
storage = std::make_shared<StorageHDFSCluster>(
context,
cluster_name, filename, StorageID(getDatabaseName(), table_name),
format, columns, ConstraintsDescription{},
compression_method, structure != "auto");
cluster_name,
filename,
StorageID(getDatabaseName(), table_name),
format,
columns,
ConstraintsDescription{},
compression_method,
structure != "auto");
}
return storage;
}
@ -118,7 +56,6 @@ void registerTableFunctionHDFSCluster(TableFunctionFactory & factory)
factory.registerFunction<TableFunctionHDFSCluster>();
}
}
#endif

@ -5,6 +5,8 @@
#if USE_HDFS
#include <TableFunctions/ITableFunctionFileLike.h>
#include <TableFunctions/TableFunctionHDFS.h>
#include <TableFunctions/ITableFunctionCluster.h>
namespace DB
@ -20,28 +22,31 @@ class Context;
* On a worker node it asks the initiator for the next task to process and processes it.
* This is repeated until all tasks are finished.
*/
class TableFunctionHDFSCluster : public ITableFunctionFileLike
class TableFunctionHDFSCluster : public ITableFunctionCluster<TableFunctionHDFS>
{
public:
static constexpr auto name = "hdfsCluster";
std::string getName() const override
static constexpr auto signature = " - cluster_name, uri\n"
" - cluster_name, uri, format\n"
" - cluster_name, uri, format, structure\n"
" - cluster_name, uri, format, structure, compression_method\n";
String getName() const override
{
return name;
}
String getSignature() const override
{
return signature;
}
protected:
StoragePtr getStorage(
const String & source, const String & format_, const ColumnsDescription & columns, ContextPtr global_context,
const std::string & table_name, const String & compression_method_) const override;
const char * getStorageTypeName() const override { return "HDFSCluster"; }
AccessType getSourceAccessType() const override { return AccessType::HDFS; }
ColumnsDescription getActualTableStructure(ContextPtr) const override;
void parseArguments(const ASTPtr &, ContextPtr) override;
String cluster_name;
};
}

@ -10,13 +10,14 @@
#include <Interpreters/parseColumnsListForTableFunction.h>
#include <Access/Common/AccessFlags.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTFunction.h>
#include <Storages/checkAndGetLiteralArgument.h>
#include <Storages/StorageS3.h>
#include <Storages/StorageURL.h>
#include <Storages/NamedCollectionsHelpers.h>
#include <Formats/FormatFactory.h>
#include "registerTableFunctions.h"
#include <filesystem>
#include <boost/algorithm/string.hpp>
@ -27,29 +28,23 @@ namespace DB
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int LOGICAL_ERROR;
}
/// This is needed to avoid copy-paste: s3Cluster arguments differ only by an additional first argument - the cluster name
TableFunctionS3::ArgumentParseResult TableFunctionS3::parseArgumentsImpl(
const String & error_message,
ASTs & args,
ContextPtr context,
StorageS3::Configuration & s3_configuration,
bool get_format_from_file)
void TableFunctionS3::parseArgumentsImpl(ASTs & args, const ContextPtr & context)
{
ArgumentParseResult result;
if (auto named_collection = tryGetNamedCollectionWithOverrides(args, context))
{
StorageS3::processNamedCollectionResult(s3_configuration, *named_collection);
StorageS3::processNamedCollectionResult(configuration, *named_collection);
}
else
{
if (args.empty() || args.size() > 6)
throw Exception::createDeprecated(error_message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "The signature of table function {} shall be the following:\n{}", getName(), getSignature());
auto * header_it = StorageURL::collectHeaders(args, s3_configuration.headers_from_ast, context);
auto * header_it = StorageURL::collectHeaders(args, configuration.headers_from_ast, context);
if (header_it != args.end())
args.erase(header_it);
@ -136,54 +131,165 @@ TableFunctionS3::ArgumentParseResult TableFunctionS3::parseArgumentsImpl(
}
/// This argument is always the first
s3_configuration.url = S3::URI(checkAndGetLiteralArgument<String>(args[0], "url"));
configuration.url = S3::URI(checkAndGetLiteralArgument<String>(args[0], "url"));
if (args_to_idx.contains("format"))
{
s3_configuration.format = checkAndGetLiteralArgument<String>(args[args_to_idx["format"]], "format");
result.has_format_argument = true;
auto format = checkAndGetLiteralArgument<String>(args[args_to_idx["format"]], "format");
/// Set the format in the configuration only if it's not 'auto',
/// because a default format may already be set in the configuration.
if (format != "auto")
configuration.format = format;
}
if (args_to_idx.contains("structure"))
{
s3_configuration.structure = checkAndGetLiteralArgument<String>(args[args_to_idx["structure"]], "structure");
result.has_structure_argument = true;
}
configuration.structure = checkAndGetLiteralArgument<String>(args[args_to_idx["structure"]], "structure");
if (args_to_idx.contains("compression_method"))
s3_configuration.compression_method = checkAndGetLiteralArgument<String>(args[args_to_idx["compression_method"]], "compression_method");
configuration.compression_method = checkAndGetLiteralArgument<String>(args[args_to_idx["compression_method"]], "compression_method");
if (args_to_idx.contains("access_key_id"))
s3_configuration.auth_settings.access_key_id = checkAndGetLiteralArgument<String>(args[args_to_idx["access_key_id"]], "access_key_id");
configuration.auth_settings.access_key_id = checkAndGetLiteralArgument<String>(args[args_to_idx["access_key_id"]], "access_key_id");
if (args_to_idx.contains("secret_access_key"))
s3_configuration.auth_settings.secret_access_key = checkAndGetLiteralArgument<String>(args[args_to_idx["secret_access_key"]], "secret_access_key");
configuration.auth_settings.secret_access_key = checkAndGetLiteralArgument<String>(args[args_to_idx["secret_access_key"]], "secret_access_key");
s3_configuration.auth_settings.no_sign_request = no_sign_request;
configuration.auth_settings.no_sign_request = no_sign_request;
}
s3_configuration.keys = {s3_configuration.url.key};
configuration.keys = {configuration.url.key};
/// For DataLake table functions, we should specify default format.
if (s3_configuration.format == "auto" && get_format_from_file)
s3_configuration.format = FormatFactory::instance().getFormatFromFileName(s3_configuration.url.uri.getPath(), true);
return result;
if (configuration.format == "auto")
configuration.format = FormatFactory::instance().getFormatFromFileName(configuration.url.uri.getPath(), true);
}
void TableFunctionS3::parseArguments(const ASTPtr & ast_function, ContextPtr context)
{
/// Clone ast function, because we can modify its arguments like removing headers.
auto ast_copy = ast_function->clone();
/// Parse args
ASTs & args_func = ast_function->children;
const auto message = fmt::format("The signature of table function '{}' could be the following:\n{}", getName(), signature);
if (args_func.size() != 1)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Table function '{}' must have arguments.", getName());
auto & args = args_func.at(0)->children;
parseArgumentsImpl(message, args, context, configuration);
parseArgumentsImpl(args, context);
}
void TableFunctionS3::addColumnsStructureToArguments(ASTs & args, const String & structure, const ContextPtr & context)
{
if (tryGetNamedCollectionWithOverrides(args, context))
{
/// In case of named collection, just add key-value pair "structure='...'"
/// at the end of the arguments to override the existing structure.
ASTs equal_func_args = {std::make_shared<ASTIdentifier>("structure"), std::make_shared<ASTLiteral>(structure)};
auto equal_func = makeASTFunction("equals", std::move(equal_func_args));
args.push_back(equal_func);
}
else
{
/// If the arguments contain headers, remove them here and add them back at the end of the arguments later
/// (the headers argument can be at any position).
HTTPHeaderEntries tmp_headers;
auto * headers_it = StorageURL::collectHeaders(args, tmp_headers, context);
ASTPtr headers_ast;
if (headers_it != args.end())
{
headers_ast = *headers_it;
args.erase(headers_it);
}
if (args.empty() || args.size() > getMaxNumberOfArguments())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected 1 to {} arguments in table function, got {}", getMaxNumberOfArguments(), args.size());
auto structure_literal = std::make_shared<ASTLiteral>(structure);
/// s3(s3_url)
if (args.size() == 1)
{
/// Add format=auto before structure argument.
args.push_back(std::make_shared<ASTLiteral>("auto"));
args.push_back(structure_literal);
}
/// s3(s3_url, format) or s3(s3_url, NOSIGN)
/// We can distinguish them by looking at the second argument: check whether it's NOSIGN or not.
else if (args.size() == 2)
{
auto second_arg = checkAndGetLiteralArgument<String>(args[1], "format/NOSIGN");
/// If there is NOSIGN, add format=auto before structure.
if (boost::iequals(second_arg, "NOSIGN"))
args.push_back(std::make_shared<ASTLiteral>("auto"));
args.push_back(structure_literal);
}
/// s3(source, format, structure) or
/// s3(source, access_key_id, secret_access_key) or
/// s3(source, NOSIGN, format)
/// We can distinguish them by looking at the second argument: check whether it's NOSIGN, a format name, or neither.
else if (args.size() == 3)
{
auto second_arg = checkAndGetLiteralArgument<String>(args[1], "format/NOSIGN");
if (boost::iequals(second_arg, "NOSIGN"))
{
args.push_back(structure_literal);
}
else if (second_arg == "auto" || FormatFactory::instance().getAllFormats().contains(second_arg))
{
args.back() = structure_literal;
}
else
{
/// Add format=auto before structure argument.
args.push_back(std::make_shared<ASTLiteral>("auto"));
args.push_back(structure_literal);
}
}
/// s3(source, format, structure, compression_method) or
/// s3(source, access_key_id, secret_access_key, format) or
/// s3(source, NOSIGN, format, structure)
/// We can distinguish them by looking at the second argument: check whether it's NOSIGN, a format name, or neither.
else if (args.size() == 4)
{
auto second_arg = checkAndGetLiteralArgument<String>(args[1], "format/NOSIGN");
if (boost::iequals(second_arg, "NOSIGN"))
{
args.back() = structure_literal;
}
else if (second_arg == "auto" || FormatFactory::instance().getAllFormats().contains(second_arg))
{
args[args.size() - 2] = structure_literal;
}
else
{
args.push_back(structure_literal);
}
}
/// s3(source, access_key_id, secret_access_key, format, structure) or
/// s3(source, NOSIGN, format, structure, compression_method)
/// We can distinguish them by looking at the second argument: check whether it's the NOSIGN keyword or not.
else if (args.size() == 5)
{
auto second_arg = checkAndGetLiteralArgument<String>(args[1], "format/NOSIGN");
if (boost::iequals(second_arg, "NOSIGN"))
{
args[args.size() - 2] = structure_literal;
}
else
{
args.back() = structure_literal;
}
}
/// s3(source, access_key_id, secret_access_key, format, structure, compression)
else if (args.size() == 6)
{
args[args.size() - 2] = structure_literal;
}
if (headers_ast)
args.push_back(headers_ast);
}
}
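All of the ambiguity handled above comes from the second argument of s3(...): it can be the NOSIGN keyword, a format name, or an access_key_id, and only after classifying it does the function know where the structure literal belongs. A standalone sketch of that classification step follows, with plain strings and a hypothetical list of format names; the real code consults FormatFactory and uses boost::iequals:

```cpp
#include <algorithm>
#include <cctype>
#include <iostream>
#include <string>
#include <vector>

enum class SecondArgKind { NoSign, Format, AccessKeyId };

/// Simplified model of how TableFunctionS3::addColumnsStructureToArguments
/// disambiguates the second argument of s3(...).
SecondArgKind classifySecondArg(const std::string & arg, const std::vector<std::string> & known_formats)
{
    std::string lower = arg;
    std::transform(lower.begin(), lower.end(), lower.begin(),
                   [](unsigned char c) { return static_cast<char>(std::tolower(c)); });

    if (lower == "nosign")
        return SecondArgKind::NoSign;   /// case-insensitive NOSIGN keyword
    if (arg == "auto" || std::find(known_formats.begin(), known_formats.end(), arg) != known_formats.end())
        return SecondArgKind::Format;   /// a known format name (or "auto")
    return SecondArgKind::AccessKeyId;  /// anything else is treated as a credential
}

int main()
{
    /// Hypothetical subset of format names; the real code asks FormatFactory for all of them.
    std::vector<std::string> formats = {"CSV", "TSV", "Parquet", "JSONEachRow"};

    for (const std::string & arg : {"NOSIGN", "CSV", "AKIAEXAMPLEKEY"})
        std::cout << arg << " -> " << static_cast<int>(classifySecondArg(arg, formats)) << '\n';
}
```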
ColumnsDescription TableFunctionS3::getActualTableStructure(ContextPtr context) const
@ -286,6 +392,7 @@ void registerTableFunctionS3(TableFunctionFactory & factory)
.allow_readonly = false});
}
void registerTableFunctionCOS(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionCOS>();

@ -13,7 +13,7 @@ namespace DB
class Context;
/* s3(source, [access_key_id, secret_access_key,] format, structure[, compression]) - creates a temporary storage for a file in S3.
/* s3(source, [access_key_id, secret_access_key,] [format, structure, compression]) - creates a temporary storage for a file in S3.
*/
class TableFunctionS3 : public ITableFunction
{
@ -26,11 +26,21 @@ public:
" - url, format, structure, compression_method\n"
" - url, access_key_id, secret_access_key, format\n"
" - url, access_key_id, secret_access_key, format, structure\n"
" - url, access_key_id, secret_access_key, format, structure, compression_method";
std::string getName() const override
" - url, access_key_id, secret_access_key, format, structure, compression_method\n"
"All signatures supports optional headers (specified as `headers('name'='value', 'name2'='value2')`)";
static size_t getMaxNumberOfArguments() { return 6; }
String getName() const override
{
return name;
}
virtual String getSignature() const
{
return signature;
}
bool hasStaticStructure() const override { return configuration.structure != "auto"; }
bool needStructureHint() const override { return configuration.structure == "auto"; }
@ -44,18 +54,9 @@ public:
return {"_path", "_file"};
}
struct ArgumentParseResult
{
bool has_format_argument = false;
bool has_structure_argument = false;
};
virtual void parseArgumentsImpl(ASTs & args, const ContextPtr & context);
static ArgumentParseResult parseArgumentsImpl(
const String & error_message,
ASTs & args,
ContextPtr context,
StorageS3::Configuration & configuration,
bool get_format_from_file = true);
static void addColumnsStructureToArguments(ASTs & args, const String & structure, const ContextPtr & context);
protected:

@ -2,100 +2,28 @@
#if USE_AWS_S3
#include <Storages/StorageS3.h>
#include <Storages/checkAndGetLiteralArgument.h>
#include <DataTypes/DataTypeString.h>
#include <IO/S3Common.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/Context.h>
#include <Interpreters/ClientInfo.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <TableFunctions/TableFunctionS3.h>
#include <TableFunctions/TableFunctionS3Cluster.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Interpreters/parseColumnsListForTableFunction.h>
#include <Access/Common/AccessFlags.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/IAST_fwd.h>
#include <Storages/StorageS3.h>
#include "registerTableFunctions.h"
#include <memory>
#include <thread>
namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int BAD_GET;
}
void TableFunctionS3Cluster::parseArguments(const ASTPtr & ast_function, ContextPtr context)
{
/// Parse args
ASTs & args_func = ast_function->children;
if (args_func.size() != 1)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Table function '{}' must have arguments.", getName());
ASTs & args = args_func.at(0)->children;
constexpr auto fmt_string = "The signature of table function {} could be the following:\n"
" - cluster, url\n"
" - cluster, url, format\n"
" - cluster, url, format, structure\n"
" - cluster, url, access_key_id, secret_access_key\n"
" - cluster, url, format, structure, compression_method\n"
" - cluster, url, access_key_id, secret_access_key, format\n"
" - cluster, url, access_key_id, secret_access_key, format, structure\n"
" - cluster, url, access_key_id, secret_access_key, format, structure, compression_method";
auto message = PreformattedMessage{fmt::format(fmt_string, getName()), fmt_string};
if (args.size() < 2 || args.size() > 7)
throw Exception::createDeprecated(message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
/// Evaluate only the first argument; everything else will be done by TableFunctionS3
args[0] = evaluateConstantExpressionOrIdentifierAsLiteral(args[0], context);
/// Cluster name is always the first
configuration.cluster_name = checkAndGetLiteralArgument<String>(args[0], "cluster_name");
if (!context->tryGetCluster(configuration.cluster_name))
throw Exception(ErrorCodes::BAD_GET, "Requested cluster '{}' not found", configuration.cluster_name);
/// Just cut the first arg (cluster_name) and try to parse s3 table function arguments as is
ASTs clipped_args;
clipped_args.reserve(args.size() - 1);
std::copy(args.begin() + 1, args.end(), std::back_inserter(clipped_args));
/// StorageS3ClusterConfiguration inherits from StorageS3::Configuration, so it is safe to upcast it.
argument_parse_result = TableFunctionS3::parseArgumentsImpl(message.text, clipped_args, context, static_cast<StorageS3::Configuration &>(configuration));
}
ColumnsDescription TableFunctionS3Cluster::getActualTableStructure(ContextPtr context) const
{
context->checkAccess(getSourceAccessType());
configuration.update(context);
if (configuration.structure == "auto")
return StorageS3::getTableStructureFromData(configuration, std::nullopt, context);
return parseColumnsListFromString(configuration.structure, context);
}
StoragePtr TableFunctionS3Cluster::executeImpl(
const ASTPtr & /*function*/, ContextPtr context,
const std::string & table_name, ColumnsDescription /*cached_columns*/) const
{
StoragePtr storage;
ColumnsDescription columns;
bool structure_argument_was_provided = configuration.structure != "auto";
if (argument_parse_result.has_structure_argument)
if (structure_argument_was_provided)
{
columns = parseColumnsListFromString(configuration.structure, context);
}
@ -120,13 +48,13 @@ StoragePtr TableFunctionS3Cluster::executeImpl(
else
{
storage = std::make_shared<StorageS3Cluster>(
cluster_name,
configuration,
StorageID(getDatabaseName(), table_name),
columns,
ConstraintsDescription{},
context,
argument_parse_result.has_structure_argument,
argument_parse_result.has_format_argument);
structure_argument_was_provided);
}
storage->startup();

@ -6,6 +6,7 @@
#include <TableFunctions/ITableFunction.h>
#include <TableFunctions/TableFunctionS3.h>
#include <TableFunctions/ITableFunctionCluster.h>
#include <Storages/StorageS3Cluster.h>
@ -15,27 +16,36 @@ namespace DB
class Context;
/**
* s3cluster(cluster_name, source, [access_key_id, secret_access_key,] format, structure)
* s3cluster(cluster_name, source, [access_key_id, secret_access_key,] format, structure, compression_method)
* A table function that allows processing many files from S3 on a specific cluster.
* On the initiator it creates a connection to _all_ nodes in the cluster, discloses asterisks
* in the S3 file path and dispatches each file dynamically.
* On a worker node it asks the initiator for the next task to process and processes it.
* This is repeated until all tasks are finished.
*/
class TableFunctionS3Cluster : public ITableFunction
class TableFunctionS3Cluster : public ITableFunctionCluster<TableFunctionS3>
{
public:
static constexpr auto name = "s3Cluster";
std::string getName() const override
static constexpr auto signature = " - cluster, url\n"
" - cluster, url, format\n"
" - cluster, url, format, structure\n"
" - cluster, url, access_key_id, secret_access_key\n"
" - cluster, url, format, structure, compression_method\n"
" - cluster, url, access_key_id, secret_access_key, format\n"
" - cluster, url, access_key_id, secret_access_key, format, structure\n"
" - cluster, url, access_key_id, secret_access_key, format, structure, compression_method\n"
"All signatures supports optional headers (specified as `headers('name'='value', 'name2'='value2')`)";
String getName() const override
{
return name;
}
bool hasStaticStructure() const override { return configuration.structure != "auto"; }
bool needStructureHint() const override { return configuration.structure == "auto"; }
void setStructureHint(const ColumnsDescription & structure_hint_) override { structure_hint = structure_hint_; }
String getSignature() const override
{
return signature;
}
protected:
StoragePtr executeImpl(
@ -45,15 +55,6 @@ protected:
ColumnsDescription cached_columns) const override;
const char * getStorageTypeName() const override { return "S3Cluster"; }
AccessType getSourceAccessType() const override { return AccessType::S3; }
ColumnsDescription getActualTableStructure(ContextPtr) const override;
void parseArguments(const ASTPtr &, ContextPtr) override;
mutable StorageS3Cluster::Configuration configuration;
ColumnsDescription structure_hint;
TableFunctionS3::ArgumentParseResult argument_parse_result;
};
}
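
A minimal sketch of two of the signatures listed above in use; the cluster name, endpoint and credentials echo the integration tests later in this commit, while the object paths are made up for illustration:

```sql
-- cluster, url, format, structure
SELECT count()
FROM s3Cluster(
    'cluster_simple',
    'http://minio1:9001/root/data/*.csv',
    'CSV',
    'c1 UInt64, c2 UInt64, c3 UInt64');

-- cluster, url, access_key_id, secret_access_key, format, structure, compression_method
SELECT *
FROM s3Cluster(
    'cluster_simple',
    'http://minio1:9001/root/data/*.csv.gz',
    'minio', 'minio123',
    'CSV',
    'c1 UInt64, c2 UInt64, c3 UInt64',
    'gzip');
```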

View File

@ -12,21 +12,12 @@
#include <Analyzer/FunctionNode.h>
#include <Analyzer/TableFunctionNode.h>
#include <Interpreters/parseColumnsListForTableFunction.h>
#include <Interpreters/Context.h>
#include <Formats/FormatFactory.h>
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferFromVector.h>
namespace DB
{
static const String bad_arguments_error_message = "Table function URL can have the following arguments: "
"url, name of used format (taken from file extension by default), "
"optional table structure, optional compression method, "
"optional headers (specified as `headers('name'='value', 'name2'='value2')`)";
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
std::vector<size_t> TableFunctionURL::skipAnalysisForArguments(const QueryTreeNodePtr & query_node_table_function, ContextPtr) const
{
@ -48,15 +39,13 @@ std::vector<size_t> TableFunctionURL::skipAnalysisForArguments(const QueryTreeNo
void TableFunctionURL::parseArguments(const ASTPtr & ast, ContextPtr context)
{
const auto & ast_function = assert_cast<const ASTFunction *>(ast.get());
/// Clone the AST function, because we may modify its arguments, e.g. by removing headers.
ITableFunctionFileLike::parseArguments(ast->clone(), context);
}
const auto & args = ast_function->children;
if (args.empty())
throw Exception::createDeprecated(bad_arguments_error_message, ErrorCodes::BAD_ARGUMENTS);
auto & url_function_args = assert_cast<ASTExpressionList *>(args[0].get())->children;
if (auto named_collection = tryGetNamedCollectionWithOverrides(url_function_args, context))
void TableFunctionURL::parseArgumentsImpl(ASTs & args, const ContextPtr & context)
{
if (auto named_collection = tryGetNamedCollectionWithOverrides(args, context))
{
StorageURL::processNamedCollectionResult(configuration, *named_collection);
@ -68,16 +57,46 @@ void TableFunctionURL::parseArguments(const ASTPtr & ast, ContextPtr context)
if (format == "auto")
format = FormatFactory::instance().getFormatFromFileName(Poco::URI(filename).getPath(), true);
StorageURL::collectHeaders(url_function_args, configuration.headers, context);
StorageURL::collectHeaders(args, configuration.headers, context);
}
else
{
auto * headers_it = StorageURL::collectHeaders(url_function_args, configuration.headers, context);
auto * headers_it = StorageURL::collectHeaders(args, configuration.headers, context);
/// ITableFunctionFileLike cannot parse headers argument, so remove it.
if (headers_it != url_function_args.end())
url_function_args.erase(headers_it);
if (headers_it != args.end())
args.erase(headers_it);
ITableFunctionFileLike::parseArguments(ast, context);
ITableFunctionFileLike::parseArgumentsImpl(args, context);
}
}
void TableFunctionURL::addColumnsStructureToArguments(ASTs & args, const String & desired_structure, const ContextPtr & context)
{
if (tryGetNamedCollectionWithOverrides(args, context))
{
/// In case of a named collection, just add the key-value pair "structure='...'"
/// at the end of the arguments to override the existing structure.
ASTs equal_func_args = {std::make_shared<ASTIdentifier>("structure"), std::make_shared<ASTLiteral>(desired_structure)};
auto equal_func = makeASTFunction("equals", std::move(equal_func_args));
args.push_back(equal_func);
}
else
{
/// If the arguments contain headers, remove them and re-append them at the end later
/// (the headers argument can appear at any position).
HTTPHeaderEntries tmp_headers;
auto * headers_it = StorageURL::collectHeaders(args, tmp_headers, context);
ASTPtr headers_ast;
if (headers_it != args.end())
{
headers_ast = *headers_it;
args.erase(headers_it);
}
ITableFunctionFileLike::addColumnsStructureToArguments(args, desired_structure, context);
if (headers_ast)
args.push_back(headers_ast);
}
}
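
The named-collection branch above effectively injects the resolved structure as an extra key-value argument; the rewritten call has the same shape a user could write by hand. A sketch, assuming the `test_url` named collection added in the integration tests further below:

```sql
-- The explicit structure key overrides whatever the collection defines.
SELECT *
FROM url(test_url, structure = 'column1 UInt32, column2 UInt32, column3 UInt32');
```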

View File

@ -10,24 +10,41 @@ namespace DB
class Context;
/* url(source, format[, structure, compression]) - creates a temporary storage from url.
/* url(source, [format, structure, compression]) - creates a temporary storage from url.
*/
class TableFunctionURL final: public ITableFunctionFileLike
class TableFunctionURL : public ITableFunctionFileLike
{
public:
static constexpr auto name = "url";
std::string getName() const override
static constexpr auto signature = " - uri\n"
" - uri, format\n"
" - uri, format, structure\n"
" - uri, format, structure, compression_method\n"
"All signatures supports optional headers (specified as `headers('name'='value', 'name2'='value2')`)";
String getName() const override
{
return name;
}
String getSignature() const override
{
return signature;
}
ColumnsDescription getActualTableStructure(ContextPtr context) const override;
static void addColumnsStructureToArguments(ASTs & args, const String & desired_structure, const ContextPtr & context);
protected:
void parseArguments(const ASTPtr & ast, ContextPtr context) override;
void parseArgumentsImpl(ASTs & args, const ContextPtr & context) override;
StorageURL::Configuration configuration;
private:
std::vector<size_t> skipAnalysisForArguments(const QueryTreeNodePtr & query_node_table_function, ContextPtr context) const override;
void parseArguments(const ASTPtr & ast, ContextPtr context) override;
StoragePtr getStorage(
const String & source, const String & format_, const ColumnsDescription & columns, ContextPtr global_context,
const std::string & table_name, const String & compression_method_) const override;
@ -36,7 +53,6 @@ private:
String getFormatFromFirstArgument() override;
StorageURL::Configuration configuration;
};
}
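
A sketch of the signatures above in use; the nginx endpoint is the one used by the integration tests in this commit, and the header is only there to show the optional `headers(...)` argument:

```sql
-- uri, format, structure
SELECT *
FROM url(
    'http://nginx:80/test_1',
    'TSV',
    'column1 UInt32, column2 UInt32, column3 UInt32');

-- uri, format: the structure is inferred from the data,
-- headers(...) is the optional extra argument
DESCRIBE url('http://nginx:80/test_1', 'TSV', headers('X-ClickHouse-Database'='default'));
```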

View File

@ -0,0 +1,54 @@
#include <TableFunctions/TableFunctionURLCluster.h>
#include <TableFunctions/TableFunctionFactory.h>
#include "registerTableFunctions.h"
namespace DB
{
StoragePtr TableFunctionURLCluster::getStorage(
const String & /*source*/, const String & /*format_*/, const ColumnsDescription & columns, ContextPtr context,
const std::string & table_name, const String & /*compression_method_*/) const
{
StoragePtr storage;
if (context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY)
{
// On the worker node this URI won't contain globs
storage = std::make_shared<StorageURL>(
filename,
StorageID(getDatabaseName(), table_name),
format,
std::nullopt /*format settings*/,
columns,
ConstraintsDescription{},
String{},
context,
compression_method,
configuration.headers,
configuration.http_method,
nullptr,
/*distributed_processing=*/ true);
}
else
{
storage = std::make_shared<StorageURLCluster>(
context,
cluster_name,
filename,
format,
compression_method,
StorageID(getDatabaseName(), table_name),
getActualTableStructure(context),
ConstraintsDescription{},
configuration,
structure != "auto");
}
return storage;
}
void registerTableFunctionURLCluster(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionURLCluster>();
}
}

View File

@ -0,0 +1,52 @@
#pragma once
#include <TableFunctions/ITableFunctionFileLike.h>
#include <TableFunctions/TableFunctionURL.h>
#include <TableFunctions/ITableFunctionCluster.h>
#include <Storages/StorageURL.h>
#include <Storages/StorageURLCluster.h>
#include <IO/ReadWriteBufferFromHTTP.h>
namespace DB
{
class Context;
/**
* urlCluster(cluster, URI, format, structure, compression_method)
 * A table function which allows processing many files from a URL on a specific cluster.
 * On the initiator it creates a connection to _all_ nodes in the cluster, expands asterisks
 * in the URL file path and dispatches each file dynamically.
 * On a worker node it asks the initiator for the next task and processes it.
 * This is repeated until all tasks are finished.
*/
class TableFunctionURLCluster : public ITableFunctionCluster<TableFunctionURL>
{
public:
static constexpr auto name = "urlCluster";
static constexpr auto signature = " - cluster, uri\n"
" - cluster, uri, format\n"
" - cluster, uri, format, structure\n"
" - cluster, uri, format, structure, compression_method\n"
"All signatures supports optional headers (specified as `headers('name'='value', 'name2'='value2')`)";
String getName() const override
{
return name;
}
String getSignature() const override
{
return signature;
}
protected:
StoragePtr getStorage(
const String & source, const String & format_, const ColumnsDescription & columns, ContextPtr global_context,
const std::string & table_name, const String & compression_method_) const override;
const char * getStorageTypeName() const override { return "URLCluster"; }
};
}
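
To see the initiator/worker split described in the class comment, one option (a sketch reusing the endpoints from the functional tests later in this commit) is to group the result by the host that actually fetched each file:

```sql
-- Each worker asks the initiator for the next file, so rows end up
-- attributed to whichever replica read them.
SELECT hostName() AS host, count() AS cnt
FROM urlCluster(
    'test_cluster_one_shard_three_replicas_localhost',
    'http://localhost:11111/test/{a,b}.tsv',
    'TSV')
GROUP BY host;
```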

View File

@ -16,6 +16,7 @@ void registerTableFunctions()
registerTableFunctionExecutable(factory);
registerTableFunctionFile(factory);
registerTableFunctionURL(factory);
registerTableFunctionURLCluster(factory);
registerTableFunctionValues(factory);
registerTableFunctionInput(factory);
registerTableFunctionGenerate(factory);

View File

@ -13,6 +13,7 @@ void registerTableFunctionZeros(TableFunctionFactory & factory);
void registerTableFunctionExecutable(TableFunctionFactory & factory);
void registerTableFunctionFile(TableFunctionFactory & factory);
void registerTableFunctionURL(TableFunctionFactory & factory);
void registerTableFunctionURLCluster(TableFunctionFactory & factory);
void registerTableFunctionValues(TableFunctionFactory & factory);
void registerTableFunctionInput(TableFunctionFactory & factory);
void registerTableFunctionGenerate(TableFunctionFactory & factory);

View File

@ -137,3 +137,5 @@
01600_parts_types_metrics_long
01287_max_execution_speed
02703_row_policy_for_database
02721_url_cluster
02534_s3_cluster_insert_select_schema_inference

View File

@ -0,0 +1,10 @@
<clickhouse>
<named_collections>
<test_s3>
<url>http://minio1:9001/root/data/{clickhouse,database}/*</url>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
<format>CSV</format>
</test_s3>
</named_collections>
</clickhouse>
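
The keys of this collection line up with the positional s3()/s3Cluster() arguments, so the `s3(test_s3)` call in the test below behaves roughly like the fully spelled-out call (the expansion is implied by the collection, not stated in this commit):

```sql
-- s3(test_s3) expands, approximately, to:
SELECT count()
FROM s3(
    'http://minio1:9001/root/data/{clickhouse,database}/*',
    'minio', 'minio123',
    'CSV');
```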

View File

@ -67,20 +67,20 @@ def started_cluster():
cluster = ClickHouseCluster(__file__)
cluster.add_instance(
"s0_0_0",
main_configs=["configs/cluster.xml"],
main_configs=["configs/cluster.xml", "configs/named_collections.xml"],
macros={"replica": "node1", "shard": "shard1"},
with_minio=True,
with_zookeeper=True,
)
cluster.add_instance(
"s0_0_1",
main_configs=["configs/cluster.xml"],
main_configs=["configs/cluster.xml", "configs/named_collections.xml"],
macros={"replica": "replica2", "shard": "shard1"},
with_zookeeper=True,
)
cluster.add_instance(
"s0_1_0",
main_configs=["configs/cluster.xml"],
main_configs=["configs/cluster.xml", "configs/named_collections.xml"],
macros={"replica": "replica1", "shard": "shard2"},
with_zookeeper=True,
)
@ -406,3 +406,21 @@ def test_cluster_with_header(started_cluster):
)
== "SomeValue\n"
)
def test_cluster_with_named_collection(started_cluster):
node = started_cluster.instances["s0_0_0"]
pure_s3 = node.query("""SELECT * from s3(test_s3) ORDER BY (c1, c2, c3)""")
s3_cluster = node.query(
"""SELECT * from s3Cluster(cluster_simple, test_s3) ORDER BY (c1, c2, c3)"""
)
assert TSV(pure_s3) == TSV(s3_cluster)
s3_cluster = node.query(
"""SELECT * from s3Cluster(cluster_simple, test_s3, structure='auto') ORDER BY (c1, c2, c3)"""
)
assert TSV(pure_s3) == TSV(s3_cluster)

View File

@ -0,0 +1,8 @@
<clickhouse>
<named_collections>
<test_url>
<url>http://nginx:80/test_1</url>
<format>TSV</format>
</test_url>
</named_collections>
</clickhouse>

View File

@ -4,7 +4,9 @@ from helpers.test_tools import TSV
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
"node1", main_configs=["configs/conf.xml"], with_nginx=True
"node1",
main_configs=["configs/conf.xml", "configs/named_collections.xml"],
with_nginx=True,
)
@ -35,6 +37,33 @@ def test_partition_by():
assert result.strip() == "1\t2\t3"
def test_url_cluster():
result = node1.query(
f"select * from urlCluster('test_cluster_two_shards', 'http://nginx:80/test_1', 'TSV', 'column1 UInt32, column2 UInt32, column3 UInt32')"
)
assert result.strip() == "3\t2\t1"
result = node1.query(
f"select * from urlCluster('test_cluster_two_shards', 'http://nginx:80/test_2', 'TSV', 'column1 UInt32, column2 UInt32, column3 UInt32')"
)
assert result.strip() == "1\t3\t2"
result = node1.query(
f"select * from urlCluster('test_cluster_two_shards', 'http://nginx:80/test_3', 'TSV', 'column1 UInt32, column2 UInt32, column3 UInt32')"
)
assert result.strip() == "1\t2\t3"
def test_url_cluster_with_named_collection():
result = node1.query(
f"select * from urlCluster(test_cluster_one_shard_three_replicas_localhost, test_url)"
)
assert result.strip() == "3\t2\t1"
result = node1.query(
f"select * from urlCluster(test_cluster_one_shard_three_replicas_localhost, test_url, structure='auto')"
)
assert result.strip() == "3\t2\t1"
def test_table_function_url_access_rights():
node1.query("CREATE USER OR REPLACE u1")

View File

@ -1,6 +1,12 @@
SET send_logs_level = 'fatal';
SET optimize_on_insert = 0;
DROP TABLE IF EXISTS no_order;
CREATE TABLE no_order(a UInt32, b UInt32) ENGINE = MergeTree ORDER BY tuple();
ALTER TABLE no_order MODIFY ORDER BY (a); -- { serverError 36}
DROP TABLE no_order;
DROP TABLE IF EXISTS old_style;
set allow_deprecated_syntax_for_merge_tree=1;
CREATE TABLE old_style(d Date, x UInt32) ENGINE MergeTree(d, x, 8192);

View File

@ -2,5 +2,5 @@
-- Please help shorten this list down to zero elements.
SELECT name FROM system.table_functions WHERE length(description) < 10
AND name NOT IN (
'cosn', 'oss', 'hdfs', 'hdfsCluster', 'hive', 'mysql', 'postgresql', 's3', 's3Cluster', 'sqlite' -- these functions are not enabled in fast test
'cosn', 'oss', 'hdfs', 'hdfsCluster', 'hive', 'mysql', 'postgresql', 's3', 's3Cluster', 'sqlite', 'urlCluster' -- these functions are not enabled in fast test
) ORDER BY name;

View File

@ -10,6 +10,138 @@ c3 Nullable(Int64)
c1 Nullable(Int64)
c2 Nullable(Int64)
c3 Nullable(Int64)
c1 Nullable(Int64)
c2 Nullable(Int64)
c3 Nullable(Int64)
c1 Nullable(Int64)
c2 Nullable(Int64)
c3 Nullable(Int64)
c1 Nullable(Int64)
c2 Nullable(Int64)
c3 Nullable(Int64)
c1 Nullable(Int64)
c2 Nullable(Int64)
c3 Nullable(Int64)
c1 Nullable(Int64)
c2 Nullable(Int64)
c3 Nullable(Int64)
c1 Nullable(Int64)
c2 Nullable(Int64)
c3 Nullable(Int64)
c1 Nullable(Int64)
c2 Nullable(Int64)
c3 Nullable(Int64)
c1 Nullable(Int64)
c2 Nullable(Int64)
c3 Nullable(Int64)
c1 Nullable(Int64)
c2 Nullable(Int64)
c3 Nullable(Int64)
c1 Nullable(Int64)
c2 Nullable(Int64)
c3 Nullable(Int64)
c1 Nullable(Int64)
c2 Nullable(Int64)
c3 Nullable(Int64)
c1 Nullable(Int64)
c2 Nullable(Int64)
c3 Nullable(Int64)
0 0 0
0 0 0
1 2 3
4 5 6
7 8 9
10 11 12
13 14 15
16 17 18
0 0 0
0 0 0
1 2 3
4 5 6
7 8 9
10 11 12
13 14 15
16 17 18
0 0 0
0 0 0
1 2 3
4 5 6
7 8 9
10 11 12
13 14 15
16 17 18
0 0 0
0 0 0
1 2 3
4 5 6
7 8 9
10 11 12
13 14 15
16 17 18
0 0 0
0 0 0
1 2 3
4 5 6
7 8 9
10 11 12
13 14 15
16 17 18
0 0 0
0 0 0
1 2 3
4 5 6
7 8 9
10 11 12
13 14 15
16 17 18
0 0 0
0 0 0
1 2 3
4 5 6
7 8 9
10 11 12
13 14 15
16 17 18
0 0 0
0 0 0
1 2 3
4 5 6
7 8 9
10 11 12
13 14 15
16 17 18
0 0 0
0 0 0
1 2 3
4 5 6
7 8 9
10 11 12
13 14 15
16 17 18
0 0 0
0 0 0
1 2 3
4 5 6
7 8 9
10 11 12
13 14 15
16 17 18
0 0 0
0 0 0
1 2 3
4 5 6
7 8 9
10 11 12
13 14 15
16 17 18
0 0 0
0 0 0
1 2 3
4 5 6
7 8 9
10 11 12
13 14 15
16 17 18
0 0 0
0 0 0
1 2 3

View File

@ -5,9 +5,34 @@ desc s3Cluster('test_cluster_one_shard_three_replicas_localhost', 'http://localh
desc s3Cluster('test_cluster_one_shard_three_replicas_localhost', 'http://localhost:11111/test/{a,b}.tsv', 'TSV');
desc s3Cluster('test_cluster_one_shard_three_replicas_localhost', 'http://localhost:11111/test/{a,b}.tsv', 'test', 'testtest');
desc s3Cluster('test_cluster_one_shard_three_replicas_localhost', 'http://localhost:11111/test/{a,b}.tsv', 'test', 'testtest', 'TSV');
desc s3Cluster('test_cluster_one_shard_three_replicas_localhost', 'http://localhost:11111/test/{a,b}.tsv', 'auto');
desc s3Cluster('test_cluster_one_shard_three_replicas_localhost', 'http://localhost:11111/test/{a,b}.tsv', 'TSV', 'auto');
desc s3Cluster('test_cluster_one_shard_three_replicas_localhost', 'http://localhost:11111/test/{a,b}.tsv', 'TSV', 'auto', 'auto');
desc s3Cluster('test_cluster_one_shard_three_replicas_localhost', 'http://localhost:11111/test/{a,b}.tsv', 'test', 'testtest', 'auto');
desc s3Cluster('test_cluster_one_shard_three_replicas_localhost', 'http://localhost:11111/test/{a,b}.tsv', 'test', 'testtest', 'TSV', 'auto');
desc s3Cluster('test_cluster_one_shard_three_replicas_localhost', 'http://localhost:11111/test/{a,b}.tsv', 'test', 'testtest', 'TSV', 'auto', 'auto');
desc s3Cluster('test_cluster_one_shard_three_replicas_localhost', 'http://localhost:11111/test/{a,b}.tsv', NOSIGN);
desc s3Cluster('test_cluster_one_shard_three_replicas_localhost', 'http://localhost:11111/test/{a,b}.tsv', NOSIGN, 'TSV');
desc s3Cluster('test_cluster_one_shard_three_replicas_localhost', 'http://localhost:11111/test/{a,b}.tsv', NOSIGN, 'TSV', 'auto');
desc s3Cluster('test_cluster_one_shard_three_replicas_localhost', 'http://localhost:11111/test/{a,b}.tsv', NOSIGN, 'TSV', 'auto', 'auto');
desc s3Cluster('test_cluster_one_shard_three_replicas_localhost', 'http://localhost:11111/test/{a,b}.tsv', headers(MyCustomHeader = 'SomeValue'));
desc s3Cluster('test_cluster_one_shard_three_replicas_localhost', 'http://localhost:11111/test/{a,b}.tsv', 'TSV', 'auto', headers(MyCustomHeader = 'SomeValue'), 'auto');
select * from s3Cluster('test_cluster_one_shard_three_replicas_localhost', 'http://localhost:11111/test/{a,b}.tsv') order by c1, c2, c3;
select * from s3Cluster('test_cluster_one_shard_three_replicas_localhost', 'http://localhost:11111/test/{a,b}.tsv', 'TSV') order by c1, c2, c3;
select * from s3Cluster('test_cluster_one_shard_three_replicas_localhost', 'http://localhost:11111/test/{a,b}.tsv', 'test', 'testtest') order by c1, c2, c3;
select * from s3Cluster('test_cluster_one_shard_three_replicas_localhost', 'http://localhost:11111/test/{a,b}.tsv', 'test', 'testtest', 'TSV') order by c1, c2, c3;
select * from s3Cluster('test_cluster_one_shard_three_replicas_localhost', 'http://localhost:11111/test/{a,b}.tsv', 'auto') order by c1, c2, c3;
select * from s3Cluster('test_cluster_one_shard_three_replicas_localhost', 'http://localhost:11111/test/{a,b}.tsv', 'TSV', 'auto') order by c1, c2, c3;
select * from s3Cluster('test_cluster_one_shard_three_replicas_localhost', 'http://localhost:11111/test/{a,b}.tsv', 'TSV', 'auto', 'auto') order by c1, c2, c3;
select * from s3Cluster('test_cluster_one_shard_three_replicas_localhost', 'http://localhost:11111/test/{a,b}.tsv', 'test', 'testtest', 'auto') order by c1, c2, c3;
select * from s3Cluster('test_cluster_one_shard_three_replicas_localhost', 'http://localhost:11111/test/{a,b}.tsv', 'test', 'testtest', 'TSV', 'auto') order by c1, c2, c3;
select * from s3Cluster('test_cluster_one_shard_three_replicas_localhost', 'http://localhost:11111/test/{a,b}.tsv', 'test', 'testtest', 'TSV', 'auto', 'auto') order by c1, c2, c3;
select * from s3Cluster('test_cluster_one_shard_three_replicas_localhost', 'http://localhost:11111/test/{a,b}.tsv', NOSIGN) order by c1, c2, c3;
select * from s3Cluster('test_cluster_one_shard_three_replicas_localhost', 'http://localhost:11111/test/{a,b}.tsv', NOSIGN, 'TSV') order by c1, c2, c3;
select * from s3Cluster('test_cluster_one_shard_three_replicas_localhost', 'http://localhost:11111/test/{a,b}.tsv', NOSIGN, 'TSV', 'auto') order by c1, c2, c3;
select * from s3Cluster('test_cluster_one_shard_three_replicas_localhost', 'http://localhost:11111/test/{a,b}.tsv', NOSIGN, 'TSV', 'auto', 'auto') order by c1, c2, c3;
select * from s3Cluster('test_cluster_one_shard_three_replicas_localhost', 'http://localhost:11111/test/{a,b}.tsv', headers(MyCustomHeader = 'SomeValue')) order by c1, c2, c3;
select * from s3Cluster('test_cluster_one_shard_three_replicas_localhost', 'http://localhost:11111/test/{a,b}.tsv', 'TSV', 'auto', headers(MyCustomHeader = 'SomeValue'), 'auto') order by c1, c2, c3;

View File

@ -0,0 +1,136 @@
0 0 0
0 0 0
0 0 0
1 2 3
4 5 6
7 8 9
10 11 12
13 14 15
16 17 18
20 21 22
23 24 25
26 27 28
0 0 0
0 0 0
0 0 0
1 2 3
4 5 6
7 8 9
10 11 12
13 14 15
16 17 18
20 21 22
23 24 25
26 27 28
0 0 0
0 0 0
0 0 0
1 2 3
4 5 6
7 8 9
10 11 12
13 14 15
16 17 18
20 21 22
23 24 25
26 27 28
0 0 0
0 0 0
0 0 0
1 2 3
4 5 6
7 8 9
10 11 12
13 14 15
16 17 18
20 21 22
23 24 25
26 27 28
c1 Nullable(Int64)
c2 Nullable(Int64)
c3 Nullable(Int64)
c1 Nullable(Int64)
c2 Nullable(Int64)
c3 Nullable(Int64)
c1 UInt64
c2 UInt64
c3 UInt64
c1 UInt64
c2 UInt64
c3 UInt64
12
12
c1 Nullable(Int64)
c2 Nullable(Int64)
c3 Nullable(Int64)
c1 Nullable(Int64)
c2 Nullable(Int64)
c3 Nullable(Int64)
c1 Nullable(Int64)
c2 Nullable(Int64)
c3 Nullable(Int64)
c1 Nullable(Int64)
c2 Nullable(Int64)
c3 Nullable(Int64)
c1 Nullable(Int64)
c2 Nullable(Int64)
c3 Nullable(Int64)
c1 Nullable(Int64)
c2 Nullable(Int64)
c3 Nullable(Int64)
c1 Nullable(Int64)
c2 Nullable(Int64)
c3 Nullable(Int64)
c1 Nullable(Int64)
c2 Nullable(Int64)
c3 Nullable(Int64)
c1 Nullable(Int64)
c2 Nullable(Int64)
c3 Nullable(Int64)
c1 Nullable(Int64)
c2 Nullable(Int64)
c3 Nullable(Int64)
0 0 0
0 0 0
1 2 3
4 5 6
7 8 9
10 11 12
13 14 15
16 17 18
0 0 0
0 0 0
1 2 3
4 5 6
7 8 9
10 11 12
13 14 15
16 17 18
0 0 0
0 0 0
1 2 3
4 5 6
7 8 9
10 11 12
13 14 15
16 17 18
0 0 0
0 0 0
1 2 3
4 5 6
7 8 9
10 11 12
13 14 15
16 17 18
0 0 0
0 0 0
1 2 3
4 5 6
7 8 9
10 11 12
13 14 15
16 17 18
1 2 3
4 5 6
7 8 9
0 0 0

View File

@ -0,0 +1,40 @@
-- Tags: no-fasttest
-- Tag no-fasttest: Depends on AWS
select * from urlCluster('test_cluster_two_shards_localhost', 'http://localhost:11111/test/{a,b,c}.tsv') ORDER BY c1, c2, c3;
select * from urlCluster('test_cluster_two_shards_localhost', 'http://localhost:11111/test/{a,b,c}.tsv', 'TSV') ORDER BY c1, c2, c3;
select * from urlCluster('test_cluster_two_shards_localhost', 'http://localhost:11111/test/{a,b,c}.tsv', 'TSV', 'c1 UInt64, c2 UInt64, c3 UInt64') ORDER BY c1, c2, c3;
select * from urlCluster('test_cluster_two_shards_localhost', 'http://localhost:11111/test/{a,b,c}.tsv', 'TSV', 'c1 UInt64, c2 UInt64, c3 UInt64', 'auto') ORDER BY c1, c2, c3;
desc urlCluster('test_cluster_two_shards_localhost', 'http://localhost:11111/test/{a,b,c}.tsv');
desc urlCluster('test_cluster_two_shards_localhost', 'http://localhost:11111/test/{a,b,c}.tsv', 'TSV');
desc urlCluster('test_cluster_two_shards_localhost', 'http://localhost:11111/test/{a,b,c}.tsv', 'TSV', 'c1 UInt64, c2 UInt64, c3 UInt64');
desc urlCluster('test_cluster_two_shards_localhost', 'http://localhost:11111/test/{a,b,c}.tsv', 'TSV', 'c1 UInt64, c2 UInt64, c3 UInt64', 'auto');
select COUNT() from urlCluster('test_cluster_two_shards_localhost', 'http://localhost:11111/test/{a,b,c}.tsv');
select COUNT(*) from urlCluster('test_cluster_two_shards_localhost', 'http://localhost:11111/test/{a,b,c}.tsv');
desc urlCluster('test_cluster_one_shard_three_replicas_localhost', 'http://localhost:11111/test/{a,b}.tsv');
desc urlCluster('test_cluster_one_shard_three_replicas_localhost', 'http://localhost:11111/test/{a,b}.tsv', 'TSV');
desc urlCluster('test_cluster_one_shard_three_replicas_localhost', 'http://localhost:11111/test/{a,b}.tsv', 'auto');
desc urlCluster('test_cluster_one_shard_three_replicas_localhost', 'http://localhost:11111/test/{a,b}.tsv', 'TSV', 'auto');
desc urlCluster('test_cluster_one_shard_three_replicas_localhost', 'http://localhost:11111/test/{a,b}.tsv', 'TSV', 'auto', 'auto');
desc urlCluster('test_cluster_one_shard_three_replicas_localhost', headers('X-ClickHouse-Database'='default'), 'http://localhost:11111/test/{a,b}.tsv');
desc urlCluster('test_cluster_one_shard_three_replicas_localhost', 'http://localhost:11111/test/{a,b}.tsv', headers('X-ClickHouse-Database'='default'), 'TSV');
desc urlCluster('test_cluster_one_shard_three_replicas_localhost', 'http://localhost:11111/test/{a,b}.tsv', 'auto', headers('X-ClickHouse-Database'='default'));
desc urlCluster('test_cluster_one_shard_three_replicas_localhost', 'http://localhost:11111/test/{a,b}.tsv', 'TSV', 'auto', headers('X-ClickHouse-Database'='default'));
desc urlCluster('test_cluster_one_shard_three_replicas_localhost', 'http://localhost:11111/test/{a,b}.tsv', 'TSV', headers('X-ClickHouse-Database'='default'), 'auto', 'auto');
select * from urlCluster('test_cluster_one_shard_three_replicas_localhost', 'http://localhost:11111/test/{a,b}.tsv') order by c1, c2, c3;
select * from urlCluster('test_cluster_one_shard_three_replicas_localhost', 'http://localhost:11111/test/{a,b}.tsv', 'TSV') order by c1, c2, c3;
select * from urlCluster('test_cluster_one_shard_three_replicas_localhost', 'http://localhost:11111/test/{a,b}.tsv', 'auto') order by c1, c2, c3;
select * from urlCluster('test_cluster_one_shard_three_replicas_localhost', 'http://localhost:11111/test/{a,b}.tsv', 'TSV', 'auto') order by c1, c2, c3;
select * from urlCluster('test_cluster_one_shard_three_replicas_localhost', 'http://localhost:11111/test/{a,b}.tsv', 'TSV', 'auto', 'auto') order by c1, c2, c3;
drop table if exists test;
create table test (x UInt32, y UInt32, z UInt32) engine=Memory();
insert into test select * from urlCluster('test_cluster_one_shard_three_replicas_localhost', 'http://localhost:11111/test/a.tsv', 'TSV');
select * from test;
drop table test;