mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-24 00:22:29 +00:00
Some more tests
This commit is contained in:
parent
f01c8edbff
commit
4ad0f45f0e
@ -66,6 +66,6 @@ WHERE oid = 'postgres_table'::regclass;
|
|||||||
```
|
```
|
||||||
|
|
||||||
|
|
||||||
## WARNINGS {#warnings}
|
## Warning {#warning}
|
||||||
|
|
||||||
1. **TOAST** values convertions is not supported. Default value for the data type will be used.
|
1. **TOAST** values convertion is not supported. Default value for the data type will be used.
|
||||||
|
@ -41,6 +41,6 @@ SELECT key, value, _version FROM test.postgresql_replica;
|
|||||||
```
|
```
|
||||||
|
|
||||||
|
|
||||||
## WARNINGS {#warnings}
|
## Warning {#warning}
|
||||||
|
|
||||||
1. **TOAST** values convertions is not supported. Default value for the data type will be used.
|
1. **TOAST** values convertion is not supported. Default value for the data type will be used.
|
||||||
|
@ -233,7 +233,7 @@ void StorageMaterializedPostgreSQL::shutdown()
|
|||||||
|
|
||||||
void StorageMaterializedPostgreSQL::dropInnerTableIfAny(bool no_delay, ContextPtr local_context)
|
void StorageMaterializedPostgreSQL::dropInnerTableIfAny(bool no_delay, ContextPtr local_context)
|
||||||
{
|
{
|
||||||
/// If it is a table with database engine MaterializedPostgreSQL - return, becuase delition of
|
/// If it is a table with database engine MaterializedPostgreSQL - return, because delition of
|
||||||
/// internal tables is managed there.
|
/// internal tables is managed there.
|
||||||
if (is_materialized_postgresql_database)
|
if (is_materialized_postgresql_database)
|
||||||
return;
|
return;
|
||||||
|
@ -22,6 +22,7 @@
|
|||||||
#include <Processors/Transforms/FilterTransform.h>
|
#include <Processors/Transforms/FilterTransform.h>
|
||||||
|
|
||||||
#include <Databases/MySQL/DatabaseMaterializeMySQL.h>
|
#include <Databases/MySQL/DatabaseMaterializeMySQL.h>
|
||||||
|
#include <Storages/ReadFinalForExternalReplicaStorage.h>
|
||||||
#include <Storages/SelectQueryInfo.h>
|
#include <Storages/SelectQueryInfo.h>
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
@ -37,7 +38,7 @@ StorageMaterializeMySQL::StorageMaterializeMySQL(const StoragePtr & nested_stora
|
|||||||
|
|
||||||
Pipe StorageMaterializeMySQL::read(
|
Pipe StorageMaterializeMySQL::read(
|
||||||
const Names & column_names,
|
const Names & column_names,
|
||||||
const StorageMetadataPtr & /*metadata_snapshot*/,
|
const StorageMetadataPtr & metadata_snapshot,
|
||||||
SelectQueryInfo & query_info,
|
SelectQueryInfo & query_info,
|
||||||
ContextPtr context,
|
ContextPtr context,
|
||||||
QueryProcessingStage::Enum processed_stage,
|
QueryProcessingStage::Enum processed_stage,
|
||||||
@ -46,61 +47,8 @@ Pipe StorageMaterializeMySQL::read(
|
|||||||
{
|
{
|
||||||
/// If the background synchronization thread has exception.
|
/// If the background synchronization thread has exception.
|
||||||
rethrowSyncExceptionIfNeed(database);
|
rethrowSyncExceptionIfNeed(database);
|
||||||
|
return readFinalFromNestedStorage(nested_storage, column_names, metadata_snapshot,
|
||||||
NameSet column_names_set = NameSet(column_names.begin(), column_names.end());
|
query_info, context, processed_stage, max_block_size, num_streams);
|
||||||
auto lock = nested_storage->lockForShare(context->getCurrentQueryId(), context->getSettingsRef().lock_acquire_timeout);
|
|
||||||
const StorageMetadataPtr & nested_metadata = nested_storage->getInMemoryMetadataPtr();
|
|
||||||
|
|
||||||
Block nested_header = nested_metadata->getSampleBlock();
|
|
||||||
ColumnWithTypeAndName & sign_column = nested_header.getByPosition(nested_header.columns() - 2);
|
|
||||||
ColumnWithTypeAndName & version_column = nested_header.getByPosition(nested_header.columns() - 1);
|
|
||||||
|
|
||||||
if (ASTSelectQuery * select_query = query_info.query->as<ASTSelectQuery>(); select_query && !column_names_set.count(version_column.name))
|
|
||||||
{
|
|
||||||
auto & tables_in_select_query = select_query->tables()->as<ASTTablesInSelectQuery &>();
|
|
||||||
|
|
||||||
if (!tables_in_select_query.children.empty())
|
|
||||||
{
|
|
||||||
auto & tables_element = tables_in_select_query.children[0]->as<ASTTablesInSelectQueryElement &>();
|
|
||||||
|
|
||||||
if (tables_element.table_expression)
|
|
||||||
tables_element.table_expression->as<ASTTableExpression &>().final = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
String filter_column_name;
|
|
||||||
Names require_columns_name = column_names;
|
|
||||||
ASTPtr expressions = std::make_shared<ASTExpressionList>();
|
|
||||||
if (column_names_set.empty() || !column_names_set.count(sign_column.name))
|
|
||||||
{
|
|
||||||
require_columns_name.emplace_back(sign_column.name);
|
|
||||||
|
|
||||||
const auto & sign_column_name = std::make_shared<ASTIdentifier>(sign_column.name);
|
|
||||||
const auto & fetch_sign_value = std::make_shared<ASTLiteral>(Field(Int8(1)));
|
|
||||||
|
|
||||||
expressions->children.emplace_back(makeASTFunction("equals", sign_column_name, fetch_sign_value));
|
|
||||||
filter_column_name = expressions->children.back()->getColumnName();
|
|
||||||
|
|
||||||
for (const auto & column_name : column_names)
|
|
||||||
expressions->children.emplace_back(std::make_shared<ASTIdentifier>(column_name));
|
|
||||||
}
|
|
||||||
|
|
||||||
Pipe pipe = nested_storage->read(require_columns_name, nested_metadata, query_info, context, processed_stage, max_block_size, num_streams);
|
|
||||||
pipe.addTableLock(lock);
|
|
||||||
|
|
||||||
if (!expressions->children.empty() && !pipe.empty())
|
|
||||||
{
|
|
||||||
Block pipe_header = pipe.getHeader();
|
|
||||||
auto syntax = TreeRewriter(context).analyze(expressions, pipe_header.getNamesAndTypesList());
|
|
||||||
ExpressionActionsPtr expression_actions = ExpressionAnalyzer(expressions, syntax, context).getActions(true /* add_aliases */, false /* project_result */);
|
|
||||||
|
|
||||||
pipe.addSimpleTransform([&](const Block & header)
|
|
||||||
{
|
|
||||||
return std::make_shared<FilterTransform>(header, expression_actions, filter_column_name, false);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
return pipe;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
NamesAndTypesList StorageMaterializeMySQL::getVirtuals() const
|
NamesAndTypesList StorageMaterializeMySQL::getVirtuals() const
|
||||||
|
@ -844,22 +844,29 @@ def test_abrupt_server_restart_while_heavy_replication(started_cluster):
|
|||||||
port=started_cluster.postgres_port,
|
port=started_cluster.postgres_port,
|
||||||
database=True)
|
database=True)
|
||||||
cursor = conn.cursor()
|
cursor = conn.cursor()
|
||||||
NUM_TABLES = 2
|
NUM_TABLES = 6
|
||||||
|
|
||||||
for i in range(NUM_TABLES):
|
for i in range(NUM_TABLES):
|
||||||
create_postgres_table(cursor, 'postgresql_replica_{}'.format(i));
|
create_postgres_table(cursor, 'postgresql_replica_{}'.format(i));
|
||||||
|
|
||||||
def transaction(thread_id):
|
def transaction(thread_id):
|
||||||
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
|
if thread_id % 2:
|
||||||
port=started_cluster.postgres_port,
|
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
|
||||||
database=True, auto_commit=True)
|
port=started_cluster.postgres_port,
|
||||||
|
database=True, auto_commit=True)
|
||||||
|
else:
|
||||||
|
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
|
||||||
|
port=started_cluster.postgres_port,
|
||||||
|
database=True, auto_commit=False)
|
||||||
cursor_ = conn.cursor()
|
cursor_ = conn.cursor()
|
||||||
for query in queries:
|
for query in queries:
|
||||||
cursor_.execute(query.format(thread_id))
|
cursor_.execute(query.format(thread_id))
|
||||||
print('thread {}, query {}'.format(thread_id, query))
|
print('thread {}, query {}'.format(thread_id, query))
|
||||||
|
if thread_id % 2 == 0:
|
||||||
|
conn.commit()
|
||||||
|
|
||||||
threads = []
|
threads = []
|
||||||
threads_num = 2
|
threads_num = 6
|
||||||
for i in range(threads_num):
|
for i in range(threads_num):
|
||||||
threads.append(threading.Thread(target=transaction, args=(i,)))
|
threads.append(threading.Thread(target=transaction, args=(i,)))
|
||||||
|
|
||||||
|
@ -8,6 +8,8 @@ from helpers.test_tools import assert_eq_with_retry
|
|||||||
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
|
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
|
||||||
from helpers.test_tools import TSV
|
from helpers.test_tools import TSV
|
||||||
|
|
||||||
|
import threading
|
||||||
|
|
||||||
cluster = ClickHouseCluster(__file__)
|
cluster = ClickHouseCluster(__file__)
|
||||||
instance = cluster.add_instance('instance', main_configs=['configs/log_conf.xml'], with_postgres=True, stay_alive=True)
|
instance = cluster.add_instance('instance', main_configs=['configs/log_conf.xml'], with_postgres=True, stay_alive=True)
|
||||||
|
|
||||||
@ -16,6 +18,40 @@ postgres_table_template = """
|
|||||||
key Integer NOT NULL, value Integer, PRIMARY KEY(key))
|
key Integer NOT NULL, value Integer, PRIMARY KEY(key))
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
queries = [
|
||||||
|
'INSERT INTO postgresql_replica select i, i from generate_series(0, 10000) as t(i);',
|
||||||
|
'DELETE FROM postgresql_replica WHERE (value*value) % 3 = 0;',
|
||||||
|
'UPDATE postgresql_replica SET value = value + 125 WHERE key % 2 = 0;',
|
||||||
|
"UPDATE postgresql_replica SET key=key+20000 WHERE key%2=0",
|
||||||
|
'INSERT INTO postgresql_replica select i, i from generate_series(40000, 50000) as t(i);',
|
||||||
|
'DELETE FROM postgresql_replica WHERE key % 10 = 0;',
|
||||||
|
'UPDATE postgresql_replica SET value = value + 101 WHERE key % 2 = 1;',
|
||||||
|
"UPDATE postgresql_replica SET key=key+80000 WHERE key%2=1",
|
||||||
|
'DELETE FROM postgresql_replica WHERE value % 2 = 0;',
|
||||||
|
'UPDATE postgresql_replica SET value = value + 2000 WHERE key % 5 = 0;',
|
||||||
|
'INSERT INTO postgresql_replica select i, i from generate_series(200000, 250000) as t(i);',
|
||||||
|
'DELETE FROM postgresql_replica WHERE value % 3 = 0;',
|
||||||
|
'UPDATE postgresql_replica SET value = value * 2 WHERE key % 3 = 0;',
|
||||||
|
"UPDATE postgresql_replica SET key=key+500000 WHERE key%2=1",
|
||||||
|
'INSERT INTO postgresql_replica select i, i from generate_series(1000000, 1050000) as t(i);',
|
||||||
|
'DELETE FROM postgresql_replica WHERE value % 9 = 2;',
|
||||||
|
"UPDATE postgresql_replica SET key=key+10000000",
|
||||||
|
'UPDATE postgresql_replica SET value = value + 2 WHERE key % 3 = 1;',
|
||||||
|
'DELETE FROM postgresql_replica WHERE value%5 = 0;'
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.timeout(30)
|
||||||
|
def check_tables_are_synchronized(table_name, order_by='key', postgres_database='postgres_database'):
|
||||||
|
expected = instance.query('select * from {}.{} order by {};'.format(postgres_database, table_name, order_by))
|
||||||
|
result = instance.query('select * from test.{} order by {};'.format(table_name, order_by))
|
||||||
|
|
||||||
|
while result != expected:
|
||||||
|
time.sleep(0.5)
|
||||||
|
result = instance.query('select * from test.{} order by {};'.format(table_name, order_by))
|
||||||
|
|
||||||
|
assert(result == expected)
|
||||||
|
|
||||||
def get_postgres_conn(ip, port, database=False, auto_commit=True, database_name='postgres_database'):
|
def get_postgres_conn(ip, port, database=False, auto_commit=True, database_name='postgres_database'):
|
||||||
if database == True:
|
if database == True:
|
||||||
conn_string = "host={} port={} dbname='{}' user='postgres' password='mysecretpassword'".format(ip, port, database_name)
|
conn_string = "host={} port={} dbname='{}' user='postgres' password='mysecretpassword'".format(ip, port, database_name)
|
||||||
@ -28,7 +64,6 @@ def get_postgres_conn(ip, port, database=False, auto_commit=True, database_name=
|
|||||||
conn.autocommit = True
|
conn.autocommit = True
|
||||||
return conn
|
return conn
|
||||||
|
|
||||||
|
|
||||||
def create_postgres_db(cursor, name):
|
def create_postgres_db(cursor, name):
|
||||||
cursor.execute("CREATE DATABASE {}".format(name))
|
cursor.execute("CREATE DATABASE {}".format(name))
|
||||||
|
|
||||||
@ -487,6 +522,78 @@ def test_virtual_columns(started_cluster):
|
|||||||
cursor.execute('DROP TABLE postgresql_replica;')
|
cursor.execute('DROP TABLE postgresql_replica;')
|
||||||
|
|
||||||
|
|
||||||
|
def test_abrupt_connection_loss_while_heavy_replication(started_cluster):
|
||||||
|
instance.query("DROP DATABASE IF EXISTS test_database")
|
||||||
|
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
|
||||||
|
port=started_cluster.postgres_port,
|
||||||
|
database=True)
|
||||||
|
cursor = conn.cursor()
|
||||||
|
create_postgres_table(cursor, 'postgresql_replica');
|
||||||
|
|
||||||
|
instance.query('DROP TABLE IF EXISTS test.postgresql_replica')
|
||||||
|
create_materialized_table(ip=started_cluster.postgres_ip,
|
||||||
|
port=started_cluster.postgres_port)
|
||||||
|
|
||||||
|
for i in range(len(queries)):
|
||||||
|
query = queries[i]
|
||||||
|
cursor.execute(query)
|
||||||
|
print('query {}'.format(query))
|
||||||
|
|
||||||
|
started_cluster.pause_container('postgres1')
|
||||||
|
|
||||||
|
result = instance.query("SELECT count() FROM test.postgresql_replica")
|
||||||
|
print(result) # Just debug
|
||||||
|
|
||||||
|
started_cluster.unpause_container('postgres1')
|
||||||
|
|
||||||
|
check_tables_are_synchronized('postgresql_replica');
|
||||||
|
|
||||||
|
result = instance.query("SELECT count() FROM test.postgresql_replica")
|
||||||
|
print(result) # Just debug
|
||||||
|
|
||||||
|
|
||||||
|
def test_abrupt_server_restart_while_heavy_replication(started_cluster):
|
||||||
|
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
|
||||||
|
port=started_cluster.postgres_port,
|
||||||
|
database=True)
|
||||||
|
cursor = conn.cursor()
|
||||||
|
create_postgres_table(cursor, 'postgresql_replica');
|
||||||
|
|
||||||
|
instance.query('DROP TABLE IF EXISTS test.postgresql_replica')
|
||||||
|
create_materialized_table(ip=started_cluster.postgres_ip,
|
||||||
|
port=started_cluster.postgres_port)
|
||||||
|
|
||||||
|
for query in queries:
|
||||||
|
cursor.execute(query)
|
||||||
|
print('query {}'.format(query))
|
||||||
|
|
||||||
|
instance.restart_clickhouse()
|
||||||
|
|
||||||
|
result = instance.query("SELECT count() FROM test.postgresql_replica")
|
||||||
|
print(result) # Just debug
|
||||||
|
|
||||||
|
check_tables_are_synchronized('postgresql_replica');
|
||||||
|
|
||||||
|
result = instance.query("SELECT count() FROM test.postgresql_replica")
|
||||||
|
print(result) # Just debug
|
||||||
|
|
||||||
|
|
||||||
|
def test_drop_table_immediately(started_cluster):
|
||||||
|
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
|
||||||
|
port=started_cluster.postgres_port,
|
||||||
|
database=True)
|
||||||
|
cursor = conn.cursor()
|
||||||
|
create_postgres_table(cursor, 'postgresql_replica');
|
||||||
|
instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(100000)")
|
||||||
|
|
||||||
|
instance.query('DROP TABLE IF EXISTS test.postgresql_replica')
|
||||||
|
create_materialized_table(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port)
|
||||||
|
instance.query('DROP TABLE test.postgresql_replica')
|
||||||
|
create_materialized_table(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port)
|
||||||
|
check_tables_are_synchronized('postgresql_replica');
|
||||||
|
instance.query('DROP TABLE test.postgresql_replica')
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
cluster.start()
|
cluster.start()
|
||||||
input("Cluster created, press any key to destroy...")
|
input("Cluster created, press any key to destroy...")
|
||||||
|
Loading…
Reference in New Issue
Block a user