Merge pull request #37801 from azat/s3-fix-empty-read
Fix reading of empty S3 files
commit 6211a1c390
@@ -210,7 +210,11 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl()
     ProfileEvents::increment(ProfileEvents::AsynchronousReadWaitMicroseconds, watch.elapsedMicroseconds());

     file_offset_of_buffer_end = impl->getFileOffsetOfBufferEnd();
-    assert(file_offset_of_buffer_end == impl->getImplementationBufferOffset());
+    /// In case of multiple files for the same file in clickhouse (i.e. log family)
+    /// file_offset_of_buffer_end will not match getImplementationBufferOffset()
+    /// so we use [impl->getImplementationBufferOffset(), impl->getFileSize()]
+    assert(file_offset_of_buffer_end >= impl->getImplementationBufferOffset());
+    assert(file_offset_of_buffer_end <= impl->getFileSize());

     prefetch_future = {};
     return size;
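For context on the hunk above: the strict equality assert is relaxed into a range check because, for log-family tables, several ClickHouse files can be packed into one remote object, so the logical end-of-buffer offset may legitimately run ahead of the implementation buffer's offset. Below is a minimal, self-contained C++ sketch of the relaxed invariant; the struct and field names are illustrative stand-ins, not the real ClickHouse classes.

#include <cassert>
#include <cstddef>

/// Hypothetical stand-ins for the real buffer state; only the relaxed
/// range check from the hunk above is modelled here.
struct RemoteReadState
{
    std::size_t file_offset_of_buffer_end = 0;  /// logical position within the ClickHouse file
    std::size_t impl_offset = 0;                /// offset of the underlying remote read buffer
    std::size_t file_size = 0;                  /// total size of the remote object
};

void checkOffsets(const RemoteReadState & s)
{
    /// Old check (too strict when one remote object backs several files):
    ///     assert(s.file_offset_of_buffer_end == s.impl_offset);
    /// New check: the logical offset only has to stay within
    /// [impl_offset, file_size].
    assert(s.file_offset_of_buffer_end >= s.impl_offset);
    assert(s.file_offset_of_buffer_end <= s.file_size);
}

int main()
{
    RemoteReadState state;
    state.file_offset_of_buffer_end = 120;  /// runs ahead of the inner buffer
    state.impl_offset = 100;
    state.file_size = 200;
    checkOffsets(state);                    /// passes under the relaxed invariant
    return 0;
}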
@@ -116,6 +116,11 @@ bool ReadBufferFromS3::nextImpl()
             assert(working_buffer.begin() != nullptr);
             assert(!internal_buffer.empty());
         }
+        else
+        {
+            /// use the buffer returned by `impl`
+            BufferBase::set(impl->buffer().begin(), impl->buffer().size(), impl->offset());
+        }
     }

     /// Try to read a next portion of data.
@@ -155,7 +160,7 @@ bool ReadBufferFromS3::nextImpl()
     if (!next_result)
         return false;

-    BufferBase::set(impl->buffer().begin(), impl->buffer().size(), impl->offset()); /// use the buffer returned by `impl`
+    BufferBase::set(impl->buffer().begin(), impl->buffer().size(), impl->offset());

     ProfileEvents::increment(ProfileEvents::ReadBufferFromS3Bytes, working_buffer.size());
     offset += working_buffer.size();
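The two ReadBufferFromS3 hunks above move the "use the buffer returned by `impl`" rebinding into an else branch, so it happens only when no external buffer is in use, before the next portion of data is requested. The sketch below is a rough, self-contained model of that facade-over-inner-buffer pattern, including the zero-byte case where next() reports no data; the Mock* types are invented for illustration and are not the actual ClickHouse classes.

#include <cassert>
#include <cstddef>
#include <iostream>
#include <string>
#include <vector>

/// Simplified mock of an inner remote read buffer: it hands out data from an
/// in-memory "object"; a zero-byte object never yields data.
struct MockInnerBuffer
{
    std::string object;
    std::size_t pos = 0;
    const char * data_begin = nullptr;
    std::size_t data_size = 0;

    bool next()
    {
        if (pos >= object.size())
            return false;
        data_begin = object.data() + pos;
        data_size = object.size() - pos;
        pos = object.size();
        return true;
    }
};

/// Simplified facade over the inner buffer: either copy into a caller-supplied
/// (external) buffer, or just point the working buffer at whatever the inner
/// buffer currently holds, mirroring the else branch added above.
struct MockS3Reader
{
    MockInnerBuffer impl;
    bool use_external_buffer = false;
    std::vector<char> external;

    const char * working_begin = nullptr;
    std::size_t working_size = 0;

    bool next()
    {
        if (!impl.next())
            return false;  /// empty object: report "no data" without touching the working buffer

        if (use_external_buffer)
        {
            external.assign(impl.data_begin, impl.data_begin + impl.data_size);
            working_begin = external.data();
            working_size = external.size();
        }
        else
        {
            /// use the buffer returned by the inner reader
            working_begin = impl.data_begin;
            working_size = impl.data_size;
        }
        return true;
    }
};

int main()
{
    MockS3Reader empty_reader;               /// zero-byte "object"
    bool got_data = empty_reader.next();
    assert(!got_data);                       /// no data, working buffer left untouched

    MockS3Reader reader;
    reader.impl.object = "hello";
    got_data = reader.next();
    assert(got_data && reader.working_size == 5);
    std::cout << std::string(reader.working_begin, reader.working_size) << '\n';
    return 0;
}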
@@ -46,7 +46,11 @@ def assert_objects_count(cluster, objects_count, path="data/"):
 # files_overhead=1, files_overhead_per_insert=2
 @pytest.mark.parametrize(
     "log_engine,files_overhead,files_overhead_per_insert",
-    [("TinyLog", 1, 1), ("Log", 1, 2), ("StripeLog", 1, 2)],
+    [
+        pytest.param("TinyLog", 1, 1, id="TinyLog"),
+        pytest.param("Log", 1, 2, id="Log"),
+        pytest.param("StripeLog", 1, 2, id="StripeLog"),
+    ],
 )
 def test_log_family_s3(cluster, log_engine, files_overhead, files_overhead_per_insert):
     node = cluster.instances["node"]
@@ -57,21 +61,26 @@ def test_log_family_s3(cluster, log_engine, files_overhead, files_overhead_per_insert):
         )
     )

-    node.query("INSERT INTO s3_test SELECT number FROM numbers(5)")
-    assert node.query("SELECT * FROM s3_test") == "0\n1\n2\n3\n4\n"
-    assert_objects_count(cluster, files_overhead_per_insert + files_overhead)
+    try:
+        node.query("INSERT INTO s3_test SELECT number FROM numbers(5)")
+        assert node.query("SELECT * FROM s3_test") == "0\n1\n2\n3\n4\n"
+        assert_objects_count(cluster, files_overhead_per_insert + files_overhead)

-    node.query("INSERT INTO s3_test SELECT number + 5 FROM numbers(3)")
-    assert node.query("SELECT * FROM s3_test order by id") == "0\n1\n2\n3\n4\n5\n6\n7\n"
-    assert_objects_count(cluster, files_overhead_per_insert * 2 + files_overhead)
+        node.query("INSERT INTO s3_test SELECT number + 5 FROM numbers(3)")
+        assert (
+            node.query("SELECT * FROM s3_test order by id")
+            == "0\n1\n2\n3\n4\n5\n6\n7\n"
+        )
+        assert_objects_count(cluster, files_overhead_per_insert * 2 + files_overhead)

-    node.query("INSERT INTO s3_test SELECT number + 8 FROM numbers(1)")
-    assert (
-        node.query("SELECT * FROM s3_test order by id") == "0\n1\n2\n3\n4\n5\n6\n7\n8\n"
-    )
-    assert_objects_count(cluster, files_overhead_per_insert * 3 + files_overhead)
+        node.query("INSERT INTO s3_test SELECT number + 8 FROM numbers(1)")
+        assert (
+            node.query("SELECT * FROM s3_test order by id")
+            == "0\n1\n2\n3\n4\n5\n6\n7\n8\n"
+        )
+        assert_objects_count(cluster, files_overhead_per_insert * 3 + files_overhead)

-    node.query("TRUNCATE TABLE s3_test")
-    assert_objects_count(cluster, 0)
-
-    node.query("DROP TABLE s3_test")
+        node.query("TRUNCATE TABLE s3_test")
+        assert_objects_count(cluster, 0)
+    finally:
+        node.query("DROP TABLE s3_test")