Java JUC高并发编程(五)-线程池Fork/Join异步回调

news/2024/7/3 2:51:34

目录

一、ThreadPool线程池

1、概述

2、线程池的架构

 3、线程池使用方式

4、线程池底层原理

5、线程池的七个参数

6、线程池底层工作流程

7、拒绝策略

8、自定义线程池

二、Fork/Join分支合并框架

1、概述

2、Fork/Join分支合并框架

三、CompletableFuture异步回调


一、ThreadPool线程池

1、概述

线程池(英语:thread pool):一种线程使用模式。线程过多会带来调度开销。进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。

线程池的优势:线程池做的工作只要是控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量,超出数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行。

线程池的主要特点:

1、降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的消耗。

2、提高响应速度:当任务到达时,任务可以不需要等待线程创建就能立即执行。

3、提高线程的可管理性:线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

2、线程池的架构

 3、线程池使用方式

Executors工具类

Executors常用方法:
1.static ExecutorService newFixedThreadPool(int nThreads):创建一个可重用固定线程数的
线程池,以共享的无界队列方式来运行这些线程。
2.static ExecutorService newSingleThreadExecutor():创建一个使用单个 worker 线程的
Executor,以无界队列方式来运行该线程。
3.static ExecutorService newCachedThreadPool():创建一个可根据需要创建新线程的线程池,
但是在以前构造的线程可用时将重用它们。 

Executors.newFixedThreadPool(int nThreads):一池N线程。
Executors.newSingleThreadExecutor():一个任务一个任务执行,一池一线程。
Executors.newCachedThreadPool():线程池根据需求创建线程,可扩容,遇强则强。

newFixedThreadPool()示例:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPoolDemo1 {

	public static void main(String[] args) {
		ExecutorService threadPool1 = Executors.newFixedThreadPool(5);
		try {
			for (int i = 1; i <=10; i++) {
				threadPool1.execute(()->{
					System.out.println(Thread.currentThread().getName()+" 办理业务");
				});
			}
		}catch (Exception e) {
			e.printStackTrace();
		} finally {
			threadPool1.shutdown();
		}
	}
}

newSingleThreadExecutor();示例:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPoolDemo1 {

	public static void main(String[] args) {
		ExecutorService threadPool2 = Executors.newSingleThreadExecutor();
		
		try {
			for (int i = 1; i <=10; i++) {
				threadPool2.execute(()->{
					System.out.println(Thread.currentThread().getName()+" 办理业务");
				});
			}
		}catch (Exception e) {
			e.printStackTrace();
		} finally {
			threadPool2.shutdown();
		}
	}
}

newCachedThreadPool();示例:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPoolDemo1 {

	public static void main(String[] args) {
		ExecutorService threadPool3 = Executors.newCachedThreadPool();
		
		try {
			for (int i = 1; i <=10; i++) {
				threadPool3.execute(()->{
					System.out.println(Thread.currentThread().getName()+" 办理业务");
				});
			}
		}catch (Exception e) {
			e.printStackTrace();
		} finally {
			threadPool3.shutdown();
		}
	}
}

4、线程池底层原理

    ExecutorService threadPool1 = Executors.newFixedThreadPool(5);

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

    ExecutorService threadPool2 = Executors.newSingleThreadExecutor();

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

    ExecutorService threadPool3 = Executors.newCachedThreadPool();

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

上面三个多线程底层创建都是new ThreadPoolExecutor();

5、线程池的七个参数

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

int corePoolSize:常驻线程数量(核心)
int maximumPoolSize:最大线程数量
long keepAliveTime:线程存活时间
TimeUnit unit:线程存活时间
BlockingQueue<Runnable> workQueue:阻塞队列
ThreadFactory threadFactory:线程工厂
RejectedExecutionHandler handler:拒绝策略

6、线程池底层工作流程

 如上图所示:

线程池工作流程:线程池执行execute()方法,才开始创建线程。线程池先执行①corePool(常驻线程)中的任务,其次把任务放到②阻塞队列中,再其次任务放到③最大线程池剩余空间中(并新创建线程执行③中的任务,而②中的阻塞任务会继续等待),还超过上述所有个数之和的情况下,会启动④也就是拒绝策略生效了。

7、拒绝策略

AbortPolic(默认):直接抛出RejectedExecutionException异常阻止系统正常运行。

CallerRunsPolicy:"调用者运行"一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。

DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加入队列中,尝试再次提交当前任务。

DiscardPolicy:该策略默默地丢弃无法处理的任务,不予任何处理也不抛出异常,如果允许任务丢失,这是最好的策略。

8、自定义线程池

阿里巴巴Java开发手册

[强制]线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

说明:Executors返回的线程池对象的弊端如下:

1)FixedThreadPoolSingleThreadPool

允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM。

2)CachedThreadPoolScheduledThreadPool

允许的创建线程数量为Integer.MAX_VALUE,可能会堆积大量的线程,从而导致OOM。

=============================================================

自定义线程池示例:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolDemo2 {

	public static void main(String[] args) {
		ExecutorService threadPool = new ThreadPoolExecutor(2,5,2L,TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(3)
				,Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
		try {
			for (int i = 1; i <=10; i++) {
				threadPool.execute(()->{
					System.out.println(Thread.currentThread().getName()+" 办理业务");
				});
			}
		}catch (Exception e) {
			e.printStackTrace();
		} finally {
			threadPool.shutdown();
		}
	}
}

二、Fork/Join分支合并框架

1、概述

Fork/Join它可以将一个大的任务拆分成多个子任务进行并行处理,最后将子任务结果合并成最后的计算结果,并进行输出。Fork/Join框架要完成两件事情:

Fork:把一个复杂任务进行分拆,大事化小

Join:把分拆任务的结果进行合并

2、Fork/Join分支合并框架

RecursiveTask 递归任务:继承后可以实现递归(自己调自己)调用的任务。
@since 1.7

public abstract class RecursiveTask<V> extends ForkJoinTask<V>

RecursiveTask构造方法:
1.RecursiveTask()

RecursiveTask方法:
1.protected abstract V compute():此任务执行的主要计算。
2.protected boolean exec():实现递归任务的执行约定。
3.V getRawResult():返回ForkJoinTask将返回的结果。join(),即使此任务异常完成,
或者如果未知此任务已完成,则为null。
4.protected void setRawResult(V value):强制返回给定的值作为结果。

-----------------------------------------------------------------------
ForkJoinTask<V>
@since 1.7

public abstract class ForkJoinTask<V> extends Object 
implements Future<V>, Serializable

ForkJoinTask<V>构造方法:
1.ForkJoinTask()

ForkJoinTask<V>常用方法:
1.ForkJoinTask<V> fork():安排异步执行此任务。
2.V get():如有必要,等待计算完成,然后检索其结果。
3.V join():完成计算后返回计算结果。

-----------------------------------------------------------------------
ForkJoinPool

public class ForkJoinPool extends AbstractExecutorService

ForkJoinPool构造方法:
1.ForkJoinPool()

ForkJoinPool常用方法:
1.void execute(ForkJoinTask<?> task):安排(异步)执行给定任务。
2.void execute(Runnable task):在将来的某个时间执行给定的命令。
3.<T> ForkJoinTask<T> submit(Callable<T> task):提交一个返回值的任务以供执行,
并返回一个表示任务挂起结果的未来。
4.<T> ForkJoinTask<T> submit(ForkJoinTask<T> task):提交要执行的ForkJoinTask。
5.ForkJoinTask<?> submit(Runnable task):提交可运行任务以执行,并返回表示该任务的未来。
6.<T> ForkJoinTask<T> submit(Runnable task, T result):提交可运行任务以执行,
并返回表示该任务的未来。
7.void shutdown():启动有序关机,执行以前提交的任务,但不接受新任务。

 Fork/Join示例:

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

//求1+2+3+....100。需要拆分任务不超过10个数进行相加拆分
public class ForkJoinDemo {

	public static void main(String[] args) throws Exception {
		MyTask myTask = new MyTask(0,100);
		//创建分支合并池对象
		ForkJoinPool forkJoinPool = new ForkJoinPool();
		ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(myTask);
		//获取最终合并之后结果
		Integer result = forkJoinTask.get();
		System.out.println(result);
		//关闭池对象
		forkJoinPool.shutdown();
	}
}

@SuppressWarnings("serial")
class MyTask extends RecursiveTask<Integer>{

	private static final Integer VALUE = 10;
	private int begin;
	private int end;
	private int result;
	
	public MyTask(int begin, int end) {
		this.begin = begin;
		this.end = end;
	}

	@Override
	protected Integer compute() {
		//判断
		if ((end-begin) <= VALUE) {
			for (int i = begin; i <= end; i++) {
				result = result + i;
			}
		}else {
			int middle = (begin+end)/2;
			//拆分左边
			MyTask task01 = new MyTask(begin, middle);
			//拆分右边
			MyTask task02 = new MyTask(middle+1, end);
			//调用方法拆分
			task01.fork();
			task02.fork();
			//合并结果
			result = task01.join() +task02.join();
		}
		return result;
	}
}

三、CompletableFuture异步回调

同步:

异步:

CompletableFuture异步回调示例:

import java.util.concurrent.CompletableFuture;

public class CompletableFutureDemo {

	public static void main(String[] args) throws Exception {
		
		//同步调用
		CompletableFuture<Void> completableFuture1 = CompletableFuture.runAsync(()->{
			System.out.println(Thread.currentThread().getName() + " :completableFuture1");
		});
		completableFuture1.get();
		
		//异步调用
		CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(()->{
			System.out.println(Thread.currentThread().getName() + " :completableFuture2");
			return 1024;
		});
		completableFuture2.whenComplete((t,u)->{
			System.out.println("--t--"+t);//t:表示成功的返回值
			System.out.println("--u--"+u);//u:表示失败异常的信息
		}).get();
		
		//异步调用
		CompletableFuture<Integer> completableFuture3 = CompletableFuture.supplyAsync(()->{
			System.out.println(Thread.currentThread().getName() + " :completableFuture3");
			//模拟异常
			int i = 10/0;
			return 1024;
		});
		completableFuture3.whenComplete((t,u)->{
			System.out.println("--t--"+t);//t:表示成功的返回值
			System.out.println("--u--"+u);//u:表示失败异常的信息
		}).get();
		
	}
}

实际开发中一般不会用CompletableFuture异步回调。都会使用MQ消息队列的方式。

Java JUC高并发编程(一)


http://www.niftyadmin.cn/n/3298008.html

相关文章

动态代理 jdk as cglib asm

jdk的代理分为静态代理和动态代理&#xff0c;静态代理用的很少&#xff0c;一般都是动态代理&#xff0c;CGLIB代理是生成的目标类的子类&#xff0c;所以类和方法不能声明为final的&#xff0c;要不然就会有问题jdk的代理是必须要实现接口的&#xff0c;而CGLIB不是&#xff…

Docker学习(一)-简介

目录 一、概述 二、Docker安装 Docker安装前提条件 Docker的基本组成 Docker架构图 Docker安装 hello-word示例&#xff1a; 三、Docker运行原理 四、Docker常用命令 1、帮助命令 2、镜像命令 3、容器命令 五、Docker镜像原理 Docker镜像加载原理 六、Docker容器…

不能失去你所以我为你改变自己

不能失去你所以我为你改变自己从 喷嚏网--读书、培训、8小时外的健康生活! 之 [铂程斋] 作者&#xff1a;xilei1 那时候洛洛和陈果已通过网络“相识”了两年。洛洛是石家庄的&#xff0c;而陈果是长沙的。两人聊得很开心&#xff0c;一直以来也很默契。两个人常常想到一起&…

用mstsc連接服務器收到超出最大連接數的問題

方法一:在win2003里面copy一個sysdir/windows/system32/tsadmin.exe文件運行連接,將那幾個占坑不拉s的人給踢出去,然后重新連接 方法二:運行里面輸入mstsc /console /v:yourservernameorIP:port連接(以前可以用,但不知道為什,我手下的幾臺服務器都無法用) 转载于:https://www.c…

Docker学习(二)-DockerFile

目录 一、DockerFile解析 DockerFile构建过程解析 DockerFile保留字指令 二、本地镜像发布到阿里云 一、DockerFile解析 DockerFile是用来构建Docker镜像的构建文件&#xff0c;是由一系列命令和参数构成的脚本。 构建三步骤&#xff1a; 1、编写DockerFile文件 2、doc…

有状态as无状态之线程问题

有状态就是有数据存储功能。有状态对象(Stateful Bean)&#xff0c;就是有实例变量的对象 &#xff0c;可以保存数据&#xff0c;是非线程安全的。在不同方法调用间不保留任何状态。 无状态就是一次操作&#xff0c;不能保存数据。无状态对象(Stateless Bean)&#xff0c;就是没…

JVM学习(二)--类加载器子系统

目录 一、内存结构 二、类的加载器及类的加载过程 类加载器的分类 启动类加载器&#xff08;引导类加载器&#xff08;Bootstrap ClassLoader&#xff09;&#xff09; 扩展类加载器&#xff08;Extension ClassLoader&#xff09; 应用程序类加载器&#xff08;系统类加…

程控制中关于搜索、控制计算机的功能 和 VB中截获shell程序的输出

相信大家对“冰河”之类的软件一定都非常的感兴趣&#xff0c;这里我们一起来讨论一下“冰河”类软件中关于如何实现搜索、控制远端计算机&#xff08;在局域网、或互联网上搜索那些中了木马的机器&#xff09;的功能。 一、 编程原理 客户端程序&#xff1a;&#xff08;控制远…