This commit is contained in:
Nikita Mikhaylov 2020-06-11 03:36:57 +03:00 committed by nikitamikhaylov
parent f40f3ced2a
commit 0a508c7b8a
10 changed files with 213 additions and 51 deletions

View File

@ -15,7 +15,7 @@
namespace ProfileEvents
{
extern const Event ArenaAllocChunks;
extern const Event ArenaAllocMemoryChunks;
extern const Event ArenaAllocBytes;
}
@ -28,7 +28,7 @@ namespace DB
* - put lot of strings inside pool, keep their addresses;
* - addresses remain valid during lifetime of pool;
* - at destruction of pool, all memory is freed;
* - memory is allocated and freed by large chunks;
* - memory is allocated and freed by large MemoryChunks;
* - freeing parts of data is not possible (but look at ArenaWithFreeLists if you need);
*/
class Arena : private boost::noncopyable
@ -37,29 +37,31 @@ private:
/// Padding allows to use 'memcpySmallAllowReadWriteOverflow15' instead of 'memcpy'.
static constexpr size_t pad_right = 15;
/// Contiguous chunk of memory and pointer to free space inside it. Member of single-linked list.
struct alignas(16) Chunk : private Allocator<false> /// empty base optimization
/// Contiguous MemoryChunk of memory and pointer to free space inside it. Member of single-linked list.
struct alignas(16) MemoryChunk : private Allocator<false> /// empty base optimization
{
char * begin;
char * pos;
char * end; /// does not include padding.
Chunk * prev;
MemoryChunk * prev;
MemoryChunk * next;
Chunk(size_t size_, Chunk * prev_)
MemoryChunk(size_t size_, MemoryChunk * prev_)
{
ProfileEvents::increment(ProfileEvents::ArenaAllocChunks);
ProfileEvents::increment(ProfileEvents::ArenaAllocMemoryChunks);
ProfileEvents::increment(ProfileEvents::ArenaAllocBytes, size_);
begin = reinterpret_cast<char *>(Allocator<false>::alloc(size_));
pos = begin;
end = begin + size_ - pad_right;
prev = prev_;
prev->next = this;
ASAN_POISON_MEMORY_REGION(begin, size_);
}
~Chunk()
~MemoryChunk()
{
/// We must unpoison the memory before returning to the allocator,
/// because the allocator might not have asan integration, and the
@ -80,8 +82,8 @@ private:
size_t growth_factor;
size_t linear_growth_threshold;
/// Last contiguous chunk of memory.
Chunk * head;
/// Last contiguous MemoryChunk of memory.
MemoryChunk * head;
size_t size_in_bytes;
static size_t roundUpToPageSize(size_t s)
@ -89,7 +91,7 @@ private:
return (s + 4096 - 1) / 4096 * 4096;
}
/// If chunks size is less than 'linear_growth_threshold', then use exponential growth, otherwise - linear growth
/// If MemoryChunks size is less than 'linear_growth_threshold', then use exponential growth, otherwise - linear growth
/// (to not allocate too much excessive memory).
size_t nextSize(size_t min_next_size) const
{
@ -103,7 +105,7 @@ private:
{
// allocContinue() combined with linear growth results in quadratic
// behavior: we append the data by small amounts, and when it
// doesn't fit, we create a new chunk and copy all the previous data
// doesn't fit, we create a new MemoryChunk and copy all the previous data
// into it. The number of times we do this is directly proportional
// to the total size of data that is going to be serialized. To make
// the copying happen less often, round the next size up to the
@ -116,10 +118,10 @@ private:
return roundUpToPageSize(size_after_grow);
}
/// Add next contiguous chunk of memory with size not less than specified.
void NO_INLINE addChunk(size_t min_size)
/// Add next contiguous MemoryChunk of memory with size not less than specified.
void NO_INLINE addMemoryChunk(size_t min_size)
{
head = new Chunk(nextSize(min_size + pad_right), head);
head = new MemoryChunk(nextSize(min_size + pad_right), head);
size_in_bytes += head->size();
}
@ -129,7 +131,7 @@ private:
public:
Arena(size_t initial_size_ = 4096, size_t growth_factor_ = 2, size_t linear_growth_threshold_ = 128 * 1024 * 1024)
: growth_factor(growth_factor_), linear_growth_threshold(linear_growth_threshold_),
head(new Chunk(initial_size_, nullptr)), size_in_bytes(head->size())
head(new MemoryChunk(initial_size_, nullptr)), size_in_bytes(head->size())
{
}
@ -142,7 +144,7 @@ public:
char * alloc(size_t size)
{
if (unlikely(head->pos + size > head->end))
addChunk(size);
addMemoryChunk(size);
char * res = head->pos;
head->pos += size;
@ -167,7 +169,7 @@ public:
return res;
}
addChunk(size + alignment);
addMemoryChunk(size + alignment);
} while (true);
}
@ -192,8 +194,8 @@ public:
/** Begin or expand a contiguous range of memory.
* 'range_start' is the start of range. If nullptr, a new range is
* allocated.
* If there is no space in the current chunk to expand the range,
* the entire range is copied to a new, bigger memory chunk, and the value
* If there is no space in the current MemoryChunk to expand the range,
* the entire range is copied to a new, bigger memory MemoryChunk, and the value
* of 'range_start' is updated.
* If the optional 'start_alignment' is specified, the start of range is
* kept aligned to this value.
@ -207,7 +209,7 @@ public:
/*
* Allocating zero bytes doesn't make much sense. Also, a zero-sized
* range might break the invariant that the range begins at least before
* the current chunk end.
* the current MemoryChunk end.
*/
assert(additional_bytes > 0);
@ -226,19 +228,19 @@ public:
// This method only works for extending the last allocation. For lack of
// original size, check a weaker condition: that 'begin' is at least in
// the current Chunk.
// the current MemoryChunk.
assert(range_start >= head->begin);
assert(range_start < head->end);
if (head->pos + additional_bytes <= head->end)
{
// The new size fits into the last chunk, so just alloc the
// The new size fits into the last MemoryChunk, so just alloc the
// additional size. We can alloc without alignment here, because it
// only applies to the start of the range, and we don't change it.
return alloc(additional_bytes);
}
// New range doesn't fit into this chunk, will copy to a new one.
// New range doesn't fit into this MemoryChunk, will copy to a new one.
//
// Note: among other things, this method is used to provide a hack-ish
// implementation of realloc over Arenas in ArenaAllocators. It wastes a
@ -299,16 +301,16 @@ public:
return res;
}
/// Size of chunks in bytes.
/// Size of MemoryChunks in bytes.
size_t size() const
{
return size_in_bytes;
}
/// Bad method, don't use it -- the chunks are not your business, the entire
/// Bad method, don't use it -- the MemoryChunks are not your business, the entire
/// purpose of the arena code is to manage them for you, so if you find
/// yourself having to use this method, probably you're doing something wrong.
size_t remainingSpaceInCurrentChunk() const
size_t remainingSpaceInCurrentMemoryChunk() const
{
return head->remaining();
}

View File

@ -13,7 +13,6 @@
#include <Common/ThreadStatus.h>
#include <ext/scope_guard.h>
/** Very simple thread pool similar to boost::threadpool.
* Advantages:
* - catches exceptions and rethrows on wait.

View File

@ -16,7 +16,7 @@
#include <Processors/Formats/Impl/ValuesBlockInputFormat.h>
#include <Processors/Formats/Impl/MySQLOutputFormat.h>
#include <Processors/Formats/Impl/NativeFormat.cpp>
#include <Processors/Formats/Impl/ParallelParsingBlockInputFormat.h>
#include <Processors/Formats/Impl/ParallelParsingInputFormat.h>
#include <Poco/URI.h>
#if !defined(ARCADIA_BUILD)
@ -201,9 +201,9 @@ InputFormatPtr FormatFactory::getInput(
{ return input_getter(input, sample, row_input_format_params, format_settings); };
ParallelParsingBlockInputFormat::Params params{
ParallelParsingInputFormat::Params params{
buf, sample, parser_creator, file_segmentation_engine, name, settings.max_threads, settings.min_chunk_bytes_for_parallel_parsing};
return std::make_shared<ParallelParsingBlockInputFormat>(params);
return std::make_shared<ParallelParsingInputFormat>(params);
}

View File

@ -35,7 +35,7 @@ private:
/// tear down the entire WriteBuffer thing and implement it again,
/// properly.
size_t continuation_size = std::max(size_t(1),
std::max(count(), arena.remainingSpaceInCurrentChunk()));
std::max(count(), arena.remainingSpaceInCurrentMemoryChunk()));
/// allocContinue method will possibly move memory region to new place and modify "begin" pointer.

View File

@ -557,6 +557,7 @@ void writeProbablyBackQuotedStringMySQL(const StringRef & s, WriteBuffer & buf);
template <char quote = '"'>
void writeCSVString(const char * begin, const char * end, WriteBuffer & buf)
{
std::cout << StackTrace().toString() << std::endl;
writeChar(quote, buf);
const char * pos = begin;

View File

@ -11,6 +11,7 @@ namespace DB
CSVRowOutputFormat::CSVRowOutputFormat(WriteBuffer & out_, const Block & header_, bool with_names_, const RowOutputFormatParams & params_, const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_, params_), with_names(with_names_), format_settings(format_settings_)
{
std::cout << StackTrace().toString() << std::endl;
const auto & sample = getPort(PortKind::Main).getHeader();
size_t columns = sample.columns();
data_types.resize(columns);

View File

@ -16,6 +16,7 @@ JSONEachRowRowOutputFormat::JSONEachRowRowOutputFormat(
: IRowOutputFormat(header_, out_, params_),
settings(settings_)
{
std::cout << StackTrace().toString() << std::endl;
const auto & sample = getPort(PortKind::Main).getHeader();
size_t columns = sample.columns();
fields.resize(columns);

View File

@ -0,0 +1,147 @@
#pragma once
#include <Processors/Formats/IOutputFormat.h>
#include <Common/Arena.h>
#include <IO/WriteBufferFromArena.h>
#include <deque>
#include <Common/setThreadName.h>
#include <atomic>
namespace DB
{
const size_t min_chunk_bytes_for_parallel_formatting = 1024;
class ParallelFormattingOutputFormat : public IOutputFormat
{
public:
struct Params
{
const Block & header;
WriteBuffer & out;
};
explicit ParallelFormattingOutputFormat(Params params)
: IOutputFormat(params.header, params.out)
, original_write_buffer(params.out)
{
}
String getName() const override final { return "ParallelFormattingOutputFormat"; }
protected:
void consume(Chunk chunk) override final
{
if (chunk.empty())
formatting_finished = true;
const auto current_unit_number = writer_unit_number % processing_units.size();
auto & unit = processing_units[current_unit_number];
{
std::unique_lock<std::mutex> lock(mutex);
writer_condvar.wait(lock,
[&]{ return unit.status == READY_TO_INSERT || formatting_finished; });
}
assert(unit.status == READY_TO_INSERT);
unit.chunk = std::move(chunk);
/// TODO: Allocate new arena for current chunk
/// TODO: Submit task to ThreadPool
}
private:
WriteBuffer & original_write_buffer;
enum ProcessingUnitStatus
{
READY_TO_INSERT,
READY_TO_FORMAT,
READY_TO_READ
};
struct ProcessingUnit
{
Chunk chunk;
Arena arena;
ProcessingUnitStatus status;
};
std::deque<ProcessingUnit> processing_units;
std::mutex mutex;
std::atomic_bool formatting_finished{false};
std::atomic_size_t collector_unit_number{0};
std::atomic_size_t writer_unit_number{0};
std::condition_variable collector_condvar;
std::condition_variable writer_condvar;
void сollectorThreadFunction()
{
setThreadName("Collector");
while (!formatting_finished)
{
const auto current_unit_number = collector_unit_number % processing_units.size();
auto & unit = processing_units[current_unit_number];
{
std::unique_lock<std::mutex> lock(mutex);
collector_condvar.wait(lock,
[&]{ return unit.status == READY_TO_READ || formatting_finished; });
}
if (formatting_finished)
{
break;
}
assert(unit.status == READY_TO_READ);
/// TODO: Arena is singly linked list, and it is simply a list of chunks.
/// We have to write them all into original WriteBuffer.
char * arena_begin = nullptr;
size_t arena_size = 0;
/// Do main work here.
original_write_buffer.write(arena_begin, arena_size);
/// How to drop this arena?
std::lock_guard<std::mutex> lock(mutex);
unit.status = READY_TO_INSERT;
writer_condvar.notify_all();
}
}
void formatterThreadFunction(size_t current_unit_number)
{
setThreadName("Formatter");
auto & unit = processing_units[current_unit_number];
const char * arena_begin = nullptr;
WriteBufferFromArena out(unit.arena, arena_begin);
/// TODO: create parser and parse current chunk to arena
}
};
}

View File

@ -1,10 +1,10 @@
#include <Processors/Formats/Impl/ParallelParsingBlockInputFormat.h>
#include <Processors/Formats/Impl/ParallelParsingInputFormat.h>
#include <IO/ReadHelpers.h>
namespace DB
{
void ParallelParsingBlockInputFormat::segmentatorThreadFunction()
void ParallelParsingInputFormat::segmentatorThreadFunction()
{
setThreadName("Segmentator");
try
@ -49,7 +49,7 @@ void ParallelParsingBlockInputFormat::segmentatorThreadFunction()
}
}
void ParallelParsingBlockInputFormat::parserThreadFunction(size_t current_ticket_number)
void ParallelParsingInputFormat::parserThreadFunction(size_t current_ticket_number)
{
try
{
@ -58,6 +58,8 @@ void ParallelParsingBlockInputFormat::parserThreadFunction(size_t current_ticket
const auto current_unit_number = current_ticket_number % processing_units.size();
auto & unit = processing_units[current_unit_number];
assert(unit.segment.size() > 0);
/*
* This is kind of suspicious -- the input_process_creator contract with
* respect to multithreaded use is not clear, but we hope that it is
@ -86,7 +88,7 @@ void ParallelParsingBlockInputFormat::parserThreadFunction(size_t current_ticket
// We suppose we will get at least some blocks for a non-empty buffer,
// except at the end of file. Also see a matching assert in readImpl().
assert(unit.is_last || !unit.chunk_ext.chunk.empty());
assert(unit.is_last || !unit.chunk_ext.chunk.empty() || parsing_finished);
if (unit.is_last)
endUpReadBuffer(read_buffer);
@ -102,7 +104,7 @@ void ParallelParsingBlockInputFormat::parserThreadFunction(size_t current_ticket
}
void ParallelParsingBlockInputFormat::onBackgroundException()
void ParallelParsingInputFormat::onBackgroundException()
{
tryLogCurrentException(__PRETTY_FUNCTION__);
@ -116,7 +118,7 @@ void ParallelParsingBlockInputFormat::onBackgroundException()
segmentator_condvar.notify_all();
}
Chunk ParallelParsingBlockInputFormat::generate()
Chunk ParallelParsingInputFormat::generate()
{
if (isCancelled() || parsing_finished)
{
@ -206,17 +208,17 @@ Chunk ParallelParsingBlockInputFormat::generate()
return res;
}
void ParallelParsingBlockInputFormat::prepareReadBuffer(ReadBuffer & buffer)
void ParallelParsingInputFormat::prepareReadBuffer(ReadBuffer & buffer)
{
if (prepare_and_end_up_ptr)
prepare_and_end_up_ptr->prepareReadBuffer(buffer);
if (prepare_and_end_up_map.count(format_name))
prepare_and_end_up_map[format_name]->prepareReadBuffer(buffer);
}
void ParallelParsingBlockInputFormat::endUpReadBuffer(ReadBuffer & buffer)
void ParallelParsingInputFormat::endUpReadBuffer(ReadBuffer & buffer)
{
if (prepare_and_end_up_ptr)
prepare_and_end_up_ptr->endUpReadBuffer(buffer);
if (prepare_and_end_up_map.count(format_name))
prepare_and_end_up_map[format_name]->endUpReadBuffer(buffer);
}
}

View File

@ -8,7 +8,6 @@
#include <IO/BufferWithOwnMemory.h>
#include <IO/ReadBuffer.h>
#include <Processors/Formats/IRowInputFormat.h>
#include <Processors/Formats/InputStreamFromInputFormat.h>
#include <Interpreters/Context.h>
#include "IReadBufferPrepareAndEndUp.h"
@ -50,7 +49,7 @@ class Context;
* 4) repeat until it encounters unit that is marked as "past_the_end"
* All threads must also check for cancel/eof/exception flags.
*/
class ParallelParsingBlockInputFormat : public IInputFormat
class ParallelParsingInputFormat : public IInputFormat
{
public:
/* Used to recreate parser on every new data piece. */
@ -67,7 +66,7 @@ public:
size_t min_chunk_bytes;
};
explicit ParallelParsingBlockInputFormat(Params params)
explicit ParallelParsingInputFormat(Params params)
: IInputFormat(std::move(params.header), params.in)
, internal_parser_creator(params.internal_parser_creator)
, file_segmentation_engine(params.file_segmentation_engine)
@ -83,13 +82,16 @@ public:
// bump into reader thread on wraparound.
processing_units.resize(params.max_threads + 2);
/// To skip '[' and ']'.
if (format_name == "JSONEachRow")
prepare_and_end_up_ptr = std::make_shared<JSONEachRowPrepareAndEndUp>();
initializePrepareEndUpMap();
segmentator_thread = ThreadFromGlobalPool([this] { segmentatorThreadFunction(); });
}
~ParallelParsingInputFormat() override
{
finishAndWait();
}
void resetParser() override final
{
throw Exception("resetParser() is not allowed for " + getName(), ErrorCodes::LOGICAL_ERROR);
@ -263,7 +265,14 @@ private:
// is different.
void onBackgroundException();
IReadBufferPrepareAndEndUpPtr prepare_and_end_up_ptr;
/// To store objects which will prepare and end up ReadBuffer for each format.
std::unordered_map<String, IReadBufferPrepareAndEndUpPtr> prepare_and_end_up_map;
void initializePrepareEndUpMap()
{
/// To skip '[' and ']'.
prepare_and_end_up_map.insert({"JSONEachRow", std::make_shared<JSONEachRowPrepareAndEndUp>()});
}
void prepareReadBuffer(ReadBuffer & buffer);