ClickHouse/src/Common/FileSegment.cpp

969 lines
30 KiB
C++
Raw Normal View History

2022-01-22 22:56:24 +00:00
#include "FileSegment.h"
2022-05-14 12:26:04 +00:00
#include <base/getThreadId.h>
2022-05-16 20:09:11 +00:00
#include <base/scope_guard.h>
2022-05-14 12:26:04 +00:00
#include <Common/logger_useful.h>
#include <Common/FileCache.h>
#include <Common/hex.h>
2022-02-02 14:25:25 +00:00
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <filesystem>
2022-05-14 12:26:04 +00:00
2022-05-03 17:17:54 +00:00
/// Metric declared here and defined elsewhere. In this file it is incremented in
/// FileSegment::markAsDetached() and decremented in ~FileSegment() for detached segments.
namespace CurrentMetrics
{
extern const Metric CacheDetachedFileSegments;
}
2022-01-22 22:56:24 +00:00
namespace DB
{
/// Error codes used by the exceptions thrown in this file (declared here, defined elsewhere).
namespace ErrorCodes
{
2022-02-18 15:38:23 +00:00
extern const int REMOTE_FS_OBJECT_CACHE_ERROR;
extern const int LOGICAL_ERROR;
2022-01-22 22:56:24 +00:00
}
/// Creates a file segment covering the range [offset_, offset_ + size_ - 1] of the file identified by key_.
///
/// @param offset_ Left boundary of the segment range.
/// @param size_ Size of the range. NOTE(review): size_ == 0 would underflow the right boundary;
///        presumably callers never pass 0 - confirm.
/// @param key_ Cache key of the file this segment belongs to.
/// @param cache_ Cache that the segment refers to (getSnapshot() passes nullptr here).
/// @param download_state_ Initial state; only EMPTY, DOWNLOADED and SKIP_CACHE are accepted.
/// @param is_persistent_ Persistence flag, stored as is.
/// @throws Exception(REMOTE_FS_OBJECT_CACHE_ERROR) if download_state_ is any other state.
FileSegment::FileSegment(
    size_t offset_,
    size_t size_,
    const Key & key_,
    FileCache * cache_,
    State download_state_,
    bool is_persistent_)
    : segment_range(offset_, offset_ + size_ - 1)
    , download_state(download_state_)
    , file_key(key_)
    , cache(cache_)
#ifndef NDEBUG
    , log(&Poco::Logger::get(fmt::format("FileSegment({}) : {}", getHexUIntLowercase(key_), range().toString())))
#else
    , log(&Poco::Logger::get("FileSegment"))
#endif
    , is_persistent(is_persistent_)
{
    /// On creation, file segment state can be EMPTY, DOWNLOADED or SKIP_CACHE.
    switch (download_state)
    {
        /// EMPTY is used when file segment is not in cache and
        /// someone will _potentially_ want to download it (after calling getOrSetDownloader()).
        case (State::EMPTY):
        {
            break;
        }
        /// DOWNLOADED is used either on initial cache metadata load into memory on server startup
        /// or on reduceSizeToDownloaded() -- when file segment object is updated.
        case (State::DOWNLOADED):
        {
            /// The whole range is considered both reserved and downloaded.
            reserved_size = downloaded_size = size_;
            is_downloaded = true;
            break;
        }
        case (State::SKIP_CACHE):
        {
            break;
        }
        default:
        {
            /// The message lists exactly the states handled above.
            throw Exception(
                ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
                "Can only create cell with either EMPTY, DOWNLOADED or SKIP_CACHE state");
        }
    }
}
2022-01-23 16:51:18 +00:00
/// Returns the current download state, read under the segment mutex.
FileSegment::State FileSegment::state() const
{
    std::lock_guard guard(mutex);
    const State current_state = download_state;
    return current_state;
}
2022-02-18 15:38:23 +00:00
size_t FileSegment::getDownloadOffset() const
2022-01-22 22:56:24 +00:00
{
std::lock_guard segment_lock(mutex);
2022-03-14 16:33:29 +00:00
return range().left + getDownloadedSize(segment_lock);
}
2022-04-07 16:46:46 +00:00
/// Locking wrapper: takes the segment mutex and returns the downloaded size.
size_t FileSegment::getDownloadedSize() const
{
    std::lock_guard guard(mutex);
    const size_t result = getDownloadedSize(guard);
    return result;
}
2022-06-02 15:43:37 +00:00
size_t FileSegment::getRemainingSizeToDownload() const
2022-05-14 12:26:04 +00:00
{
std::lock_guard segment_lock(mutex);
2022-08-24 15:44:09 +00:00
return range().size() - getDownloadedSize(segment_lock);
2022-05-14 12:26:04 +00:00
}
2022-06-03 13:24:42 +00:00
/// Returns the detached flag, read under the segment mutex.
bool FileSegment::isDetached() const
{
    std::lock_guard guard(mutex);
    const bool detached = is_detached;
    return detached;
}
2022-03-14 16:33:29 +00:00
size_t FileSegment::getDownloadedSize(std::lock_guard<std::mutex> & /* segment_lock */) const
{
if (download_state == State::DOWNLOADED)
return downloaded_size;
std::lock_guard download_lock(download_mutex);
return downloaded_size;
2022-01-22 22:56:24 +00:00
}
/// Builds the identifier of the calling thread: "<query id>:<thread id>".
/// "None" is used instead of the query id when the current thread is not initialized,
/// has no query context, or has an empty query id.
String FileSegment::getCallerId()
{
    const bool has_query_id = CurrentThread::isInitialized()
        && CurrentThread::get().getQueryContext()
        && !CurrentThread::getQueryId().empty();

    if (has_query_id)
        return std::string(CurrentThread::getQueryId()) + ":" + toString(getThreadId());

    return "None:" + toString(getThreadId());
}
/// Makes the caller the downloader of this segment if there is no downloader yet.
///
/// @return The downloader id after the call: the caller's id if it became the downloader,
///         the id of an already existing (different) downloader, or "None" if there is no
///         downloader and the state is neither EMPTY nor PARTIALLY_DOWNLOADED.
/// @throws Exception(REMOTE_FS_OBJECT_CACHE_ERROR) if the caller already is the downloader.
/// @throws Exception(LOGICAL_ERROR) if the segment is detached.
String FileSegment::getOrSetDownloader()
{
    std::lock_guard guard(mutex);
    assertNotDetached(guard);

    /// Somebody already downloads the segment: report who, unless it is the caller itself.
    if (!downloader_id.empty())
    {
        if (downloader_id == getCallerId())
            throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
                "Attempt to set the same downloader for segment {} for the second time", range().toString());

        return downloader_id;
    }

    assert(download_state != State::DOWNLOADING);

    const bool can_start_download
        = download_state == State::EMPTY || download_state == State::PARTIALLY_DOWNLOADED;
    if (!can_start_download)
        return "None";

    downloader_id = getCallerId();
    download_state = State::DOWNLOADING;
    return downloader_id;
}
2022-03-06 19:33:07 +00:00
void FileSegment::resetDownloader()
{
std::lock_guard segment_lock(mutex);
2022-05-03 11:15:27 +00:00
assertNotDetached(segment_lock);
2022-03-06 19:33:07 +00:00
if (downloader_id.empty())
2022-03-09 09:36:52 +00:00
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "There is no downloader");
2022-03-06 19:33:07 +00:00
if (getCallerId() != downloader_id)
2022-03-09 09:36:52 +00:00
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Downloader can be reset only by downloader");
resetDownloaderImpl(segment_lock);
}
2022-03-06 19:33:07 +00:00
2022-03-09 09:36:52 +00:00
void FileSegment::resetDownloaderImpl(std::lock_guard<std::mutex> & segment_lock)
{
2022-08-24 15:44:09 +00:00
if (getDownloadedSize(segment_lock) == range().size())
2022-03-07 21:03:12 +00:00
setDownloaded(segment_lock);
else
download_state = State::PARTIALLY_DOWNLOADED;
2022-03-06 19:33:07 +00:00
downloader_id.clear();
}
/// Returns a copy of the current downloader id (empty if there is none),
/// read under the segment mutex.
String FileSegment::getDownloader() const
{
    std::lock_guard guard(mutex);
    String current_downloader = downloader_id;
    return current_downloader;
}
/// Tells whether the calling thread is the current downloader (checked under the segment mutex).
bool FileSegment::isDownloader() const
{
    std::lock_guard guard(mutex);
    const bool caller_is_downloader = getCallerId() == downloader_id;
    return caller_is_downloader;
}
2022-04-15 09:55:05 +00:00
bool FileSegment::isDownloaderImpl(std::lock_guard<std::mutex> & /* segment_lock */) const
2022-04-14 11:17:04 +00:00
{
return getCallerId() == downloader_id;
}
/// Returns the stored remote file reader (may be null).
///
/// @throws Exception(REMOTE_FS_OBJECT_CACHE_ERROR) if the caller is not the downloader.
FileSegment::RemoteFileReaderPtr FileSegment::getRemoteFileReader()
{
    if (isDownloader())
        return remote_file_reader;

    throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Only downloader can use remote filesystem file reader");
}
2022-08-16 12:13:12 +00:00
/// Moves the remote file reader out of the segment.
///
/// Takes the cache mutex and then the segment mutex. For a non-detached segment the reader
/// is handed out only if there is no downloader and the caller is the last holder of the
/// segment; otherwise nullptr is returned and the reader stays in place.
FileSegment::RemoteFileReaderPtr FileSegment::extractRemoteFileReader()
{
    std::lock_guard cache_guard(cache->mutex);
    std::lock_guard segment_guard(mutex);

    if (!is_detached)
    {
        const bool sole_holder = cache->isLastFileSegmentHolder(key(), offset(), cache_guard, segment_guard);
        const bool has_downloader = !downloader_id.empty();

        if (has_downloader || !sole_holder)
            return nullptr;
    }

    LOG_TRACE(log, "Extracted reader from file segment");
    return std::move(remote_file_reader);
}
/// Stores the remote file reader in the segment.
///
/// @param remote_file_reader_ Reader to store; taken by value and moved into the member.
/// @throws Exception(REMOTE_FS_OBJECT_CACHE_ERROR) if the caller is not the downloader.
/// @throws Exception(LOGICAL_ERROR) if a reader is already set.
void FileSegment::setRemoteFileReader(RemoteFileReaderPtr remote_file_reader_)
{
    if (!isDownloader())
        throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Only downloader can use remote filesystem file reader");

    if (remote_file_reader)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Remote file reader already exists");

    /// The parameter is a by-value sink: move it instead of making one more copy.
    remote_file_reader = std::move(remote_file_reader_);
}
2022-04-26 10:57:58 +00:00
void FileSegment::resetRemoteFileReader()
2022-03-17 19:29:07 +00:00
{
if (!isDownloader())
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Only downloader can use remote filesystem file reader");
if (!remote_file_reader)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Remote file reader does not exist");
2022-04-26 10:57:58 +00:00
remote_file_reader.reset();
2022-03-17 19:29:07 +00:00
}
2022-06-03 13:24:42 +00:00
/// Appends `size` bytes from `from` to the local cache file of this segment.
///
/// Preconditions checked here (each violation throws):
///  - size is non-zero and fits into availableSize();
///  - the caller is the downloader;
///  - the segment is not fully downloaded yet;
///  - offset_ equals the current download offset (range().left + downloaded_size);
///  - the segment is not detached.
/// The cache writer is created on the first write. If writing throws a DB Exception, the
/// exception is extended with the segment info, the download is marked as failed,
/// waiters are notified and the exception is rethrown.
void FileSegment::write(const char * from, size_t size, size_t offset_)
2022-01-22 22:56:24 +00:00
{
2022-01-23 16:51:18 +00:00
if (!size)
2022-02-18 15:38:23 +00:00
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Writing zero size is not allowed");
2022-01-23 16:51:18 +00:00
2022-01-24 22:07:02 +00:00
if (availableSize() < size)
2022-01-22 22:56:24 +00:00
throw Exception(
2022-02-18 15:38:23 +00:00
ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
2022-01-24 22:07:02 +00:00
"Not enough space is reserved. Available: {}, expected: {}", availableSize(), size);
2022-01-22 22:56:24 +00:00
2022-06-02 15:43:37 +00:00
/// NOTE(review): downloader_id is read for the message below without holding the segment mutex.
if (!isDownloader())
2022-08-24 15:44:09 +00:00
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Only downloader can do the downloading. (CallerId: {}, DownloaderId: {})",
getCallerId(), downloader_id);
2022-01-23 16:51:18 +00:00
2022-08-24 15:44:09 +00:00
if (getDownloadedSize() == range().size())
throw Exception(
ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
"Attempt to write {} bytes to offset: {}, but current file segment is already fully downloaded",
size, offset_);
2022-03-17 16:50:51 +00:00
2022-03-14 16:33:29 +00:00
/// NOTE(review): downloaded_size is read here without any lock; presumably safe because
/// only the downloader (this thread) modifies it - confirm.
auto download_offset = range().left + downloaded_size;
if (offset_ != download_offset)
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
2022-03-17 16:50:51 +00:00
"Attempt to write {} bytes to offset: {}, but current download offset is {}",
2022-03-14 16:33:29 +00:00
size, offset_, download_offset);
2022-04-30 20:02:44 +00:00
/// The segment mutex is taken only for the detached check and released before the write.
{
std::lock_guard segment_lock(mutex);
assertNotDetached(segment_lock);
}
2022-04-11 15:51:49 +00:00
if (!cache_writer)
2022-01-22 22:56:24 +00:00
{
2022-03-14 16:33:29 +00:00
/// A missing writer together with already downloaded data means the writer was finalized before.
if (downloaded_size > 0)
2022-03-17 16:50:51 +00:00
throw Exception(ErrorCodes::LOGICAL_ERROR,
2022-03-18 09:16:06 +00:00
"Cache writer was finalized (downloaded size: {}, state: {})",
2022-03-17 16:50:51 +00:00
downloaded_size, stateToString(download_state));
2022-03-14 16:33:29 +00:00
2022-06-15 11:39:00 +00:00
auto download_path = getPathInLocalCache();
cache_writer = std::make_unique<WriteBufferFromFile>(download_path);
2022-01-22 22:56:24 +00:00
}
2022-02-02 14:25:25 +00:00
try
{
cache_writer->write(from, size);
2022-03-14 16:33:29 +00:00
/// downloaded_size is updated under download_mutex, the same mutex getDownloadedSize(lock) reads it under.
std::lock_guard download_lock(download_mutex);
2022-02-02 14:25:25 +00:00
cache_writer->next();
2022-03-14 16:33:29 +00:00
downloaded_size += size;
2022-02-02 14:25:25 +00:00
}
2022-03-17 19:29:07 +00:00
/// NOTE(review): only `Exception` is handled; other exception types propagate
/// without marking the download as failed.
catch (Exception & e)
2022-02-02 14:25:25 +00:00
{
2022-02-18 15:38:23 +00:00
std::lock_guard segment_lock(mutex);
2022-04-07 16:46:46 +00:00
wrapWithCacheInfo(e, "while writing into cache", segment_lock);
2022-03-17 19:29:07 +00:00
2022-04-07 16:46:46 +00:00
setDownloadFailed(segment_lock);
2022-03-10 09:56:48 +00:00
cv.notify_all();
2022-02-18 15:38:23 +00:00
2022-02-02 14:25:25 +00:00
throw;
}
2022-03-16 13:29:21 +00:00
2022-04-12 08:52:37 +00:00
assert(getDownloadOffset() == offset_ + size);
2022-01-22 22:56:24 +00:00
}
2022-06-15 11:39:00 +00:00
/// Asks the cache for the local path of this segment (by key, offset and persistence flag).
String FileSegment::getPathInLocalCache() const
{
    const auto & segment_key = key();
    const auto segment_offset = offset();
    const bool persistent = isPersistent();

    return cache->getPathInLocalCache(segment_key, segment_offset, persistent);
}
2022-01-22 22:56:24 +00:00
/// Waits (at most 60 seconds, a single condition-variable wait) while another thread
/// downloads the segment, and returns the state observed afterwards.
///
/// Returns immediately with the current state if there is no downloader or the state
/// is not DOWNLOADING.
///
/// @throws Exception(LOGICAL_ERROR) if the segment is detached.
/// @throws Exception(REMOTE_FS_OBJECT_CACHE_ERROR) if there is a downloader but the state is EMPTY.
FileSegment::State FileSegment::wait()
{
    std::unique_lock lock(mutex);

    if (is_detached)
        throw Exception(
            ErrorCodes::LOGICAL_ERROR,
            "Cache file segment is in detached state, operation not allowed");

    const bool has_downloader = !downloader_id.empty();
    if (!has_downloader)
        return download_state;

    if (download_state == State::EMPTY)
        throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Cannot wait on a file segment with empty state");

    if (download_state != State::DOWNLOADING)
        return download_state;

    LOG_TEST(log, "{} waiting on: {}, current downloader: {}", getCallerId(), range().toString(), downloader_id);

    assert(!downloader_id.empty());
    assert(downloader_id != getCallerId());

    const auto wait_timeout = std::chrono::seconds(60);
    cv.wait_for(lock, wait_timeout);

    return download_state;
}
/// Makes sure that `size` more bytes can be written into this segment.
///
/// First validates the request under the segment mutex (not detached, caller is the downloader,
/// downloaded size + size does not exceed the range). Then, if the already reserved but unused
/// space is not enough, asks the cache (under the cache mutex) to reserve the missing part.
///
/// @return true if the space is available after the call, false if the cache refused to reserve it.
/// @throws Exception(REMOTE_FS_OBJECT_CACHE_ERROR) on zero size or if the request exceeds the range.
/// @throws Exception(LOGICAL_ERROR) if the caller is not the downloader or the segment is detached.
bool FileSegment::reserve(size_t size)
{
if (!size)
2022-02-18 15:38:23 +00:00
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Zero space reservation is not allowed");
2022-01-22 22:56:24 +00:00
2022-02-24 14:20:51 +00:00
/// Validation phase. The segment mutex is released at the end of this scope,
/// before the cache mutex is taken further below.
{
std::lock_guard segment_lock(mutex);
2022-04-30 20:02:44 +00:00
assertNotDetached(segment_lock);
2022-01-22 22:56:24 +00:00
2022-03-14 16:33:29 +00:00
auto caller_id = getCallerId();
2022-05-02 15:56:05 +00:00
bool is_downloader = caller_id == downloader_id;
2022-06-02 15:43:37 +00:00
if (!is_downloader)
2022-05-11 08:45:20 +00:00
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Space can be reserved only by downloader (current: {}, expected: {})",
caller_id, downloader_id);
}
2022-03-14 16:33:29 +00:00
2022-08-24 15:44:09 +00:00
size_t current_downloaded_size = getDownloadedSize(segment_lock);
if (current_downloaded_size + size > range().size())
{
throw Exception(
ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
"Attempt to reserve space too much space: {} ({})",
size, getInfoForLogImpl(segment_lock));
}
2022-01-22 22:56:24 +00:00
2022-08-24 15:44:09 +00:00
assert(reserved_size >= current_downloaded_size);
2022-02-24 14:20:51 +00:00
}
2022-01-22 22:56:24 +00:00
/**
* It is possible to have downloaded_size < reserved_size when reserve is called
* in case previous downloader did not fully download current file_segment
* and the caller is going to continue;
*/
2022-08-24 15:44:09 +00:00
/// NOTE(review): reserved_size is read below without the segment mutex
/// (getDownloadedSize() releases it on return); it is modified under that mutex further down.
size_t current_downloaded_size = getDownloadedSize();
assert(reserved_size >= current_downloaded_size);
size_t free_space = reserved_size - current_downloaded_size;
2022-01-22 22:56:24 +00:00
2022-08-24 15:44:09 +00:00
bool reserved = free_space >= size;
if (!reserved)
2022-05-05 22:07:22 +00:00
{
2022-08-24 15:44:09 +00:00
/// Lock order here is cache mutex first, then segment mutex.
std::lock_guard cache_lock(cache->mutex);
/// Only the part not covered by the already reserved free space is requested from the cache.
size_t size_to_reserve = size - free_space;
reserved = cache->tryReserve(key(), offset(), size_to_reserve, cache_lock);
if (reserved)
{
std::lock_guard segment_lock(mutex);
2022-08-25 11:02:41 +00:00
reserved_size += size_to_reserve;
2022-08-24 15:44:09 +00:00
}
2022-05-05 22:07:22 +00:00
}
2022-01-22 22:56:24 +00:00
return reserved;
}
2022-08-24 15:44:09 +00:00
/// Locking wrapper around isDownloadedUnlocked().
bool FileSegment::isDownloaded() const
{
    std::lock_guard guard(mutex);
    const bool downloaded = isDownloadedUnlocked(guard);
    return downloaded;
}
bool FileSegment::isDownloadedUnlocked(std::lock_guard<std::mutex> & /* segment_lock */) const
{
return is_downloaded;
}
void FileSegment::setDownloaded([[maybe_unused]] std::lock_guard<std::mutex> & segment_lock)
2022-02-23 10:12:14 +00:00
{
2022-04-14 11:17:04 +00:00
if (is_downloaded)
return;
2022-04-07 16:46:46 +00:00
downloader_id.clear();
if (cache_writer)
{
cache_writer->finalize();
cache_writer.reset();
remote_file_reader.reset();
}
2022-08-24 15:44:09 +00:00
download_state = State::DOWNLOADED;
is_downloaded = true;
assert(getDownloadedSize(segment_lock) > 0);
assert(std::filesystem::file_size(getPathInLocalCache()) > 0);
2022-04-07 16:46:46 +00:00
}
void FileSegment::setDownloadFailed(std::lock_guard<std::mutex> & /* segment_lock */)
{
download_state = State::PARTIALLY_DOWNLOADED_NO_CONTINUATION;
downloader_id.clear();
2022-02-23 10:12:14 +00:00
if (cache_writer)
{
cache_writer->finalize();
cache_writer.reset();
2022-03-11 11:57:57 +00:00
remote_file_reader.reset();
2022-02-23 10:12:14 +00:00
}
}
void FileSegment::completeBatchAndResetDownloader()
2022-01-22 22:56:24 +00:00
{
2022-02-23 13:43:40 +00:00
std::lock_guard segment_lock(mutex);
2022-05-02 22:27:35 +00:00
assertNotDetached(segment_lock);
2022-04-14 11:17:04 +00:00
if (!isDownloaderImpl(segment_lock))
2022-01-22 22:56:24 +00:00
{
2022-02-23 13:43:40 +00:00
cv.notify_all();
2022-04-11 15:51:49 +00:00
throw Exception(
ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
"File segment can be completed only by downloader ({} != {})",
downloader_id, getCallerId());
2022-02-23 13:43:40 +00:00
}
2022-01-22 22:56:24 +00:00
2022-03-09 09:36:52 +00:00
resetDownloaderImpl(segment_lock);
2022-02-23 13:43:40 +00:00
2022-08-24 15:44:09 +00:00
LOG_TEST(log, "Complete batch. Current downloaded size: {}", getDownloadedSize(segment_lock));
2022-01-26 09:35:46 +00:00
cv.notify_all();
}
2022-08-19 18:13:46 +00:00
/// Completes the file segment with an explicitly requested state.
///
/// Takes the cache mutex and then the segment mutex. Only the current downloader may call it.
///
/// @param state Must be DOWNLOADED, PARTIALLY_DOWNLOADED or PARTIALLY_DOWNLOADED_NO_CONTINUATION.
/// @throws Exception(LOGICAL_ERROR) if the segment is detached or the caller is not the downloader.
/// @throws Exception(REMOTE_FS_OBJECT_CACHE_ERROR) if `state` is not one of the allowed states
///         (waiters are notified before throwing).
void FileSegment::completeWithState(State state)
{
    std::lock_guard cache_lock(cache->mutex);
    std::lock_guard segment_lock(mutex);

    assertNotDetached(segment_lock);

    auto caller_id = getCallerId();
    if (caller_id != downloader_id)
    {
        /// The message text has its closing parenthesis restored.
        throw Exception(
            ErrorCodes::LOGICAL_ERROR,
            "File segment completion can be done only by downloader. (CallerId: {}, downloader id: {})",
            caller_id, downloader_id);
    }

    if (state != State::DOWNLOADED
        && state != State::PARTIALLY_DOWNLOADED
        && state != State::PARTIALLY_DOWNLOADED_NO_CONTINUATION)
    {
        cv.notify_all();
        throw Exception(
            ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
            "Cannot complete file segment with state: {}", stateToString(state));
    }

    /// The requested state is stored first; completeBasedOnCurrentState() may adjust it further.
    download_state = state;
    completeBasedOnCurrentState(cache_lock, segment_lock);
}
2022-01-22 22:56:24 +00:00
2022-08-19 18:13:46 +00:00
void FileSegment::completeWithoutState(std::lock_guard<std::mutex> & cache_lock)
2022-01-24 22:07:02 +00:00
{
2022-03-17 19:29:07 +00:00
std::lock_guard segment_lock(mutex);
2022-08-19 18:13:46 +00:00
completeBasedOnCurrentState(cache_lock, segment_lock);
2022-04-27 18:01:59 +00:00
}
2022-01-23 16:51:18 +00:00
2022-08-19 18:13:46 +00:00
/// Common completion logic; both the cache mutex and the segment mutex must be held.
///
/// Does nothing for a detached segment. If the caller is the downloader or the last holder of
/// the segment, the state is recalculated from the downloaded size and the downloader is reset.
/// After that, depending on the resulting state and on whether the caller is the last holder,
/// the cell is removed from the cache, resized to the downloaded size, or left as is.
/// Waiters are notified on exit if the caller was the downloader.
void FileSegment::completeBasedOnCurrentState(std::lock_guard<std::mutex> & cache_lock, std::lock_guard<std::mutex> & segment_lock)
2022-04-27 18:01:59 +00:00
{
2022-08-19 18:13:46 +00:00
if (is_detached)
2022-03-17 19:29:07 +00:00
return;
2022-01-22 22:56:24 +00:00
2022-08-19 18:13:46 +00:00
/// Captured before the downloader is reset below, so the SCOPE_EXIT still knows who called.
bool is_downloader = isDownloaderImpl(segment_lock);
2022-03-22 09:39:58 +00:00
bool is_last_holder = cache->isLastFileSegmentHolder(key(), offset(), cache_lock, segment_lock);
2022-08-19 18:13:46 +00:00
bool can_update_segment_state = is_downloader || is_last_holder;
size_t current_downloaded_size = getDownloadedSize(segment_lock);
2022-02-15 10:27:44 +00:00
2022-08-22 10:00:14 +00:00
SCOPE_EXIT({
if (is_downloader)
{
cv.notify_all();
}
});
2022-08-19 18:13:46 +00:00
LOG_TEST(log, "Complete without state (is_last_holder: {}). File segment info: {}", is_last_holder, getInfoForLogImpl(segment_lock));
if (can_update_segment_state)
2022-01-24 22:07:02 +00:00
{
2022-08-19 18:13:46 +00:00
/// NOTE(review): this recalculation overwrites a state stored by completeWithState()
/// just before the call, and resetDownloaderImpl() below repeats the same
/// size check and assignment - confirm both are intended.
if (current_downloaded_size == range().size())
setDownloaded(segment_lock);
2022-03-29 12:08:24 +00:00
else
2022-08-19 18:13:46 +00:00
download_state = State::PARTIALLY_DOWNLOADED;
2022-01-23 16:51:18 +00:00
2022-08-19 18:13:46 +00:00
resetDownloaderImpl(segment_lock);
2022-01-22 22:56:24 +00:00
}
2022-08-19 18:13:46 +00:00
switch (download_state)
2022-01-26 09:35:46 +00:00
{
2022-08-19 18:13:46 +00:00
case State::SKIP_CACHE:
{
/// Early return: the final log message and assertCorrectnessImpl() are skipped for this state.
if (is_last_holder)
cache->remove(key(), offset(), cache_lock, segment_lock);
return;
}
case State::DOWNLOADED:
{
2022-08-24 15:44:09 +00:00
assert(getDownloadedSize(segment_lock) == range().size());
assert(isDownloadedUnlocked(segment_lock));
2022-08-19 18:13:46 +00:00
break;
}
case State::DOWNLOADING:
case State::EMPTY:
{
assert(!is_last_holder);
break;
}
case State::PARTIALLY_DOWNLOADED:
case State::PARTIALLY_DOWNLOADED_NO_CONTINUATION:
{
if (is_last_holder)
{
if (current_downloaded_size == 0)
{
LOG_TEST(log, "Remove cell {} (nothing downloaded)", range().toString());
download_state = State::SKIP_CACHE;
cache->remove(key(), offset(), cache_lock, segment_lock);
}
else
{
LOG_TEST(log, "Resize cell {} to downloaded: {}", range().toString(), current_downloaded_size);
/**
* Only last holder of current file segment can resize the cell,
* because there is an invariant that file segments returned to users
* in FileSegmentsHolder represent a contiguous range, so we can resize
* it only when nobody needs it.
*/
download_state = State::PARTIALLY_DOWNLOADED_NO_CONTINUATION;
/// Resize this file segment by creating a copy file segment with DOWNLOADED state,
/// but current file segment should remain PARTIALLY_DOWNLOADED_NO_CONTINUATION and with detached state,
/// because otherwise an invariant that getOrSet() returns a contiguous range of file segments will be broken
/// (this will be crucial for other file segment holder, not for current one).
cache->reduceSizeToDownloaded(key(), offset(), cache_lock, segment_lock);
}
/// In both branches above this object is no longer the cache's cell, so it is marked as detached.
markAsDetached(segment_lock);
}
break;
}
2022-01-26 09:35:46 +00:00
}
2022-01-24 22:07:02 +00:00
2022-04-30 13:02:04 +00:00
LOG_TEST(log, "Completed file segment: {}", getInfoForLogImpl(segment_lock));
2022-03-21 18:48:13 +00:00
assertCorrectnessImpl(segment_lock);
2022-01-22 22:56:24 +00:00
}
2022-02-02 14:25:25 +00:00
/// Locking wrapper around getInfoForLogImpl().
String FileSegment::getInfoForLog() const
{
    std::lock_guard guard(mutex);
    String description = getInfoForLogImpl(guard);
    return description;
}
2022-02-02 14:25:25 +00:00
2022-03-17 17:29:31 +00:00
/// Formats a one-line description of the segment for logs and exception messages:
/// range, key, state, downloaded and reserved sizes, downloader id, caller id and
/// persistence flag. The segment mutex must be held by the caller.
String FileSegment::getInfoForLogImpl(std::lock_guard<std::mutex> & segment_lock) const
{
    WriteBufferFromOwnString out;

    out << "File segment: " << range().toString() << ", "
        << "key: " << key().toString() << ", "
        << "state: " << download_state << ", "
        << "downloaded size: " << getDownloadedSize(segment_lock) << ", "
        << "reserved size: " << reserved_size << ", "
        << "downloader id: " << (downloader_id.empty() ? "None" : downloader_id) << ", "
        << "caller id: " << getCallerId() << ", "
        << "persistent: " << is_persistent;

    return out.str();
}
2022-04-07 16:46:46 +00:00
/// Appends `message` and the current segment description to the exception `e`.
/// The segment mutex must be held by the caller.
void FileSegment::wrapWithCacheInfo(Exception & e, const String & message, std::lock_guard<std::mutex> & segment_lock) const
{
    const String segment_info = getInfoForLogImpl(segment_lock);
    e.addMessage(fmt::format("{}, current cache state: {}", message, segment_info));
}
/// Returns a human-readable name of the given state.
String FileSegment::stateToString(FileSegment::State state)
{
    switch (state)
    {
        case State::DOWNLOADED:                           return "DOWNLOADED";
        case State::EMPTY:                                return "EMPTY";
        case State::DOWNLOADING:                          return "DOWNLOADING";
        case State::PARTIALLY_DOWNLOADED:                 return "PARTIALLY DOWNLOADED";
        case State::PARTIALLY_DOWNLOADED_NO_CONTINUATION: return "PARTIALLY DOWNLOADED NO CONTINUATION";
        case State::SKIP_CACHE:                           return "SKIP_CACHE";
    }

    /// All enumerators are handled above.
    __builtin_unreachable();
}
2022-03-21 18:48:13 +00:00
/// Locking wrapper around assertCorrectnessImpl().
void FileSegment::assertCorrectness() const
{
    std::lock_guard guard(mutex);
    assertCorrectnessImpl(guard);
}
void FileSegment::assertCorrectnessImpl(std::lock_guard<std::mutex> & /* segment_lock */) const
{
assert(downloader_id.empty() == (download_state != FileSegment::State::DOWNLOADING));
assert(!downloader_id.empty() == (download_state == FileSegment::State::DOWNLOADING));
2022-06-15 11:39:00 +00:00
assert(download_state != FileSegment::State::DOWNLOADED || std::filesystem::file_size(getPathInLocalCache()) > 0);
2022-03-21 18:48:13 +00:00
}
2022-05-11 08:45:20 +00:00
void FileSegment::throwIfDetached() const
{
std::lock_guard segment_lock(mutex);
2022-05-11 08:45:20 +00:00
throwIfDetachedUnlocked(segment_lock);
}
2022-05-11 08:45:20 +00:00
void FileSegment::throwIfDetachedUnlocked(std::lock_guard<std::mutex> & segment_lock) const
2022-05-03 11:15:27 +00:00
{
throw Exception(
2022-05-10 17:50:43 +00:00
ErrorCodes::LOGICAL_ERROR,
2022-05-03 11:15:27 +00:00
"Cache file segment is in detached state, operation not allowed. "
"It can happen when cache was concurrently dropped with SYSTEM DROP FILESYSTEM CACHE FORCE. "
"Please, retry. File segment info: {}", getInfoForLogImpl(segment_lock));
2022-05-03 11:15:27 +00:00
}
void FileSegment::assertNotDetached(std::lock_guard<std::mutex> & segment_lock) const
2022-04-11 15:51:49 +00:00
{
if (is_detached)
2022-05-11 08:45:20 +00:00
throwIfDetachedUnlocked(segment_lock);
2022-04-11 15:51:49 +00:00
}
2022-05-02 22:27:35 +00:00
void FileSegment::assertDetachedStatus(std::lock_guard<std::mutex> & segment_lock) const
2022-04-20 11:43:07 +00:00
{
2022-05-11 08:45:20 +00:00
/// Detached file segment is allowed to have only a certain subset of states.
/// It should be either EMPTY or one of the finalized states.
2022-05-02 22:27:35 +00:00
if (download_state != State::EMPTY && !hasFinalizedState())
{
throw Exception(
2022-05-07 21:33:28 +00:00
ErrorCodes::LOGICAL_ERROR,
2022-05-11 10:05:50 +00:00
"Detached file segment has incorrect state: {}",
2022-05-07 21:33:28 +00:00
getInfoForLogImpl(segment_lock));
2022-05-02 22:27:35 +00:00
}
2022-04-20 11:43:07 +00:00
}
2022-04-07 23:58:55 +00:00
/// Creates a stand-alone copy of the observable state of `file_segment`.
///
/// The copy is constructed without a cache (nullptr) in EMPTY state and then receives the
/// source's hit count, reference count (use_count of the passed pointer), downloaded size,
/// download state and persistence flag. The source's segment mutex is held meanwhile.
FileSegmentPtr FileSegment::getSnapshot(const FileSegmentPtr & file_segment, std::lock_guard<std::mutex> & /* cache_lock */)
{
    std::lock_guard source_guard(file_segment->mutex);

    auto copy = std::make_shared<FileSegment>(
        file_segment->offset(),
        file_segment->range().size(),
        file_segment->key(),
        nullptr,
        State::EMPTY);

    copy->hits_count = file_segment->getHitsCount();
    copy->ref_count = file_segment.use_count();
    copy->downloaded_size = file_segment->getDownloadedSize(source_guard);
    copy->download_state = file_segment->download_state;
    copy->is_persistent = file_segment->isPersistent();

    return copy;
}
2022-04-28 10:57:22 +00:00
/// Tells whether the state is one of DOWNLOADED, PARTIALLY_DOWNLOADED_NO_CONTINUATION or SKIP_CACHE.
bool FileSegment::hasFinalizedState() const
{
    switch (download_state)
    {
        case State::DOWNLOADED:
        case State::PARTIALLY_DOWNLOADED_NO_CONTINUATION:
        case State::SKIP_CACHE:
            return true;
        default:
            return false;
    }
}
2022-05-04 12:18:52 +00:00
void FileSegment::detach(
std::lock_guard<std::mutex> & /* cache_lock */,
std::lock_guard<std::mutex> & segment_lock)
2022-04-26 10:09:58 +00:00
{
2022-05-11 21:33:15 +00:00
/// Now detached status can be in 2 cases, which do not do any complex logic:
/// 1. there is only 1 remaining file segment holder
2022-05-12 10:16:12 +00:00
/// && it does not need this segment anymore
2022-05-11 21:33:15 +00:00
/// && this file segment was in cache and needs to be removed
/// 2. in read_from_cache_if_exists_otherwise_bypass_cache case
if (is_detached)
2022-04-26 10:13:21 +00:00
return;
2022-05-03 17:17:54 +00:00
markAsDetached(segment_lock);
2022-05-02 22:27:35 +00:00
download_state = State::PARTIALLY_DOWNLOADED_NO_CONTINUATION;
downloader_id.clear();
2022-04-30 13:02:04 +00:00
2022-08-24 13:53:54 +00:00
LOG_DEBUG(log, "Detached file segment: {}", getInfoForLogImpl(segment_lock));
2022-04-26 10:09:58 +00:00
}
2022-05-03 17:17:54 +00:00
void FileSegment::markAsDetached(std::lock_guard<std::mutex> & /* segment_lock */)
{
is_detached = true;
2022-05-03 17:17:54 +00:00
CurrentMetrics::add(CurrentMetrics::CacheDetachedFileSegments);
}
/// A detached segment was counted in CacheDetachedFileSegments by markAsDetached();
/// the metric is decremented when such a segment is destroyed.
FileSegment::~FileSegment()
{
    std::lock_guard guard(mutex);

    const bool was_detached = is_detached;
    if (was_detached)
        CurrentMetrics::sub(CurrentMetrics::CacheDetachedFileSegments);
}
2022-03-17 19:29:07 +00:00
/// Completes and releases every file segment that is still held.
///
/// Detached segments are only checked for a valid detached state and dropped. All other
/// segments are completed with completeWithoutState() under the cache mutex and dropped
/// under the same lock. Exceptions are logged and never leave the destructor; a segment
/// whose completion failed is skipped and released together with the container.
FileSegmentsHolder::~FileSegmentsHolder()
{
    /// In CacheableReadBufferFromRemoteFS file segment's downloader removes file segments from
    /// FileSegmentsHolder right after calling file_segment->complete(), so on destruction here
    /// remain only uncompleted file segments.

    FileCache * cache = nullptr;

    for (auto file_segment_it = file_segments.begin(); file_segment_it != file_segments.end();)
    {
        auto current_file_segment_it = file_segment_it;
        auto & file_segment = *current_file_segment_it;

        /// The cache pointer is taken from the first segment and reused for the rest.
        if (!cache)
            cache = file_segment->cache;

        try
        {
            bool is_detached = false;

            {
                std::lock_guard segment_lock(file_segment->mutex);
                is_detached = file_segment->isDetached(segment_lock);
                if (is_detached)
                    file_segment->assertDetachedStatus(segment_lock);
            }

            if (is_detached)
            {
                /// This file segment is not owned by cache, so it will be destructed
                /// at this point, therefore no completion required.
                file_segment_it = file_segments.erase(current_file_segment_it);
                continue;
            }

            /// File segment pointer must be reset right after calling complete() and
            /// under the same mutex, because complete() checks for segment pointers.
            std::lock_guard cache_lock(cache->mutex);

            file_segment->completeWithoutState(cache_lock);

            file_segment_it = file_segments.erase(current_file_segment_it);
        }
        catch (...)
        {
            tryLogCurrentException(__PRETTY_FUNCTION__);

            /// Nothing was erased on this path, so the iterator still points at the failed
            /// segment. Step over it, otherwise the loop would retry the same segment forever.
            ++file_segment_it;
        }
    }
}
2022-02-12 22:20:05 +00:00
/// Returns the ranges of all held file segments joined with ", ".
String FileSegmentsHolder::toString()
{
    String joined;

    for (auto it = file_segments.begin(); it != file_segments.end(); ++it)
    {
        /// The separator goes before every range except when nothing has been written yet.
        if (!joined.empty())
            joined.append(", ");

        joined.append((*it)->range().toString());
    }

    return joined;
}
2022-05-14 12:26:04 +00:00
/// Creates a writer that produces file segments for `key_` in `cache_`.
///
/// @param cache_ Cache in which file segments are allocated.
/// @param key_ Cache key of the file being written.
/// @param on_complete_file_segment_func_ Callback invoked with a file segment after it was
///        completed by this writer; the callable is moved into the writer.
FileSegmentRangeWriter::FileSegmentRangeWriter(
    FileCache * cache_,
    const FileSegment::Key & key_,
    OnCompleteFileSegmentCallback && on_complete_file_segment_func_)
    : cache(cache_)
    , key(key_)
    /// No current file segment yet: the iterator points at the end of the (empty) holder.
    , current_file_segment_it(file_segments_holder.file_segments.end())
    /// A named rvalue reference is an lvalue; without std::move the callback would be copied.
    , on_complete_file_segment_func(std::move(on_complete_file_segment_func_))
{
}
/// Allocates a new file segment starting at `offset` and adds it to the holder.
///
/// The segment is requested with the capacity `max_file_segment_size` of the cache;
/// if less data is written, the file segment is resized when it is completed.
///
/// @return Iterator to the added segment inside the holder.
FileSegments::iterator FileSegmentRangeWriter::allocateFileSegment(size_t offset, bool is_persistent)
{
    std::lock_guard cache_guard(cache->mutex);

    const auto capacity = cache->max_file_segment_size;
    auto new_segment = cache->createFileSegmentForDownload(key, offset, capacity, is_persistent, cache_guard);

    return file_segments_holder.add(std::move(new_segment));
}
/// Completes `file_segment` according to how much data was written into it:
///  - detached segment: nothing to do;
///  - some data downloaded: the range and the reserved size are shrunk to the downloaded
///    size, the segment is completed as DOWNLOADED and the completion callback is invoked;
///  - nothing downloaded: the segment is completed based on its current state.
void FileSegmentRangeWriter::completeFileSegment(FileSegment & file_segment)
2022-06-02 15:43:37 +00:00
{
2022-07-13 17:22:47 +00:00
/**
* Complete file segment based on downloaded size.
*/
/// File segment can be detached if space reservation failed.
if (file_segment.isDetached())
2022-06-03 13:24:42 +00:00
return;
2022-08-24 15:44:09 +00:00
size_t current_downloaded_size = file_segment.getDownloadedSize();
if (current_downloaded_size > 0)
2022-06-02 15:43:37 +00:00
{
/// completeWithState() below can be called only by the downloader.
file_segment.getOrSetDownloader();
2022-08-19 18:13:46 +00:00
2022-08-25 10:41:31 +00:00
{
/// file_segment->complete(DOWNLOADED) is not enough, because file segment capacity
/// was initially set with a margin as `max_file_segment_size`. => We need to always
/// resize to actual size after download finished.
2022-08-25 10:53:51 +00:00
2022-08-25 10:41:31 +00:00
/// Current file segment is downloaded as a part of write-through cache
/// and therefore cannot be concurrently accessed. Nevertheless, it can be
/// accessed by cache system tables if someone read from them,
/// therefore we need a mutex.
2022-08-26 12:13:36 +00:00
std::lock_guard segment_lock(file_segment.mutex);
2022-08-25 10:41:31 +00:00
assert(file_segment.downloaded_size <= file_segment.range().size());
/// New right boundary is the last downloaded byte (downloaded_size > 0 was checked above).
file_segment.segment_range = FileSegment::Range(
2022-08-25 10:53:51 +00:00
file_segment.segment_range.left,
2022-08-25 10:41:31 +00:00
file_segment.segment_range.left + file_segment.downloaded_size - 1);
file_segment.reserved_size = file_segment.downloaded_size;
}
2022-08-19 18:13:46 +00:00
file_segment.completeWithState(FileSegment::State::DOWNLOADED);
2022-06-02 15:43:37 +00:00
on_complete_file_segment_func(file_segment);
}
else
{
std::lock_guard cache_lock(cache->mutex);
2022-08-19 18:13:46 +00:00
file_segment.completeWithoutState(cache_lock);
2022-06-02 15:43:37 +00:00
}
}
bool FileSegmentRangeWriter::write(const char * data, size_t size, size_t offset, bool is_persistent)
2022-05-14 12:26:04 +00:00
{
2022-07-13 17:22:47 +00:00
/**
* Write a range of file segments. Allocate file segment of `max_file_segment_size` and write to
* it until it is full and then allocate next file segment.
*/
2022-05-14 12:26:04 +00:00
if (finalized)
return false;
auto & file_segments = file_segments_holder.file_segments;
if (current_file_segment_it == file_segments.end())
{
current_file_segment_it = allocateFileSegment(current_file_segment_write_offset, is_persistent);
}
else
{
if (current_file_segment_write_offset != offset)
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot write file segment at offset {}, because current write offset is: {}",
offset, current_file_segment_write_offset);
}
2022-06-02 15:43:37 +00:00
if ((*current_file_segment_it)->getRemainingSizeToDownload() == 0)
2022-05-14 12:26:04 +00:00
{
completeFileSegment(**current_file_segment_it);
2022-05-14 12:26:04 +00:00
current_file_segment_it = allocateFileSegment(current_file_segment_write_offset, is_persistent);
}
2022-05-21 22:21:40 +00:00
else if ((*current_file_segment_it)->getDownloadOffset() != offset)
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot file segment download offset {} does not match current write offset {}",
(*current_file_segment_it)->getDownloadOffset(), offset);
}
2022-05-14 12:26:04 +00:00
}
auto & file_segment = *current_file_segment_it;
file_segment->getOrSetDownloader();
SCOPE_EXIT({
file_segment->resetDownloader();
});
2022-06-02 15:43:37 +00:00
bool reserved = file_segment->reserve(size);
if (!reserved)
2022-05-14 12:26:04 +00:00
{
file_segment->completeWithState(FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
on_complete_file_segment_func(*file_segment);
2022-05-21 22:21:40 +00:00
LOG_DEBUG(
&Poco::Logger::get("FileSegmentRangeWriter"),
"Unsuccessful space reservation attempt (size: {}, file segment info: {}",
size, file_segment->getInfoForLog());
2022-05-21 22:21:40 +00:00
return false;
2022-05-14 12:26:04 +00:00
}
(*current_file_segment_it)->write(data, size, offset);
current_file_segment_write_offset += size;
2022-05-14 12:26:04 +00:00
return true;
}
void FileSegmentRangeWriter::finalize()
{
if (finalized)
return;
auto & file_segments = file_segments_holder.file_segments;
if (file_segments.empty() || current_file_segment_it == file_segments.end())
return;
completeFileSegment(**current_file_segment_it);
2022-05-14 12:26:04 +00:00
finalized = true;
}
/// Finalizes the writer if that has not been done yet; exceptions are logged and swallowed.
FileSegmentRangeWriter::~FileSegmentRangeWriter()
{
    if (finalized)
        return;

    try
    {
        finalize();
    }
    catch (...)
    {
        tryLogCurrentException(__PRETTY_FUNCTION__);
    }
}
2022-01-22 22:56:24 +00:00
}