clickhouse: added LRUCache and changed UncompressedCache to use it; added a test, fixed another test. [#METR-9633]

This commit is contained in:
Michael Kolupaev 2014-02-11 11:05:02 +00:00
parent 5fa9d07b35
commit df2e012943
5 changed files with 235 additions and 69 deletions

View File

@ -0,0 +1,150 @@
#pragma once
#include <unordered_map>
#include <list>
#include <memory>
#include <Poco/ScopedLock.h>
#include <Poco/Mutex.h>
#include <DB/Core/ErrorCodes.h>
#include <DB/Core/Exception.h>
namespace DB
{
template <typename T>
struct TrivialWeightFunction
{
size_t operator()(const T & x) const
{
return 1;
}
};
/** Кеш, вытесняющий долго не использовавшиеся записи. thread-safe.
* WeightFunction - тип, оператор () которого принимает Mapped и возвращает "вес" (примерный размер) этого значения.
* Кеш начинает выбрасывать значения, когда их суммарный вес превышает max_size.
* После вставки значения его вес не должен меняться.
*/
template <typename TKey, typename TMapped, typename HashFunction = std::hash<TMapped>, typename WeightFunction = TrivialWeightFunction<TMapped> >
class LRUCache
{
public:
typedef TKey Key;
typedef TMapped Mapped;
typedef std::shared_ptr<Mapped> MappedPtr;
LRUCache(size_t max_size_)
: max_size(std::max(1ul, max_size_)), current_size(0), hits(0), misses(0) {}
MappedPtr get(const Key & key)
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
CellsIterator it = cells.find(key);
if (it == cells.end())
{
++misses;
return MappedPtr();
}
++hits;
Cell & cell = it->second;
/// Переместим ключ в конец очереди. Итератор остается валидным.
queue.splice(queue.end(), queue, cell.queue_iterator);
return cell.value;
}
void set(const Key & key, MappedPtr mapped)
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
std::pair<CellsIterator, bool> it =
cells.emplace(std::piecewise_construct, std::forward_as_tuple(key), std::forward_as_tuple());
Cell & cell = it.first->second;
bool inserted = it.second;
if (inserted)
{
cell.queue_iterator = queue.insert(queue.end(), key);
}
else
{
current_size -= cell.size;
queue.splice(queue.end(), queue, cell.queue_iterator);
}
cell.value = mapped;
cell.size = cell.value ? weight_function(*cell.value) : 0;
current_size += cell.size;
removeOverflow();
}
void getStats(size_t & out_hits, size_t & out_misses) const volatile
{
/// Синхронизация не нужна.
out_hits = hits;
out_misses = misses;
}
size_t weight() const
{
return current_size;
}
size_t count() const
{
return queue.size();
}
private:
typedef std::list<Key> LRUQueue;
typedef typename LRUQueue::iterator LRUQueueIterator;
struct Cell
{
MappedPtr value;
size_t size;
LRUQueueIterator queue_iterator;
};
typedef std::unordered_map<Key, Cell, HashFunction> Cells;
typedef typename Cells::iterator CellsIterator;
LRUQueue queue;
Cells cells;
size_t max_size;
size_t current_size;
Poco::FastMutex mutex;
size_t hits;
size_t misses;
WeightFunction weight_function;
void removeOverflow()
{
while (current_size > max_size && queue.size() > 1)
{
const Key & key = queue.front();
CellsIterator it = cells.find(key);
current_size -= it->second.size;
cells.erase(it);
queue.pop_front();
}
if (queue.size() != cells.size() || current_size > (1ull << 63))
{
queue.clear();
cells.clear();
current_size = 0;
throw Exception("LRUCache became inconsistent. There must be a bug in it. Clearing it for now.",
ErrorCodes::LOGICAL_ERROR);
}
}
};
}

View File

@ -26,7 +26,7 @@ private:
size_t file_pos;
/// Кусок данных из кэша, или кусок считанных данных, который мы положим в кэш.
UncompressedCache::CellPtr owned_cell;
UncompressedCache::MappedPtr owned_cell;
void initInput()
{
@ -41,9 +41,7 @@ private:
{
/// Проверим наличие разжатого блока в кэше, захватим владение этим блоком, если он есть.
UInt128 key = {0, 0};
key = cache->hash(path, file_pos);
UInt128 key = cache->hash(path, file_pos);
owned_cell = cache->get(key);
if (!owned_cell)
@ -52,8 +50,7 @@ private:
initInput();
file_in->seek(file_pos);
owned_cell = new UncompressedCache::Cell;
owned_cell->key = key;
owned_cell.reset(new UncompressedCacheCell);
size_t size_decompressed;
owned_cell->compressed_size = readCompressedData(size_decompressed);
@ -64,7 +61,7 @@ private:
decompress(owned_cell->data.m_data, size_decompressed);
/// Положим данные в кэш.
cache->set(owned_cell);
cache->set(key, owned_cell);
}
}

View File

@ -1,10 +1,6 @@
#pragma once
#include <vector>
#include <Poco/SharedPtr.h>
#include <Poco/Mutex.h>
#include <DB/Common/LRUCache.h>
#include <DB/Common/SipHash.h>
#include <DB/Common/ProfileEvents.h>
#include <DB/IO/BufferWithOwnMemory.h>
@ -15,38 +11,23 @@ namespace DB
{
/** Кэш разжатых блоков для CachedCompressedReadBuffer. thread-safe.
* NOTE Использовать LRU вместо простой кэш-таблицы.
*/
class UncompressedCache
struct UncompressedCacheCell
{
public:
struct Cell
{
UInt128 key;
Memory data;
size_t compressed_size;
Memory data;
size_t compressed_size;
};
Cell() { key.first = 0; key.second = 0; compressed_size = 0; }
};
/// В ячейках кэш-таблицы лежат SharedPtr-ы на разжатые блоки. Это нужно, чтобы можно было достать ячейку, захватив владение ею.
typedef Poco::SharedPtr<Cell> CellPtr;
typedef std::vector<CellPtr> Cells;
/** Кэш разжатых блоков для CachedCompressedReadBuffer. thread-safe.
*/
class UncompressedCache : public LRUCache<UInt128, UncompressedCacheCell, UInt128TrivialHash>
{
private:
size_t num_cells;
Cells cells;
mutable Poco::FastMutex mutex;
mutable size_t hits;
mutable size_t misses;
typedef LRUCache<UInt128, UncompressedCacheCell, UInt128TrivialHash> Base;
public:
UncompressedCache(size_t num_cells_)
: num_cells(num_cells_), cells(num_cells), hits(0), misses(0)
{
}
UncompressedCache(size_t max_size_in_cells)
: Base(max_size_in_cells) {}
/// Посчитать ключ от пути к файлу и смещения.
static UInt128 hash(const String & path_to_file, size_t offset)
@ -61,41 +42,16 @@ public:
return key;
}
CellPtr get(UInt128 key) const
MappedPtr get(const Key & key)
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
MappedPtr res = Base::get(key);
CellPtr cell = cells[key.first % num_cells];
if (cell && cell->key == key)
{
if (res)
ProfileEvents::increment(ProfileEvents::UncompressedCacheHits);
++hits;
return cell;
}
else
{
ProfileEvents::increment(ProfileEvents::UncompressedCacheMisses);
++misses;
return NULL;
}
}
void set(const CellPtr & new_cell)
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
CellPtr & cell = cells[new_cell->key.first % num_cells];
if (!cell || cell->key != new_cell->key)
cell = new_cell;
}
void getStats(size_t & out_hits, size_t & out_misses) const volatile
{
/// Синхронизация не нужна.
out_hits = hits;
out_misses = misses;
return res;
}
};

View File

@ -0,0 +1,63 @@
#include <DB/Common/LRUCache.h>
#include <iostream>
#include <string>
using namespace DB;
struct Weight
{
size_t operator()(const std::string & s) const
{
return s.size();
}
};
void fail()
{
std::cout << "failed" << std::endl;
exit(1);
}
typedef LRUCache<std::string, std::string, std::hash<std::string>, Weight> Cache;
typedef Cache::MappedPtr MappedPtr;
MappedPtr ptr(const std::string & s)
{
return MappedPtr(new std::string(s));
}
int main()
{
try
{
Cache cache(10);
if (cache.get("asd")) fail();
cache.set("asd", ptr("qwe"));
if (*cache.get("asd") != "qwe") fail();
cache.set("zxcv", ptr("12345"));
cache.set("01234567891234567", ptr("--"));
if (*cache.get("zxcv") != "12345") fail();
if (*cache.get("asd") != "qwe") fail();
if (*cache.get("01234567891234567") != "--") fail();
if (cache.get("123x")) fail();
cache.set("321x", ptr("+"));
if (cache.get("zxcv")) fail();
if (*cache.get("asd") != "qwe") fail();
if (*cache.get("01234567891234567") != "--") fail();
if (cache.get("123x")) fail();
if (*cache.get("321x") != "+") fail();
if (cache.weight() != 6) fail();
if (cache.count() != 3) fail();
std::cout << "passed" << std::endl;
}
catch (...)
{
fail();
}
return 0;
}

View File

@ -24,7 +24,7 @@ int main(int argc, char ** argv)
{
Stopwatch watch;
CachedCompressedReadBuffer in(path, &cache, 0);
CachedCompressedReadBuffer in(path, &cache);
WriteBufferFromFile out("/dev/null");
copyData(in, out);
@ -36,7 +36,7 @@ int main(int argc, char ** argv)
{
Stopwatch watch;
CachedCompressedReadBuffer in(path, &cache, 0);
CachedCompressedReadBuffer in(path, &cache);
WriteBufferFromFile out("/dev/null");
copyData(in, out);