先上代码:
服务端:
public class Server { static int port = 8787; public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
ch.pipeline().addLast(new ProtobufDecoder(CallMessage.getDefaultInstance()));
ch.pipeline().addLast(new ProtobufServerHandler());
}
}).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true); // 服务器绑定端口监听
ChannelFuture f = b.bind(port).sync();
// 监听服务器关闭监听
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
@Sharable
public class ProtobufServerHandler extends ChannelInboundHandlerAdapter {
int i = 0; @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof CallMessage) {
CallMessage message = (CallMessage) msg;
System.out.println(message.getContent());
i++;
}
} @Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelUnregistered();
System.out.println("断开连接->" + i);
}}客户端:
public class Client {
static int port = 8787;
static String host = "127.0.0.1"; public static void main(String[] args) throws InterruptedException, IOException {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class);
b.option(ChannelOption.SO_KEEPALIVE, true);
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
// 编码
ch.pipeline().addLast(new ObjectEncoder());
ch.pipeline().addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender());
ch.pipeline().addLast("protobufEncoder", new ProtobufEncoder());
}
}); // 连接服务端
Channel ch = b.connect(host, port).sync().channel(); long start = System.currentTimeMillis();
for (int i = 0; i < 100; i++) {
Message m = Message.newBuilder().setContent("random" + RandomUtils.nextInt(1, 10000)).build();
ch.writeAndFlush(m);
}
long end = System.currentTimeMillis();
System.out.println(end - start + "ms");
} finally {
group.shutdownGracefully();
}
}
}现在是从Client向Server发送数据时,打印出来就接收到64条数据.
如果在发送时停一下,加个Thread.sleep(1); 就能全部接收到了.
不明白这是为什么。如果不加 sleep,想要接收全部数据,应该怎么做?
谢谢各位