ClickHouse/programs/odbc-bridge/ODBCBlockOutputStream.cpp

77 lines
2.6 KiB
C++
Raw Normal View History

2020-04-28 00:56:44 +00:00
#include "ODBCBlockOutputStream.h"
2021-01-27 00:54:57 +00:00
#include <Common/hex.h>
2020-04-28 00:56:44 +00:00
#include <common/logger_useful.h>
#include <Core/Field.h>
#include <common/LocalDate.h>
2020-05-05 23:42:44 +00:00
#include <common/LocalDateTime.h>
2020-05-14 21:51:07 +00:00
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTIdentifier.h>
#include "getIdentifierQuote.h"
2021-03-22 11:40:29 +00:00
#include <IO/WriteHelpers.h>
#include <IO/Operators.h>
#include <Formats/FormatFactory.h>
2020-05-14 21:51:07 +00:00
2020-04-28 00:56:44 +00:00
namespace DB
{
2020-05-05 23:42:44 +00:00
namespace
{
2020-04-28 00:56:44 +00:00
using ValueType = ExternalResultDescription::ValueType;
2020-05-14 21:51:07 +00:00
std::string getInsertQuery(const std::string & db_name, const std::string & table_name, const ColumnsWithTypeAndName & columns, IdentifierQuotingStyle quoting)
2020-04-28 00:56:44 +00:00
{
2020-05-14 21:51:07 +00:00
ASTInsertQuery query;
query.table_id.database_name = db_name;
query.table_id.table_name = table_name;
query.columns = std::make_shared<ASTExpressionList>(',');
query.children.push_back(query.columns);
for (const auto & column : columns)
query.columns->children.emplace_back(std::make_shared<ASTIdentifier>(column.name));
2020-05-14 21:51:07 +00:00
2020-11-09 16:05:40 +00:00
WriteBufferFromOwnString buf;
IAST::FormatSettings settings(buf, true);
2020-05-14 21:51:07 +00:00
settings.always_quote_identifiers = true;
settings.identifier_quoting_style = quoting;
query.IAST::format(settings);
2020-11-09 16:05:40 +00:00
return buf.str();
2020-04-28 00:56:44 +00:00
}
}
2021-06-07 18:09:16 +00:00
ODBCBlockOutputStream::ODBCBlockOutputStream(nanodbc::ConnectionHolderPtr connection_holder_,
2020-04-28 00:56:44 +00:00
const std::string & remote_database_name_,
const std::string & remote_table_name_,
2020-05-14 21:51:07 +00:00
const Block & sample_block_,
ContextPtr local_context_,
2020-05-14 21:51:07 +00:00
IdentifierQuotingStyle quoting_)
2021-03-22 11:40:29 +00:00
: log(&Poco::Logger::get("ODBCBlockOutputStream"))
2021-06-07 18:09:16 +00:00
, connection_holder(std::move(connection_holder_))
2020-04-28 00:56:44 +00:00
, db_name(remote_database_name_)
, table_name(remote_table_name_)
, sample_block(sample_block_)
, local_context(local_context_)
2020-05-14 21:51:07 +00:00
, quoting(quoting_)
2020-04-28 00:56:44 +00:00
{
description.init(sample_block);
}
Block ODBCBlockOutputStream::getHeader() const
{
return sample_block;
}
void ODBCBlockOutputStream::write(const Block & block)
{
2021-03-22 11:40:29 +00:00
WriteBufferFromOwnString values_buf;
auto writer = FormatFactory::instance().getOutputStream("Values", values_buf, sample_block, local_context);
2021-03-22 11:40:29 +00:00
writer->write(block);
2020-04-28 00:56:44 +00:00
2021-03-22 11:40:29 +00:00
std::string query = getInsertQuery(db_name, table_name, block.getColumnsWithTypeAndName(), quoting) + values_buf.str();
2021-06-07 18:09:16 +00:00
execute<void>(connection_holder,
[&](nanodbc::connection & connection) { execute(connection, query); });
2020-04-28 00:56:44 +00:00
}
}