comments + fixes for parsing

This commit is contained in:
nikitamikhaylov 2020-10-06 20:49:57 +03:00
parent d0bd4e97c9
commit a89d6bc75a
3 changed files with 72 additions and 72 deletions

View File

@ -11,6 +11,7 @@
#include <atomic>
#include <Common/ThreadPool.h>
#include <Poco/Event.h>
#include <IO/WriteBuffer.h>
#include <IO/BufferWithOwnMemory.h>
@ -20,9 +21,10 @@ namespace DB
class ParallelFormattingOutputFormat : public IOutputFormat
{
public:
/* Used to recreate formatter on every new data piece. */
/// Used to recreate formatter on every new data piece.
using InternalFormatterCreator = std::function<OutputFormatPtr(WriteBuffer & buf)>;
/// Struct to simplify constructor.
struct Params
{
WriteBuffer & out;
@ -37,14 +39,17 @@ public:
, pool(params.max_threads_for_parallel_formatting)
{
/// Just heuristic. We need one thread for collecting, one thread for receiving chunks
/// and n threads for formatting.
processing_units.resize(params.max_threads_for_parallel_formatting + 2);
collector_thread = ThreadFromGlobalPool([&] { collectorThreadFunction(); });
}
~ParallelFormattingOutputFormat() override
{
flush();
if (!IOutputFormat::finalized)
finalize();
finishAndWait();
}
@ -80,11 +85,13 @@ protected:
{
IOutputFormat::finalized = true;
addChunk(Chunk{}, ProcessingUnitType::FINALIZE);
collector_finished.wait();
}
private:
InternalFormatterCreator internal_formatter_creator;
/// Status to synchronize multiple threads.
enum ProcessingUnitStatus
{
READY_TO_INSERT,
@ -92,7 +99,7 @@ private:
READY_TO_READ
};
/// Some information about what methods to call from internal parser.
enum class ProcessingUnitType
{
START,
@ -104,22 +111,22 @@ private:
void addChunk(Chunk chunk, ProcessingUnitType type)
{
// std::cout << "AddChunk of size " << chunk.getNumRows() << std::endl;
const auto current_unit_number = writer_unit_number % processing_units.size();
auto & unit = processing_units[current_unit_number];
{
std::unique_lock<std::mutex> lock(mutex);
writer_condvar.wait(lock,
[&]{ return unit.status == READY_TO_INSERT || formatting_finished; });
[&]{ return unit.status == READY_TO_INSERT || emergency_stop; });
}
if (emergency_stop)
return;
assert(unit.status == READY_TO_INSERT);
unit.chunk = std::move(chunk);
/// Resize memory without deallocate
/// Resize memory without deallocation.
unit.segment.resize(0);
unit.status = READY_TO_FORMAT;
unit.type = type;
@ -140,10 +147,9 @@ private:
Chunk chunk;
Memory<> segment;
size_t actual_memory_size{0};
};
std::promise<bool> finalizator{};
Poco::Event collector_finished{};
std::atomic_bool need_flush{false};
@ -154,10 +160,11 @@ private:
std::exception_ptr background_exception = nullptr;
/// We use deque, because ProcessingUnit doesn't have move or copy constructor.
std::deque<ProcessingUnit> processing_units;
std::mutex mutex;
std::atomic_bool formatting_finished{false};
std::atomic_bool emergency_stop{false};
std::atomic_size_t collector_unit_number{0};
std::atomic_size_t writer_unit_number{0};
@ -167,10 +174,7 @@ private:
void finishAndWait()
{
std::future<bool> future_finalizator = finalizator.get_future();
future_finalizator.get();
formatting_finished = true;
emergency_stop = true;
{
std::unique_lock<std::mutex> lock(mutex);
@ -201,7 +205,7 @@ private:
{
background_exception = std::current_exception();
}
formatting_finished = true;
emergency_stop = true;
writer_condvar.notify_all();
collector_condvar.notify_all();
}
@ -211,43 +215,29 @@ private:
pool.scheduleOrThrowOnError([this, ticket_number] { formatterThreadFunction(ticket_number); });
}
void waitForFormattingFinished()
{
///FIXME
while(hasChunksToWorkWith())
{
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
bool hasChunksToWorkWith()
{
return writer_unit_number - collector_unit_number > 0;
}
void collectorThreadFunction()
{
setThreadName("Collector");
try
{
while (!formatting_finished)
while (!emergency_stop)
{
const auto current_unit_number = collector_unit_number % processing_units.size();
// std::cout << "collecting " << current_unit_number << std::endl;
auto & unit = processing_units[current_unit_number];
{
std::unique_lock<std::mutex> lock(mutex);
collector_condvar.wait(lock,
[&]{ return unit.status == READY_TO_READ; });
[&]{ return unit.status == READY_TO_READ || emergency_stop; });
}
assert(unit.status == READY_TO_READ);
assert(unit.segment.size() > 0);
if (emergency_stop)
return;
assert(unit.status == READY_TO_READ);
/// Use this copy to after notification to stop the execution.
auto copy_if_unit_type = unit.type;
/// Do main work here.
@ -259,17 +249,17 @@ private:
++collector_unit_number;
{
/// Notify other threads.
std::lock_guard<std::mutex> lock(mutex);
unit.status = READY_TO_INSERT;
writer_condvar.notify_all();
}
/// We can exit only after writing last piece of to out buffer.
if (copy_if_unit_type == ProcessingUnitType::FINALIZE)
{
finalizator.set_value(true);
collector_finished.set();
break;
}
}
}
catch (...)
@ -286,17 +276,14 @@ private:
try
{
auto & unit = processing_units[current_unit_number];
assert(unit.status = READY_TO_FORMAT);
unit.segment.resize(1);
unit.segment.resize(0);
unit.actual_memory_size = 0;
BufferWithOutsideMemory<WriteBuffer> out_buffer(unit.segment);
auto formatter = internal_formatter_creator(out_buffer);
unit.actual_memory_size = 0;
switch (unit.type)
{
case ProcessingUnitType::START :
@ -325,7 +312,7 @@ private:
break;
}
}
/// Flush all the data to handmade buffer.
formatter->flush();
unit.actual_memory_size = out_buffer.getActualSize();
@ -334,13 +321,11 @@ private:
unit.status = READY_TO_READ;
collector_condvar.notify_all();
}
}
catch (...)
{
onBackgroundException();
}
}
};

View File

@ -1,11 +1,21 @@
#include <Processors/Formats/Impl/ParallelParsingInputFormat.h>
#include <IO/ReadHelpers.h>
#include <Common/CurrentThread.h>
#include <Common/setThreadName.h>
#include <ext/scope_guard.h>
namespace DB
{
void ParallelParsingInputFormat::segmentatorThreadFunction()
void ParallelParsingInputFormat::segmentatorThreadFunction(ThreadGroupStatusPtr thread_group)
{
SCOPE_EXIT(
if (thread_group)
CurrentThread::detachQueryIfNotDetached();
);
if (thread_group)
CurrentThread::attachTo(thread_group);
setThreadName("Segmentator");
try
{
@ -21,9 +31,7 @@ void ParallelParsingInputFormat::segmentatorThreadFunction()
}
if (parsing_finished)
{
break;
}
assert(unit.status == READY_TO_INSERT);
@ -38,19 +46,24 @@ void ParallelParsingInputFormat::segmentatorThreadFunction()
++segmentator_ticket_number;
if (!have_more_data)
{
break;
}
}
}
catch (...)
{
onBackgroundException();
}
}
void ParallelParsingInputFormat::parserThreadFunction(size_t current_ticket_number)
void ParallelParsingInputFormat::parserThreadFunction(ThreadGroupStatusPtr thread_group, size_t current_ticket_number)
{
SCOPE_EXIT(
if (thread_group)
CurrentThread::detachQueryIfNotDetached();
);
if (thread_group)
CurrentThread::attachTo(thread_group);
try
{
setThreadName("ChunkParser");
@ -58,8 +71,6 @@ void ParallelParsingInputFormat::parserThreadFunction(size_t current_ticket_numb
const auto current_unit_number = current_ticket_number % processing_units.size();
auto & unit = processing_units[current_unit_number];
assert(unit.segment.size() > 0);
/*
* This is kind of suspicious -- the input_process_creator contract with
* respect to multithreaded use is not clear, but we hope that it is

View File

@ -84,7 +84,8 @@ public:
initializePrepareEndUpMap();
segmentator_thread = ThreadFromGlobalPool([this] { segmentatorThreadFunction(); });
segmentator_thread = ThreadFromGlobalPool(
&ParallelParsingInputFormat::segmentatorThreadFunction, this, CurrentThread::getGroup());
}
~ParallelParsingInputFormat() override
@ -172,14 +173,14 @@ private:
};
const InternalParserCreator internal_parser_creator;
// Function to segment the file. Then "parsers" will parse that segments.
/// Function to segment the file. Then "parsers" will parse that segments.
FormatFactory::FileSegmentationEngine file_segmentation_engine;
const String format_name;
const size_t min_chunk_bytes;
BlockMissingValues last_block_missing_values;
//Non-atomic because it is used in one thread.
/// Non-atomic because it is used in one thread.
std::optional<size_t> next_block_in_current_unit;
size_t segmentator_ticket_number{0};
size_t reader_ticket_number{0};
@ -190,9 +191,9 @@ private:
std::atomic<bool> parsing_finished{false};
// There are multiple "parsers", that's why we use thread pool.
/// There are multiple "parsers", that's why we use thread pool.
ThreadPool pool;
// Reading and segmentating the file
/// Reading and segmentating the file
ThreadFromGlobalPool segmentator_thread;
enum ProcessingUnitStatus
@ -223,14 +224,17 @@ private:
std::exception_ptr background_exception = nullptr;
// We use deque instead of vector, because it does not require a move
// constructor, which is absent for atomics that are inside ProcessingUnit.
/// We use deque instead of vector, because it does not require a move
/// constructor, which is absent for atomics that are inside ProcessingUnit.
std::deque<ProcessingUnit> processing_units;
void scheduleParserThreadForUnitWithNumber(size_t ticket_number)
{
pool.scheduleOrThrowOnError([this, ticket_number] { parserThreadFunction(ticket_number); });
pool.scheduleOrThrowOnError([this, ticket_number, group = CurrentThread::getGroup()]()
{
parserThreadFunction(group, ticket_number);
});
}
void finishAndWait()
@ -256,13 +260,13 @@ private:
}
}
void segmentatorThreadFunction();
void parserThreadFunction(size_t current_ticket_number);
void segmentatorThreadFunction(ThreadGroupStatusPtr thread_group);
void parserThreadFunction(ThreadGroupStatusPtr thread_group, size_t current_ticket_number);
// Save/log a background exception, set termination flag, wake up all
// threads. This function is used by segmentator and parsed threads.
// readImpl() is called from the main thread, so the exception handling
// is different.
/// Save/log a background exception, set termination flag, wake up all
/// threads. This function is used by segmentator and parsed threads.
/// readImpl() is called from the main thread, so the exception handling
/// is different.
void onBackgroundException();
/// To store objects which will prepare and end up ReadBuffer for each format.