comments and checks

This commit is contained in:
Nikita Mikhaylov 2019-10-30 18:49:10 +03:00
parent 5bcfee117a
commit b691cc744c
6 changed files with 25 additions and 14 deletions

View File

@ -21,7 +21,13 @@ void ParallelParsingBlockInputStream::segmentatorThreadFunction()
if (is_exception_occured)
break;
if (original_buffer.eof())
// Segmenting the original input.
segments[current_unit_number].used_size = 0;
// The engine returns false when the input is already at EOF, i.e. there is nothing left to segment.
const auto res = file_segmentation_engine(original_buffer, segments[current_unit_number].memory, segments[current_unit_number].used_size, min_chunk_size);
if (!res)
{
is_last[current_unit_number] = true;
status[current_unit_number] = READY_TO_PARSE;
@ -29,12 +35,6 @@ void ParallelParsingBlockInputStream::segmentatorThreadFunction()
break;
}
// Segmentating the original input.
segments[current_unit_number].used_size = 0;
//It returns bool, but it is useless
file_segmentation_engine(original_buffer, segments[current_unit_number].memory, segments[current_unit_number].used_size, min_chunk_size);
// Creating buffer from the segment of data.
auto new_buffer = BufferBase::Buffer(segments[current_unit_number].memory.data(),
segments[current_unit_number].memory.data() + segments[current_unit_number].used_size);

View File

@ -1053,12 +1053,15 @@ void skipToUnescapedNextLineOrEOF(ReadBuffer & buf)
}
}
bool eofWithSavingBufferState(ReadBuffer & buf, DB::Memory<> & memory, size_t & used_size, char * & begin_pos, bool save_buffer_state)
bool eofWithSavingBufferState(ReadBuffer & buf, DB::Memory<> & memory, size_t & used_size, char * & begin_pos, bool force_saving_buffer_state)
{
if (save_buffer_state || !buf.hasPendingData())
/// If there is some pending data - no need to copy data from buffer to memory.
if (force_saving_buffer_state || !buf.hasPendingData())
{
const size_t capacity = memory.size();
const size_t block_size = static_cast<size_t>(buf.position() - begin_pos);
const auto capacity = memory.size();
const auto block_size = static_cast<size_t>(buf.position() - begin_pos);
/// Grow the memory only when its capacity does not exceed the required size, avoiding a needless resize() call.
if (capacity <= block_size + used_size)
{
memory.resize(used_size + block_size);

View File

@ -912,9 +912,8 @@ void skipToNextLineOrEOF(ReadBuffer & buf);
/// Skip to next character after next unescaped \n. If no \n in stream, skip to end. Does not throw on invalid escape sequences.
void skipToUnescapedNextLineOrEOF(ReadBuffer & buf);
/** Return buffer eof() result.
* If there is no pending data in buffer or it was explicitly asked
* save current state to memory.
/** Returns the buffer's eof() result.
* Also saves the buffer data to memory if the buffer has no pending data or if saving was explicitly requested.
*/
bool eofWithSavingBufferState(ReadBuffer & buf, DB::Memory<> & memory, size_t & used_size, char * & begin_pos, bool force_saving_buffer_state = false);

View File

@ -424,6 +424,9 @@ void registerInputFormatProcessorCSV(FormatFactory & factory)
bool fileSegmentationEngineCSVImpl(ReadBuffer & in, DB::Memory<> & memory, size_t & used_size, size_t min_chunk_size)
{
if (in.eof())
return false;
skipWhitespacesAndTabs(in);
char * begin_pos = in.position();
bool quotes = false;

View File

@ -272,6 +272,9 @@ void registerInputFormatProcessorJSONEachRow(FormatFactory & factory)
bool fileSegmentationEngineJSONEachRowImpl(ReadBuffer & in, DB::Memory<> & memory, size_t & used_size, size_t min_chunk_size)
{
if (in.eof())
return false;
skipWhitespaceIfAny(in);
char * begin_pos = in.position();
size_t balance = 0;

View File

@ -212,6 +212,9 @@ void registerInputFormatProcessorTSKV(FormatFactory & factory)
bool fileSegmentationEngineTSKVImpl(ReadBuffer & in, DB::Memory<> & memory, size_t & used_size, size_t min_chunk_size)
{
if (in.eof())
return false;
char * begin_pos = in.position();
bool need_more_data = true;
memory.resize(min_chunk_size);