Partitoned write

This commit is contained in:
kssenii 2021-10-25 19:23:44 +03:00
parent 5a840c5f75
commit 1d743b9259
10 changed files with 447 additions and 38 deletions

View File

@ -8,6 +8,8 @@ namespace DB
/// Sink which is returned from Storage::write.
class SinkToStorage : public ExceptionKeepingTransform
{
friend class PartitionedSink;
public:
explicit SinkToStorage(const Block & header);
@ -35,4 +37,5 @@ public:
void consume(Chunk) override {}
};
using SinkPtr = std::shared_ptr<SinkToStorage>;
}

View File

@ -2,30 +2,44 @@
#if USE_HDFS
#include <Storages/StorageFactory.h>
#include <Storages/HDFS/StorageHDFS.h>
#include <Common/parseGlobs.h>
#include <DataTypes/DataTypeString.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTInsertQuery.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Processors/Formats/IOutputFormat.h>
#include <Processors/Sources/SourceWithProgress.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Formats/IInputFormat.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <Interpreters/Context.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTLiteral.h>
#include <IO/ReadHelpers.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/TreeRewriter.h>
#include <Storages/StorageFactory.h>
#include <Storages/HDFS/StorageHDFS.h>
#include <Storages/HDFS/HDFSCommon.h>
#include <Storages/HDFS/ReadBufferFromHDFS.h>
#include <Storages/HDFS/WriteBufferFromHDFS.h>
#include <IO/WriteHelpers.h>
#include <Storages/HDFS/HDFSCommon.h>
#include <Storages/ExternalDataSourceUtils.h>
#include <Storages/PartitionedSink.h>
#include <Formats/FormatFactory.h>
#include <Processors/Formats/IOutputFormat.h>
#include <DataTypes/DataTypeString.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Common/parseGlobs.h>
#include <Functions/FunctionsConversion.h>
#include <QueryPipeline/QueryPipeline.h>
#include <QueryPipeline/Pipe.h>
#include <Poco/URI.h>
#include <re2/re2.h>
#include <re2/stringpiece.h>
#include <hdfs/hdfs.h>
#include <Processors/Sources/SourceWithProgress.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Formats/IInputFormat.h>
#include <QueryPipeline/QueryPipeline.h>
#include <QueryPipeline/Pipe.h>
#include <filesystem>
@ -224,6 +238,43 @@ private:
bool is_first_chunk = true;
};
class PartitionedHDFSSink : public PartitionedSink
{
public:
PartitionedHDFSSink(
const ASTPtr & partition_by,
const String & uri_,
const String & format_,
const Block & sample_block_,
ContextPtr context_,
const CompressionMethod compression_method_)
: PartitionedSink(partition_by, context_, sample_block_)
, uri(uri_)
, format(format_)
, sample_block(sample_block_)
, context(context_)
, compression_method(compression_method_)
{
}
SinkPtr createSinkForPartition(const String & partition_id) override
{
auto path = PartitionedSink::replaceWildcards(uri, partition_id);
PartitionedSink::validatePartitionKey(path, true);
return std::make_shared<HDFSSink>(path, format, sample_block, context, compression_method);
}
private:
const String uri;
const String format;
const Block sample_block;
ContextPtr context;
const CompressionMethod compression_method;
};
/* Recursive directory listing with matched paths as a result.
* Have the same method in StorageFile.
*/
@ -315,14 +366,33 @@ Pipe StorageHDFS::read(
return Pipe::unitePipes(std::move(pipes));
}
SinkToStoragePtr StorageHDFS::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr /*context*/)
SinkToStoragePtr StorageHDFS::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr /*context*/)
{
bool has_wildcards = uri.find(PartitionedSink::PARTITION_ID_WILDCARD) != String::npos;
const auto * insert_query = dynamic_cast<const ASTInsertQuery *>(query.get());
bool is_partitioned_implementation = insert_query && insert_query->partition_by && has_wildcards;
if (is_partitioned_implementation)
{
std::cerr << "partitioned implementation\n";
return std::make_shared<PartitionedHDFSSink>(
insert_query->partition_by,
uri,
format_name,
metadata_snapshot->getSampleBlock(),
getContext(),
chooseCompressionMethod(uri, compression_method));
}
else
{
std::cerr << "non partitioned implementation\n";
return std::make_shared<HDFSSink>(uri,
format_name,
metadata_snapshot->getSampleBlock(),
getContext(),
chooseCompressionMethod(uri, compression_method));
}
}
void StorageHDFS::truncate(const ASTPtr & /* query */, const StorageMetadataPtr &, ContextPtr context_, TableExclusiveLockHolder &)
{

View File

@ -38,6 +38,8 @@ public:
NamesAndTypesList getVirtuals() const override;
bool supportsPartitionBy() const override { return true; }
protected:
StorageHDFS(
const String & uri_,

View File

@ -0,0 +1,128 @@
#include "PartitionedSink.h"
#include <Functions/FunctionsConversion.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/TreeRewriter.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTLiteral.h>
#include <Processors/Sources/SourceWithProgress.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_PARSE_TEXT;
}
PartitionedSink::PartitionedSink(
const ASTPtr & partition_by,
ContextPtr context_,
const Block & sample_block_)
: SinkToStorage(sample_block_)
, context(context_)
, sample_block(sample_block_)
{
std::vector<ASTPtr> arguments(1, partition_by);
ASTPtr partition_by_string = makeASTFunction(FunctionToString::name, std::move(arguments));
auto syntax_result = TreeRewriter(context).analyze(partition_by_string, sample_block.getNamesAndTypesList());
partition_by_expr = ExpressionAnalyzer(partition_by_string, syntax_result, context).getActions(false);
partition_by_column_name = partition_by_string->getColumnName();
}
SinkPtr PartitionedSink::getSinkForPartition(const String & partition_id)
{
auto it = sinks.find(partition_id);
if (it == sinks.end())
{
auto sink = createSinkForPartition(partition_id);
std::tie(it, std::ignore) = sinks.emplace(partition_id, sink);
}
return it->second;
}
void PartitionedSink::consume(Chunk chunk)
{
const auto & columns = chunk.getColumns();
Block block_with_partition_by_expr = sample_block.cloneWithoutColumns();
block_with_partition_by_expr.setColumns(columns);
partition_by_expr->execute(block_with_partition_by_expr);
const auto * column = block_with_partition_by_expr.getByName(partition_by_column_name).column.get();
std::unordered_map<String, size_t> sub_chunks_indices;
IColumn::Selector selector;
for (size_t row = 0; row < chunk.getNumRows(); ++row)
{
auto value = column->getDataAt(row);
auto [it, inserted] = sub_chunks_indices.emplace(value, sub_chunks_indices.size());
selector.push_back(it->second);
}
Chunks sub_chunks;
sub_chunks.reserve(sub_chunks_indices.size());
for (size_t column_index = 0; column_index < columns.size(); ++column_index)
{
MutableColumns column_sub_chunks = columns[column_index]->scatter(sub_chunks_indices.size(), selector);
if (column_index == 0) /// Set sizes for sub-chunks.
{
for (const auto & column_sub_chunk : column_sub_chunks)
{
sub_chunks.emplace_back(Columns(), column_sub_chunk->size());
}
}
for (size_t sub_chunk_index = 0; sub_chunk_index < column_sub_chunks.size(); ++sub_chunk_index)
{
sub_chunks[sub_chunk_index].addColumn(std::move(column_sub_chunks[sub_chunk_index]));
}
}
for (const auto & [partition_id, sub_chunk_index] : sub_chunks_indices)
{
getSinkForPartition(partition_id)->consume(std::move(sub_chunks[sub_chunk_index]));
}
}
void PartitionedSink::onFinish()
{
for (auto & [partition_id, sink] : sinks)
{
sink->onFinish();
}
}
void PartitionedSink::validatePartitionKey(const String & str, bool allow_slash)
{
for (const char * i = str.data(); i != str.data() + str.size(); ++i)
{
if (static_cast<UInt8>(*i) < 0x20 || *i == '{' || *i == '}' || *i == '*' || *i == '?' || (!allow_slash && *i == '/'))
{
/// Need to convert to UInt32 because UInt8 can't be passed to format due to "mixing character types is disallowed".
UInt32 invalid_char_byte = static_cast<UInt32>(static_cast<UInt8>(*i));
throw DB::Exception(
ErrorCodes::CANNOT_PARSE_TEXT, "Illegal character '\\x{:02x}' in partition id starting with '{}'",
invalid_char_byte, std::string(str.data(), i - str.data()));
}
}
}
String PartitionedSink::replaceWildcards(const String & haystack, const String & partition_id)
{
return boost::replace_all_copy(haystack, PartitionedSink::PARTITION_ID_WILDCARD, partition_id);
}
}

View File

@ -0,0 +1,40 @@
#include <Processors/Sinks/SinkToStorage.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/Context_fwd.h>
namespace DB
{
class PartitionedSink : public SinkToStorage
{
public:
static constexpr auto PARTITION_ID_WILDCARD = "{_partition_id}";
PartitionedSink(const ASTPtr & partition_by, ContextPtr context_, const Block & sample_block_);
String getName() const override { return "PartitionedSink"; }
void consume(Chunk chunk) override;
void onFinish() override;
virtual SinkPtr createSinkForPartition(const String & partition_id) = 0;
static void validatePartitionKey(const String & str, bool allow_slash);
static String replaceWildcards(const String & haystack, const String & partition_id);
private:
ContextPtr context;
Block sample_block;
ExpressionActionsPtr partition_by_expr;
String partition_by_column_name;
std::unordered_map<String, SinkPtr> sinks;
SinkPtr getSinkForPartition(const String & partition_id);
};
}

View File

@ -7,6 +7,7 @@
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTInsertQuery.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadBufferFromFileDescriptor.h>
@ -25,6 +26,7 @@
#include <Common/filesystemHelpers.h>
#include <Storages/ColumnsDescription.h>
#include <Storages/StorageInMemoryMetadata.h>
#include <Storages/PartitionedSink.h>
#include <sys/stat.h>
#include <fcntl.h>
@ -189,6 +191,10 @@ StorageFile::StorageFile(const std::string & table_path_, const std::string & us
: StorageFile(args)
{
is_db_table = false;
bool has_wildcards = table_path_.find(PartitionedSink::PARTITION_ID_WILDCARD) != String::npos;
if (has_wildcards)
paths = {table_path_};
else
paths = getPathsList(table_path_, user_files_path, args.getContext(), total_bytes_to_read);
if (args.format_name == "Distributed")
@ -541,22 +547,48 @@ Pipe StorageFile::read(
class StorageFileSink final : public SinkToStorage
{
public:
explicit StorageFileSink(
StorageFileSink(
StorageFile & storage_,
const StorageMetadataPtr & metadata_snapshot_,
std::unique_lock<std::shared_timed_mutex> && lock_,
const CompressionMethod compression_method,
ContextPtr context,
const std::optional<FormatSettings> & format_settings,
int & flags)
const CompressionMethod compression_method_,
ContextPtr context_,
const std::optional<FormatSettings> & format_settings_,
int flags_)
: SinkToStorage(metadata_snapshot_->getSampleBlock())
, storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, compression_method(compression_method_)
, context(context_)
, format_settings(format_settings_)
, flags(flags_)
{
initialize();
}
StorageFileSink(
StorageFile & storage_,
const StorageMetadataPtr & metadata_snapshot_,
std::unique_lock<std::shared_timed_mutex> && lock_,
const CompressionMethod compression_method_,
ContextPtr context_,
const std::optional<FormatSettings> & format_settings_,
int flags_)
: SinkToStorage(metadata_snapshot_->getSampleBlock())
, storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, compression_method(compression_method_)
, context(context_)
, format_settings(format_settings_)
, flags(flags_)
, lock(std::move(lock_))
{
if (!lock)
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
initialize();
}
void initialize()
{
std::unique_ptr<WriteBufferFromFileDescriptor> naked_buffer = nullptr;
if (storage.use_table_fd)
{
@ -608,14 +640,66 @@ public:
private:
StorageFile & storage;
StorageMetadataPtr metadata_snapshot;
const CompressionMethod compression_method;
ContextPtr context;
std::optional<FormatSettings> format_settings;
int flags;
std::unique_lock<std::shared_timed_mutex> lock;
std::unique_ptr<WriteBuffer> write_buf;
OutputFormatPtr writer;
bool prefix_written{false};
};
class PartitionedStorageFileSink : public PartitionedSink
{
public:
PartitionedStorageFileSink(
const ASTPtr & partition_by,
StorageFile & storage_,
const StorageMetadataPtr & metadata_snapshot_,
std::unique_lock<std::shared_timed_mutex> && lock_,
const CompressionMethod compression_method_,
ContextPtr context_,
const std::optional<FormatSettings> & format_settings_,
int & flags_)
: PartitionedSink(partition_by, context_, metadata_snapshot_->getSampleBlock())
, path(storage_.paths[0])
, storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, lock(std::move(lock_))
, compression_method(compression_method_)
, context(context_)
, format_settings(format_settings_)
, flags(flags_)
{
}
SinkPtr createSinkForPartition(const String & partition_id) override
{
auto partition_path = PartitionedSink::replaceWildcards(path, partition_id);
PartitionedSink::validatePartitionKey(partition_path, true);
storage.paths[0] = partition_path;
return std::make_shared<StorageFileSink>(
storage, metadata_snapshot, compression_method, context, format_settings, flags);
}
private:
const String path;
StorageFile & storage;
StorageMetadataPtr metadata_snapshot;
std::unique_lock<std::shared_timed_mutex> lock;
const CompressionMethod compression_method;
ContextPtr context;
std::optional<FormatSettings> format_settings;
int flags;
};
SinkToStoragePtr StorageFile::write(
const ASTPtr & /*query*/,
const ASTPtr & query,
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context)
{
@ -634,6 +718,29 @@ SinkToStoragePtr StorageFile::write(
fs::create_directories(fs::path(path).parent_path());
}
bool has_wildcards = path.find(PartitionedSink::PARTITION_ID_WILDCARD) != String::npos;
const auto * insert_query = dynamic_cast<const ASTInsertQuery *>(query.get());
bool is_partitioned_implementation = insert_query && insert_query->partition_by && has_wildcards;
if (is_partitioned_implementation)
{
if (paths.size() != 1)
throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED,
"Table '{}' is in readonly mode because of globs in filepath",
getStorageID().getNameForLogs());
return std::make_shared<PartitionedStorageFileSink>(
insert_query->partition_by,
*this,
metadata_snapshot,
std::unique_lock{rwlock, getLockTimeout(context)},
chooseCompressionMethod(path, compression_method),
context,
format_settings,
flags);
}
else
{
return std::make_shared<StorageFileSink>(
*this,
metadata_snapshot,
@ -643,6 +750,7 @@ SinkToStoragePtr StorageFile::write(
format_settings,
flags);
}
}
bool StorageFile::storesDataOnDisk() const
{

View File

@ -17,6 +17,8 @@ class StorageFileBlockOutputStream;
class StorageFile final : public shared_ptr_helper<StorageFile>, public IStorage
{
friend struct shared_ptr_helper<StorageFile>;
friend class PartitionedStorageFileSink;
public:
std::string getName() const override { return "File"; }
@ -66,6 +68,8 @@ public:
/// format to read only them. Note: this hack cannot be done with ordinary formats like TSV.
bool isColumnOriented() const;
bool supportsPartitionBy() const override { return true; }
protected:
friend class StorageFileSource;
friend class StorageFileSink;

View File

@ -257,6 +257,24 @@ def test_truncate_table(started_cluster):
node1.query("drop table test_truncate")
def test_partition_by(started_cluster):
hdfs_api = started_cluster.hdfs_api
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
file_name = "test_{_partition_id}"
partition_by = "column3"
values = "(1, 2, 3), (3, 2, 1), (1, 3, 2)"
table_function = f"hdfs('hdfs://hdfs1:9000/{file_name}', 'TSV', '{table_format}')"
node1.query(f"insert into table function {table_function} PARTITION BY {partition_by} values {values}")
result = node1.query(f"select * from hdfs('hdfs://hdfs1:9000/test_1', 'TSV', '{table_format}')")
assert(result.strip() == "3\t2\t1")
result = node1.query(f"select * from hdfs('hdfs://hdfs1:9000/test_2', 'TSV', '{table_format}')")
assert(result.strip() == "1\t3\t2")
result = node1.query(f"select * from hdfs('hdfs://hdfs1:9000/test_3', 'TSV', '{table_format}')")
assert(result.strip() == "1\t2\t3")
if __name__ == '__main__':
cluster.start()
input("Cluster created, press any key to destroy...")

View File

@ -0,0 +1,6 @@
part 1
3 2 1
part 2
1 3 2
part 3
1 2 3

View File

@ -0,0 +1,30 @@
#!/usr/bin/env bash
# Tags: no-parallel, no-fasttest
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
# See 01658_read_file_to_string_column.sh
user_files_path=$(clickhouse-client --query "select _path,_file from file('nonexist.txt', 'CSV', 'val1 char')" 2>&1 | grep Exception | awk '{gsub("/nonexist.txt","",$9); print $9}')
mkdir -p "${user_files_path}/"
chmod 777 ${user_files_path}
FILE_PATH="${user_files_path}/test_table_function_file"
function cleanup()
{
rm -r ${FILE_PATH}
}
trap cleanup EXIT
values="(1, 2, 3), (3, 2, 1), (1, 3, 2)"
${CLICKHOUSE_CLIENT} --query="insert into table function file('${FILE_PATH}/test_{_partition_id}', 'TSV', 'column1 UInt32, column2 UInt32, column3 UInt32') PARTITION BY column3 values ${values}";
echo 'part 1'
${CLICKHOUSE_CLIENT} --query="select * from file('${FILE_PATH}/test_1', 'TSV', 'column1 UInt32, column2 UInt32, column3 UInt32')";
echo 'part 2'
${CLICKHOUSE_CLIENT} --query="select * from file('${FILE_PATH}/test_2', 'TSV', 'column1 UInt32, column2 UInt32, column3 UInt32')";
echo 'part 3'
${CLICKHOUSE_CLIENT} --query="select * from file('${FILE_PATH}/test_3', 'TSV', 'column1 UInt32, column2 UInt32, column3 UInt32')";