2022-01-22 22:56:24 +00:00
|
|
|
#include "FileSegment.h"
|
2022-05-14 12:26:04 +00:00
|
|
|
|
2022-01-30 11:35:28 +00:00
|
|
|
#include <base/getThreadId.h>
|
2022-09-06 11:30:02 +00:00
|
|
|
#include <Common/scope_guard_safe.h>
|
|
|
|
#include <Common/hex.h>
|
2022-05-14 12:26:04 +00:00
|
|
|
#include <Common/logger_useful.h>
|
2022-08-26 17:23:46 +00:00
|
|
|
#include <Interpreters/Cache/FileCache.h>
|
2022-02-02 14:25:25 +00:00
|
|
|
#include <IO/WriteBufferFromString.h>
|
|
|
|
#include <IO/Operators.h>
|
2022-01-30 11:35:28 +00:00
|
|
|
#include <filesystem>
|
2022-05-14 12:26:04 +00:00
|
|
|
|
2022-12-06 16:56:23 +00:00
|
|
|
#include <magic_enum.hpp>
|
|
|
|
|
2022-05-03 17:17:54 +00:00
|
|
|
namespace CurrentMetrics
|
|
|
|
{
|
|
|
|
extern const Metric CacheDetachedFileSegments;
|
|
|
|
}
|
|
|
|
|
2022-01-22 22:56:24 +00:00
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
|
|
|
namespace ErrorCodes
|
|
|
|
{
|
2022-01-30 11:35:28 +00:00
|
|
|
extern const int LOGICAL_ERROR;
|
2022-01-22 22:56:24 +00:00
|
|
|
}
|
|
|
|
|
2022-12-06 17:27:05 +00:00
|
|
|
String toString(FileSegmentKind kind)
|
2022-12-06 10:04:15 +00:00
|
|
|
{
|
2022-12-06 17:27:05 +00:00
|
|
|
return String(magic_enum::enum_name(kind));
|
2022-12-06 10:04:15 +00:00
|
|
|
}
|
|
|
|
|
2022-01-22 22:56:24 +00:00
|
|
|
FileSegment::FileSegment(
|
|
|
|
size_t offset_,
|
|
|
|
size_t size_,
|
2022-01-23 16:51:18 +00:00
|
|
|
const Key & key_,
|
2022-08-10 05:50:30 +00:00
|
|
|
FileCache * cache_,
|
2022-04-12 13:43:57 +00:00
|
|
|
State download_state_,
|
2022-09-06 11:30:02 +00:00
|
|
|
const CreateFileSegmentSettings & settings)
|
2022-01-22 22:56:24 +00:00
|
|
|
: segment_range(offset_, offset_ + size_ - 1)
|
|
|
|
, download_state(download_state_)
|
|
|
|
, file_key(key_)
|
|
|
|
, cache(cache_)
|
2022-02-18 15:38:23 +00:00
|
|
|
#ifndef NDEBUG
|
2022-01-30 11:35:28 +00:00
|
|
|
, log(&Poco::Logger::get(fmt::format("FileSegment({}) : {}", getHexUIntLowercase(key_), range().toString())))
|
2022-02-18 15:38:23 +00:00
|
|
|
#else
|
|
|
|
, log(&Poco::Logger::get("FileSegment"))
|
|
|
|
#endif
|
2022-12-06 17:27:05 +00:00
|
|
|
, segment_kind(settings.kind)
|
|
|
|
, is_unbound(settings.unbounded)
|
2022-01-22 22:56:24 +00:00
|
|
|
{
|
2022-03-21 11:30:25 +00:00
|
|
|
/// On creation, file segment state can be EMPTY, DOWNLOADED, DOWNLOADING.
|
|
|
|
switch (download_state)
|
|
|
|
{
|
|
|
|
/// EMPTY is used when file segment is not in cache and
|
|
|
|
/// someone will _potentially_ want to download it (after calling getOrSetDownloader()).
|
|
|
|
case (State::EMPTY):
|
|
|
|
{
|
|
|
|
break;
|
|
|
|
}
|
2022-03-21 20:20:15 +00:00
|
|
|
/// DOWNLOADED is used either on initial cache metadata load into memory on server startup
|
2022-03-21 11:30:25 +00:00
|
|
|
/// or on reduceSizeToDownloaded() -- when file segment object is updated.
|
|
|
|
case (State::DOWNLOADED):
|
|
|
|
{
|
|
|
|
reserved_size = downloaded_size = size_;
|
2022-08-24 12:16:53 +00:00
|
|
|
is_downloaded = true;
|
2022-12-15 13:57:25 +00:00
|
|
|
chassert(std::filesystem::file_size(getPathInLocalCache()) == size_);
|
2022-03-21 11:30:25 +00:00
|
|
|
break;
|
|
|
|
}
|
2022-05-25 08:54:28 +00:00
|
|
|
case (State::SKIP_CACHE):
|
|
|
|
{
|
|
|
|
break;
|
|
|
|
}
|
2022-03-21 11:30:25 +00:00
|
|
|
default:
|
|
|
|
{
|
2022-09-06 11:30:02 +00:00
|
|
|
throw Exception(
|
2022-12-15 18:39:41 +00:00
|
|
|
ErrorCodes::LOGICAL_ERROR,
|
2022-10-18 13:58:17 +00:00
|
|
|
"Can only create cell with either EMPTY, DOWNLOADED or SKIP_CACHE state");
|
2022-03-21 11:30:25 +00:00
|
|
|
}
|
|
|
|
}
|
2022-01-22 22:56:24 +00:00
|
|
|
}
|
|
|
|
|
2022-09-06 11:30:02 +00:00
|
|
|
String FileSegment::getPathInLocalCache() const
|
|
|
|
{
|
2022-12-06 10:04:15 +00:00
|
|
|
chassert(cache);
|
|
|
|
return cache->getPathInLocalCache(key(), offset(), segment_kind);
|
2022-09-06 11:30:02 +00:00
|
|
|
}
|
|
|
|
|
2022-01-23 16:51:18 +00:00
|
|
|
FileSegment::State FileSegment::state() const
|
2022-01-22 22:56:24 +00:00
|
|
|
{
|
2022-09-06 11:30:02 +00:00
|
|
|
std::unique_lock segment_lock(mutex);
|
2022-01-23 16:51:18 +00:00
|
|
|
return download_state;
|
2022-01-22 22:56:24 +00:00
|
|
|
}
|
|
|
|
|
2022-09-06 11:30:02 +00:00
|
|
|
void FileSegment::setDownloadState(State state)
|
|
|
|
{
|
|
|
|
LOG_TEST(log, "Updated state from {} to {}", stateToString(download_state), stateToString(state));
|
|
|
|
download_state = state;
|
|
|
|
}
|
|
|
|
|
|
|
|
size_t FileSegment::getFirstNonDownloadedOffset() const
|
|
|
|
{
|
|
|
|
std::unique_lock segment_lock(mutex);
|
|
|
|
return getFirstNonDownloadedOffsetUnlocked(segment_lock);
|
|
|
|
}
|
|
|
|
|
|
|
|
size_t FileSegment::getFirstNonDownloadedOffsetUnlocked(std::unique_lock<std::mutex> & segment_lock) const
|
2022-01-22 22:56:24 +00:00
|
|
|
{
|
2022-08-29 14:17:32 +00:00
|
|
|
return range().left + getDownloadedSizeUnlocked(segment_lock);
|
2022-03-14 16:33:29 +00:00
|
|
|
}
|
|
|
|
|
2022-09-06 11:30:02 +00:00
|
|
|
size_t FileSegment::getCurrentWriteOffset() const
|
2022-04-07 16:46:46 +00:00
|
|
|
{
|
2022-09-06 11:30:02 +00:00
|
|
|
std::unique_lock segment_lock(mutex);
|
|
|
|
return getCurrentWriteOffsetUnlocked(segment_lock);
|
2022-04-07 16:46:46 +00:00
|
|
|
}
|
|
|
|
|
2022-09-06 11:30:02 +00:00
|
|
|
size_t FileSegment::getCurrentWriteOffsetUnlocked(std::unique_lock<std::mutex> & segment_lock) const
|
2022-05-14 12:26:04 +00:00
|
|
|
{
|
2022-09-06 11:30:02 +00:00
|
|
|
return getFirstNonDownloadedOffsetUnlocked(segment_lock);
|
2022-05-14 12:26:04 +00:00
|
|
|
}
|
|
|
|
|
2022-09-06 11:30:02 +00:00
|
|
|
size_t FileSegment::getDownloadedSize() const
|
2022-06-03 13:24:42 +00:00
|
|
|
{
|
2022-09-06 11:30:02 +00:00
|
|
|
std::unique_lock segment_lock(mutex);
|
|
|
|
return getDownloadedSizeUnlocked(segment_lock);
|
2022-06-03 13:24:42 +00:00
|
|
|
}
|
|
|
|
|
2022-09-06 11:30:02 +00:00
|
|
|
size_t FileSegment::getDownloadedSizeUnlocked(std::unique_lock<std::mutex> & /* segment_lock */) const
|
2022-03-14 16:33:29 +00:00
|
|
|
{
|
|
|
|
if (download_state == State::DOWNLOADED)
|
|
|
|
return downloaded_size;
|
|
|
|
|
2022-09-06 11:30:02 +00:00
|
|
|
std::unique_lock download_lock(download_mutex);
|
2022-03-14 16:33:29 +00:00
|
|
|
return downloaded_size;
|
2022-01-22 22:56:24 +00:00
|
|
|
}
|
|
|
|
|
2022-12-06 16:23:43 +00:00
|
|
|
void FileSegment::setDownloadedSize(size_t delta)
|
|
|
|
{
|
|
|
|
std::unique_lock download_lock(download_mutex);
|
|
|
|
setDownloadedSizeUnlocked(download_lock, delta);
|
|
|
|
}
|
|
|
|
|
|
|
|
void FileSegment::setDownloadedSizeUnlocked(std::unique_lock<std::mutex> & /* download_lock */, size_t delta)
|
|
|
|
{
|
|
|
|
downloaded_size += delta;
|
2022-12-15 11:09:03 +00:00
|
|
|
assert(downloaded_size == std::filesystem::file_size(getPathInLocalCache()));
|
2022-12-06 16:23:43 +00:00
|
|
|
}
|
|
|
|
|
2022-09-06 11:30:02 +00:00
|
|
|
bool FileSegment::isDownloaded() const
|
|
|
|
{
|
|
|
|
std::lock_guard segment_lock(mutex);
|
|
|
|
return is_downloaded;
|
|
|
|
}
|
|
|
|
|
2022-01-22 22:56:24 +00:00
|
|
|
String FileSegment::getCallerId()
|
2022-03-08 09:58:37 +00:00
|
|
|
{
|
2022-04-07 16:46:46 +00:00
|
|
|
if (!CurrentThread::isInitialized()
|
|
|
|
|| !CurrentThread::get().getQueryContext()
|
2022-07-17 15:22:12 +00:00
|
|
|
|| CurrentThread::getQueryId().empty())
|
2022-04-07 16:46:46 +00:00
|
|
|
return "None:" + toString(getThreadId());
|
2022-01-22 22:56:24 +00:00
|
|
|
|
2022-07-17 15:22:12 +00:00
|
|
|
return std::string(CurrentThread::getQueryId()) + ":" + toString(getThreadId());
|
2022-01-22 22:56:24 +00:00
|
|
|
}
|
|
|
|
|
2022-09-06 11:30:02 +00:00
|
|
|
String FileSegment::getDownloader() const
|
|
|
|
{
|
|
|
|
std::unique_lock segment_lock(mutex);
|
|
|
|
return getDownloaderUnlocked(segment_lock);
|
|
|
|
}
|
|
|
|
|
|
|
|
String FileSegment::getDownloaderUnlocked(std::unique_lock<std::mutex> & /* segment_lock */) const
|
|
|
|
{
|
|
|
|
return downloader_id;
|
|
|
|
}
|
|
|
|
|
2022-01-22 22:56:24 +00:00
|
|
|
String FileSegment::getOrSetDownloader()
|
|
|
|
{
|
2022-09-06 11:30:02 +00:00
|
|
|
std::unique_lock segment_lock(mutex);
|
2022-01-22 22:56:24 +00:00
|
|
|
|
2022-09-06 11:30:02 +00:00
|
|
|
assertNotDetachedUnlocked(segment_lock);
|
2022-04-11 15:51:49 +00:00
|
|
|
|
2022-09-06 11:30:02 +00:00
|
|
|
auto current_downloader = getDownloaderUnlocked(segment_lock);
|
2022-03-09 17:14:28 +00:00
|
|
|
|
2022-09-06 11:30:02 +00:00
|
|
|
if (current_downloader.empty())
|
|
|
|
{
|
|
|
|
bool allow_new_downloader = download_state == State::EMPTY || download_state == State::PARTIALLY_DOWNLOADED;
|
|
|
|
if (!allow_new_downloader)
|
2022-09-12 12:39:52 +00:00
|
|
|
return "notAllowed:" + stateToString(download_state);
|
2022-09-06 11:30:02 +00:00
|
|
|
|
|
|
|
current_downloader = downloader_id = getCallerId();
|
|
|
|
setDownloadState(State::DOWNLOADING);
|
2022-01-22 22:56:24 +00:00
|
|
|
}
|
|
|
|
|
2022-09-06 11:30:02 +00:00
|
|
|
return current_downloader;
|
2022-01-26 18:43:23 +00:00
|
|
|
}
|
|
|
|
|
2022-09-06 11:30:02 +00:00
|
|
|
void FileSegment::resetDownloadingStateUnlocked([[maybe_unused]] std::unique_lock<std::mutex> & segment_lock)
|
2022-03-06 19:33:07 +00:00
|
|
|
{
|
2022-09-06 11:30:02 +00:00
|
|
|
assert(isDownloaderUnlocked(segment_lock));
|
|
|
|
assert(download_state == State::DOWNLOADING);
|
2022-03-06 19:33:07 +00:00
|
|
|
|
2022-09-06 11:30:02 +00:00
|
|
|
size_t current_downloaded_size = getDownloadedSizeUnlocked(segment_lock);
|
|
|
|
/// range().size() can equal 0 in case of write-though cache.
|
|
|
|
if (current_downloaded_size != 0 && current_downloaded_size == range().size())
|
|
|
|
setDownloadedUnlocked(segment_lock);
|
|
|
|
else
|
|
|
|
setDownloadState(State::PARTIALLY_DOWNLOADED);
|
|
|
|
}
|
2022-05-03 11:15:27 +00:00
|
|
|
|
2022-09-06 11:30:02 +00:00
|
|
|
void FileSegment::resetDownloader()
|
|
|
|
{
|
|
|
|
std::unique_lock segment_lock(mutex);
|
2022-03-06 19:33:07 +00:00
|
|
|
|
2022-09-06 11:30:02 +00:00
|
|
|
assertNotDetachedUnlocked(segment_lock);
|
|
|
|
assertIsDownloaderUnlocked("resetDownloader", segment_lock);
|
2022-03-09 09:36:52 +00:00
|
|
|
|
2022-09-06 11:30:02 +00:00
|
|
|
resetDownloadingStateUnlocked(segment_lock);
|
|
|
|
resetDownloaderUnlocked(segment_lock);
|
2022-03-09 09:36:52 +00:00
|
|
|
}
|
2022-03-06 19:33:07 +00:00
|
|
|
|
2022-09-06 11:30:02 +00:00
|
|
|
void FileSegment::resetDownloaderUnlocked(std::unique_lock<std::mutex> & /* segment_lock */)
|
2022-03-09 09:36:52 +00:00
|
|
|
{
|
2022-09-06 11:30:02 +00:00
|
|
|
LOG_TEST(log, "Resetting downloader from {}", downloader_id);
|
2022-03-06 19:33:07 +00:00
|
|
|
downloader_id.clear();
|
|
|
|
}
|
|
|
|
|
2022-09-06 11:30:02 +00:00
|
|
|
void FileSegment::assertIsDownloaderUnlocked(const std::string & operation, std::unique_lock<std::mutex> & segment_lock) const
|
2022-01-26 18:43:23 +00:00
|
|
|
{
|
2022-09-06 11:30:02 +00:00
|
|
|
auto caller = getCallerId();
|
|
|
|
auto current_downloader = getDownloaderUnlocked(segment_lock);
|
|
|
|
LOG_TEST(log, "Downloader id: {}, caller id: {}", current_downloader, caller);
|
|
|
|
|
|
|
|
if (caller != current_downloader)
|
|
|
|
{
|
|
|
|
throw Exception(
|
|
|
|
ErrorCodes::LOGICAL_ERROR,
|
|
|
|
"Operation `{}` can be done only by downloader. "
|
|
|
|
"(CallerId: {}, downloader id: {})",
|
|
|
|
operation, caller, downloader_id);
|
|
|
|
}
|
2022-01-22 22:56:24 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
bool FileSegment::isDownloader() const
|
|
|
|
{
|
2022-09-06 11:30:02 +00:00
|
|
|
std::unique_lock segment_lock(mutex);
|
|
|
|
return isDownloaderUnlocked(segment_lock);
|
2022-01-22 22:56:24 +00:00
|
|
|
}
|
|
|
|
|
2022-09-06 11:30:02 +00:00
|
|
|
bool FileSegment::isDownloaderUnlocked(std::unique_lock<std::mutex> & segment_lock) const
|
2022-04-14 11:17:04 +00:00
|
|
|
{
|
2022-09-06 11:30:02 +00:00
|
|
|
return getCallerId() == getDownloaderUnlocked(segment_lock);
|
2022-04-14 11:17:04 +00:00
|
|
|
}
|
|
|
|
|
2022-01-26 18:43:23 +00:00
|
|
|
FileSegment::RemoteFileReaderPtr FileSegment::getRemoteFileReader()
|
|
|
|
{
|
2022-09-06 11:30:02 +00:00
|
|
|
std::unique_lock segment_lock(mutex);
|
|
|
|
assertIsDownloaderUnlocked("getRemoteFileReader", segment_lock);
|
2022-01-26 18:43:23 +00:00
|
|
|
return remote_file_reader;
|
|
|
|
}
|
|
|
|
|
2022-08-16 12:13:12 +00:00
|
|
|
FileSegment::RemoteFileReaderPtr FileSegment::extractRemoteFileReader()
|
|
|
|
{
|
|
|
|
std::lock_guard cache_lock(cache->mutex);
|
2022-09-06 11:30:02 +00:00
|
|
|
std::unique_lock segment_lock(mutex);
|
2022-08-16 12:13:12 +00:00
|
|
|
|
2022-08-16 15:08:12 +00:00
|
|
|
if (!is_detached)
|
|
|
|
{
|
|
|
|
bool is_last_holder = cache->isLastFileSegmentHolder(key(), offset(), cache_lock, segment_lock);
|
|
|
|
if (!downloader_id.empty() || !is_last_holder)
|
|
|
|
return nullptr;
|
|
|
|
}
|
2022-08-16 12:13:12 +00:00
|
|
|
|
|
|
|
return std::move(remote_file_reader);
|
|
|
|
}
|
|
|
|
|
2022-01-26 18:43:23 +00:00
|
|
|
void FileSegment::setRemoteFileReader(RemoteFileReaderPtr remote_file_reader_)
|
|
|
|
{
|
2022-09-06 11:30:02 +00:00
|
|
|
std::unique_lock segment_lock(mutex);
|
|
|
|
assertIsDownloaderUnlocked("setRemoteFileReader", segment_lock);
|
2022-01-26 18:43:23 +00:00
|
|
|
|
|
|
|
if (remote_file_reader)
|
2022-04-26 10:57:58 +00:00
|
|
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Remote file reader already exists");
|
2022-01-26 18:43:23 +00:00
|
|
|
|
|
|
|
remote_file_reader = remote_file_reader_;
|
|
|
|
}
|
|
|
|
|
2022-04-26 10:57:58 +00:00
|
|
|
void FileSegment::resetRemoteFileReader()
|
2022-03-17 19:29:07 +00:00
|
|
|
{
|
2022-09-06 11:30:02 +00:00
|
|
|
std::unique_lock segment_lock(mutex);
|
|
|
|
assertIsDownloaderUnlocked("resetRemoteFileReader", segment_lock);
|
2022-03-17 19:29:07 +00:00
|
|
|
|
|
|
|
if (!remote_file_reader)
|
|
|
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Remote file reader does not exist");
|
|
|
|
|
2022-04-26 10:57:58 +00:00
|
|
|
remote_file_reader.reset();
|
2022-03-17 19:29:07 +00:00
|
|
|
}
|
|
|
|
|
2022-12-06 17:27:05 +00:00
|
|
|
std::unique_ptr<WriteBufferFromFile> FileSegment::detachWriter()
|
|
|
|
{
|
|
|
|
std::unique_lock segment_lock(mutex);
|
|
|
|
|
|
|
|
if (!cache_writer)
|
|
|
|
{
|
|
|
|
if (detached_writer)
|
|
|
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Writer is already detached");
|
|
|
|
|
|
|
|
auto download_path = getPathInLocalCache();
|
|
|
|
cache_writer = std::make_unique<WriteBufferFromFile>(download_path);
|
|
|
|
}
|
|
|
|
detached_writer = true;
|
|
|
|
return std::move(cache_writer);
|
|
|
|
}
|
|
|
|
|
2022-09-06 11:30:02 +00:00
|
|
|
void FileSegment::write(const char * from, size_t size, size_t offset)
|
2022-01-22 22:56:24 +00:00
|
|
|
{
|
2022-01-23 16:51:18 +00:00
|
|
|
if (!size)
|
2022-12-15 18:39:41 +00:00
|
|
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Writing zero size is not allowed");
|
2022-01-23 16:51:18 +00:00
|
|
|
|
2022-09-06 11:30:02 +00:00
|
|
|
{
|
|
|
|
std::unique_lock segment_lock(mutex);
|
2022-01-22 22:56:24 +00:00
|
|
|
|
2022-09-06 11:30:02 +00:00
|
|
|
assertIsDownloaderUnlocked("write", segment_lock);
|
|
|
|
assertNotDetachedUnlocked(segment_lock);
|
2022-01-23 16:51:18 +00:00
|
|
|
|
2022-09-06 11:30:02 +00:00
|
|
|
if (download_state != State::DOWNLOADING)
|
|
|
|
throw Exception(
|
|
|
|
ErrorCodes::LOGICAL_ERROR,
|
|
|
|
"Expected DOWNLOADING state, got {}", stateToString(download_state));
|
|
|
|
|
|
|
|
size_t first_non_downloaded_offset = getFirstNonDownloadedOffsetUnlocked(segment_lock);
|
|
|
|
if (offset != first_non_downloaded_offset)
|
|
|
|
throw Exception(
|
2022-12-15 18:39:41 +00:00
|
|
|
ErrorCodes::LOGICAL_ERROR,
|
2022-09-06 11:30:02 +00:00
|
|
|
"Attempt to write {} bytes to offset: {}, but current write offset is {}",
|
|
|
|
size, offset, first_non_downloaded_offset);
|
2022-03-17 16:50:51 +00:00
|
|
|
|
2022-09-12 12:39:52 +00:00
|
|
|
size_t current_downloaded_size = getDownloadedSizeUnlocked(segment_lock);
|
|
|
|
chassert(reserved_size >= current_downloaded_size);
|
|
|
|
size_t free_reserved_size = reserved_size - current_downloaded_size;
|
|
|
|
|
|
|
|
if (free_reserved_size < size)
|
2022-09-06 11:30:02 +00:00
|
|
|
throw Exception(
|
2022-12-15 18:39:41 +00:00
|
|
|
ErrorCodes::LOGICAL_ERROR,
|
2022-09-12 12:39:52 +00:00
|
|
|
"Not enough space is reserved. Available: {}, expected: {}", free_reserved_size, size);
|
2022-03-16 12:27:58 +00:00
|
|
|
|
2022-09-12 12:39:52 +00:00
|
|
|
if (current_downloaded_size == range().size())
|
2022-09-06 11:30:02 +00:00
|
|
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "File segment is already fully downloaded");
|
2022-03-14 16:33:29 +00:00
|
|
|
|
2022-12-06 16:23:43 +00:00
|
|
|
if (!cache_writer)
|
2022-09-12 12:39:52 +00:00
|
|
|
{
|
|
|
|
if (current_downloaded_size > 0)
|
|
|
|
throw Exception(
|
|
|
|
ErrorCodes::LOGICAL_ERROR,
|
|
|
|
"Cache writer was finalized (downloaded size: {}, state: {})",
|
|
|
|
current_downloaded_size, stateToString(download_state));
|
|
|
|
|
2022-12-06 17:27:05 +00:00
|
|
|
if (detached_writer)
|
|
|
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cache writer was detached");
|
|
|
|
|
2022-09-12 12:39:52 +00:00
|
|
|
auto download_path = getPathInLocalCache();
|
|
|
|
cache_writer = std::make_unique<WriteBufferFromFile>(download_path);
|
|
|
|
}
|
2022-01-22 22:56:24 +00:00
|
|
|
}
|
|
|
|
|
2022-02-02 14:25:25 +00:00
|
|
|
try
|
|
|
|
{
|
2022-12-06 16:23:43 +00:00
|
|
|
cache_writer->write(from, size);
|
2022-03-14 16:33:29 +00:00
|
|
|
|
2022-09-06 11:30:02 +00:00
|
|
|
std::unique_lock download_lock(download_mutex);
|
2022-03-14 16:33:29 +00:00
|
|
|
|
2022-12-06 16:23:43 +00:00
|
|
|
cache_writer->next();
|
2022-03-14 16:33:29 +00:00
|
|
|
|
|
|
|
downloaded_size += size;
|
2022-12-15 13:57:25 +00:00
|
|
|
|
|
|
|
chassert(std::filesystem::file_size(getPathInLocalCache()) == downloaded_size);
|
2022-02-02 14:25:25 +00:00
|
|
|
}
|
2022-03-17 19:29:07 +00:00
|
|
|
catch (Exception & e)
|
2022-02-02 14:25:25 +00:00
|
|
|
{
|
2022-09-06 11:30:02 +00:00
|
|
|
std::unique_lock segment_lock(mutex);
|
2022-02-18 15:38:23 +00:00
|
|
|
|
2022-04-07 16:46:46 +00:00
|
|
|
wrapWithCacheInfo(e, "while writing into cache", segment_lock);
|
2022-03-17 19:29:07 +00:00
|
|
|
|
2022-09-06 11:30:02 +00:00
|
|
|
setDownloadFailedUnlocked(segment_lock);
|
2022-03-10 09:56:48 +00:00
|
|
|
|
2022-03-16 12:27:58 +00:00
|
|
|
cv.notify_all();
|
2022-02-18 15:38:23 +00:00
|
|
|
|
2022-02-02 14:25:25 +00:00
|
|
|
throw;
|
|
|
|
}
|
2022-03-16 13:29:21 +00:00
|
|
|
|
2022-09-12 12:39:52 +00:00
|
|
|
chassert(getFirstNonDownloadedOffset() == offset + size);
|
2022-06-15 11:39:00 +00:00
|
|
|
}
|
|
|
|
|
2022-01-22 22:56:24 +00:00
|
|
|
FileSegment::State FileSegment::wait()
|
|
|
|
{
|
|
|
|
std::unique_lock segment_lock(mutex);
|
|
|
|
|
2022-05-06 10:48:16 +00:00
|
|
|
if (is_detached)
|
2022-05-06 12:22:01 +00:00
|
|
|
throw Exception(
|
2022-05-10 17:50:43 +00:00
|
|
|
ErrorCodes::LOGICAL_ERROR,
|
2022-05-11 21:33:15 +00:00
|
|
|
"Cache file segment is in detached state, operation not allowed");
|
2022-05-03 11:15:27 +00:00
|
|
|
|
2022-03-06 19:33:07 +00:00
|
|
|
if (downloader_id.empty())
|
|
|
|
return download_state;
|
|
|
|
|
2022-01-22 22:56:24 +00:00
|
|
|
if (download_state == State::EMPTY)
|
2022-12-15 18:39:41 +00:00
|
|
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot wait on a file segment with empty state");
|
2022-01-22 22:56:24 +00:00
|
|
|
|
|
|
|
if (download_state == State::DOWNLOADING)
|
|
|
|
{
|
2022-01-30 11:35:28 +00:00
|
|
|
LOG_TEST(log, "{} waiting on: {}, current downloader: {}", getCallerId(), range().toString(), downloader_id);
|
2022-01-26 09:35:46 +00:00
|
|
|
|
2022-09-06 11:30:02 +00:00
|
|
|
chassert(!getDownloaderUnlocked(segment_lock).empty());
|
|
|
|
chassert(!isDownloaderUnlocked(segment_lock));
|
2022-01-26 09:35:46 +00:00
|
|
|
|
2022-02-21 12:54:03 +00:00
|
|
|
cv.wait_for(segment_lock, std::chrono::seconds(60));
|
2022-01-22 22:56:24 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
return download_state;
|
|
|
|
}
|
|
|
|
|
2022-08-29 14:17:32 +00:00
|
|
|
bool FileSegment::reserve(size_t size_to_reserve)
|
2022-01-22 22:56:24 +00:00
|
|
|
{
|
2022-08-29 14:17:32 +00:00
|
|
|
if (!size_to_reserve)
|
2022-12-15 18:39:41 +00:00
|
|
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Zero space reservation is not allowed");
|
2022-01-22 22:56:24 +00:00
|
|
|
|
2022-09-06 11:30:02 +00:00
|
|
|
size_t expected_downloaded_size;
|
|
|
|
|
2022-12-06 17:27:05 +00:00
|
|
|
bool is_file_segment_size_exceeded;
|
2022-02-24 14:20:51 +00:00
|
|
|
{
|
2022-09-06 11:30:02 +00:00
|
|
|
std::unique_lock segment_lock(mutex);
|
2022-01-22 22:56:24 +00:00
|
|
|
|
2022-12-06 17:27:05 +00:00
|
|
|
|
2022-09-06 11:30:02 +00:00
|
|
|
assertNotDetachedUnlocked(segment_lock);
|
|
|
|
assertIsDownloaderUnlocked("reserve", segment_lock);
|
2022-03-14 16:33:29 +00:00
|
|
|
|
2022-09-06 11:30:02 +00:00
|
|
|
expected_downloaded_size = getDownloadedSizeUnlocked(segment_lock);
|
|
|
|
|
2022-12-06 17:27:05 +00:00
|
|
|
is_file_segment_size_exceeded = expected_downloaded_size + size_to_reserve > range().size();
|
|
|
|
if (is_file_segment_size_exceeded && !is_unbound)
|
2022-12-06 10:04:15 +00:00
|
|
|
{
|
2022-12-06 17:27:05 +00:00
|
|
|
throw Exception(
|
2022-12-15 18:39:41 +00:00
|
|
|
ErrorCodes::LOGICAL_ERROR,
|
2022-12-06 17:27:05 +00:00
|
|
|
"Attempt to reserve space too much space ({}) for file segment with range: {} (downloaded size: {})",
|
|
|
|
size_to_reserve, range().toString(), downloaded_size);
|
2022-12-06 10:04:15 +00:00
|
|
|
}
|
2022-01-22 22:56:24 +00:00
|
|
|
|
2022-09-06 11:30:02 +00:00
|
|
|
chassert(reserved_size >= expected_downloaded_size);
|
2022-02-24 14:20:51 +00:00
|
|
|
}
|
2022-01-22 22:56:24 +00:00
|
|
|
|
|
|
|
/**
|
|
|
|
* It is possible to have downloaded_size < reserved_size when reserve is called
|
|
|
|
* in case previous downloader did not fully download current file_segment
|
|
|
|
* and the caller is going to continue;
|
|
|
|
*/
|
|
|
|
|
2022-09-06 11:30:02 +00:00
|
|
|
size_t already_reserved_size = reserved_size - expected_downloaded_size;
|
2022-01-22 22:56:24 +00:00
|
|
|
|
2022-08-29 14:17:32 +00:00
|
|
|
bool reserved = already_reserved_size >= size_to_reserve;
|
2022-08-24 15:44:09 +00:00
|
|
|
if (!reserved)
|
2022-05-05 22:07:22 +00:00
|
|
|
{
|
2022-08-24 15:44:09 +00:00
|
|
|
std::lock_guard cache_lock(cache->mutex);
|
2022-12-06 17:27:05 +00:00
|
|
|
std::lock_guard segment_lock(mutex);
|
|
|
|
|
|
|
|
size_to_reserve = size_to_reserve - already_reserved_size;
|
2022-08-24 15:44:09 +00:00
|
|
|
|
2022-12-06 17:27:05 +00:00
|
|
|
if (is_unbound && is_file_segment_size_exceeded)
|
|
|
|
{
|
|
|
|
segment_range.right = range().left + expected_downloaded_size + size_to_reserve;
|
|
|
|
}
|
2022-08-24 15:44:09 +00:00
|
|
|
|
2022-12-06 17:27:05 +00:00
|
|
|
reserved = cache->tryReserve(key(), offset(), size_to_reserve, cache_lock);
|
2022-12-02 13:50:56 +00:00
|
|
|
|
2022-12-06 17:27:05 +00:00
|
|
|
if (reserved)
|
|
|
|
reserved_size += size_to_reserve;
|
2022-12-06 10:04:15 +00:00
|
|
|
}
|
2022-12-06 17:27:05 +00:00
|
|
|
|
|
|
|
return reserved;
|
2022-01-22 22:56:24 +00:00
|
|
|
}
|
|
|
|
|
2022-09-06 11:30:02 +00:00
|
|
|
void FileSegment::setDownloadedUnlocked([[maybe_unused]] std::unique_lock<std::mutex> & segment_lock)
|
2022-02-23 10:12:14 +00:00
|
|
|
{
|
2022-04-14 11:17:04 +00:00
|
|
|
if (is_downloaded)
|
|
|
|
return;
|
|
|
|
|
2022-04-07 16:46:46 +00:00
|
|
|
if (cache_writer)
|
|
|
|
{
|
|
|
|
cache_writer->finalize();
|
|
|
|
cache_writer.reset();
|
|
|
|
remote_file_reader.reset();
|
|
|
|
}
|
2022-08-24 15:44:09 +00:00
|
|
|
|
|
|
|
download_state = State::DOWNLOADED;
|
|
|
|
is_downloaded = true;
|
|
|
|
|
2022-08-29 14:17:32 +00:00
|
|
|
assert(getDownloadedSizeUnlocked(segment_lock) > 0);
|
2022-08-24 15:44:09 +00:00
|
|
|
assert(std::filesystem::file_size(getPathInLocalCache()) > 0);
|
2022-04-07 16:46:46 +00:00
|
|
|
}
|
|
|
|
|
2022-09-06 11:30:02 +00:00
|
|
|
void FileSegment::setDownloadFailedUnlocked(std::unique_lock<std::mutex> & segment_lock)
|
2022-04-07 16:46:46 +00:00
|
|
|
{
|
2022-09-06 11:30:02 +00:00
|
|
|
LOG_INFO(log, "Settings download as failed: {}", getInfoForLogUnlocked(segment_lock));
|
|
|
|
|
|
|
|
setDownloadState(State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
|
|
|
|
resetDownloaderUnlocked(segment_lock);
|
2022-02-23 10:12:14 +00:00
|
|
|
|
|
|
|
if (cache_writer)
|
|
|
|
{
|
|
|
|
cache_writer->finalize();
|
|
|
|
cache_writer.reset();
|
2022-03-11 11:57:57 +00:00
|
|
|
remote_file_reader.reset();
|
2022-02-23 10:12:14 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-09-06 11:30:02 +00:00
|
|
|
void FileSegment::completePartAndResetDownloader()
|
2022-01-22 22:56:24 +00:00
|
|
|
{
|
2022-09-06 11:30:02 +00:00
|
|
|
std::unique_lock segment_lock(mutex);
|
|
|
|
completePartAndResetDownloaderUnlocked(segment_lock);
|
|
|
|
}
|
2022-01-22 22:56:24 +00:00
|
|
|
|
2022-09-06 11:30:02 +00:00
|
|
|
void FileSegment::completePartAndResetDownloaderUnlocked(std::unique_lock<std::mutex> & segment_lock)
|
|
|
|
{
|
|
|
|
assertNotDetachedUnlocked(segment_lock);
|
|
|
|
assertIsDownloaderUnlocked("completePartAndResetDownloader", segment_lock);
|
2022-02-23 13:43:40 +00:00
|
|
|
|
2022-09-06 11:30:02 +00:00
|
|
|
resetDownloadingStateUnlocked(segment_lock);
|
|
|
|
resetDownloaderUnlocked(segment_lock);
|
2022-01-26 09:35:46 +00:00
|
|
|
|
2022-09-06 11:30:02 +00:00
|
|
|
LOG_TEST(log, "Complete batch. ({})", getInfoForLogUnlocked(segment_lock));
|
2022-01-26 09:35:46 +00:00
|
|
|
cv.notify_all();
|
|
|
|
}
|
|
|
|
|
2022-08-19 18:13:46 +00:00
|
|
|
void FileSegment::completeWithState(State state)
|
2022-01-26 09:35:46 +00:00
|
|
|
{
|
2022-03-17 19:29:07 +00:00
|
|
|
std::lock_guard cache_lock(cache->mutex);
|
2022-09-06 11:30:02 +00:00
|
|
|
std::unique_lock segment_lock(mutex);
|
2022-03-01 16:00:54 +00:00
|
|
|
|
2022-09-06 11:30:02 +00:00
|
|
|
assertNotDetachedUnlocked(segment_lock);
|
|
|
|
assertIsDownloaderUnlocked("complete", segment_lock);
|
2022-02-15 09:11:33 +00:00
|
|
|
|
2022-03-17 19:29:07 +00:00
|
|
|
if (state != State::DOWNLOADED
|
|
|
|
&& state != State::PARTIALLY_DOWNLOADED
|
|
|
|
&& state != State::PARTIALLY_DOWNLOADED_NO_CONTINUATION)
|
|
|
|
{
|
|
|
|
cv.notify_all();
|
2022-08-19 18:13:46 +00:00
|
|
|
throw Exception(
|
2022-12-15 18:39:41 +00:00
|
|
|
ErrorCodes::LOGICAL_ERROR,
|
2022-08-19 18:13:46 +00:00
|
|
|
"Cannot complete file segment with state: {}", stateToString(state));
|
2022-05-25 14:49:40 +00:00
|
|
|
}
|
|
|
|
|
2022-09-06 11:30:02 +00:00
|
|
|
setDownloadState(state);
|
2022-08-19 18:13:46 +00:00
|
|
|
completeBasedOnCurrentState(cache_lock, segment_lock);
|
2022-01-24 22:07:02 +00:00
|
|
|
}
|
2022-01-22 22:56:24 +00:00
|
|
|
|
2022-09-06 11:30:02 +00:00
|
|
|
void FileSegment::completeWithoutState()
|
2022-01-24 22:07:02 +00:00
|
|
|
{
|
2022-09-06 11:30:02 +00:00
|
|
|
std::lock_guard cache_lock(cache->mutex);
|
|
|
|
completeWithoutStateUnlocked(cache_lock);
|
|
|
|
}
|
|
|
|
|
|
|
|
void FileSegment::completeWithoutStateUnlocked(std::lock_guard<std::mutex> & cache_lock)
|
|
|
|
{
|
|
|
|
std::unique_lock segment_lock(mutex);
|
2022-08-19 18:13:46 +00:00
|
|
|
completeBasedOnCurrentState(cache_lock, segment_lock);
|
2022-04-27 18:01:59 +00:00
|
|
|
}
|
2022-01-23 16:51:18 +00:00
|
|
|
|
2022-09-06 11:30:02 +00:00
|
|
|
void FileSegment::completeBasedOnCurrentState(std::lock_guard<std::mutex> & cache_lock, std::unique_lock<std::mutex> & segment_lock)
|
2022-04-27 18:01:59 +00:00
|
|
|
{
|
2022-08-19 18:13:46 +00:00
|
|
|
if (is_detached)
|
2022-03-17 19:29:07 +00:00
|
|
|
return;
|
2022-01-22 22:56:24 +00:00
|
|
|
|
2022-09-06 11:30:02 +00:00
|
|
|
bool is_downloader = isDownloaderUnlocked(segment_lock);
|
2022-03-22 09:39:58 +00:00
|
|
|
bool is_last_holder = cache->isLastFileSegmentHolder(key(), offset(), cache_lock, segment_lock);
|
2022-08-29 14:17:32 +00:00
|
|
|
size_t current_downloaded_size = getDownloadedSizeUnlocked(segment_lock);
|
2022-02-15 10:27:44 +00:00
|
|
|
|
2022-08-22 10:00:14 +00:00
|
|
|
SCOPE_EXIT({
|
|
|
|
if (is_downloader)
|
|
|
|
{
|
|
|
|
cv.notify_all();
|
|
|
|
}
|
|
|
|
});
|
|
|
|
|
2022-09-06 11:30:02 +00:00
|
|
|
LOG_TEST(
|
|
|
|
log,
|
|
|
|
"Complete based on current state (is_last_holder: {}, {})",
|
|
|
|
is_last_holder, getInfoForLogUnlocked(segment_lock));
|
2022-08-19 18:13:46 +00:00
|
|
|
|
2022-09-06 11:30:02 +00:00
|
|
|
if (is_downloader)
|
2022-01-24 22:07:02 +00:00
|
|
|
{
|
2022-09-11 15:38:51 +00:00
|
|
|
if (download_state == State::DOWNLOADING) /// != in case of completeWithState
|
|
|
|
resetDownloadingStateUnlocked(segment_lock);
|
2022-09-06 11:30:02 +00:00
|
|
|
resetDownloaderUnlocked(segment_lock);
|
2022-01-22 22:56:24 +00:00
|
|
|
}
|
|
|
|
|
2022-12-15 13:57:25 +00:00
|
|
|
if (cache_writer && (is_downloader || is_last_holder))
|
|
|
|
{
|
|
|
|
cache_writer->finalize();
|
|
|
|
cache_writer.reset();
|
|
|
|
remote_file_reader.reset();
|
|
|
|
}
|
|
|
|
|
2022-12-06 10:04:15 +00:00
|
|
|
if (segment_kind == FileSegmentKind::Temporary && is_last_holder)
|
|
|
|
{
|
|
|
|
LOG_TEST(log, "Removing temporary file segment: {}", getInfoForLogUnlocked(segment_lock));
|
|
|
|
detach(cache_lock, segment_lock);
|
|
|
|
setDownloadState(State::SKIP_CACHE);
|
|
|
|
cache->remove(key(), offset(), cache_lock, segment_lock);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
2022-08-19 18:13:46 +00:00
|
|
|
switch (download_state)
|
2022-01-26 09:35:46 +00:00
|
|
|
{
|
2022-08-19 18:13:46 +00:00
|
|
|
case State::SKIP_CACHE:
|
|
|
|
{
|
|
|
|
if (is_last_holder)
|
|
|
|
cache->remove(key(), offset(), cache_lock, segment_lock);
|
2022-12-15 18:39:41 +00:00
|
|
|
break;
|
2022-08-19 18:13:46 +00:00
|
|
|
}
|
|
|
|
case State::DOWNLOADED:
|
|
|
|
{
|
2022-09-06 11:30:02 +00:00
|
|
|
chassert(getDownloadedSizeUnlocked(segment_lock) == range().size());
|
2022-12-15 13:57:25 +00:00
|
|
|
chassert(getDownloadedSizeUnlocked(segment_lock) == std::filesystem::file_size(getPathInLocalCache()));
|
|
|
|
chassert(is_downloaded);
|
|
|
|
chassert(!cache_writer);
|
2022-08-19 18:13:46 +00:00
|
|
|
break;
|
|
|
|
}
|
|
|
|
case State::DOWNLOADING:
|
|
|
|
{
|
2022-09-06 11:30:02 +00:00
|
|
|
chassert(!is_last_holder);
|
2022-08-19 18:13:46 +00:00
|
|
|
break;
|
|
|
|
}
|
2022-09-06 11:30:02 +00:00
|
|
|
case State::EMPTY:
|
2022-08-19 18:13:46 +00:00
|
|
|
case State::PARTIALLY_DOWNLOADED:
|
|
|
|
case State::PARTIALLY_DOWNLOADED_NO_CONTINUATION:
|
|
|
|
{
|
|
|
|
if (is_last_holder)
|
|
|
|
{
|
|
|
|
if (current_downloaded_size == 0)
|
|
|
|
{
|
|
|
|
LOG_TEST(log, "Remove cell {} (nothing downloaded)", range().toString());
|
|
|
|
|
2022-09-06 11:30:02 +00:00
|
|
|
setDownloadState(State::SKIP_CACHE);
|
2022-08-19 18:13:46 +00:00
|
|
|
cache->remove(key(), offset(), cache_lock, segment_lock);
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
LOG_TEST(log, "Resize cell {} to downloaded: {}", range().toString(), current_downloaded_size);
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Only last holder of current file segment can resize the cell,
|
|
|
|
* because there is an invariant that file segments returned to users
|
|
|
|
* in FileSegmentsHolder represent a contiguous range, so we can resize
|
|
|
|
* it only when nobody needs it.
|
|
|
|
*/
|
2022-09-06 11:30:02 +00:00
|
|
|
setDownloadState(State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
|
2022-08-19 18:13:46 +00:00
|
|
|
|
|
|
|
/// Resize this file segment by creating a copy file segment with DOWNLOADED state,
|
|
|
|
/// but current file segment should remain PARRTIALLY_DOWNLOADED_NO_CONTINUATION and with detached state,
|
|
|
|
/// because otherwise an invariant that getOrSet() returns a contiguous range of file segments will be broken
|
|
|
|
/// (this will be crucial for other file segment holder, not for current one).
|
|
|
|
cache->reduceSizeToDownloaded(key(), offset(), cache_lock, segment_lock);
|
|
|
|
}
|
|
|
|
|
2022-09-06 11:30:02 +00:00
|
|
|
detachAssumeStateFinalized(segment_lock);
|
2022-08-19 18:13:46 +00:00
|
|
|
}
|
|
|
|
break;
|
|
|
|
}
|
2022-01-26 09:35:46 +00:00
|
|
|
}
|
2022-01-24 22:07:02 +00:00
|
|
|
|
2022-12-15 18:39:41 +00:00
|
|
|
is_completed = true;
|
2022-09-06 11:30:02 +00:00
|
|
|
LOG_TEST(log, "Completed file segment: {}", getInfoForLogUnlocked(segment_lock));
|
2022-01-22 22:56:24 +00:00
|
|
|
}
|
|
|
|
|
2022-02-02 14:25:25 +00:00
|
|
|
String FileSegment::getInfoForLog() const
|
|
|
|
{
|
2022-09-06 11:30:02 +00:00
|
|
|
std::unique_lock segment_lock(mutex);
|
|
|
|
return getInfoForLogUnlocked(segment_lock);
|
2022-03-17 17:29:31 +00:00
|
|
|
}
|
2022-02-02 14:25:25 +00:00
|
|
|
|
2022-09-06 11:30:02 +00:00
|
|
|
String FileSegment::getInfoForLogUnlocked(std::unique_lock<std::mutex> & segment_lock) const
|
2022-03-17 17:29:31 +00:00
|
|
|
{
|
2022-02-02 14:25:25 +00:00
|
|
|
WriteBufferFromOwnString info;
|
|
|
|
info << "File segment: " << range().toString() << ", ";
|
2022-08-24 13:53:54 +00:00
|
|
|
info << "key: " << key().toString() << ", ";
|
2022-02-02 14:25:25 +00:00
|
|
|
info << "state: " << download_state << ", ";
|
2022-08-29 14:17:32 +00:00
|
|
|
info << "downloaded size: " << getDownloadedSizeUnlocked(segment_lock) << ", ";
|
2022-05-04 15:12:35 +00:00
|
|
|
info << "reserved size: " << reserved_size << ", ";
|
2022-05-27 13:10:52 +00:00
|
|
|
info << "downloader id: " << (downloader_id.empty() ? "None" : downloader_id) << ", ";
|
2022-09-06 11:30:02 +00:00
|
|
|
info << "current write offset: " << getCurrentWriteOffsetUnlocked(segment_lock) << ", ";
|
|
|
|
info << "first non-downloaded offset: " << getFirstNonDownloadedOffsetUnlocked(segment_lock) << ", ";
|
2022-05-27 13:10:52 +00:00
|
|
|
info << "caller id: " << getCallerId() << ", ";
|
2022-09-06 11:30:02 +00:00
|
|
|
info << "detached: " << is_detached << ", ";
|
2022-12-06 10:04:15 +00:00
|
|
|
info << "kind: " << toString(segment_kind);
|
2022-02-02 14:25:25 +00:00
|
|
|
|
|
|
|
return info.str();
|
|
|
|
}
|
|
|
|
|
2022-09-06 11:30:02 +00:00
|
|
|
void FileSegment::wrapWithCacheInfo(Exception & e, const String & message, std::unique_lock<std::mutex> & segment_lock) const
|
2022-04-07 16:46:46 +00:00
|
|
|
{
|
2022-09-06 11:30:02 +00:00
|
|
|
e.addMessage(fmt::format("{}, current cache state: {}", message, getInfoForLogUnlocked(segment_lock)));
|
2022-04-07 16:46:46 +00:00
|
|
|
}
|
|
|
|
|
2022-01-26 18:43:23 +00:00
|
|
|
String FileSegment::stateToString(FileSegment::State state)
|
2022-01-22 22:56:24 +00:00
|
|
|
{
|
|
|
|
switch (state)
|
|
|
|
{
|
|
|
|
case FileSegment::State::DOWNLOADED:
|
|
|
|
return "DOWNLOADED";
|
|
|
|
case FileSegment::State::EMPTY:
|
|
|
|
return "EMPTY";
|
|
|
|
case FileSegment::State::DOWNLOADING:
|
|
|
|
return "DOWNLOADING";
|
|
|
|
case FileSegment::State::PARTIALLY_DOWNLOADED:
|
|
|
|
return "PARTIALLY DOWNLOADED";
|
|
|
|
case FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION:
|
|
|
|
return "PARTIALLY DOWNLOADED NO CONTINUATION";
|
|
|
|
case FileSegment::State::SKIP_CACHE:
|
|
|
|
return "SKIP_CACHE";
|
|
|
|
}
|
2022-10-07 19:20:14 +00:00
|
|
|
UNREACHABLE();
|
2022-01-22 22:56:24 +00:00
|
|
|
}
|
|
|
|
|
2022-03-21 18:48:13 +00:00
|
|
|
void FileSegment::assertCorrectness() const
|
|
|
|
{
|
2022-09-06 11:30:02 +00:00
|
|
|
std::unique_lock segment_lock(mutex);
|
|
|
|
assertCorrectnessUnlocked(segment_lock);
|
2022-03-21 18:48:13 +00:00
|
|
|
}
|
|
|
|
|
2022-09-06 11:30:02 +00:00
|
|
|
void FileSegment::assertCorrectnessUnlocked(std::unique_lock<std::mutex> & segment_lock) const
|
2022-05-06 10:48:16 +00:00
|
|
|
{
|
2022-09-12 12:39:52 +00:00
|
|
|
auto current_downloader = getDownloaderUnlocked(segment_lock);
|
|
|
|
chassert(current_downloader.empty() == (download_state != FileSegment::State::DOWNLOADING));
|
|
|
|
chassert(!current_downloader.empty() == (download_state == FileSegment::State::DOWNLOADING));
|
|
|
|
chassert(download_state != FileSegment::State::DOWNLOADED || std::filesystem::file_size(getPathInLocalCache()) > 0);
|
2022-05-06 10:48:16 +00:00
|
|
|
}
|
|
|
|
|
2022-09-06 11:30:02 +00:00
|
|
|
void FileSegment::throwIfDetachedUnlocked(std::unique_lock<std::mutex> & segment_lock) const
|
2022-05-03 11:15:27 +00:00
|
|
|
{
|
|
|
|
throw Exception(
|
2022-05-10 17:50:43 +00:00
|
|
|
ErrorCodes::LOGICAL_ERROR,
|
2022-05-03 11:15:27 +00:00
|
|
|
"Cache file segment is in detached state, operation not allowed. "
|
|
|
|
"It can happen when cache was concurrently dropped with SYSTEM DROP FILESYSTEM CACHE FORCE. "
|
2022-09-06 11:30:02 +00:00
|
|
|
"Please, retry. File segment info: {}", getInfoForLogUnlocked(segment_lock));
|
2022-05-03 11:15:27 +00:00
|
|
|
}
|
|
|
|
|
2022-09-06 11:30:02 +00:00
|
|
|
void FileSegment::assertNotDetached() const
|
|
|
|
{
|
|
|
|
std::unique_lock segment_lock(mutex);
|
|
|
|
assertNotDetachedUnlocked(segment_lock);
|
|
|
|
}
|
2022-05-06 10:48:16 +00:00
|
|
|
|
2022-09-06 11:30:02 +00:00
|
|
|
void FileSegment::assertNotDetachedUnlocked(std::unique_lock<std::mutex> & segment_lock) const
|
2022-04-11 15:51:49 +00:00
|
|
|
{
|
2022-05-06 10:48:16 +00:00
|
|
|
if (is_detached)
|
2022-05-11 08:45:20 +00:00
|
|
|
throwIfDetachedUnlocked(segment_lock);
|
2022-04-11 15:51:49 +00:00
|
|
|
}
|
|
|
|
|
2022-09-06 11:30:02 +00:00
|
|
|
void FileSegment::assertDetachedStatus(std::unique_lock<std::mutex> & segment_lock) const
|
2022-04-20 11:43:07 +00:00
|
|
|
{
|
2022-05-11 08:45:20 +00:00
|
|
|
/// Detached file segment is allowed to have only a certain subset of states.
|
|
|
|
/// It should be either EMPTY or one of the finalized states.
|
|
|
|
|
2022-09-06 11:30:02 +00:00
|
|
|
if (download_state != State::EMPTY && !hasFinalizedStateUnlocked(segment_lock))
|
2022-05-02 22:27:35 +00:00
|
|
|
{
|
|
|
|
throw Exception(
|
2022-05-07 21:33:28 +00:00
|
|
|
ErrorCodes::LOGICAL_ERROR,
|
2022-05-11 10:05:50 +00:00
|
|
|
"Detached file segment has incorrect state: {}",
|
2022-09-06 11:30:02 +00:00
|
|
|
getInfoForLogUnlocked(segment_lock));
|
2022-05-02 22:27:35 +00:00
|
|
|
}
|
2022-04-20 11:43:07 +00:00
|
|
|
}
|
|
|
|
|
2022-04-07 23:58:55 +00:00
|
|
|
FileSegmentPtr FileSegment::getSnapshot(const FileSegmentPtr & file_segment, std::lock_guard<std::mutex> & /* cache_lock */)
|
2022-04-07 16:46:46 +00:00
|
|
|
{
|
2022-09-06 11:30:02 +00:00
|
|
|
std::unique_lock segment_lock(file_segment->mutex);
|
2022-08-26 12:13:36 +00:00
|
|
|
|
2022-04-07 16:46:46 +00:00
|
|
|
auto snapshot = std::make_shared<FileSegment>(
|
|
|
|
file_segment->offset(),
|
|
|
|
file_segment->range().size(),
|
|
|
|
file_segment->key(),
|
|
|
|
nullptr,
|
2022-09-06 11:30:02 +00:00
|
|
|
State::EMPTY,
|
|
|
|
CreateFileSegmentSettings{});
|
2022-04-07 16:46:46 +00:00
|
|
|
|
|
|
|
snapshot->hits_count = file_segment->getHitsCount();
|
|
|
|
snapshot->ref_count = file_segment.use_count();
|
2022-08-29 14:17:32 +00:00
|
|
|
snapshot->downloaded_size = file_segment->getDownloadedSizeUnlocked(segment_lock);
|
2022-08-26 12:13:36 +00:00
|
|
|
snapshot->download_state = file_segment->download_state;
|
2022-12-06 10:04:15 +00:00
|
|
|
snapshot->segment_kind = file_segment->getKind();
|
2022-04-07 16:46:46 +00:00
|
|
|
|
|
|
|
return snapshot;
|
|
|
|
}
|
|
|
|
|
2022-09-06 11:30:02 +00:00
|
|
|
bool FileSegment::hasFinalizedStateUnlocked(std::unique_lock<std::mutex> & /* segment_lock */) const
|
2022-04-28 10:57:22 +00:00
|
|
|
{
|
|
|
|
return download_state == State::DOWNLOADED
|
|
|
|
|| download_state == State::PARTIALLY_DOWNLOADED_NO_CONTINUATION
|
|
|
|
|| download_state == State::SKIP_CACHE;
|
|
|
|
}
|
|
|
|
|
2022-09-06 11:30:02 +00:00
|
|
|
bool FileSegment::isDetached() const
|
|
|
|
{
|
|
|
|
std::unique_lock segment_lock(mutex);
|
|
|
|
return is_detached;
|
|
|
|
}
|
|
|
|
|
2022-12-15 18:39:41 +00:00
|
|
|
bool FileSegment::isCompleted() const
|
|
|
|
{
|
|
|
|
std::unique_lock segment_lock(mutex);
|
|
|
|
return is_completed;
|
|
|
|
}
|
|
|
|
|
2022-09-06 11:30:02 +00:00
|
|
|
void FileSegment::detach(std::lock_guard<std::mutex> & /* cache_lock */, std::unique_lock<std::mutex> & segment_lock)
|
2022-04-26 10:09:58 +00:00
|
|
|
{
|
2022-05-06 10:48:16 +00:00
|
|
|
if (is_detached)
|
2022-04-26 10:13:21 +00:00
|
|
|
return;
|
|
|
|
|
2022-09-12 12:39:52 +00:00
|
|
|
if (download_state == State::DOWNLOADING)
|
|
|
|
resetDownloadingStateUnlocked(segment_lock);
|
|
|
|
else
|
|
|
|
setDownloadState(State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
|
2022-04-30 13:02:04 +00:00
|
|
|
|
2022-09-12 12:39:52 +00:00
|
|
|
resetDownloaderUnlocked(segment_lock);
|
2022-09-06 11:30:02 +00:00
|
|
|
detachAssumeStateFinalized(segment_lock);
|
2022-04-26 10:09:58 +00:00
|
|
|
}
|
|
|
|
|
2022-09-06 11:30:02 +00:00
|
|
|
void FileSegment::detachAssumeStateFinalized(std::unique_lock<std::mutex> & segment_lock)
|
2022-05-03 17:17:54 +00:00
|
|
|
{
|
2022-05-06 10:48:16 +00:00
|
|
|
is_detached = true;
|
2022-05-03 17:17:54 +00:00
|
|
|
CurrentMetrics::add(CurrentMetrics::CacheDetachedFileSegments);
|
2022-09-06 11:30:02 +00:00
|
|
|
LOG_TEST(log, "Detached file segment: {}", getInfoForLogUnlocked(segment_lock));
|
2022-05-03 17:17:54 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
FileSegment::~FileSegment()
|
|
|
|
{
|
2022-09-06 11:30:02 +00:00
|
|
|
std::unique_lock segment_lock(mutex);
|
2022-05-06 10:48:16 +00:00
|
|
|
if (is_detached)
|
2022-05-03 17:17:54 +00:00
|
|
|
CurrentMetrics::sub(CurrentMetrics::CacheDetachedFileSegments);
|
|
|
|
}
|
|
|
|
|
2022-12-06 17:27:05 +00:00
|
|
|
void FileSegmentsHolder::reset()
|
2022-03-17 19:29:07 +00:00
|
|
|
{
|
|
|
|
/// In CacheableReadBufferFromRemoteFS file segment's downloader removes file segments from
|
|
|
|
/// FileSegmentsHolder right after calling file_segment->complete(), so on destruction here
|
|
|
|
/// remain only uncompleted file segments.
|
|
|
|
|
2022-12-06 17:27:05 +00:00
|
|
|
SCOPE_EXIT({
|
|
|
|
file_segments.clear();
|
|
|
|
});
|
|
|
|
|
2022-08-10 05:50:30 +00:00
|
|
|
FileCache * cache = nullptr;
|
2022-03-17 19:29:07 +00:00
|
|
|
|
|
|
|
for (auto file_segment_it = file_segments.begin(); file_segment_it != file_segments.end();)
|
|
|
|
{
|
2022-03-28 23:59:53 +00:00
|
|
|
auto current_file_segment_it = file_segment_it;
|
2022-03-17 19:29:07 +00:00
|
|
|
auto & file_segment = *current_file_segment_it;
|
|
|
|
|
|
|
|
if (!cache)
|
|
|
|
cache = file_segment->cache;
|
|
|
|
|
2022-12-06 10:04:15 +00:00
|
|
|
assert(cache == file_segment->cache); /// all segments should belong to the same cache
|
|
|
|
|
2022-03-17 19:29:07 +00:00
|
|
|
try
|
|
|
|
{
|
2022-05-06 10:48:16 +00:00
|
|
|
bool is_detached = false;
|
2022-05-03 11:15:27 +00:00
|
|
|
|
2022-04-28 20:50:19 +00:00
|
|
|
{
|
2022-09-06 11:30:02 +00:00
|
|
|
std::unique_lock segment_lock(file_segment->mutex);
|
2022-05-06 10:48:16 +00:00
|
|
|
is_detached = file_segment->isDetached(segment_lock);
|
|
|
|
if (is_detached)
|
2022-04-28 20:50:19 +00:00
|
|
|
file_segment->assertDetachedStatus(segment_lock);
|
|
|
|
}
|
2022-05-03 11:15:27 +00:00
|
|
|
|
2022-05-06 10:48:16 +00:00
|
|
|
if (is_detached)
|
2022-04-28 20:50:19 +00:00
|
|
|
{
|
|
|
|
/// This file segment is not owned by cache, so it will be destructed
|
|
|
|
/// at this point, therefore no completion required.
|
|
|
|
file_segment_it = file_segments.erase(current_file_segment_it);
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
|
2022-03-17 19:29:07 +00:00
|
|
|
/// File segment pointer must be reset right after calling complete() and
|
|
|
|
/// under the same mutex, because complete() checks for segment pointers.
|
|
|
|
std::lock_guard cache_lock(cache->mutex);
|
|
|
|
|
2022-09-06 11:30:02 +00:00
|
|
|
file_segment->completeWithoutStateUnlocked(cache_lock);
|
2022-03-17 19:29:07 +00:00
|
|
|
|
2022-03-28 23:59:53 +00:00
|
|
|
file_segment_it = file_segments.erase(current_file_segment_it);
|
2022-03-17 19:29:07 +00:00
|
|
|
}
|
|
|
|
catch (...)
|
|
|
|
{
|
|
|
|
tryLogCurrentException(__PRETTY_FUNCTION__);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-12-06 17:27:05 +00:00
|
|
|
FileSegmentsHolder::~FileSegmentsHolder()
|
|
|
|
{
|
|
|
|
reset();
|
|
|
|
}
|
|
|
|
|
2022-02-12 22:20:05 +00:00
|
|
|
String FileSegmentsHolder::toString()
|
|
|
|
{
|
|
|
|
String ranges;
|
|
|
|
for (const auto & file_segment : file_segments)
|
|
|
|
{
|
|
|
|
if (!ranges.empty())
|
|
|
|
ranges += ", ";
|
|
|
|
ranges += file_segment->range().toString();
|
|
|
|
}
|
|
|
|
return ranges;
|
|
|
|
}
|
|
|
|
|
2022-01-22 22:56:24 +00:00
|
|
|
}
|