import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;import javax.sql.DataSource;import org.springframework.context.ApplicationContext;
import org.springframework.context.support.FileSystemXmlApplicationContext;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.simple.SimpleJdbcInsert;
import org.springframework.jdbc.core.simple.SimpleJdbcTemplate;public class ToolsDataBase implements Runnable {private SimpleJdbcTemplate simpleJdbcTemplate;
private SimpleJdbcInsert simpleJdbcInsert;private DataSource sourceDataSource;
private DataSource targetDataSource;
private String sourceTabName;
private String targetTabName;
private DataSource JdbcDataSource;// 业务库的链接private DataSource JdbcDataSource70;// 70数据库的链接private DataSource JdbcDataSource2;// DB2数据库连接private DataSource JdbcDataSource73;// 73上的DB2数据库连接
//查询插入队列
private BlockingQueue queue = new LinkedBlockingQueue(8);private JdbcTemplate jdbcTemplate;int fetchSize = 0,
//每次提交行数
maxRows = 2000;// 指示查询数据是否完成
public volatile boolean flag = false;
public boolean insertFlag = false;/**
* 查询数据
*   
* @throws InterruptedException
*   
* @throws DataAccessException
* @throws InterruptedException
*/
public boolean queryData() throws InterruptedException {
long start = System.currentTimeMillis();
Map map = new HashMap();
String sql = "SELECT * FROM (SELECT ROW_NUMBER() OVER ( ORDER BY recordid) ROWNUM, * FROM "
+ sourceTabName
+ ") CAL WHERE ROWNUM BETWEEN ( "
+ maxRows
+ " * "
+ fetchSize
+ " ) + 1 AND "
+ maxRows
+ " *("
+ fetchSize + "+1) ORDER BY recordid";
List list = simpleJdbcTemplate.queryForList(sql, map);
long end = System.currentTimeMillis();
System.out.println("~~查询:" + (end - start)/1000 + "秒");
start = System.currentTimeMillis();
if (list.size() == 0) {
return true;
} else {
queue.put(list);
fetchSize++;
return false;
}
}/**
* 插入数据
*   
* @throws InterruptedException
*/
public void InsertData() throws InterruptedException {
long start = System.currentTimeMillis();
List<Map<String, Object>> list = (List<Map<String, Object>>) queue
.take();
Map<String, Object>[] maps = convertList(list);
try {
simpleJdbcInsert.executeBatch(maps);
} catch (RuntimeException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
long end = System.currentTimeMillis();
System.out.println("~~插入:" + (end - start)/1000 + "秒");
}/**
* 将泛型List转化为泛型数组.
*   
* @param list
* @return
*/
private Map<String, Object>[] convertList(List<Map<String, Object>> list) {
Map<String, Object>[] maps = (Map<String, Object>[]) new Map<?, ?>[list
.size()];
for (int i = 0; i < list.size(); i++) {
maps[i] = list.get(i);
}
return maps;
}//查询线程
public void run() {try {
long start = System.currentTimeMillis();
while (!flag) {
flag = queryData();
}
long end = System.currentTimeMillis();
System.out.println((end - start) + "ms");
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}}/**
* 插入线程
*/
public void InsertRun() {
try {
while (!(flag && queue.size() == 0)) {
InsertData();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}public static void main(String[] args) throws Exception {
ApplicationContext ctx = new FileSystemXmlApplicationContext(
new String[] {
"/WebRoot/WEB-INF/config/spring/applicationContext.xml",
"/WebRoot/WEB-INF/config/spring/sys-Context.xml",
"/WebRoot/WEB-INF/config/spring/busi-Context.xml",
"/WebRoot/WEB-INF/config/spring/web-Context.xml",
"/WebRoot/WEB-INF/config/spring/base-Context.xml",
"/WebRoot/WEB-INF/config/spring/webServices.xml",
"/WebRoot/WEB-INF/config/spring/form-Context.xml",
"/WebRoot/WEB-INF/config/spring/log-Context.xml",
"/WebRoot/WEB-INF/config/spring/stat-Context.xml"
});
DataSource sourceDataSource = (DataSource) ctx
.getBean("JdbcDataSource70");
DataSource targetDataSource = (DataSource) ctx
.getBean("JdbcDataSource2");
ToolsDataBase toolsDataBase = new ToolsDataBase(sourceDataSource,
targetDataSource, "tdbase", "tdbase");
long start = System.currentTimeMillis();
new Thread(toolsDataBase).start();
toolsDataBase.InsertRun();
long end = System.currentTimeMillis();
System.out.println((end - start) + "ms");
}//开始工作.
public void doStart(){
long start = System.currentTimeMillis();
new Thread(this).start();
this.InsertRun();
long end = System.currentTimeMillis();
System.out.println((end - start) + "ms");
}public ToolsDataBase(DataSource sourceDataSource,
DataSource targetDataSource, String sourceTabName,
String targetTabName) {
super();
this.sourceDataSource = sourceDataSource;
this.targetDataSource = targetDataSource;
this.sourceTabName = sourceTabName;
this.targetTabName = targetTabName;
// 创建查询器
simpleJdbcTemplate = new SimpleJdbcTemplate(sourceDataSource);
// 创建插入器
simpleJdbcInsert = new SimpleJdbcInsert(targetDataSource)
.withTableName(targetTabName);
}
}