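Problem description:
      I want to control client behavior from an admin backend, for example triggering upgrades or power-off. The communication framework is Netty. The problem I've run into is how the server side (that is, the admin backend) can send messages to the clients. Most answers online say to cache each client's Channel, but in my project the backend, the frontend, and the Netty server are all deployed separately. What should I do in that case? Any suggestions would be appreciated!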

Solutions

  1.   

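    1. Plain string messages

    Server:

    // Imports used by the string examples (Netty 4.x package names).
    import io.netty.bootstrap.Bootstrap;
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;

    public void startServer(int port) throws InterruptedException {
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup work = new NioEventLoopGroup();
        try {
            ServerBootstrap bootStrap = new ServerBootstrap();
            bootStrap.group(boss, work);
            bootStrap.channel(NioServerSocketChannel.class);
            bootStrap.option(ChannelOption.SO_BACKLOG, 1024);
            bootStrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    pipeline.addLast(new StringDecoder());
                    pipeline.addLast(new StringEncoder());
                    pipeline.addLast(new ServerHandler());
                }
            });
            // Bind the port and wait synchronously until binding succeeds.
            ChannelFuture future = bootStrap.bind(port).sync();
            // Block until the server's listening channel is closed.
            future.channel().closeFuture().sync();
        } finally {
            // Release the event loop threads even if bind() or sync() throws.
            boss.shutdownGracefully();
            work.shutdownGracefully();
        }
    }

    // The original post extended ChannelHandlerAdapter, which only exposes channelRead
    // in the discontinued Netty 5 alpha; on Netty 4.x use ChannelInboundHandlerAdapter.
    public class ServerHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("msg:" + msg);
            /* Manual parsing of the received data (for a pipeline without StringDecoder):
            ByteBuf buf=(ByteBuf)msg;
            byte[] req=new byte[buf.readableBytes()];
            buf.readBytes(req);
            String body=new String(req,"UTF-8");
            System.out.println("body:"+body);
            ctx.pipeline().writeAndFlush(buf);*/
            ctx.pipeline().writeAndFlush("hi......");
        }
    }

    Client:

    public void startClient(String host, int port) throws InterruptedException {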
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootStrap = new Bootstrap();
            bootStrap.group(group);
            bootStrap.channel(NioSocketChannel.class);
            bootStrap.option(ChannelOption.SO_KEEPALIVE, true);
            bootStrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel sc) throws Exception {
                    ChannelPipeline pipeline = sc.pipeline();
                    pipeline.addLast(new StringDecoder());
                    pipeline.addLast(new StringEncoder());
                    pipeline.addLast(new ClientHandler());
                }
            });
            // Connect to the server and wait synchronously until the connection is established.
            ChannelFuture future = bootStrap.connect(host, port).sync();
            future.channel().writeAndFlush("hello");
            // Block until the client channel is closed.
            future.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

    // On Netty 4.x, extend ChannelInboundHandlerAdapter (the original post used
    // ChannelHandlerAdapter from the discontinued Netty 5 alpha).
    public class ClientHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("msg:" + msg);
        }
    }

    2. Object messages (the transferred object simply implements Serializable)

    Server (only the encoder and decoder lines differ from the plain-string version):

    public void startServer(int port) throws InterruptedException {
        // ObjectEncoder, ObjectDecoder and ClassResolvers come from
        // io.netty.handler.codec.serialization.
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup work = new NioEventLoopGroup();
        try {
            ServerBootstrap bootStrap = new ServerBootstrap();
            bootStrap.group(boss, work);
            bootStrap.channel(NioServerSocketChannel.class);
            bootStrap.option(ChannelOption.SO_BACKLOG, 1024);
            bootStrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    // Outbound: serializes a Serializable object into a ByteBuf.
                    pipeline.addLast(new ObjectEncoder());
                    // Inbound: deserializes a received ByteBuf back into an object.
                    pipeline.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(this.getClass().getClassLoader())));
                    pipeline.addLast(new ServerHandler());
                }
            });
            // Bind the port and wait synchronously until binding succeeds.
            ChannelFuture future = bootStrap.bind(port).sync();
            // Block until the server's listening channel is closed.
            future.channel().closeFuture().sync();
        } finally {
            boss.shutdownGracefully();
            work.shutdownGracefully();
        }
    }

    The ServerHandler code is the same as before.

    Client:

    public void startClient(String host, int port) throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootStrap = new Bootstrap();
            bootStrap.group(group);
            bootStrap.channel(NioSocketChannel.class);
            bootStrap.option(ChannelOption.SO_KEEPALIVE, true);
            bootStrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel sc) throws Exception {
                    ChannelPipeline pipeline = sc.pipeline();
                    // Outbound: serializes a Serializable object into a ByteBuf.
                    pipeline.addLast(new ObjectEncoder());
                    // Inbound: deserializes a received ByteBuf back into an object.
                    pipeline.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(this.getClass().getClassLoader())));
                    pipeline.addLast(new ClientHandler());
                }
            });
            // Connect to the server and wait synchronously until the connection is established.
            ChannelFuture future = bootStrap.connect(host, port).sync();
            Student student = new Student("user", "psw");
            future.channel().writeAndFlush(student);
            // Block until the client channel is closed.
            future.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

    The ClientHandler code is the same as before.

    The Student object (it must implement the Serializable interface):

    public class Student implements Serializable {
        private static final long serialVersionUID = 1L;

        private String name;
        private String psw;

        public Student(String name, String psw) {
            this.name = name;
            this.psw = psw;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String getPsw() {
            return psw;
        }

        public void setPsw(String psw) {
            this.psw = psw;
        }

        @Override
        public String toString() {
            return "[name:" + name + ";password:" + psw + "]";
        }
    }

    3. Object messages (serialized with protostuff)

    Server:

    bootStrap.childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast(new MyDecoder(Student.class));
            pipeline.addLast(new MyEncoder(Student.class));
            pipeline.addLast(new ServerHandler());
        }
    });

    Client:

    bootStrap.handler(new ChannelInitializer<SocketChannel>() {
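        @Override
        protected void initChannel(SocketChannel sc) throws Exception {
            ChannelPipeline pipeline = sc.pipeline();
            pipeline.addLast(new MyDecoder(Student.class));
            pipeline.addLast(new MyEncoder(Student.class));
            pipeline.addLast(new ClientHandler());
        }
    });

    Transferred object (no Serializable needed here):

    public class Student {
        private String name;
        private String psw;
        // The remaining members are not shown in the original post.
        // ProtoStuffUtil.deserialize creates the instance reflectively,
        // so the class needs a no-argument constructor.
    }

    Decoder:

    public class MyDecoder extends ByteToMessageDecoder {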
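        private final Class<?> genericClass;
        public MyDecoder(Class<?> genericClass) {
            this.genericClass = genericClass;
        }
        @Override
        public final void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {
            // Wait until the whole 4-byte length field has arrived.
            if (buf.readableBytes() < 4) {
                return;
            }
            buf.markReaderIndex();
            int dataLength = buf.readInt(); // the first field is the payload length
            // TCP may deliver only part of a frame: rewind and wait for the rest.
            if (buf.readableBytes() < dataLength) {
                buf.resetReaderIndex();
                return;
            }
            byte[] data = new byte[dataLength];
            buf.readBytes(data); // copy the payload from buf into data
            Object obj = ProtoStuffUtil.deserialize(data, genericClass);
            out.add(obj);
        }
    }

    Encoder:

    public class MyEncoder extends MessageToByteEncoder<Object> {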
        private final Class<?> genericClass;
        public MyEncoder(Class<?> genericClass) {
            this.genericClass = genericClass;
        }
        @Override
        public void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) throws Exception {
            if (genericClass.isInstance(in)) {
                byte[] data = ProtoStuffUtil.serialize(in);
                out.writeInt(data.length); // the first field written is the payload length
                out.writeBytes(data);
            }
        }
    }

    Serialization utility class:

    public class ProtoStuffUtil {
        private static final Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<>();

        // Serialization: object to byte array.
        @SuppressWarnings("unchecked")
        public static <T> byte[] serialize(T obj) {
            Class<T> cls = (Class<T>) obj.getClass(); // runtime class of the object
            // Allocate a LinkedBuffer of the default size as scratch space.
            LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
            try {
                Schema<T> schema = getSchema(cls); // build (or look up) the schema for this class
                // Serialize the object to a byte array using the schema.
                return ProtostuffIOUtil.toByteArray(obj, schema, buffer);
            } catch (Exception e) {
                throw new IllegalStateException(e.getMessage(), e);
            } finally {
                buffer.clear();
            }
        }

        // Deserialization: byte array to object.
        public static <T> T deserialize(byte[] data, Class<T> cls) {
            try {
                // Requires a no-argument constructor on cls.
                T message = cls.getDeclaredConstructor().newInstance();
                Schema<T> schema = getSchema(cls);
                // Merge the bytes into the new instance using the schema.
                ProtostuffIOUtil.mergeFrom(data, message, schema);
                return message;
            } catch (Exception e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
        }

        // Returns the cached schema for cls, creating and caching it on first use.
        @SuppressWarnings("unchecked")
        private static <T> Schema<T> getSchema(Class<T> cls) {
            Schema<T> schema = (Schema<T>) cachedSchema.get(cls);
            if (schema == null) {
                schema = RuntimeSchema.createFrom(cls);
                if (schema != null) {
                    cachedSchema.put(cls, schema);
                }
            }
            return schema;
        }
    }
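
The encoder and decoder in this reply use a 4-byte length prefix, so it may help to see that framing on its own. The following is only a sketch using the JDK's `ByteBuffer` rather than Netty's `ByteBuf`; the class name `LengthPrefixedFraming` and its methods are made up for illustration and are not part of Netty or protostuff. It shows why a decoder has to leave an incomplete frame unread until more bytes arrive.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, for illustration only (not a Netty or protostuff class).
class LengthPrefixedFraming {

    /** Builds one frame: a 4-byte big-endian length followed by the payload. */
    static byte[] encode(byte[] payload) {
        return ByteBuffer.allocate(4 + payload.length)
                .putInt(payload.length)
                .put(payload)
                .array();
    }

    /**
     * Extracts every complete frame from buf (which must be in read mode).
     * Bytes belonging to an incomplete trailing frame are left unread.
     */
    static List<byte[]> decode(ByteBuffer buf) {
        List<byte[]> frames = new ArrayList<>();
        while (buf.remaining() >= 4) {
            buf.mark();                      // remember where this frame starts
            int length = buf.getInt();
            if (length < 0) {
                throw new IllegalArgumentException("negative frame length: " + length);
            }
            if (buf.remaining() < length) {  // payload not fully received yet
                buf.reset();                 // rewind to the start of the frame
                break;
            }
            byte[] payload = new byte[length];
            buf.get(payload);
            frames.add(payload);
        }
        return frames;
    }
}
```

In a real pipeline, Netty's built-in `LengthFieldBasedFrameDecoder` and `LengthFieldPrepender` handle this kind of framing, so a hand-written version is mainly useful for understanding what happens on the wire.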
      

  2.   

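    The Netty server necessarily holds every Channel connected to the front end (the clients). So your backend either has to be hosted on the Netty server itself, or it has to connect to the Netty server as a special client; when the Netty server receives a message from that client, it forwards (broadcasts) it to all the clients.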
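
A rough sketch of the "server keeps the connections and forwards commands" idea, written in plain Java so it runs without Netty. `Connection` is a hypothetical stand-in for a Netty `Channel`, and `ClientRegistry`, `sendTo`, and `broadcast` are names invented here, not anything from the original post. The assumption is that the server registers each client when it connects, removes it when it disconnects, and calls `sendTo` or `broadcast` when the backend's special connection delivers a command.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for a Netty Channel: something a message can be written to. */
interface Connection {
    void send(String message);
}

/** Keeps every live client connection, keyed by a client ID chosen by the server. */
class ClientRegistry {
    private final Map<String, Connection> clients = new ConcurrentHashMap<>();

    /** Call when a client connects (e.g. from a handler's channelActive). */
    void register(String clientId, Connection connection) {
        clients.put(clientId, connection);
    }

    /** Call when a client disconnects (e.g. from a handler's channelInactive). */
    void unregister(String clientId) {
        clients.remove(clientId);
    }

    /** Forwards a command to one client; returns false if that client is not connected. */
    boolean sendTo(String clientId, String command) {
        Connection connection = clients.get(clientId);
        if (connection == null) {
            return false;
        }
        connection.send(command);
        return true;
    }

    /** Forwards a command to every connected client; returns how many were reached. */
    int broadcast(String command) {
        int reached = 0;
        for (Connection connection : clients.values()) {
            connection.send(command);
            reached++;
        }
        return reached;
    }
}
```

With real Netty, `send` would be a call to `channel.writeAndFlush(...)`, and for plain broadcasting Netty's `ChannelGroup` (for example `new DefaultChannelGroup(GlobalEventExecutor.INSTANCE)`) already gives you `writeAndFlush` to all members and drops closed channels automatically. A map keyed by client ID is still handy when the backend needs to target a single device for an upgrade or power-off.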