This commit is contained in:
kssenii 2023-08-10 13:56:13 +02:00
parent a470165769
commit 48fe9605a8
10 changed files with 100 additions and 123 deletions

View File

@ -194,6 +194,7 @@
M(FilesystemCacheSizeLimit, "Filesystem cache size limit in bytes") \
M(FilesystemCacheElements, "Filesystem cache elements (file segments)") \
M(FilesystemCacheDownloadQueueElements, "Filesystem cache elements in download queue") \
M(FilesystemCacheDelayedCleanupElements, "Filesystem cache elements in background cleanup queue") \
M(AsyncInsertCacheSize, "Number of async insert hash id in cache") \
M(S3Requests, "S3 requests") \
M(KeeperAliveConnections, "Number of alive connections") \

View File

@ -10,6 +10,7 @@
#include <Interpreters/Context.h>
#include <base/hex.h>
#include <pcg-random/pcg_random.hpp>
#include "Common/ThreadPool_fwd.h"
#include <Common/randomSeed.h>
#include <Common/ThreadPool.h>
#include <Common/ElapsedTimeProfileEventIncrement.h>
@ -54,7 +55,6 @@ namespace ErrorCodes
FileCache::FileCache(const FileCacheSettings & settings)
: max_file_segment_size(settings.max_file_segment_size)
, bypass_cache_threshold(settings.enable_bypass_cache_with_threashold ? settings.bypass_cache_threashold : 0)
, delayed_cleanup_interval_ms(settings.delayed_cleanup_interval_ms)
, boundary_alignment(settings.boundary_alignment)
, background_download_threads(settings.background_download_threads)
, log(&Poco::Logger::get("FileCache"))
@ -134,9 +134,7 @@ void FileCache::initialize()
for (size_t i = 0; i < background_download_threads; ++i)
download_threads.emplace_back([this] { metadata.downloadThreadFunc(); });
cleanup_task = Context::getGlobalContextInstance()->getSchedulePool().createTask("FileCacheCleanup", [this]{ cleanupThreadFunc(); });
cleanup_task->activate();
cleanup_task->scheduleAfter(delayed_cleanup_interval_ms);
cleanup_thread = std::make_unique<ThreadFromGlobalPool>(std::function{ [this]{ metadata.cleanupThreadFunc(); }});
}
CacheGuard::Lock FileCache::lockCache() const
@ -1028,33 +1026,14 @@ FileCache::~FileCache()
void FileCache::deactivateBackgroundOperations()
{
if (cleanup_task)
cleanup_task->deactivate();
metadata.cancelDownload();
for (auto & thread : download_threads)
if (thread.joinable())
thread.join();
}
void FileCache::cleanup()
{
metadata.doCleanup();
}
void FileCache::cleanupThreadFunc()
{
try
{
cleanup();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
chassert(false);
}
cleanup_task->scheduleAfter(delayed_cleanup_interval_ms);
metadata.cancelCleanup();
if (cleanup_thread && cleanup_thread->joinable())
cleanup_thread->join();
}
FileSegmentsHolderPtr FileCache::getSnapshot()

View File

@ -12,7 +12,7 @@
#include <IO/ReadSettings.h>
#include <Core/BackgroundSchedulePool.h>
#include <Common/ThreadPool.h>
#include <Interpreters/Cache/LRUFileCachePriority.h>
#include <Interpreters/Cache/FileCache_fwd.h>
#include <Interpreters/Cache/FileSegment.h>
@ -130,8 +130,6 @@ public:
FileSegmentsHolderPtr dumpQueue();
void cleanup();
void deactivateBackgroundOperations();
/// For per query cache limit.
@ -157,7 +155,6 @@ private:
const size_t max_file_segment_size;
const size_t bypass_cache_threshold = 0;
const size_t delayed_cleanup_interval_ms;
const size_t boundary_alignment;
const size_t background_download_threads;
@ -202,9 +199,8 @@ private:
* A background cleanup task.
* Clears removed cache entries from metadata.
*/
BackgroundSchedulePool::TaskHolder cleanup_task;
std::vector<ThreadFromGlobalPool> download_threads;
std::unique_ptr<ThreadFromGlobalPool> cleanup_thread;
void assertInitialized() const;
@ -235,8 +231,6 @@ private:
FileSegment::State state,
const CreateFileSegmentSettings & create_settings,
const CacheGuard::Lock *);
void cleanupThreadFunc();
};
}

View File

@ -49,8 +49,6 @@ void FileCacheSettings::loadFromConfig(const Poco::Util::AbstractConfiguration &
if (config.has(config_prefix + ".background_download_threads"))
background_download_threads = config.getUInt(config_prefix + ".background_download_threads");
delayed_cleanup_interval_ms = config.getUInt64(config_prefix + ".delayed_cleanup_interval_ms", FILECACHE_DELAYED_CLEANUP_INTERVAL_MS);
}
}

View File

@ -24,7 +24,6 @@ struct FileCacheSettings
bool enable_bypass_cache_with_threashold = false;
size_t bypass_cache_threashold = FILECACHE_BYPASS_THRESHOLD;
size_t delayed_cleanup_interval_ms = FILECACHE_DELAYED_CLEANUP_INTERVAL_MS;
size_t boundary_alignment = FILECACHE_DEFAULT_FILE_SEGMENT_ALIGNMENT;
size_t background_download_threads = FILECACHE_DEFAULT_BACKGROUND_DOWNLOAD_THREADS;

View File

@ -10,8 +10,6 @@ static constexpr int FILECACHE_DEFAULT_BACKGROUND_DOWNLOAD_THREADS = 2;
static constexpr int FILECACHE_DEFAULT_MAX_ELEMENTS = 10000000;
static constexpr int FILECACHE_DEFAULT_HITS_THRESHOLD = 0;
static constexpr size_t FILECACHE_BYPASS_THRESHOLD = 256 * 1024 * 1024;
static constexpr size_t FILECACHE_DELAYED_CLEANUP_INTERVAL_MS = 1000 * 60; /// 1 min
static constexpr size_t FILECACHE_DELAYED_CLEANUP_BATCH_SIZE = 1000;
class FileCache;
using FileCachePtr = std::shared_ptr<FileCache>;

View File

@ -10,6 +10,7 @@ namespace fs = std::filesystem;
namespace CurrentMetrics
{
extern const Metric FilesystemCacheDownloadQueueElements;
extern const Metric FilesystemCacheDelayedCleanupElements;
}
namespace ProfileEvents
@ -134,22 +135,6 @@ std::string KeyMetadata::getFileSegmentPath(const FileSegment & file_segment)
}
/// Mutex-protected set of cache keys waiting for delayed cleanup.
/// CacheMetadata is a friend so that it can call the private tryPop().
class CleanupQueue
{
friend struct CacheMetadata;
public:
/// Inserts the key into the set while holding the mutex.
void add(const FileCacheKey & key);
/// Erases the key from the set; throws LOGICAL_ERROR if the key is not queued.
void remove(const FileCacheKey & key);
/// Returns the number of keys currently queued.
size_t getSize() const;
private:
/// Moves one key out of the set into `key`; returns false if the set is empty.
bool tryPop(FileCacheKey & key);
std::unordered_set<FileCacheKey> keys;
/// mutable so that the const getSize() can lock it.
mutable std::mutex mutex;
};
CacheMetadata::CacheMetadata(const std::string & path_)
: path(path_)
, cleanup_queue(std::make_unique<CleanupQueue>())
@ -269,37 +254,6 @@ void CacheMetadata::iterate(IterateFunc && func)
}
}
void CacheMetadata::doCleanup()
{
/// Firstly, this cleanup does not delete cache files,
/// but only empty keys from cache_metadata_map and key (prefix) directories from fs.
/// Secondly, it deletes those only if arised as a result of
/// (1) eviction in FileCache::tryReserve();
/// (2) removal of cancelled non-downloaded file segments after FileSegment::complete().
/// which does not include removal of cache files because of FileCache::removeKey/removeAllKeys,
/// triggered by removal of source files from objects storage.
/// E.g. number of elements submitted to background cleanup should remain low.
auto lock = lockMetadata();
LOG_DEBUG(log, "Having {} keys to delete", cleanup_queue->getSize());
FileCacheKey cleanup_key;
size_t remaining_remove_num = FILECACHE_DELAYED_CLEANUP_BATCH_SIZE;
while (remaining_remove_num && cleanup_queue->tryPop(cleanup_key))
{
auto it = find(cleanup_key);
if (it == end())
continue;
auto locked_key = it->second->lockNoStateCheck();
if (locked_key->getKeyState() == KeyMetadata::KeyState::REMOVING)
{
removeKeyImpl(it, *locked_key, lock);
--remaining_remove_num;
}
}
}
void CacheMetadata::removeAllKeys(bool if_releasable)
{
auto lock = lockMetadata();
@ -387,6 +341,86 @@ CacheMetadata::iterator CacheMetadata::removeKeyImpl(iterator it, LockedKey & lo
return next_it;
}
/// Set of cache keys awaiting background removal from the cache metadata.
/// Producers call add(); the consumer (CacheMetadata::cleanupThreadFunc, which is a friend)
/// waits on `cv` and takes keys out of `keys` directly while holding `mutex`.
class CleanupQueue
{
    friend struct CacheMetadata;

public:
    /// Submits a key for background cleanup.
    /// The metric is incremented and the consumer is woken only if the key was really
    /// inserted: the consumer decrements the metric once per popped key, so counting a
    /// duplicate submission would leave the metric permanently inflated.
    void add(const FileCacheKey & key)
    {
        bool inserted = false;
        {
            std::lock_guard lock(mutex);
            inserted = keys.insert(key).second;
        }

        if (!inserted)
            return;

        CurrentMetrics::add(CurrentMetrics::FilesystemCacheDelayedCleanupElements);
        cv.notify_one();
    }

    /// Marks the queue as cancelled and wakes every waiter so that it can observe the flag.
    void cancel()
    {
        {
            std::lock_guard lock(mutex);
            cancelled = true;
        }
        /// Notify outside the critical section so woken threads do not immediately block on the mutex.
        cv.notify_all();
    }

private:
    std::unordered_set<FileCacheKey> keys;
    mutable std::mutex mutex;
    std::condition_variable cv;
    /// Guarded by `mutex`.
    bool cancelled = false;
};
void CacheMetadata::cleanupThreadFunc()
{
while (true)
{
Key key;
{
std::unique_lock lock(cleanup_queue->mutex);
if (cleanup_queue->cancelled)
return;
auto & keys = cleanup_queue->keys;
if (keys.empty())
{
cleanup_queue->cv.wait(lock);
continue;
}
auto it = keys.begin();
key = *it;
keys.erase(it);
}
CurrentMetrics::sub(CurrentMetrics::FilesystemCacheDelayedCleanupElements);
try
{
auto lock = lockMetadata();
auto it = find(key);
if (it == end())
continue;
auto locked_key = it->second->lockNoStateCheck();
if (locked_key->getKeyState() == KeyMetadata::KeyState::REMOVING)
{
removeKeyImpl(it, *locked_key, lock);
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}
/// Cancels the cleanup queue: sets its `cancelled` flag and wakes all waiters,
/// which makes cleanupThreadFunc() return the next time it checks the flag.
void CacheMetadata::cancelCleanup()
{
cleanup_queue->cancel();
}
class DownloadQueue
{
@ -814,35 +848,4 @@ std::string LockedKey::toString() const
return result;
}
/// Inserts `key` into the set of keys awaiting cleanup while holding the queue mutex.
/// Inserting a key that is already queued leaves the set unchanged.
void CleanupQueue::add(const FileCacheKey & key)
{
std::lock_guard lock(mutex);
keys.insert(key);
}
/// Takes `key` out of the cleanup queue.
/// Throws LOGICAL_ERROR if the key was not queued.
void CleanupQueue::remove(const FileCacheKey & key)
{
    std::lock_guard guard(mutex);

    /// erase() reports how many elements were removed; zero means the key was absent.
    if (keys.erase(key) == 0)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "No such key {} in removal queue", key);
}
/// Extracts one queued key into `key`.
/// Returns false (leaving `key` untouched) when the queue is empty, true otherwise.
bool CleanupQueue::tryPop(FileCacheKey & key)
{
    std::lock_guard guard(mutex);

    auto first = keys.begin();
    if (first == keys.end())
        return false;

    key = *first;
    keys.erase(first);
    return true;
}
/// Returns the number of keys currently queued, read while holding the queue mutex.
size_t CleanupQueue::getSize() const
{
std::lock_guard lock(mutex);
return keys.size();
}
}

View File

@ -124,7 +124,17 @@ public:
void removeKey(const Key & key, bool if_exists, bool is_releasable);
void removeAllKeys(bool is_releasable);
void doCleanup();
void cancelCleanup();
/// Firstly, this cleanup does not delete cache files,
/// but only empty keys from cache_metadata_map and key (prefix) directories from fs.
/// Secondly, it deletes those only if they arose as a result of
/// (1) eviction in FileCache::tryReserve();
/// (2) removal of cancelled non-downloaded file segments after FileSegment::complete(),
/// which does not include removal of cache files because of FileCache::removeKey/removeAllKeys,
/// triggered by removal of source files from object storage.
/// I.e. the number of elements submitted to background cleanup should remain low.
void cleanupThreadFunc();
void downloadThreadFunc();

View File

@ -25,7 +25,6 @@ static Block getSampleBlock()
ColumnWithTypeAndName{std::make_shared<DataTypeUInt64>(), "current_size"},
ColumnWithTypeAndName{std::make_shared<DataTypeUInt64>(), "current_elements"},
ColumnWithTypeAndName{std::make_shared<DataTypeString>(), "path"},
ColumnWithTypeAndName{std::make_shared<DataTypeNumber<UInt64>>(), "delayed_cleanup_interval_ms"},
ColumnWithTypeAndName{std::make_shared<DataTypeNumber<UInt64>>(), "background_download_threads"},
ColumnWithTypeAndName{std::make_shared<DataTypeNumber<UInt64>>(), "enable_bypass_cache_with_threshold"},
};
@ -54,7 +53,6 @@ BlockIO InterpreterDescribeCacheQuery::execute()
res_columns[i++]->insert(cache->getUsedCacheSize());
res_columns[i++]->insert(cache->getFileSegmentsNum());
res_columns[i++]->insert(cache->getBasePath());
res_columns[i++]->insert(settings.delayed_cleanup_interval_ms);
res_columns[i++]->insert(settings.background_download_threads);
res_columns[i++]->insert(settings.enable_bypass_cache_with_threashold);

View File

@ -604,7 +604,6 @@ TEST_F(FileCacheTest, get)
auto cache = FileCache(settings);
cache.initialize();
cache.cleanup();
const auto key = cache.createKeyForPath("key10");
const auto key_path = cache.getPathInLocalCache(key);
@ -622,7 +621,6 @@ TEST_F(FileCacheTest, get)
ASSERT_TRUE(fs::exists(key_path));
ASSERT_TRUE(!fs::exists(cache.getPathInLocalCache(key, 0, FileSegmentKind::Regular)));
cache.cleanup();
ASSERT_TRUE(!fs::exists(key_path));
ASSERT_TRUE(!fs::exists(fs::path(key_path).parent_path()));
}
@ -632,7 +630,6 @@ TEST_F(FileCacheTest, get)
/// Test background thread delayed cleanup
auto settings2{settings};
settings2.delayed_cleanup_interval_ms = 0;
auto cache = DB::FileCache(settings2);
cache.initialize();
const auto key = cache.createKeyForPath("key10");