Fix limits for subqueries.

This commit is contained in:
Nikolai Kochetov 2019-04-10 15:38:57 +03:00
parent 6eb097fb54
commit 539c06636f
3 changed files with 30 additions and 4 deletions

View File

@ -444,15 +444,25 @@ void QueryPipeline::unitePipelines(
void QueryPipeline::setProgressCallback(const ProgressCallback & callback)
{
for (auto & processor : processors)
{
if (auto * source = typeid_cast<SourceFromInputStream *>(processor.get()))
source->getStream()->setProgressCallback(callback);
if (auto * source = typeid_cast<CreatingSetsTransform *>(processor.get()))
source->setProgressCallback(callback);
}
}
void QueryPipeline::setProcessListElement(QueryStatus * elem)
{
for (auto & processor : processors)
{
if (auto * source = typeid_cast<SourceFromInputStream *>(processor.get()))
source->getStream()->setProcessListElement(elem);
if (auto * source = typeid_cast<CreatingSetsTransform *>(processor.get()))
source->setProcessListElement(elem);
}
}
void QueryPipeline::finalize()

View File

@ -111,10 +111,6 @@ void CreatingSetsTransform::init()
{
is_initialized = true;
const Settings & settings = context.getSettingsRef();
network_transfer_limits = SizeLimits(
settings.max_rows_to_transfer, settings.max_bytes_to_transfer, settings.transfer_overflow_mode);
for (auto & elem : subqueries_for_sets)
if (elem.second.source && elem.second.set)
elem.second.set->setHeader(elem.second.source->getHeader());
@ -210,4 +206,18 @@ void CreatingSetsTransform::work()
elapsed_nanoseconds += watch.elapsedNanoseconds();
}
void CreatingSetsTransform::setProgressCallback(const ProgressCallback & callback)
{
for (auto & elem : subqueries_for_sets)
if (elem.second.source)
elem.second.source->setProgressCallback(callback);
}
void CreatingSetsTransform::setProcessListElement(QueryStatus * status)
{
for (auto & elem : subqueries_for_sets)
if (elem.second.source)
elem.second.source->setProcessListElement(status);
}
}

View File

@ -6,6 +6,9 @@
namespace DB
{
struct Progress;
using ProgressCallback = std::function<void(const Progress & progress)>;
/// This processor creates sets during execution.
/// Don't return any data. Sets are created when Finish status is returned.
/// In general, several work() methods need to be called to finish.
@ -23,6 +26,9 @@ public:
Status prepare() override;
void work() override;
void setProgressCallback(const ProgressCallback & callback);
void setProcessListElement(QueryStatus * status);
protected:
bool finished = false;