rewrite codes base on comment

This commit is contained in:
lgbo-ustc 2021-11-11 19:50:08 +08:00 committed by liangjiabiao
parent c99ae74d50
commit 9ad441c56d
3 changed files with 33 additions and 24 deletions

View File

@ -507,7 +507,7 @@ if (ThreadFuzzer::instance().isEffective())
global_context->addWarningMessage("ThreadFuzzer is enabled. Application will run slowly and unstable.");
if (config().has("local_cache_dir") && config().has("local_cache_quota"))
RemoteReadBufferCache::instance().initOnce(config().getString("local_cache_dir"), config().getUInt64("local_cache_quota"));
RemoteReadBufferCache::instance().initOnce(config().getString("local_cache_dir"), config().getUInt64("local_cache_quota"), config().getUInt64("local_cache_bytes_read_before_flush",DBMS_DEFAULT_BUFFER_SIZE));
#if defined(SANITIZER)
global_context->addWarningMessage("Server was built with sanitizer. It will work slowly.");

View File

@ -40,7 +40,7 @@ RemoteCacheController::recover(const std::string & local_path_, std::function<vo
auto schema = meta_jobj->get("schema").convert<std::string>();
auto cluster = meta_jobj->get("cluster").convert<std::string>();
auto downloaded = meta_jobj->get("downloaded").convert<std::string>();
auto mod_ts = meta_jobj->get("last_mod_ts").convert<UInt64>();
auto mod_ts = meta_jobj->get("last_modification_timestamp").convert<UInt64>();
if (downloaded == "false")
{
LOG_ERROR(&Poco::Logger::get("RemoteCacheController"), "not a downloaded file: " + local_path_);
@ -48,7 +48,7 @@ RemoteCacheController::recover(const std::string & local_path_, std::function<vo
}
auto size = std::filesystem::file_size(data_file);
auto cntrl = std::make_shared<RemoteCacheController>(schema, cluster, remote_path, mod_ts, local_path_, nullptr, finish_callback);
auto cntrl = std::make_shared<RemoteCacheController>(schema, cluster, remote_path, mod_ts, local_path_, 0, nullptr, finish_callback);
cntrl->download_finished = true;
cntrl->current_offset = size;
meta_fs.close();
@ -61,8 +61,9 @@ RemoteCacheController::RemoteCacheController(
const std::string & schema_,
const std::string & cluster_,
const std::string & path_,
UInt64 mod_ts_,
UInt64 ts_,
const std::string & local_path_,
size_t cache_bytes_before_flush_,
std::shared_ptr<ReadBuffer> readbuffer_,
std::function<void(RemoteCacheController *)> const & finish_callback)
{
@ -71,7 +72,8 @@ RemoteCacheController::RemoteCacheController(
cluster = cluster_;
local_path = local_path_;
remote_path = path_;
last_mod_ts = mod_ts_;
last_modification_timestamp = ts_;
local_cache_bytes_read_before_flush = cache_bytes_before_flush_;
valid = true;
if (readbuffer_ != nullptr)
{
@ -87,7 +89,7 @@ RemoteCacheController::RemoteCacheController(
jobj.set("cluster", cluster_);
jobj.set("remote_path", path_);
jobj.set("downloaded", "false");
jobj.set("last_mod_ts", mod_ts_);
jobj.set("last_modification_timestamp", ts_);
std::stringstream buf; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
jobj.stringify(buf);
Poco::FileOutputStream meta_file(local_path_ + "/meta.txt", std::ios::out);
@ -107,7 +109,6 @@ RemoteReadBufferCacheError RemoteCacheController::waitMoreData(size_t start_offs
{
download_thread->wait();
LOG_TRACE(&Poco::Logger::get("RemoteCacheController"), "try to release down thread");
delete download_thread;
download_thread = nullptr;
}
// finish reading
@ -133,18 +134,19 @@ RemoteReadBufferCacheError RemoteCacheController::waitMoreData(size_t start_offs
void RemoteCacheController::backgroupDownload(std::function<void(RemoteCacheController *)> const & finish_callback)
{
download_thread = new ThreadPool(1);
download_thread.reset(new ThreadPool(1));
auto task = [this, finish_callback]()
{
size_t n = 0;
size_t unflush_bytes = 0;
size_t total_bytes = 0;
while (!remote_readbuffer->eof())
{
size_t bytes = remote_readbuffer->buffer().end() - remote_readbuffer->position();
size_t bytes = remote_readbuffer->available();
out_file->write(remote_readbuffer->position(), bytes);
remote_readbuffer->position() += bytes;
total_bytes += bytes;
if (n++ % 10 == 0)
unflush_bytes += bytes;
if (unflush_bytes >= local_cache_bytes_read_before_flush)
{
std::unique_lock lock(mutex);
current_offset += total_bytes;
@ -152,12 +154,13 @@ void RemoteCacheController::backgroupDownload(std::function<void(RemoteCacheCont
flush();
lock.unlock();
more_data_signal.notify_all();
unflush_bytes = 0;
}
}
std::unique_lock lock(mutex);
current_offset += total_bytes;
download_finished = true;
flush();
flush(true);
out_file->close();
delete out_file;
out_file = nullptr;
@ -171,18 +174,21 @@ void RemoteCacheController::backgroupDownload(std::function<void(RemoteCacheCont
download_thread->scheduleOrThrow(task);
}
void RemoteCacheController::flush()
void RemoteCacheController::flush(bool need_flush_meta_)
{
if (out_file != nullptr)
{
out_file->flush();
}
if (!need_flush_meta_)
return;
Poco::JSON::Object jobj;
jobj.set("schema", schema);
jobj.set("cluster", cluster);
jobj.set("remote_path", remote_path);
jobj.set("downloaded", download_finished ? "true" : "false");
jobj.set("last_mod_ts", last_mod_ts);
jobj.set("last_modification_timestamp", last_modification_timestamp);
std::stringstream buf; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
jobj.stringify(buf);
@ -198,7 +204,6 @@ RemoteCacheController::~RemoteCacheController()
if (download_thread != nullptr)
{
download_thread->wait();
delete download_thread;
}
}
@ -412,12 +417,13 @@ RemoteReadBufferCache & RemoteReadBufferCache::instance()
return instance;
}
void RemoteReadBufferCache::initOnce(const std::string & dir, size_t limit_size_)
void RemoteReadBufferCache::initOnce(const std::string & dir, size_t limit_size_, size_t bytes_read_before_flush_)
{
LOG_TRACE(log, "init local cache. path: {}, limit {}", dir, limit_size_);
std::lock_guard lock(mutex);
local_path_prefix = dir;
limit_size = limit_size_;
local_cache_bytes_read_before_flush = bytes_read_before_flush_;
// scan local disk dir and recover the cache metas
std::filesystem::path root_dir(local_path_prefix);
@ -532,10 +538,10 @@ std::tuple<std::shared_ptr<LocalCachedFileReader>, RemoteReadBufferCacheError> R
return {nullptr, RemoteReadBufferCacheError::DISK_FULL};
}
std::filesystem::create_directory(local_path);
std::filesystem::create_directories(local_path);
auto callback = [this](RemoteCacheController * cntrl) { this->total_size += cntrl->size(); };
auto cache_cntrl = std::make_shared<RemoteCacheController>(schema, cluster, remote_path, mod_ts, local_path, readbuffer, callback);
auto cache_cntrl = std::make_shared<RemoteCacheController>(schema, cluster, remote_path, mod_ts, local_path, local_cache_bytes_read_before_flush, readbuffer, callback);
CacheCell cc;
cc.cache_controller = cache_cntrl;
cc.key_iterator = keys.insert(keys.end(), local_path);

View File

@ -34,8 +34,9 @@ public:
const std::string & schema_,
const std::string & cluster_,
const std::string & path_,
UInt64 mod_ts,
UInt64 ts,
const std::string & local_path_,
size_t cache_bytes_before_flush_,
std::shared_ptr<ReadBuffer> readbuffer_,
std::function<void(RemoteCacheController *)> const & finish_callback);
~RemoteCacheController(); // the local files will be deleted in descontructor
@ -74,7 +75,7 @@ public:
inline const std::string & getLocalPath() { return local_path; }
inline const std::string & getRemotePath() { return remote_path; }
inline UInt64 getLastModTS() const { return last_mod_ts; }
inline UInt64 getLastModTS() const { return last_modification_timestamp; }
inline void markInvalid()
{
std::lock_guard lock(mutex);
@ -88,13 +89,13 @@ public:
private:
// flush file and meta info into disk
void flush();
void flush(bool need_flush_meta_ = false);
void backgroupDownload(std::function<void(RemoteCacheController *)> const & finish_callback);
std::mutex mutex;
std::condition_variable more_data_signal;
ThreadPool * download_thread;
std::shared_ptr<ThreadPool> download_thread;
std::set<FILE *> opened_file_streams;
@ -102,12 +103,13 @@ private:
bool download_finished;
bool valid;
size_t current_offset;
UInt64 last_mod_ts;
UInt64 last_modification_timestamp;
std::string local_path;
std::string remote_path;
std::string schema;
std::string cluster;
size_t local_cache_bytes_read_before_flush;
std::shared_ptr<ReadBuffer> remote_readbuffer;
std::ofstream * out_file = nullptr;
};
@ -176,7 +178,7 @@ public:
// global instance
static RemoteReadBufferCache & instance();
void initOnce(const std::string & dir, size_t limit_size);
void initOnce(const std::string & dir, size_t limit_size, size_t bytes_read_before_flush_);
inline bool hasInitialized() const { return inited; }
std::tuple<std::shared_ptr<LocalCachedFileReader>, RemoteReadBufferCacheError> createReader(
@ -193,6 +195,7 @@ private:
std::atomic<bool> inited = false;
std::mutex mutex;
size_t limit_size = 0;
size_t local_cache_bytes_read_before_flush = 0;
std::atomic<size_t> total_size;
Poco::Logger * log = &Poco::Logger::get("RemoteReadBufferCache");