共有10个线程读取csv文件
线程1读取1-20000
    2读取20001-40000

共有10个线程
10个parser
应注意哪些东西
下面这个没用到多线程的隔离等代码
高手给我点播下巴 
public class DealThread implements Runnable { private static Connection conn = ConnectionFactory.getConnection();
private int start;
private int end;
private CsvFileParser parser; // private
public DealThread(int s, int e, CsvFileParser p) {
this.start = s;
this.end = e;
this.parser = p;
} public void run() {
System.out.println(" start: " + start + " end: " + end);
PreparedStatement pstmt1 = null;
PreparedStatement pstmt2 = null;
PreparedStatement pstmt3 = null;
try {
try {
pstmt1 = conn
.prepareStatement("insert into CUSTOMER values(?,?,?,?,?,?)");
pstmt2 = conn
.prepareStatement("insert into CUSTOMER_INFO values(?,?,? )");
 
for (int i = start; i <= end; i++) {
try {
String id = UuidHex.getUuidHex();
String email = parser.getByFieldName("email");
String name = parser.getByFieldName("name");
String gender = parser.getByFieldName("gender");
int g = 0;
if (null != gender) {
g = Integer.parseInt(gender);
}
String city = parser.getByFieldName("city");
String cellphone = parser.getByFieldName("cellphone");
String telephone = parser.getByFieldName("telephone");
String salary = parser.getByFieldName("salary");
String ismarried = parser.getByFieldName("ismarried");
 
String education = parser.getByFieldName("education");
String industry = parser.getByFieldName("industry");
String corpname = parser.getByFieldName("corpname");
String corptype = parser.getByFieldName("corptype");
String position = parser.getByFieldName("position");
pstmt1.setString(1, id);
pstmt1.setString(2, email);
pstmt1.setString(3, email);
 
pstmt1.addBatch(); String customerInfoId = UuidHex.getUuidHex();
pstmt2.setString(1, customerInfoId);
pstmt2.setString(2, city);
pstmt2.setString(3, city);
pstmt2.setString(4, city);
pstmt2.addBatch();  
if (i != 0 && i % 5000 == 0) {
System.out.println(i);
pstmt1.executeBatch();
pstmt2.executeBatch();
 
conn.commit();
}
if (!parser.hasMore()) {
break;
}
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
continue;
}
}
pstmt1.executeBatch();
pstmt2.executeBatch();
 
conn.commit();
} finally {
parser.close();
if (pstmt1 != null) {
pstmt1.close();
}
if (pstmt2 != null) {
pstmt2.close();
}
if (conn != null) {
conn.close();
}
}
} catch (NumberFormatException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (SQLException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}

解决方案 »

  1.   


    我要是楼主,就一个线程都文件,3~5个线程(没个线程对应一个数据库链接)向数据库中插入数据。
    因为,我觉得,录入速度慢的瓶颈,应该是insert语句执行的效率较低造成的。
    当然,如果,多线程读取文件并同时录入读取的信息,这也是一个好办法。
    但是,我查了一下CsvFileParser类,网上的代码并没有对文件进行拆分读取、解析这种功能。
    所以,貌似楼主的代码,可能是10个线程干读了10遍同样的文件,而且是从头到尾的。所以,推荐楼主编写自己的Parser类,用于针对文件的随机访问解析(即无论文件的哪个部分它都能解析)。
    然后,多线程,没个线程解析文件的相应部分,就可以了。当然,我还是推荐多线程插数据。
      

  2.   


    因为是静态的connection,所以共享一个conn
    -----------------------------------
    因为我运行过parser
    知道了读取效率
    并且速度没有想象中慢
    所以用10个parser来读取
    所以没必要自己写parser(我也不会写)因为数据存数据库需要很长时间
    所以由10个线程同时进行批量操作
    也就是主要是多线程插入数据
    我上面代码应该是多线程插数据吧?
    对底层东西不熟
    不知道哪里出了问题了
    preparedstatement设成static会有问题吗?
      

  3.   

    不好意思,看错了
    难道我上面的错误就是因为connection是static
    所以下面的preparedstatement不能执行了
    我去试试
      

  4.   


    是不是静态的有什么关系啊?
    你在其它地方还用到了这个connection?
      

  5.   

    我每个线程都new一个新的connection出来
    然后10个线程独立进行操作
    但是进行后发现2个表里面数据并不是同步的
    就像A表里面有40000条数据的时候B表中只有20000条
    为什么不是2个表数据不是同步的?
    还有就是managerdriver.getconnection这个方式被synchronized了
    然后我用conn和preparedstatement的时候都是各线程相互独立的,没冲突的吧?
      

  6.   

    文件的记录,应该有对应的封装类package houlei.csdn.mutiThread.insert2DB;/**
     * 
     * <p>
     * 创建时间:2009-10-29 上午01:40:20
     * 
     * @author 侯磊
     * @since 1.0
     */
    public class RecordBean {

    private String id; private String email; private String name; private int gender; private String city; private String cellphone; private String telephone; private String salary; private String ismarried; private String education; private String industry; private String corpname; private String corptype; private String position; public String getCellphone() {
    return cellphone;
    } public void setCellphone(String cellphone) {
    this.cellphone = cellphone;
    } public String getCity() {
    return city;
    } public void setCity(String city) {
    this.city = city;
    } public String getCorpname() {
    return corpname;
    } public void setCorpname(String corpname) {
    this.corpname = corpname;
    } public String getCorptype() {
    return corptype;
    } public void setCorptype(String corptype) {
    this.corptype = corptype;
    } public String getEducation() {
    return education;
    } public void setEducation(String education) {
    this.education = education;
    } public String getEmail() {
    return email;
    } public void setEmail(String email) {
    this.email = email;
    } public int getGender() {
    return gender;
    } public void setGender(int gender) {
    this.gender = gender;
    } public String getId() {
    return id;
    } public void setId(String id) {
    this.id = id;
    } public String getIndustry() {
    return industry;
    } public void setIndustry(String industry) {
    this.industry = industry;
    } public String getIsmarried() {
    return ismarried;
    } public void setIsmarried(String ismarried) {
    this.ismarried = ismarried;
    } public String getName() {
    return name;
    } public void setName(String name) {
    this.name = name;
    } public String getPosition() {
    return position;
    } public void setPosition(String position) {
    this.position = position;
    } public String getSalary() {
    return salary;
    } public void setSalary(String salary) {
    this.salary = salary;
    } public String getTelephone() {
    return telephone;
    } public void setTelephone(String telephone) {
    this.telephone = telephone;
    }}读取文件内容,生成bean对象,数据库操作使用线程池来操作package houlei.csdn.mutiThread.insert2DB;import java.io.FileInputStream;
    import java.io.FileNotFoundException;
    import java.io.IOException;
    import java.util.concurrent.Executor;/**
     * 用于读取文件。
     * <p>
     * 创建时间:2009-10-29 上午01:44:24
     * @author 侯磊
     * @since 1.0
     */
    public class FileReader implements Runnable {
    private String fileName = "";
    private Executor pool;

    public FileReader(String fileName, Executor pool) {
    super();
    this.fileName = fileName;
    this.pool = pool;
    } public void run() {
    CsvFileParser parser = null;
    if(pool==null || fileName==null)return;
    try {
    parser = new CsvFileParser(new FileInputStream(fileName));
    while(parser.hasMore()){
    RecordBean bean = new RecordBean();
    String id = UUIDHex.getUUIDHex();
                    String email = parser.getByFieldName("email");
                    String name = parser.getByFieldName("name");
                    String gender = parser.getByFieldName("gender");
                    int g = 0;
                    if (null != gender) {
                        g = Integer.parseInt(gender);
                    }
                    String city = parser.getByFieldName("city");
                    String cellphone = parser.getByFieldName("cellphone");
                    String telephone = parser.getByFieldName("telephone");
                    String salary = parser.getByFieldName("salary");
                    String ismarried = parser.getByFieldName("ismarried");
                    String education = parser.getByFieldName("education");
                    String industry = parser.getByFieldName("industry");
                    String corpname = parser.getByFieldName("corpname");
                    String corptype = parser.getByFieldName("corptype");
                    String position = parser.getByFieldName("position");
                    bean.setId(id);
                    bean.setEmail(email);
                    bean.setName(name);
                    bean.setGender(g);
                    bean.setCity(city);
                    bean.setCellphone(cellphone);
                    bean.setTelephone(telephone);
                    bean.setSalary(salary);
                    bean.setIsmarried(ismarried);
                    bean.setEducation(education);
                    bean.setIndustry(industry);
                    bean.setCorpname(corpname);
                    bean.setCorptype(corptype);
                    bean.setPosition(position);
                    pool.execute(new TableCustomerRecorder(bean));
                    pool.execute(new TableCustomerInfoRecorder(bean));
    }
    } catch (FileNotFoundException e) {
    e.printStackTrace();
    } catch (IOException e) {
    e.printStackTrace();
    } finally{
    try {
    if(parser!=null)parser.close();
    } catch (IOException e) {
    e.printStackTrace();
    }
    }
    }}
      

  7.   

    数据库连接的工厂类,由于使用线程池,连接个数应该有所控制才是。package houlei.csdn.mutiThread.insert2DB;import java.sql.Connection;
    import java.sql.SQLException;/**
     * 保证一条线程最多只拥有一个数据库连接
     * <p>
     * 创建时间:2009-10-29 上午02:27:36
     * @author 侯磊
     * @since 1.0
     */
    public class ConnectionFactory {
    private static final ThreadLocal<Connection> threadLocal = new ThreadLocal<Connection>();
    public static Connection getConnection() throws SQLException {
    Connection session = (Connection) threadLocal.get(); if (session == null || session.isClosed()) {
    session = createConnection();
    threadLocal.set(session);
    }        return session;
        }

    public static void closeConnection() throws SQLException{
    Connection session = (Connection) threadLocal.get();
            threadLocal.set(null);
            if (session != null) {
                session.close();
            }
        }

    /**
     * 要根据情况来填写
     */
    private static Connection createConnection() {
    return null;
    }
    }
    表一插入的操作package houlei.csdn.mutiThread.insert2DB;import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    /**
     *
     * <p>
     * 创建时间:2009-10-29 上午02:14:21
     * @author 侯磊
     * @since 1.0
     */
    public class TableCustomerRecorder implements Runnable {
    private RecordBean bean; public TableCustomerRecorder(RecordBean bean) {
    super();
    this.bean = bean;
    } public void run() {
    PreparedStatement pstm = null;
    try {
    Connection conn = ConnectionFactory.getConnection();
    pstm = conn.prepareStatement("insert into CUSTOMER values(?,?,?,?,?,?)");
    pstm.setString(1, bean.getId());
                pstm.setString(2, bean.getEmail());
                pstm.setString(3, bean.getEmail());
                pstm.execute();
    } catch (SQLException e) {
    e.printStackTrace();
    } finally{
    try {
    if(pstm!=null) pstm.close();
    } catch (SQLException e) {
    e.printStackTrace();
    }
    }
    }

    }表二的插入操作package houlei.csdn.mutiThread.insert2DB;import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;/**
     *
     * <p>
     * 创建时间:2009-10-29 上午02:35:35
     * @author 侯磊
     * @since 1.0
     */
    public class TableCustomerInfoRecorder implements Runnable {
    private RecordBean bean; public TableCustomerInfoRecorder(RecordBean bean) {
    super();
    this.bean = bean;
    } public void run() {
    PreparedStatement pstm = null;
    try {
    Connection conn = ConnectionFactory.getConnection();
    pstm = conn.prepareStatement("insert into CUSTOMER_INFO values(?,?,? )");
    pstm.setString(1, bean.getId());
                pstm.setString(2, bean.getCity());
                pstm.setString(3, bean.getCity());
                pstm.setString(4, bean.getCity());
                pstm.execute();
    } catch (SQLException e) {
    e.printStackTrace();
    } finally{
    try {
    if(pstm!=null) pstm.close();
    } catch (SQLException e) {
    e.printStackTrace();
    }
    }
    }}
    入口类没啥好说的,随便写写就行。public class Main { public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(10);
    String fileName ="D:\\test.csv";
    new FileReader(fileName,pool).run();
    pool.shutdown();
    }}
      

  8.   

    你那个jdbc没批量吧?
    就是一条记录一个insert
    并且我这数量是几十万的
    这样的话
    线程岂不是多死了?
      

  9.   

    JDBC没有使用batch进行批量更新,但是,PrepareStatement的效率也不会太差。
    当然,楼主如果觉得还是batch效率高,也可以改成批量更新这种形式。
    多添两个计数器变量,注意线程安全。实现起来并不难。
    楼主貌似没有理解我代码所表达意思,楼主不妨run一下我的代码试试,估计效率会更高。
    线程数量嘛,一共创建了11个线程,算上JRE的线程数,总体不会超过30个的。
    一个线程读取文件数据,十个线程进行数据库操作(其实有4~6个,就够了),应该没啥太大问题。
    程序运行起来,你可以看看数据库的连接数量,应该和线程池的线程数量相同
    (每个线程分配一个数据库连接)。