fix code style

This commit is contained in:
taiyang-li 2021-11-29 16:22:43 +08:00
parent 83be8d28e6
commit c7a0fe467f
6 changed files with 123 additions and 98 deletions

View File

@ -518,12 +518,21 @@ if (ThreadFuzzer::instance().isEffective())
config().getUInt("thread_pool_queue_size", 10000)
);
if (config().has("local_cache_dir") && config().has("local_cache_quota"))
RemoteReadBufferCache::instance().initOnce(
config().getString("local_cache_dir"),
config().getUInt64("local_cache_quota"),
config().getUInt64("local_cache_bytes_read_before_flush",DBMS_DEFAULT_BUFFER_SIZE),
config().getUInt64("local_cache_max_threads", 1000));
/// Set up the process-wide local cache used for reads from remote filesystems.
/// Nothing is initialized unless the `local_cache_for_remote_fs` section exists
/// and its `enable` flag (default: false) is set.
if (config().has("local_cache_for_remote_fs") && config().getBool("local_cache_for_remote_fs.enable", false))
{
    /// `root_dir` and `limit_size` are read without a fallback value;
    /// the remaining two options fall back to the defaults given here.
    const String cache_root = config().getString("local_cache_for_remote_fs.root_dir");
    const UInt64 cache_limit = config().getUInt64("local_cache_for_remote_fs.limit_size");
    const UInt64 flush_threshold
        = config().getUInt64("local_cache_for_remote_fs.bytes_read_before_flush", DBMS_DEFAULT_BUFFER_SIZE);
    const auto cache_threads = config().getUInt("local_cache_for_remote_fs.max_threads", 64);

    RemoteReadBufferCache::instance().initOnce(cache_root, cache_limit, flush_threshold, cache_threads);
}
ConnectionCollector::init(global_context, config().getUInt("max_threads_for_connection_collector", 10));

View File

@ -1253,4 +1253,12 @@
</tables>
</rocksdb>
-->
<!-- Local cache for data read from remote filesystems.
     The size limit must be given as <limit_size>: the server reads
     local_cache_for_remote_fs.limit_size (no default) and does not look at <quota>. -->
<local_cache_for_remote_fs>
    <enable>true</enable>
    <root_dir>/var/lib/clickhouse/local_cache</root_dir>
    <limit_size>53687091200</limit_size>
    <bytes_read_before_flush>1048576</bytes_read_before_flush>
    <max_threads>1024</max_threads>
</local_cache_for_remote_fs>
</clickhouse>

View File

@ -533,6 +533,7 @@ class IColumn;
\
M(Bool, force_remove_data_recursively_on_drop, false, "Recursively remove data on DROP query. Avoids 'Directory not empty' error, but may silently remove detached data", 0) \
M(Bool, check_table_dependencies, true, "Check that DDL query (such as DROP TABLE or RENAME) will not break dependencies", 0) \
M(Bool, use_local_cache_for_remote_fs, true, "Use local cache for remote filesystem like HDFS or S3", 0) \
\
/** Experimental functions */ \
M(Bool, allow_experimental_funnel_functions, false, "Enable experimental functions for funnel analysis.", 0) \

View File

@ -20,34 +20,35 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
extern const int BAD_GET;
extern const int LOGICAL_ERROR;
extern const int CANNOT_CREATE_DIRECTORY;
}
std::shared_ptr<RemoteCacheController> RemoteCacheController::recover(
const std::filesystem::path & local_path_, std::function<void(RemoteCacheController *)> const & finish_callback)
const String & local_path_, std::function<void(RemoteCacheController *)> const & finish_callback)
{
std::filesystem::path data_file = local_path_ / "data.bin";
std::filesystem::path meta_file = local_path_ / "meta.txt";
fs::path data_file = fs::path(local_path_) / "data.bin";
fs::path meta_file = fs::path(local_path_) / "meta.txt";
auto * log = &Poco::Logger::get("RemoteCacheController");
if (!std::filesystem::exists(data_file) || !std::filesystem::exists(meta_file))
if (!fs::exists(data_file) || !fs::exists(meta_file))
{
LOG_ERROR(log, "Directory {} or file {}, {} does not exist", local_path_.string(), data_file.string(), meta_file.string());
LOG_ERROR(log, "Directory {} or file {}, {} does not exist", local_path_, data_file.string(), meta_file.string());
return nullptr;
}
std::ifstream meta_fs(meta_file);
Poco::JSON::Parser meta_parser;
auto meta_jobj = meta_parser.parse(meta_fs).extract<Poco::JSON::Object::Ptr>();
auto remote_path = meta_jobj->get("remote_path").convert<std::string>();
auto schema = meta_jobj->get("schema").convert<std::string>();
auto cluster = meta_jobj->get("cluster").convert<std::string>();
auto downloaded = meta_jobj->get("downloaded").convert<std::string>();
auto remote_path = meta_jobj->get("remote_path").convert<String>();
auto schema = meta_jobj->get("schema").convert<String>();
auto cluster = meta_jobj->get("cluster").convert<String>();
auto downloaded = meta_jobj->get("downloaded").convert<String>();
auto modification_ts = meta_jobj->get("last_modification_timestamp").convert<UInt64>();
if (downloaded == "false")
{
LOG_ERROR(log, "Local metadata for path {} exists, but the data was not downloaded", local_path_.string());
LOG_ERROR(log, "Local metadata for path {} exists, but the data was not downloaded", local_path_);
return nullptr;
}
auto file_size = std::filesystem::file_size(data_file);
auto file_size = fs::file_size(data_file);
RemoteFileMetadata remote_file_meta(schema, cluster, remote_path, modification_ts, file_size);
auto cache_controller = std::make_shared<RemoteCacheController>(remote_file_meta, local_path_, 0, nullptr, finish_callback);
@ -61,7 +62,7 @@ std::shared_ptr<RemoteCacheController> RemoteCacheController::recover(
RemoteCacheController::RemoteCacheController(
const RemoteFileMetadata & remote_file_meta,
const std::filesystem::path & local_path_,
const String & local_path_,
size_t cache_bytes_before_flush_,
std::shared_ptr<ReadBuffer> readbuffer_,
std::function<void(RemoteCacheController *)> const & finish_callback)
@ -79,7 +80,7 @@ RemoteCacheController::RemoteCacheController(
if (remote_readbuffer)
{
// setup local files
out_file = std::make_unique<std::ofstream>(local_path_ / "data.bin", std::ios::out | std::ios::binary);
out_file = std::make_unique<std::ofstream>(fs::path(local_path_) / "data.bin", std::ios::out | std::ios::binary);
out_file->flush();
Poco::JSON::Object jobj;
@ -90,7 +91,7 @@ RemoteCacheController::RemoteCacheController(
jobj.set("last_modification_timestamp", last_modify_time);
std::stringstream buf; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
jobj.stringify(buf);
std::ofstream meta_file(local_path_ / "meta.txt", std::ios::out);
std::ofstream meta_file(fs::path(local_path_) / "meta.txt", std::ios::out);
meta_file.write(buf.str().c_str(), buf.str().size());
meta_file.close();
@ -126,7 +127,7 @@ RemoteReadBufferCacheError RemoteCacheController::waitMoreData(size_t start_offs
void RemoteCacheController::backgroundDownload(std::function<void(RemoteCacheController *)> const & finish_callback)
{
auto task = [this, finish_callback]()
auto task = [this, finish_callback]()
{
size_t before_unflush_bytes = 0;
size_t total_bytes = 0;
@ -159,7 +160,7 @@ void RemoteCacheController::backgroundDownload(std::function<void(RemoteCacheCon
lock.unlock();
more_data_signal.notify_all();
finish_callback(this);
LOG_TRACE(log, "finish download.{} into {}. size:{} ", remote_path, local_path.string(), current_offset);
LOG_TRACE(log, "finish download.{} into {}. size:{} ", remote_path, local_path, current_offset);
};
RemoteReadBufferCache::instance().getThreadPool()->scheduleOrThrow(task);
}
@ -182,7 +183,7 @@ void RemoteCacheController::flush(bool need_flush_meta_)
std::stringstream buf; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
jobj.stringify(buf);
std::ofstream meta_file(local_path / "meta.txt", std::ios::out);
std::ofstream meta_file(fs::path(local_path) / "meta.txt", std::ios::out);
meta_file << buf.str();
meta_file.close();
}
@ -192,17 +193,17 @@ RemoteCacheController::~RemoteCacheController() = default;
void RemoteCacheController::close()
{
// delete directory
LOG_TRACE(log, "Removing all local cache for remote path: {}, local path: {}", remote_path, local_path.string());
std::filesystem::remove_all(local_path);
LOG_TRACE(log, "Removing all local cache for remote path: {}, local path: {}", remote_path, local_path);
fs::remove_all(local_path);
}
std::pair<FILE *, std::filesystem::path> RemoteCacheController::allocFile()
std::pair<FILE *, String> RemoteCacheController::allocFile()
{
std::filesystem::path result_local_path;
fs::path result_local_path;
if (download_finished)
result_local_path = local_path / "data.bin";
result_local_path = fs::path(local_path) / "data.bin";
FILE * fs = fopen((local_path / "data.bin").string().c_str(), "r");
FILE * fs = fopen((fs::path(local_path) / "data.bin").string().c_str(), "r");
if (fs == nullptr)
throw Exception("alloc file failed.", ErrorCodes::BAD_GET);
@ -226,19 +227,19 @@ void RemoteCacheController::deallocFile(FILE * fs)
fclose(fs);
}
LocalCachedFileReader::LocalCachedFileReader(RemoteCacheController * cntrl_, size_t size_)
: offset(0), file_size(size_), fs(nullptr), controller(cntrl_)
LocalCachedFileReader::LocalCachedFileReader(RemoteCacheController * cache_controller_, size_t size_)
: offset(0), file_size(size_), fs(nullptr), cache_controller(cache_controller_)
{
std::tie(fs, local_path) = controller->allocFile();
std::tie(fs, local_path) = cache_controller->allocFile();
}
LocalCachedFileReader::~LocalCachedFileReader()
{
controller->deallocFile(fs);
cache_controller->deallocFile(fs);
}
size_t LocalCachedFileReader::read(char * buf, size_t size)
{
auto wret = controller->waitMoreData(offset, offset + size);
auto wret = cache_controller->waitMoreData(offset, offset + size);
if (wret != RemoteReadBufferCacheError::OK)
return 0;
std::lock_guard lock(mutex);
@ -249,7 +250,7 @@ size_t LocalCachedFileReader::read(char * buf, size_t size)
off_t LocalCachedFileReader::seek(off_t off)
{
controller->waitMoreData(off, 1);
cache_controller->waitMoreData(off, 1);
std::lock_guard lock(mutex);
auto ret = fseek(fs, off, SEEK_SET);
offset = off;
@ -269,7 +270,7 @@ size_t LocalCachedFileReader::size()
return 0;
}
auto ret = std::filesystem::file_size(local_path);
auto ret = fs::file_size(local_path);
file_size = ret;
return ret;
}
@ -281,7 +282,7 @@ RemoteReadBuffer::RemoteReadBuffer(size_t buff_size) : BufferWithOwnMemory<Seeka
RemoteReadBuffer::~RemoteReadBuffer() = default;
std::unique_ptr<RemoteReadBuffer> RemoteReadBuffer::create(const RemoteFileMetadata & remote_file_meta_, std::unique_ptr<ReadBuffer> read_buffer)
std::unique_ptr<RemoteReadBuffer> RemoteReadBuffer::create(const RemoteFileMetadata & remote_file_meta, std::unique_ptr<ReadBuffer> read_buffer)
{
auto * log = &Poco::Logger::get("RemoteReadBuffer");
size_t buff_size = DBMS_DEFAULT_BUFFER_SIZE;
@ -298,7 +299,7 @@ std::unique_ptr<RemoteReadBuffer> RemoteReadBuffer::create(const RemoteFileMetad
if (buff_size == 0)
buff_size = DBMS_DEFAULT_BUFFER_SIZE;
const auto & remote_path = remote_file_meta_.path;
const auto & remote_path = remote_file_meta.path;
auto remote_read_buffer = std::make_unique<RemoteReadBuffer>(buff_size);
auto * raw_rbp = read_buffer.release();
std::shared_ptr<ReadBuffer> srb(raw_rbp);
@ -309,7 +310,7 @@ std::unique_ptr<RemoteReadBuffer> RemoteReadBuffer::create(const RemoteFileMetad
if (retry > 0)
sleepForMicroseconds(20 * retry);
std::tie(remote_read_buffer->file_reader, error) = RemoteReadBufferCache::instance().createReader(remote_file_meta_, srb);
std::tie(remote_read_buffer->file_reader, error) = RemoteReadBufferCache::instance().createReader(remote_file_meta, srb);
retry++;
} while (error == RemoteReadBufferCacheError::FILE_INVALID && retry < 10);
if (remote_read_buffer->file_reader == nullptr)
@ -397,14 +398,14 @@ RemoteReadBufferCache & RemoteReadBufferCache::instance()
}
void RemoteReadBufferCache::recoverCachedFilesMeta(
const std::filesystem::path & current_path,
const fs::path & current_path,
size_t current_depth,
size_t max_depth,
std::function<void(RemoteCacheController *)> const & finish_callback)
{
if (current_depth >= max_depth)
{
for (auto const & dir : std::filesystem::directory_iterator{current_path})
for (auto const & dir : fs::directory_iterator{current_path})
{
std::string path = dir.path();
auto cache_controller = RemoteCacheController::recover(path, finish_callback);
@ -417,36 +418,35 @@ void RemoteReadBufferCache::recoverCachedFilesMeta(
return;
}
for (auto const & dir : std::filesystem::directory_iterator{current_path})
for (auto const & dir : fs::directory_iterator{current_path})
{
recoverCachedFilesMeta(dir.path(), current_depth + 1, max_depth, finish_callback);
}
}
void RemoteReadBufferCache::initOnce(
const std::filesystem::path & dir, size_t limit_size_, size_t bytes_read_before_flush_, size_t max_threads)
const String & root_dir_, size_t limit_size_, size_t bytes_read_before_flush_, size_t max_threads_)
{
LOG_INFO(
log,
"Initializing local cache for remote data sources. Local cache root path: {}, cache size limit: {}",
dir.string(),
limit_size_);
local_path_prefix = dir;
log, "Initializing local cache for remote data sources. Local cache root path: {}, cache size limit: {}", root_dir_, limit_size_);
root_dir = root_dir_;
limit_size = limit_size_;
local_cache_bytes_read_before_flush = bytes_read_before_flush_;
thread_pool = std::make_shared<FreeThreadPool>(max_threads, 1000, 1000, false);
thread_pool = std::make_shared<FreeThreadPool>(max_threads_, 1000, 1000, false);
// scan local disk dir and recover the cache metas
std::filesystem::path root_dir(local_path_prefix);
if (!std::filesystem::exists(root_dir))
/// Create root_dir if it does not exist yet.
if (!fs::exists(fs::path(root_dir) / ""))
{
LOG_INFO(log, "Path {} not exists. this cache will be disable", local_path_prefix);
return;
std::error_code ec;
bool success = fs::create_directories(fs::path(root_dir) / "", ec);
if (!success)
throw Exception(
ErrorCodes::CANNOT_CREATE_DIRECTORY, "Failed to create directories, error code:{} reason:{}", ec.value(), ec.message());
}
auto recover_task = [this, root_dir]() {
auto callback = [this](RemoteCacheController * cntrl) { total_size += cntrl->size(); };
auto recover_task = [this]() {
auto callback = [this](RemoteCacheController * cache_controller) { total_size += cache_controller->size(); };
std::lock_guard lock(mutex);
// two level dir. /<first 3 chars of path hash code>/<path hash code>
recoverCachedFilesMeta(root_dir, 1, 2, callback);
initialized = true;
LOG_TRACE(log, "Recovered from disk ");
@ -454,12 +454,12 @@ void RemoteReadBufferCache::initOnce(
getThreadPool()->scheduleOrThrow(recover_task);
}
std::filesystem::path RemoteReadBufferCache::calculateLocalPath(const RemoteFileMetadata & meta)
String RemoteReadBufferCache::calculateLocalPath(const RemoteFileMetadata & meta) const
{
std::string full_path = meta.schema + ":" + meta.cluster + ":" + meta.path;
String full_path = meta.schema + ":" + meta.cluster + ":" + meta.path;
UInt128 hashcode = sipHash128(full_path.c_str(), full_path.size());
std::string hashcode_str = getHexUIntLowercase(hashcode);
return std::filesystem::path(local_path_prefix) / hashcode_str.substr(0, 3) / hashcode_str;
String hashcode_str = getHexUIntLowercase(hashcode);
return fs::path(root_dir) / hashcode_str.substr(0, 3) / hashcode_str;
}
std::pair<std::shared_ptr<LocalCachedFileReader>, RemoteReadBufferCacheError>
@ -527,7 +527,7 @@ RemoteReadBufferCache::createReader(const RemoteFileMetadata & remote_file_meta,
return {nullptr, RemoteReadBufferCacheError::DISK_FULL};
}
std::filesystem::create_directories(local_path);
fs::create_directories(local_path);
auto callback = [this](RemoteCacheController * cntrl) { total_size += cntrl->size(); };
auto cache_controller
@ -579,7 +579,7 @@ bool RemoteReadBufferCache::clearLocalCache()
LOG_TRACE(
log,
"clear local file {} for {}. key size:{}. next{}",
cache_it->second.cache_controller->getLocalPath().string(),
cache_it->second.cache_controller->getLocalPath(),
cache_it->second.cache_controller->getRemotePath(),
keys.size(),
*it);

View File

@ -12,6 +12,8 @@
#include <IO/SeekableReadBuffer.h>
#include <condition_variable>
namespace fs = std::filesystem;
namespace DB
{
enum class RemoteReadBufferCacheError : int8_t
@ -20,16 +22,15 @@ enum class RemoteReadBufferCacheError : int8_t
NOT_INIT = 10,
DISK_FULL = 11,
FILE_INVALID = 12,
END_OF_FILE = 20,
};
struct RemoteFileMetadata
{
RemoteFileMetadata(
const std::string & schema_,
const std::string & cluster_,
const std::string & path_,
const String & schema_,
const String & cluster_,
const String & path_,
UInt64 last_modify_time_,
size_t file_size_)
: schema(schema_)
@ -40,9 +41,9 @@ struct RemoteFileMetadata
{
}
std::string schema; // Hive, S2 etc.
std::string cluster;
std::string path;
String schema; // Hive, S2 etc.
String cluster;
String path;
UInt64 last_modify_time;
size_t file_size;
};
@ -52,7 +53,7 @@ class RemoteCacheController
public:
RemoteCacheController(
const RemoteFileMetadata & meta,
const std::filesystem::path & local_path_,
const String & local_path_,
size_t cache_bytes_before_flush_,
std::shared_ptr<ReadBuffer> readbuffer_,
std::function<void(RemoteCacheController *)> const & finish_callback);
@ -60,14 +61,14 @@ public:
// recover from local disk
static std::shared_ptr<RemoteCacheController>
recover(const std::filesystem::path & local_path, std::function<void(RemoteCacheController *)> const & finish_callback);
recover(const String & local_path, std::function<void(RemoteCacheController *)> const & finish_callback);
/**
* Called by LocalCachedFileReader, must be used in pair
* The second value of the return tuple is the local_path to store file.
* It will be empty if the file has not been downloaded
*/
std::pair<FILE *, std::filesystem::path> allocFile();
std::pair<FILE *, String> allocFile();
void deallocFile(FILE * fs_);
/**
@ -90,8 +91,8 @@ public:
inline size_t size() const { return current_offset; }
inline const std::filesystem::path & getLocalPath() { return local_path; }
inline const std::string & getRemotePath() { return remote_path; }
inline String getLocalPath() const { return local_path; }
inline String getRemotePath() const { return remote_path; }
inline UInt64 getLastModificationTimestamp() const { return last_modify_time; }
inline void markInvalid()
@ -117,10 +118,10 @@ private:
std::set<FILE *> opened_file_streams;
// meta info
std::string schema;
std::string cluster;
std::string remote_path;
std::filesystem::path local_path;
String schema;
String cluster;
String remote_path;
String local_path;
UInt64 last_modify_time;
bool valid;
@ -140,7 +141,7 @@ private:
class LocalCachedFileReader
{
public:
LocalCachedFileReader(RemoteCacheController * cntrl_, size_t size_);
LocalCachedFileReader(RemoteCacheController * cache_controller_, size_t size_);
~LocalCachedFileReader();
// expect to read size bytes into buf, return is the real bytes read
@ -148,15 +149,15 @@ public:
inline off_t getOffset() const { return static_cast<off_t>(offset); }
size_t size();
off_t seek(off_t offset);
inline std::string getPath() { return local_path; }
inline String getPath() const { return local_path; }
private:
std::mutex mutex;
size_t offset;
size_t file_size;
FILE * fs;
std::filesystem::path local_path;
RemoteCacheController * controller;
String local_path;
RemoteCacheController * cache_controller;
Poco::Logger * log = &Poco::Logger::get("RemoteReadBufferCache");
};
@ -170,7 +171,7 @@ class RemoteReadBuffer : public BufferWithOwnMemory<SeekableReadBuffer>
public:
explicit RemoteReadBuffer(size_t buff_size);
~RemoteReadBuffer() override;
static std::unique_ptr<RemoteReadBuffer> create(const RemoteFileMetadata & remote_file_meta_, std::unique_ptr<ReadBuffer> readbuffer);
static std::unique_ptr<RemoteReadBuffer> create(const RemoteFileMetadata & remote_file_meta, std::unique_ptr<ReadBuffer> read_buffer);
bool nextImpl() override;
inline bool seekable() { return file_reader != nullptr && file_reader->size() > 0; }
@ -185,41 +186,47 @@ private:
class RemoteReadBufferCache
{
protected:
RemoteReadBufferCache();
public:
~RemoteReadBufferCache();
// global instance
static RemoteReadBufferCache & instance();
std::shared_ptr<FreeThreadPool> getThreadPool() { return thread_pool; }
void initOnce(const std::filesystem::path & dir, size_t limit_size, size_t bytes_read_before_flush_, size_t max_threads);
void initOnce(const String & root_dir_, size_t limit_size_, size_t bytes_read_before_flush_, size_t max_threads_);
inline bool isInitialized() const { return initialized; }
std::pair<std::shared_ptr<LocalCachedFileReader>, RemoteReadBufferCacheError>
createReader(const RemoteFileMetadata & remote_file_meta, std::shared_ptr<ReadBuffer> & read_buffer);
void updateTotalSize(size_t size) { total_size += size; }
protected:
RemoteReadBufferCache();
private:
std::string local_path_prefix;
// root directory of local cache for remote filesystem
String root_dir;
size_t limit_size = 0;
size_t local_cache_bytes_read_before_flush = 0;
std::shared_ptr<FreeThreadPool> thread_pool;
std::atomic<bool> initialized = false;
std::mutex mutex;
size_t limit_size = 0;
size_t local_cache_bytes_read_before_flush = 0;
std::atomic<size_t> total_size;
std::mutex mutex;
Poco::Logger * log = &Poco::Logger::get("RemoteReadBufferCache");
struct CacheCell
{
std::list<std::string>::iterator key_iterator;
std::list<String>::iterator key_iterator;
std::shared_ptr<RemoteCacheController> cache_controller;
};
std::list<std::string> keys;
std::map<std::string, CacheCell> caches;
std::list<String> keys;
std::map<String, CacheCell> caches;
std::filesystem::path calculateLocalPath(const RemoteFileMetadata & meta);
String calculateLocalPath(const RemoteFileMetadata & meta) const;
void recoverCachedFilesMeta(
const std::filesystem::path & current_path,

View File

@ -27,7 +27,7 @@ namespace DB
template <class FieldType, class StatisticsType>
Range createRangeFromOrcStatistics(const StatisticsType * stats)
{
/// We must check if there are minimum or maximum values in statistics in case of
/// We must check if there are minimum or maximum values in statistics in case of
/// null values or NaN/Inf values of double type.
if (stats->hasMinimum() && stats->hasMaximum())
{
@ -50,7 +50,7 @@ Range createRangeFromOrcStatistics(const StatisticsType * stats)
template <class FieldType, class StatisticsType>
Range createRangeFromParquetStatistics(std::shared_ptr<StatisticsType> stats)
{
/// We must check if there are minimum or maximum values in statistics in case of
/// We must check if there are minimum or maximum values in statistics in case of
/// null values or NaN/Inf values of double type.
if (!stats->HasMinMax())
return Range();