Merge pull request #57074 from ClickHouse/less-allocation-in-arenas

Avoid excessive allocation in Arena
This commit is contained in:
Alexey Milovidov 2023-11-23 23:02:26 +01:00 committed by GitHub
commit d72bc854d1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 250 additions and 167 deletions

View File

@ -11,7 +11,6 @@
#include <Common/memcmpSmall.h>
#include <Common/assert_cast.h>
#include <Core/Field.h>
#include <Common/Arena.h>
class Collator;
@ -20,6 +19,8 @@ class Collator;
namespace DB
{
class Arena;
/** Column for String values.
*/
class ColumnString final : public COWHelper<IColumn, ColumnString>

View File

@ -0,0 +1,137 @@
#include <Common/Arena.h>
#include <Columns/IColumnDummy.h>
#include <Columns/ColumnsCommon.h>
namespace DB
{
namespace ErrorCodes
{
extern const int SIZES_OF_COLUMNS_DOESNT_MATCH;
extern const int NOT_IMPLEMENTED;
}
Field IColumnDummy::operator[](size_t) const
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Cannot get value from {}", getName());
}
void IColumnDummy::get(size_t, Field &) const
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Cannot get value from {}", getName());
}
void IColumnDummy::insert(const Field &)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Cannot insert element into {}", getName());
}
bool IColumnDummy::isDefaultAt(size_t) const
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "isDefaultAt is not implemented for {}", getName());
}
StringRef IColumnDummy::serializeValueIntoArena(size_t /*n*/, Arena & arena, char const *& begin, const UInt8 *) const
{
/// Has to put one useless byte into Arena, because serialization into zero number of bytes is ambiguous.
char * res = arena.allocContinue(1, begin);
*res = 0;
return { res, 1 };
}
const char * IColumnDummy::deserializeAndInsertFromArena(const char * pos)
{
++s;
return pos + 1;
}
const char * IColumnDummy::skipSerializedInArena(const char * pos) const
{
return pos;
}
ColumnPtr IColumnDummy::filter(const Filter & filt, ssize_t /*result_size_hint*/) const
{
size_t bytes = countBytesInFilter(filt);
return cloneDummy(bytes);
}
void IColumnDummy::expand(const IColumn::Filter & mask, bool inverted)
{
size_t bytes = countBytesInFilter(mask);
if (inverted)
bytes = mask.size() - bytes;
s = bytes;
}
ColumnPtr IColumnDummy::permute(const Permutation & perm, size_t limit) const
{
if (s != perm.size())
throw Exception(ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH, "Size of permutation doesn't match size of column.");
return cloneDummy(limit ? std::min(s, limit) : s);
}
ColumnPtr IColumnDummy::index(const IColumn & indexes, size_t limit) const
{
if (indexes.size() < limit)
throw Exception(ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH, "Size of indexes is less than required.");
return cloneDummy(limit ? limit : s);
}
void IColumnDummy::getPermutation(IColumn::PermutationSortDirection /*direction*/, IColumn::PermutationSortStability /*stability*/,
size_t /*limit*/, int /*nan_direction_hint*/, Permutation & res) const
{
res.resize(s);
for (size_t i = 0; i < s; ++i)
res[i] = i;
}
ColumnPtr IColumnDummy::replicate(const Offsets & offsets) const
{
if (s != offsets.size())
throw Exception(ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH, "Size of offsets doesn't match size of column.");
return cloneDummy(offsets.back());
}
MutableColumns IColumnDummy::scatter(ColumnIndex num_columns, const Selector & selector) const
{
if (s != selector.size())
throw Exception(ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH, "Size of selector doesn't match size of column.");
std::vector<size_t> counts(num_columns);
for (auto idx : selector)
++counts[idx];
MutableColumns res(num_columns);
for (size_t i = 0; i < num_columns; ++i)
res[i] = cloneResized(counts[i]);
return res;
}
double IColumnDummy::getRatioOfDefaultRows(double) const
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method getRatioOfDefaultRows is not supported for {}", getName());
}
UInt64 IColumnDummy::getNumberOfDefaultRows() const
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method getNumberOfDefaultRows is not supported for {}", getName());
}
void IColumnDummy::getIndicesOfNonDefaultRows(Offsets &, size_t, size_t) const
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method getIndicesOfNonDefaultRows is not supported for {}", getName());
}
void IColumnDummy::gather(ColumnGathererStream &)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method gather is not supported for {}", getName());
}
}

View File

@ -1,21 +1,12 @@
#pragma once
#include <Common/Arena.h>
#include <Common/PODArray.h>
#include <Columns/IColumn.h>
#include <Columns/ColumnsCommon.h>
#include <Core/Field.h>
namespace DB
{
namespace ErrorCodes
{
extern const int SIZES_OF_COLUMNS_DOESNT_MATCH;
extern const int NOT_IMPLEMENTED;
}
class Arena;
/** Base class for columns-constants that contain a value that is not in the `Field`.
* Not a full-fledged column and is used in a special way.
@ -42,10 +33,10 @@ public:
bool hasEqualValues() const override { return true; }
Field operator[](size_t) const override { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Cannot get value from {}", getName()); }
void get(size_t, Field &) const override { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Cannot get value from {}", getName()); }
void insert(const Field &) override { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Cannot insert element into {}", getName()); }
bool isDefaultAt(size_t) const override { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "isDefaultAt is not implemented for {}", getName()); }
Field operator[](size_t) const override;
void get(size_t, Field &) const override;
void insert(const Field &) override;
bool isDefaultAt(size_t) const override;
StringRef getDataAt(size_t) const override
{
@ -57,24 +48,9 @@ public:
++s;
}
StringRef serializeValueIntoArena(size_t /*n*/, Arena & arena, char const *& begin, const UInt8 *) const override
{
/// Has to put one useless byte into Arena, because serialization into zero number of bytes is ambiguous.
char * res = arena.allocContinue(1, begin);
*res = 0;
return { res, 1 };
}
const char * deserializeAndInsertFromArena(const char * pos) override
{
++s;
return pos + 1;
}
const char * skipSerializedInArena(const char * pos) const override
{
return pos;
}
StringRef serializeValueIntoArena(size_t /*n*/, Arena & arena, char const *& begin, const UInt8 *) const override;
const char * deserializeAndInsertFromArena(const char * pos) override;
const char * skipSerializedInArena(const char * pos) const override;
void updateHashWithValue(size_t /*n*/, SipHash & /*hash*/) const override
{
@ -98,90 +74,30 @@ public:
s += length;
}
ColumnPtr filter(const Filter & filt, ssize_t /*result_size_hint*/) const override
{
size_t bytes = countBytesInFilter(filt);
return cloneDummy(bytes);
}
ColumnPtr filter(const Filter & filt, ssize_t /*result_size_hint*/) const override;
void expand(const IColumn::Filter & mask, bool inverted) override
{
size_t bytes = countBytesInFilter(mask);
if (inverted)
bytes = mask.size() - bytes;
s = bytes;
}
void expand(const IColumn::Filter & mask, bool inverted) override;
ColumnPtr permute(const Permutation & perm, size_t limit) const override
{
if (s != perm.size())
throw Exception(ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH, "Size of permutation doesn't match size of column.");
ColumnPtr permute(const Permutation & perm, size_t limit) const override;
return cloneDummy(limit ? std::min(s, limit) : s);
}
ColumnPtr index(const IColumn & indexes, size_t limit) const override
{
if (indexes.size() < limit)
throw Exception(ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH, "Size of indexes is less than required.");
return cloneDummy(limit ? limit : s);
}
ColumnPtr index(const IColumn & indexes, size_t limit) const override;
void getPermutation(IColumn::PermutationSortDirection /*direction*/, IColumn::PermutationSortStability /*stability*/,
size_t /*limit*/, int /*nan_direction_hint*/, Permutation & res) const override
{
res.resize(s);
for (size_t i = 0; i < s; ++i)
res[i] = i;
}
size_t /*limit*/, int /*nan_direction_hint*/, Permutation & res) const override;
void updatePermutation(IColumn::PermutationSortDirection /*direction*/, IColumn::PermutationSortStability /*stability*/,
size_t, int, Permutation &, EqualRanges&) const override {}
ColumnPtr replicate(const Offsets & offsets) const override
size_t, int, Permutation &, EqualRanges&) const override
{
if (s != offsets.size())
throw Exception(ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH, "Size of offsets doesn't match size of column.");
return cloneDummy(offsets.back());
}
MutableColumns scatter(ColumnIndex num_columns, const Selector & selector) const override
{
if (s != selector.size())
throw Exception(ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH, "Size of selector doesn't match size of column.");
ColumnPtr replicate(const Offsets & offsets) const override;
std::vector<size_t> counts(num_columns);
for (auto idx : selector)
++counts[idx];
MutableColumns scatter(ColumnIndex num_columns, const Selector & selector) const override;
MutableColumns res(num_columns);
for (size_t i = 0; i < num_columns; ++i)
res[i] = cloneResized(counts[i]);
return res;
}
double getRatioOfDefaultRows(double) const override
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method getRatioOfDefaultRows is not supported for {}", getName());
}
UInt64 getNumberOfDefaultRows() const override
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method getNumberOfDefaultRows is not supported for {}", getName());
}
void getIndicesOfNonDefaultRows(Offsets &, size_t, size_t) const override
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method getIndicesOfNonDefaultRows is not supported for {}", getName());
}
void gather(ColumnGathererStream &) override
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method gather is not supported for {}", getName());
}
double getRatioOfDefaultRows(double) const override;
UInt64 getNumberOfDefaultRows() const override;
void getIndicesOfNonDefaultRows(Offsets &, size_t, size_t) const override;
void gather(ColumnGathererStream &) override;
void getExtremes(Field &, Field &) const override
{

View File

@ -5,13 +5,14 @@
#include <vector>
#include <boost/noncopyable.hpp>
#include <Core/Defines.h>
#if __has_include(<sanitizer/asan_interface.h>) && defined(ADDRESS_SANITIZER)
# include <sanitizer/asan_interface.h>
#endif
#include <Common/memcpySmall.h>
#include <Common/ProfileEvents.h>
#include <Common/Allocator.h>
#if __has_include(<sanitizer/asan_interface.h>) && defined(ADDRESS_SANITIZER)
# include <sanitizer/asan_interface.h>
#endif
namespace ProfileEvents
{
@ -39,13 +40,36 @@ private:
/// Contiguous MemoryChunk of memory and pointer to free space inside it. Member of single-linked list.
struct alignas(16) MemoryChunk : private Allocator<false> /// empty base optimization
{
char * begin;
char * pos;
char * end; /// does not include padding.
char * begin = nullptr;
char * pos = nullptr;
char * end = nullptr; /// does not include padding.
MemoryChunk * prev;
std::unique_ptr<MemoryChunk> prev;
MemoryChunk(size_t size_, MemoryChunk * prev_)
MemoryChunk()
{
}
void swap(MemoryChunk & other)
{
std::swap(begin, other.begin);
std::swap(pos, other.pos);
std::swap(end, other.end);
prev.swap(other.prev);
}
MemoryChunk(MemoryChunk && other)
{
*this = std::move(other);
}
MemoryChunk & operator=(MemoryChunk && other)
{
swap(other);
return *this;
}
MemoryChunk(size_t size_)
{
ProfileEvents::increment(ProfileEvents::ArenaAllocChunks);
ProfileEvents::increment(ProfileEvents::ArenaAllocBytes, size_);
@ -53,13 +77,15 @@ private:
begin = reinterpret_cast<char *>(Allocator<false>::alloc(size_));
pos = begin;
end = begin + size_ - pad_right;
prev = prev_;
ASAN_POISON_MEMORY_REGION(begin, size_);
}
~MemoryChunk()
{
if (empty())
return;
/// We must unpoison the memory before returning to the allocator,
/// because the allocator might not have asan integration, and the
/// memory would stay poisoned forever. If the allocator supports
@ -67,21 +93,21 @@ private:
ASAN_UNPOISON_MEMORY_REGION(begin, size());
Allocator<false>::free(begin, size());
delete prev;
}
bool empty() const { return begin == end;}
size_t size() const { return end + pad_right - begin; }
size_t remaining() const { return end - pos; }
};
size_t initial_size;
size_t growth_factor;
size_t linear_growth_threshold;
/// Last contiguous MemoryChunk of memory.
MemoryChunk * head;
size_t allocated_bytes;
size_t used_bytes;
MemoryChunk head;
size_t allocated_bytes = 0;
size_t used_bytes = 0;
size_t page_size;
static size_t roundUpToPageSize(size_t s, size_t page_size)
@ -95,9 +121,13 @@ private:
{
size_t size_after_grow = 0;
if (head->size() < linear_growth_threshold)
if (head.empty())
{
size_after_grow = std::max(min_next_size, head->size() * growth_factor);
size_after_grow = std::max(min_next_size, initial_size);
}
else if (head.size() < linear_growth_threshold)
{
size_after_grow = std::max(min_next_size, head.size() * growth_factor);
}
else
{
@ -119,8 +149,18 @@ private:
/// Add next contiguous MemoryChunk of memory with size not less than specified.
void NO_INLINE addMemoryChunk(size_t min_size)
{
head = new MemoryChunk(nextSize(min_size + pad_right), head);
allocated_bytes += head->size();
size_t next_size = nextSize(min_size + pad_right);
if (head.empty())
{
head = MemoryChunk(next_size);
}
else
{
auto chunk = std::make_unique<MemoryChunk>(next_size);
head.swap(*chunk);
head.prev = std::move(chunk);
}
allocated_bytes += head.size();
}
friend class ArenaAllocator;
@ -128,29 +168,23 @@ private:
public:
explicit Arena(size_t initial_size_ = 4096, size_t growth_factor_ = 2, size_t linear_growth_threshold_ = 128 * 1024 * 1024)
: growth_factor(growth_factor_)
: initial_size(initial_size_)
, growth_factor(growth_factor_)
, linear_growth_threshold(linear_growth_threshold_)
, head(new MemoryChunk(initial_size_, nullptr))
, allocated_bytes(head->size())
, used_bytes(0)
, page_size(static_cast<size_t>(::getPageSize()))
{
}
~Arena()
{
delete head;
}
/// Get piece of memory, without alignment.
/// Note: we expect it will return a non-nullptr even if the size is zero.
char * alloc(size_t size)
{
used_bytes += size;
if (unlikely(static_cast<std::ptrdiff_t>(size) > head->end - head->pos))
if (unlikely(head.empty() || static_cast<std::ptrdiff_t>(size) > head.end - head.pos))
addMemoryChunk(size);
char * res = head->pos;
head->pos += size;
char * res = head.pos;
head.pos += size;
ASAN_UNPOISON_MEMORY_REGION(res, size + pad_right);
return res;
}
@ -161,14 +195,14 @@ public:
used_bytes += size;
do
{
void * head_pos = head->pos;
size_t space = head->end - head->pos;
void * head_pos = head.pos;
size_t space = head.end - head.pos;
auto * res = static_cast<char *>(std::align(alignment, size, head_pos, space));
if (res)
{
head->pos = static_cast<char *>(head_pos);
head->pos += size;
head.pos = static_cast<char *>(head_pos);
head.pos += size;
ASAN_UNPOISON_MEMORY_REGION(res, size + pad_right);
return res;
}
@ -191,9 +225,9 @@ public:
void * rollback(size_t size)
{
used_bytes -= size;
head->pos -= size;
ASAN_POISON_MEMORY_REGION(head->pos, size + pad_right);
return head->pos;
head.pos -= size;
ASAN_POISON_MEMORY_REGION(head.pos, size + pad_right);
return head.pos;
}
/** Begin or expand a contiguous range of memory.
@ -234,10 +268,10 @@ public:
// This method only works for extending the last allocation. For lack of
// original size, check a weaker condition: that 'begin' is at least in
// the current MemoryChunk.
assert(range_start >= head->begin);
assert(range_start < head->end);
assert(range_start >= head.begin);
assert(range_start < head.end);
if (head->pos + additional_bytes <= head->end)
if (head.pos + additional_bytes <= head.end)
{
// The new size fits into the last MemoryChunk, so just alloc the
// additional size. We can alloc without alignment here, because it
@ -254,7 +288,7 @@ public:
// solved not by complicating this method, but by rethinking the
// approach to memory management for aggregate function states, so that
// we can provide a proper realloc().
const size_t existing_bytes = head->pos - range_start;
const size_t existing_bytes = head.pos - range_start;
const size_t new_bytes = existing_bytes + additional_bytes;
const char * old_range = range_start;
@ -317,12 +351,11 @@ public:
/// yourself having to use this method, probably you're doing something wrong.
size_t remainingSpaceInCurrentMemoryChunk() const
{
return head->remaining();
return head.remaining();
}
};
using ArenaPtr = std::shared_ptr<Arena>;
using Arenas = std::vector<ArenaPtr>;
}

View File

@ -20,7 +20,7 @@ public:
char const * data = reinterpret_cast<char *>(buf);
// Invariant should be maintained: new_size > old_size
if (data + old_size == arena->head->pos)
if (data + old_size == arena->head.pos)
{
// Consecutive optimization
arena->allocContinue(new_size - old_size, data);
@ -59,7 +59,7 @@ public:
{
char const * data = reinterpret_cast<char *>(buf);
if (data + old_size == arena->head->pos)
if (data + old_size == arena->head.pos)
{
arena->allocContinue(new_size - old_size, data, alignment);
return reinterpret_cast<void *>(const_cast<char *>(data));

View File

@ -11,11 +11,9 @@
#include <DataTypes/DataTypeAggregateFunction.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnTuple.h>
#include <Columns/ColumnSparse.h>
#include <Formats/NativeWriter.h>
#include <IO/WriteBufferFromFile.h>
#include <Compression/CompressedWriteBuffer.h>
#include <Interpreters/Aggregator.h>
#include <AggregateFunctions/Combinators/AggregateFunctionArray.h>
@ -23,7 +21,6 @@
#include <IO/Operators.h>
#include <Interpreters/JIT/compileFunction.h>
#include <Interpreters/JIT/CompiledExpressionCache.h>
#include <Core/ProtocolDefines.h>
#include <Disks/TemporaryFileOnDisk.h>
#include <Interpreters/TemporaryDataOnDisk.h>
#include <Common/Stopwatch.h>
@ -37,13 +34,13 @@
#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>
#include <Common/JSONBuilder.h>
#include <Common/filesystemHelpers.h>
#include <Common/scope_guard_safe.h>
#include <Parsers/ASTSelectQuery.h>
#include <Interpreters/AggregationUtils.h>
namespace ProfileEvents
{
extern const Event ExternalAggregationWritePart;
@ -1123,7 +1120,9 @@ void NO_INLINE Aggregator::executeImplBatch(
return;
/// For all rows.
AggregateDataPtr place = aggregates_pool->alloc(0);
/// This pointer is unused, but the logic will compare it for nullptr to check if the cell is set.
AggregateDataPtr place = reinterpret_cast<AggregateDataPtr>(0x1);
if (all_keys_are_const)
{
state.emplaceKey(method.data, 0, *aggregates_pool).setMapped(place);

View File

@ -5,11 +5,8 @@
#include <Interpreters/Context.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/TreeRewriter.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTLiteral.h>
#include <Processors/ISource.h>

View File

@ -49,5 +49,5 @@ Check total_bytes/total_rows for Set
2048 50
2048 100
Check total_bytes/total_rows for Join
10240 50
10240 100
1 50
1 100

View File

@ -134,7 +134,7 @@ DROP TABLE check_system_tables;
SELECT 'Check total_bytes/total_rows for Join';
CREATE TABLE check_system_tables Engine=Join(ANY, LEFT, number) AS SELECT * FROM numbers(50);
SELECT total_bytes, total_rows FROM system.tables WHERE name = 'check_system_tables' AND database = currentDatabase();
SELECT total_bytes BETWEEN 5000 AND 15000, total_rows FROM system.tables WHERE name = 'check_system_tables' AND database = currentDatabase();
INSERT INTO check_system_tables SELECT number+50 FROM numbers(50);
SELECT total_bytes, total_rows FROM system.tables WHERE name = 'check_system_tables' AND database = currentDatabase();
SELECT total_bytes BETWEEN 5000 AND 15000, total_rows FROM system.tables WHERE name = 'check_system_tables' AND database = currentDatabase();
DROP TABLE check_system_tables;

View File

@ -1,4 +1,4 @@
test_dictionary_hashed 1000000 0.4768 33558760
test_dictionary_hashed_load_factor 1000000 0.9537 16781544
test_dictionary_sparse_hashed 1000000 0.4768 20975848
test_dictionary_sparse_hashed_load_factor 1000000 0.9537 10490088
test_dictionary_hashed 1000000 0.4768 34000000
test_dictionary_hashed_load_factor 1000000 0.9537 17000000
test_dictionary_sparse_hashed 1000000 0.4768 21000000
test_dictionary_sparse_hashed_load_factor 1000000 0.9537 10000000

View File

@ -31,7 +31,7 @@ LIFETIME(0);
SYSTEM RELOAD DICTIONARY test_dictionary_{{layout}};
SYSTEM RELOAD DICTIONARY test_dictionary_{{layout}}_load_factor;
SELECT name, element_count, round(load_factor, 4), bytes_allocated FROM system.dictionaries WHERE database = currentDatabase() ORDER BY name;
SELECT name, element_count, round(load_factor, 4), round(bytes_allocated, -6) FROM system.dictionaries WHERE database = currentDatabase() ORDER BY name;
DROP DICTIONARY IF EXISTS test_dictionary_{{layout}};
DROP DICTIONARY IF EXISTS test_dictionary_{{layout}}_load_factor;