fix JSONEachRowArray

Nikita Mikhaylov 2020-06-10 15:02:34 +03:00 committed by nikitamikhaylov
parent e0addac6fc
commit f40f3ced2a
10 changed files with 117 additions and 48 deletions

View File

@@ -150,6 +150,7 @@ InputFormatPtr FormatFactory::getInput(
if (!getCreators(name).input_processor_creator)
{
// const auto & input_getter = getCreators(name).input_creator;
// if (!input_getter)
// throw Exception("Format " + name + " is not suitable for input", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_INPUT);
@@ -194,26 +195,14 @@ InputFormatPtr FormatFactory::getInput(
row_input_format_params.max_execution_time = settings.max_execution_time;
row_input_format_params.timeout_overflow_mode = settings.timeout_overflow_mode;
/// Const reference is copied to lambda.
auto parser_creator = [input_getter, sample, row_input_format_params, format_settings]
(ReadBuffer & input) -> InputFormatPtr
{ return input_getter(input, sample, row_input_format_params, format_settings); };
auto parser_creator = std::bind(
input_getter.target<InputProcessorCreatorFunc>(),
std::placeholders::_1, sample, row_input_format_params, format_settings);
// auto anime = parser_creator(buf)
auto boruto = input_getter(buf, sample, row_input_format_params, format_settings);
// auto naruto = input_getter.target<InputProcessorCreatorFunc>()(buf, sample, row_input_format_params, format_settings);
auto naruto =
[sample, row_input_format_params, format_settings, input_getter]
(ReadBuffer & input)
{return input_getter(input, sample, row_input_format_params, format_settings);};
auto aaa = naruto(buf);
ParallelParsingBlockInputFormat::Params params{buf, sample,
naruto, file_segmentation_engine,
settings.max_threads,
settings.min_chunk_bytes_for_parallel_parsing};
ParallelParsingBlockInputFormat::Params params{
buf, sample, parser_creator, file_segmentation_engine, name, settings.max_threads, settings.min_chunk_bytes_for_parallel_parsing};
return std::make_shared<ParallelParsingBlockInputFormat>(params);
}
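The hunk above appears to clean up the parser-creator construction: it keeps a single value-capturing lambda, drops the std::bind variant and the anime/boruto/naruto debug calls, and now passes the format name into ParallelParsingBlockInputFormat::Params. The standalone sketch below illustrates why the lambda form is the safer choice: std::function::target<T>() only returns a non-null pointer when T is exactly the stored callable type, so building the creator through target<InputProcessorCreatorFunc>() can silently yield a null callee, while capturing the std::function and its arguments by value keeps everything alive for the parser threads. FakeBuffer, FakeFormat and Creator are illustrative stand-ins, not the ClickHouse types.

#include <functional>
#include <iostream>
#include <string>

/// Stand-ins for ReadBuffer / InputFormatPtr, purely for illustration.
struct FakeBuffer { std::string data; };
using FakeFormat = std::string;
using Creator = std::function<FakeFormat(FakeBuffer &, const std::string &, int)>;

int main()
{
    /// The factory-provided getter; here it is a lambda, as it typically is in practice.
    Creator input_getter = [](FakeBuffer & buf, const std::string & sample, int params)
    {
        return "format(" + buf.data + ", " + sample + ", " + std::to_string(params) + ")";
    };

    std::string sample = "header";
    int row_input_format_params = 42;

    /// Everything the parser threads need is captured by value, so the creator
    /// stays valid after the enclosing function returns.
    auto parser_creator = [input_getter, sample, row_input_format_params](FakeBuffer & in)
    {
        return input_getter(in, sample, row_input_format_params);
    };

    /// target<T>() returns nullptr unless T is the exact stored type; with a lambda
    /// stored inside the std::function, asking for a function pointer gives null.
    auto * raw = input_getter.target<FakeFormat (*)(FakeBuffer &, const std::string &, int)>();
    std::cout << "target<> is " << (raw ? "non-null" : "null") << '\n';

    FakeBuffer buf{"segment"};
    std::cout << parser_creator(buf) << '\n';
}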

View File

@@ -9,6 +9,14 @@ bool fileSegmentationEngineJSONEachRowImpl(ReadBuffer & in, DB::Memory<> & memor
skipWhitespaceIfAny(in);
char * pos = in.position();
/// In case independent JSON objects are split by a comma, we skip that comma.
if (pos && *pos == ',')
{
++in.position();
++pos;
}
size_t balance = 0;
bool quotes = false;
@@ -61,6 +69,7 @@ bool fileSegmentationEngineJSONEachRowImpl(ReadBuffer & in, DB::Memory<> & memor
}
saveUpToPosition(in, memory, pos);
assert(*memory.data() == '{');
return loadAtPosition(in, memory, pos);
}
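With the change above, fileSegmentationEngineJSONEachRowImpl tolerates rows coming from a JSON array: a comma left over between array elements is skipped before the next balanced {...} object is scanned, and the saved segment is asserted to start with '{'. A rough standalone sketch of that scanning idea (comma skip, brace balance, quote and escape awareness) follows; it is a simplification, not the ClickHouse implementation, and does not handle buffer refills.

#include <cassert>
#include <cstddef>
#include <string_view>

/// Returns the number of bytes up to the end of the next complete JSON object in `in`,
/// skipping one leading comma and whitespace, or 0 if no complete object starts here.
std::size_t nextObjectLength(std::string_view in)
{
    std::size_t pos = 0;
    auto skip_ws = [&]
    {
        while (pos < in.size() && (in[pos] == ' ' || in[pos] == '\t' || in[pos] == '\n' || in[pos] == '\r'))
            ++pos;
    };

    skip_ws();
    if (pos < in.size() && in[pos] == ',')   /// comma between elements of a JSON array
        ++pos;
    skip_ws();
    if (pos >= in.size() || in[pos] != '{')
        return 0;

    std::size_t balance = 0;
    bool quotes = false;
    for (; pos < in.size(); ++pos)
    {
        char c = in[pos];
        if (quotes)
        {
            if (c == '\\')
                ++pos;                       /// skip the escaped character
            else if (c == '"')
                quotes = false;
        }
        else if (c == '"')
            quotes = true;
        else if (c == '{')
            ++balance;
        else if (c == '}' && --balance == 0)
            return pos + 1;
    }
    return 0;                                /// object is incomplete
}

int main()
{
    assert(nextObjectLength(R"( ,{"k":"a}b","n":{"m":1}})") == 25);
    assert(nextObjectLength("[") == 0);
}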

View File

@@ -464,9 +464,6 @@ void PipelineExecutor::finalizeExecution()
if (!all_processors_finished)
throw Exception("Pipeline stuck. Current state:\n" + dumpPipeline(), ErrorCodes::LOGICAL_ERROR);
WriteBufferFromOStream out(std::cout);
printPipeline(processors, out);
}
void PipelineExecutor::wakeUpExecutor(size_t thread_num)

View File

@@ -0,0 +1,56 @@
#pragma once
#include <IO/ReadHelpers.h>
namespace DB
{
class IReadBufferPrepareAndEndUp
{
public:
virtual ~IReadBufferPrepareAndEndUp() {}
virtual void prepareReadBuffer(ReadBuffer & buffer) = 0;
virtual void endUpReadBuffer(ReadBuffer & buffer) = 0;
};
using IReadBufferPrepareAndEndUpPtr = std::shared_ptr<IReadBufferPrepareAndEndUp>;
class JSONEachRowPrepareAndEndUp : public IReadBufferPrepareAndEndUp
{
public:
void prepareReadBuffer(ReadBuffer & buffer) override
{
/// In this format, BOM at beginning of stream cannot be confused with value, so it is safe to skip it.
skipBOMIfExists(buffer);
skipWhitespaceIfAny(buffer);
if (!buffer.eof() && *buffer.position() == '[')
{
++buffer.position();
data_in_square_brackets = true;
}
}
void endUpReadBuffer(ReadBuffer & buffer) override
{
skipWhitespaceIfAny(buffer);
if (data_in_square_brackets)
{
assertChar(']', buffer);
skipWhitespaceIfAny(buffer);
}
if (!buffer.eof() && *buffer.position() == ';')
{
++buffer.position();
skipWhitespaceIfAny(buffer);
}
assertEOF(buffer);
}
private:
bool data_in_square_brackets{false};
};
}
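The new header above factors the '[' ... ']' (and trailing ';') framing of a JSONEachRow array into a prepare/end-up hook pair, so that both the sequential reader and the parallel parser can reuse it. A simplified standalone sketch of the same pattern over a plain in-memory cursor follows; Cursor, IPrepareAndEndUp and JSONEachRowFraming are illustrative stand-ins, not the ClickHouse types.

#include <cstddef>
#include <memory>
#include <stdexcept>
#include <string_view>

/// Minimal stand-in for ReadBuffer: a cursor over an in-memory string.
struct Cursor
{
    std::string_view data;
    std::size_t pos = 0;

    bool eof() const { return pos >= data.size(); }
    char peek() const { return data[pos]; }
    void skipWhitespace() { while (!eof() && (peek() == ' ' || peek() == '\n')) ++pos; }
};

struct IPrepareAndEndUp
{
    virtual ~IPrepareAndEndUp() = default;
    virtual void prepare(Cursor &) = 0;   /// called once before the first row is parsed
    virtual void endUp(Cursor &) = 0;     /// called once after the last row is parsed
};

/// Strips an optional enclosing '[' ... ']' and a trailing ';'.
struct JSONEachRowFraming : IPrepareAndEndUp
{
    bool in_brackets = false;

    void prepare(Cursor & c) override
    {
        c.skipWhitespace();
        if (!c.eof() && c.peek() == '[')
        {
            ++c.pos;
            in_brackets = true;
        }
    }

    void endUp(Cursor & c) override
    {
        c.skipWhitespace();
        if (in_brackets)
        {
            if (c.eof() || c.peek() != ']')
                throw std::runtime_error("expected ']'");
            ++c.pos;
            c.skipWhitespace();
        }
        if (!c.eof() && c.peek() == ';')
        {
            ++c.pos;
            c.skipWhitespace();
        }
        if (!c.eof())
            throw std::runtime_error("expected end of data");
    }
};

int main()
{
    Cursor c{"[{\"x\":1}];"};
    std::unique_ptr<IPrepareAndEndUp> framing = std::make_unique<JSONEachRowFraming>();
    framing->prepare(c);   /// consumes '['
    c.pos += 7;            /// pretend the row parser consumed {"x":1}
    framing->endUp(c);     /// consumes ']' and ';', then checks for end of data
}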

View File

@@ -303,31 +303,12 @@ void JSONEachRowRowInputFormat::resetParser()
void JSONEachRowRowInputFormat::readPrefix()
{
/// In this format, BOM at beginning of stream cannot be confused with value, so it is safe to skip it.
skipBOMIfExists(in);
skipWhitespaceIfAny(in);
if (!in.eof() && *in.position() == '[')
{
++in.position();
data_in_square_brackets = true;
}
prepare_and_end_up.prepareReadBuffer(in);
}
void JSONEachRowRowInputFormat::readSuffix()
{
skipWhitespaceIfAny(in);
if (data_in_square_brackets)
{
assertChar(']', in);
skipWhitespaceIfAny(in);
}
if (!in.eof() && *in.position() == ';')
{
++in.position();
skipWhitespaceIfAny(in);
}
assertEOF(in);
prepare_and_end_up.endUpReadBuffer(in);
}
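With readPrefix() and readSuffix() now delegating to JSONEachRowPrepareAndEndUp, the sequential JSONEachRow reader and the parallel one share one set of framing rules. Roughly, for a payload like the following (an illustrative example, not taken from the diff), the prefix hook consumes the optional BOM, leading whitespace and '[', the row parser consumes the objects and the commas between them, and the suffix hook consumes ']', the optional ';' and then asserts end of stream:

[
    {"id": 1, "name": "a"},
    {"id": 2, "name": "b"}
];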

View File

@@ -4,7 +4,7 @@
#include <Processors/Formats/IRowInputFormat.h>
#include <Formats/FormatSettings.h>
#include <Common/HashTable/HashMap.h>
#include "IReadBufferPrepareAndEndUp.h"
namespace DB
{
@@ -81,7 +81,11 @@ private:
bool allow_new_rows = true;
bool yield_strings;
/// Used when readSuffix() or readPrefix() are called.
JSONEachRowPrepareAndEndUp prepare_and_end_up;
};
}

View File

@@ -1,4 +1,5 @@
#include <Processors/Formats/Impl/ParallelParsingBlockInputFormat.h>
#include <IO/ReadHelpers.h>
namespace DB
{
@@ -64,6 +65,10 @@ void ParallelParsingBlockInputFormat::parserThreadFunction(size_t current_ticket
* can use it from multiple threads simultaneously.
*/
ReadBuffer read_buffer(unit.segment.data(), unit.segment.size(), 0);
if (current_ticket_number == 0)
prepareReadBuffer(read_buffer);
InputFormatPtr input_format = internal_parser_creator(read_buffer);
InternalParser parser(input_format);
@@ -83,6 +88,9 @@ void ParallelParsingBlockInputFormat::parserThreadFunction(size_t current_ticket
// except at the end of file. Also see a matching assert in readImpl().
assert(unit.is_last || !unit.chunk_ext.chunk.empty());
if (unit.is_last)
endUpReadBuffer(read_buffer);
std::lock_guard<std::mutex> lock(mutex);
unit.status = READY_TO_READ;
reader_condvar.notify_all();
@@ -198,5 +206,17 @@ Chunk ParallelParsingBlockInputFormat::generate()
return res;
}
void ParallelParsingBlockInputFormat::prepareReadBuffer(ReadBuffer & buffer)
{
if (prepare_and_end_up_ptr)
prepare_and_end_up_ptr->prepareReadBuffer(buffer);
}
void ParallelParsingBlockInputFormat::endUpReadBuffer(ReadBuffer & buffer)
{
if (prepare_and_end_up_ptr)
prepare_and_end_up_ptr->endUpReadBuffer(buffer);
}
}
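In the hunk above, prepareReadBuffer() runs only for the very first segment (current_ticket_number == 0) and endUpReadBuffer() only for the last one (unit.is_last), because the opening '[' can only sit in the first chunk produced by the segmentation engine and the closing ']' / ';' only in the final chunk. A tiny standalone sketch of that first/last dispatch over a list of already-segmented strings; the helper names are illustrative, not from the diff.

#include <cstddef>
#include <iostream>
#include <string>
#include <vector>

/// Illustrative framing helpers; in the real code this is IReadBufferPrepareAndEndUp.
void prepareSegment(std::string & s)
{
    if (!s.empty() && s.front() == '[')
        s.erase(0, 1);
}

void endUpSegment(std::string & s)
{
    while (!s.empty() && (s.back() == ']' || s.back() == ';'))
        s.pop_back();
}

int main()
{
    std::vector<std::string> segments = {"[{\"x\":1}", ",{\"x\":2}", ",{\"x\":3}];"};

    for (std::size_t ticket = 0; ticket < segments.size(); ++ticket)
    {
        if (ticket == 0)                        /// the '[' can only be in the first segment
            prepareSegment(segments[ticket]);
        if (ticket + 1 == segments.size())      /// the ']' / ';' can only be in the last one
            endUpSegment(segments[ticket]);

        std::cout << "parse: " << segments[ticket] << '\n';
    }
}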

View File

@@ -10,6 +10,7 @@
#include <Processors/Formats/IRowInputFormat.h>
#include <Processors/Formats/InputStreamFromInputFormat.h>
#include <Interpreters/Context.h>
#include "IReadBufferPrepareAndEndUp.h"
namespace DB
{
@@ -61,6 +62,7 @@ public:
Block header;
InternalParserCreator internal_parser_creator;
FormatFactory::FileSegmentationEngine file_segmentation_engine;
String format_name;
size_t max_threads;
size_t min_chunk_bytes;
};
@@ -69,6 +71,7 @@ public:
: IInputFormat(std::move(params.header), params.in)
, internal_parser_creator(params.internal_parser_creator)
, file_segmentation_engine(params.file_segmentation_engine)
, format_name(params.format_name)
, min_chunk_bytes(params.min_chunk_bytes)
// Subtract one thread that we use for segmentation and one for
// reading. After that, must have at least two threads left for
@@ -80,6 +83,10 @@ public:
// bump into reader thread on wraparound.
processing_units.resize(params.max_threads + 2);
/// To skip '[' and ']'.
if (format_name == "JSONEachRow")
prepare_and_end_up_ptr = std::make_shared<JSONEachRowPrepareAndEndUp>();
segmentator_thread = ThreadFromGlobalPool([this] { segmentatorThreadFunction(); });
}
@@ -165,6 +172,7 @@ private:
const InternalParserCreator internal_parser_creator;
// Function to segment the file. Then "parsers" will parse that segments.
FormatFactory::FileSegmentationEngine file_segmentation_engine;
const String format_name;
const size_t min_chunk_bytes;
BlockMissingValues last_block_missing_values;
@@ -178,7 +186,7 @@ private:
std::condition_variable reader_condvar;
std::condition_variable segmentator_condvar;
std::atomic<bool> parsing_finished;
std::atomic<bool> parsing_finished{false};
// There are multiple "parsers", that's why we use thread pool.
ThreadPool pool;
@@ -254,6 +262,12 @@ private:
// readImpl() is called from the main thread, so the exception handling
// is different.
void onBackgroundException();
IReadBufferPrepareAndEndUpPtr prepare_and_end_up_ptr;
void prepareReadBuffer(ReadBuffer & buffer);
void endUpReadBuffer(ReadBuffer & buffer);
};
}
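The constructor above enables the framing hook by a direct string comparison of format_name with "JSONEachRow". If more formats ever needed pre/post processing of the read buffer, that check would naturally grow into a small registry keyed by format name; the sketch below is a hypothetical illustration of that direction, not something the commit introduces, and the *Stub types stand in for the real interface.

#include <functional>
#include <map>
#include <memory>
#include <string>

struct IReadBufferPrepareAndEndUpStub { virtual ~IReadBufferPrepareAndEndUpStub() = default; };
struct JSONEachRowPrepareAndEndUpStub : IReadBufferPrepareAndEndUpStub {};

/// Hypothetical lookup keyed by format name; the commit itself hard-codes the single check.
std::shared_ptr<IReadBufferPrepareAndEndUpStub> makePrepareAndEndUp(const std::string & format_name)
{
    static const std::map<std::string, std::function<std::shared_ptr<IReadBufferPrepareAndEndUpStub>()>> registry =
    {
        {"JSONEachRow", [] { return std::make_shared<JSONEachRowPrepareAndEndUpStub>(); }},
    };

    auto it = registry.find(format_name);
    return it == registry.end() ? nullptr : it->second();
}

int main()
{
    return makePrepareAndEndUp("JSONEachRow") ? 0 : 1;
}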

View File

@@ -60,7 +60,7 @@
#include <typeinfo>
#include <typeindex>
#include <unordered_set>
#include <Processors/Formats/InputStreamFromInputFormat.h>
namespace ProfileEvents
{
@@ -2904,8 +2904,8 @@ String MergeTreeData::getPartitionIDFromQuery(const ASTPtr & ast, const Context
ReadBufferFromMemory right_paren_buf(")", 1);
ConcatReadBuffer buf({&left_paren_buf, &fields_buf, &right_paren_buf});
auto format = FormatFactory::instance().getInput("Values", buf, partition_key_sample, context, context.getSettingsRef().max_block_size);
auto input_stream = std::make_shared<InputStreamFromInputFormat>(format);
auto input_format = FormatFactory::instance().getInput("Values", buf, getPartitionKey().sample_block, context, context.getSettingsRef().max_block_size);
auto input_stream = std::make_shared<InputStreamFromInputFormat>(input_format);
auto block = input_stream->read();
if (!block || !block.rows())

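The hunk above keeps the existing trick for parsing a PARTITION expression: the field list from the query is wrapped into "(" ... ")" by concatenating three read buffers and the result is fed to the "Values" input format; the visible change only adjusts which header block is handed to the parser and the local variable naming around it. A standalone sketch of the buffer-concatenation idea follows; ConcatSource and the sample field list are stand-ins, not ClickHouse's ConcatReadBuffer.

#include <cassert>
#include <cstddef>
#include <string>
#include <string_view>
#include <vector>

/// Reads characters from several sources in order, without materializing
/// the combined string up front.
struct ConcatSource
{
    std::vector<std::string_view> parts;
    std::size_t part = 0;
    std::size_t pos = 0;

    bool read(char & c)
    {
        while (part < parts.size() && pos >= parts[part].size())
        {
            ++part;
            pos = 0;
        }
        if (part >= parts.size())
            return false;
        c = parts[part][pos++];
        return true;
    }
};

int main()
{
    /// "(" + <partition fields as written in the query> + ")", later parsed as one Values row.
    ConcatSource buf{{"(", "201906, 'EU'", ")"}};

    std::string row;
    for (char c; buf.read(c);)
        row += c;

    assert(row == "(201906, 'EU')");
}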
View File

@@ -291,7 +291,6 @@ public:
Chunk generate() override
{
std::cout << StackTrace().toString() << std::endl;
while (!finished_generate)
{
/// Open file lazily on first read. This is needed to avoid too many open files from different streams.