Merge pull request #19793 from excitoon-favorites/fixcompressions3

Fixed table function S3 `auto` compression mode
This commit is contained in:
Alexander Kuzmenkov 2021-01-29 23:42:05 +03:00 committed by GitHub
commit 98e88d7305
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 25 additions and 13 deletions

View File

@ -329,7 +329,7 @@ Pipe StorageS3::read(
context,
metadata_snapshot->getColumns(),
max_block_size,
chooseCompressionMethod(uri.endpoint, compression_method),
chooseCompressionMethod(uri.key, compression_method),
client,
uri.bucket,
key));
@ -347,7 +347,7 @@ BlockOutputStreamPtr StorageS3::write(const ASTPtr & /*query*/, const StorageMet
format_name,
metadata_snapshot->getSampleBlock(),
global_context,
chooseCompressionMethod(uri.endpoint, compression_method),
chooseCompressionMethod(uri.key, compression_method),
client,
uri.bucket,
uri.key,

View File

@ -443,10 +443,14 @@ def test_infinite_redirect(cluster):
assert exception_raised
def test_storage_s3_get_gzip(cluster):
@pytest.mark.parametrize("extension,method", [
("bin", "gzip"),
("gz", "auto")
])
def test_storage_s3_get_gzip(cluster, extension, method):
bucket = cluster.minio_bucket
instance = cluster.instances["dummy"]
filename = "test_get_gzip.bin"
filename = f"test_get_gzip.{extension}"
name = "test_get_gzip"
data = [
"Sophia Intrieri,55",
@ -473,13 +477,15 @@ def test_storage_s3_get_gzip(cluster):
put_s3_file_content(cluster, bucket, filename, buf.getvalue())
try:
run_query(instance, "CREATE TABLE {} (name String, id UInt32) ENGINE = S3('http://{}:{}/{}/{}', 'CSV', 'gzip')".format(
name, cluster.minio_host, cluster.minio_port, bucket, filename))
run_query(instance, f"""CREATE TABLE {name} (name String, id UInt32) ENGINE = S3(
        'http://{cluster.minio_host}:{cluster.minio_port}/{bucket}/{filename}',
'CSV',
'{method}')""")
run_query(instance, "SELECT sum(id) FROM {}".format(name)).splitlines() == ["565"]
finally:
run_query(instance, "DROP TABLE {}".format(name))
run_query(instance, f"DROP TABLE {name}")
def test_storage_s3_put_uncompressed(cluster):
@ -515,13 +521,17 @@ def test_storage_s3_put_uncompressed(cluster):
uncompressed_content = get_s3_file_content(cluster, bucket, filename)
assert sum([ int(i.split(',')[1]) for i in uncompressed_content.splitlines() ]) == 753
finally:
run_query(instance, "DROP TABLE {}".format(name))
run_query(instance, f"DROP TABLE {name}")
def test_storage_s3_put_gzip(cluster):
@pytest.mark.parametrize("extension,method", [
("bin", "gzip"),
("gz", "auto")
])
def test_storage_s3_put_gzip(cluster, extension, method):
bucket = cluster.minio_bucket
instance = cluster.instances["dummy"]
filename = "test_put_gzip.bin"
filename = f"test_put_gzip.{extension}"
name = "test_put_gzip"
data = [
"'Joseph Tomlinson',5",
@ -541,8 +551,10 @@ def test_storage_s3_put_gzip(cluster):
"'Yolanda Joseph',89"
]
try:
run_query(instance, "CREATE TABLE {} (name String, id UInt32) ENGINE = S3('http://{}:{}/{}/{}', 'CSV', 'gzip')".format(
name, cluster.minio_host, cluster.minio_port, bucket, filename))
run_query(instance, f"""CREATE TABLE {name} (name String, id UInt32) ENGINE = S3(
        'http://{cluster.minio_host}:{cluster.minio_port}/{bucket}/{filename}',
'CSV',
'{method}')""")
run_query(instance, "INSERT INTO {} VALUES ({})".format(name, "),(".join(data)))
@ -553,4 +565,4 @@ def test_storage_s3_put_gzip(cluster):
uncompressed_content = f.read().decode()
assert sum([ int(i.split(',')[1]) for i in uncompressed_content.splitlines() ]) == 708
finally:
run_query(instance, "DROP TABLE {}".format(name))
run_query(instance, f"DROP TABLE {name}")