minor changes

Anton Popov 2021-01-13 02:20:32 +03:00
parent 91dc347ff3
commit d7200ee2ed
12 changed files with 19 additions and 38 deletions


@@ -584,9 +584,8 @@ void DataTypeLowCardinality::deserializeBinaryBulkWithMultipleStreamsImpl(
     size_t limit,
     DeserializeBinaryBulkSettings & settings,
     DeserializeBinaryBulkStatePtr & state,
-    SubstreamsCache * cache) const
+    SubstreamsCache * /* cache */) const
 {
-    UNUSED(cache);
 
     ColumnLowCardinality & low_cardinality_column = typeid_cast<ColumnLowCardinality &>(column);
     settings.path.push_back(Substream::DictionaryKeys);
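
The hunk above replaces the UNUSED(cache) macro call with an unnamed parameter, the usual C++ way to keep a parameter in the signature while silencing unused-parameter warnings. A minimal sketch of the idiom, with an invented function rather than the real ClickHouse one:

#include <cstdio>

// The second parameter stays in the signature for interface compatibility;
// commenting out its name suppresses -Wunused-parameter without a macro.
static void deserialize(int limit, void * /* cache */)
{
    std::printf("deserializing up to %d rows\n", limit);
}

int main()
{
    deserialize(8192, nullptr);
    return 0;
}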


@@ -541,12 +541,12 @@ ColumnsDescription ColumnsDescription::parse(const String & str)
     return result;
 }
 
-void ColumnsDescription::addSubcolumns(const String & storage_name, const DataTypePtr & storage_type)
+void ColumnsDescription::addSubcolumns(const String & name_in_storage, const DataTypePtr & type_in_storage)
 {
-    for (const auto & subcolumn_name : storage_type->getSubcolumnNames())
+    for (const auto & subcolumn_name : type_in_storage->getSubcolumnNames())
     {
-        auto subcolumn = NameAndTypePair(storage_name, subcolumn_name,
-            storage_type, storage_type->getSubcolumnType(subcolumn_name));
+        auto subcolumn = NameAndTypePair(name_in_storage, subcolumn_name,
+            type_in_storage, type_in_storage->getSubcolumnType(subcolumn_name));
 
         if (has(subcolumn.name))
             throw Exception(ErrorCodes::ILLEGAL_COLUMN,
@@ -556,10 +556,10 @@ void ColumnsDescription::addSubcolumns(const String & storage_name, const DataTy
     }
 }
 
-void ColumnsDescription::removeSubcolumns(const String & storage_name, const DataTypePtr & storage_type)
+void ColumnsDescription::removeSubcolumns(const String & name_in_storage, const DataTypePtr & type_in_storage)
 {
-    for (const auto & subcolumn_name : storage_type->getSubcolumnNames())
-        subcolumns.erase(storage_name + "." + subcolumn_name);
+    for (const auto & subcolumn_name : type_in_storage->getSubcolumnNames())
+        subcolumns.erase(name_in_storage + "." + subcolumn_name);
 }
 
 Block validateColumnsDefaultsAndGetSampleBlock(ASTPtr default_expr_list, const NamesAndTypesList & all_columns, const Context & context)
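
The renames above (storage_name to name_in_storage, storage_type to type_in_storage) spell out that subcolumns are registered under compound keys of the form <name_in_storage>.<subcolumn_name>, which removeSubcolumns() then erases. A hedged sketch of that naming scheme; the column "point" and its subcolumns are invented for illustration:

#include <iostream>
#include <string>
#include <vector>

int main()
{
    // A column "point" whose type exposes subcolumns "x" and "y",
    // as a Tuple-like type might.
    std::string name_in_storage = "point";
    std::vector<std::string> subcolumn_names = {"x", "y"};

    // These are the compound keys that removeSubcolumns() erases above.
    for (const auto & subcolumn_name : subcolumn_names)
        std::cout << name_in_storage + "." + subcolumn_name << '\n'; // point.x, point.y
    return 0;
}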


@@ -149,8 +149,8 @@ private:
     SubcolumnsContainer subcolumns;
 
     void modifyColumnOrder(const String & column_name, const String & after_column, bool first);
-    void addSubcolumns(const String & storage_name, const DataTypePtr & storage_type);
-    void removeSubcolumns(const String & storage_name, const DataTypePtr & storage_type);
+    void addSubcolumns(const String & name_in_storage, const DataTypePtr & type_in_storage);
+    void removeSubcolumns(const String & name_in_storage, const DataTypePtr & type_in_storage);
 };
 
 /// Validate default expressions and corresponding types compatibility, i.e.


@@ -193,11 +193,6 @@ std::optional<size_t> IMergeTreeDataPart::getColumnPosition(const String & colum
     return it->second;
 }
 
-std::optional<size_t> IMergeTreeDataPart::getColumnPosition(const NameAndTypePair & column) const
-{
-    return getColumnPosition(column.name);
-}
-
 DayNum IMergeTreeDataPart::getMinDate() const
 {
     if (storage.minmax_idx_date_column_pos != -1 && minmax_idx.initialized)


@@ -142,7 +142,6 @@ public:
     /// take place, you must take original name of column for this part from
     /// storage and pass it to this method.
     std::optional<size_t> getColumnPosition(const String & column_name) const;
-    std::optional<size_t> getColumnPosition(const NameAndTypePair & column) const;
 
     /// Returns the name of a column with minimum compressed size (as returned by getColumnSize()).
     /// If no checksums are present returns the name of the first physically existing column.


@@ -285,7 +285,7 @@ IMergeTreeReader::ColumnPosition IMergeTreeReader::findColumnForOffsets(const St
     {
         if (typeid_cast<const DataTypeArray *>(part_column.type.get()))
         {
-            auto position = data_part->getColumnPosition(part_column);
+            auto position = data_part->getColumnPosition(part_column.name);
             if (position && Nested::extractTableName(part_column.name) == table_name)
                 return position;
         }
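
With the NameAndTypePair overload of getColumnPosition() gone, call sites such as findColumnForOffsets() above pass the plain column name. A self-contained sketch of the lookup pattern; the map-based stand-in is an assumption, not the real IMergeTreeDataPart internals:

#include <iostream>
#include <optional>
#include <string>
#include <unordered_map>

// Stand-in for the part's name -> position index that the real
// getColumnPosition() consults.
static std::optional<size_t> get_column_position(
    const std::unordered_map<std::string, size_t> & positions,
    const std::string & column_name)
{
    auto it = positions.find(column_name);
    if (it == positions.end())
        return std::nullopt;
    return it->second;
}

int main()
{
    std::unordered_map<std::string, size_t> positions{{"id", 0}, {"value", 1}};

    // A caller holding a (name, type) pair now passes pair.name explicitly.
    if (auto pos = get_column_position(positions, "value"))
        std::cout << "position: " << *pos << '\n'; // prints: position: 1
    return 0;
}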


@@ -124,7 +124,7 @@ void MergeTreeDataPartCompact::loadIndexGranularity()
 
 bool MergeTreeDataPartCompact::hasColumnFiles(const NameAndTypePair & column) const
 {
-    if (!getColumnPosition(column))
+    if (!getColumnPosition(column.name))
         return false;
 
     auto bin_checksum = checksums.files.find(DATA_FILE_NAME_WITH_EXTENSION);


@@ -42,7 +42,7 @@ public:
         const MergeTreeIndexGranularity & computed_index_granularity) const override;
 
     bool isStoredOnDisk() const override { return false; }
-    bool hasColumnFiles(const NameAndTypePair & column) const override { return !!getColumnPosition(column); }
+    bool hasColumnFiles(const NameAndTypePair & column) const override { return !!getColumnPosition(column.name); }
     String getFileNameForColumn(const NameAndTypePair & /* column */) const override { return ""; }
     void renameTo(const String & new_relative_path, bool remove_new_dir_if_exists) const override;
     void makeCloneInDetached(const String & prefix, const StorageMetadataPtr & metadata_snapshot) const override;


@@ -55,7 +55,7 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
     {
         auto column_from_part = getColumnFromPart(*name_and_type);
 
-        auto position = data_part->getColumnPosition(column_from_part);
+        auto position = data_part->getColumnPosition(column_from_part.name);
         if (!position && typeid_cast<const DataTypeArray *>(column_from_part.type.get()))
         {
             /// If array of Nested column is missing in part,
@@ -140,7 +140,6 @@ size_t MergeTreeReaderCompact::readRows(size_t from_mark, bool continue_reading,
     while (read_rows < max_rows_to_read)
     {
         size_t rows_to_read = data_part->index_granularity.getMarkRows(from_mark);
-        std::unordered_map<String, IDataType::SubstreamsCache> caches;
 
         auto name_and_type = columns.begin();
         for (size_t pos = 0; pos < num_columns; ++pos, ++name_and_type)
@@ -154,8 +153,7 @@ size_t MergeTreeReaderCompact::readRows(size_t from_mark, bool continue_reading,
             auto & column = res_columns[pos];
             size_t column_size_before_reading = column->size();
 
-            readData(column_from_part, column, from_mark, *column_positions[pos],
-                rows_to_read, read_only_offsets[pos], caches[column_from_part.getNameInStorage()]);
+            readData(column_from_part, column, from_mark, *column_positions[pos], rows_to_read, read_only_offsets[pos]);
 
             size_t read_rows_in_column = column->size() - column_size_before_reading;
             if (read_rows_in_column < rows_to_read)
@@ -189,8 +187,7 @@ size_t MergeTreeReaderCompact::readRows(size_t from_mark, bool continue_reading,
 void MergeTreeReaderCompact::readData(
     const NameAndTypePair & name_and_type, ColumnPtr & column,
-    size_t from_mark, size_t column_position, size_t rows_to_read,
-    bool only_offsets, IDataType::SubstreamsCache & cache)
+    size_t from_mark, size_t column_position, size_t rows_to_read, bool only_offsets)
 {
     const auto & [name, type] = name_and_type;
@@ -199,9 +196,6 @@ void MergeTreeReaderCompact::readData(
     auto buffer_getter = [&](const IDataType::SubstreamPath & substream_path) -> ReadBuffer *
     {
-        // if (cache.count(IDataType::getSubcolumnNameForStream(substream_path)))
-        //     return nullptr;
-
         if (only_offsets && (substream_path.size() != 1 || substream_path[0].type != IDataType::Substream::ArraySizes))
             return nullptr;
@@ -228,8 +222,6 @@ void MergeTreeReaderCompact::readData(
         type->deserializeBinaryBulkWithMultipleStreams(column, rows_to_read, deserialize_settings, state);
     }
 
-    UNUSED(cache);
-
     /// The buffer is left in inconsistent state after reading single offsets
     if (only_offsets)
         last_read_granule.reset();
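
With the cache parameter removed, the buffer_getter lambda above keeps a single filter: when only_offsets is set, every substream except the lone ArraySizes component is skipped by returning nullptr. A simplified model of that check; the enum and string buffer are stand-ins for the real IDataType substream machinery:

#include <iostream>
#include <string>
#include <vector>

enum class SubstreamType { ArraySizes, ArrayElements, Regular };
using SubstreamPath = std::vector<SubstreamType>;

// Returns the buffer for a substream, or nullptr to skip it, mirroring the
// only_offsets condition in buffer_getter above.
static const std::string * get_buffer(
    bool only_offsets, const SubstreamPath & path, const std::string & buffer)
{
    if (only_offsets && (path.size() != 1 || path[0] != SubstreamType::ArraySizes))
        return nullptr;
    return &buffer;
}

int main()
{
    std::string data = "compressed stream contents";
    std::cout << (get_buffer(true, {SubstreamType::ArrayElements}, data) ? "read" : "skip") << '\n'; // skip
    std::cout << (get_buffer(true, {SubstreamType::ArraySizes}, data) ? "read" : "skip") << '\n';    // read
    return 0;
}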


@@ -57,7 +57,7 @@ private:
     void seekToMark(size_t row_index, size_t column_index);
 
     void readData(const NameAndTypePair & name_and_type, ColumnPtr & column, size_t from_mark,
-        size_t column_position, size_t rows_to_read, bool only_offsets, IDataType::SubstreamsCache & cache);
+        size_t column_position, size_t rows_to_read, bool only_offsets);
 
     /// Returns maximal value of granule size in compressed file from @mark_ranges.
     /// This value is used as size of read buffer.


@@ -174,7 +174,7 @@ void MergeTreeReaderWide::readData(
     {
         return [&, stream_for_prefix](const IDataType::SubstreamPath & substream_path) -> ReadBuffer *
         {
-            /// If offsets for arrays have already been read. TODO
+            /// If substream have already been read.
            if (cache.count(IDataType::getSubcolumnNameForStream(substream_path)))
                 return nullptr;
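
Unlike the compact reader, the wide reader keeps its substreams cache, and the reworded comment clarifies that it guards all substreams, not just array offsets: a cache hit means the stream was already read for an earlier column in the same pass, so the getter returns nullptr. A hedged sketch of that guard; ColumnStub and the key "nested.size0" are illustrative stand-ins:

#include <iostream>
#include <string>
#include <unordered_map>

struct ColumnStub { size_t rows = 0; };
using SubstreamsCache = std::unordered_map<std::string, ColumnStub>;

// Mirrors the `if (cache.count(...)) return nullptr;` check above:
// a hit means the substream was already deserialized and must be skipped.
static bool should_read(const SubstreamsCache & cache, const std::string & substream_name)
{
    return cache.count(substream_name) == 0;
}

int main()
{
    SubstreamsCache cache;
    std::cout << should_read(cache, "nested.size0") << '\n'; // 1: first read proceeds

    cache["nested.size0"] = ColumnStub{8192};                // offsets are now cached
    std::cout << should_read(cache, "nested.size0") << '\n'; // 0: repeat read is skipped
    return 0;
}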


@@ -121,7 +121,7 @@ Chunk TinyLogSource::generate()
 {
     Block res;
 
-    if (is_finished || (!streams.empty() && streams.begin()->second->compressed.eof()))
+    if (is_finished || file_sizes.empty() || (!streams.empty() && streams.begin()->second->compressed.eof()))
     {
         /** Close the files (before destroying the object).
           * When many sources are created, but simultaneously reading only a few of them,
@@ -132,10 +132,6 @@ Chunk TinyLogSource::generate()
         return {};
     }
 
-    /// if there are no files in the folder, it means that the table is empty
-    if (storage.disk->isDirectoryEmpty(storage.table_path))
-        return {};
-
     std::unordered_map<String, IDataType::SubstreamsCache> caches;
     for (const auto & name_type : columns)
     {
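
The TinyLog change folds the empty-table case into the top-level condition: an empty file_sizes map now short-circuits generate(), replacing the later isDirectoryEmpty() probe of the disk. A minimal model of the reworked condition, with local variables standing in for the source's members:

#include <iostream>
#include <map>
#include <string>

int main()
{
    bool is_finished = false;
    std::map<std::string, size_t> file_sizes;   // empty: the table has no data files
    bool streams_exhausted = false;             // stands in for streams.begin()->second->compressed.eof()

    // Same shape as the updated condition in TinyLogSource::generate().
    if (is_finished || file_sizes.empty() || streams_exhausted)
    {
        std::cout << "empty chunk: nothing to read" << '\n';
        return 0;
    }

    std::cout << "read next block" << '\n';
    return 0;
}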