Merge branch 'master' of github.com:yandex/ClickHouse

Alexey Milovidov, 2018-11-28 17:50:17 +03:00
commit 8c1c024472
4 changed files with 49 additions and 29 deletions

@@ -23,6 +23,7 @@
 #include <Parsers/ASTExpressionList.h>
 #include <Parsers/ASTTablesInSelectQuery.h>
 #include <Parsers/ASTDropQuery.h>
+#include <Parsers/ASTIdentifier.h>
 #include <Interpreters/InterpreterSelectQuery.h>
 #include <Interpreters/InterpreterAlterQuery.h>
@@ -75,25 +76,22 @@ ASTPtr rewriteSelectQuery(const ASTPtr & query, const std::string & database, co
     return modified_query_ast;
 }
-/// insert query has database and table names as bare strings
-/// If the query is null, it creates a insert query with the database and tables
-/// Or it creates a copy of query, changes the database and table names.
-ASTPtr rewriteInsertQuery(const ASTPtr & query, const std::string & database, const std::string & table)
+/// The columns list in the original INSERT query is incorrect because inserted blocks are transformed
+/// to the form of the sample block of the Distributed table. So we rewrite it and add all columns from
+/// the sample block instead.
+ASTPtr createInsertToRemoteTableQuery(const std::string & database, const std::string & table, const Block & sample_block)
 {
-    ASTPtr modified_query_ast = nullptr;
-    if (query == nullptr)
-        modified_query_ast = std::make_shared<ASTInsertQuery>();
-    else
-        modified_query_ast = query->clone();
+    auto query = std::make_shared<ASTInsertQuery>();
+    query->database = database;
+    query->table = table;
-    auto & actual_query = typeid_cast<ASTInsertQuery &>(*modified_query_ast);
-    actual_query.database = database;
-    actual_query.table = table;
-    actual_query.table_function = nullptr;
-    /// make sure query is not INSERT SELECT
-    actual_query.select = nullptr;
+    auto columns = std::make_shared<ASTExpressionList>();
+    query->columns = columns;
+    query->children.push_back(columns);
+    for (const auto & col : sample_block)
+        columns->children.push_back(std::make_shared<ASTIdentifier>(col.name));
-    return modified_query_ast;
+    return query;
 }
 /// Calculate maximum number in file names in directory and all subdirectories.
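In effect, every INSERT forwarded to a shard now names all columns of the Distributed table's sample block, no matter how the original INSERT was written. A minimal Python sketch of the resulting query text (illustration only: the real code above builds an ASTInsertQuery rather than a string, and the 'default' database plus the local_test column names are assumed here purely for the example):

def insert_to_remote_table_query(database, table, sample_block_columns):
    # Mirror of the idea above: always list every column of the sample block.
    column_list = ', '.join(sample_block_columns)
    return 'INSERT INTO {}.{} ({}) VALUES'.format(database, table, column_list)

# Hypothetical sample block matching the local_test table from the test further down.
print(insert_to_remote_table_query('default', 'local_test', ['t', 'date', 'shard', 'col1', 'col2']))
# INSERT INTO default.local_test (t, date, shard, col1, col2) VALUES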
@@ -274,7 +272,7 @@ BlockInputStreams StorageDistributed::read(
 }
-BlockOutputStreamPtr StorageDistributed::write(const ASTPtr & query, const Settings & settings)
+BlockOutputStreamPtr StorageDistributed::write(const ASTPtr &, const Settings & settings)
 {
     auto cluster = getCluster();
@@ -298,7 +296,7 @@ BlockOutputStreamPtr StorageDistributed::write(const ASTPtr & query, const Setti
     /// DistributedBlockOutputStream will not own cluster, but will own ConnectionPools of the cluster
     return std::make_shared<DistributedBlockOutputStream>(
-        *this, rewriteInsertQuery(query, remote_database, remote_table), cluster, settings, insert_sync, timeout);
+        *this, createInsertToRemoteTableQuery(remote_database, remote_table, getSampleBlock()), cluster, settings, insert_sync, timeout);
 }

@@ -6,8 +6,8 @@ from helpers.cluster import ClickHouseCluster
 cluster = ClickHouseCluster(__file__)
-node1 = cluster.add_instance('node1', main_configs=['configs/remote_servers.xml'], with_zookeeper=True)
-node2 = cluster.add_instance('node2', main_configs=['configs/remote_servers.xml'], with_zookeeper=True)
+node1 = cluster.add_instance('node1', main_configs=['configs/remote_servers.xml'])
+node2 = cluster.add_instance('node2', main_configs=['configs/remote_servers.xml'])
 #test reproducing issue https://github.com/yandex/ClickHouse/issues/3162
 @pytest.fixture(scope="module")
@@ -19,7 +19,7 @@ def started_cluster():
         node.query('''
 CREATE TABLE local_test (
     t UInt64,
-    date Date MATERIALIZED toDate(t/1000),
+    date Date DEFAULT toDate(t/1000),
     shard UInt64,
     col1 String,
     col2 String
@@ -45,6 +45,6 @@ CREATE TABLE dist_test (
     cluster.shutdown()
 def test(started_cluster):
-    node1.query("INSERT INTO dist_test (t, shard, col1, col2) VALUES (1000, 1, 'foo', 'bar'), (1000, 2, 'x', 'y')")
-    #time.sleep(3)
+    node1.query("INSERT INTO local_test (t, shard, col1, col2) VALUES (1000, 0, 'x', 'y')")
+    node2.query("INSERT INTO local_test (t, shard, col1, col2) VALUES (1000, 1, 'foo', 'bar')")
     assert node1.query("SELECT col1, col2 FROM dist_test WHERE (t < 3600000) AND (col1 = 'foo') ORDER BY t ASC") == "foo\tbar\n"

@@ -120,16 +120,16 @@ def test_inserts_batching(started_cluster):
     # Batches of max 3 rows are formed as min_insert_block_size_rows = 3.
     # Blocks:
     # 1. Failed batch that is retried with the same contents.
-    # 2. Full batch of inserts with (d, x) order of columns.
-    # 3. Full batch of inserts with (x, d) order of columns.
+    # 2. Full batch of inserts before ALTER.
+    # 3. Full batch of inserts before ALTER.
     # 4. Full batch of inserts after ALTER (that have different block structure).
-    # 5. What was left to insert with (d, x) order before ALTER.
+    # 5. What was left to insert with the column structure before ALTER.
     expected = '''\
 20000101_20000101_1_1_0\t[1]
-20000101_20000101_2_2_0\t[3,4,5]
-20000101_20000101_3_3_0\t[2,7,8]
+20000101_20000101_2_2_0\t[2,3,4]
+20000101_20000101_3_3_0\t[5,6,7]
 20000101_20000101_4_4_0\t[10,11,12]
-20000101_20000101_5_5_0\t[6,9]
+20000101_20000101_5_5_0\t[8,9]
 '''
     assert TSV(result) == TSV(expected)
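A toy model of the batching rule described in the comments above (not the actual ClickHouse batching code; it assumes that inserts are merged only when their block structure matches, that a batch is flushed as soon as it reaches min_insert_block_size_rows = 3, and that partial batches are flushed at the end; the row numbers are taken from the expected parts above):

def batch_pending_inserts(pending, max_rows=3):
    # pending is a list of (structure, row) pairs in arrival order.
    batches = []        # flushed batches, in flush order
    open_batches = {}   # structure -> rows accumulated so far
    for structure, row in pending:
        rows = open_batches.setdefault(structure, [])
        rows.append(row)
        if len(rows) >= max_rows:
            # A full batch is flushed immediately.
            batches.append((structure, rows))
            open_batches[structure] = []
    for structure, rows in open_batches.items():
        if rows:
            # Whatever is left over is flushed at the end.
            batches.append((structure, rows))
    return batches

# Rows 2..9 share the pre-ALTER structure, rows 10..12 the post-ALTER one.
pending = [('pre_alter', r) for r in range(2, 10)] + [('post_alter', r) for r in range(10, 13)]
print(batch_pending_inserts(pending))
# [('pre_alter', [2, 3, 4]), ('pre_alter', [5, 6, 7]), ('post_alter', [10, 11, 12]), ('pre_alter', [8, 9])]

This reproduces parts 2 to 5 of the expected output; part 1 ([1]) is the retried failed batch, which the sketch does not model.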

@@ -49,6 +49,28 @@ def test_insertion_sync(started_cluster):
     assert node2.query("SELECT count() FROM local_table").rstrip() == '20000'
+    # Insert with explicitly specified columns.
+    node1.query('''
+SET insert_distributed_sync = 1, insert_distributed_timeout = 1;
+INSERT INTO distributed_table(date, val) VALUES ('2000-01-01', 100500)''')
+
+    # Insert with columns specified in different order.
+    node1.query('''
+SET insert_distributed_sync = 1, insert_distributed_timeout = 1;
+INSERT INTO distributed_table(val, date) VALUES (100500, '2000-01-01')''')
+
+    # Insert with an incomplete list of columns.
+    node1.query('''
+SET insert_distributed_sync = 1, insert_distributed_timeout = 1;
+INSERT INTO distributed_table(val) VALUES (100500)''')
+
+    expected = TSV('''
+0000-00-00	100500
+2000-01-01	100500
+2000-01-01	100500''')
+    assert TSV(node2.query('SELECT date, val FROM local_table WHERE val = 100500 ORDER BY date')) == expected
 """
 def test_insertion_sync_fails_on_error(started_cluster):
     with PartitionManager() as pm: