diff --git a/src/Common/Arena.h b/src/Common/Arena.h index 44a9b444ff2..2a05a257381 100644 --- a/src/Common/Arena.h +++ b/src/Common/Arena.h @@ -15,7 +15,7 @@ namespace ProfileEvents { - extern const Event ArenaAllocChunks; + extern const Event ArenaAllocMemoryChunks; extern const Event ArenaAllocBytes; } @@ -28,7 +28,7 @@ namespace DB * - put lot of strings inside pool, keep their addresses; * - addresses remain valid during lifetime of pool; * - at destruction of pool, all memory is freed; - * - memory is allocated and freed by large chunks; + * - memory is allocated and freed by large MemoryChunks; * - freeing parts of data is not possible (but look at ArenaWithFreeLists if you need); */ class Arena : private boost::noncopyable @@ -37,29 +37,31 @@ private: /// Padding allows to use 'memcpySmallAllowReadWriteOverflow15' instead of 'memcpy'. static constexpr size_t pad_right = 15; - /// Contiguous chunk of memory and pointer to free space inside it. Member of single-linked list. - struct alignas(16) Chunk : private Allocator /// empty base optimization + /// Contiguous MemoryChunk of memory and pointer to free space inside it. Member of single-linked list. + struct alignas(16) MemoryChunk : private Allocator /// empty base optimization { char * begin; char * pos; char * end; /// does not include padding. - Chunk * prev; + MemoryChunk * prev; + MemoryChunk * next; - Chunk(size_t size_, Chunk * prev_) + MemoryChunk(size_t size_, MemoryChunk * prev_) { - ProfileEvents::increment(ProfileEvents::ArenaAllocChunks); + ProfileEvents::increment(ProfileEvents::ArenaAllocMemoryChunks); ProfileEvents::increment(ProfileEvents::ArenaAllocBytes, size_); begin = reinterpret_cast(Allocator::alloc(size_)); pos = begin; end = begin + size_ - pad_right; prev = prev_; + prev->next = this; ASAN_POISON_MEMORY_REGION(begin, size_); } - ~Chunk() + ~MemoryChunk() { /// We must unpoison the memory before returning to the allocator, /// because the allocator might not have asan integration, and the @@ -80,8 +82,8 @@ private: size_t growth_factor; size_t linear_growth_threshold; - /// Last contiguous chunk of memory. - Chunk * head; + /// Last contiguous MemoryChunk of memory. + MemoryChunk * head; size_t size_in_bytes; static size_t roundUpToPageSize(size_t s) @@ -89,7 +91,7 @@ private: return (s + 4096 - 1) / 4096 * 4096; } - /// If chunks size is less than 'linear_growth_threshold', then use exponential growth, otherwise - linear growth + /// If MemoryChunks size is less than 'linear_growth_threshold', then use exponential growth, otherwise - linear growth /// (to not allocate too much excessive memory). size_t nextSize(size_t min_next_size) const { @@ -103,7 +105,7 @@ private: { // allocContinue() combined with linear growth results in quadratic // behavior: we append the data by small amounts, and when it - // doesn't fit, we create a new chunk and copy all the previous data + // doesn't fit, we create a new MemoryChunk and copy all the previous data // into it. The number of times we do this is directly proportional // to the total size of data that is going to be serialized. To make // the copying happen less often, round the next size up to the @@ -116,10 +118,10 @@ private: return roundUpToPageSize(size_after_grow); } - /// Add next contiguous chunk of memory with size not less than specified. - void NO_INLINE addChunk(size_t min_size) + /// Add next contiguous MemoryChunk of memory with size not less than specified. + void NO_INLINE addMemoryChunk(size_t min_size) { - head = new Chunk(nextSize(min_size + pad_right), head); + head = new MemoryChunk(nextSize(min_size + pad_right), head); size_in_bytes += head->size(); } @@ -129,7 +131,7 @@ private: public: Arena(size_t initial_size_ = 4096, size_t growth_factor_ = 2, size_t linear_growth_threshold_ = 128 * 1024 * 1024) : growth_factor(growth_factor_), linear_growth_threshold(linear_growth_threshold_), - head(new Chunk(initial_size_, nullptr)), size_in_bytes(head->size()) + head(new MemoryChunk(initial_size_, nullptr)), size_in_bytes(head->size()) { } @@ -142,7 +144,7 @@ public: char * alloc(size_t size) { if (unlikely(head->pos + size > head->end)) - addChunk(size); + addMemoryChunk(size); char * res = head->pos; head->pos += size; @@ -167,7 +169,7 @@ public: return res; } - addChunk(size + alignment); + addMemoryChunk(size + alignment); } while (true); } @@ -192,8 +194,8 @@ public: /** Begin or expand a contiguous range of memory. * 'range_start' is the start of range. If nullptr, a new range is * allocated. - * If there is no space in the current chunk to expand the range, - * the entire range is copied to a new, bigger memory chunk, and the value + * If there is no space in the current MemoryChunk to expand the range, + * the entire range is copied to a new, bigger memory MemoryChunk, and the value * of 'range_start' is updated. * If the optional 'start_alignment' is specified, the start of range is * kept aligned to this value. @@ -207,7 +209,7 @@ public: /* * Allocating zero bytes doesn't make much sense. Also, a zero-sized * range might break the invariant that the range begins at least before - * the current chunk end. + * the current MemoryChunk end. */ assert(additional_bytes > 0); @@ -226,19 +228,19 @@ public: // This method only works for extending the last allocation. For lack of // original size, check a weaker condition: that 'begin' is at least in - // the current Chunk. + // the current MemoryChunk. assert(range_start >= head->begin); assert(range_start < head->end); if (head->pos + additional_bytes <= head->end) { - // The new size fits into the last chunk, so just alloc the + // The new size fits into the last MemoryChunk, so just alloc the // additional size. We can alloc without alignment here, because it // only applies to the start of the range, and we don't change it. return alloc(additional_bytes); } - // New range doesn't fit into this chunk, will copy to a new one. + // New range doesn't fit into this MemoryChunk, will copy to a new one. // // Note: among other things, this method is used to provide a hack-ish // implementation of realloc over Arenas in ArenaAllocators. It wastes a @@ -299,16 +301,16 @@ public: return res; } - /// Size of chunks in bytes. + /// Size of MemoryChunks in bytes. size_t size() const { return size_in_bytes; } - /// Bad method, don't use it -- the chunks are not your business, the entire + /// Bad method, don't use it -- the MemoryChunks are not your business, the entire /// purpose of the arena code is to manage them for you, so if you find /// yourself having to use this method, probably you're doing something wrong. - size_t remainingSpaceInCurrentChunk() const + size_t remainingSpaceInCurrentMemoryChunk() const { return head->remaining(); } diff --git a/src/Common/ThreadPool.h b/src/Common/ThreadPool.h index 8dd6cbbe02c..27b76865ab9 100644 --- a/src/Common/ThreadPool.h +++ b/src/Common/ThreadPool.h @@ -13,7 +13,6 @@ #include #include - /** Very simple thread pool similar to boost::threadpool. * Advantages: * - catches exceptions and rethrows on wait. diff --git a/src/Formats/FormatFactory.cpp b/src/Formats/FormatFactory.cpp index 9fba199d0c1..6fde6bee430 100644 --- a/src/Formats/FormatFactory.cpp +++ b/src/Formats/FormatFactory.cpp @@ -16,7 +16,7 @@ #include #include #include -#include +#include #include #if !defined(ARCADIA_BUILD) @@ -201,9 +201,9 @@ InputFormatPtr FormatFactory::getInput( { return input_getter(input, sample, row_input_format_params, format_settings); }; - ParallelParsingBlockInputFormat::Params params{ + ParallelParsingInputFormat::Params params{ buf, sample, parser_creator, file_segmentation_engine, name, settings.max_threads, settings.min_chunk_bytes_for_parallel_parsing}; - return std::make_shared(params); + return std::make_shared(params); } diff --git a/src/IO/WriteBufferFromArena.h b/src/IO/WriteBufferFromArena.h index ee09240e9c7..b5fd9fac5a3 100644 --- a/src/IO/WriteBufferFromArena.h +++ b/src/IO/WriteBufferFromArena.h @@ -35,7 +35,7 @@ private: /// tear down the entire WriteBuffer thing and implement it again, /// properly. size_t continuation_size = std::max(size_t(1), - std::max(count(), arena.remainingSpaceInCurrentChunk())); + std::max(count(), arena.remainingSpaceInCurrentMemoryChunk())); /// allocContinue method will possibly move memory region to new place and modify "begin" pointer. diff --git a/src/IO/WriteHelpers.h b/src/IO/WriteHelpers.h index e5c6f5b4ab8..4d070577e84 100644 --- a/src/IO/WriteHelpers.h +++ b/src/IO/WriteHelpers.h @@ -557,6 +557,7 @@ void writeProbablyBackQuotedStringMySQL(const StringRef & s, WriteBuffer & buf); template void writeCSVString(const char * begin, const char * end, WriteBuffer & buf) { + std::cout << StackTrace().toString() << std::endl; writeChar(quote, buf); const char * pos = begin; diff --git a/src/Processors/Formats/Impl/CSVRowOutputFormat.cpp b/src/Processors/Formats/Impl/CSVRowOutputFormat.cpp index 2d6a49ccb6f..4546f83d6cc 100644 --- a/src/Processors/Formats/Impl/CSVRowOutputFormat.cpp +++ b/src/Processors/Formats/Impl/CSVRowOutputFormat.cpp @@ -11,6 +11,7 @@ namespace DB CSVRowOutputFormat::CSVRowOutputFormat(WriteBuffer & out_, const Block & header_, bool with_names_, const RowOutputFormatParams & params_, const FormatSettings & format_settings_) : IRowOutputFormat(header_, out_, params_), with_names(with_names_), format_settings(format_settings_) { + std::cout << StackTrace().toString() << std::endl; const auto & sample = getPort(PortKind::Main).getHeader(); size_t columns = sample.columns(); data_types.resize(columns); diff --git a/src/Processors/Formats/Impl/JSONEachRowRowOutputFormat.cpp b/src/Processors/Formats/Impl/JSONEachRowRowOutputFormat.cpp index 15d8a843f41..6a87feab315 100644 --- a/src/Processors/Formats/Impl/JSONEachRowRowOutputFormat.cpp +++ b/src/Processors/Formats/Impl/JSONEachRowRowOutputFormat.cpp @@ -16,6 +16,7 @@ JSONEachRowRowOutputFormat::JSONEachRowRowOutputFormat( : IRowOutputFormat(header_, out_, params_), settings(settings_) { + std::cout << StackTrace().toString() << std::endl; const auto & sample = getPort(PortKind::Main).getHeader(); size_t columns = sample.columns(); fields.resize(columns); diff --git a/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.h b/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.h new file mode 100644 index 00000000000..52b5475420d --- /dev/null +++ b/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.h @@ -0,0 +1,147 @@ +#pragma once + +#include +#include +#include + +#include +#include + +#include + +namespace DB +{ + +const size_t min_chunk_bytes_for_parallel_formatting = 1024; + +class ParallelFormattingOutputFormat : public IOutputFormat +{ +public: + + struct Params + { + const Block & header; + WriteBuffer & out; + }; + + explicit ParallelFormattingOutputFormat(Params params) + : IOutputFormat(params.header, params.out) + , original_write_buffer(params.out) + { + + } + + String getName() const override final { return "ParallelFormattingOutputFormat"; } + +protected: + void consume(Chunk chunk) override final + { + if (chunk.empty()) + formatting_finished = true; + + const auto current_unit_number = writer_unit_number % processing_units.size(); + + auto & unit = processing_units[current_unit_number]; + + { + std::unique_lock lock(mutex); + writer_condvar.wait(lock, + [&]{ return unit.status == READY_TO_INSERT || formatting_finished; }); + } + + assert(unit.status == READY_TO_INSERT); + + unit.chunk = std::move(chunk); + /// TODO: Allocate new arena for current chunk + + + /// TODO: Submit task to ThreadPool + } + +private: + + WriteBuffer & original_write_buffer; + + enum ProcessingUnitStatus + { + READY_TO_INSERT, + READY_TO_FORMAT, + READY_TO_READ + }; + + struct ProcessingUnit + { + Chunk chunk; + Arena arena; + ProcessingUnitStatus status; + }; + + + + std::deque processing_units; + + std::mutex mutex; + std::atomic_bool formatting_finished{false}; + + + std::atomic_size_t collector_unit_number{0}; + std::atomic_size_t writer_unit_number{0}; + + std::condition_variable collector_condvar; + std::condition_variable writer_condvar; + + void сollectorThreadFunction() + { + setThreadName("Collector"); + + while (!formatting_finished) + { + const auto current_unit_number = collector_unit_number % processing_units.size(); + + auto & unit = processing_units[current_unit_number]; + + { + std::unique_lock lock(mutex); + collector_condvar.wait(lock, + [&]{ return unit.status == READY_TO_READ || formatting_finished; }); + } + + if (formatting_finished) + { + break; + } + + assert(unit.status == READY_TO_READ); + + /// TODO: Arena is singly linked list, and it is simply a list of chunks. + /// We have to write them all into original WriteBuffer. + char * arena_begin = nullptr; + size_t arena_size = 0; + /// Do main work here. + original_write_buffer.write(arena_begin, arena_size); + + /// How to drop this arena? + + + std::lock_guard lock(mutex); + unit.status = READY_TO_INSERT; + writer_condvar.notify_all(); + } + + } + + + void formatterThreadFunction(size_t current_unit_number) + { + setThreadName("Formatter"); + + auto & unit = processing_units[current_unit_number]; + + const char * arena_begin = nullptr; + WriteBufferFromArena out(unit.arena, arena_begin); + + /// TODO: create parser and parse current chunk to arena + } +}; + +} diff --git a/src/Processors/Formats/Impl/ParallelParsingBlockInputFormat.cpp b/src/Processors/Formats/Impl/ParallelParsingInputFormat.cpp similarity index 88% rename from src/Processors/Formats/Impl/ParallelParsingBlockInputFormat.cpp rename to src/Processors/Formats/Impl/ParallelParsingInputFormat.cpp index ad7a160ce11..898c103cb24 100644 --- a/src/Processors/Formats/Impl/ParallelParsingBlockInputFormat.cpp +++ b/src/Processors/Formats/Impl/ParallelParsingInputFormat.cpp @@ -1,10 +1,10 @@ -#include +#include #include namespace DB { -void ParallelParsingBlockInputFormat::segmentatorThreadFunction() +void ParallelParsingInputFormat::segmentatorThreadFunction() { setThreadName("Segmentator"); try @@ -49,7 +49,7 @@ void ParallelParsingBlockInputFormat::segmentatorThreadFunction() } } -void ParallelParsingBlockInputFormat::parserThreadFunction(size_t current_ticket_number) +void ParallelParsingInputFormat::parserThreadFunction(size_t current_ticket_number) { try { @@ -58,6 +58,8 @@ void ParallelParsingBlockInputFormat::parserThreadFunction(size_t current_ticket const auto current_unit_number = current_ticket_number % processing_units.size(); auto & unit = processing_units[current_unit_number]; + assert(unit.segment.size() > 0); + /* * This is kind of suspicious -- the input_process_creator contract with * respect to multithreaded use is not clear, but we hope that it is @@ -86,7 +88,7 @@ void ParallelParsingBlockInputFormat::parserThreadFunction(size_t current_ticket // We suppose we will get at least some blocks for a non-empty buffer, // except at the end of file. Also see a matching assert in readImpl(). - assert(unit.is_last || !unit.chunk_ext.chunk.empty()); + assert(unit.is_last || !unit.chunk_ext.chunk.empty() || parsing_finished); if (unit.is_last) endUpReadBuffer(read_buffer); @@ -102,7 +104,7 @@ void ParallelParsingBlockInputFormat::parserThreadFunction(size_t current_ticket } -void ParallelParsingBlockInputFormat::onBackgroundException() +void ParallelParsingInputFormat::onBackgroundException() { tryLogCurrentException(__PRETTY_FUNCTION__); @@ -116,7 +118,7 @@ void ParallelParsingBlockInputFormat::onBackgroundException() segmentator_condvar.notify_all(); } -Chunk ParallelParsingBlockInputFormat::generate() +Chunk ParallelParsingInputFormat::generate() { if (isCancelled() || parsing_finished) { @@ -206,17 +208,17 @@ Chunk ParallelParsingBlockInputFormat::generate() return res; } -void ParallelParsingBlockInputFormat::prepareReadBuffer(ReadBuffer & buffer) +void ParallelParsingInputFormat::prepareReadBuffer(ReadBuffer & buffer) { - if (prepare_and_end_up_ptr) - prepare_and_end_up_ptr->prepareReadBuffer(buffer); + if (prepare_and_end_up_map.count(format_name)) + prepare_and_end_up_map[format_name]->prepareReadBuffer(buffer); } -void ParallelParsingBlockInputFormat::endUpReadBuffer(ReadBuffer & buffer) +void ParallelParsingInputFormat::endUpReadBuffer(ReadBuffer & buffer) { - if (prepare_and_end_up_ptr) - prepare_and_end_up_ptr->endUpReadBuffer(buffer); + if (prepare_and_end_up_map.count(format_name)) + prepare_and_end_up_map[format_name]->endUpReadBuffer(buffer); } } diff --git a/src/Processors/Formats/Impl/ParallelParsingBlockInputFormat.h b/src/Processors/Formats/Impl/ParallelParsingInputFormat.h similarity index 94% rename from src/Processors/Formats/Impl/ParallelParsingBlockInputFormat.h rename to src/Processors/Formats/Impl/ParallelParsingInputFormat.h index 1a0cd2c537a..35cf7047d3c 100644 --- a/src/Processors/Formats/Impl/ParallelParsingBlockInputFormat.h +++ b/src/Processors/Formats/Impl/ParallelParsingInputFormat.h @@ -8,7 +8,6 @@ #include #include #include -#include #include #include "IReadBufferPrepareAndEndUp.h" @@ -50,7 +49,7 @@ class Context; * 4) repeat until it encounters unit that is marked as "past_the_end" * All threads must also check for cancel/eof/exception flags. */ -class ParallelParsingBlockInputFormat : public IInputFormat +class ParallelParsingInputFormat : public IInputFormat { public: /* Used to recreate parser on every new data piece. */ @@ -67,7 +66,7 @@ public: size_t min_chunk_bytes; }; - explicit ParallelParsingBlockInputFormat(Params params) + explicit ParallelParsingInputFormat(Params params) : IInputFormat(std::move(params.header), params.in) , internal_parser_creator(params.internal_parser_creator) , file_segmentation_engine(params.file_segmentation_engine) @@ -83,13 +82,16 @@ public: // bump into reader thread on wraparound. processing_units.resize(params.max_threads + 2); - /// To skip '[' and ']'. - if (format_name == "JSONEachRow") - prepare_and_end_up_ptr = std::make_shared(); + initializePrepareEndUpMap(); segmentator_thread = ThreadFromGlobalPool([this] { segmentatorThreadFunction(); }); } + ~ParallelParsingInputFormat() override + { + finishAndWait(); + } + void resetParser() override final { throw Exception("resetParser() is not allowed for " + getName(), ErrorCodes::LOGICAL_ERROR); @@ -263,7 +265,14 @@ private: // is different. void onBackgroundException(); - IReadBufferPrepareAndEndUpPtr prepare_and_end_up_ptr; + /// To store objects which will prepare and end up ReadBuffer for each format. + std::unordered_map prepare_and_end_up_map; + + void initializePrepareEndUpMap() + { + /// To skip '[' and ']'. + prepare_and_end_up_map.insert({"JSONEachRow", std::make_shared()}); + } void prepareReadBuffer(ReadBuffer & buffer);