Merge pull request #61152 from ClickHouse/revive-cache-contention-fix
Revive "Less contention in the cache, part 2"
Commit: 659463acd4
@@ -476,6 +476,7 @@ The server successfully detected this situation and will download merged part fr
     M(FileSegmentRemoveMicroseconds, "File segment remove() time") \
     M(FileSegmentHolderCompleteMicroseconds, "File segments holder complete() time") \
     M(FileSegmentFailToIncreasePriority, "Number of times the priority was not increased due to a high contention on the cache lock") \
+    M(FilesystemCacheFailToReserveSpaceBecauseOfLockContention, "Number of times space reservation was skipped due to a high contention on the cache lock") \
     M(FilesystemCacheHoldFileSegments, "Filesystem cache file segments count, which were held") \
     M(FilesystemCacheUnusedHoldFileSegments, "Filesystem cache file segments count, which were held, but not used (because of seek or LIMIT n, etc)") \
     \
@@ -778,6 +778,7 @@ class IColumn;
     M(UInt64, filesystem_cache_max_download_size, (128UL * 1024 * 1024 * 1024), "Max remote filesystem cache size that can be downloaded by a single query", 0) \
     M(Bool, throw_on_error_from_cache_on_write_operations, false, "Ignore error from cache when caching on write operations (INSERT, merges)", 0) \
     M(UInt64, filesystem_cache_segments_batch_size, 20, "Limit on size of a single batch of file segments that a read buffer can request from cache. Too low value will lead to excessive requests to cache, too large may slow down eviction from cache", 0) \
+    M(UInt64, filesystem_cache_reserve_space_wait_lock_timeout_milliseconds, 1000, "Wait time to lock cache for space reservation in filesystem cache", 0) \
     \
     M(Bool, use_page_cache_for_disks_without_file_cache, false, "Use userspace page cache for remote disks that don't have filesystem cache enabled.", 0) \
     M(Bool, read_from_page_cache_if_exists_otherwise_bypass_cache, false, "Use userspace page cache in passive mode, similar to read_from_filesystem_cache_if_exists_otherwise_bypass_cache.", 0) \
@@ -93,6 +93,7 @@ static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> sett
     {"input_format_json_use_string_type_for_ambiguous_paths_in_named_tuples_inference_from_objects", false, false, "Allow to use String type for ambiguous paths during named tuple inference from JSON objects"},
     {"throw_if_deduplication_in_dependent_materialized_views_enabled_with_async_insert", false, true, "Deduplication in dependent materialized views cannot work together with async inserts."},
     {"parallel_replicas_allow_in_with_subquery", false, true, "If true, subquery for IN will be executed on every follower replica"},
+    {"filesystem_cache_reserve_space_wait_lock_timeout_milliseconds", 1000, 1000, "Wait time to lock cache for space reservation in filesystem cache"},
     }},
     {"24.2", {{"allow_suspicious_variant_types", true, false, "Don't allow creating Variant type with suspicious variants by default"},
     {"validate_experimental_and_suspicious_types_inside_nested_types", false, true, "Validate usage of experimental and suspicious types inside nested types"},
@@ -637,7 +637,8 @@ void CachedOnDiskReadBufferFromFile::predownload(FileSegment & file_segment)
         ProfileEvents::increment(ProfileEvents::CachedReadBufferReadFromSourceBytes, current_impl_buffer_size);

-        bool continue_predownload = file_segment.reserve(current_predownload_size);
+        bool continue_predownload = file_segment.reserve(
+            current_predownload_size, settings.filesystem_cache_reserve_space_wait_lock_timeout_milliseconds);
         if (continue_predownload)
         {
             LOG_TEST(log, "Left to predownload: {}, buffer size: {}", bytes_to_predownload, current_impl_buffer_size);
@@ -992,7 +993,7 @@ bool CachedOnDiskReadBufferFromFile::nextImplStep()
         {
             chassert(file_offset_of_buffer_end + size - 1 <= file_segment.range().right);

-            bool success = file_segment.reserve(size);
+            bool success = file_segment.reserve(size, settings.filesystem_cache_reserve_space_wait_lock_timeout_milliseconds);
             if (success)
             {
                 chassert(file_segment.getCurrentWriteOffset() == static_cast<size_t>(implementation_buffer->getPosition()));
@@ -26,16 +26,18 @@ FileSegmentRangeWriter::FileSegmentRangeWriter(
     FileCache * cache_,
     const FileSegment::Key & key_,
     const FileCacheUserInfo & user_,
+    size_t reserve_space_lock_wait_timeout_milliseconds_,
     std::shared_ptr<FilesystemCacheLog> cache_log_,
     const String & query_id_,
     const String & source_path_)
     : cache(cache_)
     , key(key_)
+    , user(user_)
+    , reserve_space_lock_wait_timeout_milliseconds(reserve_space_lock_wait_timeout_milliseconds_)
     , log(getLogger("FileSegmentRangeWriter"))
     , cache_log(cache_log_)
     , query_id(query_id_)
     , source_path(source_path_)
-    , user(user_)
 {
 }
@@ -89,7 +91,7 @@ bool FileSegmentRangeWriter::write(const char * data, size_t size, size_t offset
     size_t size_to_write = std::min(available_size, size);

-    bool reserved = file_segment->reserve(size_to_write);
+    bool reserved = file_segment->reserve(size_to_write, reserve_space_lock_wait_timeout_milliseconds);
     if (!reserved)
     {
         appendFilesystemCacheLog(*file_segment);
@@ -211,6 +213,7 @@ CachedOnDiskWriteBufferFromFile::CachedOnDiskWriteBufferFromFile(
     , key(key_)
     , query_id(query_id_)
     , user(user_)
+    , reserve_space_lock_wait_timeout_milliseconds(settings_.filesystem_cache_reserve_space_wait_lock_timeout_milliseconds)
     , throw_on_error_from_cache(settings_.throw_on_error_from_cache)
     , cache_log(!query_id_.empty() && settings_.enable_filesystem_cache_log ? cache_log_ : nullptr)
 {
@@ -251,7 +254,8 @@ void CachedOnDiskWriteBufferFromFile::cacheData(char * data, size_t size, bool t
     if (!cache_writer)
     {
-        cache_writer = std::make_unique<FileSegmentRangeWriter>(cache.get(), key, user, cache_log, query_id, source_path);
+        cache_writer = std::make_unique<FileSegmentRangeWriter>(
+            cache.get(), key, user, reserve_space_lock_wait_timeout_milliseconds, cache_log, query_id, source_path);
     }

     Stopwatch watch(CLOCK_MONOTONIC);
@@ -30,6 +30,7 @@ public:
         FileCache * cache_,
         const FileSegment::Key & key_,
         const FileCacheUserInfo & user_,
+        size_t reserve_space_lock_wait_timeout_milliseconds_,
         std::shared_ptr<FilesystemCacheLog> cache_log_,
         const String & query_id_,
         const String & source_path_);
@@ -52,13 +53,14 @@ private:
     void completeFileSegment();

     FileCache * cache;
-    FileSegment::Key key;
+    const FileSegment::Key key;
+    const FileCacheUserInfo user;
+    const size_t reserve_space_lock_wait_timeout_milliseconds;

     LoggerPtr log;
     std::shared_ptr<FilesystemCacheLog> cache_log;
-    String query_id;
-    String source_path;
-    FileCacheUserInfo user;
+    const String query_id;
+    const String source_path;

     FileSegmentsHolderPtr file_segments;
@@ -99,11 +101,12 @@ private:
     String source_path;
     FileCacheKey key;

-    size_t current_download_offset = 0;
     const String query_id;
     const FileCacheUserInfo user;
+    const size_t reserve_space_lock_wait_timeout_milliseconds;
+    const bool throw_on_error_from_cache;

-    bool throw_on_error_from_cache;
+    size_t current_download_offset = 0;
     bool cache_in_error_state_or_disabled = false;

     std::unique_ptr<FileSegmentRangeWriter> cache_writer;
@@ -100,6 +100,7 @@ struct ReadSettings
     bool read_from_filesystem_cache_if_exists_otherwise_bypass_cache = false;
     bool enable_filesystem_cache_log = false;
     size_t filesystem_cache_segments_batch_size = 20;
+    size_t filesystem_cache_reserve_space_wait_lock_timeout_milliseconds = 1000;

     bool use_page_cache_for_disks_without_file_cache = false;
     bool read_from_page_cache_if_exists_otherwise_bypass_cache = false;
@@ -20,6 +20,7 @@ struct WriteSettings
     bool enable_filesystem_cache_on_write_operations = false;
     bool enable_filesystem_cache_log = false;
     bool throw_on_error_from_cache = false;
+    size_t filesystem_cache_reserve_space_wait_lock_timeout_milliseconds = 1000;

     bool s3_allow_parallel_part_upload = true;
@@ -27,6 +27,7 @@ namespace ProfileEvents
     extern const Event FilesystemCacheReserveMicroseconds;
     extern const Event FilesystemCacheGetOrSetMicroseconds;
     extern const Event FilesystemCacheGetMicroseconds;
+    extern const Event FilesystemCacheFailToReserveSpaceBecauseOfLockContention;
 }

 namespace DB
@@ -188,9 +189,9 @@ CacheGuard::Lock FileCache::lockCache() const
     return cache_guard.lock();
 }

-CacheGuard::Lock FileCache::tryLockCache() const
+CacheGuard::Lock FileCache::tryLockCache(std::optional<std::chrono::milliseconds> acquire_timeout) const
 {
-    return cache_guard.tryLock();
+    return acquire_timeout.has_value() ? cache_guard.tryLockFor(acquire_timeout.value()) : cache_guard.tryLock();
 }

 FileSegments FileCache::getImpl(const LockedKey & locked_key, const FileSegment::Range & range, size_t file_segments_limit) const
@@ -776,12 +777,18 @@ bool FileCache::tryReserve(
     FileSegment & file_segment,
     const size_t size,
     FileCacheReserveStat & reserve_stat,
-    const UserInfo & user)
+    const UserInfo & user,
+    size_t lock_wait_timeout_milliseconds)
 {
     ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::FilesystemCacheReserveMicroseconds);

     assertInitialized();
-    auto cache_lock = lockCache();
+    auto cache_lock = tryLockCache(std::chrono::milliseconds(lock_wait_timeout_milliseconds));
+    if (!cache_lock)
+    {
+        ProfileEvents::increment(ProfileEvents::FilesystemCacheFailToReserveSpaceBecauseOfLockContention);
+        return false;
+    }

     LOG_TEST(
         log, "Trying to reserve space ({} bytes) for {}:{}, current usage {}/{}",
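Taken together, tryReserve() now degrades gracefully under lock contention: it waits for the cache lock only up to lock_wait_timeout_milliseconds, bumps a profile event, and reports failure instead of blocking the query. Below is a minimal self-contained sketch of that pattern; the names tryReserveSketch, cache_mutex, and lock_contention_events are illustrative stand-ins, not the ClickHouse symbols.

    #include <atomic>
    #include <chrono>
    #include <mutex>

    std::timed_mutex cache_mutex;                    // stands in for the cache-wide guard
    std::atomic<size_t> lock_contention_events{0};   // stands in for the new profile event

    // Try to take the cache lock within the timeout; on contention, count the
    // event and report "not reserved" instead of blocking the caller.
    bool tryReserveSketch(size_t bytes, size_t lock_wait_timeout_milliseconds)
    {
        std::unique_lock<std::timed_mutex> lock(
            cache_mutex, std::chrono::milliseconds(lock_wait_timeout_milliseconds));
        if (!lock.owns_lock())
        {
            ++lock_contention_events;
            return false;
        }
        // ... eviction and size accounting would happen here, under the lock ...
        (void)bytes;
        return true;
    }

A caller that gets false simply skips caching for that request, which matches how the read and write buffers above treat a failed reserve().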
@@ -161,7 +161,8 @@ public:
         FileSegment & file_segment,
         size_t size,
         FileCacheReserveStat & stat,
-        const UserInfo & user);
+        const UserInfo & user,
+        size_t lock_wait_timeout_milliseconds);

     std::vector<FileSegment::Info> getFileSegmentInfos(const UserID & user_id);
@@ -173,7 +174,7 @@ public:
     void deactivateBackgroundOperations();

     CacheGuard::Lock lockCache() const;
-    CacheGuard::Lock tryLockCache() const;
+    CacheGuard::Lock tryLockCache(std::optional<std::chrono::milliseconds> acquire_timeout = std::nullopt) const;

     std::vector<FileSegment::Info> sync();
@@ -497,7 +497,7 @@ LockedKeyPtr FileSegment::lockKeyMetadata(bool assert_exists) const
     return metadata->tryLock();
 }

-bool FileSegment::reserve(size_t size_to_reserve, FileCacheReserveStat * reserve_stat)
+bool FileSegment::reserve(size_t size_to_reserve, size_t lock_wait_timeout_milliseconds, FileCacheReserveStat * reserve_stat)
 {
     if (!size_to_reserve)
         throw Exception(ErrorCodes::LOGICAL_ERROR, "Zero space reservation is not allowed");
@@ -549,7 +549,7 @@ bool FileSegment::reserve(size_t size_to_reserve, FileCacheReserveStat * reserve
     if (!reserve_stat)
         reserve_stat = &dummy_stat;

-    bool reserved = cache->tryReserve(*this, size_to_reserve, *reserve_stat, getKeyMetadata()->user);
+    bool reserved = cache->tryReserve(*this, size_to_reserve, *reserve_stat, getKeyMetadata()->user, lock_wait_timeout_milliseconds);

     if (!reserved)
         setDownloadFailedUnlocked(lockFileSegment());
@@ -199,7 +199,7 @@ public:

     /// Try to reserve exactly `size` bytes (in addition to the getDownloadedSize() bytes already downloaded).
     /// Returns true if reservation was successful, false otherwise.
-    bool reserve(size_t size_to_reserve, FileCacheReserveStat * reserve_stat = nullptr);
+    bool reserve(size_t size_to_reserve, size_t lock_wait_timeout_milliseconds, FileCacheReserveStat * reserve_stat = nullptr);

     /// Write data into reserved space.
     void write(const char * from, size_t size, size_t offset);
@@ -61,17 +61,26 @@ namespace DB
 */
 struct CacheGuard : private boost::noncopyable
 {
+    using Mutex = std::timed_mutex;
     /// struct is used (not keyword `using`) to make CacheGuard::Lock non-interchangable with other guards locks
     /// so, we wouldn't be able to pass CacheGuard::Lock to a function which accepts KeyGuard::Lock, for example
-    struct Lock : public std::unique_lock<std::mutex>
+    struct Lock : public std::unique_lock<Mutex>
     {
-        using Base = std::unique_lock<std::mutex>;
+        using Base = std::unique_lock<Mutex>;
         using Base::Base;
     };

     Lock lock() { return Lock(mutex); }

     Lock tryLock() { return Lock(mutex, std::try_to_lock); }
-    std::mutex mutex;
+
+    Lock tryLockFor(const std::chrono::milliseconds & acquire_timeout)
+    {
+        return Lock(mutex, std::chrono::duration<double, std::milli>(acquire_timeout));
+    }
+
+private:
+    Mutex mutex;
 };

 /**
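The key change in CacheGuard is swapping std::mutex for std::timed_mutex: constructing a std::unique_lock from a mutex and a duration performs try_lock_for() under the hood, so tryLockFor() returns a Lock that either owns the mutex or reports a timeout via owns_lock(). A small standalone illustration of that standard-library behaviour (not ClickHouse code) follows.

    #include <chrono>
    #include <iostream>
    #include <mutex>
    #include <thread>

    int main()
    {
        std::timed_mutex m;

        // One thread holds the lock for ~200 ms.
        std::thread holder([&]
        {
            std::lock_guard<std::timed_mutex> g(m);
            std::this_thread::sleep_for(std::chrono::milliseconds(200));
        });
        std::this_thread::sleep_for(std::chrono::milliseconds(10));  // let the holder acquire first

        // Equivalent of CacheGuard::tryLockFor(50ms): wait at most 50 ms for the lock.
        std::unique_lock<std::timed_mutex> lock(m, std::chrono::milliseconds(50));
        std::cout << (lock.owns_lock() ? "acquired" : "timed out") << '\n';  // in the typical interleaving: "timed out"

        holder.join();
        return 0;
    }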
@@ -1,6 +1,7 @@
 #include <Interpreters/Cache/Metadata.h>
 #include <Interpreters/Cache/FileCache.h>
 #include <Interpreters/Cache/FileSegment.h>
+#include <Interpreters/Context.h>
 #include <Common/logger_useful.h>
 #include <Common/ElapsedTimeProfileEventIncrement.h>
 #include <filesystem>
@@ -693,6 +694,9 @@ void CacheMetadata::downloadImpl(FileSegment & file_segment, std::optional<Memor
         reader->set(memory->data(), memory->size());
     }

+    const auto reserve_space_lock_wait_timeout_milliseconds =
+        Context::getGlobalContextInstance()->getReadSettings().filesystem_cache_reserve_space_wait_lock_timeout_milliseconds;
+
     size_t offset = file_segment.getCurrentWriteOffset();
     if (offset != static_cast<size_t>(reader->getPosition()))
         reader->seek(offset, SEEK_SET);
@@ -701,7 +705,7 @@ void CacheMetadata::downloadImpl(FileSegment & file_segment, std::optional<Memor
     {
         auto size = reader->available();

-        if (!file_segment.reserve(size))
+        if (!file_segment.reserve(size, reserve_space_lock_wait_timeout_milliseconds))
         {
             LOG_TEST(
                 log, "Failed to reserve space during background download "
@@ -1,6 +1,7 @@
 #include <Interpreters/Cache/WriteBufferToFileSegment.h>
 #include <Interpreters/Cache/FileSegment.h>
 #include <Interpreters/Cache/FileCache.h>
+#include <Interpreters/Context.h>
 #include <IO/SwapHelper.h>
 #include <IO/ReadBufferFromFile.h>
@@ -18,9 +19,22 @@ namespace ErrorCodes
     extern const int NOT_ENOUGH_SPACE;
 }

+namespace
+{
+    size_t getCacheLockWaitTimeout()
+    {
+        auto query_context = CurrentThread::getQueryContext();
+        if (query_context)
+            return query_context->getReadSettings().filesystem_cache_reserve_space_wait_lock_timeout_milliseconds;
+        else
+            return Context::getGlobalContextInstance()->getReadSettings().filesystem_cache_reserve_space_wait_lock_timeout_milliseconds;
+    }
+}
+
 WriteBufferToFileSegment::WriteBufferToFileSegment(FileSegment * file_segment_)
     : WriteBufferFromFileDecorator(std::make_unique<WriteBufferFromFile>(file_segment_->getPath()))
     , file_segment(file_segment_)
+    , reserve_space_lock_wait_timeout_milliseconds(getCacheLockWaitTimeout())
 {
 }
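getCacheLockWaitTimeout() resolves the effective timeout at the point of use: the per-query ReadSettings win when a query context is attached to the current thread, otherwise the server-wide defaults from the global context apply. A hedged sketch of the same fallback idea with invented stand-in types (Settings, thread_query_settings, and global_server_settings are not the real ClickHouse classes):

    #include <cstddef>
    #include <optional>

    struct Settings { size_t filesystem_cache_reserve_space_wait_lock_timeout_milliseconds = 1000; };

    std::optional<Settings> thread_query_settings;   // populated while a query runs on this thread
    Settings global_server_settings;                 // server-wide fallback defaults

    // Prefer the per-query value, fall back to the server default, mirroring the
    // query-context / global-context lookup in the anonymous-namespace helper above.
    size_t getCacheLockWaitTimeoutSketch()
    {
        if (thread_query_settings)
            return thread_query_settings->filesystem_cache_reserve_space_wait_lock_timeout_milliseconds;
        return global_server_settings.filesystem_cache_reserve_space_wait_lock_timeout_milliseconds;
    }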
@@ -31,6 +45,7 @@ WriteBufferToFileSegment::WriteBufferToFileSegment(FileSegmentsHolderPtr segment
         : throw Exception(ErrorCodes::LOGICAL_ERROR, "WriteBufferToFileSegment can be created only from single segment"))
     , file_segment(&segment_holder_->front())
     , segment_holder(std::move(segment_holder_))
+    , reserve_space_lock_wait_timeout_milliseconds(getCacheLockWaitTimeout())
 {
 }
@@ -49,7 +64,7 @@ void WriteBufferToFileSegment::nextImpl()
     FileCacheReserveStat reserve_stat;
     /// In case of an error, we don't need to finalize the file segment
     /// because it will be deleted soon and completed in the holder's destructor.
-    bool ok = file_segment->reserve(bytes_to_write, &reserve_stat);
+    bool ok = file_segment->reserve(bytes_to_write, reserve_space_lock_wait_timeout_milliseconds, &reserve_stat);

     if (!ok)
     {
@@ -28,6 +28,8 @@ private:

     /// Empty if file_segment is not owned by this WriteBufferToFileSegment
     FileSegmentsHolderPtr segment_holder;
+
+    const size_t reserve_space_lock_wait_timeout_milliseconds;
 };
@@ -5166,6 +5166,7 @@ ReadSettings Context::getReadSettings() const
     res.read_from_filesystem_cache_if_exists_otherwise_bypass_cache = settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache;
     res.enable_filesystem_cache_log = settings.enable_filesystem_cache_log;
     res.filesystem_cache_segments_batch_size = settings.filesystem_cache_segments_batch_size;
+    res.filesystem_cache_reserve_space_wait_lock_timeout_milliseconds = settings.filesystem_cache_reserve_space_wait_lock_timeout_milliseconds;

     res.filesystem_cache_max_download_size = settings.filesystem_cache_max_download_size;
     res.skip_download_if_exceeds_query_cache = settings.skip_download_if_exceeds_query_cache;
@@ -5214,6 +5215,7 @@ WriteSettings Context::getWriteSettings() const
     res.enable_filesystem_cache_on_write_operations = settings.enable_filesystem_cache_on_write_operations;
     res.enable_filesystem_cache_log = settings.enable_filesystem_cache_log;
     res.throw_on_error_from_cache = settings.throw_on_error_from_cache_on_write_operations;
+    res.filesystem_cache_reserve_space_wait_lock_timeout_milliseconds = settings.filesystem_cache_reserve_space_wait_lock_timeout_milliseconds;

     res.s3_allow_parallel_part_upload = settings.s3_allow_parallel_part_upload;
@@ -245,7 +245,7 @@ void download(FileSegment & file_segment)
     ASSERT_EQ(file_segment.state(), State::DOWNLOADING);
     ASSERT_EQ(file_segment.getDownloadedSize(), 0);

-    ASSERT_TRUE(file_segment.reserve(file_segment.range().size()));
+    ASSERT_TRUE(file_segment.reserve(file_segment.range().size(), 1000));
     download(cache_base_path, file_segment);
     ASSERT_EQ(file_segment.state(), State::DOWNLOADING);
@@ -257,7 +257,7 @@ void assertDownloadFails(FileSegment & file_segment)
 {
     ASSERT_EQ(file_segment.getOrSetDownloader(), FileSegment::getCallerId());
     ASSERT_EQ(file_segment.getDownloadedSize(), 0);
-    ASSERT_FALSE(file_segment.reserve(file_segment.range().size()));
+    ASSERT_FALSE(file_segment.reserve(file_segment.range().size(), 1000));
     file_segment.complete();
 }
@@ -956,7 +956,7 @@ TEST_F(FileCacheTest, temporaryData)
     for (auto & segment : *some_data_holder)
     {
         ASSERT_TRUE(segment->getOrSetDownloader() == DB::FileSegment::getCallerId());
-        ASSERT_TRUE(segment->reserve(segment->range().size()));
+        ASSERT_TRUE(segment->reserve(segment->range().size(), 1000));
         download(*segment);
         segment->complete();
     }