fix code style

This commit is contained in:
taiyang-li 2021-11-29 11:30:11 +08:00
parent df68b5b1a1
commit e67407b5c5
4 changed files with 88 additions and 89 deletions

View File

@ -1,16 +1,16 @@
#include <fstream>
#include <memory>
#include <unistd.h>
#include <functional>
#include <Common/SipHash.h>
#include <Common/hex.h>
#include <IO/RemoteReadBufferCache.h>
#include <IO/WriteHelpers.h>
#include <Poco/Logger.h>
#include <Poco/JSON/JSON.h>
#include <Poco/JSON/Parser.h>
#include <base/logger_useful.h>
#include "Common/Exception.h"
#include <fstream>
#include <memory>
#include <unistd.h>
#include <Common/SipHash.h>
#include <Common/hex.h>
#include <Common/Exception.h>
#include <IO/RemoteReadBufferCache.h>
#include <IO/WriteHelpers.h>
namespace DB
{
@ -21,10 +21,8 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
std::shared_ptr<RemoteCacheController>
RemoteCacheController::recover(
const std::filesystem::path & local_path_,
std::function<void(RemoteCacheController *)> const & finish_callback)
std::shared_ptr<RemoteCacheController> RemoteCacheController::recover(
const std::filesystem::path & local_path_, std::function<void(RemoteCacheController *)> const & finish_callback)
{
const auto & dir_handle = local_path_;
std::filesystem::path data_file = local_path_ / "data.bin";
@ -50,7 +48,7 @@ RemoteCacheController::recover(
}
auto file_size = std::filesystem::file_size(data_file);
RemoteFileMeta remote_file_meta (schema, cluster, remote_path, modification_ts, file_size);
RemoteFileMeta remote_file_meta(schema, cluster, remote_path, modification_ts, file_size);
auto cntrl = std::make_shared<RemoteCacheController>(remote_file_meta, local_path_, 0, nullptr, finish_callback);
cntrl->download_finished = true;
cntrl->current_offset = file_size;
@ -89,7 +87,7 @@ RemoteCacheController::RemoteCacheController(
jobj.set("remote_path", remote_path);
jobj.set("downloaded", "false");
jobj.set("last_modification_timestamp", last_modification_timestamp);
std::stringstream buf;// STYLE_CHECK_ALLOW_STD_STRING_STREAM
std::stringstream buf; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
jobj.stringify(buf);
std::ofstream meta_file(local_path_ / "meta.txt", std::ios::out);
meta_file.write(buf.str().c_str(), buf.str().size());
@ -127,8 +125,7 @@ RemoteReadBufferCacheError RemoteCacheController::waitMoreData(size_t start_offs
void RemoteCacheController::backgroupDownload(std::function<void(RemoteCacheController *)> const & finish_callback)
{
auto task = [this, finish_callback]()
{
auto task = [this, finish_callback]() {
size_t unflush_bytes = 0;
size_t total_bytes = 0;
while (!remote_readbuffer->eof())
@ -163,7 +160,9 @@ void RemoteCacheController::backgroupDownload(std::function<void(RemoteCacheCont
LOG_TRACE(
&Poco::Logger::get("RemoteCacheController"),
"finish download.{} into {}. size:{} ",
remote_path, local_path.string(), current_offset);
remote_path,
local_path.string(),
current_offset);
};
RemoteReadBufferCache::instance().GetThreadPool()->scheduleOrThrow(task);
}
@ -183,7 +182,7 @@ void RemoteCacheController::flush(bool need_flush_meta_)
jobj.set("remote_path", remote_path);
jobj.set("downloaded", download_finished ? "true" : "false");
jobj.set("last_modification_timestamp", last_modification_timestamp);
std::stringstream buf;// STYLE_CHECK_ALLOW_STD_STRING_STREAM
std::stringstream buf; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
jobj.stringify(buf);
std::ofstream meta_file(local_path / "meta.txt", std::ios::out);
@ -203,7 +202,7 @@ std::tuple<FILE *, std::filesystem::path> RemoteCacheController::allocFile()
{
std::filesystem::path result_local_path;
if (download_finished)
result_local_path = local_path / "data.bin";
result_local_path = local_path / "data.bin";
FILE * fs = fopen((local_path / "data.bin").string().c_str(), "r");
if (fs == nullptr)
return {fs, result_local_path};
@ -284,9 +283,7 @@ RemoteReadBuffer::RemoteReadBuffer(size_t buff_size) : BufferWithOwnMemory<Seeka
RemoteReadBuffer::~RemoteReadBuffer() = default;
std::unique_ptr<RemoteReadBuffer> RemoteReadBuffer::create(
const RemoteFileMeta &remote_file_meta_,
std::unique_ptr<ReadBuffer> readbuffer)
std::unique_ptr<RemoteReadBuffer> RemoteReadBuffer::create(const RemoteFileMeta & remote_file_meta_, std::unique_ptr<ReadBuffer> readbuffer)
{
size_t buff_size = DBMS_DEFAULT_BUFFER_SIZE;
if (readbuffer != nullptr)
@ -313,8 +310,7 @@ std::unique_ptr<RemoteReadBuffer> RemoteReadBuffer::create(
if (retry > 0)
usleep(20 * retry);
std::tie(rrb->file_reader, error)
= RemoteReadBufferCache::instance().createReader(remote_file_meta_, srb);
std::tie(rrb->file_reader, error) = RemoteReadBufferCache::instance().createReader(remote_file_meta_, srb);
retry++;
} while (error == RemoteReadBufferCacheError::FILE_INVALID && retry < 10);
if (rrb->file_reader == nullptr)
@ -403,10 +399,10 @@ RemoteReadBufferCache & RemoteReadBufferCache::instance()
}
void RemoteReadBufferCache::recoverCachedFilesMeta(
const std::filesystem::path & current_path,
size_t current_depth,
size_t max_depth,
std::function<void(RemoteCacheController *)> const & finish_callback)
const std::filesystem::path & current_path,
size_t current_depth,
size_t max_depth,
std::function<void(RemoteCacheController *)> const & finish_callback)
{
if (current_depth >= max_depth)
{
@ -416,24 +412,21 @@ void RemoteReadBufferCache::recoverCachedFilesMeta(
auto cache_controller = RemoteCacheController::recover(path, finish_callback);
if (!cache_controller)
continue;
auto &cell = caches[path];
auto & cell = caches[path];
cell.cache_controller = cache_controller;
cell.key_iterator = keys.insert(keys.end(), path);
}
return;
}
for (auto const &dir : std::filesystem::directory_iterator{current_path})
for (auto const & dir : std::filesystem::directory_iterator{current_path})
{
recoverCachedFilesMeta(dir.path(), current_depth + 1, max_depth, finish_callback);
}
}
void RemoteReadBufferCache::initOnce(const std::filesystem::path & dir,
size_t limit_size_,
size_t bytes_read_before_flush_,
size_t max_threads)
void RemoteReadBufferCache::initOnce(
const std::filesystem::path & dir, size_t limit_size_, size_t bytes_read_before_flush_, size_t max_threads)
{
LOG_TRACE(log, "init local cache. path: {}, limit {}", dir.string(), limit_size_);
local_path_prefix = dir;
@ -448,8 +441,7 @@ void RemoteReadBufferCache::initOnce(const std::filesystem::path & dir,
LOG_INFO(log, "{} not exists. this cache will be disable", local_path_prefix);
return;
}
auto recover_task = [this, root_dir]()
{
auto recover_task = [this, root_dir]() {
auto callback = [this](RemoteCacheController * cntrl) { this->total_size += cntrl->size(); };
std::lock_guard lock(this->mutex);
// two level dir. /<first 3 chars of path hash code>/<path hash code>
@ -465,12 +457,11 @@ std::filesystem::path RemoteReadBufferCache::calculateLocalPath(const RemoteFile
std::string full_path = meta.schema + ":" + meta.cluster + ":" + meta.path;
UInt128 hashcode = sipHash128(full_path.c_str(), full_path.size());
std::string hashcode_str = getHexUIntLowercase(hashcode);
return std::filesystem::path(local_path_prefix) / hashcode_str.substr(0,3) / hashcode_str;
return std::filesystem::path(local_path_prefix) / hashcode_str.substr(0, 3) / hashcode_str;
}
std::tuple<std::shared_ptr<LocalCachedFileReader>, RemoteReadBufferCacheError> RemoteReadBufferCache::createReader(
const RemoteFileMeta &remote_file_meta,
std::shared_ptr<ReadBuffer> & readbuffer)
std::tuple<std::shared_ptr<LocalCachedFileReader>, RemoteReadBufferCacheError>
RemoteReadBufferCache::createReader(const RemoteFileMeta & remote_file_meta, std::shared_ptr<ReadBuffer> & readbuffer)
{
// If something is wrong on startup, rollback to read from the original ReadBuffer
if (!hasInitialized())
@ -490,8 +481,10 @@ std::tuple<std::shared_ptr<LocalCachedFileReader>, RemoteReadBufferCacheError> R
// if the file has been update on remote side, we need to redownload it
if (cache_iter->second.cache_controller->getLastModificationTimestamp() != last_modification_timestamp)
{
LOG_TRACE(log,
"remote file has been updated. " + remote_path + ":" + std::to_string(cache_iter->second.cache_controller->getLastModificationTimestamp()) + "->"
LOG_TRACE(
log,
"remote file has been updated. " + remote_path + ":"
+ std::to_string(cache_iter->second.cache_controller->getLastModificationTimestamp()) + "->"
+ std::to_string(last_modification_timestamp));
cache_iter->second.cache_controller->markInvalid();
}
@ -499,7 +492,9 @@ std::tuple<std::shared_ptr<LocalCachedFileReader>, RemoteReadBufferCacheError> R
{
// move the key to the list end
keys.splice(keys.end(), keys, cache_iter->second.key_iterator);
return {std::make_shared<LocalCachedFileReader>(cache_iter->second.cache_controller.get(), file_size), RemoteReadBufferCacheError::OK};
return {
std::make_shared<LocalCachedFileReader>(cache_iter->second.cache_controller.get(), file_size),
RemoteReadBufferCacheError::OK};
}
}
@ -511,7 +506,9 @@ std::tuple<std::shared_ptr<LocalCachedFileReader>, RemoteReadBufferCacheError> R
{
// move the key to the list end, this case should not happen?
keys.splice(keys.end(), keys, cache_iter->second.key_iterator);
return {std::make_shared<LocalCachedFileReader>(cache_iter->second.cache_controller.get(), file_size), RemoteReadBufferCacheError::OK};
return {
std::make_shared<LocalCachedFileReader>(cache_iter->second.cache_controller.get(), file_size),
RemoteReadBufferCacheError::OK};
}
else
{
@ -530,7 +527,8 @@ std::tuple<std::shared_ptr<LocalCachedFileReader>, RemoteReadBufferCacheError> R
std::filesystem::create_directories(local_path);
auto callback = [this](RemoteCacheController * cntrl) { this->total_size += cntrl->size(); };
auto cache_cntrl = std::make_shared<RemoteCacheController>(remote_file_meta, local_path, local_cache_bytes_read_before_flush, readbuffer, callback);
auto cache_cntrl
= std::make_shared<RemoteCacheController>(remote_file_meta, local_path, local_cache_bytes_read_before_flush, readbuffer, callback);
CacheCell cc;
cc.cache_controller = cache_cntrl;
cc.key_iterator = keys.insert(keys.end(), local_path);
@ -548,7 +546,8 @@ bool RemoteReadBufferCache::clearLocalCache()
if (!cntrl->isValid() && cntrl->closable())
{
LOG_TRACE(log, "clear invalid cache: " + *it);
total_size = total_size > cache_it->second.cache_controller->size() ? total_size - cache_it->second.cache_controller->size() : 0;
total_size
= total_size > cache_it->second.cache_controller->size() ? total_size - cache_it->second.cache_controller->size() : 0;
cntrl->close();
it = keys.erase(it);
caches.erase(cache_it);
@ -568,12 +567,18 @@ bool RemoteReadBufferCache::clearLocalCache()
}
if (cache_it->second.cache_controller->closable())
{
total_size = total_size > cache_it->second.cache_controller->size() ? total_size - cache_it->second.cache_controller->size() : 0;
total_size
= total_size > cache_it->second.cache_controller->size() ? total_size - cache_it->second.cache_controller->size() : 0;
cache_it->second.cache_controller->close();
caches.erase(cache_it);
it = keys.erase(it);
LOG_TRACE(log, "clear local file {} for {}. key size:{}. next{}", cache_it->second.cache_controller->getLocalPath().string(),
cache_it->second.cache_controller->getRemotePath(), keys.size(), *it);
LOG_TRACE(
log,
"clear local file {} for {}. key size:{}. next{}",
cache_it->second.cache_controller->getLocalPath().string(),
cache_it->second.cache_controller->getRemotePath(),
keys.size(),
*it);
}
else
break;

View File

@ -14,7 +14,7 @@
namespace DB
{
enum class RemoteReadBufferCacheError :int8_t
enum class RemoteReadBufferCacheError : int8_t
{
OK,
NOT_INIT = 10,
@ -27,17 +27,14 @@ enum class RemoteReadBufferCacheError :int8_t
struct RemoteFileMeta
{
RemoteFileMeta(
const std::string & schema_,
const std::string & cluster_,
const std::string & path_,
UInt64 last_modification_timestamp_,
size_t file_size_):
schema(schema_),
cluster(cluster_),
path(path_),
last_modification_timestamp(last_modification_timestamp_),
file_size(file_size_)
{}
const std::string & schema_,
const std::string & cluster_,
const std::string & path_,
UInt64 last_modification_timestamp_,
size_t file_size_)
: schema(schema_), cluster(cluster_), path(path_), last_modification_timestamp(last_modification_timestamp_), file_size(file_size_)
{
}
std::string schema; // Hive, S2 etc.
std::string cluster;
@ -53,7 +50,7 @@ class RemoteCacheController
{
public:
RemoteCacheController(
const RemoteFileMeta &meta,
const RemoteFileMeta & meta,
const std::filesystem::path & local_path_,
size_t cache_bytes_before_flush_,
std::shared_ptr<ReadBuffer> readbuffer_,
@ -167,9 +164,7 @@ class RemoteReadBuffer : public BufferWithOwnMemory<SeekableReadBuffer>
public:
explicit RemoteReadBuffer(size_t buff_size);
~RemoteReadBuffer() override;
static std::unique_ptr<RemoteReadBuffer> create(
const RemoteFileMeta &remote_file_meta_,
std::unique_ptr<ReadBuffer> readbuffer);
static std::unique_ptr<RemoteReadBuffer> create(const RemoteFileMeta & remote_file_meta_, std::unique_ptr<ReadBuffer> readbuffer);
bool nextImpl() override;
inline bool seekable() { return file_reader != nullptr && file_reader->size() > 0; }
@ -191,14 +186,13 @@ public:
~RemoteReadBufferCache();
// global instance
static RemoteReadBufferCache & instance();
std::shared_ptr<FreeThreadPool> GetThreadPool(){ return threadPool; }
std::shared_ptr<FreeThreadPool> GetThreadPool() { return threadPool; }
void initOnce(const std::filesystem::path & dir, size_t limit_size, size_t bytes_read_before_flush_, size_t max_threads);
inline bool hasInitialized() const { return inited; }
std::tuple<std::shared_ptr<LocalCachedFileReader>, RemoteReadBufferCacheError> createReader(
const RemoteFileMeta & remote_file_meta,
std::shared_ptr<ReadBuffer> & readbuffer);
std::tuple<std::shared_ptr<LocalCachedFileReader>, RemoteReadBufferCacheError>
createReader(const RemoteFileMeta & remote_file_meta, std::shared_ptr<ReadBuffer> & readbuffer);
private:
std::string local_path_prefix;
@ -219,13 +213,13 @@ private:
std::list<std::string> keys;
std::map<std::string, CacheCell> caches;
std::filesystem::path calculateLocalPath(const RemoteFileMeta &meta);
std::filesystem::path calculateLocalPath(const RemoteFileMeta & meta);
void recoverCachedFilesMeta(
const std::filesystem::path & current_path,
size_t current_depth,
size_t max_depth,
std::function<void(RemoteCacheController *)> const & finish_callback);
const std::filesystem::path & current_path,
size_t current_depth,
size_t max_depth,
std::function<void(RemoteCacheController *)> const & finish_callback);
bool clearLocalCache();
};

View File

@ -9,7 +9,7 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
std::shared_ptr<HiveMetastoreClient::HiveTableMeta> HiveMetastoreClient::getTableMeta(const std::string & db_name, const std::string & table_name)
std::shared_ptr<HiveMetastoreClient::HiveTableMetadata> HiveMetastoreClient::getTableMetadata(const std::string & db_name, const std::string & table_name)
{
LOG_TRACE(log, "get table meta:" + db_name + ":" + table_name);
std::lock_guard lock{mutex};
@ -32,7 +32,7 @@ std::shared_ptr<HiveMetastoreClient::HiveTableMeta> HiveMetastoreClient::getTabl
}
std::string cache_key = db_name + "." + table_name;
std::shared_ptr<HiveMetastoreClient::HiveTableMeta> result = table_meta_cache.get(cache_key);
std::shared_ptr<HiveMetastoreClient::HiveTableMetadata> result = table_meta_cache.get(cache_key);
bool update_cache = false;
std::map<std::string, PartitionInfo> old_partition_infos;
std::map<std::string, PartitionInfo> partition_infos;
@ -49,15 +49,15 @@ std::shared_ptr<HiveMetastoreClient::HiveTableMeta> HiveMetastoreClient::getTabl
for (const auto & partition : partitions)
{
auto & pinfo = partition_infos[partition.sd.location];
pinfo.partition = partition;
auto & partition_info = partition_infos[partition.sd.location];
partition_info.partition = partition;
// query files under the partition by hdfs api is costly, we reuse the files in case the partition has no change
if (result)
{
auto it = old_partition_infos.find(partition.sd.location);
if (it != old_partition_infos.end() && it->second.equal(partition))
pinfo.files = it->second.files;
partition_info.files = it->second.files;
else
update_cache = true;
}
@ -72,7 +72,7 @@ std::shared_ptr<HiveMetastoreClient::HiveTableMeta> HiveMetastoreClient::getTabl
if (update_cache)
{
LOG_INFO(log, "reload hive partition meta info:" + db_name + ":" + table_name);
result = std::make_shared<HiveMetastoreClient::HiveTableMeta>(db_name, table_name, table, std::move(partition_infos), getContext());
result = std::make_shared<HiveMetastoreClient::HiveTableMetadata>(db_name, table_name, table, std::move(partition_infos), getContext());
table_meta_cache.set(cache_key, result);
}
return result;
@ -82,7 +82,7 @@ void HiveMetastoreClient::clearTableMeta(const std::string & db_name, const std:
{
std::lock_guard lock{mutex};
std::string cache_key = db_name + "." + table_name;
std::shared_ptr<HiveMetastoreClient::HiveTableMeta> meta = table_meta_cache.get(cache_key);
std::shared_ptr<HiveMetastoreClient::HiveTableMetadata> meta = table_meta_cache.get(cache_key);
if (meta)
table_meta_cache.set(cache_key, nullptr);
}
@ -107,7 +107,7 @@ bool HiveMetastoreClient::PartitionInfo::equal(const Apache::Hadoop::Hive::Parti
return (it1 == partition.parameters.end() && it2 == other.parameters.end());
}
std::vector<Apache::Hadoop::Hive::Partition> HiveMetastoreClient::HiveTableMeta::getPartitions()
std::vector<Apache::Hadoop::Hive::Partition> HiveMetastoreClient::HiveTableMetadata::getPartitions()
{
std::vector<Apache::Hadoop::Hive::Partition> result;
@ -117,7 +117,7 @@ std::vector<Apache::Hadoop::Hive::Partition> HiveMetastoreClient::HiveTableMeta:
return result;
}
std::vector<HiveMetastoreClient::FileInfo> HiveMetastoreClient::HiveTableMeta::getLocationFiles(const std::string & location)
std::vector<HiveMetastoreClient::FileInfo> HiveMetastoreClient::HiveTableMetadata::getLocationFiles(const std::string & location)
{
std::map<std::string, PartitionInfo>::const_iterator it;
if (!empty_partition_keys)
@ -153,7 +153,7 @@ std::vector<HiveMetastoreClient::FileInfo> HiveMetastoreClient::HiveTableMeta::g
return result;
}
std::vector<HiveMetastoreClient::FileInfo> HiveMetastoreClient::HiveTableMeta::getLocationFiles(const HDFSFSPtr & fs, const std::string & location)
std::vector<HiveMetastoreClient::FileInfo> HiveMetastoreClient::HiveTableMetadata::getLocationFiles(const HDFSFSPtr & fs, const std::string & location)
{
std::map<std::string, PartitionInfo>::const_iterator it;
if (!empty_partition_keys)

View File

@ -37,10 +37,10 @@ public:
};
// use for speeding up query metadata
struct HiveTableMeta : public WithContext
struct HiveTableMetadata : public WithContext
{
public:
HiveTableMeta(
HiveTableMetadata(
const std::string & db_name_,
const std::string & table_name_,
std::shared_ptr<Apache::Hadoop::Hive::Table> table_,
@ -79,7 +79,7 @@ public:
{
}
std::shared_ptr<HiveTableMeta> getTableMeta(const std::string & db_name, const std::string & table_name);
std::shared_ptr<HiveTableMetadata> getTableMetadata(const std::string & db_name, const std::string & table_name);
void clearTableMeta(const std::string & db_name, const std::string & table_name);
void setClient(std::shared_ptr<Apache::Hadoop::Hive::ThriftHiveMetastoreClient> client_);
inline bool isExpired() const { return expired; }
@ -88,7 +88,7 @@ public:
private:
std::shared_ptr<Apache::Hadoop::Hive::ThriftHiveMetastoreClient> client;
LRUCache<std::string, HiveTableMeta> table_meta_cache;
LRUCache<std::string, HiveTableMetadata> table_meta_cache;
mutable std::mutex mutex;
std::atomic<bool> expired{false};