From f418ed37cf17f37b8099774dd2c56d1d847d87e1 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Mon, 10 Apr 2017 02:10:05 +0300 Subject: [PATCH] ArrayCache: development [#CLICKHOUSE-2]. --- dbms/src/Common/ArrayCache.h | 204 +++++++++++++++++++------- dbms/src/Common/tests/array_cache.cpp | 80 +++++++--- 2 files changed, 209 insertions(+), 75 deletions(-) diff --git a/dbms/src/Common/ArrayCache.h b/dbms/src/Common/ArrayCache.h index d67d2e11b58..7b825a5b182 100644 --- a/dbms/src/Common/ArrayCache.h +++ b/dbms/src/Common/ArrayCache.h @@ -93,9 +93,9 @@ private: bool isFree() const { return SizeMultimapHook::is_linked(); } - static RegionMetadata & create() + static RegionMetadata * create() { - return *(new RegionMetadata); + return new RegionMetadata; } void destroy() @@ -122,12 +122,21 @@ private: bool operator() (Key key, const RegionMetadata & b) const { return key < b.key; } }; - using LRUList = boost::intrusive::list>; - using AdjacencyList = boost::intrusive::list>; + using LRUList = boost::intrusive::list, boost::intrusive::constant_time_size>; + using AdjacencyList = boost::intrusive::list, boost::intrusive::constant_time_size>; using SizeMultimap = boost::intrusive::multiset, boost::intrusive::base_hook>; + boost::intrusive::compare, boost::intrusive::base_hook, boost::intrusive::constant_time_size>; using KeyMap = boost::intrusive::set, boost::intrusive::base_hook>; + boost::intrusive::compare, boost::intrusive::base_hook, boost::intrusive::constant_time_size>; + + /** Each region could be: + * - free: not holding any data; + * - allocated: having data, addressed by key; + * -- allocated, in use: holded externally, could not be evicted; + * -- allocated, not in use: not holded, could be evicted. 
+ */ /** Invariants: * adjacency_list contains all regions @@ -141,7 +150,7 @@ private: SizeMultimap size_multimap; KeyMap key_map; - std::mutex mutex; + mutable std::mutex mutex; std::mt19937 rng {randomSeed()}; @@ -159,9 +168,14 @@ private: ~Chunk() { - if (0 != munmap(ptr, size)) + if (ptr && 0 != munmap(ptr, size)) DB::throwFromErrno("Allocator: Cannot munmap.", DB::ErrorCodes::CANNOT_MUNMAP); } + + Chunk(Chunk && other) : ptr(other.ptr), size(other.size) + { + other.ptr = nullptr; + } }; using Chunks = std::list; @@ -180,19 +194,21 @@ private: static constexpr size_t min_chunk_size = 64 * 1024 * 1024; /// Cache stats. - size_t hits = 0; - size_t misses = 0; + std::atomic hits {0}; /// Value was in cache. + std::atomic concurrent_hits {0}; /// Value was calculated by another thread and we was waiting for it. Also summed in hits. + std::atomic misses {0}; public: /// Holds region as in use. Regions in use could not be evicted from cache. - /// In constructor, increases refcount. + /// In constructor, increases refcount and if it becomes non-zero, remove region from lru_list. /// In destructor, decreases refcount and if it becomes zero, insert region to lru_list. struct Holder : private boost::noncopyable { Holder(ArrayCache & cache_, RegionMetadata & region_) : cache(cache_), region(region_) { - ++region.refcount; + if (++region.refcount == 1 && region.LRUListHook::is_linked()) + cache.lru_list.erase(cache.lru_list.iterator_to(region)); cache.total_size_in_use += region.size; } @@ -284,14 +300,21 @@ private: friend struct InsertTokenHolder; - static constexpr size_t page_size = 4096; - static size_t roundUpToPageSize(size_t x) + static size_t roundUp(size_t x, size_t rounding) { - return (x + (page_size - 1)) / page_size * page_size; + return (x + (rounding - 1)) / rounding * rounding; } + static constexpr size_t page_size = 4096; - void freeRegion(RegionMetadata & region) + /// Sizes and addresses of allocated memory will be aligned to specified boundary. 
+ static constexpr size_t alignment = 16; + + + /// Precondition: region is not in lru_list, not in key_map, not in size_multimap. + /// Postcondition: region is not in lru_list, not in key_map, + /// possibly coalesced with adjacent free regions and inserted into size_multimap. + void freeRegion(RegionMetadata & region) noexcept { auto adjacency_list_it = adjacency_list.iterator_to(region); @@ -320,7 +343,7 @@ private: { region.size += left_it->size; *reinterpret_cast(®ion.ptr) -= left_it->size; - size_multimap.erase(*left_it); + size_multimap.erase(size_multimap.iterator_to(*left_it)); adjacency_list.erase(left_it); left_it->destroy(); } @@ -328,7 +351,7 @@ private: if (has_free_region_at_right) { region.size += right_it->size; - size_multimap.erase(*right_it); + size_multimap.erase(size_multimap.iterator_to(*right_it)); adjacency_list.erase(right_it); right_it->destroy(); } @@ -339,7 +362,8 @@ private: /// Evict one region from cache and return it, coalesced with neighbours. /// If nothing to evict, returns nullptr. - RegionMetadata * evictOne() + /// Region is removed from lru_list and key_map and inserted into size_multimap. + RegionMetadata * evictOne() noexcept { if (lru_list.empty()) return nullptr; @@ -348,7 +372,7 @@ private: total_allocated_size -= evicted_region.size; lru_list.pop_front(); - key_map.erase(evicted_region); + key_map.erase(key_map.iterator_to(evicted_region)); freeRegion(evicted_region); return &evicted_region; @@ -356,7 +380,7 @@ private: /// Allocates a chunk of specified size. Creates free region, spanning through whole chunk and returns it. - RegionMetadata & addNewChunk(size_t size) + RegionMetadata * addNewChunk(size_t size) { /// ASLR by hand. void * address_hint = reinterpret_cast(std::uniform_int_distribution(0x100000000000UL, 0x700000000000UL)(rng)); @@ -367,62 +391,76 @@ private: total_chunks_size += size; /// Create free region spanning through chunk. 
- RegionMetadata & free_region = RegionMetadata::create(); + RegionMetadata * free_region; + try + { + free_region = RegionMetadata::create(); + } + catch (...) + { + total_chunks_size -= size; + chunks.pop_back(); + throw; + } - free_region.ptr = chunk.ptr; - free_region.chunk = chunk.ptr; - free_region.size = chunk.size; + free_region->ptr = chunk.ptr; + free_region->chunk = chunk.ptr; + free_region->size = chunk.size; - adjacency_list.push_back(free_region); - size_multimap.insert(free_region); + adjacency_list.push_back(*free_region); + size_multimap.insert(*free_region); return free_region; } /// Precondition: free_region.size >= size. - RegionMetadata & allocateFromFreeRegion(RegionMetadata & free_region, size_t size) + RegionMetadata * allocateFromFreeRegion(RegionMetadata & free_region, size_t size) { - total_allocated_size += size; - if (free_region.size == size) { - size_multimap.erase(free_region); - return free_region; + total_allocated_size += size; + size_multimap.erase(size_multimap.iterator_to(free_region)); + return &free_region; } - RegionMetadata & allocated_region = RegionMetadata::create(); - allocated_region.ptr = free_region.ptr; - allocated_region.chunk = free_region.ptr; - allocated_region.size = size; + RegionMetadata * allocated_region = RegionMetadata::create(); + total_allocated_size += size; - size_multimap.erase(free_region); + allocated_region->ptr = free_region.ptr; + allocated_region->chunk = free_region.ptr; + allocated_region->size = size; + + size_multimap.erase(size_multimap.iterator_to(free_region)); free_region.size -= size; *reinterpret_cast(&free_region.ptr) += size; size_multimap.insert(free_region); - adjacency_list.insert(adjacency_list.iterator_to(free_region), allocated_region); + adjacency_list.insert(adjacency_list.iterator_to(free_region), *allocated_region); return allocated_region; } + /// Does not insert allocated region to key_map or lru_list. Caller must do it. 
RegionMetadata * allocate(size_t size) { + size = roundUp(size, alignment); + /// Look up to size multimap to find free region of specified size. auto it = size_multimap.lower_bound(size, RegionCompareBySize()); if (size_multimap.end() != it) { - return &allocateFromFreeRegion(*it, size); + return allocateFromFreeRegion(*it, size); } /// If nothing was found and total size of allocated chunks plus required size is lower than maximum, /// allocate a new chunk. - size_t required_chunk_size = std::max(min_chunk_size, roundUpToPageSize(size)); + size_t required_chunk_size = std::max(min_chunk_size, roundUp(size, page_size)); if (total_chunks_size + required_chunk_size <= max_total_size) { /// Create free region spanning through chunk. - RegionMetadata & free_region = addNewChunk(required_chunk_size); - return &allocateFromFreeRegion(free_region, size); + RegionMetadata * free_region = addNewChunk(required_chunk_size); + return allocateFromFreeRegion(*free_region, size); } /// Evict something from cache and continue. @@ -438,7 +476,7 @@ private: if (res->size < size) continue; - return &allocateFromFreeRegion(*res, size); + return allocateFromFreeRegion(*res, size); } } @@ -470,7 +508,8 @@ public: /// Exceptions occuring in callbacks will be propagated to the caller. /// Another thread from the set of concurrent threads will then try to call its callbacks etc. /// - /// Returns std::pair of the cached value and a bool indicating whether the value was produced during this call. + /// Returns cached value wrapped by holder, preventing cache entry from eviction. + /// Also could return a bool indicating whether the value was produced during this call. template HolderPtr getOrSet(const Key & key, GetSizeFunc && get_size, InitializeFunc && initialize, bool * was_calculated) { @@ -505,6 +544,7 @@ public: { /// Another thread already produced the value while we waited for token->mutex. 
++hits; + ++concurrent_hits; if (was_calculated) *was_calculated = false; @@ -546,22 +586,76 @@ public: } std::lock_guard cache_lock(mutex); - token->value = std::make_shared(*this, *region); - /// Insert the new value only if the token is still present in insert_tokens. - /// (The token may be absent because of a concurrent reset() call). - auto token_it = insert_tokens.find(key); - if (token_it != insert_tokens.end() && token_it->second.get() == token) + try { - key_map.insert(*region); + token->value = std::make_shared(*this, *region); + + /// Insert the new value only if the token is still present in insert_tokens. + /// (The token may be absent because of a concurrent reset() call). + auto token_it = insert_tokens.find(key); + if (token_it != insert_tokens.end() && token_it->second.get() == token) + { + key_map.insert(*region); + } + + if (!token->cleaned_up) + token_holder.cleanup(token_lock, cache_lock); + + if (was_calculated) + *was_calculated = true; + + return token->value; + } + catch (...) 
+ { + if (region->KeyMapHook::is_linked()) + key_map.erase(key_map.iterator_to(*region)); - if (!token->cleaned_up) - token_holder.cleanup(token_lock, cache_lock); + freeRegion(*region); + throw; + } + } - if (was_calculated) - *was_calculated = true; - return token->value; + struct Statistics + { + size_t total_chunks_size = 0; + size_t total_allocated_size = 0; + size_t total_size_currently_initialized = 0; + size_t total_size_in_use = 0; + + size_t num_chunks = 0; + size_t num_regions = 0; + size_t num_free_regions = 0; + size_t num_regions_in_use = 0; + size_t num_keyed_regions = 0; + + size_t hits = 0; + size_t concurrent_hits = 0; + size_t misses = 0; + }; + + Statistics getStatistics() const + { + std::lock_guard cache_lock(mutex); + Statistics res; + + res.total_chunks_size = total_chunks_size; + res.total_allocated_size = total_allocated_size; + res.total_size_currently_initialized = total_size_currently_initialized.load(std::memory_order_relaxed); + res.total_size_in_use = total_size_in_use; + + res.num_chunks = chunks.size(); + res.num_regions = adjacency_list.size(); + res.num_free_regions = size_multimap.size(); + res.num_regions_in_use = adjacency_list.size() - size_multimap.size() - lru_list.size(); + res.num_keyed_regions = key_map.size(); + + res.hits = hits.load(std::memory_order_relaxed); + res.concurrent_hits = concurrent_hits.load(std::memory_order_relaxed); + res.misses = misses.load(std::memory_order_relaxed); + + return res; } }; diff --git a/dbms/src/Common/tests/array_cache.cpp b/dbms/src/Common/tests/array_cache.cpp index e83ee742221..7d1e68f8d1c 100644 --- a/dbms/src/Common/tests/array_cache.cpp +++ b/dbms/src/Common/tests/array_cache.cpp @@ -1,35 +1,75 @@ #include +#include +#include #include +#include int main(int argc, char ** argv) { + size_t cache_size = DB::parse(argv[1]); + size_t num_threads = DB::parse(argv[2]); + size_t num_iterations = DB::parse(argv[3]); + size_t region_max_size = DB::parse(argv[4]); + size_t max_key = 
DB::parse(argv[5]); + using Cache = ArrayCache; - Cache cache(1024 * 1024 * 1024); + Cache cache(cache_size); - Cache::HolderPtr holder = cache.getOrSet(1, - [] + std::vector threads; + for (size_t i = 0; i < num_threads; ++i) { - return 100; - }, - [](void * ptr, int & payload) - { - payload = 123; - }, nullptr); + threads.emplace_back([&] + { + std::mt19937 generator(randomSeed()); - std::cerr << holder->payload() << "\n"; + for (size_t i = 0; i < num_iterations; ++i) + { + size_t size = std::uniform_int_distribution(1, region_max_size)(generator); + int key = std::uniform_int_distribution(1, max_key)(generator); - holder = cache.getOrSet(1, - [] - { - return 100; - }, - [](void * ptr, int & payload) - { - payload = 456; - }, nullptr); + cache.getOrSet( + key, + [=]{ return size; }, + [=](void * ptr, int & payload) + { + payload = i; + memset(ptr, i, size); + }, + nullptr); + } + }); + } - std::cerr << holder->payload() << "\n"; + std::atomic_bool stop{}; + + std::thread stats_thread([&] + { + while (!stop) + { + sleep(1); + Cache::Statistics statistics = cache.getStatistics(); + std::cerr + << "total_chunks_size: " << statistics.total_chunks_size << "\n" + << "total_allocated_size: " << statistics.total_allocated_size << "\n" + << "total_size_currently_initialized: " << statistics.total_size_currently_initialized << "\n" + << "total_size_in_use: " << statistics.total_size_in_use << "\n" + << "num_chunks: " << statistics.num_chunks << "\n" + << "num_regions: " << statistics.num_regions << "\n" + << "num_free_regions: " << statistics.num_free_regions << "\n" + << "num_regions_in_use: " << statistics.num_regions_in_use << "\n" + << "num_keyed_regions: " << statistics.num_keyed_regions << "\n" + << "hits: " << statistics.hits << "\n" + << "concurrent_hits: " << statistics.concurrent_hits << "\n" + << "misses: " << statistics.misses << "\n\n"; + } + }); + + for (auto & thread : threads) + thread.join(); + + stop = true; + stats_thread.join(); return 0; }