This commit is contained in:
kssenii 2021-11-13 17:18:16 +03:00
parent f18dcd2287
commit f4ffedd5f3
4 changed files with 39 additions and 73 deletions

View File

@ -597,6 +597,7 @@
M(627, BACKUP_ENGINE_NOT_FOUND) \
M(628, OFFSET_FETCH_WITHOUT_ORDER_BY) \
M(629, HTTP_RANGE_NOT_SATISFIABLE) \
M(630, UNKNOWN_FILE_SIZE) \
\
M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \

View File

@ -143,7 +143,7 @@ namespace detail
size_t getOffset() const
{
return read_range.begin + bytes_read;
return read_range.begin + offset_from_begin_pos;
}
std::istream * call(Poco::URI uri_, Poco::Net::HTTPResponse & response, const std::string & method_)
@ -172,19 +172,6 @@ namespace detail
request.set("Range", range_header_value);
}
/**
* Add range header if we have some passed range (for disk web)
* or if we want to retry GET request on purpose.
*/
bool with_partial_content = read_range.begin || read_range.end || retry_with_range_header;
if (with_partial_content)
{
if (read_range.end)
request.set("Range", fmt::format("bytes={}-{}", read_range.begin + offset_from_begin_pos, *read_range.end));
else
request.set("Range", fmt::format("bytes={}-", read_range.begin + offset_from_begin_pos));
}
if (!credentials.getUsername().empty())
credentials.authenticate(request);
@ -202,14 +189,6 @@ namespace detail
istr = receiveResponse(*sess, request, response, true);
response.getCookies(cookies);
if (with_partial_content && response.getStatus() != Poco::Net::HTTPResponse::HTTPStatus::HTTP_PARTIAL_CONTENT)
{
/// If we retried some request, throw error from that request.
if (exception)
std::rethrow_exception(exception);
throw Exception(ErrorCodes::HTTP_RANGE_NOT_SATISFIABLE, "Cannot read with range: {}", request.get("Range"));
}
content_encoding = response.get("Content-Encoding", "");
return istr;
}
@ -340,7 +319,7 @@ namespace detail
}
}
if (!bytes_read && !read_range.end && response.hasContentLength())
if (!offset_from_begin_pos && !read_range.end && response.hasContentLength())
read_range.end = read_range.begin + response.getContentLength();
try
@ -374,7 +353,7 @@ namespace detail
if (next_callback)
next_callback(count());
if (read_range.end && static_cast<size_t>(getOffset()) == read_range.end.value())
if (read_range.end && getOffset() == read_range.end.value())
return false;
if (impl)
@ -438,9 +417,9 @@ namespace detail
{
/**
* Retry request unconditionally if nothing has been read yet.
* Otherwise if it is GET method retry with range header starting from bytes_read.
* Otherwise if it is GET method retry with range header.
*/
bool can_retry_request = !bytes_read || method == Poco::Net::HTTPRequest::HTTP_GET;
bool can_retry_request = !offset_from_begin_pos || method == Poco::Net::HTTPRequest::HTTP_GET;
if (!can_retry_request)
throw;
@ -471,7 +450,7 @@ namespace detail
internal_buffer = impl->buffer();
working_buffer = internal_buffer;
bytes_read += working_buffer.size();
offset_from_begin_pos += working_buffer.size();
return true;
}
@ -488,7 +467,7 @@ namespace detail
if (offset_ < 0)
throw Exception("Seek position is out of bounds. Offset: " + std::to_string(offset_), ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);
auto current_offset = getOffset();
off_t current_offset = getOffset();
if (!working_buffer.empty()
&& size_t(offset_) >= current_offset - working_buffer.size()
&& offset_ < current_offset)
@ -520,6 +499,7 @@ namespace detail
pos = working_buffer.end();
read_range.begin = offset_;
read_range.end = std::nullopt;
offset_from_begin_pos = 0;
return offset_;

View File

@ -22,7 +22,7 @@ namespace DB
namespace ErrorCodes
{
extern const int FILE_SIZE_UNKNOWN;
extern const int UNKNOWN_FILE_SIZE;
}
ArrowBufferedOutputStream::ArrowBufferedOutputStream(WriteBuffer & out_) : out{out_}, is_open{true}
@ -65,7 +65,7 @@ arrow::Result<int64_t> RandomAccessFileFromSeekableReadBuffer::GetSize()
if (buf_with_size)
file_size = buf_with_size->getTotalSize();
if (!file_size)
throw Exception(ErrorCodes::FILE_SIZE_UNKNOWN, "Cannot find out size of file");
throw Exception(ErrorCodes::UNKNOWN_FILE_SIZE, "Cannot find out size of file");
}
return arrow::Result<int64_t>(*file_size);
}

View File

@ -38,53 +38,38 @@ Chunk ORCBlockInputFormat::generate()
if (!file_reader)
prepareReader();
while (true)
if (!batch_reader)
{
if (!batch_reader)
{
arrow::Status reader_status = file_reader->NextStripeReader(
DBMS_DEFAULT_BUFFER_SIZE, include_indices, &batch_reader);
if (!reader_status.ok())
throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA,
"Failed to create batch reader: {}",
reader_status.ToString());
}
if (!batch_reader)
break;
std::shared_ptr<arrow::RecordBatch> batch_result;
arrow::Status batch_status = batch_reader->ReadNext(&batch_result);
if (!batch_status.ok())
arrow::Status reader_status = file_reader->NextStripeReader(
DBMS_DEFAULT_BUFFER_SIZE, include_indices, &batch_reader);
if (!reader_status.ok())
throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA,
"Error while reading batch of ORC data: {}",
batch_status.ToString());
if (!batch_result || !batch_result->num_rows())
{
batch_reader.reset();
continue;
}
auto table_result = arrow::Table::FromRecordBatches({batch_result});
if (!table_result.ok())
throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA,
"Error while reading batch of ORC data: {}",
table_result.status().ToString());
using NameToColumnPtr = std::unordered_map<std::string, std::shared_ptr<arrow::ChunkedArray>>;
NameToColumnPtr name_to_column_ptr;
for (const auto& column_name : column_names)
{
arrow::ArrayVector vec = {batch_result->GetColumnByName(column_name)};
std::shared_ptr<arrow::ChunkedArray> arrow_column = std::make_shared<arrow::ChunkedArray>(vec);
name_to_column_ptr[column_name] = arrow_column;
}
arrow_column_to_ch_column->arrowColumnsToCHChunk(res, name_to_column_ptr);
break;
"Failed to create batch reader: {}",
reader_status.ToString());
if (!batch_reader)
return res;
}
std::shared_ptr<arrow::RecordBatch> batch_result;
arrow::Status batch_status = batch_reader->ReadNext(&batch_result);
if (!batch_status.ok())
throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA,
"Error while reading batch of ORC data: {}",
batch_status.ToString());
if (!batch_result || !batch_result->num_rows())
return res;
ArrowColumnToCHColumn::NameToColumnPtr name_to_column_ptr;
for (const auto & column_name : column_names)
{
arrow::ArrayVector vec = {batch_result->GetColumnByName(column_name)};
std::shared_ptr<arrow::ChunkedArray> arrow_column = std::make_shared<arrow::ChunkedArray>(vec);
name_to_column_ptr[column_name] = arrow_column;
}
arrow_column_to_ch_column->arrowColumnsToCHChunk(res, name_to_column_ptr);
batch_reader.reset();
return res;
}