Design Patterns
opt + cmd + L 对齐
Observer (publisher / subscriber)
What
订阅-发布模式定义了对象之间的一种一对多的依赖关系,当一个对象的状态发生改变时,所有依赖它的对象都可以得到通知, 并被自动更新.
观察者模式的通知方式:
- 通过直接调用等同步方式实现(如函数调用, HTTP接口调用等)
- 同步调用指被观察者发布消息后,必须等所有观察者响应结束后才可以进行接下来的操作
- 通过消息队列异步调用
- 异步调用指被观察者发布消息后,即可进行接下来的操作
观察者模式的特点:
- 优点
- 观察者与被观察者之间是抽象耦合的 (时间 和 空间)
- 可以将许多符合单一职责原则的模块进行触发,也可以很方便地实现广播
- 缺点
- 观察者模式可能会带来整体系统效率的浪费
- 如果被观察者之间有依赖关系,其逻辑关系的梳理需要费些心思
- 应用场景
- 消息交换场景, 如消息队列等
- 多级触发场景, 比如支持中断模式的场景中,一个中断即会引发一连串反应,就可以使用观察者模式
订阅-发布模式 V.S. 观察者模式
- 订阅-发布模式和观察者模式概念相似,但在订阅-发布模式中,订阅者和发布者之间多了一层中间件:一个被抽象出来的信息调度中心
Diagram
Example
例子: 类似微信公众号, 用户(订阅者)订阅某个公众号(发布者), 公众号(发布者)发布信息后, 用户(订阅者)可以实时更新. (这里用不同的平台版本号作为是否可以注册发布者的条件)
调度中心
class _PublisherManager(object):
def __init__(self):
self.publisher_objs_map = {}
self.publisher_to_subscriber_map = {}
def register_publisher(self, publisher_obj):
if publisher_obj in self.publisher_objs_map:
print('You have been registered before')
return
self.publisher_objs_map[publisher_obj] = []
self.publisher_to_subscriber_map[publisher_obj] = []
def publish_msg(self, publisher_obj, msg):
if publisher_obj not in self.publisher_objs_map:
print('Please register and become a publisher before publish msg')
return
self.publisher_objs_map[publisher_obj].append(msg)
# print(self.publisher_objs_map, '~~~~~')
subscriber_objs = self.publisher_to_subscriber_map[publisher_obj]
# print(self.publisher_to_subscriber_map, '!!!!!')
self.notify_msg(subscriber_objs, msg)
def subscribe_msg(self, subscriber_obj, publisher_obj):
if publisher_obj not in self.publisher_objs_map:
print(f'this publisher: {publisher_obj.publisher_name} not exist')
return
self.publisher_to_subscriber_map[publisher_obj].append(subscriber_obj)
def notify_msg(self, subscriber_objs, msg):
for subscriber_obj in subscriber_objs:
# print(subscriber_obj,'========')
subscriber_obj.receive_new_msg(msg)
pm = _PublisherManager()
发布者
class PublisherBase(object):
def __init__(self, name):
self.publisher_name = name
def register_publisher(self):
if not self.check_register_condition():
print('sorry, register failed')
return
pm.register_publisher(self)
def publish_msg(self, msg):
pm.publish_msg(self, msg)
def check_register_condition(self):
pass
class PublisherAndroid(PublisherBase):
def __init__(self, name):
super(PublisherAndroid, self).__init__(name)
self.system_version = 0
def set_system_version(self, version_id):
self.system_version = version_id
def check_register_condition(self):
if self.system_version >= 4.0:
return True
else:
print(f'current version: {self.system_version} is too low')
return False
class PublisherIOS(PublisherBase):
def __init__(self, name):
super(PublisherIOS, self).__init__(name)
self.system_version = 0
def set_system_version(self, version_id):
self.system_version = version_id
def check_register_condition(self):
if self.system_version >= 8.0:
return True
else:
print(f'current version: {self.system_version} is too low')
return False
订阅者
class SubscriberBase(object):
def __init__(self, name):
self.subscriber_name = name
def subscribe_msg(self, publisher):
pm.subscribe_msg(self, publisher)
def receive_new_msg(self, msg):
pass
class SubscriberAndroid(SubscriberBase):
def receive_new_msg(self, msg):
print(f'Android platform user:{self.subscriber_name} receive a new msg: {msg}')
class SubscriberIOS(SubscriberBase):
def receive_new_msg(self, msg):
print(f'IOS platform user:{self.subscriber_name} receive a new msg: {msg}')
if __name__ == '__main__':
new_publisher = PublisherAndroid('pythonCommunity')
new_publisher.set_system_version(6.0)
new_publisher.register_publisher()
user1 = SubscriberAndroid('Alan')
user2 = SubscriberIOS('Charon')
user1.subscribe_msg(new_publisher)
user2.subscribe_msg(new_publisher)
msg = {'title': "HELLO PYTHON"}
# new_publisher.publish_msg(msg)
new_publisher1 = PublisherAndroid('testName')
# new_publisher1.register_publisher()
user1.subscribe_msg(new_publisher1)
new_publisher1.publish_msg(msg)
例子2 - 观察者模式 (没有调度中心):
class SubjectBase:
def register(self, observer):
pass
def unregister(self, observer):
pass
def notify(self, msg):
pass
class Weather(SubjectBase):
def __init__(self):
self.observers = []
def register(self, observer):
if not observer in self.observers:
self.observers.append(observer)
def unregister(self, observer):
self.observers.remove(observer)
def notify(self, msg):
for observer in self.observers:
observer.do_something(msg)
class Observer:
def do_something(self, msg):
pass
class Ruhua(Observer):
def do_something(self, msg):
if '雨' in msg:
print('下雨了,如花可以去桥边等伯虎了')
elif '雪' in msg:
print('下雪了,如花可以去堆雪人了')
elif '太阳' in msg:
print('出太阳了,如花要去网吧了')
class Cuihua(Observer):
def do_something(self, msg):
if '雨' in msg:
print('下雨了,翠花要做酸菜了')
elif '雪' in msg:
print('下雪了,翠花要喂雪人吃酸菜了')
elif '太阳' in msg:
print('出太阳了,翠花,上酸菜')
class Cunhua(Observer):
def do_something(self, msg):
if '雨' in msg:
print('下雨了,村花在赏雨')
elif '雪' in msg:
print('下雪了,村花在赏雪')
elif '太阳' in msg:
print('出太阳了,村花在日光浴')
if __name__ == '__main__':
ruhua = Ruhua()
cuihua = Cuihua()
cunhua = Cunhua()
weather = Weather()
weather.register(ruhua)
weather.register(cuihua)
weather.register(cunhua)
weather.notify('下雨了')
weather.notify('出太阳了')
weather.notify('下雪了')
例子3 - 观察者模式 (没有调度中心):
"""
观察者模式
从业务流程的实现角度,实现该火警报警器。
"""
# class AlarmSensor:
# def run(self):
# print ("报警器...")
#
#
# class WaterSprinker:
# def run(self):
# print ("洒水器...")
#
#
# class EmergencyDialer:
# def run(self):
# print ("拨 119...")
# 将三个类提取共性,泛化出“观察者”类,并构造被观察者
class Observer:
def update(self):
pass
class AlarmSensor (Observer):
def update(self, action):
print ("收到报警: %s" % action)
self.runAlarm ()
def runAlarm(self):
print ("报警器...")
class WaterSprinker (Observer):
def update(self, action):
print ("收到洒水: %s" % action)
self.runSprinker ()
def runSprinker(self):
print ("洒水器...")
class EmergencyDialer (Observer):
def update(self, action):
print ("收到拨号: %s" % action)
self.runDialer ()
def runDialer(self):
print ("拨 119...")
# 被观察者
class Observed:
observers=[]
action=""
def addObserver(self,observer):
self.observers.append(observer)
def notifyAll(self):
for obs in self.observers:
obs.update(self.action)
class smokeSensor(Observed):
def setAction(self,action):
self.action=action
def isFire(self):
return True
if __name__=="__main__":
alarm=AlarmSensor()
sprinker=WaterSprinker()
dialer=EmergencyDialer()
smoke_sensor=smokeSensor()
smoke_sensor.addObserver(alarm)
smoke_sensor.addObserver(sprinker)
smoke_sensor.addObserver(dialer)
if smoke_sensor.isFire():
smoke_sensor.setAction("着火!")
smoke_sensor.notifyAll()