mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-17 21:24:28 +00:00
tabs -> spaces
This commit is contained in:
parent
22f4852830
commit
7d822d8081
@ -221,39 +221,38 @@ bool CapnProtoRowInputStream::read(MutableColumns & columns, RowReadExtension &)
|
|||||||
kj::ArrayPtr<const capnp::word> array;
|
kj::ArrayPtr<const capnp::word> array;
|
||||||
|
|
||||||
size_t bytesSize = buf.size() - istr.offset();
|
size_t bytesSize = buf.size() - istr.offset();
|
||||||
size_t remainingBytes = 0;
|
size_t remainingBytes = 0;
|
||||||
if (bytesSize < sizeof(capnp::word)) // case when we read less than 8 bytes (capnp::word)
|
if (bytesSize < sizeof(capnp::word)) // case when we read less than 8 bytes (capnp::word)
|
||||||
{
|
{
|
||||||
char edgeBytes[sizeof(capnp::word)];
|
char edgeBytes[sizeof(capnp::word)];
|
||||||
while (bytesSize + remainingBytes < sizeof(capnp::word))
|
while (bytesSize + remainingBytes < sizeof(capnp::word))
|
||||||
{
|
{
|
||||||
istr.readStrict(edgeBytes + remainingBytes, bytesSize);
|
istr.readStrict(edgeBytes + remainingBytes, bytesSize);
|
||||||
remainingBytes += bytesSize;
|
remainingBytes += bytesSize;
|
||||||
istr.next();
|
istr.next();
|
||||||
bytesSize = buf.size();
|
bytesSize = buf.size();
|
||||||
}
|
}
|
||||||
|
|
||||||
auto wordsSize = bytesSize / sizeof(capnp::word) + 1;
|
auto wordsSize = bytesSize / sizeof(capnp::word) + 1;
|
||||||
heap_array = kj::heapArray<capnp::word>(wordsSize + 1);
|
heap_array = kj::heapArray<capnp::word>(wordsSize + 1);
|
||||||
auto chars_heap_array = heap_array.asChars();
|
auto chars_heap_array = heap_array.asChars();
|
||||||
::memcpy(chars_heap_array.begin(), edgeBytes, remainingBytes);
|
::memcpy(chars_heap_array.begin(), edgeBytes, remainingBytes);
|
||||||
::memcpy(chars_heap_array.begin() + remainingBytes, buf.begin(), buf.size());
|
::memcpy(chars_heap_array.begin() + remainingBytes, buf.begin(), buf.size());
|
||||||
array = heap_array.asPtr();
|
array = heap_array.asPtr();
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
auto wordsSize = bytesSize / sizeof(capnp::word);
|
auto wordsSize = bytesSize / sizeof(capnp::word);
|
||||||
|
array = kj::arrayPtr(base, wordsSize);
|
||||||
array = kj::arrayPtr(base, wordsSize);
|
auto expected_words = capnp::expectedSizeInWordsFromPrefix(array);
|
||||||
auto expected_words = capnp::expectedSizeInWordsFromPrefix(array);
|
if (expected_words * sizeof(capnp::word) > array.size())
|
||||||
if (expected_words * sizeof(capnp::word) > array.size())
|
{
|
||||||
{
|
// We'll need to reassemble the message in a contiguous buffer
|
||||||
// We'll need to reassemble the message in a contiguous buffer
|
heap_array = kj::heapArray<capnp::word>(expected_words);
|
||||||
heap_array = kj::heapArray<capnp::word>(expected_words);
|
istr.readStrict(heap_array.asChars().begin(), heap_array.asChars().size());
|
||||||
istr.readStrict(heap_array.asChars().begin(), heap_array.asChars().size());
|
array = heap_array.asPtr();
|
||||||
array = heap_array.asPtr();
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
#if CAPNP_VERSION >= 8000
|
#if CAPNP_VERSION >= 8000
|
||||||
capnp::UnalignedFlatArrayMessageReader msg(array);
|
capnp::UnalignedFlatArrayMessageReader msg(array);
|
||||||
@ -316,8 +315,8 @@ bool CapnProtoRowInputStream::read(MutableColumns & columns, RowReadExtension &)
|
|||||||
// Advance buffer position if used with remaining bytes from previous buffer
|
// Advance buffer position if used with remaining bytes from previous buffer
|
||||||
else if (remainingBytes != 0)
|
else if (remainingBytes != 0)
|
||||||
{
|
{
|
||||||
auto parsed = (msg.getEnd() - heap_array.begin()) * sizeof(capnp::word) - remainingBytes;
|
auto parsed = (msg.getEnd() - heap_array.begin()) * sizeof(capnp::word) - remainingBytes;
|
||||||
istr.position() += parsed;
|
istr.position() += parsed;
|
||||||
}
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
|
Loading…
Reference in New Issue
Block a user