#include #include #include #include #include namespace DB { namespace ErrorCodes { extern const int LOGICAL_ERROR; extern const int INCORRECT_QUERY; } MergeTreeMinMaxGranule::MergeTreeMinMaxGranule(const MergeTreeMinMaxIndex & index) : MergeTreeIndexGranule(), index(index), parallelogram() { } void MergeTreeMinMaxGranule::serializeBinary(WriteBuffer & ostr) const { if (empty()) throw Exception( "Attempt to write empty minmax index `" + index.name + "`", ErrorCodes::LOGICAL_ERROR); Poco::Logger * log = &Poco::Logger::get("minmax_idx"); LOG_DEBUG(log, "serializeBinary Granule"); for (size_t i = 0; i < index.columns.size(); ++i) { const DataTypePtr & type = index.data_types[i]; LOG_DEBUG(log, "parallel " << i << " :: " << applyVisitor(FieldVisitorToString(), parallelogram[i].left) << " " << applyVisitor(FieldVisitorToString(), parallelogram[i].right)); type->serializeBinary(parallelogram[i].left, ostr); type->serializeBinary(parallelogram[i].right, ostr); } } void MergeTreeMinMaxGranule::deserializeBinary(ReadBuffer & istr) { Poco::Logger * log = &Poco::Logger::get("minmax_idx"); LOG_DEBUG(log, "deserializeBinary Granule"); parallelogram.clear(); for (size_t i = 0; i < index.columns.size(); ++i) { const DataTypePtr & type = index.data_types[i]; Field min_val; type->deserializeBinary(min_val, istr); Field max_val; type->deserializeBinary(max_val, istr); LOG_DEBUG(log, "parallel " << i << " :: " << applyVisitor(FieldVisitorToString(), min_val) << " " << applyVisitor(FieldVisitorToString(), max_val)); parallelogram.emplace_back(min_val, true, max_val, true); } } String MergeTreeMinMaxGranule::toString() const { String res = "minmax granule: "; for (size_t i = 0; i < parallelogram.size(); ++i) { res += "[" + applyVisitor(FieldVisitorToString(), parallelogram[i].left) + ", " + applyVisitor(FieldVisitorToString(), parallelogram[i].right) + "]"; } return res; } void MergeTreeMinMaxGranule::update(const Block & block, size_t * pos, size_t limit) { /// TODO: remove logs Poco::Logger * log = &Poco::Logger::get("minmax_idx"); LOG_DEBUG(log, "update Granule " << parallelogram.size() << " pos: "<< *pos << " limit: " << limit << " rows: " << block.rows()); size_t rows_read = std::min(limit, block.rows() - *pos); for (size_t i = 0; i < index.columns.size(); ++i) { LOG_DEBUG(log, "granule column: " << index.columns[i]); const auto & column = block.getByName(index.columns[i]).column; Field field_min, field_max; column->cut(*pos, rows_read)->getExtremes(field_min, field_max); if (parallelogram.size() <= i) { parallelogram.emplace_back(field_min, true, field_max, true); } else { parallelogram[i].left = std::min(parallelogram[i].left, field_min); parallelogram[i].right = std::max(parallelogram[i].right, field_max); } LOG_DEBUG(log, "res:: [" << applyVisitor(FieldVisitorToString(), parallelogram[i].left) << ", " << applyVisitor(FieldVisitorToString(), parallelogram[i].right) << "]"); } LOG_DEBUG(log, "updated rows_read: " << rows_read); *pos += rows_read; }; MinMaxCondition::MinMaxCondition( const SelectQueryInfo &query, const Context &context, const MergeTreeMinMaxIndex &index) : IndexCondition(), index(index), condition(query, context, index.columns, index.expr) {}; bool MinMaxCondition::alwaysUnknownOrTrue() const { return condition.alwaysUnknownOrTrue(); } bool MinMaxCondition::mayBeTrueOnGranule(MergeTreeIndexGranulePtr idx_granule) const { std::shared_ptr granule = std::dynamic_pointer_cast(idx_granule); if (!granule) throw Exception( "Minmax index condition got wrong granule", ErrorCodes::LOGICAL_ERROR); return condition.mayBeTrueInParallelogram(granule->parallelogram, index.data_types); } MergeTreeIndexGranulePtr MergeTreeMinMaxIndex::createIndexGranule() const { return std::make_shared(*this); } IndexConditionPtr MergeTreeMinMaxIndex::createIndexCondition( const SelectQueryInfo & query, const Context & context) const { return std::make_shared(query, context, *this); }; std::unique_ptr MergeTreeMinMaxIndexCreator( const MergeTreeData & data, std::shared_ptr node, const Context & context) { if (node->name.empty()) throw Exception("Index must have unique name", ErrorCodes::INCORRECT_QUERY); if (node->type->arguments) throw Exception("Minmax index have not any arguments", ErrorCodes::INCORRECT_QUERY); ASTPtr expr_list = MergeTreeData::extractKeyExpressionList(node->expr->clone()); auto syntax = SyntaxAnalyzer(context, {}).analyze( expr_list, data.getColumns().getAllPhysical()); auto minmax_expr = ExpressionAnalyzer(expr_list, syntax, context).getActions(false); auto sample = ExpressionAnalyzer(expr_list, syntax, context) .getActions(true)->getSampleBlock(); Names columns; DataTypes data_types; Poco::Logger * log = &Poco::Logger::get("minmax_idx"); LOG_DEBUG(log, "new minmax index" << node->name); for (size_t i = 0; i < expr_list->children.size(); ++i) { const auto & column = sample.getByPosition(i); columns.emplace_back(column.name); data_types.emplace_back(column.type); LOG_DEBUG(log, ">" << column.name << " " << column.type->getName()); } return std::make_unique( node->name, std::move(minmax_expr), columns, data_types, node->granularity.get());; } }