190710-ActiveMQ事务

ActiveMQ事务

关于事务的定义及ACID特性这里不赘述。
对比MySQL数据库来说,MySQL有事务的概念,ActiveMQ也有事务的概念,这里说的都是本地事务,rocketMq还支持分布式事务

消息事务,是保证消息传递原子性的一个重要特性,和JDBC的事务特征类似,一个事务性发送,其中一组消息要么能够全部保证到达服务器,要么都不到达服务器。生产者,消费者与消息服务器都支持事务性。ActiveMQ得事务主要偏向在生产者得应用

事务具体实现

java制定了jdbc来规范对数据库的访问,同样的,java也有jms(java message services)来规范对于消息中间件的访问,activemq是完全支持jms1.1规范的

mysql事务实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public static void main(String[] args) {
Connection conn =getConnection();
try {
conn.setAutoCommit(false);
insertUser(conn);
insertAddress(conn);
conn.commit();
} catch (SQLException e) {
System.out.println("************事务处理出现异常***********");
e.printStackTrace();
try {
conn.rollback();
System.out.println("*********事务回滚成功***********");
} catch (Exception e2) {
// TODO: handle exception
e2.printStackTrace();
}finally {
try {
conn.close();
} catch (SQLException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
}
}
}

ActiveMQ事务消息实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Test
public void p2pSender() {
//获取连接工厂
ConnectionFactory connectionFactory = jmsQueueTemplate.getConnectionFactory();
Session session = null;
try {
Connection connection = connectionFactory.createConnection();
// 参数一:是否开启消息事务
session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

MessageProducer producer = session.createProducer(session.createQueue("test.trsaction"));
//发送10条消息,开启事务后,要么一起成功,要么一起失败
for (int i = 0; i < 10; i++) {
if (i == 4) {
//出现异常
// throw new RuntimeException("i cannot equals 4");
}
TextMessage textMessage = session.createTextMessage("消息-----" + i);
producer.send(textMessage);
}
//开启事务后,需要手动提交
session.commit();
} catch (JMSException e) {
e.printStackTrace();
//消息事务回滚
try {
session.rollback();
} catch (JMSException ex) {
ex.printStackTrace();
}
}
}

手动制造了一个异常,没有异常时:可以成功发送10条消息

success_10

异常那行代码打开,再次执行,从控制台可以发现,信息并没有发出去。

error_10

ActiveMQ与MySQL事务总结

整合入spring后,应该委托spring进行事务管理,对于Mysql,我们配置了DatasourceTransactionManager,使用@transactional注解即可。
对于activemq也是一样,不过使用的 就是JmsTransactionManager了。

消息的事务

消息事务是在生产者producer到broker或broker到consumer过程中同一个session中发生的,保证几条消息在发送过程中的原子性,在支持事务的session中,producer发送message时在message中带有transactionID。broker收到message后判断是否有transactionID,如果有就把message保存在transaction store中,等待commit或者rollback消息。

消息生产者的事务

在ActiveMQ中,Connection类的createSession方法有两个参数,一个是消息确认机制(自动确认还是手动确认),另一个是事务消息,当我们把这个事务消息设置为true的时候,activeMQ不会主动提交事务,需要我们在最后使用session.commit()方法去提交事务,这样才能在activeMQ的控制台看到消息。运行下面的代码,如果去掉session.commmit(),是看不到消息的。总之,开启事务消息以后必须要使用session.commit()方法提交消息。

消息消费者的事务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class JmsConsumerTx {

public static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
public static final String QUEUE_NAME = "queue01";

public static void main(String[] args) throws JMSException, IOException {

//1.创建连接工场,按照给定的Url地址,采用默认的用户名和密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
//2.通过连接工场,获得连接connection并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();

//创建会话session
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

//创建目的地
Queue queue = session.createQueue(QUEUE_NAME);

//创建消费者
MessageConsumer messageConsumer = session.createConsumer(queue);


while (true) {
//循环等待接收消息,消费消息
TextMessage textMessage = (TextMessage) messageConsumer.receive(4000L);
if (null != textMessage) {
System.out.println("***消费者接受到消息***:" + textMessage.getText());
} else {
break;
}
}

session.commit();
messageConsumer.close();
session.close();
connection.close();
}
}

参考资料

  1. ActiveMQ事务、异步发送、消息确认概念
  2. ActiveMQ的事务消息
  3. ActiveMQ的消息事务和消息的确认机制
  4. ActiveMQ事务消息和非事务消息
Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×