一、线程
1、什么是线程
线程 (thread) 是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际 运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线 程并行执行不同的任务。
2、如何创建线程
2.1、java 中创建线程
/** * 继承thread类,重写run方法 */classmythreadextendsthread{ @override publicvoidrun(){ system.out.println(mythread...+thread.currentthread().getname());}}/** * 实现runnable接口,实现run方法 */classmyrunnableimplementsrunnable{ @override publicvoidrun(){ system.out.println(myrunnable...+thread.currentthread().getname());}}/** * 实现callable接口,指定返回类型,实现call方法 */classmycallableimplementscallable { @override publicstringcall()throwsexception{ returnmycallable...+thread.currentthread().getname();}}
2.2、测试一下
publicstaticvoidmain(string[] args)throwsexception{ mythread thread =newmythread(); thread.run();//mythread...main thread.start();//mythread...thread-0 myrunnable myrunnable =newmyrunnable(); thread thread1 =newthread(myrunnable); myrunnable.run();//myrunnable...main thread1.start();//myrunnable...thread-1 mycallable mycallable =newmycallable(); futuretask futuretask =newfuturetask(mycallable); thread thread2 =newthread(futuretask); thread2.start(); system.out.println(mycallable.call());//mycallable...main system.out.println(futuretask.get());//mycallable...thread-2}2.3、问题
既然我们创建了线程,那为何我们直接调用方法和我们调用 start () 方法的结果不同?new thread () 是否真实创建了线程?
2.4、问题分析
我们直接调用方法,可以看到是执行的主线程,而调用 start () 方法就是开启了新线程,那说明 new thread () 并没有创建线程,而是在 start () 中创建了线程。 那我们看下 thread 类 start () 方法:
classthreadimplementsrunnable{//thread类实现了runnalbe接口,实现了run()方法 privaterunnable target; publicsynchronizedvoidstart(){ ... boolean started =false; try{ start0();//可以看到,start()方法真实的调用时start0()方法 started =true; }finally{ ... } } privatenativevoidstart0();//start0()是一个native方法,由jvm调用底层操作系统,开启一个线程,由操作系统过统一调度 @override publicvoidrun(){ if(target !=null){ target.run();//操作系统在执行新开启的线程时,回调runnable接口的run()方法,执行我们预设的线程任务 } } }2.5、总结
1. java 不能直接创建线程执行任务,而是通过创建 thread 对象调用操作系统开启线程,在由操作系 统回调 runnable 接口的 run () 方法执行任务;
2. 实现 runnable 的方式,将线程实际要执行的回调任务单独提出来了,实现线程的启动与回调任务 解耦;
3. 实现 callable 的方式,通过 future 模式不但将线程的启动与回调任务解耦,而且可以在执行完成后 获取到执行的结果;
二、多线程
1、什么是多线程
多线程 (multithreading),是指从软件或者硬件上实现多个线程并发执行的技术。同一个线程只 能处理完一个任务在处理下一个任务,有时我们需要多个任务同时处理,这时,我们就需要创建多 个线程来同时处理任务。
2、多线程有什么好处
2.1、串行处理
publicstaticvoidmain(string[] args)throwsexception{ system.out.println(start...); long start =system.currenttimemillis(); for(int i =0; i <5; i++){ thread.sleep(2000);//每个任务执行2秒 system.out.println(task done...);//处理执行结果 } long end =system.currenttimemillis(); system.out.println(end...,time = +(end - start));}//执行结果start...task done...task done...task done...task done...task done... end...,time =10043
2.2、并行处理
publicstaticvoidmain(string[] args)throwsexception{ system.out.println(start...); long start =system.currenttimemillis(); list list =newarraylist(); for(int i =0; i { try{ system.out.println(future.get());//处理执行结果 } catch (exception e) { } }); long end =system.currenttimemillis(); system.out.println(end...,time = +(end - start));} //执行结果 start... task done... task done... task done... task done... task done... end...,time =20052.3、总结
1. 多线程可以把一个任务拆分为几个子任务,多个子任务可以并发执行,每一个子任务就是一个线程。
2. 多线程是为了同步完成多项任务,不是为了提高运行效率,而是为了提高资源使用效率来提高系统 的效率。
2.4、多线程的问题
上面示例中我们可以看到,如果每来一个任务,我们就创建一个线程,有很多任务的情况下,我们 会创建大量的线程,可能会导致系统资源的耗尽。同时,我们知道线程的执行是需要抢占 cpu 资源 的,那如果有太多的线程,就会导致大量时间用在线程切换的开销上。 再有,每来一个任务都需要创建一个线程,而创建一个线程需要调用操作系统底层方法,开销较 大,而线程执行完成后就被回收了。在需要大量线程的时候,创建线程的时间就花费不少了。
三、线程池
1、如何设计一个线程池
由于多线程的开发存在上述的一些问题,那我们是否可以设计一个东西来避免这些问题呢?当然可以!线程池就是为了解决这些问题而生的。那我们该如何设计一个线程池来解决这些问题呢?或者说,一个线程池该具备什么样的功能?
1.1、线程池基本功能
1. 多线程会创建大量的线程耗尽资源,那线程池应该对线程数量有所限制,可以保证不会耗尽系统资 源; 2. 每次创建新的线程会增加创建时的开销,那线程池应该减少线程的创建,尽量复用已创建好的线 程;
1.2、线程池面临问题
1. 我们知道线程在执行完自己的任务后就会被回收,那我们如何复用线程? 2. 我们指定了线程的最大数量,当任务数超出线程数时,我们该如何处理?
1.3、创新源于生活
先假设一个场景:假设我们是一个物流公司的管理人员,要配送的货物就是我们的任务,货车就是 我们配送工具,我们当然不能有多少货物就准备多少货车。那当顾客源源不断的将货物交给我们配 送,我们该如何管理才能让公司经营的最好呢? 1. 最开始货物来的时候,我们还没有货车,每批要运输的货物我们都要购买一辆车来运输; 2. 当货车运输完成后,暂时还没有下一批货物到达,那货车就在仓库停着,等有货物来了立马就可以 运输; 3. 当我们有了一定数量的车后,我们认为已经够用了,那后面就不再买车了,这时要是由新的货物来 了,我们就会让货物先放仓库,等有车回来在配送; 4. 当 618 大促来袭,要配送的货物太多,车都在路上,仓库也都放满了,那怎么办呢?我们就选择临 时租一些车来帮忙配送,提高配送的效率; 5. 但是货物还是太多,我们增加了临时的货车,依旧配送不过来,那这时我们就没办法了,只能让发 货的客户排队等候或者干脆不接受了; 6. 大促圆满完成后,累计的货物已经配送完成了,为了降低成本,我们就将临时租的车都还了;
1.4、技术源于创新
基于上述场景,物流公司就是我们的线程池、货物就是我们的线程任务、货车就是我们的线程。我 们如何设计公司的管理货车的流程,就应该如何设计线程池管理线程的流程。 1. 当任务进来我们还没有线程时,我们就该创建线程执行任务; 2. 当线程任务执行完成后,线程不释放,等着下一个任务进来后接着执行; 3. 当创建的线程数量达到一定量后,新来的任务我们存起来等待空闲线程执行,这就要求线程池有个 存任务的容器; 4. 当容器存满后,我们需要增加一些临时的线程来提高处理效率; 5. 当增加临时线程后依旧处理不了的任务,那就应该将此任务拒绝; 6. 当所有任务执行完成后,就应该将临时的线程释放掉,以免增加不必要的开销;
2、线程池具体分析
上文中,我们讲了该如何设计一个线程池,下面我们看看大神是如何设计的;
2.1、 java 中的线程池是如何设计的
2.1.1、 线程池设计
看下线程池中的属性,了解线程池的设计。
publicclassthreadpoolexecutorextendsabstractexecutorservice{ //线程池的打包控制状态,用高3位来表示线程池的运行状态,低29位来表示线程池中工作线程的数量 privatefinalatomicinteger ctl =newatomicinteger(ctlof(running,0)); //值为29,用来表示偏移量 privatestaticfinalint count_bits =integer.size -3; //线程池的最大容量 privatestaticfinalint capacity =(1<< count_bits)-1; //线程池的运行状态,总共有5个状态,用高3位来表示 privatestaticfinalint running =-1<< count_bits;//接受新任务并处理阻塞队列中的任务 privatestaticfinalint shutdown =0<< count_bits;//不接受新任务但会处理阻塞队列中的任务 privatestaticfinalint stop =1<< count_bits;//不会接受新任务,也不会处理阻塞队列中的任务,并且中断正在运行的任务 privatestaticfinalint tidying =2<< count_bits;//所有任务都已终止, 工作线程数量为0,即将要执行terminated()钩子方法 privatestaticfinalint terminated =3<< count_bits;// terminated()方法已经执行结束 //任务缓存队列,用来存放等待执行的任务 privatefinalblockingqueue workqueue; //全局锁,对线程池状态等属性修改时需要使用这个锁 privatefinalreentrantlock mainlock =newreentrantlock(); //线程池中工作线程的集合,访问和修改需要持有全局锁 privatefinalhashset workers =newhashset(); // 终止条件 privatefinalcondition termination = mainlock.newcondition(); //线程池中曾经出现过的最大线程数 privateint largestpoolsize; //已完成任务的数量 privatelong completedtaskcount; //线程工厂 privatevolatilethreadfactory threadfactory; //任务拒绝策略 privatevolatilerejectedexecutionhandler handler; //线程存活时间 privatevolatilelong keepalivetime; //是否允许核心线程超时 privatevolatileboolean allowcorethreadtimeout; //核心池大小,若allowcorethreadtimeout被设置,核心线程全部空闲超时被回收的情况下会为0 privatevolatileint corepoolsize; //最大池大小,不得超过capacity privatevolatileint maximumpoolsize; //默认的任务拒绝策略 privatestaticfinalrejectedexecutionhandler defaulthandler =newabortpolicy(); //运行权限相关 privatestaticfinalruntimepermission shutdownperm = newruntimepermission(modifythread); ... }小结一下:以上线程池的设计可以看出,线程池的功能还是很完善的。 1. 提供了线程创建、数量及存活时间等的管理; 2. 提供了线程池状态流转的管理; 3. 提供了任务缓存的各种容器; 4. 提供了多余任务的处理机制; 5. 提供了简单的统计功能;
2.1.2、线程池构造函数
//构造函数 publicthreadpoolexecutor(int corepoolsize,//核心线程数 int maximumpoolsize,//最大允许线程数 long keepalivetime,//线程存活时间 timeunit unit,//存活时间单位 blockingqueue workqueue,//任务缓存队列 threadfactory threadfactory,//线程工厂 rejectedexecutionhandler handler){//拒绝策略 if(corepoolsize <0|| maximumpoolsize <=0|| maximumpoolsize < corepoolsize || keepalivetime <0) thrownewillegalargumentexception(); if(workqueue ==null|| threadfactory ==null|| handler ==null) thrownewnullpointerexception(); this.corepoolsize = corepoolsize; this.maximumpoolsize = maximumpoolsize; this.workqueue = workqueue; this.keepalivetime = unit.tonanos(keepalivetime); this.threadfactory = threadfactory; this.handler = handler;}小结一下: 1. 构造函数告诉了我们可以怎样去适用线程池,线程池的哪些特性是我们可以控制的;
2.1.3、线程池执行
2.1.3.1、提交任务方法
• public void execute(runnable command);
• future submit(runnable task);
• future submit(runnable task, t result);
• future submit(callable task);
publicfuture submit(runnable task){ if(task ==null)thrownewnullpointerexception(); runnablefuture ftask =newtaskfor(task,null); execute(ftask); return ftask;}
可以看到 submit 方法的底层调用的也是 execute 方法,所以我们这里只分析 execute 方法;
public void execute(runnable command) { if (command == null) throw new nullpointerexception(); int c = ctl.get(); //第一步:创建核心线程 if (workercountof(c) =shutdown && (rs != shutdown || firsttask != null || workqueue.isempty()) //线程池已关闭,并且无需执行缓存队列中的任务,则不创建 if(rs >= shutdown && !(rs == shutdown && firsttask ==null&& ! workqueue.isempty())) returnfalse; for(;;){ int wc =workercountof(c); if(wc >= capacity || wc >=(core ? corepoolsize : maximumpoolsize)) returnfalse; if(compareandincrementworkercount(c))//cas增加线程数 break retry; c = ctl.get();// re-read ctl if(runstateof(c)!= rs) continue retry; // else cas failed due to workercount change; retry inner loop } } //上面的流程走完,就可以真实开始创建线程了 boolean workerstarted =false; boolean workeradded =false; worker w =null; try{ w =newworker(firsttask);//这里创建了线程 finalthread t = w.thread; if(t !=null){ finalreentrantlock mainlock =this.mainlock; mainlock.lock(); try{ // recheck while holding lock. // back out on threadfactory failure or if // shut down before lock acquired. int rs =runstateof(ctl.get()); if(rs largestpoolsize) largestpoolsize = s; workeradded =true; } }finally{ mainlock.unlock(); } if(workeradded){ t.start();//添加成功,启动线程 workerstarted =true; } } }finally{ if(! workerstarted) addworkerfailed(w);//添加线程失败操作 } return workerstarted; }小结:addworker () 方法主要功能;
1. 增加线程数;
2. 创建线程 worker 实例加入线程池;
3. 加入完成开启线程;
4. 启动失败则回滚增加流程;
2.1.3.3、工作线程的实现privatefinalclassworker//worker类是threadpoolexecutor的内部类 extendsabstractqueuedsynchronizer implementsrunnable { finalthread thread;//持有实际线程 runnable firsttask;//worker所对应的第一个任务,可能为空 volatilelong completedtasks;//记录执行任务数 worker(runnable firsttask){ setstate(-1);// inhibit interrupts until runworker this.firsttask = firsttask; this.thread =getthreadfactory().newthread(this); } publicvoidrun(){ runworker(this);//当前线程调用threadpoolexecutor中的runworker方法,在这里实现的线程复用 } ...继承aqs,实现了不可重入锁... }
小结:工作线程 worker 类主要功能;
1. 此类持有一个工作线程,不断处理拿到的新任务,持有的线程即为可复用的线程;
2. 此类可看作一个适配类,在 run () 方法中真实调用 runworker () 方法不断获取新任务,完成线程复用;
2.1.3.4、线程的复用
finalvoidrunworker(worker w){//threadpoolexecutor中的runworker方法,在这里实现的线程复用 thread wt =thread.currentthread(); runnable task = w.firsttask; w.firsttask =null; w.unlock();// allow interrupts boolean completedabruptly =true;//标识线程是否异常终止 try{ while(task !=null||(task =gettask())!=null){//这里会不断从任务队列获取任务并执行 w.lock(); //线程是否需要中断 if((runstateatleast(ctl.get(), stop)|| (thread.interrupted()&& runstateatleast(ctl.get(), stop)))&& !wt.isinterrupted()) wt.interrupt(); try{ beforeexecute(wt, task);//执行任务前的hook方法,可自定义 throwable thrown =null; try{ task.run();//执行实际的任务 }catch(runtimeexception x){ thrown = x;throw x; }catch(error x){ thrown = x;throw x; }catch(throwable x){ thrown = x;thrownewerror(x); }finally{ afterexecute(task, thrown);//执行任务后的hook方法,可自定义 } }finally{ task =null;//执行完成后,将当前线程中的任务制空,准备执行下一个任务 w.completedtasks++; w.unlock(); } } completedabruptly =false; }finally{ processworkerexit(w, completedabruptly);//线程执行完成后的清理工作 } }小结:runworker () 方法主要功能;
1. 循环从缓存队列中获取新的任务,直到没有任务为止;
2. 使用 worker 持有的线程真实执行任务;
3. 任务都执行完成后的清理工作;
2.1.3.5、队列中获取待执行任务privaterunnablegettask(){ boolean timedout =false;//标识当前线程是否超时未能获取到task对象 for(;;){ int c = ctl.get(); int rs =runstateof(c); // check if queue empty only if necessary. if(rs >= shutdown &&(rs >= stop || workqueue.isempty())){ decrementworkercount(); returnnull; } int wc =workercountof(c); // are workers subject to culling? boolean timed = allowcorethreadtimeout || wc > corepoolsize; if((wc > maximumpoolsize ||(timed && timedout)) &&(wc >1|| workqueue.isempty())){ if(compareanddecrementworkercount(c))//若线程存活时间超时,则cas减去线程数量 returnnull; continue; } try{ runnable r = timed ? workqueue.poll(keepalivetime,timeunit.nanoseconds)://允许超时回收则阻塞等待 workqueue.take();//不允许则直接获取,没有就返回null if(r !=null) return r; timedout =true; }catch(interruptedexception retry){ timedout =false; } } }小结:gettask () 方法主要功能;
1. 实际在缓存队列中获取待执行的任务;
2. 在这里管理线程是否要阻塞等待,控制线程的数量;
2.1.3.6、清理工作privatevoidprocessworkerexit(worker w,boolean completedabruptly){ if(completedabruptly)// if abrupt, then workercount wasn't adjusted decrementworkercount(); finalreentrantlock mainlock =this.mainlock; mainlock.lock(); try{ completedtaskcount += w.completedtasks; workers.remove(w);//移除执行完成的线程 }finally{ mainlock.unlock(); } tryterminate();//每次回收完一个线程后都尝试终止线程池 int c = ctl.get(); if(runstatelessthan(c, stop)){//到这里说明线程池没有终止 if(!completedabruptly){ int min = allowcorethreadtimeout ?0: corepoolsize; if(min ==0&&! workqueue.isempty()) min =1; if(workercountof(c)>= min) return;// replacement not needed } addworker(null,false);//异常终止线程的话,需要在常见一个线程 } }
小结:processworkerexit () 方法主要功能;
1. 真实完成线程池线程的回收;
2. 调用尝试终止线程池;
3. 保证线程池正常运行;
2.1.3.7、尝试终止线程池
finalvoidtryterminate(){ for(;;){ int c = ctl.get(); //若线程池正在执行、线程池已终止、线程池还需要执行缓存队列中的任务时,返回 if(isrunning(c)|| runstateatleast(c, tidying)|| (runstateof(c)== shutdown &&! workqueue.isempty())) return; //执行到这里,线程池为shutdown且无待执行任务 或 stop 状态 if(workercountof(c)!=0){ interruptidleworkers(only_one);//只中断一个线程 return; } //执行到这里,线程池已经没有可用线程了,可以终止了 finalreentrantlock mainlock =this.mainlock; mainlock.lock(); try{ if(ctl.compareandset(c,ctlof(tidying,0))){//cas设置线程池终止 try{ terminated();//执行钩子方法 }finally{ ctl.set(ctlof(terminated,0));//这里将线程池设为终态 termination.signalall(); } return; } }finally{ mainlock.unlock(); } // else retry on failed cas } }小结:tryterminate () 方法主要功能;
1. 实际尝试终止线程池;
2. 终止成功则调用钩子方法,并且将线程池置为终态。
2.2、java 线程池总结
以上通过对 java 线程池的具体分析我们可以看出,虽然流程看似复杂,但其实有很多内容都是状态重复校验、线程安全的保证等内容,其主要的功能与我们前面所提出的设计功能一致,只是额外增加了一些扩展,下面我们简单整理下线程池的功能;
2.2.1、主要功能
1. 线程数量及存活时间的管理;
2. 待处理任务的存储功能;
3. 线程复用机制功能;
4. 任务超量的拒绝功能;
2.2.2、扩展功能
1. 简单的执行结果统计功能;
2. 提供线程执行异常处理机制;
3. 执行前后处理流程自定义;
4. 提供线程创建方式的自定义;
2.2.3、流程总结
以上通过对 java 线程池任务提交流程的分析我们可以看出,线程池执行的简单流程如下图所示;
2.3、java 线程池使用
线程池基本使用验证上述流程:
publicstaticvoidmain(string[] args)throwsexception{ //创建线程池 threadpoolexecutor threadpoolexecutor =newthreadpoolexecutor( 5,10,100,timeunit.seconds,newarrayblockingqueue(5)); //加入4个任务,小于核心线程,应该只有4个核心线程,队列为0 for(int i =0; i <4; i++){ threadpoolexecutor.submit(newmyrunnable()); } system.out.println(worker count = + threadpoolexecutor.getpoolsize());//worker count = 4 system.out.println(queue size = + threadpoolexecutor.getqueue().size());//queue size = 0 //再加4个任务,超过核心线程,但是没有超过核心线程 + 缓存队列容量,应该5个核心线程,队列为3 for(int i =0; i <4; i++){ threadpoolexecutor.submit(newmyrunnable()); } system.out.println(worker count = + threadpoolexecutor.getpoolsize());//worker count = 5 system.out.println(queue size = + threadpoolexecutor.getqueue().size());//queue size = 3 //再加4个任务,队列满了,应该5个热核心线程,队列5个,非核心线程2个 for(int i =0; i <4; i++){ threadpoolexecutor.submit(newmyrunnable()); } system.out.println(worker count = + threadpoolexecutor.getpoolsize());//worker count = 7 system.out.println(queue size = + threadpoolexecutor.getqueue().size());//queue size = 5 //再加4个任务,核心线程满了,应该5个热核心线程,队列5个,非核心线程5个,最后一个拒绝 for(int i =0; i <4; i++){ try{ threadpoolexecutor.submit(newmyrunnable()); }catch(exception e){ e.printstacktrace();//java.util.concurrent.rejectedexecutionexception } } system.out.println(worker count = + threadpoolexecutor.getpoolsize());//worker count = 10 system.out.println(queue size = + threadpoolexecutor.getqueue().size());//queue size = 5 system.out.println(threadpoolexecutor.gettaskcount());//共执行15个任务 //执行完成,休眠15秒,非核心线程释放,应该5个核心线程,队列为0 thread.sleep(1500); system.out.println(worker count = + threadpoolexecutor.getpoolsize());//worker count = 5 system.out.println(queue size = + threadpoolexecutor.getqueue().size());//queue size = 0 //关闭线程池 threadpoolexecutor.shutdown(); }
预测:2014年最“火”的九大电子产业
大宇2999K彩电行幅增大检修一例
Sony 2018年产品线依旧重点关注电视行业
802.11n路由器拆解:可换刷固件来升级
氨气检测仪的工作原理及应用特性分析
如何设计一个线程池?JAVA中的线程池是如何设计的?
KNX网络的特点及在智慧城市中的作用
大田智能灌溉控制系统
力积电投资55亿美元建设日本工厂 加速汽车芯片业务
智能镜子显示屏的应用将增强家居生活互动体验
苹果在国内市场销量上表现并不突出,但利润远超国产手机
PCB线路板如何去设计散热比较合适
小米6什么时候上市?小米6最新消息:小米6产能曝光:欧皇都不管用!
Tilt Five桌游AR头显发起众筹,起售价300美元
这真的不是iphone7plus吗?金立S10真机图流出,四摄是亮点
音圈模组3D打印帮助诊治癌
触摸屏技术与触控设计技巧解析
灰尘会对EPS应急电源造成哪些影响
一台家用打印机,就能实现打印自由
IGBT龙头企业中车电气产品毛利率持续下滑