2022-01-04 07:16:24 +00:00
|
|
|
#include <functional>
|
2021-12-23 03:50:26 +00:00
|
|
|
#include <memory>
|
|
|
|
#include <unistd.h>
|
|
|
|
#include <Core/BackgroundSchedulePool.h>
|
2022-01-04 07:16:24 +00:00
|
|
|
#include <IO/WriteHelpers.h>
|
|
|
|
#include <Storages/Cache/ExternalDataSourceCache.h>
|
|
|
|
#include <Storages/Cache/RemoteFileMetadataFactory.h>
|
|
|
|
#include <base/errnoToString.h>
|
2021-12-23 03:50:26 +00:00
|
|
|
#include <base/logger_useful.h>
|
|
|
|
#include <base/sleep.h>
|
2022-01-04 07:16:24 +00:00
|
|
|
#include <Poco/Logger.h>
|
2021-12-27 07:31:04 +00:00
|
|
|
#include <Common/ErrorCodes.h>
|
2022-01-04 07:16:24 +00:00
|
|
|
#include <Common/Exception.h>
|
2021-12-23 03:50:26 +00:00
|
|
|
#include <Common/ProfileEvents.h>
|
|
|
|
#include <Common/SipHash.h>
|
|
|
|
#include <Common/hex.h>
|
|
|
|
|
|
|
|
namespace ProfileEvents
|
|
|
|
{
|
2022-01-04 07:16:24 +00:00
|
|
|
extern const Event ExternalDataSourceLocalCacheReadBytes;
|
2021-12-23 03:50:26 +00:00
|
|
|
}
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
namespace fs = std::filesystem;
|
|
|
|
namespace ErrorCodes
|
|
|
|
{
|
|
|
|
extern const int LOGICAL_ERROR;
|
|
|
|
}
|
|
|
|
|
2022-03-01 07:22:07 +00:00
|
|
|
/// Builds a holder that reads from the local cache file owned by `cache_controller`.
/// No pass-through buffer or thread pool is attached in this mode.
/// Throws LOGICAL_ERROR when the controller cannot allocate a read buffer for its local file.
LocalFileHolder::LocalFileHolder(RemoteFileCacheType::MappedHolderPtr cache_controller)
    : file_cache_controller(std::move(cache_controller))
    , file_buffer(nullptr)
    , original_readbuffer(nullptr)
    , thread_pool(nullptr)
{
    auto & controller = file_cache_controller->value();
    file_buffer = controller.allocFile();
    if (file_buffer)
        return;

    throw Exception(ErrorCodes::LOGICAL_ERROR, "Create file readbuffer failed. {}", controller.getLocalPath().string());
}
|
|
|
|
|
2022-03-01 07:22:07 +00:00
|
|
|
/// Builds a pass-through holder: reads are served from `original_readbuffer_` and no local file buffer is allocated.
/// `thread_pool_` is kept so that the destructor can hand the buffer over for background download.
LocalFileHolder::LocalFileHolder(
    RemoteFileCacheType::MappedHolderPtr cache_controller, std::unique_ptr<ReadBuffer> original_readbuffer_, BackgroundSchedulePool * thread_pool_)
    : file_cache_controller(std::move(cache_controller)),
      file_buffer(nullptr),
      original_readbuffer(std::move(original_readbuffer_)),
      thread_pool(thread_pool_)
{
}
|
|
|
|
|
|
|
|
/// When the holder was used in pass-through mode (original_readbuffer is set), rewind that buffer and
/// hand it to the cache controller so the file gets downloaded into the local cache in background.
///
/// Destructors are implicitly noexcept, so any exception escaping from seek() or
/// startBackgroundDownload() would terminate the process. Failures are therefore logged and swallowed:
/// the only consequence is that the background download is not started for this file.
LocalFileHolder::~LocalFileHolder()
{
    if (!original_readbuffer)
        return;

    try
    {
        auto * seekable_buffer = dynamic_cast<SeekableReadBuffer *>(original_readbuffer.get());
        if (!seekable_buffer || !thread_pool)
            throw Exception(
                ErrorCodes::LOGICAL_ERROR, "LocalFileHolder holds a read buffer that is not seekable or has no thread pool. It's a bug!");

        /// The buffer may have been consumed up to an arbitrary position; the download must start from the beginning.
        seekable_buffer->seek(0, SEEK_SET);
        file_cache_controller->value().startBackgroundDownload(std::move(original_readbuffer), *thread_pool);
    }
    catch (...)
    {
        tryLogCurrentException(&Poco::Logger::get("LocalFileHolder"), "Exception during destructor of LocalFileHolder.");
    }
}
|
|
|
|
|
2021-12-28 08:57:07 +00:00
|
|
|
/// Forwards `buff_size` to the BufferWithOwnMemory base; the local file holder is attached later by create().
RemoteReadBuffer::RemoteReadBuffer(size_t buff_size)
    : BufferWithOwnMemory<SeekableReadBufferWithSize>(buff_size)
{
}
|
|
|
|
|
2022-01-04 07:16:24 +00:00
|
|
|
std::unique_ptr<ReadBuffer> RemoteReadBuffer::create(
|
2022-03-01 07:22:07 +00:00
|
|
|
ContextPtr context,
|
|
|
|
IRemoteFileMetadataPtr remote_file_metadata,
|
|
|
|
std::unique_ptr<ReadBuffer> read_buffer,
|
|
|
|
size_t buff_size,
|
|
|
|
bool is_random_accessed)
|
|
|
|
|
2021-12-23 03:50:26 +00:00
|
|
|
{
|
|
|
|
auto remote_path = remote_file_metadata->remote_path;
|
|
|
|
auto remote_read_buffer = std::make_unique<RemoteReadBuffer>(buff_size);
|
|
|
|
|
2022-01-04 07:16:24 +00:00
|
|
|
std::tie(remote_read_buffer->local_file_holder, read_buffer)
|
2022-03-01 07:22:07 +00:00
|
|
|
= ExternalDataSourceCache::instance().createReader(context, remote_file_metadata, read_buffer, is_random_accessed);
|
2021-12-28 08:57:07 +00:00
|
|
|
if (remote_read_buffer->local_file_holder == nullptr)
|
2021-12-23 03:50:26 +00:00
|
|
|
return read_buffer;
|
|
|
|
remote_read_buffer->remote_file_size = remote_file_metadata->file_size;
|
|
|
|
return remote_read_buffer;
|
|
|
|
}
|
|
|
|
|
|
|
|
bool RemoteReadBuffer::nextImpl()
|
|
|
|
{
|
2022-03-01 07:22:07 +00:00
|
|
|
if (local_file_holder->original_readbuffer)
|
|
|
|
{
|
|
|
|
auto status = local_file_holder->original_readbuffer->next();
|
|
|
|
if (status)
|
|
|
|
{
|
|
|
|
BufferBase::set(
|
|
|
|
local_file_holder->original_readbuffer->buffer().begin(),
|
|
|
|
local_file_holder->original_readbuffer->buffer().size(),
|
|
|
|
local_file_holder->original_readbuffer->offset());
|
|
|
|
}
|
|
|
|
return status;
|
|
|
|
}
|
|
|
|
|
2021-12-28 08:57:07 +00:00
|
|
|
auto start_offset = local_file_holder->file_buffer->getPosition();
|
|
|
|
auto end_offset = start_offset + local_file_holder->file_buffer->internalBuffer().size();
|
2022-01-04 07:16:24 +00:00
|
|
|
local_file_holder->file_cache_controller->value().waitMoreData(start_offset, end_offset);
|
2021-12-23 03:50:26 +00:00
|
|
|
|
2021-12-28 08:57:07 +00:00
|
|
|
auto status = local_file_holder->file_buffer->next();
|
2021-12-23 03:50:26 +00:00
|
|
|
if (status)
|
|
|
|
{
|
2022-01-04 07:16:24 +00:00
|
|
|
BufferBase::set(
|
|
|
|
local_file_holder->file_buffer->buffer().begin(),
|
|
|
|
local_file_holder->file_buffer->buffer().size(),
|
|
|
|
local_file_holder->file_buffer->offset());
|
2021-12-28 08:57:07 +00:00
|
|
|
ProfileEvents::increment(ProfileEvents::ExternalDataSourceLocalCacheReadBytes, local_file_holder->file_buffer->available());
|
2021-12-23 03:50:26 +00:00
|
|
|
}
|
|
|
|
return status;
|
|
|
|
}
|
|
|
|
|
|
|
|
off_t RemoteReadBuffer::seek(off_t offset, int whence)
|
|
|
|
{
|
2022-03-01 07:22:07 +00:00
|
|
|
if (local_file_holder->original_readbuffer)
|
|
|
|
{
|
|
|
|
auto ret = dynamic_cast<SeekableReadBuffer *>(local_file_holder->original_readbuffer.get())->seek(offset, whence);
|
|
|
|
BufferBase::set(
|
|
|
|
local_file_holder->original_readbuffer->buffer().begin(),
|
|
|
|
local_file_holder->original_readbuffer->buffer().size(),
|
|
|
|
local_file_holder->original_readbuffer->offset());
|
|
|
|
return ret;
|
|
|
|
}
|
|
|
|
|
2021-12-28 08:57:07 +00:00
|
|
|
if (!local_file_holder->file_buffer)
|
2021-12-23 03:50:26 +00:00
|
|
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot call seek() in this buffer. It's a bug!");
|
|
|
|
/*
|
|
|
|
* Need to wait here. For example, the current file has been download at position X, but here we try to seek to
|
2022-01-04 06:30:01 +00:00
|
|
|
* position Y (Y > X), it would fail.
|
2021-12-23 03:50:26 +00:00
|
|
|
*/
|
2021-12-28 08:57:07 +00:00
|
|
|
auto & file_buffer = local_file_holder->file_buffer;
|
2022-01-04 07:16:24 +00:00
|
|
|
local_file_holder->file_cache_controller->value().waitMoreData(offset, offset + file_buffer->internalBuffer().size());
|
2021-12-23 03:50:26 +00:00
|
|
|
auto ret = file_buffer->seek(offset, whence);
|
2022-01-04 07:16:24 +00:00
|
|
|
BufferBase::set(file_buffer->buffer().begin(), file_buffer->buffer().size(), file_buffer->offset());
|
2021-12-23 03:50:26 +00:00
|
|
|
return ret;
|
|
|
|
}
|
|
|
|
|
|
|
|
off_t RemoteReadBuffer::getPosition()
|
|
|
|
{
|
2022-03-01 07:22:07 +00:00
|
|
|
if (local_file_holder->original_readbuffer)
|
|
|
|
{
|
|
|
|
return dynamic_cast<SeekableReadBuffer *>(local_file_holder->original_readbuffer.get())->getPosition();
|
|
|
|
}
|
2021-12-28 08:57:07 +00:00
|
|
|
return local_file_holder->file_buffer->getPosition();
|
2021-12-23 03:50:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
/// Defaulted constructor: root_dir, lru_caches and the recover task are assigned later, in initOnce().
ExternalDataSourceCache::ExternalDataSourceCache() = default;
|
|
|
|
|
2021-12-27 07:04:26 +00:00
|
|
|
/// Stops the background recover task before the cache is destroyed.
///
/// recover_task_holder is only assigned in initOnce(); the singleton may be destroyed without ever
/// having been initialized, in which case the holder is empty and must not be dereferenced.
ExternalDataSourceCache::~ExternalDataSourceCache()
{
    if (recover_task_holder)
        recover_task_holder->deactivate();
}
|
2021-12-23 03:50:26 +00:00
|
|
|
|
|
|
|
/// Accessor for the process-wide cache object, held in a function-local static.
ExternalDataSourceCache & ExternalDataSourceCache::instance()
{
    static ExternalDataSourceCache singleton;
    return singleton;
}
|
|
|
|
|
2021-12-28 03:26:39 +00:00
|
|
|
void ExternalDataSourceCache::recoverTask()
|
2021-12-23 03:50:26 +00:00
|
|
|
{
|
2021-12-28 03:26:39 +00:00
|
|
|
std::vector<fs::path> invalid_paths;
|
|
|
|
for (auto const & group_dir : fs::directory_iterator{root_dir})
|
2021-12-23 03:50:26 +00:00
|
|
|
{
|
2021-12-28 03:26:39 +00:00
|
|
|
for (auto const & cache_dir : fs::directory_iterator{group_dir.path()})
|
2021-12-23 03:50:26 +00:00
|
|
|
{
|
2021-12-28 03:26:39 +00:00
|
|
|
String path = cache_dir.path();
|
2021-12-23 03:50:26 +00:00
|
|
|
auto cache_controller = RemoteCacheController::recover(path);
|
|
|
|
if (!cache_controller)
|
|
|
|
{
|
|
|
|
invalid_paths.emplace_back(path);
|
|
|
|
continue;
|
|
|
|
}
|
2022-01-04 07:16:24 +00:00
|
|
|
auto cache_load_func = [&] { return cache_controller; };
|
|
|
|
if (!lru_caches->getOrSet(path, cache_load_func))
|
2021-12-23 03:50:26 +00:00
|
|
|
{
|
|
|
|
invalid_paths.emplace_back(path);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2021-12-28 03:26:39 +00:00
|
|
|
for (auto & path : invalid_paths)
|
|
|
|
fs::remove_all(path);
|
2021-12-23 03:50:26 +00:00
|
|
|
initialized = true;
|
|
|
|
LOG_INFO(log, "Recovered from directory:{}", root_dir);
|
|
|
|
}
|
|
|
|
|
2022-01-04 07:16:24 +00:00
|
|
|
/// Configures the cache (root directory, size limit, flush threshold), makes sure the root directory
/// exists and schedules recoverTask() on the context's schedule pool.
/// Throws LOGICAL_ERROR if isInitialized() already reports true.
void ExternalDataSourceCache::initOnce(ContextPtr context, const String & root_dir_, size_t limit_size_, size_t bytes_read_before_flush_)
{
    std::lock_guard lock(mutex);

    if (isInitialized())
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot initialize ExternalDataSourceCache twice");

    LOG_INFO(
        log, "Initializing local cache for remote data sources. Local cache root path: {}, cache size limit: {}", root_dir_, limit_size_);

    root_dir = root_dir_;
    local_cache_bytes_read_before_flush = bytes_read_before_flush_;
    lru_caches = std::make_unique<RemoteFileCacheType>(limit_size_);

    /// Create root_dir if it does not exist yet.
    const fs::path root_path(root_dir);
    if (!fs::exists(root_path))
        fs::create_directories(root_path);

    /// The `initialized` flag is set by recoverTask(), not here.
    recover_task_holder = context->getSchedulePool().createTask("recover local cache metadata for remote files", [this] { recoverTask(); });
    recover_task_holder->activateAndSchedule();
}
|
|
|
|
|
|
|
|
/// Maps a remote file to its local cache directory:
///   <root_dir>/<first 3 hex chars of the hash>/<hex hash>
/// where the hash is sipHash128 of "<metadata name>:<remote path>:<version>".
String ExternalDataSourceCache::calculateLocalPath(IRemoteFileMetadataPtr metadata) const
{
    /// The version is part of the key, so a new version of a remote file gets its own local path
    /// and reading it is not blocked by the old one.
    String cache_key = metadata->getName();
    cache_key += ":";
    cache_key += metadata->remote_path;
    cache_key += ":";
    cache_key += metadata->getVersion();

    const UInt128 digest = sipHash128(cache_key.data(), cache_key.size());
    const String hex_digest = getHexUIntLowercase(digest);

    const fs::path group_dir = fs::path(root_dir) / hex_digest.substr(0, 3);
    return group_dir / hex_digest;
}
|
|
|
|
|
2022-01-04 07:16:24 +00:00
|
|
|
std::pair<std::unique_ptr<LocalFileHolder>, std::unique_ptr<ReadBuffer>> ExternalDataSourceCache::createReader(
|
2022-03-01 07:22:07 +00:00
|
|
|
ContextPtr context, IRemoteFileMetadataPtr remote_file_metadata, std::unique_ptr<ReadBuffer> & read_buffer, bool is_random_accessed)
|
2021-12-23 03:50:26 +00:00
|
|
|
{
|
2022-02-28 00:15:37 +00:00
|
|
|
// If something is wrong on startup, rollback to read from the original ReadBuffer.
|
2021-12-23 03:50:26 +00:00
|
|
|
if (!isInitialized())
|
|
|
|
{
|
|
|
|
LOG_ERROR(log, "ExternalDataSourceCache has not been initialized");
|
2021-12-28 03:26:39 +00:00
|
|
|
return {nullptr, std::move(read_buffer)};
|
2021-12-23 03:50:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
auto remote_path = remote_file_metadata->remote_path;
|
|
|
|
const auto & last_modification_timestamp = remote_file_metadata->last_modification_timestamp;
|
|
|
|
auto local_path = calculateLocalPath(remote_file_metadata);
|
|
|
|
std::lock_guard lock(mutex);
|
|
|
|
auto cache = lru_caches->get(local_path);
|
|
|
|
if (cache)
|
|
|
|
{
|
2022-03-01 07:22:07 +00:00
|
|
|
if (!cache->value().isEnable())
|
|
|
|
{
|
|
|
|
return {nullptr, std::move(read_buffer)};
|
|
|
|
}
|
|
|
|
|
|
|
|
// the remote file has been updated, need to redownload
|
2022-01-04 07:16:24 +00:00
|
|
|
if (!cache->value().isValid() || cache->value().isModified(remote_file_metadata))
|
2021-12-23 03:50:26 +00:00
|
|
|
{
|
|
|
|
LOG_TRACE(
|
|
|
|
log,
|
|
|
|
"Remote file ({}) has been updated. Last saved modification time: {}, actual last modification time: {}",
|
|
|
|
remote_path,
|
2022-01-04 07:16:24 +00:00
|
|
|
std::to_string(cache->value().getLastModificationTimestamp()),
|
2021-12-23 03:50:26 +00:00
|
|
|
std::to_string(last_modification_timestamp));
|
2022-01-04 07:16:24 +00:00
|
|
|
cache->value().markInvalid();
|
|
|
|
cache.reset();
|
2021-12-23 03:50:26 +00:00
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
2022-01-04 07:16:24 +00:00
|
|
|
return {std::make_unique<LocalFileHolder>(std::move(cache)), nullptr};
|
2021-12-23 03:50:26 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if (!fs::exists(local_path))
|
|
|
|
fs::create_directories(local_path);
|
|
|
|
|
2022-02-28 00:15:37 +00:00
|
|
|
// Cache is not found or is invalid, try to remove it at first.
|
2022-01-04 07:16:24 +00:00
|
|
|
lru_caches->tryRemove(local_path);
|
|
|
|
|
|
|
|
auto new_cache_controller
|
|
|
|
= std::make_shared<RemoteCacheController>(remote_file_metadata, local_path, local_cache_bytes_read_before_flush);
|
|
|
|
auto new_cache = lru_caches->getOrSet(local_path, [&] { return new_cache_controller; });
|
|
|
|
if (!new_cache)
|
2021-12-23 03:50:26 +00:00
|
|
|
{
|
2022-01-04 07:16:24 +00:00
|
|
|
LOG_ERROR(
|
|
|
|
log,
|
|
|
|
"Insert the new cache failed. new file size:{}, current total size:{}",
|
|
|
|
remote_file_metadata->file_size,
|
|
|
|
lru_caches->weight());
|
2021-12-28 03:26:39 +00:00
|
|
|
return {nullptr, std::move(read_buffer)};
|
2021-12-23 03:50:26 +00:00
|
|
|
}
|
2022-03-01 07:22:07 +00:00
|
|
|
/*
|
|
|
|
If read_buffer is seekable, use read_buffer directly inside LocalFileHolder. And once LocalFileHolder is released,
|
|
|
|
start the download process in background.
|
|
|
|
The cache is marked disable until the download process finish.
|
|
|
|
For reading parquet files from hdfs, with this optimization, the speedup can reach 3x.
|
|
|
|
*/
|
|
|
|
if (dynamic_cast<SeekableReadBuffer *>(read_buffer.get()) && is_random_accessed)
|
|
|
|
{
|
|
|
|
new_cache->value().disable();
|
|
|
|
return {std::make_unique<LocalFileHolder>(std::move(new_cache), std::move(read_buffer), &context->getSchedulePool()), nullptr};
|
|
|
|
}
|
2022-01-04 07:16:24 +00:00
|
|
|
new_cache->value().startBackgroundDownload(std::move(read_buffer), context->getSchedulePool());
|
|
|
|
return {std::make_unique<LocalFileHolder>(std::move(new_cache)), nullptr};
|
2021-12-23 03:50:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
}
|