美文网首页
异常补偿机制(队列)

异常补偿机制(队列)

作者: zianL | 来源:发表于2019-11-06 15:24 被阅读0次

前言

项目中常常与第三方系统交互的场景,但是往往我们不能保证每一次网络请求都是正常的,但是常规的做法是对发起http请求的方法进行异常捕获,在处理异常中进行一次重试,若还是失败就丢弃该请求,很明显这么不能满足我们业务场景;

方案

我们基于异常捕获重试的方案进行进一步改进,获取请求参数、执行的方法、该方法所属类 封装成一个对象,放入队列。由异步线程去消费这个对象,从这么对象中拿到类、和方法名反射获取methor ,构造请求参数,重新执行该方法直至成功为止;

package com.gzcb.creditcard.quartz.utils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;

/**
 * @description: 异常补偿队列
 * @author: libingdao
 * @create: 2019-11-04 15:06
 **/
@Component
@Slf4j
public class ExceptionQueue {
    private static final PriorityBlockingQueue<ExceptionCompensateDTO> message = new PriorityBlockingQueue();
    private final ExecutorService pools =  Executors.newCachedThreadPool();
    private final ExecutorService subpools =  Executors.newFixedThreadPool(2);
    /**
     * 是否启动消费者
     */
    private volatile boolean enable = true;

    @Async
    public void consumer(){
        while (!message.isEmpty()) {
            ExceptionCompensateDTO poll = message.poll();
            pools.submit(()->{
                //出队并删除,假如执行失败重新入队
                if(!StringUtils.isEmpty(poll)){
                    log.error("业务执行: {}",poll);
                    ClassMethod.invokeMethod(poll.getObj(),poll.getMethodName(),poll.getArgs());
                }
            });
        }
        enable = true;
        log.error("子线程执行结束");
    }

    public void  add(ExceptionCompensateDTO exceptionCompensateDTO){
        log.info("exceptionCompensateDTO: {}",exceptionCompensateDTO);
        if(!StringUtils.isEmpty(exceptionCompensateDTO)){
            message.add(exceptionCompensateDTO);
        }
        log.debug("enable:{}",enable);
        if(enable){
            enable = false;
            subpools.submit(()->{
                log.debug("启动子线程");
                consumer();
            });
        }
    }
}

ExceptionQueue 控制消息的出入队列

package com.gzcb.creditcard.quartz.utils;

import org.springframework.util.ReflectionUtils;

import java.lang.reflect.Method;

/**
 * @description:
 * @author: libingdao
 * @create: 2019-11-07 11:06
 **/
public class ClassMethod {
    /**
     * @param obj        调用此方法的对象
     * @param methodName 方法名称
     * @param args       调用的这个方法的参数参数列表
     */
    public static void invokeMethod(Object obj, String methodName, Object[] args) {
        Class argsClass[] = null;
        //1.参数存在
        if (args != null) {
            int len = args.length;
            argsClass = new Class[len];
            //2.根据参数得到相应的 Class的类对象
            for (int i = 0; i < len; ++i) {
                argsClass [i] = args[i].getClass();
            }
        }
        // 找到方法
        Method method = ReflectionUtils.findMethod(obj.getClass(), methodName, argsClass);
        // 执行方法
        ReflectionUtils.invokeMethod(method, obj, args);
    }
}

ClassMethod 反射获取被执行者对象和方法、参数然后执行;

package com.gzcb.creditcard.quartz.utils;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @description:
 * @author: libingdao
 * @create: 2019-11-07 11:04
 **/
@Component
public class DeclareMethodA {
    public DeclareMethodA(){}
    @Autowired Test test;
    public void showA(Integer a,String name,NewClass nc){
        System.out.println("Integer="+a+" name="+name+" NewClass="+nc);
        test.xx();
    }
     public static class NewClass{
    }
}

DeclareMethodA 被执行对象

package com.gzcb.creditcard.quartz.utils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * @description:
 * @author: libingdao
 * @create: 2019-11-07 11:04
 **/
@Component
public class DeclareMethodB {
    public DeclareMethodB(){}
    @Autowired Test test;
    public void show11(Integer a,String name){
        System.out.println("Integer11="+a+" name11="+name);
        test.xx();
    }
     public static class NewClass{
    }
}

DeclareMethodB被执行对象

package com.gzcb.creditcard.quartz.utils;
import org.springframework.stereotype.Component;
/**
 * @description:
 * @author: libingdao
 * @create: 2019-11-07 11:11
 **/
@Component
public class Test {
    public void xx(){
        System.out.println("test.xx");
    }
}

Test 对象主要是验证能否调通spring bean

package com.gzcb.creditcard.quartz.utils;
import lombok.Data;
/**
 * @description:
 * @author: libingdao
 * @create: 2019-11-04 15:13
 **/
@Data
public class ExceptionCompensateDTO implements Comparable{
    public Object obj;
    public String methodName;
    public Object[] args;
    public ExceptionCompensateDTO(Object obj, String methodName, Object[] args) {
        this.obj = obj;
        this.methodName = methodName;
        this.args = args;
    }
    @Override
    public int compareTo(Object o) {
        return 0;
    }
}
package com.gzcb.creditcard.quartz.sms;

import com.alibaba.fastjson.JSONObject;
import com.gzcb.creditcard.QuartzApplication;
import com.gzcb.creditcard.config.sms.bean.SmsTemplateBody;
import com.gzcb.creditcard.config.sms.bean.SmsTemplateQuery;
import com.gzcb.creditcard.quartz.utils.*;
import com.gzcb.creditcard.service.SMSService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = QuartzApplication.class)
public class SmsTemplateServiceImplTest {
    @Autowired
    DeclareMethodA declareMethod;
    @Autowired
    DeclareMethodB declareMethod11;
    @Autowired ExceptionQueueTesy exceptionQueueTesy;

static ExecutorService mainpools =  Executors.newCachedThreadPool();
    @Test
    public void exceptionQueueTesy(){

        for (Integer i = 0;i<1000;i++) {
            Integer ss = i;
            mainpools.submit(()->{
                for (Integer j = 0;j<100;j++) {
                    ExceptionCompensateDTO dto =
                            new ExceptionCompensateDTO(declareMethod,"show",new Object[]{ss+j,"今天天气不错"+ss+j,new DeclareMethod.NewClass()});
                    exceptionQueueTesy.add(dto);
                }
            });
            mainpools.submit(()->{
                for (Integer j = 0;j<100;j++) {
                    ExceptionCompensateDTO dto =
                            new ExceptionCompensateDTO(declareMethod11,"show11",new Object[]{ss+j,"今天天气很糟糕"+ss+j});
                    exceptionQueueTesy.add(dto);
                }
            });
        }
        try {
            Thread.sleep(2000000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("主线程执行结束");
    }
}

测试模拟多线程请求

相关文章

  • spring-kafka异常消费补偿

    1、设置异常处理器 2、抛出需要补偿的异常 3、死信队列 超出重试次数后,消息会被发往死信队列(topicName...

  • 婚姻大学心理小课堂63--防御机制:补偿

    防御机制:补偿

  • Exception和Error有什么区别?

    1.异常:这种情况下的异常,可以通过完善任务重试机制,当执行异常时,保存当前任务信息加入重试队列。重试的策略根据业...

  • 补偿机制

    1 子安其实不明白,为什么他会和方少华成为朋友,又怎么变成后来传说中令人尴尬的关系。 第一次见到方少华,他正醉醺醺...

  • Spring RabbitMQ 死信机制

    采用死信机制的好处是可以提高系统的稳定性,当消息消费失败后,消息进入死信队列,可以对消息进行补偿,可以达到最终一致...

  • 郭召良CBT入门第四十三课提要:核心信念的机制与识别

    Ø 核心信念的心理机制 第一是执行机制:补偿策略 第二是维护机制:选择性注意/过滤 Ø 补偿策略或应对策略 在核心...

  • 99%的人都能看懂的分布式系统「补偿」机制

    摘要:我们来聊一聊在保证对外高可用的同时,憋出的“内伤”该如何通过「补偿」机制来自行消化。 一、「补偿」机制的意义...

  • 事物补偿机制

    事物补偿机制 事物补偿即在事物链中的任何一个正向事物操作,都必须存在一个完全符合回滚规则的可逆事物。如果是一个完整...

  • 12 补偿机制

    1 出现场景 2 实现原理 2.1 @RabbitListener 3 自定义重试 4 如何选择重试机制 4.1 ...

  • 补偿机制20211111

    最近我注意到自己的一些行为,这不是我第一次注意到这些行为,我想也不会是最后一次注意这些行为。 这是些什么行为呢? ...

网友评论

      本文标题:异常补偿机制(队列)

      本文链接:https://www.haomeiwen.com/subject/rorvbctx.html