我用 java写的 akka zeromq 看不到效果,pubSocket发送的message subSocket 不能够收到,请高手解答一下。
代码如下package com.hantek.akka.zeromq;import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;
import java.lang.management.OperatingSystemMXBean;
import java.text.SimpleDateFormat;
import java.util.Date;import scala.Serializable;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.serialization.Serialization;
import akka.serialization.SerializationExtension;
import akka.util.Duration;
import akka.zeromq.Bind;
import akka.zeromq.Connect;
import akka.zeromq.Frame;
import akka.zeromq.Listener;
import akka.zeromq.Subscribe;
import akka.zeromq.ZMQMessage;
import akka.zeromq.ZeroMQ;
import akka.zeromq.ZeroMQExtension;public class TestZeroMQ1 {

public static final Object TICK = "TICK"; public static class Heap
implements Serializable {
/**
 * 
 */
private static final long serialVersionUID = 2807062186812700532L; public final long timestamp; public final long used; public final long max; public Heap(long timestamp, long used, long max){
this.timestamp = timestamp;
this.used = used;
this.max = max;
}
} public static class Load
implements Serializable {
/**
 * 
 */
private static final long serialVersionUID = 3321800432516503175L; public final long timestamp; public final double loadAverage; public Load(long timestamp, double loadAverage) {
this.timestamp = timestamp;
this.loadAverage = loadAverage;
}
} public static class HealthProbe
extends UntypedActor { ActorRef pubSocket = ZeroMQExtension.get(getContext().system()).newPubSocket(new Bind("tcp://127.0.0.1:1237")); MemoryMXBean memory = ManagementFactory.getMemoryMXBean(); OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean(); Serialization ser = SerializationExtension.get(getContext().system());
int i=0;
@Override
public void preStart() {
System.out.println("_____pubSocket____开始");
getContext().system().scheduler().schedule(Duration.parse("1 second"),
Duration.parse("1 second"), getSelf(), TICK);
} @Override
public void postRestart(Throwable reason) {
// don't call preStart, only schedule once
} @Override
public void onReceive(Object message) {
if (message.equals(TICK)) {
i++;
if(i==10){
System.exit(1);
}
MemoryUsage currentHeap = memory.getHeapMemoryUsage();
long timestamp = System.currentTimeMillis(); byte[] heapPayload = ser.serializerFor(Heap.class).toBinary(
new Heap(timestamp, currentHeap.getUsed(), currentHeap.getMax()));
pubSocket.tell(new ZMQMessage(new Frame("health.heap"),new Frame(heapPayload)));
byte[] loadPayload = ser.serializerFor(Load.class).toBinary(
new Load(timestamp, os.getSystemLoadAverage()));
pubSocket.tell(new ZMQMessage(new Frame("health.load"),new Frame(loadPayload)));
//
System.out.println((i)+"_____pubSocket__TIKE__onReceive :"+message);
}
else {
unhandled(message);
System.out.println("_____pubSocket__UNHAND :"+message);
}
}
} public static class Logger
extends UntypedActor { ActorRef subSocket = ZeroMQExtension.get(getContext().system())
.newSubSocket(new Connect("tcp://127.0.0.1:1237"),
new Listener(getSelf()), new Subscribe("health")); Serialization ser = SerializationExtension.get(getContext().system()); SimpleDateFormat timestampFormat = new SimpleDateFormat("HH:mm:ss.SSS");

LoggingAdapter log = Logging.getLogger(getContext().system(), this); @Override
public void onReceive(Object message) {
//************************************
if(message == ZeroMQ.connecting()){
System.out.println("&&&&&_Logger_Connecting =:"+message);
}
else if (message instanceof ZMQMessage) {
ZMQMessage m = (ZMQMessage) message;
if (m.firstFrameAsString().equals("health.heap")) {
Heap heap = (Heap) ser.serializerFor(Heap.class)
.fromBinary(m.payload(1));
log.info("Used heap {} bytes, at {}", heap.used,
timestampFormat.format(new Date(heap.timestamp)));
//************************************
System.out.println("***_Logger_****_heap :"+heap);
}
else if (m.firstFrameAsString().equals("health.load")) {
Load load = (Load) ser.serializerFor(Load.class)
.fromBinary(m.payload(1));
log.info("Load average {}, at {}", load.loadAverage,
timestampFormat.format(new Date(load.timestamp)));
//************************************
System.out.println("***Logger****load =:"+load);
}
}
else {
unhandled(message);
System.out.println("***_Logger_****_unhandled =:"+message);
}
}
} public static class HeapAlerter
extends UntypedActor {
ActorRef subSocket = ZeroMQExtension.get(getContext().system())
.newSubSocket(new Connect("tcp://127.0.0.1:1237"),
new Listener(getSelf()), new Subscribe("health.heap")); Serialization ser = SerializationExtension.get(getContext().system()); LoggingAdapter log = Logging.getLogger(getContext().system(), this); int count = 0; @Override
public void onReceive(Object message) {
if(message == ZeroMQ.connecting() ){
System.out.println("&&&&&_Alerter_Connecting =:"+message);
}
else if (message instanceof ZMQMessage) {
System.out.println("&&&&&_Alerter_onReceive =:"+message);
ZMQMessage m = (ZMQMessage) message;
if (m.firstFrameAsString().equals("health.heap")) {
Heap heap = (Heap) ser.serializerFor(Heap.class).fromBinary(m.payload(1));
if (((double) heap.used / heap.max) > 0.9) {
count += 1;
}
else {
count = 0;
}
if (count > 10) {
log.warning("Need more memory, using {} %",
(100.0 * heap.used / heap.max));
}
}
System.out.println("&&&&&_Alerter_onReceive_&&&&&& =:"+message);
}
else {
System.out.println("&&&&&_Alerter_unhandled_&&&&&& =:"+message);
unhandled(message);
}
}
} public static void main(String[] args) {
ActorSystem system = ActorSystem.create("system");
system.actorOf(new Props(HealthProbe.class), "health");
system.actorOf(new Props(Logger.class), "logger");
system.actorOf(new Props(HeapAlerter.class), "alerter");
}
}