Add _shard_num virtual column for the Distributed engine

With JOIN from system.clusters one can figure out from which server data
came.

TODO:
- optimization to avoid communicating with unrelated shards (for queries
  like "AND _shard_num = n")
- fix aliases (see tests with serverError expected)

v0: AddingConstColumnBlockInputStream
v2: VirtualColumnUtils::rewriteEntityInAst
v3: fix remote(Distributed) by appending _shard_num only if has been requested
This commit is contained in:
Azat Khuzhin 2019-09-19 00:17:00 +03:00
parent 33edb0929b
commit 81aeff2d2a
6 changed files with 159 additions and 10 deletions

View File

@ -4,6 +4,7 @@
#include <DataStreams/MaterializingBlockInputStream.h>
#include <DataStreams/LazyBlockInputStream.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/VirtualColumnUtils.h>
#include <Common/Exception.h>
#include <Common/ProfileEvents.h>
#include <Common/checkStackSize.h>
@ -34,12 +35,14 @@ SelectStreamFactory::SelectStreamFactory(
QueryProcessingStage::Enum processed_stage_,
QualifiedTableName main_table_,
const Scalars & scalars_,
bool has_virtual_shard_num_column_,
const Tables & external_tables_)
: header(header_),
processed_stage{processed_stage_},
main_table(std::move(main_table_)),
table_func_ptr{nullptr},
scalars{scalars_},
has_virtual_shard_num_column(has_virtual_shard_num_column_),
external_tables{external_tables_}
{
}
@ -49,11 +52,13 @@ SelectStreamFactory::SelectStreamFactory(
QueryProcessingStage::Enum processed_stage_,
ASTPtr table_func_ptr_,
const Scalars & scalars_,
bool has_virtual_shard_num_column_,
const Tables & external_tables_)
: header(header_),
processed_stage{processed_stage_},
table_func_ptr{table_func_ptr_},
scalars{scalars_},
has_virtual_shard_num_column(has_virtual_shard_num_column_),
external_tables{external_tables_}
{
}
@ -81,23 +86,38 @@ BlockInputStreamPtr createLocalStream(const ASTPtr & query_ast, const Context &
return stream;
}
static String formattedAST(const ASTPtr & ast)
{
if (!ast)
return "";
std::stringstream ss;
formatAST(*ast, ss, false, true);
return ss.str();
}
}
void SelectStreamFactory::createForShard(
const Cluster::ShardInfo & shard_info,
const String & query, const ASTPtr & query_ast,
const String &, const ASTPtr & query_ast,
const Context & context, const ThrottlerPtr & throttler,
BlockInputStreams & res)
{
auto modified_query_ast = query_ast->clone();
if (has_virtual_shard_num_column)
VirtualColumnUtils::rewriteEntityInAst(modified_query_ast, "_shard_num", shard_info.shard_num);
auto emplace_local_stream = [&]()
{
res.emplace_back(createLocalStream(query_ast, context, processed_stage));
res.emplace_back(createLocalStream(modified_query_ast, context, processed_stage));
};
String modified_query = formattedAST(modified_query_ast);
auto emplace_remote_stream = [&]()
{
auto stream = std::make_shared<RemoteBlockInputStream>(
shard_info.pool, query, header, context, nullptr, throttler, scalars, external_tables, processed_stage);
shard_info.pool, modified_query, header, context, nullptr, throttler, scalars, external_tables, processed_stage);
stream->setPoolMode(PoolMode::GET_MANY);
if (!table_func_ptr)
stream->setMainTable(main_table);
@ -194,7 +214,7 @@ void SelectStreamFactory::createForShard(
/// Do it lazily to avoid connecting in the main thread.
auto lazily_create_stream = [
pool = shard_info.pool, shard_num = shard_info.shard_num, query, header = header, query_ast, context, throttler,
pool = shard_info.pool, shard_num = shard_info.shard_num, modified_query, header = header, modified_query_ast, context, throttler,
main_table = main_table, table_func_ptr = table_func_ptr, scalars = scalars, external_tables = external_tables,
stage = processed_stage, local_delay]()
-> BlockInputStreamPtr
@ -229,7 +249,7 @@ void SelectStreamFactory::createForShard(
}
if (try_results.empty() || local_delay < max_remote_delay)
return createLocalStream(query_ast, context, stage);
return createLocalStream(modified_query_ast, context, stage);
else
{
std::vector<IConnectionPool::Entry> connections;
@ -238,7 +258,7 @@ void SelectStreamFactory::createForShard(
connections.emplace_back(std::move(try_result.entry));
return std::make_shared<RemoteBlockInputStream>(
std::move(connections), query, header, context, nullptr, throttler, scalars, external_tables, stage);
std::move(connections), modified_query, header, context, nullptr, throttler, scalars, external_tables, stage);
}
};

View File

@ -19,6 +19,7 @@ public:
QueryProcessingStage::Enum processed_stage_,
QualifiedTableName main_table_,
const Scalars & scalars_,
bool has_virtual_shard_num_column_,
const Tables & external_tables);
/// TableFunction in a query.
@ -27,6 +28,7 @@ public:
QueryProcessingStage::Enum processed_stage_,
ASTPtr table_func_ptr_,
const Scalars & scalars_,
bool has_virtual_shard_num_column_,
const Tables & external_tables_);
void createForShard(
@ -41,6 +43,7 @@ private:
QualifiedTableName main_table;
ASTPtr table_func_ptr;
Scalars scalars;
bool has_virtual_shard_num_column = false;
Tables external_tables;
};

View File

@ -216,7 +216,10 @@ StorageDistributed::StorageDistributed(
const ASTPtr & sharding_key_,
const String & data_path_,
bool attach_)
: table_name(table_name_), database_name(database_name_),
: IStorage(ColumnsDescription({
{"_shard_num", std::make_shared<DataTypeUInt32>()},
}, true)),
table_name(table_name_), database_name(database_name_),
remote_database(remote_database_), remote_table(remote_table_),
global_context(context_), cluster_name(global_context.getMacros()->expand(cluster_name_)), has_sharding_key(sharding_key_),
path(data_path_.empty() ? "" : (data_path_ + escapeForFileName(table_name) + '/'))
@ -305,7 +308,7 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(const Con
}
BlockInputStreams StorageDistributed::read(
const Names & /*column_names*/,
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum processed_stage,
@ -324,11 +327,15 @@ BlockInputStreams StorageDistributed::read(
const Scalars & scalars = context.hasQueryContext() ? context.getQueryContext().getScalars() : Scalars{};
bool has_virtual_shard_num_column = std::find(column_names.begin(), column_names.end(), "_shard_num") != column_names.end();
if (has_virtual_shard_num_column && !isVirtualColumn("_shard_num"))
has_virtual_shard_num_column = false;
ClusterProxy::SelectStreamFactory select_stream_factory = remote_table_function_ptr
? ClusterProxy::SelectStreamFactory(
header, processed_stage, remote_table_function_ptr, scalars, context.getExternalTables())
header, processed_stage, remote_table_function_ptr, scalars, has_virtual_shard_num_column, context.getExternalTables())
: ClusterProxy::SelectStreamFactory(
header, processed_stage, QualifiedTableName{remote_database, remote_table}, scalars, context.getExternalTables());
header, processed_stage, QualifiedTableName{remote_database, remote_table}, scalars, has_virtual_shard_num_column, context.getExternalTables());
if (settings.optimize_skip_unused_shards)
{

View File

@ -0,0 +1,36 @@
remote(system.one)
0
0
0
1 0
1 0
2 0
1 0
dist_1
1
1 10
10
1
1
1 10
1 20
10
20
dist_2
1
2
1 100
2 100
100
100
remote(Distributed)
1 100
1 100
JOIN system.clusters
1 10 localhost ::1 9000
1 20 localhost ::1 9000
1 10 localhost ::1 9000
1 20 localhost ::1 9000
dist_3
100 foo
foo 100 foo

View File

@ -0,0 +1,72 @@
-- make the order static
SET max_threads = 1;
-- remote(system.one)
SELECT 'remote(system.one)';
SELECT * FROM remote('127.0.0.1', system.one);
SELECT * FROM remote('127.0.0.{1,2}', system.one);
SELECT _shard_num, * FROM remote('127.0.0.1', system.one);
SELECT _shard_num, * FROM remote('127.0.0.{1,2}', system.one);
SELECT _shard_num, * FROM remote('127.0.0.{1,2}', system.one) WHERE _shard_num = 1;
-- dist_1 using test_shard_localhost
SELECT 'dist_1';
CREATE TABLE mem1 (key Int) Engine=Memory();
CREATE TABLE dist_1 AS mem1 Engine=Distributed(test_shard_localhost, currentDatabase(), mem1);
SELECT _shard_num FROM dist_1;
INSERT INTO mem1 VALUES (10);
SELECT _shard_num FROM dist_1;
SELECT _shard_num, key FROM dist_1;
SELECT key FROM dist_1;
INSERT INTO dist_1 VALUES (20);
SELECT _shard_num FROM dist_1;
SELECT _shard_num, key FROM dist_1;
SELECT key FROM dist_1;
-- dist_2 using test_cluster_two_shards_localhost
SELECT 'dist_2';
CREATE TABLE mem2 (key Int) Engine=Memory();
CREATE TABLE dist_2 AS mem2 Engine=Distributed(test_cluster_two_shards_localhost, currentDatabase(), mem2);
SELECT _shard_num FROM dist_2;
INSERT INTO mem2 VALUES (100);
SELECT _shard_num FROM dist_2;
SELECT _shard_num, key FROM dist_2;
SELECT key FROM dist_2;
-- multiple _shard_num
SELECT 'remote(Distributed)';
SELECT _shard_num, key FROM remote('127.0.0.1', currentDatabase(), dist_2);
-- JOIN system.clusters
SELECT 'JOIN system.clusters';
SELECT a._shard_num, a.key, b.host_name, b.host_address, b.port
FROM (SELECT *, _shard_num FROM dist_1) a
JOIN system.clusters b
ON a._shard_num = b.shard_num
WHERE b.cluster = 'test_cluster_two_shards_localhost';
-- Requires toUInt32() otherwise Type mismatch of columns (53)
SELECT _shard_num, key, b.host_name, b.host_address, b.port
FROM dist_1 a
JOIN system.clusters b
ON toUInt32(_shard_num) = b.shard_num
WHERE b.cluster = 'test_cluster_two_shards_localhost';
-- rewrite does not work with aliases, hence Missing columns (47)
SELECT a._shard_num, key FROM dist_1 a; -- { serverError 47; }
-- the same with JOIN, just in case
SELECT a._shard_num, a.key, b.host_name, b.host_address, b.port
FROM dist_1 a
JOIN system.clusters b
ON a._shard_num = b.shard_num
WHERE b.cluster = 'test_cluster_two_shards_localhost'; -- { serverError 47; }
SELECT 'dist_3';
CREATE TABLE mem3 (key Int, _shard_num String) Engine=Memory();
CREATE TABLE dist_3 AS mem3 Engine=Distributed(test_shard_localhost, currentDatabase(), mem3);
INSERT INTO mem3 VALUES (100, 'foo');
SELECT * FROM dist_3;
SELECT _shard_num, * FROM dist_3;

View File

@ -121,5 +121,16 @@ If the server ceased to exist or had a rough restart (for example, after a devic
When the max_parallel_replicas option is enabled, query processing is parallelized across all replicas within a single shard. For more information, see the section [max_parallel_replicas](../settings/settings.md#settings-max_parallel_replicas).
## Virtual Columns
- `_shard_num` — Contains the `shard_num` (from `system.clusters`). Type: [UInt32](../../data_types/int_uint.md).
!!! note "Note"
Since [`remote`](../../query_language/table_functions/remote.md)/`cluster` table functions internally create temporary instance of the same Distributed engine, `_shard_num` is available there too.
**See Also**
- [Virtual columns](index.md#table_engines-virtual_columns)
[Original article](https://clickhouse.yandex/docs/en/operations/table_engines/distributed/) <!--hide-->