#pragma once #include #include #include #include namespace DB { namespace ErrorCodes { extern const int LOGICAL_ERROR; } /** Merges several sources into one. * Blocks from different sources are interleaved with each other in an arbitrary way. * You can specify the number of threads (max_threads), * in which data will be retrieved from different sources. * * It's managed like this: * - with the help of ParallelInputsProcessor in several threads it takes out blocks from the sources; * - the completed blocks are added to a limited queue of finished blocks; * - the main thread takes out completed blocks from the queue of finished blocks; */ class UnionBlockInputStream final : public IBlockInputStream { private: /// A block or an exception. struct OutputData { Block block; std::exception_ptr exception; OutputData() {} OutputData(Block & block_) : block(block_) {} OutputData(std::exception_ptr & exception_) : exception(exception_) {} }; public: using ExceptionCallback = std::function; UnionBlockInputStream( BlockInputStreams inputs, BlockInputStreamPtr additional_input_at_end, size_t max_threads, ExceptionCallback exception_callback_ = ExceptionCallback() ) : output_queue(std::min(inputs.size(), max_threads)), handler(*this), processor(inputs, additional_input_at_end, max_threads, handler), exception_callback(exception_callback_) { children = inputs; if (additional_input_at_end) children.push_back(additional_input_at_end); size_t num_children = children.size(); if (num_children > 1) { Block header = children.at(0)->getHeader(); for (size_t i = 1; i < num_children; ++i) assertBlocksHaveEqualStructure(children[i]->getHeader(), header, "UNION"); } } String getName() const override { return "Union"; } ~UnionBlockInputStream() override { try { if (!all_read) cancel(false); finalize(); } catch (...) { tryLogCurrentException(__PRETTY_FUNCTION__); } } /** Different from the default implementation by trying to stop all sources, * skipping failed by execution. */ void cancel(bool kill) override { if (kill) is_killed = true; bool old_val = false; if (!is_cancelled.compare_exchange_strong(old_val, true, std::memory_order_seq_cst, std::memory_order_relaxed)) return; //std::cerr << "cancelling\n"; processor.cancel(kill); } Block getHeader() const override { return children.at(0)->getHeader(); } protected: void finalize() { if (!started) return; LOG_TRACE(log, "Waiting for threads to finish"); std::exception_ptr exception; if (!all_read) { /** Let's read everything up to the end, so that ParallelInputsProcessor is not blocked when trying to insert into the queue. * Maybe there is an exception in the queue. */ OutputData res; while (true) { //std::cerr << "popping\n"; output_queue.pop(res); if (res.exception) { if (!exception) exception = res.exception; else if (Exception * e = exception_cast(exception)) e->addMessage("\n" + getExceptionMessage(res.exception, false)); } else if (!res.block) break; } all_read = true; } processor.wait(); LOG_TRACE(log, "Waited for threads to finish"); if (exception) std::rethrow_exception(exception); } /// Do nothing, to make the preparation for the query execution in parallel, in ParallelInputsProcessor. void readPrefix() override { } /** The following options are possible: * 1. `readImpl` function is called until it returns an empty block. * Then `readSuffix` function is called and then destructor. * 2. `readImpl` function is called. At some point, `cancel` function is called perhaps from another thread. * Then `readSuffix` function is called and then destructor. * 3. At any time, the object can be destroyed (destructor called). */ Block readImpl() override { if (all_read) return received_payload.block; /// Run threads if this has not already been done. if (!started) { started = true; processor.process(); } /// We will wait until the next block is ready or an exception is thrown. //std::cerr << "popping\n"; output_queue.pop(received_payload); if (received_payload.exception) { if (exception_callback) exception_callback(); std::rethrow_exception(received_payload.exception); } if (!received_payload.block) all_read = true; return received_payload.block; } /// Called either after everything is read, or after cancel. void readSuffix() override { //std::cerr << "readSuffix\n"; if (!all_read && !isCancelled()) throw Exception("readSuffix called before all data is read", ErrorCodes::LOGICAL_ERROR); finalize(); for (size_t i = 0; i < children.size(); ++i) children[i]->readSuffix(); } private: using Payload = OutputData; using OutputQueue = ConcurrentBoundedQueue; /** The queue of the finished blocks. Also, you can put an exception instead of a block. * When data is run out, an empty block is inserted into the queue. * Sooner or later, an empty block is always inserted into the queue (even after exception or query cancellation). * The queue is always (even after exception or canceling the query, even in destructor) you must read up to an empty block, * otherwise ParallelInputsProcessor can be blocked during insertion into the queue. */ OutputQueue output_queue; struct Handler { Handler(UnionBlockInputStream & parent_) : parent(parent_) {} void onBlock(Block & block, size_t /*thread_num*/) { parent.output_queue.push(Payload(block)); } void onFinish() { parent.output_queue.push(Payload()); } void onFinishThread(size_t /*thread_num*/) { } void onException(std::exception_ptr & exception, size_t /*thread_num*/) { //std::cerr << "pushing exception\n"; /// The order of the rows matters. If it is changed, then the situation is possible, /// when before exception, an empty block (end of data) will be put into the queue, /// and the exception is lost. parent.output_queue.push(exception); parent.cancel(false); /// Does not throw exceptions. } UnionBlockInputStream & parent; }; Handler handler; ParallelInputsProcessor processor; ExceptionCallback exception_callback; Payload received_payload; bool started = false; bool all_read = false; Logger * log = &Logger::get("UnionBlockInputStream"); }; }