fix race condition while initializing BlockStreamProfileInfo [#CLICKHOUSE-3099]

This commit is contained in:
Alexey Zatelepin 2017-06-26 15:30:35 +03:00
parent 061c214c16
commit 1917de0380
4 changed files with 28 additions and 21 deletions

View File

@ -1,11 +1,11 @@
#include <DataStreams/BlockStreamProfileInfo.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Core/Block.h>
#include <DataStreams/BlockStreamProfileInfo.h>
namespace DB
{
@ -71,14 +71,17 @@ void BlockStreamProfileInfo::update(Block & block)
void BlockStreamProfileInfo::collectInfosForStreamsWithName(const char * name, BlockStreamProfileInfos & res) const
{
if (stream_name == name)
if (parent->getName() == name)
{
res.push_back(this);
return;
}
for (const auto & nested_info : nested_infos)
nested_info->collectInfosForStreamsWithName(name, res);
for (const auto & child_stream : parent->getChildren())
{
if (const auto * profiling_child = dynamic_cast<const IProfilingBlockInputStream *>(child_stream.get()))
profiling_child->getProfileInfo().collectInfosForStreamsWithName(name, res);
}
}
@ -102,9 +105,14 @@ void BlockStreamProfileInfo::calculateRowsBeforeLimit() const
BlockStreamProfileInfos & limits_or_sortings = partial_sortings.empty() ? limits : partial_sortings;
for (const auto & info_limit_or_sort : limits_or_sortings)
for (const auto & nested_info : info_limit_or_sort->nested_infos)
rows_before_limit += nested_info->rows;
for (const BlockStreamProfileInfo * info_limit_or_sort : limits_or_sortings)
{
for (const auto & child_stream : info_limit_or_sort->parent->getChildren())
{
if (const auto * profiling_child = dynamic_cast<const IProfilingBlockInputStream *>(child_stream.get()))
rows_before_limit += profiling_child->getProfileInfo().rows;
}
}
}
else
{

View File

@ -15,22 +15,22 @@ namespace DB
class Block;
class ReadBuffer;
class WriteBuffer;
class IProfilingBlockInputStream;
/// Information for profiling. See IProfilingBlockInputStream.h
struct BlockStreamProfileInfo
{
/// Info about stream object this profile info refers to.
IProfilingBlockInputStream * parent = nullptr;
bool started = false;
Stopwatch total_stopwatch {CLOCK_MONOTONIC_COARSE}; /// Time with waiting time
String stream_name; /// The short name of the stream for which information is collected
size_t rows = 0;
size_t blocks = 0;
size_t bytes = 0;
/// Information about nested threads - to calculate pure processing time.
using BlockStreamProfileInfos = std::vector<const BlockStreamProfileInfo *>;
BlockStreamProfileInfos nested_infos;
/// Collect BlockStreamProfileInfo for the nearest sources in the tree named `name`. Example; collect all info for PartialSorting streams.
void collectInfosForStreamsWithName(const char * name, BlockStreamProfileInfos & res) const;

View File

@ -18,6 +18,11 @@ namespace ErrorCodes
}
IProfilingBlockInputStream::IProfilingBlockInputStream()
{
info.parent = this;
}
Block IProfilingBlockInputStream::read()
{
collectAndSendTotalRowsApprox();
@ -25,14 +30,6 @@ Block IProfilingBlockInputStream::read()
if (!info.started)
{
info.total_stopwatch.start();
info.stream_name = getName();
for (const auto & child : children)
if (const IProfilingBlockInputStream * p_child = dynamic_cast<const IProfilingBlockInputStream *>(&*child))
info.nested_infos.push_back(&p_child->info);
/// Note that after this, `children` elements can not be deleted before you might need to work with `nested_info`.
info.started = true;
}

View File

@ -27,6 +27,8 @@ using ProfilingBlockInputStreamPtr = std::shared_ptr<IProfilingBlockInputStream>
class IProfilingBlockInputStream : public IBlockInputStream
{
public:
IProfilingBlockInputStream();
Block read() override final;
/** The default implementation calls readPrefixImpl() on itself, and then readPrefix() recursively for all children.