我们大部分人的编程习惯都是线性编程,所谓线性编程就是一个请求涉及到A,B,C,D等n个有顺序关系的操作在编码处理层面都是顺序性的,这样会导致随着业务的发展,依赖A操作结果的业务越来越多,请求处理会出现A->B->C->D->E....等很多个操作和A操作耦合在一起,会直接导致接口的rt变高,另外业务层面边界变得模糊,各个业务线的逻辑相互穿插,相互强依赖.
先举几个例子:
? ? 我们用编程思维逐个分析一下上述案例:订单支付成功后,核销用户优惠券,然后扣除积分,然后调履约系统发货,才算支付完成;自来水公司广播来水了,等小明洗完衣服、小明妈妈做完饭和爸爸上完厕所才算广播结束;放学铃声响了,同学们都去玩了,老师骑着二八大杠到家了,门卫大爷把门锁上了才算放学;客服切换工作状态为在线后,要等在线会话分配完成,离线工单分单完成才算上线成功.
软件编程讲究高内聚低耦合,也就是领域边界问题,细想一下前边几种场景前后有没有必然的同步依赖关系,再抽象一点,每一个业务场景、每一个操作都有自己的业务主体和操作主线,也就是说理论上只关心自己领域边界内的操作完成,对于其他的附带操作不是必然的,如果后续有操作依赖其结果,那么可以通过消息或者事件订阅来做逻辑层面解耦和操作层面异步化.
也即是操作主线完成操作业务主体领域内的业务,把结果发布出去,有依赖该结果的业务,自己订阅并在领域内消化处理该事件,理论上适用于业务边界清晰和若依赖性场景,如果是强依赖型需要自己做一致性保证或者业务层面重新抽象和定于领域边界问题,考虑把强依赖性的放到同一个业务领域.
事件是对操作行为的抽象,比如上述案例中的订单支付成功、放学铃响和客服上线等等,是基于当前业务变更产生的广播通知,周边业务可以基于此操作行为通知完成自己业务领域内的操作。
前边有提到某个业务完成自己操作后,需要将结果发布出去,对于依赖此结果完成后置操作的场景订阅事件完成自己的业务,发布通知又分为应用内和应用外,应用外通知基本上都是借助于消息中间件来发布,应用内也就是把事件发布出去,或者发布到应用内特有的容器或者队列,供依赖方消费。
监听器是对某种业务侧封装,订阅自己感兴趣的事件,在接收到事件通知后完成自己领域内的操作,比如在线会话监听器在接收到客服上线事件通知后,触发会话分配操作。
jdk1.0就引入了观察者模式,有两个核心的类:Observable和Observer.
新建被观察者:
@Slf4j public class TestObservable extends Observable { public TestObservable() { //1.添加观察者 this.addObserver(new TestObserver()); } public void doSomething() { log.info("TestObserve.doSomething currentThread={}",Thread.currentThread().getName()); //2.发布变更通知 this.setChanged(); this.notifyObservers(); //this.notifyObservers(); } }
新建观察者
@Slf4j public class TestObserver implements Observer { @Override public void update(Observable o, Object arg) { log.info("TestObserver.update receive change event;Observable={},arg={},currentThread={}",o,arg,Thread.currentThread().getName()); } }
编写测试代码并执行:
public static void main(String[] args) { TestObservable observable = new TestObservable(); observable.doSomething(); }
整个实现比较简单,但是过程中也有一些点我们需要注意并记录下来用于后续分析:
jdk自带的观察者模式实现我们主要分析一下Observable类就好,先看代码实现:
public class Observable { private boolean changed = false; private Vector<Observer> obs; public Observable() { obs = new Vector<>(); } public synchronized void addObserver(Observer o) { if (o == null) throw new NullPointerException(); if (!obs.contains(o)) { obs.addElement(o); } } public void notifyObservers() { notifyObservers(null); } public void notifyObservers(Object arg) { Object[] arrLocal; synchronized (this) { if (!changed) return; arrLocal = obs.toArray(); clearChanged(); } for (int i = arrLocal.length-1; i>=0; i--) ((Observer)arrLocal[i]).update(this, arg); } }
该类实现很简单,有几个关键信息:obs(观察者容器),addObserver(添加观察者),notifyObservers(事件通知),没有什么好分析的,就是在合适的时机将观察者添加到被观察者容器中,被观察者发生变更后调用notifyObservers方法把容器中的观察者都执行一遍。
上述代码是jdk1.0石器时代的代码,除了学习观察者模式和做案例讲解,估计罕有人用,当然互联网发展到今天我们还是以批判性的眼光来分析一下其优缺点.
优点: ?实现简单易懂
缺点: ?被观察者必须继承Observable类,观察/被观察关系装配与逻辑代码耦合,同步操作,泛化上下文
对于继承Observable类无能为力,除非自己重新写一套基于接口的来绕开单继承规则,那么我们可以从观察者/被观察者关系装配和同步操作这两个点着手考虑开进方案.
? ?首先为每一种事件类型定义对应的监听器:
public interface TestObservableListener extends Observer {}
然后观察者实现监听器接口并注入应用容器:
@Slf4j @Component public class TestObserver implements TestObservableListener { @Override public void update(Observable o, Object arg) { log.info("TestObserver.update receive change event;Observable={},arg={},currentThread={}",o,arg,Thread.currentThread().getName()); } }
被观察者实现ApplicationContextAware接口,借鸡生蛋从应用容器取出指定类型观察者放入被观察者容器:
@Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { Map<String,TestObservableListener> map = applicationContext.getBeansOfType(TestObservableListener.class); map.forEach((k,v) -> this.addObserver(v)); }
使用线程池异步通知变更事件:
public void doSomething() { log.info("TestObserve.doSomething currentThread={}",Thread.currentThread().getName()); ThreadPoolUtil.COMMON_POOL.execute(() -> { this.setChanged(); this.notifyObservers(); }); }
拉出来溜一圈:
通过改进实现了观察者与被观察者关系装配分离以及监听器操作异步化,能满足比较简单的业务场景,但是只能说还是很难用。
jdk1.1版本引入了事件模式,比较简单,有两个关键的类:EventObject和EventListener.
但是这个版本只是引入了事件的概念,在编码层面并没有本质性的改善,后续版本引入属性变更事件,实用性也好了起来,有三个比较核心的类:PropertyChangeEvent/PropertyChangeSupport/PropertyChangeListener.
新建监听器:
@Slf4j public class SimplePropertyChangeListener implements PropertyChangeListener { @Override public void propertyChange(PropertyChangeEvent evt) { log.info("SimplePropertyChangeListener.propertyChange evt={},thread={}",evt,Thread.currentThread().getName()); } }
新建业务类:
@Slf4j public class EventManager { private PropertyChangeSupport propertyChangeSupport = new PropertyChangeSupport(this); public EventManager() { this.propertyChangeSupport.addPropertyChangeListener(new SimplePropertyChangeListener()); } private String str; public void setStr(String str) { log.info("EventManager.setStr;thread={}",Thread.currentThread().getName()); String oldStr = this.str; this.str = str; this.propertyChangeSupport.firePropertyChange("str",oldStr,str); } }
测试:
public static void main(String[] args) { new EventManager().setStr("hello"); }
实现也简单,过程中也有一些点我们需要注意并记录下来用于后续分析:
其他两个类比较简单,直接分析PropertyChangeSupport类:
public class PropertyChangeSupport implements Serializable { private PropertyChangeListenerMap map = new PropertyChangeListenerMap(); public PropertyChangeSupport(Object sourceBean) { if (sourceBean == null) { throw new NullPointerException(); } source = sourceBean; } public void addPropertyChangeListener(PropertyChangeListener listener) { if (listener == null) { return; } if (listener instanceof PropertyChangeListenerProxy) { PropertyChangeListenerProxy proxy = (PropertyChangeListenerProxy)listener; // Call two argument add method. addPropertyChangeListener(proxy.getPropertyName(), proxy.getListener()); } else { this.map.add(null, listener); } } public void firePropertyChange(String propertyName, Object oldValue, Object newValue) { if (oldValue == null || newValue == null || !oldValue.equals(newValue)) { firePropertyChange(new PropertyChangeEvent(this.source, propertyName, oldValue, newValue)); } } public void firePropertyChange(PropertyChangeEvent event) { Object oldValue = event.getOldValue(); Object newValue = event.getNewValue(); if (oldValue == null || newValue == null || !oldValue.equals(newValue)) { String name = event.getPropertyName(); PropertyChangeListener[] common = this.map.get(null); PropertyChangeListener[] named = (name != null) ? this.map.get(name) : null; fire(common, event); fire(named, event); } } private static void fire(PropertyChangeListener[] listeners, PropertyChangeEvent event) { if (listeners != null) { for (PropertyChangeListener listener : listeners) { listener.propertyChange(event); } } } }
该类内部维护一个map,用于存放监听器,在业务类调用firePropertyChange方法后,从容器中取出符合条件的监听器循环调用propertyChange方法。
优点: ?实现简单,PropertyChangeSupport封装了监听器容器和对一些复杂操作做了透明化处理
缺点: ?需要手动将监听器添加到PropertyChangeSupport维护的容器中,同步操作
改进方案和观察者模式类似,利用spring容器的扩展点将业务方和监听器关系交给容器维护,在触发fire的时候利用多线程。
做一下思维发散,我们可以继承PropertyChangeSupport重写fire方法,在这里做多线程异步化。
EventBus是谷歌开源的实现事件驱动编程的事件总线,并且提供了基于注解的编码方式,对于需要实现应用内业务解耦的场景,是一个不错的选择,常用的有有两个类和一个注解:EventBus、AsyncEventBus和@Subscribe.
创建事件:
@Data class TestEvent1 { private int message; public TestEvent1(int message) { this.message = message; } }
创建事件监听器:
@Slf4j class EventListener { @Subscribe public void onEvent(TestEvent1 event) { log.info("EventListener onEvent event={},thread={}",event,Thread.currentThread().getName()); } }
guava的事件总线设计和实现都已经比较成熟,能够满足绝大多数应用内业务解耦的诉求,但是有些点还是对我们不够友好,在编码层面还是要对其做一些封装处理。
guava时间总线的核心类就是EventBus,我们剪取我们用到的方法看一下其实现:
public class EventBus { private static final Logger logger = Logger.getLogger(EventBus.class.getName()); private final String identifier; private final Executor executor; private final SubscriberExceptionHandler exceptionHandler; private final SubscriberRegistry subscribers = new SubscriberRegistry(this); private final Dispatcher dispatcher; public EventBus() { this("default"); } public EventBus(String identifier) { this( identifier, MoreExecutors.directExecutor(), Dispatcher.perThreadDispatchQueue(), LoggingHandler.INSTANCE); } public EventBus(SubscriberExceptionHandler exceptionHandler) { this( "default", MoreExecutors.directExecutor(), Dispatcher.perThreadDispatchQueue(), exceptionHandler); } EventBus( String identifier, Executor executor, Dispatcher dispatcher, SubscriberExceptionHandler exceptionHandler) { this.identifier = checkNotNull(identifier); this.executor = checkNotNull(executor); this.dispatcher = checkNotNull(dispatcher); this.exceptionHandler = checkNotNull(exceptionHandler); } public void register(Object object) { subscribers.register(object); } public void post(Object event) { Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event); if (eventSubscribers.hasNext()) { dispatcher.dispatch(event, eventSubscribers); } else if (!(event instanceof DeadEvent)) { // the event had no subscribers and was not itself a DeadEvent post(new DeadEvent(this, event)); } } }
identifier是时间总线的身份信息,subscribers是事件监听器容器,dispatcher是事件派发器,我们看一下构造器和方法:
构造器:
EventBus( String identifier, Executor executor, Dispatcher dispatcher, SubscriberExceptionHandler exceptionHandler) { this.identifier = checkNotNull(identifier); this.executor = checkNotNull(executor); this.dispatcher = checkNotNull(dispatcher); this.exceptionHandler = checkNotNull(exceptionHandler); }
创建事件总线指定标识符,监听器业务执行器,事件派发器,异常处理器.
注册监听器:
public void register(Object object) { subscribers.register(object); }
调用了SubscriberRegistry监听器注册中心的注册方法:
void register(Object listener) { Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener); for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) { Class<?> eventType = entry.getKey(); Collection<Subscriber> eventMethodsInListener = entry.getValue(); CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType); if (eventSubscribers == null) { CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<>(); eventSubscribers = MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet); } eventSubscribers.addAll(eventMethodsInListener); } }
先调用了findAllSubscribers方法,从监听器所属类解析出使用@Subscribe注解的方法并封装成Subscriber放入Multimap返回.
private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) { Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create(); Class<?> clazz = listener.getClass(); for (Method method : getAnnotatedMethods(clazz)) { Class<?>[] parameterTypes = method.getParameterTypes(); Class<?> eventType = parameterTypes[0]; methodsInListener.put(eventType, Subscriber.create(bus, listener, method)); } return methodsInListener; }
然后将返回的Multimap<Event,Subscriber>打平转换成Map<Event,List<Subscriber>>,遍历Map使用CopyOnWriteArraySet做事件的读写分离,然后把监听器注册到subscribers中.
发布事件:
public void post(Object event) { Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event); if (eventSubscribers.hasNext()) { dispatcher.dispatch(event, eventSubscribers); } else if (!(event instanceof DeadEvent)) { // the event had no subscribers and was not itself a DeadEvent post(new DeadEvent(this, event)); } }
先从监听器注册中心根据事件类型获取监听器列表,然后调用派发器发布事件,选取一种派发器LegacyAsyncDispatcher看一下实现:
@Override void dispatch(Object event, Iterator<Subscriber> subscribers) { checkNotNull(event); while (subscribers.hasNext()) { queue.add(new EventWithSubscriber(event, subscribers.next())); } EventWithSubscriber e; while ((e = queue.poll()) != null) { e.subscriber.dispatchEvent(e.event); } }
将事件封装成EventWithSubscriber放入本地队列,然后再派发出去,最后再看一下Subscriber的事件派发实现:
final void dispatchEvent(final Object event) { executor.execute( new Runnable() { @Override public void run() { try { invokeSubscriberMethod(event); } catch (InvocationTargetException e) { bus.handleSubscriberException(e.getCause(), context(event)); } } }); }
此处是执行事件监听器逻辑的地方,如果创建事件总线的时候传入了线程池,那么这里会使用线程池的的线程执行,如果没有传入线程池,EventBus会使用MoreExecutors.directExecutor()作为Executor,而该Executor直接使用业务线程执行监听器逻辑,也就是同步的.
enum DirectExecutor implements Executor { INSTANCE; @Override public void execute(Runnable command) { command.run(); } @Override public String toString() { return "MoreExecutors.directExecutor()"; } }
然后我们再简单分析一下AsyncEventBus:
public class AsyncEventBus extends EventBus { public AsyncEventBus(String identifier, Executor executor) { super(identifier, executor, Dispatcher.legacyAsync(), LoggingHandler.INSTANCE); } public AsyncEventBus(Executor executor, SubscriberExceptionHandler subscriberExceptionHandler) { super("default", executor, Dispatcher.legacyAsync(), subscriberExceptionHandler); } public AsyncEventBus(Executor executor) { super("default", executor, Dispatcher.legacyAsync(), LoggingHandler.INSTANCE); } }
可以理解为多了需要传入线程池的构造器.
优点: ?接入简单,设计成熟,可以根据事件类型将监听器归类
缺点: ?需要手动将监听器添加到EventBus事件监听器注册中心
自定义事件监听器接口,主要用于在spring容器管理的时候归类处理:
public interface IEventListener {}
自定义事件监听器:
@Subscribe public void handle(CloseSessionEvent event) { log.info("CloseSessionEventListener.handle receive event={}",event); }
继承AsyncEventBus自定义异步事件总线,利用spring容器将事件监听器自动注册到事件总线:
@Slf4j @Service public class EventBusHelper extends AsyncEventBus implements ApplicationContextAware { public static final String DEFAULT_EVENT_BUS = "async_event_bus"; private static final AtomicBoolean init = new AtomicBoolean(false); private static final Executor DEFAULT_EXECUTOR = ThreadPoolUtil.COMMON_POOL; public EventBusHelper() { this(DEFAULT_EVENT_BUS); } public EventBusHelper(String identifier) { this(identifier,DEFAULT_EXECUTOR); } public EventBusHelper(String identifier,Executor executor) { super(identifier,executor); } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { if(init.get()) { return; } Map<String,IEventListener> beanMap = applicationContext.getBeansOfType(IEventListener.class); beanMap.forEach((k,v) -> this.register(v)); } }
这样就实现了事件监听器管理与业务代码解耦,在业务类中引入EventBusHelper调用post发布事件即可,其他的已经交给IOC处理.
spring作为除了jdk之外最被广泛使用的基础构件,每个模块之间也大量使用了事件驱动编程,并且留出了扩展点供开发者使用,在满足开闭原则的情况下,开发者可以写很少的代码就能复用spring的事件驱动编程.
我们实际用到的有3个类和一个注解:ApplicationEvent、ApplicationListener、ApplicationEventPublisher和@EventListener注解.
创建事件类:
public class TestSwitchStateEvent extends ApplicationEvent { private Integer originStatus; private Integer targetStatus; private String casAccount; /** * Create a new ApplicationEvent. * * @param targetStatus the object on which the event initially occurred (never {@code null}) */ public TestSwitchStateEvent(Integer targetStatus) { super(targetStatus); } public TestSwitchStateEvent(Integer originStatus, Integer targetStatus, String casAccount) { this(targetStatus); this.targetStatus = targetStatus; this.originStatus = originStatus; this.casAccount = casAccount; } }
新增监听器:
@Slf4j @Component public class Test3SwitchStateListener implements ApplicationListener<TestSwitchStateEvent> { @Override public void onApplicationEvent(TestSwitchStateEvent event) { log.info("Test3SwitchStateListener.switchStateListener receive event={}",event); } }?????
编写事件发布逻辑,spring容器已经初始化了ApplicationEventPublisher,只需要实现ApplicationEventPublisherAware接口注入线程事件发布器:
@Slf4j @Component public class TestApplicationEventPublisher implements ApplicationEventPublisherAware, InitializingBean { private ApplicationEventPublisher applicationEventPublisher; @Override public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) { this.applicationEventPublisher = applicationEventPublisher; } public void doSomething() { log.info("TestApplicationEventPublisher.doSomething thread={}",Thread.currentThread().getName()); this.applicationEventPublisher.publishEvent(new TestSwitchStateEvent(2,1,"Typhoon")); } @Override public void afterPropertiesSet() throws Exception { this.doSomething(); } }
启动测试,事件正常发布和订阅:
????调整一下监听器实现,使用注解方式:
@Slf4j @Component public class Test3SwitchStateListener { @EventListener public void onApplicationEvent(TestSwitchStateEvent event) { log.info("Test3SwitchStateListener.switchStateListener receive event={},thread={}",event,Thread.currentThread().getName()); } }
和实现接口的方式效果一样,但是从截图中能看出事件发布者和监听器都使用相同线程执行,当然这不一定算是问题,但是在我们业务场景中对于事件驱动的使用主要是为了业务解耦,既然解耦了为什么不用异步?
spring事件驱动的原理大致从两个点分析,@EventListener解析成监听器逻辑和事件发送逻辑。
EventListenerMethodProcessor负责将@EventListener注解的方法解析封装成ApplicationListener,看其核心方法processBean:
private void processBean(final String beanName, final Class<?> targetType) { if (!this.nonAnnotatedClasses.contains(targetType) && !isSpringContainerClass(targetType)) { Map<Method, EventListener> annotatedMethods = null; try { annotatedMethods = MethodIntrospector.selectMethods(targetType, (MethodIntrospector.MetadataLookup<EventListener>) method -> AnnotatedElementUtils.findMergedAnnotation(method, EventListener.class)); } catch (Throwable ex) { } if (CollectionUtils.isEmpty(annotatedMethods)) { this.nonAnnotatedClasses.add(targetType); } else { // Non-empty set of methods ConfigurableApplicationContext context = this.applicationContext; Assert.state(context != null, "No ApplicationContext set"); List<EventListenerFactory> factories = this.eventListenerFactories; Assert.state(factories != null, "EventListenerFactory List not initialized"); for (Method method : annotatedMethods.keySet()) { for (EventListenerFactory factory : factories) { if (factory.supportsMethod(method)) { Method methodToUse = AopUtils.selectInvocableMethod(method, context.getType(beanName)); ApplicationListener<?> applicationListener = factory.createApplicationListener(beanName, targetType, methodToUse); if (applicationListener instanceof ApplicationListenerMethodAdapter) { ((ApplicationListenerMethodAdapter) applicationListener).init(context, this.evaluator); } context.addApplicationListener(applicationListener); break; } } } } } }
解析到@EventListener注解的方法后,DefaultEventListenerFactory将其封装成ApplicationListenerMethodAdapter(实现了ApplicationListener接口),然后放到ApplicationContext容器中备用.
接着看一下事件发布逻辑,ApplicationEventPublisher直接调用了AbstractApplicationContext的pulishEvent方法:
@Override public void publishEvent(ApplicationEvent event) { publishEvent(event, null); }? ?
?继续看重载方法publishEvent:
protected void publishEvent(Object event, @Nullable ResolvableType eventType) { Assert.notNull(event, "Event must not be null"); ApplicationEvent applicationEvent; if (event instanceof ApplicationEvent) { applicationEvent = (ApplicationEvent) event; } else { applicationEvent = new PayloadApplicationEvent<>(this, event); if (eventType == null) { eventType = ((PayloadApplicationEvent) applicationEvent).getResolvableType(); } } // Multicast right now if possible - or lazily once the multicaster is initialized if (this.earlyApplicationEvents != null) { this.earlyApplicationEvents.add(applicationEvent); } else { //1.发布事件 getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType); } // 2.父容器发布事件 if (this.parent != null) { if (this.parent instanceof AbstractApplicationContext) { ((AbstractApplicationContext) this.parent).publishEvent(event, eventType); } else { this.parent.publishEvent(event); } } }????
?代码中标注了两个关键点,当前容器发布事件和如果有父容器也同样发布事件(处理逻辑类似),继续看发布事件逻辑,ApplicationEventMulticaster接口只有一个默认的实现SimpleApplicationEventMulticaster:
@Override public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) { ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event)); for (final ApplicationListener<?> listener : getApplicationListeners(event, type)) { Executor executor = getTaskExecutor(); if (executor != null) { executor.execute(() -> invokeListener(listener, event)); } else { invokeListener(listener, event); } } }
这里逻辑也不复杂,解析事件类型,然后根据事件类型从容器中获取监听器列表,然后逐个调用监听器的onApplicationEvent方法,需要注意的是如果有线程池就放到线程池中执行.
优点: ?接入简单,设计成熟,可以根据事件类型将监听器归类,支持注解
缺点: ?异步化支持不好,如果要支持异步要开启另外一个套件@EnableAsync
既然提到改进,那按照我的使用习惯,我并不想为了支持异步化然后又在应用维度开启一个全局化的能力@EnableAsync,那就改成自定义实现,用一个异步化事件监听器+自定义线程池解决.
新增异步监听器注解:
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE}) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface AsyncEventListener { @AliasFor("classes") Class<?>[] value() default {}; @AliasFor("value") Class<?>[] classes() default {}; String condition() default ""; }?????
新增异步事件监听适配器(无法复用ApplicationListenerMethodAdapter):
@Slf4j public class AsyncApplicationListenerMethodAdapter implements GenericApplicationListener { public static final String DEFAULT_TASK_EXECUTOR_BEAN_NAME = "taskExecutor"; private final String beanName; private final Method method; private final Method targetMethod; private final AnnotatedElementKey methodKey; private final List<ResolvableType> declaredEventTypes; @Nullable private final String condition; private final int order; @Nullable private ApplicationContext applicationContext; @Nullable private EventExpressionEvaluator evaluator; public AsyncApplicationListenerMethodAdapter(String beanName, Class<?> targetClass, Method method) { this.beanName = beanName; this.method = BridgeMethodResolver.findBridgedMethod(method); this.targetMethod = (!Proxy.isProxyClass(targetClass) ? AopUtils.getMostSpecificMethod(method, targetClass) : this.method); this.methodKey = new AnnotatedElementKey(this.targetMethod, targetClass); AsyncEventListener ann = AnnotatedElementUtils.findMergedAnnotation(this.targetMethod, AsyncEventListener.class); this.declaredEventTypes = resolveDeclaredEventTypes(method, ann); this.condition = (ann != null ? ann.condition() : null); this.order = resolveOrder(this.targetMethod); } private static List<ResolvableType> resolveDeclaredEventTypes(Method method, @Nullable AsyncEventListener ann) { int count = method.getParameterCount(); if (count > 1) { throw new IllegalStateException( "Maximum one parameter is allowed for event listener method: " + method); } if (ann != null) { Class<?>[] classes = ann.classes(); if (classes.length > 0) { List<ResolvableType> types = new ArrayList<>(classes.length); for (Class<?> eventType : classes) { types.add(ResolvableType.forClass(eventType)); } return types; } } if (count == 0) { throw new IllegalStateException( "Event parameter is mandatory for event listener method: " + method); } return Collections.singletonList(ResolvableType.forMethodParameter(method, 0)); } void init(ApplicationContext applicationContext, EventExpressionEvaluator evaluator) { this.applicationContext = applicationContext; this.evaluator = evaluator; } @Override public void onApplicationEvent(ApplicationEvent event) { processEvent(event); } public void processEvent(ApplicationEvent event) { Object[] args = resolveArguments(event); if (shouldHandle(event, args)) { Executor executor = this.getDefaultExecutor(applicationContext); executor.execute(() -> { log.info("AsyncApplicationListenerMethodAdapter.processEvent async execute current thread={}",Thread.currentThread().getName()); Object result = doInvoke(args); if (result != null) { handleResult(result); } else { log.trace("No result object given - no result to handle"); } }); } } protected void handleResult(Object result) { if (result.getClass().isArray()) { Object[] events = ObjectUtils.toObjectArray(result); for (Object event : events) { publishEvent(event); } } else if (result instanceof Collection<?>) { Collection<?> events = (Collection<?>) result; for (Object event : events) { publishEvent(event); } } else { publishEvent(result); } } private void publishEvent(@Nullable Object event) { if (event != null) { Assert.notNull(this.applicationContext, "ApplicationContext must not be null"); this.applicationContext.publishEvent(event); } } }?????
分析一下processEvent方法,先调用getDefaultExecutor获取Executor然后执行监听器逻辑,看一下getDefaultExecutor实现:
protected Executor getDefaultExecutor(@Nullable ApplicationContext applicationContext) { if (applicationContext != null) { try { return applicationContext.getBean(TaskExecutor.class); } catch (NoUniqueBeanDefinitionException ex) { log.warn("Could not find unique TaskExecutor bean", ex); try { return applicationContext.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class); } catch (NoSuchBeanDefinitionException ex2) { } } catch (NoSuchBeanDefinitionException ex) { log.debug("Could not find default TaskExecutor bean", ex); try { return applicationContext.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class); } catch (NoSuchBeanDefinitionException ex2) { log.info("No task executor bean found for async processing: " + "no bean of type TaskExecutor and no bean named 'taskExecutor' either"); } // Giving up -> either using local default executor or none at all... } } return ThreadPoolUtil.COMMON_POOL; }?????
看过spring异步编程源码的看到这里会很熟悉,这里借用了其实现思想和逻辑,优先从容器中获取TaskExecutor,如果没有根据指定的名称获取,如果最后还获取不到线程池,那么返回指定的兜底线程池来做异步逻辑.
?????新建异步事件监听器工厂:
public class AsyncEventListenerFactory implements EventListenerFactory, Ordered { private int order = LOWEST_PRECEDENCE; public void setOrder(int order) { this.order = order; } @Override public int getOrder() { return this.order; } public boolean supportsMethod(Method method) { return AnnotatedElementUtils.hasAnnotation(method, AsyncEventListener.class); } @Override public ApplicationListener<?> createApplicationListener(String beanName, Class<?> type, Method method) { return new AsyncApplicationListenerMethodAdapter(beanName, type, method); } }?????
新增异步EventListenerMethodProcessor用于解析@AsyncEventListener.
public class AsyncEventListenerMethodProcessor implements SmartInitializingSingleton, ApplicationContextAware, BeanFactoryPostProcessor { private void processBean(final String beanName, final Class<?> targetType) { if (!this.nonAnnotatedClasses.contains(targetType) && !isSpringContainerClass(targetType)) { Map<Method, AsyncEventListener> annotatedMethods = null; try { annotatedMethods = MethodIntrospector.selectMethods(targetType, (MethodIntrospector.MetadataLookup<AsyncEventListener>) method -> AnnotatedElementUtils.findMergedAnnotation(method, AsyncEventListener.class)); } catch (Throwable ex) { } if (CollectionUtils.isEmpty(annotatedMethods)) { this.nonAnnotatedClasses.add(targetType); } else { // Non-empty set of methods ConfigurableApplicationContext context = this.applicationContext; Assert.state(context != null, "No ApplicationContext set"); List<EventListenerFactory> factories = this.eventListenerFactories; Assert.state(factories != null, "EventListenerFactory List not initialized"); for (Method method : annotatedMethods.keySet()) { for (EventListenerFactory factory : factories) { if ( factory.supportsMethod(method)) { Method methodToUse = AopUtils.selectInvocableMethod(method, context.getType(beanName)); ApplicationListener<?> applicationListener = factory.createApplicationListener(beanName, targetType, methodToUse); if (applicationListener instanceof AsyncApplicationListenerMethodAdapter) { ((AsyncApplicationListenerMethodAdapter) applicationListener).init(context, this.evaluator); } context.addApplicationListener(applicationListener); break; } } } if (logger.isDebugEnabled()) { logger.debug(annotatedMethods.size() + " @EventListener methods processed on bean '" + beanName + "': " + annotatedMethods); } } } } }
解析@AsyncEventListener注解的方法并封装成AsyncApplicationListenerMethodAdapter添加到容器中.
????是骡子是马拉出来遛遛,使用@AsyncEventListener注解方法:
@Component @Slf4j public class Test2SwitchStateListener { @AsyncEventListener protected void switchStateListener(TestSwitchStateEvent event) { log.info("Test2SwitchStateListener.switchStateListener receive event={}",event); } }
执行成功并且开启了新线程异步执行.
本篇介绍了事件驱动编程和几种常见的事件编程实现,对于设计和实现维度来说,guava的事件总线和spring事件驱动都比较成熟,功能比较完善,能够满足大部分业务场景,对于使用spring全家桶的应用可以直接使用spring事件驱动编程,其他情况下也没有严格意义上的孰优孰劣之分,看个人和团队使用习惯.
简单总结一下,事件驱动有三个重要概念:事件、事件发布者和事件监听者,事件驱动解决的是应用内部业务解耦,实现的时候注意要做异步化.
前言 微服务成了互联网架构的标配模式,对微服务之间的调用的流量治理和管控就尤...
3月24日,腾讯发布2020年Q4及全年财报,其中金融科技及企业服务第四季收入385亿...
1.百度是个大骗子,我抄了十几年的满分作文却从未得过满分。 2.学神在刷难题,...
作者 | 楚奕 来源 | 阿里技术公众号 这篇文章主要从技术视角介绍下跨平台WebCanv...
本文转载自微信公众号「后端Q」,作者conan。转载本文请联系后端Q公众号。 概述 ...
1.某女生寝室门口贴着一个告示男生与饭盒不得入内,问何解?答曰两者都会搞大女...
创业与投资的本质,都是追寻一种能够穿越时空,抵达未来的高效方式。 德勤管理咨...
1.在报名的路上,我看见远处的学校,轰!的一声没了。希望如此。 2.男:我一直...
背景 有时候我会碰到快速搭建测试服务的需求,比如像这样: 搭建一个 HTTP Servi...
基于阿里巴巴的互联网架构、大数据技术,利用混合云架构打造全新的云化电子税 务...