This commit is contained in:
Vladimir Chebotarev 2021-05-31 11:46:28 +03:00
parent 0879dbbfa7
commit 328213f5d9
2 changed files with 11 additions and 14 deletions

View File

@ -2,6 +2,8 @@
#if USE_AWS_S3
#include <Columns/ColumnString.h>
#include <IO/S3Common.h>
#include <Storages/StorageFactory.h>
#include <Storages/StorageS3.h>
@ -379,13 +381,13 @@ public:
current_block_with_partition_by_expr.setColumns(columns);
partition_by_expr->execute(current_block_with_partition_by_expr);
const auto & key_column = current_block_with_partition_by_expr.getByName(partition_by_column_name);
const auto * key_column = checkAndGetColumn<ColumnString>(current_block_with_partition_by_expr.getByName(partition_by_column_name).column.get());
std::unordered_map<String, size_t> sub_chunks_indices;
IColumn::Selector selector;
for (size_t row = 0; row < chunk.getNumRows(); ++row)
{
auto & value = (*key_column.column)[row].get<String>();
auto value = key_column->getDataAt(row);
auto [it, inserted] = sub_chunks_indices.emplace(value, sub_chunks_indices.size());
selector.push_back(it->second);
}

View File

@ -147,25 +147,20 @@ def test_put(started_cluster, maybe_auth, positive, compression):
def test_distributed_put(cluster):
    """Check that a partitioned INSERT through the s3 table function writes one file per partition.

    Three rows are inserted with ``PARTITION BY toString(column3)`` into a path
    containing the ``{_partition_id}`` placeholder. The test then asserts that
    each row ended up alone in the file whose name carries that row's
    ``column3`` value.
    """
    bucket = cluster.minio_bucket
    instance = cluster.instances["dummy"]  # type: ClickHouseInstance
    table_format = "column1 UInt32, column2 UInt32, column3 UInt32"

    values = "(1, 2, 3), (3, 2, 1), (78, 43, 45)"
    values_csv = "1,2,3\n3,2,1\n78,43,45\n"
    # Plain (non-f) string: the braces must reach the server verbatim so that
    # the placeholder is substituted there, not by Python.
    filename = "test_{_partition_id}.csv"
    put_query = f"""insert into table function s3('http://{cluster.minio_host}:{cluster.minio_port}/{bucket}/{filename}',
                    'CSV', '{table_format}') PARTITION BY toString(column3) values {values}"""

    # The insert is expected to succeed unconditionally; any exception fails the test.
    run_query(instance, put_query)

    # One file per distinct column3 value, each holding exactly its own row.
    assert "1,2,3\n" == get_s3_file_content(cluster, bucket, "test_3.csv")
    assert "3,2,1\n" == get_s3_file_content(cluster, bucket, "test_1.csv")
    assert "78,43,45\n" == get_s3_file_content(cluster, bucket, "test_45.csv")
@pytest.mark.parametrize("special", [