程序开始执行这个方法public class ScoketRun implements Runnable { private ThreadPoolManager poolManager = null;
public void run() {
poolManager = ThreadPoolManager.newInstance();
poolManager.doStart();
}
public ThreadPoolManager getPoolManager() {
return poolManager;
}
}线程池类public class ThreadPoolManager { private static ThreadPoolManager tpm = new ThreadPoolManager();
private static ServerSocket serverSocket;
public static int COUNT = 0;
// 线程池维护线程的最少数量
private final static int CORE_POOL_SIZE = 30;
// 线程池维护线程的最大数量
private final static int MAX_POOL_SIZE = 50;
// 线程池维护线程所允许的空闲时间
private final static int KEEP_ALIVE_TIME = 0;
// 线程池所使用的缓冲队列大小
private final static int WORK_QUEUE_SIZE = 30;
// 消息缓冲队列
private final static Queue msgQueue = new LinkedList();
// 访问消息缓存的调度线程
final Runnable accessBufferThread = new Runnable() { public void run() {
// 查看是否有待定请求,如果有,则创建一个新的AccessDBThread,并添加到线程池中
if (hasMoreAcquire()) {
String msg = (String) msgQueue.poll();
Runnable task = new AccessDBThread(msg);
threadPool.execute(task);
}
}
};
private final static RejectedExecutionHandler handler = new RejectedExecutionHandler() { public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println(((ServerHandler) r) + "消息放入队列中重新等待执行");
msgQueue.offer(r);
}
};
// 管理访问的线程池
private final static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS,
new ArrayBlockingQueue(WORK_QUEUE_SIZE), handler); // 调度线程池
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); private final ScheduledFuture taskHandler = scheduler.scheduleAtFixedRate(
accessBufferThread, 0, 1, TimeUnit.SECONDS); public static ThreadPoolManager newInstance() {
try {
serverSocket = new ServerSocket(SocketSwitch.PORT);
} catch (IOException e) {
e.printStackTrace();
}
return tpm;
} private ThreadPoolManager() {
} private boolean hasMoreAcquire() {
return !msgQueue.isEmpty();
} public void addLogMsg(String msg) {
Runnable task = new AccessDBThread(msg);
threadPool.execute(task);
} public static void doStart() {
while (true) {
try {
Socket socket = serverSocket.accept();
System.out.println("获得socket");//程序运行一段时间(几个小时)后每次都在这里停住了,下面一行代码没有执行.请教,为什么我的程序运行一段时间后就不执行下面这行代码了.
threadPool.execute(new ServerHandler(socket));
} catch (IOException e) {
e.printStackTrace();
}
}
} public static void doEnd() {
try {
if (serverSocket != null && !serverSocket.isClosed()) {
serverSocket.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}这里是处理类public class ServerHandler extends Thread { private Logger logger = Logger.getLogger(ServerHandler.class);
private Socket socket;
private InputStream is;
private OutputStream os;
private DataDao dao = new DataDao();
StringBuffer DTUNUM = new StringBuffer();//DTU号码
String znum = "";//站点编号
SendMap sendmap = new SendMap();
public ServerHandler(Socket socket) {
this.socket = socket;
} @Override
public void run() {
System.out.println(ThreadPoolManager.COUNT
+ " New connection accepted " + socket.getInetAddress()
+ ":" + socket.getPort());
ThreadPoolManager.COUNT = ThreadPoolManager.COUNT + 1;
System.out.println(socket);
try {
is = socket.getInputStream();
os = socket.getOutputStream();
} catch (IOException ex) {
java.util.logging.Logger.getLogger(ServerHandler.class.getName()).log(Level.SEVERE, null, ex);
return;
}
List data = new ArrayList();
int i = 0;
int j = 0;
while (socket.isConnected()&&is!=null&&os!=null) {
try {
i = 0;
j = 0;
data.clear();
System.out.println("准备进入循环");
//int length = is.available(); //此方法不可阻塞
while (is != null) {
i = is.read();
//当关闭程序时跳出循环;
if (i == -1) {
return;
}
//将报文中的数据字节不足2位的前面补0
if (Integer.toHexString(i).length() < 2) {
System.out.print("0" + Integer.toHexString(i) + " ");
data.add("0" + Integer.toHexString(i));
} else {
System.out.print(Integer.toHexString(i) + " ");
data.add(Integer.toHexString(i));
}
//当注册包和用户数据读完时跳出循环
if ((j == 21 || j == 83) && (Integer.toHexString(i).equals("7b"))) {
break;
}
//传输方式为‘透明传输’,报文读完时会跳出循环。非透明传输时应该删除这个条件
if (j == 67) {
logger.info("原始数据" + data.toString());
break;
}
j++; }
System.out.println("跳出循环j=" + j); //获取需要发送的字符串
String strdata = returndata(j, data);
os.write(Util.hexStringToBytes(strdata));
os.flush();
System.out.println("应答" + strdata);
Thread.sleep(1000);
}
} catch (Exception e) {
try {
os.close();
is.close();
socket.close();
logger.error("异常" + e.getMessage());
e.printStackTrace();
break;
} catch (IOException ex) {
java.util.logging.Logger.getLogger(ServerHandler.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
}
public void run() {
poolManager = ThreadPoolManager.newInstance();
poolManager.doStart();
}
public ThreadPoolManager getPoolManager() {
return poolManager;
}
}线程池类public class ThreadPoolManager { private static ThreadPoolManager tpm = new ThreadPoolManager();
private static ServerSocket serverSocket;
public static int COUNT = 0;
// 线程池维护线程的最少数量
private final static int CORE_POOL_SIZE = 30;
// 线程池维护线程的最大数量
private final static int MAX_POOL_SIZE = 50;
// 线程池维护线程所允许的空闲时间
private final static int KEEP_ALIVE_TIME = 0;
// 线程池所使用的缓冲队列大小
private final static int WORK_QUEUE_SIZE = 30;
// 消息缓冲队列
private final static Queue msgQueue = new LinkedList();
// 访问消息缓存的调度线程
final Runnable accessBufferThread = new Runnable() { public void run() {
// 查看是否有待定请求,如果有,则创建一个新的AccessDBThread,并添加到线程池中
if (hasMoreAcquire()) {
String msg = (String) msgQueue.poll();
Runnable task = new AccessDBThread(msg);
threadPool.execute(task);
}
}
};
private final static RejectedExecutionHandler handler = new RejectedExecutionHandler() { public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println(((ServerHandler) r) + "消息放入队列中重新等待执行");
msgQueue.offer(r);
}
};
// 管理访问的线程池
private final static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS,
new ArrayBlockingQueue(WORK_QUEUE_SIZE), handler); // 调度线程池
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); private final ScheduledFuture taskHandler = scheduler.scheduleAtFixedRate(
accessBufferThread, 0, 1, TimeUnit.SECONDS); public static ThreadPoolManager newInstance() {
try {
serverSocket = new ServerSocket(SocketSwitch.PORT);
} catch (IOException e) {
e.printStackTrace();
}
return tpm;
} private ThreadPoolManager() {
} private boolean hasMoreAcquire() {
return !msgQueue.isEmpty();
} public void addLogMsg(String msg) {
Runnable task = new AccessDBThread(msg);
threadPool.execute(task);
} public static void doStart() {
while (true) {
try {
Socket socket = serverSocket.accept();
System.out.println("获得socket");//程序运行一段时间(几个小时)后每次都在这里停住了,下面一行代码没有执行.请教,为什么我的程序运行一段时间后就不执行下面这行代码了.
threadPool.execute(new ServerHandler(socket));
} catch (IOException e) {
e.printStackTrace();
}
}
} public static void doEnd() {
try {
if (serverSocket != null && !serverSocket.isClosed()) {
serverSocket.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}这里是处理类public class ServerHandler extends Thread { private Logger logger = Logger.getLogger(ServerHandler.class);
private Socket socket;
private InputStream is;
private OutputStream os;
private DataDao dao = new DataDao();
StringBuffer DTUNUM = new StringBuffer();//DTU号码
String znum = "";//站点编号
SendMap sendmap = new SendMap();
public ServerHandler(Socket socket) {
this.socket = socket;
} @Override
public void run() {
System.out.println(ThreadPoolManager.COUNT
+ " New connection accepted " + socket.getInetAddress()
+ ":" + socket.getPort());
ThreadPoolManager.COUNT = ThreadPoolManager.COUNT + 1;
System.out.println(socket);
try {
is = socket.getInputStream();
os = socket.getOutputStream();
} catch (IOException ex) {
java.util.logging.Logger.getLogger(ServerHandler.class.getName()).log(Level.SEVERE, null, ex);
return;
}
List data = new ArrayList();
int i = 0;
int j = 0;
while (socket.isConnected()&&is!=null&&os!=null) {
try {
i = 0;
j = 0;
data.clear();
System.out.println("准备进入循环");
//int length = is.available(); //此方法不可阻塞
while (is != null) {
i = is.read();
//当关闭程序时跳出循环;
if (i == -1) {
return;
}
//将报文中的数据字节不足2位的前面补0
if (Integer.toHexString(i).length() < 2) {
System.out.print("0" + Integer.toHexString(i) + " ");
data.add("0" + Integer.toHexString(i));
} else {
System.out.print(Integer.toHexString(i) + " ");
data.add(Integer.toHexString(i));
}
//当注册包和用户数据读完时跳出循环
if ((j == 21 || j == 83) && (Integer.toHexString(i).equals("7b"))) {
break;
}
//传输方式为‘透明传输’,报文读完时会跳出循环。非透明传输时应该删除这个条件
if (j == 67) {
logger.info("原始数据" + data.toString());
break;
}
j++; }
System.out.println("跳出循环j=" + j); //获取需要发送的字符串
String strdata = returndata(j, data);
os.write(Util.hexStringToBytes(strdata));
os.flush();
System.out.println("应答" + strdata);
Thread.sleep(1000);
}
} catch (Exception e) {
try {
os.close();
is.close();
socket.close();
logger.error("异常" + e.getMessage());
e.printStackTrace();
break;
} catch (IOException ex) {
java.util.logging.Logger.getLogger(ServerHandler.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
}
public static void doStart() {
while (true) {
try {
Socket socket = serverSocket.accept();
System.out.println("获得socket");//程序运行一段时间(几个小时)后每次都在这里停住了,下面一行代码没有执行.请教,为什么我的程序运行一段时间后就不执行下面这行代码了.
threadPool.execute(new ServerHandler(socket));
} catch (IOException e) {
e.printStackTrace();
}
}
}获得socket
这句话打印出来了,但下面一行代码就没有执行了.
不是在这一步停住了.System.out.println("获得socket"); 是执行完这一句后停止了.
当连接数达到 最大就 不会再做 accept了
i = is.read();
//当关闭程序时跳出循环;
if (i == -1) {
return;
}
这里读完你就直接return了,要加个finnally 去关闭socket和io,
你命令行 netstat 看下是不是 有 很多 tcp/IP 的close_wait状态?
根据你说的情况,感觉像是等待队列里面等待的时间太长了,线程池中线程都在执行,没有空闲的线程执行任务造成的
我这里的socket 是长连接, 不需要关闭的..而且程序不是在accept这里停住, 是在下一步
不知道什么原因,读完后, i 的值并没有等于-1,所以程序根本就没有执行到return这里.只有当我关闭程序或断开连接时i才等于-1.我这里的socket是长连接, 不需要(不能)关闭的.
每个线程io都阻塞了。解决这个问题,就行了。即使你把线程最大也没用。
read() == -1改成可以退出的条件就是。
return;
}
程序根本就没有运行到这里来.这是以前写的忘记删了.
i就没有等于过-1, 只有当我关闭程序时 i才等于-1
跳出循环是在这里
//传输方式为‘透明传输’,报文读完时会跳出循环。非透明传输时应该删除这个条件
if (j == 67) {
logger.info("原始数据" + data.toString());
break;
}
i的值之所以不会等于-1是因为read()方法一直阻塞在那里。
读文件和读socket是不一样的,读文件到达文件末尾之后会返回-1,但是读socket会一直阻塞在哪里等待socket的继续输入,除非你关闭socket,才会到达socket的末尾,返回-1。你先改一下试试,如果还不行晚上我替你调试一下程序。
logger.info("原始数据" + data.toString());
break;
}
我的程序是在这里跳出循环的. 不是在这里.
if (i == -1) {
return;
}
你的run方法里面有两个while循环。你那样写只能跳出自己那个while循环,外面还有一个。
自己看怎么解决!
你的意思是每个线程都让它一直在那循环,不打算释放?
如果是这个意思,那你线程池过一段时间就到了最大数了,那就会阻塞。
你这样不管设置多少线程池,即使设置int max。到达最大线程也只是时间问题。那你这个程序那不就在那等着时间蹦掉嘛?或者是我理解错了。
一共13个线程,每个线程都一直运行不释放,直到线程抛出异常时我会 socket.close();
当有一个线程抛出异常后,就会有一个新的线程加进来.