ActiveMQ事务
关于事务的定义及ACID特性这里不赘述。
对比MySQL数据库来说,MySQL有事务的概念,ActiveMQ也有事务的概念,这里说的都是本地事务,rocketMq还支持分布式事务
消息事务,是保证消息传递原子性的一个重要特性,和JDBC的事务特征类似,一个事务性发送,其中一组消息要么能够全部保证到达服务器,要么都不到达服务器。生产者,消费者与消息服务器都支持事务性。ActiveMQ得事务主要偏向在生产者得应用
事务具体实现
java制定了jdbc来规范对数据库的访问,同样的,java也有jms(java message services)来规范对于消息中间件的访问,activemq是完全支持jms1.1规范的
mysql事务实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| public static void main(String[] args) { Connection conn =getConnection(); try { conn.setAutoCommit(false); insertUser(conn); insertAddress(conn); conn.commit(); } catch (SQLException e) { System.out.println("************事务处理出现异常***********"); e.printStackTrace(); try { conn.rollback(); System.out.println("*********事务回滚成功***********"); } catch (Exception e2) { e2.printStackTrace(); }finally { try { conn.close(); } catch (SQLException e1) { e1.printStackTrace(); } } } }
|
ActiveMQ事务消息实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| @Test public void p2pSender() { ConnectionFactory connectionFactory = jmsQueueTemplate.getConnectionFactory(); Session session = null; try { Connection connection = connectionFactory.createConnection(); session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(session.createQueue("test.trsaction")); for (int i = 0; i < 10; i++) { if (i == 4) {
} TextMessage textMessage = session.createTextMessage("消息-----" + i); producer.send(textMessage); } session.commit(); } catch (JMSException e) { e.printStackTrace(); try { session.rollback(); } catch (JMSException ex) { ex.printStackTrace(); } } }
|
手动制造了一个异常,没有异常时:可以成功发送10条消息
异常那行代码打开,再次执行,从控制台可以发现,信息并没有发出去。
ActiveMQ与MySQL事务总结
整合入spring后,应该委托spring进行事务管理,对于Mysql,我们配置了DatasourceTransactionManager,使用@transactional注解即可。
对于activemq也是一样,不过使用的 就是JmsTransactionManager了。
消息的事务
消息事务是在生产者producer到broker或broker到consumer过程中同一个session中发生的,保证几条消息在发送过程中的原子性,在支持事务的session中,producer发送message时在message中带有transactionID。broker收到message后判断是否有transactionID,如果有就把message保存在transaction store中,等待commit或者rollback消息。
消息生产者的事务
在ActiveMQ中,Connection类的createSession方法有两个参数,一个是消息确认机制(自动确认还是手动确认),另一个是事务消息,当我们把这个事务消息设置为true的时候,activeMQ不会主动提交事务,需要我们在最后使用session.commit()方法去提交事务,这样才能在activeMQ的控制台看到消息。运行下面的代码,如果去掉session.commmit(),是看不到消息的。总之,开启事务消息以后必须要使用session.commit()方法提交消息。
消息消费者的事务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| public class JmsConsumerTx {
public static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616"; public static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException, IOException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(); Connection connection = activeMQConnectionFactory.createConnection(); connection.start();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
MessageConsumer messageConsumer = session.createConsumer(queue);
while (true) { TextMessage textMessage = (TextMessage) messageConsumer.receive(4000L); if (null != textMessage) { System.out.println("***消费者接受到消息***:" + textMessage.getText()); } else { break; } }
session.commit(); messageConsumer.close(); session.close(); connection.close(); } }
|
参考资料
- ActiveMQ事务、异步发送、消息确认概念
- ActiveMQ的事务消息
- ActiveMQ的消息事务和消息的确认机制
- ActiveMQ事务消息和非事务消息