ClickHouse/src/IO/WriteBufferValidUTF8.cpp
Azat Khuzhin 98e3a99a88 Do not catch exceptions during final flush in writers destructors
Catching them hides real problems: the destructor performs the final flush, and
if that flush fails, data will be lost.

One such example is the MEMORY_LIMIT_EXCEEDED exception, so block it in
destructors by using MemoryTracker::LockExceptionInThread, and let all other
exceptions through (so std::terminate will be called, since destructors are
noexcept by default as of C++11).

Here is an example, that leads to empty block in the distributed batch:

    2021.01.21 12:43:18.619739 [ 46468 ] {7bd60d75-ebcb-45d2-874d-260df9a4ddac} <Error> virtual DB::CompressedWriteBuffer::~CompressedWriteBuffer(): Code: 241, e.displayText() = DB::Exception: Memory limit (for user) exceeded: would use 332.07 GiB (attempt to allocate chunk of 4355342 bytes), maximum: 256.00 GiB, Stack trace (when copying this message, always include the lines below):

    0. DB::Exception::Exception<>() @ 0x86f7b88 in /usr/bin/clickhouse
    ...
    4. void DB::PODArrayBase<>::resize<>(unsigned long) @ 0xe9e878d in /usr/bin/clickhouse
    5. DB::CompressedWriteBuffer::nextImpl() @ 0xe9f0296 in /usr/bin/clickhouse
    6. DB::CompressedWriteBuffer::~CompressedWriteBuffer() @ 0xe9f0415 in /usr/bin/clickhouse
    7. DB::DistributedBlockOutputStream::writeToShard() @ 0xf6bed4a in /usr/bin/clickhouse
2021-02-05 01:31:45 +03:00

146 lines
3.9 KiB
C++

#include <Poco/UTF8Encoding.h>
#include <IO/WriteBufferValidUTF8.h>
#include <Common/MemoryTracker.h>
#include <common/types.h>
#ifdef __SSE2__
#include <emmintrin.h>
#endif
namespace DB
{
/// Out-of-class definition of the static constant DEFAULT_SIZE (4096).
/// NOTE(review): presumably the default for the constructor's `size` argument - confirm in the header.
const size_t WriteBufferValidUTF8::DEFAULT_SIZE = 4096;
/** Index into the table below with the first byte of a UTF-8 sequence to
  * get the total length of the sequence in bytes, including the first byte itself
  * (nextImpl() in this file advances by exactly this value after a valid sequence).
  *
  * Layout of the table:
  *   0x00..0xBF -> 1 (this includes 0x80..0xBF, which cannot start a sequence in UTF-8),
  *   0xC0..0xDF -> 2, 0xE0..0xEF -> 3, 0xF0..0xF7 -> 4,
  *   0xF8..0xFB -> 5, 0xFC..0xFF -> 6.
  *
  * Note that *legal* UTF-8 sequences can't be 5 or 6 bytes long; nextImpl() treats
  * any first byte with a length greater than 4 as invalid. The table is
  * left as-is for anyone who may want to do such conversion, which was
  * allowed in earlier algorithms.
  */
extern const UInt8 length_of_utf8_sequence[256] =
{
1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1, 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,
1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1, 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,
1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1, 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,
1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1, 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,
1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1, 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,
1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1, 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,
2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2, 2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,
3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3, 4,4,4,4,4,4,4,4,5,5,5,5,6,6,6,6
};
/** Creates a write buffer that validates UTF-8 data before handing it to `output_buffer_`.
  *
  * output_buffer_      - buffer that receives the validated data and the replacements.
  * group_replacements_ - if true, several invalid sequences in a row produce only one replacement
  *                       (see putReplacement()).
  * replacement_        - text written instead of an invalid sequence. An empty string means that
  *                       invalid sequences are silently dropped; nullptr is treated the same way.
  * size                - requested size of the own memory, passed to BufferWithOwnMemory;
  *                       never less than 32 bytes.
  */
WriteBufferValidUTF8::WriteBufferValidUTF8(
    WriteBuffer & output_buffer_, bool group_replacements_, const char * replacement_, size_t size)
    : BufferWithOwnMemory<WriteBuffer>(std::max(static_cast<size_t>(32), size)), output_buffer(output_buffer_),
    /// Initializing a string from a null pointer is undefined behaviour, so map nullptr to "no replacement".
    group_replacements(group_replacements_), replacement(replacement_ ? replacement_ : "")
{
}
/// Writes the replacement text to the output buffer in place of an invalid sequence.
/// Nothing is written if the replacement is empty, or if replacements are grouped
/// and the last thing that was put was already a replacement.
inline void WriteBufferValidUTF8::putReplacement()
{
    /// No replacement text: invalid data is simply dropped.
    if (replacement.empty())
        return;

    /// In grouping mode only the first of adjacent replacements is written.
    const bool repeated_replacement = group_replacements && just_put_replacement;
    if (repeated_replacement)
        return;

    just_put_replacement = true;
    output_buffer.write(replacement.data(), replacement.size());
}
/// Writes `len` bytes of already validated data starting at `data` to the output buffer.
/// An empty range is ignored and leaves the replacement grouping state untouched.
inline void WriteBufferValidUTF8::putValid(char *data, size_t len)
{
    if (len != 0)
    {
        /// Real data went out, so the next invalid sequence gets its own replacement again.
        just_put_replacement = false;
        output_buffer.write(data, len);
    }
}
/** Validates the data accumulated in [memory.data(), pos) and forwards it to the output buffer.
  *
  * Runs of valid bytes are written as they are. Every invalid byte is skipped, and
  * putReplacement() is called in its place. A sequence that is cut off at the end of the data
  * is not written: it is moved to the start of `memory`, and the working buffer is set to begin
  * right after it, so that the following data can complete the sequence.
  */
void WriteBufferValidUTF8::nextImpl()
{
    char * cursor = memory.data();
    /// Start of the run of valid bytes that has not been written to the output yet.
    char * run_begin = cursor;

    while (cursor < pos)
    {
#ifdef __SSE2__
        /// Fast skip of ASCII: step over whole 16-byte chunks in which no byte has the high bit set.
        static constexpr size_t SIMD_BYTES = 16;
        const char * const chunks_end = cursor + (pos - cursor) / SIMD_BYTES * SIMD_BYTES;
        while (cursor < chunks_end)
        {
            const __m128i chunk = _mm_loadu_si128(reinterpret_cast<const __m128i *>(cursor));
            if (_mm_movemask_epi8(chunk) != 0)
                break;
            cursor += SIMD_BYTES;
        }

        if (cursor >= pos)
            break;
#endif

        const size_t seq_len = length_of_utf8_sequence[static_cast<unsigned char>(*cursor)];

        /// A first byte announcing more than 4 bytes is never valid; it goes straight to the replacement below.
        if (seq_len <= 4)
        {
            /// Sequence was not fully written to this buffer.
            if (cursor + seq_len > pos)
                break;

            /// Valid sequence: it stays part of the current run.
            if (Poco::UTF8Encoding::isLegal(reinterpret_cast<unsigned char *>(cursor), seq_len))
            {
                cursor += seq_len;
                continue;
            }
        }

        /// Invalid start of sequence or invalid sequence: write the run collected so far,
        /// put the replacement and skip just the first byte.
        putValid(run_begin, cursor - run_begin);
        putReplacement();
        ++cursor;
        run_begin = cursor;
    }

    putValid(run_begin, cursor - run_begin);

    /// Shift unfinished sequence to start of buffer.
    const size_t tail_size = pos - cursor;
    for (size_t i = 0; i != tail_size; ++i)
        memory[i] = cursor[i];

    working_buffer = Buffer(&memory[tail_size], memory.data() + memory.size());

    /// Propagate next() to the output buffer
    output_buffer.next();
}
void WriteBufferValidUTF8::finish()
{
/// Write all complete sequences from buffer.
nextImpl();
/// If unfinished sequence at end, then write replacement.
if (working_buffer.begin() != memory.data())
putReplacement();
}
/// Performs the final flush by calling finish().
/// An exception that escapes finish() leaves a destructor (noexcept by default),
/// which terminates the program instead of silently losing data.
WriteBufferValidUTF8::~WriteBufferValidUTF8()
{
    /// FIXME move final flush into the caller
    /// Blocks memory limit exceptions in this thread for as long as the guard is alive,
    /// i.e. for the whole final flush.
    MemoryTracker::LockExceptionInThread memory_exception_guard;
    finish();
}
}