Progress bar for file table engine

This commit is contained in:
kssenii 2021-04-26 13:34:44 +00:00
parent 1a5b18cf52
commit ffcfe5f9a1
6 changed files with 148 additions and 12 deletions

View File

@ -5,6 +5,8 @@
#include <common/types.h>
#include <Core/Defines.h>
#include <Common/Stopwatch.h>
namespace DB
{
@ -45,6 +47,37 @@ struct WriteProgress
: written_rows(written_rows_), written_bytes(written_bytes_) {}
};
/// Tracks progress of processing one or multiple files via the File table engine.
///
/// Holds the elapsed-time stopwatch, the (approximate) total number of bytes to process
/// and the counters of bytes/rows processed so far.
struct FileTableEngineProgress
{
    /// Track elapsed time.
    Stopwatch watch;

    /// Total number of bytes expected to be processed; 0 until it is set.
    size_t total_bytes_to_process;

    /// FileTableEngineProgress struct can be accessed from Context via const reference.
    /// These fields are allowed to be updated in a progress callback, hence `mutable`.
    mutable std::atomic<uint64_t> processed_bytes;
    mutable std::atomic<uint64_t> processed_rows;

    /// Both counters must be initialized explicitly: before C++20 a default-constructed
    /// std::atomic holds an indeterminate value.
    FileTableEngineProgress() : total_bytes_to_process(0), processed_bytes(0), processed_rows(0) {}

    /// std::atomic is not copyable, so the counters are copied via load().
    /// The copy is a field-by-field copy, not an atomic snapshot of the whole struct.
    FileTableEngineProgress(const FileTableEngineProgress & other)
        : watch(other.watch)
        , total_bytes_to_process(other.total_bytes_to_process)
        , processed_bytes(other.processed_bytes.load())
        , processed_rows(other.processed_rows.load()) {}

    /// Field-by-field assignment; takes the source by const reference to avoid an extra copy.
    FileTableEngineProgress & operator=(const FileTableEngineProgress & other)
    {
        watch = other.watch;
        total_bytes_to_process = other.total_bytes_to_process;
        processed_bytes = other.processed_bytes.load();
        processed_rows = other.processed_rows.load();
        return *this;
    }
};
/** Progress of query execution.
* Values, transferred over network are deltas - how much was done after previously sent value.
* The same struct is also used for summarized values.

View File

@ -181,10 +181,13 @@ private:
std::shared_ptr<const ContextAccess> access;
std::shared_ptr<const EnabledRowPolicies> initial_row_policy;
String current_database;
Settings settings; /// Setting for query execution.
Settings settings; /// Setting for query execution.
using ProgressCallback = std::function<void(const Progress & progress)>;
ProgressCallback progress_callback; /// Callback for tracking progress of query execution.
QueryStatus * process_list_elem = nullptr; /// For tracking total resource usage for query.
ProgressCallback progress_callback; /// Callback for tracking progress of query execution.
FileTableEngineProgress file_progress; /// Progress data to track processing of one or multiple files for File table engine.
QueryStatus * process_list_elem = nullptr; /// For tracking total resource usage for query.
StorageID insertion_table = StorageID::createEmpty(); /// Saved insertion table in query context
String default_format; /// Format, used when server formats data by itself and if query does not have FORMAT specification.
@ -581,6 +584,9 @@ public:
/// Used in InterpreterSelectQuery to pass it to the IBlockInputStream.
ProgressCallback getProgressCallback() const;
/// Progress data of the File table engine for this context.
/// The getter is const so that it can also be called through a const Context.
const FileTableEngineProgress & getFileTableEngineProgress() const { return file_progress; }
/// Store the approximate total number of bytes that the File table engine is going to process.
void setFileTableEngineApproxBytesToProcess(size_t num_bytes)
{
    file_progress.total_bytes_to_process = num_bytes;
}
/** Set in executeQuery and InterpreterSelectQuery. Then it is used in IBlockInputStream,
* to update and monitor information about the total number of resources spent for the query.
*/

View File

@ -52,20 +52,24 @@ public:
void setLeafLimits(const SizeLimits & leaf_limits_) final {leaf_limits = leaf_limits_; }
void setQuota(const std::shared_ptr<const EnabledQuota> & quota_) final { quota = quota_; }
void setProcessListElement(QueryStatus * elem) final { process_list_elem = elem; }
void setProgressCallback(const ProgressCallback & callback) final { progress_callback = callback; }
void addTotalRowsApprox(size_t value) final { total_rows_approx += value; }
/// Stores the given callback in progress_callback.
/// The method may be overridden if, during query execution, a Source needs
/// to add one more progress callback on top of the given one.
void setProgressCallback(const ProgressCallback & callback) override
{
    progress_callback = callback;
}
protected:
/// Call this method to provide information about progress.
void progress(const Progress & value);
void work() override;
ProgressCallback progress_callback;
private:
StreamLocalLimits limits;
SizeLimits leaf_limits;
std::shared_ptr<const EnabledQuota> quota;
ProgressCallback progress_callback;
QueryStatus * process_list_elem = nullptr;
/// The approximate total number of rows to read. For progress bar.

View File

@ -38,6 +38,9 @@
#include <Processors/Formats/InputStreamFromInputFormat.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/Pipe.h>
#include <Common/UnicodeBar.h>
#include <Common/TerminalSize.h>
namespace fs = std::filesystem;
@ -65,7 +68,7 @@ namespace
/* Recursive directory listing with matched paths as a result.
* Have the same method in StorageHDFS.
*/
std::vector<std::string> listFilesWithRegexpMatching(const std::string & path_for_ls, const std::string & for_match)
std::vector<std::string> listFilesWithRegexpMatching(const std::string & path_for_ls, const std::string & for_match, size_t & total_bytes_to_read)
{
const size_t first_glob = for_match.find_first_of("*?{");
@ -94,6 +97,7 @@ std::vector<std::string> listFilesWithRegexpMatching(const std::string & path_fo
{
if (re2::RE2::FullMatch(file_name, matcher))
{
total_bytes_to_read += fs::file_size(it->path());
result.push_back(it->path().string());
}
}
@ -102,7 +106,7 @@ std::vector<std::string> listFilesWithRegexpMatching(const std::string & path_fo
if (re2::RE2::FullMatch(file_name, matcher))
{
/// Recursion depth is limited by pattern. '*' works only for depth = 1, for depth = 2 pattern path is '*/*'. So we do not need additional check.
Strings result_part = listFilesWithRegexpMatching(full_path + "/", suffix_with_globs.substr(next_slash));
Strings result_part = listFilesWithRegexpMatching(full_path + "/", suffix_with_globs.substr(next_slash), total_bytes_to_read);
std::move(result_part.begin(), result_part.end(), std::back_inserter(result));
}
}
@ -131,7 +135,7 @@ void checkCreationIsAllowed(ContextPtr context_global, const std::string & db_di
}
}
Strings StorageFile::getPathsList(const String & table_path, const String & user_files_path, ContextPtr context)
Strings StorageFile::getPathsList(const String & table_path, const String & user_files_path, ContextPtr context, size_t & total_bytes_to_read)
{
String user_files_absolute_path = Poco::Path(user_files_path).makeAbsolute().makeDirectory().toString();
Poco::Path poco_path = Poco::Path(table_path);
@ -141,9 +145,12 @@ Strings StorageFile::getPathsList(const String & table_path, const String & user
Strings paths;
const String path = poco_path.absolute().toString();
if (path.find_first_of("*?{") == std::string::npos)
{
total_bytes_to_read += fs::file_size(path);
paths.push_back(path);
}
else
paths = listFilesWithRegexpMatching("/", path);
paths = listFilesWithRegexpMatching("/", path, total_bytes_to_read);
for (const auto & cur_path : paths)
checkCreationIsAllowed(context, user_files_absolute_path, cur_path);
@ -177,7 +184,7 @@ StorageFile::StorageFile(const std::string & table_path_, const std::string & us
: StorageFile(args)
{
is_db_table = false;
paths = getPathsList(table_path_, user_files_path, args.getContext());
paths = getPathsList(table_path_, user_files_path, args.getContext(), total_bytes_to_read);
if (args.format_name == "Distributed")
{
@ -329,6 +336,7 @@ public:
Chunk generate() override
{
//setFileProgressCallback();
while (!finished_generate)
{
/// Open file lazily on first read. This is needed to avoid too many open files from different streams.
@ -421,6 +429,80 @@ public:
return {};
}
/// Installs the progress callback for this source.
///
/// For every application type except LOCAL the given callback is stored as is.
/// For clickhouse-local the given callback is chained with one more callback, which accumulates
/// processed rows/bytes in the FileTableEngineProgress taken from the context and draws
/// a progress bar to stderr.
void setProgressCallback(const ProgressCallback & callback) override
{
    /// Add file progress callback only for clickhouse-local.
    if (context->getApplicationType() != Context::ApplicationType::LOCAL)
    {
        progress_callback = callback;
        return;
    }

    auto file_progress_callback = [this](const Progress & progress)
    {
        /// NOTE(review): function-local statics are shared by all invocations of this lambda and
        /// `increment` is not synchronized - confirm whether callbacks may run concurrently.
        static size_t increment = 0;
        static const char * indicators[8] =
        {
            "\033[1;30m→\033[0m",
            "\033[1;31m↘\033[0m",
            "\033[1;32m↓\033[0m",
            "\033[1;33m↙\033[0m",
            "\033[1;34m←\033[0m",
            "\033[1;35m↖\033[0m",
            "\033[1;36m↑\033[0m",
            "\033[1m↗\033[0m",
        };

        /// 25 000 000 ns = 25 ms.
        /// NOTE(review): the original comment said ".25 seconds", which would be 250 000 000 ns.
        /// The value is kept as it was - confirm which one is intended.
        constexpr size_t min_elapsed_ns_to_display = 25000000;

        size_t terminal_width = getTerminalWidth();
        auto & file_progress = context->getFileTableEngineProgress();
        WriteBufferFromFileDescriptor message(STDERR_FILENO, 1024);

        /// Output progress bar one line lower.
        if (!file_progress.processed_bytes)
            message << std::string(terminal_width, ' ');

        file_progress.processed_bytes += progress.read_bytes;
        file_progress.processed_rows += progress.read_rows;

        /// Display progress bar only if enough time has passed since the stopwatch was started
        /// and this update actually carries some data.
        size_t elapsed_ns = file_progress.watch.elapsed();
        if (elapsed_ns > min_elapsed_ns_to_display && progress.read_bytes > 0)
        {
            /// Read every counter once, so that the text, the bar and the percentage
            /// of a single rendering are computed from the same values.
            const uint64_t processed_bytes = file_progress.processed_bytes;
            const uint64_t processed_rows = file_progress.processed_rows;
            const uint64_t total_bytes = file_progress.total_bytes_to_process;

            message << '\r';

            const char * indicator = indicators[increment % 8];
            size_t prefix_size = message.count();

            message << indicator << " Progress: ";
            message << formatReadableQuantity(processed_rows) << " rows, ";
            message << formatReadableSizeWithDecimalSuffix(processed_bytes) << " bytes. ";

            /// Don't count invisible output (escape sequences): the indicator occupies one visible character.
            size_t written_progress_chars = message.count() - prefix_size - (strlen(indicator) - 1);
            ssize_t width_of_progress_bar = static_cast<ssize_t>(terminal_width)
                - static_cast<ssize_t>(written_progress_chars)
                - static_cast<ssize_t>(strlen(" 99%"));

            /// Without a known total the ratio is undefined (it used to be a division by zero),
            /// so in that case only the counters are printed.
            if (total_bytes > 0)
            {
                /// Draw the bar only if there is room left for it in the terminal.
                if (width_of_progress_bar > 0)
                {
                    std::string bar = UnicodeBar::render(UnicodeBar::getWidth(processed_bytes, 0, total_bytes, width_of_progress_bar));
                    message << "\033[0;32m" << bar << "\033[0m";

                    /// Pad with spaces up to the full bar width, so that the percentage stays in the same column.
                    if (width_of_progress_bar > static_cast<ssize_t>(bar.size() / UNICODE_BAR_CHAR_SIZE))
                        message << std::string(width_of_progress_bar - bar.size() / UNICODE_BAR_CHAR_SIZE, ' ');
                }

                /// The percentage is capped at 99.
                message << ' ' << std::min<uint64_t>(100 * processed_bytes / total_bytes, 99) << '%';
            }
        }

        ++increment;
    };

    /// Progress callback can be added via context or via method in SourceWithProgress.
    /// In executeQuery a callback from context is wrapped into another
    /// progress callback and then passed to SourceWithProgress. Here another callback is
    /// added to avoid overriding previous callbacks or avoid other callbacks overriding this one.
    progress_callback = [callback, file_progress_callback](const Progress & progress)
    {
        callback(progress);
        file_progress_callback(progress);
    };
}
private:
std::shared_ptr<StorageFile> storage;
StorageMetadataPtr metadata_snapshot;
@ -483,6 +565,10 @@ Pipe StorageFile::read(
Pipes pipes;
pipes.reserve(num_streams);
/// For clickhouse-local pass the approximate number of bytes to read to the context, it is needed to display a progress bar.
if (context->getApplicationType() == Context::ApplicationType::LOCAL)
context->setFileTableEngineApproxBytesToProcess(total_bytes_to_read);
for (size_t i = 0; i < num_streams; ++i)
{
const auto get_columns_for_format = [&]() -> ColumnsDescription

View File

@ -61,7 +61,7 @@ public:
NamesAndTypesList getVirtuals() const override;
static Strings getPathsList(const String & table_path, const String & user_files_path, ContextPtr context);
static Strings getPathsList(const String & table_path, const String & user_files_path, ContextPtr context, size_t & total_bytes_to_read);
/// Check if the format is column-oriented.
/// It is useful because column oriented formats could effectively skip unknown columns
@ -85,6 +85,9 @@ protected:
private:
explicit StorageFile(CommonArguments args);
/// For clickhouse-local query display progress of processed files.
static void addProgressCallback(ContextPtr context);
std::string format_name;
// We use format settings from global context + CREATE query for File table
// function -- in this case, format_settings is set.
@ -106,6 +109,9 @@ private:
mutable std::shared_timed_mutex rwlock;
Poco::Logger * log = &Poco::Logger::get("StorageFile");
/// Approximate number of bytes to read. Needed for progress bar.
size_t total_bytes_to_read = 0;
};
}

View File

@ -77,7 +77,8 @@ ColumnsDescription ITableFunctionFileLike::getActualTableStructure(ContextPtr co
if (structure.empty())
{
assert(getName() == "file" && format == "Distributed");
Strings paths = StorageFile::getPathsList(filename, context->getUserFilesPath(), context);
size_t total_bytes_to_read = 0;
Strings paths = StorageFile::getPathsList(filename, context->getUserFilesPath(), context, total_bytes_to_read);
if (paths.empty())
throw Exception("Cannot get table structure from file, because no files match specified name", ErrorCodes::INCORRECT_FILE_NAME);
auto read_stream = StorageDistributedDirectoryMonitor::createStreamFromFile(paths[0]);