mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 23:52:03 +00:00
Report memory usage in performance tests.
This commit is contained in:
parent
2469ec1af3
commit
a43d3a6902
@ -14,10 +14,15 @@ std::vector<XMLConfigurationPtr> ConfigPreprocessor::processConfig(
|
||||
{
|
||||
|
||||
std::vector<XMLConfigurationPtr> result;
|
||||
for (const auto & path : paths)
|
||||
for (const auto & pathStr : paths)
|
||||
{
|
||||
result.emplace_back(XMLConfigurationPtr(new XMLConfiguration(path)));
|
||||
result.back()->setString("path", Poco::Path(path).absolute().toString());
|
||||
auto test = XMLConfigurationPtr(new XMLConfiguration(pathStr));
|
||||
result.push_back(test);
|
||||
|
||||
const auto path = Poco::Path(pathStr);
|
||||
test->setString("path", path.absolute().toString());
|
||||
if (test->getString("name", "") == "")
|
||||
test->setString("name", path.getBaseName());
|
||||
}
|
||||
|
||||
/// Leave tests:
|
||||
|
@ -259,15 +259,12 @@ static std::vector<std::string> getInputFiles(const po::variables_map & options,
|
||||
|
||||
if (input_files.empty())
|
||||
throw DB::Exception("Did not find any xml files", DB::ErrorCodes::BAD_ARGUMENTS);
|
||||
else
|
||||
LOG_INFO(log, "Found " << input_files.size() << " files");
|
||||
}
|
||||
else
|
||||
{
|
||||
input_files = options["input-files"].as<std::vector<std::string>>();
|
||||
LOG_INFO(log, "Found " + std::to_string(input_files.size()) + " input files");
|
||||
std::vector<std::string> collected_files;
|
||||
|
||||
std::vector<std::string> collected_files;
|
||||
for (const std::string & filename : input_files)
|
||||
{
|
||||
fs::path file(filename);
|
||||
@ -289,6 +286,8 @@ static std::vector<std::string> getInputFiles(const po::variables_map & options,
|
||||
|
||||
input_files = std::move(collected_files);
|
||||
}
|
||||
|
||||
LOG_INFO(log, "Found " + std::to_string(input_files.size()) + " input files");
|
||||
std::sort(input_files.begin(), input_files.end());
|
||||
return input_files;
|
||||
}
|
||||
|
@ -157,6 +157,9 @@ std::string ReportBuilder::buildFullReport(
|
||||
runJSON.set("avg_bytes_per_second", statistics.avg_bytes_speed_value);
|
||||
}
|
||||
|
||||
runJSON.set("max_memory_usage", statistics.max_memory_usage);
|
||||
runJSON.set("min_memory_usage", statistics.min_memory_usage);
|
||||
|
||||
run_infos.push_back(runJSON);
|
||||
}
|
||||
}
|
||||
|
@ -19,6 +19,7 @@ struct TestStats
|
||||
Stopwatch avg_bytes_speed_watch;
|
||||
|
||||
bool last_query_was_cancelled = false;
|
||||
std::string query_id;
|
||||
|
||||
size_t queries = 0;
|
||||
|
||||
@ -49,6 +50,9 @@ struct TestStats
|
||||
size_t number_of_rows_speed_info_batches = 0;
|
||||
size_t number_of_bytes_speed_info_batches = 0;
|
||||
|
||||
UInt64 max_memory_usage = 0;
|
||||
UInt64 min_memory_usage = std::numeric_limits<uint64_t>::max();
|
||||
|
||||
bool ready = false; // check if a query wasn't interrupted by SIGINT
|
||||
std::string exception;
|
||||
|
||||
|
@ -2,9 +2,11 @@
|
||||
#include <IO/Progress.h>
|
||||
#include <DataStreams/RemoteBlockInputStream.h>
|
||||
#include <Core/Block.h>
|
||||
#include <Poco/UUIDGenerator.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
@ -36,7 +38,7 @@ void checkFulfilledConditionsAndUpdate(
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
} // anonymous namespace
|
||||
|
||||
void executeQuery(
|
||||
Connection & connection,
|
||||
@ -47,12 +49,19 @@ void executeQuery(
|
||||
Context & context,
|
||||
const Settings & settings)
|
||||
{
|
||||
static const std::string query_id_prefix
|
||||
= Poco::UUIDGenerator::defaultGenerator().create().toString() + "-";
|
||||
static int next_query_id = 1;
|
||||
|
||||
statistics.watch_per_query.restart();
|
||||
statistics.last_query_was_cancelled = false;
|
||||
statistics.last_query_rows_read = 0;
|
||||
statistics.last_query_bytes_read = 0;
|
||||
statistics.query_id = query_id_prefix + std::to_string(next_query_id++);
|
||||
|
||||
fprintf(stderr, "Query id is '%s'\n", statistics.query_id.c_str());
|
||||
RemoteBlockInputStream stream(connection, query, {}, context, &settings);
|
||||
stream.setQueryId(statistics.query_id);
|
||||
|
||||
stream.setProgressCallback(
|
||||
[&](const Progress & value)
|
||||
@ -69,5 +78,45 @@ void executeQuery(
|
||||
statistics.updateQueryInfo();
|
||||
|
||||
statistics.setTotalTime();
|
||||
|
||||
/// Get max memory usage from the server query log.
|
||||
/// We might have to wait for some time before the query log is updated.
|
||||
int n_waits = 0;
|
||||
const int one_wait_us = 500 * 1000;
|
||||
const int max_waits = (10 * 1000 * 1000) / one_wait_us;
|
||||
for (; n_waits < max_waits; n_waits++)
|
||||
{
|
||||
RemoteBlockInputStream log(connection,
|
||||
"select memory_usage from system.query_log where type = 2 and query_id = '"
|
||||
+ statistics.query_id + "'",
|
||||
{}, context, &settings);
|
||||
|
||||
log.readPrefix();
|
||||
Block block = log.read();
|
||||
if (block.columns() == 0)
|
||||
{
|
||||
log.readSuffix();
|
||||
::usleep(one_wait_us);
|
||||
continue;
|
||||
}
|
||||
assert(block.columns() == 1);
|
||||
assert(block.getDataTypes()[0]->getName() == "UInt64");
|
||||
ColumnPtr column = block.getByPosition(0).column;
|
||||
assert(column->size() == 1);
|
||||
StringRef ref = column->getDataAt(0);
|
||||
assert(ref.size == sizeof(UInt64));
|
||||
const UInt64 memory_usage = *reinterpret_cast<const UInt64*>(ref.data);
|
||||
statistics.max_memory_usage = std::max(statistics.max_memory_usage,
|
||||
memory_usage);
|
||||
statistics.min_memory_usage = std::min(statistics.min_memory_usage,
|
||||
memory_usage);
|
||||
log.readSuffix();
|
||||
|
||||
fprintf(stderr, "Memory usage is %ld\n", memory_usage);
|
||||
break;
|
||||
}
|
||||
fprintf(stderr, "Waited for query log for %.2fs\n",
|
||||
(n_waits * one_wait_us) / 1e6f);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -292,7 +292,7 @@ void RemoteBlockInputStream::sendQuery()
|
||||
established = true;
|
||||
|
||||
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(settings);
|
||||
multiplexed_connections->sendQuery(timeouts, query, "", stage, &context.getClientInfo(), true);
|
||||
multiplexed_connections->sendQuery(timeouts, query, query_id, stage, &context.getClientInfo(), true);
|
||||
|
||||
established = false;
|
||||
sent_query = true;
|
||||
|
@ -46,6 +46,20 @@ public:
|
||||
|
||||
~RemoteBlockInputStream() override;
|
||||
|
||||
/// Set the query_id. For now, used by performance test to later find the query
|
||||
/// in the server query_log. Must be called before sending the query to the
|
||||
/// server.
|
||||
///
|
||||
/// FIXME This should have been a parameter of the constructor, but I can't bring
|
||||
/// myself to add even more parameters. These constructors actually implement
|
||||
/// (in a quite bizarre way) an overloaded function that prepares the multiplexed
|
||||
/// connection wrapper. It should have been a plain function that is run by
|
||||
/// the caller, but apparently that would have been obscenely straighforward,
|
||||
/// too easy to understand and not insane at all, which is a blatant violation
|
||||
/// of our coding conventions.
|
||||
/// I'm not going to rewrite it now, so that I can get at least something done.
|
||||
void setQueryId(std::string _query_id) { assert(!sent_query); query_id = _query_id; }
|
||||
|
||||
/// Specify how we allocate connections on a shard.
|
||||
void setPoolMode(PoolMode pool_mode_) { pool_mode = pool_mode_; }
|
||||
|
||||
@ -95,6 +109,7 @@ private:
|
||||
std::unique_ptr<MultiplexedConnections> multiplexed_connections;
|
||||
|
||||
const String query;
|
||||
String query_id = "";
|
||||
Context context;
|
||||
|
||||
/// Temporary tables needed to be sent to remote servers
|
||||
|
Loading…
Reference in New Issue
Block a user