Padding for IO buffers.

Testing data

```
select 'aaaaaaaa','bbbbbbbb','cccccccc','dddddddd','eeeeeeee','ffffffff','gggg','hhh' from numbers(3000000) into outfile '/tmp/test.tsv'
```

Testing command
```
echo "select count() from file('/tmp/test.tsv', CSV, 'a String, b String, c String, d String, e String, f String, g String, h String') where not ignore(e)" | clickhouse-benchmark
```

TSV parser has less overhead than CSV, using it would better unveil the benefits of memcpySmall.

Before
```
QPS: 1.662, RPS: 4985463.906, MiB/s: 603.823, result RPS: 1.662, result MiB/s: 0.000.
0.000%  0.559 sec.
10.000% 0.564 sec.
20.000% 0.568 sec.
30.000% 0.572 sec.
40.000% 0.575 sec.
50.000% 0.581 sec.
60.000% 0.592 sec.
70.000% 0.624 sec.
80.000% 0.639 sec.
90.000% 0.664 sec.
95.000% 0.686 sec.
99.000% 0.711 sec.
99.900% 0.715 sec.
99.990% 0.716 sec.
```

After
```
QPS: 1.861, RPS: 5582303.107, MiB/s: 676.110, result RPS: 1.861, result MiB/s: 0.000.
0.000%  0.510 sec.
10.000% 0.514 sec.
20.000% 0.517 sec.
30.000% 0.521 sec.
40.000% 0.523 sec.
50.000% 0.527 sec.
60.000% 0.530 sec.
70.000% 0.539 sec.
80.000% 0.558 sec.
90.000% 0.584 sec.
95.000% 0.589 sec.
99.000% 0.608 sec.
99.900% 0.655 sec.
99.990% 0.663 sec.
```
This commit is contained in:
Amos Bird 2018-08-28 03:14:15 +08:00
parent a5d7097c08
commit 8851fbcab2
4 changed files with 56 additions and 25 deletions

View File

@ -14,6 +14,7 @@
#include <Common/Allocator.h>
#include <Common/Exception.h>
#include <Common/BitHelpers.h>
#include <Common/memcpySmall.h>
namespace DB
@ -288,21 +289,36 @@ public:
/// Do not insert into the array a piece of itself. Because with the resize, the iterators on themselves can be invalidated.
template <typename It1, typename It2, typename ... TAllocatorParams>
void insert(It1 from_begin, It2 from_end, TAllocatorParams &&... allocator_params)
void insertPrepare(It1 from_begin, It2 from_end, TAllocatorParams &&... allocator_params)
{
size_t required_capacity = size() + (from_end - from_begin);
if (required_capacity > capacity())
reserve(roundUpToPowerOfTwoOrZero(required_capacity), std::forward<TAllocatorParams>(allocator_params)...);
}
/// Do not insert into the array a piece of itself. Because with the resize, the iterators on themselves can be invalidated.
template <typename It1, typename It2, typename ... TAllocatorParams>
void insert(It1 from_begin, It2 from_end, TAllocatorParams &&... allocator_params)
{
insertPrepare(from_begin, from_end, std::forward<TAllocatorParams>(allocator_params)...);
insert_assume_reserved(from_begin, from_end);
}
/// Works under assumption, that it's possible to read up to 15 excessive bytes after `from_end` and this PODArray is padded.
template <typename It1, typename It2, typename ... TAllocatorParams>
void insertSmallAllowReadWriteOverflow15(It1 from_begin, It2 from_end, TAllocatorParams &&... allocator_params)
{
static_assert(pad_right_ >= 15);
insertPrepare(from_begin, from_end, std::forward<TAllocatorParams>(allocator_params)...);
size_t bytes_to_copy = byte_size(from_end - from_begin);
memcpySmallAllowReadWriteOverflow15(c_end, reinterpret_cast<const void *>(&*from_begin), bytes_to_copy);
c_end += bytes_to_copy;
}
template <typename It1, typename It2>
void insert(iterator it, It1 from_begin, It2 from_end)
{
size_t required_capacity = size() + (from_end - from_begin);
if (required_capacity > capacity())
reserve(roundUpToPowerOfTwoOrZero(required_capacity));
insertPrepare(from_begin, from_end);
size_t bytes_to_copy = byte_size(from_end - from_begin);
size_t bytes_to_move = (end() - it) * sizeof(T);

View File

@ -99,6 +99,11 @@ public:
return pos != working_buffer.end();
}
bool isPadded() const
{
return padded;
}
protected:
/// Read/write position.
Position pos;
@ -117,6 +122,9 @@ protected:
/// A reference to a piece of memory for the buffer.
Buffer internal_buffer;
/// Indicator of 15 bytes pad_right
bool padded{false};
};

View File

@ -26,6 +26,7 @@ namespace DB
*/
struct Memory : boost::noncopyable, Allocator<false>
{
static constexpr size_t pad_right = 15;
size_t m_capacity = 0;
size_t m_size = 0;
char * m_data = nullptr;
@ -72,17 +73,17 @@ struct Memory : boost::noncopyable, Allocator<false>
m_size = m_capacity = new_size;
alloc();
}
else if (new_size <= m_capacity)
else if (new_size <= m_size)
{
m_size = new_size;
return;
}
else
{
new_size = align(new_size, alignment);
m_data = static_cast<char *>(Allocator::realloc(m_data, m_capacity, new_size, alignment));
m_capacity = new_size;
m_size = m_capacity;
size_t new_capacity = align(new_size + pad_right, alignment);
m_data = static_cast<char *>(Allocator::realloc(m_data, m_capacity, new_capacity, alignment));
m_capacity = new_capacity;
m_size = m_capacity - pad_right;
}
}
@ -103,13 +104,15 @@ private:
return;
}
ProfileEvents::increment(ProfileEvents::IOBufferAllocs);
ProfileEvents::increment(ProfileEvents::IOBufferAllocBytes, m_capacity);
size_t padded_capacity = m_capacity + pad_right;
size_t new_capacity = align(m_capacity, alignment);
ProfileEvents::increment(ProfileEvents::IOBufferAllocs);
ProfileEvents::increment(ProfileEvents::IOBufferAllocBytes, padded_capacity);
size_t new_capacity = align(padded_capacity, alignment);
m_data = static_cast<char *>(Allocator::alloc(new_capacity, alignment));
m_capacity = new_capacity;
m_size = m_capacity;
m_size = m_capacity - pad_right;
}
void dealloc()
@ -137,6 +140,7 @@ public:
: Base(nullptr, 0), memory(existing_memory ? 0 : size, alignment)
{
Base::set(existing_memory ? existing_memory : memory.data(), size);
Base::padded = !existing_memory;
}
};

View File

@ -9,6 +9,7 @@
#include <IO/Operators.h>
#include <common/find_first_symbols.h>
#include <stdlib.h>
#include <Common/memcpySmall.h>
#if __SSE2__
#include <emmintrin.h>
@ -162,18 +163,20 @@ bool checkStringByFirstCharacterAndAssertTheRestCaseInsensitive(const char * s,
template <typename T>
static void appendToStringOrVector(T & s, const char * begin, const char * end)
static void appendToStringOrVector(T & s, ReadBuffer & rb, const char * end)
{
s.append(begin, end - begin);
s.append(rb.position(), end - rb.position());
}
template <>
inline void appendToStringOrVector(PaddedPODArray<UInt8> & s, const char * begin, const char * end)
inline void appendToStringOrVector(PaddedPODArray<UInt8> & s, ReadBuffer & rb, const char * end)
{
s.insert(begin, end); /// TODO memcpySmall
if (rb.isPadded())
s.insertSmallAllowReadWriteOverflow15(rb.position(), end);
else
s.insert(rb.position(), end);
}
template <typename Vector>
void readStringInto(Vector & s, ReadBuffer & buf)
{
@ -181,7 +184,7 @@ void readStringInto(Vector & s, ReadBuffer & buf)
{
char * next_pos = find_first_symbols<'\t', '\n'>(buf.position(), buf.buffer().end());
appendToStringOrVector(s, buf.position(), next_pos);
appendToStringOrVector(s, buf, next_pos);
buf.position() = next_pos;
if (buf.hasPendingData())
@ -203,7 +206,7 @@ void readStringUntilEOFInto(Vector & s, ReadBuffer & buf)
{
while (!buf.eof())
{
appendToStringOrVector(s, buf.position(), buf.buffer().end());
appendToStringOrVector(s, buf, buf.buffer().end());
buf.position() = buf.buffer().end();
if (buf.hasPendingData())
@ -374,7 +377,7 @@ void readEscapedStringInto(Vector & s, ReadBuffer & buf)
{
char * next_pos = find_first_symbols<'\t', '\n', '\\'>(buf.position(), buf.buffer().end());
appendToStringOrVector(s, buf.position(), next_pos);
appendToStringOrVector(s, buf, next_pos);
buf.position() = next_pos;
if (!buf.hasPendingData())
@ -416,7 +419,7 @@ static void readAnyQuotedStringInto(Vector & s, ReadBuffer & buf)
{
char * next_pos = find_first_symbols<'\\', quote>(buf.position(), buf.buffer().end());
appendToStringOrVector(s, buf.position(), next_pos);
appendToStringOrVector(s, buf, next_pos);
buf.position() = next_pos;
if (!buf.hasPendingData())
@ -529,7 +532,7 @@ void readCSVStringInto(Vector & s, ReadBuffer & buf, const FormatSettings::CSV &
if (nullptr == next_pos)
next_pos = buf.buffer().end();
appendToStringOrVector(s, buf.position(), next_pos);
appendToStringOrVector(s, buf, next_pos);
buf.position() = next_pos;
if (!buf.hasPendingData())
@ -580,7 +583,7 @@ void readCSVStringInto(Vector & s, ReadBuffer & buf, const FormatSettings::CSV &
}();
appendToStringOrVector(s, buf.position(), next_pos);
appendToStringOrVector(s, buf, next_pos);
buf.position() = next_pos;
if (!buf.hasPendingData())
@ -630,7 +633,7 @@ ReturnType readJSONStringInto(Vector & s, ReadBuffer & buf)
{
char * next_pos = find_first_symbols<'\\', '"'>(buf.position(), buf.buffer().end());
appendToStringOrVector(s, buf.position(), next_pos);
appendToStringOrVector(s, buf, next_pos);
buf.position() = next_pos;
if (!buf.hasPendingData())