This commit is contained in:
kssenii 2023-06-30 16:08:48 +02:00
parent 72d1834bbd
commit c8ab68a5c3
16 changed files with 156 additions and 18 deletions

View File

@ -74,19 +74,22 @@ CachedOnDiskReadBufferFromFile::CachedOnDiskReadBufferFromFile(
}
void CachedOnDiskReadBufferFromFile::appendFilesystemCacheLog(
const FileSegment::Range & file_segment_range, CachedOnDiskReadBufferFromFile::ReadType type)
const FileSegment & file_segment, CachedOnDiskReadBufferFromFile::ReadType type)
{
if (!cache_log)
return;
const auto range = file_segment.range();
FilesystemCacheLogElement elem
{
.event_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()),
.query_id = query_id,
.source_file_path = source_file_path,
.file_segment_range = { file_segment_range.left, file_segment_range.right },
.file_segment_range = { range.left, range.right },
.requested_range = { first_offset, read_until_position },
.file_segment_size = file_segment_range.size(),
.file_segment_key = file_segment.key().toString(),
.file_segment_offset = file_segment.offset(),
.file_segment_size = range.size(),
.read_from_cache_attempted = true,
.read_buffer_id = current_buffer_id,
.profile_counters = std::make_shared<ProfileEvents::Counters::Snapshot>(
@ -495,7 +498,7 @@ bool CachedOnDiskReadBufferFromFile::completeFileSegmentAndGetNext()
auto completed_range = current_file_segment->range();
if (cache_log)
appendFilesystemCacheLog(completed_range, read_type);
appendFilesystemCacheLog(*current_file_segment, read_type);
chassert(file_offset_of_buffer_end > completed_range.right);
@ -521,7 +524,7 @@ CachedOnDiskReadBufferFromFile::~CachedOnDiskReadBufferFromFile()
{
if (cache_log && file_segments && !file_segments->empty())
{
appendFilesystemCacheLog(file_segments->front().range(), read_type);
appendFilesystemCacheLog(file_segments->front(), read_type);
}
}

View File

@ -90,7 +90,7 @@ private:
bool completeFileSegmentAndGetNext();
void appendFilesystemCacheLog(const FileSegment::Range & file_segment_range, ReadType read_type);
void appendFilesystemCacheLog(const FileSegment & file_segment, ReadType read_type);
bool writeCache(char * data, size_t size, size_t offset, FileSegment & file_segment);

View File

@ -88,6 +88,8 @@ void ReadBufferFromRemoteFSGather::appendUncachedReadInfo()
.source_file_path = current_object.remote_path,
.file_segment_range = { 0, current_object.bytes_size },
.cache_type = FilesystemCacheLogElement::CacheType::READ_FROM_FS_BYPASSING_CACHE,
.file_segment_key = {},
.file_segment_offset = {},
.file_segment_size = current_object.bytes_size,
.read_from_cache_attempted = false,
};

View File

@ -45,6 +45,8 @@ void registerDiskCache(DiskFactory & factory, bool /* global_skip_access_check *
else if (fs::path(file_cache_settings.base_path).is_relative())
file_cache_settings.base_path = fs::path(context->getPath()) / "caches" / file_cache_settings.base_path;
file_cache_settings.base_path = fs::absolute(file_cache_settings.base_path);
auto cache = FileCacheFactory::instance().getOrCreate(name, file_cache_settings);
auto disk = disk_it->second;
if (!dynamic_cast<const DiskObjectStorage *>(disk.get()))

View File

@ -808,6 +808,17 @@ bool FileCache::tryReserve(FileSegment & file_segment, const size_t size)
return true;
}
void FileCache::removeKey(const Key & key)
{
assertInitialized();
auto locked_key = metadata.lockKeyMetadata(key, CacheMetadata::KeyNotFoundPolicy::THROW);
if (!locked_key)
return;
locked_key->removeAllReleasable();
}
void FileCache::removeKeyIfExists(const Key & key)
{
assertInitialized();
@ -823,6 +834,17 @@ void FileCache::removeKeyIfExists(const Key & key)
locked_key->removeAllReleasable();
}
void FileCache::removeFileSegment(const Key & key, size_t offset)
{
assertInitialized();
auto locked_key = metadata.lockKeyMetadata(key, CacheMetadata::KeyNotFoundPolicy::RETURN_NULL);
if (!locked_key)
return;
locked_key->removeFileSegment(offset);
}
void FileCache::removePathIfExists(const String & path)
{
removeKeyIfExists(createKeyForPath(path));
@ -917,7 +939,7 @@ void FileCache::loadMetadata()
continue;
}
const auto key = Key(unhexUInt<UInt128>(key_directory.filename().string().data()));
const auto key = Key::fromKeyString(key_directory.filename().string());
auto locked_key = metadata.lockKeyMetadata(key, CacheMetadata::KeyNotFoundPolicy::CREATE_EMPTY, /* is_initial_load */true);
for (fs::directory_iterator offset_it{key_directory}; offset_it != fs::directory_iterator(); ++offset_it)

View File

@ -83,13 +83,19 @@ public:
FileSegmentsHolderPtr set(const Key & key, size_t offset, size_t size, const CreateFileSegmentSettings & settings);
/// Remove files by `key`. Removes files which might be used at the moment.
/// Remove file segment by `key` and `offset`. Throws if file segment does not exist.
void removeFileSegment(const Key & key, size_t offset);
/// Remove files by `key`. Throws if key does not exist.
void removeKey(const Key & key);
/// Remove files by `key`.
void removeKeyIfExists(const Key & key);
/// Removes files by `path`. Removes files which might be used at the moment.
/// Removes files by `path`.
void removePathIfExists(const String & path);
/// Remove files by `key`. Will not remove files which are used at the moment.
/// Remove files by `key`.
void removeAllReleasable();
std::vector<String> tryGetCachePaths(const Key & key);

View File

@ -28,4 +28,9 @@ FileCacheKey FileCacheKey::random()
return FileCacheKey(UUIDHelpers::generateV4().toUnderType());
}
FileCacheKey FileCacheKey::fromKeyString(const std::string & key_str)
{
return FileCacheKey(unhexUInt<UInt128>(key_str.data()));
}
}

View File

@ -21,6 +21,8 @@ struct FileCacheKey
static FileCacheKey random();
bool operator==(const FileCacheKey & other) const { return key == other.key; }
static FileCacheKey fromKeyString(const std::string & key_str);
};
using FileCacheKeyAndOffset = std::pair<FileCacheKey, size_t>;

View File

@ -381,19 +381,34 @@ void LockedKey::removeAllReleasable()
}
}
KeyMetadata::iterator LockedKey::removeFileSegment(size_t offset, const FileSegmentGuard::Lock & segment_lock)
KeyMetadata::iterator LockedKey::removeFileSegment(size_t offset, bool allow_throw)
{
auto it = key_metadata->find(offset);
if (it == key_metadata->end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "There is no offset {}", offset);
auto file_segment = it->second->file_segment;
return removeFileSegmentImpl(it, file_segment->lock(), allow_throw);
}
KeyMetadata::iterator LockedKey::removeFileSegment(size_t offset, const FileSegmentGuard::Lock & segment_lock, bool allow_throw)
{
auto it = key_metadata->find(offset);
if (it == key_metadata->end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "There is no offset {}", offset);
return removeFileSegmentImpl(it, segment_lock, allow_throw);
}
KeyMetadata::iterator LockedKey::removeFileSegmentImpl(KeyMetadata::iterator it, const FileSegmentGuard::Lock & segment_lock, bool allow_throw)
{
auto file_segment = it->second->file_segment;
LOG_DEBUG(
key_metadata->log, "Remove from cache. Key: {}, offset: {}, size: {}",
getKey(), offset, file_segment->reserved_size);
getKey(), file_segment->offset(), file_segment->reserved_size);
chassert(file_segment->assertCorrectnessUnlocked(segment_lock));
chassert(!allow_throw || file_segment->assertCorrectnessUnlocked(segment_lock));
if (file_segment->queue_iterator)
file_segment->queue_iterator->invalidate();
@ -405,7 +420,7 @@ KeyMetadata::iterator LockedKey::removeFileSegment(size_t offset, const FileSegm
fs::remove(path);
LOG_TEST(key_metadata->log, "Removed file segment at path: {}", path);
}
else if (file_segment->downloaded_size)
else if (file_segment->downloaded_size && allow_throw)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected path {} to exist", path);
file_segment->detach(segment_lock, *this);
@ -522,7 +537,10 @@ FileSegments LockedKey::sync()
{
auto file_segment = it->second->file_segment;
if (file_segment->state() != FileSegment::State::DOWNLOADED)
{
++it;
continue;
}
const auto & path = file_segment->getPathInLocalCache();
if (!fs::exists(path))
@ -533,7 +551,7 @@ FileSegments LockedKey::sync()
file_segment->getInfoForLog());
broken.push_back(FileSegment::getSnapshot(file_segment));
removeFileSegment(file_segment->offset(), file_segment->lock());
it = removeFileSegment(file_segment->offset(), file_segment->lock(), false);
continue;
}
@ -541,7 +559,10 @@ FileSegments LockedKey::sync()
const size_t expected_size = file_segment->getDownloadedSize(false);
if (actual_size == expected_size)
{
++it;
continue;
}
LOG_WARNING(
key_metadata->log,
@ -555,10 +576,12 @@ FileSegments LockedKey::sync()
{
file_segment->downloaded_size = actual_size;
file_segment->download_state = FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION;
++it;
}
else
{
removeFileSegment(file_segment->offset(), file_segment_lock);
it = removeFileSegment(file_segment->offset(), file_segment_lock, false);
fs::remove(path);
}
}
return broken;

View File

@ -158,7 +158,8 @@ struct LockedKey : private boost::noncopyable
void removeAllReleasable();
KeyMetadata::iterator removeFileSegment(size_t offset, const FileSegmentGuard::Lock &);
KeyMetadata::iterator removeFileSegment(size_t offset, const FileSegmentGuard::Lock &, bool allow_throw = true);
KeyMetadata::iterator removeFileSegment(size_t offset, bool allow_throw = true);
void shrinkFileSegmentToDownloadedSize(size_t offset, const FileSegmentGuard::Lock &);
@ -175,6 +176,8 @@ struct LockedKey : private boost::noncopyable
std::string toString() const;
private:
KeyMetadata::iterator removeFileSegmentImpl(KeyMetadata::iterator it, const FileSegmentGuard::Lock &, bool allow_throw = true);
const std::shared_ptr<KeyMetadata> key_metadata;
KeyGuard::Lock lock; /// `lock` must be destructed before `key_metadata`.
};

View File

@ -40,6 +40,8 @@ NamesAndTypesList FilesystemCacheLogElement::getNamesAndTypes()
{"source_file_path", std::make_shared<DataTypeString>()},
{"file_segment_range", std::make_shared<DataTypeTuple>(types)},
{"total_requested_range", std::make_shared<DataTypeTuple>(types)},
{"key", std::make_shared<DataTypeString>()},
{"offset", std::make_shared<DataTypeUInt64>()},
{"size", std::make_shared<DataTypeUInt64>()},
{"read_type", std::make_shared<DataTypeString>()},
{"read_from_cache_attempted", std::make_shared<DataTypeUInt8>()},
@ -60,6 +62,8 @@ void FilesystemCacheLogElement::appendToBlock(MutableColumns & columns) const
columns[i++]->insert(source_file_path);
columns[i++]->insert(Tuple{file_segment_range.first, file_segment_range.second});
columns[i++]->insert(Tuple{requested_range.first, requested_range.second});
columns[i++]->insert(file_segment_key);
columns[i++]->insert(file_segment_offset);
columns[i++]->insert(file_segment_size);
columns[i++]->insert(typeToString(cache_type));
columns[i++]->insert(read_from_cache_attempted);

View File

@ -39,6 +39,8 @@ struct FilesystemCacheLogElement
std::pair<size_t, size_t> file_segment_range{};
std::pair<size_t, size_t> requested_range{};
CacheType cache_type{};
std::string file_segment_key;
size_t file_segment_offset;
size_t file_segment_size;
bool read_from_cache_attempted;
String read_buffer_id;

View File

@ -371,7 +371,18 @@ BlockIO InterpreterSystemQuery::execute()
else
{
auto cache = FileCacheFactory::instance().getByName(query.filesystem_cache_name).cache;
cache->removeAllReleasable();
if (query.delete_key.empty())
{
cache->removeAllReleasable();
}
else
{
auto key = FileCacheKey::fromKeyString(query.delete_key);
if (query.delete_offset.has_value())
cache->removeFileSegment(key, query.delete_offset.value());
else
cache->removeKey(key);
}
}
break;
}

View File

@ -78,6 +78,8 @@ void StorageSystemFilesystemCache::fillDataImpl(MutableColumns & res_columns, Fi
{
if (fs::exists(path))
res_columns[i++]->insert(fs::file_size(path));
else
res_columns[i++]->insertDefault();
}
catch (...)
{

View File

@ -0,0 +1,2 @@
ok
ok

View File

@ -0,0 +1,49 @@
#!/usr/bin/env bash
# Tags: no-fasttest, no-parallel, no-s3-storage, no-random-settings, no-tsan, no-asan, no-ubsan, no-msan, no-debug
# set -x
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -nm --query """
DROP TABLE IF EXISTS test;
CREATE TABLE test (a Int32, b String)
ENGINE = MergeTree() ORDER BY tuple()
SETTINGS disk = disk(type = cache, max_size = '100Ki', path = ${CLICKHOUSE_TEST_UNIQUE_NAME}, delayed_cleanup_interval_ms = 10000000, disk = s3disk);
INSERT INTO test SELECT 1, 'test';
"""
query_id=$RANDOM
$CLICKHOUSE_CLIENT --query_id "$query_id" --query "SELECT * FROM test FORMAT Null SETTINGS enable_filesystem_cache_log = 1"
${CLICKHOUSE_CLIENT} -q "system flush logs"
key=$($CLICKHOUSE_CLIENT -nm --query """
SELECT key FROM system.filesystem_cache_log WHERE query_id = '$query_id' ORDER BY size DESC LIMIT 1;
""")
offset=$($CLICKHOUSE_CLIENT -nm --query """
SELECT offset FROM system.filesystem_cache_log WHERE query_id = '$query_id' ORDER BY size DESC LIMIT 1;
""")
path=$($CLICKHOUSE_CLIENT -nm --query """
SELECT cache_path FROM system.filesystem_cache WHERE key = '$key' AND file_segment_range_begin = $offset;
""")
rm $path
$CLICKHOUSE_CLIENT --query "SELECT * FROM test FORMAT Null SETTINGS enable_filesystem_cache_log = 1" 2>&1 | grep -f -q "File path does not exist" && echo 'ok' || echo 'fail'
CLICKHOUSE_CLIENT=$(echo ${CLICKHOUSE_CLIENT} | sed 's/'"--send_logs_level=${CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL}"'/--send_logs_level=fatal/g')
$CLICKHOUSE_CLIENT --query "SYSTEM SYNC FILESYSTEM CACHE" 2>&1 | grep -q "$key" && echo 'ok' || echo 'fail'
$CLICKHOUSE_CLIENT --query "SELECT * FROM test FORMAT Null"
$CLICKHOUSE_CLIENT --query "SYSTEM SYNC FILESYSTEM CACHE"