共有10个线程读取csv文件
线程1读取1-20000
2读取20001-40000
共有10个线程
10个parser
应注意哪些东西
下面这个没用到多线程的隔离等代码
高手给我点拨一下吧
/**
 * Worker that copies rows from a CSV parser into the CUSTOMER and
 * CUSTOMER_INFO tables using JDBC batches.
 * <p>
 * Every worker obtains its own connection inside {@link #run()}. A single
 * {@code static} connection shared by all workers is not safe here: each
 * worker closes the connection when it finishes, which breaks every worker
 * that is still running, and commits issued by one worker also commit the
 * half-filled batches of the others.
 * <p>
 * NOTE(review): the loop only counts from {@code start} to {@code end}; nothing
 * in this class positions the parser on row {@code start}. If all workers are
 * given parsers over the same file they presumably all read the same leading
 * rows -- confirm how the parsers are created by the caller.
 */
public class DealThread implements Runnable {

    /** A batch is flushed whenever the row counter is a multiple of this value. */
    private static final int BATCH_SIZE = 5000;

    private int start;
    private int end;
    private CsvFileParser parser;

    /**
     * @param s first row number handled by this worker
     * @param e last row number handled by this worker (inclusive)
     * @param p parser the rows are read from; closed when {@link #run()} ends
     */
    public DealThread(int s, int e, CsvFileParser p) {
        this.start = s;
        this.end = e;
        this.parser = p;
    }

    /**
     * Reads rows from the parser, batches one insert per table for each row and
     * commits every {@value #BATCH_SIZE} rows and once more at the end. A row
     * that cannot be processed is reported and skipped.
     */
    public void run() {
        System.out.println(" start: " + start + " end: " + end);
        Connection conn = null;
        PreparedStatement pstmt1 = null;
        PreparedStatement pstmt2 = null;
        try {
            try {
                // Private to this run: never shared with other workers.
                conn = ConnectionFactory.getConnection();
                pstmt1 = conn
                        .prepareStatement("insert into CUSTOMER values(?,?,?,?,?,?)");
                pstmt2 = conn
                        .prepareStatement("insert into CUSTOMER_INFO values(?,?,? )");
                for (int i = start; i <= end; i++) {
                    try {
                        addCurrentRow(pstmt1, pstmt2);
                        if (i != 0 && i % BATCH_SIZE == 0) {
                            System.out.println(i);
                            flush(conn, pstmt1, pstmt2);
                        }
                    } catch (Exception e) {
                        // Best effort: report the bad row and carry on.
                        e.printStackTrace();
                    }
                    // Deliberately outside the try block so that a failing row
                    // can never skip the end-of-data check.
                    if (!parser.hasMore()) {
                        break;
                    }
                }
                flush(conn, pstmt1, pstmt2);
            } finally {
                try {
                    parser.close();
                } finally {
                    // Runs even when closing the parser fails.
                    closeQuietly(pstmt1);
                    closeQuietly(pstmt2);
                    closeQuietly(conn);
                }
            }
        } catch (SQLException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Adds the parser's current row to both batches.
     * <p>
     * NOTE(review): as in the original post, only parameters 1-3 of the
     * six-placeholder CUSTOMER insert are bound and the e-mail is bound twice.
     * The remaining CSV fields (name, gender, cellphone, ...) must be read and
     * bound according to the real table layout before this can execute.
     */
    private void addCurrentRow(PreparedStatement customerInsert,
            PreparedStatement infoInsert) throws Exception {
        String email = parser.getByFieldName("email");
        String city = parser.getByFieldName("city");

        customerInsert.setString(1, UuidHex.getUuidHex());
        customerInsert.setString(2, email);
        customerInsert.setString(3, email);
        customerInsert.addBatch();

        // The statement has exactly three placeholders; binding index 4 would
        // always fail with an SQLException.
        infoInsert.setString(1, UuidHex.getUuidHex());
        infoInsert.setString(2, city);
        infoInsert.setString(3, city);
        infoInsert.addBatch();
    }

    /** Sends both pending batches to the database and commits them. */
    private static void flush(Connection conn, PreparedStatement first,
            PreparedStatement second) throws SQLException {
        first.executeBatch();
        second.executeBatch();
        conn.commit();
    }

    /** Closes a statement, reporting instead of propagating a failure. */
    private static void closeQuietly(PreparedStatement statement) {
        if (statement != null) {
            try {
                statement.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }

    /** Closes a connection, reporting instead of propagating a failure. */
    private static void closeQuietly(Connection connection) {
        if (connection != null) {
            try {
                connection.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}
线程1读取1-20000
2读取20001-40000
共有10个线程
10个parser
应注意哪些东西
下面这个没用到多线程的隔离等代码
高手给我点拨一下吧
/**
 * Worker that copies rows from a CSV parser into the CUSTOMER and
 * CUSTOMER_INFO tables using JDBC batches.
 * <p>
 * Every worker obtains its own connection inside {@link #run()}. A single
 * {@code static} connection shared by all workers is not safe here: each
 * worker closes the connection when it finishes, which breaks every worker
 * that is still running, and commits issued by one worker also commit the
 * half-filled batches of the others.
 * <p>
 * NOTE(review): the loop only counts from {@code start} to {@code end}; nothing
 * in this class positions the parser on row {@code start}. If all workers are
 * given parsers over the same file they presumably all read the same leading
 * rows -- confirm how the parsers are created by the caller.
 */
public class DealThread implements Runnable {

    /** A batch is flushed whenever the row counter is a multiple of this value. */
    private static final int BATCH_SIZE = 5000;

    private int start;
    private int end;
    private CsvFileParser parser;

    /**
     * @param s first row number handled by this worker
     * @param e last row number handled by this worker (inclusive)
     * @param p parser the rows are read from; closed when {@link #run()} ends
     */
    public DealThread(int s, int e, CsvFileParser p) {
        this.start = s;
        this.end = e;
        this.parser = p;
    }

    /**
     * Reads rows from the parser, batches one insert per table for each row and
     * commits every {@value #BATCH_SIZE} rows and once more at the end. A row
     * that cannot be processed is reported and skipped.
     */
    public void run() {
        System.out.println(" start: " + start + " end: " + end);
        Connection conn = null;
        PreparedStatement pstmt1 = null;
        PreparedStatement pstmt2 = null;
        try {
            try {
                // Private to this run: never shared with other workers.
                conn = ConnectionFactory.getConnection();
                pstmt1 = conn
                        .prepareStatement("insert into CUSTOMER values(?,?,?,?,?,?)");
                pstmt2 = conn
                        .prepareStatement("insert into CUSTOMER_INFO values(?,?,? )");
                for (int i = start; i <= end; i++) {
                    try {
                        addCurrentRow(pstmt1, pstmt2);
                        if (i != 0 && i % BATCH_SIZE == 0) {
                            System.out.println(i);
                            flush(conn, pstmt1, pstmt2);
                        }
                    } catch (Exception e) {
                        // Best effort: report the bad row and carry on.
                        e.printStackTrace();
                    }
                    // Deliberately outside the try block so that a failing row
                    // can never skip the end-of-data check.
                    if (!parser.hasMore()) {
                        break;
                    }
                }
                flush(conn, pstmt1, pstmt2);
            } finally {
                try {
                    parser.close();
                } finally {
                    // Runs even when closing the parser fails.
                    closeQuietly(pstmt1);
                    closeQuietly(pstmt2);
                    closeQuietly(conn);
                }
            }
        } catch (SQLException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Adds the parser's current row to both batches.
     * <p>
     * NOTE(review): as in the original post, only parameters 1-3 of the
     * six-placeholder CUSTOMER insert are bound and the e-mail is bound twice.
     * The remaining CSV fields (name, gender, cellphone, ...) must be read and
     * bound according to the real table layout before this can execute.
     */
    private void addCurrentRow(PreparedStatement customerInsert,
            PreparedStatement infoInsert) throws Exception {
        String email = parser.getByFieldName("email");
        String city = parser.getByFieldName("city");

        customerInsert.setString(1, UuidHex.getUuidHex());
        customerInsert.setString(2, email);
        customerInsert.setString(3, email);
        customerInsert.addBatch();

        // The statement has exactly three placeholders; binding index 4 would
        // always fail with an SQLException.
        infoInsert.setString(1, UuidHex.getUuidHex());
        infoInsert.setString(2, city);
        infoInsert.setString(3, city);
        infoInsert.addBatch();
    }

    /** Sends both pending batches to the database and commits them. */
    private static void flush(Connection conn, PreparedStatement first,
            PreparedStatement second) throws SQLException {
        first.executeBatch();
        second.executeBatch();
        conn.commit();
    }

    /** Closes a statement, reporting instead of propagating a failure. */
    private static void closeQuietly(PreparedStatement statement) {
        if (statement != null) {
            try {
                statement.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }

    /** Closes a connection, reporting instead of propagating a failure. */
    private static void closeQuietly(Connection connection) {
        if (connection != null) {
            try {
                connection.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}
我要是楼主,就用一个线程读文件,3~5个线程(每个线程对应一个数据库连接)向数据库中插入数据。
因为,我觉得,录入速度慢的瓶颈,应该是insert语句执行的效率较低造成的。
当然,如果,多线程读取文件并同时录入读取的信息,这也是一个好办法。
但是,我查了一下CsvFileParser类,网上的代码并没有对文件进行拆分读取、解析这种功能。
所以,貌似楼主的代码,可能是10个线程干读了10遍同样的文件,而且是从头到尾的。所以,推荐楼主编写自己的Parser类,用于针对文件的随机访问解析(即无论文件的哪个部分它都能解析)。
然后,多线程,每个线程解析文件的相应部分,就可以了。当然,我还是推荐多线程插数据。
因为是静态的connection,所以共享一个conn
-----------------------------------
因为我运行过parser
知道了读取效率
并且速度没有想象中慢
所以用10个parser来读取
所以没必要自己写parser(我也不会写)因为数据存数据库需要很长时间
所以由10个线程同时进行批量操作
也就是主要是多线程插入数据
我上面代码应该是多线程插数据吧?
对底层东西不熟
不知道哪里出了问题了
preparedstatement设成static会有问题吗?
难道我上面的错误就是因为connection是static
所以下面的preparedstatement不能执行了
我去试试
是不是静态的有什么关系啊?
你在其它地方还用到了这个connection?
然后10个线程独立进行操作
但是进行后发现2个表里面数据并不是同步的
就像A表里面有40000条数据的时候B表中只有20000条
为什么2个表的数据不是同步的?
还有就是DriverManager.getConnection这个方法被synchronized了
然后我用conn和preparedstatement的时候都是各线程相互独立的,没冲突的吧?
/**
 * Plain mutable holder for the fields of one customer record.
 * <p>
 * Created: 2009-10-29 01:40:20 AM
 *
 * @author 侯磊
 * @since 1.0
 */
public class RecordBean {

    // Identity and contact data.
    private String id;
    private String email;
    private String name;
    private int gender;
    private String city;
    private String cellphone;
    private String telephone;

    // Personal and employment data.
    private String salary;
    private String ismarried;
    private String education;
    private String industry;
    private String corpname;
    private String corptype;
    private String position;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getEmail() {
        return email;
    }

    public void setEmail(String email) {
        this.email = email;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getGender() {
        return gender;
    }

    public void setGender(int gender) {
        this.gender = gender;
    }

    public String getCity() {
        return city;
    }

    public void setCity(String city) {
        this.city = city;
    }

    public String getCellphone() {
        return cellphone;
    }

    public void setCellphone(String cellphone) {
        this.cellphone = cellphone;
    }

    public String getTelephone() {
        return telephone;
    }

    public void setTelephone(String telephone) {
        this.telephone = telephone;
    }

    public String getSalary() {
        return salary;
    }

    public void setSalary(String salary) {
        this.salary = salary;
    }

    public String getIsmarried() {
        return ismarried;
    }

    public void setIsmarried(String ismarried) {
        this.ismarried = ismarried;
    }

    public String getEducation() {
        return education;
    }

    public void setEducation(String education) {
        this.education = education;
    }

    public String getIndustry() {
        return industry;
    }

    public void setIndustry(String industry) {
        this.industry = industry;
    }

    public String getCorpname() {
        return corpname;
    }

    public void setCorpname(String corpname) {
        this.corpname = corpname;
    }

    public String getCorptype() {
        return corptype;
    }

    public void setCorptype(String corptype) {
        this.corptype = corptype;
    }

    public String getPosition() {
        return position;
    }

    public void setPosition(String position) {
        this.position = position;
    }
}
读取文件内容,生成bean对象,数据库操作使用线程池来操作
package houlei.csdn.mutiThread.insert2DB;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.concurrent.Executor;/**
* 用于读取文件。
* <p>
* 创建时间:2009-10-29 上午01:44:24
* @author 侯磊
* @since 1.0
*/
/**
 * Reads a CSV file on the calling thread and, for every record, submits one
 * insert task per target table to the supplied executor.
 */
public class FileReader implements Runnable {
    private String fileName = "";
    private Executor pool;

    /**
     * @param fileName path of the CSV file to read
     * @param pool     executor that runs the database insert tasks
     */
    public FileReader(String fileName, Executor pool) {
        this.fileName = fileName;
        this.pool = pool;
    }

    /**
     * Walks through the file and schedules the inserts. Does nothing when the
     * file name or the executor is {@code null}. A record whose gender is not a
     * number is reported and skipped so that one malformed row does not abort
     * the whole import.
     */
    public void run() {
        if (pool == null || fileName == null) {
            return;
        }
        FileInputStream in = null;
        CsvFileParser parser = null;
        try {
            in = new FileInputStream(fileName);
            parser = new CsvFileParser(in);
            while (parser.hasMore()) {
                RecordBean bean;
                try {
                    bean = readRecord(parser);
                } catch (NumberFormatException e) {
                    e.printStackTrace();
                    continue;
                }
                pool.execute(new TableCustomerRecorder(bean));
                pool.execute(new TableCustomerInfoRecorder(bean));
            }
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (parser != null) {
                    parser.close();
                } else if (in != null) {
                    // The parser could not be created, so nothing else owns
                    // the stream yet.
                    in.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Copies the parser's current record into a new bean with a fresh id.
     *
     * @throws NumberFormatException if the gender field is present but is not
     *         an integer
     */
    private static RecordBean readRecord(CsvFileParser parser) throws IOException {
        RecordBean bean = new RecordBean();
        bean.setId(UUIDHex.getUUIDHex());
        bean.setEmail(parser.getByFieldName("email"));
        bean.setName(parser.getByFieldName("name"));
        bean.setGender(parseGender(parser.getByFieldName("gender")));
        bean.setCity(parser.getByFieldName("city"));
        bean.setCellphone(parser.getByFieldName("cellphone"));
        bean.setTelephone(parser.getByFieldName("telephone"));
        bean.setSalary(parser.getByFieldName("salary"));
        bean.setIsmarried(parser.getByFieldName("ismarried"));
        bean.setEducation(parser.getByFieldName("education"));
        bean.setIndustry(parser.getByFieldName("industry"));
        bean.setCorpname(parser.getByFieldName("corpname"));
        bean.setCorptype(parser.getByFieldName("corptype"));
        bean.setPosition(parser.getByFieldName("position"));
        return bean;
    }

    /** Returns 0 for a missing or blank value, otherwise the parsed integer. */
    private static int parseGender(String raw) {
        if (raw == null) {
            return 0;
        }
        String trimmed = raw.trim();
        return trimmed.length() == 0 ? 0 : Integer.parseInt(trimmed);
    }
}
import java.sql.SQLException;/**
* 保证一条线程最多只拥有一个数据库连接
* <p>
* 创建时间:2009-10-29 上午02:27:36
* @author 侯磊
* @since 1.0
*/
/**
 * Hands out JDBC connections so that one thread owns at most one connection.
 * The connection of the calling thread is kept in a {@link ThreadLocal} and is
 * re-created when it is missing or already closed.
 */
public class ConnectionFactory {
    private static final ThreadLocal<Connection> threadLocal = new ThreadLocal<Connection>();

    /**
     * Returns the calling thread's connection, creating it when necessary.
     *
     * @return an open connection, never {@code null}
     * @throws SQLException if the state of the cached connection cannot be
     *         checked or no connection could be created
     */
    public static Connection getConnection() throws SQLException {
        Connection session = threadLocal.get();
        if (session == null || session.isClosed()) {
            session = createConnection();
            if (session == null) {
                // Fail here with a checked exception instead of letting the
                // caller hit a NullPointerException on first use.
                threadLocal.remove();
                throw new SQLException(
                        "ConnectionFactory.createConnection() returned no connection");
            }
            threadLocal.set(session);
        }
        return session;
    }

    /**
     * Closes and forgets the calling thread's connection, if it has one.
     *
     * @throws SQLException if closing the connection fails
     */
    public static void closeConnection() throws SQLException {
        Connection session = threadLocal.get();
        // remove() rather than set(null), so no empty entry stays attached to
        // a long-lived pool thread.
        threadLocal.remove();
        if (session != null) {
            session.close();
        }
    }

    /**
     * Creates a new connection. This is a placeholder that must be filled in
     * for the target database; until then it returns {@code null}.
     */
    private static Connection createConnection() {
        return null;
    }
}
表一插入的操作package houlei.csdn.mutiThread.insert2DB;import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
/**
 * Task that inserts one record into the CUSTOMER table using the calling
 * thread's connection.
 * <p>
 * Created: 2009-10-29 02:14:21 AM
 *
 * @author 侯磊
 * @since 1.0
 */
public class TableCustomerRecorder implements Runnable {
    private static final String INSERT_SQL = "insert into CUSTOMER values(?,?,?,?,?,?)";

    private RecordBean bean;

    /**
     * @param bean record to insert
     */
    public TableCustomerRecorder(RecordBean bean) {
        this.bean = bean;
    }

    /**
     * Prepares the insert, binds the bean's values and executes it. Any
     * {@link SQLException} is printed, not propagated.
     * <p>
     * NOTE(review): the statement has six placeholders but only parameters 1-3
     * are bound, and the e-mail is bound twice. Confirm the column layout and
     * bind the remaining parameters.
     */
    public void run() {
        PreparedStatement insert = null;
        try {
            insert = ConnectionFactory.getConnection().prepareStatement(INSERT_SQL);
            insert.setString(1, bean.getId());
            insert.setString(2, bean.getEmail());
            insert.setString(3, bean.getEmail());
            insert.execute();
        } catch (SQLException failure) {
            failure.printStackTrace();
        } finally {
            if (insert != null) {
                try {
                    insert.close();
                } catch (SQLException failure) {
                    failure.printStackTrace();
                }
            }
        }
    }
}
表二的插入操作
package houlei.csdn.mutiThread.insert2DB;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;/**
*
* <p>
* 创建时间:2009-10-29 上午02:35:35
* @author 侯磊
* @since 1.0
*/
/**
 * Task that inserts one record into the CUSTOMER_INFO table using the calling
 * thread's connection.
 */
public class TableCustomerInfoRecorder implements Runnable {
    private RecordBean bean;

    /**
     * @param bean record to insert
     */
    public TableCustomerInfoRecorder(RecordBean bean) {
        this.bean = bean;
    }

    /**
     * Prepares the insert, binds the bean's values and executes it. Any
     * {@link SQLException} is printed, not propagated.
     * <p>
     * NOTE(review): the city is bound to both the second and the third column,
     * as in the original post -- confirm against the real table layout.
     */
    public void run() {
        PreparedStatement pstm = null;
        try {
            Connection conn = ConnectionFactory.getConnection();
            pstm = conn.prepareStatement("insert into CUSTOMER_INFO values(?,?,? )");
            // Exactly three placeholders exist; binding a fourth parameter
            // would make every insert fail with an SQLException.
            pstm.setString(1, bean.getId());
            pstm.setString(2, bean.getCity());
            pstm.setString(3, bean.getCity());
            pstm.execute();
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            try {
                if (pstm != null) {
                    pstm.close();
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}
入口类没啥好说的,随便写写就行。
/**
 * Entry point: reads the CSV file on the main thread and lets a fixed pool of
 * worker threads perform the database inserts.
 */
public class Main {

    /**
     * @param args optional; {@code args[0]} overrides the default CSV path
     */
    public static void main(String[] args) {
        String fileName = (args != null && args.length > 0) ? args[0] : "D:\\test.csv";
        ExecutorService pool = Executors.newFixedThreadPool(10, new ConnectionClosingThreadFactory());
        try {
            new FileReader(fileName, pool).run();
        } finally {
            // Always shut down: the pool's threads are not daemon threads and
            // would keep the JVM alive if the reader failed unexpectedly.
            pool.shutdown();
        }
    }

    /**
     * Creates worker threads that close their own thread-bound connection when
     * they terminate, so the connections opened through
     * {@code ConnectionFactory.getConnection()} are not left open.
     */
    private static final class ConnectionClosingThreadFactory
            implements java.util.concurrent.ThreadFactory {
        public Thread newThread(final Runnable worker) {
            return new Thread(new Runnable() {
                public void run() {
                    try {
                        worker.run();
                    } finally {
                        try {
                            ConnectionFactory.closeConnection();
                        } catch (java.sql.SQLException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
        }
    }
}
就是一条记录一个insert
并且我这数量是几十万的
这样的话
线程岂不是多死了?
当然,楼主如果觉得还是batch效率高,也可以改成批量更新这种形式。
多添两个计数器变量,注意线程安全。实现起来并不难。
楼主貌似没有理解我代码所表达意思,楼主不妨run一下我的代码试试,估计效率会更高。
线程数量嘛,一共创建了11个线程,算上JRE的线程数,总体不会超过30个的。
一个线程读取文件数据,十个线程进行数据库操作(其实有4~6个,就够了),应该没啥太大问题。
程序运行起来,你可以看看数据库的连接数量,应该和线程池的线程数量相同
(每个线程分配一个数据库连接)。