Fix some review comments, fix after merge

This commit is contained in:
kssenii 2022-07-16 23:30:54 +02:00
parent 8ad26b3111
commit ca09c6c2c0
12 changed files with 80 additions and 88 deletions

View File

@ -415,7 +415,7 @@ void FileSegment::completeBatchAndResetDownloader()
cv.notify_all();
}
void FileSegment::complete(State state, bool auto_resize)
void FileSegment::completeWithState(State state, bool auto_resize)
{
std::lock_guard cache_lock(cache->mutex);
std::lock_guard segment_lock(mutex);
@ -444,6 +444,7 @@ void FileSegment::complete(State state, bool auto_resize)
if (auto_resize && downloaded_size != range().size())
{
LOG_TEST(log, "Resize cell {} to downloaded: {}", range().toString(), downloaded_size);
assert(downloaded_size <= range().size());
segment_range = Range(segment_range.left, segment_range.left + downloaded_size - 1);
}
@ -475,7 +476,7 @@ void FileSegment::complete(State state, bool auto_resize)
cv.notify_all();
}
void FileSegment::complete(std::lock_guard<std::mutex> & cache_lock)
void FileSegment::completeBasedOnCurrentState(std::lock_guard<std::mutex> & cache_lock)
{
std::lock_guard segment_lock(mutex);
@ -484,10 +485,11 @@ void FileSegment::complete(std::lock_guard<std::mutex> & cache_lock)
assertNotDetached(segment_lock);
completeUnlocked(cache_lock, segment_lock);
completeBasedOnCurrentStateUnlocked(cache_lock, segment_lock);
}
void FileSegment::completeUnlocked(std::lock_guard<std::mutex> & cache_lock, std::lock_guard<std::mutex> & segment_lock)
void FileSegment::completeBasedOnCurrentStateUnlocked(
std::lock_guard<std::mutex> & cache_lock, std::lock_guard<std::mutex> & segment_lock)
{
bool is_last_holder = cache->isLastFileSegmentHolder(key(), offset(), cache_lock, segment_lock);
@ -776,7 +778,7 @@ FileSegmentsHolder::~FileSegmentsHolder()
/// under the same mutex, because completeBasedOnCurrentState() checks for segment pointers.
std::lock_guard cache_lock(cache->mutex);
file_segment->complete(cache_lock);
file_segment->completeBasedOnCurrentState(cache_lock);
file_segment_it = file_segments.erase(current_file_segment_it);
}
@ -802,7 +804,7 @@ String FileSegmentsHolder::toString()
FileSegmentRangeWriter::FileSegmentRangeWriter(
IFileCache * cache_,
const FileSegment::Key & key_,
std::function<void(const FileSegmentPtr & file_segment)> on_complete_file_segment_func_)
std::function<void(const FileSegment & file_segment)> on_complete_file_segment_func_)
: cache(cache_)
, key(key_)
, current_file_segment_it(file_segments_holder.file_segments.end())
@ -825,33 +827,33 @@ FileSegments::iterator FileSegmentRangeWriter::allocateFileSegment(size_t offset
return file_segments_holder.add(std::move(file_segment));
}
void FileSegmentRangeWriter::completeFileSegment(const FileSegmentPtr & file_segment)
void FileSegmentRangeWriter::completeFileSegment(FileSegment & file_segment)
{
/**
* Complete file segment based on downloaded size.
*/
/// File segment can be detached if space reservation failed.
if (file_segment->isDetached())
if (file_segment.isDetached())
return;
if (file_segment->getDownloadedSize() > 0)
if (file_segment.getDownloadedSize() > 0)
{
/// completeWithState(DOWNLOADED) without resizing is not enough, because the file segment
/// capacity was initially set with a margin, as `max_file_segment_size`. => We always need
/// to resize to the actual size after the download has finished.
file_segment->getOrSetDownloader();
file_segment->complete(FileSegment::State::DOWNLOADED, /* auto_resize */true);
file_segment.getOrSetDownloader();
file_segment.completeWithState(FileSegment::State::DOWNLOADED, /* auto_resize */true);
on_complete_file_segment_func(file_segment);
}
else
{
std::lock_guard cache_lock(cache->mutex);
file_segment->complete(cache_lock);
file_segment.completeBasedOnCurrentState(cache_lock);
}
}
bool FileSegmentRangeWriter::write(char * data, size_t size, size_t offset, bool is_persistent)
bool FileSegmentRangeWriter::write(const char * data, size_t size, size_t offset, bool is_persistent)
{
/**
* Write a range of file segments. Allocate file segment of `max_file_segment_size` and write to
@ -879,7 +881,7 @@ bool FileSegmentRangeWriter::write(char * data, size_t size, size_t offset, bool
if ((*current_file_segment_it)->getRemainingSizeToDownload() == 0)
{
completeFileSegment(*current_file_segment_it);
completeFileSegment(**current_file_segment_it);
current_file_segment_it = allocateFileSegment(current_file_segment_write_offset, is_persistent);
}
else if ((*current_file_segment_it)->getDownloadOffset() != offset)
@ -891,34 +893,28 @@ bool FileSegmentRangeWriter::write(char * data, size_t size, size_t offset, bool
}
}
(*current_file_segment_it)->getOrSetDownloader();
auto & file_segment = *current_file_segment_it;
file_segment->getOrSetDownloader();
SCOPE_EXIT({
file_segment->resetDownloader();
});
try
bool reserved = file_segment->reserve(size);
if (!reserved)
{
bool reserved = (*current_file_segment_it)->reserve(size);
if (!reserved)
{
(*current_file_segment_it)->complete(FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
on_complete_file_segment_func(*current_file_segment_it);
file_segment->completeWithState(FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
on_complete_file_segment_func(*file_segment);
LOG_DEBUG(
&Poco::Logger::get("FileSegmentRangeWriter"),
"Unsuccessful space reservation attempt (size: {}, file segment info: {}",
size, (*current_file_segment_it)->getInfoForLog());
LOG_DEBUG(
&Poco::Logger::get("FileSegmentRangeWriter"),
"Unsuccessful space reservation attempt (size: {}, file segment info: {}",
size, file_segment->getInfoForLog());
return false;
}
(*current_file_segment_it)->write(data, size, offset);
current_file_segment_write_offset += size;
}
catch (...)
{
(*current_file_segment_it)->resetDownloader();
throw;
return false;
}
(*current_file_segment_it)->resetDownloader();
(*current_file_segment_it)->write(data, size, offset);
current_file_segment_write_offset += size;
return true;
}
@ -932,7 +928,7 @@ void FileSegmentRangeWriter::finalize()
if (file_segments.empty() || current_file_segment_it == file_segments.end())
return;
completeFileSegment(*current_file_segment_it);
completeFileSegment(**current_file_segment_it);
finalized = true;
}

View File

@ -139,7 +139,7 @@ public:
void completeBatchAndResetDownloader();
void complete(State state, bool auto_resize = false);
void completeWithState(State state, bool auto_resize = false);
String getInfoForLog() const;
@ -192,8 +192,8 @@ private:
/// FileSegmentsHolder. completeBasedOnCurrentState() might check if the caller of the method
/// is the last alive holder of the segment. Therefore, completeBasedOnCurrentState() and
/// destruction of the file segment pointer must be done under the same cache mutex.
void complete(std::lock_guard<std::mutex> & cache_lock);
void completeUnlocked(std::lock_guard<std::mutex> & cache_lock, std::lock_guard<std::mutex> & segment_lock);
void completeBasedOnCurrentState(std::lock_guard<std::mutex> & cache_lock);
void completeBasedOnCurrentStateUnlocked(std::lock_guard<std::mutex> & cache_lock, std::lock_guard<std::mutex> & segment_lock);
void completeImpl(
std::lock_guard<std::mutex> & cache_lock,
@ -279,17 +279,19 @@ public:
FileSegmentRangeWriter(
IFileCache * cache_,
const FileSegment::Key & key_,
std::function<void(const FileSegmentPtr & file_segment)> on_complete_file_segment_func_);
/// A callback which is called right after each file segment is completed.
/// It is used to write into filesystem cache log.
std::function<void(const FileSegment & file_segment)> on_complete_file_segment_func_);
~FileSegmentRangeWriter();
bool write(char * data, size_t size, size_t offset, bool is_persistent);
bool write(const char * data, size_t size, size_t offset, bool is_persistent);
void finalize();
private:
FileSegments::iterator allocateFileSegment(size_t offset, bool is_persistent);
void completeFileSegment(const FileSegmentPtr & file_segment);
void completeFileSegment(FileSegment & file_segment);
IFileCache * cache;
FileSegment::Key key;
@ -300,7 +302,8 @@ private:
size_t current_file_segment_write_offset = 0;
bool finalized = false;
std::function<void(const FileSegmentPtr & file_segment)> on_complete_file_segment_func;
std::function<void(const FileSegment & file_segment)> on_complete_file_segment_func;
};
}

View File

@ -80,7 +80,7 @@ void complete(const DB::FileSegmentsHolder & holder)
{
ASSERT_TRUE(file_segment->getOrSetDownloader() == DB::FileSegment::getCallerId());
prepareAndDownload(file_segment);
file_segment->complete(DB::FileSegment::State::DOWNLOADED);
file_segment->completeWithState(DB::FileSegment::State::DOWNLOADED);
}
}
@ -125,7 +125,7 @@ TEST(LRUFileCache, get)
assertRange(2, segments[0], DB::FileSegment::Range(0, 9), DB::FileSegment::State::DOWNLOADING);
download(segments[0]);
segments[0]->complete(DB::FileSegment::State::DOWNLOADED);
segments[0]->completeWithState(DB::FileSegment::State::DOWNLOADED);
assertRange(3, segments[0], DB::FileSegment::Range(0, 9), DB::FileSegment::State::DOWNLOADED);
}
@ -146,7 +146,7 @@ TEST(LRUFileCache, get)
ASSERT_TRUE(segments[1]->getOrSetDownloader() == DB::FileSegment::getCallerId());
prepareAndDownload(segments[1]);
segments[1]->complete(DB::FileSegment::State::DOWNLOADED);
segments[1]->completeWithState(DB::FileSegment::State::DOWNLOADED);
assertRange(6, segments[1], DB::FileSegment::Range(10, 14), DB::FileSegment::State::DOWNLOADED);
}
@ -203,7 +203,7 @@ TEST(LRUFileCache, get)
ASSERT_TRUE(segments[2]->getOrSetDownloader() == DB::FileSegment::getCallerId());
prepareAndDownload(segments[2]);
segments[2]->complete(DB::FileSegment::State::DOWNLOADED);
segments[2]->completeWithState(DB::FileSegment::State::DOWNLOADED);
assertRange(14, segments[3], DB::FileSegment::Range(17, 20), DB::FileSegment::State::DOWNLOADED);
@ -244,7 +244,7 @@ TEST(LRUFileCache, get)
ASSERT_TRUE(segments[3]->getOrSetDownloader() == DB::FileSegment::getCallerId());
prepareAndDownload(segments[3]);
segments[3]->complete(DB::FileSegment::State::DOWNLOADED);
segments[3]->completeWithState(DB::FileSegment::State::DOWNLOADED);
ASSERT_TRUE(segments[3]->state() == DB::FileSegment::State::DOWNLOADED);
}
@ -267,8 +267,8 @@ TEST(LRUFileCache, get)
ASSERT_TRUE(segments[2]->getOrSetDownloader() == DB::FileSegment::getCallerId());
prepareAndDownload(segments[0]);
prepareAndDownload(segments[2]);
segments[0]->complete(DB::FileSegment::State::DOWNLOADED);
segments[2]->complete(DB::FileSegment::State::DOWNLOADED);
segments[0]->completeWithState(DB::FileSegment::State::DOWNLOADED);
segments[2]->completeWithState(DB::FileSegment::State::DOWNLOADED);
}
/// Current cache: [____][_] [][___][__]
@ -290,8 +290,8 @@ TEST(LRUFileCache, get)
ASSERT_TRUE(s1[0]->getOrSetDownloader() == DB::FileSegment::getCallerId());
prepareAndDownload(s5[0]);
prepareAndDownload(s1[0]);
s5[0]->complete(DB::FileSegment::State::DOWNLOADED);
s1[0]->complete(DB::FileSegment::State::DOWNLOADED);
s5[0]->completeWithState(DB::FileSegment::State::DOWNLOADED);
s1[0]->completeWithState(DB::FileSegment::State::DOWNLOADED);
/// Current cache: [___] [_][___][_] [__]
/// ^ ^ ^ ^ ^ ^ ^ ^
@ -393,7 +393,7 @@ TEST(LRUFileCache, get)
}
prepareAndDownload(segments[2]);
segments[2]->complete(DB::FileSegment::State::DOWNLOADED);
segments[2]->completeWithState(DB::FileSegment::State::DOWNLOADED);
ASSERT_TRUE(segments[2]->state() == DB::FileSegment::State::DOWNLOADED);
other_1.join();
@ -458,7 +458,7 @@ TEST(LRUFileCache, get)
ASSERT_TRUE(segments_2[1]->getOrSetDownloader() == DB::FileSegment::getCallerId());
prepareAndDownload(segments_2[1]);
segments_2[1]->complete(DB::FileSegment::State::DOWNLOADED);
segments_2[1]->completeWithState(DB::FileSegment::State::DOWNLOADED);
});
{

View File

@ -621,7 +621,7 @@ void CachedReadBufferFromFile::predownload(FileSegmentPtr & file_segment)
else
{
read_type = ReadType::REMOTE_FS_READ_BYPASS_CACHE;
file_segment->complete(FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
file_segment->completeWithState(FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
continue_predownload = false;
}
@ -645,7 +645,7 @@ void CachedReadBufferFromFile::predownload(FileSegmentPtr & file_segment)
/// TODO: allow seek more than once with seek avoiding.
bytes_to_predownload = 0;
file_segment->complete(FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
file_segment->completeWithState(FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
read_type = ReadType::REMOTE_FS_READ_BYPASS_CACHE;
@ -975,7 +975,7 @@ bool CachedReadBufferFromFile::nextImplStep()
else
{
LOG_DEBUG(log, "No space left in cache, will continue without cache download");
file_segment->complete(FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
file_segment->completeWithState(FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
}
if (!success)

View File

@ -67,7 +67,7 @@ void CachedWriteBufferFromFile::cacheData(char * data, size_t size)
if (!cache_writer)
{
cache_writer = std::make_unique<FileSegmentRangeWriter>(
cache.get(), key, [this](const FileSegmentPtr & file_segment) { appendFilesystemCacheLog(file_segment); });
cache.get(), key, [this](const FileSegment & file_segment) { appendFilesystemCacheLog(file_segment); });
}
Stopwatch watch(CLOCK_MONOTONIC);
@ -91,9 +91,9 @@ void CachedWriteBufferFromFile::cacheData(char * data, size_t size)
ProfileEvents::increment(ProfileEvents::CachedWriteBufferCacheWriteMicroseconds, watch.elapsedMicroseconds());
}
void CachedWriteBufferFromFile::appendFilesystemCacheLog(const FileSegmentPtr & file_segment)
void CachedWriteBufferFromFile::appendFilesystemCacheLog(const FileSegment & file_segment)
{
auto file_segment_range = file_segment->range();
auto file_segment_range = file_segment.range();
FilesystemCacheLogElement elem
{
.event_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()),

View File

@ -28,7 +28,7 @@ public:
private:
void cacheData(char * data, size_t size);
void appendFilesystemCacheLog(const FileSegmentPtr & file_segment);
void appendFilesystemCacheLog(const FileSegment & file_segment);
FileCachePtr cache;
String source_path;

View File

@ -49,15 +49,15 @@ std::string CachedObjectStorage::generateBlobNameForPath(const std::string & pat
return object_storage->generateBlobNameForPath(path);
}
ReadSettings CachedObjectStorage::getReadSettingsForCache(const ReadSettings & read_settings) const
ReadSettings CachedObjectStorage::patchSettings(const ReadSettings & read_settings) const
{
ReadSettings result_settings{read_settings};
result_settings.remote_fs_cache = cache;
ReadSettings modified_settings{read_settings};
modified_settings.remote_fs_cache = cache;
if (IFileCache::isReadOnly())
result_settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache = true;
modified_settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache = true;
return result_settings;
return IObjectStorage::patchSettings(modified_settings);
}
void CachedObjectStorage::startup()
@ -83,7 +83,7 @@ std::unique_ptr<ReadBufferFromFileBase> CachedObjectStorage::readObjects( /// NO
{
assert(!objects[0].getPathKeyForCache().empty());
auto modified_read_settings = getReadSettingsForCache(read_settings);
auto modified_read_settings = patchSettings(read_settings);
auto impl = object_storage->readObjects(objects, modified_read_settings, read_hint, file_size);
/// If underlying read buffer does caching on its own, do not wrap it in caching buffer.
@ -129,7 +129,7 @@ std::unique_ptr<ReadBufferFromFileBase> CachedObjectStorage::readObject( /// NOL
std::optional<size_t> read_hint,
std::optional<size_t> file_size) const
{
auto modified_read_settings = getReadSettingsForCache(read_settings);
auto modified_read_settings = patchSettings(read_settings);
auto impl = object_storage->readObject(object, read_settings, read_hint, file_size);
/// If underlying read buffer does caching on its own, do not wrap it in caching buffer.
@ -174,10 +174,11 @@ std::unique_ptr<WriteBufferFromFileBase> CachedObjectStorage::writeObject( /// N
size_t buf_size,
const WriteSettings & write_settings)
{
auto impl = object_storage->writeObject(object, mode, attributes, std::move(finalize_callback), buf_size, write_settings);
auto modified_write_settings = IObjectStorage::patchSettings(write_settings);
auto impl = object_storage->writeObject(object, mode, attributes, std::move(finalize_callback), buf_size, modified_write_settings);
bool cache_on_write = fs::path(object.absolute_path).extension() != ".tmp"
&& write_settings.enable_filesystem_cache_on_write_operations
&& modified_write_settings.enable_filesystem_cache_on_write_operations
&& FileCacheFactory::instance().getSettings(cache->getBasePath()).cache_on_write_operations;
auto cache_hint = object.getPathKeyForCache();
@ -193,9 +194,9 @@ std::unique_ptr<WriteBufferFromFileBase> CachedObjectStorage::writeObject( /// N
cache,
impl->getFileName(),
key,
write_settings.is_file_cache_persistent,
modified_write_settings.is_file_cache_persistent,
CurrentThread::isInitialized() && CurrentThread::get().getQueryContext() ? CurrentThread::getQueryId().toString() : "",
write_settings);
modified_write_settings);
}
return impl;

View File

@ -101,8 +101,10 @@ public:
private:
IFileCache::Key getCacheKey(const std::string & path) const;
String getCachePath(const std::string & path) const;
ReadSettings getReadSettingsForCache(const ReadSettings & read_settings) const;
ReadSettings patchSettings(const ReadSettings & read_settings) const override;
ObjectStoragePtr object_storage;
FileCachePtr cache;

View File

@ -447,15 +447,7 @@ bool DiskObjectStorage::isReadOnly() const
DiskObjectStoragePtr DiskObjectStorage::createDiskObjectStorage()
{
return std::make_shared<DiskObjectStorage>(
getName(),
object_storage_root_path,
getName(),
metadata_storage,
object_storage,
disk_type,
send_metadata,
threadpool_size);
return std::static_pointer_cast<DiskObjectStorage>(shared_from_this());
}
void DiskObjectStorage::wrapWithCache(FileCachePtr cache, const String & layer_name)

View File

@ -173,8 +173,8 @@ protected:
void applyRemoteThrottlingSettings(ContextPtr context);
/// Should be used by implementation of read* and write* methods
ReadSettings patchSettings(const ReadSettings & read_settings) const;
WriteSettings patchSettings(const WriteSettings & write_settings) const;
virtual ReadSettings patchSettings(const ReadSettings & read_settings) const;
virtual WriteSettings patchSettings(const WriteSettings & write_settings) const;
private:
mutable std::mutex throttlers_mutex;

View File

@ -166,7 +166,7 @@ std::unique_ptr<WriteBufferFromFileBase> S3ObjectStorage::writeObject( /// NOLIN
std::optional<ObjectAttributes> attributes,
FinalizeCallback && finalize_callback,
size_t buf_size,
const WriteSettings & /* write_settings */)
const WriteSettings & write_settings)
{
WriteSettings disk_write_settings = IObjectStorage::patchSettings(write_settings);

View File

@ -134,8 +134,6 @@ public:
ContextPtr context) override;
private:
ReadSettings patchSettings(const ReadSettings & read_settings) const;
void setNewSettings(std::unique_ptr<S3ObjectStorageSettings> && s3_settings_);
void setNewClient(std::unique_ptr<Aws::S3::S3Client> && client_);