mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 23:21:59 +00:00
Merge pull request #10071 from ClickHouse/add_conversion_stream
Add converting stream to localhost clickhouse dict
This commit is contained in:
commit
491f4b2c60
@ -2,6 +2,7 @@
|
||||
#include <memory>
|
||||
#include <Client/ConnectionPool.h>
|
||||
#include <DataStreams/RemoteBlockInputStream.h>
|
||||
#include <DataStreams/ConvertingBlockInputStream.h>
|
||||
#include <IO/ConnectionTimeouts.h>
|
||||
#include <Interpreters/executeQuery.h>
|
||||
#include <Common/isLocalAddress.h>
|
||||
@ -131,6 +132,7 @@ BlockInputStreamPtr ClickHouseDictionarySource::loadAll()
|
||||
{
|
||||
BlockIO res = executeQuery(load_all_query, context, true);
|
||||
/// FIXME res.in may implicitly use some objects owned be res, but them will be destructed after return
|
||||
res.in = std::make_shared<ConvertingBlockInputStream>(context, res.in, sample_block, ConvertingBlockInputStream::MatchColumnsMode::Position);
|
||||
return res.in;
|
||||
}
|
||||
return std::make_shared<RemoteBlockInputStream>(pool, load_all_query, sample_block, context);
|
||||
@ -140,7 +142,11 @@ BlockInputStreamPtr ClickHouseDictionarySource::loadUpdatedAll()
|
||||
{
|
||||
std::string load_update_query = getUpdateFieldAndDate();
|
||||
if (is_local)
|
||||
return executeQuery(load_update_query, context, true).in;
|
||||
{
|
||||
auto res = executeQuery(load_update_query, context, true);
|
||||
res.in = std::make_shared<ConvertingBlockInputStream>(context, res.in, sample_block, ConvertingBlockInputStream::MatchColumnsMode::Position);
|
||||
return res.in;
|
||||
}
|
||||
return std::make_shared<RemoteBlockInputStream>(pool, load_update_query, sample_block, context);
|
||||
}
|
||||
|
||||
@ -183,7 +189,12 @@ std::string ClickHouseDictionarySource::toString() const
|
||||
BlockInputStreamPtr ClickHouseDictionarySource::createStreamForSelectiveLoad(const std::string & query)
|
||||
{
|
||||
if (is_local)
|
||||
return executeQuery(query, context, true).in;
|
||||
{
|
||||
auto res = executeQuery(query, context, true);
|
||||
res.in = std::make_shared<ConvertingBlockInputStream>(
|
||||
context, res.in, sample_block, ConvertingBlockInputStream::MatchColumnsMode::Position);
|
||||
return res.in;
|
||||
}
|
||||
|
||||
return std::make_shared<RemoteBlockInputStream>(pool, query, sample_block, context);
|
||||
}
|
||||
|
@ -0,0 +1,2 @@
|
||||
First WINDOWS 1
|
||||
Second LINUX 2
|
@ -0,0 +1,27 @@
|
||||
DROP DATABASE IF EXISTS database_for_dict;
|
||||
|
||||
CREATE DATABASE database_for_dict;
|
||||
|
||||
CREATE TABLE database_for_dict.table_for_dict (
|
||||
CompanyID String,
|
||||
OSType Enum('UNKNOWN' = 0, 'WINDOWS' = 1, 'LINUX' = 2, 'ANDROID' = 3, 'MAC' = 4),
|
||||
SomeID Int32
|
||||
)
|
||||
ENGINE = Memory();
|
||||
|
||||
INSERT INTO database_for_dict.table_for_dict VALUES ('First', 'WINDOWS', 1), ('Second', 'LINUX', 2);
|
||||
|
||||
CREATE DICTIONARY database_for_dict.dict_with_conversion
|
||||
(
|
||||
CompanyID String DEFAULT '',
|
||||
OSType String DEFAULT '',
|
||||
SomeID Int32 DEFAULT 0
|
||||
)
|
||||
PRIMARY KEY CompanyID
|
||||
SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER 'default' TABLE 'table_for_dict' DB 'database_for_dict'))
|
||||
LIFETIME(MIN 1 MAX 20)
|
||||
LAYOUT(COMPLEX_KEY_HASHED());
|
||||
|
||||
SELECT * FROM database_for_dict.dict_with_conversion ORDER BY CompanyID;
|
||||
|
||||
DROP DATABASE IF EXISTS database_for_dict;
|
Loading…
Reference in New Issue
Block a user