diff --git a/dbms/src/DataStreams/ParallelParsingBlockInputStream.h b/dbms/src/DataStreams/ParallelParsingBlockInputStream.h index c09254d1cf6..ad921167193 100644 --- a/dbms/src/DataStreams/ParallelParsingBlockInputStream.h +++ b/dbms/src/DataStreams/ParallelParsingBlockInputStream.h @@ -5,7 +5,6 @@ #include #include #include -#include #include #include #include diff --git a/dbms/src/Formats/FormatFactory.cpp b/dbms/src/Formats/FormatFactory.cpp index 4c9e74ef977..bb1e93f6c02 100644 --- a/dbms/src/Formats/FormatFactory.cpp +++ b/dbms/src/Formats/FormatFactory.cpp @@ -6,7 +6,6 @@ #include #include #include -#include #include #include #include @@ -349,7 +348,6 @@ FormatFactory::FormatFactory() registerFileSegmentationEngineJSONEachRow(*this); registerFileSegmentationEngineTabSeparated(*this); - //registerFileSegmentationEngineValues(*this); registerFileSegmentationEngineCSV(*this); registerFileSegmentationEngineTSKV(*this); diff --git a/dbms/src/Formats/FormatFactory.h b/dbms/src/Formats/FormatFactory.h index 4e044652eb7..b3359a36003 100644 --- a/dbms/src/Formats/FormatFactory.h +++ b/dbms/src/Formats/FormatFactory.h @@ -44,7 +44,7 @@ public: /** Fast reading data from buffer and save result to memory. * Reads at least min_chunk_size bytes and some more until the end of the chunk, depends on the format. - * Used in SharedReadBuffer. + * Used in ParallelParsingBlockInputStream. */ using FileSegmentationEngine = std::function -#include -#include -#include - - -namespace DB -{ - -/** Allows many threads to read from single ReadBuffer - * Each SharedReadBuffer has his own memory, - * which he filles under shared mutex using FileSegmentationEngine. - */ -class SharedReadBuffer : public BufferWithOwnMemory -{ -public: - SharedReadBuffer( - ReadBuffer & in_, - std::shared_ptr & mutex_, - FormatFactory::FileSegmentationEngine file_segmentation_engine_, - size_t min_chunk_size_) - : BufferWithOwnMemory(), - mutex(mutex_), - in(in_), - file_segmentation_engine(file_segmentation_engine_), - min_chunk_size(min_chunk_size_) - { - } - -private: - bool nextImpl() override - { - if (eof || !mutex) - return false; - - std::lock_guard lock(*mutex); - if (in.eof()) - { - eof = true; - return false; - } - - size_t used_size = 0; - bool res = file_segmentation_engine(in, memory, used_size, min_chunk_size); - if (!res) - return false; - - working_buffer = Buffer(memory.data(), memory.data() + used_size); - return true; - } - -private: - // Pointer to common mutex. - std::shared_ptr mutex; - - // Original ReadBuffer to read from. - ReadBuffer & in; - - // Function to fill working_buffer. - FormatFactory::FileSegmentationEngine file_segmentation_engine; - - // FileSegmentationEngine parameter. - size_t min_chunk_size; - - // Indicator of the eof. Save extra lock acquiring. - bool eof{false}; -}; -} diff --git a/dbms/src/Processors/Formats/Impl/CSVRowInputFormat.cpp b/dbms/src/Processors/Formats/Impl/CSVRowInputFormat.cpp index f33b6706c0a..2dbb2f4308e 100644 --- a/dbms/src/Processors/Formats/Impl/CSVRowInputFormat.cpp +++ b/dbms/src/Processors/Formats/Impl/CSVRowInputFormat.cpp @@ -422,66 +422,64 @@ void registerInputFormatProcessorCSV(FormatFactory & factory) } } -void registerFileSegmentationEngineCSV(FormatFactory & factory) +bool fileSegmentationEngineCSVImpl(ReadBuffer & in, DB::Memory<> & memory, size_t & used_size, size_t min_chunk_size) { - factory.registerFileSegmentationEngine("CSV", []( - ReadBuffer & in, - DB::Memory<> & memory, - size_t & used_size, - size_t min_chunk_size) + skipWhitespacesAndTabs(in); + char * begin_pos = in.position(); + bool quotes = false; + bool need_more_data = true; + memory.resize(min_chunk_size); + while (!eofWithSavingBufferState(in, memory, used_size, begin_pos) && need_more_data) { - skipWhitespacesAndTabs(in); - char * begin_pos = in.position(); - bool quotes = false; - bool need_more_data = true; - memory.resize(min_chunk_size); - while (!eofWithSavingBufferState(in, memory, used_size, begin_pos) && need_more_data) + if (quotes) { - if (quotes) + in.position() = find_first_symbols<'"'>(in.position(), in.buffer().end()); + if (in.position() == in.buffer().end()) + continue; + if (*in.position() == '"') { - in.position() = find_first_symbols<'"'>(in.position(), in.buffer().end()); - if (in.position() == in.buffer().end()) - continue; - if (*in.position() == '"') - { + ++in.position(); + if (!eofWithSavingBufferState(in, memory, used_size, begin_pos) && *in.position() == '"') ++in.position(); - if (!eofWithSavingBufferState(in, memory, used_size, begin_pos) && *in.position() == '"') - ++in.position(); - else - quotes = false; - } - } - else - { - in.position() = find_first_symbols<'"','\r', '\n'>(in.position(), in.buffer().end()); - if (in.position() == in.buffer().end()) - continue; - if (*in.position() == '"') - { - quotes = true; - ++in.position(); - } - else if (*in.position() == '\n') - { - if (used_size + static_cast(in.position() - begin_pos) >= min_chunk_size) - need_more_data = false; - ++in.position(); - if (!eofWithSavingBufferState(in, memory, used_size, begin_pos) && *in.position() == '\r') - ++in.position(); - } - else if (*in.position() == '\r') - { - if (used_size + static_cast(in.position() - begin_pos) >= min_chunk_size) - need_more_data = false; - ++in.position(); - if (!eofWithSavingBufferState(in, memory, used_size, begin_pos) && *in.position() == '\n') - ++in.position(); - } + else + quotes = false; } } - eofWithSavingBufferState(in, memory, used_size, begin_pos, true); - return true; - }); + else + { + in.position() = find_first_symbols<'"','\r', '\n'>(in.position(), in.buffer().end()); + if (in.position() == in.buffer().end()) + continue; + if (*in.position() == '"') + { + quotes = true; + ++in.position(); + } + else if (*in.position() == '\n') + { + if (used_size + static_cast(in.position() - begin_pos) >= min_chunk_size) + need_more_data = false; + ++in.position(); + if (!eofWithSavingBufferState(in, memory, used_size, begin_pos) && *in.position() == '\r') + ++in.position(); + } + else if (*in.position() == '\r') + { + if (used_size + static_cast(in.position() - begin_pos) >= min_chunk_size) + need_more_data = false; + ++in.position(); + if (!eofWithSavingBufferState(in, memory, used_size, begin_pos) && *in.position() == '\n') + ++in.position(); + } + } + } + eofWithSavingBufferState(in, memory, used_size, begin_pos, true); + return true; +} + +void registerFileSegmentationEngineCSV(FormatFactory & factory) +{ + factory.registerFileSegmentationEngine("CSV", &fileSegmentationEngineCSVImpl); } diff --git a/dbms/src/Processors/Formats/Impl/JSONEachRowRowInputFormat.cpp b/dbms/src/Processors/Formats/Impl/JSONEachRowRowInputFormat.cpp index d5a4b50de74..e1bf94e4c23 100644 --- a/dbms/src/Processors/Formats/Impl/JSONEachRowRowInputFormat.cpp +++ b/dbms/src/Processors/Formats/Impl/JSONEachRowRowInputFormat.cpp @@ -270,70 +270,68 @@ void registerInputFormatProcessorJSONEachRow(FormatFactory & factory) }); } -void registerFileSegmentationEngineJSONEachRow(FormatFactory & factory) +bool fileSegmentationEngineJSONEachRowImpl(ReadBuffer & in, DB::Memory<> & memory, size_t & used_size, size_t min_chunk_size) { - factory.registerFileSegmentationEngine("JSONEachRow", []( - ReadBuffer & in, - DB::Memory<> & memory, - size_t & used_size, - size_t min_chunk_size) + skipWhitespaceIfAny(in); + char * begin_pos = in.position(); + size_t balance = 0; + bool quotes = false; + memory.resize(min_chunk_size); + while (!eofWithSavingBufferState(in, memory, used_size, begin_pos) + && (balance || used_size + static_cast(in.position() - begin_pos) < min_chunk_size)) { - skipWhitespaceIfAny(in); - char * begin_pos = in.position(); - size_t balance = 0; - bool quotes = false; - memory.resize(min_chunk_size); - while (!eofWithSavingBufferState(in, memory, used_size, begin_pos) - && (balance || used_size + static_cast(in.position() - begin_pos) < min_chunk_size)) + if (quotes) { - if (quotes) + in.position() = find_first_symbols<'\\', '"'>(in.position(), in.buffer().end()); + if (in.position() == in.buffer().end()) + continue; + if (*in.position() == '\\') { - in.position() = find_first_symbols<'\\', '"'>(in.position(), in.buffer().end()); - if (in.position() == in.buffer().end()) - continue; - if (*in.position() == '\\') - { + ++in.position(); + if (!eofWithSavingBufferState(in, memory, used_size, begin_pos)) ++in.position(); - if (!eofWithSavingBufferState(in, memory, used_size, begin_pos)) - ++in.position(); - } - else if (*in.position() == '"') - { - ++in.position(); - quotes = false; - } } - else + else if (*in.position() == '"') { - in.position() = find_first_symbols<'{', '}', '\\', '"'>(in.position(), in.buffer().end()); - if (in.position() == in.buffer().end()) - continue; - if (*in.position() == '{') - { - ++balance; - ++in.position(); - } - else if (*in.position() == '}') - { - --balance; - ++in.position(); - } - else if (*in.position() == '\\') - { - ++in.position(); - if (!eofWithSavingBufferState(in, memory, used_size, begin_pos)) - ++in.position(); - } - else if (*in.position() == '"') - { - quotes = true; - ++in.position(); - } + ++in.position(); + quotes = false; } } - eofWithSavingBufferState(in, memory, used_size, begin_pos, true); - return true; - }); + else + { + in.position() = find_first_symbols<'{', '}', '\\', '"'>(in.position(), in.buffer().end()); + if (in.position() == in.buffer().end()) + continue; + if (*in.position() == '{') + { + ++balance; + ++in.position(); + } + else if (*in.position() == '}') + { + --balance; + ++in.position(); + } + else if (*in.position() == '\\') + { + ++in.position(); + if (!eofWithSavingBufferState(in, memory, used_size, begin_pos)) + ++in.position(); + } + else if (*in.position() == '"') + { + quotes = true; + ++in.position(); + } + } + } + eofWithSavingBufferState(in, memory, used_size, begin_pos, true); + return true; +} + +void registerFileSegmentationEngineJSONEachRow(FormatFactory & factory) +{ + factory.registerFileSegmentationEngine("JSONEachRow", &fileSegmentationEngineJSONEachRowImpl); } } diff --git a/dbms/src/Processors/Formats/Impl/TSKVRowInputFormat.cpp b/dbms/src/Processors/Formats/Impl/TSKVRowInputFormat.cpp index 5d990cc7fef..e14a5dce16d 100644 --- a/dbms/src/Processors/Formats/Impl/TSKVRowInputFormat.cpp +++ b/dbms/src/Processors/Formats/Impl/TSKVRowInputFormat.cpp @@ -210,38 +210,36 @@ void registerInputFormatProcessorTSKV(FormatFactory & factory) }); } +bool fileSegmentationEngineTSKVImpl(ReadBuffer & in, DB::Memory<> & memory, size_t & used_size, size_t min_chunk_size) +{ + char * begin_pos = in.position(); + bool need_more_data = true; + memory.resize(min_chunk_size); + while (!eofWithSavingBufferState(in, memory, used_size, begin_pos) && need_more_data) + { + in.position() = find_first_symbols<'\\','\r', '\n'>(in.position(), in.buffer().end()); + if (in.position() == in.buffer().end()) + continue; + if (*in.position() == '\\') + { + ++in.position(); + if (!eofWithSavingBufferState(in, memory, used_size, begin_pos)) + ++in.position(); + } + else if (*in.position() == '\n' || *in.position() == '\r') + { + if (used_size + static_cast(in.position() - begin_pos) >= min_chunk_size) + need_more_data = false; + ++in.position(); + } + } + eofWithSavingBufferState(in, memory, used_size, begin_pos, true); + return true; +} + void registerFileSegmentationEngineTSKV(FormatFactory & factory) { - factory.registerFileSegmentationEngine("TSKV", []( - ReadBuffer & in, - DB::Memory<> & memory, - size_t & used_size, - size_t min_chunk_size) - { - char * begin_pos = in.position(); - bool need_more_data = true; - memory.resize(min_chunk_size); - while (!eofWithSavingBufferState(in, memory, used_size, begin_pos) && need_more_data) - { - in.position() = find_first_symbols<'\\','\r', '\n'>(in.position(), in.buffer().end()); - if (in.position() == in.buffer().end()) - continue; - if (*in.position() == '\\') - { - ++in.position(); - if (!eofWithSavingBufferState(in, memory, used_size, begin_pos)) - ++in.position(); - } - else if (*in.position() == '\n' || *in.position() == '\r') - { - if (used_size + static_cast(in.position() - begin_pos) >= min_chunk_size) - need_more_data = false; - ++in.position(); - } - } - eofWithSavingBufferState(in, memory, used_size, begin_pos, true); - return true; - }); + factory.registerFileSegmentationEngine("TSKV", &fileSegmentationEngineTSKVImpl); } diff --git a/dbms/src/Processors/Formats/Impl/TabSeparatedRowInputFormat.cpp b/dbms/src/Processors/Formats/Impl/TabSeparatedRowInputFormat.cpp index 521d8ad27c5..f13b732b371 100644 --- a/dbms/src/Processors/Formats/Impl/TabSeparatedRowInputFormat.cpp +++ b/dbms/src/Processors/Formats/Impl/TabSeparatedRowInputFormat.cpp @@ -384,45 +384,43 @@ void registerInputFormatProcessorTabSeparated(FormatFactory & factory) } } +bool fileSegmentationEngineTabSeparatedImpl(ReadBuffer & in, DB::Memory<> & memory, size_t & used_size, size_t min_chunk_size) +{ + if (in.eof()) + return false; + + char * begin_pos = in.position(); + bool need_more_data = true; + memory.resize(min_chunk_size); + while (!eofWithSavingBufferState(in, memory, used_size, begin_pos) && need_more_data) + { + in.position() = find_first_symbols<'\\', '\r', '\n'>(in.position(), in.buffer().end()); + if (in.position() == in.buffer().end()) + { + continue; + } + + if (*in.position() == '\\') + { + ++in.position(); + if (!eofWithSavingBufferState(in, memory, used_size, begin_pos, true)) + ++in.position(); + } else if (*in.position() == '\n' || *in.position() == '\r') + { + if (used_size + static_cast(in.position() - begin_pos) >= min_chunk_size) + need_more_data = false; + ++in.position(); + } + } + eofWithSavingBufferState(in, memory, used_size, begin_pos, true); + return true; +} + void registerFileSegmentationEngineTabSeparated(FormatFactory & factory) { for (auto name : {"TabSeparated", "TSV"}) { - factory.registerFileSegmentationEngine(name, []( - ReadBuffer & in, - DB::Memory<> & memory, - size_t & used_size, - size_t min_chunk_size) - { - if (in.eof()) - return false; - - char * begin_pos = in.position(); - bool need_more_data = true; - memory.resize(min_chunk_size); - while (!eofWithSavingBufferState(in, memory, used_size, begin_pos) && need_more_data) - { - in.position() = find_first_symbols<'\\', '\r', '\n'>(in.position(), in.buffer().end()); - if (in.position() == in.buffer().end()) - { - continue; - } - - if (*in.position() == '\\') - { - ++in.position(); - if (!eofWithSavingBufferState(in, memory, used_size, begin_pos, true)) - ++in.position(); - } else if (*in.position() == '\n' || *in.position() == '\r') - { - if (used_size + static_cast(in.position() - begin_pos) >= min_chunk_size) - need_more_data = false; - ++in.position(); - } - } - eofWithSavingBufferState(in, memory, used_size, begin_pos, true); - return true; - }); + factory.registerFileSegmentationEngine(name, &fileSegmentationEngineTabSeparatedImpl); } }