ClickHouse/dbms/src/Processors/Sources/SourceFromInputStream.cpp

48 lines
1.3 KiB
C++
Raw Normal View History

2019-03-26 18:28:37 +00:00
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Transforms/AggregatingTransform.h>
#include <DataTypes/DataTypeAggregateFunction.h>
2019-03-26 18:28:37 +00:00
namespace DB
{
/// Builds a source on top of a stream holder.
/// The header passed to ISource is taken from the holder's stream before the holder is moved into the member.
/// While constructing, the header's column types are scanned: if at least one of them is
/// DataTypeAggregateFunction, has_aggregate_functions is switched on (otherwise it is left untouched).
SourceFromInputStream::SourceFromInputStream(InputStreamHolderPtr holder_, bool force_add_aggregating_info)
    : ISource(holder_->getStream().getHeader())
    , force_add_aggregating_info(force_add_aggregating_info)
    , holder(std::move(holder_))
{
    const auto & header = getPort().getHeader();
    const auto column_types = header.getDataTypes();

    for (const auto & column_type : column_types)
    {
        const bool is_aggregate_state
            = typeid_cast<const DataTypeAggregateFunction *>(column_type.get()) != nullptr;

        if (is_aggregate_state)
            has_aggregate_functions = true;
    }
}
/// Reads one block through the holder and repackages it as a Chunk.
/// Returns an empty Chunk when the holder is already finished, or when the block that was read
/// is empty (in that case readSuffix() is called on the holder first).
/// When force_add_aggregating_info or has_aggregate_functions is set, the block's bucket_num
/// and is_overflows are copied into an AggregatedChunkInfo attached to the chunk.
Chunk SourceFromInputStream::generate()
{
    if (holder->isFinished())
        return Chunk();

    auto block = holder->read();

    if (!block)
    {
        /// An empty block was read: call the suffix hook before returning nothing.
        holder->readSuffix();
        return Chunk();
    }

    /// The block that was read must have the same structure as the output port header.
    const auto & expected_header = getPort().getHeader();
    assertBlocksHaveEqualStructure(expected_header, block, "SourceFromInputStream");

    const UInt64 rows_in_block = block.rows();
    Chunk result(block.getColumns(), rows_in_block);

    const bool attach_aggregating_info = force_add_aggregating_info || has_aggregate_functions;
    if (attach_aggregating_info)
    {
        auto aggregated_info = std::make_shared<AggregatedChunkInfo>();
        aggregated_info->bucket_num = block.info.bucket_num;
        aggregated_info->is_overflows = block.info.is_overflows;
        result.setChunkInfo(std::move(aggregated_info));
    }

    return result;
}
}