diff --git a/dbms/src/Formats/CapnProtoRowInputStream.cpp b/dbms/src/Formats/CapnProtoRowInputStream.cpp index d0bea371d10..c84442e83f5 100644 --- a/dbms/src/Formats/CapnProtoRowInputStream.cpp +++ b/dbms/src/Formats/CapnProtoRowInputStream.cpp @@ -221,39 +221,38 @@ bool CapnProtoRowInputStream::read(MutableColumns & columns, RowReadExtension &) kj::ArrayPtr array; size_t bytesSize = buf.size() - istr.offset(); - size_t remainingBytes = 0; - if (bytesSize < sizeof(capnp::word)) // case when we read less than 8 bytes (capnp::word) - { - char edgeBytes[sizeof(capnp::word)]; - while (bytesSize + remainingBytes < sizeof(capnp::word)) - { - istr.readStrict(edgeBytes + remainingBytes, bytesSize); - remainingBytes += bytesSize; - istr.next(); - bytesSize = buf.size(); - } + size_t remainingBytes = 0; + if (bytesSize < sizeof(capnp::word)) // case when we read less than 8 bytes (capnp::word) + { + char edgeBytes[sizeof(capnp::word)]; + while (bytesSize + remainingBytes < sizeof(capnp::word)) + { + istr.readStrict(edgeBytes + remainingBytes, bytesSize); + remainingBytes += bytesSize; + istr.next(); + bytesSize = buf.size(); + } - auto wordsSize = bytesSize / sizeof(capnp::word) + 1; - heap_array = kj::heapArray(wordsSize + 1); - auto chars_heap_array = heap_array.asChars(); - ::memcpy(chars_heap_array.begin(), edgeBytes, remainingBytes); - ::memcpy(chars_heap_array.begin() + remainingBytes, buf.begin(), buf.size()); - array = heap_array.asPtr(); - } - else - { - auto wordsSize = bytesSize / sizeof(capnp::word); - - array = kj::arrayPtr(base, wordsSize); - auto expected_words = capnp::expectedSizeInWordsFromPrefix(array); - if (expected_words * sizeof(capnp::word) > array.size()) - { - // We'll need to reassemble the message in a contiguous buffer - heap_array = kj::heapArray(expected_words); - istr.readStrict(heap_array.asChars().begin(), heap_array.asChars().size()); - array = heap_array.asPtr(); - } - } + auto wordsSize = bytesSize / sizeof(capnp::word) + 1; + heap_array = kj::heapArray(wordsSize + 1); + auto chars_heap_array = heap_array.asChars(); + ::memcpy(chars_heap_array.begin(), edgeBytes, remainingBytes); + ::memcpy(chars_heap_array.begin() + remainingBytes, buf.begin(), buf.size()); + array = heap_array.asPtr(); + } + else + { + auto wordsSize = bytesSize / sizeof(capnp::word); + array = kj::arrayPtr(base, wordsSize); + auto expected_words = capnp::expectedSizeInWordsFromPrefix(array); + if (expected_words * sizeof(capnp::word) > array.size()) + { + // We'll need to reassemble the message in a contiguous buffer + heap_array = kj::heapArray(expected_words); + istr.readStrict(heap_array.asChars().begin(), heap_array.asChars().size()); + array = heap_array.asPtr(); + } + } #if CAPNP_VERSION >= 8000 capnp::UnalignedFlatArrayMessageReader msg(array); @@ -316,8 +315,8 @@ bool CapnProtoRowInputStream::read(MutableColumns & columns, RowReadExtension &) // Advance buffer position if used with remaining bytes from previous buffer else if (remainingBytes != 0) { - auto parsed = (msg.getEnd() - heap_array.begin()) * sizeof(capnp::word) - remainingBytes; - istr.position() += parsed; + auto parsed = (msg.getEnd() - heap_array.begin()) * sizeof(capnp::word) - remainingBytes; + istr.position() += parsed; } return true;