Alternatives to XA

  • message deduplication (e.g. Camel Idempotent Consumer)
  • Change Data Capture?

Cookbook

XA transactions with Oracle AQ and Camel on Spring Boot

Here’s how to configure a Camel JMS component wrapping Oracle AQ, to work with XA transactions, on Spring Boot:

  • First add a transaction manager, such as Narayana - easily done with the Narayana for Spring Boot starter dependency. Narayana will implement the JTA (TransactionManager) and Spring (PlatformTransactionManager) transaction manager interfaces.

  • Add a generic, auto-enlisting, pooled connection factory such as pooled-jms from messaginghub. This will enlist the Oracle XA resource in the transaction.

@Bean(name = "oracleaq")
JmsComponent oracleAQJmsComponent(org.springframework.transaction.PlatformTransactionManager transactionManager,
                                  javax.transaction.TransactionManager jtaTransactionManager)
        throws JMSException, SQLException {
    oracle.jdbc.xa.client.OracleXADataSource oracleXADataSource = new OracleXADataSource();
    oracleXADataSource.setURL("jdbc:oracle:thin:@localhost:1521:ORCLCDB");
    oracleXADataSource.setUser("scott");
    oracleXADataSource.setPassword("tiger");

    // Now we've created the XA datasource, we need something that will generate an XAConnection for us. A factory, perhaps!
    // Oracle client has a curious static method to do this.
    javax.jms.XAConnectionFactory oracleXACF = oracle.jms.AQjmsFactory.getXAConnectionFactory(oracleXADataSource);

    // Presumably this is a non-enlisting XAConnectionFactory.
    // So we need to wrap it in an enlisting connection factory?
    // Similar to: https://access.redhat.com/documentation/en-us/red_hat_fuse/7.2/html-single/apache_karaf_transaction_guide/index#manual-deployment-connection-factories
    org.messaginghub.pooled.jms.JmsPoolXAConnectionFactory pooledJmsXACF = new JmsPoolXAConnectionFactory();
    pooledJmsXACF.setConnectionFactory(oracleXACF);
    pooledJmsXACF.setTransactionManager(jtaTransactionManager); // Narayana implementing the JTA interface

    // Now create a Camel JMS component and wire in the enlisting connection factory and Narayana
    org.apache.camel.component.jms.JmsComponent jms = new JmsComponent();
    jms.setConnectionFactory(pooledJmsXACF);
    jms.setTransactionManager(transactionManager); // again Narayana, implementing the Spring interface
    jms.setTransacted(false);

    return jms;
}

When this is run with the Byteman rules (see below), this should happen in the logs (this is an example transaction with Oracle AQ, Postgresql and ActiveMQ Artemis):

***** JTA Transaction#init                     (TransactionImple < ac, NoTransaction >)
***** JTA Transaction#registerSynchronization  (TransactionImple < ac, BasicAction: 0:ffffc0a801ea:85d3:5de3ea97:21 status: ActionStatus.RUNNING >,sync=org.messaginghub.pooled.jms.pool.PooledXAConnection$Synchronization@14f834b1)
***** JTA Transaction#enlistResource           (xaRes=oracle.jms.AQjmsXAResource@1c4bc8d6)
***** JTA-XA XAResource#setTransactionTimeout  (oracle.jms.AQjmsXAResource@1c4bc8d6;seconds=60)
***** JTA-XA XAResource#setTransactionTimeout  (oracle.jdbc.driver.T4CXAResource@7fd2f093;seconds=60)
***** JTA-XA XAResource#start                  (oracle.jms.AQjmsXAResource@1c4bc8d6)
2019-12-01 16:30:26.774  INFO 25735 --- [sumer[FOOQUEUE]] route4                                   : Message sent to outbound: TEST JAVA
***** JTA-XA XAResource#isSameRM               (oracle.jms.AQjmsXAResource@1c4bc8d6;xares=org.postgresql.xa.PGXAConnection@935d3f9)
***** JTA-XA XAResource#isSameRM               (oracle.jdbc.driver.T4CXAResource@7fd2f093;xares=org.postgresql.xa.PGXAConnection@935d3f9)
***** JTA-XA XAResource#setTransactionTimeout  (org.postgresql.xa.PGXAConnection@935d3f9;seconds=60)
***** JTA-XA XAResource#start                  (org.postgresql.xa.PGXAConnection@935d3f9)
***** JMS-XA XAConnectionFactory#createXAConnection     (org.apache.activemq.ActiveMQXAConnectionFactory@1d239476)
***** JTA-XA XAResource#init                   (TransactionContext{transactionId=null,connection=null})
***** JTA Transaction#enlistResource           (xaRes=TransactionContext{transactionId=null,connection=ActiveMQConnection...})
***** JTA-XA XAResource#isSameRM               (org.postgresql.xa.PGXAConnection@935d3f9)
***** JTA-XA XAResource#isSameRM               (oracle.jms.AQjmsXAResource@1c4bc8d6)
***** JTA-XA XAResource#isSameRM               (oracle.jdbc.driver.T4CXAResource@7fd2f093)
***** JTA-XA XAResource#setTransactionTimeout  (TransactionContext{transactionId=null,connection=ActiveMQConnection...)
***** JTA-XA XAResource#start                  (TransactionContext{transactionId=null,connection=ActiveMQConnection...)
***** JTA Transaction#delistResource           (xaRes=org.postgresql.xa.PGXAConnection@935d3f9,flag=67108864)
***** JTA-XA XAResource#end                    (org.postgresql.xa.PGXAConnection@935d3f9)
***** JTA-XA XAResource#end                    (oracle.jms.AQjmsXAResource@1c4bc8d6)
***** JTA-XA XAResource#prepare                (oracle.jms.AQjmsXAResource@1c4bc8d6)
***** JTA-XA XAResource#prepare                (org.postgresql.xa.PGXAConnection@935d3f9)
***** JTA-XA XAResource#end                    (TransactionContext{..,connection=ActiveMQConnection)
***** JTA-XA XAResource#prepare                (TransactionContext{..,connection=ActiveMQConnection)
***** JTA-XA XAResource#commit                 (oracle.jms.AQjmsXAResource@1c4bc8d6)
***** JTA-XA XAResource#commit                 (org.postgresql.xa.PGXAConnection@935d3f9)
***** JTA-XA XAResource#commit                 (TransactionContext{transactionId=null)

Testing and monitoring

Unit testing

This sample test from the Narayana Spring Boot repo shows how to use Byteman to enforce a failure before committing, testing that XA is working:

@Test
@BMRule(name = "Fail before commit",
        targetClass = "com.arjuna.ats.arjuna.coordinator.BasicAction",
        targetMethod = "phase2Commit",
        targetLocation = "ENTRY",
        helper = "me.snowdrop.boot.narayana.utils.BytemanHelper",
        action = "incrementCommitsCounter(); failFirstCommit($0.get_uid());")
public void testCrashBeforeCommit() throws Exception {
    // Setup dummy XAResource and its recovery helper
    setupXaMocks();

    this.transactionManager.begin();
    this.transactionManager.getTransaction()
            .enlistResource(this.xaResource);
    this.messagesService.sendMessage("test-message");
    Entry entry = this.entriesService.createEntry("test-value");
    try {
        // Byteman rule will cause commit to fail
        this.transactionManager.commit();
        fail("Exception was expected");
    } catch (Exception ignored) {
    }

    // Just after crash message and entry shouldn't be available
    assertThat(this.messagesService.getReceivedMessages())
            .isEmpty();
    assertThat(this.entriesService.getEntries())
            .isEmpty();

    await("Wait for the recovery to happen")
            .atMost(Duration.ofSeconds(30))
            .untilAsserted(() -> {
                assertThat(this.messagesService.getReceivedMessages())
                        .as("Test message should have been received after transaction was committed")
                        .containsOnly("test-message");
                assertThat(this.entriesService.getEntries())
                        .as("Test entry should exist after transaction was committed")
                        .containsOnly(entry);
            });
}

The test above references this helper class for Byteman:

public class BytemanHelper {

    private static int commitsCounter;

    public static void reset() {
        commitsCounter = 0;
    }

    public void failFirstCommit(Uid uid) {
        // Increment is called first, so counter should be 1
        if (commitsCounter == 1) {
            System.out.println(BytemanHelper.class.getName() + " fail first commit");
            ActionManager.manager().remove(uid);
            ThreadActionData.popAction();
            throw new RuntimeException("Failing first commit");
        }
    }

    public void incrementCommitsCounter() {
        commitsCounter++;
        System.out.println(BytemanHelper.class.getName() + " increment commits counter: " + commitsCounter);
    }

}

Instrumentation with Byteman

Here are some sample Byteman rules, which will print log statements at key phases during a transaction - useful for monitoring the status of a transaction:

RULE Transaction.<init>
INTERFACE javax.transaction.Transaction
METHOD <init>
AT ENTRY
IF TRUE
DO traceln("***** JTA ***** Transaction#<init>: " + $0)
ENDRULE

RULE Transaction.commit
INTERFACE javax.transaction.Transaction
METHOD commit()
IF TRUE
DO traceln("***** JTA ***** Transaction#commit: " + $0)
ENDRULE

RULE Transaction.rollback
INTERFACE javax.transaction.Transaction
METHOD rollback()
IF TRUE
DO traceln("***** JTA ***** Transaction#rollback: " + $0)
ENDRULE


#############


RULE XAResource.<init>
INTERFACE javax.transaction.xa.XAResource
METHOD <init>
AT ENTRY
IF TRUE
DO traceln("XA-BTM ***** XAResource -- create : " + $0)
ENDRULE

RULE follow XAResource start
INTERFACE javax.transaction.xa.XAResource
METHOD start(Xid, int )
AT ENTRY
IF TRUE
DO traceln("XA-BTM ***** XAResource -- start : " + $0)
ENDRULE

RULE follow XAResource commit
INTERFACE javax.transaction.xa.XAResource
METHOD commit(Xid, boolean )
AT ENTRY
IF TRUE
DO traceln("XA-BTM ***** XAResource -- commit  : " + $0)
ENDRULE

RULE follow XAResource end
INTERFACE javax.transaction.xa.XAResource
METHOD end(Xid, int )
AT ENTRY
IF TRUE
DO traceln("XA-BTM ***** XAResource -- end  : " + $0)
ENDRULE


RULE follow XAResource prepare
INTERFACE javax.transaction.xa.XAResource
METHOD prepare(Xid)
AT ENTRY
IF TRUE
DO traceln("XA-BTM ***** XAResource -- prepare  : " + $0)
ENDRULE

RULE follow XAResource recover
INTERFACE javax.transaction.xa.XAResource
METHOD recover(int)
AT ENTRY
IF TRUE
DO traceln("XA-BTM ***** XAResource -- recover  : " + $0)
ENDRULE


RULE follow XAResource rollback
INTERFACE javax.transaction.xa.XAResource
METHOD rollback(Xid)
AT ENTRY
IF TRUE
DO traceln("XA-BTM ***** XAResource -- rollback  : " + $0)
ENDRULE

RULE follow XAResource forget
INTERFACE javax.transaction.xa.XAResource
METHOD forget(Xid)
AT ENTRY
IF TRUE
DO traceln("XA-BTM ***** XAResource -- forget  : " + $0)
ENDRULE

RULE follow XAResource isSameRM
INTERFACE javax.transaction.xa.XAResource
METHOD isSameRM(XAResource)
AT ENTRY
IF TRUE
DO traceln("XA-BTM ***** XAResource -- isSameRM  : " + $0)
ENDRULE

RULE follow XAResource setTransactionTimeout
INTERFACE javax.transaction.xa.XAResource
METHOD setTransactionTimeout(int)
AT ENTRY
IF TRUE
DO traceln("XA-BTM ***** XAResource -- setTransactionTimeout  : " + $0)
ENDRULE