本文共 5749 字,大约阅读时间需要 19 分钟。
RocketMq提供消息过滤的功能,用于同一topic下,区分不同业务场景的消息。
Tag,即消息标签,用于对某个Topic下的消息进行分类。消息队列RocketMQ版的生产者在发送消息时,已经指定消息的Tag,消费者需根据已经指定的Tag来进行订阅。
使用springboot-starter的方式发送tag消息,只需要如下形式即可,不需要单独指定参数:
topic:tag
示例代码:
控制器,方便测试,将tag作为接口参数:/** * 消息过滤,发送tag消息 */ @RequestMapping("/send/tag/{tag}") public void tag(@PathVariable("tag") String tag) { //发送格式topic:tag rocketMqProducer.tag("消息过滤,发送tag消息", "test_tag:"+tag); }
生产者:
/** * tag消息过滤 */ public void tag(String msgBody, String topic) { SendResult sendResult = rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(msgBody).build()); if (ObjectUtils.isNotEmpty(sendResult)) { //sendResult不空则表示消息发送成功 log.info("send success , send msg = {}, messageId = {}", msgBody, sendResult.getMsgId()); } }
消费者,获取tag为“Tom”的消息,selectorExpression设置为Tom,全部接收设置为*:
@Slf4j@Component@RocketMQMessageListener(topic = "test_tag", selectorExpression = "Tom", consumerGroup = "test_tag")public class TagMessageListener implements RocketMQListener{ @Override public void onMessage(MessageExt messageExt) { byte[] body = messageExt.getBody(); String msg = new String(body); log.info("receive sync message:{}", msg); }}
下面分别发送Tom,Jerry,分别看结果:
发送Tom,接收到消息
事务消息
原理
Half(Prepare) Message 指的是暂不能投递的消息,发送方已经将消息成功发送到了 MQ 服务端,但是服务端未收到生产者对该消息的二次 确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半消息。Message Status Check
由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,MQ 服务端通过扫描发现某条消息长 期处于“半消息”时,需要主动向消息生产者询问该消息的最终状态(Commit 或是 Rollback),该过程即消息回查。执行流程
定义生产者
import org.apache.rocketmq.spring.core.RocketMQTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.messaging.Message;import org.springframework.messaging.support.MessageBuilder;import org.springframework.stereotype.Component;@Componentpublic class SpringTransactionProducer { @Autowired private RocketMQTemplate rocketMQTemplate; /** * 发送消息 * * @param topic * @param msg */ public void sendMsg(String topic, String msg) { Message message = MessageBuilder.withPayload(msg).build(); // myTransactionGroup要和@RocketMQTransactionListener(txProducerGroup = "myTransactionGroup")定义的一致 this.rocketMQTemplate.sendMessageInTransaction("myTransactionGroup",topic,message, null); System.out.println("发送消息成功"); }}
消费者
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.stereotype.Component;@Component@RocketMQMessageListener(topic = "spring-tx-my-topic", consumerGroup = "tx-consumer", selectorExpression = "*")public class SpringTxConsumer implements RocketMQListener{ @Override public void onMessage(String msg) { System.out.println("接收到消息 -> " + msg); }}
定义TransactionListenerImpl
用于监听事务消息import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;import org.apache.rocketmq.spring.support.RocketMQHeaders;import org.springframework.messaging.Message;import java.util.HashMap;import java.util.Map;@RocketMQTransactionListener(txProducerGroup = "myTransactionGroup")public class TransactionListenerImpl implements RocketMQLocalTransactionListener { private static MapSTATE_MAP = new HashMap<>(); /** * 执行业务逻辑 * * @param message * @param o * @return */ @Override public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) { String transId = (String)message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID); try { System.out.println("执行操作"); Thread.sleep(500); // 设置事务状态 STATE_MAP.put(transId, RocketMQLocalTransactionState.COMMIT); // 返回事务状态给生产者 return RocketMQLocalTransactionState.COMMIT; } catch (Exception e) { e.printStackTrace(); } STATE_MAP.put(transId, RocketMQLocalTransactionState.ROLLBACK); return RocketMQLocalTransactionState.ROLLBACK; } /** * 回查 * * @param message * @return */ @Override public RocketMQLocalTransactionState checkLocalTransaction(Message message) { String transId = (String)message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID); System.out.println("回查消息 -> transId = " + transId + ", state = " + STATE_MAP.get(transId)); return STATE_MAP.get(transId); }}
测试类
import com.sunyuqi.transaction.SpringTransactionProducer;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)@SpringBootTest(classes = DemoApplication.class)public class TxMessgeTest { @Autowired private SpringTransactionProducer springTransactionProducer; @Test public void test() { springTransactionProducer.sendMsg("spring-tx-my-topic","事务消息"); try { Thread.sleep(100000L); } catch (InterruptedException e) { e.printStackTrace(); } }}
转载地址:http://phxzb.baihongyu.com/