Add converting stream to localhost clickhouse dict

This commit is contained in:
alesapin 2020-04-06 22:09:39 +03:00
parent f0124ffc2b
commit 04e3e3179c
3 changed files with 42 additions and 2 deletions

View File

@ -2,6 +2,7 @@
#include <memory>
#include <Client/ConnectionPool.h>
#include <DataStreams/RemoteBlockInputStream.h>
#include <DataStreams/ConvertingBlockInputStream.h>
#include <IO/ConnectionTimeouts.h>
#include <Interpreters/executeQuery.h>
#include <Common/isLocalAddress.h>
@ -131,6 +132,7 @@ BlockInputStreamPtr ClickHouseDictionarySource::loadAll()
{
BlockIO res = executeQuery(load_all_query, context, true);
/// FIXME res.in may implicitly use some objects owned be res, but them will be destructed after return
res.in = std::make_shared<ConvertingBlockInputStream>(context, res.in, sample_block, ConvertingBlockInputStream::MatchColumnsMode::Position);
return res.in;
}
return std::make_shared<RemoteBlockInputStream>(pool, load_all_query, sample_block, context);
@ -140,7 +142,11 @@ BlockInputStreamPtr ClickHouseDictionarySource::loadUpdatedAll()
{
std::string load_update_query = getUpdateFieldAndDate();
if (is_local)
return executeQuery(load_update_query, context, true).in;
{
auto res = executeQuery(load_update_query, context, true);
res.in = std::make_shared<ConvertingBlockInputStream>(context, res.in, sample_block, ConvertingBlockInputStream::MatchColumnsMode::Position);
return res.in;
}
return std::make_shared<RemoteBlockInputStream>(pool, load_update_query, sample_block, context);
}
@ -183,7 +189,12 @@ std::string ClickHouseDictionarySource::toString() const
BlockInputStreamPtr ClickHouseDictionarySource::createStreamForSelectiveLoad(const std::string & query)
{
if (is_local)
return executeQuery(query, context, true).in;
{
auto res = executeQuery(query, context, true);
res.in = std::make_shared<ConvertingBlockInputStream>(
context, res.in, sample_block, ConvertingBlockInputStream::MatchColumnsMode::Position);
return res.in;
}
return std::make_shared<RemoteBlockInputStream>(pool, query, sample_block, context);
}

View File

@ -0,0 +1,2 @@
First WINDOWS 1
Second LINUX 2

View File

@ -0,0 +1,27 @@
DROP DATABASE IF EXISTS database_for_dict;
CREATE DATABASE database_for_dict;
CREATE TABLE database_for_dict.table_for_dict (
CompanyID String,
OSType Enum('UNKNOWN' = 0, 'WINDOWS' = 1, 'LINUX' = 2, 'ANDROID' = 3, 'MAC' = 4),
SomeID Int32
)
ENGINE = Memory();
INSERT INTO database_for_dict.table_for_dict VALUES ('First', 'WINDOWS', 1), ('Second', 'LINUX', 2);
CREATE DICTIONARY database_for_dict.dict_with_conversion
(
CompanyID String DEFAULT '',
OSType String DEFAULT '',
SomeID Int32 DEFAULT 0
)
PRIMARY KEY CompanyID
SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER 'default' TABLE 'table_for_dict' DB 'database_for_dict'))
LIFETIME(MIN 1 MAX 20)
LAYOUT(COMPLEX_KEY_HASHED());
SELECT * FROM database_for_dict.dict_with_conversion ORDER BY CompanyID;
DROP DATABASE IF EXISTS database_for_dict;