Unification of serde of data types: development [#CLICKHOUSE-2838].

Alexey Milovidov 2017-11-21 05:23:41 +03:00
parent 0d8d16b2fc
commit 63f79b7b4f
5 changed files with 41 additions and 24 deletions

contrib/poco (vendored)

@@ -1 +1 @@
-Subproject commit bcf9ebad48b2162d25f5fc432b176d74a09f498d
+Subproject commit 4746a846952808f220595602f67831516822dc13

dbms/src/Storages/MergeTree/MergeTreeBaseBlockInputStream.cpp

@@ -108,12 +108,12 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
             rows_to_read = std::min(rows_to_read, rows_to_read_for_max_size_column_with_filtration);
         }
 
-        size_t unread_rows_in_current_granule = reader.unreadRowsInCurrentGranule();
+        size_t unread_rows_in_current_granule = reader.numPendingRowsInCurrentGranule();
         if (unread_rows_in_current_granule >= rows_to_read)
             return rows_to_read;
 
-        size_t granule_to_read = (rows_to_read + reader.readRowsInCurrentGranule() + index_granularity / 2) / index_granularity;
-        return index_granularity * granule_to_read - reader.readRowsInCurrentGranule();
+        size_t granule_to_read = (rows_to_read + reader.numReadRowsInCurrentGranule() + index_granularity / 2) / index_granularity;
+        return index_granularity * granule_to_read - reader.numReadRowsInCurrentGranule();
     };
 
     // read rows from reader and clear columns
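Note: the rounding in the hunk above snaps a read request to the nearest mark boundary. A minimal standalone sketch of that arithmetic; all values (granularity 8192, 100 rows already read in the granule, a 5000-row request) are assumptions for illustration, not taken from the commit:

    #include <cstddef>
    #include <iostream>

    int main()
    {
        const size_t index_granularity = 8192;    // rows per mark (assumed)
        const size_t rows_read_in_granule = 100;  // stands in for numReadRowsInCurrentGranule()
        const size_t rows_to_read = 5000;         // caller's request (assumed)

        // Same formula as in readFromPart(): round to the nearest granule boundary...
        const size_t granule_to_read = (rows_to_read + rows_read_in_granule + index_granularity / 2) / index_granularity;
        // ...then convert back to a row count relative to the current position.
        const size_t adjusted_rows = index_granularity * granule_to_read - rows_read_in_granule;

        std::cout << granule_to_read << ' ' << adjusted_rows << '\n';  // prints: 1 8092
    }

With these numbers the request lands past the midpoint of the current granule, so the read is extended to the mark boundary (8092 rows) instead of stopping mid-granule.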
@@ -196,7 +196,7 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
             if (!pre_range_reader)
                 processNextRange(*task, *pre_reader);
 
-            size_t rows_to_read = std::min(pre_range_reader->unreadRows(), space_left);
+            size_t rows_to_read = std::min(pre_range_reader->numPendingRows(), space_left);
             size_t read_rows = pre_range_reader->read(res, rows_to_read);
             rows_was_read_in_last_range += read_rows;
             if (pre_range_reader->isReadingFinished())
@@ -263,7 +263,7 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
                 if (task->number_of_rows_to_skip)
                     skipRows(res, *task->current_range_reader, *task, task->number_of_rows_to_skip);
                 size_t rows_to_read = ranges_to_read.empty()
-                    ? rows_was_read_in_last_range : task->current_range_reader->unreadRows();
+                    ? rows_was_read_in_last_range : task->current_range_reader->numPendingRows();
                 task->current_range_reader->read(res, rows_to_read);
             }
@@ -272,7 +272,7 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
                 const auto & range = ranges_to_read[range_idx];
                 task->current_range_reader = reader->readRange(range.begin, range.end);
                 size_t rows_to_read = range_idx + 1 == ranges_to_read.size()
-                    ? rows_was_read_in_last_range : task->current_range_reader->unreadRows();
+                    ? rows_was_read_in_last_range : task->current_range_reader->numPendingRows();
                 task->current_range_reader->read(res, rows_to_read);
             }
@@ -310,7 +310,7 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
             /// Now we need to read the same number of rows as in prewhere.
             size_t rows_to_read = next_range_idx == ranges_to_read.size()
-                ? rows_was_read_in_last_range : (task->current_range_reader->unreadRows() - number_of_rows_to_skip);
+                ? rows_was_read_in_last_range : (task->current_range_reader->numPendingRows() - number_of_rows_to_skip);
 
             auto readRows = [&]()
             {
@@ -338,7 +338,7 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
                 {
                     auto rows_should_be_copied = pre_filter_pos - pre_filter_begin_pos;
                     auto range_reader_with_skipped_rows = range_reader.getFutureState(number_of_rows_to_skip + rows_should_be_copied);
-                    auto unread_rows_in_current_granule = range_reader_with_skipped_rows.unreadRowsInCurrentGranule();
+                    auto unread_rows_in_current_granule = range_reader_with_skipped_rows.numPendingRowsInCurrentGranule();
 
                     const size_t limit = std::min(pre_filter.size(), pre_filter_pos + unread_rows_in_current_granule);
                     bool will_read_until_mark = unread_rows_in_current_granule == limit - pre_filter_pos;
@@ -464,6 +464,8 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
     }
 
     space_left -= rows_was_read;
+    std::cerr << "rows_was_read: " << rows_was_read << ", space_left: " << space_left << "\n";
 }
 
 /// In the case of isCancelled.

dbms/src/Storages/MergeTree/MergeTreeRangeReader.cpp

@@ -6,13 +6,13 @@ namespace DB
 MergeTreeRangeReader::MergeTreeRangeReader(
     MergeTreeReader & merge_tree_reader, size_t from_mark, size_t to_mark, size_t index_granularity)
     : merge_tree_reader(merge_tree_reader), current_mark(from_mark), last_mark(to_mark)
-    , read_rows_after_current_mark(0), index_granularity(index_granularity), continue_reading(false), is_reading_finished(false)
+    , index_granularity(index_granularity)
 {
 }
 
 size_t MergeTreeRangeReader::skipToNextMark()
 {
-    auto unread_rows_in_current_part = unreadRowsInCurrentGranule();
+    auto unread_rows_in_current_part = numPendingRowsInCurrentGranule();
     continue_reading = false;
     ++current_mark;
     if (current_mark == last_mark)
@@ -33,13 +33,19 @@ MergeTreeRangeReader MergeTreeRangeReader::getFutureState(size_t rows_to_read) c
 size_t MergeTreeRangeReader::read(Block & res, size_t max_rows_to_read)
 {
-    size_t rows_to_read = unreadRows();
+    size_t rows_to_read = numPendingRows();
+    std::cerr << "rows_to_read: " << rows_to_read << "\n";
     rows_to_read = std::min(rows_to_read, max_rows_to_read);
     if (rows_to_read == 0)
+    {
+        throw Exception("rows_to_read: 0;");
         return 0;
+    }
 
     auto read_rows = merge_tree_reader.get().readRows(current_mark, continue_reading, rows_to_read, res);
+    std::cerr << "read_rows: " << read_rows << "\n";
 
     if (read_rows && read_rows < rows_to_read)
         is_reading_finished = true;
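Note: read() above clamps the request to what the range still holds and treats a non-empty short read as end-of-data. A self-contained sketch of that control flow; readClamped and the lambda source are hypothetical stand-ins for illustration, not ClickHouse API:

    #include <algorithm>
    #include <cstddef>
    #include <functional>
    #include <iostream>

    size_t readClamped(size_t pending_rows, size_t max_rows_to_read,
                       const std::function<size_t(size_t)> & read_some, bool & finished)
    {
        // Clamp the request to the rows still pending in the range.
        size_t rows_to_read = std::min(pending_rows, max_rows_to_read);
        if (rows_to_read == 0)
            return 0;

        size_t read_rows = read_some(rows_to_read);
        // A non-empty short read signals that the underlying part is exhausted.
        if (read_rows && read_rows < rows_to_read)
            finished = true;
        return read_rows;
    }

    int main()
    {
        bool finished = false;
        const size_t rows_left_on_disk = 120;  // assumed: only 120 rows remain
        size_t got = readClamped(8192, 1000,
            [&](size_t n) { return std::min(n, rows_left_on_disk); }, finished);
        std::cout << got << " " << finished << "\n";  // prints: 120 1
    }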

dbms/src/Storages/MergeTree/MergeTreeRangeReader.h

@@ -13,10 +13,10 @@ class MergeTreeReader;
 class MergeTreeRangeReader
 {
 public:
-    size_t unreadRows() const { return (last_mark - current_mark) * index_granularity - read_rows_after_current_mark; }
-    size_t unreadRowsInCurrentGranule() const { return index_granularity - read_rows_after_current_mark; }
+    size_t numPendingRows() const { return (last_mark - current_mark) * index_granularity - read_rows_after_current_mark; }
+    size_t numPendingRowsInCurrentGranule() const { return index_granularity - read_rows_after_current_mark; }
 
-    size_t readRowsInCurrentGranule() const { return read_rows_after_current_mark; }
+    size_t numReadRowsInCurrentGranule() const { return read_rows_after_current_mark; }
 
     /// Seek to next mark before next reading.
     size_t skipToNextMark();
@@ -41,10 +41,10 @@ private:
     std::reference_wrapper<MergeTreeReader> merge_tree_reader;
     size_t current_mark;
     size_t last_mark;
-    size_t read_rows_after_current_mark;
+    size_t read_rows_after_current_mark = 0;
     size_t index_granularity;
-    bool continue_reading;
-    bool is_reading_finished;
+    bool continue_reading = false;
+    bool is_reading_finished = false;
 
     friend class MergeTreeReader;
 };
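Note: the renamed accessors are pure mark arithmetic over the fields above. A standalone sketch mirroring what numPendingRows() and numPendingRowsInCurrentGranule() return for a reader positioned 100 rows past mark 3 of 10 at granularity 8192 (all values assumed for illustration):

    #include <cstddef>
    #include <iostream>

    int main()
    {
        const size_t index_granularity = 8192;
        const size_t current_mark = 3;
        const size_t last_mark = 10;
        const size_t read_rows_after_current_mark = 100;

        // Mirrors numPendingRows(): all rows up to last_mark, minus those already read.
        const size_t pending = (last_mark - current_mark) * index_granularity - read_rows_after_current_mark;
        // Mirrors numPendingRowsInCurrentGranule(): the remainder of the current granule.
        const size_t pending_in_granule = index_granularity - read_rows_after_current_mark;

        std::cout << pending << ' ' << pending_in_granule << '\n';  // prints: 57244 8092
    }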

dbms/src/Storages/MergeTree/MergeTreeReader.cpp

@@ -136,8 +136,13 @@ size_t MergeTreeReader::readRows(size_t from_mark, bool continue_reading, size_t
             try
             {
                 size_t column_size_before_reading = column.column->size();
                 readData(column.name, *column.type, *column.column, from_mark, continue_reading, max_rows_to_read, read_offsets);
-                read_rows = std::max(read_rows, column.column->size() - column_size_before_reading);
+
+                /// For elements of Nested, column_size_before_reading may be greater than column size
+                /// if offsets are not empty and were already read, but elements are empty.
+                if (column.column->size())
+                    read_rows = std::max(read_rows, column.column->size() - column_size_before_reading);
             }
             catch (Exception & e)
             {
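Note: the guard added above matters because the sizes involved are size_t. In the Nested case the new comment describes, the element column can be empty while column_size_before_reading is positive, so the unsigned subtraction would wrap to a huge value before std::max sees it. A minimal illustration of that wraparound (values assumed):

    #include <cstddef>
    #include <iostream>

    int main()
    {
        // Assumed sizes mimicking the Nested case: offsets already read,
        // element column left empty by the read.
        const size_t column_size_before_reading = 5;
        const size_t column_size_now = 0;

        // size_t subtraction wraps instead of going negative.
        const size_t diff = column_size_now - column_size_before_reading;
        std::cout << diff << '\n';  // 18446744073709551611 with a 64-bit size_t

        // The guard in the hunk above skips the update for an empty column,
        // so the wrapped value never reaches std::max(read_rows, ...).
        if (column_size_now)
            std::cout << "would update read_rows\n";
        else
            std::cout << "skipped\n";  // this branch runs
    }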
@@ -147,9 +152,16 @@ size_t MergeTreeReader::readRows(size_t from_mark, bool continue_reading, size_t
             }
             if (!append && column.column->size())
             {
+                std::cerr << "Inserting " << column.name << "\n";
                 res.insert(std::move(column));
             }
+            else
+                std::cerr << "Not inserting " << column.name << "\n";
         }
+
+        std::cerr << res.dumpStructure() << "\n";
+
         /// NOTE: positions for all streams must be kept in sync. In particular, even if for some streams there are no rows to be read,
         /// you must ensure that no seeks are skipped and at this point they all point to to_mark.
     }
@@ -364,16 +376,13 @@ void MergeTreeReader::addStreams(const String & name, const IDataType & type, co
             return;
 
         bool data_file_exists = Poco::File(path + stream_name + DATA_FILE_EXTENSION).exists();
-        bool is_sizes_of_nested_type = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes
-            && DataTypeNested::extractNestedTableName(name) != name;
 
-        std::cerr << "File exists: " << data_file_exists << ", is_sizes_of_nested_type: " << is_sizes_of_nested_type << "\n";
+        std::cerr << "File exists: " << data_file_exists << "\n";
 
         /** If data file is missing then we will not try to open it.
          * It is necessary since it allows to add new column to structure of the table without creating new files for old parts.
-         * But we should try to load offset data for array columns of Nested subtable (their data will be filled by default value).
         */
-        if (!data_file_exists && !is_sizes_of_nested_type)
+        if (!data_file_exists)
            return;
 
        streams.emplace(stream_name, std::make_unique<Stream>(
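Note: with the simplified condition above, a missing data file now always skips stream creation, with no carve-out for Nested offset streams. A minimal sketch of the existence check itself; the part path, stream name, and the ".bin" value of DATA_FILE_EXTENSION here are assumptions for illustration:

    #include <iostream>
    #include <string>
    #include <Poco/File.h>

    int main()
    {
        const std::string path = "./part/";             // assumed part directory
        const std::string stream_name = "col";          // assumed stream name
        const std::string DATA_FILE_EXTENSION = ".bin"; // assumed on-disk extension

        // Same check as addStreams(): probe the data file on disk.
        bool data_file_exists = Poco::File(path + stream_name + DATA_FILE_EXTENSION).exists();
        if (!data_file_exists)
        {
            std::cout << "stream skipped\n";  // mirrors the early return above
            return 0;
        }
        std::cout << "stream opened\n";
    }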