Merge pull request #60552 from ClickHouse/pc2

Userspace page cache again
This commit is contained in:
Alexey Milovidov 2024-03-04 17:47:35 +03:00 committed by GitHub
commit de6c5d3a40
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
42 changed files with 1855 additions and 76 deletions

View File

@ -275,6 +275,16 @@ Cache profile events:
- `CachedWriteBufferCacheWriteBytes`, `CachedWriteBufferCacheWriteMicroseconds`
## Using in-memory cache (userspace page cache) {#userspace-page-cache}
The File Cache described above stores cached data in local files. Alternatively, object-store-based disks can be configured to use "Userspace Page Cache", which is RAM-only. Userspace page cache is recommended only if file cache can't be used for some reason, e.g. if the machine doesn't have a local disk at all. Note that file cache effectively uses RAM for caching too, since the OS caches contents of local files.
To enable userspace page cache for disks that don't use file cache, use setting `use_page_cache_for_disks_without_file_cache`.
By default, on Linux, the userspace page cache will use all available memory, similar to the OS page cache. In tools like `top` and `ps`, the clickhouse server process will typically show resident set size near 100% of the machine's RAM - this is normal, and most of this memory is actually reclaimable by the OS on memory pressure (`MADV_FREE`). This behavior can be disabled with server setting `page_cache_use_madv_free = 0`, making the userspace page cache just use a fixed amount of memory `page_cache_size` with no special interaction with the OS. On Mac OS, `page_cache_use_madv_free` is always disabled as it doesn't have lazy `MADV_FREE`.
Unfortunately, `page_cache_use_madv_free` makes it difficult to tell if the server is close to running out of memory, since the RSS metric becomes useless. Async metric `UnreclaimableRSS` shows the amount of physical memory used by the server, excluding the memory reclaimable by the OS: `select value from system.asynchronous_metrics where metric = 'UnreclaimableRSS'`. Use it for monitoring instead of RSS. This metric is only available if `page_cache_use_madv_free` is enabled.
## Storing Data on Web Server {#storing-data-on-webserver}
There is a tool `clickhouse-static-files-uploader`, which prepares a data directory for a given table (`SELECT data_paths FROM system.tables WHERE name = 'table_name'`). For each table you need, you get a directory of files. These files can be uploaded to, for example, a web server with static files. After this preparation, you can load this table into any ClickHouse server via `DiskWeb`.

View File

@ -56,9 +56,9 @@ ls -1 flightlist_*.csv.gz | xargs -P100 -I{} bash -c 'gzip -c -d "{}" | clickhou
- 这里我们将文件列表(`ls -1 flightlist_*.csv.gz`)传递给`xargs`以进行并行处理。 `xargs -P100` 指定最多使用 100 个并行工作程序,但由于我们只有 30 个文件,工作程序的数量将只有 30 个。
- 对于每个文件,`xargs` 将通过 `bash -c` 为每个文件运行一个脚本文件。该脚本通过使用 `{}` 表示文件名占位符,然后 `xargs` 由命令进行填充(使用 `-I{}`)。
- 该脚本会将文件 (`gzip -c -d "{}"`) 解压缩到标准输出(`-c` 参数),并将输出重定向到 `clickhouse-client`。
- 我们还要求使用扩展解析器解析 [DateTime](../../sql-reference/data-types/datetime.md) 字段 ([--date_time_input_format best_effort](../../operations/settings/ settings.md#settings-date_time_input_format)) 以识别具有时区偏移的 ISO-8601 格式。
- 我们还要求使用扩展解析器解析 [DateTime](/docs/zh/sql-reference/data-types/datetime.md) 字段 ([--date_time_input_format best_effort](/docs/zh/operations/settings/settings.md#settings-date_time_input_format)) 以识别具有时区偏移的 ISO-8601 格式。
最后,`clickhouse-client` 会以 [CSVWithNames](../../interfaces/formats.md#csvwithnames) 格式读取输入数据然后执行插入。
最后,`clickhouse-client` 会以 [CSVWithNames](/docs/zh/interfaces/formats.md#csvwithnames) 格式读取输入数据然后执行插入。
并行导入需要 24 秒。

View File

@ -1228,6 +1228,13 @@ try
}
global_context->setMarkCache(mark_cache_policy, mark_cache_size, mark_cache_size_ratio);
size_t page_cache_size = server_settings.page_cache_size;
if (page_cache_size != 0)
global_context->setPageCache(
server_settings.page_cache_chunk_size, server_settings.page_cache_mmap_size,
page_cache_size, server_settings.page_cache_use_madv_free,
server_settings.page_cache_use_transparent_huge_pages);
String index_uncompressed_cache_policy = server_settings.index_uncompressed_cache_policy;
size_t index_uncompressed_cache_size = server_settings.index_uncompressed_cache_size;
double index_uncompressed_cache_size_ratio = server_settings.index_uncompressed_cache_size_ratio;

View File

@ -163,6 +163,7 @@ enum class AccessType
M(SYSTEM_DROP_FILESYSTEM_CACHE, "SYSTEM DROP FILESYSTEM CACHE, DROP FILESYSTEM CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_DISTRIBUTED_CACHE, "SYSTEM DROP DISTRIBUTED CACHE, DROP DISTRIBUTED CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_SYNC_FILESYSTEM_CACHE, "SYSTEM REPAIR FILESYSTEM CACHE, REPAIR FILESYSTEM CACHE, SYNC FILESYSTEM CACHE", GLOBAL, SYSTEM) \
M(SYSTEM_DROP_PAGE_CACHE, "SYSTEM DROP PAGE CACHE, DROP PAGE CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_SCHEMA_CACHE, "SYSTEM DROP SCHEMA CACHE, DROP SCHEMA CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_FORMAT_SCHEMA_CACHE, "SYSTEM DROP FORMAT SCHEMA CACHE, DROP FORMAT SCHEMA CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_S3_CLIENT_CACHE, "SYSTEM DROP S3 CLIENT, DROP S3 CLIENT CACHE", GLOBAL, SYSTEM_DROP_CACHE) \

688
src/Common/PageCache.cpp Normal file
View File

@ -0,0 +1,688 @@
#include "PageCache.h"
#include <unistd.h>
#include <sys/mman.h>
#include <Common/logger_useful.h>
#include <Common/formatReadable.h>
#include <Common/ProfileEvents.h>
#include <Common/SipHash.h>
#include <base/hex.h>
#include <base/errnoToString.h>
#include <base/getPageSize.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadHelpers.h>
namespace ProfileEvents
{
extern const Event PageCacheChunkMisses;
extern const Event PageCacheChunkShared;
extern const Event PageCacheChunkDataHits;
extern const Event PageCacheChunkDataPartialHits;
extern const Event PageCacheChunkDataMisses;
extern const Event PageCacheBytesUnpinnedRoundedToPages;
extern const Event PageCacheBytesUnpinnedRoundedToHugePages;
}
namespace DB
{
namespace ErrorCodes
{
extern const int SYSTEM_ERROR;
extern const int MEMORY_LIMIT_EXCEEDED;
extern const int CANNOT_ALLOCATE_MEMORY;
extern const int INVALID_SETTING_VALUE;
extern const int FILE_DOESNT_EXIST;
}
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wunknown-warning-option"
#pragma clang diagnostic ignored "-Wreadability-make-member-function-const"
PinnedPageChunk::PinnedPageChunk(PinnedPageChunk && c) noexcept
: cache(std::exchange(c.cache, nullptr)), chunk(std::exchange(c.chunk, nullptr)) {}
PinnedPageChunk & PinnedPageChunk::operator=(PinnedPageChunk && c) noexcept
{
if (cache)
cache->removeRef(chunk);
cache = std::exchange(c.cache, nullptr);
chunk = std::exchange(c.chunk, nullptr);
return *this;
}
PinnedPageChunk::~PinnedPageChunk() noexcept
{
if (cache)
cache->removeRef(chunk);
}
PinnedPageChunk::PinnedPageChunk(PageCache * cache_, PageChunk * chunk_) noexcept : cache(cache_), chunk(chunk_) {}
const PageChunk * PinnedPageChunk::getChunk() const { return chunk; }
bool PinnedPageChunk::markPagePopulated(size_t page_idx)
{
bool r = chunk->pages_populated.set(page_idx);
return r;
}
void PinnedPageChunk::markPrefixPopulated(size_t bytes)
{
for (size_t i = 0; i < (bytes + chunk->page_size - 1) / chunk->page_size; ++i)
markPagePopulated(i);
}
bool PinnedPageChunk::isPrefixPopulated(size_t bytes) const
{
for (size_t i = 0; i < (bytes + chunk->page_size - 1) / chunk->page_size; ++i)
if (!chunk->pages_populated.get(i))
return false;
return true;
}
AtomicBitSet::AtomicBitSet() = default;
void AtomicBitSet::init(size_t nn)
{
n = nn;
v = std::make_unique<std::atomic<UInt8>[]>((n + 7) / 8);
}
bool AtomicBitSet::get(size_t i) const
{
return (v[i / 8] & (1 << (i % 8))) != 0;
}
bool AtomicBitSet::any() const
{
for (size_t i = 0; i < (n + 7) / 8; ++i)
if (v[i])
return true;
return false;
}
bool AtomicBitSet::set(size_t i) const
{
UInt8 prev = v[i / 8].fetch_or(1 << (i % 8));
return (prev & (1 << (i % 8))) == 0;
}
bool AtomicBitSet::set(size_t i, bool val) const
{
if (val)
return set(i);
else
return unset(i);
}
bool AtomicBitSet::unset(size_t i) const
{
UInt8 prev = v[i / 8].fetch_and(~(1 << (i % 8)));
return (prev & (1 << (i % 8))) != 0;
}
void AtomicBitSet::unsetAll() const
{
for (size_t i = 0; i < (n + 7) / 8; ++i)
v[i].store(0, std::memory_order_relaxed);
}
PageCache::PageCache(size_t bytes_per_chunk, size_t bytes_per_mmap, size_t bytes_total, bool use_madv_free_, bool use_huge_pages_)
: bytes_per_page(getPageSize())
, use_madv_free(use_madv_free_)
, use_huge_pages(use_huge_pages_)
, rng(randomSeed())
{
if (bytes_per_chunk == 0 || bytes_per_mmap == 0)
throw Exception(ErrorCodes::INVALID_SETTING_VALUE, "Userspace page cache chunk size and mmap size can't be zero.");
if (use_huge_pages)
{
use_huge_pages = false;
bool print_warning = false;
#ifdef OS_LINUX
try
{
ReadBufferFromFile in("/sys/kernel/mm/transparent_hugepage/hpage_pmd_size");
size_t huge_page_size;
readIntText(huge_page_size, in);
if (huge_page_size == 0 || huge_page_size % bytes_per_page != 0)
throw Exception(ErrorCodes::SYSTEM_ERROR, "Invalid huge page size reported by the OS: {}", huge_page_size);
/// THP can be configured to be 2 MiB or 1 GiB in size. 1 GiB is way too big for us.
if (huge_page_size <= (16 << 20))
{
pages_per_big_page = huge_page_size / bytes_per_page;
use_huge_pages = true;
}
else
{
LOG_WARNING(&Poco::Logger::get("PageCache"), "The OS huge page size is too large for our purposes: {} KiB. Using regular pages. Userspace page cache will be relatively slow.", huge_page_size);
}
}
catch (Exception & e)
{
if (e.code() != ErrorCodes::FILE_DOESNT_EXIST)
throw;
print_warning = true;
}
#else
print_warning = true;
#endif
if (print_warning)
LOG_WARNING(&Poco::Logger::get("PageCache"), "The OS doesn't support transparent huge pages. Userspace page cache will be relatively slow.");
}
pages_per_chunk = ((bytes_per_chunk - 1) / (bytes_per_page * pages_per_big_page) + 1) * pages_per_big_page;
chunks_per_mmap_target = (bytes_per_mmap - 1) / (bytes_per_page * pages_per_chunk) + 1;
max_mmaps = (bytes_total - 1) / (bytes_per_page * pages_per_chunk * chunks_per_mmap_target) + 1;
}
PageCache::~PageCache()
{
chassert(getPinnedSize() == 0);
}
size_t PageCache::pageSize() const { return bytes_per_page; }
size_t PageCache::chunkSize() const { return bytes_per_page * pages_per_chunk; }
size_t PageCache::maxChunks() const { return chunks_per_mmap_target * max_mmaps; }
size_t PageCache::getPinnedSize() const
{
std::unique_lock lock(global_mutex);
return (total_chunks - lru.size()) * bytes_per_page * pages_per_chunk;
}
PageCache::MemoryStats PageCache::getResidentSetSize() const
{
MemoryStats stats;
#ifdef OS_LINUX
if (use_madv_free)
{
std::unordered_set<UInt64> cache_mmap_addrs;
for (const auto & m : mmaps)
cache_mmap_addrs.insert(reinterpret_cast<UInt64>(m.ptr));
ReadBufferFromFile in("/proc/self/smaps");
/// Parse the smaps contents, which is text consisting of entries like this:
///
/// 117ba4a00000-117be4a00000 rw-p 00000000 00:00 0
/// Size: 1048576 kB
/// KernelPageSize: 4 kB
/// MMUPageSize: 4 kB
/// Rss: 539516 kB
/// Pss: 539516 kB
/// ...
auto read_token = [&]
{
String res;
while (!in.eof())
{
char c = *in.position();
if (c == '\n' || c == '\t' || c == ' ' || c == '-')
break;
res += c;
++in.position();
}
return res;
};
auto skip_whitespace = [&]
{
while (!in.eof())
{
char c = *in.position();
if (c != ' ' && c != '\t')
break;
++in.position();
}
};
bool current_range_is_cache = false;
size_t total_rss = 0;
size_t total_lazy_free = 0;
while (!in.eof())
{
String s = read_token();
if (!in.eof() && *in.position() == '-')
{
if (s.size() < 16)
s.insert(0, 16 - s.size(), '0');
UInt64 addr = unhexUInt<UInt64>(s.c_str());
current_range_is_cache = cache_mmap_addrs.contains(addr);
}
else if (s == "Rss:" || s == "LazyFree")
{
skip_whitespace();
size_t val;
readIntText(val, in);
skip_whitespace();
String unit = read_token();
if (unit != "kB")
throw Exception(ErrorCodes::SYSTEM_ERROR, "Unexpected units in /proc/self/smaps: {}", unit);
size_t bytes = val * 1024;
if (s == "Rss:")
{
total_rss += bytes;
if (current_range_is_cache)
stats.page_cache_rss += bytes;
}
else
total_lazy_free += bytes;
}
skipToNextLineOrEOF(in);
}
stats.unreclaimable_rss = total_rss - std::min(total_lazy_free, total_rss);
return stats;
}
#endif
stats.page_cache_rss = bytes_per_page * pages_per_chunk * total_chunks;
return stats;
}
PinnedPageChunk PageCache::getOrSet(PageCacheKey key, bool detached_if_missing, bool inject_eviction)
{
PageChunk * chunk;
/// Make sure we increment exactly one of the counters about the fate of a chunk lookup.
bool incremented_profile_events = false;
{
std::unique_lock lock(global_mutex);
auto * it = chunk_by_key.find(key);
if (it == chunk_by_key.end())
{
chunk = getFreeChunk(lock);
chassert(!chunk->key.has_value());
if (!detached_if_missing)
{
chunk->key = key;
chunk_by_key.insert({key, chunk});
}
ProfileEvents::increment(ProfileEvents::PageCacheChunkMisses);
incremented_profile_events = true;
}
else
{
chunk = it->getMapped();
size_t prev_pin_count = chunk->pin_count.fetch_add(1);
if (prev_pin_count == 0)
{
/// Not eligible for LRU eviction while pinned.
chassert(chunk->is_linked());
lru.erase(lru.iterator_to(*chunk));
if (detached_if_missing)
{
/// Peek the first page to see if it's evicted.
/// (Why not use the full probing procedure instead, restoreChunkFromLimbo()?
/// Right here we can't do it because of how the two mutexes are organized.
/// And we want to do the check+detach before unlocking global_mutex, because
/// otherwise we may detach a chunk pinned by someone else, which may be unexpected
/// for that someone else. Or maybe the latter is fine, dropCache() already does it.)
if (chunk->pages_populated.get(0) && reinterpret_cast<volatile std::atomic<char>*>(chunk->data)->load(std::memory_order_relaxed) == 0)
evictChunk(chunk, lock);
}
if (inject_eviction && chunk->key.has_value() && rng() % 10 == 0)
{
/// Simulate eviction of the chunk or some of its pages.
if (rng() % 2 == 0)
evictChunk(chunk, lock);
else
for (size_t i = 0; i < 20; ++i)
chunk->pages_populated.unset(rng() % (chunk->size / chunk->page_size));
}
}
else
{
ProfileEvents::increment(ProfileEvents::PageCacheChunkShared);
incremented_profile_events = true;
}
}
}
{
std::unique_lock chunk_lock(chunk->chunk_mutex);
if (chunk->pages_state == PageChunkState::Limbo)
{
auto [pages_restored, pages_evicted] = restoreChunkFromLimbo(chunk, chunk_lock);
chunk->pages_state = PageChunkState::Stable;
if (!incremented_profile_events)
{
if (pages_evicted == 0)
ProfileEvents::increment(ProfileEvents::PageCacheChunkDataHits);
else if (pages_evicted < pages_restored)
ProfileEvents::increment(ProfileEvents::PageCacheChunkDataPartialHits);
else
ProfileEvents::increment(ProfileEvents::PageCacheChunkDataMisses);
}
}
}
return PinnedPageChunk(this, chunk);
}
void PageCache::removeRef(PageChunk * chunk) noexcept
{
/// Fast path if this is not the last reference.
size_t prev_pin_count = chunk->pin_count.load();
if (prev_pin_count > 1 && chunk->pin_count.compare_exchange_strong(prev_pin_count, prev_pin_count - 1))
return;
{
std::unique_lock lock(global_mutex);
prev_pin_count = chunk->pin_count.fetch_sub(1);
if (prev_pin_count > 1)
return;
chassert(!chunk->is_linked());
if (chunk->key.has_value())
lru.push_back(*chunk);
else
/// Unpinning detached chunk. We'd rather reuse it soon, so put it at the front.
lru.push_front(*chunk);
}
{
std::unique_lock chunk_lock(chunk->chunk_mutex);
/// Need to be extra careful here because we unlocked global_mutex above, so other
/// getOrSet()/removeRef() calls could have happened during this brief period.
if (use_madv_free && chunk->pages_state == PageChunkState::Stable && chunk->pin_count.load() == 0)
{
sendChunkToLimbo(chunk, chunk_lock);
chunk->pages_state = PageChunkState::Limbo;
}
}
}
static void logUnexpectedSyscallError(std::string name)
{
std::string message = fmt::format("{} failed: {}", name, errnoToString());
LOG_WARNING(&Poco::Logger::get("PageCache"), "{}", message);
#if defined(ABORT_ON_LOGICAL_ERROR)
volatile bool true_ = true;
if (true_) // suppress warning about missing [[noreturn]]
abortOnFailedAssertion(message);
#endif
}
void PageCache::sendChunkToLimbo(PageChunk * chunk [[maybe_unused]], std::unique_lock<std::mutex> & /* chunk_mutex */) const noexcept
{
#ifdef MADV_FREE // if we're not on a very old version of Linux
chassert(chunk->size == bytes_per_page * pages_per_chunk);
size_t populated_pages = 0;
size_t populated_big_pages = 0;
for (size_t big_page_idx = 0; big_page_idx < pages_per_chunk / pages_per_big_page; ++big_page_idx)
{
bool big_page_populated = false;
for (size_t sub_idx = 0; sub_idx < pages_per_big_page; ++sub_idx)
{
size_t idx = big_page_idx * pages_per_big_page + sub_idx;
if (!chunk->pages_populated.get(idx))
continue;
big_page_populated = true;
populated_pages += 1;
auto & byte = reinterpret_cast<volatile std::atomic<char> &>(chunk->data[idx * bytes_per_page]);
chunk->first_bit_of_each_page.set(idx, (byte.load(std::memory_order_relaxed) & 1) != 0);
byte.fetch_or(1, std::memory_order_relaxed);
}
if (big_page_populated)
populated_big_pages += 1;
}
int r = madvise(chunk->data, chunk->size, MADV_FREE);
if (r != 0)
logUnexpectedSyscallError("madvise(MADV_FREE)");
ProfileEvents::increment(ProfileEvents::PageCacheBytesUnpinnedRoundedToPages, bytes_per_page * populated_pages);
ProfileEvents::increment(ProfileEvents::PageCacheBytesUnpinnedRoundedToHugePages, bytes_per_page * pages_per_big_page * populated_big_pages);
#endif
}
std::pair<size_t, size_t> PageCache::restoreChunkFromLimbo(PageChunk * chunk, std::unique_lock<std::mutex> & /* chunk_mutex */) const noexcept
{
static_assert(sizeof(std::atomic<char>) == 1, "char is not atomic?");
// Make sure our strategic memory reads/writes are not reordered or optimized out.
auto * data = reinterpret_cast<volatile std::atomic<char> *>(chunk->data);
size_t pages_restored = 0;
size_t pages_evicted = 0;
for (size_t idx = 0; idx < chunk->size / bytes_per_page; ++idx)
{
if (!chunk->pages_populated.get(idx))
continue;
/// After MADV_FREE, it's guaranteed that:
/// * writing to the page makes it non-freeable again (reading doesn't),
/// * after the write, the page contents are either fully intact or fully zero-filled,
/// * even before the write, reads return either intact data (if the page wasn't freed) or zeroes (if it was, and the read page-faulted).
/// (And when doing the write there's no way to tell whether it page-faulted or not, AFAICT; that would make our life much easier!)
///
/// With that in mind, we do the following dance to bring the page back from the MADV_FREE limbo:
/// 0. [in advance] Before doing MADV_FREE, make sure the page's first byte is not zero.
/// We do it by setting the lowest bit of the first byte to 1, after saving the original value of that bit into a bitset.
/// 1. Read the second byte.
/// 2. Write the second byte back. This makes the page non-freeable.
/// 3. Read the first byte.
/// 3a. If it's zero, the page was freed.
/// Set the second byte to 0, to keep the buffer zero-filled if the page was freed
/// between steps 1 and 2.
/// 3b. If it's nonzero, the page is intact.
/// Restore the lowest bit of the first byte to the saved original value from the bitset.
char second_byte = data[idx * bytes_per_page + 1].load(std::memory_order_relaxed);
data[idx * bytes_per_page + 1].store(second_byte, std::memory_order_relaxed);
char first_byte = data[idx * bytes_per_page].load(std::memory_order_relaxed);
if (first_byte == 0)
{
pages_evicted += 1;
data[idx * bytes_per_page + 1].store(0, std::memory_order_relaxed);
chunk->pages_populated.unset(idx);
}
else
{
pages_restored += 1;
chassert(first_byte & 1);
if (!chunk->first_bit_of_each_page.get(idx))
data[idx * bytes_per_page].fetch_and(~1, std::memory_order_relaxed);
}
}
return {pages_restored, pages_evicted};
}
PageChunk * PageCache::getFreeChunk(std::unique_lock<std::mutex> & lock /* global_mutex */)
{
if (lru.empty() || (mmaps.size() < max_mmaps && lru.front().key.has_value()))
addMmap(lock);
if (lru.empty())
throw Exception(ErrorCodes::MEMORY_LIMIT_EXCEEDED, "All chunks in the entire page cache ({:.3} GiB) are pinned.",
bytes_per_page * pages_per_chunk * total_chunks * 1. / (1l << 30));
PageChunk * chunk = &lru.front();
lru.erase(lru.iterator_to(*chunk));
size_t prev_pin_count = chunk->pin_count.fetch_add(1);
chassert(prev_pin_count == 0);
evictChunk(chunk, lock);
return chunk;
}
void PageCache::evictChunk(PageChunk * chunk, std::unique_lock<std::mutex> & /* global_mutex */)
{
if (chunk->key.has_value())
{
size_t erased = chunk_by_key.erase(chunk->key.value());
chassert(erased);
chunk->key.reset();
}
chunk->state.reset();
/// This is tricky. We're not holding the chunk_mutex, so another thread might be running
/// sendChunkToLimbo() or even restoreChunkFromLimbo() on this chunk right now.
///
/// Nevertheless, it's correct and sufficient to clear pages_populated here because sendChunkToLimbo()
/// and restoreChunkFromLimbo() only touch pages_populated (only unsetting the bits),
/// first_bit_of_each_page, and the data; and we don't care about first_bit_of_each_page and the data.
///
/// This is precarious, but I don't have better ideas. Note that this clearing (or something else)
/// must be done before unlocking the global_mutex because otherwise another call to getOrSet() might
/// return this chunk before we clear it.
chunk->pages_populated.unsetAll();
}
void PageCache::addMmap(std::unique_lock<std::mutex> & /* global_mutex */)
{
/// ASLR by hand.
void * address_hint = reinterpret_cast<void *>(std::uniform_int_distribution<size_t>(0x100000000000UL, 0x700000000000UL)(rng));
mmaps.emplace_back(bytes_per_page, pages_per_chunk, pages_per_big_page, chunks_per_mmap_target, address_hint, use_huge_pages);
size_t num_chunks = mmaps.back().num_chunks;
total_chunks += num_chunks;
for (size_t i = 0; i < num_chunks; ++i)
/// Link in reverse order, so they get assigned in increasing order. Not important, just seems nice.
lru.push_front(mmaps.back().chunks[num_chunks - 1 - i]);
}
void PageCache::dropCache()
{
std::unique_lock lock(global_mutex);
/// Detach and free unpinned chunks.
bool logged_error = false;
for (PageChunk & chunk : lru)
{
evictChunk(&chunk, lock);
if (use_madv_free)
{
/// This might happen in parallel with sendChunkToLimbo() or restoreChunkFromLimbo(), but it's ok.
int r = madvise(chunk.data, chunk.size, MADV_DONTNEED);
if (r != 0 && !logged_error)
{
logUnexpectedSyscallError("madvise(MADV_DONTNEED)");
logged_error = true;
}
}
}
/// Detach pinned chunks.
for (auto [key, chunk] : chunk_by_key)
{
chassert(chunk->key == key);
chassert(chunk->pin_count > 0); // otherwise it would have been evicted above
chunk->key.reset();
}
chunk_by_key.clear();
}
PageCache::Mmap::Mmap(size_t bytes_per_page_, size_t pages_per_chunk_, size_t pages_per_big_page_, size_t num_chunks_, void * address_hint, bool use_huge_pages_)
{
num_chunks = num_chunks_;
size = bytes_per_page_ * pages_per_chunk_ * num_chunks;
size_t alignment = bytes_per_page_ * pages_per_big_page_;
address_hint = reinterpret_cast<void*>(reinterpret_cast<UInt64>(address_hint) / alignment * alignment);
auto temp_chunks = std::make_unique<PageChunk[]>(num_chunks);
int flags = MAP_PRIVATE | MAP_ANONYMOUS;
#ifdef OS_LINUX
flags |= MAP_NORESERVE;
#endif
ptr = mmap(address_hint, size, PROT_READ | PROT_WRITE, flags, -1, 0);
if (MAP_FAILED == ptr)
throw ErrnoException(ErrorCodes::CANNOT_ALLOCATE_MEMORY, fmt::format("Cannot mmap {}.", ReadableSize(size)));
if (reinterpret_cast<UInt64>(ptr) % bytes_per_page_ != 0)
{
munmap(ptr, size);
throw Exception(ErrorCodes::SYSTEM_ERROR, "mmap returned unaligned address: {}", ptr);
}
void * chunks_start = ptr;
#ifdef OS_LINUX
if (madvise(ptr, size, MADV_DONTDUMP) != 0)
logUnexpectedSyscallError("madvise(MADV_DONTDUMP)");
if (madvise(ptr, size, MADV_DONTFORK) != 0)
logUnexpectedSyscallError("madvise(MADV_DONTFORK)");
if (use_huge_pages_)
{
if (reinterpret_cast<UInt64>(ptr) % alignment != 0)
{
LOG_DEBUG(&Poco::Logger::get("PageCache"), "mmap() returned address not aligned on huge page boundary.");
chunks_start = reinterpret_cast<void*>((reinterpret_cast<UInt64>(ptr) / alignment + 1) * alignment);
chassert(reinterpret_cast<UInt64>(chunks_start) % alignment == 0);
num_chunks -= 1;
}
if (madvise(ptr, size, MADV_HUGEPAGE) != 0)
LOG_WARNING(&Poco::Logger::get("PageCache"),
"madvise(MADV_HUGEPAGE) failed: {}. Userspace page cache will be relatively slow.", errnoToString());
}
#else
(void)use_huge_pages_;
#endif
chunks = std::move(temp_chunks);
for (size_t i = 0; i < num_chunks; ++i)
{
PageChunk * chunk = &chunks[i];
chunk->data = reinterpret_cast<char *>(chunks_start) + bytes_per_page_ * pages_per_chunk_ * i;
chunk->size = bytes_per_page_ * pages_per_chunk_;
chunk->page_size = bytes_per_page_;
chunk->big_page_size = bytes_per_page_ * pages_per_big_page_;
chunk->pages_populated.init(pages_per_chunk_);
chunk->first_bit_of_each_page.init(pages_per_chunk_);
}
}
PageCache::Mmap::Mmap(Mmap && m) noexcept : ptr(std::exchange(m.ptr, nullptr)), size(std::exchange(m.size, 0)), chunks(std::move(m.chunks)), num_chunks(std::exchange(m.num_chunks, 0)) {}
PageCache::Mmap::~Mmap() noexcept
{
if (ptr && 0 != munmap(ptr, size))
logUnexpectedSyscallError("munmap");
}
void FileChunkState::reset() {}
PageCacheKey FileChunkAddress::hash() const
{
SipHash hash(offset);
hash.update(path.data(), path.size());
if (!file_version.empty())
{
hash.update("\0", 1);
hash.update(file_version.data(), file_version.size());
}
return hash.get128();
}
std::string FileChunkAddress::toString() const
{
return fmt::format("{}:{}{}{}", path, offset, file_version.empty() ? "" : ":", file_version);
}
#pragma clang diagnostic pop
}

299
src/Common/PageCache.h Normal file
View File

@ -0,0 +1,299 @@
#pragma once
#include <boost/intrusive/list.hpp>
#include <pcg_random.hpp>
#include <Common/randomSeed.h>
#include <Core/Types.h>
#include <Common/HashTable/HashMap.h>
/// "Userspace page cache"
/// A cache for contents of remote files.
/// Uses MADV_FREE to allow Linux to evict pages from our cache under memory pressure.
/// Typically takes up almost all of the available memory, similar to the actual page cache.
///
/// Intended for caching data retrieved from distributed cache, but can be used for other things too,
/// just replace FileChunkState with a discriminated union, or something, if needed.
///
/// There are two fixed-size units of caching here:
/// * OS pages, typically 4 KiB each.
/// * Page chunks, 2 MiB each (configurable with page_cache_block_size setting).
///
/// Each file is logically split into aligned 2 MiB blocks, which are mapped to page chunks inside the cache.
/// They are cached independently from each other.
///
/// Each page chunk has a contiguous 2 MiB buffer that can be pinned and directly used e.g. by ReadBuffers.
/// While pinned (by at least one PinnedPageChunk), the pages are not reclaimable by the OS.
///
/// Inside each page chunk, any subset of pages may be populated. Unpopulated pages may or not be
/// mapped to any physical RAM. We maintain a bitmask that keeps track of which pages are populated.
/// Pages become unpopulated if they're reclaimed by the OS (when the page chunk is not pinned),
/// or if we just never populate them in the first place (e.g. if a file is shorter than 2 MiB we
/// still create a 2 MiB page chunk, but use only a prefix of it).
///
/// There are two separate eviction mechanisms at play:
/// * LRU eviction of page chunks in PageCache.
/// * OS reclaiming pages on memory pressure. We have no control over the eviction policy.
/// It probably picks the pages in the same order in which they were marked with MADV_FREE, so
/// effectively in the same LRU order as our policy in PageCache.
/// When using PageCache in oversubscribed fashion, using all available memory and relying on OS eviction,
/// the PageCache's eviction policy mostly doesn't matter. It just needs to be similar enough to the OS's
/// policy that we rarely evict chunks with unevicted pages.
///
/// We mmap memory directly instead of using allocator because this enables:
/// * knowing how much RAM the cache is using, via /proc/self/smaps,
/// * MADV_HUGEPAGE (use transparent huge pages - this makes MADV_FREE 10x less slow),
/// * MAP_NORESERVE (don't reserve swap space - otherwise large mmaps usually fail),
/// * MADV_DONTDUMP (don't include in core dumps),
/// * page-aligned addresses without padding.
///
/// madvise(MADV_FREE) call is slow: ~6 GiB/s (doesn't scale with more threads). Enabling transparent
/// huge pages (MADV_HUGEPAGE) makes it 10x less slow, so we do that. That makes the physical RAM allocation
/// work at 2 MiB granularity instead of 4 KiB, so the cache becomes less suitable for small files.
/// If this turns out to be a problem, we may consider allowing different mmaps to have different flags,
/// some having no huge pages.
/// Note that we do our bookkeeping at small-page granularity even if huge pages are enabled.
///
/// It's unfortunate that Linux's MADV_FREE eviction doesn't use the two-list strategy like the real
/// page cache (IIUC, MADV_FREE puts the pages at the head of the inactive list, and they can never
/// get to the active list).
/// If this turns out to be a problem, we could make PageCache do chunk eviction based on observed
/// system memory usage, so that most eviction is done by us, and the MADV_FREE eviction kicks in
/// only as a last resort. Then we can make PageCache's eviction policy arbitrarily more sophisticated.
namespace DB
{
/// Hash of FileChunkAddress.
using PageCacheKey = UInt128;
/// Identifies a chunk of a file or object.
/// We assume that contents of such file/object don't change (without file_version changing), so
/// cache invalidation is needed.
struct FileChunkAddress
{
/// Path, usually prefixed with storage system name and anything else needed to make it unique.
/// E.g. "s3:<bucket>/<path>"
std::string path;
/// Optional string with ETag, or file modification time, or anything else.
std::string file_version;
size_t offset = 0;
PageCacheKey hash() const;
std::string toString() const;
};
struct AtomicBitSet
{
size_t n = 0;
std::unique_ptr<std::atomic<UInt8>[]> v;
AtomicBitSet();
void init(size_t n);
bool get(size_t i) const;
bool any() const;
/// These return true if the bit was changed, false if it already had the target value.
/// (These methods are logically not const, but clang insists that I make them const, and
/// '#pragma clang diagnostic ignored' doesn't seem to work.)
bool set(size_t i) const;
bool set(size_t i, bool val) const;
bool unset(size_t i) const;
void unsetAll() const;
};
enum class PageChunkState
{
/// Pages are not reclaimable by the OS, the buffer has correct contents.
Stable,
/// Pages are reclaimable by the OS, the buffer contents are altered (first bit of each page set to 1).
Limbo,
};
/// (This is a separate struct just in case we want to use this cache for other things in future.
/// Then this struct would be the customization point, while the rest of PageChunk can stay unchanged.)
struct FileChunkState
{
std::mutex download_mutex;
void reset();
};
using PageChunkLRUListHook = boost::intrusive::list_base_hook<>;
/// Cache entry.
struct PageChunk : public PageChunkLRUListHook
{
char * data;
size_t size; // in bytes
/// Page size for use in pages_populated and first_bit_of_each_page. Same as PageCache::pageSize().
size_t page_size;
/// Actual eviction granularity. Just for information. If huge pages are used, huge page size, otherwise page_size.
size_t big_page_size;
mutable FileChunkState state;
AtomicBitSet pages_populated;
private:
friend class PinnedPageChunk;
friend class PageCache;
/// If nullopt, the chunk is "detached", i.e. not associated with any key.
/// Detached chunks may still be pinned. Chunk may get detached even while pinned, in particular when dropping cache.
/// Protected by global_mutex.
std::optional<PageCacheKey> key;
/// Refcount for usage of this chunk. When zero, the pages are reclaimable by the OS, and
/// the PageChunk itself is evictable (linked into PageCache::lru).
std::atomic<size_t> pin_count {0};
/// Bit mask containing the first bit of data from each page. Needed for the weird probing procedure when un-MADV_FREE-ing the pages.
AtomicBitSet first_bit_of_each_page;
/// Locked when changing pages_state, along with the corresponding expensive MADV_FREE/un-MADV_FREE operation.
mutable std::mutex chunk_mutex;
/// Normally pin_count == 0 <=> state == PageChunkState::Limbo,
/// pin_count > 0 <=> state == PageChunkState::Stable.
/// This separate field is needed because of synchronization: pin_count is changed with global_mutex locked,
/// this field is changed with chunk_mutex locked, and we never have to lock both mutexes at once.
PageChunkState pages_state = PageChunkState::Stable;
};
class PageCache;
/// Handle for a cache entry. Neither the entry nor its pages can get evicted while there's at least one PinnedPageChunk pointing to it.
class PinnedPageChunk
{
public:
const PageChunk * getChunk() const;
/// Sets the bit in pages_populated. Returns true if it actually changed (i.e. was previously 0).
bool markPagePopulated(size_t page_idx);
/// Calls markPagePopulated() for pages 0..ceil(bytes/page_size).
void markPrefixPopulated(size_t bytes);
bool isPrefixPopulated(size_t bytes) const;
PinnedPageChunk() = default;
~PinnedPageChunk() noexcept;
PinnedPageChunk(PinnedPageChunk &&) noexcept;
PinnedPageChunk & operator=(PinnedPageChunk &&) noexcept;
private:
friend class PageCache;
PageCache * cache = nullptr;
PageChunk * chunk = nullptr;
PinnedPageChunk(PageCache * cache_, PageChunk * chunk_) noexcept;
};
class PageCache
{
public:
PageCache(size_t bytes_per_chunk, size_t bytes_per_mmap, size_t bytes_total, bool use_madv_free, bool use_huge_pages);
~PageCache();
/// Get or insert a chunk for the given key.
///
/// If detached_if_missing = true, and the key is not present in the cache, the returned chunk
/// won't be associated with the key and will be evicted as soon as it's unpinned.
/// It's like "get if exists, otherwise return null", but instead of null we return a usable
/// temporary buffer, for convenience. Pinning and page eviction make the story more complicated:
/// * If the chunk for this key is pinned, we return it even if it's not fully populated
/// (because PageCache doesn't know what "fully populated" means).
/// * If the chunk exists, but some of its pages were evicted, we detach it. (Currently we only
/// check the first page here.)
PinnedPageChunk getOrSet(PageCacheKey key, bool detached_if_missing, bool inject_eviction);
/// OS page size, e.g. 4 KiB on x86, 4 KiB or 64 KiB on aarch64.
///
/// If transparent huge pages are enabled, this is still the regular page size, and all our bookkeeping
/// is still based on regular page size (e.g. pages_populated), because (a) it's cheap anyway,
/// and (b) I'm not sure if Linux guarantees that MADV_FREE reclamation always happens at huge page
/// granularity, and wouldn't want to rely on this even if it does.
size_t pageSize() const;
size_t chunkSize() const;
size_t maxChunks() const;
struct MemoryStats
{
/// How many bytes of actual RAM are used for the cache pages. Doesn't include metadata
/// and overhead (e.g. PageChunk structs).
size_t page_cache_rss = 0;
/// Resident set size for the whole process, excluding any MADV_FREE pages (PageCache's or not).
/// This can be used as a more useful memory usage number for clickhouse server, instead of RSS.
/// Populated only if MADV_FREE is used, otherwise zero.
std::optional<size_t> unreclaimable_rss;
};
/// Reads /proc/self/smaps, so not very fast.
MemoryStats getResidentSetSize() const;
/// Total length of memory ranges currently pinned by PinnedPageChunk-s, including unpopulated pages.
size_t getPinnedSize() const;
/// Clears the key -> chunk mapping. Frees memory (MADV_DONTNEED) of all chunks that are not pinned.
/// Doesn't unmap any virtual memory. Detaches but doesn't free the pinned chunks.
/// Locks the global mutex for the duration of the operation, which may block queries for hundreds of milliseconds.
void dropCache();
private:
friend class PinnedPageChunk;
struct Mmap
{
void * ptr = nullptr;
size_t size = 0;
std::unique_ptr<PageChunk[]> chunks;
size_t num_chunks = 0; // might be smaller than chunks_per_mmap_target because of alignment
Mmap(Mmap &&) noexcept;
Mmap(size_t bytes_per_page, size_t pages_per_chunk, size_t pages_per_big_page, size_t num_chunks, void * address_hint, bool use_huge_pages_);
~Mmap() noexcept;
};
size_t bytes_per_page;
size_t pages_per_chunk;
size_t chunks_per_mmap_target;
size_t max_mmaps;
size_t pages_per_big_page = 1; // if huge pages are used, huge_page_size/page_size, otherwise 1
bool use_madv_free = true;
bool use_huge_pages = true;
mutable std::mutex global_mutex;
pcg64 rng;
std::vector<Mmap> mmaps;
size_t total_chunks = 0;
/// All non-pinned chunks, including ones not assigned to any file. Least recently used is begin().
boost::intrusive::list<PageChunk, boost::intrusive::base_hook<PageChunkLRUListHook>, boost::intrusive::constant_time_size<true>> lru;
HashMap<PageCacheKey, PageChunk *> chunk_by_key;
/// Get a usable chunk, doing eviction or allocation if needed.
/// Caller is responsible for clearing pages_populated.
PageChunk * getFreeChunk(std::unique_lock<std::mutex> & /* global_mutex */);
void addMmap(std::unique_lock<std::mutex> & /* global_mutex */);
void evictChunk(PageChunk * chunk, std::unique_lock<std::mutex> & /* global_mutex */);
void removeRef(PageChunk * chunk) noexcept;
/// These may run in parallel with getFreeChunk(), so be very careful about which fields of the PageChunk we touch here.
void sendChunkToLimbo(PageChunk * chunk, std::unique_lock<std::mutex> & /* chunk_mutex */) const noexcept;
/// Returns {pages_restored, pages_evicted}.
std::pair<size_t, size_t> restoreChunkFromLimbo(PageChunk * chunk, std::unique_lock<std::mutex> & /* chunk_mutex */) const noexcept;
};
using PageCachePtr = std::shared_ptr<PageCache>;
}

View File

@ -63,6 +63,15 @@
M(MarkCacheMisses, "Number of times an entry has not been found in the mark cache, so we had to load a mark file in memory, which is a costly operation, adding to query latency.") \
M(QueryCacheHits, "Number of times a query result has been found in the query cache (and query computation was avoided). Only updated for SELECT queries with SETTING use_query_cache = 1.") \
M(QueryCacheMisses, "Number of times a query result has not been found in the query cache (and required query computation). Only updated for SELECT queries with SETTING use_query_cache = 1.") \
/* Each page cache chunk access increments exactly one of the following 5 PageCacheChunk* counters. */ \
/* Something like hit rate: (PageCacheChunkShared + PageCacheChunkDataHits) / [sum of all 5]. */ \
M(PageCacheChunkMisses, "Number of times a chunk has not been found in the userspace page cache.") \
M(PageCacheChunkShared, "Number of times a chunk has been found in the userspace page cache, already in use by another thread.") \
M(PageCacheChunkDataHits, "Number of times a chunk has been found in the userspace page cache, not in use, with all pages intact.") \
M(PageCacheChunkDataPartialHits, "Number of times a chunk has been found in the userspace page cache, not in use, but some of its pages were evicted by the OS.") \
M(PageCacheChunkDataMisses, "Number of times a chunk has been found in the userspace page cache, not in use, but all its pages were evicted by the OS.") \
M(PageCacheBytesUnpinnedRoundedToPages, "Total size of populated pages in chunks that became evictable in PageCache. Rounded up to whole pages.") \
M(PageCacheBytesUnpinnedRoundedToHugePages, "See PageCacheBytesUnpinnedRoundedToPages, but rounded to huge pages. Use the ratio between the two as a measure of memory waste from using huge pages.") \
M(CreatedReadBufferOrdinary, "Number of times ordinary read buffer was created for reading data (while choosing among other read methods).") \
M(CreatedReadBufferDirectIO, "Number of times a read buffer with O_DIRECT was created for reading data (while choosing among other read methods).") \
M(CreatedReadBufferDirectIOFailed, "Number of times a read buffer with O_DIRECT was attempted to be created for reading data (while choosing among other read methods), but the OS did not allow it (due to lack of filesystem support or other reasons) and we fallen back to the ordinary reading method.") \

View File

@ -70,6 +70,15 @@ static constexpr auto DBMS_DEFAULT_MAX_QUERY_SIZE = 262144;
/// Max depth of hierarchical dictionary
static constexpr auto DBMS_HIERARCHICAL_DICTIONARY_MAX_DEPTH = 1000;
#ifdef OS_LINUX
#define DBMS_DEFAULT_PAGE_CACHE_USE_MADV_FREE true
#else
/// On Mac OS, MADV_FREE is not lazy, so page_cache_use_madv_free should be disabled.
/// On FreeBSD, it may work but we haven't tested it.
#define DBMS_DEFAULT_PAGE_CACHE_USE_MADV_FREE false
#endif
/// Default maximum (total and entry) sizes and policies of various caches
static constexpr auto DEFAULT_UNCOMPRESSED_CACHE_POLICY = "SLRU";
static constexpr auto DEFAULT_UNCOMPRESSED_CACHE_MAX_SIZE = 0_MiB;

View File

@ -65,7 +65,7 @@ namespace DB
M(UInt64, max_concurrent_insert_queries, 0, "Maximum number of concurrently INSERT queries. Zero means unlimited.", 0) \
M(UInt64, max_concurrent_select_queries, 0, "Maximum number of concurrently SELECT queries. Zero means unlimited.", 0) \
\
M(Double, cache_size_to_ram_max_ratio, 0.5, "Set cache size ro RAM max ratio. Allows to lower cache size on low-memory systems.", 0) \
M(Double, cache_size_to_ram_max_ratio, 0.5, "Set cache size to RAM max ratio. Allows to lower cache size on low-memory systems.", 0) \
M(String, uncompressed_cache_policy, DEFAULT_UNCOMPRESSED_CACHE_POLICY, "Uncompressed cache policy name.", 0) \
M(UInt64, uncompressed_cache_size, DEFAULT_UNCOMPRESSED_CACHE_MAX_SIZE, "Size of cache for uncompressed blocks. Zero means disabled.", 0) \
M(Double, uncompressed_cache_size_ratio, DEFAULT_UNCOMPRESSED_CACHE_SIZE_RATIO, "The size of the protected queue in the uncompressed cache relative to the cache's total size.", 0) \
@ -78,6 +78,11 @@ namespace DB
M(String, index_mark_cache_policy, DEFAULT_INDEX_MARK_CACHE_POLICY, "Secondary index mark cache policy name.", 0) \
M(UInt64, index_mark_cache_size, DEFAULT_INDEX_MARK_CACHE_MAX_SIZE, "Size of cache for secondary index marks. Zero means disabled.", 0) \
M(Double, index_mark_cache_size_ratio, DEFAULT_INDEX_MARK_CACHE_SIZE_RATIO, "The size of the protected queue in the secondary index mark cache relative to the cache's total size.", 0) \
M(UInt64, page_cache_chunk_size, 2 << 20, "Bytes per chunk in userspace page cache. Rounded up to a multiple of page size (typically 4 KiB) or huge page size (typically 2 MiB, only if page_cache_use_thp is enabled).", 0) \
M(UInt64, page_cache_mmap_size, 1 << 30, "Bytes per memory mapping in userspace page cache. Not important.", 0) \
M(UInt64, page_cache_size, 10ul << 30, "Amount of virtual memory to map for userspace page cache. If page_cache_use_madv_free is enabled, it's recommended to set this higher than the machine's RAM size. Use 0 to disable userspace page cache.", 0) \
M(Bool, page_cache_use_madv_free, DBMS_DEFAULT_PAGE_CACHE_USE_MADV_FREE, "If true, the userspace page cache will allow the OS to automatically reclaim memory from the cache on memory pressure (using MADV_FREE).", 0) \
M(Bool, page_cache_use_transparent_huge_pages, true, "Userspace will attempt to use transparent huge pages on Linux. This is best-effort.", 0) \
M(UInt64, mmap_cache_size, DEFAULT_MMAP_CACHE_MAX_SIZE, "A cache for mmapped files.", 0) \
\
M(Bool, disable_internal_dns_cache, false, "Disable internal DNS caching at all.", 0) \

View File

@ -777,6 +777,10 @@ class IColumn;
M(Bool, throw_on_error_from_cache_on_write_operations, false, "Ignore error from cache when caching on write operations (INSERT, merges)", 0) \
M(UInt64, filesystem_cache_segments_batch_size, 20, "Limit on size of a single batch of file segments that a read buffer can request from cache. Too low value will lead to excessive requests to cache, too large may slow down eviction from cache", 0) \
\
M(Bool, use_page_cache_for_disks_without_file_cache, false, "Use userspace page cache for remote disks that don't have filesystem cache enabled.", 0) \
M(Bool, read_from_page_cache_if_exists_otherwise_bypass_cache, false, "Use userspace page cache in passive mode, similar to read_from_filesystem_cache_if_exists_otherwise_bypass_cache.", 0) \
M(Bool, page_cache_inject_eviction, false, "Userspace page cache will sometimes invalidate some pages at random. Intended for testing.", 0) \
\
M(Bool, load_marks_asynchronously, false, "Load MergeTree marks asynchronously", 0) \
M(Bool, enable_filesystem_read_prefetches_log, false, "Log to system.filesystem prefetch_log during query. Should be used only for testing or debugging, not recommended to be turned on by default", 0) \
M(Bool, allow_prefetched_read_pool_for_remote_filesystem, true, "Prefer prefetched threadpool if all parts are on remote filesystem", 0) \

View File

@ -114,6 +114,9 @@ static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> sett
{"default_normal_view_sql_security", "INVOKER", "INVOKER", "Allows to set default `SQL SECURITY` option while creating a normal view"},
{"mysql_map_string_to_text_in_show_columns", false, true, "Reduce the configuration effort to connect ClickHouse with BI tools."},
{"mysql_map_fixed_string_to_text_in_show_columns", false, true, "Reduce the configuration effort to connect ClickHouse with BI tools."},
{"use_page_cache_for_disks_without_file_cache", false, false, "Added userspace page cache"},
{"read_from_page_cache_if_exists_otherwise_bypass_cache", false, false, "Added userspace page cache"},
{"page_cache_inject_eviction", false, false, "Added userspace page cache"},
}},
{"24.1", {{"print_pretty_type_names", false, true, "Better user experience."},
{"input_format_json_read_bools_as_strings", false, true, "Allow to read bools as strings in JSON formats by default"},

View File

@ -129,6 +129,7 @@ void AsynchronousBoundedReadBuffer::setReadUntilPosition(size_t position)
/// new read until position is after the current position in the working buffer
file_offset_of_buffer_end = position;
working_buffer.resize(working_buffer.size() - (file_offset_of_buffer_end - position));
pos = std::min(pos, working_buffer.end());
}
else
{
@ -235,9 +236,6 @@ bool AsynchronousBoundedReadBuffer::nextImpl()
file_offset_of_buffer_end = impl->getFileOffsetOfBufferEnd();
/// In case of multiple files for the same file in clickhouse (i.e. log family)
/// file_offset_of_buffer_end will not match getImplementationBufferOffset()
/// so we use [impl->getImplementationBufferOffset(), impl->getFileSize()]
chassert(file_offset_of_buffer_end <= impl->getFileSize());
if (read_until_position && (file_offset_of_buffer_end > *read_until_position))
@ -264,7 +262,7 @@ off_t AsynchronousBoundedReadBuffer::seek(off_t offset, int whence)
size_t new_pos;
if (whence == SEEK_SET)
{
assert(offset >= 0);
chassert(offset >= 0);
new_pos = offset;
}
else if (whence == SEEK_CUR)
@ -290,8 +288,8 @@ off_t AsynchronousBoundedReadBuffer::seek(off_t offset, int whence)
/// Position is still inside the buffer.
/// Probably it is at the end of the buffer - then we will load data on the following 'next' call.
pos = working_buffer.end() - file_offset_of_buffer_end + new_pos;
assert(pos >= working_buffer.begin());
assert(pos <= working_buffer.end());
chassert(pos >= working_buffer.begin());
chassert(pos <= working_buffer.end());
return new_pos;
}
@ -317,7 +315,7 @@ off_t AsynchronousBoundedReadBuffer::seek(off_t offset, int whence)
break;
}
assert(!prefetch_future.valid());
chassert(!prefetch_future.valid());
/// First reset the buffer so the next read will fetch new data to the buffer.
resetWorkingBuffer();

View File

@ -1215,7 +1215,7 @@ size_t CachedOnDiskReadBufferFromFile::getRemainingSizeToRead()
void CachedOnDiskReadBufferFromFile::setReadUntilPosition(size_t position)
{
if (!allow_seeks_after_first_read)
if (initialized && !allow_seeks_after_first_read)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Method `setReadUntilPosition()` not allowed");
if (read_until_position == position)

View File

@ -5,6 +5,7 @@
#include <Disks/IO/CachedOnDiskReadBufferFromFile.h>
#include <Disks/ObjectStorages/Cached/CachedObjectStorage.h>
#include <Interpreters/Cache/FileCache.h>
#include <IO/CachedInMemoryReadBufferFromFile.h>
#include <IO/ReadSettings.h>
#include <IO/SwapHelper.h>
#include <Interpreters/FilesystemCacheLog.h>
@ -16,12 +17,16 @@ using namespace DB;
namespace
{
bool withCache(const ReadSettings & settings)
bool withFileCache(const ReadSettings & settings)
{
return settings.remote_fs_cache && settings.enable_filesystem_cache
&& (!CurrentThread::getQueryId().empty() || settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache
|| !settings.avoid_readthrough_cache_outside_query_context);
}
bool withPageCache(const ReadSettings & settings, bool with_file_cache)
{
return settings.page_cache && !with_file_cache && settings.use_page_cache_for_disks_without_file_cache;
}
}
namespace DB
@ -34,7 +39,7 @@ namespace ErrorCodes
size_t chooseBufferSizeForRemoteReading(const DB::ReadSettings & settings, size_t file_size)
{
/// Only when cache is used we could download bigger portions of FileSegments than what we actually gonna read within particular task.
if (!withCache(settings))
if (!withFileCache(settings))
return settings.remote_fs_buffer_size;
/// Buffers used for prefetch and pre-download better to have enough size, but not bigger than the whole file.
@ -44,27 +49,30 @@ size_t chooseBufferSizeForRemoteReading(const DB::ReadSettings & settings, size_
ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(
ReadBufferCreator && read_buffer_creator_,
const StoredObjects & blobs_to_read_,
const std::string & cache_path_prefix_,
const ReadSettings & settings_,
std::shared_ptr<FilesystemCacheLog> cache_log_,
bool use_external_buffer_)
: ReadBufferFromFileBase(
use_external_buffer_ ? 0 : chooseBufferSizeForRemoteReading(settings_, getTotalSize(blobs_to_read_)), nullptr, 0)
: ReadBufferFromFileBase(use_external_buffer_ ? 0 : chooseBufferSizeForRemoteReading(
settings_, getTotalSize(blobs_to_read_)), nullptr, 0)
, settings(settings_)
, blobs_to_read(blobs_to_read_)
, read_buffer_creator(std::move(read_buffer_creator_))
, cache_path_prefix(cache_path_prefix_)
, cache_log(settings.enable_filesystem_cache_log ? cache_log_ : nullptr)
, query_id(CurrentThread::getQueryId())
, use_external_buffer(use_external_buffer_)
, with_cache(withCache(settings))
, with_file_cache(withFileCache(settings))
, with_page_cache(withPageCache(settings, with_file_cache))
, log(getLogger("ReadBufferFromRemoteFSGather"))
{
if (!blobs_to_read.empty())
current_object = blobs_to_read.front();
}
SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(const StoredObject & object)
SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(const StoredObject & object, size_t start_offset)
{
if (current_buf && !with_cache)
if (current_buf && !with_file_cache)
{
appendUncachedReadInfo();
}
@ -72,30 +80,45 @@ SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(c
current_object = object;
const auto & object_path = object.remote_path;
size_t current_read_until_position = read_until_position ? read_until_position : object.bytes_size;
auto current_read_buffer_creator = [=, this]() { return read_buffer_creator(object_path, current_read_until_position); };
std::unique_ptr<ReadBufferFromFileBase> buf;
#ifndef CLICKHOUSE_KEEPER_STANDALONE_BUILD
if (with_cache)
if (with_file_cache)
{
auto cache_key = settings.remote_fs_cache->createKeyForPath(object_path);
return std::make_shared<CachedOnDiskReadBufferFromFile>(
buf = std::make_unique<CachedOnDiskReadBufferFromFile>(
object_path,
cache_key,
settings.remote_fs_cache,
FileCache::getCommonUser(),
std::move(current_read_buffer_creator),
[=, this]() { return read_buffer_creator(/* restricted_seek */true, object_path); },
settings,
query_id,
object.bytes_size,
/* allow_seeks */false,
/* use_external_buffer */true,
read_until_position ? std::optional<size_t>(read_until_position) : std::nullopt,
/* read_until_position */std::nullopt,
cache_log);
}
#endif
return current_read_buffer_creator();
/// Can't wrap CachedOnDiskReadBufferFromFile in CachedInMemoryReadBufferFromFile because the
/// former doesn't support seeks.
if (with_page_cache && !buf)
{
auto inner = read_buffer_creator(/* restricted_seek */false, object_path);
auto cache_key = FileChunkAddress { .path = cache_path_prefix + object_path };
buf = std::make_unique<CachedInMemoryReadBufferFromFile>(
cache_key, settings.page_cache, std::move(inner), settings);
}
if (!buf)
buf = read_buffer_creator(/* restricted_seek */true, object_path);
if (read_until_position > start_offset && read_until_position < start_offset + object.bytes_size)
buf->setReadUntilPosition(read_until_position - start_offset);
return buf;
}
void ReadBufferFromRemoteFSGather::appendUncachedReadInfo()
@ -124,12 +147,12 @@ void ReadBufferFromRemoteFSGather::initialize()
return;
/// One clickhouse file can be split into multiple files in remote fs.
auto current_buf_offset = file_offset_of_buffer_end;
size_t start_offset = 0;
for (size_t i = 0; i < blobs_to_read.size(); ++i)
{
const auto & object = blobs_to_read[i];
if (object.bytes_size > current_buf_offset)
if (start_offset + object.bytes_size > file_offset_of_buffer_end)
{
LOG_TEST(log, "Reading from file: {} ({})", object.remote_path, object.local_path);
@ -137,14 +160,14 @@ void ReadBufferFromRemoteFSGather::initialize()
if (!current_buf || current_buf_idx != i)
{
current_buf_idx = i;
current_buf = createImplementationBuffer(object);
current_buf = createImplementationBuffer(object, start_offset);
}
current_buf->seek(current_buf_offset, SEEK_SET);
current_buf->seek(file_offset_of_buffer_end - start_offset, SEEK_SET);
return;
}
current_buf_offset -= object.bytes_size;
start_offset += object.bytes_size;
}
current_buf_idx = blobs_to_read.size();
current_buf = nullptr;
@ -171,14 +194,14 @@ bool ReadBufferFromRemoteFSGather::nextImpl()
bool ReadBufferFromRemoteFSGather::moveToNextBuffer()
{
/// If there is no available buffers - nothing to read.
if (current_buf_idx + 1 >= blobs_to_read.size())
if (current_buf_idx + 1 >= blobs_to_read.size() || (read_until_position && file_offset_of_buffer_end >= read_until_position))
return false;
++current_buf_idx;
const auto & object = blobs_to_read[current_buf_idx];
LOG_TEST(log, "Reading from next file: {} ({})", object.remote_path, object.local_path);
current_buf = createImplementationBuffer(object);
current_buf = createImplementationBuffer(object, file_offset_of_buffer_end);
return true;
}
@ -263,7 +286,7 @@ off_t ReadBufferFromRemoteFSGather::seek(off_t offset, int whence)
ReadBufferFromRemoteFSGather::~ReadBufferFromRemoteFSGather()
{
if (!with_cache)
if (!with_file_cache)
appendUncachedReadInfo();
}

View File

@ -21,11 +21,12 @@ class ReadBufferFromRemoteFSGather final : public ReadBufferFromFileBase
friend class ReadIndirectBufferFromRemoteFS;
public:
using ReadBufferCreator = std::function<std::unique_ptr<ReadBufferFromFileBase>(const std::string & path, size_t read_until_position)>;
using ReadBufferCreator = std::function<std::unique_ptr<ReadBufferFromFileBase>(bool restricted_seek, const std::string & path)>;
ReadBufferFromRemoteFSGather(
ReadBufferCreator && read_buffer_creator_,
const StoredObjects & blobs_to_read_,
const std::string & cache_path_prefix_,
const ReadSettings & settings_,
std::shared_ptr<FilesystemCacheLog> cache_log_,
bool use_external_buffer_);
@ -53,7 +54,7 @@ public:
bool isContentCached(size_t offset, size_t size) override;
private:
SeekableReadBufferPtr createImplementationBuffer(const StoredObject & object);
SeekableReadBufferPtr createImplementationBuffer(const StoredObject & object, size_t start_offset);
bool nextImpl() override;
@ -70,10 +71,12 @@ private:
const ReadSettings settings;
const StoredObjects blobs_to_read;
const ReadBufferCreator read_buffer_creator;
const std::string cache_path_prefix;
const std::shared_ptr<FilesystemCacheLog> cache_log;
const String query_id;
const bool use_external_buffer;
const bool with_cache;
const bool with_file_cache;
const bool with_page_cache;
size_t read_until_position = 0;
size_t file_offset_of_buffer_end = 0;

View File

@ -152,6 +152,8 @@ IAsynchronousReader::Result ThreadPoolRemoteFSReader::execute(Request request, b
IAsynchronousReader::Result read_result;
if (result)
{
chassert(reader.buffer().begin() == request.buf);
chassert(reader.buffer().end() <= request.buf + request.size);
read_result.size = reader.buffer().size();
read_result.offset = reader.offset();
ProfileEvents::increment(ProfileEvents::ThreadpoolReaderReadBytes, read_result.size);

View File

@ -29,6 +29,9 @@ private:
class RemoteFSFileDescriptor : public IAsynchronousReader::IFileDescriptor
{
public:
/// `reader_` implementation must ensure that next() places data at the start of internal_buffer,
/// even if there was previously a seek. I.e. seek() shouldn't leave pending data (no short seek
/// optimization), and nextImpl() shouldn't assign nextimpl_working_buffer_offset.
explicit RemoteFSFileDescriptor(
SeekableReadBuffer & reader_,
std::shared_ptr<AsyncReadCounters> async_read_counters_)

View File

@ -206,7 +206,7 @@ std::unique_ptr<ReadBufferFromFileBase> AzureObjectStorage::readObjects( /// NOL
auto read_buffer_creator =
[this, settings_ptr, disk_read_settings]
(const std::string & path, size_t read_until_position) -> std::unique_ptr<ReadBufferFromFileBase>
(bool restricted_seek, const std::string & path) -> std::unique_ptr<ReadBufferFromFileBase>
{
return std::make_unique<ReadBufferFromAzureBlobStorage>(
client.get(),
@ -215,8 +215,7 @@ std::unique_ptr<ReadBufferFromFileBase> AzureObjectStorage::readObjects( /// NOL
settings_ptr->max_single_read_retries,
settings_ptr->max_single_download_retries,
/* use_external_buffer */true,
/* restricted_seek */true,
read_until_position);
restricted_seek);
};
switch (read_settings.remote_fs_method)
@ -226,16 +225,17 @@ std::unique_ptr<ReadBufferFromFileBase> AzureObjectStorage::readObjects( /// NOL
return std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator),
objects,
"azure:",
disk_read_settings,
global_context->getFilesystemCacheLog(),
/* use_external_buffer */false);
}
case RemoteFSReadMethod::threadpool:
{
auto impl = std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator),
objects,
"azure:",
disk_read_settings,
global_context->getFilesystemCacheLog(),
/* use_external_buffer */true);

View File

@ -527,10 +527,9 @@ std::unique_ptr<ReadBufferFromFileBase> DiskObjectStorage::readFile(
std::optional<size_t> read_hint,
std::optional<size_t> file_size) const
{
auto storage_objects = metadata_storage->getStorageObjects(path);
const auto storage_objects = metadata_storage->getStorageObjects(path);
const bool file_can_be_empty = !file_size.has_value() || *file_size == 0;
if (storage_objects.empty() && file_can_be_empty)
return std::make_unique<ReadBufferFromEmptyFile>();

View File

@ -60,7 +60,7 @@ std::unique_ptr<ReadBufferFromFileBase> HDFSObjectStorage::readObjects( /// NOLI
auto disk_read_settings = patchSettings(read_settings);
auto read_buffer_creator =
[this, disk_read_settings]
(const std::string & path, size_t /* read_until_position */) -> std::unique_ptr<ReadBufferFromFileBase>
(bool /* restricted_seek */, const std::string & path) -> std::unique_ptr<ReadBufferFromFileBase>
{
size_t begin_of_path = path.find('/', path.find("//") + 2);
auto hdfs_path = path.substr(begin_of_path);
@ -71,7 +71,7 @@ std::unique_ptr<ReadBufferFromFileBase> HDFSObjectStorage::readObjects( /// NOLI
};
return std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator), objects, disk_read_settings, nullptr, /* use_external_buffer */false);
std::move(read_buffer_creator), objects, "hdfs:", disk_read_settings, nullptr, /* use_external_buffer */false);
}
std::unique_ptr<WriteBufferFromFileBase> HDFSObjectStorage::writeObject( /// NOLINT

View File

@ -47,7 +47,7 @@ std::unique_ptr<ReadBufferFromFileBase> LocalObjectStorage::readObjects( /// NOL
auto modified_settings = patchSettings(read_settings);
auto global_context = Context::getGlobalContextInstance();
auto read_buffer_creator =
[=] (const std::string & file_path, size_t /* read_until_position */)
[=] (bool /* restricted_seek */, const std::string & file_path)
-> std::unique_ptr<ReadBufferFromFileBase>
{
return createReadBufferFromFileBase(file_path, modified_settings, read_hint, file_size);
@ -58,13 +58,13 @@ std::unique_ptr<ReadBufferFromFileBase> LocalObjectStorage::readObjects( /// NOL
case RemoteFSReadMethod::read:
{
return std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator), objects, modified_settings,
std::move(read_buffer_creator), objects, "file:", modified_settings,
global_context->getFilesystemCacheLog(), /* use_external_buffer */false);
}
case RemoteFSReadMethod::threadpool:
{
auto impl = std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator), objects, modified_settings,
std::move(read_buffer_creator), objects, "file:", modified_settings,
global_context->getFilesystemCacheLog(), /* use_external_buffer */true);
auto & reader = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);

View File

@ -171,7 +171,7 @@ std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT
auto read_buffer_creator =
[this, settings_ptr, disk_read_settings]
(const std::string & path, size_t read_until_position) -> std::unique_ptr<ReadBufferFromFileBase>
(bool restricted_seek, const std::string & path) -> std::unique_ptr<ReadBufferFromFileBase>
{
return std::make_unique<ReadBufferFromS3>(
client.get(),
@ -182,8 +182,8 @@ std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT
disk_read_settings,
/* use_external_buffer */true,
/* offset */0,
read_until_position,
/* restricted_seek */true);
/* read_until_position */0,
restricted_seek);
};
switch (read_settings.remote_fs_method)
@ -193,16 +193,17 @@ std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT
return std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator),
objects,
"s3:" + uri.bucket + "/",
disk_read_settings,
global_context->getFilesystemCacheLog(),
/* use_external_buffer */false);
}
case RemoteFSReadMethod::threadpool:
{
auto impl = std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator),
objects,
"s3:" + uri.bucket + "/",
disk_read_settings,
global_context->getFilesystemCacheLog(),
/* use_external_buffer */true);

View File

@ -252,14 +252,13 @@ std::unique_ptr<ReadBufferFromFileBase> WebObjectStorage::readObject( /// NOLINT
{
auto read_buffer_creator =
[this, read_settings]
(const std::string & path_, size_t read_until_position) -> std::unique_ptr<ReadBufferFromFileBase>
(bool /* restricted_seek */, const std::string & path_) -> std::unique_ptr<ReadBufferFromFileBase>
{
return std::make_unique<ReadBufferFromWebServer>(
fs::path(url) / path_,
getContext(),
read_settings,
/* use_external_buffer */true,
read_until_position);
/* use_external_buffer */true);
};
auto global_context = Context::getGlobalContextInstance();
@ -271,6 +270,7 @@ std::unique_ptr<ReadBufferFromFileBase> WebObjectStorage::readObject( /// NOLINT
return std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator),
StoredObjects{object},
"url:" + url + "/",
read_settings,
global_context->getFilesystemCacheLog(),
/* use_external_buffer */false);
@ -280,6 +280,7 @@ std::unique_ptr<ReadBufferFromFileBase> WebObjectStorage::readObject( /// NOLINT
auto impl = std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator),
StoredObjects{object},
"url:" + url + "/",
read_settings,
global_context->getFilesystemCacheLog(),
/* use_external_buffer */true);

View File

@ -54,6 +54,9 @@ public:
struct Result
{
/// The read data is at [buf + offset, buf + size), where `buf` is from Request struct.
/// (Notice that `offset` is included in `size`.)
/// size
/// Less than requested amount of data can be returned.
/// If size is zero - the file has ended.

View File

@ -60,6 +60,9 @@ public:
BufferBase(Position ptr, size_t size, size_t offset)
: pos(ptr + offset), working_buffer(ptr, ptr + size), internal_buffer(ptr, ptr + size) {}
/// Assign the buffers and pos.
/// Be careful when calling this from ReadBuffer::nextImpl() implementations: `offset` is
/// effectively ignored because ReadBuffer::next() reassigns `pos`.
void set(Position ptr, size_t size, size_t offset)
{
internal_buffer = Buffer(ptr, ptr + size);

View File

@ -0,0 +1,188 @@
#include "CachedInMemoryReadBufferFromFile.h"
#include <IO/SwapHelper.h>
#include <base/scope_guard.h>
#include <Common/logger_useful.h>
namespace DB
{
namespace ErrorCodes
{
extern const int UNEXPECTED_END_OF_FILE;
extern const int CANNOT_SEEK_THROUGH_FILE;
extern const int SEEK_POSITION_OUT_OF_BOUND;
}
CachedInMemoryReadBufferFromFile::CachedInMemoryReadBufferFromFile(
FileChunkAddress cache_key_, PageCachePtr cache_, std::unique_ptr<ReadBufferFromFileBase> in_, const ReadSettings & settings_)
: ReadBufferFromFileBase(0, nullptr, 0, in_->getFileSize()), cache_key(cache_key_), cache(cache_), settings(settings_), in(std::move(in_))
, read_until_position(file_size.value())
{
cache_key.offset = 0;
}
String CachedInMemoryReadBufferFromFile::getFileName() const
{
return in->getFileName();
}
off_t CachedInMemoryReadBufferFromFile::seek(off_t off, int whence)
{
if (whence != SEEK_SET)
throw Exception(ErrorCodes::CANNOT_SEEK_THROUGH_FILE, "Only SEEK_SET mode is allowed.");
size_t offset = static_cast<size_t>(off);
if (offset > file_size.value())
throw Exception(ErrorCodes::SEEK_POSITION_OUT_OF_BOUND, "Seek position is out of bounds. Offset: {}", off);
if (offset >= file_offset_of_buffer_end - working_buffer.size() && offset <= file_offset_of_buffer_end)
{
pos = working_buffer.end() - (file_offset_of_buffer_end - offset);
chassert(getPosition() == off);
return off;
}
resetWorkingBuffer();
file_offset_of_buffer_end = offset;
chunk.reset();
chassert(getPosition() == off);
return off;
}
off_t CachedInMemoryReadBufferFromFile::getPosition()
{
return file_offset_of_buffer_end - available();
}
size_t CachedInMemoryReadBufferFromFile::getFileOffsetOfBufferEnd() const
{
return file_offset_of_buffer_end;
}
void CachedInMemoryReadBufferFromFile::setReadUntilPosition(size_t position)
{
read_until_position = position;
if (position < static_cast<size_t>(getPosition()))
{
resetWorkingBuffer();
chunk.reset();
}
else if (position < file_offset_of_buffer_end)
{
size_t diff = file_offset_of_buffer_end - position;
working_buffer.resize(working_buffer.size() - diff);
file_offset_of_buffer_end -= diff;
}
}
void CachedInMemoryReadBufferFromFile::setReadUntilEnd()
{
setReadUntilPosition(file_size.value());
}
bool CachedInMemoryReadBufferFromFile::nextImpl()
{
chassert(read_until_position <= file_size.value());
if (file_offset_of_buffer_end >= read_until_position)
return false;
if (chunk.has_value() && file_offset_of_buffer_end >= cache_key.offset + cache->chunkSize())
{
chassert(file_offset_of_buffer_end == cache_key.offset + cache->chunkSize());
chunk.reset();
}
if (!chunk.has_value())
{
cache_key.offset = file_offset_of_buffer_end / cache->chunkSize() * cache->chunkSize();
chunk = cache->getOrSet(cache_key.hash(), settings.read_from_page_cache_if_exists_otherwise_bypass_cache, settings.page_cache_inject_eviction);
size_t chunk_size = std::min(cache->chunkSize(), file_size.value() - cache_key.offset);
std::unique_lock download_lock(chunk->getChunk()->state.download_mutex);
if (!chunk->isPrefixPopulated(chunk_size))
{
/// A few things could be improved here, which may or may not be worth the added complexity:
/// * If the next file chunk is in cache, use in->setReadUntilPosition() to limit the read to
/// just one chunk. More generally, look ahead in the cache to count how many next chunks
/// need to be downloaded. (Up to some limit? And avoid changing `in`'s until-position if
/// it's already reasonable; otherwise we'd increase it by one chunk every chunk, discarding
/// a half-completed HTTP request every time.)
/// * If only a subset of pages are missing from this chunk, download only them,
/// with some threshold for avoiding short seeks.
/// In particular, if a previous download failed in the middle of the chunk, we could
/// resume from that position instead of from the beginning of the chunk.
/// (It's also possible in principle that a proper subset of chunk's pages was reclaimed
/// by the OS. But, for performance purposes, we should completely ignore that, because
/// (a) PageCache normally uses 2 MiB transparent huge pages and has just one such page
/// per chunk, and (b) even with 4 KiB pages partial chunk eviction is extremely rare.)
/// * If our [position, read_until_position) covers only part of the chunk, we could download
/// just that part. (Which would be bad if someone else needs the rest of the chunk and has
/// to do a whole new HTTP request to get it. Unclear what the policy should be.)
/// * Instead of doing in->next() in a loop until we get the whole chunk, we could return the
/// results as soon as in->next() produces them.
/// (But this would make the download_mutex situation much more complex, similar to the
/// FileSegment::State::PARTIALLY_DOWNLOADED and FileSegment::setRemoteFileReader() stuff.)
Buffer prev_in_buffer = in->internalBuffer();
SCOPE_EXIT({ in->set(prev_in_buffer.begin(), prev_in_buffer.size()); });
size_t pos = 0;
while (pos < chunk_size)
{
char * piece_start = chunk->getChunk()->data + pos;
size_t piece_size = chunk_size - pos;
in->set(piece_start, piece_size);
LOG_INFO(&Poco::Logger::get("asdqwe"), "this {:x}, in {:x}, path {}, size {}, offset {:x}, pos {:x}", reinterpret_cast<uint64_t>(this), reinterpret_cast<uint64_t>(in.get()), cache_key.path, file_size.value(), cache_key.offset, pos);
if (pos == 0)
in->seek(cache_key.offset, SEEK_SET);
else
chassert(!in->available());
if (in->eof())
throw Exception(ErrorCodes::UNEXPECTED_END_OF_FILE, "File {} ended after {} bytes, but we expected {}",
getFileName(), cache_key.offset + pos, file_size.value());
chassert(in->position() >= piece_start && in->buffer().end() <= piece_start + piece_size);
chassert(in->getPosition() == static_cast<off_t>(cache_key.offset + pos));
size_t n = in->available();
chassert(n);
if (in->position() != piece_start)
memmove(piece_start, in->position(), n);
in->position() += n;
pos += n;
LOG_INFO(&Poco::Logger::get("asdqwe"), "this {:x}, got {:x} bytes", reinterpret_cast<uint64_t>(this), n);
}
chunk->markPrefixPopulated(chunk_size);
}
}
nextimpl_working_buffer_offset = file_offset_of_buffer_end - cache_key.offset;
working_buffer = Buffer(
chunk->getChunk()->data,
chunk->getChunk()->data + std::min(chunk->getChunk()->size, read_until_position - cache_key.offset));
pos = working_buffer.begin() + nextimpl_working_buffer_offset;
if (!internal_buffer.empty())
{
/// We were given an external buffer to read into. Copy the data into it.
/// Would be nice to avoid this copy, somehow, maybe by making ReadBufferFromRemoteFSGather
/// and AsynchronousBoundedReadBuffer explicitly aware of the page cache.
size_t n = std::min(available(), internal_buffer.size());
memcpy(internal_buffer.begin(), pos, n);
working_buffer = Buffer(internal_buffer.begin(), internal_buffer.begin() + n);
pos = working_buffer.begin();
nextimpl_working_buffer_offset = 0;
}
file_offset_of_buffer_end += available();
return true;
}
}

View File

@ -0,0 +1,41 @@
#pragma once
#include <Common/PageCache.h>
#include <IO/ReadBufferFromFileBase.h>
namespace DB
{
class CachedInMemoryReadBufferFromFile : public ReadBufferFromFileBase
{
public:
/// `in_` must support using external buffer. I.e. we assign its internal_buffer before each next()
/// call and expect the read data to be put into that buffer.
/// `in_` should be seekable and should be able to read the whole file from 0 to in_->getFileSize();
/// if you set `in_`'s read-until-position bypassing CachedInMemoryReadBufferFromFile then
/// CachedInMemoryReadBufferFromFile will break.
CachedInMemoryReadBufferFromFile(FileChunkAddress cache_key_, PageCachePtr cache_, std::unique_ptr<ReadBufferFromFileBase> in_, const ReadSettings & settings_);
String getFileName() const override;
off_t seek(off_t off, int whence) override;
off_t getPosition() override;
size_t getFileOffsetOfBufferEnd() const override;
bool supportsRightBoundedReads() const override { return true; }
void setReadUntilPosition(size_t position) override;
void setReadUntilEnd() override;
private:
FileChunkAddress cache_key; // .offset is offset of `chunk` start
PageCachePtr cache;
ReadSettings settings;
std::unique_ptr<ReadBufferFromFileBase> in;
size_t file_offset_of_buffer_end = 0;
size_t read_until_position;
std::optional<PinnedPageChunk> chunk;
bool nextImpl() override;
};
}

View File

@ -225,11 +225,22 @@ public:
* - seek() to a position above the until position (even if you setReadUntilPosition() to a
* higher value right after the seek!),
*
* Typical implementations discard any current buffers and connections, even if the position is
* adjusted only a little.
* Implementations are recommended to:
* - Allow the read-until-position to go below current position, e.g.:
* // Read block [300, 400)
* setReadUntilPosition(400);
* seek(300);
* next();
* // Read block [100, 200)
* setReadUntilPosition(200); // oh oh, this is below the current position, but should be allowed
* seek(100); // but now everything's fine again
* next();
* // (Swapping the order of seek and setReadUntilPosition doesn't help: then it breaks if the order of blocks is reversed.)
* - Check if new read-until-position value is equal to the current value and do nothing in this case,
* so that the caller doesn't have to.
*
* Typical usage is to call it right after creating the ReadBuffer, before it started doing any
* work.
* Typical implementations discard any current buffers and connections when the
* read-until-position changes even by a small (nonzero) amount.
*/
virtual void setReadUntilPosition(size_t /* position */) {}

View File

@ -61,6 +61,7 @@ enum class RemoteFSReadMethod
};
class MMappedFileCache;
class PageCache;
struct ReadSettings
{
@ -102,6 +103,12 @@ struct ReadSettings
bool avoid_readthrough_cache_outside_query_context = true;
size_t filesystem_cache_segments_batch_size = 20;
//asdqwe assign these two
bool use_page_cache_for_disks_without_file_cache = false;
bool read_from_page_cache_if_exists_otherwise_bypass_cache = false;
bool page_cache_inject_eviction = false;
std::shared_ptr<PageCache> page_cache;
size_t filesystem_cache_max_download_size = (128UL * 1024 * 1024 * 1024);
bool skip_download_if_exceeds_query_cache = true;

View File

@ -17,6 +17,7 @@
#include <Common/getMultipleKeysFromConfig.h>
#include <Common/callOnce.h>
#include <Common/SharedLockGuard.h>
#include <Common/PageCache.h>
#include <Coordination/KeeperDispatcher.h>
#include <Core/BackgroundSchedulePool.h>
#include <Formats/FormatFactory.h>
@ -294,6 +295,7 @@ struct ContextSharedPart : boost::noncopyable
mutable MarkCachePtr index_mark_cache TSA_GUARDED_BY(mutex); /// Cache of marks in compressed files of MergeTree indices.
mutable MMappedFileCachePtr mmap_cache TSA_GUARDED_BY(mutex); /// Cache of mmapped files to avoid frequent open/map/unmap/close and to reuse from several threads.
AsynchronousMetrics * asynchronous_metrics TSA_GUARDED_BY(mutex) = nullptr; /// Points to asynchronous metrics
mutable PageCachePtr page_cache TSA_GUARDED_BY(mutex); /// Userspace page cache.
ProcessList process_list; /// Executing queries at the moment.
SessionTracker session_tracker;
GlobalOvercommitTracker global_overcommit_tracker;
@ -1251,7 +1253,7 @@ void Context::setUser(const UUID & user_id_, const std::optional<const std::vect
{
/// Prepare lists of user's profiles, constraints, settings, roles.
/// NOTE: AccessControl::read<User>() and other AccessControl's functions may require some IO work,
/// so Context::getLock() must be unlocked while we're doing this.
/// so Context::getLocalLock() and Context::getGlobalLock() must be unlocked while we're doing this.
auto & access_control = getAccessControl();
auto user = access_control.read<User>(user_id_);
@ -1381,7 +1383,7 @@ void Context::checkAccess(const AccessRightsElements & elements) const { return
std::shared_ptr<const ContextAccess> Context::getAccess() const
{
/// A helper function to collect parameters for calculating access rights, called with Context::getLock() acquired.
/// A helper function to collect parameters for calculating access rights, called with Context::getLocalSharedLock() acquired.
auto get_params = [this]()
{
/// If setUserID() was never called then this must be the global context with the full access.
@ -1408,7 +1410,8 @@ std::shared_ptr<const ContextAccess> Context::getAccess() const
}
/// Calculate new access rights according to the collected parameters.
/// NOTE: AccessControl::getContextAccess() may require some IO work, so Context::getLock() must be unlocked while we're doing this.
/// NOTE: AccessControl::getContextAccess() may require some IO work, so Context::getLocalLock()
/// and Context::getGlobalLock() must be unlocked while we're doing this.
auto res = getAccessControl().getContextAccess(*params);
{
@ -2737,6 +2740,33 @@ void Context::clearUncompressedCache() const
shared->uncompressed_cache->clear();
}
void Context::setPageCache(size_t bytes_per_chunk, size_t bytes_per_mmap, size_t bytes_total, bool use_madv_free, bool use_huge_pages)
{
std::lock_guard lock(shared->mutex);
if (shared->page_cache)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Page cache has been already created.");
shared->page_cache = std::make_shared<PageCache>(bytes_per_chunk, bytes_per_mmap, bytes_total, use_madv_free, use_huge_pages);
}
PageCachePtr Context::getPageCache() const
{
SharedLockGuard lock(shared->mutex);
return shared->page_cache;
}
void Context::dropPageCache() const
{
PageCachePtr cache;
{
SharedLockGuard lock(shared->mutex);
cache = shared->page_cache;
}
if (cache)
cache->dropCache();
}
void Context::setMarkCache(const String & cache_policy, size_t max_cache_size_in_bytes, double size_ratio)
{
std::lock_guard lock(shared->mutex);
@ -5153,6 +5183,11 @@ ReadSettings Context::getReadSettings() const
res.filesystem_cache_max_download_size = settings.filesystem_cache_max_download_size;
res.skip_download_if_exceeds_query_cache = settings.skip_download_if_exceeds_query_cache;
res.page_cache = getPageCache();
res.use_page_cache_for_disks_without_file_cache = settings.use_page_cache_for_disks_without_file_cache;
res.read_from_page_cache_if_exists_otherwise_bypass_cache = settings.read_from_page_cache_if_exists_otherwise_bypass_cache;
res.page_cache_inject_eviction = settings.page_cache_inject_eviction;
res.remote_read_min_bytes_for_seek = settings.remote_read_min_bytes_for_seek;
/// Zero read buffer will not make progress.

View File

@ -79,6 +79,7 @@ class RefreshSet;
class Cluster;
class Compiler;
class MarkCache;
class PageCache;
class MMappedFileCache;
class UncompressedCache;
class ProcessList;
@ -969,6 +970,10 @@ public:
std::shared_ptr<UncompressedCache> getUncompressedCache() const;
void clearUncompressedCache() const;
void setPageCache(size_t bytes_per_chunk, size_t bytes_per_mmap, size_t bytes_total, bool use_madv_free, bool use_huge_pages);
std::shared_ptr<PageCache> getPageCache() const;
void dropPageCache() const;
void setMarkCache(const String & cache_policy, size_t max_cache_size_in_bytes, double size_ratio);
void updateMarkCacheConfiguration(const Poco::Util::AbstractConfiguration & config);
std::shared_ptr<MarkCache> getMarkCache() const;

View File

@ -10,6 +10,7 @@
#include <Common/ShellCommand.h>
#include <Common/CurrentMetrics.h>
#include <Common/FailPoint.h>
#include <Common/PageCache.h>
#include <Interpreters/Cache/FileCacheFactory.h>
#include <Interpreters/Cache/FileCache.h>
#include <Interpreters/Context.h>
@ -460,6 +461,13 @@ BlockIO InterpreterSystemQuery::execute()
{
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Not implemented");
}
case Type::DROP_PAGE_CACHE:
{
getContext()->checkAccess(AccessType::SYSTEM_DROP_PAGE_CACHE);
getContext()->dropPageCache();
break;
}
case Type::DROP_SCHEMA_CACHE:
{
getContext()->checkAccess(AccessType::SYSTEM_DROP_SCHEMA_CACHE);
@ -1201,6 +1209,7 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster()
case Type::DROP_INDEX_UNCOMPRESSED_CACHE:
case Type::DROP_FILESYSTEM_CACHE:
case Type::SYNC_FILESYSTEM_CACHE:
case Type::DROP_PAGE_CACHE:
case Type::DROP_SCHEMA_CACHE:
case Type::DROP_FORMAT_SCHEMA_CACHE:
case Type::DROP_S3_CLIENT_CACHE:

View File

@ -9,6 +9,8 @@
#include <Interpreters/Cache/QueryCache.h>
#include <Interpreters/JIT/CompiledExpressionCache.h>
#include <Common/PageCache.h>
#include <Databases/IDatabase.h>
#include <IO/UncompressedCache.h>
@ -77,6 +79,16 @@ void ServerAsynchronousMetrics::updateImpl(TimePoint update_time, TimePoint curr
new_values["MarkCacheFiles"] = { mark_cache->count(), "Total number of mark files cached in the mark cache" };
}
if (auto page_cache = getContext()->getPageCache())
{
auto rss = page_cache->getResidentSetSize();
new_values["PageCacheBytes"] = { rss.page_cache_rss, "Userspace page cache memory usage in bytes" };
new_values["PageCachePinnedBytes"] = { page_cache->getPinnedSize(), "Userspace page cache memory that's currently in use and can't be evicted" };
if (rss.unreclaimable_rss.has_value())
new_values["UnreclaimableRSS"] = { *rss.unreclaimable_rss, "The amount of physical memory used by the server process, in bytes, excluding memory reclaimable by the OS (MADV_FREE)" };
}
if (auto uncompressed_cache = getContext()->getUncompressedCache())
{
new_values["UncompressedCacheBytes"] = { uncompressed_cache->sizeInBytes(),

View File

@ -0,0 +1,267 @@
#include <Common/PageCache.h>
#include <gtest/gtest.h>
#include <thread>
#ifdef OS_LINUX
#include <sys/sysinfo.h>
#endif
using namespace DB;
namespace ProfileEvents
{
extern const Event PageCacheChunkMisses;
extern const Event PageCacheChunkShared;
extern const Event PageCacheChunkDataHits;
extern const Event PageCacheChunkDataPartialHits;
extern const Event PageCacheChunkDataMisses;
}
#define CHECK(x) \
do { \
if (!(x)) \
{ \
std::cerr << "check on line " << __LINE__ << " failed: " << #x << std::endl; \
std::abort(); \
} \
} while (false)
size_t estimateRAMSize()
{
#ifdef OS_LINUX
struct sysinfo info;
int r = sysinfo(&info);
CHECK(r == 0);
return static_cast<size_t>(info.totalram * info.mem_unit);
#else
return 128ul << 30;
#endif
}
/// Do random reads and writes in PageCache from multiple threads, check that the data read matches the data written.
TEST(PageCache, DISABLED_Stress)
{
/// There doesn't seem to be a reasonable way to simulate memory pressure or force the eviction of MADV_FREE-d pages.
/// So we actually map more virtual memory than we have RAM and fill it all up a few times.
/// This takes an eternity (a few minutes), but idk how else to hit MADV_FREE eviction.
/// Expect ~1 GB/s, bottlenecked by page faults.
size_t ram_size = estimateRAMSize();
PageCache cache(2 << 20, 1 << 30, ram_size + ram_size / 10, /* use_madv_free */ true, /* use_huge_pages */ true);
CHECK(cache.getResidentSetSize().page_cache_rss);
const size_t num_keys = static_cast<size_t>(cache.maxChunks() * 1.5);
const size_t pages_per_chunk = cache.chunkSize() / cache.pageSize();
const size_t items_per_page = cache.pageSize() / 8;
const size_t passes = 2;
const size_t step = 20;
const size_t num_threads = 20;
const size_t chunks_touched = num_keys * passes * num_threads / step;
std::atomic<size_t> progress {0};
std::atomic<size_t> threads_finished {0};
std::atomic<size_t> total_racing_writes {0};
auto thread_func = [&]
{
pcg64 rng(randomSeed());
std::vector<PinnedPageChunk> pinned;
/// Stats.
size_t racing_writes = 0;
for (size_t i = 0; i < num_keys * passes; i += step)
{
progress += 1;
/// Touch the chunks sequentially + noise (to increase interference across threads), or at random 10% of the time.
size_t key_idx;
if (rng() % 10 == 0)
key_idx = std::uniform_int_distribution<size_t>(0, num_keys - 1)(rng);
else
key_idx = (i + std::uniform_int_distribution<size_t>(0, num_keys / 1000)(rng)) % num_keys;
/// For some keys, always use detached_if_missing = true and check that cache always misses.
bool key_detached_if_missing = key_idx % 100 == 42;
bool detached_if_missing = key_detached_if_missing || i % 101 == 42;
PageCacheKey key = key_idx * 0xcafebabeb0bad00dul; // a simple reversible hash (the constant can be any odd number)
PinnedPageChunk chunk = cache.getOrSet(key, detached_if_missing, /* inject_eviction */ false);
if (key_detached_if_missing)
CHECK(!chunk.getChunk()->pages_populated.any());
for (size_t page_idx = 0; page_idx < pages_per_chunk; ++page_idx)
{
bool populated = chunk.getChunk()->pages_populated.get(page_idx);
/// Generate page contents deterministically from key and page index.
size_t start = key_idx * page_idx;
if (start % 37 == 13)
{
/// Leave ~1/37 of the pages unpopulated.
CHECK(!populated);
}
else
{
/// We may write/read the same memory from multiple threads in parallel here.
std::atomic<size_t> * items = reinterpret_cast<std::atomic<size_t> *>(chunk.getChunk()->data + cache.pageSize() * page_idx);
if (populated)
{
for (size_t j = 0; j < items_per_page; ++j)
CHECK(items[j].load(std::memory_order_relaxed) == start + j);
}
else
{
for (size_t j = 0; j < items_per_page; ++j)
items[j].store(start + j, std::memory_order_relaxed);
if (!chunk.markPagePopulated(page_idx))
racing_writes += 1;
}
}
}
pinned.push_back(std::move(chunk));
CHECK(cache.getPinnedSize() >= cache.chunkSize());
/// Unpin 2 chunks on average.
while (rng() % 3 != 0 && !pinned.empty())
{
size_t idx = rng() % pinned.size();
if (idx != pinned.size() - 1)
pinned[idx] = std::move(pinned.back());
pinned.pop_back();
}
}
total_racing_writes += racing_writes;
threads_finished += 1;
};
std::cout << fmt::format("doing {:.1f} passes over {:.1f} GiB of virtual memory\nthis will take a few minutes, progress printed every 10 seconds",
chunks_touched * 1. / cache.maxChunks(), cache.maxChunks() * cache.chunkSize() * 1. / (1ul << 30)) << std::endl;
auto start_time = std::chrono::steady_clock::now();
std::vector<std::thread> threads;
for (size_t i = 0; i < num_threads; ++i)
threads.emplace_back(thread_func);
for (size_t poll = 0;; ++poll)
{
if (threads_finished == num_threads)
break;
if (poll % 100 == 0)
std::cout << fmt::format("{:.3f}%", progress.load() * 100. / num_keys / passes / num_threads * step) << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
for (std::thread & t : threads)
t.join();
auto end_time = std::chrono::steady_clock::now();
double elapsed_seconds = std::chrono::duration_cast<std::chrono::duration<double>>(end_time - start_time).count();
double touched_gib = chunks_touched * cache.chunkSize() * 1. / (1ul << 30);
std::cout << fmt::format("touched {:.1f} GiB in {:.1f} seconds, that's {:.3f} GiB/s",
touched_gib, elapsed_seconds, touched_gib / elapsed_seconds) << std::endl;
auto & counters = CurrentThread::getProfileEvents();
std::cout << "stats:"
<< "\nchunk misses: " << counters[ProfileEvents::PageCacheChunkMisses].load()
<< "\nchunk shared: " << counters[ProfileEvents::PageCacheChunkShared].load()
<< "\nchunk data misses: " << counters[ProfileEvents::PageCacheChunkDataMisses].load()
<< "\nchunk data partial hits: " << counters[ProfileEvents::PageCacheChunkDataPartialHits].load()
<< "\nchunk data hits: " << counters[ProfileEvents::PageCacheChunkDataHits].load()
<< "\nracing page writes: " << total_racing_writes << std::endl;
/// Check that we at least hit all the cases.
CHECK(counters[ProfileEvents::PageCacheChunkMisses].load() > 0);
CHECK(counters[ProfileEvents::PageCacheChunkShared].load() > 0);
CHECK(counters[ProfileEvents::PageCacheChunkDataMisses].load() > 0);
/// Partial hits are rare enough that sometimes this is zero, so don't check it.
/// That's good news because we don't need to implement downloading parts of a chunk.
/// CHECK(counters[ProfileEvents::PageCacheChunkDataPartialHits].load() > 0);
CHECK(counters[ProfileEvents::PageCacheChunkDataHits].load() > 0);
CHECK(total_racing_writes > 0);
CHECK(cache.getPinnedSize() == 0);
size_t rss = cache.getResidentSetSize().page_cache_rss;
std::cout << "RSS: " << rss * 1. / (1ul << 30) << " GiB" << std::endl;
/// This can be flaky if the system has < 10% free memory. If this turns out to be a problem, feel free to remove or reduce.
CHECK(rss > ram_size / 10);
cache.dropCache();
#ifdef OS_LINUX
/// MADV_DONTNEED is not synchronous, and we're freeing lots of pages. Let's give Linux a lot of time.
std::this_thread::sleep_for(std::chrono::seconds(10));
size_t new_rss = cache.getResidentSetSize().page_cache_rss;
std::cout << "RSS after dropping cache: " << new_rss * 1. / (1ul << 30) << " GiB" << std::endl;
CHECK(new_rss < rss / 2);
#endif
}
/// Benchmark that measures the PageCache overhead for cache hits. Doesn't touch the actual data, so
/// memory bandwidth mostly doesn't factor into this.
/// This measures the overhead of things like madvise(MADV_FREE) and probing the pages (restoreChunkFromLimbo()).
/// Disabled in CI, run manually with --gtest_also_run_disabled_tests --gtest_filter=PageCache.DISABLED_HitsBench
TEST(PageCache, DISABLED_HitsBench)
{
/// Do a few runs, with and without MADV_FREE.
for (size_t num_threads = 1; num_threads <= 16; num_threads *= 2)
{
for (size_t run = 0; run < 8; ++ run)
{
bool use_madv_free = run % 2 == 1;
bool use_huge_pages = run % 4 / 2 == 1;
PageCache cache(2 << 20, 1ul << 30, 20ul << 30, use_madv_free, use_huge_pages);
size_t passes = 3;
std::atomic<size_t> total_misses {0};
/// Prepopulate all chunks.
for (size_t i = 0; i < cache.maxChunks(); ++i)
{
PageCacheKey key = i * 0xcafebabeb0bad00dul;
PinnedPageChunk chunk = cache.getOrSet(key, /* detache_if_missing */ false, /* inject_eviction */ false);
memset(chunk.getChunk()->data, 42, chunk.getChunk()->size);
chunk.markPrefixPopulated(cache.chunkSize());
}
auto thread_func = [&]
{
pcg64 rng(randomSeed());
size_t misses = 0;
for (size_t i = 0; i < cache.maxChunks() * passes; ++i)
{
PageCacheKey key = rng() % cache.maxChunks() * 0xcafebabeb0bad00dul;
PinnedPageChunk chunk = cache.getOrSet(key, /* detache_if_missing */ false, /* inject_eviction */ false);
if (!chunk.isPrefixPopulated(cache.chunkSize()))
misses += 1;
}
total_misses += misses;
};
auto start_time = std::chrono::steady_clock::now();
std::vector<std::thread> threads;
for (size_t i = 0; i < num_threads; ++i)
threads.emplace_back(thread_func);
for (std::thread & t : threads)
t.join();
auto end_time = std::chrono::steady_clock::now();
double elapsed_seconds = std::chrono::duration_cast<std::chrono::duration<double>>(end_time - start_time).count();
double fetched_gib = cache.chunkSize() * cache.maxChunks() * passes * 1. / (1ul << 30);
std::cout << fmt::format(
"threads {}, run {}, use_madv_free = {}, use_huge_pages = {}\nrequested {:.1f} GiB in {:.1f} seconds\n"
"that's {:.1f} GiB/s, or overhead of {:.3}us/{:.1}MiB\n",
num_threads, run, use_madv_free, use_huge_pages, fetched_gib, elapsed_seconds, fetched_gib / elapsed_seconds,
elapsed_seconds * 1e6 / cache.maxChunks() / passes, cache.chunkSize() * 1. / (1 << 20)) << std::endl;
if (total_misses != 0)
std::cout << "!got " << total_misses.load() << " misses! perhaps your system doesn't have enough free memory, consider decreasing cache size in the benchmark code" << std::endl;
}
}
}

View File

@ -415,6 +415,7 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState & s
case Type::STOP_THREAD_FUZZER:
case Type::START_VIEWS:
case Type::STOP_VIEWS:
case Type::DROP_PAGE_CACHE:
break;
case Type::UNKNOWN:
case Type::END:

View File

@ -31,6 +31,7 @@ public:
DROP_COMPILED_EXPRESSION_CACHE,
DROP_FILESYSTEM_CACHE,
DROP_DISK_METADATA_CACHE,
DROP_PAGE_CACHE,
DROP_SCHEMA_CACHE,
DROP_FORMAT_SCHEMA_CACHE,
DROP_S3_CLIENT_CACHE,

View File

@ -1637,10 +1637,6 @@ bool IMergeTreeDataPart::assertHasValidVersionMetadata() const
size_t file_size = getDataPartStorage().getFileSize(TXN_VERSION_METADATA_FILE_NAME);
auto buf = getDataPartStorage().readFile(TXN_VERSION_METADATA_FILE_NAME, ReadSettings().adjustBufferSize(file_size), file_size, std::nullopt);
/// FIXME https://github.com/ClickHouse/ClickHouse/issues/48465
if (dynamic_cast<CachedOnDiskReadBufferFromFile *>(buf.get()))
return true;
readStringUntilEOF(content, *buf);
ReadBufferFromString str_buf{content};
VersionMetadata file;

View File

@ -723,7 +723,7 @@ std::unique_ptr<ReadBuffer> StorageS3Source::createAsyncS3ReadBuffer(
auto context = getContext();
auto read_buffer_creator =
[this, read_settings, object_size]
(const std::string & path, size_t read_until_position) -> std::unique_ptr<ReadBufferFromFileBase>
(bool restricted_seek, const std::string & path) -> std::unique_ptr<ReadBufferFromFileBase>
{
return std::make_unique<ReadBufferFromS3>(
client,
@ -734,21 +734,25 @@ std::unique_ptr<ReadBuffer> StorageS3Source::createAsyncS3ReadBuffer(
read_settings,
/* use_external_buffer */true,
/* offset */0,
read_until_position,
/* restricted_seek */true,
/* read_until_position */0,
restricted_seek,
object_size);
};
auto modified_settings{read_settings};
/// User's S3 object may change, don't cache it.
modified_settings.use_page_cache_for_disks_without_file_cache = false;
/// FIXME: Changing this setting to default value breaks something around parquet reading
modified_settings.remote_read_min_bytes_for_seek = modified_settings.remote_fs_buffer_size;
auto s3_impl = std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator),
StoredObjects{StoredObject{key, /* local_path */ "", object_size}},
"",
read_settings,
/* cache_log */nullptr, /* use_external_buffer */true);
auto modified_settings{read_settings};
/// FIXME: Changing this setting to default value breaks something around parquet reading
modified_settings.remote_read_min_bytes_for_seek = modified_settings.remote_fs_buffer_size;
auto & pool_reader = context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
auto async_reader = std::make_unique<AsynchronousBoundedReadBuffer>(
std::move(s3_impl), pool_reader, modified_settings,

View File

@ -699,6 +699,8 @@ class SettingsRandomizer:
get_localzone(),
]
),
"use_page_cache_for_disks_without_file_cache": lambda: random.random() < 0.7,
"page_cache_inject_eviction": lambda: random.random() < 0.5,
}
@staticmethod

View File

@ -113,6 +113,7 @@ SYSTEM DROP COMPILED EXPRESSION CACHE ['SYSTEM DROP COMPILED EXPRESSION','DROP C
SYSTEM DROP FILESYSTEM CACHE ['SYSTEM DROP FILESYSTEM CACHE','DROP FILESYSTEM CACHE'] GLOBAL SYSTEM DROP CACHE
SYSTEM DROP DISTRIBUTED CACHE ['SYSTEM DROP DISTRIBUTED CACHE','DROP DISTRIBUTED CACHE'] GLOBAL SYSTEM DROP CACHE
SYSTEM SYNC FILESYSTEM CACHE ['SYSTEM REPAIR FILESYSTEM CACHE','REPAIR FILESYSTEM CACHE','SYNC FILESYSTEM CACHE'] GLOBAL SYSTEM
SYSTEM DROP PAGE CACHE ['SYSTEM DROP PAGE CACHE','DROP PAGE CACHE'] GLOBAL SYSTEM DROP CACHE
SYSTEM DROP SCHEMA CACHE ['SYSTEM DROP SCHEMA CACHE','DROP SCHEMA CACHE'] GLOBAL SYSTEM DROP CACHE
SYSTEM DROP FORMAT SCHEMA CACHE ['SYSTEM DROP FORMAT SCHEMA CACHE','DROP FORMAT SCHEMA CACHE'] GLOBAL SYSTEM DROP CACHE
SYSTEM DROP S3 CLIENT CACHE ['SYSTEM DROP S3 CLIENT','DROP S3 CLIENT CACHE'] GLOBAL SYSTEM DROP CACHE

View File

@ -0,0 +1,23 @@
54975576145920
PageCacheBytesUnpinnedRoundedToHugePages 1
PageCacheBytesUnpinnedRoundedToPages 1
PageCacheChunkMisses 1
ReadBufferFromS3Bytes 1
54975576145920
PageCacheBytesUnpinnedRoundedToHugePages 1
PageCacheBytesUnpinnedRoundedToPages 1
PageCacheChunkDataHits 1
54975576145920
PageCacheBytesUnpinnedRoundedToHugePages 1
PageCacheBytesUnpinnedRoundedToPages 1
PageCacheChunkMisses 1
ReadBufferFromS3Bytes 1
54975576145920
PageCacheBytesUnpinnedRoundedToHugePages 1
PageCacheBytesUnpinnedRoundedToPages 1
PageCacheChunkMisses 1
ReadBufferFromS3Bytes 1
54975576145920
PageCacheBytesUnpinnedRoundedToHugePages 1
PageCacheBytesUnpinnedRoundedToPages 1
PageCacheChunkDataHits 1

View File

@ -0,0 +1,105 @@
-- Tags: no-fasttest, no-parallel
-- no-fasttest because we need an S3 storage policy
-- no-parallel because we look at server-wide counters about page cache usage
set use_page_cache_for_disks_without_file_cache = 1;
set page_cache_inject_eviction = 0;
set enable_filesystem_cache = 0;
set use_uncompressed_cache = 0;
create table events_snapshot engine Memory as select * from system.events;
create view events_diff as
-- round all stats to 70 MiB to leave a lot of leeway for overhead
with if(event like '%Bytes%', 70*1024*1024, 35) as granularity,
-- cache hits counter can vary a lot depending on other settings:
-- e.g. if merge_tree_min_bytes_for_concurrent_read is small, multiple threads will read each chunk
-- so we just check that the value is not too low
if(event in (
'PageCacheBytesUnpinnedRoundedToPages', 'PageCacheBytesUnpinnedRoundedToHugePages',
'PageCacheChunkDataHits'), 1, 1000) as clamp
select event, min2(intDiv(new.value - old.value, granularity), clamp) as diff
from system.events new
left outer join events_snapshot old
on old.event = new.event
where diff != 0 and
event in (
'ReadBufferFromS3Bytes', 'PageCacheChunkMisses', 'PageCacheChunkDataMisses',
'PageCacheChunkDataHits', 'PageCacheChunkDataPartialHits',
'PageCacheBytesUnpinnedRoundedToPages', 'PageCacheBytesUnpinnedRoundedToHugePages')
order by event;
drop table if exists page_cache_03055;
create table page_cache_03055 (k Int64 CODEC(NONE)) engine MergeTree order by k settings storage_policy = 's3_cache';
-- Write an 80 MiB file (40 x 2 MiB chunks), and a few small files.
system stop merges page_cache_03055;
insert into page_cache_03055 select * from numbers(10485760) settings max_block_size=100000000, preferred_block_size_bytes=1000000000;
select * from events_diff;
truncate table events_snapshot;
insert into events_snapshot select * from system.events;
system start merges page_cache_03055;
optimize table page_cache_03055 final;
truncate table events_snapshot;
insert into events_snapshot select * from system.events;
-- Cold read, should miss cache. (Populating cache on write is not implemented yet.)
select sum(k) from page_cache_03055;
select * from events_diff where event not in ('PageCacheChunkDataHits');
truncate table events_snapshot;
insert into events_snapshot select * from system.events;
-- Repeat read, should hit cache.
select sum(k) from page_cache_03055;
select * from events_diff;
truncate table events_snapshot;
insert into events_snapshot select * from system.events;
-- Drop cache and read again, should miss. Also don't write to cache.
system drop page cache;
select sum(k) from page_cache_03055 settings read_from_page_cache_if_exists_otherwise_bypass_cache = 1;
-- Data could be read multiple times because we're not writing to cache.
select event, if(event in ('PageCacheChunkMisses', 'ReadBufferFromS3Bytes'), diff >= 1, diff) from events_diff where event not in ('PageCacheChunkDataHits');
truncate table events_snapshot;
insert into events_snapshot select * from system.events;
-- Repeat read, should still miss, but populate cache.
select sum(k) from page_cache_03055;
select * from events_diff where event not in ('PageCacheChunkDataHits');
truncate table events_snapshot;
insert into events_snapshot select * from system.events;
-- Read again, hit the cache.
select sum(k) from page_cache_03055 settings read_from_page_cache_if_exists_otherwise_bypass_cache = 1;
select * from events_diff;
truncate table events_snapshot;
insert into events_snapshot select * from system.events;
-- Known limitation: cache is not invalidated if a table is dropped and created again at the same path.
-- set allow_deprecated_database_ordinary=1;
-- create database test_03055 engine = Ordinary;
-- create table test_03055.t (k Int64) engine MergeTree order by k settings storage_policy = 's3_cache';
-- insert into test_03055.t values (1);
-- select * from test_03055.t;
-- drop table test_03055.t;
-- create table test_03055.t (k Int64) engine MergeTree order by k settings storage_policy = 's3_cache';
-- insert into test_03055.t values (2);
-- select * from test_03055.t;
drop table events_snapshot;
drop table page_cache_03055;
drop view events_diff;