现在在做一个应用
流程如下:
一个程序向本机的3306用udp传输数据→我的程序监听本机的3306端口→解析3306端口的数据(每一条解析出来都需要调用存储过程,这个存储过程1300多行代码 执行一次花费的时间0.3秒左右)问题:别人的程序向这个3306端口传输数据的量是 50--100条/秒我用单线程的时候 每秒钟只能处理有限的数据 多余的数据就缓存了起来 每秒钟累计起来 到一定时间就会累计非常多 导致这些数据等待的时间超过了6分钟就自动丢弃了 也就是丢包了于是我采用了多线程监听3306这个端口的数据 问题出在 当一条数据进入3306端口的时候 我的线程会出现有两个同时去处理这条数据的情况 时有时无 不知道怎么让这些线程处理的数据不要重复呢? 各位大大们 小弟很无奈啊..单线程的时候不会出现重复的数据 也就是一条数据处理两次 多线程的时候会出现一条数据多个线程同时处理不知道是不是小弟的多线程写法是不是有问题...
求教..下面帖出 单线程和多线程的代码单线程的代码如下(我的单线程和多线程就是注释那里相互切换)package com.maphao.aisvoyage.ede.xsocket.server;import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.SocketException;import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.maphao.aisvoyage.ede.ServerFactory;
import com.maphao.aisvoyage.ede.XServer;
import com.maphao.aisvoyage.ede.xsocket.thread.ThreadPool;public class UDPServer implements XServer, Runnable { private static final Log logger = LogFactory.getLog(UDPServer.class); private static int port = 3344; private static int size = 2048;
private boolean isStart = true;

private static Thread runner = null; private static UDPServer server = null; private static DatagramSocket datagramSocket = null; private static DatagramPacket datagramPacket = null;

// private static Thread[] threads = new Thread[10]; //10个线程 private UDPServer() { } public static UDPServer getServer() {
if (server == null) {
server = new UDPServer();
// for (int i = 0; i < threads.length; i++) {
// threads[i] = new Thread(server);
// }
runner = new Thread(server);
byte[] buffer = new byte[size];
try {
datagramSocket = new DatagramSocket(ServerFactory.udpPort);
} catch (SocketException e) {
e.printStackTrace();
}
datagramPacket = new DatagramPacket(buffer, buffer.length);
}
return server;
} @Override
public void run() {
try {
logger.debug("UDP server start on "
+ datagramSocket.getLocalAddress().getHostAddress() + ":"
+ datagramSocket.getPort());
while (isStart) {
datagramSocket.receive(datagramPacket);
UDPServerHandler handler = new UDPServerHandler();
handler.onData(datagramPacket);
}
} catch (SocketException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} } @Override
public void startServer() {
isStart = true;
runner.start();
// 创建线程,并启动发送
// for (Thread t : threads) {
// t = new Thread(server);
// t.start();
// }
} @Override
public void stopServer() {
isStart = false;
runner.interrupt();
// for (Thread t : threads) {
// t = new Thread(server);
// t.interrupt();
// }
} @Override
public void restart() { } public static void main(String[] args) {
UDPServer.getServer().startServer();
}}
多线程的代码如下package com.maphao.aisvoyage.ede.xsocket.server;import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.SocketException;import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.maphao.aisvoyage.ede.ServerFactory;
import com.maphao.aisvoyage.ede.XServer;
import com.maphao.aisvoyage.ede.xsocket.thread.ThreadPool;public class UDPServer implements XServer, Runnable { private static final Log logger = LogFactory.getLog(UDPServer.class); private static int port = 3344; private static int size = 2048; private boolean isStart = true;

// private static Thread runner = null; private static UDPServer server = null; private static DatagramSocket datagramSocket = null; private static DatagramPacket datagramPacket = null;

private static Thread[] threads = new Thread[10]; //10个线程 private UDPServer() { } public static UDPServer getServer() {
if (server == null) {
server = new UDPServer();
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(server);
}
// runner = new Thread(server);
byte[] buffer = new byte[size];
try {
datagramSocket = new DatagramSocket(ServerFactory.udpPort);
} catch (SocketException e) {
e.printStackTrace();
}
datagramPacket = new DatagramPacket(buffer, buffer.length);
}
return server;
} @Override
public void run() {
try {
logger.debug("UDP server start on "
+ datagramSocket.getLocalAddress().getHostAddress() + ":"
+ datagramSocket.getPort());
while (isStart) {
datagramSocket.receive(datagramPacket);
UDPServerHandler handler = new UDPServerHandler();
handler.onData(datagramPacket);
}
} catch (SocketException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} } @Override
public void startServer() {
isStart = true;
// runner.start();
// 创建线程,并启动发送
for (Thread t : threads) {
t = new Thread(server);
t.start();
}
} @Override
public void stopServer() {
isStart = false;
// runner.interrupt();
for (Thread t : threads) {
t = new Thread(server);
t.interrupt();
}
} @Override
public void restart() { } public static void main(String[] args) {
UDPServer.getServer().startServer();
}}
在线等待...先谢谢关注的大大们..

解决方案 »

  1.   

    改成同步块试试 
    synchronized(datagramPacket){
       handler.onData(datagramPacket);
    }
      

  2.   

    不计开销的话可以把datagramPacket改为非静态的然后加同步块呢
      

  3.   


    是这样么? 这样不行呢 会丢掉很多的数据...
    @Override
    public void run() {
    try {
    logger.debug("UDP server start on "
    + datagramSocket.getLocalAddress().getHostAddress() + ":"
    + datagramSocket.getPort());
    while (isStart) {
    byte[] buffer = new byte[size];
    DatagramPacket datagramPacket = new DatagramPacket(buffer, buffer.length);
    datagramSocket.receive(datagramPacket);
    UDPServerHandler handler = new UDPServerHandler();
    synchronized(datagramPacket){
    handler.onData(datagramPacket);
    }
    }
    } catch (SocketException e) {
    e.printStackTrace();
    } catch (IOException e) {
    e.printStackTrace();
    } }
      

  4.   


    我去百度了下 这个类似乎用于管理session的呢
    和我这里的需求好像关联不大 我的理解能力有限
      

  5.   


    你理解错了 
    不是多个线程同时要处理一条数据因为web服务端一个线程处理一条数据调用数据库的存储过程需要大约0.01-0.2秒左右  这个时候数据库的cpu(E5300双核)是2%左右 一直不停的处理cpu占用是 30%左右需要注意的是:单线程的时候 每秒钟只能处理10-30条左右 而实际需要处理 50-100条左右所以为了提高程序的性能 发挥服务器cpu最大优势(客服机的cpu是24核的)
    我采用多线程处理  这个时候处理的速度就能达到实时的处理 每秒钟接收到多少 就能处理多少CPU占用会因为线程的增加而升高..只是偶尔会有重复的记录 也就是有两个或者多个线程会同时处理3306这个端口接受到的数据..
      

  6.   

    现在我改成了用一个线程把所有3306监听到的数据都存到一个list里面去 
    这个集合是全局的
    同时再用多线程操作读取这个集合 读取一个删除一个 (list的size为0就不读取,有就读取)
    现在问题是多线程操作这个集合代码的run我怎么处理以后就卡死了 cpu占用100%
    成死循环了 4个线程也不会这样阿?
    没搞明白这个run运行怎么用那么多cpu
    package com.maphao.aisvoyage.ede.xsocket.server;import com.maphao.aisvoyage.ede.XServer;
    import com.maphao.aisvoyage.ede.xsocket.threadpool.ListPool;public class UDPAISProcess implements XServer, Runnable { private static UDPAISProcess server = null; private static Thread[] threads = new Thread[4];

    private static UDPServerHandler handler = null; private UDPAISProcess() {

    } public static UDPAISProcess getServer() {
    if (server == null) {
    server = new UDPAISProcess();
    handler = new UDPServerHandler();
    }
    return server;
    } @Override
    public void run() {
    try {
    while (true) {
    if(ListPool.dplist.size() > 0){
    handler.onData(ListPool.dplist.get(0));
    ListPool.dplist.remove(0);
    }
    }
    } catch (Exception e) {
    e.printStackTrace();
    } } @Override
    public void startServer() {
    for (Thread t : threads) {
    t = new Thread(server);
    t.start();
    }
    } @Override
    public void stopServer() {
    for (Thread t : threads) {
    t = new Thread(server);
    t.interrupt();
    }
    } @Override
    public void restart() { }}
      

  7.   

    没个线程执行后,sleep一定时间,不知道能否解决你的问题
      

  8.   

    主线程每隔一段时间检查一下List中是否有数据,如果没有则sleep,如果有的话,启动线程处理,当线程处理完数据后,退出线程。
      

  9.   

    我用了sleep还是有重复的数据 随即sleep 只是减小了重复的几率..
      

  10.   


    server = new UDPServer();
        for (int i = 0; i < threads.length; i++) {
              threads[i] = new Thread(server);//这里明明已经new了
          }    这里明明已经new了,
     @Override
        public void startServer() {
            for (Thread t : threads) {
                t = new Thread(server);//为何这里还new
                t.start();
            }
        }
    改成这样,直接start        public void startServer() {
            for (Thread t : threads) {
                t.start();
            }
        }这是问题1哦还有啊,多个DatagramSocket都绑定到了同一个ip地址和同一个端口了,这样会让到达的数据包复制n份给这些Datagramsocket,可能是你每个线程都new 了Datagramsocket,所以每个线程都会处理
      

  11.   


    server = new UDPServer();
        for (int i = 0; i < threads.length; i++) {
              threads[i] = new Thread(server);//这里明明已经new了
          }    这里明明已经new了,
     @Override
        public void startServer() {
            for (Thread t : threads) {
                t = new Thread(server);//为何这里还new
                t.start();
            }
        }
    改成这样,直接start        public void startServer() {
            for (Thread t : threads) {
                t.start();
            }
        }这是问题1哦还有啊,多个DatagramSocket都绑定到了同一个ip地址和同一个端口了,这样会让到达的数据包复制n份给这些Datagramsocket,可能是你每个线程都new 了Datagramsocket,所以每个线程都会处理

    突然进来看看这个帖子 后来我是用多个端口解决的 每一个端口一个线程来监听 那边发数据也发送到不同的端口