Add one more test for db replicated and fix related bug

This commit is contained in:
Val 2020-05-27 21:33:37 +03:00
parent 0e9f516738
commit a0af67b636
2 changed files with 40 additions and 20 deletions

View File

@ -201,6 +201,13 @@ void DatabaseReplicated::propose(const ASTPtr & query) {
current_zookeeper = getZooKeeper();
auto lock = createSimpleZooKeeperLock(current_zookeeper, zookeeper_path, "propose_lock", replica_name);
while (!lock->tryLock()) {
// TODO it seems that zk lock doesn't work at all
// need to find a different solution for proposal
pcg64 rng(randomSeed());
std::this_thread::sleep_for(std::chrono::milliseconds(std::uniform_int_distribution<int>(0, 1000)(rng)));
}
// schedule and deactive combo
// ensures that replica is up to date
// and since propose lock is acquired,
@ -224,6 +231,7 @@ void DatabaseReplicated::propose(const ASTPtr & query) {
lock->unlock();
saveState();
background_log_executor->activateAndSchedule();
}
void DatabaseReplicated::updateSnapshot() {

View File

@ -33,38 +33,50 @@ def test_create_replicated_table(started_cluster):
node1.query("CREATE TABLE testdb.replicated_table (d Date, k UInt64, i32 Int32) ENGINE=ReplicatedMergeTree(d, k, 8192);")
time.sleep(DURATION_SECONDS)
logging.info(node2.query("desc table testdb.replicated_table"))
assert node1.query("desc table testdb.replicated_table") == node2.query("desc table testdb.replicated_table")
def test_alter_table(started_cluster):
def test_simple_alter_table(started_cluster):
DURATION_SECONDS = 1
node1.query("CREATE TABLE testdb.alter_test (CounterID UInt32, StartDate Date, UserID UInt32, VisitID UInt32, NestedColumn Nested(A UInt8, S String), ToDrop UInt32) ENGINE = MergeTree(StartDate, intHash32(UserID), (CounterID, StartDate, intHash32(UserID), VisitID), 8192);\
ALTER TABLE testdb.alter_test ADD COLUMN Added0 UInt32;\
ALTER TABLE testdb.alter_test ADD COLUMN Added2 UInt32;\
ALTER TABLE testdb.alter_test ADD COLUMN Added1 UInt32 AFTER Added0;\
ALTER TABLE testdb.alter_test ADD COLUMN AddedNested1 Nested(A UInt32, B UInt64) AFTER Added2;\
ALTER TABLE testdb.alter_test ADD COLUMN AddedNested1.C Array(String) AFTER AddedNested1.B;\
ALTER TABLE testdb.alter_test ADD COLUMN AddedNested2 Nested(A UInt32, B UInt64) AFTER AddedNested1;")
node1.query("CREATE TABLE testdb.alter_test (CounterID UInt32, StartDate Date, UserID UInt32, VisitID UInt32, NestedColumn Nested(A UInt8, S String), ToDrop UInt32) ENGINE = MergeTree(StartDate, intHash32(UserID), (CounterID, StartDate, intHash32(UserID), VisitID), 8192);")
node1.query("ALTER TABLE testdb.alter_test ADD COLUMN Added0 UInt32;")
node1.query("ALTER TABLE testdb.alter_test ADD COLUMN Added2 UInt32;")
node1.query("ALTER TABLE testdb.alter_test ADD COLUMN Added1 UInt32 AFTER Added0;")
node1.query("ALTER TABLE testdb.alter_test ADD COLUMN AddedNested1 Nested(A UInt32, B UInt64) AFTER Added2;")
node1.query("ALTER TABLE testdb.alter_test ADD COLUMN AddedNested1.C Array(String) AFTER AddedNested1.B;")
node1.query("ALTER TABLE testdb.alter_test ADD COLUMN AddedNested2 Nested(A UInt32, B UInt64) AFTER AddedNested1;")
time.sleep(DURATION_SECONDS)
assert node1.query("desc table testdb.alter_test") == node2.query("desc table testdb.alter_test")
def test_create_replica_from_snapshot(started_cluster):
def test_create_replica_after_delay(started_cluster):
DURATION_SECONDS = 3
"""
right now snapshot's created every 6 proposes.
later on it must be configurable
for now let's check snapshot
by creating a new node just after 10 log entries
"""
node1.query("ALTER TABLE testdb.alter_test ADD COLUMN Added3 UInt32 ;") #9
node1.query("ALTER TABLE testdb.alter_test ADD COLUMN Added4 UInt32 ;") #10
node1.query("ALTER TABLE testdb.alter_test ADD COLUMN Added5 UInt32 ;") #1
# by this moment snapshot must be created
node3.query("CREATE DATABASE testdb ENGINE = Replicated('/clickhouse/databases/test1', '{replica}');")
node1.query("ALTER TABLE testdb.alter_test ADD COLUMN Added3 UInt32 ;")
node1.query("ALTER TABLE testdb.alter_test ADD COLUMN Added4 UInt32 ;")
node1.query("ALTER TABLE testdb.alter_test ADD COLUMN Added5 UInt32 ;")
time.sleep(DURATION_SECONDS)
assert node3.query("desc table testdb.alter_test") == node1.query("desc table testdb.alter_test")
def test_alters_from_different_replicas(started_cluster):
DURATION_SECONDS = 1
node1.query("CREATE TABLE testdb.concurrent_test (CounterID UInt32, StartDate Date, UserID UInt32, VisitID UInt32, NestedColumn Nested(A UInt8, S String), ToDrop UInt32) ENGINE = MergeTree(StartDate, intHash32(UserID), (CounterID, StartDate, intHash32(UserID), VisitID), 8192);")
time.sleep(DURATION_SECONDS)
node3.query("ALTER TABLE testdb.concurrent_test ADD COLUMN Added0 UInt32;")
time.sleep(DURATION_SECONDS)
node1.query("ALTER TABLE testdb.concurrent_test ADD COLUMN Added2 UInt32;")
time.sleep(DURATION_SECONDS)
node3.query("ALTER TABLE testdb.concurrent_test ADD COLUMN Added1 UInt32 AFTER Added0;")
time.sleep(DURATION_SECONDS)
node1.query("ALTER TABLE testdb.concurrent_test ADD COLUMN AddedNested1 Nested(A UInt32, B UInt64) AFTER Added2;")
time.sleep(DURATION_SECONDS)
node3.query("ALTER TABLE testdb.concurrent_test ADD COLUMN AddedNested1.C Array(String) AFTER AddedNested1.B;")
time.sleep(DURATION_SECONDS)
node1.query("ALTER TABLE testdb.concurrent_test ADD COLUMN AddedNested2 Nested(A UInt32, B UInt64) AFTER AddedNested1;")
time.sleep(DURATION_SECONDS)
assert node3.query("desc table testdb.concurrent_test") == node1.query("desc table testdb.concurrent_test")