mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 09:32:06 +00:00
Fix partially downloaded no continuation case
This commit is contained in:
parent
cf49f41f75
commit
710bba895e
@ -18,6 +18,7 @@ namespace DB
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int FILE_CACHE_ERROR;
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
namespace
|
||||
@ -92,19 +93,6 @@ LRUFileCache::FileSegmentCell * LRUFileCache::getCell(
|
||||
return &cell_it->second;
|
||||
}
|
||||
|
||||
void LRUFileCache::removeCell(
|
||||
const Key & key, size_t offset, [[maybe_unused]] std::lock_guard<std::mutex> & cache_lock)
|
||||
{
|
||||
auto * cell = getCell(key, offset, cache_lock);
|
||||
if (!cell)
|
||||
throw Exception(ErrorCodes::FILE_CACHE_ERROR, "No cache cell for key: {}, offset: {}", keyToStr(key), offset);
|
||||
|
||||
cell->file_segment->detached = true;
|
||||
|
||||
if (cell->queue_iterator)
|
||||
queue.erase(*cell->queue_iterator);
|
||||
}
|
||||
|
||||
FileSegments LRUFileCache::getImpl(
|
||||
const Key & key, const FileSegment::Range & range, [[maybe_unused]] std::lock_guard<std::mutex> & cache_lock)
|
||||
{
|
||||
@ -385,7 +373,12 @@ void LRUFileCache::remove(
|
||||
{
|
||||
LOG_TEST(log, "Remove. Key: {}, offset: {}", keyToStr(key), offset);
|
||||
|
||||
removeCell(key, offset, cache_lock);
|
||||
auto * cell = getCell(key, offset, cache_lock);
|
||||
if (!cell)
|
||||
throw Exception(ErrorCodes::FILE_CACHE_ERROR, "No cache cell for key: {}, offset: {}", keyToStr(key), offset);
|
||||
|
||||
if (cell->queue_iterator)
|
||||
queue.erase(*cell->queue_iterator);
|
||||
|
||||
auto & offsets = files[key];
|
||||
offsets.erase(offset);
|
||||
@ -484,8 +477,6 @@ void LRUFileCache::remove(const Key & key)
|
||||
|
||||
for (auto & [offset, _] : offsets)
|
||||
remove(key, offset, cache_lock);
|
||||
|
||||
removeFileKey(key);
|
||||
}
|
||||
|
||||
void LRUFileCache::removeFileKey(const Key & key)
|
||||
@ -557,14 +548,6 @@ void LRUFileCache::reduceSizeToDownloaded(
|
||||
throw Exception(ErrorCodes::FILE_CACHE_ERROR,
|
||||
"Nothing to reduce, file segment fully downloaded, key: {}, offset: {}", keyToStr(key), offset);
|
||||
|
||||
file_segment->download_state = FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION;
|
||||
file_segment->detached = true;
|
||||
|
||||
/**
|
||||
* Create a new file segment as file segment's size is static and cannot be changed. Size is static because
|
||||
* there is an invariant, that list of shared pointers to file segments returned to users in FileSegmentsHolder
|
||||
* represents a contiguous range without holes.
|
||||
*/
|
||||
cell->file_segment = std::make_shared<FileSegment>(offset, downloaded_size, key, this, FileSegment::State::DOWNLOADED);
|
||||
}
|
||||
|
||||
@ -618,7 +601,7 @@ String LRUFileCache::dumpStructure()
|
||||
auto [key, offset] = *it;
|
||||
auto * cell = getCell(key, offset, cache_lock);
|
||||
result << (it != queue.begin() ? ", " : "") << cell->file_segment->range().toString();
|
||||
result << "(state: " << cell->file_segment->state() << ")";
|
||||
result << "(state: " << cell->file_segment->download_state << ")";
|
||||
}
|
||||
return result.str();
|
||||
}
|
||||
|
@ -126,10 +126,6 @@ private:
|
||||
Poco::Logger * log;
|
||||
bool startup_restore_finished = false;
|
||||
|
||||
size_t available() const { return max_size - current_size; }
|
||||
|
||||
void restore();
|
||||
|
||||
/**
|
||||
* Get list of file segments which intesect with `range`.
|
||||
* If `key` is not in cache or there is not such range, return std::nullopt.
|
||||
@ -145,9 +141,6 @@ private:
|
||||
const Key & key, size_t offset, size_t size,
|
||||
FileSegment::State state, [[maybe_unused]] std::lock_guard<std::mutex> & cache_lock);
|
||||
|
||||
void removeCell(
|
||||
const Key & key, size_t offset, [[maybe_unused]] std::lock_guard<std::mutex> & cache_lock);
|
||||
|
||||
void useCell(const FileSegmentCell & cell, FileSegments & result, [[maybe_unused]] std::lock_guard<std::mutex> & cache_lock);
|
||||
|
||||
bool tryReserve(
|
||||
@ -167,6 +160,10 @@ private:
|
||||
void reduceSizeToDownloaded(
|
||||
const Key & key, size_t offset, [[maybe_unused]] std::lock_guard<std::mutex> & cache_lock) override;
|
||||
|
||||
size_t available() const { return max_size - current_size; }
|
||||
|
||||
void restore();
|
||||
|
||||
public:
|
||||
struct Stat
|
||||
{
|
||||
|
@ -143,21 +143,17 @@ void FileSegment::complete(std::optional<State> state)
|
||||
{
|
||||
/**
|
||||
* Either downloader calls file_segment->complete(state) manually or
|
||||
* file_segment->complete() is called with no state from its FileSegmentsHolder desctructor.
|
||||
* file_segment->complete() is called with no state from its FileSegmentsHolder destructor.
|
||||
*
|
||||
* Downloader can call complete(state) with either DOWNLOADED or
|
||||
* PARTIALLY_DOWNLOADED_NO_CONTINUATION (in case space reservation failed).
|
||||
*
|
||||
* If complete() is called from FileSegmentsHolder desctructor -- actions are taken
|
||||
* according to current download_state and only in case `detached==false`, meaning than
|
||||
* this filesegment is present in cache cell. If file segment was removed from cache cell,
|
||||
* it has `detached=true`, so that other threads will know that no clean up is required from them.
|
||||
* complete() without any arguments should be called only from destructor of FileSegmentsHolder.
|
||||
*/
|
||||
|
||||
{
|
||||
std::lock_guard segment_lock(mutex);
|
||||
|
||||
bool download_can_continue = false;
|
||||
bool is_downloader = downloader_id == getCallerId();
|
||||
|
||||
if (state)
|
||||
@ -171,43 +167,47 @@ void FileSegment::complete(std::optional<State> state)
|
||||
"Cannot complete file segment with state: {}", toString(*state));
|
||||
|
||||
download_state = *state;
|
||||
if (download_state == State::PARTIALLY_DOWNLOADED_NO_CONTINUATION)
|
||||
{
|
||||
std::lock_guard cache_lock(cache->mutex);
|
||||
|
||||
if (downloaded_size)
|
||||
cache->reduceSizeToDownloaded(key(), offset(), cache_lock);
|
||||
else
|
||||
cache->remove(key(), offset(), cache_lock);
|
||||
}
|
||||
}
|
||||
else if (!detached)
|
||||
else
|
||||
{
|
||||
if (downloaded_size == range().size())
|
||||
if (download_state == State::SKIP_CACHE)
|
||||
return;
|
||||
|
||||
if (downloaded_size == range().size() && download_state != State::DOWNLOADED)
|
||||
download_state = State::DOWNLOADED;
|
||||
|
||||
if (download_state == State::DOWNLOADING)
|
||||
download_state = State::PARTIALLY_DOWNLOADED;
|
||||
}
|
||||
|
||||
if (download_state == State::PARTIALLY_DOWNLOADED
|
||||
|| download_state == State::PARTIALLY_DOWNLOADED_NO_CONTINUATION)
|
||||
bool download_can_continue = false;
|
||||
|
||||
if (download_state == State::PARTIALLY_DOWNLOADED
|
||||
|| download_state == State::PARTIALLY_DOWNLOADED_NO_CONTINUATION)
|
||||
{
|
||||
std::lock_guard cache_lock(cache->mutex);
|
||||
|
||||
bool is_last_holder = cache->isLastFileSegmentHolder(key(), offset(), cache_lock);
|
||||
download_can_continue = !is_last_holder && download_state == State::PARTIALLY_DOWNLOADED;
|
||||
|
||||
if (!download_can_continue)
|
||||
{
|
||||
std::lock_guard cache_lock(cache->mutex);
|
||||
|
||||
bool is_last_holder = cache->isLastFileSegmentHolder(key(), offset(), cache_lock);
|
||||
download_can_continue = !is_last_holder && download_state == State::PARTIALLY_DOWNLOADED;
|
||||
|
||||
if (!download_can_continue)
|
||||
if (!downloaded_size)
|
||||
{
|
||||
bool is_responsible_for_cell = is_downloader || (downloader_id.empty() && is_last_holder);
|
||||
if (is_responsible_for_cell)
|
||||
{
|
||||
if (downloaded_size)
|
||||
cache->reduceSizeToDownloaded(key(), offset(), cache_lock);
|
||||
else
|
||||
cache->remove(key(), offset(), cache_lock);
|
||||
}
|
||||
download_state = State::SKIP_CACHE;
|
||||
cache->remove(key(), offset(), cache_lock);
|
||||
}
|
||||
else if (is_last_holder)
|
||||
{
|
||||
/**
|
||||
* Only last holder of current file segment can resize the cell,
|
||||
* because there is an invariant that file segments returned to users
|
||||
* in FileSegmentsHolder represent a contiguous range, so we can resize
|
||||
* it only when nobody needs it.
|
||||
*/
|
||||
cache->reduceSizeToDownloaded(key(), offset(), cache_lock);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -117,9 +117,6 @@ private:
|
||||
|
||||
Key file_key;
|
||||
FileCache * cache;
|
||||
|
||||
/// Removed from cache cell.
|
||||
bool detached = false;
|
||||
};
|
||||
|
||||
struct FileSegmentsHolder : boost::noncopyable
|
||||
|
@ -142,7 +142,7 @@ bool CacheableReadBufferFromRemoteFS::completeFileSegmentAndGetNext()
|
||||
|
||||
auto file_segment_it = current_file_segment_it++;
|
||||
|
||||
auto range = (*file_segment_it)->range();
|
||||
[[maybe_unused]] auto range = (*file_segment_it)->range();
|
||||
assert(file_offset_of_buffer_end > range.right);
|
||||
|
||||
/// Only downloader completes file segment.
|
||||
@ -207,7 +207,8 @@ bool CacheableReadBufferFromRemoteFS::nextImpl()
|
||||
if (!completeFileSegmentAndGetNext())
|
||||
return false;
|
||||
}
|
||||
else if (current_state == FileSegment::State::PARTIALLY_DOWNLOADED
|
||||
|
||||
if (current_state == FileSegment::State::PARTIALLY_DOWNLOADED
|
||||
|| current_state == FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION)
|
||||
{
|
||||
checkForPartialDownload();
|
||||
@ -243,9 +244,8 @@ bool CacheableReadBufferFromRemoteFS::nextImpl()
|
||||
size_t size = impl->buffer().size();
|
||||
|
||||
if (file_segment->reserve(size))
|
||||
file_segment->write(impl->buffer().begin(), impl->buffer().size());
|
||||
file_segment->write(impl->buffer().begin(), size);
|
||||
else
|
||||
/// TODO: This is incorrect. We steal hold file segment. Need to release.
|
||||
file_segment->complete(FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
|
||||
}
|
||||
}
|
||||
|
@ -15,6 +15,10 @@
|
||||
<secret_access_key>minio123</secret_access_key>
|
||||
<s3_max_single_read_retries>10</s3_max_single_read_retries>
|
||||
</unstable_s3>
|
||||
<hdd>
|
||||
<type>local</type>
|
||||
<path>/</path>
|
||||
</hdd>
|
||||
<s3_with_cache>
|
||||
<type>s3</type>
|
||||
<endpoint>http://minio1:9001/root/data/</endpoint>
|
||||
@ -30,6 +34,9 @@
|
||||
<main>
|
||||
<disk>s3</disk>
|
||||
</main>
|
||||
<external>
|
||||
<disk>hdd</disk>
|
||||
</external>
|
||||
</volumes>
|
||||
</s3>
|
||||
<unstable_s3>
|
||||
|
Loading…
Reference in New Issue
Block a user