ClickHouse/src/Storages/StorageInput.cpp

85 lines
2.1 KiB
C++
Raw Normal View History

2019-05-28 18:27:00 +00:00
#include <Storages/StorageInput.h>
#include <Storages/IStorage.h>
#include <Interpreters/Context.h>
#include <DataStreams/IBlockInputStream.h>
#include <memory>
#include <Processors/Sources/SourceWithProgress.h>
#include <Processors/Pipe.h>
#include <Processors/Sources/SourceFromInputStream.h>
2019-05-28 18:27:00 +00:00
namespace DB
{
namespace ErrorCodes
{
2019-09-05 13:17:01 +00:00
extern const int INVALID_USAGE_OF_INPUT;
2019-05-28 18:27:00 +00:00
}
2020-03-10 19:36:17 +00:00
StorageInput::StorageInput(const StorageID & table_id, const ColumnsDescription & columns_)
: IStorage(table_id)
2019-05-28 18:27:00 +00:00
{
2019-09-05 13:17:01 +00:00
setColumns(columns_);
2019-05-28 18:27:00 +00:00
}
class StorageInputSource : public SourceWithProgress
2019-05-28 18:27:00 +00:00
{
public:
StorageInputSource(Context & context_, Block sample_block)
: SourceWithProgress(std::move(sample_block)), context(context_)
2019-05-28 18:27:00 +00:00
{
}
Chunk generate() override
{
auto block = context.getInputBlocksReaderCallback()(context);
if (!block)
return {};
2019-05-28 18:27:00 +00:00
UInt64 num_rows = block.rows();
return Chunk(block.getColumns(), num_rows);
}
2019-05-28 18:27:00 +00:00
String getName() const override { return "Input"; }
2019-05-28 18:27:00 +00:00
private:
Context & context;
};
void StorageInput::setInputStream(BlockInputStreamPtr input_stream_)
{
input_stream = input_stream_;
}
Pipes StorageInput::read(const Names & /*column_names*/,
2019-05-28 18:27:00 +00:00
const SelectQueryInfo & /*query_info*/,
const Context & context,
QueryProcessingStage::Enum /*processed_stage*/,
size_t /*max_block_size*/,
unsigned /*num_streams*/)
{
Pipes pipes;
2019-05-28 18:27:00 +00:00
Context & query_context = const_cast<Context &>(context).getQueryContext();
/// It is TCP request if we have callbacks for input().
if (query_context.getInputBlocksReaderCallback())
{
/// Send structure to the client.
query_context.initializeInput(shared_from_this());
pipes.emplace_back(std::make_shared<StorageInputSource>(query_context, getSampleBlock()));
return pipes;
2019-05-28 18:27:00 +00:00
}
if (!input_stream)
2019-09-05 13:17:01 +00:00
throw Exception("Input stream is not initialized, input() must be used only in INSERT SELECT query", ErrorCodes::INVALID_USAGE_OF_INPUT);
2019-05-28 18:27:00 +00:00
pipes.emplace_back(std::make_shared<SourceFromInputStream>(input_stream));
return pipes;
2019-05-28 18:27:00 +00:00
}
}