1、侦听 lister线程:收数据并将数据放入线程池ConmuniDataPool中
ReciveData rd = new ReciveData();
rd.setData(newBuf);
int poolSize=ConmuniDataPool.getInstance().size();
Log.getInstance().outLog("数据接收池大小="+poolSize);
ConmuniDataPool.getInstance().add(rd);
2、处理线程池中的数据:
Enumeration<?> dataList = ConmuniDataPool.getInstance().elements();
while (dataList.hasMoreElements()) {
try {
ReciveData rd = (ReciveData) dataList.nextElement();
pool.execute(new ConmunicationDataProcess(rd));
dispatchCount++;
Log.getInstance().outLog("派发数目:"+dispatchCount);
if(dispatchCount%100==0){
Log.getInstance().outLog("接收线程池中任务数:"+pool.getQueue().size()+",池中线程数:"+pool.getPoolSize()+",getTaskCount="+pool.getTaskCount()+" ,getCompletedTaskCount="+pool.getCompletedTaskCount());
}
current = System.currentTimeMillis();
//处理完后将数据对象删除
ConmuniDataPool.getInstance().remove(rd);
} catch (RejectedExecutionException ex) {
ex.printStackTrace();
Log.getInstance().outLog(
"需要等待线程池中的空闲线程处理通讯端数据:"+ex.getMessage()+",接收线程池中任务数:"+pool.getQueue().size());
Thread.sleep(1000);
}
try {
Thread.sleep(5);
} catch (InterruptedException e) {
Log.getInstance().outLog("ConmunicationDataDispatcher 错误2:"+e.getMessage());
}
}
3、 从线程池中获取数据,对数据进行解析后放入ToDataCenterList.getInstance().add(obj);
其中ToDataCenterList定义如下:
public class ToDataCenterList extends Vector<Object> { private static final long serialVersionUID = 1L; private ToDataCenterList() {
} private static ToDataCenterList instance = null; public static synchronized ToDataCenterList getInstance() {
if (instance == null) {
instance = new ToDataCenterList();
}
return instance;
}
}
5、 将数据发送给其他的服务器
public class DBDispatcher2 extends Thread {
public DBDispatcher2() {
} public void run() {
while (true) {
try {
//=== 发送对象到数据中心
int s = ToDataCenterList.getInstance().size();
if(s>50) ThreadPoolManager.getInstance().newActiveThread();
if(s<=50) ThreadPoolManager.getInstance().freeThread();
ToDataCenterList tcl = ToDataCenterList.getInstance();
//对象池中的数据为什么会是零??
Log.getInstance().outLog("对象池数据数为: "+tcl.size()); if ( s > 0) {
for(int i=0;i<tcl.size();){
Object data = (Object) tcl.remove(i);
if (!DBDataCenter2.getInstance().send(data)) {
tcl.add(data);
Log.getInstance().outLog("发送对象数据到数据服务中心失败");
Thread.sleep(500);
} else {
Log.getInstance().outLog("发送对象数据到数据服务中心成功");
}
}
} else {
try {
Thread.sleep(500);
ThreadPoolManager.getInstance().freeThread();
} catch (InterruptedException ex) {
Log.getInstance().outLog("DBDispatcher2--1 "+ex.getMessage());
} }
try {
Thread.sleep(10);
} catch (InterruptedException ex) {
Log.getInstance().outLog("DBDispatcher2--2 "+ex.getMessage());
}
} catch (Exception ex) {
Log.getInstance().outLog("DBDispatcher2--3 "+ex.getMessage());
ex.printStackTrace();
try {
Thread.sleep(50);
} catch (InterruptedException ex1) {
Log.getInstance().outLog("DBDispatcher2--4 "+ex1.getMessage());
} finally {
} }
}
}
}刚刚启动运行正常,对象池中都有数据,但是
为什么运行一段时间后,监测到lister线程在不断的接收新的数据(数据接收池大小不断地在增长),但是对象池中的始终没有数据ToDataCenterList 大小为0
public class ThreadPoolManager
{
private static ThreadPoolManager instance = null;
private int maxThread=10;
public Vector freeThread;
public Vector activeThread;
public void setMaxThread(int threadCount)
{
maxThread = threadCount;
}
public static synchronized ThreadPoolManager getInstance() {
if (instance == null) {
instance = new ThreadPoolManager();
}
return instance;
}
public ThreadPoolManager()
{
setMaxThread(maxThread);
// System.out.println("启动对象数据分发线程池...");
Log.getInstance().outLog("启动对象数据分发线程池...");
freeThread = new Vector();
activeThread = new Vector();
for(int i = 1; i <= 50; i++) //20090511
{
DBDispatcher2 thread = new DBDispatcher2();
freeThread.addElement(thread);
}
}
public void newActiveThread()
{
if(freeThread.size()>0){
Log.getInstance().outLog("对象Active Thread : "+freeThread.size());
Thread th = (Thread)freeThread.get(0);
freeThread.remove(0);
th.start();
activeThread.add(th);
Log.getInstance().outLog("对象 Active Thread end : "+freeThread.size());
}else{
Log.getInstance().outLog("没有空闲线程发送对象数据");//没有空闲的线程如何处理呀?
}
}
public void freeThread(){
if(activeThread.size()>1){
Log.getInstance().outLog("对象free Active Thread: "+activeThread.size());
Thread th = (Thread)activeThread.get(0);
activeThread.remove(0);
th.stop();
th = new DBDispatcher2();
freeThread.add(th);
Log.getInstance().outLog(" 对象free Active Thread: "+activeThread.size());
}else{//如果小于等于1怎么办?
}
}
2009-05-12 00:29:35,785 [Thread-7]- 对象池数据数为: 0
2009-05-12 00:29:35,802 [Thread-5]- 非对象池数据数为: 0
2009-05-12 00:29:35,825 [Thread-0]- 接收前置数据数:37107
2009-05-12 00:29:35,825 [Thread-0]- 进入接收池的数据:[00000bbb000000233f80000030300123003514501]
2009-05-12 00:29:35,826 [Thread-0]- 数据接收池大小=34661
2009-05-12 00:29:35,826 [Thread-0]- 接收到智能终端上传数据:[00000bbb000000233f800]
2009-05-12 00:29:35,831 [Thread-0]- 接收前置数据数:37108
2009-05-12 00:29:35,832 [Thread-0]- 进入接收池的数据:[00000bbb000000233f8000003030012300]
2009-05-12 00:29:35,832 [Thread-0]- 数据接收池大小=34662
2009-05-12 00:29:35,832 [Thread-0]- 接收到智能终端上传数据:[00000bbb000000233f800]
2009-05-12 00:29:35,901 [Thread-0]- 接收前置数据数:37109
2009-05-12 00:29:35,901 [Thread-0]- 进入接收池的数据:[00000bbb000000363f80000030305236005324]
2009-05-12 00:29:35,901 [Thread-0]- 数据接收池大小=34663
2009-05-12 00:29:35,901 [Thread-0]- 接收到智能终端上传数据:[00000bbb000000363f80000030305236005324501f314]
2009-05-12 00:29:35,921 [Thread-0]- 接收前置数据数:37110
2009-05-12 00:29:35,921 [Thread-0]- 进入接收池的数据:[00000bbb000000363f800000303052360032515]
2009-05-12 00:29:35,921 [Thread-0]- 数据接收池大小=34664
2009-05-12 00:29:35,921 [Thread-0]- 接收到智能终端上传数据:[00000bbb000000363f8000003030523600]
2009-05-12 00:29:35,940 [Thread-0]- 接收前置数据数:37111
2009-05-12 00:29:35,940 [Thread-0]- 进入接收池的数据:[00000bbb000000363f80000030305236005d45501f3141682b0]
2009-05-12 00:29:35,940 [Thread-0]- 数据接收池大小=34665
2009-05-12 00:29:35,941 [Thread-0]- 接收到智能终端上传数据:[00000bbb000000363f80000030305236005d45501f31]
//2、处理线程池中的数据:
Enumeration <?> dataList = ConmuniDataPool.getInstance().elements();
while (dataList.hasMoreElements()) { //这个循环是处在另一个更大的循环中吗?
//如果不是,那么,你在遍历一次dataList后,该循环退出,以后受到的数据,将无法处理。
//ConmuniDataPool.getInstance().elements();
//这句拿到的枚举值,只是当前的情况,以后新加入的数据,需要你重新遍历,
//也就是要不断的做这个循环
try {
ReciveData rd = (ReciveData) dataList.nextElement(); pool.execute(new ConmunicationDataProcess(rd));
dispatchCount++;
Log.getInstance().outLog("派发数目:"+dispatchCount);
if(dispatchCount%100==0){
Log.getInstance().outLog("接收线程池中任务数:"+pool.getQueue().size()+",池中线程数:"+pool.getPoolSize()+",getTaskCount="+pool.getTaskCount()+" ,getCompletedTaskCount="+pool.getCompletedTaskCount());
}
current = System.currentTimeMillis();
// 处理完后将数据对象删除
ConmuniDataPool.getInstance().remove(rd);
} catch (RejectedExecutionException ex) {
ex.printStackTrace();
Log.getInstance().outLog(
"需要等待线程池中的空闲线程处理通讯端数据:"+ex.getMessage()+",接收线程池中任务数:"+pool.getQueue().size());
Thread.sleep(1000);
}
try {
Thread.sleep(5);
} catch (InterruptedException e) {
Log.getInstance().outLog("ConmunicationDataDispatcher 错误2:"+e.getMessage());
}
} }
private int data_process_thread_num;
private boolean isRuning;
private final ThreadPoolExecutor pool;
private long current;
static long dispatchCount = 0;//测试排放数
public ConmunicationDataDispatcher() {
current = System.currentTimeMillis();
data_process_thread_num = Config.getInt("data_process_thread_num");
if (data_process_thread_num == 0) {
data_process_thread_num = 10;
}
pool = new ThreadPoolExecutor(10, data_process_thread_num, 4,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(150),
new ThreadPoolExecutor.CallerRunsPolicy());//DiscardOldestPolicy
Log.getInstance().outLog("接收线程池初始最大线程数目:"+pool.getMaximumPoolSize());
isRuning = true;
} public void run() {
while (isRuning) {
try {
if (ConmuniDataPool.getInstance().size() == 0) {
try {
Thread.sleep(1000);
dispatchCount=0;
Log.getInstance().outLog("数据池大小为零");
//此时应该怎么办?
continue;
} catch (InterruptedException ex1) {
Log.getInstance().outLog("ConmunicationDataDispatcher 错误:"+ex1.getMessage());
}
}
Log.getInstance().outLog("待处理通讯端数据数:"+ConmuniDataPool.getInstance().size());
Enumeration<?> dataList = ConmuniDataPool.getInstance().elements();
while (dataList.hasMoreElements()) {
try {
ReciveData rd = (ReciveData) dataList.nextElement();
//2009-05-11彭涛将ConmuniDataPool.getInstance().remove(rd);移到下面的地方
pool.execute(new ConmunicationDataProcess(rd));
//处理完后将数据对象删除
ConmuniDataPool.getInstance().remove(rd);
dispatchCount++;
Log.getInstance().outLog("派发数目:"+dispatchCount);
if(dispatchCount%100==0){
Log.getInstance().outLog("接收线程池中任务数:"+pool.getQueue().size()+",池中线程数:"+pool.getPoolSize()+",getTaskCount="+pool.getTaskCount()+" ,getCompletedTaskCount="+pool.getCompletedTaskCount());
}
current = System.currentTimeMillis();
} catch (RejectedExecutionException ex) {
ex.printStackTrace();
Log.getInstance().outLog(
"需要等待线程池中的空闲线程处理通讯端数据:"+ex.getMessage()+",接收线程池中任务数:"+pool.getQueue().size());
Thread.sleep(1000);
}
try {
Thread.sleep(5);
} catch (InterruptedException e) {
Log.getInstance().outLog("ConmunicationDataDispatcher 错误2:"+e.getMessage());
}
}
} catch (Exception ex) {
Log.getInstance().outLog(
"通讯端数据分发服务异常" + ex.getStackTrace().toString());
isRuning = false;
break;
}
}
// try{
// pool.shutdownNow();
// }catch(Exception ex){
// Log.getInstance().outLog("ConmunicationDataDispatcher 错误3:"+ex.getMessage());
// }
Log.getInstance().outLog("通讯端数据分发服务退出");
} public boolean isRuning() {
return isRuning;
} public void setRuning(boolean isRuning) {
this.isRuning = isRuning;
} public long getCurrent() {
return current;
} public void setCurrent(long current) {
this.current = current;
}
}
private int data_process_thread_num;
private boolean isRuning;
private final ThreadPoolExecutor pool;
private long current;
static long dispatchCount = 0;//测试排放数 你这里使用的是ConmunicationDataDispatcher类的静态变量
并且我看你的ConmunicationDataDispatcher构造函数中没有对测试排放数进行处理
所以他一直为0
pool.execute(new ConmunicationDataProcess(rd));
dispatchCount++;
Log.getInstance().outLog("派发数目:"+dispatchCount);
if(dispatchCount%100==0){
Log.getInstance().outLog("接收线程池中任务数:"+pool.getQueue().size()+",池中线程数:"+pool.getPoolSize()+",getTaskCount="+pool.getTaskCount()+" ,getCompletedTaskCount="+pool.getCompletedTaskCount());
}