Merge ProfileEvents in case they were not printed

That way with --profile-events-delay-ms=-1 you will always get totals.

Plus, this will fix periodic failures, that can be reproduced by
limitting CPU (5% is enough in my setup), i.e.:

    $ systemd-run --collect --unit ch -p CPUQuota=5% --user clickhouse-server
    $ while clickhouse-client --print-profile-events --profile-events-delay-ms=-1 -q 'select * from numbers (1e5) format Null' |& tee /dev/stderr | fgrep 'SelectedRows: 131010 (increment)'; do :; done

And as a bonus it will make 02050_client_profile_events deterministic.
This commit is contained in:
Azat Khuzhin 2021-12-14 10:25:30 +03:00
parent 6aebc3e94c
commit 1d25ec3e82

View File

@ -4,6 +4,8 @@
#include <iomanip>
#include <string_view>
#include <filesystem>
#include <map>
#include <unordered_map>
#include <base/argsToConfig.h>
#include <base/DateLUT.h>
@ -52,6 +54,7 @@
#include <Processors/Executors/PullingAsyncPipelineExecutor.h>
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <Interpreters/ReplaceQueryParameterVisitor.h>
#include <Interpreters/ProfileEventsExt.h>
#include <IO/WriteBufferFromOStream.h>
#include <IO/CompressionMethod.h>
#include <Client/InternalTextLogs.h>
@ -105,6 +108,99 @@ namespace ProfileEvents
namespace DB
{
static void incrementProfileEventsBlock(Block & dst, const Block & src)
{
if (!dst)
{
dst = src;
return;
}
assertBlocksHaveEqualStructure(src, dst, "ProfileEvents");
std::unordered_map<String, size_t> name_pos;
for (size_t i = 0; i < dst.columns(); ++i)
name_pos[dst.getByPosition(i).name] = i;
size_t dst_rows = dst.rows();
MutableColumns mutable_columns = dst.mutateColumns();
auto & dst_column_host_name = typeid_cast<ColumnString &>(*mutable_columns[name_pos["host_name"]]);
auto & dst_array_current_time = typeid_cast<ColumnUInt32 &>(*mutable_columns[name_pos["current_time"]]).getData();
auto & dst_array_thread_id = typeid_cast<ColumnUInt64 &>(*mutable_columns[name_pos["thread_id"]]).getData();
auto & dst_array_type = typeid_cast<ColumnInt8 &>(*mutable_columns[name_pos["type"]]).getData();
auto & dst_column_name = typeid_cast<ColumnString &>(*mutable_columns[name_pos["name"]]);
auto & dst_array_value = typeid_cast<ColumnInt64 &>(*mutable_columns[name_pos["value"]]).getData();
const auto & src_column_host_name = typeid_cast<const ColumnString &>(*src.getByName("host_name").column);
const auto & src_array_current_time = typeid_cast<const ColumnUInt32 &>(*src.getByName("current_time").column).getData();
const auto & src_array_thread_id = typeid_cast<const ColumnUInt64 &>(*src.getByName("thread_id").column).getData();
const auto & src_column_name = typeid_cast<const ColumnString &>(*src.getByName("name").column);
const auto & src_array_value = typeid_cast<const ColumnInt64 &>(*src.getByName("value").column).getData();
struct Id
{
StringRef name;
StringRef host_name;
UInt64 thread_id;
bool operator<(const Id & rhs) const
{
return std::tie(name, host_name, thread_id)
< std::tie(rhs.name, rhs.host_name, rhs.thread_id);
}
};
std::map<Id, UInt64> rows_by_name;
for (size_t src_row = 0; src_row < src.rows(); ++src_row)
{
Id id{
src_column_name.getDataAt(src_row),
src_column_host_name.getDataAt(src_row),
src_array_thread_id[src_row],
};
rows_by_name[id] = src_row;
}
/// Merge src into dst.
for (size_t dst_row = 0; dst_row < dst_rows; ++dst_row)
{
Id id{
dst_column_name.getDataAt(dst_row),
dst_column_host_name.getDataAt(dst_row),
dst_array_thread_id[dst_row],
};
if (auto it = rows_by_name.find(id); it != rows_by_name.end())
{
size_t src_row = it->second;
dst_array_current_time[dst_row] = src_array_current_time[src_row];
switch (dst_array_type[dst_row])
{
case ProfileEvents::Type::INCREMENT:
dst_array_value[dst_row] += src_array_value[src_row];
break;
case ProfileEvents::Type::GAUGE:
dst_array_value[dst_row] = src_array_value[src_row];
break;
}
rows_by_name.erase(it);
}
}
/// Copy rows from src that dst does not contains.
for (const auto & [id, pos] : rows_by_name)
{
for (size_t col = 0; col < src.columns(); ++col)
{
mutable_columns[col]->insert((*src.getByPosition(col).column)[pos]);
}
}
dst.setColumns(std::move(mutable_columns));
}
std::atomic_flag exit_on_signal = ATOMIC_FLAG_INIT;
@ -753,7 +849,7 @@ void ClientBase::onProfileEvents(Block & block)
}
else
{
profile_events.last_block = block;
incrementProfileEventsBlock(profile_events.last_block, block);
}
}
profile_events.watch.restart();