Processors support for StorageURL reading.

This commit is contained in:
Nikolai Kochetov 2020-02-17 18:01:03 +03:00
parent df76f1fe56
commit 5372942aef
2 changed files with 29 additions and 26 deletions

View File

@ -17,6 +17,8 @@
#include <DataStreams/AddingDefaultsBlockInputStream.h>
#include <Poco/Net/HTTPRequest.h>
#include <Processors/Sources/SourceWithProgress.h>
#include <Processors/Pipe.h>
namespace DB
@ -48,26 +50,27 @@ IStorageURLBase::IStorageURLBase(
namespace
{
class StorageURLBlockInputStream : public IBlockInputStream
class StorageURLSource : public SourceWithProgress
{
public:
StorageURLBlockInputStream(const Poco::URI & uri,
StorageURLSource(const Poco::URI & uri,
const std::string & method,
std::function<void(std::ostream &)> callback,
const String & format,
const String & name_,
String name_,
const Block & sample_block,
const Context & context,
const ColumnDefaults & column_defaults,
UInt64 max_block_size,
const ConnectionTimeouts & timeouts,
const CompressionMethod compression_method)
: name(name_)
: SourceWithProgress(sample_block), name(std::move(name_))
{
read_buf = wrapReadBufferWithCompressionMethod(
std::make_unique<ReadWriteBufferFromHTTP>(
uri,
method,
callback,
std::move(callback),
timeouts,
context.getSettingsRef().max_http_get_redirects,
Poco::Net::HTTPBasicCredentials{},
@ -77,6 +80,7 @@ namespace
compression_method);
reader = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size);
reader = std::make_shared<AddingDefaultsBlockInputStream>(reader, column_defaults, context);
}
String getName() const override
@ -84,30 +88,28 @@ namespace
return name;
}
Block readImpl() override
Chunk generate() override
{
return reader->read();
}
if (!reader)
return {};
Block getHeader() const override
{
return reader->getHeader();
}
void readPrefixImpl() override
{
if (!initialized)
reader->readPrefix();
}
void readSuffixImpl() override
{
if (auto block = reader->read())
return Chunk(block.getColumns(), block.rows());
reader->readSuffix();
reader.reset();
return {};
}
private:
String name;
std::unique_ptr<ReadBuffer> read_buf;
BlockInputStreamPtr reader;
bool initialized = false;
};
class StorageURLBlockOutputStream : public IBlockOutputStream
@ -181,7 +183,7 @@ std::function<void(std::ostream &)> IStorageURLBase::getReadPOSTDataCallback(con
}
BlockInputStreams IStorageURLBase::read(const Names & column_names,
Pipes IStorageURLBase::readWithProcessors(const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum processed_stage,
@ -193,21 +195,20 @@ BlockInputStreams IStorageURLBase::read(const Names & column_names,
for (const auto & [param, value] : params)
request_uri.addQueryParameter(param, value);
BlockInputStreamPtr block_input = std::make_shared<StorageURLBlockInputStream>(request_uri,
Pipes pipes;
pipes.emplace_back(std::make_shared<StorageURLSource>(request_uri,
getReadMethod(),
getReadPOSTDataCallback(column_names, query_info, context, processed_stage, max_block_size),
format_name,
getName(),
getHeaderBlock(column_names),
context,
getColumns().getDefaults(),
max_block_size,
ConnectionTimeouts::getHTTPTimeouts(context),
chooseCompressionMethod(request_uri.getPath(), compression_method));
chooseCompressionMethod(request_uri.getPath(), compression_method)));
auto column_defaults = getColumns().getDefaults();
if (column_defaults.empty())
return {block_input};
return {std::make_shared<AddingDefaultsBlockInputStream>(block_input, column_defaults, context)};
return pipes;
}
BlockOutputStreamPtr IStorageURLBase::write(const ASTPtr & /*query*/, const Context & /*context*/)

View File

@ -16,7 +16,7 @@ namespace DB
class IStorageURLBase : public IStorage
{
public:
BlockInputStreams read(
Pipes readWithProcessors(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
@ -24,6 +24,8 @@ public:
size_t max_block_size,
unsigned num_streams) override;
bool supportProcessorsPipeline() const override { return true; }
BlockOutputStreamPtr write(const ASTPtr & query, const Context & context) override;
protected: