better segmenating + move read functions to avoid linker errors

This commit is contained in:
Nikita Mikhaylov 2019-11-22 16:53:26 +03:00
parent d495e282c7
commit 22a8e8efa7
4 changed files with 40 additions and 36 deletions

View File

@ -1053,4 +1053,36 @@ void skipToUnescapedNextLineOrEOF(ReadBuffer & buf)
}
}
/// TODO (akuzm) - write comments for this and next function.
void saveUpToPosition(ReadBuffer & in, DB::Memory<> & memory, char * current)
{
assert(current >= in.position());
assert(current <= in.buffer().end());
const int old_bytes = memory.size();
const int additional_bytes = current - in.position();
const int new_bytes = old_bytes + additional_bytes;
/// There are no new bytes to add to memory.
/// No need to do extra stuff.
if (new_bytes == 0)
return;
memory.resize(new_bytes);
memcpy(memory.data() + old_bytes, in.position(), additional_bytes);
in.position() = current;
}
bool loadAtPosition(ReadBuffer & in, DB::Memory<> & memory, char * & current)
{
assert(current <= in.buffer().end());
if (current < in.buffer().end())
return true;
saveUpToPosition(in, memory, current);
bool loaded_more = !in.eof();
assert(in.position() == in.buffer().begin());
current = in.position();
return loaded_more;
}
}

View File

@ -925,35 +925,7 @@ return std::make_unique<TReadBuffer>(args...);
}
/// TODO (akuzm) - write comments for this and next function.
void saveUpToPosition(ReadBuffer & in, DB::Memory<> & memory, char * current)
{
assert(current >= in.position());
assert(current <= in.buffer().end());
const int old_bytes = memory.size();
const int additional_bytes = current - in.position();
const int new_bytes = old_bytes + additional_bytes;
/// There are no new bytes to add to memory.
/// No need to do extra stuff.
if (new_bytes == 0)
return;
memory.resize(new_bytes);
memcpy(memory.data() + old_bytes, in.position(), additional_bytes);
in.position() = current;
}
bool loadAtPosition(ReadBuffer & in, DB::Memory<> & memory, char * & current)
{
assert(current <= in.buffer().end());
if (current < in.buffer().end())
return true;
saveUpToPosition(in, memory, current);
bool loaded_more = !in.eof();
assert(in.position() == in.buffer().begin());
current = in.position();
return loaded_more;
}
void saveUpToPosition(ReadBuffer & in, DB::Memory<> & memory, char * current);
bool loadAtPosition(ReadBuffer & in, DB::Memory<> & memory, char * & current);
}

View File

@ -424,8 +424,6 @@ void registerInputFormatProcessorCSV(FormatFactory & factory)
bool fileSegmentationEngineCSVImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size)
{
skipWhitespacesAndTabs(in);
char * pos = in.position();
bool quotes = false;
bool need_more_data = true;
@ -474,8 +472,9 @@ bool fileSegmentationEngineCSVImpl(ReadBuffer & in, DB::Memory<> & memory, size_
}
}
}
loadAtPosition(in, memory, pos);
return true;
saveUpToPosition(in, memory, pos);
return loadAtPosition(in, memory, pos);
}
void registerFileSegmentationEngineCSV(FormatFactory & factory)

View File

@ -325,8 +325,9 @@ bool fileSegmentationEngineJSONEachRowImpl(ReadBuffer & in, DB::Memory<> & memor
}
}
}
loadAtPosition(in, memory, pos)
return true;
saveUpToPosition(in, memory, pos);
return loadAtPosition(in, memory, pos);
}
void registerFileSegmentationEngineJSONEachRow(FormatFactory & factory)