基于redis消息发布订阅模式实现消息服务的分布式事务 - Go语言中文社区

基于redis消息发布订阅模式实现消息服务的分布式事务


假设有A和B两个系统,分别可以处理任务A和任务B。此时系统A中存在一个业务流程,需要将任务A和任务B在同一个事务中处理。下面来介绍基于消息中间件来实现这种分布式事务。(服务间通信使用dubbo)

 

 

1)A任务若提交成功,则向事务队列(user:topic)中添加一条信息,若提交失败,则整个事务失败

2)向事务队列user:topic添加消息成功,则通知B系统,开始执行B任务。若向事务队列user:topic添加消息失败,则整个事务失败

3)B系统接收到通知消息时开始执行B任务,执行之前需要检查是否是重复通知,此处应该有幂等性。B任务若执行成功,则删除事务队列(user:topic)中相对应的数据。

1、WEB层,这里是测试的入口

@Controller
@RequestMapping(value = "/system")
public class UserController extends BaseController{

    @Autowired
    private UserService userService;

    static int i = 0;

    @RequestMapping("/test")
    public void test(){
        BaseSysUserPo sysUserPo = new BaseSysUserPo();
        sysUserPo.setUserId(""+i++);
        sysUserPo.setIsDelete("0");
        sysUserPo.setAmount(new BigDecimal(1));
        ResultBean resultBean1 = userService.addUser(sysUserPo);
        System.out.println("==============="+resultBean1.getMessage());
    }
}

2、SERVICE层,服务层,也就是上面所说的A系统,这里使用User模块来代替

    @Autowired
    private SysUserLogic sysUserLogic;
    @Autowired
    private RedisTemplate<String,String> redisTemplate;
    /**
     * 测试添加
     * @param baseSysUserPo
     * @return
     */
    String channel = "user:topic";
    public ResultBean addUser(BaseSysUserPo baseSysUserPo) {
        ResultBean resultBean1 = new ResultBean();
        BaseSysMenuPo menuPo = new BaseSysMenuPo();
        menuPo.setMenuId(baseSysUserPo.getUserId());
        menuPo.setIsDelete("0");
        menuPo.setAmount(new BigDecimal(1));
        try {
            resultBean1 = sysUserLogic.add(baseSysUserPo);
            if (resultBean1.isSuccess()) {
                if (redisTemplate.opsForList().leftPush(channel, JSONObject.toJSONString(menuPo)) > 0) {//将消息放入保证队列,如果失败,则整个事务失败。
                    redisTemplate.convertAndSend(channel, JSONObject.toJSONString(menuPo));//通知消息到远程服务。
                } else {
                    resultBean1.error("确认消息发送异常");
                    throw new RuntimeException("确认消息发送异常");//将消息放入保证队列,失败,整个事务失败。
                }
            }
        } catch (LogicException e) {
            e.printStackTrace();
            return resultBean1;
        }
        return resultBean1;
    }

3、SERVICE层,服务层,上面所说的B系统,这里使用Menu模块代替

package com.system.listener;

import com.alibaba.fastjson.JSONObject;
import com.fengyong.base.rely.ResultBean;
import com.fengyong.core.logic.LogicException;
import com.system.po.menu.base.BaseSysMenuPo;
import com.system.service.menu.logic.SysMenuLogic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

/**
 * 描述:
 *
 * @author fengyong
 * @date 2019-03-01.
 */
@Service
public class TopicMessageListener  implements MessageListener {
    private RedisTemplate<String,String> redisTemplate;
    @Autowired
    private SysMenuLogic sysMenuLogic;

    public void setRedisTemplate(RedisTemplate<String, String> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    @Override
    public void onMessage(Message message, byte[] pattern) {
        byte[] body = message.getBody();//请使用valueSerializer
        byte[] channel = message.getChannel();
        //请参考配置文件,本例中key,value的序列化方式均为string。
        //其中key必须为stringSerializer。和redisTemplate.convertAndSend对应
        String itemValue = (String)redisTemplate.getValueSerializer().deserialize(body);
        String topic = (String)redisTemplate.getStringSerializer().deserialize(channel);
        System.out.println(itemValue+"------"+topic);
        BaseSysMenuPo menuPo = JSONObject.parseObject(itemValue,BaseSysMenuPo.class);
        ResultBean resultBean = null;
        try {
            //TODO 此处应该判断是否是重复通知
            if(menuPo.getMenuId().equals("5"))
                throw new RuntimeException("数据产生异常,回滚");
            resultBean = sysMenuLogic.add(menuPo);
        } catch (LogicException e) {
            e.printStackTrace();
        }
        if(resultBean.isSuccess()){//事务成功,将保证队列的数据删掉。
            redisTemplate.opsForList().remove(topic,0,itemValue);
        }
    }
}

4、Redis的消息订阅配置

<bean id="topicMessageListener" class="com.system.listener.TopicMessageListener">
        <property name="redisTemplate" ref="redisTemplate"></property>
    </bean>
    <bean id="topicContainer" class="org.springframework.data.redis.listener.RedisMessageListenerContainer" destroy-method="destroy">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="taskExecutor">
            <bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler">
                <property name="poolSize" value="3"></property>
            </bean>
        </property>
        <property name="messageListeners">
            <map>
                <entry key-ref="topicMessageListener">
                    <bean class="org.springframework.data.redis.listener.ChannelTopic">
                        <constructor-arg value="user:topic"/>
                    </bean>
                </entry>
            </map>
        </property>
    </bean>

5、利用ab压测,由于是本地压测,并发就只设置100,请求1000次

bogon:~ fengyong$ ab -n 1000 -c 100 http://localhost:8080/system/test.html
This is ApacheBench, Version 2.3 <$Revision: 1826891 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/

Benchmarking localhost (be patient)
Completed 100 requests
Completed 200 requests
Completed 300 requests
Completed 400 requests
Completed 500 requests
Completed 600 requests
Completed 700 requests
Completed 800 requests
Completed 900 requests
Completed 1000 requests
Finished 1000 requests


Server Software:        Apache-Coyote/1.1
Server Hostname:        localhost
Server Port:            8080

Document Path:          /system/test.html
Document Length:        0 bytes

Concurrency Level:      100
Time taken for tests:   4.043 seconds
Complete requests:      1000
Failed requests:        0
Total transferred:      121000 bytes
HTML transferred:       0 bytes
Requests per second:    247.34 [#/sec] (mean)
Time per request:       404.309 [ms] (mean)
Time per request:       4.043 [ms] (mean, across all concurrent requests)
Transfer rate:          29.23 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0    1   1.1      0       5
Processing:    23  386  69.9    369     677
Waiting:       17  386  70.0    369     677
Total:         23  387  69.9    370     680

Percentage of the requests served within a certain time (ms)
  50%    370
  66%    398
  75%    411
  80%    420
  90%    460
  95%    521
  98%    597
  99%    640
 100%    680 (longest request)
bogon:~ fengyong$ 

6、查看数据库结果,可以很明显的看到,A系统的数据库(xdj)中的User表有1000条数据,B系统的数据库(xdj2)中的Menu表有999条数据,失败的是id等于5的一条数据,然后再看redis中user:topic队列里的数据,是一条id等于5的数据。

xdj数据库图片

xdj2数据库图像

redis数据图片

7、该条失败的数据,可以采用人工处理。由于本人在互联网金融行业工作,接过不少渠道,下游也有不少商户,由于网络或者上游某些原因,导致我们没有收到通知时,都是人工处理,绝对不会使用程序重发请求或者使用程序换渠道重发请求,因为如果某天夜里上游的网络通信故障,导致我们的请求批量重发,那造成的损失是以万为单位的计算的,血的教训。

 

版权声明:本文来源CSDN,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/fengyong7723131/article/details/88568096
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。
  • 发表于 2020-03-01 22:56:49
  • 阅读 ( 985 )
  • 分类:Redis

0 条评论

请先 登录 后评论

官方社群

GO教程

猜你喜欢