Fix possible concurrent access in ProgressIndication

When all of the following hold:
- clickhouse-local
- input_format_parallel_parsing=true
- write_progress_on_update=true

Concurrent access to the following is possible:
- writeProgress() (class properties) (guarded with progress_mutex)
- thread_data/host_cpu_usage (guarded with profile_events_mutex)

v2: decrease number of rows for INSERT ProfileEvents test (10 times)
    CI: https://s3.amazonaws.com/clickhouse-test-reports/37391/4bd5c335182279dcc5020aa081b13c3044135951/stateless_tests__debug__actions__[1/3].html
v3: decrease number of rows for INSERT ProfileEvents test (10 times) and add a comment
    CI: https://s3.amazonaws.com/clickhouse-test-reports/37391/026d7f732cb166c90d6c287b02824b6c7fdebf0c/stateless_tests_flaky_check__address__actions_/runlog.log
Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>

f
This commit is contained in:
Azat Khuzhin 2022-05-31 11:05:35 +03:00
parent 4baa7690ae
commit b3bf7589ef
6 changed files with 70 additions and 2 deletions

View File

@ -53,8 +53,11 @@ void ProgressIndication::resetProgress()
show_progress_bar = false;
written_progress_chars = 0;
write_progress_on_update = false;
host_cpu_usage.clear();
thread_data.clear();
{
std::lock_guard lock(profile_events_mutex);
host_cpu_usage.clear();
thread_data.clear();
}
}
void ProgressIndication::setFileProgressCallback(ContextMutablePtr context, bool write_progress_on_update_)
@ -71,6 +74,8 @@ void ProgressIndication::setFileProgressCallback(ContextMutablePtr context, bool
void ProgressIndication::addThreadIdToList(String const & host, UInt64 thread_id)
{
std::lock_guard lock(profile_events_mutex);
auto & thread_to_times = thread_data[host];
if (thread_to_times.contains(thread_id))
return;
@ -79,6 +84,8 @@ void ProgressIndication::addThreadIdToList(String const & host, UInt64 thread_id
void ProgressIndication::updateThreadEventData(HostToThreadTimesMap & new_thread_data, UInt64 elapsed_time)
{
std::lock_guard lock(profile_events_mutex);
for (auto & new_host_map : new_thread_data)
{
host_cpu_usage[new_host_map.first] = calculateCPUUsage(new_host_map.second, elapsed_time);
@ -88,6 +95,8 @@ void ProgressIndication::updateThreadEventData(HostToThreadTimesMap & new_thread
size_t ProgressIndication::getUsedThreadsCount() const
{
std::lock_guard lock(profile_events_mutex);
return std::accumulate(thread_data.cbegin(), thread_data.cend(), 0,
[] (size_t acc, auto const & threads)
{
@ -97,6 +106,8 @@ size_t ProgressIndication::getUsedThreadsCount() const
double ProgressIndication::getCPUUsage() const
{
std::lock_guard lock(profile_events_mutex);
double res = 0;
for (const auto & elem : host_cpu_usage)
res += elem.second;
@ -105,6 +116,8 @@ double ProgressIndication::getCPUUsage() const
ProgressIndication::MemoryUsage ProgressIndication::getMemoryUsage() const
{
std::lock_guard lock(profile_events_mutex);
return std::accumulate(thread_data.cbegin(), thread_data.cend(), MemoryUsage{},
[](MemoryUsage const & acc, auto const & host_data)
{
@ -137,6 +150,8 @@ void ProgressIndication::writeFinalProgress()
void ProgressIndication::writeProgress()
{
std::lock_guard lock(progress_mutex);
/// Output all progress bar commands to stderr at once to avoid flicker.
WriteBufferFromFileDescriptor message(STDERR_FILENO, 1024);

View File

@ -2,6 +2,7 @@
#include <unordered_map>
#include <unordered_set>
#include <mutex>
#include <IO/Progress.h>
#include <Interpreters/Context.h>
#include <base/types.h>
@ -92,6 +93,16 @@ private:
std::unordered_map<String, double> host_cpu_usage;
HostToThreadTimesMap thread_data;
/// When all of the following hold:
/// - clickhouse-local
/// - input_format_parallel_parsing=true
/// - write_progress_on_update=true
///
/// Concurrent access to the following is possible:
/// - writeProgress() (class properties) (guarded with progress_mutex)
/// - thread_data/host_cpu_usage (guarded with profile_events_mutex)
mutable std::mutex profile_events_mutex;
mutable std::mutex progress_mutex;
};
}

View File

@ -0,0 +1,2 @@
0
--progress produce some rows

View File

@ -0,0 +1,19 @@
#!/usr/bin/env bash
# Tags: long
# Regression test for the concurrent access in ProgressIndication,
# so it is important to read enough rows here (10e6).
#
# Initially there were 100e6 rows, but under the thread fuzzer 10 minutes was
# sometimes not enough; CI should still catch possible issues with fewer rows.
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
# stderr of the client is captured here; the test only checks that it is non-empty.
tmp_file_progress="$(mktemp "$CUR_DIR/$CLICKHOUSE_TEST_UNIQUE_NAME.XXXXXX.progress")"
# The variable is expanded when the trap fires, so it must be quoted inside the
# trap string; otherwise a path with whitespace or glob characters breaks cleanup.
trap 'rm -f -- "$tmp_file_progress"' EXIT
# $CLICKHOUSE_CLIENT is intentionally left unquoted, as in the original invocation.
yes | head -n10000000 | $CLICKHOUSE_CLIENT -q "insert into function null('foo String') format TSV" --progress 2> "$tmp_file_progress"
# Print the exit status of the pipeline above.
echo $?
# Explicit if/else: with `A && B || C` the fallback would also run if B failed.
if test -s "$tmp_file_progress"; then
    echo "--progress produce some rows"
else
    echo "FAIL: no rows with --progress"
fi

View File

@ -0,0 +1,2 @@
0
--progress produce some rows

View File

@ -0,0 +1,19 @@
#!/usr/bin/env bash
# Tags: long
# Regression test for the concurrent access in ProgressIndication,
# so it is important to read enough rows here (10e6).
#
# Initially there were 100e6 rows, but under the thread fuzzer 10 minutes was
# sometimes not enough; CI should still catch possible issues with fewer rows.
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
# stderr of clickhouse-local is captured here; the test only checks that it is non-empty.
tmp_file_progress="$(mktemp "$CUR_DIR/$CLICKHOUSE_TEST_UNIQUE_NAME.XXXXXX.progress")"
# The variable is expanded when the trap fires, so it must be quoted inside the
# trap string; otherwise a path with whitespace or glob characters breaks cleanup.
trap 'rm -f -- "$tmp_file_progress"' EXIT
# $CLICKHOUSE_LOCAL is intentionally left unquoted, as in the original invocation.
yes | head -n10000000 | $CLICKHOUSE_LOCAL -q "insert into function null('foo String') format TSV" --progress 2> "$tmp_file_progress"
# Print the exit status of the pipeline above.
echo $?
# Explicit if/else: with `A && B || C` the fallback would also run if B failed.
if test -s "$tmp_file_progress"; then
    echo "--progress produce some rows"
else
    echo "FAIL: no rows with --progress"
fi