#include #include #include #include namespace DB { namespace ErrorCodes { extern const int LOGICAL_ERROR; } SourceFromInputStream::SourceFromInputStream(BlockInputStreamPtr stream_, bool force_add_aggregating_info_) : ISourceWithProgress(stream_->getHeader()) , force_add_aggregating_info(force_add_aggregating_info_) , stream(std::move(stream_)) { init(); } void SourceFromInputStream::init() { auto & sample = getPort().getHeader(); for (auto & type : sample.getDataTypes()) if (typeid_cast(type.get())) has_aggregate_functions = true; } void SourceFromInputStream::addTotalsPort() { if (has_totals_port) throw Exception("Totals port was already added for SourceFromInputStream.", ErrorCodes::LOGICAL_ERROR); outputs.emplace_back(outputs.front().getHeader(), this); has_totals_port = true; } IProcessor::Status SourceFromInputStream::prepare() { auto status = ISource::prepare(); if (status == Status::Finished) { is_generating_finished = true; /// Read postfix and get totals if needed. if (!is_stream_finished && !isCancelled()) return Status::Ready; if (has_totals_port) { auto & totals_out = outputs.back(); if (totals_out.isFinished()) return Status::Finished; if (has_totals) { if (!totals_out.canPush()) return Status::PortFull; totals_out.push(std::move(totals)); has_totals = false; } totals_out.finish(); } } return status; } void SourceFromInputStream::work() { if (!is_generating_finished) { try { ISource::work(); } catch (...) { /// Won't read suffix in case of exception. is_stream_finished = true; throw; } return; } if (is_stream_finished) return; /// Don't cancel for RemoteBlockInputStream (otherwise readSuffix can stack) if (!typeid_cast(stream.get())) stream->cancel(false); if (rows_before_limit) { auto & info = stream->getProfileInfo(); if (info.hasAppliedLimit()) rows_before_limit->add(info.getRowsBeforeLimit()); } stream->readSuffix(); if (auto totals_block = stream->getTotals()) { totals.setColumns(totals_block.getColumns(), 1); has_totals = true; } is_stream_finished = true; } Chunk SourceFromInputStream::generate() { if (is_stream_finished) return {}; if (!is_stream_started) { stream->readPrefix(); is_stream_started = true; } auto block = stream->read(); if (!block && !isCancelled()) { if (rows_before_limit) { auto & info = stream->getProfileInfo(); if (info.hasAppliedLimit()) rows_before_limit->add(info.getRowsBeforeLimit()); } stream->readSuffix(); if (auto totals_block = stream->getTotals()) { if (totals_block.rows() == 1) /// Sometimes we can get empty totals. Skip it. { totals.setColumns(totals_block.getColumns(), 1); has_totals = true; } } is_stream_finished = true; return {}; } #ifndef NDEBUG assertBlocksHaveEqualStructure(getPort().getHeader(), block, "SourceFromInputStream"); #endif UInt64 num_rows = block.rows(); Chunk chunk(block.getColumns(), num_rows); if (force_add_aggregating_info || has_aggregate_functions) { auto info = std::make_shared(); info->bucket_num = block.info.bucket_num; info->is_overflows = block.info.is_overflows; chunk.setChunkInfo(std::move(info)); } return chunk; } }