观察者模式,又称为发布订阅模式,属于行为模式。
定义一对多的依赖关系,观察者和被观察者,订阅者发发布者的比喻都很形象。
java中对该模式有原生实现,这里贴上删除注释的代码
public interface Observer {
void update(Observable o, Object arg);
}
public class Observable {
private boolean changed = false;
private Vector<Observer> obs;
public Observable() {
obs = new Vector<>();
}
public synchronized void addObserver(Observer o) {
if (o == null)
throw new NullPointerException();
if (!obs.contains(o)) {
obs.addElement(o);
}
}
public synchronized void deleteObserver(Observer o) {
obs.removeElement(o);
}
public void notifyObservers() {
notifyObservers(null);
}
public void notifyObservers(Object arg) {
Object[] arrLocal;
synchronized (this) {
if (!changed)
return;
arrLocal = obs.toArray();
clearChanged();
}
for (int i = arrLocal.length-1; i>=0; i--)
((Observer)arrLocal[i]).update(this, arg);
}
public synchronized void deleteObservers() {
obs.removeAllElements();
}
protected synchronized void setChanged() {
changed = true;
}
protected synchronized void clearChanged() {
changed = false;
}
public synchronized boolean hasChanged() {
return changed;
}
public synchronized int countObservers() {
return obs.size();
}
}
实际上需要关注的只有几个方法,Observer 的 update 用来给 Observable 调用,也就是发送消息通知他。
然后 Obserable 有 addObserver ,deleteObserver 方法用来添加删除观察者,还有一个 notifyObservers 用来通知观察者更新。
被观察者维护一个 Vector,来添加不定量的观察者。
对这个实现来说,当更新时需要先调用 setChanged(只是他选择这样实现,这一步不是必须的)。
如果需要实现自己的观察者/被观察者,直接实现 Observer 接口和继承 Observable 类(或者组合他,组合优于继承)。
UML
这里模仿java的实现给出py的版本,不同只实现最小的必要方法,Observer 的 update 方法,以及 Observable 的 addObserver, deleteObserver, notifyObservers。同样保证添加和删除是线程安全的。
import abc
from threading import Lock
def synchronize(*func_names):
lock = Lock()
def func_decorator(func):
def wrapper(*args, **kwargs):
with lock:
return func(*args, **kwargs)
return wrapper
def decorator(cls):
sync_funcs = {func_name: func_decorator(getattr(cls, func_name)) for func_name in func_names}
cls_wraper = type(cls.__name__, (cls,), sync_funcs)
return cls_wraper
return decorator
@synchronize('add_observer', 'delete_observer', 'notify_observers')
class Observable:
def __init__(self):
self._obs = []
def add_observer(self, observer):
self._obs.append(observer)
def delete_observer(self, observer):
self._obs.remove(observer)
def notify_observers(self, data):
for ob in self._obs:
ob.update(self, data)
class Observer(abc.ABC):
@abc.abstractmethod
def update(self, observable, data):
pass
不过最好还是更改 Observable 类,使用一个新的类型来管理 observers
@synchronize('append', 'remove', '__iter__')
class _ObserverBunch:
def __init__(self):
self._obs = []
def append(self, obs):
self._obs.append(obs)
def remove(self, obs):
self._obs.remove(obs)
def __iter__(self):
return tuple(self._obs)
class Observable:
def __init__(self):
self._obs_bunch = _ObserverBunch()
def add_observer(self, observer):
self._obs_bunch.append(observer)
def delete_observer(self, observer):
self._obs_bunch.remove(observer)
def notify_observers(self, data):
for obs in self._obs_bunch :
obs.update(self, data)
当然这里还是少了一些验证代码,不过也不用将这些验证代码写入到 Observable 类的方法中了.
修改 _ObserverBunch 类:
@synchronize('append', 'remove', '__iter__')
class _ObserverBunch:
def __init__(self):
self._obs = []
def append(self, obs):
assert obs is not None
if obs in self._obs:
return
self._obs.append(obs)
def remove(self, obs):
assert obs is not None
assert obs in self._obs
self._obs.remove(obs)
def __iter__(self):
return tuple(self._obs)
当然,可以写成异常,这里简单起见,作为合约