Use read/write buffer to read/write files

This commit is contained in:
lgbo-ustc 2021-12-02 18:10:58 +08:00 committed by liangjiabiao
parent 9ec8272186
commit c8f536a0a4
2 changed files with 93 additions and 151 deletions

View File

@ -111,8 +111,8 @@ RemoteCacheController::RemoteCacheController(
if (remote_read_buffer)
{
// setup local files
out_file = std::make_unique<std::ofstream>(fs::path(local_path_) / "data.bin", std::ios::out | std::ios::binary);
out_file->flush();
data_file_writer = std::make_unique<WriteBufferFromFile>((fs::path(local_path_) / "data.bin").string());
data_file_writer->sync();
file_meta_data.save(local_path_ / "meta.txt");
@ -141,7 +141,7 @@ RemoteReadBufferCacheError RemoteCacheController::waitMoreData(size_t start_offs
return RemoteReadBufferCacheError::OK;
}
else
more_data_signal.wait(lock, [this, end_offset_] { return this->file_meta_data.status == RemoteFileMetadata::DOWNLOADED || current_offset >= end_offset_; });
more_data_signal.wait(lock, [this, end_offset_] { return file_meta_data.status == RemoteFileMetadata::DOWNLOADED || current_offset >= end_offset_; });
}
lock.unlock();
return RemoteReadBufferCacheError::OK;
@ -156,7 +156,7 @@ void RemoteCacheController::backgroundDownload()
{
size_t bytes = remote_read_buffer->available();
out_file->write(remote_read_buffer->position(), bytes);
data_file_writer->write(remote_read_buffer->position(), bytes);
remote_read_buffer->position() += bytes;
total_bytes += bytes;
before_unflush_bytes += bytes;
@ -175,8 +175,7 @@ void RemoteCacheController::backgroundDownload()
current_offset += total_bytes;
file_meta_data.status = RemoteFileMetadata::DOWNLOADED;
flush(true);
out_file->close();
out_file.reset();
data_file_writer.reset();
remote_read_buffer.reset();
lock.unlock();
more_data_signal.notify_all();
@ -186,9 +185,9 @@ void RemoteCacheController::backgroundDownload()
void RemoteCacheController::flush(bool need_flush_meta_data_)
{
if (out_file)
if (data_file_writer)
{
out_file->flush();
data_file_writer->sync();
}
if (!need_flush_meta_data_)
@ -202,79 +201,43 @@ RemoteCacheController::~RemoteCacheController() = default;
void RemoteCacheController::close()
{
// delete directory
LOG_TRACE(log, "Removing all local cache. local path: {}, file meta data:{}", local_path.string(), file_meta_data.toString());
LOG_TRACE(log, "Removing the local cache. local path: {}, file meta data:{}", local_path.string(), file_meta_data.toString());
std::filesystem::remove_all(local_path);
}
std::pair<FILE *, std::filesystem::path> RemoteCacheController::allocFile()
std::unique_ptr<ReadBufferFromFileBase> RemoteCacheController::allocFile()
{
std::filesystem::path result_local_path = local_path / "data.bin";
ReadSettings settings;
settings.local_fs_prefetch = false;
settings.local_fs_method = LocalFSReadMethod::read;
auto file_buffer = createReadBufferFromFileBase((local_path / "data.bin").string(), settings);
FILE * fs = fopen(result_local_path.string().c_str(), "r");
if (!fs)
throw Exception(ErrorCodes::BAD_GET, "Alloc file failed, error code: {} local path: {}", errno, local_path.string());
std::lock_guard lock{mutex};
opened_file_streams.insert(fs);
return {fs, result_local_path};
}
void RemoteCacheController::deallocFile(FILE * file_stream)
{
if (file_buffer)
{
std::lock_guard lock{mutex};
auto it = opened_file_streams.find(file_stream);
if (it == opened_file_streams.end())
throw Exception(
opened_file_buffer_refs.insert(reinterpret_cast<uintptr_t>(file_buffer.get()));
}
return file_buffer;
}
void RemoteCacheController::deallocFile(std::unique_ptr<ReadBufferFromFileBase> file_buffer)
{
if (!file_buffer)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Try to release a null file buffer for ", local_path.string());
}
auto buffer_ref = reinterpret_cast<uintptr_t>(file_buffer.get());
std::lock_guard lock{mutex};
auto it = opened_file_buffer_refs.find(buffer_ref);
if (it == opened_file_buffer_refs.end())
{
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Try to deallocate file with invalid handler remote path: {}, local path: {}",
file_meta_data.remote_path,
local_path.string());
opened_file_streams.erase(it);
}
fclose(file_stream);
}
LocalCachedFileReader::LocalCachedFileReader(RemoteCacheController * cache_controller_)
: cache_controller(cache_controller_)
, file_size(cache_controller_->getFileMetaData().file_size)
, offset(0)
{
std::tie(file_stream, local_path) = cache_controller->allocFile();
}
LocalCachedFileReader::~LocalCachedFileReader()
{
cache_controller->deallocFile(file_stream);
}
size_t LocalCachedFileReader::read(char * buf, size_t size)
{
auto wret = cache_controller->waitMoreData(offset, offset + size);
if (wret != RemoteReadBufferCacheError::OK)
return 0;
std::lock_guard lock(mutex);
auto ret_size = fread(buf, 1, size, file_stream);
offset += ret_size;
return ret_size;
}
off_t LocalCachedFileReader::seek(off_t off)
{
cache_controller->waitMoreData(off, 0);
std::lock_guard lock(mutex);
auto ret = fseek(file_stream, off, SEEK_SET);
if (ret < 0)
throw Exception(
ErrorCodes::BAD_ARGUMENTS, "Seek file {} with size {} to offset {} failed: {}", getPath().string(), getSize(), off, errnoToString(errno));
offset = off;
return off;
}
size_t LocalCachedFileReader::getSize()
{
return file_size;
opened_file_buffer_refs.erase(it);
}
// the size need be equal to the original buffer
@ -282,7 +245,10 @@ RemoteReadBuffer::RemoteReadBuffer(size_t buff_size) : BufferWithOwnMemory<Seeka
{
}
RemoteReadBuffer::~RemoteReadBuffer() = default;
RemoteReadBuffer::~RemoteReadBuffer()
{
file_cache_controller->deallocFile(std::move(file_buffer));
}
std::unique_ptr<RemoteReadBuffer> RemoteReadBuffer::create(ContextPtr context, const RemoteFileMetadata & remote_file_meta, std::unique_ptr<ReadBuffer> read_buffer)
{
@ -303,28 +269,40 @@ std::unique_ptr<RemoteReadBuffer> RemoteReadBuffer::create(ContextPtr context, c
const auto & remote_path = remote_file_meta.remote_path;
auto remote_read_buffer = std::make_unique<RemoteReadBuffer>(buff_size);
auto * raw_rbp = read_buffer.release();
std::shared_ptr<ReadBuffer> srb(raw_rbp);
auto * raw_readbuffer_ptr = read_buffer.release();
std::shared_ptr<ReadBuffer> shared_readbuffer_ptr(raw_readbuffer_ptr);
RemoteReadBufferCacheError error;
std::tie(remote_read_buffer->file_reader, error) = RemoteReadBufferCache::instance().createReader(context, remote_file_meta, srb);
if (remote_read_buffer->file_reader == nullptr)
std::tie(remote_read_buffer->file_cache_controller, error) = RemoteReadBufferCache::instance().createReader(context, remote_file_meta, shared_readbuffer_ptr);
if (remote_read_buffer->file_cache_controller == nullptr)
{
LOG_ERROR(log, "Failed to allocate local file for remote path: {}, reason: {}", remote_path, error);
remote_read_buffer->original_read_buffer = srb;
remote_read_buffer->original_read_buffer = shared_readbuffer_ptr;
}
else
{
remote_read_buffer->file_buffer = remote_read_buffer->file_cache_controller->allocFile();
if (!remote_read_buffer->file_buffer)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Create file readbuffer failed. {}",
remote_read_buffer->file_cache_controller->getLocalPath().string());
}
return remote_read_buffer;
}
bool RemoteReadBuffer::nextImpl()
{
if (file_reader)
if (file_buffer)
{
int bytes_read = file_reader->read(internal_buffer.begin(), internal_buffer.size());
if (bytes_read)
working_buffer.resize(bytes_read);
else
return false;
auto start_offset = file_buffer->getPosition();
auto end_offset = file_buffer->internalBuffer().size();
file_cache_controller->waitMoreData(start_offset, end_offset);
auto status = file_buffer->next();
if (status)
BufferBase::set(file_buffer->buffer().begin(),
file_buffer->buffer().size(),
file_buffer->offset());
return status;
}
else
{
@ -339,39 +317,25 @@ bool RemoteReadBuffer::nextImpl()
BufferBase::set(original_read_buffer->buffer().begin(), original_read_buffer->buffer().size(), original_read_buffer->offset());
return status;
}
return true;
}
off_t RemoteReadBuffer::seek(off_t offset, int whence)
{
off_t pos_in_file = file_reader->getOffset();
off_t new_pos;
if (whence == SEEK_SET)
new_pos = offset;
else if (whence == SEEK_CUR)
new_pos = pos_in_file - available() + offset;
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expects SEEK_SET or SEEK_CUR as whence but given {}", whence);
/// Position is unchanged.
if (off_t(new_pos + available()) == pos_in_file)
return new_pos;
if (new_pos <= pos_in_file && new_pos >= pos_in_file - static_cast<off_t>(working_buffer.size()))
{
/// Position is still inside buffer.
pos = working_buffer.begin() + (new_pos - (pos_in_file - working_buffer.size()));
return new_pos;
}
pos = working_buffer.end();
auto ret_off = file_reader->seek(new_pos);
return ret_off;
/*
* Need to wait here. For example, the current file has been download at position X, but here we try to seek to
* postition Y ( Y > X), it would fail.
*/
file_cache_controller->waitMoreData(offset, offset + file_buffer->internalBuffer().size());
auto ret = file_buffer->seek(offset, whence);
BufferBase::set(file_buffer->buffer().begin(),
file_buffer->buffer().size(),
file_buffer->offset());
return ret;
}
off_t RemoteReadBuffer::getPosition()
{
return file_reader->getOffset() - available();
return file_buffer->getPosition();
}
RemoteReadBufferCache::RemoteReadBufferCache() = default;
@ -450,7 +414,7 @@ String RemoteReadBufferCache::calculateLocalPath(const RemoteFileMetadata & meta
return fs::path(root_dir) / hashcode_str.substr(0, 3) / hashcode_str;
}
std::pair<std::shared_ptr<LocalCachedFileReader>, RemoteReadBufferCacheError>
std::pair<RemoteCacheControllerPtr, RemoteReadBufferCacheError>
RemoteReadBufferCache::createReader(ContextPtr context, const RemoteFileMetadata & remote_file_meta, std::shared_ptr<ReadBuffer> & read_buffer)
{
LOG_TRACE(log, "createReader. {} {} {}", remote_file_meta.remote_path, remote_file_meta.last_modification_timestamp, remote_file_meta.file_size);
@ -483,9 +447,7 @@ RemoteReadBufferCache::createReader(ContextPtr context, const RemoteFileMetadata
{
// move the key to the list end
keys.splice(keys.end(), keys, cache_iter->second.key_iterator);
return {
std::make_shared<LocalCachedFileReader>(cache_iter->second.cache_controller.get()),
RemoteReadBufferCacheError::OK};
return { cache_iter->second.cache_controller, RemoteReadBufferCacheError::OK};
}
}
@ -497,7 +459,7 @@ RemoteReadBufferCache::createReader(ContextPtr context, const RemoteFileMetadata
{
keys.splice(keys.end(), keys, cache_iter->second.key_iterator);
return {
std::make_shared<LocalCachedFileReader>(cache_iter->second.cache_controller.get()),
cache_iter->second.cache_controller,
RemoteReadBufferCacheError::OK};
}
else
@ -526,7 +488,7 @@ RemoteReadBufferCache::createReader(ContextPtr context, const RemoteFileMetadata
cache_cell.key_iterator = keys.insert(keys.end(), local_path);
caches[local_path] = cache_cell;
return {std::make_shared<LocalCachedFileReader>(cache_controller.get()), RemoteReadBufferCacheError::OK};
return {cache_controller, RemoteReadBufferCacheError::OK};
}
bool RemoteReadBufferCache::clearLocalCache()

View File

@ -10,6 +10,11 @@
#include <Common/ThreadPool.h>
#include <IO/ReadBuffer.h>
#include <IO/BufferWithOwnMemory.h>
#include <IO/createReadBufferFromFileBase.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteBufferFromFileBase.h>
#include <IO/ReadBufferFromFileBase.h>
#include <IO/ReadSettings.h>
#include <IO/SeekableReadBuffer.h>
#include <condition_variable>
#include <Interpreters/Context.h>
@ -81,8 +86,8 @@ public:
* Called by LocalCachedFileReader, must be used in pair
* The second value of the return tuple is the local_path to store file.
*/
std::pair<FILE *, std::filesystem::path> allocFile();
void deallocFile(FILE * file_stream);
std::unique_ptr<ReadBufferFromFileBase> allocFile();
void deallocFile(std::unique_ptr<ReadBufferFromFileBase> buffer);
/**
* when allocFile be called, count++. deallocFile be called, count--.
@ -91,7 +96,8 @@ public:
inline bool closable()
{
std::lock_guard lock{mutex};
return opened_file_streams.empty() && remote_read_buffer == nullptr;
//return opened_file_streams.empty() && remote_read_buffer == nullptr;
return opened_file_buffer_refs.empty() && remote_read_buffer == nullptr;
}
void close();
@ -130,7 +136,7 @@ private:
std::mutex mutex;
std::condition_variable more_data_signal;
std::set<FILE *> opened_file_streams;
std::set<uintptr_t> opened_file_buffer_refs; // refer to a buffer address
// meta info
RemoteFileMetadata file_meta_data;
@ -141,40 +147,11 @@ private:
size_t current_offset;
std::shared_ptr<ReadBuffer> remote_read_buffer;
std::unique_ptr<std::ofstream> out_file;
std::unique_ptr<WriteBufferFromFileBase> data_file_writer;
Poco::Logger * log = &Poco::Logger::get("RemoteCacheController");
};
/**
* access local cached files by RemoteCacheController, and be used in RemoteReadBuffer
*/
class LocalCachedFileReader
{
public:
LocalCachedFileReader(RemoteCacheController * cache_controller_);
~LocalCachedFileReader();
// expect to read size bytes into buf, return is the real bytes read
size_t read(char * buf, size_t size);
off_t seek(off_t offset);
inline std::filesystem::path getPath() const { return local_path; }
inline off_t getOffset() const { return static_cast<off_t>(offset); }
size_t getSize();
private:
RemoteCacheController * cache_controller;
size_t file_size;
size_t offset;
std::mutex mutex;
FILE * file_stream;
std::filesystem::path local_path;
Poco::Logger * log = &Poco::Logger::get("LocalCachedFileReader");
};
using RemoteCacheControllerPtr = std::shared_ptr<RemoteCacheController>;
/*
* FIXME:RemoteReadBuffer derive from SeekableReadBufferWithSize may cause some risks, since it's not seekable in some cases
@ -188,13 +165,16 @@ public:
static std::unique_ptr<RemoteReadBuffer> create(ContextPtr contex, const RemoteFileMetadata & remote_file_meta, std::unique_ptr<ReadBuffer> read_buffer);
bool nextImpl() override;
inline bool seekable() { return file_reader != nullptr && file_reader->getSize() > 0; }
inline bool seekable() { return !file_buffer && file_cache_controller->getFileMetaData().file_size > 0; }
off_t seek(off_t off, int whence) override;
off_t getPosition() override;
std::optional<size_t> getTotalSize() override { return file_reader->getSize(); }
std::optional<size_t> getTotalSize() override { return file_cache_controller->getFileMetaData().file_size; }
private:
std::shared_ptr<LocalCachedFileReader> file_reader;
std::shared_ptr<RemoteCacheController> file_cache_controller;
std::unique_ptr<ReadBufferFromFileBase> file_buffer;
// in case local cache don't work, this buffer is setted;
std::shared_ptr<ReadBuffer> original_read_buffer;
};
@ -209,7 +189,7 @@ public:
inline bool isInitialized() const { return initialized; }
std::pair<std::shared_ptr<LocalCachedFileReader>, RemoteReadBufferCacheError>
std::pair<RemoteCacheControllerPtr, RemoteReadBufferCacheError>
createReader(ContextPtr context, const RemoteFileMetadata & remote_file_meta, std::shared_ptr<ReadBuffer> & read_buffer);
void updateTotalSize(size_t size) { total_size += size; }