diff --git a/src/Columns/ColumnAggregateFunction.cpp b/src/Columns/ColumnAggregateFunction.cpp index e521262acd2..62ec324455e 100644 --- a/src/Columns/ColumnAggregateFunction.cpp +++ b/src/Columns/ColumnAggregateFunction.cpp @@ -528,6 +528,7 @@ StringRef ColumnAggregateFunction::serializeValueIntoArena(size_t n, Arena & are { WriteBufferFromArena out(arena, begin); func->serialize(data[n], out, version); + out.finalize(); return out.complete(); } diff --git a/src/Common/ShellCommand.cpp b/src/Common/ShellCommand.cpp index 0616ac6303c..533e73c7adb 100644 --- a/src/Common/ShellCommand.cpp +++ b/src/Common/ShellCommand.cpp @@ -260,7 +260,7 @@ std::unique_ptr ShellCommand::executeDirect(const ShellCommand::Co std::vector argv(arguments.size() + 2); std::vector argv_data(argv_sum_size); - WriteBuffer writer(argv_data.data(), argv_sum_size); + WriteBufferFromPointer writer(argv_data.data(), argv_sum_size); argv[0] = writer.position(); writer.write(path.data(), path.size() + 1); @@ -271,6 +271,8 @@ std::unique_ptr ShellCommand::executeDirect(const ShellCommand::Co writer.write(arguments[i].data(), arguments[i].size() + 1); } + writer.finalize(); + argv[arguments.size() + 1] = nullptr; return executeImpl(path.data(), argv.data(), config); diff --git a/src/Coordination/KeeperServer.cpp b/src/Coordination/KeeperServer.cpp index 6e47412cd3a..fda76c52684 100644 --- a/src/Coordination/KeeperServer.cpp +++ b/src/Coordination/KeeperServer.cpp @@ -674,7 +674,7 @@ nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type typ auto * buffer_start = reinterpret_cast(entry_buf->data_begin() + entry_buf->size() - write_buffer_header_size); - WriteBuffer write_buf(buffer_start, write_buffer_header_size); + WriteBufferFromPointer write_buf(buffer_start, write_buffer_header_size); if (serialization_version < KeeperStateMachine::ZooKeeperLogSerializationVersion::WITH_TIME) writeIntBinary(request_for_session->time, write_buf); @@ -684,6 +684,8 @@ 
nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type typ if (request_for_session->digest->version != KeeperStorage::NO_DIGEST) writeIntBinary(request_for_session->digest->value, write_buf); + write_buf.finalize(); + return nuraft::cb_func::ReturnCode::Ok; } case nuraft::cb_func::PreAppendLogFollower: diff --git a/src/Core/MySQL/IMySQLWritePacket.cpp b/src/Core/MySQL/IMySQLWritePacket.cpp index b5c95717a9b..fd27603d6a8 100644 --- a/src/Core/MySQL/IMySQLWritePacket.cpp +++ b/src/Core/MySQL/IMySQLWritePacket.cpp @@ -16,7 +16,7 @@ void IMySQLWritePacket::writePayload(WriteBuffer & buffer, uint8_t & sequence_id { MySQLPacketPayloadWriteBuffer buf(buffer, getPayloadSize(), sequence_id); writePayloadImpl(buf); - buf.next(); + buf.finalize(); if (buf.remainingPayloadSize()) { throw Exception(ErrorCodes::LOGICAL_ERROR, "Incomplete payload. Written {} bytes, expected {} bytes.", diff --git a/src/Functions/blockSerializedSize.cpp b/src/Functions/blockSerializedSize.cpp index 35be65f3fed..8cfa3b8a4e3 100644 --- a/src/Functions/blockSerializedSize.cpp +++ b/src/Functions/blockSerializedSize.cpp @@ -60,6 +60,7 @@ public: settings, state); serialization->serializeBinaryBulkStateSuffix(settings, state); + out.finalize(); return out.count(); } }; diff --git a/src/Functions/dateName.cpp b/src/Functions/dateName.cpp index bfb190b9a08..4d7a4f0b53d 100644 --- a/src/Functions/dateName.cpp +++ b/src/Functions/dateName.cpp @@ -170,7 +170,7 @@ public: auto * begin = reinterpret_cast(result_column_data.data()); - WriteBuffer buffer(begin, result_column_data.size()); + WriteBufferFromPointer buffer(begin, result_column_data.size()); using TimeType = DateTypeToTimeType; callOnDatePartWriter(date_part, [&](const auto & writer) @@ -195,6 +195,8 @@ public: result_column_data.resize(buffer.position() - begin); + buffer.finalize(); + return result_column; } diff --git a/src/IO/HashingWriteBuffer.h b/src/IO/HashingWriteBuffer.h index 8edfa45a6be..a201d0dab4d 100644 --- 
a/src/IO/HashingWriteBuffer.h +++ b/src/IO/HashingWriteBuffer.h @@ -85,6 +85,7 @@ public: uint128 getHash() { next(); + chassert(finalized); return IHashingBuffer::getHash(); } }; diff --git a/src/IO/NullWriteBuffer.cpp b/src/IO/NullWriteBuffer.cpp index 035259d48c9..295c53ef7c7 100644 --- a/src/IO/NullWriteBuffer.cpp +++ b/src/IO/NullWriteBuffer.cpp @@ -4,8 +4,8 @@ namespace DB { -NullWriteBuffer::NullWriteBuffer(size_t buf_size, char * existing_memory, size_t alignment) - : BufferWithOwnMemory(buf_size, existing_memory, alignment) +NullWriteBuffer::NullWriteBuffer() + : WriteBuffer(data, sizeof(data)) { } diff --git a/src/IO/NullWriteBuffer.h b/src/IO/NullWriteBuffer.h index 615a9bf5cef..f14c74ff720 100644 --- a/src/IO/NullWriteBuffer.h +++ b/src/IO/NullWriteBuffer.h @@ -8,11 +8,14 @@ namespace DB { /// Simply do nothing, can be used to measure amount of written bytes. -class NullWriteBuffer : public BufferWithOwnMemory, boost::noncopyable +class NullWriteBuffer : public WriteBuffer, boost::noncopyable { public: - explicit NullWriteBuffer(size_t buf_size = 16<<10, char * existing_memory = nullptr, size_t alignment = false); + NullWriteBuffer(); void nextImpl() override; + +private: + char data[128]; }; } diff --git a/src/IO/WriteBuffer.h b/src/IO/WriteBuffer.h index 261a26c93d0..9d74eecaec9 100644 --- a/src/IO/WriteBuffer.h +++ b/src/IO/WriteBuffer.h @@ -33,7 +33,6 @@ class WriteBuffer : public BufferBase public: using BufferBase::set; using BufferBase::position; - WriteBuffer(Position ptr, size_t size) : BufferBase(ptr, size, 0) {} void set(Position ptr, size_t size) { BufferBase::set(ptr, size, 0); } /** write the data in the buffer (from the beginning of the buffer to the current position); @@ -70,7 +69,7 @@ public: virtual ~WriteBuffer() { // That destructor could be call with finalized=false in case of exceptions - if (!finalized) + if (count() > 0 && !finalized) { /// It is totally OK to destroy instance without finalization when an exception occurs /// 
However it is suspicious to destroy instance without finalization at the green path @@ -164,6 +163,8 @@ public: } protected: + WriteBuffer(Position ptr, size_t size) : BufferBase(ptr, size, 0) {} + virtual void finalizeImpl() { next(); @@ -175,11 +176,31 @@ private: /** Write the data in the buffer (from the beginning of the buffer to the current position). * Throw an exception if something is wrong. */ - virtual void nextImpl() { throw Exception(ErrorCodes::CANNOT_WRITE_AFTER_END_OF_BUFFER, "Cannot write after end of buffer."); } + virtual void nextImpl() + { + throw Exception(ErrorCodes::CANNOT_WRITE_AFTER_END_OF_BUFFER, "Cannot write after end of buffer."); + } }; using WriteBufferPtr = std::shared_ptr; +class WriteBufferFromPointer : public WriteBuffer +{ +public: + WriteBufferFromPointer(Position ptr, size_t size) : WriteBuffer(ptr, size) {} + +private: + virtual void finalizeImpl() override + { + /// no op + } + + virtual void sync() override + { + /// no op + } +}; + } diff --git a/src/IO/tests/gtest_DateTime64_parsing_and_writing.cpp b/src/IO/tests/gtest_DateTime64_parsing_and_writing.cpp index 697851b217a..6dd6ea67125 100644 --- a/src/IO/tests/gtest_DateTime64_parsing_and_writing.cpp +++ b/src/IO/tests/gtest_DateTime64_parsing_and_writing.cpp @@ -59,8 +59,9 @@ TEST_P(DateTime64StringWriteTest, WriteText) PaddedPODArray actual_string(param.string.size() * 2, '\0'); // TODO: detect overflows - WriteBuffer write_buffer(actual_string.data(), actual_string.size()); + WriteBufferFromPointer write_buffer(actual_string.data(), actual_string.size()); EXPECT_NO_THROW(writeDateTimeText(param.dt64, param.scale, write_buffer, param.timezone)); + write_buffer.finalize(); EXPECT_STREQ(param.string.data(), actual_string.data()); } diff --git a/src/Processors/Formats/Impl/NullFormat.cpp b/src/Processors/Formats/Impl/NullFormat.cpp index 59514be9abc..4bd2249ac16 100644 --- a/src/Processors/Formats/Impl/NullFormat.cpp +++ b/src/Processors/Formats/Impl/NullFormat.cpp @@ -6,7 
+6,9 @@ namespace DB { -WriteBuffer NullOutputFormat::empty_buffer(nullptr, 0); +WriteBufferFromPointer NullOutputFormat::empty_buffer(nullptr, 0); + +NullOutputFormat::NullOutputFormat(const Block & header) : IOutputFormat(header, empty_buffer) {} void registerOutputFormatNull(FormatFactory & factory) { diff --git a/src/Processors/Formats/Impl/NullFormat.h b/src/Processors/Formats/Impl/NullFormat.h index 7aa9102790f..3362131c4d3 100644 --- a/src/Processors/Formats/Impl/NullFormat.h +++ b/src/Processors/Formats/Impl/NullFormat.h @@ -4,10 +4,12 @@ namespace DB { +class WriteBufferFromPointer; + class NullOutputFormat final : public IOutputFormat { public: - explicit NullOutputFormat(const Block & header) : IOutputFormat(header, empty_buffer) {} + explicit NullOutputFormat(const Block & header); String getName() const override { return "Null"; } @@ -15,7 +17,7 @@ protected: void consume(Chunk) override {} private: - static WriteBuffer empty_buffer; + static WriteBufferFromPointer empty_buffer; }; } diff --git a/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.cpp b/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.cpp index 62ee4e4a48d..212a404825d 100644 --- a/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.cpp +++ b/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.cpp @@ -224,6 +224,7 @@ namespace DB /// Flush all the data to handmade buffer. 
formatter->flush(); + out_buffer.finalize(); unit.actual_memory_size = out_buffer.getActualSize(); { diff --git a/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.h b/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.h index fddcd059be5..490f033b87e 100644 --- a/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.h +++ b/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.h @@ -87,6 +87,7 @@ public: NullWriteBuffer buf; save_totals_and_extremes_in_statistics = internal_formatter_creator(buf)->areTotalsAndExtremesUsedInFinalize(); + buf.finalize(); /// Just heuristic. We need one thread for collecting, one thread for receiving chunks /// and n threads for formatting. diff --git a/src/Processors/Formats/LazyOutputFormat.cpp b/src/Processors/Formats/LazyOutputFormat.cpp index 792d805eac3..4f6b10dd068 100644 --- a/src/Processors/Formats/LazyOutputFormat.cpp +++ b/src/Processors/Formats/LazyOutputFormat.cpp @@ -5,7 +5,7 @@ namespace DB { -WriteBuffer LazyOutputFormat::out(nullptr, 0); +WriteBufferFromPointer LazyOutputFormat::out(nullptr, 0); Chunk LazyOutputFormat::getChunk(UInt64 milliseconds) { diff --git a/src/Processors/Formats/LazyOutputFormat.h b/src/Processors/Formats/LazyOutputFormat.h index b539a8494c7..9cf609ed2d7 100644 --- a/src/Processors/Formats/LazyOutputFormat.h +++ b/src/Processors/Formats/LazyOutputFormat.h @@ -57,7 +57,7 @@ private: Chunk extremes; /// Is not used. 
- static WriteBuffer out; + static WriteBufferFromPointer out; ProfileInfo info; }; diff --git a/src/Processors/Formats/PullingOutputFormat.cpp b/src/Processors/Formats/PullingOutputFormat.cpp index c2036ce37c9..b2378e62d34 100644 --- a/src/Processors/Formats/PullingOutputFormat.cpp +++ b/src/Processors/Formats/PullingOutputFormat.cpp @@ -9,7 +9,12 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } -WriteBuffer PullingOutputFormat::out(nullptr, 0); +WriteBufferFromPointer PullingOutputFormat::out(nullptr, 0); + +PullingOutputFormat::PullingOutputFormat(const Block & header, std::atomic_bool & consume_data_flag_) + : IOutputFormat(header, out) + , has_data_flag(consume_data_flag_) +{} void PullingOutputFormat::consume(Chunk chunk) { diff --git a/src/Processors/Formats/PullingOutputFormat.h b/src/Processors/Formats/PullingOutputFormat.h index a231b7679f3..a8efb8dd962 100644 --- a/src/Processors/Formats/PullingOutputFormat.h +++ b/src/Processors/Formats/PullingOutputFormat.h @@ -5,14 +5,13 @@ namespace DB { +class WriteBufferFromPointer; + /// Output format which is used in PullingPipelineExecutor. class PullingOutputFormat : public IOutputFormat { public: - explicit PullingOutputFormat(const Block & header, std::atomic_bool & consume_data_flag_) - : IOutputFormat(header, out) - , has_data_flag(consume_data_flag_) - {} + PullingOutputFormat(const Block & header, std::atomic_bool & consume_data_flag_); String getName() const override { return "PullingOutputFormat"; } @@ -41,7 +40,7 @@ private: ProfileInfo info; /// Is not used. 
- static WriteBuffer out; + static WriteBufferFromPointer out; }; } diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/src/Storages/MergeTree/IMergeTreeDataPart.cpp index 6a871c4bb5f..eb16fedb014 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp +++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp @@ -138,7 +138,7 @@ IMergeTreeDataPart::MinMaxIndex::WrittenFiles IMergeTreeDataPart::MinMaxIndex::s HashingWriteBuffer out_hashing(*out); serialization->serializeBinary(hyperrectangle[i].left, out_hashing, {}); serialization->serializeBinary(hyperrectangle[i].right, out_hashing, {}); - out_hashing.next(); + out_hashing.finalize(); out_checksums.files[file_name].file_size = out_hashing.count(); out_checksums.files[file_name].file_hash = out_hashing.getHash(); out->preFinalize(); diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp index 0b650eb9f16..5e1da21da5b 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp @@ -276,14 +276,23 @@ void MergeTreeDataPartWriterCompact::fillDataChecksums(IMergeTreeDataPart::Check writeIntBinary(static_cast(0), marks_out); } + for (const auto & [_, stream] : streams_by_codec) + { + stream->hashing_buf.finalize(); + stream->compressed_buf.finalize(); + } + + plain_hashing.finalize(); + plain_file->next(); if (marks_source_hashing) - marks_source_hashing->next(); + marks_source_hashing->finalize(); if (marks_compressor) - marks_compressor->next(); + marks_compressor->finalize(); + + marks_file_hashing->finalize(); - marks_file_hashing->next(); addToChecksums(checksums); plain_file->preFinalize(); @@ -292,14 +301,14 @@ void MergeTreeDataPartWriterCompact::fillDataChecksums(IMergeTreeDataPart::Check void MergeTreeDataPartWriterCompact::finishDataSerialization(bool sync) { - plain_file->finalize(); - marks_file->finalize(); - if (sync) { plain_file->sync(); 
marks_file->sync(); } + + plain_file->finalize(); + marks_file->finalize(); } static void fillIndexGranularityImpl( diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp index b0101bb962c..fc12255a323 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp @@ -13,17 +13,17 @@ namespace ErrorCodes void MergeTreeDataPartWriterOnDisk::Stream::preFinalize() { - compressed_hashing.next(); - compressor.next(); - plain_hashing.next(); + compressed_hashing.finalize(); + compressor.finalize(); + plain_hashing.finalize(); if (compress_marks) { - marks_compressed_hashing.next(); - marks_compressor.next(); + marks_compressed_hashing.finalize(); + marks_compressor.finalize(); } - marks_hashing.next(); + marks_hashing.finalize(); plain_file->preFinalize(); marks_file->preFinalize(); @@ -347,9 +347,9 @@ void MergeTreeDataPartWriterOnDisk::fillPrimaryIndexChecksums(MergeTreeData::Dat } if (compress_primary_key) - index_source_hashing_stream->next(); + index_source_hashing_stream->finalize(); - index_file_hashing_stream->next(); + index_file_hashing_stream->finalize(); String index_name = "primary" + getIndexExtension(compress_primary_key); if (compress_primary_key) diff --git a/src/Storages/MergeTree/MergeTreePartition.cpp b/src/Storages/MergeTree/MergeTreePartition.cpp index 1626018f1c1..62ad658730e 100644 --- a/src/Storages/MergeTree/MergeTreePartition.cpp +++ b/src/Storages/MergeTree/MergeTreePartition.cpp @@ -435,9 +435,11 @@ std::unique_ptr MergeTreePartition::store(const Block & partition_key_sample.getByPosition(i).type->getDefaultSerialization()->serializeBinary(value[i], out_hashing, {}); } - out_hashing.next(); + out_hashing.finalize(); + checksums.files["partition.dat"].file_size = out_hashing.count(); checksums.files["partition.dat"].file_hash = out_hashing.getHash(); + out->preFinalize(); return out; } diff --git 
a/src/Storages/MergeTree/MergedBlockOutputStream.cpp b/src/Storages/MergeTree/MergedBlockOutputStream.cpp index c93ad135835..bfd9e92b4eb 100644 --- a/src/Storages/MergeTree/MergedBlockOutputStream.cpp +++ b/src/Storages/MergeTree/MergedBlockOutputStream.cpp @@ -201,7 +201,7 @@ MergedBlockOutputStream::WrittenFiles MergedBlockOutputStream::finalizePartOnDis auto count_out = new_part->getDataPartStorage().writeFile("count.txt", 4096, write_settings); HashingWriteBuffer count_out_hashing(*count_out); writeIntText(rows_count, count_out_hashing); - count_out_hashing.next(); + count_out_hashing.finalize(); checksums.files["count.txt"].file_size = count_out_hashing.count(); checksums.files["count.txt"].file_hash = count_out_hashing.getHash(); count_out->preFinalize(); @@ -215,6 +215,7 @@ MergedBlockOutputStream::WrittenFiles MergedBlockOutputStream::finalizePartOnDis auto out = new_part->getDataPartStorage().writeFile(IMergeTreeDataPart::UUID_FILE_NAME, 4096, write_settings); HashingWriteBuffer out_hashing(*out); writeUUIDText(new_part->uuid, out_hashing); + out_hashing.finalize(); checksums.files[IMergeTreeDataPart::UUID_FILE_NAME].file_size = out_hashing.count(); checksums.files[IMergeTreeDataPart::UUID_FILE_NAME].file_hash = out_hashing.getHash(); out->preFinalize(); @@ -241,7 +242,7 @@ MergedBlockOutputStream::WrittenFiles MergedBlockOutputStream::finalizePartOnDis auto count_out = new_part->getDataPartStorage().writeFile("count.txt", 4096, write_settings); HashingWriteBuffer count_out_hashing(*count_out); writeIntText(rows_count, count_out_hashing); - count_out_hashing.next(); + count_out_hashing.finalize(); checksums.files["count.txt"].file_size = count_out_hashing.count(); checksums.files["count.txt"].file_hash = count_out_hashing.getHash(); count_out->preFinalize(); @@ -255,6 +256,7 @@ MergedBlockOutputStream::WrittenFiles MergedBlockOutputStream::finalizePartOnDis auto out = new_part->getDataPartStorage().writeFile("ttl.txt", 4096, write_settings); 
HashingWriteBuffer out_hashing(*out); new_part->ttl_infos.write(out_hashing); + out_hashing.finalize(); checksums.files["ttl.txt"].file_size = out_hashing.count(); checksums.files["ttl.txt"].file_hash = out_hashing.getHash(); out->preFinalize(); @@ -266,6 +268,7 @@ MergedBlockOutputStream::WrittenFiles MergedBlockOutputStream::finalizePartOnDis auto out = new_part->getDataPartStorage().writeFile(IMergeTreeDataPart::SERIALIZATION_FILE_NAME, 4096, write_settings); HashingWriteBuffer out_hashing(*out); new_part->getSerializationInfos().writeJSON(out_hashing); + out_hashing.finalize(); checksums.files[IMergeTreeDataPart::SERIALIZATION_FILE_NAME].file_size = out_hashing.count(); checksums.files[IMergeTreeDataPart::SERIALIZATION_FILE_NAME].file_hash = out_hashing.getHash(); out->preFinalize();