wip: a saner segmentation function for TSV

This commit is contained in:
Alexander Kuzmenkov 2019-11-15 21:08:17 +03:00
parent 8f3bd8f546
commit 5d5882d92b
3 changed files with 99 additions and 58 deletions

View File

@ -29,27 +29,11 @@ void ParallelParsingBlockInputStream::segmentatorThreadFunction()
// Segmenting the original input.
unit.segment.used_size = 0;
unit.segment.memory.resize(0);
const bool have_more_data = file_segmentation_engine(original_buffer,
unit.segment.memory, unit.segment.used_size, min_chunk_size);
if (!have_more_data)
{
/**
* On EOF, the buffer must be empty. We don't have to start
* the parser since we have no data, and can wake up the reader
* directly.
*/
assert(unit.segment.used_size == 0);
unit.is_past_the_end = true;
unit.status = READY_TO_READ;
std::unique_lock lock(mutex);
reader_condvar.notify_all();
break;
}
// Creating buffer from the segment of data.
auto new_buffer = BufferBase::Buffer(unit.segment.memory.data(),
unit.segment.memory.data() + unit.segment.used_size);
@ -61,9 +45,19 @@ void ParallelParsingBlockInputStream::segmentatorThreadFunction()
input_processor_creator(*unit.readbuffer, header, context, row_input_format_params, format_settings)
);
if (!have_more_data)
{
unit.is_last = true;
}
unit.status = READY_TO_PARSE;
scheduleParserThreadForUnitWithNumber(current_unit_number);
++segmentator_ticket_number;
if (!have_more_data)
{
break;
}
}
}
catch (...)
@ -142,8 +136,8 @@ Block ParallelParsingBlockInputStream::readImpl()
if (!next_block_in_current_unit.has_value())
{
// We have read out all the Blocks from the current Processing Unit,
// time to move to the next one. Wait for it to become ready.
// We have read out all the Blocks from the previous Processing Unit,
// wait for the current one to become ready.
std::unique_lock lock(mutex);
reader_condvar.wait(lock, [&](){ return unit.status == READY_TO_READ || finished; });
@ -163,34 +157,41 @@ Block ParallelParsingBlockInputStream::readImpl()
}
assert(unit.status == READY_TO_READ);
if (unit.is_past_the_end)
{
// No more data.
return Block{};
}
next_block_in_current_unit = 0;
}
assert(next_block_in_current_unit);
assert(!unit.is_past_the_end);
if (unit.block_ext.block.size() == 0)
{
assert(unit.is_last);
finished = true;
return Block{};
}
assert(next_block_in_current_unit.value() < unit.block_ext.block.size());
Block res = std::move(unit.block_ext.block.at(*next_block_in_current_unit));
last_block_missing_values = std::move(unit.block_ext.block_missing_values[*next_block_in_current_unit]);
++*next_block_in_current_unit;
next_block_in_current_unit.value() += 1;
if (*next_block_in_current_unit == unit.block_ext.block.size())
if (next_block_in_current_unit.value() == unit.block_ext.block.size())
{
/**
* Finished reading this Processing Unit, pass it back to the segmentator.
*/
// Finished reading this Processing Unit, move to the next one.
next_block_in_current_unit.reset();
++reader_ticket_number;
std::unique_lock lock(mutex);
unit.status = READY_TO_INSERT;
segmentator_condvar.notify_all();
if (unit.is_last)
{
// If it was the last unit, we're finished.
finished = true;
}
else
{
// Pass the unit back to the segmentator.
std::unique_lock lock(mutex);
unit.status = READY_TO_INSERT;
segmentator_condvar.notify_all();
}
}
return res;

View File

@ -208,7 +208,7 @@ private:
MemoryExt segment;
std::unique_ptr<InputStreamFromInputFormat> parser;
std::atomic<ProcessingUnitStatus> status;
bool is_past_the_end{false};
bool is_last{false};
};
std::exception_ptr background_exception = nullptr;

View File

@ -384,36 +384,76 @@ void registerInputFormatProcessorTabSeparated(FormatFactory & factory)
}
}
bool fileSegmentationEngineTabSeparatedImpl(ReadBuffer & in, DB::Memory<> & memory, size_t & used_size, size_t min_chunk_size)
bool fileSegmentationEngineTabSeparatedImpl(ReadBuffer & in, DB::Memory<> & memory, size_t & used_size, size_t min_chunk_bytes)
{
if (in.eof())
return false;
char * begin_pos = in.position();
bool need_more_data = true;
memory.resize(min_chunk_size);
while (!eofWithSavingBufferState(in, memory, used_size, begin_pos) && need_more_data)
for (;;)
{
in.position() = find_first_symbols<'\\', '\r', '\n'>(in.position(), in.buffer().end());
if (in.position() == in.buffer().end())
if (in.eof())
{
continue;
used_size = memory.size(); // remove me
return false;
}
if (*in.position() == '\\')
const auto old_total_bytes = memory.size();
// Calculate the minimal amount of bytes we must read for this chunk.
// The chunk size may be already bigger than the required minimum, if
// we have a giant row and still haven't read up to the separator.
const auto min_bytes_needed = (min_chunk_bytes >= old_total_bytes)
? min_chunk_bytes - old_total_bytes : 0;
// The start position might be over the in.buffer().end(), it's OK --
// find_first_symbols will process this correctly and return
// in.buffer().end().
//char * next_separator = in.position() + min_bytes_needed;
bool found_separator = false;
char * chunk_end = in.position() + min_bytes_needed;
// Loop to skip the escaped line separators.
for (;;)
{
++in.position();
if (!eofWithSavingBufferState(in, memory, used_size, begin_pos, true))
++in.position();
} else if (*in.position() == '\n' || *in.position() == '\r')
{
if (used_size + static_cast<size_t>(in.position() - begin_pos) >= min_chunk_size)
need_more_data = false;
++in.position();
const auto next_separator = find_first_symbols<'\r', '\n'>(chunk_end,
in.buffer().end());
assert(next_separator <= in.buffer().end());
if (next_separator == in.buffer().end())
{
// Got to end of buffer, return it.
chunk_end = in.buffer().end();
break;
}
chunk_end = next_separator + 1;
// We found a line separator character, check whether it is escaped by
// checking if there is a '\' to the left. The previous character may
// have been read on the previous loop, in this case we read it from
// 'memory' buffer.
if ((next_separator > in.position() && *(next_separator - 1) != '\\')
|| (next_separator == in.position() && memory[memory.size() - 1] != '\\'))
{
found_separator = true;
break;
}
// This is an escaped separator, loop further.
}
const auto bytes_read_now = chunk_end - in.position();
const auto new_total_bytes = old_total_bytes + bytes_read_now;
memory.resize(new_total_bytes);
memcpy(memory.data() + old_total_bytes, in.position(), bytes_read_now);
in.position() = chunk_end;
if (found_separator && new_total_bytes >= min_chunk_bytes)
{
// Found the separator and the chunk big enough so that we can
// return it.
used_size = memory.size(); //FIXME
return true;
}
// Didn't find the separator, or the chunk is not big enough. Read more
// from the file.
}
eofWithSavingBufferState(in, memory, used_size, begin_pos, true);
return true;
}
void registerFileSegmentationEngineTabSeparated(FormatFactory & factory)