first try

This commit is contained in:
Nikita Mikhaylov 2020-06-12 02:00:49 +03:00 committed by nikitamikhaylov
parent 0a508c7b8a
commit 9f127a46c7
8 changed files with 238 additions and 58 deletions

View File

@ -15,7 +15,7 @@
namespace ProfileEvents namespace ProfileEvents
{ {
extern const Event ArenaAllocMemoryChunks; extern const Event ArenaAllocChunks;
extern const Event ArenaAllocBytes; extern const Event ArenaAllocBytes;
} }
@ -45,18 +45,16 @@ private:
char * end; /// does not include padding. char * end; /// does not include padding.
MemoryChunk * prev; MemoryChunk * prev;
MemoryChunk * next;
MemoryChunk(size_t size_, MemoryChunk * prev_) MemoryChunk(size_t size_, MemoryChunk * prev_)
{ {
ProfileEvents::increment(ProfileEvents::ArenaAllocMemoryChunks); ProfileEvents::increment(ProfileEvents::ArenaAllocChunks);
ProfileEvents::increment(ProfileEvents::ArenaAllocBytes, size_); ProfileEvents::increment(ProfileEvents::ArenaAllocBytes, size_);
begin = reinterpret_cast<char *>(Allocator<false>::alloc(size_)); begin = reinterpret_cast<char *>(Allocator<false>::alloc(size_));
pos = begin; pos = begin;
end = begin + size_ - pad_right; end = begin + size_ - pad_right;
prev = prev_; prev = prev_;
prev->next = this;
ASAN_POISON_MEMORY_REGION(begin, size_); ASAN_POISON_MEMORY_REGION(begin, size_);
} }

View File

@ -5,7 +5,6 @@
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Core/Settings.h> #include <Core/Settings.h>
#include <DataStreams/MaterializingBlockOutputStream.h> #include <DataStreams/MaterializingBlockOutputStream.h>
#include <DataStreams/ParallelParsingBlockInputStream.h>
#include <DataStreams/SquashingBlockOutputStream.h> #include <DataStreams/SquashingBlockOutputStream.h>
#include <DataStreams/NativeBlockInputStream.h> #include <DataStreams/NativeBlockInputStream.h>
#include <Formats/FormatSettings.h> #include <Formats/FormatSettings.h>
@ -17,6 +16,7 @@
#include <Processors/Formats/Impl/MySQLOutputFormat.h> #include <Processors/Formats/Impl/MySQLOutputFormat.h>
#include <Processors/Formats/Impl/NativeFormat.cpp> #include <Processors/Formats/Impl/NativeFormat.cpp>
#include <Processors/Formats/Impl/ParallelParsingInputFormat.h> #include <Processors/Formats/Impl/ParallelParsingInputFormat.h>
#include <Processors/Formats/Impl/ParallelFormattingOutputFormat.h>
#include <Poco/URI.h> #include <Poco/URI.h>
#if !defined(ARCADIA_BUILD) #if !defined(ARCADIA_BUILD)
@ -233,10 +233,37 @@ BlockOutputStreamPtr FormatFactory::getOutput(const String & name,
sample); sample);
} }
auto format = getOutputFormat(name, buf, sample, context, std::move(callback),
format_settings); bool parallel_formatting = true;
return std::make_shared<MaterializingBlockOutputStream>(
std::make_shared<OutputStreamToOutputFormat>(format), sample); if (parallel_formatting && name == "CSV")
{
const auto & output_getter = getCreators(name).output_processor_creator;
if (!output_getter)
throw Exception("Format " + name + " is not suitable for output", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_OUTPUT);
const Settings & settings = context.getSettingsRef();
FormatSettings format_settings = getOutputFormatSetting(settings, context);
/** TODO: Materialization is needed, because formats can use the functions `IDataType`,
* which only work with full columns.
*/
auto formatter_creator = [output_getter, sample, callback, format_settings]
(WriteBuffer & output) -> OutputFormatPtr
{ return output_getter(output, sample, std::move(callback), format_settings);};
/// Enable auto-flush for streaming mode. Currently it is needed by INSERT WATCH query.
// if (format_settings.enable_streaming)
// format->setAutoFlush();
ParallelFormattingOutputFormat::Params params{buf, sample, formatter_creator};
auto format = std::make_shared<ParallelFormattingOutputFormat>(params);
return std::make_shared<MaterializingBlockOutputStream>(std::make_shared<OutputStreamToOutputFormat>(format), sample);
}
auto format = getOutputFormat(name, buf, sample, context, std::move(callback));
return std::make_shared<MaterializingBlockOutputStream>(std::make_shared<OutputStreamToOutputFormat>(format), sample);
} }

View File

@ -149,4 +149,29 @@ public:
}; };
/** A buffer that works on top of a Memory<> region owned by the caller
  * (the buffer keeps only a reference to it and never owns the storage).
  * Template parameter: the buffer base class.
  * NOTE(review): the original comment names ReadBuffer or WriteBuffer, but
  * nextImpl() below returns void, which presumably only matches the
  * WriteBuffer interface - confirm before instantiating with ReadBuffer.
  */
template <typename Base>
class BufferWithOutsideMemory : public Base
{
protected:
    /// Storage supplied by the caller; it is resized in place when it fills up.
    Memory<> & memory;

public:
    BufferWithOutsideMemory(Memory<> & memory_)
        : Base(nullptr, 0), memory(memory_)
    {
        /// Point the working buffer at the whole outside region.
        Base::set(memory.data(), memory.size());
        Base::padded = false;
    }

private:
    /** Grows the outside memory and continues writing right after the bytes
      * that are already there.
      *
      * The previous version re-pointed the working buffer at memory.data(),
      * so everything written before the growth was overwritten.
      * Assumes Memory<>::resize keeps the existing contents and that the base
      * class resets the position to the start of the working buffer after
      * nextImpl() returns - TODO confirm against Memory<> / WriteBuffer.
      */
    void nextImpl() override final
    {
        /// Must be computed before resize(): the data pointer may change.
        const size_t written = Base::position() - memory.data();

        /// "+ 1" guarantees progress even when nothing has been written yet.
        memory.resize(2 * written + 1);

        Base::set(memory.data() + written, memory.size() - written);
    }
};
} }

View File

@ -557,7 +557,6 @@ void writeProbablyBackQuotedStringMySQL(const StringRef & s, WriteBuffer & buf);
template <char quote = '"'> template <char quote = '"'>
void writeCSVString(const char * begin, const char * end, WriteBuffer & buf) void writeCSVString(const char * begin, const char * end, WriteBuffer & buf)
{ {
std::cout << StackTrace().toString() << std::endl;
writeChar(quote, buf); writeChar(quote, buf);
const char * pos = begin; const char * pos = begin;

View File

@ -39,12 +39,15 @@ protected:
RowsBeforeLimitCounterPtr rows_before_limit_counter; RowsBeforeLimitCounterPtr rows_before_limit_counter;
friend class ParallelFormattingOutputFormat;
virtual void consume(Chunk) = 0; virtual void consume(Chunk) = 0;
virtual void consumeTotals(Chunk) {} virtual void consumeTotals(Chunk) {}
virtual void consumeExtremes(Chunk) {} virtual void consumeExtremes(Chunk) {}
virtual void finalize() {} virtual void finalize() {}
public: public:
IOutputFormat(const Block & header_, WriteBuffer & out_); IOutputFormat(const Block & header_, WriteBuffer & out_);
Status prepare() override; Status prepare() override;

View File

@ -11,7 +11,6 @@ namespace DB
CSVRowOutputFormat::CSVRowOutputFormat(WriteBuffer & out_, const Block & header_, bool with_names_, const RowOutputFormatParams & params_, const FormatSettings & format_settings_) CSVRowOutputFormat::CSVRowOutputFormat(WriteBuffer & out_, const Block & header_, bool with_names_, const RowOutputFormatParams & params_, const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_, params_), with_names(with_names_), format_settings(format_settings_) : IRowOutputFormat(header_, out_, params_), with_names(with_names_), format_settings(format_settings_)
{ {
std::cout << StackTrace().toString() << std::endl;
const auto & sample = getPort(PortKind::Main).getHeader(); const auto & sample = getPort(PortKind::Main).getHeader();
size_t columns = sample.columns(); size_t columns = sample.columns();
data_types.resize(columns); data_types.resize(columns);

View File

@ -16,7 +16,6 @@ JSONEachRowRowOutputFormat::JSONEachRowRowOutputFormat(
: IRowOutputFormat(header_, out_, params_), : IRowOutputFormat(header_, out_, params_),
settings(settings_) settings(settings_)
{ {
std::cout << StackTrace().toString() << std::endl;
const auto & sample = getPort(PortKind::Main).getHeader(); const auto & sample = getPort(PortKind::Main).getHeader();
size_t columns = sample.columns(); size_t columns = sample.columns();
fields.resize(columns); fields.resize(columns);

View File

@ -3,32 +3,50 @@
#include <Processors/Formats/IOutputFormat.h> #include <Processors/Formats/IOutputFormat.h>
#include <Common/Arena.h> #include <Common/Arena.h>
#include <IO/WriteBufferFromArena.h> #include <IO/WriteBufferFromArena.h>
#include <Formats/FormatFactory.h>
#include <deque> #include <deque>
#include <Common/setThreadName.h> #include <Common/setThreadName.h>
#include <atomic> #include <atomic>
#include <Common/ThreadPool.h>
#include <IO/WriteBuffer.h>
#include <IO/BufferWithOwnMemory.h>
namespace DB namespace DB
{ {
const size_t min_chunk_bytes_for_parallel_formatting = 1024; const size_t min_chunk_bytes_for_parallel_formatting_ = 1024;
const size_t max_threads_for_parallel_formatting_ = 4;
class ParallelFormattingOutputFormat : public IOutputFormat class ParallelFormattingOutputFormat : public IOutputFormat
{ {
public: public:
/* Used to recreate formatter on every new data piece. */
using InternalFormatterCreator = std::function<OutputFormatPtr(WriteBuffer & buf)>;
struct Params struct Params
{ {
const Block & header; WriteBuffer & out_;
WriteBuffer & out; const Block & header_;
InternalFormatterCreator internal_formatter_creator_;
}; };
explicit ParallelFormattingOutputFormat(Params params) explicit ParallelFormattingOutputFormat(Params params)
: IOutputFormat(params.header, params.out) : IOutputFormat(params.header_, params.out_)
, original_write_buffer(params.out) , internal_formatter_creator(params.internal_formatter_creator_)
{ , pool(max_threads_for_parallel_formatting_)
{
// std::cout << "ParallelFormattingOutputFormat::constructor" << std::endl;
processing_units.resize(max_threads_for_parallel_formatting_ + 2);
collector_thread = ThreadFromGlobalPool([this] { сollectorThreadFunction(); });
}
/// Stops the collector thread and the formatter pool via finishAndWait()
/// before the members they use are destroyed.
~ParallelFormattingOutputFormat() override { finishAndWait(); }
String getName() const override final { return "ParallelFormattingOutputFormat"; } String getName() const override final { return "ParallelFormattingOutputFormat"; }
@ -36,31 +54,46 @@ public:
protected: protected:
void consume(Chunk chunk) override final void consume(Chunk chunk) override final
{ {
std::cout << StackTrace().toString() << std::endl;
if (chunk.empty()) if (chunk.empty())
{
// std::cout << "Empty chunk" << std::endl;
formatting_finished = true; formatting_finished = true;
}
const auto current_unit_number = writer_unit_number % processing_units.size(); const auto current_unit_number = writer_unit_number % processing_units.size();
auto & unit = processing_units[current_unit_number]; auto & unit = processing_units[current_unit_number];
// std::cout << "Consume " << current_unit_number << " before wait" << std::endl;
{ {
std::unique_lock<std::mutex> lock(mutex); std::unique_lock<std::mutex> lock(mutex);
writer_condvar.wait(lock, writer_condvar.wait(lock,
[&]{ return unit.status == READY_TO_INSERT || formatting_finished; }); [&]{ return unit.status == READY_TO_INSERT || formatting_finished; });
} }
// std::cout << "Consume " << current_unit_number << " after wait" << std::endl;
assert(unit.status == READY_TO_INSERT); assert(unit.status == READY_TO_INSERT);
unit.chunk = std::move(chunk); unit.chunk = std::move(chunk);
/// TODO: Allocate new arena for current chunk
/// Resize memory without deallocate
unit.segment.resize(0);
unit.status = READY_TO_FORMAT;
/// TODO: Submit task to ThreadPool scheduleFormatterThreadForUnitWithNumber(current_unit_number);
++writer_unit_number;
} }
private: private:
WriteBuffer & original_write_buffer; InternalFormatterCreator internal_formatter_creator;
enum ProcessingUnitStatus enum ProcessingUnitStatus
{ {
@ -71,63 +104,134 @@ private:
struct ProcessingUnit struct ProcessingUnit
{ {
explicit ProcessingUnit()
: status(ProcessingUnitStatus::READY_TO_INSERT)
{
}
std::atomic<ProcessingUnitStatus> status;
Chunk chunk; Chunk chunk;
Arena arena; Memory<> segment;
ProcessingUnitStatus status;
}; };
// There are multiple "formatters", that's why we use thread pool.
ThreadPool pool;
// Collects the formatted segments and writes them to the original WriteBuffer.
ThreadFromGlobalPool collector_thread;
std::exception_ptr background_exception = nullptr;
std::deque<ProcessingUnit> processing_units; std::deque<ProcessingUnit> processing_units;
std::mutex mutex; std::mutex mutex;
std::atomic_bool formatting_finished{false}; std::atomic_bool formatting_finished{false};
std::atomic_size_t collector_unit_number{0}; std::atomic_size_t collector_unit_number{0};
std::atomic_size_t writer_unit_number{0}; std::atomic_size_t writer_unit_number{0};
std::condition_variable collector_condvar; std::condition_variable collector_condvar;
std::condition_variable writer_condvar; std::condition_variable writer_condvar;
/** Requests shutdown and waits for all background work to stop.
  *
  * Sets formatting_finished, wakes every thread waiting on either condition
  * variable, joins the collector thread and then waits for the formatter pool.
  * An exception thrown by pool.wait() is logged and not rethrown: this method
  * is called from the destructor.
  */
void finishAndWait()
{
    formatting_finished = true;

    {
        /// Notify while holding the mutex that the waiters use for their predicates.
        std::unique_lock<std::mutex> lock(mutex);
        collector_condvar.notify_all();
        writer_condvar.notify_all();
    }

    if (collector_thread.joinable())
        collector_thread.join();

    try
    {
        pool.wait();
    }
    catch (...)
    {
        tryLogCurrentException(__PRETTY_FUNCTION__);
    }
}
/** Handler for failures in the background threads; it is invoked from their
  * catch (...) blocks, so there is a current exception to log and store.
  * Logs the exception, remembers it if none has been stored yet, marks the
  * formatting as finished and wakes all waiting threads.
  */
void onBackgroundException()
{
    tryLogCurrentException(__PRETTY_FUNCTION__);

    std::lock_guard<std::mutex> guard(mutex);

    /// Keep only the first failure; later ones are just logged above.
    if (!background_exception)
        background_exception = std::current_exception();

    formatting_finished = true;
    writer_condvar.notify_all();
    collector_condvar.notify_all();
}
/// Submits a job to the thread pool that runs formatterThreadFunction()
/// for the processing unit identified by ticket_number.
void scheduleFormatterThreadForUnitWithNumber(size_t ticket_number)
{
    auto format_job = [this, ticket_number] { formatterThreadFunction(ticket_number); };
    pool.scheduleOrThrowOnError(format_job);
}
void сollectorThreadFunction() void сollectorThreadFunction()
{ {
setThreadName("Collector"); setThreadName("Collector");
while (!formatting_finished) try
{ {
const auto current_unit_number = collector_unit_number % processing_units.size(); while (!formatting_finished)
auto & unit = processing_units[current_unit_number];
{ {
std::unique_lock<std::mutex> lock(mutex); const auto current_unit_number = collector_unit_number % processing_units.size();
collector_condvar.wait(lock,
[&]{ return unit.status == READY_TO_READ || formatting_finished; }); auto &unit = processing_units[current_unit_number];
// std::cout << "Collector " << current_unit_number << " before wait" << std::endl;
{
std::unique_lock<std::mutex> lock(mutex);
collector_condvar.wait(lock,
[&]{ return unit.status == READY_TO_READ || formatting_finished; });
}
// std::cout << "Collector " << current_unit_number << " after wait" << std::endl;
if (formatting_finished)
{
// std::cout << "formatting finished" << std::endl;
break;
}
assert(unit.status == READY_TO_READ);
assert(unit.segment.size() > 0);
for (size_t i=0; i < unit.segment.size(); ++i)
{
std::cout << *(unit.segment.data() + i);
}
std::cout << std::endl;
/// Do main work here.
out.write(unit.segment.data(), unit.segment.size());
flush();
++collector_unit_number;
std::lock_guard<std::mutex> lock(mutex);
unit.status = READY_TO_INSERT;
writer_condvar.notify_all();
} }
if (formatting_finished)
{
break;
}
assert(unit.status == READY_TO_READ);
/// TODO: Arena is singly linked list, and it is simply a list of chunks.
/// We have to write them all into original WriteBuffer.
char * arena_begin = nullptr;
size_t arena_size = 0;
/// Do main work here.
original_write_buffer.write(arena_begin, arena_size);
/// How to drop this arena?
std::lock_guard<std::mutex> lock(mutex);
unit.status = READY_TO_INSERT;
writer_condvar.notify_all();
} }
catch (...)
{
onBackgroundException();
}
} }
@ -135,12 +239,38 @@ private:
{ {
setThreadName("Formatter"); setThreadName("Formatter");
auto & unit = processing_units[current_unit_number]; try
{
// std::cout << "Formatter " << current_unit_number << std::endl;
const char * arena_begin = nullptr; auto & unit = processing_units[current_unit_number];
WriteBufferFromArena out(unit.arena, arena_begin);
assert(unit.status = READY_TO_FORMAT);
unit.segment.resize(10 * DBMS_DEFAULT_BUFFER_SIZE);
/// TODO: Implement proper nextImpl
BufferWithOutsideMemory<WriteBuffer> out(unit.segment);
auto formatter = internal_formatter_creator(out);
formatter->consume(std::move(unit.chunk));
// std::cout << "Formatter " << current_unit_number << " before notify" << std::endl;
{
std::lock_guard<std::mutex> lock(mutex);
unit.status = READY_TO_READ;
collector_condvar.notify_all();
}
// std::cout << "Formatter " << current_unit_number << " after notify" << std::endl;
}
catch (...)
{
onBackgroundException();
}
/// TODO: create parser and parse current chunk to arena
} }
}; };