Unification of serde of data types: development [#CLICKHOUSE-2838].

This commit is contained in:
Alexey Milovidov 2017-11-28 05:13:46 +03:00
parent 71378d329c
commit 36d4f89c7a
2 changed files with 22 additions and 11 deletions

View File

@ -272,13 +272,18 @@ void LogBlockInputStream::readData(const String & name, const IDataType & type,
String stream_name = IDataType::getFileNameForStream(name, path); String stream_name = IDataType::getFileNameForStream(name, path);
const auto & file_it = storage.files.find(stream_name);
if (storage.files.end() == file_it)
throw Exception("Logical error: no information about file " + stream_name + " in StorageLog", ErrorCodes::LOGICAL_ERROR);
std::cerr << "Stream: " << stream_name << "\n"; std::cerr << "Stream: " << stream_name << "\n";
std::cerr << "Offset: " << storage.files[stream_name].marks[mark_number].offset << "\n"; std::cerr << "Mark number: " << mark_number << "\n";
std::cerr << "Offset: " << file_it->second.marks[mark_number].offset << "\n";
auto it = streams.try_emplace(stream_name, auto it = streams.try_emplace(stream_name,
storage.files[stream_name].data_file.path(), file_it->second.data_file.path(),
mark_number mark_number
? storage.files[stream_name].marks[mark_number].offset ? file_it->second.marks[mark_number].offset
: 0, : 0,
max_read_buffer_size).first; max_read_buffer_size).first;
@ -339,13 +344,15 @@ void LogBlockOutputStream::writeData(const String & name, const IDataType & type
type.enumerateStreams([&] (const IDataType::SubstreamPath & path) type.enumerateStreams([&] (const IDataType::SubstreamPath & path)
{ {
String stream_name = IDataType::getFileNameForStream(name, path); String stream_name = IDataType::getFileNameForStream(name, path);
if (written_streams.count(stream_name))
return;
const auto & file = storage.files[stream_name]; const auto & file = storage.files[stream_name];
const auto stream_it = streams.find(stream_name); const auto stream_it = streams.try_emplace(stream_name, storage.files[stream_name].data_file.path(), storage.max_compress_block_size).first;
Mark mark; Mark mark;
mark.rows = (file.marks.empty() ? 0 : file.marks.back().rows) + column.size(); mark.rows = (file.marks.empty() ? 0 : file.marks.back().rows) + column.size();
mark.offset = stream_it != streams.end() ? stream_it->second.plain_offset + stream_it->second.plain.count() : 0; mark.offset = stream_it->second.plain_offset + stream_it->second.plain.count();
out_marks.emplace_back(file.column_index, mark); out_marks.emplace_back(file.column_index, mark);
}, {}); }, {});
@ -353,12 +360,13 @@ void LogBlockOutputStream::writeData(const String & name, const IDataType & type
IDataType::OutputStreamGetter stream_getter = [&] (const IDataType::SubstreamPath & path) -> WriteBuffer * IDataType::OutputStreamGetter stream_getter = [&] (const IDataType::SubstreamPath & path) -> WriteBuffer *
{ {
String stream_name = IDataType::getFileNameForStream(name, path); String stream_name = IDataType::getFileNameForStream(name, path);
if (written_streams.count(stream_name))
auto it_inserted = streams.try_emplace(stream_name, storage.files[stream_name].data_file.path(), storage.max_compress_block_size);
if (!it_inserted.second)
return nullptr; return nullptr;
return &it_inserted.first->second.compressed; auto it = streams.find(stream_name);
if (streams.end() == it)
throw Exception("Logical error: stream was not created when writing data in LogBlockOutputStream", ErrorCodes::LOGICAL_ERROR);
return &it->second.compressed;
}; };
type.serializeBinaryBulkWithMultipleStreams(column, stream_getter, 0, 0, true, {}); type.serializeBinaryBulkWithMultipleStreams(column, stream_getter, 0, 0, true, {});
@ -366,6 +374,9 @@ void LogBlockOutputStream::writeData(const String & name, const IDataType & type
type.enumerateStreams([&] (const IDataType::SubstreamPath & path) type.enumerateStreams([&] (const IDataType::SubstreamPath & path)
{ {
String stream_name = IDataType::getFileNameForStream(name, path); String stream_name = IDataType::getFileNameForStream(name, path);
if (!written_streams.emplace(stream_name).second)
return;
auto it = streams.find(stream_name); auto it = streams.find(stream_name);
if (streams.end() == it) if (streams.end() == it)
throw Exception("Logical error: stream was not created when writing data in LogBlockOutputStream", ErrorCodes::LOGICAL_ERROR); throw Exception("Logical error: stream was not created when writing data in LogBlockOutputStream", ErrorCodes::LOGICAL_ERROR);

View File

@ -68,7 +68,7 @@ private:
*/ */
struct Mark struct Mark
{ {
size_t rows; /// How many rows are before this offset. size_t rows; /// How many rows are before this offset including the block at this offset.
size_t offset; /// The offset in compressed file. size_t offset; /// The offset in compressed file.
}; };
@ -95,7 +95,7 @@ private:
/// The order of adding files should not change: it corresponds to the order of the columns in the marks file. /// The order of adding files should not change: it corresponds to the order of the columns in the marks file.
void addFiles(const String & column_name, const IDataType & type); void addFiles(const String & column_name, const IDataType & type);
bool loaded_marks; bool loaded_marks = false;
size_t max_compress_block_size; size_t max_compress_block_size;
size_t file_count = 0; size_t file_count = 0;