Merge pull request #1954 from yandex/fix-race-condition-in-lazy-stream

Fix race condition in lazy stream
This commit is contained in:
alexey-milovidov 2018-02-23 14:16:37 +03:00 committed by GitHub
commit 05be50de89
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 111 additions and 130 deletions

View File

@ -77,11 +77,11 @@ void BlockStreamProfileInfo::collectInfosForStreamsWithName(const char * name, B
return; return;
} }
for (const auto & child_stream : parent->getChildren()) parent->forEachProfilingChild([&] (IProfilingBlockInputStream & child)
{ {
if (const auto * profiling_child = dynamic_cast<const IProfilingBlockInputStream *>(child_stream.get())) child.getProfileInfo().collectInfosForStreamsWithName(name, res);
profiling_child->getProfileInfo().collectInfosForStreamsWithName(name, res); return false;
} });
} }
@ -107,11 +107,11 @@ void BlockStreamProfileInfo::calculateRowsBeforeLimit() const
for (const BlockStreamProfileInfo * info_limit_or_sort : limits_or_sortings) for (const BlockStreamProfileInfo * info_limit_or_sort : limits_or_sortings)
{ {
for (const auto & child_stream : info_limit_or_sort->parent->getChildren()) info_limit_or_sort->parent->forEachProfilingChild([&] (IProfilingBlockInputStream & child)
{ {
if (const auto * profiling_child = dynamic_cast<const IProfilingBlockInputStream *>(child_stream.get())) rows_before_limit += child.getProfileInfo().rows;
rows_before_limit += profiling_child->getProfileInfo().rows; return false;
} });
} }
} }
else else

View File

@ -35,7 +35,7 @@ void CreatingSetsBlockInputStream::readPrefixImpl()
} }
const Block & CreatingSetsBlockInputStream::getTotals() Block CreatingSetsBlockInputStream::getTotals()
{ {
auto input = dynamic_cast<IProfilingBlockInputStream *>(children.back().get()); auto input = dynamic_cast<IProfilingBlockInputStream *>(children.back().get());
@ -148,24 +148,20 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery)
if (table_out) if (table_out)
table_out->writeSuffix(); table_out->writeSuffix();
/// We will display information about how many rows and bytes are read.
size_t rows = 0;
size_t bytes = 0;
watch.stop(); watch.stop();
subquery.source->getLeafRowsBytes(rows, bytes);
size_t head_rows = 0; size_t head_rows = 0;
if (IProfilingBlockInputStream * profiling_in = dynamic_cast<IProfilingBlockInputStream *>(&*subquery.source)) if (IProfilingBlockInputStream * profiling_in = dynamic_cast<IProfilingBlockInputStream *>(&*subquery.source))
{ {
head_rows = profiling_in->getProfileInfo().rows; const BlockStreamProfileInfo & profile_info = profiling_in->getProfileInfo();
head_rows = profile_info.rows;
if (subquery.join) if (subquery.join)
subquery.join->setTotals(profiling_in->getTotals()); subquery.join->setTotals(profiling_in->getTotals());
} }
if (rows != 0) if (head_rows != 0)
{ {
std::stringstream msg; std::stringstream msg;
msg << std::fixed << std::setprecision(3); msg << std::fixed << std::setprecision(3);
@ -178,9 +174,7 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery)
if (subquery.table) if (subquery.table)
msg << "Table with " << head_rows << " rows. "; msg << "Table with " << head_rows << " rows. ";
msg << "Read " << rows << " rows, " << bytes / 1048576.0 << " MiB in " << watch.elapsedSeconds() << " sec., " msg << "In " << watch.elapsedSeconds() << " sec.";
<< static_cast<size_t>(rows / watch.elapsedSeconds()) << " rows/sec., " << bytes / 1048576.0 / watch.elapsedSeconds() << " MiB/sec.";
LOG_DEBUG(log, msg.rdbuf()); LOG_DEBUG(log, msg.rdbuf());
} }
else else

View File

@ -38,7 +38,7 @@ public:
Block getHeader() const override { return children.back()->getHeader(); } Block getHeader() const override { return children.back()->getHeader(); }
/// Takes `totals` only from the main source, not from subquery sources. /// Takes `totals` only from the main source, not from subquery sources.
const Block & getTotals() override; Block getTotals() override;
protected: protected:
Block readImpl() override; Block readImpl() override;

View File

@ -13,7 +13,7 @@ ExpressionBlockInputStream::ExpressionBlockInputStream(const BlockInputStreamPtr
String ExpressionBlockInputStream::getName() const { return "Expression"; } String ExpressionBlockInputStream::getName() const { return "Expression"; }
const Block & ExpressionBlockInputStream::getTotals() Block ExpressionBlockInputStream::getTotals()
{ {
if (IProfilingBlockInputStream * child = dynamic_cast<IProfilingBlockInputStream *>(&*children.back())) if (IProfilingBlockInputStream * child = dynamic_cast<IProfilingBlockInputStream *>(&*children.back()))
{ {

View File

@ -22,7 +22,7 @@ public:
ExpressionBlockInputStream(const BlockInputStreamPtr & input, const ExpressionActionsPtr & expression_); ExpressionBlockInputStream(const BlockInputStreamPtr & input, const ExpressionActionsPtr & expression_);
String getName() const override; String getName() const override;
const Block & getTotals() override; Block getTotals() override;
Block getHeader() const override; Block getHeader() const override;
protected: protected:

View File

@ -53,7 +53,7 @@ FilterBlockInputStream::FilterBlockInputStream(const BlockInputStreamPtr & input
String FilterBlockInputStream::getName() const { return "Filter"; } String FilterBlockInputStream::getName() const { return "Filter"; }
const Block & FilterBlockInputStream::getTotals() Block FilterBlockInputStream::getTotals()
{ {
if (IProfilingBlockInputStream * child = dynamic_cast<IProfilingBlockInputStream *>(&*children.back())) if (IProfilingBlockInputStream * child = dynamic_cast<IProfilingBlockInputStream *>(&*children.back()))
{ {

View File

@ -25,7 +25,7 @@ public:
FilterBlockInputStream(const BlockInputStreamPtr & input, const ExpressionActionsPtr & expression_, const String & filter_column_name_); FilterBlockInputStream(const BlockInputStreamPtr & input, const ExpressionActionsPtr & expression_, const String & filter_column_name_);
String getName() const override; String getName() const override;
const Block & getTotals() override; Block getTotals() override;
Block getHeader() const override; Block getHeader() const override;
protected: protected:

View File

@ -13,6 +13,10 @@ namespace ErrorCodes
} }
/** It's safe to access children without mutex as long as these methods are called before first call to read, readPrefix.
*/
String IBlockInputStream::getTreeID() const String IBlockInputStream::getTreeID() const
{ {
std::stringstream s; std::stringstream s;
@ -87,44 +91,5 @@ void IBlockInputStream::dumpTree(std::ostream & ostr, size_t indent, size_t mult
} }
} }
BlockInputStreams IBlockInputStream::getLeaves()
{
BlockInputStreams res;
getLeavesImpl(res, nullptr);
return res;
}
void IBlockInputStream::getLeafRowsBytes(size_t & rows, size_t & bytes)
{
BlockInputStreams leaves = getLeaves();
rows = 0;
bytes = 0;
for (BlockInputStreams::const_iterator it = leaves.begin(); it != leaves.end(); ++it)
{
if (const IProfilingBlockInputStream * profiling = dynamic_cast<const IProfilingBlockInputStream *>(&**it))
{
const BlockStreamProfileInfo & info = profiling->getProfileInfo();
rows += info.rows;
bytes += info.bytes;
}
}
}
void IBlockInputStream::getLeavesImpl(BlockInputStreams & res, const BlockInputStreamPtr & this_shared_ptr)
{
if (children.empty())
{
if (this_shared_ptr)
res.push_back(this_shared_ptr);
}
else
for (BlockInputStreams::iterator it = children.begin(); it != children.end(); ++it)
(*it)->getLeavesImpl(res, *it);
}
} }

View File

@ -89,18 +89,13 @@ public:
/// In case of isGroupedOutput or isSortedOutput, return corresponding SortDescription /// In case of isGroupedOutput or isSortedOutput, return corresponding SortDescription
virtual const SortDescription & getSortDescription() const { throw Exception("Output of " + getName() + " is not sorted", ErrorCodes::OUTPUT_IS_NOT_SORTED); } virtual const SortDescription & getSortDescription() const { throw Exception("Output of " + getName() + " is not sorted", ErrorCodes::OUTPUT_IS_NOT_SORTED); }
BlockInputStreams & getChildren() { return children; } /** Must be called before read, readPrefix.
*/
void dumpTree(std::ostream & ostr, size_t indent = 0, size_t multiplier = 1); void dumpTree(std::ostream & ostr, size_t indent = 0, size_t multiplier = 1);
/// Get leaf sources (not including this one).
BlockInputStreams getLeaves();
/// Get the number of rows and bytes read in the leaf sources.
void getLeafRowsBytes(size_t & rows, size_t & bytes);
/** Check the depth of the pipeline. /** Check the depth of the pipeline.
* If max_depth is specified and the `depth` is greater - throw an exception. * If max_depth is specified and the `depth` is greater - throw an exception.
* Must be called before read, readPrefix.
*/ */
size_t checkDepth(size_t max_depth) const; size_t checkDepth(size_t max_depth) const;
@ -108,13 +103,22 @@ public:
*/ */
void addTableLock(const TableStructureReadLockPtr & lock) { table_locks.push_back(lock); } void addTableLock(const TableStructureReadLockPtr & lock) { table_locks.push_back(lock); }
protected:
TableStructureReadLocks table_locks;
template <typename F>
void forEachChild(F && f)
{
std::lock_guard lock(children_mutex);
for (auto & child : children)
if (f(*child))
return;
}
protected:
BlockInputStreams children; BlockInputStreams children;
std::mutex children_mutex;
private: private:
void getLeavesImpl(BlockInputStreams & res, const BlockInputStreamPtr & this_shared_ptr); TableStructureReadLocks table_locks;
size_t checkDepthImpl(size_t max_depth, size_t level) const; size_t checkDepthImpl(size_t max_depth, size_t level) const;

View File

@ -66,7 +66,7 @@ public:
*/ */
void addTableLock(const TableStructureReadLockPtr & lock) { table_locks.push_back(lock); } void addTableLock(const TableStructureReadLockPtr & lock) { table_locks.push_back(lock); }
protected: private:
TableStructureReadLocks table_locks; TableStructureReadLocks table_locks;
}; };

View File

@ -93,15 +93,21 @@ void IProfilingBlockInputStream::readPrefix()
{ {
readPrefixImpl(); readPrefixImpl();
for (auto & child : children) forEachChild([&] (IBlockInputStream & child)
child->readPrefix(); {
child.readPrefix();
return false;
});
} }
void IProfilingBlockInputStream::readSuffix() void IProfilingBlockInputStream::readSuffix()
{ {
for (auto & child : children) forEachChild([&] (IBlockInputStream & child)
child->readSuffix(); {
child.readSuffix();
return false;
});
readSuffixImpl(); readSuffixImpl();
} }
@ -350,9 +356,11 @@ void IProfilingBlockInputStream::cancel()
if (!is_cancelled.compare_exchange_strong(old_val, true, std::memory_order_seq_cst, std::memory_order_relaxed)) if (!is_cancelled.compare_exchange_strong(old_val, true, std::memory_order_seq_cst, std::memory_order_relaxed))
return; return;
for (auto & child : children) forEachProfilingChild([] (IProfilingBlockInputStream & child)
if (IProfilingBlockInputStream * p_child = dynamic_cast<IProfilingBlockInputStream *>(&*child)) {
p_child->cancel(); child.cancel();
return false;
});
} }
@ -360,9 +368,11 @@ void IProfilingBlockInputStream::setProgressCallback(const ProgressCallback & ca
{ {
progress_callback = callback; progress_callback = callback;
for (auto & child : children) forEachProfilingChild([&] (IProfilingBlockInputStream & child)
if (IProfilingBlockInputStream * p_child = dynamic_cast<IProfilingBlockInputStream *>(&*child)) {
p_child->setProgressCallback(callback); child.setProgressCallback(callback);
return false;
});
} }
@ -370,46 +380,44 @@ void IProfilingBlockInputStream::setProcessListElement(ProcessListElement * elem
{ {
process_list_elem = elem; process_list_elem = elem;
for (auto & child : children) forEachProfilingChild([&] (IProfilingBlockInputStream & child)
if (IProfilingBlockInputStream * p_child = dynamic_cast<IProfilingBlockInputStream *>(&*child)) {
p_child->setProcessListElement(elem); child.setProcessListElement(elem);
return false;
});
} }
const Block & IProfilingBlockInputStream::getTotals() Block IProfilingBlockInputStream::getTotals()
{ {
if (totals) if (totals)
return totals; return totals;
for (auto & child : children) Block res;
forEachProfilingChild([&] (IProfilingBlockInputStream & child)
{ {
if (IProfilingBlockInputStream * p_child = dynamic_cast<IProfilingBlockInputStream *>(&*child)) res = child.getTotals();
{ if (res)
const Block & res = p_child->getTotals(); return true;
if (res) return false;
return res; });
} return res;
}
return totals;
} }
const Block & IProfilingBlockInputStream::getExtremes() const Block IProfilingBlockInputStream::getExtremes()
{ {
if (extremes) if (extremes)
return extremes; return extremes;
for (const auto & child : children) Block res;
forEachProfilingChild([&] (IProfilingBlockInputStream & child)
{ {
if (const IProfilingBlockInputStream * p_child = dynamic_cast<const IProfilingBlockInputStream *>(&*child)) res = child.getExtremes();
{ if (res)
const Block & res = p_child->getExtremes(); return true;
if (res) return false;
return res; });
} return res;
}
return extremes;
} }
} }

View File

@ -26,6 +26,8 @@ using ProfilingBlockInputStreamPtr = std::shared_ptr<IProfilingBlockInputStream>
*/ */
class IProfilingBlockInputStream : public IBlockInputStream class IProfilingBlockInputStream : public IBlockInputStream
{ {
friend struct BlockStreamProfileInfo;
public: public:
IProfilingBlockInputStream(); IProfilingBlockInputStream();
@ -56,10 +58,10 @@ public:
* Call this method only after all the data has been retrieved with `read`, * Call this method only after all the data has been retrieved with `read`,
* otherwise there will be problems if any data at the same time is computed in another thread. * otherwise there will be problems if any data at the same time is computed in another thread.
*/ */
virtual const Block & getTotals(); virtual Block getTotals();
/// The same for minimums and maximums. /// The same for minimums and maximums.
const Block & getExtremes() const; Block getExtremes();
/** Set the execution progress bar callback. /** Set the execution progress bar callback.
@ -181,6 +183,13 @@ protected:
/// Minimums and maximums. The first row of the block - minimums, the second - the maximums. /// Minimums and maximums. The first row of the block - minimums, the second - the maximums.
Block extremes; Block extremes;
void addChild(BlockInputStreamPtr & child)
{
std::lock_guard lock(children_mutex);
children.push_back(child);
}
private: private:
bool enabled_extremes = false; bool enabled_extremes = false;
@ -214,6 +223,17 @@ private:
bool checkDataSizeLimits(); bool checkDataSizeLimits();
bool checkTimeLimits(); bool checkTimeLimits();
void checkQuota(Block & block); void checkQuota(Block & block);
template <typename F>
void forEachProfilingChild(F && f)
{
std::lock_guard lock(children_mutex);
for (auto & child : children)
if (IProfilingBlockInputStream * p_child = dynamic_cast<IProfilingBlockInputStream *>(child.get()))
if (f(*p_child))
return;
}
}; };
} }

View File

@ -27,12 +27,6 @@ public:
String getName() const override { return name; } String getName() const override { return name; }
void cancel() override
{
std::lock_guard<std::mutex> lock(cancel_mutex);
IProfilingBlockInputStream::cancel();
}
Block getHeader() const override Block getHeader() const override
{ {
return header; return header;
@ -62,9 +56,7 @@ protected:
input->readPrefix(); input->readPrefix();
{ {
std::lock_guard<std::mutex> lock(cancel_mutex); addChild(input);
children.push_back(input);
if (isCancelled() && p_input) if (isCancelled() && p_input)
p_input->cancel(); p_input->cancel();
@ -80,8 +72,6 @@ private:
Generator generator; Generator generator;
BlockInputStreamPtr input; BlockInputStreamPtr input;
std::mutex cancel_mutex;
}; };
} }

View File

@ -47,7 +47,7 @@ static void finalize(Block & block)
} }
const Block & TotalsHavingBlockInputStream::getTotals() Block TotalsHavingBlockInputStream::getTotals()
{ {
if (!totals) if (!totals)
{ {

View File

@ -27,7 +27,7 @@ public:
String getName() const override { return "TotalsHaving"; } String getName() const override { return "TotalsHaving"; }
const Block & getTotals() override; Block getTotals() override;
Block getHeader() const override; Block getHeader() const override;

View File

@ -400,7 +400,7 @@ void TCPHandler::processTablesStatusRequest()
void TCPHandler::sendProfileInfo() void TCPHandler::sendProfileInfo()
{ {
if (const IProfilingBlockInputStream * input = dynamic_cast<const IProfilingBlockInputStream *>(&*state.io.in)) if (const IProfilingBlockInputStream * input = dynamic_cast<const IProfilingBlockInputStream *>(state.io.in.get()))
{ {
writeVarUInt(Protocol::Server::ProfileInfo, *out); writeVarUInt(Protocol::Server::ProfileInfo, *out);
input->getProfileInfo().write(*out); input->getProfileInfo().write(*out);
@ -411,7 +411,7 @@ void TCPHandler::sendProfileInfo()
void TCPHandler::sendTotals() void TCPHandler::sendTotals()
{ {
if (IProfilingBlockInputStream * input = dynamic_cast<IProfilingBlockInputStream *>(&*state.io.in)) if (IProfilingBlockInputStream * input = dynamic_cast<IProfilingBlockInputStream *>(state.io.in.get()))
{ {
const Block & totals = input->getTotals(); const Block & totals = input->getTotals();
@ -432,9 +432,9 @@ void TCPHandler::sendTotals()
void TCPHandler::sendExtremes() void TCPHandler::sendExtremes()
{ {
if (const IProfilingBlockInputStream * input = dynamic_cast<const IProfilingBlockInputStream *>(&*state.io.in)) if (IProfilingBlockInputStream * input = dynamic_cast<IProfilingBlockInputStream *>(state.io.in.get()))
{ {
const Block & extremes = input->getExtremes(); Block extremes = input->getExtremes();
if (extremes) if (extremes)
{ {