前言

观察者模式(Observer Pattern)和发布订阅模式(Publish-Subscribe Pattern)都是常见的行为设计模式,用于实现对象之间的解耦和通信。它们之间的主要区别在于对象之间的关系和通信方式。

观察者模式:

  • 关系:观察者模式中存在一个被观察者(Subject)和多个观察者(Observer)之间的一对多关系。当被观察者的状态发生变化时,所有观察者都会收到通知并进行相应的更新。
  • 通信方式:被观察者直接通知观察者,观察者通过注册或订阅的方式与被观察者建立联系,当被观察者状态改变时,观察者会得到通知。

发布订阅模式:

  • 关系:发布订阅模式中引入了一个消息代理(Message Broker)或事件通道(Event Channel),发布者(Publisher)将消息发布到通道中,订阅者(Subscriber)从通道中订阅感兴趣的消息。
  • 通信方式:发布者和订阅者之间通过消息代理进行通信,发布者不需要直接知道订阅者的存在,订阅者也不需要直接知道发布者的存在,它们之间通过消息通道进行解耦。

形象例子:

假设一个新闻发布系统,有多个用户(观察者)希望订阅不同类型的新闻。这里可以使用观察者模式和发布订阅模式来实现:

  • 观察者模式:每个用户订阅自己感兴趣的新闻频道(观察者注册到被观察者),当某一类型的新闻发布时,所有订阅该类型的用户都会收到通知并查看相关新闻。
  • 发布订阅模式:新闻发布系统将不同类型的新闻发布到对应的频道(消息通道),用户可以选择订阅自己感兴趣的频道(订阅者订阅消息通道),当新闻发布到频道时,订阅该频道的用户会收到相关新闻。

通过消息代理实现发布者与订阅者的解耦在发布订阅模式中有以下好处:

  1. 松耦合:发布者和订阅者之间的解耦使得它们可以独立演化,互不影响。发布者无需关注订阅者的存在,而订阅者也无需关注具体的发布者。这种松耦合的设计使得系统更加灵活、可扩展和易于维护。
  2. 可伸缩性:发布订阅模式支持多个发布者和多个订阅者之间的通信。当系统需要增加新的发布者或订阅者时,只需要将其连接到消息代理即可,而不需要修改已有的发布者或订阅者的代码。这种可伸缩性使得系统可以方便地进行扩展。
  3. 解耦复杂性:由于发布者和订阅者之间通过消息代理进行通信,发布者无需关注订阅者的逻辑处理,而订阅者也无需关注发布者的具体实现。这样可以将复杂的通信逻辑集中在消息代理中,简化了发布者和订阅者的逻辑,提高了系统的可读性和可维护性。
  4. 灵活性:发布订阅模式可以支持不同类型的消息和多个订阅者之间的多对多关系。发布者可以根据需要选择要发布的消息类型,而订阅者可以根据自己的需求选择订阅感兴趣的消息类型。这种灵活性使得系统可以根据实际情况进行定制化的通信。

总而言之,通过消息代理实现发布者与订阅者的解耦可以提供松耦合、可伸缩性、解耦复杂性和灵活性等好处。这使得系统更加灵活、可扩展和易于维护,并支持复杂的通信需求。

具体应用

观察者模式

见之前写的 《前端分享—ES6之Promise源码系列【干货】 》 中通过分析Promise的调用流程:

  • Promise的构造方法接收一个executor(),在new Promise()时就立刻执行这个executor回调
  • executor()内部的异步任务被放入宏/微任务队列,等待执行
  • then()被执行,收集成功/失败回调,放入成功/失败队列
  • executor()的异步任务被执行,触发resolve/reject,从成功/失败队列中取出回调依次执行

意识到这是个「观察者模式」,这种收集依赖 -> 触发通知 -> 取出依赖执行 的方式,被广泛运用于观察者模式的实现,在Promise里,执行顺序是then收集依赖 -> 异步触发resolve -> resolve执行依赖

发布订阅模式

《前端接口防止重复请求实现方案》中通过发布订阅模式去解决接口重复调用的问题

 1import axios from "axios"
 2
 3let instance = axios.create({
 4    baseURL: "/api/"
 5})
 6
 7
 8class EventEmitter {
 9    constructor() {
10        this.event = {}
11    }
12    on(type, cbres, cbrej) {
13        if (!this.event[type]) {
14            this.event[type] = [[cbres, cbrej]]
15        } else {
16            this.event[type].push([cbres, cbrej])
17        }
18    }
19
20    emit(type, res, ansType) {
21        if (!this.event[type]) return
22        else {
23            this.event[type].forEach(cbArr => {
24                if(ansType === 'resolve') {
25                    cbArr[0](res)
26                }else{
27                    cbArr[1](res)
28                }
29            });
30        }
31    }
32}
33
34
35
36function generateReqKey(config, hash) {
37    const { method, url, params, data } = config;
38    return [method, url, JSON.stringify(params), JSON.stringify(data), hash].join("&");
39}
40
41
42const pendingRequest = new Set();
43
44const ev = new EventEmitter()
45
46
47instance.interceptors.request.use(async (config) => {
48    let hash = location.hash
49    
50    let reqKey = generateReqKey(config, hash)
51    
52    if(pendingRequest.has(reqKey)) {
53        
54        
55        let res = null
56        try {
57            
58          res = await new Promise((resolve, reject) => {
59                    ev.on(reqKey, resolve, reject)
60                })
61          return Promise.reject({
62                    type: 'limiteResSuccess',
63                    val: res
64                })
65        }catch(limitFunErr) {
66            
67            return Promise.reject({
68                        type: 'limiteResError',
69                        val: limitFunErr
70                    })
71        }
72    }else{
73        
74        config.pendKey = reqKey
75        pendingRequest.add(reqKey)
76    }
77
78    return config;
79  }, function (error) {
80    return Promise.reject(error);
81  });
82
83
84instance.interceptors.response.use(function (response) {
85    
86    handleSuccessResponse_limit(response)
87    return response;
88  }, function (error) {
89    return handleErrorResponse_limit(error)
90  });
91
92
93function handleSuccessResponse_limit(response) {
94      const reqKey = response.config.pendKey
95    if(pendingRequest.has(reqKey)) {
96      let x = null
97      try {
98        x = JSON.parse(JSON.stringify(response))
99      }catch(e) {
100        x = response
101      }
102      pendingRequest.delete(reqKey)
103      ev.emit(reqKey, x, 'resolve')
104      delete ev.reqKey
105    }
106}
107
108
109function handleErrorResponse_limit(error) {
110    if(error.type && error.type === 'limiteResSuccess') {
111      return Promise.resolve(error.val)
112    }else if(error.type && error.type === 'limiteResError') {
113      return Promise.reject(error.val);
114    }else{
115      const reqKey = error.config.pendKey
116      if(pendingRequest.has(reqKey)) {
117        let x = null
118        try {
119          x = JSON.parse(JSON.stringify(error))
120        }catch(e) {
121          x = error
122        }
123        pendingRequest.delete(reqKey)
124        ev.emit(reqKey, x, 'reject')
125        delete ev.reqKey
126      }
127    }
128      return Promise.reject(error);
129}
130
131export default instance;
132

总结

参考:

juejin.cn/post/734184… blog.csdn.net/weixin_4581…

个人笔记记录 2021 ~ 2025