队列
分布式队列是一种常见的数据结构。为了在ZooKepeer中实现分布式队列,第一步是要使用一个znode代表队列本身。分布式客户端通过create()方法将内容放入一个名叫"queue-"的sequence ephemeral节点。由于是使用sequence节点,创建的路径将是_path-to-queue_/queue-X,其中X是一个递增的数字。当一个客户端希望移除一个内容,它可以调用getChildren方法,并设置watch为true,并从最小的子节点开始处理。不到整个子节点处理完成,客户端不需要再次调用getChildren方法。如果没有子节点,消费者可以等待watch触发再次查询队列。
在ZooKepeer的发布中src/recipes/queue(https://github.com/apache/zookeeper/tree/master/src/recipes/queue)的目录有ZooKeeper的Queue实现
优先级队列
如果需要实现一个优先级队列,你只要在以上队列中进行两处简单改动。第一在增加到队列中时,路径名称使用"queue-YY",其中YY表示优先级,越小优先级越高。第二,在移除节点时,客户端使用实时的子节点列表。也就是当watch事件触发会使用新的子元素列表来取代原有的列表
Curator实现
public static <T> QueueBuilder<T> builder(CuratorFramework client, QueueConsumer<T> consumer, QueueSerializer<T> serializer, java.lang.String queuePath)
/**
创建queue的Builder方法
client 客户端实例
consumer 消息消费者-如果是生产者可以设为null
serializer 序列化用
queuePath 队列的地址
**/
QueueBuilder<MessageType> builder = QueueBuilder.builder(client, consumer, serializer, path);
DistributedQueue<MessageType queue = builder.build();
queue.put(aMessage); //将消息放入队列
其中QueueBuilder可以使用函数式的方法调用更多配置的方法,如lockPath可以确保消息在成功处理之后才会被从queue中删除,达到类似事务的功能。
问题
Curator不建议使用ZooKeeper来实现Queue。原因如下
- ZooKeeper有1MB的传输限制。所以ZNode需要较小。较难处理大数据量的队列
- ZooKeeper在拥有大ZNode时启动时会相对较慢,会显著拖慢queue初始化和同步过程
- ZNode过大时会较难清理。getChildren会调用失败。Netflix自己创建了特定的处理过程,增大jute.maxbuffer来处理并删除大节点
- ZooKeeper性能在有很多节点且拥有大量子节点的情况下表现不佳
- ZooKeeper数据时存在内存中的。所以内存大小会是一个瓶颈