Reduce memory usage for some formats

kssenii 2021-10-31 22:53:24 +03:00
parent 2940d9fd19
commit 45ea820297
27 changed files with 418 additions and 132 deletions

View File

@ -594,6 +594,7 @@
M(624, BAD_FILE_TYPE) \
M(625, IO_SETUP_ERROR) \
M(626, HTTP_RANGE_NOT_SATISFIABLE) \
M(627, FILE_SIZE_UNKNOWN) \
\
M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \

View File

@ -261,6 +261,8 @@
M(RemoteFSUnprefetchedReads, "Number of reads from unprefetched buffer") \
M(RemoteFSBuffers, "Number of buffers created for asynchronous reading from remote filesystem") \
\
M(ReadBufferSeekCancelConnection, "Number of seeks which lead to a new connection (s3, http)") \
\
M(SleepFunctionCalls, "Number of times a sleep function (sleep, sleepEachRow) has been called.") \
M(SleepFunctionMicroseconds, "Time spent sleeping due to a sleep function call.") \
\

View File

@ -511,6 +511,7 @@ class IColumn;
M(Bool, local_filesystem_read_prefetch, false, "Should use prefetching when reading data from local filesystem.", 0) \
M(Bool, remote_filesystem_read_prefetch, true, "Should use prefetching when reading data from remote filesystem.", 0) \
M(Int64, read_priority, 0, "Priority to read data from local filesystem. Only supported for 'pread_threadpool' method.", 0) \
M(UInt64, remote_read_min_bytes_for_seek, DBMS_DEFAULT_BUFFER_SIZE, "Min bytes required for remote read (url, s3) to do a seek, instead of reading with ignore", 0) \
\
M(UInt64, async_insert_threads, 16, "Maximum number of threads to actually parse and insert data in background. Zero means asynchronous mode is disabled", 0) \
M(Bool, async_insert, false, "If true, data from INSERT query is stored in queue and later flushed to table in background. Makes sense only for inserts via HTTP protocol. If wait_for_async_insert is false, INSERT query is processed almost instantly, otherwise client will wait until data will be flushed to table", 0) \
@ -570,6 +571,7 @@ class IColumn;
M(Bool, input_format_arrow_import_nested, false, "Allow to insert array of structs into Nested table in Arrow input format.", 0) \
M(Bool, input_format_orc_import_nested, false, "Allow to insert array of structs into Nested table in ORC input format.", 0) \
M(Bool, input_format_parquet_import_nested, false, "Allow to insert array of structs into Nested table in Parquet input format.", 0) \
M(Bool, input_format_allow_seeks, true, "Allow seeks while reading in ORC/Parquet/Arrow input formats", 0) \
\
M(DateTimeInputFormat, date_time_input_format, FormatSettings::DateTimeInputFormat::Basic, "Method to read DateTime from text input formats. Possible values: 'basic' and 'best_effort'.", 0) \
M(DateTimeOutputFormat, date_time_output_format, FormatSettings::DateTimeOutputFormat::Simple, "Method to write DateTime to text output. Possible values: 'simple', 'iso', 'unix_timestamp'.", 0) \
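
The two new settings work together: input_format_allow_seeks lets the ORC/Parquet/Arrow readers seek inside remote data at all, and remote_read_min_bytes_for_seek decides whether a forward seek is cheaper to serve by discarding bytes from the open stream or by dropping the connection. A minimal self-contained sketch of that decision, mirroring the seek() implementations further down in this diff (the helper itself is illustrative, not part of the patch):

#include <cstddef>
#include <cstdint>

/// Returns true when a forward seek should be served by ignore() on the existing
/// stream, false when the connection should be cancelled and reopened at the new
/// offset (counted by the ReadBufferSeekCancelConnection profile event above).
bool shouldSkipInsteadOfReconnect(int64_t target_offset, int64_t current_position, size_t min_bytes_for_seek)
{
    if (target_offset <= current_position)
        return false;   /// Backward seeks are handled inside the working buffer, not here.
    return static_cast<size_t>(target_offset - current_position) < min_bytes_for_seek;
}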

View File

@ -88,6 +88,9 @@ void ReadBufferFromRemoteFSGather::initialize()
{
current_buf = createImplementationBuffer(file_path, read_until_position);
current_buf_idx = i;
if (auto * in = dynamic_cast<SeekableReadBufferWithSize *>(current_buf.get()))
in->setReadType(SeekableReadBufferWithSize::ReadType::DISK_READ);
}
current_buf->seek(current_buf_offset, SEEK_SET);

View File

@ -53,7 +53,7 @@ std::unique_ptr<ReadBuffer> ReadBufferFromWebServer::initialize()
}
else
{
range = { .begin = static_cast<size_t>(offset) };
range = { .begin = static_cast<size_t>(offset), .end = std::nullopt };
LOG_DEBUG(log, "Reading from offset: {}", offset);
}

View File

@ -112,6 +112,7 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
format_settings.arrow.import_nested = settings.input_format_arrow_import_nested;
format_settings.orc.import_nested = settings.input_format_orc_import_nested;
format_settings.capn_proto.enum_comparing_mode = settings.format_capn_proto_enum_comparising_mode;
format_settings.seekable_read = settings.input_format_allow_seeks;
/// Validate avro_schema_registry_url with RemoteHostFilter when non-empty and in Server context
if (format_settings.schema.is_server)

View File

@ -30,6 +30,8 @@ struct FormatSettings
bool null_as_default = true;
bool decimal_trailing_zeros = false;
bool seekable_read = true;
enum class DateTimeInputFormat
{
Basic, /// Default format for fast parsing: YYYY-MM-DD hh:mm:ss (ISO-8601 without fractional part and timezone) or NNNNNNNNNN unix timestamp.

View File

@ -8,6 +8,7 @@
#include <aws/s3/S3Client.h>
#include <aws/s3/model/GetObjectRequest.h>
#include <aws/s3/model/HeadObjectRequest.h>
#include <base/logger_useful.h>
#include <base/sleep.h>
@ -20,6 +21,7 @@ namespace ProfileEvents
extern const Event S3ReadMicroseconds;
extern const Event S3ReadBytes;
extern const Event S3ReadRequestsErrors;
extern const Event ReadBufferSeekCancelConnection;
}
namespace DB
@ -34,9 +36,14 @@ namespace ErrorCodes
ReadBufferFromS3::ReadBufferFromS3(
std::shared_ptr<Aws::S3::S3Client> client_ptr_, const String & bucket_, const String & key_,
UInt64 max_single_read_retries_, const ReadSettings & settings_, bool use_external_buffer_, size_t read_until_position_)
: SeekableReadBuffer(nullptr, 0)
std::shared_ptr<Aws::S3::S3Client> client_ptr_,
const String & bucket_,
const String & key_,
UInt64 max_single_read_retries_,
const ReadSettings & settings_,
bool use_external_buffer_,
size_t read_until_position_)
: SeekableReadBufferWithSize(nullptr, 0)
, client_ptr(std::move(client_ptr_))
, bucket(bucket_)
, key(key_)
@ -142,9 +149,12 @@ bool ReadBufferFromS3::nextImpl()
return true;
}
off_t ReadBufferFromS3::seek(off_t offset_, int whence)
{
if (impl)
bool restricted_seek = read_type == SeekableReadBufferWithSize::ReadType::DISK_READ;
if (impl && restricted_seek)
throw Exception("Seek is allowed only before first read attempt from the buffer.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
if (whence != SEEK_SET)
@ -153,11 +163,57 @@ off_t ReadBufferFromS3::seek(off_t offset_, int whence)
if (offset_ < 0)
throw Exception("Seek position is out of bounds. Offset: " + std::to_string(offset_), ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);
offset = offset_;
if (!restricted_seek)
{
if (!working_buffer.empty()
&& size_t(offset_) >= offset - working_buffer.size()
&& offset_ < offset)
{
pos = working_buffer.end() - (offset - offset_);
assert(pos >= working_buffer.begin());
assert(pos <= working_buffer.end());
return getPosition();
}
auto position = getPosition();
if (offset_ > position)
{
size_t diff = offset_ - position;
if (diff < read_settings.remote_read_min_bytes_for_seek)
{
ignore(diff);
return offset_;
}
}
pos = working_buffer.end();
if (impl)
{
ProfileEvents::increment(ProfileEvents::ReadBufferSeekCancelConnection);
impl.reset();
}
}
offset = offset_;
return offset;
}
std::optional<size_t> ReadBufferFromS3::getTotalSize()
{
if (file_size)
return file_size;
Aws::S3::Model::HeadObjectRequest request;
request.SetBucket(bucket);
request.SetKey(key);
auto outcome = client_ptr->HeadObject(request);
auto head_result = outcome.GetResultWithOwnership();
file_size = head_result.GetContentLength();
return file_size;
}
off_t ReadBufferFromS3::getPosition()
{
return offset - available();
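
Together with getTotalSize() above (one HeadObject request, cached in file_size), the relaxed seek lets format readers address an S3 object like a local file and download only the ranges they need. A hypothetical caller, assuming a ReadBufferFromS3 `buf` whose read_type is the default (not DISK_READ) and an assumed `footer_size` known to the format; the fragment is a sketch, not part of the patch:

/// Hedged sketch of the intended usage pattern.
std::optional<size_t> total = buf.getTotalSize();   /// single HeadObject, result cached in file_size
if (total && *total > footer_size)
{
    buf.seek(*total - footer_size, SEEK_SET);       /// far jump: the open connection is cancelled and reopened at the footer
    /// ... read only the footer/metadata from here instead of streaming the whole object ...
}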

View File

@ -23,7 +23,7 @@ namespace DB
/**
* Performs an S3 HTTP GET request and provides the response to read.
*/
class ReadBufferFromS3 : public SeekableReadBuffer
class ReadBufferFromS3 : public SeekableReadBufferWithSize
{
private:
std::shared_ptr<Aws::S3::S3Client> client_ptr;
@ -49,13 +49,18 @@ public:
bool nextImpl() override;
off_t seek(off_t off, int whence) override;
off_t getPosition() override;
std::optional<size_t> getTotalSize() override;
private:
std::unique_ptr<ReadBuffer> initialize();
ReadSettings read_settings;
bool use_external_buffer;
off_t read_until_position = 0;
};

View File

@ -77,6 +77,8 @@ struct ReadSettings
size_t remote_fs_read_max_backoff_ms = 10000;
size_t remote_fs_read_backoff_max_tries = 4;
size_t remote_read_min_bytes_for_seek = DBMS_DEFAULT_BUFFER_SIZE;
size_t http_max_tries = 1;
size_t http_retry_initial_backoff_ms = 100;
size_t http_retry_max_backoff_ms = 1600;

View File

@ -18,11 +18,15 @@
#include <Poco/Version.h>
#include <Common/DNSResolver.h>
#include <Common/RemoteHostFilter.h>
#include <Common/config.h>
#include <base/logger_useful.h>
#include <Poco/URIStreamFactory.h>
#include <Common/config.h>
namespace ProfileEvents
{
extern const Event ReadBufferSeekCancelConnection;
}
namespace DB
{
@ -34,6 +38,8 @@ namespace ErrorCodes
extern const int TOO_MANY_REDIRECTS;
extern const int HTTP_RANGE_NOT_SATISFIABLE;
extern const int BAD_ARGUMENTS;
extern const int CANNOT_SEEK_THROUGH_FILE;
extern const int SEEK_POSITION_OUT_OF_BOUND;
}
template <typename SessionPtr>
@ -83,7 +89,7 @@ public:
namespace detail
{
template <typename UpdatableSessionPtr>
class ReadWriteBufferFromHTTPBase : public ReadBuffer
class ReadWriteBufferFromHTTPBase : public SeekableReadBufferWithSize
{
public:
using HTTPHeaderEntry = std::tuple<std::string, std::string>;
@ -114,7 +120,7 @@ namespace detail
size_t buffer_size;
bool use_external_buffer;
size_t bytes_read = 0;
size_t offset_from_begin_pos = 0;
Range read_range;
/// Delayed exception in case retries with partial content are not satisfiable.
@ -124,13 +130,13 @@ namespace detail
ReadSettings settings;
Poco::Logger * log;
std::istream * call(Poco::URI uri_, Poco::Net::HTTPResponse & response)
std::istream * call(Poco::URI uri_, Poco::Net::HTTPResponse & response, const std::string & method_)
{
// With an empty path Poco will send "POST HTTP/1.1"; this is a Poco bug, so set the path explicitly.
if (uri_.getPath().empty())
uri_.setPath("/");
Poco::Net::HTTPRequest request(method, uri_.getPathAndQuery(), Poco::Net::HTTPRequest::HTTP_1_1);
Poco::Net::HTTPRequest request(method_, uri_.getPathAndQuery(), Poco::Net::HTTPRequest::HTTP_1_1);
request.setHost(uri_.getHost()); // use original, not resolved host name in header
if (out_stream_callback)
@ -145,13 +151,13 @@ namespace detail
* Add a Range header if a range was passed (for disk web)
* or if we want to retry a GET request on purpose.
*/
bool with_partial_content = (read_range.begin || read_range.end) || retry_with_range_header;
bool with_partial_content = read_range.begin || read_range.end || retry_with_range_header;
if (with_partial_content)
{
if (read_range.end)
request.set("Range", fmt::format("bytes={}-{}", read_range.begin + bytes_read, *read_range.end));
request.set("Range", fmt::format("bytes={}-{}", read_range.begin + offset_from_begin_pos, *read_range.end));
else
request.set("Range", fmt::format("bytes={}-", read_range.begin + bytes_read));
request.set("Range", fmt::format("bytes={}-", read_range.begin + offset_from_begin_pos));
}
if (!credentials.getUsername().empty())
@ -191,6 +197,46 @@ namespace detail
}
}
off_t getOffset() const
{
return read_range.begin + offset_from_begin_pos;
}
std::optional<size_t> getTotalSize() override
{
if (read_range.end)
return *read_range.end - read_range.begin;
Poco::Net::HTTPResponse response;
for (size_t i = 0; i < 10; ++i)
{
try
{
call(uri, response, Poco::Net::HTTPRequest::HTTP_HEAD);
while (isRedirect(response.getStatus()))
{
Poco::URI uri_redirect(response.get("Location"));
remote_host_filter.checkURL(uri_redirect);
session->updateSession(uri_redirect);
istr = call(uri_redirect, response, method);
}
break;
}
catch (...)
{
}
}
if (response.hasContentLength())
read_range.end = read_range.begin + response.getContentLength();
return read_range.end;
}
public:
using NextCallback = std::function<void(size_t)>;
using OutStreamCallback = std::function<void(std::ostream &)>;
@ -208,7 +254,7 @@ namespace detail
const RemoteHostFilter & remote_host_filter_ = {},
bool delay_initialization = false,
bool use_external_buffer_ = false)
: ReadBuffer(nullptr, 0)
: SeekableReadBufferWithSize(nullptr, 0)
, uri {uri_}
, method {!method_.empty() ? method_ : out_stream_callback_ ? Poco::Net::HTTPRequest::HTTP_POST : Poco::Net::HTTPRequest::HTTP_GET}
, session {session_}
@ -237,7 +283,7 @@ namespace detail
void initialize()
{
Poco::Net::HTTPResponse response;
istr = call(uri, response);
istr = call(uri, response, method);
while (isRedirect(response.getStatus()))
{
@ -246,12 +292,11 @@ namespace detail
session->updateSession(uri_redirect);
istr = call(uri_redirect, response);
istr = call(uri_redirect, response, method);
}
if (!bytes_read && !read_range.end && response.hasContentLength())
if (!offset_from_begin_pos && !read_range.end && response.hasContentLength())
read_range.end = response.getContentLength();
try
{
impl = std::make_unique<ReadBufferFromIStream>(*istr, buffer_size);
@ -281,9 +326,10 @@ namespace detail
if (next_callback)
next_callback(count());
if (read_range.end && bytes_read == read_range.end.value())
if (read_range.end && static_cast<size_t>(getOffset()) == read_range.end.value())
return false;
if (impl)
{
if (use_external_buffer)
@ -342,14 +388,14 @@ namespace detail
* Retry the request unconditionally if nothing has been read yet.
* Otherwise, if it is a GET request, retry with a Range header starting from the bytes already read.
*/
bool can_retry_request = !bytes_read || method == Poco::Net::HTTPRequest::HTTP_GET;
bool can_retry_request = !offset_from_begin_pos || method == Poco::Net::HTTPRequest::HTTP_GET;
if (!can_retry_request)
throw;
LOG_ERROR(log,
"HTTP request to `{}` failed at try {}/{} with bytes read: {}. "
"Error: {}. (Current backoff wait is {}/{} ms)",
uri.toString(), i, settings.http_max_tries, bytes_read, e.what(),
uri.toString(), i, settings.http_max_tries, offset_from_begin_pos, e.what(),
milliseconds_to_wait, settings.http_retry_max_backoff_ms);
retry_with_range_header = true;
@ -372,9 +418,60 @@ namespace detail
internal_buffer = impl->buffer();
working_buffer = internal_buffer;
offset_from_begin_pos += working_buffer.size();
return true;
}
off_t getPosition() override
{
return getOffset() - available();
}
off_t seek(off_t offset_, int whence) override
{
if (whence != SEEK_SET)
throw Exception("Only SEEK_SET mode is allowed.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
if (offset_ < 0)
throw Exception("Seek position is out of bounds. Offset: " + std::to_string(offset_), ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);
auto current_offset = getOffset();
if (!working_buffer.empty()
&& size_t(offset_) >= current_offset - working_buffer.size()
&& offset_ < current_offset)
{
pos = working_buffer.end() - (current_offset - offset_);
assert(pos >= working_buffer.begin());
assert(pos <= working_buffer.end());
return getPosition();
}
auto position = getPosition();
if (offset_ > position)
{
size_t diff = offset_ - position;
if (diff < settings.remote_read_min_bytes_for_seek)
{
ignore(diff);
return offset_;
}
}
if (impl)
{
ProfileEvents::increment(ProfileEvents::ReadBufferSeekCancelConnection);
impl.reset();
}
pos = working_buffer.end();
read_range.begin = offset_;
offset_from_begin_pos = 0;
return offset_;
}
std::string getResponseCookie(const std::string & name, const std::string & def) const
{
for (const auto & cookie : cookies)
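
One detail in the HTTP buffer worth spelling out: bytes_read was replaced by offset_from_begin_pos in the Range header so that a retried or resumed GET continues from the position already fetched relative to read_range.begin, which seek() can now move. A self-contained sketch of the header the buffer sends (same fmt::format calls as above; the helper function is illustrative only):

#include <fmt/format.h>
#include <optional>
#include <string>

/// The start of the range is shifted by the bytes already delivered, so a
/// reconnect resumes exactly where the previous response stopped.
std::string makeRangeHeader(size_t range_begin, std::optional<size_t> range_end, size_t offset_from_begin_pos)
{
    if (range_end)
        return fmt::format("bytes={}-{}", range_begin + offset_from_begin_pos, *range_end);
    return fmt::format("bytes={}-", range_begin + offset_from_begin_pos);
}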

View File

@ -1,6 +1,7 @@
#pragma once
#include <IO/ReadBuffer.h>
#include <optional>
namespace DB
{
@ -34,4 +35,36 @@ public:
};
using SeekableReadBufferPtr = std::shared_ptr<SeekableReadBuffer>;
class SeekableReadBufferWithSize : public SeekableReadBuffer
{
public:
SeekableReadBufferWithSize(Position ptr, size_t size)
: SeekableReadBuffer(ptr, size) {}
SeekableReadBufferWithSize(Position ptr, size_t size, size_t offset)
: SeekableReadBuffer(ptr, size, offset) {}
/// Returns std::nullopt in case it is impossible to find out the total size.
virtual std::optional<size_t> getTotalSize() = 0;
/**
* Some buffers might have different seek restrictions according to where they are used.
* For example, ReadBufferFromS3 and ReadBufferFromWebServer, when used for reading
* from remote disks, require some additional invariants and restrictions, which
* are not needed in other cases.
*/
enum class ReadType
{
DEFAULT,
DISK_READ
};
void setReadType(ReadType type) { read_type = type; }
protected:
ReadType read_type = ReadType::DEFAULT;
std::optional<size_t> file_size;
};
}
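
A short sketch of how the two hooks on this class are consumed elsewhere in the commit (illustrative fragment, assuming `current_buf` points to a concrete buffer such as ReadBufferFromS3; it is not a standalone program):

if (auto * in = dynamic_cast<SeekableReadBufferWithSize *>(current_buf.get()))
{
    /// Remote-disk reads keep the strict "seek only before the first read" behaviour,
    /// exactly as done in ReadBufferFromRemoteFSGather::initialize() above.
    in->setReadType(SeekableReadBufferWithSize::ReadType::DISK_READ);

    /// Format readers, via asArrowFile()/GetSize(), instead rely on the size being
    /// known up front so they can seek to footers and stripes on demand.
    std::optional<size_t> total_size = in->getTotalSize();
}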

View File

@ -3083,6 +3083,8 @@ ReadSettings Context::getReadSettings() const
res.remote_fs_read_max_backoff_ms = settings.remote_fs_read_max_backoff_ms;
res.remote_fs_read_backoff_max_tries = settings.remote_fs_read_backoff_max_tries;
res.remote_read_min_bytes_for_seek = settings.remote_read_min_bytes_for_seek;
res.local_fs_buffer_size = settings.max_read_buffer_size;
res.direct_io_threshold = settings.min_bytes_to_use_direct_io;
res.mmap_threshold = settings.min_bytes_to_use_mmap_io;

View File

@ -94,7 +94,7 @@ void ArrowBlockInputFormat::prepareReader()
}
else
{
auto file_reader_status = arrow::ipc::RecordBatchFileReader::Open(asArrowFile(*in));
auto file_reader_status = arrow::ipc::RecordBatchFileReader::Open(asArrowFile(*in, format_settings));
if (!file_reader_status.ok())
throw Exception(ErrorCodes::UNKNOWN_EXCEPTION,
"Error while opening a table: {}", file_reader_status.status().ToString());

View File

@ -6,6 +6,7 @@
#if USE_ARROW || USE_ORC || USE_PARQUET
#include <Common/assert_cast.h>
#include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/WriteBufferFromString.h>
#include <IO/copyData.h>
@ -19,6 +20,11 @@
namespace DB
{
namespace ErrorCodes
{
extern const int FILE_SIZE_UNKNOWN;
}
ArrowBufferedOutputStream::ArrowBufferedOutputStream(WriteBuffer & out_) : out{out_}, is_open{true}
{
}
@ -46,9 +52,22 @@ RandomAccessFileFromSeekableReadBuffer::RandomAccessFileFromSeekableReadBuffer(S
{
}
RandomAccessFileFromSeekableReadBuffer::RandomAccessFileFromSeekableReadBuffer(SeekableReadBufferWithSize & in_)
: in{in_}, is_open{true}
{
}
arrow::Result<int64_t> RandomAccessFileFromSeekableReadBuffer::GetSize()
{
return arrow::Result<int64_t>(file_size);
if (!file_size)
{
auto * buf_with_size = dynamic_cast<SeekableReadBufferWithSize *>(&in);
if (buf_with_size)
file_size = buf_with_size->getTotalSize();
if (!file_size)
throw Exception(ErrorCodes::FILE_SIZE_UNKNOWN, "Cannot find out size of file");
}
return arrow::Result<int64_t>(*file_size);
}
arrow::Status RandomAccessFileFromSeekableReadBuffer::Close()
@ -121,7 +140,7 @@ arrow::Status ArrowInputStreamFromReadBuffer::Close()
return arrow::Status();
}
std::shared_ptr<arrow::io::RandomAccessFile> asArrowFile(ReadBuffer & in)
std::shared_ptr<arrow::io::RandomAccessFile> asArrowFile(ReadBuffer & in, const FormatSettings & settings)
{
if (auto * fd_in = dynamic_cast<ReadBufferFromFileDescriptor *>(&in))
{
@ -131,6 +150,11 @@ std::shared_ptr<arrow::io::RandomAccessFile> asArrowFile(ReadBuffer & in)
if (res == 0 && S_ISREG(stat.st_mode))
return std::make_shared<RandomAccessFileFromSeekableReadBuffer>(*fd_in, stat.st_size);
}
else if (auto * seekable_in = dynamic_cast<SeekableReadBufferWithSize *>(&in))
{
if (settings.seekable_read)
return std::make_shared<RandomAccessFileFromSeekableReadBuffer>(*seekable_in);
}
// fallback to loading the entire file in memory
std::string file_data;
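
This dispatch is where the memory saving of the commit comes from: when the input is a SeekableReadBufferWithSize with a known size and input_format_allow_seeks is enabled, the Arrow/Parquet/ORC readers get a random-access wrapper and fetch only the byte ranges they ask for; otherwise the old fallback of copying the entire file into memory is kept. Usage, as it appears in the format readers later in this diff (abridged sketch; `in`, `format_settings` and `file_reader` come from the surrounding reader class):

/// ParquetBlockInputFormat::prepareReader(), abridged:
THROW_ARROW_NOT_OK(parquet::arrow::OpenFile(asArrowFile(*in, format_settings), arrow::default_memory_pool(), &file_reader));
/// With a remote seekable buffer the reader now seeks to the footer and row groups
/// on demand instead of requiring the whole file in memory first.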

View File

@ -4,14 +4,18 @@
#if USE_ARROW || USE_ORC || USE_PARQUET
#include <arrow/io/interfaces.h>
#include <optional>
namespace DB
{
class ReadBuffer;
class SeekableReadBuffer;
class WriteBuffer;
class SeekableReadBuffer;
class SeekableReadBufferWithSize;
struct FormatSettings;
class ArrowBufferedOutputStream : public arrow::io::OutputStream
{
public:
@ -40,6 +44,8 @@ class RandomAccessFileFromSeekableReadBuffer : public arrow::io::RandomAccessFil
public:
RandomAccessFileFromSeekableReadBuffer(SeekableReadBuffer & in_, off_t file_size_);
RandomAccessFileFromSeekableReadBuffer(SeekableReadBufferWithSize & in_);
arrow::Result<int64_t> GetSize() override;
arrow::Status Close() override;
@ -56,7 +62,7 @@ public:
private:
SeekableReadBuffer & in;
off_t file_size;
std::optional<off_t> file_size;
bool is_open = false;
ARROW_DISALLOW_COPY_AND_ASSIGN(RandomAccessFileFromSeekableReadBuffer);
@ -80,7 +86,7 @@ private:
ARROW_DISALLOW_COPY_AND_ASSIGN(ArrowInputStreamFromReadBuffer);
};
std::shared_ptr<arrow::io::RandomAccessFile> asArrowFile(ReadBuffer & in);
std::shared_ptr<arrow::io::RandomAccessFile> asArrowFile(ReadBuffer & in, const FormatSettings & settings);
}

View File

@ -518,13 +518,6 @@ ArrowColumnToCHColumn::ArrowColumnToCHColumn(
void ArrowColumnToCHColumn::arrowTableToCHChunk(Chunk & res, std::shared_ptr<arrow::Table> & table)
{
Columns columns_list;
UInt64 num_rows = 0;
columns_list.reserve(header.rows());
using NameToColumnPtr = std::unordered_map<std::string, std::shared_ptr<arrow::ChunkedArray>>;
NameToColumnPtr name_to_column_ptr;
for (const auto& column_name : table->ColumnNames())
{
@ -532,6 +525,16 @@ void ArrowColumnToCHColumn::arrowTableToCHChunk(Chunk & res, std::shared_ptr<arr
name_to_column_ptr[column_name] = arrow_column;
}
arrowColumnsToCHChunk(res, name_to_column_ptr);
}
void ArrowColumnToCHColumn::arrowColumnsToCHChunk(Chunk & res, NameToColumnPtr & name_to_column_ptr)
{
Columns columns_list;
UInt64 num_rows = 0;
columns_list.reserve(header.rows());
std::unordered_map<String, BlockPtr> nested_tables;
for (size_t column_i = 0, columns = header.columns(); column_i < columns; ++column_i)
{
@ -587,7 +590,5 @@ void ArrowColumnToCHColumn::arrowTableToCHChunk(Chunk & res, std::shared_ptr<arr
res.setColumns(columns_list, num_rows);
}
}
#endif

View File

@ -19,6 +19,8 @@ class Chunk;
class ArrowColumnToCHColumn
{
public:
using NameToColumnPtr = std::unordered_map<std::string, std::shared_ptr<arrow::ChunkedArray>>;
ArrowColumnToCHColumn(const Block & header_, const std::string & format_name_, bool import_nested_);
/// Constructor that create header by arrow schema. It will be useful for inserting
@ -27,6 +29,8 @@ public:
void arrowTableToCHChunk(Chunk & res, std::shared_ptr<arrow::Table> & table);
void arrowColumnsToCHChunk(Chunk & res, NameToColumnPtr & name_to_column_ptr);
private:
const Block header;
const std::string format_name;

View File

@ -5,7 +5,6 @@
#include <IO/ReadBufferFromMemory.h>
#include <IO/WriteHelpers.h>
#include <IO/copyData.h>
#include <arrow/adapters/orc/adapter.h>
#include <arrow/io/memory.h>
#include "ArrowBufferedStreams.h"
#include "ArrowColumnToCHColumn.h"
@ -39,23 +38,53 @@ Chunk ORCBlockInputFormat::generate()
if (!file_reader)
prepareReader();
if (stripe_current >= stripe_total)
return res;
while (true)
{
if (!batch_reader)
{
arrow::Status reader_status = file_reader->NextStripeReader(
DBMS_DEFAULT_BUFFER_SIZE, include_indices, &batch_reader);
if (!reader_status.ok())
throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA,
"Failed to create batch reader: {}",
reader_status.ToString());
}
std::shared_ptr<arrow::RecordBatch> batch_result;
arrow::Status batch_status = file_reader->ReadStripe(stripe_current, include_indices, &batch_result);
if (!batch_status.ok())
throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA,
"Error while reading batch of ORC data: {}", batch_status.ToString());
if (!batch_reader)
break;
auto table_result = arrow::Table::FromRecordBatches({batch_result});
if (!table_result.ok())
throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA,
"Error while reading batch of ORC data: {}", table_result.status().ToString());
std::shared_ptr<arrow::RecordBatch> batch_result;
arrow::Status batch_status = batch_reader->ReadNext(&batch_result);
if (!batch_status.ok())
throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA,
"Error while reading batch of ORC data: {}",
batch_status.ToString());
++stripe_current;
if (!batch_result || !batch_result->num_rows())
{
batch_reader.reset();
continue;
}
auto table_result = arrow::Table::FromRecordBatches({batch_result});
if (!table_result.ok())
throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA,
"Error while reading batch of ORC data: {}",
table_result.status().ToString());
using NameToColumnPtr = std::unordered_map<std::string, std::shared_ptr<arrow::ChunkedArray>>;
NameToColumnPtr name_to_column_ptr;
for (const auto& column_name : column_names)
{
arrow::ArrayVector vec = {batch_result->GetColumnByName(column_name)};
std::shared_ptr<arrow::ChunkedArray> arrow_column = std::make_shared<arrow::ChunkedArray>(vec);
name_to_column_ptr[column_name] = arrow_column;
}
arrow_column_to_ch_column->arrowColumnsToCHChunk(res, name_to_column_ptr);
break;
}
arrow_column_to_ch_column->arrowTableToCHChunk(res, *table_result);
return res;
}
@ -93,7 +122,7 @@ static size_t countIndicesForType(std::shared_ptr<arrow::DataType> type)
void ORCBlockInputFormat::prepareReader()
{
THROW_ARROW_NOT_OK(arrow::adapters::orc::ORCFileReader::Open(asArrowFile(*in), arrow::default_memory_pool(), &file_reader));
THROW_ARROW_NOT_OK(arrow::adapters::orc::ORCFileReader::Open(asArrowFile(*in, format_settings), arrow::default_memory_pool(), &file_reader));
stripe_total = file_reader->NumberOfStripes();
stripe_current = 0;
@ -117,6 +146,7 @@ void ORCBlockInputFormat::prepareReader()
const auto & name = schema->field(i)->name();
if (getPort().getHeader().has(name) || nested_table_names.contains(name))
{
column_names.push_back(name);
for (int j = 0; j != indexes_count; ++j)
include_indices.push_back(index + j);
}

View File

@ -5,7 +5,12 @@
#include <Processors/Formats/IInputFormat.h>
#include <Formats/FormatSettings.h>
namespace arrow::adapters::orc { class ORCFileReader; }
#include <arrow/adapters/orc/adapter.h>
namespace arrow::adapters::orc
{
class ORCFileReader;
}
namespace DB
{
@ -30,8 +35,12 @@ private:
std::unique_ptr<arrow::adapters::orc::ORCFileReader> file_reader;
std::shared_ptr<arrow::RecordBatchReader> batch_reader;
std::unique_ptr<ArrowColumnToCHColumn> arrow_column_to_ch_column;
std::vector<String> column_names;
int stripe_total = 0;
int stripe_current = 0;

View File

@ -93,7 +93,7 @@ static size_t countIndicesForType(std::shared_ptr<arrow::DataType> type)
void ParquetBlockInputFormat::prepareReader()
{
THROW_ARROW_NOT_OK(parquet::arrow::OpenFile(asArrowFile(*in), arrow::default_memory_pool(), &file_reader));
THROW_ARROW_NOT_OK(parquet::arrow::OpenFile(asArrowFile(*in, format_settings), arrow::default_memory_pool(), &file_reader));
row_group_total = file_reader->num_row_groups();
row_group_current = 0;

View File

@ -74,6 +74,14 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
throw Exception(ErrorCodes::CANNOT_SEEK_THROUGH_FILE, "Fail to seek HDFS file: {}, error: {}", hdfs_uri, std::string(hdfsGetLastError()));
}
std::optional<size_t> getTotalSize() const
{
auto * file_info = hdfsGetPathInfo(fs.get(), hdfs_file_path.c_str());
if (!file_info)
return std::nullopt;
return file_info->mSize;
}
bool nextImpl() override
{
if (!initialized)
@ -115,19 +123,10 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
return false;
}
off_t seek(off_t offset_, int whence) override
off_t seek(off_t offset_, int) override
{
if (initialized)
throw Exception("Seek is allowed only before first read attempt from the buffer.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
if (whence != SEEK_SET)
throw Exception("Only SEEK_SET mode is allowed.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
if (offset_ < 0)
throw Exception(ErrorCodes::SEEK_POSITION_OUT_OF_BOUND, "Seek position is out of bounds. Offset: {}", std::to_string(offset_));
offset = offset_;
initialize();
return offset;
}
@ -145,11 +144,15 @@ ReadBufferFromHDFS::ReadBufferFromHDFS(
const String & hdfs_file_path_,
const Poco::Util::AbstractConfiguration & config_,
size_t buf_size_, size_t read_until_position_)
: SeekableReadBuffer(nullptr, 0)
: SeekableReadBufferWithSize(nullptr, 0)
, impl(std::make_unique<ReadBufferFromHDFSImpl>(hdfs_uri_, hdfs_file_path_, config_, buf_size_, read_until_position_))
{
}
std::optional<size_t> ReadBufferFromHDFS::getTotalSize()
{
return impl->getTotalSize();
}
bool ReadBufferFromHDFS::nextImpl()
{
@ -163,9 +166,29 @@ bool ReadBufferFromHDFS::nextImpl()
}
off_t ReadBufferFromHDFS::seek(off_t off, int whence)
off_t ReadBufferFromHDFS::seek(off_t offset_, int whence)
{
return impl->seek(off, whence);
if (whence != SEEK_SET)
throw Exception("Only SEEK_SET mode is allowed.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
if (offset_ < 0)
throw Exception("Seek position is out of bounds. Offset: " + std::to_string(offset_), ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);
if (!working_buffer.empty()
&& size_t(offset_) >= impl->getPosition() - working_buffer.size()
&& offset_ < impl->getPosition())
{
pos = working_buffer.end() - (impl->getPosition() - offset_);
assert(pos >= working_buffer.begin());
assert(pos <= working_buffer.end());
return getPosition();
}
pos = working_buffer.end();
impl->initialize();
impl->seek(offset_, whence);
return impl->getPosition();
}

View File

@ -19,7 +19,7 @@ namespace DB
/** Accepts an HDFS path to a file and opens it.
* Closes the file by itself (thus "owns" the file descriptor).
*/
class ReadBufferFromHDFS : public SeekableReadBuffer
class ReadBufferFromHDFS : public SeekableReadBufferWithSize
{
struct ReadBufferFromHDFSImpl;
@ -37,6 +37,8 @@ public:
off_t getPosition() override;
std::optional<size_t> getTotalSize() override;
private:
std::unique_ptr<ReadBufferFromHDFSImpl> impl;
};

View File

@ -1,10 +1,6 @@
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.network import PartitionManager
import threading
import time
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', main_configs=['configs/named_collections.xml'], with_zookeeper=False, with_hdfs=True)
@ -99,61 +95,3 @@ def test_predefined_connection_configuration(started_cluster):
result = node1.query("SET max_http_get_redirects=1; select * from url(url1, url='http://hdfs1:50070/webhdfs/v1/simple_storage?op=OPEN&namenoderpcaddress=hdfs1:9000&offset=0', format='TSV', structure='id UInt32, name String, weight Float64')")
assert(result == "1\tMark\t72.53\n")
node1.query("drop table WebHDFSStorageWithRedirect")
result = ''
def test_url_reconnect_at_start(started_cluster):
hdfs_api = started_cluster.hdfs_api
with PartitionManager() as pm:
node1.query(
"insert into table function hdfs('hdfs://hdfs1:9000/storage_big', 'TSV', 'id Int32') select number from numbers(500000)")
pm._add_rule({'probability': 1, 'destination': node1.ip_address, 'source_port': 50075, 'action': 'DROP'})
def select():
global result
print("reading")
result = node1.query(
"select sum(cityHash64(id)) from url('http://hdfs1:50075/webhdfs/v1/storage_big?op=OPEN&namenoderpcaddress=hdfs1:9000&offset=0', 'TSV', 'id Int32') settings http_max_tries = 20, http_retry_max_backoff_ms=10000")
print(result)
thread = threading.Thread(target=select)
thread.start()
time.sleep(1)
print("delete rule")
pm._delete_rule({'probability': 1, 'destination': node1.ip_address, 'source_port': 50075, 'action': 'DROP'})
thread.join()
#assert node1.contains_in_log("Error: Timeout, code:")
print(result)
result = ''
def test_url_reconnect_in_the_middle(started_cluster):
hdfs_api = started_cluster.hdfs_api
with PartitionManager() as pm:
node1.query(
"insert into table function hdfs('hdfs://hdfs1:9000/storage_big2', 'TSV', 'id Int32') select number from numbers(10000000)")
def select():
global result
print("reading")
result = node1.query(
"select sum(cityHash64(id)) from url('http://hdfs1:50075/webhdfs/v1/storage_big2?op=OPEN&namenoderpcaddress=hdfs1:9000&offset=0', 'TSV', 'id Int32')")
print(result)
thread = threading.Thread(target=select)
print("add rule")
pm._add_rule({'probability': 0.3, 'destination': node1.ip_address, 'source_port': 50075, 'action': 'DROP'})
thread.start()
time.sleep(0.5)
pm._add_rule({'probability': 1, 'destination': node1.ip_address, 'source_port': 50075, 'action': 'DROP'})
time.sleep(3)
print("delete rule")
pm._delete_rule({'probability': 0.3, 'destination': node1.ip_address, 'source_port': 50075, 'action': 'DROP'})
pm._delete_rule({'probability': 1, 'destination': node1.ip_address, 'source_port': 50075, 'action': 'DROP'})
thread.join()
assert node1.contains_in_log("Error: Timeout, code:")
print(result)

View File

@ -257,6 +257,22 @@ def test_truncate_table(started_cluster):
node1.query("drop table test_truncate")
def test_seekable_formats(started_cluster):
hdfs_api = started_cluster.hdfs_api
table_function = f"hdfs('hdfs://hdfs1:9000/parquet', 'Parquet', 'a Int32, b String')"
node1.query(f"insert into table function {table_function} SELECT number, randomString(1000) FROM numbers(5000000)")
result = node1.query(f"SELECT count() FROM {table_function}")
assert(int(result) == 5000000)
table_function = f"hdfs('hdfs://hdfs1:9000/orc', 'ORC', 'a Int32, b String')"
node1.query(f"insert into table function {table_function} SELECT number, randomString(1000) FROM numbers(5000000)")
result = node1.query(f"SELECT count() FROM {table_function}")
assert(int(result) == 5000000)
if __name__ == '__main__':
cluster.start()
input("Cluster created, press any key to destroy...")

View File

@ -5,5 +5,15 @@
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3_conf1>
<s3_parquet>
<url>http://minio1:9001/root/test_parquet</url>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3_parquet>
<s3_orc>
<url>http://minio1:9001/root/test_orc</url>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3_orc>
</named_collections>
</clickhouse>

View File

@ -750,3 +750,20 @@ def test_predefined_connection_configuration(started_cluster):
result = instance.query("SELECT * FROM s3(s3_conf1, format='CSV', structure='id UInt32')")
assert result == instance.query("SELECT number FROM numbers(10)")
def test_seekable_formats(started_cluster):
bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"] # type: ClickHouseInstance
table_function = f"s3(s3_parquet, structure='a Int32, b String', format='Parquet')"
instance.query(f"insert into table function {table_function} SELECT number, randomString(1000) FROM numbers(5000000)")
result = instance.query(f"SELECT count() FROM {table_function}")
assert(int(result) == 5000000)
table_function = f"s3(s3_orc, structure='a Int32, b String', format='ORC')"
instance.query(f"insert into table function {table_function} SELECT number, randomString(1000) FROM numbers(5000000)")
result = instance.query(f"SELECT count() FROM {table_function}")
assert(int(result) == 5000000)