#include namespace DB { ISource::ISource(Block header) : IProcessor({}, {std::move(header)}), output(outputs.front()) { } ISource::Status ISource::prepare() { if (finished || isCancelled()) { output.finish(); return Status::Finished; } /// Check can output. if (output.isFinished()) return Status::Finished; if (!output.canPush()) return Status::PortFull; if (!has_input) return Status::Ready; output.pushData(std::move(current_chunk)); has_input = false; if (got_exception) { finished = true; output.finish(); return Status::Finished; } /// Now, we pushed to output, and it must be full. return Status::PortFull; } void ISource::work() { try { current_chunk.chunk = generate(); if (!current_chunk.chunk || isCancelled()) finished = true; else has_input = true; } catch (...) { finished = true; throw; } } }