2022-01-04 07:16:24 +00:00
|
|
|
|
#include <fstream>
|
|
|
|
|
#include <IO/ReadBuffer.h>
|
2021-12-27 07:04:26 +00:00
|
|
|
|
#include <IO/ReadBufferFromFile.h>
|
|
|
|
|
#include <IO/ReadHelpers.h>
|
2021-12-23 03:50:26 +00:00
|
|
|
|
#include <IO/ReadSettings.h>
|
2022-01-04 07:16:24 +00:00
|
|
|
|
#include <Storages/Cache/ExternalDataSourceCache.h>
|
|
|
|
|
#include <Storages/Cache/RemoteCacheController.h>
|
|
|
|
|
#include <Storages/Cache/RemoteFileMetadataFactory.h>
|
2021-12-23 03:50:26 +00:00
|
|
|
|
#include <Poco/JSON/JSON.h>
|
|
|
|
|
#include <Poco/JSON/Parser.h>
|
|
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
|
{
|
|
|
|
|
namespace fs = std::filesystem;

/// Error codes used in this file. They are only declared here (extern); their definitions
/// live in another translation unit.
namespace ErrorCodes
{
    extern const int BAD_ARGUMENTS;
    extern const int LOGICAL_ERROR;
}
|
|
|
|
|
|
|
|
|
|
std::shared_ptr<RemoteCacheController> RemoteCacheController::recover(const std::filesystem::path & local_path_)
|
|
|
|
|
{
|
|
|
|
|
auto * log = &Poco::Logger::get("RemoteCacheController");
|
|
|
|
|
|
|
|
|
|
if (!std::filesystem::exists(local_path_ / "data.bin"))
|
|
|
|
|
{
|
2021-12-27 07:04:26 +00:00
|
|
|
|
LOG_TRACE(log, "Invalid cached directory: {}", local_path_.string());
|
2021-12-23 03:50:26 +00:00
|
|
|
|
return nullptr;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
auto cache_controller = std::make_shared<RemoteCacheController>(nullptr, local_path_, 0);
|
2021-12-27 07:04:26 +00:00
|
|
|
|
if (cache_controller->file_status != DOWNLOADED)
|
2021-12-23 03:50:26 +00:00
|
|
|
|
{
|
2022-02-28 00:15:37 +00:00
|
|
|
|
// Do not load this invalid cached file and clear it. the clear action is in
|
2021-12-28 03:26:39 +00:00
|
|
|
|
// ExternalDataSourceCache::recoverTask(), because deleting directories during iteration will
|
2022-02-28 00:15:37 +00:00
|
|
|
|
// cause unexpected behaviors.
|
2021-12-23 03:50:26 +00:00
|
|
|
|
LOG_INFO(log, "Recover cached file failed. local path:{}", local_path_.string());
|
|
|
|
|
return nullptr;
|
|
|
|
|
}
|
|
|
|
|
try
|
|
|
|
|
{
|
|
|
|
|
cache_controller->file_metadata_ptr = RemoteFileMetadataFactory::instance().get(cache_controller->metadata_class);
|
|
|
|
|
}
|
2021-12-27 07:31:24 +00:00
|
|
|
|
catch (const Exception & e)
|
2021-12-23 03:50:26 +00:00
|
|
|
|
{
|
2021-12-23 07:15:39 +00:00
|
|
|
|
LOG_ERROR(log, "Get metadata class failed for {}. {}", cache_controller->metadata_class, e.message());
|
2021-12-23 03:50:26 +00:00
|
|
|
|
cache_controller->file_metadata_ptr = nullptr;
|
|
|
|
|
}
|
|
|
|
|
if (!cache_controller->file_metadata_ptr)
|
|
|
|
|
{
|
|
|
|
|
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid metadata class:{}", cache_controller->metadata_class);
|
|
|
|
|
}
|
2021-12-27 07:04:26 +00:00
|
|
|
|
ReadBufferFromFile file_readbuffer((local_path_ / "metadata.txt").string());
|
|
|
|
|
std::string metadata_content;
|
|
|
|
|
readStringUntilEOF(metadata_content, file_readbuffer);
|
|
|
|
|
if (!cache_controller->file_metadata_ptr->fromString(metadata_content))
|
2021-12-23 03:50:26 +00:00
|
|
|
|
{
|
2022-01-04 07:16:24 +00:00
|
|
|
|
throw Exception(
|
|
|
|
|
ErrorCodes::LOGICAL_ERROR,
|
|
|
|
|
"Invalid metadata file({}) for meta class {}",
|
|
|
|
|
local_path_.string(),
|
|
|
|
|
cache_controller->metadata_class);
|
2021-12-23 03:50:26 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
cache_controller->current_offset = fs::file_size(local_path_ / "data.bin");
|
|
|
|
|
|
|
|
|
|
ExternalDataSourceCache::instance().updateTotalSize(cache_controller->file_metadata_ptr->file_size);
|
|
|
|
|
return cache_controller;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Create a controller for the cache directory `local_path_`.
///
/// The mode is selected by `file_metadata_`:
///  - null (recovery, used by recover()): `file_status` and `metadata_class` are read from
///    info.txt when that file exists. `file_metadata_ptr` stays null here; it is allocated later
///    after loading metadata.txt.
///  - non-null (a brand-new cache entry): `metadata_class` is taken from the metadata's name, and
///    its serialized form is written to metadata.txt.
RemoteCacheController::RemoteCacheController(
    IRemoteFileMetadataPtr file_metadata_, const std::filesystem::path & local_path_, size_t cache_bytes_before_flush_)
    : file_metadata_ptr(file_metadata_)
    , local_path(local_path_)
    , valid(true)
    , local_cache_bytes_read_before_flush(cache_bytes_before_flush_)
    , current_offset(0)
{
    if (!file_metadata_ptr)
    {
        const auto info_path = local_path_ / "info.txt";
        // Without info.txt, file_status/metadata_class keep their member defaults.
        if (!fs::exists(info_path))
            return;

        std::ifstream info_file(info_path);
        Poco::JSON::Parser info_parser;
        auto info_json = info_parser.parse(info_file).extract<Poco::JSON::Object::Ptr>();
        file_status = static_cast<LocalFileStatus>(info_json->get("file_status").convert<Int32>());
        metadata_class = info_json->get("metadata_class").convert<String>();
        info_file.close();
        return;
    }

    // New cache entry: a non-null metadata pointer is required on this path.
    metadata_class = file_metadata_ptr->getName();
    WriteBufferFromFile metadata_writer((local_path_ / "metadata.txt").string());
    const auto serialized_metadata = file_metadata_ptr->toString();
    metadata_writer.write(serialized_metadata.c_str(), serialized_metadata.size());
    metadata_writer.close();
}
|
|
|
|
|
|
2021-12-27 07:04:26 +00:00
|
|
|
|
void RemoteCacheController::waitMoreData(size_t start_offset_, size_t end_offset_)
|
2021-12-23 03:50:26 +00:00
|
|
|
|
{
|
|
|
|
|
std::unique_lock lock{mutex};
|
|
|
|
|
if (file_status == DOWNLOADED)
|
|
|
|
|
{
|
2022-02-28 00:15:37 +00:00
|
|
|
|
// Finish reading.
|
2021-12-23 03:50:26 +00:00
|
|
|
|
if (start_offset_ >= current_offset)
|
|
|
|
|
{
|
|
|
|
|
lock.unlock();
|
2021-12-27 07:04:26 +00:00
|
|
|
|
return;
|
2021-12-23 03:50:26 +00:00
|
|
|
|
}
|
|
|
|
|
}
|
2022-02-28 00:15:37 +00:00
|
|
|
|
else // Block until more data is ready.
|
2021-12-23 03:50:26 +00:00
|
|
|
|
{
|
|
|
|
|
if (current_offset >= end_offset_)
|
|
|
|
|
{
|
|
|
|
|
lock.unlock();
|
2021-12-27 07:04:26 +00:00
|
|
|
|
return;
|
2021-12-23 03:50:26 +00:00
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
more_data_signal.wait(lock, [this, end_offset_] { return file_status == DOWNLOADED || current_offset >= end_offset_; });
|
|
|
|
|
}
|
|
|
|
|
lock.unlock();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Return true when the version recorded for the cached file differs from `file_metadata_`'s
/// version (presumably the current remote metadata; confirm with callers).
/// Both pointers are dereferenced unconditionally, so they must be non-null.
bool RemoteCacheController::isModified(IRemoteFileMetadataPtr file_metadata_)
{
    const auto & cached_version = file_metadata_ptr->getVersion();
    return cached_version != file_metadata_->getVersion();
}
|
|
|
|
|
|
|
|
|
|
void RemoteCacheController::startBackgroundDownload(std::unique_ptr<ReadBuffer> in_readbuffer_, BackgroundSchedulePool & thread_pool)
|
|
|
|
|
{
|
|
|
|
|
data_file_writer = std::make_unique<WriteBufferFromFile>((fs::path(local_path) / "data.bin").string());
|
|
|
|
|
flush(true);
|
|
|
|
|
ReadBufferPtr in_readbuffer(in_readbuffer_.release());
|
2022-01-04 07:16:24 +00:00
|
|
|
|
download_task_holder = thread_pool.createTask("download remote file", [this, in_readbuffer] { backgroundDownload(in_readbuffer); });
|
2021-12-23 03:50:26 +00:00
|
|
|
|
download_task_holder->activateAndSchedule();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Copy the whole `remote_read_buffer` into data.bin, publishing progress to readers.
///
/// Every time at least `local_cache_bytes_read_before_flush` new bytes have been written, the data
/// file is synced, `current_offset` is advanced under `mutex`, and waiters in waitMoreData() are
/// woken. At EOF the status becomes DOWNLOADED and info.txt is rewritten (flush(true)). The data
/// writer is then released, and the file size is added to ExternalDataSourceCache's total.
///
/// NOTE(review): if reading the remote buffer throws, file_status stays DOWNLOADING. waitMoreData()
/// callers waiting for more bytes are then never woken, because their predicate needs DOWNLOADED or
/// enough bytes. Confirm how download failures are meant to be surfaced.
void RemoteCacheController::backgroundDownload(ReadBufferPtr remote_read_buffer)
{
    {
        // waitMoreData() reads file_status under `mutex`; write it under the same lock.
        std::lock_guard lock(mutex);
        file_status = DOWNLOADING;
    }

    // Bytes already written to data.bin but not yet visible to readers via current_offset.
    size_t unpublished_bytes = 0;
    while (!remote_read_buffer->eof())
    {
        size_t bytes = remote_read_buffer->available();
        data_file_writer->write(remote_read_buffer->position(), bytes);
        remote_read_buffer->position() += bytes;
        unpublished_bytes += bytes;

        if (unpublished_bytes >= local_cache_bytes_read_before_flush)
        {
            {
                std::lock_guard lock(mutex);
                current_offset += unpublished_bytes;
                flush();
            }
            more_data_signal.notify_all();
            unpublished_bytes = 0;
        }
    }

    {
        std::lock_guard lock(mutex);
        current_offset += unpublished_bytes;
        file_status = DOWNLOADED;
        flush(true);
        data_file_writer.reset();
    }
    more_data_signal.notify_all();

    ExternalDataSourceCache::instance().updateTotalSize(file_metadata_ptr->file_size);
    LOG_TRACE(log, "Finish download into local path: {}, file metadata:{} ", local_path.string(), file_metadata_ptr->toString());
}
|
|
|
|
|
|
|
|
|
|
void RemoteCacheController::flush(bool need_flush_status)
|
|
|
|
|
{
|
|
|
|
|
if (data_file_writer)
|
|
|
|
|
{
|
|
|
|
|
data_file_writer->sync();
|
|
|
|
|
}
|
|
|
|
|
if (need_flush_status)
|
|
|
|
|
{
|
|
|
|
|
auto file_writer = std::make_unique<WriteBufferFromFile>(local_path / "info.txt");
|
|
|
|
|
Poco::JSON::Object jobj;
|
|
|
|
|
jobj.set("file_status", static_cast<Int32>(file_status));
|
|
|
|
|
jobj.set("metadata_class", metadata_class);
|
|
|
|
|
std::stringstream buf; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
|
|
|
|
|
jobj.stringify(buf);
|
|
|
|
|
file_writer->write(buf.str().c_str(), buf.str().size());
|
|
|
|
|
file_writer->close();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Deactivate the background download task, if one was created. The task's callback captures
/// `this`, so it must not keep running after the controller is gone.
RemoteCacheController::~RemoteCacheController()
{
    if (!download_task_holder)
        return;
    download_task_holder->deactivate();
}
|
|
|
|
|
|
|
|
|
|
void RemoteCacheController::close()
|
|
|
|
|
{
|
|
|
|
|
// delete directory
|
|
|
|
|
LOG_TRACE(log, "Removing the local cache. local path: {}", local_path.string());
|
2021-12-28 08:57:07 +00:00
|
|
|
|
if (fs::exists(local_path))
|
|
|
|
|
fs::remove_all(local_path);
|
2021-12-23 03:50:26 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
std::unique_ptr<ReadBufferFromFileBase> RemoteCacheController::allocFile()
|
|
|
|
|
{
|
|
|
|
|
ReadSettings settings;
|
|
|
|
|
//settings.local_fs_method = LocalFSReadMethod::read;
|
|
|
|
|
auto file_buffer = createReadBufferFromFileBase((local_path / "data.bin").string(), settings);
|
|
|
|
|
|
|
|
|
|
return file_buffer;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|