JavaGroups是一种可靠组通信工具,在同一个台主机、局域网甚至是广域网中,组成员可以加入一个组,发送消息给其它的组成员并从其它成员中接收消息,系统跟踪所有组成员加入、退出和崩溃,并将这些系统信息发送给其它组成员。在JavaGroups中,组并不需要明确地创建,当第一个成员加入一个组时,自动创建了该组,第一个成员同时作为系统的协调者统一发送系统信息(譬如成员的加入退出等)给其它成员,而其他组成员通过与系统协调者的通信来获得系统的变化情况。目前大部分开源的分布式缓存的底层都是基于JGroups,包括鼎鼎大名的JBossCache、OSCache等等。    上图为JavaGroups的概念架构图。可以看出,JavaGroups从概念上自下而上分为三个部分,协议栈、通道(Channel接口)、BuildingBlocks And DistributedDataStructures (高层组件和分布式数据结构)和应用程序。其中,通道、高层组件和分布式数据结构是系统暴露出来的接口,用户应用程序通过与该部分的交互来使用底层的通信功能。以下将分别对各部分进行详细介绍。
    协议栈是系统最核心的部分,其定义了组成员之间如何通信、协调,组如何初始化,组如何检测成员等等最核心和根本的功能。协议栈是一些有顺序的协议的组合,当通道发送一条消息时,首先将其传递给协议栈(Protocol stack),协议栈将其传递给最顶端的协议处理,每个协议处理完之后将其传递给其底端协议处理,最后,最底端的协议将消息通过网络将其传送出去。
    下图是一个协议堆例子,其中:CAUSAL协议定义了一个组成员顺序发送出去的消息必须按相同的顺序被其它的组成员所接受;GMS协议定义了组管理服务,包括成员的加入、离开和崩溃的处理;MERGE协议定义了子组的合并,即协调者定期广播一些HELLO和地址信息,如果接受到回应和其组名一致并且目前还没加入本组,则将其合并进来;FRAG定义了消息分桢机制,即当一个消息超过一定大小的时候将其分解为几个小消息发送;UDP定义了底层的传送机制。协议栈灵活的定义机制使用户可以灵活地根据自己的需求来定义使用的协议栈。    下面列出一些常用的协议的作用:
    1)核心微协议
CAUSAL:组中消息的原因排序。实现使用一个矢量时钟
FD:使用 heartbeat 协议的故障检测。根据成员关系列表中的排序,Heartbeat 消息被发送到邻居成员
FD_SOCK:基于 TCP 套接字的故障检测。基于环 的 ping 被在邻居成员之间发送。当所有成员都在同一物理主机上时工作得最好
FD_PID: 使用进程 ID 的故障检测 (本地 JNI 代码,以获得所需的 PID)。只能在同一台主机上 (一个 IP 地址) 工作
FD_PROB: 使用随机算法的故障检测。组的每个成员发送 heartbeat,并维护其他成员的 heartbeat 计数
FLOW_CONTROL:流控制实现,限制了消息收据之间发送的消息的最大数量
FLUSH:以一致的方式跨所有成员清除所有的消息。通常是在视图更改之前执行
FRAG:消息分段和重新装配(Message fragmentation and reassembly)。确保较大的消息在沿着栈往下发送之前被分为 
FRAG_SIZE:大小的段。分段的消息在沿着栈往上发送之前在接收方被重新装配
GMS:组管理服务。基于虚拟同步模型管理组成员关系
MERGEMERGE2:合并分离的子组。当网络由于故障而分离成多个部分时就形成了子组
NACKACK:实现可靠的传输。基于消息序列号请求丢失消息的重新传输。确保从每个起始通道发出的消息的正确排序JMS将 Java Message Service 用于传输。与任何 JMS 实现协同工作
STATE_TRANSFER: 实现状态传输协议,使得新成员可以从同等物(coordinator)或所有成员获得现有的状态。需要 FLUSH 微协议在协议栈上
UNICAST:实现可靠的单播传输。请求丢失消息的重新传输,并确保发出消息的正确排序
VIEW_ENFORCER:直到接收到第一个 
VIEW_CHANGE: 才丢弃消息。客户端只有在成为组成员后才需要处理消息
STABLE:实现分布式的垃圾收集协议 (也就是说,删除所有已被所有组成员接收到的消息)
VERIFY_SUSPECT:发送消息以确保以前怀疑的成员已真正崩溃(crashed)
UDP:一般用作组消息传输的最低层。IP 多播用于组广播,而 UDP 用于点到点通信PING:用于引导成员关系管理。使用 IP 多播 "ping" 消息来定位成员,然后再请求这      
些成员加入组中
    2)基于随机广播的微协议
pbcast.GMS:组管理服务,基于随机广播 (杂谈)。不需要 FLUSH
pbcast.FD:基于杂谈(gossip) 的被动故障检测。不发送 heartbeat 消息
pbcast.PBCAST:实现随机广播,有规律地以杂谈的方式发送到成员关系的一个随机子集
pbcast.STABLE:实现分布式的垃圾收集协议 (也就是说,删除所有已被所有组成员接收到的消息)
pbcast.NAKACK:对丢失消息的重新传输和消息的顺序传送的否认实现pbcast.STATE_TRANSFER:将随机广播用于状态传输实现。不需要 QUEUE
    3)穿越 WAN 和防火墙的微协议
TCP :用于取代 UDP 作为最低层传输。通过 TCP 连接发送多个单播消息到成员,而不是发送多播消息 (不可能)。已经内置了可靠性、FIFO 排序和流控制
TCPPING:使用一组已知的成员来引导通过 TCP 的成员关系管理
TCPGOSSIP:使用一个外部杂谈 (参见 参考资料) 服务器,来为成员关系管理的引导定位成员的初始集合 
TUNNEL: 当用于代替 UDP 或 TCP 作为最低层传输协议时,启用通过防火墙的隧道技术。与防火墙外的一个 JavaGroups Router 进程协同工作
    JavaGroups中所有的协议都在org.jgroups.protocol包及其自包中定义,各协议详细的定义和实现可参见源代码。
    通道类似于Socket,用户通过通道与底层的交互,使用编程更加方便。通道相当于一个组的句柄,对用户而言,组通信最核心的功能是加入组、退出组、发送消息、接收消息,都通过通道来达到这个目的,即org.jgroups.Channel的实现。值得注意的是,JavaGroups中Channel仅是一个抽象类,可以使用不同的通道实现,见下图:    在这里我们仅探讨最常用的org.jgroups.Jchannel实现。
    1)指定协议栈并创建通道,并连接到一个组
[code]
String props = "UDP(mcast_addr=228.8.8.8;mcast_port=45566;ip_ttl=32;"
   + "mcast_send_buf_size=64000;mcast_recv_buf_size=64000):"
   + "PING(timeout=2000;num_initial_members=3):"
   + "MERGE2(min_interval=5000;max_interval=10000):"
   + "FD_SOCK:"
   + "VERIFY_SUSPECT(timeout=1500):"
   + "pbcast.NAKACK(max_xmit_size=8096;gc_lag=50;retransmit_timeout=600,1200,2400,4800):"
   + "UNICAST(timeout=600,1200,2400,4800):"
   + "pbcast.STABLE(desired_avg_gossip=20000):"
   + "FRAG(frag_size=8096;down_thread=false;up_thread=false):" 
+ "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;"
   + "shun=false;print_local_addr=true):" + "STATE_TRANSFER:QUEUE";
JChannel channel = new JChannel(props);
channel.connect(“groupname”);
[/code]
     2)发送一条消息
[code]
channel.send(new Message(null, null, “Hello, JavaGroups”);
[/code]
     3)接受一条消息
[code]
Object obj = channel.receive(0); 
[/code]
     4)退出组
[code]
channel.close();
[/code]
     可以通过设置通道的一些选项来定义通道的行为
     public void setOpt(int option, Object value)
     目前有效的值分别是
     1)Channel.BLOCK:是否接受阻塞信息,默认为Boolean.TRUE
     2)Channel.LOCAL:消息广播时,消息发送者是否也会接受到该消息,默认为Boolean.TRUE
     3)Channel.AUTO_RECONNECT:通道断开时是否自动重连接,默认值为Boolean.TRUE
     4)Channel.AUTO_GETSTATE:是否自动重新获得状态信息,默认值为Boolean.TRUE
     系统以及组成员之间通过消息的方式来进行通信,消息分为如下几种(以下说明的类位于org.jgroups包下):
     1)普通消息(Message类)
     用户组成员之间的交互消息,包括消息头、消息接受者、消息发送者和消息体
     2)视图(View类)
     即组成员的情况,当一个成员加入、退出或者崩溃时,系统协调者(见上,第一个组成员)向其它发送新的视图信息
     3)疑似事件(SuspectEvent类)
     当检测到一个组成员可能已经离开或者崩溃时,发送该信息。
     4)阻塞事件(BlockEvent类)
     当系统协调者在更新视图时,会发送该阻塞信息给其它组成员,让其在视图更新之后再做发送消息的操作
     5)获得状态事件(GetStateEvent类)
     6)设置状态事件(SetStateEvent类)
     应用程序可能需要保持一些状态,并在各个成员之间同步,可以通过这两个事件知晓状态的变化
     就此,我们已经对JavaGroups的架构和组件有一个基本的了解,下部分将通过例子进一步介绍JavaGroups的使用

解决方案 »

  1.   

    我的BLOG:http://ayufox.blogcn.com/
      

  2.   

    [code]
    String props = "UDP(mcast_addr=228.8.8.8;mcast_port=45566;ip_ttl=32;"
       + "mcast_send_buf_size=64000;mcast_recv_buf_size=64000):"
       + "PING(timeout=2000;num_initial_members=3):"
       + "MERGE2(min_interval=5000;max_interval=10000):"
       + "FD_SOCK:"
       + "VERIFY_SUSPECT(timeout=1500):"
       + "pbcast.NAKACK(max_xmit_size=8096;gc_lag=50;retransmit_timeout=600,1200,2400,4800):"
       + "UNICAST(timeout=600,1200,2400,4800):"
       + "pbcast.STABLE(desired_avg_gossip=20000):"
       + "FRAG(frag_size=8096;down_thread=false;up_thread=false):" 
    + "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;"
       + "shun=false;print_local_addr=true):" + "STATE_TRANSFER:QUEUE";
    JChannel channel = new JChannel(props);
    channel.connect(“groupname”);
    [/code]
    老兄对于这个里面的mcast_addr=228.8.8.8;mcast_port=45566 我不明白是什么意思?