mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 23:21:59 +00:00
ISSUES-16835 try fix review comment
This commit is contained in:
parent
972cb41fae
commit
2f735aa61d
@ -22,4 +22,12 @@ ResultBase::~ResultBase()
|
|||||||
mysql_free_result(res);
|
mysql_free_result(res);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
std::string ResultBase::getFieldName(size_t n) const
|
||||||
|
{
|
||||||
|
if (num_fields <= n)
|
||||||
|
throw Exception(std::string("Unknown column position ") + std::to_string(n));
|
||||||
|
|
||||||
|
return fields[n].name;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -31,6 +31,8 @@ public:
|
|||||||
MYSQL_RES * getRes() { return res; }
|
MYSQL_RES * getRes() { return res; }
|
||||||
const Query * getQuery() const { return query; }
|
const Query * getQuery() const { return query; }
|
||||||
|
|
||||||
|
std::string getFieldName(size_t n) const;
|
||||||
|
|
||||||
virtual ~ResultBase();
|
virtual ~ResultBase();
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
|
@ -21,12 +21,4 @@ Value Row::operator[] (const char * name) const
|
|||||||
throw Exception(std::string("Unknown column ") + name);
|
throw Exception(std::string("Unknown column ") + name);
|
||||||
}
|
}
|
||||||
|
|
||||||
std::string Row::getFieldName(size_t n) const
|
|
||||||
{
|
|
||||||
if (res->getNumFields() <= n)
|
|
||||||
throw Exception(std::string("Unknown column position ") + std::to_string(n));
|
|
||||||
|
|
||||||
return res->getFields()[n].name;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -79,8 +79,6 @@ public:
|
|||||||
*/
|
*/
|
||||||
operator private_bool_type() const { return row == nullptr ? nullptr : &Row::row; }
|
operator private_bool_type() const { return row == nullptr ? nullptr : &Row::row; }
|
||||||
|
|
||||||
std::string getFieldName(size_t n) const;
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
MYSQL_ROW row{};
|
MYSQL_ROW row{};
|
||||||
ResultBase * res{};
|
ResultBase * res{};
|
||||||
|
@ -45,12 +45,8 @@ MySQLBlockInputStream::MySQLBlockInputStream(
|
|||||||
, auto_close{auto_close_}
|
, auto_close{auto_close_}
|
||||||
, fetch_by_name(fetch_by_name_)
|
, fetch_by_name(fetch_by_name_)
|
||||||
{
|
{
|
||||||
if (!fetch_by_name && sample_block.columns() != connection->result.getNumFields())
|
|
||||||
throw Exception{"mysqlxx::UseQueryResult contains " + toString(connection->result.getNumFields()) + " columns while "
|
|
||||||
+ toString(sample_block.columns()) + " expected",
|
|
||||||
ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH};
|
|
||||||
|
|
||||||
description.init(sample_block);
|
description.init(sample_block);
|
||||||
|
initPositionMappingFromQueryResultStructure();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -135,47 +131,6 @@ Block MySQLBlockInputStream::readImpl()
|
|||||||
for (const auto i : ext::range(0, columns.size()))
|
for (const auto i : ext::range(0, columns.size()))
|
||||||
columns[i] = description.sample_block.getByPosition(i).column->cloneEmpty();
|
columns[i] = description.sample_block.getByPosition(i).column->cloneEmpty();
|
||||||
|
|
||||||
if (unlikely(position_mapping.size() != description.sample_block.columns()))
|
|
||||||
{
|
|
||||||
position_mapping.resize(description.sample_block.columns());
|
|
||||||
|
|
||||||
if (!fetch_by_name)
|
|
||||||
{
|
|
||||||
for (const auto idx : ext::range(0, row.size()))
|
|
||||||
position_mapping[idx] = idx;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
const auto & sample_names = description.sample_block.getNames();
|
|
||||||
std::unordered_set<std::string> missing_names(sample_names.begin(), sample_names.end());
|
|
||||||
|
|
||||||
for (const auto idx : ext::range(0, row.size()))
|
|
||||||
{
|
|
||||||
const auto & field_name = row.getFieldName(idx);
|
|
||||||
if (description.sample_block.has(field_name))
|
|
||||||
{
|
|
||||||
const auto & position = description.sample_block.getPositionByName(field_name);
|
|
||||||
position_mapping[position] = idx;
|
|
||||||
missing_names.erase(field_name);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!missing_names.empty())
|
|
||||||
{
|
|
||||||
WriteBufferFromOwnString exception_message;
|
|
||||||
for (auto iter = missing_names.begin(); iter != missing_names.end(); ++iter)
|
|
||||||
{
|
|
||||||
if (iter != missing_names.begin())
|
|
||||||
exception_message << ", ";
|
|
||||||
exception_message << *iter;
|
|
||||||
}
|
|
||||||
|
|
||||||
throw Exception("mysqlxx::UseQueryResult must be contain the" + exception_message.str() + " columns.",
|
|
||||||
ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
size_t num_rows = 0;
|
size_t num_rows = 0;
|
||||||
while (row)
|
while (row)
|
||||||
{
|
{
|
||||||
@ -221,6 +176,53 @@ MySQLBlockInputStream::MySQLBlockInputStream(
|
|||||||
description.init(sample_block_);
|
description.init(sample_block_);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void MySQLBlockInputStream::initPositionMappingFromQueryResultStructure()
|
||||||
|
{
|
||||||
|
position_mapping.resize(description.sample_block.columns());
|
||||||
|
|
||||||
|
if (!fetch_by_name)
|
||||||
|
{
|
||||||
|
if (description.sample_block.columns() != connection->result.getNumFields())
|
||||||
|
throw Exception{"mysqlxx::UseQueryResult contains " + toString(connection->result.getNumFields()) + " columns while "
|
||||||
|
+ toString(description.sample_block.columns()) + " expected", ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH};
|
||||||
|
|
||||||
|
for (const auto idx : ext::range(0, connection->result.getNumFields()))
|
||||||
|
position_mapping[idx] = idx;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
const auto & sample_names = description.sample_block.getNames();
|
||||||
|
std::unordered_set<std::string> missing_names(sample_names.begin(), sample_names.end());
|
||||||
|
|
||||||
|
size_t fields_size = connection->result.getNumFields();
|
||||||
|
|
||||||
|
for (const size_t & idx : ext::range(0, fields_size))
|
||||||
|
{
|
||||||
|
const auto & field_name = connection->result.getFieldName(idx);
|
||||||
|
if (description.sample_block.has(field_name))
|
||||||
|
{
|
||||||
|
const auto & position = description.sample_block.getPositionByName(field_name);
|
||||||
|
position_mapping[position] = idx;
|
||||||
|
missing_names.erase(field_name);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!missing_names.empty())
|
||||||
|
{
|
||||||
|
WriteBufferFromOwnString exception_message;
|
||||||
|
for (auto iter = missing_names.begin(); iter != missing_names.end(); ++iter)
|
||||||
|
{
|
||||||
|
if (iter != missing_names.begin())
|
||||||
|
exception_message << ", ";
|
||||||
|
exception_message << *iter;
|
||||||
|
}
|
||||||
|
|
||||||
|
throw Exception("mysqlxx::UseQueryResult must be contain the" + exception_message.str() + " columns.",
|
||||||
|
ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
MySQLLazyBlockInputStream::MySQLLazyBlockInputStream(
|
MySQLLazyBlockInputStream::MySQLLazyBlockInputStream(
|
||||||
mysqlxx::Pool & pool_,
|
mysqlxx::Pool & pool_,
|
||||||
const std::string & query_str_,
|
const std::string & query_str_,
|
||||||
@ -237,10 +239,7 @@ MySQLLazyBlockInputStream::MySQLLazyBlockInputStream(
|
|||||||
void MySQLLazyBlockInputStream::readPrefix()
|
void MySQLLazyBlockInputStream::readPrefix()
|
||||||
{
|
{
|
||||||
connection = std::make_unique<Connection>(pool.get(), query_str);
|
connection = std::make_unique<Connection>(pool.get(), query_str);
|
||||||
if (!fetch_by_name && description.sample_block.columns() != connection->result.getNumFields())
|
initPositionMappingFromQueryResultStructure();
|
||||||
throw Exception{"mysqlxx::UseQueryResult contains " + toString(connection->result.getNumFields()) + " columns while "
|
|
||||||
+ toString(description.sample_block.columns()) + " expected",
|
|
||||||
ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH};
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -30,6 +30,7 @@ public:
|
|||||||
protected:
|
protected:
|
||||||
MySQLBlockInputStream(const Block & sample_block_, UInt64 max_block_size_, bool auto_close_, bool fetch_by_name_);
|
MySQLBlockInputStream(const Block & sample_block_, UInt64 max_block_size_, bool auto_close_, bool fetch_by_name_);
|
||||||
Block readImpl() override;
|
Block readImpl() override;
|
||||||
|
void initPositionMappingFromQueryResultStructure();
|
||||||
|
|
||||||
struct Connection
|
struct Connection
|
||||||
{
|
{
|
||||||
|
Loading…
Reference in New Issue
Block a user