Fix some corner cases

This commit is contained in:
kssenii 2022-03-07 14:30:57 +01:00
parent 2dcfe66236
commit 692b247379
10 changed files with 104 additions and 11 deletions

View File

@ -429,6 +429,40 @@ bool LRUFileCache::tryReserve(
return true;
}
void LRUFileCache::remove(const Key & key)
{
std::lock_guard cache_lock(mutex);
auto it = files.find(key);
if (it == files.end())
return;
auto & offsets = it->second;
std::vector<FileSegmentCell *> to_remove;
to_remove.reserve(offsets.size());
for (auto & [offset, cell] : offsets)
to_remove.push_back(&cell);
for (auto & cell : to_remove)
{
auto file_segment = cell->file_segment;
if (file_segment)
{
std::lock_guard<std::mutex> segment_lock(file_segment->mutex);
remove(file_segment->key(), file_segment->offset(), cache_lock, segment_lock);
}
}
auto key_path = getPathInLocalCache(key);
files.erase(key);
if (fs::exists(key_path))
fs::remove(key_path);
}
void LRUFileCache::remove(
Key key, size_t offset,
std::lock_guard<std::mutex> & cache_lock, std::lock_guard<std::mutex> & /* segment_lock */)

View File

@ -41,6 +41,8 @@ public:
/// Restore cache from local filesystem.
virtual void initialize() = 0;
virtual void remove(const Key & key) = 0;
static bool shouldBypassCache();
/// Cache capacity in bytes.
@ -115,6 +117,8 @@ public:
void initialize() override;
void remove(const Key & key) override;
private:
using FileKeyAndOffset = std::pair<Key, size_t>;
using LRUQueue = std::list<FileKeyAndOffset>;

View File

@ -13,6 +13,7 @@
#include <boost/algorithm/string.hpp>
#include <Common/filesystemHelpers.h>
#include <Disks/IO/ThreadPoolRemoteFSReader.h>
#include <Common/FileCache.h>
namespace DB
@ -281,7 +282,16 @@ void IDiskRemote::removeMetadata(const String & path, RemoteFSPathKeeperPtr fs_p
if (metadata.ref_count == 0)
{
for (const auto & [remote_fs_object_path, _] : metadata.remote_fs_objects)
{
fs_paths_keeper->addPath(remote_fs_root_path + remote_fs_object_path);
if (cache)
{
auto key = cache->hash(remote_fs_object_path);
cache->remove(key);
}
}
return false;
}
else /// In other case decrement number of references, save metadata and delete hardlink.
@ -441,6 +451,7 @@ void IDiskRemote::removeSharedFile(const String & path, bool delete_metadata_onl
{
RemoteFSPathKeeperPtr fs_paths_keeper = createFSPathKeeper();
removeMetadata(path, fs_paths_keeper);
if (!delete_metadata_only)
removeFromRemoteFS(fs_paths_keeper);
}
@ -449,6 +460,7 @@ void IDiskRemote::removeSharedFile(const String & path, bool delete_metadata_onl
void IDiskRemote::removeSharedFileIfExists(const String & path, bool delete_metadata_only)
{
RemoteFSPathKeeperPtr fs_paths_keeper = createFSPathKeeper();
if (metadata_disk->exists(path))
{
removeMetadata(path, fs_paths_keeper);
@ -475,6 +487,7 @@ void IDiskRemote::removeSharedRecursive(const String & path, bool delete_metadat
{
RemoteFSPathKeeperPtr fs_paths_keeper = createFSPathKeeper();
removeMetadataRecursive(path, fs_paths_keeper);
if (!delete_metadata_only)
removeFromRemoteFS(fs_paths_keeper);
}

View File

@ -87,8 +87,8 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::hasPendingDataToRead()
return false;
if (file_offset_of_buffer_end > *read_until_position)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Read beyond last offset ({} > {})",
file_offset_of_buffer_end, *read_until_position);
throw Exception(ErrorCodes::LOGICAL_ERROR, "Read beyond last offset ({} > {}, info: {})",
file_offset_of_buffer_end, *read_until_position, impl->getInfoForLog());
}
else if (must_read_until_position)
throw Exception(ErrorCodes::LOGICAL_ERROR,
@ -136,8 +136,11 @@ void AsynchronousReadIndirectBufferFromRemoteFS::setReadUntilPosition(size_t pos
if (prefetch_future.valid())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Prefetch is valid in readUntilPosition");
read_until_position = position;
impl->setReadUntilPosition(*read_until_position);
if (position > read_until_position)
{
read_until_position = position;
impl->setReadUntilPosition(*read_until_position);
}
}
@ -171,7 +174,7 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl()
LOG_TEST(log, "Current size: {}, offset: {}", size, offset);
/// If prefetch_future is valid, size should always be greater than zero.
assert(offset < size && size > 0);
assert(offset < size);
ProfileEvents::increment(ProfileEvents::AsynchronousReadWaitMicroseconds, watch.elapsedMicroseconds());
}
@ -188,7 +191,7 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl()
auto offset = result.offset;
LOG_TEST(log, "Current size: {}, offset: {}", size, offset);
assert(offset < size || size == 0);
assert(offset < size);
if (size)
{
@ -198,6 +201,7 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl()
}
file_offset_of_buffer_end = impl->getFileOffsetOfBufferEnd();
assert(file_offset_of_buffer_end == impl->getImplementationBufferOffset());
prefetch_future = {};
return size;

View File

@ -604,17 +604,16 @@ bool CachedReadBufferFromRemoteFS::nextImpl()
}
}
/// Local filesystem does not support bounded reads.
if (read_type == ReadType::CACHED && std::next(current_file_segment_it) == file_segments_holder->file_segments.end())
if (std::next(current_file_segment_it) == file_segments_holder->file_segments.end())
{
size_t remaining_size_to_read = std::min(current_read_range.right, read_until_position - 1) - file_offset_of_buffer_end + 1;
size = std::min(size, remaining_size_to_read);
implementation_buffer->buffer().resize(size);
implementation_buffer->buffer().resize(nextimpl_working_buffer_offset + size);
}
file_offset_of_buffer_end += size;
assert(read_type == ReadType::CACHED || file_offset_of_buffer_end == implementation_buffer->getFileOffsetOfBufferEnd());
// assert(read_type == ReadType::CACHED || file_offset_of_buffer_end == implementation_buffer->getFileOffsetOfBufferEnd());
}
swap(*implementation_buffer);
@ -671,6 +670,11 @@ size_t CachedReadBufferFromRemoteFS::getTotalSizeToRead()
return read_until_position - file_offset_of_buffer_end;
}
void CachedReadBufferFromRemoteFS::setReadUntilPosition(size_t)
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Method `setReadUntilPosition()` not allowed");
}
off_t CachedReadBufferFromRemoteFS::getPosition()
{
return file_offset_of_buffer_end - available();

View File

@ -31,6 +31,8 @@ public:
String getInfoForLog() override;
void setReadUntilPosition(size_t position) override;
private:
void initialize(size_t offset, size_t size);

View File

@ -269,5 +269,13 @@ String ReadBufferFromRemoteFSGather::getInfoForLog()
return current_buf->getInfoForLog();
}
size_t ReadBufferFromRemoteFSGather::getImplementationBufferOffset() const
{
if (!current_buf)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Buffer not initialized");
return current_buf->getFileOffsetOfBufferEnd();
}
}

View File

@ -52,6 +52,8 @@ public:
String getInfoForLog();
size_t getImplementationBufferOffset() const;
protected:
virtual SeekableReadBufferPtr createImplementationBuffer(const String & path, size_t file_size) = 0;

View File

@ -74,6 +74,8 @@ std::future<IAsynchronousReader::Result> ThreadPoolRemoteFSReader::submit(Reques
ProfileEvents::increment(ProfileEvents::RemoteFSReadMicroseconds, watch.elapsedMicroseconds());
ProfileEvents::increment(ProfileEvents::RemoteFSReadBytes, bytes_read);
thread_status.detachQuery();
return Result{ .size = bytes_read, .offset = offset };
});

View File

@ -6,6 +6,7 @@
#include <Common/Stopwatch.h>
#include <Common/setThreadName.h>
#include <Common/MemorySanitizer.h>
#include <Common/CurrentThread.h>
#include <base/errnoToString.h>
#include <Poco/Event.h>
#include <future>
@ -184,9 +185,26 @@ std::future<IAsynchronousReader::Result> ThreadPoolReader::submit(Request reques
ProfileEvents::increment(ProfileEvents::ThreadPoolReaderPageCacheMiss);
auto task = std::make_shared<std::packaged_task<Result()>>([request, fd]
ThreadGroupStatusPtr running_group = CurrentThread::isInitialized() && CurrentThread::get().getThreadGroup()
? CurrentThread::get().getThreadGroup()
: MainThreadStatus::getInstance().getThreadGroup();
ContextPtr query_context;
if (CurrentThread::isInitialized)
query_context = CurrentThread::get().getQueryContext();
auto task = std::make_shared<std::packaged_task<Result()>>([request, fd, running_group, query_context]
{
ThreadStatus thread_status;
if (query_context)
thread_status.attachQueryContext(query_context);
if (running_group)
thread_status.attachQuery(running_group);
setThreadName("ThreadPoolRead");
Stopwatch watch(CLOCK_MONOTONIC);
size_t bytes_read = 0;
@ -219,6 +237,8 @@ std::future<IAsynchronousReader::Result> ThreadPoolReader::submit(Request reques
ProfileEvents::increment(ProfileEvents::ThreadPoolReaderPageCacheMissElapsedMicroseconds, watch.elapsedMicroseconds());
ProfileEvents::increment(ProfileEvents::DiskReadElapsedMicroseconds, watch.elapsedMicroseconds());
thread_status.detachQuery();
return Result{ .size = bytes_read, .offset = request.ignore };
});