2021-11-29 03:30:11 +00:00
|
|
|
#include <fstream>
|
|
|
|
#include <memory>
|
|
|
|
#include <unistd.h>
|
2021-11-05 11:55:30 +00:00
|
|
|
#include <functional>
|
|
|
|
#include <Poco/Logger.h>
|
|
|
|
#include <Poco/JSON/JSON.h>
|
|
|
|
#include <Poco/JSON/Parser.h>
|
|
|
|
#include <base/logger_useful.h>
|
2021-11-29 05:01:03 +00:00
|
|
|
#include <base/sleep.h>
|
2021-11-29 09:01:34 +00:00
|
|
|
#include <base/errnoToString.h>
|
2021-11-29 03:30:11 +00:00
|
|
|
#include <Common/SipHash.h>
|
|
|
|
#include <Common/hex.h>
|
|
|
|
#include <Common/Exception.h>
|
|
|
|
#include <IO/RemoteReadBufferCache.h>
|
|
|
|
#include <IO/WriteHelpers.h>
|
2021-11-05 11:55:30 +00:00
|
|
|
|
2021-11-29 12:19:36 +00:00
|
|
|
|
|
|
|
namespace fs = std::filesystem;
|
|
|
|
|
2021-11-05 11:55:30 +00:00
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
namespace ErrorCodes
{
    /// Error codes thrown from this file (declared here, defined elsewhere).
    extern const int BAD_ARGUMENTS;
    extern const int BAD_GET;
    extern const int CANNOT_CREATE_DIRECTORY;
    extern const int LOGICAL_ERROR;
}
|
|
|
|
|
2021-11-29 03:30:11 +00:00
|
|
|
std::shared_ptr<RemoteCacheController> RemoteCacheController::recover(
|
2021-11-29 08:22:43 +00:00
|
|
|
const String & local_path_, std::function<void(RemoteCacheController *)> const & finish_callback)
|
2021-11-05 11:55:30 +00:00
|
|
|
{
|
2021-11-29 08:22:43 +00:00
|
|
|
fs::path data_file = fs::path(local_path_) / "data.bin";
|
|
|
|
fs::path meta_file = fs::path(local_path_) / "meta.txt";
|
2021-11-29 04:01:52 +00:00
|
|
|
auto * log = &Poco::Logger::get("RemoteCacheController");
|
2021-11-29 08:22:43 +00:00
|
|
|
if (!fs::exists(data_file) || !fs::exists(meta_file))
|
2021-11-05 11:55:30 +00:00
|
|
|
{
|
2021-11-29 08:22:43 +00:00
|
|
|
LOG_ERROR(log, "Directory {} or file {}, {} does not exist", local_path_, data_file.string(), meta_file.string());
|
2021-11-05 11:55:30 +00:00
|
|
|
return nullptr;
|
|
|
|
}
|
|
|
|
|
2021-11-12 05:12:24 +00:00
|
|
|
std::ifstream meta_fs(meta_file);
|
2021-11-05 11:55:30 +00:00
|
|
|
Poco::JSON::Parser meta_parser;
|
|
|
|
auto meta_jobj = meta_parser.parse(meta_fs).extract<Poco::JSON::Object::Ptr>();
|
2021-11-29 08:22:43 +00:00
|
|
|
auto remote_path = meta_jobj->get("remote_path").convert<String>();
|
|
|
|
auto schema = meta_jobj->get("schema").convert<String>();
|
|
|
|
auto cluster = meta_jobj->get("cluster").convert<String>();
|
|
|
|
auto downloaded = meta_jobj->get("downloaded").convert<String>();
|
2021-11-12 05:12:24 +00:00
|
|
|
auto modification_ts = meta_jobj->get("last_modification_timestamp").convert<UInt64>();
|
2021-11-05 11:55:30 +00:00
|
|
|
if (downloaded == "false")
|
|
|
|
{
|
2021-11-29 09:01:34 +00:00
|
|
|
LOG_ERROR(log, "Local metadata for local path {} exists, but the data was not downloaded", local_path_);
|
2021-11-05 11:55:30 +00:00
|
|
|
return nullptr;
|
|
|
|
}
|
2021-11-29 08:22:43 +00:00
|
|
|
auto file_size = fs::file_size(data_file);
|
2021-11-05 11:55:30 +00:00
|
|
|
|
2021-11-29 05:01:03 +00:00
|
|
|
RemoteFileMetadata remote_file_meta(schema, cluster, remote_path, modification_ts, file_size);
|
|
|
|
auto cache_controller = std::make_shared<RemoteCacheController>(remote_file_meta, local_path_, 0, nullptr, finish_callback);
|
|
|
|
cache_controller->download_finished = true;
|
|
|
|
cache_controller->current_offset = file_size;
|
2021-11-05 11:55:30 +00:00
|
|
|
meta_fs.close();
|
|
|
|
|
2021-11-29 05:01:03 +00:00
|
|
|
finish_callback(cache_controller.get());
|
|
|
|
return cache_controller;
|
2021-11-05 11:55:30 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
/// Creates a controller for the cache entry stored in `local_path_`.
/// When `read_buffer_` is non-null, the local data file is (re)created, meta.txt is written with
/// "downloaded": "false" and a background download from `read_buffer_` is scheduled; `finish_callback`
/// is invoked by that download once it completes.
RemoteCacheController::RemoteCacheController(
    const RemoteFileMetadata & remote_file_meta,
    const String & local_path_,
    size_t cache_bytes_before_flush_,
    std::shared_ptr<ReadBuffer> read_buffer_,
    std::function<void(RemoteCacheController *)> const & finish_callback)
    : schema(remote_file_meta.schema)
    , cluster(remote_file_meta.cluster)
    , remote_path(remote_file_meta.path)
    , local_path(local_path_)
    , last_modify_time(remote_file_meta.last_modify_time)
    , valid(true)
    , local_cache_bytes_read_before_flush(cache_bytes_before_flush_)
    , download_finished(false)
    , current_offset(0)
    , remote_read_buffer(std::move(read_buffer_))
{
    /// remote_read_buffer == nullptr when the controller is created by recover(), i.e. when the metadata
    /// and the local data already exist and there is nothing to download.
    if (remote_read_buffer)
    {
        // Set up the local data file (truncated if it already exists).
        out_file = std::make_unique<std::ofstream>(fs::path(local_path_) / "data.bin", std::ios::out | std::ios::binary);

        /// Flush the data file and persist meta.txt; download_finished is still false here, so an
        /// interrupted download is recognized as incomplete by recover(). Reuses the same writer as the
        /// download thread instead of duplicating the metadata layout.
        flush(true);

        backgroundDownload(finish_callback);
    }
}
|
|
|
|
|
2021-11-11 06:12:15 +00:00
|
|
|
/// Blocks until the published part of the local file (current_offset) reaches `end_offset_` or the
/// download finishes. `end_offset_` is an absolute offset.
/// Returns END_OF_FILE when the download has finished and `start_offset_` is at or past the end of the
/// data; otherwise OK (a subsequent read near the end may still return fewer bytes than requested).
RemoteReadBufferCacheError RemoteCacheController::waitMoreData(size_t start_offset_, size_t end_offset_)
{
    std::unique_lock lock{mutex};

    if (!download_finished && current_offset < end_offset_)
        more_data_signal.wait(lock, [this, end_offset_] { return download_finished || current_offset >= end_offset_; });

    /// Checked after waiting too: if the download finished while we were blocked and the requested start
    /// lies past the end of the file, report END_OF_FILE rather than OK.
    if (download_finished && start_offset_ >= current_offset)
        return RemoteReadBufferCacheError::END_OF_FILE;

    return RemoteReadBufferCacheError::OK;
}
|
|
|
|
|
2021-11-29 05:01:03 +00:00
|
|
|
/// Schedules, on the shared cache thread pool, a task that copies `remote_read_buffer` into the local
/// data file. Whenever at least `local_cache_bytes_read_before_flush` new bytes have been written they are
/// flushed, published through `current_offset` and waiting readers are notified. At EOF the download is
/// marked finished, metadata is rewritten, both buffers are released and `finish_callback` is invoked.
///
/// NOTE(review): if reading from `remote_read_buffer` throws, the exception leaves the task before
/// `download_finished` is set, so readers blocked in waitMoreData() are never woken. Worth handling.
void RemoteCacheController::backgroundDownload(std::function<void(RemoteCacheController *)> const & finish_callback)
{
    auto download_task = [this, finish_callback]()
    {
        /// Bytes already written to out_file but not yet published through current_offset.
        size_t pending_bytes = 0;

        while (!remote_read_buffer->eof())
        {
            const size_t chunk_size = remote_read_buffer->available();
            out_file->write(remote_read_buffer->position(), chunk_size);
            remote_read_buffer->position() += chunk_size;
            pending_bytes += chunk_size;

            if (pending_bytes < local_cache_bytes_read_before_flush)
                continue;

            {
                std::lock_guard guard(mutex);
                current_offset += pending_bytes;
                flush();
            }
            more_data_signal.notify_all();
            pending_bytes = 0;
        }

        {
            std::lock_guard guard(mutex);
            current_offset += pending_bytes;
            download_finished = true;
            flush(true);
            out_file->close();
            out_file.reset();
            remote_read_buffer.reset();
        }
        more_data_signal.notify_all();

        finish_callback(this);
        LOG_TRACE(log, "Finish download from remote path: {} to local path: {}, file size:{} ", remote_path, local_path, current_offset);
    };

    RemoteReadBufferCache::instance().getThreadPool()->scheduleOrThrow(download_task);
}
|
|
|
|
|
2021-11-11 11:50:08 +00:00
|
|
|
void RemoteCacheController::flush(bool need_flush_meta_)
|
2021-11-05 11:55:30 +00:00
|
|
|
{
|
2021-11-12 05:12:24 +00:00
|
|
|
if (out_file)
|
2021-11-05 11:55:30 +00:00
|
|
|
{
|
|
|
|
out_file->flush();
|
|
|
|
}
|
2021-11-11 11:50:08 +00:00
|
|
|
|
|
|
|
if (!need_flush_meta_)
|
|
|
|
return;
|
2021-11-05 11:55:30 +00:00
|
|
|
Poco::JSON::Object jobj;
|
|
|
|
jobj.set("schema", schema);
|
|
|
|
jobj.set("cluster", cluster);
|
|
|
|
jobj.set("remote_path", remote_path);
|
|
|
|
jobj.set("downloaded", download_finished ? "true" : "false");
|
2021-11-29 05:01:03 +00:00
|
|
|
jobj.set("last_modification_timestamp", last_modify_time);
|
2021-11-29 03:30:11 +00:00
|
|
|
std::stringstream buf; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
|
2021-11-05 11:55:30 +00:00
|
|
|
jobj.stringify(buf);
|
|
|
|
|
2021-11-29 08:22:43 +00:00
|
|
|
std::ofstream meta_file(fs::path(local_path) / "meta.txt", std::ios::out);
|
2021-11-05 11:55:30 +00:00
|
|
|
meta_file << buf.str();
|
|
|
|
meta_file.close();
|
|
|
|
}
|
|
|
|
|
2021-11-15 08:47:12 +00:00
|
|
|
RemoteCacheController::~RemoteCacheController() = default;
|
2021-11-29 05:01:03 +00:00
|
|
|
|
2021-11-05 11:55:30 +00:00
|
|
|
void RemoteCacheController::close()
|
|
|
|
{
|
2021-11-29 08:22:43 +00:00
|
|
|
LOG_TRACE(log, "Removing all local cache for remote path: {}, local path: {}", remote_path, local_path);
|
|
|
|
fs::remove_all(local_path);
|
2021-11-05 11:55:30 +00:00
|
|
|
}
|
|
|
|
|
2021-11-29 08:22:43 +00:00
|
|
|
std::pair<FILE *, String> RemoteCacheController::allocFile()
|
2021-11-05 11:55:30 +00:00
|
|
|
{
|
2021-11-29 08:22:43 +00:00
|
|
|
fs::path result_local_path;
|
2021-11-05 11:55:30 +00:00
|
|
|
if (download_finished)
|
2021-11-29 08:22:43 +00:00
|
|
|
result_local_path = fs::path(local_path) / "data.bin";
|
2021-11-29 05:01:03 +00:00
|
|
|
|
2021-11-29 08:22:43 +00:00
|
|
|
FILE * fs = fopen((fs::path(local_path) / "data.bin").string().c_str(), "r");
|
2021-11-29 09:01:34 +00:00
|
|
|
if (!fs)
|
|
|
|
throw Exception(ErrorCodes::BAD_GET, "Alloc file failed, error code: {} local path: {}", errno, local_path);
|
2021-11-29 05:01:03 +00:00
|
|
|
|
2021-11-05 11:55:30 +00:00
|
|
|
std::lock_guard lock{mutex};
|
|
|
|
opened_file_streams.insert(fs);
|
2021-11-11 06:12:15 +00:00
|
|
|
return {fs, result_local_path};
|
2021-11-05 11:55:30 +00:00
|
|
|
}
|
|
|
|
|
2021-11-29 12:19:36 +00:00
|
|
|
void RemoteCacheController::deallocFile(FILE * file_stream)
|
2021-11-05 11:55:30 +00:00
|
|
|
{
|
|
|
|
{
|
|
|
|
std::lock_guard lock{mutex};
|
2021-11-29 12:19:36 +00:00
|
|
|
auto it = opened_file_streams.find(file_stream);
|
2021-11-05 11:55:30 +00:00
|
|
|
if (it == opened_file_streams.end())
|
2021-11-29 12:19:36 +00:00
|
|
|
throw Exception(
|
|
|
|
ErrorCodes::BAD_ARGUMENTS,
|
|
|
|
"Try to deallocate file with invalid handler remote path: {}, local path: {}",
|
|
|
|
remote_path,
|
|
|
|
local_path);
|
2021-11-05 11:55:30 +00:00
|
|
|
opened_file_streams.erase(it);
|
|
|
|
}
|
2021-11-29 12:19:36 +00:00
|
|
|
fclose(file_stream);
|
2021-11-05 11:55:30 +00:00
|
|
|
}
|
|
|
|
|
2021-11-29 09:01:34 +00:00
|
|
|
/// Opens the controller's local data file for reading. A `file_size_` of 0 means "unknown"; getSize()
/// then stats the local file instead.
LocalCachedFileReader::LocalCachedFileReader(RemoteCacheController * cache_controller_, size_t file_size_)
    : cache_controller(cache_controller_)
    , file_size(file_size_)
    , offset(0)
{
    auto allocated = cache_controller->allocFile();
    file_stream = allocated.first;
    local_path = std::move(allocated.second);
}

/// Hands the stream back to the controller, which unregisters and closes it.
LocalCachedFileReader::~LocalCachedFileReader()
{
    cache_controller->deallocFile(file_stream);
}
|
|
|
|
|
|
|
|
size_t LocalCachedFileReader::read(char * buf, size_t size)
|
|
|
|
{
|
2021-11-29 08:22:43 +00:00
|
|
|
auto wret = cache_controller->waitMoreData(offset, offset + size);
|
2021-11-11 06:12:15 +00:00
|
|
|
if (wret != RemoteReadBufferCacheError::OK)
|
2021-11-05 11:55:30 +00:00
|
|
|
return 0;
|
|
|
|
std::lock_guard lock(mutex);
|
2021-11-29 09:01:34 +00:00
|
|
|
auto ret_size = fread(buf, 1, size, file_stream);
|
2021-11-05 11:55:30 +00:00
|
|
|
offset += ret_size;
|
|
|
|
return ret_size;
|
|
|
|
}
|
|
|
|
|
|
|
|
off_t LocalCachedFileReader::seek(off_t off)
|
|
|
|
{
|
2021-11-29 08:22:43 +00:00
|
|
|
cache_controller->waitMoreData(off, 1);
|
2021-11-05 11:55:30 +00:00
|
|
|
std::lock_guard lock(mutex);
|
2021-11-29 09:01:34 +00:00
|
|
|
auto ret = fseek(file_stream, off, SEEK_SET);
|
|
|
|
if (ret < 0)
|
|
|
|
throw Exception(
|
|
|
|
ErrorCodes::BAD_ARGUMENTS, "Seek file {} with size {} to offset {} failed: {}", getPath(), getSize(), off, errnoToString(errno));
|
|
|
|
|
2021-11-05 11:55:30 +00:00
|
|
|
offset = off;
|
|
|
|
return off;
|
|
|
|
}
|
2021-11-29 12:19:36 +00:00
|
|
|
|
2021-11-29 09:01:34 +00:00
|
|
|
size_t LocalCachedFileReader::getSize()
|
2021-11-05 11:55:30 +00:00
|
|
|
{
|
|
|
|
if (file_size != 0)
|
|
|
|
return file_size;
|
2021-11-29 09:01:34 +00:00
|
|
|
|
2021-11-05 11:55:30 +00:00
|
|
|
if (local_path.empty())
|
|
|
|
{
|
2021-11-29 09:01:34 +00:00
|
|
|
LOG_TRACE(log, "Empty local_path");
|
2021-11-05 11:55:30 +00:00
|
|
|
return 0;
|
|
|
|
}
|
2021-11-11 07:33:58 +00:00
|
|
|
|
2021-11-29 09:01:34 +00:00
|
|
|
file_size = fs::file_size(local_path);
|
|
|
|
return file_size;
|
2021-11-05 11:55:30 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// the size need be equal to the original buffer
|
2021-11-29 12:19:36 +00:00
|
|
|
RemoteReadBuffer::RemoteReadBuffer(size_t buff_size) : BufferWithOwnMemory<SeekableReadBufferWithSize>(buff_size)
|
2021-11-05 11:55:30 +00:00
|
|
|
{
|
|
|
|
}
|
|
|
|
|
|
|
|
RemoteReadBuffer::~RemoteReadBuffer() = default;
|
|
|
|
|
2021-11-29 08:22:43 +00:00
|
|
|
std::unique_ptr<RemoteReadBuffer> RemoteReadBuffer::create(const RemoteFileMetadata & remote_file_meta, std::unique_ptr<ReadBuffer> read_buffer)
|
2021-11-05 11:55:30 +00:00
|
|
|
{
|
2021-11-29 04:17:22 +00:00
|
|
|
auto * log = &Poco::Logger::get("RemoteReadBuffer");
|
2021-11-05 11:55:30 +00:00
|
|
|
size_t buff_size = DBMS_DEFAULT_BUFFER_SIZE;
|
2021-11-29 05:01:03 +00:00
|
|
|
if (read_buffer)
|
|
|
|
buff_size = read_buffer->internalBuffer().size();
|
2021-11-05 11:55:30 +00:00
|
|
|
/*
|
|
|
|
* in the new implement of ReadBufferFromHDFS, buffer size is 0.
|
|
|
|
*
|
|
|
|
* in the common case, we don't read bytes from readbuffer directly, so set buff_size = DBMS_DEFAULT_BUFFER_SIZE
|
|
|
|
* is OK.
|
|
|
|
*
|
|
|
|
* we need be careful with the case without local file reader.
|
|
|
|
*/
|
|
|
|
if (buff_size == 0)
|
|
|
|
buff_size = DBMS_DEFAULT_BUFFER_SIZE;
|
|
|
|
|
2021-11-29 08:22:43 +00:00
|
|
|
const auto & remote_path = remote_file_meta.path;
|
2021-11-29 05:01:03 +00:00
|
|
|
auto remote_read_buffer = std::make_unique<RemoteReadBuffer>(buff_size);
|
|
|
|
auto * raw_rbp = read_buffer.release();
|
2021-11-05 11:55:30 +00:00
|
|
|
std::shared_ptr<ReadBuffer> srb(raw_rbp);
|
2021-11-11 06:12:15 +00:00
|
|
|
RemoteReadBufferCacheError error;
|
2021-11-05 11:55:30 +00:00
|
|
|
int retry = 0;
|
|
|
|
do
|
|
|
|
{
|
|
|
|
if (retry > 0)
|
2021-11-29 05:01:03 +00:00
|
|
|
sleepForMicroseconds(20 * retry);
|
2021-11-05 11:55:30 +00:00
|
|
|
|
2021-11-29 08:22:43 +00:00
|
|
|
std::tie(remote_read_buffer->file_reader, error) = RemoteReadBufferCache::instance().createReader(remote_file_meta, srb);
|
2021-11-05 11:55:30 +00:00
|
|
|
retry++;
|
2021-11-11 06:12:15 +00:00
|
|
|
} while (error == RemoteReadBufferCacheError::FILE_INVALID && retry < 10);
|
2021-11-29 05:01:03 +00:00
|
|
|
if (remote_read_buffer->file_reader == nullptr)
|
2021-11-05 11:55:30 +00:00
|
|
|
{
|
2021-11-29 09:01:34 +00:00
|
|
|
LOG_ERROR(log, "Failed to allocate local file for remote path: {}, reason: {}", remote_path, error);
|
2021-11-29 12:19:36 +00:00
|
|
|
remote_read_buffer->original_read_buffer = srb;
|
2021-11-05 11:55:30 +00:00
|
|
|
}
|
2021-11-29 05:01:03 +00:00
|
|
|
return remote_read_buffer;
|
2021-11-05 11:55:30 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
bool RemoteReadBuffer::nextImpl()
|
|
|
|
{
|
2021-11-29 05:01:03 +00:00
|
|
|
if (file_reader)
|
2021-11-05 11:55:30 +00:00
|
|
|
{
|
|
|
|
int bytes_read = file_reader->read(internal_buffer.begin(), internal_buffer.size());
|
|
|
|
if (bytes_read)
|
|
|
|
working_buffer.resize(bytes_read);
|
|
|
|
else
|
|
|
|
return false;
|
|
|
|
}
|
2021-11-29 12:19:36 +00:00
|
|
|
else
|
2021-11-05 11:55:30 +00:00
|
|
|
{
|
2021-11-29 12:19:36 +00:00
|
|
|
// In the case we cannot use local cache, read from the original readbuffer directly
|
|
|
|
if (!original_read_buffer)
|
|
|
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Original read buffer is not initialized. It's a bug");
|
2021-11-29 09:01:34 +00:00
|
|
|
|
2021-11-29 12:19:36 +00:00
|
|
|
auto status = original_read_buffer->next();
|
|
|
|
// We don't need to worry about the memory buffer allocated in RemoteReadBuffer, since it is owned by
|
2021-11-05 11:55:30 +00:00
|
|
|
// BufferWithOwnMemory, BufferWithOwnMemory would release it.
|
|
|
|
if (status)
|
2021-11-29 12:19:36 +00:00
|
|
|
BufferBase::set(original_read_buffer->buffer().begin(), original_read_buffer->buffer().size(), original_read_buffer->offset());
|
2021-11-05 11:55:30 +00:00
|
|
|
return status;
|
|
|
|
}
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
|
|
|
off_t RemoteReadBuffer::seek(off_t offset, int whence)
|
|
|
|
{
|
|
|
|
off_t pos_in_file = file_reader->getOffset();
|
|
|
|
off_t new_pos;
|
|
|
|
if (whence == SEEK_SET)
|
|
|
|
new_pos = offset;
|
|
|
|
else if (whence == SEEK_CUR)
|
2021-11-29 06:50:33 +00:00
|
|
|
new_pos = pos_in_file - available() + offset;
|
2021-11-05 11:55:30 +00:00
|
|
|
else
|
2021-11-29 09:01:34 +00:00
|
|
|
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expects SEEK_SET or SEEK_CUR as whence but given {}", whence);
|
2021-11-05 11:55:30 +00:00
|
|
|
|
|
|
|
/// Position is unchanged.
|
2021-11-29 06:50:33 +00:00
|
|
|
if (off_t(new_pos + available()) == pos_in_file)
|
2021-11-05 11:55:30 +00:00
|
|
|
return new_pos;
|
|
|
|
|
|
|
|
if (new_pos <= pos_in_file && new_pos >= pos_in_file - static_cast<off_t>(working_buffer.size()))
|
|
|
|
{
|
|
|
|
/// Position is still inside buffer.
|
|
|
|
pos = working_buffer.begin() + (new_pos - (pos_in_file - working_buffer.size()));
|
|
|
|
return new_pos;
|
|
|
|
}
|
|
|
|
|
|
|
|
pos = working_buffer.end();
|
|
|
|
auto ret_off = file_reader->seek(new_pos);
|
|
|
|
return ret_off;
|
|
|
|
}
|
|
|
|
|
|
|
|
off_t RemoteReadBuffer::getPosition()
|
|
|
|
{
|
2021-11-11 06:12:15 +00:00
|
|
|
return file_reader->getOffset() - available();
|
2021-11-05 11:55:30 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
RemoteReadBufferCache::RemoteReadBufferCache() = default;

/// Waits for outstanding pool tasks (recovery, downloads) before the cache goes away.
RemoteReadBufferCache::~RemoteReadBufferCache()
{
    /// thread_pool is only created in initOnce(); the singleton may be destroyed without ever being initialized.
    if (!thread_pool)
        return;

    try
    {
        thread_pool->wait();
    }
    catch (...)
    {
        /// Deliberately swallowed: an exception escaping a destructor would call std::terminate,
        /// and nothing useful can be done with a failed background task at this point.
    }
}
|
2021-11-05 11:55:30 +00:00
|
|
|
|
|
|
|
/// Process-wide cache singleton, constructed on first use (function-local static).
RemoteReadBufferCache & RemoteReadBufferCache::instance()
{
    static RemoteReadBufferCache cache;
    return cache;
}
|
|
|
|
|
2021-11-15 02:09:21 +00:00
|
|
|
void RemoteReadBufferCache::recoverCachedFilesMeta(
|
2021-11-29 08:22:43 +00:00
|
|
|
const fs::path & current_path,
|
2021-11-29 03:30:11 +00:00
|
|
|
size_t current_depth,
|
|
|
|
size_t max_depth,
|
|
|
|
std::function<void(RemoteCacheController *)> const & finish_callback)
|
2021-11-12 05:12:24 +00:00
|
|
|
{
|
2021-11-15 08:47:12 +00:00
|
|
|
if (current_depth >= max_depth)
|
2021-11-12 05:12:24 +00:00
|
|
|
{
|
2021-11-29 08:22:43 +00:00
|
|
|
for (auto const & dir : fs::directory_iterator{current_path})
|
2021-11-15 08:47:12 +00:00
|
|
|
{
|
|
|
|
std::string path = dir.path();
|
|
|
|
auto cache_controller = RemoteCacheController::recover(path, finish_callback);
|
|
|
|
if (!cache_controller)
|
|
|
|
continue;
|
2021-11-29 03:30:11 +00:00
|
|
|
auto & cell = caches[path];
|
2021-11-15 08:47:12 +00:00
|
|
|
cell.cache_controller = cache_controller;
|
|
|
|
cell.key_iterator = keys.insert(keys.end(), path);
|
|
|
|
}
|
|
|
|
return;
|
2021-11-12 05:12:24 +00:00
|
|
|
}
|
2021-11-15 08:47:12 +00:00
|
|
|
|
2021-11-29 08:22:43 +00:00
|
|
|
for (auto const & dir : fs::directory_iterator{current_path})
|
2021-11-15 08:47:12 +00:00
|
|
|
{
|
|
|
|
recoverCachedFilesMeta(dir.path(), current_depth + 1, max_depth, finish_callback);
|
|
|
|
}
|
2021-11-12 05:12:24 +00:00
|
|
|
}
|
|
|
|
|
2021-11-29 03:30:11 +00:00
|
|
|
void RemoteReadBufferCache::initOnce(
|
2021-11-29 08:22:43 +00:00
|
|
|
const String & root_dir_, size_t limit_size_, size_t bytes_read_before_flush_, size_t max_threads_)
|
2021-11-05 11:55:30 +00:00
|
|
|
{
|
2021-11-29 04:01:52 +00:00
|
|
|
LOG_INFO(
|
2021-11-29 08:22:43 +00:00
|
|
|
log, "Initializing local cache for remote data sources. Local cache root path: {}, cache size limit: {}", root_dir_, limit_size_);
|
|
|
|
root_dir = root_dir_;
|
2021-11-05 11:55:30 +00:00
|
|
|
limit_size = limit_size_;
|
2021-11-11 11:50:08 +00:00
|
|
|
local_cache_bytes_read_before_flush = bytes_read_before_flush_;
|
2021-11-29 08:22:43 +00:00
|
|
|
thread_pool = std::make_shared<FreeThreadPool>(max_threads_, 1000, 1000, false);
|
2021-11-05 11:55:30 +00:00
|
|
|
|
2021-11-29 08:22:43 +00:00
|
|
|
/// create if root_dir not exists
|
|
|
|
if (!fs::exists(fs::path(root_dir) / ""))
|
2021-11-11 06:12:15 +00:00
|
|
|
{
|
2021-11-29 08:22:43 +00:00
|
|
|
std::error_code ec;
|
|
|
|
bool success = fs::create_directories(fs::path(root_dir) / "", ec);
|
|
|
|
if (!success)
|
|
|
|
throw Exception(
|
|
|
|
ErrorCodes::CANNOT_CREATE_DIRECTORY, "Failed to create directories, error code:{} reason:{}", ec.value(), ec.message());
|
2021-11-11 06:12:15 +00:00
|
|
|
}
|
2021-11-29 08:22:43 +00:00
|
|
|
|
2021-11-29 09:01:34 +00:00
|
|
|
auto recover_task = [this]()
|
|
|
|
{
|
2021-11-29 08:22:43 +00:00
|
|
|
auto callback = [this](RemoteCacheController * cache_controller) { total_size += cache_controller->size(); };
|
2021-11-29 06:50:33 +00:00
|
|
|
std::lock_guard lock(mutex);
|
2021-11-15 08:47:12 +00:00
|
|
|
recoverCachedFilesMeta(root_dir, 1, 2, callback);
|
2021-11-29 06:50:33 +00:00
|
|
|
initialized = true;
|
2021-11-29 09:01:34 +00:00
|
|
|
LOG_TRACE(log, "Recovered from directory:{}", root_dir);
|
2021-11-15 08:47:12 +00:00
|
|
|
};
|
2021-11-29 03:47:31 +00:00
|
|
|
getThreadPool()->scheduleOrThrow(recover_task);
|
2021-11-05 11:55:30 +00:00
|
|
|
}
|
|
|
|
|
2021-11-29 08:22:43 +00:00
|
|
|
/// Maps a remote file to its local cache directory:
/// <root_dir>/<first 3 hex digits>/<hex digest>, where the digest is the 128-bit SipHash of
/// "schema:cluster:path" (so equal paths from different sources do not collide).
String RemoteReadBufferCache::calculateLocalPath(const RemoteFileMetadata & meta) const
{
    const String cache_key = meta.schema + ":" + meta.cluster + ":" + meta.path;
    const UInt128 digest = sipHash128(cache_key.c_str(), cache_key.size());
    const String digest_hex = getHexUIntLowercase(digest);
    const String bucket = digest_hex.substr(0, 3);
    return fs::path(root_dir) / bucket / digest_hex;
}
|
|
|
|
|
2021-11-29 05:01:03 +00:00
|
|
|
std::pair<std::shared_ptr<LocalCachedFileReader>, RemoteReadBufferCacheError>
|
|
|
|
RemoteReadBufferCache::createReader(const RemoteFileMetadata & remote_file_meta, std::shared_ptr<ReadBuffer> & read_buffer)
|
2021-11-05 11:55:30 +00:00
|
|
|
{
|
2021-11-15 11:08:58 +00:00
|
|
|
// If something is wrong on startup, rollback to read from the original ReadBuffer
|
2021-11-29 05:01:03 +00:00
|
|
|
if (!isInitialized())
|
2021-11-05 11:55:30 +00:00
|
|
|
{
|
2021-11-29 09:01:34 +00:00
|
|
|
LOG_ERROR(log, "RemoteReadBufferCache not initialized yet");
|
2021-11-11 06:12:15 +00:00
|
|
|
return {nullptr, RemoteReadBufferCacheError::NOT_INIT};
|
2021-11-05 11:55:30 +00:00
|
|
|
}
|
2021-11-15 08:47:12 +00:00
|
|
|
|
2021-11-12 05:12:24 +00:00
|
|
|
auto remote_path = remote_file_meta.path;
|
2021-11-12 08:43:35 +00:00
|
|
|
const auto & file_size = remote_file_meta.file_size;
|
2021-11-29 05:01:03 +00:00
|
|
|
const auto & last_modification_timestamp = remote_file_meta.last_modify_time;
|
2021-11-12 05:12:24 +00:00
|
|
|
auto local_path = calculateLocalPath(remote_file_meta);
|
2021-11-05 11:55:30 +00:00
|
|
|
std::lock_guard lock(mutex);
|
2021-11-11 06:12:15 +00:00
|
|
|
auto cache_iter = caches.find(local_path);
|
|
|
|
if (cache_iter != caches.end())
|
2021-11-05 11:55:30 +00:00
|
|
|
{
|
|
|
|
// if the file has been update on remote side, we need to redownload it
|
2021-11-12 05:12:24 +00:00
|
|
|
if (cache_iter->second.cache_controller->getLastModificationTimestamp() != last_modification_timestamp)
|
2021-11-05 11:55:30 +00:00
|
|
|
{
|
2021-11-29 03:30:11 +00:00
|
|
|
LOG_TRACE(
|
|
|
|
log,
|
2021-11-29 06:50:33 +00:00
|
|
|
"Remote file ({}) has been updated. Last saved modification time: {}, actual last modification time: {}",
|
|
|
|
remote_path,
|
|
|
|
std::to_string(cache_iter->second.cache_controller->getLastModificationTimestamp()),
|
|
|
|
std::to_string(last_modification_timestamp));
|
2021-11-11 06:12:15 +00:00
|
|
|
cache_iter->second.cache_controller->markInvalid();
|
2021-11-05 11:55:30 +00:00
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
// move the key to the list end
|
2021-11-11 06:12:15 +00:00
|
|
|
keys.splice(keys.end(), keys, cache_iter->second.key_iterator);
|
2021-11-29 03:30:11 +00:00
|
|
|
return {
|
|
|
|
std::make_shared<LocalCachedFileReader>(cache_iter->second.cache_controller.get(), file_size),
|
|
|
|
RemoteReadBufferCacheError::OK};
|
2021-11-05 11:55:30 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
auto clear_ret = clearLocalCache();
|
2021-11-11 06:12:15 +00:00
|
|
|
cache_iter = caches.find(local_path);
|
|
|
|
if (cache_iter != caches.end())
|
2021-11-05 11:55:30 +00:00
|
|
|
{
|
2021-11-11 06:12:15 +00:00
|
|
|
if (cache_iter->second.cache_controller->isValid())
|
2021-11-05 11:55:30 +00:00
|
|
|
{
|
2021-11-05 12:27:37 +00:00
|
|
|
// move the key to the list end, this case should not happen?
|
2021-11-11 06:12:15 +00:00
|
|
|
keys.splice(keys.end(), keys, cache_iter->second.key_iterator);
|
2021-11-29 03:30:11 +00:00
|
|
|
return {
|
|
|
|
std::make_shared<LocalCachedFileReader>(cache_iter->second.cache_controller.get(), file_size),
|
|
|
|
RemoteReadBufferCacheError::OK};
|
2021-11-05 11:55:30 +00:00
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
// maybe someone is holding this file
|
2021-11-11 06:12:15 +00:00
|
|
|
return {nullptr, RemoteReadBufferCacheError::FILE_INVALID};
|
2021-11-05 11:55:30 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// reach the disk capacity limit
|
|
|
|
if (!clear_ret)
|
|
|
|
{
|
2021-11-29 06:50:33 +00:00
|
|
|
LOG_INFO(log, "Reached local cache capacity limit size ({})", limit_size);
|
2021-11-11 06:12:15 +00:00
|
|
|
return {nullptr, RemoteReadBufferCacheError::DISK_FULL};
|
2021-11-05 11:55:30 +00:00
|
|
|
}
|
|
|
|
|
2021-11-29 08:22:43 +00:00
|
|
|
fs::create_directories(local_path);
|
2021-11-05 11:55:30 +00:00
|
|
|
|
2021-11-29 06:50:33 +00:00
|
|
|
auto callback = [this](RemoteCacheController * cntrl) { total_size += cntrl->size(); };
|
|
|
|
auto cache_controller
|
2021-11-29 05:01:03 +00:00
|
|
|
= std::make_shared<RemoteCacheController>(remote_file_meta, local_path, local_cache_bytes_read_before_flush, read_buffer, callback);
|
2021-11-29 06:50:33 +00:00
|
|
|
CacheCell cache_cell;
|
|
|
|
cache_cell.cache_controller = cache_controller;
|
|
|
|
cache_cell.key_iterator = keys.insert(keys.end(), local_path);
|
|
|
|
caches[local_path] = cache_cell;
|
2021-11-05 11:55:30 +00:00
|
|
|
|
2021-11-29 06:50:33 +00:00
|
|
|
return {std::make_shared<LocalCachedFileReader>(cache_controller.get(), file_size), RemoteReadBufferCacheError::OK};
|
2021-11-05 11:55:30 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
bool RemoteReadBufferCache::clearLocalCache()
|
|
|
|
{
|
|
|
|
for (auto it = keys.begin(); it != keys.end();)
|
|
|
|
{
|
2021-11-29 06:50:33 +00:00
|
|
|
// TODO keys is not thread-safe
|
2021-11-12 05:12:24 +00:00
|
|
|
auto cache_it = caches.find(*it);
|
2021-11-29 06:50:33 +00:00
|
|
|
auto cache_controller = cache_it->second.cache_controller;
|
|
|
|
if (!cache_controller->isValid() && cache_controller->closable())
|
2021-11-05 11:55:30 +00:00
|
|
|
{
|
2021-11-29 09:01:34 +00:00
|
|
|
LOG_TRACE(log, "Clear invalid cache entry with key {} from local cache", *it);
|
2021-11-29 03:30:11 +00:00
|
|
|
total_size
|
|
|
|
= total_size > cache_it->second.cache_controller->size() ? total_size - cache_it->second.cache_controller->size() : 0;
|
2021-11-29 06:50:33 +00:00
|
|
|
cache_controller->close();
|
2021-11-05 11:55:30 +00:00
|
|
|
it = keys.erase(it);
|
2021-11-12 05:12:24 +00:00
|
|
|
caches.erase(cache_it);
|
2021-11-05 11:55:30 +00:00
|
|
|
}
|
|
|
|
else
|
|
|
|
it++;
|
|
|
|
}
|
|
|
|
// clear closable cache from the list head
|
|
|
|
for (auto it = keys.begin(); it != keys.end();)
|
|
|
|
{
|
|
|
|
if (total_size < limit_size)
|
|
|
|
break;
|
2021-11-12 05:12:24 +00:00
|
|
|
auto cache_it = caches.find(*it);
|
|
|
|
if (cache_it == caches.end())
|
2021-11-29 09:01:34 +00:00
|
|
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Found no entry in local cache with key: {}", *it);
|
|
|
|
|
2021-11-12 05:12:24 +00:00
|
|
|
if (cache_it->second.cache_controller->closable())
|
2021-11-05 11:55:30 +00:00
|
|
|
{
|
2021-11-29 03:30:11 +00:00
|
|
|
total_size
|
|
|
|
= total_size > cache_it->second.cache_controller->size() ? total_size - cache_it->second.cache_controller->size() : 0;
|
2021-11-12 05:12:24 +00:00
|
|
|
cache_it->second.cache_controller->close();
|
|
|
|
caches.erase(cache_it);
|
2021-11-05 11:55:30 +00:00
|
|
|
it = keys.erase(it);
|
2021-11-29 03:30:11 +00:00
|
|
|
LOG_TRACE(
|
|
|
|
log,
|
|
|
|
"clear local file {} for {}. key size:{}. next{}",
|
2021-11-29 08:22:43 +00:00
|
|
|
cache_it->second.cache_controller->getLocalPath(),
|
2021-11-29 03:30:11 +00:00
|
|
|
cache_it->second.cache_controller->getRemotePath(),
|
|
|
|
keys.size(),
|
|
|
|
*it);
|
2021-11-05 11:55:30 +00:00
|
|
|
}
|
|
|
|
else
|
|
|
|
break;
|
|
|
|
}
|
2021-11-29 09:01:34 +00:00
|
|
|
LOG_TRACE(log, "After clear local cache, keys size:{}, total_size:{}, limit size:{}", keys.size(), total_size, limit_size);
|
2021-11-12 05:12:24 +00:00
|
|
|
return total_size < limit_size;
|
2021-11-05 11:55:30 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
}
|