ClickHouse/programs/odbc-bridge/ODBCBlockOutputStream.cpp

134 lines
5.2 KiB
C++
Raw Normal View History

2020-04-28 00:56:44 +00:00
#include "ODBCBlockOutputStream.h"
#include <common/logger_useful.h>
#include <Core/Field.h>
#include <common/LocalDate.h>
2020-05-05 23:42:44 +00:00
#include <common/LocalDateTime.h>
2020-05-14 21:51:07 +00:00
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTIdentifier.h>
#include "getIdentifierQuote.h"
2020-04-28 00:56:44 +00:00
namespace DB
{
2020-05-05 23:42:44 +00:00
namespace
{
2020-04-28 00:56:44 +00:00
using ValueType = ExternalResultDescription::ValueType;
2020-05-14 21:51:07 +00:00
std::string getInsertQuery(const std::string & db_name, const std::string & table_name, const ColumnsWithTypeAndName & columns, IdentifierQuotingStyle quoting)
2020-04-28 00:56:44 +00:00
{
2020-05-14 21:51:07 +00:00
ASTInsertQuery query;
query.table_id.database_name = db_name;
query.table_id.table_name = table_name;
query.columns = std::make_shared<ASTExpressionList>(',');
query.children.push_back(query.columns);
2020-05-05 23:42:44 +00:00
for (size_t i = 0; i < columns.size(); ++i)
2020-05-14 21:51:07 +00:00
query.columns->children.emplace_back(std::make_shared<ASTIdentifier>(columns[i].name));
std::stringstream ss;
IAST::FormatSettings settings(ss, true);
settings.always_quote_identifiers = true;
settings.identifier_quoting_style = quoting;
query.IAST::format(settings);
return ss.str();
2020-04-28 00:56:44 +00:00
}
std::string getQuestionMarks(size_t n)
{
std::string result = "(";
2020-05-05 23:42:44 +00:00
for (size_t i = 0; i < n; ++i)
{
2020-04-28 00:56:44 +00:00
if (i > 0)
result += ",";
result += "?";
}
return result + ")";
}
Poco::Dynamic::Var getVarFromField(const Field & field, const ValueType type)
{
2020-05-05 23:42:44 +00:00
switch (type)
{
2020-04-28 00:56:44 +00:00
case ValueType::vtUInt8:
2020-05-05 23:42:44 +00:00
return Poco::Dynamic::Var(static_cast<UInt64>(field.get<UInt64>())).convert<UInt64>();
2020-04-28 00:56:44 +00:00
case ValueType::vtUInt16:
2020-05-05 23:42:44 +00:00
return Poco::Dynamic::Var(static_cast<UInt64>(field.get<UInt64>())).convert<UInt64>();
2020-04-28 00:56:44 +00:00
case ValueType::vtUInt32:
2020-05-05 23:42:44 +00:00
return Poco::Dynamic::Var(static_cast<UInt64>(field.get<UInt64>())).convert<UInt64>();
2020-04-28 00:56:44 +00:00
case ValueType::vtUInt64:
return Poco::Dynamic::Var(field.get<UInt64>()).convert<UInt64>();
case ValueType::vtInt8:
2020-05-05 23:42:44 +00:00
return Poco::Dynamic::Var(static_cast<Int64>(field.get<Int64>())).convert<Int64>();
2020-04-28 00:56:44 +00:00
case ValueType::vtInt16:
2020-05-05 23:42:44 +00:00
return Poco::Dynamic::Var(static_cast<Int64>(field.get<Int64>())).convert<Int64>();
2020-04-28 00:56:44 +00:00
case ValueType::vtInt32:
2020-05-05 23:42:44 +00:00
return Poco::Dynamic::Var(static_cast<Int64>(field.get<Int64>())).convert<Int64>();
2020-04-28 00:56:44 +00:00
case ValueType::vtInt64:
return Poco::Dynamic::Var(field.get<Int64>()).convert<Int64>();
case ValueType::vtFloat32:
return Poco::Dynamic::Var(field.get<Float64>()).convert<Float64>();
case ValueType::vtFloat64:
return Poco::Dynamic::Var(field.get<Float64>()).convert<Float64>();
case ValueType::vtString:
return Poco::Dynamic::Var(field.get<String>()).convert<String>();
case ValueType::vtDate:
2020-05-05 23:42:44 +00:00
return Poco::Dynamic::Var(LocalDate(DayNum(field.get<UInt64>())).toString()).convert<String>();
2020-04-28 00:56:44 +00:00
case ValueType::vtDateTime:
2020-05-05 23:42:44 +00:00
return Poco::Dynamic::Var(std::to_string(LocalDateTime(time_t(field.get<UInt64>())))).convert<String>();
2020-04-28 00:56:44 +00:00
case ValueType::vtUUID:
return Poco::Dynamic::Var(UUID(field.get<UInt128>()).toUnderType().toHexString()).convert<std::string>();
}
2020-05-15 11:26:51 +00:00
__builtin_unreachable();
2020-04-28 00:56:44 +00:00
}
}
ODBCBlockOutputStream::ODBCBlockOutputStream(Poco::Data::Session && session_,
const std::string & remote_database_name_,
const std::string & remote_table_name_,
2020-05-14 21:51:07 +00:00
const Block & sample_block_,
IdentifierQuotingStyle quoting_)
2020-04-28 00:56:44 +00:00
: session(session_)
, db_name(remote_database_name_)
, table_name(remote_table_name_)
, sample_block(sample_block_)
2020-05-14 21:51:07 +00:00
, quoting(quoting_)
2020-04-28 00:56:44 +00:00
, log(&Logger::get("ODBCBlockOutputStream"))
{
description.init(sample_block);
}
Block ODBCBlockOutputStream::getHeader() const
{
return sample_block;
}
void ODBCBlockOutputStream::write(const Block & block)
{
ColumnsWithTypeAndName columns;
for (size_t i = 0; i < block.columns(); ++i)
columns.push_back({block.getColumns()[i], sample_block.getDataTypes()[i], sample_block.getNames()[i]});
std::vector<Poco::Dynamic::Var> row_to_insert(block.columns());
2020-05-14 21:51:07 +00:00
Poco::Data::Statement statement(session << getInsertQuery(db_name, table_name, columns, quoting) + getQuestionMarks(block.columns()));
2020-04-28 00:56:44 +00:00
for (size_t i = 0; i < block.columns(); ++i)
2020-05-15 13:25:27 +00:00
statement.addBind(Poco::Data::Keywords::use(row_to_insert[i]));
2020-04-28 00:56:44 +00:00
for (size_t i = 0; i < block.rows(); ++i)
{
for (size_t col_idx = 0; col_idx < block.columns(); ++col_idx)
{
Field val;
columns[col_idx].column->get(i, val);
2020-05-14 21:51:07 +00:00
if (val.isNull())
row_to_insert[col_idx] = Poco::Dynamic::Var();
else
row_to_insert[col_idx] = getVarFromField(val, description.types[col_idx].first);
2020-04-28 00:56:44 +00:00
}
statement.execute();
}
}
}