ClickHouse/src/Storages/StorageInput.cpp
Azat Khuzhin 4e76629aaf Fixes for -Wshorten-64-to-32
- lots of static_cast
- add safe_cast
- types adjustments
  - config
  - IStorage::read/watch
  - ...
- some TODO's (to convert types in future)

P.S. That was quite a journey...

v2: fixes after rebase
v3: fix conflicts after #42308 merged
Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
2022-10-21 13:25:19 +02:00

79 lines
2.0 KiB
C++

#include <Storages/StorageInput.h>
#include <Storages/IStorage.h>
#include <Interpreters/Context.h>
#include <memory>
#include <Processors/ISource.h>
#include <QueryPipeline/Pipe.h>
namespace DB
{
/// Error codes referenced in this file. They are only declared here (`extern`);
/// INVALID_USAGE_OF_INPUT is thrown by StorageInput::read() when no input pipe is available.
namespace ErrorCodes
{
extern const int INVALID_USAGE_OF_INPUT;
}
/// Constructs the storage for `table_id` and installs in-memory metadata
/// that contains only the given column description.
StorageInput::StorageInput(const StorageID & table_id, const ColumnsDescription & columns_)
    : IStorage(table_id)
{
    StorageInMemoryMetadata metadata;
    metadata.setColumns(columns_);
    setInMemoryMetadata(metadata);
}
/// Source that obtains its data from the input-blocks reader callback stored in the context.
class StorageInputSource : public ISource, WithContext
{
public:
    StorageInputSource(ContextPtr context_, Block sample_block)
        : ISource(std::move(sample_block))
        , WithContext(context_)
    {
    }

    String getName() const override { return "Input"; }

    /// Invokes the reader callback (passing it the context) to get the next block.
    /// Returns an empty chunk when the returned block evaluates to false,
    /// otherwise a chunk built from the block's columns and row count.
    Chunk generate() override
    {
        auto current_context = getContext();
        auto input_block = current_context->getInputBlocksReaderCallback()(current_context);

        if (input_block)
        {
            const UInt64 rows_count = input_block.rows();
            return Chunk(input_block.getColumns(), rows_count);
        }

        return {};
    }
};
/// Stores `pipe_` in the `pipe` member, taking it over by move.
/// StorageInput::read() hands this pipe out when the query context has no input-blocks reader callback.
void StorageInput::setPipe(Pipe pipe_)
{
pipe = std::move(pipe_);
}
/// Produces the pipe that supplies data for input().
///
/// Two cases are handled:
///  - The query context has an input-blocks reader callback: the input is initialized
///    with this storage and a pipe over a new StorageInputSource is returned.
///  - Otherwise the pipe previously stored via setPipe() is returned.
///
/// All parameters except `storage_snapshot` (used for the sample block) and `context`
/// (used to obtain the query context) are ignored.
///
/// Throws Exception with INVALID_USAGE_OF_INPUT if there is no callback and the stored pipe is empty.
Pipe StorageInput::read(
    const Names & /*column_names*/,
    const StorageSnapshotPtr & storage_snapshot,
    SelectQueryInfo & /*query_info*/,
    ContextPtr context,
    QueryProcessingStage::Enum /*processed_stage*/,
    size_t /*max_block_size*/,
    size_t /*num_streams*/)
{
    auto query_context = context->getQueryContext();

    /// It is a TCP request if we have callbacks for input().
    if (query_context->getInputBlocksReaderCallback())
    {
        /// Send structure to the client.
        query_context->initializeInput(shared_from_this());
        return Pipe(std::make_shared<StorageInputSource>(query_context, storage_snapshot->metadata->getSampleBlock()));
    }

    if (pipe.empty())
        throw Exception("Input stream is not initialized, input() must be used only in INSERT SELECT query", ErrorCodes::INVALID_USAGE_OF_INPUT);

    /// `pipe` is a data member, not a local, so it is not moved implicitly on return;
    /// the explicit std::move transfers the stored pipe to the caller.
    return std::move(pipe);
}
}