小弟从 J2EE 转过来做通信,有很多不懂的地方。公司分派的第一个任务是解析协议,就是从一条二进制数据中按固定的格式取出需要的内容,然后存到数据库中。存到数据库倒不是很难,可是解析数据包这块就不太懂了。有没有哪位哥哥能讲讲网络传输的过程,最好细化到调用哪个类中的哪个方法。还有,这些二进制数据是怎么获取到的?为了方便大家解答,以下我提供了一些源码。
import java.nio.channels.*;
import java.nio.charset.*;
import java.nio.*;import java.util.*;
import java.io.*;
import java.net.*;/*
* 需增加的内容:心跳检测,超过一定时间没收到数据,则关闭连接
*/
/**
 * Selector-driven TCP listener that serves every connection from one thread.
 *
 * <p>Subclasses customise behaviour through three hooks:
 * {@link #GetPackageEnd()} (optional packet delimiter),
 * {@link #OnDataReceived(SocketChannel, byte[], int)} and
 * {@link #OnClosed(SocketChannel)}.
 *
 * <p>Not implemented: idle/heartbeat detection. A peer that simply stops
 * sending is never closed by this class.
 *
 * <p>NOTE(review): the constructor calls the overridable {@link #initialize()};
 * a subclass override runs before the subclass's own fields are initialised.
 */
public class StandTcpListener implements Runnable {
    String host = ""; // address to bind; empty means "ask MyUtil.GetLocalAddress()"
    int port = 5000; // listening port
    InetAddress lstenAddr; // address actually bound; stays null when initialize() failed
    Selector selector = null; // multiplexes the server channel and all client channels
    ServerSocketChannel serverChannel = null; // listening channel
    ByteBuffer byteBuffer; // scratch buffer for raw (undelimited) reads; its capacity also sizes per-connection buffers
    ByteBuffer recvBuffer; // scratch buffer for one read when a delimiter is configured

    /**
     * Creates the listener and immediately opens and binds the server channel.
     *
     * @param host address to bind, or an empty string to use the address chosen by MyUtil
     * @param port TCP port to listen on
     */
    public StandTcpListener(String host, int port) {
        this.host = host;
        this.port = port;
        byteBuffer = ByteBuffer.allocate(4096 * 2);
        recvBuffer = ByteBuffer.allocate(4096);
        initialize();
    }

    /** @return the display name used as a prefix in log messages */
    public String ServiceName() {
        return "Tcp服务";
    }

    boolean Started = false;

    /** Starts the selector loop on a new thread; later calls are ignored. */
    public void Start() {
        if (!Started) {
            Thread th = new Thread(this);
            th.start();
            Started = true;
        }
    }

    /**
     * Opens the selector and the non-blocking server channel and binds it to
     * {@code host:port}. Failures are logged, not thrown; {@link #lstenAddr}
     * remaining {@code null} is the signal that binding did not succeed.
     */
    public void initialize() {
        try {
            this.selector = Selector.open();
            this.serverChannel = ServerSocketChannel.open();
            this.serverChannel.configureBlocking(false);
            InetAddress localhost = null;
            if (host.length() > 0)
                localhost = InetAddress.getByName(host);
            else
                localhost = MyUtil.GetLocalAddress(); // project helper used instead of InetAddress.getLocalHost()
            InetSocketAddress isa = new InetSocketAddress(localhost, this.port);
            this.serverChannel.socket().bind(isa);
            lstenAddr = localhost;
        }
        catch (IOException ex) {
            Logger.LogError(ex);
        }
    }

    /**
     * Selector loop: registers the server channel for accepts and then
     * dispatches ready keys forever. Returns only if the listener was never
     * bound or the selector/channel fails.
     */
    @Override
    public void run() {
        // initialize() swallows its errors, so verify its outcome here instead of
        // failing with a NullPointerException on the listener thread.
        if (lstenAddr == null || serverChannel == null || selector == null) {
            Logger.LogError(new IllegalStateException(
                    ServiceName() + " is not bound; see the error logged by initialize()"));
            return;
        }
        Logger.LogMessage(ServiceName() + "启动,IP:" + lstenAddr.getHostAddress() + ",TcpPort:" + port);
        try {
            this.serverChannel.register(this.selector, SelectionKey.OP_ACCEPT);
            while (true) {
                if (selector.select() <= 0) {
                    continue;
                }
                Iterator<SelectionKey> ready = this.selector.selectedKeys().iterator();
                while (ready.hasNext()) {
                    SelectionKey key = ready.next();
                    ready.remove();
                    // Isolate failures per key so one bad connection cannot make
                    // the loop skip the other keys that are ready in this pass.
                    try {
                        handleKey(key);
                    }
                    catch (Exception ex) {
                        Logger.LogError(ex);
                    }
                }
            }
        }
        catch (ClosedChannelException ex) {
            Logger.LogError(ex);
        }
        catch (IOException ex) {
            Logger.LogError(ex);
        }
    }

    /** Handles one ready key: accepts a new client, reads from one, or reports writability. */
    private void handleKey(SelectionKey key) throws IOException {
        if (!key.isValid()) {
            return; // cancelled since it was selected; the isXxx() calls below would throw
        }
        if (key.isAcceptable()) {
            SocketChannel client = ((ServerSocketChannel) key.channel()).accept();
            if (client == null) {
                return; // non-blocking accept found no pending connection
            }
            try {
                client.configureBlocking(false);
                // Only reads are registered; OP_WRITE is intentionally not requested.
                client.register(this.selector, SelectionKey.OP_READ);
            }
            catch (IOException ex) {
                client.close(); // do not leak a channel that could not be registered
                throw ex;
            }
            Logger.LogMessage(ServiceName() + "--新的连接建立");
        }
        else if (key.isReadable()) {
            readMessage(key);
        }
        else if (key.isWritable()) {
            System.out.println("writeable");
        }
    }

    /**
     * Notifies {@link #OnClosed(SocketChannel)} and closes the client channel.
     * The channel is closed even when the callback throws.
     *
     * @param cn the client channel to close
     */
    protected void CloseClient(SocketChannel cn) {
        try {
            OnClosed(cn);
        }
        catch (Throwable ex) {
            Logger.LogError(ex);
        }
        finally {
            try {
                cn.close(); // closing the channel also closes its socket and cancels its keys
            }
            catch (Throwable ex) {
                Logger.LogError(ex);
            }
        }
    }

    /**
     * Reads whatever is available on the key's channel and forwards it to
     * {@link #OnDataReceived(SocketChannel, byte[], int)}.
     *
     * <p>Without a delimiter ({@link #GetPackageEnd()} returns {@code null})
     * each read is delivered as-is. With a delimiter, bytes are accumulated
     * per connection and delivered up to and including the delimiter position
     * reported by {@code MyUtil.ByteRIndexOf}; the remainder is kept for the
     * next read. If the accumulated data would exceed the buffer capacity, the
     * accumulated bytes are discarded.
     *
     * <p>End-of-stream or any error closes the client.
     *
     * @param key a readable key whose channel is a {@link SocketChannel}
     */
    public void readMessage(SelectionKey key) {
        SocketChannel channel = (SocketChannel) key.channel();
        try {
            byte[] pkgEnd = GetPackageEnd();
            ByteBuffer target = (pkgEnd == null) ? byteBuffer : recvBuffer;
            target.clear();
            int nbytes = channel.read(target);
            if (nbytes < 0) {
                // -1 is end-of-stream: the peer closed the connection.
                CloseClient(channel);
                return;
            }
            if (nbytes == 0) {
                // A non-blocking read may legitimately return nothing; that is not a disconnect.
                return;
            }
            if (pkgEnd == null) {
                OnDataReceived(channel, byteBuffer.array(), nbytes);
                return;
            }

            recvBuffer.flip();
            // Accumulate per connection: a shared buffer would splice together
            // partial packets that belong to different clients.
            ByteBuffer pending = pendingBufferFor(key);
            if (pending.position() + nbytes > pending.capacity()) {
                pending.clear(); // overflow without a delimiter: drop what was accumulated
            }
            pending.put(recvBuffer);
            // presumably returns the start index of the last delimiter, or a negative
            // value when absent; verify against MyUtil
            int idx = MyUtil.ByteRIndexOf(pending.array(), pending.position(), pkgEnd);
            if (idx >= 0) {
                int allLen = pending.position();
                int activeLen = idx + pkgEnd.length;
                OnDataReceived(channel, pending.array(), activeLen);
                // Keep only the bytes after the delivered packet(s) at the buffer start.
                pending.position(activeLen);
                pending.limit(allLen);
                pending.compact();
            }
        }
        catch (Throwable e) {
            Logger.LogError(e);
            CloseClient((SocketChannel) key.channel());
        }
    }

    /**
     * Returns the accumulation buffer attached to {@code key}, creating and
     * attaching one on first use. If the attachment slot already holds
     * something else, the shared {@link #byteBuffer} is used instead.
     */
    private ByteBuffer pendingBufferFor(SelectionKey key) {
        Object attached = key.attachment();
        if (attached instanceof ByteBuffer) {
            return (ByteBuffer) attached;
        }
        if (attached != null) {
            return byteBuffer;
        }
        ByteBuffer fresh = ByteBuffer.allocate(byteBuffer.capacity());
        key.attach(fresh);
        return fresh;
    }

    /**
     * Attempts a single write of the first {@code DataLen} bytes of {@code btsSent}.
     *
     * <p>NOTE(review): client channels are non-blocking, so a write may be
     * partial; in that case this returns {@code false} although some bytes
     * were already sent, and nothing is queued for retry.
     *
     * @param socket  destination channel
     * @param btsSent bytes to send
     * @param DataLen number of bytes from the start of {@code btsSent} to send
     * @return {@code true} only if all {@code DataLen} bytes were written in this call
     */
    public boolean SendData(SocketChannel socket, byte[] btsSent, int DataLen) {
        try {
            return socket.write(ByteBuffer.wrap(btsSent, 0, DataLen)) == DataLen;
        }
        catch (Exception ex) {
            Logger.LogError(ex);
        }
        return false;
    }

    /**
     * Hook: the byte sequence that terminates a packet.
     *
     * @return the delimiter, or {@code null} (the default) to deliver every read unframed
     */
    public byte[] GetPackageEnd() {
        return null;
    }

    /**
     * Hook: called with received data. The default does nothing.
     *
     * @param Sender  channel the data came from
     * @param btes    backing array of an internal buffer that is reused after this call returns;
     *                copy anything that must be kept
     * @param DataLen number of valid bytes at the start of {@code btes}
     */
    public void OnDataReceived(SocketChannel Sender, byte[] btes, int DataLen) {
    }

    /**
     * Hook: called just before a client channel is closed. The default does nothing.
     *
     * @param Sender the channel about to be closed
     */
    public void OnClosed(SocketChannel Sender) {
    }

    /** Releases the server channel and the selector; each is closed independently. */
    @Override
    public void finalize() {
        try {
            if (this.serverChannel != null) {
                this.serverChannel.close();
            }
        }
        catch (Throwable ex) {
            Logger.LogError(ex);
        }
        try {
            if (this.selector != null) {
                this.selector.close();
            }
        }
        catch (Throwable ex) {
            Logger.LogError(ex);
        }
    }
}
import java.net.*;

/**
 * Sends and receives text messages over a UDP multicast group.
 *
 * <p>The constructor tries to create the socket and join the group; if that
 * fails, {@link #SendMsg(String)} retries on each call. Received datagrams are
 * decoded with the platform default charset and passed to
 * {@link #OnRecv(String)} on a dedicated receive thread.
 *
 * <p>NOTE(review): the receive thread can start inside the constructor, so a
 * subclass's {@code OnRecv} may run before the subclass finished constructing.
 */
public class UdpMulticast {
    MulticastSocket s = null; // joined socket, or null while no usable socket exists
    InetAddress group = null; // resolved multicast group address
    String groupAddr = ""; // group address as given by the caller
    int port = 9150; // port used both for binding and as send destination
    boolean Created = false; // true once the socket has joined the group
    RecevThread thread = null; // receive thread for the current socket

    /**
     * @param GroupAddr multicast group address (name or literal)
     * @param Port      UDP port to bind and to send to
     */
    public UdpMulticast(String GroupAddr, int Port) {
        groupAddr = GroupAddr;
        port = Port;
        CreateSocket();
    }

    /**
     * Resolves the group, creates the socket, joins the group and starts the
     * receive thread. Stops at the first step that fails, so the logged
     * message names the real cause, and closes any socket that did not end up
     * joined so a later retry does not leak it.
     *
     * @return {@code true} if the socket joined the group
     */
    private boolean CreateSocket() {
        Created = false;
        // Drop a socket left over from an earlier failed attempt.
        if (s != null) {
            s.close();
            s = null;
        }

        try {
            group = InetAddress.getByName(groupAddr);
        } catch (UnknownHostException e1) {
            Logger.LogError("组播地址绑定失败");
            Logger.LogError(e1);
            return false;
        }

        MulticastSocket sock;
        try {
            sock = new MulticastSocket(port);
        } catch (Exception e1) {
            Logger.LogError("组播地址创建失败");
            Logger.LogError(e1);
            return false;
        }

        try {
            sock.setLoopbackMode(true); // true = disable local loopback of our own datagrams
        } catch (Exception e1) {
            // The original code kept going after this failure as well; only log it.
            Logger.LogError("组播地址创建失败");
            Logger.LogError(e1);
        }

        try {
            sock.joinGroup(group);
        } catch (Exception e1) {
            Logger.LogError("组播地址加入失败");
            Logger.LogError(e1);
            sock.close();
            return false;
        }

        s = sock;
        Created = true;
        thread = new RecevThread(s);
        thread.start();
        return true;
    }

    /**
     * Sends {@code str}, encoded with the platform default charset, to the group.
     * Re-attempts socket creation first if no socket is currently joined.
     *
     * @param str message text
     * @return {@code true} if the datagram was handed to the socket; {@code false} if the
     *         socket could not be created or sending failed (the error is logged)
     */
    public boolean SendMsg(String str) {
        if (!Created)
            Created = CreateSocket();
        if (!Created)
            return false;
        try {
            byte[] bts = str.getBytes();
            DatagramPacket data = new DatagramPacket(bts, bts.length, group, port);
            s.send(data);
            return true;
        }
        catch (Exception ex) {
            Logger.LogError(ex);
        }
        return false;
    }

    /**
     * Hook: called on the receive thread for every datagram. The default does nothing.
     *
     * @param message datagram payload decoded with the platform default charset
     */
    protected void OnRecv(String message) {
    }

    /** Thread that blocks on the socket and forwards each datagram to {@link #OnRecv(String)}. */
    class RecevThread extends Thread {
        MulticastSocket s = null;

        public RecevThread(MulticastSocket s) {
            this.s = s;
        }

        /**
         * Receives until the socket fails. An exception thrown by the
         * {@code OnRecv} callback is logged and reception continues.
         */
        @Override
        public void run() {
            byte[] buf = new byte[2048]; // longer datagrams are truncated to this size
            DatagramPacket recv = new DatagramPacket(buf, buf.length);
            while (true) {
                String str;
                try {
                    s.receive(recv);
                    str = new String(recv.getData(), 0, recv.getLength());
                }
                catch (Exception e1) {
                    Logger.LogError("组播接受失败");
                    Logger.LogError(e1);
                    return;
                }
                try {
                    OnRecv(str);
                }
                catch (Exception e1) {
                    // A faulty handler must not end reception for later datagrams.
                    Logger.LogError(e1);
                }
            }
        }
    }
}
import java.nio.channels.*;
import java.nio.charset.*;
import java.nio.*;import java.util.*;
import java.io.*;
import java.net.*;/*
* 需增加的内容:心跳检测,超过一定时间没收到数据,则关闭连接
*/
/**
 * Selector-driven TCP listener that serves every connection from one thread.
 *
 * <p>Subclasses customise behaviour through three hooks:
 * {@link #GetPackageEnd()} (optional packet delimiter),
 * {@link #OnDataReceived(SocketChannel, byte[], int)} and
 * {@link #OnClosed(SocketChannel)}.
 *
 * <p>Not implemented: idle/heartbeat detection. A peer that simply stops
 * sending is never closed by this class.
 *
 * <p>NOTE(review): the constructor calls the overridable {@link #initialize()};
 * a subclass override runs before the subclass's own fields are initialised.
 */
public class StandTcpListener implements Runnable {
    String host = ""; // address to bind; empty means "ask MyUtil.GetLocalAddress()"
    int port = 5000; // listening port
    InetAddress lstenAddr; // address actually bound; stays null when initialize() failed
    Selector selector = null; // multiplexes the server channel and all client channels
    ServerSocketChannel serverChannel = null; // listening channel
    ByteBuffer byteBuffer; // scratch buffer for raw (undelimited) reads; its capacity also sizes per-connection buffers
    ByteBuffer recvBuffer; // scratch buffer for one read when a delimiter is configured

    /**
     * Creates the listener and immediately opens and binds the server channel.
     *
     * @param host address to bind, or an empty string to use the address chosen by MyUtil
     * @param port TCP port to listen on
     */
    public StandTcpListener(String host, int port) {
        this.host = host;
        this.port = port;
        byteBuffer = ByteBuffer.allocate(4096 * 2);
        recvBuffer = ByteBuffer.allocate(4096);
        initialize();
    }

    /** @return the display name used as a prefix in log messages */
    public String ServiceName() {
        return "Tcp服务";
    }

    boolean Started = false;

    /** Starts the selector loop on a new thread; later calls are ignored. */
    public void Start() {
        if (!Started) {
            Thread th = new Thread(this);
            th.start();
            Started = true;
        }
    }

    /**
     * Opens the selector and the non-blocking server channel and binds it to
     * {@code host:port}. Failures are logged, not thrown; {@link #lstenAddr}
     * remaining {@code null} is the signal that binding did not succeed.
     */
    public void initialize() {
        try {
            this.selector = Selector.open();
            this.serverChannel = ServerSocketChannel.open();
            this.serverChannel.configureBlocking(false);
            InetAddress localhost = null;
            if (host.length() > 0)
                localhost = InetAddress.getByName(host);
            else
                localhost = MyUtil.GetLocalAddress(); // project helper used instead of InetAddress.getLocalHost()
            InetSocketAddress isa = new InetSocketAddress(localhost, this.port);
            this.serverChannel.socket().bind(isa);
            lstenAddr = localhost;
        }
        catch (IOException ex) {
            Logger.LogError(ex);
        }
    }

    /**
     * Selector loop: registers the server channel for accepts and then
     * dispatches ready keys forever. Returns only if the listener was never
     * bound or the selector/channel fails.
     */
    @Override
    public void run() {
        // initialize() swallows its errors, so verify its outcome here instead of
        // failing with a NullPointerException on the listener thread.
        if (lstenAddr == null || serverChannel == null || selector == null) {
            Logger.LogError(new IllegalStateException(
                    ServiceName() + " is not bound; see the error logged by initialize()"));
            return;
        }
        Logger.LogMessage(ServiceName() + "启动,IP:" + lstenAddr.getHostAddress() + ",TcpPort:" + port);
        try {
            this.serverChannel.register(this.selector, SelectionKey.OP_ACCEPT);
            while (true) {
                if (selector.select() <= 0) {
                    continue;
                }
                Iterator<SelectionKey> ready = this.selector.selectedKeys().iterator();
                while (ready.hasNext()) {
                    SelectionKey key = ready.next();
                    ready.remove();
                    // Isolate failures per key so one bad connection cannot make
                    // the loop skip the other keys that are ready in this pass.
                    try {
                        handleKey(key);
                    }
                    catch (Exception ex) {
                        Logger.LogError(ex);
                    }
                }
            }
        }
        catch (ClosedChannelException ex) {
            Logger.LogError(ex);
        }
        catch (IOException ex) {
            Logger.LogError(ex);
        }
    }

    /** Handles one ready key: accepts a new client, reads from one, or reports writability. */
    private void handleKey(SelectionKey key) throws IOException {
        if (!key.isValid()) {
            return; // cancelled since it was selected; the isXxx() calls below would throw
        }
        if (key.isAcceptable()) {
            SocketChannel client = ((ServerSocketChannel) key.channel()).accept();
            if (client == null) {
                return; // non-blocking accept found no pending connection
            }
            try {
                client.configureBlocking(false);
                // Only reads are registered; OP_WRITE is intentionally not requested.
                client.register(this.selector, SelectionKey.OP_READ);
            }
            catch (IOException ex) {
                client.close(); // do not leak a channel that could not be registered
                throw ex;
            }
            Logger.LogMessage(ServiceName() + "--新的连接建立");
        }
        else if (key.isReadable()) {
            readMessage(key);
        }
        else if (key.isWritable()) {
            System.out.println("writeable");
        }
    }

    /**
     * Notifies {@link #OnClosed(SocketChannel)} and closes the client channel.
     * The channel is closed even when the callback throws.
     *
     * @param cn the client channel to close
     */
    protected void CloseClient(SocketChannel cn) {
        try {
            OnClosed(cn);
        }
        catch (Throwable ex) {
            Logger.LogError(ex);
        }
        finally {
            try {
                cn.close(); // closing the channel also closes its socket and cancels its keys
            }
            catch (Throwable ex) {
                Logger.LogError(ex);
            }
        }
    }

    /**
     * Reads whatever is available on the key's channel and forwards it to
     * {@link #OnDataReceived(SocketChannel, byte[], int)}.
     *
     * <p>Without a delimiter ({@link #GetPackageEnd()} returns {@code null})
     * each read is delivered as-is. With a delimiter, bytes are accumulated
     * per connection and delivered up to and including the delimiter position
     * reported by {@code MyUtil.ByteRIndexOf}; the remainder is kept for the
     * next read. If the accumulated data would exceed the buffer capacity, the
     * accumulated bytes are discarded.
     *
     * <p>End-of-stream or any error closes the client.
     *
     * @param key a readable key whose channel is a {@link SocketChannel}
     */
    public void readMessage(SelectionKey key) {
        SocketChannel channel = (SocketChannel) key.channel();
        try {
            byte[] pkgEnd = GetPackageEnd();
            ByteBuffer target = (pkgEnd == null) ? byteBuffer : recvBuffer;
            target.clear();
            int nbytes = channel.read(target);
            if (nbytes < 0) {
                // -1 is end-of-stream: the peer closed the connection.
                CloseClient(channel);
                return;
            }
            if (nbytes == 0) {
                // A non-blocking read may legitimately return nothing; that is not a disconnect.
                return;
            }
            if (pkgEnd == null) {
                OnDataReceived(channel, byteBuffer.array(), nbytes);
                return;
            }

            recvBuffer.flip();
            // Accumulate per connection: a shared buffer would splice together
            // partial packets that belong to different clients.
            ByteBuffer pending = pendingBufferFor(key);
            if (pending.position() + nbytes > pending.capacity()) {
                pending.clear(); // overflow without a delimiter: drop what was accumulated
            }
            pending.put(recvBuffer);
            // presumably returns the start index of the last delimiter, or a negative
            // value when absent; verify against MyUtil
            int idx = MyUtil.ByteRIndexOf(pending.array(), pending.position(), pkgEnd);
            if (idx >= 0) {
                int allLen = pending.position();
                int activeLen = idx + pkgEnd.length;
                OnDataReceived(channel, pending.array(), activeLen);
                // Keep only the bytes after the delivered packet(s) at the buffer start.
                pending.position(activeLen);
                pending.limit(allLen);
                pending.compact();
            }
        }
        catch (Throwable e) {
            Logger.LogError(e);
            CloseClient((SocketChannel) key.channel());
        }
    }

    /**
     * Returns the accumulation buffer attached to {@code key}, creating and
     * attaching one on first use. If the attachment slot already holds
     * something else, the shared {@link #byteBuffer} is used instead.
     */
    private ByteBuffer pendingBufferFor(SelectionKey key) {
        Object attached = key.attachment();
        if (attached instanceof ByteBuffer) {
            return (ByteBuffer) attached;
        }
        if (attached != null) {
            return byteBuffer;
        }
        ByteBuffer fresh = ByteBuffer.allocate(byteBuffer.capacity());
        key.attach(fresh);
        return fresh;
    }

    /**
     * Attempts a single write of the first {@code DataLen} bytes of {@code btsSent}.
     *
     * <p>NOTE(review): client channels are non-blocking, so a write may be
     * partial; in that case this returns {@code false} although some bytes
     * were already sent, and nothing is queued for retry.
     *
     * @param socket  destination channel
     * @param btsSent bytes to send
     * @param DataLen number of bytes from the start of {@code btsSent} to send
     * @return {@code true} only if all {@code DataLen} bytes were written in this call
     */
    public boolean SendData(SocketChannel socket, byte[] btsSent, int DataLen) {
        try {
            return socket.write(ByteBuffer.wrap(btsSent, 0, DataLen)) == DataLen;
        }
        catch (Exception ex) {
            Logger.LogError(ex);
        }
        return false;
    }

    /**
     * Hook: the byte sequence that terminates a packet.
     *
     * @return the delimiter, or {@code null} (the default) to deliver every read unframed
     */
    public byte[] GetPackageEnd() {
        return null;
    }

    /**
     * Hook: called with received data. The default does nothing.
     *
     * @param Sender  channel the data came from
     * @param btes    backing array of an internal buffer that is reused after this call returns;
     *                copy anything that must be kept
     * @param DataLen number of valid bytes at the start of {@code btes}
     */
    public void OnDataReceived(SocketChannel Sender, byte[] btes, int DataLen) {
    }

    /**
     * Hook: called just before a client channel is closed. The default does nothing.
     *
     * @param Sender the channel about to be closed
     */
    public void OnClosed(SocketChannel Sender) {
    }

    /** Releases the server channel and the selector; each is closed independently. */
    @Override
    public void finalize() {
        try {
            if (this.serverChannel != null) {
                this.serverChannel.close();
            }
        }
        catch (Throwable ex) {
            Logger.LogError(ex);
        }
        try {
            if (this.selector != null) {
                this.selector.close();
            }
        }
        catch (Throwable ex) {
            Logger.LogError(ex);
        }
    }
}
import java.net.*;

/**
 * Sends and receives text messages over a UDP multicast group.
 *
 * <p>The constructor tries to create the socket and join the group; if that
 * fails, {@link #SendMsg(String)} retries on each call. Received datagrams are
 * decoded with the platform default charset and passed to
 * {@link #OnRecv(String)} on a dedicated receive thread.
 *
 * <p>NOTE(review): the receive thread can start inside the constructor, so a
 * subclass's {@code OnRecv} may run before the subclass finished constructing.
 */
public class UdpMulticast {
    MulticastSocket s = null; // joined socket, or null while no usable socket exists
    InetAddress group = null; // resolved multicast group address
    String groupAddr = ""; // group address as given by the caller
    int port = 9150; // port used both for binding and as send destination
    boolean Created = false; // true once the socket has joined the group
    RecevThread thread = null; // receive thread for the current socket

    /**
     * @param GroupAddr multicast group address (name or literal)
     * @param Port      UDP port to bind and to send to
     */
    public UdpMulticast(String GroupAddr, int Port) {
        groupAddr = GroupAddr;
        port = Port;
        CreateSocket();
    }

    /**
     * Resolves the group, creates the socket, joins the group and starts the
     * receive thread. Stops at the first step that fails, so the logged
     * message names the real cause, and closes any socket that did not end up
     * joined so a later retry does not leak it.
     *
     * @return {@code true} if the socket joined the group
     */
    private boolean CreateSocket() {
        Created = false;
        // Drop a socket left over from an earlier failed attempt.
        if (s != null) {
            s.close();
            s = null;
        }

        try {
            group = InetAddress.getByName(groupAddr);
        } catch (UnknownHostException e1) {
            Logger.LogError("组播地址绑定失败");
            Logger.LogError(e1);
            return false;
        }

        MulticastSocket sock;
        try {
            sock = new MulticastSocket(port);
        } catch (Exception e1) {
            Logger.LogError("组播地址创建失败");
            Logger.LogError(e1);
            return false;
        }

        try {
            sock.setLoopbackMode(true); // true = disable local loopback of our own datagrams
        } catch (Exception e1) {
            // The original code kept going after this failure as well; only log it.
            Logger.LogError("组播地址创建失败");
            Logger.LogError(e1);
        }

        try {
            sock.joinGroup(group);
        } catch (Exception e1) {
            Logger.LogError("组播地址加入失败");
            Logger.LogError(e1);
            sock.close();
            return false;
        }

        s = sock;
        Created = true;
        thread = new RecevThread(s);
        thread.start();
        return true;
    }

    /**
     * Sends {@code str}, encoded with the platform default charset, to the group.
     * Re-attempts socket creation first if no socket is currently joined.
     *
     * @param str message text
     * @return {@code true} if the datagram was handed to the socket; {@code false} if the
     *         socket could not be created or sending failed (the error is logged)
     */
    public boolean SendMsg(String str) {
        if (!Created)
            Created = CreateSocket();
        if (!Created)
            return false;
        try {
            byte[] bts = str.getBytes();
            DatagramPacket data = new DatagramPacket(bts, bts.length, group, port);
            s.send(data);
            return true;
        }
        catch (Exception ex) {
            Logger.LogError(ex);
        }
        return false;
    }

    /**
     * Hook: called on the receive thread for every datagram. The default does nothing.
     *
     * @param message datagram payload decoded with the platform default charset
     */
    protected void OnRecv(String message) {
    }

    /** Thread that blocks on the socket and forwards each datagram to {@link #OnRecv(String)}. */
    class RecevThread extends Thread {
        MulticastSocket s = null;

        public RecevThread(MulticastSocket s) {
            this.s = s;
        }

        /**
         * Receives until the socket fails. An exception thrown by the
         * {@code OnRecv} callback is logged and reception continues.
         */
        @Override
        public void run() {
            byte[] buf = new byte[2048]; // longer datagrams are truncated to this size
            DatagramPacket recv = new DatagramPacket(buf, buf.length);
            while (true) {
                String str;
                try {
                    s.receive(recv);
                    str = new String(recv.getData(), 0, recv.getLength());
                }
                catch (Exception e1) {
                    Logger.LogError("组播接受失败");
                    Logger.LogError(e1);
                    return;
                }
                try {
                    OnRecv(str);
                }
                catch (Exception e1) {
                    // A faulty handler must not end reception for later datagrams.
                    Logger.LogError(e1);
                }
            }
        }
    }
}
解决方案 »
免费领取超大流量手机卡,每月29元包185G流量+100分钟通话, 中国电信官方发货