#pragma once

#include <vector>

#include <IO/WriteBuffer.h>
#include <Common/MemoryTracker.h>


namespace DB
{

namespace ErrorCodes
{
    extern const int CANNOT_WRITE_AFTER_END_OF_BUFFER;
}

/** Writes data to an existing std::vector or similar type. When there is not enough space, it doubles the vector size.
  *
  * In the destructor, the vector is cut to the size of the written data.
  * You can call 'finalize' to resize it earlier.
  *
  * The vector should live until this object is destroyed or until the 'finalize' method is called.
  * (A usage sketch follows the class definition below.)
  */
template <typename VectorType>
class WriteBufferFromVector : public WriteBuffer
{
private:
    VectorType & vector;
    bool is_finished = false;

    static constexpr size_t initial_size = 32;
    static constexpr size_t size_multiplier = 2;

    void nextImpl() override
    {
        if (is_finished)
            throw Exception("WriteBufferFromVector is finished", ErrorCodes::CANNOT_WRITE_AFTER_END_OF_BUFFER);

        size_t old_size = vector.size();
        /// pos may not be equal to vector.data() + old_size, because WriteBuffer::next() can be used to flush data
        size_t pos_offset = pos - reinterpret_cast<Position>(vector.data());
        vector.resize(old_size * size_multiplier);
        internal_buffer = Buffer(reinterpret_cast<Position>(vector.data() + pos_offset), reinterpret_cast<Position>(vector.data() + vector.size()));
        working_buffer = internal_buffer;
    }

public:
    explicit WriteBufferFromVector(VectorType & vector_)
        : WriteBuffer(reinterpret_cast<Position>(vector_.data()), vector_.size()), vector(vector_)
    {
        if (vector.empty())
        {
            vector.resize(initial_size);
            set(reinterpret_cast<Position>(vector.data()), vector.size());
        }
    }

    /// Append to the vector instead of rewriting it.
    struct AppendModeTag {};
    WriteBufferFromVector(VectorType & vector_, AppendModeTag)
        : WriteBuffer(nullptr, 0), vector(vector_)
    {
        size_t old_size = vector.size();
        size_t size = (old_size < initial_size) ? initial_size
                                                : ((old_size < vector.capacity()) ? vector.capacity()
                                                                                  : vector.capacity() * size_multiplier);
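        /// Illustrative values (not from the original source): old_size = 5 -> size = 32 (initial_size);
        /// old_size = 100 with capacity 128 -> size = 128 (reuse spare capacity);
        /// old_size = capacity = 128 -> size = 256 (grow by size_multiplier).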
        vector.resize(size);
        set(reinterpret_cast<Position>(vector.data() + old_size), (size - old_size) * sizeof(typename VectorType::value_type));
    }

    void finalize() override final
    {
        if (is_finished)
            return;
        is_finished = true;

        vector.resize(
            ((position() - reinterpret_cast<Position>(vector.data()))
                + sizeof(typename VectorType::value_type) - 1) /// Align up.
            / sizeof(typename VectorType::value_type));
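        /// Illustrative: with a 4-byte value_type and 10 bytes written, (10 + 4 - 1) / 4 = 3 elements
        /// are kept, the smallest element count whose total size covers everything written.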

        /// Prevent further writes.
        set(nullptr, 0);
    }

    bool isFinished() const { return is_finished; }

    void restart()
    {
        if (vector.empty())
            vector.resize(initial_size);
        set(reinterpret_cast<Position>(vector.data()), vector.size());
        is_finished = false;
    }

    ~WriteBufferFromVector() override
    {
        /// FIXME move final flush into the caller
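        /// Exceptions are deliberately not caught here: catching them would hide a failed final flush and
        /// silently lose data. Allocation exceptions such as MEMORY_LIMIT_EXCEEDED are blocked for this
        /// thread via MemoryTracker::LockExceptionInThread; any other exception escapes this (noexcept)
        /// destructor and results in std::terminate.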
        MemoryTracker::LockExceptionInThread lock;
        finalize();
    }
};
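
/** A minimal usage sketch (editorial addition, not part of the original header; variable names are
  * illustrative). It relies only on members shown above plus the usual WriteBuffer::write(const char *, size_t):
  *
  *     std::vector<char> out;
  *     {
  *         WriteBufferFromVector<std::vector<char>> buf(out);
  *         buf.write("hello", 5);      /// any WriteBuffer-style serialization
  *         buf.finalize();             /// trim 'out' to exactly the bytes written
  *     }
  *
  *     /// Append mode keeps the existing contents and continues writing after them:
  *     WriteBufferFromVector<std::vector<char>> appender(out, WriteBufferFromVector<std::vector<char>>::AppendModeTag{});
  *     appender.write(", world", 7);
  *     appender.finalize();
  */
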
}