mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-26 09:32:01 +00:00
buffer result if out capacity is not enough
This commit is contained in:
parent
7c4998e5c7
commit
6d5a5f9fcd
@ -2,6 +2,59 @@
|
||||
#include <Common/Exception.h>
|
||||
|
||||
|
||||
namespace
|
||||
{
|
||||
using namespace DB;
|
||||
|
||||
class SinkToOut
|
||||
{
|
||||
public:
|
||||
SinkToOut(WriteBuffer * out_, Memory<> & mem_, size_t guaranteed_capacity)
|
||||
: sink(out_)
|
||||
, tmp_out(mem_)
|
||||
, cur_out(sink)
|
||||
{
|
||||
chassert(sink);
|
||||
|
||||
if (sink->available() < guaranteed_capacity)
|
||||
{
|
||||
mem_.resize(guaranteed_capacity);
|
||||
cur_out = &tmp_out;
|
||||
}
|
||||
}
|
||||
|
||||
size_t getCapacity()
|
||||
{
|
||||
return cur_out->available();
|
||||
}
|
||||
|
||||
BufferBase::Position getPosition()
|
||||
{
|
||||
return cur_out->position();
|
||||
}
|
||||
|
||||
void advancePosition(size_t size)
|
||||
{
|
||||
chassert(size <= cur_out->available());
|
||||
cur_out->position() += size;
|
||||
}
|
||||
|
||||
~SinkToOut() noexcept(false)
|
||||
{
|
||||
if (cur_out == sink)
|
||||
return;
|
||||
|
||||
sink->write(tmp_out.buffer().begin(), tmp_out.count());
|
||||
}
|
||||
|
||||
private:
|
||||
WriteBuffer * sink;
|
||||
BufferWithOutsideMemory<WriteBuffer> tmp_out;
|
||||
WriteBuffer * cur_out;
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
namespace ErrorCodes
|
||||
@ -13,9 +66,9 @@ Lz4DeflatingWriteBuffer::Lz4DeflatingWriteBuffer(
|
||||
std::unique_ptr<WriteBuffer> out_, int compression_level, size_t buf_size, char * existing_memory, size_t alignment)
|
||||
: WriteBufferWithOwnMemoryDecorator(std::move(out_), buf_size, existing_memory, alignment)
|
||||
, in_data(nullptr)
|
||||
, out_data(nullptr)
|
||||
, in_capacity(0)
|
||||
, out_capacity(0)
|
||||
, tmp_memory(buf_size)
|
||||
|
||||
{
|
||||
kPrefs = {
|
||||
{LZ4F_max256KB,
|
||||
@ -36,8 +89,8 @@ Lz4DeflatingWriteBuffer::Lz4DeflatingWriteBuffer(
|
||||
if (LZ4F_isError(ret))
|
||||
throw Exception(
|
||||
ErrorCodes::LZ4_ENCODER_FAILED,
|
||||
"creation of LZ4 compression context failed. LZ4F version: {}",
|
||||
LZ4F_VERSION);
|
||||
"creation of LZ4 compression context failed. LZ4F version: {}, error: {}",
|
||||
LZ4F_VERSION, LZ4F_getErrorName(ret));
|
||||
}
|
||||
|
||||
Lz4DeflatingWriteBuffer::~Lz4DeflatingWriteBuffer()
|
||||
@ -54,107 +107,76 @@ void Lz4DeflatingWriteBuffer::nextImpl()
|
||||
in_data = reinterpret_cast<void *>(working_buffer.begin());
|
||||
in_capacity = offset();
|
||||
|
||||
out_capacity = out->buffer().end() - out->position();
|
||||
out_data = reinterpret_cast<void *>(out->position());
|
||||
|
||||
try
|
||||
if (first_time)
|
||||
{
|
||||
if (first_time)
|
||||
auto sink = SinkToOut(out.get(), tmp_memory, LZ4F_HEADER_SIZE_MAX);
|
||||
chassert(sink.getCapacity() >= LZ4F_HEADER_SIZE_MAX);
|
||||
|
||||
/// write frame header and check for errors
|
||||
size_t header_size = LZ4F_compressBegin(
|
||||
ctx, sink.getPosition(), sink.getCapacity(), &kPrefs);
|
||||
|
||||
if (LZ4F_isError(header_size))
|
||||
throw Exception(
|
||||
ErrorCodes::LZ4_ENCODER_FAILED,
|
||||
"LZ4 failed to start stream encoding. LZ4F version: {}, error: {}",
|
||||
LZ4F_VERSION, LZ4F_getErrorName(header_size));
|
||||
|
||||
sink.advancePosition(header_size);
|
||||
first_time = false;
|
||||
}
|
||||
|
||||
do
|
||||
{
|
||||
/// Ensure that there is enough space for compressed block of minimal size
|
||||
size_t min_compressed_block_size = LZ4F_compressBound(1, &kPrefs);
|
||||
|
||||
auto sink = SinkToOut(out.get(), tmp_memory, min_compressed_block_size);
|
||||
chassert(sink.getCapacity() >= min_compressed_block_size);
|
||||
|
||||
/// LZ4F_compressUpdate compresses whole input buffer at once so we need to shrink it manually
|
||||
size_t cur_buffer_size = in_capacity;
|
||||
if (sink.getCapacity() >= min_compressed_block_size) /// We cannot shrink the input buffer if it's already too small.
|
||||
{
|
||||
if (out_capacity < LZ4F_HEADER_SIZE_MAX)
|
||||
{
|
||||
out->next();
|
||||
out_capacity = out->buffer().end() - out->position();
|
||||
out_data = reinterpret_cast<void *>(out->position());
|
||||
}
|
||||
|
||||
/// write frame header and check for errors
|
||||
size_t header_size = LZ4F_compressBegin(ctx, out_data, out_capacity, &kPrefs);
|
||||
|
||||
if (LZ4F_isError(header_size))
|
||||
throw Exception(
|
||||
ErrorCodes::LZ4_ENCODER_FAILED,
|
||||
"LZ4 failed to start stream encoding. LZ4F version: {}",
|
||||
LZ4F_VERSION);
|
||||
|
||||
out_capacity -= header_size;
|
||||
out->position() = out->buffer().end() - out_capacity;
|
||||
out_data = reinterpret_cast<void *>(out->position());
|
||||
|
||||
first_time = false;
|
||||
while (sink.getCapacity() < LZ4F_compressBound(cur_buffer_size, &kPrefs))
|
||||
cur_buffer_size /= 2;
|
||||
}
|
||||
|
||||
do
|
||||
{
|
||||
/// Ensure that there is enough space for compressed block of minimal size
|
||||
size_t min_compressed_block_size = LZ4F_compressBound(1, &kPrefs);
|
||||
if (out_capacity < min_compressed_block_size)
|
||||
{
|
||||
out->next();
|
||||
out_capacity = out->buffer().end() - out->position();
|
||||
out_data = reinterpret_cast<void *>(out->position());
|
||||
}
|
||||
size_t compressed_size = LZ4F_compressUpdate(
|
||||
ctx, sink.getPosition(), sink.getCapacity(), in_data, cur_buffer_size, nullptr);
|
||||
|
||||
/// LZ4F_compressUpdate compresses whole input buffer at once so we need to shrink it manually
|
||||
size_t cur_buffer_size = in_capacity;
|
||||
if (out_capacity >= min_compressed_block_size) /// We cannot shrink the input buffer if it's already too small.
|
||||
{
|
||||
while (out_capacity < LZ4F_compressBound(cur_buffer_size, &kPrefs))
|
||||
cur_buffer_size /= 2;
|
||||
}
|
||||
if (LZ4F_isError(compressed_size))
|
||||
throw Exception(
|
||||
ErrorCodes::LZ4_ENCODER_FAILED,
|
||||
"LZ4 failed to encode stream. LZ4F version: {}, error {}, out_capacity {}",
|
||||
LZ4F_VERSION, LZ4F_getErrorName(compressed_size), sink.getCapacity());
|
||||
|
||||
size_t compressed_size = LZ4F_compressUpdate(ctx, out_data, out_capacity, in_data, cur_buffer_size, nullptr);
|
||||
in_capacity -= cur_buffer_size;
|
||||
in_data = reinterpret_cast<void *>(working_buffer.end() - in_capacity);
|
||||
|
||||
if (LZ4F_isError(compressed_size))
|
||||
throw Exception(
|
||||
ErrorCodes::LZ4_ENCODER_FAILED,
|
||||
"LZ4 failed to encode stream. LZ4F version: {}",
|
||||
LZ4F_VERSION);
|
||||
|
||||
in_capacity -= cur_buffer_size;
|
||||
in_data = reinterpret_cast<void *>(working_buffer.end() - in_capacity);
|
||||
|
||||
out_capacity -= compressed_size;
|
||||
out->position() = out->buffer().end() - out_capacity;
|
||||
out_data = reinterpret_cast<void *>(out->position());
|
||||
}
|
||||
while (in_capacity > 0);
|
||||
sink.advancePosition(compressed_size);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
out->position() = out->buffer().begin();
|
||||
throw;
|
||||
}
|
||||
out->next();
|
||||
out_capacity = out->buffer().end() - out->position();
|
||||
while (in_capacity > 0);
|
||||
}
|
||||
|
||||
void Lz4DeflatingWriteBuffer::finalizeBefore()
|
||||
{
|
||||
next();
|
||||
|
||||
out_capacity = out->buffer().end() - out->position();
|
||||
out_data = reinterpret_cast<void *>(out->position());
|
||||
|
||||
if (out_capacity < LZ4F_compressBound(0, &kPrefs))
|
||||
{
|
||||
out->next();
|
||||
out_capacity = out->buffer().end() - out->position();
|
||||
out_data = reinterpret_cast<void *>(out->position());
|
||||
}
|
||||
auto suffix_size = LZ4F_compressBound(0, &kPrefs);
|
||||
auto sink = SinkToOut(out.get(), tmp_memory, suffix_size);
|
||||
chassert(sink.getCapacity() >= suffix_size);
|
||||
|
||||
/// compression end
|
||||
size_t end_size = LZ4F_compressEnd(ctx, out_data, out_capacity, nullptr);
|
||||
size_t end_size = LZ4F_compressEnd(ctx, sink.getPosition(), sink.getCapacity(), nullptr);
|
||||
|
||||
if (LZ4F_isError(end_size))
|
||||
throw Exception(
|
||||
ErrorCodes::LZ4_ENCODER_FAILED,
|
||||
"LZ4 failed to end stream encoding. LZ4F version: {}",
|
||||
LZ4F_VERSION);
|
||||
"LZ4 failed to end stream encoding. LZ4F version: {}, error {}, out_capacity {}",
|
||||
LZ4F_VERSION, LZ4F_getErrorName(end_size), sink.getCapacity());
|
||||
|
||||
out_capacity -= end_size;
|
||||
out->position() = out->buffer().end() - out_capacity;
|
||||
out_data = reinterpret_cast<void *>(out->position());
|
||||
sink.advancePosition(end_size);
|
||||
}
|
||||
|
||||
void Lz4DeflatingWriteBuffer::finalizeAfter()
|
||||
|
@ -33,10 +33,9 @@ private:
|
||||
LZ4F_compressionContext_t ctx;
|
||||
|
||||
void * in_data;
|
||||
void * out_data;
|
||||
|
||||
size_t in_capacity;
|
||||
size_t out_capacity;
|
||||
|
||||
Memory<> tmp_memory;
|
||||
|
||||
bool first_time = true;
|
||||
};
|
||||
|
@ -97,9 +97,8 @@ def get_counters(node, query_id, log_type="ExceptionWhileProcessing"):
|
||||
]
|
||||
|
||||
|
||||
# Add "lz4" compression method in the list after https://github.com/ClickHouse/ClickHouse/issues/50975 is fixed
|
||||
@pytest.mark.parametrize(
|
||||
"compression", ["none", "gzip", "br", "xz", "zstd", "bz2", "deflate"]
|
||||
"compression", ["none", "gzip", "br", "xz", "zstd", "bz2", "deflate", "lz4"]
|
||||
)
|
||||
def test_upload_s3_fail_create_multi_part_upload(cluster, broken_s3, compression):
|
||||
node = cluster.instances["node"]
|
||||
@ -137,9 +136,8 @@ def test_upload_s3_fail_create_multi_part_upload(cluster, broken_s3, compression
|
||||
assert count_s3_errors == 1
|
||||
|
||||
|
||||
# Add "lz4" compression method in the list after https://github.com/ClickHouse/ClickHouse/issues/50975 is fixed
|
||||
@pytest.mark.parametrize(
|
||||
"compression", ["none", "gzip", "br", "xz", "zstd", "bz2", "deflate"]
|
||||
"compression", ["none", "gzip", "br", "xz", "zstd", "bz2", "deflate", "lz4"]
|
||||
)
|
||||
def test_upload_s3_fail_upload_part_when_multi_part_upload(
|
||||
cluster, broken_s3, compression
|
||||
|
Loading…
Reference in New Issue
Block a user