From ffcfe5f9a1e9ba2eec6e8995186c5e1ebaea4414 Mon Sep 17 00:00:00 2001 From: kssenii Date: Mon, 26 Apr 2021 13:34:44 +0000 Subject: [PATCH] Progress bar for file table engine --- src/IO/Progress.h | 33 +++++++ src/Interpreters/Context.h | 12 ++- src/Processors/Sources/SourceWithProgress.h | 8 +- src/Storages/StorageFile.cpp | 96 ++++++++++++++++++- src/Storages/StorageFile.h | 8 +- src/TableFunctions/ITableFunctionFileLike.cpp | 3 +- 6 files changed, 148 insertions(+), 12 deletions(-) diff --git a/src/IO/Progress.h b/src/IO/Progress.h index 64bf3a404af..2f69396dd5d 100644 --- a/src/IO/Progress.h +++ b/src/IO/Progress.h @@ -5,6 +5,8 @@ #include #include +#include + namespace DB { @@ -45,6 +47,37 @@ struct WriteProgress : written_rows(written_rows_), written_bytes(written_bytes_) {} }; +/// Track progress of processing one or multiple files via File table engine. +struct FileTableEngineProgress +{ + /// Track elapsed time. + Stopwatch watch; + size_t total_bytes_to_process; + + /// FileTableEngineProgress struct can be accessed from Context via const reference. + /// These fields are allowed to be updated in a progress callback. + mutable std::atomic processed_bytes; + mutable std::atomic processed_rows; + + FileTableEngineProgress() : total_bytes_to_process(0), processed_bytes(0) {} + + FileTableEngineProgress(const FileTableEngineProgress & other) + : watch(other.watch) + , total_bytes_to_process(other.total_bytes_to_process) + , processed_bytes(other.processed_bytes.load()) + , processed_rows(other.processed_rows.load()) {} + + FileTableEngineProgress & operator=(FileTableEngineProgress other) + { + watch = other.watch; + total_bytes_to_process = other.total_bytes_to_process; + processed_bytes = other.processed_bytes.load(); + processed_rows = other.processed_rows.load(); + return *this; + } +}; + + /** Progress of query execution. * Values, transferred over network are deltas - how much was done after previously sent value. * The same struct is also used for summarized values. diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index 680ee7c779f..adcb7cfe5eb 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -181,10 +181,13 @@ private: std::shared_ptr access; std::shared_ptr initial_row_policy; String current_database; - Settings settings; /// Setting for query execution. + Settings settings; /// Setting for query execution. + using ProgressCallback = std::function; - ProgressCallback progress_callback; /// Callback for tracking progress of query execution. - QueryStatus * process_list_elem = nullptr; /// For tracking total resource usage for query. + ProgressCallback progress_callback; /// Callback for tracking progress of query execution. + FileTableEngineProgress file_progress; /// Progress data to track processing of one or multiple files for File table engine. + + QueryStatus * process_list_elem = nullptr; /// For tracking total resource usage for query. StorageID insertion_table = StorageID::createEmpty(); /// Saved insertion table in query context String default_format; /// Format, used when server formats data by itself and if query does not have FORMAT specification. @@ -581,6 +584,9 @@ public: /// Used in InterpreterSelectQuery to pass it to the IBlockInputStream. ProgressCallback getProgressCallback() const; + const FileTableEngineProgress & getFileTableEngineProgress() { return file_progress; } + void setFileTableEngineApproxBytesToProcess(size_t num_bytes) { file_progress.total_bytes_to_process = num_bytes; } + /** Set in executeQuery and InterpreterSelectQuery. Then it is used in IBlockInputStream, * to update and monitor information about the total number of resources spent for the query. */ diff --git a/src/Processors/Sources/SourceWithProgress.h b/src/Processors/Sources/SourceWithProgress.h index 3aa7a81f418..3b283009c73 100644 --- a/src/Processors/Sources/SourceWithProgress.h +++ b/src/Processors/Sources/SourceWithProgress.h @@ -52,20 +52,24 @@ public: void setLeafLimits(const SizeLimits & leaf_limits_) final {leaf_limits = leaf_limits_; } void setQuota(const std::shared_ptr & quota_) final { quota = quota_; } void setProcessListElement(QueryStatus * elem) final { process_list_elem = elem; } - void setProgressCallback(const ProgressCallback & callback) final { progress_callback = callback; } void addTotalRowsApprox(size_t value) final { total_rows_approx += value; } + /// This method might be overriden, if, during query execution, there is a Source, that needs + /// to add one more progress callback. + void setProgressCallback(const ProgressCallback & callback) override { progress_callback = callback; } + protected: /// Call this method to provide information about progress. void progress(const Progress & value); void work() override; + ProgressCallback progress_callback; + private: StreamLocalLimits limits; SizeLimits leaf_limits; std::shared_ptr quota; - ProgressCallback progress_callback; QueryStatus * process_list_elem = nullptr; /// The approximate total number of rows to read. For progress bar. diff --git a/src/Storages/StorageFile.cpp b/src/Storages/StorageFile.cpp index 14b91d29805..81679a2916d 100644 --- a/src/Storages/StorageFile.cpp +++ b/src/Storages/StorageFile.cpp @@ -38,6 +38,9 @@ #include #include #include +#include +#include + namespace fs = std::filesystem; @@ -65,7 +68,7 @@ namespace /* Recursive directory listing with matched paths as a result. * Have the same method in StorageHDFS. */ -std::vector listFilesWithRegexpMatching(const std::string & path_for_ls, const std::string & for_match) +std::vector listFilesWithRegexpMatching(const std::string & path_for_ls, const std::string & for_match, size_t & total_bytes_to_read) { const size_t first_glob = for_match.find_first_of("*?{"); @@ -94,6 +97,7 @@ std::vector listFilesWithRegexpMatching(const std::string & path_fo { if (re2::RE2::FullMatch(file_name, matcher)) { + total_bytes_to_read += fs::file_size(it->path()); result.push_back(it->path().string()); } } @@ -102,7 +106,7 @@ std::vector listFilesWithRegexpMatching(const std::string & path_fo if (re2::RE2::FullMatch(file_name, matcher)) { /// Recursion depth is limited by pattern. '*' works only for depth = 1, for depth = 2 pattern path is '*/*'. So we do not need additional check. - Strings result_part = listFilesWithRegexpMatching(full_path + "/", suffix_with_globs.substr(next_slash)); + Strings result_part = listFilesWithRegexpMatching(full_path + "/", suffix_with_globs.substr(next_slash), total_bytes_to_read); std::move(result_part.begin(), result_part.end(), std::back_inserter(result)); } } @@ -131,7 +135,7 @@ void checkCreationIsAllowed(ContextPtr context_global, const std::string & db_di } } -Strings StorageFile::getPathsList(const String & table_path, const String & user_files_path, ContextPtr context) +Strings StorageFile::getPathsList(const String & table_path, const String & user_files_path, ContextPtr context, size_t & total_bytes_to_read) { String user_files_absolute_path = Poco::Path(user_files_path).makeAbsolute().makeDirectory().toString(); Poco::Path poco_path = Poco::Path(table_path); @@ -141,9 +145,12 @@ Strings StorageFile::getPathsList(const String & table_path, const String & user Strings paths; const String path = poco_path.absolute().toString(); if (path.find_first_of("*?{") == std::string::npos) + { + total_bytes_to_read += fs::file_size(path); paths.push_back(path); + } else - paths = listFilesWithRegexpMatching("/", path); + paths = listFilesWithRegexpMatching("/", path, total_bytes_to_read); for (const auto & cur_path : paths) checkCreationIsAllowed(context, user_files_absolute_path, cur_path); @@ -177,7 +184,7 @@ StorageFile::StorageFile(const std::string & table_path_, const std::string & us : StorageFile(args) { is_db_table = false; - paths = getPathsList(table_path_, user_files_path, args.getContext()); + paths = getPathsList(table_path_, user_files_path, args.getContext(), total_bytes_to_read); if (args.format_name == "Distributed") { @@ -329,6 +336,7 @@ public: Chunk generate() override { + //setFileProgressCallback(); while (!finished_generate) { /// Open file lazily on first read. This is needed to avoid too many open files from different streams. @@ -421,6 +429,80 @@ public: return {}; } + + void setProgressCallback(const ProgressCallback & callback) override + { + /// Add file progress callback only for clickhouse-local. + if (context->getApplicationType() != Context::ApplicationType::LOCAL) + { + progress_callback = callback; + return; + } + + auto file_progress_callback = [this](const Progress & progress) + { + static size_t increment = 0; + static const char * indicators[8] = + { + "\033[1;30m→\033[0m", + "\033[1;31m↘\033[0m", + "\033[1;32m↓\033[0m", + "\033[1;33m↙\033[0m", + "\033[1;34m←\033[0m", + "\033[1;35m↖\033[0m", + "\033[1;36m↑\033[0m", + "\033[1m↗\033[0m", + }; + size_t terminal_width = getTerminalWidth(); + + auto & file_progress = context->getFileTableEngineProgress(); + WriteBufferFromFileDescriptor message(STDERR_FILENO, 1024); + + /// Output progress bar one line lower. + if (!file_progress.processed_bytes) + message << std::string(terminal_width, ' '); + + file_progress.processed_bytes += progress.read_bytes; + file_progress.processed_rows += progress.read_rows; + + /// Display progress bar only if .25 seconds have passed since query execution start. + size_t elapsed_ns = file_progress.watch.elapsed(); + if (elapsed_ns > 25000000 && progress.read_bytes > 0) + { + message << '\r'; + const char * indicator = indicators[increment % 8]; + size_t prefix_size = message.count(); + + message << indicator << " Progress: "; + message << formatReadableQuantity(file_progress.processed_rows) << " rows, "; + message << formatReadableSizeWithDecimalSuffix(file_progress.processed_bytes) << " bytes. "; + + size_t written_progress_chars = message.count() - prefix_size - (strlen(indicator) - 1); /// Don't count invisible output (escape sequences). + ssize_t width_of_progress_bar = static_cast(terminal_width) - written_progress_chars - strlen(" 99%"); + std::string bar = UnicodeBar::render(UnicodeBar::getWidth(file_progress.processed_bytes, 0, file_progress.total_bytes_to_process, width_of_progress_bar)); + + message << "\033[0;32m" << bar << "\033[0m"; + + if (width_of_progress_bar > static_cast(bar.size() / UNICODE_BAR_CHAR_SIZE)) + message << std::string(width_of_progress_bar - bar.size() / UNICODE_BAR_CHAR_SIZE, ' '); + + message << ' ' << std::min((100 * file_progress.processed_bytes / file_progress.total_bytes_to_process), static_cast(99)) << '%'; + } + ++increment; + }; + + /// Progress callback can be added via context or via method in SourceWithProgress. + /// In executeQuery a callback from context is wrapped into another + /// progress callback and then passed to SourceWithProgress. Here another callback is + /// added to avoid overriding previous callbacks or avoid other callbacks overriding this one. + progress_callback = [callback, file_progress_callback](const Progress & progress) + { + callback(progress); + file_progress_callback(progress); + }; + } + + private: std::shared_ptr storage; StorageMetadataPtr metadata_snapshot; @@ -483,6 +565,10 @@ Pipe StorageFile::read( Pipes pipes; pipes.reserve(num_streams); + /// For clickhouse-local add progress callback to display in a progress bar. + if (context->getApplicationType() == Context::ApplicationType::LOCAL) + context->setFileTableEngineApproxBytesToProcess(total_bytes_to_read); + for (size_t i = 0; i < num_streams; ++i) { const auto get_columns_for_format = [&]() -> ColumnsDescription diff --git a/src/Storages/StorageFile.h b/src/Storages/StorageFile.h index a277dda7cc0..fc69f29cd20 100644 --- a/src/Storages/StorageFile.h +++ b/src/Storages/StorageFile.h @@ -61,7 +61,7 @@ public: NamesAndTypesList getVirtuals() const override; - static Strings getPathsList(const String & table_path, const String & user_files_path, ContextPtr context); + static Strings getPathsList(const String & table_path, const String & user_files_path, ContextPtr context, size_t & total_bytes_to_read); /// Check if the format is column-oriented. /// Is is useful because column oriented formats could effectively skip unknown columns @@ -85,6 +85,9 @@ protected: private: explicit StorageFile(CommonArguments args); + /// For clickhouse-local query display progress of processed files. + static void addProgressCallback(ContextPtr context); + std::string format_name; // We use format settings from global context + CREATE query for File table // function -- in this case, format_settings is set. @@ -106,6 +109,9 @@ private: mutable std::shared_timed_mutex rwlock; Poco::Logger * log = &Poco::Logger::get("StorageFile"); + + /// Approximate number of bytes to read. Needed for progress bar. + size_t total_bytes_to_read = 0; }; } diff --git a/src/TableFunctions/ITableFunctionFileLike.cpp b/src/TableFunctions/ITableFunctionFileLike.cpp index 44a917a0f00..f3d6905d1a7 100644 --- a/src/TableFunctions/ITableFunctionFileLike.cpp +++ b/src/TableFunctions/ITableFunctionFileLike.cpp @@ -77,7 +77,8 @@ ColumnsDescription ITableFunctionFileLike::getActualTableStructure(ContextPtr co if (structure.empty()) { assert(getName() == "file" && format == "Distributed"); - Strings paths = StorageFile::getPathsList(filename, context->getUserFilesPath(), context); + size_t total_bytes_to_read = 0; + Strings paths = StorageFile::getPathsList(filename, context->getUserFilesPath(), context, total_bytes_to_read); if (paths.empty()) throw Exception("Cannot get table structure from file, because no files match specified name", ErrorCodes::INCORRECT_FILE_NAME); auto read_stream = StorageDistributedDirectoryMonitor::createStreamFromFile(paths[0]);