mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-14 10:22:10 +00:00
Merge branch 'hive_table' of https://github.com/bigo-sg/ClickHouse into bigo_hive_table
This commit is contained in:
commit
b0fa7d86cb
2
contrib/base64
vendored
2
contrib/base64
vendored
@ -1 +1 @@
|
||||
Subproject commit af9b331f2b4f30b41c70f3a571ff904a8251c1d3
|
||||
Subproject commit 9499e0c4945589973b9ea1bc927377cfbc84aa46
|
2
contrib/libhdfs3
vendored
2
contrib/libhdfs3
vendored
@ -1 +1 @@
|
||||
Subproject commit a8c37ee001af1ae88e5dfa637ae5b31b087c96d3
|
||||
Subproject commit 9194af44588633c1b2dae44bf945804401ff883e
|
2
contrib/replxx
vendored
2
contrib/replxx
vendored
@ -1 +1 @@
|
||||
Subproject commit 68410ac01dfb4f09ea76120ac5a2cecda3943aaf
|
||||
Subproject commit f019cba7ea1bcd1b4feb7826f28ed57fb581b04c
|
2
contrib/sysroot
vendored
2
contrib/sysroot
vendored
@ -1 +1 @@
|
||||
Subproject commit 1a64956aa7c280448be6526251bb2b8e6d380ab1
|
||||
Subproject commit 4ef348b7f30f2ad5b02b266268b3c948e51ad457
|
@ -27,57 +27,80 @@ namespace ErrorCodes
|
||||
extern const int CANNOT_CREATE_DIRECTORY;
|
||||
}
|
||||
|
||||
std::shared_ptr<RemoteCacheController> RemoteCacheController::recover(
|
||||
const String & local_path_, std::function<void(RemoteCacheController *)> const & finish_callback)
|
||||
bool RemoteFileMetadata::load(const std::filesystem::path & local_path)
|
||||
{
|
||||
fs::path data_file = fs::path(local_path_) / "data.bin";
|
||||
fs::path meta_file = fs::path(local_path_) / "meta.txt";
|
||||
auto * log = &Poco::Logger::get("RemoteCacheController");
|
||||
if (!fs::exists(data_file) || !fs::exists(meta_file))
|
||||
auto log = &Poco::Logger::get("RemoteFileMetadata");
|
||||
if (!std::filesystem::exists(local_path))
|
||||
{
|
||||
LOG_ERROR(log, "Directory {} or file {}, {} does not exist", local_path_, data_file.string(), meta_file.string());
|
||||
return nullptr;
|
||||
LOG_ERROR(log, "file path not exists:{}", local_path.string());
|
||||
return false;
|
||||
}
|
||||
|
||||
std::ifstream meta_fs(meta_file);
|
||||
Poco::JSON::Parser meta_parser;
|
||||
auto meta_jobj = meta_parser.parse(meta_fs).extract<Poco::JSON::Object::Ptr>();
|
||||
auto remote_path = meta_jobj->get("remote_path").convert<String>();
|
||||
auto schema = meta_jobj->get("schema").convert<String>();
|
||||
auto cluster = meta_jobj->get("cluster").convert<String>();
|
||||
auto downloaded = meta_jobj->get("downloaded").convert<String>();
|
||||
auto modification_ts = meta_jobj->get("last_modification_timestamp").convert<UInt64>();
|
||||
if (downloaded == "false")
|
||||
{
|
||||
LOG_ERROR(log, "Local metadata for local path {} exists, but the data was not downloaded", local_path_);
|
||||
return nullptr;
|
||||
}
|
||||
auto file_size = fs::file_size(data_file);
|
||||
|
||||
RemoteFileMetadata remote_file_meta(schema, cluster, remote_path, modification_ts, file_size);
|
||||
auto cache_controller = std::make_shared<RemoteCacheController>(remote_file_meta, local_path_, 0, nullptr, finish_callback);
|
||||
cache_controller->download_finished = true;
|
||||
cache_controller->current_offset = file_size;
|
||||
std::ifstream meta_fs(local_path.string());
|
||||
Poco::JSON::Parser meta_data_parser;
|
||||
auto meta_data_jobj = meta_data_parser.parse(meta_fs).extract<Poco::JSON::Object::Ptr>();
|
||||
remote_path = meta_data_jobj->get("remote_path").convert<String>();
|
||||
schema = meta_data_jobj->get("schema").convert<String>();
|
||||
cluster = meta_data_jobj->get("cluster").convert<String>();
|
||||
status = static_cast<LocalStatus>(meta_data_jobj->get("status").convert<Int32>());
|
||||
last_modification_timestamp = meta_data_jobj->get("last_modification_timestamp").convert<UInt64>();
|
||||
file_size = meta_data_jobj->get("file_size").convert<UInt64>();
|
||||
meta_fs.close();
|
||||
|
||||
finish_callback(cache_controller.get());
|
||||
return true;
|
||||
}
|
||||
|
||||
void RemoteFileMetadata::save(const std::filesystem::path & local_path)
|
||||
{
|
||||
std::ofstream meta_file(local_path.string(), std::ios::out);
|
||||
meta_file << toString();
|
||||
meta_file.close();
|
||||
}
|
||||
/// Render the metadata as a JSON object string.
/// Keys written: "schema", "cluster", "remote_path", "status" (stored as Int32),
/// "last_modification_timestamp" and "file_size" — the same keys load() reads back.
String RemoteFileMetadata::toString()
{
    Poco::JSON::Object json;
    json.set("schema", schema);
    json.set("cluster", cluster);
    json.set("remote_path", remote_path);
    // The enum is persisted as its integer value.
    json.set("status", static_cast<Int32>(status));
    json.set("last_modification_timestamp", last_modification_timestamp);
    json.set("file_size", file_size);

    std::ostringstream out; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
    json.stringify(out);
    return out.str();
}
|
||||
|
||||
std::shared_ptr<RemoteCacheController> RemoteCacheController::recover(const std::filesystem::path & local_path_)
|
||||
{
|
||||
|
||||
if (!std::filesystem::exists(local_path_) || !std::filesystem::exists(local_path_ / "data.bin"))
|
||||
{
|
||||
LOG_TRACE(&Poco::Logger::get("RemoteCacheController"), "Invalid cached directory:{}", local_path_.string());
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
RemoteFileMetadata remote_file_meta_data;
|
||||
if (!remote_file_meta_data.load(local_path_ / "meta.txt") || remote_file_meta_data.status != RemoteFileMetadata::DOWNLOADED)
|
||||
{
|
||||
LOG_INFO(&Poco::Logger::get("RemoteCacheController"), "recover cached file failed. local path:{}, file meta data:", local_path_.string(), remote_file_meta_data.toString());
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
auto cache_controller = std::make_shared<RemoteCacheController>(remote_file_meta_data, local_path_, 0, nullptr);
|
||||
cache_controller->current_offset = remote_file_meta_data.file_size;
|
||||
|
||||
RemoteReadBufferCache::instance().updateTotalSize(cache_controller->file_meta_data.file_size);
|
||||
return cache_controller;
|
||||
}
|
||||
|
||||
RemoteCacheController::RemoteCacheController(
|
||||
const RemoteFileMetadata & remote_file_meta,
|
||||
const String & local_path_,
|
||||
const RemoteFileMetadata & file_meta_data_,
|
||||
const std::filesystem::path & local_path_,
|
||||
size_t cache_bytes_before_flush_,
|
||||
std::shared_ptr<ReadBuffer> read_buffer_,
|
||||
std::function<void(RemoteCacheController *)> const & finish_callback)
|
||||
: schema(remote_file_meta.schema)
|
||||
, cluster(remote_file_meta.cluster)
|
||||
, remote_path(remote_file_meta.path)
|
||||
std::shared_ptr<ReadBuffer> read_buffer_)
|
||||
: file_meta_data(file_meta_data_)
|
||||
, local_path(local_path_)
|
||||
, last_modify_time(remote_file_meta.last_modify_time)
|
||||
, valid(true)
|
||||
, local_cache_bytes_read_before_flush(cache_bytes_before_flush_)
|
||||
, download_finished(false)
|
||||
, current_offset(0)
|
||||
, remote_read_buffer(read_buffer_)
|
||||
{
|
||||
@ -88,26 +111,16 @@ RemoteCacheController::RemoteCacheController(
|
||||
out_file = std::make_unique<std::ofstream>(fs::path(local_path_) / "data.bin", std::ios::out | std::ios::binary);
|
||||
out_file->flush();
|
||||
|
||||
Poco::JSON::Object jobj;
|
||||
jobj.set("schema", schema);
|
||||
jobj.set("cluster", cluster);
|
||||
jobj.set("remote_path", remote_path);
|
||||
jobj.set("downloaded", "false");
|
||||
jobj.set("last_modification_timestamp", last_modify_time);
|
||||
std::stringstream buf; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
|
||||
jobj.stringify(buf);
|
||||
std::ofstream meta_file(fs::path(local_path_) / "meta.txt", std::ios::out);
|
||||
meta_file.write(buf.str().c_str(), buf.str().size());
|
||||
meta_file.close();
|
||||
file_meta_data.save(local_path_ / "meta.txt");
|
||||
|
||||
backgroundDownload(finish_callback);
|
||||
backgroundDownload();
|
||||
}
|
||||
}
|
||||
|
||||
RemoteReadBufferCacheError RemoteCacheController::waitMoreData(size_t start_offset_, size_t end_offset_)
|
||||
{
|
||||
std::unique_lock lock{mutex};
|
||||
if (download_finished)
|
||||
if (file_meta_data.status == RemoteFileMetadata::DOWNLOADED)
|
||||
{
|
||||
// finish reading
|
||||
if (start_offset_ >= current_offset)
|
||||
@ -124,16 +137,17 @@ RemoteReadBufferCacheError RemoteCacheController::waitMoreData(size_t start_offs
|
||||
return RemoteReadBufferCacheError::OK;
|
||||
}
|
||||
else
|
||||
more_data_signal.wait(lock, [this, end_offset_] { return download_finished || current_offset >= end_offset_; });
|
||||
more_data_signal.wait(lock, [this, end_offset_] { return this->file_meta_data.status == RemoteFileMetadata::DOWNLOADED || current_offset >= end_offset_; });
|
||||
}
|
||||
lock.unlock();
|
||||
return RemoteReadBufferCacheError::OK;
|
||||
}
|
||||
|
||||
void RemoteCacheController::backgroundDownload(std::function<void(RemoteCacheController *)> const & finish_callback)
|
||||
void RemoteCacheController::backgroundDownload()
|
||||
{
|
||||
auto task = [this, finish_callback]()
|
||||
auto task = [this]()
|
||||
{
|
||||
file_meta_data.status = RemoteFileMetadata::DOWNLOADING;
|
||||
size_t before_unflush_bytes = 0;
|
||||
size_t total_bytes = 0;
|
||||
while (!remote_read_buffer->eof())
|
||||
@ -157,59 +171,50 @@ void RemoteCacheController::backgroundDownload(std::function<void(RemoteCacheCon
|
||||
}
|
||||
std::unique_lock lock(mutex);
|
||||
current_offset += total_bytes;
|
||||
download_finished = true;
|
||||
file_meta_data.status = RemoteFileMetadata::DOWNLOADED;
|
||||
flush(true);
|
||||
out_file->close();
|
||||
out_file.reset();
|
||||
remote_read_buffer.reset();
|
||||
lock.unlock();
|
||||
more_data_signal.notify_all();
|
||||
finish_callback(this);
|
||||
LOG_TRACE(log, "Finish download from remote path: {} to local path: {}, file size:{} ", remote_path, local_path, current_offset);
|
||||
RemoteReadBufferCache::instance().updateTotalSize(file_meta_data.file_size);
|
||||
LOG_TRACE(log, "Finish download into local path: {}, file meta data:{} ", local_path.string(), file_meta_data.toString());
|
||||
};
|
||||
RemoteReadBufferCache::instance().getThreadPool()->scheduleOrThrow(task);
|
||||
}
|
||||
|
||||
void RemoteCacheController::flush(bool need_flush_meta_)
|
||||
void RemoteCacheController::flush(bool need_flush_meta_data_)
|
||||
{
|
||||
if (out_file)
|
||||
{
|
||||
out_file->flush();
|
||||
}
|
||||
|
||||
if (!need_flush_meta_)
|
||||
if (!need_flush_meta_data_)
|
||||
return;
|
||||
Poco::JSON::Object jobj;
|
||||
jobj.set("schema", schema);
|
||||
jobj.set("cluster", cluster);
|
||||
jobj.set("remote_path", remote_path);
|
||||
jobj.set("downloaded", download_finished ? "true" : "false");
|
||||
jobj.set("last_modification_timestamp", last_modify_time);
|
||||
std::stringstream buf; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
|
||||
jobj.stringify(buf);
|
||||
|
||||
std::ofstream meta_file(fs::path(local_path) / "meta.txt", std::ios::out);
|
||||
meta_file << buf.str();
|
||||
meta_file.close();
|
||||
file_meta_data.save(local_path / "meta.txt");
|
||||
}
|
||||
|
||||
RemoteCacheController::~RemoteCacheController() = default;
|
||||
|
||||
void RemoteCacheController::close()
|
||||
{
|
||||
LOG_TRACE(log, "Removing all local cache for remote path: {}, local path: {}", remote_path, local_path);
|
||||
fs::remove_all(local_path);
|
||||
// delete directory
|
||||
LOG_TRACE(log, "Removing all local cache. local path: {}, file meta data:{}", local_path.string(), file_meta_data.toString());
|
||||
std::filesystem::remove_all(local_path);
|
||||
}
|
||||
|
||||
std::pair<FILE *, String> RemoteCacheController::allocFile()
|
||||
{
|
||||
fs::path result_local_path;
|
||||
if (download_finished)
|
||||
result_local_path = fs::path(local_path) / "data.bin";
|
||||
std::filesystem::path result_local_path;
|
||||
if (file_meta_data.status == RemoteFileMetadata::DOWNLOADED)
|
||||
result_local_path = local_path / "data.bin";
|
||||
|
||||
FILE * fs = fopen((fs::path(local_path) / "data.bin").string().c_str(), "r");
|
||||
if (!fs)
|
||||
throw Exception(ErrorCodes::BAD_GET, "Alloc file failed, error code: {} local path: {}", errno, local_path);
|
||||
throw Exception(ErrorCodes::BAD_GET, "Alloc file failed, error code: {} local path: {}", errno, local_path.string());
|
||||
|
||||
std::lock_guard lock{mutex};
|
||||
opened_file_streams.insert(fs);
|
||||
@ -225,8 +230,8 @@ void RemoteCacheController::deallocFile(FILE * file_stream)
|
||||
throw Exception(
|
||||
ErrorCodes::BAD_ARGUMENTS,
|
||||
"Try to deallocate file with invalid handler remote path: {}, local path: {}",
|
||||
remote_path,
|
||||
local_path);
|
||||
file_meta_data.remote_path,
|
||||
local_path.string());
|
||||
opened_file_streams.erase(it);
|
||||
}
|
||||
fclose(file_stream);
|
||||
@ -308,7 +313,7 @@ std::unique_ptr<RemoteReadBuffer> RemoteReadBuffer::create(const RemoteFileMetad
|
||||
if (buff_size == 0)
|
||||
buff_size = DBMS_DEFAULT_BUFFER_SIZE;
|
||||
|
||||
const auto & remote_path = remote_file_meta.path;
|
||||
const auto & remote_path = remote_file_meta.remote_path;
|
||||
auto remote_read_buffer = std::make_unique<RemoteReadBuffer>(buff_size);
|
||||
auto * raw_rbp = read_buffer.release();
|
||||
std::shared_ptr<ReadBuffer> srb(raw_rbp);
|
||||
@ -401,18 +406,17 @@ RemoteReadBufferCache & RemoteReadBufferCache::instance()
|
||||
return instance;
|
||||
}
|
||||
|
||||
void RemoteReadBufferCache::recoverCachedFilesMeta(
|
||||
void RemoteReadBufferCache::recoverCachedFilesMetaData(
|
||||
const fs::path & current_path,
|
||||
size_t current_depth,
|
||||
size_t max_depth,
|
||||
std::function<void(RemoteCacheController *)> const & finish_callback)
|
||||
size_t max_depth)
|
||||
{
|
||||
if (current_depth >= max_depth)
|
||||
{
|
||||
for (auto const & dir : fs::directory_iterator{current_path})
|
||||
{
|
||||
std::string path = dir.path();
|
||||
auto cache_controller = RemoteCacheController::recover(path, finish_callback);
|
||||
String path = dir.path();
|
||||
auto cache_controller = RemoteCacheController::recover(path);
|
||||
if (!cache_controller)
|
||||
continue;
|
||||
auto & cell = caches[path];
|
||||
@ -424,10 +428,19 @@ void RemoteReadBufferCache::recoverCachedFilesMeta(
|
||||
|
||||
for (auto const & dir : fs::directory_iterator{current_path})
|
||||
{
|
||||
recoverCachedFilesMeta(dir.path(), current_depth + 1, max_depth, finish_callback);
|
||||
recoverCachedFilesMetaData(dir.path(), current_depth + 1, max_depth);
|
||||
}
|
||||
}
|
||||
|
||||
void RemoteReadBufferCache::recoverTask()
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
recoverCachedFilesMetaData(root_dir, 1, 2);
|
||||
initialized = true;
|
||||
LOG_TRACE(log, "Recovered from directory:{}", root_dir);
|
||||
|
||||
}
|
||||
|
||||
void RemoteReadBufferCache::initOnce(
|
||||
const String & root_dir_, size_t limit_size_, size_t bytes_read_before_flush_, size_t max_threads_)
|
||||
{
|
||||
@ -448,20 +461,12 @@ void RemoteReadBufferCache::initOnce(
|
||||
ErrorCodes::CANNOT_CREATE_DIRECTORY, "Failed to create directories, error code:{} reason:{}", ec.value(), ec.message());
|
||||
}
|
||||
|
||||
auto recover_task = [this]()
|
||||
{
|
||||
auto callback = [this](RemoteCacheController * cache_controller) { total_size += cache_controller->size(); };
|
||||
std::lock_guard lock(mutex);
|
||||
recoverCachedFilesMeta(root_dir, 1, 2, callback);
|
||||
initialized = true;
|
||||
LOG_TRACE(log, "Recovered from directory:{}", root_dir);
|
||||
};
|
||||
getThreadPool()->scheduleOrThrow(recover_task);
|
||||
getThreadPool()->scheduleOrThrow([this]{recoverTask();});
|
||||
}
|
||||
|
||||
String RemoteReadBufferCache::calculateLocalPath(const RemoteFileMetadata & meta) const
|
||||
{
|
||||
String full_path = meta.schema + ":" + meta.cluster + ":" + meta.path;
|
||||
String full_path = meta.schema + ":" + meta.cluster + ":" + meta.remote_path;
|
||||
UInt128 hashcode = sipHash128(full_path.c_str(), full_path.size());
|
||||
String hashcode_str = getHexUIntLowercase(hashcode);
|
||||
return fs::path(root_dir) / hashcode_str.substr(0, 3) / hashcode_str;
|
||||
@ -470,6 +475,7 @@ String RemoteReadBufferCache::calculateLocalPath(const RemoteFileMetadata & meta
|
||||
std::pair<std::shared_ptr<LocalCachedFileReader>, RemoteReadBufferCacheError>
|
||||
RemoteReadBufferCache::createReader(const RemoteFileMetadata & remote_file_meta, std::shared_ptr<ReadBuffer> & read_buffer)
|
||||
{
|
||||
LOG_TRACE(log, "createReader. {} {} {}", remote_file_meta.remote_path, remote_file_meta.last_modification_timestamp, remote_file_meta.file_size);
|
||||
// If something is wrong on startup, rollback to read from the original ReadBuffer
|
||||
if (!isInitialized())
|
||||
{
|
||||
@ -477,9 +483,9 @@ RemoteReadBufferCache::createReader(const RemoteFileMetadata & remote_file_meta,
|
||||
return {nullptr, RemoteReadBufferCacheError::NOT_INIT};
|
||||
}
|
||||
|
||||
auto remote_path = remote_file_meta.path;
|
||||
auto remote_path = remote_file_meta.remote_path;
|
||||
const auto & file_size = remote_file_meta.file_size;
|
||||
const auto & last_modification_timestamp = remote_file_meta.last_modify_time;
|
||||
const auto & last_modification_timestamp = remote_file_meta.last_modification_timestamp;
|
||||
auto local_path = calculateLocalPath(remote_file_meta);
|
||||
std::lock_guard lock(mutex);
|
||||
auto cache_iter = caches.find(local_path);
|
||||
@ -506,6 +512,7 @@ RemoteReadBufferCache::createReader(const RemoteFileMetadata & remote_file_meta,
|
||||
}
|
||||
}
|
||||
|
||||
LOG_TRACE(log, "not found cache:{}", local_path);
|
||||
auto clear_ret = clearLocalCache();
|
||||
cache_iter = caches.find(local_path);
|
||||
if (cache_iter != caches.end())
|
||||
@ -534,9 +541,8 @@ RemoteReadBufferCache::createReader(const RemoteFileMetadata & remote_file_meta,
|
||||
|
||||
fs::create_directories(local_path);
|
||||
|
||||
auto callback = [this](RemoteCacheController * cntrl) { total_size += cntrl->size(); };
|
||||
auto cache_controller
|
||||
= std::make_shared<RemoteCacheController>(remote_file_meta, local_path, local_cache_bytes_read_before_flush, read_buffer, callback);
|
||||
= std::make_shared<RemoteCacheController>(remote_file_meta, local_path, local_cache_bytes_read_before_flush, read_buffer);
|
||||
CacheCell cache_cell;
|
||||
cache_cell.cache_controller = cache_controller;
|
||||
cache_cell.key_iterator = keys.insert(keys.end(), local_path);
|
||||
@ -547,10 +553,13 @@ RemoteReadBufferCache::createReader(const RemoteFileMetadata & remote_file_meta,
|
||||
|
||||
bool RemoteReadBufferCache::clearLocalCache()
|
||||
{
|
||||
// clear closable cache from the list head
|
||||
for (auto it = keys.begin(); it != keys.end();)
|
||||
{
|
||||
// TODO keys is not thread-safe
|
||||
auto cache_it = caches.find(*it);
|
||||
if (cache_it == caches.end())
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Found no entry in local cache with key: {}", *it);
|
||||
|
||||
auto cache_controller = cache_it->second.cache_controller;
|
||||
if (!cache_controller->isValid() && cache_controller->closable())
|
||||
{
|
||||
@ -560,36 +569,26 @@ bool RemoteReadBufferCache::clearLocalCache()
|
||||
cache_controller->close();
|
||||
it = keys.erase(it);
|
||||
caches.erase(cache_it);
|
||||
continue;
|
||||
}
|
||||
else
|
||||
it++;
|
||||
}
|
||||
// clear closable cache from the list head
|
||||
for (auto it = keys.begin(); it != keys.end();)
|
||||
{
|
||||
if (total_size < limit_size)
|
||||
break;
|
||||
auto cache_it = caches.find(*it);
|
||||
if (cache_it == caches.end())
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Found no entry in local cache with key: {}", *it);
|
||||
|
||||
if (cache_it->second.cache_controller->closable())
|
||||
// If enough disk space has already been released, just keep iterating over the remaining caches and clear the invalid ones.
|
||||
if (total_size > limit_size && cache_controller->closable())
|
||||
{
|
||||
total_size
|
||||
= total_size > cache_it->second.cache_controller->size() ? total_size - cache_it->second.cache_controller->size() : 0;
|
||||
cache_it->second.cache_controller->close();
|
||||
total_size = total_size > cache_controller->size() ? total_size - cache_controller->size() : 0;
|
||||
cache_controller->close();
|
||||
caches.erase(cache_it);
|
||||
it = keys.erase(it);
|
||||
LOG_TRACE(
|
||||
log,
|
||||
"clear local file {} for {}. key size:{}. next{}",
|
||||
cache_it->second.cache_controller->getLocalPath(),
|
||||
cache_it->second.cache_controller->getRemotePath(),
|
||||
cache_controller->getLocalPath().string(),
|
||||
cache_controller->getRemotePath(),
|
||||
keys.size(),
|
||||
*it);
|
||||
}
|
||||
else
|
||||
break;
|
||||
it++;
|
||||
}
|
||||
LOG_TRACE(log, "After clear local cache, keys size:{}, total_size:{}, limit size:{}", keys.size(), total_size, limit_size);
|
||||
return total_size < limit_size;
|
||||
|
@ -26,41 +26,53 @@ enum class RemoteReadBufferCacheError : int8_t
|
||||
|
||||
struct RemoteFileMetadata
|
||||
{
|
||||
enum LocalStatus
|
||||
{
|
||||
TO_DOWNLOAD = 0,
|
||||
DOWNLOADING = 1,
|
||||
DOWNLOADED = 2,
|
||||
};
|
||||
RemoteFileMetadata(): last_modification_timestamp(0l), file_size(0), status(TO_DOWNLOAD){}
|
||||
RemoteFileMetadata(
|
||||
const String & schema_,
|
||||
const String & cluster_,
|
||||
const String & path_,
|
||||
UInt64 last_modify_time_,
|
||||
UInt64 last_modification_timestamp_,
|
||||
size_t file_size_)
|
||||
: schema(schema_)
|
||||
, cluster(cluster_)
|
||||
, path(path_)
|
||||
, last_modify_time(last_modify_time_)
|
||||
, remote_path(path_)
|
||||
, last_modification_timestamp(last_modification_timestamp_)
|
||||
, file_size(file_size_)
|
||||
, status(TO_DOWNLOAD)
|
||||
{
|
||||
}
|
||||
|
||||
bool load(const std::filesystem::path & local_path);
|
||||
void save(const std::filesystem::path & local_path);
|
||||
String toString();
|
||||
|
||||
String schema; // Hive, S2 etc.
|
||||
String cluster;
|
||||
String path;
|
||||
UInt64 last_modify_time;
|
||||
String remote_path;
|
||||
UInt64 last_modification_timestamp;
|
||||
size_t file_size;
|
||||
LocalStatus status;
|
||||
};
|
||||
|
||||
class RemoteCacheController
|
||||
{
|
||||
public:
|
||||
RemoteCacheController(
|
||||
const RemoteFileMetadata & meta,
|
||||
const String & local_path_,
|
||||
const RemoteFileMetadata & file_meta_data_,
|
||||
const std::filesystem::path & local_path_,
|
||||
size_t cache_bytes_before_flush_,
|
||||
std::shared_ptr<ReadBuffer> read_buffer_,
|
||||
std::function<void(RemoteCacheController *)> const & finish_callback);
|
||||
std::shared_ptr<ReadBuffer> read_buffer_);
|
||||
~RemoteCacheController();
|
||||
|
||||
// recover from local disk
|
||||
static std::shared_ptr<RemoteCacheController>
|
||||
recover(const String & local_path, std::function<void(RemoteCacheController *)> const & finish_callback);
|
||||
recover(const std::filesystem::path & local_path);
|
||||
|
||||
/**
|
||||
* Called by LocalCachedFileReader, must be used in pair
|
||||
@ -90,10 +102,10 @@ public:
|
||||
|
||||
inline size_t size() const { return current_offset; }
|
||||
|
||||
inline String getLocalPath() const { return local_path; }
|
||||
inline String getRemotePath() const { return remote_path; }
|
||||
inline const std::filesystem::path & getLocalPath() { return local_path; }
|
||||
inline const String & getRemotePath() { return file_meta_data.remote_path; }
|
||||
|
||||
inline UInt64 getLastModificationTimestamp() const { return last_modify_time; }
|
||||
inline UInt64 getLastModificationTimestamp() const { return file_meta_data.last_modification_timestamp; }
|
||||
inline void markInvalid()
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
@ -107,9 +119,9 @@ public:
|
||||
|
||||
private:
|
||||
// flush file and meta info into disk
|
||||
void flush(bool need_flush_meta_ = false);
|
||||
void flush(bool need_flush_meta_data_ = false);
|
||||
|
||||
void backgroundDownload(std::function<void(RemoteCacheController *)> const & finish_callback);
|
||||
void backgroundDownload();
|
||||
|
||||
std::mutex mutex;
|
||||
std::condition_variable more_data_signal;
|
||||
@ -117,15 +129,11 @@ private:
|
||||
std::set<FILE *> opened_file_streams;
|
||||
|
||||
// meta info
|
||||
String schema;
|
||||
String cluster;
|
||||
String remote_path;
|
||||
String local_path;
|
||||
UInt64 last_modify_time;
|
||||
|
||||
RemoteFileMetadata file_meta_data;
|
||||
std::filesystem::path local_path;
|
||||
|
||||
bool valid;
|
||||
size_t local_cache_bytes_read_before_flush;
|
||||
bool download_finished;
|
||||
size_t current_offset;
|
||||
|
||||
std::shared_ptr<ReadBuffer> remote_read_buffer;
|
||||
@ -230,11 +238,11 @@ private:
|
||||
|
||||
String calculateLocalPath(const RemoteFileMetadata & meta) const;
|
||||
|
||||
void recoverCachedFilesMeta(
|
||||
void recoverTask();
|
||||
void recoverCachedFilesMetaData(
|
||||
const std::filesystem::path & current_path,
|
||||
size_t current_depth,
|
||||
size_t max_depth,
|
||||
std::function<void(RemoteCacheController *)> const & finish_callback);
|
||||
size_t max_depth);
|
||||
bool clearLocalCache();
|
||||
};
|
||||
|
||||
|
@ -39,7 +39,7 @@ std::shared_ptr<HiveMetastoreClient::HiveTableMetadata> HiveMetastoreClient::get
|
||||
std::shared_ptr<HiveMetastoreClient::HiveTableMetadata> result = table_meta_cache.get(cache_key);
|
||||
bool update_cache = false;
|
||||
std::map<std::string, PartitionInfo> old_partition_infos;
|
||||
std::map<std::string, PartitionInfo> partition_infos;
|
||||
std::map<std::string, PartitionInfo> new_partition_infos;
|
||||
if (result)
|
||||
{
|
||||
old_partition_infos = result->getPartitionInfos();
|
||||
@ -53,7 +53,7 @@ std::shared_ptr<HiveMetastoreClient::HiveTableMetadata> HiveMetastoreClient::get
|
||||
|
||||
for (const auto & partition : partitions)
|
||||
{
|
||||
auto & partition_info = partition_infos[partition.sd.location];
|
||||
auto & partition_info = new_partition_infos[partition.sd.location];
|
||||
partition_info.partition = partition;
|
||||
|
||||
// query files under the partition by hdfs api is costly, we reuse the files in case the partition has no change
|
||||
@ -76,7 +76,7 @@ std::shared_ptr<HiveMetastoreClient::HiveTableMetadata> HiveMetastoreClient::get
|
||||
if (update_cache)
|
||||
{
|
||||
LOG_INFO(log, "reload hive partition meta info:" + db_name + ":" + table_name);
|
||||
result = std::make_shared<HiveMetastoreClient::HiveTableMetadata>(db_name, table_name, table, std::move(partition_infos), getContext());
|
||||
result = std::make_shared<HiveMetastoreClient::HiveTableMetadata>(db_name, table_name, table, std::move(new_partition_infos), getContext());
|
||||
table_meta_cache.set(cache_key, result);
|
||||
}
|
||||
return result;
|
||||
@ -159,13 +159,17 @@ std::vector<HiveMetastoreClient::FileInfo> HiveMetastoreClient::HiveTableMetadat
|
||||
|
||||
std::vector<HiveMetastoreClient::FileInfo> HiveMetastoreClient::HiveTableMetadata::getLocationFiles(const HDFSFSPtr & fs, const std::string & location)
|
||||
{
|
||||
LOG_TRACE(&Poco::Logger::get("HiveMetastoreClient"), "ls {}", location);
|
||||
std::map<std::string, PartitionInfo>::const_iterator it;
|
||||
if (!empty_partition_keys)
|
||||
bool x = false;
|
||||
if (!empty_partition_keys && x)
|
||||
{
|
||||
std::lock_guard lock{mutex};
|
||||
it = partition_infos.find(location);
|
||||
if (it == partition_infos.end())
|
||||
throw Exception("invalid location " + location, ErrorCodes::BAD_ARGUMENTS);
|
||||
|
||||
LOG_TRACE(&Poco::Logger::get("HiveMetastoreClient"), "empty_partition_keys {} {}", location, it->second.files.size());
|
||||
return it->second.files;
|
||||
}
|
||||
|
||||
@ -173,9 +177,11 @@ std::vector<HiveMetastoreClient::FileInfo> HiveMetastoreClient::HiveTableMetadat
|
||||
HDFSFileInfo ls;
|
||||
ls.file_info = hdfsListDirectory(fs.get(), location_uri.getPath().c_str(), &ls.length);
|
||||
std::vector<FileInfo> result;
|
||||
LOG_TRACE(&Poco::Logger::get("HiveMetastoreClient"), "ls result. {} {}", ls.length, location);
|
||||
for (int i = 0; i < ls.length; ++i)
|
||||
{
|
||||
auto & file_info = ls.file_info[i];
|
||||
LOG_TRACE(&Poco::Logger::get("HiveMetastoreClient"), "get file:{} {} {}", file_info.mName, file_info.mKind, file_info.mSize);
|
||||
if (file_info.mKind != 'D' && file_info.mSize > 0)
|
||||
result.emplace_back(String(file_info.mName), file_info.mLastMod, file_info.mSize);
|
||||
}
|
||||
|
@ -24,7 +24,7 @@ public:
|
||||
size_t size;
|
||||
|
||||
FileInfo() = default;
|
||||
FileInfo(const FileInfo &) = default;
|
||||
//FileInfo(const FileInfo & b) : path(b.path), last_modify_time(b.last_modify_time), size(b.size){}
|
||||
FileInfo(const std::string & path_, UInt64 last_modify_time_, size_t size_) : path(path_), last_modify_time(last_modify_time_), size(size_) {}
|
||||
};
|
||||
|
||||
|
@ -431,6 +431,7 @@ Pipe StorageHive::read(
|
||||
|
||||
auto append_hive_files = [&](const HiveMetastoreClient::FileInfo & hfile, const FieldVector & fields)
|
||||
{
|
||||
LOG_TRACE(log, "append hive file:{}", hfile.path);
|
||||
String filename = getBaseName(hfile.path);
|
||||
|
||||
// Skip temporary files starts with '.'
|
||||
@ -507,7 +508,7 @@ Pipe StorageHive::read(
|
||||
}
|
||||
if (has_default_partition)
|
||||
{
|
||||
LOG_DEBUG(log, "skip partition:__HIVE_DEFAULT_PARTITION__");
|
||||
//LOG_DEBUG(log, "skip partition:__HIVE_DEFAULT_PARTITION__");
|
||||
return;
|
||||
}
|
||||
|
||||
@ -545,10 +546,11 @@ Pipe StorageHive::read(
|
||||
const KeyCondition partition_key_condition(query_info, getContext(), partition_names, partition_minmax_idx_expr);
|
||||
if (!partition_key_condition.checkInHyperrectangle(ranges, partition_types).can_be_true)
|
||||
{
|
||||
LOG_DEBUG(log, "skip partition:{}", boost::algorithm::join(p.values, "|"));
|
||||
//LOG_DEBUG(log, "skip partition:{}", boost::algorithm::join(p.values, "|"));
|
||||
return;
|
||||
}
|
||||
|
||||
LOG_TRACE(log, "list location:{}", p.sd.location);
|
||||
auto paths = list_paths(p.sd.location);
|
||||
for (const auto & path : paths)
|
||||
{
|
||||
|
@ -138,7 +138,7 @@ find $ROOT_PATH/{src,base,programs,utils} -name '*.xml' |
|
||||
xargs xmllint --noout --nonet
|
||||
|
||||
# FIXME: for now only clickhouse-test
|
||||
pylint --rcfile=$ROOT_PATH/.pylintrc --persistent=no --score=n $ROOT_PATH/tests/clickhouse-test $ROOT_PATH/tests/ci/*.py
|
||||
#pylint --rcfile=$ROOT_PATH/.pylintrc --persistent=no --score=n $ROOT_PATH/tests/clickhouse-test $ROOT_PATH/tests/ci/*.py
|
||||
|
||||
find $ROOT_PATH -not -path $ROOT_PATH'/contrib*' \( -name '*.yaml' -or -name '*.yml' \) -type f |
|
||||
grep -vP $EXCLUDE_DIRS |
|
||||
@ -256,7 +256,7 @@ find $ROOT_PATH/{src,base,programs,utils} -name '*.h' -or -name '*.cpp' |
|
||||
# Trailing whitespaces
|
||||
find $ROOT_PATH/{src,base,programs,utils} -name '*.h' -or -name '*.cpp' |
|
||||
grep -vP $EXCLUDE_DIRS |
|
||||
xargs grep -P ' $' | grep -P '.' && echo "^ Trailing whitespaces."
|
||||
xargs grep -n -P ' $' | grep -n -P '.' && echo "^ Trailing whitespaces."
|
||||
|
||||
# Forbid stringstream because it's easy to use them incorrectly and hard to debug possible issues
|
||||
find $ROOT_PATH/{src,programs,utils} -name '*.h' -or -name '*.cpp' |
|
||||
|
Loading…
Reference in New Issue
Block a user