ClickHouse/programs/odbc-bridge/ODBCSink.cpp

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

46 lines
1.3 KiB
C++
Raw Normal View History

2024-03-24 16:21:53 +00:00
#include "ODBCSink.h"
2020-04-28 00:56:44 +00:00
2021-12-24 12:37:40 +00:00
#include <IO/WriteBufferFromString.h>
#include <Interpreters/Context.h>
2021-10-11 16:11:50 +00:00
#include <Processors/Formats/IOutputFormat.h>
2021-08-25 21:51:43 +00:00
#include <Parsers/getInsertQuery.h>
2020-05-14 21:51:07 +00:00
2020-04-28 00:56:44 +00:00
namespace DB
{
2021-10-11 16:11:50 +00:00
/// Constructs a sink that inserts incoming data into a remote table over ODBC.
///
/// @param connection_holder_    Holder of the ODBC connection; ownership is moved into the sink.
/// @param remote_database_name_ Database name passed to getInsertQuery() when building the INSERT.
/// @param remote_table_name_    Table name passed to getInsertQuery() when building the INSERT.
/// @param sample_block_         Header block; forwarded to ISink, stored, and used to initialise `description`.
/// @param local_context_        Context later used to obtain the "Values" output format; moved into the sink.
/// @param quoting_              Identifier quoting style passed to getInsertQuery().
ODBCSink::ODBCSink(
    nanodbc::ConnectionHolderPtr connection_holder_,
    const std::string & remote_database_name_,
    const std::string & remote_table_name_,
    const Block & sample_block_,
    ContextPtr local_context_,
    IdentifierQuotingStyle quoting_)
    : ISink(sample_block_)
    , log(getLogger("ODBCSink"))
    , connection_holder(std::move(connection_holder_))
    , db_name(remote_database_name_)
    , table_name(remote_table_name_)
    , sample_block(sample_block_)
    /// Taken by value, so move it instead of copying (same treatment as connection_holder_).
    , local_context(std::move(local_context_))
    , quoting(quoting_)
{
    description.init(sample_block);
}
2021-10-11 16:11:50 +00:00
/// Writes one chunk to the remote table as a single INSERT statement.
///
/// The chunk's columns are attached to the port header to form a block, the block
/// is serialised with the "Values" output format into an in-memory buffer, and the
/// result is appended to the text returned by getInsertQuery(). The final statement
/// is executed through the held ODBC connection.
///
/// @param chunk Data to insert; its columns are detached (the chunk is consumed).
void ODBCSink::consume(Chunk chunk)
{
    auto block = getPort().getHeader().cloneWithColumns(chunk.detachColumns());

    /// Nothing to insert: with zero rows the serialised values are empty, so the
    /// statement would carry no tuples. Skip the round-trip to the remote instead.
    if (block.rows() == 0)
        return;

    WriteBufferFromOwnString values_buf;
    auto writer = local_context->getOutputFormat("Values", values_buf, sample_block);
    writer->write(block);

    /// Statement prefix (target table and column list) followed by the serialised rows.
    std::string query = getInsertQuery(db_name, table_name, block.getColumnsWithTypeAndName(), quoting);
    query += values_buf.str();

    /// The lambda only runs inside this call, so capturing `query` by reference is safe.
    execute<void>(
        connection_holder,
        [&](nanodbc::connection & connection) { execute(connection, query); });
}
}