Merge branch 'master' into fix-format-row

mergify[bot] 2021-11-02 21:56:44 +00:00 committed by GitHub
commit 1ba871a664
33 changed files with 892 additions and 522 deletions

View File

@ -86,7 +86,7 @@
# include "config_core.h"
# include "Common/config_version.h"
# if USE_OPENCL
# include "Common/BitonicSort.h" // Y_IGNORE
# include "Common/BitonicSort.h"
# endif
#endif

View File

@ -511,6 +511,8 @@ class IColumn;
M(Bool, local_filesystem_read_prefetch, false, "Should use prefetching when reading data from local filesystem.", 0) \
M(Bool, remote_filesystem_read_prefetch, true, "Should use prefetching when reading data from remote filesystem.", 0) \
M(Int64, read_priority, 0, "Priority to read data from local filesystem. Only supported for 'pread_threadpool' method.", 0) \
M(UInt64, merge_tree_min_rows_for_concurrent_read_for_remote_filesystem, (20 * 8192), "If at least this many rows are read from one file, reading can be parallelized when reading from a remote filesystem.", 0) \
M(UInt64, merge_tree_min_bytes_for_concurrent_read_for_remote_filesystem, (24 * 10 * 1024 * 1024), "If at least this many bytes are read from one file, reading can be parallelized when reading from a remote filesystem.", 0) \
\
M(UInt64, async_insert_threads, 16, "Maximum number of threads to actually parse and insert data in background. Zero means asynchronous mode is disabled", 0) \
M(Bool, async_insert, false, "If true, data from INSERT query is stored in a queue and later flushed to the table in the background. Makes sense only for inserts via HTTP protocol. If wait_for_async_insert is false, the INSERT query is processed almost instantly; otherwise the client will wait until the data is flushed to the table", 0) \
@ -594,7 +596,7 @@ class IColumn;
M(String, output_format_avro_codec, "", "Compression codec used for output. Possible values: 'null', 'deflate', 'snappy'.", 0) \
M(UInt64, output_format_avro_sync_interval, 16 * 1024, "Sync interval in bytes.", 0) \
M(String, output_format_avro_string_column_pattern, "", "For Avro format: regexp of String columns to select as AVRO string.", 0) \
M(UInt64, output_format_avro_rows_in_file, 1000000, "Max rows in a file (if permitted by storage)", 0) \
M(UInt64, output_format_avro_rows_in_file, 1, "Max rows in a file (if permitted by storage)", 0) \
M(Bool, output_format_tsv_crlf_end_of_line, false, "If it is set true, end of line in TSV format will be \\r\\n instead of \\n.", 0) \
M(String, output_format_csv_null_representation, "\\N", "Custom NULL representation in CSV format", 0) \
M(String, output_format_tsv_null_representation, "\\N", "Custom NULL representation in TSV format", 0) \
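
Note: a minimal, hedged sketch of how the two remote-filesystem thresholds introduced in this hunk are used (the table name is hypothetical); like any other setting they can be changed per session or per query:

SET merge_tree_min_rows_for_concurrent_read_for_remote_filesystem = 163840;      -- default above: 20 * 8192
SET merge_tree_min_bytes_for_concurrent_read_for_remote_filesystem = 251658240;  -- default above: 24 * 10 * 1024 * 1024
SELECT count() FROM s3_backed_table
SETTINGS merge_tree_min_rows_for_concurrent_read_for_remote_filesystem = 327680;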

View File

@ -66,7 +66,7 @@ struct FormatSettings
UInt64 output_sync_interval = 16 * 1024;
bool allow_missing_fields = false;
String string_column_pattern;
UInt64 output_rows_in_file = 1000000;
UInt64 output_rows_in_file = 1;
} avro;
struct CSV

View File

@ -244,6 +244,16 @@ struct PartRangesReadInfo
bool use_uncompressed_cache = false;
static bool checkAllPartsOnRemoteFS(const RangesInDataParts & parts)
{
for (const auto & part : parts)
{
if (!part.data_part->isStoredOnRemoteDisk())
return false;
}
return true;
}
PartRangesReadInfo(
const RangesInDataParts & parts,
const Settings & settings,
@ -270,9 +280,12 @@ struct PartRangesReadInfo
data_settings.index_granularity,
index_granularity_bytes);
auto all_parts_on_remote_disk = checkAllPartsOnRemoteFS(parts);
min_marks_for_concurrent_read = MergeTreeDataSelectExecutor::minMarksForConcurrentRead(
settings.merge_tree_min_rows_for_concurrent_read,
settings.merge_tree_min_bytes_for_concurrent_read,
all_parts_on_remote_disk ? settings.merge_tree_min_rows_for_concurrent_read_for_remote_filesystem
: settings.merge_tree_min_rows_for_concurrent_read,
all_parts_on_remote_disk ? settings.merge_tree_min_bytes_for_concurrent_read_for_remote_filesystem
: settings.merge_tree_min_bytes_for_concurrent_read,
data_settings.index_granularity,
index_granularity_bytes,
sum_marks);

View File

@ -8,6 +8,9 @@ namespace DB
/// Sink which is returned from Storage::write.
class SinkToStorage : public ExceptionKeepingTransform
{
/// PartitionedSink owns nested sinks.
friend class PartitionedSink;
public:
explicit SinkToStorage(const Block & header);
@ -35,4 +38,5 @@ public:
void consume(Chunk) override {}
};
using SinkPtr = std::shared_ptr<SinkToStorage>;
}

View File

@ -259,6 +259,7 @@ void URLBasedDataSourceConfiguration::set(const URLBasedDataSourceConfiguration
format = conf.format;
compression_method = conf.compression_method;
structure = conf.structure;
http_method = conf.http_method;
}
@ -286,6 +287,18 @@ std::optional<URLBasedDataSourceConfig> getURLBasedDataSourceConfiguration(const
{
configuration.url = config.getString(config_prefix + ".url", "");
}
else if (key == "method")
{
configuration.http_method = config.getString(config_prefix + ".method", "");
}
else if (key == "format")
{
configuration.format = config.getString(config_prefix + ".format", "");
}
else if (key == "structure")
{
configuration.structure = config.getString(config_prefix + ".structure", "");
}
else if (key == "headers")
{
Poco::Util::AbstractConfiguration::Keys header_keys;
@ -319,6 +332,8 @@ std::optional<URLBasedDataSourceConfig> getURLBasedDataSourceConfiguration(const
if (arg_name == "url")
configuration.url = arg_value.safeGet<String>();
else if (arg_name == "method")
configuration.http_method = arg_value.safeGet<String>();
else if (arg_name == "format")
configuration.format = arg_value.safeGet<String>();
else if (arg_name == "compression_method")

View File

@ -93,6 +93,7 @@ struct URLBasedDataSourceConfiguration
String structure;
std::vector<std::pair<String, Field>> headers;
String http_method;
void set(const URLBasedDataSourceConfiguration & conf);
};

View File

@ -2,30 +2,44 @@
#if USE_HDFS
#include <Storages/StorageFactory.h>
#include <Storages/HDFS/StorageHDFS.h>
#include <Common/parseGlobs.h>
#include <DataTypes/DataTypeString.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTInsertQuery.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Processors/Formats/IOutputFormat.h>
#include <Processors/Sources/SourceWithProgress.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Formats/IInputFormat.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <Interpreters/Context.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTLiteral.h>
#include <IO/ReadHelpers.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/TreeRewriter.h>
#include <Storages/StorageFactory.h>
#include <Storages/HDFS/StorageHDFS.h>
#include <Storages/HDFS/HDFSCommon.h>
#include <Storages/HDFS/ReadBufferFromHDFS.h>
#include <Storages/HDFS/WriteBufferFromHDFS.h>
#include <IO/WriteHelpers.h>
#include <Storages/HDFS/HDFSCommon.h>
#include <Storages/PartitionedSink.h>
#include <Formats/FormatFactory.h>
#include <Processors/Formats/IOutputFormat.h>
#include <DataTypes/DataTypeString.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Common/parseGlobs.h>
#include <Functions/FunctionsConversion.h>
#include <QueryPipeline/QueryPipeline.h>
#include <QueryPipeline/Pipe.h>
#include <Poco/URI.h>
#include <re2/re2.h>
#include <re2/stringpiece.h>
#include <hdfs/hdfs.h>
#include <Processors/Sources/SourceWithProgress.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Formats/IInputFormat.h>
#include <QueryPipeline/QueryPipeline.h>
#include <QueryPipeline/Pipe.h>
#include <filesystem>
@ -47,8 +61,10 @@ StorageHDFS::StorageHDFS(
const ConstraintsDescription & constraints_,
const String & comment,
ContextPtr context_,
const String & compression_method_ = "")
const String & compression_method_ = "",
ASTPtr partition_by_)
: IStorage(table_id_), WithContext(context_), uri(uri_), format_name(format_name_), compression_method(compression_method_)
, partition_by(partition_by_)
{
context_->getRemoteHostFilter().checkURL(Poco::URI(uri));
@ -224,6 +240,43 @@ private:
bool is_first_chunk = true;
};
class PartitionedHDFSSink : public PartitionedSink
{
public:
PartitionedHDFSSink(
const ASTPtr & partition_by,
const String & uri_,
const String & format_,
const Block & sample_block_,
ContextPtr context_,
const CompressionMethod compression_method_)
: PartitionedSink(partition_by, context_, sample_block_)
, uri(uri_)
, format(format_)
, sample_block(sample_block_)
, context(context_)
, compression_method(compression_method_)
{
}
SinkPtr createSinkForPartition(const String & partition_id) override
{
auto path = PartitionedSink::replaceWildcards(uri, partition_id);
PartitionedSink::validatePartitionKey(path, true);
return std::make_shared<HDFSSink>(path, format, sample_block, context, compression_method);
}
private:
const String uri;
const String format;
const Block sample_block;
ContextPtr context;
const CompressionMethod compression_method;
};
/* Recursive directory listing with matched paths as a result.
* Have the same method in StorageFile.
*/
@ -315,13 +368,31 @@ Pipe StorageHDFS::read(
return Pipe::unitePipes(std::move(pipes));
}
SinkToStoragePtr StorageHDFS::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr /*context*/)
SinkToStoragePtr StorageHDFS::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr /*context*/)
{
return std::make_shared<HDFSSink>(uri,
format_name,
metadata_snapshot->getSampleBlock(),
getContext(),
chooseCompressionMethod(uri, compression_method));
bool has_wildcards = uri.find(PartitionedSink::PARTITION_ID_WILDCARD) != String::npos;
const auto * insert_query = dynamic_cast<const ASTInsertQuery *>(query.get());
auto partition_by_ast = insert_query ? (insert_query->partition_by ? insert_query->partition_by : partition_by) : nullptr;
bool is_partitioned_implementation = partition_by_ast && has_wildcards;
if (is_partitioned_implementation)
{
return std::make_shared<PartitionedHDFSSink>(
partition_by_ast,
uri,
format_name,
metadata_snapshot->getSampleBlock(),
getContext(),
chooseCompressionMethod(uri, compression_method));
}
else
{
return std::make_shared<HDFSSink>(uri,
format_name,
metadata_snapshot->getSampleBlock(),
getContext(),
chooseCompressionMethod(uri, compression_method));
}
}
void StorageHDFS::truncate(const ASTPtr & /* query */, const StorageMetadataPtr &, ContextPtr context_, TableExclusiveLockHolder &)
@ -364,10 +435,15 @@ void registerStorageHDFS(StorageFactory & factory)
compression_method = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>();
} else compression_method = "auto";
ASTPtr partition_by;
if (args.storage_def->partition_by)
partition_by = args.storage_def->partition_by->clone();
return StorageHDFS::create(
url, args.table_id, format_name, args.columns, args.constraints, args.comment, args.getContext(), compression_method);
url, args.table_id, format_name, args.columns, args.constraints, args.comment, args.getContext(), compression_method, partition_by);
},
{
.supports_sort_order = true, // for partition by
.source_access_type = AccessType::HDFS,
});
}

View File

@ -36,6 +36,8 @@ public:
NamesAndTypesList getVirtuals() const override;
bool supportsPartitionBy() const override { return true; }
protected:
StorageHDFS(
const String & uri_,
@ -45,12 +47,14 @@ protected:
const ConstraintsDescription & constraints_,
const String & comment,
ContextPtr context_,
const String & compression_method_);
const String & compression_method_,
ASTPtr partition_by = nullptr);
private:
const String uri;
String format_name;
String compression_method;
ASTPtr partition_by;
Poco::Logger * log = &Poco::Logger::get("StorageHDFS");
};
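
Note: a hedged usage sketch of the engine-level PARTITION BY support added above (the namenode address and table name are hypothetical); the {_partition_id} wildcard in the URI is replaced with the partition id of each row group:

CREATE TABLE hdfs_partitioned (x UInt64, s String)
ENGINE = HDFS('hdfs://namenode:9000/data/file_{_partition_id}.tsv', 'TSV')
PARTITION BY x;

INSERT INTO hdfs_partitioned VALUES (1, 'a'), (2, 'b');
-- expected result: file_1.tsv and file_2.tsv on HDFS; without the wildcard the write falls back to the single-file HDFSSink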

View File

@ -0,0 +1,128 @@
#include "PartitionedSink.h"
#include <Functions/FunctionsConversion.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/TreeRewriter.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTLiteral.h>
#include <Processors/Sources/SourceWithProgress.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_PARSE_TEXT;
}
PartitionedSink::PartitionedSink(
const ASTPtr & partition_by,
ContextPtr context_,
const Block & sample_block_)
: SinkToStorage(sample_block_)
, context(context_)
, sample_block(sample_block_)
{
std::vector<ASTPtr> arguments(1, partition_by);
ASTPtr partition_by_string = makeASTFunction(FunctionToString::name, std::move(arguments));
auto syntax_result = TreeRewriter(context).analyze(partition_by_string, sample_block.getNamesAndTypesList());
partition_by_expr = ExpressionAnalyzer(partition_by_string, syntax_result, context).getActions(false);
partition_by_column_name = partition_by_string->getColumnName();
}
SinkPtr PartitionedSink::getSinkForPartition(const String & partition_id)
{
auto it = sinks.find(partition_id);
if (it == sinks.end())
{
auto sink = createSinkForPartition(partition_id);
std::tie(it, std::ignore) = sinks.emplace(partition_id, sink);
}
return it->second;
}
void PartitionedSink::consume(Chunk chunk)
{
const auto & columns = chunk.getColumns();
Block block_with_partition_by_expr = sample_block.cloneWithoutColumns();
block_with_partition_by_expr.setColumns(columns);
partition_by_expr->execute(block_with_partition_by_expr);
const auto * column = block_with_partition_by_expr.getByName(partition_by_column_name).column.get();
std::unordered_map<String, size_t> sub_chunks_indices;
IColumn::Selector selector;
for (size_t row = 0; row < chunk.getNumRows(); ++row)
{
auto value = column->getDataAt(row);
auto [it, inserted] = sub_chunks_indices.emplace(value, sub_chunks_indices.size());
selector.push_back(it->second);
}
Chunks sub_chunks;
sub_chunks.reserve(sub_chunks_indices.size());
for (size_t column_index = 0; column_index < columns.size(); ++column_index)
{
MutableColumns column_sub_chunks = columns[column_index]->scatter(sub_chunks_indices.size(), selector);
if (column_index == 0) /// Set sizes for sub-chunks.
{
for (const auto & column_sub_chunk : column_sub_chunks)
{
sub_chunks.emplace_back(Columns(), column_sub_chunk->size());
}
}
for (size_t sub_chunk_index = 0; sub_chunk_index < column_sub_chunks.size(); ++sub_chunk_index)
{
sub_chunks[sub_chunk_index].addColumn(std::move(column_sub_chunks[sub_chunk_index]));
}
}
for (const auto & [partition_id, sub_chunk_index] : sub_chunks_indices)
{
getSinkForPartition(partition_id)->consume(std::move(sub_chunks[sub_chunk_index]));
}
}
void PartitionedSink::onFinish()
{
for (auto & [partition_id, sink] : sinks)
{
sink->onFinish();
}
}
void PartitionedSink::validatePartitionKey(const String & str, bool allow_slash)
{
for (const char * i = str.data(); i != str.data() + str.size(); ++i)
{
if (static_cast<UInt8>(*i) < 0x20 || *i == '{' || *i == '}' || *i == '*' || *i == '?' || (!allow_slash && *i == '/'))
{
/// Need to convert to UInt32 because UInt8 can't be passed to format due to "mixing character types is disallowed".
UInt32 invalid_char_byte = static_cast<UInt32>(static_cast<UInt8>(*i));
throw DB::Exception(
ErrorCodes::CANNOT_PARSE_TEXT, "Illegal character '\\x{:02x}' in partition id starting with '{}'",
invalid_char_byte, std::string(str.data(), i - str.data()));
}
}
}
String PartitionedSink::replaceWildcards(const String & haystack, const String & partition_id)
{
return boost::replace_all_copy(haystack, PartitionedSink::PARTITION_ID_WILDCARD, partition_id);
}
}

View File

@ -0,0 +1,42 @@
#pragma once
#include <Processors/Sinks/SinkToStorage.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/Context_fwd.h>
namespace DB
{
class PartitionedSink : public SinkToStorage
{
public:
static constexpr auto PARTITION_ID_WILDCARD = "{_partition_id}";
PartitionedSink(const ASTPtr & partition_by, ContextPtr context_, const Block & sample_block_);
String getName() const override { return "PartitionedSink"; }
void consume(Chunk chunk) override;
void onFinish() override;
virtual SinkPtr createSinkForPartition(const String & partition_id) = 0;
static void validatePartitionKey(const String & str, bool allow_slash);
static String replaceWildcards(const String & haystack, const String & partition_id);
private:
ContextPtr context;
Block sample_block;
ExpressionActionsPtr partition_by_expr;
String partition_by_column_name;
std::unordered_map<String, SinkPtr> sinks;
SinkPtr getSinkForPartition(const String & partition_id);
};
}
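
Note: the partition id that replaces {_partition_id} is the toString() of the PARTITION BY expression, evaluated per row (see the FunctionToString wrapper in the constructor above); rows with the same id share one child sink. A quick illustration of the id computation:

SELECT toString(toYYYYMM(toDate('2021-11-02')));   -- returns '202111', so such a row is routed to the '202111' sink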

View File

@ -159,9 +159,8 @@ StorageExternalDistributed::StorageExternalDistributed(
}
else
{
Poco::URI uri(url_description);
shard = std::make_shared<StorageURL>(
uri, table_id, format_name, format_settings, columns, constraints, String{}, context, compression_method);
url_description, table_id, format_name, format_settings, columns, constraints, String{}, context, compression_method);
LOG_DEBUG(&Poco::Logger::get("StorageURLDistributed"), "Adding URL: {}", url_description);
}

View File

@ -7,6 +7,7 @@
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTInsertQuery.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadBufferFromFileDescriptor.h>
@ -25,6 +26,7 @@
#include <Common/filesystemHelpers.h>
#include <Storages/ColumnsDescription.h>
#include <Storages/StorageInMemoryMetadata.h>
#include <Storages/PartitionedSink.h>
#include <sys/stat.h>
#include <fcntl.h>
@ -60,6 +62,7 @@ namespace ErrorCodes
extern const int TIMEOUT_EXCEEDED;
extern const int INCOMPATIBLE_COLUMNS;
extern const int CANNOT_STAT;
extern const int LOGICAL_ERROR;
}
namespace
@ -131,6 +134,7 @@ void checkCreationIsAllowed(ContextPtr context_global, const std::string & db_di
if (fs::exists(table_path) && fs::is_directory(table_path))
throw Exception("File must not be a directory", ErrorCodes::INCORRECT_FILE_NAME);
}
}
Strings StorageFile::getPathsList(const String & table_path, const String & user_files_path, ContextPtr context, size_t & total_bytes_to_read)
@ -190,6 +194,7 @@ StorageFile::StorageFile(const std::string & table_path_, const std::string & us
{
is_db_table = false;
paths = getPathsList(table_path_, user_files_path, args.getContext(), total_bytes_to_read);
path_for_partitioned_write = table_path_;
if (args.format_name == "Distributed")
{
@ -541,33 +546,79 @@ Pipe StorageFile::read(
class StorageFileSink final : public SinkToStorage
{
public:
explicit StorageFileSink(
StorageFile & storage_,
StorageFileSink(
const StorageMetadataPtr & metadata_snapshot_,
std::unique_lock<std::shared_timed_mutex> && lock_,
const CompressionMethod compression_method,
ContextPtr context,
const std::optional<FormatSettings> & format_settings,
int & flags)
const String & table_name_for_log_,
int table_fd_,
bool use_table_fd_,
std::string base_path_,
std::vector<std::string> paths_,
const CompressionMethod compression_method_,
const std::optional<FormatSettings> & format_settings_,
const String format_name_,
ContextPtr context_,
int flags_)
: SinkToStorage(metadata_snapshot_->getSampleBlock())
, storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, table_name_for_log(table_name_for_log_)
, table_fd(table_fd_)
, use_table_fd(use_table_fd_)
, base_path(base_path_)
, paths(paths_)
, compression_method(compression_method_)
, format_name(format_name_)
, format_settings(format_settings_)
, context(context_)
, flags(flags_)
{
initialize();
}
StorageFileSink(
const StorageMetadataPtr & metadata_snapshot_,
const String & table_name_for_log_,
std::unique_lock<std::shared_timed_mutex> && lock_,
int table_fd_,
bool use_table_fd_,
std::string base_path_,
std::vector<std::string> paths_,
const CompressionMethod compression_method_,
const std::optional<FormatSettings> & format_settings_,
const String format_name_,
ContextPtr context_,
int flags_)
: SinkToStorage(metadata_snapshot_->getSampleBlock())
, metadata_snapshot(metadata_snapshot_)
, table_name_for_log(table_name_for_log_)
, table_fd(table_fd_)
, use_table_fd(use_table_fd_)
, base_path(base_path_)
, paths(paths_)
, compression_method(compression_method_)
, format_name(format_name_)
, format_settings(format_settings_)
, context(context_)
, flags(flags_)
, lock(std::move(lock_))
{
if (!lock)
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
initialize();
}
void initialize()
{
std::unique_ptr<WriteBufferFromFileDescriptor> naked_buffer = nullptr;
if (storage.use_table_fd)
if (use_table_fd)
{
naked_buffer = std::make_unique<WriteBufferFromFileDescriptor>(storage.table_fd, DBMS_DEFAULT_BUFFER_SIZE);
naked_buffer = std::make_unique<WriteBufferFromFileDescriptor>(table_fd, DBMS_DEFAULT_BUFFER_SIZE);
}
else
{
if (storage.paths.size() != 1)
throw Exception("Table '" + storage.getStorageID().getNameForLogs() + "' is in readonly mode because of globs in filepath", ErrorCodes::DATABASE_ACCESS_DENIED);
if (paths.size() != 1)
throw Exception("Table '" + table_name_for_log + "' is in readonly mode because of globs in filepath", ErrorCodes::DATABASE_ACCESS_DENIED);
flags |= O_WRONLY | O_APPEND | O_CREAT;
naked_buffer = std::make_unique<WriteBufferFromFile>(storage.paths[0], DBMS_DEFAULT_BUFFER_SIZE, flags);
naked_buffer = std::make_unique<WriteBufferFromFile>(paths[0], DBMS_DEFAULT_BUFFER_SIZE, flags);
}
/// In case of CSVWithNames we have already written prefix.
@ -576,7 +627,7 @@ public:
write_buf = wrapWriteBufferWithCompressionMethod(std::move(naked_buffer), compression_method, 3);
writer = FormatFactory::instance().getOutputFormatParallelIfPossible(storage.format_name,
writer = FormatFactory::instance().getOutputFormatParallelIfPossible(format_name,
*write_buf, metadata_snapshot->getSampleBlock(), context,
{}, format_settings);
}
@ -606,16 +657,93 @@ public:
// }
private:
StorageFile & storage;
StorageMetadataPtr metadata_snapshot;
std::unique_lock<std::shared_timed_mutex> lock;
String table_name_for_log;
std::unique_ptr<WriteBuffer> write_buf;
OutputFormatPtr writer;
bool prefix_written{false};
int table_fd;
bool use_table_fd;
std::string base_path;
std::vector<std::string> paths;
CompressionMethod compression_method;
std::string format_name;
std::optional<FormatSettings> format_settings;
ContextPtr context;
int flags;
std::unique_lock<std::shared_timed_mutex> lock;
};
class PartitionedStorageFileSink : public PartitionedSink
{
public:
PartitionedStorageFileSink(
const ASTPtr & partition_by,
const StorageMetadataPtr & metadata_snapshot_,
const String & table_name_for_log_,
std::unique_lock<std::shared_timed_mutex> && lock_,
String base_path_,
String path_,
const CompressionMethod compression_method_,
const std::optional<FormatSettings> & format_settings_,
const String format_name_,
ContextPtr context_,
int flags_)
: PartitionedSink(partition_by, context_, metadata_snapshot_->getSampleBlock())
, path(path_)
, metadata_snapshot(metadata_snapshot_)
, table_name_for_log(table_name_for_log_)
, base_path(base_path_)
, compression_method(compression_method_)
, format_name(format_name_)
, format_settings(format_settings_)
, context(context_)
, flags(flags_)
, lock(std::move(lock_))
{
}
SinkPtr createSinkForPartition(const String & partition_id) override
{
auto partition_path = PartitionedSink::replaceWildcards(path, partition_id);
PartitionedSink::validatePartitionKey(partition_path, true);
Strings result_paths = {partition_path};
checkCreationIsAllowed(context, context->getUserFilesPath(), partition_path);
return std::make_shared<StorageFileSink>(
metadata_snapshot,
table_name_for_log,
-1,
/* use_table_fd */false,
base_path,
result_paths,
compression_method,
format_settings,
format_name,
context,
flags);
}
private:
const String path;
StorageMetadataPtr metadata_snapshot;
String table_name_for_log;
std::string base_path;
CompressionMethod compression_method;
std::string format_name;
std::optional<FormatSettings> format_settings;
ContextPtr context;
int flags;
std::unique_lock<std::shared_timed_mutex> lock;
};
SinkToStoragePtr StorageFile::write(
const ASTPtr & /*query*/,
const ASTPtr & query,
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context)
{
@ -628,20 +756,51 @@ SinkToStoragePtr StorageFile::write(
if (context->getSettingsRef().engine_file_truncate_on_insert)
flags |= O_TRUNC;
if (!paths.empty())
{
path = paths[0];
fs::create_directories(fs::path(path).parent_path());
}
bool has_wildcards = path_for_partitioned_write.find(PartitionedSink::PARTITION_ID_WILDCARD) != String::npos;
const auto * insert_query = dynamic_cast<const ASTInsertQuery *>(query.get());
bool is_partitioned_implementation = insert_query && insert_query->partition_by && has_wildcards;
return std::make_shared<StorageFileSink>(
*this,
metadata_snapshot,
std::unique_lock{rwlock, getLockTimeout(context)},
chooseCompressionMethod(path, compression_method),
context,
format_settings,
flags);
if (is_partitioned_implementation)
{
if (path_for_partitioned_write.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Empty path for partitioned write");
fs::create_directories(fs::path(path_for_partitioned_write).parent_path());
return std::make_shared<PartitionedStorageFileSink>(
insert_query->partition_by,
metadata_snapshot,
getStorageID().getNameForLogs(),
std::unique_lock{rwlock, getLockTimeout(context)},
base_path,
path_for_partitioned_write,
chooseCompressionMethod(path, compression_method),
format_settings,
format_name,
context,
flags);
}
else
{
if (!paths.empty())
{
path = paths[0];
fs::create_directories(fs::path(path).parent_path());
}
return std::make_shared<StorageFileSink>(
metadata_snapshot,
getStorageID().getNameForLogs(),
std::unique_lock{rwlock, getLockTimeout(context)},
table_fd,
use_table_fd,
base_path,
paths,
chooseCompressionMethod(path, compression_method),
format_settings,
format_name,
context,
flags);
}
}
bool StorageFile::storesDataOnDisk() const

View File

@ -16,7 +16,9 @@ class StorageFileBlockOutputStream;
class StorageFile final : public shared_ptr_helper<StorageFile>, public IStorage
{
friend struct shared_ptr_helper<StorageFile>;
friend struct shared_ptr_helper<StorageFile>;
friend class PartitionedStorageFileSink;
public:
std::string getName() const override { return "File"; }
@ -66,6 +68,8 @@ public:
/// format to read only them. Note: this hack cannot be done with ordinary formats like TSV.
bool isColumnOriented() const;
bool supportsPartitionBy() const override { return true; }
protected:
friend class StorageFileSource;
friend class StorageFileSink;
@ -104,6 +108,8 @@ private:
/// Total number of bytes to read (sums for multiple files in case of globs). Needed for progress bar.
size_t total_bytes_to_read = 0;
String path_for_partitioned_write;
};
}
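
Note: a hedged sketch of the partitioned write now supported by StorageFile (path and structure are hypothetical); the path must contain the {_partition_id} wildcard and the INSERT must carry PARTITION BY, otherwise the regular single-file StorageFileSink is used:

INSERT INTO TABLE FUNCTION file('data_{_partition_id}.csv', 'CSV', 'id UInt32, name String')
PARTITION BY id
VALUES (1, 'a'), (2, 'b');
-- expected result: data_1.csv and data_2.csv under the user_files directory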

View File

@ -22,6 +22,7 @@
#include <Storages/StorageFactory.h>
#include <Storages/StorageS3.h>
#include <Storages/StorageS3Settings.h>
#include <Storages/PartitionedSink.h>
#include <IO/ReadBufferFromS3.h>
#include <IO/ReadHelpers.h>
@ -353,7 +354,7 @@ private:
};
class PartitionedStorageS3Sink : public SinkToStorage
class PartitionedStorageS3Sink : public PartitionedSink
{
public:
PartitionedStorageS3Sink(
@ -368,7 +369,7 @@ public:
const String & key_,
size_t min_upload_part_size_,
size_t max_single_part_upload_size_)
: SinkToStorage(sample_block_)
: PartitionedSink(partition_by, context_, sample_block_)
, format(format_)
, sample_block(sample_block_)
, context(context_)
@ -380,74 +381,36 @@ public:
, max_single_part_upload_size(max_single_part_upload_size_)
, format_settings(format_settings_)
{
std::vector<ASTPtr> arguments(1, partition_by);
ASTPtr partition_by_string = makeASTFunction(FunctionToString::name, std::move(arguments));
auto syntax_result = TreeRewriter(context).analyze(partition_by_string, sample_block.getNamesAndTypesList());
partition_by_expr = ExpressionAnalyzer(partition_by_string, syntax_result, context).getActions(false);
partition_by_column_name = partition_by_string->getColumnName();
}
String getName() const override { return "PartitionedStorageS3Sink"; }
void consume(Chunk chunk) override
SinkPtr createSinkForPartition(const String & partition_id) override
{
const auto & columns = chunk.getColumns();
auto partition_bucket = replaceWildcards(bucket, partition_id);
validateBucket(partition_bucket);
Block block_with_partition_by_expr = sample_block.cloneWithoutColumns();
block_with_partition_by_expr.setColumns(columns);
partition_by_expr->execute(block_with_partition_by_expr);
auto partition_key = replaceWildcards(key, partition_id);
validateKey(partition_key);
const auto * column = block_with_partition_by_expr.getByName(partition_by_column_name).column.get();
std::unordered_map<String, size_t> sub_chunks_indices;
IColumn::Selector selector;
for (size_t row = 0; row < chunk.getNumRows(); ++row)
{
auto value = column->getDataAt(row);
auto [it, inserted] = sub_chunks_indices.emplace(value, sub_chunks_indices.size());
selector.push_back(it->second);
}
Chunks sub_chunks;
sub_chunks.reserve(sub_chunks_indices.size());
for (size_t column_index = 0; column_index < columns.size(); ++column_index)
{
MutableColumns column_sub_chunks = columns[column_index]->scatter(sub_chunks_indices.size(), selector);
if (column_index == 0) /// Set sizes for sub-chunks.
{
for (const auto & column_sub_chunk : column_sub_chunks)
{
sub_chunks.emplace_back(Columns(), column_sub_chunk->size());
}
}
for (size_t sub_chunk_index = 0; sub_chunk_index < column_sub_chunks.size(); ++sub_chunk_index)
{
sub_chunks[sub_chunk_index].addColumn(std::move(column_sub_chunks[sub_chunk_index]));
}
}
for (const auto & [partition_id, sub_chunk_index] : sub_chunks_indices)
{
getSinkForPartition(partition_id)->consume(std::move(sub_chunks[sub_chunk_index]));
}
}
void onFinish() override
{
for (auto & [partition_id, sink] : sinks)
{
sink->onFinish();
}
return std::make_shared<StorageS3Sink>(
format,
sample_block,
context,
format_settings,
compression_method,
client,
partition_bucket,
partition_key,
min_upload_part_size,
max_single_part_upload_size
);
}
private:
using SinkPtr = std::shared_ptr<StorageS3Sink>;
const String format;
const Block sample_block;
ContextPtr context;
const CompressionMethod compression_method;
std::shared_ptr<Aws::S3::S3Client> client;
const String bucket;
const String key;
@ -458,41 +421,6 @@ private:
ExpressionActionsPtr partition_by_expr;
String partition_by_column_name;
std::unordered_map<String, SinkPtr> sinks;
static String replaceWildcards(const String & haystack, const String & partition_id)
{
return boost::replace_all_copy(haystack, PARTITION_ID_WILDCARD, partition_id);
}
SinkPtr getSinkForPartition(const String & partition_id)
{
auto it = sinks.find(partition_id);
if (it == sinks.end())
{
auto partition_bucket = replaceWildcards(bucket, partition_id);
validateBucket(partition_bucket);
auto partition_key = replaceWildcards(key, partition_id);
validateKey(partition_key);
std::tie(it, std::ignore) = sinks.emplace(partition_id, std::make_shared<StorageS3Sink>(
format,
sample_block,
context,
format_settings,
compression_method,
client,
partition_bucket,
partition_key,
min_upload_part_size,
max_single_part_upload_size
));
}
return it->second;
}
static void validateBucket(const String & str)
{
S3::URI::validateBucket(str, {});
@ -517,21 +445,6 @@ private:
validatePartitionKey(str, true);
}
static void validatePartitionKey(const StringRef & str, bool allow_slash)
{
for (const char * i = str.data; i != str.data + str.size; ++i)
{
if (static_cast<UInt8>(*i) < 0x20 || *i == '{' || *i == '}' || *i == '*' || *i == '?' || (!allow_slash && *i == '/'))
{
/// Need to convert to UInt32 because UInt8 can't be passed to format due to "mixing character types is disallowed".
UInt32 invalid_char_byte = static_cast<UInt32>(static_cast<UInt8>(*i));
throw DB::Exception(
ErrorCodes::CANNOT_PARSE_TEXT, "Illegal character '\\x{:02x}' in partition id starting with '{}'",
invalid_char_byte, StringRef(str.data, i - str.data));
}
}
}
};
@ -551,7 +464,8 @@ StorageS3::StorageS3(
ContextPtr context_,
std::optional<FormatSettings> format_settings_,
const String & compression_method_,
bool distributed_processing_)
bool distributed_processing_,
ASTPtr partition_by_)
: IStorage(table_id_)
, client_auth{uri_, access_key_id_, secret_access_key_, max_connections_, {}, {}} /// Client and settings will be updated later
, format_name(format_name_)
@ -562,6 +476,7 @@ StorageS3::StorageS3(
, name(uri_.storage_name)
, distributed_processing(distributed_processing_)
, format_settings(format_settings_)
, partition_by(partition_by_)
{
context_->getGlobalContext()->getRemoteHostFilter().checkURL(uri_.uri);
StorageInMemoryMetadata storage_metadata;
@ -646,12 +561,13 @@ SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr
bool has_wildcards = client_auth.uri.bucket.find(PARTITION_ID_WILDCARD) != String::npos || client_auth.uri.key.find(PARTITION_ID_WILDCARD) != String::npos;
auto insert_query = std::dynamic_pointer_cast<ASTInsertQuery>(query);
bool is_partitioned_implementation = insert_query && insert_query->partition_by && has_wildcards;
auto partition_by_ast = insert_query ? (insert_query->partition_by ? insert_query->partition_by : partition_by) : nullptr;
bool is_partitioned_implementation = partition_by_ast && has_wildcards;
if (is_partitioned_implementation)
{
return std::make_shared<PartitionedStorageS3Sink>(
insert_query->partition_by,
partition_by_ast,
format_name,
sample_block,
local_context,
@ -833,6 +749,10 @@ void registerStorageS3Impl(const String & name, StorageFactory & factory)
auto max_single_part_upload_size = args.getLocalContext()->getSettingsRef().s3_max_single_part_upload_size;
auto max_connections = args.getLocalContext()->getSettingsRef().s3_max_connections;
ASTPtr partition_by;
if (args.storage_def->partition_by)
partition_by = args.storage_def->partition_by->clone();
return StorageS3::create(
s3_uri,
configuration.access_key_id,
@ -848,10 +768,13 @@ void registerStorageS3Impl(const String & name, StorageFactory & factory)
args.comment,
args.getContext(),
format_settings,
configuration.compression_method);
configuration.compression_method,
/* distributed_processing_ */false,
partition_by);
},
{
.supports_settings = true,
.supports_sort_order = true, // for partition by
.source_access_type = AccessType::S3,
});
}

View File

@ -118,7 +118,8 @@ public:
ContextPtr context_,
std::optional<FormatSettings> format_settings_,
const String & compression_method_ = "",
bool distributed_processing_ = false);
bool distributed_processing_ = false,
ASTPtr partition_by_ = nullptr);
String getName() const override
{
@ -169,6 +170,7 @@ private:
String name;
const bool distributed_processing;
std::optional<FormatSettings> format_settings;
ASTPtr partition_by;
static void updateClientAndAuthSettings(ContextPtr, ClientAuthentication &);
};
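
Note: a hedged sketch for the S3 changes above (bucket name is hypothetical); PARTITION BY can now come from the table definition, and a PARTITION BY on the INSERT itself takes precedence over it:

CREATE TABLE s3_partitioned (x UInt64, s String)
ENGINE = S3('https://my-bucket.s3.amazonaws.com/out_{_partition_id}.csv', 'CSV')
PARTITION BY x;

INSERT INTO s3_partitioned VALUES (1, 'a'), (2, 'b');
-- expected result: out_1.csv and out_2.csv uploaded to the bucket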

View File

@ -4,6 +4,7 @@
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTInsertQuery.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromHTTP.h>
@ -15,7 +16,9 @@
#include <Processors/Formats/IOutputFormat.h>
#include <Processors/Formats/IInputFormat.h>
#include <Common/parseRemoteDescription.h>
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <Storages/PartitionedSink.h>
#include <Poco/Net/HTTPRequest.h>
#include <Processors/Sources/SourceWithProgress.h>
@ -36,7 +39,7 @@ namespace ErrorCodes
IStorageURLBase::IStorageURLBase(
const Poco::URI & uri_,
const String & uri_,
ContextPtr /*context_*/,
const StorageID & table_id_,
const String & format_name_,
@ -45,8 +48,17 @@ IStorageURLBase::IStorageURLBase(
const ConstraintsDescription & constraints_,
const String & comment,
const String & compression_method_,
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers_)
: IStorage(table_id_), uri(uri_), compression_method(compression_method_), format_name(format_name_), format_settings(format_settings_), headers(headers_)
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers_,
const String & http_method_,
ASTPtr partition_by_)
: IStorage(table_id_)
, uri(uri_)
, compression_method(compression_method_)
, format_name(format_name_)
, format_settings(format_settings_)
, headers(headers_)
, http_method(http_method_)
, partition_by(partition_by_)
{
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns_);
@ -80,14 +92,15 @@ namespace
return headers;
}
class StorageURLSource : public SourceWithProgress
{
using URIParams = std::vector<std::pair<String, String>>;
public:
StorageURLSource(
const std::vector<Poco::URI> & uri_options,
const std::string & method,
const std::vector<String> & uri_options,
const std::string & http_method,
std::function<void(std::ostream &)> callback,
const String & format,
const std::optional<FormatSettings> & format_settings,
@ -109,7 +122,7 @@ namespace
WriteBufferFromOwnString error_message;
for (auto option = uri_options.begin(); option < uri_options.end(); ++option)
{
auto request_uri = *option;
auto request_uri = Poco::URI(*option);
for (const auto & [param, value] : params)
request_uri.addQueryParameter(param, value);
@ -118,7 +131,7 @@ namespace
read_buf = wrapReadBufferWithCompressionMethod(
std::make_unique<ReadWriteBufferFromHTTP>(
request_uri,
method,
http_method,
callback,
timeouts,
context->getSettingsRef().max_http_get_redirects,
@ -137,7 +150,7 @@ namespace
if (option == uri_options.end() - 1)
throw Exception(ErrorCodes::NETWORK_ERROR, "All uri options are unreachable. {}", error_message.str());
error_message << option->toString() << " error: " << getCurrentExceptionMessage(false) << "\n";
error_message << *option << " error: " << getCurrentExceptionMessage(false) << "\n";
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
@ -193,17 +206,18 @@ namespace
}
StorageURLSink::StorageURLSink(
const Poco::URI & uri,
const String & uri,
const String & format,
const std::optional<FormatSettings> & format_settings,
const Block & sample_block,
ContextPtr context,
const ConnectionTimeouts & timeouts,
const CompressionMethod compression_method)
const CompressionMethod compression_method,
const String & http_method)
: SinkToStorage(sample_block)
{
write_buf = wrapWriteBufferWithCompressionMethod(
std::make_unique<WriteBufferFromHTTP>(uri, Poco::Net::HTTPRequest::HTTP_POST, timeouts),
std::make_unique<WriteBufferFromHTTP>(Poco::URI(uri), http_method, timeouts),
compression_method, 3);
writer = FormatFactory::instance().getOutputFormat(format, *write_buf, sample_block,
context, {} /* write callback */, format_settings);
@ -228,6 +242,50 @@ void StorageURLSink::onFinish()
write_buf->finalize();
}
class PartitionedStorageURLSink : public PartitionedSink
{
public:
PartitionedStorageURLSink(
const ASTPtr & partition_by,
const String & uri_,
const String & format_,
const std::optional<FormatSettings> & format_settings_,
const Block & sample_block_,
ContextPtr context_,
const ConnectionTimeouts & timeouts_,
const CompressionMethod compression_method_,
const String & http_method_)
: PartitionedSink(partition_by, context_, sample_block_)
, uri(uri_)
, format(format_)
, format_settings(format_settings_)
, sample_block(sample_block_)
, context(context_)
, timeouts(timeouts_)
, compression_method(compression_method_)
, http_method(http_method_)
{
}
SinkPtr createSinkForPartition(const String & partition_id) override
{
auto partition_path = PartitionedSink::replaceWildcards(uri, partition_id);
context->getRemoteHostFilter().checkURL(Poco::URI(partition_path));
return std::make_shared<StorageURLSink>(partition_path, format,
format_settings, sample_block, context, timeouts, compression_method, http_method);
}
private:
const String uri;
const String format;
const std::optional<FormatSettings> format_settings;
const Block sample_block;
ContextPtr context;
const ConnectionTimeouts timeouts;
const CompressionMethod compression_method;
const String http_method;
};
std::string IStorageURLBase::getReadMethod() const
{
@ -267,22 +325,59 @@ Pipe IStorageURLBase::read(
unsigned /*num_streams*/)
{
auto params = getReadURIParams(column_names, metadata_snapshot, query_info, local_context, processed_stage, max_block_size);
std::vector<Poco::URI> uri_options{uri};
return Pipe(std::make_shared<StorageURLSource>(
uri_options,
getReadMethod(),
getReadPOSTDataCallback(
column_names, metadata_snapshot, query_info,
local_context, processed_stage, max_block_size),
format_name,
format_settings,
getName(),
getHeaderBlock(column_names, metadata_snapshot),
local_context,
metadata_snapshot->getColumns(),
max_block_size,
ConnectionTimeouts::getHTTPTimeouts(local_context),
compression_method, headers, params));
bool with_globs = (uri.find('{') != std::string::npos && uri.find('}') != std::string::npos)
|| uri.find('|') != std::string::npos;
if (with_globs)
{
size_t max_addresses = local_context->getSettingsRef().glob_expansion_max_elements;
std::vector<String> url_descriptions = parseRemoteDescription(uri, 0, uri.size(), ',', max_addresses);
std::vector<String> uri_options;
Pipes pipes;
for (const auto & url_description : url_descriptions)
{
/// For each uri (which acts like shard) check if it has failover options
uri_options = parseRemoteDescription(url_description, 0, url_description.size(), '|', max_addresses);
StoragePtr shard;
pipes.emplace_back(std::make_shared<StorageURLSource>(
uri_options,
getReadMethod(),
getReadPOSTDataCallback(
column_names, metadata_snapshot, query_info,
local_context, processed_stage, max_block_size),
format_name,
format_settings,
getName(),
getHeaderBlock(column_names, metadata_snapshot),
local_context,
metadata_snapshot->getColumns(),
max_block_size,
ConnectionTimeouts::getHTTPTimeouts(local_context),
compression_method, headers, params));
}
return Pipe::unitePipes(std::move(pipes));
}
else
{
std::vector<String> uri_options{uri};
return Pipe(std::make_shared<StorageURLSource>(
uri_options,
getReadMethod(),
getReadPOSTDataCallback(
column_names, metadata_snapshot, query_info,
local_context, processed_stage, max_block_size),
format_name,
format_settings,
getName(),
getHeaderBlock(column_names, metadata_snapshot),
local_context,
metadata_snapshot->getColumns(),
max_block_size,
ConnectionTimeouts::getHTTPTimeouts(local_context),
compression_method, headers, params));
}
}
@ -296,6 +391,7 @@ Pipe StorageURLWithFailover::read(
unsigned /*num_streams*/)
{
auto params = getReadURIParams(column_names, metadata_snapshot, query_info, local_context, processed_stage, max_block_size);
auto pipe = Pipe(std::make_shared<StorageURLSource>(
uri_options,
getReadMethod(),
@ -316,16 +412,36 @@ Pipe StorageURLWithFailover::read(
}
SinkToStoragePtr IStorageURLBase::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
SinkToStoragePtr IStorageURLBase::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
{
return std::make_shared<StorageURLSink>(uri, format_name,
format_settings, metadata_snapshot->getSampleBlock(), context,
ConnectionTimeouts::getHTTPTimeouts(context),
chooseCompressionMethod(uri.toString(), compression_method));
if (http_method.empty())
http_method = Poco::Net::HTTPRequest::HTTP_POST;
bool has_wildcards = uri.find(PartitionedSink::PARTITION_ID_WILDCARD) != String::npos;
const auto * insert_query = dynamic_cast<const ASTInsertQuery *>(query.get());
auto partition_by_ast = insert_query ? (insert_query->partition_by ? insert_query->partition_by : partition_by) : nullptr;
bool is_partitioned_implementation = partition_by_ast && has_wildcards;
if (is_partitioned_implementation)
{
return std::make_shared<PartitionedStorageURLSink>(
partition_by_ast,
uri, format_name,
format_settings, metadata_snapshot->getSampleBlock(), context,
ConnectionTimeouts::getHTTPTimeouts(context),
chooseCompressionMethod(uri, compression_method), http_method);
}
else
{
return std::make_shared<StorageURLSink>(uri, format_name,
format_settings, metadata_snapshot->getSampleBlock(), context,
ConnectionTimeouts::getHTTPTimeouts(context),
chooseCompressionMethod(uri, compression_method), http_method);
}
}
StorageURL::StorageURL(
const Poco::URI & uri_,
const String & uri_,
const StorageID & table_id_,
const String & format_name_,
const std::optional<FormatSettings> & format_settings_,
@ -334,10 +450,13 @@ StorageURL::StorageURL(
const String & comment,
ContextPtr context_,
const String & compression_method_,
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers_)
: IStorageURLBase(uri_, context_, table_id_, format_name_, format_settings_, columns_, constraints_, comment, compression_method_, headers_)
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers_,
const String & http_method_,
ASTPtr partition_by_)
: IStorageURLBase(uri_, context_, table_id_, format_name_, format_settings_,
columns_, constraints_, comment, compression_method_, headers_, http_method_, partition_by_)
{
context_->getRemoteHostFilter().checkURL(uri);
context_->getRemoteHostFilter().checkURL(Poco::URI(uri));
}
@ -350,14 +469,14 @@ StorageURLWithFailover::StorageURLWithFailover(
const ConstraintsDescription & constraints_,
ContextPtr context_,
const String & compression_method_)
: StorageURL(Poco::URI(), table_id_, format_name_, format_settings_, columns_, constraints_, String{}, context_, compression_method_)
: StorageURL("", table_id_, format_name_, format_settings_, columns_, constraints_, String{}, context_, compression_method_)
{
for (const auto & uri_option : uri_options_)
{
Poco::URI poco_uri(uri_option);
context_->getRemoteHostFilter().checkURL(poco_uri);
uri_options.emplace_back(std::move(poco_uri));
LOG_DEBUG(&Poco::Logger::get("StorageURLDistributed"), "Adding URL option: {}", uri_option);
uri_options.emplace_back(std::move(uri_option));
}
}
@ -406,6 +525,13 @@ URLBasedDataSourceConfiguration StorageURL::getConfiguration(ASTs & args, Contex
auto [common_configuration, storage_specific_args] = named_collection.value();
configuration.set(common_configuration);
if (!configuration.http_method.empty()
&& configuration.http_method != Poco::Net::HTTPRequest::HTTP_POST
&& configuration.http_method != Poco::Net::HTTPRequest::HTTP_PUT)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Http method can be POST or PUT (current: {}). For insert default is POST, for select GET",
configuration.http_method);
if (!storage_specific_args.empty())
{
String illegal_args;
@ -415,14 +541,15 @@ URLBasedDataSourceConfiguration StorageURL::getConfiguration(ASTs & args, Contex
illegal_args += ", ";
illegal_args += arg.first;
}
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown arguments {} for table function URL", illegal_args);
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown argument `{}` for storage URL", illegal_args);
}
}
else
{
if (args.size() != 2 && args.size() != 3)
throw Exception(
"Storage URL requires 2 or 3 arguments: url, name of used format and optional compression method.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
"Storage URL requires 2 or 3 arguments: url, name of used format and optional compression method.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
for (auto & arg : args)
arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, local_context);
@ -444,7 +571,6 @@ void registerStorageURL(StorageFactory & factory)
ASTs & engine_args = args.engine_args;
auto configuration = StorageURL::getConfiguration(engine_args, args.getLocalContext());
auto format_settings = StorageURL::getFormatSettingsFromArgs(args);
Poco::URI uri(configuration.url);
ReadWriteBufferFromHTTP::HTTPHeaderEntries headers;
for (const auto & [header, value] : configuration.headers)
@ -453,8 +579,12 @@ void registerStorageURL(StorageFactory & factory)
headers.emplace_back(std::make_pair(header, value_literal));
}
ASTPtr partition_by;
if (args.storage_def->partition_by)
partition_by = args.storage_def->partition_by->clone();
return StorageURL::create(
uri,
configuration.url,
args.table_id,
configuration.format,
format_settings,
@ -463,7 +593,9 @@ void registerStorageURL(StorageFactory & factory)
args.comment,
args.getContext(),
configuration.compression_method,
headers);
headers,
configuration.http_method,
partition_by);
},
{
.supports_settings = true,

View File

@ -39,9 +39,11 @@ public:
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
bool supportsPartitionBy() const override { return true; }
protected:
IStorageURLBase(
const Poco::URI & uri_,
const String & uri_,
ContextPtr context_,
const StorageID & id_,
const String & format_name_,
@ -50,9 +52,11 @@ protected:
const ConstraintsDescription & constraints_,
const String & comment,
const String & compression_method_,
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers_ = {});
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers_ = {},
const String & method_ = "",
ASTPtr partition_by = nullptr);
Poco::URI uri;
String uri;
String compression_method;
String format_name;
// For URL engine, we use format settings from server context + `SETTINGS`
@ -61,6 +65,8 @@ protected:
// In this case, format_settings is not set.
std::optional<FormatSettings> format_settings;
ReadWriteBufferFromHTTP::HTTPHeaderEntries headers;
String http_method; /// For insert can choose Put instead of default Post.
ASTPtr partition_by;
virtual std::string getReadMethod() const;
@ -88,13 +94,14 @@ class StorageURLSink : public SinkToStorage
{
public:
StorageURLSink(
const Poco::URI & uri,
const String & uri,
const String & format,
const std::optional<FormatSettings> & format_settings,
const Block & sample_block,
ContextPtr context,
const ConnectionTimeouts & timeouts,
CompressionMethod compression_method);
CompressionMethod compression_method,
const String & method = Poco::Net::HTTPRequest::HTTP_POST);
std::string getName() const override { return "StorageURLSink"; }
void consume(Chunk chunk) override;
@ -112,7 +119,7 @@ class StorageURL : public shared_ptr_helper<StorageURL>, public IStorageURLBase
friend struct shared_ptr_helper<StorageURL>;
public:
StorageURL(
const Poco::URI & uri_,
const String & uri_,
const StorageID & table_id_,
const String & format_name_,
const std::optional<FormatSettings> & format_settings_,
@ -121,7 +128,9 @@ public:
const String & comment,
ContextPtr context_,
const String & compression_method_,
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers_ = {});
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers_ = {},
const String & method_ = "",
ASTPtr partition_by_ = nullptr);
String getName() const override
{
@ -170,6 +179,6 @@ public:
};
private:
std::vector<Poco::URI> uri_options;
std::vector<String> uri_options;
};
}
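
Note: a hedged sketch for the URL changes above (the endpoint is hypothetical); the sink substitutes {_partition_id} in the URL, and the new http_method lets inserts use PUT instead of the default POST when configured via the named-collection 'method' key:

INSERT INTO TABLE FUNCTION url('http://example.com/upload/part_{_partition_id}.csv', 'CSV', 'x UInt64, s String')
PARTITION BY x
VALUES (1, 'a'), (2, 'b');
-- expected behavior: one HTTP request per partition, to .../part_1.csv and .../part_2.csv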

View File

@ -33,7 +33,7 @@ StorageXDBC::StorageXDBC(
const BridgeHelperPtr bridge_helper_)
/// Please add support for constraints as soon as StorageODBC or JDBC will support insertion.
: IStorageURLBase(
Poco::URI(),
"",
context_,
table_id_,
IXDBCBridgeHelper::DEFAULT_FORMAT,
@ -47,7 +47,7 @@ StorageXDBC::StorageXDBC(
, remote_table_name(remote_table_name_)
, log(&Poco::Logger::get("Storage" + bridge_helper->getName()))
{
uri = bridge_helper->getMainURI();
uri = bridge_helper->getMainURI().toString();
}
std::string StorageXDBC::getReadMethod() const
@ -118,7 +118,7 @@ SinkToStoragePtr StorageXDBC::write(const ASTPtr & /*query*/, const StorageMetad
{
bridge_helper->startBridgeSync();
Poco::URI request_uri = uri;
auto request_uri = Poco::URI(uri);
request_uri.setPath("/write");
auto url_params = bridge_helper->getURLParams(65536);
@ -131,13 +131,13 @@ SinkToStoragePtr StorageXDBC::write(const ASTPtr & /*query*/, const StorageMetad
request_uri.addQueryParameter("sample_block", metadata_snapshot->getSampleBlock().getNamesAndTypesList().toString());
return std::make_shared<StorageURLSink>(
request_uri,
request_uri.toString(),
format_name,
getFormatSettings(local_context),
metadata_snapshot->getSampleBlock(),
local_context,
ConnectionTimeouts::getHTTPTimeouts(local_context),
chooseCompressionMethod(uri.toString(), compression_method));
chooseCompressionMethod(uri, compression_method));
}
Block StorageXDBC::getHeaderBlock(const Names & column_names, const StorageMetadataPtr & metadata_snapshot) const

View File

@ -41,7 +41,6 @@ public:
std::string getName() const override;
private:
BridgeHelperPtr bridge_helper;
std::string remote_database_name;
std::string remote_table_name;

View File

@ -24,12 +24,18 @@ void TableFunctionURL::parseArguments(const ASTPtr & ast_function, ContextPtr co
if (!func_args.arguments)
throw Exception("Table function 'URL' must have arguments.", ErrorCodes::BAD_ARGUMENTS);
URLBasedDataSourceConfiguration configuration;
if (auto with_named_collection = getURLBasedDataSourceConfiguration(func_args.arguments->children, context))
{
auto [common_configuration, storage_specific_args] = with_named_collection.value();
configuration.set(common_configuration);
if (!configuration.http_method.empty()
&& configuration.http_method != Poco::Net::HTTPRequest::HTTP_POST
&& configuration.http_method != Poco::Net::HTTPRequest::HTTP_PUT)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Method can be POST or PUT (current: {}). For insert default is POST, for select GET",
configuration.http_method);
if (!storage_specific_args.empty())
{
String illegal_args;
@ -39,7 +45,7 @@ void TableFunctionURL::parseArguments(const ASTPtr & ast_function, ContextPtr co
illegal_args += ", ";
illegal_args += arg.first;
}
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown arguments {} for table function URL", illegal_args);
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown argument `{}` for table function URL", illegal_args);
}
filename = configuration.url;
@ -58,33 +64,25 @@ StoragePtr TableFunctionURL::getStorage(
const String & source, const String & format_, const ColumnsDescription & columns, ContextPtr global_context,
const std::string & table_name, const String & compression_method_) const
{
/// If url contains {1..k} or failover options with separator `|`, use a separate storage
if ((source.find('{') == std::string::npos || source.find('}') == std::string::npos) && source.find('|') == std::string::npos)
ReadWriteBufferFromHTTP::HTTPHeaderEntries headers;
for (const auto & [header, value] : configuration.headers)
{
Poco::URI uri(source);
return StorageURL::create(
uri,
StorageID(getDatabaseName(), table_name),
format_,
std::nullopt /*format settings*/,
columns,
ConstraintsDescription{},
String{},
global_context,
compression_method_);
}
else
{
return StorageExternalDistributed::create(
source,
StorageID(getDatabaseName(), table_name),
format_,
std::nullopt,
compression_method_,
columns,
ConstraintsDescription{},
global_context);
auto value_literal = value.safeGet<String>();
headers.emplace_back(std::make_pair(header, value_literal));
}
return StorageURL::create(
source,
StorageID(getDatabaseName(), table_name),
format_,
std::nullopt /*format settings*/,
columns,
ConstraintsDescription{},
String{},
global_context,
compression_method_,
headers,
configuration.http_method);
}
void registerTableFunctionURL(TableFunctionFactory & factory)

View File

@ -1,6 +1,7 @@
#pragma once
#include <TableFunctions/ITableFunctionFileLike.h>
#include <Storages/ExternalDataSourceConfiguration.h>
namespace DB
@ -27,6 +28,8 @@ private:
const String & source, const String & format_, const ColumnsDescription & columns, ContextPtr global_context,
const std::string & table_name, const String & compression_method_) const override;
const char * getStorageTypeName() const override { return "URL"; }
URLBasedDataSourceConfiguration configuration;
};
}

View File

@ -3,264 +3,5 @@
"test_host_ip_change/test.py::test_ip_change_drop_dns_cache",
"test_host_ip_change/test.py::test_ip_change_update_dns_cache",
"test_host_ip_change/test.py::test_user_access_ip_change[node0]",
"test_host_ip_change/test.py::test_user_access_ip_change[node1]",
"test_atomic_drop_table/test.py::test_atomic_delete_with_stopped_zookeeper",
"test_attach_without_fetching/test.py::test_attach_without_fetching",
"test_broken_part_during_merge/test.py::test_merge_and_part_corruption",
"test_cleanup_dir_after_bad_zk_conn/test.py::test_attach_without_zk",
"test_cleanup_dir_after_bad_zk_conn/test.py::test_cleanup_dir_after_bad_zk_conn",
"test_cleanup_dir_after_bad_zk_conn/test.py::test_cleanup_dir_after_wrong_replica_name",
"test_cleanup_dir_after_bad_zk_conn/test.py::test_cleanup_dir_after_wrong_zk_path",
"test_consistent_parts_after_clone_replica/test.py::test_inconsistent_parts_if_drop_while_replica_not_active",
"test_cross_replication/test.py::test",
"test_ddl_worker_non_leader/test.py::test_non_leader_replica",
"test_delayed_replica_failover/test.py::test",
"test_dictionaries_update_field/test.py::test_update_field[complex_key_hashed_update_field_dictionary-HASHED]",
"test_dictionaries_update_field/test.py::test_update_field[flat_update_field_dictionary-FLAT]",
"test_dictionaries_update_field/test.py::test_update_field[simple_key_hashed_update_field_dictionary-HASHED]",
"test_dictionary_allow_read_expired_keys/test_default_reading.py::test_default_reading",
"test_dictionary_allow_read_expired_keys/test_default_string.py::test_return_real_values",
"test_dictionary_allow_read_expired_keys/test_dict_get_or_default.py::test_simple_dict_get_or_default",
"test_dictionary_allow_read_expired_keys/test_dict_get.py::test_simple_dict_get",
"test_disabled_mysql_server/test.py::test_disabled_mysql_server",
"test_distributed_ddl_on_cross_replication/test.py::test_alter_ddl",
"test_distributed_ddl_on_cross_replication/test.py::test_atomic_database",
"test_distributed_ddl_parallel/test.py::test_all_in_parallel",
"test_distributed_ddl_parallel/test.py::test_slow_dict_load_7",
"test_distributed_ddl_parallel/test.py::test_smoke",
"test_distributed_ddl_parallel/test.py::test_smoke_parallel",
"test_distributed_ddl_parallel/test.py::test_smoke_parallel_dict_reload",
"test_distributed_ddl_parallel/test.py::test_two_in_parallel_two_queued",
"test_distributed_ddl_password/test.py::test_alter",
"test_distributed_ddl_password/test.py::test_truncate",
"test_distributed_ddl/test.py::test_allowed_databases[configs]",
"test_distributed_ddl/test.py::test_allowed_databases[configs_secure]",
"test_distributed_ddl/test.py::test_create_as_select[configs]",
"test_distributed_ddl/test.py::test_create_as_select[configs_secure]",
"test_distributed_ddl/test.py::test_create_reserved[configs]",
"test_distributed_ddl/test.py::test_create_reserved[configs_secure]",
"test_distributed_ddl/test.py::test_create_view[configs]",
"test_distributed_ddl/test.py::test_create_view[configs_secure]",
"test_distributed_ddl/test.py::test_default_database[configs]",
"test_distributed_ddl/test.py::test_default_database[configs_secure]",
"test_distributed_ddl/test.py::test_detach_query[configs]",
"test_distributed_ddl/test.py::test_detach_query[configs_secure]",
"test_distributed_ddl/test.py::test_implicit_macros[configs]",
"test_distributed_ddl/test.py::test_implicit_macros[configs_secure]",
"test_distributed_ddl/test.py::test_kill_query[configs]",
"test_distributed_ddl/test.py::test_kill_query[configs_secure]",
"test_distributed_ddl/test.py::test_macro[configs]",
"test_distributed_ddl/test.py::test_macro[configs_secure]",
"test_distributed_ddl/test.py::test_on_connection_loss[configs]",
"test_distributed_ddl/test.py::test_on_connection_loss[configs_secure]",
"test_distributed_ddl/test.py::test_on_server_fail[configs]",
"test_distributed_ddl/test.py::test_on_server_fail[configs_secure]",
"test_distributed_ddl/test.py::test_on_session_expired[configs]",
"test_distributed_ddl/test.py::test_on_session_expired[configs_secure]",
"test_distributed_ddl/test.py::test_optimize_query[configs]",
"test_distributed_ddl/test.py::test_optimize_query[configs_secure]",
"test_distributed_ddl/test.py::test_rename[configs]",
"test_distributed_ddl/test.py::test_rename[configs_secure]",
"test_distributed_ddl/test.py::test_replicated_without_arguments[configs]",
"test_distributed_ddl/test.py::test_replicated_without_arguments[configs_secure]",
"test_distributed_ddl/test.py::test_simple_alters[configs]",
"test_distributed_ddl/test.py::test_simple_alters[configs_secure]",
"test_distributed_ddl/test.py::test_socket_timeout[configs]",
"test_distributed_ddl/test.py::test_socket_timeout[configs_secure]",
"test_distributed_ddl/test_replicated_alter.py::test_replicated_alters[configs]",
"test_distributed_ddl/test_replicated_alter.py::test_replicated_alters[configs_secure]",
"test_distributed_respect_user_timeouts/test.py::test_reconnect[configs-default-node1-distributed]",
"test_distributed_respect_user_timeouts/test.py::test_reconnect[configs-default-node1-remote]",
"test_distributed_respect_user_timeouts/test.py::test_reconnect[configs-default-node2-distributed]",
"test_distributed_respect_user_timeouts/test.py::test_reconnect[configs-default-node2-remote]",
"test_distributed_respect_user_timeouts/test.py::test_reconnect[configs-ready_to_wait-node1-distributed]",
"test_distributed_respect_user_timeouts/test.py::test_reconnect[configs-ready_to_wait-node1-remote]",
"test_distributed_respect_user_timeouts/test.py::test_reconnect[configs-ready_to_wait-node2-distributed]",
"test_distributed_respect_user_timeouts/test.py::test_reconnect[configs-ready_to_wait-node2-remote]",
"test_distributed_respect_user_timeouts/test.py::test_reconnect[configs_secure-default-node1-distributed]",
"test_distributed_respect_user_timeouts/test.py::test_reconnect[configs_secure-default-node1-remote]",
"test_distributed_respect_user_timeouts/test.py::test_reconnect[configs_secure-default-node2-distributed]",
"test_distributed_respect_user_timeouts/test.py::test_reconnect[configs_secure-default-node2-remote]",
"test_distributed_respect_user_timeouts/test.py::test_reconnect[configs_secure-ready_to_wait-node1-distributed]",
"test_distributed_respect_user_timeouts/test.py::test_reconnect[configs_secure-ready_to_wait-node1-remote]",
"test_distributed_respect_user_timeouts/test.py::test_reconnect[configs_secure-ready_to_wait-node2-distributed]",
"test_distributed_respect_user_timeouts/test.py::test_reconnect[configs_secure-ready_to_wait-node2-remote]",
"test_drop_replica/test.py::test_drop_replica",
"test_hedged_requests_parallel/test.py::test_combination1",
"test_hedged_requests_parallel/test.py::test_combination2",
"test_hedged_requests_parallel/test.py::test_query_with_no_data_to_sample",
"test_hedged_requests_parallel/test.py::test_send_data",
"test_hedged_requests_parallel/test.py::test_send_table_status_sleep",
"test_hedged_requests/test.py::test_combination1",
"test_hedged_requests/test.py::test_combination2",
"test_hedged_requests/test.py::test_combination3",
"test_hedged_requests/test.py::test_combination4",
"test_hedged_requests/test.py::test_long_query",
"test_hedged_requests/test.py::test_receive_timeout1",
"test_hedged_requests/test.py::test_receive_timeout2",
"test_hedged_requests/test.py::test_send_data",
"test_hedged_requests/test.py::test_send_data2",
"test_hedged_requests/test.py::test_send_table_status_sleep",
"test_hedged_requests/test.py::test_send_table_status_sleep2",
"test_hedged_requests/test.py::test_stuck_replica",
"test_https_replication/test.py::test_both_http",
"test_https_replication/test.py::test_both_https",
"test_https_replication/test.py::test_mixed_protocol",
"test_https_replication/test.py::test_replication_after_partition",
"test_insert_into_distributed_sync_async/test.py::test_async_inserts_into_local_shard",
"test_insert_into_distributed_sync_async/test.py::test_insertion_sync",
"test_insert_into_distributed_sync_async/test.py::test_insertion_sync_fails_with_timeout",
"test_insert_into_distributed_sync_async/test.py::test_insertion_sync_with_disabled_timeout",
"test_insert_into_distributed_sync_async/test.py::test_insertion_without_sync_ignores_timeout",
"test_insert_into_distributed/test.py::test_inserts_batching",
"test_insert_into_distributed/test.py::test_inserts_local",
"test_insert_into_distributed/test.py::test_inserts_low_cardinality",
"test_insert_into_distributed/test.py::test_inserts_single_replica_internal_replication",
"test_insert_into_distributed/test.py::test_inserts_single_replica_local_internal_replication",
"test_insert_into_distributed/test.py::test_inserts_single_replica_no_internal_replication",
"test_insert_into_distributed/test.py::test_prefer_localhost_replica",
"test_insert_into_distributed/test.py::test_reconnect",
"test_insert_into_distributed/test.py::test_table_function",
"test_insert_into_distributed_through_materialized_view/test.py::test_inserts_local",
"test_insert_into_distributed_through_materialized_view/test.py::test_reconnect",
"test_keeper_multinode_blocade_leader/test.py::test_blocade_leader",
"test_keeper_multinode_blocade_leader/test.py::test_blocade_leader_twice",
"test_keeper_multinode_simple/test.py::test_follower_restart",
"test_keeper_multinode_simple/test.py::test_read_write_multinode",
"test_keeper_multinode_simple/test.py::test_session_expiration",
"test_keeper_multinode_simple/test.py::test_simple_replicated_table",
"test_keeper_multinode_simple/test.py::test_watch_on_follower",
"test_limited_replicated_fetches/test.py::test_limited_fetches",
"test_materialized_mysql_database/test.py::test_clickhouse_killed_while_insert_5_7[atomic]",
"test_materialized_mysql_database/test.py::test_clickhouse_killed_while_insert_5_7[ordinary]",
"test_materialized_mysql_database/test.py::test_clickhouse_killed_while_insert_8_0[atomic]",
"test_materialized_mysql_database/test.py::test_clickhouse_killed_while_insert_8_0[ordinary]",
"test_materialized_mysql_database/test.py::test_insert_with_modify_binlog_checksum_5_7[atomic]",
"test_materialized_mysql_database/test.py::test_insert_with_modify_binlog_checksum_5_7[ordinary]",
"test_materialized_mysql_database/test.py::test_insert_with_modify_binlog_checksum_8_0[atomic]",
"test_materialized_mysql_database/test.py::test_insert_with_modify_binlog_checksum_8_0[ordinary]",
"test_materialized_mysql_database/test.py::test_materialize_database_ddl_with_empty_transaction_5_7[atomic]",
"test_materialized_mysql_database/test.py::test_materialize_database_ddl_with_empty_transaction_5_7[ordinary]",
"test_materialized_mysql_database/test.py::test_materialize_database_ddl_with_empty_transaction_8_0[atomic]",
"test_materialized_mysql_database/test.py::test_materialize_database_ddl_with_empty_transaction_8_0[ordinary]",
"test_materialized_mysql_database/test.py::test_materialize_database_ddl_with_mysql_5_7[atomic]",
"test_materialized_mysql_database/test.py::test_materialize_database_ddl_with_mysql_5_7[ordinary]",
"test_materialized_mysql_database/test.py::test_materialize_database_ddl_with_mysql_8_0[atomic]",
"test_materialized_mysql_database/test.py::test_materialize_database_ddl_with_mysql_8_0[ordinary]",
"test_materialized_mysql_database/test.py::test_materialize_database_dml_with_mysql_5_7[atomic]",
"test_materialized_mysql_database/test.py::test_materialize_database_dml_with_mysql_5_7[ordinary]",
"test_materialized_mysql_database/test.py::test_materialize_database_dml_with_mysql_8_0[atomic]",
"test_materialized_mysql_database/test.py::test_materialize_database_dml_with_mysql_8_0[ordinary]",
"test_materialized_mysql_database/test.py::test_materialize_database_err_sync_user_privs_5_7[atomic]",
"test_materialized_mysql_database/test.py::test_materialize_database_err_sync_user_privs_5_7[ordinary]",
"test_materialized_mysql_database/test.py::test_materialize_database_err_sync_user_privs_8_0[atomic]",
"test_materialized_mysql_database/test.py::test_materialize_database_err_sync_user_privs_8_0[ordinary]",
"test_materialized_mysql_database/test.py::test_multi_table_update[clickhouse_node0]",
"test_materialized_mysql_database/test.py::test_multi_table_update[clickhouse_node1]",
"test_materialized_mysql_database/test.py::test_mysql_killed_while_insert_5_7[atomic]",
"test_materialized_mysql_database/test.py::test_mysql_killed_while_insert_5_7[ordinary]",
"test_materialized_mysql_database/test.py::test_mysql_killed_while_insert_8_0[atomic]",
"test_materialized_mysql_database/test.py::test_mysql_killed_while_insert_8_0[ordinary]",
"test_materialized_mysql_database/test.py::test_mysql_kill_sync_thread_restore_5_7[atomic]",
"test_materialized_mysql_database/test.py::test_mysql_kill_sync_thread_restore_5_7[ordinary]",
"test_materialized_mysql_database/test.py::test_mysql_kill_sync_thread_restore_8_0[atomic]",
"test_materialized_mysql_database/test.py::test_mysql_kill_sync_thread_restore_8_0[ordinary]",
"test_materialized_mysql_database/test.py::test_mysql_settings[clickhouse_node0]",
"test_materialized_mysql_database/test.py::test_mysql_settings[clickhouse_node1]",
"test_materialized_mysql_database/test.py::test_network_partition_5_7[atomic]",
"test_materialized_mysql_database/test.py::test_network_partition_5_7[ordinary]",
"test_materialized_mysql_database/test.py::test_network_partition_8_0[atomic]",
"test_materialized_mysql_database/test.py::test_network_partition_8_0[ordinary]",
"test_materialized_mysql_database/test.py::test_select_without_columns_5_7[atomic]",
"test_materialized_mysql_database/test.py::test_select_without_columns_5_7[ordinary]",
"test_materialized_mysql_database/test.py::test_select_without_columns_8_0[atomic]",
"test_materialized_mysql_database/test.py::test_select_without_columns_8_0[ordinary]",
"test_materialized_mysql_database/test.py::test_system_parts_table[clickhouse_node0]",
"test_materialized_mysql_database/test.py::test_system_parts_table[clickhouse_node1]",
"test_materialized_mysql_database/test.py::test_system_tables_table[clickhouse_node0]",
"test_materialized_mysql_database/test.py::test_system_tables_table[clickhouse_node1]",
"test_materialized_mysql_database/test.py::test_materialize_with_column_comments[clickhouse_node0]",
"test_materialized_mysql_database/test.py::test_materialize_with_column_comments[clickhouse_node1]",
"test_materialized_mysql_database/test.py::test_materialize_with_enum[clickhouse_node0]",
"test_materialized_mysql_database/test.py::test_materialize_with_enum[clickhouse_node1]",
"test_materialized_mysql_database/test.py::test_utf8mb4[clickhouse_node0]",
"test_materialized_mysql_database/test.py::test_utf8mb4[clickhouse_node1]",
"test_parts_delete_zookeeper/test.py::test_merge_doesnt_work_without_zookeeper",
"test_polymorphic_parts/test.py::test_compact_parts_only",
"test_polymorphic_parts/test.py::test_different_part_types_on_replicas[polymorphic_table_compact-Compact]",
"test_polymorphic_parts/test.py::test_different_part_types_on_replicas[polymorphic_table_wide-Wide]",
"test_polymorphic_parts/test.py::test_in_memory",
"test_polymorphic_parts/test.py::test_in_memory_alters",
"test_polymorphic_parts/test.py::test_in_memory_deduplication",
"test_polymorphic_parts/test.py::test_in_memory_wal_rotate",
"test_polymorphic_parts/test.py::test_polymorphic_parts_basics[first_node0-second_node0]",
"test_polymorphic_parts/test.py::test_polymorphic_parts_basics[first_node1-second_node1]",
"test_polymorphic_parts/test.py::test_polymorphic_parts_index",
"test_polymorphic_parts/test.py::test_polymorphic_parts_non_adaptive",
"test_quorum_inserts_parallel/test.py::test_parallel_quorum_actually_parallel",
"test_quorum_inserts_parallel/test.py::test_parallel_quorum_actually_quorum",
"test_random_inserts/test.py::test_insert_multithreaded",
"test_random_inserts/test.py::test_random_inserts",
"test_reload_clusters_config/test.py::test_add_cluster",
"test_reload_clusters_config/test.py::test_delete_cluster",
"test_reload_clusters_config/test.py::test_simple_reload",
"test_reload_clusters_config/test.py::test_update_one_cluster",
"test_replace_partition/test.py::test_drop_failover",
"test_replace_partition/test.py::test_normal_work",
"test_replace_partition/test.py::test_replace_after_replace_failover",
"test_replicated_database/test.py::test_alters_from_different_replicas",
"test_replicated_database/test.py::test_create_replicated_table",
"test_replicated_database/test.py::test_recover_staled_replica",
"test_replicated_database/test.py::test_simple_alter_table[MergeTree]",
"test_replicated_database/test.py::test_simple_alter_table[ReplicatedMergeTree]",
"test_replicated_database/test.py::test_startup_without_zk",
"test_replicated_fetches_timeouts/test.py::test_no_stall",
"test_storage_kafka/test.py::test_bad_reschedule",
"test_storage_kafka/test.py::test_commits_of_unprocessed_messages_on_drop",
"test_storage_kafka/test.py::test_exception_from_destructor",
"test_storage_kafka/test.py::test_kafka_commit_on_block_write",
"test_storage_kafka/test.py::test_kafka_consumer_hang",
"test_storage_kafka/test.py::test_kafka_consumer_hang2",
"test_storage_kafka/test.py::test_kafka_csv_with_delimiter",
"test_storage_kafka/test.py::test_kafka_csv_with_thread_per_consumer",
"test_storage_kafka/test.py::test_kafka_duplicates_when_commit_failed",
"test_storage_kafka/test.py::test_kafka_engine_put_errors_to_stream",
"test_storage_kafka/test.py::test_kafka_engine_put_errors_to_stream_with_random_malformed_json",
"test_storage_kafka/test.py::test_kafka_flush_by_block_size",
"test_storage_kafka/test.py::test_kafka_flush_by_time",
"test_storage_kafka/test.py::test_kafka_flush_on_big_message",
"test_storage_kafka/test.py::test_kafka_formats",
"test_storage_kafka/test.py::test_kafka_formats_with_broken_message",
"test_storage_kafka/test.py::test_kafka_insert",
"test_storage_kafka/test.py::test_kafka_issue11308",
"test_storage_kafka/test.py::test_kafka_issue14202",
"test_storage_kafka/test.py::test_kafka_issue4116",
"test_storage_kafka/test.py::test_kafka_json_as_string",
"test_storage_kafka/test.py::test_kafka_json_without_delimiter",
"test_storage_kafka/test.py::test_kafka_lot_of_partitions_partial_commit_of_bulk",
"test_storage_kafka/test.py::test_kafka_many_materialized_views",
"test_storage_kafka/test.py::test_kafka_materialized_view",
"test_storage_kafka/test.py::test_kafka_materialized_view_with_subquery",
"test_storage_kafka/test.py::test_kafka_no_holes_when_write_suffix_failed",
"test_storage_kafka/test.py::test_kafka_produce_consume",
"test_storage_kafka/test.py::test_kafka_produce_key_timestamp",
"test_storage_kafka/test.py::test_kafka_protobuf",
"test_storage_kafka/test.py::test_kafka_protobuf_no_delimiter",
"test_storage_kafka/test.py::test_kafka_rebalance",
"test_storage_kafka/test.py::test_kafka_select_empty",
"test_storage_kafka/test.py::test_kafka_settings_new_syntax",
"test_storage_kafka/test.py::test_kafka_settings_old_syntax",
"test_storage_kafka/test.py::test_kafka_string_field_on_first_position_in_protobuf",
"test_storage_kafka/test.py::test_kafka_tsv_with_delimiter",
"test_storage_kafka/test.py::test_kafka_unavailable",
"test_storage_kafka/test.py::test_kafka_virtual_columns",
"test_storage_kafka/test.py::test_kafka_virtual_columns2",
"test_storage_kafka/test.py::test_kafka_virtual_columns_with_materialized_view",
"test_storage_kafka/test.py::test_librdkafka_compression",
"test_storage_kafka/test.py::test_premature_flush_on_eof",
"test_storage_kerberized_kafka/test.py::test_kafka_json_as_string",
"test_storage_kerberized_kafka/test.py::test_kafka_json_as_string_no_kdc",
"test_system_clusters_actual_information/test.py::test",
"test_system_metrics/test.py::test_readonly_metrics",
"test_system_replicated_fetches/test.py::test_system_replicated_fetches"
"test_host_ip_change/test.py::test_user_access_ip_change[node1]"
]

View File

@ -257,6 +257,34 @@ def test_truncate_table(started_cluster):
node1.query("drop table test_truncate")
def test_partition_by(started_cluster):
hdfs_api = started_cluster.hdfs_api
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
file_name = "test_{_partition_id}"
partition_by = "column3"
values = "(1, 2, 3), (3, 2, 1), (1, 3, 2)"
table_function = f"hdfs('hdfs://hdfs1:9000/{file_name}', 'TSV', '{table_format}')"
node1.query(f"insert into table function {table_function} PARTITION BY {partition_by} values {values}")
result = node1.query(f"select * from hdfs('hdfs://hdfs1:9000/test_1', 'TSV', '{table_format}')")
assert(result.strip() == "3\t2\t1")
result = node1.query(f"select * from hdfs('hdfs://hdfs1:9000/test_2', 'TSV', '{table_format}')")
assert(result.strip() == "1\t3\t2")
result = node1.query(f"select * from hdfs('hdfs://hdfs1:9000/test_3', 'TSV', '{table_format}')")
assert(result.strip() == "1\t2\t3")
file_name = "test2_{_partition_id}"
node1.query(f"create table p(column1 UInt32, column2 UInt32, column3 UInt32) engine = HDFS('hdfs://hdfs1:9000/{file_name}', 'TSV') partition by column3")
node1.query(f"insert into p values {values}")
result = node1.query(f"select * from hdfs('hdfs://hdfs1:9000/test2_1', 'TSV', '{table_format}')")
assert(result.strip() == "3\t2\t1")
result = node1.query(f"select * from hdfs('hdfs://hdfs1:9000/test2_2', 'TSV', '{table_format}')")
assert(result.strip() == "1\t3\t2")
result = node1.query(f"select * from hdfs('hdfs://hdfs1:9000/test2_3', 'TSV', '{table_format}')")
assert(result.strip() == "1\t2\t3")
if __name__ == '__main__':
cluster.start()
input("Cluster created, press any key to destroy...")

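The partition-by tests above (and the S3, URL, and file variants that follow) all rely on the same pattern: the `{_partition_id}` placeholder in the path is replaced with the value of the PARTITION BY column, so each distinct value lands in its own file. A minimal sketch of that mapping, assuming the same three rows the tests insert (illustration only, not ClickHouse code):

```python
# Each distinct value of the PARTITION BY column (here column3) selects
# its own output path via the {_partition_id} placeholder.
rows = [(1, 2, 3), (3, 2, 1), (1, 3, 2)]
path_template = "test_{_partition_id}"
partition_column_index = 2  # PARTITION BY column3

files = {}
for row in rows:
    partition_id = str(row[partition_column_index])
    path = path_template.replace("{_partition_id}", partition_id)
    files.setdefault(path, []).append("\t".join(map(str, row)))

# Matches what the tests read back:
# test_1 -> "3\t2\t1", test_2 -> "1\t3\t2", test_3 -> "1\t2\t3"
for path, lines in sorted(files.items()):
    print(path, lines)
```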
View File

@ -163,6 +163,13 @@ def test_partition_by(started_cluster):
assert "3,2,1\n" == get_s3_file_content(started_cluster, bucket, "test_1.csv")
assert "78,43,45\n" == get_s3_file_content(started_cluster, bucket, "test_45.csv")
filename = "test2_{_partition_id}.csv"
instance.query(f"create table p ({table_format}) engine=S3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{filename}', 'CSV') partition by column3")
instance.query(f"insert into p values {values}")
assert "1,2,3\n" == get_s3_file_content(started_cluster, bucket, "test2_3.csv")
assert "3,2,1\n" == get_s3_file_content(started_cluster, bucket, "test2_1.csv")
assert "78,43,45\n" == get_s3_file_content(started_cluster, bucket, "test2_45.csv")
def test_partition_by_string_column(started_cluster):
bucket = started_cluster.minio_bucket

View File

@ -0,0 +1,11 @@
<?xml version="1.0"?>
<clickhouse>
<named_collections>
<url1>
<url>http://nginx:80/test_{_partition_id}</url>
<method>PUT</method>
<format>TSV</format>
<structure>column1 UInt32, column2 UInt32, column3 UInt32</structure>
</url1>
</named_collections>
</clickhouse>

View File

@ -0,0 +1,29 @@
import pytest
from helpers.cluster import ClickHouseCluster
uuids = []
@pytest.fixture(scope="module")
def cluster():
try:
cluster = ClickHouseCluster(__file__)
cluster.add_instance("node1", main_configs=["configs/conf.xml"], with_nginx=True)
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_partition_by(cluster):
node1 = cluster.instances["node1"]
node1.query(f"insert into table function url(url1) partition by column3 values (1, 2, 3), (3, 2, 1), (1, 3, 2)")
result = node1.query(f"select * from url('http://nginx:80/test_1', 'TSV', 'column1 UInt32, column2 UInt32, column3 UInt32')")
assert(result.strip() == "3\t2\t1")
result = node1.query(f"select * from url('http://nginx:80/test_2', 'TSV', 'column1 UInt32, column2 UInt32, column3 UInt32')")
assert(result.strip() == "1\t3\t2")
result = node1.query(f"select * from url('http://nginx:80/test_3', 'TSV', 'column1 UInt32, column2 UInt32, column3 UInt32')")
assert(result.strip() == "1\t2\t3")

View File

@ -1,7 +1,6 @@
-- Tags: no-fasttest
-- Tag no-fasttest: needs s3
INSERT INTO TABLE FUNCTION file('foo.csv', 'CSV', 'id Int32, val Int32') PARTITION BY val VALUES (1, 1), (2, 2); -- { serverError NOT_IMPLEMENTED }
INSERT INTO TABLE FUNCTION s3('http://localhost:9001/foo/test_{_partition_id}.csv', 'admin', 'admin', 'CSV', 'id Int32, val String') PARTITION BY val VALUES (1, '\r\n'); -- { serverError CANNOT_PARSE_TEXT }
INSERT INTO TABLE FUNCTION s3('http://localhost:9001/foo/test_{_partition_id}.csv', 'admin', 'admin', 'CSV', 'id Int32, val String') PARTITION BY val VALUES (1, 'abc\x00abc'); -- { serverError CANNOT_PARSE_TEXT }
INSERT INTO TABLE FUNCTION s3('http://localhost:9001/foo/test_{_partition_id}.csv', 'admin', 'admin', 'CSV', 'id Int32, val String') PARTITION BY val VALUES (1, 'abc\xc3\x28abc'); -- { serverError CANNOT_PARSE_TEXT }

View File

@ -0,0 +1,6 @@
part 1
3 2 1
part 2
1 3 2
part 3
1 2 3

View File

@ -0,0 +1,30 @@
#!/usr/bin/env bash
# Tags: no-parallel, no-fasttest
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
# See 01658_read_file_to_string_column.sh
user_files_path=$(clickhouse-client --query "select _path,_file from file('nonexist.txt', 'CSV', 'val1 char')" 2>&1 | grep Exception | awk '{gsub("/nonexist.txt","",$9); print $9}')
mkdir -p "${user_files_path}/"
chmod 777 "${user_files_path}"
FILE_PATH="${user_files_path}/test_table_function_file"
function cleanup()
{
rm -r "${FILE_PATH}"
}
trap cleanup EXIT
values="(1, 2, 3), (3, 2, 1), (1, 3, 2)"
${CLICKHOUSE_CLIENT} --query="insert into table function file('${FILE_PATH}/test_{_partition_id}', 'TSV', 'column1 UInt32, column2 UInt32, column3 UInt32') PARTITION BY column3 values ${values}";
echo 'part 1'
${CLICKHOUSE_CLIENT} --query="select * from file('${FILE_PATH}/test_1', 'TSV', 'column1 UInt32, column2 UInt32, column3 UInt32')";
echo 'part 2'
${CLICKHOUSE_CLIENT} --query="select * from file('${FILE_PATH}/test_2', 'TSV', 'column1 UInt32, column2 UInt32, column3 UInt32')";
echo 'part 3'
${CLICKHOUSE_CLIENT} --query="select * from file('${FILE_PATH}/test_3', 'TSV', 'column1 UInt32, column2 UInt32, column3 UInt32')";

View File

@ -1,4 +1,4 @@
## Generate changelog
## How To Generate Changelog
Generate github token:
* https://github.com/settings/tokens - keep all checkboxes unchecked; no scopes need to be enabled.
@ -8,6 +8,10 @@ Dependencies:
apt-get install git curl jq python3 python3-fuzzywuzzy
```
Update information about tags:
```
git fetch --tags
```
Usage example:

View File

@ -4,7 +4,7 @@
#include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <Common/ZooKeeper/ZooKeeperIO.h>
#include <Common/Exception.h>
#include <libnuraft/nuraft.hxx> // Y_IGNORE
#include <libnuraft/nuraft.hxx>
#include <Coordination/KeeperLogStore.h>
#include <Coordination/Changelog.h>
#include <base/logger_useful.h>