Merge pull request #38802 from kssenii/fix-s3-seekable-with-parallel

Fix s3 seekable reads with parallel read buffer
This commit is contained in:
Kseniia Sumarokova 2022-07-05 11:23:16 +02:00 committed by GitHub
commit b3fd8e3bd6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 33 additions and 24 deletions

View File

@@ -47,6 +47,7 @@ public:
off_t getPosition() override;
const ReadBufferFactory & getReadBufferFactory() const { return *reader_factory; }
ReadBufferFactory & getReadBufferFactory() { return *reader_factory; }
private:
/// Reader in progress with a list of read segments

View File

@@ -33,6 +33,10 @@ size_t getFileSizeFromReadBuffer(ReadBuffer & in)
{
return getFileSize(compressed->getWrappedReadBuffer());
}
else if (auto * parallel = dynamic_cast<ParallelReadBuffer *>(&in))
{
return getFileSize(parallel->getReadBufferFactory());
}
return getFileSize(in);
}
@@ -47,6 +51,10 @@ bool isBufferWithFileSize(const ReadBuffer & in)
{
return isBufferWithFileSize(compressed->getWrappedReadBuffer());
}
else if (const auto * parallel = dynamic_cast<const ParallelReadBuffer *>(&in))
{
return dynamic_cast<const WithFileSize *>(&parallel->getReadBufferFactory()) != nullptr;
}
return dynamic_cast<const WithFileSize *>(&in) != nullptr;
}

View File

@@ -1052,61 +1052,61 @@ def test_seekable_formats(started_cluster):
instance = started_cluster.instances["dummy"] # type: ClickHouseInstance
table_function = f"s3(s3_parquet, structure='a Int32, b String', format='Parquet')"
instance.query(
f"insert into table function {table_function} SELECT number, randomString(100) FROM numbers(5000000) settings s3_truncate_on_insert=1"
exec_query_with_retry(
instance,
f"insert into table function {table_function} SELECT number, randomString(100) FROM numbers(1000000) settings s3_truncate_on_insert=1",
)
result = instance.query(f"SELECT count() FROM {table_function}")
assert int(result) == 5000000
assert int(result) == 1000000
table_function = f"s3(s3_orc, structure='a Int32, b String', format='ORC')"
exec_query_with_retry(
instance,
f"insert into table function {table_function} SELECT number, randomString(100) FROM numbers(5000000) settings s3_truncate_on_insert=1",
f"insert into table function {table_function} SELECT number, randomString(100) FROM numbers(1000000) settings s3_truncate_on_insert=1",
)
result = instance.query(f"SELECT count() FROM {table_function}")
assert int(result) == 5000000
result = instance.query(
f"SELECT count() FROM {table_function} SETTINGS max_memory_usage='50M'"
)
assert int(result) == 1000000
instance.query(f"SELECT * FROM {table_function} FORMAT Null")
instance.query("SYSTEM FLUSH LOGS")
result = instance.query(
f"SELECT formatReadableSize(memory_usage) FROM system.query_log WHERE startsWith(query, 'SELECT count() FROM s3') AND memory_usage > 0 ORDER BY event_time desc"
f"SELECT formatReadableSize(ProfileEvents['ReadBufferFromS3Bytes']) FROM system.query_log WHERE startsWith(query, 'SELECT * FROM s3') AND memory_usage > 0 AND type='QueryFinish' ORDER BY event_time_microseconds DESC LIMIT 1"
)
result = result.strip()
assert result.endswith("MiB")
result = result[: result.index(".")]
assert int(result) < 200
assert int(result) > 80
def test_seekable_formats_url(started_cluster):
bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"]
instance = started_cluster.instances["dummy"] # type: ClickHouseInstance
table_function = f"s3(s3_parquet, structure='a Int32, b String', format='Parquet')"
instance.query(
f"insert into table function {table_function} select number, randomString(100) from numbers(5000000) settings s3_truncate_on_insert=1"
exec_query_with_retry(
instance,
f"insert into table function {table_function} SELECT number, randomString(100) FROM numbers(1000000) settings s3_truncate_on_insert=1",
)
table_function = f"url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_parquet', 'Parquet', 'a Int32, b String')"
result = instance.query(f"SELECT count() FROM {table_function}")
assert int(result) == 5000000
assert int(result) == 1000000
table_function = f"s3(s3_orc, structure='a Int32, b String', format='ORC')"
exec_query_with_retry(
instance,
f"insert into table function {table_function} select number, randomString(100) from numbers(5000000) settings s3_truncate_on_insert=1",
f"insert into table function {table_function} SELECT number, randomString(100) FROM numbers(1000000) settings s3_truncate_on_insert=1",
)
table_function = f"url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_orc', 'ORC', 'a Int32, b String')"
result = instance.query(f"SELECT count() FROM {table_function}")
assert int(result) == 5000000
instance.query("SYSTEM FLUSH LOGS")
table_function = f"url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_parquet', 'Parquet', 'a Int32, b String')"
result = instance.query(
f"SELECT formatReadableSize(memory_usage) FROM system.query_log WHERE startsWith(query, 'SELECT count() FROM url') AND memory_usage > 0 ORDER BY event_time desc"
f"SELECT count() FROM {table_function} SETTINGS max_memory_usage='50M'"
)
result = result[: result.index(".")]
assert int(result) < 200
assert int(result) == 1000000
def test_empty_file(started_cluster):