[toc]
前言
圈主 [Rocky编程日记] 学习多线程的设计模式笔记记录。希望我写得笔记你能够喜欢, 希望我写的笔记能够给你提供帮助。同时若笔记中存在不对的地方,那一定是圈主当时的理解还不够, 希望你能够及时指出嗷~
目前正在整理ing, 还请别急~
代码仓库地址:
code-multithread-pattern
需要引用 如下 pom文件依赖
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.21</version>
</dependency>
多线程程序的评价标准
安全性 ---- 不损坏对象
不损坏对象。 程序正常运行的必要条件之一。对象损坏是指对象的状态和设计者的原意不一致,通常是指对象的字段的值并非预期值。
判断一个类是否是线程安全的: ** 一个类即使被多个线程使用,也可以确保安全性, 那么这个类就是线程安全的。**
什么是线程兼容 [Bloch] ? ArrayList 虽然是非线程安全的, 但通过执行适当的互斥处理, 也可以安全地使用。
某个线程是线程安全的还是非线程安全的, 与这个类的方法是否是 synchronized 方法无关。
生存性 ---- 必要的时候能够被执行
生存性是指无论是什么时候, 必要的处理都一定能够被执行。
即使对象没有损坏, 也不一定代表程序就一定好。极端一点说,假如程序在运行过程中突然停止, 这时, 由于处理已经停止, 对象的状态就不会发生改变, 所以对象状态也就不会异常。虽然符合 "安全性" , 但无法运行的程序根本没有意义。
有时候生存性 和 安全性 会相互制约。例如, 有时候只重视安全性, 生存性就会下降。例如 死锁(deadlock), 多个线程互相等待对方释放锁的情形。
可复用性 ---- 类可重复利用
指类能够重复利用。
类如果能够作为组件从正常运行的软件中分割出来,那么就说明这个类有很高的可复用性。
J2 SE 5.0 中引入的 java.util.concurrent包中提供了便于多线程编程的可复用性高的类。
性能 ---- 能快速、大批量地执行处理
指能快速、大批量地执行处理。
影响性能可能因素
吞吐量 : 单位时间内完成的处理数量。能完成的处理越多, 则表示吞吐量越大。
响应性 : 从发出请求到收到响应的时间。时间越短, 响应性也就越好。举例: 相比于按下按钮后无任何反应, 10秒后才提示 "处理完毕" 这种方式, 在按下按钮时立刻提示 "处理开始" 这种方式的响应性更高, 即使到处理结束花费的时间稍多一点也没关系。响应性好也称为 "等待时间" 短。
容量 : 指可同时进行的处理数量。
效率、可伸缩性、降级等
Single Threaded Execution 模式
Single Threaded Execution 模式简介
意思是 "以一个线程执行 "。类似于独木桥同一时间内只允许一个人通行一样, 该模式用于设置限制, 以确保同一时间内只能让一个线程执行处理。
该模式有时候也被称为 "临界区" 或 "临界域" 这个名称侧重于执行处理的线程(过桥的人), 而临界区域临界域的名称则侧重于执行范围(人过的桥)。
案例简介: 模拟三位通行者频繁通过一个一次只允许一个人经过的门的情形。当通行者通过门的时候, 统计人数便会递增。并且程序还会记录通行者的 "姓名与出生地"。
通行者
public class User {
private String name;
private String address;
public User(String name, String address) {
this.name = name;
this.address = address;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getAddress() {
return address;
}
public void setAddress(String address) {
this.address = address;
}
}
用户任务
/**
* 用户任务
*/
public class UserThread extends Thread {
private Logger log = LoggerFactory.getLogger(this.getClass());
/**
* final 与 创建线程安全的实例
*/
private final SafeGate safeGate;
private final User user;
public UserThread(SafeGate safeGate, User user) {
this.safeGate = safeGate;
this.user = user;
}
@Override
public void run() {
log.info("线程: {} 姓名:{} 经过安全门",Thread.currentThread().getName(),user.getName());
while (true) {
safeGate.pass(user);
}
}
}
安全门
public class SafeGate {
private int number;
private String name;
private String address;
public void pass(User user) {
this.number++;
this.name = user.getName();
// 休眠
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
this.address = user.getAddress();
check();
}
@Override
public String toString() {
return "No." + number + ": "+ name + " : " + address;
}
private void check() {
if (name.charAt(0) != address.charAt(0)) {
System.out.println("=== BROKEN ===" + toString());
}
}
}
测试
public class SafeGateTest {
public static void main(String[] args) {
SafeGate safeGate = new SafeGate();
new UserThread(safeGate,new User("rocky", "ro_Zhou")).start();
new UserThread(safeGate,new User("luo","mei_Zhou")).start();
new UserThread(safeGate,new User("chen","chen_Zhou")).start();
}
}
结果
=== BROKEN ===No.3: rocky : mei_Zhou
=== BROKEN ===No.3: rocky : mei_Zhou
=== BROKEN ===No.6: chen : ro_Zhou
=== BROKEN ===No.6: chen : ro_Zhou
=== BROKEN ===No.6: chen : ro_Zhou
=== BROKEN ===No.10: rocky : chen_Zhou
=== BROKEN ===No.10: rocky : chen_Zhou
=== BROKEN ===No.12: chen : mei_Zhou
=== BROKEN ===No.12: chen : mei_Zhou
=== BROKEN ===No.15: luo : mei_Zhou
=== BROKEN ===No.16: luo : ro_Zhou
=== BROKEN ===No.17: rocky : chen_Zhou
休眠了一段时间 , 控制台日志显示与预期结果不一致。不知道为啥会出现原因的可以 搜一搜 缓存一致性协议(MESI)。
改造使之安全
/**
* 安全门
*/
public class SafeGate {
private int number;
private String name;
private String address;
public synchronized void pass(User user) {
this.number++;
this.name = user.getName();
// 休眠
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
this.address = user.getAddress();
check();
}
@Override
public synchronized String toString() {
return "No." + number + ": "+ name + " : " + address;
}
private void check() {
if (name.charAt(0) != address.charAt(0)) {
System.out.println("=== BROKEN ===" + toString());
}
}
}
Single Threaded Execution 模式中的登场角色
SharedResource(共享资源)
Single Threaded Execution 模式中出现了一个发挥 SharedResource
(共享资源) 作用的类。由 SafeGate
扮演 SharedResource
角色。
SharedResource
角色是可被多个线程访问的类, 包含很多方法, 但这些方法主要分为如下两类。
safeMethod
: 多个线程同时调用也不会发生问题的方法unsafeMethod:
多个线程同时会调用会发生问题, 因此必须加以保护的方法。
Single Threaded Execution模式会保护 unsafeMethod
, 使其同时只能由一个线程访问。Java则是通过 unsafeMethod
声明 为 synchronzied
方法来保护。我们将只允许单个线程执行的程序范围称为临界区。
在什么情况下使用 Single Threaded Execution 呢 ?
多线程时
- 单线程程序中并不需要使用 Single Threaded Execution模式, 因此也就无需使用
synchronzied
方法。当然, 在synchronzied
方法并不会破坏程序的安全性。但synchronzied
方法要比调用一般方法花费时间, 这会稍微降低程序性能。
- 单线程程序中并不需要使用 Single Threaded Execution模式, 因此也就无需使用
多个线程访问时
状态有可能发生变化
之所以需要使用 Single Threaded Execution 模式, 是因为
ShareResource
角色的状态会变化。如果在创建实例后, 实例的状态再也不会发生变化, 那就无需使用 Single Threaded Execution 模式。
需要确保安全性时
只有在需要确保安全性时, 才需要使用 Single Threaded Execution 模式
例如, Java的集合类大多都是非线程安全的。这是为了在不需要考虑安全性的时候提高程序运行速度。
用户在使用类时, 需要考虑自己要用的类是否是线程安全的。
生存性与死锁
在 Single Thread Execution
模式中, 满足下列条件时, 死锁就会发生。
存在多个
SharedResource
角色的锁的同时, 还想获取其他SharedResource
角色的锁获取
SharedResource
角色的锁的顺序并不固定(SharedResource
角色是对称的)
可复用性和继承反常
假设我们现在要编写 一个 SharedResource
角色的子类。如果子类能够访问 SharedResource
角色的字段, 那么编写子类的开发人员就可能会不小心编写出无保护的 unsafeMethod
。即使能够确保好不容易编写的 SharedResource
角色的安全性, 在子类化时还是有可能会失去安全性。如果不将包含子类在内的所有 unsafeMethod
都声名为 synchronized
方法, 就无法确定 SharedResource
角色的安全性。
在 面向对象的程序设计中, 伴随子类化而出现的 "继承" 起着非常重要的作用。但对于多线程程序设计来说, 继承会引起一些麻烦的问题。我们通常称之为 继承反常。
临界区的大小和性能
Single Threaded Execution 模式会降低程序性能, 原因有如下两个方面。
获取锁花费时间 : 进入
synchronized
方法时, 线程需要获取对象的锁, 该处理会花费时间。如果
SharedResource
角色的数量减少了, 那么要获取的锁的数量也会相应地减少, 从而就能够抑制性能的下降了。线程冲突引起的等待 : 当线程执行临界区内的处理时, 其他想要进入临界区的线程会阻塞。这种状况称为线程冲突。发生冲突时, 程序的整体性能会随着线程等待时间的增加而下降。
如果尽可能地缩小临界区的范围, 降低线程冲突的概率, 那么就能够抑制性能的下降
Immutable Object(不可变对象) 模式
Immutable Object模式简介
Java.lang.String
类是用于表示字符串, String 类中并没有修改字符串内容的方法。也就是说, String 的实例所表示的字符串内容绝对不会发生变化。正因为如此, String类中的方法无需声明为 synchronized。 因为实例的内部状态不会发生改变, 所以无论 String 实例被多少个线程访问,也无需执行线程的互斥处理。
Immutable就是不变的、不发生改变。
Immutable模式中存在着确保实例状态不发生改变的类。在访问这些实例时不需要执行耗时的互斥处理。如果能用好该模式,就可以提高程序性能。
如String就是一个不可变(immutable)类。
示例代码
User类
public class User {
private final String name;
private final String address;
public User(String name, String address) {
this.name = name;
this.address = address;
}
@Override
public String toString() {
return "User{" +
"name='" + name + '\'' +
", address='" + address + '\'' +
'}';
}
public String getName() {
return name;
}
public String getAddress() {
return address;
}
}
PrintPersonThread
类
public class PrintPersonThread extends Thread {
private User user;
public PrintPersonThread(User user) {
this.user = user;
}
public void run() {
while (true) {
System.out.println(Thread.currentThread().getName() + " prints " + user);
}
}
}
PrintPersonThreadTest
public class PrintPersonThreadTest {
public static void main(String[] args) {
User alice = new User("Rocky编程日记", "Rocky编程日记");
new PrintPersonThread(alice).start();
new PrintPersonThread(alice).start();
new PrintPersonThread(alice).start();
}
}
现在回想一下 Single Threaded Execution模式。该模式会将修改或引用实例状态的地方设置为临界区, 使这个区域只能由一个线程同时执行。但像 User
类这样, 实例的状态绝对不会发生改变时, 情况就不一样啦。即使多个线程同时对该实例执行处理, 实例也不会出错, 因为实例的状态肯定不会发生改变。既然实例的状态肯定不会发生改变,那么也就无须使用 synchronized 来保护实例。因为即使想破坏实例, 也破坏不了。
Immutable 模式中的登场角色
Immutable(不可变的)
- Immutable 角色是一个类, 在这个角色中,字段的值不可以修改, 也不存在修改字段内容的方法。Immutable 角色实例被创建后, 状态就不再发生变化。此时, 无需对Immutable角色应用 Single Threaded Execution模式。
在什么场景下使用Immutable模式
实例创建后, 状态不再发生变化时
首先, 如前面反复讲述的那样, "实例创建后, 状态不再发生变化"是必要条件。
实例的状态是由字段的值决定的,所以 "将字段声明为final字段, 且不存在setter方法"是重点所在
即使字段是final 字段, 且不存在 setter 方法, 也有可能不是不可变的(immutable)。因为即使字段的值不会发生变化, 字段引用的实例也有可能会发生变化。
实例是共享的,且被频繁访问时
Immutable模式的优点是 "不需要使用 synchronized 进行保护"。
不需要使用 synchronized 进行保护就意味着能够在不失去安全性和生存性的前提下提高性能。
当实例被多个线程共享, 且有可能被频繁访问时, Immutable 模式的优点就会凸显出来。
考虑成对的 mutable 类 和 immutable 类 [性能]
思考: 假设存在于一个类, 由于该类会被多个线程访问, 所以我们使用
synchronized
进行了保护。这里, 如果该类中存在 setting 方法, 那么 Immutable模式就不成立啦。假设 我们查看程序后发现实际上这个 setter 方法并未被使用, 那么就可以将字段声明为 final, 删除setter 方法, 并注意遵守不可变性,这样或许就能将其改造为可适用于 Immutable 模式了。
实际上可能使用了 setter 方法。这时候, 该类就不适用于 Immutable 模式了, 但这还不是放弃的时候, 我们来仔细查看了一下整个程序是如何使用该类的, 看是不是可以分为使用 setter 方法的情况下与不使用 setter 方法的情况。如果可以明确分为这两种情况, 那我们是不是可以将这个类拆分为 mutable 类 和 immutable 类, 然后再设计成可以根据 mutable 实例创建 immutable 实例, 并可以反过来根据 immutable 实例创建 mutable 实例呢? 这样, immutable 类的部分就可以应用 Immutable 模式了。
Java 的标准类库中就有成对的 mutable 类 和 immutable 类, 例如:
java.lang.StringBuffer
类和java.lang.String
类。StringBuffer
类是表示字符串的 mutable 类。表示的字符串能够随便改写, 为了确保安全, 改写时需要妥善使用synchronized
。而 String 类是表示字符串的 immutable 类。String 实例表示的字符串不可以改写。StringBuffer
类中有一个以 String 为参数的构造函数, 而String
类中有一个以StringBuffer
为参数的构造函数。也就是说,StringBuffer
的实例和String
的实例可以互相转换。
为了确保不可变性(可复用性)
不可变性是一个很微妙的性质, 代码稍微一修改, 程序可能就会失去不可变性。如果从使用了 Immutable 模式的程序中删除了 synchronized, 那么当失去不可变性时,程序的安全性就会完全丧失, 所以一定要注意。
Guarded Suspension(保护性暂挂) 模式
Guarded Suspension 模式简介?
是“被保护着的”、“被防卫着的”意思,suspension则是“暂停”的意思。当现在并不适合马上执行某个操作时,就要求想要执行该操作的线程等待,这就是Guarded Suspension Pattern。 Guarded Suspension Pattern 会要求线程等候,以保障实例的安全性,其它类似的称呼还有guarded wait、spin lock等。
示例程序
Request
/**
* 一个请求的类
*/
public class Request {
private final String name;
public Request(String name) {
this.name = name;
}
public String getName() {
return name;
}
public String toString() {
return "[ Request " + name + " ]";
}
}
RequestQueue
/**
* 依次存放请求的类
*/
public class RequestQueue {
private final Queue<Request> queue = new LinkedList<Request>();
/**
* 1. 施加守护条件进行保护
* 2. 不等待和等待的情况
* 3. 执行 wait, 等待条件发生变化 [明确线程在等待什么?应该何时执行notify/notifyAll]
* 4. 执行到while的下一条语句时一定能确定的事情
* @return
*/
public synchronized Request getRequest() {
while (queue.peek() == null) { // 守护条件的逻辑非运算
try {
System.out.println(Thread.currentThread().getName() + ": wait() begins, queue = " + queue);
wait();
System.out.println(Thread.currentThread().getName() + ": wait() ends, queue = " + queue);
} catch (InterruptedException e) {
}
}
// != null 守护条件
return queue.remove();
}
public synchronized void putRequest(Request request) {
queue.offer(request);
System.out.println(Thread.currentThread().getName() + ": notifyAll() begins, queue = " + queue);
notifyAll();
System.out.println(Thread.currentThread().getName() + ": notifyAll() ends, queue = " + queue);
}
}
ClientThread
/**
* 发送请求的类
*/
public class ClientThread extends Thread {
private final Random random;
private final RequestQueue requestQueue;
public ClientThread(RequestQueue requestQueue, String name, long seed) {
super(name);
this.requestQueue = requestQueue;
this.random = new Random(seed);
}
public void run() {
for (int i = 0; i < 10000; i++) {
Request request = new Request("No." + i);
System.out.println(Thread.currentThread().getName() + " requests " + request);
requestQueue.putRequest(request);
try {
Thread.sleep(random.nextInt(1000));
} catch (InterruptedException e) {
}
}
}
}
ServerThread
/**
* 接收请求的类
*/
public class ServerThread extends Thread {
private final Random random;
private final RequestQueue requestQueue;
public ServerThread(RequestQueue requestQueue, String name, long seed) {
super(name);
this.requestQueue = requestQueue;
this.random = new Random(seed);
}
public void run() {
for (int i = 0; i < 10000; i++) {
Request request = requestQueue.getRequest();
System.out.println(Thread.currentThread().getName() + " handles " + request);
try {
Thread.sleep(random.nextInt(1000));
} catch (InterruptedException e) {
}
}
}
}
SuspensionTest
/**
* 测试程序的类
*/
public class SuspensionTest {
public static void main(String[] args) {
RequestQueue requestQueue = new RequestQueue();
new ClientThread(requestQueue, "Rocky", 3141592L).start();
new ServerThread(requestQueue, "Rocky_BCRJ", 6535897L).start();
}
}
Guarded Suspension 模式中的角色
GuardedObject
(被守护的对象)GuardedObject
角色是一个持有被守护的方法 (guardeMethod
) 的类。当线程执行guardeMethod
方法时, 若守护条件成立, 则可以立即执行; 当守护条件不成立时, 就要进行等待。守护条件的成立与否会随着GuardedObject
角色的状态不同而发生变化。还有可能持有其他改变实例状态(特别是改变守护条件)的方法(
stateChangingMethod
)。在 Java 中
guardeMethod
通过 while语句和wait方法来实现,stateChangingMethod
则通过notify/notifyAll
方法来实现
在什么场景下使用 Guarded Suspension
附加条件的
synchronized
在
Single Threaded Execution
模式中, 只要有一个线程进入临界区, 其他线程就无法进入, 只能等待。在
Guarded Suspension
模式中, 线程是否等待取决于守护条件。Guarded Suspension
模式是在Single Threaded Execution
模式的基础上附加了条件而形成的。也就是说,Guarded Suspension
模式是类似于附加条件的 synchronized 这样的模式。
多线程版本的
if
- 单线程中, 执行操作的主体操作线程只有一个。如果该类唯一线程进入等待线程, 就没有线程来改变实例的状态。
忘记改变状态与生存性
- 在
wait
的线程每次被notify / notifyAll
时都会检查守护条件。不管被notify / notifyAll
多少次, 如果守护条件不成立, 线程都会随着while
再次wait
。
- 在
wait
与notify / notifyAll
的责任示例程序中, 你会发现
wait / notifyAll
只出现在RequestQueue
类中, 而并未出现在ClientThread、ServerThread
类中。Guarded Suspension
模式的实现封装在RequestQueue
类中。这种将
wait / notifyAll
隐藏起来的做法对RequestQueue
类的复用性来说是非常重要的。其他类无需 考虑wait / notifyAll
的问题, 只需要调用就可以了。
各种称呼
特征: 存在循环, 存在条件检查, 因为某种原因而 " 等待"
guarded suspension
: 被守护而暂停执行的含义guarded wait
: 被守护而等待, 其实现方法为线程使用wait
进行等待, 被notify / notifyAll
后, 再次检查条件是否成立。由于线程在使用wait
进行等待的期间是待在等待队列中停止执行的。所以并不会浪费虚拟机的处理时间。busy wait
: 忙于等待, 其实现方法并未使用wait
进行等待, 而是执行yield
的同时检查守护条件。由于等待端的线程是在持续运行的。所以会浪费虚拟机的处理时间。spin lock
: 通过旋转来锁定, 在条件成立之前, 通过while
循环 旋转等待的情形。polling
: 反复检查某个事件是否发生, 若发生, 则执行相应处理的方式 。
Baking 模式
Balking 模式 简介
Balking模式: 如果现在不适合执行这个操作, 或者没必要执行这个操作, 就停止处理, 直接返回。
示例程序
Data
/**
* 可以修改并保存的数据的类
*/
public class Data {
private final String filename; // 保存的文件名
private String content; // 数据内容
private boolean changed; // 修改后的内容若未保存,则为true
public Data(String filename, String content) {
this.filename = filename;
this.content = content;
this.changed = true;
}
// 修改数据内容
public synchronized void change(String newContent) {
content = newContent;
changed = true;
}
// 若数据内容修改过,则保存到文件中
public synchronized void save() throws IOException {
if (!changed) {
System.out.println(Thread.currentThread().getName() + " balks");
return;
}
doSave();
changed = false;
}
// 将数据内容实际保存到文件中
private void doSave() throws IOException {
System.out.println(Thread.currentThread().getName() + " calls doSave, content = " + content);
Writer writer = new FileWriter(filename);
writer.write(content);
writer.close();
}
}
SaverThread
public class SaverThread extends Thread {
private final Data data;
public SaverThread(String name, Data data) {
super(name);
this.data = data;
}
public void run() {
try {
while (true) {
data.save(); // 要求保存数据
Thread.sleep(1000); // 休眠约1秒
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
ChangerThread
public class ChangerThread extends Thread {
private final Data data;
private final Random random = new Random();
public ChangerThread(String name, Data data) {
super(name);
this.data = data;
}
public void run() {
try {
for (int i = 0; true; i++) {
data.change("No." + i); // 修改数据
Thread.sleep(random.nextInt(1000)); // 执行其他操作
data.save(); // 显式地保存
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Test
public class Test {
public static void main(String[] args) {
Data data = new Data("data.txt", "(empty)");
new ChangerThread("ChangerThread", data).start();
new SaverThread("SaverThread", data).start();
}
}
Balking 模式中登场角色
GuardedObject
(被守护的对象)GuardedObject
角色是一个拥有被防护的方法(guardedMethod
)的类。当线程执行guardedMethod
方法时, 若守护条件成立, 则执行实际的处理。而当守护条件不成立时, 则不执行实际的处理, 直接返回。守护条件的成立与否, 会随着GuardedObject
角色的状态变化而发生变化。除了
guardedMethod
之外,GuardedObject
角色还有可能有其他来改变状态的方法(stateChangingMethod
)。
在什么场景下使用 Balking 模式
并不需要执行时
不需要等待守护条件成立时
守护条件仅在第一次成立时
Producer-Consumer(生产者/消费者) 模式
模式简介
就是生产者-消费者模式。生产者和消费者在为不同的处理线程,生产者必须将数据安全地交给消费者,消费者进行消费时,如果生产者还没有建立数据,则消费者需要等待。 一般来说,可能存在多个生产者和消费者,不过也有可能生产者和消费者都只有一个,当双方都只有一个时,我们也称之为Pipe Pattern。
示例程序
MakerThread
用于制作蛋糕, 并将其放置到桌子上, 也就是糕点师 。
该类会先暂停一段随机时长, 然后再调用 Table 类的 put 方法将制作好的蛋糕放置到桌子上。暂停的这段时间模拟的是 "制作蛋糕所花费的时间"。
public class MakerThread extends Thread {
private final Random random;
private final Table table;
private static int id = 0; // 蛋糕的流水号(所有糕点师共用)
public MakerThread(String name, Table table, long seed) {
super(name);
this.table = table;
this.random = new Random(seed);
}
public void run() {
try {
while (true) {
Thread.sleep(random.nextInt(1000));
String cake = "[ Cake No." + nextId() + " by " + getName() + " ]";
table.put(cake);
}
} catch (InterruptedException e) {
}
}
private static synchronized int nextId() {
return id++;
}
}
EaterThread
用于表示从桌子上取蛋糕吃的客人。
客人通过 Table 类的take 方法取桌子上的蛋糕。然后 , 与
MakerThread
类一样,EaterThread
也会暂停一段随机长的时间。这段暂停时间模拟的是 "吃蛋糕花费的时间"。
public class EaterThread extends Thread {
private final Random random;
private final Table table;
public EaterThread(String name, Table table, long seed) {
super(name);
this.table = table;
this.random = new Random(seed);
}
public void run() {
try {
while (true) {
String cake = table.take();
Thread.sleep(random.nextInt(1000));
}
} catch (InterruptedException e) {
}
}
}
Table
public class Table {
private final String[] buffer;
private int tail; // 下次put的位置
private int head; // 下次take的位置
private int count; // buffer中的蛋糕个数
public Table(int count) {
this.buffer = new String[count];
this.head = 0;
this.tail = 0;
this.count = 0;
}
public synchronized void put(String cake) throws InterruptedException {
System.out.println(Thread.currentThread().getName() + " puts " + cake);
while (count >= buffer.length) {
System.out.println(Thread.currentThread().getName() + " wait BEGIN");
wait();
System.out.println(Thread.currentThread().getName() + " wait END");
}
buffer[tail] = cake;
tail = (tail + 1) % buffer.length;
count++;
notifyAll();
}
public synchronized String take() throws InterruptedException {
while (count <= 0) {
System.out.println(Thread.currentThread().getName() + " wait BEGIN");
wait();
System.out.println(Thread.currentThread().getName() + " wait END");
}
String cake = buffer[head];
head = (head + 1) % buffer.length;
count--;
notifyAll();
System.out.println(Thread.currentThread().getName() + " takes " + cake);
return cake;
}
}
Producer-Consumer 模式中的登场角色
Data
:Producer
角色生成Data
角色, 并将其传递给Channel
。Producer
(生产者) : 从Channel
角色获取Data
角色并使用 。Consumer
(消费者) : 从Channel
角色获取Data
角色并使用。Channel
(通道) :Channel
角色保管从Producer
角色来获取Data
角色, 还会响应Consumer
角色的请求, 传递Data
角色。为了确保安全性,Channel
角色会对Producer
角色和Consumer
角色的访问执行互斥处理。当
Producer
角色将Data
角色传递给Channel
角色时, 如果Channel
角色的状态不适合接收Data
角色, 那么Producer
角色将一直等待, 直至Channel
角色的状态变为可以接收为止。当
Consumer
角色从Channel
角色获取Data
角色时, 如果Channel
角色中没有可以传递的Data
角色, 那么Consumer
角色将一直等待, 直至Channel
角色的状态变为可以传递Data
角色为止。当存在多个
Producer
角色 和Consumer
角色时, 为了避免各处理互相影响,Channel
角色需要执行互斥处理。Channel
角色相当于Producer
角色 和Consumer
角色之间, 承担用于传递Data
角色的中转站、通道的任务。
在什么场景下使用 Producer-Consumer
守护安全性的
Channel
角色在该模式下, 承担安全守护责任的是
Channel
角色。Channel
角色执行线程间的互斥处理, 确保Producer
角色正确地将Data
角色传递给Consumer
角色。Producer
角色不可以直接调用Consumer
方法么?直接调用方法
Consumer
角色想要获取Data
角色, 通常都是因为想使用这些Data
角色来执行某些处理。如果Producer
角色直接调用Consumer
角色的方法, 那么执行处理的就不是Consumer
角色的线程, 而是Producer
角色的线程了。这样一来, 执行处理花费的时间必须由
Producer
角色的线程来承担, 准备下一个数据的处理也会相应发生延迟。这样子会使程序的响应性变得很差。插入 Channel 角色
Producer
角色将Data
角色传递给Channel
角色之后, 无需等待Consumer
角色对Data
角色进行处理。可以立即开始准备下一个Data
角色。也就是说,Producer
角色可以持续不断地创建Data
角色。Producer
角色不会受到Consumer
角色的处理进展状况的影响。
Channel 角色的剩余空间所导致的问题
在示例程序中, 桌子上最多可以放置3个蛋糕。当糕点师放置蛋糕时,如果桌子上的蛋糕个数在3个以内, 则可以顺利放置, 但如果是4个及4个以上时, 就必须等待客人取走蛋糕才行。如果客人吃得慢, 糕点师就必须等待很久。
如果桌子上可以放置的蛋糕个数增多会怎么样呢? 这时, 就算客人吃得慢, 糕点师也无需等待, 可以直接制作蛋糕并放到桌子上。桌子上可以放置的蛋糕个数(
buffer
字段的元素个数)是用于缓冲MakerThread
和EaterThread
之间的处理速度差的。当然, 如果客人吃蛋糕的平均速度小于糕点师制作蛋糕的速度, 那么桌子上的蛋糕会逐渐增多, 一段时间后还是会达到
buffer
字段的元素个数上限。如果使用
java.util.LinkedList
类, 那么创建的Channel
角色能储存的实例个数就不会存在上限。但这时, 如果EaterThread
的平均速度较慢, 一段时间之后(或许要过很长一段时间) 内存就会不足, 也就无法再创建表示蛋糕的实例。以什么顺序传递 Data 角色呢
队列 ---- 先接收的先传递
栈 ---- 后接收的先传递
优先队列 ---- "优先"的先传递
"存在中间角色"的意义
线程的协调运行要考虑 "放在中间的东西"
线程的互斥处理要考虑 "应该保护的东西"
Consumer角色只有一个使会怎么样呢
该模式考虑多个
Producer
给多个Consumer
传递数据的情况。这里, 我们来思考一下Consumer
角色只有一个时程序会怎么样。也就是 "多个Producer
角色和一个Consumer
角色"的情况。如果
Consumer
角色有一个, 也就是说处理Channel
角色中储存的Data
角色的线程只有一个, 不需要注意Consumer
角色的线程之间互相影响。如果Consumer
角色有多个, 我们就要注意不能让Consumer
角色的线程之间互相影响。[多生产者单消费者模型类似于事件分发机制]
Read-Write Lock 模式
Read-Write Lock 模式 简介
将读取与写入分开处理,在读取数据之前必须获取用来读取的锁定,而写入的时候必须获取用来写入的锁定。
因为读取时实例的状态不会改变,所以多个线程可以同时读取;
但是,写入会改变实例的状态,所以当有一个线程写入的时候,其它线程既不能读取与不能写入。
示例程序
Data
package code.rocky.readWriteLock;
public class Data {
private final char[] buffer;
private final ReadWriteLock lock = new ReadWriteLock();
public Data(int size) {
this.buffer = new char[size];
for (int i = 0; i < buffer.length; i++) {
buffer[i] = '*';
}
}
public char[] read() throws InterruptedException {
lock.readLock();
try {
return doRead();
} finally {
// 释放用于读取的锁
lock.readUnlock();
}
}
public void write(char c) throws InterruptedException {
lock.writeLock();
try {
doWrite(c);
} finally {
// 确保一定会调用 采用 before/after模式
lock.writeUnlock();
}
}
// 用于创建一个新的 char 数组,来复制 buffer 字段的内容 ,并返回 newbuf
private char[] doRead() {
char[] newbuf = new char[buffer.length];
for (int i = 0; i < buffer.length; i++) {
newbuf[i] = buffer[i];
}
slowly();
return newbuf;
}
// 用于执行实际的写入操作。该方法会以参数传入的字符c来填满 buffer字段。"以传入的字符填满数组" 这个操作并没有什么特别的意义, 只是为了让运行结果容易理解而已。注意: slowly(), 这里假定了 写入操作的时间比读取操作的时间长
private void doWrite(char c) {
for (int i = 0; i < buffer.length; i++) {
buffer[i] = c;
slowly();
}
}
private void slowly() {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
}
}
}
WriteThread
package code.rocky.readWriteLock;
import java.util.Random;
public class WriterThread extends Thread {
private static final Random random = new Random();
private final Data data;
private final String filler;
private int index = 0;
public WriterThread(Data data, String filler) {
this.data = data;
this.filler = filler;
}
public void run() {
try {
while (true) {
char c = nextchar();
data.write(c);
// 0~3000毫秒随机休眠
Thread.sleep(random.nextInt(3000));
}
} catch (InterruptedException e) {
}
}
// 用于获取下一次应该写入的字符
private char nextchar() {
char c = filler.charAt(index);
index++;
if (index >= filler.length()) {
index = 0;
}
return c;
}
}
ReaderThread
package code.rocky.readWriteLock;
public class ReaderThread extends Thread {
private final Data data;
public ReaderThread(Data data) {
this.data = data;
}
public void run() {
try {
long begin = System.currentTimeMillis();
for (int i = 0; i < 20; i++) {
char[] readBuf = data.read();
System.out.println(Thread.currentThread().getName() + " reads " + String.valueOf(readBuf));
}
long time = System.currentTimeMillis() - begin;
System.out.println(Thread.currentThread().getName() + ": time = " + time);
} catch (InterruptedException e) {
}
}
}
ReadWriteLock
package code.rocky.readWriteLock;
public final class ReadWriteLock {
private int readingReaders = 0; // (A)…实际正在读取中的线程个数
private int waitingWriters = 0; // (B)…正在等待写入的线程个数
private int writingWriters = 0; // (C)…实际正在写入中的线程个数
private boolean preferWriter = true; // 若写入优先,则为true
public synchronized void readLock() throws InterruptedException {
while (writingWriters > 0 || (preferWriter && waitingWriters > 0)) {
wait();
}
readingReaders++; // (A) 实际正在读取的线程个数加1
}
public synchronized void readUnlock() {
readingReaders--; // (A) 实际正在读取的线程个数减1
preferWriter = true;
notifyAll();
}
public synchronized void writeLock() throws InterruptedException {
waitingWriters++; // (B) 正在等待写入的线程个数加1
try {
while (readingReaders > 0 || writingWriters > 0) {
wait();
}
} finally {
waitingWriters--; // (B) 正在等待写入的线程个数减1
}
writingWriters++; // (C) 实际正在写入的线程个数加1
}
public synchronized void writeUnlock() {
writingWriters--; // (C) 实际正在写入的线程个数减1
preferWriter = false;
notifyAll();
}
}
Read-Write Lock登场角色
Reader
(读者):Reader
角色对SharedResource
角色执行 read 操作。Writer
(写者):Writer
角色对SharedResource
角色执行 write 操作。SharedResource
(共享资源) :SharedResource
角色表示的是Reader
角色和Write
角色二者共享的资源。SharedResource
角色提供不修改内部状态的操作(read) 和修改内部状态的操作(write)。ReadWriterLock
(读写锁) :ReadWriterLock
角色提供了SharedResource
角色实现 read 操作和 write操作所需的锁。
在什么场景下使用 Read-Write Lock
利用 "读取"操作的线程之间不会冲突的特性来提高程序性能
Read-Write Lock
模式利用了读取操作的线程之间不会冲突的特性。由于读取操作不会修改SharedResource
角色的状态, 所以彼此之间无需执行互斥处理。因此多个Reader
角色可以同时执行read
操作, 从而提高程序性能。性能的提升还需要考虑 "适合读取操作繁重时" 和 "适合读取频率比写入频率高时" 两个大方针。
适合读取操作繁重时
在单纯使用
Single Threaded Execution
模式的情况下, 就算是 read操作, 每次也只能运行一个线程。如果 read 的操作很繁重, 那么使用 Read-Write Lock模式比使用Single Threaded Execution
模式更加合适但是, 因为Read-Write Lock模式的处理比
Single Thread Execution
模式复杂, 所以当 read 的操作很简单(不耗费时间)时,Single Thread Execution
模式反而会更加合适。适合读取频率比写入频率高时
Read-Write Lock模式的优点是 Reader 角色之间不会发生冲突, 但是, 如果写入处理(write) 的频率很高, write 角色便会频繁停止 Reader角色的处理, 这样就无法体现出 Read-Write Lock 模式的优点了。
锁的含义
synchronized
可以用于获取实例的锁。Java的每个实例都持有一个"锁", 但同一个锁不可以由两个以上的线程同时获取。这种结构是 Java 编程规范规定的, Java虚拟机也是这样实现的。这也是Java语言一开始就提供的所谓的物理锁, Java程序并不能改变这种锁的运行。而 "用于读取的锁" 和 "用于写入的锁" 所指的锁与使用
synchronized
获取的锁是不一样的。这并不是 Java编程规范规定的结构, 而是开发人员自己实现的一种结构。逻辑锁
Thread-Per-Message模式
Thread-Per-Message 模式简介
Thread-Per-Message 模式是指每个message一个线程,message可以理解成“消息”、“命令”或者“请求”。
每一个message都会分配一个线程,由这个线程执行工作,使用Thread-Per-Message Pattern时,“委托消息的一端”与“执行消息的一端”回会是不同的线程。
示例程序
Helper
public class Helper {
public void handle(int count, char c) {
System.out.println(" handle(" + count + ", " + c + ") BEGIN");
for (int i = 0; i < count; i++) {
slowly();
System.out.print(c);
}
System.out.println("");
System.out.println(" handle(" + count + ", " + c + ") END");
}
private void slowly() {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
}
}
Host
public class Host {
private final Helper helper = new Helper();
public void request(final int count, final char c) {
System.out.println(" request(" + count + ", " + c + ") BEGIN");
helper.handle(count, c);
System.out.println(" request(" + count + ", " + c + ") END");
}
}
HostTest
public class HostTest {
public static void main(String[] args) {
System.out.println("main BEGIN");
Host host = new Host();
host.request(10, 'A');
host.request(20, 'B');
host.request(30, 'C');
System.out.println("main END");
}
}
Thread-Per-Message模式中的登场角色
Client (委托人) : Client角色会向 Host 角色发出请求(request), 但是并不知道 Host 角色 是如何实现该请求的。
Host : Host角色收到 Client 角色的请求(request) 之后, 会新创建 并启动一个线程。新创建的线程将使用
Helper
角色来处理请求。Helper (助手): Helper角色 为 Host 角色提供请求处理的功能。Host角色创建的新线程会利用 Helper角色。
在什么场景下使用 Thread-Per-Message模式
提高响应性, 缩短延迟时间
Thread-Per-Message 模式能够提高与 Client 角色 对应的 Host角色的响应性, 降低延迟时间。尤其是当 handle 操作非常耗时, 或者 handle 操作需要等待 输入/输出时, 效果非常明显。
在 Thread-Per-Message 模式下, Host 角色会启动新的线程。由于启动线程也会花费时间, 所以想要提高响应性时, 是否使用 Thread-Per-Message 模型 取决于 "handle操作花费的时间" 和 "线程启动花费的时间" 之间的均衡。
为了缩短线程启动花费的时间, 我们可以使用 worker Thread模式
适用于操作顺序没有要求时
在 Thread-Per-Message模式中, handle 方法并不一定是按 request 方法的调用顺序来执行的。
适用于不需要返回值时
在 Thread-Per-Message 模式中, request 方法并不会等待 handle 方法执行结束。所以 request 得不到 handle 的运行结果。因此, Thread-Per-Message模式适用于不需要获取返回值的情况。
应用于服务器
为了使服务器可以处理多个请求, 我们可以使用 Thread-Per-Message模式。服务器本身的线程接收客户端的请求, 而这些请求的实际处理则交由其他线程来执行, 服务器本身的线程则返回, 去等待客户端的其他请求。
调用方法 + 启动线程 ---> 发送消息
通常情况下, 当调用方法时, 该方法中的所有处理都被执行完之后, 控制权才会返回。Thread-Per-Message 模式的 request 方法也是一个普通方法, 所以当该方法中的处理被执行完毕后, 控制权就会返回。
"request 方法真正想要的操作(显示字符串)执行了吗?" 虽然控制权从 request 返回了, 但这并等于显示 字符串的操作也就执行完了。虽然 request 触发了目标操作的开始(触发器), 但并不等待处理结束。
Worker Thread 模式
什么是 worker thread 模式
这是一个来自工作车间的故事, 在这里, 工人们负责组装塑料模型。
客人会将很多装有塑料模型的箱子带到工作车间来, 然后摆放在桌子上。
工人必须将客户送过来的塑料模型一个一个组装起来。他们会先取回放在桌子上的装有塑料模型的箱子, 然后在阅读了箱子中的说明书后开始组装。当一箱模型组装完成后, 工人们会继续去取下一个箱子。当所有模型全部组装完成后, 工人们会等待新的模型被送过来。
也称为 Thread Pool(线程池) 模式
示例程序
ClientThread
类的线程会向Channel
类发送工作请求(委托) (说是工作, 其实只是显示出委托者的名字和委托编号)
Channel
类的实例雇佣了五个工人线程(WorkerThread
) 进行工作。所有工人线程都在等待工作请求的到来。工作请求到来后, 工人线程会从
Channel
那里获取一项工作请求并开始工作。工作完成后, 工人线程会回到Channel
那里等待下一项工作请求。
Channel
public class Channel {
private static final int MAX_REQUEST = 100;
private final Request[] requestQueue;
private int tail; // 下次putRequest的位置
private int head; // 下次takeRequest的位置
private int count; // Request的数量
private final WorkerThread[] threadPool;
public Channel(int threads) {
this.requestQueue = new Request[MAX_REQUEST];
this.head = 0;
this.tail = 0;
this.count = 0;
threadPool = new WorkerThread[threads];
for (int i = 0; i < threadPool.length; i++) {
threadPool[i] = new WorkerThread("Worker-" + i, this);
}
}
public void startWorkers() {
for (int i = 0; i < threadPool.length; i++) {
threadPool[i].start();
}
}
public synchronized void putRequest(Request request) {
while (count >= requestQueue.length) {
try {
wait();
} catch (InterruptedException e) {
}
}
requestQueue[tail] = request;
tail = (tail + 1) % requestQueue.length;
count++;
notifyAll();
}
public synchronized Request takeRequest() {
while (count <= 0) {
try {
wait();
} catch (InterruptedException e) {
}
}
Request request = requestQueue[head];
head = (head + 1) % requestQueue.length;
count--;
notifyAll();
return request;
}
}
Request
public class Request {
private final String name;
private final int number;
public Request(String name, int number) {
this.name = name;
this.number = number;
}
public void execute() {
System.out.println(Thread.currentThread().getName() + " executes " + this);
}
public String toString() {
return "[ Request from " + name + " No." + number + " ]";
}
}
ClientThread
public class ClientThread extends Thread {
private final Channel channel;
public ClientThread(String name, Channel channel) {
super(name);
this.channel = channel;
}
public void run() {
for (int i = 0; true; i++) {
Request request = new Request(getName(), i);
channel.putRequest(request);
}
}
}
WorkerThread
public class WorkerThread extends Thread {
private final Channel channel;
public WorkerThread(String name, Channel channel) {
super(name);
this.channel = channel;
}
public void run() {
while (true) {
Request request = channel.takeRequest();
request.execute();
}
}
}
ChannelTest
public class ChannelTest {
public static void main(String[] args) {
Channel channel = new Channel(5); // 工人线程的个数
channel.startWorkers();
new ClientThread("Alice", channel).start();
new ClientThread("Bobby", channel).start();
new ClientThread("Chris", channel).start();
try {
Thread.sleep(30000);
} catch (InterruptedException e) {
}
System.exit(0);
}
}
Worker Thread 模式中的登场角色
Client (委托者) : 角色创建表示工作请求的 Request 角色并将其传递给
Channel
角色。Channel (通信线路) :
Channel
角色接收来自于Client
角色的Request
角色, 并将其传递给Worker
角色。Worker (工人) :
Worker
角色从Channel
角色, 并进行工作。当一项工作完成后, 它会继续去获取另外的Request
角色。Request (请求) :
Request
角色是表示工作的角色, 并进行工作。当一项工作完成后, 它会继续去获取另外的Request
角色。
什么场景下使用 Worker Thread模式
提高吞吐量
如果可以将自己的工作交给其他人, 那么自己就可以做下一项工作。线程也是一样的。如果将工作交给其他线程, 自己就可以做下一项工作(Thread-Per-Message)。
由于启动新线程需要花费时间, 所以 Worker Thread 模式的主题之一就是通过轮流地和反复地使用线程来提高吞吐量。
容量控制
Worker Thread模式还有另外一个主题, 那就是可以同时提供的服务的数量, 即容量的控制。
Worker 角色的数量
Worker角色的数量是可以自由地定义的。在示例程序中, 传递给
Channel
的构造函数的参数 threads 即表示这个数值。Worker 角色会创建 threads 个WorkerThread
的实例。Worker角色的数量越多, 可以并发进行的处理也越多。但是, 即使 Worker角色的数量超过了 同时被请求的工作数量,也不会对提高程序处理效率有什么帮助。因为多余的
Worker
角色不但不会工作, 还会占有内存。增加容量就会增加消耗的资源, 所以必须根据程序实际运行的环境来相应地调整Worker
角色的数量。最开始只有几个 Worker 角色
当工作增加时就增加 Worker 角色
但是, 如果增加得太多会导致内存耗尽, 因此到达极限后就不再增加
Worker
角色反之,当工作减少(即等待工作的
Worker
角色增加)时, 就要逐渐减少Worker
角色
Request 角色的数量
Channel
角色中保存着Request
角色。 只要Worker
角色不断地进行工作, 在Channel
角色中保存的Request
角色就不会增加很多。不过, 当接收到的工作的数量超出了Worker
角色的处理能力后,Channel
角色中就会积累很多Request
角色。这时,Client
角色必须等待一段时间才能将Request
角色发送给Channel
角色。如果
Channel
角色可以保存很多Request
角色, 那么就可以填补(缓冲)Client
角色与Worker
角色之间的处理速度差异。
调用与执行的分离
Client
角色负责发送工作请求。它会将工作内容封装为Request
角色, 然后传递给Channel
角色。在普通的方法调用中, 这部分相当于 "设置参数并调用方法"。其中, "设置参数" 与 "创建Request
角色"相对应, 而 "传递给Channel
角色" 大致与 "调用方法"相对应。Worker
角色负责进行工作。它使用从Channel
角色接收到的Request
角色来执行实际的处理。在普通的方法调用中, 这部分相当于 "执行方法"。在进行普通的方法调用时, "调用方法" 和 "执行方法" 是连续进行的。因为调用方法后, 方法会立即执行。在普通的方法调用中, 调用与执行是无法分开的。
在
Worker Thread
模式 和Thread-Per-Message
模式中, 方法的调用和方法的执行是特意被分开的。方法的调用被称为invocation
,方法的执行被称为execution
。因此, 可以说Worker Thread
模式和Thread-Per-Message
模式将方法的调用和执行 分离开了。调用与执行的分离同时也是Command
模式。提高响应速度
如果调用和执行不可分离, 那么当执行需要花费很长时间时, 就会拖调用处理的后腿。但是如果将调用和执行分离, 那么即使执行需要花费很长时间也没有什么关系, 因为执行完调用处理的一方可以先继续执行其他处理, 这样就可以提高响应速度。
控制执行顺序 (调度)
如果调用和执行不可分离, 那么在调用后就必须开始执行。
但是如果将调用和执行分离, 执行就可以不再受调用顺序的制约。我们可以通过设置
Request
角色的优先级, 并控制Channel
角色 将Request
角色传递给Worker
角色的顺序来实现上述处理。可以取消和反复执行
将调用和执行分离后, 还可以实现 "即使调用了也可以取消执行" 这种功能。
由于调用的结果是
Request
角色对象, 所以既可以将Request
角色保存, 又可以反复地执行。通往分布式之路
将调用和执行分离后, 可以将负责调用的计算机与负责执行的计算机分离开来, 然后通过网络将扮演
Request
角色的对象从一台计算机传递至另外一台计算机。
Runnable 接口的意义
java.lang.Runnable
接口有时会被用作Worker Thread
模式中的Request
角色。也就是说, 该模式会创建一个实现了Runnable
接口的类的实例对象(Runnable
对象) 来表示工作内容, 然后将它传递给Channel
角色,让其完成这项工作。多态 Request 角色
在示例程序中,
ClientThread
传递给Channel
的只是Request
的实例。但是,WorkThread
并不知道Request
类的详细信息。WorkerThread
只是单纯的接收Request
的实例, 然后调用它的execute
方法而已。也就是说 , 即使我们编写了一个
Request
类的子类并将它的实例传递给了Channel
,WorkThread
也可以正常地调用execute
方法。用面向对象的术语来说, 就是这里使用了多态性。Request
类是表示工作的类, 编写Request
类的子类相当于增加工作的种类。我们就实现了具有多态的Request
角色。Request
角色中包含了完成工作所必需的全部信息。因此, 即使我们实现了多态的Request
角色并增加了工作的种类, 也无需修改Channel
角色和Worker
角色。这是因为即使工作种类增加了,worker
角色依然只是调用execute
方法而已。独自一人的 Worker 角色
请大家想象一下工人线程(
Worker
角色只有一个的情况。)当工人线程只有一个时, 由于工人线程进行处理的范围变成了单线程, 所以会有互斥处理可以省略的可能性。
Future 模式
什么是 Future 模式
Future 的意思是 未来、期货。假设有一个方法需要花费很长时间才能获取运行结果。那么, 与其一直等待结果, 不如先拿一张 "提货单"。获取提货单并不耗费时间。这里的 "提货单" 我们称为
Future
角色。获取
Future
角色的线程会在稍后使用Future
角色来获取运行结果。这与凭着提货单去取蛋糕非常相似。如果运行结果已经出来了,那么直接领取即可; 如果运行结果还没有出来, 那么需要等待结果出来。
示例程序
Data
public interface Data {
public abstract String getContent();
}
FutureData
public class FutureData extends FutureTask<RealData> implements Data {
public FutureData(Callable<RealData> callable) {
super(callable);
}
public String getContent() {
String string = null;
try {
string = get().getContent();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
return string;
}
}
Host
public class Host {
public FutureData request(final int count, final char c) {
System.out.println(" request(" + count + ", " + c + ") BEGIN");
// (1) 创建FutureData的实例
// (向构造函数中传递 Callable<RealData>)
FutureData future = new FutureData(
new Callable<RealData>() {
public RealData call() {
return new RealData(count, c);
}
}
);
// (2) 启动一个新线程,用于创建RealData的实例
new Thread(future).start();
System.out.println(" request(" + count + ", " + c + ") END");
// (3) 返回FutureData的实例
return future;
}
}
RealData
public class RealData implements Data {
private final String content;
public RealData(int count, char c) {
System.out.println(" making RealData(" + count + ", " + c + ") BEGIN");
char[] buffer = new char[count];
for (int i = 0; i < count; i++) {
buffer[i] = c;
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
}
System.out.println(" making RealData(" + count + ", " + c + ") END");
this.content = new String(buffer);
}
public String getContent() {
return content;
}
}
HostTest
public class HostTest {
public static void main(String[] args) {
System.out.println("main BEGIN");
Host host = new Host();
Data data1 = host.request(10, 'A');
Data data2 = host.request(20, 'B');
Data data3 = host.request(30, 'C');
System.out.println("main otherJob BEGIN");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
System.out.println("main otherJob END");
System.out.println("data1 = " + data1.getContent());
System.out.println("data2 = " + data2.getContent());
System.out.println("data3 = " + data3.getContent());
System.out.println("main END");
}
}
Future 模式中的登场角色
Client
:Client
角色向Host
角色发出请求(request) , 并会立即接收到请求的处理结果(返回值) ---VirtualData
角色Host
: Host角色会创建新的线程, 并开始在新线程中创建RealData
角色。同时, 它会将Future
角色返回给Client
角色。VirtualData
(虚拟数据) :VirtualData
角色是让Future
角色与RealData
角色具有一致性的角色。RealData
(真实数据): 表示真实数据的角色。创建该对象需要花费很多时间。Future
:Future
角色是RealData
角色的 "提货单", 由Host
角色传递给Client
角色。Future
角色就是VirtualData
角色。实际上, 当Client
角色操作Future
角色时, 线程会调用wait
方法等待, 直至RealData
角色创建完成。但是, 一旦RealData
角色创建完成, 线程就不会再继续等待。Future
角色会将Client
角色的操作委托给RealData
角色。
什么场景下使用Future模式
吞吐量会提高吗
异步方法调用的 "返回值"
"准备返回值" 和 "使用返回值" 的分离
变种 --- 不让主线程久等的 Future 角色
变种 --- 会发生变化的 Future 角色
谁会在意多线程呢? "可复用性"
回调 与 Future 模式
Two-phase Termination(两阶段终止) 模式
什么是 Two-phase Termination
小孩子在玩玩具时经常会将玩具弄得满房间都是。晚上到了睡觉时间, 妈妈就会对小孩子说: "先收拾房间再睡觉噢", 这时, 小孩子会开始打扫房间。
它是一种先执行完终止处理再终止线程的模式。
示例程序
CountUpThread
package code.rocky.twoPhaseTermination;
public class CountUpThread extends Thread {
// 计数值
private long counter = 0;
// 终止请求
public void shutdownRequest() {
interrupt();
}
// 线程体
public void run() {
try {
while (!isInterrupted()) {
doWork();
}
} catch (InterruptedException e) {
} finally {
doShutdown();
}
}
// 操作
private void doWork() throws InterruptedException {
counter++;
System.out.println("doWork: counter = " + counter);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
}
}
// 终止处理
private void doShutdown() {
System.out.println("doShutdown: counter = " + counter);
}
}
CountUpThreadTest
package code.rocky.twoPhaseTermination;
public class CountUpThreadTest {
public static void main(String[] args) {
System.out.println("main: BEGIN");
try {
// 启动线程
CountUpThread t = new CountUpThread();
t.start();
// 稍微间隔一段时间
Thread.sleep(10000);
// 线程的终止请求
System.out.println("main: shutdownRequest");
t.shutdownRequest();
System.out.println("main: join");
// 等待线程终止
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("main: END");
}
}
Two-Phase Termination 模式中的登场角色
TerminationRequester
(终止请求发出者) :TerminationRequester
角色负责向Terminator
角色发出终止请求。Terminator
(终止者) :Terminator
角色负责接收终止请求, 并实际执行终止处理。它提供了表示终止请求的shutdownRequest
方法不需要使用Single Threaded Execution
什么场景下使用 Two-Phase Termination
不能使用
Thread
类的 stop 方法java.lang.Thread
类提供了用于强制终止线程的 stop 方法。但是 stop 是 "不推荐使用的方法",我们不应当使用它。因为如果使用 stop 方法, 实例的安全性就无法确保。使用 stop 方法后, 线程会在抛出
java.lang.ThreadDeath
异常后终止。即使该线程正处于访问临界区的过程中也会终止。仅仅检查标志是不够的
因为当想要终止线程时, 该线程可能正在 sleep。而当线程正在 sleep 时, 即使将
shutdownRequest
标志设置为 true, 线程也不会开始终止处理。等到 sleep 时间过后, 线程可能会在某个时间点开始终止处理, 但是这样程序的响应性就下降了。如果使用interrupt
方法的话, 就可以中断 sleep。另外, 线程当时也可能正在 wait。而当线程正在 wait 时, 即使将
shutdownRequested
标志设为true, 线程也不会从等待队列中出来, 所以我们必须使用 interrupt方法对线程下达中断wait
的指标。仅仅检查中断状态是不够的
调用interrupt方法后,如果线程正在 sleep或是 wait, 那么会抛出
InterruptedExecption
异常,而如果不抛出异常, 线程就会变为中断状态。也就是说, 没有必要特意准备一个新的shutdownRequested
标志。只要捕获InterruptedException
, 使用isInterrupted
方法来检查线程是否处于中断状态不就可以了么在长时间处理前检查终止请求
为了能够在接受到终止请求后立即开始终止处理, 我们应当在执行长时间处理前检查
shutdownRequested
标志或是调用isShutdownRequested
方法。join
方法 和isAlive
方法isAlive
方法来确定指定的线程是否已经终止。如果返回值是true, 则表示该线程还活着; 如果返回值是false, 则表示该线程已经终止。使用java.lang.Thread
的getState
方法也可以获取线程的状态, 不过如果只是检查线程是否已经终止, 使用isAlive
会更好。java.util.concurrent.ExecutorService
接口 与Two-Phase Termination
模式要捕获程序整体的终止时
未捕获的异常的处理器
退出钩子
优雅地终止线程
"线程优雅地执行终止处理, 然后终止运行" 这种状态用英语单词来形容的话, 就是
Graceful(优雅的、高贵的、得体的)
。这种状态相当于工作的结束并不是慌慌张张地放下已经着手的工作不管, 而是在进行必要的整理后才正式终止。安全地终止(安全性)
即使接收到终止请求, 线程也不会立即终止。首先表示是否已经接收到终止请求的
shutdownRequested
标志会被设置为true
。然后, 仅在线程运行至不会破坏对象安全性的位置时, 程序才会开始终止处理。必定会进行终止处理(生存性)
线程在接收到终止请求后, 会中断可以中断的wait, 转入终止处理。为此,
shutdownRequest
方法会调用interrupt
方法。另外, 为了确保在抛出异常后程序也会执行终止处理, 我们使用了 try… finally 语句块。
发出终止请求后尽快进入终止线程(响应性)
线程在接收到终止请求后, 会中断可以中断的sleep, 尽快进入终止处理。为此,
shutdownRequest
方法会调用 interrupt 方法。另外, 在执行长时间处理前需要检查
shutdownRequested
标志。
Thread Specific Storage(线程特有存储) 模式
什么是 Thread Specific Storage
就是“线程独有的存储库”,该模式会对每个线程提供独有的内存空间。
java.lang.ThreadLocal
类提供了该模式的实现,ThreadLocal
的实例是一种集合(collection)架构,该实例管理了很多对象,可以想象成一个保管有大量保险箱的房间。
ThreadLocal
简介
见 我的 ThreadLocal
解读
示例程序
Log
public class Log {
private static PrintWriter writer = null;
// 初始化writer字段
static {
try {
writer = new PrintWriter(new FileWriter("log.txt"));
} catch (IOException e) {
e.printStackTrace();
}
}
// 写日志
public static void println(String s) {
writer.println(s);
}
// 关闭日志
public static void close() {
writer.println("==== End of log ====");
writer.close();
}
}
LogTest
public class LogTest {
public static void main(String[] args) {
System.out.println("BEGIN");
for (int i = 0; i < 10; i++) {
Log.println("main: i = " + i);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
}
Log.close();
System.out.println("END");
}
}
使用 Thread-Specific-Storage模式 示例程序
ClientThread
public class ClientThread extends Thread {
public ClientThread(String name) {
super(name);
}
public void run() {
System.out.println(getName() + " BEGIN");
for (int i = 0; i < 10; i++) {
Log.println("i = " + i);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
}
Log.close();
System.out.println(getName() + " END");
}
}
Log
public class Log {
private static final ThreadLocal<TSLog> tsLogCollection = new ThreadLocal<TSLog>();
// 写日志
public static void println(String s) {
getTSLog().println(s);
}
// 关闭日志
public static void close() {
getTSLog().close();
}
// 获取线程特有的日志
private static TSLog getTSLog() {
TSLog tsLog = tsLogCollection.get();
// 如果该线程是第一次调用本方法,就新生成并注册一个日志
if (tsLog == null) {
tsLog = new TSLog(Thread.currentThread().getName() + "-log.txt");
tsLogCollection.set(tsLog);
}
return tsLog;
}
}
TSLog
public class TSLog {
private PrintWriter writer = null;
// 初始化writer字段
public TSLog(String filename) {
try {
writer = new PrintWriter(new FileWriter(filename));
} catch (IOException e) {
e.printStackTrace();
}
}
// 写日志
public void println(String s) {
writer.println(s);
}
// 关闭日志
public void close() {
writer.println("==== End of log ====");
writer.close();
}
}
ClientThreadTest
public class ClientThreadTest {
public static void main(String[] args) {
new ClientThread("Alice").start();
new ClientThread("Bobby").start();
new ClientThread("Chris").start();
}
}
Thread-Specific Storage 模式中的登场角色
Client
(委托者) :Client
角色将处理委托给TSObjectProxy
角色。一个TSObjectProxy
角色会被多个Client
角色使用。TSObjectProxy
(线程特有的对象的代理人):TSObjectProxy
角色会执行多个Client
角色委托给它的处理。首先,TSObjectProxy
角色使用TSObjectCollection
角色获取与Client
角色对应的TSObjet
角色。TSObjectCollection
(线程特有的对象的集合):TSObjectCollection
角色有一张Client
角色与TSObject
角色之间的对应表。 当getTSObject
方法被调用后, 它会去查看对应表, 返回与Client
角色相对应的TSObject
角色。另外, 当setTSObject
方法被调用后, 它会将Client
角色与TSObject
角色之间的键值对应关系设置到对应表中。TSObject
(线程特有对象):TSObject
角色中保存着线程特有的信息。TSObject
角色由TSObjectCollection
角色管理。TSObject
角色的方法只会被单线程调用。
什么场景下使用 Thread-Specific Storage
局部变量与
java.lang.ThreadLocal
类线程本来都是有自己特有的存储空间的, 即用于保存方法的局部变量的栈。方法中定义的局部变量属于该线程特有, 其他线程无法访问它们。但是,这些变量在方法调用结束后就会消失。而
ThreadLocal
则与方法调用无关,它是一个用于为线程分配特有的存储空间的类。保存线程特有的信息的位置
线程特有的信息的"保存位置"有以下两种。
线程外(thread-external)
线程内(thread-internal)
在线程外保存线程特有的信息
TSLOG
的实例都被保存在Log
类中的java.lang.ThreadLocal
的实例中。TreadLocal
的实例就是存储间, 各个线程的储物间内。线程并不会背着储物柜四处走动。像这样, 将线程持有的信息保存在线程外部的方法称为 "线程外"。在线程内保存线程特有的信息
编写一个
Thread
类的子类 --MyThread
。如果在MyThread
中声明字段, 该字段就是线程特有的信息。这就是线程内保存线程特有的信息。
不必担心其他线程访问
Thread-Specific Storage
是"线程特有的存储"的意思。不会被其他线程随意访问 这一特性非常重要。这是因为,在多线程编程中,互斥处理非常重要,但是优雅地执行互斥处理却非常困难。
Thread-Specific Storage
模式为我们提供了一种以线程作为键,让每个线程只能访问它特有的对象的机制。该对象是以线程为单位来保存的,绝对不用担心其他线程会访问该对象。吞吐量的提高很大程度上取决于实现方式
Thread-Specific Storage
模式并没有执行互斥处理。因此,这很容易让人误解为与使用 Single Threaded Execution模式相比, 此时的吞吐量会有所提高。但是, 事实并非一定如此。原因如上文所述, 可能TSObjectCollection
角色中执行了隐藏的互斥处理。此外, 每次通过TSObjectProxy
角色调用方法时, 使用TSObjectCollection
角色都会产生额外的性能开销。Thread-Specific Storage
更看重如下所示的可复用性。不改变结构即可实现程序
没有显式地执行互斥处理, 所以编程时犯错的可能性较小
上下文的危险性
在
Thread-Specific Storage
模式中,TSObjectCollection
角色会自动判断当前的线程。也就是说, 我们没有必要将线程的相关信息通过参数传递给TSObjectCollection
角色。这相当于在程序中引入了上下文。上下文虽然很方便,但是也有一定的危险性。因为开发人员看不到处理中所使用的信息。
基于角色与基于任务
基于角色的方式即在表示线程的实例中保存进行工作所必需的信息(上下文、状态)。这样可以减少和减轻线程之间的交互信息量。一个线程会使用从其他线程接收到的信息来执行处理, 改变自己的内部状态。通常,我们称这样的线程为角色。
基于任务的方式不在线程中保存信息(上下文、状态)。在这种方式下, 这些信息不保存在线程中,而是保存在线程之间交互的实例中。而且, 不仅是数据, 连用于执行请求的方法都定义在其中。像这样在线程之间交互的实例可以称为消息、请求或是命令。这里我们暂且称其为任务。
Active Object(主动对象) 模式
Active Object 的简介
Active 是 "主动的"的意思, 因此 Active Object 就是"主动对象"的意思。所谓"主动的", 一般指 "有自己特有的线程"。因此,举例来说,java 的
java.lang.Thread
类的实例就是一种主动对象。不过,在 Active Object 模式中出场的主动对象可不仅仅 "有自己特有的线程"。它同时还具有可以从外部接收和处理异步消息并根据需要返回处理结果的特征。
Active Object 模式中的主动对象会通过自己特有的线程在合适的时机处理从外部接收到的异步消息。
示例程序
见code-multithread-pattern
Active Object 模式中的登场角色
Client
(委托者) :Client
角色调用ActiveObject
角色的方法来委托处理。它能够调用的只有ActiveObject
角色提供的方法。调用这些方法, (如果ActivationQueue
角色没有满) 程序控制权会立即返回。虽然
Client
角色只知道ActiveObject
角色,但它实际调用的是Proxy
角色。Client
角色在获取处理结果时, 会调用VirtualResult
角色的getResultValue
方法。这里使用了Future
模式。ActiveObject
(主动对象) : Active Object角色定义了主动对象向Client角色提供的接口(API )。ActiveObject
角色定义了主动对象向Client
角色提供的接口(API)。Proxy
(代理人) :Proxy
角色负责将方法调用转换为MethodRequest
角色的对象。转换后的MethodRequest
角色会被传递给Scheduler
角色。Proxy
角色实现了ActiveObject
角色提供的接口(API)。调用
Proxy
角色的方法的就是Client
角色。将方法调用转换为MethodRequest
角色, 并传递给Scheduler
角色的操作都是使用Client
角色的线程进行的。Scheduler
:Scheduler
角色负责将Proxy
角色传递来的MethodRequest
角色传递给ActivationQueue
角色, 以及从ActivationQueue
角色取出并执行MethodRequest
角色这两项工作。Client
角色的线程负责将MethodRequest
传递给ActivationQueue
角色。而从ActivationQueue角色取出并执行MethodRequest角色这项工作则是使用Scheduler角色自己的线程进行的。在Active Object模式中,只有使用 Client 角色和 Scheduler 角色时才会启动新线程。
Scheduler角色会把 MethodRequest角色放入 ActivationQueue 角色或者从ActivationQueue角色取出 MethodRequest 角色。
因此,Scheduler 角色可以判断下次要执行哪个请求。如果想实现请求调度的判断逻辑,可以将它们实现在Scheduler角色中。也正是因为如此,我们才将其命名为Scheduler。
MethodRequest
MethodRequest角色是与来自Client角色的请求对应的角色。MethodRequest定义了负责执行处理的 Servant角色,以及负责设置返回值的Future角色和负责执行请求的方法(everto)
MethodRequest角色为主动对象的接口(API)赋予了对象的表象形式。
ContreteMethodRequest
ConcreteMethodRequest角色是使MethodRequest角色与具体的方法相对应的角色。Active Object角色中定义的每个方法,会有各个类与之对应,比如MethodAlphaRequest。
Servant(仆人)
Servant角色负责实际地处理请求。
调用Servant角色的是Scheduler角色的线程。Scheduler角色会从ActivationQueue角色取出一个MethodRequest角色(实际上是ConcreteMethodRequest角色)并执行它。此时,Scheduler 角色调用的就是Servant角色的方法。
Servant角色实现了ActiveObject角色定义的接口(API )。
Proxy角色会将请求转换为MethodRequest角色,而Servant角色则会实际地执行该请求。Scheduler角色介于Proxy角色和Servant 角色之间,负责管理按照什么顺序执行请求。
ActivationQueue (主动队列)
ActivationQueue角色是保存MethodRequest角色的类。
调用putRequest方法的是Client角色的线程,而调用takeRequest方法的是Scheduler角色的线程。这里使用了Producer-Consumer模式。
VirtualResult(虚拟结果)
VirtualResult角色与Future角色、RealResult角色共同构成了Future模式。
Client角色在获取处理结果时会调用VirtualResult角色(实际上是Future角色)的getResultvalue方法。
Future(期货)
Future角色是 Client角色在获取处理结果时实际调用的角色。当处理结果还没有出来的时候,它会使用Guarded Suspension模式让 Client角色的线程等待结果出来。
RealResult(真实结果)
RealResult角色是表示处理结果的角色。Servant角色会创建一个RealResult角色作为处理结果,然后调用Future角色的setRealResult方法将其设置到Future角色中。
什么场景下 使用 Active Object
到底做了些什么事情
定义了接口( API): 由Active Object 角色定义API
接收异步消息:Proxy 角色将方法调用转换为MethodRequest 角色后保存在ActivationQueue角色中
与Client角色运行于不同的线程:由 Scheduler角色提供线程
执行处理:由Servant角色单线程执行处理
返回返回值:Future角色是返回值的提货单
运用模式时需要考虑问题的粒度
Active Object模式的组成要素众多,是一个非常庞大的模式。因此,在运用该模式时,必须注意问题的粒度。所谓问题的粒度,是指问题的大小,也就是用于解决问题的每个处理到底有多大。
关于并发性
Proxy 角色即使被多个线程调用也没有问题({ concurrent } )
Servant角色只能被一个线程调用( { sequential } )
增加方法
Scheduler 角色的作用
在[POSA2]中,Scheduler 角色如下。首先,各ConcreteMethodRequest 角色会定义guard方法。接着,如果可以执行ConcreteMethodRequest 角色,就让guard方法返回true。仅当guard方法的返回值是true时,Scheduler角色才会调用ConcreteMethodRequest角色的execute方法”。这样,ConcreteMethodRequest角色的守护条件就可以整合在它们各自的方法中了。.
主动对象之间的交互
可以编写多个主动对象,然后让它们之间互相交互。也就是说,Servant角色会调用其他 ActiveObject 角色的方法。
通往分布式-从跨越线程界线变为跨越计算机界线
这个方法运行于哪个线程呢? 在Active Object模式中,“方法的调用”的部分运行于Client角色的线程中,“方法的执行”部分运行于Scheduler角色的线程中。
这里其实也是“调用与执行的分离”: 执行invocation的线程(Client角色)与执行execution 的线程(Scheduler角色)被分离开了。
如果将线程分离开来,那么就可以很容易地将线程运行于的计算机也分离开来,即将执invocation的机器与执行execution的计算机分离开,然后用网络将它们连接起来。那么网络之间互相传输的是什么呢?对,就是 MethodRequest角色和Result角色。
由于方法的调用和设置返回值都已经被转换为了对象这种“有形的东西”,所以可以通过网络交互。这可以说是“从跨越线程界线变为了跨越计算机界线”。
http: / / docs.oracle.com/javase/8/docs/technotes/qguides/rmi/
其他有关的设计模式
看到的一些其他的多线程设计模式, 罗列在下面。欢迎小伙伴们反馈噢~
Promise (承诺)模式
Serial Thread Confinement(串行线程封闭) 模式
Master-Slave(主仆) 模式
Pipeline(流水线) 模式
Half-sync/ Half-async(半同步/半异步)模式
参考文献
参考1: https://blog.csdn.net/weixin_30399797/article/details/95064835
参考4: 多线程与设计模式PDF
参考5: 多线程与设计模式代码仓库地址https://www.ituring.com.cn/book/1812