2020-11-09 22:52:22 +00:00
|
|
|
#include <IO/LZMADeflatingWriteBuffer.h>
|
Do not catch exceptions during final flush in writers' destructors
Catching them hides real problems: the destructor does the final flush, and if
it fails, the data will be lost.
One such example is the MEMORY_LIMIT_EXCEEDED exception, so block
these exceptions in destructors by using
MemoryTracker::LockExceptionInThread, and allow
others (so std::terminate will be called, since in C++11 destructors are
noexcept by default).
Here is an example that leads to an empty block in the distributed batch:
2021.01.21 12:43:18.619739 [ 46468 ] {7bd60d75-ebcb-45d2-874d-260df9a4ddac} <Error> virtual DB::CompressedWriteBuffer::~CompressedWriteBuffer(): Code: 241, e.displayText() = DB::Exception: Memory limit (for user) exceeded: would use 332.07 GiB (attempt to allocate chunk of 4355342 bytes), maximum: 256.00 GiB, Stack trace (when copying this message, always include the lines below):
0. DB::Exception::Exception<>() @ 0x86f7b88 in /usr/bin/clickhouse
...
4. void DB::PODArrayBase<>::resize<>(unsigned long) @ 0xe9e878d in /usr/bin/clickhouse
5. DB::CompressedWriteBuffer::nextImpl() @ 0xe9f0296 in /usr/bin/clickhouse
6. DB::CompressedWriteBuffer::~CompressedWriteBuffer() @ 0xe9f0415 in /usr/bin/clickhouse
7. DB::DistributedBlockOutputStream::writeToShard() @ 0xf6bed4a in /usr/bin/clickhouse
2021-01-22 18:56:50 +00:00
|
|
|
#include <Common/MemoryTracker.h>
|
2020-11-01 18:40:05 +00:00
|
|
|
|
2020-11-12 09:21:33 +00:00
|
|
|
#if !defined(ARCADIA_BUILD)
|
2020-11-01 18:40:05 +00:00
|
|
|
|
2020-11-02 20:04:49 +00:00
|
|
|
namespace DB
|
|
|
|
{
|
2020-11-01 18:40:05 +00:00
|
|
|
/// Error codes referenced by this file; only declared here (`extern`), the definition lives elsewhere.
namespace ErrorCodes
{
    extern const int LZMA_STREAM_ENCODER_FAILED;
}
|
|
|
|
|
2020-11-09 22:52:22 +00:00
|
|
|
/// Builds a write buffer that xz-compresses everything written to it and forwards the result to `out_`.
///
/// `compression_level` is handed to lzma_lzma_preset() as the preset;
/// `buf_size`, `existing_memory` and `alignment` are forwarded unchanged to BufferWithOwnMemory.
///
/// Throws Exception(LZMA_STREAM_ENCODER_FAILED) when the preset is rejected
/// or when the stream encoder cannot be initialised.
LZMADeflatingWriteBuffer::LZMADeflatingWriteBuffer(
    std::unique_ptr<WriteBuffer> out_, int compression_level, size_t buf_size, char * existing_memory, size_t alignment)
    : BufferWithOwnMemory<WriteBuffer>(buf_size, existing_memory, alignment), out(std::move(out_))
{
    /// Start from liblzma's canonical initial state, then explicitly clear
    /// the allocator and the input/output cursors.
    lstr = LZMA_STREAM_INIT;
    lstr.allocator = nullptr;
    lstr.next_in = nullptr;
    lstr.avail_in = 0;
    lstr.next_out = nullptr;
    lstr.avail_out = 0;

    /// Encoder options for the LZMA2 stage, filled in from the requested preset.
    lzma_options_lzma lzma2_options;
    const bool preset_rejected = lzma_lzma_preset(&lzma2_options, compression_level);
    if (preset_rejected)
        throw Exception(ErrorCodes::LZMA_STREAM_ENCODER_FAILED, "lzma preset failed: lzma version: {}", LZMA_VERSION_STRING);

    /// Filter chain, applied in order:
    ///   LZMA_FILTER_X86   - liblzma's x86 BCJ (branch/call/jump) filter
    ///   LZMA_FILTER_LZMA2 - the codec used for *.xz files; plain LZMA is not suitable for this purpose
    ///   LZMA_VLI_UNKNOWN  - chain terminator. VLI means "variable length integer" (most integers
    ///                       in *.xz are encoded as VLI); LZMA_VLI_UNKNOWN (UINT64_MAX) is the VLI
    ///                       value denoting "unknown".
    lzma_filter filter_chain[] = {
        {.id = LZMA_FILTER_X86, .options = nullptr},
        {.id = LZMA_FILTER_LZMA2, .options = &lzma2_options},
        {.id = LZMA_VLI_UNKNOWN, .options = nullptr},
    };

    const lzma_ret init_status = lzma_stream_encoder(&lstr, filter_chain, LZMA_CHECK_CRC64);
    if (init_status == LZMA_OK)
        return;

    throw Exception(
        ErrorCodes::LZMA_STREAM_ENCODER_FAILED,
        "lzma stream encoder init failed: error code: {} lzma version: {}",
        init_status,
        LZMA_VERSION_STRING);
}
|
|
|
|
|
2020-11-09 22:52:22 +00:00
|
|
|
/// Performs the final flush of the compressed stream and then frees the encoder state.
LZMADeflatingWriteBuffer::~LZMADeflatingWriteBuffer()
{
    /// FIXME move final flush into the caller

    /// The guard is created before the final flush so that memory-limit exceptions are
    /// blocked while it runs. Exceptions are deliberately NOT caught here: a failed
    /// final flush means lost data, so any other exception escapes the (implicitly
    /// noexcept) destructor and ends in std::terminate.
    MemoryTracker::LockExceptionInThread memory_exception_lock;

    finish();
    lzma_end(&lstr);
}
|
|
|
|
|
2020-11-09 22:52:22 +00:00
|
|
|
void LZMADeflatingWriteBuffer::nextImpl()
|
2020-11-01 18:40:05 +00:00
|
|
|
{
|
|
|
|
if (!offset())
|
|
|
|
return;
|
|
|
|
|
|
|
|
lstr.next_in = reinterpret_cast<unsigned char *>(working_buffer.begin());
|
|
|
|
lstr.avail_in = offset();
|
|
|
|
|
|
|
|
lzma_action action = LZMA_RUN;
|
2020-11-02 13:17:25 +00:00
|
|
|
do
|
|
|
|
{
|
2020-11-01 18:40:05 +00:00
|
|
|
out->nextIfAtEnd();
|
|
|
|
lstr.next_out = reinterpret_cast<unsigned char *>(out->position());
|
|
|
|
lstr.avail_out = out->buffer().end() - out->position();
|
2020-11-01 23:52:34 +00:00
|
|
|
|
2020-11-01 18:40:05 +00:00
|
|
|
lzma_ret ret = lzma_code(&lstr, action);
|
|
|
|
out->position() = out->buffer().end() - lstr.avail_out;
|
|
|
|
|
2020-11-02 13:17:25 +00:00
|
|
|
if (ret == LZMA_STREAM_END)
|
|
|
|
return;
|
2020-11-02 20:04:49 +00:00
|
|
|
|
2020-11-01 18:40:05 +00:00
|
|
|
if (ret != LZMA_OK)
|
2020-11-02 20:04:49 +00:00
|
|
|
throw Exception(
|
2020-11-11 01:50:56 +00:00
|
|
|
ErrorCodes::LZMA_STREAM_ENCODER_FAILED,
|
|
|
|
"lzma stream encoding failed: error code: {}; lzma_version: {}",
|
|
|
|
ret,
|
|
|
|
LZMA_VERSION_STRING);
|
2020-11-02 20:04:49 +00:00
|
|
|
|
2020-11-01 18:40:05 +00:00
|
|
|
} while (lstr.avail_in > 0 || lstr.avail_out == 0);
|
|
|
|
}
|
|
|
|
|
|
|
|
|
2020-11-09 22:52:22 +00:00
|
|
|
void LZMADeflatingWriteBuffer::finish()
|
2020-11-01 18:40:05 +00:00
|
|
|
{
|
|
|
|
if (finished)
|
|
|
|
return;
|
|
|
|
|
|
|
|
next();
|
|
|
|
|
2020-11-02 13:17:25 +00:00
|
|
|
do
|
|
|
|
{
|
2020-11-01 18:40:05 +00:00
|
|
|
out->nextIfAtEnd();
|
|
|
|
lstr.next_out = reinterpret_cast<unsigned char *>(out->position());
|
|
|
|
lstr.avail_out = out->buffer().end() - out->position();
|
|
|
|
|
|
|
|
lzma_ret ret = lzma_code(&lstr, LZMA_FINISH);
|
|
|
|
out->position() = out->buffer().end() - lstr.avail_out;
|
|
|
|
|
2020-11-02 20:04:49 +00:00
|
|
|
if (ret == LZMA_STREAM_END)
|
2020-11-02 13:17:25 +00:00
|
|
|
{
|
2020-11-01 23:52:34 +00:00
|
|
|
finished = true;
|
2020-11-02 20:04:49 +00:00
|
|
|
return;
|
2020-11-01 23:52:34 +00:00
|
|
|
}
|
2020-11-02 20:04:49 +00:00
|
|
|
|
2020-11-01 18:40:05 +00:00
|
|
|
if (ret != LZMA_OK)
|
2020-11-02 20:04:49 +00:00
|
|
|
throw Exception(
|
2020-11-11 01:50:56 +00:00
|
|
|
ErrorCodes::LZMA_STREAM_ENCODER_FAILED,
|
|
|
|
"lzma stream encoding failed: error code: {}; lzma version: {}",
|
|
|
|
ret,
|
|
|
|
LZMA_VERSION_STRING);
|
2020-11-02 20:04:49 +00:00
|
|
|
|
2020-11-01 18:40:05 +00:00
|
|
|
} while (lstr.avail_out == 0);
|
|
|
|
}
|
2020-11-02 13:17:25 +00:00
|
|
|
}
|
2020-11-12 09:21:33 +00:00
|
|
|
|
|
|
|
#endif
|