Merge pull request #23051 from excitoon-favorites/s3partitionedwrite

This commit is contained in:
Vladimir C 2021-08-24 10:58:43 +03:00 committed by GitHub
commit 5f4ca42d15
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 499 additions and 112 deletions

View File

@ -393,6 +393,9 @@ function run_tests
01853_s2_cells_intersect
01854_s2_cap_contains
01854_s2_cap_union
# needs s3
01944_insert_partition_by
)
time clickhouse-test --hung-check -j 8 --order=random --use-skip-list \

131
src/Common/isValidUTF8.cpp Normal file
View File

@ -0,0 +1,131 @@
#include <Common/isValidUTF8.h>
#include <cstring>
/// inspired by https://github.com/cyb70289/utf8/
/*
MIT License
Copyright (c) 2019 Yibo Cai
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
/*
* http://www.unicode.org/versions/Unicode6.0.0/ch03.pdf - page 94
*
* Table 3-7. Well-Formed UTF-8 Byte Sequences
*
* +--------------------+------------+-------------+------------+-------------+
* | Code Points | First Byte | Second Byte | Third Byte | Fourth Byte |
* +--------------------+------------+-------------+------------+-------------+
* | U+0000..U+007F | 00..7F | | | |
* +--------------------+------------+-------------+------------+-------------+
* | U+0080..U+07FF | C2..DF | 80..BF | | |
* +--------------------+------------+-------------+------------+-------------+
* | U+0800..U+0FFF | E0 | A0..BF | 80..BF | |
* +--------------------+------------+-------------+------------+-------------+
* | U+1000..U+CFFF | E1..EC | 80..BF | 80..BF | |
* +--------------------+------------+-------------+------------+-------------+
* | U+D000..U+D7FF | ED | 80..9F | 80..BF | |
* +--------------------+------------+-------------+------------+-------------+
* | U+E000..U+FFFF | EE..EF | 80..BF | 80..BF | |
* +--------------------+------------+-------------+------------+-------------+
* | U+10000..U+3FFFF | F0 | 90..BF | 80..BF | 80..BF |
* +--------------------+------------+-------------+------------+-------------+
* | U+40000..U+FFFFF | F1..F3 | 80..BF | 80..BF | 80..BF |
* +--------------------+------------+-------------+------------+-------------+
* | U+100000..U+10FFFF | F4 | 80..8F | 80..BF | 80..BF |
* +--------------------+------------+-------------+------------+-------------+
*/
namespace DB
{
namespace UTF8
{
/// Checks whether the byte range [data, data + len) is well-formed UTF-8 as described by
/// Table 3-7 above: overlong encodings, UTF-16 surrogates (ED A0..BF ..) and code points
/// above U+10FFFF are all rejected.
/// Returns a non-zero value if every sequence in the range is valid, zero otherwise.
/// An empty range is valid.
UInt8 isValidUTF8(const UInt8 * data, UInt64 len)
{
    /// Continuation bytes are 80..BF. Reinterpreted as signed they are exactly the values
    /// not greater than static_cast<Int8>(0xBF), so one comparison checks both bounds.
    const auto is_continuation = [](UInt8 byte) { return static_cast<Int8>(byte) <= static_cast<Int8>(0xBF); };

    const UInt8 * pos = data;
    const UInt8 * const end = data + len;

    while (pos != end)
    {
        const UInt64 remaining = static_cast<UInt64>(end - pos);
        const UInt8 lead = pos[0];

        /* 00..7F */
        if (lead <= 0x7F)
        {
            pos += 1;
            continue;
        }

        /* C2..DF, 80..BF */
        if (remaining >= 2 && lead >= 0xC2 && lead <= 0xDF && is_continuation(pos[1]))
        {
            pos += 2;
            continue;
        }

        /// Both the three- and the four-byte forms start with a lead byte followed by
        /// at least two continuation bytes; anything else cannot be valid.
        if (remaining < 3 || !is_continuation(pos[1]) || !is_continuation(pos[2]))
            return false;

        const UInt8 second = pos[1];

        const bool three_byte_form =
            /* E0, A0..BF, 80..BF */
            (lead == 0xE0 && second >= 0xA0)
            /* E1..EC, 80..BF, 80..BF */
            || (lead >= 0xE1 && lead <= 0xEC)
            /* ED, 80..9F, 80..BF */
            || (lead == 0xED && second <= 0x9F)
            /* EE..EF, 80..BF, 80..BF */
            || (lead >= 0xEE && lead <= 0xEF);

        if (three_byte_form)
        {
            pos += 3;
            continue;
        }

        const bool four_byte_form =
            /* F0, 90..BF, 80..BF, 80..BF */
            (lead == 0xF0 && second >= 0x90)
            /* F1..F3, 80..BF, 80..BF, 80..BF */
            || (lead >= 0xF1 && lead <= 0xF3)
            /* F4, 80..8F, 80..BF, 80..BF */
            || (lead == 0xF4 && second <= 0x8F);

        /// The fourth byte is only read once we know it is inside the range.
        if (remaining < 4 || !four_byte_form || !is_continuation(pos[3]))
            return false;

        pos += 4;
    }

    return true;
}
}
}

10
src/Common/isValidUTF8.h Normal file
View File

@ -0,0 +1,10 @@
#pragma once
#include <common/types.h>
namespace DB::UTF8
{
/// Scalar UTF-8 validator.
/// Returns a non-zero value if the `len` bytes starting at `data` form a sequence of
/// well-formed UTF-8 code points, and zero otherwise. An empty range (len == 0) is valid.
UInt8 isValidUTF8(const UInt8 * data, UInt64 len);
}

View File

@ -116,6 +116,7 @@ SRCS(
hasLinuxCapability.cpp
hex.cpp
isLocalAddress.cpp
isValidUTF8.cpp
malloc.cpp
memory.cpp
new_delete.cpp

View File

@ -1,14 +1,7 @@
#include <DataTypes/DataTypeString.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionStringOrArrayToT.h>
#include <cstring>
#ifdef __SSE4_1__
# include <emmintrin.h>
# include <smmintrin.h>
# include <tmmintrin.h>
#endif
#include <Common/isValidUTF8.h>
namespace DB
{
@ -71,75 +64,8 @@ SOFTWARE.
* +--------------------+------------+-------------+------------+-------------+
*/
/// Byte-by-byte UTF-8 validation without any SIMD.
/// Walks the buffer one encoded code point at a time and returns zero as soon as a lead
/// byte, a continuation byte or the remaining length does not fit any well-formed pattern.
/// Returns a non-zero value when the whole buffer (including an empty one) is valid.
static inline UInt8 isValidUTF8Naive(const UInt8 * data, UInt64 len)
{
    /// 80..BF as a signed value is everything not greater than static_cast<Int8>(0xBF).
    auto is_tail = [](UInt8 c) { return static_cast<Int8>(c) <= static_cast<Int8>(0xBF); };

    while (len)
    {
        const UInt8 lead = data[0];
        UInt64 width = 0;

        if (lead <= 0x7F)
        {
            /* 00..7F */
            width = 1;
        }
        else if (len >= 2 && lead >= 0xC2 && lead <= 0xDF && is_tail(data[1]))
        {
            /* C2..DF, 80..BF */
            width = 2;
        }
        else
        {
            /// Only three- and four-byte forms are left; both need two valid tail bytes.
            if (len < 3)
                return false;

            const UInt8 second = data[1];
            if (!is_tail(second) || !is_tail(data[2]))
                return false;

            bool is_three_byte = false;
            if (lead == 0xE0)
                is_three_byte = second >= 0xA0; /* E0, A0..BF, 80..BF */
            else if (lead == 0xED)
                is_three_byte = second <= 0x9F; /* ED, 80..9F, 80..BF */
            else
                is_three_byte = lead >= 0xE1 && lead <= 0xEF; /* E1..EC and EE..EF, 80..BF, 80..BF */

            if (is_three_byte)
            {
                width = 3;
            }
            else
            {
                bool is_four_byte = false;
                if (lead == 0xF0)
                    is_four_byte = second >= 0x90; /* F0, 90..BF, 80..BF, 80..BF */
                else if (lead == 0xF4)
                    is_four_byte = second <= 0x8F; /* F4, 80..8F, 80..BF, 80..BF */
                else
                    is_four_byte = lead >= 0xF1 && lead <= 0xF3; /* F1..F3, 80..BF, 80..BF, 80..BF */

                /// data[3] is read only after the length check.
                if (len < 4 || !is_four_byte || !is_tail(data[3]))
                    return false;

                width = 4;
            }
        }

        len -= width;
        data += width;
    }

    return true;
}
#ifndef __SSE4_1__
static inline UInt8 isValidUTF8(const UInt8 * data, UInt64 len) { return isValidUTF8Naive(data, len); }
static inline UInt8 isValidUTF8(const UInt8 * data, UInt64 len) { return DB::UTF8::isValidUTF8(data, len); }
#else
static inline UInt8 isValidUTF8(const UInt8 * data, UInt64 len)
{

View File

@ -2,9 +2,10 @@
#if USE_AWS_S3
# include <IO/S3Common.h>
# include <Common/quoteString.h>
# include <IO/S3Common.h>
# include <IO/WriteBufferFromString.h>
# include <Storages/StorageS3Settings.h>
@ -617,7 +618,7 @@ namespace S3
storage_name = S3;
if (uri.getHost().empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Host is empty in S3 URI: {}", uri.toString());
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Host is empty in S3 URI.");
String name;
String endpoint_authority_from_uri;
@ -626,12 +627,7 @@ namespace S3
{
is_virtual_hosted_style = true;
endpoint = uri.getScheme() + "://" + name + endpoint_authority_from_uri;
/// S3 specification requires at least 3 and at most 63 characters in bucket name.
/// https://docs.aws.amazon.com/awscloudtrail/latest/userguide/cloudtrail-s3-bucket-naming-requirements.html
if (bucket.length() < 3 || bucket.length() > 63)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Bucket name length is out of bounds in virtual hosted style S3 URI: {} ({})", quoteString(bucket), uri.toString());
validateBucket(bucket, uri);
if (!uri.getPath().empty())
{
@ -642,7 +638,7 @@ namespace S3
boost::to_upper(name);
if (name != S3 && name != COS)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Object storage system name is unrecognized in virtual hosted style S3 URI: {} ({})", quoteString(name), uri.toString());
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Object storage system name is unrecognized in virtual hosted style S3 URI: {}", quoteString(name));
}
if (name == S3)
{
@ -657,14 +653,19 @@ namespace S3
{
is_virtual_hosted_style = false;
endpoint = uri.getScheme() + "://" + uri.getAuthority();
validateBucket(bucket, uri);
}
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Bucket or key name are invalid in S3 URI.");
}
void URI::validateBucket(const String & bucket, const Poco::URI & uri)
{
/// S3 specification requires at least 3 and at most 63 characters in bucket name.
/// https://docs.aws.amazon.com/awscloudtrail/latest/userguide/cloudtrail-s3-bucket-naming-requirements.html
if (bucket.length() < 3 || bucket.length() > 63)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Bucket name length is out of bounds in virtual hosted style S3 URI: {} ({})", quoteString(bucket), uri.toString());
}
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Bucket or key name are invalid in S3 URI: {}", uri.toString());
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Bucket name length is out of bounds in virtual hosted style S3 URI: {}{}",
quoteString(bucket), !uri.empty() ? " (" + uri.toString() + ")" : "");
}
}

View File

@ -74,6 +74,8 @@ struct URI
bool is_virtual_hosted_style;
explicit URI(const Poco::URI & uri_);
static void validateBucket(const String & bucket, const Poco::URI & uri);
};
}

View File

@ -37,6 +37,7 @@ namespace DB
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int NO_SUCH_COLUMN_IN_TABLE;
extern const int ILLEGAL_COLUMN;
extern const int DUPLICATE_COLUMN;
@ -155,6 +156,9 @@ BlockIO InterpreterInsertQuery::execute()
BlockIO res;
StoragePtr table = getTable(query);
if (query.partition_by && !table->supportsPartitionBy())
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "PARTITION BY clause is not supported by storage");
auto table_lock = table->lockForShare(getContext()->getInitialQueryId(), settings.lock_acquire_timeout);
auto metadata_snapshot = table->getInMemoryMetadataPtr();

View File

@ -25,6 +25,11 @@ void ASTInsertQuery::formatImpl(const FormatSettings & settings, FormatState & s
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << "FUNCTION ";
table_function->formatImpl(settings, state, frame);
if (partition_by)
{
settings.ostr << " PARTITION BY ";
partition_by->formatImpl(settings, state, frame);
}
}
else
settings.ostr << (settings.hilite ? hilite_none : "")

View File

@ -20,6 +20,7 @@ public:
ASTPtr infile;
ASTPtr watch;
ASTPtr table_function;
ASTPtr partition_by;
ASTPtr settings_ast;
/// Data to insert
@ -44,6 +45,7 @@ public:
if (select) { res->select = select->clone(); res->children.push_back(res->select); }
if (watch) { res->watch = watch->clone(); res->children.push_back(res->watch); }
if (table_function) { res->table_function = table_function->clone(); res->children.push_back(res->table_function); }
if (partition_by) { res->partition_by = partition_by->clone(); res->children.push_back(res->partition_by); }
if (settings_ast) { res->settings_ast = settings_ast->clone(); res->children.push_back(res->settings_ast); }
return res;

View File

@ -35,6 +35,7 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ParserKeyword s_settings("SETTINGS");
ParserKeyword s_select("SELECT");
ParserKeyword s_watch("WATCH");
ParserKeyword s_partition_by("PARTITION BY");
ParserKeyword s_with("WITH");
ParserToken s_lparen(TokenType::OpeningRoundBracket);
ParserToken s_rparen(TokenType::ClosingRoundBracket);
@ -42,6 +43,7 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ParserList columns_p(std::make_unique<ParserInsertElement>(), std::make_unique<ParserToken>(TokenType::Comma), false);
ParserFunction table_function_p{false};
ParserStringLiteral infile_name_p;
ParserExpressionWithOptionalAlias exp_elem_p(false);
ASTPtr database;
ASTPtr table;
@ -52,6 +54,8 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ASTPtr watch;
ASTPtr table_function;
ASTPtr settings_ast;
ASTPtr partition_by_expr;
/// Insertion data
const char * data = nullptr;
@ -64,6 +68,12 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
if (!table_function_p.parse(pos, table_function, expected))
return false;
if (s_partition_by.ignore(pos, expected))
{
if (!exp_elem_p.parse(pos, partition_by_expr, expected))
return false;
}
}
else
{
@ -183,6 +193,7 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
if (table_function)
{
query->table_function = table_function;
query->partition_by = partition_by_expr;
}
else
{

View File

@ -127,6 +127,9 @@ public:
/// Returns true if the storage supports queries with the FINAL section.
virtual bool supportsFinal() const { return false; }
/// Returns true if the storage supports insert queries with the PARTITION BY section.
virtual bool supportsPartitionBy() const { return false; }
/// Returns true if the storage supports queries with the PREWHERE section.
virtual bool supportsPrewhere() const { return false; }

View File

@ -2,15 +2,26 @@
#if USE_AWS_S3
#include <Columns/ColumnString.h>
#include <Common/isValidUTF8.h>
#include <Functions/FunctionsConversion.h>
#include <IO/S3Common.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/TreeRewriter.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTLiteral.h>
#include <Storages/StorageFactory.h>
#include <Storages/StorageS3.h>
#include <Storages/StorageS3Settings.h>
#include <Interpreters/Context.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTLiteral.h>
#include <IO/ReadBufferFromS3.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromS3.h>
@ -46,13 +57,21 @@
namespace fs = std::filesystem;
#include <boost/algorithm/string.hpp>
static const String PARTITION_ID_WILDCARD = "{_partition_id}";
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_PARSE_TEXT;
extern const int BAD_ARGUMENTS;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int UNEXPECTED_EXPRESSION;
extern const int S3_ERROR;
extern const int UNEXPECTED_EXPRESSION;
}
class StorageS3Source::DisclosedGlobIterator::Impl
{
@ -299,11 +318,6 @@ public:
writer->write(getPort().getHeader().cloneWithColumns(chunk.detachColumns()));
}
// void flush() override
// {
// writer->flush();
// }
void onFinish() override
{
try
@ -328,6 +342,185 @@ private:
};
class PartitionedStorageS3Sink : public SinkToStorage
{
public:
PartitionedStorageS3Sink(
const ASTPtr & partition_by,
const String & format_,
const Block & sample_block_,
ContextPtr context_,
const CompressionMethod compression_method_,
const std::shared_ptr<Aws::S3::S3Client> & client_,
const String & bucket_,
const String & key_,
size_t min_upload_part_size_,
size_t max_single_part_upload_size_)
: SinkToStorage(sample_block_)
, format(format_)
, sample_block(sample_block_)
, context(context_)
, compression_method(compression_method_)
, client(client_)
, bucket(bucket_)
, key(key_)
, min_upload_part_size(min_upload_part_size_)
, max_single_part_upload_size(max_single_part_upload_size_)
{
std::vector<ASTPtr> arguments(1, partition_by);
ASTPtr partition_by_string = makeASTFunction(FunctionToString::name, std::move(arguments));
auto syntax_result = TreeRewriter(context).analyze(partition_by_string, sample_block.getNamesAndTypesList());
partition_by_expr = ExpressionAnalyzer(partition_by_string, syntax_result, context).getActions(false);
partition_by_column_name = partition_by_string->getColumnName();
}
String getName() const override { return "PartitionedStorageS3Sink"; }
void consume(Chunk chunk) override
{
const auto & columns = chunk.getColumns();
Block block_with_partition_by_expr = sample_block.cloneWithoutColumns();
block_with_partition_by_expr.setColumns(columns);
partition_by_expr->execute(block_with_partition_by_expr);
const auto * column = block_with_partition_by_expr.getByName(partition_by_column_name).column.get();
std::unordered_map<String, size_t> sub_chunks_indices;
IColumn::Selector selector;
for (size_t row = 0; row < chunk.getNumRows(); ++row)
{
auto value = column->getDataAt(row);
auto [it, inserted] = sub_chunks_indices.emplace(value, sub_chunks_indices.size());
selector.push_back(it->second);
}
Chunks sub_chunks;
sub_chunks.reserve(sub_chunks_indices.size());
for (size_t column_index = 0; column_index < columns.size(); ++column_index)
{
MutableColumns column_sub_chunks = columns[column_index]->scatter(sub_chunks_indices.size(), selector);
if (column_index == 0) /// Set sizes for sub-chunks.
{
for (const auto & column_sub_chunk : column_sub_chunks)
{
sub_chunks.emplace_back(Columns(), column_sub_chunk->size());
}
}
for (size_t sub_chunk_index = 0; sub_chunk_index < column_sub_chunks.size(); ++sub_chunk_index)
{
sub_chunks[sub_chunk_index].addColumn(std::move(column_sub_chunks[sub_chunk_index]));
}
}
for (const auto & [partition_id, sub_chunk_index] : sub_chunks_indices)
{
getSinkForPartition(partition_id)->consume(std::move(sub_chunks[sub_chunk_index]));
}
}
void onFinish() override
{
for (auto & [partition_id, sink] : sinks)
{
sink->onFinish();
}
}
private:
using SinkPtr = std::shared_ptr<StorageS3Sink>;
const String format;
const Block sample_block;
ContextPtr context;
const CompressionMethod compression_method;
std::shared_ptr<Aws::S3::S3Client> client;
const String bucket;
const String key;
size_t min_upload_part_size;
size_t max_single_part_upload_size;
ExpressionActionsPtr partition_by_expr;
String partition_by_column_name;
std::unordered_map<String, SinkPtr> sinks;
static String replaceWildcards(const String & haystack, const String & partition_id)
{
return boost::replace_all_copy(haystack, PARTITION_ID_WILDCARD, partition_id);
}
SinkPtr getSinkForPartition(const String & partition_id)
{
auto it = sinks.find(partition_id);
if (it == sinks.end())
{
auto partition_bucket = replaceWildcards(bucket, partition_id);
validateBucket(partition_bucket);
auto partition_key = replaceWildcards(key, partition_id);
validateKey(partition_key);
std::tie(it, std::ignore) = sinks.emplace(partition_id, std::make_shared<StorageS3Sink>(
format,
sample_block,
context,
compression_method,
client,
partition_bucket,
partition_key,
min_upload_part_size,
max_single_part_upload_size
));
}
return it->second;
}
static void validateBucket(const String & str)
{
S3::URI::validateBucket(str, {});
if (!DB::UTF8::isValidUTF8(reinterpret_cast<const UInt8 *>(str.data()), str.size()))
throw Exception(ErrorCodes::CANNOT_PARSE_TEXT, "Incorrect non-UTF8 sequence in bucket name");
validatePartitionKey(str, false);
}
static void validateKey(const String & str)
{
/// See:
/// - https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-keys.html
/// - https://cloud.ibm.com/apidocs/cos/cos-compatibility#putobject
if (str.empty() || str.size() > 1024)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Incorrect key length (not empty, max 1023 characters), got: {}", str.size());
if (!DB::UTF8::isValidUTF8(reinterpret_cast<const UInt8 *>(str.data()), str.size()))
throw Exception(ErrorCodes::CANNOT_PARSE_TEXT, "Incorrect non-UTF8 sequence in key");
validatePartitionKey(str, true);
}
static void validatePartitionKey(const StringRef & str, bool allow_slash)
{
for (const char * i = str.data; i != str.data + str.size; ++i)
{
if (static_cast<UInt8>(*i) < 0x20 || *i == '{' || *i == '}' || *i == '*' || *i == '?' || (!allow_slash && *i == '/'))
{
/// Need to convert to UInt32 because UInt8 can't be passed to format due to "mixing character types is disallowed".
UInt32 invalid_char_byte = static_cast<UInt32>(static_cast<UInt8>(*i));
throw DB::Exception(
ErrorCodes::CANNOT_PARSE_TEXT, "Illegal character '\\x{:02x}' in partition id starting with '{}'",
invalid_char_byte, StringRef(str.data, i - str.data));
}
}
}
};
StorageS3::StorageS3(
const S3::URI & uri_,
const String & access_key_id_,
@ -427,20 +620,45 @@ Pipe StorageS3::read(
return pipe;
}
SinkToStoragePtr StorageS3::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context)
SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context)
{
updateClientAndAuthSettings(local_context, client_auth);
return std::make_shared<StorageS3Sink>(
auto sample_block = metadata_snapshot->getSampleBlock();
auto chosen_compression_method = chooseCompressionMethod(client_auth.uri.key, compression_method);
bool has_wildcards = client_auth.uri.bucket.find(PARTITION_ID_WILDCARD) != String::npos || client_auth.uri.key.find(PARTITION_ID_WILDCARD) != String::npos;
auto insert_query = std::dynamic_pointer_cast<ASTInsertQuery>(query);
bool is_partitioned_implementation = insert_query && insert_query->partition_by && has_wildcards;
if (is_partitioned_implementation)
{
return std::make_shared<PartitionedStorageS3Sink>(
insert_query->partition_by,
format_name,
metadata_snapshot->getSampleBlock(),
sample_block,
local_context,
chooseCompressionMethod(client_auth.uri.key, compression_method),
chosen_compression_method,
client_auth.client,
client_auth.uri.bucket,
client_auth.uri.key,
min_upload_part_size,
max_single_part_upload_size);
}
else
{
return std::make_shared<StorageS3Sink>(
format_name,
sample_block,
local_context,
chosen_compression_method,
client_auth.client,
client_auth.uri.bucket,
client_auth.uri.key,
min_upload_part_size,
max_single_part_upload_size);
}
}
void StorageS3::truncate(const ASTPtr & /* query */, const StorageMetadataPtr &, ContextPtr local_context, TableExclusiveLockHolder &)
@ -583,6 +801,11 @@ NamesAndTypesList StorageS3::getVirtuals() const
};
}
/// StorageS3 accepts INSERT queries with a PARTITION BY clause; storages that keep the
/// IStorage default (false) make InterpreterInsertQuery reject such queries with NOT_IMPLEMENTED.
bool StorageS3::supportsPartitionBy() const
{
return true;
}
}
#endif

View File

@ -136,6 +136,8 @@ public:
NamesAndTypesList getVirtuals() const override;
bool supportsPartitionBy() const override;
private:
friend class StorageS3Cluster;

View File

@ -146,6 +146,59 @@ def test_put(started_cluster, maybe_auth, positive, compression):
assert values_csv == get_s3_file_content(started_cluster, bucket, filename)
# Inserts three rows through the s3 table function with PARTITION BY column3 and checks that
# every distinct column3 value ended up in its own object, whose name is the key template
# with {_partition_id} replaced by that value.
def test_partition_by(started_cluster):
bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"] # type: ClickHouseInstance
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
partition_by = "column3"
values = "(1, 2, 3), (3, 2, 1), (78, 43, 45)"
# Key template: the {_partition_id} wildcard is substituted per partition on the server side.
filename = "test_{_partition_id}.csv"
put_query = f"""INSERT INTO TABLE FUNCTION
s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{filename}', 'CSV', '{table_format}')
PARTITION BY {partition_by} VALUES {values}"""
run_query(instance, put_query)
# column3 takes the values 3, 1 and 45, so exactly one row is expected in each object.
assert "1,2,3\n" == get_s3_file_content(started_cluster, bucket, "test_3.csv")
assert "3,2,1\n" == get_s3_file_content(started_cluster, bucket, "test_1.csv")
assert "78,43,45\n" == get_s3_file_content(started_cluster, bucket, "test_45.csv")
# Same scenario as a numeric partition column, but partitioning by a String column whose
# values contain a slash and non-ASCII (Cyrillic, Chinese) text; each value is expected to
# appear verbatim inside the resulting object key.
def test_partition_by_string_column(started_cluster):
bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"] # type: ClickHouseInstance
table_format = "col_num UInt32, col_str String"
partition_by = "col_str"
values = "(1, 'foo/bar'), (3, 'йцук'), (78, '你好')"
filename = "test_{_partition_id}.csv"
put_query = f"""INSERT INTO TABLE FUNCTION
s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{filename}', 'CSV', '{table_format}')
PARTITION BY {partition_by} VALUES {values}"""
run_query(instance, put_query)
# 'foo/bar' is accepted because the wildcard is in the key, where '/' is allowed.
assert '1,"foo/bar"\n' == get_s3_file_content(started_cluster, bucket, "test_foo/bar.csv")
assert '3,"йцук"\n' == get_s3_file_content(started_cluster, bucket, "test_йцук.csv")
assert '78,"你好"\n' == get_s3_file_content(started_cluster, bucket, "test_你好.csv")
# Partitions by the constant expression '88': all rows share one partition id, so a single
# object test_88.csv containing every inserted row is expected.
def test_partition_by_const_column(started_cluster):
bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"] # type: ClickHouseInstance
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
values = "(1, 2, 3), (3, 2, 1), (78, 43, 45)"
# A quoted literal rather than a column name.
partition_by = "'88'"
values_csv = "1,2,3\n3,2,1\n78,43,45\n"
filename = "test_{_partition_id}.csv"
put_query = f"""INSERT INTO TABLE FUNCTION
s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{filename}', 'CSV', '{table_format}')
PARTITION BY {partition_by} VALUES {values}"""
run_query(instance, put_query)
assert values_csv == get_s3_file_content(started_cluster, bucket, "test_88.csv")
@pytest.mark.parametrize("special", [
"space",
"plus"

View File

@ -0,0 +1,9 @@
-- Negative tests for INSERT ... PARTITION BY: every statement is expected to fail with the error named in its trailing hint.

-- The file() table function is expected to reject PARTITION BY.
INSERT INTO TABLE FUNCTION file('foo.csv', 'CSV', 'id Int32, val Int32') PARTITION BY val VALUES (1, 1), (2, 2); -- { serverError NOT_IMPLEMENTED }
-- Partition id substituted into the key: control characters (bytes below 0x20) are illegal.
INSERT INTO TABLE FUNCTION s3('http://localhost:9001/foo/test_{_partition_id}.csv', 'admin', 'admin', 'CSV', 'id Int32, val String') PARTITION BY val VALUES (1, '\r\n'); -- { serverError CANNOT_PARSE_TEXT }
INSERT INTO TABLE FUNCTION s3('http://localhost:9001/foo/test_{_partition_id}.csv', 'admin', 'admin', 'CSV', 'id Int32, val String') PARTITION BY val VALUES (1, 'abc\x00abc'); -- { serverError CANNOT_PARSE_TEXT }
-- \xc3 must be followed by a continuation byte (80..BF); \x28 is not one, so the key is not valid UTF-8.
INSERT INTO TABLE FUNCTION s3('http://localhost:9001/foo/test_{_partition_id}.csv', 'admin', 'admin', 'CSV', 'id Int32, val String') PARTITION BY val VALUES (1, 'abc\xc3\x28abc'); -- { serverError CANNOT_PARSE_TEXT }
-- Curly braces and '*' are illegal in the substituted key.
INSERT INTO TABLE FUNCTION s3('http://localhost:9001/foo/test_{_partition_id}.csv', 'admin', 'admin', 'CSV', 'id Int32, val String') PARTITION BY val VALUES (1, 'abc}{abc'); -- { serverError CANNOT_PARSE_TEXT }
INSERT INTO TABLE FUNCTION s3('http://localhost:9001/foo/test_{_partition_id}.csv', 'admin', 'admin', 'CSV', 'id Int32, val String') PARTITION BY val VALUES (1, 'abc*abc'); -- { serverError CANNOT_PARSE_TEXT }
-- An empty partition id makes the whole key empty.
INSERT INTO TABLE FUNCTION s3('http://localhost:9001/foo/{_partition_id}', 'admin', 'admin', 'CSV', 'id Int32, val String') PARTITION BY val VALUES (1, ''); -- { serverError BAD_ARGUMENTS }
-- Partition id substituted into the bucket: an empty id gives a bucket name shorter than 3 characters.
INSERT INTO TABLE FUNCTION s3('http://localhost:9001/{_partition_id}/key.csv', 'admin', 'admin', 'CSV', 'id Int32, val String') PARTITION BY val VALUES (1, ''); -- { serverError BAD_ARGUMENTS }
-- '/' is allowed in keys but not in bucket names.
INSERT INTO TABLE FUNCTION s3('http://localhost:9001/{_partition_id}/key.csv', 'admin', 'admin', 'CSV', 'id Int32, val String') PARTITION BY val VALUES (1, 'aa/bb'); -- { serverError CANNOT_PARSE_TEXT }

View File

@ -267,3 +267,4 @@
01428_h3_range_check
01442_h3kring_range_check
01906_h3_to_geo
01944_insert_partition_by