2019-08-17 22:53:46 +00:00
|
|
|
#include "ParquetBlockOutputFormat.h"
|
2019-06-25 17:19:32 +00:00
|
|
|
|
2019-02-21 18:36:46 +00:00
|
|
|
#if USE_PARQUET
|
|
|
|
|
|
|
|
// TODO: clean includes
|
2019-08-21 02:28:04 +00:00
|
|
|
#include <Columns/ColumnString.h>
|
|
|
|
#include <Columns/ColumnVector.h>
|
|
|
|
#include <Common/assert_cast.h>
|
|
|
|
#include <Core/callOnTypeIndex.h>
|
|
|
|
#include <DataStreams/SquashingBlockOutputStream.h>
|
|
|
|
#include <Formats/FormatFactory.h>
|
|
|
|
#include <IO/WriteHelpers.h>
|
|
|
|
#include <arrow/api.h>
|
|
|
|
#include <arrow/util/decimal.h>
|
2019-12-22 07:15:51 +00:00
|
|
|
#include <arrow/util/memory.h>
|
2019-08-21 02:28:04 +00:00
|
|
|
#include <parquet/arrow/writer.h>
|
2019-12-22 07:15:51 +00:00
|
|
|
#include <parquet/deprecated_io.h>
|
2020-05-03 00:54:39 +00:00
|
|
|
#include "CHColumnToArrowColumn.h"
|
2019-08-21 02:28:04 +00:00
|
|
|
|
2019-02-21 18:36:46 +00:00
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
namespace ErrorCodes
|
|
|
|
{
|
|
|
|
extern const int UNKNOWN_EXCEPTION;
|
|
|
|
}
|
|
|
|
|
2019-08-03 11:02:40 +00:00
|
|
|
ParquetBlockOutputFormat::ParquetBlockOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_)
|
|
|
|
: IOutputFormat(header_, out_), format_settings{format_settings_}
|
2019-02-21 18:36:46 +00:00
|
|
|
{
|
|
|
|
}
|
|
|
|
|
2019-12-22 07:15:51 +00:00
|
|
|
class OstreamOutputStream : public arrow::io::OutputStream
|
2019-02-21 18:36:46 +00:00
|
|
|
{
|
|
|
|
public:
|
2019-12-23 06:51:35 +00:00
|
|
|
explicit OstreamOutputStream(WriteBuffer & ostr_) : ostr(ostr_) { is_open = true; }
|
2020-03-09 02:55:28 +00:00
|
|
|
~OstreamOutputStream() override = default;
|
2019-12-22 07:15:51 +00:00
|
|
|
|
|
|
|
// FileInterface
|
2019-12-23 06:51:35 +00:00
|
|
|
::arrow::Status Close() override
|
|
|
|
{
|
|
|
|
is_open = false;
|
2019-12-22 07:15:51 +00:00
|
|
|
return ::arrow::Status::OK();
|
|
|
|
}
|
2019-12-25 04:47:53 +00:00
|
|
|
|
2019-12-23 06:51:35 +00:00
|
|
|
::arrow::Status Tell(int64_t* position) const override
|
|
|
|
{
|
2019-12-22 07:15:51 +00:00
|
|
|
*position = total_length;
|
|
|
|
return ::arrow::Status::OK();
|
2019-12-25 04:47:53 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
bool closed() const override { return !is_open; }
|
2019-12-22 07:15:51 +00:00
|
|
|
|
|
|
|
// Writable
|
2019-12-23 06:51:35 +00:00
|
|
|
::arrow::Status Write(const void* data, int64_t length) override
|
2019-02-21 18:36:46 +00:00
|
|
|
{
|
|
|
|
ostr.write(reinterpret_cast<const char *>(data), length);
|
|
|
|
total_length += length;
|
2019-12-22 07:15:51 +00:00
|
|
|
return ::arrow::Status::OK();
|
2019-02-21 18:36:46 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
private:
|
|
|
|
WriteBuffer & ostr;
|
|
|
|
int64_t total_length = 0;
|
2019-12-23 06:51:35 +00:00
|
|
|
bool is_open = false;
|
2019-02-21 18:36:46 +00:00
|
|
|
|
|
|
|
PARQUET_DISALLOW_COPY_AND_ASSIGN(OstreamOutputStream);
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
/// Converts `chunk` into an Arrow table and writes it through the Parquet file writer.
///
/// The writer is created lazily on the first call, using the schema of the table
/// produced from that first chunk; later calls reuse the existing writer.
///
/// Throws Exception with code UNKNOWN_EXCEPTION when the writer cannot be opened
/// or when writing the table fails.
void ParquetBlockOutputFormat::consume(Chunk chunk)
{
    const Block & header = getPort(PortKind::Main).getHeader();
    const size_t columns_num = chunk.getNumColumns();

    std::shared_ptr<arrow::Table> arrow_table;
    CHColumnToArrowColumn::CHChunkToArrowTable(arrow_table, header, chunk, columns_num, "Parquet");

    if (!file_writer)
    {
        /// The sink is only ever handed to FileWriter::Open, so it is built here
        /// instead of being allocated (and immediately discarded) for every chunk.
        auto sink = std::make_shared<OstreamOutputStream>(out);

        parquet::WriterProperties::Builder builder;
#if USE_SNAPPY
        builder.compression(parquet::Compression::SNAPPY);
#endif
        auto props = builder.build();

        auto open_status = parquet::arrow::FileWriter::Open(
            *arrow_table->schema(),
            arrow::default_memory_pool(),
            sink,
            props,
            &file_writer);

        if (!open_status.ok())
            throw Exception{"Error while opening a table: " + open_status.ToString(), ErrorCodes::UNKNOWN_EXCEPTION};
    }

    // TODO: calculate row_group_size depending on a number of rows and table size
    auto write_status = file_writer->WriteTable(*arrow_table, format_settings.parquet.row_group_size);

    if (!write_status.ok())
        throw Exception{"Error while writing a table: " + write_status.ToString(), ErrorCodes::UNKNOWN_EXCEPTION};
}
|
|
|
|
|
|
|
|
void ParquetBlockOutputFormat::finalize()
|
|
|
|
{
|
|
|
|
if (file_writer)
|
|
|
|
{
|
|
|
|
auto status = file_writer->Close();
|
|
|
|
if (!status.ok())
|
|
|
|
throw Exception{"Error while closing a table: " + status.ToString(), ErrorCodes::UNKNOWN_EXCEPTION};
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/// Registers the "Parquet" output format in `factory`.
/// The registered creator builds a ParquetBlockOutputFormat from the write buffer,
/// the sample block and the format settings; the write callback argument is unused.
void registerOutputFormatProcessorParquet(FormatFactory & factory)
{
    factory.registerOutputFormatProcessor(
        "Parquet",
        [](WriteBuffer & buf,
           const Block & sample,
           FormatFactory::WriteCallback,
           const FormatSettings & format_settings)
        {
            /// TODO: consider squashing blocks up to format_settings.parquet.row_group_size
            /// (previously sketched with SquashingBlockOutputStream and disableFlush()).
            return std::make_shared<ParquetBlockOutputFormat>(buf, sample, format_settings);
        });
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
#else
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
class FormatFactory;
|
|
|
|
/// Stub used when USE_PARQUET is not enabled: registers nothing.
void registerOutputFormatProcessorParquet(FormatFactory &) {}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
#endif
|