package com.xhc.business.bo.impl;import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLEncoder;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;import org.hibernate.Query;
import org.hibernate.Session;import com.xhc.business.bo.base.IDatatransfersBo;
import com.xhc.common.util.ConfigUtil;
import com.xhc.common.util.MessyCodeCheck;
import com.xhc.hibernate.po.TlcLogStddeliver;public class DatatransfersBo extends BaseBo implements IDatatransfersBo { // 常规变量定义
public static final String C1 = "XA";
public static final String C2 = "XB";
public static final String C3 = "XZ";
public static int n = 1;
private int pageSize = 500;
public static int i = 1;
public static Date startDate;
// 返回成功值
static final String SUCCESS = "OK"; public boolean findList() {
startDate = new Date();
// 查表状态为0的所有数据,表示未传
StringBuffer hql = new StringBuffer(
"from TlcLogStddeliver t where 1=1 ");
hql.append(" and t.proStatus='0'");
Query query = getDao().getHibernateTemplate().getSessionFactory()
.getCurrentSession().createQuery(hql.toString());
StringBuffer countHql = new StringBuffer(
"select count(*) from TlcLogStddeliver t where 1=1");
countHql.append(" and t.proStatus ='0' ");
List countList = getDao().find(countHql.toString());
long count = (Long) countList.get(0);
// 循环传输数据的总次数
for (int i = 1; i < count / pageSize; i++) {
List<TlcLogStddeliver> listTable = query.setFirstResult(
(i - 1) * pageSize).setMaxResults(pageSize).list();
if (null != listTable && listTable.size() > 0) {
Thread t = this.createTask(listTable);
t.start();
} }
return true;
} /**
 * 以http get 方式发送参数到相应的uri
 * 
 * @param param
 *            格式 m=?&c=?
 * @param uri
 *            网页地址
 * @return 返回网页内容或错误信息
 */
public static String handlerSendURL(String param, String uri) {
String backStr = "";
HttpURLConnection httpConnection = null;
InputStream input = null;
try {
String urlStr = uri + param;
URL url = new URL(urlStr);
httpConnection = (HttpURLConnection) url.openConnection();
httpConnection.setDoOutput(true);
httpConnection.setReadTimeout(10000);
httpConnection.setConnectTimeout(10000);
httpConnection.setRequestMethod("GET");
httpConnection.setRequestProperty("Content-Type",
"application/x-www-form-urlencoded");
OutputStream os = httpConnection.getOutputStream();
int responseCode = httpConnection.getResponseCode();
if (200 == responseCode) {
input = httpConnection.getInputStream();
} else {
input = httpConnection.getErrorStream();
}
BufferedReader bf = new BufferedReader(new InputStreamReader(input));
StringBuffer sbf = new StringBuffer();
String temp = "";
while ((temp = bf.readLine()) != null) {
sbf.append(temp);
}
backStr = sbf.toString();
input.close();
os.close();
bf.close();
} catch (MalformedURLException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
return backStr;
} /**
 * 根据对应的条件值更新状态值置为1
 * 
 * @param tlsTable
 */
public void update(TlcLogStddeliver tlsTable) {
Session session = getDao().getHibernateTemplate().getSessionFactory()
.getCurrentSession();
TlcLogStddeliver tls = (TlcLogStddeliver) session.createQuery(
"from TlcLogStddeliver tls where tls.getNumseqid=?")
.setParameter(1, tlsTable.getNumseqid());
tls.setProStatus("1");
session.update(tls);
session.beginTransaction();
} public Thread createTask(final List<TlcLogStddeliver> listTable) {
return new Thread() {
public void run() {
for (TlcLogStddeliver tlcTable : listTable) {
System.out.println("正在传送当前第" + DatatransfersBo.n + "条数据");
System.out.println("手机号---------"
+ tlcTable.getVc2srcmobile());
System.out
.println("手机内容" + tlcTable.getVc2messagecontent()); // 数据库存在乱码更新状态值为1
if (MessyCodeCheck.isMessyCode(tlcTable
.getVc2messagecontent())) {
update(tlcTable);
// 手机号为空更新状态值为1
} else if (null == tlcTable.getVc2srcmobile()
|| "" == tlcTable.getVc2srcmobile()) {
update(tlcTable);
}
// 手机内容为空更新状态值为1
else if (null == tlcTable.getVc2messagecontent()
|| "" == tlcTable.getVc2messagecontent()) {
update(tlcTable);
}
// 手机内容为XA开始的
else if (null != tlcTable.getVc2messagecontent()
&& tlcTable.getVc2messagecontent().length() > 5
&& tlcTable.getVc2messagecontent().substring(0, 2)
.equals(DatatransfersBo.C1)) {
String XAURL = ConfigUtil.getConfig().getString(
ConfigUtil.XA_URL_PATH);
// URL传参服务器返回状态值
String state = DatatransfersBo
.handlerSendURL(
"m="
+ URLEncoder.encode(tlcTable
.getVc2srcmobile())
+ "&c="
+ URLEncoder
.encode(
tlcTable
.getVc2messagecontent()
.substring(
tlcTable
.getVc2messagecontent()
.lastIndexOf(
'/') + 1))
.replaceAll("\\+",
"%20") + "",
XAURL);
// 判断返回状态如果为OK则更新表状态为1
if (state.equals(DatatransfersBo.SUCCESS)) {
update(tlcTable);
} else {
System.out.println("服务器接收失败");
// 重新传
DatatransfersBo.i--;
}
}
// 手机内容为XZ开始的将状态置为1
else if (null != tlcTable.getVc2messagecontent()
&& tlcTable.getVc2messagecontent().length() > 5
&& tlcTable.getVc2messagecontent().substring(0, 2)
.equals(DatatransfersBo.C3)) {
update(tlcTable);
}
// 内容为XB开始的将状态置为1
else if (null != tlcTable.getVc2messagecontent()
&& tlcTable.getVc2messagecontent().length() > 5
&& tlcTable.getVc2messagecontent().substring(0, 2)
.equals(DatatransfersBo.C2)) {
String XBURL = ConfigUtil.getConfig().getString(
ConfigUtil.XB_URL_PATH);
// URL传参返回状态值
String state = DatatransfersBo
.handlerSendURL(
"m="
+ URLEncoder.encode(tlcTable
.getVc2srcmobile())
+ "&c="
+ URLEncoder
.encode(
tlcTable
.getVc2messagecontent()
.substring(
tlcTable
.getVc2messagecontent()
.lastIndexOf(
'/') + 1))
.replaceAll("\\+",
"%20") + "",
XBURL);
// 判断返回状态如果为OK则更新表状态为1
if (state.equals(DatatransfersBo.SUCCESS)) {
update(tlcTable);
} else {
System.out.println("服务器接收失败");
// 重新传
DatatransfersBo.i--;
}
}
Date startDate = new Date();
Date endDate = new Date();
System.out
.println("结束时间: "
+ (new SimpleDateFormat("HH:mm:ss")
.format(endDate)));
System.out.println("总共用多少时间"
+ (endDate.getTime() - DatatransfersBo.startDate
.getTime()) / 1000 + "(秒)"); } }
};
}
}这段代码中的,循环那里如何控制我的线程并发数,循环的线程是总记录数/每次要传的次数,数据量很大的情况下,线程不好控制,请问如何解决

解决方案 »

  1.   

    代码太乱,太多,没看
    但是,我可以说,为什么不用线程池呢?java.util.concurrent.Executors里有很多方法创建固定个数的线程池还有java.util.concurrent.ThreadPoolExecutor等
      

  2.   

    startDate = new Date();
    StringBuffer countHql = new StringBuffer(
    "select count(*) from TlcLogStddeliver t where 1=1");
    countHql.append(" and t.proStatus='0'");
    List countList = getDao().find(countHql.toString());
    long count = (Long) countList.get(0);
    totalPage = (int) (count % pageSize == 0 ? count / pageSize : count
    / pageSize + 1);
    // 构造一个线程池
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 10, 3,
    TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3),
    new ThreadPoolExecutor.DiscardOldestPolicy());
    // 循环传输数据的总次数
    for (int i = 1; i <= totalPage; i++) {
    System.out.println("循环了第" + i + "次");
    // 查表状态为0的所有数据,表示未传
    StringBuffer hql = new StringBuffer(
    "from TlcLogStddeliver t where 1=1 ");
    hql.append(" and t.proStatus=?");
    Query query = getDao().getHibernateTemplate().getSessionFactory()
    .getCurrentSession().createQuery(hql.toString());
    query.setParameter(0, "0");
    List<TlcLogStddeliver> listTable = query
    .setFirstResult((i - 1) * pageSize).setMaxResults(pageSize)
    .list();
    if (null != listTable && listTable.size() > 0) {
    Thread t = this.createTask(listTable);
    threadPool.execute(t);
    t.start();
    } }我用了可是还是没有控制线程数,