parallel parsing

This commit is contained in:
Nikita Mikhaylov 2019-10-01 13:48:46 +03:00
parent 46d8748e98
commit d47d4cd6c1
17 changed files with 373 additions and 9 deletions

View File

@ -1058,7 +1058,11 @@ private:
/// Check if server send Exception packet
auto packet_type = connection->checkPacket();
if (packet_type && *packet_type == Protocol::Server::Exception)
{
async_block_input->cancel(false);
return;
}
connection->sendData(block);
processed_rows += block.rows();

View File

@ -108,6 +108,10 @@ struct Settings : public SettingsCollection<Settings>
M(SettingBool, distributed_group_by_no_merge, false, "Do not merge aggregation states from different servers for distributed query processing - in case it is for certain that there are different keys on different shards.") \
M(SettingBool, optimize_skip_unused_shards, false, "Assumes that data is distributed by sharding_key. Optimization to skip unused shards if SELECT query filters by sharding_key.") \
\
M(SettingBool, input_format_parallel_parsing, true, "Enable parallel parsing for several data formats (JSON, TSV, TSKV, Values, CSV).") \
M(SettingUInt64, max_threads_for_parallel_reading, 10, "The maximum number of threads for parallel reading. If set to 0, the value of max_threads is used.") \
M(SettingUInt64, min_chunk_size_for_parallel_reading, (1024 * 1024), "The minimum chunk size in bytes, which each thread tries to parse under mutex in parallel reading.") \
\
M(SettingUInt64, merge_tree_min_rows_for_concurrent_read, (20 * 8192), "If at least as many lines are read from one file, the reading can be parallelized.") \
M(SettingUInt64, merge_tree_min_bytes_for_concurrent_read, (24 * 10 * 1024 * 1024), "If at least as many bytes are read from one file, the reading can be parallelized.") \
M(SettingUInt64, merge_tree_min_rows_for_seek, 0, "You can skip reading more than that number of rows at the price of one seek per file.") \

View File

@ -5,6 +5,7 @@
#include <Common/ConcurrentBoundedQueue.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/ParallelInputsProcessor.h>
#include <IO/ReadBuffer.h>
namespace DB
@ -41,6 +42,7 @@ private:
};
public:
std::vector<std::unique_ptr<ReadBuffer>> buffers;
using ExceptionCallback = std::function<void()>;
UnionBlockInputStream(

View File

@ -3,8 +3,11 @@
#include <Interpreters/Context.h>
#include <Core/Settings.h>
#include <DataStreams/MaterializingBlockOutputStream.h>
#include <DataStreams/ParallelParsingBlockInputStream.h>
#include <Formats/FormatSettings.h>
#include <Formats/FormatFactory.h>
#include <IO/SharedReadBuffer.h>
#include <DataStreams/UnionBlockInputStream.h>
#include <Processors/Formats/IRowInputFormat.h>
#include <Processors/Formats/InputStreamFromInputFormat.h>
#include <Processors/Formats/OutputStreamToOutputFormat.h>
@ -90,7 +93,7 @@ BlockInputStreamPtr FormatFactory::getInput(
if (!getCreators(name).input_processor_creator)
{
const auto & input_getter = getCreators(name).inout_creator;
const auto & input_getter = getCreators(name).input_creator;
if (!input_getter)
throw Exception("Format " + name + " is not suitable for input", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_INPUT);
@ -100,6 +103,34 @@ BlockInputStreamPtr FormatFactory::getInput(
return input_getter(buf, sample, context, max_block_size, callback ? callback : ReadCallback(), format_settings);
}
const Settings & settings = context.getSettingsRef();
const auto & file_segmentation_engine = getCreators(name).file_segmentation_engine;
if (name != "Values" && settings.input_format_parallel_parsing && file_segmentation_engine)
{
const auto & input_getter = getCreators(name).input_processor_creator;
if (!input_getter)
throw Exception("Format " + name + " is not suitable for input", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_INPUT);
FormatSettings format_settings = getInputFormatSetting(settings);
RowInputFormatParams row_input_format_params;
row_input_format_params.max_block_size = max_block_size;
row_input_format_params.allow_errors_num = format_settings.input_allow_errors_num;
row_input_format_params.allow_errors_ratio = format_settings.input_allow_errors_ratio;
row_input_format_params.callback = std::move(callback);
row_input_format_params.max_execution_time = settings.max_execution_time;
row_input_format_params.timeout_overflow_mode = settings.timeout_overflow_mode;
size_t max_threads_to_use = settings.max_threads_for_parallel_reading;
if (!max_threads_to_use)
max_threads_to_use = settings.max_threads;
auto params = ParallelParsingBlockInputStream::InputCreatorParams{sample, context, row_input_format_params, format_settings};
auto builder = ParallelParsingBlockInputStream::Builder{buf, input_getter, params, file_segmentation_engine, max_threads_to_use, settings.min_chunk_size_for_parallel_reading};
return std::make_shared<ParallelParsingBlockInputStream>(builder);
}
auto format = getInputFormat(name, buf, sample, context, max_block_size, std::move(callback));
return std::make_shared<InputStreamFromInputFormat>(std::move(format));
}
@ -188,7 +219,7 @@ OutputFormatPtr FormatFactory::getOutputFormat(
void FormatFactory::registerInputFormat(const String & name, InputCreator input_creator)
{
auto & target = dict[name].inout_creator;
auto & target = dict[name].input_creator;
if (target)
throw Exception("FormatFactory: Input format " + name + " is already registered", ErrorCodes::LOGICAL_ERROR);
target = std::move(input_creator);
@ -218,6 +249,13 @@ void FormatFactory::registerOutputFormatProcessor(const String & name, OutputPro
target = std::move(output_creator);
}
/// Registers the file segmentation engine (used for parallel reading) for the format `name`.
/// Throws LOGICAL_ERROR if an engine is already registered for this format.
void FormatFactory::registerFileSegmentationEngine(const String & name, FileSegmentationEngine file_segmentation_engine)
{
    auto & target = dict[name].file_segmentation_engine;
    if (target)
        throw Exception("FormatFactory: File segmentation engine " + name + " is already registered", ErrorCodes::LOGICAL_ERROR);
    /// The engine is received by value, so move it into the dictionary instead of copying it
    /// (same as registerInputFormat does with its creator).
    target = std::move(file_segmentation_engine);
}
/// Formats for both input/output.
@ -246,6 +284,14 @@ void registerOutputFormatProcessorProtobuf(FormatFactory & factory);
void registerInputFormatProcessorTemplate(FormatFactory & factory);
void registerOutputFormatProcessorTemplate(FormatFactory &factory);
/// File Segmentation Engines for parallel reading
void registerFileSegmentationEngineJSONEachRow(FormatFactory & factory);
void registerFileSegmentationEngineTabSeparated(FormatFactory & factory);
void registerFileSegmentationEngineValues(FormatFactory & factory);
void registerFileSegmentationEngineCSV(FormatFactory & factory);
void registerFileSegmentationEngineTSKV(FormatFactory & factory);
/// Output only (presentational) formats.
void registerOutputFormatNull(FormatFactory & factory);
@ -298,6 +344,11 @@ FormatFactory::FormatFactory()
registerInputFormatProcessorTemplate(*this);
registerOutputFormatProcessorTemplate(*this);
registerFileSegmentationEngineJSONEachRow(*this);
registerFileSegmentationEngineTabSeparated(*this);
registerFileSegmentationEngineValues(*this);
registerFileSegmentationEngineCSV(*this);
registerFileSegmentationEngineTSKV(*this);
registerOutputFormatNull(*this);

View File

@ -2,6 +2,7 @@
#include <Core/Types.h>
#include <DataStreams/IBlockStream_fwd.h>
#include <IO/BufferWithOwnMemory.h>
#include <functional>
#include <memory>
@ -41,6 +42,16 @@ public:
/// It's initial purpose was to extract payload for virtual columns from Kafka Consumer ReadBuffer.
using ReadCallback = std::function<void()>;
/** Quickly reads data from the buffer and saves the result to memory.
* Reads at least min_chunk_size bytes, plus whatever follows up to the end of the chunk
* (where a chunk ends depends on the format).
* Used in SharedReadBuffer.
*
* buf            - source buffer to read from.
* memory         - destination storage for the extracted chunk.
* used_size      - in/out: number of bytes of `memory` already filled; grows as data is appended.
* min_chunk_size - lower bound for the chunk size, in bytes.
*
* NOTE(review): the meaning of the bool result is not spelled out here. Of the engines registered
* for the formats, TabSeparated and Values return false when there is nothing to read at entry,
* the others always return true - confirm the contract against the caller.
*/
using FileSegmentationEngine = std::function<bool(
ReadBuffer & buf,
DB::Memory<> & memory,
size_t & used_size,
size_t min_chunk_size)>;
/// This callback allows to perform some additional actions after writing a single row.
/// It's initial purpose was to flush Kafka message for each row.
using WriteCallback = std::function<void()>;
@ -77,10 +88,11 @@ private:
struct Creators
{
InputCreator inout_creator;
InputCreator input_creator;
OutputCreator output_creator;
InputProcessorCreator input_processor_creator;
OutputProcessorCreator output_processor_creator;
FileSegmentationEngine file_segmentation_engine;
};
using FormatsDictionary = std::unordered_map<String, Creators>;
@ -114,6 +126,7 @@ public:
/// Register format by its name.
void registerInputFormat(const String & name, InputCreator input_creator);
void registerOutputFormat(const String & name, OutputCreator output_creator);
void registerFileSegmentationEngine(const String & name, FileSegmentationEngine file_segmentation_engine);
void registerInputFormatProcessor(const String & name, InputProcessorCreator input_creator);
void registerOutputFormatProcessor(const String & name, OutputProcessorCreator output_creator);

View File

@ -2,6 +2,7 @@
#include <Core/Defines.h>
#include <algorithm>
#include <iostream>
namespace DB

View File

@ -1053,4 +1053,23 @@ void skipToUnescapedNextLineOrEOF(ReadBuffer & buf)
}
}
/** Appends the bytes between begin_pos and the current buffer position to `memory`
  * (at offset used_size) and reports whether the buffer has reached its end.
  * The copy happens only when the buffer has no pending data or when save_buffer_state is set;
  * otherwise the function returns false right away and touches nothing.
  */
bool eofWithSavingBufferState(ReadBuffer & buf, DB::Memory<> & memory, size_t & used_size, char * & begin_pos, bool save_buffer_state)
{
    /// Unread bytes are still available and nobody asked for a flush: nothing to save yet.
    if (!save_buffer_state && buf.hasPendingData())
        return false;

    const size_t pending_bytes = static_cast<size_t>(buf.position() - begin_pos);
    const size_t required_size = used_size + pending_bytes;

    /// Grow the destination so that the pending bytes fit after the already saved ones.
    if (memory.size() <= required_size)
        memory.resize(required_size);

    memcpy(memory.data() + used_size, begin_pos, pending_bytes);
    used_size = required_size;

    /// eof() is queried after the copy; begin_pos is re-read afterwards because the
    /// buffer position may have changed by then.
    const bool reached_eof = buf.eof();
    begin_pos = buf.position();
    return reached_eof;
}
}

View File

@ -24,6 +24,7 @@
#include <Formats/FormatSettings.h>
#include <IO/ReadBuffer.h>
#include <IO/BufferWithOwnMemory.h>
#include <IO/ReadBufferFromMemory.h>
#include <IO/VarInt.h>
@ -911,4 +912,10 @@ void skipToNextLineOrEOF(ReadBuffer & buf);
/// Skip to next character after next unescaped \n. If no \n in stream, skip to end. Does not throw on invalid escape sequences.
void skipToUnescapedNextLineOrEOF(ReadBuffer & buf);
/** Returns the result of buf.eof().
* Before calling eof(), if the buffer has no pending data or save_buffer_state is true,
* the bytes between begin_pos and the current buffer position are appended to `memory`
* at offset used_size (memory is grown if needed) and used_size is increased accordingly;
* afterwards begin_pos is set to the buffer's current position.
* Otherwise nothing is copied and false is returned without calling eof().
*/
bool eofWithSavingBufferState(ReadBuffer & buf, DB::Memory<> & memory, size_t & used_size, char * & begin_pos, bool save_buffer_state = false);
}

View File

@ -433,4 +433,67 @@ void registerInputFormatProcessorCSV(FormatFactory & factory)
}
}
/// Registers the chunk extractor for the CSV format.
void registerFileSegmentationEngineCSV(FormatFactory & factory)
{
    /// Copies input into `memory` until a line break is met at or after min_chunk_size bytes
    /// (line breaks inside double-quoted fields are ignored) or until the input ends.
    auto segment_csv = [](
        ReadBuffer & in,
        DB::Memory<> & memory,
        size_t & used_size,
        size_t min_chunk_size)
    {
        skipWhitespacesAndTabs(in);

        auto & cursor = in.position();
        char * chunk_begin = cursor;
        bool inside_quotes = false;
        bool chunk_complete = false;

        memory.resize(min_chunk_size);

        while (!eofWithSavingBufferState(in, memory, used_size, chunk_begin) && !chunk_complete)
        {
            if (inside_quotes)
            {
                /// Within a quoted field only a double quote is of interest.
                cursor = find_first_symbols<'"'>(cursor, in.buffer().end());
                if (cursor == in.buffer().end())
                    continue;

                if (*cursor == '"')
                {
                    ++cursor;
                    /// Two quotes in a row keep the field open; a single quote closes it.
                    const bool doubled_quote
                        = !eofWithSavingBufferState(in, memory, used_size, chunk_begin) && *cursor == '"';
                    if (doubled_quote)
                        ++cursor;
                    else
                        inside_quotes = false;
                }
                continue;
            }

            cursor = find_first_symbols<'"','\r', '\n'>(cursor, in.buffer().end());
            if (cursor == in.buffer().end())
                continue;

            if (*cursor == '"')
            {
                inside_quotes = true;
                ++cursor;
            }
            else if (*cursor == '\n' || *cursor == '\r')
            {
                /// "\n\r" and "\r\n" are consumed together as one line break.
                const char paired_symbol = (*cursor == '\n') ? '\r' : '\n';

                if (used_size + static_cast<size_t>(cursor - chunk_begin) >= min_chunk_size)
                    chunk_complete = true;

                ++cursor;
                if (!eofWithSavingBufferState(in, memory, used_size, chunk_begin) && *cursor == paired_symbol)
                    ++cursor;
            }
        }

        /// Flush whatever has been scanned but not yet copied.
        eofWithSavingBufferState(in, memory, used_size, chunk_begin, true);
        return true;
    };

    factory.registerFileSegmentationEngine("CSV", segment_csv);
}
}

View File

@ -267,4 +267,70 @@ void registerInputFormatProcessorJSONEachRow(FormatFactory & factory)
});
}
/// Registers the chunk extractor for the JSONEachRow format.
/// The extractor copies input into `memory` until at least min_chunk_size bytes are collected
/// and all opened curly braces are closed (braces inside string literals are not counted),
/// or until the input ends. It always returns true.
void registerFileSegmentationEngineJSONEachRow(FormatFactory & factory)
{
    factory.registerFileSegmentationEngine("JSONEachRow", [](
        ReadBuffer & in,
        DB::Memory<> & memory,
        size_t & used_size,
        size_t min_chunk_size)
    {
        skipWhitespaceIfAny(in);
        char * begin_pos = in.position();
        /// Number of currently opened (not yet closed) curly braces outside of string literals.
        size_t balance = 0;
        bool quotes = false;
        memory.resize(min_chunk_size);
        while (!eofWithSavingBufferState(in, memory, used_size, begin_pos)
            && (balance || used_size + static_cast<size_t>(in.position() - begin_pos) < min_chunk_size))
        {
            if (quotes)
            {
                in.position() = find_first_symbols<'\\', '"'>(in.position(), in.buffer().end());
                if (in.position() == in.buffer().end())
                    continue;
                if (*in.position() == '\\')
                {
                    /// Skip the backslash and the symbol it escapes.
                    ++in.position();
                    if (!eofWithSavingBufferState(in, memory, used_size, begin_pos))
                        ++in.position();
                }
                else if (*in.position() == '"')
                {
                    ++in.position();
                    quotes = false;
                }
            }
            else
            {
                in.position() = find_first_symbols<'{', '}', '\\', '"'>(in.position(), in.buffer().end());
                if (in.position() == in.buffer().end())
                    continue;
                if (*in.position() == '{')
                {
                    ++balance;
                    ++in.position();
                }
                else if (*in.position() == '}')
                {
                    /// A stray '}' in malformed input must not wrap the unsigned counter around:
                    /// otherwise the loop would keep reading the rest of the input into a single chunk.
                    if (balance)
                        --balance;
                    ++in.position();
                }
                else if (*in.position() == '\\')
                {
                    /// Skip the backslash and the symbol it escapes.
                    ++in.position();
                    if (!eofWithSavingBufferState(in, memory, used_size, begin_pos))
                        ++in.position();
                }
                else if (*in.position() == '"')
                {
                    quotes = true;
                    ++in.position();
                }
            }
        }
        /// Flush whatever has been scanned but not yet copied.
        eofWithSavingBufferState(in, memory, used_size, begin_pos, true);
        return true;
    });
}
}

View File

@ -205,4 +205,39 @@ void registerInputFormatProcessorTSKV(FormatFactory & factory)
});
}
/// Registers the chunk extractor for the TSKV format.
void registerFileSegmentationEngineTSKV(FormatFactory & factory)
{
    /// Copies input into `memory` until an unescaped '\n' or '\r' is met at or after
    /// min_chunk_size bytes, or until the input ends. Always returns true.
    auto segment_tskv = [](
        ReadBuffer & in,
        DB::Memory<> & memory,
        size_t & used_size,
        size_t min_chunk_size)
    {
        auto & cursor = in.position();
        char * chunk_begin = cursor;
        bool chunk_complete = false;

        memory.resize(min_chunk_size);

        while (!eofWithSavingBufferState(in, memory, used_size, chunk_begin) && !chunk_complete)
        {
            cursor = find_first_symbols<'\\','\r', '\n'>(cursor, in.buffer().end());
            if (cursor == in.buffer().end())
                continue;

            switch (*cursor)
            {
                case '\\':
                    /// Skip the backslash and the symbol it escapes.
                    ++cursor;
                    if (!eofWithSavingBufferState(in, memory, used_size, chunk_begin))
                        ++cursor;
                    break;

                case '\n':
                case '\r':
                    if (used_size + static_cast<size_t>(cursor - chunk_begin) >= min_chunk_size)
                        chunk_complete = true;
                    ++cursor;
                    break;

                default:
                    break;
            }
        }

        /// Flush whatever has been scanned but not yet copied.
        eofWithSavingBufferState(in, memory, used_size, chunk_begin, true);
        return true;
    };

    factory.registerFileSegmentationEngine("TSKV", segment_tskv);
}
}

View File

@ -360,4 +360,46 @@ void registerInputFormatProcessorTabSeparated(FormatFactory & factory)
}
}
/// Registers the chunk extractor for the TabSeparated format under both of its names.
/// The extractor returns false if the input is already exhausted at entry. Otherwise it copies
/// input into `memory` until an unescaped '\n' or '\r' is met at or after min_chunk_size bytes
/// (or until the input ends) and returns true.
void registerFileSegmentationEngineTabSeparated(FormatFactory & factory)
{
    for (auto name : {"TabSeparated", "TSV"})
    {
        factory.registerFileSegmentationEngine(name, [](
            ReadBuffer & in,
            DB::Memory<> & memory,
            size_t & used_size,
            size_t min_chunk_size)
        {
            if (in.eof())
                return false;

            char * begin_pos = in.position();
            bool need_more_data = true;
            memory.resize(min_chunk_size);

            while (!eofWithSavingBufferState(in, memory, used_size, begin_pos) && need_more_data)
            {
                in.position() = find_first_symbols<'\\', '\r', '\n'>(in.position(), in.buffer().end());
                if (in.position() == in.buffer().end())
                    continue;

                if (*in.position() == '\\')
                {
                    /// Skip the backslash and the symbol it escapes.
                    /// No forced save here: the scanned bytes are copied anyway once the buffer
                    /// runs out of pending data or at the final flush below, so forcing a copy
                    /// (and a possible reallocation) on every backslash is pure overhead.
                    ++in.position();
                    if (!eofWithSavingBufferState(in, memory, used_size, begin_pos))
                        ++in.position();
                }
                else if (*in.position() == '\n' || *in.position() == '\r')
                {
                    if (used_size + static_cast<size_t>(in.position() - begin_pos) >= min_chunk_size)
                        need_more_data = false;
                    ++in.position();
                }
            }

            /// Flush whatever has been scanned but not yet copied.
            eofWithSavingBufferState(in, memory, used_size, begin_pos, true);
            return true;
        });
    }
}
}

View File

@ -8,6 +8,7 @@
#include <Common/FieldVisitors.h>
#include <Core/Block.h>
#include <Common/typeid_cast.h>
#include <common/find_symbols.h>
namespace DB
@ -162,4 +163,56 @@ void registerInputFormatProcessorValues(FormatFactory & factory)
});
}
/// Registers the chunk extractor for the Values format.
void registerFileSegmentationEngineValues(FormatFactory & factory)
{
    /// Returns false when, after skipping whitespace, the input is exhausted or starts with ';'.
    /// Otherwise copies input into `memory` until at least min_chunk_size bytes are collected and
    /// the parentheses counted outside of single-quoted strings are balanced (or the input ends),
    /// then skips one following ',' if present and returns true.
    auto segment_values = [](
        ReadBuffer & in,
        DB::Memory<> & memory,
        size_t & used_size,
        size_t min_chunk_size)
    {
        skipWhitespaceIfAny(in);
        if (in.eof() || *in.position() == ';')
            return false;

        auto & cursor = in.position();
        char * chunk_begin = cursor;
        int depth = 0;
        bool inside_string = false;

        memory.resize(min_chunk_size);

        while (!eofWithSavingBufferState(in, memory, used_size, chunk_begin)
            && (depth || used_size + static_cast<size_t>(cursor - chunk_begin) < min_chunk_size))
        {
            cursor = find_first_symbols<'\\', '\'', ')', '('>(cursor, in.buffer().end());
            if (cursor == in.buffer().end())
                continue;

            switch (*cursor)
            {
                case '\\':
                    /// Skip the backslash and the symbol it escapes.
                    ++cursor;
                    if (!eofWithSavingBufferState(in, memory, used_size, chunk_begin))
                        ++cursor;
                    break;

                case '\'':
                    inside_string = !inside_string;
                    ++cursor;
                    break;

                case ')':
                    ++cursor;
                    if (!inside_string)
                        --depth;
                    break;

                case '(':
                    ++cursor;
                    if (!inside_string)
                        ++depth;
                    break;

                default:
                    break;
            }
        }

        /// Flush whatever has been scanned but not yet copied.
        eofWithSavingBufferState(in, memory, used_size, chunk_begin, true);

        if (!in.eof() && *in.position() == ',')
            ++in.position();

        return true;
    };

    factory.registerFileSegmentationEngine("Values", segment_values);
}
}

View File

@ -21,7 +21,7 @@ void StorageSystemFormats::fillData(MutableColumns & res_columns, const Context
for (const auto & pair : formats)
{
const auto & [format_name, creators] = pair;
UInt64 has_input_format(creators.inout_creator != nullptr || creators.input_processor_creator != nullptr);
UInt64 has_input_format(creators.input_creator != nullptr || creators.input_processor_creator != nullptr);
UInt64 has_output_format(creators.output_creator != nullptr || creators.output_processor_creator != nullptr);
res_columns[0]->insert(format_name);
res_columns[1]->insert(has_input_format);

View File

@ -102,11 +102,11 @@ SELECT col1 FROM test1_00395 ORDER BY col1 ASC;
SELECT '----- Insert. Source and target columns have same types up to nullability. -----';
DROP TABLE IF EXISTS test1_00395;
CREATE TABLE test1_00395(col1 Nullable(UInt64), col2 UInt64) Engine=Memory;
DROP TABLE IF EXISTS test2;
CREATE TABLE test2(col1 UInt64, col2 Nullable(UInt64)) Engine=Memory;
DROP TABLE IF EXISTS test2_00395;
CREATE TABLE test2_00395(col1 UInt64, col2 Nullable(UInt64)) Engine=Memory;
INSERT INTO test1_00395(col1,col2) VALUES (2,7)(6,9)(5,1)(4,3)(8,2);
INSERT INTO test2(col1,col2) SELECT col1,col2 FROM test1_00395;
SELECT col1,col2 FROM test2 ORDER BY col1,col2 ASC;
INSERT INTO test2_00395(col1,col2) SELECT col1,col2 FROM test1_00395;
SELECT col1,col2 FROM test2_00395 ORDER BY col1,col2 ASC;
SELECT '----- Apply functions and aggregate functions on columns that may contain null values -----';
@ -497,4 +497,4 @@ INSERT INTO test1_00395(col1,col2) VALUES([NULL], 'ACDEFBGH');
SELECT col1, count() FROM test1_00395 GROUP BY col1 ORDER BY col1;
DROP TABLE IF EXISTS test1_00395;
DROP TABLE test2;
DROP TABLE test2_00395;

View File

@ -1,3 +1,5 @@
USE test;
DROP TABLE IF EXISTS ipv4_range;
CREATE TABLE ipv4_range(ip IPv4, cidr UInt8) ENGINE = Memory;

View File

@ -1,3 +1,5 @@
USE test;
DROP TABLE IF EXISTS ipv6_range;
CREATE TABLE ipv6_range(ip IPv6, cidr UInt8) ENGINE = Memory;