Processors support for StorageS3 reading.

This commit is contained in:
Nikolai Kochetov 2020-02-03 21:01:41 +03:00
parent 71f746e01a
commit bc757f6b24
2 changed files with 36 additions and 24 deletions

View File

@ -22,6 +22,8 @@
#include <DataStreams/AddingDefaultsBlockInputStream.h>
#include <aws/s3/S3Client.h>
#include <Processors/Sources/SourceWithProgress.h>
#include <Processors/Pipe.h>
namespace DB
@ -34,23 +36,27 @@ namespace ErrorCodes
namespace
{
class StorageS3BlockInputStream : public IBlockInputStream
class StorageS3Source : public SourceWithProgress
{
public:
StorageS3BlockInputStream(
StorageS3Source(
const String & format,
const String & name_,
String name_,
const Block & sample_block,
const Context & context,
const ColumnDefaults & column_defaults,
UInt64 max_block_size,
const CompressionMethod compression_method,
const std::shared_ptr<Aws::S3::S3Client> & client,
const String & bucket,
const String & key)
: name(name_)
: SourceWithProgress(sample_block), name(std::move(name_))
{
read_buf = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromS3>(client, bucket, key), compression_method);
reader = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size);
if (!column_defaults.empty())
reader = std::make_shared<AddingDefaultsBlockInputStream>(reader, column_defaults, context);
}
String getName() const override
@ -58,30 +64,34 @@ namespace
return name;
}
Block readImpl() override
Chunk generate() override
{
return reader->read();
}
// The reader is released once the input is exhausted; with no reader left, return an empty chunk.
if (!reader)
return {};
Block getHeader() const override
{
return reader->getHeader();
}
// One-time initialisation of the underlying block stream. The prefix has to be
// read before the first read() call; the suffix belongs to the end of the stream,
// so calling readSuffix() here was a lifecycle error.
if (!initialized)
{
    reader->readPrefix();
    initialized = true;
}
void readPrefixImpl() override
{
reader->readPrefix();
}
// Pull the next block from the wrapped stream; a non-empty block is repackaged
// as a Chunk carrying the block's columns and its row count.
if (auto block = reader->read())
{
UInt64 num_rows = block.rows();
return Chunk(block.getColumns(), num_rows);
}
void readSuffixImpl() override
{
reader->readSuffix();
// Input exhausted: release the reader so later calls take the early-return path,
// then return an empty chunk.
// NOTE(review): confirm that reader->readSuffix() is still invoked on the
// processors path before the reader is released.
reader.reset();
return {};
}
private:
String name;
std::unique_ptr<ReadBuffer> read_buf;
BlockInputStreamPtr reader;
bool initialized = false;
};
class StorageS3BlockOutputStream : public IBlockOutputStream
@ -158,7 +168,7 @@ StorageS3::StorageS3(
}
BlockInputStreams StorageS3::read(
Pipes StorageS3::readWithProcessors(
const Names & column_names,
const SelectQueryInfo & /*query_info*/,
const Context & context,
@ -166,21 +176,21 @@ BlockInputStreams StorageS3::read(
size_t max_block_size,
unsigned /*num_streams*/)
{
BlockInputStreamPtr block_input = std::make_shared<StorageS3BlockInputStream>(
auto block_input = std::make_shared<StorageS3Source>(
format_name,
getName(),
getHeaderBlock(column_names),
context,
getColumns().getDefaults(),
max_block_size,
chooseCompressionMethod(uri.endpoint, compression_method),
client,
uri.bucket,
uri.key);
auto column_defaults = getColumns().getDefaults();
if (column_defaults.empty())
return {block_input};
return {std::make_shared<AddingDefaultsBlockInputStream>(block_input, column_defaults, context)};
// A single source is created for the S3 object; move it into a one-element Pipes list.
Pipes pipes;
pipes.emplace_back(std::move(block_input));
return pipes;
}
BlockOutputStreamPtr StorageS3::write(const ASTPtr & /*query*/, const Context & /*context*/)

View File

@ -46,7 +46,7 @@ public:
return getSampleBlock();
}
BlockInputStreams read(
Pipes readWithProcessors(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
@ -54,6 +54,8 @@ public:
size_t max_block_size,
unsigned num_streams) override;
bool supportProcessorsPipeline() const override { return true; }
BlockOutputStreamPtr write(const ASTPtr & query, const Context & context) override;
private: