美文网首页JAVA技术文章
spring @scheduled并发

spring @scheduled并发

作者: rejoice001 | 来源:发表于2017-12-06 21:09 被阅读14次

1、基于springboot的项目开启自动配置

package com.plateno.booking.sync.vienna;


import javax.annotation.PostConstruct;
import javax.jms.ConnectionFactory;


import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ImportResource;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;


import com.plateno.booking.sync.vienna.annotation.EnableMysql;
import com.plateno.booking.sync.vienna.annotation.EnableRedis;
import com.plateno.booking.sync.vienna.jms.messageconverter.PlatenoMessageConverter;
import com.plateno.booking.sync.vienna.util.BookingRedisHelper;
import com.plateno.booking.sync.vienna.util.SpringContextUtils;
//@EnableMysql
//@EnableRedis
//开启自动配置
@SpringBootApplication(exclude = { 
        DataSourceAutoConfiguration.class,
        RedisAutoConfiguration.class
        }, scanBasePackageClasses = {
        BookingSyncVienna.class })
public class BookingSyncVienna {
    


    public static void main(String[] args) {
        SpringApplication springApplication = new SpringApplication(BookingSyncVienna.class);
        springApplication.run(args);
    }


    @Autowired
    JmsTemplate jmsTemplate;


    @Bean
    MessageConverter messageConverter() {
        return new PlatenoMessageConverter();
    }


    @Bean
    SpringContextUtils springContextUtils() {
        return new SpringContextUtils();
    }


    @PostConstruct
    public void init() {
        this.jmsTemplate.setMessageConverter(messageConverter());
    }


    /**
     * 创建默认的listerFactory 
     * see:JmsListenerAnnotationBeanPostProcessor->static final String DEFAULT_JMS_LISTENER_CONTAINER_FACTORY_BEAN_NAME = "jmsListenerContainerFactory";
     * @param connectionFactory
     * @param configurer
     * @return
     */
    @Bean(name="jmsListenerContainerFactory")
    public JmsListenerContainerFactory<?> jmsListenerContainerFactory(ConnectionFactory connectionFactory,
            DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setMessageConverter(messageConverter());
        configurer.configure(factory, connectionFactory);
        return factory;
    }


/*  @Bean
    @Autowired
    public BookingRedisHelper redisHelper(RedisTemplate redisTemplate) {
        BookingRedisHelper bookingRedisHelper = new BookingRedisHelper();
        bookingRedisHelper.setRedisTemplate(redisTemplate);
        return bookingRedisHelper;
    }
*/
    /*
     * @Bean public DefaultJmsListenerContainerFactory
     * listenerContainerFactory() { DefaultJmsListenerContainerFactory factory =
     * new DefaultJmsListenerContainerFactory();
     * factory.setConnectionFactory(connectionFactory());
     * factory.setMessageConverter(objectConverter());
     * factory.setConcurrency("5"); return factory; }
     */
    /*
     * @Bean public ConnectionFactory connectionFactory(){
     * PooledConnectionFactory pooledConnectionFactory = new
     * PooledConnectionFactory(); ActiveMQConnectionFactory mqConnectionFactory
     * = new ActiveMQConnectionFactory();
     * mqConnectionFactory.setBrokerURL(brokerUrl);
     * mqConnectionFactory.setUseAsyncSend(true);
     * mqConnectionFactory.setOptimizeAcknowledgeTimeOut(3000);
     * pooledConnectionFactory.setConnectionFactory(mqConnectionFactory);
     * pooledConnectionFactory.setMaxConnections(20);
     * mqConnectionFactory.setTransactedIndividualAck(true); return
     * pooledConnectionFactory; }
     */
}

2、创建作业类

package com.plateno.booking.sync.vienna.test;


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import com.plateno.booking.sync.vienna.properties.CronProperties;

@Component
@ConditionalOnProperty(prefix = "", name = "test", havingValue = "true")
public class TestJob {
    
    private static Logger logger = LoggerFactory.getLogger(TestJob.class);
    
    @Scheduled(cron="0/2 * * * * ?")
    @Async//异步执行,配合线程池实现并发
    public void test(){
        
        System.err.println("---------------------");
        System.err.println("a");
        logger.error(Thread.currentThread().getName());
        System.err.println("---------------------");
        try {
            Thread.sleep(20000);//执行睡眠,这样如果并发执行,那么上面的打印会2秒执行一次,而下面的ffff则会22秒打印一次
            System.out.println("ffff");
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
    
    
    /*@Scheduled(cron="0/1 * * * * ?")
    public void test1(){
        System.err.println("---------------------");
        System.err.println("b");
        logger.error(Thread.currentThread().getName());
        System.err.println("---------------------");
    }*/
}

3、定义线程池配置文件(@enableSchedulling@scheduled默认是基于单线程),多线程并发需要配置xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:task="http://www.springframework.org/schema/task"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
            http://www.springframework.org/schema/context
            http://www.springframework.org/schema/context/spring-context-4.2.xsd
            http://www.springframework.org/schema/task
            http://www.springframework.org/schema/task/spring-task-4.2.xsd">

 <task:scheduler id="scheduler" pool-size="10" />
<task:executor id="executor" keep-alive="7200" pool-size="100-200"
queue-capacity="500" rejection-policy="CALLER_RUNS" />
<task:annotation-driven executor="executor"
scheduler="scheduler" />
</beans>

同时,如果想使用注解开发

配置类加上@EnableAsync即可,本质上都是创建AsyncAnnotationBeanPostProcessor对象,另外如果要自定义excutor和exceptionHandller,可以参考@EnableAsync源码
注释:

/**
 * Enables Spring's asynchronous method execution capability, similar to functionality
 * found in Spring's {@code <task:*>} XML namespace.
 *
 * <p>To be used on @{@link Configuration} classes as follows, where {@code MyAsyncBean}
 * is a user-defined type with one or more methods annotated with either Spring's
 * {@code @Async} annotation, the EJB 3.1 {@code @javax.ejb.Asynchronous} annotation,
 * or any custom annotation specified via the {@link #annotation} attribute.
 *
 * <pre class="code">
 * @Configuration
 * @EnableAsync
 * public class AppConfig {
 *
 *     @Bean
 *     public MyAsyncBean asyncBean() {
 *         return new MyAsyncBean();
 *     }
 * }</pre>
 *
 * <p>The {@link #mode} attribute controls how advice is applied; if the mode is
 * {@link AdviceMode#PROXY} (the default), then the other attributes control the behavior
 * of the proxying.
 *
 * <p>Note that if the {@linkplain #mode} is set to {@link AdviceMode#ASPECTJ}, then the
 * value of the {@link #proxyTargetClass} attribute will be ignored. Note also that in
 * this case the {@code spring-aspects} module JAR must be present on the classpath.
 *
 * <p>By default, Spring will be searching for an associated thread pool definition:
 * either a unique {@link org.springframework.core.task.TaskExecutor} bean in the context,
 * or an {@link java.util.concurrent.Executor} bean named "taskExecutor" otherwise. If
 * neither of the two is resolvable, a {@link org.springframework.core.task.SimpleAsyncTaskExecutor}
 * will be used to process async method invocations. Besides, annotated methods having a
 * {@code void} return type cannot transmit any exception back to the caller. By default,
 * such uncaught exceptions are only logged.
 *
 * <p>To customize all this, implement {@link AsyncConfigurer} and provide:
 * <ul>
 * <li>your own {@link java.util.concurrent.Executor Executor} through the
 * {@link AsyncConfigurer#getAsyncExecutor getAsyncExecutor()} method, and</li>
 * <li>your own {@link org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler
 * AsyncUncaughtExceptionHandler} through the {@link AsyncConfigurer#getAsyncUncaughtExceptionHandler
 * getAsyncUncaughtExceptionHandler()}
 * method.</li>
 * </ul>
 *
 * <pre class="code">
 * @Configuration
 * @EnableAsync
 * public class AppConfig implements AsyncConfigurer {
 *
 *     @Bean
 *     public MyAsyncBean asyncBean() {
 *         return new MyAsyncBean();
 *     }
 *
 *     @Override
 *     public Executor getAsyncExecutor() {
 *         ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
 *         executor.setCorePoolSize(7);
 *         executor.setMaxPoolSize(42);
 *         executor.setQueueCapacity(11);
 *         executor.setThreadNamePrefix("MyExecutor-");
 *         executor.initialize();
 *         return executor;
 *     }
 *
 *     @Override
 *     public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
 *         return MyAsyncUncaughtExceptionHandler();
 *     }
 * }</pre>
 *
 * <p>If only one item needs to be customized, {@code null} can be returned to
 * keep the default settings. Consider also extending from {@link AsyncConfigurerSupport}
 * when possible.
 *
 * <p>Note: In the above example the {@code ThreadPoolTaskExecutor} is not a fully managed
 * Spring bean. Add the {@code @Bean} annotation to the {@code getAsyncExecutor()} method
 * if you want a fully managed bean. In such circumstances it is no longer necessary to
 * manually call the {@code executor.initialize()} method as this will be invoked
 * automatically when the bean is initialized.
 *
 * <p>For reference, the example above can be compared to the following Spring XML
 * configuration:
 *
 * <pre class="code">
 * {@code
 * <beans>
 *
 *     <task:annotation-driven executor="myExecutor" exception-handler="exceptionHandler"/>
 *
 *     <task:executor id="myExecutor" pool-size="7-42" queue-capacity="11"/>
 *
 *     <bean id="asyncBean" class="com.foo.MyAsyncBean"/>
 *
 *     <bean id="exceptionHandler" class="com.foo.MyAsyncUncaughtExceptionHandler"/>
 *
 * </beans>
 * }</pre>

参考:http://docs.spring.io/spring/docs/current/spring-framework-reference/html/scheduling.html

执行结果:
睡眠之前的2秒打印一次,睡眠之后的22秒打印一次

相关文章

网友评论

    本文标题:spring @scheduled并发

    本文链接:https://www.haomeiwen.com/subject/lbalixtx.html