mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 01:25:21 +00:00
commit
b149f28cd3
@ -575,7 +575,13 @@ void LRUFileCache::loadCacheInfoIntoMemory()
|
||||
pcg64 generator(randomSeed());
|
||||
std::shuffle(cells.begin(), cells.end(), generator);
|
||||
for (const auto & cell : cells)
|
||||
queue.splice(queue.end(), queue, *cell->queue_iterator);
|
||||
{
|
||||
/// Cell cache size changed and, for example, 1st file segment fits into cache
|
||||
/// and 2nd file segment will fit only if first was evicted, then first will be removed and
|
||||
/// cell is nullptr here.
|
||||
if (cell)
|
||||
queue.splice(queue.end(), queue, *cell->queue_iterator);
|
||||
}
|
||||
}
|
||||
|
||||
LRUFileCache::Stat LRUFileCache::getStat()
|
||||
|
@ -264,6 +264,7 @@ void FileSegment::setDownloaded(std::lock_guard<std::mutex> & /* segment_lock */
|
||||
{
|
||||
cache_writer->finalize();
|
||||
cache_writer.reset();
|
||||
remote_file_reader.reset();
|
||||
}
|
||||
}
|
||||
|
||||
@ -285,7 +286,7 @@ void FileSegment::completeBatchAndResetDownloader()
|
||||
cv.notify_all();
|
||||
}
|
||||
|
||||
void FileSegment::complete(State state, bool complete_because_of_error)
|
||||
void FileSegment::complete(State state)
|
||||
{
|
||||
{
|
||||
std::lock_guard segment_lock(mutex);
|
||||
@ -307,12 +308,6 @@ void FileSegment::complete(State state, bool complete_because_of_error)
|
||||
"Cannot complete file segment with state: {}", stateToString(state));
|
||||
}
|
||||
|
||||
if (complete_because_of_error)
|
||||
{
|
||||
/// Let's use a new buffer on the next attempt in this case.
|
||||
remote_file_reader.reset();
|
||||
}
|
||||
|
||||
download_state = state;
|
||||
}
|
||||
|
||||
@ -389,6 +384,7 @@ void FileSegment::completeImpl(bool allow_non_strict_checking)
|
||||
{
|
||||
cache_writer->finalize();
|
||||
cache_writer.reset();
|
||||
remote_file_reader.reset();
|
||||
}
|
||||
|
||||
assert(download_state != FileSegment::State::DOWNLOADED || std::filesystem::file_size(cache->getPathInLocalCache(key(), offset())) > 0);
|
||||
|
@ -117,7 +117,7 @@ public:
|
||||
|
||||
void completeBatchAndResetDownloader();
|
||||
|
||||
void complete(State state, bool complete_because_of_error = false);
|
||||
void complete(State state);
|
||||
|
||||
String getInfoForLog() const;
|
||||
|
||||
|
@ -103,7 +103,7 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getRemoteFSReadBuffer(FileSe
|
||||
{
|
||||
/// Result buffer is owned only by current buffer -- not shareable like in the case above.
|
||||
|
||||
if (remote_file_reader)
|
||||
if (remote_file_reader && remote_file_reader->getFileOffsetOfBufferEnd() == file_offset_of_buffer_end)
|
||||
return remote_file_reader;
|
||||
|
||||
remote_file_reader = remote_file_reader_creator();
|
||||
@ -407,12 +407,34 @@ void CachedReadBufferFromRemoteFS::predownload(FileSegmentPtr & file_segment)
|
||||
}
|
||||
else
|
||||
{
|
||||
file_segment->complete(FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
|
||||
/// We were predownloading:
|
||||
/// segment{1}
|
||||
/// cache: [_____|___________
|
||||
/// ^
|
||||
/// download_offset
|
||||
/// requested_range: [__________]
|
||||
/// ^
|
||||
/// file_offset_of_buffer_end
|
||||
/// But space reservation failed.
|
||||
/// So get working and internal buffer from predownload buffer, get new download buffer,
|
||||
/// return buffer back, seek to actual position.
|
||||
/// We could reuse predownload buffer and just seek to needed position, but for now
|
||||
/// seek is only allowed once for ReadBufferForS3 - before call to nextImpl.
|
||||
/// TODO: allow seek more than once with seek avoiding.
|
||||
|
||||
bytes_to_predownload = 0;
|
||||
file_segment->complete(FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
|
||||
|
||||
read_type = ReadType::REMOTE_FS_READ_BYPASS_CACHE;
|
||||
file_segment->completeBatchAndResetDownloader();
|
||||
|
||||
swap(*implementation_buffer);
|
||||
working_buffer.resize(0);
|
||||
position() = working_buffer.end();
|
||||
|
||||
implementation_buffer = getRemoteFSReadBuffer(file_segment, read_type);
|
||||
|
||||
swap(*implementation_buffer);
|
||||
|
||||
implementation_buffer->seek(file_offset_of_buffer_end, SEEK_SET);
|
||||
|
||||
LOG_TEST(
|
||||
@ -484,6 +506,19 @@ bool CachedReadBufferFromRemoteFS::updateImplementationBufferIfNeeded()
|
||||
}
|
||||
|
||||
bool CachedReadBufferFromRemoteFS::nextImpl()
|
||||
{
|
||||
try
|
||||
{
|
||||
return nextImplStep();
|
||||
}
|
||||
catch (Exception & e)
|
||||
{
|
||||
e.addMessage("Cache info: {}", getInfoForLog());
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
bool CachedReadBufferFromRemoteFS::nextImplStep()
|
||||
{
|
||||
if (IFileCache::shouldBypassCache())
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Using cache when not allowed");
|
||||
|
@ -48,6 +48,8 @@ private:
|
||||
|
||||
void predownload(FileSegmentPtr & file_segment);
|
||||
|
||||
bool nextImplStep();
|
||||
|
||||
enum class ReadType
|
||||
{
|
||||
CACHED,
|
||||
|
Loading…
Reference in New Issue
Block a user