diff --git a/src/Interpreters/AsynchronousInsertQueue.h b/src/Interpreters/AsynchronousInsertQueue.h index 3fadaaf263b..4ef8c5af7e0 100644 --- a/src/Interpreters/AsynchronousInsertQueue.h +++ b/src/Interpreters/AsynchronousInsertQueue.h @@ -91,11 +91,11 @@ private: using Queue = std::unordered_map, InsertQuery::Hash>; using QueueIterator = Queue::iterator; - std::shared_mutex rwlock; + mutable std::shared_mutex rwlock; Queue queue; using QueryIdToEntry = std::unordered_map; - std::mutex currently_processing_mutex; + mutable std::mutex currently_processing_mutex; QueryIdToEntry currently_processing_queries; /// Logic and events behind queue are as follows: @@ -128,8 +128,11 @@ private: static void processData(InsertQuery key, InsertDataPtr data, ContextPtr global_context); public: - Queue getQueue() const { return queue; } - QueryIdToEntry getCurrentlyProcessingQueries() const { return currently_processing_queries; } + Queue getQueue() const + { + std::shared_lock lock(rwlock); + return queue; + } }; } diff --git a/src/Storages/System/StorageSystemAsynchronousInserts.cpp b/src/Storages/System/StorageSystemAsynchronousInserts.cpp new file mode 100644 index 00000000000..ff0796172bb --- /dev/null +++ b/src/Storages/System/StorageSystemAsynchronousInserts.cpp @@ -0,0 +1,99 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +static constexpr auto TIME_SCALE = 6; + +NamesAndTypesList StorageSystemAsynchronousInserts::getNamesAndTypes() +{ + return + { + {"query", std::make_shared()}, + {"first_update", std::make_shared(TIME_SCALE)}, + {"last_update", std::make_shared(TIME_SCALE)}, + {"total_bytes", std::make_shared()}, + {"entries.query_id", std::make_shared(std::make_shared())}, + {"entries.bytes", std::make_shared(std::make_shared())}, + {"entries.finished", std::make_shared(std::make_shared())}, + {"entries.exception", std::make_shared(std::make_shared())}, + }; +} + +void StorageSystemAsynchronousInserts::fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const +{ + using namespace std::chrono; + + auto * insert_queue = context->getAsynchronousInsertQueue(); + if (!insert_queue) + return; + + auto queue = insert_queue->getQueue(); + for (const auto & [key, elem] : queue) + { + std::lock_guard lock(elem->mutex); + + if (!elem->data) + continue; + + auto time_in_microseconds = [](const time_point & timestamp) + { + auto time_diff = duration_cast(steady_clock::now() - timestamp); + return (system_clock::now() - time_diff).time_since_epoch().count(); + }; + + size_t i = 0; + + res_columns[i++]->insert(queryToString(key.query)); + res_columns[i++]->insert(time_in_microseconds(elem->data->first_update)); + res_columns[i++]->insert(time_in_microseconds(elem->data->last_update)); + res_columns[i++]->insert(elem->data->size); + + Array arr_query_id; + Array arr_bytes; + Array arr_finished; + Array arr_exception; + + for (const auto & entry : elem->data->entries) + { + arr_query_id.push_back(entry->query_id); + arr_bytes.push_back(entry->bytes.size()); + arr_finished.push_back(entry->isFinished()); + + auto exception = entry->getException(); + if (exception) + { + try + { + std::rethrow_exception(exception); + } + catch (const Exception & e) + { + arr_exception.push_back(e.displayText()); + } + catch (...) + { + arr_exception.push_back("Unknown exception"); + } + } + else + arr_exception.push_back(""); + } + + res_columns[i++]->insert(arr_query_id); + res_columns[i++]->insert(arr_bytes); + res_columns[i++]->insert(arr_finished); + res_columns[i++]->insert(arr_exception); + } +} + +} diff --git a/src/Storages/System/StorageSystemAsynchronousInserts.h b/src/Storages/System/StorageSystemAsynchronousInserts.h new file mode 100644 index 00000000000..0d7cbbe4eaf --- /dev/null +++ b/src/Storages/System/StorageSystemAsynchronousInserts.h @@ -0,0 +1,26 @@ +#pragma once + +#include +#include + +namespace DB +{ + +/** Implements the system table `asynhronous_inserts`, + * which contains information about pending asynchronous inserts in queue. +*/ +class StorageSystemAsynchronousInserts final : + public shared_ptr_helper, + public IStorageSystemOneBlock +{ +public: + std::string getName() const override { return "AsynchronousInserts"; } + static NamesAndTypesList getNamesAndTypes(); + +protected: + friend struct shared_ptr_helper; + using IStorageSystemOneBlock::IStorageSystemOneBlock; + void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const override; +}; + +} diff --git a/src/Storages/System/attachSystemTables.cpp b/src/Storages/System/attachSystemTables.cpp index 3656a239adb..f77112a82a2 100644 --- a/src/Storages/System/attachSystemTables.cpp +++ b/src/Storages/System/attachSystemTables.cpp @@ -74,6 +74,7 @@ #include #include #include +#include #ifdef OS_LINUX #include @@ -167,6 +168,7 @@ void attachSystemTablesServer(IDatabase & system_database, bool has_zookeeper) attach(system_database, "macros"); attach(system_database, "replicated_fetches"); attach(system_database, "part_moves_between_shards"); + attach(system_database, "asynchronous_inserts"); if (has_zookeeper) attach(system_database, "zookeeper");