Executor1 2 3 4 5 6 7 8 9 10 11 12 13 14  * Executor:线程池中的顶层接口  */ public  class  _01_T_MyExecutor  implements  Executor      public  static  void  main (String[] args)           new  _01_T_MyExecutor().execute(() -> System.out.println("hello executor" ));     }     @Override      public  void  execute (Runnable command)           command.run();     } } 
ExecutorService1 2 3 4 5 6 7 8 9 10  * ExecutorService:真正的线程池接口,其继承了 Executor。  * 提供的方法:  *      void  execute (Runnable command) :执行任务/命令,没有返回值,一般用来执行 Runnable  *      <T> Future<T> submit (Callable<T> task) :执行任务,有返回值,一般有来执行 Callable   *      void  shutdown () :关闭连接池  *      ...  */ public  class  _02_T_ExecutorService  } 
Callable1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34  * Callable 接口和 Runnable 接口相似,区别就是 Callable 需要实现 call 方法,而 Runnable 需要实现 run 方法;  * 并且,call 方法还可以返回任何对象,无论是什么对象,JVM 都会当作 Object 来处理。但是如果使用了泛型,我们就不用每次都对 Object 进行转换了。  */ public  class  _03_T_Callable  implements  Callable <Integer >     @Override      public  Integer call ()  throws  Exception          return  1000 ;     }     public  static  void  main (String[] args)           _03_T_Callable tCallable = new  _03_T_Callable();                   * FutureTask          * 1 ) 作为 Funture 的唯一实现类,可以得到 Callable 的返回值          * 2 ) 同时实现了 Runnable 接口,可以作为 Runnable 被线程执行          *          */         FutureTask<Integer> futureTask = new  FutureTask<>(tCallable);         Thread thread = new  Thread(futureTask);         thread.start();         try  {             Integer integer = futureTask.get();             System.out.println(integer);         } catch  (InterruptedException e) {             e.printStackTrace();         } catch  (ExecutionException e) {             e.printStackTrace();         }     } } 
Future1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34  * Callable 接口和 Runnable 接口相似,区别就是 Callable 需要实现 call 方法,而 Runnable 需要实现 run 方法;  * 并且,call 方法还可以返回任何对象,无论是什么对象,JVM 都会当作 Object 来处理。但是如果使用了泛型,我们就不用每次都对 Object 进行转换了。  */ public  class  _03_T_Callable  implements  Callable <Integer >     @Override      public  Integer call ()  throws  Exception          return  1000 ;     }     public  static  void  main (String[] args)           _03_T_Callable tCallable = new  _03_T_Callable();                   * FutureTask          * 1 ) 作为 Funture 的唯一实现类,可以得到 Callable 的返回值          * 2 ) 同时实现了 Runnable 接口,可以作为 Runnable 被线程执行          *          */         FutureTask<Integer> futureTask = new  FutureTask<>(tCallable);         Thread thread = new  Thread(futureTask);         thread.start();         try  {             Integer integer = futureTask.get();             System.out.println(integer);         } catch  (InterruptedException e) {             e.printStackTrace();         } catch  (ExecutionException e) {             e.printStackTrace();         }     } } 
Executors1 2 3 4 5 6 7 8 9 10  * Executors:工具类、线程池的工厂类,用于创建并返回不同类型的线程池  *  * Executors.newFixedThread(n):创建一个可重用固定线程的线程池。  * Executors.newCachedTreadPool():创建一个可缓存的线程池,调用 execute 将重用以前构造的线程(如果线程可用)。如果没有可用的线程,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60  秒钟未被使用的线程。  * Executors.newSingleThreadExecutor():创建一个只有一个线程的线程池。  * Executors.newScheduledThreadPool(n):创建一个线程池,它可以安排在给定延迟后运行命令或者定期地执行。  */ public  class  _04_T_Executors  } 
Executor.newFixedThread(n)1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31  * Executors.newFixedThread(n):创建一个可重用固定线程的线程池。  */ public  class  _05_T_FixedThreadPool      public  static  void  main (String[] args)  throws  InterruptedException          ExecutorService service = Executors.newFixedThreadPool(5 );         for  (int  i = 0 ; i < 6 ; i++) {             service.execute(() -> {                 try  {                     TimeUnit.MILLISECONDS.sleep(500 );                 } catch  (InterruptedException e) {                     e.printStackTrace();                 }                 System.out.println(Thread.currentThread().getName());             });         }         System.out.println(service);         service.shutdown();         System.out.println(service.isTerminated());         System.out.println(service.isShutdown());         System.out.println(service);         TimeUnit.SECONDS.sleep(5 );         System.out.println(service.isTerminated());         System.out.println(service.isShutdown());         System.out.println(service);     } } 
练习: 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65  * 获取 1  到 200000  之间的所有质数,比较传统写法(不用线程池)和使用线程池之间的效率。  */ public  class  _07_T_ParallelComputing      public  static  void  main (String[] args)  throws  ExecutionException, InterruptedException          long  start = System.currentTimeMillis();         List<Integer> result = getPrime(1 , 200000 );         long  end = System.currentTimeMillis();         System.out.println(end - start);         final  int  cpuCoreNum = 4 ;         ExecutorService service = Executors.newFixedThreadPool(cpuCoreNum);         MyTask t1 = new  MyTask(1 , 80000 );         MyTask t2 = new  MyTask(80001 , 130000 );         MyTask t3 = new  MyTask(130001 , 170000 );         MyTask t4 = new  MyTask(170001 , 200000 );         Future<List<Integer>> future1 = service.submit(t1);         Future<List<Integer>> future2 = service.submit(t2);         Future<List<Integer>> future3 = service.submit(t3);         Future<List<Integer>> future4 = service.submit(t4);         start = System.currentTimeMillis();         future1.get();         future2.get();         future3.get();         future4.get();         end = System.currentTimeMillis();         System.out.println(end - start);         service.shutdown();     }     static  class  MyTask  implements  Callable <List <Integer >>          int  startPos, endPos;         public  MyTask (int  startPos, int  endPos)               this .startPos = startPos;             this .endPos = endPos;         }         @Override          public  List<Integer> call ()  throws  Exception              List<Integer> prime = getPrime(startPos, endPos);             return  prime;         }     }     private  static  boolean  isPrime (int  num)           for  (int  i = 2 ; i < num / 2 ; i++) {             if  (num % i == 0 ) return  false ;         }         return  true ;     }     private  static  List<Integer> getPrime (int  start, int  end)           List<Integer> results = new  ArrayList<>();         for  (int  i = start; i < end; i++) {             if  (isPrime(i)) results.add(i);         }         return  results;     } } 
Executor.newCachedTreadPool()1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30  * Executor.newCachedTreadPool():创建一个可缓存的线程池,调用 execute 将重用以前构造的线程(如果线程可用)。  * 如果没有可用的线程,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60  秒钟未被使用的线程。  */ public  class  _08_T_CachedThreadPool      public  static  void  main (String[] args)  throws  InterruptedException                   ExecutorService service = Executors.newCachedThreadPool();         System.out.println(service);         for  (int  i = 0 ; i < 2 ; i++) {             service.execute(() -> {                 try  {                     TimeUnit.MILLISECONDS.sleep(500 );                 } catch  (InterruptedException e) {                     e.printStackTrace();                 }                 System.out.println(Thread.currentThread().getName());             });         }         System.out.println(service);         TimeUnit.SECONDS.sleep(70 );         System.out.println(service);     } } 
Executors.newSingleThreadExecutor()1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16  * Executors.newSingleThreadExecutor():创建一个只有一个线程的线程池。  */ public  class  _09_T_SingleThreadPool      public  static  void  main (String[] args)           ExecutorService service = Executors.newSingleThreadExecutor();         for  (int  i = 0 ; i < 5 ; i++) {             final  int  j = i;             service.execute(() -> {                 System.out.println(j + " "  + Thread.currentThread().getName());             });         }         service.shutdown();     } } 
Executors.newScheduledThreadPool(n)1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30  * Executors.newScheduledThreadPool(n):创建一个线程池,它可以安排在给定延迟后运行命令或者定期地执行。  */ public  class  _10_T_ScheduledThreadPool      public  static  void  main (String[] args)  throws  InterruptedException          ScheduledExecutorService service = Executors.newScheduledThreadPool(4 );                   * scheduleAtFixedRate(Runnable command,long  initialDelay,long  period,TimeUnit unit):以固定的频率去执行任务。          * 4 个参数:          *      Runnable command:实现 Runnable 接口          *      long  initialDelay:初始化线程启动延迟的时间          *      long  period:每隔多长时间执行任务(间隔的频率)          *      TimeUnit unit:第三个参数的时间单位          */         service.scheduleAtFixedRate(() -> {             try  {                 TimeUnit.MILLISECONDS.sleep(1000 );             } catch  (InterruptedException e) {                 e.printStackTrace();             }             System.out.println(Thread.currentThread().getName());         }, 0 , 500 , TimeUnit.MILLISECONDS);         TimeUnit.SECONDS.sleep(20 );         service.shutdown();     } } 
Executors.newWorkStealingPool()1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39  *  newWorkStealingPool(工作窃取):适合使用在很耗时的操作,但是 newWorkStealingPool 不是 ThreadPoolExecutor 的扩展。  * 它是新的线程池类 ForkJoinPool 的扩展(也就是说它的底层实现是 ForkJoinPool),但是都是在统一的一个 Executors 类中实现。  * 由于能够合理的使用 CPU 进行对任务操作(并行操作),所以适合使用在很耗时的任务中。  */ public  class  _11_T_WorkStealingPool      public  static  void  main (String[] args)  throws  IOException          ExecutorService service = Executors.newWorkStealingPool();         System.out.println(Runtime.getRuntime().availableProcessors());                  service.execute(new  R(1000 ));         service.execute(new  R(2000 ));         service.execute(new  R(2000 ));         service.execute(new  R(2000 ));         service.execute(new  R(2000 ));                  System.in.read();     }     static  class  R  implements  Runnable           int  time;         public  R (int  time)               this .time = time;         }         @Override          public  void  run ()               try  {                 TimeUnit.MILLISECONDS.sleep(time);             } catch  (InterruptedException e) {                 e.printStackTrace();             }             System.out.println(time + " "  + Thread.currentThread().getName());         }     } } 
ForkJoinPool1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 public  class  _12_T_ForkJoinPool      static  int [] nums = new  int [1000000 ];     static  final  int  MAX_NUM = 50000 ;     static  Random r = new  Random();     static  {         for  (int  i = 0 ; i < nums.length; i++) {             nums[i] = r.nextInt(100 );         }         System.out.println(Arrays.stream(nums).sum());     }          static  class  AddAction  extends  RecursiveAction           int  start, end;         public  AddAction (int  start, int  end)               this .start = start;             this .end = end;         }         @Override          protected  void  compute ()               if  (end - start <= MAX_NUM) {                 long  sum = 0L ;                 for  (int  i = start; i < end; i++) {                     sum += nums[i];                 }                 System.out.println("from:"  + start + " to:"  + end + " = "  + sum);             } else  {                 int  middle = start + (end - start) / 2 ;                 AddAction subAct1 = new  AddAction(start, middle);                 AddAction subAct2 = new  AddAction(middle, end);                 subAct1.fork();                 subAct2.fork();             }         }     }          static  class  AddTask  extends  RecursiveTask <Long >          int  start, end;         public  AddTask (int  start, int  end)               this .start = start;             this .end = end;         }         @Override          protected  Long compute ()               if  (end - start <= MAX_NUM) {                 long  sum = 0L ;                 for  (int  i = start; i < end; i++) {                     sum += nums[i];                 }                 System.out.println("from:"  + start + " to:"  + end + " = "  + sum);                 return  sum;             } else  {                 int  middle = start + (end - start) / 2 ;                 AddTask subTask1 = new  AddTask(start, middle);                 AddTask subTask2 = new  AddTask(middle, end);                 subTask1.fork();                 subTask2.fork();                 return  subTask1.join() + subTask2.join();             }         }     }     public  static  void  main (String[] args)  throws  IOException, ExecutionException, InterruptedException          ForkJoinPool fjp = new  ForkJoinPool();                   * RecursiveAction 的执行方法          */         AddAction addAction = new  AddAction(0 , nums.length);         fjp.execute(addAction);         System.in.read();                   * RecursiveTask 的执行方法          */         AddTask addTask = new  AddTask(0 , nums.length);         Future<Long> submit = fjp.submit(addTask);         Long result = submit.get();         System.out.println(result);     } } 
  
    
      If you like this blog or find it useful for you, you are welcome to comment on it. You are also welcome to share this blog, so that more people can participate in it. If the images used in the blog infringe your copyright, please contact the author to delete them. Thank you !