Fix double readPrefix call.

This commit is contained in:
Nikolai Kochetov 2019-04-11 16:04:43 +03:00
parent 403df591dc
commit a72f402468
8 changed files with 135 additions and 47 deletions

View File

@ -677,7 +677,7 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
if (prepared_input)
{
if constexpr (pipeline_with_processors)
pipeline.init({std::make_shared<SourceFromInputStream>(source_header, prepared_input)});
pipeline.init({std::make_shared<SourceFromInputStream>(std::make_shared<InputStreamHolder>(prepared_input))});
else
pipeline.streams.push_back(prepared_input);
}
@ -779,7 +779,8 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
if constexpr (pipeline_with_processors)
{
auto source = std::make_shared<SourceFromInputStream>(stream->getHeader(), std::move(stream));
auto holder = std::make_shared<InputStreamHolder>(std::move(stream));
auto source = std::make_shared<SourceFromInputStream>(std::move(holder));
pipeline.addDelayedStream(source);
}
else
@ -1321,13 +1322,20 @@ void InterpreterSelectQuery::executeFetchColumns(
Processors sources;
sources.reserve(streams.size());
InputStreamHolders holders;
holders.reserve(streams.size());
for (auto & stream : streams)
sources.emplace_back(std::make_shared<SourceFromInputStream>(stream->getHeader(), stream));
{
auto holder = std::make_shared<InputStreamHolder>(stream);
sources.emplace_back(std::make_shared<SourceFromInputStream>(holder));
holders.emplace_back(std::move(holder));
}
pipeline.init(std::move(sources));
if (processing_stage == QueryProcessingStage::Complete)
pipeline.addTotals(std::make_shared<SourceFromTotals>(streams));
pipeline.addTotals(std::make_shared<SourceFromTotals>(std::move(holders)));
}
else
pipeline.streams = std::move(streams);

View File

@ -460,7 +460,7 @@ void QueryPipeline::setProgressCallback(const ProgressCallback & callback)
for (auto & processor : processors)
{
if (auto * source = typeid_cast<SourceFromInputStream *>(processor.get()))
source->getStream()->setProgressCallback(callback);
source->getStream().setProgressCallback(callback);
if (auto * source = typeid_cast<CreatingSetsTransform *>(processor.get()))
source->setProgressCallback(callback);
@ -472,7 +472,7 @@ void QueryPipeline::setProcessListElement(QueryStatus * elem)
for (auto & processor : processors)
{
if (auto * source = typeid_cast<SourceFromInputStream *>(processor.get()))
source->getStream()->setProcessListElement(elem);
source->getStream().setProcessListElement(elem);
if (auto * source = typeid_cast<CreatingSetsTransform *>(processor.get()))
source->setProcessListElement(elem);

View File

@ -0,0 +1,38 @@
#include <Processors/Sources/InputStreamHolder.h>
#include <DataStreams/IBlockInputStream.h>
namespace DB
{
Block InputStreamHolder::read()
{
if (stream_finished)
return {};
if (!initialized)
{
stream->readPrefix();
initialized = true;
}
return stream->read();
}
void InputStreamHolder::readSuffix()
{
std::lock_guard lock_guard(lock);
if (stream_finished)
return;
if (!initialized)
{
stream->readPrefix();
initialized = true;
}
stream->readSuffix();
stream_finished = true;
}
}

View File

@ -0,0 +1,40 @@
#pragma once
#include <Core/Block.h>
#include <mutex>
namespace DB
{
class IBlockInputStream;
using BlockInputStreamPtr = std::shared_ptr<IBlockInputStream>;
class InputStreamHolder
{
public:
explicit InputStreamHolder(BlockInputStreamPtr stream_) : stream(std::move(stream_)) {}
/// Unsafe. Only for single thread.
Block read();
/// Safe for multiple threads.
void readSuffix();
bool isFinished() const { return stream_finished; }
IBlockInputStream & getStream() { return *stream; }
const IBlockInputStream & getStream() const { return *stream; }
private:
bool initialized = false;
bool stream_finished = false;
BlockInputStreamPtr stream;
std::mutex lock;
};
using InputStreamHolderPtr = std::shared_ptr<InputStreamHolder>;
using InputStreamHolders = std::vector<InputStreamHolderPtr>;
}

View File

@ -5,8 +5,8 @@
namespace DB
{
SourceFromInputStream::SourceFromInputStream(Block header, BlockInputStreamPtr stream)
: ISource(std::move(header)), stream(std::move(stream))
SourceFromInputStream::SourceFromInputStream(InputStreamHolderPtr holder_)
: ISource(holder_->getStream().getHeader()), holder(std::move(holder_))
{
auto & sample = getPort().getHeader();
for (auto & type : sample.getDataTypes())
@ -16,20 +16,13 @@ SourceFromInputStream::SourceFromInputStream(Block header, BlockInputStreamPtr s
Chunk SourceFromInputStream::generate()
{
if (stream_finished)
if (holder->isFinished())
return {};
if (!initialized)
{
stream->readPrefix();
initialized = true;
}
auto block = stream->read();
auto block = holder->read();
if (!block)
{
stream->readSuffix();
stream_finished = true;
holder->readSuffix();
return {};
}

View File

@ -1,28 +1,23 @@
#pragma once
#include <Processors/ISource.h>
#include <DataStreams/IBlockInputStream.h>
#include <Processors/Sources/InputStreamHolder.h>
namespace DB
{
class IBlockInputStream;
using BlockInputStreamPtr = std::shared_ptr<IBlockInputStream>;
class SourceFromInputStream : public ISource
{
public:
SourceFromInputStream(Block header, BlockInputStreamPtr stream);
explicit SourceFromInputStream(InputStreamHolderPtr holder_);
String getName() const override { return "SourceFromInputStream"; }
Chunk generate() override;
BlockInputStreamPtr & getStream() { return stream; }
IBlockInputStream & getStream() { return holder->getStream(); }
private:
bool initialized = false;
bool stream_finished = false;
bool has_aggregate_functions = false;
BlockInputStreamPtr stream;
InputStreamHolderPtr holder;
};
}

View File

@ -0,0 +1,30 @@
#include <Processors/Sources/SourceFromTotals.h>
#include <DataStreams/IBlockInputStream.h>
namespace DB
{
SourceFromTotals::SourceFromTotals(InputStreamHolders holders_)
: ISource(holders_.at(0)->getStream().getHeader()), holders(std::move(holders_))
{
}
Chunk SourceFromTotals::generate()
{
if (generated)
return {};
generated = true;
for (auto & holder : holders)
{
holder->readSuffix();
if (auto block = holder->getStream().getTotals())
return Chunk(block.getColumns(), 1);
}
return {};
}
}

View File

@ -1,39 +1,23 @@
#pragma once
#include <Processors/ISource.h>
#include <DataStreams/IBlockInputStream.h>
#include <Processors/Sources/InputStreamHolder.h>
namespace DB
{
class IBlockInputStream;
using BlockInputStreamPtr = std::shared_ptr<IBlockInputStream>;
using BlockInputStreams = std::vector<BlockInputStreamPtr>;
class SourceFromTotals : public ISource
{
public:
explicit SourceFromTotals(BlockInputStreams streams_)
: ISource(streams_.at(0)->getHeader()), streams(std::move(streams_)) {}
explicit SourceFromTotals(InputStreamHolders holders_);
String getName() const override { return "SourceFromTotals"; }
Chunk generate() override
{
if (generated)
return {};
generated = true;
for (auto & stream : streams)
if (auto block = stream->getTotals())
return Chunk(block.getColumns(), 1);
return {};
}
Chunk generate() override;
private:
bool generated = false;
BlockInputStreams streams;
InputStreamHolders holders;
};
}