Merge pull request #23415 from godliness/fix-kafka-with-arrow

Fix kafka storage does not work with arrow and arrowstream format messages
This commit is contained in:
alexey-milovidov 2021-04-22 04:59:31 +03:00 committed by GitHub
commit 1fcf198cec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 22 additions and 45 deletions

View File

@ -24,7 +24,6 @@ namespace ErrorCodes
ArrowBlockInputFormat::ArrowBlockInputFormat(ReadBuffer & in_, const Block & header_, bool stream_) ArrowBlockInputFormat::ArrowBlockInputFormat(ReadBuffer & in_, const Block & header_, bool stream_)
: IInputFormat(header_, in_), stream{stream_} : IInputFormat(header_, in_), stream{stream_}
{ {
prepareReader();
} }
Chunk ArrowBlockInputFormat::generate() Chunk ArrowBlockInputFormat::generate()
@ -35,12 +34,18 @@ Chunk ArrowBlockInputFormat::generate()
if (stream) if (stream)
{ {
if (!stream_reader)
prepareReader();
batch_result = stream_reader->Next(); batch_result = stream_reader->Next();
if (batch_result.ok() && !(*batch_result)) if (batch_result.ok() && !(*batch_result))
return res; return res;
} }
else else
{ {
if (!file_reader)
prepareReader();
if (record_batch_current >= record_batch_total) if (record_batch_current >= record_batch_total)
return res; return res;
@ -71,7 +76,7 @@ void ArrowBlockInputFormat::resetParser()
stream_reader.reset(); stream_reader.reset();
else else
file_reader.reset(); file_reader.reset();
prepareReader(); record_batch_current = 0;
} }
void ArrowBlockInputFormat::prepareReader() void ArrowBlockInputFormat::prepareReader()

File diff suppressed because one or more lines are too long