Processors support for StorageHDFS reading.

Nikolai Kochetov 2020-01-31 17:06:43 +03:00
parent 2d1f06a49f
commit 6870132713
2 changed files with 108 additions and 67 deletions


@@ -25,6 +25,8 @@
#include <re2/re2.h>
#include <re2/stringpiece.h>
#include <hdfs/hdfs.h>
#include <Processors/Sources/SourceWithProgress.h>
#include <Processors/Pipe.h>
namespace DB
{
@@ -63,24 +65,46 @@ StorageHDFS::StorageHDFS(const String & uri_,
namespace
{
class HDFSBlockInputStream : public IBlockInputStream
class HDFSSource : public SourceWithProgress
{
public:
HDFSBlockInputStream(const String & uri,
bool need_path,
bool need_file,
const String & format,
const Block & sample_block,
const Context & context,
UInt64 max_block_size,
const CompressionMethod compression_method)
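/// State shared by all HDFSSource instances created for one query: the list of
/// matched URIs and flags for the requested virtual columns.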
struct SourcesInfo
{
std::vector<String> uris;
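/// Atomic cursor: each source claims the next unread URI, so parallel sources
/// never process the same file twice.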
std::atomic<size_t> next_uri_to_read = 0;
bool need_path_column = false;
bool need_file_column = false;
};
using SourcesInfoPtr = std::shared_ptr<SourcesInfo>;
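/// Append the virtual _path and _file columns to the header when they were requested.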
static Block getHeader(Block header, bool need_path_column, bool need_file_column)
{
if (need_path_column)
header.insert({DataTypeString().createColumn(), std::make_shared<DataTypeString>(), "_path"});
if (need_file_column)
header.insert({DataTypeString().createColumn(), std::make_shared<DataTypeString>(), "_file"});
return header;
}
HDFSSource(
SourcesInfoPtr source_info_,
String uri_,
String format_,
Block sample_block_,
const Context & context_,
UInt64 max_block_size_)
: SourceWithProgress(getHeader(sample_block_, source_info_->need_path_column, source_info_->need_file_column))
, source_info(std::move(source_info_))
, uri(std::move(uri_))
, format(std::move(format_))
, max_block_size(max_block_size_)
, sample_block(std::move(sample_block_))
, context(context_)
{
auto read_buf = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromHDFS>(uri), compression_method);
file_path = uri;
with_file_column = need_file;
with_path_column = need_path;
auto input_stream = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size);
reader = std::make_shared<OwningBlockInputStream<ReadBuffer>>(input_stream, std::move(read_buf));
}
String getName() const override
@@ -88,53 +112,65 @@ public:
return "HDFS";
}
Block readImpl() override
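/// Pull-based generation: open the next matched file lazily, emit one Chunk per
/// block read from it, and return an empty Chunk once all URIs are exhausted.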
Chunk generate() override
{
auto res = reader->read();
if (res)
while (true)
{
if (with_path_column)
res.insert({DataTypeString().createColumnConst(res.rows(), file_path)->convertToFullColumnIfConst(), std::make_shared<DataTypeString>(),
"_path"}); /// construction with const is for probably generating less code
if (with_file_column)
if (!reader)
{
size_t last_slash_pos = file_path.find_last_of('/');
res.insert({DataTypeString().createColumnConst(res.rows(), file_path.substr(
last_slash_pos + 1))->convertToFullColumnIfConst(), std::make_shared<DataTypeString>(),
"_file"});
auto pos = source_info->next_uri_to_read.fetch_add(1);
if (pos >= source_info->uris.size())
return {};
auto path = source_info->uris[pos];
current_path = uri + path;
auto compression = chooseCompressionMethod(path, format);
auto read_buf = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromHDFS>(current_path), compression);
auto input_stream = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size);
reader = std::make_shared<OwningBlockInputStream<ReadBuffer>>(input_stream, std::move(read_buf));
reader->readPrefix();
}
if (auto res = reader->read())
{
Columns columns = res.getColumns();
UInt64 num_rows = res.rows();
/// Enrich with virtual columns.
if (source_info->need_path_column)
{
auto column = DataTypeString().createColumnConst(num_rows, current_path);
columns.push_back(column->convertToFullColumnIfConst());
}
if (source_info->need_file_column)
{
size_t last_slash_pos = current_path.find_last_of('/');
auto file_name = current_path.substr(last_slash_pos + 1);
auto column = DataTypeString().createColumnConst(num_rows, std::move(file_name));
columns.push_back(column->convertToFullColumnIfConst());
}
return Chunk(std::move(columns), num_rows);
}
/// The current file is exhausted: finalize and drop the reader so the next
/// loop iteration opens the following URI.
reader->readSuffix();
reader.reset();
}
return res;
}
Block getHeader() const override
{
auto res = reader->getHeader();
if (res)
{
if (with_path_column)
res.insert({DataTypeString().createColumn(), std::make_shared<DataTypeString>(), "_path"});
if (with_file_column)
res.insert({DataTypeString().createColumn(), std::make_shared<DataTypeString>(), "_file"});
}
return res;
}
void readPrefixImpl() override
{
reader->readPrefix();
}
void readSuffixImpl() override
{
reader->readSuffix();
}
private:
BlockInputStreamPtr reader;
String file_path;
bool with_path_column = false;
bool with_file_column = false;
SourcesInfoPtr source_info;
String uri;
String format;
String current_path;
UInt64 max_block_size;
Block sample_block;
const Context & context;
};
class HDFSBlockOutputStream : public IBlockOutputStream
@@ -228,7 +264,7 @@ Strings LSWithRegexpMatching(const String & path_for_ls, const HDFSFSPtr & fs, c
}
BlockInputStreams StorageHDFS::read(
Pipes StorageHDFS::readWithProcessors(
const Names & column_names,
const SelectQueryInfo & /*query_info*/,
const Context & context_,
@@ -243,24 +279,27 @@ BlockInputStreams StorageHDFS::read(
HDFSBuilderPtr builder = createHDFSBuilder(uri_without_path + "/");
HDFSFSPtr fs = createHDFSFS(builder.get());
const Strings res_paths = LSWithRegexpMatching("/", fs, path_from_uri);
BlockInputStreams result;
bool need_path_column = false;
bool need_file_column = false;
auto sources_info = std::make_shared<HDFSSource::SourcesInfo>();
sources_info->uris = LSWithRegexpMatching("/", fs, path_from_uri);
for (const auto & column : column_names)
{
if (column == "_path")
need_path_column = true;
sources_info->need_path_column = true;
if (column == "_file")
need_file_column = true;
}
for (const auto & res_path : res_paths)
{
result.push_back(std::make_shared<HDFSBlockInputStream>(uri_without_path + res_path, need_path_column, need_file_column, format_name, getSampleBlock(), context_,
max_block_size, chooseCompressionMethod(res_path, compression_method)));
sources_info->need_file_column = true;
}
return narrowBlockInputStreams(result, num_streams);
if (num_streams > sources_info->uris.size())
num_streams = sources_info->uris.size();
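/// Every pipe gets a source that shares sources_info, so the files are distributed
/// across the streams dynamically rather than being pre-assigned.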
Pipes pipes;
for (size_t i = 0; i < num_streams; ++i)
pipes.emplace_back(std::make_shared<HDFSSource>(
sources_info, uri_without_path, format_name, getSampleBlock(), context_, max_block_size));
return pipes;
}
BlockOutputStreamPtr StorageHDFS::write(const ASTPtr & /*query*/, const Context & /*context*/)


@@ -19,13 +19,15 @@ class StorageHDFS : public ext::shared_ptr_helper<StorageHDFS>, public IStorage
public:
String getName() const override { return "HDFS"; }
BlockInputStreams read(const Names & column_names,
Pipes readWithProcessors(const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams) override;
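/// Signals that this storage provides the processors-based read path (readWithProcessors).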
bool supportProcessorsPipeline() const override { return true; }
BlockOutputStreamPtr write(const ASTPtr & query, const Context & context) override;
protected: