This commit is contained in:
nikitamikhaylov 2020-10-06 17:02:01 +03:00
parent a1010d708f
commit 4ff1be6e25
12 changed files with 70 additions and 11 deletions

View File

@ -216,7 +216,7 @@ BlockOutputStreamPtr FormatFactory::getOutput(const String & name,
const Settings & settings = context.getSettingsRef();
bool parallel_formatting = settings.output_format_parallel_formatting;
if (parallel_formatting && name != "PrettyCompactMonoBlock")
if (parallel_formatting && getCreators(name).supports_parallel_formatting)
{
const auto & output_getter = getCreators(name).output_processor_creator;
@ -351,6 +351,16 @@ void FormatFactory::registerFileSegmentationEngine(const String & name, FileSegm
target = std::move(file_segmentation_engine);
}
void FormatFactory::markOutputFormatSupportsParallelFormatting(const String & name)
{
auto & target = dict[name].supports_parallel_formatting;
if (target)
throw Exception("FormatFactory: Output format " + name + " is already marked as supporting parallel formatting.", ErrorCodes::LOGICAL_ERROR);
target = true;
}
FormatFactory & FormatFactory::instance()
{
static FormatFactory ret;

View File

@ -100,6 +100,7 @@ private:
InputProcessorCreator input_processor_creator;
OutputProcessorCreator output_processor_creator;
FileSegmentationEngine file_segmentation_engine;
bool supports_parallel_formatting{false};
};
using FormatsDictionary = std::unordered_map<String, Creators>;
@ -140,6 +141,8 @@ public:
void registerInputFormatProcessor(const String & name, InputProcessorCreator input_creator);
void registerOutputFormatProcessor(const String & name, OutputProcessorCreator output_creator);
void markOutputFormatSupportsParallelFormatting(const String & name);
const FormatsDictionary & getAllFormats() const
{
return dict;

View File

@ -176,7 +176,7 @@ private:
{
const size_t prev_size = memory.size();
memory.resize(2 * prev_size + 1);
Base::set(memory.data(), memory.size(), prev_size + 1);
Base::set(memory.data() + prev_size, memory.size() - prev_size, 0);
}
};

View File

@ -82,6 +82,7 @@ void registerOutputFormatProcessorCSV(FormatFactory & factory)
{
return std::make_shared<CSVRowOutputFormat>(buf, sample, with_names, params, format_settings);
});
factory.markOutputFormatSupportsParallelFormatting(with_names ? "CSVWithNames" : "CSV");
}
}

View File

@ -108,6 +108,7 @@ void registerOutputFormatProcessorJSONCompactEachRow(FormatFactory & factory)
{
return std::make_shared<JSONCompactEachRowRowOutputFormat>(buf, sample, params, format_settings, false, false);
});
factory.markOutputFormatSupportsParallelFormatting("JSONCompactEachRow");
factory.registerOutputFormatProcessor("JSONCompactEachRowWithNamesAndTypes", [](
WriteBuffer &buf,
@ -117,6 +118,7 @@ void registerOutputFormatProcessorJSONCompactEachRow(FormatFactory & factory)
{
return std::make_shared<JSONCompactEachRowRowOutputFormat>(buf, sample, params, format_settings, true, false);
});
factory.markOutputFormatSupportsParallelFormatting("JSONCompactEachRowWithNamesAndTypes");
factory.registerOutputFormatProcessor("JSONCompactStringsEachRow", [](
WriteBuffer & buf,
@ -126,6 +128,7 @@ void registerOutputFormatProcessorJSONCompactEachRow(FormatFactory & factory)
{
return std::make_shared<JSONCompactEachRowRowOutputFormat>(buf, sample, params, format_settings, false, true);
});
factory.markOutputFormatSupportsParallelFormatting("JSONCompactStringsEachRow");
factory.registerOutputFormatProcessor("JSONCompactStringsEachRowWithNamesAndTypes", [](
WriteBuffer &buf,
@ -135,6 +138,7 @@ void registerOutputFormatProcessorJSONCompactEachRow(FormatFactory & factory)
{
return std::make_shared<JSONCompactEachRowRowOutputFormat>(buf, sample, params, format_settings, true, true);
});
factory.markOutputFormatSupportsParallelFormatting("JSONCompactStringsEachRowWithNamesAndTypes");
}

View File

@ -138,6 +138,7 @@ void registerOutputFormatProcessorJSONEachRow(FormatFactory & factory)
return std::make_shared<JSONEachRowRowOutputFormat>(buf, sample, params,
settings);
});
factory.markOutputFormatSupportsParallelFormatting("JSONEachRow");
factory.registerOutputFormatProcessor("JSONStringsEachRow", [](
WriteBuffer & buf,
@ -150,6 +151,7 @@ void registerOutputFormatProcessorJSONEachRow(FormatFactory & factory)
return std::make_shared<JSONEachRowRowOutputFormat>(buf, sample, params,
settings);
});
factory.markOutputFormatSupportsParallelFormatting("JSONStringEachRow");
}
}

View File

@ -6,6 +6,7 @@
#include <Formats/FormatFactory.h>
#include <deque>
#include <future>
#include <Common/setThreadName.h>
#include <atomic>
@ -82,7 +83,6 @@ protected:
}
private:
InternalFormatterCreator internal_formatter_creator;
enum ProcessingUnitStatus
@ -104,6 +104,7 @@ private:
void addChunk(Chunk chunk, ProcessingUnitType type)
{
// std::cout << "AddChunk of size " << chunk.getNumRows() << std::endl;
const auto current_unit_number = writer_unit_number % processing_units.size();
auto & unit = processing_units[current_unit_number];
@ -119,7 +120,6 @@ private:
unit.chunk = std::move(chunk);
/// Resize memory without deallocate
unit.segment.resize(0);
unit.status = READY_TO_FORMAT;
unit.type = type;
@ -143,6 +143,8 @@ private:
};
std::promise<bool> finalizator{};
std::atomic_bool need_flush{false};
// There are multiple "formatters", that's why we use thread pool.
@ -165,6 +167,9 @@ private:
void finishAndWait()
{
std::future<bool> future_finalizator = finalizator.get_future();
future_finalizator.get();
formatting_finished = true;
{
@ -230,6 +235,8 @@ private:
{
const auto current_unit_number = collector_unit_number % processing_units.size();
// std::cout << "collecting " << current_unit_number << std::endl;
auto & unit = processing_units[current_unit_number];
{
@ -238,13 +245,11 @@ private:
[&]{ return unit.status == READY_TO_READ; });
}
if (unit.type == ProcessingUnitType::TOTALS) {
}
assert(unit.status == READY_TO_READ);
assert(unit.segment.size() > 0);
auto copy_if_unit_type = unit.type;
/// Do main work here.
out.write(unit.segment.data(), unit.actual_memory_size);
@ -259,9 +264,13 @@ private:
writer_condvar.notify_all();
}
if (unit.type == ProcessingUnitType::FINALIZE)
if (copy_if_unit_type == ProcessingUnitType::FINALIZE)
{
finalizator.set_value(true);
break;
}
}
}
catch (...)
{
@ -280,9 +289,8 @@ private:
assert(unit.status = READY_TO_FORMAT);
unit.segment.resize(DBMS_DEFAULT_BUFFER_SIZE);
unit.segment.resize(1);
/// TODO: Implement proper nextImpl
BufferWithOutsideMemory<WriteBuffer> out_buffer(unit.segment);
auto formatter = internal_formatter_creator(out_buffer);

View File

@ -49,6 +49,7 @@ void registerOutputFormatProcessorTSKV(FormatFactory & factory)
{
return std::make_shared<TSKVRowOutputFormat>(buf, sample, params, settings);
});
factory.markOutputFormatSupportsParallelFormatting("TSKV");
}
}

View File

@ -85,6 +85,7 @@ void registerOutputFormatProcessorTabSeparated(FormatFactory & factory)
{
return std::make_shared<TabSeparatedRowOutputFormat>(buf, sample, false, false, params, settings);
});
factory.markOutputFormatSupportsParallelFormatting(name);
}
for (const auto * name : {"TabSeparatedRaw", "TSVRaw"})
@ -97,6 +98,7 @@ void registerOutputFormatProcessorTabSeparated(FormatFactory & factory)
{
return std::make_shared<TabSeparatedRawRowOutputFormat>(buf, sample, false, false, params, settings);
});
factory.markOutputFormatSupportsParallelFormatting(name);
}
for (const auto * name : {"TabSeparatedWithNames", "TSVWithNames"})
@ -109,6 +111,7 @@ void registerOutputFormatProcessorTabSeparated(FormatFactory & factory)
{
return std::make_shared<TabSeparatedRowOutputFormat>(buf, sample, true, false, params, settings);
});
factory.markOutputFormatSupportsParallelFormatting(name);
}
for (const auto * name : {"TabSeparatedWithNamesAndTypes", "TSVWithNamesAndTypes"})
@ -121,6 +124,7 @@ void registerOutputFormatProcessorTabSeparated(FormatFactory & factory)
{
return std::make_shared<TabSeparatedRowOutputFormat>(buf, sample, true, true, params, settings);
});
factory.markOutputFormatSupportsParallelFormatting(name);
}
}

View File

@ -51,6 +51,7 @@ void registerOutputFormatProcessorValues(FormatFactory & factory)
{
return std::make_shared<ValuesRowOutputFormat>(buf, sample, params, settings);
});
factory.markOutputFormatSupportsParallelFormatting("Values");
}
}

View File

@ -0,0 +1,6 @@
10000000
10000000
20000000
20000000
30000000
30000000

View File

@ -0,0 +1,19 @@
drop table if exists tsv;
create table tsv(a int, b int default 7) engine File(TSV);
insert into tsv(a) select number from numbers(10000000);
select '10000000';
select count() from tsv;
insert into tsv(a) select number from numbers(10000000);
select '20000000';
select count() from tsv;
insert into tsv(a) select number from numbers(10000000);
select '30000000';
select count() from tsv;
drop table tsv;