ClickHouse/src/Common/FileSegment.cpp

416 lines
13 KiB
C++
Raw Normal View History

2022-01-22 22:56:24 +00:00
#include "FileSegment.h"
#include <base/getThreadId.h>
2022-01-22 22:56:24 +00:00
#include <Common/FileCache.h>
#include <Common/hex.h>
2022-02-02 14:25:25 +00:00
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <filesystem>
2022-01-22 22:56:24 +00:00
namespace DB
{
namespace ErrorCodes
{
2022-02-18 15:38:23 +00:00
extern const int REMOTE_FS_OBJECT_CACHE_ERROR;
extern const int LOGICAL_ERROR;
2022-01-22 22:56:24 +00:00
}
FileSegment::FileSegment(
size_t offset_,
size_t size_,
2022-01-23 16:51:18 +00:00
const Key & key_,
2022-02-18 15:38:23 +00:00
IFileCache * cache_,
2022-01-22 22:56:24 +00:00
State download_state_)
: segment_range(offset_, offset_ + size_ - 1)
, download_state(download_state_)
, file_key(key_)
, cache(cache_)
2022-02-18 15:38:23 +00:00
#ifndef NDEBUG
, log(&Poco::Logger::get(fmt::format("FileSegment({}) : {}", getHexUIntLowercase(key_), range().toString())))
2022-02-18 15:38:23 +00:00
#else
, log(&Poco::Logger::get("FileSegment"))
#endif
2022-01-22 22:56:24 +00:00
{
2022-01-23 16:51:18 +00:00
if (download_state == State::DOWNLOADED)
reserved_size = downloaded_size = size_;
else if (download_state != State::EMPTY)
2022-02-18 15:38:23 +00:00
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Can create cell with either DOWNLOADED or EMPTY state");
2022-01-22 22:56:24 +00:00
}
2022-01-23 16:51:18 +00:00
FileSegment::State FileSegment::state() const
2022-01-22 22:56:24 +00:00
{
std::lock_guard segment_lock(mutex);
2022-01-23 16:51:18 +00:00
return download_state;
2022-01-22 22:56:24 +00:00
}
2022-02-18 15:38:23 +00:00
size_t FileSegment::getDownloadOffset() const
2022-01-22 22:56:24 +00:00
{
std::lock_guard segment_lock(mutex);
2022-02-02 14:25:25 +00:00
return range().left + downloaded_size;
2022-01-22 22:56:24 +00:00
}
String FileSegment::getCallerId()
{
if (!CurrentThread::isInitialized() || CurrentThread::getQueryId().size == 0)
2022-02-18 15:38:23 +00:00
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Cannot use cache without query id");
2022-01-22 22:56:24 +00:00
return CurrentThread::getQueryId().toString() + ":" + toString(getThreadId());
2022-01-22 22:56:24 +00:00
}
String FileSegment::getOrSetDownloader()
{
std::lock_guard segment_lock(mutex);
if (downloader_id.empty())
{
2022-02-01 19:10:56 +00:00
if (download_state != State::EMPTY
&& download_state != State::PARTIALLY_DOWNLOADED)
2022-02-18 15:38:23 +00:00
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
2022-02-01 19:10:56 +00:00
"Can set downloader only for file segment with state EMPTY or PARTIALLY_DOWNLOADED, but got: {}",
download_state);
2022-01-22 22:56:24 +00:00
downloader_id = getCallerId();
LOG_TEST(log, "Set downloader: {}, prev state: {}", downloader_id, stateToString(download_state));
2022-01-22 22:56:24 +00:00
download_state = State::DOWNLOADING;
}
else if (downloader_id == getCallerId())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to set the same downloader for segment {} for the second time", range().toString());
2022-01-22 22:56:24 +00:00
LOG_TEST(log, "Returning with downloader: {} and state: {}", downloader_id, stateToString(download_state));
return downloader_id;
}
String FileSegment::getDownloader() const
{
std::lock_guard segment_lock(mutex);
2022-01-22 22:56:24 +00:00
return downloader_id;
}
bool FileSegment::isDownloader() const
{
std::lock_guard segment_lock(mutex);
2022-02-01 19:10:56 +00:00
LOG_TEST(log, "Checking for current downloader. Caller: {}, downloader: {}, current state: {}", getCallerId(), downloader_id, stateToString(download_state));
2022-01-22 22:56:24 +00:00
return getCallerId() == downloader_id;
}
FileSegment::RemoteFileReaderPtr FileSegment::getRemoteFileReader()
{
if (!isDownloader())
2022-02-18 15:38:23 +00:00
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Only downloader can use remote filesystem file reader");
return remote_file_reader;
}
void FileSegment::setRemoteFileReader(RemoteFileReaderPtr remote_file_reader_)
{
if (!isDownloader())
2022-02-18 15:38:23 +00:00
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Only downloader can use remote filesystem file reader");
if (remote_file_reader)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Remote file reader already exists");
remote_file_reader = remote_file_reader_;
}
2022-01-22 22:56:24 +00:00
void FileSegment::write(const char * from, size_t size)
{
2022-01-23 16:51:18 +00:00
if (!size)
2022-02-18 15:38:23 +00:00
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Writing zero size is not allowed");
2022-01-23 16:51:18 +00:00
2022-01-24 22:07:02 +00:00
if (availableSize() < size)
2022-01-22 22:56:24 +00:00
throw Exception(
2022-02-18 15:38:23 +00:00
ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
2022-01-24 22:07:02 +00:00
"Not enough space is reserved. Available: {}, expected: {}", availableSize(), size);
2022-01-22 22:56:24 +00:00
2022-01-23 16:51:18 +00:00
if (!isDownloader())
2022-02-18 15:38:23 +00:00
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
"Only downloader can do the downloading. (CallerId: {}, DownloaderId: {})",
getCallerId(), downloader_id);
2022-01-23 16:51:18 +00:00
if (!cache_writer)
2022-01-22 22:56:24 +00:00
{
2022-02-18 15:38:23 +00:00
auto download_path = cache->getPathInLocalCache(key(), offset());
cache_writer = std::make_unique<WriteBufferFromFile>(download_path);
2022-01-22 22:56:24 +00:00
}
2022-02-02 14:25:25 +00:00
try
{
cache_writer->write(from, size);
cache_writer->next();
}
catch (...)
{
2022-02-18 15:38:23 +00:00
std::lock_guard segment_lock(mutex);
2022-02-15 09:11:33 +00:00
download_state = State::PARTIALLY_DOWNLOADED_NO_CONTINUATION;
2022-02-18 15:38:23 +00:00
cache_writer->finalize();
cache_writer.reset();
2022-02-02 14:25:25 +00:00
throw;
}
2022-02-01 19:10:56 +00:00
2022-02-18 15:38:23 +00:00
std::lock_guard segment_lock(mutex);
2022-01-22 22:56:24 +00:00
downloaded_size += size;
}
FileSegment::State FileSegment::wait()
{
std::unique_lock segment_lock(mutex);
if (download_state == State::EMPTY)
2022-02-18 15:38:23 +00:00
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Cannot wait on a file segment with empty state");
2022-01-22 22:56:24 +00:00
if (download_state == State::DOWNLOADING)
{
LOG_TEST(log, "{} waiting on: {}, current downloader: {}", getCallerId(), range().toString(), downloader_id);
2022-01-26 09:35:46 +00:00
2022-02-01 19:10:56 +00:00
assert(!downloader_id.empty());
assert(downloader_id != getCallerId());
2022-01-26 09:35:46 +00:00
#ifndef NDEBUG
2022-02-23 10:12:14 +00:00
// {
// std::lock_guard cache_lock(cache->mutex);
// assert(!cache->isLastFileSegmentHolder(key(), offset(), cache_lock, segment_lock));
// }
2022-01-26 09:35:46 +00:00
#endif
2022-02-21 12:54:03 +00:00
cv.wait_for(segment_lock, std::chrono::seconds(60));
2022-01-22 22:56:24 +00:00
}
return download_state;
}
bool FileSegment::reserve(size_t size)
{
if (!size)
2022-02-18 15:38:23 +00:00
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Zero space reservation is not allowed");
2022-01-22 22:56:24 +00:00
std::lock_guard segment_lock(mutex);
2022-01-23 16:51:18 +00:00
if (downloaded_size + size > range().size())
2022-02-18 15:38:23 +00:00
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
2022-01-23 16:51:18 +00:00
"Attempt to reserve space too much space ({}) for file segment with range: {} (downloaded size: {})",
size, range().toString(), downloaded_size);
2022-01-22 22:56:24 +00:00
2022-01-26 09:35:46 +00:00
auto caller_id = getCallerId();
if (downloader_id != caller_id)
2022-02-18 15:38:23 +00:00
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Space can be reserved only by downloader (current: {}, expected: {})", caller_id, downloader_id);
2022-01-22 22:56:24 +00:00
assert(reserved_size >= downloaded_size);
/**
* It is possible to have downloaded_size < reserved_size when reserve is called
* in case previous downloader did not fully download current file_segment
* and the caller is going to continue;
*/
size_t free_space = reserved_size - downloaded_size;
size_t size_to_reserve = size - free_space;
2022-02-18 15:38:23 +00:00
std::lock_guard cache_lock(cache->mutex);
2022-01-23 16:51:18 +00:00
bool reserved = cache->tryReserve(key(), offset(), size_to_reserve, cache_lock);
2022-01-22 22:56:24 +00:00
if (reserved)
reserved_size += size;
return reserved;
}
2022-02-23 10:12:14 +00:00
void FileSegment::setDownloaded(std::lock_guard<std::mutex> & /* segment_lock */)
{
download_state = State::DOWNLOADED;
is_downloaded = true;
if (cache_writer)
{
cache_writer->finalize();
cache_writer.reset();
}
}
void FileSegment::completeBatchAndResetDownloader()
2022-01-22 22:56:24 +00:00
{
{
std::lock_guard segment_lock(mutex);
2022-01-23 16:51:18 +00:00
bool is_downloader = downloader_id == getCallerId();
2022-01-26 09:35:46 +00:00
if (!is_downloader)
{
cv.notify_all();
2022-02-18 15:38:23 +00:00
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "File segment can be completed only by downloader");
2022-01-26 09:35:46 +00:00
}
if (downloaded_size == range().size())
{
2022-02-23 10:12:14 +00:00
setDownloaded(segment_lock);
}
else
download_state = State::PARTIALLY_DOWNLOADED;
2022-01-26 09:35:46 +00:00
downloader_id.clear();
LOG_TEST(log, "Complete batch. Current downloaded size: {}", downloaded_size);
2022-01-26 09:35:46 +00:00
}
cv.notify_all();
}
2022-02-15 09:11:33 +00:00
void FileSegment::complete(State state, bool complete_because_of_error)
2022-01-26 09:35:46 +00:00
{
{
std::lock_guard segment_lock(mutex);
2022-01-22 22:56:24 +00:00
2022-01-26 09:35:46 +00:00
bool is_downloader = downloader_id == getCallerId();
2022-01-24 22:07:02 +00:00
if (!is_downloader)
2022-01-26 09:35:46 +00:00
{
cv.notify_all();
2022-02-18 15:38:23 +00:00
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
2022-01-24 22:07:02 +00:00
"File segment can be completed only by downloader or downloader's FileSegmentsHodler");
2022-01-26 09:35:46 +00:00
}
2022-01-22 22:56:24 +00:00
2022-01-24 22:07:02 +00:00
if (state != State::DOWNLOADED
&& state != State::PARTIALLY_DOWNLOADED
&& state != State::PARTIALLY_DOWNLOADED_NO_CONTINUATION)
2022-01-26 09:35:46 +00:00
{
cv.notify_all();
2022-02-18 15:38:23 +00:00
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
"Cannot complete file segment with state: {}", stateToString(state));
2022-01-26 09:35:46 +00:00
}
2022-01-22 22:56:24 +00:00
2022-02-15 09:11:33 +00:00
if (complete_because_of_error)
{
/// Let's use a new buffer on the next attempt in this case.
remote_file_reader.reset();
}
2022-01-24 22:07:02 +00:00
download_state = state;
completeImpl(segment_lock);
}
2022-01-24 22:07:02 +00:00
cv.notify_all();
}
2022-01-22 22:56:24 +00:00
2022-01-24 22:07:02 +00:00
void FileSegment::complete()
{
{
std::lock_guard segment_lock(mutex);
2022-01-22 22:56:24 +00:00
2022-02-15 10:27:44 +00:00
if (download_state == State::SKIP_CACHE || detached)
2022-01-24 22:07:02 +00:00
return;
2022-01-24 22:07:02 +00:00
if (downloaded_size == range().size() && download_state != State::DOWNLOADED)
{
2022-02-23 10:12:14 +00:00
setDownloaded(segment_lock);
}
2022-01-23 16:51:18 +00:00
2022-01-26 09:35:46 +00:00
if (download_state == State::DOWNLOADING || download_state == State::EMPTY)
2022-01-24 22:07:02 +00:00
download_state = State::PARTIALLY_DOWNLOADED;
2022-01-22 22:56:24 +00:00
2022-01-24 22:07:02 +00:00
completeImpl(segment_lock);
}
2022-01-24 22:07:02 +00:00
cv.notify_all();
}
2022-01-23 16:51:18 +00:00
2022-02-23 10:12:14 +00:00
void FileSegment::completeImpl(std::lock_guard<std::mutex> & segment_lock)
2022-01-24 22:07:02 +00:00
{
2022-02-15 10:27:44 +00:00
std::lock_guard cache_lock(cache->mutex);
2022-01-24 22:07:02 +00:00
bool download_can_continue = false;
if (download_state == State::PARTIALLY_DOWNLOADED
|| download_state == State::PARTIALLY_DOWNLOADED_NO_CONTINUATION)
{
2022-02-23 10:12:14 +00:00
bool is_last_holder = cache->isLastFileSegmentHolder(key(), offset(), cache_lock, segment_lock);
2022-01-24 22:07:02 +00:00
download_can_continue = !is_last_holder && download_state == State::PARTIALLY_DOWNLOADED;
2022-01-23 16:51:18 +00:00
if (!download_can_continue)
2022-01-24 22:07:02 +00:00
{
if (!downloaded_size)
{
download_state = State::SKIP_CACHE;
LOG_TEST(log, "Remove cell {} (downloaded: {})", range().toString(), downloaded_size);
2022-02-23 10:12:14 +00:00
cache->remove(key(), offset(), cache_lock, segment_lock);
2022-02-15 10:27:44 +00:00
detached = true;
2022-01-24 22:07:02 +00:00
}
else if (is_last_holder)
{
/**
* Only last holder of current file segment can resize the cell,
* because there is an invariant that file segments returned to users
* in FileSegmentsHolder represent a contiguous range, so we can resize
* it only when nobody needs it.
*/
LOG_TEST(log, "Resize cell {} to downloaded: {}", range().toString(), downloaded_size);
2022-02-23 10:12:14 +00:00
cache->reduceSizeToDownloaded(key(), offset(), cache_lock, segment_lock);
2022-02-15 10:27:44 +00:00
detached = true;
2022-01-24 22:07:02 +00:00
}
}
2022-01-22 22:56:24 +00:00
}
2022-01-24 22:07:02 +00:00
if (downloader_id == getCallerId())
2022-01-26 09:35:46 +00:00
{
LOG_TEST(log, "Clearing downloader id: {}, current state: {}", downloader_id, stateToString(download_state));
2022-01-24 22:07:02 +00:00
downloader_id.clear();
2022-01-26 09:35:46 +00:00
}
2022-01-24 22:07:02 +00:00
if (!download_can_continue && cache_writer)
2022-01-24 22:07:02 +00:00
{
2022-02-18 15:38:23 +00:00
cache_writer->finalize();
cache_writer.reset();
2022-01-24 22:07:02 +00:00
}
2022-02-18 15:38:23 +00:00
assert(download_state != FileSegment::State::DOWNLOADED || std::filesystem::file_size(cache->getPathInLocalCache(key(), offset())) > 0);
2022-01-22 22:56:24 +00:00
}
2022-02-02 14:25:25 +00:00
String FileSegment::getInfoForLog() const
{
std::lock_guard segment_lock(mutex);
WriteBufferFromOwnString info;
info << "File segment: " << range().toString() << ", ";
info << "state: " << download_state << ", ";
info << "downloaded size: " << downloaded_size << ", ";
info << "downloader id: " << downloader_id << ", ";
info << "caller id: " << getCallerId();
return info.str();
}
String FileSegment::stateToString(FileSegment::State state)
2022-01-22 22:56:24 +00:00
{
switch (state)
{
case FileSegment::State::DOWNLOADED:
return "DOWNLOADED";
case FileSegment::State::EMPTY:
return "EMPTY";
case FileSegment::State::DOWNLOADING:
return "DOWNLOADING";
case FileSegment::State::PARTIALLY_DOWNLOADED:
return "PARTIALLY DOWNLOADED";
case FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION:
return "PARTIALLY DOWNLOADED NO CONTINUATION";
case FileSegment::State::SKIP_CACHE:
return "SKIP_CACHE";
}
}
2022-02-12 22:20:05 +00:00
String FileSegmentsHolder::toString()
{
std::lock_guard lock(mutex);
String ranges;
for (const auto & file_segment : file_segments)
{
if (!ranges.empty())
ranges += ", ";
ranges += file_segment->range().toString();
}
return ranges;
}
2022-01-22 22:56:24 +00:00
}