#include #if USE_MYSQL #include #include #include #include #include #include namespace DB { namespace ErrorCodes { extern const int NUMBER_OF_COLUMNS_DOESNT_MATCH; } MySQLBlockInputStream::MySQLBlockInputStream( const mysqlxx::PoolWithFailover::Entry & entry, const std::string & query_str, const Block & sample_block, const size_t max_block_size) : entry{entry}, query{this->entry->query(query_str)}, result{query.use()}, max_block_size{max_block_size} { if (sample_block.columns() != result.getNumFields()) throw Exception{"mysqlxx::UseQueryResult contains " + toString(result.getNumFields()) + " columns while " + toString(sample_block.columns()) + " expected", ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH}; description.init(sample_block); } namespace { using ValueType = ExternalResultDescription::ValueType; void insertValue(IColumn & column, const ValueType type, const mysqlxx::Value & value) { switch (type) { case ValueType::UInt8: static_cast(column).insert(value.getUInt()); break; case ValueType::UInt16: static_cast(column).insert(value.getUInt()); break; case ValueType::UInt32: static_cast(column).insert(value.getUInt()); break; case ValueType::UInt64: static_cast(column).insert(value.getUInt()); break; case ValueType::Int8: static_cast(column).insert(value.getInt()); break; case ValueType::Int16: static_cast(column).insert(value.getInt()); break; case ValueType::Int32: static_cast(column).insert(value.getInt()); break; case ValueType::Int64: static_cast(column).insert(value.getInt()); break; case ValueType::Float32: static_cast(column).insert(value.getDouble()); break; case ValueType::Float64: static_cast(column).insert(value.getDouble()); break; case ValueType::String: static_cast(column).insertData(value.data(), value.size()); break; case ValueType::Date: static_cast(column).insert(UInt16{value.getDate().getDayNum()}); break; case ValueType::DateTime: static_cast(column).insert(time_t{value.getDateTime()}); break; } } void insertDefaultValue(IColumn & column, const IColumn & sample_column) { column.insertFrom(sample_column, 0); } } Block MySQLBlockInputStream::readImpl() { auto row = result.fetch(); if (!row) return {}; MutableColumns columns(description.sample_block.columns()); for (const auto i : ext::range(0, columns.size())) columns[i] = description.sample_block.getByPosition(i).column->cloneEmpty(); size_t num_rows = 0; while (row) { for (const auto idx : ext::range(0, row.size())) { const auto value = row[idx]; if (!value.isNull()) insertValue(*columns[idx], description.types[idx], value); else insertDefaultValue(*columns[idx], *description.sample_columns[idx]); } ++num_rows; if (num_rows == max_block_size) break; row = result.fetch(); } return description.sample_block.cloneWithColumns(std::move(columns)); } } #endif