mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 08:40:50 +00:00
Merge pull request #7780 from ClickHouse/aku/parallel-parsing
Merging parallel format parsing
This commit is contained in:
commit
762d295ab0
@ -1112,7 +1112,14 @@ private:
|
||||
/// Check if server send Exception packet
|
||||
auto packet_type = connection->checkPacket();
|
||||
if (packet_type && *packet_type == Protocol::Server::Exception)
|
||||
{
|
||||
/*
|
||||
* We're exiting with error, so it makes sense to kill the
|
||||
* input stream without waiting for it to complete.
|
||||
*/
|
||||
async_block_input->cancel(true);
|
||||
return;
|
||||
}
|
||||
|
||||
connection->sendData(block);
|
||||
processed_rows += block.rows();
|
||||
|
@ -111,6 +111,9 @@ struct Settings : public SettingsCollection<Settings>
|
||||
M(SettingBool, distributed_group_by_no_merge, false, "Do not merge aggregation states from different servers for distributed query processing - in case it is for certain that there are different keys on different shards.", 0) \
|
||||
M(SettingBool, optimize_skip_unused_shards, false, "Assumes that data is distributed by sharding_key. Optimization to skip unused shards if SELECT query filters by sharding_key.", 0) \
|
||||
\
|
||||
M(SettingBool, input_format_parallel_parsing, true, "Enable parallel parsing for some data formats.", 0) \
|
||||
M(SettingUInt64, min_chunk_bytes_for_parallel_parsing, (1024 * 1024), "The minimum chunk size in bytes, which each thread will parse in parallel.", 0) \
|
||||
\
|
||||
M(SettingUInt64, merge_tree_min_rows_for_concurrent_read, (20 * 8192), "If at least as many lines are read from one file, the reading can be parallelized.", 0) \
|
||||
M(SettingUInt64, merge_tree_min_bytes_for_concurrent_read, (24 * 10 * 1024 * 1024), "If at least as many bytes are read from one file, the reading can be parallelized.", 0) \
|
||||
M(SettingUInt64, merge_tree_min_rows_for_seek, 0, "You can skip reading more than that number of rows at the price of one seek per file.", 0) \
|
||||
|
203
dbms/src/DataStreams/ParallelParsingBlockInputStream.cpp
Normal file
203
dbms/src/DataStreams/ParallelParsingBlockInputStream.cpp
Normal file
@ -0,0 +1,203 @@
|
||||
#include <DataStreams/ParallelParsingBlockInputStream.h>
|
||||
#include "ParallelParsingBlockInputStream.h"
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
void ParallelParsingBlockInputStream::segmentatorThreadFunction()
|
||||
{
|
||||
setThreadName("Segmentator");
|
||||
try
|
||||
{
|
||||
while (!finished)
|
||||
{
|
||||
const auto current_unit_number = segmentator_ticket_number % processing_units.size();
|
||||
auto & unit = processing_units[current_unit_number];
|
||||
|
||||
{
|
||||
std::unique_lock lock(mutex);
|
||||
segmentator_condvar.wait(lock,
|
||||
[&]{ return unit.status == READY_TO_INSERT || finished; });
|
||||
}
|
||||
|
||||
if (finished)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
assert(unit.status == READY_TO_INSERT);
|
||||
|
||||
// Segmentating the original input.
|
||||
unit.segment.resize(0);
|
||||
|
||||
const bool have_more_data = file_segmentation_engine(original_buffer,
|
||||
unit.segment, min_chunk_bytes);
|
||||
|
||||
unit.is_last = !have_more_data;
|
||||
unit.status = READY_TO_PARSE;
|
||||
scheduleParserThreadForUnitWithNumber(current_unit_number);
|
||||
++segmentator_ticket_number;
|
||||
|
||||
if (!have_more_data)
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
onBackgroundException();
|
||||
}
|
||||
}
|
||||
|
||||
void ParallelParsingBlockInputStream::parserThreadFunction(size_t current_unit_number)
|
||||
{
|
||||
try
|
||||
{
|
||||
setThreadName("ChunkParser");
|
||||
|
||||
auto & unit = processing_units[current_unit_number];
|
||||
|
||||
/*
|
||||
* This is kind of suspicious -- the input_process_creator contract with
|
||||
* respect to multithreaded use is not clear, but we hope that it is
|
||||
* just a 'normal' factory class that doesn't have any state, and so we
|
||||
* can use it from multiple threads simultaneously.
|
||||
*/
|
||||
ReadBuffer read_buffer(unit.segment.data(), unit.segment.size(), 0);
|
||||
auto parser = std::make_unique<InputStreamFromInputFormat>(
|
||||
input_processor_creator(read_buffer, header, context,
|
||||
row_input_format_params, format_settings));
|
||||
|
||||
unit.block_ext.block.clear();
|
||||
unit.block_ext.block_missing_values.clear();
|
||||
|
||||
// We don't know how many blocks will be. So we have to read them all
|
||||
// until an empty block occured.
|
||||
Block block;
|
||||
while (!finished && (block = parser->read()) != Block())
|
||||
{
|
||||
unit.block_ext.block.emplace_back(block);
|
||||
unit.block_ext.block_missing_values.emplace_back(parser->getMissingValues());
|
||||
}
|
||||
|
||||
// We suppose we will get at least some blocks for a non-empty buffer,
|
||||
// except at the end of file. Also see a matching assert in readImpl().
|
||||
assert(unit.is_last || unit.block_ext.block.size() > 0);
|
||||
|
||||
std::unique_lock lock(mutex);
|
||||
unit.status = READY_TO_READ;
|
||||
reader_condvar.notify_all();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
onBackgroundException();
|
||||
}
|
||||
}
|
||||
|
||||
void ParallelParsingBlockInputStream::onBackgroundException()
|
||||
{
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||
|
||||
std::unique_lock lock(mutex);
|
||||
if (!background_exception)
|
||||
{
|
||||
background_exception = std::current_exception();
|
||||
}
|
||||
finished = true;
|
||||
reader_condvar.notify_all();
|
||||
segmentator_condvar.notify_all();
|
||||
}
|
||||
|
||||
Block ParallelParsingBlockInputStream::readImpl()
|
||||
{
|
||||
if (isCancelledOrThrowIfKilled() || finished)
|
||||
{
|
||||
/**
|
||||
* Check for background exception and rethrow it before we return.
|
||||
*/
|
||||
std::unique_lock lock(mutex);
|
||||
if (background_exception)
|
||||
{
|
||||
lock.unlock();
|
||||
cancel(false);
|
||||
std::rethrow_exception(background_exception);
|
||||
}
|
||||
|
||||
return Block{};
|
||||
}
|
||||
|
||||
const auto current_unit_number = reader_ticket_number % processing_units.size();
|
||||
auto & unit = processing_units[current_unit_number];
|
||||
|
||||
if (!next_block_in_current_unit.has_value())
|
||||
{
|
||||
// We have read out all the Blocks from the previous Processing Unit,
|
||||
// wait for the current one to become ready.
|
||||
std::unique_lock lock(mutex);
|
||||
reader_condvar.wait(lock, [&](){ return unit.status == READY_TO_READ || finished; });
|
||||
|
||||
if (finished)
|
||||
{
|
||||
/**
|
||||
* Check for background exception and rethrow it before we return.
|
||||
*/
|
||||
if (background_exception)
|
||||
{
|
||||
lock.unlock();
|
||||
cancel(false);
|
||||
std::rethrow_exception(background_exception);
|
||||
}
|
||||
|
||||
return Block{};
|
||||
}
|
||||
|
||||
assert(unit.status == READY_TO_READ);
|
||||
next_block_in_current_unit = 0;
|
||||
}
|
||||
|
||||
if (unit.block_ext.block.size() == 0)
|
||||
{
|
||||
/*
|
||||
* Can we get zero blocks for an entire segment, when the format parser
|
||||
* skips it entire content and does not create any blocks? Probably not,
|
||||
* but if we ever do, we should add a loop around the above if, to skip
|
||||
* these. Also see a matching assert in the parser thread.
|
||||
*/
|
||||
assert(unit.is_last);
|
||||
finished = true;
|
||||
return Block{};
|
||||
}
|
||||
|
||||
assert(next_block_in_current_unit.value() < unit.block_ext.block.size());
|
||||
|
||||
Block res = std::move(unit.block_ext.block.at(*next_block_in_current_unit));
|
||||
last_block_missing_values = std::move(unit.block_ext.block_missing_values[*next_block_in_current_unit]);
|
||||
|
||||
next_block_in_current_unit.value() += 1;
|
||||
|
||||
if (*next_block_in_current_unit == unit.block_ext.block.size())
|
||||
{
|
||||
// Finished reading this Processing Unit, move to the next one.
|
||||
next_block_in_current_unit.reset();
|
||||
++reader_ticket_number;
|
||||
|
||||
if (unit.is_last)
|
||||
{
|
||||
// It it was the last unit, we're finished.
|
||||
finished = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
// Pass the unit back to the segmentator.
|
||||
std::unique_lock lock(mutex);
|
||||
unit.status = READY_TO_INSERT;
|
||||
segmentator_condvar.notify_all();
|
||||
}
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
|
||||
}
|
258
dbms/src/DataStreams/ParallelParsingBlockInputStream.h
Normal file
258
dbms/src/DataStreams/ParallelParsingBlockInputStream.h
Normal file
@ -0,0 +1,258 @@
|
||||
#pragma once
|
||||
|
||||
#include <DataStreams/IBlockInputStream.h>
|
||||
#include <Formats/FormatFactory.h>
|
||||
#include <Common/ThreadPool.h>
|
||||
#include <Common/setThreadName.h>
|
||||
#include <IO/BufferWithOwnMemory.h>
|
||||
#include <IO/ReadBuffer.h>
|
||||
#include <Processors/Formats/IRowInputFormat.h>
|
||||
#include <Processors/Formats/InputStreamFromInputFormat.h>
|
||||
#include <Interpreters/Context.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/**
|
||||
* ORDER-PRESERVING parallel parsing of data formats.
|
||||
* It splits original data into chunks. Then each chunk is parsed by different thread.
|
||||
* The number of chunks equals to the number or parser threads.
|
||||
* The size of chunk is equal to min_chunk_bytes_for_parallel_parsing setting.
|
||||
*
|
||||
* This stream has three kinds of threads: one segmentator, multiple parsers,
|
||||
* and one reader thread -- that is, the one from which readImpl() is called.
|
||||
* They operate one after another on parts of data called "processing units".
|
||||
* One unit consists of buffer with raw data from file, filled by segmentator
|
||||
* thread. This raw data is then parsed by a parser thread to form a number of
|
||||
* Blocks. These Blocks are returned to the parent stream from readImpl().
|
||||
* After being read out, a processing unit is reused, to save on allocating
|
||||
* memory for the raw buffer. The processing units are organized into a circular
|
||||
* array to facilitate reuse and to apply backpressure on the segmentator thread
|
||||
* -- after it runs out of processing units, it has to wait for the reader to
|
||||
* read out the previous blocks.
|
||||
* The outline of what the threads do is as follows:
|
||||
* segmentator thread:
|
||||
* 1) wait for the next processing unit to become empty
|
||||
* 2) fill it with a part of input file
|
||||
* 3) start a parser thread
|
||||
* 4) repeat until eof
|
||||
* parser thread:
|
||||
* 1) parse the given raw buffer without any synchronization
|
||||
* 2) signal that the given unit is ready to read
|
||||
* 3) finish
|
||||
* readImpl():
|
||||
* 1) wait for the next processing unit to become ready to read
|
||||
* 2) take the blocks from the processing unit to return them to the caller
|
||||
* 3) signal that the processing unit is empty
|
||||
* 4) repeat until it encounters unit that is marked as "past_the_end"
|
||||
* All threads must also check for cancel/eof/exception flags.
|
||||
*/
|
||||
class ParallelParsingBlockInputStream : public IBlockInputStream
|
||||
{
|
||||
private:
|
||||
using ReadCallback = std::function<void()>;
|
||||
|
||||
using InputProcessorCreator = std::function<InputFormatPtr(
|
||||
ReadBuffer & buf,
|
||||
const Block & header,
|
||||
const Context & context,
|
||||
const RowInputFormatParams & params,
|
||||
const FormatSettings & settings)>;
|
||||
public:
|
||||
struct InputCreatorParams
|
||||
{
|
||||
const Block &sample;
|
||||
const Context &context;
|
||||
const RowInputFormatParams& row_input_format_params;
|
||||
const FormatSettings &settings;
|
||||
};
|
||||
|
||||
struct Params
|
||||
{
|
||||
ReadBuffer & read_buffer;
|
||||
const InputProcessorCreator &input_processor_creator;
|
||||
const InputCreatorParams &input_creator_params;
|
||||
FormatFactory::FileSegmentationEngine file_segmentation_engine;
|
||||
int max_threads;
|
||||
size_t min_chunk_bytes;
|
||||
};
|
||||
|
||||
explicit ParallelParsingBlockInputStream(const Params & params)
|
||||
: header(params.input_creator_params.sample),
|
||||
context(params.input_creator_params.context),
|
||||
row_input_format_params(params.input_creator_params.row_input_format_params),
|
||||
format_settings(params.input_creator_params.settings),
|
||||
input_processor_creator(params.input_processor_creator),
|
||||
min_chunk_bytes(params.min_chunk_bytes),
|
||||
original_buffer(params.read_buffer),
|
||||
// Subtract one thread that we use for segmentation and one for
|
||||
// reading. After that, must have at least two threads left for
|
||||
// parsing. See the assertion below.
|
||||
pool(std::max(2, params.max_threads - 2)),
|
||||
file_segmentation_engine(params.file_segmentation_engine)
|
||||
{
|
||||
// See comment above.
|
||||
assert(params.max_threads >= 4);
|
||||
|
||||
// One unit for each thread, including segmentator and reader, plus a
|
||||
// couple more units so that the segmentation thread doesn't spuriously
|
||||
// bump into reader thread on wraparound.
|
||||
processing_units.resize(params.max_threads + 2);
|
||||
|
||||
segmentator_thread = ThreadFromGlobalPool([this] { segmentatorThreadFunction(); });
|
||||
}
|
||||
|
||||
String getName() const override { return "ParallelParsing"; }
|
||||
|
||||
~ParallelParsingBlockInputStream() override
|
||||
{
|
||||
finishAndWait();
|
||||
}
|
||||
|
||||
void cancel(bool kill) override
|
||||
{
|
||||
/**
|
||||
* Can be called multiple times, from different threads. Saturate the
|
||||
* the kill flag with OR.
|
||||
*/
|
||||
if (kill)
|
||||
is_killed = true;
|
||||
is_cancelled = true;
|
||||
|
||||
/*
|
||||
* The format parsers themselves are not being cancelled here, so we'll
|
||||
* have to wait until they process the current block. Given that the
|
||||
* chunk size is on the order of megabytes, this should't be too long.
|
||||
* We can't call IInputFormat->cancel here, because the parser object is
|
||||
* local to the parser thread, and we don't want to introduce any
|
||||
* synchronization between parser threads and the other threads to get
|
||||
* better performance. An ideal solution would be to add a callback to
|
||||
* IInputFormat that checks whether it was cancelled.
|
||||
*/
|
||||
|
||||
finishAndWait();
|
||||
}
|
||||
|
||||
Block getHeader() const override
|
||||
{
|
||||
return header;
|
||||
}
|
||||
|
||||
protected:
|
||||
//Reader routine
|
||||
Block readImpl() override;
|
||||
|
||||
const BlockMissingValues & getMissingValues() const override
|
||||
{
|
||||
return last_block_missing_values;
|
||||
}
|
||||
|
||||
private:
|
||||
const Block header;
|
||||
const Context context;
|
||||
const RowInputFormatParams row_input_format_params;
|
||||
const FormatSettings format_settings;
|
||||
const InputProcessorCreator input_processor_creator;
|
||||
|
||||
const size_t min_chunk_bytes;
|
||||
|
||||
/*
|
||||
* This is declared as atomic to avoid UB, because parser threads access it
|
||||
* without synchronization.
|
||||
*/
|
||||
std::atomic<bool> finished{false};
|
||||
|
||||
BlockMissingValues last_block_missing_values;
|
||||
|
||||
// Original ReadBuffer to read from.
|
||||
ReadBuffer & original_buffer;
|
||||
|
||||
//Non-atomic because it is used in one thread.
|
||||
std::optional<size_t> next_block_in_current_unit;
|
||||
size_t segmentator_ticket_number{0};
|
||||
size_t reader_ticket_number{0};
|
||||
|
||||
std::mutex mutex;
|
||||
std::condition_variable reader_condvar;
|
||||
std::condition_variable segmentator_condvar;
|
||||
|
||||
// There are multiple "parsers", that's why we use thread pool.
|
||||
ThreadPool pool;
|
||||
// Reading and segmentating the file
|
||||
ThreadFromGlobalPool segmentator_thread;
|
||||
|
||||
// Function to segment the file. Then "parsers" will parse that segments.
|
||||
FormatFactory::FileSegmentationEngine file_segmentation_engine;
|
||||
|
||||
enum ProcessingUnitStatus
|
||||
{
|
||||
READY_TO_INSERT,
|
||||
READY_TO_PARSE,
|
||||
READY_TO_READ
|
||||
};
|
||||
|
||||
struct BlockExt
|
||||
{
|
||||
std::vector<Block> block;
|
||||
std::vector<BlockMissingValues> block_missing_values;
|
||||
};
|
||||
|
||||
struct ProcessingUnit
|
||||
{
|
||||
explicit ProcessingUnit()
|
||||
: status(ProcessingUnitStatus::READY_TO_INSERT)
|
||||
{
|
||||
}
|
||||
|
||||
BlockExt block_ext;
|
||||
Memory<> segment;
|
||||
std::atomic<ProcessingUnitStatus> status;
|
||||
bool is_last{false};
|
||||
};
|
||||
|
||||
std::exception_ptr background_exception = nullptr;
|
||||
|
||||
// We use deque instead of vector, because it does not require a move
|
||||
// constructor, which is absent for atomics that are inside ProcessingUnit.
|
||||
std::deque<ProcessingUnit> processing_units;
|
||||
|
||||
|
||||
void scheduleParserThreadForUnitWithNumber(size_t unit_number)
|
||||
{
|
||||
pool.scheduleOrThrowOnError(std::bind(&ParallelParsingBlockInputStream::parserThreadFunction, this, unit_number));
|
||||
}
|
||||
|
||||
void finishAndWait()
|
||||
{
|
||||
finished = true;
|
||||
|
||||
{
|
||||
std::unique_lock lock(mutex);
|
||||
segmentator_condvar.notify_all();
|
||||
reader_condvar.notify_all();
|
||||
}
|
||||
|
||||
if (segmentator_thread.joinable())
|
||||
segmentator_thread.join();
|
||||
|
||||
try
|
||||
{
|
||||
pool.wait();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||
}
|
||||
}
|
||||
|
||||
void segmentatorThreadFunction();
|
||||
void parserThreadFunction(size_t bucket_num);
|
||||
|
||||
// Save/log a background exception, set termination flag, wake up all
|
||||
// threads. This function is used by segmentator and parsed threads.
|
||||
// readImpl() is called from the main thread, so the exception handling
|
||||
// is different.
|
||||
void onBackgroundException();
|
||||
};
|
||||
|
||||
};
|
@ -1,8 +1,10 @@
|
||||
#include <algorithm>
|
||||
#include <Common/config.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Core/Settings.h>
|
||||
#include <DataStreams/MaterializingBlockOutputStream.h>
|
||||
#include <DataStreams/ParallelParsingBlockInputStream.h>
|
||||
#include <Formats/FormatSettings.h>
|
||||
#include <Formats/FormatFactory.h>
|
||||
#include <Processors/Formats/IRowInputFormat.h>
|
||||
@ -93,7 +95,7 @@ BlockInputStreamPtr FormatFactory::getInput(
|
||||
|
||||
if (!getCreators(name).input_processor_creator)
|
||||
{
|
||||
const auto & input_getter = getCreators(name).inout_creator;
|
||||
const auto & input_getter = getCreators(name).input_creator;
|
||||
if (!input_getter)
|
||||
throw Exception("Format " + name + " is not suitable for input", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_INPUT);
|
||||
|
||||
@ -103,6 +105,37 @@ BlockInputStreamPtr FormatFactory::getInput(
|
||||
return input_getter(buf, sample, context, max_block_size, callback ? callback : ReadCallback(), format_settings);
|
||||
}
|
||||
|
||||
const Settings & settings = context.getSettingsRef();
|
||||
const auto & file_segmentation_engine = getCreators(name).file_segmentation_engine;
|
||||
|
||||
// Doesn't make sense to use parallel parsing with less than four threads
|
||||
// (segmentator + two parsers + reader).
|
||||
if (settings.input_format_parallel_parsing
|
||||
&& file_segmentation_engine
|
||||
&& settings.max_threads >= 4)
|
||||
{
|
||||
const auto & input_getter = getCreators(name).input_processor_creator;
|
||||
if (!input_getter)
|
||||
throw Exception("Format " + name + " is not suitable for input", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_INPUT);
|
||||
|
||||
FormatSettings format_settings = getInputFormatSetting(settings);
|
||||
|
||||
RowInputFormatParams row_input_format_params;
|
||||
row_input_format_params.max_block_size = max_block_size;
|
||||
row_input_format_params.allow_errors_num = format_settings.input_allow_errors_num;
|
||||
row_input_format_params.allow_errors_ratio = format_settings.input_allow_errors_ratio;
|
||||
row_input_format_params.callback = std::move(callback);
|
||||
row_input_format_params.max_execution_time = settings.max_execution_time;
|
||||
row_input_format_params.timeout_overflow_mode = settings.timeout_overflow_mode;
|
||||
|
||||
auto input_creator_params = ParallelParsingBlockInputStream::InputCreatorParams{sample, context, row_input_format_params, format_settings};
|
||||
ParallelParsingBlockInputStream::Params params{buf, input_getter,
|
||||
input_creator_params, file_segmentation_engine,
|
||||
static_cast<int>(settings.max_threads),
|
||||
settings.min_chunk_bytes_for_parallel_parsing};
|
||||
return std::make_shared<ParallelParsingBlockInputStream>(params);
|
||||
}
|
||||
|
||||
auto format = getInputFormat(name, buf, sample, context, max_block_size, std::move(callback));
|
||||
return std::make_shared<InputStreamFromInputFormat>(std::move(format));
|
||||
}
|
||||
@ -191,7 +224,7 @@ OutputFormatPtr FormatFactory::getOutputFormat(
|
||||
|
||||
void FormatFactory::registerInputFormat(const String & name, InputCreator input_creator)
|
||||
{
|
||||
auto & target = dict[name].inout_creator;
|
||||
auto & target = dict[name].input_creator;
|
||||
if (target)
|
||||
throw Exception("FormatFactory: Input format " + name + " is already registered", ErrorCodes::LOGICAL_ERROR);
|
||||
target = std::move(input_creator);
|
||||
@ -221,6 +254,13 @@ void FormatFactory::registerOutputFormatProcessor(const String & name, OutputPro
|
||||
target = std::move(output_creator);
|
||||
}
|
||||
|
||||
void FormatFactory::registerFileSegmentationEngine(const String & name, FileSegmentationEngine file_segmentation_engine)
|
||||
{
|
||||
auto & target = dict[name].file_segmentation_engine;
|
||||
if (target)
|
||||
throw Exception("FormatFactory: File segmentation engine " + name + " is already registered", ErrorCodes::LOGICAL_ERROR);
|
||||
target = file_segmentation_engine;
|
||||
}
|
||||
|
||||
/// Formats for both input/output.
|
||||
|
||||
@ -249,6 +289,10 @@ void registerOutputFormatProcessorProtobuf(FormatFactory & factory);
|
||||
void registerInputFormatProcessorTemplate(FormatFactory & factory);
|
||||
void registerOutputFormatProcessorTemplate(FormatFactory &factory);
|
||||
|
||||
/// File Segmentation Engines for parallel reading
|
||||
|
||||
void registerFileSegmentationEngineTabSeparated(FormatFactory & factory);
|
||||
|
||||
/// Output only (presentational) formats.
|
||||
|
||||
void registerOutputFormatNull(FormatFactory & factory);
|
||||
@ -299,6 +343,7 @@ FormatFactory::FormatFactory()
|
||||
registerInputFormatProcessorTemplate(*this);
|
||||
registerOutputFormatProcessorTemplate(*this);
|
||||
|
||||
registerFileSegmentationEngineTabSeparated(*this);
|
||||
|
||||
registerOutputFormatNull(*this);
|
||||
|
||||
|
@ -2,6 +2,7 @@
|
||||
|
||||
#include <Core/Types.h>
|
||||
#include <DataStreams/IBlockStream_fwd.h>
|
||||
#include <IO/BufferWithOwnMemory.h>
|
||||
|
||||
#include <functional>
|
||||
#include <memory>
|
||||
@ -41,6 +42,15 @@ public:
|
||||
/// It's initial purpose was to extract payload for virtual columns from Kafka Consumer ReadBuffer.
|
||||
using ReadCallback = std::function<void()>;
|
||||
|
||||
/** Fast reading data from buffer and save result to memory.
|
||||
* Reads at least min_chunk_bytes and some more until the end of the chunk, depends on the format.
|
||||
* Used in ParallelParsingBlockInputStream.
|
||||
*/
|
||||
using FileSegmentationEngine = std::function<bool(
|
||||
ReadBuffer & buf,
|
||||
DB::Memory<> & memory,
|
||||
size_t min_chunk_bytes)>;
|
||||
|
||||
/// This callback allows to perform some additional actions after writing a single row.
|
||||
/// It's initial purpose was to flush Kafka message for each row.
|
||||
using WriteCallback = std::function<void()>;
|
||||
@ -77,10 +87,11 @@ private:
|
||||
|
||||
struct Creators
|
||||
{
|
||||
InputCreator inout_creator;
|
||||
InputCreator input_creator;
|
||||
OutputCreator output_creator;
|
||||
InputProcessorCreator input_processor_creator;
|
||||
OutputProcessorCreator output_processor_creator;
|
||||
FileSegmentationEngine file_segmentation_engine;
|
||||
};
|
||||
|
||||
using FormatsDictionary = std::unordered_map<String, Creators>;
|
||||
@ -114,6 +125,7 @@ public:
|
||||
/// Register format by its name.
|
||||
void registerInputFormat(const String & name, InputCreator input_creator);
|
||||
void registerOutputFormat(const String & name, OutputCreator output_creator);
|
||||
void registerFileSegmentationEngine(const String & name, FileSegmentationEngine file_segmentation_engine);
|
||||
|
||||
void registerInputFormatProcessor(const String & name, InputProcessorCreator input_creator);
|
||||
void registerOutputFormatProcessor(const String & name, OutputProcessorCreator output_creator);
|
||||
|
@ -8,7 +8,6 @@
|
||||
#include <Common/PODArray.h>
|
||||
#include <Common/UTF8Helpers.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
|
@ -384,4 +384,81 @@ void registerInputFormatProcessorTabSeparated(FormatFactory & factory)
|
||||
}
|
||||
}
|
||||
|
||||
void saveUpToPosition(ReadBuffer & in, DB::Memory<> & memory, char * current)
|
||||
{
|
||||
assert(current >= in.position());
|
||||
assert(current <= in.buffer().end());
|
||||
|
||||
const int old_bytes = memory.size();
|
||||
const int additional_bytes = current - in.position();
|
||||
const int new_bytes = old_bytes + additional_bytes;
|
||||
/// There are no new bytes to add to memory.
|
||||
/// No need to do extra stuff.
|
||||
if (new_bytes == 0)
|
||||
return;
|
||||
memory.resize(new_bytes);
|
||||
memcpy(memory.data() + old_bytes, in.position(), additional_bytes);
|
||||
in.position() = current;
|
||||
}
|
||||
|
||||
bool loadAtPosition(ReadBuffer & in, DB::Memory<> & memory, char * & current)
|
||||
{
|
||||
assert(current <= in.buffer().end());
|
||||
|
||||
if (current < in.buffer().end())
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
saveUpToPosition(in, memory, current);
|
||||
bool loaded_more = !in.eof();
|
||||
assert(in.position() == in.buffer().begin());
|
||||
current = in.position();
|
||||
return loaded_more;
|
||||
}
|
||||
|
||||
bool fileSegmentationEngineTabSeparatedImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size)
|
||||
{
|
||||
bool need_more_data = true;
|
||||
char * pos = in.position();
|
||||
while (loadAtPosition(in, memory, pos) && need_more_data)
|
||||
{
|
||||
pos = find_first_symbols<'\\', '\r', '\n'>(pos, in.buffer().end());
|
||||
if (pos == in.buffer().end())
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
if (*pos == '\\')
|
||||
{
|
||||
++pos;
|
||||
if (loadAtPosition(in, memory, pos))
|
||||
{
|
||||
++pos;
|
||||
}
|
||||
}
|
||||
else if (*pos == '\n' || *pos == '\r')
|
||||
{
|
||||
if (memory.size() + static_cast<size_t>(pos - in.position()) >= min_chunk_size)
|
||||
{
|
||||
need_more_data = false;
|
||||
}
|
||||
|
||||
++pos;
|
||||
}
|
||||
}
|
||||
saveUpToPosition(in, memory, pos);
|
||||
|
||||
return loadAtPosition(in, memory, pos);
|
||||
}
|
||||
|
||||
void registerFileSegmentationEngineTabSeparated(FormatFactory & factory)
|
||||
{
|
||||
// We can use the same segmentation engine for TSKV.
|
||||
for (auto name : {"TabSeparated", "TSV", "TSKV"})
|
||||
{
|
||||
factory.registerFileSegmentationEngine(name, &fileSegmentationEngineTabSeparatedImpl);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -320,7 +320,7 @@ bool ValuesBlockInputFormat::parseExpression(IColumn & column, size_t column_idx
|
||||
}
|
||||
|
||||
/// Can be used in fileSegmentationEngine for parallel parsing of Values
|
||||
bool ValuesBlockInputFormat::skipToNextRow(size_t min_chunk_size, int balance)
|
||||
bool ValuesBlockInputFormat::skipToNextRow(size_t min_chunk_bytes, int balance)
|
||||
{
|
||||
skipWhitespaceIfAny(buf);
|
||||
if (buf.eof() || *buf.position() == ';')
|
||||
@ -328,7 +328,7 @@ bool ValuesBlockInputFormat::skipToNextRow(size_t min_chunk_size, int balance)
|
||||
bool quoted = false;
|
||||
|
||||
size_t chunk_begin_buf_count = buf.count();
|
||||
while (!buf.eof() && (balance || buf.count() - chunk_begin_buf_count < min_chunk_size))
|
||||
while (!buf.eof() && (balance || buf.count() - chunk_begin_buf_count < min_chunk_bytes))
|
||||
{
|
||||
buf.position() = find_first_symbols<'\\', '\'', ')', '('>(buf.position(), buf.buffer().end());
|
||||
if (buf.position() == buf.buffer().end())
|
||||
|
@ -60,7 +60,7 @@ private:
|
||||
|
||||
void readSuffix();
|
||||
|
||||
bool skipToNextRow(size_t min_chunk_size = 0, int balance = 0);
|
||||
bool skipToNextRow(size_t min_chunk_bytes = 0, int balance = 0);
|
||||
|
||||
private:
|
||||
PeekableReadBuffer buf;
|
||||
|
@ -21,7 +21,7 @@ void StorageSystemFormats::fillData(MutableColumns & res_columns, const Context
|
||||
for (const auto & pair : formats)
|
||||
{
|
||||
const auto & [format_name, creators] = pair;
|
||||
UInt64 has_input_format(creators.inout_creator != nullptr || creators.input_processor_creator != nullptr);
|
||||
UInt64 has_input_format(creators.input_creator != nullptr || creators.input_processor_creator != nullptr);
|
||||
UInt64 has_output_format(creators.output_creator != nullptr || creators.output_processor_creator != nullptr);
|
||||
res_columns[0]->insert(format_name);
|
||||
res_columns[1]->insert(has_input_format);
|
||||
|
@ -0,0 +1,11 @@
|
||||
<yandex>
|
||||
<logger>
|
||||
<level>trace</level>
|
||||
<log>/var/log/clickhouse-server/log.log</log>
|
||||
<errorlog>/var/log/clickhouse-server/log.err.log</errorlog>
|
||||
<size>1000M</size>
|
||||
<count>10</count>
|
||||
<stderr>/var/log/clickhouse-server/stderr.log</stderr>
|
||||
<stdout>/var/log/clickhouse-server/stdout.log</stdout>
|
||||
</logger>
|
||||
</yandex>
|
@ -31,7 +31,7 @@ import kafka_pb2
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
instance = cluster.add_instance('instance',
|
||||
config_dir='configs',
|
||||
main_configs=['configs/kafka.xml'],
|
||||
main_configs=['configs/kafka.xml', 'configs/log_conf.xml' ],
|
||||
with_kafka=True,
|
||||
clickhouse_path_dir='clickhouse_path')
|
||||
kafka_id = ''
|
||||
@ -557,7 +557,7 @@ def test_kafka_insert(kafka_cluster):
|
||||
kafka_check_result(result, True)
|
||||
|
||||
|
||||
@pytest.mark.timeout(180)
|
||||
@pytest.mark.timeout(240)
|
||||
def test_kafka_produce_consume(kafka_cluster):
|
||||
instance.query('''
|
||||
DROP TABLE IF EXISTS test.view;
|
||||
|
@ -0,0 +1 @@
|
||||
OK
|
48
dbms/tests/queries/0_stateless/01019_parallel_parsing_cancel.sh
Executable file
48
dbms/tests/queries/0_stateless/01019_parallel_parsing_cancel.sh
Executable file
@ -0,0 +1,48 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
. $CURDIR/../shell_config.sh
|
||||
|
||||
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS a;"
|
||||
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS b;"
|
||||
|
||||
$CLICKHOUSE_CLIENT --query="CREATE TABLE a (x UInt64) ENGINE = Memory;"
|
||||
$CLICKHOUSE_CLIENT --query="CREATE TABLE b (x UInt64) ENGINE = Memory;"
|
||||
|
||||
function thread1()
|
||||
{
|
||||
while true; do
|
||||
seq 1 11000000 | $CLICKHOUSE_CLIENT --query_id=11 --query="INSERT INTO a(x) FORMAT TSV"
|
||||
sleep 1
|
||||
$CLICKHOUSE_CLIENT --query="kill query where query_id='22'" SYNC
|
||||
|
||||
done
|
||||
}
|
||||
|
||||
function thread2()
|
||||
{
|
||||
while true; do
|
||||
seq 1 11000000 | $CLICKHOUSE_CLIENT --query_id=22 --query="INSERT INTO b(x) FORMAT TSV"
|
||||
sleep 1
|
||||
$CLICKHOUSE_CLIENT --query="kill query where query_id='11'" SYNC
|
||||
done
|
||||
}
|
||||
|
||||
|
||||
# https://stackoverflow.com/questions/9954794/execute-a-shell-function-with-timeout
|
||||
export -f thread1;
|
||||
export -f thread2;
|
||||
|
||||
TIMEOUT=20
|
||||
|
||||
timeout $TIMEOUT bash -c thread1 2>&1 > /dev/null &
|
||||
timeout $TIMEOUT bash -c thread2 2>&1 > /dev/null &
|
||||
|
||||
wait
|
||||
|
||||
echo OK
|
||||
|
||||
$CLICKHOUSE_CLIENT --query "DROP TABLE a"
|
||||
$CLICKHOUSE_CLIENT --query "DROP TABLE b"
|
||||
|
||||
|
@ -994,4 +994,18 @@ Lower values mean higher priority. Threads with low `nice` priority values are e
|
||||
|
||||
Default value: 0.
|
||||
|
||||
## input_format_parallel_parsing
|
||||
|
||||
- Type: bool
|
||||
- Default value: True
|
||||
|
||||
Enable order-preserving parallel parsing of data formats. Supported only for TSV format.
|
||||
|
||||
## min_chunk_bytes_for_parallel_parsing
|
||||
|
||||
- Type: unsigned int
|
||||
- Default value: 1 MiB
|
||||
|
||||
The minimum chunk size in bytes, which each thread will parse in parallel.
|
||||
|
||||
[Original article](https://clickhouse.yandex/docs/en/operations/settings/settings/) <!-- hide -->
|
||||
|
Loading…
Reference in New Issue
Block a user