mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-17 13:13:36 +00:00
260 lines
7.6 KiB
C++
260 lines
7.6 KiB
C++
#pragma once
|
|
|
|
#include <common/logger_useful.h>
|
|
|
|
#include <Common/ConcurrentBoundedQueue.h>
|
|
#include <DataStreams/IBlockInputStream.h>
|
|
#include <DataStreams/ParallelInputsProcessor.h>
|
|
|
|
|
|
namespace DB
|
|
{
|
|
|
|
namespace ErrorCodes
|
|
{
|
|
extern const int LOGICAL_ERROR;
|
|
}
|
|
|
|
|
|
/** Merges several sources into one.
|
|
* Blocks from different sources are interleaved with each other in an arbitrary way.
|
|
* You can specify the number of threads (max_threads),
|
|
* in which data will be retrieved from different sources.
|
|
*
|
|
* It's managed like this:
|
|
* - with the help of ParallelInputsProcessor in several threads it takes out blocks from the sources;
|
|
* - the completed blocks are added to a limited queue of finished blocks;
|
|
* - the main thread takes out completed blocks from the queue of finished blocks;
|
|
*/
|
|
class UnionBlockInputStream final : public IBlockInputStream
|
|
{
|
|
private:
|
|
/// A block or an exception.
|
|
struct OutputData
|
|
{
|
|
Block block;
|
|
std::exception_ptr exception;
|
|
|
|
OutputData() {}
|
|
OutputData(Block & block_) : block(block_) {}
|
|
OutputData(std::exception_ptr & exception_) : exception(exception_) {}
|
|
};
|
|
|
|
public:
|
|
using ExceptionCallback = std::function<void()>;
|
|
|
|
UnionBlockInputStream(
|
|
BlockInputStreams inputs,
|
|
BlockInputStreamPtr additional_input_at_end,
|
|
size_t max_threads,
|
|
ExceptionCallback exception_callback_ = ExceptionCallback()
|
|
) :
|
|
output_queue(std::min(inputs.size(), max_threads)), handler(*this),
|
|
processor(inputs, additional_input_at_end, max_threads, handler),
|
|
exception_callback(exception_callback_)
|
|
{
|
|
children = inputs;
|
|
if (additional_input_at_end)
|
|
children.push_back(additional_input_at_end);
|
|
|
|
size_t num_children = children.size();
|
|
if (num_children > 1)
|
|
{
|
|
Block header = children.at(0)->getHeader();
|
|
for (size_t i = 1; i < num_children; ++i)
|
|
assertBlocksHaveEqualStructure(children[i]->getHeader(), header, "UNION");
|
|
}
|
|
}
|
|
|
|
String getName() const override { return "Union"; }
|
|
|
|
~UnionBlockInputStream() override
|
|
{
|
|
try
|
|
{
|
|
if (!all_read)
|
|
cancel(false);
|
|
|
|
finalize();
|
|
}
|
|
catch (...)
|
|
{
|
|
tryLogCurrentException(__PRETTY_FUNCTION__);
|
|
}
|
|
}
|
|
|
|
/** Different from the default implementation by trying to stop all sources,
|
|
* skipping failed by execution.
|
|
*/
|
|
void cancel(bool kill) override
|
|
{
|
|
if (kill)
|
|
is_killed = true;
|
|
|
|
bool old_val = false;
|
|
if (!is_cancelled.compare_exchange_strong(old_val, true, std::memory_order_seq_cst, std::memory_order_relaxed))
|
|
return;
|
|
|
|
//std::cerr << "cancelling\n";
|
|
processor.cancel(kill);
|
|
}
|
|
|
|
Block getHeader() const override { return children.at(0)->getHeader(); }
|
|
|
|
protected:
|
|
void finalize()
|
|
{
|
|
if (!started)
|
|
return;
|
|
|
|
LOG_TRACE(log, "Waiting for threads to finish");
|
|
|
|
std::exception_ptr exception;
|
|
if (!all_read)
|
|
{
|
|
/** Let's read everything up to the end, so that ParallelInputsProcessor is not blocked when trying to insert into the queue.
|
|
* Maybe there is an exception in the queue.
|
|
*/
|
|
OutputData res;
|
|
while (true)
|
|
{
|
|
//std::cerr << "popping\n";
|
|
output_queue.pop(res);
|
|
|
|
if (res.exception)
|
|
{
|
|
if (!exception)
|
|
exception = res.exception;
|
|
else if (Exception * e = exception_cast<Exception *>(exception))
|
|
e->addMessage("\n" + getExceptionMessage(res.exception, false));
|
|
}
|
|
else if (!res.block)
|
|
break;
|
|
}
|
|
|
|
all_read = true;
|
|
}
|
|
|
|
processor.wait();
|
|
|
|
LOG_TRACE(log, "Waited for threads to finish");
|
|
|
|
if (exception)
|
|
std::rethrow_exception(exception);
|
|
}
|
|
|
|
/// Do nothing, to make the preparation for the query execution in parallel, in ParallelInputsProcessor.
|
|
void readPrefix() override
|
|
{
|
|
}
|
|
|
|
/** The following options are possible:
|
|
* 1. `readImpl` function is called until it returns an empty block.
|
|
* Then `readSuffix` function is called and then destructor.
|
|
* 2. `readImpl` function is called. At some point, `cancel` function is called perhaps from another thread.
|
|
* Then `readSuffix` function is called and then destructor.
|
|
* 3. At any time, the object can be destroyed (destructor called).
|
|
*/
|
|
|
|
Block readImpl() override
|
|
{
|
|
if (all_read)
|
|
return received_payload.block;
|
|
|
|
/// Run threads if this has not already been done.
|
|
if (!started)
|
|
{
|
|
started = true;
|
|
processor.process();
|
|
}
|
|
|
|
/// We will wait until the next block is ready or an exception is thrown.
|
|
//std::cerr << "popping\n";
|
|
output_queue.pop(received_payload);
|
|
|
|
if (received_payload.exception)
|
|
{
|
|
if (exception_callback)
|
|
exception_callback();
|
|
std::rethrow_exception(received_payload.exception);
|
|
}
|
|
|
|
if (!received_payload.block)
|
|
all_read = true;
|
|
|
|
return received_payload.block;
|
|
}
|
|
|
|
/// Called either after everything is read, or after cancel.
|
|
void readSuffix() override
|
|
{
|
|
//std::cerr << "readSuffix\n";
|
|
if (!all_read && !isCancelled())
|
|
throw Exception("readSuffix called before all data is read", ErrorCodes::LOGICAL_ERROR);
|
|
|
|
finalize();
|
|
|
|
for (size_t i = 0; i < children.size(); ++i)
|
|
children[i]->readSuffix();
|
|
}
|
|
|
|
private:
|
|
using Payload = OutputData;
|
|
using OutputQueue = ConcurrentBoundedQueue<Payload>;
|
|
|
|
/** The queue of the finished blocks. Also, you can put an exception instead of a block.
|
|
* When data is run out, an empty block is inserted into the queue.
|
|
* Sooner or later, an empty block is always inserted into the queue (even after exception or query cancellation).
|
|
* The queue is always (even after exception or canceling the query, even in destructor) you must read up to an empty block,
|
|
* otherwise ParallelInputsProcessor can be blocked during insertion into the queue.
|
|
*/
|
|
OutputQueue output_queue;
|
|
|
|
struct Handler
|
|
{
|
|
Handler(UnionBlockInputStream & parent_) : parent(parent_) {}
|
|
|
|
void onBlock(Block & block, size_t /*thread_num*/)
|
|
{
|
|
parent.output_queue.push(Payload(block));
|
|
}
|
|
|
|
void onFinish()
|
|
{
|
|
parent.output_queue.push(Payload());
|
|
}
|
|
|
|
void onFinishThread(size_t /*thread_num*/)
|
|
{
|
|
}
|
|
|
|
void onException(std::exception_ptr & exception, size_t /*thread_num*/)
|
|
{
|
|
//std::cerr << "pushing exception\n";
|
|
|
|
/// The order of the rows matters. If it is changed, then the situation is possible,
|
|
/// when before exception, an empty block (end of data) will be put into the queue,
|
|
/// and the exception is lost.
|
|
|
|
parent.output_queue.push(exception);
|
|
parent.cancel(false); /// Does not throw exceptions.
|
|
}
|
|
|
|
UnionBlockInputStream & parent;
|
|
};
|
|
|
|
Handler handler;
|
|
ParallelInputsProcessor<Handler> processor;
|
|
|
|
ExceptionCallback exception_callback;
|
|
|
|
Payload received_payload;
|
|
|
|
bool started = false;
|
|
bool all_read = false;
|
|
|
|
Poco::Logger * log = &Poco::Logger::get("UnionBlockInputStream");
|
|
};
|
|
|
|
}
|