Thread safe performance statistics

This commit is contained in:
Dmitrii Kovalkov 2020-05-26 13:15:44 +02:00
parent 4a9891c601
commit 991cbf397a
3 changed files with 119 additions and 87 deletions

View File

@ -109,7 +109,7 @@ inline UInt64x4 CombineValues(UInt64x4 a, UInt64x4 b)
/// Now every 8-byte value in xa is xx....xx and every value in xb is ..xxxx.. where x is random byte we want to use.
/// Just blend them to get the result vector.
/// result = xa[0],xb[1,2],xa[3,4],xb[5,6],xa[7,8],xb[9,10],xa[11,12],xb[13,14],xa[15]
__m256i result = _mm256_blend_epi16(xa, xb, 0x66);
__m256i result = _mm256_blend_epi16(xa, xb, 0x66);
return reinterpret_cast<UInt64x4>(result);
}

View File

@ -40,6 +40,7 @@ DECLARE_MULTITARGET_CODE(
struct RandImpl
{
/// Fill memory with random data. The memory region must be 15-bytes padded.
static void execute(char * output, size_t size);
static String getImplementationTag() { return ToString(BuildArch); }
};

View File

@ -6,6 +6,7 @@
#include <Common/Stopwatch.h>
#include <Interpreters/Context.h>
#include <mutex>
#include <random>
/// This file contains Adaptors which help to combine several implementations of the function.
@ -20,104 +21,138 @@ namespace ErrorCodes
extern const int NO_SUITABLE_FUNCTION_IMPLEMENTATION;
}
// TODO(dakovalkov): This is copied and pasted struct from LZ4_decompress_faster.h with little changes.
struct PerformanceStatistics
namespace detail
{
struct Element
class PerformanceStatistics
{
double count = 0;
double sum = 0;
double adjustedCount() const
public:
size_t select(bool considarable)
{
return count - NUM_INVOCATIONS_TO_THROW_OFF;
/// We don't need to choose/measure anything if there's only one variant.
if (size() == 1)
return 0;
std::lock_guard guard(lock);
size_t best = 0;
double best_sample = data[0].sample(rng);
for (size_t i = 1; i < data.size(); ++i)
{
double sample = data[i].sample(rng);
if (sample < best_sample)
{
best_sample = sample;
best = i;
}
}
if (considarable)
data[best].run();
return best;
}
double mean() const
void complete(size_t id, double seconds, double bytes)
{
return sum / adjustedCount();
if (size() == 1)
return;
std::lock_guard guard(lock);
data[id].complete(seconds, bytes);
}
/// For better convergence, we don't use proper estimate of stddev.
/// We want to eventually separate between two algorithms even in case
/// when there is no statistical significant difference between them.
double sigma() const
size_t size() const
{
return mean() / sqrt(adjustedCount());
return data.size();
}
void update(double seconds, double bytes)
bool empty() const
{
++count;
if (count > NUM_INVOCATIONS_TO_THROW_OFF)
sum += seconds / bytes;
return size() == 0;
}
double sample(pcg64 & stat_rng) const
void emplace_back()
{
/// If there is a variant with not enough statistics, always choose it.
/// And in that case prefer variant with less number of invocations.
data.emplace_back();
}
private:
struct Element
{
int completed_count = 0;
int running_count = 0;
double sum = 0;
int adjustedCount() const
{
return completed_count - NUM_INVOCATIONS_TO_THROW_OFF;
}
double mean() const
{
return sum / adjustedCount();
}
/// For better convergence, we don't use proper estimate of stddev.
/// We want to eventually separate between two algorithms even in case
/// when there is no statistical significant difference between them.
double sigma() const
{
return mean() / sqrt(adjustedCount());
}
void run()
{
++running_count;
}
void complete(double seconds, double bytes)
{
--running_count;
++completed_count;
if (adjustedCount() > 0)
sum += seconds / bytes;
}
double sample(pcg64 & stat_rng) const
{
/// If there is a variant with not enough statistics, always choose it.
/// And in that case prefer variant with less number of invocations.
if (adjustedCount() < 2)
return adjustedCount() - 1 + running_count * 2;
if (adjustedCount() < 2)
return adjustedCount() - 1;
else
return std::normal_distribution<>(mean(), sigma())(stat_rng);
}
}
};
std::vector<Element> data;
std::mutex lock;
/// It's Ok that generator is not seeded.
pcg64 rng;
/// Cold invocations may be affected by additional memory latencies. Don't take first invocations into account.
static constexpr int NUM_INVOCATIONS_TO_THROW_OFF = 2;
};
/// Cold invocations may be affected by additional memory latencies. Don't take first invocations into account.
static constexpr double NUM_INVOCATIONS_TO_THROW_OFF = 2;
template <typename T, class = decltype(T::getImplementationTag())>
std::true_type hasImplementationTagTest(const T&);
std::false_type hasImplementationTagTest(...);
/// How to select method to run.
/// -1 - automatically, based on statistics (default);
/// -2 - choose methods in round robin fashion (for performance testing).
/// >= 0 - always choose specified method (for performance testing);
ssize_t choose_method = -1;
template <typename T>
constexpr bool has_implementation_tag = decltype(hasImplementationTagTest(std::declval<T>()))::value;
std::vector<Element> data;
/// It's Ok that generator is not seeded.
pcg64 rng;
/// To select from different algorithms we use a kind of "bandits" algorithm.
/// Sample random values from estimated normal distributions and choose the minimal.
size_t select()
template <typename T>
String getImplementationTag(TargetArch arch)
{
if (choose_method < 0)
{
std::vector<double> samples(data.size());
for (size_t i = 0; i < data.size(); ++i)
samples[i] = choose_method == -1
? data[i].sample(rng)
: data[i].adjustedCount();
return std::min_element(samples.begin(), samples.end()) - samples.begin();
}
if constexpr (has_implementation_tag<T>)
return ToString(arch) + "_" + T::getImplementationTag();
else
return choose_method;
return ToString(arch);
}
}
size_t size() const
{
return data.size();
}
bool empty() const
{
return size() == 0;
}
void emplace_back()
{
data.emplace_back();
}
PerformanceStatistics() {}
PerformanceStatistics(ssize_t choose_method_) : choose_method(choose_method_) {}
};
/* Class which is used to store implementations for the function and selecting the best one to run
/* Class which is used to store implementations for the function and to select the best one to run
* based on processor architecture and statistics from previous runs.
*
* FunctionInterface is typically IFunction or IExecutableFunctionImpl, but practically it can be
@ -170,7 +205,10 @@ public:
throw Exception("There are no available implementations for function " "TODO(dakovalkov): add name",
ErrorCodes::NO_SUITABLE_FUNCTION_IMPLEMENTATION);
auto id = statistics.select();
/// Statistics shouldn't rely on small blocks.
bool considerable = (input_rows_count > 1000);
size_t id = statistics.select(considerable);
Stopwatch watch;
if constexpr (std::is_same_v<FunctionInterface, IFunction>)
@ -180,17 +218,10 @@ public:
watch.stop();
// TODO(dakovalkov): Calculate something more informative.
size_t rows_summary = 0;
for (auto i : arguments)
if (considerable)
{
rows_summary += block.getByPosition(i).column->size();
}
rows_summary += block.getByPosition(result).column->size();
if (rows_summary >= 1000)
{
statistics.data[id].update(watch.elapsedSeconds(), rows_summary);
// TODO(dakovalkov): Calculate something more informative than rows count.
statistics.complete(id, watch.elapsedSeconds(), input_rows_count);
}
}
@ -210,7 +241,7 @@ public:
{
// TODO(dakovalkov): make this option better.
const auto & choose_impl = context.getSettingsRef().function_implementation.value;
if (choose_impl.empty() || choose_impl == FunctionImpl::getImplementationTag())
if (choose_impl.empty() || choose_impl == detail::getImplementationTag<FunctionImpl>(Arch))
{
implementations.emplace_back(std::make_shared<FunctionImpl>(std::forward<Args>(args)...));
statistics.emplace_back();
@ -221,7 +252,7 @@ public:
private:
const Context & context;
std::vector<ImplementationPtr> implementations;
PerformanceStatistics statistics;
detail::PerformanceStatistics statistics;
};
}