mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-13 18:02:24 +00:00
fixed code style
This commit is contained in:
parent
5913d67553
commit
dd70209623
@ -1,4 +1,4 @@
|
||||
#include <IO/RemoteFileMetaDataBase.h>
|
||||
#include <IO/IRemoteFileMetadata.h>
|
||||
#include <Common/Exception.h>
|
||||
namespace DB
|
||||
{
|
||||
@ -7,15 +7,15 @@ namespace ErrorCodes
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
RemoteFileMetaDataBase::~RemoteFileMetaDataBase() {}
|
||||
IRemoteFileMetadata::~IRemoteFileMetadata() {}
|
||||
|
||||
RemoteFileMetaDataFactory & RemoteFileMetaDataFactory::instance()
|
||||
RemoteFileMetadataFactory & RemoteFileMetadataFactory::instance()
|
||||
{
|
||||
static RemoteFileMetaDataFactory g_factory;
|
||||
static RemoteFileMetadataFactory g_factory;
|
||||
return g_factory;
|
||||
}
|
||||
|
||||
RemoteFileMetaDataBasePtr RemoteFileMetaDataFactory::createClass(const String & class_name)
|
||||
IRemoteFileMetadataPtr RemoteFileMetadataFactory::createClass(const String & class_name)
|
||||
{
|
||||
auto it = class_creators.find(class_name);
|
||||
if (it == class_creators.end())
|
||||
@ -23,7 +23,7 @@ RemoteFileMetaDataBasePtr RemoteFileMetaDataFactory::createClass(const String &
|
||||
return (it->second)();
|
||||
}
|
||||
|
||||
void RemoteFileMetaDataFactory::registerClass(const String & class_name, ClassCreator creator)
|
||||
void RemoteFileMetadataFactory::registerClass(const String & class_name, ClassCreator creator)
|
||||
{
|
||||
auto it = class_creators.find(class_name);
|
||||
if (it != class_creators.end())
|
@ -7,27 +7,21 @@
|
||||
|
||||
namespace DB
|
||||
{
|
||||
class RemoteFileMetaDataBase
|
||||
class IRemoteFileMetadata
|
||||
{
|
||||
public:
|
||||
RemoteFileMetaDataBase() = default;
|
||||
RemoteFileMetaDataBase(const String & schema_,
|
||||
const String & cluster_,
|
||||
const String & remote_path_,
|
||||
IRemoteFileMetadata() = default;
|
||||
IRemoteFileMetadata(const String & remote_path_,
|
||||
size_t file_size_,
|
||||
UInt64 last_modification_timestamp_):
|
||||
schema(schema_)
|
||||
,cluster(cluster_)
|
||||
,remote_path(remote_path_)
|
||||
remote_path(remote_path_)
|
||||
,file_size(file_size_)
|
||||
,last_modification_timestamp(last_modification_timestamp_)
|
||||
{
|
||||
}
|
||||
virtual ~RemoteFileMetaDataBase();
|
||||
virtual ~IRemoteFileMetadata();
|
||||
virtual String getName() const = 0; //class name
|
||||
// methods for basic information
|
||||
inline String getSchema() const { return schema; }
|
||||
inline String getCluster() const { return cluster; }
|
||||
inline size_t getFileSize() const { return file_size; }
|
||||
inline String getRemotePath() const { return remote_path; }
|
||||
inline UInt64 getLastModificationTimestamp() const { return last_modification_timestamp; }
|
||||
@ -40,34 +34,32 @@ public:
|
||||
// used for comparing two file meta datas are the same or not.
|
||||
virtual String getVersion() const = 0;
|
||||
protected:
|
||||
String schema;
|
||||
String cluster;
|
||||
String remote_path;
|
||||
size_t file_size = 0;
|
||||
UInt64 last_modification_timestamp = 0;
|
||||
};
|
||||
|
||||
using RemoteFileMetaDataBasePtr = std::shared_ptr<RemoteFileMetaDataBase>;
|
||||
using IRemoteFileMetadataPtr = std::shared_ptr<IRemoteFileMetadata>;
|
||||
|
||||
/*
|
||||
* How to register a subclass into the factory and use it ?
|
||||
* 1) define your own subclass derive from RemoteFileMetaDataBase. Notice! the getClassName() must be the same
|
||||
* 1) define your own subclass derive from IRemoteFileMetadata. Notice! the getClassName() must be the same
|
||||
* as your subclass name.
|
||||
* 2) in a .cpp file, call REGISTTER_REMOTE_FILE_META_DATA_CLASS(subclass),
|
||||
* 3) call RemoteFileMetaDataFactory::instance().createClass(subclass_name) where you want to make a new object
|
||||
* 3) call RemoteFileMetadataFactory::instance().createClass(subclass_name) where you want to make a new object
|
||||
*/
|
||||
|
||||
class RemoteFileMetaDataFactory : private boost::noncopyable
|
||||
class RemoteFileMetadataFactory : private boost::noncopyable
|
||||
{
|
||||
public:
|
||||
using ClassCreator = std::function<RemoteFileMetaDataBasePtr()>;
|
||||
~RemoteFileMetaDataFactory() = default;
|
||||
using ClassCreator = std::function<IRemoteFileMetadataPtr()>;
|
||||
~RemoteFileMetadataFactory() = default;
|
||||
|
||||
static RemoteFileMetaDataFactory & instance();
|
||||
RemoteFileMetaDataBasePtr createClass(const String & class_name);
|
||||
static RemoteFileMetadataFactory & instance();
|
||||
IRemoteFileMetadataPtr createClass(const String & class_name);
|
||||
void registerClass(const String &class_name, ClassCreator creator);
|
||||
protected:
|
||||
RemoteFileMetaDataFactory() = default;
|
||||
RemoteFileMetadataFactory() = default;
|
||||
|
||||
private:
|
||||
std::unordered_map<String, ClassCreator> class_creators;
|
||||
@ -75,12 +67,12 @@ private:
|
||||
|
||||
// this should be used in a .cpp file. All the subclasses will finish the registeration before the main()
|
||||
#define REGISTTER_REMOTE_FILE_META_DATA_CLASS(meta_data_class) \
|
||||
class FileMetaDataFactory##meta_data_class{\
|
||||
class FileMetadataFactory##meta_data_class{\
|
||||
public:\
|
||||
FileMetaDataFactory##meta_data_class(){\
|
||||
auto creator = []() -> RemoteFileMetaDataBasePtr { return std::make_shared<meta_data_class>(); };\
|
||||
RemoteFileMetaDataFactory::instance().registerClass(#meta_data_class, creator);\
|
||||
FileMetadataFactory##meta_data_class(){\
|
||||
auto creator = []() -> IRemoteFileMetadataPtr { return std::make_shared<meta_data_class>(); };\
|
||||
RemoteFileMetadataFactory::instance().registerClass(#meta_data_class, creator);\
|
||||
}\
|
||||
};\
|
||||
static FileMetaDataFactory##meta_data_class g_file_meta_data_factory_instance##meta_data_class;
|
||||
static FileMetadataFactory##meta_data_class g_file_meta_data_factory_instance##meta_data_class;
|
||||
}
|
@ -32,9 +32,9 @@ bool RemoteCacheController::loadInnerInformation(const fs::path & file_path)
|
||||
return false;
|
||||
std::ifstream info_file(file_path);
|
||||
Poco::JSON::Parser info_parser;
|
||||
auto info_jobj = info_parser.parse(info_file).extract<Poco::JSON::Object::Ptr>();
|
||||
file_status = static_cast<LocalFileStatus>(info_jobj->get("file_status").convert<Int32>());
|
||||
meta_data_class = info_jobj->get("meta_data_class").convert<String>();
|
||||
auto info_json = info_parser.parse(info_file).extract<Poco::JSON::Object::Ptr>();
|
||||
file_status = static_cast<LocalFileStatus>(info_json->get("file_status").convert<Int32>());
|
||||
metadata_class = info_json->get("metadata_class").convert<String>();
|
||||
info_file.close();
|
||||
return true;
|
||||
}
|
||||
@ -57,19 +57,19 @@ std::shared_ptr<RemoteCacheController> RemoteCacheController::recover(const std:
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
cache_controller->file_meta_data_ptr = RemoteFileMetaDataFactory::instance().createClass(cache_controller->meta_data_class);
|
||||
if (!cache_controller->file_meta_data_ptr)
|
||||
cache_controller->file_metadata_ptr = RemoteFileMetadataFactory::instance().createClass(cache_controller->metadata_class);
|
||||
if (!cache_controller->file_metadata_ptr)
|
||||
{
|
||||
// do not load this invalid cached file and clear it. the clear action is in
|
||||
// RemoteReadBufferCache::recoverCachedFilesMetaData(), because deleting directories during iteration will
|
||||
// RemoteReadBufferCache::recoverCachedFilesMetadata(), because deleting directories during iteration will
|
||||
// cause unexpected behaviors
|
||||
LOG_ERROR(log, "Cannot create the meta data class : {}. The cached file is invalid and will be remove. path:{}",
|
||||
cache_controller->meta_data_class,
|
||||
cache_controller->metadata_class,
|
||||
local_path_.string());
|
||||
return nullptr;
|
||||
}
|
||||
std::ifstream meta_data_file(local_path_ / "meta_data.txt");
|
||||
if (!cache_controller->file_meta_data_ptr->fromString(std::string((std::istreambuf_iterator<char>(meta_data_file)),
|
||||
std::ifstream metadata_file(local_path_ / "metadata.txt");
|
||||
if (!cache_controller->file_metadata_ptr->fromString(std::string((std::istreambuf_iterator<char>(metadata_file)),
|
||||
std::istreambuf_iterator<char>())))
|
||||
{
|
||||
LOG_ERROR(log, "Cannot load the meta data. The cached file is invalid and will be remove. path:{}",
|
||||
@ -79,27 +79,28 @@ std::shared_ptr<RemoteCacheController> RemoteCacheController::recover(const std:
|
||||
|
||||
cache_controller->current_offset = fs::file_size(local_path_ / "data.bin");
|
||||
|
||||
RemoteReadBufferCache::instance().updateTotalSize(cache_controller->file_meta_data_ptr->getFileSize());
|
||||
RemoteReadBufferCache::instance().updateTotalSize(cache_controller->file_metadata_ptr->getFileSize());
|
||||
return cache_controller;
|
||||
}
|
||||
|
||||
RemoteCacheController::RemoteCacheController(
|
||||
RemoteFileMetaDataBasePtr file_meta_data_,
|
||||
IRemoteFileMetadataPtr file_metadata_,
|
||||
const std::filesystem::path & local_path_,
|
||||
size_t cache_bytes_before_flush_)
|
||||
: file_meta_data_ptr(file_meta_data_)
|
||||
: file_metadata_ptr(file_metadata_)
|
||||
, local_path(local_path_)
|
||||
, valid(true)
|
||||
, local_cache_bytes_read_before_flush(cache_bytes_before_flush_)
|
||||
, current_offset(0)
|
||||
{
|
||||
// on recover, file_meta_data_ptr is null, but it will be allocated after loading from meta_data.txt
|
||||
// when we allocate a whole new file cache , file_meta_data_ptr must not be null.
|
||||
if (file_meta_data_ptr)
|
||||
// on recover, file_metadata_ptr is null, but it will be allocated after loading from metadata.txt
|
||||
// when we allocate a whole new file cache , file_metadata_ptr must not be null.
|
||||
if (file_metadata_ptr)
|
||||
{
|
||||
std::ofstream meta_data_file(local_path_ / "meta_data.txt", std::ios::out);
|
||||
meta_data_file << file_meta_data_ptr->toString();
|
||||
meta_data_file.close();
|
||||
auto metadata_file_writer = std::make_unique<WriteBufferFromFile>((local_path_ / "metadata.txt").string());
|
||||
auto str_buf = file_metadata_ptr->toString();
|
||||
metadata_file_writer->write(str_buf.c_str(), str_buf.size());
|
||||
metadata_file_writer->close();
|
||||
}
|
||||
}
|
||||
|
||||
@ -129,21 +130,22 @@ RemoteReadBufferCacheError RemoteCacheController::waitMoreData(size_t start_offs
|
||||
return RemoteReadBufferCacheError::OK;
|
||||
}
|
||||
|
||||
bool RemoteCacheController::checkFileChanged(RemoteFileMetaDataBasePtr file_meta_data_)
|
||||
bool RemoteCacheController::checkFileChanged(IRemoteFileMetadataPtr file_metadata_)
|
||||
{
|
||||
return !(file_meta_data_ptr->getVersion() == file_meta_data_->getVersion());
|
||||
return !(file_metadata_ptr->getVersion() == file_metadata_->getVersion());
|
||||
}
|
||||
|
||||
void RemoteCacheController::startBackgroundDownload(std::shared_ptr<ReadBuffer> input_readbuffer, BackgroundSchedulePool & thread_pool)
|
||||
void RemoteCacheController::startBackgroundDownload(std::unique_ptr<ReadBuffer> in_readbuffer_, BackgroundSchedulePool & thread_pool)
|
||||
{
|
||||
data_file_writer = std::make_unique<WriteBufferFromFile>((fs::path(local_path) / "data.bin").string());
|
||||
flush(true);
|
||||
ReadBufferPtr in_readbuffer(in_readbuffer_.release());
|
||||
download_task_holder = thread_pool.createTask("download remote file",
|
||||
[this,input_readbuffer]{ backgroundDownload(input_readbuffer); });
|
||||
[this, in_readbuffer]{ backgroundDownload(in_readbuffer); });
|
||||
download_task_holder->activateAndSchedule();
|
||||
}
|
||||
|
||||
void RemoteCacheController::backgroundDownload(std::shared_ptr<ReadBuffer> remote_read_buffer)
|
||||
void RemoteCacheController::backgroundDownload(ReadBufferPtr remote_read_buffer)
|
||||
{
|
||||
file_status = DOWNLOADING;
|
||||
size_t before_unflush_bytes = 0;
|
||||
@ -174,8 +176,8 @@ void RemoteCacheController::backgroundDownload(std::shared_ptr<ReadBuffer> remot
|
||||
data_file_writer.reset();
|
||||
lock.unlock();
|
||||
more_data_signal.notify_all();
|
||||
RemoteReadBufferCache::instance().updateTotalSize(file_meta_data_ptr->getFileSize());
|
||||
LOG_TRACE(log, "Finish download into local path: {}, file meta data:{} ", local_path.string(), file_meta_data_ptr->toString());
|
||||
RemoteReadBufferCache::instance().updateTotalSize(file_metadata_ptr->getFileSize());
|
||||
LOG_TRACE(log, "Finish download into local path: {}, file meta data:{} ", local_path.string(), file_metadata_ptr->toString());
|
||||
}
|
||||
|
||||
void RemoteCacheController::flush(bool need_flush_status)
|
||||
@ -186,14 +188,14 @@ void RemoteCacheController::flush(bool need_flush_status)
|
||||
}
|
||||
if (need_flush_status)
|
||||
{
|
||||
auto file_writer = std::make_unique<WriteBufferFromFile>(local_path / "info.txt");
|
||||
Poco::JSON::Object jobj;
|
||||
jobj.set("file_status", static_cast<Int32>(file_status));
|
||||
jobj.set("meta_data_class", meta_data_class);
|
||||
jobj.set("metadata_class", metadata_class);
|
||||
std::stringstream buf; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
|
||||
jobj.stringify(buf);
|
||||
std::ofstream info_file(local_path / "info.txt");
|
||||
info_file << buf.str();
|
||||
info_file.close();
|
||||
file_writer->write(buf.str().c_str(), buf.str().size());
|
||||
file_writer->close();
|
||||
}
|
||||
}
|
||||
|
||||
@ -239,7 +241,7 @@ void RemoteCacheController::deallocFile(std::unique_ptr<ReadBufferFromFileBase>
|
||||
throw Exception(
|
||||
ErrorCodes::BAD_ARGUMENTS,
|
||||
"Try to deallocate file with invalid handler remote path: {}, local path: {}",
|
||||
file_meta_data_ptr->getRemotePath(),
|
||||
file_metadata_ptr->getRemotePath(),
|
||||
local_path.string());
|
||||
}
|
||||
opened_file_buffer_refs.erase(it);
|
||||
@ -255,7 +257,7 @@ RemoteReadBuffer::~RemoteReadBuffer()
|
||||
file_cache_controller->deallocFile(std::move(file_buffer));
|
||||
}
|
||||
|
||||
std::unique_ptr<ReadBuffer> RemoteReadBuffer::create(ContextPtr context, RemoteFileMetaDataBasePtr remote_file_meta_data, std::unique_ptr<ReadBuffer> read_buffer)
|
||||
std::unique_ptr<ReadBuffer> RemoteReadBuffer::create(ContextPtr context, IRemoteFileMetadataPtr remote_file_metadata, std::unique_ptr<ReadBuffer> read_buffer)
|
||||
{
|
||||
auto * log = &Poco::Logger::get("RemoteReadBuffer");
|
||||
size_t buff_size = DBMS_DEFAULT_BUFFER_SIZE;
|
||||
@ -272,11 +274,11 @@ std::unique_ptr<ReadBuffer> RemoteReadBuffer::create(ContextPtr context, RemoteF
|
||||
if (buff_size == 0)
|
||||
buff_size = DBMS_DEFAULT_BUFFER_SIZE;
|
||||
|
||||
auto remote_path = remote_file_meta_data->getRemotePath();
|
||||
auto remote_path = remote_file_metadata->getRemotePath();
|
||||
auto remote_read_buffer = std::make_unique<RemoteReadBuffer>(buff_size);
|
||||
RemoteReadBufferCacheError error;
|
||||
|
||||
std::tie(remote_read_buffer->file_cache_controller, read_buffer, error) = RemoteReadBufferCache::instance().createReader(context, remote_file_meta_data, read_buffer);
|
||||
std::tie(remote_read_buffer->file_cache_controller, read_buffer, error) = RemoteReadBufferCache::instance().createReader(context, remote_file_metadata, read_buffer);
|
||||
if (remote_read_buffer->file_cache_controller == nullptr)
|
||||
{
|
||||
LOG_ERROR(log, "Failed to allocate local file for remote path: {}, reason: {}.", remote_path, error);
|
||||
@ -290,7 +292,7 @@ std::unique_ptr<ReadBuffer> RemoteReadBuffer::create(ContextPtr context, RemoteF
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Create file readbuffer failed. {}",
|
||||
remote_read_buffer->file_cache_controller->getLocalPath().string());
|
||||
}
|
||||
remote_read_buffer->remote_file_size = remote_file_meta_data->getFileSize();
|
||||
remote_read_buffer->remote_file_size = remote_file_metadata->getFileSize();
|
||||
return remote_read_buffer;
|
||||
}
|
||||
|
||||
@ -339,7 +341,7 @@ RemoteReadBufferCache & RemoteReadBufferCache::instance()
|
||||
return instance;
|
||||
}
|
||||
|
||||
void RemoteReadBufferCache::recoverCachedFilesMetaData(
|
||||
void RemoteReadBufferCache::recoverCachedFilesMetadata(
|
||||
const fs::path & current_path,
|
||||
size_t current_depth,
|
||||
size_t max_depth)
|
||||
@ -370,14 +372,14 @@ void RemoteReadBufferCache::recoverCachedFilesMetaData(
|
||||
|
||||
for (auto const & dir : fs::directory_iterator{current_path})
|
||||
{
|
||||
recoverCachedFilesMetaData(dir.path(), current_depth + 1, max_depth);
|
||||
recoverCachedFilesMetadata(dir.path(), current_depth + 1, max_depth);
|
||||
}
|
||||
}
|
||||
|
||||
void RemoteReadBufferCache::recoverTask()
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
recoverCachedFilesMetaData(root_dir, 1, 2);
|
||||
recoverCachedFilesMetadata(root_dir, 1, 2);
|
||||
initialized = true;
|
||||
LOG_INFO(log, "Recovered from directory:{}", root_dir);
|
||||
}
|
||||
@ -406,18 +408,18 @@ void RemoteReadBufferCache::initOnce(
|
||||
recover_task_holder->activateAndSchedule();
|
||||
}
|
||||
|
||||
String RemoteReadBufferCache::calculateLocalPath(RemoteFileMetaDataBasePtr meta_data) const
|
||||
String RemoteReadBufferCache::calculateLocalPath(IRemoteFileMetadataPtr metadata) const
|
||||
{
|
||||
// add version into the full_path, and not block to read the new version
|
||||
String full_path = meta_data->getSchema() + ":" + meta_data->getCluster() + ":" + meta_data->getRemotePath()
|
||||
+ ":" + meta_data->getVersion();
|
||||
String full_path = metadata->getName() + ":" + metadata->getRemotePath()
|
||||
+ ":" + metadata->getVersion();
|
||||
UInt128 hashcode = sipHash128(full_path.c_str(), full_path.size());
|
||||
String hashcode_str = getHexUIntLowercase(hashcode);
|
||||
return fs::path(root_dir) / hashcode_str.substr(0, 3) / hashcode_str;
|
||||
}
|
||||
|
||||
std::tuple<RemoteCacheControllerPtr, std::unique_ptr<ReadBuffer>, RemoteReadBufferCacheError>
|
||||
RemoteReadBufferCache::createReader(ContextPtr context, RemoteFileMetaDataBasePtr remote_file_meta_data, std::unique_ptr<ReadBuffer> & read_buffer)
|
||||
RemoteReadBufferCache::createReader(ContextPtr context, IRemoteFileMetadataPtr remote_file_metadata, std::unique_ptr<ReadBuffer> & read_buffer)
|
||||
{
|
||||
// If something is wrong on startup, rollback to read from the original ReadBuffer
|
||||
if (!isInitialized())
|
||||
@ -426,15 +428,15 @@ RemoteReadBufferCache::createReader(ContextPtr context, RemoteFileMetaDataBasePt
|
||||
return {nullptr, std::move(read_buffer), RemoteReadBufferCacheError::NOT_INIT};
|
||||
}
|
||||
|
||||
auto remote_path = remote_file_meta_data->getRemotePath();
|
||||
const auto & last_modification_timestamp = remote_file_meta_data->getLastModificationTimestamp();
|
||||
auto local_path = calculateLocalPath(remote_file_meta_data);
|
||||
auto remote_path = remote_file_metadata->getRemotePath();
|
||||
const auto & last_modification_timestamp = remote_file_metadata->getLastModificationTimestamp();
|
||||
auto local_path = calculateLocalPath(remote_file_metadata);
|
||||
std::lock_guard lock(mutex);
|
||||
auto cache = lru_caches->get(local_path);
|
||||
if (cache)
|
||||
{
|
||||
// the remote file has been updated, need to redownload
|
||||
if (!cache->isValid() || cache->checkFileChanged(remote_file_meta_data))
|
||||
if (!cache->isValid() || cache->checkFileChanged(remote_file_metadata))
|
||||
{
|
||||
LOG_TRACE(
|
||||
log,
|
||||
@ -454,11 +456,11 @@ RemoteReadBufferCache::createReader(ContextPtr context, RemoteFileMetaDataBasePt
|
||||
fs::create_directories(local_path);
|
||||
|
||||
// cache is not found or is invalid
|
||||
auto new_cache = std::make_shared<RemoteCacheController>(remote_file_meta_data, local_path, local_cache_bytes_read_before_flush);
|
||||
auto new_cache = std::make_shared<RemoteCacheController>(remote_file_metadata, local_path, local_cache_bytes_read_before_flush);
|
||||
if (!lru_caches->set(local_path, new_cache))
|
||||
{
|
||||
LOG_ERROR(log, "Insert the new cache failed. new file size:{}, current total size:{}",
|
||||
remote_file_meta_data->getFileSize(),
|
||||
remote_file_metadata->getFileSize(),
|
||||
lru_caches->weight());
|
||||
return {nullptr, std::move(read_buffer), RemoteReadBufferCacheError::DISK_FULL};
|
||||
}
|
||||
|
@ -17,7 +17,7 @@
|
||||
#include <IO/ReadBufferFromFileBase.h>
|
||||
#include <IO/ReadSettings.h>
|
||||
#include <IO/SeekableReadBuffer.h>
|
||||
#include <IO/RemoteFileMetaDataBase.h>
|
||||
#include <IO/IRemoteFileMetadata.h>
|
||||
#include <condition_variable>
|
||||
#include <Interpreters/Context.h>
|
||||
|
||||
@ -44,7 +44,7 @@ public:
|
||||
};
|
||||
|
||||
RemoteCacheController(
|
||||
RemoteFileMetaDataBasePtr file_meta_data_,
|
||||
IRemoteFileMetadataPtr file_metadata_,
|
||||
const std::filesystem::path & local_path_,
|
||||
size_t cache_bytes_before_flush_);
|
||||
~RemoteCacheController();
|
||||
@ -82,10 +82,10 @@ public:
|
||||
inline size_t size() const { return current_offset; }
|
||||
|
||||
inline const std::filesystem::path & getLocalPath() { return local_path; }
|
||||
inline String getRemotePath() const { return file_meta_data_ptr->getRemotePath(); }
|
||||
inline String getRemotePath() const { return file_metadata_ptr->getRemotePath(); }
|
||||
|
||||
inline UInt64 getLastModificationTimestamp() const { return file_meta_data_ptr->getLastModificationTimestamp(); }
|
||||
bool checkFileChanged(RemoteFileMetaDataBasePtr file_meta_data_);
|
||||
inline UInt64 getLastModificationTimestamp() const { return file_metadata_ptr->getLastModificationTimestamp(); }
|
||||
bool checkFileChanged(IRemoteFileMetadataPtr file_metadata_);
|
||||
inline void markInvalid()
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
@ -96,10 +96,10 @@ public:
|
||||
std::lock_guard lock(mutex);
|
||||
return valid;
|
||||
}
|
||||
RemoteFileMetaDataBasePtr getFileMetaData() { return file_meta_data_ptr; }
|
||||
inline size_t getFileSize() const { return file_meta_data_ptr->getFileSize(); }
|
||||
IRemoteFileMetadataPtr getFileMetadata() { return file_metadata_ptr; }
|
||||
inline size_t getFileSize() const { return file_metadata_ptr->getFileSize(); }
|
||||
|
||||
void startBackgroundDownload(std::shared_ptr<ReadBuffer> input_readbuffer, BackgroundSchedulePool & thread_pool);
|
||||
void startBackgroundDownload(std::unique_ptr<ReadBuffer> in_readbuffer_, BackgroundSchedulePool & thread_pool);
|
||||
|
||||
private:
|
||||
// flush file and status information
|
||||
@ -107,16 +107,16 @@ private:
|
||||
bool loadInnerInformation(const std::filesystem::path & file_path);
|
||||
|
||||
BackgroundSchedulePool::TaskHolder download_task_holder;
|
||||
void backgroundDownload(std::shared_ptr<ReadBuffer> remote_read_buffer);
|
||||
void backgroundDownload(ReadBufferPtr remote_read_buffer);
|
||||
|
||||
std::mutex mutex;
|
||||
std::condition_variable more_data_signal;
|
||||
|
||||
std::set<uintptr_t> opened_file_buffer_refs; // refer to a buffer address
|
||||
|
||||
String meta_data_class;
|
||||
String metadata_class;
|
||||
LocalFileStatus file_status = TO_DOWNLOAD; // for tracking download process
|
||||
RemoteFileMetaDataBasePtr file_meta_data_ptr;
|
||||
IRemoteFileMetadataPtr file_metadata_ptr;
|
||||
std::filesystem::path local_path;
|
||||
|
||||
bool valid;
|
||||
@ -139,7 +139,7 @@ class RemoteReadBuffer : public BufferWithOwnMemory<SeekableReadBufferWithSize>
|
||||
public:
|
||||
explicit RemoteReadBuffer(size_t buff_size);
|
||||
~RemoteReadBuffer() override;
|
||||
static std::unique_ptr<ReadBuffer> create(ContextPtr contex, RemoteFileMetaDataBasePtr remote_file_meta_data, std::unique_ptr<ReadBuffer> read_buffer);
|
||||
static std::unique_ptr<ReadBuffer> create(ContextPtr contex, IRemoteFileMetadataPtr remote_file_metadata, std::unique_ptr<ReadBuffer> read_buffer);
|
||||
|
||||
bool nextImpl() override;
|
||||
off_t seek(off_t off, int whence) override;
|
||||
@ -188,7 +188,7 @@ public:
|
||||
inline bool isInitialized() const { return initialized; }
|
||||
|
||||
std::tuple<RemoteCacheControllerPtr, std::unique_ptr<ReadBuffer>, RemoteReadBufferCacheError>
|
||||
createReader(ContextPtr context, RemoteFileMetaDataBasePtr remote_file_meta_data, std::unique_ptr<ReadBuffer> & read_buffer);
|
||||
createReader(ContextPtr context, IRemoteFileMetadataPtr remote_file_metadata, std::unique_ptr<ReadBuffer> & read_buffer);
|
||||
|
||||
void updateTotalSize(size_t size) { total_size += size; }
|
||||
|
||||
@ -207,11 +207,11 @@ private:
|
||||
|
||||
Poco::Logger * log = &Poco::Logger::get("RemoteReadBufferCache");
|
||||
|
||||
String calculateLocalPath(RemoteFileMetaDataBasePtr meta) const;
|
||||
String calculateLocalPath(IRemoteFileMetadataPtr meta) const;
|
||||
|
||||
BackgroundSchedulePool::TaskHolder recover_task_holder;
|
||||
void recoverTask();
|
||||
void recoverCachedFilesMetaData(
|
||||
void recoverCachedFilesMetadata(
|
||||
const std::filesystem::path & current_path,
|
||||
size_t current_depth,
|
||||
size_t max_depth);
|
||||
|
@ -1,23 +0,0 @@
|
||||
#pragma once
|
||||
#include <IO/RemoteFileMetaDataBase.h>
|
||||
namespace DB
|
||||
{
|
||||
class HiveFileMetaData : public RemoteFileMetaDataBase
|
||||
{
|
||||
public:
|
||||
HiveFileMetaData() = default;
|
||||
HiveFileMetaData(const String & schema_,
|
||||
const String & cluster_,
|
||||
const String & remote_path_,
|
||||
size_t file_size_,
|
||||
UInt64 last_modification_timestamp_):
|
||||
RemoteFileMetaDataBase(schema_, cluster_, remote_path_, file_size_, last_modification_timestamp_){}
|
||||
~HiveFileMetaData() override;
|
||||
|
||||
String getName() const override { return "HiveFileMetaData"; }
|
||||
|
||||
String toString() const override;
|
||||
bool fromString(const String &buf) override;
|
||||
String getVersion() const override;
|
||||
};
|
||||
}
|
@ -38,7 +38,7 @@
|
||||
#include <Storages/Hive/HiveFile.h>
|
||||
#include <Storages/Hive/HiveSettings.h>
|
||||
#include <Storages/Hive/HiveCommon.h>
|
||||
#include <Storages/Hive/HiveFileMetaData.h>
|
||||
#include <Storages/Hive/StorageHiveMetadata.h>
|
||||
#include <Storages/MergeTree/KeyCondition.h>
|
||||
#include <Storages/StorageFactory.h>
|
||||
|
||||
@ -171,7 +171,7 @@ public:
|
||||
std::unique_ptr<ReadBuffer> remote_read_buf;
|
||||
if (RemoteReadBufferCache::instance().isInitialized() && getContext()->getSettingsRef().use_local_cache_for_remote_fs)
|
||||
remote_read_buf = RemoteReadBuffer::create(getContext(),
|
||||
std::make_shared<HiveFileMetaData>("Hive", getNameNodeCluster(hdfs_namenode_url), uri_with_path, curr_file->getSize(), curr_file->getLastModTs()),
|
||||
std::make_shared<StorageHiveMetadata>("Hive", getNameNodeCluster(hdfs_namenode_url), uri_with_path, curr_file->getSize(), curr_file->getLastModTs()),
|
||||
std::move(raw_read_buf));
|
||||
else
|
||||
remote_read_buf = std::move(raw_read_buf);
|
||||
|
@ -1,13 +1,13 @@
|
||||
#include <Storages/Hive/HiveFileMetaData.h>
|
||||
#include <Storages/Hive/StorageHiveMetadata.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <Poco/JSON/JSON.h>
|
||||
#include <Poco/JSON/Parser.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
HiveFileMetaData::~HiveFileMetaData() = default;
|
||||
StorageHiveMetadata::~StorageHiveMetadata() = default;
|
||||
|
||||
String HiveFileMetaData::toString() const
|
||||
String StorageHiveMetadata::toString() const
|
||||
{
|
||||
Poco::JSON::Object jobj;
|
||||
jobj.set("schema", schema);
|
||||
@ -21,7 +21,7 @@ String HiveFileMetaData::toString() const
|
||||
|
||||
}
|
||||
|
||||
bool HiveFileMetaData::fromString(const String &buf)
|
||||
bool StorageHiveMetadata::fromString(const String &buf)
|
||||
{
|
||||
std::stringstream istream; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
|
||||
istream << buf;
|
||||
@ -35,11 +35,11 @@ bool HiveFileMetaData::fromString(const String &buf)
|
||||
return true;
|
||||
}
|
||||
|
||||
String HiveFileMetaData::getVersion() const
|
||||
String StorageHiveMetadata::getVersion() const
|
||||
{
|
||||
return std::to_string(getLastModificationTimestamp());
|
||||
}
|
||||
|
||||
REGISTTER_REMOTE_FILE_META_DATA_CLASS(HiveFileMetaData)
|
||||
REGISTTER_REMOTE_FILE_META_DATA_CLASS(StorageHiveMetadata)
|
||||
|
||||
}
|
28
src/Storages/Hive/StorageHiveMetadata.h
Normal file
28
src/Storages/Hive/StorageHiveMetadata.h
Normal file
@ -0,0 +1,28 @@
|
||||
#pragma once
|
||||
#include <IO/IRemoteFileMetadata.h>
|
||||
namespace DB
|
||||
{
|
||||
class StorageHiveMetadata : public IRemoteFileMetadata
|
||||
{
|
||||
public:
|
||||
StorageHiveMetadata() = default;
|
||||
StorageHiveMetadata(const String & schema_,
|
||||
const String & cluster_,
|
||||
const String & remote_path_,
|
||||
size_t file_size_,
|
||||
UInt64 last_modification_timestamp_):
|
||||
IRemoteFileMetadata(remote_path_, file_size_, last_modification_timestamp_),schema(schema_), cluster(cluster_){}
|
||||
~StorageHiveMetadata() override;
|
||||
|
||||
String getName() const override { return "StorageHiveMetadata"; }
|
||||
String getSchema() const { return schema; }
|
||||
String getCluster() const { return cluster; }
|
||||
|
||||
String toString() const override;
|
||||
bool fromString(const String &buf) override;
|
||||
String getVersion() const override;
|
||||
private:
|
||||
String schema;
|
||||
String cluster;
|
||||
};
|
||||
}
|
Loading…
Reference in New Issue
Block a user