近期开发一个项目处于时间的考虑,需要用到多线程处理数据,并将处理后的数据输出到不同用户编号下生成的文件中。但是,在实际测试启动两个线程的情况下,输出文件的时候,第一个启动的线程输出的开始部分信息是正确的,只要第二个线程一启动,第一个线程和第二个线程查询到的数据就全部的输出到第二个线程对应的文件中。感觉非常的混乱,用锁试了下也没办法锁定,要不就是锁死了连文件也不生成,要不就随机写!由于第一次用到多线程,求高手们帮忙解决下!
下面贴出部分代码:public class AcctrptApplication {
// log4j引用
private static Logger log = Logger.getLogger(AcctrptApplication.class);
private static ReadWriteLock rwl = new ReentrantReadWriteLock(); /**
* @批量入口
* @param args
*/
public static void main(String[] args) {
Map argsMap = null;
try {
argsMap = checkArgs(args);
} catch (Exception e1) {
e1.printStackTrace();
}
// 读取配置文件
try {
AcctrptAppConfig.init(); } catch (Exception e) {
e.printStackTrace();
}
long start=System.currentTimeMillis();
Date date = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
log.info("【作业开始】" + "开始时间:" + sdf.format((date.getTime())));
ApplicationCache.init();
List threadTaskes = new ArrayList();
// 获取最大线程
int maxThread = ((Integer) argsMap.get("-t")).intValue();
// 平均线程
int ave = 0;
try {
ave = maxThread / ApplicationCache.acctRptInfoList.size(); } catch (Exception e1) {
log.error("没有符合跑批对象!");
e1.printStackTrace();
System.exit(0);
}
if (ave == 0) {
ave = 1;
}
for (AcctRptInfoBean AcctRpt : ApplicationCache.acctRptInfoList) {
rwl.writeLock().lock();
threadTaskes.add(new MutileTasksManager(AcctRpt, ave,rwl));
}
List resul = null;
ExecutorService exec = Executors.newCachedThreadPool();
try {
resul = exec.invokeAll(threadTaskes);
exec.shutdown();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (RejectedExecutionException e) {
log.error("任务无法安排执行");
e.printStackTrace();
log.info("【没有数据符合跑批条件,退出批量应用程序!】");
System.exit(0);
} finally {
ApplicationCache.close();
}
if (resul != null) {
for (Iterator iterator = resul.iterator(); iterator.hasNext();) {
Future future = (Future) iterator.next();
try {
log.info(future.get());
} catch (Exception e) {
e.printStackTrace();
}
}
}
long end=System.currentTimeMillis();
long runtime=end-start;
log.info("【批量账户报告作业结束】" + "结束时间:" + sdf.format((date.getTime())));
log.info("【批量账户报告作业完成】" + "总计时间:"
+ tiemFomat(runtime));
System.exit(0);
} private static Map<String, Object> checkArgs(String[] args)
throws Exception {
Map argsMap = new HashMap(); argsMap.put("-t", Integer.valueOf(10)); int i = 0;
for (int size = args.length; i < size; i++) {
String s = args[i];
if ((s.equals("-t"))) {
if (i + 1 < size) {
i++;
argsMap.put(s, Integer.valueOf(Integer.parseInt(args[i])));
}
}
}
return argsMap;
}
}第二个文件:public class MutileTasksManager implements Callable<String> {
// 引用log4j
private static Logger log = Logger.getLogger(MutileTasksManager.class);
// 账户报告信息
private AcctRptInfoBean AcctRpt;
// 线程数量
private int threadNumber;
private String rptId;
public static ExecutorService exec;
private ReadWriteLock rwl;
private Lock lock = new ReentrantLock(); /**
* @构造方法
* @param AcctRpt
* @param threadNumber
*/
public MutileTasksManager(AcctRptInfoBean AcctRpt, int threadNumber,ReadWriteLock rwl) {
this.AcctRpt = AcctRpt;
this.threadNumber = threadNumber;
this.rptId = AcctRpt.getRPTID();
this.rwl=rwl;
}; /***
* @线程池调度
*/
public String call() throws Exception {
// TODO Auto-generated method stub
// System.out.println("111");
log.info("【线程分配】:" + "【报告模板】:" + this.AcctRpt.getRPTID() + " "
+ "线程任务开始!");
exec = null;
exec = Executors
.newFixedThreadPool(threadNumber + 1); BlockingQueue queue = new ArrayBlockingQueue(threadNumber); // 查询
GetAcctpritInfoTasks select = new GetAcctpritInfoTasks(threadNumber,
queue, AcctRpt);
exec.execute(select);
// 线程池分配线程
List threadTasks = new ArrayList();
for (int i = 0; i < threadNumber; i++) {
//rwl.writeLock().tryLock();
threadTasks.add(new GenerateRptManagerTasks(AcctRpt, queue,rwl));
}
List resList = null;
try {
resList = exec.invokeAll(threadTasks);
} catch (InterruptedException e) {
rwl.writeLock().unlock();
e.printStackTrace();
}finally{
}
exec.shutdown();
String result = "查询模板记录:" + select.getQueryNum() + " 条";
log.info("【线程结束】");
return result;
}
}
这个是处理数据输出到文件的。以前没用过线程,还望懂的多指点下,先谢谢了!
下面贴出部分代码:public class AcctrptApplication {
// log4j引用
private static Logger log = Logger.getLogger(AcctrptApplication.class);
private static ReadWriteLock rwl = new ReentrantReadWriteLock(); /**
* @批量入口
* @param args
*/
public static void main(String[] args) {
Map argsMap = null;
try {
argsMap = checkArgs(args);
} catch (Exception e1) {
e1.printStackTrace();
}
// 读取配置文件
try {
AcctrptAppConfig.init(); } catch (Exception e) {
e.printStackTrace();
}
long start=System.currentTimeMillis();
Date date = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
log.info("【作业开始】" + "开始时间:" + sdf.format((date.getTime())));
ApplicationCache.init();
List threadTaskes = new ArrayList();
// 获取最大线程
int maxThread = ((Integer) argsMap.get("-t")).intValue();
// 平均线程
int ave = 0;
try {
ave = maxThread / ApplicationCache.acctRptInfoList.size(); } catch (Exception e1) {
log.error("没有符合跑批对象!");
e1.printStackTrace();
System.exit(0);
}
if (ave == 0) {
ave = 1;
}
for (AcctRptInfoBean AcctRpt : ApplicationCache.acctRptInfoList) {
rwl.writeLock().lock();
threadTaskes.add(new MutileTasksManager(AcctRpt, ave,rwl));
}
List resul = null;
ExecutorService exec = Executors.newCachedThreadPool();
try {
resul = exec.invokeAll(threadTaskes);
exec.shutdown();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (RejectedExecutionException e) {
log.error("任务无法安排执行");
e.printStackTrace();
log.info("【没有数据符合跑批条件,退出批量应用程序!】");
System.exit(0);
} finally {
ApplicationCache.close();
}
if (resul != null) {
for (Iterator iterator = resul.iterator(); iterator.hasNext();) {
Future future = (Future) iterator.next();
try {
log.info(future.get());
} catch (Exception e) {
e.printStackTrace();
}
}
}
long end=System.currentTimeMillis();
long runtime=end-start;
log.info("【批量账户报告作业结束】" + "结束时间:" + sdf.format((date.getTime())));
log.info("【批量账户报告作业完成】" + "总计时间:"
+ tiemFomat(runtime));
System.exit(0);
} private static Map<String, Object> checkArgs(String[] args)
throws Exception {
Map argsMap = new HashMap(); argsMap.put("-t", Integer.valueOf(10)); int i = 0;
for (int size = args.length; i < size; i++) {
String s = args[i];
if ((s.equals("-t"))) {
if (i + 1 < size) {
i++;
argsMap.put(s, Integer.valueOf(Integer.parseInt(args[i])));
}
}
}
return argsMap;
}
}第二个文件:public class MutileTasksManager implements Callable<String> {
// 引用log4j
private static Logger log = Logger.getLogger(MutileTasksManager.class);
// 账户报告信息
private AcctRptInfoBean AcctRpt;
// 线程数量
private int threadNumber;
private String rptId;
public static ExecutorService exec;
private ReadWriteLock rwl;
private Lock lock = new ReentrantLock(); /**
* @构造方法
* @param AcctRpt
* @param threadNumber
*/
public MutileTasksManager(AcctRptInfoBean AcctRpt, int threadNumber,ReadWriteLock rwl) {
this.AcctRpt = AcctRpt;
this.threadNumber = threadNumber;
this.rptId = AcctRpt.getRPTID();
this.rwl=rwl;
}; /***
* @线程池调度
*/
public String call() throws Exception {
// TODO Auto-generated method stub
// System.out.println("111");
log.info("【线程分配】:" + "【报告模板】:" + this.AcctRpt.getRPTID() + " "
+ "线程任务开始!");
exec = null;
exec = Executors
.newFixedThreadPool(threadNumber + 1); BlockingQueue queue = new ArrayBlockingQueue(threadNumber); // 查询
GetAcctpritInfoTasks select = new GetAcctpritInfoTasks(threadNumber,
queue, AcctRpt);
exec.execute(select);
// 线程池分配线程
List threadTasks = new ArrayList();
for (int i = 0; i < threadNumber; i++) {
//rwl.writeLock().tryLock();
threadTasks.add(new GenerateRptManagerTasks(AcctRpt, queue,rwl));
}
List resList = null;
try {
resList = exec.invokeAll(threadTasks);
} catch (InterruptedException e) {
rwl.writeLock().unlock();
e.printStackTrace();
}finally{
}
exec.shutdown();
String result = "查询模板记录:" + select.getQueryNum() + " 条";
log.info("【线程结束】");
return result;
}
}
这个是处理数据输出到文件的。以前没用过线程,还望懂的多指点下,先谢谢了!
// 引用Log4j
private static Logger log = Logger.getLogger(GetAcctpritInfoTasks.class);
// DBBase
private DBConnectionBase dbBase;
// 线程数量
private int threedNumber;
// 账户信息
private AcctRptInfoBean acctRptInfo;
// public static List<AcctRptRelBean> acctRptDelList = null;
// 绑定队列
private BlockingQueue queue;
//
private int queryNum = 0;
public static List acctRptDelInfoList = null; /**
* @构造方法
* @param threedNumber
* @param queue
* @param acctRptInfo
*/
public GetAcctpritInfoTasks(int threedNumber, BlockingQueue queue,
AcctRptInfoBean acctRptInfo) { this.threedNumber = threedNumber;
this.queue = queue;
this.acctRptInfo = acctRptInfo;
this.dbBase = new DBConnectionBase(); } /**
* @执行线程获取数据
*/
public void run() {
// TODO Auto-generated method stub
log.info("【获取数据】:" + acctRptInfo.getRPTID()); while (true) {
try {
acctRptDelInfoList = null;
acctRptDelInfoList = dbBase.getAcctRptDel(acctRptInfo.getRPTID());
} catch (Exception e) {
e.printStackTrace();
}
if (acctRptDelInfoList.size() == 0) {
System.out.println("acctRptDelInfoList.size() =" + acctRptInfo.getRPTID()+ "="+acctRptDelInfoList.size());
} else {
queryNum = acctRptDelInfoList.size();
try {
queue.put(acctRptDelInfoList);
} catch (Exception e) {
e.printStackTrace();
}
}
for (int i = 0; i < threedNumber; i++) {
try {
this.queue.put(Collections.EMPTY_LIST);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
dbBase.closeDbconn();
}
}
public int getQueryNum() {
return this.queryNum;
}
}第四个public class GenerateRptManagerTasks implements Callable<TasksInfo> {
// 引用log4j
private static Logger log = Logger.getLogger(GenerateRptManagerTasks.class);
// 报告信息bean
private AcctRptInfoBean AcctRpt;
//
private BlockingQueue queue;
private TasksInfo tasksInfo;
private int receiveNum;
private int complexNum;
private DBConnectionBase dbBase = null;
//public float f_banlance = 0.0f; // 期初
private ReadWriteLock rwl;
private Lock lock = new ReentrantLock();
private Condition _save = lock.newCondition();
private String pathFile =null;
public GenerateRptManagerTasks() {
// TODO Auto-generated constructor stub
}
public GenerateRptManagerTasks(AcctRptInfoBean AcctRpt, BlockingQueue queue,ReadWriteLock rwl) {
// TODO Auto-generated constructor stub
this.AcctRpt = AcctRpt;
this.tasksInfo = new TasksInfo();
this.queue = queue;
receiveNum = 0;
complexNum = 0;
dbBase = new DBConnectionBase();
this.rwl=rwl;
} /**
*
*/
public TasksInfo call() throws Exception {
// TODO Auto-generated method stub
long start = System.currentTimeMillis();
List dataList;
int putptr = 0;
int count = 0;
// lock.lock();
try {
while ((dataList = (List) queue.take()) != Collections.EMPTY_LIST) {
receiveNum += dataList.size();
//
relateAcctHis(dataList);rwl.writeLock().lock();
}
tasksInfo.setState(true);
} catch (InterruptedException e) {
e.printStackTrace();
rwl.writeLock().unlock();
tasksInfo.setState(false);
}finally
{
//
//lock.unlock();
} long end = System.currentTimeMillis();
tasksInfo.setTime(end - start);
tasksInfo.setComplex(complexNum);
tasksInfo.setReceive(receiveNum); return tasksInfo;
} /**
*
* @通过账号获取相关历史交易记录
* @param dataList
* @throws Exception
*/
public void relateAcctHis(List<AcctRptRelBean> dataList) throws Exception {
// TODO Auto-generated method stub
// 账户历史交易记录List
//rwl = new ReentrantReadWriteLock();
List<AcctxnHisDetailBean> acctxnHisDetailBean = null;
RptHisDetailBean rptLog = null;
GenerateAcctrptFile genFile = new GenerateAcctrptFile();
// String pathFile = null;
int i_inCount = 1;
int i ; for ( i = 0; i < dataList.size(); i++) { AcctRptRelBean acctRel = (AcctRptRelBean) dataList.get(i);
boolean LastRptId=false;
if(i==(dataList.size()-1)){
LastRptId=true;
}
double f_banlance = 0.0f;
acctxnHisDetailBean = dbBase.getAcctxnHisDetail(
acctRel.getACCTNO(), AcctRpt);
if (AcctRpt.getRPT_TYPE().equals("01")) {
f_banlance = dbBase.getStartBalance(acctRel.getACCTNO(), AcctRpt.getRPT_CYCLE()); }
if (AcctRpt.getRPT_FORMAT().equals("TXT")) {
i_inCount = genFile.writeTxt(acctxnHisDetailBean, AcctRpt,
pathFile, i_inCount,LastRptId,f_banlance);
}// csv文件格式
else if (AcctRpt.getRPT_FORMAT().equals("CSV")) {
i_inCount = genFile.writeCsv(acctxnHisDetailBean, AcctRpt, i_inCount,LastRptId,f_banlance);
System.out.println("RptID: "+acctRel.getRPTID()+"\t账号:" +acctRel.getACCTNO() + "\t期初余额:" +f_banlance);
}
}
rptLog = new RptHisDetailBean();
CommUtil comm = new CommUtil();
comm.writeLog(rptLog,AcctRpt,pathFile,dbBase);
dbBase.updateAcctInfo(AcctRpt);
//rwl.writeLock().unlock();
dbBase.closeDbconn();
dbBase.closeSdbDbconn();
}
}最后个public class GenerateAcctrptFile {
//创建文件
pathFile = new FileCreateImp().getFileNamePath(AcctRpt);
//打开文件
csv.open(pathFile);
//写数据
csv.write(new String[] { " ", " ", " ", " ", " ", "对账单报告" });
...
//关闭
}