remove SharedReadBuffer + remove lambdas

This commit is contained in:
Nikita Mikhaylov 2019-10-25 15:28:24 +03:00
parent 78b6322797
commit 682b9df17f
8 changed files with 166 additions and 247 deletions

View File

@ -5,7 +5,6 @@
#include <Common/ThreadPool.h>
#include <Common/setThreadName.h>
#include <IO/BufferWithOwnMemory.h>
#include <IO/SharedReadBuffer.h>
#include <IO/ReadBuffer.h>
#include <Processors/Formats/IRowInputFormat.h>
#include <Processors/Formats/InputStreamFromInputFormat.h>

View File

@ -6,7 +6,6 @@
#include <DataStreams/ParallelParsingBlockInputStream.h>
#include <Formats/FormatSettings.h>
#include <Formats/FormatFactory.h>
#include <IO/SharedReadBuffer.h>
#include <DataStreams/UnionBlockInputStream.h>
#include <Processors/Formats/IRowInputFormat.h>
#include <Processors/Formats/InputStreamFromInputFormat.h>
@ -349,7 +348,6 @@ FormatFactory::FormatFactory()
registerFileSegmentationEngineJSONEachRow(*this);
registerFileSegmentationEngineTabSeparated(*this);
//registerFileSegmentationEngineValues(*this);
registerFileSegmentationEngineCSV(*this);
registerFileSegmentationEngineTSKV(*this);

View File

@ -44,7 +44,7 @@ public:
/** Fast reading data from buffer and save result to memory.
* Reads at least min_chunk_size bytes and some more until the end of the chunk, depends on the format.
* Used in SharedReadBuffer.
* Used in ParallelParsingBlockInputStream.
*/
using FileSegmentationEngine = std::function<bool(
ReadBuffer & buf,

View File

@ -1,70 +0,0 @@
#pragma once
#include <Formats/FormatFactory.h>
#include <IO/BufferWithOwnMemory.h>
#include <IO/ReadBuffer.h>
#include <Common/setThreadName.h>
namespace DB
{
/** Allows many threads to read from single ReadBuffer
* Each SharedReadBuffer has his own memory,
* which he filles under shared mutex using FileSegmentationEngine.
*/
class SharedReadBuffer : public BufferWithOwnMemory<ReadBuffer>
{
public:
SharedReadBuffer(
ReadBuffer & in_,
std::shared_ptr<std::mutex> & mutex_,
FormatFactory::FileSegmentationEngine file_segmentation_engine_,
size_t min_chunk_size_)
: BufferWithOwnMemory<ReadBuffer>(),
mutex(mutex_),
in(in_),
file_segmentation_engine(file_segmentation_engine_),
min_chunk_size(min_chunk_size_)
{
}
private:
bool nextImpl() override
{
if (eof || !mutex)
return false;
std::lock_guard<std::mutex> lock(*mutex);
if (in.eof())
{
eof = true;
return false;
}
size_t used_size = 0;
bool res = file_segmentation_engine(in, memory, used_size, min_chunk_size);
if (!res)
return false;
working_buffer = Buffer(memory.data(), memory.data() + used_size);
return true;
}
private:
// Pointer to common mutex.
std::shared_ptr<std::mutex> mutex;
// Original ReadBuffer to read from.
ReadBuffer & in;
// Function to fill working_buffer.
FormatFactory::FileSegmentationEngine file_segmentation_engine;
// FileSegmentationEngine parameter.
size_t min_chunk_size;
// Indicator of the eof. Save extra lock acquiring.
bool eof{false};
};
}

View File

@ -422,66 +422,64 @@ void registerInputFormatProcessorCSV(FormatFactory & factory)
}
}
void registerFileSegmentationEngineCSV(FormatFactory & factory)
bool fileSegmentationEngineCSVImpl(ReadBuffer & in, DB::Memory<> & memory, size_t & used_size, size_t min_chunk_size)
{
factory.registerFileSegmentationEngine("CSV", [](
ReadBuffer & in,
DB::Memory<> & memory,
size_t & used_size,
size_t min_chunk_size)
skipWhitespacesAndTabs(in);
char * begin_pos = in.position();
bool quotes = false;
bool need_more_data = true;
memory.resize(min_chunk_size);
while (!eofWithSavingBufferState(in, memory, used_size, begin_pos) && need_more_data)
{
skipWhitespacesAndTabs(in);
char * begin_pos = in.position();
bool quotes = false;
bool need_more_data = true;
memory.resize(min_chunk_size);
while (!eofWithSavingBufferState(in, memory, used_size, begin_pos) && need_more_data)
if (quotes)
{
if (quotes)
in.position() = find_first_symbols<'"'>(in.position(), in.buffer().end());
if (in.position() == in.buffer().end())
continue;
if (*in.position() == '"')
{
in.position() = find_first_symbols<'"'>(in.position(), in.buffer().end());
if (in.position() == in.buffer().end())
continue;
if (*in.position() == '"')
{
++in.position();
if (!eofWithSavingBufferState(in, memory, used_size, begin_pos) && *in.position() == '"')
++in.position();
if (!eofWithSavingBufferState(in, memory, used_size, begin_pos) && *in.position() == '"')
++in.position();
else
quotes = false;
}
}
else
{
in.position() = find_first_symbols<'"','\r', '\n'>(in.position(), in.buffer().end());
if (in.position() == in.buffer().end())
continue;
if (*in.position() == '"')
{
quotes = true;
++in.position();
}
else if (*in.position() == '\n')
{
if (used_size + static_cast<size_t>(in.position() - begin_pos) >= min_chunk_size)
need_more_data = false;
++in.position();
if (!eofWithSavingBufferState(in, memory, used_size, begin_pos) && *in.position() == '\r')
++in.position();
}
else if (*in.position() == '\r')
{
if (used_size + static_cast<size_t>(in.position() - begin_pos) >= min_chunk_size)
need_more_data = false;
++in.position();
if (!eofWithSavingBufferState(in, memory, used_size, begin_pos) && *in.position() == '\n')
++in.position();
}
else
quotes = false;
}
}
eofWithSavingBufferState(in, memory, used_size, begin_pos, true);
return true;
});
else
{
in.position() = find_first_symbols<'"','\r', '\n'>(in.position(), in.buffer().end());
if (in.position() == in.buffer().end())
continue;
if (*in.position() == '"')
{
quotes = true;
++in.position();
}
else if (*in.position() == '\n')
{
if (used_size + static_cast<size_t>(in.position() - begin_pos) >= min_chunk_size)
need_more_data = false;
++in.position();
if (!eofWithSavingBufferState(in, memory, used_size, begin_pos) && *in.position() == '\r')
++in.position();
}
else if (*in.position() == '\r')
{
if (used_size + static_cast<size_t>(in.position() - begin_pos) >= min_chunk_size)
need_more_data = false;
++in.position();
if (!eofWithSavingBufferState(in, memory, used_size, begin_pos) && *in.position() == '\n')
++in.position();
}
}
}
eofWithSavingBufferState(in, memory, used_size, begin_pos, true);
return true;
}
void registerFileSegmentationEngineCSV(FormatFactory & factory)
{
factory.registerFileSegmentationEngine("CSV", &fileSegmentationEngineCSVImpl);
}

View File

@ -270,70 +270,68 @@ void registerInputFormatProcessorJSONEachRow(FormatFactory & factory)
});
}
void registerFileSegmentationEngineJSONEachRow(FormatFactory & factory)
bool fileSegmentationEngineJSONEachRowImpl(ReadBuffer & in, DB::Memory<> & memory, size_t & used_size, size_t min_chunk_size)
{
factory.registerFileSegmentationEngine("JSONEachRow", [](
ReadBuffer & in,
DB::Memory<> & memory,
size_t & used_size,
size_t min_chunk_size)
skipWhitespaceIfAny(in);
char * begin_pos = in.position();
size_t balance = 0;
bool quotes = false;
memory.resize(min_chunk_size);
while (!eofWithSavingBufferState(in, memory, used_size, begin_pos)
&& (balance || used_size + static_cast<size_t>(in.position() - begin_pos) < min_chunk_size))
{
skipWhitespaceIfAny(in);
char * begin_pos = in.position();
size_t balance = 0;
bool quotes = false;
memory.resize(min_chunk_size);
while (!eofWithSavingBufferState(in, memory, used_size, begin_pos)
&& (balance || used_size + static_cast<size_t>(in.position() - begin_pos) < min_chunk_size))
if (quotes)
{
if (quotes)
in.position() = find_first_symbols<'\\', '"'>(in.position(), in.buffer().end());
if (in.position() == in.buffer().end())
continue;
if (*in.position() == '\\')
{
in.position() = find_first_symbols<'\\', '"'>(in.position(), in.buffer().end());
if (in.position() == in.buffer().end())
continue;
if (*in.position() == '\\')
{
++in.position();
if (!eofWithSavingBufferState(in, memory, used_size, begin_pos))
++in.position();
if (!eofWithSavingBufferState(in, memory, used_size, begin_pos))
++in.position();
}
else if (*in.position() == '"')
{
++in.position();
quotes = false;
}
}
else
else if (*in.position() == '"')
{
in.position() = find_first_symbols<'{', '}', '\\', '"'>(in.position(), in.buffer().end());
if (in.position() == in.buffer().end())
continue;
if (*in.position() == '{')
{
++balance;
++in.position();
}
else if (*in.position() == '}')
{
--balance;
++in.position();
}
else if (*in.position() == '\\')
{
++in.position();
if (!eofWithSavingBufferState(in, memory, used_size, begin_pos))
++in.position();
}
else if (*in.position() == '"')
{
quotes = true;
++in.position();
}
++in.position();
quotes = false;
}
}
eofWithSavingBufferState(in, memory, used_size, begin_pos, true);
return true;
});
else
{
in.position() = find_first_symbols<'{', '}', '\\', '"'>(in.position(), in.buffer().end());
if (in.position() == in.buffer().end())
continue;
if (*in.position() == '{')
{
++balance;
++in.position();
}
else if (*in.position() == '}')
{
--balance;
++in.position();
}
else if (*in.position() == '\\')
{
++in.position();
if (!eofWithSavingBufferState(in, memory, used_size, begin_pos))
++in.position();
}
else if (*in.position() == '"')
{
quotes = true;
++in.position();
}
}
}
eofWithSavingBufferState(in, memory, used_size, begin_pos, true);
return true;
}
void registerFileSegmentationEngineJSONEachRow(FormatFactory & factory)
{
factory.registerFileSegmentationEngine("JSONEachRow", &fileSegmentationEngineJSONEachRowImpl);
}
}

View File

@ -210,38 +210,36 @@ void registerInputFormatProcessorTSKV(FormatFactory & factory)
});
}
bool fileSegmentationEngineTSKVImpl(ReadBuffer & in, DB::Memory<> & memory, size_t & used_size, size_t min_chunk_size)
{
char * begin_pos = in.position();
bool need_more_data = true;
memory.resize(min_chunk_size);
while (!eofWithSavingBufferState(in, memory, used_size, begin_pos) && need_more_data)
{
in.position() = find_first_symbols<'\\','\r', '\n'>(in.position(), in.buffer().end());
if (in.position() == in.buffer().end())
continue;
if (*in.position() == '\\')
{
++in.position();
if (!eofWithSavingBufferState(in, memory, used_size, begin_pos))
++in.position();
}
else if (*in.position() == '\n' || *in.position() == '\r')
{
if (used_size + static_cast<size_t>(in.position() - begin_pos) >= min_chunk_size)
need_more_data = false;
++in.position();
}
}
eofWithSavingBufferState(in, memory, used_size, begin_pos, true);
return true;
}
void registerFileSegmentationEngineTSKV(FormatFactory & factory)
{
factory.registerFileSegmentationEngine("TSKV", [](
ReadBuffer & in,
DB::Memory<> & memory,
size_t & used_size,
size_t min_chunk_size)
{
char * begin_pos = in.position();
bool need_more_data = true;
memory.resize(min_chunk_size);
while (!eofWithSavingBufferState(in, memory, used_size, begin_pos) && need_more_data)
{
in.position() = find_first_symbols<'\\','\r', '\n'>(in.position(), in.buffer().end());
if (in.position() == in.buffer().end())
continue;
if (*in.position() == '\\')
{
++in.position();
if (!eofWithSavingBufferState(in, memory, used_size, begin_pos))
++in.position();
}
else if (*in.position() == '\n' || *in.position() == '\r')
{
if (used_size + static_cast<size_t>(in.position() - begin_pos) >= min_chunk_size)
need_more_data = false;
++in.position();
}
}
eofWithSavingBufferState(in, memory, used_size, begin_pos, true);
return true;
});
factory.registerFileSegmentationEngine("TSKV", &fileSegmentationEngineTSKVImpl);
}

View File

@ -384,45 +384,43 @@ void registerInputFormatProcessorTabSeparated(FormatFactory & factory)
}
}
bool fileSegmentationEngineTabSeparatedImpl(ReadBuffer & in, DB::Memory<> & memory, size_t & used_size, size_t min_chunk_size)
{
if (in.eof())
return false;
char * begin_pos = in.position();
bool need_more_data = true;
memory.resize(min_chunk_size);
while (!eofWithSavingBufferState(in, memory, used_size, begin_pos) && need_more_data)
{
in.position() = find_first_symbols<'\\', '\r', '\n'>(in.position(), in.buffer().end());
if (in.position() == in.buffer().end())
{
continue;
}
if (*in.position() == '\\')
{
++in.position();
if (!eofWithSavingBufferState(in, memory, used_size, begin_pos, true))
++in.position();
} else if (*in.position() == '\n' || *in.position() == '\r')
{
if (used_size + static_cast<size_t>(in.position() - begin_pos) >= min_chunk_size)
need_more_data = false;
++in.position();
}
}
eofWithSavingBufferState(in, memory, used_size, begin_pos, true);
return true;
}
void registerFileSegmentationEngineTabSeparated(FormatFactory & factory)
{
for (auto name : {"TabSeparated", "TSV"})
{
factory.registerFileSegmentationEngine(name, [](
ReadBuffer & in,
DB::Memory<> & memory,
size_t & used_size,
size_t min_chunk_size)
{
if (in.eof())
return false;
char * begin_pos = in.position();
bool need_more_data = true;
memory.resize(min_chunk_size);
while (!eofWithSavingBufferState(in, memory, used_size, begin_pos) && need_more_data)
{
in.position() = find_first_symbols<'\\', '\r', '\n'>(in.position(), in.buffer().end());
if (in.position() == in.buffer().end())
{
continue;
}
if (*in.position() == '\\')
{
++in.position();
if (!eofWithSavingBufferState(in, memory, used_size, begin_pos, true))
++in.position();
} else if (*in.position() == '\n' || *in.position() == '\r')
{
if (used_size + static_cast<size_t>(in.position() - begin_pos) >= min_chunk_size)
need_more_data = false;
++in.position();
}
}
eofWithSavingBufferState(in, memory, used_size, begin_pos, true);
return true;
});
factory.registerFileSegmentationEngine(name, &fileSegmentationEngineTabSeparatedImpl);
}
}