Add caching

This commit is contained in:
kssenii 2023-09-26 16:23:24 +02:00
parent 4d78dbacfe
commit 14b09d3cdc
9 changed files with 315 additions and 123 deletions

View File

@ -534,6 +534,7 @@ The server successfully detected this situation and will download merged part fr
M(S3QueueSetFileFailedMicroseconds, "Time spent to set file as failed")\
M(S3QueueCleanupMaxSetSizeOrTTLMicroseconds, "Time spent to set file as failed")\
M(S3QueuePullMicroseconds, "Time spent to read file data")\
M(S3QueueLockLocalFileStatusesMicroseconds, "Time spent to lock local file statuses")\
\
M(ServerStartupMilliseconds, "Time elapsed from starting server to listening to sockets in milliseconds")\
M(IOUringSQEsSubmitted, "Total number of io_uring SQEs submitted") \

View File

@ -3,6 +3,7 @@
#include <Interpreters/Cache/FileSegment.h>
#include <Common/logger_useful.h>
#include <Common/ElapsedTimeProfileEventIncrement.h>
#include <Common/SipHash.h>
#include <filesystem>
namespace fs = std::filesystem;

View File

@ -27,6 +27,7 @@ namespace ProfileEvents
extern const Event S3QueueSetFileProcessedMicroseconds;
extern const Event S3QueueSetFileFailedMicroseconds;
extern const Event S3QueueCleanupMaxSetSizeOrTTLMicroseconds;
extern const Event S3QueueLockLocalFileStatusesMicroseconds;
};
namespace DB
@ -35,6 +36,7 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int BAD_ARGUMENTS;
}
namespace
@ -53,41 +55,54 @@ namespace
}
}
S3QueueFilesMetadata::S3QueueFilesMetadata(
const StorageS3Queue * storage_,
const S3QueueSettings & settings_,
ContextPtr context)
: storage(storage_)
, mode(settings_.mode)
, max_set_size(settings_.s3queue_tracked_files_limit.value)
, max_set_age_sec(settings_.s3queue_tracked_file_ttl_sec.value)
, max_loading_retries(settings_.s3queue_loading_retries.value)
, min_cleanup_interval_ms(settings_.s3queue_cleanup_interval_min_ms.value)
, max_cleanup_interval_ms(settings_.s3queue_cleanup_interval_max_ms.value)
, zookeeper_processing_path(storage->getZooKeeperPath() / "processing")
, zookeeper_processed_path(storage->getZooKeeperPath() / "processed")
, zookeeper_failed_path(storage->getZooKeeperPath() / "failed")
, zookeeper_cleanup_lock_path(storage->getZooKeeperPath() / "cleanup_lock")
, log(&Poco::Logger::get("S3QueueFilesMetadata"))
std::unique_lock<std::mutex> S3QueueFilesMetadata::LocalFileStatuses::lock() const
{
if (mode == S3QueueMode::UNORDERED && (max_set_size || max_set_age_sec))
auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::S3QueueLockLocalFileStatusesMicroseconds);
return std::unique_lock(mutex);
}
S3QueueFilesMetadata::FileStatus::State S3QueueFilesMetadata::LocalFileStatuses::state(const std::string & filename) const
{
auto lk = lock();
if (auto it = file_statuses.find(filename); it != file_statuses.end())
return it->second->state;
else
return FileStatus::State::None;
}
S3QueueFilesMetadata::FileStatuses S3QueueFilesMetadata::LocalFileStatuses::getAll() const
{
auto lk = lock();
return file_statuses;
}
std::shared_ptr<S3QueueFilesMetadata::FileStatus> S3QueueFilesMetadata::LocalFileStatuses::get(const std::string & filename, bool create)
{
auto lk = lock();
auto it = file_statuses.find(filename);
if (it == file_statuses.end())
{
task = context->getSchedulePool().createTask("S3QueueCleanupFunc", [this] { cleanupThreadFunc(); });
task->activate();
task->scheduleAfter(generateRescheduleInterval(min_cleanup_interval_ms, max_cleanup_interval_ms));
if (create)
it = file_statuses.emplace(filename, std::make_shared<FileStatus>()).first;
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "File status for {} doesn't exist", filename);
}
return it->second;
}
S3QueueFilesMetadata::~S3QueueFilesMetadata()
bool S3QueueFilesMetadata::LocalFileStatuses::remove(const std::string & filename, bool if_exists)
{
deactivateCleanupTask();
}
void S3QueueFilesMetadata::deactivateCleanupTask()
{
shutdown = true;
if (task)
task->deactivate();
auto lk = lock();
auto it = file_statuses.find(filename);
if (it == file_statuses.end())
{
if (if_exists)
return false;
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "File status for {} doesn't exist", filename);
}
file_statuses.erase(it);
return true;
}
std::string S3QueueFilesMetadata::NodeMetadata::toString() const
@ -117,6 +132,49 @@ S3QueueFilesMetadata::NodeMetadata S3QueueFilesMetadata::NodeMetadata::fromStrin
return metadata;
}
S3QueueFilesMetadata::S3QueueFilesMetadata(const fs::path & zookeeper_path_, const S3QueueSettings & settings_)
: mode(settings_.mode)
, max_set_size(settings_.s3queue_tracked_files_limit.value)
, max_set_age_sec(settings_.s3queue_tracked_file_ttl_sec.value)
, max_loading_retries(settings_.s3queue_loading_retries.value)
, min_cleanup_interval_ms(settings_.s3queue_cleanup_interval_min_ms.value)
, max_cleanup_interval_ms(settings_.s3queue_cleanup_interval_max_ms.value)
, zookeeper_processing_path(zookeeper_path_ / "processing")
, zookeeper_processed_path(zookeeper_path_ / "processed")
, zookeeper_failed_path(zookeeper_path_ / "failed")
, zookeeper_cleanup_lock_path(zookeeper_path_ / "cleanup_lock")
, log(&Poco::Logger::get("S3QueueFilesMetadata"))
{
if (mode == S3QueueMode::UNORDERED && (max_set_size || max_set_age_sec))
{
task = Context::getGlobalContextInstance()->getSchedulePool().createTask("S3QueueCleanupFunc", [this] { cleanupThreadFunc(); });
task->activate();
task->scheduleAfter(generateRescheduleInterval(min_cleanup_interval_ms, max_cleanup_interval_ms));
}
}
S3QueueFilesMetadata::~S3QueueFilesMetadata()
{
deactivateCleanupTask();
}
void S3QueueFilesMetadata::deactivateCleanupTask()
{
shutdown = true;
if (task)
task->deactivate();
}
zkutil::ZooKeeperPtr S3QueueFilesMetadata::getZooKeeper() const
{
return Context::getGlobalContextInstance()->getZooKeeper();
}
std::shared_ptr<S3QueueFilesMetadata::FileStatus> S3QueueFilesMetadata::getFileStatus(const std::string & path)
{
return local_file_statuses.get(path, /* create */false);
}
std::string S3QueueFilesMetadata::getNodeName(const std::string & path)
{
SipHash path_hash;
@ -137,23 +195,39 @@ S3QueueFilesMetadata::NodeMetadata S3QueueFilesMetadata::createNodeMetadata(
return metadata;
}
std::shared_ptr<S3QueueFilesMetadata::FileStatus> S3QueueFilesMetadata::getFileStatus(const std::string & path)
{
std::lock_guard lock(file_statuses_mutex);
return file_statuses.at(path);
}
S3QueueFilesMetadata::FileStatuses S3QueueFilesMetadata::getFileStateses() const
{
std::lock_guard lock(file_statuses_mutex);
return file_statuses;
}
bool S3QueueFilesMetadata::trySetFileAsProcessing(const std::string & path)
{
auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::S3QueueSetFileProcessingMicroseconds);
bool result;
/// Check locally cached file status.
switch (local_file_statuses.state(path))
{
case FileStatus::State::Processing: [[fallthrough]];
case FileStatus::State::Processed:
{
/// File is already processes or processing by current server.
return false;
}
case FileStatus::State::Failed:
{
if (!max_loading_retries)
{
/// File was processes by current server and failed,
/// retries are disabled.
return false;
}
/// TODO save information if file is still retriable.
break;
}
case FileStatus::State::None:
{
/// The file was not processed by current server,
/// check metadata in zookeeper.
break;
}
}
SetFileProcessingResult result;
switch (mode)
{
case S3QueueMode::ORDERED:
@ -167,21 +241,42 @@ bool S3QueueFilesMetadata::trySetFileAsProcessing(const std::string & path)
break;
}
}
if (result)
switch (result)
{
std::lock_guard lock(file_statuses_mutex);
auto it = file_statuses.emplace(path, std::make_shared<FileStatus>()).first;
auto & file_status = it->second;
file_status->state = FileStatus::State::Processing;
file_status->profile_counters.increment(ProfileEvents::S3QueueSetFileProcessingMicroseconds, timer.get());
timer.cancel();
if (!file_status->processing_start_time)
file_status->processing_start_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
case SetFileProcessingResult::Success:
{
auto file_status = local_file_statuses.get(path, /* create */true);
file_status->state = FileStatus::State::Processing;
file_status->profile_counters.increment(ProfileEvents::S3QueueSetFileProcessingMicroseconds, timer.get());
timer.cancel();
if (!file_status->processing_start_time)
file_status->processing_start_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
break;
}
case SetFileProcessingResult::AlreadyProcessed:
{
/// Cache the state.
auto file_status = local_file_statuses.get(path, /* create */true);
file_status->state = FileStatus::State::Processed;
break;
}
case SetFileProcessingResult::AlreadyFailed:
{
/// Cache the state.
auto file_status = local_file_statuses.get(path, /* create */true);
file_status->state = FileStatus::State::Failed;
break;
}
case SetFileProcessingResult::ProcessingByOtherNode:
{
/// We cannot save any local state.
break;
}
}
return result;
return result == SetFileProcessingResult::Success;
}
bool S3QueueFilesMetadata::trySetFileAsProcessingForUnorderedMode(const std::string & path)
S3QueueFilesMetadata::SetFileProcessingResult S3QueueFilesMetadata::trySetFileAsProcessingForUnorderedMode(const std::string & path)
{
/// Create an ephemenral node in /processing
/// if corresponding node does not exist in failed/, processed/ and processing/.
@ -189,7 +284,7 @@ bool S3QueueFilesMetadata::trySetFileAsProcessingForUnorderedMode(const std::str
const auto node_name = getNodeName(path);
const auto node_metadata = createNodeMetadata(path).toString();
const auto zk_client = storage->getZooKeeper();
const auto zk_client = getZooKeeper();
Coordination::Requests requests;
zkutil::addCheckNotExistsRequest(requests, *zk_client, zookeeper_processed_path / node_name);
@ -198,10 +293,26 @@ bool S3QueueFilesMetadata::trySetFileAsProcessingForUnorderedMode(const std::str
Coordination::Responses responses;
auto code = zk_client->tryMulti(requests, responses);
return code == Coordination::Error::ZOK;
if (code == Coordination::Error::ZOK)
{
return SetFileProcessingResult::Success;
}
else if (responses[0]->error == Coordination::Error::ZOK)
{
if (responses[1]->error == Coordination::Error::ZOK)
{
chassert(responses[2]->error != Coordination::Error::ZOK);
return SetFileProcessingResult::ProcessingByOtherNode;
}
else
return SetFileProcessingResult::AlreadyFailed;
}
else
return SetFileProcessingResult::AlreadyProcessed;
}
bool S3QueueFilesMetadata::trySetFileAsProcessingForOrderedMode(const std::string & path)
S3QueueFilesMetadata::SetFileProcessingResult S3QueueFilesMetadata::trySetFileAsProcessingForOrderedMode(const std::string & path)
{
/// Create an ephemenral node in /processing
/// if corresponding it does not exist in failed/, processing/ and satisfied max processed file check.
@ -209,7 +320,7 @@ bool S3QueueFilesMetadata::trySetFileAsProcessingForOrderedMode(const std::strin
const auto node_name = getNodeName(path);
const auto node_metadata = createNodeMetadata(path).toString();
const auto zk_client = storage->getZooKeeper();
const auto zk_client = getZooKeeper();
while (true)
{
@ -221,9 +332,16 @@ bool S3QueueFilesMetadata::trySetFileAsProcessingForOrderedMode(const std::strin
auto code = zk_client->tryMulti(requests, responses);
if (code != Coordination::Error::ZOK)
{
LOG_TEST(log, "Skipping file `{}`: {}",
path, responses[0]->error != Coordination::Error::ZOK ? "failed" : "processing");
return false;
if (responses[0]->error == Coordination::Error::ZOK)
{
LOG_TEST(log, "Skipping file `{}`: already processing", path);
return SetFileProcessingResult::ProcessingByOtherNode;
}
else
{
LOG_TEST(log, "Skipping file `{}`: failed", path);
return SetFileProcessingResult::AlreadyFailed;
}
}
Coordination::Stat processed_node_stat;
@ -234,7 +352,7 @@ bool S3QueueFilesMetadata::trySetFileAsProcessingForOrderedMode(const std::strin
auto max_processed_file_path = processed_node_metadata.file_path;
if (!max_processed_file_path.empty() && path <= max_processed_file_path)
return false;
return SetFileProcessingResult::AlreadyProcessed;
requests.clear();
responses.clear();
@ -244,14 +362,17 @@ bool S3QueueFilesMetadata::trySetFileAsProcessingForOrderedMode(const std::strin
code = zk_client->tryMulti(requests, responses);
if (code == Coordination::Error::ZOK)
return true;
return SetFileProcessingResult::Success;
if (responses[0]->error != Coordination::Error::ZOK
|| responses[1]->error != Coordination::Error::ZOK)
if (responses[0]->error != Coordination::Error::ZOK)
{
LOG_TEST(log, "Skipping file `{}`: {}",
path, responses[0]->error != Coordination::Error::ZOK ? "failed" : "processing");
return false;
LOG_TEST(log, "Skipping file `{}`: failed", path);
return SetFileProcessingResult::AlreadyFailed;
}
else if (responses[1]->error != Coordination::Error::ZOK)
{
LOG_TEST(log, "Skipping file `{}`: already processing", path);
return SetFileProcessingResult::ProcessingByOtherNode;
}
else
{
@ -264,8 +385,7 @@ void S3QueueFilesMetadata::setFileProcessed(const String & path)
{
auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::S3QueueSetFileProcessedMicroseconds);
SCOPE_EXIT({
std::lock_guard lock(file_statuses_mutex);
auto & file_status = file_statuses.at(path);
auto file_status = local_file_statuses.get(path, /* create */false);
file_status->state = FileStatus::State::Processed;
file_status->profile_counters.increment(ProfileEvents::S3QueueSetFileProcessedMicroseconds, timer.get());
timer.cancel();
@ -291,7 +411,7 @@ void S3QueueFilesMetadata::setFileProcessedForUnorderedMode(const String & path)
const auto node_name = getNodeName(path);
const auto node_metadata = createNodeMetadata(path).toString();
const auto zk_client = storage->getZooKeeper();
const auto zk_client = getZooKeeper();
Coordination::Requests requests;
requests.push_back(zkutil::makeRemoveRequest(zookeeper_processing_path / node_name, -1));
@ -316,7 +436,7 @@ void S3QueueFilesMetadata::setFileProcessedForOrderedMode(const String & path)
{
const auto node_name = getNodeName(path);
const auto node_metadata = createNodeMetadata(path).toString();
const auto zk_client = storage->getZooKeeper();
const auto zk_client = getZooKeeper();
while (true)
{
@ -351,8 +471,7 @@ void S3QueueFilesMetadata::setFileFailed(const String & path, const String & exc
auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::S3QueueSetFileFailedMicroseconds);
SCOPE_EXIT_SAFE({
std::lock_guard lock(file_statuses_mutex);
auto & file_status = file_statuses.at(path);
auto file_status = local_file_statuses.get(path, /* create */false);
file_status->state = FileStatus::State::Failed;
file_status->profile_counters.increment(ProfileEvents::S3QueueSetFileFailedMicroseconds, timer.get());
timer.cancel();
@ -361,7 +480,7 @@ void S3QueueFilesMetadata::setFileFailed(const String & path, const String & exc
const auto node_name = getNodeName(path);
auto node_metadata = createNodeMetadata(path, exception_message);
const auto zk_client = storage->getZooKeeper();
const auto zk_client = getZooKeeper();
if (max_loading_retries == 0)
{
@ -474,7 +593,7 @@ void S3QueueFilesMetadata::cleanupThreadFuncImpl()
const bool check_nodes_limit = max_set_size > 0;
const bool check_nodes_ttl = max_set_age_sec > 0;
const auto zk_client = storage->getZooKeeper();
const auto zk_client = getZooKeeper();
auto nodes = zk_client->getChildren(zookeeper_processed_path);
if (nodes.empty())
{
@ -571,6 +690,8 @@ void S3QueueFilesMetadata::cleanupThreadFuncImpl()
LOG_TEST(log, "Removing node at path {} ({}) because max files limit is reached",
node.metadata.file_path, path.string());
local_file_statuses.remove(node.metadata.file_path, /* if_exists */true);
code = zk_client->tryRemove(path);
if (code == Coordination::Error::ZOK)
--nodes_to_remove;
@ -586,6 +707,8 @@ void S3QueueFilesMetadata::cleanupThreadFuncImpl()
LOG_TEST(log, "Removing node at path {} ({}) because file is reached",
node.metadata.file_path, path.string());
local_file_statuses.remove(node.metadata.file_path, /* if_exists */true);
code = zk_client->tryRemove(path);
if (code != Coordination::Error::ZOK)
LOG_ERROR(log, "Failed to remove a node `{}` (code: {})", path.string(), code);
@ -608,6 +731,16 @@ void S3QueueFilesMetadata::cleanupThreadFuncImpl()
LOG_TRACE(log, "Node limits check finished");
}
bool S3QueueFilesMetadata::checkSettings(const S3QueueSettings & settings) const
{
return mode == settings.mode
&& max_set_size == settings.s3queue_tracked_files_limit.value
&& max_set_age_sec == settings.s3queue_tracked_file_ttl_sec.value
&& max_loading_retries == settings.s3queue_loading_retries.value
&& min_cleanup_interval_ms == settings.s3queue_cleanup_interval_min_ms.value
&& max_cleanup_interval_ms == settings.s3queue_cleanup_interval_max_ms.value;
}
}
#endif

View File

@ -6,6 +6,7 @@
#include <Core/Types.h>
#include <Core/SettingsEnums.h>
#include <Core/BackgroundSchedulePool.h>
#include <Common/ZooKeeper/ZooKeeper.h>
namespace fs = std::filesystem;
namespace Poco { class Logger; }
@ -18,7 +19,7 @@ class StorageS3Queue;
class S3QueueFilesMetadata
{
public:
S3QueueFilesMetadata(const StorageS3Queue * storage_, const S3QueueSettings & settings_, ContextPtr context);
S3QueueFilesMetadata(const fs::path & zookeeper_path_, const S3QueueSettings & settings_);
~S3QueueFilesMetadata();
@ -52,10 +53,11 @@ public:
std::shared_ptr<FileStatus> getFileStatus(const std::string & path);
FileStatuses getFileStateses() const;
FileStatuses getFileStateses() const { return local_file_statuses.getAll(); }
bool checkSettings(const S3QueueSettings & settings) const;
private:
const StorageS3Queue * storage;
const S3QueueMode mode;
const UInt64 max_set_size;
const UInt64 max_set_age_sec;
@ -73,16 +75,22 @@ private:
std::atomic_bool shutdown = false;
BackgroundSchedulePool::TaskHolder task;
FileStatuses file_statuses;
mutable std::mutex file_statuses_mutex;
std::string getNodeName(const std::string & path);
bool trySetFileAsProcessingForOrderedMode(const std::string & path);
bool trySetFileAsProcessingForUnorderedMode(const std::string & path);
zkutil::ZooKeeperPtr getZooKeeper() const;
void setFileProcessedForOrderedMode(const std::string & path);
void setFileProcessedForUnorderedMode(const std::string & path);
std::string getNodeName(const std::string & path);
enum class SetFileProcessingResult
{
Success,
ProcessingByOtherNode,
AlreadyProcessed,
AlreadyFailed,
};
SetFileProcessingResult trySetFileAsProcessingForOrderedMode(const std::string & path);
SetFileProcessingResult trySetFileAsProcessingForUnorderedMode(const std::string & path);
struct NodeMetadata
{
@ -99,6 +107,19 @@ private:
void cleanupThreadFunc();
void cleanupThreadFuncImpl();
struct LocalFileStatuses
{
FileStatuses file_statuses;
mutable std::mutex mutex;
FileStatuses getAll() const;
std::shared_ptr<FileStatus> get(const std::string & filename, bool create);
bool remove(const std::string & filename, bool if_exists);
FileStatus::State state(const std::string & filename) const;
std::unique_lock<std::mutex> lock() const;
};
LocalFileStatuses local_file_statuses;
};
}

View File

@ -0,0 +1,29 @@
#include <Storages/S3Queue/S3QueueMetadataFactory.h>
namespace DB
{
S3QueueMetadataFactory & S3QueueMetadataFactory::instance()
{
static S3QueueMetadataFactory ret;
return ret;
}
S3QueueMetadataFactory::MetadataPtr
S3QueueMetadataFactory::getOrCreate(const std::string & zookeeper_path, const S3QueueSettings & settings)
{
std::lock_guard lock(mutex);
auto it = metadata_by_path.find(zookeeper_path);
if (it == metadata_by_path.end())
{
it = metadata_by_path.emplace(zookeeper_path, std::make_shared<S3QueueFilesMetadata>(fs::path(zookeeper_path), settings)).first;
}
else if (!it->second->checkSettings(settings))
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Metadata with the same `s3queue_zookeeper_path` "
"was already created but with different settings");
}
return it->second;
}
}

View File

@ -0,0 +1,26 @@
#pragma once
#include <boost/noncopyable.hpp>
#include <Storages/S3Queue/S3QueueSettings.h>
#include <Storages/S3Queue/S3QueueFilesMetadata.h>
namespace DB
{
class S3QueueMetadataFactory final : private boost::noncopyable
{
public:
using MetadataPtr = std::shared_ptr<S3QueueFilesMetadata>;
using MetadataByPath = std::unordered_map<std::string, MetadataPtr>;
static S3QueueMetadataFactory & instance();
MetadataPtr getOrCreate(const std::string & zookeeper_path, const S3QueueSettings & settings);
MetadataByPath getAll() { return metadata_by_path; }
private:
MetadataByPath metadata_by_path;
std::mutex mutex;
};
}

View File

@ -14,6 +14,7 @@
#include <Storages/S3Queue/S3QueueTableMetadata.h>
#include <Storages/S3Queue/StorageS3Queue.h>
#include <Storages/S3Queue/S3QueueFilesMetadata.h>
#include <Storages/S3Queue/S3QueueMetadataFactory.h>
#include <Storages/StorageFactory.h>
#include <Storages/StorageMaterializedView.h>
#include <Storages/StorageSnapshot.h>
@ -85,7 +86,7 @@ StorageS3Queue::StorageS3Queue(
, s3queue_settings(std::move(s3queue_settings_))
, zk_path(chooseZooKeeperPath(table_id_, context_->getSettingsRef(), *s3queue_settings))
, after_processing(s3queue_settings->after_processing)
, files_metadata(std::make_shared<S3QueueFilesMetadata>(this, *s3queue_settings, context_))
, files_metadata(S3QueueMetadataFactory::instance().getOrCreate(zk_path, *s3queue_settings))
, configuration{configuration_}
, format_settings(format_settings_)
, reschedule_processing_interval_ms(s3queue_settings->s3queue_polling_min_timeout_ms)

View File

@ -54,8 +54,6 @@ public:
zkutil::ZooKeeperPtr getZooKeeper() const;
S3QueueFilesMetadata::FileStatuses getFileStatuses() const { return files_metadata->getFileStateses(); }
private:
using FileIterator = StorageS3QueueSource::FileIterator;

View File

@ -14,6 +14,7 @@
#include <Interpreters/Context.h>
#include <Interpreters/ProfileEventsExt.h>
#include <Storages/S3Queue/S3QueueFilesMetadata.h>
#include <Storages/S3Queue/S3QueueMetadataFactory.h>
#include <Storages/S3Queue/StorageS3Queue.h>
#include <Disks/IDisk.h>
@ -24,8 +25,7 @@ namespace DB
NamesAndTypesList StorageSystemS3Queue::getNamesAndTypes()
{
return {
{"database", std::make_shared<DataTypeString>()},
{"table", std::make_shared<DataTypeString>()},
{"zookeeper_path", std::make_shared<DataTypeString>()},
{"file_name", std::make_shared<DataTypeString>()},
{"rows_processed", std::make_shared<DataTypeUInt64>()},
{"status", std::make_shared<DataTypeString>()},
@ -40,47 +40,29 @@ StorageSystemS3Queue::StorageSystemS3Queue(const StorageID & table_id_)
{
}
void StorageSystemS3Queue::fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const
void StorageSystemS3Queue::fillData(MutableColumns & res_columns, ContextPtr, const SelectQueryInfo &) const
{
const auto access = context->getAccess();
const bool show_tables_granted = access->isGranted(AccessType::SHOW_TABLES);
if (show_tables_granted)
for (const auto & [zookeeper_path, metadata] : S3QueueMetadataFactory::instance().getAll())
{
auto databases = DatabaseCatalog::instance().getDatabases();
for (const auto & db : databases)
for (const auto & [file_name, file_status] : metadata->getFileStateses())
{
for (auto iterator = db.second->getTablesIterator(context); iterator->isValid(); iterator->next())
{
StoragePtr storage = iterator->table();
if (auto * s3queue_table = dynamic_cast<StorageS3Queue *>(storage.get()))
{
const auto & table_id = s3queue_table->getStorageID();
auto file_statuses = s3queue_table->getFileStatuses();
for (const auto & [file_name, file_status] : file_statuses)
{
size_t i = 0;
res_columns[i++]->insert(table_id.database_name);
res_columns[i++]->insert(table_id.table_name);
res_columns[i++]->insert(file_name);
res_columns[i++]->insert(file_status->processed_rows);
res_columns[i++]->insert(magic_enum::enum_name(file_status->state));
size_t i = 0;
res_columns[i++]->insert(zookeeper_path);
res_columns[i++]->insert(file_name);
res_columns[i++]->insert(file_status->processed_rows);
res_columns[i++]->insert(magic_enum::enum_name(file_status->state));
if (file_status->processing_start_time)
res_columns[i++]->insert(file_status->processing_start_time);
else
res_columns[i++]->insertDefault();
if (file_status->processing_end_time)
res_columns[i++]->insert(file_status->processing_end_time);
else
res_columns[i++]->insertDefault();
if (file_status->processing_start_time)
res_columns[i++]->insert(file_status->processing_start_time);
else
res_columns[i++]->insertDefault();
if (file_status->processing_end_time)
res_columns[i++]->insert(file_status->processing_end_time);
else
res_columns[i++]->insertDefault();
ProfileEvents::dumpToMapColumn(file_status->profile_counters.getPartiallyAtomicSnapshot(), res_columns[i++].get(), true);
}
}
}
ProfileEvents::dumpToMapColumn(file_status->profile_counters.getPartiallyAtomicSnapshot(), res_columns[i++].get(), true);
}
}
}