Add a template for ParquetBlockInputStream

This commit is contained in:
Ivan Zhukov 2018-05-06 22:33:30 +03:00
parent d30b98073e
commit fabd38fe6d
3 changed files with 69 additions and 0 deletions

View File

@ -29,6 +29,7 @@
#include <DataStreams/MaterializingBlockOutputStream.h>
#include <DataStreams/FormatFactory.h>
#include <DataStreams/SquashingBlockOutputStream.h>
#include <DataStreams/ParquetBlockInputStream.h>
#include <DataTypes/FormatSettingsJSON.h>
#if USE_CAPNP
#include <DataStreams/CapnProtoRowInputStream.h>
@ -88,6 +89,10 @@ BlockInputStreamPtr FormatFactory::getInput(const String & name, ReadBuffer & bu
return wrap_row_stream(std::make_shared<CSVRowInputStream>(buf, sample, csv_delimiter, with_names));
}
else if (name == "Parquet")
{
return std::make_shared<ParquetBlockInputStream>(buf, sample);
}
else if (name == "TSKV")
{
return wrap_row_stream(std::make_shared<TSKVRowInputStream>(buf, sample, settings.input_format_skip_unknown_fields));

View File

@ -0,0 +1,38 @@
#include <DataStreams/ParquetBlockInputStream.h>
#include <IO/copyData.h>
#include <IO/WriteBufferFromString.h>
namespace DB
{
/// Binds the stream to the input buffer `istr_` (kept by reference) and
/// stores a copy of `header_` as the block structure reported by getHeader().
ParquetBlockInputStream::ParquetBlockInputStream(ReadBuffer & istr_, const Block & header_)
    : istr(istr_), header(header_)
{
    /// Nothing else to set up: all state lives in the two members above.
}
/// Returns a copy of the header block that was supplied at construction time.
Block ParquetBlockInputStream::getHeader() const
{
    return this->header;
}
/// Stub implementation of the read step.
///
/// If the input buffer still has data, everything remaining in it is copied
/// into an in-memory string; the copied bytes are not decoded yet. In every
/// case the function returns a default-constructed (empty) Block.
Block ParquetBlockInputStream::readImpl()
{
    Block result;

    if (!istr.eof())
    {
        // TODO: maybe use parquet::RandomAccessSource?
        std::string raw_contents;
        {
            /// The inner scope destroys the write buffer right after the copy
            /// (presumably so that raw_contents is finalized on destruction — confirm
            /// against WriteBufferFromString).
            WriteBufferFromString sink(raw_contents);
            copyData(istr, sink);
        }
    }

    return result;
}
}

View File

@ -0,0 +1,26 @@
#pragma once
#include <DataStreams/IProfilingBlockInputStream.h>
/* #include <DataStreams/MarkInCompressedFile.h> */
/* #include <Common/PODArray.h> */
namespace DB
{
/// Block input stream for the "Parquet" format.
///
/// Holds a reference to the source ReadBuffer (the buffer must therefore
/// outlive this object) and its own copy of the header block.
class ParquetBlockInputStream : public IProfilingBlockInputStream
{
public:
    /// `istr_` is stored by reference; `header_` is copied.
    ParquetBlockInputStream(ReadBuffer & istr_, const Block & header_);

    /// Name of this stream.
    String getName() const override
    {
        return "Parquet";
    }

    /// Header block passed to the constructor.
    Block getHeader() const override;

protected:
    /// Read step of the stream; defined out of line.
    Block readImpl() override;

private:
    /// Source of the raw input bytes (not owned).
    ReadBuffer & istr;

    /// Copy of the header block supplied at construction.
    Block header;
};
}