ArrayCache: development [#CLICKHOUSE-2].

This commit is contained in:
Alexey Milovidov 2017-04-10 02:10:05 +03:00
parent 5f8d99cbe9
commit f418ed37cf
2 changed files with 209 additions and 75 deletions

View File

@ -93,9 +93,9 @@ private:
bool isFree() const { return SizeMultimapHook::is_linked(); }
static RegionMetadata & create()
static RegionMetadata * create()
{
return *(new RegionMetadata);
return new RegionMetadata;
}
void destroy()
@ -122,12 +122,21 @@ private:
bool operator() (Key key, const RegionMetadata & b) const { return key < b.key; }
};
using LRUList = boost::intrusive::list<RegionMetadata, boost::intrusive::base_hook<LRUListHook>>;
using AdjacencyList = boost::intrusive::list<RegionMetadata, boost::intrusive::base_hook<AdjacencyListHook>>;
using LRUList = boost::intrusive::list<RegionMetadata,
boost::intrusive::base_hook<LRUListHook>, boost::intrusive::constant_time_size<true>>;
using AdjacencyList = boost::intrusive::list<RegionMetadata,
boost::intrusive::base_hook<AdjacencyListHook>, boost::intrusive::constant_time_size<true>>;
using SizeMultimap = boost::intrusive::multiset<RegionMetadata,
boost::intrusive::compare<RegionCompareBySize>, boost::intrusive::base_hook<SizeMultimapHook>>;
boost::intrusive::compare<RegionCompareBySize>, boost::intrusive::base_hook<SizeMultimapHook>, boost::intrusive::constant_time_size<true>>;
using KeyMap = boost::intrusive::set<RegionMetadata,
boost::intrusive::compare<RegionCompareByKey>, boost::intrusive::base_hook<SizeMultimapHook>>;
boost::intrusive::compare<RegionCompareByKey>, boost::intrusive::base_hook<SizeMultimapHook>, boost::intrusive::constant_time_size<true>>;
/** Each region could be:
* - free: not holding any data;
* - allocated: having data, addressed by key;
* -- allocated, in use: holded externally, could not be evicted;
* -- allocated, not in use: not holded, could be evicted.
*/
/** Invariants:
* adjacency_list contains all regions
@ -141,7 +150,7 @@ private:
SizeMultimap size_multimap;
KeyMap key_map;
std::mutex mutex;
mutable std::mutex mutex;
std::mt19937 rng {randomSeed()};
@ -159,9 +168,14 @@ private:
~Chunk()
{
if (0 != munmap(ptr, size))
if (ptr && 0 != munmap(ptr, size))
DB::throwFromErrno("Allocator: Cannot munmap.", DB::ErrorCodes::CANNOT_MUNMAP);
}
Chunk(Chunk && other) : ptr(other.ptr), size(other.size)
{
other.ptr = nullptr;
}
};
using Chunks = std::list<Chunk>;
@ -180,19 +194,21 @@ private:
static constexpr size_t min_chunk_size = 64 * 1024 * 1024;
/// Cache stats.
size_t hits = 0;
size_t misses = 0;
std::atomic<size_t> hits {0}; /// Value was in cache.
std::atomic<size_t> concurrent_hits {0}; /// Value was calculated by another thread and we was waiting for it. Also summed in hits.
std::atomic<size_t> misses {0};
public:
/// Holds region as in use. Regions in use could not be evicted from cache.
/// In constructor, increases refcount.
/// In constructor, increases refcount and if it becomes non-zero, remove region from lru_list.
/// In destructor, decreases refcount and if it becomes zero, insert region to lru_list.
struct Holder : private boost::noncopyable
{
Holder(ArrayCache & cache_, RegionMetadata & region_) : cache(cache_), region(region_)
{
++region.refcount;
if (++region.refcount == 1 && region.LRUListHook::is_linked())
cache.lru_list.erase(cache.lru_list.iterator_to(region));
cache.total_size_in_use += region.size;
}
@ -284,14 +300,21 @@ private:
friend struct InsertTokenHolder;
static constexpr size_t page_size = 4096;
static size_t roundUpToPageSize(size_t x)
static size_t roundUp(size_t x, size_t rounding)
{
return (x + (page_size - 1)) / page_size * page_size;
return (x + (rounding - 1)) / rounding * rounding;
}
static constexpr size_t page_size = 4096;
void freeRegion(RegionMetadata & region)
/// Sizes and addresses of allocated memory will be aligned to specified boundary.
static constexpr size_t alignment = 16;
/// Precondition: region is not in lru_list, not in key_map, not in size_multimap.
/// Postcondition: region is not in lru_list, not in key_map,
/// possibly coalesced with adjacent free regions and inserted into size_multimap.
void freeRegion(RegionMetadata & region) noexcept
{
auto adjacency_list_it = adjacency_list.iterator_to(region);
@ -320,7 +343,7 @@ private:
{
region.size += left_it->size;
*reinterpret_cast<char **>(&region.ptr) -= left_it->size;
size_multimap.erase(*left_it);
size_multimap.erase(size_multimap.iterator_to(*left_it));
adjacency_list.erase(left_it);
left_it->destroy();
}
@ -328,7 +351,7 @@ private:
if (has_free_region_at_right)
{
region.size += right_it->size;
size_multimap.erase(*right_it);
size_multimap.erase(size_multimap.iterator_to(*right_it));
adjacency_list.erase(right_it);
right_it->destroy();
}
@ -339,7 +362,8 @@ private:
/// Evict one region from cache and return it, coalesced with neighbours.
/// If nothing to evict, returns nullptr.
RegionMetadata * evictOne()
/// Region is removed from lru_list and key_map and inserted into size_multimap.
RegionMetadata * evictOne() noexcept
{
if (lru_list.empty())
return nullptr;
@ -348,7 +372,7 @@ private:
total_allocated_size -= evicted_region.size;
lru_list.pop_front();
key_map.erase(evicted_region);
key_map.erase(key_map.iterator_to(evicted_region));
freeRegion(evicted_region);
return &evicted_region;
@ -356,7 +380,7 @@ private:
/// Allocates a chunk of specified size. Creates free region, spanning through whole chunk and returns it.
RegionMetadata & addNewChunk(size_t size)
RegionMetadata * addNewChunk(size_t size)
{
/// ASLR by hand.
void * address_hint = reinterpret_cast<void *>(std::uniform_int_distribution<size_t>(0x100000000000UL, 0x700000000000UL)(rng));
@ -367,62 +391,76 @@ private:
total_chunks_size += size;
/// Create free region spanning through chunk.
RegionMetadata & free_region = RegionMetadata::create();
RegionMetadata * free_region;
try
{
free_region = RegionMetadata::create();
}
catch (...)
{
total_chunks_size -= size;
chunks.pop_back();
throw;
}
free_region.ptr = chunk.ptr;
free_region.chunk = chunk.ptr;
free_region.size = chunk.size;
free_region->ptr = chunk.ptr;
free_region->chunk = chunk.ptr;
free_region->size = chunk.size;
adjacency_list.push_back(free_region);
size_multimap.insert(free_region);
adjacency_list.push_back(*free_region);
size_multimap.insert(*free_region);
return free_region;
}
/// Precondition: free_region.size >= size.
RegionMetadata & allocateFromFreeRegion(RegionMetadata & free_region, size_t size)
RegionMetadata * allocateFromFreeRegion(RegionMetadata & free_region, size_t size)
{
total_allocated_size += size;
if (free_region.size == size)
{
size_multimap.erase(free_region);
return free_region;
total_allocated_size += size;
size_multimap.erase(size_multimap.iterator_to(free_region));
return &free_region;
}
RegionMetadata & allocated_region = RegionMetadata::create();
allocated_region.ptr = free_region.ptr;
allocated_region.chunk = free_region.ptr;
allocated_region.size = size;
RegionMetadata * allocated_region = RegionMetadata::create();
total_allocated_size += size;
size_multimap.erase(free_region);
allocated_region->ptr = free_region.ptr;
allocated_region->chunk = free_region.ptr;
allocated_region->size = size;
size_multimap.erase(size_multimap.iterator_to(free_region));
free_region.size -= size;
*reinterpret_cast<char **>(&free_region.ptr) += size;
size_multimap.insert(free_region);
adjacency_list.insert(adjacency_list.iterator_to(free_region), allocated_region);
adjacency_list.insert(adjacency_list.iterator_to(free_region), *allocated_region);
return allocated_region;
}
/// Does not insert allocated region to key_map or lru_list. Caller must do it.
RegionMetadata * allocate(size_t size)
{
size = roundUp(size, alignment);
/// Look up to size multimap to find free region of specified size.
auto it = size_multimap.lower_bound(size, RegionCompareBySize());
if (size_multimap.end() != it)
{
return &allocateFromFreeRegion(*it, size);
return allocateFromFreeRegion(*it, size);
}
/// If nothing was found and total size of allocated chunks plus required size is lower than maximum,
/// allocate a new chunk.
size_t required_chunk_size = std::max(min_chunk_size, roundUpToPageSize(size));
size_t required_chunk_size = std::max(min_chunk_size, roundUp(size, page_size));
if (total_chunks_size + required_chunk_size <= max_total_size)
{
/// Create free region spanning through chunk.
RegionMetadata & free_region = addNewChunk(required_chunk_size);
return &allocateFromFreeRegion(free_region, size);
RegionMetadata * free_region = addNewChunk(required_chunk_size);
return allocateFromFreeRegion(*free_region, size);
}
/// Evict something from cache and continue.
@ -438,7 +476,7 @@ private:
if (res->size < size)
continue;
return &allocateFromFreeRegion(*res, size);
return allocateFromFreeRegion(*res, size);
}
}
@ -470,7 +508,8 @@ public:
/// Exceptions occuring in callbacks will be propagated to the caller.
/// Another thread from the set of concurrent threads will then try to call its callbacks etc.
///
/// Returns std::pair of the cached value and a bool indicating whether the value was produced during this call.
/// Returns cached value wrapped by holder, preventing cache entry from eviction.
/// Also could return a bool indicating whether the value was produced during this call.
template <typename GetSizeFunc, typename InitializeFunc>
HolderPtr getOrSet(const Key & key, GetSizeFunc && get_size, InitializeFunc && initialize, bool * was_calculated)
{
@ -505,6 +544,7 @@ public:
{
/// Another thread already produced the value while we waited for token->mutex.
++hits;
++concurrent_hits;
if (was_calculated)
*was_calculated = false;
@ -546,6 +586,9 @@ public:
}
std::lock_guard<std::mutex> cache_lock(mutex);
try
{
token->value = std::make_shared<Holder>(*this, *region);
/// Insert the new value only if the token is still in present in insert_tokens.
@ -564,4 +607,55 @@ public:
return token->value;
}
catch (...)
{
if (region->KeyMapHook::is_linked())
key_map.erase(key_map.iterator_to(*region));
freeRegion(*region);
throw;
}
}
struct Statistics
{
size_t total_chunks_size = 0;
size_t total_allocated_size = 0;
size_t total_size_currently_initialized = 0;
size_t total_size_in_use = 0;
size_t num_chunks = 0;
size_t num_regions = 0;
size_t num_free_regions = 0;
size_t num_regions_in_use = 0;
size_t num_keyed_regions = 0;
size_t hits = 0;
size_t concurrent_hits = 0;
size_t misses = 0;
};
Statistics getStatistics() const
{
std::lock_guard<std::mutex> cache_lock(mutex);
Statistics res;
res.total_chunks_size = total_chunks_size;
res.total_allocated_size = total_allocated_size;
res.total_size_currently_initialized = total_size_currently_initialized.load(std::memory_order_relaxed);
res.total_size_in_use = total_size_in_use;
res.num_chunks = chunks.size();
res.num_regions = adjacency_list.size();
res.num_free_regions = size_multimap.size();
res.num_regions_in_use = adjacency_list.size() - size_multimap.size() - lru_list.size();
res.num_keyed_regions = key_map.size();
res.hits = hits.load(std::memory_order_relaxed);
res.concurrent_hits = concurrent_hits.load(std::memory_order_relaxed);
res.misses = misses.load(std::memory_order_relaxed);
return res;
}
};

View File

@ -1,35 +1,75 @@
#include <iostream>
#include <cstring>
#include <thread>
#include <Common/ArrayCache.h>
#include <IO/ReadHelpers.h>
int main(int argc, char ** argv)
{
size_t cache_size = DB::parse<size_t>(argv[1]);
size_t num_threads = DB::parse<size_t>(argv[2]);
size_t num_iterations = DB::parse<size_t>(argv[3]);
size_t region_max_size = DB::parse<size_t>(argv[4]);
size_t max_key = DB::parse<size_t>(argv[5]);
using Cache = ArrayCache<int, int>;
Cache cache(1024 * 1024 * 1024);
Cache cache(cache_size);
Cache::HolderPtr holder = cache.getOrSet(1,
[]
std::vector<std::thread> threads;
for (size_t i = 0; i < num_threads; ++i)
{
return 100;
threads.emplace_back([&]
{
std::mt19937 generator(randomSeed());
for (size_t i = 0; i < num_iterations; ++i)
{
size_t size = std::uniform_int_distribution<size_t>(1, region_max_size)(generator);
int key = std::uniform_int_distribution<int>(1, max_key)(generator);
cache.getOrSet(
key,
[=]{ return size; },
[=](void * ptr, int & payload)
{
payload = i;
memset(ptr, i, size);
},
[](void * ptr, int & payload)
{
payload = 123;
}, nullptr);
nullptr);
}
});
}
std::cerr << holder->payload() << "\n";
std::atomic_bool stop{};
holder = cache.getOrSet(1,
[]
std::thread stats_thread([&]
{
return 100;
},
[](void * ptr, int & payload)
while (!stop)
{
payload = 456;
}, nullptr);
sleep(1);
Cache::Statistics statistics = cache.getStatistics();
std::cerr
<< "total_chunks_size: " << statistics.total_chunks_size << "\n"
<< "total_allocated_size: " << statistics.total_allocated_size << "\n"
<< "total_size_currently_initialized: " << statistics.total_size_currently_initialized << "\n"
<< "total_size_in_use: " << statistics.total_size_in_use << "\n"
<< "num_chunks: " << statistics.num_chunks << "\n"
<< "num_regions: " << statistics.num_regions << "\n"
<< "num_free_regions: " << statistics.num_free_regions << "\n"
<< "num_regions_in_use: " << statistics.num_regions_in_use << "\n"
<< "num_keyed_regions: " << statistics.num_keyed_regions << "\n"
<< "hits: " << statistics.hits << "\n"
<< "concurrent_hits: " << statistics.concurrent_hits << "\n"
<< "misses: " << statistics.misses << "\n\n";
}
});
std::cerr << holder->payload() << "\n";
for (auto & thread : threads)
thread.join();
stop = true;
stats_thread.join();
return 0;
}