1、侦听    lister线程:收数据并将数据放入线程池ConmuniDataPool中
        
         ReciveData rd = new ReciveData();
rd.setData(newBuf);
int poolSize=ConmuniDataPool.getInstance().size();
Log.getInstance().outLog("数据接收池大小="+poolSize);
ConmuniDataPool.getInstance().add(rd);




2、处理线程池中的数据:
Enumeration<?> dataList = ConmuniDataPool.getInstance().elements();
while (dataList.hasMoreElements()) {
try {
ReciveData rd = (ReciveData) dataList.nextElement();

pool.execute(new ConmunicationDataProcess(rd));
dispatchCount++;
Log.getInstance().outLog("派发数目:"+dispatchCount);
if(dispatchCount%100==0){
Log.getInstance().outLog("接收线程池中任务数:"+pool.getQueue().size()+",池中线程数:"+pool.getPoolSize()+",getTaskCount="+pool.getTaskCount()+" ,getCompletedTaskCount="+pool.getCompletedTaskCount());
}
current = System.currentTimeMillis();
//处理完后将数据对象删除
ConmuniDataPool.getInstance().remove(rd);
} catch (RejectedExecutionException ex) {
ex.printStackTrace();
Log.getInstance().outLog(
"需要等待线程池中的空闲线程处理通讯端数据:"+ex.getMessage()+",接收线程池中任务数:"+pool.getQueue().size());
Thread.sleep(1000);
}
try {
Thread.sleep(5);
} catch (InterruptedException e) {
Log.getInstance().outLog("ConmunicationDataDispatcher 错误2:"+e.getMessage());
}
}

3、 从线程池中获取数据,对数据进行解析后放入ToDataCenterList.getInstance().add(obj);
其中ToDataCenterList定义如下:

public class ToDataCenterList extends Vector<Object> { private static final long serialVersionUID = 1L; private ToDataCenterList() {
} private static ToDataCenterList instance = null; public static synchronized ToDataCenterList getInstance() {
if (instance == null) {
instance = new ToDataCenterList();
}
return instance;
}
}


5、 将数据发送给其他的服务器

public class DBDispatcher2 extends Thread {
public DBDispatcher2() {
} public void run() {
while (true) {
try {
//===  发送对象到数据中心
int s = ToDataCenterList.getInstance().size();
if(s>50) ThreadPoolManager.getInstance().newActiveThread();
if(s<=50) ThreadPoolManager.getInstance().freeThread();
ToDataCenterList tcl = ToDataCenterList.getInstance();
//对象池中的数据为什么会是零??
Log.getInstance().outLog("对象池数据数为: "+tcl.size()); if ( s > 0) {
for(int i=0;i<tcl.size();){
Object data = (Object) tcl.remove(i);
if (!DBDataCenter2.getInstance().send(data)) {
tcl.add(data);
Log.getInstance().outLog("发送对象数据到数据服务中心失败");
Thread.sleep(500);
} else {
Log.getInstance().outLog("发送对象数据到数据服务中心成功");
}
}
} else {
try {
Thread.sleep(500);
ThreadPoolManager.getInstance().freeThread();
} catch (InterruptedException ex) {
Log.getInstance().outLog("DBDispatcher2--1 "+ex.getMessage());
} }
try {
Thread.sleep(10);
} catch (InterruptedException ex) {
Log.getInstance().outLog("DBDispatcher2--2 "+ex.getMessage());
}
} catch (Exception ex) {
Log.getInstance().outLog("DBDispatcher2--3 "+ex.getMessage());
ex.printStackTrace();
try {
Thread.sleep(50);
} catch (InterruptedException ex1) {
Log.getInstance().outLog("DBDispatcher2--4 "+ex1.getMessage());
} finally {
} }
}
}
}刚刚启动运行正常,对象池中都有数据,但是
为什么运行一段时间后,监测到lister线程在不断的接收新的数据(数据接收池大小不断地在增长),但是对象池中的始终没有数据ToDataCenterList 大小为0

解决方案 »

  1.   

    对线程的管理是这样的
    public class ThreadPoolManager 

    private static ThreadPoolManager instance = null;
    private int maxThread=10; 
    public Vector freeThread;
    public Vector activeThread;
    public void setMaxThread(int threadCount) 
     { 
    maxThread = threadCount; 
     } 

    public static synchronized ThreadPoolManager getInstance() {
    if (instance == null) {
    instance = new ThreadPoolManager();
    }
    return instance;
    }
     public ThreadPoolManager() 
     { 
     setMaxThread(maxThread); 
    //  System.out.println("启动对象数据分发线程池..."); 
     Log.getInstance().outLog("启动对象数据分发线程池...");
     freeThread = new Vector();
     activeThread = new Vector();
     for(int i = 1; i <= 50; i++) //20090511
     { 
     DBDispatcher2 thread = new DBDispatcher2(); 
     freeThread.addElement(thread);   
     } 
     

     
     public void newActiveThread() 
     { 
     if(freeThread.size()>0){
     Log.getInstance().outLog("对象Active Thread : "+freeThread.size());
     Thread th = (Thread)freeThread.get(0);
     freeThread.remove(0);
     th.start();
     activeThread.add(th);
     Log.getInstance().outLog("对象 Active Thread end : "+freeThread.size());
     }else{
     Log.getInstance().outLog("没有空闲线程发送对象数据");//没有空闲的线程如何处理呀?
     
     }
     
     } 
     public void freeThread(){
     if(activeThread.size()>1){
     Log.getInstance().outLog("对象free Active Thread: "+activeThread.size());
     Thread th = (Thread)activeThread.get(0);
     
     activeThread.remove(0);
     th.stop();
     th = new DBDispatcher2(); 
     freeThread.add(th);
     Log.getInstance().outLog(" 对象free Active Thread: "+activeThread.size());
     }else{//如果小于等于1怎么办?
     
     }

     }
      

  2.   

    日志文件监测如下:
    2009-05-12 00:29:35,785 [Thread-7]- 对象池数据数为: 0
    2009-05-12 00:29:35,802 [Thread-5]- 非对象池数据数为: 0
    2009-05-12 00:29:35,825 [Thread-0]- 接收前置数据数:37107
    2009-05-12 00:29:35,825 [Thread-0]- 进入接收池的数据:[00000bbb000000233f80000030300123003514501]
    2009-05-12 00:29:35,826 [Thread-0]- 数据接收池大小=34661
    2009-05-12 00:29:35,826 [Thread-0]- 接收到智能终端上传数据:[00000bbb000000233f800]
    2009-05-12 00:29:35,831 [Thread-0]- 接收前置数据数:37108
    2009-05-12 00:29:35,832 [Thread-0]- 进入接收池的数据:[00000bbb000000233f8000003030012300]
    2009-05-12 00:29:35,832 [Thread-0]- 数据接收池大小=34662
    2009-05-12 00:29:35,832 [Thread-0]- 接收到智能终端上传数据:[00000bbb000000233f800]
    2009-05-12 00:29:35,901 [Thread-0]- 接收前置数据数:37109
    2009-05-12 00:29:35,901 [Thread-0]- 进入接收池的数据:[00000bbb000000363f80000030305236005324]
    2009-05-12 00:29:35,901 [Thread-0]- 数据接收池大小=34663
    2009-05-12 00:29:35,901 [Thread-0]- 接收到智能终端上传数据:[00000bbb000000363f80000030305236005324501f314]
    2009-05-12 00:29:35,921 [Thread-0]- 接收前置数据数:37110
    2009-05-12 00:29:35,921 [Thread-0]- 进入接收池的数据:[00000bbb000000363f800000303052360032515]
    2009-05-12 00:29:35,921 [Thread-0]- 数据接收池大小=34664
    2009-05-12 00:29:35,921 [Thread-0]- 接收到智能终端上传数据:[00000bbb000000363f8000003030523600]
    2009-05-12 00:29:35,940 [Thread-0]- 接收前置数据数:37111
    2009-05-12 00:29:35,940 [Thread-0]- 进入接收池的数据:[00000bbb000000363f80000030305236005d45501f3141682b0]
    2009-05-12 00:29:35,940 [Thread-0]- 数据接收池大小=34665
    2009-05-12 00:29:35,941 [Thread-0]- 接收到智能终端上传数据:[00000bbb000000363f80000030305236005d45501f31]
      

  3.   

    代码有些多,简单看了一下,问题可能在
    //2、处理线程池中的数据: 
    Enumeration <?> dataList = ConmuniDataPool.getInstance().elements(); 
    while (dataList.hasMoreElements()) { //这个循环是处在另一个更大的循环中吗?
    //如果不是,那么,你在遍历一次dataList后,该循环退出,以后受到的数据,将无法处理。
    //ConmuniDataPool.getInstance().elements(); 
    //这句拿到的枚举值,只是当前的情况,以后新加入的数据,需要你重新遍历,
    //也就是要不断的做这个循环
    try { 
    ReciveData rd = (ReciveData) dataList.nextElement();  pool.execute(new ConmunicationDataProcess(rd)); 
    dispatchCount++; 
    Log.getInstance().outLog("派发数目:"+dispatchCount); 
    if(dispatchCount%100==0){ 
    Log.getInstance().outLog("接收线程池中任务数:"+pool.getQueue().size()+",池中线程数:"+pool.getPoolSize()+",getTaskCount="+pool.getTaskCount()+" ,getCompletedTaskCount="+pool.getCompletedTaskCount()); 

    current = System.currentTimeMillis(); 
    // 处理完后将数据对象删除 
    ConmuniDataPool.getInstance().remove(rd); 
    } catch (RejectedExecutionException ex) { 
    ex.printStackTrace(); 
    Log.getInstance().outLog( 
    "需要等待线程池中的空闲线程处理通讯端数据:"+ex.getMessage()+",接收线程池中任务数:"+pool.getQueue().size()); 
    Thread.sleep(1000); 

    try { 
    Thread.sleep(5); 
    } catch (InterruptedException e) { 
    Log.getInstance().outLog("ConmunicationDataDispatcher 错误2:"+e.getMessage()); 

    }  }
      

  4.   

    是这样的:public class ConmunicationDataDispatcher extends Thread {
    private int data_process_thread_num;
    private boolean isRuning;
    private final ThreadPoolExecutor pool;
    private long current;
    static long dispatchCount = 0;//测试排放数
    public ConmunicationDataDispatcher() {
    current = System.currentTimeMillis();
    data_process_thread_num = Config.getInt("data_process_thread_num");
    if (data_process_thread_num == 0) {
    data_process_thread_num = 10;
    }
    pool = new ThreadPoolExecutor(10, data_process_thread_num, 4,
    TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(150),
    new ThreadPoolExecutor.CallerRunsPolicy());//DiscardOldestPolicy
    Log.getInstance().outLog("接收线程池初始最大线程数目:"+pool.getMaximumPoolSize());
    isRuning = true;
    } public void run() {
    while (isRuning) {
    try {
    if (ConmuniDataPool.getInstance().size() == 0) {
    try {
    Thread.sleep(1000);
    dispatchCount=0;
    Log.getInstance().outLog("数据池大小为零");
    //此时应该怎么办?
    continue;
    } catch (InterruptedException ex1) {
    Log.getInstance().outLog("ConmunicationDataDispatcher 错误:"+ex1.getMessage());
    }
    }
    Log.getInstance().outLog("待处理通讯端数据数:"+ConmuniDataPool.getInstance().size());
    Enumeration<?> dataList = ConmuniDataPool.getInstance().elements();
    while (dataList.hasMoreElements()) {
    try {
    ReciveData rd = (ReciveData) dataList.nextElement();
    //2009-05-11彭涛将ConmuniDataPool.getInstance().remove(rd);移到下面的地方
    pool.execute(new ConmunicationDataProcess(rd));

    //处理完后将数据对象删除
    ConmuniDataPool.getInstance().remove(rd);
    dispatchCount++;
    Log.getInstance().outLog("派发数目:"+dispatchCount);
    if(dispatchCount%100==0){
    Log.getInstance().outLog("接收线程池中任务数:"+pool.getQueue().size()+",池中线程数:"+pool.getPoolSize()+",getTaskCount="+pool.getTaskCount()+" ,getCompletedTaskCount="+pool.getCompletedTaskCount());
    }
    current = System.currentTimeMillis();

    } catch (RejectedExecutionException ex) {
    ex.printStackTrace();
    Log.getInstance().outLog(
    "需要等待线程池中的空闲线程处理通讯端数据:"+ex.getMessage()+",接收线程池中任务数:"+pool.getQueue().size());
    Thread.sleep(1000);
    }
    try {
    Thread.sleep(5);
    } catch (InterruptedException e) {
    Log.getInstance().outLog("ConmunicationDataDispatcher 错误2:"+e.getMessage());
    }
    }
    } catch (Exception ex) {
    Log.getInstance().outLog(
    "通讯端数据分发服务异常" + ex.getStackTrace().toString());
    isRuning = false;
    break;
    }
    }

    // try{
    //   pool.shutdownNow();
    // }catch(Exception ex){
    // Log.getInstance().outLog("ConmunicationDataDispatcher 错误3:"+ex.getMessage());
    // }
    Log.getInstance().outLog("通讯端数据分发服务退出");
    } public boolean isRuning() {
    return isRuning;
    } public void setRuning(boolean isRuning) {
    this.isRuning = isRuning;
    } public long getCurrent() {
    return current;
    } public void setCurrent(long current) {
    this.current = current;
    }
    }
      

  5.   

    Log.getInstance().outLog("派发数目:"+dispatchCount); public class ConmunicationDataDispatcher extends Thread { 
    private int data_process_thread_num; 
    private boolean isRuning; 
    private final ThreadPoolExecutor pool; 
    private long current; 
    static long dispatchCount = 0;//测试排放数 你这里使用的是ConmunicationDataDispatcher类的静态变量
    并且我看你的ConmunicationDataDispatcher构造函数中没有对测试排放数进行处理
    所以他一直为0
      

  6.   

    第二段中有对dispatchCount 的处理:dispatchCount ++;
    pool.execute(new ConmunicationDataProcess(rd)); 
    dispatchCount++; 
    Log.getInstance().outLog("派发数目:"+dispatchCount); 
    if(dispatchCount%100==0){ 
    Log.getInstance().outLog("接收线程池中任务数:"+pool.getQueue().size()+",池中线程数:"+pool.getPoolSize()+",getTaskCount="+pool.getTaskCount()+" ,getCompletedTaskCount="+pool.getCompletedTaskCount());