您的位置:首页 > 其它

Executors多线程的简单使用

2015-01-13 15:22 183 查看
package com.spidersite;

import java.sql.Connection;

import java.sql.DriverManager;

import java.util.ArrayList;

import java.util.Date;

import java.util.HashSet;

import java.util.Hashtable;

import java.util.List;

import java.util.Map;

import java.util.Set;

import java.util.concurrent.Callable;

import java.util.concurrent.ExecutionException;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.Future;

import java.util.concurrent.ScheduledExecutorService;

import java.util.concurrent.locks.Lock;

import java.util.concurrent.locks.ReentrantLock;

import org.apache.log4j.Logger;

import com.JdbcUtil;

import com.LoadConfig;

import com.Util;

import com.excel.ExportExcel;

public class GroupSite {

    

private Connection con = null;

    private static Logger log4j = Logger.getLogger(GroupSite.class);

    private Set<Integer> set = new HashSet<Integer>();

    private Hashtable<Integer, Object> hashTable = new Hashtable<Integer, Object>();

    private ScheduledExecutorService sc = Executors.newScheduledThreadPool(50);

      List<Future<String>> resultList = new ArrayList<Future<String>>();  

//    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

//    private final Lock readLock = lock.readLock();

//    private final Lock writeLock = lock.writeLock();

    private final Lock lock = new ReentrantLock();

    int conFail=0;

    int readLockCount=0;

    int writeLockCount=0;

    public  Connection getCon(){

//        int count=0;

//        Properties p = Util.getPro("sql.properties");

//        DbParams db = new DbParams(p.getProperty("driverClassName"), p.getProperty("url"), p.getProperty("username"), p.getProperty("password"));

//        try{

//            return JdbcUtil.getConnection(db);

//        }catch (Exception e) {

//            conFail++;

//            if(conFail<12){

//                return getCon();

//            }

//        }

        Connection conn = null;

        try {

//            System.out.println("获得conn---");

            conn = DriverManager.getConnection("proxool.connPool");

//            System.out.println("获得conn+++++++++");

            return conn;

        } catch (Exception e) {

            System.out.println("失败");

            log4j.error(e.getMessage());

        }

        if(conn==null){

            System.out.println("出错了 重拿-----------------------------------------------------------");

            return getCon();

        }

//        

        return null;

    }

    

    

    private int getSiteCount( String table){

        int count = 0;

        String sql=" select count(*) as count from "+table;

        con = getCon();

        List<Map<String,Object>> list = JdbcUtil.execSql(sql, getCon());

        if(list != null && list.size()>0){

            Map<String,Object> map = list.get(0);

            count = Integer.valueOf(map.get("count").toString());

        }

        return count;

    }

    

    public List<Map<String,Object>> getSite(){

        String sql=" select siteid,sitename,SITEURL as url from sysspidersite order by siteid desc";

        con =getCon();

        return JdbcUtil.execSql(sql, con);

    }

    

    

    private void group(int count){

        String sql="";

        int code=0;

        Set<Integer> codeSet = new HashSet<Integer>();

        List<Map<String,Object>> list = null;

        for (int i=0;i<=count; i=i+1000){

            con = getCon();

            sql = " select SITENAME as sitename,SITEID as siteid from sysspidersite order by siteid limit "+i+",1000";

            list = JdbcUtil.execSql(sql, con);

            if(list != null && list.size()>0){

                for (Map<String, Object> map : list) {

                    if(!Util.isEmpty(map.get("sitename"))){

                        String siteid = map.get("siteid").toString();

                        String siteName = map.get("sitename").toString();

                        String groupName = siteName;

                        if(siteName.trim().contains("-")){

                            groupName = siteName.split("-")[0];

                        }else if(siteName.trim().contains(" ")){

                            groupName = siteName.trim().split(" ")[0];

                        }else if(siteName.trim().contains("——")){

                            groupName = siteName.trim().split("——")[0];

                        }

                        code = groupName.trim().hashCode();

                        if(!codeSet.contains(code)){

                            codeSet.add(code);

                            System.out.println(groupName);

                            insertGroup(groupName,siteid);

                        }else{

                            update(groupName, siteid);

                        }

                    }

                }

            }

        }

    }

    

    public void insertGroup(String name,String siteid){

        String sql=" insert into sitegroup (`group_name`,`group_count`,`group_sites`)value(?,?,?)";

//        con = getCon();

        JdbcUtil.execUpdate(sql, new String[]{name,"1",siteid}, getCon());

    }

    

    private void update(String name,String siteid){

        String sql=" update sitegroup set group_sites=CONCAT(group_sites,?) , group_count=group_count+1 where group_name=?";

//        con = getCon();

        JdbcUtil.update(sql, new String[]{","+siteid,name}, getCon());

    }

    

    public void init(){

        int count = getSiteCount("sysspidersite");

        group(count);

    }

    

    public void inits(){

        new ExportExcel().exportExcel(getSite());

    }

    

    private void thiredGroup(final int start,final String type,final int pageCount){

        

        

        Runnable runs = new Runnable() {

            

            @Override

            public void run()   {

                Long time = new Date().getTime();

                        String sql="select SITENAME as sitename,SITEID as siteid from sysspidersite order by siteid limit "+start+","+pageCount;

                        List<Map<String,Object>> list = JdbcUtil.execSql(sql, getCon());

                        if(list != null && list.size()>0){

                            for (Map<String, Object> map : list) {

                                if(!Util.isEmpty(map.get("sitename"))){

                                    String siteid = map.get("siteid").toString();

                                    String siteName = map.get("sitename").toString();

                                    String groupName = siteName;

                                    if(siteName.trim().contains("-")){

                                        groupName = siteName.split("-")[0];

                                    }else if(siteName.trim().contains(" ")){

                                        groupName = siteName.trim().split(" ")[0];

                                    }else if(siteName.trim().contains("——")){

                                        groupName = siteName.trim().split("——")[0];

                                    }

                                        groupName = groupName.trim();

                                        int code =0;

                                        code = groupName.hashCode();

//                                        lock.lock();

//                                        readLockCount++;

                                        try{

                                        boolean trunth=false;

                                        trunth = hashTable.containsKey(co
ca9e
de);

                                        if(!trunth){

                                            

                                            writeLockCount++;

                                            hashTable.put(code, 1);

                                            System.out.println("type:"+type+"insert:"+groupName+" "+siteid);

                                            insertGroup(groupName,siteid);

                                        }else{

                                            System.out.println("type:"+type+"update:"+groupName+" "+siteid);

                                            update(groupName, siteid);

                                        }

                                        }finally{

                                            readLockCount--;

//                                            lock.unlock();

                                        }

                                        System.err.println("lock:"+readLockCount);

                                }

                            }

                        }

                System.err.println("该线程用时:"+(new Date().getTime()-time));    

                sc.shutdown();

            }

        };

            

        sc.execute(runs);

//        sc.shutdown();

    }

    

    

    public void runTask(){

        int counts = getSiteCount("sysspidersite");

        int t=1000;

           for(int i=0;i<counts;i=i+t){

               thiredGroup(i, "thired"+(i/t), t);

           }

    }

    

    public void callTask(){

        

        int counts = getSiteCount("sysspidersite");

        int t=1000;

           for(int i=0;i<counts;i=i+t){

//             System.out.println(i);

            Future<String> ft =    sc.submit(this.new ThirdTask(i, t));

                resultList.add(ft);

           }

           for (Future<String> fs : resultList) {

            try {

                    System.out.println(fs.get());     //打印各个线程(任务)执行的结果

            } catch (InterruptedException e) {

                    e.printStackTrace();

            } catch (ExecutionException e) {

                    e.printStackTrace();

            } finally {

                    //启动一次顺序关闭,执行以前提交的任务,但不接受新任务。如果已经关闭,则调用没有其他作用。

                    sc.shutdown();

            }

    }

        

    }

    

    

    class ThirdTask implements Callable<String>{

        private int start;

        private int limit;

        public ThirdTask(int start,int limit){

            this.start=start;

            this.limit=limit;

        }

        @Override

        public String call() throws Exception {

            Long time = new Date().getTime();

                    String sql="select SITENAME as sitename,SITEID as siteid from sysspidersite order by siteid limit "+start+","+limit;

                    List<Map<String,Object>> list = JdbcUtil.execSql(sql, getCon());

                    if(list != null && list.size()>0){

                        for (Map<String, Object> map : list) {

                            if(!Util.isEmpty(map.get("sitename"))){

                                String siteid = map.get("siteid").toString();

                                String siteName = map.get("sitename").toString();

                                String groupName = siteName;

                                if(siteName.trim().contains("-")){

                                    groupName = siteName.split("-")[0];

                                }else if(siteName.trim().contains(" ")){

                                    groupName = siteName.trim().split(" ")[0];

                                }else if(siteName.trim().contains("——")){

                                    groupName = siteName.trim().split("——")[0];

                                }

                                    groupName = groupName.trim();

                                    int code =0;

                                    code = groupName.hashCode();

//                                    lock.lock();

//                                    readLockCount++;

                                    try{

                                    boolean trunth=false;

                                    trunth = hashTable.containsKey(code);

                                    if(!trunth){

                                        

                                        writeLockCount++;

                                        hashTable.put(code, 1);

                                        System.out.println("type:third"+(start/limit)+"insert:"+groupName+" "+siteid);

                                        insertGroup(groupName,siteid);

                                    }else{

                                        System.out.println("type:third"+(start/limit)+"update:"+groupName+" "+siteid);

                                        update(groupName, siteid);

                                    }

                                    }finally{

                                        readLockCount--;

//                                        lock.unlock();

                                    }

//                                    System.err.println("lock:"+readLockCount);

                            }

                        }

                    }

            

            return "测试线程:"+start/limit+" "+Thread.currentThread().getName()+" 该线程用时:"+(new Date().getTime()-time);

        }

        

    }

    

    public static void main(String[] args) {

         LoadConfig.init();

         new GroupSite().runTask();

    }
}

//注意:多线程下千万不要只用一个Connection成员变量 最好在线程代码中实例(今天吃了大亏 一直随机报错找了半天)

另外使用Callable +内部类特别好用有返回值的线程就是好 就是因为用了它才找到错误的。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: