diff --git a/src/Common/CurrentMetrics.cpp b/src/Common/CurrentMetrics.cpp index 6d9fd686765..6878533c2fd 100644 --- a/src/Common/CurrentMetrics.cpp +++ b/src/Common/CurrentMetrics.cpp @@ -5,6 +5,7 @@ #define APPLY_FOR_METRICS(M) \ M(Query, "Number of executing queries") \ M(Merge, "Number of executing background merges") \ + M(Move, "Number of currently executing moves") \ M(PartMutation, "Number of mutations (ALTER DELETE/UPDATE)") \ M(ReplicatedFetch, "Number of data parts being fetched from replica") \ M(ReplicatedSend, "Number of data parts being sent to replicas") \ diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index b413c784159..913b0535358 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -24,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -229,6 +230,7 @@ struct ContextSharedPart : boost::noncopyable ProcessList process_list; /// Executing queries at the moment. GlobalOvercommitTracker global_overcommit_tracker; MergeList merge_list; /// The list of executable merge (for (Replicated)?MergeTree) + MovesList moves_list; /// The list of executing moves (for (Replicated)?MergeTree) ReplicatedFetchList replicated_fetch_list; ConfigurationPtr users_config; /// Config with the users, profiles and quotas sections. InterserverIOHandler interserver_io_handler; /// Handler for interserver communication. @@ -637,6 +639,8 @@ const ProcessList & Context::getProcessList() const { return shared->process_lis OvercommitTracker * Context::getGlobalOvercommitTracker() const { return &shared->global_overcommit_tracker; } MergeList & Context::getMergeList() { return shared->merge_list; } const MergeList & Context::getMergeList() const { return shared->merge_list; } +MovesList & Context::getMovesList() { return shared->moves_list; } +const MovesList & Context::getMovesList() const { return shared->moves_list; } ReplicatedFetchList & Context::getReplicatedFetchList() { return shared->replicated_fetch_list; } const ReplicatedFetchList & Context::getReplicatedFetchList() const { return shared->replicated_fetch_list; } diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index a0b62da364e..bc89ce36edc 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -63,6 +63,7 @@ using InterserverCredentialsPtr = std::shared_ptr; class InterserverIOHandler; class BackgroundSchedulePool; class MergeList; +class MovesList; class ReplicatedFetchList; class Cluster; class Compiler; @@ -775,6 +776,9 @@ public: MergeList & getMergeList(); const MergeList & getMergeList() const; + MovesList & getMovesList(); + const MovesList & getMovesList() const; + ReplicatedFetchList & getReplicatedFetchList(); const ReplicatedFetchList & getReplicatedFetchList() const; diff --git a/src/Storages/MergeTree/BackgroundProcessList.h b/src/Storages/MergeTree/BackgroundProcessList.h index baf3e281257..c9a4887cca3 100644 --- a/src/Storages/MergeTree/BackgroundProcessList.h +++ b/src/Storages/MergeTree/BackgroundProcessList.h @@ -10,7 +10,7 @@ namespace DB { /// Common code for background processes lists, like system.merges and system.replicated_fetches -/// Look at examples in MergeList and ReplicatedFetchList +/// Look at examples in MergeList, MovesList and ReplicatedFetchList template class BackgroundProcessList; diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 77d687e0e29..fcd5afb9947 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -6650,7 +6650,7 @@ MergeTreeData::CurrentlyMovingPartsTagger::CurrentlyMovingPartsTagger(MergeTreeM MergeTreeData::CurrentlyMovingPartsTagger::~CurrentlyMovingPartsTagger() { std::lock_guard lock(data.moving_parts_mutex); - for (const auto & moving_part : parts_to_move) + for (auto & moving_part : parts_to_move) { /// Something went completely wrong if (!data.currently_moving_parts.contains(moving_part.part)) @@ -6776,6 +6776,14 @@ bool MergeTreeData::moveParts(const CurrentlyMovingPartsTaggerPtr & moving_tagge nullptr); }; + // Register in global moves list (StorageSystemMoves) + auto moves_list_entry = getContext()->getMovesList().insert( + getStorageID(), + moving_part.part->name, + moving_part.reserved_space->getDisk()->getName(), + moving_part.reserved_space->getDisk()->getPath(), + moving_part.part->getBytesOnDisk()); + try { /// If zero-copy replication enabled than replicas shouldn't try to diff --git a/src/Storages/MergeTree/MergeTreePartsMover.h b/src/Storages/MergeTree/MergeTreePartsMover.h index 0266b2daa46..dfb4bb954d7 100644 --- a/src/Storages/MergeTree/MergeTreePartsMover.h +++ b/src/Storages/MergeTree/MergeTreePartsMover.h @@ -5,14 +5,14 @@ #include #include #include +#include #include namespace DB { -/// Active part from storage and destination reservation where -/// it have to be moved. +/// Active part from storage and destination reservation where it has to be moved struct MergeTreeMoveEntry { std::shared_ptr part; @@ -54,7 +54,7 @@ public: /// Replaces cloned part from detached directory into active data parts set. /// Replacing part changes state to DeleteOnDestroy and will be removed from disk after destructor of - ///IMergeTreeDataPart called. If replacing part doesn't exists or not active (committed) than + /// IMergeTreeDataPart called. If replacing part doesn't exists or not active (committed) than /// cloned part will be removed and log message will be reported. It may happen in case of concurrent /// merge or mutation. void swapClonedPart(const MergeTreeMutableDataPartPtr & cloned_parts) const; diff --git a/src/Storages/MergeTree/MovesList.cpp b/src/Storages/MergeTree/MovesList.cpp new file mode 100644 index 00000000000..730cd44a697 --- /dev/null +++ b/src/Storages/MergeTree/MovesList.cpp @@ -0,0 +1,37 @@ +#include +#include +#include + +namespace DB +{ + +MovesListElement::MovesListElement( + const StorageID & table_id_, + const std::string & part_name_, + const std::string & target_disk_name_, + const std::string & target_disk_path_, + UInt64 part_size_) + : table_id(table_id_) + , part_name(part_name_) + , target_disk_name(target_disk_name_) + , target_disk_path(target_disk_path_) + , part_size(part_size_) + , thread_id(getThreadId()) +{ +} + +MoveInfo MovesListElement::getInfo() const +{ + MoveInfo res; + res.database = table_id.database_name; + res.table = table_id.table_name; + res.part_name = part_name; + res.target_disk_name = target_disk_name; + res.target_disk_path = target_disk_path; + res.part_size = part_size; + res.elapsed = watch.elapsedSeconds(); + res.thread_id = thread_id; + return res; +} + +} diff --git a/src/Storages/MergeTree/MovesList.h b/src/Storages/MergeTree/MovesList.h new file mode 100644 index 00000000000..42f0901b41d --- /dev/null +++ b/src/Storages/MergeTree/MovesList.h @@ -0,0 +1,64 @@ +#pragma once +#include +#include +#include +#include +#include +#include + +namespace CurrentMetrics +{ + extern const Metric Move; +} + +namespace DB +{ + +struct MoveInfo +{ + std::string database; + std::string table; + std::string part_name; + std::string target_disk_name; + std::string target_disk_path; + UInt64 part_size; + + Float64 elapsed; + UInt64 thread_id; +}; + +struct MovesListElement : private boost::noncopyable +{ + const StorageID table_id; + const std::string part_name; + const std::string target_disk_name; + const std::string target_disk_path; + const UInt64 part_size; + + Stopwatch watch; + const UInt64 thread_id; + + MovesListElement( + const StorageID & table_id_, + const std::string & part_name_, + const std::string & target_disk_name_, + const std::string & target_disk_path_, + UInt64 part_size_); + + MoveInfo getInfo() const; +}; + + +/// List of currently processing moves +class MovesList final : public BackgroundProcessList +{ +private: + using Parent = BackgroundProcessList; + +public: + MovesList() + : Parent(CurrentMetrics::Move) + {} +}; + +} diff --git a/src/Storages/System/StorageSystemMoves.cpp b/src/Storages/System/StorageSystemMoves.cpp new file mode 100644 index 00000000000..6ecc9e7f373 --- /dev/null +++ b/src/Storages/System/StorageSystemMoves.cpp @@ -0,0 +1,47 @@ +#include +#include +#include +#include + + +namespace DB +{ + +NamesAndTypesList StorageSystemMoves::getNamesAndTypes() +{ + return { + {"database", std::make_shared()}, + {"table", std::make_shared()}, + {"elapsed", std::make_shared()}, + {"target_disk_name", std::make_shared()}, + {"target_disk_path", std::make_shared()}, + {"part_name", std::make_shared()}, + {"part_size", std::make_shared()}, + {"thread_id", std::make_shared()}, + }; +} + + +void StorageSystemMoves::fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const +{ + const auto access = context->getAccess(); + const bool check_access_for_tables = !access->isGranted(AccessType::SHOW_TABLES); + + for (const auto & move : context->getMovesList().get()) + { + if (check_access_for_tables && !access->isGranted(AccessType::SHOW_TABLES, move.database, move.table)) + continue; + + size_t i = 0; + res_columns[i++]->insert(move.database); + res_columns[i++]->insert(move.table); + res_columns[i++]->insert(move.elapsed); + res_columns[i++]->insert(move.target_disk_name); + res_columns[i++]->insert(move.target_disk_path); + res_columns[i++]->insert(move.part_name); + res_columns[i++]->insert(move.part_size); + res_columns[i++]->insert(move.thread_id); + } +} + +} diff --git a/src/Storages/System/StorageSystemMoves.h b/src/Storages/System/StorageSystemMoves.h new file mode 100644 index 00000000000..2e4ceec2abd --- /dev/null +++ b/src/Storages/System/StorageSystemMoves.h @@ -0,0 +1,28 @@ +#pragma once + +#include +#include +#include +#include + + +namespace DB +{ + +class Context; + + +class StorageSystemMoves final : public IStorageSystemOneBlock +{ +public: + std::string getName() const override { return "SystemMoves"; } + + static NamesAndTypesList getNamesAndTypes(); + +protected: + using IStorageSystemOneBlock::IStorageSystemOneBlock; + + void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo & query_info) const override; +}; + +} diff --git a/src/Storages/System/attachSystemTables.cpp b/src/Storages/System/attachSystemTables.cpp index 068f7ddce46..e82f7c9bb2b 100644 --- a/src/Storages/System/attachSystemTables.cpp +++ b/src/Storages/System/attachSystemTables.cpp @@ -23,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -159,6 +160,7 @@ void attachSystemTablesServer(ContextPtr context, IDatabase & system_database, b attach(context, system_database, "processes"); attach(context, system_database, "metrics"); attach(context, system_database, "merges"); + attach(context, system_database, "moves"); attach(context, system_database, "mutations"); attach(context, system_database, "replicas"); attach(context, system_database, "replication_queue"); diff --git a/tests/queries/0_stateless/02117_show_create_table_system.reference b/tests/queries/0_stateless/02117_show_create_table_system.reference index 6432a6e6518..8e62c0937e8 100644 --- a/tests/queries/0_stateless/02117_show_create_table_system.reference +++ b/tests/queries/0_stateless/02117_show_create_table_system.reference @@ -371,6 +371,19 @@ CREATE TABLE system.metrics ) ENGINE = SystemMetrics COMMENT 'SYSTEM TABLE is built on the fly.' +CREATE TABLE system.moves +( + `database` String, + `table` String, + `elapsed` Float64, + `target_disk_name` String, + `target_disk_path` String, + `part_name` String, + `part_size` UInt64, + `thread_id` UInt64 +) +ENGINE = SystemMoves +COMMENT 'SYSTEM TABLE is built on the fly.' CREATE TABLE system.mutations ( `database` String, diff --git a/tests/queries/0_stateless/02117_show_create_table_system.sql b/tests/queries/0_stateless/02117_show_create_table_system.sql index 8b75ed60eec..37bf2667069 100644 --- a/tests/queries/0_stateless/02117_show_create_table_system.sql +++ b/tests/queries/0_stateless/02117_show_create_table_system.sql @@ -45,6 +45,7 @@ show create table macros format TSVRaw; show create table merge_tree_settings format TSVRaw; show create table merges format TSVRaw; show create table metrics format TSVRaw; +show create table moves format TSVRaw; show create table mutations format TSVRaw; show create table numbers format TSVRaw; show create table numbers_mt format TSVRaw;