mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-25 09:02:00 +00:00
Merge branch 'master' into even-more-warnings
This commit is contained in:
commit
ce59ad8e04
2
.gitmodules
vendored
2
.gitmodules
vendored
@ -156,4 +156,4 @@
|
|||||||
url = https://github.com/msgpack/msgpack-c
|
url = https://github.com/msgpack/msgpack-c
|
||||||
[submodule "contrib/libcpuid"]
|
[submodule "contrib/libcpuid"]
|
||||||
path = contrib/libcpuid
|
path = contrib/libcpuid
|
||||||
url = https://github.com/anrieff/libcpuid.git
|
url = https://github.com/ClickHouse-Extras/libcpuid.git
|
||||||
|
@ -70,6 +70,7 @@ elseif(NOT MISSING_INTERNAL_PARQUET_LIBRARY AND NOT OS_FREEBSD)
|
|||||||
|
|
||||||
set(USE_PARQUET 1)
|
set(USE_PARQUET 1)
|
||||||
set(USE_ORC 1)
|
set(USE_ORC 1)
|
||||||
|
set(USE_ARROW 1)
|
||||||
endif()
|
endif()
|
||||||
endif()
|
endif()
|
||||||
|
|
||||||
|
@ -1,8 +1,8 @@
|
|||||||
# AggregateFunction(name, types\_of\_arguments…) {#data-type-aggregatefunction}
|
# AggregateFunction {#data-type-aggregatefunction}
|
||||||
|
|
||||||
Промежуточное состояние агрегатной функции. Чтобы его получить, используются агрегатные функции с суффиксом `-State`. Чтобы в дальнейшем получить агрегированные данные необходимо использовать те же агрегатные функции с суффиксом `-Merge`.
|
Промежуточное состояние агрегатной функции. Чтобы его получить, используются агрегатные функции с суффиксом `-State`. Чтобы в дальнейшем получить агрегированные данные необходимо использовать те же агрегатные функции с суффиксом `-Merge`.
|
||||||
|
|
||||||
`AggregateFunction` — параметрический тип данных.
|
`AggregateFunction(name, types\_of\_arguments…)` — параметрический тип данных.
|
||||||
|
|
||||||
**Параметры**
|
**Параметры**
|
||||||
|
|
||||||
|
@ -128,7 +128,7 @@ struct Settings : public SettingsCollection<Settings>
|
|||||||
M(SettingUInt64, merge_tree_min_bytes_for_seek, 0, "You can skip reading more than that number of bytes at the price of one seek per file.", 0) \
|
M(SettingUInt64, merge_tree_min_bytes_for_seek, 0, "You can skip reading more than that number of bytes at the price of one seek per file.", 0) \
|
||||||
M(SettingUInt64, merge_tree_coarse_index_granularity, 8, "If the index segment can contain the required keys, divide it into as many parts and recursively check them.", 0) \
|
M(SettingUInt64, merge_tree_coarse_index_granularity, 8, "If the index segment can contain the required keys, divide it into as many parts and recursively check them.", 0) \
|
||||||
M(SettingUInt64, merge_tree_max_rows_to_use_cache, (128 * 8192), "The maximum number of rows per request, to use the cache of uncompressed data. If the request is large, the cache is not used. (For large queries not to flush out the cache.)", 0) \
|
M(SettingUInt64, merge_tree_max_rows_to_use_cache, (128 * 8192), "The maximum number of rows per request, to use the cache of uncompressed data. If the request is large, the cache is not used. (For large queries not to flush out the cache.)", 0) \
|
||||||
M(SettingUInt64, merge_tree_max_bytes_to_use_cache, (192 * 10 * 1024 * 1024), "The maximum number of rows per request, to use the cache of uncompressed data. If the request is large, the cache is not used. (For large queries not to flush out the cache.)", 0) \
|
M(SettingUInt64, merge_tree_max_bytes_to_use_cache, (192 * 10 * 1024 * 1024), "The maximum number of bytes per request, to use the cache of uncompressed data. If the request is large, the cache is not used. (For large queries not to flush out the cache.)", 0) \
|
||||||
\
|
\
|
||||||
M(SettingUInt64, mysql_max_rows_to_insert, 65536, "The maximum number of rows in MySQL batch insertion of the MySQL storage engine", 0) \
|
M(SettingUInt64, mysql_max_rows_to_insert, 65536, "The maximum number of rows in MySQL batch insertion of the MySQL storage engine", 0) \
|
||||||
\
|
\
|
||||||
|
@ -361,6 +361,8 @@ FormatFactory::FormatFactory()
|
|||||||
registerInputFormatProcessorORC(*this);
|
registerInputFormatProcessorORC(*this);
|
||||||
registerInputFormatProcessorParquet(*this);
|
registerInputFormatProcessorParquet(*this);
|
||||||
registerOutputFormatProcessorParquet(*this);
|
registerOutputFormatProcessorParquet(*this);
|
||||||
|
registerInputFormatProcessorArrow(*this);
|
||||||
|
registerOutputFormatProcessorArrow(*this);
|
||||||
registerInputFormatProcessorAvro(*this);
|
registerInputFormatProcessorAvro(*this);
|
||||||
registerOutputFormatProcessorAvro(*this);
|
registerOutputFormatProcessorAvro(*this);
|
||||||
#endif
|
#endif
|
||||||
|
@ -164,8 +164,9 @@ void registerOutputFormatProcessorJSONEachRow(FormatFactory & factory);
|
|||||||
void registerInputFormatProcessorJSONCompactEachRow(FormatFactory & factory);
|
void registerInputFormatProcessorJSONCompactEachRow(FormatFactory & factory);
|
||||||
void registerOutputFormatProcessorJSONCompactEachRow(FormatFactory & factory);
|
void registerOutputFormatProcessorJSONCompactEachRow(FormatFactory & factory);
|
||||||
void registerInputFormatProcessorParquet(FormatFactory & factory);
|
void registerInputFormatProcessorParquet(FormatFactory & factory);
|
||||||
void registerInputFormatProcessorORC(FormatFactory & factory);
|
|
||||||
void registerOutputFormatProcessorParquet(FormatFactory & factory);
|
void registerOutputFormatProcessorParquet(FormatFactory & factory);
|
||||||
|
void registerInputFormatProcessorArrow(FormatFactory & factory);
|
||||||
|
void registerOutputFormatProcessorArrow(FormatFactory & factory);
|
||||||
void registerInputFormatProcessorProtobuf(FormatFactory & factory);
|
void registerInputFormatProcessorProtobuf(FormatFactory & factory);
|
||||||
void registerOutputFormatProcessorProtobuf(FormatFactory & factory);
|
void registerOutputFormatProcessorProtobuf(FormatFactory & factory);
|
||||||
void registerInputFormatProcessorAvro(FormatFactory & factory);
|
void registerInputFormatProcessorAvro(FormatFactory & factory);
|
||||||
@ -205,5 +206,6 @@ void registerOutputFormatProcessorMarkdown(FormatFactory & factory);
|
|||||||
void registerInputFormatProcessorCapnProto(FormatFactory & factory);
|
void registerInputFormatProcessorCapnProto(FormatFactory & factory);
|
||||||
void registerInputFormatProcessorRegexp(FormatFactory & factory);
|
void registerInputFormatProcessorRegexp(FormatFactory & factory);
|
||||||
void registerInputFormatProcessorJSONAsString(FormatFactory & factory);
|
void registerInputFormatProcessorJSONAsString(FormatFactory & factory);
|
||||||
|
void registerInputFormatProcessorORC(FormatFactory & factory);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -90,6 +90,11 @@ struct FormatSettings
|
|||||||
UInt64 input_allow_errors_num = 0;
|
UInt64 input_allow_errors_num = 0;
|
||||||
Float32 input_allow_errors_ratio = 0;
|
Float32 input_allow_errors_ratio = 0;
|
||||||
|
|
||||||
|
struct Arrow
|
||||||
|
{
|
||||||
|
UInt64 row_group_size = 1000000;
|
||||||
|
} arrow;
|
||||||
|
|
||||||
struct Parquet
|
struct Parquet
|
||||||
{
|
{
|
||||||
UInt64 row_group_size = 1000000;
|
UInt64 row_group_size = 1000000;
|
||||||
|
@ -7,4 +7,5 @@
|
|||||||
#cmakedefine01 USE_SNAPPY
|
#cmakedefine01 USE_SNAPPY
|
||||||
#cmakedefine01 USE_PARQUET
|
#cmakedefine01 USE_PARQUET
|
||||||
#cmakedefine01 USE_ORC
|
#cmakedefine01 USE_ORC
|
||||||
|
#cmakedefine01 USE_ARROW
|
||||||
#cmakedefine01 USE_PROTOBUF
|
#cmakedefine01 USE_PROTOBUF
|
||||||
|
97
src/Processors/Formats/Impl/ArrowBlockInputFormat.cpp
Normal file
97
src/Processors/Formats/Impl/ArrowBlockInputFormat.cpp
Normal file
@ -0,0 +1,97 @@
|
|||||||
|
#include "ArrowBlockInputFormat.h"
|
||||||
|
#if USE_ARROW
|
||||||
|
|
||||||
|
#include <Formats/FormatFactory.h>
|
||||||
|
#include <IO/ReadBufferFromMemory.h>
|
||||||
|
#include <IO/WriteHelpers.h>
|
||||||
|
#include <IO/copyData.h>
|
||||||
|
#include <arrow/api.h>
|
||||||
|
#include <arrow/ipc/reader.h>
|
||||||
|
#include <arrow/status.h>
|
||||||
|
#include "ArrowBufferedStreams.h"
|
||||||
|
#include "ArrowColumnToCHColumn.h"
|
||||||
|
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
|
||||||
|
namespace ErrorCodes
|
||||||
|
{
|
||||||
|
extern const int BAD_ARGUMENTS;
|
||||||
|
extern const int CANNOT_READ_ALL_DATA;
|
||||||
|
}
|
||||||
|
|
||||||
|
ArrowBlockInputFormat::ArrowBlockInputFormat(ReadBuffer & in_, const Block & header_)
|
||||||
|
: IInputFormat(header_, in_)
|
||||||
|
{
|
||||||
|
prepareReader();
|
||||||
|
}
|
||||||
|
|
||||||
|
Chunk ArrowBlockInputFormat::generate()
|
||||||
|
{
|
||||||
|
Chunk res;
|
||||||
|
const Block & header = getPort().getHeader();
|
||||||
|
|
||||||
|
if (record_batch_current >= record_batch_total)
|
||||||
|
return res;
|
||||||
|
|
||||||
|
std::vector<std::shared_ptr<arrow::RecordBatch>> single_batch(1);
|
||||||
|
arrow::Status read_status = file_reader->ReadRecordBatch(record_batch_current, &single_batch[0]);
|
||||||
|
if (!read_status.ok())
|
||||||
|
throw Exception{"Error while reading batch of Arrow data: " + read_status.ToString(),
|
||||||
|
ErrorCodes::CANNOT_READ_ALL_DATA};
|
||||||
|
|
||||||
|
std::shared_ptr<arrow::Table> table;
|
||||||
|
arrow::Status make_status = arrow::Table::FromRecordBatches(single_batch, &table);
|
||||||
|
if (!make_status.ok())
|
||||||
|
throw Exception{"Error while reading table of Arrow data: " + read_status.ToString(),
|
||||||
|
ErrorCodes::CANNOT_READ_ALL_DATA};
|
||||||
|
|
||||||
|
++record_batch_current;
|
||||||
|
|
||||||
|
ArrowColumnToCHColumn::arrowTableToCHChunk(res, table, header, "Arrow");
|
||||||
|
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
|
void ArrowBlockInputFormat::resetParser()
|
||||||
|
{
|
||||||
|
IInputFormat::resetParser();
|
||||||
|
|
||||||
|
file_reader.reset();
|
||||||
|
prepareReader();
|
||||||
|
}
|
||||||
|
|
||||||
|
void ArrowBlockInputFormat::prepareReader()
|
||||||
|
{
|
||||||
|
arrow::Status open_status = arrow::ipc::RecordBatchFileReader::Open(asArrowFile(in), &file_reader);
|
||||||
|
if (!open_status.ok())
|
||||||
|
throw Exception(open_status.ToString(), ErrorCodes::BAD_ARGUMENTS);
|
||||||
|
record_batch_total = file_reader->num_record_batches();
|
||||||
|
record_batch_current = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void registerInputFormatProcessorArrow(FormatFactory &factory)
|
||||||
|
{
|
||||||
|
factory.registerInputFormatProcessor(
|
||||||
|
"Arrow",
|
||||||
|
[](ReadBuffer & buf,
|
||||||
|
const Block & sample,
|
||||||
|
const RowInputFormatParams & /* params */,
|
||||||
|
const FormatSettings & /* format_settings */)
|
||||||
|
{
|
||||||
|
return std::make_shared<ArrowBlockInputFormat>(buf, sample);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
#else
|
||||||
|
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
class FormatFactory;
|
||||||
|
void registerInputFormatProcessorArrow(FormatFactory &)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#endif
|
37
src/Processors/Formats/Impl/ArrowBlockInputFormat.h
Normal file
37
src/Processors/Formats/Impl/ArrowBlockInputFormat.h
Normal file
@ -0,0 +1,37 @@
|
|||||||
|
#pragma once
|
||||||
|
#include "config_formats.h"
|
||||||
|
#if USE_ARROW
|
||||||
|
|
||||||
|
#include <Processors/Formats/IInputFormat.h>
|
||||||
|
|
||||||
|
namespace arrow::ipc { class RecordBatchFileReader; }
|
||||||
|
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
|
||||||
|
class ReadBuffer;
|
||||||
|
|
||||||
|
class ArrowBlockInputFormat : public IInputFormat
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
ArrowBlockInputFormat(ReadBuffer & in_, const Block & header_);
|
||||||
|
|
||||||
|
void resetParser() override;
|
||||||
|
|
||||||
|
String getName() const override { return "ArrowBlockInputFormat"; }
|
||||||
|
|
||||||
|
protected:
|
||||||
|
Chunk generate() override;
|
||||||
|
|
||||||
|
private:
|
||||||
|
void prepareReader();
|
||||||
|
|
||||||
|
private:
|
||||||
|
std::shared_ptr<arrow::ipc::RecordBatchFileReader> file_reader;
|
||||||
|
int record_batch_total = 0;
|
||||||
|
int record_batch_current = 0;
|
||||||
|
};
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
#endif
|
81
src/Processors/Formats/Impl/ArrowBlockOutputFormat.cpp
Normal file
81
src/Processors/Formats/Impl/ArrowBlockOutputFormat.cpp
Normal file
@ -0,0 +1,81 @@
|
|||||||
|
#include "ArrowBlockOutputFormat.h"
|
||||||
|
|
||||||
|
#if USE_ARROW
|
||||||
|
|
||||||
|
#include <Formats/FormatFactory.h>
|
||||||
|
#include <arrow/ipc/writer.h>
|
||||||
|
#include <arrow/table.h>
|
||||||
|
#include "ArrowBufferedStreams.h"
|
||||||
|
#include "CHColumnToArrowColumn.h"
|
||||||
|
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
namespace ErrorCodes
|
||||||
|
{
|
||||||
|
extern const int UNKNOWN_EXCEPTION;
|
||||||
|
}
|
||||||
|
|
||||||
|
ArrowBlockOutputFormat::ArrowBlockOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_)
|
||||||
|
: IOutputFormat(header_, out_), format_settings{format_settings_}, arrow_ostream{std::make_shared<ArrowBufferedOutputStream>(out_)}
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
void ArrowBlockOutputFormat::consume(Chunk chunk)
|
||||||
|
{
|
||||||
|
const Block & header = getPort(PortKind::Main).getHeader();
|
||||||
|
const size_t columns_num = chunk.getNumColumns();
|
||||||
|
std::shared_ptr<arrow::Table> arrow_table;
|
||||||
|
|
||||||
|
CHColumnToArrowColumn::chChunkToArrowTable(arrow_table, header, chunk, columns_num, "Arrow");
|
||||||
|
|
||||||
|
if (!writer)
|
||||||
|
{
|
||||||
|
// TODO: should we use arrow::ipc::IpcOptions::alignment?
|
||||||
|
auto status = arrow::ipc::RecordBatchFileWriter::Open(arrow_ostream.get(), arrow_table->schema(), &writer);
|
||||||
|
if (!status.ok())
|
||||||
|
throw Exception{"Error while opening a table: " + status.ToString(), ErrorCodes::UNKNOWN_EXCEPTION};
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: calculate row_group_size depending on a number of rows and table size
|
||||||
|
auto status = writer->WriteTable(*arrow_table, format_settings.arrow.row_group_size);
|
||||||
|
|
||||||
|
if (!status.ok())
|
||||||
|
throw Exception{"Error while writing a table: " + status.ToString(), ErrorCodes::UNKNOWN_EXCEPTION};
|
||||||
|
}
|
||||||
|
|
||||||
|
void ArrowBlockOutputFormat::finalize()
|
||||||
|
{
|
||||||
|
if (writer)
|
||||||
|
{
|
||||||
|
auto status = writer->Close();
|
||||||
|
if (!status.ok())
|
||||||
|
throw Exception{"Error while closing a table: " + status.ToString(), ErrorCodes::UNKNOWN_EXCEPTION};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void registerOutputFormatProcessorArrow(FormatFactory & factory)
|
||||||
|
{
|
||||||
|
factory.registerOutputFormatProcessor(
|
||||||
|
"Arrow",
|
||||||
|
[](WriteBuffer & buf,
|
||||||
|
const Block & sample,
|
||||||
|
FormatFactory::WriteCallback,
|
||||||
|
const FormatSettings & format_settings)
|
||||||
|
{
|
||||||
|
return std::make_shared<ArrowBlockOutputFormat>(buf, sample, format_settings);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
#else
|
||||||
|
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
class FormatFactory;
|
||||||
|
void registerOutputFormatProcessorArrow(FormatFactory &)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#endif
|
33
src/Processors/Formats/Impl/ArrowBlockOutputFormat.h
Normal file
33
src/Processors/Formats/Impl/ArrowBlockOutputFormat.h
Normal file
@ -0,0 +1,33 @@
|
|||||||
|
#pragma once
|
||||||
|
#include "config_formats.h"
|
||||||
|
#if USE_ARROW
|
||||||
|
|
||||||
|
#include <Formats/FormatSettings.h>
|
||||||
|
#include <Processors/Formats/IOutputFormat.h>
|
||||||
|
#include "ArrowBufferedStreams.h"
|
||||||
|
|
||||||
|
namespace arrow::ipc { class RecordBatchWriter; }
|
||||||
|
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
|
||||||
|
class ArrowBlockOutputFormat : public IOutputFormat
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
ArrowBlockOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_);
|
||||||
|
|
||||||
|
String getName() const override { return "ArrowBlockOutputFormat"; }
|
||||||
|
void consume(Chunk) override;
|
||||||
|
void finalize() override;
|
||||||
|
|
||||||
|
String getContentType() const override { return "application/octet-stream"; }
|
||||||
|
|
||||||
|
private:
|
||||||
|
const FormatSettings format_settings;
|
||||||
|
std::shared_ptr<ArrowBufferedOutputStream> arrow_ostream;
|
||||||
|
std::shared_ptr<arrow::ipc::RecordBatchWriter> writer;
|
||||||
|
};
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
#endif
|
107
src/Processors/Formats/Impl/ArrowBufferedStreams.cpp
Normal file
107
src/Processors/Formats/Impl/ArrowBufferedStreams.cpp
Normal file
@ -0,0 +1,107 @@
|
|||||||
|
#include "ArrowBufferedStreams.h"
|
||||||
|
|
||||||
|
#if USE_ARROW || USE_ORC || USE_PARQUET
|
||||||
|
|
||||||
|
#include <IO/ReadBufferFromFileDescriptor.h>
|
||||||
|
#include <IO/WriteBufferFromString.h>
|
||||||
|
#include <IO/copyData.h>
|
||||||
|
#include <arrow/buffer.h>
|
||||||
|
#include <arrow/io/api.h>
|
||||||
|
#include <arrow/status.h>
|
||||||
|
|
||||||
|
#include <sys/stat.h>
|
||||||
|
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
|
||||||
|
ArrowBufferedOutputStream::ArrowBufferedOutputStream(WriteBuffer & out_) : out{out_}, is_open{true}
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
arrow::Status ArrowBufferedOutputStream::Close()
|
||||||
|
{
|
||||||
|
is_open = false;
|
||||||
|
return arrow::Status::OK();
|
||||||
|
}
|
||||||
|
|
||||||
|
arrow::Status ArrowBufferedOutputStream::Tell(int64_t * position) const
|
||||||
|
{
|
||||||
|
*position = total_length;
|
||||||
|
return arrow::Status::OK();
|
||||||
|
}
|
||||||
|
|
||||||
|
arrow::Status ArrowBufferedOutputStream::Write(const void * data, int64_t length)
|
||||||
|
{
|
||||||
|
out.write(reinterpret_cast<const char *>(data), length);
|
||||||
|
total_length += length;
|
||||||
|
return arrow::Status::OK();
|
||||||
|
}
|
||||||
|
|
||||||
|
RandomAccessFileFromSeekableReadBuffer::RandomAccessFileFromSeekableReadBuffer(SeekableReadBuffer & in_, off_t file_size_)
|
||||||
|
: in{in_}, file_size{file_size_}, is_open{true}
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
arrow::Status RandomAccessFileFromSeekableReadBuffer::GetSize(int64_t * size)
|
||||||
|
{
|
||||||
|
*size = file_size;
|
||||||
|
return arrow::Status::OK();
|
||||||
|
}
|
||||||
|
|
||||||
|
arrow::Status RandomAccessFileFromSeekableReadBuffer::Close()
|
||||||
|
{
|
||||||
|
is_open = false;
|
||||||
|
return arrow::Status::OK();
|
||||||
|
}
|
||||||
|
|
||||||
|
arrow::Status RandomAccessFileFromSeekableReadBuffer::Tell(int64_t * position) const
|
||||||
|
{
|
||||||
|
*position = in.getPosition();
|
||||||
|
return arrow::Status::OK();
|
||||||
|
}
|
||||||
|
|
||||||
|
arrow::Status RandomAccessFileFromSeekableReadBuffer::Read(int64_t nbytes, int64_t * bytes_read, void * out)
|
||||||
|
{
|
||||||
|
*bytes_read = in.readBig(reinterpret_cast<char *>(out), nbytes);
|
||||||
|
return arrow::Status::OK();
|
||||||
|
}
|
||||||
|
|
||||||
|
arrow::Status RandomAccessFileFromSeekableReadBuffer::Read(int64_t nbytes, std::shared_ptr<arrow::Buffer> * out)
|
||||||
|
{
|
||||||
|
std::shared_ptr<arrow::Buffer> buf;
|
||||||
|
ARROW_RETURN_NOT_OK(arrow::AllocateBuffer(nbytes, &buf));
|
||||||
|
size_t n = in.readBig(reinterpret_cast<char *>(buf->mutable_data()), nbytes);
|
||||||
|
*out = arrow::SliceBuffer(buf, 0, n);
|
||||||
|
return arrow::Status::OK();
|
||||||
|
}
|
||||||
|
|
||||||
|
arrow::Status RandomAccessFileFromSeekableReadBuffer::Seek(int64_t position)
|
||||||
|
{
|
||||||
|
in.seek(position, SEEK_SET);
|
||||||
|
return arrow::Status::OK();
|
||||||
|
}
|
||||||
|
|
||||||
|
std::shared_ptr<arrow::io::RandomAccessFile> asArrowFile(ReadBuffer & in)
|
||||||
|
{
|
||||||
|
if (auto * fd_in = dynamic_cast<ReadBufferFromFileDescriptor *>(&in))
|
||||||
|
{
|
||||||
|
struct stat stat;
|
||||||
|
auto res = ::fstat(fd_in->getFD(), &stat);
|
||||||
|
// if fd is a regular file i.e. not stdin
|
||||||
|
if (res == 0 && S_ISREG(stat.st_mode))
|
||||||
|
return std::make_shared<RandomAccessFileFromSeekableReadBuffer>(*fd_in, stat.st_size);
|
||||||
|
}
|
||||||
|
|
||||||
|
// fallback to loading the entire file in memory
|
||||||
|
std::string file_data;
|
||||||
|
{
|
||||||
|
WriteBufferFromString file_buffer(file_data);
|
||||||
|
copyData(in, file_buffer);
|
||||||
|
}
|
||||||
|
|
||||||
|
return std::make_shared<arrow::io::BufferReader>(arrow::Buffer::FromString(std::move(file_data)));
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
#endif
|
68
src/Processors/Formats/Impl/ArrowBufferedStreams.h
Normal file
68
src/Processors/Formats/Impl/ArrowBufferedStreams.h
Normal file
@ -0,0 +1,68 @@
|
|||||||
|
#pragma once
|
||||||
|
#include "config_formats.h"
|
||||||
|
#if USE_ARROW || USE_ORC || USE_PARQUET
|
||||||
|
|
||||||
|
#include <arrow/io/interfaces.h>
|
||||||
|
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
|
||||||
|
class ReadBuffer;
|
||||||
|
class SeekableReadBuffer;
|
||||||
|
class WriteBuffer;
|
||||||
|
|
||||||
|
class ArrowBufferedOutputStream : public arrow::io::OutputStream
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
explicit ArrowBufferedOutputStream(WriteBuffer & out_);
|
||||||
|
|
||||||
|
// FileInterface
|
||||||
|
arrow::Status Close() override;
|
||||||
|
|
||||||
|
arrow::Status Tell(int64_t * position) const override;
|
||||||
|
|
||||||
|
bool closed() const override { return !is_open; }
|
||||||
|
|
||||||
|
// Writable
|
||||||
|
arrow::Status Write(const void * data, int64_t length) override;
|
||||||
|
|
||||||
|
private:
|
||||||
|
WriteBuffer & out;
|
||||||
|
int64_t total_length = 0;
|
||||||
|
bool is_open = false;
|
||||||
|
|
||||||
|
ARROW_DISALLOW_COPY_AND_ASSIGN(ArrowBufferedOutputStream);
|
||||||
|
};
|
||||||
|
|
||||||
|
class RandomAccessFileFromSeekableReadBuffer : public arrow::io::RandomAccessFile
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
RandomAccessFileFromSeekableReadBuffer(SeekableReadBuffer & in_, off_t file_size_);
|
||||||
|
|
||||||
|
arrow::Status GetSize(int64_t * size) override;
|
||||||
|
|
||||||
|
arrow::Status Close() override;
|
||||||
|
|
||||||
|
arrow::Status Tell(int64_t * position) const override;
|
||||||
|
|
||||||
|
bool closed() const override { return !is_open; }
|
||||||
|
|
||||||
|
arrow::Status Read(int64_t nbytes, int64_t * bytes_read, void * out) override;
|
||||||
|
|
||||||
|
arrow::Status Read(int64_t nbytes, std::shared_ptr<arrow::Buffer> * out) override;
|
||||||
|
|
||||||
|
arrow::Status Seek(int64_t position) override;
|
||||||
|
|
||||||
|
private:
|
||||||
|
SeekableReadBuffer & in;
|
||||||
|
off_t file_size;
|
||||||
|
bool is_open = false;
|
||||||
|
|
||||||
|
ARROW_DISALLOW_COPY_AND_ASSIGN(RandomAccessFileFromSeekableReadBuffer);
|
||||||
|
};
|
||||||
|
|
||||||
|
std::shared_ptr<arrow::io::RandomAccessFile> asArrowFile(ReadBuffer & in);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
#endif
|
@ -1,7 +1,7 @@
|
|||||||
#include "config_formats.h"
|
#include "config_formats.h"
|
||||||
#include "ArrowColumnToCHColumn.h"
|
#include "ArrowColumnToCHColumn.h"
|
||||||
|
|
||||||
#if USE_ORC || USE_PARQUET
|
#if USE_ARROW || USE_ORC || USE_PARQUET
|
||||||
#include <DataTypes/DataTypeFactory.h>
|
#include <DataTypes/DataTypeFactory.h>
|
||||||
#include <DataTypes/DataTypeNullable.h>
|
#include <DataTypes/DataTypeNullable.h>
|
||||||
#include <DataTypes/DataTypeString.h>
|
#include <DataTypes/DataTypeString.h>
|
||||||
@ -22,7 +22,6 @@ namespace DB
|
|||||||
{
|
{
|
||||||
extern const int UNKNOWN_TYPE;
|
extern const int UNKNOWN_TYPE;
|
||||||
extern const int VALUE_IS_OUT_OF_RANGE_OF_DATA_TYPE;
|
extern const int VALUE_IS_OUT_OF_RANGE_OF_DATA_TYPE;
|
||||||
extern const int CANNOT_READ_ALL_DATA;
|
|
||||||
extern const int CANNOT_CONVERT_TYPE;
|
extern const int CANNOT_CONVERT_TYPE;
|
||||||
extern const int CANNOT_INSERT_NULL_IN_ORDINARY_COLUMN;
|
extern const int CANNOT_INSERT_NULL_IN_ORDINARY_COLUMN;
|
||||||
extern const int THERE_IS_NO_COLUMN;
|
extern const int THERE_IS_NO_COLUMN;
|
||||||
@ -244,9 +243,8 @@ namespace DB
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void ArrowColumnToCHColumn::arrowTableToCHChunk(Chunk &res, std::shared_ptr<arrow::Table> &table,
|
void ArrowColumnToCHColumn::arrowTableToCHChunk(Chunk & res, std::shared_ptr<arrow::Table> & table,
|
||||||
arrow::Status &read_status, const Block &header,
|
const Block & header, std::string format_name)
|
||||||
int &row_group_current, std::string format_name)
|
|
||||||
{
|
{
|
||||||
Columns columns_list;
|
Columns columns_list;
|
||||||
UInt64 num_rows = 0;
|
UInt64 num_rows = 0;
|
||||||
@ -254,11 +252,6 @@ namespace DB
|
|||||||
columns_list.reserve(header.rows());
|
columns_list.reserve(header.rows());
|
||||||
|
|
||||||
using NameToColumnPtr = std::unordered_map<std::string, std::shared_ptr<arrow::ChunkedArray>>;
|
using NameToColumnPtr = std::unordered_map<std::string, std::shared_ptr<arrow::ChunkedArray>>;
|
||||||
if (!read_status.ok())
|
|
||||||
throw Exception{"Error while reading " + format_name + " data: " + read_status.ToString(),
|
|
||||||
ErrorCodes::CANNOT_READ_ALL_DATA};
|
|
||||||
|
|
||||||
++row_group_current;
|
|
||||||
|
|
||||||
NameToColumnPtr name_to_column_ptr;
|
NameToColumnPtr name_to_column_ptr;
|
||||||
for (const auto& column_name : table->ColumnNames())
|
for (const auto& column_name : table->ColumnNames())
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
|
#pragma once
|
||||||
#include "config_formats.h"
|
#include "config_formats.h"
|
||||||
|
|
||||||
#if USE_ORC || USE_PARQUET
|
#if USE_ARROW || USE_ORC || USE_PARQUET
|
||||||
|
|
||||||
#include <DataTypes/IDataType.h>
|
#include <DataTypes/IDataType.h>
|
||||||
#include <DataTypes/DataTypesNumber.h>
|
#include <DataTypes/DataTypesNumber.h>
|
||||||
@ -31,15 +32,15 @@ namespace DB
|
|||||||
M(arrow::Type::INT32, DB::Int32) \
|
M(arrow::Type::INT32, DB::Int32) \
|
||||||
M(arrow::Type::UINT64, DB::UInt64) \
|
M(arrow::Type::UINT64, DB::UInt64) \
|
||||||
M(arrow::Type::INT64, DB::Int64) \
|
M(arrow::Type::INT64, DB::Int64) \
|
||||||
|
M(arrow::Type::HALF_FLOAT, DB::Float32) \
|
||||||
M(arrow::Type::FLOAT, DB::Float32) \
|
M(arrow::Type::FLOAT, DB::Float32) \
|
||||||
M(arrow::Type::DOUBLE, DB::Float64)
|
M(arrow::Type::DOUBLE, DB::Float64)
|
||||||
|
|
||||||
|
|
||||||
public:
|
public:
|
||||||
|
|
||||||
static void arrowTableToCHChunk(Chunk &res, std::shared_ptr<arrow::Table> &table,
|
static void arrowTableToCHChunk(Chunk & res, std::shared_ptr<arrow::Table> & table,
|
||||||
arrow::Status &read_status, const Block &header,
|
const Block & header, std::string format_name);
|
||||||
int &row_group_current, std::string format_name);
|
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
329
src/Processors/Formats/Impl/CHColumnToArrowColumn.cpp
Normal file
329
src/Processors/Formats/Impl/CHColumnToArrowColumn.cpp
Normal file
@ -0,0 +1,329 @@
|
|||||||
|
#include "CHColumnToArrowColumn.h"
|
||||||
|
|
||||||
|
#if USE_ARROW || USE_PARQUET
|
||||||
|
|
||||||
|
#include <Columns/ColumnFixedString.h>
|
||||||
|
#include <Columns/ColumnNullable.h>
|
||||||
|
#include <Columns/ColumnString.h>
|
||||||
|
#include <Core/callOnTypeIndex.h>
|
||||||
|
#include <DataTypes/DataTypeDateTime.h>
|
||||||
|
#include <DataTypes/DataTypeNullable.h>
|
||||||
|
#include <DataTypes/DataTypesDecimal.h>
|
||||||
|
#include <Processors/Formats/IOutputFormat.h>
|
||||||
|
#include <arrow/api.h>
|
||||||
|
#include <arrow/util/decimal.h>
|
||||||
|
|
||||||
|
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
namespace ErrorCodes
|
||||||
|
{
|
||||||
|
extern const int UNKNOWN_EXCEPTION;
|
||||||
|
extern const int UNKNOWN_TYPE;
|
||||||
|
}
|
||||||
|
|
||||||
|
static const std::initializer_list<std::pair<String, std::shared_ptr<arrow::DataType>>> internal_type_to_arrow_type =
|
||||||
|
{
|
||||||
|
{"UInt8", arrow::uint8()},
|
||||||
|
{"Int8", arrow::int8()},
|
||||||
|
{"UInt16", arrow::uint16()},
|
||||||
|
{"Int16", arrow::int16()},
|
||||||
|
{"UInt32", arrow::uint32()},
|
||||||
|
{"Int32", arrow::int32()},
|
||||||
|
{"UInt64", arrow::uint64()},
|
||||||
|
{"Int64", arrow::int64()},
|
||||||
|
{"Float32", arrow::float32()},
|
||||||
|
{"Float64", arrow::float64()},
|
||||||
|
|
||||||
|
//{"Date", arrow::date64()},
|
||||||
|
//{"Date", arrow::date32()},
|
||||||
|
{"Date", arrow::uint16()}, // CHECK
|
||||||
|
//{"DateTime", arrow::date64()}, // BUG! saves as date32
|
||||||
|
{"DateTime", arrow::uint32()},
|
||||||
|
|
||||||
|
// TODO: ClickHouse can actually store non-utf8 strings!
|
||||||
|
{"String", arrow::utf8()},
|
||||||
|
{"FixedString", arrow::utf8()},
|
||||||
|
};
|
||||||
|
|
||||||
|
static const PaddedPODArray<UInt8> * extractNullBytemapPtr(ColumnPtr column)
|
||||||
|
{
|
||||||
|
ColumnPtr null_column = assert_cast<const ColumnNullable &>(*column).getNullMapColumnPtr();
|
||||||
|
const PaddedPODArray<UInt8> & null_bytemap = assert_cast<const ColumnVector<UInt8> &>(*null_column).getData();
|
||||||
|
return &null_bytemap;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void checkStatus(const arrow::Status & status, const String & column_name, const String & format_name)
|
||||||
|
{
|
||||||
|
if (!status.ok())
|
||||||
|
throw Exception{"Error with a " + format_name + " column \"" + column_name + "\": " + status.ToString(), ErrorCodes::UNKNOWN_EXCEPTION};
|
||||||
|
}
|
||||||
|
|
||||||
|
template <typename NumericType, typename ArrowBuilderType>
|
||||||
|
static void fillArrowArrayWithNumericColumnData(
|
||||||
|
ColumnPtr write_column,
|
||||||
|
std::shared_ptr<arrow::Array> & arrow_array,
|
||||||
|
const PaddedPODArray<UInt8> * null_bytemap,
|
||||||
|
const String & format_name)
|
||||||
|
{
|
||||||
|
const PaddedPODArray<NumericType> & internal_data = assert_cast<const ColumnVector<NumericType> &>(*write_column).getData();
|
||||||
|
ArrowBuilderType builder;
|
||||||
|
arrow::Status status;
|
||||||
|
|
||||||
|
const UInt8 * arrow_null_bytemap_raw_ptr = nullptr;
|
||||||
|
PaddedPODArray<UInt8> arrow_null_bytemap;
|
||||||
|
if (null_bytemap)
|
||||||
|
{
|
||||||
|
/// Invert values since Arrow interprets 1 as a non-null value, while CH as a null
|
||||||
|
arrow_null_bytemap.reserve(null_bytemap->size());
|
||||||
|
for (auto is_null : *null_bytemap)
|
||||||
|
arrow_null_bytemap.emplace_back(!is_null);
|
||||||
|
|
||||||
|
arrow_null_bytemap_raw_ptr = arrow_null_bytemap.data();
|
||||||
|
}
|
||||||
|
|
||||||
|
if constexpr (std::is_same_v<NumericType, UInt8>)
|
||||||
|
status = builder.AppendValues(
|
||||||
|
reinterpret_cast<const uint8_t *>(internal_data.data()),
|
||||||
|
internal_data.size(),
|
||||||
|
reinterpret_cast<const uint8_t *>(arrow_null_bytemap_raw_ptr));
|
||||||
|
else
|
||||||
|
status = builder.AppendValues(internal_data.data(), internal_data.size(), reinterpret_cast<const uint8_t *>(arrow_null_bytemap_raw_ptr));
|
||||||
|
checkStatus(status, write_column->getName(), format_name);
|
||||||
|
|
||||||
|
status = builder.Finish(&arrow_array);
|
||||||
|
checkStatus(status, write_column->getName(), format_name);
|
||||||
|
}
|
||||||
|
|
||||||
|
template <typename ColumnType>
|
||||||
|
static void fillArrowArrayWithStringColumnData(
|
||||||
|
ColumnPtr write_column,
|
||||||
|
std::shared_ptr<arrow::Array> & arrow_array,
|
||||||
|
const PaddedPODArray<UInt8> * null_bytemap,
|
||||||
|
const String & format_name)
|
||||||
|
{
|
||||||
|
const auto & internal_column = assert_cast<const ColumnType &>(*write_column);
|
||||||
|
arrow::StringBuilder builder;
|
||||||
|
arrow::Status status;
|
||||||
|
|
||||||
|
for (size_t string_i = 0, size = internal_column.size(); string_i < size; ++string_i)
|
||||||
|
{
|
||||||
|
if (null_bytemap && (*null_bytemap)[string_i])
|
||||||
|
{
|
||||||
|
status = builder.AppendNull();
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
StringRef string_ref = internal_column.getDataAt(string_i);
|
||||||
|
status = builder.Append(string_ref.data, string_ref.size);
|
||||||
|
}
|
||||||
|
|
||||||
|
checkStatus(status, write_column->getName(), format_name);
|
||||||
|
}
|
||||||
|
|
||||||
|
status = builder.Finish(&arrow_array);
|
||||||
|
checkStatus(status, write_column->getName(), format_name);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void fillArrowArrayWithDateColumnData(
|
||||||
|
ColumnPtr write_column,
|
||||||
|
std::shared_ptr<arrow::Array> & arrow_array,
|
||||||
|
const PaddedPODArray<UInt8> * null_bytemap,
|
||||||
|
const String & format_name)
|
||||||
|
{
|
||||||
|
const PaddedPODArray<UInt16> & internal_data = assert_cast<const ColumnVector<UInt16> &>(*write_column).getData();
|
||||||
|
//arrow::Date32Builder date_builder;
|
||||||
|
arrow::UInt16Builder builder;
|
||||||
|
arrow::Status status;
|
||||||
|
|
||||||
|
for (size_t value_i = 0, size = internal_data.size(); value_i < size; ++value_i)
|
||||||
|
{
|
||||||
|
if (null_bytemap && (*null_bytemap)[value_i])
|
||||||
|
status = builder.AppendNull();
|
||||||
|
else
|
||||||
|
/// Implicitly converts UInt16 to Int32
|
||||||
|
status = builder.Append(internal_data[value_i]);
|
||||||
|
checkStatus(status, write_column->getName(), format_name);
|
||||||
|
}
|
||||||
|
|
||||||
|
status = builder.Finish(&arrow_array);
|
||||||
|
checkStatus(status, write_column->getName(), format_name);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void fillArrowArrayWithDateTimeColumnData(
|
||||||
|
ColumnPtr write_column,
|
||||||
|
std::shared_ptr<arrow::Array> & arrow_array,
|
||||||
|
const PaddedPODArray<UInt8> * null_bytemap,
|
||||||
|
const String & format_name)
|
||||||
|
{
|
||||||
|
const auto & internal_data = assert_cast<const ColumnVector<UInt32> &>(*write_column).getData();
|
||||||
|
//arrow::Date64Builder builder;
|
||||||
|
arrow::UInt32Builder builder;
|
||||||
|
arrow::Status status;
|
||||||
|
|
||||||
|
for (size_t value_i = 0, size = internal_data.size(); value_i < size; ++value_i)
|
||||||
|
{
|
||||||
|
if (null_bytemap && (*null_bytemap)[value_i])
|
||||||
|
status = builder.AppendNull();
|
||||||
|
else
|
||||||
|
/// Implicitly converts UInt16 to Int32
|
||||||
|
//status = date_builder.Append(static_cast<int64_t>(internal_data[value_i]) * 1000); // now ms. TODO check other units
|
||||||
|
status = builder.Append(internal_data[value_i]);
|
||||||
|
|
||||||
|
checkStatus(status, write_column->getName(), format_name);
|
||||||
|
}
|
||||||
|
|
||||||
|
status = builder.Finish(&arrow_array);
|
||||||
|
checkStatus(status, write_column->getName(), format_name);
|
||||||
|
}
|
||||||
|
|
||||||
|
template <typename DataType>
|
||||||
|
static void fillArrowArrayWithDecimalColumnData(
|
||||||
|
ColumnPtr write_column,
|
||||||
|
std::shared_ptr<arrow::Array> & arrow_array,
|
||||||
|
const PaddedPODArray<UInt8> * null_bytemap,
|
||||||
|
const DataType * decimal_type,
|
||||||
|
const String & format_name)
|
||||||
|
{
|
||||||
|
const auto & column = static_cast<const typename DataType::ColumnType &>(*write_column);
|
||||||
|
arrow::DecimalBuilder builder(arrow::decimal(decimal_type->getPrecision(), decimal_type->getScale()));
|
||||||
|
arrow::Status status;
|
||||||
|
|
||||||
|
for (size_t value_i = 0, size = column.size(); value_i < size; ++value_i)
|
||||||
|
{
|
||||||
|
if (null_bytemap && (*null_bytemap)[value_i])
|
||||||
|
status = builder.AppendNull();
|
||||||
|
else
|
||||||
|
status = builder.Append(
|
||||||
|
arrow::Decimal128(reinterpret_cast<const uint8_t *>(&column.getElement(value_i).value))); // TODO: try copy column
|
||||||
|
|
||||||
|
checkStatus(status, write_column->getName(), format_name);
|
||||||
|
}
|
||||||
|
status = builder.Finish(&arrow_array);
|
||||||
|
checkStatus(status, write_column->getName(), format_name);
|
||||||
|
}
|
||||||
|
|
||||||
|
void CHColumnToArrowColumn::chChunkToArrowTable(
|
||||||
|
std::shared_ptr<arrow::Table> & res,
|
||||||
|
const Block & header,
|
||||||
|
const Chunk & chunk,
|
||||||
|
size_t columns_num,
|
||||||
|
String format_name)
|
||||||
|
{
|
||||||
|
/// For arrow::Schema and arrow::Table creation
|
||||||
|
std::vector<std::shared_ptr<arrow::Field>> arrow_fields;
|
||||||
|
std::vector<std::shared_ptr<arrow::Array>> arrow_arrays;
|
||||||
|
arrow_fields.reserve(columns_num);
|
||||||
|
arrow_arrays.reserve(columns_num);
|
||||||
|
|
||||||
|
for (size_t column_i = 0; column_i < columns_num; ++column_i)
|
||||||
|
{
|
||||||
|
// TODO: constructed every iteration
|
||||||
|
ColumnWithTypeAndName column = header.safeGetByPosition(column_i);
|
||||||
|
column.column = chunk.getColumns()[column_i];
|
||||||
|
|
||||||
|
const bool is_column_nullable = column.type->isNullable();
|
||||||
|
const auto & column_nested_type
|
||||||
|
= is_column_nullable ? static_cast<const DataTypeNullable *>(column.type.get())->getNestedType() : column.type;
|
||||||
|
const String column_nested_type_name = column_nested_type->getFamilyName();
|
||||||
|
|
||||||
|
if (isDecimal(column_nested_type))
|
||||||
|
{
|
||||||
|
const auto add_decimal_field = [&](const auto & types) -> bool {
|
||||||
|
using Types = std::decay_t<decltype(types)>;
|
||||||
|
using ToDataType = typename Types::LeftType;
|
||||||
|
|
||||||
|
if constexpr (
|
||||||
|
std::is_same_v<ToDataType, DataTypeDecimal<Decimal32>>
|
||||||
|
|| std::is_same_v<ToDataType, DataTypeDecimal<Decimal64>>
|
||||||
|
|| std::is_same_v<ToDataType, DataTypeDecimal<Decimal128>>)
|
||||||
|
{
|
||||||
|
const auto & decimal_type = static_cast<const ToDataType *>(column_nested_type.get());
|
||||||
|
arrow_fields.emplace_back(std::make_shared<arrow::Field>(
|
||||||
|
column.name, arrow::decimal(decimal_type->getPrecision(), decimal_type->getScale()), is_column_nullable));
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
};
|
||||||
|
callOnIndexAndDataType<void>(column_nested_type->getTypeId(), add_decimal_field);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
if (const auto * arrow_type_it = std::find_if(internal_type_to_arrow_type.begin(), internal_type_to_arrow_type.end(),
|
||||||
|
[=](auto && elem) { return elem.first == column_nested_type_name; });
|
||||||
|
arrow_type_it != internal_type_to_arrow_type.end())
|
||||||
|
{
|
||||||
|
arrow_fields.emplace_back(std::make_shared<arrow::Field>(column.name, arrow_type_it->second, is_column_nullable));
|
||||||
|
} else
|
||||||
|
{
|
||||||
|
throw Exception{"The type \"" + column_nested_type_name + "\" of a column \"" + column.name + "\""
|
||||||
|
" is not supported for conversion into a " + format_name + " data format",
|
||||||
|
ErrorCodes::UNKNOWN_TYPE};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ColumnPtr nested_column
|
||||||
|
= is_column_nullable ? assert_cast<const ColumnNullable &>(*column.column).getNestedColumnPtr() : column.column;
|
||||||
|
const PaddedPODArray<UInt8> * null_bytemap = is_column_nullable ? extractNullBytemapPtr(column.column) : nullptr;
|
||||||
|
|
||||||
|
std::shared_ptr<arrow::Array> arrow_array;
|
||||||
|
|
||||||
|
if ("String" == column_nested_type_name)
|
||||||
|
{
|
||||||
|
fillArrowArrayWithStringColumnData<ColumnString>(nested_column, arrow_array, null_bytemap, format_name);
|
||||||
|
}
|
||||||
|
else if ("FixedString" == column_nested_type_name)
|
||||||
|
{
|
||||||
|
fillArrowArrayWithStringColumnData<ColumnFixedString>(nested_column, arrow_array, null_bytemap, format_name);
|
||||||
|
}
|
||||||
|
else if ("Date" == column_nested_type_name)
|
||||||
|
{
|
||||||
|
fillArrowArrayWithDateColumnData(nested_column, arrow_array, null_bytemap, format_name);
|
||||||
|
}
|
||||||
|
else if ("DateTime" == column_nested_type_name)
|
||||||
|
{
|
||||||
|
fillArrowArrayWithDateTimeColumnData(nested_column, arrow_array, null_bytemap, format_name);
|
||||||
|
}
|
||||||
|
else if (isDecimal(column_nested_type))
|
||||||
|
{
|
||||||
|
auto fill_decimal = [&](const auto & types) -> bool
|
||||||
|
{
|
||||||
|
using Types = std::decay_t<decltype(types)>;
|
||||||
|
using ToDataType = typename Types::LeftType;
|
||||||
|
if constexpr (
|
||||||
|
std::is_same_v<ToDataType,DataTypeDecimal<Decimal32>>
|
||||||
|
|| std::is_same_v<ToDataType, DataTypeDecimal<Decimal64>>
|
||||||
|
|| std::is_same_v<ToDataType, DataTypeDecimal<Decimal128>>)
|
||||||
|
{
|
||||||
|
const auto & decimal_type = static_cast<const ToDataType *>(column_nested_type.get());
|
||||||
|
fillArrowArrayWithDecimalColumnData(nested_column, arrow_array, null_bytemap, decimal_type, format_name);
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
};
|
||||||
|
callOnIndexAndDataType<void>(column_nested_type->getTypeId(), fill_decimal);
|
||||||
|
}
|
||||||
|
#define DISPATCH(CPP_NUMERIC_TYPE, ARROW_BUILDER_TYPE) \
|
||||||
|
else if (#CPP_NUMERIC_TYPE == column_nested_type_name) \
|
||||||
|
{ \
|
||||||
|
fillArrowArrayWithNumericColumnData<CPP_NUMERIC_TYPE, ARROW_BUILDER_TYPE>(nested_column, arrow_array, null_bytemap, format_name); \
|
||||||
|
}
|
||||||
|
|
||||||
|
FOR_INTERNAL_NUMERIC_TYPES(DISPATCH)
|
||||||
|
#undef DISPATCH
|
||||||
|
else
|
||||||
|
{
|
||||||
|
throw Exception{"Internal type \"" + column_nested_type_name + "\" of a column \"" + column.name + "\""
|
||||||
|
" is not supported for conversion into a " + format_name + " data format",
|
||||||
|
ErrorCodes::UNKNOWN_TYPE};
|
||||||
|
}
|
||||||
|
|
||||||
|
arrow_arrays.emplace_back(std::move(arrow_array));
|
||||||
|
}
|
||||||
|
|
||||||
|
std::shared_ptr<arrow::Schema> arrow_schema = std::make_shared<arrow::Schema>(std::move(arrow_fields));
|
||||||
|
|
||||||
|
res = arrow::Table::Make(arrow_schema, arrow_arrays);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#endif
|
35
src/Processors/Formats/Impl/CHColumnToArrowColumn.h
Normal file
35
src/Processors/Formats/Impl/CHColumnToArrowColumn.h
Normal file
@ -0,0 +1,35 @@
|
|||||||
|
#pragma once
|
||||||
|
#include "config_formats.h"
|
||||||
|
|
||||||
|
#if USE_ARROW || USE_PARQUET
|
||||||
|
|
||||||
|
#include <Core/Block.h>
|
||||||
|
#include <Processors/Chunk.h>
|
||||||
|
#include <arrow/table.h>
|
||||||
|
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
|
||||||
|
class CHColumnToArrowColumn
|
||||||
|
{
|
||||||
|
private:
|
||||||
|
|
||||||
|
#define FOR_INTERNAL_NUMERIC_TYPES(M) \
|
||||||
|
M(UInt8, arrow::UInt8Builder) \
|
||||||
|
M(Int8, arrow::Int8Builder) \
|
||||||
|
M(UInt16, arrow::UInt16Builder) \
|
||||||
|
M(Int16, arrow::Int16Builder) \
|
||||||
|
M(UInt32, arrow::UInt32Builder) \
|
||||||
|
M(Int32, arrow::Int32Builder) \
|
||||||
|
M(UInt64, arrow::UInt64Builder) \
|
||||||
|
M(Int64, arrow::Int64Builder) \
|
||||||
|
M(Float32, arrow::FloatBuilder) \
|
||||||
|
M(Float64, arrow::DoubleBuilder)
|
||||||
|
|
||||||
|
|
||||||
|
public:
|
||||||
|
static void chChunkToArrowTable(std::shared_ptr<arrow::Table> & res, const Block & header, const Chunk & chunk,
|
||||||
|
size_t columns_num, String format_name);
|
||||||
|
};
|
||||||
|
}
|
||||||
|
#endif
|
@ -2,12 +2,12 @@
|
|||||||
#if USE_ORC
|
#if USE_ORC
|
||||||
|
|
||||||
#include <Formats/FormatFactory.h>
|
#include <Formats/FormatFactory.h>
|
||||||
#include <IO/BufferBase.h>
|
|
||||||
#include <IO/ReadBufferFromMemory.h>
|
#include <IO/ReadBufferFromMemory.h>
|
||||||
#include <IO/WriteBufferFromString.h>
|
|
||||||
#include <IO/WriteHelpers.h>
|
#include <IO/WriteHelpers.h>
|
||||||
#include <IO/copyData.h>
|
#include <IO/copyData.h>
|
||||||
|
#include <arrow/adapters/orc/adapter.h>
|
||||||
#include <arrow/io/memory.h>
|
#include <arrow/io/memory.h>
|
||||||
|
#include "ArrowBufferedStreams.h"
|
||||||
#include "ArrowColumnToCHColumn.h"
|
#include "ArrowColumnToCHColumn.h"
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
@ -15,10 +15,10 @@ namespace DB
|
|||||||
|
|
||||||
namespace ErrorCodes
|
namespace ErrorCodes
|
||||||
{
|
{
|
||||||
|
extern const int BAD_ARGUMENTS;
|
||||||
extern const int CANNOT_READ_ALL_DATA;
|
extern const int CANNOT_READ_ALL_DATA;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
ORCBlockInputFormat::ORCBlockInputFormat(ReadBuffer & in_, Block header_) : IInputFormat(std::move(header_), in_)
|
ORCBlockInputFormat::ORCBlockInputFormat(ReadBuffer & in_, Block header_) : IInputFormat(std::move(header_), in_)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
@ -26,46 +26,22 @@ ORCBlockInputFormat::ORCBlockInputFormat(ReadBuffer & in_, Block header_) : IInp
|
|||||||
Chunk ORCBlockInputFormat::generate()
|
Chunk ORCBlockInputFormat::generate()
|
||||||
{
|
{
|
||||||
Chunk res;
|
Chunk res;
|
||||||
|
const Block & header = getPort().getHeader();
|
||||||
|
|
||||||
const auto & header = getPort().getHeader();
|
if (in.eof())
|
||||||
|
|
||||||
if (!in.eof())
|
|
||||||
{
|
|
||||||
if (row_group_current < row_group_total)
|
|
||||||
throw Exception{"Got new data, but data from previous chunks was not read " +
|
|
||||||
std::to_string(row_group_current) + "/" + std::to_string(row_group_total),
|
|
||||||
ErrorCodes::CANNOT_READ_ALL_DATA};
|
|
||||||
|
|
||||||
file_data.clear();
|
|
||||||
{
|
|
||||||
WriteBufferFromString file_buffer(file_data);
|
|
||||||
copyData(in, file_buffer);
|
|
||||||
}
|
|
||||||
|
|
||||||
std::unique_ptr<arrow::Buffer> local_buffer = std::make_unique<arrow::Buffer>(file_data);
|
|
||||||
|
|
||||||
std::shared_ptr<arrow::io::RandomAccessFile> in_stream = std::make_shared<arrow::io::BufferReader>(*local_buffer);
|
|
||||||
|
|
||||||
bool ok = arrow::adapters::orc::ORCFileReader::Open(in_stream, arrow::default_memory_pool(),
|
|
||||||
&file_reader).ok();
|
|
||||||
if (!ok)
|
|
||||||
return res;
|
return res;
|
||||||
|
|
||||||
row_group_total = file_reader->NumberOfRows();
|
arrow::Status open_status = arrow::adapters::orc::ORCFileReader::Open(asArrowFile(in), arrow::default_memory_pool(), &file_reader);
|
||||||
row_group_current = 0;
|
if (!open_status.ok())
|
||||||
|
throw Exception(open_status.ToString(), ErrorCodes::BAD_ARGUMENTS);
|
||||||
}
|
|
||||||
else
|
|
||||||
return res;
|
|
||||||
|
|
||||||
if (row_group_current >= row_group_total)
|
|
||||||
return res;
|
|
||||||
|
|
||||||
std::shared_ptr<arrow::Table> table;
|
std::shared_ptr<arrow::Table> table;
|
||||||
|
|
||||||
arrow::Status read_status = file_reader->Read(&table);
|
arrow::Status read_status = file_reader->Read(&table);
|
||||||
|
if (!read_status.ok())
|
||||||
|
throw Exception{"Error while reading ORC data: " + read_status.ToString(),
|
||||||
|
ErrorCodes::CANNOT_READ_ALL_DATA};
|
||||||
|
|
||||||
ArrowColumnToCHColumn::arrowTableToCHChunk(res, table, read_status, header, row_group_current, "ORC");
|
ArrowColumnToCHColumn::arrowTableToCHChunk(res, table, header, "ORC");
|
||||||
|
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
@ -75,9 +51,6 @@ void ORCBlockInputFormat::resetParser()
|
|||||||
IInputFormat::resetParser();
|
IInputFormat::resetParser();
|
||||||
|
|
||||||
file_reader.reset();
|
file_reader.reset();
|
||||||
file_data.clear();
|
|
||||||
row_group_total = 0;
|
|
||||||
row_group_current = 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void registerInputFormatProcessorORC(FormatFactory &factory)
|
void registerInputFormatProcessorORC(FormatFactory &factory)
|
||||||
|
@ -1,20 +1,14 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include "config_formats.h"
|
#include "config_formats.h"
|
||||||
#include <DataStreams/IBlockInputStream.h>
|
|
||||||
#include <Processors/Chunk.h>
|
|
||||||
#include <Processors/Formats/IInputFormat.h>
|
|
||||||
|
|
||||||
#if USE_ORC
|
#if USE_ORC
|
||||||
|
|
||||||
#include "arrow/adapters/orc/adapter.h"
|
#include <Processors/Formats/IInputFormat.h>
|
||||||
#include "arrow/io/interfaces.h"
|
|
||||||
|
namespace arrow::adapters::orc { class ORCFileReader; }
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
class Context;
|
class ORCBlockInputFormat : public IInputFormat
|
||||||
|
|
||||||
class ORCBlockInputFormat: public IInputFormat
|
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
ORCBlockInputFormat(ReadBuffer & in_, Block header_);
|
ORCBlockInputFormat(ReadBuffer & in_, Block header_);
|
||||||
@ -31,9 +25,6 @@ private:
|
|||||||
// TODO: check that this class implements every part of its parent
|
// TODO: check that this class implements every part of its parent
|
||||||
|
|
||||||
std::unique_ptr<arrow::adapters::orc::ORCFileReader> file_reader;
|
std::unique_ptr<arrow::adapters::orc::ORCFileReader> file_reader;
|
||||||
std::string file_data;
|
|
||||||
int row_group_total = 0;
|
|
||||||
int row_group_current = 0;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -2,112 +2,25 @@
|
|||||||
#if USE_PARQUET
|
#if USE_PARQUET
|
||||||
|
|
||||||
#include <Formats/FormatFactory.h>
|
#include <Formats/FormatFactory.h>
|
||||||
#include <IO/BufferBase.h>
|
|
||||||
#include <IO/ReadBufferFromMemory.h>
|
#include <IO/ReadBufferFromMemory.h>
|
||||||
#include <IO/ReadBufferFromFileDescriptor.h>
|
|
||||||
#include <IO/WriteBufferFromString.h>
|
|
||||||
#include <IO/WriteHelpers.h>
|
|
||||||
#include <IO/copyData.h>
|
#include <IO/copyData.h>
|
||||||
#include <arrow/api.h>
|
#include <arrow/api.h>
|
||||||
#include <arrow/io/api.h>
|
#include <arrow/io/api.h>
|
||||||
#include <arrow/status.h>
|
#include <arrow/status.h>
|
||||||
#include <parquet/arrow/reader.h>
|
#include <parquet/arrow/reader.h>
|
||||||
#include <parquet/file_reader.h>
|
#include <parquet/file_reader.h>
|
||||||
|
#include "ArrowBufferedStreams.h"
|
||||||
#include "ArrowColumnToCHColumn.h"
|
#include "ArrowColumnToCHColumn.h"
|
||||||
|
|
||||||
#include <common/logger_useful.h>
|
#include <common/logger_useful.h>
|
||||||
|
|
||||||
|
|
||||||
#include <sys/stat.h>
|
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
|
|
||||||
class RandomAccessFileFromSeekableReadBuffer : public arrow::io::RandomAccessFile
|
|
||||||
{
|
|
||||||
public:
|
|
||||||
RandomAccessFileFromSeekableReadBuffer(SeekableReadBuffer & in_, off_t file_size_)
|
|
||||||
: in(in_)
|
|
||||||
, file_size(file_size_)
|
|
||||||
, is_closed(false)
|
|
||||||
{
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
arrow::Status GetSize(int64_t* size) override
|
|
||||||
{
|
|
||||||
*size = file_size;
|
|
||||||
return arrow::Status::OK();
|
|
||||||
}
|
|
||||||
|
|
||||||
arrow::Status Close() override
|
|
||||||
{
|
|
||||||
is_closed = true;
|
|
||||||
return arrow::Status::OK();
|
|
||||||
}
|
|
||||||
|
|
||||||
arrow::Status Tell(int64_t* position) const override
|
|
||||||
{
|
|
||||||
*position = in.getPosition();
|
|
||||||
return arrow::Status::OK();
|
|
||||||
}
|
|
||||||
|
|
||||||
bool closed() const override { return is_closed; }
|
|
||||||
|
|
||||||
arrow::Status Read(int64_t nbytes, int64_t* bytes_read, void* out) override
|
|
||||||
{
|
|
||||||
*bytes_read = in.readBig(reinterpret_cast<char *>(out), nbytes);
|
|
||||||
return arrow::Status::OK();
|
|
||||||
}
|
|
||||||
|
|
||||||
arrow::Status Read(int64_t nbytes, std::shared_ptr<arrow::Buffer>* out) override
|
|
||||||
{
|
|
||||||
std::shared_ptr<arrow::Buffer> buf;
|
|
||||||
ARROW_RETURN_NOT_OK(arrow::AllocateBuffer(nbytes, &buf));
|
|
||||||
size_t n = in.readBig(reinterpret_cast<char *>(buf->mutable_data()), nbytes);
|
|
||||||
*out = arrow::SliceBuffer(buf, 0, n);
|
|
||||||
return arrow::Status::OK();
|
|
||||||
}
|
|
||||||
|
|
||||||
arrow::Status Seek(int64_t position) override
|
|
||||||
{
|
|
||||||
in.seek(position, SEEK_SET);
|
|
||||||
return arrow::Status::OK();
|
|
||||||
}
|
|
||||||
|
|
||||||
private:
|
|
||||||
SeekableReadBuffer & in;
|
|
||||||
off_t file_size;
|
|
||||||
bool is_closed;
|
|
||||||
};
|
|
||||||
|
|
||||||
|
|
||||||
static std::shared_ptr<arrow::io::RandomAccessFile> asArrowFile(ReadBuffer & in)
|
|
||||||
{
|
|
||||||
if (auto * fd_in = dynamic_cast<ReadBufferFromFileDescriptor *>(&in))
|
|
||||||
{
|
|
||||||
struct stat stat;
|
|
||||||
auto res = ::fstat(fd_in->getFD(), &stat);
|
|
||||||
// if fd is a regular file i.e. not stdin
|
|
||||||
if (res == 0 && S_ISREG(stat.st_mode))
|
|
||||||
{
|
|
||||||
return std::make_shared<RandomAccessFileFromSeekableReadBuffer>(*fd_in, stat.st_size);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// fallback to loading the entire file in memory
|
|
||||||
std::string file_data;
|
|
||||||
{
|
|
||||||
WriteBufferFromString file_buffer(file_data);
|
|
||||||
copyData(in, file_buffer);
|
|
||||||
}
|
|
||||||
|
|
||||||
return std::make_shared<arrow::io::BufferReader>(arrow::Buffer::FromString(std::move(file_data)));
|
|
||||||
}
|
|
||||||
|
|
||||||
namespace ErrorCodes
|
namespace ErrorCodes
|
||||||
{
|
{
|
||||||
extern const int BAD_ARGUMENTS;
|
extern const int BAD_ARGUMENTS;
|
||||||
|
extern const int CANNOT_READ_ALL_DATA;
|
||||||
}
|
}
|
||||||
|
|
||||||
#define THROW_ARROW_NOT_OK(status) \
|
#define THROW_ARROW_NOT_OK(status) \
|
||||||
@ -120,8 +33,43 @@ namespace ErrorCodes
|
|||||||
ParquetBlockInputFormat::ParquetBlockInputFormat(ReadBuffer & in_, Block header_)
|
ParquetBlockInputFormat::ParquetBlockInputFormat(ReadBuffer & in_, Block header_)
|
||||||
: IInputFormat(std::move(header_), in_)
|
: IInputFormat(std::move(header_), in_)
|
||||||
{
|
{
|
||||||
THROW_ARROW_NOT_OK(parquet::arrow::OpenFile(asArrowFile(in_), arrow::default_memory_pool(), &file_reader));
|
prepareReader();
|
||||||
|
}
|
||||||
|
|
||||||
|
Chunk ParquetBlockInputFormat::generate()
|
||||||
|
{
|
||||||
|
Chunk res;
|
||||||
|
const Block & header = getPort().getHeader();
|
||||||
|
|
||||||
|
if (row_group_current >= row_group_total)
|
||||||
|
return res;
|
||||||
|
|
||||||
|
std::shared_ptr<arrow::Table> table;
|
||||||
|
arrow::Status read_status = file_reader->ReadRowGroup(row_group_current, column_indices, &table);
|
||||||
|
if (!read_status.ok())
|
||||||
|
throw Exception{"Error while reading Parquet data: " + read_status.ToString(),
|
||||||
|
ErrorCodes::CANNOT_READ_ALL_DATA};
|
||||||
|
|
||||||
|
++row_group_current;
|
||||||
|
|
||||||
|
ArrowColumnToCHColumn::arrowTableToCHChunk(res, table, header, "Parquet");
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
|
void ParquetBlockInputFormat::resetParser()
|
||||||
|
{
|
||||||
|
IInputFormat::resetParser();
|
||||||
|
|
||||||
|
file_reader.reset();
|
||||||
|
column_indices.clear();
|
||||||
|
prepareReader();
|
||||||
|
}
|
||||||
|
|
||||||
|
void ParquetBlockInputFormat::prepareReader()
|
||||||
|
{
|
||||||
|
THROW_ARROW_NOT_OK(parquet::arrow::OpenFile(asArrowFile(in), arrow::default_memory_pool(), &file_reader));
|
||||||
row_group_total = file_reader->num_row_groups();
|
row_group_total = file_reader->num_row_groups();
|
||||||
|
row_group_current = 0;
|
||||||
|
|
||||||
std::shared_ptr<arrow::Schema> schema;
|
std::shared_ptr<arrow::Schema> schema;
|
||||||
THROW_ARROW_NOT_OK(file_reader->GetSchema(&schema));
|
THROW_ARROW_NOT_OK(file_reader->GetSchema(&schema));
|
||||||
@ -135,29 +83,6 @@ ParquetBlockInputFormat::ParquetBlockInputFormat(ReadBuffer & in_, Block header_
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Chunk ParquetBlockInputFormat::generate()
|
|
||||||
{
|
|
||||||
Chunk res;
|
|
||||||
const auto & header = getPort().getHeader();
|
|
||||||
|
|
||||||
if (row_group_current >= row_group_total)
|
|
||||||
return res;
|
|
||||||
|
|
||||||
std::shared_ptr<arrow::Table> table;
|
|
||||||
arrow::Status read_status = file_reader->ReadRowGroup(row_group_current, column_indices, &table);
|
|
||||||
ArrowColumnToCHColumn::arrowTableToCHChunk(res, table, read_status, header, row_group_current, "Parquet");
|
|
||||||
return res;
|
|
||||||
}
|
|
||||||
|
|
||||||
void ParquetBlockInputFormat::resetParser()
|
|
||||||
{
|
|
||||||
IInputFormat::resetParser();
|
|
||||||
|
|
||||||
file_reader.reset();
|
|
||||||
row_group_total = 0;
|
|
||||||
row_group_current = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
void registerInputFormatProcessorParquet(FormatFactory &factory)
|
void registerInputFormatProcessorParquet(FormatFactory &factory)
|
||||||
{
|
{
|
||||||
factory.registerInputFormatProcessor(
|
factory.registerInputFormatProcessor(
|
||||||
|
@ -5,27 +5,28 @@
|
|||||||
|
|
||||||
#include <Processors/Formats/IInputFormat.h>
|
#include <Processors/Formats/IInputFormat.h>
|
||||||
|
|
||||||
|
namespace parquet::arrow { class FileReader; }
|
||||||
|
|
||||||
namespace parquet { namespace arrow { class FileReader; } }
|
|
||||||
namespace arrow { class Buffer; }
|
namespace arrow { class Buffer; }
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
class Context;
|
|
||||||
|
|
||||||
class ParquetBlockInputFormat: public IInputFormat
|
class ParquetBlockInputFormat : public IInputFormat
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
ParquetBlockInputFormat(ReadBuffer & in_, Block header_);
|
ParquetBlockInputFormat(ReadBuffer & in_, Block header_);
|
||||||
|
|
||||||
void resetParser() override;
|
void resetParser() override;
|
||||||
|
|
||||||
|
|
||||||
String getName() const override { return "ParquetBlockInputFormat"; }
|
String getName() const override { return "ParquetBlockInputFormat"; }
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
Chunk generate() override;
|
Chunk generate() override;
|
||||||
|
|
||||||
|
private:
|
||||||
|
void prepareReader();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
std::unique_ptr<parquet::arrow::FileReader> file_reader;
|
std::unique_ptr<parquet::arrow::FileReader> file_reader;
|
||||||
int row_group_total = 0;
|
int row_group_total = 0;
|
||||||
|
@ -3,28 +3,19 @@
|
|||||||
#if USE_PARQUET
|
#if USE_PARQUET
|
||||||
|
|
||||||
// TODO: clean includes
|
// TODO: clean includes
|
||||||
#include <Columns/ColumnDecimal.h>
|
|
||||||
#include <Columns/ColumnFixedString.h>
|
|
||||||
#include <Columns/ColumnNullable.h>
|
|
||||||
#include <Columns/ColumnString.h>
|
#include <Columns/ColumnString.h>
|
||||||
#include <Columns/ColumnVector.h>
|
#include <Columns/ColumnVector.h>
|
||||||
#include <Columns/ColumnsNumber.h>
|
|
||||||
#include <Common/assert_cast.h>
|
#include <Common/assert_cast.h>
|
||||||
#include <Core/ColumnWithTypeAndName.h>
|
|
||||||
#include <Core/callOnTypeIndex.h>
|
#include <Core/callOnTypeIndex.h>
|
||||||
#include <DataTypes/DataTypeDateTime.h>
|
|
||||||
#include <DataTypes/DataTypeNullable.h>
|
|
||||||
#include <DataTypes/DataTypesDecimal.h>
|
|
||||||
#include <DataStreams/SquashingBlockOutputStream.h>
|
#include <DataStreams/SquashingBlockOutputStream.h>
|
||||||
#include <Formats/FormatFactory.h>
|
#include <Formats/FormatFactory.h>
|
||||||
#include <IO/WriteHelpers.h>
|
#include <IO/WriteHelpers.h>
|
||||||
#include <arrow/api.h>
|
#include <arrow/api.h>
|
||||||
#include <arrow/io/api.h>
|
|
||||||
#include <arrow/util/decimal.h>
|
|
||||||
#include <arrow/util/memory.h>
|
#include <arrow/util/memory.h>
|
||||||
#include <parquet/arrow/writer.h>
|
#include <parquet/arrow/writer.h>
|
||||||
#include <parquet/exception.h>
|
|
||||||
#include <parquet/deprecated_io.h>
|
#include <parquet/deprecated_io.h>
|
||||||
|
#include "ArrowBufferedStreams.h"
|
||||||
|
#include "CHColumnToArrowColumn.h"
|
||||||
|
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
@ -32,7 +23,6 @@ namespace DB
|
|||||||
namespace ErrorCodes
|
namespace ErrorCodes
|
||||||
{
|
{
|
||||||
extern const int UNKNOWN_EXCEPTION;
|
extern const int UNKNOWN_EXCEPTION;
|
||||||
extern const int UNKNOWN_TYPE;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ParquetBlockOutputFormat::ParquetBlockOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_)
|
ParquetBlockOutputFormat::ParquetBlockOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_)
|
||||||
@ -40,381 +30,17 @@ ParquetBlockOutputFormat::ParquetBlockOutputFormat(WriteBuffer & out_, const Blo
|
|||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
static void checkStatus(arrow::Status & status, const std::string & column_name)
|
|
||||||
{
|
|
||||||
if (!status.ok())
|
|
||||||
throw Exception{"Error with a parquet column \"" + column_name + "\": " + status.ToString(), ErrorCodes::UNKNOWN_EXCEPTION};
|
|
||||||
}
|
|
||||||
|
|
||||||
template <typename NumericType, typename ArrowBuilderType>
|
|
||||||
static void fillArrowArrayWithNumericColumnData(
|
|
||||||
ColumnPtr write_column, std::shared_ptr<arrow::Array> & arrow_array, const PaddedPODArray<UInt8> * null_bytemap)
|
|
||||||
{
|
|
||||||
const PaddedPODArray<NumericType> & internal_data = assert_cast<const ColumnVector<NumericType> &>(*write_column).getData();
|
|
||||||
ArrowBuilderType builder;
|
|
||||||
arrow::Status status;
|
|
||||||
|
|
||||||
const UInt8 * arrow_null_bytemap_raw_ptr = nullptr;
|
|
||||||
PaddedPODArray<UInt8> arrow_null_bytemap;
|
|
||||||
if (null_bytemap)
|
|
||||||
{
|
|
||||||
/// Invert values since Arrow interprets 1 as a non-null value, while CH as a null
|
|
||||||
arrow_null_bytemap.reserve(null_bytemap->size());
|
|
||||||
for (auto is_null : *null_bytemap)
|
|
||||||
arrow_null_bytemap.emplace_back(!is_null);
|
|
||||||
|
|
||||||
arrow_null_bytemap_raw_ptr = arrow_null_bytemap.data();
|
|
||||||
}
|
|
||||||
|
|
||||||
if constexpr (std::is_same_v<NumericType, UInt8>)
|
|
||||||
status = builder.AppendValues(
|
|
||||||
reinterpret_cast<const uint8_t *>(internal_data.data()),
|
|
||||||
internal_data.size(),
|
|
||||||
reinterpret_cast<const uint8_t *>(arrow_null_bytemap_raw_ptr));
|
|
||||||
else
|
|
||||||
status = builder.AppendValues(internal_data.data(), internal_data.size(), reinterpret_cast<const uint8_t *>(arrow_null_bytemap_raw_ptr));
|
|
||||||
checkStatus(status, write_column->getName());
|
|
||||||
|
|
||||||
status = builder.Finish(&arrow_array);
|
|
||||||
checkStatus(status, write_column->getName());
|
|
||||||
}
|
|
||||||
|
|
||||||
template <typename ColumnType>
|
|
||||||
static void fillArrowArrayWithStringColumnData(
|
|
||||||
ColumnPtr write_column, std::shared_ptr<arrow::Array> & arrow_array, const PaddedPODArray<UInt8> * null_bytemap)
|
|
||||||
{
|
|
||||||
const auto & internal_column = assert_cast<const ColumnType &>(*write_column);
|
|
||||||
arrow::StringBuilder builder;
|
|
||||||
arrow::Status status;
|
|
||||||
|
|
||||||
for (size_t string_i = 0, size = internal_column.size(); string_i < size; ++string_i)
|
|
||||||
{
|
|
||||||
if (null_bytemap && (*null_bytemap)[string_i])
|
|
||||||
{
|
|
||||||
status = builder.AppendNull();
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
StringRef string_ref = internal_column.getDataAt(string_i);
|
|
||||||
status = builder.Append(string_ref.data, string_ref.size);
|
|
||||||
}
|
|
||||||
|
|
||||||
checkStatus(status, write_column->getName());
|
|
||||||
}
|
|
||||||
|
|
||||||
status = builder.Finish(&arrow_array);
|
|
||||||
checkStatus(status, write_column->getName());
|
|
||||||
}
|
|
||||||
|
|
||||||
static void fillArrowArrayWithDateColumnData(
|
|
||||||
ColumnPtr write_column, std::shared_ptr<arrow::Array> & arrow_array, const PaddedPODArray<UInt8> * null_bytemap)
|
|
||||||
{
|
|
||||||
const PaddedPODArray<UInt16> & internal_data = assert_cast<const ColumnVector<UInt16> &>(*write_column).getData();
|
|
||||||
//arrow::Date32Builder date_builder;
|
|
||||||
arrow::UInt16Builder builder;
|
|
||||||
arrow::Status status;
|
|
||||||
|
|
||||||
for (size_t value_i = 0, size = internal_data.size(); value_i < size; ++value_i)
|
|
||||||
{
|
|
||||||
if (null_bytemap && (*null_bytemap)[value_i])
|
|
||||||
status = builder.AppendNull();
|
|
||||||
else
|
|
||||||
/// Implicitly converts UInt16 to Int32
|
|
||||||
status = builder.Append(internal_data[value_i]);
|
|
||||||
checkStatus(status, write_column->getName());
|
|
||||||
}
|
|
||||||
|
|
||||||
status = builder.Finish(&arrow_array);
|
|
||||||
checkStatus(status, write_column->getName());
|
|
||||||
}
|
|
||||||
|
|
||||||
static void fillArrowArrayWithDateTimeColumnData(
|
|
||||||
ColumnPtr write_column, std::shared_ptr<arrow::Array> & arrow_array, const PaddedPODArray<UInt8> * null_bytemap)
|
|
||||||
{
|
|
||||||
const auto & internal_data = assert_cast<const ColumnVector<UInt32> &>(*write_column).getData();
|
|
||||||
//arrow::Date64Builder builder;
|
|
||||||
arrow::UInt32Builder builder;
|
|
||||||
arrow::Status status;
|
|
||||||
|
|
||||||
for (size_t value_i = 0, size = internal_data.size(); value_i < size; ++value_i)
|
|
||||||
{
|
|
||||||
if (null_bytemap && (*null_bytemap)[value_i])
|
|
||||||
status = builder.AppendNull();
|
|
||||||
else
|
|
||||||
/// Implicitly converts UInt16 to Int32
|
|
||||||
//status = date_builder.Append(static_cast<int64_t>(internal_data[value_i]) * 1000); // now ms. TODO check other units
|
|
||||||
status = builder.Append(internal_data[value_i]);
|
|
||||||
|
|
||||||
checkStatus(status, write_column->getName());
|
|
||||||
}
|
|
||||||
|
|
||||||
status = builder.Finish(&arrow_array);
|
|
||||||
checkStatus(status, write_column->getName());
|
|
||||||
}
|
|
||||||
|
|
||||||
template <typename DataType>
|
|
||||||
static void fillArrowArrayWithDecimalColumnData(
|
|
||||||
ColumnPtr write_column,
|
|
||||||
std::shared_ptr<arrow::Array> & arrow_array,
|
|
||||||
const PaddedPODArray<UInt8> * null_bytemap,
|
|
||||||
const DataType * decimal_type)
|
|
||||||
{
|
|
||||||
const auto & column = static_cast<const typename DataType::ColumnType &>(*write_column);
|
|
||||||
arrow::DecimalBuilder builder(arrow::decimal(decimal_type->getPrecision(), decimal_type->getScale()));
|
|
||||||
arrow::Status status;
|
|
||||||
|
|
||||||
for (size_t value_i = 0, size = column.size(); value_i < size; ++value_i)
|
|
||||||
{
|
|
||||||
if (null_bytemap && (*null_bytemap)[value_i])
|
|
||||||
status = builder.AppendNull();
|
|
||||||
else
|
|
||||||
status = builder.Append(
|
|
||||||
arrow::Decimal128(reinterpret_cast<const uint8_t *>(&column.getElement(value_i).value))); // TODO: try copy column
|
|
||||||
|
|
||||||
checkStatus(status, write_column->getName());
|
|
||||||
}
|
|
||||||
status = builder.Finish(&arrow_array);
|
|
||||||
checkStatus(status, write_column->getName());
|
|
||||||
|
|
||||||
/* TODO column copy
|
|
||||||
const auto & internal_data = static_cast<const typename DataType::ColumnType &>(*write_column).getData();
|
|
||||||
//ArrowBuilderType numeric_builder;
|
|
||||||
arrow::DecimalBuilder builder(arrow::decimal(decimal_type->getPrecision(), decimal_type->getScale()));
|
|
||||||
arrow::Status status;
|
|
||||||
|
|
||||||
const uint8_t * arrow_null_bytemap_raw_ptr = nullptr;
|
|
||||||
PaddedPODArray<UInt8> arrow_null_bytemap;
|
|
||||||
if (null_bytemap)
|
|
||||||
{
|
|
||||||
/// Invert values since Arrow interprets 1 as a non-null value, while CH as a null
|
|
||||||
arrow_null_bytemap.reserve(null_bytemap->size());
|
|
||||||
for (size_t i = 0, size = null_bytemap->size(); i < size; ++i)
|
|
||||||
arrow_null_bytemap.emplace_back(1 ^ (*null_bytemap)[i]);
|
|
||||||
|
|
||||||
arrow_null_bytemap_raw_ptr = arrow_null_bytemap.data();
|
|
||||||
}
|
|
||||||
|
|
||||||
if constexpr (std::is_same_v<NumericType, UInt8>)
|
|
||||||
status = builder.AppendValues(
|
|
||||||
reinterpret_cast<const uint8_t *>(internal_data.data()),
|
|
||||||
internal_data.size(),
|
|
||||||
reinterpret_cast<const uint8_t *>(arrow_null_bytemap_raw_ptr));
|
|
||||||
else
|
|
||||||
status = builder.AppendValues(internal_data.data(), internal_data.size(), reinterpret_cast<const uint8_t *>(arrow_null_bytemap_raw_ptr));
|
|
||||||
checkStatus(status, write_column->getName());
|
|
||||||
|
|
||||||
status = builder.Finish(&arrow_array);
|
|
||||||
checkStatus(status, write_column->getName());
|
|
||||||
*/
|
|
||||||
}
|
|
||||||
|
|
||||||
#define FOR_INTERNAL_NUMERIC_TYPES(M) \
|
|
||||||
M(UInt8, arrow::UInt8Builder) \
|
|
||||||
M(Int8, arrow::Int8Builder) \
|
|
||||||
M(UInt16, arrow::UInt16Builder) \
|
|
||||||
M(Int16, arrow::Int16Builder) \
|
|
||||||
M(UInt32, arrow::UInt32Builder) \
|
|
||||||
M(Int32, arrow::Int32Builder) \
|
|
||||||
M(UInt64, arrow::UInt64Builder) \
|
|
||||||
M(Int64, arrow::Int64Builder) \
|
|
||||||
M(Float32, arrow::FloatBuilder) \
|
|
||||||
M(Float64, arrow::DoubleBuilder)
|
|
||||||
|
|
||||||
const std::unordered_map<String, std::shared_ptr<arrow::DataType>> internal_type_to_arrow_type = {
|
|
||||||
{"UInt8", arrow::uint8()},
|
|
||||||
{"Int8", arrow::int8()},
|
|
||||||
{"UInt16", arrow::uint16()},
|
|
||||||
{"Int16", arrow::int16()},
|
|
||||||
{"UInt32", arrow::uint32()},
|
|
||||||
{"Int32", arrow::int32()},
|
|
||||||
{"UInt64", arrow::uint64()},
|
|
||||||
{"Int64", arrow::int64()},
|
|
||||||
{"Float32", arrow::float32()},
|
|
||||||
{"Float64", arrow::float64()},
|
|
||||||
|
|
||||||
//{"Date", arrow::date64()},
|
|
||||||
//{"Date", arrow::date32()},
|
|
||||||
{"Date", arrow::uint16()}, // CHECK
|
|
||||||
//{"DateTime", arrow::date64()}, // BUG! saves as date32
|
|
||||||
{"DateTime", arrow::uint32()},
|
|
||||||
|
|
||||||
// TODO: ClickHouse can actually store non-utf8 strings!
|
|
||||||
{"String", arrow::utf8()},
|
|
||||||
{"FixedString", arrow::utf8()},
|
|
||||||
};
|
|
||||||
|
|
||||||
static const PaddedPODArray<UInt8> * extractNullBytemapPtr(ColumnPtr column)
|
|
||||||
{
|
|
||||||
ColumnPtr null_column = assert_cast<const ColumnNullable &>(*column).getNullMapColumnPtr();
|
|
||||||
const PaddedPODArray<UInt8> & null_bytemap = assert_cast<const ColumnVector<UInt8> &>(*null_column).getData();
|
|
||||||
return &null_bytemap;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
class OstreamOutputStream : public arrow::io::OutputStream
|
|
||||||
{
|
|
||||||
public:
|
|
||||||
explicit OstreamOutputStream(WriteBuffer & ostr_) : ostr(ostr_) { is_open = true; }
|
|
||||||
~OstreamOutputStream() override = default;
|
|
||||||
|
|
||||||
// FileInterface
|
|
||||||
::arrow::Status Close() override
|
|
||||||
{
|
|
||||||
is_open = false;
|
|
||||||
return ::arrow::Status::OK();
|
|
||||||
}
|
|
||||||
|
|
||||||
::arrow::Status Tell(int64_t* position) const override
|
|
||||||
{
|
|
||||||
*position = total_length;
|
|
||||||
return ::arrow::Status::OK();
|
|
||||||
}
|
|
||||||
|
|
||||||
bool closed() const override { return !is_open; }
|
|
||||||
|
|
||||||
// Writable
|
|
||||||
::arrow::Status Write(const void* data, int64_t length) override
|
|
||||||
{
|
|
||||||
ostr.write(reinterpret_cast<const char *>(data), length);
|
|
||||||
total_length += length;
|
|
||||||
return ::arrow::Status::OK();
|
|
||||||
}
|
|
||||||
|
|
||||||
private:
|
|
||||||
WriteBuffer & ostr;
|
|
||||||
int64_t total_length = 0;
|
|
||||||
bool is_open = false;
|
|
||||||
|
|
||||||
PARQUET_DISALLOW_COPY_AND_ASSIGN(OstreamOutputStream);
|
|
||||||
};
|
|
||||||
|
|
||||||
|
|
||||||
void ParquetBlockOutputFormat::consume(Chunk chunk)
|
void ParquetBlockOutputFormat::consume(Chunk chunk)
|
||||||
{
|
{
|
||||||
const auto & header = getPort(PortKind::Main).getHeader();
|
const Block & header = getPort(PortKind::Main).getHeader();
|
||||||
const size_t columns_num = chunk.getNumColumns();
|
const size_t columns_num = chunk.getNumColumns();
|
||||||
|
std::shared_ptr<arrow::Table> arrow_table;
|
||||||
|
|
||||||
/// For arrow::Schema and arrow::Table creation
|
CHColumnToArrowColumn::chChunkToArrowTable(arrow_table, header, chunk, columns_num, "Parquet");
|
||||||
std::vector<std::shared_ptr<arrow::Field>> arrow_fields;
|
|
||||||
std::vector<std::shared_ptr<arrow::Array>> arrow_arrays;
|
|
||||||
arrow_fields.reserve(columns_num);
|
|
||||||
arrow_arrays.reserve(columns_num);
|
|
||||||
|
|
||||||
for (size_t column_i = 0; column_i < columns_num; ++column_i)
|
|
||||||
{
|
|
||||||
// TODO: constructed every iteration
|
|
||||||
ColumnWithTypeAndName column = header.safeGetByPosition(column_i);
|
|
||||||
column.column = chunk.getColumns()[column_i];
|
|
||||||
|
|
||||||
const bool is_column_nullable = column.type->isNullable();
|
|
||||||
const auto & column_nested_type
|
|
||||||
= is_column_nullable ? static_cast<const DataTypeNullable *>(column.type.get())->getNestedType() : column.type;
|
|
||||||
const std::string column_nested_type_name = column_nested_type->getFamilyName();
|
|
||||||
|
|
||||||
if (isDecimal(column_nested_type))
|
|
||||||
{
|
|
||||||
const auto add_decimal_field = [&](const auto & types) -> bool {
|
|
||||||
using Types = std::decay_t<decltype(types)>;
|
|
||||||
using ToDataType = typename Types::LeftType;
|
|
||||||
|
|
||||||
if constexpr (
|
|
||||||
std::is_same_v<
|
|
||||||
ToDataType,
|
|
||||||
DataTypeDecimal<
|
|
||||||
Decimal32>> || std::is_same_v<ToDataType, DataTypeDecimal<Decimal64>> || std::is_same_v<ToDataType, DataTypeDecimal<Decimal128>>)
|
|
||||||
{
|
|
||||||
const auto & decimal_type = static_cast<const ToDataType *>(column_nested_type.get());
|
|
||||||
arrow_fields.emplace_back(std::make_shared<arrow::Field>(
|
|
||||||
column.name, arrow::decimal(decimal_type->getPrecision(), decimal_type->getScale()), is_column_nullable));
|
|
||||||
}
|
|
||||||
|
|
||||||
return false;
|
|
||||||
};
|
|
||||||
callOnIndexAndDataType<void>(column_nested_type->getTypeId(), add_decimal_field);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
if (internal_type_to_arrow_type.find(column_nested_type_name) == internal_type_to_arrow_type.end())
|
|
||||||
{
|
|
||||||
throw Exception{"The type \"" + column_nested_type_name + "\" of a column \"" + column.name
|
|
||||||
+ "\""
|
|
||||||
" is not supported for conversion into a Parquet data format",
|
|
||||||
ErrorCodes::UNKNOWN_TYPE};
|
|
||||||
}
|
|
||||||
|
|
||||||
arrow_fields.emplace_back(std::make_shared<arrow::Field>(column.name, internal_type_to_arrow_type.at(column_nested_type_name), is_column_nullable));
|
|
||||||
}
|
|
||||||
|
|
||||||
std::shared_ptr<arrow::Array> arrow_array;
|
|
||||||
|
|
||||||
ColumnPtr nested_column
|
|
||||||
= is_column_nullable ? assert_cast<const ColumnNullable &>(*column.column).getNestedColumnPtr() : column.column;
|
|
||||||
const PaddedPODArray<UInt8> * null_bytemap = is_column_nullable ? extractNullBytemapPtr(column.column) : nullptr;
|
|
||||||
|
|
||||||
if ("String" == column_nested_type_name)
|
|
||||||
{
|
|
||||||
fillArrowArrayWithStringColumnData<ColumnString>(nested_column, arrow_array, null_bytemap);
|
|
||||||
}
|
|
||||||
else if ("FixedString" == column_nested_type_name)
|
|
||||||
{
|
|
||||||
fillArrowArrayWithStringColumnData<ColumnFixedString>(nested_column, arrow_array, null_bytemap);
|
|
||||||
}
|
|
||||||
else if ("Date" == column_nested_type_name)
|
|
||||||
{
|
|
||||||
fillArrowArrayWithDateColumnData(nested_column, arrow_array, null_bytemap);
|
|
||||||
}
|
|
||||||
else if ("DateTime" == column_nested_type_name)
|
|
||||||
{
|
|
||||||
fillArrowArrayWithDateTimeColumnData(nested_column, arrow_array, null_bytemap);
|
|
||||||
}
|
|
||||||
|
|
||||||
else if (isDecimal(column_nested_type))
|
|
||||||
{
|
|
||||||
auto fill_decimal = [&](const auto & types) -> bool
|
|
||||||
{
|
|
||||||
using Types = std::decay_t<decltype(types)>;
|
|
||||||
using ToDataType = typename Types::LeftType;
|
|
||||||
if constexpr (
|
|
||||||
std::is_same_v<
|
|
||||||
ToDataType,
|
|
||||||
DataTypeDecimal<
|
|
||||||
Decimal32>> || std::is_same_v<ToDataType, DataTypeDecimal<Decimal64>> || std::is_same_v<ToDataType, DataTypeDecimal<Decimal128>>)
|
|
||||||
{
|
|
||||||
const auto & decimal_type = static_cast<const ToDataType *>(column_nested_type.get());
|
|
||||||
fillArrowArrayWithDecimalColumnData(nested_column, arrow_array, null_bytemap, decimal_type);
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
};
|
|
||||||
callOnIndexAndDataType<void>(column_nested_type->getTypeId(), fill_decimal);
|
|
||||||
}
|
|
||||||
#define DISPATCH(CPP_NUMERIC_TYPE, ARROW_BUILDER_TYPE) \
|
|
||||||
else if (#CPP_NUMERIC_TYPE == column_nested_type_name) \
|
|
||||||
{ \
|
|
||||||
fillArrowArrayWithNumericColumnData<CPP_NUMERIC_TYPE, ARROW_BUILDER_TYPE>(nested_column, arrow_array, null_bytemap); \
|
|
||||||
}
|
|
||||||
|
|
||||||
FOR_INTERNAL_NUMERIC_TYPES(DISPATCH)
|
|
||||||
#undef DISPATCH
|
|
||||||
else
|
|
||||||
{
|
|
||||||
throw Exception{"Internal type \"" + column_nested_type_name + "\" of a column \"" + column.name
|
|
||||||
+ "\""
|
|
||||||
" is not supported for conversion into a Parquet data format",
|
|
||||||
ErrorCodes::UNKNOWN_TYPE};
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
arrow_arrays.emplace_back(std::move(arrow_array));
|
|
||||||
}
|
|
||||||
|
|
||||||
std::shared_ptr<arrow::Schema> arrow_schema = std::make_shared<arrow::Schema>(std::move(arrow_fields));
|
|
||||||
|
|
||||||
std::shared_ptr<arrow::Table> arrow_table = arrow::Table::Make(arrow_schema, arrow_arrays);
|
|
||||||
|
|
||||||
auto sink = std::make_shared<OstreamOutputStream>(out);
|
|
||||||
|
|
||||||
if (!file_writer)
|
if (!file_writer)
|
||||||
{
|
{
|
||||||
|
auto sink = std::make_shared<ArrowBufferedOutputStream>(out);
|
||||||
|
|
||||||
parquet::WriterProperties::Builder builder;
|
parquet::WriterProperties::Builder builder;
|
||||||
#if USE_SNAPPY
|
#if USE_SNAPPY
|
||||||
@ -448,7 +74,6 @@ void ParquetBlockOutputFormat::finalize()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void registerOutputFormatProcessorParquet(FormatFactory & factory)
|
void registerOutputFormatProcessorParquet(FormatFactory & factory)
|
||||||
{
|
{
|
||||||
factory.registerOutputFormatProcessor(
|
factory.registerOutputFormatProcessor(
|
||||||
@ -468,7 +93,6 @@ void registerOutputFormatProcessorParquet(FormatFactory & factory)
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
#else
|
#else
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
@ -479,5 +103,4 @@ void registerOutputFormatProcessorParquet(FormatFactory &)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
@ -813,6 +813,14 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
|
|||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static ExpressionActionsPtr createProjection(const Pipe & pipe, const MergeTreeData & data)
|
||||||
|
{
|
||||||
|
const auto & header = pipe.getHeader();
|
||||||
|
auto projection = std::make_shared<ExpressionActions>(header.getNamesAndTypesList(), data.global_context);
|
||||||
|
projection->add(ExpressionAction::project(header.getNames()));
|
||||||
|
return projection;
|
||||||
|
}
|
||||||
|
|
||||||
Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder(
|
Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder(
|
||||||
RangesInDataParts && parts,
|
RangesInDataParts && parts,
|
||||||
size_t num_streams,
|
size_t num_streams,
|
||||||
@ -999,13 +1007,19 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder(
|
|||||||
sort_description.emplace_back(data.sorting_key_columns[j],
|
sort_description.emplace_back(data.sorting_key_columns[j],
|
||||||
input_sorting_info->direction, 1);
|
input_sorting_info->direction, 1);
|
||||||
|
|
||||||
|
/// Project input columns to drop columns from sorting_key_prefix_expr
|
||||||
|
/// to allow execute the same expression later.
|
||||||
|
/// NOTE: It may lead to double computation of expression.
|
||||||
|
auto projection = createProjection(pipes.back(), data);
|
||||||
for (auto & pipe : pipes)
|
for (auto & pipe : pipes)
|
||||||
pipe.addSimpleTransform(std::make_shared<ExpressionTransform>(pipe.getHeader(), sorting_key_prefix_expr));
|
pipe.addSimpleTransform(std::make_shared<ExpressionTransform>(pipe.getHeader(), sorting_key_prefix_expr));
|
||||||
|
|
||||||
auto merging_sorted = std::make_shared<MergingSortedTransform>(
|
auto merging_sorted = std::make_shared<MergingSortedTransform>(
|
||||||
pipes.back().getHeader(), pipes.size(), sort_description, max_block_size);
|
pipes.back().getHeader(), pipes.size(), sort_description, max_block_size);
|
||||||
|
|
||||||
res.emplace_back(std::move(pipes), std::move(merging_sorted));
|
Pipe merged(std::move(pipes), std::move(merging_sorted));
|
||||||
|
merged.addSimpleTransform(std::make_shared<ExpressionTransform>(merged.getHeader(), projection));
|
||||||
|
res.emplace_back(std::move(merged));
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
res.emplace_back(std::move(pipes.front()));
|
res.emplace_back(std::move(pipes.front()));
|
||||||
@ -1051,6 +1065,10 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
|
|||||||
use_uncompressed_cache = false;
|
use_uncompressed_cache = false;
|
||||||
|
|
||||||
Pipes pipes;
|
Pipes pipes;
|
||||||
|
/// Project input columns to drop columns from sorting_key_expr
|
||||||
|
/// to allow execute the same expression later.
|
||||||
|
/// NOTE: It may lead to double computation of expression.
|
||||||
|
ExpressionActionsPtr projection;
|
||||||
|
|
||||||
for (const auto & part : parts)
|
for (const auto & part : parts)
|
||||||
{
|
{
|
||||||
@ -1061,6 +1079,9 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
|
|||||||
virt_columns, part.part_index_in_query);
|
virt_columns, part.part_index_in_query);
|
||||||
|
|
||||||
Pipe pipe(std::move(source_processor));
|
Pipe pipe(std::move(source_processor));
|
||||||
|
if (!projection)
|
||||||
|
projection = createProjection(pipe, data);
|
||||||
|
|
||||||
pipe.addSimpleTransform(std::make_shared<ExpressionTransform>(pipe.getHeader(), data.sorting_key_expr));
|
pipe.addSimpleTransform(std::make_shared<ExpressionTransform>(pipe.getHeader(), data.sorting_key_expr));
|
||||||
pipes.emplace_back(std::move(pipe));
|
pipes.emplace_back(std::move(pipe));
|
||||||
}
|
}
|
||||||
@ -1133,6 +1154,7 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
|
|||||||
if (merged_processor)
|
if (merged_processor)
|
||||||
{
|
{
|
||||||
Pipe pipe(std::move(pipes), std::move(merged_processor));
|
Pipe pipe(std::move(pipes), std::move(merged_processor));
|
||||||
|
pipe.addSimpleTransform(std::make_shared<ExpressionTransform>(pipe.getHeader(), projection));
|
||||||
pipes = Pipes();
|
pipes = Pipes();
|
||||||
pipes.emplace_back(std::move(pipe));
|
pipes.emplace_back(std::move(pipe));
|
||||||
}
|
}
|
||||||
|
@ -106,7 +106,10 @@ void StorageMergeTree::shutdown()
|
|||||||
shutdown_called = true;
|
shutdown_called = true;
|
||||||
|
|
||||||
/// Unlock all waiting mutations
|
/// Unlock all waiting mutations
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(mutation_wait_mutex);
|
||||||
mutation_wait_event.notify_all();
|
mutation_wait_event.notify_all();
|
||||||
|
}
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
@ -499,7 +502,10 @@ CancellationCode StorageMergeTree::killMutation(const String & mutation_id)
|
|||||||
global_context.getMergeList().cancelPartMutations({}, to_kill->block_number);
|
global_context.getMergeList().cancelPartMutations({}, to_kill->block_number);
|
||||||
to_kill->removeFile();
|
to_kill->removeFile();
|
||||||
LOG_TRACE(log, "Cancelled part mutations and removed mutation file " << mutation_id);
|
LOG_TRACE(log, "Cancelled part mutations and removed mutation file " << mutation_id);
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(mutation_wait_mutex);
|
||||||
mutation_wait_event.notify_all();
|
mutation_wait_event.notify_all();
|
||||||
|
}
|
||||||
|
|
||||||
/// Maybe there is another mutation that was blocked by the killed one. Try to execute it immediately.
|
/// Maybe there is another mutation that was blocked by the killed one. Try to execute it immediately.
|
||||||
merging_mutating_task_handle->wake();
|
merging_mutating_task_handle->wake();
|
||||||
@ -768,8 +774,11 @@ bool StorageMergeTree::tryMutatePart()
|
|||||||
write_part_log({});
|
write_part_log({});
|
||||||
|
|
||||||
/// Notify all, who wait for this or previous mutations
|
/// Notify all, who wait for this or previous mutations
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(mutation_wait_mutex);
|
||||||
mutation_wait_event.notify_all();
|
mutation_wait_event.notify_all();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
catch (...)
|
catch (...)
|
||||||
{
|
{
|
||||||
tagger->exception_message = getCurrentExceptionMessage(false);
|
tagger->exception_message = getCurrentExceptionMessage(false);
|
||||||
|
@ -30,6 +30,7 @@ with client(name='client1>', log=log) as client1, client(name='client2>', log=lo
|
|||||||
client1.send('CREATE LIVE VIEW test.lv AS SELECT toStartOfDay(time) AS day, location, avg(temperature) FROM test.mt GROUP BY day, location ORDER BY day, location')
|
client1.send('CREATE LIVE VIEW test.lv AS SELECT toStartOfDay(time) AS day, location, avg(temperature) FROM test.mt GROUP BY day, location ORDER BY day, location')
|
||||||
client1.expect(prompt)
|
client1.expect(prompt)
|
||||||
client1.send('WATCH test.lv FORMAT CSV')
|
client1.send('WATCH test.lv FORMAT CSV')
|
||||||
|
client1.expect(r'0.*1' + end_of_block)
|
||||||
client2.send("INSERT INTO test.mt VALUES ('2019-01-01 00:00:00','New York',60),('2019-01-01 00:10:00','New York',70)")
|
client2.send("INSERT INTO test.mt VALUES ('2019-01-01 00:00:00','New York',60),('2019-01-01 00:10:00','New York',70)")
|
||||||
client2.expect(prompt)
|
client2.expect(prompt)
|
||||||
client1.expect(r'"2019-01-01 00:00:00","New York",65,2')
|
client1.expect(r'"2019-01-01 00:00:00","New York",65,2')
|
8
tests/queries/0_stateless/01137_order_by_func.reference
Normal file
8
tests/queries/0_stateless/01137_order_by_func.reference
Normal file
@ -0,0 +1,8 @@
|
|||||||
|
2020-05-05 01:00:00 0
|
||||||
|
2020-05-05 01:00:00 1
|
||||||
|
2020-05-05 01:00:00 2
|
||||||
|
2020-05-05 01:00:00 3
|
||||||
|
2020-05-05 01:00:00 4
|
||||||
|
111 9999999 9999999
|
||||||
|
111 9999998 9999998
|
||||||
|
111 9999997 9999997
|
25
tests/queries/0_stateless/01137_order_by_func.sql
Normal file
25
tests/queries/0_stateless/01137_order_by_func.sql
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
DROP TABLE IF EXISTS pk_func;
|
||||||
|
CREATE TABLE pk_func(d DateTime, ui UInt32) ENGINE = MergeTree ORDER BY toDate(d);
|
||||||
|
|
||||||
|
INSERT INTO pk_func SELECT '2020-05-05 01:00:00', number FROM numbers(1000000);
|
||||||
|
INSERT INTO pk_func SELECT '2020-05-06 01:00:00', number FROM numbers(1000000);
|
||||||
|
INSERT INTO pk_func SELECT '2020-05-07 01:00:00', number FROM numbers(1000000);
|
||||||
|
|
||||||
|
SELECT * FROM pk_func ORDER BY toDate(d), ui LIMIT 5;
|
||||||
|
|
||||||
|
DROP TABLE pk_func;
|
||||||
|
|
||||||
|
DROP TABLE IF EXISTS nORX;
|
||||||
|
CREATE TABLE nORX (`A` Int64, `B` Int64, `V` Int64) ENGINE = MergeTree ORDER BY (A, negate(B));
|
||||||
|
INSERT INTO nORX SELECT 111, number, number FROM numbers(10000000);
|
||||||
|
|
||||||
|
SELECT *
|
||||||
|
FROM nORX
|
||||||
|
WHERE B >= 1000
|
||||||
|
ORDER BY
|
||||||
|
A ASC,
|
||||||
|
-B ASC
|
||||||
|
LIMIT 3
|
||||||
|
SETTINGS max_threads = 1;
|
||||||
|
|
||||||
|
DROP TABLE nORX;
|
@ -0,0 +1,3 @@
|
|||||||
|
2020-05-05 704982704
|
||||||
|
2020-05-06 704982704
|
||||||
|
2020-05-07 704982704
|
10
tests/queries/0_stateless/01137_order_by_func_final.sql
Normal file
10
tests/queries/0_stateless/01137_order_by_func_final.sql
Normal file
@ -0,0 +1,10 @@
|
|||||||
|
DROP TABLE IF EXISTS pk_func;
|
||||||
|
CREATE TABLE pk_func(d DateTime, ui UInt32) ENGINE = SummingMergeTree ORDER BY toDate(d);
|
||||||
|
|
||||||
|
INSERT INTO pk_func SELECT '2020-05-05 01:00:00', number FROM numbers(100000);
|
||||||
|
INSERT INTO pk_func SELECT '2020-05-06 01:00:00', number FROM numbers(100000);
|
||||||
|
INSERT INTO pk_func SELECT '2020-05-07 01:00:00', number FROM numbers(100000);
|
||||||
|
|
||||||
|
SELECT toDate(d), ui FROM pk_func FINAL;
|
||||||
|
|
||||||
|
DROP TABLE pk_func;
|
62
tests/queries/0_stateless/01273_arrow.reference
Normal file
62
tests/queries/0_stateless/01273_arrow.reference
Normal file
@ -0,0 +1,62 @@
|
|||||||
|
9999
|
||||||
|
9998
|
||||||
|
9997
|
||||||
|
9996
|
||||||
|
9995
|
||||||
|
9994
|
||||||
|
9993
|
||||||
|
9992
|
||||||
|
9991
|
||||||
|
9990
|
||||||
|
99999
|
||||||
|
99998
|
||||||
|
99997
|
||||||
|
99996
|
||||||
|
99995
|
||||||
|
99994
|
||||||
|
99993
|
||||||
|
99992
|
||||||
|
99991
|
||||||
|
99990
|
||||||
|
2
|
||||||
|
1
|
||||||
|
0
|
||||||
|
999
|
||||||
|
998
|
||||||
|
997
|
||||||
|
996
|
||||||
|
995
|
||||||
|
994
|
||||||
|
993
|
||||||
|
992
|
||||||
|
991
|
||||||
|
990
|
||||||
|
ContextLock Number of times the lock of Context was acquired or tried to acquire. This is global lock.
|
||||||
|
Query Number of queries to be interpreted and potentially executed. Does not include queries that failed to parse or were rejected due to AST size limits, quota limits or limits on the number of simultaneously running queries. May include internal queries initiated by ClickHouse itself. Does not count subqueries.
|
||||||
|
original:
|
||||||
|
-128 0 -32768 0 -2147483648 0 -9223372036854775808 0 -1.032 -1.064 string-1 fixedstring-1\0\0 2003-04-05 2003-02-03 04:05:06
|
||||||
|
-108 108 -1016 1116 -1032 1132 -1064 1164 -1.032 -1.064 string-0 fixedstring\0\0\0\0 2001-02-03 2002-02-03 04:05:06
|
||||||
|
127 255 32767 65535 2147483647 4294967295 9223372036854775807 9223372036854775807 -1.032 -1.064 string-2 fixedstring-2\0\0 2004-06-07 2004-02-03 04:05:06
|
||||||
|
converted:
|
||||||
|
-128 0 -32768 0 -2147483648 0 -9223372036854775808 0 -1.032 -1.064 string-1 fixedstring-1\0\0 2003-04-05 2003-02-03 04:05:06
|
||||||
|
-108 108 -1016 1116 -1032 1132 -1064 1164 -1.032 -1.064 string-0 fixedstring\0\0\0\0 2001-02-03 2002-02-03 04:05:06
|
||||||
|
127 255 32767 65535 2147483647 4294967295 9223372036854775807 9223372036854775807 -1.032 -1.064 string-2 fixedstring-2\0\0 2004-06-07 2004-02-03 04:05:06
|
||||||
|
diff:
|
||||||
|
dest:
|
||||||
|
79 81 82 83 84 85 86 87 88 89 str01\0\0\0\0\0\0\0\0\0\0 fstr1\0\0\0\0\0\0\0\0\0\0 2003-03-04 1970-01-01 06:29:04
|
||||||
|
80 81 82 83 84 85 86 87 88 89 str02 fstr2\0\0\0\0\0\0\0\0\0\0 2005-03-04 2006-08-09 10:11:12
|
||||||
|
min:
|
||||||
|
-128 0 0 0 0 0 0 0 -1 -1 string-1\0\0\0\0\0\0\0 fixedstring-1\0\0 2003-04-05 2003-02-03
|
||||||
|
-108 108 8 92 -8 108 -40 -116 -1 -1 string-0\0\0\0\0\0\0\0 fixedstring\0\0\0\0 2001-02-03 2002-02-03
|
||||||
|
79 81 82 83 84 85 86 87 88 89 str01\0\0\0\0\0\0\0\0\0\0 fstr1\0\0\0\0\0\0\0\0\0\0 2003-03-04 2004-05-06
|
||||||
|
127 -1 -1 -1 -1 -1 -1 -1 -1 -1 string-2\0\0\0\0\0\0\0 fixedstring-2\0\0 2004-06-07 2004-02-03
|
||||||
|
max:
|
||||||
|
-128 0 -32768 0 -2147483648 0 -9223372036854775808 0 -1 -1 string-1 fixedstring-1\0\0 1970-01-01 06:22:27 2003-02-03 04:05:06
|
||||||
|
-108 108 -1016 1116 -1032 1132 -1064 1164 -1 -1 string-0 fixedstring\0\0\0\0 1970-01-01 06:09:16 2002-02-03 04:05:06
|
||||||
|
80 81 82 83 84 85 86 87 88 89 str02 fstr2 2005-03-04 05:06:07 2006-08-09 10:11:12
|
||||||
|
127 255 32767 65535 2147483647 4294967295 9223372036854775807 9223372036854775807 -1 -1 string-2 fixedstring-2\0\0 1970-01-01 06:29:36 2004-02-03 04:05:06
|
||||||
|
dest from null:
|
||||||
|
-128 0 -32768 0 -2147483648 0 -9223372036854775808 0 -1.032 -1.064 string-1 fixedstring-1\0\0 2003-04-05 2003-02-03 04:05:06
|
||||||
|
-108 108 -1016 1116 -1032 1132 -1064 1164 -1.032 -1.064 string-0 fixedstring\0\0\0\0 2001-02-03 2002-02-03 04:05:06
|
||||||
|
127 255 32767 65535 2147483647 4294967295 9223372036854775807 9223372036854775807 -1.032 -1.064 string-2 fixedstring-2\0\0 2004-06-07 2004-02-03 04:05:06
|
||||||
|
\N \N \N \N \N \N \N \N \N \N \N \N \N \N
|
115
tests/queries/0_stateless/01273_arrow.sh
Executable file
115
tests/queries/0_stateless/01273_arrow.sh
Executable file
@ -0,0 +1,115 @@
|
|||||||
|
#!/usr/bin/env bash
|
||||||
|
|
||||||
|
set -e
|
||||||
|
|
||||||
|
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||||
|
. $CUR_DIR/../shell_config.sh
|
||||||
|
|
||||||
|
|
||||||
|
${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS contributors"
|
||||||
|
${CLICKHOUSE_CLIENT} --query="CREATE TABLE contributors (name String) ENGINE = Memory"
|
||||||
|
${CLICKHOUSE_CLIENT} --query="SELECT * FROM system.contributors ORDER BY name DESC FORMAT Arrow" | ${CLICKHOUSE_CLIENT} --query="INSERT INTO contributors FORMAT Arrow"
|
||||||
|
# random results
|
||||||
|
${CLICKHOUSE_CLIENT} --query="SELECT * FROM contributors LIMIT 10" > /dev/null
|
||||||
|
${CLICKHOUSE_CLIENT} --query="DROP TABLE contributors"
|
||||||
|
|
||||||
|
${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS arrow_numbers"
|
||||||
|
${CLICKHOUSE_CLIENT} --query="CREATE TABLE arrow_numbers (number UInt64) ENGINE = Memory"
|
||||||
|
# less than default block size (65k)
|
||||||
|
${CLICKHOUSE_CLIENT} --query="SELECT * FROM system.numbers LIMIT 10000 FORMAT Arrow" | ${CLICKHOUSE_CLIENT} --query="INSERT INTO arrow_numbers FORMAT Arrow"
|
||||||
|
${CLICKHOUSE_CLIENT} --query="SELECT * FROM arrow_numbers ORDER BY number DESC LIMIT 10"
|
||||||
|
${CLICKHOUSE_CLIENT} --query="TRUNCATE TABLE arrow_numbers"
|
||||||
|
|
||||||
|
# More than default block size
|
||||||
|
${CLICKHOUSE_CLIENT} --query="SELECT * FROM system.numbers LIMIT 100000 FORMAT Arrow" | ${CLICKHOUSE_CLIENT} --query="INSERT INTO arrow_numbers FORMAT Arrow"
|
||||||
|
${CLICKHOUSE_CLIENT} --query="SELECT * FROM arrow_numbers ORDER BY number DESC LIMIT 10"
|
||||||
|
${CLICKHOUSE_CLIENT} --query="TRUNCATE TABLE arrow_numbers"
|
||||||
|
|
||||||
|
${CLICKHOUSE_CLIENT} --max_block_size=2 --query="SELECT * FROM system.numbers LIMIT 3 FORMAT Arrow" | ${CLICKHOUSE_CLIENT} --query="INSERT INTO arrow_numbers FORMAT Arrow"
|
||||||
|
${CLICKHOUSE_CLIENT} --query="SELECT * FROM arrow_numbers ORDER BY number DESC LIMIT 10"
|
||||||
|
|
||||||
|
${CLICKHOUSE_CLIENT} --query="TRUNCATE TABLE arrow_numbers"
|
||||||
|
${CLICKHOUSE_CLIENT} --max_block_size=1 --query="SELECT * FROM system.numbers LIMIT 1000 FORMAT Arrow" | ${CLICKHOUSE_CLIENT} --query="INSERT INTO arrow_numbers FORMAT Arrow"
|
||||||
|
${CLICKHOUSE_CLIENT} --query="SELECT * FROM arrow_numbers ORDER BY number DESC LIMIT 10"
|
||||||
|
|
||||||
|
${CLICKHOUSE_CLIENT} --query="DROP TABLE arrow_numbers"
|
||||||
|
|
||||||
|
${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS arrow_events"
|
||||||
|
${CLICKHOUSE_CLIENT} --query="CREATE TABLE arrow_events (event String, value UInt64, description String) ENGINE = Memory"
|
||||||
|
${CLICKHOUSE_CLIENT} --query="SELECT * FROM system.events FORMAT Arrow" | ${CLICKHOUSE_CLIENT} --query="INSERT INTO arrow_events FORMAT Arrow"
|
||||||
|
${CLICKHOUSE_CLIENT} --query="SELECT event, description FROM arrow_events WHERE event IN ('ContextLock', 'Query') ORDER BY event"
|
||||||
|
${CLICKHOUSE_CLIENT} --query="DROP TABLE arrow_events"
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS arrow_types1"
|
||||||
|
${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS arrow_types2"
|
||||||
|
${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS arrow_types3"
|
||||||
|
${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS arrow_types4"
|
||||||
|
${CLICKHOUSE_CLIENT} --query="CREATE TABLE arrow_types1 (int8 Int8, uint8 UInt8, int16 Int16, uint16 UInt16, int32 Int32, uint32 UInt32, int64 Int64, uint64 UInt64, float32 Float32, float64 Float64, string String, fixedstring FixedString(15), date Date, datetime DateTime) ENGINE = Memory"
|
||||||
|
${CLICKHOUSE_CLIENT} --query="CREATE TABLE arrow_types2 (int8 Int8, uint8 UInt8, int16 Int16, uint16 UInt16, int32 Int32, uint32 UInt32, int64 Int64, uint64 UInt64, float32 Float32, float64 Float64, string String, fixedstring FixedString(15), date Date, datetime DateTime) ENGINE = Memory"
|
||||||
|
# convert min type
|
||||||
|
${CLICKHOUSE_CLIENT} --query="CREATE TABLE arrow_types3 (int8 Int8, uint8 Int8, int16 Int8, uint16 Int8, int32 Int8, uint32 Int8, int64 Int8, uint64 Int8, float32 Int8, float64 Int8, string FixedString(15), fixedstring FixedString(15), date Date, datetime Date) ENGINE = Memory"
|
||||||
|
# convert max type
|
||||||
|
${CLICKHOUSE_CLIENT} --query="CREATE TABLE arrow_types4 (int8 Int64, uint8 Int64, int16 Int64, uint16 Int64, int32 Int64, uint32 Int64, int64 Int64, uint64 Int64, float32 Int64, float64 Int64, string String, fixedstring String, date DateTime, datetime DateTime) ENGINE = Memory"
|
||||||
|
|
||||||
|
${CLICKHOUSE_CLIENT} --query="INSERT INTO arrow_types1 values ( -108, 108, -1016, 1116, -1032, 1132, -1064, 1164, -1.032, -1.064, 'string-0', 'fixedstring', '2001-02-03', '2002-02-03 04:05:06')"
|
||||||
|
|
||||||
|
# min
|
||||||
|
${CLICKHOUSE_CLIENT} --query="INSERT INTO arrow_types1 values ( -128, 0, -32768, 0, -2147483648, 0, -9223372036854775808, 0, -1.032, -1.064, 'string-1', 'fixedstring-1', '2003-04-05', '2003-02-03 04:05:06')"
|
||||||
|
|
||||||
|
# max
|
||||||
|
${CLICKHOUSE_CLIENT} --query="INSERT INTO arrow_types1 values ( 127, 255, 32767, 65535, 2147483647, 4294967295, 9223372036854775807, 9223372036854775807, -1.032, -1.064, 'string-2', 'fixedstring-2', '2004-06-07', '2004-02-03 04:05:06')"
|
||||||
|
|
||||||
|
# 'SELECT -127,-128,-129,126,127,128,255,256,257,-32767,-32768,-32769,32766,32767,32768,65535,65536,65537, -2147483647,-2147483648,-2147483649,2147483646,2147483647,2147483648,4294967295,4294967296,4294967297, -9223372036854775807,-9223372036854775808,9223372036854775806,9223372036854775807,9223372036854775808,18446744073709551615';
|
||||||
|
|
||||||
|
${CLICKHOUSE_CLIENT} --query="SELECT * FROM arrow_types1 FORMAT Arrow" | ${CLICKHOUSE_CLIENT} --query="INSERT INTO arrow_types2 FORMAT Arrow"
|
||||||
|
|
||||||
|
echo original:
|
||||||
|
${CLICKHOUSE_CLIENT} --query="SELECT * FROM arrow_types1 ORDER BY int8" | tee ${CLICKHOUSE_TMP}/arrow_all_types_1.dump
|
||||||
|
echo converted:
|
||||||
|
${CLICKHOUSE_CLIENT} --query="SELECT * FROM arrow_types2 ORDER BY int8" | tee ${CLICKHOUSE_TMP}/arrow_all_types_2.dump
|
||||||
|
${CLICKHOUSE_CLIENT} --query="SELECT * FROM arrow_types1 ORDER BY int8 FORMAT Arrow" > ${CLICKHOUSE_TMP}/arrow_all_types_1.arrow
|
||||||
|
${CLICKHOUSE_CLIENT} --query="SELECT * FROM arrow_types2 ORDER BY int8 FORMAT Arrow" > ${CLICKHOUSE_TMP}/arrow_all_types_2.arrow
|
||||||
|
echo diff:
|
||||||
|
diff ${CLICKHOUSE_TMP}/arrow_all_types_1.dump ${CLICKHOUSE_TMP}/arrow_all_types_2.dump
|
||||||
|
|
||||||
|
${CLICKHOUSE_CLIENT} --query="TRUNCATE TABLE arrow_types2"
|
||||||
|
${CLICKHOUSE_CLIENT} --query="INSERT INTO arrow_types3 values ( 79, 81, 82, 83, 84, 85, 86, 87, 88, 89, 'str01', 'fstr1', '2003-03-04', '2004-05-06')"
|
||||||
|
${CLICKHOUSE_CLIENT} --query="SELECT * FROM arrow_types3 ORDER BY int8 FORMAT Arrow" | ${CLICKHOUSE_CLIENT} --query="INSERT INTO arrow_types2 FORMAT Arrow"
|
||||||
|
${CLICKHOUSE_CLIENT} --query="SELECT * FROM arrow_types1 ORDER BY int8 FORMAT Arrow" | ${CLICKHOUSE_CLIENT} --query="INSERT INTO arrow_types3 FORMAT Arrow"
|
||||||
|
|
||||||
|
${CLICKHOUSE_CLIENT} --query="INSERT INTO arrow_types4 values ( 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 'str02', 'fstr2', '2005-03-04 05:06:07', '2006-08-09 10:11:12')"
|
||||||
|
${CLICKHOUSE_CLIENT} --query="SELECT * FROM arrow_types4 ORDER BY int8 FORMAT Arrow" | ${CLICKHOUSE_CLIENT} --query="INSERT INTO arrow_types2 FORMAT Arrow"
|
||||||
|
${CLICKHOUSE_CLIENT} --query="SELECT * FROM arrow_types1 ORDER BY int8 FORMAT Arrow" | ${CLICKHOUSE_CLIENT} --query="INSERT INTO arrow_types4 FORMAT Arrow"
|
||||||
|
|
||||||
|
echo dest:
|
||||||
|
${CLICKHOUSE_CLIENT} --query="SELECT * FROM arrow_types2 ORDER BY int8"
|
||||||
|
echo min:
|
||||||
|
${CLICKHOUSE_CLIENT} --query="SELECT * FROM arrow_types3 ORDER BY int8"
|
||||||
|
echo max:
|
||||||
|
${CLICKHOUSE_CLIENT} --query="SELECT * FROM arrow_types4 ORDER BY int8"
|
||||||
|
|
||||||
|
|
||||||
|
${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS arrow_types5"
|
||||||
|
${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS arrow_types6"
|
||||||
|
${CLICKHOUSE_CLIENT} --query="TRUNCATE TABLE arrow_types2"
|
||||||
|
${CLICKHOUSE_CLIENT} --query="CREATE TABLE arrow_types5 (int8 Nullable(Int8), uint8 Nullable(UInt8), int16 Nullable(Int16), uint16 Nullable(UInt16), int32 Nullable(Int32), uint32 Nullable(UInt32), int64 Nullable(Int64), uint64 Nullable(UInt64), float32 Nullable(Float32), float64 Nullable(Float64), string Nullable(String), fixedstring Nullable(FixedString(15)), date Nullable(Date), datetime Nullable(DateTime)) ENGINE = Memory"
|
||||||
|
${CLICKHOUSE_CLIENT} --query="CREATE TABLE arrow_types6 (int8 Nullable(Int8), uint8 Nullable(UInt8), int16 Nullable(Int16), uint16 Nullable(UInt16), int32 Nullable(Int32), uint32 Nullable(UInt32), int64 Nullable(Int64), uint64 Nullable(UInt64), float32 Nullable(Float32), float64 Nullable(Float64), string Nullable(String), fixedstring Nullable(FixedString(15)), date Nullable(Date), datetime Nullable(DateTime)) ENGINE = Memory"
|
||||||
|
${CLICKHOUSE_CLIENT} --query="INSERT INTO arrow_types5 values ( NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL)"
|
||||||
|
${CLICKHOUSE_CLIENT} --query="SELECT * FROM arrow_types5 ORDER BY int8 FORMAT Arrow" > ${CLICKHOUSE_TMP}/arrow_all_types_5.arrow
|
||||||
|
#${CLICKHOUSE_CLIENT} --query="SELECT * FROM arrow_types5 FORMAT Arrow" | ${CLICKHOUSE_CLIENT} --query="INSERT INTO arrow_types6 FORMAT Arrow"
|
||||||
|
${CLICKHOUSE_CLIENT} --query="SELECT * FROM arrow_types5 ORDER BY int8 FORMAT Arrow" | ${CLICKHOUSE_CLIENT} --query="INSERT INTO arrow_types6 FORMAT Arrow"
|
||||||
|
${CLICKHOUSE_CLIENT} --query="SELECT * FROM arrow_types1 ORDER BY int8 FORMAT Arrow" | ${CLICKHOUSE_CLIENT} --query="INSERT INTO arrow_types6 FORMAT Arrow"
|
||||||
|
echo dest from null:
|
||||||
|
${CLICKHOUSE_CLIENT} --query="SELECT * FROM arrow_types6 ORDER BY int8"
|
||||||
|
|
||||||
|
${CLICKHOUSE_CLIENT} --query="DROP TABLE arrow_types5"
|
||||||
|
${CLICKHOUSE_CLIENT} --query="DROP TABLE arrow_types6"
|
||||||
|
|
||||||
|
|
||||||
|
${CLICKHOUSE_CLIENT} --query="DROP TABLE arrow_types1"
|
||||||
|
${CLICKHOUSE_CLIENT} --query="DROP TABLE arrow_types2"
|
||||||
|
${CLICKHOUSE_CLIENT} --query="DROP TABLE arrow_types3"
|
||||||
|
${CLICKHOUSE_CLIENT} --query="DROP TABLE arrow_types4"
|
||||||
|
|
4
tests/queries/0_stateless/01273_arrow_load.reference
Normal file
4
tests/queries/0_stateless/01273_arrow_load.reference
Normal file
@ -0,0 +1,4 @@
|
|||||||
|
0 127 32767 2147483647 9223372036854775807 0 0 0 0 2.6480716e36 1e-45 5e-324 Hello 2010-10-10 2020-05-05 03:36:28 2020-05-05 03:36:28
|
||||||
|
1 -128 -32768 -2147483648 -9223372036854775808 255 65535 4294967295 18446744073709551615 0 3.4028235e38 1.7976931348623157e308 World 2011-11-11 2020-04-04 03:00:00 2020-04-04 03:00:00
|
||||||
|
0 127 32767 2147483647 9223372036854775807 0 0 0 0 2.6480716e36 1e-45 5e-324 Hello 2010-10-10 2020-05-05 03:36:28 2020-05-05 03:36:28
|
||||||
|
1 -128 -32768 -2147483648 -9223372036854775808 255 65535 4294967295 18446744073709551615 0 3.4028235e38 1.7976931348623157e308 World 2011-11-11 2020-04-04 03:00:00 2020-04-04 03:00:00
|
17
tests/queries/0_stateless/01273_arrow_load.sh
Executable file
17
tests/queries/0_stateless/01273_arrow_load.sh
Executable file
@ -0,0 +1,17 @@
|
|||||||
|
#!/usr/bin/env bash
|
||||||
|
|
||||||
|
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||||
|
. $CUR_DIR/../shell_config.sh
|
||||||
|
|
||||||
|
CB_DIR=$(dirname "$CLICKHOUSE_CLIENT_BINARY")
|
||||||
|
[ "$CB_DIR" == "." ] && ROOT_DIR=$CUR_DIR/../../../..
|
||||||
|
[ "$CB_DIR" != "." ] && BUILD_DIR=$CB_DIR/../..
|
||||||
|
[ -z "$ROOT_DIR" ] && ROOT_DIR=$CB_DIR/../../..
|
||||||
|
|
||||||
|
DATA_FILE=$CUR_DIR/data_arrow/test.arrow
|
||||||
|
|
||||||
|
${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS arrow_load"
|
||||||
|
${CLICKHOUSE_CLIENT} --query="CREATE TABLE arrow_load (bool UInt8, int8 Int8, int16 Int16, int32 Int32, int64 Int64, uint8 UInt8, uint16 UInt16, uint32 UInt32, uint64 UInt64, halffloat Float32, float Float32, double Float64, string String, date32 Date, date64 DateTime, timestamp DateTime) ENGINE = Memory"
|
||||||
|
cat $DATA_FILE | ${CLICKHOUSE_CLIENT} -q "insert into arrow_load format Arrow"
|
||||||
|
${CLICKHOUSE_CLIENT} --query="select * from arrow_load"
|
||||||
|
|
BIN
tests/queries/0_stateless/data_arrow/test.arrow
Normal file
BIN
tests/queries/0_stateless/data_arrow/test.arrow
Normal file
Binary file not shown.
@ -39,8 +39,8 @@ class ExpectTimeoutError(Exception):
|
|||||||
if self.pattern:
|
if self.pattern:
|
||||||
s += 'for %s ' % repr(self.pattern.pattern)
|
s += 'for %s ' % repr(self.pattern.pattern)
|
||||||
if self.buffer:
|
if self.buffer:
|
||||||
s += 'buffer %s ' % repr(self.buffer[:])
|
s += 'buffer %s' % repr(self.buffer[:])
|
||||||
s += 'or \'%s\'' % ','.join(['%x' % ord(c) for c in self.buffer[:]])
|
#s += ' or \'%s\'' % ','.join(['%x' % ord(c) for c in self.buffer[:]])
|
||||||
return s
|
return s
|
||||||
|
|
||||||
class IO(object):
|
class IO(object):
|
||||||
|
Loading…
Reference in New Issue
Block a user