do call finalize for all buffers

This commit is contained in:
Sema Checherinda 2023-06-01 19:00:47 +02:00
parent 9a4962bd68
commit 1cb02e2710
24 changed files with 99 additions and 41 deletions

View File

@ -528,6 +528,7 @@ StringRef ColumnAggregateFunction::serializeValueIntoArena(size_t n, Arena & are
{
WriteBufferFromArena out(arena, begin);
func->serialize(data[n], out, version);
out.finalize();
return out.complete();
}

View File

@ -260,7 +260,7 @@ std::unique_ptr<ShellCommand> ShellCommand::executeDirect(const ShellCommand::Co
std::vector<char *> argv(arguments.size() + 2);
std::vector<char> argv_data(argv_sum_size);
WriteBuffer writer(argv_data.data(), argv_sum_size);
WriteBufferFromPointer writer(argv_data.data(), argv_sum_size);
argv[0] = writer.position();
writer.write(path.data(), path.size() + 1);
@ -271,6 +271,8 @@ std::unique_ptr<ShellCommand> ShellCommand::executeDirect(const ShellCommand::Co
writer.write(arguments[i].data(), arguments[i].size() + 1);
}
writer.finalize();
argv[arguments.size() + 1] = nullptr;
return executeImpl(path.data(), argv.data(), config);

View File

@ -674,7 +674,7 @@ nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type typ
auto * buffer_start = reinterpret_cast<BufferBase::Position>(entry_buf->data_begin() + entry_buf->size() - write_buffer_header_size);
WriteBuffer write_buf(buffer_start, write_buffer_header_size);
WriteBufferFromPointer write_buf(buffer_start, write_buffer_header_size);
if (serialization_version < KeeperStateMachine::ZooKeeperLogSerializationVersion::WITH_TIME)
writeIntBinary(request_for_session->time, write_buf);
@ -684,6 +684,8 @@ nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type typ
if (request_for_session->digest->version != KeeperStorage::NO_DIGEST)
writeIntBinary(request_for_session->digest->value, write_buf);
write_buf.finalize();
return nuraft::cb_func::ReturnCode::Ok;
}
case nuraft::cb_func::PreAppendLogFollower:

View File

@ -16,7 +16,7 @@ void IMySQLWritePacket::writePayload(WriteBuffer & buffer, uint8_t & sequence_id
{
MySQLPacketPayloadWriteBuffer buf(buffer, getPayloadSize(), sequence_id);
writePayloadImpl(buf);
buf.next();
buf.finalize();
if (buf.remainingPayloadSize())
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Incomplete payload. Written {} bytes, expected {} bytes.",

View File

@ -60,6 +60,7 @@ public:
settings, state);
serialization->serializeBinaryBulkStateSuffix(settings, state);
out.finalize();
return out.count();
}
};

View File

@ -170,7 +170,7 @@ public:
auto * begin = reinterpret_cast<char *>(result_column_data.data());
WriteBuffer buffer(begin, result_column_data.size());
WriteBufferFromPointer buffer(begin, result_column_data.size());
using TimeType = DateTypeToTimeType<DataType>;
callOnDatePartWriter<TimeType>(date_part, [&](const auto & writer)
@ -195,6 +195,8 @@ public:
result_column_data.resize(buffer.position() - begin);
buffer.finalize();
return result_column;
}

View File

@ -85,6 +85,7 @@ public:
uint128 getHash()
{
next();
chassert(finalized);
return IHashingBuffer<WriteBuffer>::getHash();
}
};

View File

@ -4,8 +4,8 @@
namespace DB
{
NullWriteBuffer::NullWriteBuffer(size_t buf_size, char * existing_memory, size_t alignment)
: BufferWithOwnMemory<WriteBuffer>(buf_size, existing_memory, alignment)
NullWriteBuffer::NullWriteBuffer()
: WriteBuffer(data, sizeof(data))
{
}

View File

@ -8,11 +8,14 @@ namespace DB
{
/// Simply do nothing, can be used to measure amount of written bytes.
class NullWriteBuffer : public BufferWithOwnMemory<WriteBuffer>, boost::noncopyable
class NullWriteBuffer : public WriteBuffer, boost::noncopyable
{
public:
explicit NullWriteBuffer(size_t buf_size = 16<<10, char * existing_memory = nullptr, size_t alignment = false);
NullWriteBuffer();
void nextImpl() override;
private:
char data[128];
};
}

View File

@ -33,7 +33,6 @@ class WriteBuffer : public BufferBase
public:
using BufferBase::set;
using BufferBase::position;
WriteBuffer(Position ptr, size_t size) : BufferBase(ptr, size, 0) {}
void set(Position ptr, size_t size) { BufferBase::set(ptr, size, 0); }
/** write the data in the buffer (from the beginning of the buffer to the current position);
@ -70,7 +69,7 @@ public:
virtual ~WriteBuffer()
{
// This destructor can be called with finalized=false in case of exceptions
if (!finalized)
if (count() > 0 && !finalized)
{
/// It is totally OK to destroy an instance without finalization when an exception occurs.
/// However, it is suspicious to destroy an instance without finalization on the green path.
@ -164,6 +163,8 @@ public:
}
protected:
WriteBuffer(Position ptr, size_t size) : BufferBase(ptr, size, 0) {}
virtual void finalizeImpl()
{
next();
@ -175,11 +176,31 @@ private:
/** Write the data in the buffer (from the beginning of the buffer to the current position).
* Throw an exception if something is wrong.
*/
virtual void nextImpl() { throw Exception(ErrorCodes::CANNOT_WRITE_AFTER_END_OF_BUFFER, "Cannot write after end of buffer."); }
virtual void nextImpl()
{
throw Exception(ErrorCodes::CANNOT_WRITE_AFTER_END_OF_BUFFER, "Cannot write after end of buffer.");
}
};
using WriteBufferPtr = std::shared_ptr<WriteBuffer>;
class WriteBufferFromPointer : public WriteBuffer
{
public:
WriteBufferFromPointer(Position ptr, size_t size) : WriteBuffer(ptr, size) {}
private:
virtual void finalizeImpl() override
{
/// no op
}
virtual void sync() override
{
/// no on
}
};
}

View File

@ -59,8 +59,9 @@ TEST_P(DateTime64StringWriteTest, WriteText)
PaddedPODArray<char> actual_string(param.string.size() * 2, '\0'); // TODO: detect overflows
WriteBuffer write_buffer(actual_string.data(), actual_string.size());
WriteBufferFromPointer write_buffer(actual_string.data(), actual_string.size());
EXPECT_NO_THROW(writeDateTimeText(param.dt64, param.scale, write_buffer, param.timezone));
write_buffer.finalize();
EXPECT_STREQ(param.string.data(), actual_string.data());
}

View File

@ -6,7 +6,9 @@
namespace DB
{
WriteBuffer NullOutputFormat::empty_buffer(nullptr, 0);
WriteBufferFromPointer NullOutputFormat::empty_buffer(nullptr, 0);
NullOutputFormat::NullOutputFormat(const Block & header) : IOutputFormat(header, empty_buffer) {}
void registerOutputFormatNull(FormatFactory & factory)
{

View File

@ -4,10 +4,12 @@
namespace DB
{
class WriteBufferFromPointer;
class NullOutputFormat final : public IOutputFormat
{
public:
explicit NullOutputFormat(const Block & header) : IOutputFormat(header, empty_buffer) {}
explicit NullOutputFormat(const Block & header);
String getName() const override { return "Null"; }
@ -15,7 +17,7 @@ protected:
void consume(Chunk) override {}
private:
static WriteBuffer empty_buffer;
static WriteBufferFromPointer empty_buffer;
};
}

View File

@ -224,6 +224,7 @@ namespace DB
/// Flush all the data to handmade buffer.
formatter->flush();
out_buffer.finalize();
unit.actual_memory_size = out_buffer.getActualSize();
{

View File

@ -87,6 +87,7 @@ public:
NullWriteBuffer buf;
save_totals_and_extremes_in_statistics = internal_formatter_creator(buf)->areTotalsAndExtremesUsedInFinalize();
buf.finalize();
/// Just heuristic. We need one thread for collecting, one thread for receiving chunks
/// and n threads for formatting.

View File

@ -5,7 +5,7 @@
namespace DB
{
WriteBuffer LazyOutputFormat::out(nullptr, 0);
WriteBufferFromPointer LazyOutputFormat::out(nullptr, 0);
Chunk LazyOutputFormat::getChunk(UInt64 milliseconds)
{

View File

@ -57,7 +57,7 @@ private:
Chunk extremes;
/// Is not used.
static WriteBuffer out;
static WriteBufferFromPointer out;
ProfileInfo info;
};

View File

@ -9,7 +9,12 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
WriteBuffer PullingOutputFormat::out(nullptr, 0);
WriteBufferFromPointer PullingOutputFormat::out(nullptr, 0);
PullingOutputFormat::PullingOutputFormat(const Block & header, std::atomic_bool & consume_data_flag_)
: IOutputFormat(header, out)
, has_data_flag(consume_data_flag_)
{}
void PullingOutputFormat::consume(Chunk chunk)
{

View File

@ -5,14 +5,13 @@
namespace DB
{
class WriteBufferFromPointer;
/// Output format which is used in PullingPipelineExecutor.
class PullingOutputFormat : public IOutputFormat
{
public:
explicit PullingOutputFormat(const Block & header, std::atomic_bool & consume_data_flag_)
: IOutputFormat(header, out)
, has_data_flag(consume_data_flag_)
{}
PullingOutputFormat(const Block & header, std::atomic_bool & consume_data_flag_);
String getName() const override { return "PullingOutputFormat"; }
@ -41,7 +40,7 @@ private:
ProfileInfo info;
/// Is not used.
static WriteBuffer out;
static WriteBufferFromPointer out;
};
}

View File

@ -138,7 +138,7 @@ IMergeTreeDataPart::MinMaxIndex::WrittenFiles IMergeTreeDataPart::MinMaxIndex::s
HashingWriteBuffer out_hashing(*out);
serialization->serializeBinary(hyperrectangle[i].left, out_hashing, {});
serialization->serializeBinary(hyperrectangle[i].right, out_hashing, {});
out_hashing.next();
out_hashing.finalize();
out_checksums.files[file_name].file_size = out_hashing.count();
out_checksums.files[file_name].file_hash = out_hashing.getHash();
out->preFinalize();

View File

@ -276,14 +276,23 @@ void MergeTreeDataPartWriterCompact::fillDataChecksums(IMergeTreeDataPart::Check
writeIntBinary(static_cast<UInt64>(0), marks_out);
}
for (const auto & [_, stream] : streams_by_codec)
{
stream->hashing_buf.finalize();
stream->compressed_buf.finalize();
}
plain_hashing.finalize();
plain_file->next();
if (marks_source_hashing)
marks_source_hashing->next();
marks_source_hashing->finalize();
if (marks_compressor)
marks_compressor->next();
marks_compressor->finalize();
marks_file_hashing->finalize();
marks_file_hashing->next();
addToChecksums(checksums);
plain_file->preFinalize();
@ -292,14 +301,14 @@ void MergeTreeDataPartWriterCompact::fillDataChecksums(IMergeTreeDataPart::Check
void MergeTreeDataPartWriterCompact::finishDataSerialization(bool sync)
{
plain_file->finalize();
marks_file->finalize();
if (sync)
{
plain_file->sync();
marks_file->sync();
}
plain_file->finalize();
marks_file->finalize();
}
static void fillIndexGranularityImpl(

View File

@ -13,17 +13,17 @@ namespace ErrorCodes
void MergeTreeDataPartWriterOnDisk::Stream::preFinalize()
{
compressed_hashing.next();
compressor.next();
plain_hashing.next();
compressed_hashing.finalize();
compressor.finalize();
plain_hashing.finalize();
if (compress_marks)
{
marks_compressed_hashing.next();
marks_compressor.next();
marks_compressed_hashing.finalize();
marks_compressor.finalize();
}
marks_hashing.next();
marks_hashing.finalize();
plain_file->preFinalize();
marks_file->preFinalize();
@ -347,9 +347,9 @@ void MergeTreeDataPartWriterOnDisk::fillPrimaryIndexChecksums(MergeTreeData::Dat
}
if (compress_primary_key)
index_source_hashing_stream->next();
index_source_hashing_stream->finalize();
index_file_hashing_stream->next();
index_file_hashing_stream->finalize();
String index_name = "primary" + getIndexExtension(compress_primary_key);
if (compress_primary_key)

View File

@ -435,9 +435,11 @@ std::unique_ptr<WriteBufferFromFileBase> MergeTreePartition::store(const Block &
partition_key_sample.getByPosition(i).type->getDefaultSerialization()->serializeBinary(value[i], out_hashing, {});
}
out_hashing.next();
out_hashing.finalize();
checksums.files["partition.dat"].file_size = out_hashing.count();
checksums.files["partition.dat"].file_hash = out_hashing.getHash();
out->preFinalize();
return out;
}

View File

@ -201,7 +201,7 @@ MergedBlockOutputStream::WrittenFiles MergedBlockOutputStream::finalizePartOnDis
auto count_out = new_part->getDataPartStorage().writeFile("count.txt", 4096, write_settings);
HashingWriteBuffer count_out_hashing(*count_out);
writeIntText(rows_count, count_out_hashing);
count_out_hashing.next();
count_out_hashing.finalize();
checksums.files["count.txt"].file_size = count_out_hashing.count();
checksums.files["count.txt"].file_hash = count_out_hashing.getHash();
count_out->preFinalize();
@ -215,6 +215,7 @@ MergedBlockOutputStream::WrittenFiles MergedBlockOutputStream::finalizePartOnDis
auto out = new_part->getDataPartStorage().writeFile(IMergeTreeDataPart::UUID_FILE_NAME, 4096, write_settings);
HashingWriteBuffer out_hashing(*out);
writeUUIDText(new_part->uuid, out_hashing);
out_hashing.finalize();
checksums.files[IMergeTreeDataPart::UUID_FILE_NAME].file_size = out_hashing.count();
checksums.files[IMergeTreeDataPart::UUID_FILE_NAME].file_hash = out_hashing.getHash();
out->preFinalize();
@ -241,7 +242,7 @@ MergedBlockOutputStream::WrittenFiles MergedBlockOutputStream::finalizePartOnDis
auto count_out = new_part->getDataPartStorage().writeFile("count.txt", 4096, write_settings);
HashingWriteBuffer count_out_hashing(*count_out);
writeIntText(rows_count, count_out_hashing);
count_out_hashing.next();
count_out_hashing.finalize();
checksums.files["count.txt"].file_size = count_out_hashing.count();
checksums.files["count.txt"].file_hash = count_out_hashing.getHash();
count_out->preFinalize();
@ -255,6 +256,7 @@ MergedBlockOutputStream::WrittenFiles MergedBlockOutputStream::finalizePartOnDis
auto out = new_part->getDataPartStorage().writeFile("ttl.txt", 4096, write_settings);
HashingWriteBuffer out_hashing(*out);
new_part->ttl_infos.write(out_hashing);
out_hashing.finalize();
checksums.files["ttl.txt"].file_size = out_hashing.count();
checksums.files["ttl.txt"].file_hash = out_hashing.getHash();
out->preFinalize();
@ -266,6 +268,7 @@ MergedBlockOutputStream::WrittenFiles MergedBlockOutputStream::finalizePartOnDis
auto out = new_part->getDataPartStorage().writeFile(IMergeTreeDataPart::SERIALIZATION_FILE_NAME, 4096, write_settings);
HashingWriteBuffer out_hashing(*out);
new_part->getSerializationInfos().writeJSON(out_hashing);
out_hashing.finalize();
checksums.files[IMergeTreeDataPart::SERIALIZATION_FILE_NAME].file_size = out_hashing.count();
checksums.files[IMergeTreeDataPart::SERIALIZATION_FILE_NAME].file_hash = out_hashing.getHash();
out->preFinalize();