mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-14 02:12:21 +00:00
coding refinement
1. make remote file meta data be a abstract class 2. make cache controller download process be started by RemoteReadBufferCache
This commit is contained in:
parent
c678c8101e
commit
01940c3f01
35
src/IO/RemoteFileMetaDataBase.cpp
Normal file
35
src/IO/RemoteFileMetaDataBase.cpp
Normal file
@ -0,0 +1,35 @@
|
|||||||
|
#include <IO/RemoteFileMetaDataBase.h>
|
||||||
|
#include <Common/Exception.h>
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
namespace ErrorCodes
|
||||||
|
{
|
||||||
|
extern const int LOGICAL_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
RemoteFileMetaDataBase::~RemoteFileMetaDataBase() {}
|
||||||
|
|
||||||
|
RemoteFileMetaDataFactory & RemoteFileMetaDataFactory::instance()
|
||||||
|
{
|
||||||
|
static RemoteFileMetaDataFactory g_factory;
|
||||||
|
return g_factory;
|
||||||
|
}
|
||||||
|
|
||||||
|
RemoteFileMetaDataBasePtr RemoteFileMetaDataFactory::create_class(const String & class_name)
|
||||||
|
{
|
||||||
|
auto it = class_creators.find(class_name);
|
||||||
|
if (it == class_creators.end())
|
||||||
|
return nullptr;
|
||||||
|
return (it->second)();
|
||||||
|
}
|
||||||
|
|
||||||
|
void RemoteFileMetaDataFactory::register_class(const String & class_name, ClassCreator creator)
|
||||||
|
{
|
||||||
|
auto it = class_creators.find(class_name);
|
||||||
|
if (it != class_creators.end())
|
||||||
|
{
|
||||||
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Class ({}) has been registered. It is a fatal error.", class_name);
|
||||||
|
}
|
||||||
|
class_creators[class_name] = creator;
|
||||||
|
}
|
||||||
|
}
|
87
src/IO/RemoteFileMetaDataBase.h
Normal file
87
src/IO/RemoteFileMetaDataBase.h
Normal file
@ -0,0 +1,87 @@
|
|||||||
|
#pragma once
|
||||||
|
#include <memory>
|
||||||
|
#include <base/types.h>
|
||||||
|
#include <functional>
|
||||||
|
#include <unordered_map>
|
||||||
|
#include <boost/core/noncopyable.hpp>
|
||||||
|
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
class RemoteFileMetaDataBase
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
RemoteFileMetaDataBase() = default;
|
||||||
|
RemoteFileMetaDataBase(const String & schema_,
|
||||||
|
const String & cluster_,
|
||||||
|
const String & remote_path_,
|
||||||
|
size_t file_size_,
|
||||||
|
UInt64 last_modification_timestamp_):
|
||||||
|
schema(schema_)
|
||||||
|
,cluster(cluster_)
|
||||||
|
,remote_path(remote_path_)
|
||||||
|
,file_size(file_size_)
|
||||||
|
,last_modification_timestamp(last_modification_timestamp_)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
virtual ~RemoteFileMetaDataBase();
|
||||||
|
virtual String getClassName() = 0; //class name
|
||||||
|
// methods for basic information
|
||||||
|
inline String getSchema() { return schema; }
|
||||||
|
inline String getCluster() { return cluster; }
|
||||||
|
inline size_t getFileSize() { return file_size; }
|
||||||
|
inline String getRemotePath() { return remote_path; }
|
||||||
|
inline UInt64 getLastModificationTimestamp() { return last_modification_timestamp; }
|
||||||
|
// create a new object
|
||||||
|
virtual std::shared_ptr<RemoteFileMetaDataBase> clone() = 0;
|
||||||
|
|
||||||
|
// deserialize
|
||||||
|
virtual bool fromString(const String &buf) = 0;
|
||||||
|
// serialize
|
||||||
|
virtual String toString() = 0;
|
||||||
|
// to compare two meta datas for detecting file changes
|
||||||
|
virtual bool equal(std::shared_ptr<RemoteFileMetaDataBase> b) = 0;
|
||||||
|
protected:
|
||||||
|
String schema;
|
||||||
|
String cluster;
|
||||||
|
String remote_path;
|
||||||
|
size_t file_size = 0;
|
||||||
|
UInt64 last_modification_timestamp = 0;
|
||||||
|
};
|
||||||
|
|
||||||
|
using RemoteFileMetaDataBasePtr = std::shared_ptr<RemoteFileMetaDataBase>;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* How to register a subclass into factory and use it ?
|
||||||
|
* 1) define your own subclass derive from RemoteFileMetaDataBase. Notice! the getClassName() must be the same
|
||||||
|
as your subclass name.
|
||||||
|
* 2) in a .cpp file, call REGISTTER_REMOTE_FILE_META_DATA_CLASS(subclass).
|
||||||
|
3) call RemoteFileMetaDataFactory::instance().create_class(subclass_name) where you want to make a new object
|
||||||
|
*/
|
||||||
|
|
||||||
|
class RemoteFileMetaDataFactory : private boost::noncopyable
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
using ClassCreator = std::function<RemoteFileMetaDataBasePtr()>;
|
||||||
|
~RemoteFileMetaDataFactory() = default;
|
||||||
|
|
||||||
|
static RemoteFileMetaDataFactory & instance();
|
||||||
|
RemoteFileMetaDataBasePtr create_class(const String & class_name);
|
||||||
|
void register_class(const String &class_name, ClassCreator creator);
|
||||||
|
protected:
|
||||||
|
RemoteFileMetaDataFactory() = default;
|
||||||
|
|
||||||
|
private:
|
||||||
|
std::unordered_map<String, ClassCreator> class_creators;
|
||||||
|
};
|
||||||
|
|
||||||
|
// this should be used in a .cpp file. All the subclasses will finish the registeration before the main()
|
||||||
|
#define REGISTTER_REMOTE_FILE_META_DATA_CLASS(meta_data_class) \
|
||||||
|
class FileMetaDataFactory##meta_data_class{\
|
||||||
|
public:\
|
||||||
|
FileMetaDataFactory##meta_data_class(){\
|
||||||
|
auto creator = []() -> RemoteFileMetaDataBasePtr { return std::make_shared<meta_data_class>(); };\
|
||||||
|
RemoteFileMetaDataFactory::instance().register_class(#meta_data_class, creator);\
|
||||||
|
}\
|
||||||
|
};\
|
||||||
|
static FileMetaDataFactory##meta_data_class g_file_meta_data_factory_instance##meta_data_class;
|
||||||
|
}
|
@ -26,104 +26,85 @@ namespace ErrorCodes
|
|||||||
extern const int CANNOT_CREATE_DIRECTORY;
|
extern const int CANNOT_CREATE_DIRECTORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool RemoteFileMetadata::load(const std::filesystem::path & local_path)
|
bool RemoteCacheController::loadInnerInformation(const fs::path & file_path)
|
||||||
{
|
{
|
||||||
auto * log = &Poco::Logger::get("RemoteFileMetadata");
|
if (!fs::exists(file_path))
|
||||||
if (!std::filesystem::exists(local_path))
|
|
||||||
{
|
|
||||||
LOG_ERROR(log, "file path not exists:{}", local_path.string());
|
|
||||||
return false;
|
return false;
|
||||||
}
|
std::ifstream info_file(file_path);
|
||||||
std::ifstream meta_fs(local_path.string());
|
Poco::JSON::Parser info_parser;
|
||||||
Poco::JSON::Parser meta_data_parser;
|
auto info_jobj = info_parser.parse(info_file).extract<Poco::JSON::Object::Ptr>();
|
||||||
auto meta_data_jobj = meta_data_parser.parse(meta_fs).extract<Poco::JSON::Object::Ptr>();
|
file_status = static_cast<LocalFileStatus>(info_jobj->get("file_status").convert<Int32>());
|
||||||
remote_path = meta_data_jobj->get("remote_path").convert<String>();
|
meta_data_class = info_jobj->get("meta_data_class").convert<String>();
|
||||||
schema = meta_data_jobj->get("schema").convert<String>();
|
info_file.close();
|
||||||
cluster = meta_data_jobj->get("cluster").convert<String>();
|
|
||||||
status = static_cast<LocalStatus>(meta_data_jobj->get("status").convert<Int32>());
|
|
||||||
last_modification_timestamp = meta_data_jobj->get("last_modification_timestamp").convert<UInt64>();
|
|
||||||
file_size = meta_data_jobj->get("file_size").convert<UInt64>();
|
|
||||||
meta_fs.close();
|
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
void RemoteFileMetadata::save(const std::filesystem::path & local_path) const
|
|
||||||
{
|
|
||||||
std::ofstream meta_file(local_path.string(), std::ios::out);
|
|
||||||
meta_file << toString();
|
|
||||||
meta_file.close();
|
|
||||||
}
|
|
||||||
|
|
||||||
String RemoteFileMetadata::toString() const
|
|
||||||
{
|
|
||||||
Poco::JSON::Object jobj;
|
|
||||||
jobj.set("schema", schema);
|
|
||||||
jobj.set("cluster", cluster);
|
|
||||||
jobj.set("remote_path", remote_path);
|
|
||||||
jobj.set("status", static_cast<Int32>(status));
|
|
||||||
jobj.set("last_modification_timestamp", last_modification_timestamp);
|
|
||||||
jobj.set("file_size", file_size);
|
|
||||||
std::stringstream buf; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
|
|
||||||
jobj.stringify(buf);
|
|
||||||
return buf.str();
|
|
||||||
}
|
|
||||||
|
|
||||||
std::shared_ptr<RemoteCacheController> RemoteCacheController::recover(const std::filesystem::path & local_path_)
|
std::shared_ptr<RemoteCacheController> RemoteCacheController::recover(const std::filesystem::path & local_path_)
|
||||||
{
|
{
|
||||||
auto * log = &Poco::Logger::get("RemoteCacheController");
|
auto * log = &Poco::Logger::get("RemoteCacheController");
|
||||||
|
|
||||||
if (!std::filesystem::exists(local_path_) || !std::filesystem::exists(local_path_ / "data.bin"))
|
if (!std::filesystem::exists(local_path_ / "data.bin"))
|
||||||
{
|
{
|
||||||
LOG_TRACE(log, "Invalid cached directory:{}", local_path_.string());
|
LOG_TRACE(log, "Invalid cached directory:{}", local_path_.string());
|
||||||
return nullptr;
|
return nullptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
RemoteFileMetadata remote_file_meta_data;
|
auto cache_controller = std::make_shared<RemoteCacheController>(nullptr, local_path_, 0);
|
||||||
if (!remote_file_meta_data.load(local_path_ / "meta.txt") || remote_file_meta_data.status != RemoteFileMetadata::DOWNLOADED)
|
if (!cache_controller->loadInnerInformation(local_path_ / "info.txt")
|
||||||
|
|| cache_controller->file_status != DOWNLOADED)
|
||||||
{
|
{
|
||||||
LOG_INFO(log, "recover cached file failed. local path:{}, file meta data:{}", local_path_.string(), remote_file_meta_data.toString());
|
LOG_INFO(log, "recover cached file failed. local path:{}", local_path_.string());
|
||||||
return nullptr;
|
return nullptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
auto cache_controller = std::make_shared<RemoteCacheController>(nullptr, remote_file_meta_data, local_path_, 0, nullptr);
|
cache_controller->file_meta_data_ptr = RemoteFileMetaDataFactory::instance().create_class(cache_controller->meta_data_class);
|
||||||
cache_controller->current_offset = remote_file_meta_data.file_size;
|
if (!cache_controller->file_meta_data_ptr)
|
||||||
|
{
|
||||||
|
// do not load this invalid cached file and clear it
|
||||||
|
LOG_ERROR(log, "Cannot create the meta data class : {}. The cached file is invalid and will be remove. path:{}",
|
||||||
|
cache_controller->meta_data_class,
|
||||||
|
local_path_.string());
|
||||||
|
fs::remove_all(local_path_);
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
std::ifstream meta_data_file(local_path_ / "meta_data.txt");
|
||||||
|
if (!cache_controller->file_meta_data_ptr->fromString(std::string((std::istreambuf_iterator<char>(meta_data_file)),
|
||||||
|
std::istreambuf_iterator<char>())))
|
||||||
|
{
|
||||||
|
LOG_ERROR(log, "Cannot load the meta data. The cached file is invalid and will be remove. path:{}",
|
||||||
|
local_path_.string());
|
||||||
|
fs::remove_all(local_path_);
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
RemoteReadBufferCache::instance().updateTotalSize(cache_controller->file_meta_data.file_size);
|
cache_controller->current_offset = fs::file_size(local_path_ / "data.bin");
|
||||||
|
|
||||||
|
RemoteReadBufferCache::instance().updateTotalSize(cache_controller->file_meta_data_ptr->getFileSize());
|
||||||
return cache_controller;
|
return cache_controller;
|
||||||
}
|
}
|
||||||
|
|
||||||
RemoteCacheController::RemoteCacheController(
|
RemoteCacheController::RemoteCacheController(
|
||||||
ContextPtr context,
|
RemoteFileMetaDataBasePtr file_meta_data_,
|
||||||
const RemoteFileMetadata & file_meta_data_,
|
|
||||||
const std::filesystem::path & local_path_,
|
const std::filesystem::path & local_path_,
|
||||||
size_t cache_bytes_before_flush_,
|
size_t cache_bytes_before_flush_)
|
||||||
std::shared_ptr<ReadBuffer> read_buffer_)
|
: file_meta_data_ptr(file_meta_data_)
|
||||||
: file_meta_data(file_meta_data_)
|
|
||||||
, local_path(local_path_)
|
, local_path(local_path_)
|
||||||
, valid(true)
|
, valid(true)
|
||||||
, local_cache_bytes_read_before_flush(cache_bytes_before_flush_)
|
, local_cache_bytes_read_before_flush(cache_bytes_before_flush_)
|
||||||
, current_offset(0)
|
, current_offset(0)
|
||||||
, remote_read_buffer(read_buffer_)
|
|
||||||
{
|
{
|
||||||
/// readbuffer == nullptr if `RemoteCacheController` is created in `initOnce`, when metadata and local cache already exist.
|
if (file_meta_data_ptr)
|
||||||
if (remote_read_buffer)
|
|
||||||
{
|
{
|
||||||
// setup local files
|
std::ofstream meta_data_file(local_path_ / "meta_data.txt", std::ios::out);
|
||||||
data_file_writer = std::make_unique<WriteBufferFromFile>((fs::path(local_path_) / "data.bin").string());
|
meta_data_file << file_meta_data_ptr->toString();
|
||||||
data_file_writer->sync();
|
meta_data_file.close();
|
||||||
|
|
||||||
file_meta_data.save(local_path_ / "meta.txt");
|
|
||||||
|
|
||||||
download_task_holder = context->getSchedulePool().createTask("download remote file", [this]{ this->backgroundDownload(); });
|
|
||||||
download_task_holder->activateAndSchedule();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
RemoteReadBufferCacheError RemoteCacheController::waitMoreData(size_t start_offset_, size_t end_offset_)
|
RemoteReadBufferCacheError RemoteCacheController::waitMoreData(size_t start_offset_, size_t end_offset_)
|
||||||
{
|
{
|
||||||
std::unique_lock lock{mutex};
|
std::unique_lock lock{mutex};
|
||||||
if (file_meta_data.status == RemoteFileMetadata::DOWNLOADED)
|
if (file_status == DOWNLOADED)
|
||||||
{
|
{
|
||||||
// finish reading
|
// finish reading
|
||||||
if (start_offset_ >= current_offset)
|
if (start_offset_ >= current_offset)
|
||||||
@ -140,15 +121,29 @@ RemoteReadBufferCacheError RemoteCacheController::waitMoreData(size_t start_offs
|
|||||||
return RemoteReadBufferCacheError::OK;
|
return RemoteReadBufferCacheError::OK;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
more_data_signal.wait(lock, [this, end_offset_] { return file_meta_data.status == RemoteFileMetadata::DOWNLOADED || current_offset >= end_offset_; });
|
more_data_signal.wait(lock, [this, end_offset_] { return file_status == DOWNLOADED || current_offset >= end_offset_; });
|
||||||
}
|
}
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
return RemoteReadBufferCacheError::OK;
|
return RemoteReadBufferCacheError::OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
void RemoteCacheController::backgroundDownload()
|
bool RemoteCacheController::checkFileChanged(RemoteFileMetaDataBasePtr file_meta_data_)
|
||||||
{
|
{
|
||||||
file_meta_data.status = RemoteFileMetadata::DOWNLOADING;
|
return !file_meta_data_ptr->equal(file_meta_data_);
|
||||||
|
}
|
||||||
|
|
||||||
|
void RemoteCacheController::startBackgroundDownload(std::shared_ptr<ReadBuffer> input_readbuffer, BackgroundSchedulePool & thread_pool)
|
||||||
|
{
|
||||||
|
data_file_writer = std::make_unique<WriteBufferFromFile>((fs::path(local_path) / "data.bin").string());
|
||||||
|
flush(true);
|
||||||
|
download_task_holder = thread_pool.createTask("download remote file",
|
||||||
|
[this,input_readbuffer]{ backgroundDownload(input_readbuffer); });
|
||||||
|
download_task_holder->activateAndSchedule();
|
||||||
|
}
|
||||||
|
|
||||||
|
void RemoteCacheController::backgroundDownload(std::shared_ptr<ReadBuffer> remote_read_buffer)
|
||||||
|
{
|
||||||
|
file_status = DOWNLOADING;
|
||||||
size_t before_unflush_bytes = 0;
|
size_t before_unflush_bytes = 0;
|
||||||
size_t total_bytes = 0;
|
size_t total_bytes = 0;
|
||||||
while (!remote_read_buffer->eof())
|
while (!remote_read_buffer->eof())
|
||||||
@ -172,27 +167,33 @@ void RemoteCacheController::backgroundDownload()
|
|||||||
}
|
}
|
||||||
std::unique_lock lock(mutex);
|
std::unique_lock lock(mutex);
|
||||||
current_offset += total_bytes;
|
current_offset += total_bytes;
|
||||||
file_meta_data.status = RemoteFileMetadata::DOWNLOADED;
|
file_status = DOWNLOADED;
|
||||||
flush(true);
|
flush(true);
|
||||||
data_file_writer.reset();
|
data_file_writer.reset();
|
||||||
remote_read_buffer.reset();
|
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
more_data_signal.notify_all();
|
more_data_signal.notify_all();
|
||||||
RemoteReadBufferCache::instance().updateTotalSize(file_meta_data.file_size);
|
RemoteReadBufferCache::instance().updateTotalSize(file_meta_data_ptr->getFileSize());
|
||||||
LOG_TRACE(log, "Finish download into local path: {}, file meta data:{} ", local_path.string(), file_meta_data.toString());
|
LOG_TRACE(log, "Finish download into local path: {}, file meta data:{} ", local_path.string(), file_meta_data_ptr->toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
void RemoteCacheController::flush(bool need_flush_meta_data_)
|
void RemoteCacheController::flush(bool need_flush_status)
|
||||||
{
|
{
|
||||||
if (data_file_writer)
|
if (data_file_writer)
|
||||||
{
|
{
|
||||||
|
LOG_DEBUG(&Poco::Logger::get("RemoteCacheController"),"flush file. offset:{}, file:{}. total_size:{}", current_offset, local_path.string(), file_meta_data_ptr->getFileSize());
|
||||||
data_file_writer->sync();
|
data_file_writer->sync();
|
||||||
}
|
}
|
||||||
|
if (need_flush_status)
|
||||||
if (!need_flush_meta_data_)
|
{
|
||||||
return;
|
Poco::JSON::Object jobj;
|
||||||
|
jobj.set("file_status", static_cast<Int32>(file_status));
|
||||||
file_meta_data.save(local_path / "meta.txt");
|
jobj.set("meta_data_class", meta_data_class);
|
||||||
|
std::stringstream buf; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
|
||||||
|
jobj.stringify(buf);
|
||||||
|
std::ofstream info_file(local_path / "info.txt");
|
||||||
|
info_file << buf.str();
|
||||||
|
info_file.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
RemoteCacheController::~RemoteCacheController() = default;
|
RemoteCacheController::~RemoteCacheController() = default;
|
||||||
@ -200,7 +201,7 @@ RemoteCacheController::~RemoteCacheController() = default;
|
|||||||
void RemoteCacheController::close()
|
void RemoteCacheController::close()
|
||||||
{
|
{
|
||||||
// delete directory
|
// delete directory
|
||||||
LOG_TRACE(log, "Removing the local cache. local path: {}, file meta data:{}", local_path.string(), file_meta_data.toString());
|
LOG_TRACE(log, "Removing the local cache. local path: {}", local_path.string());
|
||||||
std::filesystem::remove_all(local_path);
|
std::filesystem::remove_all(local_path);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -233,7 +234,7 @@ void RemoteCacheController::deallocFile(std::unique_ptr<ReadBufferFromFileBase>
|
|||||||
throw Exception(
|
throw Exception(
|
||||||
ErrorCodes::BAD_ARGUMENTS,
|
ErrorCodes::BAD_ARGUMENTS,
|
||||||
"Try to deallocate file with invalid handler remote path: {}, local path: {}",
|
"Try to deallocate file with invalid handler remote path: {}, local path: {}",
|
||||||
file_meta_data.remote_path,
|
file_meta_data_ptr->getRemotePath(),
|
||||||
local_path.string());
|
local_path.string());
|
||||||
}
|
}
|
||||||
opened_file_buffer_refs.erase(it);
|
opened_file_buffer_refs.erase(it);
|
||||||
@ -249,7 +250,7 @@ RemoteReadBuffer::~RemoteReadBuffer()
|
|||||||
file_cache_controller->deallocFile(std::move(file_buffer));
|
file_cache_controller->deallocFile(std::move(file_buffer));
|
||||||
}
|
}
|
||||||
|
|
||||||
std::unique_ptr<RemoteReadBuffer> RemoteReadBuffer::create(ContextPtr context, const RemoteFileMetadata & remote_file_meta, std::unique_ptr<ReadBuffer> read_buffer)
|
std::unique_ptr<RemoteReadBuffer> RemoteReadBuffer::create(ContextPtr context, RemoteFileMetaDataBasePtr remote_file_meta_data, std::unique_ptr<ReadBuffer> read_buffer)
|
||||||
{
|
{
|
||||||
auto * log = &Poco::Logger::get("RemoteReadBuffer");
|
auto * log = &Poco::Logger::get("RemoteReadBuffer");
|
||||||
size_t buff_size = DBMS_DEFAULT_BUFFER_SIZE;
|
size_t buff_size = DBMS_DEFAULT_BUFFER_SIZE;
|
||||||
@ -266,13 +267,13 @@ std::unique_ptr<RemoteReadBuffer> RemoteReadBuffer::create(ContextPtr context, c
|
|||||||
if (buff_size == 0)
|
if (buff_size == 0)
|
||||||
buff_size = DBMS_DEFAULT_BUFFER_SIZE;
|
buff_size = DBMS_DEFAULT_BUFFER_SIZE;
|
||||||
|
|
||||||
const auto & remote_path = remote_file_meta.remote_path;
|
auto remote_path = remote_file_meta_data->getRemotePath();
|
||||||
auto remote_read_buffer = std::make_unique<RemoteReadBuffer>(buff_size);
|
auto remote_read_buffer = std::make_unique<RemoteReadBuffer>(buff_size);
|
||||||
auto * raw_readbuffer_ptr = read_buffer.release();
|
auto * raw_readbuffer_ptr = read_buffer.release();
|
||||||
std::shared_ptr<ReadBuffer> shared_readbuffer_ptr(raw_readbuffer_ptr);
|
std::shared_ptr<ReadBuffer> shared_readbuffer_ptr(raw_readbuffer_ptr);
|
||||||
RemoteReadBufferCacheError error;
|
RemoteReadBufferCacheError error;
|
||||||
|
|
||||||
std::tie(remote_read_buffer->file_cache_controller, error) = RemoteReadBufferCache::instance().createReader(context, remote_file_meta, shared_readbuffer_ptr);
|
std::tie(remote_read_buffer->file_cache_controller, error) = RemoteReadBufferCache::instance().createReader(context, remote_file_meta_data, shared_readbuffer_ptr);
|
||||||
if (remote_read_buffer->file_cache_controller == nullptr)
|
if (remote_read_buffer->file_cache_controller == nullptr)
|
||||||
{
|
{
|
||||||
LOG_ERROR(log, "Failed to allocate local file for remote path: {}, reason: {}", remote_path, error);
|
LOG_ERROR(log, "Failed to allocate local file for remote path: {}, reason: {}", remote_path, error);
|
||||||
@ -293,7 +294,10 @@ bool RemoteReadBuffer::nextImpl()
|
|||||||
if (file_buffer)
|
if (file_buffer)
|
||||||
{
|
{
|
||||||
auto start_offset = file_buffer->getPosition();
|
auto start_offset = file_buffer->getPosition();
|
||||||
auto end_offset = file_buffer->internalBuffer().size();
|
auto end_offset = start_offset + file_buffer->internalBuffer().size();
|
||||||
|
LOG_DEBUG(&Poco::Logger::get("RemoteReadBuffer"), "nextImpl. start:{}, end:{}, file:{}, total_size:{}, remote_path:{}",
|
||||||
|
start_offset, end_offset, file_buffer->getFileName(), file_cache_controller->getFileMetaData()->getFileSize(),
|
||||||
|
file_cache_controller->getFileMetaData()->getRemotePath());
|
||||||
file_cache_controller->waitMoreData(start_offset, end_offset);
|
file_cache_controller->waitMoreData(start_offset, end_offset);
|
||||||
|
|
||||||
auto status = file_buffer->next();
|
auto status = file_buffer->next();
|
||||||
@ -322,9 +326,11 @@ off_t RemoteReadBuffer::seek(off_t offset, int whence)
|
|||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* Need to wait here. For example, the current file has been download at position X, but here we try to seek to
|
* Need to wait here. For example, the current file has been download at position X, but here we try to seek to
|
||||||
* postition Y ( Y > X), it would fail.
|
* postition Y (Y > X), it would fail.
|
||||||
*/
|
*/
|
||||||
file_cache_controller->waitMoreData(offset, offset + file_buffer->internalBuffer().size());
|
file_cache_controller->waitMoreData(offset, offset + file_buffer->internalBuffer().size());
|
||||||
|
LOG_DEBUG(&Poco::Logger::get("RemoteReadBuffer"), "seek. offset:{}. file:{}, total_size:{}", offset, file_buffer->getFileName(),
|
||||||
|
file_cache_controller->getFileMetaData()->getFileSize());
|
||||||
auto ret = file_buffer->seek(offset, whence);
|
auto ret = file_buffer->seek(offset, whence);
|
||||||
BufferBase::set(file_buffer->buffer().begin(),
|
BufferBase::set(file_buffer->buffer().begin(),
|
||||||
file_buffer->buffer().size(),
|
file_buffer->buffer().size(),
|
||||||
@ -405,18 +411,18 @@ void RemoteReadBufferCache::initOnce(
|
|||||||
recover_task_holder->activateAndSchedule();
|
recover_task_holder->activateAndSchedule();
|
||||||
}
|
}
|
||||||
|
|
||||||
String RemoteReadBufferCache::calculateLocalPath(const RemoteFileMetadata & meta) const
|
String RemoteReadBufferCache::calculateLocalPath(RemoteFileMetaDataBasePtr meta_data) const
|
||||||
{
|
{
|
||||||
String full_path = meta.schema + ":" + meta.cluster + ":" + meta.remote_path;
|
String full_path = meta_data->getSchema() + ":" + meta_data->getCluster() + ":" + meta_data->getRemotePath();
|
||||||
UInt128 hashcode = sipHash128(full_path.c_str(), full_path.size());
|
UInt128 hashcode = sipHash128(full_path.c_str(), full_path.size());
|
||||||
String hashcode_str = getHexUIntLowercase(hashcode);
|
String hashcode_str = getHexUIntLowercase(hashcode);
|
||||||
return fs::path(root_dir) / hashcode_str.substr(0, 3) / hashcode_str;
|
return fs::path(root_dir) / hashcode_str.substr(0, 3) / hashcode_str;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::pair<RemoteCacheControllerPtr, RemoteReadBufferCacheError>
|
std::pair<RemoteCacheControllerPtr, RemoteReadBufferCacheError>
|
||||||
RemoteReadBufferCache::createReader(ContextPtr context, const RemoteFileMetadata & remote_file_meta, std::shared_ptr<ReadBuffer> & read_buffer)
|
RemoteReadBufferCache::createReader(ContextPtr context, RemoteFileMetaDataBasePtr remote_file_meta_data, std::shared_ptr<ReadBuffer> & read_buffer)
|
||||||
{
|
{
|
||||||
LOG_TRACE(log, "createReader. {} {} {}", remote_file_meta.remote_path, remote_file_meta.last_modification_timestamp, remote_file_meta.file_size);
|
LOG_TRACE(log, "createReader. {}", remote_file_meta_data->toString());
|
||||||
// If something is wrong on startup, rollback to read from the original ReadBuffer
|
// If something is wrong on startup, rollback to read from the original ReadBuffer
|
||||||
if (!isInitialized())
|
if (!isInitialized())
|
||||||
{
|
{
|
||||||
@ -424,15 +430,15 @@ RemoteReadBufferCache::createReader(ContextPtr context, const RemoteFileMetadata
|
|||||||
return {nullptr, RemoteReadBufferCacheError::NOT_INIT};
|
return {nullptr, RemoteReadBufferCacheError::NOT_INIT};
|
||||||
}
|
}
|
||||||
|
|
||||||
auto remote_path = remote_file_meta.remote_path;
|
auto remote_path = remote_file_meta_data->getRemotePath();
|
||||||
const auto & last_modification_timestamp = remote_file_meta.last_modification_timestamp;
|
const auto & last_modification_timestamp = remote_file_meta_data->getLastModificationTimestamp();
|
||||||
auto local_path = calculateLocalPath(remote_file_meta);
|
auto local_path = calculateLocalPath(remote_file_meta_data);
|
||||||
std::lock_guard lock(mutex);
|
std::lock_guard lock(mutex);
|
||||||
auto cache_iter = caches.find(local_path);
|
auto cache_iter = caches.find(local_path);
|
||||||
if (cache_iter != caches.end())
|
if (cache_iter != caches.end())
|
||||||
{
|
{
|
||||||
// if the file has been update on remote side, we need to redownload it
|
// if the file has been update on remote side, we need to redownload it
|
||||||
if (cache_iter->second.cache_controller->getLastModificationTimestamp() != last_modification_timestamp)
|
if (cache_iter->second.cache_controller->checkFileChanged(remote_file_meta_data))
|
||||||
{
|
{
|
||||||
LOG_TRACE(
|
LOG_TRACE(
|
||||||
log,
|
log,
|
||||||
@ -479,9 +485,9 @@ RemoteReadBufferCache::createReader(ContextPtr context, const RemoteFileMetadata
|
|||||||
|
|
||||||
fs::create_directories(local_path);
|
fs::create_directories(local_path);
|
||||||
|
|
||||||
// pass a session context into RemoteCacheController is not a good idea
|
|
||||||
auto cache_controller
|
auto cache_controller
|
||||||
= std::make_shared<RemoteCacheController>(context->getGlobalContext(), remote_file_meta, local_path, local_cache_bytes_read_before_flush, read_buffer);
|
= std::make_shared<RemoteCacheController>(remote_file_meta_data, local_path, local_cache_bytes_read_before_flush);
|
||||||
|
cache_controller->startBackgroundDownload(read_buffer, context->getSchedulePool());
|
||||||
CacheCell cache_cell;
|
CacheCell cache_cell;
|
||||||
cache_cell.cache_controller = cache_controller;
|
cache_cell.cache_controller = cache_controller;
|
||||||
cache_cell.key_iterator = keys.insert(keys.end(), local_path);
|
cache_cell.key_iterator = keys.insert(keys.end(), local_path);
|
||||||
|
@ -16,6 +16,7 @@
|
|||||||
#include <IO/ReadBufferFromFileBase.h>
|
#include <IO/ReadBufferFromFileBase.h>
|
||||||
#include <IO/ReadSettings.h>
|
#include <IO/ReadSettings.h>
|
||||||
#include <IO/SeekableReadBuffer.h>
|
#include <IO/SeekableReadBuffer.h>
|
||||||
|
#include <IO/RemoteFileMetaDataBase.h>
|
||||||
#include <condition_variable>
|
#include <condition_variable>
|
||||||
#include <Interpreters/Context.h>
|
#include <Interpreters/Context.h>
|
||||||
|
|
||||||
@ -31,51 +32,20 @@ enum class RemoteReadBufferCacheError : int8_t
|
|||||||
END_OF_FILE = 20,
|
END_OF_FILE = 20,
|
||||||
};
|
};
|
||||||
|
|
||||||
struct RemoteFileMetadata
|
class RemoteCacheController
|
||||||
{
|
{
|
||||||
enum LocalStatus
|
public:
|
||||||
|
enum LocalFileStatus
|
||||||
{
|
{
|
||||||
TO_DOWNLOAD = 0,
|
TO_DOWNLOAD = 0,
|
||||||
DOWNLOADING = 1,
|
DOWNLOADING = 1,
|
||||||
DOWNLOADED = 2,
|
DOWNLOADED = 2,
|
||||||
};
|
};
|
||||||
RemoteFileMetadata(): last_modification_timestamp(0l), file_size(0), status(TO_DOWNLOAD){}
|
|
||||||
RemoteFileMetadata(
|
|
||||||
const String & schema_,
|
|
||||||
const String & cluster_,
|
|
||||||
const String & path_,
|
|
||||||
UInt64 last_modification_timestamp_,
|
|
||||||
size_t file_size_)
|
|
||||||
: schema(schema_)
|
|
||||||
, cluster(cluster_)
|
|
||||||
, remote_path(path_)
|
|
||||||
, last_modification_timestamp(last_modification_timestamp_)
|
|
||||||
, file_size(file_size_)
|
|
||||||
, status(TO_DOWNLOAD)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
bool load(const std::filesystem::path & local_path);
|
|
||||||
void save(const std::filesystem::path & local_path) const;
|
|
||||||
String toString() const;
|
|
||||||
|
|
||||||
String schema; // Hive, S2 etc.
|
|
||||||
String cluster;
|
|
||||||
String remote_path;
|
|
||||||
UInt64 last_modification_timestamp;
|
|
||||||
size_t file_size;
|
|
||||||
LocalStatus status;
|
|
||||||
};
|
|
||||||
|
|
||||||
class RemoteCacheController
|
|
||||||
{
|
|
||||||
public:
|
|
||||||
RemoteCacheController(
|
RemoteCacheController(
|
||||||
ContextPtr context,
|
RemoteFileMetaDataBasePtr file_meta_data_,
|
||||||
const RemoteFileMetadata & file_meta_data_,
|
|
||||||
const std::filesystem::path & local_path_,
|
const std::filesystem::path & local_path_,
|
||||||
size_t cache_bytes_before_flush_,
|
size_t cache_bytes_before_flush_);
|
||||||
std::shared_ptr<ReadBuffer> read_buffer_);
|
|
||||||
~RemoteCacheController();
|
~RemoteCacheController();
|
||||||
|
|
||||||
// recover from local disk
|
// recover from local disk
|
||||||
@ -97,7 +67,7 @@ public:
|
|||||||
{
|
{
|
||||||
std::lock_guard lock{mutex};
|
std::lock_guard lock{mutex};
|
||||||
//return opened_file_streams.empty() && remote_read_buffer == nullptr;
|
//return opened_file_streams.empty() && remote_read_buffer == nullptr;
|
||||||
return opened_file_buffer_refs.empty() && remote_read_buffer == nullptr;
|
return opened_file_buffer_refs.empty() && file_status == DOWNLOADED;
|
||||||
}
|
}
|
||||||
void close();
|
void close();
|
||||||
|
|
||||||
@ -111,9 +81,10 @@ public:
|
|||||||
inline size_t size() const { return current_offset; }
|
inline size_t size() const { return current_offset; }
|
||||||
|
|
||||||
inline const std::filesystem::path & getLocalPath() { return local_path; }
|
inline const std::filesystem::path & getLocalPath() { return local_path; }
|
||||||
inline const String & getRemotePath() const { return file_meta_data.remote_path; }
|
inline String getRemotePath() const { return file_meta_data_ptr->getRemotePath(); }
|
||||||
|
|
||||||
inline UInt64 getLastModificationTimestamp() const { return file_meta_data.last_modification_timestamp; }
|
inline UInt64 getLastModificationTimestamp() const { return file_meta_data_ptr->getLastModificationTimestamp(); }
|
||||||
|
bool checkFileChanged(RemoteFileMetaDataBasePtr file_meta_data_);
|
||||||
inline void markInvalid()
|
inline void markInvalid()
|
||||||
{
|
{
|
||||||
std::lock_guard lock(mutex);
|
std::lock_guard lock(mutex);
|
||||||
@ -124,29 +95,33 @@ public:
|
|||||||
std::lock_guard lock(mutex);
|
std::lock_guard lock(mutex);
|
||||||
return valid;
|
return valid;
|
||||||
}
|
}
|
||||||
const RemoteFileMetadata & getFileMetaData() { return file_meta_data; }
|
RemoteFileMetaDataBasePtr getFileMetaData() { return file_meta_data_ptr; }
|
||||||
|
|
||||||
|
void startBackgroundDownload(std::shared_ptr<ReadBuffer> input_readbuffer, BackgroundSchedulePool & thread_pool);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
// flush file and meta info into disk
|
// flush file and status information
|
||||||
void flush(bool need_flush_meta_data_ = false);
|
void flush(bool need_flush_status = false);
|
||||||
|
bool loadInnerInformation(const std::filesystem::path & file_path);
|
||||||
|
|
||||||
BackgroundSchedulePool::TaskHolder download_task_holder;
|
BackgroundSchedulePool::TaskHolder download_task_holder;
|
||||||
void backgroundDownload();
|
void backgroundDownload(std::shared_ptr<ReadBuffer> remote_read_buffer);
|
||||||
|
|
||||||
std::mutex mutex;
|
std::mutex mutex;
|
||||||
std::condition_variable more_data_signal;
|
std::condition_variable more_data_signal;
|
||||||
|
|
||||||
std::set<uintptr_t> opened_file_buffer_refs; // refer to a buffer address
|
std::set<uintptr_t> opened_file_buffer_refs; // refer to a buffer address
|
||||||
|
|
||||||
// meta info
|
String meta_data_class;
|
||||||
RemoteFileMetadata file_meta_data;
|
LocalFileStatus file_status = TO_DOWNLOAD; // for tracking download process
|
||||||
|
RemoteFileMetaDataBasePtr file_meta_data_ptr;
|
||||||
std::filesystem::path local_path;
|
std::filesystem::path local_path;
|
||||||
|
|
||||||
bool valid;
|
bool valid;
|
||||||
size_t local_cache_bytes_read_before_flush;
|
size_t local_cache_bytes_read_before_flush;
|
||||||
size_t current_offset;
|
size_t current_offset;
|
||||||
|
|
||||||
std::shared_ptr<ReadBuffer> remote_read_buffer;
|
//std::shared_ptr<ReadBuffer> remote_read_buffer;
|
||||||
std::unique_ptr<WriteBufferFromFileBase> data_file_writer;
|
std::unique_ptr<WriteBufferFromFileBase> data_file_writer;
|
||||||
|
|
||||||
Poco::Logger * log = &Poco::Logger::get("RemoteCacheController");
|
Poco::Logger * log = &Poco::Logger::get("RemoteCacheController");
|
||||||
@ -162,13 +137,13 @@ class RemoteReadBuffer : public BufferWithOwnMemory<SeekableReadBufferWithSize>
|
|||||||
public:
|
public:
|
||||||
explicit RemoteReadBuffer(size_t buff_size);
|
explicit RemoteReadBuffer(size_t buff_size);
|
||||||
~RemoteReadBuffer() override;
|
~RemoteReadBuffer() override;
|
||||||
static std::unique_ptr<RemoteReadBuffer> create(ContextPtr contex, const RemoteFileMetadata & remote_file_meta, std::unique_ptr<ReadBuffer> read_buffer);
|
static std::unique_ptr<RemoteReadBuffer> create(ContextPtr contex, RemoteFileMetaDataBasePtr remote_file_meta_data, std::unique_ptr<ReadBuffer> read_buffer);
|
||||||
|
|
||||||
bool nextImpl() override;
|
bool nextImpl() override;
|
||||||
inline bool seekable() { return !file_buffer && file_cache_controller->getFileMetaData().file_size > 0; }
|
inline bool seekable() { return !file_buffer && file_cache_controller->getFileMetaData()->getFileSize() > 0; }
|
||||||
off_t seek(off_t off, int whence) override;
|
off_t seek(off_t off, int whence) override;
|
||||||
off_t getPosition() override;
|
off_t getPosition() override;
|
||||||
std::optional<size_t> getTotalSize() override { return file_cache_controller->getFileMetaData().file_size; }
|
std::optional<size_t> getTotalSize() override { return file_cache_controller->getFileMetaData()->getFileSize(); }
|
||||||
|
|
||||||
private:
|
private:
|
||||||
std::shared_ptr<RemoteCacheController> file_cache_controller;
|
std::shared_ptr<RemoteCacheController> file_cache_controller;
|
||||||
@ -190,7 +165,7 @@ public:
|
|||||||
inline bool isInitialized() const { return initialized; }
|
inline bool isInitialized() const { return initialized; }
|
||||||
|
|
||||||
std::pair<RemoteCacheControllerPtr, RemoteReadBufferCacheError>
|
std::pair<RemoteCacheControllerPtr, RemoteReadBufferCacheError>
|
||||||
createReader(ContextPtr context, const RemoteFileMetadata & remote_file_meta, std::shared_ptr<ReadBuffer> & read_buffer);
|
createReader(ContextPtr context, RemoteFileMetaDataBasePtr remote_file_meta_data, std::shared_ptr<ReadBuffer> & read_buffer);
|
||||||
|
|
||||||
void updateTotalSize(size_t size) { total_size += size; }
|
void updateTotalSize(size_t size) { total_size += size; }
|
||||||
|
|
||||||
@ -217,7 +192,7 @@ private:
|
|||||||
std::list<String> keys;
|
std::list<String> keys;
|
||||||
std::map<String, CacheCell> caches;
|
std::map<String, CacheCell> caches;
|
||||||
|
|
||||||
String calculateLocalPath(const RemoteFileMetadata & meta) const;
|
String calculateLocalPath(RemoteFileMetaDataBasePtr meta) const;
|
||||||
|
|
||||||
BackgroundSchedulePool::TaskHolder recover_task_holder;
|
BackgroundSchedulePool::TaskHolder recover_task_holder;
|
||||||
void recoverTask();
|
void recoverTask();
|
||||||
|
53
src/Storages/Hive/HiveFileMetaData.cpp
Normal file
53
src/Storages/Hive/HiveFileMetaData.cpp
Normal file
@ -0,0 +1,53 @@
|
|||||||
|
#include <Storages/Hive/HiveFileMetaData.h>
|
||||||
|
#include <Common/Exception.h>
|
||||||
|
#include <Poco/JSON/JSON.h>
|
||||||
|
#include <Poco/JSON/Parser.h>
|
||||||
|
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
namespace ErrorCodes
|
||||||
|
{
|
||||||
|
extern const int BAD_ARGUMENTS;
|
||||||
|
}
|
||||||
|
|
||||||
|
HiveFileMetaData::~HiveFileMetaData() = default;
|
||||||
|
|
||||||
|
String HiveFileMetaData::toString()
|
||||||
|
{
|
||||||
|
Poco::JSON::Object jobj;
|
||||||
|
jobj.set("schema", schema);
|
||||||
|
jobj.set("cluster", cluster);
|
||||||
|
jobj.set("remote_path", remote_path);
|
||||||
|
jobj.set("last_modification_timestamp", last_modification_timestamp);
|
||||||
|
jobj.set("file_size", file_size);
|
||||||
|
std::stringstream buf; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
|
||||||
|
jobj.stringify(buf);
|
||||||
|
return buf.str();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
bool HiveFileMetaData::fromString(const String &buf)
|
||||||
|
{
|
||||||
|
std::stringstream istream; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
|
||||||
|
istream << buf;
|
||||||
|
Poco::JSON::Parser parser;
|
||||||
|
auto jobj = parser.parse(istream).extract<Poco::JSON::Object::Ptr>();
|
||||||
|
remote_path = jobj->get("remote_path").convert<String>();
|
||||||
|
schema = jobj->get("schema").convert<String>();
|
||||||
|
cluster = jobj->get("cluster").convert<String>();
|
||||||
|
last_modification_timestamp = jobj->get("last_modification_timestamp").convert<UInt64>();
|
||||||
|
file_size =jobj->get("file_size").convert<UInt64>();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool HiveFileMetaData::equal(RemoteFileMetaDataBasePtr meta_data)
|
||||||
|
{
|
||||||
|
auto real_meta_data = std::dynamic_pointer_cast<HiveFileMetaData>(meta_data);
|
||||||
|
if (!real_meta_data)
|
||||||
|
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid meta data class");
|
||||||
|
return last_modification_timestamp == real_meta_data->last_modification_timestamp;
|
||||||
|
}
|
||||||
|
|
||||||
|
REGISTTER_REMOTE_FILE_META_DATA_CLASS(HiveFileMetaData)
|
||||||
|
|
||||||
|
}
|
29
src/Storages/Hive/HiveFileMetaData.h
Normal file
29
src/Storages/Hive/HiveFileMetaData.h
Normal file
@ -0,0 +1,29 @@
|
|||||||
|
#pragma once
|
||||||
|
#include <IO/RemoteFileMetaDataBase.h>
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
class HiveFileMetaData : public RemoteFileMetaDataBase
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
HiveFileMetaData() = default;
|
||||||
|
HiveFileMetaData(const String & schema_,
|
||||||
|
const String & cluster_,
|
||||||
|
const String & remote_path_,
|
||||||
|
size_t file_size_,
|
||||||
|
UInt64 last_modification_timestamp_):
|
||||||
|
RemoteFileMetaDataBase(schema_, cluster_, remote_path_, file_size_, last_modification_timestamp_){}
|
||||||
|
~HiveFileMetaData() override;
|
||||||
|
|
||||||
|
String getClassName() override { return "HiveFileMetaData"; }
|
||||||
|
|
||||||
|
RemoteFileMetaDataBasePtr clone() override
|
||||||
|
{
|
||||||
|
auto result = std::make_shared<HiveFileMetaData>(schema, cluster, remote_path, file_size, last_modification_timestamp);
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
String toString() override;
|
||||||
|
bool fromString(const String &buf) override;
|
||||||
|
bool equal(RemoteFileMetaDataBasePtr meta_data) override;
|
||||||
|
|
||||||
|
};
|
||||||
|
}
|
@ -38,6 +38,7 @@
|
|||||||
#include <Storages/Hive/HiveFile.h>
|
#include <Storages/Hive/HiveFile.h>
|
||||||
#include <Storages/Hive/HiveSettings.h>
|
#include <Storages/Hive/HiveSettings.h>
|
||||||
#include <Storages/Hive/HiveCommon.h>
|
#include <Storages/Hive/HiveCommon.h>
|
||||||
|
#include <Storages/Hive/HiveFileMetaData.h>
|
||||||
#include <Storages/MergeTree/KeyCondition.h>
|
#include <Storages/MergeTree/KeyCondition.h>
|
||||||
#include <Storages/StorageFactory.h>
|
#include <Storages/StorageFactory.h>
|
||||||
|
|
||||||
@ -168,9 +169,10 @@ public:
|
|||||||
|
|
||||||
// Use local cache for remote filesystem if enabled.
|
// Use local cache for remote filesystem if enabled.
|
||||||
std::unique_ptr<ReadBuffer> remote_read_buf;
|
std::unique_ptr<ReadBuffer> remote_read_buf;
|
||||||
if (RemoteReadBufferCache::instance().isInitialized() && getContext()->getSettingsRef().use_local_cache_for_remote_fs)
|
bool x = false;
|
||||||
|
if (RemoteReadBufferCache::instance().isInitialized() && getContext()->getSettingsRef().use_local_cache_for_remote_fs && x)
|
||||||
remote_read_buf = RemoteReadBuffer::create(getContext(),
|
remote_read_buf = RemoteReadBuffer::create(getContext(),
|
||||||
{"Hive", getNameNodeCluster(hdfs_namenode_url), uri_with_path, curr_file->getLastModTs(), curr_file->getSize()},
|
std::make_shared<HiveFileMetaData>("Hive", getNameNodeCluster(hdfs_namenode_url), uri_with_path, curr_file->getLastModTs(), curr_file->getSize()),
|
||||||
std::move(raw_read_buf));
|
std::move(raw_read_buf));
|
||||||
else
|
else
|
||||||
remote_read_buf = std::move(raw_read_buf);
|
remote_read_buf = std::move(raw_read_buf);
|
||||||
|
Loading…
Reference in New Issue
Block a user