数据总线模式
@SuppressWarnings("boxing")
public class DataBus {
/**
* 数据总线模式:
* Allows send of messages/events between components of an application
* without them needing to know about each other.
* They only need to know about the type of the message/event being sent.
* 允许在应用程序的组件之间发送消息,组件之间不需要相互了解。
* 他们只需要知道发送消息或事件的类型。
*/
@Test
public void all() {
final IDataBus dataBus = DataBusImpl.of(Lists.newArrayList());
dataBus.attach(new MessageMember());
dataBus.attach(new AggregateMemeber());
dataBus.publish(StartEvent.of(System.currentTimeMillis()));
dataBus.publish(MessageEvent.of("hello"));
dataBus.publish(MessageEvent.of("world"));
dataBus.publish(EndEvent.of(System.currentTimeMillis()));
}
}
/**
* 1)数据总线抽象
*/
interface IDataBus {
void attach(Member member);
void detach(Member member);
void publish(DataEvent dataEvent);
}
/**
* 2)参与者抽象
*/
interface Member extends Consumer<DataEvent> {
IDataBus getDataBus();
void inject(IDataBus dataBus);
}
/**
* 3)事件抽象
*/
interface DataEvent {
}
/**
* 4)数据总线实现
*/
@Value(staticConstructor = "of")
class DataBusImpl implements IDataBus {
private List<Member> members;
@Override
public void attach(Member member) {
members.add(member);
member.inject(this);
}
@Override
public void detach(Member member) {
members.remove(member);
}
@Override
public void publish(DataEvent dataEvent) {
members.forEach(mem -> mem.accept(dataEvent));
}
}
/**
* 5)具体的事件
*/
@Value(staticConstructor = "of")
class StartEvent implements DataEvent {
private final Long start;
}
@Value(staticConstructor = "of")
class EndEvent implements DataEvent {
private final Long end;
}
@Value(staticConstructor = "of")
class MessageEvent implements DataEvent {
private final String message;
}
/**
* 6)具体的参与者
*/
@Value(staticConstructor = "of")
class AggregateEvent implements DataEvent {
private final String aggregatedmessage;
}
abstract class BaseMember implements Member {
private IDataBus dataBus;
@Override
public void inject(IDataBus dataBus) {
this.dataBus = dataBus;
}
@Override
public IDataBus getDataBus() {
return dataBus;
}
}
@Slf4j
class MessageMember extends BaseMember {
private Long start;
private Long end;
private final List<String> messages = Lists.newArrayList();;
public void handle(StartEvent t) {
start = t.getStart();
messages.clear();
}
public void handle(EndEvent t) {
end = t.getEnd();
final String join = String.join("-", messages);
log.info("start:{} end:{} messages:{}", start, end, join);
getDataBus().publish(AggregateEvent.of(join));
}
public void handle(MessageEvent t) {
messages.add(t.getMessage());
}
@Override
public void accept(DataEvent t) {
if (StartEvent.class.isInstance(t)) {
handle((StartEvent) t);
} else if (EndEvent.class.isInstance(t)) {
handle((EndEvent) t);
} else if (MessageEvent.class.isInstance(t)) {
handle((MessageEvent) t);
}
}
}
@Slf4j
class AggregateMemeber extends BaseMember {
@Override
public void accept(DataEvent t) {
if (AggregateEvent.class.isInstance(t)) {
final AggregateEvent ae = (AggregateEvent) t;
log.info("received:{} ", ae.getAggregatedmessage());
}
}
}