微服务开发系列 第七篇:RocketMQ

总概

A、技术栈

  • 开发语言:Java 1.8
  • 数据库:MySQL、Redis、MongoDB、Elasticsearch
  • 微服务框架:Spring Cloud Alibaba
  • 微服务网关:Spring Cloud Gateway
  • 服务注册和配置中心:Nacos
  • 分布式事务:Seata
  • 链路追踪框架:Sleuth
  • 服务降级与熔断:Sentinel
  • ORM框架:MyBatis-Plus
  • 分布式任务调度平台:XXL-JOB
  • 消息中间件:RocketMQ
  • 分布式锁:Redisson
  • 权限:OAuth2
  • DevOps:Jenkins、Docker、K8S

B、本节实现目标网址:yii666.com

  • [mall-order]下单,用RocketMQ消息中间件发送消息,[mall-member]监听消费给用户加积分

文章来源地址:https://www.yii666.com/blog/468837.html

一、RocketMQ安装

供参考:

  • 保姆级教程 Windows11下安装RocketMQ

  • RocketMQ基础入门

二、功能描述

用户下单(mall-order服务)后,发送下单事件MQ, mall-member服务监听消费MQ,为用户增加积分,MQ此处的作用是解耦。

三、代码实现

3.1 maven加RocketMQ依赖包

在项目[mall-pom]的pom.xml里加入RocketMQ依赖包

<rocketmq.version>2.2.3</rocketmq.version>

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>${rocketmq.version}</version>
</dependency>

3.2 common.yml配置RocketMQ参数

rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    group: group-${spring.profiles.active}
    send-message-timeout: 3000 # 消息发送超时时长,默认3s
    retry-times-when-send-failed: 3 # 同步发送消息失败重试次数,默认2
    retry-times-when-send-async-failed: 3 # 异步发送消息失败重试次数,默认2

common.yml完整配置

spring:
  redis:
    database: 0
    host: 127.0.0.1
    port: 6379a
    password: 123abc
    jedis:
      pool:
        max-active: 500  #连接池的最大数据库连接数。设为0表示无限制
        max-idle: 20   #最大空闲数
        max-wait: -1
        min-idle: 5
    timeout: 1000
    redisson: 
      password: 123abc
      cluster:
        nodeAddresses: ["redis://127.0.0.1:6379"]
      single:
        address: "redis://127.0.0.1:6379"
        database: 0
    
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://192.168.100.51:3306/ac_db?serverTimezone=Asia/Shanghai&useUnicode=true&tinyInt1isBit=false&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&useSSL=false&allowPublicKeyRetrieval=true
    username: ac_u
    password: ac_PWD_123

    #hikari数据库连接池
    hikari:
      pool-name: YH_HikariCP
      minimum-idle: 10 #最小空闲连接数量
      idle-timeout: 600000 #空闲连接存活最大时间,默认600000(10分钟)
      maximum-pool-size: 100 #连接池最大连接数,默认是10
      auto-commit: true  #此属性控制从池返回的连接的默认自动提交行为,默认值:true
      max-lifetime: 1800000 #此属性控制池中连接的最长生命周期,值0表示无限生命周期,默认1800000即30分钟
      connection-timeout: 30000 #数据库连接超时时间,默认30秒,即30000
      connection-test-query: SELECT 1

rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    group: group-${spring.profiles.active}
    send-message-timeout: 3000 # 消息发送超时时长,默认3s
    retry-times-when-send-failed: 3 # 同步发送消息失败重试次数,默认2
    retry-times-when-send-async-failed: 3 # 异步发送消息失败重试次数,默认2

mybatis-plus:
  configuration:
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl

3.3 [mall-order]生产者

生产者OrderSender

package com.ac.order.mq.send;

import com.ac.common.qm.MqTopicConstant;
import com.ac.common.qm.msg.MqOrderMsg;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Slf4j
@Lazy
@Component
public class OrderSender {

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    public void asyncSend(MqOrderMsg mqMsg) {
        String payload = JSONObject.toJSONString(mqMsg);

        //Topic+Tag更精准接收消息
        String destination = MqTopicConstant.TOPIC_ORDER + ":" + mqMsg.getAction().getCode();

        rocketMQTemplate.asyncSend(destination, payload, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info(OrderSender.class.getSimpleName() + ",消息发送成功, result: {}", sendResult);
            }

            @Override
            public void onException(Throwable e) {
                log.error(OrderSender.class.getSimpleName() + ",消息发送失败");
                e.printStackTrace();
            }
        });
    }
}

下单发送MQ文章来源地址https://www.yii666.com/blog/468837.html

@Slf4j
@Service
public class OrderServiceImpl implements OrderService {

    @Resource
    private OrderDao orderDaoImpl;

    @Resource
    private MemberFeignApi memberFeignApi;

    @Resource
    private OrderItemService orderItemServiceImpl;

    @Resource
    private OrderSender orderSender;

    @Override
    public OrderDetailDTO findOrderDetail(Long id) {
        return null;
    }

    @Override
    public IPage<OrderDTO> pageOrder(OrderPageQry qry) {
        return orderDaoImpl.pageOrder(qry);
    }

    @Transactional(rollbackFor = Exception.class)
    @Override
    public Long createOrder(OrderAddVO addVO) {
        Order order = new Order();
        order.setOrderNo(RandomUtil.randomNumbers(8));

        //省略支付流程
        order.setOrderState(OrderStateEnum.PAYED);
        order.setOrderTime(LocalDateTime.now());

        //通过feign取用户信息
        MemberDTO member = memberFeignApi.findMember(addVO.getMemberId());
        order.setMemberId(addVO.getMemberId());
        order.setMemberName(member.getMemberName());
        order.setMobile(member.getMobile());
        orderDaoImpl.save(order);

        BigDecimal discountAmount = new BigDecimal(0.00);
        BigDecimal productAmount = new BigDecimal(0.00);
        //存订单项信息
        for (OrderItemAddVO orderItemAdd : addVO.getOrderItemList()) {
            OrderItem orderItem = orderItemServiceImpl.addOrderItem(order.getId(), orderItemAdd);
            productAmount = productAmount.add(orderItem.getBuyPrice().multiply(new BigDecimal(orderItem.getBuyNum())));
        }

        //更新订单金额信息
        order.setDiscountAmount(discountAmount);
        order.setProductAmount(productAmount);
        BigDecimal payAmount = productAmount.subtract(discountAmount);
        order.setPayAmount(payAmount);
        orderDaoImpl.updateById(order);

        //发送下单MQ
        MqOrderMsg mqMsg = MqOrderMsg.builder()
                .action(MqOrderAction.PAID)
                .orderId(order.getId())
                .memberId(order.getMemberId())
                .payAmount(order.getPayAmount())
                .build();
        orderSender.asyncSend(mqMsg);

        return order.getId();
    }
}

3.4 [mall-member]消费者

MemberOrderListener消费者

package com.ac.member.mq.listener;

import com.ac.common.qm.MqTopicConstant;
import com.ac.common.qm.MqConsumerConstant;
import com.ac.common.qm.msg.MqOrderAction;
import com.ac.common.qm.msg.MqOrderMsg;
import com.ac.member.component.MemberIntegralComponent;
import com.ac.member.enums.IntegralSourceTypeEnum;
import com.ac.member.vo.IntegralLogEditVO;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

@Slf4j
@Service
@RocketMQMessageListener(
        consumerGroup = MqConsumerConstant.CONSUMER_MEMBER_ORDER,
        topic = MqTopicConstant.TOPIC_ORDER,
        selectorExpression = "PAID||REFUND",
        messageModel = MessageModel.CLUSTERING)
public class MemberOrderListener implements RocketMQListener<MessageExt> {

    @Resource
    private MemberIntegralComponent memberIntegralComponent;

    @Override
    public void onMessage(MessageExt message) {
        MqOrderMsg mqMsg = JSONObject.parseObject(message.getBody(), MqOrderMsg.class);
        log.info(MemberOrderListener.class.getSimpleName() + ",msgId={},msg={}", message.getMsgId(), mqMsg);
        try {
            //Topic+Tag更精准接收消息
            MqOrderAction action = mqMsg.getAction();
            if (MqOrderAction.PAID == action) {
                dealPaid(mqMsg);
            } else if (MqOrderAction.REFUND == action) {
                dealRefund(mqMsg);
            }
        } catch (Exception e) {
            log.error(MemberOrderListener.class.getSimpleName() + ",消费失败,mqMsg={},e={}", mqMsg, e.getMessage());
        }
    }

    /**
     * 处理订单付款事件
     *
     * @param mqMsg
     */
    private void dealPaid(MqOrderMsg mqMsg) {
        IntegralLogEditVO integralVO = new IntegralLogEditVO();
        integralVO.setMemberId(mqMsg.getMemberId());
        integralVO.setSourceType(IntegralSourceTypeEnum.AWARD_ORDER);
        integralVO.setSourceRemark("下单获得积分");
        integralVO.setIntegral(mqMsg.getPayAmount().longValue());

        memberIntegralComponent.recordIntegral(integralVO);
    }

    private void dealRefund(MqOrderMsg mqMsg) {
        log.info("处理退单事件");
    }
}

四、测试

4.1 下单

微服务开发系列 第七篇:RocketMQ下单

4.2 控制台日志

[mall-order]控制台MQ发送日志:

2023-04-04 15:58:37.052  INFO 25204 --- [ublicExecutor_1] com.ac.order.mq.send.OrderSender         : OrderSender,消息发送成功, result: SendResult [sendStatus=SEND_OK, msgId=7F000001627418B4AAC212E0B7F30000, offsetMsgId=AC100B8D00002A9F000000000003B369, messageQueue=MessageQueue [topic=TOPIC_ORDER, brokerName=LAPTOP-R0R80SCR, queueId=3], queueOffset=0]

[mall-member]控制台MQ接收日志:

2023-04-04 15:58:37.243  INFO 26788 --- [_MEMBER_ORDER_1] c.a.m.mq.listener.MemberOrderListener    : MemberOrderListener,msgId=7F000001627418B4AAC212E0B7F30000,msg=MqOrderMsg(action=PAID, orderId=281635594240001, memberId=264260572479489, payAmount=40.50)

4.3 数据库记录

微服务开发系列 第七篇:RocketMQt_order 微服务开发系列 第七篇:RocketMQt_member_integral文章地址https://www.yii666.com/blog/468837.html网址:yii666.com<

微服务开发系列 第七篇:RocketMQt_member_integral_log

4.4 RocketMQ Dashboard

微服务开发系列 第七篇:RocketMQDashboard列表

微服务开发系列 第七篇:RocketMQDashboard消息内容

版权声明:本文内容来源于网络,版权归原作者所有,此博客不拥有其著作权,亦不承担相应法律责任。文本页已经标记具体来源原文地址,请点击原文查看来源网址,站内文章以及资源内容站长不承诺其正确性,如侵犯了您的权益,请联系站长如有侵权请联系站长,将立刻删除

领支付宝红包赞助服务器费用

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信图片_20190322181744_03.jpg

微信扫一扫打赏

请作者喝杯咖啡吧~

支付宝扫一扫领取红包,优惠每天领

二维码1

zhifubaohongbao.png

二维码2

zhifubaohongbao2.png