Small refactoring of WriteBiffer-s

This commit is contained in:
avogar 2021-11-11 01:58:56 +03:00
parent 56e5e05fc5
commit c521a9131a
59 changed files with 530 additions and 662 deletions

View File

@ -13,30 +13,21 @@
#if defined(__linux__)
#include <sys/prctl.h>
#endif
#include <fcntl.h>
#include <errno.h>
#include <string.h>
#include <signal.h>
#include <cxxabi.h>
#include <unistd.h>
#include <typeinfo>
#include <iostream>
#include <fstream>
#include <sstream>
#include <memory>
#include <base/scope_guard.h>
#include <Poco/Observer.h>
#include <Poco/AutoPtr.h>
#include <Poco/PatternFormatter.h>
#include <Poco/Message.h>
#include <Poco/Util/Application.h>
#include <Poco/Exception.h>
#include <Poco/ErrorHandler.h>
#include <Poco/Condition.h>
#include <Poco/SyslogChannel.h>
#include <Poco/DirectoryIterator.h>
#include <base/logger_useful.h>
#include <base/ErrorHandlers.h>
@ -56,7 +47,6 @@
#include <Common/getMultipleKeysFromConfig.h>
#include <Common/ClickHouseRevision.h>
#include <Common/Config/ConfigProcessor.h>
#include <Common/MemorySanitizer.h>
#include <Common/SymbolIndex.h>
#include <Common/getExecutablePath.h>
#include <Common/getHashOfLoadedBinary.h>

View File

@ -1,16 +1,12 @@
#include "OwnPatternFormatter.h"
#include <functional>
#include <optional>
#include <sys/time.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>
#include <Common/HashTable/Hash.h>
#include <Interpreters/InternalTextLogsQueue.h>
#include <Common/CurrentThread.h>
#include <base/getThreadId.h>
#include <base/terminalColors.h>
#include "Loggers.h"
OwnPatternFormatter::OwnPatternFormatter(bool color_)

View File

@ -518,7 +518,7 @@ StringRef ColumnAggregateFunction::serializeValueIntoArena(size_t n, Arena & are
{
WriteBufferFromArena out(arena, begin);
func->serialize(data[n], out);
return out.finish();
return out.complete();
}
const char * ColumnAggregateFunction::deserializeAndInsertFromArena(const char * src_arena)

View File

@ -6,8 +6,6 @@
#include "CompressedWriteBuffer.h"
#include <Compression/CompressionFactory.h>
#include <Common/MemorySanitizer.h>
#include <Common/MemoryTracker.h>
@ -35,13 +33,14 @@ void CompressedWriteBuffer::nextImpl()
out.write(compressed_buffer.data(), compressed_size);
}
void CompressedWriteBuffer::finalize()
CompressedWriteBuffer::~CompressedWriteBuffer()
{
next();
if (finalized)
return;
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
finalizeImpl();
}
CompressedWriteBuffer::CompressedWriteBuffer(
WriteBuffer & out_,
CompressionCodecPtr codec_,
@ -50,12 +49,4 @@ CompressedWriteBuffer::CompressedWriteBuffer(
{
}
CompressedWriteBuffer::~CompressedWriteBuffer()
{
/// FIXME move final flush into the caller
MemoryTracker::LockExceptionInThread lock;
next();
}
}

View File

@ -15,21 +15,13 @@ namespace DB
class CompressedWriteBuffer : public BufferWithOwnMemory<WriteBuffer>
{
private:
WriteBuffer & out;
CompressionCodecPtr codec;
PODArray<char> compressed_buffer;
void nextImpl() override;
public:
CompressedWriteBuffer(
WriteBuffer & out_,
CompressionCodecPtr codec_ = CompressionCodecFactory::instance().getDefaultCodec(),
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE);
void finalize() override;
~CompressedWriteBuffer() override;
/// The amount of compressed data
size_t getCompressedBytes()
@ -51,7 +43,13 @@ public:
return offset();
}
~CompressedWriteBuffer() override;
private:
void nextImpl() override;
WriteBuffer & out;
CompressionCodecPtr codec;
PODArray<char> compressed_buffer;
};
}

View File

@ -79,7 +79,7 @@ class ChangelogWriter
public:
ChangelogWriter(const std::string & filepath_, WriteMode mode, uint64_t start_index_)
: filepath(filepath_)
, file_buf(filepath, DBMS_DEFAULT_BUFFER_SIZE, mode == WriteMode::Rewrite ? -1 : (O_APPEND | O_CREAT | O_WRONLY))
, file_buf(std::make_unique<WriteBufferFromFile>(filepath, DBMS_DEFAULT_BUFFER_SIZE, mode == WriteMode::Rewrite ? -1 : (O_APPEND | O_CREAT | O_WRONLY)))
, start_index(start_index_)
{
auto compression_method = chooseCompressionMethod(filepath_, "");
@ -89,7 +89,7 @@ public:
}
else if (compression_method == CompressionMethod::Zstd)
{
compressed_buffer = std::make_unique<ZstdDeflatingAppendableWriteBuffer>(file_buf, /* compression level = */ 3, /* append_to_existing_stream = */ mode == WriteMode::Append);
compressed_buffer = std::make_unique<ZstdDeflatingAppendableWriteBuffer>(std::move(file_buf), /* compression level = */ 3, /* append_to_existing_stream = */ mode == WriteMode::Append);
}
else
{
@ -120,12 +120,14 @@ public:
compressed_buffer->next();
}
/// Flush working buffer to file system
file_buf.next();
WriteBuffer * working_buf = compressed_buffer ? compressed_buffer->getNestedBuffer() : file_buf.get();
/// Flush working buffer to file system
working_buf->next();
/// Fsync file system if needed
if (force_fsync)
file_buf.sync();
working_buf->sync();
}
uint64_t getStartIndex() const
@ -138,12 +140,12 @@ private:
{
if (compressed_buffer)
return *compressed_buffer;
return file_buf;
return *file_buf;
}
std::string filepath;
WriteBufferFromFile file_buf;
std::unique_ptr<WriteBuffer> compressed_buffer;
std::unique_ptr<WriteBufferFromFile> file_buf;
std::unique_ptr<WriteBufferWithOwnMemoryDecorator> compressed_buffer;
uint64_t start_index;
};

View File

@ -11,7 +11,7 @@ namespace ErrorCodes
void WriteBufferFromNuraftBuffer::nextImpl()
{
if (is_finished)
if (finalized)
throw Exception("WriteBufferFromNuraftBuffer is finished", ErrorCodes::CANNOT_WRITE_AFTER_END_OF_BUFFER);
/// pos may not be equal to vector.data() + old_size, because WriteBuffer::next() can be used to flush data
@ -35,12 +35,8 @@ WriteBufferFromNuraftBuffer::WriteBufferFromNuraftBuffer()
set(reinterpret_cast<Position>(buffer->data_begin()), buffer->size());
}
void WriteBufferFromNuraftBuffer::finalize()
void WriteBufferFromNuraftBuffer::finalizeImpl()
{
if (is_finished)
return;
is_finished = true;
size_t real_size = pos - reinterpret_cast<Position>(buffer->data_begin());
nuraft::ptr<nuraft::buffer> new_buffer = nuraft::buffer::alloc(real_size);
memcpy(new_buffer->data_begin(), buffer->data_begin(), real_size);

View File

@ -8,23 +8,23 @@ namespace DB
class WriteBufferFromNuraftBuffer : public WriteBuffer
{
private:
nuraft::ptr<nuraft::buffer> buffer;
bool is_finished = false;
static constexpr size_t initial_size = 32;
static constexpr size_t size_multiplier = 2;
void nextImpl() override;
public:
WriteBufferFromNuraftBuffer();
void finalize() override final;
nuraft::ptr<nuraft::buffer> getBuffer();
bool isFinished() const { return is_finished; }
bool isFinished() const { return finalized; }
~WriteBufferFromNuraftBuffer() override;
private:
void finalizeImpl() override final;
void nextImpl() override;
nuraft::ptr<nuraft::buffer> buffer;
static constexpr size_t initial_size = 32;
static constexpr size_t size_multiplier = 2;
};
}

View File

@ -20,7 +20,7 @@ public:
{
try
{
CompletionAwareWriteBuffer::finalize();
finalize();
}
catch (...)
{
@ -28,12 +28,9 @@ public:
}
}
void finalize() override
void finalizeImpl() override
{
if (finalized)
return;
WriteBufferFromFileDecorator::finalize();
WriteBufferFromFileDecorator::finalizeImpl();
completion_callback();
}

View File

@ -93,7 +93,7 @@ public:
}
}
void finalize() override
void finalizeImpl() override
{
if (impl.isFinished())
return;

View File

@ -41,7 +41,7 @@ public:
{
try
{
RestartAwareWriteBuffer::finalize();
finalize();
}
catch (...)
{
@ -49,12 +49,9 @@ public:
}
}
void finalize() override
void finalizeImpl() override
{
if (finalized)
return;
WriteBufferFromFileDecorator::finalize();
WriteBufferFromFileDecorator::finalizeImpl();
lock.unlock();
}

View File

@ -25,7 +25,7 @@ WriteIndirectBufferFromRemoteFS<T>::~WriteIndirectBufferFromRemoteFS()
{
try
{
WriteIndirectBufferFromRemoteFS::finalize();
finalize();
}
catch (...)
{
@ -35,12 +35,9 @@ WriteIndirectBufferFromRemoteFS<T>::~WriteIndirectBufferFromRemoteFS()
template <typename T>
void WriteIndirectBufferFromRemoteFS<T>::finalize()
void WriteIndirectBufferFromRemoteFS<T>::finalizeImpl()
{
if (finalized)
return;
WriteBufferFromFileDecorator::finalize();
WriteBufferFromFileDecorator::finalizeImpl();
metadata.addObject(remote_fs_path, count());
metadata.save();

View File

@ -21,13 +21,13 @@ public:
virtual ~WriteIndirectBufferFromRemoteFS() override;
void finalize() override;
void sync() override;
String getFileName() const override { return metadata.metadata_file_path; }
private:
void finalizeImpl() override;
IDiskRemote::Metadata metadata;
String remote_fs_path;

View File

@ -128,7 +128,7 @@ private:
writeChar(':', out);
writeIntText(location.line, out);
return out.finish();
return out.complete();
}
else
{

View File

@ -32,13 +32,12 @@ public:
};
BrotliWriteBuffer::BrotliWriteBuffer(std::unique_ptr<WriteBuffer> out_, int compression_level, size_t buf_size, char * existing_memory, size_t alignment)
: BufferWithOwnMemory<WriteBuffer>(buf_size, existing_memory, alignment)
: WriteBufferWithOwnMemoryDecorator(std::move(out_), buf_size, existing_memory, alignment)
, brotli(std::make_unique<BrotliStateWrapper>())
, in_available(0)
, in_data(nullptr)
, out_capacity(0)
, out_data(nullptr)
, out(std::move(out_))
{
BrotliEncoderSetParameter(brotli->state, BROTLI_PARAM_QUALITY, static_cast<uint32_t>(compression_level));
// Set LZ77 window size. According to brotli sources default value is 24 (c/tools/brotli.c:81)
@ -47,9 +46,10 @@ BrotliWriteBuffer::BrotliWriteBuffer(std::unique_ptr<WriteBuffer> out_, int comp
BrotliWriteBuffer::~BrotliWriteBuffer()
{
/// FIXME move final flush into the caller
if (finalized)
return;
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
finish();
finalizeImpl();
}
void BrotliWriteBuffer::nextImpl()
@ -96,27 +96,7 @@ void BrotliWriteBuffer::nextImpl()
}
}
void BrotliWriteBuffer::finish()
{
if (finished)
return;
try
{
finishImpl();
out->finalize();
finished = true;
}
catch (...)
{
/// Do not try to flush next time after exception.
out->position() = out->buffer().begin();
finished = true;
throw;
}
}
void BrotliWriteBuffer::finishImpl()
void BrotliWriteBuffer::finalizeBeforeNestedFinalize()
{
next();

View File

@ -2,11 +2,12 @@
#include <IO/WriteBuffer.h>
#include <IO/BufferWithOwnMemory.h>
#include <IO/WriteBufferDecorator.h>
namespace DB
{
class BrotliWriteBuffer : public BufferWithOwnMemory<WriteBuffer>
class BrotliWriteBuffer : public WriteBufferWithOwnMemoryDecorator
{
public:
BrotliWriteBuffer(
@ -18,13 +19,10 @@ public:
~BrotliWriteBuffer() override;
void finalize() override { finish(); }
private:
void nextImpl() override;
void finish();
void finishImpl();
void finalizeBeforeNestedFinalize() override;
class BrotliStateWrapper;
std::unique_ptr<BrotliStateWrapper> brotli;
@ -34,10 +32,6 @@ private:
size_t out_capacity;
uint8_t * out_data;
std::unique_ptr<WriteBuffer> out;
bool finished = false;
};
}

View File

@ -40,17 +40,17 @@ public:
};
Bzip2WriteBuffer::Bzip2WriteBuffer(std::unique_ptr<WriteBuffer> out_, int compression_level, size_t buf_size, char * existing_memory, size_t alignment)
: BufferWithOwnMemory<WriteBuffer>(buf_size, existing_memory, alignment)
: WriteBufferWithOwnMemoryDecorator(std::move(out_), buf_size, existing_memory, alignment)
, bz(std::make_unique<Bzip2StateWrapper>(compression_level))
, out(std::move(out_))
{
}
Bzip2WriteBuffer::~Bzip2WriteBuffer()
{
/// FIXME move final flush into the caller
if (finalized)
return;
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
finish();
finalizeImpl();
}
void Bzip2WriteBuffer::nextImpl()
@ -92,27 +92,7 @@ void Bzip2WriteBuffer::nextImpl()
}
}
void Bzip2WriteBuffer::finish()
{
if (finished)
return;
try
{
finishImpl();
out->finalize();
finished = true;
}
catch (...)
{
/// Do not try to flush next time after exception.
out->position() = out->buffer().begin();
finished = true;
throw;
}
}
void Bzip2WriteBuffer::finishImpl()
void Bzip2WriteBuffer::finalizeBeforeNestedFinalize()
{
next();

View File

@ -2,11 +2,12 @@
#include <IO/WriteBuffer.h>
#include <IO/BufferWithOwnMemory.h>
#include <IO/WriteBufferDecorator.h>
namespace DB
{
class Bzip2WriteBuffer : public BufferWithOwnMemory<WriteBuffer>
class Bzip2WriteBuffer : public WriteBufferWithOwnMemoryDecorator
{
public:
Bzip2WriteBuffer(
@ -18,20 +19,13 @@ public:
~Bzip2WriteBuffer() override;
void finalize() override { finish(); }
private:
void nextImpl() override;
void finish();
void finishImpl();
void finalizeBeforeNestedFinalize() override;
class Bzip2StateWrapper;
std::unique_ptr<Bzip2StateWrapper> bz;
std::unique_ptr<WriteBuffer> out;
bool finished = false;
};
}

View File

@ -10,7 +10,7 @@ namespace ErrorCodes
LZMADeflatingWriteBuffer::LZMADeflatingWriteBuffer(
std::unique_ptr<WriteBuffer> out_, int compression_level, size_t buf_size, char * existing_memory, size_t alignment)
: BufferWithOwnMemory<WriteBuffer>(buf_size, existing_memory, alignment), out(std::move(out_))
: WriteBufferWithOwnMemoryDecorator(std::move(out_), buf_size, existing_memory, alignment)
{
lstr = LZMA_STREAM_INIT;
@ -47,11 +47,10 @@ LZMADeflatingWriteBuffer::LZMADeflatingWriteBuffer(
LZMADeflatingWriteBuffer::~LZMADeflatingWriteBuffer()
{
/// FIXME move final flush into the caller
if (finalized)
return;
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
finish();
lzma_end(&lstr);
finalizeImpl();
}
void LZMADeflatingWriteBuffer::nextImpl()
@ -94,28 +93,7 @@ void LZMADeflatingWriteBuffer::nextImpl()
}
}
void LZMADeflatingWriteBuffer::finish()
{
if (finished)
return;
try
{
finishImpl();
out->finalize();
finished = true;
}
catch (...)
{
/// Do not try to flush next time after exception.
out->position() = out->buffer().begin();
finished = true;
throw;
}
}
void LZMADeflatingWriteBuffer::finishImpl()
void LZMADeflatingWriteBuffer::finalizeBeforeNestedFinalize()
{
next();
@ -142,5 +120,11 @@ void LZMADeflatingWriteBuffer::finishImpl()
} while (lstr.avail_out == 0);
}
void LZMADeflatingWriteBuffer::finalizeAfterNestedFinalize()
{
lzma_end(&lstr);
}
}

View File

@ -2,6 +2,7 @@
#include <IO/BufferWithOwnMemory.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteBufferDecorator.h>
#include <lzma.h>
@ -10,7 +11,7 @@ namespace DB
{
/// Performs compression using lzma library and writes compressed data to out_ WriteBuffer.
class LZMADeflatingWriteBuffer : public BufferWithOwnMemory<WriteBuffer>
class LZMADeflatingWriteBuffer : public WriteBufferWithOwnMemoryDecorator
{
public:
LZMADeflatingWriteBuffer(
@ -20,19 +21,15 @@ public:
char * existing_memory = nullptr,
size_t alignment = 0);
void finalize() override { finish(); }
~LZMADeflatingWriteBuffer() override;
private:
void nextImpl() override;
void finish();
void finishImpl();
void finalizeBeforeNestedFinalize() override;
void finalizeAfterNestedFinalize() override;
std::unique_ptr<WriteBuffer> out;
lzma_stream lstr;
bool finished = false;
};
}

View File

@ -12,8 +12,7 @@ namespace ErrorCodes
Lz4DeflatingWriteBuffer::Lz4DeflatingWriteBuffer(
std::unique_ptr<WriteBuffer> out_, int compression_level, size_t buf_size, char * existing_memory, size_t alignment)
: BufferWithOwnMemory<WriteBuffer>(buf_size, existing_memory, alignment)
, out(std::move(out_))
: WriteBufferWithOwnMemoryDecorator(std::move(out_), buf_size, existing_memory, alignment)
, in_data(nullptr)
, out_data(nullptr)
, in_capacity(0)
@ -45,9 +44,10 @@ Lz4DeflatingWriteBuffer::Lz4DeflatingWriteBuffer(
Lz4DeflatingWriteBuffer::~Lz4DeflatingWriteBuffer()
{
if (finalized)
return;
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
finish();
LZ4F_freeCompressionContext(ctx);
finalizeImpl();
}
void Lz4DeflatingWriteBuffer::nextImpl()
@ -120,27 +120,7 @@ void Lz4DeflatingWriteBuffer::nextImpl()
}
}
void Lz4DeflatingWriteBuffer::finish()
{
if (finished)
return;
try
{
finishImpl();
out->finalize();
finished = true;
}
catch (...)
{
/// Do not try to flush next time after exception.
out->position() = out->buffer().begin();
finished = true;
throw;
}
}
void Lz4DeflatingWriteBuffer::finishImpl()
void Lz4DeflatingWriteBuffer::finalizeBeforeNestedFinalize()
{
next();
@ -165,4 +145,9 @@ void Lz4DeflatingWriteBuffer::finishImpl()
out->position() = out->buffer().end() - out_capacity;
}
void Lz4DeflatingWriteBuffer::finalizeAfterNestedFinalize()
{
LZ4F_freeCompressionContext(ctx);
}
}

View File

@ -3,6 +3,7 @@
#include <IO/BufferWithOwnMemory.h>
#include <IO/CompressionMethod.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteBufferDecorator.h>
#include <lz4.h>
#include <lz4frame.h>
@ -10,7 +11,7 @@
namespace DB
{
/// Performs compression using lz4 library and writes compressed data to out_ WriteBuffer.
class Lz4DeflatingWriteBuffer : public BufferWithOwnMemory<WriteBuffer>
class Lz4DeflatingWriteBuffer : public WriteBufferWithOwnMemoryDecorator
{
public:
Lz4DeflatingWriteBuffer(
@ -20,17 +21,13 @@ public:
char * existing_memory = nullptr,
size_t alignment = 0);
void finalize() override { finish(); }
~Lz4DeflatingWriteBuffer() override;
private:
void nextImpl() override;
void finish();
void finishImpl();
std::unique_ptr<WriteBuffer> out;
void finalizeBeforeNestedFinalize() override;
void finalizeAfterNestedFinalize() override;
LZ4F_preferences_t kPrefs;
LZ4F_compressionContext_t ctx;
@ -42,6 +39,5 @@ private:
size_t out_capacity;
bool first_time = true;
bool finished = false;
};
}

View File

@ -58,8 +58,8 @@ public:
pos = working_buffer.begin();
}
/** it is desirable in the derived classes to place the next() call in the destructor,
* so that the last data is written
/** it is desirable in the derived classes to place the finalize() call in the destructor,
* so that the last data is written (if finalize() wasn't called explicitly)
*/
virtual ~WriteBuffer() = default;
@ -100,10 +100,23 @@ public:
next();
}
virtual void finalize()
/// Write the last data.
void finalize()
{
if (finalized)
return;
finalizeImpl();
finalized = true;
}
protected:
virtual void finalizeImpl()
{
next();
}
bool finalized = false;
private:
/** Write the data in the buffer (from the beginning of the buffer to the current position).

View File

@ -0,0 +1,50 @@
#pragma once
#include <IO/WriteBuffer.h>
#include <utility>
#include <memory>
namespace DB
{
class WriteBuffer;
/// WriteBuffer that works with another WriteBuffer.
template <class Base>
class WriteBufferDecorator : public Base
{
public:
template <class ... BaseArgs>
explicit WriteBufferDecorator(std::unique_ptr<WriteBuffer> out_, BaseArgs && ... args)
: Base(std::forward<BaseArgs>(args)...), out(std::move(out_))
{
}
void finalizeImpl() override
{
try
{
finalizeBeforeNestedFinalize();
out->finalize();
finalizeAfterNestedFinalize();
}
catch (...)
{
/// Do not try to flush next time after exception.
out->position() = out->buffer().begin();
throw;
}
}
WriteBuffer * getNestedBuffer() { return out.get(); }
protected:
virtual void finalizeBeforeNestedFinalize() {}
virtual void finalizeAfterNestedFinalize() {}
std::unique_ptr<WriteBuffer> out;
};
using WriteBufferWithOwnMemoryDecorator = WriteBufferDecorator<BufferWithOwnMemory<WriteBuffer>>;
}

View File

@ -15,6 +15,23 @@ namespace DB
*/
class WriteBufferFromArena final : public WriteBuffer
{
public:
/// begin_ - start of previously used contiguous memory segment or nullptr (see Arena::allocContinue method).
WriteBufferFromArena(Arena & arena_, const char *& begin_)
: WriteBuffer(nullptr, 0), arena(arena_), begin(begin_)
{
nextImpl();
pos = working_buffer.begin();
}
StringRef complete()
{
/// Return over-allocated memory back into arena.
arena.rollback(buffer().end() - position());
/// Reference to written data.
return { position() - count(), count() };
}
private:
Arena & arena;
const char *& begin;
@ -46,23 +63,6 @@ private:
internalBuffer() = Buffer(const_cast<char *>(begin), end);
buffer() = Buffer(continuation, end);
}
public:
/// begin_ - start of previously used contiguous memory segment or nullptr (see Arena::allocContinue method).
WriteBufferFromArena(Arena & arena_, const char *& begin_)
: WriteBuffer(nullptr, 0), arena(arena_), begin(begin_)
{
nextImpl();
pos = working_buffer.begin();
}
StringRef finish()
{
/// Return over-allocated memory back into arena.
arena.rollback(buffer().end() - position());
/// Reference to written data.
return { position() - count(), count() };
}
};
}

View File

@ -12,8 +12,7 @@ WriteBufferFromEncryptedFile::WriteBufferFromEncryptedFile(
const String & key_,
const FileEncryption::Header & header_,
size_t old_file_size)
: WriteBufferFromFileBase(buffer_size_, nullptr, 0)
, out(std::move(out_))
: WriteBufferDecorator<WriteBufferFromFileBase>(std::move(out_), buffer_size_, nullptr, 0)
, header(header_)
, flush_header(!old_file_size)
, encryptor(header.algorithm, key_, header.init_vector)
@ -23,32 +22,13 @@ WriteBufferFromEncryptedFile::WriteBufferFromEncryptedFile(
WriteBufferFromEncryptedFile::~WriteBufferFromEncryptedFile()
{
/// FIXME move final flush into the caller
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
finish();
}
void WriteBufferFromEncryptedFile::finish()
{
if (finished)
if (finalized)
return;
try
{
finishImpl();
out->finalize();
finished = true;
}
catch (...)
{
/// Do not try to flush next time after exception.
out->position() = out->buffer().begin();
finished = true;
throw;
}
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
finalizeImpl();
}
void WriteBufferFromEncryptedFile::finishImpl()
void WriteBufferFromEncryptedFile::finalizeBeforeNestedFinalize()
{
/// If buffer has pending data - write it.
next();
@ -56,8 +36,6 @@ void WriteBufferFromEncryptedFile::finishImpl()
/// Note that if there is no data to write an empty file will be written, even without the initialization vector
/// (see nextImpl(): it writes the initialization vector only if there is some data ready to write).
/// That's fine because DiskEncrypted allows files without initialization vectors when they're empty.
out->finalize();
}
void WriteBufferFromEncryptedFile::sync()

View File

@ -5,13 +5,14 @@
#if USE_SSL
#include <IO/WriteBufferFromFileBase.h>
#include <IO/FileEncryptionCommon.h>
#include <IO/WriteBufferDecorator.h>
namespace DB
{
/// Encrypts data and writes the encrypted data to the underlying write buffer.
class WriteBufferFromEncryptedFile : public WriteBufferFromFileBase
class WriteBufferFromEncryptedFile : public WriteBufferDecorator<WriteBufferFromFileBase>
{
public:
/// `old_file_size` should be set to non-zero if we're going to append an existing file.
@ -21,21 +22,17 @@ public:
const String & key_,
const FileEncryption::Header & header_,
size_t old_file_size = 0);
~WriteBufferFromEncryptedFile() override;
void sync() override;
void finalize() override { finish(); }
std::string getFileName() const override { return out->getFileName(); }
std::string getFileName() const override { return dynamic_cast<WriteBufferFromFileBase *>(out.get())->getFileName(); }
private:
void nextImpl() override;
void finish();
void finishImpl();
bool finished = false;
std::unique_ptr<WriteBufferFromFileBase> out;
void finalizeBeforeNestedFinalize() override;
FileEncryption::Header header;
bool flush_header = false;

View File

@ -70,15 +70,19 @@ WriteBufferFromFile::WriteBufferFromFile(
fd_ = -1;
}
WriteBufferFromFile::~WriteBufferFromFile()
{
if (finalized || fd < 0)
return;
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
finalizeImpl();
}
void WriteBufferFromFile::finalizeImpl()
{
if (fd < 0)
return;
/// FIXME move final flush into the caller
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
next();
::close(fd);

View File

@ -53,6 +53,9 @@ public:
{
return file_name;
}
private:
void finalizeImpl() override;
};
}

View File

@ -11,22 +11,17 @@ WriteBufferFromFileDecorator::WriteBufferFromFileDecorator(std::unique_ptr<Write
swap(*impl);
}
void WriteBufferFromFileDecorator::finalize()
void WriteBufferFromFileDecorator::finalizeImpl()
{
if (finalized)
return;
next();
impl->finalize();
finalized = true;
}
WriteBufferFromFileDecorator::~WriteBufferFromFileDecorator()
{
try
{
WriteBufferFromFileDecorator::finalize();
finalize();
}
catch (...)
{

View File

@ -13,15 +13,14 @@ public:
~WriteBufferFromFileDecorator() override;
void finalize() override;
void sync() override;
std::string getFileName() const override;
protected:
void finalizeImpl() override;
std::unique_ptr<WriteBuffer> impl;
bool finalized = false;
private:
void nextImpl() override;

View File

@ -95,6 +95,14 @@ WriteBufferFromFileDescriptor::WriteBufferFromFileDescriptor(
WriteBufferFromFileDescriptor::~WriteBufferFromFileDescriptor()
{
if (finalized)
return;
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
finalizeImpl();
}
void WriteBufferFromFileDescriptor::finalizeImpl()
{
if (fd < 0)
{
@ -102,12 +110,9 @@ WriteBufferFromFileDescriptor::~WriteBufferFromFileDescriptor()
return;
}
/// FIXME move final flush into the caller
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
next();
}
void WriteBufferFromFileDescriptor::sync()
{
/// If buffer has pending data - write it.

View File

@ -10,13 +10,6 @@ namespace DB
*/
class WriteBufferFromFileDescriptor : public WriteBufferFromFileBase
{
protected:
int fd;
/// If file has name contains filename, otherwise contains string "(fd=...)"
std::string file_name;
void nextImpl() override;
public:
WriteBufferFromFileDescriptor(
int fd_ = -1,
@ -51,6 +44,16 @@ public:
std::string getFileName() const override;
off_t size() const;
protected:
void nextImpl() override;
int fd;
/// If file has name contains filename, otherwise contains string "(fd=...)"
std::string file_name;
private:
void finalizeImpl() override;
};
}

View File

@ -20,7 +20,7 @@ WriteBufferFromHTTP::WriteBufferFromHTTP(
ostr = &session->sendRequest(request);
}
void WriteBufferFromHTTP::finalize()
void WriteBufferFromHTTP::finalizeImpl()
{
receiveResponse(*session, request, response, false);
/// TODO: Response body is ignored.

View File

@ -17,19 +17,19 @@ namespace DB
*/
class WriteBufferFromHTTP : public WriteBufferFromOStream
{
public:
explicit WriteBufferFromHTTP(const Poco::URI & uri,
const std::string & method = Poco::Net::HTTPRequest::HTTP_POST, // POST or PUT only
const ConnectionTimeouts & timeouts = {},
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE);
private:
/// Receives response from the server after sending all data.
void finalizeImpl() override;
HTTPSessionPtr session;
Poco::Net::HTTPRequest request;
Poco::Net::HTTPResponse response;
public:
explicit WriteBufferFromHTTP(const Poco::URI & uri,
const std::string & method = Poco::Net::HTTPRequest::HTTP_POST, // POST or PUT only
const ConnectionTimeouts & timeouts = {},
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE);
/// Receives response from the server after sending all data.
void finalize() override;
};
}

View File

@ -42,9 +42,10 @@ WriteBufferFromOStream::WriteBufferFromOStream(
WriteBufferFromOStream::~WriteBufferFromOStream()
{
/// FIXME move final flush into the caller
if (finalized)
return;
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
next();
finalizeImpl();
}
}

View File

@ -11,13 +11,6 @@ namespace DB
class WriteBufferFromOStream : public BufferWithOwnMemory<WriteBuffer>
{
protected:
std::ostream * ostr{};
void nextImpl() override;
WriteBufferFromOStream(size_t size = DBMS_DEFAULT_BUFFER_SIZE, char * existing_memory = nullptr, size_t alignment = 0);
public:
WriteBufferFromOStream(
std::ostream & ostr_,
@ -26,6 +19,13 @@ public:
size_t alignment = 0);
~WriteBufferFromOStream() override;
protected:
WriteBufferFromOStream(size_t size = DBMS_DEFAULT_BUFFER_SIZE, char * existing_memory = nullptr, size_t alignment = 0);
void nextImpl() override;
std::ostream * ostr{};
};
}

View File

@ -83,9 +83,10 @@ WriteBufferFromPocoSocket::WriteBufferFromPocoSocket(Poco::Net::Socket & socket_
WriteBufferFromPocoSocket::~WriteBufferFromPocoSocket()
{
/// FIXME move final flush into the caller
if (finalized)
return;
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
next();
finalizeImpl();
}
}

View File

@ -13,7 +13,14 @@ namespace DB
*/
class WriteBufferFromPocoSocket : public BufferWithOwnMemory<WriteBuffer>
{
public:
WriteBufferFromPocoSocket(Poco::Net::Socket & socket_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE);
~WriteBufferFromPocoSocket() override;
protected:
void nextImpl() override;
Poco::Net::Socket & socket;
/** For error messages. It is necessary to receive this address in advance, because,
@ -21,14 +28,6 @@ protected:
* (getpeername will return an error).
*/
Poco::Net::SocketAddress peer_address;
void nextImpl() override;
public:
WriteBufferFromPocoSocket(Poco::Net::Socket & socket_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE);
~WriteBufferFromPocoSocket() override;
};
}

View File

@ -84,18 +84,16 @@ void WriteBufferFromS3::allocateBuffer()
last_part_size = 0;
}
void WriteBufferFromS3::finalize()
WriteBufferFromS3::~WriteBufferFromS3()
{
/// FIXME move final flush into the caller
if (finalized)
return;
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
finalizeImpl();
}
void WriteBufferFromS3::finalizeImpl()
{
if (finalized)
return;
next();
if (multipart_upload_id.empty())
@ -108,8 +106,6 @@ void WriteBufferFromS3::finalizeImpl()
writePart();
completeMultipartUpload();
}
finalized = true;
}
void WriteBufferFromS3::createMultipartUpload()
@ -167,7 +163,7 @@ void WriteBufferFromS3::writePart()
{
auto etag = outcome.GetResult().GetETag();
part_tags.push_back(etag);
LOG_DEBUG(log, "Writing part finished. Bucket: {}, Key: {}, Upload_id: {}, Etag: {}, Parts: {}", bucket, key, multipart_upload_id, etag, part_tags.size());
LOG_DEBUG(log, "Writing part finalizeImpl()ed. Bucket: {}, Key: {}, Upload_id: {}, Etag: {}, Parts: {}", bucket, key, multipart_upload_id, etag, part_tags.size());
}
else
throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);

View File

@ -31,7 +31,32 @@ namespace DB
*/
class WriteBufferFromS3 : public BufferWithOwnMemory<WriteBuffer>
{
public:
explicit WriteBufferFromS3(
std::shared_ptr<Aws::S3::S3Client> client_ptr_,
const String & bucket_,
const String & key_,
size_t minimum_upload_part_size_,
size_t max_single_part_upload_size_,
std::optional<std::map<String, String>> object_metadata_ = std::nullopt,
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE);
~WriteBufferFromS3() override;
void nextImpl() override;
private:
void allocateBuffer();
void createMultipartUpload();
void writePart();
void completeMultipartUpload();
void makeSinglepartUpload();
/// Receives response from the server after sending all data.
void finalizeImpl() override;
String bucket;
String key;
std::optional<std::map<String, String>> object_metadata;
@ -43,39 +68,11 @@ private:
size_t last_part_size;
/// Upload in S3 is made in parts.
/// We initiate upload, then upload each part and get ETag as a response, and then finish upload with listing all our parts.
/// We initiate upload, then upload each part and get ETag as a response, and then finalizeImpl() upload with listing all our parts.
String multipart_upload_id;
std::vector<String> part_tags;
Poco::Logger * log = &Poco::Logger::get("WriteBufferFromS3");
public:
explicit WriteBufferFromS3(
std::shared_ptr<Aws::S3::S3Client> client_ptr_,
const String & bucket_,
const String & key_,
size_t minimum_upload_part_size_,
size_t max_single_part_upload_size_,
std::optional<std::map<String, String>> object_metadata_ = std::nullopt,
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE);
void nextImpl() override;
/// Receives response from the server after sending all data.
void finalize() override;
private:
bool finalized = false;
void allocateBuffer();
void createMultipartUpload();
void writePart();
void completeMultipartUpload();
void makeSinglepartUpload();
void finalizeImpl();
};
}

View File

@ -19,14 +19,11 @@ public:
~WriteBufferFromTemporaryFile() override;
protected:
private:
WriteBufferFromTemporaryFile(std::unique_ptr<TemporaryFile> && tmp_file);
std::shared_ptr<ReadBuffer> getReadBufferImpl() override;
protected:
std::unique_ptr<TemporaryFile> tmp_file;
friend class ReadBufferFromTemporaryWriteBuffer;

View File

@ -19,31 +19,11 @@ namespace ErrorCodes
* In destructor, vector is cut to the size of written data.
* You can call 'finalize' to resize earlier.
*
* The vector should live until this object is destroyed or until the 'finish' method is called.
* The vector should live until this object is destroyed or until the 'finalizeImpl()' method is called.
*/
template <typename VectorType>
class WriteBufferFromVector : public WriteBuffer
{
private:
VectorType & vector;
bool is_finished = false;
static constexpr size_t initial_size = 32;
static constexpr size_t size_multiplier = 2;
void nextImpl() override
{
if (is_finished)
throw Exception("WriteBufferFromVector is finished", ErrorCodes::CANNOT_WRITE_AFTER_END_OF_BUFFER);
size_t old_size = vector.size();
/// pos may not be equal to vector.data() + old_size, because WriteBuffer::next() can be used to flush data
size_t pos_offset = pos - reinterpret_cast<Position>(vector.data());
vector.resize(old_size * size_multiplier);
internal_buffer = Buffer(reinterpret_cast<Position>(vector.data() + pos_offset), reinterpret_cast<Position>(vector.data() + vector.size()));
working_buffer = internal_buffer;
}
public:
explicit WriteBufferFromVector(VectorType & vector_)
: WriteBuffer(reinterpret_cast<Position>(vector_.data()), vector_.size()), vector(vector_)
@ -68,11 +48,27 @@ public:
set(reinterpret_cast<Position>(vector.data() + old_size), (size - old_size) * sizeof(typename VectorType::value_type));
}
void finalize() override final
bool isFinished() const { return finalized; }
void restart()
{
if (is_finished)
if (vector.empty())
vector.resize(initial_size);
set(reinterpret_cast<Position>(vector.data()), vector.size());
finalized = false;
}
~WriteBufferFromVector() override
{
if (finalized)
return;
is_finished = true;
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
finalizeImpl();
}
private:
void finalizeImpl() override final
{
vector.resize(
((position() - reinterpret_cast<Position>(vector.data()))
+ sizeof(typename VectorType::value_type) - 1) /// Align up.
@ -82,22 +78,23 @@ public:
set(nullptr, 0);
}
bool isFinished() const { return is_finished; }
void restart()
void nextImpl() override
{
if (vector.empty())
vector.resize(initial_size);
set(reinterpret_cast<Position>(vector.data()), vector.size());
is_finished = false;
if (finalized)
throw Exception("WriteBufferFromVector is finalized", ErrorCodes::CANNOT_WRITE_AFTER_END_OF_BUFFER);
size_t old_size = vector.size();
/// pos may not be equal to vector.data() + old_size, because WriteBuffer::next() can be used to flush data
size_t pos_offset = pos - reinterpret_cast<Position>(vector.data());
vector.resize(old_size * size_multiplier);
internal_buffer = Buffer(reinterpret_cast<Position>(vector.data() + pos_offset), reinterpret_cast<Position>(vector.data() + vector.size()));
working_buffer = internal_buffer;
}
~WriteBufferFromVector() override
{
/// FIXME move final flush into the caller
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
finalize();
}
VectorType & vector;
static constexpr size_t initial_size = 32;
static constexpr size_t size_multiplier = 2;
};
}

View File

@ -123,8 +123,15 @@ void WriteBufferValidUTF8::nextImpl()
output_buffer.next();
}
WriteBufferValidUTF8::~WriteBufferValidUTF8()
{
if (finalized)
return;
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
finalizeImpl();
}
void WriteBufferValidUTF8::finish()
void WriteBufferValidUTF8::finalizeImpl()
{
/// Write all complete sequences from buffer.
nextImpl();
@ -134,12 +141,4 @@ void WriteBufferValidUTF8::finish()
putReplacement();
}
WriteBufferValidUTF8::~WriteBufferValidUTF8()
{
/// FIXME move final flush into the caller
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
finish();
}
}

View File

@ -13,19 +13,6 @@ namespace DB
*/
class WriteBufferValidUTF8 final : public BufferWithOwnMemory<WriteBuffer>
{
private:
WriteBuffer & output_buffer;
bool group_replacements;
/// The last recorded character was `replacement`.
bool just_put_replacement = false;
std::string replacement;
void putReplacement();
void putValid(char * data, size_t len);
void nextImpl() override;
void finish();
public:
static const size_t DEFAULT_SIZE;
@ -36,6 +23,19 @@ public:
size_t size = DEFAULT_SIZE);
~WriteBufferValidUTF8() override;
private:
void putReplacement();
void putValid(char * data, size_t len);
void nextImpl() override;
void finalizeImpl() override;
WriteBuffer & output_buffer;
bool group_replacements;
/// The last recorded character was `replacement`.
bool just_put_replacement = false;
std::string replacement;
};
}

View File

@ -1,5 +1,4 @@
#include <IO/ZlibDeflatingWriteBuffer.h>
#include <Common/MemorySanitizer.h>
#include <Common/MemoryTracker.h>
#include <Common/Exception.h>
@ -20,8 +19,7 @@ ZlibDeflatingWriteBuffer::ZlibDeflatingWriteBuffer(
size_t buf_size,
char * existing_memory,
size_t alignment)
: BufferWithOwnMemory<WriteBuffer>(buf_size, existing_memory, alignment)
, out(std::move(out_))
: WriteBufferWithOwnMemoryDecorator(std::move(out_), buf_size, existing_memory, alignment)
{
zstr.zalloc = nullptr;
zstr.zfree = nullptr;
@ -46,27 +44,6 @@ ZlibDeflatingWriteBuffer::ZlibDeflatingWriteBuffer(
throw Exception(std::string("deflateInit2 failed: ") + zError(rc) + "; zlib version: " + ZLIB_VERSION, ErrorCodes::ZLIB_DEFLATE_FAILED);
}
ZlibDeflatingWriteBuffer::~ZlibDeflatingWriteBuffer()
{
/// FIXME move final flush into the caller
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
finish();
try
{
int rc = deflateEnd(&zstr);
if (rc != Z_OK)
throw Exception(std::string("deflateEnd failed: ") + zError(rc), ErrorCodes::ZLIB_DEFLATE_FAILED);
}
catch (...)
{
/// It is OK not to terminate under an error from deflateEnd()
/// since all data already written to the stream.
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
void ZlibDeflatingWriteBuffer::nextImpl()
{
if (!offset())
@ -99,27 +76,15 @@ void ZlibDeflatingWriteBuffer::nextImpl()
}
}
void ZlibDeflatingWriteBuffer::finish()
ZlibDeflatingWriteBuffer::~ZlibDeflatingWriteBuffer()
{
if (finished)
if (finalized)
return;
try
{
finishImpl();
out->finalize();
finished = true;
}
catch (...)
{
/// Do not try to flush next time after exception.
out->position() = out->buffer().begin();
finished = true;
throw;
}
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
finalizeImpl();
}
void ZlibDeflatingWriteBuffer::finishImpl()
void ZlibDeflatingWriteBuffer::finalizeBeforeNestedFinalize()
{
next();
@ -153,7 +118,23 @@ void ZlibDeflatingWriteBuffer::finishImpl()
}
if (rc != Z_OK)
throw Exception(std::string("deflate finish failed: ") + zError(rc), ErrorCodes::ZLIB_DEFLATE_FAILED);
throw Exception(std::string("deflate finalizeImpl() failed: ") + zError(rc), ErrorCodes::ZLIB_DEFLATE_FAILED);
}
}
void ZlibDeflatingWriteBuffer::finalizeAfterNestedFinalize()
{
try
{
int rc = deflateEnd(&zstr);
if (rc != Z_OK)
throw Exception(std::string("deflateEnd failed: ") + zError(rc), ErrorCodes::ZLIB_DEFLATE_FAILED);
}
catch (...)
{
/// It is OK not to terminate under an error from deflateEnd()
/// since all data already written to the stream.
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}

View File

@ -3,6 +3,7 @@
#include <IO/WriteBuffer.h>
#include <IO/BufferWithOwnMemory.h>
#include <IO/CompressionMethod.h>
#include <IO/WriteBufferDecorator.h>
#include <zlib.h>
@ -12,7 +13,7 @@ namespace DB
{
/// Performs compression using zlib library and writes compressed data to out_ WriteBuffer.
class ZlibDeflatingWriteBuffer : public BufferWithOwnMemory<WriteBuffer>
class ZlibDeflatingWriteBuffer : public WriteBufferWithOwnMemoryDecorator
{
public:
ZlibDeflatingWriteBuffer(
@ -23,22 +24,18 @@ public:
char * existing_memory = nullptr,
size_t alignment = 0);
void finalize() override { finish(); }
~ZlibDeflatingWriteBuffer() override;
private:
void nextImpl() override;
void finishImpl();
/// Flush all pending data and write zlib footer to the underlying buffer.
/// After the first call to this function, subsequent calls will have no effect and
/// an attempt to write to this buffer will result in exception.
void finish();
virtual void finalizeBeforeNestedFinalize() override;
virtual void finalizeAfterNestedFinalize() override;
std::unique_ptr<WriteBuffer> out;
z_stream zstr;
bool finished = false;
};
}

View File

@ -11,10 +11,9 @@ namespace ErrorCodes
}
ZstdDeflatingAppendableWriteBuffer::ZstdDeflatingAppendableWriteBuffer(
WriteBuffer & out_, int compression_level, bool append_to_existing_stream_,
std::unique_ptr<WriteBuffer> out_, int compression_level, bool append_to_existing_stream_,
size_t buf_size, char * existing_memory, size_t alignment)
: BufferWithOwnMemory<WriteBuffer>(buf_size, existing_memory, alignment)
, out(out_)
: WriteBufferWithOwnMemoryDecorator(std::move(out_), buf_size, existing_memory, alignment)
, append_to_existing_stream(append_to_existing_stream_)
{
cctx = ZSTD_createCCtx();
@ -50,18 +49,18 @@ void ZstdDeflatingAppendableWriteBuffer::nextImpl()
bool ended = false;
do
{
out.nextIfAtEnd();
out->nextIfAtEnd();
output.dst = reinterpret_cast<unsigned char *>(out.buffer().begin());
output.size = out.buffer().size();
output.pos = out.offset();
output.dst = reinterpret_cast<unsigned char *>(out->buffer().begin());
output.size = out->buffer().size();
output.pos = out->offset();
size_t compression_result = ZSTD_compressStream2(cctx, &output, &input, mode);
if (ZSTD_isError(compression_result))
throw Exception(
ErrorCodes::ZSTD_ENCODER_FAILED, "Zstd stream encoding failed: error code: {}; zstd version: {}", ZSTD_getErrorName(compression_result), ZSTD_VERSION_STRING);
out.position() = out.buffer().begin() + output.pos;
out->position() = out->buffer().begin() + output.pos;
bool everything_was_compressed = (input.pos == input.size);
bool everything_was_flushed = compression_result == 0;
@ -72,42 +71,57 @@ void ZstdDeflatingAppendableWriteBuffer::nextImpl()
catch (...)
{
/// Do not try to write next time after exception.
out.position() = out.buffer().begin();
out->position() = out->buffer().begin();
throw;
}
}
void ZstdDeflatingAppendableWriteBuffer::finish()
{
if (finished || first_write)
{
/// Nothing was written or we have already finished
return;
}
try
{
finishImpl();
out.finalize();
finished = true;
}
catch (...)
{
/// Do not try to flush next time after exception.
out.position() = out.buffer().begin();
finished = true;
throw;
}
}
ZstdDeflatingAppendableWriteBuffer::~ZstdDeflatingAppendableWriteBuffer()
{
/// FIXME move final flush into the caller
if (finalized || first_write)
return;
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
finalizeImpl();
}
finish();
void ZstdDeflatingAppendableWriteBuffer::finalizeImpl()
{
if (first_write)
{
/// Nothing was written
return;
}
WriteBufferDecorator::finalizeImpl();
}
void ZstdDeflatingAppendableWriteBuffer::finalizeBeforeNestedFinalize()
{
next();
out->nextIfAtEnd();
input.src = reinterpret_cast<unsigned char *>(working_buffer.begin());
input.size = offset();
input.pos = 0;
output.dst = reinterpret_cast<unsigned char *>(out->buffer().begin());
output.size = out->buffer().size();
output.pos = out->offset();
size_t remaining = ZSTD_compressStream2(cctx, &output, &input, ZSTD_e_end);
while (remaining != 0)
{
if (ZSTD_isError(remaining))
throw Exception(ErrorCodes::ZSTD_ENCODER_FAILED, "zstd stream encoder end failed: error: '{}' zstd version: {}", ZSTD_getErrorName(remaining), ZSTD_VERSION_STRING);
remaining = ZSTD_compressStream2(cctx, &output, &input, ZSTD_e_end);
}
out->position() = out->buffer().begin() + output.pos;
}
void ZstdDeflatingAppendableWriteBuffer::finalizeAfterNestedFinalize()
{
try
{
int err = ZSTD_freeCCtx(cctx);
@ -123,42 +137,17 @@ ZstdDeflatingAppendableWriteBuffer::~ZstdDeflatingAppendableWriteBuffer()
}
}
void ZstdDeflatingAppendableWriteBuffer::finishImpl()
{
next();
out.nextIfAtEnd();
input.src = reinterpret_cast<unsigned char *>(working_buffer.begin());
input.size = offset();
input.pos = 0;
output.dst = reinterpret_cast<unsigned char *>(out.buffer().begin());
output.size = out.buffer().size();
output.pos = out.offset();
size_t remaining = ZSTD_compressStream2(cctx, &output, &input, ZSTD_e_end);
while (remaining != 0)
{
if (ZSTD_isError(remaining))
throw Exception(ErrorCodes::ZSTD_ENCODER_FAILED, "zstd stream encoder end failed: error: '{}' zstd version: {}", ZSTD_getErrorName(remaining), ZSTD_VERSION_STRING);
remaining = ZSTD_compressStream2(cctx, &output, &input, ZSTD_e_end);
}
out.position() = out.buffer().begin() + output.pos;
}
void ZstdDeflatingAppendableWriteBuffer::addEmptyBlock()
{
/// HACK: https://github.com/facebook/zstd/issues/2090#issuecomment-620158967
static const char empty_block[3] = {0x01, 0x00, 0x00};
if (out.buffer().size() - out.offset() < sizeof(empty_block))
out.next();
if (out->buffer().size() - out->offset() < sizeof(empty_block))
out->next();
std::memcpy(out.buffer().begin() + out.offset(), empty_block, sizeof(empty_block));
std::memcpy(out->buffer().begin() + out->offset(), empty_block, sizeof(empty_block));
out.position() = out.buffer().begin() + out.offset() + sizeof(empty_block);
out->position() = out->buffer().begin() + out->offset() + sizeof(empty_block);
}
}

View File

@ -3,6 +3,7 @@
#include <IO/BufferWithOwnMemory.h>
#include <IO/CompressionMethod.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteBufferDecorator.h>
#include <zstd.h>
@ -19,51 +20,47 @@ namespace DB
/// said that there is no risks of compatibility issues https://github.com/facebook/zstd/issues/2090#issuecomment-620158967.
/// 2) Doesn't support internal ZSTD check-summing, because ZSTD checksums written at the end of frame (frame epilogue).
///
class ZstdDeflatingAppendableWriteBuffer : public BufferWithOwnMemory<WriteBuffer>
class ZstdDeflatingAppendableWriteBuffer : public WriteBufferWithOwnMemoryDecorator
{
public:
ZstdDeflatingAppendableWriteBuffer(
WriteBuffer & out_,
std::unique_ptr<WriteBuffer> out_,
int compression_level,
bool append_to_existing_stream_, /// if true then out mustn't be empty
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr,
size_t alignment = 0);
/// Write terminating ZSTD_e_end: empty block + frame epilogue. BTW it
/// should be almost noop, because frame epilogue contains only checksums,
/// and they are disabled for this buffer.
void finalize() override { finish(); }
~ZstdDeflatingAppendableWriteBuffer() override;
void sync() override
{
next();
out.sync();
out->sync();
}
private:
/// NOTE: will fill compressed data to the out.working_buffer, but will not call out.next method until the buffer is full
void nextImpl() override;
/// Write terminating ZSTD_e_end: empty block + frame epilogue. BTW it
/// should be almost noop, because frame epilogue contains only checksums,
/// and they are disabled for this buffer.
/// Flush all pending data and write zstd footer to the underlying buffer.
/// After the first call to this function, subsequent calls will have no effect and
/// an attempt to write to this buffer will result in exception.
void finish();
void finishImpl();
void finalizeImpl() override;
void finalizeBeforeNestedFinalize() override;
void finalizeAfterNestedFinalize() override;
/// Adding zstd empty block to out.working_buffer
void addEmptyBlock();
WriteBuffer & out;
/// We appending data to existing stream so on the first nextImpl call we
/// will append empty block.
bool append_to_existing_stream;
ZSTD_CCtx * cctx;
ZSTD_inBuffer input;
ZSTD_outBuffer output;
/// Flipped in finish call
bool finished = false;
/// Flipped on the first nextImpl call
bool first_write = true;
};

View File

@ -11,7 +11,7 @@ namespace ErrorCodes
ZstdDeflatingWriteBuffer::ZstdDeflatingWriteBuffer(
std::unique_ptr<WriteBuffer> out_, int compression_level, size_t buf_size, char * existing_memory, size_t alignment)
: BufferWithOwnMemory<WriteBuffer>(buf_size, existing_memory, alignment), out(std::move(out_))
: WriteBufferWithOwnMemoryDecorator(std::move(out_), buf_size, existing_memory, alignment)
{
cctx = ZSTD_createCCtx();
if (cctx == nullptr)
@ -30,24 +30,10 @@ ZstdDeflatingWriteBuffer::ZstdDeflatingWriteBuffer(
ZstdDeflatingWriteBuffer::~ZstdDeflatingWriteBuffer()
{
/// FIXME move final flush into the caller
if (finalized)
return;
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
finish();
try
{
int err = ZSTD_freeCCtx(cctx);
/// This is just in case, since it is impossible to get an error by using this wrapper.
if (unlikely(err))
throw Exception(ErrorCodes::ZSTD_ENCODER_FAILED, "ZSTD_freeCCtx failed: error: '{}'; zstd version: {}", ZSTD_getErrorName(err), ZSTD_VERSION_STRING);
}
catch (...)
{
/// It is OK not to terminate under an error from ZSTD_freeCCtx()
/// since all data already written to the stream.
tryLogCurrentException(__PRETTY_FUNCTION__);
}
finalizeImpl();
}
void ZstdDeflatingWriteBuffer::nextImpl()
@ -94,27 +80,7 @@ void ZstdDeflatingWriteBuffer::nextImpl()
}
}
void ZstdDeflatingWriteBuffer::finish()
{
if (finished)
return;
try
{
finishImpl();
out->finalize();
finished = true;
}
catch (...)
{
/// Do not try to flush next time after exception.
out->position() = out->buffer().begin();
finished = true;
throw;
}
}
void ZstdDeflatingWriteBuffer::finishImpl()
void ZstdDeflatingWriteBuffer::finalizeBeforeNestedFinalize()
{
next();
@ -134,4 +100,21 @@ void ZstdDeflatingWriteBuffer::finishImpl()
out->position() = out->buffer().begin() + output.pos;
}
void ZstdDeflatingWriteBuffer::finalizeAfterNestedFinalize()
{
try
{
int err = ZSTD_freeCCtx(cctx);
/// This is just in case, since it is impossible to get an error by using this wrapper.
if (unlikely(err))
throw Exception(ErrorCodes::ZSTD_ENCODER_FAILED, "ZSTD_freeCCtx failed: error: '{}'; zstd version: {}", ZSTD_getErrorName(err), ZSTD_VERSION_STRING);
}
catch (...)
{
/// It is OK not to terminate under an error from ZSTD_freeCCtx()
/// since all data already written to the stream.
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}

View File

@ -3,6 +3,7 @@
#include <IO/BufferWithOwnMemory.h>
#include <IO/CompressionMethod.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteBufferDecorator.h>
#include <zstd.h>
@ -10,7 +11,7 @@ namespace DB
{
/// Performs compression using zstd library and writes compressed data to out_ WriteBuffer.
class ZstdDeflatingWriteBuffer : public BufferWithOwnMemory<WriteBuffer>
class ZstdDeflatingWriteBuffer : public WriteBufferWithOwnMemoryDecorator
{
public:
ZstdDeflatingWriteBuffer(
@ -20,8 +21,6 @@ public:
char * existing_memory = nullptr,
size_t alignment = 0);
void finalize() override { finish(); }
~ZstdDeflatingWriteBuffer() override;
void sync() override
@ -35,14 +34,12 @@ private:
/// Flush all pending data and write zstd footer to the underlying buffer.
/// After the first call to this function, subsequent calls will have no effect and
/// an attempt to write to this buffer will result in exception.
void finish();
void finishImpl();
void finalizeBeforeNestedFinalize() override;
void finalizeAfterNestedFinalize() override;
std::unique_ptr<WriteBuffer> out;
ZSTD_CCtx * cctx;
ZSTD_inBuffer input;
ZSTD_outBuffer output;
bool finished = false;
};
}

View File

@ -11,7 +11,6 @@
#include <Poco/Event.h>
#include <IO/BufferWithOwnMemory.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteBufferFromArena.h>
#include <deque>
#include <atomic>

View File

@ -168,8 +168,15 @@ void WriteBufferFromHTTPServerResponse::onProgress(const Progress & progress)
}
}
WriteBufferFromHTTPServerResponse::~WriteBufferFromHTTPServerResponse()
{
if (finalized)
return;
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
finalizeImpl();
}
void WriteBufferFromHTTPServerResponse::finalize()
void WriteBufferFromHTTPServerResponse::finalizeImpl()
{
try
{
@ -198,11 +205,4 @@ void WriteBufferFromHTTPServerResponse::finalize()
}
WriteBufferFromHTTPServerResponse::~WriteBufferFromHTTPServerResponse()
{
/// FIXME move final flush into the caller
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
finalize();
}
}

View File

@ -32,47 +32,6 @@ namespace DB
/// This allows to implement progress bar in HTTP clients.
class WriteBufferFromHTTPServerResponse final : public BufferWithOwnMemory<WriteBuffer>
{
private:
HTTPServerResponse & response;
bool is_http_method_head;
bool add_cors_header = false;
unsigned keep_alive_timeout = 0;
bool compress = false;
CompressionMethod compression_method;
int compression_level = 1;
std::shared_ptr<std::ostream> response_body_ostr;
std::shared_ptr<std::ostream> response_header_ostr;
std::unique_ptr<WriteBuffer> out;
bool initialized = false;
bool headers_started_sending = false;
bool headers_finished_sending = false; /// If true, you could not add any headers.
Progress accumulated_progress;
size_t send_progress_interval_ms = 100;
Stopwatch progress_watch;
std::mutex mutex; /// progress callback could be called from different threads.
/// Must be called under locked mutex.
/// This method send headers, if this was not done already,
/// but not finish them with \r\n, allowing to send more headers subsequently.
void startSendHeaders();
// Used for write the header X-ClickHouse-Progress
void writeHeaderProgress();
// Used for write the header X-ClickHouse-Summary
void writeHeaderSummary();
/// This method finish headers with \r\n, allowing to start to send body.
void finishSendHeaders();
void nextImpl() override;
public:
WriteBufferFromHTTPServerResponse(
HTTPServerResponse & response_,
@ -81,15 +40,11 @@ public:
bool compress_ = false, /// If true - set Content-Encoding header and compress the result.
CompressionMethod compression_method_ = CompressionMethod::None);
~WriteBufferFromHTTPServerResponse() override;
/// Writes progress in repeating HTTP headers.
void onProgress(const Progress & progress);
/// Send at least HTTP headers if no data has been sent yet.
/// Use after the data has possibly been sent and no error happened (and thus you do not plan
/// to change response HTTP code.
/// This method is idempotent.
void finalize() override;
/// Turn compression on or off.
/// The setting has any effect only if HTTP headers haven't been sent yet.
void setCompression(bool enable_compression)
@ -117,7 +72,51 @@ public:
send_progress_interval_ms = send_progress_interval_ms_;
}
~WriteBufferFromHTTPServerResponse() override;
private:
/// Send at least HTTP headers if no data has been sent yet.
/// Use after the data has possibly been sent and no error happened (and thus you do not plan
/// to change response HTTP code.
/// This method is idempotent.
void finalizeImpl() override;
/// Must be called under locked mutex.
/// This method send headers, if this was not done already,
/// but not finish them with \r\n, allowing to send more headers subsequently.
void startSendHeaders();
// Used for write the header X-ClickHouse-Progress
void writeHeaderProgress();
// Used for write the header X-ClickHouse-Summary
void writeHeaderSummary();
/// This method finish headers with \r\n, allowing to start to send body.
void finishSendHeaders();
void nextImpl() override;
HTTPServerResponse & response;
bool is_http_method_head;
bool add_cors_header = false;
unsigned keep_alive_timeout = 0;
bool compress = false;
CompressionMethod compression_method;
int compression_level = 1;
std::shared_ptr<std::ostream> response_body_ostr;
std::shared_ptr<std::ostream> response_header_ostr;
std::unique_ptr<WriteBuffer> out;
bool initialized = false;
bool headers_started_sending = false;
bool headers_finished_sending = false; /// If true, you could not add any headers.
Progress accumulated_progress;
size_t send_progress_interval_ms = 100;
Stopwatch progress_watch;
std::mutex mutex; /// progress callback could be called from different threads.
};
}

View File

@ -2,7 +2,6 @@
#if USE_HDFS
#include <Interpreters/Context.h>
#include <Storages/HDFS/WriteBufferFromHDFS.h>
#include <Storages/HDFS/HDFSCommon.h>
#include <hdfs/hdfs.h>
@ -109,7 +108,7 @@ void WriteBufferFromHDFS::sync()
}
void WriteBufferFromHDFS::finalize()
void WriteBufferFromHDFS::finalizeImpl()
{
try
{

View File

@ -5,6 +5,8 @@
#if USE_HDFS
#include <IO/WriteBuffer.h>
#include <IO/BufferWithOwnMemory.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <fcntl.h>
#include <string>
#include <memory>
@ -32,9 +34,9 @@ public:
void sync() override;
void finalize() override;
private:
void finalizeImpl() override;
struct WriteBufferFromHDFSImpl;
std::unique_ptr<WriteBufferFromHDFSImpl> impl;
};

View File

@ -649,6 +649,9 @@ public:
void onFinish() override
{
writer->doWriteSuffix();
writer->flush();
write_buf->sync();
write_buf->finalize();
}
// void flush() override

View File

@ -0,0 +1 @@
100000 99999

View File

@ -0,0 +1,13 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS file"
${CLICKHOUSE_CLIENT} --query "CREATE TABLE file (x UInt64) ENGINE = File(Native, '${CLICKHOUSE_DATABASE}/data.xz')"
${CLICKHOUSE_CLIENT} --query "TRUNCATE TABLE file"
${CLICKHOUSE_CLIENT} --query "INSERT INTO file SELECT * FROM numbers(100000)"
${CLICKHOUSE_CLIENT} --query "SELECT count(), max(x) FROM file"
${CLICKHOUSE_CLIENT} --query "DROP TABLE file"