Support parallel parsing for LineAsString input format

This commit is contained in:
avogar 2022-10-28 18:44:36 +00:00
parent d7eee86827
commit fe0aea2e3a
6 changed files with 79 additions and 39 deletions

View File

@ -0,0 +1,50 @@
#include <Formats/newLineSegmentationEngine.h>
#include <IO/ReadHelpers.h>
#include <base/find_symbols.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
std::pair<bool, size_t> newLineFileSegmentationEngine(ReadBuffer & in, DB::Memory<> & memory, size_t min_bytes, size_t max_rows)
{
char * pos = in.position();
bool need_more_data = true;
size_t number_of_rows = 0;
while (loadAtPosition(in, memory, pos) && need_more_data)
{
pos = find_first_symbols<'\r', '\n'>(pos, in.buffer().end());
if (pos > in.buffer().end())
throw Exception("Position in buffer is out of bounds. There must be a bug.", ErrorCodes::LOGICAL_ERROR);
else if (pos == in.buffer().end())
continue;
++number_of_rows;
if ((memory.size() + static_cast<size_t>(pos - in.position()) >= min_bytes) || (number_of_rows == max_rows))
need_more_data = false;
if (*pos == '\n')
{
++pos;
if (loadAtPosition(in, memory, pos) && *pos == '\r')
++pos;
}
else if (*pos == '\r')
{
++pos;
if (loadAtPosition(in, memory, pos) && *pos == '\n')
++pos;
}
}
saveUpToPosition(in, memory, pos);
return {loadAtPosition(in, memory, pos), number_of_rows};
}
}

View File

@ -0,0 +1,9 @@
#pragma once
#include <IO/ReadBuffer.h>
#include <IO/BufferWithOwnMemory.h>
namespace DB
{
std::pair<bool, size_t> newLineFileSegmentationEngine(ReadBuffer & in, DB::Memory<> & memory, size_t min_bytes, size_t max_rows);
}

View File

@ -18,6 +18,7 @@ void registerFileSegmentationEngineJSONCompactEachRow(FormatFactory & factory);
#if USE_HIVE
void registerFileSegmentationEngineHiveText(FormatFactory & factory);
#endif
void registerFileSegmentationEngineLineAsString(FormatFactory & factory);
/// Formats for both input/output.
@ -153,6 +154,7 @@ void registerFormats()
#if USE_HIVE
registerFileSegmentationEngineHiveText(factory);
#endif
registerFileSegmentationEngineLineAsString(factory);
registerInputFormatNative(factory);

View File

@ -1,5 +1,5 @@
#include <Processors/Formats/Impl/LineAsStringRowInputFormat.h>
#include <Formats/JSONUtils.h>
#include <Formats/newLineSegmentationEngine.h>
#include <base/find_symbols.h>
#include <IO/ReadHelpers.h>
#include <Columns/ColumnString.h>
@ -63,6 +63,12 @@ void registerInputFormatLineAsString(FormatFactory & factory)
});
}
void registerFileSegmentationEngineLineAsString(FormatFactory & factory)
{
factory.registerFileSegmentationEngine("LineAsString", &newLineFileSegmentationEngine);
}
void registerLineAsStringSchemaReader(FormatFactory & factory)
{
factory.registerExternalSchemaReader("LineAsString", [](

View File

@ -3,6 +3,7 @@
#include <Processors/Formats/Impl/RegexpRowInputFormat.h>
#include <DataTypes/Serializations/SerializationNullable.h>
#include <Formats/EscapingRuleUtils.h>
#include <Formats/newLineSegmentationEngine.h>
#include <IO/ReadHelpers.h>
namespace DB
@ -178,46 +179,9 @@ void registerInputFormatRegexp(FormatFactory & factory)
});
}
static std::pair<bool, size_t> fileSegmentationEngineRegexpImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_bytes, size_t max_rows)
{
char * pos = in.position();
bool need_more_data = true;
size_t number_of_rows = 0;
while (loadAtPosition(in, memory, pos) && need_more_data)
{
pos = find_first_symbols<'\r', '\n'>(pos, in.buffer().end());
if (pos > in.buffer().end())
throw Exception("Position in buffer is out of bounds. There must be a bug.", ErrorCodes::LOGICAL_ERROR);
else if (pos == in.buffer().end())
continue;
++number_of_rows;
if ((memory.size() + static_cast<size_t>(pos - in.position()) >= min_bytes) || (number_of_rows == max_rows))
need_more_data = false;
if (*pos == '\n')
{
++pos;
if (loadAtPosition(in, memory, pos) && *pos == '\r')
++pos;
}
else if (*pos == '\r')
{
++pos;
if (loadAtPosition(in, memory, pos) && *pos == '\n')
++pos;
}
}
saveUpToPosition(in, memory, pos);
return {loadAtPosition(in, memory, pos), number_of_rows};
}
void registerFileSegmentationEngineRegexp(FormatFactory & factory)
{
factory.registerFileSegmentationEngine("Regexp", &fileSegmentationEngineRegexpImpl);
factory.registerFileSegmentationEngine("Regexp", &newLineFileSegmentationEngine);
}
void registerRegexpSchemaReader(FormatFactory & factory)

View File

@ -0,0 +1,9 @@
<test>
<fill_query>INSERT INTO FUNCTION file(test_line_as_string.tsv) SELECT randomString(1000) FROM numbers(1000000) SETTINGS engine_file_truncate_on_insert=1</fill_query>
<query>SELECT * FROM file(test_line_as_string.tsv, LineAsString) FORMAT Null</query>
<drop_query>INSERT INTO FUNCTION file(test_line_as_string.tsv) SELECT * FROM numbers(0) SETTINGS engine_file_truncate_on_insert=1</drop_query>
</test>