博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
SpringBoot 整合rocketMQ 过滤消息,事务消息
阅读量:2169 次
发布时间:2019-05-01

本文共 5749 字,大约阅读时间需要 19 分钟。

RocketMq提供消息过滤的功能,用于同一topic下,区分不同业务场景的消息。

Tag,即消息标签,用于对某个Topic下的消息进行分类。消息队列RocketMQ版的生产者在发送消息时,已经指定消息的Tag,消费者需根据已经指定的Tag来进行订阅。

使用springboot-starter的方式发送tag消息,只需要如下形式即可,不需要单独指定参数:

topic:tag

示例代码:

控制器,方便测试,将tag作为接口参数:

/**     * 消息过滤,发送tag消息     */    @RequestMapping("/send/tag/{tag}")    public void tag(@PathVariable("tag") String tag) {        //发送格式topic:tag        rocketMqProducer.tag("消息过滤,发送tag消息", "test_tag:"+tag);    }

 

生产者:

/**     * tag消息过滤     */    public void tag(String msgBody, String topic) {        SendResult sendResult = rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(msgBody).build());        if (ObjectUtils.isNotEmpty(sendResult)) {            //sendResult不空则表示消息发送成功            log.info("send success , send msg = {}, messageId = {}", msgBody, sendResult.getMsgId());        }    }

消费者,获取tag为“Tom”的消息,selectorExpression设置为Tom,全部接收设置为*:

@Slf4j@Component@RocketMQMessageListener(topic = "test_tag", selectorExpression = "Tom", consumerGroup = "test_tag")public class TagMessageListener implements RocketMQListener
{ @Override public void onMessage(MessageExt messageExt) { byte[] body = messageExt.getBody(); String msg = new String(body); log.info("receive sync message:{}", msg); }}

 下面分别发送Tom,Jerry,分别看结果:

发送Tom,接收到消息

 

事务消息

原理

Half(Prepare) Message
指的是暂不能投递的消息,发送方已经将消息成功发送到了 MQ 服务端,但是服务端未收到生产者对该消息的二次 确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半消息。

Message Status Check

由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,MQ 服务端通过扫描发现某条消息长 期处于“半消息”时,需要主动向消息生产者询问该消息的最终状态(Commit 或是 Rollback),该过程即消息回查。

执行流程

  1. 发送方向 MQ 服务端发送消息。
  2. MQ Server 将消息持久化成功之后,向发送方 ACK 确认消息已经发送成功,此时消息为半消息。
  3. 发送方开始执行本地事务逻辑。
  4. 发送方根据本地事务执行结果向 MQ Server 提交二次确认(Commit 或是 Rollback),MQ Server 收到 Commit 状态则将半消息标记为可投递,订阅方最终将收到该消息;MQ Server 收到 Rollback 状态则删除半 消息,订阅方将不会接受该消息。
  5. 在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达 MQ Server,经过固定时间后 MQ Server 将对该消息发起消息回查。
  6. 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
  7. 发送方根据检查得到的本地事务的最终状态再次提交二次确认,MQ Server 仍按照步骤4对半消息进行操作。

定义生产者

import org.apache.rocketmq.spring.core.RocketMQTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.messaging.Message;import org.springframework.messaging.support.MessageBuilder;import org.springframework.stereotype.Component;@Componentpublic class SpringTransactionProducer {    @Autowired    private RocketMQTemplate rocketMQTemplate;    /**     * 发送消息     *     * @param topic     * @param msg     */    public void sendMsg(String topic, String msg) {        Message message = MessageBuilder.withPayload(msg).build();        // myTransactionGroup要和@RocketMQTransactionListener(txProducerGroup = "myTransactionGroup")定义的一致        this.rocketMQTemplate.sendMessageInTransaction("myTransactionGroup",topic,message, null);        System.out.println("发送消息成功");    }}

 消费者

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.stereotype.Component;@Component@RocketMQMessageListener(topic = "spring-tx-my-topic",                            consumerGroup = "tx-consumer",                            selectorExpression = "*")public class SpringTxConsumer implements RocketMQListener
{ @Override public void onMessage(String msg) { System.out.println("接收到消息 -> " + msg); }}

定义TransactionListenerImpl

用于监听事务消息

import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;import org.apache.rocketmq.spring.support.RocketMQHeaders;import org.springframework.messaging.Message;import java.util.HashMap;import java.util.Map;@RocketMQTransactionListener(txProducerGroup = "myTransactionGroup")public class TransactionListenerImpl implements RocketMQLocalTransactionListener {    private static Map
STATE_MAP = new HashMap<>(); /** * 执行业务逻辑 * * @param message * @param o * @return */ @Override public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) { String transId = (String)message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID); try { System.out.println("执行操作"); Thread.sleep(500); // 设置事务状态 STATE_MAP.put(transId, RocketMQLocalTransactionState.COMMIT); // 返回事务状态给生产者 return RocketMQLocalTransactionState.COMMIT; } catch (Exception e) { e.printStackTrace(); } STATE_MAP.put(transId, RocketMQLocalTransactionState.ROLLBACK); return RocketMQLocalTransactionState.ROLLBACK; } /** * 回查 * * @param message * @return */ @Override public RocketMQLocalTransactionState checkLocalTransaction(Message message) { String transId = (String)message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID); System.out.println("回查消息 -> transId = " + transId + ", state = " + STATE_MAP.get(transId)); return STATE_MAP.get(transId); }}

测试类

import com.sunyuqi.transaction.SpringTransactionProducer;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)@SpringBootTest(classes = DemoApplication.class)public class TxMessgeTest {    @Autowired    private SpringTransactionProducer springTransactionProducer;    @Test    public void test()    {        springTransactionProducer.sendMsg("spring-tx-my-topic","事务消息");        try {            Thread.sleep(100000L);        } catch (InterruptedException e) {            e.printStackTrace();        }    }}

 

转载地址:http://phxzb.baihongyu.com/

你可能感兴趣的文章
【LEETCODE】228-Summary Ranges
查看>>
【LEETCODE】27-Remove Element
查看>>
【LEETCODE】66-Plus One
查看>>
【LEETCODE】26-Remove Duplicates from Sorted Array
查看>>
【LEETCODE】118-Pascal's Triangle
查看>>
【LEETCODE】119-Pascal's Triangle II
查看>>
【LEETCODE】88-Merge Sorted Array
查看>>
【LEETCODE】19-Remove Nth Node From End of List
查看>>
【LEETCODE】125-Valid Palindrome
查看>>
【LEETCODE】28-Implement strStr()
查看>>
【LEETCODE】6-ZigZag Conversion
查看>>
【LEETCODE】8-String to Integer (atoi)
查看>>
【LEETCODE】14-Longest Common Prefix
查看>>
【LEETCODE】38-Count and Say
查看>>
【LEETCODE】278-First Bad Version
查看>>
【LEETCODE】303-Range Sum Query - Immutable
查看>>
【LEETCODE】21-Merge Two Sorted Lists
查看>>
【LEETCODE】231-Power of Two
查看>>
【LEETCODE】172-Factorial Trailing Zeroes
查看>>
【LEETCODE】112-Path Sum
查看>>