This commit is contained in:
Nikita Mikhaylov 2020-08-14 03:34:35 +03:00 committed by nikitamikhaylov
parent 5a47928431
commit 9922324787
4 changed files with 37 additions and 57 deletions

View File

@ -121,6 +121,7 @@ class IColumn;
\
M(Bool, input_format_parallel_parsing, true, "Enable parallel parsing for some data formats.", 0) \
M(UInt64, min_chunk_bytes_for_parallel_parsing, (10 * 1024 * 1024), "The minimum chunk size in bytes, which each thread will parse in parallel.", 0) \
M(Bool, output_format_parallel_formatting, false, "Enable parallel formatting for all data formats.", 0) \
\
M(UInt64, merge_tree_min_rows_for_concurrent_read, (20 * 8192), "If at least as many lines are read from one file, the reading can be parallelized.", 0) \
M(UInt64, merge_tree_min_bytes_for_concurrent_read, (24 * 10 * 1024 * 1024), "If at least as many bytes are read from one file, the reading can be parallelized.", 0) \

View File

@ -234,7 +234,8 @@ BlockOutputStreamPtr FormatFactory::getOutput(const String & name,
}
bool parallel_formatting = true;
const Settings & settings = context.getSettingsRef();
bool parallel_formatting = settings.output_format_parallel_formatting;
if (parallel_formatting)
{
@ -242,7 +243,6 @@ BlockOutputStreamPtr FormatFactory::getOutput(const String & name,
if (!output_getter)
throw Exception("Format " + name + " is not suitable for output", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_OUTPUT);
const Settings & settings = context.getSettingsRef();
FormatSettings format_settings = getOutputFormatSetting(settings, context);
/** TODO: Materialization is needed, because formats can use the functions `IDataType`,

View File

@ -159,18 +159,25 @@ protected:
Memory<> & memory;
public:
BufferWithOutsideMemory(Memory<> & memory_)
explicit BufferWithOutsideMemory(Memory<> & memory_)
: Base(nullptr, 0), memory(memory_)
{
Base::set(memory.data(), memory.size());
Base::padded = false;
}
size_t getActualSize()
{
return Base::count();
}
private:
void nextImpl() override final
{
std::cout << "nextImpl" << std::endl;
const size_t prev_size = memory.size();
memory.resize(2 * prev_size);
Base::set(memory.data(), memory.size());
Base::set(memory.data() + prev_size, memory.size() - prev_size);
}
};

View File

@ -17,7 +17,7 @@ namespace DB
{
const size_t min_chunk_bytes_for_parallel_formatting = 1024;
const size_t max_threads_for_parallel_formatting = 4;
const size_t max_threads_for_parallel_formatting = 6;
class ParallelFormattingOutputFormat : public IOutputFormat
{
@ -38,15 +38,14 @@ public:
, pool(max_threads_for_parallel_formatting)
{
// std::cout << "ParallelFormattingOutputFormat::constructor" << std::endl;
processing_units.resize(max_threads_for_parallel_formatting + 2);
collector_thread = ThreadFromGlobalPool([this] { collectorThreadFunction(); });
collector_thread = ThreadFromGlobalPool([&] { collectorThreadFunction(); });
}
~ParallelFormattingOutputFormat() override
{
std::cout << "ParallelFormattingOutputFormat::destructor" << std::endl;
flush();
finishAndWait();
}
@ -54,45 +53,23 @@ public:
void flush() override final
{
std::cout << "flush()" << std::endl;
///FIXME
while(writer_unit_number - collector_unit_number > 1)
{
std::cout << writer_unit_number << ' ' << collector_unit_number << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
waitForFormattingFinished();
IOutputFormat::flush();
}
protected:
void consume(Chunk chunk) override final
{
// std::cout << StackTrace().toString() << std::endl;
std::cout << chunk.dumpStructure() << std::endl;
if (chunk.empty())
{
// std::cout << "Empty chunk" << std::endl;
formatting_finished = true;
}
const auto current_unit_number = writer_unit_number % processing_units.size();
auto & unit = processing_units[current_unit_number];
// std::cout << "Consume " << current_unit_number << " before wait" << std::endl;
{
std::unique_lock<std::mutex> lock(mutex);
writer_condvar.wait(lock,
[&]{ return unit.status == READY_TO_INSERT || formatting_finished; });
}
// std::cout << "Consume " << current_unit_number << " after wait" << std::endl;
assert(unit.status == READY_TO_INSERT);
unit.chunk = std::move(chunk);
@ -127,6 +104,7 @@ private:
std::atomic<ProcessingUnitStatus> status;
Chunk chunk;
Memory<> segment;
size_t actual_memory_size{0};
};
// There are multiple "formatters", that's why we use thread pool.
@ -149,9 +127,6 @@ private:
void finishAndWait()
{
// std::cout << "finishAndWait()" << std::endl;
// std::cout << StackTrace().toString() << std::endl;
formatting_finished = true;
{
@ -176,7 +151,6 @@ private:
void onBackgroundException()
{
// std::cout << "onBackgroundException" << std::endl;
tryLogCurrentException(__PRETTY_FUNCTION__);
std::unique_lock<std::mutex> lock(mutex);
@ -194,6 +168,21 @@ private:
pool.scheduleOrThrowOnError([this, ticket_number] { formatterThreadFunction(ticket_number); });
}
void waitForFormattingFinished()
{
///FIXME
while(hasChunksToWorkWith())
{
std::cout << writer_unit_number << ' ' << collector_unit_number << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
bool hasChunksToWorkWith()
{
return writer_unit_number - collector_unit_number > 0;
}
void collectorThreadFunction()
{
setThreadName("Collector");
@ -206,36 +195,21 @@ private:
auto &unit = processing_units[current_unit_number];
// std::cout << "Collector " << current_unit_number << " before wait" << std::endl;
{
std::unique_lock<std::mutex> lock(mutex);
collector_condvar.wait(lock,
[&]{ return unit.status == READY_TO_READ || formatting_finished; });
}
// std::cout << "Collector " << current_unit_number << " after wait" << std::endl;
if (formatting_finished)
{
std::cout << "formatting finished" << std::endl;
break;
}
assert(unit.status == READY_TO_READ);
assert(unit.segment.size() > 0);
// for (size_t i=0; i < unit.segment.size(); ++i)
// {
// std::cout << *(unit.segment.data() + i);
// }
// std::cout << std::endl;
/// Do main work here.
out.write(unit.segment.data(), unit.segment.size());
flush();
out.write(unit.segment.data(), unit.actual_memory_size);
out.sync();
++collector_unit_number;
@ -257,13 +231,11 @@ private:
try
{
// std::cout << "Formatter " << current_unit_number << std::endl;
auto & unit = processing_units[current_unit_number];
assert(unit.status = READY_TO_FORMAT);
unit.segment.resize(10 * DBMS_DEFAULT_BUFFER_SIZE);
unit.segment.resize(DBMS_DEFAULT_BUFFER_SIZE);
/// TODO: Implement proper nextImpl
BufferWithOutsideMemory<WriteBuffer> out_buffer(unit.segment);
@ -271,8 +243,9 @@ private:
auto formatter = internal_formatter_creator(out_buffer);
formatter->consume(std::move(unit.chunk));
formatter->flush();
// std::cout << "Formatter " << current_unit_number << " before notify" << std::endl;
unit.actual_memory_size = out_buffer.getActualSize();
{
std::lock_guard<std::mutex> lock(mutex);
@ -280,7 +253,6 @@ private:
collector_condvar.notify_all();
}
// std::cout << "Formatter " << current_unit_number << " after notify" << std::endl;
}
catch (...)
{