2020-04-28 00:56:44 +00:00
|
|
|
#include "ODBCBlockOutputStream.h"
|
|
|
|
|
2021-01-27 00:54:57 +00:00
|
|
|
#include <Common/hex.h>
|
2021-10-02 07:13:14 +00:00
|
|
|
#include <base/logger_useful.h>
|
2020-04-28 00:56:44 +00:00
|
|
|
#include <Core/Field.h>
|
2021-10-02 07:13:14 +00:00
|
|
|
#include <base/LocalDate.h>
|
|
|
|
#include <base/LocalDateTime.h>
|
2020-05-14 21:51:07 +00:00
|
|
|
#include "getIdentifierQuote.h"
|
2021-03-22 11:40:29 +00:00
|
|
|
#include <IO/WriteHelpers.h>
|
|
|
|
#include <IO/Operators.h>
|
2021-10-11 16:11:50 +00:00
|
|
|
#include <Processors/Formats/IOutputFormat.h>
|
2021-08-25 21:51:43 +00:00
|
|
|
#include <Parsers/getInsertQuery.h>
|
2020-05-14 21:51:07 +00:00
|
|
|
|
2020-04-28 00:56:44 +00:00
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
|
|
|
|
2021-10-11 16:11:50 +00:00
|
|
|
/// Builds a sink that writes blocks shaped like `sample_block_` into the remote table
/// `remote_database_name_`.`remote_table_name_` through the given ODBC connection holder.
///
/// `local_context_` is kept for later use when serializing data (see consume()), and
/// `quoting` is forwarded to the INSERT query builder.
ODBCSink::ODBCSink(
    nanodbc::ConnectionHolderPtr connection_holder_,
    const std::string & remote_database_name_,
    const std::string & remote_table_name_,
    const Block & sample_block_,
    ContextPtr local_context_,
    IdentifierQuotingStyle quoting_)
    : ISink(sample_block_)
    , log(&Poco::Logger::get("ODBCSink"))
    , connection_holder(std::move(connection_holder_)) /// Taken by value, so move it into the member.
    , db_name(remote_database_name_)
    , table_name(remote_table_name_)
    , sample_block(sample_block_)
    , local_context(local_context_)
    , quoting(quoting_)
{
    /// Initialize the block description from the stored sample block.
    description.init(sample_block);
}
|
|
|
|
|
|
|
|
|
2021-10-11 16:11:50 +00:00
|
|
|
/// Turns the incoming chunk into a single INSERT statement and runs it over the ODBC connection.
///
/// The chunk's columns are attached to the port header to form a block, the block is
/// serialized with the "Values" output format into an in-memory buffer, and that text is
/// appended to the prefix produced by getInsertQuery().
void ODBCSink::consume(Chunk chunk)
{
    /// Rebuild a full block (names + types + data) from the header and the chunk's columns.
    auto detached_columns = chunk.detachColumns();
    auto block = getPort().getHeader().cloneWithColumns(std::move(detached_columns));

    /// Serialize the rows as a "Values" list into a string-backed buffer.
    WriteBufferFromOwnString values_buf;
    auto writer = local_context->getOutputFormat("Values", values_buf, sample_block);
    writer->write(block);

    /// INSERT prefix for the target table, followed by the serialized rows.
    std::string query = getInsertQuery(db_name, table_name, block.getColumnsWithTypeAndName(), quoting);
    query += values_buf.str();

    /// The outer execute<void> supplies the connection from the holder; the inner call runs the statement on it.
    auto run_insert = [&](nanodbc::connection & connection) { execute(connection, query); };
    execute<void>(connection_holder, run_insert);
}
|
|
|
|
|
2020-05-13 18:30:26 +00:00
|
|
|
}
|