blob: 5925bfbc941d82c543ce2443b132f102825013e9 [file] [log] [blame]
// SPDX-License-Identifier: LGPL-2.1-or-later
// Copyright (c) 2012-2014 Monty Program Ab
// Copyright (c) 2015-2021 MariaDB Corporation Ab
package org.mariadb.jdbc.integration;
import static org.junit.jupiter.api.Assertions.*;
import java.io.ByteArrayInputStream;
import java.sql.*;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Test;
import org.mariadb.jdbc.Connection;
import org.mariadb.jdbc.Statement;
import org.mariadb.jdbc.export.HaMode;
public class FailoverTest extends Common {
@Test
public void simpleFailoverTransactionReplay() throws SQLException {
Assumptions.assumeTrue(
!"skysql".equals(System.getenv("srv")) && !"skysql-ha".equals(System.getenv("srv")));
try (Connection con = createProxyCon(HaMode.SEQUENTIAL, "")) {
con.setNetworkTimeout(Runnable::run, 200);
long threadId = con.getContext().getThreadId();
Statement stmt = con.createStatement();
proxy.restart(200);
assertThrowsContains(
SQLTransientConnectionException.class,
() -> stmt.execute("SELECT 1"),
"Driver has reconnect connection after a communications link failure");
;
Assertions.assertTrue(con.getContext().getThreadId() != threadId);
}
}
@Test
public void simpleFailoverTransactionReplayNoQuery() throws SQLException {
Assumptions.assumeTrue(
!"skysql".equals(System.getenv("srv")) && !"skysql-ha".equals(System.getenv("srv")));
try (Connection con = createProxyCon(HaMode.SEQUENTIAL, "")) {
con.setNetworkTimeout(Runnable::run, 200);
long threadId = con.getContext().getThreadId();
Statement stmt = con.createStatement();
proxy.restart(200);
con.isValid(1000);
Assertions.assertTrue(con.getContext().getThreadId() != threadId);
}
}
@Test
public void transactionReplay() throws SQLException {
transactionReplay(true);
transactionReplay(false);
}
private void transactionReplay(boolean transactionReplay) throws SQLException {
Assumptions.assumeTrue(
!"skysql".equals(System.getenv("srv")) && !"skysql-ha".equals(System.getenv("srv")));
Statement st = sharedConn.createStatement();
st.execute("DROP TABLE IF EXISTS transaction_failover");
st.execute(
"CREATE TABLE transaction_failover "
+ "(id int not null primary key auto_increment, test varchar(20)) "
+ "engine=innodb");
try (Connection con =
createProxyCon(HaMode.SEQUENTIAL, "&transactionReplay=" + transactionReplay)) {
assertEquals(Connection.TRANSACTION_REPEATABLE_READ, con.getTransactionIsolation());
con.setTransactionIsolation(Connection.TRANSACTION_READ_UNCOMMITTED);
final Statement stmt = con.createStatement();
con.setNetworkTimeout(Runnable::run, 200);
long threadId = con.getContext().getThreadId();
stmt.executeUpdate("INSERT INTO transaction_failover (test) VALUES ('test0')");
con.setAutoCommit(false);
stmt.executeUpdate("INSERT INTO transaction_failover (test) VALUES ('test1')");
stmt.executeUpdate("INSERT INTO transaction_failover (test) VALUES ('test2')");
proxy.restart(300);
if (transactionReplay) {
stmt.executeUpdate("INSERT INTO transaction_failover (test) VALUES ('test3')");
con.commit();
ResultSet rs = stmt.executeQuery("SELECT * FROM transaction_failover");
for (int i = 0; i < 4; i++) {
assertTrue(rs.next());
assertEquals("test" + i, rs.getString("test"));
}
con.commit();
Assertions.assertTrue(con.getContext().getThreadId() != threadId);
assertFalse(con.getAutoCommit());
assertEquals(Connection.TRANSACTION_READ_UNCOMMITTED, con.getTransactionIsolation());
} else {
Common.assertThrowsContains(
SQLTransientConnectionException.class,
() -> stmt.executeUpdate("INSERT INTO transaction_failover (test) VALUES ('test3')"),
"In progress transaction was lost");
}
}
}
@Test
public void transactionReplayDuringCommit() throws SQLException {
transactionReplayDuringCommit(true);
transactionReplayDuringCommit(false);
}
private void transactionReplayDuringCommit(boolean transactionReplay) throws SQLException {
Assumptions.assumeTrue(
!"skysql".equals(System.getenv("srv")) && !"skysql-ha".equals(System.getenv("srv")));
Statement st = sharedConn.createStatement();
st.execute("DROP TABLE IF EXISTS transaction_failover");
st.execute(
"CREATE TABLE transaction_failover "
+ "(id int not null primary key auto_increment, test varchar(20)) "
+ "engine=innodb");
try (Connection con =
createProxyCon(HaMode.SEQUENTIAL, "&transactionReplay=" + transactionReplay)) {
assertEquals(Connection.TRANSACTION_REPEATABLE_READ, con.getTransactionIsolation());
con.setTransactionIsolation(Connection.TRANSACTION_READ_UNCOMMITTED);
final Statement stmt = con.createStatement();
con.setNetworkTimeout(Runnable::run, 200);
long threadId = con.getContext().getThreadId();
stmt.executeUpdate("INSERT INTO transaction_failover (test) VALUES ('test0')");
con.setAutoCommit(false);
stmt.executeUpdate("INSERT INTO transaction_failover (test) VALUES ('test1')");
stmt.executeUpdate("INSERT INTO transaction_failover (test) VALUES ('test2')");
proxy.restart(300);
if (transactionReplay) {
Common.assertThrowsContains(
SQLTransientConnectionException.class,
con::commit,
"Driver has reconnect connection after a communications failure");
ResultSet rs = stmt.executeQuery("SELECT * FROM transaction_failover");
for (int i = 0; i < 1; i++) {
assertTrue(rs.next());
assertEquals("test" + i, rs.getString("test"));
}
Assertions.assertTrue(con.getContext().getThreadId() != threadId);
assertFalse(con.getAutoCommit());
assertEquals(Connection.TRANSACTION_READ_UNCOMMITTED, con.getTransactionIsolation());
} else {
Common.assertThrowsContains(
SQLTransientConnectionException.class, con::commit, "during a COMMIT statement");
}
}
}
@Test
public void transactionReplayPreparedStatement() throws Exception {
Assumptions.assumeTrue(
!"skysql".equals(System.getenv("srv")) && !"skysql-ha".equals(System.getenv("srv")));
transactionReplayPreparedStatement(true, true);
transactionReplayPreparedStatement(false, true);
transactionReplayPreparedStatement(true, false);
transactionReplayPreparedStatement(false, false);
}
private void transactionReplayPreparedStatement(boolean binary, boolean transactionReplay)
throws SQLException {
Statement stmt = sharedConn.createStatement();
stmt.execute("DROP TABLE IF EXISTS transaction_failover_3");
stmt.execute(
"CREATE TABLE transaction_failover_3 "
+ "(id int not null primary key auto_increment, test varchar(20)) "
+ "engine=innodb");
try (Connection con =
createProxyCon(
HaMode.SEQUENTIAL,
"&useServerPrepStmts=" + binary + "&transactionReplay=" + transactionReplay)) {
stmt = con.createStatement();
con.setNetworkTimeout(Runnable::run, 200);
long threadId = con.getContext().getThreadId();
stmt.executeUpdate("INSERT INTO transaction_failover_3 (test) VALUES ('test0')");
con.setAutoCommit(false);
stmt.executeUpdate("INSERT INTO transaction_failover_3 (test) VALUES ('test1')");
try (PreparedStatement p =
con.prepareStatement("INSERT INTO transaction_failover_3 (test) VALUES (?)")) {
p.setString(1, "test2");
p.execute();
p.setAsciiStream(1, new ByteArrayInputStream("test3".getBytes()));
p.execute();
proxy.restart(300);
p.setString(1, "test4");
if (transactionReplay) {
p.execute();
} else {
Common.assertThrowsContains(
SQLTransientConnectionException.class,
p::execute,
"In progress transaction was lost");
}
}
if (transactionReplay) {
con.commit();
ResultSet rs = stmt.executeQuery("SELECT * FROM transaction_failover_3");
for (int i = 0; i < 5; i++) {
assertTrue(rs.next());
assertEquals("test" + i, rs.getString("test"));
}
con.commit();
Assertions.assertTrue(con.getContext().getThreadId() != threadId);
assertFalse(con.getAutoCommit());
}
}
}
@Test
public void transactionReplayPreparedStatementBatch() throws Exception {
Assumptions.assumeTrue(
!"maxscale".equals(System.getenv("srv"))
&& !"skysql".equals(System.getenv("srv"))
&& !"skysql-ha".equals(System.getenv("srv")));
for (int i = 0; i < 8; i++) {
transactionReplayPreparedStatementBatch((i & 1) > 0, (i & 2) > 0, (i & 4) > 0);
}
}
private void transactionReplayPreparedStatementBatch(
boolean text, boolean useBulk, boolean transactionReplay) throws SQLException {
Statement stmt = sharedConn.createStatement();
stmt.execute("DROP TABLE IF EXISTS transaction_failover_2");
stmt.execute(
"CREATE TABLE transaction_failover_2 "
+ "(id int not null primary key auto_increment, test varchar(20)) "
+ "engine=innodb");
try (Connection con =
createProxyCon(
HaMode.SEQUENTIAL,
"&useServerPrepStmts="
+ !text
+ "&useBulkStmts="
+ useBulk
+ "&transactionReplay="
+ transactionReplay)) {
con.setNetworkTimeout(Runnable::run, 500);
long threadId = con.getContext().getThreadId();
execute(con, transactionReplay, threadId);
threadId = con.getContext().getThreadId();
execute(con, transactionReplay, threadId);
}
}
private void execute(Connection con, boolean transactionReplay, long threadId)
throws SQLException {
Statement stmt = con.createStatement();
stmt.executeUpdate("INSERT INTO transaction_failover_2 (test) VALUES ('test0')");
con.setAutoCommit(false);
stmt.executeUpdate("INSERT INTO transaction_failover_2 (test) VALUES ('test1')");
try (PreparedStatement p =
con.prepareStatement("INSERT INTO transaction_failover_2 (test) VALUES (?)")) {
p.setString(1, "test2");
p.execute();
p.setString(1, "test3");
p.addBatch();
p.setString(1, "test4");
p.addBatch();
p.executeBatch();
proxy.restart(300);
p.setString(1, "test5");
p.addBatch();
p.setString(1, "test6");
p.addBatch();
if (transactionReplay) {
p.executeBatch();
con.commit();
ResultSet rs = stmt.executeQuery("SELECT * FROM transaction_failover_2");
for (int i = 0; i < 6; i++) {
assertTrue(rs.next());
assertEquals("test" + i, rs.getString("test"));
}
con.commit();
Assertions.assertTrue(con.getContext().getThreadId() != threadId);
assertFalse(con.getAutoCommit());
} else {
try {
p.executeBatch();
Assertions.fail();
} catch (SQLException e) {
SQLException ee = (SQLException) ((e instanceof BatchUpdateException) ? e.getCause() : e);
assertEquals(ee.getMessage(), e.getMessage());
assertEquals(ee.getSQLState(), e.getSQLState());
assertEquals(ee.getErrorCode(), e.getErrorCode());
assertTrue(ee.getMessage().contains("In progress transaction was lost"));
}
}
}
stmt.execute("TRUNCATE transaction_failover_2");
stmt.executeUpdate("INSERT INTO transaction_failover_2 (test) VALUES ('test0')");
con.setAutoCommit(false);
stmt.executeUpdate("INSERT INTO transaction_failover_2 (test) VALUES ('test1')");
try (PreparedStatement p =
con.prepareStatement("INSERT INTO transaction_failover_2 (test) VALUES (?)")) {
proxy.restart(300);
p.setString(1, "test2");
p.addBatch();
p.setString(1, "test3");
p.addBatch();
p.setString(1, "test4");
p.addBatch();
p.setString(1, "test5");
p.addBatch();
if (transactionReplay) {
p.executeBatch();
con.commit();
ResultSet rs = stmt.executeQuery("SELECT * FROM transaction_failover_2");
for (int i = 0; i < 5; i++) {
assertTrue(rs.next());
assertEquals("test" + i, rs.getString("test"));
}
con.commit();
Assertions.assertTrue(con.getContext().getThreadId() != threadId);
assertFalse(con.getAutoCommit());
} else {
try {
p.executeBatch();
Assertions.fail();
} catch (SQLException e) {
Throwable ee = (e instanceof BatchUpdateException) ? e.getCause() : e;
assertTrue(ee.getMessage().contains("In progress transaction was lost"));
}
}
}
}
}