#include namespace DB { namespace ErrorCodes { extern const int LOGICAL_ERROR; } IAccumulatingTransform::IAccumulatingTransform(Block input_header, Block output_header) : IProcessor({std::move(input_header)}, {std::move(output_header)}), input(inputs.front()), output(outputs.front()) { } InputPort * IAccumulatingTransform::addTotalsPort() { if (inputs.size() > 1) throw Exception("Totals port was already added to IAccumulatingTransform", ErrorCodes::LOGICAL_ERROR); return &inputs.emplace_back(getInputPort().getHeader(), this); } IAccumulatingTransform::Status IAccumulatingTransform::prepare() { /// Check can output. if (output.isFinished()) { input.close(); return Status::Finished; } if (!output.canPush()) { input.setNotNeeded(); return Status::PortFull; } /// Output if has data. if (current_output_chunk) output.push(std::move(current_output_chunk)); if (finished_generate) { output.finish(); return Status::Finished; } /// Close input if flag was set manually. if (finished_input) input.close(); /// Read from totals port if has it. if (input.isFinished()) { if (inputs.size() > 1) { auto & totals_input = inputs.back(); if (!totals_input.isFinished()) { totals_input.setNeeded(); if (!totals_input.hasData()) return Status::NeedData; totals = totals_input.pull(); totals_input.close(); } } } /// Generate output block. if (input.isFinished()) { finished_input = true; return Status::Ready; } /// Check can input. if (!has_input) { input.setNeeded(); if (!input.hasData()) return Status::NeedData; current_input_chunk = input.pull(); has_input = true; } return Status::Ready; } void IAccumulatingTransform::work() { if (!finished_input) { consume(std::move(current_input_chunk)); has_input = false; } else { current_output_chunk = generate(); if (!current_output_chunk) finished_generate = true; } } void IAccumulatingTransform::setReadyChunk(Chunk chunk) { if (current_output_chunk) throw Exception("IAccumulatingTransform already has input. Cannot set another chunk. " "Probably, setReadyChunk method was called twice per consume().", ErrorCodes::LOGICAL_ERROR); current_output_chunk = std::move(chunk); } }