Better functionality.

This commit is contained in:
Vladimir Chebotarev 2021-06-01 15:41:23 +03:00
parent 328213f5d9
commit e99433e094
2 changed files with 15 additions and 9 deletions

View File

@ -4,18 +4,22 @@
#include <Columns/ColumnString.h>
#include <Functions/FunctionsConversion.h>
#include <IO/S3Common.h>
#include <Storages/StorageFactory.h>
#include <Storages/StorageS3.h>
#include <Storages/StorageS3Settings.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/TreeRewriter.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTLiteral.h>
#include <Storages/StorageFactory.h>
#include <Storages/StorageS3.h>
#include <Storages/StorageS3Settings.h>
#include <IO/ReadBufferFromS3.h>
#include <IO/ReadHelpers.h>
@ -365,10 +369,12 @@ public:
, max_single_part_upload_size(max_single_part_upload_size_)
{
ASTPtr query = partition_by;
auto syntax_result = TreeRewriter(context).analyze(query, sample_block.getNamesAndTypesList());
partition_by_expr = ExpressionAnalyzer(query, syntax_result, context).getActions(false);
partition_by_column_name = partition_by->getColumnName();
std::vector<ASTPtr> arguments(1, partition_by);
ASTPtr partition_by_string = makeASTFunction(FunctionToString::name, std::move(arguments));
auto syntax_result = TreeRewriter(context).analyze(partition_by_string, sample_block.getNamesAndTypesList());
partition_by_expr = ExpressionAnalyzer(partition_by_string, syntax_result, context).getActions(false);
partition_by_column_name = partition_by_string->getColumnName();
}
String getName() const override { return "PartitionedStorageS3Sink"; }

View File

@ -154,7 +154,7 @@ def test_distributed_put(cluster):
values_csv = "1,2,3\n3,2,1\n78,43,45\n"
filename = "test_{_partition_id}.csv"
put_query = f"""insert into table function s3('http://{cluster.minio_host}:{cluster.minio_port}/{bucket}/{filename}',
'CSV', '{table_format}') PARTITION BY toString(column3) values {values}"""
'CSV', '{table_format}') PARTITION BY column3 values {values}"""
run_query(instance, put_query)