Design Patterns

views 575 words

opt + cmd + L 对齐

Observer (publisher / subscriber)

What

订阅-发布模式定义了对象之间的一种一对多的依赖关系,当一个对象的状态发生改变时,所有依赖它的对象都可以得到通知, 并被自动更新.

观察者模式的通知方式:

  1. 通过直接调用等同步方式实现(如函数调用, HTTP接口调用等)
    • 同步调用指被观察者发布消息后,必须等所有观察者响应结束后才可以进行接下来的操作
  2. 通过消息队列异步调用
    • 异步调用指被观察者发布消息后,即可进行接下来的操作

观察者模式的特点:

  • 优点
    1. 观察者与被观察者之间是抽象耦合的 (时间 和 空间)
    2. 可以将许多符合单一职责原则的模块进行触发,也可以很方便地实现广播
  • 缺点
    1. 观察者模式可能会带来整体系统效率的浪费
    2. 如果被观察者之间有依赖关系,其逻辑关系的梳理需要费些心思
  • 应用场景
    1. 消息交换场景, 如消息队列等
    2. 多级触发场景, 比如支持中断模式的场景中,一个中断即会引发一连串反应,就可以使用观察者模式

订阅-发布模式 V.S. 观察者模式

  • 订阅-发布模式和观察者模式概念相似,但在订阅-发布模式中,订阅者和发布者之间多了一层中间件:一个被抽象出来的信息调度中心

Diagram

-w803

-w798

Example

例子: 类似微信公众号, 用户(订阅者)订阅某个公众号(发布者), 公众号(发布者)发布信息后, 用户(订阅者)可以实时更新. (这里用不同的平台版本号作为是否可以注册发布者的条件)

调度中心

class _PublisherManager(object):
    def __init__(self):
        self.publisher_objs_map = {}
        self.publisher_to_subscriber_map = {}

    def register_publisher(self, publisher_obj):
        if publisher_obj in self.publisher_objs_map:
            print('You have been registered before')
            return

        self.publisher_objs_map[publisher_obj] = []
        self.publisher_to_subscriber_map[publisher_obj] = []

    def publish_msg(self, publisher_obj, msg):
        if publisher_obj not in self.publisher_objs_map:
            print('Please register and become a publisher before publish msg')
            return
        self.publisher_objs_map[publisher_obj].append(msg)
        # print(self.publisher_objs_map, '~~~~~')
        subscriber_objs = self.publisher_to_subscriber_map[publisher_obj]
        # print(self.publisher_to_subscriber_map, '!!!!!')

        self.notify_msg(subscriber_objs, msg)

    def subscribe_msg(self, subscriber_obj, publisher_obj):
        if publisher_obj not in self.publisher_objs_map:
            print(f'this publisher: {publisher_obj.publisher_name} not exist')
            return

        self.publisher_to_subscriber_map[publisher_obj].append(subscriber_obj)

    def notify_msg(self, subscriber_objs, msg):
        for subscriber_obj in subscriber_objs:
            # print(subscriber_obj,'========')
            subscriber_obj.receive_new_msg(msg)


pm = _PublisherManager()

发布者

class PublisherBase(object):
    def __init__(self, name):
        self.publisher_name = name

    def register_publisher(self):
        if not self.check_register_condition():
            print('sorry, register failed')
            return

        pm.register_publisher(self)

    def publish_msg(self, msg):
        pm.publish_msg(self, msg)

    def check_register_condition(self):
        pass


class PublisherAndroid(PublisherBase):
    def __init__(self, name):
        super(PublisherAndroid, self).__init__(name)
        self.system_version = 0

    def set_system_version(self, version_id):
        self.system_version = version_id

    def check_register_condition(self):
        if self.system_version >= 4.0:
            return True
        else:
            print(f'current version: {self.system_version} is too low')
            return False


class PublisherIOS(PublisherBase):
    def __init__(self, name):
        super(PublisherIOS, self).__init__(name)
        self.system_version = 0

    def set_system_version(self, version_id):
        self.system_version = version_id

    def check_register_condition(self):
        if self.system_version >= 8.0:
            return True
        else:
            print(f'current version: {self.system_version} is too low')
            return False

订阅者

class SubscriberBase(object):
    def __init__(self, name):
        self.subscriber_name = name

    def subscribe_msg(self, publisher):
        pm.subscribe_msg(self, publisher)

    def receive_new_msg(self, msg):
        pass


class SubscriberAndroid(SubscriberBase):
    def receive_new_msg(self, msg):
        print(f'Android platform user:{self.subscriber_name} receive a new msg: {msg}')


class SubscriberIOS(SubscriberBase):
    def receive_new_msg(self, msg):
        print(f'IOS platform user:{self.subscriber_name} receive a new msg: {msg}')
if __name__ == '__main__':
    new_publisher = PublisherAndroid('pythonCommunity')
    new_publisher.set_system_version(6.0)
    new_publisher.register_publisher()

    user1 = SubscriberAndroid('Alan')
    user2 = SubscriberIOS('Charon')

    user1.subscribe_msg(new_publisher)
    user2.subscribe_msg(new_publisher)

    msg = {'title': "HELLO PYTHON"}
    # new_publisher.publish_msg(msg)
    new_publisher1 = PublisherAndroid('testName')
    # new_publisher1.register_publisher()
    user1.subscribe_msg(new_publisher1)

    new_publisher1.publish_msg(msg)

例子2 - 观察者模式 (没有调度中心):

class SubjectBase:

    def register(self, observer):
        pass

    def unregister(self, observer):
        pass

    def notify(self, msg):
        pass


class Weather(SubjectBase):

    def __init__(self):
        self.observers = []

    def register(self, observer):
        if not observer in self.observers:
            self.observers.append(observer)

    def unregister(self, observer):
        self.observers.remove(observer)

    def notify(self, msg):
        for observer in self.observers:
            observer.do_something(msg)


class Observer:

    def do_something(self, msg):
        pass


class Ruhua(Observer):

    def do_something(self, msg):
        if '雨' in msg:
            print('下雨了,如花可以去桥边等伯虎了')
        elif '雪' in msg:
            print('下雪了,如花可以去堆雪人了')
        elif '太阳' in msg:
            print('出太阳了,如花要去网吧了')


class Cuihua(Observer):

    def do_something(self, msg):
        if '雨' in msg:
            print('下雨了,翠花要做酸菜了')
        elif '雪' in msg:
            print('下雪了,翠花要喂雪人吃酸菜了')
        elif '太阳' in msg:
            print('出太阳了,翠花,上酸菜')


class Cunhua(Observer):

    def do_something(self, msg):
        if '雨' in msg:
            print('下雨了,村花在赏雨')
        elif '雪' in msg:
            print('下雪了,村花在赏雪')
        elif '太阳' in msg:
            print('出太阳了,村花在日光浴')


if __name__ == '__main__':
    ruhua = Ruhua()
    cuihua = Cuihua()
    cunhua = Cunhua()

    weather = Weather()
    weather.register(ruhua)
    weather.register(cuihua)
    weather.register(cunhua)

    weather.notify('下雨了')
    weather.notify('出太阳了')
    weather.notify('下雪了')

例子3 - 观察者模式 (没有调度中心):

"""
观察者模式
从业务流程的实现角度,实现该火警报警器。
"""
 
# class AlarmSensor:
#     def run(self):
#         print ("报警器...")
#
#
# class WaterSprinker:
#     def run(self):
#         print ("洒水器...")
#
#
# class EmergencyDialer:
#     def run(self):
#         print ("拨 119...")
 
 
# 将三个类提取共性,泛化出“观察者”类,并构造被观察者
class Observer:
    def update(self):
        pass
 
 
class AlarmSensor (Observer):
    def update(self, action):
        print ("收到报警: %s" % action)
        self.runAlarm ()
    
    def runAlarm(self):
        print ("报警器...")
 
 
class WaterSprinker (Observer):
    def update(self, action):
        print ("收到洒水: %s" % action)
        self.runSprinker ()
    
    def runSprinker(self):
        print ("洒水器...")
 
 
class EmergencyDialer (Observer):
    def update(self, action):
        print ("收到拨号: %s" % action)
        self.runDialer ()
    
    def runDialer(self):
        print ("拨 119...")
 
 
# 被观察者
class Observed:
    observers=[]
    action=""
    def addObserver(self,observer):
        self.observers.append(observer)
    def notifyAll(self):
        for obs in self.observers:
            obs.update(self.action)
class smokeSensor(Observed):
    def setAction(self,action):
        self.action=action
    def isFire(self):
        return True
 
 
if __name__=="__main__":
    alarm=AlarmSensor()
    sprinker=WaterSprinker()
    dialer=EmergencyDialer()
 
    smoke_sensor=smokeSensor()
    smoke_sensor.addObserver(alarm)
    smoke_sensor.addObserver(sprinker)
    smoke_sensor.addObserver(dialer)
 
 
    if smoke_sensor.isFire():
        smoke_sensor.setAction("着火!")
        smoke_sensor.notifyAll()