diff --git a/src/Storages/Distributed/DirectoryMonitor.cpp b/src/Storages/Distributed/DirectoryMonitor.cpp index a5db9636a5d..7512649746f 100644 --- a/src/Storages/Distributed/DirectoryMonitor.cpp +++ b/src/Storages/Distributed/DirectoryMonitor.cpp @@ -108,11 +108,19 @@ StorageDistributedDirectoryMonitor::~StorageDistributedDirectoryMonitor() void StorageDistributedDirectoryMonitor::flushAllData() { - if (!quit) + if (quit) + return; + + CurrentMetrics::Increment metric_pending_files{CurrentMetrics::DistributedFilesToInsert, 0}; + std::unique_lock lock{mutex}; + + const auto & files = getFiles(metric_pending_files); + if (!files.empty()) { - CurrentMetrics::Increment metric_pending_files{CurrentMetrics::DistributedFilesToInsert, 0}; - std::unique_lock lock{mutex}; - processFiles(metric_pending_files); + processFiles(files, metric_pending_files); + + /// Update counters + getFiles(metric_pending_files); } } @@ -139,11 +147,16 @@ void StorageDistributedDirectoryMonitor::run() while (!quit) { do_sleep = true; + + const auto & files = getFiles(metric_pending_files); + if (files.empty()) + break; + if (!monitor_blocker.isCancelled()) { try { - do_sleep = !processFiles(metric_pending_files); + do_sleep = !processFiles(files, metric_pending_files); } catch (...) { @@ -171,6 +184,9 @@ void StorageDistributedDirectoryMonitor::run() break; } + /// Update counters + getFiles(metric_pending_files); + if (!quit && do_sleep) task_handle->scheduleAfter(sleep_time.count()); } @@ -226,9 +242,10 @@ ConnectionPoolPtr StorageDistributedDirectoryMonitor::createPool(const std::stri } -bool StorageDistributedDirectoryMonitor::processFiles(CurrentMetrics::Increment & metric_pending_files) +std::map StorageDistributedDirectoryMonitor::getFiles(CurrentMetrics::Increment & metric_pending_files) { std::map files; + size_t new_bytes_count = 0; Poco::DirectoryIterator end; for (Poco::DirectoryIterator it{path}; it != end; ++it) @@ -237,16 +254,23 @@ bool StorageDistributedDirectoryMonitor::processFiles(CurrentMetrics::Increment Poco::Path file_path{file_path_str}; if (!it->isDirectory() && startsWith(file_path.getExtension(), "bin")) + { files[parse(file_path.getBaseName())] = file_path_str; + new_bytes_count += Poco::File(file_path).getSize(); + } } + files_count = files.size(); + bytes_count = new_bytes_count; + /// Note: the value of this metric will be kept if this function will throw an exception. /// This is needed, because in case of exception, files still pending. metric_pending_files.changeTo(files.size()); - if (files.empty()) - return false; - + return files; +} +bool StorageDistributedDirectoryMonitor::processFiles(const std::map & files, CurrentMetrics::Increment & metric_pending_files) +{ if (should_batch_inserts) { processFilesWithBatching(files, metric_pending_files); diff --git a/src/Storages/Distributed/DirectoryMonitor.h b/src/Storages/Distributed/DirectoryMonitor.h index 418cd430243..cb67ae85dfb 100644 --- a/src/Storages/Distributed/DirectoryMonitor.h +++ b/src/Storages/Distributed/DirectoryMonitor.h @@ -37,9 +37,20 @@ public: /// For scheduling via DistributedBlockOutputStream bool scheduleAfter(size_t ms); + + /// system.distribution_queue interface + std::string getPath() const { return path; } + /// Racy but ok + size_t getErrorCount() const { return error_count; } + size_t getFilesCount() const { return files_count; } + size_t getBytesCount() const { return bytes_count; } + size_t isBlocked() const { return monitor_blocker.isCancelled(); } + private: void run(); - bool processFiles(CurrentMetrics::Increment & metric_pending_files); + + std::map getFiles(CurrentMetrics::Increment & metric_pending_files); + bool processFiles(const std::map & files, CurrentMetrics::Increment & metric_pending_files); void processFile(const std::string & file_path, CurrentMetrics::Increment & metric_pending_files); void processFilesWithBatching(const std::map & files, CurrentMetrics::Increment & metric_pending_files); @@ -61,7 +72,10 @@ private: struct BatchHeader; struct Batch; - size_t error_count{}; + size_t error_count = 0; + size_t files_count = 0; + size_t bytes_count = 0; + const std::chrono::milliseconds default_sleep_time; std::chrono::milliseconds sleep_time; const std::chrono::milliseconds max_sleep_time; diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index aafc864321d..55e1e5810f1 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -656,6 +656,17 @@ StorageDistributedDirectoryMonitor& StorageDistributed::requireDirectoryMonitor( return *node_data.directory_monitor; } +std::vector StorageDistributed::getAllDirectoryMonitors() +{ + std::vector monitors; + { + std::lock_guard lock(cluster_nodes_mutex); + for (auto & node : cluster_nodes_data) + monitors.push_back(node.second.directory_monitor.get()); + } + return monitors; +} + size_t StorageDistributed::getShardCount() const { return getCluster()->getShardCount(); diff --git a/src/Storages/StorageDistributed.h b/src/Storages/StorageDistributed.h index 770acba47cc..74aadb8f580 100644 --- a/src/Storages/StorageDistributed.h +++ b/src/Storages/StorageDistributed.h @@ -94,6 +94,7 @@ public: void shutdown() override; Strings getDataPaths() const override; + size_t getInsertQueueSize() const { return 0; } const ExpressionActionsPtr & getShardingKeyExpr() const { return sharding_key_expr; } const String & getShardingKeyColumnName() const { return sharding_key_column_name; } @@ -107,6 +108,8 @@ public: void createDirectoryMonitors(const std::string & disk); /// ensure directory monitor thread and connectoin pool creation by disk and subdirectory name StorageDistributedDirectoryMonitor & requireDirectoryMonitor(const std::string & disk, const std::string & name); + /// Return list of all monitors lazy (because there are no monitors until at least one INSERT executed) + std::vector getAllDirectoryMonitors(); void flushClusterNodesAllData(); diff --git a/src/Storages/System/StorageSystemDistributionQueue.cpp b/src/Storages/System/StorageSystemDistributionQueue.cpp new file mode 100644 index 00000000000..55dc9f71f3e --- /dev/null +++ b/src/Storages/System/StorageSystemDistributionQueue.cpp @@ -0,0 +1,110 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace DB +{ + + +NamesAndTypesList StorageSystemDistributionQueue::getNamesAndTypes() +{ + return { + { "database", std::make_shared() }, + { "table", std::make_shared() }, + { "data_path", std::make_shared() }, + { "is_blocked", std::make_shared() }, + { "error_count", std::make_shared() }, + { "data_files", std::make_shared() }, + { "data_compressed_bytes", std::make_shared() }, + }; +} + + +void StorageSystemDistributionQueue::fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo & query_info) const +{ + const auto access = context.getAccess(); + const bool check_access_for_databases = !access->isGranted(AccessType::SHOW_TABLES); + + std::map> tables; + for (const auto & db : DatabaseCatalog::instance().getDatabases()) + { + /// Lazy database can not contain distributed tables + if (db.second->getEngineName() == "Lazy") + continue; + + const bool check_access_for_tables = check_access_for_databases && !access->isGranted(AccessType::SHOW_TABLES, db.first); + + for (auto iterator = db.second->getTablesIterator(context); iterator->isValid(); iterator->next()) + { + if (!dynamic_cast(iterator->table().get())) + continue; + if (check_access_for_tables && !access->isGranted(AccessType::SHOW_TABLES, db.first, iterator->name())) + continue; + tables[db.first][iterator->name()] = iterator->table(); + } + } + + + MutableColumnPtr col_database_mut = ColumnString::create(); + MutableColumnPtr col_table_mut = ColumnString::create(); + + for (auto & db : tables) + { + for (auto & table : db.second) + { + col_database_mut->insert(db.first); + col_table_mut->insert(table.first); + } + } + + ColumnPtr col_database_to_filter = std::move(col_database_mut); + ColumnPtr col_table_to_filter = std::move(col_table_mut); + + /// Determine what tables are needed by the conditions in the query. + { + Block filtered_block + { + { col_database_to_filter, std::make_shared(), "database" }, + { col_table_to_filter, std::make_shared(), "table" }, + }; + + VirtualColumnUtils::filterBlockWithQuery(query_info.query, filtered_block, context); + + if (!filtered_block.rows()) + return; + + col_database_to_filter = filtered_block.getByName("database").column; + col_table_to_filter = filtered_block.getByName("table").column; + } + + for (size_t i = 0, tables_size = col_database_to_filter->size(); i < tables_size; ++i) + { + String database = (*col_database_to_filter)[i].safeGet(); + String table = (*col_table_to_filter)[i].safeGet(); + + auto & distributed_table = dynamic_cast(*tables[database][table]); + + for (auto * monitor : distributed_table.getAllDirectoryMonitors()) + { + size_t col_num = 0; + res_columns[col_num++]->insert(database); + res_columns[col_num++]->insert(table); + res_columns[col_num++]->insert(monitor->getPath()); + res_columns[col_num++]->insert(monitor->isBlocked()); + res_columns[col_num++]->insert(monitor->getErrorCount()); + res_columns[col_num++]->insert(monitor->getFilesCount()); + res_columns[col_num++]->insert(monitor->getBytesCount()); + } + } +} + +} diff --git a/src/Storages/System/StorageSystemDistributionQueue.h b/src/Storages/System/StorageSystemDistributionQueue.h new file mode 100644 index 00000000000..88e7fa45cf5 --- /dev/null +++ b/src/Storages/System/StorageSystemDistributionQueue.h @@ -0,0 +1,29 @@ +#pragma once + +#include +#include + + +namespace DB +{ + +class Context; + + +/** Implements the `distribution_queue` system table, which allows you to view the INSERT queues for the Distributed tables. + */ +class StorageSystemDistributionQueue final : public ext::shared_ptr_helper, public IStorageSystemOneBlock +{ + friend struct ext::shared_ptr_helper; +public: + std::string getName() const override { return "SystemDistributionQueue"; } + + static NamesAndTypesList getNamesAndTypes(); + +protected: + using IStorageSystemOneBlock::IStorageSystemOneBlock; + + void fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo & query_info) const override; +}; + +} diff --git a/src/Storages/System/attachSystemTables.cpp b/src/Storages/System/attachSystemTables.cpp index 585eab2b4d8..2b52f0fe5cc 100644 --- a/src/Storages/System/attachSystemTables.cpp +++ b/src/Storages/System/attachSystemTables.cpp @@ -28,6 +28,7 @@ #include #include #include +#include #include #include #include @@ -124,6 +125,7 @@ void attachSystemTablesServer(IDatabase & system_database, bool has_zookeeper) system_database.attachTable("mutations", StorageSystemMutations::create("mutations")); system_database.attachTable("replicas", StorageSystemReplicas::create("replicas")); system_database.attachTable("replication_queue", StorageSystemReplicationQueue::create("replication_queue")); + system_database.attachTable("distribution_queue", StorageSystemDistributionQueue::create("distribution_queue")); system_database.attachTable("dictionaries", StorageSystemDictionaries::create("dictionaries")); system_database.attachTable("models", StorageSystemModels::create("models")); system_database.attachTable("clusters", StorageSystemClusters::create("clusters")); diff --git a/src/Storages/ya.make b/src/Storages/ya.make index 7e36e4145eb..33844d5547c 100644 --- a/src/Storages/ya.make +++ b/src/Storages/ya.make @@ -121,6 +121,7 @@ SRCS( System/StorageSystemQuotasUsage.cpp System/StorageSystemReplicas.cpp System/StorageSystemReplicationQueue.cpp + System/StorageSystemDistributionQueue.cpp System/StorageSystemRoleGrants.cpp System/StorageSystemRoles.cpp System/StorageSystemRowPolicies.cpp diff --git a/tests/queries/0_stateless/01293_system_distribution_queue.reference b/tests/queries/0_stateless/01293_system_distribution_queue.reference new file mode 100644 index 00000000000..a2c1e5f2a7b --- /dev/null +++ b/tests/queries/0_stateless/01293_system_distribution_queue.reference @@ -0,0 +1,6 @@ +INSERT +1 0 1 1 +FLUSH +1 0 0 0 +UNBLOCK +0 0 0 0 diff --git a/tests/queries/0_stateless/01293_system_distribution_queue.sql b/tests/queries/0_stateless/01293_system_distribution_queue.sql new file mode 100644 index 00000000000..c0ff6a21e8e --- /dev/null +++ b/tests/queries/0_stateless/01293_system_distribution_queue.sql @@ -0,0 +1,21 @@ +drop table if exists null_01293; +drop table if exists dist_01293; + +create table null_01293 (key Int) engine=Null(); +create table dist_01293 as null_01293 engine=Distributed(test_cluster_two_shards, currentDatabase(), null_01293, key); + +-- no rows, since no active monitor +select * from system.distribution_queue; + +select 'INSERT'; +system stop distributed sends dist_01293; +insert into dist_01293 select * from numbers(10); +select is_blocked, error_count, data_files, data_compressed_bytes>100 from system.distribution_queue; +system flush distributed dist_01293; + +select 'FLUSH'; +select is_blocked, error_count, data_files, data_compressed_bytes from system.distribution_queue; + +select 'UNBLOCK'; +system start distributed sends dist_01293; +select is_blocked, error_count, data_files, data_compressed_bytes from system.distribution_queue;