#include "ODBCBlockInputStream.h" #include #include #include #include #include #include #include #include #include namespace DB { namespace ErrorCodes { extern const int NUMBER_OF_COLUMNS_DOESNT_MATCH; extern const int UNKNOWN_TYPE; } ODBCBlockInputStream::ODBCBlockInputStream( Poco::Data::Session && session_, const std::string & query_str, const Block & sample_block, const UInt64 max_block_size_) : session{session_} , statement{(this->session << query_str, Poco::Data::Keywords::now)} , result{statement} , iterator{result.begin()} , max_block_size{max_block_size_} , log(&Poco::Logger::get("ODBCBlockInputStream")) { if (sample_block.columns() != result.columnCount()) throw Exception{"RecordSet contains " + toString(result.columnCount()) + " columns while " + toString(sample_block.columns()) + " expected", ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH}; description.init(sample_block); } namespace { using ValueType = ExternalResultDescription::ValueType; void insertValue(IColumn & column, const ValueType type, const Poco::Dynamic::Var & value) { switch (type) { case ValueType::vtUInt8: assert_cast(column).insertValue(value.convert()); break; case ValueType::vtUInt16: assert_cast(column).insertValue(value.convert()); break; case ValueType::vtUInt32: assert_cast(column).insertValue(value.convert()); break; case ValueType::vtUInt64: assert_cast(column).insertValue(value.convert()); break; case ValueType::vtInt8: assert_cast(column).insertValue(value.convert()); break; case ValueType::vtInt16: assert_cast(column).insertValue(value.convert()); break; case ValueType::vtInt32: assert_cast(column).insertValue(value.convert()); break; case ValueType::vtInt64: assert_cast(column).insertValue(value.convert()); break; case ValueType::vtFloat32: assert_cast(column).insertValue(value.convert()); break; case ValueType::vtFloat64: assert_cast(column).insertValue(value.convert()); break; case ValueType::vtString: assert_cast(column).insert(value.convert()); break; case ValueType::vtDate: { Poco::DateTime date = value.convert(); assert_cast(column).insertValue(UInt16{LocalDate(date.year(), date.month(), date.day()).getDayNum()}); break; } case ValueType::vtDateTime: { Poco::DateTime datetime = value.convert(); assert_cast(column).insertValue(DateLUT::instance().makeDateTime( datetime.year(), datetime.month(), datetime.day(), datetime.hour(), datetime.minute(), datetime.second())); break; } case ValueType::vtUUID: assert_cast(column).insert(parse(value.convert())); break; default: throw Exception("Unsupported value type", ErrorCodes::UNKNOWN_TYPE); } } void insertDefaultValue(IColumn & column, const IColumn & sample_column) { column.insertFrom(sample_column, 0); } } Block ODBCBlockInputStream::readImpl() { if (iterator == result.end()) return {}; MutableColumns columns(description.sample_block.columns()); for (const auto i : ext::range(0, columns.size())) columns[i] = description.sample_block.getByPosition(i).column->cloneEmpty(); size_t num_rows = 0; while (iterator != result.end()) { Poco::Data::Row & row = *iterator; for (const auto idx : ext::range(0, row.fieldCount())) { /// TODO This is extremely slow. const Poco::Dynamic::Var & value = row[idx]; if (!value.isEmpty()) { if (description.types[idx].second) { ColumnNullable & column_nullable = assert_cast(*columns[idx]); insertValue(column_nullable.getNestedColumn(), description.types[idx].first, value); column_nullable.getNullMapData().emplace_back(0); } else insertValue(*columns[idx], description.types[idx].first, value); } else insertDefaultValue(*columns[idx], *description.sample_block.getByPosition(idx).column); } ++iterator; ++num_rows; if (num_rows == max_block_size) break; } return description.sample_block.cloneWithColumns(std::move(columns)); } }