Minor changes

This commit is contained in:
kssenii 2022-07-13 19:22:47 +02:00
parent d026a8360b
commit 615221f73b
4 changed files with 37 additions and 27 deletions

View File

@ -812,25 +812,36 @@ FileSegmentRangeWriter::FileSegmentRangeWriter(
FileSegments::iterator FileSegmentRangeWriter::allocateFileSegment(size_t offset, bool is_persistent)
{
/**
* Allocate a new file segment starting `offset`.
* File segment capacity will equal `max_file_segment_size`, but actual size is 0.
*/
std::lock_guard cache_lock(cache->mutex);
/// We set max_file_segment_size to be downloaded,
/// if we have less size to write, then file segment will be resized.
/// if we have less size to write, file segment will be resized in complete() method.
auto file_segment = cache->setDownloading(key, offset, cache->max_file_segment_size, is_persistent, cache_lock);
return file_segments_holder.add(std::move(file_segment));
}
void FileSegmentRangeWriter::completeFileSegment(const FileSegmentPtr & file_segment)
{
/**
* Complete file segment based on downaloaded size.
*/
/// File segment can be detached if space reservation failed.
if (file_segment->isDetached())
{
/// File segment can be detached if space reservation failed.
return;
}
if (file_segment->getDownloadedSize() > 0)
{
/// file_segment->complete(DOWNLOADED) is not enough, because file segment capacity
/// was initially set with a margin as `max_file_segment_size`. => We need to always
/// resize to actual size after download finished.
file_segment->getOrSetDownloader();
file_segment->complete(FileSegment::State::DOWNLOADED, true);
file_segment->complete(FileSegment::State::DOWNLOADED, /* auto_resize */true);
on_complete_file_segment_func(file_segment);
}
else
@ -842,6 +853,11 @@ void FileSegmentRangeWriter::completeFileSegment(const FileSegmentPtr & file_seg
bool FileSegmentRangeWriter::write(char * data, size_t size, size_t offset, bool is_persistent)
{
/**
* Write a range of file segments. Allocate file segment of `max_file_segment_size` and write to
* it until it is full and then allocate next file segment.
*/
if (finalized)
return false;

View File

@ -66,8 +66,6 @@ bool fileOrSymlinkPathStartsWith(const String & path, const String & prefix_path
size_t getSizeFromFileDescriptor(int fd, const String & file_name = "");
int getINodeNumberFromPath(const String & path);
std::optional<size_t> tryGetSizeFromFilePath(const String & path);
/// Get inode number for a file path.

View File

@ -72,10 +72,14 @@ void CachedWriteBufferFromFile::cacheData(char * data, size_t size)
Stopwatch watch(CLOCK_MONOTONIC);
bool cached;
try
{
cached = cache_writer->write(data, size, current_download_offset, is_persistent_cache_file);
if (!cache_writer->write(data, size, current_download_offset, is_persistent_cache_file))
{
/// No space left, disable caching.
stop_caching = true;
return;
}
}
catch (...)
{
@ -83,13 +87,6 @@ void CachedWriteBufferFromFile::cacheData(char * data, size_t size)
return;
}
if (!cached)
{
/// No space left, disable caching.
stop_caching = true;
return;
}
ProfileEvents::increment(ProfileEvents::CachedWriteBufferCacheWriteBytes, size);
ProfileEvents::increment(ProfileEvents::CachedWriteBufferCacheWriteMicroseconds, watch.elapsedMicroseconds());
}

View File

@ -1219,22 +1219,21 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
std::unordered_set<String> disk_names_wrapped_in_cache;
for (const auto & disk_ptr : disks)
{
defined_disk_names.insert(disk_ptr->getName());
if (disk_ptr->supportsCache())
{
auto caches = disk_ptr->getCacheLayersNames();
disk_names_wrapped_in_cache.insert(caches.begin(), caches.end());
}
}
for (const auto & [name, disk_ptr] : getContext()->getDisksMap())
for (const auto & [_, disk_ptr] : getContext()->getDisksMap())
{
/// In composable cache with the underlying source disk there might the following structure:
/// DiskObjectStorage(CachedObjectStorage(...(CachedObjectStored(ObjectStorage)...)))
/// In configuration file each of these layers has a different name, but data path
/// (getPath() result) is the same. We need to take it into account here.
if (disk_ptr->supportsCache())
{
auto caches = disk_ptr->getCacheLayersNames();
disk_names_wrapped_in_cache.insert(caches.begin(), caches.end());
disk_names_wrapped_in_cache.insert(name);
if (defined_disk_names.contains(disk_ptr->getName()))
{
auto caches = disk_ptr->getCacheLayersNames();
disk_names_wrapped_in_cache.insert(caches.begin(), caches.end());
}
}
}