This commit is contained in:
Vladislav Smirnov 2019-03-13 18:28:29 +03:00
parent a916b62f33
commit 59d473e215

View File

@ -220,30 +220,30 @@ bool CapnProtoRowInputStream::read(MutableColumns & columns, RowReadExtension &)
kj::Array<capnp::word> heap_array;
kj::ArrayPtr<const capnp::word> array;
size_t bytesSize = buf.size() - istr.offset();
size_t remainingBytes = 0;
if (bytesSize < sizeof(capnp::word)) // case when we read less than 8 bytes (capnp::word)
size_t bytes_size = buf.size() - istr.offset();
size_t remaining_bytes = 0;
if (bytes_size < sizeof(capnp::word)) // case when we read less than 8 bytes (capnp::word)
{
char edgeBytes[sizeof(capnp::word)];
while (bytesSize + remainingBytes < sizeof(capnp::word))
char edge_bytes[sizeof(capnp::word)];
while (bytes_size + remaining_bytes < sizeof(capnp::word))
{
istr.readStrict(edgeBytes + remainingBytes, bytesSize);
remainingBytes += bytesSize;
istr.readStrict(edge_bytes + remaining_bytes, bytes_size);
remaining_bytes += bytes_size;
istr.next();
bytesSize = buf.size();
bytes_size = buf.size();
}
auto wordsSize = bytesSize / sizeof(capnp::word) + 1;
heap_array = kj::heapArray<capnp::word>(wordsSize + 1);
auto words_size = bytes_size / sizeof(capnp::word) + 1;
heap_array = kj::heapArray<capnp::word>(words_size + 1);
auto chars_heap_array = heap_array.asChars();
::memcpy(chars_heap_array.begin(), edgeBytes, remainingBytes);
::memcpy(chars_heap_array.begin() + remainingBytes, buf.begin(), buf.size());
::memcpy(chars_heap_array.begin(), edge_bytes, remaining_bytes);
::memcpy(chars_heap_array.begin() + remaining_bytes, buf.begin(), buf.size());
array = heap_array.asPtr();
}
else
{
auto wordsSize = bytesSize / sizeof(capnp::word);
array = kj::arrayPtr(base, wordsSize);
auto words_size = bytes_size / sizeof(capnp::word);
array = kj::arrayPtr(base, words_size);
auto expected_words = capnp::expectedSizeInWordsFromPrefix(array);
if (expected_words * sizeof(capnp::word) > array.size())
{
@ -313,9 +313,9 @@ bool CapnProtoRowInputStream::read(MutableColumns & columns, RowReadExtension &)
istr.ignore(parsed);
}
// Advance buffer position if used with remaining bytes from previous buffer
else if (remainingBytes != 0)
else if (remaining_bytes != 0)
{
auto parsed = (msg.getEnd() - heap_array.begin()) * sizeof(capnp::word) - remainingBytes;
auto parsed = (msg.getEnd() - heap_array.begin()) * sizeof(capnp::word) - remaining_bytes;
istr.ignore(parsed);
}