Merge pull request #7894 from nikitamikhaylov/segmentating_functions

Returning back segmentation engines for JSONEachRow and CSV formats.
This commit is contained in:
Nikita Mikhaylov 2019-11-25 13:12:18 +03:00 committed by GitHub
commit 7c59fd4795
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 169 additions and 41 deletions

View File

@ -292,6 +292,8 @@ void registerOutputFormatProcessorTemplate(FormatFactory &factory);
/// File Segmentation Engines for parallel reading
void registerFileSegmentationEngineTabSeparated(FormatFactory & factory);
void registerFileSegmentationEngineCSV(FormatFactory & factory);
void registerFileSegmentationEngineJSONEachRow(FormatFactory & factory);
/// Output only (presentational) formats.
@ -344,6 +346,8 @@ FormatFactory::FormatFactory()
registerOutputFormatProcessorTemplate(*this);
registerFileSegmentationEngineTabSeparated(*this);
registerFileSegmentationEngineCSV(*this);
registerFileSegmentationEngineJSONEachRow(*this);
registerOutputFormatNull(*this);

View File

@ -1053,4 +1053,36 @@ void skipToUnescapedNextLineOrEOF(ReadBuffer & buf)
}
}
/// TODO (akuzm) - write comments for this and next function.
void saveUpToPosition(ReadBuffer & in, DB::Memory<> & memory, char * current)
{
assert(current >= in.position());
assert(current <= in.buffer().end());
const int old_bytes = memory.size();
const int additional_bytes = current - in.position();
const int new_bytes = old_bytes + additional_bytes;
/// There are no new bytes to add to memory.
/// No need to do extra stuff.
if (new_bytes == 0)
return;
memory.resize(new_bytes);
memcpy(memory.data() + old_bytes, in.position(), additional_bytes);
in.position() = current;
}
bool loadAtPosition(ReadBuffer & in, DB::Memory<> & memory, char * & current)
{
assert(current <= in.buffer().end());
if (current < in.buffer().end())
return true;
saveUpToPosition(in, memory, current);
bool loaded_more = !in.eof();
assert(in.position() == in.buffer().begin());
current = in.position();
return loaded_more;
}
}

View File

@ -924,4 +924,8 @@ if (method == DB::CompressionMethod::Gzip)
return std::make_unique<TReadBuffer>(args...);
}
/// TODO (akuzm) - write comments for this and next function.
void saveUpToPosition(ReadBuffer & in, DB::Memory<> & memory, char * current);
bool loadAtPosition(ReadBuffer & in, DB::Memory<> & memory, char * & current);
}

View File

@ -422,4 +422,64 @@ void registerInputFormatProcessorCSV(FormatFactory & factory)
}
}
bool fileSegmentationEngineCSVImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size)
{
char * pos = in.position();
bool quotes = false;
bool need_more_data = true;
while (loadAtPosition(in, memory, pos) && need_more_data)
{
if (quotes)
{
pos = find_first_symbols<'"'>(pos, in.buffer().end());
if (pos == in.buffer().end())
continue;
if (*pos == '"')
{
++pos;
if (loadAtPosition(in, memory, pos) && *pos == '"')
++pos;
else
quotes = false;
}
}
else
{
pos = find_first_symbols<'"', '\r', '\n'>(pos, in.buffer().end());
if (pos == in.buffer().end())
continue;
if (*pos == '"')
{
quotes = true;
++pos;
}
else if (*pos == '\n')
{
if (memory.size() + static_cast<size_t>(pos - in.position()) >= min_chunk_size)
need_more_data = false;
++pos;
if (loadAtPosition(in, memory, pos) && *pos == '\r')
++pos;
}
else if (*pos == '\r')
{
if (memory.size() + static_cast<size_t>(pos - in.position()) >= min_chunk_size)
need_more_data = false;
++pos;
if (loadAtPosition(in, memory, pos) && *pos == '\n')
++pos;
}
}
}
saveUpToPosition(in, memory, pos);
return loadAtPosition(in, memory, pos);
}
void registerFileSegmentationEngineCSV(FormatFactory & factory)
{
factory.registerFileSegmentationEngine("CSV", &fileSegmentationEngineCSVImpl);
}
}

View File

@ -270,4 +270,69 @@ void registerInputFormatProcessorJSONEachRow(FormatFactory & factory)
});
}
bool fileSegmentationEngineJSONEachRowImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size)
{
skipWhitespaceIfAny(in);
char * pos = in.position();
size_t balance = 0;
bool quotes = false;
while (loadAtPosition(in, memory, pos) && (balance || memory.size() + static_cast<size_t>(pos - in.position()) < min_chunk_size))
{
if (quotes)
{
pos = find_first_symbols<'\\', '"'>(pos, in.buffer().end());
if (pos == in.buffer().end())
continue;
if (*pos == '\\')
{
++pos;
if (loadAtPosition(in, memory, pos))
++pos;
}
else if (*pos == '"')
{
++pos;
quotes = false;
}
}
else
{
pos = find_first_symbols<'{', '}', '\\', '"'>(pos, in.buffer().end());
if (pos == in.buffer().end())
continue;
if (*pos == '{')
{
++balance;
++pos;
}
else if (*pos == '}')
{
--balance;
++pos;
}
else if (*pos == '\\')
{
++pos;
if (loadAtPosition(in, memory, pos))
++pos;
}
else if (*pos == '"')
{
quotes = true;
++pos;
}
}
}
saveUpToPosition(in, memory, pos);
return loadAtPosition(in, memory, pos);
}
void registerFileSegmentationEngineJSONEachRow(FormatFactory & factory)
{
factory.registerFileSegmentationEngine("JSONEachRow", &fileSegmentationEngineJSONEachRowImpl);
}
}

View File

@ -384,69 +384,32 @@ void registerInputFormatProcessorTabSeparated(FormatFactory & factory)
}
}
void saveUpToPosition(ReadBuffer & in, DB::Memory<> & memory, char * current)
{
assert(current >= in.position());
assert(current <= in.buffer().end());
const int old_bytes = memory.size();
const int additional_bytes = current - in.position();
const int new_bytes = old_bytes + additional_bytes;
/// There are no new bytes to add to memory.
/// No need to do extra stuff.
if (new_bytes == 0)
return;
memory.resize(new_bytes);
memcpy(memory.data() + old_bytes, in.position(), additional_bytes);
in.position() = current;
}
bool loadAtPosition(ReadBuffer & in, DB::Memory<> & memory, char * & current)
{
assert(current <= in.buffer().end());
if (current < in.buffer().end())
{
return true;
}
saveUpToPosition(in, memory, current);
bool loaded_more = !in.eof();
assert(in.position() == in.buffer().begin());
current = in.position();
return loaded_more;
}
bool fileSegmentationEngineTabSeparatedImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size)
{
bool need_more_data = true;
char * pos = in.position();
while (loadAtPosition(in, memory, pos) && need_more_data)
{
pos = find_first_symbols<'\\', '\r', '\n'>(pos, in.buffer().end());
if (pos == in.buffer().end())
{
continue;
}
if (*pos == '\\')
{
++pos;
if (loadAtPosition(in, memory, pos))
{
++pos;
}
}
else if (*pos == '\n' || *pos == '\r')
{
if (memory.size() + static_cast<size_t>(pos - in.position()) >= min_chunk_size)
{
need_more_data = false;
}
++pos;
}
}
saveUpToPosition(in, memory, pos);
return loadAtPosition(in, memory, pos);

View File

@ -999,7 +999,7 @@ Default value: 0.
- Type: bool
- Default value: True
Enable order-preserving parallel parsing of data formats. Supported only for TSV format.
Enable order-preserving parallel parsing of data formats. Supported only for TSV, TKSV, CSV and JSONEachRow formats.
## min_chunk_bytes_for_parallel_parsing